consumer

package
v0.0.0-...-158209a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 15, 2016 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Offset

type Offset struct {
	// contains filtered or unexported fields
}

Offset consumes given topic-partition starting at an offset provided.

func NewOffset

func NewOffset(cfg *OffsetConfig) (*Offset, error)

NewOffset creates a new Offset that immediately starts consuming and whose messages are available on the Messages() channel.

func (*Offset) CommitOffset

func (oc *Offset) CommitOffset(offset int64) error

CommitOffset writes the provided offset to kafka.

func (*Offset) Consume

func (oc *Offset) Consume() <-chan *sarama.ConsumerMessage

Consume returns a channel of Kafka messages on this topic-partition starting at the provided offset. This channel will close when there is a non-recoverable error, or the context provided at creation time closes.

func (*Offset) Err

func (oc *Offset) Err() error

Err should be called after the Messages() channel closes to determine if there was an error during processing.

func (*Offset) HighWaterMarkOffset

func (oc *Offset) HighWaterMarkOffset() int64

HighWaterMarkOffset returns the last reported highwatermark offset for the partition this consumer is reading.

type OffsetConfig

type OffsetConfig struct {
	CacheDuration time.Duration
	Client        sarama.Client
	Context       context.Context
	Coordinator   *cg.Coordinator
	Offset        int64
	Partition     int32
	Topic         string
}

OffsetConfig is all the instantiated dependencies needed to run an Offset.

type Seek

type Seek struct {
	// contains filtered or unexported fields
}

Seek consumes given topic-partition starting at an offset determined by the provided Seek function.

func NewSeek

func NewSeek(cfg *SeekConfig) (*Seek, error)

NewSeek creates a new Seek that immediately starts consuming and whose messages are available on the Messages() channel.

func (*Seek) CommitOffset

func (sk *Seek) CommitOffset(offset int64) error

CommitOffset writes the provided offset to kafka.

func (*Seek) Consume

func (sk *Seek) Consume() <-chan *sarama.ConsumerMessage

Consume returns a channel of Kafka messages on this topic-partition starting at the provided offset. This channel will close when there is a non-recoverable error, or the context provided at creation time closes.

func (*Seek) Err

func (sk *Seek) Err() error

Err should be called after the Messages() channel closes to determine if there was an error during processing.

func (*Seek) HighWaterMarkOffset

func (sk *Seek) HighWaterMarkOffset() int64

HighWaterMarkOffset returns the last reported highwatermark offset for the partition this consumer is reading.

type SeekConfig

type SeekConfig struct {
	CacheDuration time.Duration
	Client        sarama.Client
	Context       context.Context
	Coordinator   *cg.Coordinator
	Partition     int32
	SeekFn        SeekFn
	Topic         string
}

SeekConfig is needed to create a new Seek.

type SeekFn

type SeekFn func(topic string, partition int32) (int64, error)

SeekFn returns an offset for the Seek to begin reading from. Seek is provided a topic, partition. This can be used for an application that needs a history of messages for context before the application can begin reading at the last committed offset.

type StartPosition

type StartPosition int

StartPosition is where the TimeWindow should start at to seek back the Window duration.

const (
	// OffsetGroup starts at the committed offset, if no offset is commited or the offset is out of range
	// of what is readable, the consumer will error upon reading.
	OffsetGroup StartPosition = iota
	// OffsetNewest starts at the newest message in Kafka and then seeks back Window amount.
	OffsetNewest
)

type TimeWindow

type TimeWindow struct {
	// contains filtered or unexported fields
}

TimeWindow is a consumer that finds the current offset in the group for the given partition-topic, discovers what time that message happened, and then rewinds to past offsets until the provided Window of time is acheived.

func NewTimeWindow

func NewTimeWindow(cfg *TimeWindowConfig) (*TimeWindow, error)

NewTimeWindow creates a new consumer that is ready to begin reading.

func (*TimeWindow) CommitOffset

func (twc *TimeWindow) CommitOffset(offset int64) error

CommitOffset writes the provided offset to kafka.

func (*TimeWindow) Consume

func (twc *TimeWindow) Consume() <-chan *sarama.ConsumerMessage

Consume returns a channel of Kafka messages on this topic-partition starting at the provided offset. This channel will close when there is a non-recoverable error, or the context provided at creation time closes.

func (*TimeWindow) Err

func (twc *TimeWindow) Err() error

Err should be called after the Messages() channel closes to determine if there was an error during processing.

func (*TimeWindow) HighWaterMarkOffset

func (twc *TimeWindow) HighWaterMarkOffset() int64

HighWaterMarkOffset returns the last reported highwatermark offset for the partition this consumer is reading.

type TimeWindowConfig

type TimeWindowConfig struct {
	CacheDuration time.Duration
	Client        sarama.Client
	Context       context.Context
	Coordinator   *cg.Coordinator
	Start         StartPosition
	Partition     int32
	Topic         string
	Window        time.Duration
}

TimeWindowConfig is used to create a new TimeWindow.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL