kafka

package
v0.1.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 16, 2022 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const FakeKafkaIDPropName = "fakeKafkaID"

Variables

This section is empty.

Functions

func IngestRow

func IngestRow(topic *Topic, row *common.Row, colTypes []common.ColumnType, keyCols []int, encoder MessageEncoder, timestamp time.Time) error

IngestRow is a convenience method which encodes the row into a Kafka message first, then ingests it

func IngestRows

func IngestRows(f *FakeKafka, sourceInfo *common.SourceInfo, colTypes []common.ColumnType, rows *common.Rows, encoder MessageEncoder) error

IngestRows ingests rows given schema and source name - convenience method for use in tests

func NewStringKeyProtobufValueEncoderFactory

func NewStringKeyProtobufValueEncoderFactory(registry protolib.Resolver) func(options string) (MessageEncoder, error)

Types

type ClientFactory

type ClientFactory func(topicName string, props map[string]string, groupID string) MessageProviderFactory

type ConfluentMessageProvider

type ConfluentMessageProvider struct {
	// contains filtered or unexported fields
}

func (*ConfluentMessageProvider) Close

func (cmp *ConfluentMessageProvider) Close() error

func (*ConfluentMessageProvider) CommitOffsets

func (cmp *ConfluentMessageProvider) CommitOffsets(offsetsMap map[int32]int64) error

func (*ConfluentMessageProvider) GetMessage

func (cmp *ConfluentMessageProvider) GetMessage(pollTimeout time.Duration) (*Message, error)

func (*ConfluentMessageProvider) RebalanceOccurred

func (cmp *ConfluentMessageProvider) RebalanceOccurred(cons *kafka.Consumer, event kafka.Event) error

func (*ConfluentMessageProvider) SetRebalanceCallback

func (cmp *ConfluentMessageProvider) SetRebalanceCallback(callback RebalanceCallback)

func (*ConfluentMessageProvider) Start

func (cmp *ConfluentMessageProvider) Start() error

func (*ConfluentMessageProvider) Stop

func (cmp *ConfluentMessageProvider) Stop() error

type ConfluentMessageProviderFactory

type ConfluentMessageProviderFactory struct {
	// contains filtered or unexported fields
}

func (*ConfluentMessageProviderFactory) NewMessageProvider

func (cmpf *ConfluentMessageProviderFactory) NewMessageProvider() (MessageProvider, error)

type FakeKafka

type FakeKafka struct {
	ID int64
	// contains filtered or unexported fields
}

func GetFakeKafka

func GetFakeKafka(id int64) (*FakeKafka, bool)

func NewFakeKafka

func NewFakeKafka() *FakeKafka

func (*FakeKafka) CreateTopic

func (f *FakeKafka) CreateTopic(name string, partitions int) (*Topic, error)

func (*FakeKafka) DeleteTopic

func (f *FakeKafka) DeleteTopic(name string) error

func (*FakeKafka) GetTopic

func (f *FakeKafka) GetTopic(name string) (*Topic, bool)

func (*FakeKafka) GetTopicNames

func (f *FakeKafka) GetTopicNames() []string

func (*FakeKafka) IngestMessage

func (f *FakeKafka) IngestMessage(topicName string, message *Message) error

func (*FakeKafka) InjectFailure

func (f *FakeKafka) InjectFailure(topicName string, groupID string, failTime time.Duration) error

type FakeMessageProvider

type FakeMessageProvider struct {
	// contains filtered or unexported fields
}

func (*FakeMessageProvider) Close

func (f *FakeMessageProvider) Close() error

func (*FakeMessageProvider) CommitOffsets

func (f *FakeMessageProvider) CommitOffsets(offsets map[int32]int64) error

func (*FakeMessageProvider) GetMessage

func (f *FakeMessageProvider) GetMessage(pollTimeout time.Duration) (*Message, error)

func (*FakeMessageProvider) SetMaxRate added in v0.1.6

func (f *FakeMessageProvider) SetMaxRate(rate int)

func (*FakeMessageProvider) SetRebalanceCallback

func (f *FakeMessageProvider) SetRebalanceCallback(callback RebalanceCallback)

func (*FakeMessageProvider) Start

func (f *FakeMessageProvider) Start() error

func (*FakeMessageProvider) Stop

func (f *FakeMessageProvider) Stop() error

type FakeMessageProviderFactory

type FakeMessageProviderFactory struct {
	// contains filtered or unexported fields
}

func (*FakeMessageProviderFactory) NewMessageProvider

func (fmpf *FakeMessageProviderFactory) NewMessageProvider() (MessageProvider, error)

type Float32BEKeyTLJSONValueEncoder

type Float32BEKeyTLJSONValueEncoder struct {
}

Float32BEKeyTLJSONValueEncoder encodes as float32BE key, top level JSON value, no headers

func (*Float32BEKeyTLJSONValueEncoder) EncodeMessage

func (s *Float32BEKeyTLJSONValueEncoder) EncodeMessage(row *common.Row, colTypes []common.ColumnType, keyCols []int, timestamp time.Time) (*Message, error)

func (*Float32BEKeyTLJSONValueEncoder) Name

type Float64BEKeyTLJSONValueEncoder

type Float64BEKeyTLJSONValueEncoder struct {
}

Float64BEKeyTLJSONValueEncoder encodes as float64BE key, top level JSON value, no headers

func (*Float64BEKeyTLJSONValueEncoder) EncodeMessage

func (s *Float64BEKeyTLJSONValueEncoder) EncodeMessage(row *common.Row, colTypes []common.ColumnType, keyCols []int, timestamp time.Time) (*Message, error)

func (*Float64BEKeyTLJSONValueEncoder) Name

type Group

type Group struct {
	// contains filtered or unexported fields
}

type Int16BEKeyTLJSONValueEncoder

type Int16BEKeyTLJSONValueEncoder struct {
}

Int16BEKeyTLJSONValueEncoder encodes as int16BE key, top level JSON value, no headers

func (*Int16BEKeyTLJSONValueEncoder) EncodeMessage

func (s *Int16BEKeyTLJSONValueEncoder) EncodeMessage(row *common.Row, colTypes []common.ColumnType, keyCols []int, timestamp time.Time) (*Message, error)

func (*Int16BEKeyTLJSONValueEncoder) Name

type Int32BEKeyTLJSONValueEncoder

type Int32BEKeyTLJSONValueEncoder struct {
}

Int32BEKeyTLJSONValueEncoder encodes as int32BE key, top level JSON value, no headers

func (*Int32BEKeyTLJSONValueEncoder) EncodeMessage

func (s *Int32BEKeyTLJSONValueEncoder) EncodeMessage(row *common.Row, colTypes []common.ColumnType, keyCols []int, timestamp time.Time) (*Message, error)

func (*Int32BEKeyTLJSONValueEncoder) Name

type Int64BEKeyTLJSONValueEncoder

type Int64BEKeyTLJSONValueEncoder struct {
}

Int64BEKeyTLJSONValueEncoder encodes as int64BE key, top level JSON value, no headers

func (*Int64BEKeyTLJSONValueEncoder) EncodeMessage

func (s *Int64BEKeyTLJSONValueEncoder) EncodeMessage(row *common.Row, colTypes []common.ColumnType, keyCols []int, timestamp time.Time) (*Message, error)

func (*Int64BEKeyTLJSONValueEncoder) Name

type JSONHeadersEncoder

type JSONHeadersEncoder struct {
}

JSONHeadersEncoder puts the message key encoded as JSON in one header and the message value encoded as JSON in another, the actual message key and value are empty JSON objects

func (*JSONHeadersEncoder) EncodeMessage

func (s *JSONHeadersEncoder) EncodeMessage(row *common.Row, colTypes []common.ColumnType, keyCols []int, timestamp time.Time) (*Message, error)

func (*JSONHeadersEncoder) Name

func (s *JSONHeadersEncoder) Name() string

type JSONKeyJSONValueEncoder

type JSONKeyJSONValueEncoder struct {
}

JSONKeyJSONValueEncoder encodes as top level JSON key, top level JSON value, no headers

func (*JSONKeyJSONValueEncoder) EncodeMessage

func (s *JSONKeyJSONValueEncoder) EncodeMessage(row *common.Row, colTypes []common.ColumnType, keyCols []int, timestamp time.Time) (*Message, error)

func (*JSONKeyJSONValueEncoder) Name

func (s *JSONKeyJSONValueEncoder) Name() string

type Message

type Message struct {
	PartInfo  PartInfo
	TimeStamp time.Time
	Key       []byte
	Value     []byte
	Headers   []MessageHeader
}

type MessageEncoder

type MessageEncoder interface {
	Name() string
	EncodeMessage(row *common.Row, colTypes []common.ColumnType, keyCols []int, timestamp time.Time) (*Message, error)
}

func NewStringKeyProtobufValueEncoder

func NewStringKeyProtobufValueEncoder(protoRegistry protolib.Resolver, options string) (MessageEncoder, error)

type MessageHeader

type MessageHeader struct {
	Key   string
	Value []byte
}

type MessageProvider

type MessageProvider interface {
	GetMessage(pollTimeout time.Duration) (*Message, error)
	CommitOffsets(offsets map[int32]int64) error
	Stop() error
	Start() error
	Close() error
	SetRebalanceCallback(callback RebalanceCallback)
}

type MessageProviderFactory

type MessageProviderFactory interface {
	NewMessageProvider() (MessageProvider, error)
}

func NewFakeMessageProviderFactory

func NewFakeMessageProviderFactory(topicName string, props map[string]string, groupName string) (MessageProviderFactory, error)

func NewMessageProviderFactory

func NewMessageProviderFactory(topicName string, props map[string]string, groupID string) MessageProviderFactory

type MessageQueue

type MessageQueue chan *Message

type NestedJSONKeyNestedJSONValueEncoder

type NestedJSONKeyNestedJSONValueEncoder struct {
}

NestedJSONKeyNestedJSONValueEncoder encodes as nested JSON key, nested JSON value, no headers

func (*NestedJSONKeyNestedJSONValueEncoder) EncodeMessage

func (s *NestedJSONKeyNestedJSONValueEncoder) EncodeMessage(row *common.Row, colTypes []common.ColumnType, keyCols []int, timestamp time.Time) (*Message, error)

func (*NestedJSONKeyNestedJSONValueEncoder) Name

type PartInfo

type PartInfo struct {
	PartitionID int32
	Offset      int64
}

type Partition

type Partition struct {
	// contains filtered or unexported fields
}

type RebalanceCallback

type RebalanceCallback func() error

type StringKeyProtobufValueEncoder

type StringKeyProtobufValueEncoder struct {
	// contains filtered or unexported fields
}

StringKeyProtobufValueEncoder is an encoder that translates each row to a protobuf message. Columns 0 to N correspond to protobuf field numbers 1 to N+1. This means that the key also ends up as a field in the value protobuf.

func (*StringKeyProtobufValueEncoder) EncodeMessage

func (e *StringKeyProtobufValueEncoder) EncodeMessage(row *common.Row, colTypes []common.ColumnType, keyCols []int, timestamp time.Time) (*Message, error)

func (*StringKeyProtobufValueEncoder) Name

type StringKeyTLJSONValueEncoder

type StringKeyTLJSONValueEncoder struct {
}

StringKeyTLJSONValueEncoder encodes as string key, top level JSON value, no headers

func (*StringKeyTLJSONValueEncoder) EncodeMessage

func (s *StringKeyTLJSONValueEncoder) EncodeMessage(row *common.Row, colTypes []common.ColumnType, keyCols []int, timestamp time.Time) (*Message, error)

func (*StringKeyTLJSONValueEncoder) Name

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

func (*Subscriber) GetMessage

func (c *Subscriber) GetMessage(pollTimeout time.Duration) (*Message, error)

func (*Subscriber) Unsubscribe

func (c *Subscriber) Unsubscribe() error

type Topic

type Topic struct {
	Name string
	// contains filtered or unexported fields
}

func (*Topic) CreateSubscriber

func (t *Topic) CreateSubscriber(groupID string, rebalanceCB RebalanceCallback) (*Subscriber, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL