zoo

package
v0.0.0-...-0e65dd1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 17, 2015 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func TestConnect

func TestConnect(hosts []string) error

Types

type ByPartition

type ByPartition struct{ Offsets }

func (ByPartition) Less

func (p ByPartition) Less(i, j int) bool

type KafkaKeeper

type KafkaKeeper struct {
	Consumer *TopicConsumer
	State    *KafkaState
	// contains filtered or unexported fields
}

KafkaKeeper - Used to manage consumer offsets per topic, per consuming application, stored under the path:

/{TopicConsumer.Root}/{topic}/consumers/{consumer-app}/partitions/{partition}/consumed/{offset}

func NewKafkaKeeper

func NewKafkaKeeper(hosts []string, c *TopicConsumer, s *KafkaState) *KafkaKeeper

func (*KafkaKeeper) GetOffsets

func (z *KafkaKeeper) GetOffsets() ([]*PartitionOffset, error)

GetOffsets - returns an array sorted by Partition ascending

func (*KafkaKeeper) SetOffset

func (z *KafkaKeeper) SetOffset(partition int32, offset int64) error

type KafkaOffsets

type KafkaOffsets struct {
	ZookeeperHosts []string
	// ConsumerApp - each consumer application gets its own distinct offsets on the topic
	ConsumerApp string
}

func (*KafkaOffsets) Offsets

func (ko *KafkaOffsets) Offsets(topic string) ([]*PartitionOffset, error)

func (*KafkaOffsets) SetOffset

func (ko *KafkaOffsets) SetOffset(topic string, partition int32, offset int64) error

type KafkaState

type KafkaState struct {
	Topic string
	// Root - typically blank or "/kafka"
	Root string
}

func (*KafkaState) PartitionsPath

func (ks *KafkaState) PartitionsPath() string

type Offsets

type Offsets []*PartitionOffset

func (Offsets) Len

func (o Offsets) Len() int

func (Offsets) Swap

func (o Offsets) Swap(i, j int)

type PartitionOffset

type PartitionOffset struct {
	Partition int32
	Offset    int64
}

type TopicConsumer

type TopicConsumer struct {
	Topic string
	// ConsumerApp - typically the name of the app or function, for its own dedicated set of offsets
	ConsumerApp string
	// Root - suggested name /kafka-topics
	Root string
}

func (*TopicConsumer) PartitionsPath

func (tc *TopicConsumer) PartitionsPath() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL