kafka

package
v0.0.0-...-d5b0b98 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 6, 2023 License: Apache-2.0, BSD-2-Clause, BSD-3-Clause, + 1 more Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// KeyTopicGroupOffset is the key of the consumer group
	KeyTopicGroupOffset = "evhub:processor:%v:%v:%v"
	// CommitTimeout commit timeout
	CommitTimeout = 2
)

Variables

This section is empty.

Functions

func TopicGroupOffsetKey

func TopicGroupOffsetKey(topic string, group string, partition int32) string

TopicGroupOffsetKey returns the key of consumer group

Types

type ConsumerGroup

type ConsumerGroup struct {
	// a consumer group for Kafka topic
	GroupID string
	//  If is OK, representing auto commit.
	OffsetsAutoCommitEnable bool
	// flow control batch number for Kafka consumer group
	FlowControlBatchNum int
	// initial offset timeout for Kafka consumer group
	InitOffsetGetOffsetTimeout int64
	// initial offset period for Kafka consumer group
	InitOffsetGetOffsetPeroid int64

	// event handler
	Event *event.Event
	// redis pool
	RedisPool *redis.Pool
	//  a consumer group for Kafka topic
	ConsumerGroup sarama.ConsumerGroup

	// the stop controller of flow control
	StopWg sync.WaitGroup
	// contains filtered or unexported fields
}

ConsumerGroup is a consumer of Kafka

func (*ConsumerGroup) Cleanup

Cleanup is the end of the processor

func (*ConsumerGroup) ConsumeClaim

func (s *ConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim is processing the message

func (*ConsumerGroup) ConsumeMessage

func (s *ConsumerGroup) ConsumeMessage(msg []byte)

ConsumeMessage consumer messages

func (*ConsumerGroup) Setup

func (s *ConsumerGroup) Setup(session sarama.ConsumerGroupSession) error

Setup is the beginning of the processor

func (*ConsumerGroup) Stop

func (s *ConsumerGroup) Stop() error

Stop stops the consumer group gracefully.

type Kafka

type Kafka struct {
	// contains filtered or unexported fields
}

Kafka is a type of connector

func New

func New(opts *options.Options, c *options.KafkaConfig, handler *handler.Handler) (*Kafka, error)

New creates a Kafka connector

func (*Kafka) ConsumerDelayTopic

func (s *Kafka) ConsumerDelayTopic(confInfoList []*cc.ProducerConfInfo)

ConsumerDelayTopic is the subject of the subscription latency processor

func (*Kafka) ConsumerProcessorTopic

func (s *Kafka) ConsumerProcessorTopic(confInfoList []*cc.ProcessorConfInfo)

ConsumerProcessorTopic is the topic of the subscription processor

func (*Kafka) Stop

func (s *Kafka) Stop() error

Stop stops the Kafka connector gracefully.

func (*Kafka) WatchDelayTopic

func (s *Kafka) WatchDelayTopic(c *cc.ProducerConfInfo, eventType int32) error

WatchDelayTopic watch the topic of the latency processor

func (*Kafka) WatchProcessorTopic

func (s *Kafka) WatchProcessorTopic(c *cc.ProcessorConfInfo, eventType int32) error

WatchProcessorTopic watch the topic of the processor

type Msg

type Msg struct {
	Message *sarama.ConsumerMessage
}

Msg is the message of the consumer group

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL