kafkazk

package
v4.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 9, 2023 License: Apache-2.0 Imports: 14 Imported by: 0

README

GoDoc

THIS PACKAGE IS DEPRECATED

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrInvalidKafkaConfigType error.
	ErrInvalidKafkaConfigType = errors.New("Invalid Kafka config type")
)

Functions

func LoadMetrics added in v4.2.0

func LoadMetrics(zk Handler, bm mapper.BrokerMetaMap) []error

LoadMetrics takes a Handler and fetches stored broker metrics, populating the BrokerMetaMap.

func PartitionMapFromZK

func PartitionMapFromZK(t []*regexp.Regexp, zk Handler) (*mapper.PartitionMap, error)

PartitionMapFromZK takes a slice of regexp and finds all matching topics for each. A merged *PartitionMap of all matching topic maps is returned.

Types

type Config

type Config struct {
	Connect       string
	Prefix        string
	MetricsPrefix string
}

Config holds initialization paramaters for a Handler. Connect is a ZooKeeper connect string. Prefix should reflect any prefix used for Kafka on the reference ZooKeeper cluster (excluding slashes). MetricsPrefix is the prefix used for broker metrics metadata persisted in ZooKeeper.

type ErrNoNode

type ErrNoNode struct {
	// contains filtered or unexported fields
}

ErrNoNode error type is specifically for Get method calls where the underlying error type is a zkclient.ErrNoNode.

func (ErrNoNode) Error

func (e ErrNoNode) Error() string

type Handler

type Handler interface {
	SimpleZooKeeperClient
	GetBrokerMetrics() (mapper.BrokerMetricsMap, error)
	GetTopicState(string) (*mapper.TopicState, error)
	GetTopicStateISR(string) (TopicStateISR, error)
	UpdateKafkaConfig(KafkaConfig) ([]bool, error)
	GetReassignments() Reassignments
	ListReassignments() (Reassignments, error)
	GetUnderReplicated() ([]string, error)
	GetPendingDeletion() ([]string, error)
	GetTopics([]*regexp.Regexp) ([]string, error)
	GetTopicConfig(string) (*TopicConfig, error)
	GetTopicMetadata(string) (TopicMetadata, error)
	GetAllBrokerMeta(bool) (mapper.BrokerMetaMap, []error)
	GetAllPartitionMeta() (mapper.PartitionMetaMap, error)
	MaxMetaAge() (time.Duration, error)
	GetPartitionMap(string) (*mapper.PartitionMap, error)
}

Handler specifies an interface for common Kafka metadata retrieval and configuration methods.

func NewHandler

func NewHandler(c *Config) (Handler, error)

NewHandler takes a *Config, performs any initialization and returns a Handler.

type KafkaConfig

type KafkaConfig struct {
	Type    string          // Topic or broker.
	Name    string          // Entity name.
	Configs []KafkaConfigKV // Config KVs.
}

KafkaConfig is used to issue configuration updates to either topics or brokers in ZooKeeper.

type KafkaConfigData

type KafkaConfigData struct {
	Version int               `json:"version"`
	Config  map[string]string `json:"config"`
}

KafkaConfigData is used for unmarshalling /config/<type>/<name> data from ZooKeeper.

func NewKafkaConfigData

func NewKafkaConfigData() KafkaConfigData

NewKafkaConfigData creates a KafkaConfigData.

type KafkaConfigKV

type KafkaConfigKV [2]string

KafkaConfigKV is a [2]string{key, value} representing a Kafka configuration.

type PartitionState

type PartitionState struct {
	Version         int   `json:"version"`
	ControllerEpoch int   `json:"controller_epoch"`
	Leader          int   `json:"leader"`
	LeaderEpoch     int   `json:"leader_epoch"`
	ISR             []int `json:"isr"`
}

PartitionState is used for unmarshalling json data from a partition state: e.g. /brokers/topics/some-topic/partitions/0/state

type Reassignments

type Reassignments map[string]map[int][]int

Reassignments is a map of topic:partition:brokers.

func (Reassignments) List added in v4.1.0

func (r Reassignments) List() []string

List returns a []string of topic names held in the Reassignments.

type SimpleZooKeeperClient

type SimpleZooKeeperClient interface {
	Exists(string) (bool, error)
	Create(string, string) error
	CreateSequential(string, string) error
	Set(string, string) error
	Get(string) ([]byte, error)
	Delete(string) error
	Children(string) ([]string, error)
	NextInt(string) (int32, error)
	Close()
	Ready() bool
}

SimpleZooKeeperClient is an interface that wraps a real ZooKeeper client, obscuring much of the API semantics that are unneeded for a ZooKeeper based Handler implementation.

type Stub

type Stub struct {
	// contains filtered or unexported fields
}

Stub stubs the Handler interface.

func NewZooKeeperStub

func NewZooKeeperStub() *Stub

NewZooKeeperStub returns a stub ZooKeeper.

func (*Stub) AddBrokers

func (zk *Stub) AddBrokers(b map[int]mapper.BrokerMeta)

AddBrokers takes a map of broker ID to BrokerMeta and adds it to the Stub mapper.BrokerMetaMap.

func (*Stub) Children

func (zk *Stub) Children(p string) ([]string, error)

Children stubs children.

func (*Stub) Close

func (zk *Stub) Close()

Close stubs Close.

func (*Stub) Create

func (zk *Stub) Create(p, d string) error

Create stubs Create.

func (*Stub) CreateSequential

func (zk *Stub) CreateSequential(a, b string) error

CreateSequential stubs CreateSequential.

func (*Stub) Delete

func (zk *Stub) Delete(p string) error

Delete stubs Delete.

func (*Stub) Exists

func (zk *Stub) Exists(p string) (bool, error)

Exists stubs Exists.

func (*Stub) Get

func (zk *Stub) Get(p string) ([]byte, error)

Get stubs Get.

func (*Stub) GetAllBrokerMeta

func (zk *Stub) GetAllBrokerMeta(withMetrics bool) (mapper.BrokerMetaMap, []error)

GetAllBrokerMeta stubs GetAllBrokerMeta.

func (*Stub) GetAllPartitionMeta

func (zk *Stub) GetAllPartitionMeta() (mapper.PartitionMetaMap, error)

GetAllPartitionMeta stubs GetAllPartitionMeta.

func (*Stub) GetBrokerMetrics

func (zk *Stub) GetBrokerMetrics() (mapper.BrokerMetricsMap, error)

GetBrokerMetrics stubs GetBrokerMetrics.

func (*Stub) GetPartitionMap

func (zk *Stub) GetPartitionMap(t string) (*mapper.PartitionMap, error)

GetPartitionMap stubs Getmapper.PartitionMap.

func (*Stub) GetPendingDeletion

func (zk *Stub) GetPendingDeletion() ([]string, error)

func (*Stub) GetReassignments

func (zk *Stub) GetReassignments() Reassignments

GetReassignments stubs GetReassignments.

func (*Stub) GetTopicConfig

func (zk *Stub) GetTopicConfig(t string) (*TopicConfig, error)

GetTopicConfig stubs GetTopicConfig.

func (*Stub) GetTopicMetadata

func (zk *Stub) GetTopicMetadata(t string) (TopicMetadata, error)

GetTopicMetadata stubs GetTopicMetadata.

func (*Stub) GetTopicState

func (zk *Stub) GetTopicState(t string) (*mapper.TopicState, error)

GetTopicState stubs GetTopicState.

func (*Stub) GetTopicStateISR

func (zk *Stub) GetTopicStateISR(t string) (TopicStateISR, error)

GetTopicStateISR stubs GetTopicStateISR.

func (*Stub) GetTopics

func (zk *Stub) GetTopics(ts []*regexp.Regexp) ([]string, error)

GetTopics stubs GetTopics.

func (*Stub) GetUnderReplicated

func (zk *Stub) GetUnderReplicated() ([]string, error)

func (*Stub) InitRawClient

func (zk *Stub) InitRawClient() error

InitRawClient stubs InitRawClient.

func (*Stub) ListReassignments

func (zk *Stub) ListReassignments() (Reassignments, error)

ListReassignments stubs ListReassignments.

func (*Stub) MaxMetaAge

func (zk *Stub) MaxMetaAge() (time.Duration, error)

MaxMetaAge stubs MaxMetaAge.

func (*Stub) NextInt

func (zk *Stub) NextInt(p string) (int32, error)

func (*Stub) Ready

func (zk *Stub) Ready() bool

Ready stubs Ready.

func (*Stub) RemoveBrokers

func (zk *Stub) RemoveBrokers(ids []int)

RemoveBrokers removes the specified IDs from the mapper.BrokerMetaMap. This can be used in testing to simulate brokers leaving the cluster.

func (*Stub) Set

func (zk *Stub) Set(p, d string) error

Set stubs Set.

func (*Stub) UpdateKafkaConfig

func (zk *Stub) UpdateKafkaConfig(c KafkaConfig) ([]bool, error)

UpdateKafkaConfig stubs UpdateKafkaConfig.

type StubZnode

type StubZnode struct {
	// contains filtered or unexported fields
}

StubZnode stubs a ZooKeeper znode.

type TopicConfig

type TopicConfig struct {
	Version int               `json:"version"`
	Config  map[string]string `json:"config"`
}

TopicConfig is used for unmarshalling /config/topics/<topic> from ZooKeeper.

type TopicMetadata

type TopicMetadata struct {
	Version          int
	Name             string
	TopicID          string `json:"topic_id"`
	Partitions       map[int][]int
	AddingReplicas   map[int][]int `json:"adding_replicas"`
	RemovingReplicas map[int][]int `json:"removing_replicas"`
}

TopicMetadata holds the topic data found in the /brokers/topics/<topic> znode. This is designed for the version 3 fields present in Kafka version ~2.4+.

func (TopicMetadata) Reassignments

func (tm TopicMetadata) Reassignments() Reassignments

Reassignments returns a Reassignments from a given topics TopicMetadata.

type TopicStateISR

type TopicStateISR map[string]PartitionState

TopicStateISR is a map of partition numbers to PartitionState.

type ZKHandler

type ZKHandler struct {
	Connect       string
	Prefix        string
	MetricsPrefix string
	// contains filtered or unexported fields
}

ZKHandler implements the Handler interface for real ZooKeeper clusters.

func (*ZKHandler) Children

func (z *ZKHandler) Children(p string) ([]string, error)

Children takes a path p and returns a list of child znodes and an error if encountered.

func (*ZKHandler) Close

func (z *ZKHandler) Close()

Close calls close on the *ZKHandler. Any additional shutdown cleanup or other tasks should be performed here.

func (*ZKHandler) Create

func (z *ZKHandler) Create(p string, d string) error

Create creates the provided path p with the data from the provided string d and returns an error if encountered.

func (*ZKHandler) CreateSequential

func (z *ZKHandler) CreateSequential(p string, d string) error

CreateSequential takes a path p and data d and creates a sequential znode at p with data d. An error is returned if encountered.

func (*ZKHandler) Delete

func (z *ZKHandler) Delete(p string) error

Delete deletes the znode at path p.

func (*ZKHandler) Exists

func (z *ZKHandler) Exists(p string) (bool, error)

Exists takes a path p and returns a bool as to whether the path exists and an error if encountered.

func (*ZKHandler) Get

func (z *ZKHandler) Get(p string) ([]byte, error)

Get returns the data from path p.

func (*ZKHandler) GetAllBrokerMeta

func (z *ZKHandler) GetAllBrokerMeta(withMetrics bool) (mapper.BrokerMetaMap, []error)

GetAllBrokerMeta looks up all registered Kafka brokers and returns their metadata as a mapper.BrokerMetaMap. A withMetrics bool param determines whether we additionally want to fetch stored broker metrics.

func (*ZKHandler) GetAllPartitionMeta

func (z *ZKHandler) GetAllPartitionMeta() (mapper.PartitionMetaMap, error)

GetAllPartitionMeta fetches partition metadata stored in Zookeeper.

func (*ZKHandler) GetBrokerMetrics added in v4.2.0

func (z *ZKHandler) GetBrokerMetrics() (mapper.BrokerMetricsMap, error)

GetBrokerMetrics fetches broker metrics stored in ZooKeeper and returns a BrokerMetricsMap and an error if encountered.

func (*ZKHandler) GetPartitionMap

func (z *ZKHandler) GetPartitionMap(t string) (*mapper.PartitionMap, error)

GetPartitionMap takes a topic name. If the topic exists, the state of the topic is fetched and returned as a *PartitionMap.

func (*ZKHandler) GetPendingDeletion

func (z *ZKHandler) GetPendingDeletion() ([]string, error)

GetPendingDeletion returns any topics pending deletion.

func (*ZKHandler) GetReassignments

func (z *ZKHandler) GetReassignments() Reassignments

GetReassignments looks up any ongoing topic reassignments and returns the data as a Reassignments.

func (*ZKHandler) GetTopicConfig

func (z *ZKHandler) GetTopicConfig(t string) (*TopicConfig, error)

GetTopicConfig takes a topic name. If the topic exists, the topic config is returned as a *TopicConfig.

func (*ZKHandler) GetTopicMetadata

func (z *ZKHandler) GetTopicMetadata(t string) (TopicMetadata, error)

GetTopicMetadata takes a topic name. If the topic exists, the topic metadata is returned as a TopicMetadata.

func (*ZKHandler) GetTopicState

func (z *ZKHandler) GetTopicState(t string) (*mapper.TopicState, error)

GetTopicState takes a topic name. If the topic exists, the topic state is returned as a *mapper.TopicState.

func (*ZKHandler) GetTopicStateISR

func (z *ZKHandler) GetTopicStateISR(t string) (TopicStateISR, error)

GetTopicStateISR takes a topic name. If the topic exists, the topic state is returned as a TopicStateISR. GetTopicStateCurrentISR differs from GetTopicState in that the actual, current broker IDs in the ISR are returned for each partition. This method is more expensive due to the need for a call per partition to ZK.

func (*ZKHandler) GetTopics

func (z *ZKHandler) GetTopics(ts []*regexp.Regexp) ([]string, error)

GetTopics takes a []*regexp.Regexp and returns a []string of all topic names that match any of the provided regex.

func (*ZKHandler) GetUnderReplicated

func (z *ZKHandler) GetUnderReplicated() ([]string, error)

GetUnderReplicated returns a []string of all under-replicated topics.

func (*ZKHandler) ListReassignments

func (z *ZKHandler) ListReassignments() (Reassignments, error)

ListReassignments looks up any ongoing topic reassignments and returns the data as a Reassignments. ListReassignments is a KIP-455 compatible call for Kafka 2.4 and Kafka cli tools 2.6.

func (*ZKHandler) MaxMetaAge

func (z *ZKHandler) MaxMetaAge() (time.Duration, error)

MaxMetaAge returns the greatest age between the partitionmeta and brokermetrics stuctures.

func (*ZKHandler) NextInt

func (z *ZKHandler) NextInt(p string) (int32, error)

NextInt works as an atomic int generator. It does this by setting nil value to path p and returns the znode version.

func (*ZKHandler) Ready

func (z *ZKHandler) Ready() bool

Ready returns true if the client is in either state StateConnected or StateHasSession. See https://godoc.org/github.com/go-zookeeper/zk#State.

func (*ZKHandler) Set

func (z *ZKHandler) Set(p string, d string) error

Set sets the data at path p.

func (*ZKHandler) UpdateKafkaConfig

func (z *ZKHandler) UpdateKafkaConfig(c KafkaConfig) ([]bool, error)

UpdateKafkaConfig takes a KafkaConfig with key value pairs of entity config. If the config is changed, a persistent sequential znode is also written to propagate changes (via watches) to all Kafka brokers. This is a Kafka specific behavior; further references are available from the Kafka codebase. A []bool is returned indicating whether the config of the respective index was changed (if a config is updated to the existing value, 'false' is returned) along with any errors encountered. If a config value is set to an empty string (""), the entire config key itself is deleted. This was a convenient method to combine update/delete into a single func.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL