kazoo-go: github.com/wvanbergen/kazoo-go Index | Files

package kazoo

import "github.com/wvanbergen/kazoo-go"

Index

Package Files

consumergroup.go kazoo.go topic_admin.go topic_metadata.go

Variables

var (
    ErrRunningInstances          = errors.New("Cannot deregister a consumergroup with running instances")
    ErrInstanceAlreadyRegistered = errors.New("Consumer instance already registered")
    ErrInstanceNotRegistered     = errors.New("Consumer instance not registered")
    ErrPartitionClaimedByOther   = errors.New("Cannot claim partition: it is already claimed by another instance")
    ErrPartitionNotClaimed       = errors.New("Cannot release partition: it is not claimed by this instance")
)
var (
    ErrTopicExists          = errors.New("Topic already exists")
    ErrTopicMarkedForDelete = errors.New("Topic is already marked for deletion")
    ErrDeletionTimedOut     = errors.New("Timed out while waiting for a topic to be deleted")
)
var (
    ErrInvalidPartitionCount    = errors.New("Number of partitions must be larger than 0")
    ErrInvalidReplicationFactor = errors.New("Replication factor must be between 1 and the number of brokers")
    ErrInvalidReplicaCount      = errors.New("All partitions must have the same number of replicas")
    ErrReplicaBrokerOverlap     = errors.New("All replicas for a partition must be on separate brokers")
    ErrInvalidBroker            = errors.New("Replica assigned to invalid broker")
    ErrMissingPartitionID       = errors.New("Partition ids must be sequential starting from 0")
    ErrDuplicatePartitionID     = errors.New("Each partition must have a unique ID")
)
var (
    FailedToClaimPartition = errors.New("Failed to claim partition for this consumer instance. Do you have a rogue consumer running?")
)

func BuildConnectionString Uses

func BuildConnectionString(nodes []string) string

BuildConnectionString builds a Zookeeper connection string for a list of nodes. Returns a string like "zk1:2181,zk2:2181,zk3:2181"

func BuildConnectionStringWithChroot Uses

func BuildConnectionStringWithChroot(nodes []string, chroot string) string

BuildConnectionStringWithChroot builds a Zookeeper connection string for a list of nodes and a chroot. The chroot should start with "/". Returns a string like "zk1:2181,zk2:2181,zk3:2181/chroot"

func ParseConnectionString Uses

func ParseConnectionString(zookeeper string) (nodes []string, chroot string)

ParseConnectionString parses a zookeeper connection string in the form of host1:2181,host2:2181/chroot and returns the list of servers, and the chroot.

type Config Uses

type Config struct {
    // The chroot the Kafka installation is registered under. Defaults to "".
    Chroot string

    // The amount of time the Zookeeper client can be disconnected from the Zookeeper cluster
    // before the cluster will get rid of watches and ephemeral nodes. Defaults to 1 second.
    Timeout time.Duration

    // Logger
    Logger zk.Logger
}

Config holds the configuration values used when creating a Kazoo instance.

func NewConfig Uses

func NewConfig() *Config

NewConfig instantiates a new Config struct with sane defaults.

type Consumergroup Uses

type Consumergroup struct {
    Name string
    // contains filtered or unexported fields
}

Consumergroup represents a high-level consumer that is registered in Zookeeper.

func (*Consumergroup) CommitOffset Uses

func (cg *Consumergroup) CommitOffset(topic string, partition int32, offset int64) error

CommitOffset commits an offset to a group/topic/partition

func (*Consumergroup) Create Uses

func (cg *Consumergroup) Create() error

Create registers the consumergroup in zookeeper

func (*Consumergroup) Delete Uses

func (cg *Consumergroup) Delete() error

Delete removes the consumergroup from zookeeper

func (*Consumergroup) Exists Uses

func (cg *Consumergroup) Exists() (bool, error)

Exists checks whether the consumergroup has been registered in Zookeeper

func (*Consumergroup) FetchAllOffsets Uses

func (cg *Consumergroup) FetchAllOffsets() (map[string]map[int32]int64, error)

FetchAllOffsets retrieves all the committed offsets for a group

func (*Consumergroup) FetchOffset Uses

func (cg *Consumergroup) FetchOffset(topic string, partition int32) (int64, error)

FetchOffset retrieves the committed offset for a group/topic/partition

func (*Consumergroup) Instance Uses

func (cg *Consumergroup) Instance(id string) *ConsumergroupInstance

Instance instantiates a new ConsumergroupInstance inside this consumer group, using an existing ID.

func (*Consumergroup) Instances Uses

func (cg *Consumergroup) Instances() (ConsumergroupInstanceList, error)

Instances returns a list of all running instances inside this consumergroup.

func (*Consumergroup) NewInstance Uses

func (cg *Consumergroup) NewInstance() *ConsumergroupInstance

NewInstance instantiates a new ConsumergroupInstance inside this consumer group, using a newly generated ID.

func (*Consumergroup) PartitionOwner Uses

func (cg *Consumergroup) PartitionOwner(topic string, partition int32) (*ConsumergroupInstance, error)

PartitionOwner returns the ConsumergroupInstance that has claimed the given partition. This can be nil if nobody has claimed it yet.

func (*Consumergroup) ResetOffsets Uses

func (cg *Consumergroup) ResetOffsets() error

func (*Consumergroup) Topics Uses

func (cg *Consumergroup) Topics() (TopicList, error)

Topics retrieves the list of topics the consumergroup has claimed ownership of at some point.

func (*Consumergroup) WatchInstances Uses

func (cg *Consumergroup) WatchInstances() (ConsumergroupInstanceList, <-chan zk.Event, error)

WatchInstances returns a ConsumergroupInstanceList, and a channel that will be closed as soon as the instance list changes.

func (*Consumergroup) WatchPartitionOwner Uses

func (cg *Consumergroup) WatchPartitionOwner(topic string, partition int32) (*ConsumergroupInstance, <-chan zk.Event, error)

WatchPartitionOwner retrieves what instance is currently owning the partition, and sets a Zookeeper watch to be notified of changes. If the partition currently does not have an owner, the function returns nil for every return value. In this case it should be safe to claim the partition for an instance.

type ConsumergroupInstance Uses

type ConsumergroupInstance struct {
    ID string
    // contains filtered or unexported fields
}

ConsumergroupInstance represents an instance of a Consumergroup.

func (*ConsumergroupInstance) ClaimPartition Uses

func (cgi *ConsumergroupInstance) ClaimPartition(topic string, partition int32) error

ClaimPartition claims a topic/partition ownership for a consumer ID within a group. If the partition is already claimed by another running instance, it will return ErrPartitionClaimedByOther.

func (*ConsumergroupInstance) Deregister Uses

func (cgi *ConsumergroupInstance) Deregister() error

Deregister removes the registration of the instance from zookeeper.

func (*ConsumergroupInstance) Register Uses

func (cgi *ConsumergroupInstance) Register(topics []string) error

Register registers the consumergroup instance in Zookeeper.

func (*ConsumergroupInstance) RegisterWithSubscription Uses

func (cgi *ConsumergroupInstance) RegisterWithSubscription(subscriptionJSON []byte) error

RegisterWithSubscription registers the consumer instance in Zookeeper, with its subscription.

func (*ConsumergroupInstance) Registered Uses

func (cgi *ConsumergroupInstance) Registered() (bool, error)

Registered checks whether the consumergroup instance is registered in Zookeeper.

func (*ConsumergroupInstance) Registration Uses

func (cgi *ConsumergroupInstance) Registration() (*Registration, error)

Registration returns the current registration of the consumer group instance.

func (*ConsumergroupInstance) ReleasePartition Uses

func (cgi *ConsumergroupInstance) ReleasePartition(topic string, partition int32) error

ReleasePartition releases a claim to a partition.

func (*ConsumergroupInstance) UpdateRegistration Uses

func (cgi *ConsumergroupInstance) UpdateRegistration(topics []string) error

UpdateRegistration updates a consumer group member registration. If the consumer group member has not been registered yet, then an error is returned.

func (*ConsumergroupInstance) WatchRegistration Uses

func (cgi *ConsumergroupInstance) WatchRegistration() (*Registration, <-chan zk.Event, error)

WatchRegistration returns the current registration of the consumer group instance, and a channel that will be closed as soon as the registration changes.

type ConsumergroupInstanceList Uses

type ConsumergroupInstanceList []*ConsumergroupInstance

ConsumergroupInstanceList implements the sortable interface on top of a consumer instance list

func (ConsumergroupInstanceList) Find Uses

func (cgil ConsumergroupInstanceList) Find(id string) *ConsumergroupInstance

Find returns the consumergroup instance with the given ID if it exists in the list. Otherwise it will return `nil`.

func (ConsumergroupInstanceList) Len Uses

func (cgil ConsumergroupInstanceList) Len() int

func (ConsumergroupInstanceList) Less Uses

func (cgil ConsumergroupInstanceList) Less(i, j int) bool

func (ConsumergroupInstanceList) Swap Uses

func (cgil ConsumergroupInstanceList) Swap(i, j int)

type ConsumergroupList Uses

type ConsumergroupList []*Consumergroup

ConsumergroupList implements the sortable interface on top of a consumer group list

func (ConsumergroupList) Find Uses

func (cgl ConsumergroupList) Find(name string) *Consumergroup

Find returns the consumergroup with the given name if it exists in the list. Otherwise it will return `nil`.

func (ConsumergroupList) Len Uses

func (cgl ConsumergroupList) Len() int

func (ConsumergroupList) Less Uses

func (cgl ConsumergroupList) Less(i, j int) bool

func (ConsumergroupList) Swap Uses

func (cgl ConsumergroupList) Swap(i, j int)

type Kazoo Uses

type Kazoo struct {
    // contains filtered or unexported fields
}

Kazoo interacts with the Kafka metadata in Zookeeper

func NewKazoo Uses

func NewKazoo(servers []string, conf *Config) (*Kazoo, error)

NewKazoo creates a new connection instance

func NewKazooFromConnectionString Uses

func NewKazooFromConnectionString(connectionString string, conf *Config) (*Kazoo, error)

NewKazooFromConnectionString creates a new connection instance based on a zookeeper connection string that can include a chroot.

func (*Kazoo) BrokerList Uses

func (kz *Kazoo) BrokerList() ([]string, error)

BrokerList returns a slice of broker addresses that can be used to connect to the Kafka cluster, e.g. using `sarama.NewAsyncProducer()`.

func (*Kazoo) Brokers Uses

func (kz *Kazoo) Brokers() (map[int32]string, error)

Brokers returns a map of all the brokers that are part of the Kafka cluster that is registered in Zookeeper.

func (*Kazoo) Close Uses

func (kz *Kazoo) Close() error

Close closes the connection with the Zookeeper cluster

func (*Kazoo) Consumergroup Uses

func (kz *Kazoo) Consumergroup(name string) *Consumergroup

Consumergroup instantiates a new consumergroup.

func (*Kazoo) Consumergroups Uses

func (kz *Kazoo) Consumergroups() (ConsumergroupList, error)

Consumergroups returns all the registered consumergroups

func (*Kazoo) Controller Uses

func (kz *Kazoo) Controller() (int32, error)

Controller returns what broker is currently acting as controller of the Kafka cluster

func (*Kazoo) CreateTopic Uses

func (kz *Kazoo) CreateTopic(name string, partitionCount int, replicationFactor int, topicConfig map[string]string) error

CreateTopic creates a new kafka topic with the specified parameters and properties

func (*Kazoo) DeleteTopic Uses

func (kz *Kazoo) DeleteTopic(name string) error

DeleteTopic marks a kafka topic for deletion. Deleting a topic is asynchronous and DeleteTopic will return before Kafka actually does the deletion.

func (*Kazoo) DeleteTopicSync Uses

func (kz *Kazoo) DeleteTopicSync(name string, timeout time.Duration) error

DeleteTopicSync marks a kafka topic for deletion and waits until it is deleted before returning.

func (*Kazoo) Topic Uses

func (kz *Kazoo) Topic(topic string) *Topic

Topic returns a Topic instance for a given topic name

func (*Kazoo) Topics Uses

func (kz *Kazoo) Topics() (TopicList, error)

Topics returns a list of all registered Kafka topics.

func (*Kazoo) WatchTopics Uses

func (kz *Kazoo) WatchTopics() (TopicList, <-chan zk.Event, error)

WatchTopics returns a list of all registered Kafka topics, and watches that list for changes.

type Partition Uses

type Partition struct {
    ID       int32
    Replicas []int32
    // contains filtered or unexported fields
}

Partition interacts with Kafka's partition metadata in Zookeeper.

func (*Partition) ISR Uses

func (p *Partition) ISR() ([]int32, error)

ISR returns the broker IDs of the current in-sync replica set for the partition

func (*Partition) Key Uses

func (p *Partition) Key() string

Key returns a unique identifier for the partition, using the form "topic/partition".

func (*Partition) Leader Uses

func (p *Partition) Leader() (int32, error)

Leader returns the broker ID of the broker that is currently the leader for the partition.

func (*Partition) PreferredReplica Uses

func (p *Partition) PreferredReplica() int32

PreferredReplica returns the preferred replica for this partition.

func (*Partition) Topic Uses

func (p *Partition) Topic() *Topic

Topic returns the Topic of this partition.

func (*Partition) UnderReplicated Uses

func (p *Partition) UnderReplicated() (bool, error)

func (*Partition) UsesPreferredReplica Uses

func (p *Partition) UsesPreferredReplica() (bool, error)

type PartitionList Uses

type PartitionList []*Partition

PartitionList is a type that implements the sortable interface for a list of Partition instances

func (PartitionList) Len Uses

func (pl PartitionList) Len() int

func (PartitionList) Less Uses

func (pl PartitionList) Less(i, j int) bool

func (PartitionList) Swap Uses

func (pl PartitionList) Swap(i, j int)

type RegPattern Uses

type RegPattern string
const (
    RegPatternStatic    RegPattern = "static"
    RegPatternWhiteList RegPattern = "white_list"
    RegPatternBlackList RegPattern = "black_list"
)

type RegVersion Uses

type RegVersion int
const (
    RegDefaultVersion RegVersion = 1
)

type Registration Uses

type Registration struct {
    Pattern      RegPattern     `json:"pattern"`
    Subscription map[string]int `json:"subscription"`
    Timestamp    int64          `json:"timestamp"`
    Version      RegVersion     `json:"version"`
}

type Topic Uses

type Topic struct {
    Name string
    // contains filtered or unexported fields
}

Topic interacts with Kafka's topic metadata in Zookeeper.

func (*Topic) Config Uses

func (t *Topic) Config() (map[string]string, error)

Config returns topic-level configuration settings as a map.

func (*Topic) Exists Uses

func (t *Topic) Exists() (bool, error)

Exists returns true if the topic exists on the Kafka cluster.

func (*Topic) Partition Uses

func (t *Topic) Partition(id int32, replicas []int32) *Partition

Partition returns a Partition instance for the topic.

func (*Topic) Partitions Uses

func (t *Topic) Partitions() (PartitionList, error)

Partitions returns a list of all partitions for the topic.

func (*Topic) Watch Uses

func (t *Topic) Watch() (<-chan zk.Event, error)

Watch watches the topic for changes.

func (*Topic) WatchPartitions Uses

func (t *Topic) WatchPartitions() (PartitionList, <-chan zk.Event, error)

WatchPartitions returns a list of all partitions for the topic, and watches the topic for changes.

type TopicList Uses

type TopicList []*Topic

TopicList is a type that implements the sortable interface for a list of Topic instances.

func (TopicList) Find Uses

func (tl TopicList) Find(name string) *Topic

Find returns the topic with the given name if it exists in the topic list, and will return `nil` otherwise.

func (TopicList) Len Uses

func (tl TopicList) Len() int

func (TopicList) Less Uses

func (tl TopicList) Less(i, j int) bool

func (TopicList) Swap Uses

func (tl TopicList) Swap(i, j int)

Package kazoo imports 13 packages (graph) and is imported by 54 packages. Updated 2018-02-02. Refresh now. Tools for package owners.