emitter

package module
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 30, 2023 License: MIT Imports: 5 Imported by: 0

README

Emitter

Emitter is the implementation of the Outbox Pattern for handling outbox message from database.

Installation

go get -u github.com/bluexlab/emitter

Default Usage

// Kafka Writer
kafkaWriter := emitter.NewKafkaWriter([]string{"broker:9092"}, "topic", 10)
defer func() { _ = kafkaWriter.Close() }()

ctx, cancel := context.WithCancel(context.Background())
notificationEmitter := emitter.NewEmitter(ctx, outboxSourceImpl, emitter.KafkaHandler(kafkaWriter))
defer notificationEmitter.Stop()

notificationEmitter.Run()

Message Handler

Emitter leverage Handler to process the messages. User can provide custom handler as long as it follows HandlerFunc function signature:

// Handler process the OutboxMsg retrieve from OutboxSource.
type Handler interface {
Process(ctx context.Context, msgs ...OutboxMsg) ([]int64, error)
}

// HandlerFunc is an adapter to allow the use of
// ordinary functions as a OutboxMsg handler. If f is a function
// with the appropriate signature, HandlerFunc(f) is a
// Handler that calls f.
type HandlerFunc func(ctx context.Context, msgs ...OutboxMsg) ([]int64, error)

// Process calls f(ctx, msg).
func (f HandlerFunc) Process(ctx context.Context, msgs ...OutboxMsg) ([]int64, error) {
return f(ctx, msgs...)
}

Options

By default, Emitter polls OutboxMsg every 30 second. User can provide Options to change the default behavior:

notificationEmitter := emitter.NewOutboxEmitter(ctx, source, handler,
    emitter.WithInterval(10 * time.Second),
    emitter.WithBatchSize(5),
)

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewKafkaWriter

func NewKafkaWriter(brokers []string, topic string, batchSize int) *kafka.Writer

NewKafkaWriter creates a new kafka writer.

Types

type Emitter

type Emitter struct {
	// contains filtered or unexported fields
}

func NewEmitter

func NewEmitter(ctx context.Context, outbox OutboxSource, handler HandlerFunc, opts ...Option) *Emitter

func (*Emitter) Run

func (e *Emitter) Run()

Run starts the emitter.

func (*Emitter) Stop

func (e *Emitter) Stop()

Stop stops the emitter.

type Handler

type Handler interface {
	Process(ctx context.Context, msgs ...OutboxMsg) ([]int64, error)
}

Handler process the OutboxMsg retrieve from OutboxSource.

type HandlerFunc

type HandlerFunc func(ctx context.Context, msgs ...OutboxMsg) ([]int64, error)

HandlerFunc is an adapter to allow the use of ordinary functions as a OutboxMsg handler. If f is a function with the appropriate signature, HandlerFunc(f) is a Handler that calls f.

func KafkaHandler

func KafkaHandler(writer *kafka.Writer) HandlerFunc

KafkaHandler writes messages to kafka with a kafka writer.

func (HandlerFunc) Process

func (f HandlerFunc) Process(ctx context.Context, msgs ...OutboxMsg) ([]int64, error)

Process calls f(ctx, msg).

type Logger

type Logger interface {
	Printf(string, ...interface{})
}

Logger interface API for log.Logger.

type LoggerFunc

type LoggerFunc func(msg string, args ...any)

LoggerFunc is a bridge between Logger and any third party logger Usage:

l := NewLogger() // some logger
a := antenna.NewAntenna(...)
a.SetLogger(LoggerFunc(l.Infof))
a.SetErrorLogger(LoggerFunc(l.Errorf))

func (LoggerFunc) Printf

func (f LoggerFunc) Printf(msg string, args ...any)

Printf implements Logger interface.

type Option

type Option func(emitter *Emitter)

Option is a function that configures an Emitter.

func WithBatchSize

func WithBatchSize(size int) Option

WithBatchSize sets the batch size for the emitter.

func WithErrorLogger

func WithErrorLogger(logger Logger) Option

WithErrorLogger sets the error logger for the antenna.

func WithInterval

func WithInterval(interval time.Duration) Option

WithInterval sets the interval for the emitter.

func WithLogger

func WithLogger(logger Logger) Option

WithLogger sets the logger for the antenna.

type OutboxMsg

type OutboxMsg struct {
	RecID int64
	Key   string
	Msg   []byte
}

type OutboxSource

type OutboxSource interface {
	GetOutboxMsg(ctx context.Context, batchSize int) ([]OutboxMsg, error)
	DeleteOutboxMsg(ctx context.Context, recIDs ...int64) error
}

OutboxSource provides the functionality to retrieve and deleted OutboxMsg.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL