kafka

package
v0.0.0-...-d8aa7a9 Latest Latest
Published: Aug 16, 2021 License: MIT Imports: 4 Imported by: 0

README

Kafka

This package is a helper to create kafka sync and/or async producer, using sarama under the hood.

How To Create Producers

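package main

// NewKafkaProducer, NewKafkaAsyncProducer, and the WithXXX options come from
// this package; import it in your program before calling them.

func main() {
    brokers := []string{"localhost:9092"}

    // Create a synchronous producer.
    producer, err := NewKafkaProducer(brokers, "2.5.0", WithClientID("dhuwit"), WithRetryMax(5))
    if err != nil {
        panic(err)
    }
    // Close the producer when main returns to release its resources.
    defer producer.Close()

    // Create an asynchronous producer.
    asyncProducer, err := NewKafkaAsyncProducer(brokers, "2.5.0", WithClientID("dhuwit"), WithRetryMax(5))
    if err != nil {
        panic(err)
    }
    defer asyncProducer.Close()
}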

When a producer is created, default values are applied to every setting that is not overridden by an option.

Options

This helper uses the functional options pattern when creating a producer, so you can inject options as needed with the WithXXX() functions. Each WithXXX() function returns a ProducerConfigOption, which sets the corresponding configuration property of the sarama producer. For now, ProducerConfigOption covers only the most commonly used Kafka producer settings. Perkakas maintainers can add further options by writing new WithXXX() functions based on the sarama producer configuration.
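The options pattern described above can be sketched in isolation. This is a minimal illustration, not the package's actual source: producerConfig is a hypothetical stand-in for *sarama.Config so the example runs with the standard library only, and the lowercase names and the "default-client" value are assumptions made for the example.

```go
package main

import "fmt"

// producerConfig is a hypothetical stand-in for *sarama.Config, reduced to
// two fields so the example has no external dependencies.
type producerConfig struct {
	ClientID string
	RetryMax int
}

// configOption mirrors the shape of ProducerConfigOption: a function that
// mutates the configuration it receives.
type configOption func(*producerConfig)

// withClientID returns an option that sets the client ID.
func withClientID(id string) configOption {
	return func(c *producerConfig) { c.ClientID = id }
}

// withRetryMax returns an option that sets the maximum number of retries.
func withRetryMax(max int) configOption {
	return func(c *producerConfig) { c.RetryMax = max }
}

// newProducerConfig starts from defaults and then applies each option in
// order, so a later option overrides an earlier one.
func newProducerConfig(opts ...configOption) *producerConfig {
	// "default-client" is a placeholder default chosen for this sketch.
	cfg := &producerConfig{ClientID: "default-client", RetryMax: 3}
	for _, opt := range opts {
		opt(cfg)
	}
	return cfg
}

func main() {
	cfg := newProducerConfig(withClientID("dhuwit"), withRetryMax(5))
	fmt.Printf("%s %d\n", cfg.ClientID, cfg.RetryMax)
}
```

Because every option is just a function over the configuration, adding a new setting means adding one more constructor function and leaves existing call sites unchanged.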

Options Available

WithClientID(clientID string) -- Set the Kafka producer client ID

WithMaxMessageBytes(maxMessageBytes int) -- Set the maximum permitted size of a single message, in bytes

WithoutSASL() -- Disable SASL support

WithRequiredAcks(reqAcks sarama.RequiredAcks) -- Set the required acks mode

WithRetryBackoff(d time.Duration) -- Set the delay between retries

WithRetryMax(max int) -- Set the maximum number of retry attempts

WithReturnErrors(t bool) -- Enable/disable returning delivery errors on the Errors channel

WithReturnSuccesses(t bool) -- Enable/disable returning successfully delivered messages on the Successes channel. Always enabled for the synchronous producer

WithSASL(username, password string) -- Enable SASL support for the producer. Takes the SASL username and password

WithTLS(t bool) -- Enable/disable TLS support when connecting to Kafka brokers

WithVerbose() -- Enable the Kafka producer logger; logs are discarded by default

Documentation

Functions

func NewKafkaAsyncProducer

func NewKafkaAsyncProducer(brokers []string, version string, opts ...ProducerConfigOption) (producer sarama.AsyncProducer, err error)

NewKafkaAsyncProducer initializes an asynchronous producer for Kafka.

func NewKafkaProducer

func NewKafkaProducer(brokers []string, version string, opts ...ProducerConfigOption) (producer sarama.SyncProducer, err error)

NewKafkaProducer initializes a synchronous producer for Kafka.

func NewKafkaProducerConfig

func NewKafkaProducerConfig(version string, opts ...ProducerConfigOption) (*sarama.Config, error)

NewKafkaProducerConfig initializes the default Kafka producer configuration. It accepts the Kafka version and a list of options. The options are drawn from the sarama configuration and exposed as ProducerConfigOption values, but not every sarama setting is covered: ProducerConfigOption provides only the most commonly used Kafka producer settings. Perkakas maintainers can add more options by writing new WithXXX() functions based on the sarama producer configuration.

Types

type ProducerConfigOption

type ProducerConfigOption func(*sarama.Config)

ProducerConfigOption is a function type that accepts a *sarama.Config. It is used as the type for producer configuration options.

func WithClientID

func WithClientID(clientID string) ProducerConfigOption

WithClientID sets the client ID in the producer configuration.

func WithMaxMessageBytes

func WithMaxMessageBytes(max int) ProducerConfigOption

WithMaxMessageBytes sets the maximum permitted size of a message, in bytes. Defaults to 1000000.

func WithRequiredAcks

func WithRequiredAcks(reqAcks sarama.RequiredAcks) ProducerConfigOption

WithRequiredAcks sets the level of acknowledgement reliability. Defaults to WaitForLocal.

func WithRetryBackoff

func WithRetryBackoff(duration time.Duration) ProducerConfigOption

WithRetryBackoff sets the duration to wait between retries. Defaults to 100 ms.

func WithRetryMax

func WithRetryMax(max int) ProducerConfigOption

WithRetryMax sets the maximum number of retries when the producer fails to produce a message. Defaults to 3 if not set.

func WithReturnErrors

func WithReturnErrors(isReturnError bool) ProducerConfigOption

WithReturnErrors tells the producer whether to return errors. Enabled by default.

func WithReturnSuccesses

func WithReturnSuccesses(isReturnSuccess bool) ProducerConfigOption

WithReturnSuccesses tells the producer whether to return successfully delivered messages. Disabled by default.
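When errors or successes are returned by an asynchronous producer, they arrive on channels that the caller is expected to read; an unread channel can stall the producer. The sketch below shows that draining pattern using a hypothetical fakeAsyncProducer built from plain Go channels, so it runs without sarama. The type, its fields, and the rule that empty messages fail are all assumptions made for the example.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// fakeAsyncProducer is a hypothetical stand-in for an asynchronous producer.
// It reports each message on exactly one of its two result channels.
type fakeAsyncProducer struct {
	input     chan string
	successes chan string
	errs      chan error
}

// newFakeAsyncProducer starts a goroutine that "delivers" messages: empty
// messages fail, all others succeed. Both result channels are closed once
// the input channel is closed and drained.
func newFakeAsyncProducer() *fakeAsyncProducer {
	p := &fakeAsyncProducer{
		input:     make(chan string),
		successes: make(chan string),
		errs:      make(chan error),
	}
	go func() {
		defer close(p.successes)
		defer close(p.errs)
		for msg := range p.input {
			if msg == "" {
				p.errs <- errors.New("empty message rejected")
			} else {
				p.successes <- msg
			}
		}
	}()
	return p
}

// produceAll sends msgs and drains both result channels concurrently, so the
// producer goroutine never blocks on an unread channel.
func produceAll(msgs []string) (delivered, failed int) {
	p := newFakeAsyncProducer()
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for range p.successes {
			delivered++
		}
	}()
	go func() {
		defer wg.Done()
		for range p.errs {
			failed++
		}
	}()
	for _, m := range msgs {
		p.input <- m
	}
	close(p.input)
	wg.Wait() // both counters are safe to read after the drainers finish
	return delivered, failed
}

func main() {
	delivered, failed := produceAll([]string{"a", "", "b"})
	fmt.Println(delivered, failed)
}
```

If either draining goroutine were removed while its channel was still in use, the producer goroutine in this sketch would block on the first send to that channel, which is why disabling a return channel you do not intend to read is the safer configuration.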

func WithSASL

func WithSASL(user, pass string) ProducerConfigOption

WithSASL sets the SASL username and password and enables SASL mode in the producer configuration.

func WithTLS

func WithTLS(withTLS bool) ProducerConfigOption

WithTLS sets whether the producer connects over TLS. Disabled by default.

func WithVerbose

func WithVerbose() ProducerConfigOption

WithVerbose sets the logger to write log output to stdout. If this option is not set, the producer discards its logs.

func WithoutSASL

func WithoutSASL() ProducerConfigOption

WithoutSASL disables SASL mode in the producer and clears the SASL username and password from the configuration.
