Frafka


Frafka is a Kafka implementation for Frizzle based on confluent-kafka-go.

Frizzle is a magic message (Msg) bus designed for parallel processing with many goroutines.

  • Receive() messages from a configured Source
  • Do your processing, possibly Send() each Msg on to one or more Sink destinations
  • Ack() (or Fail()) the Msg to notify the Source that processing completed
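
Putting those pieces together, a minimal frafka pipeline might look like the following sketch. It assumes KAFKA_BROKERS, KAFKA_TOPICS, and KAFKA_CONSUMER_GROUP are set in the environment (see Configuration below); the "results" topic name is a placeholder.

package main

import (
	"log"

	"github.com/qntfy/frafka"
	"github.com/spf13/viper"
)

func main() {
	v := viper.New()
	v.AutomaticEnv() // pick up KAFKA_* settings from the environment

	source, err := frafka.InitSource(v)
	if err != nil {
		log.Fatal(err)
	}
	sink, err := frafka.InitSink(v)
	if err != nil {
		log.Fatal(err)
	}

	for msg := range source.Receive() {
		// ... process msg ...
		if err := sink.Send(msg, "results"); err != nil { // placeholder topic
			source.Fail(msg)
			continue
		}
		source.Ack(msg)
	}
}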

Prereqs / Build instructions

Install librdkafka

The underlying kafka library, confluent-kafka-go, has some particularly important nuances:

  • alpine builds (e.g. FROM golang:1.14-alpine) should run all go commands with -tags musl
    • e.g. go test -tags musl ./...
  • all builds producing an executable should run with CGO_ENABLED=1
    • not necessary for libraries, however.

Otherwise, you should be good to go with:

go get github.com/qntfy/frafka
cd frafka
go build

Basic API usage

Sink

Create a new sink with NewSink:

// error omitted - handle in proper code
sink, _ := frafka.NewSink("broker1:15151,broker2:15151", 16 * 1024)
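
Once created, the sink can forward any frizzle.Msg (for example, one received from a Source), and Close flushes anything not yet fully sent. A sketch, with a placeholder topic name:

defer sink.Close() // flushes any Msgs not fully sent

// msg is a frizzle.Msg, e.g. one received from a Source
if err := sink.Send(msg, "events.out"); err != nil {
	log.Printf("send failed: %v", err)
}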

Running the tests

Frafka has integration tests which require a kafka broker to test against. The KAFKA_BROKERS environment variable is used by the tests. simplesteph/kafka-stack-docker-compose provides a simple docker-compose setup that frafka CI currently uses.

curl --silent -L -o kafka.yml https://raw.githubusercontent.com/simplesteph/kafka-stack-docker-compose/v5.1.0/zk-single-kafka-single.yml
DOCKER_HOST_IP=127.0.0.1 docker-compose -f kafka.yml up -d
# takes a while to initialize; can use a tool like wait-for-it.sh in scripting
export KAFKA_BROKERS=127.0.0.1:9092
go test -v --cover ./...

Configuration

Frafka Sources and Sinks are configured using Viper.

func InitSink(config *viper.Viper) (*Sink, error)

func InitSource(config *viper.Viper) (*Source, error)

We typically initialize Viper through environment variables, but a client can do whatever it wants; it just needs to provide a configured Viper object with the relevant values. The application might use a prefix before the values below.

Variable              Required  Description
KAFKA_BROKERS         required  address(es) of kafka brokers, space separated
KAFKA_TOPICS          source    topic(s) to read from
KAFKA_CONSUMER_GROUP  source    consumer group value for coordinating multiple clients
KAFKA_CONFIG          optional  additional librdkafka client config, format key1=value1 key2=value2 ...
KAFKA_CONFIG_FILE     optional  relative or absolute path to a librdkafka client config file (see notes)
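
If environment variables are not convenient, the same values can be set on the Viper object directly before calling InitSource or InitSink. A sketch, assuming the config keys are the lower-cased variable names:

v := viper.New()
v.Set("kafka_brokers", "broker1:9092 broker2:9092") // space separated, per the table above
v.Set("kafka_topics", "events")
v.Set("kafka_consumer_group", "my-service")

source, err := frafka.InitSource(v)
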
Kafka Client Configuration Notes
  • KAFKA_CONFIG allows setting arbitrary librdkafka configuration such as retries=10 max.in.flight=1000 delivery.report.only.error=true
  • KAFKA_CONFIG_FILE allows another method for arbitrary config, similar to KAFKA_CONFIG; KAFKA_CONFIG takes priority over KAFKA_CONFIG_FILE. The specified file is parsed with viper, which supports a range of config file formats; for simplicity we recommend YAML, similar to the provided example file (used in tests).
  • Required config set via the environment variables listed above (e.g. KAFKA_BROKERS) always takes priority over optional values: if bootstrap.servers is set to a different value in KAFKA_CONFIG, it will be ignored.
  • Sensible defaults are set for several additional config values; see the variables in source.go and sink.go for specifics.
  • An earlier version of frafka also supported setting specific optional kafka configs via environment variables, such as compression. This functionality has been removed to simplify config logic and reduce confusion when values are set in multiple places.

Suggested Kafka Config

Some values that we commonly set, particularly in a memory-constrained environment (e.g. running a producer/consumer service against a 9-partition topic with an average message size under 10KB and less than 200MB of memory available):

  • queued.max.messages.kbytes: 2048 (up to 16384)
  • auto.offset.reset: (latest|earliest)
  • receive.message.max.bytes: 2000000
  • fetch.max.bytes: 1000000
  • compression.type: snappy is great to set to reduce network traffic and disk usage on brokers (possibly along with a linger.ms value, depending on throughput/latency requirements)
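
These settings could be supplied via KAFKA_CONFIG (or the equivalent Viper key); a hypothetical sketch:

// Hypothetical: suggested settings in KAFKA_CONFIG's key1=value1 key2=value2 format
v.Set("kafka_config",
	"queued.max.messages.kbytes=2048 auto.offset.reset=earliest "+
		"receive.message.max.bytes=2000000 fetch.max.bytes=1000000 "+
		"compression.type=snappy")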

Async Error Handling

Since records are sent in batches, Kafka may report errors or other information asynchronously. Events can be recovered via the channels returned by the Sink.Events() and Source.Events() methods. Partition changes and EOF are reported as non-error Events, while other errors conform to the error interface. Where possible, Events retain the underlying type from confluent-kafka-go in case more information is desired.
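
For example, a service might drain these channels in a background goroutine and log anything that satisfies the error interface. A sketch:

go func() {
	for ev := range source.Events() {
		if err, ok := ev.(error); ok {
			log.Printf("async kafka error: %v", err)
			continue
		}
		log.Printf("kafka event: %v", ev) // e.g. partition change or EOF
	}
}()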

Contributing

Contributions welcome! Take a look at open issues.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func InitByViper

func InitByViper(v *viper.Viper) (frizzle.Frizzle, error)

InitByViper initializes a full Frizzle with a kafka Source and Sink based on a provided Viper
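
A sketch of typical use, assuming environment-based configuration as in the README:

v := viper.New()
v.AutomaticEnv() // expects KAFKA_BROKERS, KAFKA_TOPICS, KAFKA_CONSUMER_GROUP

friz, err := frafka.InitByViper(v)
if err != nil {
	log.Fatal(err)
}
// friz satisfies frizzle.Frizzle, so Receive()/Send()/Ack()/Fail() are available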

Types

type Sink

type Sink struct {
	// contains filtered or unexported fields
}

Sink encapsulates a kafka producer for Sending Msgs

func InitSink

func InitSink(config *viper.Viper) (*Sink, error)

InitSink initializes a basic Sink via *viper.Viper.

func NewSink added in v0.2.0

func NewSink(brokerString string, bufferSize int) (*Sink, error)

NewSink sets up a basic frafka sink.

func (*Sink) Close

func (s *Sink) Close() error

Close the Sink after flushing any Msgs not fully sent

func (*Sink) Events

func (s *Sink) Events() <-chan frizzle.Event

Events reports async Events that occur during processing

func (*Sink) Send

func (s *Sink) Send(m frizzle.Msg, topic string) error

Send a Msg to specified topic

type Source

type Source struct {
	// contains filtered or unexported fields
}

Source encapsulates a kafka consumer for receiving and tracking Msgs

func InitSource

func InitSource(config *viper.Viper) (*Source, error)

InitSource initializes a kafka Source

func (*Source) Ack

func (s *Source) Ack(m frizzle.Msg) error

Ack a Msg

func (*Source) Close

func (s *Source) Close() error

Close cleans up underlying resources. It errors if Stop() has not been called and/or if there are unAcked Msgs.

func (*Source) Events

func (s *Source) Events() <-chan frizzle.Event

Events reports async Events that occur during processing

func (*Source) Fail

func (s *Source) Fail(m frizzle.Msg) error

Fail a Msg

func (*Source) Ping added in v0.1.1

func (s *Source) Ping() error

Ping brokers to ensure Source can connect to configured topics

func (*Source) Receive

func (s *Source) Receive() <-chan frizzle.Msg

Receive returns a channel for receiving Msgs

func (*Source) Stop

func (s *Source) Stop() error

Stop prevents new Msgs from being written to Receive() channel. It must be called before Close() will return.

func (*Source) UnAcked

func (s *Source) UnAcked() []frizzle.Msg

UnAcked returns the list of currently unAcked Msgs
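
Taken together, Stop, UnAcked, Fail, and Close suggest a graceful shutdown sequence; a sketch:

// Stop new Msgs, Fail anything still unAcked, then Close.
if err := source.Stop(); err != nil {
	log.Printf("stop: %v", err)
}
for _, m := range source.UnAcked() {
	source.Fail(m)
}
if err := source.Close(); err != nil {
	log.Printf("close: %v", err)
}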
