eventsource

package module
v1.7.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 10, 2022 License: Apache-2.0 Imports: 14 Imported by: 34

README

GoDoc CircleCI

Eventsource

Eventsource implements a Go implementation of client and server to allow streaming data one-way over a HTTP connection using the Server-Sent Events API http://dev.w3.org/html5/eventsource/

This is a fork of: https://github.com/donovanhide/eventsource

This version of the library supports Go 1.8 and higher. However, its unit tests can only be run in Go 1.13 or higher.

The package is a Go module, but can still be imported by projects that do not use modules.

Installation

go get github.com/launchdarkly/eventsource

Documentation

License

Eventsource is available under the Apache License, Version 2.0.

Documentation

Overview

Package eventsource implements a client and server to allow streaming data one-way over a HTTP connection using the Server-Sent Events API http://dev.w3.org/html5/eventsource/

The client and server respect the Last-Event-ID header. If the Repository interface is implemented on the server, events can be replayed in case of a network disconnection.

Index

Examples

Constants

View Source
const (
	// DefaultInitialRetry is the default value for StreamOptionalInitialRetry.
	DefaultInitialRetry = time.Second * 3
	// DefaultRetryResetInterval is the default value for StreamOptionRetryResetInterval.
	DefaultRetryResetInterval = time.Second * 60
)

Variables

View Source
var (
	// ErrReadTimeout is the error that will be emitted if a stream was closed due to not
	// receiving any data within the configured read timeout interval.
	ErrReadTimeout = errors.New("Read timeout on stream")
)

Functions

This section is empty.

Types

type Decoder

type Decoder struct {
	// contains filtered or unexported fields
}

A Decoder is capable of reading Events from a stream.

func NewDecoder

func NewDecoder(r io.Reader) *Decoder

NewDecoder returns a new Decoder instance that reads events with the given io.Reader.

func NewDecoderWithOptions

func NewDecoderWithOptions(r io.Reader, options ...DecoderOption) *Decoder

NewDecoderWithOptions returns a new Decoder instance that reads events with the given io.Reader, with optional configuration parameters.

func (*Decoder) Decode

func (dec *Decoder) Decode() (Event, error)

Decode reads the next Event from a stream (and will block until one comes in). Graceful disconnects (between events) are indicated by an io.EOF error. Any error occurring mid-event is considered non-graceful and will show up as some other error (most likely io.ErrUnexpectedEOF).

type DecoderOption

type DecoderOption interface {
	// contains filtered or unexported methods
}

DecoderOption is a common interface for optional configuration parameters that can be used in creating a Decoder.

func DecoderOptionLastEventID added in v1.7.0

func DecoderOptionLastEventID(lastEventID string) DecoderOption

DecoderOptionLastEventID returns an option that sets the last event ID property for a Decoder when the Decoder is created. This allows the last ID to be included in new events if they do not override it.

func DecoderOptionReadTimeout

func DecoderOptionReadTimeout(timeout time.Duration) DecoderOption

DecoderOptionReadTimeout returns an option that sets the read timeout interval for a Decoder when the Decoder is created. If the Decoder does not receive new data within this length of time, it will return an error. By default, there is no read timeout.

type Encoder

type Encoder struct {
	// contains filtered or unexported fields
}

An Encoder is capable of writing Events to a stream. Optionally Events can be gzip compressed in this process.

func NewEncoder

func NewEncoder(w io.Writer, compressed bool) *Encoder

NewEncoder returns an Encoder for a given io.Writer. When compressed is set to true, a gzip writer will be created.

func (*Encoder) Encode

func (enc *Encoder) Encode(ec eventOrComment) error

Encode writes an event or comment in the format specified by the server-sent events protocol.

type Event

type Event interface {
	// Id is an identifier that can be used to allow a client to replay
	// missed Events by returning the Last-Event-Id header.
	// Return empty string if not required.
	Id() string
	// The name of the event. Return empty string if not required.
	Event() string
	// The payload of the event.
	Data() string
}

Event is the interface for any event received by the client or sent by the server.

Example
package main

import (
	"fmt"
	"net"
	"net/http"
	"time"

	"github.com/launchdarkly/eventsource"
)

type TimeEvent time.Time

func (t TimeEvent) Id() string    { return fmt.Sprint(time.Time(t).UnixNano()) }
func (t TimeEvent) Event() string { return "Tick" }
func (t TimeEvent) Data() string  { return time.Time(t).String() }

const (
	TICK_COUNT = 5
)

func TimePublisher(srv *eventsource.Server) {
	start := time.Date(2013, time.January, 1, 0, 0, 0, 0, time.UTC)
	ticker := time.NewTicker(time.Second)
	for i := 0; i < TICK_COUNT; i++ {
		<-ticker.C
		srv.Publish([]string{"time"}, TimeEvent(start))
		start = start.Add(time.Second)
	}
}

func main() {
	srv := eventsource.NewServer()
	srv.Gzip = true
	defer srv.Close()
	l, err := net.Listen("tcp", ":8080")
	if err != nil {
		return
	}
	defer l.Close()
	http.HandleFunc("/time", srv.Handler("time"))
	go http.Serve(l, nil)
	go TimePublisher(srv)
	stream, err := eventsource.Subscribe("http://127.0.0.1:8080/time", "")
	if err != nil {
		return
	}
	for i := 0; i < TICK_COUNT; i++ {
		ev := <-stream.Events
		fmt.Println(ev.Id(), ev.Event(), ev.Data())
	}

}
Output:

1356998400000000000 Tick 2013-01-01 00:00:00 +0000 UTC
1356998401000000000 Tick 2013-01-01 00:00:01 +0000 UTC
1356998402000000000 Tick 2013-01-01 00:00:02 +0000 UTC
1356998403000000000 Tick 2013-01-01 00:00:03 +0000 UTC
1356998404000000000 Tick 2013-01-01 00:00:04 +0000 UTC

type EventWithLastID added in v1.7.0

type EventWithLastID interface {
	// LastEventID is the value of the `id:` field that was most recently seen in an event
	// from this stream, if any. This differs from Event.Id() in that it retains the same
	// value in subsequent events if they do not provide their own `id:` field.
	LastEventID() string
}

EventWithLastID is an additional interface for an event received by the client, allowing access to the LastEventID method.

This is defined as a separate interface for backward compatibility, since this feature was added after the Event interface had been defined and adding a method to Event would break existing implementations. All events returned by Stream do implement this interface, and in a future major version the Event type will be changed to always include this field.

type Logger

type Logger interface {
	Println(...interface{})
	Printf(string, ...interface{})
}

Logger is the interface for a custom logging implementation that can handle log output for a Stream.

type Repository

type Repository interface {
	// Gets the Events which should follow on from the specified channel and event id. This method may be called
	// from different goroutines, so it must be safe for concurrent access.
	//
	// It is important for the Repository to close the channel after all the necessary events have been
	// written to it. The stream will not be able to proceed to any new events until it has finished consuming
	// the channel that was returned by Replay.
	//
	// Replay may return nil if there are no events to be sent.
	Replay(channel, id string) chan Event
}

Repository is an interface to be used with Server.Register() allowing clients to replay previous events through the server, if history is required.

Example
package main

import (
	"encoding/json"
	"fmt"
	"github.com/launchdarkly/eventsource"
	"net"
	"net/http"
)

type NewsArticle struct {
	id             string
	Title, Content string
}

func (a *NewsArticle) Id() string    { return a.id }
func (a *NewsArticle) Event() string { return "News Article" }
func (a *NewsArticle) Data() string  { b, _ := json.Marshal(a); return string(b) }

var articles = []NewsArticle{
	{"2", "Governments struggle to control global price of gas", "Hot air...."},
	{"1", "Tomorrow is another day", "And so is the day after."},
	{"3", "News for news' sake", "Nothing has happened."},
}

func buildRepo(srv *eventsource.Server) {
	repo := eventsource.NewSliceRepository()
	srv.Register("articles", repo)
	for i := range articles {
		repo.Add("articles", &articles[i])
		srv.Publish([]string{"articles"}, &articles[i])
	}
}

func main() {
	srv := eventsource.NewServer()
	defer srv.Close()
	http.HandleFunc("/articles", srv.Handler("articles"))
	l, err := net.Listen("tcp", ":8080")
	if err != nil {
		return
	}
	defer l.Close()
	go http.Serve(l, nil)
	stream, err := eventsource.Subscribe("http://127.0.0.1:8080/articles", "")
	if err != nil {
		return
	}
	go buildRepo(srv)
	// This will receive events in the order that they come
	for i := 0; i < 3; i++ {
		ev := <-stream.Events
		fmt.Println(ev.Id(), ev.Event(), ev.Data())
	}
	stream, err = eventsource.Subscribe("http://127.0.0.1:8080/articles", "1")
	if err != nil {
		fmt.Println(err)
		return
	}
	// This will replay the events in order of id
	for i := 0; i < 3; i++ {
		ev := <-stream.Events
		fmt.Println(ev.Id(), ev.Event(), ev.Data())
	}
}
Output:

2 News Article {"Title":"Governments struggle to control global price of gas","Content":"Hot air...."}
1 News Article {"Title":"Tomorrow is another day","Content":"And so is the day after."}
3 News Article {"Title":"News for news' sake","Content":"Nothing has happened."}
1 News Article {"Title":"Tomorrow is another day","Content":"And so is the day after."}
2 News Article {"Title":"Governments struggle to control global price of gas","Content":"Hot air...."}
3 News Article {"Title":"News for news' sake","Content":"Nothing has happened."}

type Server

type Server struct {
	AllowCORS   bool          // Enable all handlers to be accessible from any origin
	ReplayAll   bool          // Replay repository even if there's no Last-Event-Id specified
	BufferSize  int           // How many messages do we let the client get behind before disconnecting
	Gzip        bool          // Enable compression if client can accept it
	MaxConnTime time.Duration // If non-zero, HTTP connections will be automatically closed after this time
	Logger      Logger        // Logger is a logger that, when set, will be used for logging debug messages
	// contains filtered or unexported fields
}

Server manages any number of event-publishing channels and allows subscribers to consume them. To use it within an HTTP server, create a handler for each channel with Handler().

func NewServer

func NewServer() *Server

NewServer creates a new Server instance.

func (*Server) Close

func (srv *Server) Close()

Close permanently shuts down the Server. It will no longer allow new subscriptions.

func (*Server) Handler

func (srv *Server) Handler(channel string) http.HandlerFunc

Handler creates a new HTTP handler for serving a specified channel.

The channel does not have to have been previously registered with Register, but if it has been, the handler may replay events from the registered Repository depending on the setting of server.ReplayAll and the Last-Event-Id header of the request.

func (*Server) Publish

func (srv *Server) Publish(channels []string, ev Event)

Publish publishes an event to one or more channels.

func (*Server) PublishComment

func (srv *Server) PublishComment(channels []string, text string)

PublishComment publishes a comment to one or more channels.

func (*Server) PublishWithAcknowledgment added in v1.6.0

func (srv *Server) PublishWithAcknowledgment(channels []string, ev Event) <-chan struct{}

PublishWithAcknowledgment publishes an event to one or more channels, returning a channel that will receive a value after the event has been processed by the server.

This can be used to ensure a well-defined ordering of operations. Since each Server method is handled asynchronously via a separate channel, if you call server.Publish and then immediately call server.Close, there is no guarantee that the server execute the Close operation only after the event has been published. If you instead call PublishWithAcknowledgement, and then read from the returned channel before calling Close, you can be sure that the event was published before the server was closed.

func (*Server) Register

func (srv *Server) Register(channel string, repo Repository)

Register registers a Repository to be used for the specified channel. The Repository will be used to determine whether new subscribers should receive data that was generated before they subscribed.

Channels do not have to be registered unless you want to specify a Repository. An unregistered channel can still be subscribed to with Handler, and published to with Publish.

func (*Server) Unregister added in v1.6.0

func (srv *Server) Unregister(channel string, forceDisconnect bool)

Unregister removes a channel registration that was created by Register. If forceDisconnect is true, it also causes all currently active handlers for that channel to close their connections. If forceDisconnect is false, those connections will remain open until closed by their clients but will not receive any more events.

This will not prevent creating new channel subscriptions for the same channel with Handler, or publishing events to that channel with Publish. It is the caller's responsibility to avoid using channels that are no longer supposed to be used.

type SliceRepository

type SliceRepository struct {
	// contains filtered or unexported fields
}

SliceRepository is an example repository that uses a slice as storage for past events.

func NewSliceRepository

func NewSliceRepository() *SliceRepository

NewSliceRepository creates a SliceRepository.

func (*SliceRepository) Add

func (repo *SliceRepository) Add(channel string, event Event)

Add adds an event to the repository history.

func (SliceRepository) Replay

func (repo SliceRepository) Replay(channel, id string) (out chan Event)

Replay implements the event replay logic for the Repository interface.

type Stream

type Stream struct {

	// Events emits the events received by the stream
	Events chan Event
	// Errors emits any errors encountered while reading events from the stream.
	//
	// Errors during initialization of the stream are not pushed to this channel, since until the
	// Subscribe method has returned the caller would not be able to consume the channel. If you have
	// configured the Stream to be able to retry on initialization errors, but you still want to know
	// about those errors or control how they are handled, use StreamOptionErrorHandler.
	//
	// If an error handler has been specified with StreamOptionErrorHandler, the Errors channel is
	// not used and will be nil.
	Errors chan error

	// Logger is a logger that, when set, will be used for logging informational messages.
	//
	// This field is exported for backward compatibility, but should not be set directly because
	// it may be used by multiple goroutines. Use SetLogger instead.
	Logger Logger
	// contains filtered or unexported fields
}

Stream handles a connection for receiving Server Sent Events. It will try and reconnect if the connection is lost, respecting both received retry delays and event id's.

func Subscribe

func Subscribe(url, lastEventID string) (*Stream, error)

Subscribe to the Events emitted from the specified url. If lastEventId is non-empty it will be sent to the server in case it can replay missed events. Deprecated: use SubscribeWithURL instead.

func SubscribeWith

func SubscribeWith(lastEventID string, client *http.Client, request *http.Request) (*Stream, error)

SubscribeWith takes a HTTP client and request providing customization over both headers and control over the HTTP client settings (timeouts, tls, etc) If request.Body is set, then request.GetBody should also be set so that we can reissue the request Deprecated: use SubscribeWithRequestAndOptions instead.

func SubscribeWithRequest

func SubscribeWithRequest(lastEventID string, request *http.Request) (*Stream, error)

SubscribeWithRequest will take an http.Request to set up the stream, allowing custom headers to be specified, authentication to be configured, etc. Deprecated: use SubscribeWithRequestAndOptions instead.

func SubscribeWithRequestAndOptions

func SubscribeWithRequestAndOptions(request *http.Request, options ...StreamOption) (*Stream, error)

SubscribeWithRequestAndOptions takes an initial http.Request to set up the stream - allowing custom headers, authentication, etc. to be configured - and also takes any number of StreamOption values to set other properties of the stream, such as timeouts or a specific HTTP client to use.

func SubscribeWithURL

func SubscribeWithURL(url string, options ...StreamOption) (*Stream, error)

SubscribeWithURL subscribes to the Events emitted from the specified URL. The stream can be configured by providing any number of StreamOption values.

func (*Stream) Close

func (stream *Stream) Close()

Close closes the stream permanently. It is safe for concurrent access and can be called multiple times.

func (*Stream) Restart

func (stream *Stream) Restart()

Restart forces the stream to drop the currently active connection and attempt to connect again, in the same way it would if the connection had failed. There will be a delay before reconnection, as defined by the Stream configuration (StreamOptionInitialRetry, StreamOptionUseBackoff, etc.).

This method is safe for concurrent access. Its behavior is asynchronous: Restart returns immediately and the connection is restarted as soon as possible from another goroutine after that. It is possible for additional events from the original connection to be delivered during that interval.ssible.

If the stream has already been closed with Close, Restart has no effect.

func (*Stream) SetLogger

func (stream *Stream) SetLogger(logger Logger)

SetLogger sets the Logger field in a thread-safe manner.

type StreamErrorHandler

type StreamErrorHandler func(error) StreamErrorHandlerResult

StreamErrorHandler is a function type used with StreamOptionErrorHandler.

This function will be called whenever Stream encounters either a network error or an HTTP error response status. The returned value determines whether Stream should retry as usual, or immediately stop.

The error may be any I/O error returned by Go's networking types, or it may be the eventsource type SubscriptionError representing an HTTP error response status.

For errors during initialization of the Stream, this function will be called on the same goroutine that called the Subscribe method; for errors on an existing connection, it will be called on a worker goroutine. It should return promptly and not block the goroutine.

In this example, the error handler always logs the error with log.Printf, and it forces the stream to close permanently if there was an HTTP 401 error:

func handleError(err error) eventsource.StreamErrorHandlerResult {
    log.Printf("stream error: %s", err)
    if se, ok := err.(eventsource.SubscriptionError); ok && se.Code == 401 {
        return eventsource.StreamErrorHandlerResult{CloseNow: true}
    }
    return eventsource.StreamErrorHandlerResult{}
}

type StreamErrorHandlerResult

type StreamErrorHandlerResult struct {
	// CloseNow can be set to true to tell the Stream to immediately stop and not retry, as if Close had
	// been called.
	//
	// If CloseNow is false, the Stream will proceed as usual after an error: if there is an existing
	// connection it will retry the connection, and if the Stream is still being initialized then the
	// retry behavior is configurable (see StreamOptionCanRetryFirstConnection).
	CloseNow bool
}

StreamErrorHandlerResult contains values returned by StreamErrorHandler.

type StreamOption

type StreamOption interface {
	// contains filtered or unexported methods
}

StreamOption is a common interface for optional configuration parameters that can be used in creating a stream.

func StreamOptionCanRetryFirstConnection

func StreamOptionCanRetryFirstConnection(initialRetryTimeout time.Duration) StreamOption

StreamOptionCanRetryFirstConnection returns an option that determines whether to apply retry behavior to the first connection attempt for the stream.

If the timeout is nonzero, an initial connection failure when subscribing will not cause an error result, but will trigger the same retry logic as if an existing connection had failed. The stream constructor will not return until a connection has been made, or until the specified timeout expires, if the timeout is positive; if the timeout is negative, it will continue retrying indefinitely.

The default value is zero: an initial connection failure will not be retried.

func StreamOptionErrorHandler

func StreamOptionErrorHandler(handler StreamErrorHandler) StreamOption

StreamOptionErrorHandler returns an option that causes a Stream to call the specified function for stream errors.

If non-nil, this function will be called whenever Stream encounters either a network error or an HTTP error response status. The returned value determines whether Stream should retry as usual, or immediately stop as if Close had been called.

When used, this mechanism replaces the Errors channel; that channel will be pre-closed and Stream will not push any errors to it, so the caller does not need to consume the channel.

Note that using a handler is the only way to have control over how Stream handles errors during the initial connection attempt, since there would be no way for the caller to consume the Errors channel before the Subscribe method has returned.

func StreamOptionHTTPClient

func StreamOptionHTTPClient(client *http.Client) StreamOption

StreamOptionHTTPClient returns an option that overrides the default HTTP client used by a stream when the stream is created.

func StreamOptionInitialRetry

func StreamOptionInitialRetry(retry time.Duration) StreamOption

StreamOptionInitialRetry returns an option that sets the initial retry delay for a stream when the stream is created.

This delay will be used the first time the stream has to be restarted; the interval will increase exponentially on subsequent reconnections. Each time, there will also be a pseudo-random jitter so that the actual value may be up to 50% less. So, for instance, if you set the initial delay to 1 second, the first reconnection will use a delay between 0.5s and 1s inclusive, and subsequent reconnections will be 1s-2s, 2s-4s, etc.

The default value is DefaultInitialRetry. In a future version, this value may change, so if you need a specific value it is best to set it explicitly.

func StreamOptionLastEventID

func StreamOptionLastEventID(lastEventID string) StreamOption

StreamOptionLastEventID returns an option that sets the initial last event ID for a stream when the stream is created. If specified, this value will be sent to the server in case it can replay missed events.

func StreamOptionLogger

func StreamOptionLogger(logger Logger) StreamOption

StreamOptionLogger returns an option that sets the logger for a stream when the stream is created (to change it later, you can use SetLogger). By default, there is no logger.

func StreamOptionReadTimeout

func StreamOptionReadTimeout(timeout time.Duration) StreamOption

StreamOptionReadTimeout returns an option that sets the read timeout interval for a stream when the stream is created. If the stream does not receive new data within this length of time, it will restart the connection.

By default, there is no read timeout.

func StreamOptionRetryResetInterval

func StreamOptionRetryResetInterval(retryResetInterval time.Duration) StreamOption

StreamOptionRetryResetInterval returns an option that sets the minimum amount of time that a connection must stay open before the Stream resets its backoff delay. This is only relevant if backoff is enabled (see StreamOptionUseBackoff).

If a connection fails before the threshold has elapsed, the delay before reconnecting will be greater than the last delay; if it fails after the threshold, the delay will start over at the the initial minimum value. This prevents long delays from occurring on connections that are only rarely restarted.

The default value is DefaultRetryResetInterval.

func StreamOptionUseBackoff

func StreamOptionUseBackoff(maxDelay time.Duration) StreamOption

StreamOptionUseBackoff returns an option that determines whether to use an exponential backoff for reconnection delays.

If the maxDelay parameter is greater than zero, backoff is enabled. The retry delay interval will be doubled (not counting jitter - see StreamOptionUseJitter) for consecutive stream reconnections, but will never be greater than maxDelay.

For consistency with earlier versions, this is currently zero (disabled) by default. In a future version this may change, so if you do not want backoff behavior you should explicitly set it to zero. It is recommended to use both backoff and jitter, to avoid "thundering herd" behavior in the case of a server outage.

func StreamOptionUseJitter

func StreamOptionUseJitter(jitterRatio float64) StreamOption

StreamOptionUseJitter returns an option that determines whether to use a randomized jitter for reconnection delays.

If jitterRatio is greater than zero, it represents a proportion up to 1.0 (100%) that will be deducted from the retry delay interval would otherwise be used: for instance, 0.5 means that the delay will be randomly decreased by up to 50%. A value greater than 1.0 is treated as equal to 1.0.

For consistency with earlier versions, this is currently disabled (zero) by default. In a future version this may change, so if you do not want jitter you should explicitly set it to zero. It is recommended to use both backoff and jitter, to avoid "thundering herd" behavior in the case of a server outage.

type SubscriptionError

type SubscriptionError struct {
	Code    int
	Message string
}

SubscriptionError is an error object returned from a stream when there is an HTTP error.

func (SubscriptionError) Error

func (e SubscriptionError) Error() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL