Documentation ¶
Overview ¶
Package splunkkafka provides functions to trace the github.com/confluentinc/confluent-kafka-go/v2/kafka package.
Functions ¶
func NewMessageCarrier ¶
func NewMessageCarrier(message *kafka.Message) propagation.TextMapCarrier
NewMessageCarrier returns a TextMapCarrier that will encode and decode tracing information to and from the passed message.
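To make the encode/decode idea concrete, here is a minimal sketch of a header-backed carrier with the Get, Set, and Keys methods that a text-map carrier exposes. The `header`, `message`, and `carrier` types are hypothetical stand-ins written for this illustration; they are not the kafka package's types, and this is not necessarily how splunkkafka implements its carrier.

```go
package main

import "fmt"

// header and message are hypothetical stand-ins for a Kafka message and its
// headers; they are NOT the types from the kafka package.
type header struct {
	Key   string
	Value []byte
}

type message struct {
	Headers []header
}

// carrier adapts a message's headers to the Get/Set/Keys shape of a
// text-map carrier (assumed design, for illustration only).
type carrier struct {
	msg *message
}

// Get returns the value of the first header named key, or "" if none exists.
func (c carrier) Get(key string) string {
	for _, h := range c.msg.Headers {
		if h.Key == key {
			return string(h.Value)
		}
	}
	return ""
}

// Set drops any existing header named key, then appends the new value.
func (c carrier) Set(key, value string) {
	kept := c.msg.Headers[:0]
	for _, h := range c.msg.Headers {
		if h.Key != key {
			kept = append(kept, h)
		}
	}
	c.msg.Headers = append(kept, header{Key: key, Value: []byte(value)})
}

// Keys lists the header names currently stored on the message.
func (c carrier) Keys() []string {
	keys := make([]string, 0, len(c.msg.Headers))
	for _, h := range c.msg.Headers {
		keys = append(keys, h.Key)
	}
	return keys
}

func main() {
	msg := &message{}
	c := carrier{msg: msg}
	// A propagator would call Set on the producing side and Get on the
	// consuming side; the value here is only a sample trace header.
	c.Set("traceparent", "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01")
	fmt.Println(c.Keys(), c.Get("traceparent"))
}
```

Because the tracing information travels in the message's own headers, it stays with the message from producer to consumer without any side channel.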
Types ¶
type Consumer ¶
Consumer wraps a kafka.Consumer and traces its operations.
func NewConsumer ¶
NewConsumer calls kafka.NewConsumer and wraps the resulting Consumer with tracing instrumentation.
Example ¶
package main

import (
	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
	"github.com/signalfx/splunk-otel-go/instrumentation/github.com/confluentinc/confluent-kafka-go/v2/kafka/splunkkafka"
)

func main() {
	c, err := splunkkafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "myGroup",
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()
}
func WrapConsumer ¶
WrapConsumer wraps a kafka.Consumer so that any consumed events are traced.
func (*Consumer) Close ¶
Close calls the underlying Consumer.Close and, if polling is enabled, ends any remaining span.
func (*Consumer) Poll ¶
Poll polls the consumer for events. Message events are traced.
Poll blocks for at most timeoutMs milliseconds.
The following callback may be triggered:
Subscribe()'s rebalanceCb
Poll returns nil on timeout; otherwise it returns an Event.
func (*Consumer) ReadMessage ¶
ReadMessage polls the consumer for a message and traces the read.
This is a convenience API that wraps Poll() and only returns messages or errors. All other event types are discarded.
The call will block for at most `timeout` waiting for a new message or error. `timeout` may be set to -1 for indefinite wait.
A timeout is returned as (nil, err), where err satisfies `err.(kafka.Error).Code() == kafka.ErrTimedOut`.
Messages are returned as (msg, nil), while general errors are returned as (nil, err), and partition-specific errors are returned as (msg, err) where msg.TopicPartition provides partition-specific information (such as topic, partition and offset).
All other event types, such as PartitionEOF and AssignedPartitions, are silently discarded.
type Option ¶
Option applies options to a configuration.
func WithAttributes ¶
WithAttributes returns an Option that appends attr to the attributes set for every span created.
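The "appends" wording matters when the option is passed more than once. The following sketch models that behavior with the functional-options pattern. The `config` type, the function-typed `Option`, and the lower-case `withAttributes` and `newConfig` helpers are assumptions made for illustration; the real package's Option and attribute types may be shaped differently.

```go
package main

import "fmt"

// config is a hypothetical stand-in for an instrumentation configuration.
type config struct {
	attrs []string
}

// Option mutates a config; modeled here as a plain function type
// (an assumption, not the package's actual definition).
type Option func(*config)

// withAttributes appends attr to whatever attributes are already set,
// rather than replacing them.
func withAttributes(attr ...string) Option {
	return func(c *config) {
		c.attrs = append(c.attrs, attr...)
	}
}

// newConfig applies each option in the order given.
func newConfig(opts ...Option) *config {
	c := &config{}
	for _, o := range opts {
		o(c)
	}
	return c
}

func main() {
	c := newConfig(
		withAttributes("service=checkout"),
		withAttributes("env=dev", "region=eu"),
	)
	fmt.Println(len(c.attrs), c.attrs)
}
```

Under this model, attributes from earlier options are preserved and later ones are added after them, so options can be composed from several call sites.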
func WithPropagator ¶
func WithPropagator(p propagation.TextMapPropagator) Option
WithPropagator returns an Option that sets p as the TextMapPropagator used when propagating a span context.
func WithTracerProvider ¶
func WithTracerProvider(tp trace.TracerProvider) Option
WithTracerProvider returns an Option that sets the TracerProvider used for a configuration.
type Producer ¶
A Producer wraps a kafka.Producer and traces its operations.
func NewProducer ¶
NewProducer calls kafka.NewProducer and wraps the resulting Producer with tracing instrumentation.
Example ¶
package main

import (
	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
	"github.com/signalfx/splunk-otel-go/instrumentation/github.com/confluentinc/confluent-kafka-go/v2/kafka/splunkkafka"
)

func main() {
	p, err := splunkkafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
	})
	if err != nil {
		panic(err)
	}
	defer p.Close()
}
func WrapProducer ¶
WrapProducer wraps a kafka.Producer so that any produced events are traced.
func (*Producer) Close ¶
func (p *Producer) Close()
Close calls the wrapped Producer.Close and closes the producer channel.
func (*Producer) ProduceChannel ¶
ProduceChannel returns the traced producer channel.