kafka

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 20, 2019 License: Apache-2.0 Imports: 6 Imported by: 0

README

constructor-kafka - Constructor wrapper for sarama consumers and producers

codecov Build Status GoDoc

Quick Start

package main

import (
    "context"

    "github.com/Shopify/sarama"

    "github.com/stackopsd/config"
    "github.com/stackopsd/constructor"
    ckafka "github.com/stackopsd/constructor-kafka"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    syncProducerConst := ckafka.NewSyncProducerConstructor()
    asyncProducerConst := ckafka.NewAsyncProducerConstructor()
    consumerGroupConst := ckafka.NewConsumerGroupConstructor()

    syncProducer := new(sarama.SyncProducer)
    asyncProducer := new(sarama.AsyncProducer)
    consumerGroup := new(sarama.ConsumerGroup)

    // Create a loader from a source of configuration data.
    syncProducerLoader := config.NewFileLoader("/path/to/some/config.json_or_yaml")
    asyncProducerLoader := config.NewFileLoader("/path/to/some/config.json_or_yaml")
    consumerGroupLoader := config.NewFileLoader("/path/to/some/config.json_or_yaml")
    if err := constructor.New(ctx, syncProducerLoader, &syncProducerConst, syncProducer); err != nil {
        panic(err)
    }
    if err := constructor.New(ctx, asyncProducerLoader, &asyncProducerConst, asyncProducer); err != nil {
        panic(err)
    }
    if err := constructor.New(ctx, consumerGroupLoader, &consumerGroupConst, consumerGroup); err != nil {
        panic(err)
    }

    // Publish and consume messages using the sarama clients.
}

Example Configuration

Note that more configuration options are expected over time. For now this contains a fairly minimal subset of options required to get a producer or consumer running. Extended options will need to be added in the future before production use. For example, there is no current way to install a metrics collector or fine tune the byte limits of consumer operations.

If loading from a file, the following represents all the possible options for a sync producer:

client_id: "myService"
channel_buffer_size: 256 # Size of success/error channel buffers
version: "2.3.0"
addresses: # Addresses of initial brokers
    - kafka:9092
    - kafka2:9092
partitioner: "hashing" # One of random, hashing, roundrobin, or manual
compression: "none" # One of gzip, lz4, none, snappy, or zstd
compression_level: -1000 # Compression strategy specific seting.
required_acks: "local" # One of none, local, all.
ack_timeout: "10s" # Max time to wait when require_acks is all.
max_message_bytes: 1000000
net:
    max_open_requests: 5 # Number of requests on a connection before blocking.
    dial_timeout: "30s" # How long to wait for the initial connection.
    read_timeout: "30s" # How long to wait for a response.
    write_timeout: "30s" # How long to wait for a transmit.
	keep_alive: "0s" # Network keep-alive time. 0 for disabled.
	tls_enabled: true # Optional TLS support
    tls: # All TLS options provided by https://github.com/stackopsd/constructor-tls
        insecure_skip_verify: true
        server_name: "my-server"
        client_auth: "require_and_verify_client_cert"
        rand:
            choice: "crypto" # Can be zero if needed for testing.
        certificates: # Set of certificates to present when authenticating.
            bytes:
                -   cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
                    key: "<FULL PEM ENCODED KEY AS A STRING>"
                -   cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
                    key: "<FULL PEM ENCODED KEY AS A STRING>"
            files:
                -   cert: "/path/to/pem/encoded/cert"
                    key: "/path/to/pem/encoded/key"
                -   cert: "/path/to/pem/encoded/cert"
                    key: "/path/to/pem/encoded/key"
        root_cas: # Set of root authorities to use when verifying a server certificate.
            bytes:
                -   cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
                    key: "<FULL PEM ENCODED KEY AS A STRING>"
                -   cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
                    key: "<FULL PEM ENCODED KEY AS A STRING>"
            files:
                -   cert: "/path/to/pem/encoded/cert"
                    key: "/path/to/pem/encoded/key"
                -   cert: "/path/to/pem/encoded/cert"
                    key: "/path/to/pem/encoded/key"
        client_cas: # Set of root authorities to use when verifying a client certificate.
            bytes:
                -   cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
                    key: "<FULL PEM ENCODED KEY AS A STRING>"
                -   cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
                    key: "<FULL PEM ENCODED KEY AS A STRING>"
            files:
                -   cert: "/path/to/pem/encoded/cert"
                    key: "/path/to/pem/encoded/key"
                -   cert: "/path/to/pem/encoded/cert"
                    key: "/path/to/pem/encoded/key"

The following are all options for the async producer:

client_id: "myService"
channel_buffer_size: 256 # Size of success/error channel buffers
version: "2.3.0"
addresses: # Addresses of initial brokers
    - kafka:9092
    - kafka2:9092
partitioner: "hashing" # One of random, hashing, roundrobin, or manual
compression: "none" # One of gzip, lz4, none, snappy, or zstd
compression_level: -1000 # Compression strategy specific seting.
required_acks: "local" # One of none, local, all.
ack_timeout: "10s" # Max time to wait when require_acks is all.
max_message_bytes: 1000000
return_sucesses: false # Enable the success channel on the producer.
return_errors: false # Enable the error channel on the producer.
net:
    max_open_requests: 5 # Number of requests on a connection before blocking.
    dial_timeout: "30s" # How long to wait for the initial connection.
    read_timeout: "30s" # How long to wait for a response.
    write_timeout: "30s" # How long to wait for a transmit.
    keep_alive: "0s" # Network keep-alive time. 0 for disabled.
    tls_enabled: true # Optional TLS support
    tls: # All TLS options provided by https://github.com/stackopsd/constructor-tls
        insecure_skip_verify: true
        server_name: "my-server"
        client_auth: "require_and_verify_client_cert"
        rand:
            choice: "crypto" # Can be zero if needed for testing.
        certificates: # Set of certificates to present when authenticating.
            bytes:
                -   cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
                    key: "<FULL PEM ENCODED KEY AS A STRING>"
                -   cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
                    key: "<FULL PEM ENCODED KEY AS A STRING>"
            files:
                -   cert: "/path/to/pem/encoded/cert"
                    key: "/path/to/pem/encoded/key"
                -   cert: "/path/to/pem/encoded/cert"
                    key: "/path/to/pem/encoded/key"
        root_cas: # Set of root authorities to use when verifying a server certificate.
            bytes:
                -   cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
                    key: "<FULL PEM ENCODED KEY AS A STRING>"
                -   cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
                    key: "<FULL PEM ENCODED KEY AS A STRING>"
            files:
                -   cert: "/path/to/pem/encoded/cert"
                    key: "/path/to/pem/encoded/key"
                -   cert: "/path/to/pem/encoded/cert"
                    key: "/path/to/pem/encoded/key"
        client_cas: # Set of root authorities to use when verifying a client certificate.
            bytes:
                -   cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
                    key: "<FULL PEM ENCODED KEY AS A STRING>"
                -   cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
                    key: "<FULL PEM ENCODED KEY AS A STRING>"
            files:
                -   cert: "/path/to/pem/encoded/cert"
                    key: "/path/to/pem/encoded/key"
                -   cert: "/path/to/pem/encoded/cert"
                    key: "/path/to/pem/encoded/key"

and the following are the options for the consumer group:

client_id: "myService"
channel_buffer_size: 256 # Size of success/error channel buffers
version: "2.3.0"
addresses: # Addresses of initial brokers
    - kafka:9092
    - kafka2:9092
session_timeout: "10s" # Time in which the broker must receive a heartbeat.
heartbeat_interval: "3s" # Frequency of sending heartbeats to the broker.
rebalance_strategy: "range": # One of range, roundrobin, or sticky
rebalance_timeout: "60s" # Time allowed for the node to join after a rebalance.
return_errors: false # Enable the error channel on the consumer.
group_name: "myConsumerGroup" # Unique consumer group name.
net:
    max_open_requests: 5 # Number of requests on a connection before blocking.
    dial_timeout: "30s" # How long to wait for the initial connection.
    read_timeout: "30s" # How long to wait for a response.
    write_timeout: "30s" # How long to wait for a transmit.
    keep_alive: "0s" # Network keep-alive time. 0 for disabled.
    tls_enabled: true # Optional TLS support
    tls: # All TLS options provided by https://github.com/stackopsd/constructor-tls
        insecure_skip_verify: true
        server_name: "my-server"
        client_auth: "require_and_verify_client_cert"
        rand:
            choice: "crypto" # Can be zero if needed for testing.
        certificates: # Set of certificates to present when authenticating.
            bytes:
                -   cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
                    key: "<FULL PEM ENCODED KEY AS A STRING>"
                -   cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
                    key: "<FULL PEM ENCODED KEY AS A STRING>"
            files:
                -   cert: "/path/to/pem/encoded/cert"
                    key: "/path/to/pem/encoded/key"
                -   cert: "/path/to/pem/encoded/cert"
                    key: "/path/to/pem/encoded/key"
        root_cas: # Set of root authorities to use when verifying a server certificate.
            bytes:
                -   cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
                    key: "<FULL PEM ENCODED KEY AS A STRING>"
                -   cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
                    key: "<FULL PEM ENCODED KEY AS A STRING>"
            files:
                -   cert: "/path/to/pem/encoded/cert"
                    key: "/path/to/pem/encoded/key"
                -   cert: "/path/to/pem/encoded/cert"
                    key: "/path/to/pem/encoded/key"
        client_cas: # Set of root authorities to use when verifying a client certificate.
            bytes:
                -   cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
                    key: "<FULL PEM ENCODED KEY AS A STRING>"
                -   cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
                    key: "<FULL PEM ENCODED KEY AS A STRING>"
            files:
                -   cert: "/path/to/pem/encoded/cert"
                    key: "/path/to/pem/encoded/key"
                -   cert: "/path/to/pem/encoded/cert"
                    key: "/path/to/pem/encoded/key"

License

Copyright 2019 Kevin Conway

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AsyncProducerConfig

type AsyncProducerConfig struct {
	SharedConfig
	Partitioner      string // One of random, hash, round robin, etc.
	Compression      string // One of gzip, lz4, none, etc.
	CompressionLevel int
	RequiredAcks     string // One of none, local, all.
	AckTimeout       time.Duration
	MaxMessageBytes  int
	ReturnSuccesses  bool
	ReturnErrors     bool
}

AsyncProducerConfig contains settings for a sarama.AsyncProducer

type AsyncProducerConstructor

type AsyncProducerConstructor struct {
	TLS *ctls.Constructor
}

AsyncProducerConstructor creates sarama.AsyncProducers.

func NewAsyncProducerConstructor

func NewAsyncProducerConstructor() *AsyncProducerConstructor

NewAsyncProducerConstructor generates a AsyncProducerConstructor.

func (*AsyncProducerConstructor) New

New generates a sarama.AsyncProducer.

func (*AsyncProducerConstructor) Settings

Settings generates the default configuration.

type ConsumerGroupConfig

type ConsumerGroupConfig struct {
	SharedConfig
	SessionTimeout    time.Duration
	HeartbeatInterval time.Duration
	RebalanceStrategy string
	RebalanceTimeout  time.Duration
	ReturnErrors      bool
	GroupName         string
}

ConsumerGroupConfig contains settings for a sarama.ConsumerGroup

type ConsumerGroupConstructor

type ConsumerGroupConstructor struct {
	TLS *ctls.Constructor
}

ConsumerGroupConstructor creates sarama.ConsumerGroup.

func NewConsumerGroupConstructor

func NewConsumerGroupConstructor() *ConsumerGroupConstructor

NewConsumerGroupConstructor generates a ConsumerGroupConstructor.

func (*ConsumerGroupConstructor) New

New generates a sarama.ConsumerGroup.

func (*ConsumerGroupConstructor) Settings

Settings generates the default configuration.

type NetConfig

type NetConfig struct {
	MaxOpenRequests int
	DialTimeout     time.Duration // How long to wait for the initial connection.
	ReadTimeout     time.Duration // How long to wait for a response.
	WriteTimeout    time.Duration // How long to wait for a transmit.
	KeepAlive       time.Duration
	TLSEnabled      bool
	TLS             *ctls.Config
}

NetConfig contains settings for the underlying network connections.

type SharedConfig

type SharedConfig struct {
	Net               NetConfig
	ClientID          string
	ChannelBufferSize int
	Version           string
	Addresses         []string
}

SharedConfig contains the settings used by all Sarama components.

type SyncProducerConfig

type SyncProducerConfig struct {
	SharedConfig
	Partitioner      string // One of random, hash, round robin, etc.
	Compression      string // One of gzip, lz4, none, etc.
	CompressionLevel int
	RequiredAcks     string // One of none, local, all.
	AckTimeout       time.Duration
	MaxMessageBytes  int
}

SyncProducerConfig contains settings for a sarama.SyncProducer

type SyncProducerConstructor

type SyncProducerConstructor struct {
	TLS *ctls.Constructor
}

SyncProducerConstructor creates sarama.SyncProducers.

func NewSyncProducerConstructor

func NewSyncProducerConstructor() *SyncProducerConstructor

NewSyncProducerConstructor generates a SyncProducerConstructor.

func (*SyncProducerConstructor) New

New generates a sarama.SyncProducer.

func (*SyncProducerConstructor) Settings

Settings generates the default configuration.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL