interceptors

command module
v0.0.0-...-9127f1c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 27, 2023 License: MIT Imports: 13 Imported by: 0

README

Interceptors Example

It creates a Producer interceptor to produce some OpenTelemetry spans and also modifies the intercepted message to include some headers.

conf.Producer.Interceptors = []sarama.ProducerInterceptor{xxx}

Run the example

  • go run main.go trace_interceptor.go.
  • or `go build and pass different parameters.
go build && ./interceptors --h
Usage of ./interceptors:
  -brokers string
        The Kafka brokers to connect to, as a comma separated list (default "localhost:9092")
  -topic string
        The Kafka topic to use (default "default_topic")

App will output OpenTelemetry spans for every intercepted message, i.e:

go run main.go trace_interceptor.go
[Sarama] 2020/08/11 11:35:56 Initializing new client
[Sarama] 2020/08/11 11:35:56 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2020/08/11 11:35:56 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2020/08/11 11:35:56 client/metadata fetching metadata for all topics from broker localhost:9092
[Sarama] 2020/08/11 11:35:56 Connected to broker at localhost:9092 (unregistered)
[Sarama] 2020/08/11 11:35:56 client/brokers registered new broker #1 at localhost:9092
[Sarama] 2020/08/11 11:35:56 Successfully initialized new client
INFO[0000] Starting to produce 2 messages every 5s
INFO[0005] producing 2 messages at 2020-08-11T11:36:01-07:00  topic=default_topic
[
  {
    "SpanContext": {
      "TraceID": "2c4210c1d6c2ebe758eb41cbc95a0478",
      "SpanID": "046bfc6d6db17ed7",
      "TraceFlags": 1
    },
    "ParentSpanID": "0000000000000000",
    "SpanKind": 1,
    "Name": "default_topic",
    "StartTime": "2020-08-11T11:36:01.57487-07:00",
    "EndTime": "2020-08-11T11:36:01.574891849-07:00",
    "Attributes": [
      {
        "Key": "messaging.destination_kind",
        "Value": { "Type": "STRING", "Value": "topic" }
      },
      {
        "Key": "span.otel.kind",
        "Value": { "Type": "STRING", "Value": "PRODUCER" }
      },
      {
        "Key": "messaging.system",
        "Value": { "Type": "STRING", "Value": "kafka" }
      },
      {
        "Key": "net.transport",
        "Value": { "Type": "STRING", "Value": "IP.TCP" }
      },
      {
        "Key": "messaging.url",
        "Value": { "Type": "STRING", "Value": "localhost:9092" }
      },
      {
        "Key": "messaging.destination",
        "Value": { "Type": "STRING", "Value": "default_topic" }
      },
      {
        "Key": "messaging.message_id",
        "Value": { "Type": "STRING", "Value": "046bfc6d6db17ed7" }
      }
    ],
    "MessageEvents": null,
    "Links": null,
    "StatusCode": 0,
    "StatusMessage": "",
    "HasRemoteParent": false,
    "DroppedAttributeCount": 0,
    "DroppedMessageEventCount": 0,
    "DroppedLinkCount": 0,
    "ChildSpanCount": 0,
    "Resource": null,
    "InstrumentationLibrary": {
      "Name": "shopify.com/sarama/examples/interceptors",
      "Version": ""
    }
  }
]
[{"SpanContext":{"TraceID":"b3922fbbaab23b16401c353b0ff9ce6b","SpanID":"269f5133c0d0116e","TraceFlags":1},"ParentSpanID":"0000000000000000","SpanKind":1,"Name":"default_topic","StartTime":"2020-08-11T11:36:01.575388-07:00","EndTime":"2020-08-11T11:36:01.575399065-07:00","Attributes":[{"Key":"messaging.destination_kind","Value":{"Type":"STRING","Value":"topic"}},{"Key":"span.otel.kind","Value":{"Type":"STRING","Value":"PRODUCER"}},{"Key":"messaging.system","Value":{"Type":"STRING","Value":"kafka"}},{"Key":"net.transport","Value":{"Type":"STRING","Value":"IP.TCP"}},{"Key":"messaging.url","Value":{"Type":"STRING","Value":"localhost:9092"}},{"Key":"messaging.destination","Value":{"Type":"STRING","Value":"default_topic"}},{"Key":"messaging.message_id","Value":{"Type":"STRING","Value":"269f5133c0d0116e"}}],"MessageEvents":null,"Links":null,"StatusCode":0,"StatusMessage":"","HasRemoteParent":false,"DroppedAttributeCount":0,"DroppedMessageEventCount":0,"DroppedLinkCount":0,"ChildSpanCount":0,"Resource":null,"InstrumentationLibrary":{"Name":"shopify.com/sarama/examples/interceptors","Version":""}}]
[Sarama] 2020/08/11 11:36:01 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2020/08/11 11:36:01 producer/broker/1 starting up
[Sarama] 2020/08/11 11:36:01 producer/broker/1 state change to [open] on default_topic/0
[Sarama] 2020/08/11 11:36:01 Connected to broker at localhost:9092 (registered as #1)
^CINFO[0005] terminating the program
INFO[0005] Bye :)
[Sarama] 2020/08/11 11:36:02 Producer shutting down.

Check the produced intercepted messages

Check that messages have some headers added by the interceptor:

kafkacat -Cb localhost:9092 -t default_topic -f '\n- %s\nheaders: %h'
headers: trace_id=235b3424775d8b2f9bf21e458496f447,span_id=50da1552c105e712,message_id=50da1552c105e712
- test message 1/2 from kafka-client-go-test at 2020-08-11T11:36:01-07:00
headers: trace_id=2c4210c1d6c2ebe758eb41cbc95a0478,span_id=046bfc6d6db17ed7,message_id=046bfc6d6db17ed7
- test message 2/2 from kafka-client-go-test at 2020-08-11T11:36:01-07:00
% Reached end of topic default_topic [0] at offset 444

Documentation

The Go Gopher

There is no documentation for this package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL