Package kafkabp

v0.9.17
Published: Apr 22, 2024 · License: BSD-3-Clause

Overview

Package kafkabp provides Apache Kafka library implementations.

It wraps the Shopify/sarama Go client library with Baseplate.go integrations.

Constants

const (
	OffsetOldest = "oldest"
	OffsetNewest = "newest"
)

Allowed Offset values

const (
	SimpleHTTPRackIDDefaultLimit   = 1024
	SimpleHTTPRackIDDefaultTimeout = time.Second
)

Default values for SimpleHTTPRackIDConfig.

Variables

var (
	// ErrBrokersEmpty is thrown when the slice of brokers is empty.
	ErrBrokersEmpty = errors.New("kafkabp: Brokers are empty")

	// ErrTopicEmpty is thrown when the topic is empty.
	ErrTopicEmpty = errors.New("kafkabp: Topic is empty")

	// ErrClientIDEmpty is thrown when the client ID is empty.
	ErrClientIDEmpty = errors.New("kafkabp: ClientID is empty")

	// ErrOffsetInvalid is thrown when an invalid offset is specified.
	ErrOffsetInvalid = errors.New("kafkabp: Offset is invalid")

	// ErrNilConsumePartitionFunc is thrown when ConsumePartitionFuncProvider
	// returns a nil ConsumePartitionFunc.
	ErrNilConsumePartitionFunc = errors.New("kafkabp: ConsumePartitionFunc is nil")
)

Functions

func AWSAvailabilityZoneRackID

func AWSAvailabilityZoneRackID() string

AWSAvailabilityZoneRackID is a RackIDFunc implementation that returns the AWS availability zone as the rack id.

It also caches the result globally, so if you use AWSAvailabilityZoneRackID more than once in your process, only the first call actually makes the HTTP request. For example:

consumer1 := kafkabp.NewConsumer(kafkabp.ConsumerConfig{
    RackID: kafkabp.AWSAvailabilityZoneRackID,
    Topic:  "topic1",
    // other configs
})
consumer2 := kafkabp.NewConsumer(kafkabp.ConsumerConfig{
    RackID: kafkabp.AWSAvailabilityZoneRackID,
    Topic:  "topic2",
    // other configs
})

Under the hood it uses SimpleHTTPRackIDConfig with log.DefaultWrapper (with a prometheus counter of kafkabp_aws_rack_id_failure_total) and the default Limit & Timeout.

func ConsumeAllPartitionsFunc added in v0.9.12

func ConsumeAllPartitionsFunc(partitionID int32) bool

ConsumeAllPartitionsFunc is a ConsumePartitionFunc that specifies that all partitions are to be consumed by the topic consumer.

This function always returns true, causing all partitions to be consumed.

Types

type ConsumeErrorFunc

type ConsumeErrorFunc func(err error)

ConsumeErrorFunc is a function type for consuming consumer errors.

Note that these are usually system-level consuming errors (e.g. a failed read from the broker), not individual message consuming errors.

In most cases the implementation just needs to log the error and emit a counter, for example:

consumer.Consume(
  consumeMessageFunc,
  func(err error) {
    log.Errorw(
      context.Background(),
      "kafka consumer error",
      "err", err,
      // additional key value pairs, for example topic info
    )
    // a prometheus counter
    consumerErrorCounter.Inc()
  },
)

type ConsumeMessageFunc

type ConsumeMessageFunc func(ctx context.Context, msg *sarama.ConsumerMessage)

ConsumeMessageFunc is a function type for consuming consumer messages.

The implementation is expected to handle all consuming errors. For example, if handling the message fails and it needs to be retried, the ConsumeMessageFunc implementation should handle the retry itself (usually by putting the message into a retry topic).
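
For illustration, a minimal sketch of such an implementation, where handleMessage and retryProducer are hypothetical helpers:

consumeMessage := func(ctx context.Context, msg *sarama.ConsumerMessage) {
    if err := handleMessage(ctx, msg); err != nil {
        // The consumer will not redeliver this message,
        // so the retry has to happen here, e.g. by publishing
        // to a hypothetical retry topic producer.
        retryProducer.Publish(ctx, msg.Value)
    }
}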

type ConsumePartitionFunc added in v0.9.12

type ConsumePartitionFunc func(partitionID int32) bool

ConsumePartitionFunc is a function type for applications to specify which partitions of the topic to consume data from.

func ConsumeAllPartitionsFuncProvider added in v0.9.12

func ConsumeAllPartitionsFuncProvider(numPartitions int) ConsumePartitionFunc

ConsumeAllPartitionsFuncProvider is a ConsumePartitionFuncProvider that always selects all partitions.

type ConsumePartitionFuncProvider added in v0.9.12

type ConsumePartitionFuncProvider func(numPartitions int) ConsumePartitionFunc

ConsumePartitionFuncProvider is a function type for applications to provide a ConsumePartitionFunc pinned to a specific number of partitions. This allows the ConsumePartitionFunc to be created once per reset. All PartitionConsumers, when created, decide whether a partition is to be skipped or consumed based on the decision handed out by the same ConsumePartitionFunc instance.
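
For example, a sketch of a provider that does the heavy lifting up front by precomputing the selected partitions (even-numbered ones here, purely for illustration), so the returned ConsumePartitionFunc is a cheap map lookup:

func evenPartitionsProvider(numPartitions int) kafkabp.ConsumePartitionFunc {
    selected := make(map[int32]bool, (numPartitions+1)/2)
    for i := 0; i < numPartitions; i += 2 {
        selected[int32(i)] = true
    }
    return func(partitionID int32) bool {
        return selected[partitionID]
    }
}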

type Consumer

type Consumer interface {
	io.Closer

	Consume(ConsumeMessageFunc, ConsumeErrorFunc) error

	// IsHealthy returns false after Consume returns.
	IsHealthy(ctx context.Context) bool
}

Consumer defines the interface of a consumer struct.

It's also a superset of (implements) baseplate.HealthChecker.

func NewConsumer

func NewConsumer(cfg ConsumerConfig) (Consumer, error)

NewConsumer creates a new Kafka consumer.

It creates one of two different implementations of Kafka consumer, depending on whether GroupID in the config is empty:

- If GroupID is non-empty, it creates a consumer that is part of a consumer group (sharing the same GroupID). The group will guarantee that every message is delivered to one of the consumers in the group exactly once. This is suitable for the traditional exactly-once message queue consumer use cases.

- If GroupID is empty, it creates a consumer that has the whole view of the topic. This implementation of Kafka consumer is suitable for use cases like delivering configs/data to services through Kafka.
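
A minimal sketch of creating and running a group consumer (broker addresses, topic, and IDs are placeholders, and consumeMessageFunc/consumeErrorFunc are assumed to be defined as described above):

consumer, err := kafkabp.NewConsumer(kafkabp.ConsumerConfig{
    Brokers:  []string{"127.0.0.1:9090"},
    Topic:    "sample-topic",
    ClientID: "myclient",
    GroupID:  "my-group", // omit to get a whole-topic consumer instead
})
if err != nil {
    // handle the config error
}
defer consumer.Close()

// Consume blocks until the consumer is closed.
err = consumer.Consume(consumeMessageFunc, consumeErrorFunc)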

func NewConsumerWithConfigOverriders added in v0.9.16

func NewConsumerWithConfigOverriders(cfg ConsumerConfig, overriders ...SaramaConfigOverrider) (Consumer, error)

NewConsumerWithConfigOverriders is provided as an escape hatch for use cases requiring specific sarama config not supported by ConsumerConfig.
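
For example, a sketch that overrides a sarama knob not exposed by ConsumerConfig (the particular field chosen here is only for illustration):

consumer, err := kafkabp.NewConsumerWithConfigOverriders(
    cfg,
    func(c *sarama.Config) {
        // Bump the default fetch size; purely illustrative.
        c.Consumer.Fetch.Default = 10 * 1024 * 1024
    },
)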

type ConsumerConfig

type ConsumerConfig struct {
	// Required. Brokers specifies a slice of broker addresses.
	Brokers []string `yaml:"brokers"`

	// Required. Topic is used to specify the topic to consume.
	Topic string `yaml:"topic"`

	// Required. ClientID is used by the Kafka broker to track clients'
	// consuming progress on the topics.
	//
	// In most cases, every instance is expected to have a unique ClientID.
	// The Kubernetes pod ID is usually a good candidate for this unique ID.
	ClientID string `yaml:"clientID"`

	// Optional. When GroupID is non-empty, a new group consumer will be created
	// instead. Messages from the topic will be consumed by one of the consumers
	// in the group (sharing the same GroupID) exactly once. This is the usual use
	// case of streaming consumers.
	//
	// When GroupID is empty, each consumer will have the whole view of the
	// topic (based on Offset), so it is usually for use cases like delivering
	// configs/data through Kafka brokers.
	//
	// When GroupID is non-empty, Version must be at least "0.10.2.0".
	GroupID string `yaml:"groupID"`

	// Optional. Only applicable when GroupID is an empty string.
	// When GroupID is empty, this configuration enables a TopicConsumer to
	// read from all partitions of the given kafka stream.
	// When ConsumePartitionFuncProvider is also specified, the TopicConsumer
	// will only consume partitions that evaluate to true with the predicate
	// returned from ConsumePartitionFuncProvider.
	// If ConsumePartitionFuncProvider is specified, it must return a non-nil
	// predicate of type ConsumePartitionFunc, otherwise the topic consumer
	// will return ErrNilConsumePartitionFunc.
	//
	// This function is called once per reset. The returned ConsumePartitionFunc
	// is also called once per reset per partition.
	// The API is designed in two layers so that it's possible to shift all the
	// heavy lifting to ConsumePartitionFuncProvider (e.g. generating a []bool
	// or a set of ints), and make the returned ConsumePartitionFunc just do a
	// simple lookup.
	ConsumePartitionFuncProvider ConsumePartitionFuncProvider `yaml:"-"`

	// Optional. The version of the kafka broker this consumer is connected to,
	// in the format of "0.10.2.0" or "2.4.0".
	//
	// When omitted, the sarama library picks the oldest supported version in
	// order to maintain maximum backward compatibility, but some newer
	// features might be unavailable. For example, using GroupID requires the
	// version to be at least "0.10.2.0".
	Version string `yaml:"version"`

	// Optional. Defaults to "oldest". Valid values are "oldest" and "newest".
	//
	// Only used when GroupID is empty.
	Offset string `yaml:"offset"`

	// Optional. If non-nil, will be used to log errors. At present, this only
	// pertains to logging errors closing the existing consumer when calling
	// consumer.reset() when GroupID is empty.
	Logger log.Wrapper `yaml:"logger"`

	// Optional. The function to set the rack id for this kafka client.
	// It should match the rack configured on the broker(s).
	//
	// Currently it defaults to no rack id.
	// In the future the default might be changed to AWSAvailabilityZoneRackID.
	//
	// This feature is currently experimental.
	// It might not make any difference on your client,
	// or it might make things worse.
	// You are advised to test before using non-empty rack id in production.
	RackID RackIDFunc `yaml:"rackID"`
}

ConsumerConfig can be used to configure a kafkabp Consumer.

Can be deserialized from YAML.

Example:

kafka:
  brokers:
    - 127.0.0.1:9090
    - 127.0.0.2:9090
  topic: sample-topic
  clientID: myclient
  version: 2.4.0
  offset: oldest

func (*ConsumerConfig) NewSaramaConfig

func (cfg *ConsumerConfig) NewSaramaConfig() (*sarama.Config, error)

NewSaramaConfig instantiates a sarama.Config with sane consumer defaults from sarama.NewConfig(), overwritten by values parsed from cfg.
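
For example, a sketch that uses the derived config to construct a raw sarama client sharing the same settings:

saramaCfg, err := cfg.NewSaramaConfig()
if err != nil {
    // handle the config error
}
client, err := sarama.NewClient(cfg.Brokers, saramaCfg)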

type GroupConsumerHandler

type GroupConsumerHandler struct {
	Callback ConsumeMessageFunc
	Topic    string
}

GroupConsumerHandler implements sarama.ConsumerGroupHandler.

It's exported so that users of this library can write mocks to test their ConsumeMessageFunc implementation.
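
For example, a test sketch might construct the handler directly, with session and claim coming from your own stubs of sarama.ConsumerGroupSession and sarama.ConsumerGroupClaim, and myConsumeMessageFunc being the implementation under test:

handler := kafkabp.GroupConsumerHandler{
    Callback: myConsumeMessageFunc,
    Topic:    "sample-topic",
}
if err := handler.ConsumeClaim(session, claim); err != nil {
    t.Fatal(err)
}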

func (GroupConsumerHandler) Cleanup

func (GroupConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.

func (GroupConsumerHandler) ConsumeClaim

func (GroupConsumerHandler) ConsumeClaim(sarama.ConsumerGroupSession, sarama.ConsumerGroupClaim) error

ConsumeClaim starts a consumer loop of ConsumerGroupClaim's Messages() chan.

func (GroupConsumerHandler) Setup

func (GroupConsumerHandler) Setup(sarama.ConsumerGroupSession) error

Setup is run at the beginning of a new session, before ConsumeClaim.

type RackIDFunc

type RackIDFunc func() string

RackIDFunc defines a function to provide the kafka rack id to use.

Rack id is not considered a crucial part of kafka client configuration, so this function doesn't have the ability to return any errors. If an error occurs while retrieving the rack id, the implementation should return an empty string and handle the error by itself (logging, panicking, etc.).

See the following URL for more info regarding kafka's rack awareness feature: https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment
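
For example, a sketch of a custom RackIDFunc that reads the rack id from an environment variable (the variable name is made up), returning an empty string when it is unset:

var envRackID kafkabp.RackIDFunc = func() string {
    return strings.TrimSpace(os.Getenv("KAFKA_RACK_ID"))
}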

func FixedRackID

func FixedRackID(id string) RackIDFunc

FixedRackID is a RackIDFunc implementation that returns a fixed rack id.

func SimpleHTTPRackID

func SimpleHTTPRackID(cfg SimpleHTTPRackIDConfig) RackIDFunc

SimpleHTTPRackID is a RackIDFunc implementation that gets the rack id from an HTTP URL.

It's "simple" as in it always treat the HTTP response as plain text (so it shouldn't be used for JSON endpoints), read up to Limit bytes, and trim leading and trailing spaces before returning. If an HTTP error occurred it will be logged using Logger passed in.

func (*RackIDFunc) UnmarshalText

func (r *RackIDFunc) UnmarshalText(text []byte) error

UnmarshalText implements encoding.TextUnmarshaler.

It makes RackIDFunc possible to be used directly in yaml and other config files.

Please note that this currently only supports a limited set of implementations:

- empty: Empty rack id (same as "fixed:"). Please note that this might be changed to "aws" in the future.

- "fixed:id": FixedRackID with given id. A special case of "fixed:" means no rack id.

- "aws": AWSAvailabilityZoneRackID.

- "http://url" or "https://url": SimpleHTTPRackID with log.DefaultWrapper and prometheus counter of kafkabp_http_rack_id_failure_total, default timeout & limit, and given URL.

- anything else: FixedRackID with the given value. For example "foobar" is the same as "fixed:foobar".
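
For example, with the same YAML shape as the ConsumerConfig example above:

kafka:
  brokers:
    - 127.0.0.1:9090
  topic: sample-topic
  clientID: myclient
  rackID: aws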

type SaramaConfigOverrider added in v0.9.16

type SaramaConfigOverrider func(*sarama.Config)

SaramaConfigOverrider provides a way for users to override certain fields in *sarama.Config generated from ConsumerConfig.

type SimpleHTTPRackIDConfig

type SimpleHTTPRackIDConfig struct {
	// URL to fetch rack id from. Required.
	URL string

	// Limit of how many bytes to read from the response.
	//
	// Optional, defaults to SimpleHTTPRackIDDefaultLimit.
	Limit int64

	// HTTP client timeout.
	//
	// Optional, defaults to SimpleHTTPRackIDDefaultTimeout.
	Timeout time.Duration

	// Logger to be used on http errors. Optional.
	Logger log.Wrapper
}

SimpleHTTPRackIDConfig defines the config to be used in SimpleHTTPRackID.
