goka

package module
v0.0.0-...-8c342a6
Published: May 17, 2017 License: BSD-3-Clause Imports: 18 Imported by: 0

README

Goka

Goka is a compact yet powerful distributed stream processing library for Apache Kafka written in Go. Goka aims to reduce the complexity of building highly scalable and highly available microservices.

Goka extends the concept of Kafka consumer groups by binding a state table to them and persisting it in Kafka. Goka provides sane defaults and a pluggable architecture.

Features

  • Message Input and Output

    Goka handles all the message input and output for you. You only have to provide one or more callback functions that handle messages from any of the Kafka topics you are interested in. You only ever have to deal with deserialized messages.

  • Scaling

    Goka automatically distributes the processing and state across multiple instances of a service. This enables effortless scaling when the load increases.

  • Fault Tolerance

    In case of a failure, Goka will redistribute the failed instance's workload and state across the remaining healthy instances. All state is safely stored in Kafka and messages delivered with at-least-once semantics.

  • Built-in Monitoring and Introspection

    Goka provides a web interface for monitoring performance and querying values in the state.

  • Modularity

    Goka fosters a pluggable architecture which enables you to replace, for example, the storage layer or the Kafka communication layer.

Documentation

This README provides a brief, high level overview of the ideas behind Goka.

Package API documentation is available at GoDoc.

Installation

You can install Goka by running the following command:

$ go get -u github.com/lovoo/goka

Concepts

Goka relies on Kafka for message passing, fault-tolerant state storage and workload partitioning.

  • Emitters deliver key-value messages into Kafka. As an example, an emitter could be a database handler emitting the state changes into Kafka for other interested applications to consume.

  • Processor is a set of callback functions that consume and perform operations on these emitted messages. Processor groups are formed of one or more instances of a processor. Goka distributes a topic's partitions across all the processor instances in a processor group. This enables effortless scaling and fault-tolerance. If a processor instance fails, its partitions are reassigned to the remaining healthy members of the processor group. Processors can also emit further messages into Kafka.

  • Group tables are partitioned key-value tables stored in Kafka that belong to a single processor group. If a processor instance fails, the remaining instances will take over the group table partitions of the failed instance recovering them from Kafka.

  • Views are local caches of a processor group's complete group table. Views provide read-only access to the group tables and can be used to provide external services, for example through a gRPC interface.

Get Started

An example Goka application could look like the following:

Emitter
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
)

var (
	brokers             = []string{"localhost:9092"}
	topic   goka.Stream = "mini-input"
)

func main() {
	// create a new emitter which allows you to send
	// messages to Kafka
	emitter, err := goka.NewEmitter(brokers, topic,
		new(codec.String))
	if err != nil {
		log.Fatalf("error creating emitter: %v", err)
	}

	// the emitter's Finish method should always be called before
	// terminating the application to ensure the emitter has
	// delivered all pending messages to Kafka
	defer emitter.Finish()

	t := time.NewTicker(5 * time.Second)
	defer t.Stop()

	// on every timer tick, emit a message containing
	// the current timestamp to Kafka
	i := 0
	for range t.C {
		key := fmt.Sprintf("%d", i%10)
		value := time.Now().String()
		if err := emitter.EmitSync(key, value); err != nil {
			log.Fatalf("error emitting message: %v", err)
		}
		i++
	}
}
Processor
package main

import (
	"log"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
)

var (
	brokers             = []string{"localhost:9092"}
	topic   goka.Stream = "mini-input"
	group   goka.Group  = "mini-group"
)

func main() {
	// Define a new processor group. The group defines all
	// the inputs, outputs, serialization formats and the
	// topics of the processor.
	g := goka.DefineGroup(group,
		goka.Input(topic, new(codec.String), process),
		goka.Persist(new(codec.Int64)),
	)
	if p, err := goka.NewProcessor(brokers, g); err != nil {
		log.Fatalf("error creating processor: %v", err)
	} else if err = p.Start(); err != nil {
		log.Fatalf("error running processor: %v", err)
	}
}

// process is the callback the processor will call for
// each message that arrives in the "mini-input" topic.
func process(ctx goka.Context, msg interface{}) {
	var counter int64
	// ctx.Value gets from the group table the value that
	// is stored for the message's key.
	if val := ctx.Value(); val != nil {
		counter = val.(int64)
	}
	counter++
	// SetValue stores the incremented counter in the
	// group table for the message's key.
	ctx.SetValue(counter)

	log.Println("[proc] key:", ctx.Key(),
		"count:", counter, "msg:", msg)
}
View
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
)

var (
	brokers            = []string{"localhost:9092"}
	group   goka.Group = "mini-group"
)

func main() {
	// creates a new view which provides read-only
	// access to the mini-group's group table
	view, err := goka.NewView(brokers,
		goka.GroupTable(group),
		new(codec.Int64),
	)
	if err != nil {
		log.Fatalf("error creating view: %v", err)
	}
	// starting the view begins receiving updates
	// from Kafka
	go view.Start()
	defer view.Stop()

	t := time.NewTicker(10 * time.Second)
	defer t.Stop()

	// on every timer tick, print out the values
	// stored in the group table
	for range t.C {
		for i := 0; i < 10; i++ {
			val, _ := view.Get(fmt.Sprintf("%d", i))
			log.Printf("[view] %d: %v\n", i, val)
		}
	}
}

Documentation

Overview

Package goka is a stateful stream processing library for Apache Kafka (version 0.9+) that eases the development of microservices. Goka extends the concept of consumer group with a group table, which represents the state of the group. A microservice modifies and serves the content of a table employing two complementary object types: processors and views.

Processors

A processor is a set of callback functions that modify the group table when messages arrive and may also emit messages into other topics. Messages as well as rows in the group table are key-value pairs. Callbacks receive the arriving message and the row addressed by the message's key.

In Kafka, keys are used to partition topics. A goka processor consumes from a set of co-partitioned topics (topics with the same number of partitions and the same key range). A group topic keeps track of the group table updates, allowing for recovery and rebalancing of processors: when multiple processor instances start in the same consumer group, the instances split the co-partitioned input topics and load the respective group table partitions from the group topic. A local disk storage minimizes recovery time by caching partitions of the group table.

Views

A view is a materialized (i.e., persistent) cache of a group table. A view subscribes to updates of all partitions of a group table and keeps local disk storage in sync with the group topic. With a view, one can easily serve up-to-date content of the group table via, for example, gRPC.
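
For illustration, here is a minimal, hedged sketch of serving a group table over HTTP with a view. It reuses the mini-group table from the Get Started section; the endpoint name and port are assumptions, not part of the package.

package main

import (
	"fmt"
	"log"
	"net/http"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
)

func main() {
	view, err := goka.NewView([]string{"localhost:9092"},
		goka.GroupTable("mini-group"),
		new(codec.Int64),
	)
	if err != nil {
		log.Fatalf("error creating view: %v", err)
	}
	// start the view in the background; it keeps the local cache in sync with Kafka
	go view.Start()
	defer view.Stop()

	// serve the latest counter value for ?key=... straight from the local cache
	http.HandleFunc("/counter", func(w http.ResponseWriter, r *http.Request) {
		val, err := view.Get(r.URL.Query().Get("key"))
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		fmt.Fprintf(w, "%v\n", val)
	})
	log.Fatal(http.ListenAndServe(":8080", nil))
}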

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultUpdate

func DefaultUpdate(s storage.Storage, partition int32, key string, value []byte) error

DefaultUpdate is the default callback used to update the local storage with messages from the table topic in Kafka. It is called for every message received during recovery of processors and during the normal operation of views. DefaultUpdate can be used in the function passed to WithUpdateCallback and WithViewCallback.
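
As a hedged sketch, a custom update callback could wrap DefaultUpdate to trace recovery and then be passed via WithUpdateCallback (for processors) or WithViewCallback (for views). The logging and the storage subpackage import path are illustrative assumptions:

import (
	"log"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/storage"
)

// loggingUpdate logs every recovered row before delegating to DefaultUpdate,
// which performs the actual write to the local storage.
func loggingUpdate(s storage.Storage, partition int32, key string, value []byte) error {
	log.Printf("recovering partition %d, key %q (%d bytes)", partition, key, len(value))
	return goka.DefaultUpdate(s, partition, key, value)
}

// Usage sketch: goka.NewProcessor(brokers, g, goka.WithUpdateCallback(loggingUpdate))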

func NewMockController

func NewMockController(t gomock.TestReporter) *gomock.Controller

NewMockController returns a *gomock.Controller using a wrapped testing.T (or any Tester) which panics on a Fatalf. This is necessary when using a mock with KafkaMock; otherwise it will freeze on an unexpected call.

Types

type Codec

type Codec interface {
	Encode(value interface{}) (data []byte, err error)
	Decode(data []byte) (value interface{}, err error)
}

Codec decodes and encodes from and to []byte
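
To illustrate the interface, here is a hedged sketch of a custom codec for int64 values; note that the codec subpackage already ships ready-made codecs such as codec.String and codec.Int64 used in the examples above.

import (
	"encoding/binary"
	"fmt"
)

// int64Codec encodes int64 values as 8 big-endian bytes and decodes them back.
type int64Codec struct{}

func (c *int64Codec) Encode(value interface{}) ([]byte, error) {
	v, ok := value.(int64)
	if !ok {
		return nil, fmt.Errorf("int64Codec: expected int64, got %T", value)
	}
	data := make([]byte, 8)
	binary.BigEndian.PutUint64(data, uint64(v))
	return data, nil
}

func (c *int64Codec) Decode(data []byte) (interface{}, error) {
	if len(data) != 8 {
		return nil, fmt.Errorf("int64Codec: expected 8 bytes, got %d", len(data))
	}
	return int64(binary.BigEndian.Uint64(data)), nil
}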

type Context

type Context interface {
	// Topic returns the topic of the input message.
	Topic() Stream

	// Key returns the key of the input message.
	Key() string

	// Value returns the value of the key in the group table.
	Value() interface{}

	// SetValue updates the value of the key in the group table.
	SetValue(value interface{})

	// Join returns the value of key in the copartitioned table.
	Join(topic Table) interface{}

	// Lookup returns the value of key in the view of table.
	Lookup(topic Table, key string) interface{}

	// Emit asynchronously writes a message into a topic.
	Emit(topic Stream, key string, value interface{})

	// Loopback asynchronously sends a message to another key of the group
	// table. Value passed to loopback is encoded via the codec given in the
	// Loop subscription.
	Loopback(key string, value interface{})

	// Fail stops execution and shuts down the processor
	Fail(err error)
}

Context provides access to the processor's table and emit capabilities to arbitrary topics in kafka. Upon arrival of a message from subscribed topics, the respective ConsumeCallback is invoked with a context object along with the input message.
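
A hedged sketch of a callback exercising several Context methods; the "forwarded-topic" output stream is an assumption and would have to be declared with Output in the group graph.

// countAndForward increments a per-key counter in the group table and
// forwards the raw message to another stream.
func countAndForward(ctx goka.Context, msg interface{}) {
	var count int64
	if v := ctx.Value(); v != nil {
		count = v.(int64)
	}
	count++
	// update the group table row for ctx.Key()
	ctx.SetValue(count)
	// asynchronously emit the message into a declared output stream
	ctx.Emit("forwarded-topic", ctx.Key(), msg)
}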

type Edge

type Edge interface {
	String() string
	Topic() string
	Codec() Codec
}

func Input

func Input(topic Stream, c Codec, cb ProcessCallback) Edge

Input returns a subscription for a co-partitioned stream topic. The processor subscribing to a stream topic will start reading from the newest offset of the partition.

func Inputs

func Inputs(topics Streams, c Codec, cb ProcessCallback) Edge

Inputs creates Edges for multiple input streams sharing the same codec and callback.

func Join

func Join(topic Table, c Codec) Edge

Join returns a subscription for a co-partitioned, log-compacted table topic. The processor subscribing to a table topic will start reading from the oldest offset of the partition.

func Lookup

func Lookup(topic Table, c Codec) Edge

func Loop

func Loop(c Codec, cb ProcessCallback) Edge

Loop defines a consume callback on the loop topic

func Output

func Output(topic Stream, c Codec) Edge

func Persist

func Persist(c Codec) Edge
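
A hedged sketch of how these edge constructors compose into a group graph via DefineGroup (using the codec subpackage); all topic names, codecs and callbacks below are placeholders.

process := func(ctx goka.Context, msg interface{}) { /* handle input messages */ }
loop := func(ctx goka.Context, msg interface{}) { /* handle loopback messages */ }

g := goka.DefineGroup("example-group",
	goka.Input("input-topic", new(codec.String), process), // consume a stream
	goka.Join("other-group-table", new(codec.Int64)),      // co-partitioned table
	goka.Lookup("reference-table", new(codec.String)),     // table accessed via ctx.Lookup
	goka.Output("output-topic", new(codec.String)),        // declare a topic for ctx.Emit
	goka.Loop(new(codec.String), loop),                    // loopback edge for ctx.Loopback
	goka.Persist(new(codec.Int64)),                        // enable the group table
)
if err := g.Validate(); err != nil {
	log.Fatalf("invalid group graph: %v", err)
}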

type Edges

type Edges []Edge

func (Edges) Topics

func (e Edges) Topics() []string

type EmitHandler

type EmitHandler func(topic string, key string, value []byte) *kafka.Promise

EmitHandler abstracts a function that allows overwriting KafkaMock's Emit function to simulate producer errors.

type Emitter

type Emitter struct {
	// contains filtered or unexported fields
}

func NewEmitter

func NewEmitter(brokers []string, topic Stream, codec Codec, options ...EmitterOption) (*Emitter, error)

NewEmitter creates a new emitter using passed brokers, topic, codec and possibly options

func (*Emitter) Emit

func (e *Emitter) Emit(key string, msg interface{}) (*kafka.Promise, error)

Emit sends a message for passed key using the emitter's codec.

func (*Emitter) EmitSync

func (e *Emitter) EmitSync(key string, msg interface{}) error

EmitSync synchronously sends a message for the passed key using the emitter's codec.

func (*Emitter) Finish

func (e *Emitter) Finish()

Finish waits until the emitter is finished producing all pending messages

type EmitterOption

type EmitterOption func(*eoptions)

EmitterOption defines a configuration option to be used when creating an emitter.

func WithEmitterClientID

func WithEmitterClientID(clientID string) EmitterOption

WithEmitterClientID defines the client ID used to identify with kafka.

func WithEmitterKafkaMetrics

func WithEmitterKafkaMetrics(registry metrics.Registry) EmitterOption

WithEmitterKafkaMetrics sets a go-metrics registry to collect kafka metrics. The available metric points are documented at https://godoc.org/github.com/Shopify/sarama

func WithEmitterProducer

func WithEmitterProducer(p kafka.Producer) EmitterOption

WithEmitterProducer replaces goka's default producer. Mainly for testing.

func WithEmitterTopicManager

func WithEmitterTopicManager(tm kafka.TopicManager) EmitterOption

WithEmitterTopicManager defines a topic manager.

type Errors

type Errors struct {
	// contains filtered or unexported fields
}

Errors represents a list of errors triggered during the execution of a goka view/processor. Normally, the first error leads to stopping the processor/view, but during shutdown, more errors might occur.

func (*Errors) Error

func (e *Errors) Error() string

type Getter

type Getter func(string) (interface{}, error)

Getter functions return a value for a key or an error. If no value exists for the key, nil is returned without errors.

type Group

type Group string

type GroupGraph

type GroupGraph struct {
	// contains filtered or unexported fields
}

func DefineGroup

func DefineGroup(group Group, edges ...Edge) *GroupGraph

func (*GroupGraph) Group

func (gg *GroupGraph) Group() Group

func (*GroupGraph) GroupTable

func (gg *GroupGraph) GroupTable() Edge

func (*GroupGraph) InputStreams

func (gg *GroupGraph) InputStreams() Edges

func (*GroupGraph) JointTables

func (gg *GroupGraph) JointTables() Edges

func (*GroupGraph) LookupTables

func (gg *GroupGraph) LookupTables() Edges

func (*GroupGraph) LoopStream

func (gg *GroupGraph) LoopStream() Edge

func (*GroupGraph) OutputStreams

func (gg *GroupGraph) OutputStreams() Edges

func (*GroupGraph) Validate

func (gg *GroupGraph) Validate() error

type KafkaMock

type KafkaMock struct {
	// contains filtered or unexported fields
}

KafkaMock allows interacting with a processor under test.

func NewKafkaMock

func NewKafkaMock(t Tester, groupName Group) *KafkaMock

NewKafkaMock returns a new KafkaMock that mocks every external service of the test processor.

func (*KafkaMock) ConsumeProto

func (km *KafkaMock) ConsumeProto(topic string, key string, msg proto.Message)

ConsumeProto simulates a message on kafka in a topic with a key.

func (*KafkaMock) ConsumeString

func (km *KafkaMock) ConsumeString(topic string, key string, msg string)

func (*KafkaMock) ExpectAllEmitted

func (km *KafkaMock) ExpectAllEmitted(handler func(topic string, key string, value []byte))

ExpectAllEmitted calls the passed handler function for all emitted values and then clears the emitted values.

func (*KafkaMock) ExpectEmit

func (km *KafkaMock) ExpectEmit(topic string, key string, expecter func(value []byte))

ExpectEmit ensures a message exists for the passed topic and key. The message may be inspected/unmarshalled by a passed expecter function.

func (*KafkaMock) Finish

func (km *KafkaMock) Finish(fail bool)

Finish marks the KafkaMock as expecting no further emits. Set fail to true if the KafkaMock should fail the test case when emits remain; the list of emits is cleared in either case. This should always be called at the end of a test case to make sure no emits of prior test cases are stuck in the list and mess with the test results.

func (*KafkaMock) ProcessorOptions

func (km *KafkaMock) ProcessorOptions() []ProcessorOption

ProcessorOptions returns the options that must be passed to NewProcessor to use the mock. It essentially replaces the consumer/producer/topic manager with a mock. For convenience, the storage is also mocked. For example, a normal call to NewProcessor like this

NewProcessor(brokers, group, subscriptions,
	option_a,
	option_b,
	option_c,
)

would become the following in the unit test:

kafkaMock := NewKafkaMock(t)
NewProcessor(brokers, group, subscriptions,
	append(kafkaMock.ProcessorOptions(),
		option_a,
		option_b,
		option_c,
	)...,
)

func (*KafkaMock) ReplaceEmitHandler

func (km *KafkaMock) ReplaceEmitHandler(emitter EmitHandler)

func (*KafkaMock) SetGroupTableCreator

func (km *KafkaMock) SetGroupTableCreator(creator func() (string, []byte))

func (*KafkaMock) SetValue

func (km *KafkaMock) SetValue(key string, value interface{})

SetValue sets a value in the storage.

func (*KafkaMock) ValueForKey

func (km *KafkaMock) ValueForKey(key string) interface{}

ValueForKey attempts to get a value from KafkaMock's storage.
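
A rough, hedged sketch of a unit test wiring KafkaMock to the counter processor from Get Started. It assumes the mock processes ConsumeString synchronously, and the nil broker list is only viable because all Kafka access is mocked.

package main

import (
	"testing"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
)

func TestCounter(t *testing.T) {
	km := goka.NewKafkaMock(t, "mini-group")

	// same counting callback as in the Get Started processor example
	process := func(ctx goka.Context, msg interface{}) {
		var counter int64
		if val := ctx.Value(); val != nil {
			counter = val.(int64)
		}
		ctx.SetValue(counter + 1)
	}

	g := goka.DefineGroup("mini-group",
		goka.Input("mini-input", new(codec.String), process),
		goka.Persist(new(codec.Int64)),
	)
	p, err := goka.NewProcessor(nil, g, km.ProcessorOptions()...)
	if err != nil {
		t.Fatalf("error creating processor: %v", err)
	}
	go p.Start()
	defer p.Stop()

	// simulate an incoming message and inspect the mocked group table
	km.ConsumeString("mini-input", "0", "hello")
	if val, ok := km.ValueForKey("0").(int64); !ok || val != 1 {
		t.Errorf("expected counter 1, got %v", km.ValueForKey("0"))
	}
	km.Finish(true)
}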

type ProcessCallback

type ProcessCallback func(ctx Context, msg interface{})

ProcessCallback function is called for every message received by the processor.

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor is a set of stateful callback functions that, on the arrival of messages, modify the content of a table (the group table) and emit messages into other topics. Messages as well as rows in the group table are key-value pairs. A group is composed of multiple processor instances.

Example (Simplest)

Example shows how to use a callback. For each partition of the topics, a new goroutine will be created. Topics should be co-partitioned (they should have the same number of partitions and be partitioned by the same key).

var (
	brokers        = []string{"127.0.0.1:9092"}
	group   Group  = "group"
	topic   Stream = "topic"
)

consume := func(ctx Context, m interface{}) {
	fmt.Printf("Hello world: %v", m)
}

c, err := NewProcessor(brokers, DefineGroup(group, Input(topic, rawCodec, consume)))
if err != nil {
	log.Fatalln(err)
}

// start consumer with a goroutine (blocks)
go func() {
	err := c.Start()
	panic(err)
}()

// wait for bad things to happen
wait := make(chan os.Signal, 1)
signal.Notify(wait, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
<-wait
c.Stop()
Output:

func NewProcessor

func NewProcessor(brokers []string, gg *GroupGraph, options ...ProcessorOption) (*Processor, error)

NewProcessor creates a processor instance in a group given the address of Kafka brokers, the consumer group name, a list of subscriptions (topics, codecs, and callbacks), and a series of options.

func (*Processor) Get

func (g *Processor) Get(key string) (interface{}, error)

Get returns a read-only copy of a value from the group table if the respective partition is owned by the processor instance. Get can only be used with stateful processors (i.e., when the group table is enabled).

func (*Processor) Registry

func (g *Processor) Registry() metrics.Registry

Registry returns the go-metrics registry used by the processor.

func (*Processor) Start

func (g *Processor) Start() error

Start starts receiving messages from Kafka for the subscribed topics. For each partition, a recovery will be attempted.

func (*Processor) Stop

func (g *Processor) Stop()

Stop gracefully stops the consumer

type ProcessorOption

type ProcessorOption func(*poptions)

ProcessorOption defines a configuration option to be used when creating a processor.

func WithClientID

func WithClientID(clientID string) ProcessorOption

WithClientID defines the client ID used to identify with kafka.

func WithConsumer

func WithConsumer(c kafka.Consumer) ProcessorOption

WithConsumer replaces goka's default consumer. Mainly for testing.

func WithKafkaMetrics

func WithKafkaMetrics(registry metrics.Registry) ProcessorOption

WithKafkaMetrics sets a go-metrics registry to collect kafka metrics. The available metric points are documented at https://godoc.org/github.com/Shopify/sarama

func WithPartitionChannelSize

func WithPartitionChannelSize(size int) ProcessorOption

WithPartitionChannelSize replaces the default partition channel size. This is mostly used for testing by setting it to 0 to have synchronous behavior of goka.

func WithProducer

func WithProducer(p kafka.Producer) ProcessorOption

WithProducer replaces goka's default producer. Mainly for testing.

func WithStorageBuilder

func WithStorageBuilder(sb StorageBuilder) ProcessorOption

WithStorageBuilder defines a builder for the storage of each partition.

func WithStoragePath

func WithStoragePath(storagePath string) ProcessorOption

WithStoragePath defines the base path for the local storage on disk

func WithStorageSnapshotInterval

func WithStorageSnapshotInterval(interval time.Duration) ProcessorOption

WithStorageSnapshotInterval sets the interval at which the storage will snapshot to disk (if the storage supports it at all). A greater interval means fewer writes to disk but more memory usage; a smaller interval means more writes to disk but less memory usage.

func WithTopicManager

func WithTopicManager(tm kafka.TopicManager) ProcessorOption

WithTopicManager defines a topic manager.

func WithUpdateCallback

func WithUpdateCallback(cb UpdateCallback) ProcessorOption

WithUpdateCallback defines the callback called upon recovering a message from the log.
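
As a hedged illustration, several options can be combined in a single NewProcessor call. The client ID and storage path below are placeholders, and loggingUpdate refers to the DefaultUpdate sketch shown earlier.

p, err := goka.NewProcessor(brokers, g,
	goka.WithClientID("counter-service"),      // identify this instance to Kafka
	goka.WithStoragePath("/tmp/goka-storage"), // base path for the local state on disk
	goka.WithUpdateCallback(loggingUpdate),    // custom recovery callback
)
if err != nil {
	log.Fatalf("error creating processor: %v", err)
}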

type StorageBuilder

type StorageBuilder func(topic string, partition int32, codec Codec, reg metrics.Registry) (storage.Storage, error)

StorageBuilder creates a local storage (a persistent cache) for a topic table. StorageBuilder creates one storage for each partition of the topic.

type Stream

type Stream string

type Streams

type Streams []Stream

type Table

type Table string

func GroupTable

func GroupTable(group Group) Table

GroupTable returns the name of the group table of group.

type Tester

type Tester interface {
	Errorf(format string, args ...interface{})
	Fatalf(format string, args ...interface{})
	Fatal(a ...interface{})
}

Tester abstracts the interface we assume from the test case. Will most likely be *testing.T

type UpdateCallback

type UpdateCallback func(s storage.Storage, partition int32, key string, value []byte) error

UpdateCallback is invoked upon arrival of a message for a table partition. The partition storage shall be updated in the callback.

type View

type View struct {
	// contains filtered or unexported fields
}

View is a materialized (i.e. persistent) cache of a group table.

Example (Simple)
var (
	brokers       = []string{"localhost:9092"}
	group   Group = "group-name"
)
sr, err := NewView(brokers, GroupTable(group), nil)
if err != nil {
	panic(err)
}
errs := sr.Start()
if errs != nil {
	panic(errs)
}
Output:

func NewView

func NewView(brokers []string, topic Table, codec Codec, options ...ViewOption) (*View, error)

NewView creates a new View object from a group.

func (*View) Get

func (v *View) Get(key string) (interface{}, error)

Get returns the value for the key in the view, if it exists; nil if it does not.

func (*View) Has

func (v *View) Has(key string) (bool, error)

Has checks whether a value for passed key exists in the view.

func (*View) Ready

func (v *View) Ready() bool

Ready returns true when the view has caught up with events from kafka.

func (*View) Registry

func (v *View) Registry() metrics.Registry

Registry returns the go-metrics registry used by the view.

func (*View) Start

func (v *View) Start() error

Start starts consuming the view's topic.

func (*View) Stop

func (v *View) Stop()

Stop stops the view and frees any resources and connections to Kafka.

func (*View) Topic

func (v *View) Topic() string

Topic returns the view's topic

type ViewOption

type ViewOption func(*voptions)

ViewOption defines a configuration option to be used when creating a view.

func WithViewCallback

func WithViewCallback(cb UpdateCallback) ViewOption

WithViewCallback defines the callback called upon recovering a message from the log.

func WithViewConsumer

func WithViewConsumer(c kafka.Consumer) ViewOption

WithViewConsumer replaces goka's default view consumer. Mainly for testing.

func WithViewKafkaMetrics

func WithViewKafkaMetrics(registry metrics.Registry) ViewOption

WithViewKafkaMetrics sets a go-metrics registry to collect kafka metrics. The available metric points are documented at https://godoc.org/github.com/Shopify/sarama

func WithViewPartitionChannelSize

func WithViewPartitionChannelSize(size int) ViewOption

WithViewPartitionChannelSize replaces the default partition channel size. This is mostly used for testing by setting it to 0 to have synchronous behavior of goka.

func WithViewStorageBuilder

func WithViewStorageBuilder(sb StorageBuilder) ViewOption

WithViewStorageBuilder defines a builder for the storage of each partition.

func WithViewStoragePath

func WithViewStoragePath(storagePath string) ViewOption

WithViewStoragePath defines the base path for the local storage on disk

func WithViewStorageSnapshotInterval

func WithViewStorageSnapshotInterval(interval time.Duration) ViewOption

WithViewStorageSnapshotInterval sets the interval at which the storage will snapshot to disk (if the storage supports it at all). A greater interval means fewer writes to disk but more memory usage; a smaller interval means more writes to disk but less memory usage.

func WithViewTopicManager

func WithViewTopicManager(tm kafka.TopicManager) ViewOption

WithViewTopicManager defines a topic manager.
