sseclient

Version: v0.0.0-...-0e8c434
Published: Apr 6, 2022 | License: Unlicense | Imports: 10 | Imported by: 0

README

sseclient

This is a Go library for consuming streams of Server-Sent Events (SSE). It handles automatic reconnection and tracks the last seen event ID.

Key differences:

  • Synchronous execution. Reconnecting, event parsing, and event processing all run in the single goroutine that started the stream. This leaves the caller free to choose any concurrency and synchronization model.
  • Go context aware. An SSE stream can optionally be given a context on start, which makes it possible to support different ways of stopping the stream.

Usage example:

package main

import (
        "context"
        "log"
        "time"

        "github.com/fln/sseclient"
)

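// errorHandler logs the error and returns nil, so the error is treated as
// handled and the stream continues (with automatic reconnect).
func errorHandler(err error) error {
        log.Printf("error : %s", err)
        return nil
}

// eventHandler logs every received event.
func eventHandler(event *sseclient.Event) error {
        log.Printf("event : %s : %s : %d bytes", event.ID, event.Event, len(event.Data))
        return nil
}

func main() {
        c := sseclient.New("https://example.net/stream", "")
        ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
        defer cancel() // release the timeout's resources
        if err := c.Start(ctx, eventHandler, errorHandler); err != nil {
                log.Printf("stream stopped : %s", err)
        }
}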
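
To try the usage example without a real endpoint, a small local SSE server can help. The sketch below is not part of sseclient: sseHandler, fetchStream, and demo are hypothetical names, and it assumes a reconnecting client sends the standard Last-Event-ID request header. The handler writes three events in the text/event-stream wire format and resumes numbering after the ID it was given.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strconv"
)

// sseHandler is a hypothetical test endpoint. It emits three events and
// resumes numbering after the ID found in the Last-Event-ID request header.
func sseHandler(w http.ResponseWriter, r *http.Request) {
	start := 0
	if n, err := strconv.Atoi(r.Header.Get("Last-Event-ID")); err == nil {
		start = n
	}
	w.Header().Set("Content-Type", "text/event-stream")
	for i := start + 1; i <= start+3; i++ {
		// An event is a block of "field: value" lines ended by a blank line.
		fmt.Fprintf(w, "id: %d\nevent: tick\ndata: payload %d\n\n", i, i)
		if f, ok := w.(http.Flusher); ok {
			f.Flush() // push the event to the client immediately
		}
	}
}

// fetchStream performs a single GET and returns the raw stream body.
func fetchStream(url, lastEventID string) (string, error) {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return "", err
	}
	if lastEventID != "" {
		req.Header.Set("Last-Event-ID", lastEventID)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

// demo starts the test server and reads one batch of events after ID 3.
func demo() (string, error) {
	srv := httptest.NewServer(http.HandlerFunc(sseHandler))
	defer srv.Close()
	return fetchStream(srv.URL, "3")
}

func main() {
	body, err := demo()
	if err != nil {
		panic(err)
	}
	fmt.Print(body)
}
```

Because the handler returns after three events, a client that reconnects automatically would be expected to receive a fresh batch on every reconnect.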

Documentation

Overview

Package sseclient is a library for consuming SSE streams.

Key features:

Synchronous execution. Reconnecting, event parsing, and event processing all run in the single goroutine that started the stream. This leaves the caller free to use any concurrency and synchronization model.

Go context aware. An SSE stream can optionally be given a context on start, which makes it possible to support different ways of stopping the stream.

Example
package main

import (
	"context"
	"log"
	"time"

	"github.com/fln/sseclient"
)

// errorHandler logs the error and returns nil so the stream keeps running.
func errorHandler(err error) error {
	log.Printf("error : %s", err)
	return nil
}

// eventHandler logs every received event.
func eventHandler(event *sseclient.Event) error {
	log.Printf("event : %s : %s : %d bytes of data", event.ID, event.Event, len(event.Data))
	return nil
}

func main() {
	c := sseclient.New("https://example.net/stream", "")
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	c.Start(ctx, eventHandler, errorHandler)
}

Constants

This section is empty.

Variables

var (
	// MalformedEvent is returned if the stream ends with an incomplete event.
	MalformedEvent = errors.New("incomplete event at the end of the stream")
)
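
A sentinel error like this is usually matched with errors.Is, which also works if the error has been wrapped. The sketch below assumes MalformedEvent reaches the error handler callback, which the text above does not confirm. It redeclares the variable locally so that it runs on its own; with the library imported, sseclient.MalformedEvent would be used instead. tolerateMalformed and errOther are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// MalformedEvent is a local stand-in for the package variable of the same name.
var MalformedEvent = errors.New("incomplete event at the end of the stream")

// errOther is a hypothetical unrelated failure used for the demonstration.
var errOther = errors.New("other failure")

// tolerateMalformed has the shape func(error) error. It treats a truncated
// final event as recoverable and reports every other error as fatal.
func tolerateMalformed(err error) error {
	if errors.Is(err, MalformedEvent) {
		return nil // handled: let the stream continue
	}
	return err
}

func main() {
	wrapped := fmt.Errorf("read: %w", MalformedEvent)
	fmt.Println(tolerateMalformed(wrapped) == nil)
	fmt.Println(tolerateMalformed(errOther))
}
```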

Functions

This section is empty.

Types

type Client

type Client struct {
	URL         string
	LastEventID string
	Retry       time.Duration
	HTTPClient  *http.Client
	Headers     http.Header

	// VerboseStatusCodes specifies whether connect should return all
	// status codes as errors if they're not StatusOK (200).
	VerboseStatusCodes bool
}

Client connects to an SSE stream and receives events. It creates the HTTP requests and reconnects automatically.

A Client can be created with the New function or constructed manually.

func New

func New(url, lastEventID string) *Client

New creates an SSE stream client. It uses the given URL and last event ID and a 2 second retry timeout. It also installs a custom HTTP client that skips TLS certificate verification. New only creates the Client struct and does not start connecting to the SSE endpoint.
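
Since the HTTP client installed by New skips TLS verification, callers who need verification may want to supply their own client through the exported HTTPClient field. The sketch below builds such a client with the standard library only. newVerifiedHTTPClient and skipsVerification are hypothetical helpers, and the timeout values are arbitrary assumptions.

```go
package main

import (
	"crypto/tls"
	"fmt"
	"net/http"
	"time"
)

// newVerifiedHTTPClient returns a client that keeps certificate verification
// enabled. Client.Timeout is left at zero on purpose: it also limits reading
// the response body and would cut off a long-lived event stream.
func newVerifiedHTTPClient() *http.Client {
	return &http.Client{
		Transport: &http.Transport{
			TLSClientConfig:       &tls.Config{MinVersion: tls.VersionTLS12},
			TLSHandshakeTimeout:   10 * time.Second,
			ResponseHeaderTimeout: 30 * time.Second,
		},
	}
}

// skipsVerification reports whether the client's transport disables
// certificate verification.
func skipsVerification(c *http.Client) bool {
	tr, ok := c.Transport.(*http.Transport)
	return ok && tr.TLSClientConfig != nil && tr.TLSClientConfig.InsecureSkipVerify
}

func main() {
	hc := newVerifiedHTTPClient()
	fmt.Println("skips TLS verification:", skipsVerification(hc))
	// With the library imported, the client would be attached like this:
	//   c := sseclient.New("https://example.net/stream", "")
	//   c.HTTPClient = hc
}
```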

func (*Client) Start

func (c *Client) Start(ctx context.Context, eventFn EventHandler, errorFn ErrorHandler) error

Start connects to the SSE stream and blocks until the stream is stopped. The stream can be stopped by cancelling the given context or by returning an error from the error handler callback. An error returned by the error handler is passed back to the caller of Start.

func (*Client) Stream

func (c *Client) Stream(ctx context.Context, buf int) <-chan StreamMessage

Stream is a non-blocking mode of consuming an SSE stream in which events are delivered through a channel. The stream can be stopped by cancelling the context.

The buf parameter sets the buffer size of the returned channel. A buffer size of 0 is a good default.
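
A receive loop over the returned channel might look like the sketch below. Event and StreamMessage are redeclared locally so the block runs on its own, and a hand-filled channel stands in for the one Stream would return. The text above does not say whether the channel is closed on cancellation, so the hypothetical consume helper watches ctx.Done() as well.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Event and StreamMessage are local stand-ins mirroring the documented types.
type Event struct {
	ID    string
	Event string
	Data  []byte
}

type StreamMessage struct {
	Event *Event
	Err   error
}

// consume reads messages until the context is cancelled, the channel is
// closed, or a message carries an error.
func consume(ctx context.Context, ch <-chan StreamMessage, handle func(*Event)) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case msg, ok := <-ch:
			if !ok {
				return nil // channel closed by the producer
			}
			if msg.Err != nil {
				return msg.Err
			}
			handle(msg.Event)
		}
	}
}

// demo feeds two events and one error through a pre-filled channel.
func demo() (int, error) {
	ch := make(chan StreamMessage, 3)
	ch <- StreamMessage{Event: &Event{ID: "1", Event: "tick", Data: []byte("a")}}
	ch <- StreamMessage{Event: &Event{ID: "2", Event: "tick", Data: []byte("b")}}
	ch <- StreamMessage{Err: errors.New("stream failed")}

	handled := 0
	err := consume(context.Background(), ch, func(*Event) { handled++ })
	return handled, err
}

func main() {
	handled, err := demo()
	fmt.Println(handled, err)
}
```

With an unbuffered channel (buf of 0), a send only completes once the consumer receives, so a slow handler naturally applies back-pressure to the producer.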

type ErrorHandler

type ErrorHandler func(error) error

ErrorHandler is a callback that is called every time the SSE stream encounters an error, including errors returned by the EventHandler function. Network connection errors and the response codes 500, 502, 503, and 504 are not treated as errors.

If the error handler returns nil, the error is treated as handled and stream processing continues (with automatic reconnect).

If the error handler returns an error, it is treated as fatal: the stream processing loop exits and returns that error up the stack.

This handler can be used to implement complex error handling scenarios. For simple cases, this library provides ReconnectOnError and StopOnError.

Users of this package have to provide an implementation of this function.

var (
	ReconnectOnError ErrorHandler = func(error) error { return nil }
	StopOnError      ErrorHandler = func(err error) error { return err }
)

List of commonly used error handler function implementations.
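
Between these two extremes, a handler can tolerate a limited number of errors before giving up. The sketch below is one possible policy, not something the library provides: stopAfter and errSample are hypothetical names, and ErrorHandler is redeclared locally with the documented signature so the block runs on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrorHandler is a local stand-in with the documented signature.
type ErrorHandler func(error) error

// errSample is a hypothetical error used for the demonstration.
var errSample = errors.New("boom")

// stopAfter returns a handler that treats the first limit errors as handled
// and turns the next one into a fatal error. The counter needs no locking
// when the handler is only ever called from a single goroutine.
func stopAfter(limit int) ErrorHandler {
	seen := 0
	return func(err error) error {
		seen++
		if seen > limit {
			return fmt.Errorf("giving up after %d errors: %w", seen, err)
		}
		return nil
	}
}

func main() {
	h := stopAfter(2)
	for i := 0; i < 3; i++ {
		fmt.Println(h(errSample))
	}
}
```

Wrapping with %w keeps the original error available to errors.Is and errors.As in the caller.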

type Event

type Event struct {
	ID    string
	Event string
	Data  []byte
}

Event represents a single chunk of data in the event stream.

type EventHandler

type EventHandler func(e *Event) error

EventHandler is a callback that is called every time an event is received on the SSE stream. An error returned from the handler is passed to the error handler.

Users of this package have to provide this function implementation.
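
As an illustration, an event handler often decodes Event.Data and returns an error when the payload is malformed, leaving the decision to continue or stop to the error handler. The sketch below assumes a JSON payload. The reading type, the "reading" event name, and both functions are hypothetical, and Event is redeclared locally so the block runs on its own.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event is a local stand-in mirroring the documented struct.
type Event struct {
	ID    string
	Event string
	Data  []byte
}

// reading is a hypothetical payload shape.
type reading struct {
	Sensor string  `json:"sensor"`
	Value  float64 `json:"value"`
}

// decodeReading parses the event data as JSON.
func decodeReading(e *Event) (reading, error) {
	var r reading
	if err := json.Unmarshal(e.Data, &r); err != nil {
		return reading{}, fmt.Errorf("event %q: %w", e.ID, err)
	}
	return r, nil
}

// handleReading has the EventHandler shape, func(*Event) error. It ignores
// event types it does not know and reports malformed payloads as errors.
func handleReading(e *Event) error {
	if e.Event != "reading" {
		return nil
	}
	r, err := decodeReading(e)
	if err != nil {
		return err
	}
	fmt.Printf("%s = %.1f\n", r.Sensor, r.Value)
	return nil
}

func main() {
	good := &Event{ID: "1", Event: "reading", Data: []byte(`{"sensor":"t1","value":21.5}`)}
	bad := &Event{ID: "2", Event: "reading", Data: []byte(`{"sensor":`)}
	fmt.Println(handleReading(good))
	fmt.Println(handleReading(bad) != nil)
}
```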

type StreamMessage

type StreamMessage struct {
	Event *Event
	Err   error
}

StreamMessage holds a single SSE event or an error.
