sseserver: bitbucket.org/advbet/sseserver

package sseserver

import "bitbucket.org/advbet/sseserver"

Package sseserver is a library for creating SSE server HTTP handlers.

This library provides a publish/subscribe interface for generating SSE streams. It handles keep-alive messages, allows setting the client reconnect timeout, and automatically disconnects long-lived connections. Message data is always marshaled to JSON.

This library is targeted at advanced SSE server implementations that support client state resyncing after a disconnect. Different client resync strategies are provided by multiple Stream interface implementations.

Typical usage of this package is:

	* Create a new stream object that satisfies the Stream interface with
	  one of the New... constructors.
	* Start a goroutine that generates events and publishes them via the
	  Publish() method.
	* Create HTTP handlers that parse the Last-Event-ID header; everything
	  else is handled by the Subscribe() method.
	* If graceful web server shutdown is required, use DropSubscribers() to
	  gracefully disconnect all active streams.
	* If dynamic stream creation is required, use Stop() to free the
	  resources allocated for the SSE stream.

Code:

stream := sseserver.NewCached("", sseserver.DefaultConfig, 5*time.Minute, time.Minute)
go eventSource(stream)

http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
    id := r.Header.Get("Last-Event-ID")
    if err := stream.Subscribe(w, id); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
    }
})

fmt.Println(http.ListenAndServe(":8000", nil))

// Test with:
//   curl http://localhost:8000/
//   curl -H "Last-Event-ID: 5" http://localhost:8000/

Code:

stream := sseserver.NewCachedCount("", sseserver.DefaultConfig, 5)
go eventSource(stream)

http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
    id := r.Header.Get("Last-Event-ID")
    if err := stream.Subscribe(w, id); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
    }
})

fmt.Println(http.ListenAndServe(":8000", nil))

// Test with:
//   curl http://localhost:8000/
//   curl -H "Last-Event-ID: 5" http://localhost:8000/

Code:

package main

import (
    "fmt"
    "net/http"
    "strconv"
    "time"

    "bitbucket.org/advbet/sseserver"
)

func newEvent(topic string, id string) *sseserver.Event {
    return &sseserver.Event{
        ID:    id,
        Event: "counter",
        Data: map[string]interface{}{
            "msg":   "ticks since start",
            "topic": topic,
            "val":   id,
        },
    }
}

func lookupEvents(topic string, fromStr string, toStr string) ([]sseserver.Event, error) {
    if fromStr == "" {
        // New client
        // no resync, continue sending live events
        return nil, nil
    }

    from, err := strconv.Atoi(fromStr)
    if err != nil {
        return nil, err
    }
    to, err := strconv.Atoi(toStr)
    if err != nil {
        return nil, err
    }

    if from >= to {
        // Client is up to date
        // no resync, continue sending live events
        return nil, nil
    }

    events := []sseserver.Event{}
    switch {
    case to-from > 10:
        // do not resync more than 10 events at a time
        for i := from + 1; i <= from+10; i++ {
            events = append(events, *newEvent(topic, strconv.Itoa(i)))
        }
        // send first 10 missing events
        return events, nil
    default:
        for i := from + 1; i <= to; i++ {
            events = append(events, *newEvent(topic, strconv.Itoa(i)))
        }
        // send missing events, continue sending live events
        return events, nil
    }
}

func eventGenerator(stream sseserver.Stream) {
    i := 0
    c := time.Tick(time.Second)

    for range c {
        i++
        stream.Publish(newEvent("", strconv.Itoa(i)))
    }
}

func main() {
    stream := sseserver.NewGeneric(lookupEvents, "0", sseserver.DefaultConfig)
    go eventGenerator(stream)

    requestHandler := func(w http.ResponseWriter, r *http.Request) {
        lastID := r.Header.Get("Last-Event-ID")
        // An empty header means a new client, so only validate non-empty IDs.
        if lastID != "" {
            if _, err := strconv.Atoi(lastID); err != nil {
                http.Error(w, "invalid Last-Event-ID", http.StatusBadRequest)
                return
            }
        }
        if err := stream.Subscribe(w, lastID); err != nil {
            fmt.Println(err)
        }
    }

    http.HandleFunc("/", requestHandler)
    fmt.Println(http.ListenAndServe(":8000", nil))

    // Test with:
    //   curl http://localhost:8000/
    //   curl -H "Last-Event-ID: 5" http://localhost:8000/
}

Package Files

broker.go cached.go cachedcount.go doc.go generic.go lastonly.go stream.go writer.go

Variables

var DefaultConfig = Config{
    Reconnect:             500 * time.Millisecond,
    KeepAlive:             30 * time.Second,
    Lifetime:              5 * time.Minute,
    QueueLength:           32,
    ResyncEventsThreshold: 1000,
}

DefaultConfig is a recommended SSE configuration.

var ErrCacheMiss = errors.New("missing events in cache")

ErrCacheMiss is returned from cachedStream.Subscribe if resyncing the client is not possible because events are not found in the cache. This situation will usually occur if the client was disconnected for too long and the oldest events were evicted from the cache.

This error is returned before writing anything to the response writer. It is the responsibility of the caller of cachedStream.Subscribe to generate a response if this error is returned.

func Respond

func Respond(w http.ResponseWriter, source <-chan *Event, cfg *Config, stop <-chan struct{}) error

Respond reads Events from a channel and writes an SSE HTTP response. This function provides a lower-level API that allows manually generating an SSE stream. In most cases this function should not be used directly.

Cfg is the SSE stream configuration; if nil is passed, the configuration from the DefaultConfig global will be used.

Stop is an optional channel for stopping the SSE stream; if this channel is closed, the SSE stream will be stopped and the HTTP connection closed. If stream stopping functionality is not required, Stop should be set to nil.

This function returns nil if the end of the stream is reached, the stream lifetime expires, the client closes the connection, or a request to stop is received on the stop channel. Otherwise it returns an error.

Note! After passing the source channel to Respond it cannot be reused (for example, passed to the Respond function again). This function will drain the source channel on exit.

type CachedCountStream

type CachedCountStream struct {
    // contains filtered or unexported fields
}

func NewCachedCount

func NewCachedCount(lastID string, cfg Config, size int) *CachedCountStream

NewCachedCount creates a new SSE stream. All published events are cached. The number of cached events can be set by passing the desired size argument. Clients are automatically resynced on reconnect.

Passing an empty string as the last event ID to Subscribe() connects the client without resync.

A call to Subscribe() might return ErrCacheMiss if the client requests a resync from an event not found in the cache. If ErrCacheMiss is returned, the user of this library is responsible for generating an HTTP response to the client. It is recommended to return a 204 No Content response to stop the client from reconnecting until it syncs event state manually.

func NewCachedCountMultiStream

func NewCachedCountMultiStream(lastIDs map[string]string, cfg Config, size int) *CachedCountStream

NewCachedCountMultiStream is similar to NewCachedCount but allows setting initial last event ID values for multiple topics.

func (*CachedCountStream) DropSubscribers

func (s *CachedCountStream) DropSubscribers()

func (*CachedCountStream) Publish

func (s *CachedCountStream) Publish(event *Event)

func (*CachedCountStream) PublishBroadcast

func (s *CachedCountStream) PublishBroadcast(event *Event)

func (*CachedCountStream) PublishTopic

func (s *CachedCountStream) PublishTopic(topic string, event *Event)

func (*CachedCountStream) Stop

func (s *CachedCountStream) Stop()

func (*CachedCountStream) Subscribe

func (s *CachedCountStream) Subscribe(w http.ResponseWriter, lastClientID string) error

func (*CachedCountStream) SubscribeFiltered

func (s *CachedCountStream) SubscribeFiltered(w http.ResponseWriter, lastClientID string, f FilterFn) error

func (*CachedCountStream) SubscribeTopic

func (s *CachedCountStream) SubscribeTopic(w http.ResponseWriter, topic string, lastClientID string) error

func (*CachedCountStream) SubscribeTopicFiltered

func (s *CachedCountStream) SubscribeTopicFiltered(w http.ResponseWriter, topic string, lastClientID string, f FilterFn) error

type CachedStream

type CachedStream struct {
    // contains filtered or unexported fields
}

func NewCached

func NewCached(lastID string, cfg Config, expiration, cleanup time.Duration) *CachedStream

NewCached creates a new SSE stream. All published events are cached for up to expiration time in local cache and clients are automatically resynced on reconnect.

Passing an empty string as the last event ID to Subscribe() connects the client without resync.

A call to Subscribe() might return ErrCacheMiss if the client requests a resync from an event not found in the cache. If ErrCacheMiss is returned, the user of this library is responsible for generating an HTTP response to the client. It is recommended to return a 204 No Content response to stop the client from reconnecting until it syncs event state manually.

func NewCachedMultiStream

func NewCachedMultiStream(lastIDs map[string]string, cfg Config, expiration, cleanup time.Duration) *CachedStream

NewCachedMultiStream is similar to NewCached but allows setting initial last event ID values for multiple topics.

func (*CachedStream) DropSubscribers

func (s *CachedStream) DropSubscribers()

func (*CachedStream) Publish

func (s *CachedStream) Publish(event *Event)

func (*CachedStream) PublishBroadcast

func (s *CachedStream) PublishBroadcast(event *Event)

func (*CachedStream) PublishTopic

func (s *CachedStream) PublishTopic(topic string, event *Event)

func (*CachedStream) Stop

func (s *CachedStream) Stop()

func (*CachedStream) Subscribe

func (s *CachedStream) Subscribe(w http.ResponseWriter, lastClientID string) error

func (*CachedStream) SubscribeFiltered

func (s *CachedStream) SubscribeFiltered(w http.ResponseWriter, lastClientID string, f FilterFn) error

func (*CachedStream) SubscribeTopic

func (s *CachedStream) SubscribeTopic(w http.ResponseWriter, topic string, lastClientID string) error

func (*CachedStream) SubscribeTopicFiltered

func (s *CachedStream) SubscribeTopicFiltered(w http.ResponseWriter, topic string, lastClientID string, f FilterFn) error

type Config

type Config struct {
    // Reconnect is a time duration before successive reconnects, it is
    // passed as a recommendation for SSE clients. Setting Reconnect to zero
    // disables sending a reconnect hint and client will use its default
    // value. Recommended value is 500 milliseconds.
    Reconnect time.Duration

    // KeepAlive sets how often SSE stream should include a dummy keep alive
    // message. Setting KeepAlive to zero disables sending keep alive
    // messages. It is recommended to keep this value lower than 60 seconds
    // if nginx proxy is used. By default nginx will timeout the request if
    // there is more than 60 seconds gap between two successive reads.
    KeepAlive time.Duration

    // Lifetime is a maximum amount of time connection is allowed to stay
    // open before a forced reconnect. Setting Lifetime to zero allows SSE
    // connections to be open indefinitely.
    Lifetime time.Duration

    // QueueLength is the maximum number of events pending to be transmitted
    // to the client before connection is closed. Note queue length of 0
    // should be never used, recommended size is 32.
    QueueLength int

    // ResyncEventsThreshold is the threshold number of events that can be
    // returned to the client after resync with a last seen event id. After
    // the threshold is crossed no more attempts at a resync will be performed
    // and the client will be disconnected.
    ResyncEventsThreshold int
}

Config holds SSE stream configuration. A single Config instance can be safely used in multiple goroutines (HTTP request handlers) simultaneously without locking.

type Event

type Event struct {
    ID    string      // ID value
    Event string      // Event type value
    Data  interface{} // Data value will be marshaled to JSON
}

Event holds data for single event in SSE stream.
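
As an illustration of how such an event could appear on the wire, the sketch below frames an event using the standard SSE id, event and data fields, with Data marshaled to JSON. It is an assumption-laden example: the Event type is redeclared locally so the code is self-contained, formatEvent is a hypothetical helper, and the package's real writer may differ in details.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// Event mirrors the fields of the package's Event type.
type Event struct {
	ID    string
	Event string
	Data  interface{}
}

// formatEvent renders e in the SSE wire format. Empty ID and Event
// values are omitted; Data is always marshaled to JSON, which never
// contains raw newlines, so a single data line is enough.
func formatEvent(e *Event) (string, error) {
	payload, err := json.Marshal(e.Data)
	if err != nil {
		return "", err
	}

	var b strings.Builder
	if e.ID != "" {
		fmt.Fprintf(&b, "id: %s\n", e.ID)
	}
	if e.Event != "" {
		fmt.Fprintf(&b, "event: %s\n", e.Event)
	}
	// A blank line terminates the event.
	fmt.Fprintf(&b, "data: %s\n\n", payload)
	return b.String(), nil
}

func main() {
	out, err := formatEvent(&Event{
		ID:    "5",
		Event: "counter",
		Data:  map[string]interface{}{"val": "5"},
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Print(out)
}
```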

type FilterFn

type FilterFn func(e *Event) *Event

FilterFn is a callback function used to mutate the event stream for individual subscriptions. This function will be invoked for each event before sending it to the client; the result of this function will be sent instead of the original event. If this function returns `nil`, the event will be omitted.

The original event passed to this function should NOT be mutated. The filtering function will be called with the same event data in separate per-subscriber goroutines, so mutating the event is guaranteed to cause a data race. If an event needs to be altered, a fresh copy needs to be returned.
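
A filter that follows these rules might look like the sketch below. Event and FilterFn are redeclared locally so the example is self-contained, and the redact function and the "internal" event type are hypothetical. The filter drops some events by returning nil and returns a modified copy for the rest, leaving the shared original untouched.

```go
package main

import "fmt"

// Event and FilterFn mirror the package's types of the same names.
type Event struct {
	ID    string
	Event string
	Data  interface{}
}

type FilterFn func(e *Event) *Event

// redact omits events of the hypothetical "internal" type and hides the
// payload of all other events. It never modifies the event it receives.
func redact(e *Event) *Event {
	if e.Event == "internal" {
		return nil // omit this event for the subscriber
	}
	// Copy the struct and replace Data with a new value. If Data were a
	// map or slice, it would have to be copied before being edited.
	cp := *e
	cp.Data = "redacted"
	return &cp
}

func main() {
	var f FilterFn = redact

	orig := &Event{ID: "1", Event: "counter", Data: "secret"}
	out := f(orig)

	fmt.Println(orig.Data, out.Data)
	fmt.Println(f(&Event{Event: "internal"}) == nil)
}
```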

type GenericStream

type GenericStream struct {
    // contains filtered or unexported fields
}

GenericStream is the most generic SSE stream implementation where resync logic is supplied by the user of this package.

func NewGeneric

func NewGeneric(resync ResyncFn, lastID string, cfg Config) *GenericStream

NewGeneric creates a new instance of an SSE stream. Creating a new stream requires a resync function with the ResyncFn signature. It is used to generate a list of events that the client might have missed during a reconnect. The lastID argument sets the last event ID that was published before the application was started; this value is passed to the resync function and later replaced by the IDs of events published with the stream.Publish method.

func (*GenericStream) DropSubscribers

func (s *GenericStream) DropSubscribers()

func (*GenericStream) Publish

func (s *GenericStream) Publish(event *Event)

func (*GenericStream) PublishBroadcast

func (s *GenericStream) PublishBroadcast(event *Event)

func (*GenericStream) PublishTopic

func (s *GenericStream) PublishTopic(topic string, event *Event)

func (*GenericStream) Stop

func (s *GenericStream) Stop()

func (*GenericStream) Subscribe

func (s *GenericStream) Subscribe(w http.ResponseWriter, lastEventID string) error

func (*GenericStream) SubscribeFiltered

func (s *GenericStream) SubscribeFiltered(w http.ResponseWriter, lastEventID string, f FilterFn) error

func (*GenericStream) SubscribeTopic

func (s *GenericStream) SubscribeTopic(w http.ResponseWriter, topic string, lastEventID string) error

func (*GenericStream) SubscribeTopicFiltered

func (s *GenericStream) SubscribeTopicFiltered(w http.ResponseWriter, topic string, lastEventID string, f FilterFn) error

type LastOnlyStream

type LastOnlyStream struct {
    sync.RWMutex
    // contains filtered or unexported fields
}

func NewLastOnly

func NewLastOnly(cfg Config) *LastOnlyStream

NewLastOnly creates a new SSE stream that resends only the last seen event to all newly connected clients. If the client has already seen the latest event, it is not repeated.

Event filtering is not supported.

func (*LastOnlyStream) DropSubscribers

func (s *LastOnlyStream) DropSubscribers()

func (*LastOnlyStream) Publish

func (s *LastOnlyStream) Publish(event *Event)

func (*LastOnlyStream) PublishBroadcast

func (s *LastOnlyStream) PublishBroadcast(event *Event)

PublishBroadcast for LastOnlyStream does not cache a broadcasted event and thus does not permit sending an event with ID value.

func (*LastOnlyStream) PublishTopic

func (s *LastOnlyStream) PublishTopic(topic string, event *Event)

func (*LastOnlyStream) Stop

func (s *LastOnlyStream) Stop()

func (*LastOnlyStream) Subscribe

func (s *LastOnlyStream) Subscribe(w http.ResponseWriter, lastEventID string) error

func (*LastOnlyStream) SubscribeFiltered

func (s *LastOnlyStream) SubscribeFiltered(w http.ResponseWriter, lastEventID string, f FilterFn) error

func (*LastOnlyStream) SubscribeTopic

func (s *LastOnlyStream) SubscribeTopic(w http.ResponseWriter, topic string, lastEventID string) error

func (*LastOnlyStream) SubscribeTopicFiltered

func (s *LastOnlyStream) SubscribeTopicFiltered(w http.ResponseWriter, topic string, lastEventID string, f FilterFn) error

type MultiStream

type MultiStream interface {
    // PublishTopic broadcasts the given event to all currently connected
    // clients (subscribers) on a given topic.
    //
    // Publishing on a stopped stream will cause a panic.
    PublishTopic(topic string, event *Event)

    // PublishBroadcast emits the given event to all connected subscribers
    // (for all topics).
    PublishBroadcast(event *Event)

    // DropSubscribers removes all currently active stream subscribers and
    // closes all active HTTP responses. After a call to this method all new
    // subscribers will be closed immediately. Calling DropSubscribers more
    // than once will panic.
    //
    // This function is useful in implementing graceful application
    // shutdown. It should be called only when the web server is no longer
    // accepting new connections and all that is left is terminating
    // already connected ones.
    DropSubscribers()

    // Stop closes the event stream. It will disconnect all connected
    // subscribers and deallocate all resources used for the stream. After
    // the stream is stopped it cannot be started again and should not be
    // used anymore.
    //
    // Calls to Publish or Subscribe after the stream was stopped will cause
    // a panic.
    Stop()

    // SubscribeTopic handles an HTTP request to receive the SSE stream for
    // a given topic. The caller is responsible for extracting the last
    // event ID value from the request.
    //
    // Subscribing on a stopped stream will cause a panic.
    SubscribeTopic(w http.ResponseWriter, topic string, lastEventID string) error

    // SubscribeTopicFiltered is similar to SubscribeTopic, but each event
    // is passed to the given filtering function before being sent to the
    // client. Events returned by the filtering function are sent instead.
    SubscribeTopicFiltered(w http.ResponseWriter, topic string, lastEventID string, f FilterFn) error
}

MultiStream is an abstraction of multiple SSE streams. A single instance of the object can be used to transmit multiple independent SSE streams. Each stream is identified by a unique topic name. The application can broadcast events using the stream.PublishTopic method. HTTP handlers for SSE client endpoints should use stream.SubscribeTopic to tap into the event stream.
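
The difference between publishing to a topic and broadcasting can be illustrated with a tiny in-memory broker. This is a conceptual sketch only, with hypothetical names and plain string messages; it does not reflect how this package implements its broker. In particular, it silently drops messages when a subscriber queue is full, whereas the Config documentation says the real library closes the connection in that case.

```go
package main

import (
	"fmt"
	"sync"
)

// broker fans messages out to per-topic subscriber queues.
type broker struct {
	mu   sync.Mutex
	subs map[string][]chan string
}

func newBroker() *broker {
	return &broker{subs: make(map[string][]chan string)}
}

// subscribeTopic registers a subscriber queue of the given length.
func (b *broker) subscribeTopic(topic string, queueLen int) <-chan string {
	ch := make(chan string, queueLen)
	b.mu.Lock()
	b.subs[topic] = append(b.subs[topic], ch)
	b.mu.Unlock()
	return ch
}

// send enqueues msg without blocking; a full queue drops the message.
func send(ch chan string, msg string) {
	select {
	case ch <- msg:
	default:
	}
}

// publishTopic delivers msg to subscribers of one topic only.
func (b *broker) publishTopic(topic, msg string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, ch := range b.subs[topic] {
		send(ch, msg)
	}
}

// publishBroadcast delivers msg to subscribers of every topic.
func (b *broker) publishBroadcast(msg string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, chans := range b.subs {
		for _, ch := range chans {
			send(ch, msg)
		}
	}
}

func main() {
	b := newBroker()
	news := b.subscribeTopic("news", 4)
	sport := b.subscribeTopic("sport", 4)

	b.publishTopic("news", "only for news")
	b.publishBroadcast("for everyone")

	fmt.Println(len(news), len(sport))
}
```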

type ResyncFn

type ResyncFn func(topic string, fromID, toID string) (events []Event, err error)

ResyncFn is the definition of the function used to look up events missed by clients while reconnecting. Users of this package must provide an implementation of this function when creating new streams.

For multi-streams the topic argument will be set to the sub-stream name; for single streams it will be an empty string.

This function takes two event ID values as arguments and should return all events having IDs in the interval (fromID, toID]. ResyncFn will be called repeatedly until an empty events slice is returned (no more missing events) or ResyncEventsThreshold is reached. Note that the event with ID equal to fromID SHOULD NOT be included, but the event with toID SHOULD be included. Argument fromID can be an empty string if an empty string was passed to stream.Subscribe(); it usually means the client has connected to the SSE stream for the first time. Argument toID can also be an empty string if an empty string was passed as the initial last event ID when the stream was created and the client has connected to the SSE stream before any events were published using stream.Publish().

ResyncFn should return all events in the given range in a slice. If the second return value err is not nil, the subscription will disconnect with an error.

A correct implementation of this function is essential for proper client resync and vital to the whole SSE functionality.

type Stream

type Stream interface {
    // Publish broadcasts the given event to all currently connected clients
    // (subscribers) on a default topic.
    //
    // Publishing on a stopped stream will cause a panic.
    Publish(event *Event)

    // DropSubscribers removes all currently active stream subscribers and
    // closes all active HTTP responses. After a call to this method all new
    // subscribers will be closed immediately. Calling DropSubscribers more
    // than once will panic.
    //
    // This function is useful in implementing graceful application
    // shutdown. It should be called only when the web server is no longer
    // accepting new connections and all that is left is terminating
    // already connected ones.
    DropSubscribers()

    // Stop closes the event stream. It will disconnect all connected
    // subscribers and deallocate all resources used for the stream. After
    // the stream is stopped it cannot be started again and should not be
    // used anymore.
    //
    // Calls to Publish or Subscribe after the stream was stopped will cause
    // a panic.
    Stop()

    // Subscribe handles an HTTP request to receive the SSE stream for a
    // default topic. The caller is responsible for extracting the last
    // event ID value from the request.
    //
    // Subscribing on a stopped stream will cause a panic.
    Subscribe(w http.ResponseWriter, lastEventID string) error

    // SubscribeFiltered is similar to Subscribe, but each event is passed
    // to the given filtering function before being sent to the client.
    // Events returned by the filtering function are sent instead.
    SubscribeFiltered(w http.ResponseWriter, lastEventID string, f FilterFn) error
}

Stream is an abstraction of an SSE stream. A single instance of a stream should be created for each SSE stream available in the application. The application can broadcast events using the stream.Publish method. HTTP handlers for SSE client endpoints should use stream.Subscribe to tap into the event stream.

Package sseserver imports 9 packages. Updated 2020-02-12.