goka: github.com/lovoo/goka

package goka

import "github.com/lovoo/goka"

Package goka is a stateful stream processing library for Apache Kafka (version 0.9+) that eases the development of microservices. Goka extends the concept of consumer group with a group table, which represents the state of the group. A microservice modifies and serves the content of a table employing two complementary object types: processors and views.

Processors

A processor is a set of callback functions that modify the group table when messages arrive and may also emit messages into other topics. Messages as well as rows in the group table are key-value pairs. Callbacks receive the arriving message and the row addressed by the message's key.

In Kafka, keys are used to partition topics. A goka processor consumes from a set of co-partitioned topics (topics with the same number of partitions and the same key range). A group topic keeps track of the group table updates, allowing for recovery and rebalancing of processors: When multiple processor instances start in the same consumer group, the instances split the co-partitioned input topics and load the respective group table partitions from the group topic. A local disk storage minimizes recovery time by caching partitions of the group table.
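
For illustration, a minimal sketch of a processor that counts messages per key and persists the count in the group table (broker address and topic/group names are assumptions):

    package main

    import (
        "context"
        "log"

        "github.com/lovoo/goka"
        "github.com/lovoo/goka/codec"
    )

    func main() {
        // increment the table row addressed by the key of each incoming message
        cb := func(ctx goka.Context, msg interface{}) {
            var counter int64
            if v := ctx.Value(); v != nil {
                counter = v.(int64)
            }
            ctx.SetValue(counter + 1)
        }

        g := goka.DefineGroup("example-group",
            goka.Input("example-stream", new(codec.String), cb),
            goka.Persist(new(codec.Int64)),
        )

        p, err := goka.NewProcessor([]string{"localhost:9092"}, g)
        if err != nil {
            log.Fatalf("error creating processor: %v", err)
        }
        if err := p.Run(context.Background()); err != nil {
            log.Fatalf("error running processor: %v", err)
        }
    }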

Views

A view is a materialized (ie, persistent) cache of a group table. A view subscribes to the updates of all partitions of a group table and keeps local disk storage in sync with the group topic. With a view, one can easily serve up-to-date content of the group table via, for example, gRPC.
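
A sketch of serving the same group table with a view (names as in the processor sketch above; error handling shortened):

    view, err := goka.NewView([]string{"localhost:9092"},
        goka.GroupTable("example-group"), new(codec.Int64))
    if err != nil {
        log.Fatalf("error creating view: %v", err)
    }
    go view.Run(context.Background())

    // e.g. inside an HTTP or gRPC handler: serve the current value for a key
    value, err := view.Get("some-key")
    if err != nil {
        log.Fatalf("error getting value: %v", err)
    }
    log.Printf("current count: %v", value)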

Package goka is a generated GoMock package.

Package Files

assignment.go broker.go builders.go codec.go config.go context.go copartition_strategy.go doc.go emitter.go errors.go graph.go iterator.go mockautoconsumers.go mockbuilder.go mockcontroller.go mocks.go mockssarama.go mockstorage.go once.go options.go partition_processor.go partition_table.go processor.go producer.go promise.go proxy.go signal.go simple_backoff.go stats.go topic_manager.go view.go

Variables

var CopartitioningStrategy = new(copartitioningStrategy)

CopartitioningStrategy is the rebalance strategy necessary to guarantee the copartitioning when consuming multiple input topics with multiple processor instances

var (
    // ErrEmitterAlreadyClosed is returned when Emit is called after the emitter has been finished.
    ErrEmitterAlreadyClosed error = errors.New("emitter already closed")
)

func DefaultConfig Uses

func DefaultConfig() *sarama.Config

DefaultConfig creates a new config used by goka per default. Use it to modify and pass to `goka.ReplaceGlobalConfig(...)` to modify goka's global config.
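
A minimal sketch of adjusting the global config (the chosen Kafka version is an assumption):

    cfg := goka.DefaultConfig()
    // assumption: the cluster runs Kafka 2.0; adjust to your broker version
    cfg.Version = sarama.V2_0_0_0
    goka.ReplaceGlobalConfig(cfg)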

func DefaultConsumerGroupBuilder Uses

func DefaultConsumerGroupBuilder(brokers []string, group, clientID string) (sarama.ConsumerGroup, error)

DefaultConsumerGroupBuilder creates a Kafka consumer using the Sarama library.

func DefaultHasher Uses

func DefaultHasher() func() hash.Hash32

DefaultHasher returns an FNV hasher builder to assign keys to partitions.

func DefaultProcessorStoragePath Uses

func DefaultProcessorStoragePath(group Group) string

DefaultProcessorStoragePath is the default path where processor state will be stored.

func DefaultRebalance Uses

func DefaultRebalance(a Assignment)

DefaultRebalance is the default callback when a new partition assignment is received. DefaultRebalance can be used in the function passed to WithRebalanceCallback.

func DefaultSaramaConsumerBuilder Uses

func DefaultSaramaConsumerBuilder(brokers []string, clientID string) (sarama.Consumer, error)

DefaultSaramaConsumerBuilder creates a Kafka consumer using the Sarama library.

func DefaultUpdate Uses

func DefaultUpdate(s storage.Storage, partition int32, key string, value []byte) error

DefaultUpdate is the default callback used to update the local storage with data from the table topic in Kafka. It is called for every message received during recovery of processors and during the normal operation of views. DefaultUpdate can be used in the function passed to WithUpdateCallback and WithViewCallback.
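
For example, a custom update callback might wrap DefaultUpdate; a sketch (the log line is illustrative, storage is "github.com/lovoo/goka/storage"):

    opt := goka.WithUpdateCallback(
        func(s storage.Storage, partition int32, key string, value []byte) error {
            log.Printf("table update: partition=%d key=%s", partition, key)
            return goka.DefaultUpdate(s, partition, key, value)
        })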

func DefaultViewStoragePath Uses

func DefaultViewStoragePath() string

DefaultViewStoragePath returns the default path where view state will be stored.

func NewMockController Uses

func NewMockController(t gomock.TestReporter) *gomock.Controller

NewMockController returns a *gomock.Controller using a wrapped testing.T (or any other TestReporter) which panics on Fatalf. This is necessary when using a mock in kafkamock. Otherwise it will freeze on an unexpected call.
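
A sketch of its use in a test (the expected call shown is illustrative):

    func TestWithMocks(t *testing.T) {
        ctrl := goka.NewMockController(t)
        defer ctrl.Finish()

        st := goka.NewMockStorage(ctrl)
        st.EXPECT().Get("some-key").Return([]byte("some-value"), nil)

        // pass st to the code under test, e.g. via goka.WithStorageBuilder
    }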

func ReplaceGlobalConfig Uses

func ReplaceGlobalConfig(config *sarama.Config)

ReplaceGlobalConfig registers a standard config used during building if no other config is specified.

type Assignment Uses

type Assignment map[int32]int64

Assignment represents a partition:offset assignment for the current connection

type Backoff Uses

type Backoff interface {
    Duration() time.Duration
    Reset()
}

Backoff is used for adding backoff capabilities to the restarting of failing partition tables.

func DefaultBackoffBuilder Uses

func DefaultBackoffBuilder() (Backoff, error)

DefaultBackoffBuilder returns a simpleBackoff with 10-second steps.

func NewSimpleBackoff Uses

func NewSimpleBackoff(step time.Duration) Backoff

NewSimpleBackoff returns a simple backoff waiting the specified duration longer each iteration until reset.
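
For example, a sketch of wiring a simple backoff into a processor (brokers and graph are assumed to be defined):

    p, err := goka.NewProcessor(brokers, graph,
        goka.WithBackoffBuilder(func() (goka.Backoff, error) {
            // wait 10s, then 20s, 30s, ... between retries until reset
            return goka.NewSimpleBackoff(10*time.Second), nil
        }),
    )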

type BackoffBuilder Uses

type BackoffBuilder func() (Backoff, error)

BackoffBuilder creates a backoff

type Broker Uses

type Broker interface {
    Addr() string
    Connected() (bool, error)
    CreateTopics(request *sarama.CreateTopicsRequest) (*sarama.CreateTopicsResponse, error)
    Open(conf *sarama.Config) error
}

Broker is an interface for the sarama broker

type Codec Uses

type Codec interface {
    Encode(value interface{}) (data []byte, err error)
    Decode(data []byte) (value interface{}, err error)
}

Codec decodes and encodes from and to []byte
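
A sketch of a custom JSON codec for a user-defined type (the user type is an assumption; requires "encoding/json" and "fmt"):

    type user struct {
        Name string `json:"name"`
    }

    type userCodec struct{}

    // Encode marshals a *user into JSON.
    func (c *userCodec) Encode(value interface{}) ([]byte, error) {
        u, ok := value.(*user)
        if !ok {
            return nil, fmt.Errorf("userCodec: expected *user, got %T", value)
        }
        return json.Marshal(u)
    }

    // Decode unmarshals JSON into a *user.
    func (c *userCodec) Decode(data []byte) (interface{}, error) {
        var u user
        if err := json.Unmarshal(data, &u); err != nil {
            return nil, err
        }
        return &u, nil
    }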

type ConsumerGroupBuilder Uses

type ConsumerGroupBuilder func(brokers []string, group, clientID string) (sarama.ConsumerGroup, error)

ConsumerGroupBuilder creates a `sarama.ConsumerGroup`

func ConsumerGroupBuilderWithConfig Uses

func ConsumerGroupBuilderWithConfig(config *sarama.Config) ConsumerGroupBuilder

ConsumerGroupBuilderWithConfig creates a sarama consumergroup using passed config

type Context Uses

type Context interface {
    // Topic returns the topic of input message.
    Topic() Stream

    // Key returns the key of the input message.
    Key() string

    // Partition returns the partition of the input message.
    Partition() int32

    // Offset returns the offset of the input message.
    Offset() int64

    // Value returns the value of the key in the group table.
    //
    // This method might panic to initiate an immediate shutdown of the processor
    // to maintain data integrity. Do not recover from that panic or
    // the processor might deadlock.
    Value() interface{}

    // Headers returns the headers of the input message
    Headers() map[string][]byte

    // SetValue updates the value of the key in the group table.
    // It stores the value in the local cache and sends the
    // update to the Kafka topic representing the group table.
    //
    // This method might panic to initiate an immediate shutdown of the processor
    // to maintain data integrity. Do not recover from that panic or
    // the processor might deadlock.
    SetValue(value interface{})

    // Delete deletes a value from the group table. IMPORTANT: this deletes the
    // value associated with the key from both the local cache and the persisted
    // table in Kafka.
    //
    // This method might panic to initiate an immediate shutdown of the processor
    // to maintain data integrity. Do not recover from that panic or
    // the processor might deadlock.
    Delete()

    // Timestamp returns the timestamp of the input message. If the timestamp is
    // invalid, a zero time will be returned.
    Timestamp() time.Time

    // Join returns the value of key in the copartitioned table.
    //
    // This method might panic to initiate an immediate shutdown of the processor
    // to maintain data integrity. Do not recover from that panic or
    // the processor might deadlock.
    Join(topic Table) interface{}

    // Lookup returns the value of key in the view of table.
    //
    // This method might panic to initiate an immediate shutdown of the processor
    // to maintain data integrity. Do not recover from that panic or
    // the processor might deadlock.
    Lookup(topic Table, key string) interface{}

    // Emit asynchronously writes a message into a topic.
    //
    // This method might panic to initiate an immediate shutdown of the processor
    // to maintain data integrity. Do not recover from that panic or
    // the processor might deadlock.
    Emit(topic Stream, key string, value interface{})

    // Loopback asynchronously sends a message to another key of the group
    // table. Value passed to loopback is encoded via the codec given in the
    // Loop subscription.
    //
    // This method might panic to initiate an immediate shutdown of the processor
    // to maintain data integrity. Do not recover from that panic or
    // the processor might deadlock.
    Loopback(key string, value interface{})

    // Fail stops execution and shuts down the processor
    // The callback is stopped immediately by panicking. Do not recover from that panic or
    // the processor might deadlock.
    Fail(err error)

    // Context returns the underlying context used to start the processor or a
    // subcontext.
    Context() context.Context
}

Context provides access to the processor's table and emit capabilities to arbitrary topics in Kafka. Upon arrival of a message from subscribed topics, the respective ProcessCallback is invoked with a context object along with the input message. The context is only valid within the callback; do not store it or pass it to other goroutines.
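
A sketch of a callback using the context (topic names are assumptions and must be declared as edges in the group graph):

    func process(ctx goka.Context, msg interface{}) {
        // read and update the table row addressed by the message's key
        var count int64
        if v := ctx.Value(); v != nil {
            count = v.(int64)
        }
        ctx.SetValue(count + 1)

        // forward the message to an output stream declared via Output
        ctx.Emit("example-output", ctx.Key(), msg)
    }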

Error handling

Most methods of the context can fail due to different reasons, which are handled in different ways. Synchronous errors such as

- a wrong codec for the topic (a message cannot be marshalled or unmarshalled)
- Emit to a topic without an Output definition in the group graph
- Value/SetValue without defining Persist in the group graph
- Join/Lookup without the corresponding definition in the group graph

will result in a panic to stop the callback immediately and shut down the processor. This is necessary to preserve the integrity of the processor and avoid further actions. Do not recover from that panic, otherwise the goroutine will deadlock.

Retrying synchronous errors must be implemented by restarting the processor. If errors must be tolerated (which is not advisable because they're usually persistent), provide fail-tolerant versions of the producer, storage or codec as needed.

Asynchronous errors can occur after the callback has finished, e.g., when sending a batched message to Kafka fails due to connection errors or a leader election in the cluster. Those errors still shut down the processor but will not result in a panic in the callback.

type Edge Uses

type Edge interface {
    String() string
    Topic() string
    Codec() Codec
}

Edge represents a topic in Kafka and the corresponding codec to encode and decode the messages of that topic.

func Input Uses

func Input(topic Stream, c Codec, cb ProcessCallback) Edge

Input represents an edge of an input stream topic. The edge specifies the topic name, its codec and the ProcessorCallback used to process it. The topic has to be copartitioned with any other input stream of the group and with the group table. The group starts reading the topic from the newest offset.

func Inputs Uses

func Inputs(topics Streams, c Codec, cb ProcessCallback) Edge

Inputs creates edges of multiple input streams sharing the same codec and callback.

func Join Uses

func Join(topic Table, c Codec) Edge

Join represents an edge of a copartitioned, log-compacted table topic. The edge specifies the topic name and the codec of the messages of the topic. The group starts reading the topic from the oldest offset. The processing of input streams is blocked until all partitions of the table are recovered.

func Lookup Uses

func Lookup(topic Table, c Codec) Edge

Lookup represents an edge of a non-copartitioned, log-compacted table topic. The edge specifies the topic name and the codec of the messages of the topic. The group starts reading the topic from the oldest offset. The processing of input streams is blocked until the table is fully recovered.

func Loop Uses

func Loop(c Codec, cb ProcessCallback) Edge

Loop represents the edge of the loopback topic of the group. The edge specifies the codec of the messages in the topic and the ProcessCallback to process the messages of the topic. Context.Loopback() is used to write messages into this topic from any callback of the group.

func Output Uses

func Output(topic Stream, c Codec) Edge

Output represents an edge of an output stream topic. The edge specifies the topic name and the codec of the messages of the topic. Context.Emit() only emits messages into Output edges defined in the group graph. The topic does not have to be copartitioned with the input streams.

func Persist Uses

func Persist(c Codec) Edge

Persist represents the edge of the group table, which is log-compacted and copartitioned with the input streams. Without Persist, calls to ctx.Value or ctx.SetValue in the consume callback will fail and lead to shutdown of the processor.

This edge specifies the codec of the messages in the topic, ie, the codec of the values of the table. The processing of input streams is blocked until all partitions of the group table are recovered.

The topic name is derived from the group name by appending "-table".
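
Putting the edges together, a group graph might be defined as in this sketch (topic names, codecs, and the callbacks process/processLoop are assumptions):

    g := goka.DefineGroup("example-group",
        goka.Input("example-input", new(codec.String), process),
        goka.Join("other-table", new(codec.Int64)),
        goka.Lookup("global-table", new(codec.String)),
        goka.Output("example-output", new(codec.String)),
        goka.Loop(new(codec.String), processLoop),
        goka.Persist(new(codec.Int64)),
    )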

type Edges Uses

type Edges []Edge

Edges is a slice of edge objects.

func (Edges) Topics Uses

func (e Edges) Topics() []string

Topics returns the names of the topics of the edges.

type Emitter Uses

type Emitter struct {
    // contains filtered or unexported fields
}

Emitter emits messages into a specific Kafka topic, first encoding the message with the given codec.

func NewEmitter Uses

func NewEmitter(brokers []string, topic Stream, codec Codec, options ...EmitterOption) (*Emitter, error)

NewEmitter creates a new emitter using passed brokers, topic, codec and possibly options.

func (*Emitter) Emit Uses

func (e *Emitter) Emit(key string, msg interface{}) (*Promise, error)

Emit sends a message for passed key using the emitter's codec.

func (*Emitter) EmitSync Uses

func (e *Emitter) EmitSync(key string, msg interface{}) error

EmitSync sends a message for passed key using the emitter's codec and blocks until it has been delivered.

func (*Emitter) Finish Uses

func (e *Emitter) Finish() error

Finish waits until the emitter is finished producing all pending messages.
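
A sketch of typical emitter usage (broker address and topic are assumptions):

    em, err := goka.NewEmitter([]string{"localhost:9092"},
        "example-stream", new(codec.String))
    if err != nil {
        log.Fatalf("error creating emitter: %v", err)
    }
    defer em.Finish() // wait for pending messages before exiting

    // asynchronous emit: the returned promise resolves on delivery
    promise, err := em.Emit("some-key", "some-value")
    if err != nil {
        log.Fatalf("error emitting message: %v", err)
    }
    promise.Then(func(err error) {
        if err != nil {
            log.Printf("delivery failed: %v", err)
        }
    })

    // synchronous emit: blocks until the message has been delivered
    if err := em.EmitSync("other-key", "other-value"); err != nil {
        log.Fatalf("error emitting message: %v", err)
    }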

type EmitterOption Uses

type EmitterOption func(*eoptions, Stream, Codec)

EmitterOption defines a configuration option to be used when creating an emitter.

func WithEmitterClientID Uses

func WithEmitterClientID(clientID string) EmitterOption

WithEmitterClientID defines the client ID used to identify with kafka.

func WithEmitterHasher Uses

func WithEmitterHasher(hasher func() hash.Hash32) EmitterOption

WithEmitterHasher sets the hash function that assigns keys to partitions.

func WithEmitterLogger Uses

func WithEmitterLogger(log logger.Logger) EmitterOption

WithEmitterLogger sets the logger the emitter should use. By default, emitters use the standard library logger.

func WithEmitterProducerBuilder Uses

func WithEmitterProducerBuilder(pb ProducerBuilder) EmitterOption

WithEmitterProducerBuilder replaces the default producer builder.

func WithEmitterTester Uses

func WithEmitterTester(t Tester) EmitterOption

WithEmitterTester configures the emitter to use passed tester. This is used for component tests

func WithEmitterTopicManagerBuilder Uses

func WithEmitterTopicManagerBuilder(tmb TopicManagerBuilder) EmitterOption

WithEmitterTopicManagerBuilder replaces the default topic manager builder.

type Getter Uses

type Getter func(string) (interface{}, error)

Getter functions return a value for a key or an error. If no value exists for the key, nil is returned without errors.

type Group Uses

type Group string

Group is the name of a consumer group in Kafka and represents a processor group in Goka. A processor group may have a group table and a group loopback stream. By default, the group table is named <group>-table and the loopback stream <group>-loop.

type GroupGraph Uses

type GroupGraph struct {
    // contains filtered or unexported fields
}

GroupGraph is the specification of a processor group. It contains all input, output, and any other topic from which and into which the processor group may consume or produce events. Each of these links to Kafka is called Edge.

func DefineGroup Uses

func DefineGroup(group Group, edges ...Edge) *GroupGraph

DefineGroup creates a group graph with a given group name and a list of edges.

func (*GroupGraph) Group Uses

func (gg *GroupGraph) Group() Group

Group returns the group name.

func (*GroupGraph) GroupTable Uses

func (gg *GroupGraph) GroupTable() Edge

GroupTable returns the group table edge of the group.

func (*GroupGraph) InputStreams Uses

func (gg *GroupGraph) InputStreams() Edges

InputStreams returns all input stream edges of the group.

func (*GroupGraph) JointTables Uses

func (gg *GroupGraph) JointTables() Edges

JointTables returns all joint table edges of the group.

func (*GroupGraph) LookupTables Uses

func (gg *GroupGraph) LookupTables() Edges

LookupTables returns all lookup table edges of the group.

func (*GroupGraph) LoopStream Uses

func (gg *GroupGraph) LoopStream() Edge

LoopStream returns the loopback edge of the group.

func (*GroupGraph) OutputStreams Uses

func (gg *GroupGraph) OutputStreams() Edges

OutputStreams returns the output stream edges of the group.

func (*GroupGraph) Validate Uses

func (gg *GroupGraph) Validate() error

Validate validates the group graph and returns an error if invalid. Main validation checks are:

- at most one loopback stream edge is allowed
- at most one group table edge is allowed
- at least one input stream is required
- table and loopback topics cannot be used in any other edge

type InputStats Uses

type InputStats struct {
    Count      uint
    Bytes      int
    OffsetLag  int64
    LastOffset int64
    Delay      time.Duration
}

InputStats represents the number of messages and the number of bytes consumed from a stream or table topic since the process started.

type Iterator Uses

type Iterator interface {
    // Next advances the iterator to the next KV-pair. Err should be called
    // after Next returns false to check whether the iteration finished
    // from exhaustion or was aborted due to an error.
    Next() bool
    // Err returns the error that stopped the iteration if any.
    Err() error
    // Return the key of the current item
    Key() string
    // Return the value of the current item
    // This value is already decoded with the view's codec (or nil, if it's nil)
    Value() (interface{}, error)
    // Release the iterator. After release, the iterator is not usable anymore
    Release()
    // Seek moves the iterator to the beginning of a key-value pair sequence that
    // is greater or equal to the given key. It returns whether at least one of
    // such key-value pairs exist. Next must be called after seeking to access
    // the first pair.
    Seek(key string) bool
}

Iterator allows one to iterate over the keys of a view.
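
A sketch of iterating over a view's table (assuming view is a recovered *goka.View; its Iterator method returns this Iterator):

    it, err := view.Iterator()
    if err != nil {
        log.Fatalf("error creating iterator: %v", err)
    }
    defer it.Release()
    for it.Next() {
        value, err := it.Value()
        if err != nil {
            log.Fatalf("error decoding value: %v", err)
        }
        log.Printf("%s: %v", it.Key(), value)
    }
    if err := it.Err(); err != nil {
        log.Fatalf("iteration aborted: %v", err)
    }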

type MockAutoConsumer Uses

type MockAutoConsumer struct {
    // contains filtered or unexported fields
}

MockAutoConsumer implements sarama's Consumer interface for testing purposes. Before you can start consuming from this consumer, you have to register topic/partitions using ExpectConsumePartition, and set expectations on them.

func NewMockAutoConsumer Uses

func NewMockAutoConsumer(t *testing.T, config *sarama.Config) *MockAutoConsumer

NewMockAutoConsumer returns a new mock Consumer instance. The t argument should be the *testing.T instance of your test method. An error will be written to it if an expectation is violated. The config argument can be set to nil.

func (*MockAutoConsumer) Close Uses

func (c *MockAutoConsumer) Close() error

Close implements the Close method from the sarama.Consumer interface. It will close all registered PartitionConsumer instances.

func (*MockAutoConsumer) ConsumePartition Uses

func (c *MockAutoConsumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error)

ConsumePartition implements the ConsumePartition method from the sarama.Consumer interface. Before you can start consuming a partition, you have to set expectations on it using ExpectConsumePartition. You can only consume a partition once per consumer.

func (*MockAutoConsumer) ExpectConsumePartition Uses

func (c *MockAutoConsumer) ExpectConsumePartition(topic string, partition int32, offset int64) *MockAutoPartitionConsumer

ExpectConsumePartition will register a topic/partition, so you can set expectations on it. The registered PartitionConsumer will be returned, so you can set expectations on it using method chaining. Once a topic/partition is registered, you are expected to start consuming it using ConsumePartition. If that doesn't happen, an error will be written to the error reporter once the mock consumer is closed.
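
A sketch of registering a partition and yielding a message in a test (topic name is an assumption):

    consumer := goka.NewMockAutoConsumer(t, nil)
    pc := consumer.ExpectConsumePartition("example-stream", 0, sarama.OffsetOldest)
    pc.YieldMessage(&sarama.ConsumerMessage{
        Topic:     "example-stream",
        Partition: 0,
        Value:     []byte("some-value"),
    })
    pc.ExpectMessagesDrainedOnClose()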

func (*MockAutoConsumer) HighWaterMarks Uses

func (c *MockAutoConsumer) HighWaterMarks() map[string]map[int32]int64

HighWaterMarks returns a map of high watermarks for each topic/partition

func (*MockAutoConsumer) Partitions Uses

func (c *MockAutoConsumer) Partitions(topic string) ([]int32, error)

Partitions returns the list of partitions for the given topic, as registered with SetTopicMetadata.

func (*MockAutoConsumer) SetTopicMetadata Uses

func (c *MockAutoConsumer) SetTopicMetadata(metadata map[string][]int32)

SetTopicMetadata sets the cluster's topic/partition metadata, which will be returned by Topics() and Partitions().

func (*MockAutoConsumer) Topics Uses

func (c *MockAutoConsumer) Topics() ([]string, error)

Topics returns a list of topics, as registered with SetTopicMetadata

type MockAutoPartitionConsumer Uses

type MockAutoPartitionConsumer struct {
    // contains filtered or unexported fields
}

MockAutoPartitionConsumer implements sarama's PartitionConsumer interface for testing purposes. It is returned by the mock Consumer's ConsumePartition method, but only if it is registered first using the Consumer's ExpectConsumePartition method. Before consuming the Errors and Messages channels, you should specify what values will be provided on these channels using YieldMessage and YieldError.

func (*MockAutoPartitionConsumer) AsyncClose Uses

func (pc *MockAutoPartitionConsumer) AsyncClose()

AsyncClose implements the AsyncClose method from the sarama.PartitionConsumer interface.

func (*MockAutoPartitionConsumer) Close Uses

func (pc *MockAutoPartitionConsumer) Close() error

Close implements the Close method from the sarama.PartitionConsumer interface. It will verify whether the partition consumer was actually started.

func (*MockAutoPartitionConsumer) Errors Uses

func (pc *MockAutoPartitionConsumer) Errors() <-chan *sarama.ConsumerError

Errors implements the Errors method from the sarama.PartitionConsumer interface.

func (*MockAutoPartitionConsumer) ExpectErrorsDrainedOnClose Uses

func (pc *MockAutoPartitionConsumer) ExpectErrorsDrainedOnClose()

ExpectErrorsDrainedOnClose sets an expectation on the partition consumer that the errors channel will be fully drained when Close is called. If this expectation is not met, an error is reported to the error reporter.

func (*MockAutoPartitionConsumer) ExpectMessagesDrainedOnClose Uses

func (pc *MockAutoPartitionConsumer) ExpectMessagesDrainedOnClose()

ExpectMessagesDrainedOnClose sets an expectation on the partition consumer that the messages channel will be fully drained when Close is called. If this expectation is not met, an error is reported to the error reporter.

func (*MockAutoPartitionConsumer) HighWaterMarkOffset Uses

func (pc *MockAutoPartitionConsumer) HighWaterMarkOffset() int64

HighWaterMarkOffset returns the high watermark for the partition

func (*MockAutoPartitionConsumer) Messages Uses

func (pc *MockAutoPartitionConsumer) Messages() <-chan *sarama.ConsumerMessage

Messages implements the Messages method from the sarama.PartitionConsumer interface.

func (*MockAutoPartitionConsumer) YieldError Uses

func (pc *MockAutoPartitionConsumer) YieldError(err error)

YieldError will yield an error on the Errors channel of this partition consumer when it is consumed. By default, the mock consumer will not verify whether this error was consumed from the Errors channel, because there are legitimate reasons for this not to happen. You can call ExpectErrorsDrainedOnClose so it will verify that the channel is empty on close.

func (*MockAutoPartitionConsumer) YieldMessage Uses

func (pc *MockAutoPartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage)

YieldMessage will yield a message on the Messages channel of this partition consumer when it is consumed. By default, the mock consumer will not verify whether this message was consumed from the Messages channel, because there are legitimate reasons for this not to happen. You can call ExpectMessagesDrainedOnClose so it will verify that the channel is empty on close.

type MockBroker Uses

type MockBroker struct {
    // contains filtered or unexported fields
}

MockBroker is a mock of Broker interface

func NewMockBroker Uses

func NewMockBroker(ctrl *gomock.Controller) *MockBroker

NewMockBroker creates a new mock instance

func (*MockBroker) Addr Uses

func (m *MockBroker) Addr() string

Addr mocks base method

func (*MockBroker) Connected Uses

func (m *MockBroker) Connected() (bool, error)

Connected mocks base method

func (*MockBroker) CreateTopics Uses

func (m *MockBroker) CreateTopics(arg0 *sarama.CreateTopicsRequest) (*sarama.CreateTopicsResponse, error)

CreateTopics mocks base method

func (*MockBroker) EXPECT Uses

func (m *MockBroker) EXPECT() *MockBrokerMockRecorder

EXPECT returns an object that allows the caller to indicate expected use

func (*MockBroker) Open Uses

func (m *MockBroker) Open(arg0 *sarama.Config) error

Open mocks base method

type MockBrokerMockRecorder Uses

type MockBrokerMockRecorder struct {
    // contains filtered or unexported fields
}

MockBrokerMockRecorder is the mock recorder for MockBroker

func (*MockBrokerMockRecorder) Addr Uses

func (mr *MockBrokerMockRecorder) Addr() *gomock.Call

Addr indicates an expected call of Addr

func (*MockBrokerMockRecorder) Connected Uses

func (mr *MockBrokerMockRecorder) Connected() *gomock.Call

Connected indicates an expected call of Connected

func (*MockBrokerMockRecorder) CreateTopics Uses

func (mr *MockBrokerMockRecorder) CreateTopics(arg0 interface{}) *gomock.Call

CreateTopics indicates an expected call of CreateTopics

func (*MockBrokerMockRecorder) Open Uses

func (mr *MockBrokerMockRecorder) Open(arg0 interface{}) *gomock.Call

Open indicates an expected call of Open

type MockClient Uses

type MockClient struct {
    // contains filtered or unexported fields
}

MockClient is a mock of Client interface

func NewMockClient Uses

func NewMockClient(ctrl *gomock.Controller) *MockClient

NewMockClient creates a new mock instance

func (*MockClient) Brokers Uses

func (m *MockClient) Brokers() []*sarama.Broker

Brokers mocks base method

func (*MockClient) Close Uses

func (m *MockClient) Close() error

Close mocks base method

func (*MockClient) Closed Uses

func (m *MockClient) Closed() bool

Closed mocks base method

func (*MockClient) Config Uses

func (m *MockClient) Config() *sarama.Config

Config mocks base method

func (*MockClient) Controller Uses

func (m *MockClient) Controller() (*sarama.Broker, error)

Controller mocks base method

func (*MockClient) Coordinator Uses

func (m *MockClient) Coordinator(arg0 string) (*sarama.Broker, error)

Coordinator mocks base method

func (*MockClient) EXPECT Uses

func (m *MockClient) EXPECT() *MockClientMockRecorder

EXPECT returns an object that allows the caller to indicate expected use

func (*MockClient) GetOffset Uses

func (m *MockClient) GetOffset(arg0 string, arg1 int32, arg2 int64) (int64, error)

GetOffset mocks base method

func (*MockClient) InSyncReplicas Uses

func (m *MockClient) InSyncReplicas(arg0 string, arg1 int32) ([]int32, error)

InSyncReplicas mocks base method

func (*MockClient) InitProducerID Uses

func (m *MockClient) InitProducerID() (*sarama.InitProducerIDResponse, error)

InitProducerID mocks base method

func (*MockClient) Leader Uses

func (m *MockClient) Leader(arg0 string, arg1 int32) (*sarama.Broker, error)

Leader mocks base method

func (*MockClient) OfflineReplicas Uses

func (m *MockClient) OfflineReplicas(arg0 string, arg1 int32) ([]int32, error)

OfflineReplicas mocks base method

func (*MockClient) Partitions Uses

func (m *MockClient) Partitions(arg0 string) ([]int32, error)

Partitions mocks base method

func (*MockClient) RefreshController Uses

func (m *MockClient) RefreshController() (*sarama.Broker, error)

RefreshController mocks base method

func (*MockClient) RefreshCoordinator Uses

func (m *MockClient) RefreshCoordinator(arg0 string) error

RefreshCoordinator mocks base method

func (*MockClient) RefreshMetadata Uses

func (m *MockClient) RefreshMetadata(arg0 ...string) error

RefreshMetadata mocks base method

func (*MockClient) Replicas Uses

func (m *MockClient) Replicas(arg0 string, arg1 int32) ([]int32, error)

Replicas mocks base method

func (*MockClient) Topics Uses

func (m *MockClient) Topics() ([]string, error)

Topics mocks base method

func (*MockClient) WritablePartitions Uses

func (m *MockClient) WritablePartitions(arg0 string) ([]int32, error)

WritablePartitions mocks base method

type MockClientMockRecorder Uses

type MockClientMockRecorder struct {
    // contains filtered or unexported fields
}

MockClientMockRecorder is the mock recorder for MockClient

func (*MockClientMockRecorder) Brokers Uses

func (mr *MockClientMockRecorder) Brokers() *gomock.Call

Brokers indicates an expected call of Brokers

func (*MockClientMockRecorder) Close Uses

func (mr *MockClientMockRecorder) Close() *gomock.Call

Close indicates an expected call of Close

func (*MockClientMockRecorder) Closed Uses

func (mr *MockClientMockRecorder) Closed() *gomock.Call

Closed indicates an expected call of Closed

func (*MockClientMockRecorder) Config Uses

func (mr *MockClientMockRecorder) Config() *gomock.Call

Config indicates an expected call of Config

func (*MockClientMockRecorder) Controller Uses

func (mr *MockClientMockRecorder) Controller() *gomock.Call

Controller indicates an expected call of Controller

func (*MockClientMockRecorder) Coordinator Uses

func (mr *MockClientMockRecorder) Coordinator(arg0 interface{}) *gomock.Call

Coordinator indicates an expected call of Coordinator

func (*MockClientMockRecorder) GetOffset Uses

func (mr *MockClientMockRecorder) GetOffset(arg0, arg1, arg2 interface{}) *gomock.Call

GetOffset indicates an expected call of GetOffset

func (*MockClientMockRecorder) InSyncReplicas Uses

func (mr *MockClientMockRecorder) InSyncReplicas(arg0, arg1 interface{}) *gomock.Call

InSyncReplicas indicates an expected call of InSyncReplicas

func (*MockClientMockRecorder) InitProducerID Uses

func (mr *MockClientMockRecorder) InitProducerID() *gomock.Call

InitProducerID indicates an expected call of InitProducerID

func (*MockClientMockRecorder) Leader Uses

func (mr *MockClientMockRecorder) Leader(arg0, arg1 interface{}) *gomock.Call

Leader indicates an expected call of Leader

func (*MockClientMockRecorder) OfflineReplicas Uses

func (mr *MockClientMockRecorder) OfflineReplicas(arg0, arg1 interface{}) *gomock.Call

OfflineReplicas indicates an expected call of OfflineReplicas

func (*MockClientMockRecorder) Partitions Uses

func (mr *MockClientMockRecorder) Partitions(arg0 interface{}) *gomock.Call

Partitions indicates an expected call of Partitions

func (*MockClientMockRecorder) RefreshController Uses

func (mr *MockClientMockRecorder) RefreshController() *gomock.Call

RefreshController indicates an expected call of RefreshController

func (*MockClientMockRecorder) RefreshCoordinator Uses

func (mr *MockClientMockRecorder) RefreshCoordinator(arg0 interface{}) *gomock.Call

RefreshCoordinator indicates an expected call of RefreshCoordinator

func (*MockClientMockRecorder) RefreshMetadata Uses

func (mr *MockClientMockRecorder) RefreshMetadata(arg0 ...interface{}) *gomock.Call

RefreshMetadata indicates an expected call of RefreshMetadata

func (*MockClientMockRecorder) Replicas Uses

func (mr *MockClientMockRecorder) Replicas(arg0, arg1 interface{}) *gomock.Call

Replicas indicates an expected call of Replicas

func (*MockClientMockRecorder) Topics Uses

func (mr *MockClientMockRecorder) Topics() *gomock.Call

Topics indicates an expected call of Topics

func (*MockClientMockRecorder) WritablePartitions Uses

func (mr *MockClientMockRecorder) WritablePartitions(arg0 interface{}) *gomock.Call

WritablePartitions indicates an expected call of WritablePartitions

type MockConsumerGroup Uses

type MockConsumerGroup struct {
    // contains filtered or unexported fields
}

MockConsumerGroup mocks the consumergroup

func NewMockConsumerGroup Uses

func NewMockConsumerGroup(t *testing.T) *MockConsumerGroup

NewMockConsumerGroup creates a new consumer group

func (*MockConsumerGroup) Close Uses

func (cg *MockConsumerGroup) Close() error

Close closes the consumergroup

func (*MockConsumerGroup) Consume Uses

func (cg *MockConsumerGroup) Consume(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error

Consume starts consuming from the consumergroup

func (*MockConsumerGroup) Errors Uses

func (cg *MockConsumerGroup) Errors() <-chan error

Errors returns the errors channel

func (*MockConsumerGroup) FailOnConsume Uses

func (cg *MockConsumerGroup) FailOnConsume(err error)

FailOnConsume marks the consumer to fail on consume

func (*MockConsumerGroup) SendError Uses

func (cg *MockConsumerGroup) SendError(err error)

SendError sends an error to the consumergroup

func (*MockConsumerGroup) SendMessage Uses

func (cg *MockConsumerGroup) SendMessage(message *sarama.ConsumerMessage) <-chan struct{}

SendMessage sends a message to the consumergroup. It returns a channel that will be closed when the message has been committed by the group.

func (*MockConsumerGroup) SendMessageWait Uses

func (cg *MockConsumerGroup) SendMessageWait(message *sarama.ConsumerMessage)

SendMessageWait sends a message to the consumergroup and waits until the message has been committed

type MockConsumerGroupClaim Uses

type MockConsumerGroupClaim struct {
    // contains filtered or unexported fields
}

MockConsumerGroupClaim mocks the consumergroupclaim

func NewMockConsumerGroupClaim Uses

func NewMockConsumerGroupClaim(topic string, partition int32) *MockConsumerGroupClaim

NewMockConsumerGroupClaim creates a new MockConsumerGroupClaim

func (*MockConsumerGroupClaim) HighWaterMarkOffset Uses

func (cgc *MockConsumerGroupClaim) HighWaterMarkOffset() int64

HighWaterMarkOffset returns the hwm offset

func (*MockConsumerGroupClaim) InitialOffset Uses

func (cgc *MockConsumerGroupClaim) InitialOffset() int64

InitialOffset returns the initial offset

func (*MockConsumerGroupClaim) Messages Uses

func (cgc *MockConsumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage

Messages returns the message channel of the claim

func (*MockConsumerGroupClaim) Partition Uses

func (cgc *MockConsumerGroupClaim) Partition() int32

Partition returns the partition

func (*MockConsumerGroupClaim) Topic Uses

func (cgc *MockConsumerGroupClaim) Topic() string

Topic returns the current topic of the claim

type MockConsumerGroupSession Uses

type MockConsumerGroupSession struct {
    // contains filtered or unexported fields
}

MockConsumerGroupSession mocks the consumer group session used for testing

func (*MockConsumerGroupSession) Claims Uses

func (cgs *MockConsumerGroupSession) Claims() map[string][]int32

Claims returns the partitions assigned in the group session for each topic

func (*MockConsumerGroupSession) Context Uses

func (cgs *MockConsumerGroupSession) Context() context.Context

Context returns the consumer group's context

func (*MockConsumerGroupSession) GenerationID Uses

func (cgs *MockConsumerGroupSession) GenerationID() int32

GenerationID returns the generation ID of the group consumer

func (*MockConsumerGroupSession) MarkMessage Uses

func (cgs *MockConsumerGroupSession) MarkMessage(msg *sarama.ConsumerMessage, metadata string)

MarkMessage marks the passed message as consumed

func (*MockConsumerGroupSession) MarkOffset Uses

func (cgs *MockConsumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string)

MarkOffset marks the passed offset consumed in topic/partition

func (*MockConsumerGroupSession) MemberID Uses

func (cgs *MockConsumerGroupSession) MemberID() string

MemberID returns the member ID. TODO: clarify what that actually means and whether we need to mock that somehow

func (*MockConsumerGroupSession) ResetOffset Uses

func (cgs *MockConsumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string)

ResetOffset resets the offset to be consumed from

func (*MockConsumerGroupSession) SendMessage Uses

func (cgs *MockConsumerGroupSession) SendMessage(msg *sarama.ConsumerMessage)

SendMessage sends a message to the consumer

type MockProducer Uses

type MockProducer struct {
    // contains filtered or unexported fields
}

MockProducer is a mock of Producer interface

func NewMockProducer Uses

func NewMockProducer(ctrl *gomock.Controller) *MockProducer

NewMockProducer creates a new mock instance

func (*MockProducer) Close Uses

func (m *MockProducer) Close() error

Close mocks base method

func (*MockProducer) EXPECT Uses

func (m *MockProducer) EXPECT() *MockProducerMockRecorder

EXPECT returns an object that allows the caller to indicate expected use

func (*MockProducer) Emit Uses

func (m *MockProducer) Emit(arg0, arg1 string, arg2 []byte) *Promise

Emit mocks base method

type MockProducerMockRecorder Uses

type MockProducerMockRecorder struct {
    // contains filtered or unexported fields
}

MockProducerMockRecorder is the mock recorder for MockProducer

func (*MockProducerMockRecorder) Close Uses

func (mr *MockProducerMockRecorder) Close() *gomock.Call

Close indicates an expected call of Close

func (*MockProducerMockRecorder) Emit Uses

func (mr *MockProducerMockRecorder) Emit(arg0, arg1, arg2 interface{}) *gomock.Call

Emit indicates an expected call of Emit

type MockStorage Uses

type MockStorage struct {
    // contains filtered or unexported fields
}

MockStorage is a mock of Storage interface

func NewMockStorage Uses

func NewMockStorage(ctrl *gomock.Controller) *MockStorage

NewMockStorage creates a new mock instance

func (*MockStorage) Close Uses

func (m *MockStorage) Close() error

Close mocks base method

func (*MockStorage) Delete Uses

func (m *MockStorage) Delete(arg0 string) error

Delete mocks base method

func (*MockStorage) EXPECT Uses

func (m *MockStorage) EXPECT() *MockStorageMockRecorder

EXPECT returns an object that allows the caller to indicate expected use

func (*MockStorage) Get Uses

func (m *MockStorage) Get(arg0 string) ([]byte, error)

Get mocks base method

func (*MockStorage) GetOffset Uses

func (m *MockStorage) GetOffset(arg0 int64) (int64, error)

GetOffset mocks base method

func (*MockStorage) Has Uses

func (m *MockStorage) Has(arg0 string) (bool, error)

Has mocks base method

func (*MockStorage) Iterator Uses

func (m *MockStorage) Iterator() (storage.Iterator, error)

Iterator mocks base method

func (*MockStorage) IteratorWithRange Uses

func (m *MockStorage) IteratorWithRange(arg0, arg1 []byte) (storage.Iterator, error)

IteratorWithRange mocks base method

func (*MockStorage) MarkRecovered Uses

func (m *MockStorage) MarkRecovered() error

MarkRecovered mocks base method

func (*MockStorage) Open Uses

func (m *MockStorage) Open() error

Open mocks base method

func (*MockStorage) Recovered Uses

func (m *MockStorage) Recovered() bool

Recovered mocks base method

func (*MockStorage) Set Uses

func (m *MockStorage) Set(arg0 string, arg1 []byte) error

Set mocks base method

func (*MockStorage) SetOffset Uses

func (m *MockStorage) SetOffset(arg0 int64) error

SetOffset mocks base method

type MockStorageMockRecorder Uses

type MockStorageMockRecorder struct {
    // contains filtered or unexported fields
}

MockStorageMockRecorder is the mock recorder for MockStorage

func (*MockStorageMockRecorder) Close Uses

func (mr *MockStorageMockRecorder) Close() *gomock.Call

Close indicates an expected call of Close

func (*MockStorageMockRecorder) Delete Uses

func (mr *MockStorageMockRecorder) Delete(arg0 interface{}) *gomock.Call

Delete indicates an expected call of Delete

func (*MockStorageMockRecorder) Get Uses

func (mr *MockStorageMockRecorder) Get(arg0 interface{}) *gomock.Call

Get indicates an expected call of Get

func (*MockStorageMockRecorder) GetOffset Uses

func (mr *MockStorageMockRecorder) GetOffset(arg0 interface{}) *gomock.Call

GetOffset indicates an expected call of GetOffset

func (*MockStorageMockRecorder) Has Uses

func (mr *MockStorageMockRecorder) Has(arg0 interface{}) *gomock.Call

Has indicates an expected call of Has

func (*MockStorageMockRecorder) Iterator Uses

func (mr *MockStorageMockRecorder) Iterator() *gomock.Call

Iterator indicates an expected call of Iterator

func (*MockStorageMockRecorder) IteratorWithRange Uses

func (mr *MockStorageMockRecorder) IteratorWithRange(arg0, arg1 interface{}) *gomock.Call

IteratorWithRange indicates an expected call of IteratorWithRange

func (*MockStorageMockRecorder) MarkRecovered Uses

func (mr *MockStorageMockRecorder) MarkRecovered() *gomock.Call

MarkRecovered indicates an expected call of MarkRecovered

func (*MockStorageMockRecorder) Open Uses

func (mr *MockStorageMockRecorder) Open() *gomock.Call

Open indicates an expected call of Open

func (*MockStorageMockRecorder) Recovered Uses

func (mr *MockStorageMockRecorder) Recovered() *gomock.Call

Recovered indicates an expected call of Recovered

func (*MockStorageMockRecorder) Set Uses

func (mr *MockStorageMockRecorder) Set(arg0, arg1 interface{}) *gomock.Call

Set indicates an expected call of Set

func (*MockStorageMockRecorder) SetOffset Uses

func (mr *MockStorageMockRecorder) SetOffset(arg0 interface{}) *gomock.Call

SetOffset indicates an expected call of SetOffset

type MockTopicManager Uses

type MockTopicManager struct {
    // contains filtered or unexported fields
}

MockTopicManager is a mock of TopicManager interface

func NewMockTopicManager Uses

func NewMockTopicManager(ctrl *gomock.Controller) *MockTopicManager

NewMockTopicManager creates a new mock instance

func (*MockTopicManager) Close Uses

func (m *MockTopicManager) Close() error

Close mocks base method

func (*MockTopicManager) EXPECT Uses

func (m *MockTopicManager) EXPECT() *MockTopicManagerMockRecorder

EXPECT returns an object that allows the caller to indicate expected use

func (*MockTopicManager) EnsureStreamExists Uses

func (m *MockTopicManager) EnsureStreamExists(arg0 string, arg1 int) error

EnsureStreamExists mocks base method

func (*MockTopicManager) EnsureTableExists Uses

func (m *MockTopicManager) EnsureTableExists(arg0 string, arg1 int) error

EnsureTableExists mocks base method

func (*MockTopicManager) EnsureTopicExists Uses

func (m *MockTopicManager) EnsureTopicExists(arg0 string, arg1, arg2 int, arg3 map[string]string) error

EnsureTopicExists mocks base method

func (*MockTopicManager) GetOffset Uses

func (m *MockTopicManager) GetOffset(arg0 string, arg1 int32, arg2 int64) (int64, error)

GetOffset mocks base method

func (*MockTopicManager) Partitions Uses

func (m *MockTopicManager) Partitions(arg0 string) ([]int32, error)

Partitions mocks base method

type MockTopicManagerMockRecorder Uses

type MockTopicManagerMockRecorder struct {
    // contains filtered or unexported fields
}

MockTopicManagerMockRecorder is the mock recorder for MockTopicManager

func (*MockTopicManagerMockRecorder) Close Uses

func (mr *MockTopicManagerMockRecorder) Close() *gomock.Call

Close indicates an expected call of Close

func (*MockTopicManagerMockRecorder) EnsureStreamExists Uses

func (mr *MockTopicManagerMockRecorder) EnsureStreamExists(arg0, arg1 interface{}) *gomock.Call

EnsureStreamExists indicates an expected call of EnsureStreamExists

func (*MockTopicManagerMockRecorder) EnsureTableExists Uses

func (mr *MockTopicManagerMockRecorder) EnsureTableExists(arg0, arg1 interface{}) *gomock.Call

EnsureTableExists indicates an expected call of EnsureTableExists

func (*MockTopicManagerMockRecorder) EnsureTopicExists Uses

func (mr *MockTopicManagerMockRecorder) EnsureTopicExists(arg0, arg1, arg2, arg3 interface{}) *gomock.Call

EnsureTopicExists indicates an expected call of EnsureTopicExists

func (*MockTopicManagerMockRecorder) GetOffset Uses

func (mr *MockTopicManagerMockRecorder) GetOffset(arg0, arg1, arg2 interface{}) *gomock.Call

GetOffset indicates an expected call of GetOffset

func (*MockTopicManagerMockRecorder) Partitions Uses

func (mr *MockTopicManagerMockRecorder) Partitions(arg0 interface{}) *gomock.Call

Partitions indicates an expected call of Partitions

type NilHandling Uses

type NilHandling int

NilHandling defines how nil messages should be handled by the processor.

const (
    // NilIgnore drops any message with nil value.
    NilIgnore NilHandling = 0 + iota
    // NilProcess passes the nil value to ProcessCallback.
    NilProcess
    // NilDecode passes the nil value to decoder before calling ProcessCallback.
    NilDecode
)
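
For example, a sketch of letting nil values reach the ProcessCallback, e.g. to handle deletions (brokers and graph are assumed to be defined):

    p, err := goka.NewProcessor(brokers, graph,
        goka.WithNilHandling(goka.NilProcess),
    )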

type OutputStats Uses

type OutputStats struct {
    Count uint
    Bytes int
}

OutputStats represents the number of messages and the number of bytes emitted into a stream or table since the process started.

type PartitionProcStats Uses

type PartitionProcStats struct {
    Now time.Time

    TableStats *TableStats

    Joined map[string]*TableStats

    Input  map[string]*InputStats
    Output map[string]*OutputStats
}

PartitionProcStats represents metrics and measurements of a partition processor

type PartitionProcessor Uses

type PartitionProcessor struct {
    // contains filtered or unexported fields
}

PartitionProcessor handles message processing of one partition by serializing messages from different input topics. It also handles joined tables as well as lookup views (managed by `Processor`).

func (*PartitionProcessor) EnqueueMessage Uses

func (pp *PartitionProcessor) EnqueueMessage(msg *sarama.ConsumerMessage)

EnqueueMessage enqueues a message in the partition processor's event channel for processing

func (*PartitionProcessor) Errors Uses

func (pp *PartitionProcessor) Errors() <-chan error

Errors returns a channel of errors during consumption

func (*PartitionProcessor) Recovered Uses

func (pp *PartitionProcessor) Recovered() bool

Recovered returns whether the processor is running (i.e., all joins, lookups and the table are recovered and it is consuming messages)

func (*PartitionProcessor) Setup Uses

func (pp *PartitionProcessor) Setup(ctx context.Context) error

Setup initializes the processor after a rebalance

func (*PartitionProcessor) Stop Uses

func (pp *PartitionProcessor) Stop() error

Stop stops the partition processor

type PartitionStatus Uses

type PartitionStatus int

PartitionStatus is the status of the partition of a table (group table or joined table).

const (
    // PartitionStopped indicates the partition stopped and should not be used anymore.
    PartitionStopped PartitionStatus = iota
    // PartitionInitializing indicates that the underlying storage is initializing (e.g. opening leveldb files),
    // and has not actually started working yet.
    PartitionInitializing
    // PartitionConnecting indicates the partition is trying to (re-)connect to Kafka
    PartitionConnecting
    // PartitionRecovering indicates the partition is recovering and the storage
    // is writing updates in bulk-mode (if the storage implementation supports it).
    PartitionRecovering
    // PartitionPreparing indicates the end of the bulk-mode. Depending on the storage
    // implementation, the Preparing phase may take long because the storage compacts its logs.
    PartitionPreparing
    // PartitionRunning indicates the partition is recovered and processing updates
    // in normal operation.
    PartitionRunning
)

type PartitionTable Uses

type PartitionTable struct {
    // contains filtered or unexported fields
}

PartitionTable manages the usage of a table for one partition. It allows setting up and recovering/catching up the table contents from Kafka, and allows updates via Get/Set/Delete accessors.

func (*PartitionTable) CatchupForever Uses

func (p *PartitionTable) CatchupForever(ctx context.Context, restartOnError bool) error

CatchupForever starts catching up the partition table forever (until the context is cancelled). Option restartOnError allows the view to stay open/intact even in case of consumer errors.

func (*PartitionTable) Close Uses

func (p *PartitionTable) Close() error

Close closes the partition table

func (*PartitionTable) CurrentState Uses

func (p *PartitionTable) CurrentState() PartitionStatus

CurrentState returns the partition's current status

func (*PartitionTable) Delete Uses

func (p *PartitionTable) Delete(key string) error

Delete removes the passed key from the partition table by deleting from the underlying storage

func (*PartitionTable) Get Uses

func (p *PartitionTable) Get(key string) ([]byte, error)

Get returns the value for passed key

func (*PartitionTable) GetOffset Uses

func (p *PartitionTable) GetOffset(defValue int64) (int64, error)

GetOffset returns the magic offset value from storage

func (*PartitionTable) Has Uses

func (p *PartitionTable) Has(key string) (bool, error)

Has returns whether the storage contains passed key

func (*PartitionTable) IsRecovered Uses

func (p *PartitionTable) IsRecovered() bool

IsRecovered returns whether the partition table is recovered

func (*PartitionTable) RunStatsLoop Uses

func (p *PartitionTable) RunStatsLoop(ctx context.Context)

RunStatsLoop starts the handler for stats requests. This loop runs detached from the recover/catchup mechanism so clients can always request stats even if the partition table is not running (like a processor table after it's recovered).

func (*PartitionTable) Set Uses

func (p *PartitionTable) Set(key string, value []byte) error

Set stores a key-value pair in the partition table by modifying the underlying storage

func (*PartitionTable) SetOffset Uses

func (p *PartitionTable) SetOffset(value int64) error

SetOffset sets the magic offset value in storage

func (*PartitionTable) SetupAndRecover Uses

func (p *PartitionTable) SetupAndRecover(ctx context.Context, restartOnError bool) error

SetupAndRecover sets up the partition storage and recovers to HWM

func (*PartitionTable) TrackMessageWrite Uses

func (p *PartitionTable) TrackMessageWrite(ctx context.Context, length int)

TrackMessageWrite updates the write stats to passed length

func (*PartitionTable) WaitRecovered Uses

func (p *PartitionTable) WaitRecovered() chan struct{}

WaitRecovered returns a channel that closes when the partition table enters state `PartitionRunning`

type ProcessCallback Uses

type ProcessCallback func(ctx Context, msg interface{})

ProcessCallback function is called for every message received by the processor.

type Processor Uses

type Processor struct {
    // contains filtered or unexported fields
}

Processor is a set of stateful callback functions that, on the arrival of messages, modify the content of a table (the group table) and emit messages into other topics. Messages as well as rows in the group table are key-value pairs. A group is composed of multiple processor instances.

func NewProcessor Uses

func NewProcessor(brokers []string, gg *GroupGraph, options ...ProcessorOption) (*Processor, error)

NewProcessor creates a processor instance in a group given the address of Kafka brokers, the consumer group name, a list of subscriptions (topics, codecs, and callbacks), and a series of options.

func (*Processor) Cleanup Uses

func (g *Processor) Cleanup(session sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited but before the offsets are committed for the very last time.

func (*Processor) ConsumeClaim Uses

func (g *Processor) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). Once the Messages() channel is closed, the Handler must finish its processing loop and exit.

func (*Processor) Get Uses

func (g *Processor) Get(key string) (interface{}, error)

Get returns a read-only copy of a value from the group table if the respective partition is owned by the processor instance. Get can be called by multiple goroutines concurrently. Get can only be used with stateful processors (ie, when the group table is enabled) and after Recovered returns true.

func (*Processor) Graph Uses

func (g *Processor) Graph() *GroupGraph

Graph returns the group graph of the processor.

func (*Processor) Recovered Uses

func (g *Processor) Recovered() bool

Recovered returns whether the processor is running, i.e., whether the processor has recovered all lookups/joins/tables and is consuming messages

func (*Processor) Run Uses

func (g *Processor) Run(ctx context.Context) (rerr error)

Run starts the processor using passed context. The processor stops in case of errors or if the context is cancelled
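
A sketch of running a processor with graceful shutdown on SIGINT/SIGTERM (p is an assumed *goka.Processor; requires "os", "os/signal" and "syscall"):

    ctx, cancel := context.WithCancel(context.Background())
    done := make(chan struct{})
    go func() {
        defer close(done)
        if err := p.Run(ctx); err != nil {
            log.Printf("processor error: %v", err)
        }
    }()

    // cancel the context when the process receives a termination signal
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
    <-sigs
    cancel()
    <-done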

func (*Processor) Setup Uses

func (g *Processor) Setup(session sarama.ConsumerGroupSession) error

Setup is run at the beginning of a new session, before ConsumeClaim.

func (*Processor) Stats Uses

func (g *Processor) Stats() *ProcessorStats

Stats returns the aggregated stats for the processor including all partitions, tables, lookups and joins

func (*Processor) StatsWithContext Uses

func (g *Processor) StatsWithContext(ctx context.Context) *ProcessorStats

StatsWithContext returns stats for the processor, see #Processor.Stats()

func (*Processor) Stop Uses

func (g *Processor) Stop()

Stop stops the processor. This is semantically equivalent to closing the Context that was passed to Processor.Run(..). This method returns immediately; errors during running will be returned from Processor.Run(..).

func (*Processor) WaitForReady Uses

func (g *Processor) WaitForReady()

WaitForReady waits until the processor is ready to consume messages (or is actually consuming messages), i.e., it is done catching up all partition tables, joins and lookup tables.

type ProcessorOption Uses

type ProcessorOption func(*poptions, *GroupGraph)

ProcessorOption defines a configuration option to be used when creating a processor.

func WithBackoffBuilder Uses

func WithBackoffBuilder(bb BackoffBuilder) ProcessorOption

WithBackoffBuilder replaces the default backoff builder.

func WithBackoffResetTimeout Uses

func WithBackoffResetTimeout(duration time.Duration) ProcessorOption

WithBackoffResetTimeout defines the timeout when the backoff will be reset.

func WithClientID Uses

func WithClientID(clientID string) ProcessorOption

WithClientID defines the client ID used to identify with Kafka.

func WithConsumerGroupBuilder Uses

func WithConsumerGroupBuilder(cgb ConsumerGroupBuilder) ProcessorOption

WithConsumerGroupBuilder replaces the default consumer group builder

func WithConsumerSaramaBuilder Uses

func WithConsumerSaramaBuilder(cgb SaramaConsumerBuilder) ProcessorOption

WithConsumerSaramaBuilder replaces the default Sarama consumer builder

func WithGroupGraphHook Uses

func WithGroupGraphHook(hook func(gg *GroupGraph)) ProcessorOption

WithGroupGraphHook allows a function to obtain the group graph when a processor is started.

func WithHasher Uses

func WithHasher(hasher func() hash.Hash32) ProcessorOption

WithHasher sets the hash function that assigns keys to partitions.

func WithLogger Uses

func WithLogger(log logger.Logger) ProcessorOption

WithLogger sets the logger the processor should use. By default, processors use the standard library logger.

func WithNilHandling Uses

func WithNilHandling(nh NilHandling) ProcessorOption

WithNilHandling configures how the processor should handle messages with nil value. By default the processor ignores nil messages.

func WithPartitionChannelSize Uses

func WithPartitionChannelSize(size int) ProcessorOption

WithPartitionChannelSize replaces the default partition channel size. This is mostly used for testing by setting it to 0 to have synchronous behavior of goka.

func WithProducerBuilder Uses

func WithProducerBuilder(pb ProducerBuilder) ProcessorOption

WithProducerBuilder replaces the default producer builder.

func WithRebalanceCallback Uses

func WithRebalanceCallback(cb RebalanceCallback) ProcessorOption

WithRebalanceCallback sets the callback for when a new partition assignment is received. By default, this is an empty function.
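
For example, a sketch of a callback that merely logs each new assignment (`brokers` and `graph` are assumed to exist):

Code:

p, err := NewProcessor(brokers, graph,
    WithRebalanceCallback(func(a Assignment) {
        // Assignment maps partition IDs to offsets
        log.Printf("new partition assignment: %v", a)
    }),
)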

func WithStorageBuilder Uses

func WithStorageBuilder(sb storage.Builder) ProcessorOption

WithStorageBuilder defines a builder for the storage of each partition.
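
For example, a sketch that keeps each partition's state under a custom base path, assuming the default (LevelDB-backed) builder from goka's storage package:

Code:

p, err := NewProcessor(brokers, graph,
    // one storage instance is built per topic/partition below the base path
    WithStorageBuilder(storage.DefaultBuilder("/tmp/goka-state")),
)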

func WithTester Uses

func WithTester(t Tester) ProcessorOption

WithTester configures all external connections of a processor, ie, storage, consumer and producer

func WithTopicManagerBuilder Uses

func WithTopicManagerBuilder(tmb TopicManagerBuilder) ProcessorOption

WithTopicManagerBuilder replaces the default topic manager builder.

func WithUpdateCallback Uses

func WithUpdateCallback(cb UpdateCallback) ProcessorOption

WithUpdateCallback defines the callback called upon recovering a message from the log.

type ProcessorStats Uses

type ProcessorStats struct {
    Group  map[int32]*PartitionProcStats
    Lookup map[string]*ViewStats
}

ProcessorStats represents the metrics of all partitions of the processor, including its group, joined tables and lookup tables.

type Producer Uses

type Producer interface {
    // Emit sends a message to topic.
    Emit(topic string, key string, value []byte) *Promise
    Close() error
}

Producer abstracts the kafka producer

func DefaultProducerBuilder Uses

func DefaultProducerBuilder(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error)

DefaultProducerBuilder creates a Kafka producer using the Sarama library.

func NewProducer Uses

func NewProducer(brokers []string, config *sarama.Config) (Producer, error)

NewProducer creates a new Kafka producer for the passed brokers.

type ProducerBuilder Uses

type ProducerBuilder func(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error)

ProducerBuilder creates a Kafka producer.

func ProducerBuilderWithConfig Uses

func ProducerBuilderWithConfig(config *sarama.Config) ProducerBuilder

ProducerBuilderWithConfig creates a Kafka producer using the Sarama library with the given config.

type Promise Uses

type Promise struct {
    sync.Mutex
    // contains filtered or unexported fields
}

Promise as in https://en.wikipedia.org/wiki/Futures_and_promises

func NewPromise Uses

func NewPromise() *Promise

NewPromise creates a new Promise

func (*Promise) Finish Uses

func (p *Promise) Finish(msg *sarama.ProducerMessage, err error) *Promise

Finish finishes the promise by executing all callbacks and saving the message/error for callbacks that are added later.

func (*Promise) Then Uses

func (p *Promise) Then(callback func(err error)) *Promise

Then chains a callback to the Promise

func (*Promise) ThenWithMessage Uses

func (p *Promise) ThenWithMessage(callback func(msg *sarama.ProducerMessage, err error)) *Promise

ThenWithMessage chains a callback to the Promise that also receives the produced message.
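
A sketch of emitting a message and reacting to the broker's acknowledgement via the promise; broker address and topic are placeholders:

Code:

producer, err := NewProducer([]string{"localhost:9092"}, DefaultConfig())
if err != nil {
    log.Fatalf("error creating producer: %v", err)
}
defer producer.Close()

// Emit returns immediately; the promise resolves once the broker acknowledges
producer.Emit("output-topic", "some-key", []byte("some-value")).
    ThenWithMessage(func(msg *sarama.ProducerMessage, err error) {
        if err != nil {
            log.Printf("emit failed: %v", err)
            return
        }
        log.Printf("emitted to %s/%d at offset %d", msg.Topic, msg.Partition, msg.Offset)
    })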

type RebalanceCallback Uses

type RebalanceCallback func(a Assignment)

RebalanceCallback is invoked when the processor receives a new partition assignment.

type RecoveryStats Uses

type RecoveryStats struct {
    StartTime    time.Time
    RecoveryTime time.Time

    Offset int64 // last offset processed or recovered
    Hwm    int64 // next offset to be written
}

RecoveryStats groups statistics during recovery

type SaramaConsumerBuilder Uses

type SaramaConsumerBuilder func(brokers []string, clientID string) (sarama.Consumer, error)

SaramaConsumerBuilder creates a `sarama.Consumer`

func SaramaConsumerBuilderWithConfig Uses

func SaramaConsumerBuilderWithConfig(config *sarama.Config) SaramaConsumerBuilder

SaramaConsumerBuilderWithConfig creates a Sarama consumer using the passed config

type Signal Uses

type Signal struct {
    // contains filtered or unexported fields
}

Signal allows synchronization on a state, waiting for that state and checking the current state

func NewSignal Uses

func NewSignal(states ...State) *Signal

NewSignal creates a new Signal based on the states

func (*Signal) IsState Uses

func (s *Signal) IsState(state State) bool

IsState returns whether the signal is in the requested state

func (*Signal) ObserveStateChange Uses

func (s *Signal) ObserveStateChange() *StateChangeObserver

ObserveStateChange returns a channel that receives state changes. Note that the caller must take care of consuming that channel, otherwise the Signal will block upon state changes.

func (*Signal) SetState Uses

func (s *Signal) SetState(state State) *Signal

SetState changes the state of the signal and notifies all goroutines waiting for the new state

func (*Signal) State Uses

func (s *Signal) State() State

State returns the current state

func (*Signal) WaitForState Uses

func (s *Signal) WaitForState(state State) chan struct{}

WaitForState returns a channel that closes when the signal reaches passed state.

func (*Signal) WaitForStateMin Uses

func (s *Signal) WaitForStateMin(state State) chan struct{}

WaitForStateMin returns a channel that is closed when the signal enters the passed state or a higher one (states are ints and are compared numerically)
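
A small sketch of waiting for a state transition, using the partition-processor states defined below:

Code:

sig := NewSignal(PPStateIdle, PPStateRecovering, PPStateRunning, PPStateStopping)

go func() {
    // simulate some work, then transition to running
    time.Sleep(100 * time.Millisecond)
    sig.SetState(PPStateRunning)
}()

// blocks until the signal reaches PPStateRunning or a higher state
<-sig.WaitForStateMin(PPStateRunning)
log.Printf("current state: %v", sig.State())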

type State Uses

type State int

State represents a state of the Signal

const (
    // PPStateIdle marks the partition processor as idling (not started yet)
    PPStateIdle State = iota
    // PPStateRecovering indicates a recovering partition processor
    PPStateRecovering
    // PPStateRunning indicates a running partition processor
    PPStateRunning
    // PPStateStopping indicates a stopping partition processor
    PPStateStopping
)
const (
    // ProcStateIdle indicates an idling processor (not started yet)
    ProcStateIdle State = iota
    // ProcStateStarting indicates a starting processor, i.e. before rebalance
    ProcStateStarting
    // ProcStateSetup indicates a processor during setup of a rebalance round
    ProcStateSetup
    // ProcStateRunning indicates a running processor
    ProcStateRunning
    // ProcStateStopping indicates a stopping processor
    ProcStateStopping
)

type StateChangeObserver Uses

type StateChangeObserver struct {
    // contains filtered or unexported fields
}

StateChangeObserver wraps a channel that triggers when the signal's state changes

func (*StateChangeObserver) C Uses

func (s *StateChangeObserver) C() <-chan State

C returns the channel to observe state changes

func (*StateChangeObserver) Stop Uses

func (s *StateChangeObserver) Stop()

Stop stops the observer. Its update channel will be closed and no further state changes will be delivered.

type Stream Uses

type Stream string

Stream is the name of an event stream topic in Kafka, ie, a topic with cleanup.policy=delete

type Streams Uses

type Streams []Stream

Streams is a slice of Stream names.

func StringsToStreams Uses

func StringsToStreams(strings ...string) Streams

StringsToStreams is a simple cast/conversion function that allows passing a slice of strings as a slice of Stream (Streams). It avoids the boilerplate loop over the string slice that would otherwise be necessary.

Code:

inputTopics := []string{"input1",
    "input2",
    "input3",
}

// use it, e.g. in the Inputs-Edge in the group graph
graph := DefineGroup("group",
    Inputs(StringsToStreams(inputTopics...), new(codec.String), func(ctx Context, msg interface{}) {}),
)
_ = graph

type Table Uses

type Table string

Table is the name of a table topic in Kafka, ie, a topic with cleanup.policy=compact

func GroupTable Uses

func GroupTable(group Group) Table

GroupTable returns the name of the group table of group.

type TableStats Uses

type TableStats struct {
    Stalled bool

    Status PartitionStatus

    Recovery *RecoveryStats

    Input  *InputStats
    Writes *OutputStats
}

TableStats represents stats for a table partition

type Tester Uses

type Tester interface {
    StorageBuilder() storage.Builder
    ProducerBuilder() ProducerBuilder
    ConsumerGroupBuilder() ConsumerGroupBuilder
    ConsumerBuilder() SaramaConsumerBuilder
    EmitterProducerBuilder() ProducerBuilder
    TopicManagerBuilder() TopicManagerBuilder
    RegisterGroupGraph(*GroupGraph) string
    RegisterEmitter(Stream, Codec)
    RegisterView(Table, Codec) string
}

Tester interface to avoid import cycles when a processor needs to register to the tester.

type TopicManager Uses

type TopicManager interface {
    // EnsureTableExists checks that a table (log-compacted topic) exists, or creates one if possible
    EnsureTableExists(topic string, npar int) error
    // EnsureStreamExists checks that a stream topic exists, or creates one if possible
    EnsureStreamExists(topic string, npar int) error
    // EnsureTopicExists checks that a topic exists, or creates one if possible,
    // enforcing the given configuration
    EnsureTopicExists(topic string, npar, rfactor int, config map[string]string) error

    // Partitions returns the partitions of a topic that are assigned to the running
    // instance, i.e. it does not necessarily represent all partitions of the topic.
    Partitions(topic string) ([]int32, error)

    GetOffset(topic string, partitionID int32, time int64) (int64, error)

    // Close closes the topic manager
    Close() error
}

TopicManager provides an interface to create/check topics and their partitions

func DefaultTopicManagerBuilder Uses

func DefaultTopicManagerBuilder(brokers []string) (TopicManager, error)

DefaultTopicManagerBuilder creates TopicManager using the Sarama library.

func NewTopicManager Uses

func NewTopicManager(brokers []string, saramaConfig *sarama.Config, topicManagerConfig *TopicManagerConfig) (TopicManager, error)

NewTopicManager creates a new topic manager using the sarama library
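
A sketch of ensuring a group table topic exists before starting a processor; broker address, group name and partition count are placeholders:

Code:

tm, err := NewTopicManager([]string{"localhost:9092"}, DefaultConfig(), NewTopicManagerConfig())
if err != nil {
    log.Fatalf("error creating topic manager: %v", err)
}
defer tm.Close()

// ensure the group table (log-compacted topic) exists with 10 partitions
if err := tm.EnsureTableExists(string(GroupTable("example-group")), 10); err != nil {
    log.Fatalf("error ensuring table topic: %v", err)
}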

type TopicManagerBuilder Uses

type TopicManagerBuilder func(brokers []string) (TopicManager, error)

TopicManagerBuilder creates a TopicManager to check partition counts and create tables.

func TopicManagerBuilderWithConfig Uses

func TopicManagerBuilderWithConfig(config *sarama.Config, tmConfig *TopicManagerConfig) TopicManagerBuilder

TopicManagerBuilderWithConfig creates TopicManager using the Sarama library.

func TopicManagerBuilderWithTopicManagerConfig Uses

func TopicManagerBuilderWithTopicManagerConfig(tmConfig *TopicManagerConfig) TopicManagerBuilder

TopicManagerBuilderWithTopicManagerConfig creates TopicManager using the Sarama library.

type TopicManagerConfig Uses

type TopicManagerConfig struct {
    Table struct {
        Replication int
    }
    Stream struct {
        Replication int
        Retention   time.Duration
    }
}

TopicManagerConfig contains the configuration to access the Zookeeper servers as well as the options for creating tables and stream topics.

func NewTopicManagerConfig Uses

func NewTopicManagerConfig() *TopicManagerConfig

NewTopicManagerConfig provides a default configuration for auto-creation with a replication factor of 1 and a retention time of 1 hour.
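
To deviate from the defaults, modify the config and pass it to a processor through a topic manager builder, for example (`brokers` and `graph` are assumed to exist):

Code:

tmc := NewTopicManagerConfig()
tmc.Table.Replication = 3             // replicate table topics three times
tmc.Stream.Replication = 2            // replicate stream topics twice
tmc.Stream.Retention = 24 * time.Hour // keep stream messages for one day

p, err := NewProcessor(brokers, graph,
    WithTopicManagerBuilder(TopicManagerBuilderWithTopicManagerConfig(tmc)),
)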

type UpdateCallback Uses

type UpdateCallback func(s storage.Storage, partition int32, key string, value []byte) error

UpdateCallback is invoked upon arrival of a message for a table partition. The partition storage shall be updated in the callback.
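
A sketch of a callback that mirrors the default behavior, storing the raw value and treating nil values as deletions (`brokers` and `graph` are assumed to exist):

Code:

cb := func(s storage.Storage, partition int32, key string, value []byte) error {
    if value == nil {
        // nil values are tombstones: remove the key from local storage
        return s.Delete(key)
    }
    return s.Set(key, value)
}

p, err := NewProcessor(brokers, graph, WithUpdateCallback(cb))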

type View Uses

type View struct {
    // contains filtered or unexported fields
}

View is a materialized (i.e. persistent) cache of a group table.

This example shows how views are typically created and used in the most basic way.

Code:

// create a new view
view, err := NewView([]string{"localhost:9092"},
    "input-topic",
    new(codec.String))

if err != nil {
    log.Fatalf("error creating view: %v", err)
}

// provide a cancelable context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// start the view
done := make(chan struct{})
go func() {
    defer close(done)
    err := view.Run(ctx)
    if err != nil {
        log.Fatalf("Error running view: %v", err)
    }
}()

// wait for the view to be recovered

// Option A: by polling
for !view.Recovered() {
    select {
    case <-ctx.Done():
        return
    case <-time.After(time.Second):
    }
}

// Option B: by waiting for the signal
<-view.WaitRunning()

// retrieve a value from the view
val, err := view.Get("some-key")
if err != nil {
    log.Fatalf("Error getting item from view: %v", err)
}

if val != nil {
    // the type assertion to string is safe here: if the value were
    // not a string, the codec would have failed during decoding
    log.Printf("got value %s", val.(string))
}

has, err := view.Has("some-key")
if err != nil {
    log.Fatalf("Error getting item from view: %v", err)
}

_ = has

// stop the view and wait for it to shut down before returning
cancel()
<-done

Code:

// create a new view
view, err := NewView([]string{"localhost:9092"},
    "input-topic",
    new(codec.String),

    // Automatically reconnect in case of errors. This is useful for services where availability
    // is more important than the data being up to date in case of kafka connection issues.
    WithViewAutoReconnect(),

    // Reconnect uses a default backoff mechanism, that can be modified by providing
    // a custom backoff builder using
    // WithViewBackoffBuilder(customBackoffBuilder),

    // When the view is running successfully for some time, the backoff is reset.
    // This time range can be modified using
    // WithViewBackoffResetTimeout(3*time.Second),
)

if err != nil {
    log.Fatalf("error creating view: %v", err)
}

ctx, cancel := context.WithCancel(context.Background())
// start the view
done := make(chan struct{})
go func() {
    defer close(done)
    err := view.Run(ctx)
    if err != nil {
        log.Fatalf("Error running view: %v", err)
    }
}()

<-view.WaitRunning()
// at this point we can safely use the view with Has/Get/Iterate,
// even if the kafka connection is lost

// Stop the view and wait for it to shutdown before returning
cancel()
<-done

func NewView Uses

func NewView(brokers []string, topic Table, codec Codec, options ...ViewOption) (*View, error)

NewView creates a new View object for the given table topic.

func (*View) CurrentState Uses

func (v *View) CurrentState() ViewState

CurrentState returns the current ViewState of the view. This is useful for polling, e.g. when implementing health checks or metrics.
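
For example, a hypothetical HTTP health check that reports healthy only while the view (created and run elsewhere) is in the running state:

Code:

http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
    if view.CurrentState() != ViewStateRunning {
        http.Error(w, "view not running", http.StatusServiceUnavailable)
        return
    }
    w.WriteHeader(http.StatusOK)
})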

func (*View) Evict Uses

func (v *View) Evict(key string) error

Evict removes the given key only from the local cache. In order to delete a key from Kafka and other Views, context.Delete should be used on a Processor.

func (*View) Get Uses

func (v *View) Get(key string) (interface{}, error)

Get returns the value for the key in the view if it exists, or nil if it does not. Get can be called by multiple goroutines concurrently. Get can only be called after Recovered returns true.

func (*View) Has Uses

func (v *View) Has(key string) (bool, error)

Has checks whether a value for passed key exists in the view.

func (*View) Iterator Uses

func (v *View) Iterator() (Iterator, error)

Iterator returns an iterator that iterates over the state of the View.

func (*View) IteratorWithRange Uses

func (v *View) IteratorWithRange(start, limit string) (Iterator, error)

IteratorWithRange returns an iterator that iterates over the state of the View within the given key range.

func (*View) ObserveStateChanges Uses

func (v *View) ObserveStateChanges() *StateChangeObserver

ObserveStateChanges returns a StateChangeObserver that allows handling state changes of the view by reading from a channel. It is crucial to continuously read from that channel; otherwise the View might deadlock upon state changes. If the observer is not needed anymore, the caller must call observer.Stop()

Example

view, _ := goka.NewView(...)
go view.Run(ctx)

go func(){
  obs := view.ObserveStateChanges()
  defer obs.Stop()
  for {
    select{
      case state, ok := <-obs.C():
        if !ok {
          return // observer stopped, channel closed
        }
        // handle the new state
        _ = state
      case <-ctx.Done():
        return
    }
  }
}()

func (*View) Recovered Uses

func (v *View) Recovered() bool

Recovered returns true when the view has caught up with events from kafka.

func (*View) Run Uses

func (v *View) Run(ctx context.Context) (rerr error)

Run starts consuming the view's topic and saving updates in the local persistent cache.

The view will shut down in case of errors or when the context is cancelled. It can be initialized with auto-reconnect

view, err := NewView(..., WithViewAutoReconnect())

which makes the view reconnect internally in case of errors; it then stops only when the context is cancelled (see example).

func (*View) Stats Uses

func (v *View) Stats(ctx context.Context) *ViewStats

Stats returns a set of performance metrics of the view.

func (*View) Topic Uses

func (v *View) Topic() string

Topic returns the view's topic

func (*View) WaitRunning Uses

func (v *View) WaitRunning() <-chan struct{}

WaitRunning returns a channel that will be closed when the view enters the running state

type ViewOption Uses

type ViewOption func(*voptions, Table, Codec)

ViewOption defines a configuration option to be used when creating a view.

func WithViewAutoReconnect Uses

func WithViewAutoReconnect() ViewOption

WithViewAutoReconnect makes the view reconnect internally, so Run() does not return in case of connection errors. The view must then be shut down by cancelling the context passed to Run()

func WithViewBackoffBuilder Uses

func WithViewBackoffBuilder(bb BackoffBuilder) ViewOption

WithViewBackoffBuilder replaces the default backoff builder.

func WithViewBackoffResetTimeout Uses

func WithViewBackoffResetTimeout(duration time.Duration) ViewOption

WithViewBackoffResetTimeout defines the timeout after which the backoff is reset.

func WithViewCallback Uses

func WithViewCallback(cb UpdateCallback) ViewOption

WithViewCallback defines the callback called upon recovering a message from the log.

func WithViewClientID Uses

func WithViewClientID(clientID string) ViewOption

WithViewClientID defines the client ID used to identify with Kafka.

func WithViewConsumerSaramaBuilder Uses

func WithViewConsumerSaramaBuilder(cgb SaramaConsumerBuilder) ViewOption

WithViewConsumerSaramaBuilder replaces the default sarama consumer builder

func WithViewHasher Uses

func WithViewHasher(hasher func() hash.Hash32) ViewOption

WithViewHasher sets the hash function that assigns keys to partitions.

func WithViewLogger Uses

func WithViewLogger(log logger.Logger) ViewOption

WithViewLogger sets the logger the view should use. By default, views use the standard library logger.

func WithViewRestartable Uses

func WithViewRestartable() ViewOption

WithViewRestartable is kept only for backwards compatibility. DEPRECATED: since the behavior has changed, this name is misleading and should be replaced by WithViewAutoReconnect().

func WithViewStorageBuilder Uses

func WithViewStorageBuilder(sb storage.Builder) ViewOption

WithViewStorageBuilder defines a builder for the storage of each partition.

func WithViewTester Uses

func WithViewTester(t Tester) ViewOption

WithViewTester configures all external connections of a view, ie, storage, consumer and producer

func WithViewTopicManagerBuilder Uses

func WithViewTopicManagerBuilder(tmb TopicManagerBuilder) ViewOption

WithViewTopicManagerBuilder replaces the default topic manager builder.

type ViewState Uses

type ViewState int

ViewState represents the state of the view

const (
    // ViewStateIdle  - the view is not started yet
    ViewStateIdle ViewState = iota
    // ViewStateInitializing - the view (i.e. at least one partition) is initializing
    ViewStateInitializing
    // ViewStateConnecting - the view (i.e. at least one partition) is (re-)connecting
    ViewStateConnecting
    // ViewStateCatchUp - the view (i.e. at least one partition) is still catching up
    ViewStateCatchUp
    // ViewStateRunning - the view (i.e. all partitions) has caught up and is running
    ViewStateRunning
)

type ViewStats Uses

type ViewStats struct {
    Partitions map[int32]*TableStats
}

ViewStats represents the metrics of all partitions of a view.

Directories

Path | Synopsis
codec
examples/1-simplest
examples/2-clicks
examples/3-messaging
examples/3-messaging/blocker
examples/3-messaging/collector
examples/3-messaging/detector
examples/3-messaging/filter
examples/3-messaging/service
examples/3-messaging/translator
examples/monitoring
examples/redis
integrationtest
logger
mock | Package mock is a generated GoMock package.
multierr
storage
storage/redis
tester | This package provides a kafka mock that allows integration testing of goka processors.
web/index
web/monitor
web/query
web/templates

Package goka imports 21 packages and is imported by 44 packages. Updated 2020-07-09.