admin

package
v0.0.0-...-e78f20e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 14, 2023 License: MIT Imports: 28 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// RetentionKey is the config key used for topic time retention.
	RetentionKey = "retention.ms"

	// LeaderThrottledKey is the config key for the leader throttle rate.
	LeaderThrottledKey = "leader.replication.throttled.rate"

	// FollowerThrottledKey is the config key for the follower throttle rate.
	FollowerThrottledKey = "follower.replication.throttled.rate"

	// LeaderReplicasThrottledKey is the config key for the list of leader replicas
	// that should be throttled.
	LeaderReplicasThrottledKey = "leader.replication.throttled.replicas"

	// FollowerReplicasThrottledKey is the config key for the list of follower replicas
	// that should be throttled.
	FollowerReplicasThrottledKey = "follower.replication.throttled.replicas"
)

Variables

View Source
var (
	// ErrTopicDoesNotExist is returned by admin functions when a topic that should exist
	// does not.
	ErrTopicDoesNotExist = errors.New("Topic does not exist")
)

Functions

func AssignmentsToReplicas

func AssignmentsToReplicas(assignments []PartitionAssignment) ([][]int, error)

AssignmentsToReplicas is the inverse of ReplicasToAssignments. Used for unit tests.

func BrokerCountsPerRack

func BrokerCountsPerRack(brokers []BrokerInfo) map[string]int

BrokerCountsPerRack returns a mapping of rack -> number of brokers.

func BrokerIDs

func BrokerIDs(brokers []BrokerInfo) []int

BrokerIDs returns a slice of the IDs of the argument brokers.

func BrokerRacks

func BrokerRacks(brokers []BrokerInfo) map[int]string

BrokerRacks returns a mapping of broker ID -> rack.

func BrokersPerRack

func BrokersPerRack(brokers []BrokerInfo) map[string][]int

BrokersPerRack returns a mapping of rack -> broker IDs.

func CheckAssignments

func CheckAssignments(assignments []PartitionAssignment) error

CheckAssignments does some basic sanity checks on the assignments that are passed into an Assigner or extender so that we can fail early if something is obviously wrong.

func DistinctRacks

func DistinctRacks(brokers []BrokerInfo) []string

DistinctRacks returns a sorted slice of all the distinct racks in the cluster.

func FormatAssignentDiffs

func FormatAssignentDiffs(
	curr []PartitionAssignment,
	desired []PartitionAssignment,
	brokers []BrokerInfo,
) string

FormatAssignentDiffs generates a pretty table that shows the before and after states of a partition replica and/or leader update.

func FormatBrokerMaxPartitions

func FormatBrokerMaxPartitions(
	curr []PartitionAssignment,
	desired []PartitionAssignment,
	brokers []BrokerInfo,
) string

FormatBrokerMaxPartitions generates a pretty table that shows the total number of partitions that each broker is involved in for a diff. It's used to evaluate the potential extra load that could occur on brokers during a migration.

func FormatBrokerRackReplicas

func FormatBrokerRackReplicas(brokers []BrokerInfo, topics []TopicInfo) string

FormatBrokerRackReplicas creates a pretty table that shows how many replicas are in each position (i.e., leader, second, third) by rack across all topics. Useful for showing total-topic balance.

func FormatBrokerReplicas

func FormatBrokerReplicas(brokers []BrokerInfo, topics []TopicInfo) string

FormatBrokerReplicas creates a pretty table that shows how many replicas are in each position (i.e., leader, second, third) by broker across all topics. Useful for showing total-topic balance.

func FormatBrokers

func FormatBrokers(brokers []BrokerInfo, full bool) string

FormatBrokers creates a pretty table from a list of brokers.

func FormatBrokersPerRack

func FormatBrokersPerRack(brokers []BrokerInfo) string

FormatBrokersPerRack creates a pretty table that shows the number of brokers per rack.

func FormatConfig

func FormatConfig(configMap map[string]string) string

FormatConfig creates a pretty table with all of the keys and values in a topic or broker config.

func FormatTopicLeadersPerRack

func FormatTopicLeadersPerRack(topic TopicInfo, brokers []BrokerInfo) string

FormatTopicLeadersPerRack creates a pretty table that shows the number of partitions with a leader in each rack.

func FormatTopicPartitions

func FormatTopicPartitions(partitions []PartitionInfo, brokers []BrokerInfo) string

FormatTopicPartitions creates a pretty table with information on all of the partitions for a topic.

func FormatTopics

func FormatTopics(topics []TopicInfo, brokers []BrokerInfo, full bool) string

FormatTopics creates a pretty table that lists the details of the argument topics.

func HasLeaders

func HasLeaders(topics []TopicInfo) bool

HasLeaders returns whether at least one partition in the argument topics has a non-zero leader set. Used for formatting purposes.

func LeadersPerRack

func LeadersPerRack(brokers []BrokerInfo, topic TopicInfo) map[string]int

LeadersPerRack returns a mapping of rack -> number of partitions with a leader in that rack.

func MaxPartitionsPerBroker

func MaxPartitionsPerBroker(
	allAssignments ...[]PartitionAssignment,
) map[int]int

MaxPartitionsPerBroker calculates the number of partitions that each broker may need to handle during a migration.

func MaxReplication

func MaxReplication(topics []TopicInfo) int

MaxReplication returns the maximum amount of replication across all partitions in the argument topics.

func NewLeaderPartitions

func NewLeaderPartitions(
	current []PartitionAssignment,
	desired []PartitionAssignment) []int

NewLeaderPartitions returns the partition IDs which will have new leaders given the current and desired assignments.

func ParseBrokerThrottles

func ParseBrokerThrottles(brokers []BrokerInfo) (
	[]BrokerThrottle,
	[]BrokerThrottle,
	error,
)

ParseBrokerThrottles returns slices of the leader and follower throttles for the argument brokers.

func ParsePartitionThrottles

func ParsePartitionThrottles(topic TopicInfo) (
	[]PartitionThrottle,
	[]PartitionThrottle,
	error,
)

ParsePartitionThrottles returns slices of the leader and follower partition throttles for the argument topic.

func PartitionIDs

func PartitionIDs(partitions []PartitionInfo) []int

PartitionIDs returns the IDs from the argument partitions.

func PartitionThrottleConfigEntries

func PartitionThrottleConfigEntries(
	leaderThrottles []PartitionThrottle,
	followerThrottles []PartitionThrottle,
) []kafka.ConfigEntry

PartitionThrottleConfigEntries generates the topic config entries for the provided leader and follower throttles.

func SameBrokers

func SameBrokers(
	a PartitionAssignment,
	b PartitionAssignment,
) bool

SameBrokers returns whether two PartitionAssignments have the same brokers.

func ThrottledBrokerIDs

func ThrottledBrokerIDs(brokers []BrokerInfo) []int

ThrottledBrokerIDs returns a slice of the IDs of the subset of argument brokers that have throttles on them.

func ThrottledTopicNames

func ThrottledTopicNames(topics []TopicInfo) []string

ThrottledTopicNames returns the names of topics in the argument slice that have throttles on them.

Types

type AssignmentDiff

type AssignmentDiff struct {
	PartitionID int
	Old         PartitionAssignment
	New         PartitionAssignment
}

AssignmentDiff represents the diff in a single partition reassignment.

func AssignmentDiffs

func AssignmentDiffs(
	current []PartitionAssignment,
	desired []PartitionAssignment,
) []AssignmentDiff

AssignmentDiffs returns the diffs implied by the argument current and desired PartitionAssignments. Used for displaying diffs to user.

type BrokerAdminClient

type BrokerAdminClient struct {
	// contains filtered or unexported fields
}

BrokerAdminClient is a Client implementation that only uses broker APIs, without any zookeeper access.

func NewBrokerAdminClient

func NewBrokerAdminClient(
	ctx context.Context,
	config BrokerAdminClientConfig,
) (*BrokerAdminClient, error)

NewBrokerAdminClient constructs a new BrokerAdminClient instance.

func (*BrokerAdminClient) AcquireLock

func (c *BrokerAdminClient) AcquireLock(ctx context.Context, path string) (
	zk.Lock,
	error,
)

AcquireLock acquires a lock that can be used to prevent simultaneous changes to a topic. NOTE: Not implemented for broker-based clients.

func (*BrokerAdminClient) AddPartitions

func (c *BrokerAdminClient) AddPartitions(
	ctx context.Context,
	topic string,
	newAssignments []PartitionAssignment,
) error

AddPartitions extends a topic by adding one or more new partitions to it.

func (*BrokerAdminClient) AssignPartitions

func (c *BrokerAdminClient) AssignPartitions(
	ctx context.Context,
	topic string,
	assignments []PartitionAssignment,
) error

AssignPartitions sets the replica broker IDs for one or more partitions in a topic.

func (*BrokerAdminClient) Close

func (c *BrokerAdminClient) Close() error

Close closes the client.

func (*BrokerAdminClient) CreateTopic

func (c *BrokerAdminClient) CreateTopic(
	ctx context.Context,
	config kafka.TopicConfig,
) error

CreateTopic creates a topic in the cluster.

func (*BrokerAdminClient) GetBrokerIDs

func (c *BrokerAdminClient) GetBrokerIDs(ctx context.Context) ([]int, error)

GetBrokerIDs gets the IDs of all brokers in the cluster.

func (*BrokerAdminClient) GetBrokers

func (c *BrokerAdminClient) GetBrokers(ctx context.Context, ids []int) (
	[]BrokerInfo,
	error,
)

GetBrokers gets information about all brokers in the cluster.

func (*BrokerAdminClient) GetClusterID

func (c *BrokerAdminClient) GetClusterID(ctx context.Context) (string, error)

GetClusterID gets the ID of the cluster.

func (*BrokerAdminClient) GetConnector

func (c *BrokerAdminClient) GetConnector() *Connector

GetConnector gets the Connector instance for this cluster.

func (*BrokerAdminClient) GetSupportedFeatures

func (c *BrokerAdminClient) GetSupportedFeatures() SupportedFeatures

GetSupportedFeatures gets the features supported by the cluster for this client.

func (*BrokerAdminClient) GetTopic

func (c *BrokerAdminClient) GetTopic(
	ctx context.Context,
	name string,
	detailed bool,
) (TopicInfo, error)

GetTopic gets the details of a single topic in the cluster.

func (*BrokerAdminClient) GetTopicNames

func (c *BrokerAdminClient) GetTopicNames(ctx context.Context) ([]string, error)

GetTopicNames gets just the names of each topic in the cluster.

func (*BrokerAdminClient) GetTopics

func (c *BrokerAdminClient) GetTopics(
	ctx context.Context,
	names []string,
	detailed bool,
) ([]TopicInfo, error)

GetTopics gets full information about each topic in the cluster.

func (*BrokerAdminClient) LockHeld

func (c *BrokerAdminClient) LockHeld(ctx context.Context, path string) (bool, error)

LockHeld returns whether a lock is currently held for the given path. NOTE: Not implemented for broker-based clients.

func (*BrokerAdminClient) RunLeaderElection

func (c *BrokerAdminClient) RunLeaderElection(
	ctx context.Context,
	topic string,
	partitions []int,
) error

RunLeaderElection triggers a leader election for one or more partitions in a topic.

func (*BrokerAdminClient) UpdateBrokerConfig

func (c *BrokerAdminClient) UpdateBrokerConfig(
	ctx context.Context,
	id int,
	configEntries []kafka.ConfigEntry,
	overwrite bool,
) ([]string, error)

UpdateBrokerConfig updates the configuration for the argument broker. It returns the config keys that were updated.

func (*BrokerAdminClient) UpdateTopicConfig

func (c *BrokerAdminClient) UpdateTopicConfig(
	ctx context.Context,
	name string,
	configEntries []kafka.ConfigEntry,
	overwrite bool,
) ([]string, error)

UpdateTopicConfig updates the configuration for the argument topic. It returns the config keys that were updated.

type BrokerAdminClientConfig

type BrokerAdminClientConfig struct {
	ConnectorConfig
	ReadOnly          bool
	ExpectedClusterID string
}

BrokerAdminClientConfig contains the configuration settings to construct a BrokerAdminClient instance.

type BrokerInfo

type BrokerInfo struct {
	ID               int               `json:"id"`
	Endpoints        []string          `json:"endpoints"`
	Host             string            `json:"host"`
	Port             int32             `json:"port"`
	InstanceID       string            `json:"instanceID"`
	AvailabilityZone string            `json:"availabilityZone"`
	Rack             string            `json:"rack"`
	InstanceType     string            `json:"instanceType"`
	Version          int               `json:"version"`
	Timestamp        time.Time         `json:"timestamp"`
	Config           map[string]string `json:"config"`
}

BrokerInfo represents the information stored about a broker in zookeeper.

func (BrokerInfo) Addr

func (b BrokerInfo) Addr() string

Addr returns the address of the current BrokerInfo.

func (BrokerInfo) IsThrottled

func (b BrokerInfo) IsThrottled() bool

IsThrottled determines whether the broker has any throttles in its config.

type BrokerThrottle

type BrokerThrottle struct {
	Broker        int
	ThrottleBytes int64
}

BrokerThrottle represents a throttle being applied to a single broker.

func BrokerThrottles

func BrokerThrottles(
	leaderThrottles []PartitionThrottle,
	followerThrottles []PartitionThrottle,
	throttleBytes int64,
) []BrokerThrottle

BrokerThrottles returns a slice of BrokerThrottles that we should apply. It's currently just set from the union of the leader and follower brokers (matching the behavior of bin/kafka-reassign-partitions.sh).

func (BrokerThrottle) ConfigEntries

func (b BrokerThrottle) ConfigEntries() []kafka.ConfigEntry

ConfigEntries returns the kafka config entries associated with this broker throttle.

type Client

type Client interface {
	// GetClusterID gets the ID of the cluster.
	GetClusterID(ctx context.Context) (string, error)

	// GetBrokers gets information about all brokers in the cluster.
	GetBrokers(ctx context.Context, ids []int) ([]BrokerInfo, error)

	// GetBrokerIDs gets the IDs of all brokers in the cluster.
	GetBrokerIDs(ctx context.Context) ([]int, error)

	// GetConnector gets the Connector instance for this cluster.
	GetConnector() *Connector

	// GetTopics gets full information about each topic in the cluster.
	GetTopics(
		ctx context.Context,
		names []string,
		detailed bool,
	) ([]TopicInfo, error)

	// GetTopicNames gets just the names of each topic in the cluster.
	GetTopicNames(ctx context.Context) ([]string, error)

	// GetTopic gets the details of a single topic in the cluster.
	GetTopic(
		ctx context.Context,
		name string,
		detailed bool,
	) (TopicInfo, error)

	// UpdateTopicConfig updates the configuration for the argument topic. It returns the config
	// keys that were updated.
	UpdateTopicConfig(
		ctx context.Context,
		name string,
		configEntries []kafka.ConfigEntry,
		overwrite bool,
	) ([]string, error)

	// UpdateBrokerConfig updates the configuration for the argument broker. It returns the config
	// keys that were updated.
	UpdateBrokerConfig(
		ctx context.Context,
		id int,
		configEntries []kafka.ConfigEntry,
		overwrite bool,
	) ([]string, error)

	// CreateTopic creates a topic in the cluster.
	CreateTopic(
		ctx context.Context,
		config kafka.TopicConfig,
	) error

	// AssignPartitions sets the replica broker IDs for one or more partitions in a topic.
	AssignPartitions(
		ctx context.Context,
		topic string,
		assignments []PartitionAssignment,
	) error

	// AddPartitions extends a topic by adding one or more new partitions to it.
	AddPartitions(
		ctx context.Context,
		topic string,
		newAssignments []PartitionAssignment,
	) error

	// RunLeaderElection triggers a leader election for one or more partitions in a topic.
	RunLeaderElection(
		ctx context.Context,
		topic string,
		partitions []int,
	) error

	// AcquireLock acquires a lock that can be used to prevent simultaneous changes to a topic.
	AcquireLock(ctx context.Context, path string) (zk.Lock, error)

	// LockHeld returns whether a lock is currently held for the given path.
	LockHeld(ctx context.Context, path string) (bool, error)

	// GetSupportedFeatures gets the features supported by the cluster for this client.
	GetSupportedFeatures() SupportedFeatures

	// Close closes the client.
	Close() error
}

Client is an interface for interacting with a cluster for administrative tasks.

type Connector

type Connector struct {
	Config      ConnectorConfig
	Dialer      *kafka.Dialer
	KafkaClient *kafka.Client
}

Connector is a wrapper around the low-level, kafka-go dialer and client.

func NewConnector

func NewConnector(config ConnectorConfig) (*Connector, error)

NewConnector constructs a new Connector instance given the argument config.

type ConnectorConfig

type ConnectorConfig struct {
	BrokerAddr string
	TLS        TLSConfig
	SASL       SASLConfig
}

ConnectorConfig contains the configuration used to construct a connector.

type PartitionAssignment

type PartitionAssignment struct {
	ID       int   `json:"id"`
	Replicas []int `json:"replicas"`
}

PartitionAssignment contains the actual or desired assignment of replicas in a topic partition.

func AssignmentsToUpdate

func AssignmentsToUpdate(
	current []PartitionAssignment,
	desired []PartitionAssignment,
) []PartitionAssignment

AssignmentsToUpdate returns the subset of assignments that need to be updated given the current and desired states.

func CopyAssignments

func CopyAssignments(
	curr []PartitionAssignment,
) []PartitionAssignment

CopyAssignments returns a deep copy of the argument PartitionAssignment slice.

func ReplicasToAssignments

func ReplicasToAssignments(
	replicaSlices [][]int,
) []PartitionAssignment

ReplicasToAssignments converts a slice of slices to a slice of PartitionAssignments, assuming that the argument slices are in partition order. Used for unit tests.

func (PartitionAssignment) Copy

Copy returns a deep copy of this PartitionAssignment.

func (PartitionAssignment) DistinctRacks

func (a PartitionAssignment) DistinctRacks(
	brokerRacks map[int]string,
) map[string]struct{}

DistinctRacks returns a map of the distinct racks in this PartitionAssignment.

func (PartitionAssignment) Index

func (a PartitionAssignment) Index(replica int) int

Index returns the index of the argument replica, or -1 if it can't be found.

type PartitionInfo

type PartitionInfo struct {
	Topic           string `json:"topic"`
	ID              int    `json:"ID"`
	Leader          int    `json:"leader"`
	Version         int    `json:"version"`
	Replicas        []int  `json:"replicas"`
	ISR             []int  `json:"isr"`
	ControllerEpoch int    `json:"controllerEpoch"`
	LeaderEpoch     int    `json:"leaderEpoch"`
}

PartitionInfo represents the information stored about a topic partition in zookeeper.

func (PartitionInfo) NumRacks

func (p PartitionInfo) NumRacks(brokerRacks map[int]string) (int, error)

NumRacks returns the number of distinct racks in the partition.

func (PartitionInfo) Racks

func (p PartitionInfo) Racks(brokerRacks map[int]string) ([]string, error)

Racks returns a slice of all racks for the partition replicas.

type PartitionThrottle

type PartitionThrottle struct {
	Partition int
	Broker    int
}

PartitionThrottle represents a throttle being applied to a single partition, broker combination.

func FollowerPartitionThrottles

func FollowerPartitionThrottles(
	curr []PartitionAssignment,
	desired []PartitionAssignment,
) []PartitionThrottle

FollowerPartitionThrottles returns a slice of PartitionThrottles that we should apply on the follower side.

See https://kafka.apache.org/0101/documentation.html for discussion on how these should be applied.

func LeaderPartitionThrottles

func LeaderPartitionThrottles(
	curr []PartitionAssignment,
	desired []PartitionAssignment,
) []PartitionThrottle

LeaderPartitionThrottles returns a slice of PartitionThrottles that we should apply on the leader side.

See https://kafka.apache.org/0101/documentation.html for discussion on how these should be applied.

func ParsePartitionThrottleStr

func ParsePartitionThrottleStr(valuesStr string) ([]PartitionThrottle, error)

ParsePartitionThrottleStr converts a throttle config string from zk into a slice of PartitionThrottle structs.

func (PartitionThrottle) String

func (p PartitionThrottle) String() string

type SASLConfig

type SASLConfig struct {
	Enabled   bool
	Mechanism SASLMechanism
	Username  string
	Password  string
}

SASLConfig stores the SASL-related configuration for a connection.

type SASLMechanism

type SASLMechanism string

SASLMechanism is the name of a SASL mechanism that will be used for client authentication.

const (
	SASLMechanismAWSMSKIAM   SASLMechanism = "aws-msk-iam"
	SASLMechanismPlain       SASLMechanism = "plain"
	SASLMechanismScramSHA256 SASLMechanism = "scram-sha-256"
	SASLMechanismScramSHA512 SASLMechanism = "scram-sha-512"
)

func SASLNameToMechanism

func SASLNameToMechanism(name string) (SASLMechanism, error)

SASLNameToMechanism converts the argument SASL mechanism name string to a valid instance of the SASLMechanism enum.

type SupportedFeatures

type SupportedFeatures struct {
	// Reads indicates whether the client supports reading basic cluster information
	// (metadata, configs, etc.).
	Reads bool

	// Applies indicates whether the client supports the functionality required for applying
	// (e.g., changing configs, electing leaders, etc.).
	Applies bool

	// Locks indicates whether the client supports locking.
	Locks bool

	// DynamicBrokerConfigs indicates whether the client can return dynamic broker configs
	// like leader.replication.throttled.rate.
	DynamicBrokerConfigs bool
}

SupportedFeatures provides a summary of what an admin client supports.

type TLSConfig

type TLSConfig struct {
	Enabled    bool
	CertPath   string
	KeyPath    string
	CACertPath string
	ServerName string
	SkipVerify bool
}

TLSConfig stores the TLS-related configuration for a connection.

type TopicInfo

type TopicInfo struct {
	Name       string            `json:"name"`
	Config     map[string]string `json:"config"`
	Partitions []PartitionInfo   `json:"partitions"`
	Version    int               `json:"version"`
}

TopicInfo represents the information stored about a topic in zookeeper.

func (TopicInfo) AllLeadersCorrect

func (t TopicInfo) AllLeadersCorrect() bool

AllLeadersCorrect returns whether leader == replicas[0] for all partitions.

func (TopicInfo) AllReplicasInSync

func (t TopicInfo) AllReplicasInSync() bool

AllReplicasInSync returns whether all partitions have ISR == replicas (ignoring order).

func (TopicInfo) IsThrottled

func (t TopicInfo) IsThrottled() bool

IsThrottled determines whether the topic has any throttles in its config.

func (TopicInfo) MaxISR

func (t TopicInfo) MaxISR() int

MaxISR returns the maximum number of in-sync replicas across all partitions in a topic.

func (TopicInfo) MaxReplication

func (t TopicInfo) MaxReplication() int

MaxReplication returns the maximum number of replicas across all partitions in a topic.

func (TopicInfo) OutOfSyncPartitions

func (t TopicInfo) OutOfSyncPartitions(subset []int) []PartitionInfo

OutOfSyncPartitions returns the partitions for which ISR != replicas (ignoring order).

func (TopicInfo) PartitionIDs

func (t TopicInfo) PartitionIDs() []int

PartitionIDs returns an ordered slice of partition IDs for a topic.

func (TopicInfo) RackCounts

func (t TopicInfo) RackCounts(brokerRacks map[int]string) (int, int, error)

RackCounts returns the minimum and maximum distinct rack counts across all partitions in a topic.

func (TopicInfo) Retention

func (t TopicInfo) Retention() time.Duration

Retention returns the retention duration implied by a topic config. If unset, it returns 0.

func (TopicInfo) ToAssignments

func (t TopicInfo) ToAssignments() []PartitionAssignment

ToAssignments converts a topic to a slice of partition assignments.

func (TopicInfo) WrongLeaderPartitions

func (t TopicInfo) WrongLeaderPartitions(subset []int) []PartitionInfo

WrongLeaderPartitions returns the partitions where leader != replicas[0].

type ZKAdminClient

type ZKAdminClient struct {
	Connector *Connector
	// contains filtered or unexported fields
}

ZKAdminClient is a general client for interacting with a kafka cluster that assumes zookeeper access. Most interactions are done via zookeeper, but a few (e.g., creating topics or getting the controller address) are done via the broker API instead.

func NewZKAdminClient

func NewZKAdminClient(
	ctx context.Context,
	config ZKAdminClientConfig,
) (*ZKAdminClient, error)

NewZKAdminClient creates and returns a new Client instance.

func (*ZKAdminClient) AcquireLock

func (c *ZKAdminClient) AcquireLock(
	ctx context.Context,
	path string,
) (zk.Lock, error)

AcquireLock acquires and returns a lock from the underlying zookeeper client. The Unlock method should be called on the lock when it's safe to release.

func (*ZKAdminClient) AddPartitions

func (c *ZKAdminClient) AddPartitions(
	ctx context.Context,
	topic string,
	newAssignments []PartitionAssignment,
) error

AddPartitions adds one or more partitions to an existing topic. Unlike AssignPartitions, this directly updates the topic's partition config in zookeeper.

func (*ZKAdminClient) AssignPartitions

func (c *ZKAdminClient) AssignPartitions(
	ctx context.Context,
	topic string,
	assignments []PartitionAssignment,
) error

AssignPartitions notifies the cluster to begin a partition reassignment. This should only be used for existing partitions; to create new partitions, use the AddPartitions method.

func (*ZKAdminClient) Close

func (c *ZKAdminClient) Close() error

Close closes the connections in the underlying zookeeper client.

func (*ZKAdminClient) CreateTopic

func (c *ZKAdminClient) CreateTopic(
	ctx context.Context,
	config kafka.TopicConfig,
) error

CreateTopic creates a new topic with the argument config. It uses the topic creation API exposed on the controller broker.

func (*ZKAdminClient) GetBrokerIDs

func (c *ZKAdminClient) GetBrokerIDs(ctx context.Context) ([]int, error)

GetBrokerIDs returns a slice of all broker IDs.

func (*ZKAdminClient) GetBrokers

func (c *ZKAdminClient) GetBrokers(
	ctx context.Context,
	ids []int,
) ([]BrokerInfo, error)

GetBrokers gets information on one or more cluster brokers from zookeeper. If the argument ids is unset, then it fetches all brokers.

func (*ZKAdminClient) GetClusterID

func (c *ZKAdminClient) GetClusterID(
	ctx context.Context,
) (string, error)

GetClusterID gets the cluster ID from zookeeper. This ID is generated when the cluster is created and should be stable over the life of the cluster.

func (*ZKAdminClient) GetConnector

func (c *ZKAdminClient) GetConnector() *Connector

GetConnector returns the Connector instance associated with this client.

func (*ZKAdminClient) GetSupportedFeatures

func (c *ZKAdminClient) GetSupportedFeatures() SupportedFeatures

GetSupportedFeatures returns the features that are supported by this client.

func (*ZKAdminClient) GetTopic

func (c *ZKAdminClient) GetTopic(
	ctx context.Context,
	name string,
	detailed bool,
) (TopicInfo, error)

GetTopic is a wrapper around GetTopics(...) for getting information about a single topic.

func (*ZKAdminClient) GetTopicNames

func (c *ZKAdminClient) GetTopicNames(ctx context.Context) ([]string, error)

GetTopicNames gets all topic names from zookeeper.

func (*ZKAdminClient) GetTopics

func (c *ZKAdminClient) GetTopics(
	ctx context.Context,
	names []string,
	detailed bool,
) ([]TopicInfo, error)

GetTopics gets information about one or more cluster topics from zookeeper. If the argument names is unset, then it fetches all topics. The detailed parameter determines whether the ISRs and leaders are fetched for each partition.

func (*ZKAdminClient) LockHeld

func (c *ZKAdminClient) LockHeld(
	ctx context.Context,
	path string,
) (bool, error)

LockHeld determines whether the lock with the provided path is held (i.e., has children).

func (*ZKAdminClient) RunLeaderElection

func (c *ZKAdminClient) RunLeaderElection(
	ctx context.Context,
	topic string,
	partitions []int,
) error

RunLeaderElection triggers a leader election for the argument topic and partitions.

func (*ZKAdminClient) UpdateBrokerConfig

func (c *ZKAdminClient) UpdateBrokerConfig(
	ctx context.Context,
	id int,
	configEntries []kafka.ConfigEntry,
	overwrite bool,
) ([]string, error)

UpdateBrokerConfig updates the config JSON for a cluster broker and sets a change notification so the cluster brokers are notified. If overwrite is true, then it will overwrite existing config entries.

The function returns the list of keys that were modified. If overwrite is set to false, this can be used to determine the subset of entries that were already set.

func (*ZKAdminClient) UpdateTopicConfig

func (c *ZKAdminClient) UpdateTopicConfig(
	ctx context.Context,
	name string,
	configEntries []kafka.ConfigEntry,
	overwrite bool,
) ([]string, error)

UpdateTopicConfig updates the config JSON for a topic and sets a change notification so that the brokers are notified. If overwrite is true, then it will overwrite existing config entries.

The function returns the list of keys that were modified. If overwrite is set to false, this can be used to determine the subset of entries that were already set.

type ZKAdminClientConfig

type ZKAdminClientConfig struct {
	ZKAddrs           []string
	ZKPrefix          string
	BootstrapAddrs    []string
	ExpectedClusterID string
	Sess              *session.Session
	ReadOnly          bool
}

ZKAdminClientConfig contains all of the parameters necessary to create a kafka admin client.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL