Documentation ¶
Index ¶
- Constants
- Variables
- func WithAction(n string) options.HandlerOption
- func WithAwaitTimeout(d time.Duration) options.GroupOption
- func WithCallback(h types.HandlerFunc) options.HandlerOption
- func WithMessageType(t types.MessageType) options.HandlerOption
- type Client
- type Close
- type Dialect
- type Group
- func (group *Group) AsyncCommand(message *Message) error
- func (group *Group) AwaitEventWithAction(messages <-chan *types.Message, parent metadata.ParentID, action string) (message *Message, err error)
- func (group *Group) AwaitMessage(messages <-chan *types.Message, parent metadata.ParentID) (message *Message, err error)
- func (group *Group) FetchTopics(t types.MessageType, m types.TopicMode) []types.Topic
- func (group *Group) Handle(sort types.MessageType, action string, handler Handler) (Close, error)
- func (group *Group) HandleContext(definitions ...options.HandlerOption) (Close, error)
- func (group *Group) HandleFunc(sort types.MessageType, action string, callback HandlerFunc) (Close, error)
- func (group *Group) NewConsumer(sort types.MessageType) (<-chan *types.Message, Close, error)
- func (group *Group) NewConsumerWithDeadline(timeout time.Duration, t types.MessageType) (<-chan *types.Message, Close, error)
- func (group *Group) ProduceCommand(message *Message) error
- func (group *Group) ProduceEvent(message *Message) error
- func (group *Group) Publish(message *Message) error
- func (group *Group) SyncCommand(message *Message) (event *Message, err error)
- type Handler
- type HandlerFunc
- type Message
- type Next
- type Topic
- type Writer
Constants ¶
const ( // BeforeEvent gets called before a action gets taken. BeforeEvent = "before" // AfterEvent gets called after a action has been taken. AfterEvent = "after" )
const ( EventMessage = types.EventMessage CommandMessage = types.CommandMessage )
Available message types
const ( ConsumeMode = types.ConsumeMode ProduceMode = types.ProduceMode DefaultMode = types.DefaultMode )
Available topic modes
const (
// DebugEnv os debug env key
DebugEnv = "DEBUG"
)
Variables ¶
var ( ErrNoTopic = errors.New("no topic found") ErrNoAction = errors.New("no action defined") )
Custom error types
var ( // ErrTimeout is returned when a timeout is reached when awaiting a responding event ErrTimeout = errors.New("timeout reached") )
var NewMessage = types.NewMessage
NewMessage types.NewMessage alias
var NewTopic = options.NewTopic
NewTopic constructs a new commander topic for the given name, type, mode and dialect. If no topic mode is defined is the default mode (consume|produce) assigned to the topic.
Functions ¶
func WithAction ¶ added in v0.5.0
func WithAction(n string) options.HandlerOption
WithAction returns a HandleOptions that configures the action handle
func WithAwaitTimeout ¶ added in v0.5.0
func WithAwaitTimeout(d time.Duration) options.GroupOption
WithAwaitTimeout returns a GroupOption that configures the timeout period for the given group
func WithCallback ¶ added in v0.5.0
func WithCallback(h types.HandlerFunc) options.HandlerOption
WithCallback returns a HandleOptions that configures the callback method for a given handle
func WithMessageType ¶ added in v0.5.0
func WithMessageType(t types.MessageType) options.HandlerOption
WithMessageType returns a HandleOptions that configures the message type handle
Types ¶
type Client ¶
type Client struct { middleware.UseImpl Groups []*Group }
Client manages the consumers, producers and groups.
type Group ¶
type Group struct { Middleware middleware.UseImpl Timeout time.Duration Topics []types.Topic Retries int8 // contains filtered or unexported fields }
Group contains information about a commander group. A commander group could contain a events and commands topic where commands and events could be consumed and produced to. The amount of retries attempted before a error is thrown could also be defined in a group.
func NewGroup ¶
func NewGroup(definitions ...options.GroupOption) *Group
NewGroup initializes a new commander group.
func (*Group) AsyncCommand ¶
AsyncCommand produces a message to the given group command topic and does not await for the responding event. If no command key is set will the command id be used.
func (*Group) AwaitEventWithAction ¶
func (group *Group) AwaitEventWithAction(messages <-chan *types.Message, parent metadata.ParentID, action string) (message *Message, err error)
AwaitEventWithAction awaits till the first event for the given parent id and action is consumed. If no events are returned within the given timeout period a error will be returned.
func (*Group) AwaitMessage ¶ added in v0.3.0
func (group *Group) AwaitMessage(messages <-chan *types.Message, parent metadata.ParentID) (message *Message, err error)
AwaitMessage awaits till the first message is consumed for the given parent id. If no events are returned within the given timeout period a error will be returned.
func (*Group) FetchTopics ¶
FetchTopics fetches the available topics for the given mode and the given type
func (*Group) Handle ¶
Handle awaits messages from the given MessageType and action. Once a message is received is the callback method called with the received command. The handle is closed once the consumer receives a close signal.
func (*Group) HandleContext ¶ added in v0.5.0
func (group *Group) HandleContext(definitions ...options.HandlerOption) (Close, error)
HandleContext constructs a handle context based on the given definitions.
func (*Group) HandleFunc ¶
func (group *Group) HandleFunc(sort types.MessageType, action string, callback HandlerFunc) (Close, error)
HandleFunc awaits messages from the given MessageType and action. Once a message is received is the callback method called with the received command. The handle is closed once the consumer receives a close signal.
func (*Group) NewConsumer ¶
NewConsumer starts consuming events of topics from the same topic type. All received messages are published over the returned messages channel. All middleware subscriptions are called before consuming the message. Once a message is consumed should the next function be called to mark a message successfully consumed.
func (*Group) NewConsumerWithDeadline ¶
func (group *Group) NewConsumerWithDeadline(timeout time.Duration, t types.MessageType) (<-chan *types.Message, Close, error)
NewConsumerWithDeadline consumes events of the given message type for the given duration. The message channel is closed once the deadline is reached. Once a message is consumed should the next function be called to mark a successfull consumption. The consumer could be closed premature by calling the close method.
func (*Group) ProduceCommand ¶
ProduceCommand produce a message to the given group command topic. A error is returned if anything went wrong in the process. If no command key is set will the command id be used.
func (*Group) ProduceEvent ¶
ProduceEvent produces a event kafka message to the set event topic. A error is returned if anything went wrong in the process.
type HandlerFunc ¶ added in v0.6.0
type HandlerFunc = types.HandlerFunc
HandlerFunc message handle message, writer implementation