Documentation ¶
Overview ¶
Package brokerutil provides a common interface to message brokers for pub-sub applications.
Use brokerutil to build pub-sub applications that do not depend heavily on a particular message broker's driver implementation. brokerutil provides a common interface that lets developers switch message brokers without rewriting major parts of the application's pub-sub logic.
Index ¶
- Variables
- type BlockingPubSubDriver
- type ConcurrentPubSubDriver
- type DriverAwarePubSub
- func (a *DriverAwarePubSub) ListenAsync() chan error
- func (a *DriverAwarePubSub) ListenSync() error
- func (a *DriverAwarePubSub) Publish(msg interface{}) error
- func (a *DriverAwarePubSub) PublishWithTarget(msg interface{}, target PubSubDriver) error
- func (a *DriverAwarePubSub) SubscribeAsync(fn SubscriberFunc) (chan error, SubscriberIdentifier)
- func (a *DriverAwarePubSub) SubscribeAsyncWithSource(fn SubscriberFuncWithSource) (chan error, SubscriberIdentifier)
- func (a *DriverAwarePubSub) SubscribeSync(fn SubscriberFunc) error
- func (a *DriverAwarePubSub) SubscribeSyncWithSource(fn SubscriberFuncWithSource) error
- func (a *DriverAwarePubSub) Terminate() error
- func (a *DriverAwarePubSub) Unsubscribe(identifier SubscriberIdentifier)
- func (a *DriverAwarePubSub) UnsubscribeAll()
- type Flag
- type PubSub
- func (a *PubSub) ListenAsync() chan error
- func (a *PubSub) ListenSync() error
- func (a *PubSub) Publish(msg interface{}) error
- func (a *PubSub) SubscribeAsync(fn SubscriberFunc) (chan error, SubscriberIdentifier)
- func (a *PubSub) SubscribeSync(fn SubscriberFunc) error
- func (a *PubSub) Terminate() error
- func (a *PubSub) Unsubscribe(identifier SubscriberIdentifier)
- func (a *PubSub) UnsubscribeAll()
- type PubSubDriver
- type SubscriberFunc
- type SubscriberFuncWithSource
- type SubscriberIdentifier
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrConcurrentDriverCast is the error thrown when a cast to a concurrent driver
	// failed
	ErrConcurrentDriverCast = errors.New("could not cast driver to concurrency driver")

	// ErrBlockingDriverCast is the error thrown when a cast to a blocking driver
	// failed
	ErrBlockingDriverCast = errors.New("could not cast driver to blocking driver")

	// ErrMissingExecutionFlag is the error thrown when the GetDriverFlags() function
	// returned an array / slice missing an execution flag
	ErrMissingExecutionFlag = errors.New("could not find execution flag")
)
Functions ¶
This section is empty.
Types ¶
type BlockingPubSubDriver ¶
type BlockingPubSubDriver interface {
	PubSubDriver

	// ReceiveMessage is called by the driver consumer to
	// receive a message
	//
	// ReceiveMessage can be blocking
	ReceiveMessage() (interface{}, error)

	// PublishMessage is called by the driver consumer to
	// publish a message.
	PublishMessage(interface{}) error
}
BlockingPubSubDriver is the implementation contract for a pub sub driver which does not support concurrent use.
ReceiveMessage() and PublishMessage() can both be blocking. While one of them blocks, no other message is sent or published, which honors the restriction that the driver must not be used concurrently.
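To make the contract concrete, here is a minimal sketch of a type that satisfies BlockingPubSubDriver. The interface and Flag declarations are copied from this page so the example compiles on its own; loopbackDriver, errClosed and errEmpty are hypothetical names invented for illustration and are not part of brokerutil. A real driver would typically block in ReceiveMessage until the broker delivers a message; this in-memory stand-in returns an error instead.

```go
package main

import (
	"errors"
	"fmt"
)

// Flag and the two interfaces mirror the declarations documented on this
// page so that the sketch compiles without importing brokerutil.
type Flag int

const (
	ConcurrentExecution Flag = iota
	BlockingExecution
)

type PubSubDriver interface {
	GetDriverFlags() []Flag
	CloseStream() error
	OpenStream() error
}

type BlockingPubSubDriver interface {
	PubSubDriver
	ReceiveMessage() (interface{}, error)
	PublishMessage(interface{}) error
}

// Hypothetical errors used only by this sketch.
var (
	errClosed = errors.New("stream is not open")
	errEmpty  = errors.New("no message queued")
)

// loopbackDriver is a hypothetical driver: published messages are queued
// in memory and handed back by ReceiveMessage in FIFO order.
type loopbackDriver struct {
	open  bool
	queue []interface{}
}

func (d *loopbackDriver) GetDriverFlags() []Flag { return []Flag{BlockingExecution} }
func (d *loopbackDriver) OpenStream() error      { d.open = true; return nil }
func (d *loopbackDriver) CloseStream() error     { d.open = false; return nil }

func (d *loopbackDriver) PublishMessage(msg interface{}) error {
	if !d.open {
		return errClosed
	}
	d.queue = append(d.queue, msg)
	return nil
}

func (d *loopbackDriver) ReceiveMessage() (interface{}, error) {
	if !d.open {
		return nil, errClosed
	}
	if len(d.queue) == 0 {
		return nil, errEmpty
	}
	msg := d.queue[0]
	d.queue = d.queue[1:]
	return msg, nil
}

// Compile-time check that the sketch satisfies the interface.
var _ BlockingPubSubDriver = (*loopbackDriver)(nil)

func main() {
	d := &loopbackDriver{}
	_ = d.OpenStream()
	_ = d.PublishMessage("hello")
	msg, err := d.ReceiveMessage()
	fmt.Println(msg, err)
	_ = d.CloseStream()
}
```

The driver keeps no locks because, per the contract above, the consumer is expected never to call it from more than one goroutine at a time.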
type ConcurrentPubSubDriver ¶
type ConcurrentPubSubDriver interface {
	PubSubDriver

	// GetMessageWriterChannel is called by the driver consumer
	// to get the writer channel of the driver.
	//
	// Messages written to the channel are to be sent to the
	// message broker by the driver.
	GetMessageWriterChannel() (chan<- interface{}, error)

	// GetMessageReaderChannel is called by the driver consumer
	// to get the reader channel of the driver.
	//
	// Received messages from the message broker are to be written
	// to this channel by the driver.
	GetMessageReaderChannel() (<-chan interface{}, error)
}
ConcurrentPubSubDriver is the implementation contract for a pub sub driver which does support concurrent use.
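As an illustration of the channel-based contract, the following sketch implements ConcurrentPubSubDriver with a single buffered channel that echoes every written message back to the reader. The declarations are copied from this page; echoDriver and newEchoDriver are hypothetical names, and the buffer size of 8 is an arbitrary assumption. A real driver would forward writes to the broker and feed the reader channel from the broker's deliveries.

```go
package main

import "fmt"

// Declarations mirrored from this page so the sketch is self-contained.
type Flag int

const (
	ConcurrentExecution Flag = iota
	BlockingExecution
)

type PubSubDriver interface {
	GetDriverFlags() []Flag
	CloseStream() error
	OpenStream() error
}

type ConcurrentPubSubDriver interface {
	PubSubDriver
	GetMessageWriterChannel() (chan<- interface{}, error)
	GetMessageReaderChannel() (<-chan interface{}, error)
}

// echoDriver is a hypothetical driver that pipes every written message
// straight back to the reader channel, standing in for a real broker.
type echoDriver struct {
	pipe chan interface{}
}

func newEchoDriver() *echoDriver {
	return &echoDriver{pipe: make(chan interface{}, 8)}
}

func (d *echoDriver) GetDriverFlags() []Flag { return []Flag{ConcurrentExecution} }
func (d *echoDriver) OpenStream() error      { return nil }
func (d *echoDriver) CloseStream() error     { close(d.pipe); return nil }

// Both accessors expose the same channel, restricted to one direction.
func (d *echoDriver) GetMessageWriterChannel() (chan<- interface{}, error) {
	return d.pipe, nil
}

func (d *echoDriver) GetMessageReaderChannel() (<-chan interface{}, error) {
	return d.pipe, nil
}

// Compile-time check that the sketch satisfies the interface.
var _ ConcurrentPubSubDriver = (*echoDriver)(nil)

func main() {
	d := newEchoDriver()
	w, _ := d.GetMessageWriterChannel()
	r, _ := d.GetMessageReaderChannel()
	w <- "ping"
	fmt.Println(<-r)
	_ = d.CloseStream()
}
```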
type DriverAwarePubSub ¶
type DriverAwarePubSub struct {
// contains filtered or unexported fields
}
DriverAwarePubSub is an extension of PubSub for multiple drivers. It lets its consumers control which broker a message is sent to and be informed which broker a message was received from.
func NewDriverAwarePubSub ¶
func NewDriverAwarePubSub(drivers ...PubSubDriver) (*DriverAwarePubSub, error)
NewDriverAwarePubSub creates a new DriverAwarePubSub from the provided drivers
func (*DriverAwarePubSub) ListenAsync ¶
func (a *DriverAwarePubSub) ListenAsync() chan error
ListenAsync starts the relay goroutine which uses the provided drivers to communicate with the message broker.
func (*DriverAwarePubSub) ListenSync ¶
func (a *DriverAwarePubSub) ListenSync() error
ListenSync starts relay loops which use the provided drivers to communicate with the message broker.
func (*DriverAwarePubSub) Publish ¶
func (a *DriverAwarePubSub) Publish(msg interface{}) error
Publish sends a message to the message broker.
func (*DriverAwarePubSub) PublishWithTarget ¶
func (a *DriverAwarePubSub) PublishWithTarget(msg interface{}, target PubSubDriver) error
PublishWithTarget sends a message to the message broker. The target argument specifies the driver through which the message is sent.
func (*DriverAwarePubSub) SubscribeAsync ¶
func (a *DriverAwarePubSub) SubscribeAsync(fn SubscriberFunc) (chan error, SubscriberIdentifier)
SubscribeAsync registers a new callback function which is invoked on every incoming message.
It returns an error chan which receives all errors returned by the SubscriberFunc. A nil error indicates unsubscription. Use the SubscriberIdentifier to unsubscribe later.
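The nil-as-unsubscribe convention can be handled with a small loop on the consumer side. The sketch below does not call brokerutil; it feeds a plain channel by hand to stand in for the one returned by SubscribeAsync. collectUntilUnsubscribed and errHandler are hypothetical names used only here.

```go
package main

import (
	"errors"
	"fmt"
)

// errHandler is a hypothetical error a SubscriberFunc might return.
var errHandler = errors.New("handler failed")

// collectUntilUnsubscribed reads from an error channel of the kind
// described above and returns every non-nil error received before the
// nil error that signals unsubscription (or before the channel closes).
func collectUntilUnsubscribed(errs <-chan error) []error {
	var seen []error
	for err := range errs {
		if err == nil {
			break
		}
		seen = append(seen, err)
	}
	return seen
}

func main() {
	// Stand-in for the channel returned by SubscribeAsync.
	errs := make(chan error, 3)
	errs <- errHandler
	errs <- errHandler
	errs <- nil // what Unsubscribe is documented to send

	for _, err := range collectUntilUnsubscribed(errs) {
		fmt.Println("subscriber error:", err)
	}
}
```

In a real program this loop would usually run in its own goroutine so that the subscriber's errors are observed while the relay keeps running.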
func (*DriverAwarePubSub) SubscribeAsyncWithSource ¶
func (a *DriverAwarePubSub) SubscribeAsyncWithSource(fn SubscriberFuncWithSource) (chan error, SubscriberIdentifier)
SubscribeAsyncWithSource registers a new callback function which is invoked on every incoming message together with the driver it came from (its source).
It returns an error chan which receives all errors returned by the SubscriberFuncWithSource. A nil error indicates unsubscription. Use the SubscriberIdentifier to unsubscribe later.
func (*DriverAwarePubSub) SubscribeSync ¶
func (a *DriverAwarePubSub) SubscribeSync(fn SubscriberFunc) error
SubscribeSync registers a new callback function like SubscribeAsync().
It blocks until an error or nil is received on the error chan, then returns it.
func (*DriverAwarePubSub) SubscribeSyncWithSource ¶
func (a *DriverAwarePubSub) SubscribeSyncWithSource(fn SubscriberFuncWithSource) error
SubscribeSyncWithSource registers a new callback function like SubscribeAsyncWithSource(): the callback is also invoked with the driver the message was received from.
It blocks until an error or nil is received on the error chan, then returns it.
func (*DriverAwarePubSub) Terminate ¶
func (a *DriverAwarePubSub) Terminate() error
Terminate sends a termination signal so that the blocking Listen call is released.
Subscribers will be unsubscribed as well.
func (*DriverAwarePubSub) Unsubscribe ¶
func (a *DriverAwarePubSub) Unsubscribe(identifier SubscriberIdentifier)
Unsubscribe removes a previously added callback function from the invocation loop.
Use the SubscriberIdentifier created when calling SubscribeAsync(). It will send a nil error on the callback function's error chan.
func (*DriverAwarePubSub) UnsubscribeAll ¶
func (a *DriverAwarePubSub) UnsubscribeAll()
UnsubscribeAll removes all added callback functions from the invocation loop.
It will send a nil error on each callback function's error chan.
type Flag ¶
type Flag int
Flag should reflect the ability of a driver to be used in concurrent environments
const (
	// ConcurrentExecution is the Flag value used to
	// indicate that the pub sub driver supports concurrent
	// use
	ConcurrentExecution Flag = iota

	// BlockingExecution is the Flag value used to
	// indicate that the pub sub driver does not support
	// concurrent use
	BlockingExecution Flag = iota
)
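The following sketch shows one way a consumer could look for an execution flag in the slice returned by GetDriverFlags() and report ErrMissingExecutionFlag when none is present. executionFlag is a hypothetical helper written for this example; how brokerutil itself performs the lookup is not shown on this page, so treat the logic as an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Declarations mirrored from this page so the sketch is self-contained.
type Flag int

const (
	ConcurrentExecution Flag = iota
	BlockingExecution
)

var ErrMissingExecutionFlag = errors.New("could not find execution flag")

// executionFlag is a hypothetical helper: it returns the first execution
// flag found in flags, or ErrMissingExecutionFlag if there is none.
// The Flag(-1) result accompanying an error is a placeholder only.
func executionFlag(flags []Flag) (Flag, error) {
	for _, f := range flags {
		if f == ConcurrentExecution || f == BlockingExecution {
			return f, nil
		}
	}
	return Flag(-1), ErrMissingExecutionFlag
}

func main() {
	if f, err := executionFlag([]Flag{BlockingExecution}); err == nil {
		fmt.Println("blocking driver:", f == BlockingExecution)
	}
	if _, err := executionFlag(nil); err != nil {
		fmt.Println("error:", err)
	}
}
```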
type PubSub ¶
type PubSub struct {
// contains filtered or unexported fields
}
PubSub is the common "gateway" for interacting with the message broker, for example to publish or subscribe. Independently of the driver's implementation, it guarantees that the exposed functions work as expected.
func NewPubSubFromDriver ¶
func NewPubSubFromDriver(d PubSubDriver) (*PubSub, error)
NewPubSubFromDriver creates a new PubSub from the provided driver
Depending on the implementation of the driver (single- or multithreaded) a different PubSub implementation will be chosen.
func NewPubSubFromDrivers ¶
func NewPubSubFromDrivers(drivers ...PubSubDriver) (*PubSub, error)
NewPubSubFromDrivers creates a new PubSub from the provided drivers
Only the first driver is used to publish messages. For further functionality, use DriverAwarePubSub.
func (*PubSub) ListenAsync ¶
func (a *PubSub) ListenAsync() chan error
ListenAsync starts the relay goroutine which uses the provided driver to communicate with the message broker.
func (*PubSub) ListenSync ¶
func (a *PubSub) ListenSync() error
ListenSync starts relay loops which use the provided driver to communicate with the message broker.
func (*PubSub) SubscribeAsync ¶
func (a *PubSub) SubscribeAsync(fn SubscriberFunc) (chan error, SubscriberIdentifier)
SubscribeAsync registers a new callback function which is invoked on every incoming message.
It returns an error chan which receives all errors returned by the SubscriberFunc. A nil error indicates that the subscriber was unsubscribed, for example after a call to UnsubscribeAll(). Use the SubscriberIdentifier to unsubscribe later.
func (*PubSub) SubscribeSync ¶
func (a *PubSub) SubscribeSync(fn SubscriberFunc) error
SubscribeSync registers a new callback function like SubscribeAsync().
It blocks until an error or nil is received on the error chan, then returns it.
func (*PubSub) Terminate ¶
func (a *PubSub) Terminate() error
Terminate sends a termination signal so that the blocking Listen call is released.
Subscribers will be unsubscribed as well.
func (*PubSub) Unsubscribe ¶
func (a *PubSub) Unsubscribe(identifier SubscriberIdentifier)
Unsubscribe removes a previously added callback function from the invocation loop.
Use the SubscriberIdentifier created when calling SubscribeAsync(). It will send a nil error on the callback function's error chan.
func (*PubSub) UnsubscribeAll ¶
func (a *PubSub) UnsubscribeAll()
UnsubscribeAll removes all added callback functions from the invocation loop.
It will send a nil error on each callback function's error chan.
type PubSubDriver ¶
type PubSubDriver interface {
	// GetDriverFlags should reflect the ability of the driver to
	// be used in concurrent environments such as multiple
	// goroutines pub'n'subbing concurrently
	GetDriverFlags() []Flag

	// CloseStream is called by the driver consumer when
	// the pub-sub stream is to be closed
	CloseStream() error

	// OpenStream is called by the driver consumer when
	// the pub-sub stream is to be opened
	OpenStream() error
}
PubSubDriver is the base contract every pub sub driver has to fulfil. BlockingPubSubDriver and ConcurrentPubSubDriver both embed it.
type SubscriberFunc ¶
type SubscriberFunc func(interface{}) error
SubscriberFunc is the type of a callback function
type SubscriberFuncWithSource ¶
type SubscriberFuncWithSource func(interface{}, PubSubDriver) error
SubscriberFuncWithSource is the type of a subscriber func with provided driver information
type SubscriberIdentifier ¶
type SubscriberIdentifier string
SubscriberIdentifier uniquely identifies a callback function
These identifiers are generated when a callback function is added to the internal subscriber map. Use an identifier to remove a subscriber from the scheduling rotation, which is commonly called unsubscribing.
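To illustrate the idea of a subscriber map keyed by SubscriberIdentifier, here is a minimal sketch. The registry type, its methods and the counter-based "sub-N" identifier format are all hypothetical; this page does not show how brokerutil generates identifiers or stores subscribers internally.

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
)

// Types mirrored from this page so the sketch is self-contained.
type SubscriberFunc func(interface{}) error
type SubscriberIdentifier string

// registry is a hypothetical stand-in for an internal subscriber map.
type registry struct {
	mu   sync.Mutex
	next int
	subs map[SubscriberIdentifier]SubscriberFunc
}

func newRegistry() *registry {
	return &registry{subs: make(map[SubscriberIdentifier]SubscriberFunc)}
}

// add stores fn under a freshly generated identifier and returns it.
func (r *registry) add(fn SubscriberFunc) SubscriberIdentifier {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.next++
	id := SubscriberIdentifier("sub-" + strconv.Itoa(r.next))
	r.subs[id] = fn
	return id
}

// remove deletes the subscriber; this is the "unsubscribe" step.
func (r *registry) remove(id SubscriberIdentifier) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.subs, id)
}

// dispatch invokes every registered callback with msg and returns how
// many callbacks ran. Callbacks are copied out first so they run
// without the lock held and may safely call add or remove.
func (r *registry) dispatch(msg interface{}) int {
	r.mu.Lock()
	fns := make([]SubscriberFunc, 0, len(r.subs))
	for _, fn := range r.subs {
		fns = append(fns, fn)
	}
	r.mu.Unlock()

	for _, fn := range fns {
		_ = fn(msg) // errors are ignored in this sketch
	}
	return len(fns)
}

func main() {
	r := newRegistry()
	id := r.add(func(msg interface{}) error {
		fmt.Println("received:", msg)
		return nil
	})
	r.dispatch("first")
	r.remove(id)
	fmt.Println("subscribers after remove:", r.dispatch("second"))
}
```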