kafka

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 3, 2019 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SASLMechanismNone     = "none"
	SASLMechanismPlain    = "plain"
	SASLMechanismSCRAM256 = "scram-sha-256"
	SASLMechanismSCRAM512 = "scram-sha-512"
)

Variables

View Source
var (
	// ErrEmptyEnvironment occurs when the provided environment is empty.
	ErrEmptyEnvironment = errors.New("The environment cannot be empty")

	// ErrEmptyTopic occurs when the provided topic is empty.
	ErrEmptyTopic = errors.New("The topic cannot be empty")
)
View Source
var (
	DefaultClusterVersion = sarama.MaxVersion.String()
)

Functions

This section is empty.

Types

type Broker added in v1.0.0

type Broker struct {
	Address string
	ID      int
	Meta    *BrokerMetadata
}

func (Broker) String added in v1.0.0

func (b Broker) String() string

type BrokerMetadata added in v1.0.0

type BrokerMetadata struct {
	Version int
	Topics  []Topic
}

type Callback

type Callback func(topic string, partition int32, offset int64, time time.Time, key, value []byte) error

Callback the function which will get called upon receiving a message from Kafka.

type Checkpoint added in v0.0.7

type Checkpoint struct {
	// contains filtered or unexported fields
}

Checkpoint represents a point in time or offset, from which the consumer has to start consuming from the specified topic.

func NewCheckpoint added in v0.0.7

func NewCheckpoint(rewind bool) *Checkpoint

NewCheckpoint creates a new checkpoint instance.

In rewind mode, the consumer will start consuming from the oldest available offset which means to consume all the old messages from the beginning of the stream.

func (*Checkpoint) Mode added in v0.0.7

func (c *Checkpoint) Mode() OffsetMode

Mode returns the current mode of the checkpoint.

func (*Checkpoint) Offset added in v0.0.7

func (c *Checkpoint) Offset() int64

Offset returns the final offset value from which consuming will be started.

In MillisecondsOffsetMode, the offset will be the milliseconds of the specified time. This is what Kafka needs to figure out the closest available offset at the given time.

func (*Checkpoint) OffsetString added in v0.0.7

func (c *Checkpoint) OffsetString() string

OffsetString returns the string representation of the time offset in `02-01-2006T15:04:05.999999999` format if in MillisecondsOffsetMode mode, otherwise returns the string representation of the offset value.

func (*Checkpoint) SetOffset added in v0.0.7

func (c *Checkpoint) SetOffset(offset int64)

SetOffset sets the offset of the checkpoint and switches the mode to ExplicitOffsetMode.

func (*Checkpoint) SetTimeOffset added in v0.0.7

func (c *Checkpoint) SetTimeOffset(at time.Time)

SetTimeOffset sets the offset to the milliseconds of the given time and sets the mode to MillisecondsOffsetMode.

func (*Checkpoint) TimeOffset added in v0.0.7

func (c *Checkpoint) TimeOffset() time.Time

TimeOffset returns the originally provided time value of the time-based offset in MillisecondsOffsetMode mode.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer represents a new Kafka cluster consumer.

func NewConsumer

func NewConsumer(brokers []string, printer internal.Printer, environment string, enableAutoTopicCreation bool, options ...Option) (*Consumer, error)

NewConsumer creates a new instance of Kafka cluster consumer.

func (*Consumer) Close

func (c *Consumer) Close()

Close closes the Kafka consumer.

func (*Consumer) Events added in v0.0.11

func (c *Consumer) Events() <-chan *Event

Events the channel to which the Kafka events will be published.

You MUST listen to this channel before you start the consumer to avoid deadlock.

func (*Consumer) GetTopics

func (c *Consumer) GetTopics(search *regexp.Regexp) ([]string, error)

GetTopics fetches the topics from the server.

func (*Consumer) Start

func (c *Consumer) Start(ctx context.Context, topics map[string]*Checkpoint) error

Start starts consuming from the specified topics and executes the callback function on each message.

This is a blocking call which will be terminated on cancellation of the context parameter. The method returns error if the topic list is empty or the callback function is nil.

func (*Consumer) StoreOffset added in v0.0.11

func (c *Consumer) StoreOffset(event *Event)

StoreOffset stores the offset of the successfully processed message into the offset store.

type ConsumerGroup added in v1.0.0

type ConsumerGroup struct {
	// Members the clients attached to the consumer groups.
	Members []GroupMember
	// TopicOffsets the offsets of each topic belong to the group.
	TopicOffsets TopicPartitionOffset
}

ConsumerGroup represents a consumer group.

type ConsumerGroups added in v1.0.0

type ConsumerGroups map[string]*ConsumerGroup

ConsumerGroups the map of consumer groups keyed by consumer group ID.

func (ConsumerGroups) Names added in v1.0.0

func (c ConsumerGroups) Names() []string

Names returns the names of the consumer groups

type Event added in v0.0.11

type Event struct {
	// Topic the topic from which the message was consumed.
	Topic string
	// Key partition key.
	Key []byte
	// Value message content.
	Value []byte
	// Timestamp message timestamp.
	Timestamp time.Time
	// Partition the Kafka partition to which the message belong.
	Partition int32
	// Offset the message offset.
	Offset int64
}

Event Kafka event.

type GroupMember added in v1.0.0

type GroupMember struct {
	// ID the member identifier.
	ID string
	// ClientID client ID.
	ClientID string
	// Host the host name/IP of the client machine.
	Host string
}

GroupMember represents a consumer group member.

func (GroupMember) String added in v1.0.0

func (g GroupMember) String() string

type LocalOffsetManager added in v1.0.0

type LocalOffsetManager struct {
	*internal.Logger
	// contains filtered or unexported fields
}

func NewLocalOffsetManager added in v1.0.0

func NewLocalOffsetManager(level internal.VerbosityLevel) *LocalOffsetManager

func (*LocalOffsetManager) GetOffsetFiles added in v1.0.0

func (l *LocalOffsetManager) GetOffsetFiles(environment string, topicFilter *regexp.Regexp) ([]string, error)

GetOffsetFiles returns a list of all the offset files for the given environment.

func (*LocalOffsetManager) ListLocalOffsets added in v1.0.0

func (l *LocalOffsetManager) ListLocalOffsets(topicFilter *regexp.Regexp, envFilter *regexp.Regexp) (map[string]TopicPartitionOffset, error)

ListLocalOffsets lists the locally stored offsets for the the topics of all the available environments.

The returned map is keyed by the environment name.

func (*LocalOffsetManager) ReadLocalTopicOffsets added in v1.0.0

func (l *LocalOffsetManager) ReadLocalTopicOffsets(topic string, environment string) (PartitionOffset, error)

ReadLocalTopicOffsets returns the locally stored offsets of the given topic for the specified environment if exists.

If there is no local offsets, the method will return an empty partition-offset map.

type Manager added in v1.0.0

type Manager struct {
	*internal.Logger
	// contains filtered or unexported fields
}

Manager a type to query Kafka metadata.

func NewManager added in v1.0.0

func NewManager(brokers []string, verbosity internal.VerbosityLevel, options ...Option) (*Manager, error)

NewManager creates a new instance of Kafka manager

func (*Manager) Close added in v1.0.0

func (m *Manager) Close()

Close closes the underlying Kafka connection.

func (*Manager) DeleteConsumerGroup added in v1.0.0

func (m *Manager) DeleteConsumerGroup(group string) error

func (*Manager) DeleteTopic added in v1.0.0

func (m *Manager) DeleteTopic(topic string) error

func (*Manager) GetBrokers added in v1.0.0

func (m *Manager) GetBrokers(ctx context.Context, includeMetadata bool) ([]Broker, error)

GetBrokers returns the current set of active brokers as retrieved from cluster metadata.

func (*Manager) GetConsumerGroups added in v1.0.0

func (m *Manager) GetConsumerGroups(ctx context.Context, includeMembers bool, memberFilter, groupFilter *regexp.Regexp, topics []string) (ConsumerGroups, error)

func (*Manager) GetGroupTopics added in v1.0.1

func (m *Manager) GetGroupTopics(ctx context.Context, group string, includeOffsets bool, topicFilter *regexp.Regexp) (TopicPartitionOffset, error)

func (*Manager) GetTopics added in v1.0.0

func (m *Manager) GetTopics(ctx context.Context, filter *regexp.Regexp, includeOffsets bool, environment string) (TopicPartitionOffset, error)

GetTopics loads a list of the available topics from the server.

type Offset added in v1.0.0

type Offset struct {
	// Latest the latest available offset of the partition reported by the server.
	Latest int64
	// Current the current value of the local or consumer group offset. This is where the consumer up to.
	Current int64
}

Offset represents an offset pair for a given partition.

A pair contains the latest offset of the partition reported by the server and the local or consumer group offset.

func (Offset) Lag added in v1.0.0

func (o Offset) Lag() int64

Lag calculates the lag between the latest and the current offset values.

func (Offset) String added in v1.0.0

func (o Offset) String(latest bool) string

String returns the string representation of the given offset.

type OffsetMode added in v0.0.7

type OffsetMode int8

OffsetMode represents the offset mode for a checkpoint.

const (
	// UndefinedOffsetMode the user has not requested for any specific offset.
	UndefinedOffsetMode OffsetMode = iota
	// MillisecondsOffsetMode the closet available offset at a given time will be fetched from the server
	// before the consumer starts pulling messages from Kafka.
	MillisecondsOffsetMode
	// ExplicitOffsetMode the user has explicitly asked for a specific offset.
	ExplicitOffsetMode
)

type Option

type Option func(options *Options)

Option represents a configuration function.

func WithClusterVersion

func WithClusterVersion(version string) Option

WithClusterVersion kafka cluster version.

func WithLogWriter added in v1.0.0

func WithLogWriter(writer io.Writer) Option

WithLogWriter sets the writer to write the internal Sarama logs to.

func WithSASL added in v0.0.8

func WithSASL(mechanism, username, password string) Option

WithSASL enables SASL authentication.

func WithTLS added in v0.0.8

func WithTLS(tls *tls.Config) Option

WithTLS enables TLS.

type Options

type Options struct {
	// DisableErrorReporting disables sending consumer errors to the Errors() channel.
	DisableErrorReporting bool
	// ClusterVersion kafka cluster version.
	ClusterVersion string
	// TLS configuration to connect to Kafka cluster.
	TLS *tls.Config
	// contains filtered or unexported fields
}

Options holds the configuration settings for kafka consumer.

func NewOptions

func NewOptions() *Options

NewOptions creates a new Options object with default values.

type PartitionOffset added in v1.0.0

type PartitionOffset map[int32]Offset

PartitionOffset represents a map of partition offset pairs.

func ToPartitionOffset added in v1.0.0

func ToPartitionOffset(po map[int32]int64, latest bool) PartitionOffset

ToPartitionOffset creates a new PartitionOffset map from a raw map.

Set latest parameter to true, if you would like to set the Latest offset value instead of the Current value.

func (PartitionOffset) SortPartitions added in v1.0.0

func (p PartitionOffset) SortPartitions() []int

SortPartitions returns a list of sorted partitions.

type Topic added in v1.0.0

type Topic struct {
	Name               string
	NumberOdPartitions int
}

type TopicPartitionOffset added in v1.0.0

type TopicPartitionOffset map[string]PartitionOffset

TopicPartitionOffset represents a map of topic offset pairs for all the partitions.

func ToTopicPartitionOffset added in v1.0.0

func ToTopicPartitionOffset(tpo map[string]map[int32]int64, latest bool) TopicPartitionOffset

ToTopicPartitionOffset creates a new TopicPartitionOffset from a raw map.

Set latest parameter to true, if you would like to set the Latest offset value instead of the Current value.

func (TopicPartitionOffset) SortedTopics added in v1.0.0

func (t TopicPartitionOffset) SortedTopics() []string

SortedTopics returns a list of sorted topics.

type TopicsByName added in v1.0.0

type TopicsByName []Topic

func (TopicsByName) Len added in v1.0.0

func (t TopicsByName) Len() int

func (TopicsByName) Less added in v1.0.0

func (t TopicsByName) Less(i, j int) bool

func (TopicsByName) Swap added in v1.0.0

func (t TopicsByName) Swap(i, j int)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL