outboxer

package module
v1.1.1 Latest
Published: May 25, 2022 License: MIT Imports: 8 Imported by: 0

README

Outboxer

This is a fork of https://github.com/italolelis/outboxer

Outboxer is a Go library that implements the outbox pattern.

Getting Started

Outboxer was designed to simplify the tough work of orchestrating message reliability. Essentially, we are trying to answer this question:

How can producers reliably send messages when the broker/consumer is unavailable?

If you have a distributed system architecture, and especially if you are dealing with an event-driven architecture, you might want to use outboxer.
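To make the question above concrete, here is a minimal in-memory sketch of the outbox idea. It is not outboxer's implementation: the names `outbox`, `flakyBroker`, `add`, and `flush` are hypothetical, and a slice stands in for the durable store. The point is only that storing a message never depends on the broker, and relaying is retried until it succeeds.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a hypothetical outbox row: a payload plus a dispatched flag.
type message struct {
	payload    string
	dispatched bool
}

// flakyBroker is a hypothetical broker that rejects its first `failures` sends.
type flakyBroker struct {
	failures  int
	delivered []string
}

func (b *flakyBroker) send(payload string) error {
	if b.failures > 0 {
		b.failures--
		return errors.New("broker unavailable")
	}
	b.delivered = append(b.delivered, payload)
	return nil
}

// outbox records messages first (here in a slice) and relays them later.
type outbox struct {
	rows []*message
}

// add only stores the message, so it cannot fail because of a broker outage.
func (o *outbox) add(payload string) {
	o.rows = append(o.rows, &message{payload: payload})
}

// flush tries to relay every undispatched row and returns how many are still pending.
func (o *outbox) flush(b *flakyBroker) int {
	pending := 0
	for _, m := range o.rows {
		if m.dispatched {
			continue
		}
		if err := b.send(m.payload); err != nil {
			pending++ // left in the outbox and retried on the next flush
			continue
		}
		m.dispatched = true
	}
	return pending
}

// demo stores two messages while the broker is down, then flushes until nothing is pending.
func demo() (rounds int, delivered []string) {
	b := &flakyBroker{failures: 2}
	o := &outbox{}
	o.add("order-created")
	o.add("order-paid")
	for {
		rounds++
		if o.flush(b) == 0 {
			return rounds, b.delivered
		}
	}
}

func main() {
	rounds, delivered := demo()
	fmt.Println(rounds, delivered)
}
```

In a real deployment the slice would be a database table written in the same transaction as the business data, which is the role the data store plays in outboxer.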

The first thing to do is include the package in your project:

go get github.com/artsv79/outboxer
Initial Configuration

Let's set up a simple example that uses RabbitMQ and Postgres as the outbox pattern components:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

db, err := sql.Open("postgres", os.Getenv("DS_DSN"))
if err != nil {
    fmt.Printf("could not connect to the database: %s", err)
    return
}

conn, err := amqp.Dial(os.Getenv("ES_DSN"))
if err != nil {
    fmt.Printf("could not connect to amqp: %s", err)
    return
}

// we need to create a data store instance first
ds, err := postgres.WithInstance(ctx, db)
if err != nil {
    fmt.Printf("could not setup the data store: %s", err)
    return
}
defer ds.Close()

// we create an event stream passing the amqp connection
es := amqpOut.NewAMQP(conn)

// now we create an outboxer instance passing the data store and event stream
o, err := outboxer.New(
    outboxer.WithDataStore(ds),
    outboxer.WithEventStream(es),
    outboxer.WithCheckInterval(1*time.Second),
    outboxer.WithCleanupInterval(5*time.Second),
    outboxer.WithCleanUpOlderThan(5*24*time.Hour),
)
if err != nil {
    fmt.Printf("could not create an outboxer instance: %s", err)
    return
}

// here we start the outboxer dispatch and cleanup goroutines
o.Start(ctx)
defer o.Stop()

// finally we are ready to send messages
if err = o.Send(ctx, &outboxer.OutboxMessage{
    Payload: []byte("test payload"),
    Options: map[string]interface{}{
        amqpOut.ExchangeNameOption: "test",
        amqpOut.ExchangeTypeOption: "topic",
        amqpOut.RoutingKeyOption:   "test.send",
    },
}); err != nil {
    fmt.Printf("could not send message: %s", err)
    return
}

// we can also listen for errors and for confirmations of messages that were sent
for {
    select {
    case err := <-o.ErrChan():
        fmt.Printf("could not send message: %s", err)
    case <-o.OkChan():
        fmt.Printf("message received")
        return
    }
}

Features

Outboxer comes with a few implementations of Data Stores and Event Streams.

Data Stores
Event Streams

License

This project is licensed under the MIT License. See the LICENSE file for details.

Documentation

Overview

Package outboxer is an implementation of the outbox pattern. The producer of messages can durably store those messages in a local outbox before sending to a Message Endpoint. The durable local storage may be implemented in the Message Channel directly, especially when combined with Idempotent Messages.

Constants

This section is empty.

Variables

var (
	// ErrMissingEventStream is used when no event stream is provided.
	ErrMissingEventStream = errors.New("an event stream is required for the outboxer to work")

	// ErrMissingDataStore is used when no data store is provided.
	ErrMissingDataStore = errors.New("a data store is required for the outboxer to work")
)
var ErrFailedToDecodeType = errors.New("could not decode type")

ErrFailedToDecodeType is returned when the type of the value is not supported.

Functions

This section is empty.

Types

type DataStore

type DataStore interface {
	// Tries to find the given message in the outbox.
	GetEvents(ctx context.Context, batchSize int32) ([]*OutboxMessage, error)
	Add(ctx context.Context, m *OutboxMessage) error
	AddWithinTx(ctx context.Context, m *OutboxMessage, fn func(ExecerContext) error) error
	SetAsDispatched(ctx context.Context, id int64) error
	Remove(ctx context.Context, since time.Time, batchSize int32) error
}

DataStore defines the data store methods.
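As an illustration of what a data store has to do, here is a sketch of an in-memory store that could be handy in tests. It is a sketch under assumptions, not part of the package: `memoryStore` is a hypothetical name, `OutboxMessage` is a trimmed local copy of the type documented on this page, `AddWithinTx` is omitted for brevity, and the meaning given to `Remove` (drop dispatched messages sent before `since`, at most `batchSize` of them) is an assumed reading of the signature.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"sync"
	"time"
)

// OutboxMessage is a trimmed local stand-in for the documented type.
type OutboxMessage struct {
	ID           int64
	Dispatched   bool
	DispatchedAt sql.NullTime
	Payload      []byte
}

// memoryStore is a hypothetical in-memory data store.
type memoryStore struct {
	mu     sync.Mutex
	nextID int64
	rows   []*OutboxMessage
}

// GetEvents returns up to batchSize messages that are not yet dispatched.
func (s *memoryStore) GetEvents(_ context.Context, batchSize int32) ([]*OutboxMessage, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	var out []*OutboxMessage
	for _, m := range s.rows {
		if int32(len(out)) >= batchSize {
			break
		}
		if !m.Dispatched {
			out = append(out, m)
		}
	}
	return out, nil
}

// Add assigns the next ID and appends the message.
func (s *memoryStore) Add(_ context.Context, m *OutboxMessage) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.nextID++
	m.ID = s.nextID
	s.rows = append(s.rows, m)
	return nil
}

// SetAsDispatched flags the message with the given ID as sent.
func (s *memoryStore) SetAsDispatched(_ context.Context, id int64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, m := range s.rows {
		if m.ID == id {
			m.Dispatched = true
			m.DispatchedAt = sql.NullTime{Time: time.Now(), Valid: true}
			return nil
		}
	}
	return fmt.Errorf("message %d not found", id)
}

// Remove drops up to batchSize dispatched messages sent before since (assumed semantics).
func (s *memoryStore) Remove(_ context.Context, since time.Time, batchSize int32) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	kept := s.rows[:0]
	var removed int32
	for _, m := range s.rows {
		if removed < batchSize && m.Dispatched && m.DispatchedAt.Time.Before(since) {
			removed++
			continue
		}
		kept = append(kept, m)
	}
	s.rows = kept
	return nil
}

// demoStore adds five messages, dispatches a batch of two, then removes one.
func demoStore() (firstBatch, pending, remaining int) {
	ctx := context.Background()
	s := &memoryStore{}
	for _, p := range []string{"a", "b", "c", "d", "e"} {
		_ = s.Add(ctx, &OutboxMessage{Payload: []byte(p)})
	}
	batch, _ := s.GetEvents(ctx, 2)
	for _, m := range batch {
		_ = s.SetAsDispatched(ctx, m.ID)
	}
	rest, _ := s.GetEvents(ctx, 10)
	_ = s.Remove(ctx, time.Now().Add(time.Hour), 1)
	return len(batch), len(rest), len(s.rows)
}

func main() {
	fmt.Println(demoStore())
}
```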

type DynamicValues

type DynamicValues map[string]interface{}

DynamicValues is a map that can be serialized.

func (*DynamicValues) Scan

func (p *DynamicValues) Scan(src interface{}) error

Scan scans a database JSON representation into the DynamicValues map.

func (DynamicValues) Value

func (p DynamicValues) Value() (driver.Value, error)

Value returns a driver.Value representation of the map.
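The Scan and Value pair follows the standard `sql.Scanner` and `driver.Valuer` contracts. The sketch below shows one common way to back such a map with JSON. It is an assumption about the general technique, not a copy of the package's code: the type `values` and the error `errDecode` are hypothetical stand-ins, and the set of accepted source types is a guess.

```go
package main

import (
	"database/sql/driver"
	"encoding/json"
	"errors"
	"fmt"
)

// errDecode is a hypothetical stand-in for an "unsupported type" error.
var errDecode = errors.New("could not decode type")

// values is a hypothetical JSON-backed map, similar in shape to DynamicValues.
type values map[string]interface{}

// Value encodes the map as JSON so a database driver can store it.
func (v values) Value() (driver.Value, error) {
	raw, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	return raw, nil
}

// Scan decodes a JSON column ([]byte or string) back into the map.
func (v *values) Scan(src interface{}) error {
	var raw []byte
	switch s := src.(type) {
	case []byte:
		raw = s
	case string:
		raw = []byte(s)
	case nil:
		*v = nil
		return nil
	default:
		return errDecode
	}
	return json.Unmarshal(raw, v)
}

// roundTrip encodes a map and decodes it again, returning one of its entries.
func roundTrip() (string, error) {
	in := values{"routing_key": "test.send"}
	raw, err := in.Value()
	if err != nil {
		return "", err
	}
	var out values
	if err := out.Scan(raw); err != nil {
		return "", err
	}
	key, _ := out["routing_key"].(string)
	return key, nil
}

func main() {
	fmt.Println(roundTrip())
}
```

Note that numbers decoded this way come back as `float64`, which is how `encoding/json` fills an `interface{}`; callers reading numeric options need to convert accordingly.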

type EventStream

type EventStream interface {
	Send(context.Context, *OutboxMessage) error
}

EventStream defines the event stream methods.
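Because the interface has a single method, a custom event stream is small to write. The following sketch records payloads in memory, which may be useful in tests. The name `memoryStream` is hypothetical, and `OutboxMessage` and `EventStream` are redeclared locally (in trimmed form) only so the example compiles on its own.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// OutboxMessage is a trimmed local stand-in for the documented type.
type OutboxMessage struct {
	Payload []byte
	Options map[string]interface{}
}

// EventStream mirrors the interface documented above.
type EventStream interface {
	Send(context.Context, *OutboxMessage) error
}

// memoryStream is a hypothetical event stream that keeps payloads in memory.
type memoryStream struct {
	mu   sync.Mutex
	sent [][]byte
}

// Send records the payload unless the context is already cancelled.
func (s *memoryStream) Send(ctx context.Context, m *OutboxMessage) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.sent = append(s.sent, m.Payload)
	return nil
}

// demoStream sends once with a live context and once with a cancelled one.
func demoStream() (sent int, cancelled bool) {
	es := &memoryStream{}
	var stream EventStream = es
	_ = stream.Send(context.Background(), &OutboxMessage{Payload: []byte("test payload")})

	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	err := stream.Send(ctx, &OutboxMessage{Payload: []byte("late")})
	return len(es.sent), err != nil
}

func main() {
	fmt.Println(demoStream())
}
```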

type ExecerContext

type ExecerContext interface {
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}

ExecerContext defines the exec context method that is used within a transaction.

type Option

type Option func(*Outboxer)

Option represents the outboxer options.

func WithCheckInterval

func WithCheckInterval(t time.Duration) Option

WithCheckInterval sets the frequency that outboxer will check for new events.

func WithCleanUpBatchSize

func WithCleanUpBatchSize(s int32) Option

WithCleanUpBatchSize sets the clean up process batch size.

func WithCleanUpOlderThan

func WithCleanUpOlderThan(t time.Duration) Option

WithCleanUpOlderThan sets the minimum age a message must reach before the clean up process removes it.

func WithCleanupInterval

func WithCleanupInterval(t time.Duration) Option

WithCleanupInterval sets the frequency that outboxer will clean old events from the data store.

func WithDataStore

func WithDataStore(ds DataStore) Option

WithDataStore sets the data store where events will be stored before sending.

func WithEventStream

func WithEventStream(es EventStream) Option

WithEventStream sets the event stream to where events will be sent.

func WithMessageBatchSize

func WithMessageBatchSize(s int32) Option

WithMessageBatchSize sets how many messages will be sent at a time.
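The With... functions above follow Go's functional options pattern: each returns a closure that mutates the instance under construction. Here is a self-contained sketch of the pattern. Everything in it is hypothetical (the `config` struct, the lowercase option names, and the default values); it does not reflect outboxer's internal fields or defaults.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errMissingStore = errors.New("a data store is required")

// config is a hypothetical struct configured through options.
type config struct {
	checkInterval time.Duration
	batchSize     int32
	store         string
}

// option mutates a config, mirroring the shape of `type Option func(*Outboxer)`.
type option func(*config)

func withCheckInterval(d time.Duration) option {
	return func(c *config) { c.checkInterval = d }
}

func withBatchSize(n int32) option {
	return func(c *config) { c.batchSize = n }
}

func withStore(name string) option {
	return func(c *config) { c.store = name }
}

// newConfig starts from defaults, applies options in order, then validates.
func newConfig(opts ...option) (*config, error) {
	c := &config{checkInterval: time.Second, batchSize: 100} // hypothetical defaults
	for _, opt := range opts {
		opt(c)
	}
	if c.store == "" {
		return nil, errMissingStore
	}
	return c, nil
}

// demoOptions builds one valid config and one that fails validation.
func demoOptions() (intervalMs int64, batch int32, missingStore bool) {
	c, _ := newConfig(withStore("postgres"), withCheckInterval(250*time.Millisecond))
	_, err := newConfig(withBatchSize(10))
	return c.checkInterval.Milliseconds(), c.batchSize, errors.Is(err, errMissingStore)
}

func main() {
	fmt.Println(demoOptions())
}
```

The validation step at the end of the constructor is where errors such as ErrMissingDataStore and ErrMissingEventStream would naturally be returned.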

type OutboxMessage

type OutboxMessage struct {
	ID           int64
	Dispatched   bool
	DispatchedAt sql.NullTime
	Payload      []byte
	Options      DynamicValues
	Headers      DynamicValues
}

OutboxMessage represents a message that will be sent.

type Outboxer

type Outboxer struct {
	// contains filtered or unexported fields
}

Outboxer implements the outbox pattern.

func New

func New(opts ...Option) (*Outboxer, error)

New creates a new instance of Outboxer.

Example

package main

import (
	"context"
	"database/sql"
	"fmt"
	"os"
	"time"

	"github.com/artsv79/outboxer"
	amqpOut "github.com/artsv79/outboxer/es/amqp"
	"github.com/artsv79/outboxer/storage/postgres"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	db, err := sql.Open("postgres", os.Getenv("DS_DSN"))
	if err != nil {
		fmt.Printf("could not connect to the database: %s", err)
		return
	}

	conn, err := amqp.Dial(os.Getenv("ES_DSN"))
	if err != nil {
		fmt.Printf("could not connect to amqp: %s", err)
		return
	}

	// we need to create a data store instance first
	ds, err := postgres.WithInstance(ctx, db)
	if err != nil {
		fmt.Printf("could not setup the data store: %s", err)
		return
	}
	defer ds.Close()

	// we create an event stream passing the amqp connection
	es := amqpOut.NewAMQP(conn)

	// now we create an outboxer instance passing the data store and event stream
	o, err := outboxer.New(
		outboxer.WithDataStore(ds),
		outboxer.WithEventStream(es),
		outboxer.WithCheckInterval(1*time.Second),
		outboxer.WithCleanupInterval(5*time.Second),
		outboxer.WithCleanUpOlderThan(5*24*time.Hour),
		outboxer.WithCleanUpBatchSize(10),
		outboxer.WithMessageBatchSize(10),
	)
	if err != nil {
		fmt.Printf("could not create an outboxer instance: %s", err)
		return
	}

	// here we start the outboxer dispatch and cleanup goroutines
	o.Start(ctx)
	defer o.Stop()

	// finally we are ready to send messages
	if err = o.Send(ctx, &outboxer.OutboxMessage{
		Payload: []byte("test payload"),
		Options: map[string]interface{}{
			amqpOut.ExchangeNameOption: "test",
			amqpOut.ExchangeTypeOption: "topic",
			amqpOut.RoutingKeyOption:   "test.send",
		},
	}); err != nil {
		fmt.Printf("could not send message: %s", err)
		return
	}

	// we can also listen for errors and for confirmations of messages that were sent
	for {
		select {
		case err := <-o.ErrChan():
			fmt.Printf("could not send message: %s", err)
		case <-o.OkChan():
			fmt.Printf("message received")
			return
		}
	}
}
func (*Outboxer) ErrChan

func (o *Outboxer) ErrChan() <-chan error

ErrChan returns the error channel.

func (*Outboxer) OkChan

func (o *Outboxer) OkChan() <-chan struct{}

OkChan returns the ok channel that is used when each message is successfully delivered.
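When waiting on these two channels, it can help to bound the wait with a context and to notice when a channel has been closed (Stop is documented as closing all channels). The helper below is a sketch of that idea; `waitForDelivery` and `errClosed` are hypothetical names, and plain local channels stand in for the ones returned by ErrChan and OkChan.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errClosed is a hypothetical error reported when a channel has been closed.
var errClosed = errors.New("outboxer channels closed")

// waitForDelivery blocks until a confirmation, an error, or context expiry.
func waitForDelivery(ctx context.Context, ok <-chan struct{}, errs <-chan error) error {
	select {
	case _, open := <-ok:
		if !open {
			return errClosed
		}
		return nil
	case err, open := <-errs:
		if !open {
			return errClosed
		}
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demoWait exercises the three outcomes: success, failure, and timeout.
func demoWait() []string {
	describe := func(err error) string {
		if err == nil {
			return "delivered"
		}
		return err.Error()
	}
	ok := make(chan struct{}, 1)
	errs := make(chan error, 1)
	var results []string

	ok <- struct{}{}
	results = append(results, describe(waitForDelivery(context.Background(), ok, errs)))

	errs <- errors.New("broker unavailable")
	results = append(results, describe(waitForDelivery(context.Background(), ok, errs)))

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	results = append(results, describe(waitForDelivery(ctx, ok, errs)))
	return results
}

func main() {
	fmt.Println(demoWait())
}
```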

func (*Outboxer) Send

func (o *Outboxer) Send(ctx context.Context, m *OutboxMessage) error

Send sends a message.

func (*Outboxer) SendWithinTx

func (o *Outboxer) SendWithinTx(ctx context.Context, evt *OutboxMessage, fn func(ExecerContext) error) error

SendWithinTx encapsulates any database call within a transaction.
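The value of a transactional send is that the business write and the outbox row are committed together or not at all. The sketch below imitates that behaviour with fakes so it runs without a database. It rests on several assumptions: `fakeTx`, `fakeDB`, and `sendWithinTx` are hypothetical, the SQL strings and table names are invented, and running the callback before the outbox insert is just one possible ordering, not necessarily what the package does.

```go
package main

import (
	"context"
	"database/sql"
	"database/sql/driver"
	"errors"
	"fmt"
)

// ExecerContext mirrors the interface documented on this page.
type ExecerContext interface {
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}

// fakeTx is a hypothetical transaction that only stages statements.
type fakeTx struct{ staged []string }

func (t *fakeTx) ExecContext(_ context.Context, query string, _ ...interface{}) (sql.Result, error) {
	t.staged = append(t.staged, query)
	return driver.RowsAffected(1), nil
}

// fakeDB is a hypothetical database that records committed statements.
type fakeDB struct{ committed []string }

// sendWithinTx runs fn and the outbox insert in one fake transaction.
func (db *fakeDB) sendWithinTx(ctx context.Context, payload []byte, fn func(ExecerContext) error) error {
	tx := &fakeTx{}
	if err := fn(tx); err != nil {
		return err // "rollback": staged statements are discarded
	}
	if _, err := tx.ExecContext(ctx, "INSERT INTO outbox (payload) VALUES ($1)", payload); err != nil {
		return err
	}
	db.committed = append(db.committed, tx.staged...) // "commit"
	return nil
}

// demoTx runs one successful and one failing transactional send.
func demoTx() (afterSuccess, afterFailure int, failed bool) {
	ctx := context.Background()
	db := &fakeDB{}

	_ = db.sendWithinTx(ctx, []byte("order-created"), func(tx ExecerContext) error {
		_, err := tx.ExecContext(ctx, "INSERT INTO orders (id) VALUES ($1)", 1)
		return err
	})
	afterSuccess = len(db.committed)

	err := db.sendWithinTx(ctx, []byte("order-paid"), func(tx ExecerContext) error {
		if _, err := tx.ExecContext(ctx, "UPDATE orders SET paid = true WHERE id = $1", 1); err != nil {
			return err
		}
		return errors.New("payment rejected")
	})
	return afterSuccess, len(db.committed), err != nil
}

func main() {
	fmt.Println(demoTx())
}
```

After the failing call, neither the order update nor the outbox row is committed, so no message is ever dispatched for a change that did not happen.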

func (*Outboxer) Start

func (o *Outboxer) Start(ctx context.Context)

Start launches two goroutines: the dispatcher, which fetches messages from the data store and sends them to the event stream, and the cleanup process, which removes old messages from the data store.

func (*Outboxer) StartCleanup

func (o *Outboxer) StartCleanup(ctx context.Context)

StartCleanup starts the cleanup process, that makes sure old messages are removed from the data store.

func (*Outboxer) StartDispatcher

func (o *Outboxer) StartDispatcher(ctx context.Context)

StartDispatcher starts the dispatcher, which is responsible for getting the messages from the data store and sending to the event stream.
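A dispatcher of this kind is typically a ticker-driven polling loop. The sketch below shows that general shape and is not the package's code: `runDispatcher` and its callback parameters are hypothetical, and strings stand in for outbox messages. A message whose send fails is left pending and retried on the next tick.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// runDispatcher polls fetch on every tick and relays each pending message.
func runDispatcher(ctx context.Context, interval time.Duration,
	fetch func() []string, send func(string) error, markDone func(string)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			for _, m := range fetch() {
				if err := send(m); err != nil {
					continue // stays pending and is retried on the next tick
				}
				markDone(m)
			}
		}
	}
}

// demoDispatcher relays three messages; "b" fails once and is retried.
func demoDispatcher() []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var mu sync.Mutex
	pending := []string{"a", "b", "c"}
	var delivered []string
	finished := make(chan struct{})
	failOnce := true // only touched from the dispatcher goroutine

	fetch := func() []string {
		mu.Lock()
		defer mu.Unlock()
		return append([]string(nil), pending...) // copy, so callers can iterate safely
	}
	send := func(m string) error {
		if m == "b" && failOnce {
			failOnce = false
			return errors.New("broker unavailable")
		}
		return nil
	}
	markDone := func(m string) {
		mu.Lock()
		defer mu.Unlock()
		for i, p := range pending {
			if p == m {
				pending = append(pending[:i], pending[i+1:]...)
				break
			}
		}
		delivered = append(delivered, m)
		if len(pending) == 0 {
			close(finished)
		}
	}

	go runDispatcher(ctx, 5*time.Millisecond, fetch, send, markDone)
	<-finished

	mu.Lock()
	defer mu.Unlock()
	return append([]string(nil), delivered...)
}

func main() {
	fmt.Println(demoDispatcher())
}
```

The retry changes delivery order ("b" arrives after "c" here), which is one reason consumers of an outbox are usually written to tolerate reordering and duplicates.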

func (*Outboxer) Stop

func (o *Outboxer) Stop()

Stop closes all channels.

Directories

Path              Synopsis
es/amqp           Package amqp is the AMQP implementation of an event stream.
es/kinesis        Package kinesis is the AWS Kinesis implementation of an event stream.
es/pubsub         Package pubsub is the GCP PubSub implementation of an event stream.
es/sqs            Package SQS is the AWS SQS implementation of an event stream.
storage/mysql     Package mysql is the implementation of the mysql data store.
storage/postgres  Package postgres is the implementation of the postgres data store.
