commander

package module
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 28, 2021 License: MIT Imports: 11 Imported by: 5

README

Commander 🚀

GoDoc

Commander is Go library for writing event-driven applications. Enabling event sourcing, RPC over messages, SAGA's, bidirectional streaming and more! Dialects could be used to stream messages from one to another.

Getting started

  1. 🚀 Examples

Contributing

Thank you for your interest in contributing to Commander! ❤ Check out the open projects and/or issues and feel free to join any ongoing discussion.

Everyone is welcome to contribute, whether it's in the form of code, documentation, bug reports, feature requests, or anything else. We encourage you to experiment with the project and make contributions to help evolve it to meet your needs!

See the contributing guide for more details.

Documentation

Index

Constants

View Source
const (
	// BeforeEvent gets called before a action gets taken.
	BeforeEvent = "before"
	// AfterEvent gets called after a action has been taken.
	AfterEvent = "after"
)
View Source
const (
	EventMessage   = types.EventMessage
	CommandMessage = types.CommandMessage
)

Available message types

View Source
const (
	ConsumeMode = types.ConsumeMode
	ProduceMode = types.ProduceMode

	DefaultMode = types.DefaultMode
)

Available topic modes

View Source
const (
	// DebugEnv os debug env key
	DebugEnv = "DEBUG"
)

Variables

View Source
var (
	ErrNoTopic  = errors.New("no topic found")
	ErrNoAction = errors.New("no action defined")
)

Custom error types

View Source
var (
	// ErrTimeout is returned when a timeout is reached when awaiting a responding event
	ErrTimeout = errors.New("timeout reached")
)
View Source
var NewMessage = types.NewMessage

NewMessage types.NewMessage alias

View Source
var NewTopic = options.NewTopic

NewTopic constructs a new commander topic for the given name, type, mode and dialect. If no topic mode is defined is the default mode (consume|produce) assigned to the topic.

Functions

func WithAction added in v0.5.0

func WithAction(n string) options.HandlerOption

WithAction returns a HandleOptions that configures the action handle

func WithAwaitTimeout added in v0.5.0

func WithAwaitTimeout(d time.Duration) options.GroupOption

WithAwaitTimeout returns a GroupOption that configures the timeout period for the given group

func WithCallback added in v0.5.0

func WithCallback(h types.HandlerFunc) options.HandlerOption

WithCallback returns a HandleOptions that configures the callback method for a given handle

func WithMessageType added in v0.5.0

func WithMessageType(t types.MessageType) options.HandlerOption

WithMessageType returns a HandleOptions that configures the message type handle

Types

type Client

type Client struct {
	middleware.UseImpl
	Groups []*Group
}

Client manages the consumers, producers and groups.

func NewClient

func NewClient(groups ...*Group) (*Client, error)

NewClient constructs a new commander client. A client is needed to control a collection of groups.

func (*Client) Close

func (client *Client) Close() error

Close closes the consumer and producer

type Close

type Close = types.Close

Close represents a closing method

type Dialect added in v0.2.0

type Dialect = types.Dialect

Dialect extention of the Dialect type

type Group

type Group struct {
	Middleware middleware.UseImpl
	Timeout    time.Duration
	Topics     []types.Topic
	Retries    int8
	// contains filtered or unexported fields
}

Group contains information about a commander group. A commander group could contain a events and commands topic where commands and events could be consumed and produced to. The amount of retries attempted before a error is thrown could also be defined in a group.

func NewGroup

func NewGroup(definitions ...options.GroupOption) *Group

NewGroup initializes a new commander group.

func (*Group) AsyncCommand

func (group *Group) AsyncCommand(message *Message) error

AsyncCommand produces a message to the given group command topic and does not await for the responding event. If no command key is set will the command id be used.

func (*Group) AwaitEventWithAction

func (group *Group) AwaitEventWithAction(messages <-chan *types.Message, parent metadata.ParentID, action string) (message *Message, err error)

AwaitEventWithAction awaits till the first event for the given parent id and action is consumed. If no events are returned within the given timeout period a error will be returned.

func (*Group) AwaitMessage added in v0.3.0

func (group *Group) AwaitMessage(messages <-chan *types.Message, parent metadata.ParentID) (message *Message, err error)

AwaitMessage awaits till the first message is consumed for the given parent id. If no events are returned within the given timeout period a error will be returned.

func (*Group) FetchTopics

func (group *Group) FetchTopics(t types.MessageType, m types.TopicMode) []types.Topic

FetchTopics fetches the available topics for the given mode and the given type

func (*Group) Handle

func (group *Group) Handle(sort types.MessageType, action string, handler Handler) (Close, error)

Handle awaits messages from the given MessageType and action. Once a message is received is the callback method called with the received command. The handle is closed once the consumer receives a close signal.

func (*Group) HandleContext added in v0.5.0

func (group *Group) HandleContext(definitions ...options.HandlerOption) (Close, error)

HandleContext constructs a handle context based on the given definitions.

func (*Group) HandleFunc

func (group *Group) HandleFunc(sort types.MessageType, action string, callback HandlerFunc) (Close, error)

HandleFunc awaits messages from the given MessageType and action. Once a message is received is the callback method called with the received command. The handle is closed once the consumer receives a close signal.

func (*Group) NewConsumer

func (group *Group) NewConsumer(sort types.MessageType) (<-chan *types.Message, Close, error)

NewConsumer starts consuming events of topics from the same topic type. All received messages are published over the returned messages channel. All middleware subscriptions are called before consuming the message. Once a message is consumed should the next function be called to mark a message successfully consumed.

func (*Group) NewConsumerWithDeadline

func (group *Group) NewConsumerWithDeadline(timeout time.Duration, t types.MessageType) (<-chan *types.Message, Close, error)

NewConsumerWithDeadline consumes events of the given message type for the given duration. The message channel is closed once the deadline is reached. Once a message is consumed should the next function be called to mark a successfull consumption. The consumer could be closed premature by calling the close method.

func (*Group) ProduceCommand

func (group *Group) ProduceCommand(message *Message) error

ProduceCommand produce a message to the given group command topic. A error is returned if anything went wrong in the process. If no command key is set will the command id be used.

func (*Group) ProduceEvent

func (group *Group) ProduceEvent(message *Message) error

ProduceEvent produces a event kafka message to the set event topic. A error is returned if anything went wrong in the process.

func (*Group) Publish

func (group *Group) Publish(message *Message) error

Publish publishes the given message to the group producer. All middleware subscriptions are called before publishing the message.

func (*Group) SyncCommand

func (group *Group) SyncCommand(message *Message) (event *Message, err error)

SyncCommand produces a message to the given group command topic and awaits its responding event message. If no message is received within the set timeout period will a timeout be thrown.

type Handler

type Handler = types.Handler

Handler interface handle wrapper

type HandlerFunc added in v0.6.0

type HandlerFunc = types.HandlerFunc

HandlerFunc message handle message, writer implementation

type Message

type Message = types.Message

Message a message

type Next

type Next = types.Next

Next indicates that the next message could be called

type Topic

type Topic = types.Topic

Topic contains information of a kafka topic

type Writer added in v0.3.0

type Writer = types.Writer

Writer handle implementation for a given group and message

func NewWriter added in v0.3.0

func NewWriter(group *Group, parent *Message) Writer

NewWriter initializes a new response writer for the given value

Directories

Path Synopsis
dialects
examples
kafka Module
mock Module
streaming Module
zipkin Module
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL