dd-trace-go.v1: gopkg.in/DataDog/dd-trace-go.v1/contrib/Shopify/sarama Index | Examples | Files

package sarama

import "gopkg.in/DataDog/dd-trace-go.v1/contrib/Shopify/sarama"

Package sarama provides functions to trace the Shopify/sarama package (https://github.com/Shopify/sarama).

Code:

cfg := sarama.NewConfig()

producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, cfg)
if err != nil {
    panic(err)
}
defer producer.Close()

producer = saramatrace.WrapAsyncProducer(cfg, producer)

msg := &sarama.ProducerMessage{
    Topic: "some-topic",
    Value: sarama.StringEncoder("Hello World"),
}
producer.Input() <- msg

Code:

consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
    panic(err)
}
defer consumer.Close()

consumer = saramatrace.WrapConsumer(consumer)

partitionConsumer, err := consumer.ConsumePartition("some-topic", 0, sarama.OffsetNewest)
if err != nil {
    panic(err)
}
defer partitionConsumer.Close()

consumed := 0
for msg := range partitionConsumer.Messages() {
    // if you want to use the kafka message as a parent span:
    if spanctx, err := tracer.Extract(saramatrace.NewConsumerMessageCarrier(msg)); err == nil {
        // you can create a span using ChildOf(spanctx)
        _ = spanctx
    }

    log.Printf("Consumed message offset %d\n", msg.Offset)
    consumed++
}

Code:

cfg := sarama.NewConfig()
cfg.Producer.Return.Successes = true

producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
if err != nil {
    panic(err)
}
defer producer.Close()

producer = saramatrace.WrapSyncProducer(cfg, producer)

msg := &sarama.ProducerMessage{
    Topic: "some-topic",
    Value: sarama.StringEncoder("Hello World"),
}
_, _, err = producer.SendMessage(msg)
if err != nil {
    panic(err)
}

Index

Examples

Package Files

headers.go option.go sarama.go

func WrapAsyncProducer Uses

func WrapAsyncProducer(saramaConfig *sarama.Config, p sarama.AsyncProducer, opts ...Option) sarama.AsyncProducer

WrapAsyncProducer wraps a sarama.AsyncProducer so that all produced messages are traced. It requires the underlying sarama Config so we can know whether or not sucesses will be returned.

func WrapConsumer Uses

func WrapConsumer(c sarama.Consumer, opts ...Option) sarama.Consumer

WrapConsumer wraps a sarama.Consumer wrapping any PartitionConsumer created via Consumer.ConsumePartition.

func WrapPartitionConsumer Uses

func WrapPartitionConsumer(pc sarama.PartitionConsumer, opts ...Option) sarama.PartitionConsumer

WrapPartitionConsumer wraps a sarama.PartitionConsumer causing each received message to be traced.

func WrapSyncProducer Uses

func WrapSyncProducer(saramaConfig *sarama.Config, producer sarama.SyncProducer, opts ...Option) sarama.SyncProducer

WrapSyncProducer wraps a sarama.SyncProducer so that all produced messages are traced.

type ConsumerMessageCarrier Uses

type ConsumerMessageCarrier struct {
    // contains filtered or unexported fields
}

A ConsumerMessageCarrier injects and extracts traces from a sarama.ConsumerMessage.

func NewConsumerMessageCarrier Uses

func NewConsumerMessageCarrier(msg *sarama.ConsumerMessage) ConsumerMessageCarrier

NewConsumerMessageCarrier creates a new ConsumerMessageCarrier.

func (ConsumerMessageCarrier) ForeachKey Uses

func (c ConsumerMessageCarrier) ForeachKey(handler func(key, val string) error) error

ForeachKey iterates over every header.

func (ConsumerMessageCarrier) Set Uses

func (c ConsumerMessageCarrier) Set(key, val string)

Set sets a header.

type Option Uses

type Option func(cfg *config)

An Option is used to customize the config for the sarama tracer.

func WithAnalytics Uses

func WithAnalytics(on bool) Option

WithAnalytics enables Trace Analytics for all started spans.

func WithAnalyticsRate Uses

func WithAnalyticsRate(rate float64) Option

WithAnalyticsRate sets the sampling rate for Trace Analytics events correlated to started spans.

func WithServiceName Uses

func WithServiceName(name string) Option

WithServiceName sets the given service name for the intercepted client.

type ProducerMessageCarrier Uses

type ProducerMessageCarrier struct {
    // contains filtered or unexported fields
}

A ProducerMessageCarrier injects and extracts traces from a sarama.ProducerMessage.

func NewProducerMessageCarrier Uses

func NewProducerMessageCarrier(msg *sarama.ProducerMessage) ProducerMessageCarrier

NewProducerMessageCarrier creates a new ProducerMessageCarrier.

func (ProducerMessageCarrier) ForeachKey Uses

func (c ProducerMessageCarrier) ForeachKey(handler func(key, val string) error) error

ForeachKey iterates over every header.

func (ProducerMessageCarrier) Set Uses

func (c ProducerMessageCarrier) Set(key, val string)

Set sets a header.

Package sarama imports 5 packages (graph). Updated 2020-02-15. Refresh now. Tools for package owners.