naffka

package module
v0.0.0-...-14ff508 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 23, 2021 License: Apache-2.0 Imports: 7 Imported by: 1

README

naffka

A single, in-process implementation of the sarama Go Kafka client APIs.

It's like Kafka, but a bit naff.

Documentation

Types

type MemoryDatabase

type MemoryDatabase struct {
	// contains filtered or unexported fields
}

A MemoryDatabase stores the message history as arrays in memory, which makes it suitable for unit tests. If the process stops, any messages that a consumer has not yet processed are lost forever and all offsets become invalid.

func (*MemoryDatabase) FetchMessages

func (m *MemoryDatabase) FetchMessages(topic string, startOffset, endOffset int64) ([]types.Message, error)

FetchMessages implements Database

func (*MemoryDatabase) MaxOffsets

func (m *MemoryDatabase) MaxOffsets() (map[string]int64, error)

MaxOffsets implements Database

func (*MemoryDatabase) StoreMessages

func (m *MemoryDatabase) StoreMessages(topic string, messages []types.Message) error

StoreMessages implements Database
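
As a rough illustration of the three operations above, the following sketch keeps one slice of messages per topic. It is not naffka's MemoryDatabase: the names memoryLog and message are hypothetical, the half-open [start, end) fetch range is an assumption, and maxOffsets here reports the next free offset per topic, which may differ from the real method's convention.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for types.Message, whose fields are
// not shown in this documentation.
type message struct {
	Offset int64
	Value  []byte
}

// memoryLog is an illustrative in-memory store, not naffka's MemoryDatabase.
type memoryLog struct {
	mu     sync.Mutex
	topics map[string][]message
}

// store appends msgs to topic. Offsets must continue the existing sequence,
// so a message's offset always equals its index in the slice.
func (l *memoryLog) store(topic string, msgs []message) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.topics == nil {
		l.topics = map[string][]message{}
	}
	next := int64(len(l.topics[topic]))
	for i, m := range msgs {
		if m.Offset != next+int64(i) {
			return fmt.Errorf("topic %q: got offset %d, want %d", topic, m.Offset, next+int64(i))
		}
	}
	l.topics[topic] = append(l.topics[topic], msgs...)
	return nil
}

// fetch returns a copy of the messages in [start, end).
// Assumption: the end offset is exclusive.
func (l *memoryLog) fetch(topic string, start, end int64) ([]message, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	msgs := l.topics[topic]
	if start < 0 || end < start || end > int64(len(msgs)) {
		return nil, fmt.Errorf("topic %q: range [%d, %d) out of bounds", topic, start, end)
	}
	return append([]message(nil), msgs[start:end]...), nil
}

// maxOffsets reports, for each topic, the offset the next message would get.
func (l *memoryLog) maxOffsets() map[string]int64 {
	l.mu.Lock()
	defer l.mu.Unlock()
	out := make(map[string]int64, len(l.topics))
	for topic, msgs := range l.topics {
		out[topic] = int64(len(msgs))
	}
	return out
}

func main() {
	log := &memoryLog{}
	err := log.store("events", []message{
		{Offset: 0, Value: []byte("a")},
		{Offset: 1, Value: []byte("b")},
		{Offset: 2, Value: []byte("c")},
	})
	if err != nil {
		fmt.Println("store:", err)
		return
	}
	msgs, err := log.fetch("events", 1, 3)
	if err != nil {
		fmt.Println("fetch:", err)
		return
	}
	for _, m := range msgs {
		fmt.Printf("%d %s\n", m.Offset, m.Value)
	}
	fmt.Println(log.maxOffsets())
}
```

Because everything lives in a map owned by the process, nothing survives a restart, which is the trade-off the MemoryDatabase description points out.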

type Naffka

type Naffka struct {
	// contains filtered or unexported fields
}

Naffka is an implementation of the sarama Kafka API designed to run within a single Go process. It implements both the sarama.SyncProducer and the sarama.Consumer interfaces, so it can act as a drop-in replacement for Kafka in tests or single-instance deployments. It does not support multiple partitions.
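
The drop-in idea can be sketched with stdlib-only Go: one type satisfies both a producer and a consumer interface, and calling code depends only on the interfaces. The producer and consumer interfaces below are hypothetical, cut-down stand-ins for sarama.SyncProducer and sarama.Consumer, and broker is an illustrative type rather than Naffka itself.

```go
package main

import "fmt"

// producer and consumer are hypothetical miniatures of the sarama
// interfaces; the real ones have more methods and different signatures.
type producer interface {
	Send(topic, value string) (offset int64, err error)
}

type consumer interface {
	Consume(topic string, offset int64) ([]string, error)
}

// broker keeps every topic in memory and has a single implicit partition.
// It is not safe for concurrent use; a real implementation would lock.
type broker struct {
	topics map[string][]string
}

// Compile-time checks that broker satisfies both interfaces.
var (
	_ producer = (*broker)(nil)
	_ consumer = (*broker)(nil)
)

func newBroker() *broker {
	return &broker{topics: map[string][]string{}}
}

// Send appends value to topic and returns the offset it was stored at.
func (b *broker) Send(topic, value string) (int64, error) {
	b.topics[topic] = append(b.topics[topic], value)
	return int64(len(b.topics[topic]) - 1), nil
}

// Consume returns the messages of topic starting at offset (inclusive).
func (b *broker) Consume(topic string, offset int64) ([]string, error) {
	msgs := b.topics[topic]
	if offset < 0 || offset > int64(len(msgs)) {
		return nil, fmt.Errorf("offset %d out of range for topic %q", offset, topic)
	}
	return msgs[offset:], nil
}

// publish only knows about the producer interface, so a test can pass a
// broker where production code would pass a real Kafka client.
func publish(p producer, topic string, values ...string) error {
	for _, v := range values {
		if _, err := p.Send(topic, v); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	b := newBroker()
	if err := publish(b, "events", "x", "y"); err != nil {
		fmt.Println("publish:", err)
		return
	}
	msgs, err := b.Consume("events", 0)
	if err != nil {
		fmt.Println("consume:", err)
		return
	}
	fmt.Println(msgs)
}
```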

func New

func New(db storage.Database) (*Naffka, error)

New creates a new Naffka instance.

func (*Naffka) Close

func (n *Naffka) Close() error

Close implements sarama.SyncProducer and sarama.Consumer

func (*Naffka) ConsumePartition

func (n *Naffka) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error)

ConsumePartition implements sarama.Consumer. Note: offset is *inclusive*, i.e. the consumer will include the message with that offset.
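
One practical consequence of the inclusive offset: a consumer that has already handled the message at offset n should resume from n+1, otherwise it sees that message again. The sketch below shows this with a plain slice; readFrom and resumeOffset are hypothetical helpers and do not call naffka or sarama.

```go
package main

import "fmt"

// readFrom returns every entry of history whose index is >= offset,
// mirroring an inclusive start offset.
func readFrom(history []string, offset int64) ([]string, error) {
	if offset < 0 || offset > int64(len(history)) {
		return nil, fmt.Errorf("offset %d out of range [0, %d]", offset, len(history))
	}
	return history[offset:], nil
}

// resumeOffset returns the offset to request after the message at
// lastProcessed has been handled.
func resumeOffset(lastProcessed int64) int64 {
	return lastProcessed + 1
}

func main() {
	history := []string{"a", "b", "c", "d"}

	// Requesting offset 1 includes the message stored at offset 1.
	batch, err := readFrom(history, 1)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(batch)

	// After handling offset 1, resume from offset 2 to avoid a duplicate.
	rest, err := readFrom(history, resumeOffset(1))
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(rest)
}
```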

func (*Naffka) HighWaterMarks

func (n *Naffka) HighWaterMarks() map[string]map[int32]int64

HighWaterMarks implements sarama.Consumer

func (*Naffka) Partitions

func (n *Naffka) Partitions(topic string) ([]int32, error)

Partitions implements sarama.Consumer

func (*Naffka) SendMessage

func (n *Naffka) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error)

SendMessage implements sarama.SyncProducer

func (*Naffka) SendMessages

func (n *Naffka) SendMessages(msgs []*sarama.ProducerMessage) error

SendMessages implements sarama.SyncProducer

func (*Naffka) Topics

func (n *Naffka) Topics() ([]string, error)

Topics implements sarama.Consumer
