kafka

package
v1.3.6
Note: this package is not in the latest version of its module.
Published: Aug 25, 2020 License: Apache-2.0 Imports: 9 Imported by: 24

Documentation

Constants

This section is empty.

Variables

var (
	// RequiredAcks will be used in Kafka configs
	// to set the 'RequiredAcks' value.
	RequiredAcks = sarama.WaitForAll
)

Functions

func GetPartitions

func GetPartitions(brokerHosts []string, topic string) (partitions []int32, err error)

GetPartitions is a helper function to look up which partitions are available via the given brokers for the given topic. This should be called only on startup.

func NewPublisher

func NewPublisher(cfg *Config) (pubsub.Publisher, error)

NewPublisher will initiate a new experimental Kafka publisher.

func NewSubscriber

func NewSubscriber(cfg *Config, offsetProvider func() int64, offsetBroadcast func(int64)) (pubsub.Subscriber, error)

NewSubscriber will initiate the experimental Kafka consumer.
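The two callback parameters of NewSubscriber have the types func() int64 and func(int64). The sketch below shows one way to supply both from a single in-memory value. The offsetStore type and its method names are hypothetical and are not part of this package. It is also an assumption, inferred from the parameter names only, that the subscriber reads its starting offset from offsetProvider and reports consumed offsets through offsetBroadcast.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// offsetStore is a hypothetical in-memory holder for the last seen offset.
// A real service would more likely persist this value somewhere durable.
type offsetStore struct {
	offset int64
}

// newOffsetStore returns a store that starts at the given offset.
func newOffsetStore(start int64) *offsetStore {
	return &offsetStore{offset: start}
}

// Provide has the shape of the offsetProvider parameter: func() int64.
func (s *offsetStore) Provide() int64 {
	return atomic.LoadInt64(&s.offset)
}

// Broadcast has the shape of the offsetBroadcast parameter: func(int64).
func (s *offsetStore) Broadcast(o int64) {
	atomic.StoreInt64(&s.offset, o)
}

func main() {
	store := newOffsetStore(0)

	// Method values can be passed wherever the callback types are expected.
	var provider func() int64 = store.Provide
	var broadcast func(int64) = store.Broadcast

	broadcast(42)
	fmt.Println(provider())
}
```

With such a store in place, store.Provide and store.Broadcast could be passed as the second and third arguments of NewSubscriber. The atomic operations keep the value safe to read and write from different goroutines.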

Types

type Config

type Config struct {
	BrokerHosts []string
	// BrokerHostsString is used when loading the list from environment variables.
	// If loaded via LoadConfigFromEnv, BrokerHosts will be updated with these
	// values.
	BrokerHostsString string `envconfig:"KAFKA_BROKER_HOSTS"`

	Partition int32  `envconfig:"KAFKA_PARTITION"`
	Topic     string `envconfig:"KAFKA_TOPIC"`

	MaxRetry int `envconfig:"KAFKA_MAX_RETRY"`

	// Config is a sarama config struct for more control over the underlying Kafka client.
	Config *sarama.Config
}

Config holds the basic information for working with Kafka.

func LoadConfigFromEnv

func LoadConfigFromEnv() *Config

LoadConfigFromEnv will attempt to load a Kafka Config from environment variables. If the variables are not populated, nil is returned.
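To illustrate the environment variables named in the Config struct tags, here is a minimal standard-library sketch of the same idea. It is not the package's implementation: envConfig and loadFromEnv are hypothetical stand-ins, the comma separator for KAFKA_BROKER_HOSTS is an assumption, and treating an empty KAFKA_BROKER_HOSTS as "not populated" is likewise an assumption.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
)

// envConfig is a local stand-in mirroring the env-backed fields of Config.
type envConfig struct {
	BrokerHosts []string
	Partition   int32
	Topic       string
	MaxRetry    int
}

// loadFromEnv builds an envConfig using the given lookup function.
// It returns nil when KAFKA_BROKER_HOSTS is empty (assumed meaning of
// "not populated" for this sketch).
func loadFromEnv(getenv func(string) string) *envConfig {
	hosts := getenv("KAFKA_BROKER_HOSTS")
	if hosts == "" {
		return nil
	}
	cfg := &envConfig{Topic: getenv("KAFKA_TOPIC")}
	// Assumption: hosts are comma-separated; surrounding spaces are trimmed.
	for _, h := range strings.Split(hosts, ",") {
		if h = strings.TrimSpace(h); h != "" {
			cfg.BrokerHosts = append(cfg.BrokerHosts, h)
		}
	}
	// Missing or unparsable numbers are left at zero in this sketch.
	if p, err := strconv.ParseInt(getenv("KAFKA_PARTITION"), 10, 32); err == nil {
		cfg.Partition = int32(p)
	}
	if r, err := strconv.Atoi(getenv("KAFKA_MAX_RETRY")); err == nil {
		cfg.MaxRetry = r
	}
	return cfg
}

func main() {
	cfg := loadFromEnv(os.Getenv)
	if cfg == nil {
		fmt.Println("no Kafka config in environment")
		return
	}
	fmt.Printf("%+v\n", *cfg)
}
```

Taking the lookup function as a parameter keeps the sketch easy to exercise without touching the real process environment. Callers of LoadConfigFromEnv should check for a nil result in the same way main does here.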

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher is an experimental publisher that provides an implementation for Kafka using the Shopify/sarama library.

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, key string, m proto.Message) error

Publish will marshal the proto message and emit it to the Kafka topic.

func (*Publisher) PublishRaw

func (p *Publisher) PublishRaw(_ context.Context, key string, m []byte) error

PublishRaw will emit the byte array to the Kafka topic.

func (*Publisher) Stop

func (p *Publisher) Stop() error

Stop will close the publisher's connection.
