broker

package module
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 16, 2022 License: Apache-2.0 Imports: 5 Imported by: 0

README

Broker - a minimal pubsub

GoDoc Reference

pubsub is a very small library for implementing the simplest possible publish-subscribe mechanism for Go using channels.

Usage

import "github.com/borud/broker"
Creating a new broker
broker := New(Config{
    DownStreamChanLen:  100,
    PublishChanLen:     100,
    SubscribeChanLen:   10,
    UnsubscribeChanLen: 10,
    DeliveryTimeout:    10*time.Millisecond,
})

The configuration options are

  • DownStreamChanLen is the length of the channel used to send messages to the subscriber.

  • PublishChanLen is the length of the incoming channel used by Publish()

  • SubscribeChanLen is the length of the channel that accepts Subscribe() requests

  • UnsubscribeChanLen is the length of the channel that accepts unsubscribe requests.

  • DeliveryTimeout is the timeout before giving up delivering a message to a subscriber

Subscribe to topic or topic prefix
sub, err := broker.Subscribe("/foo/bar")
Fetch messages from subscription
for msg := range sub.Messages() {
    log.Printf("topic = '%s', message = '%+v'", msg.Topic, msg.Payload)
}
Publish message to broker with timeout
err := broker.Publish("/foo", "some payload", 300 * time.Millisecond)
Cancel a subscription
err := sub.Cancel()
Shut down broker
broker.Shutdown()
Topics

Topics are entirely dynamic, meaning that a topic exists if there are subscribers listening to it. If a message is published to a topic that has no subscribers, nothing will happen and the message is silently discarded.

Topics are hierarchical and look like filesyste paths and matching is by path prefix.

/house/bedroom/light
/house/bedroom/temp
/house/kitchen/light
/house/kitchen/temp
/house/kitchen/humidity

Your subscription can be for any prefix of the path, including the full path. You will receive all messages that match your prefix. So for instance if you subscribe to /house/kitchen you will get all messages sent to

/house/kitchen
/house/kitchen/light
/house/kitchen/temp
/house/kitchen/humidity

If you subscribe to /house/kitchen/temp you will only get messages sent to this single topic since it has no children.

At this time no wildcard matching is supported.

Logging

Libraries shouldn't emit log messages, but sometimes you might want to output log messages if you suspect something funny is going on. You can register your own logger via the Config type, like this:

b := New(Config{Logger: log.Printf})

The Logger is of type Printfer (a Printf'er to use the naming conventions of go), which looks like this:

func(string, ...interface{})

...which happens to be the signature of log.Printf (but not fmt.Printf).

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrBrokerClosed = errors.New("Broker has been closed")

ErrBrokerClosed the broker has been closed

View Source
var ErrTimedTimedOut = errors.New("operation timed out")

ErrTimedTimedOut operation timed out

Functions

This section is empty.

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker represents the message broker.

func New

func New(config Config) *Broker

New returns a new broker.

func (*Broker) Counts added in v0.1.4

func (b *Broker) Counts() (deliveryCount uint64, droppedCount uint64)

Counts returns the delivery count and dropped count respectively

func (*Broker) Publish

func (b *Broker) Publish(topic string, payload interface{}, timeout time.Duration) error

Publish a payload to topic.

func (*Broker) Shutdown

func (b *Broker) Shutdown()

Shutdown broker

func (*Broker) Subscribe

func (b *Broker) Subscribe(topicName string) (*Subscriber, error)

Subscribe creates a subscription and asks the broker to add it to its subscribers.

type Config added in v1.0.0

type Config struct {
	// DownStreamChanLen is the length of the channel used to send messages to the subscriber
	DownStreamChanLen int
	// PublishChanLen is the length of the incoming channel used by Publish()
	PublishChanLen int
	// SubscribeChanLen is the length of the channel that accepts Subscribe() requests
	SubscribeChanLen int
	// UnsubscribeChanLen is the length of the channel that accepts unsubscribe requests.
	// This is used by the Cancel() call on Subscriber
	UnsubscribeChanLen int
	// DeliveryTimeout is the timeout before giving up delivering a message to a subscriber
	DeliveryTimeout time.Duration
	// Logger provides a logger for logging errors.  Libraries shouldn't log so
	// this is a compromise.
	Logger Printfer
}

Config contains the broker configuration.

type Message

type Message struct {
	Topic   string
	Payload interface{}
}

Message contains the topic name the message was sent to and the payload.

type Printfer added in v1.0.2

type Printfer func(string, ...interface{})

Printfer is a Printf'er - that is, you can give it a function that looks like log.Printf.

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber represents a subscription to a topic or topic prefix.

func (*Subscriber) Cancel

func (s *Subscriber) Cancel() error

Cancel cancels the subscription

func (*Subscriber) Messages

func (s *Subscriber) Messages() <-chan Message

Messages returns the message channel.

Directories

Path Synopsis
_examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL