Documentation ¶
Index ¶
- type ConfOptionFunc
- type Config
- type Consumer
- func (c *Consumer) Cleanup(session sarama.ConsumerGroupSession) error
- func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (c *Consumer) Run(ctx context.Context, conf *Config, option ConfOptionFunc)
- func (c *Consumer) Setup(session sarama.ConsumerGroupSession) error
- type Kafka
- type Processor
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConfOptionFunc ¶
type Consumer ¶
type Consumer struct { Processor Processor Topics string // topic1,topic2,topic3 Group string Ready chan bool }
func (*Consumer) Cleanup ¶
func (c *Consumer) Cleanup(session sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*Consumer) ConsumeClaim ¶
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
type Kafka ¶
type Kafka struct { Consumers []*Consumer // contains filtered or unexported fields }
func New ¶
func New(conf *Config, confOption ConfOptionFunc) *Kafka
func (*Kafka) AddConsumer ¶
AddConsumer add a specific consumer to this receiver to handle the topics using the given group To handle multiple topics by this processor, use `,` to separate the topics, e.g. `"topic1,topic2"`
func (*Kafka) RunConsumers ¶ added in v0.0.12
RunConsumers runs all this kafka receiver's consumers using sarama consumer group. Sarama consumer group runs in multiple goroutines based on the number of its topic's partition num. If you add 2 consumers, and each consumer's topic has 3 partitions, this will run 2*3 consumer goroutines.
type Processor ¶
type Processor interface { // Handle processes a message get from a broker. // You must finish processing and mark offsets within // sarama.Config.Consumer.Group.Session.Timeout before the topic/partition is eventually // re-assigned to another group member. Handle(session sarama.ConsumerGroupSession, saramaMsg *sarama.ConsumerMessage) error }
Processor is the interface you need to implement to write your logic the handle the messages.