streamer

Version: v1.0.0
Published: Dec 20, 2023 | License: BSD-2-Clause | Imports: 10 | Imported by: 0

README

Streamer

  • streamer aims to provide a quick, no-hassle integration of WebSockets for your Go server.

  • streamer is not yet a complete package and will be extended in the future.

  • streamer is built on top of https://github.com/gorilla/websocket.

  • To start a new stream, do the following:

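package main

import (
    "context"

    "github.com/andrewwormald/streamer"
)

func main() {
    // New requires a context; it returns a new Stream.
    s := streamer.New(context.Background())
    _ = s // use s to Accept connections and Publish messages
}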

API

package streamer

// Accept takes ownership of upgrading the HTTP server connection to the WebSocket protocol and adding the new connection
// to the Stream's channel pool.
func (s *Stream) Accept(w http.ResponseWriter, r *http.Request, channelKey string) error {}

// Publish sends a string payload to all of the open channels in the Stream's channel pool and sets a
// per-channel deadline based on channelTimeout.
func (s *Stream) Publish(payload string) {}

// Responder is a blocking method that should be run in a goroutine for responding to and handling received messages.
func (s *Stream) Responder(handler func(m ReceiveMessage)) {}

// Read returns the stream's read buffer, which it consumes from when handling messages from the stream's channels.
func (s *Stream) Read() chan ReceiveMessage {}

// Collect uses the provided channelID to fetch the channel
func (s *Stream) Collect(channelID string) (*Channel, error) {}

// Connections returns the number of valid channels that are in the Stream.
func (s *Stream) Connections() int {}

Options

package streamer

func WithReadBufferSize(size int) StreamOption {
	return func(s *Stream) {
		s.readBuff = make(chan ReceiveMessage, size)
	}
}

func WithUpgrader(u websocket.Upgrader) StreamOption {
	return func(s *Stream) {
		s.u = u
	}
}

Errors

package streamer

var ErrChannelDoesNotExist = errors.New("channel does not exist", j.C("ERR_bcd404068d4f7f1b"))

Upcoming

Add Go doc

Refactor to remove the dependency on gorilla and improve memory efficiency

Documentation

Constants

This section is empty.

Variables

var ErrChannelDoesNotExist = errors.New("channel does not exist", j.C("ERR_bcd404068d4f7f1b"))

ErrChannelDoesNotExist is returned when the channel is not found in the stream.

Functions

This section is empty.

Types

type Channel

type Channel struct {
	// contains filtered or unexported fields
}

Channel is the abstraction of the websocket connection. You interact with the websocket connection through the Channel struct, and every client is represented as a Channel as soon as it is accepted into the stream.

func NewChannel

func NewChannel(ctx context.Context, wc *websocket.Conn, id string, opts ...ChannelOption) *Channel

NewChannel provides a new Channel that wraps the websocket connection and allows Stream to easily interface with it. By default, NewChannel runs an additional goroutine to manage and flush all the writes to the connection.

func (*Channel) Close

func (c *Channel) Close(closeCode int)

Close sends a close connection message to the channel's underlying websocket connection. Regardless of whether this succeeds, the channel's context is cancelled, which results in the channel being marked as closed. Close takes a close code as defined in RFC 6455, section 11.7.

func (*Channel) Closed

func (c *Channel) Closed() bool

Closed returns true if the Channel can no longer be interacted with.
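The relationship between Close and Closed described above can be sketched with a cancellable context. This is a minimal illustration, not the package's implementation: the lowercase channel type and newChannel are stand-ins invented for the example, and the websocket close frame that the real Close sends is left out.

```go
package main

import (
	"context"
	"fmt"
)

// channel is a hypothetical stand-in for the package's Channel type.
type channel struct {
	ctx    context.Context
	cancel context.CancelFunc
}

func newChannel(parent context.Context) *channel {
	ctx, cancel := context.WithCancel(parent)
	return &channel{ctx: ctx, cancel: cancel}
}

// Close models only the cancellation step; the real method also
// attempts to send a close message with the given close code.
func (c *channel) Close() { c.cancel() }

// Closed reports whether the channel's context has been cancelled.
func (c *channel) Closed() bool { return c.ctx.Err() != nil }

// closedStates returns the Closed result before and after calling Close.
func closedStates() (before, after bool) {
	c := newChannel(context.Background())
	before = c.Closed()
	c.Close()
	after = c.Closed()
	return before, after
}

func main() {
	before, after := closedStates()
	fmt.Println(before, after) // false true
}
```

Because the sketch derives its context from the parent, cancelling the parent would also mark the stand-in as closed; whether the package behaves the same way is not stated in this documentation.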

func (*Channel) ID

func (c *Channel) ID() string

ID returns the id of the channel that the stream uses as its key in the channel pool.

func (*Channel) Recv

func (c *Channel) Recv(ch chan ReceiveMessage)

Recv is a blocking method that waits for a websocket Text message from the channel and then writes it to the provided Go channel. If the provided Go channel is full for more than 200 milliseconds, an error is logged and the write is retried when the next message is read.
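The timeout behaviour described for Recv resembles a select between a channel write and a timer. The following is a sketch under that assumption only; forward and deliveredCount are hypothetical helpers, plain strings stand in for ReceiveMessage, and the package's actual logging and retry logic may differ.

```go
package main

import (
	"fmt"
	"time"
)

// forward tries to hand msg to out and gives up once wait has elapsed.
func forward(out chan<- string, msg string, wait time.Duration) bool {
	timer := time.NewTimer(wait)
	defer timer.Stop()
	select {
	case out <- msg:
		return true
	case <-timer.C:
		return false
	}
}

// deliveredCount forwards n messages into a buffer of the given size that
// nobody drains, and returns how many were accepted.
func deliveredCount(size, n, waitMillis int) int {
	out := make(chan string, size)
	wait := time.Duration(waitMillis) * time.Millisecond
	delivered := 0
	for i := 0; i < n; i++ {
		if forward(out, fmt.Sprintf("msg-%d", i), wait) {
			delivered++
		} else {
			fmt.Println("receive buffer full, message not delivered")
		}
	}
	return delivered
}

func main() {
	// With room for one message and no reader, only the first is delivered.
	fmt.Println(deliveredCount(1, 3, 200))
}
```

In practice this suggests keeping a consumer running on the Go channel passed to Recv, or sizing it generously, so that slow handling does not cause messages to be skipped.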

func (*Channel) Send

func (c *Channel) Send(ctx context.Context, msg string) error

Send pushes a string to the channel's write buffer. If the buffer is full, this method blocks until the context is cancelled or until items are removed from the writeBuf.

Send must be provided with a context created by context.WithTimeout() or context.WithDeadline() to prevent it from blocking indefinitely while it waits for the write buffer to be consumed.

Send returns a context error if the context's Done is closed.
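To illustrate why a deadline matters, here is a minimal sketch of a Send-like function over a buffered Go channel. The names send, sendWithTimeout, and timedOutOnFullBuffer are hypothetical, and the select shown is an assumption about the general shape of such a method rather than the package's source.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// send queues msg on writeBuf, or returns the context error once ctx is done.
func send(ctx context.Context, writeBuf chan string, msg string) error {
	select {
	case writeBuf <- msg:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// sendWithTimeout bounds how long a single send may block.
func sendWithTimeout(writeBuf chan string, msg string, d time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	return send(ctx, writeBuf, msg)
}

// timedOutOnFullBuffer fills a one-slot buffer, then checks that the next
// send fails with context.DeadlineExceeded because nothing drains the buffer.
func timedOutOnFullBuffer() bool {
	buf := make(chan string, 1)
	if err := sendWithTimeout(buf, "first", 50*time.Millisecond); err != nil {
		return false
	}
	err := sendWithTimeout(buf, "second", 50*time.Millisecond)
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(timedOutOnFullBuffer()) // true
}
```

With context.Background() alone, the second call in this sketch would never return, which is the situation the requirement above guards against.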

type ChannelOption

type ChannelOption func(c *Channel)

ChannelOption provides the ability to configure the Channel to your own specification.

func WithWriteBufferSize

func WithWriteBufferSize(size int) ChannelOption

WithWriteBufferSize changes the size of the underlying write buffer. The write buffer size correlates to the number of messages and not the actual size of the messages. Therefore a buffer size of 1 would allow 1 message to be queued at a time.
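The point that the buffer size counts messages rather than bytes follows from how buffered Go channels work. The sketch below uses a plain chan string; tryQueue and queuedCount are hypothetical helpers and say nothing about how the package stores its buffer internally.

```go
package main

import (
	"fmt"
	"strings"
)

// tryQueue attempts a non-blocking send and reports whether msg was queued.
func tryQueue(buf chan string, msg string) bool {
	select {
	case buf <- msg:
		return true
	default:
		return false
	}
}

// queuedCount offers msgs to a buffer of the given size that nobody drains
// and returns how many were accepted.
func queuedCount(size int, msgs []string) int {
	buf := make(chan string, size)
	n := 0
	for _, m := range msgs {
		if tryQueue(buf, m) {
			n++
		}
	}
	return n
}

func main() {
	big := strings.Repeat("x", 1<<20) // a 1 MiB message still occupies one slot
	fmt.Println(queuedCount(1, []string{big, "b"}))      // 1
	fmt.Println(queuedCount(2, []string{"a", "b", "c"})) // 2
}
```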

func WithoutAsyncFlush

func WithoutAsyncFlush() ChannelOption

WithoutAsyncFlush means that the Channel will not send any of the messages it is given; depending on the write buffer size, further sends will wait for the first message to be consumed. This is largely used for tests and is not intended for production use.

type ReceiveMessage

type ReceiveMessage struct {
	ChannelID string `json:"id"`
	Message   string `json:"message"`
}

ReceiveMessage is the data structure expected from the client.

type SendMessage

type SendMessage struct {
	Topic   string `json:"topic"`
	Message string `json:"message"`
}

SendMessage is the outgoing data structure being sent from the stream to the client.
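The struct tags on ReceiveMessage and SendMessage suggest the following JSON shapes. The example redeclares both types locally so it runs without the package; encodeSend and decodeReceive are hypothetical helpers, and this documentation does not state whether the package itself marshals or unmarshals these types.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the documented types.
type ReceiveMessage struct {
	ChannelID string `json:"id"`
	Message   string `json:"message"`
}

type SendMessage struct {
	Topic   string `json:"topic"`
	Message string `json:"message"`
}

// encodeSend returns the JSON form of a SendMessage.
func encodeSend(topic, message string) (string, error) {
	b, err := json.Marshal(SendMessage{Topic: topic, Message: message})
	return string(b), err
}

// decodeReceive parses a JSON payload into a ReceiveMessage.
func decodeReceive(payload string) (ReceiveMessage, error) {
	var m ReceiveMessage
	err := json.Unmarshal([]byte(payload), &m)
	return m, err
}

func main() {
	out, err := encodeSend("news", "hello")
	if err != nil {
		panic(err)
	}
	fmt.Println(out) // {"topic":"news","message":"hello"}

	in, err := decodeReceive(`{"id":"chan-1","message":"ping"}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(in.ChannelID, in.Message) // chan-1 ping
}
```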

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream is an abstracted WebSocket client connection pool with an API for interacting with the entire pool of client connections. A connection must be accepted into the pool before the pool can include it or listen to it.

func New

func New(ctx context.Context, opts ...StreamOption) *Stream

New returns a new instance of the Stream struct and kicks off the housekeeping loop, which ensures that all closed connections are removed from the Stream.

func (*Stream) Accept

func (s *Stream) Accept(w http.ResponseWriter, r *http.Request, channelKey string) error

Accept takes ownership of upgrading the HTTP server connection to the WebSocket protocol and adding the new connection to the Stream's channel pool.
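A typical place to call Accept is inside an HTTP handler. The sketch below wires that up against a small interface so it runs without the package; the acceptor interface, wsHandler, fakeStream, and the use of an "id" query parameter as the channel key are all assumptions made for illustration. A *Stream should satisfy the interface, since the method signature is copied from the documentation above.

```go
package main

import (
	"fmt"
	"log"
	"net/http"
	"net/http/httptest"
)

// acceptor is a hypothetical interface matching the documented Accept signature.
type acceptor interface {
	Accept(w http.ResponseWriter, r *http.Request, channelKey string) error
}

// wsHandler passes each request to Accept, keyed by the "id" query
// parameter (an assumed convention, not something the package prescribes).
func wsHandler(s acceptor) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		key := r.URL.Query().Get("id")
		if key == "" {
			http.Error(w, "missing id", http.StatusBadRequest)
			return
		}
		if err := s.Accept(w, r, key); err != nil {
			log.Printf("accept %q: %v", key, err)
		}
	}
}

// fakeStream records the key it was given so the handler can be exercised
// without performing a real WebSocket upgrade.
type fakeStream struct{ lastKey string }

func (f *fakeStream) Accept(w http.ResponseWriter, r *http.Request, channelKey string) error {
	f.lastKey = channelKey
	return nil
}

// acceptedKey runs one request through the handler and returns the key
// that reached Accept ("" if Accept was never called).
func acceptedKey(target string) string {
	f := &fakeStream{}
	req := httptest.NewRequest(http.MethodGet, target, nil)
	wsHandler(f).ServeHTTP(httptest.NewRecorder(), req)
	return f.lastKey
}

func main() {
	fmt.Printf("%q\n", acceptedKey("/ws?id=user-42")) // "user-42"
	fmt.Printf("%q\n", acceptedKey("/ws"))            // ""
}
```

In a real server the handler would be registered with something like http.Handle("/ws", wsHandler(s)), where s is the value returned by New.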

func (*Stream) Connections

func (s *Stream) Connections() int

Connections returns the number of valid channels that are in the Stream.

func (*Stream) Publish

func (s *Stream) Publish(payload string)

Publish sends a string payload to all of the open channels in the Stream's channel pool and sets a per-channel deadline based on channelTimeout.
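A fan-out with a deadline per channel could look roughly like the sketch below. It is an illustration only: publish, demoFailed, the map-based pool, and the sequential loop are all assumptions, and the package may well publish concurrently or handle slow channels differently.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"time"
)

// publish tries to queue payload on every channel in pool, allowing each one
// up to timeout. It returns the sorted IDs that did not accept the payload.
func publish(pool map[string]chan string, payload string, timeout time.Duration) []string {
	var failed []string
	for id, ch := range pool {
		ctx, cancel := context.WithTimeout(context.Background(), timeout)
		select {
		case ch <- payload:
		case <-ctx.Done():
			failed = append(failed, id)
		}
		cancel()
	}
	sort.Strings(failed)
	return failed
}

// demoFailed publishes to one channel with free space and one that is
// already full and never drained.
func demoFailed() []string {
	full := make(chan string, 1)
	full <- "backlog"
	pool := map[string]chan string{
		"fast": make(chan string, 1),
		"slow": full,
	}
	return publish(pool, "hello", 20*time.Millisecond)
}

func main() {
	fmt.Println(demoFailed()) // [slow]
}
```

The per-channel deadline keeps one stalled client from blocking delivery to the rest of the pool for longer than the timeout.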

func (*Stream) Read

func (s *Stream) Read() chan ReceiveMessage

Read returns the stream's read buffer, which it consumes from when handling messages from the stream's channels.

func (*Stream) Responder

func (s *Stream) Responder(handler func(m ReceiveMessage))

Responder is a blocking method that should be run in a goroutine for responding to and handling received messages.
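The reason to run Responder in a goroutine can be shown with a small stand-in: a loop that blocks while it feeds each message to a handler. Here respond and collect are hypothetical, plain strings replace ReceiveMessage, and ending the loop by closing the channel is an assumption of the sketch, since the documentation does not say how the real method stops.

```go
package main

import "fmt"

// respond blocks, calling handler for every message until in is closed.
func respond(in <-chan string, handler func(string)) {
	for m := range in {
		handler(m)
	}
}

// collect runs respond in a goroutine, feeds it msgs, and returns what the
// handler saw once the loop has finished.
func collect(msgs []string) []string {
	in := make(chan string)
	done := make(chan struct{})
	var got []string

	go func() {
		defer close(done)
		respond(in, func(m string) { got = append(got, m) })
	}()

	for _, m := range msgs {
		in <- m
	}
	close(in)
	<-done // wait for the handler loop before reading got
	return got
}

func main() {
	fmt.Println(collect([]string{"ping", "pong"})) // [ping pong]
}
```

With the real package the equivalent call would presumably be go s.Responder(handler), leaving the calling goroutine free to serve HTTP.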

type StreamOption

type StreamOption func(*Stream)

StreamOption is a type that allows configuration of the Stream type.

func WithReadBufferSize

func WithReadBufferSize(size int) StreamOption

WithReadBufferSize takes an int that sets the size of the underlying Go channel; passing 1 therefore limits the queue to 1 message at a time.

func WithUpgrader

func WithUpgrader(u websocket.Upgrader) StreamOption

WithUpgrader allows the stream to be configured with a custom gorilla websocket upgrader.
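StreamOption follows the functional options pattern. The sketch below reproduces the pattern with stand-in types so it runs on its own: stream, streamOption, withReadBufferSize, newStream, and readCap are lowercase placeholders, and the default capacity of 16 is invented for the example, since the package's real default is not documented here.

```go
package main

import "fmt"

// stream and streamOption are stand-ins for the package's Stream and StreamOption.
type stream struct {
	readBuff chan string
}

type streamOption func(*stream)

// withReadBufferSize mirrors the shape of WithReadBufferSize from the README.
func withReadBufferSize(size int) streamOption {
	return func(s *stream) {
		s.readBuff = make(chan string, size)
	}
}

// newStream applies a default first, then each option in order.
func newStream(opts ...streamOption) *stream {
	s := &stream{readBuff: make(chan string, 16)} // default of 16 is an assumption
	for _, opt := range opts {
		opt(s)
	}
	return s
}

// readCap returns the read buffer capacity that results from opts.
func readCap(opts ...streamOption) int {
	return cap(newStream(opts...).readBuff)
}

func main() {
	fmt.Println(readCap())                      // 16
	fmt.Println(readCap(withReadBufferSize(1))) // 1
}
```

Because options are applied in order, a later option that sets the same field overrides an earlier one.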
