pubsub

Version: v0.9.0
Published: Nov 19, 2021 License: MIT Imports: 9 Imported by: 3

README

Pub/Sub

Package for publishing and consuming messages from pub/sub systems.

Supported:

  • AWS (SNS+SQS)

TL;DR

Publish a message

publisher := pubsub.NewPublisher(
	awsSNSPublisher,
	&marshaller.ProtoTextMarshaller{},
)

publisher.Publish(ctx, "some-topic", &pubsub.Message{
    Data: &proto.SomeMessage {Foo: "bar"},
})

Consume messages

var unmarshaller marshaller.ProtoTextMarshaller
unmarshaller.Register("some-topic", &proto.SomeMessage{})

router := pubsub.Router{
	Unmarshaller: &unmarshaller,
}

router.Register(
    "some-topic",
    awsSQSSubscriber,
    pubsub.HandlerFunc(func(ctx context.Context, message *pubsub.Message) error {
        msg := message.Data.(*proto.SomeMessage)
        fmt.Println(msg.Foo)
        return nil
    }),
)

router.Run(ctx)

Components

Message

The key component in the flow is the message: this is the struct that is published, and the struct that your handlers receive.

The message contains this information:

  • Data: accepts any type (interface{}), as long as the marshaller you've chosen can serialize it to a []byte.
  • ID: (optional) a string with an ID for the message. If not provided, the publisher adds a UUIDv4 automatically.
  • Name: (optional) a name for the message. It can be used as a discriminator to dispatch messages that share the same topic.
  • Key: (optional) certain pub/sub systems can use the key to provide FIFO semantics for messages with the same key within a single topic.
  • Attributes: (optional) a map[string]string with custom message metadata.

Publisher

The publisher sends the message to a given topic using the underlying pub/sub system implementation.

type Publisher interface {
	Publish(ctx context.Context, topic string, messages ...*Message) error
}

Steps:

  • Serializes the message data using the marshaller
  • Sends the data and the rest of the message fields as an Envelope to the pub/sub system.

Scheduler

The scheduler extends a normal Publisher: it can also schedule a message to be published at any time in the future.

NOTE: You need to provide some infrastructure to store the messages while they are not due.

type Scheduler interface {
    Publisher
    Schedule(ctx context.Context, dueDate time.Time, topic string, messages ...*Message) error
    Delay(ctx context.Context, delay time.Duration, topic string, messages ...*Message) error
}

You can use the Schedule method when you want to publish a message at some concrete time, or Delay to indicate that the message should be sent after some duration.

Publishing pending messages

You'll need a background process to publish the pending messages once they are due. Instantiate the scheduler and call PublishDue:

import (
    "log"

    "github.com/hmoragrega/pubsub"
    "github.com/hmoragrega/pubsub/schedule/storage/postgres"
)

// External storage to store the pending messages.
storage := postgres.NewPostgres("instance-123", "pending_messages", dbClient)

// NewSchedulerPublisher takes the *MarshallerPublisher returned by
// pubsub.NewPublisher (see the TL;DR), not the marshaller itself.
p := pubsub.NewSchedulerPublisher(publisher, storage)

// Sends the due messages in a loop, blocking until the context
// is terminated or an error occurs.
if err := p.PublishDue(ctx); err != nil {
	log.Fatalf("error while publishing pending messages: %v", err)
}

Pending storage

The scheduler requires some infrastructure to store the messages while they are not due. PostgreSQL is the only implementation provided, but you can provide your own by implementing this interface:

type SchedulerStorage interface {
	// Schedule schedules a message to be published in the future.
	Schedule(ctx context.Context, dueDate time.Time, topic string, messages ...*Envelope) error

	// ConsumeDue returns a channel that should receive the next due messages or an error.
	// It should feed the next messages while the context is valid, once the context terminates
	// it should make any cleaning task and close the channel to signal a proper cleanup.
	ConsumeDue(ctx context.Context) (<-chan DueMessage, error)

	// Published indicates to the storage that the message has been published.
	Published(ctx context.Context, message DueMessage) error
}

Postgres storage

It stores the messages in a table. You need to create the table and pass its name when instantiating the storage.

This is the recommended table, messages are encoded as JSON by default.

CREATE TABLE IF NOT EXISTS pending_messages (
    id               BIGSERIAL PRIMARY KEY,
    message_id       TEXT NOT NULL,
    due_date         TIMESTAMP WITH TIME ZONE NOT NULL,
    topic            TEXT NOT NULL,
    message          BYTEA NOT NULL, -- JSONB works too
    instance_id      TEXT DEFAULT NULL, 
    instance_timeout TIMESTAMP WITH TIME ZONE DEFAULT NULL,

    CONSTRAINT message_id_idx UNIQUE (message_id)
);

If the number of pending messages can grow large, it is recommended to add an index to improve performance:

-- Used for selecting the next due messages to process 
CREATE INDEX pending_messages_next_batch_idx
    ON pending_messages (due_date, instance_id);

Concurrency and ordering:

It is fine to have multiple Postgres storages retrieving messages to be sent. Each one locks its next batch and tries to respect insertion order once the messages are due. However, because the storages work in batches and feed their messages to the parent process concurrently, the messages may be published out of order.

If your application needs them to be published in insertion order you must have only one process retrieving the due messages.

Fail over: When a batch of messages is locked for publishing, the instance sets a timeout. If the timeout is reached and the messages have not yet been deleted from the table, another instance picks them up and processes them, so the messages are still sent if an instance crashes or blocks.

If the timeout is reached while the original instance is working fine (just slowly), another instance can pick up the same messages and they will be published twice. This is a deliberate compromise, as the system has been designed for "at least once" publishing.

If your system needs "exactly once" publishing, you must have only one instance processing pending messages, at the cost of lower throughput.

Clean Up: When the consuming context is terminated, the storage blocks before stopping until all the pending messages in its current batch are published. This prevents those locked messages from having to wait for the instance timeout before another instance picks them up.

Keep this in mind before killing the application forcibly: give the kill signal enough time to publish the batch, according to its size.

Options: Most apps can work with the default values, but you can tweak the behavior described above with the options.

type Options struct {
	// MinWaitTime is the minimum time to wait before processing the next batch of due messages.
	// If the batch took longer than this to be processed, the next one will be processed immediately.
	// On mostly empty tables a longer time will prevent hitting the DB constantly,
	// at the cost of losing some precision in the due date.
	// Default: 500ms.
	MinWaitTime time.Duration

	// BatchSize number of messages that will be processed at the same time.
	// Default: 100.
	BatchSize int

	// InstanceTimeout specifies the time that a message will be locked for publishing after
	// it has been selected by this instance. After this time, the message will be available again.
	// A reasonable value should allow your app to publish all the messages in the batch.
	// Default: 1 minute.
	InstanceTimeout time.Duration

	// Encoder can be used to customize how to encode the messages for storing.
	// Default: JSON encoder
	Encoder encoder
}

Envelope publisher

This is the specific sender for each pub/sub system. It receives an envelope that holds the message data (this time as []byte) along with the version of the marshaller used to serialize that data.

The implementation for Amazon's SNS service, in the aws module, serves as an example.

Subscriber

The subscriber can subscribe to a single topic in a pub/sub system and consume the topic messages.

// Subscriber consumes messages from a topic.
type Subscriber interface {
	// Subscribe to the topic.
	Subscribe() (<-chan Next, error)

	// Stop stops consuming.
	Stop(ctx context.Context) error
}

When the subscription succeeds, it returns a channel that is fed with the result of each consuming operation, which contains either a newly received message or an error.

type Next struct {
	Message ReceivedMessage
	Err     error
}

The received message provides an abstraction of the underlying message implementation for the pub/sub system.

It has the same data as the envelope plus the Ack method to acknowledge the message once it has been handled.

You may build any custom logic on top of a subscriber to handle the message life cycle in your own way, but it's usually more useful to let the router handle it.

Router

A router holds a group of subscribers and runs them all together, starting and stopping all of them at the same time.

To initialize the router, pass the unmarshaller and register all the subscribers along with the message handler associated with each of them.

NOTE: If you are using an envelope publisher and/or marshalling on your own terms, leave the unmarshaller unset: the router will then use NoOpUnmarshaller, which passes the raw []byte data through as the message data.

Call Run on the router; it starts all the subscribers, and if all succeed, it will consume the messages in an endless loop, feeding them to the handlers after unmarshalling the message body.

Message Acknowledgement

By default, the router will acknowledge the message if the message handler does not return any error.

If you want to control this behaviour, you can override the AckDecider function. For example, you can use the provided DisableAutoAck to prevent the router from acknowledging the message automatically.

To manually acknowledge a message call

msg.Ack(ctx)

To manually not-acknowledge a message use

msg.NAck(ctx)

If the messaging system supports it, the message can be re-scheduled with a delay.

msg.ReSchedule(ctx, 10*time.Minute)

By design, all these operations can be called just once, and subsequent calls will return the result of the first call.

Re-Scheduling backoff

If you decide to re-schedule a message in the AckDecider the router will use the default exponential backoff strategy.

You can change this when initializing the router:

r := pubsub.Router{
	Backoff: pubsub.LinearBackoff(5 * time.Minute),
}

You can also override the backoff strategy for a given consumer:

r := pubsub.Router{
	// config
}
// override backoff for a heavy task consumer.
r.Register("heavy-task", subscriber, handler, pubsub.WithBackoff(pubsub.LinearBackoff(time.Hour)))

Message handler

Register a message handler in the router using the method Register.

You'll need to provide:

  • The topic name
  • The subscriber that consumes the topic messages
  • The handler for the topic message

The handler receives a message and reports the result of handling it:

// Handler handles events.
type Handler interface {
	HandleMessage(ctx context.Context, message *Message) error
}

Publisher handler

Sometimes a handler will need to publish a new message after handling its own.

You could publish from within the handler itself, but if you are using the standard publisher provided, you can use its Handler method, which accepts a handler that returns a list of messages to be published.

// PublisherHandler handles events and generates new
// messages that will be published.
type PublisherHandler interface {
	HandleMessage(ctx context.Context, message *Message) ([]*Message, error)
}

router.Register(
    "incoming-topic",
    subscriber,
    publisher.Handler("outgoing-topic", publisherHandler),
)

Stopping the router

By default, the router stops only once the given context terminates. This means that if any of the steps fails, the router skips to the next message.

You can change the behaviour using the different router Checkpoints that are invoked on every step.

Each checkpoint will receive the topic, the received message, and the result (it will be nil on success).
You can use them to log and monitor the functioning of the subscriptions.

If the return value of a checkpoint is not nil, the router will trigger a shutdown stopping all the subscribers and returning the error.

type Checkpoint func(ctx context.Context, topic string, msg ReceivedMessage, err error) error

This is the list of checkpoints available

  • OnReceive: after the subscriber provides the next result.
  • OnUnmarshal: after unmarshalling the received message body.
  • OnHandler: after the handler returns.
  • OnAck: after acknowledging the message.

Stop timeout

By default, the router will trigger the shutdown and wait indefinitely until all the subscribers stop, then return reporting any errors that may have happened.

You can use the field StopTimeout to set a maximum time to wait for the subscribers to stop.

Marshaller/Unmarshaller

The (un)marshaller sits between your application and the pub/sub system, encoding and decoding the message data.

// Marshals the contents of message data.
type Marshaller interface {
	Marshal(data interface{}) (payload []byte, version string, err error)
}

// Unmarshaller will decode the received message.
// It should be aware of the version.
type Unmarshaller interface {
	Unmarshal(topic string, message ReceivedMessage) (data interface{}, err error)
}

The provided un/marshallers are:

  • JSON: encodes the message data as a JSON string, and decodes into any registered struct either by the event name, or the topic
  • ProtoText: uses go-proto-sdk v2 to encode data implementing the proto.Message interface.
  • NoOp: accepts either a string or a []byte as data payload.

Received Message Version

The version included in the Envelope/ReceivedMessage is not the version of data but the version of the marshaller used to encode this data.

This is important because it lets the unmarshaller determine whether it can decode the payload. It also allows migrating the marshaller of a long-lived topic while still supporting old messages.

Documentation

Constants

This section is empty.

Variables

var (
	ErrConsumerAlreadyRegistered = errors.New("consumer already registered")
	ErrRouterAlreadyRunning      = errors.New("router already running")
	ErrRouterAlreadyStopped      = errors.New("router already stopped")
)

var ErrMissingHandler = errors.New("missing handler")

ErrMissingHandler is returned when the dispatcher has no handler registered for the message name.

var (
	ErrReceivedMessageNotAvailable = errors.New("received message not available")
)

var (
	// ErrResourceDoesNotExist indicates that the resource (topic, queue...) does not exist.
	ErrResourceDoesNotExist = errors.New("name does not exist")
)

var ErrUnsupportedVersion = errors.New("unsupported version")

Functions

func NewID

func NewID() string

NewID package level function to generate IDs using the default generator.

func WithAckDecider added in v0.6.0

func WithAckDecider(ackDecider AckDecider) func(*consumer)

func WithBackoff added in v0.6.0

func WithBackoff(strategy BackoffStrategy) func(*consumer)

Types

type AckDecider added in v0.6.0

type AckDecider func(ctx context.Context, topic string, msg ReceivedMessage, err error) Acknowledgement

type Acknowledgement added in v0.4.0

type Acknowledgement uint8
const (
	NoOp Acknowledgement = iota
	Ack
	NAck
	ReSchedule
)

func AutoAck added in v0.6.0

func DisableAutoAck added in v0.3.8

func DisableAutoAck(_ context.Context, _ string, _ ReceivedMessage, _ error) Acknowledgement

DisableAutoAck is an acknowledgement decider function that prevents the router from auto-acknowledging messages.

func ReScheduleOnError added in v0.6.0

func ReScheduleOnError(_ context.Context, _ string, _ ReceivedMessage, err error) Acknowledgement

ReScheduleOnError is an acknowledgement decider function that re-schedules the message on errors.

type Attributes

type Attributes = map[string]string

Attributes a list of message attributes.

type BackoffStrategy added in v0.6.0

type BackoffStrategy interface {
	Delay(msg *Message) time.Duration
}

type BackoffStrategyFunc added in v0.6.0

type BackoffStrategyFunc func(msg *Message) time.Duration

func LinearBackoff added in v0.6.0

func LinearBackoff(delay time.Duration) BackoffStrategyFunc

func (BackoffStrategyFunc) Delay added in v0.6.0

func (f BackoffStrategyFunc) Delay(msg *Message) time.Duration

type Checkpoint

type Checkpoint func(ctx context.Context, consumerName string, msg ReceivedMessage, err error) error

Checkpoint optional hooks executed during the message life cycle.

Returning an error will stop the subscription, and trigger the shutdown of the router.

func WrapCheckpoint added in v0.7.0

func WrapCheckpoint(current Checkpoint, hooks ...Checkpoint) Checkpoint

WrapCheckpoint will call multiple checkpoint hooks one after the other and return on the first error.

type ConsumerOption added in v0.6.0

type ConsumerOption func(*consumer)

type CtxExtractorFunc added in v0.3.5

type CtxExtractorFunc func(ctx context.Context) string

CtxExtractorFunc is a function that given a context returns a value. Return empty string if not present.

type DueMessage added in v0.5.0

type DueMessage struct {
	Topic    string
	Envelope *Envelope
	Err      error
}

type Envelope

type Envelope struct {
	ID         MessageID
	Name       string
	Key        string
	Body       []byte
	Version    string
	Attributes Attributes
}

Envelope holds the data that needs to be transmitted.

type EnvelopePublisher

type EnvelopePublisher interface {
	Publish(ctx context.Context, topic string, envelopes ...*Envelope) error
}

EnvelopePublisher publishes envelopes where the data has already been marshalled.

You can also use this interface directly if you want to handle the marshalling yourself, combined with the NoOpUnmarshaller for the router.

func NoOpEnvelopePublisher added in v0.5.0

func NoOpEnvelopePublisher() EnvelopePublisher

NoOpEnvelopePublisher skips publishing the messages without failures.

type EnvelopePublisherFunc added in v0.2.1

type EnvelopePublisherFunc func(ctx context.Context, topic string, envelopes ...*Envelope) error

EnvelopePublisherFunc is a function that can publish envelopes.

func (EnvelopePublisherFunc) Publish added in v0.2.1

func (f EnvelopePublisherFunc) Publish(ctx context.Context, topic string, envelopes ...*Envelope) error

Publish publishes envelopes invoking the function.

type ExponentialBackoff added in v0.6.0

type ExponentialBackoff struct {
	// Factor is the multiplying factor for each increment step, 3 by default.
	Factor float64
	// Min is the minimum delay, 1 minute by default.
	Min time.Duration
	// Max is the maximum delay, 24 hours by default.
	Max time.Duration
	// Jitter eases contention by randomizing backoff steps, 0 by default (disabled).
	// Must be a value in the range [0.0, 1.0].
	Jitter float64
}

func (*ExponentialBackoff) Delay added in v0.6.0

func (b *ExponentialBackoff) Delay(msg *Message) time.Duration

type Handler

type Handler interface {
	HandleMessage(ctx context.Context, message *Message) error
}

Handler handles events.

func Acknowledge added in v0.4.2

func Acknowledge(next Handler) Handler

Acknowledge will acknowledge a message and pass it to the next handler.

func Recoverer added in v0.3.4

func Recoverer(next Handler) Handler

Recoverer will prevent panics in the handler

func WrapHandler added in v0.3.5

func WrapHandler(handler Handler, middlewares ...func(Handler) Handler) Handler

WrapHandler will wrap the handler in the given middlewares.

type HandlerFunc

type HandlerFunc func(ctx context.Context, message *Message) error

HandlerFunc that handles an event

func Dispatcher

func Dispatcher(handlers map[string]Handler) HandlerFunc

Dispatcher is a message handler middleware that can be used to register different handlers for the same consumer, based on the message name.

func (HandlerFunc) HandleMessage

func (f HandlerFunc) HandleMessage(ctx context.Context, message *Message) error

HandleMessage handles the message using the function.

type IDGenerator

type IDGenerator interface {
	New() string
}

IDGenerator is able to generate IDs.

type IDGeneratorFunc

type IDGeneratorFunc func() string

IDGeneratorFunc function helper to conform to the IDGenerator interface.

var DefaultIDGenerator IDGeneratorFunc = uuid.NewString

DefaultIDGenerator generates a v4 UUID. It will panic if it cannot generate the ID.

func (IDGeneratorFunc) New

func (f IDGeneratorFunc) New() string

New generates a new ID.

type Marshaller

type Marshaller interface {
	Marshal(data interface{}) (payload []byte, version string, err error)
}

Marshaller marshalls the contents of message data.

type MarshallerPublisher added in v0.2.1

type MarshallerPublisher struct {
	// contains filtered or unexported fields
}

MarshallerPublisher marshals a message and delegates the publishing of the resulting envelope.

func NewPublisher added in v0.2.1

func NewPublisher(publisher EnvelopePublisher, marshaller Marshaller) *MarshallerPublisher

NewPublisher creates a new marshaller publisher.

func (*MarshallerPublisher) Handler added in v0.2.1

func (p *MarshallerPublisher) Handler(topic string, handler PublisherHandler) Handler

Handler is a helper that publishes messages generated by other handlers.

func (*MarshallerPublisher) Publish added in v0.2.1

func (p *MarshallerPublisher) Publish(ctx context.Context, topic string, messages ...*Message) error

Publish a message to the given topic.

type Message

