streams

package module
v1.0.6
Published: Nov 2, 2022 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Overview

GKES (Go Kafka Event Source) attempts to fill the gaps in the Go/Kafka library ecosystem. It supplies Exactly Once Semantics (EOS), local state stores and incremental consumer rebalancing to Go Kafka consumers, making it a viable alternative to a traditional Kafka Streams application written in Java.

What it is

GKES is a Go/Kafka library tailored towards the development of Event Sourcing applications, providing a high-throughput, low-latency Kafka client framework. Using Kafka transactions, it provides EOS, data integrity and high availability. If you wish to use GKES as a straight Kafka consumer, it will fit the bill as well, though there are plenty of libraries for that, and researching which best fits your use case is time well spent.

GKES is not an all-in-one, do-everything black box. Some elements, in particular the StateStore, have been left without comprehensive implementations.

StateStores

A useful and performant local state store rarely has a flat data structure. If your state store does, there are some convenient implementations provided. However, to achieve optimum performance, you will not only need to write a StateStore implementation, but will also need to understand what the proper data structures are for your use case (trees, heaps, maps, disk-based LSM trees or combinations thereof). You can use the provided github.com/aws/go-kafka-event-source/streams/stores.SimpleStore as a starting point.

Vending State

GKES purposefully does not provide a pre-canned way for exposing StateStore data, other than producing to another Kafka topic. There are as many ways to vend data as there are web applications. Rather than putting effort into inventing yet another one, GKES provides the mechanisms to query StateStores via Interjections. This mechanism can be plugged into whatever request/response mechanism suits your use case (gRPC, RESTful HTTP service...any number of web frameworks already in the Go ecosystem). [TODO: provide a simple http example]
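
In the meantime, here is a minimal, hedged sketch of such an HTTP endpoint. It assumes the Contact/ContactStore types and an initialized eventSource from the EventSource example later on this page, takes the target partition as a query parameter rather than assuming any particular key-to-partition mapping, and uses the standard library (encoding/json, net/http, strconv, time):

var eventSource *streams.EventSource[ContactStore] // assumed to be initialized as in the EventSource example

func contactHandler(w http.ResponseWriter, r *http.Request) {
	id := r.URL.Query().Get("id")
	partition, _ := strconv.Atoi(r.URL.Query().Get("partition"))

	result := make(chan Contact, 1)
	// Interject gives the callback exclusive access to this partition's StateStore while it runs
	err := <-eventSource.Interject(int32(partition), func(ec *streams.EventContext[ContactStore], _ time.Time) streams.ExecutionState {
		if contact, ok := ec.Store().Get(id); ok {
			result <- contact
		}
		close(result)
		return streams.Complete
	})
	if err != nil {
		http.Error(w, err.Error(), http.StatusServiceUnavailable) // e.g. ErrPartitionNotAssigned
		return
	}
	if contact, ok := <-result; ok {
		json.NewEncoder(w).Encode(contact)
		return
	}
	http.NotFound(w, r)
}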

Interjections

For those familiar with the Kafka Streams API, GKES provides for stream `Punctuators`, but we call them `Interjections` (because it sounds cool). Interjections allow you to insert actions into your EventSource at a specified interval per assigned partition via streams.EventSource.ScheduleInterjection, or at any time via streams.EventSource.Interject. This is useful for bookkeeping activities, aggregated metric production or even error handling. Interjections have full access to the StateStore associated with an EventSource and can interact with output topics like any other EventProcessor.
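
For example, a hedged sketch of a periodic bookkeeping Interjection, assuming an initialized eventSource and a store type that exposes a Len() method (both illustrative):

eventSource.ScheduleInterjection(func(ec *streams.EventContext[ContactStore], when time.Time) streams.ExecutionState {
	// runs roughly every 30s (+/- up to 1s of jitter) for each assigned partition
	log.Printf("store holds %d items at %v", ec.Store().Len(), when)
	return streams.Complete
}, 30*time.Second, time.Second)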

Incremental Consumer Rebalancing

One issue that Kafka consumer applications have long suffered from is latency spikes during a consumer rebalance. The cooperative sticky rebalancing introduced by Kafka and implemented by kgo helps resolve this issue. However, once StateStores are thrown into the mix, things get a bit more complicated because initializing the StateStore on a host involves consuming a compacted TopicPartition from start to end. GKES solves this with the IncrementalRebalancer and takes it one step further. The IncrementalRebalancer rebalances consumer partitions in a controlled fashion, minimizing latency spikes and limiting the blast radius of a bad deployment.
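
A minimal sketch of opting in to the IncrementalRebalancer on an EventSource (this is also the default, per DefaultBalanceStrategies); the group, topic and broker values are illustrative:

sourceConfig := streams.EventSourceConfig{
	GroupId:           "MyGroup",
	Topic:             "MyTopic",
	NumPartitions:     10,
	SourceCluster:     streams.SimpleCluster([]string{"127.0.0.1:9092"}),
	BalanceStrategies: []streams.BalanceStrategy{streams.IncrementalBalanceStrategy},
}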

Async Processing

GKES provides conventions for asynchronously processing events on the same Kafka partition while still maintaining data/stream integrity. The AsyncBatcher and AsyncJobScheduler allow you to split a TopicPartition into sub-streams by key, ensuring all events for a particular key are processed in order while allowing for parallel processing across keys on a given TopicPartition.

For more details, see Async Processing Examples

High-Throughput/Low-Latency EOS

A Kafka transaction is a powerful tool which allows for Exactly Once Semantics (EOS) by linking a consumer offset commit to one or more records that are being produced by your application (a StateStore record for example). The history of Kafka EOS is a long and complicated one, with varying degrees of performance and efficiency.

Early iterations required one producer transaction per consumer partition, which was very inefficient: a Topic with 1000 partitions would also require 1000 clients in order to provide EOS. This has since been addressed, but depending on client implementations, there is a high risk of running into "producer fenced" errors as well as reduced throughput.

In a traditional Java Kafka Streams application, transactions are committed according to the auto-commit frequency, which defaults to 100ms. This means that your application will only produce readable records every 100ms per partition. The effect of this is that no matter what you do, your tail latency will be at least 100ms and downstream consumers will receive records in bursts rather than a steady stream. For many use cases, this is unacceptable.

GKES solves this issue by using a configurable transactional producer pool and a type of "Nagle's algorithm". Uncommitted offsets are added to the transaction pool in sequence. Once a producer has reached its record limit, or enough time has elapsed (10ms by default), the head transaction will wait for any incomplete events to finish, then flush and commit. While this transaction is committing, GKES continues to process events and optimistically begins a new transaction and produces records on the next producer in the pool. Since transactions are produced in sequence, there is no danger of commit offset overlap or duplicate message processing in the case of a failure.

To ensure EOS, your EventSource must use either the IncrementalRebalancer, or kgo's cooperative sticky implementation. If you're using a StateStore, the IncrementalRebalancer should be used to avoid lengthy periods of inactivity during application deployments.
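
A hedged sketch of tuning the transactional producer pool described above via EosConfig (documented later on this page); the values shown mirror DefaultEosConfig and are illustrative only:

sourceConfig := streams.EventSourceConfig{
	GroupId:       "MyGroup",
	Topic:         "MyTopic",
	SourceCluster: streams.SimpleCluster([]string{"127.0.0.1:9092"}),
	EosConfig: streams.EosConfig{
		PoolSize:        3,                     // transactional producers in the pool
		PendingTxnCount: 1,                     // transactions allowed to be pending commit
		TargetBatchSize: 1000,                  // attempt a commit once this many events/records accumulate
		MaxBatchSize:    10000,                 // stop accepting new events beyond this
		BatchDelay:      10 * time.Millisecond, // the "Nagle" delay; tail latency under low load
	},
}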

Kafka Client Library

Rather than create yet another Kafka driver, GKES is built on top of kgo. This Kafka client was chosen as it (in our testing) has superior throughput and latency profiles compared to other client libraries currently available to Go developers.

One other key advantage is that it provides a migration path to cooperative consumer rebalancing, required for our EOS implementation. Other Go Kafka libraries provide cooperative rebalancing, but do not allow you to migrate from a non-cooperative rebalancing strategy (range, sticky etc.). This is a major roadblock for existing deployments, as the only migration paths are an entirely new consumer group, or bringing your application completely down and re-deploying with a new rebalance strategy. These migration plans, to put it mildly, are a big challenge for zero-downtime/live applications. The kgo package now makes this migration possible with zero downtime.

Kgo also has the proper hooks needed to implement the IncrementalGroupRebalancer, which is necessary for safe deployments when using a local state store. Kudos to kgo!

Index

Examples

Constants

const AutoAssign = int32(-1)
const DefaultBatchDelay = 10 * time.Millisecond
const DefaultMaxBatchSize = 10000
const DefaultPendingTxnCount = 1
const DefaultPoolSize = 3
const DefaultTargetBatchSize = 1000
const IncrementalCoopProtocol = "incr_coop"
const LexInt64Size = int(unsafe.Sizeof(uint64(1))) + 1
const PartitionPreppedOperation = "PartitionPrepped"
const RecordTypeHeaderKey = "__grt__" // let's keep it small. every byte counts

The record.Header key that GKES uses to transmit type information about an IncomingRecord or a ChangeLogEntry.

const TxnCommitOperation = "TxnCommit"

Variables

var ComputeConfig = SchedulerConfig{
	Concurrency:       runtime.NumCPU(),
	WorkerQueueDepth:  1000,
	MaxConcurrentKeys: 10000,
}

var DefaultBalanceStrategies = []BalanceStrategy{IncrementalBalanceStrategy}

var DefaultConfig = SchedulerConfig{
	Concurrency:       runtime.NumCPU(),
	WorkerQueueDepth:  1000,
	MaxConcurrentKeys: 10000,
}

var DefaultEosConfig = EosConfig{
	PoolSize:        DefaultPoolSize,
	PendingTxnCount: DefaultPendingTxnCount,
	TargetBatchSize: DefaultTargetBatchSize,
	MaxBatchSize:    DefaultMaxBatchSize,
	BatchDelay:      DefaultBatchDelay,
}

var ErrPartitionNotAssigned = errors.New("partition is not assigned")

var ErrPartitionNotReady = errors.New("partition is not ready")

var FastNetworkConfig = SchedulerConfig{
	Concurrency:       runtime.NumCPU() * 4,
	WorkerQueueDepth:  100,
	MaxConcurrentKeys: 10000,
}
var Int32Codec = intCodec[int32]{}

Convenience codec for working with int32 types. Will never induce an error unless there is an OOM condition, so errors are safe to ignore on Encode/Decode.

var Int64Codec = intCodec[int64]{}

Convenience codec for working with int64 types. Will never induce an error unless there is an OOM condition, so errors are safe to ignore on Encode/Decode.

var IntCodec = intCodec[int]{}

Convenience codec for working with int types. Will never induce an error unless there is an OOM condition, so errors are safe to ignore on Encode/Decode.

var LexoInt64Codec = lexoInt64Codec{}

A convenience Codec for integers where the encoded value is suitable for sorting in data structures which use []byte as keys (such as an LSM-based db like BadgerDB or RocksDB). Useful if you need to persist items in order by timestamp or some other integer value. Decode will generate an error if the input []byte size is not LexInt64Size.
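
A small sketch of using LexoInt64Codec to build byte keys that sort in numeric order, assuming it satisfies Codec[int64] as described above:

var buf bytes.Buffer
// errors only occur under OOM-like conditions, so they are safe to ignore here
streams.LexoInt64Codec.Encode(&buf, time.Now().UnixMilli())
key := buf.Bytes() // len(key) == LexInt64Size; bytes.Compare ordering matches numeric ordering
timestamp, err := streams.LexoInt64Codec.Decode(key)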

var SlowNetworkConfig = SchedulerConfig{
	Concurrency:       runtime.NumCPU() * 16,
	WorkerQueueDepth:  100,
	MaxConcurrentKeys: 10000,
}
var WideNetworkConfig = SchedulerConfig{
	Concurrency:       runtime.NumCPU() * 32,
	WorkerQueueDepth:  1000,
	MaxConcurrentKeys: 10000,
}

Functions

func DeleteSource

func DeleteSource(sourceConfig EventSourceConfig) error

Deletes all topics associated with a Source. Provided for local testing purposes only. Do not call this in deployed applications unless your topics are transient in nature.

func JsonItemDecoder

func JsonItemDecoder[T any](record IncomingRecord) (T, error)

A convenience function for decoding an IncomingRecord. Conforms to the streams.IncomingRecordDecoder interface needed for streams.RegisterEventType.

streams.RegisterEventType(myEventSource, codec.JsonItemDecoder[myType], myHandler, "myType")
// or standalone
myDecoder := codec.JsonItemDecoder[myType]
myItem := myDecoder(incomingRecord)

func NewClient

func NewClient(cluster Cluster, options ...kgo.Opt) (*kgo.Client, error)

NewClient creates a kgo.Client from the options returned by the provided Cluster and additional `options`. Used internally and exposed for convenience.

func RegisterDefaultHandler added in v1.0.4

func RegisterDefaultHandler[T StateStore](es *EventSource[T], recordProcessor EventProcessor[T, IncomingRecord], eventType string)

A convenience method to avoid chicken-and-egg scenarios when initializing an EventSource. Must not be called after `EventSource.ConsumeEvents()`.
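
A hedged sketch of registering a catch-all handler; ContactStore and eventSource come from the examples below, and the final eventType argument simply follows the signature above:

func handleUnknown(ec *streams.EventContext[ContactStore], record streams.IncomingRecord) streams.ExecutionState {
	log.Printf("unhandled record type %q at offset %d", record.RecordType(), record.Offset())
	return streams.Complete
}

streams.RegisterDefaultHandler(eventSource, handleUnknown, "default")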

func RegisterEventType

func RegisterEventType[T StateStore, V any](es *EventSource[T], transformer IncomingRecordDecoder[V], eventProcessor EventProcessor[T, V], eventType string)

Registers eventType with a transformer (usually a codec.Codec) and the supplied EventProcessor. Must not be called after `EventSource.ConsumeEvents()`.

func SetRecordType

func SetRecordType(r *kgo.Record, recordType string)

A convenience function provided in case you are working with a raw kgo producer and want to integrate with streams. This ensures that the EventSource routes the record to the proper handler without falling back to the defaultHandler.
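
A small sketch of tagging a raw kgo.Record so an EventSource can route it; the topic, type name, payload and kgo client are illustrative:

record := &kgo.Record{Topic: "ExampleEventSource", Key: []byte("123"), Value: payload}
streams.SetRecordType(record, "CreateContact")
client.Produce(context.Background(), record, func(r *kgo.Record, err error) {
	// handle any produce error here
})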

Types

type AsyncBatcher

type AsyncBatcher[S any, K comparable, V any] struct {
	// contains filtered or unexported fields
}

AsyncBatcher performs a similar function to the AsyncJobScheduler, but is intended for performing actions for multiple events at a time. This is particularly useful when interacting with systems which provide a batch API.

For detailed examples, see https://github.com/aws/go-kafka-event-source/docs/asynprocessing.md

func NewAsyncBatcher

func NewAsyncBatcher[S StateStore, K comparable, V any](executor BatchExecutor[K, V], maxBatchSize, maxConcurrentBatches int, delay time.Duration) *AsyncBatcher[S, K, V]

Creates a new AsyncBatcher. Each invocation of `executor` will have a maximum of `maxBatchSize` items. No more than `maxConcurrentBatches` will be executing at any given time. The AsyncBatcher will accumulate items until `delay` has elapsed, or `maxBatchSize` items have been received.

func (*AsyncBatcher[S, K, V]) Add

func (ab *AsyncBatcher[S, K, V]) Add(batch *BatchItems[S, K, V]) ExecutionState

Schedules the items in BatchItems to be executed when capacity is available.
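
A hedged sketch of batching external writes with the AsyncBatcher; Contact/ContactStore come from the examples below, and saveToExternalSystem is a hypothetical client call:

var contactBatcher *streams.AsyncBatcher[ContactStore, string, Contact]

// the BatchExecutor: write the batch to the external system, recording per-item failures on Err
func saveContactsBatch(batch []*streams.BatchItem[string, Contact]) {
	for _, item := range batch {
		item.Err = saveToExternalSystem(item.Value) // hypothetical external call
	}
}

// the BatchCallback: runs once every item in the BatchItems container has been executed
func onBatchComplete(ec *streams.EventContext[ContactStore], items *streams.BatchItems[ContactStore, string, Contact]) streams.ExecutionState {
	for _, item := range items.Items() {
		if item.Err != nil {
			return ec.Fail(item.Err)
		}
	}
	return streams.Complete
}

func createContactBatched(ec *streams.EventContext[ContactStore], contact Contact) streams.ExecutionState {
	return contactBatcher.Add(streams.NewBatchItems(ec, contact.Id, onBatchComplete).Add(contact))
}

// contactBatcher = streams.NewAsyncBatcher[ContactStore, string, Contact](saveContactsBatch, 100, 4, 5*time.Millisecond)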

type AsyncCompleter added in v1.0.4

type AsyncCompleter[T any] interface {
	AsyncComplete(AsyncJob[T])
}

type AsyncJob added in v1.0.4

type AsyncJob[T any] struct {
	// contains filtered or unexported fields
}

func (AsyncJob[T]) Finalize added in v1.0.4

func (aj AsyncJob[T]) Finalize() ExecutionState

type AsyncJobFinalizer

type AsyncJobFinalizer[T any, K comparable, V any] func(*EventContext[T], K, V, error) ExecutionState

A callback invoked when a previously scheduled AsyncJob has been completed.

type AsyncJobProcessor

type AsyncJobProcessor[K comparable, V any] func(K, V) error

A handler invoked when a previously scheduled AsyncJob should be performed.

type AsyncJobScheduler

type AsyncJobScheduler[S StateStore, K comparable, V any] struct {
	// contains filtered or unexported fields
}

The AsyncJobScheduler provides a generic work scheduler/job serializer which takes a key/value as input via Schedule. All work is organized into queues by 'key'. So for a given key, all work is serial, allowing the use of the single writer principle in an asynchronous fashion. In practice, it divides a stream partition into its individual keys and processes the keys in parallel.

After scheduling is complete for a key/value, the Scheduler will call the `processor` callback defined at initialization. The output of this call will be passed to the `finalizer` callback. If `finalizer` is nil, the event is marked as `Complete` once the job is finished, ignoring any errors.

For detailed examples, see https://github.com/aws/go-kafka-event-source/docs/asynprocessing.md

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/aws/go-kafka-event-source/streams"
	"github.com/aws/go-kafka-event-source/streams/sak"
	"github.com/aws/go-kafka-event-source/streams/stores"
)

type Contact struct {
	Id          string
	PhoneNumber string
	Email       string
	FirstName   string
	LastName    string
	LastContact time.Time
}

type NotifyContactEvent struct {
	ContactId        string
	NotificationType string
}

type EmailNotification struct {
	ContactId string
	Address   string
	Payload   string
}

func (c Contact) Key() string {
	return c.Id
}

func createContact(ctx *streams.EventContext[ContactStore], contact Contact) streams.ExecutionState {
	contactStore := ctx.Store()
	ctx.RecordChange(contactStore.Put(contact))
	fmt.Printf("Created contact: %s\n", contact.Id)
	return streams.Complete
}

// simply providing an example of how you might wrap the store into your own type
type ContactStore struct {
	*stores.SimpleStore[Contact]
}

func NewContactStore(tp streams.TopicPartition) ContactStore {
	return ContactStore{stores.NewJsonSimpleStore[Contact](tp)}
}

var notificationScheduler *streams.AsyncJobScheduler[ContactStore, string, EmailNotification]

func notifyContactAsync(ctx *streams.EventContext[ContactStore], notification NotifyContactEvent) streams.ExecutionState {
	contactStore := ctx.Store()
	if contact, ok := contactStore.Get(notification.ContactId); ok {
		fmt.Printf("Notifying contact: %s asynchronously by %s\n", contact.Id, notification.NotificationType)
		return notificationScheduler.Schedule(ctx, contact.Email, EmailNotification{
			ContactId: contact.Id,
			Address:   contact.Email,
			Payload:   "sending you mail...from a computer!",
		})
	} else {
		fmt.Printf("Contact %s does not exist!\n", notification.ContactId)
	}
	return streams.Complete
}

func sendEmailToContact(key string, notification EmailNotification) error {

	fmt.Printf("Processing an email job with key: '%s'. This may take some time, emails are tricky!\n", key)
	time.Sleep(500 * time.Millisecond)
	return nil
}

func emailToContactComplete(ctx *streams.EventContext[ContactStore], _ string, email EmailNotification, err error) streams.ExecutionState {

	contactStore := ctx.Store()
	if contact, ok := contactStore.Get(email.ContactId); ok {
		fmt.Printf("Notified contact: %s, address: %s, payload: '%s'\n", contact.Id, email.Address, email.Payload)
		contact.LastContact = time.Now()
		contactStore.Put(contact)
	}
	return streams.Complete
}

func main() {
	streams.InitLogger(streams.SimpleLogger(streams.LogLevelError), streams.LogLevelError)

	contactsCluster := streams.SimpleCluster([]string{"127.0.0.1:9092"})
	sourceConfig := streams.EventSourceConfig{
		GroupId:       "ExampleAsyncJobSchedulerGroup",
		Topic:         "ExampleAsyncJobScheduler",
		NumPartitions: 10,
		SourceCluster: contactsCluster,
	}

	destination := streams.Destination{
		Cluster:      sourceConfig.SourceCluster,
		DefaultTopic: sourceConfig.Topic,
	}

	eventSource := sak.Must(streams.NewEventSource(sourceConfig, NewContactStore, nil))

	streams.RegisterEventType(eventSource, streams.JsonItemDecoder[Contact], createContact, "CreateContact")
	streams.RegisterEventType(eventSource, streams.JsonItemDecoder[NotifyContactEvent], notifyContactAsync, "NotifyContact")

	notificationScheduler = sak.Must(streams.CreateAsyncJobScheduler(eventSource,
		sendEmailToContact, emailToContactComplete, streams.DefaultConfig))

	eventSource.ConsumeEvents()

	contact := Contact{
		Id:          "123",
		Email:       "billy@bob.com",
		PhoneNumber: "+18005551212",
		FirstName:   "Billy",
		LastName:    "Bob",
	}

	notification := NotifyContactEvent{
		ContactId:        "123",
		NotificationType: "email",
	}

	producer := streams.NewProducer(destination)

	createContactRecord := streams.JsonItemEncoder("CreateContact", contact)
	createContactRecord.WriteKeyString(contact.Id)

	notificationRecord := streams.JsonItemEncoder("NotifyContact", notification)
	notificationRecord.WriteKeyString(notification.ContactId)

	producer.Produce(context.Background(), createContactRecord)
	producer.Produce(context.Background(), notificationRecord)

	eventSource.WaitForSignals(nil)
	// Expected Output: Created contact: 123
	// Notifying contact: 123 asynchronously by email
	// Processing an email job with key: 'billy@bob.com'. This may take some time, emails are tricky!
	// Notified contact: 123, address: billy@bob.com, payload: 'sending you mail...from a computer!'
}
Output:

func CreateAsyncJobScheduler

func CreateAsyncJobScheduler[S StateStore, K comparable, V any](
	eventSource *EventSource[S],
	processor AsyncJobProcessor[K, V],
	finalizer AsyncJobFinalizer[S, K, V],
	config SchedulerConfig) (*AsyncJobScheduler[S, K, V], error)

Creates an AsyncJobScheduler which is tied to the RunStatus of EventSource.

func NewAsyncJobScheduler added in v1.0.4

func NewAsyncJobScheduler[S StateStore, K comparable, V any](
	runStatus sak.RunStatus,
	processor AsyncJobProcessor[K, V],
	finalizer AsyncJobFinalizer[S, K, V],
	config SchedulerConfig) (*AsyncJobScheduler[S, K, V], error)

Creates an AsyncJobScheduler which will continue to run while runStatus.Running()

func (*AsyncJobScheduler[S, K, V]) Schedule

func (ap *AsyncJobScheduler[S, K, V]) Schedule(ec *EventContext[S], key K, value V) ExecutionState

Schedules the value for processing in order by key. The finalizer will be invoked once processing is complete.

func (*AsyncJobScheduler[S, K, V]) SetMaxConcurrentKeys

func (ap *AsyncJobScheduler[S, K, V]) SetMaxConcurrentKeys(size int)

Dynamically update the MaxConcurrentKeys for the current scheduler.

func (*AsyncJobScheduler[S, K, V]) SetWorkerQueueDepth

func (ap *AsyncJobScheduler[S, K, V]) SetWorkerQueueDepth(size int)

type BalanceStrategy

type BalanceStrategy int
const (
	RangeBalanceStrategy             BalanceStrategy = 0
	RoundRobinBalanceStrategy        BalanceStrategy = 1
	CooperativeStickyBalanceStrategy BalanceStrategy = 2
	IncrementalBalanceStrategy       BalanceStrategy = 3
)

type BatchCallback

type BatchCallback[S any, K comparable, V any] func(*EventContext[S], *BatchItems[S, K, V]) ExecutionState

type BatchExecutor

type BatchExecutor[K comparable, V any] func(batch []*BatchItem[K, V])

type BatchItem

type BatchItem[K comparable, V any] struct {
	Value    V
	Err      error
	UserData any
	// contains filtered or unexported fields
}

func (BatchItem[K, V]) Key

func (bi BatchItem[K, V]) Key() K

type BatchItems added in v1.0.4

type BatchItems[S any, K comparable, V any] struct {
	UserData any
	// contains filtered or unexported fields
}

func NewBatchItems added in v1.0.4

func NewBatchItems[S any, K comparable, V any](ec *EventContext[S], key K, cb BatchCallback[S, K, V]) *BatchItems[S, K, V]

Creates a container for BatchItems and ties them to an EventContext. Once all items in BatchItems.Items() have been executed, the provided BatchCallback will be executed.

func (*BatchItems[S, K, V]) Add added in v1.0.4

func (b *BatchItems[S, K, V]) Add(values ...V) *BatchItems[S, K, V]

Adds items to BatchItems container. Values added in this method will inherit their key from the BatchItems container.

func (*BatchItems[S, K, V]) AddWithKey added in v1.0.4

func (b *BatchItems[S, K, V]) AddWithKey(key K, values ...V) *BatchItems[S, K, V]

AddWithKey() is similar to Add(), but the items added do not inherit their key from the BatchItems. Useful for interjectors that may need to batch items that belong to multiple keys.

func (*BatchItems[S, K, V]) Items added in v1.0.4

func (b *BatchItems[S, K, V]) Items() []BatchItem[K, V]

func (*BatchItems[S, K, V]) Key added in v1.0.4

func (b *BatchItems[S, K, V]) Key() K

type BatchProducer

type BatchProducer[S any] struct {
	// contains filtered or unexported fields
}

func NewBatchProducer

func NewBatchProducer[S any](destination Destination, opts ...kgo.Opt) *BatchProducer[S]

Provides similar functionality to AsyncBatcher, but in the context of producing Kafka records. Since the underlying Kafka producer already batches in an ordered fashion, there is no need to add the overhead of the AsyncBatcher. Records produced by a BatchProducer are not transactional, and therefore duplicates could be created. The use cases for the BatchProducer vs EventContext.Forward are as follows:

- The topic you are producing to is not on the same Kafka cluster as your EventSource

- Duplicates are OK and you do not want to wait for the transaction to complete before the consumers of these records can see the data (lower latency)

If your use case does not fall into the above buckets, it is recommended to just use [EventContext.Forward].
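
A hedged sketch of the first use case, producing to a different cluster than the EventSource; Contact/ContactStore come from the examples below, and the analytics cluster and topic are illustrative:

var analyticsProducer = streams.NewBatchProducer[ContactStore](streams.Destination{
	Cluster:      streams.SimpleCluster([]string{"analytics-broker:9092"}),
	DefaultTopic: "contact-events",
})

func onAnalyticsProduced(ec *streams.EventContext[ContactStore], records []*streams.Record, _ any) streams.ExecutionState {
	for _, r := range records {
		if r.Error() != nil {
			return ec.Fail(r.Error())
		}
	}
	return streams.Complete
}

func publishContact(ec *streams.EventContext[ContactStore], contact Contact) streams.ExecutionState {
	record := streams.JsonItemEncoder("ContactUpdated", contact)
	record.WriteKeyString(contact.Id)
	return analyticsProducer.Produce(ec, []*streams.Record{record}, onAnalyticsProduced, nil)
}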

func (*BatchProducer[S]) Produce

func (p *BatchProducer[S]) Produce(ec *EventContext[S], records []*Record, cb BatchProducerCallback[S], userData any) ExecutionState

Produces `records` and invokes BatchProducerCallback once all records have been produced or have errored out. If there was an error in producing, it can be retrieved with record.Error()

It is important to note that GKES uses a Record pool. After the transaction has completed for this record, it is returned to the pool for reuse. Your application should not hold on to references to the Record(s) after BatchProducerCallback has been invoked.

type BatchProducerCallback

type BatchProducerCallback[S any] func(eventContext *EventContext[S], records []*Record, userData any) ExecutionState

type ChangeLogEntry

type ChangeLogEntry struct {
	// contains filtered or unexported fields
}

ChangeLogEntry represents a Kafka record which will be produced to the StateStore for your EventSource. Note that you cannot set a topic or partition on a ChangeLogEntry. These values are managed by GKES.

func CreateChangeLogEntry

func CreateChangeLogEntry[T any](item T, codec Codec[T]) (ChangeLogEntry, error)

A shortcut method for creating a ChangeLogEntry with a value encoded using the provided codec.

cle := CreateChangeLogEntry(myValue, myCodec).WithKeyString(myKey).WithEntryType(myType)
eventContext.RecordChange(cle)

func CreateJsonChangeLogEntry

func CreateJsonChangeLogEntry[T any](item T) (ChangeLogEntry, error)

A shortcut method for creating a ChangeLogEntry with a JSON-encoded value.

cle := CreateJsonChangeLogEntry(myValue).WithKeyString(myKey).WithEntryType(myType)
eventContext.RecordChange(cle)

func EncodeJsonChangeLogEntryValue added in v1.0.4

func EncodeJsonChangeLogEntryValue[T any](entryType string, item T) ChangeLogEntry

A convenience function for encoding an item into a ChangeLogEntry suitable for writing to a StateStore. Please note that the Key on the entry will be left uninitialized. Usage:

entry := codec.EncodeJsonChangeLogEntryValue("myType", myItem)
entry.WriteKeyString(myItem.Key)

func NewChangeLogEntry

func NewChangeLogEntry() ChangeLogEntry

func (ChangeLogEntry) KeyWriter

func (cle ChangeLogEntry) KeyWriter() *bytes.Buffer

func (ChangeLogEntry) ValueWriter

func (cle ChangeLogEntry) ValueWriter() *bytes.Buffer

func (ChangeLogEntry) WithEntryType

func (cle ChangeLogEntry) WithEntryType(entryType string) ChangeLogEntry

func (ChangeLogEntry) WithHeader added in v1.0.5

func (cle ChangeLogEntry) WithHeader(key string, value []byte) ChangeLogEntry

func (ChangeLogEntry) WithKey

func (cle ChangeLogEntry) WithKey(key ...[]byte) ChangeLogEntry

func (ChangeLogEntry) WithKeyString

func (cle ChangeLogEntry) WithKeyString(key ...string) ChangeLogEntry

func (ChangeLogEntry) WithValue

func (cle ChangeLogEntry) WithValue(value ...[]byte) ChangeLogEntry

func (ChangeLogEntry) WriteKey

func (cle ChangeLogEntry) WriteKey(bs ...[]byte)

func (ChangeLogEntry) WriteKeyString

func (cle ChangeLogEntry) WriteKeyString(ss ...string)

func (ChangeLogEntry) WriteValue

func (cle ChangeLogEntry) WriteValue(bs ...[]byte)

func (ChangeLogEntry) WriteValueString

func (cle ChangeLogEntry) WriteValueString(ss ...string)

type ChangeLogReceiver

type ChangeLogReceiver interface {
	ReceiveChange(IncomingRecord) error
}

type CleanupPolicy

type CleanupPolicy int
const (
	CompactCleanupPolicy CleanupPolicy = iota
	DeleteCleanupPolicy
)

type Cluster

type Cluster interface {
	// Returns the list of kgo.Opt(s) that will be used whenever a connection is made to this cluster.
	// At minimum, it should return the kgo.SeedBrokers() option.
	Config() ([]kgo.Opt, error)
}

An interface for implementing a reusable Kafka client configuration. TODO: document reserved options
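
A minimal sketch of a reusable Cluster implementation for brokers behind TLS (imports crypto/tls and kgo); for plaintext local development, streams.SimpleCluster, as used in the examples, is sufficient:

type tlsCluster struct {
	brokers []string
}

func (c tlsCluster) Config() ([]kgo.Opt, error) {
	return []kgo.Opt{
		kgo.SeedBrokers(c.brokers...),
		kgo.DialTLSConfig(&tls.Config{MinVersion: tls.VersionTLS12}),
	}, nil
}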

type Codec

type Codec[T any] interface {
	Encode(*bytes.Buffer, T) error
	Decode([]byte) (T, error)
}
var ByteCodec Codec[[]byte] = byteCodec{}

Convenience codec for working with raw `[]byte`s

var StringCodec Codec[string] = stringCodec{}

Convenience codec for working with strings.
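
A minimal sketch of a custom Codec, JSON-encoding the Contact type from the examples below:

type contactCodec struct{}

func (contactCodec) Encode(buf *bytes.Buffer, c Contact) error {
	return json.NewEncoder(buf).Encode(c)
}

func (contactCodec) Decode(b []byte) (Contact, error) {
	var c Contact
	err := json.Unmarshal(b, &c)
	return c, err
}

var ContactCodec streams.Codec[Contact] = contactCodec{}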

type DeserializationErrorHandler

type DeserializationErrorHandler func(ec ErrorContext, eventType string, err error) ErrorResponse

type Destination

type Destination struct {
	// The topic to use for records being produced which have empty topic data
	DefaultTopic string
	// Optional, used in CreateDestination call.
	NumPartitions int
	// Optional, used in CreateDestination call.
	ReplicationFactor int
	// Optional, used in CreateDestination call.
	MinInSync int
	// The Kafka cluster where this destination resides.
	Cluster Cluster
}

func CreateDestination

func CreateDestination(destination Destination) (resolved Destination, err error)

type EosConfig

type EosConfig struct {
	// PoolSize is the number of transactional producer clients in the pool.
	PoolSize int
	// The maximum number of pending transactions to be allowed in the pool at any given point in time.
	PendingTxnCount int
	// TargetBatchSize is the target number of events or records (whichever is greater) for a transaction before a commit is attempted.
	TargetBatchSize int
	// MaxBatchSize is the maximum number of events or records (whichever is greater) for a transaction before it will stop accepting new events.
	// Once a transaction reaches MaxBatchSize, it must be committed.
	MaxBatchSize int
	// The maximum amount of time to wait before committing a transaction. Once this time has elapsed, the transaction will commit
	// even if TargetBatchSize has not been achieved. This number will be the tail latency of the consume/produce cycle during periods of low activity.
	// Under high load, this setting has little impact unless set too low. If this value is too low, produce batch sizes will be extremely small
	// and Kafka will need to manage an excessive number of transactions.
	// The recommended value is 10ms and the minimum allowed value is 1ms.
	BatchDelay time.Duration
}

EOS Diagram

 On-Deck Txn            Pending Txn Channel          Commit Go-Routine
┌───────────┐           ┌─────────────────┐          ┌─────────────────────────────────────┐
│ EventCtx  │           │  Pending Txn(s) │          │  Committing Txn                     │
│ Offset: 7 │           │  ┌───────────┐  │          │  ┌───────────┐                      │
├───────────┤           │  │ EventCtx  │  │          │  │ EventCtx  │  1: Receive Txn      │
│ EventCtx  │           │  │ Offset: 4 │  │          │  │ Offset: 1 │                      │
│ Offset: 8 │           │  ├───────────┤  │          │  ├───────────┤  2: EventCtx(s).Wait │
├───────────┼──────────►│  │ EventCtx  │  ├─────────►│  │ EventCtx  │                      │
│ EventCtx  │           │  │ Offset: 5 │  │          │  │ Offset: 2 │  3: Flush Records    │
│ Offset: 9 │           │  ├───────────┤  │          │  ├───────────┤                      │
└───────────┘           │  │ EventCtx  │  │          │  │ EventCtx  │  4: Commit           │
      ▲                 │  │ Offset: 6 │  │          │  │ Offset: 3 │                      │
      │                 │  └───────────┘  │          │  └───────────┘                      │
      │                 └─────────────────┘          └─────────────────────────────────────┘
Incoming EventCtx

func (EosConfig) IsZero

func (cfg EosConfig) IsZero() bool

IsZero returns true if EosConfig is uninitialized, or all values equal zero. Used to determine whether the EventSource should fall back to DefaultEosConfig.

type ErrorContext

type ErrorContext interface {
	TopicPartition() TopicPartition
	Offset() int64
	Input() (IncomingRecord, bool)
}

type ErrorResponse

type ErrorResponse int

Instructs GKES on how to proceed when an error is encountered.

const (
	// Instructs GKES to ignore any error state and continue processing as normal. If this is used in response to a
	// Kafka transaction error, there will likely be data loss or corruption. This ErrorResponse is not recommended as it is unlikely that
	// a consumer will be able to recover gracefully from a transaction error. In almost all situations, FailConsumer is preferred.
	Continue ErrorResponse = iota

	// Instructs GKES to immediately stop processing and the consumer to immediately leave the group.
	// This is preferable to a FatallyExit as Kafka will immediately recognize the consumer as exiting the group
	// (if there is still communication with the cluster) and processing of the
	// failed partitions will begin without waiting for the session timeout value.
	FailConsumer

	// As the name implies, the application will fatally exit. The partitions owned by this consumer will not be reassigned until the configured
	// session timeout on the broker elapses.
	FatallyExit
)

func DefaultDeserializationErrorHandler

func DefaultDeserializationErrorHandler(ec ErrorContext, eventType string, err error) ErrorResponse

The default DeserializationErrorHandler. Simply logs the error and returns Continue.

func DefaultTxnErrorHandler

func DefaultTxnErrorHandler(err error) ErrorResponse

The default and recommended TxnErrorHandler. Returns FailConsumer on txn errors.

type EventContext

type EventContext[T any] struct {
	// contains filtered or unexported fields
}

Contains information about the current event. It is passed to EventProcessors and Interjections.

func MockEventContext added in v1.0.4

func MockEventContext[T any](ctx context.Context, input *Record, stateStoreTopc string, store T, asyncCompleter AsyncCompleter[T], producer EventContextProducer[T]) *EventContext[T]

A convenience function for creating unit tests for an EventContext from an incoming Kafka Record. All arguments other than `ctx` are optional unless you are interacting with those resources. For example, if you call EventContext.Forward/RecordChange, you will need to provide a mock producer. If you run the EventContext through an async process, you will need to provide a mock AsyncCompleter.

func TestMyHandler(t *testing.T) {
	eventContext := streams.MockEventContext(context.TODO(), mockRecord(), "storeTopic", mockStore(), mockCompleter(), mockProducer())
	if err := testMyHandler(eventContext, eventContext.Input()); {
		t.Error(err)
	}
}

func MockInterjectionEventContext added in v1.0.4

func MockInterjectionEventContext[T any](ctx context.Context, topicPartition TopicPartition, stateStoreTopc string, store T, asyncCompleter AsyncCompleter[T], producer EventContextProducer[T]) *EventContext[T]

A convenience function for creating unit tests for an EventContext from an interjection. All arguments other than `ctx` are optional unless you are interacting with those resources. For example, if you call EventContext.Forward/RecordChange, you will need to provide a mock producer. If you run the EventContext through an async process, you will need to provide a mock AsyncCompleter.

func TestMyInterjector(t *testing.T) {
	eventContext := streams.MockInterjectionEventContext(context.TODO(), myTopicPartition, "storeTopic", mockStore(), mockCompleter(), mockProducer())
	if err := testMyInterjector(eventContext, time.Now()); {
		t.Error(err)
	}
}

func (*EventContext[T]) AsyncJobComplete

func (ec *EventContext[T]) AsyncJobComplete(finalize func() ExecutionState)

AsyncJobComplete should be called when an async event processor has performed its function. The finalize function should return Complete if there are no other pending asynchronous jobs for the event context in question, regardless of error state. `finalize` does not accept any arguments, so your callback should encapsulate any pertinent data needed for processing. If you are using the AsyncBatcher, AsyncJobScheduler or BatchProducer, you should not need to interact with this method directly.

func (*EventContext[T]) Fail added in v1.0.6

func (ec *EventContext[T]) Fail(err error) ExecutionState

Fail the event with the given error and return Fail, which has the effect of marking the consumer as unhealthy and shutting it down.

func (*EventContext[T]) Forward

func (ec *EventContext[T]) Forward(records ...*Record)

Forward produces records on the transactional producer for your EventSource. If the transaction fails, records produced in this fashion will not be visible to other consumers which have a fetch isolation of `read_committed`. An isolation level of `read_committed` is required for Exactly Once Semantics.

It is important to note that GKES uses a Record pool. After the transaction has completed for this record, it is returned to the pool for reuse. Your application should not hold on to references to the Record(s) after Forward has been invoked.

func (*EventContext[T]) Input

func (ec *EventContext[T]) Input() (IncomingRecord, bool)

Returns the raw input record for this event, or an uninitialized record and false if the EventContext represents an Interjection.

func (*EventContext[T]) IsInterjection

func (ec *EventContext[T]) IsInterjection() bool

Returns true if this EventContext represents an Interjection

func (*EventContext[T]) Offset

func (ec *EventContext[T]) Offset() int64

The offset for this event, -1 for an Interjection

func (*EventContext[T]) RecordChange

func (ec *EventContext[T]) RecordChange(entries ...ChangeLogEntry)

Forwards records to the transactional producer for your EventSource. When you add an item to your StateStore, you must call this method for that change to be recorded in the stream. This ensures that when the TopicPartition for this change is transferred to a new consumer, it will also have this change. If the transaction fails, records produced in this fashion will not be visible to other consumers which have a fetch isolation of `read_committed`. An isolation level of `read_committed` is required for Exactly Once Semantics.

It is important to note that GKES uses a Record pool. After the transaction has completed for this record, it is returned to the pool for reuse. Your application should not hold on to references to the ChangeLogEntry(s) after RecordChange has been invoked.

func (*EventContext[T]) Store

func (ec *EventContext[T]) Store() T

Returns the StateStore for this event/TopicPartition

func (*EventContext[T]) TopicPartition

func (ec *EventContext[T]) TopicPartition() TopicPartition

The TopicPartition for this event. It is present for both normal events and Interjections.

type EventContextProducer added in v1.0.4

type EventContextProducer[T any] interface {
	ProduceRecord(*EventContext[T], *Record, func(*Record, error))
}

type EventProcessor

type EventProcessor[T any, V any] func(*EventContext[T], V) ExecutionState

A callback invoked when a new record has been received from the EventSource, after it has been transformed via IncomingRecordTransformer.

type EventSource

type EventSource[T StateStore] struct {
	// contains filtered or unexported fields
}

EventSource provides an abstraction over raw kgo.Record/streams.IncomingRecord consumption, allowing the use of strongly typed event handlers. One of the key features of the EventSource is to allow for the routing of events based on a type header. See RegisterEventType for details.

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/aws/go-kafka-event-source/streams"
	"github.com/aws/go-kafka-event-source/streams/sak"
	"github.com/aws/go-kafka-event-source/streams/stores"
)

type Contact struct {
	Id          string
	PhoneNumber string
	Email       string
	FirstName   string
	LastName    string
	LastContact time.Time
}

type NotifyContactEvent struct {
	ContactId        string
	NotificationType string
}

func (c Contact) Key() string {
	return c.Id
}

func createContact(ctx *streams.EventContext[ContactStore], contact Contact) streams.ExecutionState {
	contactStore := ctx.Store()
	ctx.RecordChange(contactStore.Put(contact))
	fmt.Printf("Created contact: %s\n", contact.Id)
	return streams.Complete
}

func deleteContact(ctx *streams.EventContext[ContactStore], contact Contact) streams.ExecutionState {
	contactStore := ctx.Store()
	if entry, ok := contactStore.Delete(contact); ok {
		ctx.RecordChange(entry)
		fmt.Printf("Deleted contact: %s\n", contact.Id)
	}
	return streams.Complete
}

func notifyContact(ctx *streams.EventContext[ContactStore], notification NotifyContactEvent) streams.ExecutionState {
	contactStore := ctx.Store()
	if contact, ok := contactStore.Get(notification.ContactId); ok {
		fmt.Printf("Notifying contact: %s by %s\n", contact.Id, notification.NotificationType)
	} else {
		fmt.Printf("Contact %s does not exist!\n", notification.ContactId)
	}
	return streams.Complete
}

// simply providing an example of how you might wrap the store into your own type
type ContactStore struct {
	*stores.SimpleStore[Contact]
}

func NewContactStore(tp streams.TopicPartition) ContactStore {
	return ContactStore{stores.NewJsonSimpleStore[Contact](tp)}
}

func main() {
	streams.InitLogger(streams.SimpleLogger(streams.LogLevelError), streams.LogLevelError)

	contactsCluster := streams.SimpleCluster([]string{"127.0.0.1:9092"})
	sourceConfig := streams.EventSourceConfig{
		GroupId:       "ExampleEventSourceGroup",
		Topic:         "ExampleEventSource",
		NumPartitions: 10,
		SourceCluster: contactsCluster,
	}

	destination := streams.Destination{
		Cluster:      sourceConfig.SourceCluster,
		DefaultTopic: sourceConfig.Topic,
	}

	eventSource := sak.Must(streams.NewEventSource(sourceConfig, NewContactStore, nil))

	streams.RegisterEventType(eventSource, streams.JsonItemDecoder[Contact], createContact, "CreateContact")
	streams.RegisterEventType(eventSource, streams.JsonItemDecoder[Contact], deleteContact, "DeleteContact")
	streams.RegisterEventType(eventSource, streams.JsonItemDecoder[NotifyContactEvent], notifyContact, "NotifyContact")

	eventSource.ConsumeEvents()

	contact := Contact{
		Id:          "123",
		PhoneNumber: "+18005551212",
		FirstName:   "Billy",
		LastName:    "Bob",
	}

	notification := NotifyContactEvent{
		ContactId:        "123",
		NotificationType: "email",
	}

	producer := streams.NewProducer(destination)

	createContactRecord := streams.JsonItemEncoder("CreateContact", contact)
	createContactRecord.WriteKeyString(contact.Id)

	deleteContactRecord := streams.JsonItemEncoder("DeleteContact", contact)
	deleteContactRecord.WriteKeyString(contact.Id)

	notificationRecord := streams.JsonItemEncoder("NotifyContact", notification)
	notificationRecord.WriteKeyString(notification.ContactId)

	producer.Produce(context.Background(), createContactRecord)
	producer.Produce(context.Background(), notificationRecord)
	producer.Produce(context.Background(), deleteContactRecord)
	producer.Produce(context.Background(), notificationRecord)

	eventSource.WaitForSignals(nil)
	// Expected  Output: Created contact: 123
	// Notifying contact: 123 by email
	// Deleted contact: 123
	// Contact 123 does not exist!
}
Output:

func NewEventSource

func NewEventSource[T StateStore](sourceConfig EventSourceConfig, stateStoreFactory StateStoreFactory[T], defaultProcessor EventProcessor[T, IncomingRecord],
	additionalClientOptions ...kgo.Opt) (*EventSource[T], error)

Create an EventSource. `defaultProcessor` will be invoked if a suitable EventProcessor cannot be found, or the IncomingRecord has no RecordType header. `additionalClientOptions` allows you to add additional options to the underlying kgo.Client. There are some restrictions here, however. The following options are reserved:

kgo.Balancers
kgo.ConsumerGroup
kgo.ConsumeTopics
kgo.OnPartitionsAssigned
kgo.OnPartitionsRevoked
kgo.AdjustFetchOffsetsFn

In addition, if you wish to set a TopicPartitioner for use in EventContext.Forward(), the partitioner must be of the supplied OptionalPartitioner type, as StateStore entries require manual partitioning and are produced on the same client used by the EventContext for producing records. The default partitioner is initialized as follows, which should give parity with the canonical Java murmur2 partitioner:

kgo.RecordPartitioner(NewOptionalPartitioner(kgo.StickyKeyPartitioner(nil)))
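
A small sketch of passing additional kgo options (any option not in the reserved list above), assuming sourceConfig and NewContactStore from the examples; the option values are illustrative:

eventSource, err := streams.NewEventSource(sourceConfig, NewContactStore, nil,
	kgo.FetchMaxBytes(50<<20),        // raise the per-fetch byte limit
	kgo.ClientID("contacts-service")) // identify this client to the brokers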

func (*EventSource[T]) ConsumeEvents

func (es *EventSource[T]) ConsumeEvents()

ConsumeEvents starts the underlying Kafka consumer. This call is non-blocking, so if called from main(), it should be followed by some other blocking call to prevent the application from exiting. See streams.EventSource.WaitForSignals for an example.

func (*EventSource[T]) Done

func (es *EventSource[T]) Done() <-chan struct{}

Done returns a channel which blocks while the underlying Kafka consumer is active, and can be used to wait until the consumer has shut down.

func (*EventSource[T]) EmitMetric

func (es *EventSource[T]) EmitMetric(m Metric)

func (*EventSource[T]) ForkRunStatus added in v1.0.5

func (es *EventSource[T]) ForkRunStatus() sak.RunStatus

func (*EventSource[T]) Interject

func (es *EventSource[T]) Interject(partition int32, cmd Interjector[T]) <-chan error

Executes `cmd` in the context of the given partition.

func (*EventSource[T]) InterjectAll

func (es *EventSource[T]) InterjectAll(interjector Interjector[T])

InterjectAll is a convenience function which allows you to Interject into every active partition assigned to the consumer without creating an individual timer per partition. It is the equivalent of calling Interject() on each active partition, blocking until all have been performed. It is worth noting that the interjections are run in parallel, so care must be taken not to create a deadlock between partitions via locking mechanisms such as a Mutex. If parallel processing is not of concern, streams.EventSource.InterjectAllSync is an alternative. Useful for gathering store statistics, but can be used in place of a standard Interjection. Example:

preCount := int64(0)
postCount := int64(0)
eventSource.InterjectAll(func (ec *EventContext[myStateStore], when time.Time) streams.ExecutionState {
	store := ec.Store()
	atomic.AddInt64(&preCount, int64(store.Len()))
	store.performBookeepingTasks()
	atomic.AddInt64(&postCount, int64(store.Len()))
	return streams.Complete
})
fmt.Printf("Number of items before: %d, after: %d\n", preCount, postCount)

func (*EventSource[T]) InterjectAllSync

func (es *EventSource[T]) InterjectAllSync(interjector Interjector[T])

InterjectAllSync performs the same function as streams.EventSource.InterjectAll, however it blocks on each iteration. It may be useful if parallel processing is not of concern and you want to avoid locking on a shared data structure. Example:

itemCount := 0
eventSource.InterjectAll(func (ec *EventContext[myStateStore], when time.Time) streams.ExecutionState {
	store := ec.Store()
	itemCount += store.Len()
	return streams.Complete
})
fmt.Println("Number of items: ", itemCount)

func (*EventSource[T]) ScheduleInterjection

func (es *EventSource[T]) ScheduleInterjection(interjector Interjector[T], every, jitter time.Duration)

ScheduleInterjection sets a timer for `interjector` to be run every `every` time interval, plus or minus a random time.Duration not greater than the absolute value of `jitter` on every invocation. `interjector` will have access to EventContext.Store() and can create/delete store items, or forward events just as a standard EventProcessor. Example:

 func cleanupStaleItems(ec *EventContext[myStateStore], when time.Time)  streams.ExecutionState {
	ec.Store().cleanup(when)
	return streams.Complete
 }
 // schedules cleanupStaleItems to be executed every 900ms - 1100ms
 eventSource.ScheduleInterjection(cleanupStaleItems, time.Second, 100 * time.Millisecond)

func (*EventSource[T]) Source added in v1.0.2

func (es *EventSource[T]) Source() *Source

The Source used by the EventSource.

func (*EventSource[T]) State

func (es *EventSource[T]) State() EventSourceState

Returns the EventSourceState of the underlying Source, Healthy or Unhealthy. When the EventSource encounters an unrecoverable error (unable to execute a transaction, for example), it will enter an Unhealthy state. Intended to be used by a health check process for rolling back a bad deployment.
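
A minimal sketch of a health check endpoint backed by State(); the route, port and eventSource variable are illustrative:

http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
	if eventSource.State() == streams.Healthy {
		w.WriteHeader(http.StatusOK)
		return
	}
	w.WriteHeader(http.StatusServiceUnavailable)
})
go http.ListenAndServe(":8080", nil)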

func (*EventSource[T]) Stop

func (es *EventSource[T]) Stop()

Signals the underlying *kgo.Client that the consumer should exit the group. If you are using an IncrementalGroupRebalancer, this will trigger a graceful exit where owned partitions are surrendered according to its configuration. If you are not, this call has the same effect as streams.EventSource.StopNow.

Calls to Stop are not blocking. To block during the shut down process, this call should be followed by `<-eventSource.Done()`

To simplify running from main(), the streams.EventSource.WaitForSignals and streams.EventSource.WaitForChannel calls have been provided. So unless you have extremely complex application shutdown logic, you should not need to interact with this method directly.

func (*EventSource[T]) StopNow

func (es *EventSource[T]) StopNow()

Immediately stops the underlying consumer *kgo.Client by invoking sc.client.Close() This has the effect of immediately surrendering all owned partitions, then closing the client. If you are using an IncrementalGroupRebalancer, this can be used as a force quit.

func (*EventSource[T]) WaitForChannel

func (es *EventSource[T]) WaitForChannel(c chan struct{}, callback func())

WaitForChannel is similar to WaitForSignals, but blocks on a `chan struct{}` then invokes `callback` when finished. Useful when you have multiple EventSources in a single application. Example:

func main() {

	myEventSource1 := initEventSource1()
	myEventSource1.ConsumeEvents()

	myEventSource2 := initEventSource2()
	myEventSource2.ConsumeEvents()

	wg := &sync.WaitGroup{}
	wg.Add(2)

	eventSourceChannel := make(chan struct{})

	go myEventSource1.WaitForChannel(eventSourceChannel, wg.Done)
	go myEventSource2.WaitForChannel(eventSourceChannel, wg.Done)

	osChannel := make(chan os.Signal)
	signal.Notify(osChannel, syscall.SIGINT, syscall.SIGHUP)
	<-osChannel
	close(eventSourceChannel)
	wg.Wait()
	fmt.Println("exiting")
}

func (*EventSource[T]) WaitForSignals

func (es *EventSource[T]) WaitForSignals(preHook func(os.Signal) bool, signals ...os.Signal)

WaitForSignals is a convenience function suitable for use in a main() function. It blocks until `signals` are received, then gracefully closes the consumer by calling streams.EventSource.Stop. If `signals` are not provided, syscall.SIGINT and syscall.SIGHUP are used. If `preHook` is non-nil, it will be invoked before Stop() is invoked. If the preHook returns false, this call continues to block. If true is returned, `signal.Reset(signals...)` is invoked and the consumer shutdown process begins. Simple example:

func main(){
	myEventSource := initEventSource()
	myEventSource.ConsumeEvents()
	myEventSource.WaitForSignals(nil)
	fmt.Println("exiting")
}

Prehook example:

func main(){
	myEventSource := initEventSource()
	myEventSource.ConsumeEvents()
	myEventSource.WaitForSignals(func(s os.Signal) bool {
		fmt.Printf("starting shutdown from signal %v\n", s)
		shutDownSomeOtherProcess()
		return true
	})
	fmt.Println("exiting")
}

In this example, the consumer will close on syscall.SIGINT or syscall.SIGHUP, but not syscall.SIGUSR1:

func main(){
	myEventSource := initEventSource()
	myEventSource.ConsumeEvents()
	myEventSource.WaitForSignals(func(s os.Signal) bool {
		if s == syscall.SIGUSR1 {
			fmt.Println("user signal received")
			performSomeTask()
			return false
		}
		return true
	}, syscall.SIGINT, syscall.SIGHUP, syscall.SIGUSR1)
	fmt.Println("exiting")
}

type EventSourceConfig

type EventSourceConfig struct {
	// The group id for the underlying Kafka consumer group.
	GroupId string
	// The Kafka topic to consume
	Topic string
	// The compacted Kafka topic on which to publish/consume [StateStore] data. If not provided, GKES will generate a name which includes
	// Topic and GroupId.
	StateStoreTopic string
	// The desired number of partitions for Topic.
	NumPartitions int
	// The desired replication factor for Topic. Defaults to 1.
	ReplicationFactor int
	// The desired min-insync-replicas for Topic. Defaults to 1.
	MinInSync int
	// The number of Kafka partitions to use for the application's commit log. Defaults to 5 if unset.
	CommitLogPartitions int
	// The Kafka cluster on which Topic resides, or the source of incoming events.
	SourceCluster Cluster
	// StateCluster is the Kafka cluster on which the commit log and the StateStore topic resides. If left unset (recommended), defaults to SourceCluster.
	StateCluster Cluster
	// The consumer rebalance strategies to use for the underlying Kafka consumer group.
	BalanceStrategies []BalanceStrategy
	/*
		CommitOffsets should be set to true if you are migrating from a traditional consumer group.
		This will ensure that the offsets are committed to the consumer group
		when in a mixed fleet scenario (migrating into an EventSource from a standard consumer).
		If the deployment fails, the original non-EventSource application can then
		resume consuming from the committed offsets. Once the EventSource application is well-established,
		this setting should be switched to false, as offsets are managed by another topic.
		In an EventSource application, committing offsets via the standard mechanism only
		consumes resources and provides no benefit.
	*/
	CommitOffsets bool
	/*
		The config used for the eos producer pool. If empty, [DefaultEosConfig] is used. If an EventSource is initialized with an invalid
		[EosConfig], the application will panic.
	*/
	EosConfig EosConfig
	// If non-nil, the EventSource will emit [Metric] objects of varying types. This is backed by a channel. If the channel is full
	// (presumably because the MetricHandler is not able to keep up),
	// GKES will drop the metric and log at WARN level to prevent processing slow down.
	MetricsHandler MetricsHandler

	// Called when a partition has been assigned to the EventSource consumer client. This does not indicate that the partition is being processed.
	OnPartitionAssigned SourcePartitionEventHandler

	// Called when a previously assigned partition has been activated, meaning the EventSource will start processing events for this partition. At the time this handler is called, the StateStore associated with this partition has been bootstrapped and is ready for use.
	OnPartitionActivated SourcePartitionEventHandler

	// Called when a partition is about to be revoked from the EventSource consumer client.
	// This is a blocking call and, as such, should return quickly.
	OnPartitionWillRevoke SourcePartitionEventHandler
	// Called when a partition has been revoked from the EventSource consumer client.
	// This handler is invoked after GKES has stopped processing and has finished removing any associated resources for the partition.
	OnPartitionRevoked SourcePartitionEventHandler
	// The error handler invoked when record deserialization errors occur for a given [EventSource]
	DeserializationErrorHandler DeserializationErrorHandler
	// The error handler invoked when errors occur in the transactional producer for a given [EventSource]
	TxnErrorHandler TxnErrorHandler
}

type EventSourceState

type EventSourceState uint64
const (
	Healthy EventSourceState = iota
	Unhealthy
)

type ExecutionState

type ExecutionState int

Returned by an EventProcessor or Interjector in response to an EventContext. ExecutionState should not be conflated with concepts of error state, such as Success or Failure.

const (
	// Complete signals the EventSource that the event or interjection is completely processed.
	// Once Complete is returned, the offset for the associated EventContext will be committed.
	Complete ExecutionState = 0
	// Incomplete signals the EventSource that the event or interjection is still ongoing, and
	// that your application promises to fulfill the EventContext in the future.
	// The offset for the associated EventContext will not be committed.
	Incomplete ExecutionState = 1

	// Signifies that the consumer should no longer continue processing events. The EventSource will go into an unhealthy state and execute [StopNow]
	Fail ExecutionState = 2

	// Signifies that the consumer should panic.
	Fatal ExecutionState = 3
)

func DefaultProcessingErrorHandler added in v1.0.6

func DefaultProcessingErrorHandler(userData any, err error) ExecutionState

The default ProcessingErrorHandler. Invoked by EventContext.Fail and returns Complete.

type GlobalChangeLog

type GlobalChangeLog[T ChangeLogReceiver] struct {
	// contains filtered or unexported fields
}

A GlobalChangeLog is simply a consumer which continuously consumes all partitions within the given topic and forwards all records to its StateStore. GlobalChangeLogs can be useful for sharing small amounts of data between a group of hosts. For example, GKES uses a global change log to keep track of consumer group offsets.

func NewGlobalChangeLog

func NewGlobalChangeLog[T ChangeLogReceiver](cluster Cluster, receiver T, numPartitions int, topic string, cleanupPolicy CleanupPolicy) GlobalChangeLog[T]

Creates a new GlobalChangeLog consumer and forwards all records to `receiver`.
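
A hedged sketch of a ChangeLogReceiver that mirrors a compacted topic into an in-memory map (imports sync; the topic name and partition count are illustrative):

type configMirror struct {
	mu      sync.Mutex
	entries map[string][]byte
}

func (m *configMirror) ReceiveChange(r streams.IncomingRecord) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.entries[string(r.Key())] = r.Value()
	return nil
}

mirror := &configMirror{entries: make(map[string][]byte)}
changeLog := streams.NewGlobalChangeLog(streams.SimpleCluster([]string{"127.0.0.1:9092"}), mirror, 10, "shared-config", streams.CompactCleanupPolicy)
changeLog.Start()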

func NewGlobalChangeLogWithRunStatus added in v1.0.5

func NewGlobalChangeLogWithRunStatus[T ChangeLogReceiver](runStatus sak.RunStatus, cluster Cluster, receiver T, numPartitions int, topic string, cleanupPolicy CleanupPolicy) GlobalChangeLog[T]

Creates a new GlobalChangeLog consumer, tied to the provided RunStatus, and forwards all records to `receiver`.

func (GlobalChangeLog[T]) Pause

func (cl GlobalChangeLog[T]) Pause(partition int32)

Pauses consumption of a partition.

func (GlobalChangeLog[T]) PauseAllPartitions

func (cl GlobalChangeLog[T]) PauseAllPartitions()

Pauses consumption of all partitions.

func (GlobalChangeLog[T]) ResumePartitionAt

func (cl GlobalChangeLog[T]) ResumePartitionAt(partition int32, offset int64)

Resumes consumption of a partition at offset.

func (GlobalChangeLog[T]) Start

func (cl GlobalChangeLog[T]) Start()

func (GlobalChangeLog[T]) Stop

func (cl GlobalChangeLog[T]) Stop()

type IncomingRecord

type IncomingRecord struct {
	// contains filtered or unexported fields
}

func (IncomingRecord) HeaderValue

func (r IncomingRecord) HeaderValue(name string) []byte

func (IncomingRecord) Headers

func (r IncomingRecord) Headers() []kgo.RecordHeader

func (IncomingRecord) Key

func (r IncomingRecord) Key() []byte

func (IncomingRecord) LeaderEpoch

func (r IncomingRecord) LeaderEpoch() int32

func (IncomingRecord) Offset

func (r IncomingRecord) Offset() int64

func (IncomingRecord) RecordType

func (r IncomingRecord) RecordType() string

func (IncomingRecord) Timestamp

func (r IncomingRecord) Timestamp() time.Time

func (IncomingRecord) TopicPartition

func (r IncomingRecord) TopicPartition() TopicPartition

func (IncomingRecord) Value

func (r IncomingRecord) Value() []byte

type IncomingRecordDecoder

type IncomingRecordDecoder[V any] func(IncomingRecord) (V, error)

A callback invoked when a new record has been received from the EventSource.
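
A minimal sketch of an IncomingRecordDecoder, assuming a hypothetical `order` type and reusing JsonCodec (documented below) to deserialize the record value:

type order struct {
	ID     string
	Amount float64
}

var orderCodec = streams.JsonCodec[order]{}

// decodeOrder satisfies IncomingRecordDecoder[order]
func decodeOrder(r streams.IncomingRecord) (order, error) {
	return orderCodec.Decode(r.Value())
}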

type IncrGroupMemberInstructions

type IncrGroupMemberInstructions struct {
	Prepare []TopicPartition
	Forget  []TopicPartition // not currently used
}

type IncrGroupMemberMeta

type IncrGroupMemberMeta struct {
	Preparing []TopicPartition
	Ready     []TopicPartition
	Status    MemberStatus
	LeftAt    int64
}

type IncrGroupPartitionState

type IncrGroupPartitionState struct {
	Preparing []TopicPartition
	Ready     []TopicPartition
}

type IncrRebalanceInstructionHandler

type IncrRebalanceInstructionHandler interface {
	// Called by the IncrementalGroupRebalancer. Signals the instruction handler that this partition is destined for this consumer.
	// In the case of the EventSource, preparation involves pre-populating the StateStore for this partition.
	PrepareTopicPartition(tp TopicPartition)
	// Called by the IncrementalGroupRebalancer. Signals the instruction handler that it is safe to forget this previously prepped TopicPartition.
	ForgetPreparedTopicPartition(tp TopicPartition)
	// Called by the IncrementalGroupRebalancer. A valid *kgo.Client, which is on the same cluster as the Source.Topic, must be returned.
	Client() *kgo.Client
}

Defines the interface needed for the IncrementalGroupRebalancer to function. EventSource fulfills this interface. If you are using EventSource, there is nothing else for you to implement.

type IncrementalGroupRebalancer

type IncrementalGroupRebalancer interface {
	kgo.GroupBalancer
	// Must be called by the InstructionHandler once a TopicPartition is ready for consumption
	PartitionPrepared(TopicPartition)
	// Must be called by the InstructionHandler if it fails to prepare a TopicPartition it was previously instructed to prepare
	PartitionPreparationFailed(TopicPartition)
	// Must be called by the InstructionHandler once it receives an assignment
	PartitionsAssigned(...TopicPartition)
	// Must be called by the InstructionHandler if it wishes to leave the consumer group in a graceful fashion
	GracefullyLeaveGroup() <-chan struct{}
}

The IncrementalGroupRebalancer interface is an extension to kgo.GroupBalancer. This balancer allows for slowly moving partitions during consumer topology changes. This helps reduce blast radius in the case of failures, as well as keep the inherent latency penalty of transitioning partitions to a minimum.

func IncrementalRebalancer

func IncrementalRebalancer(instructionHandler IncrRebalanceInstructionHandler) IncrementalGroupRebalancer

Creates an IncrementalRebalancer suitable for use by the kgo Kafka driver. In most cases, the instructionHandler is the EventSource. `activeTransitions` defines how many partitions may be in receivership at any given point in time.

For example, when `activeTransitions` is 1 and the group state is imbalanced (a new member is added or a member signals it wishes to leave the group), the IncrementalGroupRebalancer will choose 1 partition to move. Once the receiver of that partition signals it is ready for the partition, it will assign it, then choose another partition to move. This process continues until the group has reached a balanced state.

In all cases, any unassigned partitions will be assigned immediately. If a consumer host crashes, for example, its partitions will be assigned immediately, regardless of preparation state.

receivership - the state of being dealt with by an official receiver.
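
As a rough sketch, the IncrementalGroupRebalancer plugs into kgo like any other group balancer. This assumes `handler` is your IncrRebalanceInstructionHandler (typically the EventSource itself, which performs this wiring for you):

balancer := streams.IncrementalRebalancer(handler)
client, err := kgo.NewClient(
	kgo.SeedBrokers("127.0.0.1:9092"),
	kgo.ConsumerGroup("my-group"),
	kgo.Balancers(balancer), // IncrementalGroupRebalancer embeds kgo.GroupBalancer
)
if err != nil {
	panic(err)
}
defer client.Close()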

type Interjector

type Interjector[T any] func(*EventContext[T], time.Time) ExecutionState

Defines the method signature needed by the EventSource to perform a stream interjection. See EventSource.Interject.
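
A minimal Interjector sketch that performs periodic bookkeeping for a partition; `orderStore` is a hypothetical StateStore type, and access to it through the EventContext is elided:

type orderStore struct{ /* ... */ }

func hourlyRollup(ctx *streams.EventContext[orderStore], when time.Time) streams.ExecutionState {
	// inspect or mutate this partition's StateStore, produce aggregate
	// records to output topics, etc.
	log.Printf("interjection fired at %v", when)
	return streams.Complete
}

Such a function can then be scheduled per partition via EventSource.ScheduleInterjection.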

type JsonCodec

type JsonCodec[T any] struct{}

A generic JSON encoder/decoder. Uses "github.com/json-iterator/go".ConfigCompatibleWithStandardLibrary for encoding and decoding JSON in a performant way.

func (JsonCodec[T]) Decode

func (JsonCodec[T]) Decode(b []byte) (T, error)

Decodes the provided []byte.

func (JsonCodec[T]) Encode

func (JsonCodec[T]) Encode(b *bytes.Buffer, t T) error

Encodes the provided value.
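
A short sketch of round-tripping a value through JsonCodec (the `sensorReading` type is illustrative):

type sensorReading struct {
	ID    string
	Value float64
}

func roundTrip() {
	codec := streams.JsonCodec[sensorReading]{}

	var buf bytes.Buffer
	if err := codec.Encode(&buf, sensorReading{ID: "thermostat-1", Value: 72.5}); err != nil {
		panic(err)
	}

	decoded, err := codec.Decode(buf.Bytes())
	if err != nil {
		panic(err)
	}
	fmt.Println(decoded.ID, decoded.Value)
}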

type LogLevel

type LogLevel int
const (
	LogLevelNone LogLevel = iota
	LogLevelTrace
	LogLevelDebug
	LogLevelInfo
	LogLevelWarn
	LogLevelError
)

type Logger

type Logger interface {
	Tracef(msg string, args ...any)
	Debugf(msg string, args ...any)
	Infof(msg string, args ...any)
	Warnf(msg string, args ...any)
	Errorf(msg string, args ...any)
}

Provides the interface needed by GKES to integrate with your logging mechanism. Example:

 import (
	"mylogger"
	"github.com/aws/go-kafka-event-source/streams"
 )

 func main() {
	// GKES will emit log at whatever level is defined by NewLogger()
	// kgo will emit logs at LogLevelError
	streams.InitLogger(mylogger.NewLogger(), streams.LogLevelError)
 }

func InitLogger

func InitLogger(l Logger, kafkaDriverLogLevel LogLevel) Logger

Initializes the GKES logger. `kafkaDriverLogLevel` defines the log level for the underlying kgo clients. This call should be the first interaction with the GKES module. Subsequent calls will have no effect. If never called, the default uninitialized logger writes to STDOUT at LogLevelError for both GKES and kgo. Example:

 import "github.com/aws/go-kafka-event-source/streams"

 func main() {
	streams.InitLogger(streams.SimpleLogger(streams.LogLevelInfo), streams.LogLevelError)
	// ... initialize your application
 }

func WrapLogger

func WrapLogger(logger Logger, level LogLevel) Logger

WrapLogger allows GKES to emit logs at a higher level than your own Logger. Useful if you need debug level logging for your own application, but do not want to clutter your logs with GKES output. Example:

 import (
	"mylogger"
	"github.com/aws/go-kafka-event-source/streams"
 )

 func main() {
	// your application will emit logs at "Debug"
	// GKES will emit logs at LogLevelError
	// kgo will emit logs at LogLevelNone
	gkesLogger := streams.WrapLogger(mylogger.NewLogger("Debug"), streams.LogLevelError)
	streams.InitLogger(gkesLogger, streams.LogLevelNone)
 }

type MemberStatus

type MemberStatus int

The status of a consumer group member.

const (
	ActiveMember MemberStatus = iota
	InactiveMember
	Defunct
)

type Metric

type Metric struct {
	StartTime      time.Time
	ExecuteTime    time.Time
	EndTime        time.Time
	Count          int
	Bytes          int
	PartitionCount int
	Partition      int32
	Operation      string
	Topic          string
	GroupId        string
}

func (Metric) Duration

func (m Metric) Duration() time.Duration

func (Metric) ExecuteDuration

func (m Metric) ExecuteDuration() time.Duration

func (Metric) Linger

func (m Metric) Linger() time.Duration

type MetricsHandler

type MetricsHandler func(Metric)
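
A sketch of a MetricsHandler that logs slow operations; how the handler is registered with your EventSource configuration is not shown here:

func logSlowOperations(m streams.Metric) {
	if m.Duration() > 100*time.Millisecond {
		log.Printf("%s on %s/%d took %v (linger %v, %d records, %d bytes)",
			m.Operation, m.Topic, m.Partition, m.Duration(), m.Linger(), m.Count, m.Bytes)
	}
}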

type OptionalPartitioner

type OptionalPartitioner struct {
	// contains filtered or unexported fields
}

func NewOptionalPartitioner

func NewOptionalPartitioner(partitioner kgo.Partitioner) OptionalPartitioner

A kgo compatible partitioner which respects Record partitions that are manually assigned. If the record partition is AutoAssign, the provided kgo.Partitioner will be used for partition assignment. Note: NewRecord will return a record with a partition of AutoAssign.

func NewOptionalPerTopicPartitioner added in v1.0.4

func NewOptionalPerTopicPartitioner(defaultPartitioner kgo.Partitioner, topicPartitioners map[string]kgo.Partitioner) OptionalPartitioner

A kgo compatible partitioner which respects Record partitions that are manually assigned. Allows you to set a different partitioner per topic. If a topic is encountered that has not been defined, defaultPartitioner will be used.
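
A hedged sketch combining a default sticky-key partitioner with a round-robin partitioner for one specific topic (topic names are illustrative):

partitioner := streams.NewOptionalPerTopicPartitioner(
	kgo.StickyKeyPartitioner(nil), // used for any topic not listed below
	map[string]kgo.Partitioner{
		"audit-log": kgo.RoundRobinPartitioner(),
	},
)
// hand the partitioner to your producer, e.g. via kgo.RecordPartitioner(partitioner)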

func (OptionalPartitioner) ForTopic

func (op OptionalPartitioner) ForTopic(topic string) kgo.TopicPartitioner

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

A simple Kafka producer.

func NewProducer

func NewProducer(destination Destination, opts ...kgo.Opt) *Producer

Creates a new Producer. Destination provides cluster connect information. Default options are: kgo.ProducerLinger(5 * time.Millisecond) and kgo.RecordPartitioner(NewOptionalPartitioner(kgo.StickyKeyPartitioner(nil)))

func (*Producer) Close

func (p *Producer) Close()

func (*Producer) Produce

func (p *Producer) Produce(ctx context.Context, record *Record) (err error)

Produces a record, blocking until complete. If the record has no topic, the DefaultTopic of the producer's Destination will be used.

func (*Producer) ProduceAsync

func (p *Producer) ProduceAsync(ctx context.Context, record *Record, callback func(*Record, error))

Produces a record asynchronously. If callback is non-nil, it will be executed when the call is complete. If the record has no topic, the DefaultTopic of the producer's Destination will be used.
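
Taken together, a minimal sketch of producing a record might look as follows. It assumes `dest` is a valid Destination (for example, obtained from Source.AsDestination, documented below); the topic, key and value are illustrative:

producer := streams.NewProducer(dest)
defer producer.Close()

record := streams.NewRecord().
	WithTopic("orders").
	WithKeyString("order-123").
	WithValue([]byte(`{"status":"shipped"}`))

if err := producer.Produce(context.Background(), record); err != nil {
	// handle the error according to your application's policy
}

ProduceAsync follows the same pattern, with completion reported through the provided callback rather than a blocking return.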

type Record

type Record struct {
	// contains filtered or unexported fields
}

func JsonItemEncoder

func JsonItemEncoder[T any](recordType string, item T) *Record

A convenience function for encoding an item into a Record suitable for sending to a producer. Please note that the Key on the record will be left uninitialized. Usage:

record := codec.JsonItemEncoder("myType", myItem)
record.WriteKeyString(myItem.Key)

func NewRecord

func NewRecord() *Record

func (*Record) AsIncomingRecord added in v1.0.4

func (r *Record) AsIncomingRecord() IncomingRecord

A convenience function for unit testing. This method should not need to be invoked in production code.

func (*Record) Error

func (r *Record) Error() error

func (*Record) KeyWriter

func (r *Record) KeyWriter() *bytes.Buffer

func (*Record) Offset

func (r *Record) Offset() int64

func (*Record) Release

func (r *Record) Release()

func (*Record) ToKafkaRecord

func (r *Record) ToKafkaRecord() *kgo.Record

Creates a newly allocated kgo.Record. The Key and Value fields are freshly allocated bytes, copied from streams.Record.

func (*Record) TopicPartition added in v1.0.4

func (r *Record) TopicPartition() TopicPartition

func (*Record) ValueWriter

func (r *Record) ValueWriter() *bytes.Buffer

func (*Record) WithError added in v1.0.6

func (r *Record) WithError(err error) *Record

Should only be used for unit testing purposes

func (*Record) WithHeader

func (r *Record) WithHeader(key string, value []byte) *Record

func (*Record) WithKey

func (r *Record) WithKey(key ...[]byte) *Record

func (*Record) WithKeyString

func (r *Record) WithKeyString(key ...string) *Record

func (*Record) WithPartition

func (r *Record) WithPartition(partition int32) *Record

func (*Record) WithRecordType

func (r *Record) WithRecordType(recordType string) *Record

func (*Record) WithTopic

func (r *Record) WithTopic(topic string) *Record

func (*Record) WithValue

func (r *Record) WithValue(value ...[]byte) *Record

func (*Record) WriteKey

func (r *Record) WriteKey(bs ...[]byte)

func (*Record) WriteKeyString

func (r *Record) WriteKeyString(ss ...string)

func (*Record) WriteValue

func (r *Record) WriteValue(bs ...[]byte)

func (*Record) WriteValueString

func (r *Record) WriteValueString(ss ...string)
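
The With* setters are chainable, while the Write*/Writer methods let you assemble key and value bytes incrementally. A short sketch (topic, record type and header values are illustrative):

record := streams.NewRecord().
	WithTopic("audit-log").
	WithRecordType("userUpdated").
	WithHeader("source", []byte("billing-service"))

record.WriteKeyString("user-", "42")               // key assembled from multiple pieces
fmt.Fprintf(record.ValueWriter(), `{"id":%d}`, 42) // stream directly into the value buffer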

type SchedulerConfig

type SchedulerConfig struct {
	Concurrency, WorkerQueueDepth, MaxConcurrentKeys int
}

type SimpleCluster

type SimpleCluster []string

A Cluster implementation useful for local development/testing. Establishes a plain text connection to a Kafka cluster. For a more advanced example, see github.com/aws/go-kafka-event-source/msk.

cluster := streams.SimpleCluster([]string{"127.0.0.1:9092"})

func (SimpleCluster) Config

func (sc SimpleCluster) Config() ([]kgo.Opt, error)

Returns []kgo.Opt{kgo.SeedBrokers(sc...)}

type SimpleLogger

type SimpleLogger LogLevel

SimpleLogger implements Logger and writes to STDOUT. Good for development purposes.

func (SimpleLogger) Debugf

func (sl SimpleLogger) Debugf(msg string, args ...any)

func (SimpleLogger) Errorf

func (sl SimpleLogger) Errorf(msg string, args ...any)

func (SimpleLogger) Infof

func (sl SimpleLogger) Infof(msg string, args ...any)

func (SimpleLogger) Tracef

func (sl SimpleLogger) Tracef(msg string, args ...any)

func (SimpleLogger) Warnf

func (sl SimpleLogger) Warnf(msg string, args ...any)

type Source

type Source struct {
	// contains filtered or unexported fields
}

A readonly wrapper of EventSourceConfig. When an EventSource is initialized, it reconciles the actual Topic configuration (NumPartitions) from the Kafka cluster (or creates it if missing) and wraps the corrected EventSourceConfig.

func CreateSource

func CreateSource(sourceConfig EventSourceConfig) (resolved *Source, err error)

Creates all necessary topics in the appropriate Kafka clusters as defined by Source. Automatically invoked as part of NewSourceConsumer(). Ignores TOPIC_ALREADY_EXISTS errors. Returns a corrected Source where NumPartitions and CommitLogPartitions are pulled from a ListTopics call. This is to prevent drift errors. Returns an error if the details for Source topics could not be retrieved, or if there is a mismatch in partition counts for the source topic and change log topic.

func (*Source) AsDestination

func (s *Source) AsDestination() Destination

A convenience method for creating a Destination from your Source. Can be used for creating a Producer or BatchProducer which publishes to your EventSource.

func (*Source) BalanceStrategies

func (s *Source) BalanceStrategies() []BalanceStrategy

func (*Source) CommitLogTopicNameForGroupId

func (s *Source) CommitLogTopicNameForGroupId() string

Returns the formatted topic name used for the commit log of Source

func (*Source) Config

func (s *Source) Config() EventSourceConfig

func (*Source) GroupId

func (s *Source) GroupId() string

func (*Source) NumPartitions

func (s *Source) NumPartitions() int

func (*Source) State

func (s *Source) State() EventSourceState

func (*Source) StateStoreTopicName added in v1.0.2

func (s *Source) StateStoreTopicName() string

Returns the formatted topic name used for the StateStore of Source

func (*Source) Topic

func (s *Source) Topic() string

type SourcePartitionEventHandler

type SourcePartitionEventHandler func(*Source, int32)
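
A trivial sketch of a handler suitable for OnPartitionActivated or OnPartitionRevoked:

func logPartitionEvent(source *streams.Source, partition int32) {
	log.Printf("partition %d of topic %s changed state", partition, source.Topic())
}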

type StateStore

type StateStore interface {
	ReceiveChange(IncomingRecord) error
	Revoked()
}

type StateStoreFactory

type StateStoreFactory[T StateStore] TopicPartitionCallback[T]

A callback invoked when a new TopicPartition has been assigned to an EventSource. Your callback should return an empty StateStore.
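
A minimal sketch of a flat key/value StateStore and a matching factory; real stores will usually need richer data structures, as discussed in the package overview:

type kvStore struct {
	data map[string][]byte
}

func (s *kvStore) ReceiveChange(r streams.IncomingRecord) error {
	s.data[string(r.Key())] = r.Value() // apply the change-log record to local state
	return nil
}

func (s *kvStore) Revoked() {
	s.data = nil // partition revoked; release local resources
}

// newKvStore satisfies StateStoreFactory[*kvStore]
func newKvStore(tp streams.TopicPartition) *kvStore {
	return &kvStore{data: make(map[string][]byte)}
}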

type TopicPartition

type TopicPartition struct {
	Partition int32
	Topic     string
}

type TopicPartitionCallback

type TopicPartitionCallback[T any] func(TopicPartition) T

Defines a method which accepts a TopicPartition argument and returns T

type TopicPartitionSet

type TopicPartitionSet struct {
	*btree.BTreeG[TopicPartition]
}

A convenience data structure. It is what the name implies, a Set of TopicPartitions. This data structure is not thread-safe. You will need to provide your own locking mechanism.

func NewTopicPartitionSet

func NewTopicPartitionSet() TopicPartitionSet

Returns a new, empty TopicPartitionSet.

func (TopicPartitionSet) Contains

func (tps TopicPartitionSet) Contains(tp TopicPartition) bool

Returns true if the tp is currently a member of the TopicPartitionSet.

func (TopicPartitionSet) Insert

func (tps TopicPartitionSet) Insert(tp TopicPartition) bool

Inserts the TopicPartition. Returns true if the item was inserted, false if the item was already present.

func (TopicPartitionSet) Items

func (tps TopicPartitionSet) Items() []TopicPartition

Converts the set to a newly allocated slice of TopicPartitions.

func (TopicPartitionSet) Remove

func (tps TopicPartitionSet) Remove(tp TopicPartition) bool

Removes tp from the TopicPartitionSet. Returns true if the item was present.
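
A quick sketch of TopicPartitionSet usage (add your own locking if the set is shared across goroutines):

tps := streams.NewTopicPartitionSet()
tp := streams.TopicPartition{Topic: "orders", Partition: 3}

tps.Insert(tp)   // true: newly inserted
tps.Contains(tp) // true
tps.Insert(tp)   // false: already present

for _, item := range tps.Items() {
	fmt.Println(item.Topic, item.Partition)
}

tps.Remove(tp) // true: the item was present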

type TxnErrorHandler

type TxnErrorHandler func(err error) ErrorResponse

Directories

Path Synopsis
package "sak" (Swiss Army knife) provides some basic util functions
package "sak" (Swiss Army knife) provides some basic util functions
