message

package
v1.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 20, 2023 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrOutputInNoPublisherHandler happens when a handler func returned some messages in a no-publisher handler.
	// todo: maybe change the handler func signature in no-publisher handler so that there's no possibility for this
	ErrOutputInNoPublisherHandler = errors.New("returned output messages in a handler without publisher")
)

Functions

func HandlerNameFromCtx

func HandlerNameFromCtx(ctx context.Context) string

HandlerNameFromCtx returns the name of the message handler in the router that consumed the message.

func PublishTopicFromCtx

func PublishTopicFromCtx(ctx context.Context) string

PublishTopicFromCtx returns the topic to which message will be published by the router.

func PublisherNameFromCtx

func PublisherNameFromCtx(ctx context.Context) string

PublisherNameFromCtx returns the name of the message publisher type that published the message in the router. For example, for Kafka it will be `kafka.Publisher`.

func SubscribeTopicFromCtx

func SubscribeTopicFromCtx(ctx context.Context) string

SubscribeTopicFromCtx returns the topic from which message was received in the router.

func SubscriberNameFromCtx

func SubscriberNameFromCtx(ctx context.Context) string

SubscriberNameFromCtx returns the name of the message subscriber type that subscribed to the message in the router. For example, for Kafka it will be `kafka.Subscriber`.

Types

type DuplicateHandlerNameError

type DuplicateHandlerNameError struct {
	HandlerName string
}

DuplicateHandlerNameError is sent in a panic when you try to add a second handler with the same name.

func (DuplicateHandlerNameError) Error

type DynRouter

type DynRouter struct {
	// contains filtered or unexported fields
}

DynRouter is responsible for handling messages from subscribers using provided handler functions.

If the handler function returns a message, the message is published with the publisher. You can use middlewares to wrap handlers with common logic like logging, instrumentation, etc.

func NewDynRouter

func NewDynRouter(config RouterConfig, routeMsgFunc RouteMessageFunc, logger watermill.LoggerAdapter) (*DynRouter, error)

NewDynRouter creates a new Router with given configuration.

func (*DynRouter) AddHandlerRM added in v1.4.0

func (r *DynRouter) AddHandlerRM(
	handlerName string,
	subscribeTopic string,
	subscriber Subscriber,
	publishTopic string,
	publisher Publisher,
	handlerFunc HandlerFuncRM,
) *RouteMsgHandler

AddHandler adds a new handler.

handlerName must be unique. For now, it is used only for debugging.

subscribeTopic is a topic from which handler will receive messages.

publishTopic is a topic to which router will produce messages returned by handlerFunc. When handler needs to publish to multiple topics, it is recommended to just inject Publisher to Handler or implement middleware which will catch messages and publish to topic based on metadata for example.

If handler is added while router is already running, you need to explicitly call RunHandlers().

func (*DynRouter) AddMiddlewareRM added in v1.4.0

func (r *DynRouter) AddMiddlewareRM(m ...HandlerMiddlewareRM)

AddMiddleware adds a new middleware to the router.

The order of middleware matters. Middleware added at the beginning is executed first.

func (*DynRouter) AddNoPublisherHandlerRM added in v1.4.0

func (r *DynRouter) AddNoPublisherHandlerRM(
	handlerName string,
	subscribeTopic string,
	subscriber Subscriber,
	handlerFunc NoPublishHandlerFunc,
) *RouteMsgHandler

AddNoPublisherHandler adds a new handler. This handler cannot return messages. When message is returned it will occur an error and Nack will be sent.

handlerName must be unique. For now, it is used only for debugging.

subscribeTopic is a topic from which handler will receive messages.

subscriber is Subscriber from which messages will be consumed.

If handler is added while router is already running, you need to explicitly call RunHandlers().

func (*DynRouter) AddPlugin

func (r *DynRouter) AddPlugin(p ...DynRouterPlugin)

AddPlugin adds a new plugin to the router. Plugins are executed during startup of the router.

A plugin can, for example, close the router after SIGINT or SIGTERM is sent to the process (SignalsHandler plugin).

func (*DynRouter) AddPublisherDecorators

func (r *DynRouter) AddPublisherDecorators(dec ...PublisherDecorator)

AddPublisherDecorators wraps the router's Publisher. The first decorator is the innermost, i.e. calls the original publisher.

func (*DynRouter) AddSubscriberDecorators

func (r *DynRouter) AddSubscriberDecorators(dec ...SubscriberDecorator)

AddSubscriberDecorators wraps the router's Subscriber. The first decorator is the innermost, i.e. calls the original subscriber.

func (*DynRouter) Close

func (r *DynRouter) Close() error

Close gracefully closes the router with a timeout provided in the configuration.

func (*DynRouter) HandlersRM added in v1.3.0

func (r *DynRouter) HandlersRM() map[string]HandlerFuncRM

Handlers returns all registered handlers.

func (*DynRouter) IsRunning

func (r *DynRouter) IsRunning() bool

IsRunning returns true when router is running.

func (*DynRouter) Logger

func (r *DynRouter) Logger() watermill.LoggerAdapter

Logger returns the Router's logger.

func (*DynRouter) Run

func (r *DynRouter) Run(ctx context.Context) (err error)

Run runs all plugins and handlers and starts subscribing to provided topics. This call is blocking while the router is running.

When all handlers have stopped (for example, because subscriptions were closed), the router will also stop.

To stop Run() you should call Close() on the router.

ctx will be propagated to all subscribers.

When all handlers are stopped (for example: because of closed connection), Run() will be also stopped.

func (*DynRouter) RunHandlers

func (r *DynRouter) RunHandlers(ctx context.Context) error

RunHandlers runs all handlers that were added after Run(). RunHandlers is idempotent, so can be called multiple times safely.

func (*DynRouter) Running

func (r *DynRouter) Running() chan struct{}

Running is closed when router is running. In other words: you can wait till router is running using

fmt.Println("Starting router")
go r.Run(ctx)
<- r.Running()
fmt.Println("Router is running")

type DynRouterPlugin

type DynRouterPlugin func(*DynRouter) error

RouterPlugin is function which is executed on Router start.

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler handles Messages.

func (*Handler) AddMiddleware

func (h *Handler) AddMiddleware(m ...HandlerMiddleware)

AddMiddleware adds new middleware to the specified handler in the router.

The order of middleware matters. Middleware added at the beginning is executed first.

func (*Handler) Started

func (h *Handler) Started() chan struct{}

Started returns channel which is stopped when handler is running.

func (*Handler) Stop

func (h *Handler) Stop()

Stop stops the handler. Stop is asynchronous. You can check if handler was stopped with Stopped() function.

func (*Handler) Stopped

func (h *Handler) Stopped() chan struct{}

Stopped returns channel which is stopped when handler did stop.

type HandlerFunc

type HandlerFunc func(msg *Message) ([]*Message, error)

HandlerFunc is function called when message is received.

msg.Ack() is called automatically when HandlerFunc doesn't return error. When HandlerFunc returns error, msg.Nack() is called. When msg.Ack() was called in handler and HandlerFunc returns error, msg.Nack() will be not sent because Ack was already sent.

HandlerFunc's are executed parallel when multiple messages was received (because msg.Ack() was sent in HandlerFunc or Subscriber supports multiple consumers).

var PassthroughHandler HandlerFunc = func(msg *Message) ([]*Message, error) {
	return []*Message{msg}, nil
}

PassthroughHandler is a handler that passes the message unchanged from the subscriber to the publisher.

type HandlerFuncRM added in v1.3.0

type HandlerFuncRM func(msg *Message) ([]*Message, error)

HandlerFunc's are executed parallel when multiple messages was received (because msg.Ack() was sent in HandlerFunc or Subscriber supports multiple consumers).

type HandlerMiddleware

type HandlerMiddleware func(h HandlerFunc) HandlerFunc

HandlerMiddleware allows us to write something like decorators to HandlerFunc. It can execute something before handler (for example: modify consumed message) or after (modify produced messages, ack/nack on consumed message, handle errors, logging, etc.).

It can be attached to the router by using `AddMiddleware` method.

Example:

func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
	return func(message *message.Message) ([]*message.Message, error) {
		fmt.Println("executed before handler")
		producedMessages, err := h(message)
		fmt.Println("executed after handler")

		return producedMessages, err
	}
}

type HandlerMiddlewareRM added in v1.3.0

type HandlerMiddlewareRM func(h HandlerFuncRM) HandlerFuncRM

type Message

type Message struct {
	// UUID is a unique identifier of message.
	//
	// It is only used by Watermill for debugging.
	// UUID can be empty.
	UUID string

	// Metadata contains the message metadata.
	//
	// Can be used to store data which doesn't require unmarshalling the entire payload.
	// It is something similar to HTTP request's headers.
	//
	// Metadata is marshaled and will be saved to the PubSub.
	Metadata Metadata

	// Payload is the message's payload.
	Payload Payload
	// contains filtered or unexported fields
}

Message is the basic transfer unit. Messages are emitted by Publishers and received by Subscribers.

func NewMessage

func NewMessage(uuid string, payload Payload) *Message

NewMessage creates a new Message with given uuid and payload.

func (*Message) Ack

func (m *Message) Ack() bool

Ack sends message's acknowledgement.

Ack is not blocking. Ack is idempotent. False is returned, if Nack is already sent.

func (*Message) Acked

func (m *Message) Acked() <-chan struct{}

Acked returns channel which is closed when acknowledgement is sent.

Usage:

select {
case <-message.Acked():
	// ack received
case <-message.Nacked():
	// nack received
}

func (*Message) Context

func (m *Message) Context() context.Context

Context returns the message's context. To change the context, use SetContext.

The returned context is always non-nil; it defaults to the background context.

func (*Message) Copy

func (m *Message) Copy() *Message

Copy copies all message without Acks/Nacks. The context is not propagated to the copy.

func (*Message) Equals

func (m *Message) Equals(toCompare *Message) bool

Equals compare, that two messages are equal. Acks/Nacks are not compared.

func (*Message) Nack

func (m *Message) Nack() bool

Nack sends message's negative acknowledgement.

Nack is not blocking. Nack is idempotent. False is returned, if Ack is already sent.

func (*Message) Nacked

func (m *Message) Nacked() <-chan struct{}

Nacked returns channel which is closed when negative acknowledgement is sent.

Usage:

select {
case <-message.Acked():
	// ack received
case <-message.Nacked():
	// nack received
}

func (*Message) SetContext

func (m *Message) SetContext(ctx context.Context)

SetContext sets provided context to the message.

type Messages

type Messages []*Message

Messages is a slice of messages.

func (Messages) IDs

func (m Messages) IDs() []string

IDs returns a slice of Messages' IDs.

type Metadata

type Metadata map[string]string

Metadata is sent with every message to provide extra context without unmarshaling the message payload.

func (Metadata) Get

func (m Metadata) Get(key string) string

Get returns the metadata value for the given key. If the key is not found, an empty string is returned.

func (Metadata) Set

func (m Metadata) Set(key, value string)

Set sets the metadata key to value.

type NoPublishHandlerFunc

type NoPublishHandlerFunc func(msg *Message) error

NoPublishHandlerFunc is HandlerFunc alternative, which doesn't produce any messages.

type Payload

type Payload []byte

Payload is the Message's payload.

type Publisher

type Publisher interface {
	// Publish publishes provided messages to given topic.
	//
	// Publish can be synchronous or asynchronous - it depends on the implementation.
	//
	// Most publishers implementations don't support atomic publishing of messages.
	// This means that if publishing one of the messages fails, the next messages will not be published.
	//
	// Publish must be thread safe.
	Publish(topic string, messages ...*Message) error
	// Close should flush unsent messages, if publisher is async.
	Close() error
}

Publisher is the emitting part of a Pub/Sub.

type PublisherDecorator

type PublisherDecorator func(pub Publisher) (Publisher, error)

PublisherDecorator wraps the underlying Publisher, adding some functionality.

func MessageTransformPublisherDecorator

func MessageTransformPublisherDecorator(transform func(*Message)) PublisherDecorator

MessageTransformPublisherDecorator creates a publisher decorator that calls transform on each message that passes through the publisher.

type RouteMessageFunc

type RouteMessageFunc func(topic string, msg []*Message) (string, []*Message, error)

RouteMessageFunc is function called when message is received.

type RouteMsgHandler

type RouteMsgHandler struct {
	// contains filtered or unexported fields
}

RouteMsgHandler handles Messages.

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router is responsible for handling messages from subscribers using provided handler functions.

If the handler function returns a message, the message is published with the publisher. You can use middlewares to wrap handlers with common logic like logging, instrumentation, etc.

func NewRouter

func NewRouter(config RouterConfig, logger watermill.LoggerAdapter) (*Router, error)

NewRouter creates a new Router with given configuration.

func (*Router) AddHandler

func (r *Router) AddHandler(
	handlerName string,
	subscribeTopic string,
	subscriber Subscriber,
	publishTopic string,
	publisher Publisher,
	handlerFunc HandlerFunc,
) *Handler

AddHandler adds a new handler.

handlerName must be unique. For now, it is used only for debugging.

subscribeTopic is a topic from which handler will receive messages.

publishTopic is a topic to which router will produce messages returned by handlerFunc. When handler needs to publish to multiple topics, it is recommended to just inject Publisher to Handler or implement middleware which will catch messages and publish to topic based on metadata for example.

If handler is added while router is already running, you need to explicitly call RunHandlers().

func (*Router) AddMiddleware

func (r *Router) AddMiddleware(m ...HandlerMiddleware)

AddMiddleware adds a new middleware to the router.

The order of middleware matters. Middleware added at the beginning is executed first.

func (*Router) AddNoPublisherHandler

func (r *Router) AddNoPublisherHandler(
	handlerName string,
	subscribeTopic string,
	subscriber Subscriber,
	handlerFunc NoPublishHandlerFunc,
) *Handler

AddNoPublisherHandler adds a new handler. This handler cannot return messages. When message is returned it will occur an error and Nack will be sent.

handlerName must be unique. For now, it is used only for debugging.

subscribeTopic is a topic from which handler will receive messages.

subscriber is Subscriber from which messages will be consumed.

If handler is added while router is already running, you need to explicitly call RunHandlers().

func (*Router) AddPlugin

func (r *Router) AddPlugin(p ...RouterPlugin)

AddPlugin adds a new plugin to the router. Plugins are executed during startup of the router.

A plugin can, for example, close the router after SIGINT or SIGTERM is sent to the process (SignalsHandler plugin).

func (*Router) AddPublisherDecorators

func (r *Router) AddPublisherDecorators(dec ...PublisherDecorator)

AddPublisherDecorators wraps the router's Publisher. The first decorator is the innermost, i.e. calls the original publisher.

func (*Router) AddSubscriberDecorators

func (r *Router) AddSubscriberDecorators(dec ...SubscriberDecorator)

AddSubscriberDecorators wraps the router's Subscriber. The first decorator is the innermost, i.e. calls the original subscriber.

func (*Router) Close

func (r *Router) Close() error

Close gracefully closes the router with a timeout provided in the configuration.

func (*Router) Handlers

func (r *Router) Handlers() map[string]HandlerFunc

Handlers returns all registered handlers.

func (*Router) IsClosed

func (r *Router) IsClosed() bool

func (*Router) IsRunning

func (r *Router) IsRunning() bool

IsRunning returns true when router is running.

Warning: for historical reasons, this method is not aware of router closing. If you want to know if the router was closed, use IsClosed.

func (*Router) Logger

func (r *Router) Logger() watermill.LoggerAdapter

Logger returns the Router's logger.

func (*Router) Run

func (r *Router) Run(ctx context.Context) (err error)

Run runs all plugins and handlers and starts subscribing to provided topics. This call is blocking while the router is running.

When all handlers have stopped (for example, because subscriptions were closed), the router will also stop.

To stop Run() you should call Close() on the router.

ctx will be propagated to all subscribers.

When all handlers are stopped (for example: because of closed connection), Run() will be also stopped.

func (*Router) RunHandlers

func (r *Router) RunHandlers(ctx context.Context) error

RunHandlers runs all handlers that were added after Run(). RunHandlers is idempotent, so can be called multiple times safely.

func (*Router) Running

func (r *Router) Running() chan struct{}

Running is closed when router is running. In other words: you can wait till router is running using

fmt.Println("Starting router")
go r.Run(ctx)
<- r.Running()
fmt.Println("Router is running")

Warning: for historical reasons, this channel is not aware of router closing - the channel will be closed if the router has been running and closed.

type RouterConfig

type RouterConfig struct {
	// CloseTimeout determines how long router should work for handlers when closing.
	CloseTimeout time.Duration
}

RouterConfig holds the Router's configuration options.

func (RouterConfig) Validate

func (c RouterConfig) Validate() error

Validate returns Router configuration error, if any.

type RouterPlugin

type RouterPlugin func(*Router) error

RouterPlugin is function which is executed on Router start.

type SubscribeInitializer

type SubscribeInitializer interface {
	// SubscribeInitialize can be called to initialize subscribe before consume.
	// When calling Subscribe before Publish, SubscribeInitialize should be not required.
	//
	// Not every Pub/Sub requires this initialization, and it may be optional for performance improvements etc.
	// For detailed SubscribeInitialize functionality, please check Pub/Subs godoc.
	//
	// Implementing SubscribeInitialize is not obligatory.
	SubscribeInitialize(topic string) error
}

SubscribeInitializer is used to initialize subscribers.

type Subscriber

type Subscriber interface {
	// Subscribe returns output channel with messages from provided topic.
	// Channel is closed, when Close() was called on the subscriber.
	//
	// To receive the next message, `Ack()` must be called on the received message.
	// If message processing failed and message should be redelivered `Nack()` should be called.
	//
	// When provided ctx is cancelled, subscriber will close subscribe and close output channel.
	// Provided ctx is set to all produced messages.
	// When Nack or Ack is called on the message, context of the message is canceled.
	Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
	// Close closes all subscriptions with their output channels and flush offsets etc. when needed.
	Close() error
}

Subscriber is the consuming part of the Pub/Sub.

type SubscriberDecorator

type SubscriberDecorator func(sub Subscriber) (Subscriber, error)

SubscriberDecorator wraps the underlying Subscriber, adding some functionality.

func MessageTransformSubscriberDecorator

func MessageTransformSubscriberDecorator(transform func(*Message)) SubscriberDecorator

MessageTransformSubscriberDecorator creates a subscriber decorator that calls transform on each message that passes through the subscriber.

Directories

Path Synopsis
router

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL