msg

package
v0.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 18, 2021 License: MIT Imports: 10 Imported by: 10

Documentation

Index

Constants

View Source
const (
	MessageID            = "ID"
	MessageDate          = "DATE"
	MessageChannel       = "CHANNEL"
	MessageCorrelationID = "CORRELATION_ID"
	MessageCausationID   = "CAUSATION_ID"

	MessageEventPrefix     = "EVENT_"
	MessageEventName       = MessageEventPrefix + "NAME"
	MessageEventEntityName = MessageEventPrefix + "ENTITY_NAME"
	MessageEventEntityID   = MessageEventPrefix + "ENTITY_ID"

	MessageCommandPrefix       = "COMMAND_"
	MessageCommandName         = MessageCommandPrefix + "NAME"
	MessageCommandChannel      = MessageCommandPrefix + "CHANNEL"
	MessageCommandReplyChannel = MessageCommandPrefix + "REPLY_CHANNEL"

	MessageReplyPrefix  = "REPLY_"
	MessageReplyName    = MessageReplyPrefix + "NAME"
	MessageReplyOutcome = MessageReplyPrefix + "OUTCOME"
)

Message header keys

View Source
const (
	ReplyOutcomeSuccess = "SUCCESS"
	ReplyOutcomeFailure = "FAILURE"
)

Reply outcomes

Variables

This section is empty.

Functions

func RegisterTypes

func RegisterTypes()

RegisterTypes should be called after registering a new marshaller; especially after registering a new default

Types

type Command

type Command interface {
	Command() core.Command
	Headers() Headers
}

Command is a core.Command with message header information

type CommandDispatcher

type CommandDispatcher struct {
	// contains filtered or unexported fields
}

CommandDispatcher is a MessageReceiver for Commands

func NewCommandDispatcher

func NewCommandDispatcher(publisher ReplyMessagePublisher, options ...CommandDispatcherOption) *CommandDispatcher

NewCommandDispatcher constructs a new CommandDispatcher

func (*CommandDispatcher) Handle

Handle adds a new Command that will be handled by handler

func (*CommandDispatcher) ReceiveMessage

func (d *CommandDispatcher) ReceiveMessage(ctx context.Context, message Message) error

ReceiveMessage implements MessageReceiver.ReceiveMessage

type CommandDispatcherOption

type CommandDispatcherOption func(consumer *CommandDispatcher)

CommandDispatcherOption options for CommandDispatcher

func WithCommandDispatcherLogger

func WithCommandDispatcherLogger(logger log.Logger) CommandDispatcherOption

WithCommandDispatcherLogger is an option to set the log.Logger of the CommandDispatcher

type CommandHandlerFunc

type CommandHandlerFunc func(context.Context, Command) ([]Reply, error)

CommandHandlerFunc function handlers for msg.Command

type CommandMessagePublisher

type CommandMessagePublisher interface {
	PublishCommand(ctx context.Context, replyChannel string, command core.Command, options ...MessageOption) error
}

CommandMessagePublisher interface

type Consumer

type Consumer interface {
	Listen(ctx context.Context, channel string, consumer ReceiveMessageFunc) error
	Close(ctx context.Context) error
}

Consumer is the interface that infrastructures should implement to be used in MessageDispatchers

type DomainCommand

type DomainCommand interface {
	core.Command
	DestinationChannel() string
}

DomainCommand interface for commands that are shared across the domain

type DomainEvent

type DomainEvent interface {
	core.Event
	DestinationChannel() string
}

DomainEvent interface for events that are shared across the domain

type EntityEvent

type EntityEvent interface {
	EntityID() string
	EntityName() string
	Event() core.Event
	Headers() Headers
}

EntityEvent is an event with message header information

type EntityEventDispatcher

type EntityEventDispatcher struct {
	// contains filtered or unexported fields
}

EntityEventDispatcher is a MessageReceiver for DomainEvents

func NewEntityEventDispatcher

func NewEntityEventDispatcher(options ...EntityEventDispatcherOption) *EntityEventDispatcher

NewEntityEventDispatcher constructs a new EntityEventDispatcher

func (*EntityEventDispatcher) Handle

Handle adds a new Event that will be handled by EventMessageFunc handler

func (*EntityEventDispatcher) ReceiveMessage

func (d *EntityEventDispatcher) ReceiveMessage(ctx context.Context, message Message) error

ReceiveMessage implements MessageReceiver.ReceiveMessage

type EntityEventDispatcherOption

type EntityEventDispatcherOption func(consumer *EntityEventDispatcher)

EntityEventDispatcherOption options for EntityEventDispatcher

func WithEntityEventDispatcherLogger

func WithEntityEventDispatcherLogger(logger log.Logger) EntityEventDispatcherOption

WithEntityEventDispatcherLogger is an option to set the log.Logger of the EntityEventDispatcher

type EntityEventHandlerFunc

type EntityEventHandlerFunc func(context.Context, EntityEvent) error

EntityEventHandlerFunc function handlers for msg.EntityEvent

type EntityEventMessagePublisher

type EntityEventMessagePublisher interface {
	PublishEntityEvents(ctx context.Context, entity core.Entity, options ...MessageOption) error
}

EntityEventMessagePublisher interface

type Event

type Event interface {
	Event() core.Event
	Headers() Headers
}

Event is an event with message header information

type EventDispatcher

type EventDispatcher struct {
	// contains filtered or unexported fields
}

EventDispatcher is a MessageReceiver for Events

func NewEventDispatcher

func NewEventDispatcher(options ...EventDispatcherOption) *EventDispatcher

NewEventDispatcher constructs a new EventDispatcher

func (*EventDispatcher) Handle

func (d *EventDispatcher) Handle(evt core.Event, handler EventHandlerFunc) *EventDispatcher

Handle adds a new Event that will be handled by EventMessageFunc handler

func (*EventDispatcher) ReceiveMessage

func (d *EventDispatcher) ReceiveMessage(ctx context.Context, message Message) error

ReceiveMessage implements MessageReceiver.ReceiveMessage

type EventDispatcherOption

type EventDispatcherOption func(consumer *EventDispatcher)

EventDispatcherOption options for EventDispatcher

func WithEventDispatcherLogger

func WithEventDispatcherLogger(logger log.Logger) EventDispatcherOption

WithEventDispatcherLogger is an option to set the log.Logger of the EventDispatcher

type EventHandlerFunc

type EventHandlerFunc func(context.Context, Event) error

EventHandlerFunc function handlers for msg.Event

type EventMessagePublisher

type EventMessagePublisher interface {
	PublishEvent(ctx context.Context, event core.Event, options ...MessageOption) error
}

EventMessagePublisher interface

type Failure

type Failure struct{}

Failure reply type for generic failure replies to commands

func (Failure) ReplyName

func (Failure) ReplyName() string

ReplyName implements core.Reply.ReplyName

type Headers

type Headers map[string]string

Headers a map of strings keyed by Message header keys

func (Headers) Get

func (h Headers) Get(key string) string

Get returns the value for the given key. Returns a blank string if it does not exist

func (Headers) GetRequired

func (h Headers) GetRequired(key string) (string, error)

GetRequired returns the value for the given key. Returns an error if it does not exist

func (Headers) Has

func (h Headers) Has(key string) bool

Has returned whether or not the given key exists in the headers

func (Headers) Set

func (h Headers) Set(key, value string)

Set sets or overwrites the key with the value

type Message

type Message interface {
	ID() string
	Headers() Headers
	Payload() []byte
}

Message interface for messages containing payloads and headers

func NewMessage

func NewMessage(payload []byte, options ...MessageOption) Message

NewMessage message constructor

type MessageOption

type MessageOption func(m *message)

MessageOption options for Message

func WithAggregateInfo

func WithAggregateInfo(a *es.AggregateRoot) MessageOption

WithAggregateInfo is an option to set additional Aggregate specific headers

func WithDestinationChannel

func WithDestinationChannel(destinationChannel string) MessageOption

WithDestinationChannel is and option to set the destination of the outgoing Message

This will override the previous value set by interface { DestinationChannel() string }

func WithHeaders

func WithHeaders(headers Headers) MessageOption

WithHeaders is an option to set additional headers onto the Message

func WithMessageID

func WithMessageID(id string) MessageOption

WithMessageID is an option to set the ID of the Message

type MessagePublisher

type MessagePublisher interface {
	Publish(ctx context.Context, message Message) error
}

MessagePublisher interface

type MessageReceiver

type MessageReceiver interface {
	ReceiveMessage(context.Context, Message) error
}

MessageReceiver interface for channel subscription receivers

type MessageSubscriber

type MessageSubscriber interface {
	Subscribe(channel string, receiver MessageReceiver)
}

MessageSubscriber interface

type Producer

type Producer interface {
	Send(ctx context.Context, channel string, message Message) error
	Close(ctx context.Context) error
}

Producer is the interface that infrastructures should implement to be used in a Publisher

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher send domain events, commands, and replies to the publisher

func NewPublisher

func NewPublisher(producer Producer, options ...PublisherOption) *Publisher

NewPublisher constructs a new Publisher

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, message Message) error

Publish sends a message off to a producer

func (*Publisher) PublishCommand

func (p *Publisher) PublishCommand(ctx context.Context, replyChannel string, command core.Command, options ...MessageOption) error

PublishCommand serializes a command into a message with command specific headers and publishes it to a producer

func (*Publisher) PublishEntityEvents

func (p *Publisher) PublishEntityEvents(ctx context.Context, entity core.Entity, options ...MessageOption) error

PublishEntityEvents serializes entity events into messages with entity specific headers and publishes it to a producer

func (*Publisher) PublishEvent

func (p *Publisher) PublishEvent(ctx context.Context, event core.Event, options ...MessageOption) error

PublishEvent serializes an event into a message with event specific headers and publishes it to a producer

func (*Publisher) PublishReply

func (p *Publisher) PublishReply(ctx context.Context, reply core.Reply, options ...MessageOption) error

PublishReply serializes a reply into a message with reply specific headers and publishes it to a producer

func (*Publisher) Stop

func (p *Publisher) Stop(ctx context.Context) (err error)

Stop stops the publisher and underlying producer

type PublisherOption

type PublisherOption func(*Publisher)

PublisherOption options for PublisherPublisher

func WithPublisherLogger

func WithPublisherLogger(logger log.Logger) PublisherOption

WithPublisherLogger is an option to set the log.Logger of the Publisher

type ReceiveMessageFunc

type ReceiveMessageFunc func(context.Context, Message) error

ReceiveMessageFunc makes it easy to drop in functions as receivers

func (ReceiveMessageFunc) ReceiveMessage

func (f ReceiveMessageFunc) ReceiveMessage(ctx context.Context, message Message) error

ReceiveMessage implements MessageReceiver.ReceiveMessage

type Reply

type Reply interface {
	Reply() core.Reply
	Headers() Headers
}

Reply interface

func FailureReply

func FailureReply(reply core.Reply) Reply

FailureReply wraps a reply and returns it as a Failure reply Deprecated: Use the WithReply() reply builder

func NewReply

func NewReply(reply core.Reply, headers Headers) Reply

NewReply constructs a new reply with headers

func SuccessReply

func SuccessReply(reply core.Reply) Reply

SuccessReply wraps a reply and returns it as a Success reply Deprecated: Use the WithReply() reply builder

func WithFailure

func WithFailure() Reply

WithFailure returns a generic Failure reply

func WithSuccess

func WithSuccess() Reply

WithSuccess returns a generic Success reply

type ReplyBuilder

type ReplyBuilder struct {
	// contains filtered or unexported fields
}

ReplyBuilder is used to build custom replies

func WithReply

func WithReply(reply core.Reply) *ReplyBuilder

WithReply starts a reply builder allowing custom headers to be injected

func (*ReplyBuilder) Failure

func (b *ReplyBuilder) Failure() Reply

Failure wraps the reply with the custom headers as a Failure reply

func (*ReplyBuilder) Headers

func (b *ReplyBuilder) Headers(headers map[string]string) *ReplyBuilder

Headers adds headers to include with the reply

func (*ReplyBuilder) Reply

func (b *ReplyBuilder) Reply(reply core.Reply) *ReplyBuilder

Reply replaces the reply to be wrapped

func (*ReplyBuilder) Success

func (b *ReplyBuilder) Success() Reply

Success wraps the reply with the custom headers as a Success reply

type ReplyMessagePublisher

type ReplyMessagePublisher interface {
	PublishReply(ctx context.Context, reply core.Reply, options ...MessageOption) error
}

ReplyMessagePublisher interface

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber receives domain events, commands, and replies from the consumer

func NewSubscriber

func NewSubscriber(consumer Consumer, options ...SubscriberOption) *Subscriber

NewSubscriber constructs a new Subscriber

func (*Subscriber) Start

func (s *Subscriber) Start(ctx context.Context) error

Start begins listening to all of the channels sending received messages into them

func (*Subscriber) Stop

func (s *Subscriber) Stop(ctx context.Context) (err error)

Stop stops the consumer and underlying consumer

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(channel string, receiver MessageReceiver)

Subscribe connects the receiver with messages from the channel on the consumer

func (*Subscriber) Use

func (s *Subscriber) Use(mws ...func(MessageReceiver) MessageReceiver)

Use appends middleware receivers to the receiver stack

type SubscriberOption

type SubscriberOption func(*Subscriber)

SubscriberOption options for MessageConsumers

func WithSubscriberLogger

func WithSubscriberLogger(logger log.Logger) SubscriberOption

WithSubscriberLogger is an option to set the log.Logger of the Subscriber

type Success

type Success struct{}

Success reply type for generic successful replies to commands

func (Success) ReplyName

func (Success) ReplyName() string

ReplyName implements core.Reply.ReplyName

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL