brokers

package
v1.0.6-1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 13, 2020 License: Apache-2.0 Imports: 10 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrOffsetOff = errors.New("Offset is off")

Functions

This section is empty.

Types

type Broker

type Broker interface {
	InitConfig()
	Initialize(peers []string)
	CloseConnections()
	Publish(topic string, payload messages.Message) (string, string, int, int64, error)
	GetMinOffset(topic string) int64
	GetMaxOffset(topic string) int64
	Consume(ctx context.Context, topic string, offset int64, imm bool, max int64) ([]string, error)
	DeleteTopic(topic string) error
	TimeToOffset(topic string, time time.Time) (int64, error)
}

Broker Encapsulates the generic broker interface

type KafkaBroker

type KafkaBroker struct {
	sync.Mutex

	Config   *sarama.Config
	Producer sarama.SyncProducer
	Client   sarama.Client
	Consumer sarama.Consumer
	Servers  []string
	// contains filtered or unexported fields
}

KafkaBroker struct

func NewKafkaBroker

func NewKafkaBroker(peers []string) *KafkaBroker

NewKafkaBroker creates a new kafka broker object

func (*KafkaBroker) CloseConnections

func (b *KafkaBroker) CloseConnections()

CloseConnections closes open producer, consumer and client

func (*KafkaBroker) Consume

func (b *KafkaBroker) Consume(ctx context.Context, topic string, offset int64, imm bool, max int64) ([]string, error)

Consume function to consume a message from the broker

func (*KafkaBroker) DeleteTopic

func (b *KafkaBroker) DeleteTopic(topic string) error

DeleteTopic deletes the topic from the Kafka cluster

func (*KafkaBroker) GetMaxOffset

func (b *KafkaBroker) GetMaxOffset(topic string) int64

GetOffset returns a current topic's offset

func (*KafkaBroker) GetMinOffset

func (b *KafkaBroker) GetMinOffset(topic string) int64

GetOffset returns a current topic's offset

func (*KafkaBroker) InitConfig

func (b *KafkaBroker) InitConfig()

InitConfig creates a new configuration for kafka broker

func (*KafkaBroker) Initialize

func (b *KafkaBroker) Initialize(peers []string)

Initialize method is a retry wrapper for init (which attempts to connect to broker backend)

func (*KafkaBroker) Publish

func (b *KafkaBroker) Publish(topic string, msg messages.Message) (string, string, int, int64, error)

Publish function publish a message to the broker

func (*KafkaBroker) TimeToOffset

func (b *KafkaBroker) TimeToOffset(topic string, t time.Time) (int64, error)

TimeToOffset returns the offset of the first message with a timestamp equal or greater than the time given.

type MockBroker

type MockBroker struct {
	MsgList          []string
	Topics           map[string]string
	TopicTimeIndices map[string][]TimeToOffset
}

MockBroker struct

func (*MockBroker) CloseConnections

func (b *MockBroker) CloseConnections()

CloseConnections closes open producer, consumer and client

func (*MockBroker) Consume

func (b *MockBroker) Consume(ctx context.Context, topic string, offset int64, imm bool, max int64) ([]string, error)

Consume function to consume a message from the broker

func (*MockBroker) DeleteTopic

func (b *MockBroker) DeleteTopic(topic string) error

Delete topic from the broker

func (*MockBroker) GetMaxOffset

func (b *MockBroker) GetMaxOffset(topic string) int64

GetOffset returns a current topic's offset

func (*MockBroker) GetMinOffset

func (b *MockBroker) GetMinOffset(topic string) int64

GetOffset returns a current topic's offset

func (*MockBroker) InitConfig

func (b *MockBroker) InitConfig()

InitConfig creates a new configuration for kafka broker

func (*MockBroker) Initialize

func (b *MockBroker) Initialize(peers []string)

Initialize the broker struct

func (*MockBroker) PopulateOne

func (b *MockBroker) PopulateOne()

PopulateOne Adds three messages to the mock broker

func (*MockBroker) PopulateThree

func (b *MockBroker) PopulateThree()

PopulateThree Adds three messages to the mock broker

func (*MockBroker) Publish

func (b *MockBroker) Publish(topic string, msg messages.Message) (string, string, int, int64, error)

Publish function publish a message to the broker

func (*MockBroker) TimeToOffset

func (b *MockBroker) TimeToOffset(topic string, time time.Time) (int64, error)

type TimeToOffset

type TimeToOffset struct {
	Timestamp time.Time
	Offset    int64
}

type TopicOffset

type TopicOffset struct {
	Offset int64 `json:"offset"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL