kafka

package
v0.0.12
Warning

This package is not in the latest version of its module.

Published: May 4, 2023 License: MIT Imports: 8 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConfOptionFunc

type ConfOptionFunc func(c *sarama.Config)
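A ConfOptionFunc receives the sarama configuration so the caller can adjust it. The following is a minimal sketch of that pattern, not the package's implementation: `saramaConfig` is a hypothetical stand-in for `sarama.Config` (so the example compiles without the sarama module), `buildConfig` is a hypothetical helper, and applying the option after the defaults are set is an assumption.

```go
package main

import "fmt"

// saramaConfig is a hypothetical stand-in for sarama.Config so that this
// sketch compiles without the sarama module. The fields are illustrative.
type saramaConfig struct {
	ClientID     string
	ReturnErrors bool
}

// ConfOptionFunc mirrors the shape of the package's type, but targets the
// stand-in struct above instead of *sarama.Config.
type ConfOptionFunc func(c *saramaConfig)

// buildConfig is a hypothetical helper: it fills in defaults first and then
// lets the caller's option override them. A nil option is skipped.
func buildConfig(clientID string, option ConfOptionFunc) *saramaConfig {
	c := &saramaConfig{ClientID: clientID}
	if option != nil {
		option(c)
	}
	return c
}

func main() {
	c := buildConfig("billing-service", func(c *saramaConfig) {
		c.ReturnErrors = true // caller-specific tweak
	})
	fmt.Println(c.ClientID, c.ReturnErrors)
}
```

With the real package, the closure would take a `*sarama.Config` and be passed to `New` or `Run` as the option argument.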

type Config

type Config struct {
	Brokers  []string
	Version  string
	ClientID string // a user-provided string sent with every request to the brokers for logging, debugging, and auditing purposes.
	Oldest   bool   // if true, fetch oldest available offset
	Verbose  bool   // if true, logs are printed to stdout
}

type Consumer

type Consumer struct {
	Processor Processor
	Topics    string // topic1,topic2,topic3
	Group     string
	Ready     chan bool
}

func NewConsumer

func NewConsumer(processor Processor, topics string, group string) *Consumer

func (*Consumer) Cleanup

func (c *Consumer) Cleanup(session sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.

func (*Consumer) ConsumeClaim

func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
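The consumer loop mentioned above can be sketched roughly as follows. This is an illustration under stated assumptions rather than this package's code: `message` is a hypothetical stand-in for `*sarama.ConsumerMessage`, the `msgs` channel plays the role of the channel returned by the claim's `Messages()`, and `consumeLoop` and `demo` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// message is a hypothetical stand-in for *sarama.ConsumerMessage.
type message struct {
	Topic string
	Value []byte
}

// consumeLoop sketches the loop a ConsumeClaim implementation runs: read
// from the claim's message channel until it is closed or the session
// context is cancelled. It returns the number of messages handled.
func consumeLoop(ctx context.Context, msgs <-chan *message, handle func(*message) error) (int, error) {
	handled := 0
	for {
		select {
		case msg, ok := <-msgs:
			if !ok {
				return handled, nil // channel closed: the claim is finished
			}
			if err := handle(msg); err != nil {
				return handled, err
			}
			handled++
		case <-ctx.Done():
			return handled, nil // session ended, for example on a rebalance
		}
	}
}

// demo feeds two messages through the loop and reports the result.
func demo() (int, error) {
	msgs := make(chan *message, 2)
	msgs <- &message{Topic: "topic1", Value: []byte("a")}
	msgs <- &message{Topic: "topic1", Value: []byte("b")}
	close(msgs)

	return consumeLoop(context.Background(), msgs, func(m *message) error {
		fmt.Printf("%s: %s\n", m.Topic, m.Value)
		return nil
	})
}

func main() {
	n, err := demo()
	fmt.Println(n, err)
}
```

Watching the context as well as the channel lets the loop exit promptly when the session ends, which matters because Cleanup only runs after every ConsumeClaim goroutine has returned.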

func (*Consumer) Run

func (c *Consumer) Run(ctx context.Context, conf *Config, option ConfOptionFunc)

func (*Consumer) Setup

func (c *Consumer) Setup(session sarama.ConsumerGroupSession) error

Setup is run at the beginning of a new session, before ConsumeClaim.

type Kafka

type Kafka struct {
	Consumers []*Consumer
	// contains filtered or unexported fields
}

func New

func New(conf *Config, confOption ConfOptionFunc) *Kafka

func (*Kafka) AddConsumer

func (k *Kafka) AddConsumer(processor Processor, topics string, group string) *Consumer

AddConsumer adds a consumer to this receiver that handles the given topics as a member of the given group. To have one processor handle multiple topics, separate the topics with `,`, e.g. `"topic1,topic2"`.
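Because sarama's consumer group API takes topics as a slice of strings, a comma-separated value such as `"topic1,topic2"` has to be split at some point. The sketch below shows one way to do that; `splitTopics` is a hypothetical helper, and whether the package itself trims whitespace or drops empty entries is not documented, so it is safest to pass topics without spaces.

```go
package main

import (
	"fmt"
	"strings"
)

// splitTopics turns a comma-separated topic list into a slice, trimming
// surrounding whitespace and dropping empty entries. This is an
// illustration only; the package's own parsing may differ.
func splitTopics(topics string) []string {
	var out []string
	for _, t := range strings.Split(topics, ",") {
		if t = strings.TrimSpace(t); t != "" {
			out = append(out, t)
		}
	}
	return out
}

func main() {
	fmt.Println(splitTopics("topic1,topic2")) // prints [topic1 topic2]
}
```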

func (*Kafka) RunConsumers added in v0.0.12

func (k *Kafka) RunConsumers(ctx context.Context)

RunConsumers runs all of this Kafka receiver's consumers using sarama consumer groups. A sarama consumer group runs one goroutine per partition of the topics it consumes. If you add 2 consumers and each consumer's topic has 3 partitions, this runs 2*3 = 6 consumer goroutines.
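The arithmetic can be written out as a small sketch. `claimGoroutines` is a hypothetical helper, and the figure it returns assumes this process is the only member of each group and therefore claims every partition; with more group members, each process would own fewer partitions.

```go
package main

import "fmt"

// claimGoroutines sums the partition counts of every consumer's topics.
// Under the single-member assumption above, that sum is the number of
// consumer goroutines running in this process.
func claimGoroutines(partitionsPerConsumer []int) int {
	total := 0
	for _, p := range partitionsPerConsumer {
		total += p
	}
	return total
}

func main() {
	// 2 consumers, each on a topic with 3 partitions.
	fmt.Println(claimGoroutines([]int{3, 3})) // prints 6
}
```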

type Processor

type Processor interface {
	// Handle processes a message received from a broker.
	// You must finish processing and mark offsets within
	// sarama.Config.Consumer.Group.Session.Timeout before the topic/partition is eventually
	// re-assigned to another group member.
	Handle(session sarama.ConsumerGroupSession, saramaMsg *sarama.ConsumerMessage) error
}

Processor is the interface you implement to provide the logic that handles messages.
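A Processor implementation might look roughly like the sketch below. To keep it free of external modules, `message` and `session` are hypothetical stand-ins for `*sarama.ConsumerMessage` and `sarama.ConsumerGroupSession`, and `recordingSession`, `printProcessor`, and `demo` are hypothetical names. Marking the offset inside Handle follows the comment on the interface; whether the package also marks messages on its own is not documented here.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for *sarama.ConsumerMessage.
type message struct {
	Topic  string
	Offset int64
	Value  []byte
}

// session is a hypothetical stand-in exposing only the MarkMessage method
// that this sketch needs from sarama.ConsumerGroupSession.
type session interface {
	MarkMessage(msg *message, metadata string)
}

// recordingSession is a test double that remembers marked offsets.
type recordingSession struct{ marked []int64 }

func (s *recordingSession) MarkMessage(msg *message, _ string) {
	s.marked = append(s.marked, msg.Offset)
}

// printProcessor prints each message and marks it once processing succeeds.
type printProcessor struct{}

func (printProcessor) Handle(s session, msg *message) error {
	if len(msg.Value) == 0 {
		return errors.New("empty message") // left unmarked on failure
	}
	fmt.Printf("%s@%d: %s\n", msg.Topic, msg.Offset, msg.Value)
	s.MarkMessage(msg, "") // mark only after the work is done
	return nil
}

// demo handles one valid and one empty message and reports what was marked.
func demo() ([]int64, error) {
	s := &recordingSession{}
	p := printProcessor{}
	if err := p.Handle(s, &message{Topic: "topic1", Offset: 7, Value: []byte("hello")}); err != nil {
		return s.marked, err
	}
	err := p.Handle(s, &message{Topic: "topic1", Offset: 8})
	return s.marked, err
}

func main() {
	marked, err := demo()
	fmt.Println(marked, err)
}
```

Keeping Handle short matters because processing and marking have to finish within the session timeout named in the interface comment.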