type Message struct {
	// ID of the message, if empty a new one will
	// be generated automatically
	ID MessageID
	// Name of the message
	Name string
	// Key groups the message of the same type.
	// Different transports may try to guarantee
	// the order for messages with the same key.
	Key string
	// Data that we want to transmit.
	Data interface{}
	// ReceivedCount returns the number of times this message
	// has been delivered.
	ReceivedCount int
	// Message attributes
	Attributes Attributes
	// contains filtered or unexported fields
}

Message represents the information that we want to transmit.

func NewMessageFromReceived added in v0.3.3

func NewMessageFromReceived(msg ReceivedMessage, data interface{}) *Message

NewMessageFromReceived builds a new message from a received one and its unmarshalled body.

func (*Message) Ack added in v0.2.1

func (m *Message) Ack(ctx context.Context) error

Ack acknowledges the message.

func (*Message) NAck added in v0.4.2

func (m *Message) NAck(ctx context.Context) error

func (*Message) ReSchedule added in v0.6.0

func (m *Message) ReSchedule(ctx context.Context, delay time.Duration) error

ReSchedule puts the message back again in the topic/queue to be available after a certain delay.

Different implementations may need to act accordingly and ack or n/ack the message.

The common scenario is that a message has failed to be processed and we don't want to receive it again immediately, probably applying a backoff strategy with increasing delay times.

func (*Message) SetAttribute added in v0.2.1

func (m *Message) SetAttribute(key, value string)

SetAttribute sets an attribute.

type MessageContext added in v0.7.0

type MessageContext func(ctx context.Context, consumerName string, message ReceivedMessage) context.Context

MessageContext optional hook that can be used to modify the context used while processing a message.

func WrapMessageContext added in v0.7.0

func WrapMessageContext(current MessageContext, hooks ...MessageContext) MessageContext

WrapMessageContext will call multiple message context hooks one after the other.

type MessageID

type MessageID = string

MessageID a string representing a unique message ID.

type MessageRecord added in v0.3.7

type MessageRecord struct {
	Topic   string
	Message *Message
}

MessageRecord records the message and the topic.

type Next

type Next struct {
	Message ReceivedMessage
	Err     error
}

Next holds the next message in the subscription

type OnNext added in v0.8.0

type OnNext func(ctx context.Context, consumerName string, next Next) Next

OnNext optional hook called after a next message operation is received. It can be used to modify the next message.

func WrapNext added in v0.8.0

func WrapNext(current OnNext, hooks ...OnNext) OnNext

WrapNext will call multiple on next hooks one after the other.

type OnProcess added in v0.7.0

type OnProcess func(ctx context.Context, consumerName string, elapsed time.Duration, msg ReceivedMessage, err error)

OnProcess optional hook called after processing a received message in a consumer.

func WrapOnProcess added in v0.7.0

func WrapOnProcess(current OnProcess, hooks ...OnProcess) OnProcess

WrapOnProcess will call multiple on process hooks one after the other.

type Publisher

type Publisher interface {
	Publish(ctx context.Context, topic string, messages ...*Message) error
}

Publisher describes the top level method to publish messages.

func NoOpPublisher added in v0.3.7

func NoOpPublisher() Publisher

NoOpPublisher skips publishing the messages without failures.

func WrapPublisher added in v0.3.5

func WrapPublisher(publisher Publisher, middlewares ...PublisherMiddleware) Publisher

WrapPublisher will wrap the publisher in the given middlewares.

type PublisherFunc added in v0.2.1

type PublisherFunc func(ctx context.Context, topic string, envelopes ...*Message) error

PublisherFunc is a function that can publish messages.

func (PublisherFunc) Publish added in v0.2.1

func (f PublisherFunc) Publish(ctx context.Context, topic string, envelopes ...*Message) error

Publish publishes messages invoking the function.

type PublisherHandler

type PublisherHandler interface {
	HandleMessage(ctx context.Context, message *Message) ([]*Message, error)
}

PublisherHandler handles events and generates new messages that should be published.

type PublisherHandlerFunc

type PublisherHandlerFunc func(ctx context.Context, message *Message) ([]*Message, error)

PublisherHandlerFunc function that can handle a message.

func (PublisherHandlerFunc) HandleMessage

func (f PublisherHandlerFunc) HandleMessage(ctx context.Context, message *Message) ([]*Message, error)

HandleMessage handles the message with the function.

type PublisherMiddleware added in v0.3.5

type PublisherMiddleware = func(Publisher) Publisher

func CtxAttributeInjector added in v0.3.5

func CtxAttributeInjector(attributeName string, extract CtxExtractorFunc) PublisherMiddleware

CtxAttributeInjector is a publisher middleware that can set message attributes based on value present in the context.

func TopicAsEventName added in v0.4.1

func TopicAsEventName() PublisherMiddleware

TopicAsEventName is a publisher middleware that will set the event name as the topic if the event name is empty.

type PublisherRecorder added in v0.3.7

type PublisherRecorder struct {
	// contains filtered or unexported fields
}

PublisherRecorder is a publisher middleware that records all the events that are published.

func NewPublisherRecorder added in v0.3.7

func NewPublisherRecorder(publisher Publisher) *PublisherRecorder

NewPublisherRecorder returns a new publisher recorder.

func (*PublisherRecorder) Messages added in v0.3.7

func (p *PublisherRecorder) Messages() []*MessageRecord

Messages returns a list of records in the same order in which they were published.

func (*PublisherRecorder) MessagesMap added in v0.3.7

func (p *PublisherRecorder) MessagesMap() map[string][]*Message

MessagesMap returns a map of all the messages published by topic.

func (*PublisherRecorder) Publish added in v0.3.7

func (p *PublisherRecorder) Publish(ctx context.Context, topic string, messages ...*Message) error

func (*PublisherRecorder) Reset added in v0.3.7

func (p *PublisherRecorder) Reset()

Reset empties the lists of published messages.

func (*PublisherRecorder) TopicMessages added in v0.3.7

func (p *PublisherRecorder) TopicMessages(topic string) []*Message

TopicMessages returns the list of messages sent to a single topic.

type ReceivedMessage

type ReceivedMessage interface {
	// ID of the message, it should be unique.
	ID() string

	// Name of the message.
	Name() string

	// Key grouping key of the message.
	Key() string

	// Body of the message.
	Body() []byte

	// Version of the envelope.
	Version() string

	// Attributes message attributes.
	Attributes() Attributes

	// Ack acknowledges the message.
	Ack(ctx context.Context) error

	// NAck negative acknowledges the message.
	NAck(ctx context.Context) error

	// ReSchedule puts the message back in the same topic
	// to be available again after a certain delay.
	// An Ack or NAck is expected to happen at this point,
	// although not required.
	ReSchedule(ctx context.Context, delay time.Duration) error

	// ReceivedCount returns the number of times this message
	// has been delivered.
	// If the messaging system does not support this information
	// it should be 0.
	ReceivedCount() int

	// String prints the message.
	String() string
}

ReceivedMessage incoming message consumed by the subscriber.

type Router

type Router struct {
	// Message unmarshaller. If none provided
	// NoOpUnmarshaller will be used.
	Unmarshaller Unmarshaller

	// AckDecider is an optional function that decides whether the message should
	// be acknowledged, negatively acknowledged, or left untouched; it receives the topic,
	// the received message, and the handler error.
	// By default, the message will be acknowledged if there was no error handling it
	// and negatively acknowledged if there was.
	// To disable the automatic acknowledgements pass the DisableAutoAck function
	AckDecider AckDecider

	// Backoff calculates the time to wait when re-scheduling a message.
	// The default exponential back off will be used if not provided.
	// Note that consumers can override the back off strategy.
	Backoff BackoffStrategy

	// StopTimeout time to wait for all the consumer to stop in a
	// clean way. No timeout by default.
	StopTimeout time.Duration

	// MessageContext is an optional function that modifies the context
	// that will be passed along during the message life cycle:
	//  * in the message handler
	//  * in all the checkpoints.
	MessageContext MessageContext

	// Optional callback invoked when the consumer
	// reports an error.
	OnReceive Checkpoint

	// Optional callback invoked when the received message
	// cannot be unmarshaled into a message.
	OnUnmarshal Checkpoint

	// OnHandlerError callback invoked when the handler
	// returns an error.
	OnHandler Checkpoint

	// Optional callback invoked when the handled
	// message cannot be acknowledged
	OnAck Checkpoint

	// Optional callback invoked after fully processing a message
	// passing the elapsed time and the error, if any.
	OnProcess OnProcess

	// Optional callback invoked after a consumer returns from a
	// consume operation.
	OnNext OnNext
	// contains filtered or unexported fields
}

Router groups consumers and runs them together.

func (*Router) Register

func (r *Router) Register(name string, subscriber Subscriber, handler Handler, opts ...ConsumerOption) error

func (*Router) Run

func (r *Router) Run(ctx context.Context) (err error)

Run starts all the consumers and keeps them running.

Run is a blocking call; to stop it, cancel the given context. If a consumer returns an error and "ContinueOnErrors" is "false" (default value), the router will stop all consumers and return the first error that triggered the shutdown.

Calling Run more than once will return an error. Registering new handlers after calling Run won't have any effect; the router needs to be stopped and started again.

type Scheduler added in v0.5.0

type Scheduler interface {
	Publisher
	Schedule(ctx context.Context, dueDate time.Time, topic string, messages ...*Message) error
	Delay(ctx context.Context, delay time.Duration, topic string, messages ...*Message) error
}

type SchedulerPublisher added in v0.5.0

type SchedulerPublisher struct {
	*MarshallerPublisher
	// contains filtered or unexported fields
}

func NewSchedulerPublisher added in v0.5.0

func NewSchedulerPublisher(pub *MarshallerPublisher, storage SchedulerStorage) *SchedulerPublisher

func (*SchedulerPublisher) Delay added in v0.5.0

func (p *SchedulerPublisher) Delay(ctx context.Context, duration time.Duration, topic string, messages ...*Message) error

func (*SchedulerPublisher) PublishDue added in v0.5.0

func (p *SchedulerPublisher) PublishDue(ctx context.Context) error

func (*SchedulerPublisher) Schedule added in v0.5.0

func (p *SchedulerPublisher) Schedule(ctx context.Context, dueDate time.Time, topic string, messages ...*Message) error

type SchedulerStorage added in v0.5.0

type SchedulerStorage interface {
	// Schedule schedules a message to be published in the future.
	Schedule(ctx context.Context, dueDate time.Time, topic string, messages ...*Envelope) error

	// ConsumeDue returns a channel that should receive the next due messages or an error.
	// It should feed the next messages while the context is valid, once the context terminates
	// it should make any cleaning task and close the channel to signal a proper cleanup.
	ConsumeDue(ctx context.Context) (<-chan DueMessage, error)

	// Published indicates to the storage that the message has been published.
	Published(ctx context.Context, message DueMessage) error
}

type Subscriber

type Subscriber interface {
	// Subscribe to the topic.
	Subscribe() (<-chan Next, error)

	// Stop stops consuming.
	Stop(ctx context.Context) error
}

Subscriber consumes messages from a topic.

type Unmarshaller

type Unmarshaller interface {
	// Unmarshal unmarshalls the received message.
	Unmarshal(topic string, message ReceivedMessage) (data interface{}, err error)
}

Unmarshaller will decode the received message. It should be aware of the version.

func NoOpUnmarshaller

func NoOpUnmarshaller() Unmarshaller

NoOpUnmarshaller will pass the message data raw byte slice without taking the version into account.

type UnmarshallerFunc

type UnmarshallerFunc func(topic string, message ReceivedMessage) (data interface{}, err error)

UnmarshallerFunc will decode the received message. It should be aware of the version.

func (UnmarshallerFunc) Unmarshal

func (f UnmarshallerFunc) Unmarshal(topic string, message ReceivedMessage) (data interface{}, err error)

Directories

Path Synopsis
aws module
pubsubtest
schedule
