kafka

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 21, 2020 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// OffsetOldest uses sequence number of oldest known message as the current offset
	OffsetOldest = sarama.OffsetOldest
	// OffsetNewest option uses sequence number of newest message as the current offset
	OffsetNewest = sarama.OffsetNewest
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer interface {
	// Name returns the name of this consumer group.
	Name() string
	// Topics returns the names of the topics being consumed.
	Topics() ConsumerTopicList
	// Start starts the consumer
	Start() error
	// Stop stops the consumer
	Stop()
	// Closed returns a channel which will be closed after this consumer is completely shutdown
	Closed() <-chan struct{}
	// Messages return the message channel for this consumer
	Messages() <-chan Message
	// MergeDLQ consumes the offset ranges for the partitions from the DLQ topic for the specified ConsumerTopic
	// Topic should be the __dlq topic name.
	MergeDLQ(cluster, group, topic string, partition int32, offsetRange OffsetRange) error
}

Consumer is the interface for a kafka consumer

type ConsumerConfig

type ConsumerConfig struct {
	// GroupName identifies your consumer group. Unless your application creates
	// multiple consumer groups (in which case it's suggested to have application name as
	// prefix of the group name), this should match your application name.
	GroupName string

	// TopicList is a list of consumer topics
	TopicList ConsumerTopicList

	// OffsetConfig is the offset-handling policy for this consumer group.
	Offsets struct {
		// Initial specifies the fallback offset configuration on consumer start.
		// The consumer will use the offsets persisted from its last run unless \
		// the offsets are too old or too new.
		Initial struct {
			// Offset is the initial offset to use if there is no previous offset
			// committed. Use OffsetNewest for high watermark and OffsetOldest for
			// low watermark. Defaults to OffsetOldest.
			Offset int64
		}

		// Commits a policy for committing consumer offsets to Kafka.
		Commits struct {
			// Enabled if you want the library to commit offsets on your behalf.
			// Defaults to true.
			//
			// The retry and dlq topic commit will always be committed for you since those topics are abstracted away from you.
			Enabled bool
		}
	}

	// Concurrency determines the number of concurrent messages to process.
	// When using the handler based API, this corresponds to the number of concurrent go
	// routines handler functions the library will run. Default is 1.
	Concurrency int

	// TLSConfig is the configuration to use for secure connections if
	// enabled (not nil) (defaults to disabled, nil).
	TLSConfig *tls.Config

	// SASLConfig is the configuration to use for SASL based auth
	SASLConfig *SASLConfig
}

ConsumerConfig describes the config for a consumer group

func NewConsumerConfig

func NewConsumerConfig(groupName string, topicList ConsumerTopicList) *ConsumerConfig

NewConsumerConfig returns ConsumerConfig with sane defaults.

func (ConsumerConfig) MarshalLogObject

func (c ConsumerConfig) MarshalLogObject(e zapcore.ObjectEncoder) error

MarshalLogObject implements zapcore.ObjectMarshaler for structured logging.

type ConsumerTopic

type ConsumerTopic struct {
	Topic
	RetryQ     Topic
	DLQ        Topic
	MaxRetries int64 // MaxRetries = -1 for infinite retries.
}

ConsumerTopic contains information for a consumer topic.

func (ConsumerTopic) DLQEnabled

func (c ConsumerTopic) DLQEnabled() bool

DLQEnabled returns true if DLQ.Name and DLQ.Cluster are not empty.

func (ConsumerTopic) MarshalLogObject

func (c ConsumerTopic) MarshalLogObject(e zapcore.ObjectEncoder) error

MarshalLogObject implements zapcore.ObjectMarshaler for structured logging.

type ConsumerTopicList

type ConsumerTopicList []ConsumerTopic

ConsumerTopicList is a list of consumer topics

func (ConsumerTopicList) GetConsumerTopicByClusterTopic

func (c ConsumerTopicList) GetConsumerTopicByClusterTopic(clusterName, topicName string) (ConsumerTopic, error)

GetConsumerTopicByClusterTopic returns the ConsumerTopic for the cluster, topic pair.

func (ConsumerTopicList) MarshalLogArray

func (c ConsumerTopicList) MarshalLogArray(e zapcore.ArrayEncoder) error

MarshalLogArray implements zapcore.ArrayMarshaler for structured logging.

func (ConsumerTopicList) TopicNames

func (c ConsumerTopicList) TopicNames() []string

TopicNames returns the list of topics to consume as a string array.

type Message

type Message interface {
	zapcore.ObjectMarshaler

	// Key is a mutable reference to the message's key.
	Key() []byte
	// Value is a mutable reference to the message's value.
	Value() []byte
	// Topic is the topic from which the message was read.
	Topic() string
	// Partition is the ID of the partition from which the message was read.
	Partition() int32
	// Offset is the message's offset.
	Offset() int64
	// Timestamp returns the timestamp for this message
	Timestamp() time.Time
	// RetryCount is an incrementing integer denoting the number of times this message has been redelivered.
	// The first delivery of the message will be 0, incrementing on each subsequent redelivery.
	RetryCount() int64
	// Ack marks the message as successfully processed.
	Ack() error
	// Nack marks the message processing as failed and the message will be retried or sent to DLQ.
	Nack() error
	// NackToDLQ marks the message processing as failed and sends it immediately to DLQ.
	NackToDLQ() error
}

Message is the interface for a Kafka message

type NameResolver

type NameResolver interface {
	// ResolveCluster returns a list of IP addresses for the brokers
	ResolveIPForCluster(cluster string) ([]string, error)
	// ResolveClusterForTopic returns the logical cluster names corresponding to a topic name
	//
	// It is possible for a topic to exist on multiple clusters in order to
	// transparently handle topic migration between clusters.
	// TODO (gteo): Remove to simplify API because not needed anymore
	ResolveClusterForTopic(topic string) ([]string, error)
}

NameResolver is an interface that will be used by the consumer library to resolve (1) topic to cluster name and (2) cluster name to broker IP addresses. Implementations of KafkaNameResolver should be threadsafe.

func NewStaticNameResolver

func NewStaticNameResolver(
	topicsToCluster map[string][]string,
	clusterToBrokers map[string][]string,
) NameResolver

NewStaticNameResolver returns a instance of NameResolver that relies on a static map of topic to list of brokers and map of topics to cluster

type OffsetRange

type OffsetRange struct {
	// LowOffset is the low watermark for this offset range.
	// -1 indicates the value is not set.
	LowOffset int64
	// HighOffset is the high watermark for this offset range.
	// -1 indicates the value is not set.
	HighOffset int64
}

OffsetRange is a range of offsets

func NewOffsetRange

func NewOffsetRange(low int64, high ...int64) OffsetRange

NewOffsetRange returns a new OffsetRange with the LowOffset of the range as specified. First variadic argument is used to set the HighOffset and all other variadic arguments are ignored. If no variadic arguments are provided, HighOffset is set to -1 to indicate that it is not set.

func (OffsetRange) MarshalLogObject

func (o OffsetRange) MarshalLogObject(e zapcore.ObjectEncoder) error

MarshalLogObject implements zapcore.ObjectMarshaler for structured logging.

type SASLConfig added in v1.0.0

type SASLConfig struct {
	Enable    bool   `yaml:"enable"`
	User      string `yaml:"username"`
	Password  string `yaml:"password"`
	Mechanism string `yaml:"mechanism"`
}

SASLConfig for the config consumer

type Topic

type Topic struct {
	// Name for the topic
	Name string
	// Cluster is the logical name of the cluster to find this topic on.
	Cluster string
	// Delay is msg consumption delay applied on the topic.
	Delay time.Duration
}

Topic contains information for a topic. Our topics are uniquely defined by a Topic Name and Cluster pair.

func (Topic) HashKey

func (t Topic) HashKey() string

HashKey converts topic to a string for use as a map key

func (Topic) MarshalLogObject

func (t Topic) MarshalLogObject(e zapcore.ObjectEncoder) error

MarshalLogObject implements zapcore.ObjectMarshaler for structured logging.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL