flow

package module
v0.1.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 2, 2022 License: Apache-2.0 Imports: 6 Imported by: 4

README

flow

Go Reference

An OpenTelemetry SpanProcessor reporting tracing flow metrics.

Getting Started

Assuming you have working code using the OpenTelemetry SDK, update the registration of your exporter to use a wrapped SpanProcessor.

Update your exporter registration with a BatchSpanProcessor to use the equivalent flow TracerProviderOption.

import (
	"github.com/MrAlias/flow"
	"go.opentelemetry.io/otel/sdk/trace"
)

func main() {
	sdk := trace.NewTracerProvider(flow.WithBatcher(exporter{}))
	/* ... */
}

More generically, all SpanProcessors can be wrapped directly.

import (
	"github.com/MrAlias/flow"
	"go.opentelemetry.io/otel/sdk/trace"
)

func main() {
	spanProcessor := trace.NewSimpleSpanProcessor(exporter{})
	sdk := trace.NewTracerProvider(flow.WithSpanProcessor(spanProcessor))
	/* ... */
}

See the included example for an end-to-end illustration of functionality.

Produced Metrics

The flow SpanProcessor will report spans_total metrics as a counter. They are exposed at localhost:41820 by default (this can be changed using the WithListenAddress option).

$ curl -s http://localhost:41820/metrics | grep 'spans_total'

# HELP spans_total The total number of processed spans
# TYPE spans_total counter
spans_total{state="ended"} 762
spans_total{state="started"} 762

Configure a locally running Prometheus or OpenTelemetry Collector instance to scrape these using a scrape target similar to this.

scrape_configs:
- job_name: myapp
  static_configs:
  - targets:
    - 'localhost:41820'

Documentation

Overview

Package flow provides an OpenTelemetry SpanProcessor that reports telemetry flow as Prometheus metrics.

To start using, replace the TracerProviderOption from the default OpenTelemetry SDK with the ones provided here. For example:

sdk := trace.NewTracerProvider(trace.WithBatcher(exporter{}))

Can be replaced with:

sdk := trace.NewTracerProvider(flow.WithBatcher(exporter{}))

Additionally, any custom span processor can be wrapped into a TracerProviderOption. For example:

spanProcessor := trace.NewSimpleSpanProcessor(exporter{})
sdk := trace.NewTracerProvider(flow.WithSpanProcessor(spanProcessor))
Example
package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"strings"

	"github.com/MrAlias/flow"
	"go.opentelemetry.io/otel/sdk/trace"
)

func main() {
	ctx := context.TODO()
	sdk := trace.NewTracerProvider(flow.WithBatcher(exporter{}))
	defer func() { _ = sdk.Shutdown(ctx) }()

	_, span := sdk.Tracer("flow-example").Start(ctx, "example")
	fmt.Println("started span")
	printSpansTotal()

	span.End()
	fmt.Println("ended span")
	printSpansTotal()

}

type exporter struct{}

func (e exporter) ExportSpans(_ context.Context, spans []trace.ReadOnlySpan) error {
	for _, span := range spans {
		fmt.Println("exported:", span.Name())
	}
	return nil
}

func (e exporter) Shutdown(ctx context.Context) error { return nil }

func printSpansTotal() {
	addr := fmt.Sprintf("http://localhost:%d/metrics", flow.DefaultListenPort)
	resp, err := http.Get(addr)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	b, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}

	for _, line := range strings.Split(string(b), "\n") {
		if strings.HasPrefix(line, "spans_total") {
			fmt.Println(string(line))
		}
	}
}
Output:

started span
spans_total{state="started"} 1
ended span
spans_total{state="ended"} 1
spans_total{state="started"} 1
exported: example

Index

Examples

Constants

View Source
const (
	// DefaultListenPort is the port the HTTP server listens on if not
	// configured with the WithListenAddress option.
	DefaultListenPort = 41820
	// DefaultListenAddress is the listen address of the HTTP server if not
	// configured with the WithListenAddress option.
	DefaultListenAddress = ":41820"
)

Variables

This section is empty.

Functions

func WithBatcher

WithBatcher returns an option that registers exporter using a BatchSpanProcessor with a TracerProvider after wrapping it to report telemetry flow metrics.

If configuration of the flow span processor is needed, use WithSpanProcessor or Wrap directly.

Example
package main

import (
	"context"
	"fmt"

	"github.com/MrAlias/flow"
	"go.opentelemetry.io/otel/sdk/trace"
)

func main() {
	sdk := trace.NewTracerProvider(flow.WithBatcher(exporter{}))
	defer func() { _ = sdk.Shutdown(context.Background()) }()
}

type exporter struct{}

func (e exporter) ExportSpans(_ context.Context, spans []trace.ReadOnlySpan) error {
	for _, span := range spans {
		fmt.Println("exported:", span.Name())
	}
	return nil
}

func (e exporter) Shutdown(ctx context.Context) error { return nil }
Output:

func WithSpanProcessor

func WithSpanProcessor(spanProcessor trace.SpanProcessor, options ...Option) trace.TracerProviderOption

WithSpanProcessor returns an option that registers spanProcessor with a TracerProvider after wrapping it to report telemetry flow metrics.

Example
package main

import (
	"context"
	"fmt"

	"github.com/MrAlias/flow"
	"go.opentelemetry.io/otel/sdk/trace"
)

func main() {
	spanProcessor := trace.NewSimpleSpanProcessor(exporter{})
	sdk := trace.NewTracerProvider(flow.WithSpanProcessor(spanProcessor))
	defer func() { _ = sdk.Shutdown(context.Background()) }()
}

type exporter struct{}

func (e exporter) ExportSpans(_ context.Context, spans []trace.ReadOnlySpan) error {
	for _, span := range spans {
		fmt.Println("exported:", span.Name())
	}
	return nil
}

func (e exporter) Shutdown(ctx context.Context) error { return nil }
Output:

func Wrap

func Wrap(downstream trace.SpanProcessor, options ...Option) trace.SpanProcessor

Wrap returns a wrapped version of the downstream SpanProcessor with telemetry flow reporting. All calls to the returned SpanProcessor will introspected for telemetry data and then forwarded to downstream.

Types

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option configures the flow SpanProcessor.

func WithListenAddress

func WithListenAddress(addr string) Option

WithListenAddress sets the listen address of the HTTP server.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL