Documentation ¶
Index ¶
- Constants
- func TopicGroupOffsetKey(topic string, group string, partition int32) string
- type ConsumerGroup
- func (s *ConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error
- func (s *ConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (s *ConsumerGroup) ConsumeMessage(msg []byte)
- func (s *ConsumerGroup) Setup(session sarama.ConsumerGroupSession) error
- func (s *ConsumerGroup) Stop() error
- type Kafka
- func (s *Kafka) ConsumerDelayTopic(confInfoList []*cc.ProducerConfInfo)
- func (s *Kafka) ConsumerProcessorTopic(confInfoList []*cc.ProcessorConfInfo)
- func (s *Kafka) Stop() error
- func (s *Kafka) WatchDelayTopic(c *cc.ProducerConfInfo, eventType int32) error
- func (s *Kafka) WatchProcessorTopic(c *cc.ProcessorConfInfo, eventType int32) error
- type Msg
Constants ¶
View Source
const ( // KeyTopicGroupOffset is the key of the consumer group KeyTopicGroupOffset = "evhub:processor:%v:%v:%v" // CommitTimeout commit timeout CommitTimeout = 2 )
Variables ¶
This section is empty.
Functions ¶
Types ¶
type ConsumerGroup ¶
type ConsumerGroup struct { // a consumer group for Kafka topic GroupID string // If is OK, representing auto commit. OffsetsAutoCommitEnable bool // flow control batch number for Kafka consumer group FlowControlBatchNum int // initial offset timeout for Kafka consumer group InitOffsetGetOffsetTimeout int64 // initial offset period for Kafka consumer group InitOffsetGetOffsetPeroid int64 // event handler Event *event.Event // redis pool RedisPool *redis.Pool // a consumer group for Kafka topic ConsumerGroup sarama.ConsumerGroup // the stop controller of flow control StopWg sync.WaitGroup // contains filtered or unexported fields }
ConsumerGroup is a consumer of Kafka
func (*ConsumerGroup) Cleanup ¶
func (s *ConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is the end of the processor
func (*ConsumerGroup) ConsumeClaim ¶
func (s *ConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim is processing the message
func (*ConsumerGroup) ConsumeMessage ¶
func (s *ConsumerGroup) ConsumeMessage(msg []byte)
ConsumeMessage consumer messages
func (*ConsumerGroup) Setup ¶
func (s *ConsumerGroup) Setup(session sarama.ConsumerGroupSession) error
Setup is the beginning of the processor
func (*ConsumerGroup) Stop ¶
func (s *ConsumerGroup) Stop() error
Stop stops the consumer group gracefully.
type Kafka ¶
type Kafka struct {
// contains filtered or unexported fields
}
Kafka is a type of connector
func (*Kafka) ConsumerDelayTopic ¶
func (s *Kafka) ConsumerDelayTopic(confInfoList []*cc.ProducerConfInfo)
ConsumerDelayTopic is the subject of the subscription latency processor
func (*Kafka) ConsumerProcessorTopic ¶
func (s *Kafka) ConsumerProcessorTopic(confInfoList []*cc.ProcessorConfInfo)
ConsumerProcessorTopic is the topic of the subscription processor
func (*Kafka) WatchDelayTopic ¶
func (s *Kafka) WatchDelayTopic(c *cc.ProducerConfInfo, eventType int32) error
WatchDelayTopic watch the topic of the latency processor
func (*Kafka) WatchProcessorTopic ¶
func (s *Kafka) WatchProcessorTopic(c *cc.ProcessorConfInfo, eventType int32) error
WatchProcessorTopic watch the topic of the processor
type Msg ¶
type Msg struct {
Message *sarama.ConsumerMessage
}
Msg is the message of the consumer group
Click to show internal directories.
Click to hide internal directories.