mocks

package
v2.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 9, 2023 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type MockKafkaAdmin

type MockKafkaAdmin struct {
	Topics *Topics
}

func NewMockAdminWithTopics

func NewMockAdminWithTopics(tps []*kafka.Topic) *MockKafkaAdmin

func (*MockKafkaAdmin) Close

func (m *MockKafkaAdmin) Close()

func (*MockKafkaAdmin) CreateTopics

func (m *MockKafkaAdmin) CreateTopics(topics []*kafka.Topic) error

func (*MockKafkaAdmin) DeleteTopics

func (m *MockKafkaAdmin) DeleteTopics(topics []string) (map[string]error, error)

func (*MockKafkaAdmin) FetchInfo

func (m *MockKafkaAdmin) FetchInfo(topics []string) ([]*kafka.Topic, error)

type MockPartition

type MockPartition struct {
	*sync.Mutex
	// contains filtered or unexported fields
}

func (*MockPartition) Append

func (p *MockPartition) Append(r kafka.Record) error

func (*MockPartition) Fetch

func (p *MockPartition) Fetch(start int64, limit int) (records []kafka.Record, err error)

func (*MockPartition) FetchAll

func (p *MockPartition) FetchAll() (records []kafka.Record)

func (*MockPartition) Latest

func (p *MockPartition) Latest() int64

type MockPartitionConsumer

type MockPartitionConsumer struct {
	// contains filtered or unexported fields
}

func (*MockPartitionConsumer) Close

func (m *MockPartitionConsumer) Close() error

func (*MockPartitionConsumer) ConsumePartition

func (m *MockPartitionConsumer) ConsumePartition(ctx context.Context, topic string, partition int32, offset kafka.Offset) (kafka.Partition, error)

func (*MockPartitionConsumer) ConsumeTopic

func (m *MockPartitionConsumer) ConsumeTopic(ctx context.Context, topic string, offset kafka.Offset) (map[int32]kafka.Partition, error)

func (*MockPartitionConsumer) GetOffsetLatest

func (m *MockPartitionConsumer) GetOffsetLatest(topic string, partition int32) (offset int64, err error)

func (*MockPartitionConsumer) GetOffsetOldest

func (m *MockPartitionConsumer) GetOffsetOldest(topic string, partition int32) (offset int64, err error)

func (*MockPartitionConsumer) OffsetValid

func (m *MockPartitionConsumer) OffsetValid(topic string, partition int32, offset int64) (isValid bool, err error)

func (*MockPartitionConsumer) Partitions

func (m *MockPartitionConsumer) Partitions(ctx context.Context, topic string) ([]int32, error)

type MockStreamProducer

type MockStreamProducer struct {
	// contains filtered or unexported fields
}

func NewMockPartitionConsumer

func NewMockPartitionConsumer(topics *Topics) *MockStreamProducer

func NewMockProducer

func NewMockProducer(topics *Topics) *MockStreamProducer

func (*MockStreamProducer) Close

func (msp *MockStreamProducer) Close() error

func (*MockStreamProducer) ProduceBatch

func (msp *MockStreamProducer) ProduceBatch(ctx context.Context, messages []kafka.Record) error

func (*MockStreamProducer) ProduceSync

func (msp *MockStreamProducer) ProduceSync(ctx context.Context, message kafka.Record) (partition int32, offset int64, err error)

type MockTopic

type MockTopic struct {
	Name string

	Meta *kafka.Topic
	// contains filtered or unexported fields
}

func (*MockTopic) AddPartition

func (tp *MockTopic) AddPartition(id int) error

func (*MockTopic) FetchAll

func (tp *MockTopic) FetchAll() (records []kafka.Record)

func (*MockTopic) Partition

func (tp *MockTopic) Partition(id int) (*MockPartition, error)

func (*MockTopic) Partitions

func (tp *MockTopic) Partitions() []*MockPartition

type Record

type Record struct {
	MCtx       context.Context
	MTopic     string
	MPartition int32
	MOffset    int64
	MValue     []byte
	MKey       []byte
	MTimestamp time.Time
	MHeaders   []kafka.RecordHeader
}

func (*Record) Ctx

func (r *Record) Ctx() context.Context

func (*Record) Headers

func (r *Record) Headers() kafka.RecordHeaders

func (*Record) Key

func (r *Record) Key() []byte

func (*Record) Offset

func (r *Record) Offset() int64

func (*Record) Partition

func (r *Record) Partition() int32

func (*Record) String

func (r *Record) String() string

func (*Record) Timestamp

func (r *Record) Timestamp() time.Time

func (*Record) Topic

func (r *Record) Topic() string

func (*Record) Value

func (r *Record) Value() []byte

type Topics

type Topics struct {
	*sync.Mutex
	// contains filtered or unexported fields
}

func NewMockTopics

func NewMockTopics() *Topics

func (*Topics) AddTopic

func (td *Topics) AddTopic(topic *MockTopic) error

func (*Topics) RemoveTopic

func (td *Topics) RemoveTopic(name string) error

func (*Topics) Topic

func (td *Topics) Topic(name string) (*MockTopic, error)

func (*Topics) Topics

func (td *Topics) Topics() map[string]*MockTopic

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL