consumer

package
v0.2.16 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 16, 2021 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DefaultReceiveFn = func(consumeWorkerIdx int, receiveWorkerIdx int, topic string, body []byte, msg *kafka.Message) (err error) {
	seelog.Infof("[KAFKA CONSUMER] receive message: %s#%d|%d, tm:%s, key:%s, body:%s", msg.TopicPartition.String(), consumeWorkerIdx, receiveWorkerIdx, msg.Timestamp.Format("2006-01-02 15:04:05"), string(msg.Key), string(body))
	return
}
View Source
var DefaultReceiveSelector = func(topic string, key string, receiveWorkerNum int, msg *kafka.Message) (r int) {
	rand.Seed(time.Now().UnixNano())
	return rand.Intn(receiveWorkerNum)
}

Functions

func LoadCfgs added in v0.1.14

func LoadCfgs() (r map[string]*Cfg, err error)

func StartDefaultConsumer added in v0.2.8

func StartDefaultConsumer(
	fn func(consumeWorkerIdx int, receiveWorkerIdx int, topic string, body []byte, msg *kafka.Message) error,
	selector func(topic string, key string, receiveWorkerNum int, msg *kafka.Message) (r int)) (err error)

func StartReceive added in v0.2.8

func StartReceive(
	name string,
	fn func(consumeWorkerIdx int, receiveWorkerIdx int, topic string, body []byte, msg *kafka.Message) error,
	selector func(topic string, key string, receiveWorkerNum int, msg *kafka.Message) (r int)) (err error)

func StopDefaultConsumer added in v0.2.8

func StopDefaultConsumer() (err error)

func StopReceive added in v0.2.8

func StopReceive(name string) (err error)

Types

type Cfg

type Cfg struct {
	Addrs            []string `toml:"addrs"`
	Topics           []string `toml:"topics"`
	GroupId          string   `toml:"group_id"`
	ConsumeWorkerNum int      `toml:"consume_worker_num"` // 消费者并发数,参与分区分配,默认1
	ReceiveWorkerNum int      `toml:"receive_worker_num"` // 业务实际并发数,默认10
}

func LoadCfg added in v0.1.14

func LoadCfg(name string) (r *Cfg, err error)

type KafkaConsumer

type KafkaConsumer struct {
	// contains filtered or unexported fields
}

func Consumer

func Consumer(name string) (r *KafkaConsumer)

func DefaultConsumer added in v0.2.8

func DefaultConsumer() (r *KafkaConsumer)

func NewKafkaConsumer

func NewKafkaConsumer(cfg *Cfg) (r *KafkaConsumer, err error)

func SafeConsumer

func SafeConsumer(name string) (r *KafkaConsumer, err error)

func (*KafkaConsumer) Close

func (this *KafkaConsumer) Close()

func (*KafkaConsumer) Receive

func (this *KafkaConsumer) Receive(rcvr func(consumeWorkerIdx int, receiveWorkerIdx int, topic string, body []byte, msg *kafka.Message) error) (err error)

func (*KafkaConsumer) SetReceiveSelector added in v0.1.8

func (this *KafkaConsumer) SetReceiveSelector(fn func(topic string, key string, receiveWorkerNum int, msg *kafka.Message) int)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL