kafka: github.com/optiopay/kafka/kafkatest

package kafkatest

import "github.com/optiopay/kafka/kafkatest"

Package kafkatest provides mock objects for the high-level kafka interface.

Use the NewBroker function to create a mock broker object and its standard methods to create producers and consumers.

Package Files

broker.go doc.go server.go

Variables

var (
    ErrTimeout = errors.New("timeout")

    ErrNotImplemented = errors.New("not implemented")
)

type Broker

type Broker struct {

    // OffsetEarliestHandler is a callback function called whenever the
    // broker's OffsetEarliest method is called. Override it to change the
    // default behaviour of always returning ErrUnknownTopicOrPartition.
    OffsetEarliestHandler func(string, int32) (int64, error)

    // OffsetLatestHandler is a callback function called whenever the
    // broker's OffsetLatest method is called. Override it to change the
    // default behaviour of always returning ErrUnknownTopicOrPartition.
    OffsetLatestHandler func(string, int32) (int64, error)
    // contains filtered or unexported fields
}

Broker is a mock version of kafka's broker. It implements the Broker interface and provides an easy way of mocking server actions.

func NewBroker

func NewBroker() *Broker

func (*Broker) Close

func (b *Broker) Close()

Close is a no-op method, required by the Broker interface.

func (*Broker) Consumer

func (b *Broker) Consumer(conf kafka.ConsumerConf) (kafka.Consumer, error)

Consumer returns a consumer mock and never an error.

At most one consumer is created for every topic-partition pair -- calling this method again for the same topic-partition pair always returns the same consumer instance.

Code:

broker := NewBroker()
msg := &proto.Message{Value: []byte("first")}

// mock server actions, pushing data through consumer
go func() {
    consumer, _ := broker.Consumer(kafka.NewConsumerConf("my-topic", 0))
    c := consumer.(*Consumer)
    // it is possible to send messages through consumer...
    c.Messages <- msg

    // every consumer fetch call blocks until there is either a message
    // or an error ready to return; this way we can test slow consumers
    time.Sleep(time.Millisecond * 20)

    // ...as well as push errors to mock failure
    c.Errors <- errors.New("expected error is expected")
}()

// test broker never fails creating consumer
consumer, _ := broker.Consumer(kafka.NewConsumerConf("my-topic", 0))

m, err := consumer.Consume()
if err == nil {
    fmt.Printf("Value: %q\n", m.Value)
}
if _, err = consumer.Consume(); err != nil {
    fmt.Printf("Error: %s\n", err)
}

Output:

Value: "first"
Error: expected error is expected

func (*Broker) OffsetCoordinator

func (b *Broker) OffsetCoordinator(conf kafka.OffsetCoordinatorConf) (kafka.OffsetCoordinator, error)

OffsetCoordinator returns an offset coordinator mock instance. It always succeeds, so the returned error can safely be ignored.

func (*Broker) OffsetEarliest

func (b *Broker) OffsetEarliest(topic string, partition int32) (int64, error)

OffsetEarliest returns the result of the OffsetEarliestHandler callback set on the broker. If the handler is not set, it always returns ErrUnknownTopicOrPartition.

func (*Broker) OffsetLatest

func (b *Broker) OffsetLatest(topic string, partition int32) (int64, error)

OffsetLatest returns the result of the OffsetLatestHandler callback set on the broker. If the handler is not set, it always returns ErrUnknownTopicOrPartition.

func (*Broker) Producer

func (b *Broker) Producer(kafka.ProducerConf) kafka.Producer

Producer returns a producer mock instance.

Code:

broker := NewBroker()
msg := &proto.Message{Value: []byte("first")}

producer := broker.Producer(kafka.NewProducerConf())

// mock server actions, handling any produce call
go func() {
    resp, err := broker.ReadProducers(time.Millisecond * 20)
    if err != nil {
        panic(fmt.Sprintf("failed reading producers: %s", err))
    }
    if len(resp.Messages) != 1 {
        panic("expected single message")
    }
    if !reflect.DeepEqual(resp.Messages[0], msg) {
        panic("unexpected message content")
    }
}()

// provide data for above goroutine
_, err := producer.Produce("my-topic", 0, msg)
if err != nil {
    panic(fmt.Sprintf("cannot produce message: %s", err))
}

mockProducer := producer.(*Producer)

// test error handling by forcing the producer to return an error;
// it is possible to manipulate the produce result by changing the
// producer's ResponseOffset and ResponseError attributes
mockProducer.ResponseError = errors.New("my spoon is too big!")
_, err = producer.Produce("my-topic", 0, msg)
fmt.Printf("Error: %s\n", err)

Output:

Error: my spoon is too big!

func (*Broker) ReadProducers

func (b *Broker) ReadProducers(timeout time.Duration) (*ProducedMessages, error)

ReadProducers returns a ProducedMessages value representing a Produce call made by one of the broker's producers, or ErrTimeout.

type Consumer

type Consumer struct {
    Broker *Broker

    // Messages is a channel consumed by Consume method calls. Pushing a
    // message into this channel will result in a Consume call returning
    // that message.
    Messages chan *proto.Message

    // Errors is a channel consumed by Consume method calls. Pushing an
    // error into this channel will result in a Consume call returning
    // that error.
    Errors chan error
    // contains filtered or unexported fields
}

Consumer mocks kafka's consumer. Use the Messages and Errors channels to mock Consume method results.

func (*Consumer) Consume

func (c *Consumer) Consume() (*proto.Message, error)

Consume returns a message or error pushed through the consumer's Messages or Errors channel. The call blocks until data is available on at least one of those channels.

type Middleware

type Middleware func(nodeID int32, requestKind int16, content []byte) Response

Middleware is a function called for every incoming kafka message, before the default processing handler runs. A middleware function can return either nil or a kafka response message.

type OffsetCoordinator

type OffsetCoordinator struct {
    Broker *Broker

    // Offsets is used to store all offset commits when using the mocked
    // coordinator's default behaviour.
    Offsets map[string]int64

    // CommitHandler is a callback function called whenever the Commit
    // method of the OffsetCoordinator is called. If CommitHandler is nil,
    // the Commit method will store the offset in the Offsets attribute.
    CommitHandler func(consumerGroup string, topic string, partition int32, offset int64) error

    // OffsetHandler is a callback function called whenever the Offset
    // method of the OffsetCoordinator is called. If OffsetHandler is nil,
    // the Offset method will use the Offsets attribute to retrieve the
    // committed offset.
    OffsetHandler func(consumerGroup string, topic string, partition int32) (offset int64, metadata string, err error)
    // contains filtered or unexported fields
}

func (*OffsetCoordinator) Commit

func (c *OffsetCoordinator) Commit(topic string, partition int32, offset int64) error

Commit returns the result of the CommitHandler callback set on the coordinator. If the handler is nil, this method stores the offset in the Offsets attribute for later use.

func (*OffsetCoordinator) Offset

func (c *OffsetCoordinator) Offset(topic string, partition int32) (offset int64, metadata string, err error)

Offset returns the result of the OffsetHandler callback set on the coordinator. If the handler is nil, this method uses the Offsets attribute to retrieve the committed offset. If no offset was saved for the given topic and partition pair, proto.ErrUnknownTopicOrPartition is returned.

type ProducedMessages

type ProducedMessages struct {
    Topic     string
    Partition int32
    Messages  []*proto.Message
}

ProducedMessages represents all arguments used for a single Produce method call.

type Producer

type Producer struct {
    Broker *Broker

    // ResponseError, if set, forces Produce method calls to instantly
    // return this error without publishing messages. Nil by default.
    ResponseError error
    // contains filtered or unexported fields
}

Producer mocks kafka's producer.

func (*Producer) Produce

func (p *Producer) Produce(topic string, partition int32, messages ...*proto.Message) (int64, error)

Produce sets the messages' Crc and Offset attributes and pushes all passed arguments to the broker. A Produce call blocks until the pushed messages are read with the broker's ReadProducers method.

func (*Producer) ResponseOffset

func (p *Producer) ResponseOffset() int64

ResponseOffset returns the offset counter. The counter is incremented every time the Produce method is called. By default the counter is set to 1.

type Response

type Response interface {
    Bytes() ([]byte, error)
}

Response is any kafka response as defined in the kafka/proto package.

type Server

type Server struct {
    // contains filtered or unexported fields
}

Server is a container for fake kafka server data.

Code:

// simulate server latency for all fetch requests
delayFetch := func(nodeID int32, reqKind int16, content []byte) Response {
    if reqKind != proto.FetchReqKind {
        return nil
    }
    time.Sleep(time.Millisecond * 500)
    return nil
}

server := NewServer(delayFetch)
server.MustSpawn()
defer func() {
    _ = server.Close()
}()
fmt.Printf("running server: %s", server.Addr())

server.AddMessages("my-topic", 0,
    &proto.Message{Value: []byte("first")},
    &proto.Message{Value: []byte("second")})

// connect to server using broker and fetch/write messages

func NewServer

func NewServer(middlewares ...Middleware) *Server

NewServer returns a new mock server instance. Any number of middlewares can be passed to customize request handling. For every incoming request, all middlewares are called one after another in the order they were passed. If any middleware returns a non-nil response message, that response is instantly written to the client and no further processing of the request takes place -- no other middleware is called and the default handler is not executed.

func (*Server) AddMessages

func (s *Server) AddMessages(topic string, partition int32, messages ...*proto.Message)

AddMessages appends messages to the given topic/partition. If the topic or partition does not exist, it is created. To only create a topic/partition, call this method without passing any messages.

func (*Server) Addr

func (s *Server) Addr() string

Addr returns the server instance's address, or an empty string if the server is not running.

func (*Server) Close

func (s *Server) Close() (err error)

Close shuts down the server if it is running. It is safe to call more than once.

func (*Server) MustSpawn

func (s *Server) MustSpawn()

MustSpawn runs the server in the background on a random port. It panics if the server cannot be spawned. Use the Close method to stop a spawned server.

func (*Server) Reset

func (s *Server) Reset()

Reset will clear out local messages and topics.

func (*Server) Run

func (s *Server) Run(addr string) error

Run starts the kafka mock server listening on the given address.

func (*Server) ServeHTTP

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP provides JSON serialized server state information.

Package kafkatest imports 14 packages and is imported by 1 package. Updated 2018-04-17.