events

package
v2.3.32 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 13, 2024 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateOutbox added in v2.2.58

func CreateOutbox(ctx context.Context, db *sql.DB) error

func SendAsync added in v2.2.58

func SendAsync(ctx context.Context, event Event)

SendAsync sends the event without blocking and with no error reporting. If an error happens or the event queue is full, the event might be dropped.

func SendTx added in v2.2.58

func SendTx(ctx context.Context, tx sqlx.ExecerContext, event Event) error

SendTx ensures that the event is sent if the transaction is committed successfully. If the transaction is rolled back, the event will be discarded.

func TimeToEventTimestamp added in v2.2.18

func TimeToEventTimestamp(ts time.Time) int64

func WriteToOutbox added in v2.2.58

func WriteToOutbox(ctx context.Context, tx sqlx.ExecerContext, metadata EventMetadata, payload []byte) error

Types

type Event

type Event interface {
	// Schema returns the avro schema of this event
	Schema() string

	// Serialize writes the event (in avro format) to the given writer.
	Serialize(io.Writer) error
}

type EventHeader added in v2.2.58

type EventHeader struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}

type EventHeaders added in v2.2.58

type EventHeaders []EventHeader

func (EventHeaders) ToKafka added in v2.2.58

func (headers EventHeaders) ToKafka() []kafka.Header

type EventMetadata added in v2.2.58

type EventMetadata struct {
	Type    reflect.Type
	Topic   string
	Key     *string
	Headers EventHeaders
}

type EventSender

type EventSender interface {
	// SendAsync sends the given event. This method should be non-blocking and
	// must never fail. If sending would block, you are allowed to discard the event.
	// Errors will be logged to the terminal but otherwise ignored.
	SendAsync(ctx context.Context, event Event)

	// SendAsyncCh returns a channel you can use to enqueue events to the sender. Sending events on this
	// channel will give you no delivery guarantees for those events.
	SendAsyncCh() chan<- Event

	// SendInTx sends the message in the transaction.
	// Returns an error, if sending fails.
	SendInTx(ctx context.Context, tx sqlx.ExecerContext, event Event) error

	// Close the event sender and flush all pending events.
	// Waits for all events to be sent out.
	Close() error
}
var Sender EventSender = &NoopEventSender{}

type EventSenderInitializer added in v2.2.58

type EventSenderInitializer interface {
	Close() error
	Initialize() (EventSender, error)
}

func NewInitializer added in v2.2.58

func NewInitializer(
	confluentClient *confluent.Client,
	kafkaSender *kafka.Producer,
	fileSender io.WriteCloser,
	eventTopics EventTopics,
	bufferSize uint,
) (EventSenderInitializer, error)

type EventTopics

type EventTopics struct {
	EventTypes map[reflect.Type]kafka.Topic
}

EventTopics contains a mapping from event struct type to a kafka topic. This map must contain all event types that are going to be sent. If an event type is missing from this map, sending that event will fail.

func (*EventTopics) Normalized added in v2.2.58

func (topics *EventTopics) Normalized() (*NormalizedEventTypes, error)

Normalized returns a normalized EventTopics instance where the event types point directly to the event's struct type and not to any kind of pointer. This way we prevent issues with pointers to event types not getting matched to a topic.

func (*EventTopics) Topics

func (topics *EventTopics) Topics() kafka.Topics

type KafkaEvent added in v2.2.58

type KafkaEvent struct {
	Event
	Key     string
	Headers []EventHeader
}

func ToKafkaEvent added in v2.1.14

func ToKafkaEvent(key string, ev Event) *KafkaEvent

func WithKey added in v2.2.58

func WithKey(ev Event, key string, headers ...EventHeader) *KafkaEvent

type NoopEventSender

type NoopEventSender struct {
	// contains filtered or unexported fields
}

func (*NoopEventSender) Close

func (n *NoopEventSender) Close() error

func (*NoopEventSender) SendAsync added in v2.2.58

func (n *NoopEventSender) SendAsync(ctx context.Context, event Event)

func (*NoopEventSender) SendAsyncCh added in v2.2.88

func (n *NoopEventSender) SendAsyncCh() chan<- Event

func (*NoopEventSender) SendInTx added in v2.2.58

func (n *NoopEventSender) SendInTx(ctx context.Context, tx sqlx.ExecerContext, event Event) error

type NormalizedEventTypes added in v2.2.58

type NormalizedEventTypes struct {
	EventTopics
}

func (*NormalizedEventTypes) MetadataOf added in v2.2.58

func (topics *NormalizedEventTypes) MetadataOf(event Event) (*EventMetadata, error)

func (*NormalizedEventTypes) TopicForType added in v2.2.58

func (topics *NormalizedEventTypes) TopicForType(eventType reflect.Type) (string, error)

type TopicsFunc

type TopicsFunc func(replicationFactor int16) EventTopics

TopicsFunc builds an EventTopics instance for the given kafka replication factor.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL