goka: github.com/lovoo/goka

package goka

import "github.com/lovoo/goka"

Package goka is a stateful stream processing library for Apache Kafka (version 0.9+) that eases the development of microservices. Goka extends the concept of a consumer group with a group table, which represents the state of the group. A microservice modifies and serves the content of a table employing two complementary object types: processors and views.

Processors

A processor is a set of callback functions that modify the group table when messages arrive and may also emit messages into other topics. Messages as well as rows in the group table are key-value pairs. Callbacks receive the arriving message and the row addressed by the message's key.

In Kafka, keys are used to partition topics. A goka processor consumes from a set of co-partitioned topics (topics with the same number of partitions and the same key range). A group topic keeps track of the group table updates, allowing for recovery and rebalancing of processors: when multiple processor instances start in the same consumer group, the instances split the co-partitioned input topics and load the respective group table partitions from the group topic. A local disk storage minimizes recovery time by caching partitions of the group table.

Views

A view is a materialized (ie, persistent) cache of a group table. A view subscribes to the updates of all partitions of a group table and keeps its local disk storage in sync with the group topic. With a view, one can easily serve up-to-date content of the group table via, for example, gRPC.

Package Files

codec.go context.go doc.go emitter.go errors.go graph.go iterator.go once.go options.go partition.go processor.go proxy.go stats.go view.go

func DefaultHasher

func DefaultHasher() func() hash.Hash32

DefaultHasher returns an FNV hasher builder to assign keys to partitions.

func DefaultProcessorStoragePath

func DefaultProcessorStoragePath(group Group) string

DefaultProcessorStoragePath is the default path where processor state will be stored.

func DefaultRebalance

func DefaultRebalance(a kafka.Assignment)

DefaultRebalance is the default callback when a new partition assignment is received. DefaultRebalance can be used in the function passed to WithRebalanceCallback.

func DefaultUpdate

func DefaultUpdate(s storage.Storage, partition int32, key string, value []byte) error

DefaultUpdate is the default callback used to update the local storage with messages from the table topic in Kafka. It is called for every message received during recovery of processors and during the normal operation of views. DefaultUpdate can be used in the function passed to WithUpdateCallback and WithViewCallback.
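
For example, a custom update callback can wrap DefaultUpdate to observe table updates before they are written. A minimal sketch, assuming brokers and graph are defined as in the processor example below:

logUpdate := func(s storage.Storage, partition int32, key string, value []byte) error {
    // log every recovered or live table update, then delegate the
    // actual storage write to DefaultUpdate
    log.Printf("table update: partition=%d key=%s (%d bytes)", partition, key, len(value))
    return DefaultUpdate(s, partition, key, value)
}

p, err := NewProcessor(brokers, graph, WithUpdateCallback(logUpdate))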

func DefaultViewStoragePath

func DefaultViewStoragePath() string

DefaultViewStoragePath returns the default path where view state will be stored.

type Codec

type Codec interface {
    Encode(value interface{}) (data []byte, err error)
    Decode(data []byte) (value interface{}, err error)
}

Codec decodes and encodes values from and to []byte.
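
A minimal sketch of a JSON codec for a hypothetical user type (the codec subpackage ships ready-made codecs such as codec.String, codec.Bytes and codec.Int64):

type user struct {
    Name   string
    Clicks int
}

// userCodec encodes and decodes user values as JSON.
type userCodec struct{}

func (c *userCodec) Encode(value interface{}) ([]byte, error) {
    u, ok := value.(*user)
    if !ok {
        return nil, fmt.Errorf("userCodec: expected *user, got %T", value)
    }
    return json.Marshal(u)
}

func (c *userCodec) Decode(data []byte) (interface{}, error) {
    var u user
    if err := json.Unmarshal(data, &u); err != nil {
        return nil, err
    }
    return &u, nil
}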

type Context

type Context interface {
    // Topic returns the topic of the input message.
    Topic() Stream

    // Key returns the key of the input message.
    Key() string

    // Partition returns the partition of the input message.
    Partition() int32

    // Offset returns the offset of the input message.
    Offset() int64

    // Value returns the value of the key in the group table.
    Value() interface{}

    // SetValue updates the value of the key in the group table.
    SetValue(value interface{})

    // Delete deletes a value from the group table. IMPORTANT: this deletes the
    // value associated with the key from both the local cache and the persisted
    // table in Kafka.
    Delete()

    // Timestamp returns the timestamp of the input message. If the timestamp is
    // invalid, a zero time will be returned.
    Timestamp() time.Time

    // Join returns the value of key in the copartitioned table.
    Join(topic Table) interface{}

    // Lookup returns the value of key in the view of table.
    Lookup(topic Table, key string) interface{}

    // Emit asynchronously writes a message into a topic.
    Emit(topic Stream, key string, value interface{})

    // Loopback asynchronously sends a message to another key of the group
    // table. Value passed to loopback is encoded via the codec given in the
    // Loop subscription.
    Loopback(key string, value interface{})

    // Fail stops execution and shuts down the processor.
    Fail(err error)

    // Context returns the underlying context used to start the processor or a
    // subcontext.
    Context() context.Context
}

Context provides access to the processor's table and emit capabilities to arbitrary topics in Kafka. Upon arrival of a message from a subscribed topic, the respective ProcessCallback is invoked with a context object along with the input message.
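
A sketch of a callback that uses the context to read and update the group table and to emit a message; the output topic "clicks-changed" is illustrative and must be declared as an Output edge of the group:

func process(ctx Context, msg interface{}) {
    // read the row addressed by the message's key (nil if absent)
    var clicks int64
    if v := ctx.Value(); v != nil {
        clicks = v.(int64)
    }
    clicks++

    // update the group table and notify a downstream topic
    ctx.SetValue(clicks)
    ctx.Emit("clicks-changed", ctx.Key(), clicks)
}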

type Edge

type Edge interface {
    String() string
    Topic() string
    Codec() Codec
}

Edge represents a topic in Kafka and the corresponding codec to encode and decode the messages of that topic.

func Input

func Input(topic Stream, c Codec, cb ProcessCallback) Edge

Input represents an edge of an input stream topic. The edge specifies the topic name, its codec and the ProcessCallback used to process it. The topic has to be copartitioned with any other input stream of the group and with the group table. The group starts reading the topic from the newest offset.

func Inputs

func Inputs(topics Streams, c Codec, cb ProcessCallback) Edge

Inputs creates edges of multiple input streams sharing the same codec and callback.

func Join

func Join(topic Table, c Codec) Edge

Join represents an edge of a copartitioned, log-compacted table topic. The edge specifies the topic name and the codec of the messages of the topic. The group starts reading the topic from the oldest offset. The processing of input streams is blocked until all partitions of the table are recovered.

func Lookup

func Lookup(topic Table, c Codec) Edge

Lookup represents an edge of a non-copartitioned, log-compacted table topic. The edge specifies the topic name and the codec of the messages of the topic. The group starts reading the topic from the oldest offset. The processing of input streams is blocked until the table is fully recovered.
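
Inside a callback, rows of a lookup table are read with Context.Lookup; a short sketch with an illustrative table and key:

func handleEvent(ctx Context, msg interface{}) {
    // read a row of the non-copartitioned table "config-table"
    limit := ctx.Lookup("config-table", "rate-limit")
    fmt.Println("current rate limit:", limit)
}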

func Loop

func Loop(c Codec, cb ProcessCallback) Edge

Loop represents the edge of the loopback topic of the group. The edge specifies the codec of the messages in the topic and the ProcessCallback to process them. Context.Loopback() is used to write messages into this topic from any callback of the group.
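
A sketch of rerouting a message to another key via the loopback; the order type and its SellerID field are illustrative:

func processOrder(ctx Context, msg interface{}) {
    o := msg.(*order)
    // reprocess the message under the seller's key; the value is
    // encoded with the codec given in the Loop edge
    ctx.Loopback(o.SellerID, o)
}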

func Output

func Output(topic Stream, c Codec) Edge

Output represents an edge of an output stream topic. The edge specifies the topic name and the codec of the messages of the topic. Context.Emit() only emits messages into Output edges defined in the group graph. The topic does not have to be copartitioned with the input streams.

func Persist

func Persist(c Codec) Edge

Persist represents the edge of the group table, which is log-compacted and copartitioned with the input streams. This edge specifies the codec of the messages in the topic, ie, the codec of the values of the table. The processing of input streams is blocked until all partitions of the group table are recovered.

type Edges

type Edges []Edge

Edges is a slice of edge objects.

func (Edges) Topics

func (e Edges) Topics() []string

Topics returns the names of the topics of the edges.

type Emitter

type Emitter struct {
    // contains filtered or unexported fields
}

Emitter emits messages into a specific Kafka topic, first encoding the message with the given codec.

func NewEmitter

func NewEmitter(brokers []string, topic Stream, codec Codec, options ...EmitterOption) (*Emitter, error)

NewEmitter creates a new emitter for the given brokers, topic and codec, with optional configuration options.

func (*Emitter) Emit

func (e *Emitter) Emit(key string, msg interface{}) (*kafka.Promise, error)

Emit asynchronously sends a message for the passed key using the emitter's codec and returns a promise for the delivery result.

func (*Emitter) EmitSync

func (e *Emitter) EmitSync(key string, msg interface{}) error

EmitSync sends a message for the passed key using the emitter's codec and blocks until the message has been delivered.

func (*Emitter) Finish

func (e *Emitter) Finish() error

Finish waits until the emitter is finished producing all pending messages.
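
A minimal sketch of emitting messages (broker address and topic are illustrative; codec.String is from the codec subpackage):

emitter, err := NewEmitter([]string{"localhost:9092"}, "user-clicks", new(codec.String))
if err != nil {
    log.Fatalln(err)
}
// deliver all pending messages before exiting
defer emitter.Finish()

if err := emitter.EmitSync("user-1", "click"); err != nil {
    log.Fatalln(err)
}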

type EmitterOption

type EmitterOption func(*eoptions, Stream, Codec)

EmitterOption defines a configuration option to be used when creating an emitter.

func WithEmitterClientID

func WithEmitterClientID(clientID string) EmitterOption

WithEmitterClientID defines the client ID used to identify with Kafka.

func WithEmitterHasher

func WithEmitterHasher(hasher func() hash.Hash32) EmitterOption

WithEmitterHasher sets the hash function that assigns keys to partitions.

func WithEmitterLogger

func WithEmitterLogger(log logger.Logger) EmitterOption

WithEmitterLogger sets the logger the emitter should use. By default, emitters use the standard library logger.

func WithEmitterProducerBuilder

func WithEmitterProducerBuilder(pb kafka.ProducerBuilder) EmitterOption

WithEmitterProducerBuilder replaces the default producer builder.

func WithEmitterTester

func WithEmitterTester(t Tester) EmitterOption

WithEmitterTester configures all external connections of an emitter, ie, producer and topic manager.

func WithEmitterTopicManagerBuilder

func WithEmitterTopicManagerBuilder(tmb kafka.TopicManagerBuilder) EmitterOption

WithEmitterTopicManagerBuilder replaces the default topic manager builder.

type Getter

type Getter func(string) (interface{}, error)

Getter functions return a value for a key or an error. If no value exists for the key, nil is returned without an error.

type Group

type Group string

Group is the name of a consumer group in Kafka and represents a processor group in Goka. A processor group may have a group table and a group loopback stream. By default, the group table is named <group>-table and the loopback stream <group>-loop.

type GroupGraph

type GroupGraph struct {
    // contains filtered or unexported fields
}

GroupGraph is the specification of a processor group. It contains all input, output, and any other topics from which and into which the processor group may consume or produce events. Each of these links to Kafka is called an Edge.

func DefineGroup

func DefineGroup(group Group, edges ...Edge) *GroupGraph

DefineGroup creates a group graph with a given group name and a list of edges.
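
For example, a group graph that consumes a stream, keeps a per-key counter in the group table and declares an output topic could be sketched as follows (topic and group names are illustrative; process is the callback from the Context example above and brokers a list of broker addresses):

g := DefineGroup("clicks-counter",
    Input("user-clicks", new(codec.String), process), // input stream and its callback
    Output("clicks-changed", new(codec.Int64)),       // topic written via ctx.Emit
    Persist(new(codec.Int64)),                        // group table holding the counters
)
p, err := NewProcessor(brokers, g)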

func (*GroupGraph) Group

func (gg *GroupGraph) Group() Group

Group returns the group name.

func (*GroupGraph) GroupTable

func (gg *GroupGraph) GroupTable() Edge

GroupTable returns the group table edge of the group.

func (*GroupGraph) InputStreams

func (gg *GroupGraph) InputStreams() Edges

InputStreams returns all input stream edges of the group.

func (*GroupGraph) JointTables

func (gg *GroupGraph) JointTables() Edges

JointTables returns all joint table edges of the group.

func (*GroupGraph) LookupTables

func (gg *GroupGraph) LookupTables() Edges

LookupTables returns all lookup table edges of the group.

func (*GroupGraph) LoopStream

func (gg *GroupGraph) LoopStream() Edge

LoopStream returns the loopback edge of the group.

func (*GroupGraph) OutputStreams

func (gg *GroupGraph) OutputStreams() Edges

OutputStreams returns the output stream edges of the group.

func (*GroupGraph) Validate

func (gg *GroupGraph) Validate() error

Validate validates the group graph and returns an error if invalid. The main validation checks are:

- at most one loopback stream edge is allowed
- at most one group table edge is allowed
- at least one input stream is required
- table and loopback topics cannot be used in any other edge

type InputStats

type InputStats struct {
    Count uint
    Bytes int
    Delay time.Duration
}

InputStats represents the number of messages and the number of bytes consumed from a stream or table topic since the process started.

type Iterator

type Iterator interface {
    // Next advances the iterator to the next KV-pair. Err should be called
    // after Next returns false to check whether the iteration finished
    // from exhaustion or was aborted due to an error.
    Next() bool
    Key() string
    Value() (interface{}, error)
    Release()
    // Err returns the possible iteration error.
    Err() error
    Seek(key string) bool
}

Iterator allows one to iterate over the keys of a view.
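
A sketch of iterating over all keys of a recovered view v:

it, err := v.Iterator()
if err != nil {
    log.Fatalln(err)
}
defer it.Release()

for it.Next() {
    value, err := it.Value()
    if err != nil {
        log.Fatalln(err)
    }
    fmt.Printf("%s = %v\n", it.Key(), value)
}
// distinguish exhaustion from an aborted iteration
if err := it.Err(); err != nil {
    log.Fatalln(err)
}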

type NilHandling

type NilHandling int

NilHandling defines how nil messages should be handled by the processor.

const (
    // NilIgnore drops any message with nil value.
    NilIgnore NilHandling = 0 + iota
    // NilProcess passes the nil value to ProcessCallback.
    NilProcess
    // NilDecode passes the nil value to decoder before calling ProcessCallback.
    NilDecode
)

type OutputStats

type OutputStats struct {
    Count uint
    Bytes int
}

OutputStats represents the number of messages and the number of bytes emitted into a stream or table since the process started.

type PartitionStats

type PartitionStats struct {
    Now time.Time

    Table struct {
        Status  PartitionStatus
        Stalled bool

        Offset int64 // last offset processed or recovered
        Hwm    int64 // next offset to be written

        StartTime    time.Time
        RecoveryTime time.Time
    }
    Input  map[string]InputStats
    Output map[string]OutputStats
}

PartitionStats represents metrics and measurements of a partition.

type PartitionStatus

type PartitionStatus int

PartitionStatus is the status of the partition of a table (group table or joined table).

const (
    // PartitionRecovering indicates the partition is recovering and the storage
    // is writing updates in bulk-mode (if the storage implementation supports it).
    PartitionRecovering PartitionStatus = iota
    // PartitionPreparing indicates the end of the bulk-mode. Depending on the storage
    // implementation, the Preparing phase may take long because the storage compacts its logs.
    PartitionPreparing
    // PartitionRunning indicates the partition is recovered and processing updates
    // in normal operation.
    PartitionRunning
)

type ProcessCallback

type ProcessCallback func(ctx Context, msg interface{})

ProcessCallback is the function called for every message received by the processor.

type Processor

type Processor struct {
    // contains filtered or unexported fields
}

Processor is a set of stateful callback functions that, on the arrival of messages, modify the content of a table (the group table) and emit messages into other topics. Messages as well as rows in the group table are key-value pairs. A group is composed of multiple processor instances.

This example shows how to use a callback. For each partition of the topics, a new goroutine will be created. Topics should be co-partitioned (they should have the same number of partitions and be partitioned by the same key).

Code:

var (
    brokers        = []string{"127.0.0.1:9092"}
    group   Group  = "group"
    topic   Stream = "topic"
)

consume := func(ctx Context, m interface{}) {
    fmt.Printf("Hello world: %v", m)
}

// rawCodec is a Codec passing []byte values through unchanged (defined elsewhere)
p, err := NewProcessor(brokers, DefineGroup(group, Input(topic, rawCodec, consume)))
if err != nil {
    log.Fatalln(err)
}

// start the processor in a goroutine (Run blocks until the context is canceled)
ctx, cancel := context.WithCancel(context.Background())
go func() {
    if err := p.Run(ctx); err != nil {
        panic(err)
    }
}()

// wait for bad things to happen
wait := make(chan os.Signal, 1)
signal.Notify(wait, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
<-wait
cancel()

func NewProcessor

func NewProcessor(brokers []string, gg *GroupGraph, options ...ProcessorOption) (*Processor, error)

NewProcessor creates a processor instance in a group given the address of the Kafka brokers, the group graph (the consumer group name and a list of subscriptions with topics, codecs, and callbacks), and a series of options.

func (*Processor) Get

func (g *Processor) Get(key string) (interface{}, error)

Get returns a read-only copy of a value from the group table if the respective partition is owned by the processor instance. Get can be called by multiple goroutines concurrently. Get can only be used with stateful processors (ie, when the group table is enabled) and after Recovered returns true.

func (*Processor) Graph

func (g *Processor) Graph() *GroupGraph

Graph returns the GroupGraph given at the creation of the processor.

func (*Processor) Recovered

func (g *Processor) Recovered() bool

Recovered returns true when the processor has caught up with events from Kafka.

func (*Processor) Run

func (g *Processor) Run(ctx context.Context) (rerr error)

Run starts receiving messages from Kafka for the subscribed topics. For each partition, a recovery will be attempted. Cancel the context to stop the processor.

func (*Processor) Stats

func (g *Processor) Stats() *ProcessorStats

Stats returns a set of performance metrics of the processor.

type ProcessorOption

type ProcessorOption func(*poptions, *GroupGraph)

ProcessorOption defines a configuration option to be used when creating a processor.

func WithClientID

func WithClientID(clientID string) ProcessorOption

WithClientID defines the client ID used to identify with Kafka.

func WithConsumerBuilder

func WithConsumerBuilder(cb kafka.ConsumerBuilder) ProcessorOption

WithConsumerBuilder replaces the default consumer builder.

func WithGroupGraphHook

func WithGroupGraphHook(hook func(gg *GroupGraph)) ProcessorOption

WithGroupGraphHook allows a function to obtain the group graph when a processor is started.

func WithHasher

func WithHasher(hasher func() hash.Hash32) ProcessorOption

WithHasher sets the hash function that assigns keys to partitions.

func WithLogger

func WithLogger(log logger.Logger) ProcessorOption

WithLogger sets the logger the processor should use. By default, processors use the standard library logger.

func WithNilHandling

func WithNilHandling(nh NilHandling) ProcessorOption

WithNilHandling configures how the processor should handle messages with nil value. By default the processor ignores nil messages.

func WithPartitionChannelSize

func WithPartitionChannelSize(size int) ProcessorOption

WithPartitionChannelSize replaces the default partition channel size. This is mostly used for testing: setting it to 0 makes goka behave synchronously.

func WithProducerBuilder

func WithProducerBuilder(pb kafka.ProducerBuilder) ProcessorOption

WithProducerBuilder replaces the default producer builder.

func WithRebalanceCallback

func WithRebalanceCallback(cb RebalanceCallback) ProcessorOption

WithRebalanceCallback sets the callback for when a new partition assignment is received. By default, this is an empty function.

func WithStorageBuilder

func WithStorageBuilder(sb storage.Builder) ProcessorOption

WithStorageBuilder defines a builder for the storage of each partition.

func WithTester

func WithTester(t Tester) ProcessorOption

WithTester configures all external connections of a processor, ie, storage, consumer and producer.

func WithTopicManagerBuilder

func WithTopicManagerBuilder(tmb kafka.TopicManagerBuilder) ProcessorOption

WithTopicManagerBuilder replaces the default topic manager builder.

func WithUpdateCallback

func WithUpdateCallback(cb UpdateCallback) ProcessorOption

WithUpdateCallback defines the callback called upon recovering a message from the log.

type ProcessorStats

type ProcessorStats struct {
    Group  map[int32]*PartitionStats
    Joined map[int32]map[string]*PartitionStats
    Lookup map[string]*ViewStats
}

ProcessorStats represents the metrics of all partitions of the processor, including its group, joined tables and lookup tables.

type RebalanceCallback

type RebalanceCallback func(a kafka.Assignment)

RebalanceCallback is invoked when the processor receives a new partition assignment.

type Stream

type Stream string

Stream is the name of an event stream topic in Kafka, ie, a topic with cleanup.policy=delete.

type Streams

type Streams []Stream

Streams is a slice of Stream names.

type Table

type Table string

Table is the name of a table topic in Kafka, ie, a topic with cleanup.policy=compact.

func GroupTable

func GroupTable(group Group) Table

GroupTable returns the name of the group table of group.

type Tester

type Tester interface {
    StorageBuilder() storage.Builder
    ConsumerBuilder() kafka.ConsumerBuilder
    ProducerBuilder() kafka.ProducerBuilder
    EmitterProducerBuilder() kafka.ProducerBuilder
    TopicManagerBuilder() kafka.TopicManagerBuilder
    RegisterGroupGraph(*GroupGraph)
    RegisterEmitter(Stream, Codec)
    RegisterView(Table, Codec)
}

Tester is an interface to avoid import cycles when a processor needs to register with the tester.

type UpdateCallback

type UpdateCallback func(s storage.Storage, partition int32, key string, value []byte) error

UpdateCallback is invoked upon arrival of a message for a table partition. The partition storage shall be updated in the callback.

type View

type View struct {
    // contains filtered or unexported fields
}

View is a materialized (i.e. persistent) cache of a group table.

Code:

var (
    brokers       = []string{"localhost:9092"}
    group   Group = "group-name"
)
v, err := NewView(brokers, GroupTable(group), nil)
if err != nil {
    panic(err)
}
if err = v.Run(context.Background()); err != nil {
    panic(err)
}

func NewView

func NewView(brokers []string, topic Table, codec Codec, options ...ViewOption) (*View, error)

NewView creates a new View object for the given table topic.

func (*View) Evict

func (v *View) Evict(key string) error

Evict removes the given key only from the local cache. In order to delete a key from Kafka and other views, Context.Delete should be used on a processor.

func (*View) Get

func (v *View) Get(key string) (interface{}, error)

Get returns the value for the key in the view if it exists, and nil otherwise. Get can be called by multiple goroutines concurrently. Get can only be called after Recovered returns true.
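
A sketch of serving table content over HTTP with Get (path and port are illustrative; view is a running View):

http.HandleFunc("/user/", func(w http.ResponseWriter, r *http.Request) {
    value, err := view.Get(strings.TrimPrefix(r.URL.Path, "/user/"))
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    fmt.Fprintf(w, "%v\n", value)
})
log.Fatal(http.ListenAndServe(":9095", nil))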

func (*View) Has

func (v *View) Has(key string) (bool, error)

Has checks whether a value for passed key exists in the view.

func (*View) Iterator

func (v *View) Iterator() (Iterator, error)

Iterator returns an iterator that iterates over the state of the View.

func (*View) IteratorWithRange

func (v *View) IteratorWithRange(start, limit string) (Iterator, error)

IteratorWithRange returns an iterator that iterates over the state of the View within the given key range.

func (*View) Recovered

func (v *View) Recovered() bool

Recovered returns true when the view has caught up with events from Kafka.

func (*View) Run

func (v *View) Run(ctx context.Context) error

Run starts consuming the view's topic.

func (*View) Stats

func (v *View) Stats() *ViewStats

Stats returns a set of performance metrics of the view.

func (*View) Terminate

func (v *View) Terminate() error

Terminate closes the storage partitions. It must be called only if the view is restartable (see the WithViewRestartable() option). Once Terminate() is called, the view cannot be restarted anymore.

func (*View) Topic

func (v *View) Topic() string

Topic returns the view's topic.

type ViewOption

type ViewOption func(*voptions, Table, Codec)

ViewOption defines a configuration option to be used when creating a view.

func WithViewCallback

func WithViewCallback(cb UpdateCallback) ViewOption

WithViewCallback defines the callback called upon recovering a message from the log.

func WithViewClientID

func WithViewClientID(clientID string) ViewOption

WithViewClientID defines the client ID used to identify with Kafka.

func WithViewConsumerBuilder

func WithViewConsumerBuilder(cb kafka.ConsumerBuilder) ViewOption

WithViewConsumerBuilder replaces the default view consumer builder.

func WithViewHasher

func WithViewHasher(hasher func() hash.Hash32) ViewOption

WithViewHasher sets the hash function that assigns keys to partitions.

func WithViewLogger

func WithViewLogger(log logger.Logger) ViewOption

WithViewLogger sets the logger the view should use. By default, views use the standard library logger.

func WithViewPartitionChannelSize

func WithViewPartitionChannelSize(size int) ViewOption

WithViewPartitionChannelSize replaces the default partition channel size. This is mostly used for testing: setting it to 0 makes goka behave synchronously.

func WithViewRestartable

func WithViewRestartable() ViewOption

WithViewRestartable defines that the view can be restarted, even when Run() returns errors. If the view is restartable, the client must call Terminate() to release all resources, ie, to close the local storage.

func WithViewStorageBuilder

func WithViewStorageBuilder(sb storage.Builder) ViewOption

WithViewStorageBuilder defines a builder for the storage of each partition.

func WithViewTester

func WithViewTester(t Tester) ViewOption

WithViewTester configures all external connections of a view, ie, storage, consumer and producer.

func WithViewTopicManagerBuilder

func WithViewTopicManagerBuilder(tmb kafka.TopicManagerBuilder) ViewOption

WithViewTopicManagerBuilder replaces the default topic manager builder.

type ViewStats

type ViewStats struct {
    Partitions map[int32]*PartitionStats
}

ViewStats represents the metrics of all partitions of a view.

Directories

Path             Synopsis
codec
integrationtest
kafka
kafka/mock       Package mock is a generated GoMock package.
logger
mock             Package mock is a generated GoMock package.
multierr
storage
storage/redis
tester           This package provides a kafka mock that allows integration testing of goka processors.
web/index
web/monitor
web/query
web/templates

Package goka imports 17 packages and is imported by 19 packages. Updated 2019-10-30.