kafkazk

package
v2.1.0+incompatible
Warning

This package is not in the latest version of its module.

Published: Nov 19, 2018 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	ErrNoBrokers              = errors.New("No additional brokers that meet Constraints")
	ErrInvalidSelectionMethod = errors.New("Invalid selection method")
)
var (
	// ErrInvalidKafkaConfigType indicates invalid Kafka config types.
	ErrInvalidKafkaConfigType = errors.New("Invalid Kafka config type")
)

Functions

func WhatChanged

func WhatChanged(s1 []int, s2 []int) string

WhatChanged takes a before and after broker replica set and returns a string describing what changed.

func WriteMap

func WriteMap(pm *PartitionMap, path string) error

WriteMap takes a *PartitionMap and writes a JSON text file to the provided path.

Types

type Broker

type Broker struct {
	ID          int
	Locality    string
	Used        int
	StorageFree float64
	Replace     bool
	Missing     bool
	New         bool
}

Broker associates metadata with a real broker by ID.

type BrokerList

type BrokerList []*Broker

BrokerList is a slice of brokers for sorting by used count.

func (BrokerList) BestCandidate

func (b BrokerList) BestCandidate(c *Constraints, by string, p int64) (*Broker, error)

BestCandidate takes a *Constraints, selection method and pass / iteration number (for use as a seed value for pseudo-random number generation) and returns the most suitable broker.

func (BrokerList) SortByCount

func (b BrokerList) SortByCount()

SortByCount sorts the BrokerList by Used values.

func (BrokerList) SortByID

func (b BrokerList) SortByID()

SortByID sorts the BrokerList by ID values.

func (BrokerList) SortByStorage

func (b BrokerList) SortByStorage()

SortByStorage sorts the BrokerList by StorageFree values.

func (BrokerList) SortPseudoShuffle

func (b BrokerList) SortPseudoShuffle(seed int64)

SortPseudoShuffle takes a BrokerList and performs a sort by count. For each sequence of brokers with equal counts, the sub-slice is pseudo-randomly shuffled using the provided seed value.
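
The sort-then-shuffle behavior described here can be sketched as follows. This is only an illustration under assumptions: the `broker` type and the `sortPseudoShuffle` function are local stand-ins, not kafkazk code, and the real method may order or shuffle differently.

```go
package main

import (
	"fmt"
	"math/rand"
	"sort"
)

// broker is a hypothetical stand-in for kafkazk.Broker with only the
// fields this sketch needs.
type broker struct {
	ID   int
	Used int
}

// sortPseudoShuffle (hypothetical) sorts by Used, then shuffles each run of
// brokers sharing the same Used value with a generator seeded by seed.
func sortPseudoShuffle(bl []*broker, seed int64) {
	sort.SliceStable(bl, func(i, j int) bool { return bl[i].Used < bl[j].Used })
	rng := rand.New(rand.NewSource(seed))
	for start := 0; start < len(bl); {
		// Find the end of the run of equal Used values.
		end := start + 1
		for end < len(bl) && bl[end].Used == bl[start].Used {
			end++
		}
		run := bl[start:end]
		rng.Shuffle(len(run), func(i, j int) { run[i], run[j] = run[j], run[i] })
		start = end
	}
}

func main() {
	bl := []*broker{
		{ID: 1001, Used: 2}, {ID: 1002, Used: 1}, {ID: 1003, Used: 2},
		{ID: 1004, Used: 1}, {ID: 1005, Used: 0},
	}
	sortPseudoShuffle(bl, 1)
	for _, b := range bl {
		fmt.Println(b.ID, b.Used)
	}
}
```

Because the generator is seeded, the same input and seed always yield the same order, while different seeds vary the order only among brokers with equal counts.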

type BrokerMap

type BrokerMap map[int]*Broker

BrokerMap holds a mapping of broker IDs to *Broker.

func BrokerMapFromPartitionMap

func BrokerMapFromPartitionMap(pm *PartitionMap, bm BrokerMetaMap, force bool) BrokerMap

BrokerMapFromPartitionMap creates a BrokerMap from a *PartitionMap. TODO: can we remove brokers marked for replacement here too?

func (BrokerMap) AboveMean

func (b BrokerMap) AboveMean(d float64, f func() float64) []int

AboveMean returns a sorted []int of broker IDs that are above the mean by d percent (0.00 < d). The mean type is provided as a function parameter f.

func (BrokerMap) BelowMean

func (b BrokerMap) BelowMean(d float64, f func() float64) []int

BelowMean returns a sorted []int of broker IDs that are below the mean by d percent (0.00 < d). The mean type is provided as a function parameter f.
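
A minimal sketch of the above/below-mean filtering follows. It assumes that d is a fraction (0.5 meaning 50%) and that "above the mean by d" means exceeding mean*(1+d); neither is confirmed by the documentation. The `aboveMean` and `belowMean` helpers and the plain map of free storage are hypothetical stand-ins for the BrokerMap methods.

```go
package main

import (
	"fmt"
	"sort"
)

// aboveMean (hypothetical) returns the sorted IDs whose free storage exceeds
// mean*(1+d). The mean function f is supplied by the caller.
func aboveMean(free map[int]float64, d float64, f func() float64) []int {
	mean := f()
	ids := []int{}
	for id, v := range free {
		if v > mean*(1+d) {
			ids = append(ids, id)
		}
	}
	sort.Ints(ids)
	return ids
}

// belowMean (hypothetical) mirrors aboveMean for values under mean*(1-d).
func belowMean(free map[int]float64, d float64, f func() float64) []int {
	mean := f()
	ids := []int{}
	for id, v := range free {
		if v < mean*(1-d) {
			ids = append(ids, id)
		}
	}
	sort.Ints(ids)
	return ids
}

func main() {
	free := map[int]float64{1001: 100, 1002: 200, 1003: 300, 1004: 400}
	// Arithmetic mean passed as the function parameter; a harmonic mean
	// could be swapped in the same way.
	mean := func() float64 {
		var sum float64
		for _, v := range free {
			sum += v
		}
		return sum / float64(len(free))
	}
	fmt.Println(aboveMean(free, 0.5, mean))
	fmt.Println(belowMean(free, 0.5, mean))
}
```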

func (BrokerMap) Copy

func (b BrokerMap) Copy() BrokerMap

Copy returns a copy of a BrokerMap.

func (BrokerMap) HMean

func (b BrokerMap) HMean() float64

HMean returns the harmonic mean of broker storage free.
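
For reference, the harmonic mean of n values is n divided by the sum of their reciprocals, which weights low values more heavily than the arithmetic mean does. The sketch below uses hypothetical helper names operating on a plain slice rather than a BrokerMap.

```go
package main

import "fmt"

// harmonicMean (hypothetical helper) returns n / sum(1/x) for the values.
// It returns 0 for an empty slice.
func harmonicMean(vals []float64) float64 {
	if len(vals) == 0 {
		return 0
	}
	var sum float64
	for _, v := range vals {
		sum += 1 / v
	}
	return float64(len(vals)) / sum
}

// arithmeticMean (hypothetical helper) is shown for comparison.
func arithmeticMean(vals []float64) float64 {
	if len(vals) == 0 {
		return 0
	}
	var sum float64
	for _, v := range vals {
		sum += v
	}
	return sum / float64(len(vals))
}

func main() {
	free := []float64{100, 200, 400}
	fmt.Printf("harmonic: %.2f\n", harmonicMean(free))
	fmt.Printf("arithmetic: %.2f\n", arithmeticMean(free))
}
```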

func (BrokerMap) List

func (b BrokerMap) List() BrokerList

List takes a BrokerMap and returns a BrokerList.

func (BrokerMap) MappedBrokers

func (b BrokerMap) MappedBrokers(pm *PartitionMap) BrokerMap

MappedBrokers takes a *PartitionMap and returns a new BrokerMap that includes only the brokers that hold at least one partition in the partition map.

func (BrokerMap) Mean

func (b BrokerMap) Mean() float64

Mean returns the arithmetic mean of broker storage free.

func (BrokerMap) NonReplacedBrokers

func (b BrokerMap) NonReplacedBrokers() BrokerMap

NonReplacedBrokers returns a copy of a BrokerMap that excludes all brokers marked for replacement.

func (BrokerMap) StorageDiff

func (b BrokerMap) StorageDiff(b2 BrokerMap) map[int][2]float64

StorageDiff takes two BrokerMaps and returns a per broker ID diff in storage as a [2]float64: [absolute, percentage] diff.
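
The [absolute, percentage] pair could be computed along these lines. The direction of the diff (second map minus first) and the use of the first map's value as the percentage base are assumptions, and `storageDiff` is a hypothetical helper working on plain maps of free storage.

```go
package main

import "fmt"

// storageDiff (hypothetical) returns, per broker ID present in both maps,
// [absolute change, percentage change] going from before to after.
func storageDiff(before, after map[int]float64) map[int][2]float64 {
	d := map[int][2]float64{}
	for id, b := range before {
		a, ok := after[id]
		if !ok {
			continue // Skip brokers missing from the second map.
		}
		abs := a - b
		d[id] = [2]float64{abs, abs / b * 100}
	}
	return d
}

func main() {
	before := map[int]float64{1001: 200, 1002: 400}
	after := map[int]float64{1001: 150, 1002: 500}
	for id, v := range storageDiff(before, after) {
		fmt.Printf("%d: %.0f bytes (%.0f%%)\n", id, v[0], v[1])
	}
}
```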

func (BrokerMap) StorageRange

func (b BrokerMap) StorageRange() float64

StorageRange returns the range of free storage for all brokers in the BrokerMap.

func (BrokerMap) StorageRangeSpread

func (b BrokerMap) StorageRangeSpread() float64

StorageRangeSpread returns the range spread of free storage for all brokers in the BrokerMap.

func (BrokerMap) StorageStdDev

func (b BrokerMap) StorageStdDev() float64

StorageStdDev returns the standard deviation of free storage for all brokers in the BrokerMap.
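
As a rough illustration of the range and standard deviation statistics, the sketch below computes both over a plain slice of free-storage values. Using the population (rather than sample) standard deviation is an assumption, and the helper names are hypothetical.

```go
package main

import (
	"fmt"
	"math"
)

// storageRange (hypothetical) returns max - min of the values.
func storageRange(vals []float64) float64 {
	if len(vals) == 0 {
		return 0
	}
	lo, hi := vals[0], vals[0]
	for _, v := range vals[1:] {
		if v < lo {
			lo = v
		}
		if v > hi {
			hi = v
		}
	}
	return hi - lo
}

// storageStdDev (hypothetical) returns the population standard deviation.
func storageStdDev(vals []float64) float64 {
	if len(vals) == 0 {
		return 0
	}
	var sum float64
	for _, v := range vals {
		sum += v
	}
	mean := sum / float64(len(vals))
	var sq float64
	for _, v := range vals {
		sq += (v - mean) * (v - mean)
	}
	return math.Sqrt(sq / float64(len(vals)))
}

func main() {
	free := []float64{100, 200, 300}
	fmt.Printf("range: %.0f\n", storageRange(free))
	fmt.Printf("stddev: %.2f\n", storageStdDev(free))
}
```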

func (BrokerMap) SubStorageAll

func (b BrokerMap) SubStorageAll(pm *PartitionMap, pmm PartitionMetaMap) error

SubStorageAll takes a PartitionMap + PartitionMetaMap and adds the size of each partition back to the StorageFree value of any broker it was originally mapped to. This is used in a force rebuild where the assumption is that partitions will be lifted and repositioned.

func (BrokerMap) SubStorageReplacements

func (b BrokerMap) SubStorageReplacements(pm *PartitionMap, pmm PartitionMetaMap) error

SubStorageReplacements works similarly to SubStorageAll except that storage usage is only subtracted from brokers marked for replacement.

func (BrokerMap) SubstitutionAffinities

func (b BrokerMap) SubstitutionAffinities(pm *PartitionMap) (SubstitutionAffinities, error)

SubstitutionAffinities finds all brokers marked for replacement and for each broker, it creates an exclusive association with a newly provided broker. In the rebuild stage, each to-be-replaced broker will be only replaced with the affinity it's associated with. A given new broker can only be an affinity for a single outgoing broker. An error is returned if a complete mapping of affinities cannot be constructed (e.g. two brokers are marked for replacement but only one new replacement was provided and substitution affinities is enabled).

func (BrokerMap) Update

func (b BrokerMap) Update(bl []int, bm BrokerMetaMap) *BrokerStatus

Update takes a []int of broker IDs and a BrokerMetaMap, then adds the brokers to the BrokerMap. It returns a *BrokerStatus holding the counts of brokers marked for replacement, newly included brokers, and brokers that weren't found in ZooKeeper.

type BrokerMeta

type BrokerMeta struct {
	Rack              string  `json:"rack"`
	StorageFree       float64 // In bytes.
	MetricsIncomplete bool
}

BrokerMeta holds metadata that describes a broker, used in satisfying constraints.

type BrokerMetaMap

type BrokerMetaMap map[int]*BrokerMeta

BrokerMetaMap is a map of broker IDs to BrokerMeta metadata fetched from ZooKeeper. Currently, just the rack field is retrieved.

type BrokerMetrics

type BrokerMetrics struct {
	StorageFree float64
}

BrokerMetrics holds broker metric data fetched from ZK.

type BrokerMetricsMap

type BrokerMetricsMap map[int]*BrokerMetrics

BrokerMetricsMap holds a mapping of broker ID to BrokerMetrics.

type BrokerStatus

type BrokerStatus struct {
	New        int
	Missing    int
	OldMissing int
	Replace    int
}

BrokerStatus summarizes change counts from an input and output broker list.

func (BrokerStatus) Changes

func (bs BrokerStatus) Changes() bool

Changes returns a bool that indicates whether the BrokerStatus values represent a change in brokers.

type BrokerUseStats

type BrokerUseStats struct {
	ID       int
	Leader   int
	Follower int
}

BrokerUseStats holds counts of partition ownership.

type BrokerUseStatsList

type BrokerUseStatsList []*BrokerUseStats

BrokerUseStatsList is a slice of *BrokerUseStats.

func (BrokerUseStatsList) Len

func (b BrokerUseStatsList) Len() int

func (BrokerUseStatsList) Less

func (b BrokerUseStatsList) Less(i, j int) bool

func (BrokerUseStatsList) Swap

func (b BrokerUseStatsList) Swap(i, j int)

type Config

type Config struct {
	Connect       string
	Prefix        string
	MetricsPrefix string
}

Config holds initialization parameters for a Handler. Connect is a ZooKeeper connect string. Prefix should reflect any prefix used for Kafka on the reference ZooKeeper cluster (excluding slashes). MetricsPrefix is the prefix used for broker metrics metadata persisted in ZooKeeper.

type Constraints

type Constraints struct {
	// contains filtered or unexported fields
}

Constraints holds a map of IDs and locality key-values.

func MergeConstraints

func MergeConstraints(bl BrokerList) *Constraints

MergeConstraints takes a BrokerList and builds a *Constraints by merging the attributes of all brokers from the supplied list.

func NewConstraints

func NewConstraints() *Constraints

NewConstraints returns an empty *Constraints.

func (*Constraints) Add

func (c *Constraints) Add(b *Broker)

Add takes a *Broker and adds its attributes to the *Constraints. The requestSize is also subtracted from the *Broker.StorageFree.

type DegreeDistribution

type DegreeDistribution struct {
	// Relationships is an adjacency list
	// where an edge between brokers is defined as
	// a common occupancy in at least one replica set.
	// For instance, given the replica set [1001,1002,1003],
	// ID 1002 has a relationship with 1001 and 1003.
	Relationships map[int]map[int]struct{}
}

DegreeDistribution holds broker to broker relationships.

func NewDegreeDistribution

func NewDegreeDistribution() DegreeDistribution

NewDegreeDistribution returns a new DegreeDistribution.

func (DegreeDistribution) Add

func (dd DegreeDistribution) Add(nodes []int)

Add takes a []int of broker IDs representing a replica set and updates the adjacency lists for each broker in the set.
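
The adjacency-list update can be sketched as below, following the Relationships field comment (brokers sharing a replica set are related). The lowercase type and method names are local stand-ins, and treating the degree as the size of a broker's adjacency set is an assumption about how the counts are derived.

```go
package main

import "fmt"

// degreeDistribution is a hypothetical stand-in mirroring the documented
// Relationships field.
type degreeDistribution struct {
	Relationships map[int]map[int]struct{}
}

// add records an edge between every pair of distinct brokers in the
// replica set.
func (dd degreeDistribution) add(nodes []int) {
	for _, n := range nodes {
		if _, ok := dd.Relationships[n]; !ok {
			dd.Relationships[n] = map[int]struct{}{}
		}
		for _, peer := range nodes {
			if peer != n {
				dd.Relationships[n][peer] = struct{}{}
			}
		}
	}
}

// count returns the number of distinct brokers related to n.
func (dd degreeDistribution) count(n int) int {
	return len(dd.Relationships[n])
}

func main() {
	dd := degreeDistribution{Relationships: map[int]map[int]struct{}{}}
	dd.add([]int{1001, 1002, 1003})
	dd.add([]int{1002, 1004})
	fmt.Println(dd.count(1001), dd.count(1002), dd.count(1004))
}
```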

func (DegreeDistribution) Count

func (dd DegreeDistribution) Count(n int) int

Count takes a node ID and returns the degree (the number of relationships) for that node.

func (DegreeDistribution) Stats

Stats returns a DegreeDistributionStats.

type DegreeDistributionStats

type DegreeDistributionStats struct {
	Min float64
	Max float64
	Avg float64
}

DegreeDistributionStats holds general statistical information describing the DegreeDistribution counts.

type ErrNoNode

type ErrNoNode struct {
	// contains filtered or unexported fields
}

ErrNoNode error type is specifically for Get method calls where the underlying error type is a zkclient.ErrNoNode.

func (ErrNoNode) Error

func (e ErrNoNode) Error() string

Error returns the error string.

type Handler

type Handler interface {
	Exists(string) (bool, error)
	Create(string, string) error
	CreateSequential(string, string) error
	Set(string, string) error
	Get(string) ([]byte, error)
	Children(string) ([]string, error)
	Close()
	GetTopicState(string) (*TopicState, error)
	GetTopicStateISR(string) (TopicStateISR, error)
	UpdateKafkaConfig(KafkaConfig) (bool, error)
	GetReassignments() Reassignments
	GetTopics([]*regexp.Regexp) ([]string, error)
	GetTopicConfig(string) (*TopicConfig, error)
	GetAllBrokerMeta(bool) (BrokerMetaMap, []error)
	GetAllPartitionMeta() (PartitionMetaMap, error)
	GetPartitionMap(string) (*PartitionMap, error)
	Ready() bool
}

Handler exposes basic ZooKeeper operations along with additional methods that return kafkazk package specific types, populated with data fetched from ZooKeeper.

func NewHandler

func NewHandler(c *Config) (Handler, error)

NewHandler takes a *Config, performs any initialization and returns a Handler.

type KafkaConfig

type KafkaConfig struct {
	Type    string      // Topic or broker.
	Name    string      // Entity name.
	Configs [][2]string // Slice of [2]string{key,value} configs.

}

KafkaConfig is used to issue configuration updates to either topics or brokers in ZooKeeper.

type KafkaConfigData

type KafkaConfigData struct {
	Version int               `json:"version"`
	Config  map[string]string `json:"config"`
}

KafkaConfigData is used for unmarshalling /config/<type>/<name> data from ZooKeeper.

func NewKafkaConfigData

func NewKafkaConfigData() KafkaConfigData

NewKafkaConfigData creates a KafkaConfigData.

type Mappings

type Mappings map[int]map[string]partitionList

Mappings is a mapping of broker IDs to currently held partitions as a partitionList.

func NewMappings

func NewMappings() Mappings

NewMappings returns a new Mappings.

func (Mappings) LargestPartitions

func (m Mappings) LargestPartitions(id int, k int, pm PartitionMetaMap) (partitionList, error)

LargestPartitions takes a broker ID and PartitionMetaMap and returns a partitionList with the top k partitions by size for the provided broker ID.

func (Mappings) Remove

func (m Mappings) Remove(id int, p Partition) error

Remove takes a broker ID and partition and removes the mapping association.

type Mock

type Mock struct{}

Mock mocks the Handler interface.

func (*Mock) Children

func (zk *Mock) Children(a string) ([]string, error)

Children mocks Children.

func (*Mock) Close

func (zk *Mock) Close()

Close mocks Close.

func (*Mock) Create

func (zk *Mock) Create(a, b string) error

Create mocks Create.

func (*Mock) CreateSequential

func (zk *Mock) CreateSequential(a, b string) error

CreateSequential mocks CreateSequential.

func (*Mock) Exists

func (zk *Mock) Exists(a string) (bool, error)

Exists mocks Exists.

func (*Mock) Get

func (zk *Mock) Get(a string) ([]byte, error)

Get mocks Get.

func (*Mock) GetAllBrokerMeta

func (zk *Mock) GetAllBrokerMeta(withMetrics bool) (BrokerMetaMap, []error)

GetAllBrokerMeta mocks GetAllBrokerMeta.

func (*Mock) GetAllPartitionMeta

func (zk *Mock) GetAllPartitionMeta() (PartitionMetaMap, error)

GetAllPartitionMeta mocks GetAllPartitionMeta.

func (*Mock) GetBrokerMetrics

func (zk *Mock) GetBrokerMetrics() (BrokerMetricsMap, error)

GetBrokerMetrics mocks GetBrokerMetrics.

func (*Mock) GetPartitionMap

func (zk *Mock) GetPartitionMap(t string) (*PartitionMap, error)

GetPartitionMap mocks GetPartitionMap.

func (*Mock) GetReassignments

func (zk *Mock) GetReassignments() Reassignments

GetReassignments mocks GetReassignments.

func (*Mock) GetTopicConfig

func (zk *Mock) GetTopicConfig(t string) (*TopicConfig, error)

GetTopicConfig mocks GetTopicConfig.

func (*Mock) GetTopicState

func (zk *Mock) GetTopicState(t string) (*TopicState, error)

GetTopicState mocks GetTopicState.

func (*Mock) GetTopicStateISR

func (zk *Mock) GetTopicStateISR(t string) (TopicStateISR, error)

GetTopicStateISR mocks GetTopicStateISR.

func (*Mock) GetTopics

func (zk *Mock) GetTopics(ts []*regexp.Regexp) ([]string, error)

GetTopics mocks GetTopics.

func (*Mock) InitRawClient

func (zk *Mock) InitRawClient() error

InitRawClient mocks InitRawClient.

func (*Mock) Ready

func (zk *Mock) Ready() bool

Ready mocks Ready.

func (*Mock) Set

func (zk *Mock) Set(a, b string) error

Set mocks Set.

func (*Mock) UpdateKafkaConfig

func (zk *Mock) UpdateKafkaConfig(c KafkaConfig) (bool, error)

UpdateKafkaConfig mocks UpdateKafkaConfig.

type NoMappingForBroker

type NoMappingForBroker struct {
	// contains filtered or unexported fields
}

func (NoMappingForBroker) Error

func (e NoMappingForBroker) Error() string

type NoMappingForTopic

type NoMappingForTopic struct {
	// contains filtered or unexported fields
}

func (NoMappingForTopic) Error

func (e NoMappingForTopic) Error() string

type Partition

type Partition struct {
	Topic     string `json:"topic"`
	Partition int    `json:"partition"`
	Replicas  []int  `json:"replicas"`
}

Partition represents the Kafka partition structure.

func (Partition) Equal

func (p Partition) Equal(p2 Partition) bool

Equal defines equality between two Partition objects as an equality of topic, partition and replicas.
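
A field-by-field comparison along these lines would satisfy that definition. The sketch treats replica order as significant, which is an assumption; `partition` and `equal` are hypothetical local stand-ins.

```go
package main

import "fmt"

// partition is a hypothetical stand-in for kafkazk.Partition.
type partition struct {
	Topic     string
	Partition int
	Replicas  []int
}

// equal (hypothetical) compares topic, partition number and the replica
// list element by element, so replica order matters.
func equal(p, p2 partition) bool {
	if p.Topic != p2.Topic || p.Partition != p2.Partition {
		return false
	}
	if len(p.Replicas) != len(p2.Replicas) {
		return false
	}
	for i := range p.Replicas {
		if p.Replicas[i] != p2.Replicas[i] {
			return false
		}
	}
	return true
}

func main() {
	a := partition{Topic: "test", Partition: 0, Replicas: []int{1001, 1002}}
	b := partition{Topic: "test", Partition: 0, Replicas: []int{1002, 1001}}
	fmt.Println(equal(a, a), equal(a, b))
}
```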

type PartitionMap

type PartitionMap struct {
	Version    int           `json:"version"`
	Partitions partitionList `json:"partitions"`
}

PartitionMap represents the Kafka partition map structure.

func NewPartitionMap

func NewPartitionMap() *PartitionMap

NewPartitionMap returns an empty *PartitionMap.

func PartitionMapFromString

func PartitionMapFromString(s string) (*PartitionMap, error)

PartitionMapFromString takes a JSON-encoded string and returns a *PartitionMap.
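
The expected input shape follows from the JSON tags on PartitionMap and Partition. The sketch below decodes such a string with encoding/json using local stand-in types; `partitionMapFromString` here is a hypothetical re-implementation, and the real function may perform extra validation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// partition and partitionMap are hypothetical stand-ins that reuse the
// JSON tags documented for kafkazk.Partition and kafkazk.PartitionMap.
type partition struct {
	Topic     string `json:"topic"`
	Partition int    `json:"partition"`
	Replicas  []int  `json:"replicas"`
}

type partitionMap struct {
	Version    int         `json:"version"`
	Partitions []partition `json:"partitions"`
}

// partitionMapFromString decodes a JSON partition map.
func partitionMapFromString(s string) (*partitionMap, error) {
	pm := &partitionMap{}
	if err := json.Unmarshal([]byte(s), pm); err != nil {
		return nil, err
	}
	return pm, nil
}

func main() {
	s := `{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1001,1002]}]}`
	pm, err := partitionMapFromString(s)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(pm.Version, pm.Partitions[0].Topic, pm.Partitions[0].Replicas)
}
```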

func PartitionMapFromZK

func PartitionMapFromZK(t []*regexp.Regexp, zk Handler) (*PartitionMap, error)

PartitionMapFromZK takes a slice of regexp and finds all matching topics for each. A merged *PartitionMap of all matching topic maps is returned.

func (*PartitionMap) Copy

func (pm *PartitionMap) Copy() *PartitionMap

Copy returns a copy of a *PartitionMap.

func (*PartitionMap) DegreeDistribution

func (pm *PartitionMap) DegreeDistribution() DegreeDistribution

DegreeDistribution returns the DegreeDistribution for the PartitionMap.

func (*PartitionMap) LocalitiesAvailable

func (pm *PartitionMap) LocalitiesAvailable(bm BrokerMap, b *Broker) []string

LocalitiesAvailable takes a broker map and broker and returns a []string of localities that are unused by any of the brokers in any replica sets that the reference broker was found in. This is done by building a set of all localities observed across all replica sets and a set of all localities observed in replica sets containing the reference broker, then returning the diff.
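
The set difference described above can be sketched as follows. Replica sets are passed as plain [][]int and localities as a map from broker ID to locality; both, along with the `localitiesAvailable` name, are simplifications assumed for illustration. The sketch also counts the reference broker's own locality as used.

```go
package main

import (
	"fmt"
	"sort"
)

// localitiesAvailable (hypothetical) returns the localities seen anywhere in
// replicaSets minus those seen in sets that contain the reference broker.
func localitiesAvailable(replicaSets [][]int, locality map[int]string, ref int) []string {
	all := map[string]struct{}{}
	used := map[string]struct{}{}
	for _, rs := range replicaSets {
		hasRef := false
		for _, id := range rs {
			if id == ref {
				hasRef = true
			}
		}
		for _, id := range rs {
			loc, ok := locality[id]
			if !ok {
				continue // Unknown broker: no locality to record.
			}
			all[loc] = struct{}{}
			if hasRef {
				used[loc] = struct{}{}
			}
		}
	}
	out := []string{}
	for loc := range all {
		if _, ok := used[loc]; !ok {
			out = append(out, loc)
		}
	}
	sort.Strings(out)
	return out
}

func main() {
	sets := [][]int{{1001, 1002}, {1003, 1004}}
	loc := map[int]string{1001: "a", 1002: "b", 1003: "c", 1004: "a"}
	fmt.Println(localitiesAvailable(sets, loc, 1001))
}
```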

func (*PartitionMap) Mappings

func (pm *PartitionMap) Mappings() Mappings

Mappings returns a Mappings from a *PartitionMap.

func (*PartitionMap) Rebuild

func (pm *PartitionMap) Rebuild(params RebuildParams) (*PartitionMap, []string)

Rebuild takes a RebuildParams, which carries the BrokerMap and rebuild strategy. It then traverses the partition map, replacing brokers marked for removal with the best available candidate based on the selected rebuild strategy. A rebuilt *PartitionMap and a []string of errors are returned.

func (*PartitionMap) SetReplication

func (pm *PartitionMap) SetReplication(r int)

SetReplication ensures that every replica set is reset to the replication factor r. Sets exceeding r are truncated; sets below r are extended with stub brokers.
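
For a single replica set, the truncate-or-extend rule might look like the sketch below. Using ID 0 for the stub broker follows the description of Strip; the `setReplication` helper and `stubID` constant are hypothetical names.

```go
package main

import "fmt"

// stubID is the placeholder broker ID assumed for stub brokers.
const stubID = 0

// setReplication (hypothetical) returns replicas resized to length r:
// longer sets are truncated, shorter sets are padded with stubID.
func setReplication(replicas []int, r int) []int {
	if r < 0 {
		r = 0
	}
	if len(replicas) > r {
		return replicas[:r]
	}
	for len(replicas) < r {
		replicas = append(replicas, stubID)
	}
	return replicas
}

func main() {
	fmt.Println(setReplication([]int{1001, 1002, 1003}, 2))
	fmt.Println(setReplication([]int{1001}, 3))
}
```

The padded stub entries would then be candidates for replacement during a subsequent rebuild.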

func (*PartitionMap) SimpleLeaderOptimization

func (pm *PartitionMap) SimpleLeaderOptimization()

SimpleLeaderOptimization is a naive leadership optimization algorithm. It gets leadership counts for all brokers in the partition map and shuffles the replica sets of partitions that include brokers with below-average leadership counts.

func (*PartitionMap) Strip

func (pm *PartitionMap) Strip() *PartitionMap

Strip takes a PartitionMap and returns a copy where all broker ID references are replaced with the stub broker with ID 0 where the replace field is set to true. This ensures that the entire map is rebuilt, even if the provided broker list matches what's already in the map.

func (*PartitionMap) UseStats

func (pm *PartitionMap) UseStats() []*BrokerUseStats

UseStats returns a []*BrokerUseStats; each entry holds a broker ID along with counts of its leader and follower partition assignments.

type PartitionMeta

type PartitionMeta struct {
	Size float64 // In bytes.
}

PartitionMeta holds partition metadata.

type PartitionMetaMap

type PartitionMetaMap map[string]map[int]*PartitionMeta

PartitionMetaMap is a mapping of topic, partition number to PartitionMeta.

func NewPartitionMetaMap

func NewPartitionMetaMap() PartitionMetaMap

NewPartitionMetaMap returns an empty PartitionMetaMap.

func (PartitionMetaMap) Size

func (pmm PartitionMetaMap) Size(p Partition) (float64, error)

Size takes a Partition and returns the size. An error is returned if the partition isn't in the PartitionMetaMap.
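
Since PartitionMetaMap is a nested map of topic to partition number, the lookup amounts to two map accesses with an error on a miss. The sketch below uses local stand-in types and takes the topic and partition number directly instead of a Partition; the error messages are illustrative only.

```go
package main

import "fmt"

// partitionMeta and partitionMetaMap are hypothetical stand-ins mirroring
// the documented kafkazk types.
type partitionMeta struct {
	Size float64 // In bytes.
}

type partitionMetaMap map[string]map[int]*partitionMeta

// size returns the stored size for topic/partn or an error if absent.
func (pmm partitionMetaMap) size(topic string, partn int) (float64, error) {
	parts, ok := pmm[topic]
	if !ok {
		return 0, fmt.Errorf("topic %s not found", topic)
	}
	meta, ok := parts[partn]
	if !ok || meta == nil {
		return 0, fmt.Errorf("partition %d not found in topic %s", partn, topic)
	}
	return meta.Size, nil
}

func main() {
	pmm := partitionMetaMap{"test": {0: {Size: 1024}}}
	fmt.Println(pmm.size("test", 0))
	fmt.Println(pmm.size("test", 1))
}
```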

type PartitionState

type PartitionState struct {
	Version         int   `json:"version"`
	ControllerEpoch int   `json:"controller_epoch"`
	Leader          int   `json:"leader"`
	LeaderEpoch     int   `json:"leader_epoch"`
	ISR             []int `json:"isr"`
}

PartitionState is used for unmarshalling json data from a partition state: e.g. /brokers/topics/some-topic/partitions/0/state

type Reassignments

type Reassignments map[string]map[int][]int

Reassignments is a map of topic:partition:brokers.

type RebuildParams

type RebuildParams struct {
	PMM           PartitionMetaMap
	BM            BrokerMap
	Strategy      string
	Optimization  string
	Affinities    SubstitutionAffinities
	PartnSzFactor float64
	// contains filtered or unexported fields
}

RebuildParams holds required parameters to call the Rebuild method on a *PartitionMap.

func NewRebuildParams

func NewRebuildParams() RebuildParams

NewRebuildParams initializes a RebuildParams.

type SubstitutionAffinities

type SubstitutionAffinities map[int]*Broker

SubstitutionAffinities is a mapping of an ID belonging to a *Broker marked for replacement and a replacement *Broker that will fill all previously filled replica slots held by the *Broker being replaced.

func (SubstitutionAffinities) Get

func (sa SubstitutionAffinities) Get(id int) *Broker

Get takes a broker ID and returns a *Broker if one was set as a substitution affinity.

type TopicConfig

type TopicConfig struct {
	Version int               `json:"version"`
	Config  map[string]string `json:"config"`
}

TopicConfig is used for unmarshalling /config/topics/<topic> from ZooKeeper.

type TopicState

type TopicState struct {
	Partitions map[string][]int `json:"partitions"`
}

TopicState is used for unmarshalling ZooKeeper JSON data from a topic: e.g. /brokers/topics/some-topic

type TopicStateISR

type TopicStateISR map[string]PartitionState

TopicStateISR is a map of partition numbers to PartitionState.

type ZKHandler

type ZKHandler struct {
	Connect       string
	Prefix        string
	MetricsPrefix string
	// contains filtered or unexported fields
}

ZKHandler implements the Handler interface for real ZooKeeper clusters.

func (*ZKHandler) Children

func (z *ZKHandler) Children(p string) ([]string, error)

Children takes a path p and returns a list of child znodes and an error if encountered.

func (*ZKHandler) Close

func (z *ZKHandler) Close()

Close calls close on the *ZKHandler. Any additional shutdown cleanup or other tasks should be performed here.

func (*ZKHandler) Create

func (z *ZKHandler) Create(p string, d string) error

Create creates the provided path p with the data from the provided string d and returns an error if encountered.

func (*ZKHandler) CreateSequential

func (z *ZKHandler) CreateSequential(p string, d string) error

CreateSequential takes a path p and data d and creates a sequential znode at p with data d. An error is returned if encountered.

func (*ZKHandler) Exists

func (z *ZKHandler) Exists(p string) (bool, error)

Exists takes a path p and returns a bool as to whether the path exists and an error if encountered.

func (*ZKHandler) Get

func (z *ZKHandler) Get(p string) ([]byte, error)

Get gets the provided path p and returns the data from the path and an error if encountered.

func (*ZKHandler) GetAllBrokerMeta

func (z *ZKHandler) GetAllBrokerMeta(withMetrics bool) (BrokerMetaMap, []error)

GetAllBrokerMeta looks up all registered Kafka brokers and returns their metadata as a BrokerMetaMap. A withMetrics bool param determines whether we additionally want to fetch stored broker metrics.

func (*ZKHandler) GetAllPartitionMeta

func (z *ZKHandler) GetAllPartitionMeta() (PartitionMetaMap, error)

GetAllPartitionMeta fetches partition metadata stored in ZooKeeper.

func (*ZKHandler) GetPartitionMap

func (z *ZKHandler) GetPartitionMap(t string) (*PartitionMap, error)

GetPartitionMap takes a topic name. If the topic exists, the state of the topic is fetched and translated into a *PartitionMap.

func (*ZKHandler) GetReassignments

func (z *ZKHandler) GetReassignments() Reassignments

GetReassignments looks up any ongoing topic reassignments and returns the data as a Reassignments type.

func (*ZKHandler) GetTopicConfig

func (z *ZKHandler) GetTopicConfig(t string) (*TopicConfig, error)

GetTopicConfig takes a topic name. If the topic exists, the topic config is returned as a *TopicConfig.

func (*ZKHandler) GetTopicState

func (z *ZKHandler) GetTopicState(t string) (*TopicState, error)

GetTopicState takes a topic name. If the topic exists, the topic state is returned as a *TopicState.

func (*ZKHandler) GetTopicStateISR

func (z *ZKHandler) GetTopicStateISR(t string) (TopicStateISR, error)

GetTopicStateISR takes a topic name. If the topic exists, the topic state is returned as a TopicStateISR. GetTopicStateISR differs from GetTopicState in that the actual, current broker IDs in the ISR are returned for each partition. This method is notably more expensive because it requires one call to ZooKeeper per partition.

func (*ZKHandler) GetTopics

func (z *ZKHandler) GetTopics(ts []*regexp.Regexp) ([]string, error)

GetTopics takes a []*regexp.Regexp and returns a []string of all topic names that match any of the provided regular expressions.

func (*ZKHandler) Ready

func (z *ZKHandler) Ready() bool

Ready returns true if the client is in either state StateConnected or StateHasSession. See https://godoc.org/github.com/samuel/go-zookeeper/zk#State.

func (*ZKHandler) Set

func (z *ZKHandler) Set(p string, d string) error

Set sets the provided path p data to the provided string d and returns an error if encountered.

func (*ZKHandler) UpdateKafkaConfig

func (z *ZKHandler) UpdateKafkaConfig(c KafkaConfig) (bool, error)

UpdateKafkaConfig takes a KafkaConfig with key value pairs of entity config. If the config is changed, a persistent sequential znode is also written to propagate changes (via watches) to all Kafka brokers. This is a Kafka specific behavior; further references are available from the Kafka codebase. A bool is returned indicating whether the config was changed (if a config is updated to the existing value, 'false' is returned) along with any errors encountered. If a config value is set to an empty string (""), the entire config key itself is deleted. This was a convenient way to combine update/delete into a single func.
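
The update/delete semantics and the returned bool can be sketched against an in-memory config map. This omits the ZooKeeper reads and writes and the change-notification znode entirely; `applyConfigs` is a hypothetical helper that only models the merge rule described above.

```go
package main

import "fmt"

// applyConfigs (hypothetical) applies [key, value] pairs to current and
// reports whether anything changed. An empty value deletes the key.
func applyConfigs(current map[string]string, updates [][2]string) bool {
	changed := false
	for _, kv := range updates {
		k, v := kv[0], kv[1]
		old, exists := current[k]
		switch {
		case v == "":
			if exists {
				delete(current, k)
				changed = true
			}
		case !exists || old != v:
			current[k] = v
			changed = true
		}
	}
	return changed
}

func main() {
	cfg := map[string]string{"retention.ms": "1000"}
	fmt.Println(applyConfigs(cfg, [][2]string{{"retention.ms", "1000"}}), cfg)
	fmt.Println(applyConfigs(cfg, [][2]string{{"retention.ms", "2000"}}), cfg)
	fmt.Println(applyConfigs(cfg, [][2]string{{"retention.ms", ""}}), cfg)
}
```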
