apmkafkago

Version: v1.0.0
Published: Jul 2, 2023 | License: Apache-2.0 | Imports: 4 | Imported by: 2

README

apm-kafkago

Elastic APM Go agent support for the segmentio/kafka-go client.

This package wraps the segmentio/kafka-go reader and writer. The wrapped writer adds the traceparent header to the headers of each outgoing message, and the wrapped reader creates a new transaction for each message it reads.

Usage as Consumer

import (
    "github.com/segmentio/kafka-go"
    apmkafkago "github.com/sohaibomr/apm-kafkago"
)

kReader := kafka.NewReader(kafka.ReaderConfig{
    Brokers: []string{kafkaBrokers},
    Topic:   topic,
    GroupID: "payment-group",
})
// Read messages through the wrapped reader. It parses the Traceparent header
// from each message's headers and creates a new transaction for each message.
reader := apmkafkago.WrapReader(kReader)
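
To illustrate what parsing the Traceparent header involves, here is a minimal standard-library sketch. It assumes the header value follows the W3C Trace Context version 00 layout (version-traceid-parentid-flags). The header type is a hypothetical stand-in for kafka.Header, and parseTraceparent and findTraceparent are illustrative names that are not part of this package; the wrapper's actual parsing may differ.

```go
package main

import (
	"encoding/hex"
	"errors"
	"fmt"
	"strings"
)

// header is a hypothetical stand-in for kafka.Header (Key string, Value []byte).
type header struct {
	Key   string
	Value []byte
}

// traceContext holds the fields carried by a version-00 traceparent value.
type traceContext struct {
	TraceID  string
	ParentID string
	Sampled  bool
}

// parseTraceparent parses "00-<32 hex trace id>-<16 hex parent id>-<2 hex flags>".
// It is lenient about hex case and rejects any version other than 00.
func parseTraceparent(v string) (traceContext, error) {
	parts := strings.Split(v, "-")
	if len(parts) != 4 {
		return traceContext{}, errors.New("traceparent: expected 4 fields")
	}
	if parts[0] != "00" {
		return traceContext{}, fmt.Errorf("traceparent: unsupported version %q", parts[0])
	}
	if len(parts[1]) != 32 || len(parts[2]) != 16 || len(parts[3]) != 2 {
		return traceContext{}, errors.New("traceparent: wrong field length")
	}
	for _, p := range parts[1:] {
		if _, err := hex.DecodeString(p); err != nil {
			return traceContext{}, fmt.Errorf("traceparent: field %q is not hex", p)
		}
	}
	// All-zero trace and parent ids are invalid in the W3C format.
	if strings.Trim(parts[1], "0") == "" || strings.Trim(parts[2], "0") == "" {
		return traceContext{}, errors.New("traceparent: all-zero id")
	}
	flags, err := hex.DecodeString(parts[3])
	if err != nil {
		return traceContext{}, err
	}
	return traceContext{
		TraceID:  parts[1],
		ParentID: parts[2],
		Sampled:  flags[0]&0x01 == 0x01, // lowest bit is the sampled flag
	}, nil
}

// findTraceparent looks for a traceparent header, ignoring key case,
// and reports whether a valid one was found.
func findTraceparent(headers []header) (traceContext, bool) {
	for _, h := range headers {
		if strings.EqualFold(h.Key, "traceparent") {
			tc, err := parseTraceparent(string(h.Value))
			return tc, err == nil
		}
	}
	return traceContext{}, false
}

func main() {
	headers := []header{
		{Key: "Traceparent", Value: []byte("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")},
	}
	tc, ok := findTraceparent(headers)
	fmt.Println(tc.TraceID, tc.ParentID, tc.Sampled, ok)
}
```

When no valid header is found, a consumer would typically start a fresh trace rather than fail the read.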

Usage as Producer

import (
    "github.com/segmentio/kafka-go"
    apmkafkago "github.com/sohaibomr/apm-kafkago"
)

kWriter := kafka.NewWriter(kafka.WriterConfig{
    Brokers: []string{kafkaBrokers},
    Topic:   topic,
})
// Write messages through the wrapped writer. It adds the Traceparent header
// to the headers of each message.
writer := apmkafkago.WrapWriter(kWriter)

How it looks on the APM dashboard

[Image: APM dashboard screenshot]

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Reader

type Reader struct {
	R *kafka.Reader
}

Reader is a wrapper around kafka.Reader.

func WrapReader

func WrapReader(r *kafka.Reader) *Reader

WrapReader returns a new Reader that wraps the given kafka.Reader.

func (*Reader) ReadMessage

func (r *Reader) ReadMessage(ctx context.Context) (kafka.Message, error)

ReadMessage reads a message from Kafka.

func (*Reader) ReadMessageTx

func (r *Reader) ReadMessageTx(ctx context.Context) (msg kafka.Message, tx *apm.Transaction, err error)

ReadMessageTx reads a message from Kafka and returns an APM transaction to use while processing that message. Call tx.End() once processing of the message is finished.

type Writer

type Writer struct {
	W *kafka.Writer
}

Writer is a wrapper around kafka.Writer

func WrapWriter

func WrapWriter(w *kafka.Writer) *Writer

WrapWriter returns a new Writer that wraps the given kafka.Writer.

func (*Writer) WriteMessages

func (w *Writer) WriteMessages(ctx context.Context, msgs ...kafka.Message) error

WriteMessages writes one or more messages to Kafka.
