otelkafka

Version: v0.0.0-...-a076996
Published: Oct 7, 2021 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	*kafka.Consumer
	// contains filtered or unexported fields
}

A Consumer wraps a kafka.Consumer.

func NewConsumer

func NewConsumer(conf *kafka.ConfigMap) (*Consumer, error)

NewConsumer calls kafka.NewConsumer and wraps the resulting Consumer.

func WrapConsumer

func WrapConsumer(c *kafka.Consumer) *Consumer

WrapConsumer wraps a kafka.Consumer so that any consumed events are traced.

func (*Consumer) Close

func (c *Consumer) Close() error

Close calls the underlying Consumer.Close and, if polling is enabled, finishes any remaining span.

func (*Consumer) Events

func (c *Consumer) Events() chan kafka.Event

Events returns the kafka Events channel (if enabled). Message events will be traced.

func (*Consumer) Poll

func (c *Consumer) Poll(timeoutMS int) (event kafka.Event)

Poll polls the consumer for messages or events. Message events will be traced.
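
The Consumer type embeds *kafka.Consumer and redefines Poll, Events, and Close. The sketch below illustrates that general Go pattern: the embedded methods stay reachable and a shadowing method adds behavior around the original call. It is not the package's implementation. baseConsumer and tracedConsumer are hypothetical stand-ins so the example compiles without confluent-kafka-go, and recording to a slice stands in for starting and finishing a span.

```go
package main

import "fmt"

// baseConsumer is a stand-in for kafka.Consumer (assumption, not the real type).
type baseConsumer struct{ queue []string }

// Poll returns the next queued message, or "" when nothing is pending.
func (b *baseConsumer) Poll(timeoutMS int) string {
	if len(b.queue) == 0 {
		return ""
	}
	msg := b.queue[0]
	b.queue = b.queue[1:]
	return msg
}

// Pending is promoted through the wrapper without any forwarding code.
func (b *baseConsumer) Pending() int { return len(b.queue) }

// tracedConsumer is a hypothetical wrapper that embeds the base consumer.
type tracedConsumer struct {
	*baseConsumer
	traced []string // what a real wrapper would turn into spans
}

// Poll shadows the embedded Poll and records every message it returns.
func (t *tracedConsumer) Poll(timeoutMS int) string {
	msg := t.baseConsumer.Poll(timeoutMS)
	if msg != "" {
		t.traced = append(t.traced, msg)
	}
	return msg
}

func main() {
	c := &tracedConsumer{baseConsumer: &baseConsumer{queue: []string{"m1", "m2"}}}
	for c.Pending() > 0 {
		fmt.Println("polled:", c.Poll(100))
	}
	fmt.Println("traced:", c.traced)
}
```

Because only the shadowed methods add tracing in this pattern, calling a method directly on the embedded value would bypass the wrapper.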

type MessageCarrier

type MessageCarrier struct {
	// contains filtered or unexported fields
}

A MessageCarrier injects and extracts traces from a kafka.Message.

func NewMessageCarrier

func NewMessageCarrier(msg *kafka.Message) MessageCarrier

NewMessageCarrier creates a new MessageCarrier.

func (MessageCarrier) Get

func (c MessageCarrier) Get(key string) string

Get retrieves a single value for a given key.

func (MessageCarrier) Keys

func (c MessageCarrier) Keys() []string

Keys lists the keys stored in the carrier.

func (MessageCarrier) Set

func (c MessageCarrier) Set(key, val string)

Set sets a header.
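
Get, Set, and Keys have the same signatures as the methods of OpenTelemetry's propagation.TextMapCarrier interface. The sketch below shows one way a carrier of that shape could read and write message headers. It is an assumption-laden illustration, not the package's code: header, message, and headerCarrier are hypothetical stand-ins for kafka.Header, kafka.Message, and MessageCarrier, and the replace-on-Set behavior is a design choice made here rather than something the documentation states.

```go
package main

import "fmt"

// header and message are stand-ins for kafka.Header and kafka.Message,
// so the sketch compiles without confluent-kafka-go.
type header struct {
	Key   string
	Value []byte
}

type message struct {
	Headers []header
}

// headerCarrier is a hypothetical carrier; it is not the package's MessageCarrier.
type headerCarrier struct {
	msg *message
}

// Get returns the value of the first header matching key, or "" if there is none.
func (c headerCarrier) Get(key string) string {
	for _, h := range c.msg.Headers {
		if h.Key == key {
			return string(h.Value)
		}
	}
	return ""
}

// Set drops any existing header with the same key, then appends the new value.
func (c headerCarrier) Set(key, val string) {
	kept := c.msg.Headers[:0]
	for _, h := range c.msg.Headers {
		if h.Key != key {
			kept = append(kept, h)
		}
	}
	c.msg.Headers = append(kept, header{Key: key, Value: []byte(val)})
}

// Keys lists the header keys in their current order.
func (c headerCarrier) Keys() []string {
	keys := make([]string, 0, len(c.msg.Headers))
	for _, h := range c.msg.Headers {
		keys = append(keys, h.Key)
	}
	return keys
}

func main() {
	c := headerCarrier{msg: &message{}}
	c.Set("traceparent", "old")
	c.Set("traceparent", "new")
	fmt.Println(c.Get("traceparent"), c.Keys())
}
```

The carrier is a small value holding a pointer to the message, so a value receiver is enough for Set to modify the message's headers.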

type Producer

type Producer struct {
	ConfluentProducer *kafka.Producer
	// contains filtered or unexported fields
}

Producer wraps a kafka.Producer.

func WrapProducer

func WrapProducer(ctx context.Context, confluentProducer *kafka.Producer) *Producer

WrapProducer wraps a kafka.Producer so requests are traced.

func (*Producer) Close

func (p *Producer) Close()

Close closes the internal produce channel. Use case:

  - User is writing to otel producer ProduceChannel()
  - User stops writing to otel producer ProduceChannel()
  - Now Close() can be safely called

func (*Producer) Produce

func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error

Produce calls the underlying Producer.Produce and traces the request.

func (*Producer) ProduceChannel

func (p *Producer) ProduceChannel() chan *kafka.Message

ProduceChannel returns a channel which can receive kafka Messages and will send them to the underlying producer channel.

func (*Producer) WaitTeardown

func (p *Producer) WaitTeardown()

WaitTeardown ensures that the otel producer has stopped writing to the underlying confluent kafka produce channel.
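
The ordering described for Close and WaitTeardown (stop writing, close, then wait) follows from how Go channels behave: sending on a closed channel panics, and a goroutine ranging over a channel exits only after that channel is closed. The sketch below models this with a hypothetical forwarder type and string messages. It makes no claim about the package's internals, and it assumes that WaitTeardown blocks until forwarding has finished.

```go
package main

import "fmt"

// forwarder is a hypothetical stand-in for the wrapping producer: it relays
// messages from its own channel to an underlying channel.
type forwarder struct {
	in   chan string   // plays the role of ProduceChannel()
	done chan struct{} // closed once forwarding has stopped
}

func newForwarder(underlying chan<- string) *forwarder {
	f := &forwarder{in: make(chan string), done: make(chan struct{})}
	go func() {
		defer close(f.done)
		for msg := range f.in { // the loop ends when f.in is closed
			underlying <- msg
		}
	}()
	return f
}

// Close closes the input channel. Callers must have stopped sending first,
// because a send on a closed channel panics.
func (f *forwarder) Close() { close(f.in) }

// WaitTeardown blocks until the forwarding goroutine has exited.
func (f *forwarder) WaitTeardown() { <-f.done }

func main() {
	underlying := make(chan string, 8)
	f := newForwarder(underlying)

	f.in <- "m1"
	f.in <- "m2"     // 1. the user writes, then stops writing
	f.Close()        // 2. only now is Close safe
	f.WaitTeardown() // 3. nothing more will reach the underlying channel
	close(underlying)

	for msg := range underlying {
		fmt.Println(msg)
	}
}
```

In this model, closing the underlying channel before WaitTeardown returns could race with a message that is still being forwarded, which is why the wait comes first.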
