Documentation ¶
Overview ¶
This package provides a kafka mock that allows integration testing of goka processors.
Usage ¶
Simply append a tester option when creating the processor for testing. Usually it makes sense to move the processor creation to a function that accepts extra options. That way the test can use exactly the same processor setup.
// creates the processor defining its group graph func createProcessor(brokers []string, options ...goka.ProcessorOption) *goka.Processor{ return goka.NewProcessor(brokers, goka.DefineGroup("group", // some group definitions options..., ), ) }
In the main function we would run the processor like this:
func main(){ proc := createProcessor([]string{"broker1:9092"}) proc.Run(ctx.Background()) }
And in the unit test something like:
func TestProcessor(t *testing.T){ // create tester tester := tester.New(t) // create the processor proc := createProcessor(nil, goka.WithTester(tester)) // .. do extra initialization if necessary go proc.Run(ctx.Background()) // execute the actual test tester.Consume("input-topic", "key", "value") value := tester.TableValue("group-table", "key") if value != expected{ t.Fatalf("got unexpected table value") } }
See https://github.com/lovoo/goka/tree/master/examples/testing for a full example
Index ¶
- type Codec
- type EmitHandler
- type MessageTracker
- type Signal
- type State
- type T
- type Tester
- func (km *Tester) ClearValues()
- func (km *Tester) Consume(topic string, key string, msg interface{})
- func (km *Tester) ConsumeData(topic string, key string, data []byte)
- func (km *Tester) ConsumeProto(topic string, key string, msg proto.Message)
- func (km *Tester) ConsumeString(topic string, key string, msg string)
- func (km *Tester) ConsumerBuilder() kafka.ConsumerBuilder
- func (km *Tester) NewMessageTracker(topic string) *MessageTracker
- func (km *Tester) ProducerBuilder() kafka.ProducerBuilder
- func (km *Tester) RegisterEmitter(topic goka.Stream, codec goka.Codec)
- func (km *Tester) RegisterGroupGraph(gg *goka.GroupGraph)
- func (km *Tester) ReplaceEmitHandler(emitter EmitHandler)
- func (km *Tester) SetTableValue(table goka.Table, key string, value interface{})
- func (km *Tester) StorageBuilder() storage.Builder
- func (km *Tester) TableValue(table goka.Table, key string) interface{}
- func (km *Tester) TopicManagerBuilder() kafka.TopicManagerBuilder
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Codec ¶
type Codec interface { Encode(value interface{}) (data []byte, err error) Decode(data []byte) (value interface{}, err error) }
Codec decodes and encodes from and to []byte
type EmitHandler ¶
EmitHandler abstracts a function that allows to overwrite kafkamock's Emit function to simulate producer errors
type MessageTracker ¶
type MessageTracker struct {
// contains filtered or unexported fields
}
MessageTracker tracks message offsets for each topic for convenient 'expect message x to be in topic y' in unit tests
func (*MessageTracker) ExpectAtEnd ¶
func (mt *MessageTracker) ExpectAtEnd()
ExpectLastMessage ensures the message tracker is at the end of the topic
func (*MessageTracker) MoveToEnd ¶
func (mt *MessageTracker) MoveToEnd(topic string)
MoveToEnd marks the topic to be read regardless of its content
func (*MessageTracker) Next ¶
func (mt *MessageTracker) Next() (string, interface{}, bool)
Next returns the next message since the last time this function was called (or MoveToEnd) It uses the known codec for the topic to decode the message
type Signal ¶
Signal allows synchronization on a state, waiting for that state and checking the current state
func (*Signal) SetState ¶
SetState changes the state of the signal and notifies all goroutines waiting for the new state
func (*Signal) WaitForState ¶
WaitForState returns a channel that closes when the signal reaches passed state.
type T ¶
type T interface { Errorf(format string, args ...interface{}) Fatalf(format string, args ...interface{}) Fatal(a ...interface{}) }
T abstracts the interface we assume from the test case. Will most likely be *testing.T
type Tester ¶
type Tester struct {
// contains filtered or unexported fields
}
Tester allows interacting with a test processor
func (*Tester) ConsumeData ¶
ConsumeData pushes a marshalled byte slice to a topic and a key
func (*Tester) ConsumeProto ¶
ConsumeProto simulates a message on kafka in a topic with a key.
func (*Tester) ConsumeString ¶
ConsumeString simulates a message with a string payload.
func (*Tester) ConsumerBuilder ¶
func (km *Tester) ConsumerBuilder() kafka.ConsumerBuilder
ConsumerBuilder returns the consumer builder when this tester is used as an option to a processor
func (*Tester) NewMessageTracker ¶
func (km *Tester) NewMessageTracker(topic string) *MessageTracker
NewMessageTrackerFromEnd creates a message tracker that starts tracking the messages from the end of the current queues
func (*Tester) ProducerBuilder ¶
func (km *Tester) ProducerBuilder() kafka.ProducerBuilder
ProducerBuilder returns the producer builder when this tester is used as an option to a processor
func (*Tester) RegisterEmitter ¶
RegisterEmitter registers an emitter to be working with the tester.
func (*Tester) RegisterGroupGraph ¶
func (km *Tester) RegisterGroupGraph(gg *goka.GroupGraph)
RegisterGroupGraph is called by a processor when the tester is passed via `WithTester(..)`. This will setup the tester with the neccessary consumer structure
func (*Tester) ReplaceEmitHandler ¶
func (km *Tester) ReplaceEmitHandler(emitter EmitHandler)
ReplaceEmitHandler replaces the emitter.
func (*Tester) SetTableValue ¶
SetTableValue sets a value in a processor's or view's table direcly via storage
func (*Tester) StorageBuilder ¶
StorageBuilder returns the storage builder when this tester is used as an option to a processor
func (*Tester) TableValue ¶
TableValue attempts to get a value from any table that is used in the kafka mock.
func (*Tester) TopicManagerBuilder ¶
func (km *Tester) TopicManagerBuilder() kafka.TopicManagerBuilder
TopicManagerBuilder returns the topicmanager builder when this tester is used as an option to a processor