events

package module
v0.5.0
Published: Nov 20, 2022 License: MIT Imports: 15 Imported by: 8

README

events

An event abstraction library with a Kafka implementation.

You can find examples in the examples folder.

Usage

go get github.com/blacklane/go-libs/x/events

Consumer

	// topic is the Kafka topic to consume from.
	kc := events.NewKafkaConsumerConfig(&kafka.ConfigMap{
		"group.id":           "consumer-example" + strconv.Itoa(int(time.Now().Unix())),
		"bootstrap.servers":  "localhost:9092",
		"session.timeout.ms": 6000,
		"auto.offset.reset":  "earliest",
	})

	c, err := events.NewKafkaConsumer(
		kc,
		[]string{topic},
		events.HandlerFunc(
			func(ctx context.Context, e events.Event) error {
				log.Printf("consumed event: %s", e.Payload)
				return nil
			}))
	if err != nil {
		log.Panicf("could not create kafka consumer: %v", err)
	}

	c.Run(time.Second)
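
The returned Consumer also exposes Shutdown (see the Consumer interface under Types). A minimal sketch of stopping the consumer on SIGINT, assuming Run hands consumption off to a background goroutine:

	// Illustrative wiring only: block until SIGINT, then shut down gracefully.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt)
	<-sig
	if err := c.Shutdown(context.Background()); err != nil {
		// e.g. events.ErrConsumerAlreadyShutdown or events.ErrShutdownTimeout
		log.Printf("consumer shutdown: %v", err)
	}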

Producer

	errHandler := func(event events.Event, err error) {
		log.Panicf("failed to deliver the event %s: %v", string(event.Payload), err)
	}

	kpc := events.NewKafkaProducerConfig(&kafka.ConfigMap{
		"bootstrap.servers":  "localhost:9092",
		"message.timeout.ms": 1000,
	})
	kpc.WithEventDeliveryErrHandler(errHandler)

	p, err := events.NewKafkaProducer(kpc)
	if err != nil {
		log.Panicf("could not create kafka producer: %v", err)
	}

	// handle failed deliveries
	_ = p.HandleEvents()
	defer p.Shutdown(context.Background())

	e := events.Event{Payload: []byte("Hello, Gophers")}
	if err := p.Send(e, "events-example-topic"); err != nil {
		log.Panicf("error sending the event %s: %v", e.Payload, err)
	}

Usage with OAuth2.0 authentication

In examples/oauth/Makefile you'll find a Makefile with the default Kafka console consumer and producer configured to use OAuth2.0 authentication. You only need to set your OAuth client ID and secret.

Consumer

	tokenSource := oauth.NewTokenSource(
		"FIX_ME_ClientID",
		"FIX_ME_ClientSecret",
		"FIX_ME_TokenURL",
		5*time.Second,
		http.Client{Timeout: 3 * time.Second})

	kafkaConfig := &kafka.ConfigMap{
		"group.id":           "KafkaGroupID",
		"bootstrap.servers":  "KafkaServer",
		"session.timeout.ms": 6000,
		"auto.offset.reset":  "earliest",
	}

	kc := events.NewKafkaConsumerConfig(kafkaConfig)
	kc.WithOAuth(tokenSource)
	// errLogger is an error logger defined elsewhere in the example.
	kc.WithErrFunc(func(err error) { errLogger.Print(err) })

	log.Printf("creating kafka consumer for topic %s...", topic)
	c, err := events.NewKafkaConsumer(
		kc,
		[]string{topic},
		events.HandlerFunc(
			func(ctx context.Context, e events.Event) error {
				log.Printf("consumed event: %s", e.Payload)
				return nil
			}))
	if err != nil {
		panic(fmt.Sprintf("could not create kafka consumer: %v", err))
	}

	c.Run(time.Second)

Producer

	tokenSource := oauth.NewTokenSource(
		"FIX_ME_ClientID",
		"FIX_ME_ClientSecret",
		"FIX_ME_TokenURL",
		5*time.Second,
		http.Client{Timeout: 3 * time.Second})

	// config and errLogger are defined elsewhere in the example; errHandler is
	// the delivery error handler shown in the Producer section above.
	kpc := events.NewKafkaProducerConfig(&kafka.ConfigMap{
		"bootstrap.servers":  config.KafkaServer,
		"message.timeout.ms": 6000,
	})
	kpc.WithEventDeliveryErrHandler(errHandler)
	kpc.WithOAuth(tokenSource)
	kpc.WithErrFunc(func(err error) { errLogger.Print(err) })

	p, err := events.NewKafkaProducer(kpc)
	if err != nil {
		log.Panicf("could not create kafka producer: %v", err)
	}

	_ = p.HandleEvents()
	defer func() { log.Printf("Shutdown: %v", p.Shutdown(context.Background())) }()

	payload := fmt.Sprintf("[%s] Hello, Gophers", time.Now())
	e := events.Event{Payload: []byte(payload)}

	err = p.Send(e, topic)
	if err != nil {
		log.Printf("[ERROR] sending the event %s: %v", e, err)
	}

Development

We provide a docker-compose file to spin up all needed dependencies and run the tests, as well as a .env_local file with the required environment variables.

To spin up a Kafka cluster and connect from your local machine, run:

make compose-kafka
  • Kafka is reachable on localhost:9092
  • ZooKeeper is reachable on localhost:2181

You can run make clean to stop and remove the containers created by docker-compose.

Tests

  • Pass the build tag integration to go test to run the integration tests (see the example invocations below).
  • Use the -short flag to skip slow tests; some tests need to publish and consume messages and therefore sleep for a few seconds.
  • Use .env_compose when running the tests through docker-compose.
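
For illustration, assuming the integration build tag named above, the invocations might look like:

	# run the tests including the integration tests
	go test -tags integration ./...

	# skip the slow tests
	go test -short ./...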

Docker compose

make compose-tests

Local

make test

IDEs

As build tags are used to separate integration tests from unit tests, make sure to set up your IDE to include the files with the integration build tag.

GoLand

To set it up to always use custom build flags:

  • Go to Preferences > Go > Build Tags & Vendoring and fill in Custom tags

  • Then on Run/Debug Configurations set Templates > Go Test to Use all custom build tags

(Screenshots: Open Run/Debug Configurations; Set Run/Debug Configurations.)

Documentation


Constants

const HeaderTrackingID = "X-Tracking-Id"

const OTelTracerName = "github.com/blacklane/go-libs/x/events"

OTelTracerName is the name to be used when getting the OTel tracer. TODO: make a method to create/get a tracer?
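
For illustration, a sketch of fetching the tracer by this name with the standard OpenTelemetry API (setting up go.opentelemetry.io/otel is assumed to happen elsewhere):

	tracer := otel.Tracer(events.OTelTracerName)
	ctx, span := tracer.Start(context.Background(), "consume-event")
	defer span.End()
	_ = ctx // pass ctx on to handlers so the span propagates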

Variables

var ErrConsumerAlreadyShutdown = errors.New("consumer already shutdown")

var ErrProducerIsAlreadyRunning = errors.New("producer is already running")

var ErrProducerNotHandlingEvents = errors.New("producer should be handling events")

var ErrShutdownTimeout = errors.New("shutdown timeout: not all handlers finished, not closing kafka client")

Functions

func NewKafkaConfig added in v0.5.0

func NewKafkaConfig(options ...KafkaConfigOption) *kafka.ConfigMap

NewKafkaConfig creates a kafka config object according to the Confluent documentation: https://docs.confluent.io/platform/current/installation/configuration
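
For illustration, the documented options can be combined like this (the values are placeholders):

	cfg := events.NewKafkaConfig(
		events.WithCommaSeparatedBootstrapServers("localhost:9092"),
		events.WithGroupID("consumer-example"),
		events.WithSessionTimeout(6*time.Second),
		events.WithAutoOffsetReset(events.OffsetResetEarliest),
	)
	kc := events.NewKafkaConsumerConfig(cfg)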

func NewTokenSource

func NewTokenSource(
	clientID,
	clientSecret,
	url string,
	refreshBefore time.Duration,
	httpClient http.Client) oauth2.TokenSource

NewTokenSource returns a simple oauth2.TokenSource implementation. It refreshes the token refreshBefore the token expires.

Types

type Consumer

type Consumer interface {
	Run(timeout time.Duration)
	Shutdown(ctx context.Context) error
}

func NewKafkaConsumer

func NewKafkaConsumer(config *KafkaConsumerConfig, topics []string, handlers ...Handler) (Consumer, error)

NewKafkaConsumer returns a Consumer which will send every message to all handlers and ignore any error returned by them; a middleware should handle the errors. To handle errors, either `kafka.Error` messages or any other error while interacting with Kafka, register an error function (see WithErrFunc) on *KafkaConsumerConfig.
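
For example, a minimal error-logging middleware built from the Middleware and HandlerBuilder types documented below (a sketch; kc, topic and myHandler are placeholders):

	logErrors := events.Middleware(func(next events.Handler) events.Handler {
		return events.HandlerFunc(func(ctx context.Context, e events.Event) error {
			if err := next.Handle(ctx, e); err != nil {
				// the consumer ignores returned errors, so log them here
				log.Printf("handler error: %v", err)
			}
			return nil
		})
	})

	hb := events.HandlerBuilder{}
	hb.AddHandler(myHandler)
	hb.UseMiddleware(logErrors)
	c, err := events.NewKafkaConsumer(kc, []string{topic}, hb.Build()...)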

type Event

type Event struct {
	Headers        Header
	TopicPartition TopicPartition
	Key            []byte
	Payload        []byte
}

type Handler

type Handler interface {
	Handle(context.Context, Event) error
}

type HandlerBuilder

type HandlerBuilder struct {
	// contains filtered or unexported fields
}
Example
m1 := Middleware(func(handler Handler) Handler {
	return HandlerFunc(func(ctx context.Context, e Event) error {
		fmt.Println("middleware 1: before handler")
		err := handler.Handle(ctx, e)
		fmt.Println("middleware 1: after handler")
		return err
	})
})
m2 := Middleware(func(handler Handler) Handler {
	return HandlerFunc(func(ctx context.Context, e Event) error {
		fmt.Println("middleware 2: before handler")
		err := handler.Handle(ctx, e)
		fmt.Println("middleware 2: after handler")
		return err
	})
})
h := HandlerFunc(func(_ context.Context, _ Event) error {
	fmt.Println("handler")
	return nil
})

hb := HandlerBuilder{}
hb.AddHandler(h)
hb.UseMiddleware(m1, m2)

// HandlerBuilder.Build returns a slice as several handlers might be added
handler := hb.Build()[0]

err := handler.Handle(context.Background(), Event{})
if err != nil {
	fmt.Println("handler error: " + err.Error())
}
Output:

middleware 1: before handler
middleware 2: before handler
handler
middleware 2: after handler
middleware 1: after handler
Example (MultipleHandlers)
m1 := Middleware(func(handler Handler) Handler {
	return HandlerFunc(func(ctx context.Context, e Event) error {
		fmt.Println("middleware 1: before handler")
		err := handler.Handle(ctx, e)
		fmt.Println("middleware 1: after handler")
		return err
	})
})
m2 := Middleware(func(handler Handler) Handler {
	return HandlerFunc(func(ctx context.Context, e Event) error {
		fmt.Println("middleware 2: before handler")
		err := handler.Handle(ctx, e)
		fmt.Println("middleware 2: after handler")
		return err
	})
})
h1 := HandlerFunc(func(_ context.Context, _ Event) error {
	fmt.Println("handler 1")
	return nil
})
h2 := HandlerFunc(func(_ context.Context, _ Event) error {
	fmt.Println("handler 2")
	return nil
})

hb := HandlerBuilder{}
hb.AddHandler(h1)
hb.UseMiddleware(m1, m2)
hb.AddHandler(h2)

// HandlerBuilder.Build returns a slice as several handlers might be added
handlers := hb.Build()
handler1 := handlers[0]
handler2 := handlers[1]

err := handler1.Handle(context.Background(), Event{})
if err != nil {
	fmt.Println("handler1 error: " + err.Error())
}

fmt.Print("\n")

err = handler2.Handle(context.Background(), Event{})
if err != nil {
	fmt.Println("handler2 error: " + err.Error())
}
Output:

middleware 1: before handler
middleware 2: before handler
handler 1
middleware 2: after handler
middleware 1: after handler

middleware 1: before handler
middleware 2: before handler
handler 2
middleware 2: after handler
middleware 1: after handler

func (*HandlerBuilder) AddHandler

func (hb *HandlerBuilder) AddHandler(h Handler)

func (HandlerBuilder) Build

func (hb HandlerBuilder) Build() []Handler

func (*HandlerBuilder) UseMiddleware

func (hb *HandlerBuilder) UseMiddleware(m ...Middleware)

type HandlerFunc

type HandlerFunc func(ctx context.Context, e Event) error

func (HandlerFunc) Handle

func (h HandlerFunc) Handle(ctx context.Context, e Event) error

type Header

type Header map[string]string

func (Header) Get

func (h Header) Get(key string) string

func (Header) Keys

func (h Header) Keys() []string

func (Header) Set

func (h Header) Set(key, value string)
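
For illustration, setting the tracking-ID header on an event using the HeaderTrackingID constant documented above (the ID value is a placeholder):

	e := events.Event{
		Headers: events.Header{},
		Payload: []byte("Hello, Gophers"),
	}
	e.Headers.Set(events.HeaderTrackingID, "my-tracking-id")
	log.Printf("tracking id: %s", e.Headers.Get(events.HeaderTrackingID))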

type KafkaConfigOption added in v0.5.0

type KafkaConfigOption func(kafka.ConfigMap)

func WithAutoOffsetReset added in v0.5.0

func WithAutoOffsetReset(offsetReset OffsetReset) KafkaConfigOption

WithAutoOffsetReset specifies what to do when there is no initial offset in Kafka or when the current offset no longer exists on the server (e.g. because that data has been deleted).

func WithBootstrapServers added in v0.5.0

func WithBootstrapServers(servers []string) KafkaConfigOption

WithBootstrapServers sets the list of host/port pairs to use for establishing the initial connection to the Kafka cluster.

func WithCommaSeparatedBootstrapServers added in v0.5.0

func WithCommaSeparatedBootstrapServers(servers string) KafkaConfigOption

WithCommaSeparatedBootstrapServers sets the list of host/port pairs to use for establishing the initial connection to the Kafka cluster.

func WithGroupID added in v0.5.0

func WithGroupID(groupID string) KafkaConfigOption

WithGroupID sets a unique string that identifies the consumer group this consumer belongs to.

func WithKeyValue added in v0.5.0

func WithKeyValue(key string, value interface{}) KafkaConfigOption

func WithLogConnectionClose added in v0.5.0

func WithLogConnectionClose(logClose bool) KafkaConfigOption

func WithSessionTimeout added in v0.5.0

func WithSessionTimeout(timeout time.Duration) KafkaConfigOption

WithSessionTimeout sets the timeout used to detect client failures when using Kafka's group management facility. The client sends periodic heartbeats to indicate its liveness to the broker.

func WithTopicMetadataRefreshInterval added in v0.5.0

func WithTopicMetadataRefreshInterval(interval time.Duration) KafkaConfigOption

type KafkaConsumerConfig

type KafkaConsumerConfig struct {
	// contains filtered or unexported fields
}

KafkaConsumerConfig holds all possible configurations for the kafka consumer. Use NewKafkaConsumerConfig to initialise it. To see the possible configurations, check its WithXXX methods and the *kafkaConfig WithXXX methods as well.

func NewKafkaConsumerConfig

func NewKafkaConsumerConfig(config *kafka.ConfigMap) *KafkaConsumerConfig

NewKafkaConsumerConfig returns an initialised *KafkaConsumerConfig

func (KafkaConsumerConfig) WithErrFunc

func (kc KafkaConsumerConfig) WithErrFunc(errFn func(error))

WithErrFunc sets a function to handle any error beyond producer delivery errors.

func (KafkaConsumerConfig) WithOAuth

func (kc KafkaConsumerConfig) WithOAuth(tokenSource oauth2.TokenSource)

WithOAuth prepares the client to handle OAuth2. It'll set the following Kafka configurations:

sasl.mechanism: OAUTHBEARER
security.protocol: SASL_SSL

It'll override any existing values for sasl.mechanism and security.protocol.

type KafkaProducerConfig

type KafkaProducerConfig struct {
	// contains filtered or unexported fields
}

KafkaProducerConfig holds all possible configurations for the kafka producer. Use NewKafkaProducerConfig to initialise it. To see the possible configurations, check its WithXXX methods and the *kafkaConfig WithXXX methods as well.

func NewKafkaProducerConfig

func NewKafkaProducerConfig(config *kafka.ConfigMap) *KafkaProducerConfig

NewKafkaProducerConfig returns an initialised *KafkaProducerConfig

func (KafkaProducerConfig) WithErrFunc

func (kc KafkaProducerConfig) WithErrFunc(errFn func(error))

WithErrFunc sets a function to handle any error beyond producer delivery errors.

func (*KafkaProducerConfig) WithEventDeliveryErrHandler

func (pc *KafkaProducerConfig) WithEventDeliveryErrHandler(errHandler func(Event, error))

WithEventDeliveryErrHandler registers a delivery error handler to be called whenever a delivery fails.

func (*KafkaProducerConfig) WithFlushTimeout

func (pc *KafkaProducerConfig) WithFlushTimeout(timeout int)

WithFlushTimeout sets the producer Flush timeout.

func (KafkaProducerConfig) WithOAuth

func (kc KafkaProducerConfig) WithOAuth(tokenSource oauth2.TokenSource)

WithOAuth prepares the client to handle OAuth2. It'll set the following Kafka configurations:

sasl.mechanism: OAUTHBEARER
security.protocol: SASL_SSL

It'll override any existing values for sasl.mechanism and security.protocol.

type Middleware

type Middleware func(Handler) Handler

type OffsetReset added in v0.5.0

type OffsetReset string
const (
	// OffsetResetEarliest automatically reset the offset to the earliest offset
	OffsetResetEarliest OffsetReset = "earliest"
	// OffsetResetLatest automatically reset the offset to the latest offset
	OffsetResetLatest OffsetReset = "latest"
	// OffsetResetNone throw exception to the consumer if no previous offset is found for the consumer's group
	OffsetResetNone OffsetReset = "none"
)

type Producer

type Producer interface {
	// Send sends an event to the given topic.
	// Deprecated: use SendCtx instead.
	Send(event Event, topic string) error
	// SendCtx sends an event to the given topic.
	// It also adds the OTel propagation headers and the X-Tracking-Id if not set
	// already.
	SendCtx(ctx context.Context, eventName string, event Event, topic string) error
	// SendWithTrackingID adds the tracking ID to the event's headers and sends
	// it to the given topic.
	// Deprecated: use SendCtx instead.
	SendWithTrackingID(trackingID string, event Event, topic string) error
	// HandleEvents starts listening to the producer's events channel.
	HandleEvents() error
	// Shutdown gracefully shuts down the producer; it respects the context
	// timeout.
	Shutdown(ctx context.Context) error
}
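
Since Send is deprecated, a minimal sketch of sending via SendCtx (the event name and topic values are placeholders):

	e := events.Event{Payload: []byte("Hello, Gophers")}
	err := p.SendCtx(context.Background(), "example-event", e, "events-example-topic")
	if err != nil {
		log.Printf("failed to send event: %v", err)
	}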

func NewKafkaProducer

func NewKafkaProducer(c *KafkaProducerConfig) (Producer, error)

NewKafkaProducer returns a new producer. To handle errors, either `kafka.Error` messages or any other error while interacting with Kafka, register an error function (see WithErrFunc) on *KafkaProducerConfig.

type Token

type Token struct {
	AccessToken      string `json:"access_token"`
	ExpiresIn        int    `json:"expires_in"`
	RefreshExpiresIn int    `json:"refresh_expires_in"`
	RefreshToken     string `json:"refresh_token"`
	TokenType        string `json:"token_type"`
	NotBeforePolicy  int    `json:"not-before-policy"`
	SessionState     string `json:"session_state"`
	Scope            string `json:"scope"`
}

Token represents a JWT token. TODO: ensure it isn't used and remove it.

type TopicPartition added in v0.4.0

type TopicPartition struct {
	Topic     string
	Partition int32
	Offset    int64
}

Directories

Path Synopsis
eventstest Package eventstest is a generated GoMock package.
examples
  oauth Module
  without-auth Module
