sse

v2.11.1
Published: Jul 28, 2023 License: MPL-2.0 Imports: 15 Imported by: 0

README

SSE - Server Sent Events Client/Server Library for Go

Synopsis

This is a server and client implementation of Server-Sent Events for Golang.

The package is a fork of https://pkg.go.dev/github.com/r3labs/sse/v2. I use it in a couple of projects and intend to maintain and update it with the fixes that I need. Backward compatibility will be kept for v2. Contributions are welcome.

Quick start

To install:

go get github.com/emmrk/sse/v2

To test:

$ make deps
$ make test

Example Server

The server has two parts: a message scheduler and an HTTP handler function. The message scheduler starts when you create the server:

func main() {
	server := sse.New()
	defer server.Close()
}

To add a stream to this handler:

func main() {
	server := sse.New()
	server.CreateStream("messages")
}

This creates a new stream inside the scheduler. Because the stream has no consumers yet, publishing a message to it does nothing. Once the HTTP handler is running, clients can connect to the stream by passing its name as the stream URL query parameter:

http://server/events?stream=messages
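
As a rough illustration, a subscription URL like the one above can be assembled with the standard net/url package. The helper name streamURL is hypothetical and not part of this package; it only sets the stream query parameter on a base URL.

```go
package main

import (
	"fmt"
	"net/url"
)

// streamURL is a hypothetical helper (not part of the sse package).
// It returns base with the stream name set as the "stream" query parameter.
func streamURL(base, stream string) (string, error) {
	u, err := url.Parse(base)
	if err != nil {
		return "", err
	}
	q := u.Query()
	q.Set("stream", stream)
	u.RawQuery = q.Encode()
	return u.String(), nil
}

func main() {
	s, err := streamURL("http://server/events", "messages")
	if err != nil {
		panic(err)
	}
	fmt.Println(s) // http://server/events?stream=messages
}
```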

To start the HTTP server:

func main() {
	server := sse.New()

	// Create a new Mux and set the handler
	mux := http.NewServeMux()
	mux.HandleFunc("/events", server.ServeHTTP)

	log.Fatal(http.ListenAndServe(":8080", mux))
}

To publish messages to a stream:

func main() {
	server := sse.New()

	// Publish a payload to the stream
	server.Publish("messages", &sse.Event{
		Data: []byte("ping"),
	})
}

Note that a stream with the given name must exist and must have subscribers for the message to be delivered.

A way to detect disconnected clients:

func main() {
	server := sse.New()

	mux := http.NewServeMux()
	mux.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) {
		go func() {
			// Received Browser Disconnection
			<-r.Context().Done()
			println("The client is disconnected here")
			return
		}()

		server.ServeHTTP(w, r)
	})

	log.Fatal(http.ListenAndServe(":8080", mux))
}

Example Client

The client connects to an SSE server and can handle multiple events under the same URL.

To create a new client:

func main() {
	client := sse.NewClient("http://server/events")
}

To subscribe to an event stream, please use the Subscribe function. This accepts the name of the stream and a handler function:

func main() {
	client := sse.NewClient("http://server/events")

	client.Subscribe("messages", func(msg *sse.Event) {
		// Got some data!
		fmt.Println(string(msg.Data))
	})
}

Note that Subscribe blocks the calling goroutine. To avoid blocking, run it in a separate goroutine.

If you wish to have events sent to a channel, you can use SubscribeChan:

func main() {
	events := make(chan *sse.Event)

	client := sse.NewClient("http://server/events")
	client.SubscribeChan("messages", events)
}

HTTP client parameters

To customize the HTTP client, for example to disable TLS verification for self-signed certificates, override the client's Connection field or update its options:

func main() {
	client := sse.NewClient("http://server/events")
	client.Connection.Transport = &http.Transport{
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	}
}

URL query parameters

To set custom query parameters on the client or disable the stream parameter altogether:

func main() {
	client := sse.NewClient("http://server/events?search=example")

	client.SubscribeRaw(func(msg *sse.Event) {
		// Got some data!
		fmt.Println(string(msg.Data))
	})
}

Contributing

Contributions are welcome; please be constructive.

Versioning

For transparency into our release cycle and in striving to maintain backward compatibility, this project is maintained under the Semantic Versioning guidelines.

Code and documentation copyright since 2015 r3labs.io authors.

Code released under the Mozilla Public License Version 2.0.

Documentation

Constants

const DefaultBufferSize = 1024

DefaultBufferSize is the size of the queue that holds a stream's messages.

Variables

This section is empty.

Functions

func ClientMaxBufferSize

func ClientMaxBufferSize(s int) func(c *Client)
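
ClientMaxBufferSize returns a func(c *Client), which matches the opts parameter of NewClient (the functional options pattern). The sketch below shows how that pattern works in general, using a hypothetical stand-in type. The client type, its maxBufferSize field, and the default value are assumptions for illustration, not this package's internals.

```go
package main

import "fmt"

// client is a hypothetical stand-in for a configurable client type.
type client struct {
	maxBufferSize int
}

// option mutates a client during construction.
type option func(c *client)

// withMaxBufferSize returns an option that sets the buffer limit.
func withMaxBufferSize(n int) option {
	return func(c *client) {
		c.maxBufferSize = n
	}
}

// newClient applies each option, in order, on top of the defaults.
func newClient(opts ...option) *client {
	c := &client{maxBufferSize: 4096} // arbitrary placeholder default
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	fmt.Println(newClient().maxBufferSize)
	fmt.Println(newClient(withMaxBufferSize(1 << 20)).maxBufferSize)
}
```

With the real package, the equivalent call would presumably look like sse.NewClient(url, sse.ClientMaxBufferSize(n)), based on the signatures listed here.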

Types

type Client

type Client struct {
	Retry             time.Time
	ReconnectStrategy backoff.BackOff

	Headers           map[string]string
	ReconnectNotify   backoff.Notify
	ResponseValidator ResponseValidator
	Connection        *http.Client
	URL               string
	LastEventID       atomic.Value // []byte

	EncodingBase64 bool
	Connected      bool
	// contains filtered or unexported fields
}

Client handles an incoming server stream

func NewClient

func NewClient(url string, opts ...func(c *Client)) *Client

NewClient creates a new client

func (*Client) OnConnect

func (c *Client) OnConnect(fn ConnCallback)

OnConnect specifies the function to run when the connection is successful

func (*Client) OnDisconnect

func (c *Client) OnDisconnect(fn ConnCallback)

OnDisconnect specifies the function to run when the connection disconnects

func (*Client) Subscribe

func (c *Client) Subscribe(stream string, handler func(msg *Event)) error

Subscribe to a data stream

func (*Client) SubscribeChan

func (c *Client) SubscribeChan(stream string, ch chan *Event) error

SubscribeChan sends all events from the given stream to the provided channel

func (*Client) SubscribeChanRaw

func (c *Client) SubscribeChanRaw(ch chan *Event) error

SubscribeChanRaw sends all events to the provided channel

func (*Client) SubscribeChanRawWithContext

func (c *Client) SubscribeChanRawWithContext(ctx context.Context, ch chan *Event) error

SubscribeChanRawWithContext sends all events to the provided channel with context

func (*Client) SubscribeChanWithContext

func (c *Client) SubscribeChanWithContext(ctx context.Context, stream string, ch chan *Event) error

SubscribeChanWithContext sends all events to the provided channel with context

func (*Client) SubscribeRaw

func (c *Client) SubscribeRaw(handler func(msg *Event)) error

SubscribeRaw to an sse endpoint

func (*Client) SubscribeRawWithContext

func (c *Client) SubscribeRawWithContext(ctx context.Context, handler func(msg *Event)) error

SubscribeRawWithContext to an sse endpoint with context

func (*Client) SubscribeWithContext

func (c *Client) SubscribeWithContext(ctx context.Context, stream string, handler func(msg *Event)) error

SubscribeWithContext to a data stream with context

func (*Client) Unsubscribe

func (c *Client) Unsubscribe(ch chan *Event)

Unsubscribe unsubscribes a channel

type ConnCallback

type ConnCallback func(c *Client)

ConnCallback defines a function to be called on a particular connection event

type Event

type Event struct {
	ID      []byte
	Data    []byte
	Event   []byte
	Retry   []byte
	Comment []byte
	// contains filtered or unexported fields
}

Event holds all of the event source fields
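
The fields of Event correspond to the field names of the Server-Sent Events text format (id, event, data, retry, and comment lines that start with a colon). The following is a minimal sketch of how such an event could be written to the wire, assuming the standard format in which each field is one line and a blank line ends the event. The event type and encode function are hypothetical and are not this package's encoder.

```go
package main

import (
	"bytes"
	"fmt"
)

// event is a hypothetical mirror of the exported Event fields.
type event struct {
	ID, Data, Event, Retry, Comment []byte
}

// encode writes the non-empty fields as "name: value" lines and
// terminates the event with a blank line. Multi-line data is split
// into one "data:" line per line of input.
func encode(e event) []byte {
	var b bytes.Buffer
	if len(e.Comment) > 0 {
		fmt.Fprintf(&b, ": %s\n", e.Comment)
	}
	if len(e.ID) > 0 {
		fmt.Fprintf(&b, "id: %s\n", e.ID)
	}
	if len(e.Event) > 0 {
		fmt.Fprintf(&b, "event: %s\n", e.Event)
	}
	if len(e.Retry) > 0 {
		fmt.Fprintf(&b, "retry: %s\n", e.Retry)
	}
	if len(e.Data) > 0 {
		for _, line := range bytes.Split(e.Data, []byte("\n")) {
			fmt.Fprintf(&b, "data: %s\n", line)
		}
	}
	b.WriteString("\n")
	return b.Bytes()
}

func main() {
	fmt.Printf("%q\n", encode(event{ID: []byte("1"), Data: []byte("a\nb")}))
}
```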

type EventLog

type EventLog struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

EventLog holds unexpired previous events

func NewEventLog

func NewEventLog(eventTTL time.Duration, maxCapacity int) *EventLog

NewEventLog creates a new Event Log.

EventTTL determines how long an event is considered valid. Valid events are replayed for newly joined clients. If EventTTL != 0, expired events are periodically removed from the log to save space. To preserve backward compatibility, with EventTTL == 0 all events ever published on a given stream are retained forever and replayed; beware of the ballooning memory use that results.

MaxCapacity is a soft limit on the number of entries in the Event Log. Filling the Event Log up to MaxCapacity will trigger an unscheduled removal of expired entries; if that is not enough, the oldest entry in the Event Log will be deleted to free up the space. MaxCapacity == 0 means unlimited capacity.
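
The interaction between EventTTL and MaxCapacity described above can be sketched as follows. This is a simplified model under stated assumptions, not the package's EventLog: the eventLog type, its methods, and the demo scenario are hypothetical, and the periodic background cleanup is left out.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

type entry struct {
	data    string
	addedAt time.Time
}

// eventLog is a hypothetical, non-concurrent model of a TTL plus capacity log.
type eventLog struct {
	ttl         time.Duration // 0 means entries never expire
	maxCapacity int           // 0 means unlimited
	entries     []entry
}

// removeExpired drops entries whose age has reached ttl.
func (l *eventLog) removeExpired(now time.Time) {
	if l.ttl == 0 {
		return
	}
	kept := l.entries[:0]
	for _, e := range l.entries {
		if now.Sub(e.addedAt) < l.ttl {
			kept = append(kept, e)
		}
	}
	l.entries = kept
}

// add first removes expired entries when the log is full and, if that
// is not enough, drops the oldest entry to make room.
func (l *eventLog) add(data string, now time.Time) {
	if l.maxCapacity > 0 && len(l.entries) >= l.maxCapacity {
		l.removeExpired(now)
		if len(l.entries) >= l.maxCapacity {
			l.entries = l.entries[1:]
		}
	}
	l.entries = append(l.entries, entry{data: data, addedAt: now})
}

func (l *eventLog) contents() string {
	parts := make([]string, len(l.entries))
	for i, e := range l.entries {
		parts[i] = e.data
	}
	return strings.Join(parts, ",")
}

// demo adds three fresh entries to a log of capacity 2, then a fourth
// entry after the first three have expired.
func demo() (afterC, afterD string) {
	l := &eventLog{ttl: 10 * time.Second, maxCapacity: 2}
	t0 := time.Unix(0, 0)
	l.add("a", t0)
	l.add("b", t0.Add(1*time.Second))
	l.add("c", t0.Add(2*time.Second))
	afterC = l.contents()
	l.add("d", t0.Add(20*time.Second))
	afterD = l.contents()
	return afterC, afterD
}

func main() {
	afterC, afterD := demo()
	fmt.Println(afterC) // b,c (oldest entry dropped, nothing expired yet)
	fmt.Println(afterD) // d (expired entries removed first)
}
```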

func (*EventLog) Add

func (e *EventLog) Add(ev *Event)

Add event to EventLog

func (*EventLog) CleanUp

func (e *EventLog) CleanUp()

CleanUp removes expired events immediately

func (*EventLog) Clear

func (e *EventLog) Clear()

Clear removes all events from the Event Log

func (*EventLog) Replay

func (e *EventLog) Replay(s *Subscriber)

Replay plays unexpired previous events to a subscriber

type EventStreamReader

type EventStreamReader struct {
	// contains filtered or unexported fields
}

EventStreamReader scans an io.Reader looking for EventStream messages.

func NewEventStreamReader

func NewEventStreamReader(eventStream io.Reader, maxBufferSize int) *EventStreamReader

NewEventStreamReader creates an instance of EventStreamReader.

func (*EventStreamReader) ReadEvent

func (e *EventStreamReader) ReadEvent() ([]byte, error)

ReadEvent scans the EventStream for events.
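
To give an idea of what scanning a stream for events involves, here is a minimal sketch built on bufio.Scanner with a custom split function. It is an assumption-laden illustration rather than this package's reader: the names splitEvents, readEvents, and parse are hypothetical, and only "\n\n" is treated as the event delimiter.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"strings"
)

// splitEvents is a bufio.SplitFunc that returns one token per event,
// where events are separated by a blank line ("\n\n"). Trailing data
// without a final blank line is returned as a last token at EOF.
func splitEvents(data []byte, atEOF bool) (advance int, token []byte, err error) {
	if atEOF && len(data) == 0 {
		return 0, nil, nil
	}
	if i := bytes.Index(data, []byte("\n\n")); i >= 0 {
		return i + 2, data[:i], nil
	}
	if atEOF {
		return len(data), data, nil
	}
	return 0, nil, nil // no full event yet; ask for more data
}

// readEvents collects every event from r. A single event larger than
// maxBufferSize makes the scanner stop with bufio.ErrTooLong.
func readEvents(r io.Reader, maxBufferSize int) ([]string, error) {
	sc := bufio.NewScanner(r)
	sc.Buffer(make([]byte, 0, 64), maxBufferSize)
	sc.Split(splitEvents)
	var events []string
	for sc.Scan() {
		events = append(events, sc.Text())
	}
	return events, sc.Err()
}

// parse is a convenience wrapper for string input.
func parse(s string) []string {
	events, err := readEvents(strings.NewReader(s), 1024)
	if err != nil {
		panic(err)
	}
	return events
}

func main() {
	for _, ev := range parse("data: a\n\nevent: ping\ndata: b\n\n") {
		fmt.Printf("%q\n", ev)
	}
}
```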

type ResponseValidator

type ResponseValidator func(c *Client, resp *http.Response) error

ResponseValidator validates a response
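
A validator of this shape receives the HTTP response and returns a non-nil error to reject it. The sketch below shows one possible check, assuming you want to accept only 200 responses with a text/event-stream content type. The client stand-in, the responseValidator type, and the helpers validate and newResponse are hypothetical; whether the package performs similar checks by default is not stated here.

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
)

// client is a hypothetical stand-in for the package's Client type.
type client struct{}

// responseValidator mirrors the shape of the documented type.
type responseValidator func(c *client, resp *http.Response) error

// validate rejects responses that are not 200 or not an event stream.
func validate(c *client, resp *http.Response) error {
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status code %d", resp.StatusCode)
	}
	if ct := resp.Header.Get("Content-Type"); !strings.HasPrefix(ct, "text/event-stream") {
		return fmt.Errorf("unexpected content type %q", ct)
	}
	return nil
}

// Compile-time check that validate has the validator shape.
var _ responseValidator = validate

// newResponse builds a bare response for demonstration purposes.
func newResponse(status int, contentType string) *http.Response {
	h := http.Header{}
	h.Set("Content-Type", contentType)
	return &http.Response{StatusCode: status, Header: h}
}

func main() {
	fmt.Println(validate(&client{}, newResponse(200, "text/event-stream")))
	fmt.Println(validate(&client{}, newResponse(500, "text/event-stream")))
	fmt.Println(validate(&client{}, newResponse(200, "text/html")))
}
```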

type Server

type Server struct {
	// Extra headers added to the HTTP response for each client
	Headers map[string]string
	// Sets a ttl that prevents old events from being transmitted
	EventTTL time.Duration
	// Max messages stored for replaying per stream, measured in items
	MaxCapacity int
	// Specifies the size of the message buffer for each stream
	BufferSize int
	// Encodes all data as base64
	EncodeBase64 bool
	// Splits an event's data into multiple data: entries
	SplitData bool
	// Enables creation of a stream when a client connects
	AutoStream bool
	// Enables automatic replay for each new subscriber that connects
	AutoReplay bool

	// Specifies the functions to run when a client subscribes or unsubscribes
	OnSubscribe   func(streamID string, sub *Subscriber)
	OnUnsubscribe func(streamID string, sub *Subscriber)
	// contains filtered or unexported fields
}

Server is our main struct

func New

func New() *Server

New will create a server and set up defaults

func NewWithCallback

func NewWithCallback(onSubscribe, onUnsubscribe func(streamID string, sub *Subscriber)) *Server

NewWithCallback will create a server and set up defaults with callback functions

func (*Server) Close

func (s *Server) Close()

Close shuts down the server, closes all of the streams and connections

func (*Server) CreateStream

func (s *Server) CreateStream(id string) *Stream

CreateStream will create a new stream and register it

func (*Server) Publish

func (s *Server) Publish(id string, event *Event)

Publish sends a message to every client subscribed to the stream with the given ID. If the stream's buffer is full, it blocks until the message has been sent out to all subscribers (though it has not necessarily arrived at the clients) or until the stream is closed.

func (*Server) RemoveStream

func (s *Server) RemoveStream(id string)

RemoveStream will remove a stream

func (*Server) ServeHTTP

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP serves new connections with events for a given stream ...

func (*Server) StreamExists

func (s *Server) StreamExists(id string) bool

StreamExists checks whether a stream by a given id exists

func (*Server) TryPublish

func (s *Server) TryPublish(id string, event *Event) bool

TryPublish is the same as Publish except that when the operation would cause the call to be blocked, it simply drops the message and returns false. Together with a small BufferSize, it can be useful when publishing the latest message ASAP is more important than reliable delivery.
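
The difference between a blocking and a dropping publish can be illustrated with a buffered channel and a select with a default case. This is a generic Go sketch of the non-blocking send idiom, not the package's implementation; the trySend function and the string channel are hypothetical.

```go
package main

import "fmt"

// trySend attempts to enqueue msg without blocking. It reports false
// and drops the message when the channel's buffer is full.
func trySend(ch chan string, msg string) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan string, 1) // a small buffer, as with a small BufferSize

	fmt.Println(trySend(ch, "first"))  // true: buffer had room
	fmt.Println(trySend(ch, "second")) // false: buffer full, message dropped

	<-ch                              // a consumer drains one message
	fmt.Println(trySend(ch, "third")) // true: room again
}
```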

type Stream

type Stream struct {
	ID string

	Eventlog *EventLog

	// Enables replaying of eventlog to newly added subscribers
	AutoReplay bool

	// Specifies the functions to run when a client subscribes or unsubscribes
	OnSubscribe   func(streamID string, sub *Subscriber)
	OnUnsubscribe func(streamID string, sub *Subscriber)
	// contains filtered or unexported fields
}

Stream ...

type Subscriber

type Subscriber struct {
	URL *url.URL
	// contains filtered or unexported fields
}

Subscriber ...
