instasarama

package module
v1.22.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 26, 2024 License: MIT Imports: 12 Imported by: 1

README

Instana instrumentation for github.com/IBM/sarama

This module contains instrumentation code for Kafka producers and consumers that use github.com/IBM/sarama library starting from v1.41.0 and above.

GoDoc

Installation

$ go get github.com/instana/go-sensor/instrumentation/instasarama

Usage

For detailed usage examples see the documentation or the following links:

This instrumentation requires an instance of instana.Sensor to initialize spans and handle the trace context propagation. You can create a new instance of Instana sensor using instana.NewSensor().

instasarama provides a set of convenience wrappers for constructor functions exported by github.com/IBM/sarama. These wrappers are named the same way as their origins and use the same set of arguments. In most cases it's enough to replace sarama with instasarama in the constructor call and append an instance of instana.TracerLogger to the argument list.

Note: Kafka supports record headers starting from v0.11.0. In order to enable trace context propagation, you need to make sure that your (sarama.Config).Version is set to at least sarama.V0_11_0_0.

Instrumenting sarama.SyncProducer

For more detailed example code please consult the package documentation or example_sync_producer_test.go.

To create an instrumented instance of sarama.SyncProducer from a list of broker addresses use instasarama.NewSyncProducer():

producer := instasarama.NewSyncProducer(brokers, config, sensor)

instasarama.NewSyncProducerFromClient() does the same, but from an existing sarama.Client:

producer := instasarama.NewSyncProducerFromClient(client, sensor)

The wrapped producer takes care of trace context propagation by creating an exit span and injecting the trace context into each Kafka message headers. Since github.com/IBM/sarama does not use context.Context, which is a conventional way of passing the parent span in Instana Go sensor, the caller needs to inject the parent span context using instasarama.ProducerMessageWithSpan() before passing it to the wrapped producer.

Instrumenting sarama.AsyncProducer

Similarly to sarama.SyncProducer, instasarama provides wrappers for constructor methods of sarama.AsyncProducer and expects the parent span context to be injected into message headers using instasarama.ProducerMessageWithSpan().

For more detailed example code please consult the package documentation or example_async_producer_test.go.

To create an instrumented instance of sarama.AsyncProducer from a list of broker addresses use instasarama.NewAsyncProducer():

producer := instasarama.NewAsyncProducer(brokers, config, sensor)

instasarama.NewAsyncProducerFromClient() does the same, but from an existing sarama.Client:

producer := instasarama.NewAsyncProducerFromClient(client, sensor)

The wrapped producer takes care of trace context propagation by creating an exit span and injecting the trace context into each Kafka message headers. Since github.com/IBM/sarama does not use context.Context, which is a conventional way of passing the parent span in Instana Go sensor, the caller needs to inject the parent span context using instasarama.ProducerMessageWithSpan() before passing it to the wrapped producer.

Instrumenting sarama.Consumer

For more detailed example code please consult the package documentation or example_consumer_test.go.

To create an instrumented instance of sarama.Consumer from a list of broker addresses use instasarama.NewConsumer():

consumer := instasarama.NewConsumer(brokers, config, sensor)

instasarama.NewConsumerFromClient() does the same, but from an existing sarama.Client:

consumer := instasarama.NewConsumerFromClient(client, sensor)

The wrapped consumer will pick up the existing trace context if found in message headers, start a new entry span and inject its context into each message. This context can be retrieved with instasarama.SpanContextFromConsumerMessage() and used in the message handler to continue the trace.

Instrumenting sarama.ConsumerGroup

For more detailed example code please consult the package documentation or example_consumer_group_test.go.

instasarama provides instasarama.WrapConsumerGroupHandler() to wrap your sarama.ConsumerGroupHandler into a wrapper that takes care of trace context extraction, creating an entry span and injecting its context into each received sarama.ConsumerMessage:

var client sarama.ConsumerGroup

consumer := instasarama.WrapConsumerGroupHandler(&Consumer{}, sensor)

// use the wrapped consumer in the Consume() call
for {
	client.Consume(ctx, consumer)
}

The wrapped consumer will pick up the existing trace context if found in message headers, start a new entry span and inject its context into each message. This context can be retrieved with instasarama.SpanContextFromConsumerMessage() and used in the message handler to continue the trace.

Working With Kafka Header Formats

Since v1.2.0, the instrumentation supports Instana's trace correlation headers in both binary (legacy) and string (new) formats.

By default, both sets of headers (binary and string) will be added to messages. Versions prior to v1.2.0 will only add headers in the binary format.

This change affects how Instana headers are propagated via a producer when a message is sent. Consumers will always look for the string headers first and fallback to the binary format if necessary.

In the future, the binary headers will be discontinued and only the headers in the string format will be considered.

To choose a header format provide the INSTANA_KAFKA_HEADER_FORMAT environment variable to the application. The following are valid values:

  • binary: Producers will only add binary headers to Kafka messages.
  • string: Producers will only add string headers to Kafka messages.
  • both: Producers will add both sets of headers to Kafka messages.

If no environment variable is provided, or its value is empty or if it's not a valid value, Kafka headers will be treated as binary

See the topic Kafka header migration in Instana's documentation for more information.

Documentation

Overview

Example (AsyncProducer)

This example demonstrates how to instrument an async Kafka producer using instasarama. Error handling is omitted for brevity.

package main

import (
	"github.com/IBM/sarama"
	instana "github.com/instana/go-sensor"
	"github.com/instana/go-sensor/instrumentation/instasarama"
	"github.com/opentracing/opentracing-go/ext"
)

func main() {
	sensor := instana.NewSensor("my-service")
	brokers := []string{"localhost:9092"}

	config := sarama.NewConfig()
	// enable the use record headers added in kafka v0.11.0 and used to propagate
	// trace context
	config.Version = sarama.V0_11_0_0

	// create a new instrumented instance of sarama.SyncProducer
	producer, _ := instasarama.NewAsyncProducer(brokers, config, sensor)

	// start a new entry span
	sp := sensor.Tracer().StartSpan("my-producing-method")
	ext.SpanKind.Set(sp, "entry")

	msg := &sarama.ProducerMessage{
		// ...
	}

	// inject the span before passing the message to producer
	producer.Input() <- instasarama.ProducerMessageWithSpan(msg, sp)
}
Output:

Example (Consumer)

This example demonstrates how to instrument a Kafka consumer using instasarama and extract the trace context to ensure continuation. Error handling is omitted for brevity.

// (c) Copyright IBM Corp. 2023

//go:build go1.17
// +build go1.17

package main

import (
	"fmt"

	"github.com/IBM/sarama"
	instana "github.com/instana/go-sensor"
	"github.com/instana/go-sensor/instrumentation/instasarama"
	"github.com/opentracing/opentracing-go"
)

// This example demonstrates how to instrument a Kafka consumer using instasarama
// and extract the trace context to ensure continuation. Error handling is omitted for brevity.
func main() {
	sensor := instana.NewSensor("my-service")
	brokers := []string{"localhost:9092"}

	conf := sarama.NewConfig()
	conf.Version = sarama.V0_11_0_0

	// create a new instrumented instance of sarama.Consumer
	consumer, _ := instasarama.NewConsumer(brokers, conf, sensor)

	c, _ := consumer.ConsumePartition("test-topic-1", 0, sarama.OffsetNewest)
	defer c.Close()

	for msg := range c.Messages() {
		fmt.Println("Got messagge", msg)
		processMessage(msg, sensor)
	}
}

func processMessage(msg *sarama.ConsumerMessage, sensor instana.TracerLogger) {
	// extract trace context and start a new span
	parentCtx, _ := instasarama.SpanContextFromConsumerMessage(msg, sensor)

	sp := sensor.Tracer().StartSpan("process-message", opentracing.ChildOf(parentCtx))
	defer sp.Finish()

	// process message
}
Output:

Example (ConsumerGroup)

This example demonstrates how to instrument a Kafka consumer group using instasarama and extract the trace context to ensure continuation. Error handling is omitted for brevity.

// (c) Copyright IBM Corp. 2023

//go:build go1.17
// +build go1.17

package main

import (
	"context"

	"github.com/IBM/sarama"
	instana "github.com/instana/go-sensor"
	"github.com/instana/go-sensor/instrumentation/instasarama"
	"github.com/opentracing/opentracing-go"
)

// This example demonstrates how to instrument a Kafka consumer group using instasarama
// and extract the trace context to ensure continuation. Error handling is omitted for brevity.
func main() {
	sensor := instana.NewSensor("my-service")
	brokers := []string{"localhost:9092"}
	topics := []string{"records", "more-records"}

	conf := sarama.NewConfig()
	conf.Version = sarama.V0_11_0_0

	client, _ := sarama.NewConsumerGroup(brokers, "my-service-consumers", conf)
	defer client.Close()

	ctx := context.Background()
	consumer := instasarama.WrapConsumerGroupHandler(&Consumer{sensor: sensor}, sensor)

	// start consuming
	for {
		_ = client.Consume(ctx, topics, consumer)

		// ...
	}
}

type Consumer struct {
	sensor instana.TracerLogger
}

func (*Consumer) Setup(sarama.ConsumerGroupSession) error {
	// setup consumer
	return nil
}

func (*Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	// cleanup consumer
	return nil
}

func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		c.processMessage(msg)
		session.MarkMessage(msg, "")
	}

	return nil
}

func (c *Consumer) processMessage(msg *sarama.ConsumerMessage) {
	// extract trace context and start a new span
	parentCtx, _ := instasarama.SpanContextFromConsumerMessage(msg, c.sensor)

	sp := c.sensor.Tracer().StartSpan("process-message", opentracing.ChildOf(parentCtx))
	defer sp.Finish()

	// process message
}
Output:

Example (SyncProducer)

This example demonstrates how to instrument a sync Kafka producer using instasarama. Error handling is omitted for brevity.

package main

import (
	"github.com/IBM/sarama"
	instana "github.com/instana/go-sensor"
	"github.com/instana/go-sensor/instrumentation/instasarama"
	"github.com/opentracing/opentracing-go/ext"
)

func main() {
	sensor := instana.NewSensor("my-service")
	brokers := []string{"localhost:9092"}

	config := sarama.NewConfig()
	// sarama requires Producer.Return.Successes to be set for sync producers
	config.Producer.Return.Successes = true
	// enable the use record headers added in kafka v0.11.0 and used to propagate
	// trace context
	config.Version = sarama.V0_11_0_0

	// create a new instrumented instance of sarama.SyncProducer
	producer, _ := instasarama.NewSyncProducer(brokers, config, sensor)

	// start a new entry span
	sp := sensor.Tracer().StartSpan("my-producing-method")
	ext.SpanKind.Set(sp, "entry")

	msg := &sarama.ProducerMessage{
		Topic:  "test-topic-1",
		Offset: sarama.OffsetNewest,
		Value:  sarama.StringEncoder("I am a message"),
		// ...
	}

	// inject the span before passing the message to producer
	msg = instasarama.ProducerMessageWithSpan(msg, sp)

	// pass it to the producer
	producer.SendMessage(msg)
}
Output:

Index

Examples

Constants

View Source
const (

	// FieldC is the trace context header key
	FieldC = "X_INSTANA_C"
	// FieldL is the trace level header key
	FieldL = "X_INSTANA_L"

	// FieldT is the trace id
	FieldT = "X_INSTANA_T"
	// FieldS is the span id
	FieldS = "X_INSTANA_S"
	// FieldLS is the trace level
	FieldLS = "X_INSTANA_L_S"
)
View Source
const KafkaHeaderEnvVarKey = "INSTANA_KAFKA_HEADER_FORMAT"
View Source
const Version = "1.22.1"

Version is the instrumentation module semantic version

Variables

This section is empty.

Functions

func NewAsyncProducer

func NewAsyncProducer(addrs []string, conf *sarama.Config, sensor instana.TracerLogger) (sarama.AsyncProducer, error)

NewAsyncProducer creates a new sarama.AsyncProducer using the given broker addresses and configuration, and instruments its calls

func NewAsyncProducerFromClient

func NewAsyncProducerFromClient(client sarama.Client, sensor instana.TracerLogger) (sarama.AsyncProducer, error)

NewAsyncProducerFromClient creates a new sarama.AsyncProducer using the given client, and instruments its calls

func NewConsumer

func NewConsumer(addrs []string, config *sarama.Config, sensor instana.TracerLogger) (sarama.Consumer, error)

NewConsumer creates a new consumer using the given broker addresses and configuration, and instruments its calls

func NewConsumerFromClient

func NewConsumerFromClient(client sarama.Client, sensor instana.TracerLogger) (sarama.Consumer, error)

NewConsumerFromClient creates a new consumer using the given client and instruments its calls

func NewConsumerGroup added in v1.3.0

func NewConsumerGroup(addrs []string, groupID string, config *sarama.Config, sensor instana.TracerLogger) (sarama.ConsumerGroup, error)

NewConsumerGroup creates an instrumented sarama.ConsumerGroup

func NewConsumerGroupFromClient added in v1.3.0

func NewConsumerGroupFromClient(groupID string, client sarama.Client, sensor instana.TracerLogger) (sarama.ConsumerGroup, error)

NewConsumerGroupFromClient creates an instrumented sarama.ConsumerGroup from sarama.Client

func NewSyncProducer

func NewSyncProducer(addrs []string, config *sarama.Config, sensor instana.TracerLogger) (sarama.SyncProducer, error)

NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration, and instruments its calls

func NewSyncProducerFromClient

func NewSyncProducerFromClient(client sarama.Client, sensor instana.TracerLogger) (sarama.SyncProducer, error)

NewSyncProducerFromClient creates a new SyncProducer using the given client, and instruments its calls

func PackTraceContextHeader

func PackTraceContextHeader(traceID, spanID string) []byte

PackTraceContextHeader packs the trace and span ID into a byte slice to be used as (sarama.RecordHeader).Value. The returned slice is always 24 bytes long.

func PackTraceLevelHeader

func PackTraceLevelHeader(val string) []byte

PackTraceLevelHeader packs the X-INSTANA-L value into a byte slice to be used as (sarama.RecordHeader).Value. It returns a 1-byte slice containing 0x00 if the passed value is "0", and 0x01 otherwise.

func ProducerMessageWithSpan

func ProducerMessageWithSpan(pm *sarama.ProducerMessage, sp ot.Span) *sarama.ProducerMessage

ProducerMessageWithSpan injects the tracing context into producer message headers to propagate them through the Kafka requests made with instasarama producers.

func ProducerMessageWithSpanFromContext added in v1.4.0

func ProducerMessageWithSpanFromContext(ctx context.Context, pm *sarama.ProducerMessage) *sarama.ProducerMessage

ProducerMessageWithSpanFromContext injects the tracing context into producer's message headers from the context if it is there.

func SpanContextFromConsumerMessage

func SpanContextFromConsumerMessage(cm *sarama.ConsumerMessage, sensor instana.TracerLogger) (ot.SpanContext, bool)

SpanContextFromConsumerMessage extracts the tracing context from consumer message

func UnpackTraceContextHeader

func UnpackTraceContextHeader(val []byte) (string, string, error)

UnpackTraceContextHeader unpacks and returns the trace and span ID, padding them with zeroes to 32 and 16 characters correspondingly. It expects the provided buffer to have exactly 24 bytes.

func UnpackTraceLevelHeader

func UnpackTraceLevelHeader(val []byte) (string, error)

UnpackTraceLevelHeader returns "1" if the value contains a non-zero byte, and "0" otherwise. It expects the provided buffer to have exactly 1 byte.

Types

type AsyncProducer

type AsyncProducer struct {
	sarama.AsyncProducer
	// contains filtered or unexported fields
}

AsyncProducer is a wrapper for sarama.AsyncProducer that instruments its calls using provided instana.Sensor

func WrapAsyncProducer

func WrapAsyncProducer(p sarama.AsyncProducer, conf *sarama.Config, sensor instana.TracerLogger) *AsyncProducer

WrapAsyncProducer wraps an existing sarama.AsyncProducer and instruments its calls. It requires the same config that was used to create this producer to detect the Kafka version and whether it's supposed to return successes/errors. To initialize a new sync producer instance use instasarama.NewAsyncProducer() and instasarama.NewAsyncProducerFromClient() convenience methods instead

func (*AsyncProducer) Errors

func (p *AsyncProducer) Errors() <-chan *sarama.ProducerError

Errors is the error output channel back to the user

func (*AsyncProducer) Input

func (p *AsyncProducer) Input() chan<- *sarama.ProducerMessage

Input is the input channel for the user to write messages to that they wish to send. The async producer will then create a new exit span for each message that has trace context added with instasarama.ProducerMessageWithSpan()

func (*AsyncProducer) Successes

func (p *AsyncProducer) Successes() <-chan *sarama.ProducerMessage

Successes is the success output channel back to the user

type Consumer

type Consumer struct {
	sarama.Consumer
	// contains filtered or unexported fields
}

Consumer is a wrapper for sarama.Consumer that wraps and returns instrumented partition consumers

func WrapConsumer

func WrapConsumer(c sarama.Consumer, sensor instana.TracerLogger) *Consumer

WrapConsumer wraps an existing sarama.Consumer instance and instruments its calls. To initialize a new instance of sarama.Consumer use instasarama.NewConsumer() and instasarama.NewConsumerFromclient() convenience methods instead

func (*Consumer) ConsumePartition

func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error)

ConsumePartition instruments and returns the partition consumer returned by undelying sarama.Consumer

type ConsumerGroupHandler

type ConsumerGroupHandler struct {
	// contains filtered or unexported fields
}

ConsumerGroupHandler is a wrapper for sarama.ConsumerGroupHandler that creates an entry span for each incoming Kafka message, ensuring the extraction and continuation of the existing trace context if provided

func WrapConsumerGroupHandler

func WrapConsumerGroupHandler(h sarama.ConsumerGroupHandler, sensor instana.TracerLogger) *ConsumerGroupHandler

WrapConsumerGroupHandler wraps the existing group handler and instruments its calls

func (*ConsumerGroupHandler) Cleanup

Cleanup calls the underlying handler's Cleanup() method

func (*ConsumerGroupHandler) ConsumeClaim

ConsumeClaim injects the trace context into incoming message headers and delegates further processing to the underlying handler

func (*ConsumerGroupHandler) Setup

Setup calls the underlying handler's Setup() method

type ConsumerMessageCarrier

type ConsumerMessageCarrier struct {
	Message *sarama.ConsumerMessage
}

ConsumerMessageCarrier is a trace context carrier that extracts Instana OpenTracing headers from Kafka consumer messages

func (ConsumerMessageCarrier) ForeachKey

func (c ConsumerMessageCarrier) ForeachKey(handler func(key, val string) error) error

ForeachKey implements opentracing.TextMapReader for ConsumerMessageCarrier

func (ConsumerMessageCarrier) RemoveAll

func (c ConsumerMessageCarrier) RemoveAll()

RemoveAll removes all tracing headers previously set by Set()

func (ConsumerMessageCarrier) Set

func (c ConsumerMessageCarrier) Set(key, val string)

Set implements opentracing.TextMapWriter for ConsumerMessageCarrier

type PartitionConsumer

type PartitionConsumer struct {
	sarama.PartitionConsumer
	// contains filtered or unexported fields
}

PartitionConsumer is a wrapper for sarama.PartitionConsumer that instruments its calls using provided instana.Sensor

func WrapPartitionConsumer

func WrapPartitionConsumer(c sarama.PartitionConsumer, sensor instana.TracerLogger) *PartitionConsumer

WrapPartitionConsumer wraps sarama.PartitionConsumer instance and instruments its calls

func (*PartitionConsumer) Messages

func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage

Messages returns a channel of consumer messages of the underlying partition consumer

type ProducerMessageCarrier

type ProducerMessageCarrier struct {
	Message *sarama.ProducerMessage
}

ProducerMessageCarrier is a trace context carrier that propagates Instana OpenTracing headers throughout Kafka producer messages

func (ProducerMessageCarrier) ForeachKey

func (c ProducerMessageCarrier) ForeachKey(handler func(key, val string) error) error

ForeachKey implements opentracing.TextMapReader for ProducerMessageCarrier

func (ProducerMessageCarrier) RemoveAll

func (c ProducerMessageCarrier) RemoveAll()

RemoveAll removes all tracing headers previously set by Set()

func (ProducerMessageCarrier) Set

func (c ProducerMessageCarrier) Set(key, val string)

Set implements opentracing.TextMapWriter for ProducerMessageCarrier

type SyncProducer

type SyncProducer struct {
	sarama.SyncProducer
	// contains filtered or unexported fields
}

SyncProducer is a wrapper for sarama.SyncProducer that instruments its calls using provided instana.Sensor

func WrapSyncProducer

func WrapSyncProducer(sp sarama.SyncProducer, config *sarama.Config, sensor instana.TracerLogger) *SyncProducer

WrapSyncProducer wraps an existing sarama.SyncProducer instance and instruments its calls. It requires the same config that was used to create this producer to detect the Kafka version and whether it's supposed to return successes/errors. To initialize a new sync producer instance use instasarama.NewSyncProducer() and instasarama.NewSyncProducerFromClient() convenience methods instead

func (*SyncProducer) SendMessage

func (p *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (int32, int64, error)

SendMessage picks up the trace context previously added to the message with instasarama.ProducerMessageWithSpan(), starts a new child span and injects its context into the message headers before sending it to the underlying producer. The call will not be traced if there the message does not contain trace context

func (*SyncProducer) SendMessages

func (p *SyncProducer) SendMessages(msgs []*sarama.ProducerMessage) error

SendMessages starts a new span and injects its context into messages headers before sending them with the underlying producer.

This method attempts to use the existing trace context found in message headers. There will be NO SPAN CREATED for this call if messages originate from different trace contexts. A possible use case that result in such behavior would be if messages resulted from different HTTP requests are buffered and later being sent in one batch asynchronously. In case you want your batch publish operation to be a part of a specific trace, make sure that you inject the parent span of this trace explicitly before calling `SendMessages()`, i.e.

type MessageCollector struct {
	CollectedMessages []*sarama.ProducerMessage
	producer *instasarama.SyncProducer
	// ...
}

func (c MessageCollector) Flush(ctx context.Context) error {
	// extract the parent span from context and use it to continue the trace
	if parentSpan, ok := instana.SpanFromContext(ctx); ok {
		// start a new span for the batch send job
		sp := parentSpan.Tracer().StartSpan("batch-send", ot.ChilfOf(parentSpan.Context()))
		defer sp.Finish()

		// inject the trace context into every collected message, overriding the existing one
		for i, msg := range c.CollectedMessages {
			c.CollectedMessages = instasarama.ProducerMessageWithSpan(msg, sp)
		}
	}

	return c.producer.SendMessages(c.CollectedMessages)
}

Directories

Path Synopsis
example module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL