kafka

package
v0.0.0-...-4c38643
Warning

This package is not in the latest version of its module.

Published: Oct 14, 2022 License: MIT Imports: 4 Imported by: 0

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

func Consume

func Consume(ctx context.Context, consumer sarama.Consumer, topic string, handle ConsumerHandler) error

Consume starts consuming and blocks until it exits, so call it in a goroutine. Note: `handle` is itself called in a goroutine.
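The blocking behaviour and the goroutine note can be illustrated with a stdlib-only sketch. It is not the package's implementation: `message`, `handler`, `consume`, and `runConsumeDemo` are hypothetical stand-ins for `*sarama.ConsumerMessage`, `ConsumerHandler`, and `Consume`, and it assumes one goroutine per partition. The point it shows is that a handler invoked from several goroutines has to guard any shared state.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for *sarama.ConsumerMessage.
type message struct {
	Partition int32
	Value     string
}

// handler is a reduced, hypothetical analogue of ConsumerHandler.
type handler func(partition int32, msg message)

// consume starts one goroutine per partition and blocks until every
// partition channel is closed or ctx is cancelled.
func consume(ctx context.Context, partitions map[int32]<-chan message, handle handler) error {
	var wg sync.WaitGroup
	for p, ch := range partitions {
		wg.Add(1)
		go func(p int32, ch <-chan message) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case m, ok := <-ch:
					if !ok {
						return
					}
					// handle runs in this partition's goroutine.
					handle(p, m)
				}
			}
		}(p, ch)
	}
	wg.Wait()
	return ctx.Err()
}

// runConsumeDemo feeds three messages over two partitions and counts them.
func runConsumeDemo() (int, error) {
	p0 := make(chan message, 2)
	p1 := make(chan message, 1)
	p0 <- message{Partition: 0, Value: "a"}
	p0 <- message{Partition: 0, Value: "b"}
	p1 <- message{Partition: 1, Value: "c"}
	close(p0)
	close(p1)

	var mu sync.Mutex // the handler is called concurrently, so guard the counter
	handled := 0
	err := consume(context.Background(), map[int32]<-chan message{0: p0, 1: p1},
		func(partition int32, msg message) {
			mu.Lock()
			defer mu.Unlock()
			handled++
		})
	return handled, err
}

func main() {
	handled, err := runConsumeDemo()
	fmt.Println(handled, err)
}
```

Because `consume` only returns once every partition goroutine has finished, a caller that needs to keep working would start it with `go`, as the note above suggests for `Consume`.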

func ConsumeGroup

func ConsumeGroup(ctx context.Context, cg sarama.ConsumerGroup, topics []string, handler sarama.ConsumerGroupHandler) error
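The source does not document ConsumeGroup. For background, sarama's `ConsumerGroup.Consume` returns whenever a rebalance ends the session, so callers typically re-enter it in a loop until the context is cancelled. Whether this package's ConsumeGroup does so is an assumption. The stdlib-only sketch below shows that loop with hypothetical stand-ins: `session` replaces `sarama.ConsumerGroup` (handler argument omitted), and `fakeSession` simulates sessions that end immediately.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// session is a hypothetical stand-in for sarama.ConsumerGroup,
// reduced to a Consume method without the handler argument.
type session interface {
	Consume(ctx context.Context, topics []string) error
}

// consumeLoop re-enters Consume until ctx is cancelled or Consume fails.
func consumeLoop(ctx context.Context, s session, topics []string) error {
	for {
		if err := s.Consume(ctx, topics); err != nil {
			return err
		}
		// Consume returned without error: either a rebalance ended the
		// session or the context was cancelled. Only the latter stops us.
		if ctx.Err() != nil {
			return ctx.Err()
		}
	}
}

// fakeSession ends every session at once and cancels the context
// after limit sessions, imitating a shutdown after a few rebalances.
type fakeSession struct {
	calls  int
	limit  int
	cancel context.CancelFunc
}

func (f *fakeSession) Consume(ctx context.Context, topics []string) error {
	f.calls++
	if f.calls >= f.limit {
		f.cancel()
	}
	return nil
}

// runLoopDemo reports how many sessions were started and whether the
// loop stopped because the context was cancelled.
func runLoopDemo() (int, bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	f := &fakeSession{limit: 3, cancel: cancel}
	err := consumeLoop(ctx, f, []string{"demo-topic"})
	return f.calls, errors.Is(err, context.Canceled)
}

func main() {
	calls, cancelled := runLoopDemo()
	fmt.Println(calls, cancelled)
}
```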

func NewConsumer

func NewConsumer(addrs []string, config *sarama.Config) (sarama.Consumer, error)

NewConsumer creates a sarama.Consumer. A nil *sarama.Config uses the default config; see https://github.com/Shopify/sarama/blob/v1.32.0/config.go#L483-L499. Example:

	consumer, err := kafka.NewConsumer(kafkaAddr, nil)
	if err != nil {
		panic(err)
	}
	// Partitions returns the partition IDs of the topic.
	partitions, err := consumer.Partitions(topic)
	if err != nil {
		panic(err)
	}
	t.Log("partitions:", partitions)
	go func() {
		// Consume blocks until it exits; msg is the ConsumerHandler.
		if err := kafka.Consume(ctx, consumer, topic, msg); err != nil {
			t.Log(err)
		} else {
			t.Log("consume exit")
		}
	}()

Note:

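  • This uses assign mode. Unless it is really necessary, prefer consuming through a ConsumerGroup.
  • Recommended scenario: you need a broadcast-like effect within a single application (service). For example, several gateways must all consume the msg-dp messages in order to forward messages across gateways. Creating consumer groups dynamically to achieve broadcast is comparatively costly; for details, see the Alibaba Cloud limits: https://help.aliyun.com/document_detail/120676.html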
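The difference between the two modes in the note can be sketched without Kafka. This is a stdlib-only illustration, not the package's code: `assignMode` and `groupMode` are hypothetical helpers, the gateway names are made up, and the round-robin split merely stands in for whichever partition assignor a real consumer group is configured with.

```go
package main

import "fmt"

// assignMode models assign-style consumption: every instance reads
// every partition, so each one sees all messages (broadcast-like).
func assignMode(instances []string, partitions []int32) map[string][]int32 {
	out := make(map[string][]int32, len(instances))
	for _, inst := range instances {
		out[inst] = append([]int32(nil), partitions...)
	}
	return out
}

// groupMode models a consumer group: partitions are divided among the
// members, so each message reaches exactly one member. Round-robin is
// an illustrative choice only.
func groupMode(members []string, partitions []int32) map[string][]int32 {
	out := make(map[string][]int32, len(members))
	if len(members) == 0 {
		return out
	}
	for i, p := range partitions {
		m := members[i%len(members)]
		out[m] = append(out[m], p)
	}
	return out
}

func main() {
	gateways := []string{"gateway-1", "gateway-2"}
	partitions := []int32{0, 1, 2, 3}
	fmt.Println("assign:", assignMode(gateways, partitions))
	fmt.Println("group: ", groupMode(gateways, partitions))
}
```

With two gateways and four partitions, assign mode gives both gateways all four partitions, while group mode gives each gateway two. That is why a single shared group cannot deliver the same message to every gateway.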

func NewConsumerClient

func NewConsumerClient(addrs []string, config *sarama.Config) (sarama.Client, sarama.Consumer, error)

NewConsumerClient creates a consumer using the default config and also returns the underlying client.

func NewConsumerGroup

func NewConsumerGroup(addrs []string, groupID string, config *sarama.Config) (sarama.ConsumerGroup, error)

NewConsumerGroup creates a sarama.ConsumerGroup. A nil *sarama.Config uses the default config; see https://github.com/Shopify/sarama/blob/v1.32.0/config.go#L483-L499

func NewSyncProducer

func NewSyncProducer(addrs []string, config *sarama.Config) (sarama.SyncProducer, error)

NewSyncProducer creates a sarama.SyncProducer. A nil *sarama.Config uses the default config.

func NewSyncProducerClient

func NewSyncProducerClient(addrs []string, config *sarama.Config) (sarama.Client, sarama.SyncProducer, error)

NewSyncProducerClient creates a sarama.SyncProducer and also returns the underlying client. A nil *sarama.Config uses the default config.

Types

type ConsumerHandler

type ConsumerHandler func(partition int32, partitionConsumer sarama.PartitionConsumer, message *sarama.ConsumerMessage)
