kafka_go

README

Go Kafka Client - an easy abstraction

kafka_go is an abstraction over the popular Kafka client sarama (https://github.com/Shopify/sarama). Sarama provides capable APIs for integrating with a Kafka cluster, but it lacks simplicity and needs a fair amount of domain knowledge even for a standard use case.

End users have to handle fail-safety themselves, reclaim work after re-balancing and similar scenarios, and the API does not feel very intuitive to first-time Kafka users. kafka_go tries to solve these problems with easy-to-understand APIs that let you start consuming from a Kafka cluster with significantly less domain knowledge and complete fail-safety.

How kafka_go helps

The package provides a fairly easy and intuitive abstraction for integrating with a Kafka cluster with minimal effort. It takes care of the complexity of fail-safety, possible leak scenarios, and re-balance handling, and exports an easy-to-use functional interface. The API is divided into Consumer and Producer modules -

Consumer

The consumer is an important abstraction of any PubSub library. The package tries to make it as simple as possible.

type Consumer interface {

	// Start should trigger the actual message consumption process. It should be blocking in nature to avoid
	// killing the process prematurely.
	Start(ctx context.Context)

	// Stop should trigger the closure of the consumption process. It should cancel the context to release
	// resources and take care of possible leaks.
	Stop()
}

The package implements this simple, generic Consumer interface for Kafka. A user can instantiate a consumer instance with minimal effort. Example -

consumer, err := NewKafkaConsumer(&KafkaConsumerParam{
	Brokers:       []string{"broker-1", "broker-2", "broker-3"},
	GroupID:       "test-cg",
	OffsetInitial: OtNewest,
	Topics:        []string{"test-topic"},
	Handlers: map[string]TopicHandler{
		"test-topic": &testTopicHandler{},
	},
})
if err != nil {
	Logger.Panicf("Error creating consumer instance %v", err)
}
consumer.Start(context.Background())

Refer to examples in the code base for better understanding.

KafkaConsumerParam

It provides a single place to configure the consumer client. KafkaConsumerParam provides a rich set of keys configurable by the user, with fairly standard defaults to start consuming out of the box.

type KafkaConsumerParam struct {
	// Brokers in kafka clusters
	Brokers []string

	// Consumer group id of this consumer group
	GroupID string

	// List of topics to start listening from
	Topics []string

	// Topic to handler map used to consume messages from a topic.
	Handlers map[string]TopicHandler

	// [Optional]
	// Topic to its fallback handler map.
	// If the main handler returns false, the fallback handler is tried.
	// The offset is committed no matter whether the fallback handler returns true or false.
	// default - "no fallback"
	Fallbacks map[string]TopicHandler

	// [Optional]
	// List of middleware to be triggered post claim of every message and before actual
	// message handling. Middleware will be triggered in increasing order of index.
	// default - "no middleware"
	Middleware []ConsumerMiddleware

	// [Optional]
	// List of interceptors. Like middleware they are triggered post claim of every message, but unlike
	// middleware an interceptor is also available after the actual handler returns. Interceptors are
	// triggered in a layered manner, lower index being the outer layer and vice versa. This is
	// similar to a recursive call: the one called first returns last.
	// default - "noOpInterceptor"
	Interceptor []ConsumerInterceptor

	// [Optional]
	// Attach a meta map to every claimed message before passing it to the actual handler; it can be used
	// to persist state during the lifecycle of a claimed message. Middleware or interceptors can also
	// use this meta to share variables across the pipeline. default - "false"
	MessageMeta bool

	// [Optional]
	// Client identity for logging purpose
	ClientID string

	// [Optional]
	// The initial offset to use if no offset was previously committed.
	// Should be OtNewest or OtOldest. defaults - OtNewest.
	OffsetInitial OffsetType

	// [Optional]
	// kafka cluster version. eg - "2.2.1" default - "2.3.0"
	// supports versions from "0.8.x" to "2.3.x"
	Version string
}
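
For example, a per-topic fallback handler can be wired next to the main handler. This is a minimal sketch; deadLetterHandler is a hypothetical TopicHandler implementation (not part of the package), for instance one that republishes failed messages to a dead-letter topic:

consumer, err := NewKafkaConsumer(&KafkaConsumerParam{
	Brokers: []string{"broker-1", "broker-2", "broker-3"},
	GroupID: "test-cg",
	Topics:  []string{"test-topic"},
	Handlers: map[string]TopicHandler{
		"test-topic": &testTopicHandler{},
	},
	// Fallbacks is consulted only when the main handler returns false;
	// the offset is committed regardless of what the fallback returns.
	Fallbacks: map[string]TopicHandler{
		"test-topic": &deadLetterHandler{}, // hypothetical TopicHandler implementation
	},
})
if err != nil {
	Logger.Panicf("Error creating consumer instance %v", err)
}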

TopicHandler

type TopicHandler interface {
	// Handle gets the actual message to be handled. Business logic for a given
	// message should ideally be implemented here.
	Handle(ctx context.Context, message *SubscriberMessage) bool
}

TopicHandler is an intuitive interface that exposes a single function, Handle. An implementation of this interface should be used as the handler for messages from a topic; the business logic that follows receiving a message should be triggered from here. The handler is deliberately not bound to a single topic, so the same implementation can be used for multiple topics at once. The implementation should, of course, be thread/goroutine safe. Example -

// test topic handler
type testTopicHandler struct {
}

func (t *testTopicHandler) Handle(ctx context.Context, msg *SubscriberMessage) bool {
	Logger.Printf("Topic: %v, Partition: %v, SubscriberMessage: %v", msg.Topic, msg.Partition, string(msg.Value))
	return true
}

Middleware

type ConsumerMiddleware func(ctx context.Context, msg *SubscriberMessage)

Middleware is a construct similar to Before advice in AOP. A middleware runs just after a message is claimed from a topic and before it is passed down the line to its handler. Middleware can be used to decorate the message or to do common preprocessing across topics before the actual handling, which saves writing the same code in every topic handler. A middleware can optionally enrich the message meta, which can then be used inside the actual handler or later in the handling pipeline. Multiple independent middleware can be used, and they pre-process the message in a pre-defined order. Use cases include -

  • Filtering messages so that only relevant messages reach the actual handler
  • Validating messages for correctness
  • Logging, throttling, decorating, etc. (a minimal logging sketch follows this list)
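
As an illustrative sketch (not part of the package), a simple logging middleware that prints every claimed message before it reaches the topic handler could look like this; it relies only on the package's Logger and the ConsumerMiddleware signature shown above:

// logClaims is an illustrative ConsumerMiddleware that logs every claimed
// message before it is passed to the topic handler.
var logClaims ConsumerMiddleware = func(ctx context.Context, msg *SubscriberMessage) {
	Logger.Printf("claimed topic=%v partition=%v offset=%v", msg.Topic, msg.Partition, msg.Offset)
}

It can then be plugged in via the Middleware field of KafkaConsumerParam.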

Interceptor

type ConsumerInterceptor func(ctx context.Context, msg *SubscriberMessage, handler func(context.Context, *SubscriberMessage) bool) bool

Interceptor is a construct similar to Around advice in AOP. The interceptor's life cycle starts just after a message is claimed, wraps around the handler, and continues after the handler returns. Interceptors come in handy for starting some processing before the actual message handling and finishing it after the handling is done. Use cases can be -

  • Starting a New Relic transaction before message handling and closing it after the handler returns
  • Collecting data around message processing, such as the time taken to process a message
  • Taking a common action on expected/unexpected handling of a message, like publishing to a dead-letter topic

Multiple independent interceptors can be used; they work in a layered fashion. For example, if the list of interceptors consists of IC_1, IC_2, IC_3, the processing will look like the following (an illustrative timing interceptor is sketched after the diagram) -

IC_1
{
  before_1
    IC_2
    {
      before_2
        IC_3
        {
          before_3
            message_handler(.....)
          after_3
        }
      after_2
    }
  after_1
}
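
To make this concrete, here is an illustrative interceptor (not part of the package) that times message handling; it assumes the standard library time package is imported:

// timing is an illustrative ConsumerInterceptor that measures how long the
// inner layers and the actual topic handler take for each message.
var timing ConsumerInterceptor = func(ctx context.Context, msg *SubscriberMessage,
	handler func(context.Context, *SubscriberMessage) bool) bool {
	start := time.Now()
	ok := handler(ctx, msg) // invoke the inner interceptors and the topic handler
	Logger.Printf("handled topic=%v offset=%v in %v success=%v", msg.Topic, msg.Offset, time.Since(start), ok)
	return ok
}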

Producer

The next obvious abstraction that any PubSub library will export is Producer.

type Producer interface {

	// PublishSync sends a message to the PubSub cluster in Sync way. Call to this function is blocking and
	// returns only after the publish is done or result in an error. Meta contains the publish related meta info
	PublishSync(message *PublisherMessage) (meta map[string]interface{}, err error)

	// PublishSyncBulk sends messages to the PubSub cluster in sync, all at once. Call to this function is blocking
	// and returns only after the publish attempt is done for all the messages. Returns an error if the bulk publish
	// is only partially successful
	PublishSyncBulk(messages []*PublisherMessage) error

	// PublishAsync sends a message to the PubSub cluster in Async way. Call to this function is non-blocking and
	// returns immediately.
	PublishAsync(message *PublisherMessage)

	// PublishAsyncBulk sends messages in bulk to the PubSub cluster in an Async way. Call to this function is
	// non-blocking and returns immediately
	PublishAsyncBulk(messages []*PublisherMessage)

	// Close triggers the closure of the associated producer client to avoid any leaks
	Close()
}

The package implements the Producer interface to provide a rich set of functionality for producing messages to a Kafka cluster in every possible way. The abstraction hides the complexity of managing channels and provides high-throughput async bulk publishing to publish a fairly large number of messages at once without blocking the main process, though the user is advised to keep the CPU/memory implications of producing too many messages at once in mind. Example -

producer, err := NewKafkaProducerQuick([]string{"broker-1", "broker-2", "broker-3"})
if err != nil {
	Logger.Fatalf("Error creating producer client, %v", err)
}
data, _ := json.Marshal("test message")
producer.PublishAsync(&PublisherMessage{
	Topic: "test-topic",
	Key:   "test-key",
	Data:  data,
})

For better understanding and implementation details, refer to the code base and test examples.

Future work

The package keeps the Consumer and Producer interfaces fairly generic, so they can be implemented for other PubSub frameworks such as Redis PubSub, AWS SNS-SQS, RabbitMQ, etc. in the future. The intent of this package is to make integration with Kafka-like PubSub frameworks easy with minimal effort. The author decided to create this package after personally struggling to make such an integration stable and fail-safe, and after observing that most developers struggle with the same problems and end up writing their own abstractions to keep their code clean. Although most of these abstractions are similar, they are usually written with only their own repository structure in mind. This package tries to take all such abstractions into consideration and export code that is reusable across various use cases.

The MIT license was chosen on purpose. Feel free to fork and raise a pull request for any possible bugs or new use cases. Always keep in mind the intent of the package: to stay simple to understand and use.

Documentation

Overview

Package kafka_go is an abstraction over the popular Kafka client sarama (https://github.com/Shopify/sarama). Sarama provides capable APIs for integrating with a Kafka cluster, but it lacks simplicity and needs a bit of domain knowledge even for a standard use case. End users have to handle fail-safety themselves, reclaim work after re-balancing or similar scenarios, and the API does not feel very intuitive to first-time Kafka users. kafka_go tries to solve these problems with easy-to-understand APIs that let you start consuming from a Kafka cluster with significantly less domain knowledge and complete fail-safety.

Note: this package implements at-least-once semantics of message consumption; users have to maintain idempotence on their own.
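
For example, a handler might guard against redelivery by recording the offsets it has already processed. This is only an illustrative sketch, assuming the standard library sync and fmt packages; the dedup store and its durability are up to the user:

// idempotentHandler is an illustrative TopicHandler that skips messages whose
// topic/partition/offset have already been processed. A real implementation
// would typically back this with an external, durable store.
type idempotentHandler struct {
	seen sync.Map // keys of the form "topic/partition/offset" already handled
}

func (h *idempotentHandler) Handle(ctx context.Context, msg *SubscriberMessage) bool {
	key := fmt.Sprintf("%s/%d/%d", msg.Topic, msg.Partition, msg.Offset)
	if _, dup := h.seen.LoadOrStore(key, true); dup {
		return true // duplicate delivery, acknowledge without re-running the business logic
	}
	// ... actual business logic ...
	return true
}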

Constants

This section is empty.

Variables

var (
	// Logger is the kafka-go package's single logger instance. It logs the most important
	// events during message consumption and production. This logger can be overridden to
	// completely stop any logs from being printed to standard output.
	// example:
	// Logger = log.New(ioutil.Discard, "[kafka-go]", log.LstdFlags)
	// The above will discard all logs printed by the kafka-go logger.
	Logger = log.New(os.Stdout, "[kafka-go]", log.LstdFlags)
)

Functions

func DefaultKafkaProducerParam

func DefaultKafkaProducerParam(brokers []string) *kafkaProducerParam

DefaultKafkaProducerParam is the only way to create a kafkaProducerParam instance to be used for producer creation. This has been restricted to prevent users from setting bad default values while configuring a producer. Users interested in changing the params should first create a param instance using this method and then change the specific param keys as needed.

example:

params := DefaultKafkaProducerParam(brokers)
params.LogError = false

func NewKafkaConsumer

func NewKafkaConsumer(params *KafkaConsumerParam) (*kafkaConsumer, error)

NewKafkaConsumer creates a new kafkaConsumer instance used for consuming from a kafka cluster. It needs a KafkaConsumerParam to instantiate the connection.

example:

consumer, err := NewKafkaConsumer(&KafkaConsumerParam{
	Brokers:       []string{"localhost:9091", "localhost:9092", "localhost:9093"},
	GroupID:       "test-cg",
	OffsetInitial: OtNewest,
	Topics:        []string{"test-topic"},
	Handlers: map[string]TopicHandler{
		"test-topic": &testTopicHandler{},
	},
})
if err != nil {
	// handle error
}
// start the blocking consumer process
consumer.Start(context.Background())

Refer to the test examples for better understanding.

Note: a consumer instance is not goroutine safe and should be used once (in one goroutine) to start the consumer. If multiple such consumers are required, even for the same broker set and consumer group, consider creating a new one using this function. However, using the same instance to start the consumer multiple times won't be fatal.
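
Since Start blocks, a common pattern is to stop the consumer from another goroutine when the process receives a termination signal. A minimal sketch, assuming the standard library os, os/signal, and syscall packages and that Stop makes the blocking Start call return:

consumer, err := NewKafkaConsumer(params) // params as in the example above
if err != nil {
	Logger.Panicf("Error creating consumer instance %v", err)
}

// stop the consumer on SIGINT/SIGTERM so resources are released cleanly
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
go func() {
	<-sig
	consumer.Stop()
}()

// blocks until the consumer is stopped
consumer.Start(context.Background())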

func NewKafkaProducer

func NewKafkaProducer(params *kafkaProducerParam) (*kafkaProducer, error)

NewKafkaProducer creates a kafkaProducer instance used for producing messages to a kafka cluster. The input kafkaProducerParam should be created using DefaultKafkaProducerParam and then the required params changed from there; it is not allowed to construct kafkaProducerParam directly. This is enforced to prevent users from setting unexpected default values.

If the user only wants to provide broker addresses and does not wish to change any default param values, NewKafkaProducerQuick can be used instead, which takes only the brokers as input.

example:

params := DefaultKafkaProducerParam([]string{"localhost:9091", "localhost:9092", "localhost:9093"})
params.Retry = 5
params.Acknowledge = AtAll
producer, err := NewKafkaProducer(params)

func NewKafkaProducerQuick

func NewKafkaProducerQuick(brokers []string) (*kafkaProducer, error)

NewKafkaProducerQuick is a quicker way to instantiate a producer instance by providing just the broker addresses. It takes care of the other params and sets them to their best default values. If the user wants a producer better configured for their use case, they should use NewKafkaProducer with custom kafkaProducerParam key values instead; DefaultKafkaProducerParam remains the only way to create a kafkaProducerParam before changing any of the associated params.

Types

type AcknowledgmentType

type AcknowledgmentType int16

AcknowledgmentType specifies the kind of acknowledgement the end user should expect while publishing messages to the cluster

const (
	// Acknowledge from all in-sync replicas
	AtAll AcknowledgmentType = iota - 1
	// No acknowledgement required
	AtNone
	// Acknowledge only from master broker
	AtLocal
)

type CompressionType

type CompressionType int8

CompressionType specifies the type of message compression before publishing messages to the cluster

const (
	// No compression
	CtNone CompressionType = iota
	// GZIP compression type
	CtGzip
	// Snappy data compression
	CtSnappy
	// LZ4 algorithm compression
	CtLz4
	// Z-standard compression
	CtZstd
)

type Consumer

type Consumer interface {
	// Start should trigger the actual message consumption process. It should be blocking in nature to avoid
	// killing the process prematurely.
	Start(ctx context.Context)

	// Stop should trigger the closure of the consumption process. It should cancel the context to release
	// resources and take care of possible leaks.
	Stop()
}

Consumer is the functionality exposed to the end user to interact with their consumer instance. The interface is implemented by kafkaConsumer in the package. The interface is generic enough to be used with other PubSub services.

type ConsumerInterceptor

type ConsumerInterceptor func(ctx context.Context, msg *SubscriberMessage, handler func(context.Context, *SubscriberMessage) bool) bool

ConsumerInterceptor is a construct similar to Around advice in AOP. An interceptor can not only touch the message or execute something before the message is passed to the handler, but also do the needful after the handler returns.

B : task before handler
A : task after handler
IC : interceptor

msg => IC_0 => {B_0 -> IC_1 => {{B_1 -> .... ->IC_n => {..{B_n -> [msg_handler] -> A_n}..} -> .... -> A_1}} -> A_0}

type ConsumerMiddleware

type ConsumerMiddleware func(ctx context.Context, msg *SubscriberMessage)

ConsumerMiddleware is a consumer functional middleware used to touch a message before it gets passed to the actual message handler. It is similar to Before advice in AOP. Middleware can also set some message state that can be retrieved and used later at message-handling time. It can be thought of as a pre-handler across all topics and can be used to decorate the message before passing it to the handler.

see SubscriberMessage.Meta

Middleware : MW

msg => MW_0 => MW_1 => ...... => MW_n => [msg_handler]
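
For example, with MessageMeta enabled a middleware could stamp each claimed message and the handler could read the value back later. An illustrative sketch, assuming the standard library time package; the "receivedAt" key is arbitrary:

// stampReceived is an illustrative ConsumerMiddleware that records when a
// message was claimed. msg.Meta is only populated when MessageMeta is set to
// true in KafkaConsumerParam.
var stampReceived ConsumerMiddleware = func(ctx context.Context, msg *SubscriberMessage) {
	if msg.Meta != nil {
		msg.Meta["receivedAt"] = time.Now()
	}
}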

type KafkaConsumerParam

type KafkaConsumerParam struct {
	// Brokers in kafka clusters
	Brokers []string

	// Consumer group id of this consumer group
	GroupID string

	// List of topics to start listening from
	Topics []string

	// Topic to handler map used to consume messages from a topic.
	Handlers map[string]TopicHandler

	// [Optional]
	// Topic to its fallback handler map.
	// If the main handler returns false, the fallback handler is tried.
	// The offset is committed no matter whether the fallback handler returns true or false.
	// default - "no fallback"
	Fallbacks map[string]TopicHandler

	// [Optional]
	// List of Middleware to be triggered post claim of every message and before actual
	// message handling. Middleware will be triggered in increasing order of index.
	// default - "no Middleware"
	Middleware []ConsumerMiddleware

	// [Optional]
	// List of Interceptor. Like Middleware they are triggered post claim of every message, but unlike
	// Middleware an Interceptor is also available after the actual handler returns. Interceptors are
	// triggered in a layered manner, lower index being the outer layer and vice versa. This is
	// similar to a recursive call: the one called first returns last.
	// default - "noOpInterceptor"
	Interceptor []ConsumerInterceptor

	// [Optional]
	// Attach a meta map to every claimed message before passing it to the actual handler; it can be used
	// to persist state during the lifecycle of a claimed message. Middleware or Interceptor can also
	// use this meta to share variables across the pipeline. default - "false"
	MessageMeta bool

	// [Optional]
	// Client identity for logging purpose
	ClientID string

	// [Optional]
	// The initial offset to use if no offset was previously committed.
	// Should be OtNewest or OtOldest. defaults - OtNewest.
	OffsetInitial OffsetType

	// [Optional]
	// kafka cluster version. eg - "2.2.1" default - "2.3.0"
	// supports versions from "0.8.x" to "2.3.x"
	Version string
}

KafkaConsumerParam is the input expected from the user to start a consumer session with the kafka cluster.

type OffsetType

type OffsetType string

OffsetType specifies the strategy for determining the starting offset of a consumer group if there is no previously committed offset for that consumer group in the cluster. The offset type setting is ignored altogether if the client finds any existing committed offset in the cluster while registering its consumer process.

const (
	// Set offset to the offset of the next message to appear in the partition
	OtNewest OffsetType = "newest"

	// Set offset to the offset of the oldest available message present in the partition
	OtOldest OffsetType = "oldest"
)

type Producer

type Producer interface {
	// PublishSync sends a message to the pubsub cluster in a sync way. Call to this function is blocking and
	// returns only after the publish is done or results in an error. Meta contains the publish-related meta info
	PublishSync(message *PublisherMessage) (meta map[string]interface{}, err error)

	// PublishSyncBulk sends messages to the pubsub cluster in sync, all at once. Call to this function is blocking
	// and returns only after the publish attempt is done for all the messages. Returns an error if the bulk publish
	// is only partially successful
	PublishSyncBulk(messages []*PublisherMessage) error

	// PublishAsync sends a message to the pubsub cluster in an async way. Call to this function is non-blocking and
	// returns immediately.
	PublishAsync(message *PublisherMessage)

	// PublishAsyncBulk sends messages in bulk to the pubsub cluster in an async way. Call to this function is
	// non-blocking and returns immediately
	PublishAsyncBulk(messages []*PublisherMessage)

	// Close triggers the closure of the associated producer client to avoid any leaks
	Close()
}

Producer is the functionality exposed to the end user to interact with the producer instance. The interface is implemented by kafkaProducer in the package

Example
param := DefaultKafkaProducerParam([]string{"broker-1", "broker-2", "broker-3"})
param.LogError = false
producer, err := NewKafkaProducer(param)
if err != nil {
	Logger.Fatalf("Error creating producer client, %v", err)
}
data, _ := json.Marshal("test message")
meta, err := producer.PublishSync(&PublisherMessage{
	Topic: "test-topic",
	Key:   "test-key",
	Data:  data,
})
if err != nil {
	Logger.Printf("Error producing message to kafka cluster, %v", err)
} else {
	Logger.Printf("Message publishedm, meta %v", meta)
}
Output:

type PublisherMessage

type PublisherMessage struct {
	// Topic on which message is to be published
	Topic string

	// Partition key to decide partition
	Key string

	// Actual data to be published
	Data []byte
}

A PublisherMessage instance should be used to publish data on a topic.
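
For bulk publishing, a slice of PublisherMessage values can be built up and handed to PublishSyncBulk (or PublishAsyncBulk). A minimal sketch, assuming a producer created as in the earlier examples and the standard library encoding/json and fmt packages:

var batch []*PublisherMessage
for i := 0; i < 10; i++ {
	data, _ := json.Marshal(fmt.Sprintf("test message %d", i))
	batch = append(batch, &PublisherMessage{
		Topic: "test-topic",
		Key:   fmt.Sprintf("test-key-%d", i),
		Data:  data,
	})
}
if err := producer.PublishSyncBulk(batch); err != nil {
	// a non-nil error can indicate that the bulk publish was only partially successful
	Logger.Printf("Error bulk publishing to kafka cluster, %v", err)
}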

type SubscriberMessage

type SubscriberMessage struct {
	// topic of the message
	Topic string

	// partition within the topic
	Partition int32

	// offset within the partition
	Offset int64

	// partition key bytes
	Key []byte

	// actual message bytes
	Value []byte

	// any state to carry with message
	Meta map[string]interface{}
}

A SubscriberMessage instance is received by the configured topic handler. It contains the data required in standard use cases.

type TopicHandler

type TopicHandler interface {
	// Handle gets the actual message to be handled. A business logic for a given
	// message should ideally be implemented here.
	Handle(ctx context.Context, message *SubscriberMessage) bool
}

TopicHandler should be implemented by the user to consume messages from a topic. A SubscriberMessage received from a topic is forwarded to one of these handlers to take care of the required business logic.
