Documentation ¶
Index ¶
- Variables
- type Broker
- type KafkaBroker
- func (b *KafkaBroker) CloseConnections()
- func (b *KafkaBroker) Consume(ctx context.Context, topic string, offset int64, imm bool, max int64) ([]string, error)
- func (b *KafkaBroker) DeleteTopic(topic string) error
- func (b *KafkaBroker) GetMaxOffset(topic string) int64
- func (b *KafkaBroker) GetMinOffset(topic string) int64
- func (b *KafkaBroker) InitConfig()
- func (b *KafkaBroker) Initialize(peers []string)
- func (b *KafkaBroker) Publish(topic string, msg messages.Message) (string, string, int, int64, error)
- func (b *KafkaBroker) TimeToOffset(topic string, t time.Time) (int64, error)
- type MockBroker
- func (b *MockBroker) CloseConnections()
- func (b *MockBroker) Consume(ctx context.Context, topic string, offset int64, imm bool, max int64) ([]string, error)
- func (b *MockBroker) DeleteTopic(topic string) error
- func (b *MockBroker) GetMaxOffset(topic string) int64
- func (b *MockBroker) GetMinOffset(topic string) int64
- func (b *MockBroker) InitConfig()
- func (b *MockBroker) Initialize(peers []string)
- func (b *MockBroker) PopulateOne()
- func (b *MockBroker) PopulateThree()
- func (b *MockBroker) Publish(topic string, msg messages.Message) (string, string, int, int64, error)
- func (b *MockBroker) TimeToOffset(topic string, time time.Time) (int64, error)
- type TimeToOffset
- type TopicOffset
Constants ¶
This section is empty.
Variables ¶
var ErrOffsetOff = errors.New("Offset is off")
Functions ¶
This section is empty.
Types ¶
type Broker ¶
type Broker interface { InitConfig() Initialize(peers []string) CloseConnections() Publish(topic string, payload messages.Message) (string, string, int, int64, error) GetMinOffset(topic string) int64 GetMaxOffset(topic string) int64 Consume(ctx context.Context, topic string, offset int64, imm bool, max int64) ([]string, error) DeleteTopic(topic string) error TimeToOffset(topic string, time time.Time) (int64, error) }
Broker encapsulates the generic broker interface.
type KafkaBroker ¶
type KafkaBroker struct { sync.Mutex Config *sarama.Config Producer sarama.SyncProducer Client sarama.Client Consumer sarama.Consumer Servers []string // contains filtered or unexported fields }
KafkaBroker struct
func NewKafkaBroker ¶
func NewKafkaBroker(peers []string) *KafkaBroker
NewKafkaBroker creates a new kafka broker object
func (*KafkaBroker) CloseConnections ¶
func (b *KafkaBroker) CloseConnections()
CloseConnections closes open producer, consumer and client
func (*KafkaBroker) Consume ¶
func (b *KafkaBroker) Consume(ctx context.Context, topic string, offset int64, imm bool, max int64) ([]string, error)
Consume consumes messages from the broker
func (*KafkaBroker) DeleteTopic ¶
func (b *KafkaBroker) DeleteTopic(topic string) error
DeleteTopic deletes the topic from the Kafka cluster
func (*KafkaBroker) GetMaxOffset ¶
func (b *KafkaBroker) GetMaxOffset(topic string) int64
GetMaxOffset returns the topic's maximum offset
func (*KafkaBroker) GetMinOffset ¶
func (b *KafkaBroker) GetMinOffset(topic string) int64
GetMinOffset returns the topic's minimum offset
func (*KafkaBroker) InitConfig ¶
func (b *KafkaBroker) InitConfig()
InitConfig creates a new configuration for kafka broker
func (*KafkaBroker) Initialize ¶
func (b *KafkaBroker) Initialize(peers []string)
Initialize method is a retry wrapper for init (which attempts to connect to broker backend)
func (*KafkaBroker) Publish ¶
func (b *KafkaBroker) Publish(topic string, msg messages.Message) (string, string, int, int64, error)
Publish publishes a message to the broker
func (*KafkaBroker) TimeToOffset ¶
TimeToOffset returns the offset of the first message with a timestamp equal to or greater than the given time.
type MockBroker ¶
type MockBroker struct { MsgList []string Topics map[string]string TopicTimeIndices map[string][]TimeToOffset }
MockBroker struct
func (*MockBroker) CloseConnections ¶
func (b *MockBroker) CloseConnections()
CloseConnections closes open producer, consumer and client
func (*MockBroker) Consume ¶
func (b *MockBroker) Consume(ctx context.Context, topic string, offset int64, imm bool, max int64) ([]string, error)
Consume consumes messages from the broker
func (*MockBroker) DeleteTopic ¶
func (b *MockBroker) DeleteTopic(topic string) error
DeleteTopic deletes the topic from the broker
func (*MockBroker) GetMaxOffset ¶
func (b *MockBroker) GetMaxOffset(topic string) int64
GetMaxOffset returns the topic's maximum offset
func (*MockBroker) GetMinOffset ¶
func (b *MockBroker) GetMinOffset(topic string) int64
GetMinOffset returns the topic's minimum offset
func (*MockBroker) InitConfig ¶
func (b *MockBroker) InitConfig()
InitConfig creates a new configuration for kafka broker
func (*MockBroker) Initialize ¶
func (b *MockBroker) Initialize(peers []string)
Initialize the broker struct
func (*MockBroker) PopulateOne ¶
func (b *MockBroker) PopulateOne()
PopulateOne adds one message to the mock broker
func (*MockBroker) PopulateThree ¶
func (b *MockBroker) PopulateThree()
PopulateThree adds three messages to the mock broker
func (*MockBroker) Publish ¶
func (b *MockBroker) Publish(topic string, msg messages.Message) (string, string, int, int64, error)
Publish publishes a message to the broker
func (*MockBroker) TimeToOffset ¶
type TimeToOffset ¶
type TopicOffset ¶
type TopicOffset struct {
Offset int64 `json:"offset"`
}