kafka

package
v1.9.1
Published: Apr 24, 2024 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AsyncProducer added in v0.29.0

type AsyncProducer struct {
	sarama.AsyncProducer
	// contains filtered or unexported fields
}

AsyncProducer is a drop-in replacement for the sarama AsyncProducer that adds Prometheus metrics on its performance.

func NewAsyncProducerFromClient added in v0.29.0

func NewAsyncProducerFromClient(client sarama.Client, metrics ProducerMetrics) (AsyncProducer, error)

NewAsyncProducerFromClient creates a new AsyncProducer from a sarama Client that pushes metrics to the provided ProducerMetrics. Note that metrics are only collected if the producer is configured to return successes and errors.

func (AsyncProducer) AsyncClose added in v0.29.0

func (ap AsyncProducer) AsyncClose()

AsyncClose triggers a shutdown of the producer. The producer is fully shut down once the input, errors, and successes channels are closed.

func (AsyncProducer) Close added in v0.29.0

func (ap AsyncProducer) Close() error

Close synchronously shuts down the producer and waits for any buffered messages to be flushed before returning.

func (AsyncProducer) Errors added in v0.29.0

func (ap AsyncProducer) Errors() <-chan *sarama.ProducerError

Errors returns the output channel where errored messages will be returned if ProducerReturnErrors was true when configuring the client.

func (AsyncProducer) Successes added in v0.29.0

func (ap AsyncProducer) Successes() <-chan *sarama.ProducerMessage

Successes returns the output channel where successfully written messages will be returned if ProducerReturnSuccesses was true when configuring the client.

type Config

type Config struct {
	sarama.Config
	// Prometheus registerer for metrics
	Registerer   prometheus.Registerer
	TLSCaCrtPath string
	TLSCrtPath   string
	TLSKeyPath   string
	// version to be parsed and loaded into sarama.Config.KafkaVersion
	KafkaVersion string
	// value to be parsed to sarama.CompressionCodec and loaded into sarama.Config.Producer.CompressionCodec
	ProducerCompressionCodec string
	BrokerAddrs              []string
	// Frequency with which to collect metrics
	MetricsFrequency time.Duration
	// value to be cast to sarama.RequiredAcks and loaded into sarama.Config.Producer.RequiredAcks
	ProducerRequiredAcks int16
	Verbose              bool
}

Config contains all configuration for Kafka message consumption and production

func NewDefaultConfig added in v0.29.0

func NewDefaultConfig() Config

NewDefaultConfig creates a new default Kafka configuration

func (*Config) NewClient

func (c *Config) NewClient(ctx context.Context) (sarama.Client, error)

NewClient creates a new Sarama Client from the tools configuration. Using this version of NewClient enables setting Sarama configuration from the CLI and environment variables. This method also has the side effect of starting a periodic task that collects Prometheus metrics from the Sarama internal metrics registry; calling the cancel function associated with the provided context stops that collection. Note that this method overrides Producer.Return.Successes, Producer.Return.Errors, and Consumer.Return.Errors, setting all three to true, since those options are required for metrics collection and tracing. Code that uses the client created by this method must handle those cases appropriately.

func (*Config) RegisterAdminFlags added in v0.29.0

func (c *Config) RegisterAdminFlags(flags *pflag.FlagSet)

RegisterAdminFlags registers options for the Kafka admin API.

func (*Config) RegisterBaseFlags added in v0.29.0

func (c *Config) RegisterBaseFlags(flags *pflag.FlagSet)

RegisterBaseFlags registers basic Kafka configuration. If using Kafka, these flags should always be registered.

func (*Config) RegisterConsumerFlags added in v0.29.0

func (c *Config) RegisterConsumerFlags(flags *pflag.FlagSet)

RegisterConsumerFlags registers configuration options for Kafka consumers.

func (*Config) RegisterConsumerGroupFlags added in v0.29.0

func (c *Config) RegisterConsumerGroupFlags(flags *pflag.FlagSet)

RegisterConsumerGroupFlags registers options for Kafka consumer group configuration.

func (*Config) RegisterMetadataFlags added in v0.29.0

func (c *Config) RegisterMetadataFlags(flags *pflag.FlagSet)

RegisterMetadataFlags registers configuration for fetching metadata from the Kafka broker.

func (*Config) RegisterNetFlags added in v0.29.0

func (c *Config) RegisterNetFlags(flags *pflag.FlagSet)

RegisterNetFlags registers configuration for connection to the Kafka broker including TLS configuration.

func (*Config) RegisterProducerFlags added in v0.29.0

func (c *Config) RegisterProducerFlags(flags *pflag.FlagSet)

RegisterProducerFlags registers configuration options for Kafka producers.

type ConnectAvroUnmarshaller added in v0.29.0

type ConnectAvroUnmarshaller struct {
	SchemaRegistryClient
}

ConnectAvroUnmarshaller is a helper for unmarshalling Kafka Avro messages, taking into account the quirks of the Kafka Connect producer's format

func (ConnectAvroUnmarshaller) Unmarshal added in v0.29.0

func (u ConnectAvroUnmarshaller) Unmarshal(ctx context.Context, msg *sarama.ConsumerMessage, target interface{}) []error

Unmarshal takes the contents of the ConsumerMessage and unmarshals it into the target using avro decoding, returning any and all errors that occur during unmarshalling

type ConnectJSONUnmarshaller added in v0.29.0

type ConnectJSONUnmarshaller struct{}

ConnectJSONUnmarshaller is a helper for unmarshalling Kafka JSON messages, taking into account the quirks of the Kafka Connect producer's format

func (ConnectJSONUnmarshaller) Unmarshal added in v0.29.0

func (u ConnectJSONUnmarshaller) Unmarshal(_ context.Context, msg *sarama.ConsumerMessage, target interface{}) []error

Unmarshal takes the contents of the ConsumerMessage and unmarshals it into the target using JSON decoding, returning any and all errors that occur during unmarshalling

type Consumer

type Consumer struct {
	sarama.Consumer
	// contains filtered or unexported fields
}

Consumer is a drop-in replacement for the sarama consumer that adds Prometheus metrics on the number of messages read and errors received. This consumer implementation creates the drop-in PartitionConsumer from this package.

func NewConsumerFromClient added in v0.29.0

func NewConsumerFromClient(client sarama.Client, metrics ConsumerMetrics) (Consumer, error)

NewConsumerFromClient creates a new Consumer from a sarama Client with the given set of metrics. Note that error metrics can only be collected if the consumer is configured to return errors.

func (Consumer) ConsumePartition added in v0.29.0

func (c Consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error)

ConsumePartition creates a wrapped PartitionConsumer on the given topic and partition starting at the given offset.

type ConsumerMetrics added in v0.10.0

type ConsumerMetrics struct {
	// contains filtered or unexported fields
}

ConsumerMetrics is a collection of Prometheus metrics for tracking a Kafka consumer's performance

func NewConsumerMetrics added in v0.29.0

func NewConsumerMetrics(registerer prometheus.Registerer) (ConsumerMetrics, error)

NewConsumerMetrics creates and registers metrics for the Kafka Consumer with the provided prometheus registerer

type MessageUnmarshaller added in v0.29.0

type MessageUnmarshaller interface {
	// Unmarshal takes the contents of the ConsumerMessage and unmarshals it into the target, returning
	// any and all errors that occur during unmarshalling
	Unmarshal(ctx context.Context, msg *sarama.ConsumerMessage, target interface{}) []error
}

MessageUnmarshaller is a helper interface for unmarshalling Kafka messages into Go types.

type MockSchemaRegistryClient added in v0.55.1

type MockSchemaRegistryClient struct {
	mock.Mock
}

MockSchemaRegistryClient is a mock implementation of the interfaces satisfied by the schema registry client.

func (*MockSchemaRegistryClient) CheckSchema added in v0.55.1

func (mc *MockSchemaRegistryClient) CheckSchema(ctx context.Context, subject string, schema string, isKey bool) (*SchemaResponse, error)

CheckSchema implements the SchemaRegistryConsumer interface.

func (*MockSchemaRegistryClient) CreateSchema added in v0.55.1

func (mc *MockSchemaRegistryClient) CreateSchema(ctx context.Context, subject string, schema string, isKey bool) (*SchemaResponse, error)

CreateSchema implements the SchemaRegistryProducer interface.

func (*MockSchemaRegistryClient) DecodeKafkaAvroMessage added in v0.55.1

func (mc *MockSchemaRegistryClient) DecodeKafkaAvroMessage(ctx context.Context, message *sarama.ConsumerMessage) (interface{}, error)

DecodeKafkaAvroMessage implements the SchemaRegistryConsumer interface.

func (*MockSchemaRegistryClient) EncodeKafkaAvroMessage added in v0.55.1

func (mc *MockSchemaRegistryClient) EncodeKafkaAvroMessage(ctx context.Context, schemaID uint, message interface{}) ([]byte, error)

EncodeKafkaAvroMessage implements the SchemaRegistryProducer interface.

func (*MockSchemaRegistryClient) GetSchema added in v0.55.1

func (mc *MockSchemaRegistryClient) GetSchema(ctx context.Context, id uint) (string, error)

GetSchema implements the SchemaRegistryConsumer interface.

type PartitionConsumer added in v0.29.0

type PartitionConsumer struct {
	sarama.PartitionConsumer
	// contains filtered or unexported fields
}

PartitionConsumer is a drop-in replacement for the sarama partition consumer.

func (PartitionConsumer) AsyncClose added in v0.29.0

func (pc PartitionConsumer) AsyncClose()

AsyncClose initiates a shutdown of the PartitionConsumer. This method returns immediately. Once the consumer is shut down, the message and error channels are closed.

func (PartitionConsumer) Close added in v0.29.0

func (pc PartitionConsumer) Close() error

Close synchronously shuts down the partition consumer and returns any outstanding errors.

func (PartitionConsumer) Errors added in v0.29.0

func (pc PartitionConsumer) Errors() <-chan *sarama.ConsumerError

Errors returns the read channel of errors that occurred during consumption if ConsumerReturnErrors was true when configuring the client.

func (PartitionConsumer) Messages added in v0.29.0

func (pc PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage

Messages returns the read channel for the messages returned by the broker

type ProducerMetrics added in v0.10.0

type ProducerMetrics struct {
	// contains filtered or unexported fields
}

ProducerMetrics is a collection of Prometheus metrics for tracking a Kafka producer's performance

func NewProducerMetrics added in v0.29.0

func NewProducerMetrics(registerer prometheus.Registerer) (ProducerMetrics, error)

NewProducerMetrics creates and registers metrics for the Kafka Producer with the provided prometheus registerer.

type SchemaRegistryClient added in v0.29.0

type SchemaRegistryClient struct {
	SchemaRegistryConfig
	// contains filtered or unexported fields
}

SchemaRegistryClient provides functionality for interacting with Kafka schema registry. This type has methods for getting schemas from the registry and decoding sarama ConsumerMessages from Avro into Go types. In addition, since the schema registry is immutable, the client contains a cache of schemas so that a network request to the registry does not have to be made for every Kafka message that needs to be decoded.

func (*SchemaRegistryClient) CheckSchema added in v0.46.0

func (c *SchemaRegistryClient) CheckSchema(ctx context.Context, subject string, schema string, isKey bool) (*SchemaResponse, error)

CheckSchema will check if the schema exists for the given subject

func (*SchemaRegistryClient) CreateSchema added in v0.46.0

func (c *SchemaRegistryClient) CreateSchema(ctx context.Context, subject string, schema string, isKey bool) (*SchemaResponse, error)

CreateSchema creates a new schema in Schema Registry. The Schema Registry compares this against existing known schemas. If this schema matches an existing schema, a new schema will not be created and instead the existing ID will be returned. This applies even if the schema is assigned only to another subject.

func (*SchemaRegistryClient) DecodeKafkaAvroMessage added in v0.29.0

func (c *SchemaRegistryClient) DecodeKafkaAvroMessage(ctx context.Context, message *sarama.ConsumerMessage) (interface{}, error)

DecodeKafkaAvroMessage decodes the given Kafka message encoded with Avro into a Go type.

func (*SchemaRegistryClient) EncodeKafkaAvroMessage added in v0.48.0

func (c *SchemaRegistryClient) EncodeKafkaAvroMessage(ctx context.Context, schemaID uint, message interface{}) ([]byte, error)

EncodeKafkaAvroMessage encodes the given message into Avro-encoded bytes using the schema with the given ID.

func (*SchemaRegistryClient) GetCodec added in v0.46.0

func (c *SchemaRegistryClient) GetCodec(ctx context.Context, id uint) (*goavro.Codec, error)

GetCodec returns an avro codec based on the provided schema id

func (*SchemaRegistryClient) GetSchema added in v0.29.0

func (c *SchemaRegistryClient) GetSchema(ctx context.Context, id uint) (string, error)

GetSchema retrieves a textual JSON Avro schema from the Kafka schema registry

type SchemaRegistryConfig

type SchemaRegistryConfig struct {
	URL string
}

SchemaRegistryConfig defines the necessary configuration for interacting with Kafka Schema Registry

func (SchemaRegistryConfig) NewSchemaRegistryClient added in v0.29.0

func (c SchemaRegistryConfig) NewSchemaRegistryClient(httpMetrics shHTTP.Metrics) *SchemaRegistryClient

NewSchemaRegistryClient creates a schema registry client with the given HTTP metrics bundle.

func (*SchemaRegistryConfig) RegisterFlags

func (c *SchemaRegistryConfig) RegisterFlags(flags *pflag.FlagSet)

RegisterFlags registers schema registry flags with pflags

type SchemaRegistryConsumer added in v0.55.1

type SchemaRegistryConsumer interface {
	GetSchema(ctx context.Context, id uint) (string, error)
	CheckSchema(ctx context.Context, subject string, schema string, isKey bool) (*SchemaResponse, error)
	DecodeKafkaAvroMessage(ctx context.Context, message *sarama.ConsumerMessage) (interface{}, error)
}

SchemaRegistryConsumer defines an interface that contains methods to retrieve schemas and decode Kafka messages using the retrieved schemas.

type SchemaRegistryProducer added in v0.55.1

type SchemaRegistryProducer interface {
	CreateSchema(ctx context.Context, subject string, schema string, isKey bool) (*SchemaResponse, error)
	EncodeKafkaAvroMessage(ctx context.Context, schemaID uint, message interface{}) ([]byte, error)
}

SchemaRegistryProducer defines an interface that contains methods to create schemas and encode Kafka messages using the uploaded schemas.

type SchemaResponse added in v1.3.4

type SchemaResponse struct {
	Subject string `json:"subject"`
	Schema  string `json:"schema"`
	Version int    `json:"version"`
	ID      int    `json:"id"`
}
