brokerutil

v0.0.0-...-ba379e9
Published: Jun 10, 2018 License: MIT Imports: 4 Imported by: 2

README

brokerutil

brokerutil provides a common interface to message-brokers for pub-sub applications.

Use brokerutil to build pub-sub applications that interoperate with different message-broker drivers. brokerutil provides an abstract and extensible interface which enables developers to switch or mix message brokers without having to rewrite major parts of the application's pub-sub logic.

Features

  • sync / async subscription
  • multi-driver support - subscribe to messages from multiple brokers
  • simple yet extensible pub-sub interface
  • simple driver interface for implementing your own drivers

Preinstalled Drivers

Installation

go get github.com/jakoblorz/brokerutil

Use the package drivers or implement your own.

Example

This example uses Redis as the message broker. It uses the package's redis driver, which builds on github.com/go-redis/redis.

package main

import (
    "flag"
    "fmt"
    "log"

    r "github.com/go-redis/redis"
    "github.com/jakoblorz/brokerutil"
    "github.com/jakoblorz/brokerutil/redis"
)

var (
    raddr   = flag.String("raddr", ":6379", "redis address to connect to")
    channel = flag.String("channel", "brokerutil", "redis message channel")
)

func main() {
    flag.Parse()

    // create redis driver to support pub-sub
    driver, err := redis.NewRedisPubSubDriver([]string{*channel}, &r.Options{
        Addr: *raddr,
    })

    if err != nil {
        log.Fatalf("could not create redis driver: %v", err)
    }

    // create new pub sub using the initialized redis driver
    ps, err := brokerutil.NewPubSubFromDriver(driver)
    if err != nil {
        log.Fatalf("could not create pub sub: %v", err)
    }

    // run blocking subscribe as go routine
    go ps.SubscribeSync(func(msg interface{}) error {
        fmt.Printf("%v", msg)

        return nil
    })

    // start controller routine which blocks execution
    if err := ps.ListenSync(); err != nil {
        log.Fatalf("could not listen: %v", err)
    }
}

Documentation

Overview

Package brokerutil provides a common interface to message-brokers for pub-sub applications.

Use brokerutil to build pub-sub applications which do not depend heavily on a message-broker driver's implementation. brokerutil provides a common interface which enables developers to switch the message broker without having to rewrite major parts of the application's pub-sub logic.

Constants

This section is empty.

Variables

var (

	// ErrConcurrentDriverCast is the error returned when a cast to a concurrent driver
	// failed
	ErrConcurrentDriverCast = errors.New("could not cast driver to concurrency driver")

	// ErrBlockingDriverCast is the error returned when a cast to a blocking driver
	// failed
	ErrBlockingDriverCast = errors.New("could not cast driver to blocking driver")

	// ErrMissingExecutionFlag is the error returned when the GetDriverFlags() function
	// returned an array / slice missing an execution flag
	ErrMissingExecutionFlag = errors.New("could not find execution flag")
)
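
The two cast errors suggest that a driver is narrowed from PubSubDriver to one of the richer interfaces at runtime. The following is a minimal sketch of that idea, not the package's actual code: the interfaces and the error value are local copies of the documented declarations so the example compiles on its own, and bareDriver and asConcurrent are hypothetical names introduced for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the documented declarations; real code would
// import github.com/jakoblorz/brokerutil instead.
var ErrConcurrentDriverCast = errors.New("could not cast driver to concurrency driver")

type Flag int

type PubSubDriver interface {
	GetDriverFlags() []Flag
	CloseStream() error
	OpenStream() error
}

type ConcurrentPubSubDriver interface {
	PubSubDriver
	GetMessageWriterChannel() (chan<- interface{}, error)
	GetMessageReaderChannel() (<-chan interface{}, error)
}

// bareDriver is a hypothetical driver that implements only PubSubDriver.
type bareDriver struct{}

func (bareDriver) GetDriverFlags() []Flag { return nil }
func (bareDriver) CloseStream() error     { return nil }
func (bareDriver) OpenStream() error      { return nil }

// asConcurrent is a hypothetical helper: it narrows d with a type
// assertion and reports the sentinel error when the assertion fails.
func asConcurrent(d PubSubDriver) (ConcurrentPubSubDriver, error) {
	c, ok := d.(ConcurrentPubSubDriver)
	if !ok {
		return nil, ErrConcurrentDriverCast
	}
	return c, nil
}

func main() {
	_, err := asConcurrent(bareDriver{})
	if errors.Is(err, ErrConcurrentDriverCast) {
		fmt.Println("driver does not support concurrent use:", err)
	}
}
```

Because the errors are exported package-level values, callers can compare against them with errors.Is rather than matching on the message text.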

Functions

This section is empty.

Types

type BlockingPubSubDriver

type BlockingPubSubDriver interface {
	PubSubDriver

	// ReceiveMessage is called by the driver consumer to
	// receive a message
	//
	// ReceiveMessage can be blocking
	ReceiveMessage() (interface{}, error)

	// PublishMessage is called by the driver consumer to
	// publish a message.
	PublishMessage(interface{}) error
}

BlockingPubSubDriver is the implementation contract for a pub sub driver which does not support concurrent use.

ReceiveMessage() and PublishMessage() can both be blocking, but no message will be sent / published while one of them blocks, in keeping with the restriction that the driver must not be used concurrently.
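
To make the contract concrete, here is a minimal sketch of a driver that would satisfy it. The interfaces are local copies of the documented declarations so the example is self-contained; queueDriver, errClosed and errEmpty are hypothetical names. Unlike a real broker driver, this one returns an error instead of blocking when no message is available, and how the package reacts to such an error is not covered by the documentation above.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the documented declarations.
type Flag int

const (
	ConcurrentExecution Flag = iota
	BlockingExecution
)

type PubSubDriver interface {
	GetDriverFlags() []Flag
	CloseStream() error
	OpenStream() error
}

type BlockingPubSubDriver interface {
	PubSubDriver
	ReceiveMessage() (interface{}, error)
	PublishMessage(interface{}) error
}

var (
	errClosed = errors.New("stream is not open")
	errEmpty  = errors.New("no message available")
)

// queueDriver is a hypothetical in-memory driver: published messages
// are queued and handed back in order by ReceiveMessage.
type queueDriver struct {
	open  bool
	queue []interface{}
}

// Compile-time check that queueDriver satisfies the contract.
var _ BlockingPubSubDriver = (*queueDriver)(nil)

func (d *queueDriver) GetDriverFlags() []Flag { return []Flag{BlockingExecution} }
func (d *queueDriver) OpenStream() error      { d.open = true; return nil }
func (d *queueDriver) CloseStream() error     { d.open = false; return nil }

func (d *queueDriver) PublishMessage(msg interface{}) error {
	if !d.open {
		return errClosed
	}
	d.queue = append(d.queue, msg)
	return nil
}

func (d *queueDriver) ReceiveMessage() (interface{}, error) {
	if !d.open {
		return nil, errClosed
	}
	if len(d.queue) == 0 {
		return nil, errEmpty
	}
	msg := d.queue[0]
	d.queue = d.queue[1:]
	return msg, nil
}

func main() {
	d := &queueDriver{}
	if err := d.OpenStream(); err != nil {
		fmt.Println("open failed:", err)
		return
	}
	defer d.CloseStream()

	if err := d.PublishMessage("hello"); err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	msg, err := d.ReceiveMessage()
	fmt.Println(msg, err) // hello <nil>
}
```

The driver keeps no locks because the contract states that it is never used concurrently.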

type ConcurrentPubSubDriver

type ConcurrentPubSubDriver interface {
	PubSubDriver

	// GetMessageWriterChannel is called by the driver consumer
	// to get the writer channel of the driver.
	//
	// Messages written to the channel are to be sent to the
	// message broker by the driver.
	GetMessageWriterChannel() (chan<- interface{}, error)

	// GetMessageReaderChannel is called by the driver consumer
	// to get the reader channel of the driver.
	//
	// Received messages from the message broker are to be written
	// to this channel by the driver.
	GetMessageReaderChannel() (<-chan interface{}, error)
}

ConcurrentPubSubDriver is the implementation contract for a pub sub driver which does support concurrent use.
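
A channel-based driver can be sketched as follows. This is an illustration under stated assumptions rather than one of the package's drivers: the interfaces are local copies of the documented declarations, and loopbackDriver is a hypothetical type that simply relays everything written to its writer channel back to its reader channel.

```go
package main

import "fmt"

// Local stand-ins for the documented declarations.
type Flag int

const (
	ConcurrentExecution Flag = iota
	BlockingExecution
)

type PubSubDriver interface {
	GetDriverFlags() []Flag
	CloseStream() error
	OpenStream() error
}

type ConcurrentPubSubDriver interface {
	PubSubDriver
	GetMessageWriterChannel() (chan<- interface{}, error)
	GetMessageReaderChannel() (<-chan interface{}, error)
}

// loopbackDriver is a hypothetical driver without a real broker behind it.
type loopbackDriver struct {
	in   chan interface{}
	out  chan interface{}
	done chan struct{}
}

var _ ConcurrentPubSubDriver = (*loopbackDriver)(nil)

func newLoopbackDriver() *loopbackDriver {
	return &loopbackDriver{
		in:   make(chan interface{}),
		out:  make(chan interface{}),
		done: make(chan struct{}),
	}
}

func (d *loopbackDriver) GetDriverFlags() []Flag { return []Flag{ConcurrentExecution} }

// OpenStream starts the relay goroutine, which runs until CloseStream.
func (d *loopbackDriver) OpenStream() error {
	go func() {
		for {
			select {
			case msg := <-d.in:
				select {
				case d.out <- msg:
				case <-d.done:
					return
				}
			case <-d.done:
				return
			}
		}
	}()
	return nil
}

// CloseStream stops the relay goroutine. Calling it twice would panic;
// a production driver would guard against that.
func (d *loopbackDriver) CloseStream() error {
	close(d.done)
	return nil
}

func (d *loopbackDriver) GetMessageWriterChannel() (chan<- interface{}, error) {
	return d.in, nil
}

func (d *loopbackDriver) GetMessageReaderChannel() (<-chan interface{}, error) {
	return d.out, nil
}

func main() {
	d := newLoopbackDriver()
	_ = d.OpenStream()
	defer d.CloseStream()

	w, _ := d.GetMessageWriterChannel()
	r, _ := d.GetMessageReaderChannel()
	w <- "ping"
	fmt.Println(<-r) // ping
}
```

Returning directional channels keeps the two roles apart: the consumer can only send on the writer channel and only receive on the reader channel.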

type DriverAwarePubSub

type DriverAwarePubSub struct {
	// contains filtered or unexported fields
}

DriverAwarePubSub is an extension of PubSub for multiple drivers. It lets its consumers control which broker a message is sent to and learn which broker a message was received from.

func NewDriverAwarePubSub

func NewDriverAwarePubSub(drivers ...PubSubDriver) (*DriverAwarePubSub, error)

NewDriverAwarePubSub creates a new DriverAwarePubSub from the provided drivers

func (*DriverAwarePubSub) ListenAsync

func (a *DriverAwarePubSub) ListenAsync() chan error

ListenAsync starts the relay goroutine which uses the provided drivers to communicate with the message broker.

func (*DriverAwarePubSub) ListenSync

func (a *DriverAwarePubSub) ListenSync() error

ListenSync starts relay loops which use the provided drivers to communicate with the message broker.

func (*DriverAwarePubSub) Publish

func (a *DriverAwarePubSub) Publish(msg interface{}) error

Publish sends a message to the message broker.

func (*DriverAwarePubSub) PublishWithTarget

func (a *DriverAwarePubSub) PublishWithTarget(msg interface{}, target PubSubDriver) error

PublishWithTarget sends a message to the message broker. Specify the driver pointer to send the message to.

func (*DriverAwarePubSub) SubscribeAsync

func (a *DriverAwarePubSub) SubscribeAsync(fn SubscriberFunc) (chan error, SubscriberIdentifier)

SubscribeAsync registers a new callback function which is invoked on every incoming message.

It returns an error chan which will contain all errors that occur in or are returned by the SubscriberFunc. A nil error indicates unsubscription. Use the SubscriberIdentifier to unsubscribe later.

func (*DriverAwarePubSub) SubscribeAsyncWithSource

func (a *DriverAwarePubSub) SubscribeAsyncWithSource(fn SubscriberFuncWithSource) (chan error, SubscriberIdentifier)

SubscribeAsyncWithSource registers a new callback function which is invoked on every incoming message together with the driver pointer the message came from (its source).

It returns an error chan which will contain all errors that occur in or are returned by the SubscriberFuncWithSource. A nil error indicates unsubscription. Use the SubscriberIdentifier to unsubscribe later.
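
A source-aware callback can tell brokers apart by comparing the driver value it receives. The sketch below only exercises the callback type: PubSubDriver and SubscriberFuncWithSource are local copies of the documented declarations, namedDriver and countBySource are hypothetical, and the three direct calls in main stand in for deliveries that the pub sub would normally make.

```go
package main

import "fmt"

// Local stand-ins for the documented declarations.
type Flag int

type PubSubDriver interface {
	GetDriverFlags() []Flag
	CloseStream() error
	OpenStream() error
}

type SubscriberFuncWithSource func(interface{}, PubSubDriver) error

// namedDriver is a hypothetical no-op driver used only as a source marker.
type namedDriver struct{ name string }

func (d *namedDriver) GetDriverFlags() []Flag { return nil }
func (d *namedDriver) CloseStream() error     { return nil }
func (d *namedDriver) OpenStream() error      { return nil }

// countBySource returns a hypothetical callback that tallies messages per
// source driver, together with the tally it updates.
func countBySource() (SubscriberFuncWithSource, map[PubSubDriver]int) {
	counts := make(map[PubSubDriver]int)
	fn := func(_ interface{}, src PubSubDriver) error {
		counts[src]++
		return nil
	}
	return fn, counts
}

func main() {
	primary := &namedDriver{name: "primary"}
	backup := &namedDriver{name: "backup"}

	fn, counts := countBySource()

	// Simulated deliveries; the pub sub would invoke fn in real use.
	_ = fn("a", primary)
	_ = fn("b", backup)
	_ = fn("c", primary)

	fmt.Println(counts[primary], counts[backup]) // 2 1
}
```

Pointer-typed drivers compare by identity, so two drivers with identical settings still count as different sources.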

func (*DriverAwarePubSub) SubscribeSync

func (a *DriverAwarePubSub) SubscribeSync(fn SubscriberFunc) error

SubscribeSync registers a new callback function like SubscribeAsync().

It blocks until an error or nil is received on the error chan, then returns it.

func (*DriverAwarePubSub) SubscribeSyncWithSource

func (a *DriverAwarePubSub) SubscribeSyncWithSource(fn SubscriberFuncWithSource) error

SubscribeSyncWithSource registers a new callback function like SubscribeAsyncWithSource(): the callback is also invoked with the driver pointer the message was received from.

It blocks until an error or nil is received on the error chan, then returns it.

func (*DriverAwarePubSub) Terminate

func (a *DriverAwarePubSub) Terminate() error

Terminate sends a termination signal so that the blocking Listen call is released.

Subscribers will be unsubscribed as well.

func (*DriverAwarePubSub) Unsubscribe

func (a *DriverAwarePubSub) Unsubscribe(identifier SubscriberIdentifier)

Unsubscribe removes a previously added callback function from the invocation loop.

Use the SubscriberIdentifier created when calling SubscribeAsync(). It will send a nil error to the callback function's error chan.

func (*DriverAwarePubSub) UnsubscribeAll

func (a *DriverAwarePubSub) UnsubscribeAll()

UnsubscribeAll removes all added callback functions from the invocation loop.

It will send a nil error to each callback function's error chan.

type Flag

type Flag int

Flag should reflect the ability of a driver to be used in concurrent environments

const (

	// ConcurrentExecution is the Flag value used to
	// indicate that the pub sub driver supports concurrent
	// use
	ConcurrentExecution Flag = iota

	// BlockingExecution is the Flag value used to
	// indicate that the pub sub driver does not support
	// concurrent use
	BlockingExecution Flag = iota
)
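
Because iota counts the specifications inside a const block, the two declarations above evaluate to 0 and 1 even though iota is written out twice. The sketch below shows those values and one way a consumer might look for an execution flag in the slice returned by GetDriverFlags(). The declarations are local copies of the documented ones, and executionFlag is a hypothetical helper, not a function of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the documented declarations.
type Flag int

const (
	ConcurrentExecution Flag = iota // 0
	BlockingExecution   Flag = iota // 1
)

var ErrMissingExecutionFlag = errors.New("could not find execution flag")

// executionFlag is a hypothetical helper: it returns the first execution
// flag found in flags, or ErrMissingExecutionFlag if there is none.
func executionFlag(flags []Flag) (Flag, error) {
	for _, f := range flags {
		if f == ConcurrentExecution || f == BlockingExecution {
			return f, nil
		}
	}
	return 0, ErrMissingExecutionFlag
}

func main() {
	fmt.Println(ConcurrentExecution, BlockingExecution) // 0 1

	f, err := executionFlag([]Flag{BlockingExecution})
	fmt.Println(f == BlockingExecution, err) // true <nil>

	_, err = executionFlag(nil)
	fmt.Println(err) // could not find execution flag
}
```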

type PubSub

type PubSub struct {
	// contains filtered or unexported fields
}

PubSub is the common "gateway" for interacting with the message broker, for example to publish or subscribe. Independently of the driver's implementation, it guarantees that the exposed functions will work as expected.

func NewPubSubFromDriver

func NewPubSubFromDriver(d PubSubDriver) (*PubSub, error)

NewPubSubFromDriver creates a new PubSub from the provided driver

Depending on the implementation of the driver (single- or multithreaded) a different PubSub implementation will be chosen.

func NewPubSubFromDrivers

func NewPubSubFromDrivers(drivers ...PubSubDriver) (*PubSub, error)

NewPubSubFromDrivers creates a new PubSub from the provided drivers

Only the first driver is used to publish messages. For further functionality, use DriverAwarePubSub.

func (*PubSub) ListenAsync

func (a *PubSub) ListenAsync() chan error

ListenAsync starts the relay goroutine which uses the provided driver to communicate with the message broker.

func (*PubSub) ListenSync

func (a *PubSub) ListenSync() error

ListenSync starts relay loops which use the provided driver to communicate with the message broker.

func (*PubSub) Publish

func (a *PubSub) Publish(msg interface{}) error

Publish sends a message to the message broker.

func (*PubSub) SubscribeAsync

func (a *PubSub) SubscribeAsync(fn SubscriberFunc) (chan error, SubscriberIdentifier)

SubscribeAsync registers a new callback function which is invoked on every incoming message.

It returns an error chan which will contain all errors that occur in or are returned by the SubscriberFunc. A nil error indicates that the subscriber was unsubscribed automatically after a call to UnsubscribeAll(). Use the SubscriberIdentifier to unsubscribe later.
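
On the consumer side, the returned channel is typically read in a loop that stops at the first nil value. The following sketch shows that pattern with a hand-made channel standing in for the one returned by SubscribeAsync(); drainErrors and errHandler are hypothetical names, and the buffered channel is only there so the example can run without a second goroutine.

```go
package main

import (
	"errors"
	"fmt"
)

// errHandler is a hypothetical error that a SubscriberFunc might return.
var errHandler = errors.New("handler failed")

// drainErrors collects errors from errc until it reads a nil error, which
// the documentation describes as the unsubscription signal, or until the
// channel is closed.
func drainErrors(errc <-chan error) []error {
	var seen []error
	for err := range errc {
		if err == nil {
			break
		}
		seen = append(seen, err)
	}
	return seen
}

func main() {
	// Stand-in for the channel returned by SubscribeAsync().
	errc := make(chan error, 3)
	errc <- errHandler
	errc <- errHandler
	errc <- nil // simulated unsubscription

	fmt.Println(len(drainErrors(errc))) // 2
}
```

In a real application this loop would usually run in its own goroutine so that it does not hold up the code that publishes messages.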

func (*PubSub) SubscribeSync

func (a *PubSub) SubscribeSync(fn SubscriberFunc) error

SubscribeSync creates a new callback function like SubscribeAsync().

It blocks until an error or nil is received on the error chan, then returns it.

func (*PubSub) Terminate

func (a *PubSub) Terminate() error

Terminate sends a termination signal so that the blocking Listen call is released.

Subscribers will be unsubscribed as well.

func (*PubSub) Unsubscribe

func (a *PubSub) Unsubscribe(identifier SubscriberIdentifier)

Unsubscribe removes a previously added callback function from the invocation loop.

Use the SubscriberIdentifier created when calling SubscribeAsync(). It will send a nil error to the callback function's error chan.

func (*PubSub) UnsubscribeAll

func (a *PubSub) UnsubscribeAll()

UnsubscribeAll removes all added callback functions from the invocation loop.

It will send a nil error to each callback function's error chan.

type PubSubDriver

type PubSubDriver interface {

	// GetDriverFlags should reflect the ability of the driver to
	// be used in concurrent environments such as multiple
	// goroutines pub'n'subbing concurrently
	GetDriverFlags() []Flag

	// CloseStream is called by the driver consumer when
	// the pub-sub stream is to be closed
	CloseStream() error

	// OpenStream is called by the driver consumer when
	// the pub-sub stream is to be opened
	OpenStream() error
}

PubSubDriver is the minimal contract which every pub sub driver has to implement.

type SubscriberFunc

type SubscriberFunc func(interface{}) error

SubscriberFunc is the type of a callback function

type SubscriberFuncWithSource

type SubscriberFuncWithSource func(interface{}, PubSubDriver) error

SubscriberFuncWithSource is the type of a subscriber func with provided driver information

type SubscriberIdentifier

type SubscriberIdentifier string

SubscriberIdentifier uniquely identifies a callback function

These identifiers are generated when a callback function is added to the internal subscriber map. You can use an identifier to remove a subscriber from the scheduling rotation, which is commonly called unsubscribing.
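
The relationship between identifiers, the subscriber map and unsubscribing can be pictured with a small model. This is a sketch of the behaviour described above and makes no claim about the package's internals: registry, subscriber and the "sub-N" identifier format are all hypothetical, the model is not safe for concurrent use, and the error channel is buffered so that the example runs in a single goroutine.

```go
package main

import (
	"fmt"
	"strconv"
)

// Local copies of the documented types.
type SubscriberFunc func(interface{}) error
type SubscriberIdentifier string

// subscriber pairs a callback with its error channel (hypothetical).
type subscriber struct {
	fn   SubscriberFunc
	errc chan error
}

// registry is a hypothetical model of a subscriber map.
type registry struct {
	next int
	subs map[SubscriberIdentifier]subscriber
}

func newRegistry() *registry {
	return &registry{subs: make(map[SubscriberIdentifier]subscriber)}
}

// subscribe stores fn under a freshly generated identifier.
func (r *registry) subscribe(fn SubscriberFunc) (chan error, SubscriberIdentifier) {
	r.next++
	id := SubscriberIdentifier("sub-" + strconv.Itoa(r.next))
	errc := make(chan error, 8) // buffered; a full buffer would block senders
	r.subs[id] = subscriber{fn: fn, errc: errc}
	return errc, id
}

// unsubscribe removes the callback and signals this with a nil error.
// Unknown identifiers are ignored.
func (r *registry) unsubscribe(id SubscriberIdentifier) {
	s, ok := r.subs[id]
	if !ok {
		return
	}
	delete(r.subs, id)
	s.errc <- nil
}

// dispatch invokes every registered callback and forwards returned errors.
func (r *registry) dispatch(msg interface{}) {
	for _, s := range r.subs {
		if err := s.fn(msg); err != nil {
			s.errc <- err
		}
	}
}

func main() {
	r := newRegistry()
	var got []interface{}
	errc, id := r.subscribe(func(msg interface{}) error {
		got = append(got, msg)
		return nil
	})

	r.dispatch("first")
	r.unsubscribe(id)
	r.dispatch("second") // no longer delivered

	fmt.Println(got, <-errc) // [first] <nil>
}
```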
