kafka

package
v0.0.0-...-f839e32 Latest
Warning

This package is not in the latest version of its module.

Published: Aug 3, 2023 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func CopyDataChunkFile

func CopyDataChunkFile(consumer *Consumer, dst io.Writer)

CopyDataChunkFile

func MsgAddressPrintable

func MsgAddressPrintable(message interface{}) string

MsgAddressPrintable

func TasksProcessor

func TasksProcessor(consumer *Consumer, processor func(*common.Task) error)

Types

type ConsumeMessageFunction

type ConsumeMessageFunction func(context.Context, *sarama.ConsumerMessage) bool

ConsumeMessageFunction is the type of the function called for each consumed message
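A minimal sketch of a callback with this shape follows. To stay runnable without external modules it uses a local `ConsumerMessage` stand-in rather than `*sarama.ConsumerMessage`; the names `handleNonEmpty`, `dispatch`, and `countHandled` are hypothetical. The documentation does not say what the `bool` result means, so treating `true` as "message handled" is an assumption made only for this example.

```go
package main

import (
	"context"
	"fmt"
)

// ConsumerMessage is a local stand-in for sarama.ConsumerMessage,
// reduced to the fields this sketch needs.
type ConsumerMessage struct {
	Topic     string
	Partition int32
	Offset    int64
	Value     []byte
}

// ConsumeMessageFunction mirrors the documented signature, with the
// stand-in message type substituted for *sarama.ConsumerMessage.
type ConsumeMessageFunction func(context.Context, *ConsumerMessage) bool

// handleNonEmpty is a hypothetical callback.
// ASSUMPTION: the bool result means "message handled"; the package
// documentation does not define it.
func handleNonEmpty(ctx context.Context, msg *ConsumerMessage) bool {
	if ctx.Err() != nil || msg == nil || len(msg.Value) == 0 {
		return false
	}
	fmt.Printf("%s/%d@%d: %s\n", msg.Topic, msg.Partition, msg.Offset, msg.Value)
	return true
}

// dispatch calls fn once per message and counts the calls that returned true.
func dispatch(fn ConsumeMessageFunction, msgs []*ConsumerMessage) int {
	handled := 0
	for _, m := range msgs {
		if fn(context.Background(), m) {
			handled++
		}
	}
	return handled
}

// countHandled feeds two sample messages through the callback.
func countHandled() int {
	msgs := []*ConsumerMessage{
		{Topic: "tasks", Offset: 1, Value: []byte("first")},
		{Topic: "tasks", Offset: 2}, // empty payload, rejected by the callback
	}
	return dispatch(handleNonEmpty, msgs)
}

func main() {
	fmt.Println("handled:", countHandled())
}
```

With the real package, a function of this shape would presumably be passed to SetConsumeMessageFunction.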

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer

func NewConsumer

func NewConsumer(endpoint *common.KafkaEndpoint, address *common.KafkaAddress) *Consumer

NewConsumer

func NewConsumerConfig

func NewConsumerConfig(cfg sections.KafkaConfigurator, topic string) *Consumer

NewConsumerConfig

func (*Consumer) Close

func (c *Consumer) Close()

Close closes the partition consumer and drains its Messages() channel, so that any goroutine blocked on Messages() exits
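The effect described above relies on ordinary Go channel semantics: a receiver ranging over a channel returns once the channel is closed and empty. The sketch below illustrates this with a hypothetical `partitionConsumer` stand-in; it is not the package's or sarama's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// partitionConsumer is a hypothetical stand-in that exposes messages
// on a channel, similar in spirit to a Kafka partition consumer.
type partitionConsumer struct {
	messages  chan string
	closeOnce sync.Once
}

// Messages returns the receive side of the message channel.
func (p *partitionConsumer) Messages() <-chan string { return p.messages }

// Close closes the channel exactly once and then drains anything still
// queued, so no pending message keeps a reader or writer stuck.
func (p *partitionConsumer) Close() {
	p.closeOnce.Do(func() { close(p.messages) })
	for range p.messages {
	}
}

// receiveUntilClosed starts a reader that blocks on Messages(), delivers
// two messages, closes the consumer, and reports how many were received.
func receiveUntilClosed() int {
	pc := &partitionConsumer{messages: make(chan string)}
	received := 0
	done := make(chan struct{})

	go func() {
		defer close(done)
		for range pc.Messages() { // blocks until a message arrives or Close is called
			received++
		}
	}()

	pc.messages <- "a"
	pc.messages <- "b"
	pc.Close()
	<-done // the reader has exited because the channel was closed
	return received
}

func main() {
	fmt.Println("received:", receiveUntilClosed())
}
```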

func (*Consumer) Recv

func (c *Consumer) Recv() *sarama.ConsumerMessage

Recv is a blocking call

type ConsumerGroup

type ConsumerGroup struct {
	// contains filtered or unexported fields
}

ConsumerGroup

func NewConsumerGroup

func NewConsumerGroup(endpoint *common.KafkaEndpoint, address *common.KafkaAddress, groupID string) *ConsumerGroup

NewConsumerGroup creates new consumer group

func NewConsumerGroupFromEndpoint

func NewConsumerGroupFromEndpoint(cfg sections.KafkaConfigurator, groupID string) *ConsumerGroup

NewConsumerGroupFromEndpoint creates a new consumer group from the given configuration. IMPORTANT: the topic to read from must be specified with one of:

  1. SetAddress
  2. SetTopic
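The requirement above, together with the chainable setters documented below, can be pictured with a small stand-in model. Everything in this sketch is hypothetical (`kafkaAddress`, `consumerGroup`, `newConsumerGroup`, `hasTopic`); it only mirrors the documented behaviour that SetTopic is shorthand for an address with partition 0 and that each setter returns its receiver.

```go
package main

import "fmt"

// kafkaAddress is a hypothetical stand-in for common.KafkaAddress.
type kafkaAddress struct {
	Topic     string
	Partition int32
}

// consumerGroup is a hypothetical stand-in for ConsumerGroup.
type consumerGroup struct {
	groupID string
	address *kafkaAddress
}

// newConsumerGroup returns a group with no topic configured yet.
func newConsumerGroup(groupID string) *consumerGroup {
	return &consumerGroup{groupID: groupID}
}

// SetAddress sets topic and partition and returns the receiver for chaining.
func (c *consumerGroup) SetAddress(a *kafkaAddress) *consumerGroup {
	c.address = a
	return c
}

// SetTopic is the simplified form: the given topic with partition 0.
func (c *consumerGroup) SetTopic(topic string) *consumerGroup {
	return c.SetAddress(&kafkaAddress{Topic: topic, Partition: 0})
}

// hasTopic reports whether a topic has been configured.
func (c *consumerGroup) hasTopic() bool {
	return c.address != nil && c.address.Topic != ""
}

func main() {
	bare := newConsumerGroup("workers")
	fmt.Println(bare.hasTopic()) // false: no topic set yet

	g := newConsumerGroup("workers").SetTopic("tasks")
	fmt.Println(g.hasTopic(), g.address.Topic, g.address.Partition) // true tasks 0
}
```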

func (*ConsumerGroup) ConsumeLoop

func (c *ConsumerGroup) ConsumeLoop(consumeNewest bool, ack bool)

ConsumeLoop runs an endless Kafka consume loop

func (*ConsumerGroup) SetAddress

func (c *ConsumerGroup) SetAddress(address *common.KafkaAddress) *ConsumerGroup

SetAddress sets the full address: topic and partition

func (*ConsumerGroup) SetConsumeMessageFunction

func (c *ConsumerGroup) SetConsumeMessageFunction(consumeMessageFunction ConsumeMessageFunction) *ConsumerGroup

SetConsumeMessageFunction sets function which will be called for each message received from Kafka

func (*ConsumerGroup) SetConsumerGroupHandler

func (c *ConsumerGroup) SetConsumerGroupHandler(handler sarama.ConsumerGroupHandler) *ConsumerGroup

SetConsumerGroupHandler sets handler which performs setup, cleanup and message processing activities

func (*ConsumerGroup) SetContext

func (c *ConsumerGroup) SetContext(ctx context.Context) *ConsumerGroup

SetContext sets the context to be used by MessageProcessor

func (*ConsumerGroup) SetTopic

func (c *ConsumerGroup) SetTopic(topic string) *ConsumerGroup

SetTopic sets the address in simplified form: the specified topic and partition 0

type ConsumerGroupHandler

type ConsumerGroupHandler struct {
	// contains filtered or unexported fields
}

ConsumerGroupHandler instances are used to handle individual topic/partition claims. The type also provides hooks for the consumer group session life cycle, which allow you to trigger logic before or after the consume loop(s).

PLEASE NOTE that handlers are likely to be called from several goroutines concurrently; ensure that all state is safely protected against race conditions.

Implements the sarama.ConsumerGroupHandler interface
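One common way to satisfy the concurrency note is to guard shared handler state with a mutex. The sketch below uses a hypothetical `countingHandler` type (not part of this package) whose `record` method is called from several goroutines at once, roughly as per-partition consume loops might do.

```go
package main

import (
	"fmt"
	"sync"
)

// countingHandler is a hypothetical handler that tracks how many
// messages were processed per topic. The mutex protects counts.
type countingHandler struct {
	mu     sync.Mutex
	counts map[string]int
}

// record increments the counter for topic; safe for concurrent use.
func (h *countingHandler) record(topic string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.counts[topic]++
}

// count returns the current counter for topic; safe for concurrent use.
func (h *countingHandler) count(topic string) int {
	h.mu.Lock()
	defer h.mu.Unlock()
	return h.counts[topic]
}

// runConcurrent simulates several consume loops sharing one handler
// and returns the total number of recorded messages.
func runConcurrent(goroutines, perGoroutine int) int {
	h := &countingHandler{counts: make(map[string]int)}
	var wg sync.WaitGroup
	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perGoroutine; j++ {
				h.record("tasks")
			}
		}()
	}
	wg.Wait()
	return h.count("tasks")
}

func main() {
	fmt.Println(runConcurrent(8, 1000)) // 8000
}
```

Without the mutex, the concurrent map writes would be a data race; running the program with `go run -race` is a quick way to check a real handler.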

func (*ConsumerGroupHandler) Cleanup

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited but before the offsets are committed for the very last time. Part of the sarama.ConsumerGroupHandler interface

func (*ConsumerGroupHandler) ConsumeClaim

ConsumeClaim must start a consumer loop over ConsumerGroupClaim's Messages(). Once the Messages() channel is closed, the handler must finish its processing loop and exit. Part of the sarama.ConsumerGroupHandler interface
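The contract described above amounts to ranging over the claim's message channel and returning when it closes. The following sketch shows that loop using local stand-ins (`message`, `claim`, `session`, `fakeClaim`, `fakeSession`) that mirror only the `Messages()` and `MarkMessage` methods of the corresponding sarama interfaces. Marking a message only when processing succeeds is an assumption of this example, not documented behaviour of the package.

```go
package main

import "fmt"

// message is a stand-in for sarama.ConsumerMessage.
type message struct {
	Offset int64
	Value  string
}

// claim and session mirror the single methods this sketch needs from
// sarama.ConsumerGroupClaim and sarama.ConsumerGroupSession.
type claim interface {
	Messages() <-chan *message
}

type session interface {
	MarkMessage(msg *message, metadata string)
}

// fakeClaim and fakeSession are in-memory test doubles.
type fakeClaim struct{ ch chan *message }

func (c *fakeClaim) Messages() <-chan *message { return c.ch }

type fakeSession struct{ marked []int64 }

func (s *fakeSession) MarkMessage(msg *message, _ string) {
	s.marked = append(s.marked, msg.Offset)
}

// consumeClaim loops until the Messages() channel is closed, then returns.
// ASSUMPTION: a message is marked only when process reports success.
func consumeClaim(s session, c claim, process func(*message) bool) error {
	for msg := range c.Messages() {
		if process(msg) {
			s.MarkMessage(msg, "")
		}
	}
	return nil
}

// runClaim feeds three messages through the loop and returns the marked offsets.
func runClaim() []int64 {
	c := &fakeClaim{ch: make(chan *message, 3)}
	c.ch <- &message{Offset: 10, Value: "ok"}
	c.ch <- &message{Offset: 11, Value: ""}
	c.ch <- &message{Offset: 12, Value: "ok"}
	close(c.ch) // closing the channel is what ends the loop

	s := &fakeSession{}
	_ = consumeClaim(s, c, func(m *message) bool { return m.Value != "" })
	return s.marked
}

func main() {
	fmt.Println(runClaim()) // [10 12]
}
```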

func (*ConsumerGroupHandler) Setup

Setup is run at the beginning of a new session, before ConsumeClaim. Part of the sarama.ConsumerGroupHandler interface

type DataChunkTransport

type DataChunkTransport struct {
	Transport
}

func NewDataChunkTransport

func NewDataChunkTransport(producer *Producer, consumer *Consumer, close bool) *DataChunkTransport

NewDataChunkTransport

func (*DataChunkTransport) Recv

func (t *DataChunkTransport) Recv() (*common.DataPacket, error)

Recv

func (*DataChunkTransport) Send

func (t *DataChunkTransport) Send(dataChunk *common.DataPacket) error

Send

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer

func NewProducer

func NewProducer(endpoint *common.KafkaEndpoint, address *common.KafkaAddress) *Producer

NewProducer

func NewProducerConfig

func NewProducerConfig(cfg sections.KafkaConfigurator) *Producer

NewProducerConfig

func (*Producer) Close

func (p *Producer) Close()

Close

func (*Producer) CreateTopic

func (p *Producer) CreateTopic() (error, error)

CreateTopic

func (*Producer) GetTopic

func (p *Producer) GetTopic() string

GetTopic

func (*Producer) ListTopics

func (p *Producer) ListTopics() (map[string]sarama.TopicDetail, error)

ListTopics

func (*Producer) Send

func (p *Producer) Send(data []byte) error

Send

func (*Producer) SetAddress

func (p *Producer) SetAddress(address *common.KafkaAddress) *Producer

SetAddress

func (*Producer) SetTopic

func (p *Producer) SetTopic(topic string) *Producer

SetTopic

type TaskTransport

type TaskTransport struct {
	Transport
}

func NewTaskTransport

func NewTaskTransport(producer *Producer, consumer *Consumer, close bool) *TaskTransport

NewTaskTransport

func (*TaskTransport) Recv

func (t *TaskTransport) Recv() (*common.Task, error)

Recv

func (*TaskTransport) Send

func (t *TaskTransport) Send(task *common.Task) error

Send

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

Transport

func NewTransport

func NewTransport(producer *Producer, consumer *Consumer, close bool) *Transport

NewTransport

func (*Transport) Close

func (t *Transport) Close()

Close
