sse

package module
v0.0.5 Latest
Published: Mar 10, 2024 License: Apache-2.0 Imports: 18 Imported by: 2

README

Hertz-SSE

(This is a community-driven project)


Server-Sent Events (SSE) is a specification for implementing server-side push for web frontend applications over plain HTTP. The EventSource API for Server-Sent Events is standardized as part of HTML5[1] by the W3C. This repository is a fork of manucorporat/sse and r3labs/sse, adapted for Hertz.

Install

go get github.com/hertz-contrib/sse

Example

Server
package main

import (
  "context"
  "net/http"
  "time"

  "github.com/hertz-contrib/sse"

  "github.com/cloudwego/hertz/pkg/app"
  "github.com/cloudwego/hertz/pkg/app/server"
  "github.com/cloudwego/hertz/pkg/common/hlog"
)

func main() {
  h := server.Default()

  h.GET("/sse", func(ctx context.Context, c *app.RequestContext) {
    // the client can report the last event it received via the Last-Event-ID header
    lastEventID := sse.GetLastEventID(c)
    hlog.CtxInfof(ctx, "last event ID: %s", lastEventID)

    // you must set the status code and response headers before publishing the first event
    c.SetStatusCode(http.StatusOK)
    s := sse.NewStream(c)
    // stop the ticker when the handler returns so that it is not leaked
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()
    for t := range ticker.C {
      event := &sse.Event{
        Event: "timestamp",
        Data:  []byte(t.Format(time.RFC3339)),
      }
      err := s.Publish(event)
      if err != nil {
        return
      }
    }
  })

  h.Spin()
}

Client
package main

import (
  "context"
  "sync"

  "github.com/hertz-contrib/sse"

  "github.com/cloudwego/hertz/pkg/common/hlog"
)

var wg sync.WaitGroup

func main() {
  wg.Add(2)
  go func() {
    // pass in the server-side URL to initialize the client
    c := sse.NewClient("http://127.0.0.1:8888/sse")

    // called once the connection to the server is established
    c.SetOnConnectCallback(func(ctx context.Context, client *sse.Client) {
      hlog.Infof("client1 connected to server %s with %s method", c.GetURL(), c.GetMethod())
    })

    // called when the connection is shut down
    c.SetDisconnectCallback(func(ctx context.Context, client *sse.Client) {
      hlog.Infof("client1 disconnected from server %s with %s method", c.GetURL(), c.GetMethod())
    })

    events := make(chan *sse.Event)
    errChan := make(chan error)
    go func() {
      cErr := c.Subscribe(func(msg *sse.Event) {
        if msg.Data != nil {
          events <- msg
          return
        }
      })
      errChan <- cErr
    }()
    for {
      select {
      case e := <-events:
        hlog.Info(e)
      case err := <-errChan:
        // %v also prints a nil error, whereas calling err.Error() on nil would panic
        hlog.CtxErrorf(context.Background(), "err = %v", err)
        wg.Done()
        return
      }
    }
  }()

  go func() {
    // pass in the server-side URL to initialize the client
    c := sse.NewClient("http://127.0.0.1:8888/sse")

    // called once the connection to the server is established
    c.SetOnConnectCallback(func(ctx context.Context, client *sse.Client) {
      hlog.Infof("client2 connected to server %s with %s method", c.GetURL(), c.GetMethod())
    })

    // called when the connection is shut down
    c.SetDisconnectCallback(func(ctx context.Context, client *sse.Client) {
      hlog.Infof("client2 disconnected from server %s with %s method", c.GetURL(), c.GetMethod())
    })

    events := make(chan *sse.Event)
    errChan := make(chan error)
    go func() {
      cErr := c.Subscribe(func(msg *sse.Event) {
        if msg.Data != nil {
          events <- msg
          return
        }
      })
      errChan <- cErr
    }()
    for {
      select {
      case e := <-events:
        hlog.Info(e)
      case err := <-errChan:
        // %v also prints a nil error, whereas calling err.Error() on nil would panic
        hlog.CtxErrorf(context.Background(), "err = %v", err)
        wg.Done()
        return
      }
    }
  }()

  wg.Wait()
}

Real-world examples

This repository comes with two server examples that demonstrate how to build real-time applications with server-sent events.

Stock Price (examples/server/stockprice)

A web server that periodically pushes randomly generated stock prices.

  1. Run examples/server/stockprice/main.go to start the server.
  2. Send a GET request to /price.
curl -N --location 'localhost:8888/price'
#id:1681141432283
#event:AAPL
#data:92.607347
#
#id:1681141432283
#event:AMZN
#data:73.540894
#
#id:1681141433283
#event:AAPL
#data:23.536702
#
#id:1681141433283
#event:AMZN
#data:63.156229
#
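
The output above shows the wire format of each event: one `field:value` line per field, followed by a blank line (the leading `#` only marks the lines as shell comments). As a minimal sketch of how a consumer could split such a block into its fields, assuming only the `id`, `event`, and `data` fields seen above (the `parsedEvent` type and the `parseBlock` function are hypothetical and not part of this package):

```go
package main

import (
	"fmt"
	"strings"
)

// parsedEvent is a hypothetical holder for the fields seen in the output above.
type parsedEvent struct {
	ID    string
	Event string
	Data  string
}

// parseBlock splits one event block (without its trailing blank line) into fields.
// Unknown fields and lines without a colon are ignored in this sketch.
func parseBlock(block string) parsedEvent {
	var ev parsedEvent
	var data []string
	for _, line := range strings.Split(block, "\n") {
		name, value, ok := strings.Cut(line, ":")
		if !ok {
			continue
		}
		// The event stream format allows one optional space after the colon.
		value = strings.TrimPrefix(value, " ")
		switch name {
		case "id":
			ev.ID = value
		case "event":
			ev.Event = value
		case "data":
			data = append(data, value)
		}
	}
	// Multiple data lines are joined with a newline.
	ev.Data = strings.Join(data, "\n")
	return ev
}

func main() {
	ev := parseBlock("id:1681141432283\nevent:AAPL\ndata:92.607347")
	fmt.Printf("%s %s %s\n", ev.ID, ev.Event, ev.Data)
}
```

When using this package's client, the handler passed to Subscribe already receives parsed events, so a parser like this is mainly useful for inspecting raw curl output.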

Chat Server (examples/server/chat)

A chat server that pushes new messages to clients using server-sent events. It supports both direct and broadcast messaging.

  1. Run examples/server/chat/main.go to start the server.
  2. Send a GET request to /chat/sse.
# receive message on behalf of user hertz
curl -N --location 'http://localhost:8888/chat/sse?username=hertz'
  3. Open a new terminal and send messages to hertz.
# send a broadcast message
curl --location --request POST 'http://localhost:8888/chat/broadcast?from=kitex&message=cloudwego'
# send a direct message
curl --location --request POST 'http://localhost:8888/chat/direct?from=kitex&message=hello%20hertz&to=hertz'

On the first terminal, you should see 2 messages.

curl -N --location 'http://localhost:8888/chat/sse?username=hertz'
#event:broadcast
#data:{"Type":"broadcast","From":"kitex","To":"","Message":"cloudwego","Timestamp":"2023-04-10T23:48:55.019742+08:00"}
#
#event:direct
#data:{"Type":"direct","From":"kitex","To":"hertz","Message":"hello hertz","Timestamp":"2023-04-10T23:48:56.212855+08:00"}
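
Each `data` payload in the output above is a JSON object. A minimal sketch of decoding one on the receiving side, assuming the keys map directly onto struct fields (the `chatMessage` type, its field types, and `decodeChatMessage` are assumptions of this sketch, not taken from the example's source):

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// chatMessage mirrors the JSON keys visible in the output above.
// The type name and field types are assumptions of this sketch.
type chatMessage struct {
	Type      string
	From      string
	To        string
	Message   string
	Timestamp time.Time
}

// decodeChatMessage parses the data payload of one chat event.
func decodeChatMessage(data []byte) (chatMessage, error) {
	var m chatMessage
	err := json.Unmarshal(data, &m)
	return m, err
}

func main() {
	payload := []byte(`{"Type":"direct","From":"kitex","To":"hertz","Message":"hello hertz","Timestamp":"2023-04-10T23:48:56.212855+08:00"}`)
	m, err := decodeChatMessage(payload)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("%s -> %s: %s\n", m.From, m.To, m.Message)
}
```

With the client from this package, the same function could be called on the `Data` field of each received event.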

Benchmark Results

Benchmarks are stored for each commit and can be viewed here:

https://hertz-contrib.github.io/sse/benchmarks/

Documentation

Constants

const (
	ContentType = "text/event-stream"

	LastEventID = "Last-Event-ID"
)

Functions

func Encode

func Encode(w io.Writer, e *Event) (err error)

func GetLastEventID

func GetLastEventID(c *app.RequestContext) string

GetLastEventID retrieves the Last-Event-ID header if present.
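
A server typically uses this ID to decide which events a reconnecting client has missed. A minimal sketch of that replay decision, assuming an in-memory backlog ordered from oldest to newest (`storedEvent` and `eventsAfter` are hypothetical and not part of this package):

```go
package main

import "fmt"

// storedEvent is a hypothetical record kept by the server for replay.
type storedEvent struct {
	ID   string
	Data string
}

// eventsAfter returns the events that follow lastEventID in backlog.
// An empty or unknown ID yields the whole backlog.
func eventsAfter(backlog []storedEvent, lastEventID string) []storedEvent {
	if lastEventID == "" {
		return backlog
	}
	for i, ev := range backlog {
		if ev.ID == lastEventID {
			return backlog[i+1:]
		}
	}
	return backlog
}

func main() {
	backlog := []storedEvent{{"1", "a"}, {"2", "b"}, {"3", "c"}}
	// A client that last saw event "1" is sent events "2" and "3".
	for _, ev := range eventsAfter(backlog, "1") {
		fmt.Println(ev.ID, ev.Data)
	}
}
```

In a handler, the value returned by GetLastEventID would be passed as `lastEventID`, and each returned entry would be published before the live events. Replaying the whole backlog for an unknown ID is one possible policy; sending nothing is another.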

Types

type Client added in v0.0.2

type Client struct {
	// contains filtered or unexported fields
}

Client handles an incoming server stream.

func NewClient added in v0.0.2

func NewClient(url string) *Client

NewClient creates a new client for the given URL.

func (*Client) GetBody added in v0.0.3

func (c *Client) GetBody() []byte

GetBody returns the SSE client's request body.

func (*Client) GetHeaders added in v0.0.2

func (c *Client) GetHeaders() map[string]string

GetHeaders returns the SSE client's request headers.

func (*Client) GetHertzClient added in v0.0.2

func (c *Client) GetHertzClient() *client.Client

GetHertzClient returns the underlying Hertz client used by the SSE client.

func (*Client) GetLastEventID added in v0.0.2

func (c *Client) GetLastEventID() []byte

GetLastEventID returns the SSE client's last event ID.

func (*Client) GetMethod added in v0.0.2

func (c *Client) GetMethod() string

GetMethod returns the SSE client's request method.

func (*Client) GetURL added in v0.0.2

func (c *Client) GetURL() string

GetURL returns the SSE client's URL.

func (*Client) SetBody added in v0.0.3

func (c *Client) SetBody(body []byte)

SetBody sets the SSE client's request body.

func (*Client) SetDisconnectCallback added in v0.0.2

func (c *Client) SetDisconnectCallback(fn ConnCallback)

SetDisconnectCallback specifies the function to run when the connection is closed.

func (*Client) SetEncodingBase64 added in v0.0.2

func (c *Client) SetEncodingBase64(encodingBase64 bool)

SetEncodingBase64 sets whether the SSE client uses base64 encoding.

func (*Client) SetHeaders added in v0.0.2

func (c *Client) SetHeaders(headers map[string]string)

SetHeaders sets the SSE client's request headers.

func (*Client) SetHertzClient added in v0.0.2

func (c *Client) SetHertzClient(hertzClient *client.Client)

SetHertzClient sets the underlying Hertz client used by the SSE client.

func (*Client) SetMaxBufferSize added in v0.0.2

func (c *Client) SetMaxBufferSize(size int)

SetMaxBufferSize sets the SSE client's maximum buffer size.

func (*Client) SetMethod added in v0.0.2

func (c *Client) SetMethod(method string)

SetMethod sets the SSE client's request method.

func (*Client) SetOnConnectCallback added in v0.0.2

func (c *Client) SetOnConnectCallback(fn ConnCallback)

SetOnConnectCallback specifies the function to run when the connection is established.

func (*Client) SetResponseCallback added in v0.0.2

func (c *Client) SetResponseCallback(responseCallback ResponseCallback)

SetResponseCallback sets the SSE client's response callback.

func (*Client) SetURL added in v0.0.2

func (c *Client) SetURL(url string)

SetURL sets the SSE client's URL.

func (*Client) Subscribe added in v0.0.2

func (c *Client) Subscribe(handler func(msg *Event)) error

Subscribe subscribes to a data stream.

func (*Client) SubscribeWithContext added in v0.0.2

func (c *Client) SubscribeWithContext(ctx context.Context, handler func(msg *Event)) error

SubscribeWithContext subscribes to a data stream with the given context.

type ConnCallback added in v0.0.2

type ConnCallback func(ctx context.Context, client *Client)

ConnCallback defines a function to be called on a particular connection event.

type Event

type Event struct {
	Event string
	ID    string
	Retry uint64
	Data  []byte
}

type EventStreamReader added in v0.0.2

type EventStreamReader struct {
	// contains filtered or unexported fields
}

EventStreamReader scans an io.Reader looking for EventStream messages.

func NewEventStreamReader added in v0.0.2

func NewEventStreamReader(eventStream io.Reader, maxBufferSize int) *EventStreamReader

NewEventStreamReader creates an instance of EventStreamReader.

func (*EventStreamReader) ReadEvent added in v0.0.2

func (e *EventStreamReader) ReadEvent(ctx context.Context) ([]byte, error)

ReadEvent scans the EventStream for events.

type ResponseCallback added in v0.0.2

type ResponseCallback func(ctx context.Context, req *protocol.Request, resp *protocol.Response) error

ResponseCallback validates a response.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

func NewStream

func NewStream(c *app.RequestContext) *Stream

NewStream creates a new stream for publishing Event.

func (*Stream) Publish

func (c *Stream) Publish(event *Event) error

Publish pushes an event to the client. If it returns an error, stop publishing.

Directories

Path Synopsis
examples
client/quickstart
server/quickstart
