commander: github.com/CloudProud/commander

package commander

import "github.com/CloudProud/commander"

Package commander is a toolkit for writing event-driven applications that aims to be developer friendly. Commander supports event-driven patterns such as CQRS and provides "dialects", which allow Commander to communicate over different protocols.

import (
	"log"

	"github.com/jeroenrinzema/commander"
	"github.com/jeroenrinzema/commander/dialects/mock"
)

func main() {
	dialect := mock.NewDialect()

	// A group combines a command topic and an event topic on the same dialect.
	group := commander.NewGroup(
		commander.NewTopic("commands", dialect, commander.CommandMessage, commander.ConsumeMode),
		commander.NewTopic("event", dialect, commander.EventMessage, commander.ConsumeMode|commander.ProduceMode),
	)

	// NewClient returns an error alongside the client.
	client, err := commander.NewClient(group)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// HandleFunc takes the message type first, then the action name.
	group.HandleFunc(commander.CommandMessage, "example", func(writer commander.ResponseWriter, message interface{}) {
		writer.ProduceEvent("created", 1, uuid.Nil, nil)
	})

	command := commander.NewCommand("example", 1, uuid.Nil, nil)
	group.ProduceCommand(command)

	// ...
}

Index

Package Files

doc.go group.go main.go options.go retry.go types.go writer.go

Constants

const (
    // BeforeEvent gets called before an action is taken.
    BeforeEvent = "before"
    // AfterEvent gets called after an action has been taken.
    AfterEvent = "after"
)
const (
    StatusOK                  = types.StatusOK
    StatusBadRequest          = types.StatusBadRequest
    StatusUnauthorized        = types.StatusUnauthorized
    StatusForbidden           = types.StatusForbidden
    StatusNotFound            = types.StatusNotFound
    StatusConflict            = types.StatusConflict
    StatusImATeapot           = types.StatusImATeapot
    StatusInternalServerError = types.StatusInternalServerError
)

Status codes that represent the status of an event.

const (
    EventMessage   = types.EventMessage
    CommandMessage = types.CommandMessage
)

Available message types

const (
    ConsumeMode = types.ConsumeMode
    ProduceMode = types.ProduceMode

    DefaultMode = types.DefaultMode
)

Available topic modes

const (
    // DebugEnv is the OS environment variable key used for debugging.
    DebugEnv = "DEBUG"
)

Variables

var (
    ErrNoTopic  = errors.New("no topic found")
    ErrNoAction = errors.New("no action defined")
)

Custom error types

var (
    // ErrTimeout is returned when a timeout is reached when awaiting a responding event
    ErrTimeout = errors.New("timeout reached")
)
var NewMessage = types.NewMessage

NewMessage is an alias for types.NewMessage.

var NewTopic = options.NewTopic

NewTopic constructs a new commander topic for the given name, type, mode and dialect. If no topic mode is defined, the default mode (consume|produce) is assigned to the topic.

var WithJSONCodec = options.WithJSONCodec

WithJSONCodec is an alias for options.WithJSONCodec.

func WithAction

func WithAction(n string) options.HandlerOption

WithAction returns a HandlerOption that configures the action of a handle.

func WithAwaitTimeout

func WithAwaitTimeout(d time.Duration) options.GroupOption

WithAwaitTimeout returns a GroupOption that configures the timeout period for the given group.

func WithCallback

func WithCallback(h types.HandlerFunc) options.HandlerOption

WithCallback returns a HandlerOption that configures the callback method for a given handle.

func WithMessageSchema

func WithMessageSchema(f func() interface{}) options.HandlerOption

WithMessageSchema returns a HandlerOption that configures the message schema for a handle.

func WithMessageType

func WithMessageType(t types.MessageType) options.HandlerOption

WithMessageType returns a HandlerOption that configures the message type of a handle.
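
The With* functions above each return an option value that is applied when a handle or group is constructed. The sketch below illustrates that functional-options pattern in isolation. The names handlerConfig, handlerOption, withAction, withTimeout and newHandlerConfig are hypothetical stand-ins, and the 5 second default is an assumption; none of this reflects the internals of the options package.

```go
package main

import (
	"fmt"
	"time"
)

// handlerConfig is a hypothetical stand-in for the settings a handle collects.
type handlerConfig struct {
	action  string
	timeout time.Duration
}

// handlerOption mutates a handlerConfig. It mirrors the general shape of an
// option value, not the library's actual options.HandlerOption type.
type handlerOption func(*handlerConfig)

// withAction returns an option that sets the action name.
func withAction(n string) handlerOption {
	return func(c *handlerConfig) { c.action = n }
}

// withTimeout returns an option that sets the timeout.
func withTimeout(d time.Duration) handlerOption {
	return func(c *handlerConfig) { c.timeout = d }
}

// newHandlerConfig starts from an assumed default and applies the options in order.
func newHandlerConfig(opts ...handlerOption) handlerConfig {
	c := handlerConfig{timeout: 5 * time.Second}
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	c := newHandlerConfig(withAction("example"), withTimeout(time.Second))
	fmt.Println(c.action, c.timeout)
}
```

Because every option is an ordinary value, callers can pass only the settings they care about and later options override earlier ones.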

type Client

type Client struct {
    middleware.UseImpl
    Groups []*Group
}

Client manages the consumers, producers and groups.

func NewClient

func NewClient(groups ...*Group) (*Client, error)

NewClient constructs a new commander client. A client is needed to control a collection of groups.

func (*Client) Close

func (client *Client) Close() error

Close closes the consumer and producer.

type Close

type Close = types.Close

Close represents a closing method.

type Dialect

type Dialect = types.Dialect

Dialect is an alias for types.Dialect.

type Group

type Group struct {
    Middleware middleware.UseImpl
    Timeout    time.Duration
    Topics     []types.Topic
    Codec      options.Codec
    Retries    int8
    // contains filtered or unexported fields
}

Group contains information about a commander group. A group can contain an events topic and a commands topic, which commands and events are consumed from and produced to. A group can also define the number of retries attempted before an error is returned.

func NewGroup

func NewGroup(definitions ...options.GroupOption) *Group

NewGroup initializes a new commander group.

func (*Group) AsyncCommand

func (group *Group) AsyncCommand(message *Message) error

AsyncCommand produces a message to the group's command topic and does not await the responding event. If no command key is set, the command id is used.

func (*Group) AwaitEOS

func (group *Group) AwaitEOS(messages <-chan *types.Message, parent metadata.ParentID) (message *Message, err error)

AwaitEOS waits until the final event stream message is emitted. If no events are returned within the given timeout period, an error is returned.

func (*Group) AwaitEventWithAction

func (group *Group) AwaitEventWithAction(messages <-chan *types.Message, parent metadata.ParentID, action string) (message *Message, err error)

AwaitEventWithAction waits until the first event for the given parent id and action is consumed. If no events are returned within the given timeout period, an error is returned.

func (*Group) AwaitMessage

func (group *Group) AwaitMessage(messages <-chan *types.Message, parent metadata.ParentID) (message *Message, err error)

AwaitMessage waits until the first message for the given parent id is consumed. If no events are returned within the given timeout period, an error is returned.
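
The Await* methods share one idea: read from a message channel until a message with the expected parent id arrives, or give up after a timeout. The following is a minimal sketch of that idea, not the library's implementation. The message type, the string parent id, the explicit timeout parameter and the errTimeout value are all assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimeout plays the role of a timeout error in this sketch.
var errTimeout = errors.New("timeout reached")

// message is a hypothetical stand-in that carries only a parent id and a body.
type message struct {
	parent string
	body   string
}

// awaitMessage returns the first message whose parent id matches, or
// errTimeout when none arrives before the timeout elapses.
func awaitMessage(messages <-chan *message, parent string, timeout time.Duration) (*message, error) {
	deadline := time.After(timeout)
	for {
		select {
		case m, ok := <-messages:
			if !ok {
				return nil, errors.New("message channel closed")
			}
			if m.parent == parent {
				return m, nil
			}
			// Messages that belong to another parent are skipped.
		case <-deadline:
			return nil, errTimeout
		}
	}
}

func main() {
	messages := make(chan *message, 2)
	messages <- &message{parent: "other", body: "ignored"}
	messages <- &message{parent: "abc", body: "created"}

	m, err := awaitMessage(messages, "abc", time.Second)
	fmt.Println(m.body, err)

	// Nothing is left on the channel, so the second call times out.
	_, err = awaitMessage(messages, "abc", 10*time.Millisecond)
	fmt.Println(errors.Is(err, errTimeout))
}
```

The deadline channel is created once, outside the loop, so skipped messages do not extend the waiting period.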

func (*Group) FetchTopics

func (group *Group) FetchTopics(t types.MessageType, m types.TopicMode) []types.Topic

FetchTopics returns the available topics that match the given message type and the given mode.
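
Topic modes combine with the | operator (for example consume|produce), which suggests a bit mask. The sketch below shows how a lookup by message type and mode could work under that assumption. The constant values, the topic struct, and the rule that a topic matches when it has every requested mode bit set are hypothetical, not taken from the types package.

```go
package main

import "fmt"

// topicMode is a bit mask; the concrete values here are assumptions.
type topicMode uint8

const (
	consumeMode topicMode = 1 << iota
	produceMode
	defaultMode = consumeMode | produceMode
)

// messageType distinguishes event topics from command topics.
type messageType int

const (
	eventMessage messageType = iota + 1
	commandMessage
)

// topic is a hypothetical stand-in for a commander topic.
type topic struct {
	name string
	kind messageType
	mode topicMode
}

// newTopic falls back to defaultMode when no mode is given.
func newTopic(name string, kind messageType, mode topicMode) topic {
	if mode == 0 {
		mode = defaultMode
	}
	return topic{name: name, kind: kind, mode: mode}
}

// fetchTopics returns the topics of type t that have every bit of m set.
func fetchTopics(topics []topic, t messageType, m topicMode) []topic {
	var out []topic
	for _, tp := range topics {
		if tp.kind == t && tp.mode&m == m {
			out = append(out, tp)
		}
	}
	return out
}

func main() {
	topics := []topic{
		newTopic("commands", commandMessage, consumeMode),
		newTopic("events", eventMessage, 0),
	}
	for _, tp := range fetchTopics(topics, eventMessage, produceMode) {
		fmt.Println(tp.name)
	}
}
```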

func (*Group) Handle

func (group *Group) Handle(sort types.MessageType, action string, handler Handler) (Close, error)

Handle awaits messages of the given MessageType and action. Once a message is received, the handler is called with the received message. The handle is closed once the consumer receives a close signal.

func (*Group) HandleContext

func (group *Group) HandleContext(definitions ...options.HandlerOption) (Close, error)

HandleContext constructs a handle context based on the given definitions.

func (*Group) HandleFunc

func (group *Group) HandleFunc(sort types.MessageType, action string, callback HandlerFunc) (Close, error)

HandleFunc awaits messages of the given MessageType and action. Once a message is received, the callback is called with the received message. The handle is closed once the consumer receives a close signal.

func (*Group) NewConsumer

func (group *Group) NewConsumer(sort types.MessageType) (<-chan *types.Message, Close, error)

NewConsumer starts consuming messages from the topics of the given message type. All received messages are published over the returned messages channel. All middleware subscriptions are called before a message is consumed. Once a message has been consumed, the next function should be called to mark it as successfully consumed.

func (*Group) NewConsumerWithDeadline

func (group *Group) NewConsumerWithDeadline(timeout time.Duration, t types.MessageType) (<-chan *types.Message, Close, error)

NewConsumerWithDeadline consumes events of the given message type for the given duration. The message channel is closed once the deadline is reached. Once a message has been consumed, the next function should be called to mark a successful consumption. The consumer can be closed prematurely by calling the close method.
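
A consumer that closes its channel at a deadline, and that can also be stopped early, can be built from a goroutine and a timer. The sketch below shows one way to do so. The consumeWithDeadline function and its string messages are hypothetical, and the sketch leaves out middleware and the next function described above.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// consumeWithDeadline forwards values from source until the timeout elapses,
// source is closed, or the returned close function is called. The output
// channel is closed in every case.
func consumeWithDeadline(source <-chan string, timeout time.Duration) (<-chan string, func()) {
	out := make(chan string)
	done := make(chan struct{})
	var once sync.Once
	// closeFn is safe to call more than once.
	closeFn := func() { once.Do(func() { close(done) }) }

	go func() {
		defer close(out)
		timer := time.NewTimer(timeout)
		defer timer.Stop()
		for {
			select {
			case v, ok := <-source:
				if !ok {
					return
				}
				// Forward the value, but stop waiting for a reader when
				// the deadline passes or the consumer is closed.
				select {
				case out <- v:
				case <-timer.C:
					return
				case <-done:
					return
				}
			case <-timer.C:
				return
			case <-done:
				return
			}
		}
	}()
	return out, closeFn
}

func main() {
	source := make(chan string, 1)
	source <- "created"

	messages, stop := consumeWithDeadline(source, 50*time.Millisecond)
	defer stop()

	// The range loop ends when the deadline closes the channel.
	for m := range messages {
		fmt.Println(m)
	}
	fmt.Println("deadline reached")
}
```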

func (*Group) ProduceCommand

func (group *Group) ProduceCommand(message *Message) error

ProduceCommand produces a message to the group's command topic. An error is returned if anything goes wrong in the process. If no command key is set, the command id is used.

func (*Group) ProduceEvent

func (group *Group) ProduceEvent(message *Message) error

ProduceEvent produces an event Kafka message to the configured event topic. An error is returned if anything goes wrong in the process.

func (*Group) Publish

func (group *Group) Publish(message *Message) error

Publish publishes the given message to the group producer. All middleware subscriptions are called before publishing the message.

func (*Group) SyncCommand

func (group *Group) SyncCommand(message *Message) (event *Message, err error)

SyncCommand produces a message to the group's command topic and awaits the responding event message. If no message is received within the configured timeout period, a timeout error is returned.

type Handler

type Handler = types.Handler

Handler is an alias for the types.Handler interface.

type HandlerFunc

type HandlerFunc = types.HandlerFunc

HandlerFunc is an alias for types.HandlerFunc, a function that handles a message and is given a writer.

type Message

type Message = types.Message

Message is an alias for types.Message.

type Next

type Next = types.Next

Next indicates that the next message can be consumed.

type Retry

type Retry struct {
    Amount  int8 `json:"amount"`
    Retries int8
}

Retry allows a given method to be retried a set number of times.

func (*Retry) Attempt

func (retry *Retry) Attempt(method func() error) error

Attempt calls the given method until it succeeds or the retry limit is reached. If the method still fails after the set limit, an error is returned.
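
A bounded retry loop of this kind can be sketched as follows. This is an illustration under stated assumptions, not the package's Retry type: the retry struct, its limit field (assumed to mean the maximum number of calls) and the errTemporary value are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// errTemporary is a hypothetical failure used to drive the example.
var errTemporary = errors.New("temporary failure")

// retry is a stand-in for a retry helper. limit is assumed to mean the
// maximum number of calls, which may differ from the library's semantics.
type retry struct {
	limit    int8
	attempts int8
}

// attempt calls method until it returns nil or limit calls have been made.
// The last error is wrapped so callers can still inspect it.
func (r *retry) attempt(method func() error) error {
	if r.limit <= 0 {
		return errors.New("retry limit must be positive")
	}
	var err error
	for r.attempts = 0; r.attempts < r.limit; {
		r.attempts++
		if err = method(); err == nil {
			return nil
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", r.attempts, err)
}

func main() {
	calls := 0
	r := &retry{limit: 3}
	// The method fails twice and succeeds on the third call.
	err := r.attempt(func() error {
		calls++
		if calls < 3 {
			return errTemporary
		}
		return nil
	})
	fmt.Println(calls, err)
}
```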

type Topic

type Topic = types.Topic

Topic contains information about a Kafka topic.

type Writer

type Writer = types.Writer

Writer is an alias for types.Writer, the handle writer for a given group and message.

func NewWriter

func NewWriter(group *Group, parent *Message) Writer

NewWriter initializes a new response writer for the given group and parent message.

Directories

Path
middleware
middleware/recover
middleware/throttle
middleware/timeout
middleware/zipkin
middleware/zipkin/metadata

Package commander imports 11 packages. Updated 2020-01-25.