kafka

package
v0.0.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 6, 2023 | License: MIT | Imports: 10 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BatchConsumer

type BatchConsumer struct {
	// contains filtered or unexported fields
}

func NewBatchConsumer

func NewBatchConsumer(cfg BatchConsumerConf, handler ConsumeHandler, consumer IConsumer, opts ...BatchConsumerOption) *BatchConsumer

func (*BatchConsumer) GracefulStop

func (bc *BatchConsumer) GracefulStop(ctx context.Context)

func (*BatchConsumer) Start

func (bc *BatchConsumer) Start()

func (*BatchConsumer) Stop

func (bc *BatchConsumer) Stop()

type BatchConsumerConf

type BatchConsumerConf struct {
	CacheCapacity int `json:",optional"`
	Consumers     int `json:",optional"`
	Processors    int `json:",optional"`
}

type BatchConsumerOption

type BatchConsumerOption func(*batchConsumerOptions)

func WithLogger

func WithLogger(logger log15.Logger) BatchConsumerOption

type ConsumeHandle

type ConsumeHandle func(key string, data []byte) error

type ConsumeHandler

type ConsumeHandler interface {
	Consume(key string, data []byte) error
}

func WithHandle

func WithHandle(handle ConsumeHandle) ConsumeHandler

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(cfg ConsumerConfig, logger log15.Logger) *Consumer

func (*Consumer) Cleanup

func (c *Consumer) Cleanup(session sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited but before the offsets are committed for the very last time.

func (*Consumer) Close

func (c *Consumer) Close() error

func (*Consumer) ConsumeClaim

func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). Once the Messages() channel is closed, the Handler must finish its processing loop and exit.

func (*Consumer) FetchMessage

func (c *Consumer) FetchMessage(ctx context.Context) (message *sarama.ConsumerMessage, err error)

FetchMessage reads and returns a message.

func (*Consumer) Setup

func (c *Consumer) Setup(sarama.ConsumerGroupSession) error

Setup is run at the beginning of a new session, before ConsumeClaim.

type ConsumerConfig

type ConsumerConfig struct {
	Version        string        `json:",optional"`
	Brokers        []string      `json:",optional"`
	Group          string        `json:",optional"`
	Topic          string        `json:",optional"`
	CacheCapacity  int           `json:",optional"`
	ConnectTimeout time.Duration `json:",optional"`
	// contains filtered or unexported fields
}

type IConsumer

type IConsumer interface {
	FetchMessage(ctx context.Context) (message *sarama.ConsumerMessage, err error)
	Close() error
}

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(cfg ProducerConfig) *Producer

func (*Producer) Close

func (p *Producer) Close() error

func (*Producer) Publish

func (p *Producer) Publish(topic, k string, v []byte) (int32, int64, error)

type ProducerConfig

type ProducerConfig struct {
	Version string   `json:",optional"`
	Brokers []string `json:",optional"`
	// contains filtered or unexported fields
}

Directories

Path Synopsis
test

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL