Documentation ¶
Index ¶
- func Consume(ctx context.Context, consumer sarama.Consumer, topic string, ...) error
- func ConsumeGroup(ctx context.Context, cg sarama.ConsumerGroup, topics []string, ...) error
- func NewConsumer(addrs []string, config *sarama.Config) (sarama.Consumer, error)
- func NewConsumerClient(addrs []string, config *sarama.Config) (sarama.Client, sarama.Consumer, error)
- func NewConsumerGroup(addrs []string, groupID string, config *sarama.Config) (sarama.ConsumerGroup, error)
- func NewSyncProducer(addrs []string, config *sarama.Config) (sarama.SyncProducer, error)
- func NewSyncProducerClient(addrs []string, config *sarama.Config) (sarama.Client, sarama.SyncProducer, error)
- type ConsumerHandler
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Consume ¶
func Consume(ctx context.Context, consumer sarama.Consumer, topic string, handle ConsumerHandler) error
Consume start consume, will block until exit, call in `goroutine` note: `handle` called in `goroutine`
func ConsumeGroup ¶
func ConsumeGroup(ctx context.Context, cg sarama.ConsumerGroup, topics []string, handler sarama.ConsumerGroupHandler) error
func NewConsumer ¶
NewConsumer A nil sarama.config use the Default config, refer to: https://github.com/Shopify/sarama/blob/v1.32.0/config.go#L483-L499 Example:
consumer, err := kafka.NewConsumer(kafkaAddr, nil) if err != nil { panic(err) } partitions, err := consumer.Partitions(topic) if err != nil { panic(err) } go func() { // will block if err = kafka.Consume(ctx, consumer, topic, msg); err != nil { t.Log(err) } else { t.Log("consume exit") } }()
Note:
- 使用 assign 模式,如非必要,请尽量使用ConsumerGroup方式消费。
- 如下场景建议使用:要实现类广播效果,但是是同一个应用(服务)。比如多个gateway都要能同时消费到msg-dp的消息以解决跨网关消息转发的问题, 如果动态创建consumeGroup实现广播目的,成本比较大,具体参考阿里云的限制:https://help.aliyun.com/document_detail/120676.html
func NewConsumerClient ¶
func NewConsumerClient(addrs []string, config *sarama.Config) (sarama.Client, sarama.Consumer, error)
NewConsumerClient create consumer use default config and return client
func NewConsumerGroup ¶
func NewConsumerGroup(addrs []string, groupID string, config *sarama.Config) (sarama.ConsumerGroup, error)
NewConsumerGroup A nil sarama.config use the Default config, refer to: https://github.com/Shopify/sarama/blob/v1.32.0/config.go#L483-L499
func NewSyncProducer ¶
NewSyncProducer A nil sarama.config use the default config
func NewSyncProducerClient ¶
func NewSyncProducerClient(addrs []string, config *sarama.Config) (sarama.Client, sarama.SyncProducer, error)
NewSyncProducerClient A nil sarama.config use the default config
Types ¶
type ConsumerHandler ¶
type ConsumerHandler func(partition int32, partitionConsumer sarama.PartitionConsumer, message *sarama.ConsumerMessage)
Click to show internal directories.
Click to hide internal directories.