instaamqp

package module
v0.0.0-...-9bf41b3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 20, 2022 License: MIT Imports: 6 Imported by: 0

README

Instana instrumentation for streadway-amqp

This module contains instrumentation code for RabbitMQ clients written with streadway-amqp.

GoDoc

Installation

To add the module to your go.mod file, run the following command in your project directory:

$ go get github.com/mier85/go-sensor/instrumentation/instaamqp

Usage

instaamqp offers a wrapper function, WrapChannel, that takes an amqp.Channel and returns an instaamqp.AmqpChannel instance. This Instana object provides instrumented versions of the amqp.Channel.Publish and amqp.Channel.Consume methods, which collect tracing data for the messages sent and received.

For any other amqp.Channel method, use the original amqp.Channel instance as usual.

A publisher example:

// Example_publisher shows how to publish a message through an instrumented
// channel: it connects to the broker, declares a fanout exchange, starts an
// entry span, wraps the channel with instaamqp.WrapChannel and publishes one
// plain-text message before finishing the span.
func Example_publisher() {
	exchangeName := "my-exchange"
	url := "amqp://guest:guest@localhost:5672/"

	// Create the Instana sensor
	sensor := instana.NewSensor("rabbitmq-client")

	c, err := amqp.Dial(url)
	failOnError(err, "Could not connect to the server")
	defer c.Close()

	ch, err := c.Channel()
	failOnError(err, "Could not acquire the channel")
	defer ch.Close()

	err = ch.ExchangeDeclare(exchangeName, "fanout", true, false, false, false, nil)
	failOnError(err, "Could not declare the exchange")

	// There must be an entry span per publish call.
	// In most common cases, creating an entry span manually is not needed, as the entry span originates from an
	// incoming HTTP client call.
	entrySpan := sensor.Tracer().StartSpan("my-publishing-method")
	ext.SpanKind.Set(entrySpan, ext.SpanKindRPCServerEnum)

	// We wrap the original amqp.Channel.Publish and amqp.Channel.Consume methods into an Instana object.
	instaCh := instaamqp.WrapChannel(sensor, ch, url)

	// Use the Instana `Publish` method with the same arguments as the original `Publish` method, with the additional
	// `entrySpan` argument. That's it!
	err = instaCh.Publish(entrySpan, exchangeName, "", false, false, amqp.Publishing{
		ContentType: "text/plain",
		// The body is a constant string, so it is converted directly; no formatting call is needed.
		Body: []byte("My published message"),
	})

	failOnError(err, "Error publishing the message")
	entrySpan.Finish()
}

A consumer example:

// Example_consumer shows how to consume messages through an instrumented
// channel: it connects to the broker, declares a fanout exchange and a queue,
// binds them, wraps the channel with instaamqp.WrapChannel and prints the body
// of every delivery it receives.
func Example_consumer() {
	exchangeName := "my-exchange"
	queueName := "my-queue"
	url := "amqp://guest:guest@localhost:5672/"

	// Create the Instana sensor
	sensor := instana.NewSensor("rabbitmq-client")

	c, err := amqp.Dial(url)
	failOnError(err, "Could not connect to the server")
	defer c.Close()

	ch, err := c.Channel()
	failOnError(err, "Could not acquire the channel")
	defer ch.Close()

	err = ch.ExchangeDeclare(exchangeName, "fanout", true, false, false, false, nil)
	failOnError(err, "Could not declare the exchange")

	q, err := ch.QueueDeclare(queueName, false, false, true, false, nil)
	failOnError(err, "Could not declare queue")

	err = ch.QueueBind(q.Name, "", exchangeName, false, nil)
	failOnError(err, "Could not bind the queue to the exchange")

	// We wrap the original amqp.Channel.Publish and amqp.Channel.Consume methods into an Instana object.
	instaCh := instaamqp.WrapChannel(sensor, ch, url)

	// Use the Instana `Consume` method with the same arguments as the original `Consume` method.
	msgs, err := instaCh.Consume(q.Name, "", true, false, false, false, nil)
	failOnError(err, "Could not consume messages")

	// Range over the deliveries in this goroutine. The loop blocks while msgs
	// is open and ends once msgs is closed, so the function can return and the
	// deferred Close calls run instead of blocking forever on a channel that
	// nothing ever sends on.
	for d := range msgs {
		fmt.Println("Got a message:", string(d.Body))
	}
}

See the instaamqp package documentation for detailed examples.

Documentation

Overview

Example (Consumer)
// Exchange name, queue name and broker URL used by this example.
exchangeName := "my-exchange"
queueName := "my-queue"
url := "amqp://guest:guest@localhost:5672/"

// Create the Instana sensor.
sensor := instana.NewSensor("rabbitmq-client")

// Connect to the server; the connection is closed when the example returns.
// NOTE(review): failOnError is a helper that is not shown here.
c, err := amqp.Dial(url)
failOnError(err, "Could not connect to the server")
defer c.Close()

// Acquire a channel on the connection; it is closed when the example returns.
ch, err := c.Channel()
failOnError(err, "Could not acquire the channel")
defer ch.Close()

// Declare the "fanout" exchange.
err = ch.ExchangeDeclare(exchangeName, "fanout", true, false, false, false, nil)
failOnError(err, "Could not declare the exchange")

// Declare the queue.
q, err := ch.QueueDeclare(queueName, false, false, true, false, nil)
failOnError(err, "Could not declare queue")

// Bind the queue to the exchange with an empty key.
err = ch.QueueBind(q.Name, "", exchangeName, false, nil)
failOnError(err, "Could not bind the queue to the exchange")

// Wrap the channel so that Consume goes through the Instana instrumentation.
instaCh := instaamqp.WrapChannel(sensor, ch, url)

// Use the Instana `Consume` method with the same arguments as the original `Consume` method.
// Here the consumer name is empty and autoAck is true.
msgs, err := instaCh.Consume(q.Name, "", true, false, false, false, nil)
failOnError(err, "Could not consume messages")

// Print the body of each delivery; the loop ends once msgs is closed.
for d := range msgs {
	fmt.Println("Got a message:", string(d.Body))
}
Output:

Example (Publisher)
// Exchange name and broker URL used by this example.
exchangeName := "my-exchange"
url := "amqp://guest:guest@localhost:5672/"

// Create the Instana sensor
sensor := instana.NewSensor("rabbitmq-client")

// Connect to the server; the connection is closed when the example returns.
// NOTE(review): failOnError is a helper that is not shown here.
c, err := amqp.Dial(url)
failOnError(err, "Could not connect to the server")
defer c.Close()

// Acquire a channel on the connection; it is closed when the example returns.
ch, err := c.Channel()
failOnError(err, "Could not acquire the channel")
defer ch.Close()

// Declare the "fanout" exchange.
err = ch.ExchangeDeclare(exchangeName, "fanout", true, false, false, false, nil)
failOnError(err, "Could not declare the exchange")

// There must be a new entry span per publish call.
// In the most common cases, creating an entry span manually is not needed, as the entry span originates from an
// incoming HTTP client call.
entrySpan := sensor.Tracer().StartSpan("my-publishing-method")
ext.SpanKind.Set(entrySpan, ext.SpanKindRPCServerEnum)

// Wrap the channel so that Publish goes through the Instana instrumentation.
instaCh := instaamqp.WrapChannel(sensor, ch, url)

// Use the Instana `Publish` method with the same arguments as the original `Publish` method, with the additional
// entrySpan argument. Here the routing key is empty and both mandatory and immediate are false.
err = instaCh.Publish(entrySpan, exchangeName, "", false, false, amqp.Publishing{
	ContentType: "text/plain",
	Body:        []byte("My published message"),
})

// Check the publish error first, then finish the entry span.
failOnError(err, "Error publishing the message")
entrySpan.Finish()
Output:

Index

Examples

Constants

View Source
const Version = "0.1.0"

Version is the instrumentation module semantic version

Variables

This section is empty.

Functions

func SpanContextFromConsumerMessage

func SpanContextFromConsumerMessage(d amqp.Delivery, sensor *instana.Sensor) (ot.SpanContext, bool)

SpanContextFromConsumerMessage extracts the tracing context from amqp.Delivery#Headers

Types

type AmqpChannel

type AmqpChannel struct {
	// contains filtered or unexported fields
}

AmqpChannel is a wrapper around the amqp.Channel object and contains all the relevant information to be tracked

func WrapChannel

func WrapChannel(sensor *instana.Sensor, ch PubCons, serverUrl string) *AmqpChannel

WrapChannel returns the AmqpChannel, which is Instana's wrapper around amqp.Channel

func (AmqpChannel) Consume

func (c AmqpChannel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)

Consume replaces the original amqp.Channel.Consume method in order to collect the relevant data to be tracked

func (AmqpChannel) Publish

func (c AmqpChannel) Publish(entrySpan ot.Span, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error

Publish replaces the original amqp.Channel.Publish method in order to collect the relevant data to be tracked

type PubCons

// PubCons contains all methods that we want to instrument from the amqp library.
// It is the type of the channel argument accepted by WrapChannel.
type PubCons interface {
	// Publish takes the same arguments as the original amqp.Channel.Publish method.
	Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
	// Consume takes the same arguments as the original amqp.Channel.Consume method.
	Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
}

PubCons contains all methods that we want to instrument from the amqp library

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL