Documentation ¶
Index ¶
- Variables
- func NewID() string
- func WithAckDecider(ackDecider AckDecider) func(*consumer)
- func WithBackoff(strategy BackoffStrategy) func(*consumer)
- type AckDecider
- type Acknowledgement
- type Attributes
- type BackoffStrategy
- type BackoffStrategyFunc
- type Checkpoint
- type ConsumerOption
- type CtxExtractorFunc
- type DueMessage
- type Envelope
- type EnvelopePublisher
- type EnvelopePublisherFunc
- type ExponentialBackoff
- type Handler
- type HandlerFunc
- type IDGenerator
- type IDGeneratorFunc
- type Marshaller
- type MarshallerPublisher
- type Message
- type MessageContext
- type MessageID
- type MessageRecord
- type Next
- type OnNext
- type OnProcess
- type Publisher
- type PublisherFunc
- type PublisherHandler
- type PublisherHandlerFunc
- type PublisherMiddleware
- type PublisherRecorder
- func (p *PublisherRecorder) Messages() []*MessageRecord
- func (p *PublisherRecorder) MessagesMap() map[string][]*Message
- func (p *PublisherRecorder) Publish(ctx context.Context, topic string, messages ...*Message) error
- func (p *PublisherRecorder) Reset()
- func (p *PublisherRecorder) TopicMessages(topic string) []*Message
- type ReceivedMessage
- type Router
- type Scheduler
- type SchedulerPublisher
- type SchedulerStorage
- type Subscriber
- type Unmarshaller
- type UnmarshallerFunc
Constants ¶
This section is empty.
Variables ¶
var ( ErrConsumerAlreadyRegistered = errors.New("consumer already registered") ErrRouterAlreadyRunning = errors.New("router already running") ErrRouterAlreadyStopped = errors.New("router already stopped") )
var ErrMissingHandler = errors.New("missing handler")
ErrMissingHandler is fired when the dispatcher has not handler registered for the message name.
var (
ErrReceivedMessageNotAvailable = errors.New("received message not available")
)
var ( // ErrResourceDoesNotExist indicates that the resource (topic, queue...) does not exist. ErrResourceDoesNotExist = errors.New("name does not exist") )
var ErrUnsupportedVersion = errors.New("unsupported version")
Functions ¶
func NewID ¶
func NewID() string
NewID package level function to generate IDs using the default generator.
func WithAckDecider ¶ added in v0.6.0
func WithAckDecider(ackDecider AckDecider) func(*consumer)
func WithBackoff ¶ added in v0.6.0
func WithBackoff(strategy BackoffStrategy) func(*consumer)
Types ¶
type AckDecider ¶ added in v0.6.0
type AckDecider func(ctx context.Context, topic string, msg ReceivedMessage, err error) Acknowledgement
type Acknowledgement ¶ added in v0.4.0
type Acknowledgement uint8
const ( NoOp Acknowledgement = iota Ack NAck ReSchedule )
func AutoAck ¶ added in v0.6.0
func AutoAck(_ context.Context, _ string, _ ReceivedMessage, err error) Acknowledgement
func DisableAutoAck ¶ added in v0.3.8
func DisableAutoAck(_ context.Context, _ string, _ ReceivedMessage, _ error) Acknowledgement
DisableAutoAck is acknowledgement decider function that disables the router from auto-acknowledging messages.
func ReScheduleOnError ¶ added in v0.6.0
func ReScheduleOnError(_ context.Context, _ string, _ ReceivedMessage, err error) Acknowledgement
ReScheduleOnError is acknowledgement decider function re-schedules on errors.
type BackoffStrategy ¶ added in v0.6.0
type BackoffStrategyFunc ¶ added in v0.6.0
func LinearBackoff ¶ added in v0.6.0
func LinearBackoff(delay time.Duration) BackoffStrategyFunc
type Checkpoint ¶
type Checkpoint func(ctx context.Context, consumerName string, msg ReceivedMessage, err error) error
Checkpoint optional hooks executed during the message live cycle
Returning an error will stop the subscription, and trigger the shutdown of the router.
func WrapCheckpoint ¶ added in v0.7.0
func WrapCheckpoint(current Checkpoint, hooks ...Checkpoint) Checkpoint
WrapCheckpoint will call multiple on checkpoint hooks one after the other and return on the first error.
type ConsumerOption ¶ added in v0.6.0
type ConsumerOption func(*consumer)
type CtxExtractorFunc ¶ added in v0.3.5
CtxExtractorFunc is a function that given a context returns a value. Return empty string if not present.
type DueMessage ¶ added in v0.5.0
type Envelope ¶
type Envelope struct { ID MessageID Name string Key string Body []byte Version string Attributes Attributes }
Envelope holds the data that need to be transmitted.
type EnvelopePublisher ¶
type EnvelopePublisher interface {
Publish(ctx context.Context, topic string, envelopes ...*Envelope) error
}
EnvelopePublisher publish envelopes where the data has already been marshalled.
You can also use this interface directly is you want to handle the marshalling yourself, combined with the NoOpMarshaller for the router.
func NoOpEnvelopePublisher ¶ added in v0.5.0
func NoOpEnvelopePublisher() EnvelopePublisher
NoOpEnvelopePublisher skips publishing the messages without failures.
type EnvelopePublisherFunc ¶ added in v0.2.1
EnvelopePublisherFunc is a function that can publish envelopes.
type ExponentialBackoff ¶ added in v0.6.0
type ExponentialBackoff struct { // Factor is the multiplying factor for each increment step, 3 by default. Factor float64 // Max is the minimum delay, 1 minute by default. Min time.Duration // Max is the maximum delay, 24 hours by default. Max time.Duration // Jitter eases contention by randomizing backoff steps, 0 by default (disabled). // Must be a value [0.0, 1.0] Jitter float64 }
type Handler ¶
Handler handles events.
func Acknowledge ¶ added in v0.4.2
Acknowledge will acknowledge a message and pass it to the next handler.
type HandlerFunc ¶
HandlerFunc that handles an event
func Dispatcher ¶
func Dispatcher(handlers map[string]Handler) HandlerFunc
Dispatcher is a message handler middleware that can be used to register different handlers for the same consumer, based on the message name.
func (HandlerFunc) HandleMessage ¶
func (f HandlerFunc) HandleMessage(ctx context.Context, message *Message) error
HandleMessage handles the message using the function.
type IDGeneratorFunc ¶
type IDGeneratorFunc func() string
IDGeneratorFunc function helper to conform the IDGenerator interface.
var DefaultIDGenerator IDGeneratorFunc = uuid.NewString
DefaultIDGenerator generates a v4 UUID. It will panic if it cannot generate the ID.
type Marshaller ¶
Marshaller marshalls the contents of message data.
type MarshallerPublisher ¶ added in v0.2.1
type MarshallerPublisher struct {
// contains filtered or unexported fields
}
MarshallerPublisher will marshall a message and publish a message delegate the publishing of the envelope.
func NewPublisher ¶ added in v0.2.1
func NewPublisher(publisher EnvelopePublisher, marshaller Marshaller) *MarshallerPublisher
NewPublisher creates a new marshaller publisher.
func (*MarshallerPublisher) Handler ¶ added in v0.2.1
func (p *MarshallerPublisher) Handler(topic string, handler PublisherHandler) Handler
Handler is a helper that publishes messages generated by other handlers.
type Message ¶
type Message struct { // ID of the message, if empty a new one will // be generated automatically ID MessageID // Name of the message Name string // Key groups the message of the same type. // Different transports may try to guarantee // the order for messages with the same key. Key string // Data that we want to transmit. Data interface{} // ReceivedCount returns the number of times this message // has been delivered. ReceivedCount int // Message attributes Attributes Attributes // contains filtered or unexported fields }
Message represent the information that we want to transmit.
func NewMessageFromReceived ¶ added in v0.3.3
func NewMessageFromReceived(msg ReceivedMessage, data interface{}) *Message
NewMessageFromReceived builds a new message from a received one and it's unmarshalled body.
func (*Message) ReSchedule ¶ added in v0.6.0
ReSchedule puts the message back again in the topic/queue to be available after a certain delay.
Different implementations may need to act accordingly and ack or n/ack the message.
The common scenario is that a message has been failed to be processed and, we don't want to receive it immediately, probably applying a backoff strategy with increased delay times.
func (*Message) SetAttribute ¶ added in v0.2.1
SetAttribute sets an attribute.
type MessageContext ¶ added in v0.7.0
type MessageContext func(ctx context.Context, consumerName string, message ReceivedMessage) context.Context
MessageContext optional hook that can be used to modify the context used while processing a message.
func WrapMessageContext ¶ added in v0.7.0
func WrapMessageContext(current MessageContext, hooks ...MessageContext) MessageContext
WrapMessageContext will call multiple message context hooks one after the other.
type MessageID ¶
type MessageID = string
MessageID a slice of bytes representing a unique message ID
type MessageRecord ¶ added in v0.3.7
MessageRecord record the message and the topic.
type Next ¶
type Next struct { Message ReceivedMessage Err error }
Next holds the next message in the subscription
type OnNext ¶ added in v0.8.0
OnNext optional hook called after a next message operation is received. It can be used to modify the next message.
type OnProcess ¶ added in v0.7.0
type OnProcess func(ctx context.Context, consumerName string, elapsed time.Duration, msg ReceivedMessage, err error)
OnProcess optional hook called after processing a received message in a consumer.
func WrapOnProcess ¶ added in v0.7.0
WrapOnProcess will call multiple on process hooks one after the other.
type Publisher ¶
Publisher describes the top level method to publish messages.
func NoOpPublisher ¶ added in v0.3.7
func NoOpPublisher() Publisher
NoOpPublisher skips publishing the messages without failures.
func WrapPublisher ¶ added in v0.3.5
func WrapPublisher(publisher Publisher, middlewares ...PublisherMiddleware) Publisher
WrapPublisher will wrap the publisher in the given middlewares.
type PublisherFunc ¶ added in v0.2.1
PublisherFunc is a function that can publish messages.
type PublisherHandler ¶
type PublisherHandler interface {
HandleMessage(ctx context.Context, message *Message) ([]*Message, error)
}
PublisherHandler handles events and generates new messages that should be published.
type PublisherHandlerFunc ¶
PublisherHandlerFunc function that can handle a message.
func (PublisherHandlerFunc) HandleMessage ¶
func (f PublisherHandlerFunc) HandleMessage(ctx context.Context, message *Message) ([]*Message, error)
HandleMessage handles the message with the function.
type PublisherMiddleware ¶ added in v0.3.5
func CtxAttributeInjector ¶ added in v0.3.5
func CtxAttributeInjector(attributeName string, extract CtxExtractorFunc) PublisherMiddleware
CtxAttributeInjector is a publisher middleware that can set message attributes based on value present in the context.
func TopicAsEventName ¶ added in v0.4.1
func TopicAsEventName() PublisherMiddleware
TopicAsEventName is a publisher middleware that will set the event name as the topic if the event name is empty.
type PublisherRecorder ¶ added in v0.3.7
type PublisherRecorder struct {
// contains filtered or unexported fields
}
PublisherRecorder is a publisher middleware that records all the events that are published.
func NewPublisherRecorder ¶ added in v0.3.7
func NewPublisherRecorder(publisher Publisher) *PublisherRecorder
NewPublisherRecorder returns a new publisher recorder.
func (*PublisherRecorder) Messages ¶ added in v0.3.7
func (p *PublisherRecorder) Messages() []*MessageRecord
Messages returns a list of records in the same order that have been published.
func (*PublisherRecorder) MessagesMap ¶ added in v0.3.7
func (p *PublisherRecorder) MessagesMap() map[string][]*Message
MessagesMap returns a map of all the messages published by topic.
func (*PublisherRecorder) Reset ¶ added in v0.3.7
func (p *PublisherRecorder) Reset()
Reset empties the lists of published messages.
func (*PublisherRecorder) TopicMessages ¶ added in v0.3.7
func (p *PublisherRecorder) TopicMessages(topic string) []*Message
TopicMessages returns list of message sent in a single topic.
type ReceivedMessage ¶
type ReceivedMessage interface { // ID of the message, it should be unique. ID() string // Name of the message. Name() string // Key grouping key of the message. Key() string // Body of the message. Body() []byte // Version of the envelope. Version() string // Attributes message attributes. Attributes() Attributes // Ack acknowledges the message. Ack(ctx context.Context) error // NAck negative acknowledges the message. NAck(ctx context.Context) error // ReSchedule puts the message back in the same topic // to be available again after a certain delay // And Ack or Nack is expected to happen at this point, // although not required. ReSchedule(ctx context.Context, delay time.Duration) error // ReceivedCount returns the number of times this message // has been delivered. // If the messaging system does not support this information // it should be 0. ReceivedCount() int // String prints the message. String() string }
ReceivedMessage incoming message consumed by the subscriber.
type Router ¶
type Router struct { // Message unmarshaller. If none provided // NoOpUnmarshaller will be used. Unmarshaller Unmarshaller // AckDecider is an optional method that will decide if the message should // be acknowledged, negative acknowledged or do nothing; it receives the topic, // By default, the message will be acknowledged if there was no error handling it // and negatively acknowledged if there was. // To disable the automatic acknowledgements pass the DisableAutoAck function AckDecider AckDecider // Backoff calculates the time to wait when re-scheduling a message. // The default exponential back off will be used if not provided. // Note that consumers can override the back off strategy. Backoff BackoffStrategy // StopTimeout time to wait for all the consumer to stop in a // clean way. No timeout by default. StopTimeout time.Duration // MessageContext is an optional function that modify the context // that will be passed along during the message live-cycle: // * in the message handler // * in all the checkpoints. MessageContext MessageContext // Optional callback invoked when the consumer // reports an error. OnReceive Checkpoint // Optional callback invoked when the received message // cannot be unmarshaled into a message. OnUnmarshal Checkpoint // OnHandlerError callback invoked when the handler // returns an error. OnHandler Checkpoint // Optional callback invoked when the handled // message cannot be acknowledged OnAck Checkpoint // Optional callback invoked after fully processing a message // passing the elapsed time and the error, if any. OnProcess OnProcess // Optional callback invoked after a consumer return from a // consume operation. OnNext OnNext // contains filtered or unexported fields }
Router groups consumers and runs them together.
func (*Router) Register ¶
func (r *Router) Register(name string, subscriber Subscriber, handler Handler, opts ...ConsumerOption) error
func (*Router) Run ¶
Run starts all the consumer and keeps them running.
Run is a blocking call, to stop it cancel the given context. If a consumer returns and error and "ContinueOnErrors" is "false" (default value), the router will stop all consumers and return the first error that triggered the shutdown.
Calling run more than once will return an error. Registering new handlers after call running won't have any effect. It needs to be stopped and started again.
type SchedulerPublisher ¶ added in v0.5.0
type SchedulerPublisher struct { *MarshallerPublisher // contains filtered or unexported fields }
func NewSchedulerPublisher ¶ added in v0.5.0
func NewSchedulerPublisher(pub *MarshallerPublisher, storage SchedulerStorage) *SchedulerPublisher
func (*SchedulerPublisher) PublishDue ¶ added in v0.5.0
func (p *SchedulerPublisher) PublishDue(ctx context.Context) error
type SchedulerStorage ¶ added in v0.5.0
type SchedulerStorage interface { // Schedule schedules a message to be published in the future. Schedule(ctx context.Context, dueDate time.Time, topic string, messages ...*Envelope) error // ConsumeDue returns a channel that should receive the next due messages or an error. // It should feed the next messages while the context is valid, once the context terminates // it should make any cleaning task and close the channel to signal a proper cleanup. ConsumeDue(ctx context.Context) (<-chan DueMessage, error) // Published indicates to the storage that the message has been published. Published(ctx context.Context, message DueMessage) error }
type Subscriber ¶
type Subscriber interface { // Subscribe to the topic. Subscribe() (<-chan Next, error) // Stop stops consuming. Stop(ctx context.Context) error }
Subscriber consumes messages from a topic.
type Unmarshaller ¶
type Unmarshaller interface { // Unmarshal unmarshalls the received message. Unmarshal(topic string, message ReceivedMessage) (data interface{}, err error) }
Unmarshaller will decode the received message. It should be aware of the version.
func NoOpUnmarshaller ¶
func NoOpUnmarshaller() Unmarshaller
NoOpUnmarshaller will pass the message data raw byte slice without taking the version into account.
type UnmarshallerFunc ¶
type UnmarshallerFunc func(topic string, message ReceivedMessage) (data interface{}, err error)
UnmarshallerFunc will decode the received message. It should be aware of the version.
func (UnmarshallerFunc) Unmarshal ¶
func (f UnmarshallerFunc) Unmarshal(topic string, message ReceivedMessage) (data interface{}, err error)