ppsarama

package module
v1.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 10, 2024 License: Apache-2.0 Imports: 5 Imported by: 0

README

ppsarama

This package instruments the Shopify/sarama package.

Installation

$ go get github.com/pinpoint-apm/pinpoint-go-agent/plugin/sarama
import "github.com/pinpoint-apm/pinpoint-go-agent/plugin/sarama"

Usage

PkgGoDev

This package instruments Kafka consumers and producers.

Consumer

To instrument a Kafka consumer, use ConsumeMessageContext. In order to display the Kafka broker on the pinpoint screen, a context with broker addresses must be created and delivered using NewContext.

ConsumePartition example:

ctx := ppsarama.NewContext(context.Background(), broker)
pc, _ := consumer.ConsumePartition(topic, partition, offset)
for msg := range pc.Messages() {
    ppsarama.ConsumeMessageContext(processMessage, ctx, msg)
}

ConsumerGroupHandler example:

func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    ctx := sess.Context()
    for msg := range claim.Messages() {
        _ = ppsarama.ConsumeMessageContext(process, ctx, msg)
    }
}

func main() {     
    ctx := ppsarama.NewContext(context.Background(), broker)
    handler := exampleConsumerGroupHandler{}
    err := group.Consume(ctx, topics, handler)

ConsumeMessageContext passes a context containing the pinpoint.Tracer to HandlerContextFunc. In HandlerContextFunc, this tracer can be obtained by using the pinpoint.FromContext function. Alternatively, the context may be propagated to wherever a context containing the pinpoint.Tracer is required.

func process(ctx context.Context, msg *sarama.ConsumerMessage) error {
    tracer := pinpoint.FromContext(ctx)
    defer tracer.NewSpanEvent("process").EndSpanEvent()

    fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
package main

import (
    "github.com/Shopify/sarama"
    "github.com/pinpoint-apm/pinpoint-go-agent"
    "github.com/pinpoint-apm/pinpoint-go-agent/plugin/sarama"
)

func processMessage(ctx context.Context, msg *sarama.ConsumerMessage) error {
    tracer := pinpoint.FromContext(ctx)
    defer tracer.NewSpanEvent("processMessage").EndSpanEvent()
    fmt.Println("retrieving message: ", string(msg.Value))
    ...
}

func subscribe() {
    broker := []string{"localhost:9092"}
    config := sarama.NewConfig()
    consumer, err := sarama.NewConsumer(broker, config)
    ...
    
    ctx := ppsarama.NewContext(context.Background(), broker)
    for _, partition := range partitionList {
        pc, _ := consumer.ConsumePartition(topic, partition, initialOffset)

        go func(pc sarama.PartitionConsumer) {
            for msg := range pc.Messages() {
                ppsarama.ConsumeMessageContext(processMessage, ctx, msg)
            }
        }(pc)
	...	
}

func main() {
    ... //setup agent

    subscribe()
}

Full Example Source

Producer

To instrument a Kafka producer, use NewSyncProducer or NewAsyncProducer.

config := sarama.NewConfig()
producer, err := ppsarama.NewSyncProducer(brokers, config)

It is necessary to pass the context containing the pinpoint.Tracer to sarama.SyncProducer (or sarama.AsyncProducer) using the WithContext function.

ppsarama.WithContext(pinpoint.NewContext(context.Background(), tracer), producer)
partition, offset, err := producer.SendMessage(msg)
package main

import (
    "github.com/Shopify/sarama"
    "github.com/pinpoint-apm/pinpoint-go-agent"
    "github.com/pinpoint-apm/pinpoint-go-agent/plugin/sarama"
)

func prepareMessage(topic, message string) *sarama.ProducerMessage {
    return &sarama.ProducerMessage{
        Topic:     topic,
        Partition: -1,
        Value:     sarama.StringEncoder(message),
    }
}

func save(w http.ResponseWriter, r *http.Request) {
    msg := prepareMessage("topic", "Hello, Kafka!!")
    ppsarama.WithContext(r.Context(), producer)
    partition, offset, err := producer.SendMessage(msg)
    ...
}

var producer sarama.SyncProducer

func main() {
    ... //setup agent
	
    config := sarama.NewConfig()
    ...

    producer, err := ppsarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
    http.HandleFunc("/save", pphttp.WrapHandlerFunc(save))
}

Full Example Source

Documentation

Overview

Package ppsarama instruments the Shopify/sarama package (https://github.com/Shopify/sarama).

This package instruments Kafka consumers and producers.

To instrument a Kafka consumer, use ConsumeMessageContext. In order to display the kafka broker on the pinpoint screen, a context with broker addresses must be created and delivered using NewContext.

ConsumePartition example:

ctx := ppsarama.NewContext(context.Background(), broker)
pc, _ := consumer.ConsumePartition(topic, partition, offset)
for msg := range pc.Messages() {
  ppsarama.ConsumeMessageContext(processMessage, ctx, msg)
}

ConsumerGroupHandler example:

func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  ctx := sess.Context()
  for msg := range claim.Messages() {
    _ = ppsarama.ConsumeMessageContext(process, ctx, msg)
  }

ConsumeMessageContext passes a context containing the pinpoint.Tracer to HandlerContextFunc. In HandlerContextFunc, this tracer can be obtained by using the pinpoint.FromContext function.

func process(ctx context.Context, msg *sarama.ConsumerMessage) error {
  tracer := pinpoint.FromContext(ctx)
  defer tracer.NewSpanEvent("process").EndSpanEvent()

  fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)

To instrument a Kafka producer, use NewSyncProducer or NewAsyncProducer.

config := sarama.NewConfig()
producer, err = ppsarama.NewSyncProducer(brokers, config)

It is necessary to pass the context containing the pinpoint.Tracer to sarama.SyncProducer (or sarama.AsyncProducer) using the WithContext function.

ppsarama.WithContext(pinpoint.NewContext(context.Background(), tracer), producer)
partition, offset, err := producer.SendMessage(msg)

The WithContext function is not thread-safe, so use the SendMessageContext function if you have a data race.

partition, offset, err := producer.SendMessageContext(r.Context(), msg)

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConsumeMessage

func ConsumeMessage(handler HandlerFunc, msg *sarama.ConsumerMessage) error

ConsumeMessage is deprecated. ConsumeMessage creates a pinpoint.Tracer that instruments the sarama.ConsumerMessage. The tracer extracts the pinpoint header from message header, and then creates a span that initiates or continues the transaction. ConsumeMessage passes a ConsumerMessage having pinpoint.Tracer to HandlerFunc.

func ConsumeMessageContext

func ConsumeMessageContext(handler HandlerContextFunc, ctx context.Context, msg *sarama.ConsumerMessage) error

ConsumeMessageContext creates a pinpoint.Tracer that instruments the sarama.ConsumerMessage. The tracer extracts the pinpoint header from message header, and then creates a span that initiates or continues the transaction. ConsumeMessageContext passes a context containing the pinpoint.Tracer to HandlerContextFunc.

func NewAsyncProducer

func NewAsyncProducer(addrs []string, config *sarama.Config) (sarama.AsyncProducer, error)

NewAsyncProducer wraps sarama.NewAsyncProducer and returns a sarama.AsyncProducer ready to instrument.

func NewContext

func NewContext(ctx context.Context, addrs []string) context.Context

NewContext returns a new Context that contains the given broker addresses.

func WithContext

func WithContext(ctx context.Context, producer interface{})

WithContext passes the context to the provided producer. It is possible to trace only when the given context contains a pinpoint.Tracer.

Types

type Consumer

type Consumer struct {
	sarama.Consumer
	// contains filtered or unexported fields
}

Consumer is deprecated.

func NewConsumer

func NewConsumer(addrs []string, config *sarama.Config) (*Consumer, error)

NewConsumer is deprecated.

func (*Consumer) ConsumePartition

func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (*PartitionConsumer, error)

ConsumePartition is deprecated.

type ConsumerMessage

type ConsumerMessage struct {
	*sarama.ConsumerMessage
	// contains filtered or unexported fields
}

ConsumerMessage is deprecated.

func WrapConsumerMessage

func WrapConsumerMessage(msg *sarama.ConsumerMessage) *ConsumerMessage

WrapConsumerMessage is deprecated. WrapConsumerMessage wraps a sarama.ConsumerMessage and creates a pinpoint.Tracer that instruments the sarama.ConsumerMessage. The tracer extracts the pinpoint header from message header, and then creates a span that initiates or continues the transaction.

func (*ConsumerMessage) SpanTracer

func (c *ConsumerMessage) SpanTracer() pinpoint.Tracer

SpanTracer is deprecated. Use Tracer.

func (*ConsumerMessage) Tracer

func (c *ConsumerMessage) Tracer() pinpoint.Tracer

Tracer returns the pinpoint.Tracer.

type HandlerContextFunc

type HandlerContextFunc func(context.Context, *sarama.ConsumerMessage) error

type HandlerFunc

type HandlerFunc func(msg *ConsumerMessage) error

HandlerFunc is deprecated.

type PartitionConsumer

type PartitionConsumer struct {
	sarama.PartitionConsumer
	// contains filtered or unexported fields
}

PartitionConsumer is deprecated.

func WrapPartitionConsumer

func WrapPartitionConsumer(pc sarama.PartitionConsumer) *PartitionConsumer

WrapPartitionConsumer is deprecated.

func (*PartitionConsumer) Messages

func (pc *PartitionConsumer) Messages() <-chan *ConsumerMessage

Messages is deprecated.

type SyncProducer added in v1.4.0

type SyncProducer interface {
	sarama.SyncProducer
	SendMessageContext(ctx context.Context, msg *sarama.ProducerMessage) (partition int32, offset int64, err error)
	SendMessagesContext(ctx context.Context, msgs []*sarama.ProducerMessage) error
}

func NewSyncProducer

func NewSyncProducer(addrs []string, config *sarama.Config) (SyncProducer, error)

NewSyncProducer wraps sarama.NewSyncProducer and returns a sarama.SyncProducer ready to instrument.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL