input

package
v0.0.0-...-d8c2abc Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 3, 2018 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConfigKafka

type ConfigKafka struct {
	Zookeepers        string        `required:"true" desc:"Zookeeper nodes for offset storage"`                                                              // CFMR_KAFKA_ZOOKEEPERS
	Topics            []string      `required:"true" desc:"Topics to read events from"`                                                                      // CFMR_KAFKA_TOPICS
	ConsumerGroup     string        `required:"true" desc:"Name of the Kafka consumer group"`                                                                // CFMR_KAFKA_CONSUMERGROUP
	ProcessingTimeout time.Duration `default:"1m" desc:"Time to wait for all the offsets for a partition to be processed after stopping to consume from it"` // CFMR_KAFKA_PROCESSINGTIMEOUT
	OffsetNewest      bool          `default:"false" desc:"If true start from the newest message in Kafka in case the offset in zookeeper does not exist"`   // CFMR_KAFKA_OFFSETNEWEST
}

type KafkaConsumer

type KafkaConsumer struct {
	// FIXME: these should be private
	CG      *consumergroup.ConsumerGroup
	Offsets map[string]map[int32]int64
}

func NewKafkaConsumer

func NewKafkaConsumer(i *ConfigKafka) (*KafkaConsumer, error)

Create Kafka consumer group

func (*KafkaConsumer) Close

func (c *KafkaConsumer) Close() error

Close Kafka consumer group

func (*KafkaConsumer) Process

func (c *KafkaConsumer) Process(message *sarama.ConsumerMessage) (*transformer.Envelope, error)

func (*KafkaConsumer) Read

func (c *KafkaConsumer) Read() (*transformer.Envelope, error)

Read message from Kafka

type Reader

type Reader interface {
	Read() (*transformer.Envelope, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL