tester

package
v0.0.0-...-a83ac41 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 30, 2018 License: BSD-3-Clause Imports: 12 Imported by: 0

Documentation

Overview

This package provides a kafka mock that allows integration testing of goka processors.

Usage

Simply append a tester option when creating the processor for testing. Usually it makes sense to move the processor creation to a function that accepts extra options. That way the test can use exactly the same processor setup.

// creates the processor defining its group graph
func createProcessor(brokers []string, options ...goka.ProcessorOption) *goka.Processor{
  return goka.NewProcessor(brokers, goka.DefineGroup("group",
                          // some group definitions
                          options...,
                          ),
  )
}

In the main function we would run the processor like this:

func main(){
  proc := createProcessor([]string{"broker1:9092"})
  proc.Run(ctx.Background())
}

And in the unit test something like:

func TestProcessor(t *testing.T){
  // create tester
  tester := tester.New(t)
  // create the processor
  proc := createProcessor(nil, goka.WithTester(tester))

  // .. do extra initialization if necessary

  go proc.Run(ctx.Background())

  // execute the actual test
  tester.Consume("input-topic", "key", "value")

  value := tester.TableValue("group-table", "key")
  if value != expected{
    t.Fatalf("got unexpected table value")
  }
}

See https://github.com/lovoo/goka/tree/master/examples/testing for a full example

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Codec

type Codec interface {
	Encode(value interface{}) (data []byte, err error)
	Decode(data []byte) (value interface{}, err error)
}

Codec decodes and encodes from and to []byte

type EmitHandler

type EmitHandler func(topic string, key string, value []byte) *kafka.Promise

EmitHandler abstracts a function that allows to overwrite kafkamock's Emit function to simulate producer errors

type MessageTracker

type MessageTracker struct {
	// contains filtered or unexported fields
}

MessageTracker tracks message offsets for each topic for convenient 'expect message x to be in topic y' in unit tests

func (*MessageTracker) ExpectAtEnd

func (mt *MessageTracker) ExpectAtEnd()

ExpectLastMessage ensures the message tracker is at the end of the topic

func (*MessageTracker) MoveToEnd

func (mt *MessageTracker) MoveToEnd(topic string)

MoveToEnd marks the topic to be read regardless of its content

func (*MessageTracker) Next

func (mt *MessageTracker) Next() (string, interface{}, bool)

Next returns the next message since the last time this function was called (or MoveToEnd) It uses the known codec for the topic to decode the message

func (*MessageTracker) NextRaw

func (mt *MessageTracker) NextRaw() (string, []byte, bool)

NextRaw returns the next message similar to Next(), but without the decoding

type Signal

type Signal struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Signal allows synchronization on a state, waiting for that state and checking the current state

func NewSignal

func NewSignal(states ...State) *Signal

NewSignal creates a new Signal based on the states

func (*Signal) IsState

func (s *Signal) IsState(state State) bool

IsState returns if the signal is in the requested state

func (*Signal) SetState

func (s *Signal) SetState(state State) *Signal

SetState changes the state of the signal and notifies all goroutines waiting for the new state

func (*Signal) State

func (s *Signal) State() State

State returns the current state

func (*Signal) WaitForState

func (s *Signal) WaitForState(state State) chan struct{}

WaitForState returns a channel that closes when the signal reaches passed state.

type State

type State int

State types a state of the Signal

type T

type T interface {
	Errorf(format string, args ...interface{})
	Fatalf(format string, args ...interface{})
	Fatal(a ...interface{})
}

T abstracts the interface we assume from the test case. Will most likely be *testing.T

type Tester

type Tester struct {
	// contains filtered or unexported fields
}

Tester allows interacting with a test processor

func New

func New(t T) *Tester

New returns a new Tester. It should be passed as goka.WithTester to goka.NewProcessor.

func (*Tester) ClearValues

func (km *Tester) ClearValues()

ClearValues resets all table values

func (*Tester) Consume

func (km *Tester) Consume(topic string, key string, msg interface{})

Consume a message using the topic's configured codec

func (*Tester) ConsumeData

func (km *Tester) ConsumeData(topic string, key string, data []byte)

ConsumeData pushes a marshalled byte slice to a topic and a key

func (*Tester) ConsumeProto

func (km *Tester) ConsumeProto(topic string, key string, msg proto.Message)

ConsumeProto simulates a message on kafka in a topic with a key.

func (*Tester) ConsumeString

func (km *Tester) ConsumeString(topic string, key string, msg string)

ConsumeString simulates a message with a string payload.

func (*Tester) ConsumerBuilder

func (km *Tester) ConsumerBuilder() kafka.ConsumerBuilder

ConsumerBuilder returns the consumer builder when this tester is used as an option to a processor

func (*Tester) NewMessageTracker

func (km *Tester) NewMessageTracker(topic string) *MessageTracker

NewMessageTrackerFromEnd creates a message tracker that starts tracking the messages from the end of the current queues

func (*Tester) ProducerBuilder

func (km *Tester) ProducerBuilder() kafka.ProducerBuilder

ProducerBuilder returns the producer builder when this tester is used as an option to a processor

func (*Tester) RegisterEmitter

func (km *Tester) RegisterEmitter(topic goka.Stream, codec goka.Codec)

RegisterEmitter registers an emitter to be working with the tester.

func (*Tester) RegisterGroupGraph

func (km *Tester) RegisterGroupGraph(gg *goka.GroupGraph)

RegisterGroupGraph is called by a processor when the tester is passed via `WithTester(..)`. This will setup the tester with the neccessary consumer structure

func (*Tester) ReplaceEmitHandler

func (km *Tester) ReplaceEmitHandler(emitter EmitHandler)

ReplaceEmitHandler replaces the emitter.

func (*Tester) SetTableValue

func (km *Tester) SetTableValue(table goka.Table, key string, value interface{})

SetTableValue sets a value in a processor's or view's table direcly via storage

func (*Tester) StorageBuilder

func (km *Tester) StorageBuilder() storage.Builder

StorageBuilder returns the storage builder when this tester is used as an option to a processor

func (*Tester) TableValue

func (km *Tester) TableValue(table goka.Table, key string) interface{}

TableValue attempts to get a value from any table that is used in the kafka mock.

func (*Tester) TopicManagerBuilder

func (km *Tester) TopicManagerBuilder() kafka.TopicManagerBuilder

TopicManagerBuilder returns the topicmanager builder when this tester is used as an option to a processor

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL