bus: github.com/mustafaturan/bus

package bus

import "github.com/mustafaturan/bus"

Package bus is a minimalist event/message bus implementation for internal communication.

The package requires a unique id generator to assign ids to events. You can write your own function to generate unique ids or use a package that provides unique id generation functionality.

The `bus` package respects the software design choices of the packages/projects that use it. It supports both singleton and dependency injection styles to initialize a `bus` instance.

Here is a sample initialization using the `monoton` id generator:

import (
	"github.com/mustafaturan/bus"
	"github.com/mustafaturan/monoton"
	"github.com/mustafaturan/monoton/sequencer"
)

func NewBus() *bus.Bus {
	// configure id generator (it doesn't have to be monoton)
	node := uint64(1)
	initialTime := uint64(1577865600000) // set 2020-01-01 PST as start
	m, err := monoton.New(sequencer.NewMillisecond(), node, initialTime)
	if err != nil {
		panic(err)
	}

	// init an id generator
	var idGenerator bus.Next = (*m).Next

	// create a new bus instance
	b, err := bus.NewBus(idGenerator)
	if err != nil {
		panic(err)
	}

	// maybe register topics in here
	b.RegisterTopics("order.received", "order.fulfilled")

	return b
}
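The configuration above notes that the id generator does not have to be monoton. As a minimal sketch of an alternative, the following builds a counter-based generator. The `Next` type and its `Generate` method are redeclared locally to mirror the documented `bus.Next` so that the sketch compiles on its own; `newCounterGenerator` is a hypothetical helper, not part of the package. Ids produced this way are unique only within a single process lifetime.

```go
package main

import (
	"fmt"
	"strconv"
	"sync/atomic"
)

// Next mirrors the bus.Next func type documented on this page. It is declared
// locally (an assumption for illustration) so the sketch needs no dependency.
type Next func() string

// Generate mirrors the documented Next.Generate method.
func (n Next) Generate() string { return n() }

// newCounterGenerator is a hypothetical helper: it returns a Next that yields
// "<prefix>-<n>", where n increases by one on every call.
func newCounterGenerator(prefix string) Next {
	var counter uint64
	return func() string {
		n := atomic.AddUint64(&counter, 1) // safe for concurrent callers
		return prefix + "-" + strconv.FormatUint(n, 10)
	}
}

func main() {
	gen := newCounterGenerator("evt")
	fmt.Println(gen.Generate()) // evt-1
	fmt.Println(gen.Generate()) // evt-2
}
```

With the real package, a function of this shape could be converted with `bus.Next(fn)` and passed to `bus.NewBus`, in the same way the monoton example does.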

Register Topics

To emit events to the topics, topic names should be registered first:

Example code:

// register topics
b.RegisterTopics("order.received", "order.fulfilled")
// ...

Register Handlers

To receive topic events, you need to register handlers. A handler requires two values: a `Handle` function and a topic `Matcher` regex pattern.

Example code:

handler := bus.Handler{
	Handle: func(e *bus.Event) {
		// do something
		// NOTE: Highly recommended to process the event in an async way
	},
	Matcher: ".*", // matches all topics
}
b.RegisterHandler("a unique key for the handler", &handler)
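To get a feel for which topics a given `Matcher` would select, the following sketch filters topic names with the standard `regexp` package. It assumes the pattern is applied as an ordinary, unanchored Go regular expression; the page does not spell out the matching rules, so treat this as an approximation. `matchingTopics` is a hypothetical helper and not part of the package.

```go
package main

import (
	"fmt"
	"regexp"
)

// matchingTopics is a hypothetical helper that returns the topic names
// matched by the given regex pattern.
func matchingTopics(pattern string, topics []string) ([]string, error) {
	re, err := regexp.Compile(pattern)
	if err != nil {
		return nil, err // invalid pattern
	}
	var matched []string
	for _, t := range topics {
		if re.MatchString(t) { // unanchored match, as assumed above
			matched = append(matched, t)
		}
	}
	return matched, nil
}

func main() {
	topics := []string{"order.received", "order.fulfilled", "payment.received"}
	for _, pattern := range []string{".*", `^order\.`, `\.received$`} {
		matched, err := matchingTopics(pattern, topics)
		if err != nil {
			fmt.Println("invalid pattern:", err)
			continue
		}
		fmt.Println(pattern, "matches", matched)
	}
}
```

Anchoring a pattern with `^` and `$` and escaping the dots keeps it from matching more topics than intended.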

Emit Event

Example code:

// if the txID value is blank, the bus package generates one using the id generator
ctx := context.Background()
ctx = context.WithValue(ctx, bus.CtxKeyTxID, "a-transaction-id")

// event topic name (must be registered before)
topic := "order.received"

// interface{} data for event
order := make(map[string]string)
order["orderID"] = "123456"
order["orderAmount"] = "112.20"
order["currency"] = "USD"

// emit the event
event, err := b.Emit(ctx, topic, order)

if err != nil {
	// report the err
	fmt.Println(err)
}

// Emit also returns a reference to the event, in case the caller needs to
// do anything with it.
fmt.Println(event)

Processing Events

When an event is emitted, the topic handlers receive the event synchronously. It is highly recommended to process events asynchronously. The package leaves the choice of concurrency abstraction to the consuming packages/projects, depending on their use cases. Each handler receives the same event as a reference to a `bus.Event` struct.
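One possible way to move processing off the emitting goroutine is to have the handler function only enqueue the event and let a worker goroutine do the actual work. The sketch below is one such arrangement, not the package's own mechanism: `Event` is a local stand-in carrying a subset of the documented `bus.Event` fields, and `asyncWorker` with its methods is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a local stand-in with a subset of the documented bus.Event fields.
type Event struct {
	ID    string
	Topic string
	Data  interface{}
}

// asyncWorker is a hypothetical helper that queues events and processes them
// on its own goroutine, in the order they were queued.
type asyncWorker struct {
	queue chan *Event
	wg    sync.WaitGroup
}

func newAsyncWorker(size int, process func(*Event)) *asyncWorker {
	w := &asyncWorker{queue: make(chan *Event, size)}
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		for e := range w.queue {
			process(e)
		}
	}()
	return w
}

// Handle has the shape of the documented Handler.Handle field (using the
// local Event type). It blocks the caller only when the queue is full.
func (w *asyncWorker) Handle(e *Event) { w.queue <- e }

// Close stops accepting events and waits until queued ones are processed.
func (w *asyncWorker) Close() {
	close(w.queue)
	w.wg.Wait()
}

func main() {
	w := newAsyncWorker(16, func(e *Event) {
		fmt.Println("processed", e.Topic, e.ID)
	})
	w.Handle(&Event{ID: "1", Topic: "order.received"})
	w.Handle(&Event{ID: "2", Topic: "order.fulfilled"})
	w.Close()
}
```

Because every handler receives the same event reference, a worker like this should treat the event as read-only, or copy it before modifying anything.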

Package Files

bus.go doc.go

Constants

const (
    // CtxKeyTxID tx id context key
    CtxKeyTxID = ctxKey("bus.txID")

    // Version syncs with package version
    Version = "1.0.2"
)

type Bus

type Bus struct {
    sync.Mutex
    // contains filtered or unexported fields
}

Bus is a message bus

func NewBus

func NewBus(g IDGenerator) (*Bus, error)

NewBus inits a new bus

func (*Bus) DeregisterHandler

func (b *Bus) DeregisterHandler(key string)

DeregisterHandler deletes the handler from the registry

func (*Bus) DeregisterTopics

func (b *Bus) DeregisterTopics(topicNames ...string)

DeregisterTopics deletes the given topics

func (*Bus) Emit

func (b *Bus) Emit(ctx context.Context, topicName string, data interface{}) (*Event, error)

Emit inits a new event and delivers it to the interested handlers

func (*Bus) HandlerKeys

func (b *Bus) HandlerKeys() []string

HandlerKeys returns the list of registered handler keys

func (*Bus) HandlerTopicSubscriptions

func (b *Bus) HandlerTopicSubscriptions(handlerKey string) []string

HandlerTopicSubscriptions returns all topic subscriptions of the handler

func (*Bus) RegisterHandler

func (b *Bus) RegisterHandler(key string, h *Handler)

RegisterHandler registers or re-registers the handler in the registry

func (*Bus) RegisterTopics

func (b *Bus) RegisterTopics(topicNames ...string)

RegisterTopics registers topics and fulfills handlers

func (*Bus) TopicHandlers

func (b *Bus) TopicHandlers(topicName string) []*Handler

TopicHandlers returns all handlers for the topic

func (*Bus) Topics

func (b *Bus) Topics() []string

Topics lists all the registered topics

type Event

type Event struct {
    ID         string      // identifier
    TxID       string      // transaction identifier
    Topic      string      // topic name
    Data       interface{} // actual event data
    OccurredAt int64       // creation time in nanoseconds
    // contains filtered or unexported fields
}

Event is a data structure for any logs

func (*Event) Context

func (e *Event) Context() context.Context

Context returns event's context

type Handler

type Handler struct {
    Handle  func(e *Event) // handler func to process events
    Matcher string         // topic matcher as regex pattern
}

Handler is a receiver for event reference with the given regex pattern

type IDGenerator

type IDGenerator interface {
    Generate() string
}

IDGenerator is a sequential unique id generator interface

type Next

type Next func() string

Next is a sequential unique id generator func type

func (Next) Generate

func (n Next) Generate() string

Generate is an implementation of IDGenerator for bus.Next fn type

Package bus imports 5 packages. Updated 2020-02-10.