producer

package
v0.2.16 (Latest)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 16, 2021 | License: Apache-2.0 | Imports: 12 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// DefaultSucFeedbackFn is the stock success callback. It writes an
	// info-level log line containing the message's TopicPartition string
	// and the published body.
	DefaultSucFeedbackFn = func(msg *kafka.Message, payload string) {
		where := msg.TopicPartition.String()
		seelog.Infof("[KAFKA PRODUCER] publish success: %s, message: %s", where, payload)
	}
	// DefaultFailFeedbackFn is the stock failure callback. It writes an
	// error-level log line containing the message's TopicPartition string
	// and the body that was being published.
	DefaultFailFeedbackFn = func(msg *kafka.Message, payload string) {
		where := msg.TopicPartition.String()
		seelog.Errorf("[KAFKA PRODUCER] publish fail: %s, message: %s", where, payload)
	}
)

Functions

func CloseDefaultProducer

func CloseDefaultProducer()

func CloseProducer

func CloseProducer(name string) (err error)

func LoadCfgs added in v0.2.15

func LoadCfgs() (r map[string]*Cfg, err error)

func Publish

func Publish(name string, body string, key string, partition int32) (err error)

func SetFailFeedback

func SetFailFeedback(name string, fn func(*kafka.Message, string)) (err error)

func SetSucFeedback

func SetSucFeedback(name string, fn func(*kafka.Message, string)) (err error)

Types

type Cfg

// Cfg is the producer configuration accepted by NewKafkaProducer. Each field
// is mapped from the TOML key named in its struct tag.
type Cfg struct {
	Addrs             []string `toml:"addrs"`               // presumably the Kafka broker addresses — TODO confirm
	Acks              int      `toml:"acks"`                // how far the server must have progressed before it responds
	Topic             string   `toml:"topic"`               // default topic; used when no topic is specified
	Partitioner       string   `toml:"partitioner"`         // selects which partitioner to use
	ReturnSuccesses   bool     `toml:"return_successes"`    // whether to wait for success responses; only effective when RequireAcks is not NoResponse
	ReturnErrors      bool     `toml:"return_errors"`       // whether to wait for failure responses; only effective when RequireAcks is not NoResponse
	ReturnFeedbackNum int      `toml:"return_feedback_num"` // concurrency used when waiting for responses
}

func LoadCfg added in v0.2.15

func LoadCfg(name string) (r *Cfg, err error)

type KafkaProducer

// KafkaProducer is the producer type returned by NewKafkaProducer. Its
// methods are Publish, Close, SetSucFeedback and SetFailFeedback.
type KafkaProducer struct {
	// contains filtered or unexported fields
}

func DefaultProducer added in v0.2.8

func DefaultProducer() (r *KafkaProducer)

func NewKafkaProducer

func NewKafkaProducer(cfg *Cfg) (r *KafkaProducer, err error)

func Producer

func Producer(name string) (r *KafkaProducer)

func SafeProducer

func SafeProducer(name string) (r *KafkaProducer, err error)

func (*KafkaProducer) Close

func (this *KafkaProducer) Close()

func (*KafkaProducer) Publish

func (this *KafkaProducer) Publish(body string, key string, partition int32)

func (*KafkaProducer) SetFailFeedback

func (this *KafkaProducer) SetFailFeedback(fn func(*kafka.Message, string))

func (*KafkaProducer) SetSucFeedback

func (this *KafkaProducer) SetSucFeedback(fn func(*kafka.Message, string))

type Partitioner added in v0.2.0

// Partitioner decides which partition a message is sent to.
type Partitioner interface {
	// Partition takes a message and partition count and chooses a partition.
	// It returns the chosen partition as r, or a non-nil err.
	Partition(message *kafka.Message, numPartitions int32) (r int32, err error)
}

func NewHashPartitioner added in v0.2.0

func NewHashPartitioner() Partitioner

func NewManualPartitioner added in v0.2.0

func NewManualPartitioner() Partitioner

func NewRandomPartitioner added in v0.2.0

func NewRandomPartitioner() Partitioner

func NewRoundRobinPartitioner added in v0.2.0

func NewRoundRobinPartitioner() Partitioner

type PartitionerConstructor added in v0.2.0

// PartitionerConstructor is a function that returns a Partitioner.
type PartitionerConstructor func() Partitioner

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL