sseserver

package module
v1.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 2, 2023 License: AGPL-3.0 Imports: 11 Imported by: 4

README

sseserver 🏄♂

PkgGoDev Build Status CodeFactor Go Report Card

A high performance and thread-safe Server-Sent Events server for Go with hierarchical namespacing support.

This library has powered the streaming API endpoint for 💫 Emojitracker in production since 2014, where it routinely handles dispatching hundreds of messages per second to thousands of simultaneous clients, on a single Heroku dyno.

Introduction

Hierarchical Namespaced Channels*

A client can subscribe to channels reflecting the content they are interested in. For example, say you are broadcasting events to the namespaces /pets/cats and /pets/dogs. A client could subscribe to the parent channel /pets in the hierarchy and receive all messages for either.

In sseserver, channels have infinite depth and are automatically created on the fly with zero setup -- just broadcast and it works.

(*There's probably a more accurate term for this. If you know it, let me know.)

Performance

Designed for high throughput as primary performance consideration. In my preliminary benchmarking (on v1.0, circa 2014) this can handle ~100K/sec messages broadcast across ~1000 open HTTP connections on a 3.4GHz Intel Core i7 (using a single core, e.g. with GOMAXPROCS=1). There still remains quite a bit of optimization to be done so it should be able to get faster if needed.

SSE vs Websockets

SSE is the oft-overlooked uni-directional cousin of websockets. Being "just HTTP" it has some benefits:

  • Trvially easier to understand, plaintext format. Can be debugged by a human with curl.
  • Supported in most major browsers for a long time now. Everything except IE/Edge, but an easy polyfill!
  • Built-in standard support for automatic reconnection, event binding, etc.
  • Works with HTTP/2.

See also Smashing Magazine: "Using SSE Instead Of WebSockets For Unidirectional Data Flow Over HTTP/2".

Documentation

GoDoc

Namespaces are URLs

For clients, no need to think about protocol. To subscribe to one of the above namespaces from the previous example, just connect to http://$SERVER/pets/dogs. Done.

Example Usage

A simple Go program utilizing this package:

package main

import (
    "time"
    "github.com/mroth/sseserver"
)

func main() {
    s := sseserver.NewServer() // create a new server instance

    // broadcast the time every second to the "/time" namespace
    go func() {
        ticker := time.Tick(time.Duration(1 * time.Second))
        for {
            // wait for the ticker to fire
            t := <-ticker
            // create the message payload, can be any []byte value
            data := []byte(t.Format("3:04:05 pm (MST)"))
            // send a message without an event on the "/time" namespace
            s.Broadcast <- sseserver.SSEMessage{"", data, "/time"}
        }
    }()

    // simulate sending some scoped events on the "/pets" namespace
    go func() {
        time.Sleep(5 * time.Second)
        s.Broadcast <- sseserver.SSEMessage{"new-dog", []byte("Corgi"), "/pets/dogs"}
        s.Broadcast <- sseserver.SSEMessage{"new-cat", []byte("Persian"), "/pets/cats"}
        time.Sleep(1 * time.Second)
        s.Broadcast <- sseserver.SSEMessage{"new-dog", []byte("Terrier"), "/pets/dogs"}
        s.Broadcast <- sseserver.SSEMessage{"new-dog", []byte("Dauchsand"), "/pets/cats"}
        time.Sleep(2 * time.Second)
        s.Broadcast <- sseserver.SSEMessage{"new-cat", []byte("LOLcat"), "/pets/cats"}
    }()

    s.Serve(":8001") // bind to port and begin serving connections
}

All these event namespaces are exposed via HTTP endpoint in the /subscribe/:namespace route.

On the client, we can easily connect to those endpoints using built-in functions in JS:

// connect to an event source endpoint and print results
var es1 = new EventSource("http://localhost:8001/subscribe/time");
es1.onmessage = function(event) {
    console.log("TICK! The time is currently: " + event.data);
};

// connect to a different event source endpoint and register event handlers
// note that by subscribing to the "parent" namespace, we get all events
// contained within the pets hierarchy.
var es2 = new EventSource("http://localhost:8001/subscribe/pets")
es2.addEventListener("new-dog", function(event) {
    console.log("WOOF! Hello " + event.data);
}, false);

es2.addEventListener("new-cat", function(event) {
    console.log("MEOW! Hello " + event.data);
}, false);

Which when connecting to the server would yield results:

TICK! The time is currently: 6:07:17 pm (EDT)
TICK! The time is currently: 6:07:18 pm (EDT)
TICK! The time is currently: 6:07:19 pm (EDT)
TICK! The time is currently: 6:07:20 pm (EDT)
WOOF! Hello Corgi
MEOW! Hello Persian
TICK! The time is currently: 6:07:21 pm (EDT)
WOOF! Hello Terrier
WOOF! Hello Dauchsand
TICK! The time is currently: 6:07:22 pm (EDT)
TICK! The time is currently: 6:07:23 pm (EDT)
MEOW! Hello LOLcat
TICK! The time is currently: 6:07:24 pm (EDT)

Of course you could easily send JSON objects in the data payload instead, and most likely will be doing this often.

Another advantage of the SSE protocol is that the wire-format is so simple. Unlike WebSockets, we can connect with curl to an endpoint directly and just read what's going on:

$ curl http://localhost:8001/subscribe/pets
event:new-dog
data:Corgi

event:new-cat
data:Persian

event:new-dog
data:Terrier

event:new-dog
data:Dauchsand

event:new-cat
data:LOLcat

Yep, it's that simple.

Keep-Alives

All connections will send periodic :keepalive messages as recommended in the WHATWG spec (by default, every 15 seconds). Any library adhering to the EventSource standard should already automatically ignore and filter out these messages for you.

Admin Page

By default, an admin status page is available for easy monitoring.

screenshot

It's powered by a simple JSON API endpoint, which you can also use to build your own reporting. These endpoints can be disabled in the settings (see Server.Options).

HTTP Middleware

sseserver.Server implements the standard Go http.Handler interface, so you can easily integrate it with most existing HTTP middleware, and easily plug it into whatever you are using for your current routing, etc.

License

AGPL-3.0. Dual commercial licensing available upon request.

Documentation

Overview

Package sseserver implements a reference Server-Sent Events server, suitable for streaming unidirectional messages over HTTP to web browsers.

This implementation also adds easy namespacing so that clients can subscribe to only a specific subset of messages.

Server-Sent Events

For more information on the SSE format itself, check out this fairly comprehensive article: http://www.html5rocks.com/en/tutorials/eventsource/basics/

Note that the implementation of SSE in this server intentionally does not implement message IDs.

Namespacing

The server opens a HTTP endpoint at /subscribe/:namespace that will accept connections on any suitable child path. The remainder of the path is a virtual endpoint represents the "namespace" for the client connection. For example:

HTTP GET /subscribe/pets        // client subscribed to "/pets" namespace
HTTP GET /subscribe/pets/cats   // client subscribed to "/pets/cats" namespace
HTTP GET /subscribe/pets/dogs   // client subscribed to "/pets/dogs" namespace

SSEMessages broadcast via the server are only delivered to clients subscribed to the endpoint matching their namespace. Namespaces are hierarchical, subscribing to the "parent" endpoint would receive all messages broadcast to all child namespaces as well. E.g. in the previous example, a subscription to "/pets" would receive all messages broadcast to both the dogs and cats namespaces as well.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ProxyRemoteAddrHandler

func ProxyRemoteAddrHandler(next http.Handler) http.Handler

ProxyRemoteAddrHandler is HTTP middleware to determine the actual RemoteAddr of a http.Request when your server sits behind a proxy or load balancer.

When utilized, the value of RemoteAddr will be overridden based on the X-Real-IP or X-Forwarded-For HTTP header, which can be a comma separated list of IPs.

See http://httpd.apache.org/docs/2.2/mod/mod_proxy.html#x-headers for details.

Based on http://git.io/xDD3Mw

Types

type ReportingStatus

type ReportingStatus struct {
	Node        string         `json:"node"`
	Status      string         `json:"status"`
	Reported    int64          `json:"reported_at"`
	StartupTime int64          `json:"startup_time"`
	SentMsgs    uint64         `json:"msgs_broadcast"`
	Connections connStatusList `json:"connections"`
}

ReportingStatus is snapshot of metadata about the status of a Server

It can be serialized to JSON and is what gets reported to admin API endpoint.

type SSEMessage

type SSEMessage struct {
	Event     string // event scope for the message [optional]
	Data      []byte // message payload
	Namespace string // namespace for msg, matches to client subscriptions
}

SSEMessage is a message suitable for sending over a Server-Sent Event stream.

Note: Namespace is not part of the SSE spec, it is merely used internally to map a message to the appropriate HTTP virtual endpoint.

type Server

type Server struct {
	Broadcast chan<- SSEMessage
	Options   ServerOptions
	// contains filtered or unexported fields
}

Server is the primary interface to a SSE server.

Exposes a receive-only chan `Broadcast`: any SSEMessage sent to this channel will be broadcast out to any connected clients subscribed to a namespace that matches the message.

Server implements the http.Handler interface, and can be chained into existing HTTP routing muxes if desired.

func NewServer

func NewServer() *Server

NewServer creates a new Server and returns a reference to it.

func (*Server) Serve

func (s *Server) Serve(addr string)

Serve is a convenience method to begin serving connections on specified address.

This method blocks forever, as it is basically a convenience wrapper around http.ListenAndServe(addr, self).

It also implements basic request logging to STDOUT.

If you want to do something more sophisticated, you should not use this method, but rather just build your own HTTP routing/middleware chain around Server which implements the standard http.Handler interface.

func (*Server) ServeHTTP

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP implements the http.Handler interface

func (*Server) Status

func (s *Server) Status() ReportingStatus

Status returns the ReportingStatus for a given server.

Primarily intended for logging and reporting.

type ServerOptions

type ServerOptions struct {
	DisableAdminEndpoints bool // disables the "/admin" status endpoints

}

ServerOptions defines a set of high-level user options that can be customized for a Server.

Directories

Path Synopsis
examples
petstore
In this example we will still only send simple text messages, but one can just as easily serialize objects to JSON and send them through the data field, and deserialize back into Javascript objects on the client side.
In this example we will still only send simple text messages, but one can just as easily serialize objects to JSON and send them through the data field, and deserialize back into Javascript objects on the client side.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL