spub

v0.0.1
Published: Dec 29, 2017 License: Apache-2.0 Imports: 4 Imported by: 1

README

Spub

What is Spub?

Spub is a simple, channel-based asynchronous broadcast publishing wrapper.

With Spub, you can send once and have any number of receivers get a copy of the data.

How do I use it?

First, create a spub.Pub by passing spub.New a time.Duration. This is the default timeout after which a send call is considered to have failed:

p := spub.New(time.Millisecond * 1)

You will want to begin receiving on the errors channel in a goroutine so you can handle timeouts and other delivery failures yourself. Spub reports several typed errors that contain the data that could not be sent and, where applicable, the ID of the Listener that could not be reached, so you can manage retries on your own.

Note that this errors channel will never be closed, because it is used to report issues during shutdown as well as during normal operation. Take care not to write code that waits for it to close.

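go func() {
    // Err() is never closed, so this loop runs for the life of the program.
    for err := range p.Err() {
        switch err.(type) {
        case spub.ErrPublishDeadline:
            // a listener did not receive the data before its timeout
        case spub.ErrShuttingDown:
            // the Pub was closing while the send was in flight
        case spub.ErrUnknownListener:
            // SendTo named a listener ID that is not registered
        }
    }
}()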
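
Because the errors channel never closes, a handler that needs to stop has to be told to stop some other way. The following is a minimal, self-contained sketch of one way to do that with a separate done channel. The deadlineErr type and the drain function are hypothetical stand-ins written for this example; they are not part of spub.

```go
package main

import "fmt"

// deadlineErr is a hypothetical stand-in for spub.ErrPublishDeadline.
type deadlineErr struct {
	Data       []byte
	ListenerID string
}

func (e deadlineErr) Error() string {
	return "publish deadline exceeded for listener " + e.ListenerID
}

// drain consumes errs until done is closed and returns the IDs of the
// listeners that missed a deadline. It never waits for errs to close.
func drain(errs <-chan error, done <-chan struct{}) []string {
	var failed []string
	for {
		select {
		case err := <-errs:
			if e, ok := err.(deadlineErr); ok {
				failed = append(failed, e.ListenerID)
			}
		case <-done:
			return failed
		}
	}
}

func main() {
	errs := make(chan error)
	done := make(chan struct{})
	result := make(chan []string)

	go func() { result <- drain(errs, done) }()

	errs <- deadlineErr{Data: []byte("x"), ListenerID: "a"}
	errs <- deadlineErr{Data: []byte("y"), ListenerID: "b"}
	close(done)

	fmt.Println(<-result) // [a b]
}
```

With a real Pub, the errs argument would be p.Err(), and done would be closed by whatever owns the handler's lifetime.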

You can then begin registering Listeners. Every Listener must have an ID, and that ID must be unique among all registered Listeners. This supports the case where one out of N Listeners fails to receive a message in time, or the Pub begins shutting down partway through a send, and you need to know which Listeners did not get the message.

l := spub.Listener{
    ID:      "unique-id",          // not optional
    C:       make(chan []byte),    // optional, will default to this value
    Timeout: time.Millisecond * 1, // optional, will default to the Pub timeout you specified
}

if err := p.Register(l); err != nil {
    // You probably didn't set the ID or you set a duplicate ID
}
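
To make the two registration failures concrete, here is a small self-contained sketch of the kind of bookkeeping that enforces them. The registry type, its register method, and the two sentinel errors are hypothetical names invented for this example; spub's own Register may be implemented quite differently.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	errMissingID   = errors.New("listener has no ID")
	errDuplicateID = errors.New("listener ID already registered")
)

// registry is a hypothetical stand-in for a publisher's listener table.
type registry struct {
	mu        sync.Mutex
	listeners map[string]chan []byte
}

func newRegistry() *registry {
	return &registry{listeners: make(map[string]chan []byte)}
}

// register rejects empty and duplicate IDs, and creates a channel when c is nil.
func (r *registry) register(id string, c chan []byte) error {
	if id == "" {
		return errMissingID
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.listeners[id]; exists {
		return errDuplicateID
	}
	if c == nil {
		c = make(chan []byte)
	}
	r.listeners[id] = c
	return nil
}

func main() {
	r := newRegistry()
	fmt.Println(r.register("unique-id", nil)) // <nil>
	fmt.Println(r.register("unique-id", nil)) // listener ID already registered
	fmt.Println(r.register("", nil))          // listener has no ID
}
```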

Now you can begin to Send to the Listeners. Note that any errors from Send or SendTo are always reported via the Err() channel, never synchronously. Both Send and SendTo are meant to behave in an asynchronous, non-blocking manner.

p.Send([]byte("some really important data"))
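
The asynchronous behaviour described above can be illustrated with a self-contained sketch: one goroutine per listener, each racing the channel send against a timer and reporting a failure on an error channel. The broadcast function, the sendErr type, and demoTimeout are assumptions made for this example and should not be read as spub's actual implementation.

```go
package main

import (
	"fmt"
	"time"
)

// demoTimeout is an arbitrary value chosen for this example.
const demoTimeout = 20 * time.Millisecond

// sendErr is a hypothetical error carrying the undelivered data and listener ID.
type sendErr struct {
	Data       []byte
	ListenerID string
}

func (e sendErr) Error() string { return "timed out sending to " + e.ListenerID }

// broadcast starts one goroutine per listener and returns immediately.
// Each goroutine either delivers b or reports a sendErr once timeout elapses.
func broadcast(listeners map[string]chan []byte, b []byte, timeout time.Duration, errs chan<- error) {
	for id, c := range listeners {
		go func(id string, c chan []byte) {
			timer := time.NewTimer(timeout)
			defer timer.Stop()
			select {
			case c <- b:
			case <-timer.C:
				errs <- sendErr{Data: b, ListenerID: id}
			}
		}(id, c)
	}
}

func main() {
	fast := make(chan []byte, 1) // buffered, so the send can complete at once
	slow := make(chan []byte)    // unbuffered with no reader, so the send times out
	errs := make(chan error, 1)

	broadcast(map[string]chan []byte{"fast": fast, "slow": slow},
		[]byte("some really important data"), demoTimeout, errs)

	fmt.Printf("%s\n", <-fast)
	fmt.Println(<-errs)
}
```

The caller of broadcast never blocks on a slow listener; the only place a failure becomes visible is the error channel, which mirrors the contract the README describes for Send.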

If you receive any errors in your Error handler, you can use this method to attempt to retry:

// err arrives as a plain error; ErrShuttingDown and ErrUnknownListener carry the same Data and ListenerID fields.
if e, ok := err.(spub.ErrPublishDeadline); ok {
    p.SendTo(e.Data, e.ListenerID)
}
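
Since a retry can fail and produce another error, an unconditional retry in the error handler can loop forever. A minimal sketch of capping retries per listener follows. The deliveryErr type and the retryBudget helper are hypothetical and exist only in this example; the comment marks where a call to p.SendTo would go.

```go
package main

import "fmt"

// deliveryErr is a hypothetical stand-in for the spub error types that
// carry Data and ListenerID.
type deliveryErr struct {
	Data       []byte
	ListenerID string
}

func (e deliveryErr) Error() string { return "delivery failed for " + e.ListenerID }

// retryBudget caps how many times each listener may be retried.
type retryBudget struct {
	max      int
	attempts map[string]int
}

func newRetryBudget(max int) *retryBudget {
	return &retryBudget{max: max, attempts: make(map[string]int)}
}

// next returns the payload and listener to retry, or ok=false when err is
// not a deliveryErr or the listener has used up its attempts.
func (r *retryBudget) next(err error) (data []byte, id string, ok bool) {
	e, isDelivery := err.(deliveryErr)
	if !isDelivery || r.attempts[e.ListenerID] >= r.max {
		return nil, "", false
	}
	r.attempts[e.ListenerID]++
	return e.Data, e.ListenerID, true
}

func main() {
	budget := newRetryBudget(2)
	err := deliveryErr{Data: []byte("payload"), ListenerID: "unique-id"}
	for i := 0; i < 3; i++ {
		if data, id, ok := budget.next(err); ok {
			fmt.Printf("retry %q -> %s\n", data, id) // p.SendTo(data, id) would go here
		} else {
			fmt.Println("giving up")
		}
	}
}
```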

Documentation

Overview

Package spub provides a simple, channel-based asynchronous broadcast publisher.

Types

type ErrDuplicateListenerID

type ErrDuplicateListenerID struct {
	ListenerID string
}

ErrDuplicateListenerID is returned when you register a Listener whose ID is already in use

func (ErrDuplicateListenerID) Error

func (e ErrDuplicateListenerID) Error() string

type ErrListenerWithoutID

type ErrListenerWithoutID struct {
	ListenerID string
}

ErrListenerWithoutID is returned when you register a listener without an ID

func (ErrListenerWithoutID) Error

func (e ErrListenerWithoutID) Error() string

type ErrPublishDeadline

type ErrPublishDeadline struct {
	Err        error
	Data       []byte
	ListenerID string
}

ErrPublishDeadline is returned when the message cannot be published due to timeout

func (ErrPublishDeadline) Error

func (e ErrPublishDeadline) Error() string

type ErrShuttingDown

type ErrShuttingDown struct {
	Data       []byte
	ListenerID string
}

ErrShuttingDown is returned when the message cannot be published due to shutdown

func (ErrShuttingDown) Error

func (e ErrShuttingDown) Error() string

type ErrUnknownListener

type ErrUnknownListener struct {
	Data       []byte
	ListenerID string
}

ErrUnknownListener is returned when the message cannot be published due to a listener being unknown by ID

func (ErrUnknownListener) Error

func (e ErrUnknownListener) Error() string

type Listener

type Listener struct {
	ID      string
	C       chan []byte
	Timeout time.Duration
	// contains filtered or unexported fields
}

Listener is a channel to receive bytes from and a context for cancellation

func (*Listener) Close

func (l *Listener) Close()

Close closes the listener

func (*Listener) Closed

func (l *Listener) Closed() bool

Closed returns whether or not the Listener is closed
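
The Listener description mentions a context for cancellation. As a rough illustration of how a Close and Closed pair can be built on a context, here is a self-contained sketch. The listener type below is a hypothetical stand-in, and it is only an assumption that spub's unexported fields work along these lines.

```go
package main

import (
	"context"
	"fmt"
)

// listener is a hypothetical stand-in, not the spub.Listener implementation.
type listener struct {
	ctx    context.Context
	cancel context.CancelFunc
}

func newListener() *listener {
	ctx, cancel := context.WithCancel(context.Background())
	return &listener{ctx: ctx, cancel: cancel}
}

// Close cancels the context; calling it more than once is safe.
func (l *listener) Close() { l.cancel() }

// Closed reports whether Close has been called.
func (l *listener) Closed() bool { return l.ctx.Err() != nil }

func main() {
	l := newListener()
	fmt.Println(l.Closed()) // false
	l.Close()
	l.Close() // second call is a no-op
	fmt.Println(l.Closed()) // true
}
```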

type Pub

type Pub struct {
	// contains filtered or unexported fields
}

Pub registers listeners and produces errors

func New

func New(d time.Duration) *Pub

New returns a new Pub

func (*Pub) Close

func (p *Pub) Close()

Close closes the Pub, presumably after draining the listeners

func (*Pub) Err

func (p *Pub) Err() chan error

Err returns a channel that emits errors

func (*Pub) Register

func (p *Pub) Register(l Listener) error

Register will register a listener

func (*Pub) RemoveListener

func (p *Pub) RemoveListener(id string) error

RemoveListener turns a listener off and closes its channel

func (*Pub) Send

func (p *Pub) Send(b []byte)

Send will produce a message to the listeners. Check Err() for errors

func (*Pub) SendTo

func (p *Pub) SendTo(b []byte, id string)

SendTo allows you to send to just 1 specific listener, good for retrying failures

Directories

Path Synopsis
cmd
