bus

package
v0.2.0
Published: Jul 6, 2021 License: MIT Imports: 11 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	// ErrInvalidQueryResult indicates a programming error where the query result
	// wasn't passed as a pointer to be filled
	ErrInvalidQueryResult = errors.New("Query result must be a pointer")
)

Functions

func CommandHandlerName

func CommandHandlerName(h CommandHandler) string

CommandHandlerName returns the name of the command handler, used for routing and DI

func Get

func Get(ctx context.Context, key string) interface{}

Get retrieves a service from the bus's DI container

func QueryHandlerName

func QueryHandlerName(h QueryHandler) string

QueryHandlerName returns the name of the handler, used for DI and routing

Types

type Bus

type Bus struct {
	Container di.Container
	// contains filtered or unexported fields
}

Bus is the main dependency. It is the entry point for all messages and routes them to the correct place either synchronously or asynchronously

var Instance *Bus

func NewBus

func NewBus(ctx context.Context, bcs []Module, configs ...Config) *Bus

NewBus returns a new configured bus. TODO: Create own DI container, maybe with code gen, that allows request scoping and control of dependence

func (*Bus) Close

func (b *Bus) Close()

Close deletes all the container resources. TODO: Should be private, and cleanup handled by ctx cancellation. But, what about in publish mode?

func (*Bus) Dispatch

func (b *Bus) Dispatch(ctx context.Context, cmd Command, sync bool) (*CommandResponse, error)

Dispatch runs a command, either synchronously or asynchronously

func (*Bus) ExtendCommands

func (b *Bus) ExtendCommands(fn func(CmdBuilder))

func (*Bus) ExtendEvents

func (b *Bus) ExtendEvents(rules ...EventRules) *Bus

ExtendEvents extends the Bus EventRules

func (*Bus) ExtendQueries

func (b *Bus) ExtendQueries(fn func(QueryBuilder))

func (*Bus) Get

func (b *Bus) Get(key string) interface{}

func (*Bus) Publish

func (b *Bus) Publish(ctx context.Context, events ...Event) error

Publish distributes one or more events to the system

func (*Bus) Query

func (b *Bus) Query(ctx context.Context, query Query, result interface{}) error

Query routes and handles a query

func (*Bus) RegisterContextKey

func (b *Bus) RegisterContextKey(key interface{ String() string }, fn func(j []byte) interface{})

RegisterContextKey registers a context key interpretation value for serialization

func (*Bus) RegisterDeletion

func (b *Bus) RegisterDeletion(fn func())

RegisterDeletion allows a plugin to register a function to clean itself up. TODO: Replace cleanup with more idiomatic context cleanup.
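The cleanup registration described above can be pictured as a list of callbacks that run on shutdown. The following is a minimal sketch using a local stand-in `bus` type rather than the real `Bus`; that `Close` invokes the registered functions, and that it does so in registration order, are assumptions and are not confirmed by this page.

```go
package main

import "fmt"

// bus is a local stand-in for the package's Bus (assumption: the real
// type keeps registered cleanup functions in a similar list).
type bus struct{ deletions []func() }

// RegisterDeletion stores a cleanup function supplied by a plugin.
func (b *bus) RegisterDeletion(fn func()) { b.deletions = append(b.deletions, fn) }

// Close runs every registered cleanup function once, in registration
// order (the order is an assumption), then forgets them.
func (b *bus) Close() {
	for _, fn := range b.deletions {
		fn()
	}
	b.deletions = nil
}

func main() {
	b := &bus{}
	b.RegisterDeletion(func() { fmt.Println("closing sql pool") })
	b.RegisterDeletion(func() { fmt.Println("closing queue connection") })
	b.Close()
}
```

Clearing the list after running it makes a second `Close` call harmless in this sketch.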

func (*Bus) RegisterWork

func (b *Bus) RegisterWork(fn func())

RegisterWork allows plugins to register a function for themselves that the bus should call when in worker mode

func (*Bus) Use

func (b *Bus) Use(ms ...interface{})

Use registers middleware and guards. Accepts a union of command/query guards and middleware.

func (*Bus) Work

func (b *Bus) Work()

Work runs the bus in subscribe mode, to be run on a worker node, or in the background on an API server. TODO: Handle cleanup from here, and don't block. Use ctx for cancellation

type CmdBuilder

type CmdBuilder interface {
	CmdReceiver

	Use(middlewares ...CommandMiddleware)

	With(middlewares ...CommandMiddleware) CmdReceiver

	Group(func(CmdBuilder))
}

CmdBuilder allows building of command routing patterns

type CmdReceiver

type CmdReceiver interface {
	Command(Command) commandRecord
}

CmdReceiver allows registration of a command and handler

type Command

type Command interface {
	message.Message

	// Command returns the command's (unique) name, and must be implemented by every command
	Command() string

	// Valid returns an error if the command is not valid. Must be implemented by every command
	Valid() error

	// Auth returns the list of scopes required for the command to execute.
	// The list of scopes may be dynamic by using data contained within the context,
	// such as user IDs, for protecting user data
	Auth(context.Context) [][]string
}

Command is a value object instructing the system to change state.

func CommandValidationGuard

func CommandValidationGuard(ctx context.Context, c Command) (context.Context, Command, error)

CommandValidationGuard checks a command is valid before being executed, and returns an error if not

type CommandContext

type CommandContext struct {
	// contains filtered or unexported fields
}

CommandContext is a context that command routes are built in

func NewCommandContext

func NewCommandContext() *CommandContext

NewCommandContext creates an initialized CommandContext

func (*CommandContext) Command

func (c *CommandContext) Command(cmd Command) commandRecord

func (*CommandContext) Group

func (c *CommandContext) Group(fn func(CmdBuilder))

func (CommandContext) Route

func (c CommandContext) Route(cmd Command) (CommandRoute, bool)

Route takes a command and returns its execution route

func (CommandContext) Routes

func (c CommandContext) Routes() commandRouting

Routes builds a routing table, a mapping between routable message and route entry

func (CommandContext) SelfTest

func (c CommandContext) SelfTest() error

func (*CommandContext) Use

func (c *CommandContext) Use(middlewares ...CommandMiddleware)

func (*CommandContext) With

func (c *CommandContext) With(middlewares ...CommandMiddleware) CmdReceiver

type CommandGuard

type CommandGuard = func(context.Context, Command) (context.Context, Command, error)

CommandGuard allows runtime composition of code to test commands before being routed to a handler. Intended for use with validation and access control. Will always run before a command is queued

type CommandHandler

type CommandHandler interface {
	// Execute takes a command and executes the stateful logic it requests.
	Execute(context.Context, Command) (CommandResponse, []message.Message)
}

CommandHandler is a handler for a specific command. Command <-> CommandHandler has a 1:1 relationship

func CmdMiddlewareFunc

func CmdMiddlewareFunc(fn func(context.Context, Command) (CommandResponse, []message.Message)) CommandHandler

CmdMiddlewareFunc is used for creating middleware with a function

func CommandLoggingMiddleware

func CommandLoggingMiddleware(next CommandHandler) CommandHandler

CommandLoggingMiddleware logs before and after a command is executed by its handler

type CommandMiddleware

type CommandMiddleware = func(CommandHandler) CommandHandler

CommandMiddleware allows access to a command before and after it's executed by a handler
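The wrapping pattern behind this alias can be sketched as follows. All types are local stand-ins that mirror the documented shapes (`message` replaces `message.Message`), `handlerFunc` plays the role that `CmdMiddlewareFunc` is documented to play, and `tracing`, `ping`, and `run` are hypothetical names used only here.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins mirroring the documented shapes.
type command interface{ Command() string }
type message interface{}
type commandResponse struct {
	Error error
	ID    string
}

type commandHandler interface {
	Execute(context.Context, command) (commandResponse, []message)
}

type commandMiddleware = func(commandHandler) commandHandler

// handlerFunc adapts a plain function to commandHandler.
type handlerFunc func(context.Context, command) (commandResponse, []message)

func (f handlerFunc) Execute(ctx context.Context, c command) (commandResponse, []message) {
	return f(ctx, c)
}

// tracing (hypothetical) returns a middleware that records entry and exit around next.
func tracing(log *[]string) commandMiddleware {
	return func(next commandHandler) commandHandler {
		return handlerFunc(func(ctx context.Context, c command) (commandResponse, []message) {
			*log = append(*log, "before "+c.Command())
			res, msgs := next.Execute(ctx, c)
			*log = append(*log, "after "+c.Command())
			return res, msgs
		})
	}
}

type ping struct{}

func (ping) Command() string { return "ping" }

// run wraps a base handler with the tracing middleware and executes one command.
func run(log *[]string) commandResponse {
	base := handlerFunc(func(ctx context.Context, c command) (commandResponse, []message) {
		*log = append(*log, "handle "+c.Command())
		return commandResponse{ID: "1"}, nil
	})
	res, _ := tracing(log)(base).Execute(context.Background(), ping{})
	return res
}

func main() {
	var log []string
	res := run(&log)
	fmt.Println(res.ID, log)
}
```

Because a middleware both receives and returns a handler, several of them can be nested, with the outermost one seeing the command first and the response last.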

type CommandResponse

type CommandResponse struct {
	Error error
	ID    string
}

CommandResponse originates from a command when it is executed synchronously. If async, then the response cannot be provided.

type CommandRoute

type CommandRoute struct {
	Command    Command
	Middleware []CommandMiddleware
	Handler    CommandHandler
}

type CommandType

type CommandType struct {
}

CommandType can be embedded in commands to provide sane defaults for the Command interface

func (CommandType) Auth

func (c CommandType) Auth(ctx context.Context) [][]string

Auth implements the Command interface, with public access control

func (CommandType) MessageType

func (c CommandType) MessageType() message.Type

MessageType implements the message.Message interface

type Config

type Config = func(*Bus) error

func UseQueue

func UseQueue(q Queue) Config

UseQueue provides an instantiated queue for the bus to use
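`Config` follows the functional-options pattern: each option is a function that mutates the bus and may fail. A minimal sketch with local stand-in types is given below. Unlike the documented `NewBus`, the hypothetical `newBus` here returns the option error so the example can show it, and the nil check inside `useQueue` is an assumption rather than documented behaviour.

```go
package main

import (
	"errors"
	"fmt"
)

// queue and bus are reduced stand-ins for the package's Queue and Bus.
type queue interface{ Close() }

type bus struct{ queue queue }

// config has the same shape as the documented Config alias.
type config = func(*bus) error

// useQueue returns an option that installs q on the bus.
// Rejecting a nil queue is an assumption made for this sketch.
func useQueue(q queue) config {
	return func(b *bus) error {
		if q == nil {
			return errors.New("queue must not be nil")
		}
		b.queue = q
		return nil
	}
}

// newBus (hypothetical) applies options in order and stops at the first failure.
func newBus(configs ...config) (*bus, error) {
	b := &bus{}
	for _, cfg := range configs {
		if err := cfg(b); err != nil {
			return nil, err
		}
	}
	return b, nil
}

type memQueue struct{}

func (memQueue) Close() {}

func main() {
	b, err := newBus(useQueue(memQueue{}))
	fmt.Println(b.queue != nil, err)
}
```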

type Def

type Def struct {
	Build    func(ctn di.Container) (interface{}, error)
	Close    func(obj interface{}) error
	Name     interface{}
	Scope    string
	Tags     []di.Tag
	Unshared bool
}

type Event

type Event interface {
	message.Message

	// OwnedBy tells the event which entity the event originated from
	OwnedBy(interface{ String() string })

	// Event returns the event's name. Must be implemented by all events
	Event() string
}

Event is a routable event indicating something has happened. Events are fanned out to both sync and async handlers

type EventHandler

type EventHandler interface {
	Handle(context.Context, Event) ([]message.Message, error)
	Async() bool
}

EventHandler is a handler for one specific event. Each event may have multiple, or 0, EventHandlers.

type EventQueue

type EventQueue struct {
	GobEncode bool // Unused, purely to make Gob encode the eventqueue and not fail.
	// contains filtered or unexported fields
}

EventQueue is embedded in entities to buffer events before being released to infrastructure. TODO: Add entity versioning

func NewEventQueue

func NewEventQueue(owner interface{ String() string }) EventQueue

NewEventQueue returns an owned event queue

func (*EventQueue) Publish

func (e *EventQueue) Publish(events ...Event)

Publish adds events to the buffer queue, and sets their owner simultaneously

func (*EventQueue) Release

func (e *EventQueue) Release() []Event

Release empties the event queue, returning the buffered events
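The buffer-then-release flow can be sketched with local stand-ins for `Event`, `EventType`, and `EventQueue`. The internal fields (`owner`, `events`) are assumptions, since the real fields are unexported, and `userID` and `userRenamed` are hypothetical names.

```go
package main

import "fmt"

type stringer interface{ String() string }

// event is a stand-in for Event without the message.Message part.
type event interface {
	OwnedBy(stringer)
	Event() string
}

// eventType mirrors EventType: embedding it provides OwnedBy.
type eventType struct{ Owner string }

func (e *eventType) OwnedBy(id stringer) { e.Owner = id.String() }

// eventQueue is a stand-in for EventQueue; its fields are assumptions.
type eventQueue struct {
	owner  stringer
	events []event
}

func newEventQueue(owner stringer) eventQueue { return eventQueue{owner: owner} }

// Publish stamps each event with the queue's owner and buffers it.
func (q *eventQueue) Publish(events ...event) {
	for _, e := range events {
		e.OwnedBy(q.owner)
		q.events = append(q.events, e)
	}
}

// Release hands back the buffered events and leaves the queue empty.
func (q *eventQueue) Release() []event {
	out := q.events
	q.events = nil
	return out
}

type userID string

func (id userID) String() string { return string(id) }

type userRenamed struct {
	eventType
	Name string
}

func (*userRenamed) Event() string { return "user_renamed" }

func main() {
	q := newEventQueue(userID("u-1"))
	ev := &userRenamed{Name: "ada"}
	q.Publish(ev)
	released := q.Release()
	fmt.Println(len(released), ev.Owner, len(q.Release()))
}
```

Events are passed as pointers in this sketch because `OwnedBy` has a pointer receiver and must mutate the event it is called on.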

type EventRules

type EventRules map[Event][]string

EventRules is a map that determines routing for an event. The key is the event name, and the value is a list of DI handler names. TODO: Change into routing composition

type EventType

type EventType struct {
	Owner string
}

EventType is a struct designed to be embedded within an event, providing some basic behaviours

func (EventType) MessageType

func (e EventType) MessageType() message.Type

MessageType satisfies the message.Message interface, used for routing

func (*EventType) OwnedBy

func (e *EventType) OwnedBy(id interface{ String() string })

OwnedBy is the owning entity of the event

type FuncModule

type FuncModule struct {
	EventsFunc   func() EventRules
	CommandsFunc func(CmdBuilder)
	QueriesFunc  func(QueryBuilder)
	ServicesFunc func() []Def

	Defs []Def
}

func (FuncModule) Commands

func (m FuncModule) Commands(b CmdBuilder)

func (FuncModule) EventRules

func (m FuncModule) EventRules() EventRules

func (FuncModule) Queries

func (m FuncModule) Queries(b QueryBuilder)

func (FuncModule) Services

func (m FuncModule) Services() []Def

type MessageRouter

type MessageRouter struct {
	Events eventRules
	// contains filtered or unexported fields
}

MessageRouter routes a message to its correct destination. TODO: Auto generate documentation of bus

func NewMessageRouter

func NewMessageRouter() MessageRouter

NewMessageRouter returns a new, empty, message router

func (*MessageRouter) Extend

func (r *MessageRouter) Extend(rules EventRules)

Extend takes EventRules|CommandRules|QueryRules and extends the router's internal routing rules with it

func (*MessageRouter) ExtendCommands

func (r *MessageRouter) ExtendCommands(fn func(b CmdBuilder))

func (*MessageRouter) ExtendQueries

func (r *MessageRouter) ExtendQueries(fn func(b QueryBuilder))

func (MessageRouter) RouteCommand

func (r MessageRouter) RouteCommand(cmd Command) (CommandRoute, bool)

RouteCommand returns the routing record for a command, if it exists

func (MessageRouter) RouteEvent

func (r MessageRouter) RouteEvent(e Event) []string

RouteEvent returns all the handlers for an event

func (MessageRouter) RouteQuery

func (r MessageRouter) RouteQuery(q Query) (QueryRoute, bool)

RouteQuery returns the routing record for a query, if it exists

type Module

type Module interface {
	EventRules() EventRules
	Commands(CmdBuilder)
	Queries(QueryBuilder)

	// TODO: Make own internal DI system
	Services() []Def
}

Module represents the integration between the main app and a bounded context (BC).

type NoCommandHandler

type NoCommandHandler struct {
	Cmd Command
}

NoCommandHandler is an error returned when a command's handler cannot be found

func (NoCommandHandler) Error

func (e NoCommandHandler) Error() string

type NoQueryHandler

type NoQueryHandler struct {
	Query Query
}

NoQueryHandler is an error returned when a query's handler cannot be found

func (NoQueryHandler) Error

func (e NoQueryHandler) Error() string

type Query

type Query interface {
	message.Message

	// Query returns the name of the query, and must be implemented
	// by every query
	Query() string

	// Valid returns an error if the query is invalid
	Valid() error

	// Auth returns the scopes required to execute the query.
	// May return dynamic scopes, based on values in the context
	Auth(context.Context) [][]string
}

Query is a question that is asked of the application. Execution of the query cannot change application state, although may still change infrastructure state (such as monitoring)

func QueryValidationGuard

func QueryValidationGuard(ctx context.Context, q Query) (context.Context, Query, error)

QueryValidationGuard ensures a query is valid before being routed to a handler

type QueryBuilder

type QueryBuilder interface {
	QueryReceiver

	Use(middlewares ...QueryMiddleware)

	With(middlewares ...QueryMiddleware) QueryReceiver

	Group(func(QueryBuilder))
}

type QueryContext

type QueryContext struct {
	// contains filtered or unexported fields
}

QueryContext is a context that query routes are built in

func NewQueryContext

func NewQueryContext() *QueryContext

NewQueryContext creates an initialized QueryContext

func (*QueryContext) Group

func (c *QueryContext) Group(fn func(QueryBuilder))

func (*QueryContext) Query

func (c *QueryContext) Query(q Query) queryRecord

func (QueryContext) Route

func (c QueryContext) Route(q Query) (QueryRoute, bool)

Route takes a query and returns its execution route

func (QueryContext) Routes

func (c QueryContext) Routes() queryRouting

Routes builds a routing table, a mapping between routable message and route entry

func (QueryContext) SelfTest

func (c QueryContext) SelfTest() error

func (*QueryContext) Use

func (c *QueryContext) Use(middlewares ...QueryMiddleware)

func (*QueryContext) With

func (c *QueryContext) With(middlewares ...QueryMiddleware) QueryReceiver

type QueryGuard

type QueryGuard = func(context.Context, Query) (context.Context, Query, error)

QueryGuard allows runtime composition of code to test queries before being routed to a handler. Intended for use with validation and access control.

type QueryHandler

type QueryHandler interface {
	// Execute runs the query, and fills a result provided
	// in the third argument, which must be a pointer.
	Execute(context.Context, Query, interface{}) error
}

QueryHandler is an interface for a handler that executes a query. Queries have a 1:1 relationship with handlers.
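The pointer requirement on the result argument, and the `ErrInvalidQueryResult` error that signals a violation, can be illustrated with a reflection check. This is a minimal sketch: `errInvalidQueryResult` is a local copy of the documented error, and `fillResult` is a hypothetical helper, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// errInvalidQueryResult mirrors the package's ErrInvalidQueryResult.
var errInvalidQueryResult = errors.New("Query result must be a pointer")

// fillResult (hypothetical) copies value into result, rejecting any
// result that is not a non-nil pointer.
func fillResult(result interface{}, value interface{}) error {
	rv := reflect.ValueOf(result)
	if rv.Kind() != reflect.Ptr || rv.IsNil() {
		return errInvalidQueryResult
	}
	v := reflect.ValueOf(value)
	if !v.IsValid() || !v.Type().AssignableTo(rv.Elem().Type()) {
		return fmt.Errorf("cannot assign %T to %T", value, result)
	}
	rv.Elem().Set(v)
	return nil
}

func main() {
	var n int
	fmt.Println(fillResult(&n, 42), n) // pointer result: filled in place
	fmt.Println(fillResult(n, 42))     // non-pointer result: rejected
}
```

A handler written this way fails fast on the programming error instead of silently discarding the result.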

func QueryLoggingMiddleware

func QueryLoggingMiddleware(next QueryHandler) QueryHandler

QueryLoggingMiddleware logs the query before and after it's executed

func QueryMiddlewareFunc

func QueryMiddlewareFunc(fn func(context.Context, Query, interface{}) error) QueryHandler

QueryMiddlewareFunc allows construction of a QueryMiddleware

type QueryMiddleware

type QueryMiddleware = func(QueryHandler) QueryHandler

QueryMiddleware allows access to a query before and after it's executed

type QueryReceiver

type QueryReceiver interface {
	Query(Query) queryRecord
}

type QueryRoute

type QueryRoute struct {
	Query      Query
	Middleware []QueryMiddleware
	Handler    QueryHandler
}

type QueryRoutes

type QueryRoutes map[string]*queryRoutingRecord

type QueryType

type QueryType struct {
}

QueryType is a utility type that can be embedded within a new Query

func (QueryType) Auth

func (QueryType) Auth(context.Context) [][]string

Auth provides a public default to satisfy the query interface

func (QueryType) MessageType

func (QueryType) MessageType() message.Type

MessageType returns the type for use with routing within the bus

type Queue

type Queue interface {
	// RegisterCtxKey allows serialization of contexts by registering a key and context
	RegisterCtxKey(key interface{ String() string }, fn func([]byte) interface{})

	// Publish publishes a message to the queue
	// blocking until the message has been published
	Publish(context.Context, ...message.Message) error

	// Subscribe registers a callback for inbound messages
	// and runs the queue, blocking
	// TODO: Assess whether this should block
	Subscribe(context.Context, func(context.Context, message.Message) error)

	// Close closes the queue down
	// TODO: Assess necessity and find better way of managing queue lifecycle
	Close()
}

Queue allows the bus to queue messages for asynchronous execution
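For tests or local development, the publish and subscribe halves of this interface could be backed by a buffered channel. The following is a rough sketch under stated assumptions: `message` stands in for `message.Message`, `RegisterCtxKey` is omitted because nothing is serialized in memory, and `memQueue`, `newMemQueue`, and `drain` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// message stands in for message.Message.
type message interface{}

// memQueue is a hypothetical in-memory queue backed by a buffered channel.
type memQueue struct{ ch chan message }

func newMemQueue(size int) *memQueue { return &memQueue{ch: make(chan message, size)} }

// Publish blocks until every message is buffered or ctx is cancelled.
func (q *memQueue) Publish(ctx context.Context, msgs ...message) error {
	for _, m := range msgs {
		select {
		case q.ch <- m:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

// Subscribe delivers messages to fn, blocking until ctx is cancelled
// or the queue is closed and drained.
func (q *memQueue) Subscribe(ctx context.Context, fn func(context.Context, message) error) {
	for {
		select {
		case m, ok := <-q.ch:
			if !ok {
				return
			}
			_ = fn(ctx, m) // errors are dropped here; a real queue might retry
		case <-ctx.Done():
			return
		}
	}
}

// Close stops delivery once buffered messages are consumed.
// Publishing after Close panics in this sketch.
func (q *memQueue) Close() { close(q.ch) }

// drain publishes msgs, closes the queue, and collects what Subscribe delivers.
func drain(msgs ...message) []message {
	q := newMemQueue(len(msgs))
	_ = q.Publish(context.Background(), msgs...)
	q.Close()
	var got []message
	q.Subscribe(context.Background(), func(_ context.Context, m message) error {
		got = append(got, m)
		return nil
	})
	return got
}

func main() { fmt.Println(drain("a", "b")) }
```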

Directories

Path Synopsis
sql
