kafka

package
v0.0.0-...-2c5f5e2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 26, 2023 License: MIT Imports: 4 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	Pool map[string]*Entity
)

Functions

func Init

func Init(name string, c *Config) error

Types

type Config

type Config struct {
	BrokerList   []string
	ConsumerConf *ConsumerConf
	ProducerConf *ProducerConf
}

type ConsumerConf

type ConsumerConf struct {
	Topics        []string
	ConsumerGroup string
	Partition     int32
	Offset        int64
}

type Entity

type Entity struct {
	ConnName     string
	SaramaClient sarama.Client
	SaramaConfig *sarama.Config
	ErrChan      chan error
	MsgChan      chan string
}

func GetClient

func GetClient(name string) (*Entity, error)

func (*Entity) ConsumeWithGroup

func (c *Entity) ConsumeWithGroup(ctx context.Context) error

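ConsumeWithGroup consumes using a consumer group; it can consume multiple topics and multiple partitions. (使用消费组进行消费，可以消费多个 topic 和多个 partition。)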

func (*Entity) ConsumeWithHander

func (c *Entity) ConsumeWithHander(ctx context.Context, msgHandler func(message *sarama.ConsumerMessage) (metadata string), errHandler func(err error)) error

func (*Entity) Errors

func (c *Entity) Errors() chan error

func (*Entity) Messages

func (c *Entity) Messages() chan string

func (*Entity) SyncProduce

func (c *Entity) SyncProduce(message *sarama.ProducerMessage) error

type ProducerConf

type ProducerConf struct {
	Topic     string
	Partition int32
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL