kafkaclient

package module
v2.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 26, 2021 License: MIT Imports: 8 Imported by: 0

README

kafka-client

kafka client

Install:

go get github.com/go-light/kafkaclient/v2

Consumer

config := &ConsumerConfig{
	Brokers: "127.0.0.1:9092", // "127.0.0.1:9092,127.0.0.2:9092"
	Group: "test_group",
	Topics: "test_kafka_producer", // "test_kafka_producer,test_kafka_producer1"
	OffsetReset: "latest", // "latest" , "earliest"
	Assignor: "roundrobin", // "sticky", "roundrobin" , "range"
	AutoCommit: false, 
	AutoCommitInterval: "5000ms",
	FailTries: 4, // 消息消费失败重试次数
}

consumer, err := NewConsumer(config)
if err != nil {
	t.Error(err)
	return
}

consumer.Receive(func(ctx context.Context, msg *sarama.ConsumerMessage) (err error) {
	fmt.Println(string(msg.Value))
	fmt.Println("fail_tries")
	err = fmt.Errorf("fail_tries")
	return err
})

Producer

config := &ProducerConfig{
	Brokers: "127.0.0.1:9092",
	Retries: 3,
}

syncProducer, err := NewSyncProducer(config)
if err != nil {
	t.Error(err)
	return
}

topic := "test_kafka_producer"
msg, err := json.Marshal(map[string]string{"foo":"bar"})
if err !=nil {
	t.Error(err)
	return
}


_, _, err = syncProducer.Send(context.Background(), topic, msg)
if err != nil {
	t.Error(err)
	return
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(conf *ConsumerConfig) (*Consumer, error)

func (*Consumer) Close

func (c *Consumer) Close() (err error)

func (*Consumer) Receive

func (c *Consumer) Receive(callback MsgProcess)

type ConsumerConfig

type ConsumerConfig struct {
	Brokers            string
	Group              string
	Topics             string
	OffsetReset        string // latest, earliest, none
	Assignor           string // sticky, roundrobin, range
	AutoCommit         bool
	AutoCommitInterval string
	FailTries          int
}

type DelayFunc

type DelayFunc func(tries int) time.Duration

A DelayFunc is used to decide the amount of time to wait between retries.

type MsgProcess

type MsgProcess func(ctx context.Context, msg *sarama.ConsumerMessage) (err error)

type ProducerConfig

type ProducerConfig struct {
	Brokers      string `toml:"brokers"`
	Retries      int    `toml:"retries"`
	RetryBackoff string `toml:"retry_backoff"`
}

type SyncProducer

type SyncProducer struct {
	// contains filtered or unexported fields
}

func NewSyncProducer

func NewSyncProducer(config *ProducerConfig) (*SyncProducer, error)

func (*SyncProducer) Close

func (sp *SyncProducer) Close()

func (*SyncProducer) Send

func (sp *SyncProducer) Send(ctx context.Context, topic string, msg []byte) (partition int32, offset int64, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL