kafkaadmin

package
v4.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 9, 2023 License: Apache-2.0 Imports: 8 Imported by: 0

README

GoDoc

Documentation

Overview

Package kafkaadmin provides Kafka administrative functionality.

Index

Constants

This section is empty.

Variables

View Source
var (

	// SecurityProtocolSet is the set of protocols supported to communicate with brokers
	SecurityProtocolSet = map[string]struct{}{"PLAINTEXT": empty, "SSL": empty, "SASL_PLAINTEXT": empty, "SASL_SSL": empty}
	// SASLMechanismSet is the set of mechanisms supported for client to broker authentication
	SASLMechanismSet = map[string]struct{}{"PLAIN": empty, "SCRAM-SHA-256": empty, "SCRAM-SHA-512": empty}
)
View Source
var (
	// ErrNoData is a generic error for no data available to be returned.
	ErrNoData = fmt.Errorf("no data returned")
)

Functions

func NewConsumer

func NewConsumer(cfg Config) (*kafka.Consumer, error)

Types

type BrokerState added in v4.1.0

type BrokerState struct {
	// Key metadata from the Kafka cluster state.
	Host                       string
	Port                       int
	Rack                       string // broker.rack
	LogMessageFormat           string // log.message.format.version
	InterBrokerProtocolVersion string // inter.broker.protocol.version
	// All metadata.
	FullData map[string]string
}

BrokerState holds metadata that describes a broker.

type BrokerStates added in v4.1.0

type BrokerStates map[int]BrokerState

BrokerStates is a map of broker IDs to BrokerState.

func NewBrokerStates added in v4.1.0

func NewBrokerStates() BrokerStates

NewBrokerStates returns a BrokerStates.

type BrokerThrottleConfig

type BrokerThrottleConfig struct {
	InboundLimitBytes  int
	OutboundLimitBytes int
}

BrokerThrottleConfig defines an inbound and outbound throttle rate in bytes to be applied to a broker.

type Client

type Client struct {
	DefaultTimeoutMs int
	// contains filtered or unexported fields
}

Client implements a KafkaAdmin.

func NewClientWithFactory

func NewClientWithFactory(cfg Config, factory FactoryFunc) (*Client, error)

NewClientWithFactory returns a new admin Client using a factory func for the kafkaAdminClient.

func (Client) Close

func (c Client) Close()

Close closes the Client.

func (Client) CreateTopic

func (c Client) CreateTopic(ctx context.Context, cfg CreateTopicConfig) error

CreateTopic creates a topic.

func (Client) DeleteTopic

func (c Client) DeleteTopic(ctx context.Context, name string) error

DeleteTopic deletes a topic.

func (Client) DescribeBrokers added in v4.1.0

func (c Client) DescribeBrokers(ctx context.Context, fullData bool) (BrokerStates, error)

DescribeBrokers returns a BrokerStates for all live brokers. By default, key metadata is populated for each broker's BrokerState entry. If the fullData bool is set to true, complete metadata will be included in the BrokerState.FullData field. This includes all broker configs found in the cluster state including dynamic configs.

func (Client) DescribeTopics

func (c Client) DescribeTopics(ctx context.Context, topics []string) (TopicStates, error)

DescribeTopics takes a []string of topic names. Topic names can be literal names or, optionally, regular expressions. A TopicStates is returned for all matching topics.

func (Client) GetConfigs added in v4.1.0

func (c Client) GetConfigs(ctx context.Context, kind string, names []string) (ResourceConfigs, error)

GetConfigs takes a kafka resource type (e.g. topic, broker) and list of names and returns a ResourceConfigs for all configurations discovered for each resource by name. Nil configs are excluded.

func (Client) GetDynamicConfigs

func (c Client) GetDynamicConfigs(ctx context.Context, kind string, names []string) (ResourceConfigs, error)

GetDynamicConfigs takes a kafka resource type (e.g. topic, broker) and list of names and returns a ResourceConfigs for all dynamic configurations discovered for each resource by name.

func (Client) ListBrokers added in v4.1.0

func (c Client) ListBrokers(ctx context.Context) ([]int, error)

ListBrokers returns a []int of all live broker IDs.

func (Client) RemoveThrottle

func (c Client) RemoveThrottle(ctx context.Context, cfg RemoveThrottleConfig) error

RemoveThrottle takes a RemoveThrottleConfig that includes an optionally specified list of brokers and topics to remove all throttle configurations from.

func (Client) SetThrottle

func (c Client) SetThrottle(ctx context.Context, cfg SetThrottleConfig) error

SetThrottle takes a SetThrottleConfig and sets the underlying throttle configs accordingly. A throttle is a combination of topic throttled replicas configs and broker inbound/outbound throttle configs.

func (Client) UnderReplicatedTopics added in v4.1.0

func (c Client) UnderReplicatedTopics(ctx context.Context) (TopicStates, error)

UnderReplicatedTopics returns a TopicStates that only includes under-replicated topics.

type Config

type Config struct {
	// Required.
	BootstrapServers string
	// Misc.
	DefaultTimeoutMs int
	GroupId          string
	SSLCALocation    string
	SecurityProtocol string
	SASLMechanism    string
	SASLUsername     string
	SASLPassword     string
}

Config holds Client configuration parameters.

type CreateTopicConfig

type CreateTopicConfig struct {
	Name              string
	Partitions        int
	ReplicationFactor int
	Config            map[string]string
	ReplicaAssignment ReplicaAssignment
}

CreateTopicConfig holds CreateTopic parameters.

type ErrRemoveThrottle

type ErrRemoveThrottle struct{ Message string }

ErrRemoveThrottle is a generic error for RemoveThrottle.

func (ErrRemoveThrottle) Error

func (e ErrRemoveThrottle) Error() string

type ErrSetThrottle

type ErrSetThrottle struct{ Message string }

ErrSetThrottle is a generic error for SetThrottle.

func (ErrSetThrottle) Error

func (e ErrSetThrottle) Error() string

type ErrorFetchingMetadata

type ErrorFetchingMetadata struct{ Message string }

ErrorFetchingMetadata is an error encountered fetching Kafka cluster metadata.

func (ErrorFetchingMetadata) Error

func (e ErrorFetchingMetadata) Error() string

type FactoryFunc

type FactoryFunc func(conf *kafka.ConfigMap) (*kafka.AdminClient, error)

type KafkaAdmin

type KafkaAdmin interface {
	Close()
	// Topics.
	CreateTopic(context.Context, CreateTopicConfig) error
	DeleteTopic(context.Context, string) error
	DescribeTopics(context.Context, []string) (TopicStates, error)
	UnderReplicatedTopics(context.Context) (TopicStates, error)
	// Brokers.
	ListBrokers(context.Context) ([]int, error)
	DescribeBrokers(context.Context, bool) (BrokerStates, error)
	// Cluster.
	SetThrottle(context.Context, SetThrottleConfig) error
	RemoveThrottle(context.Context, RemoveThrottleConfig) error
	GetConfigs(context.Context, string, []string) (ResourceConfigs, error)
	GetDynamicConfigs(context.Context, string, []string) (ResourceConfigs, error)
}

KafkaAdmin is the interface for Kafka administrative operations on topics, brokers, and cluster configuration.

func NewClient

func NewClient(cfg Config) (KafkaAdmin, error)

NewClient returns a KafkaAdmin.

type PartitionState

type PartitionState struct {
	ID       int32
	Leader   int32
	Replicas []int32
	ISR      []int32
}

PartitionState describes the state of a partition.

type RemoveThrottleConfig

type RemoveThrottleConfig struct {
	Topics  []string
	Brokers []int
}

RemoveThrottleConfig holds lists of all topics and brokers to remove throttles from.

type ReplicaAssignment

type ReplicaAssignment [][]int32

ReplicaAssignment is a [][]int32 of partition assignments. The outer slice index maps to the partition ID (e.g. index position 3 describes partition 3 for the reference topic); the inner slice is an []int32 of broker assignments.

type ResourceConfigs

type ResourceConfigs map[string]map[string]string

ResourceConfigs is a map of resource name to a map of configuration name and configuration value. Example: rc["my_topic"]["retention.ms"] = "4000000"

func (ResourceConfigs) AddConfig

func (rc ResourceConfigs) AddConfig(name, key, value string) error

AddConfig takes a resource name and populates the config key to the specified value.

func (ResourceConfigs) AddConfigEntry

func (rc ResourceConfigs) AddConfigEntry(name string, config kafka.ConfigEntryResult) error

AddConfigEntry takes a resource name (e.g. a broker ID or topic name) and a kafka.ConfigEntryResult. It populates ResourceConfigs with the provided ConfigEntryResult for the respective resource by name.

type SetThrottleConfig

type SetThrottleConfig struct {
	// Topics is a list of all topics that require throttled replica configs.
	Topics []string
	// Brokers is a mapping of broker ID to BrokerThrottleConfig.
	Brokers map[int]BrokerThrottleConfig
}

SetThrottleConfig holds SetThrottle configs.

type TopicState

type TopicState struct {
	Name              string
	Partitions        int32
	ReplicationFactor int32
	PartitionStates   map[int]PartitionState
}

TopicState describes the current state of a topic.

func NewTopicState

func NewTopicState(name string) TopicState

NewTopicState initializes a TopicState.

func (TopicState) Brokers added in v4.1.0

func (t TopicState) Brokers() []int

Brokers returns a list of all brokers assigned to any partition in the TopicState.

type TopicStates

type TopicStates map[string]TopicState

TopicStates is a map of topic names to TopicState.

func NewTopicStates

func NewTopicStates() TopicStates

NewTopicStates initializes a TopicStates.

func TopicStatesFromMetadata

func TopicStatesFromMetadata(md *kafka.Metadata) (TopicStates, error)

func (TopicStates) List added in v4.1.0

func (t TopicStates) List() []string

List returns a []string of all topic names in the TopicStates.

func (TopicStates) UnderReplicated added in v4.1.0

func (ts TopicStates) UnderReplicated() TopicStates

UnderReplicated returns a TopicStates that only includes under-replicated topics.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL