gizmo: github.com/NYTimes/gizmo/pubsub/kafka

package kafka

import "github.com/NYTimes/gizmo/pubsub/kafka"

Package Files

config.go kafka.go

Variables

var (
    // RequiredAcks will be used in Kafka configs
    // to set the 'RequiredAcks' value.
    RequiredAcks = sarama.WaitForAll
)

func GetPartitions

func GetPartitions(brokerHosts []string, topic string) (partitions []int32, err error)

GetPartitions is a helper function to look up which partitions are available via the given brokers for the given topic. This should be called only on startup.
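Because Config carries a single Partition value, one plausible use of the returned slice is to derive one configuration per partition at startup. The sketch below uses only the standard library: `subscriberConfig` is a hypothetical stand-in for the Topic and Partition fields of Config, and `configsPerPartition` is an illustrative helper, not part of this package.

```go
package main

import "fmt"

// subscriberConfig is a hypothetical stand-in for the Topic and Partition
// fields of kafka.Config; it is not the package's own type.
type subscriberConfig struct {
	Topic     string
	Partition int32
}

// configsPerPartition copies base once per partition ID, as might be done
// with the slice returned by GetPartitions.
func configsPerPartition(base subscriberConfig, partitions []int32) []subscriberConfig {
	cfgs := make([]subscriberConfig, 0, len(partitions))
	for _, p := range partitions {
		c := base // copy by value so each config is independent
		c.Partition = p
		cfgs = append(cfgs, c)
	}
	return cfgs
}

func main() {
	// Assumed partition IDs; in practice these would come from GetPartitions.
	partitions := []int32{0, 1, 2}
	for _, c := range configsPerPartition(subscriberConfig{Topic: "events"}, partitions) {
		fmt.Printf("topic=%s partition=%d\n", c.Topic, c.Partition)
	}
}
```

Doing the lookup once and fanning out from the result keeps broker metadata requests off the hot path, which matches the advice to call GetPartitions only on startup.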

func NewPublisher

func NewPublisher(cfg *Config) (pubsub.Publisher, error)

NewPublisher will initiate a new experimental Kafka publisher.

func NewSubscriber

func NewSubscriber(cfg *Config, offsetProvider func() int64, offsetBroadcast func(int64)) (pubsub.Subscriber, error)

NewSubscriber will initiate the experimental Kafka consumer.

type Config

type Config struct {
    BrokerHosts []string
    // BrokerHostsString is used when loading the list from environment variables.
    // If loaded via the LoadConfigFromEnv() func, BrokerHosts will get updated with
    // these values.
    BrokerHostsString string `envconfig:"KAFKA_BROKER_HOSTS"`

    Partition int32  `envconfig:"KAFKA_PARTITION"`
    Topic     string `envconfig:"KAFKA_TOPIC"`

    MaxRetry int `envconfig:"KAFKA_MAX_RETRY"`

    // Config is a sarama config struct for more control over the underlying Kafka client.
    Config *sarama.Config
}

Config holds the basic information for working with Kafka.

func LoadConfigFromEnv

func LoadConfigFromEnv() *Config

LoadConfigFromEnv will attempt to load a Kafka Config from environment variables. If not populated, nil is returned.

type Publisher

type Publisher struct {
    // contains filtered or unexported fields
}

Publisher is an experimental publisher that provides an implementation for Kafka using the Shopify/sarama library.

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, key string, m proto.Message) error

Publish will marshal the proto message and emit it to the Kafka topic.

func (*Publisher) PublishRaw

func (p *Publisher) PublishRaw(_ context.Context, key string, m []byte) error

PublishRaw will emit the byte array to the Kafka topic.

func (*Publisher) Stop

func (p *Publisher) Stop() error

Stop will close the publisher's connection.

Package kafka imports 9 packages and is imported by 5 packages. Updated 2019-03-11.