kafkabox

package
v0.0.0-...-81c1d68 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 28, 2022 License: MIT Imports: 10 Imported by: 0

README

Outbox pattern for kafka.

The outbox document example:

var doc={
topic:"order.created",
key:"my_key",
value:"my_value",
headers:[{key:"header_key",value:"header_value"}]
}

Documentation

Index

Constants

View Source
const CollectionName = "outbox"

Variables

This section is empty.

Functions

func NewEmitter

func NewEmitter(o EmitterOptions) (hevent.Emitter, error)

Types

type EmitterOptions

type EmitterOptions struct {
	Outbox            OutboxStore
	ContextPropagator hexa.ContextPropagator
	Encoder           hevent.Encoder
}

func (EmitterOptions) Validate

func (o EmitterOptions) Validate() error
type Header struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}

Header is the Kafka event's header

type MessageConverter

type MessageConverter interface {
	EventToOutboxMessage(context.Context, *hevent.Event) (*OutboxMessage, error)
}

type OutboxMessage

type OutboxMessage struct {
	ID        string    `bson:"id" json:"id"`
	Topic     string    `bson:"topic" json:"topic"`
	Key       string    `bson:"key" json:"key"`
	Value     string    `bson:"value" json:"value"`
	Headers   []Header  `bson:"headers" json:"headers"`
	EmittedAt time.Time `bson:"emitted_at" json:"emitted_at"`
}

OutboxMessage is the outbox collection's model

type OutboxStore

type OutboxStore interface {
	hexa.Shutdownable

	Migrate() error // do migration if needed.
	Create(c context.Context, msg *OutboxMessage) error
	Ping(c context.Context) error
}

func NewOutboxStore

func NewOutboxStore(coll *mongo.Collection) OutboxStore

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL