kafka

package module
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 19, 2023 License: MIT Imports: 10 Imported by: 0

README

A concurrent Kafka consumer and producer library.

Go versions

kafka-go requires Go version 1.15 or later.

Producer

p := kafka.NewProducer(kafka.ProducerConfig{
    Version: "",
    Brokers: []string{broker},
})

_, _, err := p.Publish(context.Background(), topic, "key", []byte(string("val")))

Consumer

// config
singleConf := kafka.ConsumerConfig{
    Version:        "",
    Brokers:        []string{"127.0.0.1:9092"},
    Group:          "test-group",
    Topic:          "test",
    CacheCapacity:  100,
    ConnectTimeout: time.Millisecond * time.Duration(5000),
}
batchConf := kafka.BatchConsumerConf{
    CacheCapacity: 100,
    Consumers:     4,
    Processors:    4,
}

consumer := kafka.NewConsumer(singleConf, nil)
bc := kafka.NewBatchConsumer(batchConf, consumer, kafka.WithHandle(func(ctx context.Context, key string, data []byte) error {
    log.Info("receive msg:", "value", data)
    time.Sleep(time.Millisecond * 500)
    return nil
}))

bc.Start()

trace interceptor

import (
    "github.com/oofpgDLD/kafka-go"
    "github.com/oofpgDLD/kafka-go/trace"
)

// new consumer with trace interceptor
func newConsumer(singleConf kafka.ConsumerConfig, batchConf kafka.BatchConsumerConf) {
    consumer := kafka.NewConsumer(singleConf, nil)
    bc := kafka.NewBatchConsumer(batchConf, consumer, kafka.WithHandle(func(ctx context.Context, key string, data []byte) error {
        log.Info("receive msg:", "value", data)
        time.Sleep(time.Millisecond * 500)
        return nil
    }), kafka.WithBatchConsumerInterceptors(trace.ConsumeInterceptor))
}

// new producer with trace interceptor
func newProducer() {
    p := kafka.NewProducer(kafka.ProducerConfig{
        Version: "",
        Brokers: []string{broker},
    }, kafka.WithProducerInterceptors(trace.ProducerInterceptor))
}

Test

[kafka producer and consumer test | kafka 消费者和生产者测试]

License

kafka-go is under the MIT license. See the LICENSE file for details.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BatchConsumer

type BatchConsumer struct {
	// contains filtered or unexported fields
}

func NewBatchConsumer

func NewBatchConsumer(cfg BatchConsumerConf, consumer IConsumer, opt ...BatchConsumerOption) *BatchConsumer

func (*BatchConsumer) GracefulStop

func (bc *BatchConsumer) GracefulStop(ctx context.Context)

func (*BatchConsumer) Start

func (bc *BatchConsumer) Start()

func (*BatchConsumer) Stop

func (bc *BatchConsumer) Stop()

type BatchConsumerConf

type BatchConsumerConf struct {
	CacheCapacity int `json:",optional"`
	Consumers     int `json:",optional"`
	Processors    int `json:",optional"`
}

type BatchConsumerOption

type BatchConsumerOption interface {
	// contains filtered or unexported methods
}

A BatchConsumerOption sets options such as interceptor etc.

func WithBatchConsumerInterceptors added in v1.1.0

func WithBatchConsumerInterceptors(interceptors ...ConsumeInterceptor) BatchConsumerOption

WithBatchConsumerInterceptors returns a BatchConsumerOption that sets the interceptors for the batch consumer.

func WithHandle

func WithHandle(handle ConsumeHandle) BatchConsumerOption

func WithLogger

func WithLogger(logger log15.Logger) BatchConsumerOption

type ConsumeHandle

type ConsumeHandle func(ctx context.Context, key string, data []byte) error

type ConsumeHandler

type ConsumeHandler func(ctx context.Context, msg *sarama.ConsumerMessage) error

type ConsumeInterceptor added in v1.1.0

type ConsumeInterceptor func(ctx context.Context, msg *sarama.ConsumerMessage, handler ConsumeHandler) error

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(cfg ConsumerConfig, logger log15.Logger) *Consumer

func (*Consumer) Cleanup

func (c *Consumer) Cleanup(session sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited but before the offsets are committed for the very last time.

func (*Consumer) Close

func (c *Consumer) Close() error

func (*Consumer) ConsumeClaim

func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). Once the Messages() channel is closed, the Handler must finish its processing loop and exit.

func (*Consumer) FetchMessage

func (c *Consumer) FetchMessage(ctx context.Context) (message *sarama.ConsumerMessage, err error)

FetchMessage reads and returns a message.

func (*Consumer) Setup

Setup is run at the beginning of a new session, before ConsumeClaim.

type ConsumerConfig

type ConsumerConfig struct {
	Version        string        `json:",optional"`
	Brokers        []string      `json:",optional"`
	Group          string        `json:",optional"`
	Topic          string        `json:",optional"`
	CacheCapacity  int           `json:",optional"`
	ConnectTimeout time.Duration `json:",optional"`
	// contains filtered or unexported fields
}

type IConsumer

type IConsumer interface {
	FetchMessage(ctx context.Context) (message *sarama.ConsumerMessage, err error)
	Close() error
}

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(cfg ProducerConfig, opt ...ProducerOption) *Producer

func (*Producer) Close

func (p *Producer) Close() error

func (*Producer) Publish

func (p *Producer) Publish(ctx context.Context, topic, k string, v []byte) (int32, int64, error)

type ProducerConfig

type ProducerConfig struct {
	Version string   `json:",optional"`
	Brokers []string `json:",optional"`
	// contains filtered or unexported fields
}

type ProducerHandler added in v1.1.0

type ProducerHandler func(ctx context.Context, msg *sarama.ProducerMessage) (int32, int64, error)

type ProducerInterceptor added in v1.1.0

type ProducerInterceptor func(ctx context.Context, msg *sarama.ProducerMessage, handler ProducerHandler) (int32, int64, error)

type ProducerOption added in v1.1.0

type ProducerOption interface {
	// contains filtered or unexported methods
}

A ProducerOption sets options such as interceptor etc.

func WithProducerInterceptors added in v1.1.0

func WithProducerInterceptors(interceptors ...ProducerInterceptor) ProducerOption

WithProducerInterceptors returns a ProducerOption that sets the interceptors for the producer.

Directories

Path Synopsis
internal
test

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL