kasper: github.com/movio/kasper

package kasper

import "github.com/movio/kasper"

Kasper is a lightweight library for processing Kafka topics. It is heavily inspired by Apache Samza (see http://samza.apache.org). Kasper processes Kafka messages in small batches and is designed to work with centralized key-value stores such as Redis, Cassandra or Elasticsearch for maintaining state during processing. Kasper is a good fit for high-throughput applications (> 10k messages per second) that can tolerate a moderate amount of processing latency (~1000ms). Please note that Kasper is designed for idempotent processing of streams with at-least-once delivery semantics. If you require exactly-once semantics or need to perform non-idempotent operations, Kasper is likely not a good choice.

Step 1 - Create a sarama Client

Kasper uses Shopify's excellent sarama library (see https://github.com/Shopify/sarama) for consuming messages from and producing messages to Kafka. Every Kasper application must begin by instantiating a sarama Client. Choose the parameters in sarama.Config carefully; the performance, reliability, and correctness of your application are all highly sensitive to these settings. We recommend setting sarama.Config.Producer.RequiredAcks to WaitForAll.

saramaConfig := sarama.NewConfig()
saramaConfig.Producer.RequiredAcks = sarama.WaitForAll
client, err := sarama.NewClient([]string{"kafka-broker.local:9092"}, saramaConfig)
if err != nil {
	log.Fatal(err)
}

Step 2 - Create a Config

TopicProcessorName is used for logging and labeling metrics, and as a suffix to the Kafka consumer group name. InputTopics and InputPartitions are the lists of topics and partitions to consume. Please note that Kasper currently does not support consuming topics with differing numbers of partitions. You can work around this limitation by manually adding a fan-out step to your processing pipeline that copies messages to a new topic with the desired number of partitions.

config := &kasper.Config{
	TopicProcessorName:    "twitter-reach",
	Client:                client,
	InputTopics:           []string{"tweets", "twitter-followers"},
	InputPartitions:       []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
	BatchSize:             10000,
	BatchWaitDuration:     5 * time.Second,
	Logger:                kasper.NewJSONLogger("twitter-reach-0", false),
	MetricsProvider:       kasper.NewPrometheus("twitter-reach-0"),
	MetricsUpdateInterval: 60 * time.Second,
}

Kasper is instrumented with a number of useful metrics, so we recommend setting MetricsProvider for production applications. Kasper includes an implementation that collects metrics in Prometheus, and adapting the interface to other tools should be easy.

Step 3 - Create a MessageProcessor per input partition

You need to create a map[int]MessageProcessor with one entry per input partition. The MessageProcessor instances can safely be shared across partitions. Each MessageProcessor must implement a single method:

func (*TweetProcessor) Process(messages []*sarama.ConsumerMessage, sender Sender) error {
	// process messages here
	return nil
}

All messages for the input topics on the specified partition will be passed to the appropriate MessageProcessor instance. This is useful for implementing partition-wise joins of different topics. The Sender instance must be used to produce messages to output topics. Messages passed to Sender are not sent directly but are collected in an array instead. When Process returns, the messages are sent to Kafka and Kasper waits for the configured number of acks. When all messages have been successfully produced, Kasper updates the consumer offsets of the input partitions and resumes processing. If Process returns a non-nil error value, Kasper stops all processing.
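
To make this concrete, here is a minimal sketch of a processor that forwards each message to an output topic, together with the per-partition map for the twelve input partitions configured above (the TweetProcessor type and the "twitter-reach-output" topic are illustrative, not part of Kasper):

type TweetProcessor struct{}

func (*TweetProcessor) Process(messages []*sarama.ConsumerMessage, sender kasper.Sender) error {
	for _, message := range messages {
		// Forward the message to a hypothetical output topic.
		sender.Send(&sarama.ProducerMessage{
			Topic: "twitter-reach-output",
			Key:   sarama.ByteEncoder(message.Key),
			Value: sarama.ByteEncoder(message.Value),
		})
	}
	return nil
}

// The same instance can safely serve all twelve partitions.
processor := &TweetProcessor{}
messageProcessors := make(map[int]kasper.MessageProcessor)
for partition := 0; partition <= 11; partition++ {
	messageProcessors[partition] = processor
}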

Step 4 - Create a TopicProcessor

To start processing messages, call TopicProcessor.RunLoop(). Kasper does not spawn any goroutines and runs a single-threaded event loop instead. RunLoop() will block the current goroutine and will run forever until an error occurs or until Close() is called. For parallel processing, run multiple TopicProcessor instances in different goroutines or processes (the input partitions cannot overlap). You should set Config.TopicProcessorName to the same value on all instances in order to easily scale the processing up or down.
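
Putting the pieces together, a minimal sketch (reusing the config and messageProcessors values from the previous steps):

topicProcessor := kasper.NewTopicProcessor(config, messageProcessors)
// RunLoop blocks until an error occurs or Close() is called.
err = topicProcessor.RunLoop()
if err != nil {
	config.Logger.Panicf("TopicProcessor failed: %s", err)
}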

Index

Package Files

config.go elasticsearch.go logger.go map.go metrics.go multi_elasticsearch.go multi_map.go multi_redis.go noop_metrics.go partition_processor.go prometheus.go redis.go sender.go store.go topic_processor.go

type Config

type Config struct {
    // Used for logging, metrics, and Kafka consumer group
    TopicProcessorName string
    // Used for consuming and producing messages
    Client sarama.Client
    // Input topics (all topics need to have the same number of partitions)
    InputTopics []string
    // Input partitions (cannot overlap between TopicProcessor instances)
    InputPartitions []int
    // Maximum number of messages processed in one go
    BatchSize int
    // Maximum amount of time spent waiting for a batch to be filled
    BatchWaitDuration time.Duration
    // Use NewBasicLogger() or any other Logger
    Logger Logger
    // Use NewPrometheus() or any other MetricsProvider
    MetricsProvider MetricsProvider
    // 15 seconds is a sensible value
    MetricsUpdateInterval time.Duration
}

Config contains the configuration settings for a TopicProcessor.

type Counter

type Counter interface {
    Inc(labelValues ...string)
    Add(value float64, labelValues ...string)
}

Counter is a single float metric that can be incremented by one or added to.

type Elasticsearch

type Elasticsearch struct {
    // contains filtered or unexported fields
}

Elasticsearch is an implementation of Store that uses Elasticsearch. Each instance provides key-value access to a given index and a given document type. This implementation supports Elasticsearch 5.x and uses Oliver Eilhard's Go Elasticsearch client. See https://github.com/olivere/elastic

func NewElasticsearch

func NewElasticsearch(config *Config, client *elastic.Client, indexName, typeName string) *Elasticsearch

NewElasticsearch creates Elasticsearch instances. All documents read and written will correspond to the URL:

https://{cluster}:9092/{indexName}/{typeName}/{key}
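
For illustration, creating a store backed by a local Elasticsearch 5.x node might look like this (reusing the Config from Step 2; the URL, index name and type name are assumptions):

elasticClient, err := elastic.NewClient(elastic.SetURL("http://localhost:9200"))
if err != nil {
	log.Fatal(err)
}
store := kasper.NewElasticsearch(config, elasticClient, "sales-service", "customer")
err = store.Put("customer-123", []byte(`{"name": "Jane Doe"}`))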

func (*Elasticsearch) Delete

func (s *Elasticsearch) Delete(key string) error

Delete removes a document from the store. It does not return an error if the document was not present. It is implemented using the Elasticsearch Delete API. See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete.html

func (*Elasticsearch) Flush

func (s *Elasticsearch) Flush() error

Flush flushes the Elasticsearch translog to disk. It is implemented using the Elasticsearch Flush API. See https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-flush.html

func (*Elasticsearch) Get

func (s *Elasticsearch) Get(key string) ([]byte, error)

Get gets a document by key (i.e. the Elasticsearch _id). It is implemented by using the Elasticsearch Get API. The returned byte slice contains the UTF8-encoded JSON document (i.e., _source). This function returns (nil, nil) if the document does not exist. See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html

func (*Elasticsearch) GetAll

func (s *Elasticsearch) GetAll(keys []string) (map[string][]byte, error)

GetAll gets multiple documents from the store. It is implemented using the Elasticsearch MultiGet API. See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-multi-get.html

func (*Elasticsearch) GetClient

func (s *Elasticsearch) GetClient() *elastic.Client

GetClient returns the underlying elastic.Client.

func (*Elasticsearch) Put

func (s *Elasticsearch) Put(key string, value []byte) error

Put inserts or updates a document in the store (key is used as the document _id). It is implemented using the Elasticsearch Index API. The value byte slice must contain the UTF8-encoded JSON document (i.e., _source). See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html

func (*Elasticsearch) PutAll

func (s *Elasticsearch) PutAll(kvs map[string][]byte) error

PutAll inserts or updates a number of documents in the store. It is implemented using the Elasticsearch Bulk and Index APIs. It returns an error if any operation fails. See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html

type ElasticsearchTenancy

type ElasticsearchTenancy interface {
    TenantIndexAndType(tenant string) (indexName, typeName string)
}

ElasticsearchTenancy defines how tenanted keys are mapped to an index and type. Here is a simple example:

type CustomerTenancy struct{}
func (CustomerTenancy) TenantIndexAndType(tenant string) (indexName, typeName string) {
	indexName = fmt.Sprintf("sales-service~%s", tenant)
	typeName = "customer"
	return
}

type Gauge

type Gauge interface {
    Set(value float64, labelValues ...string)
}

Gauge is a single float metric that can be set to a specific value.

type Logger

type Logger interface {
    Debug(...interface{})
    Debugf(string, ...interface{})

    Info(...interface{})
    Infof(string, ...interface{})

    Error(...interface{})
    Errorf(string, ...interface{})

    Panic(...interface{})
    Panicf(string, ...interface{})
}

Logger is a logging interface for Kasper.

func NewBasicLogger

func NewBasicLogger(debug bool) Logger

NewBasicLogger uses the Go standard library logger. See https://golang.org/pkg/log/

func NewJSONLogger

func NewJSONLogger(label string, debug bool) Logger

NewJSONLogger uses the logrus JSON formatter. See https://github.com/sirupsen/logrus

func NewTextLogger

func NewTextLogger(label string, debug bool) Logger

NewTextLogger uses the logrus text formatter. See https://github.com/sirupsen/logrus

type Map

type Map struct {
    // contains filtered or unexported fields
}

Map wraps a map[string][]byte value and implements the Store interface.

func NewMap

func NewMap(size int) *Map

NewMap creates a new map of the given size.

func (*Map) Delete

func (s *Map) Delete(key string) error

Delete removes a single value by key. Does not return an error if the key is not present.

func (*Map) Flush

func (s *Map) Flush() error

Flush does nothing.

func (*Map) Get

func (s *Map) Get(key string) ([]byte, error)

Get gets a value by key. Returns (nil, nil) if the key is not present.

func (*Map) GetAll

func (s *Map) GetAll(keys []string) (map[string][]byte, error)

GetAll returns multiple values by key. The returned map does not contain entries for missing keys.

func (*Map) GetMap

func (s *Map) GetMap() map[string][]byte

GetMap returns the underlying map.

func (*Map) Put

func (s *Map) Put(key string, value []byte) error

Put inserts or updates a value by key.

func (*Map) PutAll

func (s *Map) PutAll(kvs map[string][]byte) error

PutAll inserts or updates multiple key-value pairs.

type MessageProcessor

type MessageProcessor interface {
    // Process receives a slice of incoming Kafka messages and a Sender to send messages to output topics.
    // References to the byte slice or Sender interface cannot be held between calls.
    // If Process returns a non-nil error value, Kasper stops all processing.
    // This error value is then returned by TopicProcessor.RunLoop().
    Process([]*sarama.ConsumerMessage, Sender) error
}

MessageProcessor is the interface that encapsulates application business logic. It receives all messages of a single partition of the TopicProcessor's input topics.

type MetricsProvider

type MetricsProvider interface {
    NewCounter(name string, help string, labelNames ...string) Counter
    NewGauge(name string, help string, labelNames ...string) Gauge
    NewSummary(name string, help string, labelNames ...string) Summary
}

MetricsProvider is a facility to create metrics instances.
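
For illustration, creating and using a counter might look like this (the metric name and label are assumptions):

provider := kasper.NewPrometheus("twitter-reach-0")
counter := provider.NewCounter("messages_processed", "Number of messages processed", "topic")
counter.Inc("tweets")     // increment by one for the "tweets" label value
counter.Add(42, "tweets") // add an arbitrary amount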

type MultiElasticsearch

type MultiElasticsearch struct {
    // contains filtered or unexported fields
}

MultiElasticsearch is an implementation of MultiStore that uses Elasticsearch. Each instance provides key-value access to a subset of the Elasticsearch documents defined by a tenancy instance (see ElasticsearchTenancy). This implementation supports Elasticsearch 5.x.

func NewMultiElasticsearch

func NewMultiElasticsearch(config *Config, client *elastic.Client, tenancy ElasticsearchTenancy) *MultiElasticsearch

NewMultiElasticsearch creates MultiElasticsearch instances. All documents read and written will correspond to the URL:

https://{cluster}:9092/{indexName}/{typeName}/{key}

where indexName and typeName depend on the tenant and the tenancy instance.

func (*MultiElasticsearch) AllTenants

func (s *MultiElasticsearch) AllTenants() []string

AllTenants returns the list of tenants known to this instance.

func (*MultiElasticsearch) Fetch

func (s *MultiElasticsearch) Fetch(keys []TenantKey) (*MultiMap, error)

Fetch performs a single MultiGet operation on the Elasticsearch cluster across multiple tenants (i.e. indexes).

func (*MultiElasticsearch) Push

func (s *MultiElasticsearch) Push(m *MultiMap) error

Push performs a single Bulk index request with all documents provided. It returns an error if any operation fails.

func (*MultiElasticsearch) Tenant

func (s *MultiElasticsearch) Tenant(tenant string) Store

Tenant returns an Elasticsearch Store for the given tenant. Created instances are cached for future invocations.

type MultiMap

type MultiMap struct {
    // contains filtered or unexported fields
}

MultiMap is a multitenanted version of Map that implements the MultiStore interface.

func NewMultiMap

func NewMultiMap(size int) *MultiMap

NewMultiMap creates a new MultiMap. Each underlying Map instance is initialized to the given size.

func (*MultiMap) AllTenants

func (mtkv *MultiMap) AllTenants() []string

AllTenants returns the list of tenants known to this instance.

func (*MultiMap) Fetch

func (mtkv *MultiMap) Fetch(tenantKeys []TenantKey) (*MultiMap, error)

Fetch reads multiple values from the MultiStore.

func (*MultiMap) Push

func (mtkv *MultiMap) Push(store *MultiMap) error

Push inserts or updates multiple values in the MultiStore.

func (*MultiMap) Tenant

func (mtkv *MultiMap) Tenant(tenant string) Store

Tenant returns a Map for the given tenant. Created instances are cached for future invocations.

type MultiRedis

type MultiRedis struct {
    // contains filtered or unexported fields
}

MultiRedis is an implementation of MultiStore that uses Redis. Each instance provides multitenant key-value access to keys of the form {tenant}/{keyPrefix}/{key}. This implementation uses Gary Burd's Go Redis client. See https://github.com/garyburd/redigo

func NewMultiRedis

func NewMultiRedis(config *Config, conn redis.Conn, keyPrefix string) *MultiRedis

NewMultiRedis creates MultiRedis instances. All keys read and written will be of the form:

{tenant}/{keyPrefix}/{key}

func (*MultiRedis) AllTenants

func (s *MultiRedis) AllTenants() []string

AllTenants returns the list of tenants known to this instance.

func (*MultiRedis) Fetch

func (s *MultiRedis) Fetch(keys []TenantKey) (*MultiMap, error)

Fetch performs a single MULTI GET Redis command across multiple tenants.

func (*MultiRedis) Push

func (s *MultiRedis) Push(entries *MultiMap) error

Push performs a single MULTI SET Redis command across multiple tenants.

func (*MultiRedis) Tenant

func (s *MultiRedis) Tenant(tenant string) Store

Tenant returns a Redis Store for the given tenant. Created instances are cached for future invocations.

type MultiStore

type MultiStore interface {
    // Tenant returns the underlying store for a tenant.
    Tenant(tenant string) Store
    // AllTenants returns the list of tenants known to this instance.
    AllTenants() []string
    // Fetch is a multitenant version of GetAll.
    Fetch(keys []TenantKey) (*MultiMap, error)
    // Push is a multitenant version of PutAll.
    Push(store *MultiMap) error
}

MultiStore is a multitenant version of Store. Tenants are represented as strings. Each tenant has an underlying Store.
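
A short sketch using the in-memory MultiMap implementation (the tenant and key names are illustrative):

store := kasper.NewMultiMap(100)

// Write a value to the "acme" tenant's underlying Store.
if err := store.Tenant("acme").Put("customer-1", []byte(`{"name": "Jane"}`)); err != nil {
	log.Fatal(err)
}

// Read values for several tenant/key pairs in one call.
fetched, err := store.Fetch([]kasper.TenantKey{
	{Tenant: "acme", Key: "customer-1"},
})
if err != nil {
	log.Fatal(err)
}
value, _ := fetched.Tenant("acme").Get("customer-1")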

type NoopMetricsProvider

type NoopMetricsProvider struct{}

NoopMetricsProvider is a dummy implementation of MetricsProvider that does nothing. Useful for testing, not recommended in production.

func (*NoopMetricsProvider) NewCounter

func (m *NoopMetricsProvider) NewCounter(name string, help string, labelNames ...string) Counter

NewCounter creates a new no-op Counter.

func (*NoopMetricsProvider) NewGauge

func (m *NoopMetricsProvider) NewGauge(name string, help string, labelNames ...string) Gauge

NewGauge creates a new no-op Gauge.

func (*NoopMetricsProvider) NewSummary

func (m *NoopMetricsProvider) NewSummary(name string, help string, labelNames ...string) Summary

NewSummary creates a new no-op Summary.

type Prometheus

type Prometheus struct {
    Registry *prometheus.Registry
    // contains filtered or unexported fields
}

Prometheus is an implementation of MetricsProvider that uses Prometheus. See https://github.com/prometheus/client_golang

func NewPrometheus

func NewPrometheus(label string) *Prometheus

NewPrometheus creates a new Prometheus instance.

func (*Prometheus) NewCounter

func (provider *Prometheus) NewCounter(name string, help string, labelNames ...string) Counter

NewCounter creates a new prometheus CounterVec.

func (*Prometheus) NewGauge

func (provider *Prometheus) NewGauge(name string, help string, labelNames ...string) Gauge

NewGauge creates a new prometheus GaugeVec.

func (*Prometheus) NewSummary

func (provider *Prometheus) NewSummary(name string, help string, labelNames ...string) Summary

NewSummary creates a new prometheus SummaryVec.

type Redis

type Redis struct {
    // contains filtered or unexported fields
}

Redis is an implementation of Store that uses Redis. Each instance provides key-value access to keys with a specific prefix. This implementation uses Gary Burd's Go Redis client. See https://github.com/garyburd/redigo

func NewRedis

func NewRedis(config *Config, conn redis.Conn, keyPrefix string) *Redis

NewRedis creates Redis instances. All keys read and written in Redis are of the form:

{keyPrefix}/{key}

func (*Redis) Delete

func (s *Redis) Delete(key string) error

Delete deletes a value by key. It is implemented using the Redis DEL command. See https://redis.io/commands/del

func (*Redis) Flush

func (s *Redis) Flush() error

Flush executes the SAVE command. See https://redis.io/commands/save

func (*Redis) Get

func (s *Redis) Get(key string) ([]byte, error)

Get gets a value by key. Returns (nil, nil) if the key is not present. It is implemented using the Redis GET command. See https://redis.io/commands/get

func (*Redis) GetAll

func (s *Redis) GetAll(keys []string) (map[string][]byte, error)

GetAll gets multiple values by key. It is implemented by using the MULTI and GET commands. See https://redis.io/commands/multi

func (*Redis) Put

func (s *Redis) Put(key string, value []byte) error

Put inserts or updates a value by key. It is implemented using the Redis SET command. See https://redis.io/commands/set

func (*Redis) PutAll

func (s *Redis) PutAll(entries map[string][]byte) error

PutAll inserts or updates multiple values by key. It is implemented by using the MULTI and SET commands. See https://redis.io/commands/multi

type Sender

type Sender interface {

    // Send appends a message to a slice held by the sender instance.
    // These messages are sent in bulk when Process() returns or when Flush() is called.
    Send(msg *sarama.ProducerMessage)

    // Flush immediately sends all messages held in the sender slice in bulk, and empties the slice. See Send() above.
    Flush() error
}

Sender instances are given to MessageProcessor.Process to send messages to Kafka topics. Messages passed to Sender are not sent directly but are collected in an array instead. When Process returns, the messages are sent to Kafka and Kasper waits for the configured number of acks. When all messages have been successfully produced, Kasper updates the consumer offsets of the input partitions and resumes processing.

type Store

type Store interface {
    // Get gets a value by key.
    Get(key string) ([]byte, error)
    // GetAll gets multiple values by key.
    GetAll(keys []string) (map[string][]byte, error)
    // Put inserts or updates a value by key.
    Put(key string, value []byte) error
    // PutAll inserts or updates multiple key-value pairs.
    PutAll(map[string][]byte) error
    // Delete deletes a key from the store.
    Delete(key string) error
    // Flush indicates that the underlying storage must be made persistent.
    Flush() error
}

Store is a universal interface for a key-value store. Keys are strings, and values are byte slices.
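
For example, with the in-memory Map implementation:

var store kasper.Store = kasper.NewMap(100)

if err := store.Put("hello", []byte("world")); err != nil {
	log.Fatal(err)
}
value, err := store.Get("hello")  // []byte("world"), nil
missing, err := store.Get("nope") // nil, nil: the key is not present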

type Summary

type Summary interface {
    Observe(value float64, labelValues ...string)
}

Summary is a float value metric that provides a history of observations.

type TenantKey

type TenantKey struct {
    Tenant string
    Key    string
}

TenantKey is a pair of a tenant and a key. Used by MultiStore.Fetch.

type TopicProcessor

type TopicProcessor struct {
    // contains filtered or unexported fields
}

TopicProcessor is the main entity in Kasper. It implements a single-threaded processing loop for a set of topics and partitions.

func NewTopicProcessor

func NewTopicProcessor(config *Config, messageProcessors map[int]MessageProcessor) *TopicProcessor

NewTopicProcessor creates a new instance of TopicProcessor. For parallel processing, run multiple TopicProcessor instances in different goroutines or processes (the input partitions cannot overlap). You should set Config.TopicProcessorName to the same value on all instances in order to easily scale the processing up or down.

func (*TopicProcessor) Close

func (tp *TopicProcessor) Close()

Close safely shuts down the TopicProcessor, which makes RunLoop() return.

func (*TopicProcessor) HasConsumedAllMessages

func (tp *TopicProcessor) HasConsumedAllMessages() bool

HasConsumedAllMessages returns true when all input topics have been entirely consumed. Kasper checks all high water marks and offsets for all topics before returning.
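
This is useful for batch-style jobs that should stop once the input topics are drained. A minimal sketch (the polling interval is an arbitrary choice):

go func() {
	for !topicProcessor.HasConsumedAllMessages() {
		time.Sleep(100 * time.Millisecond)
	}
	topicProcessor.Close() // makes RunLoop() return
}()
err := topicProcessor.RunLoop()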

func (*TopicProcessor) RunLoop

func (tp *TopicProcessor) RunLoop() error

RunLoop is the main processing loop of Kasper. It does not spawn any goroutines and runs a single-threaded event loop instead. RunLoop will block the current goroutine and will run forever until an error occurs or until Close() is called. RunLoop propagates the error returned by MessageProcessor.Process if not nil.

Directories

Path        Synopsis
ci
examples

Package kasper imports 15 packages and is imported by 1 package. Updated 2017-07-24.