ka

package
v0.0.0-...-35fa672 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 28, 2021 License: MIT Imports: 10 Imported by: 11

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrConsumerNotFound is a sentinel error carrying the message
	// "consumer not found".
	// NOTE(review): presumably returned when no consumer is registered for a
	// requested topic — confirm against the implementation.
	ErrConsumerNotFound = errors.New("consumer not found")
)

Functions

func GetVersion

func GetVersion() sarama.KafkaVersion

func SetVersion

func SetVersion(v sarama.KafkaVersion)

Types

type Config

// Config is the top-level configuration: the broker addresses together with
// a list of consumer configurations. Fields are mapped from TOML keys via
// their struct tags.
type Config struct {
	// Brokers holds the addresses of the broker cluster.
	Brokers   []string          `toml:"brokers"`
	// Consumers lists the consumer configurations (TOML key "consumers").
	Consumers []*ConsumerConfig `toml:"consumers"`
}

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(cfg *ConsumerConfig) (*Consumer, error)

func NewConsumerWithConfig

func NewConsumerWithConfig(cfg1 *ConsumerConfig, cfg2 *sarama.Config) (*Consumer, error)

func NewConsumerWithInterceptor

func NewConsumerWithInterceptor(cfg1 *ConsumerConfig, interceptor sarama.ConsumerInterceptor) (*Consumer, error)

func (*Consumer) Close

func (c *Consumer) Close() error

func (*Consumer) GetTopic

func (c *Consumer) GetTopic() string

func (*Consumer) Handle

func (c *Consumer) Handle()

func (*Consumer) HandleError

func (c *Consumer) HandleError(e HandleErrorFunc)

func (*Consumer) HandleEvent

func (c *Consumer) HandleEvent(eventType string, consumerFunc HandleConsumerFunc)

func (*Consumer) HandleMsg

func (c *Consumer) HandleMsg(handler HandleConsumerMsgFunc)

func (*Consumer) Run

func (c *Consumer) Run()

func (*Consumer) Worker

func (c *Consumer) Worker()

type ConsumerConfig

// ConsumerConfig describes a single consumer: which brokers, topic and
// consumer group it uses, plus SASL, worker-count and start-position settings.
// Fields are mapped from TOML keys via their struct tags.
type ConsumerConfig struct {
	// Brokers holds the addresses of the broker cluster.
	Brokers []string `toml:"brokers"`
	// Topic is the name of the topic.
	Topic string `toml:"topic"`
	// Group is the name of the consumer group.
	Group string `toml:"group"`
	// SASL groups an enable flag with a user name and password.
	// NOTE(review): presumably used for SASL authentication against the
	// brokers when Enable is true — confirm against the implementation.
	SASL  struct {
		Enable   bool   `toml:"enable"`
		User     string `toml:"user"`
		Password string `toml:"password"`
	} `toml:"sasl"`
	// Workers is the number of goroutines to use.
	Workers int `toml:"workers"`
	// Oldest indicates whether consumption starts from the oldest message.
	Oldest bool `toml:"oldest"`
}

func (*ConsumerConfig) Check

func (c *ConsumerConfig) Check() bool

type Event

// Event is a JSON-serializable record. Every field is tagged omitempty, so
// empty fields are left out of the encoded JSON.
type Event struct {
	// Time is carried as a string.
	// NOTE(review): the timestamp format is not visible here — confirm.
	Time     string          `json:"Time,omitempty"`
	// Hostname is a host name string; presumably the emitting host — confirm.
	Hostname string          `json:"Hostname,omitempty"`
	// From is a string; presumably identifies the event's origin — confirm.
	From     string          `json:"From,omitempty"`
	// Type is the event type string.
	// NOTE(review): presumably matched against the eventType handlers are
	// registered with — confirm.
	Type     string          `json:"Type,omitempty"`
	// Data is kept as raw JSON (json.RawMessage), so its decoding is left to
	// whoever consumes the event.
	Data     json.RawMessage `json:"Data,omitempty"`
}

type HandleConsumerFunc

// HandleConsumerFunc is a callback that takes an *Event and returns an error.
type HandleConsumerFunc func(e *Event) error

If an error is returned, this message will not be marked as successfully consumed.

type HandleConsumerMsgFunc

// HandleConsumerMsgFunc is a callback that takes a raw
// *sarama.ConsumerMessage and returns nothing.
type HandleConsumerMsgFunc func(message *sarama.ConsumerMessage)

type HandleErrorFunc

// HandleErrorFunc is a callback that takes an error and returns nothing.
type HandleErrorFunc func(error)

type HandleSucceedFunc

// HandleSucceedFunc is a callback that takes a *sarama.ProducerMessage and
// returns nothing.
// NOTE(review): presumably invoked for successfully produced messages —
// confirm against the implementation.
type HandleSucceedFunc func(*sarama.ProducerMessage)

type Kafka

type Kafka struct {
	// contains filtered or unexported fields
}

func New

func New(cfg *Config) (*Kafka, error)

func NewPlat

func NewPlat(consumer *ConsumerConfig) (*Kafka, error)

func (*Kafka) Close

func (k *Kafka) Close() error

func (*Kafka) Consume

func (k *Kafka) Consume(eventType string, handler HandleConsumerFunc) error

If the consumer group contains only one consumer, the topic can be omitted.

func (*Kafka) ConsumeTopic

func (k *Kafka) ConsumeTopic(topic string, handler HandleConsumerMsgFunc) error

func (*Kafka) ConsumeTopicEvent

func (k *Kafka) ConsumeTopicEvent(topic string, eventType string, handler HandleConsumerFunc) error

func (*Kafka) Consumer

func (k *Kafka) Consumer(topic string) *Consumer

func (*Kafka) Consumers

func (k *Kafka) Consumers() map[string]*Consumer

func (*Kafka) Producer

func (k *Kafka) Producer() *Producer

func (*Kafka) Push

func (k *Kafka) Push(topic, eventType string, event interface{})

func (*Kafka) Start

func (k *Kafka) Start()

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(brokers []string) (*Producer, error)

func NewProducerWithCfg

func NewProducerWithCfg(brokers []string, cfg *sarama.Config) (*Producer, error)

func NewProducerWithInterceptor

func NewProducerWithInterceptor(brokers []string, interceptor sarama.ProducerInterceptor) (*Producer, error)

func (*Producer) Close

func (p *Producer) Close() error

func (*Producer) HandleError

func (p *Producer) HandleError(e HandleErrorFunc)

func (*Producer) HandleSucceed

func (p *Producer) HandleSucceed(s HandleSucceedFunc)

func (*Producer) PushEvent

func (p *Producer) PushEvent(topic, data string, e interface{})

func (*Producer) Send

func (p *Producer) Send(topic string, data []byte)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL