kafkapubsub

v0.37.0
Published: Mar 12, 2024 License: Apache-2.0 Imports: 15 Imported by: 49

Documentation

Overview

Package kafkapubsub provides an implementation of pubsub for Kafka. It requires a minimum Kafka version of 0.11.x for Header support. Some functionality may work with earlier versions of Kafka.

See https://kafka.apache.org/documentation.html#semantics for a discussion of message semantics in Kafka. sarama.Config exposes many knobs that can affect performance and semantics, so review and set them carefully.

kafkapubsub does not support Message.Nack; Message.Nackable will return false, and Message.Nack will panic if called.
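
Because Nack panics for this driver, code that may run against several pubsub drivers can guard the call with Nackable. The following is a minimal sketch, not part of kafkapubsub: the settler interface and the settle helper are hypothetical names, and fakeMessage stands in for a *pubsub.Message (which has Ack, Nack, and Nackable methods) so the example runs without a broker.

```go
package main

import "fmt"

// settler is a hypothetical interface listing the three methods this
// sketch needs; *pubsub.Message from gocloud.dev/pubsub provides them.
type settler interface {
	Ack()
	Nack()
	Nackable() bool
}

// settle acks on success. On failure it nacks only when the driver
// supports it, and reports whether the message was acked or nacked.
func settle(m settler, processed bool) bool {
	if processed {
		m.Ack()
		return true
	}
	if m.Nackable() {
		m.Nack()
		return true
	}
	// Kafka case: Nack would panic, so the message is left unsettled and
	// the caller decides what to do next (log, retry, or Ack anyway).
	return false
}

// fakeMessage mimics a kafkapubsub message: Nackable is false and Nack panics.
type fakeMessage struct{ acked bool }

func (f *fakeMessage) Ack()           { f.acked = true }
func (f *fakeMessage) Nack()          { panic("Nack not supported") }
func (f *fakeMessage) Nackable() bool { return false }

func main() {
	m := &fakeMessage{}
	fmt.Println(settle(m, false), m.acked) // prints "false false"
	fmt.Println(settle(m, true), m.acked)  // prints "true true"
}
```

What to do with a message that failed processing and cannot be nacked is an application decision; the sketch only makes that branch explicit.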

URLs

For pubsub.OpenTopic and pubsub.OpenSubscription, kafkapubsub registers for the scheme "kafka". The default URL opener will connect to a default set of Kafka brokers based on the environment variable "KAFKA_BROKERS", expected to be a comma-delimited set of server addresses. To customize the URL opener, or for more details on the URL format, see URLOpener. See https://gocloud.dev/concepts/urls/ for background information.
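
As an illustration of the comma-delimited format expected in KAFKA_BROKERS, the sketch below splits such a value into a list of broker addresses. The parseBrokers helper is a hypothetical name, and trimming spaces and skipping empty entries are assumptions of this sketch, not documented behavior of the default URL opener.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// parseBrokers splits a comma-delimited broker list into addresses.
// Trimming spaces and skipping empty entries are choices made for this
// sketch only.
func parseBrokers(s string) []string {
	var brokers []string
	for _, part := range strings.Split(s, ",") {
		if addr := strings.TrimSpace(part); addr != "" {
			brokers = append(brokers, addr)
		}
	}
	return brokers
}

func main() {
	// Example value only; normally the variable is set outside the program.
	os.Setenv("KAFKA_BROKERS", "10.0.0.1:9092,10.0.0.2:9092")
	for _, addr := range parseBrokers(os.Getenv("KAFKA_BROKERS")) {
		fmt.Println(addr)
	}
}
```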

Escaping

Go CDK supports all UTF-8 strings. No escaping is required for Kafka. Message metadata is supported through Kafka Headers, which allow arbitrary []byte for both key and value. These are converted to string for use in Message.Metadata.
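
The conversion from byte-slice headers to string metadata can be pictured with the sketch below. The header type is a local stand-in for a Kafka record header and headersToMetadata is a hypothetical helper; neither is taken from kafkapubsub, and the handling of repeated keys (last value wins) is an assumption of this sketch.

```go
package main

import "fmt"

// header is a stand-in for a Kafka record header, whose key and value are
// both raw bytes. It is defined here so the sketch needs no Kafka client.
type header struct {
	Key   []byte
	Value []byte
}

// headersToMetadata converts byte-slice headers to the map[string]string
// shape used by Message.Metadata. If a key repeats, the last value wins.
func headersToMetadata(hs []header) map[string]string {
	md := make(map[string]string, len(hs))
	for _, h := range hs {
		md[string(h.Key)] = string(h.Value)
	}
	return md
}

func main() {
	md := headersToMetadata([]header{
		{Key: []byte("trace-id"), Value: []byte("abc123")},
		{Key: []byte("région"), Value: []byte("eu-west")},
	})
	fmt.Println(md["trace-id"], md["région"])
}
```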

As

kafkapubsub exposes the following types for As:

  • Topic: sarama.SyncProducer
  • Subscription: sarama.ConsumerGroup, sarama.ConsumerGroupSession (may be nil during session renegotiation, and session may go stale at any time)
  • Message: *sarama.ConsumerMessage
  • Message.BeforeSend: *sarama.ProducerMessage
  • Message.AfterSend: None
  • Error: sarama.ConsumerError, sarama.ConsumerErrors, sarama.ProducerError, sarama.ProducerErrors, sarama.ConfigurationError, sarama.PacketDecodingError, sarama.PacketEncodingError, sarama.KError
Example (OpenSubscriptionFromURL)
package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub"
	// The blank import registers the "kafka" URL scheme with pubsub.
	_ "gocloud.dev/pubsub/kafkapubsub"
)

func main() {
	ctx := context.Background()

	// pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.
	// The host + path are used as the consumer group name.
	// The "topic" query parameter sets one or more topics to subscribe to.
	// The set of brokers must be in an environment variable KAFKA_BROKERS.
	subscription, err := pubsub.OpenSubscription(ctx,
		"kafka://my-group?topic=my-topic")
	if err != nil {
		log.Fatal(err)
	}
	defer subscription.Shutdown(ctx)
}

Example (OpenTopicFromURL)
package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub"
	// The blank import registers the "kafka" URL scheme with pubsub.
	_ "gocloud.dev/pubsub/kafkapubsub"
)

func main() {
	ctx := context.Background()

	// pubsub.OpenTopic creates a *pubsub.Topic from a URL.
	// The host + path are the topic name to send to.
	// The set of brokers must be in an environment variable KAFKA_BROKERS.
	topic, err := pubsub.OpenTopic(ctx, "kafka://my-topic")
	if err != nil {
		log.Fatal(err)
	}
	defer topic.Shutdown(ctx)
}

Constants

const Scheme = "kafka"

Scheme is the URL scheme that kafkapubsub registers its URLOpeners under on pubsub.DefaultMux.

Variables

This section is empty.

Functions

func MinimalConfig

func MinimalConfig() *sarama.Config

MinimalConfig returns a minimal sarama.Config.

func OpenSubscription

func OpenSubscription(brokers []string, config *sarama.Config, group string, topics []string, opts *SubscriptionOptions) (*pubsub.Subscription, error)

OpenSubscription creates a pubsub.Subscription that joins group, receiving messages from topics.

It uses a sarama.ConsumerGroup to receive messages. Consumer options can be configured in the Consumer section of the sarama.Config: https://godoc.org/github.com/IBM/sarama#Config.

Example
package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub/kafkapubsub"
)

func main() {
	ctx := context.Background()

	// The set of brokers in the Kafka cluster.
	addrs := []string{"1.2.3.4:9092"}
	// The Kafka client configuration to use.
	config := kafkapubsub.MinimalConfig()

	// Construct a *pubsub.Subscription, joining the consumer group "my-group"
	// and receiving messages from "my-topic".
	subscription, err := kafkapubsub.OpenSubscription(
		addrs, config, "my-group", []string{"my-topic"}, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer subscription.Shutdown(ctx)
}

func OpenTopic

func OpenTopic(brokers []string, config *sarama.Config, topicName string, opts *TopicOptions) (*pubsub.Topic, error)

OpenTopic creates a pubsub.Topic that sends to a Kafka topic.

It uses a sarama.SyncProducer to send messages. Producer options can be configured in the Producer section of the sarama.Config: https://godoc.org/github.com/IBM/sarama#Config.

Config.Producer.Return.Successes must be set to true.

Example
package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub/kafkapubsub"
)

func main() {
	ctx := context.Background()

	// The set of brokers in the Kafka cluster.
	addrs := []string{"1.2.3.4:9092"}
	// The Kafka client configuration to use.
	config := kafkapubsub.MinimalConfig()

	// Construct a *pubsub.Topic.
	topic, err := kafkapubsub.OpenTopic(addrs, config, "my-topic", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer topic.Shutdown(ctx)
}

Types

type SubscriptionOptions

type SubscriptionOptions struct {
	// KeyName optionally sets the Message.Metadata key in which to store the
	// Kafka message key. If set, and if the Kafka message key is non-empty,
	// the key value will be stored in Message.Metadata under KeyName.
	KeyName string

	// WaitForJoin causes OpenSubscription to wait for up to WaitForJoin
	// to allow the client to join the consumer group.
	// Messages sent to the topic before the client joins the group
	// may not be received by this subscription.
	// OpenSubscription will succeed even if WaitForJoin elapses and
	// the subscription still hasn't been joined successfully.
	WaitForJoin time.Duration
}

SubscriptionOptions contains configuration for subscriptions.
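
The KeyName behavior on the receiving side can be sketched as follows. This is an illustration of the rule stated in the field comment, not the package's code: withMessageKey and the "kafka-key" metadata key are hypothetical names chosen for the example.

```go
package main

import "fmt"

// withMessageKey returns a copy of md that also holds the Kafka message key
// under keyName, but only when keyName is set and the key is non-empty.
func withMessageKey(md map[string]string, key []byte, keyName string) map[string]string {
	out := make(map[string]string, len(md)+1)
	for k, v := range md {
		out[k] = v
	}
	if keyName != "" && len(key) > 0 {
		out[keyName] = string(key)
	}
	return out
}

func main() {
	headers := map[string]string{"trace-id": "abc123"}

	// Key present and KeyName set: the key appears in the metadata.
	md := withMessageKey(headers, []byte("user-42"), "kafka-key")
	fmt.Println(md["kafka-key"], md["trace-id"]) // prints "user-42 abc123"

	// Empty key: the metadata is unchanged.
	fmt.Println(len(withMessageKey(headers, nil, "kafka-key"))) // prints "1"
}
```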

type TopicOptions

type TopicOptions struct {
	// KeyName optionally sets the Message.Metadata key to use as the optional
	// Kafka message key. If set, and if a matching Message.Metadata key is found,
	// the value for that key will be used as the message key when sending to
	// Kafka, instead of being added to the message headers.
	KeyName string

	// BatcherOptions adds constraints to the default batching done for sends.
	BatcherOptions batcher.Options
}

TopicOptions contains configuration options for topics.
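
On the sending side, KeyName moves one metadata entry out of the headers and into the Kafka message key. The sketch below illustrates that rule as described in the field comment; splitMetadata and the "kafka-key" metadata key are hypothetical names, and the code is not taken from the package.

```go
package main

import "fmt"

// splitMetadata separates the entry stored under keyName (if any) from the
// rest of the metadata. The first result would become the Kafka message key,
// the second the message headers.
func splitMetadata(md map[string]string, keyName string) (string, map[string]string) {
	key := ""
	headers := make(map[string]string, len(md))
	for k, v := range md {
		if keyName != "" && k == keyName {
			key = v
			continue
		}
		headers[k] = v
	}
	return key, headers
}

func main() {
	md := map[string]string{"kafka-key": "user-42", "trace-id": "abc123"}
	key, headers := splitMetadata(md, "kafka-key")
	fmt.Println(key, len(headers), headers["trace-id"]) // prints "user-42 1 abc123"
}
```

Using the same KeyName in TopicOptions and SubscriptionOptions lets a message key round-trip through Message.Metadata.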

type URLOpener

type URLOpener struct {
	// Brokers is the slice of brokers in the Kafka cluster.
	Brokers []string
	// Config is the Sarama Config.
	// Config.Producer.Return.Successes must be set to true.
	Config *sarama.Config

	// TopicOptions specifies the options to pass to OpenTopic.
	TopicOptions TopicOptions
	// SubscriptionOptions specifies the options to pass to OpenSubscription.
	SubscriptionOptions SubscriptionOptions
}

URLOpener opens Kafka URLs like "kafka://mytopic" for topics and "kafka://group?topic=mytopic" for subscriptions.

For topics, the URL's host+path is used as the topic name.

For subscriptions, the URL's host+path is used as the group name, and the "topic" query parameter(s) are used as the set of topics to subscribe to. The optional "offset" parameter sets the Kafka consumer's initial offset: "oldest" starts consuming from the oldest offset of the consumer group, and "newest" starts consuming from the most recent offset on the topic.
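
To make the subscription URL format concrete, the sketch below pulls the group, topics, and offset out of a URL using only the standard library. It is an illustration under assumptions, not URLOpener's implementation: subscriptionTarget and parseSubscriptionURL are hypothetical names, joining host and path with path.Join is a guess at how "host+path" is combined, and rejecting unknown offset values is a choice made for this sketch.

```go
package main

import (
	"fmt"
	"log"
	"net/url"
	"path"
)

// subscriptionTarget holds the pieces of a subscription URL.
type subscriptionTarget struct {
	Group  string
	Topics []string
	Offset string // "", "oldest", or "newest"
}

// parseSubscriptionURL extracts the consumer group, topics, and initial
// offset from a URL such as "kafka://group?topic=mytopic".
func parseSubscriptionURL(raw string) (subscriptionTarget, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return subscriptionTarget{}, err
	}
	if u.Scheme != "kafka" {
		return subscriptionTarget{}, fmt.Errorf("unexpected scheme %q", u.Scheme)
	}
	q := u.Query()
	target := subscriptionTarget{
		Group:  path.Join(u.Host, u.Path), // host + path form the group name
		Topics: q["topic"],                // the parameter may repeat
		Offset: q.Get("offset"),
	}
	if len(target.Topics) == 0 {
		return subscriptionTarget{}, fmt.Errorf("no topic parameter in %q", raw)
	}
	switch target.Offset {
	case "", "oldest", "newest":
	default:
		return subscriptionTarget{}, fmt.Errorf("invalid offset %q", target.Offset)
	}
	return target, nil
}

func main() {
	target, err := parseSubscriptionURL(
		"kafka://my-group?topic=orders&topic=refunds&offset=oldest")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(target.Group, target.Topics, target.Offset)
}
```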

func (*URLOpener) OpenSubscriptionURL

func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error)

OpenSubscriptionURL opens a pubsub.Subscription based on u.

func (*URLOpener) OpenTopicURL

func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error)

OpenTopicURL opens a pubsub.Topic based on u.
