persistent

package
v0.0.0-...-0bb0fe6
Warning

This package is not in the latest version of its module.

Published: Oct 24, 2019 License: BSD-2-Clause Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewOffsetConsumer

func NewOffsetConsumer(
	messageHandler MessageHandler,
	client sarama.Client,
	topic string,
	db *bolt.DB,
	offsetBucketName []byte,
) consumer.Consumer

NewOffsetConsumer returns a consumer with offset tracking.

func NewOffsetMessageHandler

func NewOffsetMessageHandler(
	db *bolt.DB,
	offsetBucketName []byte,
	messageHandler MessageHandler,
) consumer.MessageHandler

Types

type MessageHandler

type MessageHandler interface {
	// ConsumeMessage handles a message within an open Bolt transaction.
	ConsumeMessage(ctx context.Context, tx *bolt.Tx, msg *sarama.ConsumerMessage) error
}

MessageHandler handles Kafka messages within an open Bolt transaction.

type MessageHandlerFunc

type MessageHandlerFunc func(ctx context.Context, tx *bolt.Tx, msg *sarama.ConsumerMessage) error

MessageHandlerFunc allows a plain function to be used as a MessageHandler.

func (MessageHandlerFunc) ConsumeMessage

func (m MessageHandlerFunc) ConsumeMessage(ctx context.Context, tx *bolt.Tx, msg *sarama.ConsumerMessage) error

ConsumeMessage forwards the call to the underlying function.

type Offset

type Offset int64

Offset in the Kafka topic.

func OffsetFromBytes

func OffsetFromBytes(content []byte) Offset

OffsetFromBytes returns the offset for the given bytes.

func (Offset) Bytes

func (o Offset) Bytes() []byte

Bytes representation for the offset.

func (Offset) Int64

func (o Offset) Int64() int64

Int64 value for the offset.
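The byte layout used by Bytes and OffsetFromBytes is not documented here. The following sketch assumes a fixed 8-byte big-endian encoding, which is a common choice for Bolt keys and values because it sorts numerically. Returning 0 for input of the wrong length is also an assumption of this sketch, not documented behavior.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// Offset in the Kafka topic.
type Offset int64

// Bytes encodes the offset as 8 big-endian bytes (assumed layout).
func (o Offset) Bytes() []byte {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, uint64(o))
	return buf
}

// Int64 value for the offset.
func (o Offset) Int64() int64 {
	return int64(o)
}

// OffsetFromBytes decodes an offset written by Bytes.
// Assumption: input that is not exactly 8 bytes long yields 0.
func OffsetFromBytes(content []byte) Offset {
	if len(content) != 8 {
		return 0
	}
	return Offset(binary.BigEndian.Uint64(content))
}

func main() {
	encoded := Offset(42).Bytes()
	decoded := OffsetFromBytes(encoded)
	fmt.Println(encoded, decoded.Int64())
}
```

Negative values survive the round trip as well, because the conversion between int64 and uint64 preserves the bit pattern.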

type OffsetRegistry

type OffsetRegistry interface {
	Get(partition int32) (int64, error)
	Set(partition int32, offset int64) error
}

OffsetRegistry saves and loads the current offset of each partition in Bolt.

func NewOffsetRegistry

func NewOffsetRegistry(
	tx *bolt.Tx,
	bucketName []byte,
) OffsetRegistry

type Partition

type Partition int32

Partition in Kafka.

func PartitionFromBytes

func PartitionFromBytes(content []byte) Partition

PartitionFromBytes returns the partition for the given bytes.

func (Partition) Bytes

func (o Partition) Bytes() []byte

Bytes representation for the partition.

func (Partition) Int32

func (o Partition) Int32() int32

Int32 value of the partition.

type Storage

type Storage interface {
	// Get value for the given key.
	Get(ctx context.Context, key []byte) ([]byte, error)

	// Set a value for the given key.
	Set(ctx context.Context, key []byte, value []byte) error

	// Read fills the Bolt database until the context is canceled.
	Read(ctx context.Context) error
}

Storage saves all writes to a Kafka topic and caches reads in a local Bolt database.

func NewStorage

func NewStorage(
	producer sarama.SyncProducer,
	client sarama.Client,
	db *bolt.DB,
	topic string,
) Storage
