kafka

package
v1.2.0
Published: Mar 28, 2021 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

const (
	SASLMechanismPlain       = "PLAIN"
	SASLMechanismScramSHA256 = "SCRAM-SHA-256"
	SASLMechanismScramSHA512 = "SCRAM-SHA-512"
	SASLMechanismGSSAPI      = "GSSAPI"
	SASLMechanismOAuthBearer = "OAUTHBEARER"
)
const (
	TimestampLatest   = -1
	TimestampEarliest = -2
)

Variables

This section is empty.

Functions

func NewKgoConfig

func NewKgoConfig(cfg *Config, logger *zap.Logger) ([]kgo.Opt, error)

NewKgoConfig creates a new client configuration ([]kgo.Opt) for the Kafka client as exposed by the franz-go library. If the TLS certificates can't be read, an error will be returned.
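
A minimal sketch of turning a Config into a running franz-go client; the import path example.com/project/kafka is a placeholder for this module's actual path.

logger, _ := zap.NewProduction()

cfg := kafka.Config{}
cfg.SetDefaults()
cfg.Brokers = []string{"localhost:9092"}

// Translate the declarative config into franz-go client options.
opts, err := kafka.NewKgoConfig(&cfg, logger)
if err != nil {
	log.Fatal(err)
}

// kgo.NewClient accepts the generated options directly.
client, err := kgo.NewClient(opts...)
if err != nil {
	log.Fatal(err)
}
defer client.Close()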

func SerializeJson

func SerializeJson(data interface{}) ([]byte, error)
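SerializeJson carries no doc comment here; judging by its signature it marshals an arbitrary value into JSON bytes. A hedged usage sketch, assuming standard encoding/json semantics:

payload, err := kafka.SerializeJson(map[string]string{"hello": "world"})
if err != nil {
	log.Fatal(err)
}
fmt.Println(string(payload)) // {"hello":"world"}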

Types

type Config

type Config struct {
	// General
	Brokers  []string `yaml:"brokers"`
	ClientID string   `yaml:"clientId"`

	TLS                    TLSConfig  `yaml:"tls"`
	SASL                   SASLConfig `yaml:"sasl"`
	TopicReplicationFactor int16      `yaml:"topicReplicationFactor"`
}

func (*Config) RegisterFlags

func (c *Config) RegisterFlags(f *flag.FlagSet)

RegisterFlags for all sensitive Kafka SASL configs.

func (*Config) SetDefaults

func (c *Config) SetDefaults()

SetDefaults for Kafka config

func (*Config) Validate

func (c *Config) Validate() error
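
A sketch of the typical config lifecycle (defaults, flag registration, validation); the flag names registered by RegisterFlags are defined by the implementation and not shown here.

func loadConfig(args []string) (*kafka.Config, error) {
	cfg := &kafka.Config{}
	cfg.SetDefaults()

	fs := flag.NewFlagSet("kafka", flag.ContinueOnError)
	cfg.RegisterFlags(fs) // registers sensitive values (e.g. SASL credentials) as flags
	if err := fs.Parse(args); err != nil {
		return nil, err
	}

	if err := cfg.Validate(); err != nil {
		return nil, err
	}
	return cfg, nil
}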

type KgoZapLogger

type KgoZapLogger struct {
	// contains filtered or unexported fields
}

func (KgoZapLogger) Level

func (k KgoZapLogger) Level() kgo.LogLevel

Level implements the kgo.Logger interface. It returns the log level to log at. We pin this to debug, as the zap logger decides what to actually send to the output stream.

func (KgoZapLogger) Log

func (k KgoZapLogger) Log(level kgo.LogLevel, msg string, keyvals ...interface{})

Log implements the kgo.Logger interface.

type PartitionMarks

type PartitionMarks struct {
	PartitionID int32
	Low         int64
	High        int64
}

PartitionMarks is a partition ID along with its lowest and highest message offsets.

type SASLConfig

type SASLConfig struct {
	Enabled      bool             `yaml:"enabled"`
	Username     string           `yaml:"username"`
	Password     string           `yaml:"password"`
	Mechanism    string           `yaml:"mechanism"`
	GSSAPIConfig SASLGSSAPIConfig `yaml:"gssapi"`
}

SASLConfig for Kafka client

func (*SASLConfig) RegisterFlags

func (c *SASLConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags for all sensitive Kafka SASL configs.

func (*SASLConfig) SetDefaults

func (c *SASLConfig) SetDefaults()

SetDefaults for SASL Config

func (*SASLConfig) Validate

func (c *SASLConfig) Validate() error

Validate SASL config input
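
A sketch of filling in SASLConfig with one of the mechanism constants declared at the top of this package; field semantics beyond the YAML tags are an assumption.

cfg := kafka.Config{
	Brokers: []string{"broker-1:9093"},
	SASL: kafka.SASLConfig{
		Enabled:   true,
		Mechanism: kafka.SASLMechanismScramSHA256,
		Username:  "svc-user",
		Password:  os.Getenv("KAFKA_SASL_PASSWORD"),
	},
}
if err := cfg.SASL.Validate(); err != nil {
	log.Fatal(err)
}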

type SASLGSSAPIConfig

type SASLGSSAPIConfig struct {
	AuthType           string `yaml:"authType"`
	KeyTabPath         string `yaml:"keyTabPath"`
	KerberosConfigPath string `yaml:"kerberosConfigPath"`
	ServiceName        string `yaml:"serviceName"`
	Username           string `yaml:"username"`
	Password           string `yaml:"password"`
	Realm              string `yaml:"realm"`
}

SASLGSSAPIConfig represents the Kafka Kerberos config

func (*SASLGSSAPIConfig) RegisterFlags

func (c *SASLGSSAPIConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags registers all sensitive Kerberos settings as flags.

type Service

type Service struct {
	Config      Config
	Logger      *zap.Logger
	KafkaClient *kgo.Client
}

Service acts as an interface to interact with the Kafka cluster.

func NewService

func NewService(cfg Config, logger *zap.Logger) (*Service, error)

NewService creates a new Kafka service and immediately checks connectivity to all components. If any of these external dependencies fail, an error will be returned.
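
A minimal sketch of constructing the service; because NewService checks connectivity immediately, a reachable cluster is assumed.

logger, err := zap.NewProduction()
if err != nil {
	log.Fatal(err)
}

cfg := kafka.Config{}
cfg.SetDefaults()
cfg.Brokers = []string{"localhost:9092"}

svc, err := kafka.NewService(cfg, logger)
if err != nil {
	log.Fatal(err)
}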

func (*Service) GetAverageMessageSize

func (s *Service) GetAverageMessageSize(ctx context.Context, topicName string) (int64, error)

GetAverageMessageSize returns the average message size in a given topic in bytes.

func (*Service) GetMetadata

func (s *Service) GetMetadata(ctx context.Context) (*kmsg.MetadataResponse, error)

func (*Service) GetPartitionMarks

func (s *Service) GetPartitionMarks(ctx context.Context, topic string, partitionIDs []int32) (map[int32]PartitionMarks, error)

GetPartitionMarks returns a map of: partitionID -> PartitionMarks
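
A sketch of estimating the per-partition message count from the returned marks; svc is a *Service and ctx a context.Context. The High-Low delta only approximates the count for topics with a delete cleanup policy.

marks, err := svc.GetPartitionMarks(ctx, "orders", []int32{0, 1, 2})
if err != nil {
	return err
}
for partitionID, m := range marks {
	fmt.Printf("partition %d: ~%d messages\n", partitionID, m.High-m.Low)
}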

func (*Service) GetTopicMessageCount

func (s *Service) GetTopicMessageCount(ctx context.Context, topicName string) (int64, error)

GetTopicMessageCount tries to return the number of Kafka messages in the given topic. Depending on the topic's configuration this might be as simple as returning the delta between the low and high watermarks (delete cleanup policy), but it is more complex for compacted topics. For compacted topics the number of messages is estimated by dividing the log dir size by the average message size.

func (*Service) GetTopicMetadata

func (s *Service) GetTopicMetadata(ctx context.Context, topicName string) (*kmsg.MetadataResponseTopic, error)

func (*Service) GetTopicSize

func (s *Service) GetTopicSize(ctx context.Context, topicName string, partitionIDs []int32) (TopicLogDirSize, error)

GetTopicSize returns the topic's log dir size in bytes.
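
A sketch of reading the returned TopicLogDirSize (see the type below); svc and ctx as above.

size, err := svc.GetTopicSize(ctx, "orders", []int32{0, 1, 2})
if err != nil {
	return err
}
fmt.Printf("total: %d B (leaders: %d B, replicas: %d B)\n",
	size.TotalSize, size.LeaderLogDirSize, size.ReplicaLogDirSize)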

func (*Service) ListOffsets

func (s *Service) ListOffsets(ctx context.Context, topicPartitions map[string][]int32, timestamp int64) (TopicPartitionOffsets, error)

ListOffsets returns a nested map of: topic -> partitionID -> offset for all requested partitions, resolved at the given timestamp (e.g. TimestampLatest for the high water mark, TimestampEarliest for the low water mark).
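
A sketch of resolving the newest offsets with the TimestampLatest constant; svc and ctx as above.

offsets, err := svc.ListOffsets(ctx,
	map[string][]int32{"orders": {0, 1, 2}},
	kafka.TimestampLatest,
)
if err != nil {
	return err
}
for topic, partitions := range offsets {
	for partitionID, offset := range partitions {
		fmt.Printf("%s/%d -> offset %d\n", topic, partitionID, offset)
	}
}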

func (*Service) NewKafkaClient

func (s *Service) NewKafkaClient() (*kgo.Client, error)

type TLSConfig

type TLSConfig struct {
	Enabled               bool   `yaml:"enabled"`
	CaFilepath            string `yaml:"caFilepath"`
	CertFilepath          string `yaml:"certFilepath"`
	KeyFilepath           string `yaml:"keyFilepath"`
	Passphrase            string `yaml:"passphrase"`
	InsecureSkipTLSVerify bool   `yaml:"insecureSkipTlsVerify"`
}

TLSConfig to connect to Kafka via TLS

func (*TLSConfig) RegisterFlags

func (c *TLSConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags for all sensitive Kafka TLS configs

type TopicLogDirSize

type TopicLogDirSize struct {
	// TotalSize is the sum of primary log dir size and replica log dir size.
	TotalSize int64

	// LeaderLogDirSize describes the total size of all leaders' replica log dirs.
	LeaderLogDirSize int64

	// ReplicaLogDirSize describes the total size of all replica log dirs.
	ReplicaLogDirSize int64
}

type TopicPartitionOffsets

type TopicPartitionOffsets = map[string]map[int32]int64

TopicPartitionOffsets is a map of topic name -> partitionID -> offset.
