kafka

package
v0.0.0-...-e8d64ac Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 28, 2019 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// NoOpHandler is a package-level pointer to a zero-value noOpHandler.
// NOTE(review): the noOpHandler type is not shown here; presumably it
// satisfies Handler and ignores every message — confirm.
var NoOpHandler = &noOpHandler{}

Functions

This section is empty.

Types

type Config

// Config collects the Kafka settings accepted by NewConsumer and NewProducer.
// Every field carries an `envconfig` struct tag naming the environment
// variable associated with it.
type Config struct {
	// General client settings.
	// NOTE(review): Brokers and Topics are plain strings; presumably each
	// holds a delimiter-separated list — confirm the expected format.
	Brokers  string `envconfig:"KAFKA_BROKERS"`
	Version  string `envconfig:"KAFKA_VERSION"`
	Verbose  bool   `envconfig:"KAFKA_VERBOSE"`
	ClientID string `envconfig:"KAFKA_CLIENT_ID"`
	Topics   string `envconfig:"KAFKA_TOPICS"`

	// TLS settings.
	// NOTE(review): not visible here whether TLSKey, TLSCert and CACerts
	// are file paths or inline PEM data — confirm before relying on either.
	TLSEnabled bool   `envconfig:"KAFKA_TLS_ENABLED"`
	TLSKey     string `envconfig:"KAFKA_TLS_KEY"`
	TLSCert    string `envconfig:"KAFKA_TLS_CERT"`
	CACerts    string `envconfig:"KAFKA_CA_CERTS"`

	// Consumer specific parameters
	Group             string        `envconfig:"KAFKA_GROUP"`
	RebalanceStrategy string        `envconfig:"KAFKA_REBALANCE_STRATEGY"`
	RebalanceTimeout  time.Duration `envconfig:"KAFKA_REBALANCE_TIMEOUT"`
	InitOffsets       string        `envconfig:"KAFKA_INIT_OFFSETS"`
	CommitInterval    time.Duration `envconfig:"KAFKA_COMMIT_INTERVAL"`

	// Producer specific parameters
	FlushInterval time.Duration `envconfig:"KAFKA_FLUSH_INTERVAL"`
}

Config is a simple Kafka config abstraction; it can be populated from environment variables via FromEnv(), or its fields can be bound to CLI flags by the caller.

func FromEnv

func FromEnv() (Config, error)

hydrate kafka.Config using environment variables

func NewKafkaConfig

func NewKafkaConfig() Config

returns a new kafka.Config with reasonable defaults for some values

type Consumer

// Consumer is the interface returned by NewConsumer.
type Consumer interface {
	// Background returns a function and an error channel. The caller should
	// run the returned function in a goroutine, and consume the returned
	// error channel until it's closed at shutdown.
	Background() (func(), chan error)
}

func NewConsumer

func NewConsumer(ctx context.Context, conf Config, handler Handler, logger *log.Logger) (Consumer, error)

caller should cancel the supplied context when a graceful consumer shutdown is desired

type ConsumerMessage

// ConsumerMessage is a defined type (not a `=` alias) that shares
// sarama.ConsumerMessage's underlying type. Values convert explicitly between
// the two, but methods declared on sarama.ConsumerMessage are not carried over.
type ConsumerMessage sarama.ConsumerMessage

ConsumerMessage is defined on top of the Sarama message type in order to abstract the Sarama-specific type away from end users.

type Handler

// Handler is the callback interface passed to NewConsumer.
type Handler interface {
	// Message takes a pointer to a ConsumerMessage and returns an error.
	// NOTE(review): how the consumer reacts to a non-nil error (retry, skip,
	// stop) is not visible here — confirm against the consumer implementation.
	Message(*ConsumerMessage) error
}

services consuming from Kafka should meet this contract

type Producer

// Producer is the interface returned by NewProducer.
type Producer interface {
	// Background returns a function and an error channel. The caller should
	// run the returned function in a goroutine, and consume the returned
	// error channel until it's closed at shutdown.
	Background() (func(), chan error)

	// Send is the user-facing event emit API. It takes a ProducerMessage by
	// value and returns an error.
	// NOTE(review): whether a nil error means the message was delivered or
	// merely queued is not visible here — confirm.
	Send(ProducerMessage) error
}

func NewProducer

func NewProducer(ctx context.Context, conf Config, logger *log.Logger) (Producer, error)

the caller can cancel the producer's context to initiate shutdown.

type ProducerMessage

// ProducerMessage is the message value accepted by Producer.Send.
type ProducerMessage struct {
	// NOTE(review): presumably the destination Kafka topic — confirm.
	Topic string
	// Key and Value are carried as raw byte slices.
	Key   []byte
	Value []byte
}

ProducerMessage abstracts the message type accepted by kafka.Producer.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL