tester

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 18, 2020 License: BSD-3-Clause Imports: 11 Imported by: 0

Documentation

Overview

This package provides a kafka mock that allows integration testing of goka processors.

Usage

Simply append a tester option when creating the processor for testing. Usually it makes sense to move the processor creation to a function that accepts extra options. That way the test can use exactly the same processor setup.

// creates the processor defining its group graph
func createProcessor(brokers []string, options ...goka.ProcessorOption) *goka.Processor{
  return goka.NewProcessor(brokers, goka.DefineGroup("group",
                          // some group definitions
                          options...,
                          ),
  )
}

In the main function we would run the processor like this:

func main(){
  proc := createProcessor([]string{"broker1:9092"})
  proc.Run(ctx.Background())
}

And in the unit test something like:

func TestProcessor(t *testing.T){
  // create tester
  tester := tester.New(t)
  // create the processor
  proc := createProcessor(nil, goka.WithTester(tester))

  // .. do extra initialization if necessary

  go proc.Run(ctx.Background())

  // execute the actual test
  tester.Consume("input-topic", "key", "value")

  value := tester.TableValue("group-table", "key")
  if value != expected{
    t.Fatalf("got unexpected table value")
  }
}

See https://gitlab.com/signoz-public/goka/tree/master/examples/testing for a full example

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type MockTopicManager

type MockTopicManager struct {
	DefaultNumPartitions     int
	DefaultReplicationFactor int
	// contains filtered or unexported fields
}

MockTopicManager mimicks the behavior of the real topic manager

func NewMockTopicManager

func NewMockTopicManager(tt *Tester, defaultNumPartitions int, defaultReplFactor int) *MockTopicManager

NewMockTopicManager creates a new topic manager mock

func (*MockTopicManager) Close

func (tm *MockTopicManager) Close() error

Close has no action on the mock

func (*MockTopicManager) EnsureStreamExists

func (tm *MockTopicManager) EnsureStreamExists(topic string, npar int) error

EnsureStreamExists ensures a stream exists

func (*MockTopicManager) EnsureTableExists

func (tm *MockTopicManager) EnsureTableExists(topic string, npar int) error

EnsureTableExists ensures a table exists

func (*MockTopicManager) EnsureTopicExists

func (tm *MockTopicManager) EnsureTopicExists(topic string, npar, rfactor int, config map[string]string) error

EnsureTopicExists ensures a topic exists

func (*MockTopicManager) GetOffset

func (tm *MockTopicManager) GetOffset(topicName string, partitionID int32, time int64) (int64, error)

GetOffset returns the offset closest to the passed time (or exactly time, if the offsets are empty)

func (*MockTopicManager) Partitions

func (tm *MockTopicManager) Partitions(topic string) ([]int32, error)

Partitions returns all partitions for a topic

type QueueTracker

type QueueTracker struct {
	// contains filtered or unexported fields
}

QueueTracker tracks message offsets for each topic for convenient 'expect message x to be in topic y' in unit tests

func (*QueueTracker) Hwm

func (mt *QueueTracker) Hwm() int64

Hwm returns the tracked queue's hwm value

func (*QueueTracker) Next

func (mt *QueueTracker) Next() (string, interface{}, bool)

Next returns the next message since the last time this function was called (or MoveToEnd) It uses the known codec for the topic to decode the message

func (*QueueTracker) NextOffset

func (mt *QueueTracker) NextOffset() int64

NextOffset returns the tracker's next offset

func (*QueueTracker) NextRaw

func (mt *QueueTracker) NextRaw() (string, []byte, bool)

NextRaw returns the next message similar to Next(), but without the decoding

func (*QueueTracker) Seek

func (mt *QueueTracker) Seek(offset int64)

Seek moves the index pointer of the queue tracker to passed offset

type T

type T interface {
	Errorf(format string, args ...interface{})
	Fatalf(format string, args ...interface{})
	Fatal(a ...interface{})
}

T abstracts the interface we assume from the test case. Will most likely be T

type Tester

type Tester struct {
	// contains filtered or unexported fields
}

Tester mimicks kafka for complex highlevel testing of single or multiple processors/views/emitters

func New

func New(t T) *Tester

New creates a new tester instance

func (*Tester) ClearValues

func (tt *Tester) ClearValues()

ClearValues clears all table values in all storages

func (*Tester) Consume

func (tt *Tester) Consume(topic string, key string, msg interface{})

Consume pushes a message for topic/key to be consumed by all processors/views whoever is using it being registered to the Tester

func (*Tester) ConsumerBuilder

func (tt *Tester) ConsumerBuilder() goka.SaramaConsumerBuilder

ConsumerBuilder creates a consumerbuilder that builds consumers for passed clientID

func (*Tester) ConsumerGroupBuilder

func (tt *Tester) ConsumerGroupBuilder() goka.ConsumerGroupBuilder

ConsumerGroupBuilder builds a builder. The builder returns the consumergroup for passed client-ID if it was expected by registering the processor to the Tester

func (*Tester) EmitterProducerBuilder

func (tt *Tester) EmitterProducerBuilder() goka.ProducerBuilder

EmitterProducerBuilder creates a producer builder used for Emitters. Emitters need to flush when emitting messages.

func (*Tester) NewQueueTracker

func (tt *Tester) NewQueueTracker(topic string) *QueueTracker

NewQueueTracker creates a new queue tracker

func (*Tester) ProducerBuilder

func (tt *Tester) ProducerBuilder() goka.ProducerBuilder

func (*Tester) RegisterEmitter

func (tt *Tester) RegisterEmitter(topic goka.Stream, codec goka.Codec)

RegisterEmitter registers an emitter to be working with the tester.

func (*Tester) RegisterGroupGraph

func (tt *Tester) RegisterGroupGraph(gg *goka.GroupGraph) string

RegisterGroupGraph is called by a processor when the tester is passed via `WithTester(..)`. This will setup the tester with the neccessary consumer structure

func (*Tester) RegisterView

func (tt *Tester) RegisterView(table goka.Table, c goka.Codec) string

RegisterView registers a new view to the tester

func (*Tester) SetTableValue

func (tt *Tester) SetTableValue(table goka.Table, key string, value interface{})

SetTableValue sets a value in a processor's or view's table direcly via storage This method blocks until all expected clients are running, so make sure to call it *after* you have started all processors/views, otherwise it'll deadlock.

func (*Tester) StorageBuilder

func (tt *Tester) StorageBuilder() storage.Builder

StorageBuilder builds inmemory storages

func (*Tester) TableValue

func (tt *Tester) TableValue(table goka.Table, key string) interface{}

TableValue attempts to get a value from any table that is used in the tester

func (*Tester) TopicManagerBuilder

func (tt *Tester) TopicManagerBuilder() goka.TopicManagerBuilder

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL