kafkapubsub

package
v0.43.0
Published: May 15, 2019 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Overview

Package kafkapubsub provides an implementation of pubsub for Kafka. It requires a minimum Kafka version of 0.11.x for Header support. Some functionality may work with earlier versions of Kafka.

See https://kafka.apache.org/documentation.html#semantics for a discussion of message semantics in Kafka. sarama.Config exposes many knobs that can affect performance and semantics, so review and set them carefully.

kafkapubsub does not support Message.Nack; Message.Nackable will return false, and Message.Nack will panic if called.
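Because Message.Nack panics for this driver, portable code can check Message.Nackable before calling it. The following is a minimal, stdlib-only sketch of that guard. The message interface and the fakeKafkaMessage type are hypothetical stand-ins that mirror only the Nack and Nackable methods named above; they are not part of kafkapubsub.

```go
package main

import "fmt"

// message is a hypothetical interface covering only the two methods
// discussed above; *pubsub.Message is assumed to satisfy it.
type message interface {
	Nack()
	Nackable() bool
}

// fakeKafkaMessage imitates the documented kafkapubsub behavior:
// Nackable reports false and Nack panics.
type fakeKafkaMessage struct{}

func (fakeKafkaMessage) Nack()          { panic("Nack is not supported") }
func (fakeKafkaMessage) Nackable() bool { return false }

// nackIfSupported calls Nack only when the driver supports it and
// reports whether it did so.
func nackIfSupported(m message) bool {
	if !m.Nackable() {
		return false
	}
	m.Nack()
	return true
}

func main() {
	if !nackIfSupported(fakeKafkaMessage{}) {
		fmt.Println("Nack unsupported; handle the failure another way")
	}
}
```

What to do when Nack is unavailable (for example, leaving the message unacknowledged or recording the failure elsewhere) is an application decision that this sketch deliberately leaves open.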

URLs

For pubsub.OpenTopic and pubsub.OpenSubscription, kafkapubsub registers for the scheme "kafka". The default URL opener will connect to a default set of Kafka brokers based on the environment variable "KAFKA_BROKERS", expected to be a comma-delimited set of server addresses. To customize the URL opener, or for more details on the URL format, see URLOpener. See https://github.com/eliben/gocdkx/concepts/urls/ for background information.
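As an illustration of the comma-delimited format expected in "KAFKA_BROKERS", the sketch below splits such a value into a slice of addresses. The parseBrokers helper is hypothetical, and trimming whitespace and skipping empty entries are choices made for this sketch; the default URL opener may treat the value differently.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// parseBrokers splits a comma-delimited list of broker addresses.
// Whitespace trimming and dropping empty entries are assumptions of
// this sketch, not documented behavior of kafkapubsub.
func parseBrokers(s string) []string {
	var brokers []string
	for _, b := range strings.Split(s, ",") {
		if b = strings.TrimSpace(b); b != "" {
			brokers = append(brokers, b)
		}
	}
	return brokers
}

func main() {
	brokers := parseBrokers(os.Getenv("KAFKA_BROKERS"))
	if len(brokers) == 0 {
		fmt.Println("KAFKA_BROKERS is unset or empty")
		return
	}
	fmt.Println(brokers)
}
```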

Escaping

Go CDK supports all UTF-8 strings. No escaping is required for Kafka. Message metadata is supported through Kafka Headers, which allow arbitrary []byte for both key and value. These are converted to string for use in Message.Metadata.
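The conversion between byte-slice headers and string metadata can be pictured with a small stdlib-only sketch. The header type below is a hypothetical stand-in for a Kafka record header with []byte key and value, and toMetadata is an illustrative helper, not a function of this package.

```go
package main

import "fmt"

// header is a hypothetical stand-in for a Kafka record header, which
// carries arbitrary bytes for both key and value.
type header struct {
	Key   []byte
	Value []byte
}

// toMetadata converts headers to a string map, mirroring the
// []byte-to-string conversion described above.
func toMetadata(headers []header) map[string]string {
	md := make(map[string]string, len(headers))
	for _, h := range headers {
		md[string(h.Key)] = string(h.Value)
	}
	return md
}

func main() {
	md := toMetadata([]header{
		{Key: []byte("trace-id"), Value: []byte("abc123")},
	})
	fmt.Println(md["trace-id"])
}
```

Go's string conversion keeps the bytes unchanged, so header values that are not valid UTF-8 survive the conversion as-is in this sketch.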

As

kafkapubsub exposes the following types for As:

  • Topic: sarama.SyncProducer
  • Subscription: sarama.ConsumerGroup, sarama.ConsumerGroupSession (may be nil during session renegotiation, and session may go stale at any time)
  • Message: *sarama.ConsumerMessage
  • Message.BeforeSend: *sarama.ProducerMessage
  • Error: sarama.ConsumerError, sarama.ConsumerErrors, sarama.ProducerError, sarama.ProducerErrors, sarama.ConfigurationError, sarama.PacketDecodingError, sarama.PacketEncodingError, sarama.KError
Example (OpenSubscription)
package main

import (
	"context"
	"log"

	"github.com/eliben/gocdkx/pubsub"
	// The blank import registers the "kafka" URL scheme with pubsub.
	_ "github.com/eliben/gocdkx/pubsub/kafkapubsub"
)

func main() {
	// This example is used in https://github.com/eliben/gocdkx/howto/pubsub/subscribe/#kafka

	// Variables set up elsewhere:
	ctx := context.Background()

	// OpenSubscription creates a *pubsub.Subscription from a URL.
	// The host + path are used as the consumer group name.
	// The "topic" query parameter sets one or more topics to subscribe to.
	// The set of brokers must be in an environment variable KAFKA_BROKERS.
	subscription, err := pubsub.OpenSubscription(ctx,
		"kafka://my-group?topic=my-topic")
	if err != nil {
		log.Fatal(err)
	}
	defer subscription.Shutdown(ctx)
}

Example (OpenTopic)
package main

import (
	"context"
	"log"

	"github.com/eliben/gocdkx/pubsub"
	// The blank import registers the "kafka" URL scheme with pubsub.
	_ "github.com/eliben/gocdkx/pubsub/kafkapubsub"
)

func main() {
	// This example is used in https://github.com/eliben/gocdkx/howto/pubsub/publish/#kafka

	// Variables set up elsewhere:
	ctx := context.Background()

	// OpenTopic creates a *pubsub.Topic from a URL.
	// The host + path are the topic name to send to.
	// The set of brokers must be in an environment variable KAFKA_BROKERS.
	topic, err := pubsub.OpenTopic(ctx, "kafka://my-topic")
	if err != nil {
		log.Fatal(err)
	}
	defer topic.Shutdown(ctx)
}

Constants

const Scheme = "kafka"

Scheme is the URL scheme that kafkapubsub registers its URLOpeners under on pubsub.DefaultMux.

Functions

func MinimalConfig

func MinimalConfig() *sarama.Config

MinimalConfig returns a minimal sarama.Config.

func OpenSubscription

func OpenSubscription(brokers []string, config *sarama.Config, group string, topics []string, opts *SubscriptionOptions) (*pubsub.Subscription, error)

OpenSubscription creates a pubsub.Subscription that joins group, receiving messages from topics.

It uses a sarama.ConsumerGroup to receive messages. Consumer options can be configured in the Consumer section of the sarama.Config: https://godoc.org/github.com/Shopify/sarama#Config.

Example
package main

import (
	"context"
	"log"

	"github.com/eliben/gocdkx/pubsub/kafkapubsub"
)

func main() {
	// This example is used in https://github.com/eliben/gocdkx/howto/pubsub/subscribe/#kafka-ctor

	// Variables set up elsewhere:
	ctx := context.Background()

	// The set of brokers in the Kafka cluster.
	addrs := []string{"1.2.3.4:9092"}
	// The Kafka client configuration to use.
	config := kafkapubsub.MinimalConfig()

	// Construct a *pubsub.Subscription, joining the consumer group "my-group"
	// and receiving messages from "my-topic".
	subscription, err := kafkapubsub.OpenSubscription(
		addrs, config, "my-group", []string{"my-topic"}, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer subscription.Shutdown(ctx)
}

func OpenTopic

func OpenTopic(brokers []string, config *sarama.Config, topicName string, opts *TopicOptions) (*pubsub.Topic, error)

OpenTopic creates a pubsub.Topic that sends to a Kafka topic.

It uses a sarama.SyncProducer to send messages. Producer options can be configured in the Producer section of the sarama.Config: https://godoc.org/github.com/Shopify/sarama#Config.

Config.Producer.Return.Successes must be set to true.

Example
package main

import (
	"context"
	"log"

	"github.com/eliben/gocdkx/pubsub/kafkapubsub"
)

func main() {
	// This example is used in https://github.com/eliben/gocdkx/howto/pubsub/publish/#kafka-ctor

	// Variables set up elsewhere:
	ctx := context.Background()

	// The set of brokers in the Kafka cluster.
	addrs := []string{"1.2.3.4:9092"}
	// The Kafka client configuration to use.
	config := kafkapubsub.MinimalConfig()

	// Construct a *pubsub.Topic.
	topic, err := kafkapubsub.OpenTopic(addrs, config, "my-topic", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer topic.Shutdown(ctx)
}

Types

type SubscriptionOptions

type SubscriptionOptions struct {
	// KeyName optionally sets the Message.Metadata key in which to store the
	// Kafka message key. If set, and if the Kafka message key is non-empty,
	// the key value will be stored in Message.Metadata under KeyName.
	KeyName string

	// WaitForJoin causes OpenSubscription to wait for up to WaitForJoin
	// to allow the client to join the consumer group.
	// Messages sent to the topic before the client joins the group
	// may not be received by this subscription.
	// OpenSubscription will succeed even if WaitForJoin elapses and
	// the subscription still hasn't been joined successfully.
	WaitForJoin time.Duration
}

SubscriptionOptions contains configuration for subscriptions.

type TopicOptions

type TopicOptions struct {
	// KeyName optionally sets the Message.Metadata key to use as the optional
	// Kafka message key. If set, and if a matching Message.Metadata key is found,
	// the value for that key will be used as the message key when sending to
	// Kafka, instead of being added to the message headers.
	KeyName string
}

TopicOptions contains configuration options for topics.
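To make the KeyName behavior concrete, the sketch below separates one metadata entry to act as the message key and leaves the rest as headers. The splitKey helper and the "partition-key" metadata name are hypothetical and only model the rule stated in the struct comment.

```go
package main

import "fmt"

// splitKey returns the value stored under keyName (if any) as the
// message key, and every other metadata entry as a header. An empty
// keyName leaves all entries as headers.
func splitKey(metadata map[string]string, keyName string) (key string, headers map[string]string) {
	headers = make(map[string]string, len(metadata))
	for k, v := range metadata {
		if keyName != "" && k == keyName {
			key = v
			continue
		}
		headers[k] = v
	}
	return key, headers
}

func main() {
	md := map[string]string{"partition-key": "user-42", "trace-id": "abc123"}
	key, headers := splitKey(md, "partition-key")
	fmt.Println(key, headers)
}
```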

type URLOpener

type URLOpener struct {
	// Brokers is the slice of brokers in the Kafka cluster.
	Brokers []string
	// Config is the Sarama Config.
	// Config.Producer.Return.Successes must be set to true.
	Config *sarama.Config

	// TopicOptions specifies the options to pass to OpenTopic.
	TopicOptions TopicOptions
	// SubscriptionOptions specifies the options to pass to OpenSubscription.
	SubscriptionOptions SubscriptionOptions
}

URLOpener opens Kafka URLs like "kafka://mytopic" for topics and "kafka://group?topic=mytopic" for subscriptions.

For topics, the URL's host+path is used as the topic name.

For subscriptions, the URL's host+path is used as the group name, and the "topic" query parameter(s) are used as the set of topics to subscribe to.
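The mapping from a subscription URL to a group name and topic list can be sketched with the standard library alone. The groupAndTopics helper is hypothetical; joining host and path with path.Join and rejecting a URL without a "topic" parameter are assumptions of this sketch, and the actual opener may validate differently.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"path"
)

// groupAndTopics extracts the consumer group (host + path) and the
// topics (every "topic" query parameter) from a kafka:// URL.
func groupAndTopics(rawURL string) (string, []string, error) {
	u, err := url.Parse(rawURL)
	if err != nil {
		return "", nil, err
	}
	if u.Scheme != "kafka" {
		return "", nil, fmt.Errorf("unexpected scheme %q", u.Scheme)
	}
	topics := u.Query()["topic"]
	if len(topics) == 0 {
		// Assumption of this sketch: at least one topic is required.
		return "", nil, errors.New("no topic query parameter")
	}
	return path.Join(u.Host, u.Path), topics, nil
}

func main() {
	group, topics, err := groupAndTopics("kafka://my-group?topic=orders&topic=payments")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(group, topics)
}
```

Repeating the "topic" parameter yields one subscription over several topics, which matches the "query parameter(s)" wording above.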

func (*URLOpener) OpenSubscriptionURL

func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error)

OpenSubscriptionURL opens a pubsub.Subscription based on u.

func (*URLOpener) OpenTopicURL

func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error)

OpenTopicURL opens a pubsub.Topic based on u.
