slogkafka

package module
v2.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 25, 2024 License: MIT Imports: 7 Imported by: 0

README

slog: Kafka handler

tag Go Version GoDoc Build Status Go report Coverage Contributors License

A Kafka Handler for slog Go library.

See also:

HTTP middlewares:

Loggers:

Log sinks:

🚀 Install

go get github.com/samber/slog-kafka/v2

Compatibility: go >= 1.21

No breaking changes will be made to exported APIs before v3.0.0.

💡 Usage

GoDoc: https://pkg.go.dev/github.com/samber/slog-kafka/v2

Handler options
type Option struct {
	// log level (default: debug)
	Level     slog.Leveler

	// Kafka Writer
	KafkaWriter *kafka.Writer
	Timeout time.Duration // default: 60s

	// optional: customize Kafka event builder
	Converter Converter
	// optional: fetch attributes from context
	AttrFromContext []func(ctx context.Context) []slog.Attr

	// optional: see slog.HandlerOptions
	AddSource   bool
	ReplaceAttr func(groups []string, a slog.Attr) slog.Attr
}

Other global parameters:

slogkafka.SourceKey = "source"
slogkafka.ContextKey = "extra"
slogkafka.RequestKey = "request"
slogkafka.ErrorKeys = []string{"error", "err"}
slogkafka.RequestIgnoreHeaders = false
Supported attributes

The following attributes are interpreted by slogkafka.DefaultConverter:

Atribute name slog.Kind Underlying type
"user" group (see below)
"error" any error
"request" any *http.Request
other attributes *

Other attributes will be injected in extra field.

Users must be of type slog.Group. Eg:

slog.Group("user",
    slog.String("id", "user-123"),
    slog.String("username", "samber"),
    slog.Time("created_at", time.Now()),
)
Example
import (
	"context"
	"fmt"
	"time"

	slogkafka "github.com/samber/slog-kafka/v2"
	"github.com/segmentio/kafka-go"

	"log/slog"
)

func main() {
	// docker-compose up -d

	uri := "127.0.0.1:9092"

	dialer := &kafka.Dialer{
		Timeout:   10 * time.Second,
		DualStack: true,
	}

	conn, err := dialer.DialContext(context.Background(), "tcp", uri)
	if err != nil {
		panic(err)
	}

	err = conn.CreateTopics(kafka.TopicConfig{
		Topic:             "logs",
		NumPartitions:     12,
		ReplicationFactor: 1,
	})
	if err != nil {
		panic(err)
	}

	writer := kafka.NewWriter(kafka.WriterConfig{
		Brokers: []string{uri},
		Topic:   "logs",
		Dialer:  dialer,

		Async:       true,	// !
		Balancer:    &kafka.Hash{},
		MaxAttempts: 3,

		Logger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
			fmt.Printf(msg+"\n", args...)
		}),
		ErrorLogger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
			fmt.Printf(msg+"\n", args...)
		}),
	})

	defer writer.Close()
	defer conn.Close()

	logger := slog.New(slogkafka.Option{Level: slog.LevelDebug, KafkaWriter: writer}.NewKafkaHandler())
	logger = logger.With("release", "v1.0.0")

	logger.
		With(
			slog.Group("user",
				slog.String("id", "user-123"),
				slog.Time("created_at", time.Now()),
			),
		).
		With("error", fmt.Errorf("an error")).
		Error("a message")
}

Kafka message:

{
  "level": "ERROR",
	"logger": "samber/slog-kafka",
	"message": "a message",
	"timestamp": "2023-04-30T01:33:21.676768Z",
	"error": {
		"error": "an error",
		"kind": "*errors.errorString",
		"stack": null
	},
	"extra": {
		"release": "v1.0.0"
	},
	"user": {
		"created_at": "2023-04-30T01:33:21.676704Z",
		"id": "user-123"
	}
}
Tracing

Import the samber/slog-otel library.

import (
	slogkafka "github.com/samber/slog-kafka"
	slogotel "github.com/samber/slog-otel"
	"go.opentelemetry.io/otel/sdk/trace"
)

func main() {
	tp := trace.NewTracerProvider(
		trace.WithSampler(trace.AlwaysSample()),
	)
	tracer := tp.Tracer("hello/world")

	ctx, span := tracer.Start(context.Background(), "foo")
	defer span.End()

	span.AddEvent("bar")

	logger := slog.New(
		slogkafka.Option{
			// ...
			AttrFromContext: []func(ctx context.Context) []slog.Attr{
				slogotel.ExtractOtelAttrFromContext([]string{"tracing"}, "trace_id", "span_id"),
			},
		}.NewKafkaHandler(),
	)

	logger.ErrorContext(ctx, "a message")
}

🤝 Contributing

Don't hesitate ;)

# Install some dev dependencies
make tools

# Run tests
make test
# or
make watch-test

👤 Contributors

Contributors

💫 Show your support

Give a ⭐️ if this project helped you!

GitHub Sponsors

📝 License

Copyright © 2023 Samuel Berthe.

This project is MIT licensed.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ContextKey = "extra"
View Source
var ErrorKeys = []string{"error", "err"}
View Source
var RequestIgnoreHeaders = false
View Source
var RequestKey = "request"
View Source
var SourceKey = "source"

Functions

func DefaultConverter

func DefaultConverter(addSource bool, replaceAttr func(groups []string, a slog.Attr) slog.Attr, loggerAttr []slog.Attr, groups []string, record *slog.Record) map[string]any

Types

type Converter

type Converter func(addSource bool, replaceAttr func(groups []string, a slog.Attr) slog.Attr, loggerAttr []slog.Attr, groups []string, record *slog.Record) map[string]any

type KafkaHandler

type KafkaHandler struct {
	// contains filtered or unexported fields
}

func (*KafkaHandler) Enabled

func (h *KafkaHandler) Enabled(_ context.Context, level slog.Level) bool

func (*KafkaHandler) Handle

func (h *KafkaHandler) Handle(ctx context.Context, record slog.Record) error

func (*KafkaHandler) WithAttrs

func (h *KafkaHandler) WithAttrs(attrs []slog.Attr) slog.Handler

func (*KafkaHandler) WithGroup

func (h *KafkaHandler) WithGroup(name string) slog.Handler

type Option

type Option struct {
	// log level (default: debug)
	Level slog.Leveler

	// Kafka Writer
	KafkaWriter *kafka.Writer
	Timeout     time.Duration // default: 60s

	// optional: customize Kafka event builder
	Converter Converter
	// optional: fetch attributes from context
	AttrFromContext []func(ctx context.Context) []slog.Attr

	// optional: see slog.HandlerOptions
	AddSource   bool
	ReplaceAttr func(groups []string, a slog.Attr) slog.Attr
}

func (Option) NewKafkaHandler

func (o Option) NewKafkaHandler() slog.Handler

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL