kafkatest

package
v3.10.0

This package is not in the latest version of its module.

Published: Apr 28, 2023 License: MIT Imports: 12 Imported by: 3

README

KafkaTest

This package contains mocks intended to be used by users of this library for testing.

Empty mocks

If you need to implement your own mock functionality, you can use the empty mocks, which are generated with moq and implement the interfaces kafkatest.ConsumerGroup, kafkatest.Producer and Message.

This kind of mock is recommended for unit tests, where you may only need to check that a particular function has been called with the expected parameters.

These interfaces expose the same methods as the real Producer and ConsumerGroup structs. You can instantiate the mocks like so:

consumer := kafkatest.IConsumerGroupMock{...}
producer := kafkatest.IProducerMock{...}
message := kafkatest.MessageMock{...}
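
To illustrate the call-recording pattern that moq-generated mocks follow, here is a minimal, self-contained sketch. The Sender interface, the SenderMock type and the notify function are hypothetical stand-ins and are not part of kafkatest; the real mocks expose the ...Func fields and ...Calls() methods listed in the documentation further down.

```go
package main

import (
	"fmt"
	"sync"
)

// Sender is a hypothetical interface standing in for a real dependency.
type Sender interface {
	Send(topic string, payload []byte) error
}

// SendCall captures the parameters of a single Send invocation.
type SendCall struct {
	Topic   string
	Payload []byte
}

// SenderMock follows the moq convention: one Func field per method,
// plus a recorded list of calls that tests can inspect.
type SenderMock struct {
	SendFunc func(topic string, payload []byte) error

	mu    sync.Mutex
	calls []SendCall
}

// Compile-time check that the mock satisfies the interface.
var _ Sender = (*SenderMock)(nil)

// Send records the call and then delegates to SendFunc.
func (m *SenderMock) Send(topic string, payload []byte) error {
	if m.SendFunc == nil {
		panic("SenderMock.SendFunc is nil but Send was called")
	}
	m.mu.Lock()
	m.calls = append(m.calls, SendCall{Topic: topic, Payload: payload})
	m.mu.Unlock()
	return m.SendFunc(topic, payload)
}

// SendCalls returns a copy of all the calls made to Send so far.
func (m *SenderMock) SendCalls() []SendCall {
	m.mu.Lock()
	defer m.mu.Unlock()
	return append([]SendCall(nil), m.calls...)
}

// notify is hypothetical code under test that depends on Sender.
func notify(s Sender, user string) error {
	return s.Send("notifications", []byte("hello "+user))
}

func main() {
	mock := &SenderMock{
		SendFunc: func(topic string, payload []byte) error { return nil },
	}
	if err := notify(mock, "alice"); err != nil {
		fmt.Println("unexpected error:", err)
		return
	}
	calls := mock.SendCalls()
	fmt.Println(len(calls), calls[0].Topic, string(calls[0].Payload))
}
```

A unit test would typically assert on the length and contents of the recorded calls, in the same way as len(mockedIConsumerGroup.ChannelsCalls()) is used in the generated documentation.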

Functional mocks

The previous mocks have been extended by implementing functionality that emulates a real Producer, Consumer and message, but without communicating with any real Kafka broker.

This kind of mock is recommended for component tests, where you may want a fully functional mock that behaves like the real library, but without the overhead of deploying a full kafka cluster.

If you need a functional mock to test how you interact with kafka, you can use these mocks (kafkatest.MessageConsumer, kafkatest.MessageProducer and kafkatest.Message) like so:

Consumer

1- Create consumer mock

kafkaConsumer, err := kafkatest.NewConsumer(
    ctx,
    &kafka.ConsumerGroupConfig{
        BrokerAddrs:       Addr,
        Topic:             ConsumedTopic,
        GroupName:         GroupName,
        MinBrokersHealthy: &ConsumerMinBrokersHealthy,
        KafkaVersion:      &Version,
    },
    &kafkatest.ConsumerConfig{
        NumPartitions:     10,
        ChannelBufferSize: 10,
        InitAtCreation:    false,
    },
)

Provide the kafka ConsumerGroupConfig as you would for a real kafka consumer, and set the kafkatest.ConsumerConfig according to your needs for the mock.

This will create a new kafkatest consumer with NumPartitions (e.g. 10) goroutines running the sarama handler, each one emulating a kafka partition.

The sarama message and error channels will have a buffer size of ChannelBufferSize (e.g. 10).

The consumer will successfully initialise at creation time if InitAtCreation is true. Otherwise, it will fail to initialise at creation time, but it will succeed shortly after, when Initialise() is called.

If no kafkatest.ConsumerConfig is provided, the default values will be used.
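
The configuration behaviour described above can be pictured with a small standard-library sketch. It is not the kafkatest implementation: mockConfig, resolveConfig and consumeAll are hypothetical names, and the single shared queue is an assumption made to keep the example short. It shows the fallback to default values when no config is given, and one goroutine per emulated partition reading from a buffered channel.

```go
package main

import (
	"fmt"
	"sync"
)

// mockConfig mirrors the shape of kafkatest.ConsumerConfig, for illustration only.
type mockConfig struct {
	NumPartitions     int
	ChannelBufferSize int
	InitAtCreation    bool
}

// defaultMockConfig uses the same values as DefaultConsumerConfig in this package.
var defaultMockConfig = &mockConfig{
	NumPartitions:     30,
	ChannelBufferSize: 30,
	InitAtCreation:    true,
}

// resolveConfig returns cfg, or the defaults when cfg is nil.
func resolveConfig(cfg *mockConfig) *mockConfig {
	if cfg == nil {
		return defaultMockConfig
	}
	return cfg
}

// consumeAll starts one goroutine per emulated partition, all reading from a
// shared buffered channel, and returns how many messages each one handled.
func consumeAll(cfg *mockConfig, msgs []string) []int {
	cfg = resolveConfig(cfg)
	queue := make(chan string, cfg.ChannelBufferSize)
	handled := make([]int, cfg.NumPartitions)

	var wg sync.WaitGroup
	for p := 0; p < cfg.NumPartitions; p++ {
		wg.Add(1)
		go func(partition int) {
			defer wg.Done()
			for range queue {
				handled[partition]++ // each goroutine writes only its own slot
			}
		}(p)
	}

	for _, m := range msgs {
		queue <- m
	}
	close(queue) // lets every partition goroutine finish its loop
	wg.Wait()
	return handled
}

func main() {
	handled := consumeAll(nil, []string{"a", "b", "c"})
	total := 0
	for _, n := range handled {
		total += n
	}
	fmt.Println(len(handled), total)
}
```

Which partition handles which message is not deterministic in this sketch; only the total number of handled messages is.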

2- Use the mock:

You can provide the Mock inside the kafkatest.Consumer to your service under test. For example, you may override a service kafka getter function like so:

service.GetKafkaConsumer = func(ctx context.Context, cfg *config.Kafka) (kafka.IConsumerGroup, error) {
    kafkaConsumer, err := kafkatest.NewConsumer(...)
    if err != nil {
        return nil, err
    }
    return kafkaConsumer.Mock, nil
}

3- Queue messages to the mock

Usually, when you use this consumer for testing, you want to queue kafka events, so that they are consumed by the service under test that is using the kafka consumer.

To queue a new message to be consumed by the mock, you can call QueueMessage with the schema and the event that you want to be queued for consumption, like so:

// create event that will be queued (matches schema.MySchema)
event := &models.MyEvent{
    Field1: "value one",
    FieldN: "value N",
}

// queue the event with the corresponding schema
if err := kafkaConsumer.QueueMessage(schema.MySchema, event); err != nil {
    return fmt.Errorf("failed to queue event: %w", err)
}
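
Conceptually, queueing a message means encoding the event and placing it on a buffer from which the consumer later reads. The sketch below illustrates that idea with the standard library only. It is an assumption-laden illustration, not the kafkatest code: JSON is swapped in for Avro, and myEvent, queueMessage and consumeOne are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// myEvent is a hypothetical event type used only for this illustration.
type myEvent struct {
	Field1 string `json:"field1"`
	FieldN string `json:"field_n"`
}

// queueMessage encodes the event and places it on the queue.
// It blocks if the queue buffer is full.
func queueMessage(queue chan<- []byte, event interface{}) error {
	raw, err := json.Marshal(event)
	if err != nil {
		return fmt.Errorf("failed to marshal event: %w", err)
	}
	queue <- raw
	return nil
}

// consumeOne emulates the service under test taking the next queued
// message and decoding it.
func consumeOne(queue <-chan []byte) (myEvent, error) {
	var e myEvent
	err := json.Unmarshal(<-queue, &e)
	return e, err
}

func main() {
	queue := make(chan []byte, 10)

	event := &myEvent{Field1: "value one", FieldN: "value N"}
	if err := queueMessage(queue, event); err != nil {
		fmt.Println("failed to queue event:", err)
		return
	}

	e, err := consumeOne(queue)
	fmt.Println(e.Field1, e.FieldN, err)
}
```
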
Producer

1- Create producer mock

kafkaProducer, err := kafkatest.NewProducer(
    ctx,
    &kafka.ProducerConfig{
        BrokerAddrs:       Addr,
        Topic:             ProducerTopic,
        MinBrokersHealthy: &ProducerMinBrokersHealthy,
        KafkaVersion:      &Version,
    },
    &kafkatest.ProducerConfig{
        ChannelBufferSize: 10,
        InitAtCreation:    false,
    },
)

Provide the kafka ProducerConfig as you would for a real kafka producer, and set the kafkatest.ProducerConfig according to your needs for the mock.

The sarama message and error channels will have a buffer size of ChannelBufferSize (e.g. 10).

The producer will successfully initialise at creation time if InitAtCreation is true. Otherwise, it will fail to initialise at creation time, but it will succeed shortly after, when Initialise() is called.

If no kafkatest.ProducerConfig is provided, the default values will be used.

2- Use the mock:

You can provide the Mock inside the kafkatest.Producer to your service under test. For example, you may override a service kafka getter function like so:

service.GetKafkaProducer = func(ctx context.Context, cfg *config.Kafka) (kafka.IProducer, error) {
    kafkaProducer, err := kafkatest.NewProducer(...)
    if err != nil {
        return nil, err
    }
    return kafkaProducer.Mock, nil
}

3- Wait for message to be sent

Usually, when you use this producer for testing, you want to check that a message is sent, so that it can be validated.

To expect a message to be sent through the mock, you can call WaitForMessageSent with the expected schema and an event pointer. The function will block until a message is sent or the provided timeout expires. If the event is sent, it will be unmarshaled to the provided pointer.

// create empty event pointer of the type you expect (matches schema.MySchema)
var e = &models.MyEvent{}

// wait for an event to be sent, with the corresponding schema
if err := kafkaProducer.WaitForMessageSent(schema.MySchema, e, timeout); err != nil {
    return fmt.Errorf("failed to expect sent message: %w", err)
}
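
The blocking behaviour described above (return when a message arrives or when the timeout expires) is commonly built on a select statement. The following sketch shows that general technique under stated assumptions: it is not the kafkatest implementation, JSON is used in place of Avro, and event, errTimeout and waitForMessage are hypothetical names.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"time"
)

// event is a hypothetical event type used only for this illustration.
type event struct {
	Field1 string `json:"field1"`
}

// errTimeout is returned when no message arrives within the timeout.
var errTimeout = errors.New("timeout while waiting for a message to be sent")

// waitForMessage blocks until a message is received from sent or the timeout
// expires. A received message is unmarshalled into out.
func waitForMessage(sent <-chan []byte, out interface{}, timeout time.Duration) error {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case raw := <-sent:
		return json.Unmarshal(raw, out)
	case <-timer.C:
		return errTimeout
	}
}

func main() {
	// Emulate a producer that has already sent one message.
	sent := make(chan []byte, 1)
	sent <- []byte(`{"field1":"value one"}`)

	e := &event{}
	if err := waitForMessage(sent, e, time.Second); err != nil {
		fmt.Println("failed to expect sent message:", err)
		return
	}
	fmt.Println(e.Field1)
}
```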

Alternatively, your test might want to check that no message is sent (for example, if you are testing a case where a dependency returns an error).

You can check that no message is sent within a time window by calling WaitNoMessageSent with the duration of the window. Note that this function blocks execution until the timeWindow duration has elapsed, until a message is sent through the mock, or until the producer is closed (in the latter two cases an error is returned).

timeWindow := 5 * time.Second

// Wait up to 'timeWindow', expecting that no message is sent
if err := kafkaProducer.WaitNoMessageSent(timeWindow); err != nil {
    // handle error (message sent or producer closed)
}
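
The three outcomes described above (window elapsed, message sent, producer closed) map naturally onto a three-way select. The sketch below shows that general pattern with the standard library. It is an illustration under assumptions rather than the library's code: waitNoMessage, errMessageSent and errClosed are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var (
	// errMessageSent is returned if a message arrives during the window.
	errMessageSent = errors.New("unexpected message was sent")
	// errClosed is returned if the closer channel is closed during the window.
	errClosed = errors.New("producer was closed")
)

// waitNoMessage returns nil if the whole window elapses without a message
// being sent and without the producer being closed.
func waitNoMessage(sent <-chan []byte, closed <-chan struct{}, window time.Duration) error {
	timer := time.NewTimer(window)
	defer timer.Stop()

	select {
	case <-timer.C:
		return nil
	case <-sent:
		return errMessageSent
	case <-closed:
		return errClosed
	}
}

func main() {
	sent := make(chan []byte, 1)
	closed := make(chan struct{})

	// Nothing is sent, so the window elapses and no error is returned.
	fmt.Println(waitNoMessage(sent, closed, 50*time.Millisecond))

	// A message is already waiting, so the call returns an error straight away.
	sent <- []byte("payload")
	fmt.Println(waitNoMessage(sent, closed, time.Hour))
}
```
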
Message
message, err := kafkatest.NewMessage(data, offset)

Documentation


Constants

This section is empty.

Variables

var DefaultConsumerConfig = &ConsumerConfig{
	NumPartitions:     30,
	ChannelBufferSize: 30,
	InitAtCreation:    true,
}
var DefaultProducerConfig = &ProducerConfig{
	ChannelBufferSize: 30,
	InitAtCreation:    true,
}

Functions

func NewSaramaConsumerGroupClaimMock added in v3.9.0

func NewSaramaConsumerGroupClaimMock(ch chan *sarama.ConsumerMessage) *mock.SaramaConsumerGroupClaimMock

func NewSaramaConsumerGroupSessionMock added in v3.9.0

func NewSaramaConsumerGroupSessionMock(memberID, topic string, numPartitions int) (*mock.SaramaConsumerGroupSessionMock, context.CancelFunc)

NewSaramaConsumerGroupSessionMock returns a new sarama consumer group session mock with the provided number of partitions. It also returns a func to cancel the context, to be used when a session is ending.
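
Returning a cancel func alongside the session lets a test decide when the session ends. The sketch below shows that idea with a cancellable context from the standard library. The session type and newSession function are hypothetical and only loosely modelled on the signature above; they are not the mock that this package returns.

```go
package main

import (
	"context"
	"fmt"
)

// session is a hypothetical stand-in for a consumer group session mock.
type session struct {
	ctx      context.Context
	memberID string
	claims   map[string][]int32 // topic -> claimed partition IDs
}

// Context returns the context that is cancelled when the session ends.
func (s *session) Context() context.Context { return s.ctx }

// newSession creates a session that claims partitions 0..numPartitions-1 of
// the topic, and returns the func that ends the session.
func newSession(memberID, topic string, numPartitions int) (*session, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())

	partitions := make([]int32, numPartitions)
	for i := range partitions {
		partitions[i] = int32(i)
	}

	s := &session{
		ctx:      ctx,
		memberID: memberID,
		claims:   map[string][]int32{topic: partitions},
	}
	return s, cancel
}

func main() {
	s, cancel := newSession("member-1", "my-topic", 3)
	fmt.Println(s.memberID, s.claims["my-topic"], s.Context().Err())

	// Ending the session cancels its context, which handlers can observe.
	cancel()
	<-s.Context().Done()
	fmt.Println(s.Context().Err())
}
```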

func OptionalHeaders added in v3.5.0

func OptionalHeaders(h Headers) func(o *Options) error
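
The signature above has the shape of the functional options pattern, where each option is a func that mutates an Options value. The sketch below shows how such options are typically written and applied. Headers and Options copy the type shapes documented on this page, but withHeaders and applyOptions are hypothetical, and the real OptionalHeaders may behave differently.

```go
package main

import (
	"errors"
	"fmt"
)

// Headers and Options mirror the type shapes documented in this package.
type Headers map[string]string

type Options struct {
	Headers map[string]string
}

// withHeaders is a hypothetical option with the same shape as OptionalHeaders.
// It copies the provided headers into the options.
func withHeaders(h Headers) func(o *Options) error {
	return func(o *Options) error {
		if o == nil {
			return errors.New("options must not be nil")
		}
		if o.Headers == nil {
			o.Headers = make(map[string]string, len(h))
		}
		for k, v := range h {
			o.Headers[k] = v
		}
		return nil
	}
}

// applyOptions runs each option in order and stops at the first error.
func applyOptions(optionFuncs ...func(*Options) error) (*Options, error) {
	opts := &Options{}
	for _, f := range optionFuncs {
		if err := f(opts); err != nil {
			return nil, err
		}
	}
	return opts, nil
}

func main() {
	opts, err := applyOptions(withHeaders(Headers{"request-id": "abc123"}))
	if err != nil {
		fmt.Println("failed to apply options:", err)
		return
	}
	fmt.Println(opts.Headers["request-id"])
}
```

A variadic constructor such as NewMessage(data, offset, optionFuncs...) can accept options of this shape, which keeps the common call short while leaving room for optional settings.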

func SaramaBrokerGenerator added in v3.9.0

func SaramaBrokerGenerator(topic string) func(addr string) interfaces.SaramaBroker

Types

type Consumer added in v3.9.0

type Consumer struct {
	Mock *IConsumerGroupMock // Implements all moq functions so users can validate calls
	// contains filtered or unexported fields
}

Consumer is an extension of the moq ConsumerGroup with implementation of required functions and Sarama mocks to emulate a fully functional Kafka ConsumerGroup

func NewConsumer added in v3.9.0

func NewConsumer(ctx context.Context, cgConfig *kafka.ConsumerGroupConfig, cfg *ConsumerConfig) (*Consumer, error)

NewConsumer creates a consumer for testing. It behaves like a real consumer group, without network communication.

func (*Consumer) QueueMessage added in v3.9.0

func (cg *Consumer) QueueMessage(schema *avro.Schema, event interface{}) error

QueueMessage will put the provided message to the testing consumption queue, so that it is consumed when the consumer is ready to do so. This emulates a message being received by a kafka broker, which is kept until a consumer consumes it.

func (*Consumer) RebalanceCluster added in v3.9.0

func (cg *Consumer) RebalanceCluster(ctx context.Context)

RebalanceCluster emulates a server-side rebalance, which will cancel any active session.

type ConsumerConfig added in v3.9.0

type ConsumerConfig struct {
	NumPartitions     int  // number of partitions assigned to the consumer
	ChannelBufferSize int  // buffer size
	InitAtCreation    bool // whether the consumer is initialised at creation time; if false, it is initialised later
}

type Headers added in v3.5.0

type Headers map[string]string

type IConsumerGroupMock

type IConsumerGroupMock struct {
	// ChannelsFunc mocks the Channels method.
	ChannelsFunc func() *kafka.ConsumerGroupChannels

	// CheckerFunc mocks the Checker method.
	CheckerFunc func(ctx context.Context, state *health.CheckState) error

	// CloseFunc mocks the Close method.
	CloseFunc func(ctx context.Context, optFuncs ...kafka.OptFunc) error

	// InitialiseFunc mocks the Initialise method.
	InitialiseFunc func(ctx context.Context) error

	// IsInitialisedFunc mocks the IsInitialised method.
	IsInitialisedFunc func() bool

	// LogErrorsFunc mocks the LogErrors method.
	LogErrorsFunc func(ctx context.Context)

	// OnHealthUpdateFunc mocks the OnHealthUpdate method.
	OnHealthUpdateFunc func(status string)

	// RegisterBatchHandlerFunc mocks the RegisterBatchHandler method.
	RegisterBatchHandlerFunc func(ctx context.Context, batchHandler kafka.BatchHandler) error

	// RegisterHandlerFunc mocks the RegisterHandler method.
	RegisterHandlerFunc func(ctx context.Context, h kafka.Handler) error

	// StartFunc mocks the Start method.
	StartFunc func() error

	// StateFunc mocks the State method.
	StateFunc func() kafka.State

	// StateWaitFunc mocks the StateWait method.
	StateWaitFunc func(state kafka.State)

	// StopFunc mocks the Stop method.
	StopFunc func() error

	// StopAndWaitFunc mocks the StopAndWait method.
	StopAndWaitFunc func() error
	// contains filtered or unexported fields
}

IConsumerGroupMock is a mock implementation of kafka.IConsumerGroup.

func TestSomethingThatUsesIConsumerGroup(t *testing.T) {

	// make and configure a mocked kafka.IConsumerGroup
	mockedIConsumerGroup := &IConsumerGroupMock{
		ChannelsFunc: func() *kafka.ConsumerGroupChannels {
			panic("mock out the Channels method")
		},
		CheckerFunc: func(ctx context.Context, state *health.CheckState) error {
			panic("mock out the Checker method")
		},
		CloseFunc: func(ctx context.Context, optFuncs ...kafka.OptFunc) error {
			panic("mock out the Close method")
		},
		InitialiseFunc: func(ctx context.Context) error {
			panic("mock out the Initialise method")
		},
		IsInitialisedFunc: func() bool {
			panic("mock out the IsInitialised method")
		},
		LogErrorsFunc: func(ctx context.Context)  {
			panic("mock out the LogErrors method")
		},
		OnHealthUpdateFunc: func(status string)  {
			panic("mock out the OnHealthUpdate method")
		},
		RegisterBatchHandlerFunc: func(ctx context.Context, batchHandler kafka.BatchHandler) error {
			panic("mock out the RegisterBatchHandler method")
		},
		RegisterHandlerFunc: func(ctx context.Context, h kafka.Handler) error {
			panic("mock out the RegisterHandler method")
		},
		StartFunc: func() error {
			panic("mock out the Start method")
		},
		StateFunc: func() kafka.State {
			panic("mock out the State method")
		},
		StateWaitFunc: func(state kafka.State)  {
			panic("mock out the StateWait method")
		},
		StopFunc: func() error {
			panic("mock out the Stop method")
		},
		StopAndWaitFunc: func() error {
			panic("mock out the StopAndWait method")
		},
	}

	// use mockedIConsumerGroup in code that requires kafka.IConsumerGroup
	// and then make assertions.

}

func (*IConsumerGroupMock) Channels

func (mock *IConsumerGroupMock) Channels() *kafka.ConsumerGroupChannels

Channels calls ChannelsFunc.

func (*IConsumerGroupMock) ChannelsCalls

func (mock *IConsumerGroupMock) ChannelsCalls() []struct {
}

ChannelsCalls gets all the calls that were made to Channels. Check the length with:

len(mockedIConsumerGroup.ChannelsCalls())

func (*IConsumerGroupMock) Checker

func (mock *IConsumerGroupMock) Checker(ctx context.Context, state *health.CheckState) error

Checker calls CheckerFunc.

func (*IConsumerGroupMock) CheckerCalls

func (mock *IConsumerGroupMock) CheckerCalls() []struct {
	Ctx   context.Context
	State *health.CheckState
}

CheckerCalls gets all the calls that were made to Checker. Check the length with:

len(mockedIConsumerGroup.CheckerCalls())

func (*IConsumerGroupMock) Close

func (mock *IConsumerGroupMock) Close(ctx context.Context, optFuncs ...kafka.OptFunc) error

Close calls CloseFunc.

func (*IConsumerGroupMock) CloseCalls

func (mock *IConsumerGroupMock) CloseCalls() []struct {
	Ctx      context.Context
	OptFuncs []kafka.OptFunc
}

CloseCalls gets all the calls that were made to Close. Check the length with:

len(mockedIConsumerGroup.CloseCalls())

func (*IConsumerGroupMock) Initialise

func (mock *IConsumerGroupMock) Initialise(ctx context.Context) error

Initialise calls InitialiseFunc.

func (*IConsumerGroupMock) InitialiseCalls

func (mock *IConsumerGroupMock) InitialiseCalls() []struct {
	Ctx context.Context
}

InitialiseCalls gets all the calls that were made to Initialise. Check the length with:

len(mockedIConsumerGroup.InitialiseCalls())

func (*IConsumerGroupMock) IsInitialised

func (mock *IConsumerGroupMock) IsInitialised() bool

IsInitialised calls IsInitialisedFunc.

func (*IConsumerGroupMock) IsInitialisedCalls

func (mock *IConsumerGroupMock) IsInitialisedCalls() []struct {
}

IsInitialisedCalls gets all the calls that were made to IsInitialised. Check the length with:

len(mockedIConsumerGroup.IsInitialisedCalls())

func (*IConsumerGroupMock) LogErrors

func (mock *IConsumerGroupMock) LogErrors(ctx context.Context)

LogErrors calls LogErrorsFunc.

func (*IConsumerGroupMock) LogErrorsCalls

func (mock *IConsumerGroupMock) LogErrorsCalls() []struct {
	Ctx context.Context
}

LogErrorsCalls gets all the calls that were made to LogErrors. Check the length with:

len(mockedIConsumerGroup.LogErrorsCalls())

func (*IConsumerGroupMock) OnHealthUpdate

func (mock *IConsumerGroupMock) OnHealthUpdate(status string)

OnHealthUpdate calls OnHealthUpdateFunc.

func (*IConsumerGroupMock) OnHealthUpdateCalls

func (mock *IConsumerGroupMock) OnHealthUpdateCalls() []struct {
	Status string
}

OnHealthUpdateCalls gets all the calls that were made to OnHealthUpdate. Check the length with:

len(mockedIConsumerGroup.OnHealthUpdateCalls())

func (*IConsumerGroupMock) RegisterBatchHandler

func (mock *IConsumerGroupMock) RegisterBatchHandler(ctx context.Context, batchHandler kafka.BatchHandler) error

RegisterBatchHandler calls RegisterBatchHandlerFunc.

func (*IConsumerGroupMock) RegisterBatchHandlerCalls

func (mock *IConsumerGroupMock) RegisterBatchHandlerCalls() []struct {
	Ctx          context.Context
	BatchHandler kafka.BatchHandler
}

RegisterBatchHandlerCalls gets all the calls that were made to RegisterBatchHandler. Check the length with:

len(mockedIConsumerGroup.RegisterBatchHandlerCalls())

func (*IConsumerGroupMock) RegisterHandler

func (mock *IConsumerGroupMock) RegisterHandler(ctx context.Context, h kafka.Handler) error

RegisterHandler calls RegisterHandlerFunc.

func (*IConsumerGroupMock) RegisterHandlerCalls

func (mock *IConsumerGroupMock) RegisterHandlerCalls() []struct {
	Ctx context.Context
	H   kafka.Handler
}

RegisterHandlerCalls gets all the calls that were made to RegisterHandler. Check the length with:

len(mockedIConsumerGroup.RegisterHandlerCalls())

func (*IConsumerGroupMock) Start

func (mock *IConsumerGroupMock) Start() error

Start calls StartFunc.

func (*IConsumerGroupMock) StartCalls

func (mock *IConsumerGroupMock) StartCalls() []struct {
}

StartCalls gets all the calls that were made to Start. Check the length with:

len(mockedIConsumerGroup.StartCalls())

func (*IConsumerGroupMock) State

func (mock *IConsumerGroupMock) State() kafka.State

State calls StateFunc.

func (*IConsumerGroupMock) StateCalls

func (mock *IConsumerGroupMock) StateCalls() []struct {
}

StateCalls gets all the calls that were made to State. Check the length with:

len(mockedIConsumerGroup.StateCalls())

func (*IConsumerGroupMock) StateWait added in v3.1.0

func (mock *IConsumerGroupMock) StateWait(state kafka.State)

StateWait calls StateWaitFunc.

func (*IConsumerGroupMock) StateWaitCalls added in v3.1.0

func (mock *IConsumerGroupMock) StateWaitCalls() []struct {
	State kafka.State
}

StateWaitCalls gets all the calls that were made to StateWait. Check the length with:

len(mockedIConsumerGroup.StateWaitCalls())

func (*IConsumerGroupMock) Stop

func (mock *IConsumerGroupMock) Stop() error

Stop calls StopFunc.

func (*IConsumerGroupMock) StopAndWait

func (mock *IConsumerGroupMock) StopAndWait() error

StopAndWait calls StopAndWaitFunc.

func (*IConsumerGroupMock) StopAndWaitCalls

func (mock *IConsumerGroupMock) StopAndWaitCalls() []struct {
}

StopAndWaitCalls gets all the calls that were made to StopAndWait. Check the length with:

len(mockedIConsumerGroup.StopAndWaitCalls())

func (*IConsumerGroupMock) StopCalls

func (mock *IConsumerGroupMock) StopCalls() []struct {
}

StopCalls gets all the calls that were made to Stop. Check the length with:

len(mockedIConsumerGroup.StopCalls())

type IProducerMock

type IProducerMock struct {
	// AddHeaderFunc mocks the AddHeader method.
	AddHeaderFunc func(key string, value string)

	// ChannelsFunc mocks the Channels method.
	ChannelsFunc func() *kafka.ProducerChannels

	// CheckerFunc mocks the Checker method.
	CheckerFunc func(ctx context.Context, state *health.CheckState) error

	// CloseFunc mocks the Close method.
	CloseFunc func(ctx context.Context) error

	// InitialiseFunc mocks the Initialise method.
	InitialiseFunc func(ctx context.Context) error

	// IsInitialisedFunc mocks the IsInitialised method.
	IsInitialisedFunc func() bool

	// LogErrorsFunc mocks the LogErrors method.
	LogErrorsFunc func(ctx context.Context)

	// SendFunc mocks the Send method.
	SendFunc func(schema *avro.Schema, event interface{}) error
	// contains filtered or unexported fields
}

IProducerMock is a mock implementation of kafka.IProducer.

func TestSomethingThatUsesIProducer(t *testing.T) {

	// make and configure a mocked kafka.IProducer
	mockedIProducer := &IProducerMock{
		AddHeaderFunc: func(key string, value string)  {
			panic("mock out the AddHeader method")
		},
		ChannelsFunc: func() *kafka.ProducerChannels {
			panic("mock out the Channels method")
		},
		CheckerFunc: func(ctx context.Context, state *health.CheckState) error {
			panic("mock out the Checker method")
		},
		CloseFunc: func(ctx context.Context) error {
			panic("mock out the Close method")
		},
		InitialiseFunc: func(ctx context.Context) error {
			panic("mock out the Initialise method")
		},
		IsInitialisedFunc: func() bool {
			panic("mock out the IsInitialised method")
		},
		LogErrorsFunc: func(ctx context.Context)  {
			panic("mock out the LogErrors method")
		},
		SendFunc: func(schema *avro.Schema, event interface{}) error {
			panic("mock out the Send method")
		},
	}

	// use mockedIProducer in code that requires kafka.IProducer
	// and then make assertions.

}

func (*IProducerMock) AddHeader added in v3.4.0

func (mock *IProducerMock) AddHeader(key string, value string)

AddHeader calls AddHeaderFunc.

func (*IProducerMock) AddHeaderCalls added in v3.4.0

func (mock *IProducerMock) AddHeaderCalls() []struct {
	Key   string
	Value string
}

AddHeaderCalls gets all the calls that were made to AddHeader. Check the length with:

len(mockedIProducer.AddHeaderCalls())

func (*IProducerMock) Channels

func (mock *IProducerMock) Channels() *kafka.ProducerChannels

Channels calls ChannelsFunc.

func (*IProducerMock) ChannelsCalls

func (mock *IProducerMock) ChannelsCalls() []struct {
}

ChannelsCalls gets all the calls that were made to Channels. Check the length with:

len(mockedIProducer.ChannelsCalls())

func (*IProducerMock) Checker

func (mock *IProducerMock) Checker(ctx context.Context, state *health.CheckState) error

Checker calls CheckerFunc.

func (*IProducerMock) CheckerCalls

func (mock *IProducerMock) CheckerCalls() []struct {
	Ctx   context.Context
	State *health.CheckState
}

CheckerCalls gets all the calls that were made to Checker. Check the length with:

len(mockedIProducer.CheckerCalls())

func (*IProducerMock) Close

func (mock *IProducerMock) Close(ctx context.Context) error

Close calls CloseFunc.

func (*IProducerMock) CloseCalls

func (mock *IProducerMock) CloseCalls() []struct {
	Ctx context.Context
}

CloseCalls gets all the calls that were made to Close. Check the length with:

len(mockedIProducer.CloseCalls())

func (*IProducerMock) Initialise

func (mock *IProducerMock) Initialise(ctx context.Context) error

Initialise calls InitialiseFunc.

func (*IProducerMock) InitialiseCalls

func (mock *IProducerMock) InitialiseCalls() []struct {
	Ctx context.Context
}

InitialiseCalls gets all the calls that were made to Initialise. Check the length with:

len(mockedIProducer.InitialiseCalls())

func (*IProducerMock) IsInitialised

func (mock *IProducerMock) IsInitialised() bool

IsInitialised calls IsInitialisedFunc.

func (*IProducerMock) IsInitialisedCalls

func (mock *IProducerMock) IsInitialisedCalls() []struct {
}

IsInitialisedCalls gets all the calls that were made to IsInitialised. Check the length with:

len(mockedIProducer.IsInitialisedCalls())

func (*IProducerMock) LogErrors

func (mock *IProducerMock) LogErrors(ctx context.Context)

LogErrors calls LogErrorsFunc.

func (*IProducerMock) LogErrorsCalls

func (mock *IProducerMock) LogErrorsCalls() []struct {
	Ctx context.Context
}

LogErrorsCalls gets all the calls that were made to LogErrors. Check the length with:

len(mockedIProducer.LogErrorsCalls())

func (*IProducerMock) Send

func (mock *IProducerMock) Send(schema *avro.Schema, event interface{}) error

Send calls SendFunc.

func (*IProducerMock) SendCalls

func (mock *IProducerMock) SendCalls() []struct {
	Schema *avro.Schema
	Event  interface{}
}

SendCalls gets all the calls that were made to Send. Check the length with:

len(mockedIProducer.SendCalls())

type Message

type Message struct {
	*mock.MessageMock
	// contains filtered or unexported fields
}

Message allows a mock message to return the configured data, and capture whether commit has been called.

func NewMessage

func NewMessage(data []byte, offset int64, optionFuncs ...func(*Options) error) (*Message, error)

NewMessage returns a new mock message containing the given data.

func (Message) IsCommitted

func (internal Message) IsCommitted() bool

IsCommitted returns true if the message offset was committed.

func (Message) IsMarked

func (internal Message) IsMarked() bool

IsMarked returns true if the message was marked as consumed.

type Options added in v3.5.0

type Options struct {
	Headers map[string]string
}

type Producer added in v3.9.0

type Producer struct {
	Mock *IProducerMock // Implements all moq functions so users can validate calls
	// contains filtered or unexported fields
}

Producer is an extension of the moq Producer with implementation of required functions and Sarama mocks to emulate a fully functional kafka Producer.

func NewProducer added in v3.9.0

func NewProducer(ctx context.Context, pConfig *kafka.ProducerConfig, cfg *ProducerConfig) (*Producer, error)

NewProducer creates a producer for testing. It behaves like a real producer, without network communication.

func (*Producer) WaitForMessageSent added in v3.9.0

func (p *Producer) WaitForMessageSent(schema *avro.Schema, event interface{}, timeout time.Duration) error

WaitForMessageSent waits for a new message to be sent to Kafka, with a timeout according to the provided value. If a message is sent, it unmarshals it into the provided 'event', using the provided avro 'schema'.

func (*Producer) WaitNoMessageSent added in v3.9.0

func (p *Producer) WaitNoMessageSent(timeWindow time.Duration) error

WaitNoMessageSent waits until the timeWindow elapses. If, during the time window, the closer channel is closed or a message is sent to the sarama message channel, then an error is returned.

type ProducerConfig added in v3.9.0

type ProducerConfig struct {
	ChannelBufferSize int  // buffer size
	InitAtCreation    bool // whether the producer is initialised at creation time; if false, it is initialised later
}
