consumer

package
v0.1.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 20, 2018 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	Client          sarama.Client
	Topic           string
	Partitions      map[int32]bool  // which partitions to consume
	TMax            *time.Time      // abort consumption if a message later than this is received
	BeyondHighWater bool            // if set, wait for new messages. otherwise, consumption stops when last message (acc to high wtr) is received.
	StartOffset     map[int32]int64 // start offset per partition
}

func New

func New(client sarama.Client, topic string, opts ...ConsumerOption) (*Consumer, error)

func (*Consumer) Messages

func (c *Consumer) Messages(ctx context.Context) (chan *sarama.ConsumerMessage, chan error, error)

type ConsumerOption

type ConsumerOption func(*Consumer) error

func AllPartitions

func AllPartitions() ConsumerOption

func BeyondHighWaterMark

func BeyondHighWaterMark(b bool) ConsumerOption

func FromCurrentOffset

func FromCurrentOffset() ConsumerOption

func FromOldestOffset

func FromOldestOffset() ConsumerOption

func FromTime

func FromTime(t time.Time) ConsumerOption

func Nop

func Nop() ConsumerOption

func SpecificPartitions

func SpecificPartitions(ps map[int32]bool) ConsumerOption

func UntilTime

func UntilTime(t time.Time) ConsumerOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL