kafka

package
v0.0.0-...-8c342a6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 17, 2017 License: BSD-3-Clause Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// OffsetNewest defines the newest offset to read from using the consumer
	OffsetNewest = -1
	// OffsetOldest defines the oldest offset to read from using the consumer
	OffsetOldest = -2
)

Variables

This section is empty.

Functions

func CreateDefaultKafkaConfig

func CreateDefaultKafkaConfig(clientID string, initialOffset int64, registry metrics.Registry) *cluster.Config

CreateDefaultKafkaConfig creates a (bsm) sarama configuration with default values.

Types

type Assignment

type Assignment map[int32]int64

Assignment represents a partition:offset assignment for the current connection

type BOF

type BOF struct {
	Topic     string
	Partition int32
	Offset    int64
	Hwm       int64
}

BOF marks the beginning of a topic/partition.

type Consumer

type Consumer interface {
	Events() <-chan Event

	// group consume assumes co-partioned topics
	Subscribe(topics map[string]int64) error
	AddGroupPartition(partition int32)
	Commit(topic string, partition int32, offset int64) error

	// consume individual topic/partitions
	AddPartition(topic string, partition int32, initialOffset int64)
	RemovePartition(topic string, partition int32)

	// Close stops closes the events channel
	Close() error
}

Consumer abstracts a kafka consumer

func NewSaramaConsumer

func NewSaramaConsumer(brokers []string, group string, registry metrics.Registry) (Consumer, error)

NewSaramaConsumer creates a new Consumer using sarama

type EOF

type EOF struct {
	Topic     string
	Partition int32
	Hwm       int64
}

EOF marks the end of the log of a topic/partition.

type Error

type Error struct {
	Err error
}

Error from kafka wrapped to be conform with the Event-Interface

type Event

type Event interface {
	// contains filtered or unexported methods
}

Event abstracts different types of events from the kafka consumer like BOF/EOF/Error or an actual message

type Message

type Message struct {
	Topic     string
	Partition int32
	Offset    int64

	Key   string
	Value []byte
}

Message represents a message from kafka containing extra information like topic, partition and offset for convenience

type NOP

type NOP struct {
	Topic     string
	Partition int32
}

NOP does not carry any information. Useful for debugging.

type Producer

type Producer interface {
	// Emit sends a message to topic.
	// TODO (franz): this method should return a promise, instead of getting one.
	// Otherwise a callback is sufficient
	Emit(topic string, key string, value []byte) *Promise
	Close() error
}

Producer abstracts the kafka producer

func NewProducer

func NewProducer(brokers []string, registry metrics.Registry) (Producer, error)

NewProducer creates new kafka producer for passed brokers.

type Promise

type Promise struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Promise as in https://en.wikipedia.org/wiki/Futures_and_promises

func NewPromise

func NewPromise() *Promise

NewPromise creates a new Promise

func (*Promise) Finish

func (p *Promise) Finish(err error) *Promise

Finish finishes the promise by executing all callbacks and saving the message/error for late subscribers

func (*Promise) Then

func (p *Promise) Then(s func(err error)) *Promise

Then chains a callback to the Promise

type TopicManager

type TopicManager interface {
	// EnsureTableExists checks that a table (log-compacted topic) exists, or create one if possible
	EnsureTableExists(topic string, npar int) error
	// EnsureStreamExists checks that a stream topic exists, or create one if possible
	EnsureStreamExists(topic string, npar int) error

	// Partitions returns the number of partitions of a topic, that are assigned to the running
	// instance, i.e. it doesn't represent all partitions of a topic.
	Partitions(topic string) ([]int32, error)

	// Close closes the topic manager
	Close() error
}

TopicManager provides an interface to create/check topics and their partitions

func NewSaramaTopicManager

func NewSaramaTopicManager(brokers []string) (TopicManager, error)

NewSaramaTopicManager creates a new topic manager using the sarama library

func NewTopicManager

func NewTopicManager(servers []string, config *TopicManagerConfig) (TopicManager, error)

NewTopicManager creates a new topic manager for managing topics with zookeeper

type TopicManagerConfig

type TopicManagerConfig struct {
	Table struct {
		Replication int
	}
	Stream struct {
		Replication int
		Retention   time.Duration
	}
}

TopicManagerConfig contains the configuration to access the Zookeeper servers as well as the desired options of to create tables and stream topics.

func NewTopicManagerConfig

func NewTopicManagerConfig() *TopicManagerConfig

NewTopicManagerConfig provides a default configuration for auto-creation with replication factor of 1 and rentention time of 1 hour.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL