kstreams

package module
v0.0.0-...-35ecdc5
Published: Dec 27, 2023 License: MIT Imports: 14 Imported by: 5

README

kstreams

kstreams is a stream processing library for Go, inspired by Kafka Streams. It is currently a work in progress and a prototype is being written. The goals are still evolving and not yet fully settled.

Goals

  • (Go 1.18 generics-)typed processors, stores, and SerDes (a rough sketch follows this list)
  • State stores
  • Object-storage-backed state stores. Changelog topics will also be available as a backup; we encourage object storage for operational simplicity.
  • At-least-once-delivery guarantee
  • Minimal dependencies (few core dependencies beyond franz-go; specific stores and plugins such as Prometheus are kept separate)
  • Add-ons available for popular stores (Bigtable, Redis, ...), data formats (proto, JSON, Avro, msgpack, ...), and Prometheus
  • Emphasis on parallelism: strong async and bulk processing support. Be MUCH better at this than Kafka Streams, and compete with parallel-consumer.
  • Best developer experience: be nice to use, but do not hide important details from the user
  • Processors for windowing, including calendar based windowing (https://github.com/confluentinc/kafka-streams-examples/blob/21d3079eb8de91619cc873148655cdf607135846/src/test/java/io/confluent/examples/streams/window/CustomWindowTest.java#L192)
  • Joins
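
A rough sketch of what these goals translate to with the current prototype API (the import path, topic names, and the UpperCase processor are made up for illustration; exact usage may differ):

package main

import (
	"context"
	"strings"

	"example.com/kstreams" // placeholder: substitute the real module path
)

// UpperCase is a hypothetical processor that upper-cases string values
// before forwarding them downstream.
type UpperCase struct {
	ctx kstreams.ProcessorContext[string, string]
}

func (p *UpperCase) Init(ctx kstreams.ProcessorContext[string, string]) error {
	p.ctx = ctx
	return nil
}

func (p *UpperCase) Close() error { return nil }

func (p *UpperCase) Process(ctx context.Context, k, v string) error {
	p.ctx.Forward(ctx, k, strings.ToUpper(v))
	return nil
}

func main() {
	// Pass-through serializers/deserializers for string keys and values.
	var deser kstreams.Deserializer[string] = func(b []byte) (string, error) { return string(b), nil }
	var ser kstreams.Serializer[string] = func(s string) ([]byte, error) { return []byte(s), nil }

	tb := kstreams.NewTopologyBuilder()
	kstreams.MustRegisterSource(tb, "source", "input-topic", deser, deser)
	kstreams.MustRegisterProcessor[string, string, string, string](tb, func() kstreams.Processor[string, string, string, string] {
		return &UpperCase{}
	}, "upper-case", "source")
	kstreams.MustRegisterSink(tb, "sink", "output-topic", ser, ser, "upper-case")

	app := kstreams.New(tb.MustBuild(), "example-group", kstreams.WithBrokers([]string{"localhost:9092"}))
	if err := app.Run(); err != nil {
		panic(err)
	}
}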

Non-Goals

  • Exactly-once-delivery
  • A Kafka Streams-like DSL is out of the question until Go generics support method type parameters.

Next steps

Object storage/S3 backend
  • Minio testcontainer
  • S3 tests
  • Add GCS; generify some code so that both S3 and GCS can make use of a write-back cache or whatever else we add to make it performant
Windowing
  • Generify; add an aggregation processor that outputs Key=WindowKey[K] and Value=the user's type, and allows for a "state" type that is used in the store.
  • Add support for calendar-based aggregations, i.e. properly aggregate time ranges that do not align with a UTC day, or that span days where daylight saving time changes the offset.
  • Performance: add benchmarks
  • Cleanup. When are values deleted from the DB? Can we just delegate this to the store? (Pebble TTL, bucket cleanup policy)
  • Add record timestamp to context

Stores

  • How to allow read access to stores / make them available via gRPC or another protocol

General

  • Explore tombstone handling; some things probably have to be improved so that tombstones work "nicely".

Documentation

Index

Constants

const (
	StateCreated            = "CREATED"
	StatePartitionsAssigned = "PARTITIONS_ASSIGNED"
	StateRunning            = "RUNNING"
	StateCloseRequested     = "CLOSE_REQUESTED"
	StateClosed             = "CLOSED"
)

Variables

var ErrInternal = errors.New("internal")
var (
	ErrKeyNotFound = errors.New("store: key not found")
)
var ErrNodeAlreadyExists = errors.New("node exists already")
var ErrNodeNotFound = errors.New("node not found")
var ErrTaskNotFound = errors.New("task not found")
var WithBrokers = func(brokers []string) Option {
	return func(s *App) {
		s.brokers = brokers
	}
}
var WithCommitInterval = func(commitInterval time.Duration) Option {
	return func(s *App) {
		s.commitInterval = commitInterval
	}
}
var WithLog = func(log *slog.Logger) Option {
	return func(s *App) {
		s.log = log
	}
}
var WithWorkersCount = func(n int) Option {
	return func(s *App) {
		s.numRoutines = n
	}
}
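
A minimal sketch of combining these options when constructing an App with New (the group name, broker address, and option values are placeholders; the kstreams import path is not shown on this page):

import (
	"log/slog"
	"time"

	"example.com/kstreams" // placeholder: substitute the real module path
)

func newApp(topology *kstreams.Topology) *kstreams.App {
	return kstreams.New(
		topology,
		"my-consumer-group",
		kstreams.WithBrokers([]string{"localhost:9092"}),
		kstreams.WithCommitInterval(5*time.Second),
		kstreams.WithLog(slog.Default()),
		kstreams.WithWorkersCount(4),
	)
}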

Functions

func ContainsAny

func ContainsAny[E comparable](s []E, v []E) bool

ContainsAny reports whether any element of v is present in s.

func KVStore

func KVStore[K, V any](storeBuilder func(name string, p int32) (StoreBackend, error), keySerde SerDe[K], valueSerde SerDe[V]) func(name string, p int32) (Store, error)
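
A hedged sketch of wiring KVStore into a topology: newBackend stands in for a real StoreBackend builder (for example one from the pebble sub-module), and the SerDes here are simple string/varint encoders.

import (
	"encoding/binary"

	"example.com/kstreams" // placeholder: substitute the real module path
)

// registerCountsStore registers a typed key-value store named "counts".
// newBackend is a hypothetical StoreBackend builder supplied by the caller.
func registerCountsStore(tb *kstreams.TopologyBuilder, newBackend func(name string, p int32) (kstreams.StoreBackend, error)) {
	stringSerde := kstreams.SerDe[string]{
		Serializer:   func(s string) ([]byte, error) { return []byte(s), nil },
		Deserializer: func(b []byte) (string, error) { return string(b), nil },
	}
	int64Serde := kstreams.SerDe[int64]{
		Serializer: func(v int64) ([]byte, error) { return binary.AppendVarint(nil, v), nil },
		Deserializer: func(b []byte) (int64, error) {
			v, _ := binary.Varint(b)
			return v, nil
		},
	}

	// KVStore adapts the byte-level backend plus SerDes into a StoreBuilder.
	builder := kstreams.KVStore(newBackend, stringSerde, int64Serde)
	kstreams.RegisterStore(tb, builder, "counts")
}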

func MustRegisterProcessor

func MustRegisterProcessor[Kin, Vin, Kout, Vout any](t *TopologyBuilder, p ProcessorBuilder[Kin, Vin, Kout, Vout], name string, parent string, stores ...string)

func MustRegisterSink

func MustRegisterSink[K, V any](t *TopologyBuilder, name, topic string, keySerializer Serializer[K], valueSerializer Serializer[V], parent string)

func MustRegisterSource

func MustRegisterSource[K, V any](t *TopologyBuilder, name string, topic string, keyDeserializer Deserializer[K], valueDeserializer Deserializer[V])

func MustSetParent

func MustSetParent(t *TopologyBuilder, parent, child string)

func NewPartitionGroupBalancer

func NewPartitionGroupBalancer(log *slog.Logger, pgs []*PartitionGroup) kgo.GroupBalancer

func NullLogger

func NullLogger() *slog.Logger

func RegisterProcessor

func RegisterProcessor[Kin, Vin, Kout, Vout any](t *TopologyBuilder, p ProcessorBuilder[Kin, Vin, Kout, Vout], name string, parent string, stores ...string) error

func RegisterSink

func RegisterSink[K, V any](t *TopologyBuilder, name, topic string, keySerializer Serializer[K], valueSerializer Serializer[V], parent string) error

func RegisterSource

func RegisterSource[K, V any](t *TopologyBuilder, name string, topic string, keyDeserializer Deserializer[K], valueDeserializer Deserializer[V]) error

func RegisterStore

func RegisterStore(t *TopologyBuilder, storeBuilder StoreBuilder, name string)

func SetParent

func SetParent(t *TopologyBuilder, parent, child string) error

Types

type App

type App struct {
	// contains filtered or unexported fields
}

func New

func New(t *Topology, groupName string, opts ...Option) *App

func (*App) Close

func (c *App) Close() error

func (*App) Run

func (c *App) Run() error

Run blocks until it exits, either due to an error or a graceful shutdown triggered by a call to Close.
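
A sketch of the expected lifecycle, assuming the package is imported as kstreams: Run blocks in the main goroutine, and Close is called from a signal handler to trigger the graceful shutdown.

import (
	"log"
	"os"
	"os/signal"
	"syscall"

	"example.com/kstreams" // placeholder: substitute the real module path
)

func runUntilSignalled(app *kstreams.App) {
	// Request a graceful shutdown on SIGINT/SIGTERM.
	go func() {
		sig := make(chan os.Signal, 1)
		signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
		<-sig
		if err := app.Close(); err != nil {
			log.Printf("close: %v", err)
		}
	}()

	// Run returns once an error occurs or Close has completed the shutdown.
	if err := app.Run(); err != nil {
		log.Fatalf("run: %v", err)
	}
}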

type AssignedOrRevoked

type AssignedOrRevoked struct {
	Assigned map[string][]int32
	Revoked  map[string][]int32
}

type BalanceError

type BalanceError struct {
	// contains filtered or unexported fields
}

func (*BalanceError) IntoSyncAssignment

func (e *BalanceError) IntoSyncAssignment() []kmsg.SyncGroupRequestGroupAssignment

func (*BalanceError) IntoSyncAssignmentOrError

func (e *BalanceError) IntoSyncAssignmentOrError() ([]kmsg.SyncGroupRequestGroupAssignment, error)

type Deserializer

type Deserializer[T any] func([]byte) (T, error)

type Flusher

type Flusher interface {
	Flush(context.Context) error
}

type Input

type Input[Kin any, Vin any] struct {
}

type InputProcessor

type InputProcessor[K any, V any] interface {
	Process(context.Context, K, V) error
}

InputProcessor is a partial interface covering only the generic input K/V, without requiring the caller to know the generic types of the output.

type InternalProcessorContext

type InternalProcessorContext[Kout any, Vout any] struct {
	// contains filtered or unexported fields
}

func NewInternalkProcessorContext

func NewInternalkProcessorContext[Kout any, Vout any](
	outputs map[string]InputProcessor[Kout, Vout],
	stores map[string]Store,
) *InternalProcessorContext[Kout, Vout]

func (*InternalProcessorContext[Kout, Vout]) Forward

func (c *InternalProcessorContext[Kout, Vout]) Forward(ctx context.Context, k Kout, v Vout)

func (*InternalProcessorContext[Kout, Vout]) ForwardTo

func (c *InternalProcessorContext[Kout, Vout]) ForwardTo(ctx context.Context, k Kout, v Vout, childName string)

func (*InternalProcessorContext[Kout, Vout]) GetStore

func (c *InternalProcessorContext[Kout, Vout]) GetStore(name string) Store

type KeyValueStore

type KeyValueStore[K, V any] struct {
	// contains filtered or unexported fields
}

func NewKeyValueStore

func NewKeyValueStore[K, V any](
	store StoreBackend,
	keySerializer Serializer[K],
	valueSerializer Serializer[V],
	keyDeserializer Deserializer[K],
	valueDeserializer Deserializer[V],
) *KeyValueStore[K, V]

func (*KeyValueStore[K, V]) Checkpoint

func (t *KeyValueStore[K, V]) Checkpoint(ctx context.Context, id string) error

func (*KeyValueStore[K, V]) Close

func (t *KeyValueStore[K, V]) Close() error

func (*KeyValueStore[K, V]) Flush

func (t *KeyValueStore[K, V]) Flush() error

func (*KeyValueStore[K, V]) Get

func (t *KeyValueStore[K, V]) Get(k K) (V, error)

func (*KeyValueStore[K, V]) Init

func (t *KeyValueStore[K, V]) Init() error

func (*KeyValueStore[K, V]) Set

func (t *KeyValueStore[K, V]) Set(k K, v V) error

type Nexter

type Nexter[K, V any] interface {
	AddNext(InputProcessor[K, V])
}

type Node

type Node interface {
	Init() error
	Close() error
}

Node does not know about any specific kinds of nodes, because it would otherwise need an unbounded number of generic type parameters. Generic types are hidden inside the actual implementations behind the Node interface.
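
A small sketch of that pattern with a hypothetical generic node: only the code that constructs it sees the type parameters, while the rest of the topology handles it through Node.

import "example.com/kstreams" // placeholder: substitute the real module path

// statNode is a hypothetical generic node; its type parameters are only
// visible where it is constructed.
type statNode[K comparable, V any] struct {
	seen map[K]V
}

func (n *statNode[K, V]) Init() error {
	n.seen = make(map[K]V)
	return nil
}

func (n *statNode[K, V]) Close() error { return nil }

// The rest of the topology only needs the non-generic Node interface.
var _ kstreams.Node = (*statNode[string, int])(nil)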

type NullWriter

type NullWriter struct{}

func (NullWriter) Write

func (NullWriter) Write([]byte) (int, error)

type Option

type Option func(*App)

type Output

type Output[Kout any, Vout any] struct {
}

type PartitionGroup

type PartitionGroup struct {
	// contains filtered or unexported fields
}

PartitionGroup is a sub-graph of nodes that must be co-partitioned as they depend on each other.

type PartitionGroupBalancer

type PartitionGroupBalancer struct {
	// contains filtered or unexported fields
}

PartitionGroupBalancer is a balancer that uses kgo's Cooperative-sticky balancer under the hood, but enforces co-partitioning as defined by the given PartitionGroups.

func (*PartitionGroupBalancer) IsCooperative

func (w *PartitionGroupBalancer) IsCooperative() bool

func (*PartitionGroupBalancer) JoinGroupMetadata

func (w *PartitionGroupBalancer) JoinGroupMetadata(
	topicInterests []string,
	currentAssignment map[string][]int32,
	generation int32,
) []byte

func (*PartitionGroupBalancer) MemberBalancer

func (w *PartitionGroupBalancer) MemberBalancer(members []kmsg.JoinGroupResponseMember) (b kgo.GroupMemberBalancer, topics map[string]struct{}, err error)

func (*PartitionGroupBalancer) ParseSyncAssignment

func (w *PartitionGroupBalancer) ParseSyncAssignment(assignment []byte) (map[string][]int32, error)

func (*PartitionGroupBalancer) ProtocolName

func (w *PartitionGroupBalancer) ProtocolName() string

type Processor

type Processor[Kin any, Vin any, Kout any, Vout any] interface {
	Init(ProcessorContext[Kout, Vout]) error
	Close() error
	Process(ctx context.Context, k Kin, v Vin) error
}

Processor is a low-level interface. The implementation can retain the ProcessorContext passed into Init and use it to access state stores and forward data to downstream nodes. This is fairly low-level and allows for a lot of flexibility, but may be inconvenient for more specialized use cases. Higher-level interfaces can be built on top of this, e.g. a Processor that receives input and forwards it to only one downstream node.
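
A hedged sketch of a stateful Processor: the "counts" store name, the string/int64 types, and the type assertion to *KeyValueStore are assumptions made for illustration.

import (
	"context"
	"errors"

	"example.com/kstreams" // placeholder: substitute the real module path
)

// CountWords is a hypothetical processor that keeps a running count per key
// in a state store and forwards the updated count downstream.
type CountWords struct {
	ctx    kstreams.ProcessorContext[string, int64]
	counts *kstreams.KeyValueStore[string, int64]
}

func (p *CountWords) Init(ctx kstreams.ProcessorContext[string, int64]) error {
	p.ctx = ctx
	// GetStore returns the non-generic Store interface; recovering the typed
	// store via a type assertion is an assumption of this sketch.
	p.counts = ctx.GetStore("counts").(*kstreams.KeyValueStore[string, int64])
	return nil
}

func (p *CountWords) Close() error { return nil }

func (p *CountWords) Process(ctx context.Context, k string, v string) error {
	current, err := p.counts.Get(k)
	if err != nil && !errors.Is(err, kstreams.ErrKeyNotFound) {
		return err
	}
	current++
	if err := p.counts.Set(k, current); err != nil {
		return err
	}
	p.ctx.Forward(ctx, k, current)
	return nil
}

When registering such a processor with RegisterProcessor, the store name would be passed through the stores variadic argument so the store is attached to the same task.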

type ProcessorBuilder

type ProcessorBuilder[Kin any, Vin any, Kout any, Vout any] func() Processor[Kin, Vin, Kout, Vout]

ProcessorBuilder creates an actual processor for a specific TopicPartition.

type ProcessorContext

type ProcessorContext[Kout any, Vout any] interface {
	// Forward to all child nodes.
	Forward(ctx context.Context, k Kout, v Vout)
	// Forward to specific child node. Panics if child node is not found.
	ForwardTo(ctx context.Context, k Kout, v Vout, childName string) // TBD: should forward return error ? or are errs...ignored?
	// Get state store by name. Returns nil if not found.
	GetStore(name string) Store
}

type ProcessorInterceptor

type ProcessorInterceptor[K any, V any, Kout any, Vout any] func(ctx context.Context, k K, v V, processor Processor[K, V, Kout, Vout])

type ProcessorNode

type ProcessorNode[Kin any, Vin any, Kout any, Vout any] struct {
	// contains filtered or unexported fields
}

func (*ProcessorNode[Kin, Vin, Kout, Vout]) Close

func (p *ProcessorNode[Kin, Vin, Kout, Vout]) Close() error

func (*ProcessorNode[Kin, Vin, Kout, Vout]) Init

func (p *ProcessorNode[Kin, Vin, Kout, Vout]) Init() error

func (*ProcessorNode[Kin, Vin, Kout, Vout]) Process

func (p *ProcessorNode[Kin, Vin, Kout, Vout]) Process(ctx context.Context, k Kin, v Vin) error

type Record

type Record[K, V any] struct {
	Key   K
	Value V
	X     kgo.Record
}

type RecordProcessor

type RecordProcessor interface {
	Process(ctx context.Context, m *kgo.Record) error
}

type RoutineState

type RoutineState string

type SerDe

type SerDe[T any] struct {
	Serializer   Serializer[T]
	Deserializer Deserializer[T]
}

type Serializer

type Serializer[T any] func(T) ([]byte, error)
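
For example, a generic JSON-backed SerDe can be assembled from a Serializer/Deserializer pair (a sketch using encoding/json; any encoding works):

import (
	"encoding/json"

	"example.com/kstreams" // placeholder: substitute the real module path
)

// jsonSerde returns a SerDe[T] backed by encoding/json.
func jsonSerde[T any]() kstreams.SerDe[T] {
	return kstreams.SerDe[T]{
		Serializer: func(v T) ([]byte, error) { return json.Marshal(v) },
		Deserializer: func(b []byte) (T, error) {
			var v T
			err := json.Unmarshal(b, &v)
			return v, err
		},
	}
}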

type SinkNode

type SinkNode[K any, V any] struct {
	KeySerializer   Serializer[K]
	ValueSerializer Serializer[V]
	// contains filtered or unexported fields
}

func NewSinkNode

func NewSinkNode[K, V any](client *kgo.Client, topic string, keySerializer Serializer[K], valueSerializer Serializer[V]) *SinkNode[K, V]

func (*SinkNode[K, V]) Flush

func (s *SinkNode[K, V]) Flush(ctx context.Context) error

func (*SinkNode[K, V]) Process

func (s *SinkNode[K, V]) Process(ctx context.Context, k K, v V) error

type SourceNode

type SourceNode[K any, V any] struct {
	KeyDeserializer   Deserializer[K]
	ValueDeserializer Deserializer[V]

	DownstreamProcessors []InputProcessor[K, V]
}

SourceNode[K,V] receives kgo records and forwards them to all downstream processors.

func (*SourceNode[K, V]) AddNext

func (n *SourceNode[K, V]) AddNext(next InputProcessor[K, V])

func (*SourceNode[K, V]) Process

func (n *SourceNode[K, V]) Process(ctx context.Context, m *kgo.Record) error

type Store

type Store interface {
	Init() error
	Flush() error
	Close() error
}

type StoreBackend

type StoreBackend interface {
	Store
	Set(k, v []byte) error
	Get(k []byte) (v []byte, err error)
}
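
A toy in-memory StoreBackend sketch, just to illustrate the interface (not persistent and not partition-aware; returning ErrKeyNotFound for missing keys is an assumption about what callers expect):

import (
	"sync"

	"example.com/kstreams" // placeholder: substitute the real module path
)

// memoryBackend keeps everything in a map guarded by a mutex.
type memoryBackend struct {
	mu   sync.Mutex
	data map[string][]byte
}

func (m *memoryBackend) Init() error {
	m.data = make(map[string][]byte)
	return nil
}

func (m *memoryBackend) Flush() error { return nil } // nothing to persist
func (m *memoryBackend) Close() error { return nil }

func (m *memoryBackend) Set(k, v []byte) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.data[string(k)] = v
	return nil
}

func (m *memoryBackend) Get(k []byte) ([]byte, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	v, ok := m.data[string(k)]
	if !ok {
		return nil, kstreams.ErrKeyNotFound
	}
	return v, nil
}

var _ kstreams.StoreBackend = (*memoryBackend)(nil)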

type StoreBackendBuilder

type StoreBackendBuilder func(name string, p int32) (StoreBackend, error)

type StoreBuilder

type StoreBuilder func(name string, partition int32) (Store, error)

TODO/FIXME make store name part of params

type Task

type Task struct {
	// contains filtered or unexported fields
}

func NewTask

func NewTask(topics []string, partition int32, rootNodes map[string]RecordProcessor, stores map[string]Store, processors map[string]Node, sinks map[string]Flusher, processorToStore map[string][]string) *Task

func (*Task) ClearOffsets

func (t *Task) ClearOffsets()

func (*Task) Close

func (t *Task) Close(ctx context.Context) error

func (*Task) Flush

func (t *Task) Flush(ctx context.Context) error

Flush flushes state stores and sinks.

func (*Task) GetOffsetsToCommit

func (t *Task) GetOffsetsToCommit() map[string]kgo.EpochOffset

func (*Task) Init

func (t *Task) Init() error

func (*Task) Process

func (t *Task) Process(ctx context.Context, records ...*kgo.Record) error

func (*Task) String

func (t *Task) String() string

type TaskManager

type TaskManager struct {
	// contains filtered or unexported fields
}

func (*TaskManager) Assigned

func (t *TaskManager) Assigned(assigned map[string][]int32) error

Assigned handles topic-partition assignment: it creates tasks as needed.

func (*TaskManager) Close

func (t *TaskManager) Close(ctx context.Context) error

func (*TaskManager) Commit

func (t *TaskManager) Commit(ctx context.Context) error

Commit triggers a commit. This flushes all tasks' stores, and then performs a commit of all tasks' processed records.

func (*TaskManager) Revoked

func (t *TaskManager) Revoked(revoked map[string][]int32) error

Revoked closes and removes tasks as dictated by the revoked map.

func (*TaskManager) TaskFor

func (t *TaskManager) TaskFor(topic string, partition int32) (*Task, error)

type Topology

type Topology struct {
	// contains filtered or unexported fields
}

Topology is a fully built DAG that can be used in a kstreams app.

func (*Topology) CreateTask

func (t *Topology) CreateTask(topics []string, partition int32, client *kgo.Client) (*Task, error)

func (*Topology) GetTopics

func (t *Topology) GetTopics() []string

type TopologyBuilder

type TopologyBuilder struct {
	// contains filtered or unexported fields
}

func NewTopologyBuilder

func NewTopologyBuilder() *TopologyBuilder

func (*TopologyBuilder) Build

func (tb *TopologyBuilder) Build() (*Topology, error)

func (*TopologyBuilder) MustBuild

func (tb *TopologyBuilder) MustBuild() *Topology

type TopologyProcessor

type TopologyProcessor struct {
	Name           string
	Build          func(stores map[string]Store) Node
	ChildNodeNames []string
	AddChildFunc   func(parent any, child any, childName string) // TODO - possible to do w/o parent ?
	StoreNames     []string
}

type TopologySink

type TopologySink struct {
	Name    string
	Builder func(*kgo.Client) Flusher
}

type TopologySource

type TopologySource struct {
	Name           string
	Build          func() RecordProcessor
	ChildNodeNames []string
	AddChildFunc   func(parent any, child any, childName string) // TODO - possible to do w/o parent ?
}

type TopologyStore

type TopologyStore struct {
	Name  string
	Build StoreBuilder
}

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Nice read https://jaceklaskowski.gitbooks.io/mastering-kafka-streams/content/kafka-streams-internals-StreamThread.html

func NewWorker

func NewWorker(log *slog.Logger, name string, t *Topology, group string, brokers []string, commitInterval time.Duration) (*Worker, error)

Config

func (*Worker) Close

func (r *Worker) Close() error

func (*Worker) Loop

func (r *Worker) Loop() error

State transitions may only be done from within the loop.

func (*Worker) Run

func (r *Worker) Run() error

type WrappingMemberBalancer

type WrappingMemberBalancer struct {
	// contains filtered or unexported fields
}

func (*WrappingMemberBalancer) Balance

func (wb *WrappingMemberBalancer) Balance(topics map[string]int32) kgo.IntoSyncAssignment

func (*WrappingMemberBalancer) BalanceOrError

func (wb *WrappingMemberBalancer) BalanceOrError(topics map[string]int32) (kgo.IntoSyncAssignment, error)

Directories

Path Synopsis
examples module
stores
pebble Module
s3 Module
