package kafkapubsub

import "gocloud.dev/pubsub/kafkapubsub"

Package kafkapubsub provides an implementation of pubsub for Kafka. It requires a minimum Kafka version of 0.11.x for Header support. Some functionality may work with earlier versions of Kafka.

See https://kafka.apache.org/documentation.html#semantics for a discussion of message semantics in Kafka. sarama.Config exposes many knobs that can affect performance and semantics, so review and set them carefully.

kafkapubsub does not support Message.Nack; Message.Nackable will return false, and Message.Nack will panic if called.
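Because Message.Nack panics with this driver, portable code can check Message.Nackable before calling it. The following is a minimal sketch, not part of kafkapubsub: the nacker interface and the fakeMessage type are hypothetical stand-ins whose method names mirror Nackable and Nack on *pubsub.Message, so the example runs without a broker.

```go
package main

import "fmt"

// nacker is a hypothetical local interface. Its method names mirror
// Nackable and Nack on *pubsub.Message.
type nacker interface {
	Nackable() bool
	Nack()
}

// tryNack calls Nack only when the message reports that Nack is supported.
// It returns true if Nack was called.
func tryNack(m nacker) bool {
	if !m.Nackable() {
		return false
	}
	m.Nack()
	return true
}

// fakeMessage is a stand-in used only for this demonstration.
type fakeMessage struct {
	nackable bool
	nacked   bool
}

func (f *fakeMessage) Nackable() bool { return f.nackable }

func (f *fakeMessage) Nack() {
	if !f.nackable {
		panic("Nack is not supported")
	}
	f.nacked = true
}

func main() {
	// Models a message from a driver without Nack support, such as this one.
	kafkaLike := &fakeMessage{nackable: false}
	fmt.Println(tryNack(kafkaLike)) // false, and no panic
}
```

What to do when Nack is unavailable (for example, leaving the message unacknowledged or routing it elsewhere) depends on the application and is not shown here.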

URLs

For pubsub.OpenTopic and pubsub.OpenSubscription, kafkapubsub registers for the scheme "kafka". The default URL opener will connect to a default set of Kafka brokers based on the environment variable "KAFKA_BROKERS", expected to be a comma-delimited set of server addresses. To customize the URL opener, or for more details on the URL format, see URLOpener. See https://gocloud.dev/concepts/urls/ for background information.
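To illustrate the expected format of KAFKA_BROKERS, here is a minimal sketch of turning a comma-delimited value into a broker slice. The splitBrokers helper is hypothetical, and trimming spaces and skipping empty entries are assumptions of this sketch, not documented behavior of the default URL opener. The addresses are placeholders.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// splitBrokers turns a comma-delimited address list into a slice,
// trimming surrounding spaces and skipping empty entries.
func splitBrokers(s string) []string {
	var brokers []string
	for _, b := range strings.Split(s, ",") {
		if b = strings.TrimSpace(b); b != "" {
			brokers = append(brokers, b)
		}
	}
	return brokers
}

func main() {
	// Placeholder addresses for illustration only.
	os.Setenv("KAFKA_BROKERS", "10.0.0.1:9092,10.0.0.2:9092")
	fmt.Println(splitBrokers(os.Getenv("KAFKA_BROKERS")))
}
```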

Escaping

Go CDK supports all UTF-8 strings. No escaping is required for Kafka. Message metadata is supported through Kafka Headers, which allow arbitrary []byte for both key and value. These are converted to string for use in Message.Metadata.
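The header conversion described above can be sketched as follows. This is an illustrative model, not the package's implementation: the header type is a hypothetical stand-in for a Kafka record header, and the last-value-wins handling of repeated keys is an assumption.

```go
package main

import "fmt"

// header is a hypothetical stand-in for a Kafka record header,
// which carries raw bytes for both key and value.
type header struct {
	Key   []byte
	Value []byte
}

// metadataFromHeaders converts raw header bytes to strings.
// If a key repeats, the last value wins in this sketch.
func metadataFromHeaders(hs []header) map[string]string {
	md := make(map[string]string, len(hs))
	for _, h := range hs {
		md[string(h.Key)] = string(h.Value)
	}
	return md
}

func main() {
	hs := []header{{Key: []byte("content-type"), Value: []byte("text/plain")}}
	fmt.Println(metadataFromHeaders(hs)["content-type"]) // text/plain
}
```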

As

kafkapubsub exposes the following types for As:

- Topic: sarama.SyncProducer
- Subscription: sarama.ConsumerGroup, sarama.ConsumerGroupSession (may be nil during session renegotiation, and session may go stale at any time)
- Message: *sarama.ConsumerMessage
- Message.BeforeSend: *sarama.ProducerMessage
- Error: sarama.ConsumerError, sarama.ConsumerErrors, sarama.ProducerError, sarama.ProducerErrors, sarama.ConfigurationError, sarama.PacketDecodingError, sarama.PacketEncodingError, sarama.KError

Code:

// This example is used in https://gocloud.dev/howto/pubsub/subscribe/#kafka

// import _ "gocloud.dev/pubsub/kafkapubsub"

// Variables set up elsewhere:
ctx := context.Background()

// pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.
// The host + path are used as the consumer group name.
// The "topic" query parameter sets one or more topics to subscribe to.
// The set of brokers must be in an environment variable KAFKA_BROKERS.
subscription, err := pubsub.OpenSubscription(ctx,
    "kafka://my-group?topic=my-topic")
if err != nil {
    log.Fatal(err)
}
defer subscription.Shutdown(ctx)

Code:

// This example is used in https://gocloud.dev/howto/pubsub/publish/#kafka

// import _ "gocloud.dev/pubsub/kafkapubsub"

// Variables set up elsewhere:
ctx := context.Background()

// pubsub.OpenTopic creates a *pubsub.Topic from a URL.
// The host + path are the topic name to send to.
// The set of brokers must be in an environment variable KAFKA_BROKERS.
topic, err := pubsub.OpenTopic(ctx, "kafka://my-topic")
if err != nil {
    log.Fatal(err)
}
defer topic.Shutdown(ctx)

Package Files

kafka.go

Constants

const Scheme = "kafka"

Scheme is the URL scheme that kafkapubsub registers its URLOpeners under on pubsub.DefaultMux.

func MinimalConfig

func MinimalConfig() *sarama.Config

MinimalConfig returns a minimal sarama.Config.

func OpenSubscription

func OpenSubscription(brokers []string, config *sarama.Config, group string, topics []string, opts *SubscriptionOptions) (*pubsub.Subscription, error)

OpenSubscription creates a pubsub.Subscription that joins group, receiving messages from topics.

It uses a sarama.ConsumerGroup to receive messages. Consumer options can be configured in the Consumer section of the sarama.Config: https://godoc.org/github.com/Shopify/sarama#Config.

Code:

// This example is used in https://gocloud.dev/howto/pubsub/subscribe/#kafka-ctor

// Variables set up elsewhere:
ctx := context.Background()

// The set of brokers in the Kafka cluster.
addrs := []string{"1.2.3.4:9092"}
// The Kafka client configuration to use.
config := kafkapubsub.MinimalConfig()

// Construct a *pubsub.Subscription, joining the consumer group "my-group"
// and receiving messages from "my-topic".
subscription, err := kafkapubsub.OpenSubscription(
    addrs, config, "my-group", []string{"my-topic"}, nil)
if err != nil {
    log.Fatal(err)
}
defer subscription.Shutdown(ctx)

func OpenTopic

func OpenTopic(brokers []string, config *sarama.Config, topicName string, opts *TopicOptions) (*pubsub.Topic, error)

OpenTopic creates a pubsub.Topic that sends to a Kafka topic.

It uses a sarama.SyncProducer to send messages. Producer options can be configured in the Producer section of the sarama.Config: https://godoc.org/github.com/Shopify/sarama#Config.

Config.Producer.Return.Successes must be set to true.

Code:

// This example is used in https://gocloud.dev/howto/pubsub/publish/#kafka-ctor

// Variables set up elsewhere:
ctx := context.Background()

// The set of brokers in the Kafka cluster.
addrs := []string{"1.2.3.4:9092"}
// The Kafka client configuration to use.
config := kafkapubsub.MinimalConfig()

// Construct a *pubsub.Topic.
topic, err := kafkapubsub.OpenTopic(addrs, config, "my-topic", nil)
if err != nil {
    log.Fatal(err)
}
defer topic.Shutdown(ctx)

type SubscriptionOptions

type SubscriptionOptions struct {
    // KeyName optionally sets the Message.Metadata key in which to store the
    // Kafka message key. If set, and if the Kafka message key is non-empty,
    // the key value will be stored in Message.Metadata under KeyName.
    KeyName string

    // WaitForJoin causes OpenSubscription to wait for up to WaitForJoin
    // to allow the client to join the consumer group.
    // Messages sent to the topic before the client joins the group
    // may not be received by this subscription.
    // OpenSubscription will succeed even if WaitForJoin elapses and
    // the subscription still hasn't been joined successfully.
    WaitForJoin time.Duration
}

SubscriptionOptions contains configuration for subscriptions.
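The WaitForJoin semantics (wait up to a limit, then proceed regardless) can be sketched with a timer and a channel. This is a model under stated assumptions, not the package's code: the waitForJoin helper and the joined channel are hypothetical names introduced for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// waitForJoin blocks until joined is closed or d elapses, whichever comes
// first. It reports whether the join completed in time. The caller proceeds
// either way, so a timeout is not treated as an error.
func waitForJoin(joined <-chan struct{}, d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-joined:
		return true
	case <-timer.C:
		return false
	}
}

func main() {
	joined := make(chan struct{})
	go func() {
		time.Sleep(10 * time.Millisecond) // simulate a slow group join
		close(joined)
	}()
	fmt.Println(waitForJoin(joined, time.Second)) // true
}
```

When the helper returns false, the join may still complete later. This matches the documented caveat that messages sent before the client joins the group may not be received.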

type TopicOptions

type TopicOptions struct {
    // KeyName optionally sets the Message.Metadata key to use as the optional
    // Kafka message key. If set, and if a matching Message.Metadata key is found,
    // the value for that key will be used as the message key when sending to
    // Kafka, instead of being added to the message headers.
    KeyName string
}

TopicOptions contains configuration options for topics.
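The KeyName rule for topics can be modeled as a split of Message.Metadata into a message key and the remaining headers. The splitKey helper below is hypothetical and only illustrates the documented behavior. It is not the package's implementation.

```go
package main

import "fmt"

// splitKey models the KeyName rule: if keyName is set and present in
// metadata, its value becomes the message key and is left out of the headers.
// All other metadata entries are returned as headers.
func splitKey(metadata map[string]string, keyName string) (key string, headers map[string]string) {
	headers = make(map[string]string, len(metadata))
	for k, v := range metadata {
		if keyName != "" && k == keyName {
			key = v
			continue
		}
		headers[k] = v
	}
	return key, headers
}

func main() {
	md := map[string]string{"order-id": "42", "source": "web"}
	key, headers := splitKey(md, "order-id")
	fmt.Println(key, headers) // 42 map[source:web]
}
```

Setting the same KeyName in SubscriptionOptions lets a receiver read the key back from Message.Metadata.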

type URLOpener

type URLOpener struct {
    // Brokers is the slice of brokers in the Kafka cluster.
    Brokers []string
    // Config is the Sarama Config.
    // Config.Producer.Return.Successes must be set to true.
    Config *sarama.Config

    // TopicOptions specifies the options to pass to OpenTopic.
    TopicOptions TopicOptions
    // SubscriptionOptions specifies the options to pass to OpenSubscription.
    SubscriptionOptions SubscriptionOptions
}

URLOpener opens Kafka URLs like "kafka://mytopic" for topics and "kafka://group?topic=mytopic" for subscriptions.

For topics, the URL's host+path is used as the topic name.

For subscriptions, the URL's host+path is used as the group name, and the "topic" query parameter(s) are used as the set of topics to subscribe to.
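The subscription URL format can be illustrated with a small standard-library parser. This is a sketch, not URLOpener's code: parseSubscriptionURL is a hypothetical helper, and joining host and path with path.Join and rejecting URLs without a "topic" parameter are assumptions of this example.

```go
package main

import (
	"fmt"
	"net/url"
	"path"
)

// parseSubscriptionURL extracts the consumer group and topics from a
// URL of the form kafka://group?topic=t1&topic=t2.
func parseSubscriptionURL(raw string) (group string, topics []string, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", nil, err
	}
	if u.Scheme != "kafka" {
		return "", nil, fmt.Errorf("unexpected scheme %q", u.Scheme)
	}
	// Host and path together form the group name.
	group = path.Join(u.Host, u.Path)
	// Repeating the "topic" parameter yields multiple topics.
	topics = u.Query()["topic"]
	if len(topics) == 0 {
		return "", nil, fmt.Errorf("no topic parameter in %q", raw)
	}
	return group, topics, nil
}

func main() {
	group, topics, err := parseSubscriptionURL("kafka://my-group?topic=orders&topic=refunds")
	if err != nil {
		panic(err)
	}
	fmt.Println(group, topics) // my-group [orders refunds]
}
```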

func (*URLOpener) OpenSubscriptionURL

func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error)

OpenSubscriptionURL opens a pubsub.Subscription based on u.

func (*URLOpener) OpenTopicURL

func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error)

OpenTopicURL opens a pubsub.Topic based on u.

Package kafkapubsub imports 16 packages and is imported by 2 packages. Updated 2019-07-17.