Documentation ¶
Index ¶
- Constants
- func RegisterTypes()
- type Command
- type CommandDispatcher
- type CommandDispatcherOption
- type CommandHandlerFunc
- type CommandMessagePublisher
- type Consumer
- type DomainCommand
- type DomainEvent
- type EntityEvent
- type EntityEventDispatcher
- type EntityEventDispatcherOption
- type EntityEventHandlerFunc
- type EntityEventMessagePublisher
- type Event
- type EventDispatcher
- type EventDispatcherOption
- type EventHandlerFunc
- type EventMessagePublisher
- type Failure
- type Headers
- type Message
- type MessageOption
- type MessagePublisher
- type MessageReceiver
- type MessageSubscriber
- type Producer
- type Publisher
- func (p *Publisher) Publish(ctx context.Context, message Message) error
- func (p *Publisher) PublishCommand(ctx context.Context, replyChannel string, command core.Command, ...) error
- func (p *Publisher) PublishEntityEvents(ctx context.Context, entity core.Entity, options ...MessageOption) error
- func (p *Publisher) PublishEvent(ctx context.Context, event core.Event, options ...MessageOption) error
- func (p *Publisher) PublishReply(ctx context.Context, reply core.Reply, options ...MessageOption) error
- func (p *Publisher) Stop(ctx context.Context) (err error)
- type PublisherOption
- type ReceiveMessageFunc
- type Reply
- type ReplyBuilder
- type ReplyMessagePublisher
- type Subscriber
- type SubscriberOption
- type Success
Constants ¶
const ( MessageID = "ID" MessageDate = "DATE" MessageChannel = "CHANNEL" MessageCorrelationID = "CORRELATION_ID" MessageCausationID = "CAUSATION_ID" MessageEventPrefix = "EVENT_" MessageEventName = MessageEventPrefix + "NAME" MessageEventEntityName = MessageEventPrefix + "ENTITY_NAME" MessageEventEntityID = MessageEventPrefix + "ENTITY_ID" MessageCommandPrefix = "COMMAND_" MessageCommandName = MessageCommandPrefix + "NAME" MessageCommandChannel = MessageCommandPrefix + "CHANNEL" MessageCommandReplyChannel = MessageCommandPrefix + "REPLY_CHANNEL" MessageReplyPrefix = "REPLY_" MessageReplyName = MessageReplyPrefix + "NAME" MessageReplyOutcome = MessageReplyPrefix + "OUTCOME" )
Message header keys
const ( ReplyOutcomeSuccess = "SUCCESS" ReplyOutcomeFailure = "FAILURE" )
Reply outcomes
Variables ¶
This section is empty.
Functions ¶
func RegisterTypes ¶
func RegisterTypes()
RegisterTypes should be called after registering a new marshaller, especially after registering a new default marshaller.
Types ¶
type CommandDispatcher ¶
type CommandDispatcher struct {
// contains filtered or unexported fields
}
CommandDispatcher is a MessageReceiver for Commands
func NewCommandDispatcher ¶
func NewCommandDispatcher(publisher ReplyMessagePublisher, options ...CommandDispatcherOption) *CommandDispatcher
NewCommandDispatcher constructs a new CommandDispatcher
func (*CommandDispatcher) Handle ¶
func (d *CommandDispatcher) Handle(cmd core.Command, handler CommandHandlerFunc) *CommandDispatcher
Handle adds a new Command that will be handled by handler
func (*CommandDispatcher) ReceiveMessage ¶
func (d *CommandDispatcher) ReceiveMessage(ctx context.Context, message Message) error
ReceiveMessage implements MessageReceiver.ReceiveMessage
type CommandDispatcherOption ¶
type CommandDispatcherOption func(consumer *CommandDispatcher)
CommandDispatcherOption options for CommandDispatcher
func WithCommandDispatcherLogger ¶
func WithCommandDispatcherLogger(logger log.Logger) CommandDispatcherOption
WithCommandDispatcherLogger is an option to set the log.Logger of the CommandDispatcher
type CommandHandlerFunc ¶
CommandHandlerFunc function handlers for msg.Command
type CommandMessagePublisher ¶
type CommandMessagePublisher interface {
PublishCommand(ctx context.Context, replyChannel string, command core.Command, options ...MessageOption) error
}
CommandMessagePublisher interface
type Consumer ¶
type Consumer interface { Listen(ctx context.Context, channel string, consumer ReceiveMessageFunc) error Close(ctx context.Context) error }
Consumer is the interface that infrastructures should implement to be used in a Subscriber
type DomainCommand ¶
DomainCommand interface for commands that are shared across the domain
type DomainEvent ¶
DomainEvent interface for events that are shared across the domain
type EntityEvent ¶
type EntityEvent interface { EntityID() string EntityName() string Event() core.Event Headers() Headers }
EntityEvent is an event with message header information
type EntityEventDispatcher ¶
type EntityEventDispatcher struct {
// contains filtered or unexported fields
}
EntityEventDispatcher is a MessageReceiver for EntityEvents
func NewEntityEventDispatcher ¶
func NewEntityEventDispatcher(options ...EntityEventDispatcherOption) *EntityEventDispatcher
NewEntityEventDispatcher constructs a new EntityEventDispatcher
func (*EntityEventDispatcher) Handle ¶
func (d *EntityEventDispatcher) Handle(evt core.Event, handler EntityEventHandlerFunc) *EntityEventDispatcher
Handle adds a new Event that will be handled by the given EntityEventHandlerFunc handler
func (*EntityEventDispatcher) ReceiveMessage ¶
func (d *EntityEventDispatcher) ReceiveMessage(ctx context.Context, message Message) error
ReceiveMessage implements MessageReceiver.ReceiveMessage
type EntityEventDispatcherOption ¶
type EntityEventDispatcherOption func(consumer *EntityEventDispatcher)
EntityEventDispatcherOption options for EntityEventDispatcher
func WithEntityEventDispatcherLogger ¶
func WithEntityEventDispatcherLogger(logger log.Logger) EntityEventDispatcherOption
WithEntityEventDispatcherLogger is an option to set the log.Logger of the EntityEventDispatcher
type EntityEventHandlerFunc ¶
type EntityEventHandlerFunc func(context.Context, EntityEvent) error
EntityEventHandlerFunc function handlers for msg.EntityEvent
type EntityEventMessagePublisher ¶
type EntityEventMessagePublisher interface {
PublishEntityEvents(ctx context.Context, entity core.Entity, options ...MessageOption) error
}
EntityEventMessagePublisher interface
type EventDispatcher ¶
type EventDispatcher struct {
// contains filtered or unexported fields
}
EventDispatcher is a MessageReceiver for Events
func NewEventDispatcher ¶
func NewEventDispatcher(options ...EventDispatcherOption) *EventDispatcher
NewEventDispatcher constructs a new EventDispatcher
func (*EventDispatcher) Handle ¶
func (d *EventDispatcher) Handle(evt core.Event, handler EventHandlerFunc) *EventDispatcher
Handle adds a new Event that will be handled by the given EventHandlerFunc handler
func (*EventDispatcher) ReceiveMessage ¶
func (d *EventDispatcher) ReceiveMessage(ctx context.Context, message Message) error
ReceiveMessage implements MessageReceiver.ReceiveMessage
type EventDispatcherOption ¶
type EventDispatcherOption func(consumer *EventDispatcher)
EventDispatcherOption options for EventDispatcher
func WithEventDispatcherLogger ¶
func WithEventDispatcherLogger(logger log.Logger) EventDispatcherOption
WithEventDispatcherLogger is an option to set the log.Logger of the EventDispatcher
type EventHandlerFunc ¶
EventHandlerFunc function handlers for msg.Event
type EventMessagePublisher ¶
type EventMessagePublisher interface {
PublishEvent(ctx context.Context, event core.Event, options ...MessageOption) error
}
EventMessagePublisher interface
type Headers ¶
Headers a map of strings keyed by Message header keys
func (Headers) Get ¶
Get returns the value for the given key. Returns a blank string if it does not exist
func (Headers) GetRequired ¶
GetRequired returns the value for the given key. Returns an error if it does not exist
type Message ¶
Message interface for messages containing payloads and headers
func NewMessage ¶
func NewMessage(payload []byte, options ...MessageOption) Message
NewMessage message constructor
type MessageOption ¶
type MessageOption func(m *message)
MessageOption options for Message
func WithAggregateInfo ¶
func WithAggregateInfo(a *es.AggregateRoot) MessageOption
WithAggregateInfo is an option to set additional Aggregate specific headers
func WithDestinationChannel ¶
func WithDestinationChannel(destinationChannel string) MessageOption
WithDestinationChannel is an option to set the destination of the outgoing Message
This will override the previous value set by interface { DestinationChannel() string }
func WithHeaders ¶
func WithHeaders(headers Headers) MessageOption
WithHeaders is an option to set additional headers onto the Message
func WithMessageID ¶
func WithMessageID(id string) MessageOption
WithMessageID is an option to set the ID of the Message
type MessagePublisher ¶
MessagePublisher interface
type MessageReceiver ¶
MessageReceiver interface for channel subscription receivers
type MessageSubscriber ¶
type MessageSubscriber interface {
Subscribe(channel string, receiver MessageReceiver)
}
MessageSubscriber interface
type Producer ¶
type Producer interface { Send(ctx context.Context, channel string, message Message) error Close(ctx context.Context) error }
Producer is the interface that infrastructures should implement to be used in a Publisher
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher sends domain events, commands, and replies to the producer
func NewPublisher ¶
func NewPublisher(producer Producer, options ...PublisherOption) *Publisher
NewPublisher constructs a new Publisher
func (*Publisher) PublishCommand ¶
func (p *Publisher) PublishCommand(ctx context.Context, replyChannel string, command core.Command, options ...MessageOption) error
PublishCommand serializes a command into a message with command specific headers and publishes it to a producer
func (*Publisher) PublishEntityEvents ¶
func (p *Publisher) PublishEntityEvents(ctx context.Context, entity core.Entity, options ...MessageOption) error
PublishEntityEvents serializes entity events into messages with entity specific headers and publishes it to a producer
func (*Publisher) PublishEvent ¶
func (p *Publisher) PublishEvent(ctx context.Context, event core.Event, options ...MessageOption) error
PublishEvent serializes an event into a message with event specific headers and publishes it to a producer
func (*Publisher) PublishReply ¶
func (p *Publisher) PublishReply(ctx context.Context, reply core.Reply, options ...MessageOption) error
PublishReply serializes a reply into a message with reply specific headers and publishes it to a producer
type PublisherOption ¶
type PublisherOption func(*Publisher)
PublisherOption options for Publisher
func WithPublisherLogger ¶
func WithPublisherLogger(logger log.Logger) PublisherOption
WithPublisherLogger is an option to set the log.Logger of the Publisher
type ReceiveMessageFunc ¶
ReceiveMessageFunc makes it easy to drop in functions as receivers
func (ReceiveMessageFunc) ReceiveMessage ¶
func (f ReceiveMessageFunc) ReceiveMessage(ctx context.Context, message Message) error
ReceiveMessage implements MessageReceiver.ReceiveMessage
type Reply ¶
Reply interface
func FailureReply ¶
FailureReply wraps a reply and returns it as a Failure reply. Deprecated: Use the WithReply() reply builder instead.
func SuccessReply ¶
SuccessReply wraps a reply and returns it as a Success reply. Deprecated: Use the WithReply() reply builder instead.
type ReplyBuilder ¶
type ReplyBuilder struct {
// contains filtered or unexported fields
}
ReplyBuilder is used to build custom replies
func WithReply ¶
func WithReply(reply core.Reply) *ReplyBuilder
WithReply starts a reply builder allowing custom headers to be injected
func (*ReplyBuilder) Failure ¶
func (b *ReplyBuilder) Failure() Reply
Failure wraps the reply with the custom headers as a Failure reply
func (*ReplyBuilder) Headers ¶
func (b *ReplyBuilder) Headers(headers map[string]string) *ReplyBuilder
Headers adds headers to include with the reply
func (*ReplyBuilder) Reply ¶
func (b *ReplyBuilder) Reply(reply core.Reply) *ReplyBuilder
Reply replaces the reply to be wrapped
func (*ReplyBuilder) Success ¶
func (b *ReplyBuilder) Success() Reply
Success wraps the reply with the custom headers as a Success reply
type ReplyMessagePublisher ¶
type ReplyMessagePublisher interface {
PublishReply(ctx context.Context, reply core.Reply, options ...MessageOption) error
}
ReplyMessagePublisher interface
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber receives domain events, commands, and replies from the consumer
func NewSubscriber ¶
func NewSubscriber(consumer Consumer, options ...SubscriberOption) *Subscriber
NewSubscriber constructs a new Subscriber
func (*Subscriber) Start ¶
func (s *Subscriber) Start(ctx context.Context) error
Start begins listening to all of the channels sending received messages into them
func (*Subscriber) Stop ¶
func (s *Subscriber) Stop(ctx context.Context) (err error)
Stop stops the subscriber and underlying consumer
func (*Subscriber) Subscribe ¶
func (s *Subscriber) Subscribe(channel string, receiver MessageReceiver)
Subscribe connects the receiver with messages from the channel on the consumer
func (*Subscriber) Use ¶
func (s *Subscriber) Use(mws ...func(MessageReceiver) MessageReceiver)
Use appends middleware receivers to the receiver stack
type SubscriberOption ¶
type SubscriberOption func(*Subscriber)
SubscriberOption options for Subscriber
func WithSubscriberLogger ¶
func WithSubscriberLogger(logger log.Logger) SubscriberOption
WithSubscriberLogger is an option to set the log.Logger of the Subscriber
Source Files ¶
- command.go
- command_dispatcher.go
- command_dispatcher_options.go
- constants.go
- consumer.go
- entity_event.go
- entity_event_dispatcher.go
- entity_event_dispatcher_options.go
- event.go
- event_dispatcher.go
- event_dispatcher_options.go
- headers.go
- message.go
- message_options.go
- message_receiver.go
- producer.go
- publisher.go
- publisher_options.go
- register_types.go
- reply.go
- reply_builder.go
- reply_types.go
- subscriber.go
- subscriber_options.go