consumer

package
v0.0.0-...-a4f9db0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 11, 2016 License: GPL-3.0 Imports: 6 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CloseKafkaConsumer

func CloseKafkaConsumer(consumer sarama.Consumer) error

Closes the Kafka Consumer.

func ConsumeFromPartition

func ConsumeFromPartition(consumer sarama.Consumer, topic string, partition int32, initialOffset int64) (sarama.PartitionConsumer, error)

Attaches an initialized Kafka Consumer with a Kafka topic, in conjunction with a given topic partition. The topic will be read from the point specified by the initial offset.

func CreateNewConsumer

func CreateNewConsumer(brokerlist []string) (sarama.Consumer, error)

Creates new Kafka generic consumer attached to a set of brokers.

func GetPartitions

func GetPartitions(c sarama.Consumer, topic string, partitions string) ([]int32, error)

Retrieves the list of partitions available for a given Kafka topic. - c sarama.Consumer: the initialized Kafka consumer - topic string: the kafka topic to use - paritions string : the comma separated list of partitions ids to use. 'all' to use all the available.

func ReceiveMessages

func ReceiveMessages(consumerInstance sarama.Consumer,
	topic string, initialOffset int64,
	partitionList []int32,
	messages chan *sarama.ConsumerMessage,
	wg sync.WaitGroup,
	closing chan struct{})

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL