tester

package
v0.1.3 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 28, 2019 License: BSD-3-Clause Imports: 11 Imported by: 0

Documentation

Overview

This package provides a Kafka mock that allows integration testing of goka processors.

Usage

Simply append a tester option when creating the processor for testing. Usually it makes sense to move the processor creation to a function that accepts extra options. That way the test can use exactly the same processor setup.

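// createProcessor creates the processor and defines its group graph.
// Extra options (such as a tester) are passed on to goka.NewProcessor.
func createProcessor(brokers []string, options ...goka.ProcessorOption) *goka.Processor {
  proc, err := goka.NewProcessor(brokers,
    goka.DefineGroup("group",
      // some group definitions
    ),
    options...,
  )
  if err != nil {
    log.Fatalf("error creating processor: %v", err)
  }
  return proc
}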

In the main function we would run the processor like this:

func main() {
  proc := createProcessor([]string{"broker1:9092"})
  if err := proc.Run(context.Background()); err != nil {
    log.Fatalf("error running processor: %v", err)
  }
}

And in the unit test something like:

func TestProcessor(t *testing.T) {
  // create the tester; the variable is named gkt so it does not shadow the tester package
  gkt := tester.New(t)
  // create the processor
  proc := createProcessor(nil, goka.WithTester(gkt))

  // .. do extra initialization if necessary

  go proc.Run(context.Background())

  // execute the actual test
  gkt.Consume("input-topic", "key", "value")

  // expected is the value the processor should have stored for "key"
  value := gkt.TableValue("group-table", "key")
  if value != expected {
    t.Fatalf("got unexpected table value: %v", value)
  }
}

See https://github.com/lovoo/goka/tree/master/examples/testing for a full example.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Codec

type Codec interface {
	Encode(value interface{}) (data []byte, err error)
	Decode(data []byte) (value interface{}, err error)
}

Codec decodes and encodes from and to []byte
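
As an illustration, a type satisfying this interface could look like the following minimal sketch. It assumes plain string values; the name StringCodec is hypothetical and not part of this package, and the Codec interface is repeated locally only to keep the example self-contained.

```go
package main

import "fmt"

// Codec mirrors the interface documented above.
type Codec interface {
	Encode(value interface{}) (data []byte, err error)
	Decode(data []byte) (value interface{}, err error)
}

// StringCodec is a hypothetical codec for plain string values.
type StringCodec struct{}

// Encode converts a string value to bytes and rejects any other type.
func (StringCodec) Encode(value interface{}) ([]byte, error) {
	s, ok := value.(string)
	if !ok {
		return nil, fmt.Errorf("StringCodec: expected string, got %T", value)
	}
	return []byte(s), nil
}

// Decode converts the raw bytes back to a string.
func (StringCodec) Decode(data []byte) (interface{}, error) {
	return string(data), nil
}

func main() {
	var c Codec = StringCodec{}

	data, err := c.Encode("value")
	if err != nil {
		panic(err)
	}
	value, err := c.Decode(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(value)
}
```

Returning an error for unexpected types, rather than panicking, lets a test report a misconfigured codec as an ordinary failure.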

type EmitHandler

type EmitHandler func(topic string, key string, value []byte) *kafka.Promise

EmitHandler abstracts a function that can override kafkamock's Emit function in order to simulate producer errors.

type QueueTracker added in v0.1.1

type QueueTracker struct {
	// contains filtered or unexported fields
}

QueueTracker tracks message offsets per topic, which makes 'expect message x to be in topic y' checks convenient in unit tests.

func (*QueueTracker) Hwm added in v0.1.1

func (mt *QueueTracker) Hwm() int64

Hwm returns the tracked queue's high-water mark (hwm).

func (*QueueTracker) Next added in v0.1.1

func (mt *QueueTracker) Next() (string, interface{}, bool)

Next returns the next message since the last time this function (or MoveToEnd) was called. It uses the known codec for the topic to decode the message.

func (*QueueTracker) NextOffset added in v0.1.1

func (mt *QueueTracker) NextOffset() int64

NextOffset returns the tracker's next offset

func (*QueueTracker) NextRaw added in v0.1.1

func (mt *QueueTracker) NextRaw() (string, []byte, bool)

NextRaw returns the next message like Next(), but without decoding it.

func (*QueueTracker) Seek added in v0.1.1

func (mt *QueueTracker) Seek(offset int64)

Seek moves the index pointer of the queue tracker to the passed offset.
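
How the high-water mark, the next offset, and Seek relate can be shown with a small stand-in. This is a simplified model under stated assumptions, not goka's implementation: the types queue and tracker and all of their methods are hypothetical, and it assumes that a new tracker starts at the end of the queue and that reading past the high-water mark yields no message.

```go
package main

import "fmt"

// message is a key/value pair stored in the stand-in queue.
type message struct {
	key   string
	value []byte
}

// queue is a hypothetical in-memory topic: an append-only list of messages.
type queue struct {
	messages []message
}

// push appends a message to the end of the queue.
func (q *queue) push(key string, value []byte) {
	q.messages = append(q.messages, message{key: key, value: value})
}

// hwm returns the high-water mark: the offset the next pushed message gets.
func (q *queue) hwm() int64 { return int64(len(q.messages)) }

// tracker is a hypothetical stand-in for a queue tracker.
type tracker struct {
	q          *queue
	nextOffset int64
}

// newTracker starts tracking at the current end of the queue.
func newTracker(q *queue) *tracker {
	return &tracker{q: q, nextOffset: q.hwm()}
}

// nextRaw returns the next unread message, or false if there is none.
func (t *tracker) nextRaw() (string, []byte, bool) {
	if t.nextOffset < 0 || t.nextOffset >= t.q.hwm() {
		return "", nil, false
	}
	m := t.q.messages[t.nextOffset]
	t.nextOffset++
	return m.key, m.value, true
}

// seek moves the read position to the passed offset.
func (t *tracker) seek(offset int64) { t.nextOffset = offset }

func main() {
	q := &queue{}
	q.push("old", []byte("before tracking"))

	tr := newTracker(q) // starts after "old"
	q.push("new", []byte("after tracking"))

	key, _, ok := tr.nextRaw()
	fmt.Println(key, ok)

	tr.seek(0) // rewind to also see the earlier message
	key, _, ok = tr.nextRaw()
	fmt.Println(key, ok)
}
```

In this model a tracker only sees messages pushed after it was created, unless the test rewinds it with seek.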

type Signal added in v0.1.1

type Signal struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Signal allows synchronization on a state, waiting for that state and checking the current state

func NewSignal added in v0.1.1

func NewSignal(states ...State) *Signal

NewSignal creates a new Signal based on the states

func (*Signal) IsState added in v0.1.1

func (s *Signal) IsState(state State) bool

IsState reports whether the signal is in the requested state.

func (*Signal) SetState added in v0.1.1

func (s *Signal) SetState(state State) *Signal

SetState changes the state of the signal and notifies all goroutines waiting for the new state

func (*Signal) State added in v0.1.1

func (s *Signal) State() State

State returns the current state

func (*Signal) WaitForState added in v0.1.1

func (s *Signal) WaitForState(state State) chan struct{}

WaitForState returns a channel that closes when the signal reaches the passed state.
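
The pattern of waiting on a channel that closes once a state is reached can be sketched with a stand-in type. The signal type below is hypothetical and only models the behaviour described above; it is not the package's implementation, and the state names Starting and Running are assumptions made for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// State mirrors the documented State type.
type State int

// Hypothetical states used only in this example.
const (
	Starting State = iota
	Running
)

// signal is a hypothetical stand-in for a state signal.
type signal struct {
	mu      sync.Mutex
	state   State
	waiters map[State][]chan struct{}
}

// newSignal creates a signal in the zero state (Starting).
func newSignal() *signal {
	return &signal{waiters: make(map[State][]chan struct{})}
}

// setState changes the state and closes the channels of all goroutines
// waiting for the new state.
func (s *signal) setState(state State) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.state = state
	for _, ch := range s.waiters[state] {
		close(ch)
	}
	delete(s.waiters, state)
}

// waitForState returns a channel that is closed once the signal reaches
// the passed state. If the signal is already in that state, the returned
// channel is closed immediately.
func (s *signal) waitForState(state State) chan struct{} {
	s.mu.Lock()
	defer s.mu.Unlock()
	ch := make(chan struct{})
	if s.state == state {
		close(ch)
		return ch
	}
	s.waiters[state] = append(s.waiters[state], ch)
	return ch
}

func main() {
	s := newSignal()
	done := s.waitForState(Running)

	// Another goroutine moves the signal to Running.
	go s.setState(Running)

	<-done // blocks until the state is reached
	fmt.Println("reached Running")
}
```

Because the result is a channel, a caller can combine it with a timeout or a context in a select statement instead of blocking indefinitely.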

type State added in v0.1.1

type State int

State is the type of a Signal's state.

type T

type T interface {
	Errorf(format string, args ...interface{})
	Fatalf(format string, args ...interface{})
	Fatal(a ...interface{})
}

T abstracts the interface we expect from the test case. In most cases this will be *testing.T.

type Tester

type Tester struct {
	// contains filtered or unexported fields
}

Tester allows interacting with a test processor

func New

func New(t T) *Tester

New returns a new Tester. It should be passed as goka.WithTester to goka.NewProcessor.

func (*Tester) ClearValues

func (km *Tester) ClearValues()

ClearValues resets all table values

func (*Tester) Consume

func (km *Tester) Consume(topic string, key string, msg interface{})

Consume pushes a message to a topic and a key, encoding it with the topic's configured codec.

func (*Tester) ConsumeData

func (km *Tester) ConsumeData(topic string, key string, data []byte)

ConsumeData pushes a marshalled byte slice to a topic and a key

func (*Tester) ConsumerBuilder

func (km *Tester) ConsumerBuilder() kafka.ConsumerBuilder

ConsumerBuilder returns the consumer builder when this tester is used as an option to a processor

func (*Tester) EmitterProducerBuilder added in v0.1.2

func (km *Tester) EmitterProducerBuilder() kafka.ProducerBuilder

EmitterProducerBuilder creates a producer builder used for Emitters. Emitters need to flush when emitting messages.

func (*Tester) NewQueueTracker added in v0.1.1

func (km *Tester) NewQueueTracker(topic string) *QueueTracker

NewQueueTracker creates a message tracker that starts tracking the messages from the end of the current queues

func (*Tester) ProducerBuilder

func (km *Tester) ProducerBuilder() kafka.ProducerBuilder

ProducerBuilder returns the producer builder when this tester is used as an option to a processor

func (*Tester) RegisterEmitter added in v0.1.1

func (km *Tester) RegisterEmitter(topic goka.Stream, codec goka.Codec)

RegisterEmitter registers an emitter so that it works with the tester.

func (*Tester) RegisterGroupGraph added in v0.1.1

func (km *Tester) RegisterGroupGraph(gg *goka.GroupGraph)

RegisterGroupGraph is called by a processor when the tester is passed via `WithTester(..)`. This sets up the tester with the necessary consumer structure.

func (*Tester) RegisterView added in v0.1.2

func (km *Tester) RegisterView(table goka.Table, c goka.Codec)

RegisterView registers a view so that it works with the tester.

func (*Tester) ReplaceEmitHandler

func (km *Tester) ReplaceEmitHandler(emitter EmitHandler)

ReplaceEmitHandler replaces the emitter.

func (*Tester) SetTableValue added in v0.1.1

func (km *Tester) SetTableValue(table goka.Table, key string, value interface{})

SetTableValue sets a value in a processor's or view's table directly via storage.

func (*Tester) StorageBuilder

func (km *Tester) StorageBuilder() storage.Builder

StorageBuilder returns the storage builder when this tester is used as an option to a processor

func (*Tester) TableValue added in v0.1.1

func (km *Tester) TableValue(table goka.Table, key string) interface{}

TableValue attempts to get a value from any table that is used in the kafka mock.

func (*Tester) TopicManagerBuilder

func (km *Tester) TopicManagerBuilder() kafka.TopicManagerBuilder

TopicManagerBuilder returns the topic manager builder when this tester is used as an option to a processor
