consumer

package
v1.0.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 27, 2022 License: BSD-3-Clause Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateTopic added in v1.0.6

func CreateTopic(addr string, topicName string, topicDetail sarama.TopicDetail, opt Coptions) (err error)

CreateTopic @Description: 创建topic @param addr @param topicName @param tipicDetail @param conf @return err

func GetTopicInfoExclude added in v1.0.4

func GetTopicInfoExclude(addr string, opt Coptions, exclude []string) (topicsInfo map[string][]int32, err error)

GetTopicInfo @Description: 获得topic 信息,包含partitions信息 @param addr @param version @param exclude 过滤topic 不包含这些的topic @return topicsInfo @return err

func GetTopicInfoInclude added in v1.0.4

func GetTopicInfoInclude(addr string, opt Coptions, include []string) (topicsInfo map[string][]int32, err error)

GetTopicInfoInclude @Description: @param addr @param version @param include 过滤topic 只包含这些的topic @return topicsInfo @return err

Types

type AMQPConsumer

type AMQPConsumer struct {
	// contains filtered or unexported fields
}

func NewAMQPConsumer

func NewAMQPConsumer(addr string, opt Coptions) *AMQPConsumer

func (*AMQPConsumer) RegisterReceiver

func (amqpc *AMQPConsumer) RegisterReceiver(r Receiver)

func (*AMQPConsumer) Start

func (amqpc *AMQPConsumer) Start()

func (*AMQPConsumer) Stop

func (amqpc *AMQPConsumer) Stop()

type AMQPOpt

type AMQPOpt struct {
	Exchange            string
	ExchangeType        string
	Heartbeat           time.Duration //心跳检查间隔 s
	Vhost               string
	QueueName           string
	RouterKey           string
	Username            string //鉴权 用户名
	Password            string //鉴权 密码
	AMQPQueueDeclare    mq.AMQPQueueDeclareOPT
	AMQPExchangeDeclare mq.AMQPExchangeDeclareOPT
	AMQPQueueBind       mq.AMQPQueueBindOPT
	AMQPConsumer        mq.AMQPConsumeOPT
}

type Cmessage

type Cmessage struct {
	Key  []byte
	Body []byte
	Info map[string]interface{}
}

Cmessage 消息格式

type Coptions

type Coptions struct {
	Kafka          KafkaOpt
	Nsq            NsqOpt
	AMQP           AMQPOpt
	LookupInterval time.Duration //重连时间
	Address        string
}

Coptions consumer的option

type KafkaConsumer

type KafkaConsumer struct {
	// contains filtered or unexported fields
}

func NewKafkaConsumer

func NewKafkaConsumer(addr string, opt Coptions) *KafkaConsumer

func (*KafkaConsumer) RegisterReceiver

func (kac *KafkaConsumer) RegisterReceiver(r Receiver)

func (*KafkaConsumer) Start

func (kac *KafkaConsumer) Start()

func (*KafkaConsumer) Stop

func (kac *KafkaConsumer) Stop()

type KafkaOpt

type KafkaOpt struct {
	Version    *sarama.KafkaVersion
	Topic      []string
	GroupID    string               //offset
	Offsets    []OffsetGroup        //不同group,topic的offset信息
	Partition  mapset.Set           //接受的partition
	TimeFilter map[string]time.Time //时间范围,string只有『After』[Befter](不包含,开区间)
	UserName   string               //鉴权 用户名
	Password   string               //鉴权 密码
}

type MqConsumer

type MqConsumer interface {
	// RegisterReceiver 注册接受者
	RegisterReceiver(Receiver)
	// Start 开始接受消息
	Start()
	// Stop 结束
	Stop()
}

MqConsumer 消费者接口

func NewConsumer

func NewConsumer(mqType mq.MQType, address string, opt ...Option) MqConsumer

type NsqConsumer

type NsqConsumer struct {
	MaxWorkerNum int
	// contains filtered or unexported fields
}

func NewNsqConsumer

func NewNsqConsumer(addr string, opt Coptions) *NsqConsumer

func (*NsqConsumer) RegisterReceiver

func (nsqc *NsqConsumer) RegisterReceiver(r Receiver)

func (*NsqConsumer) Start

func (nsqc *NsqConsumer) Start()

Start 开始接受消息

func (NsqConsumer) Stop

func (nsqc NsqConsumer) Stop()

Stop 停止接受消息

type NsqOpt

type NsqOpt struct {
	Topic      string
	Channel    string
	AuthSecret string //鉴权 secret
	MaxTaskNum int
}

type OffsetGroup

type OffsetGroup struct {
	Topic     string
	Partition int32
	Offset    int64
	Metadata  string
}

type Option

type Option func(opt *Coptions)

Option Consumer option

func WithAMQP

func WithAMQP(rOpt AMQPOpt) Option

WithAMQP 设置rabbitmq配置

func WithAddress

func WithAddress(addr string) Option

WithAddress 设置address

func WithKafka

func WithKafka(kOpt KafkaOpt) Option

WithKafka 设置kafka配置

func WithLookupInterval

func WithLookupInterval(t int) Option

WithLookupInterval 设置重连间隔

func WithNsq

func WithNsq(nOpt NsqOpt) Option

WithNsq 设置nsq配置

func WithNsqTopic

func WithNsqTopic(topic string) Option

WithNsqTopic 设置nsqTopic

type Receiver

type Receiver interface {
	OnError(err error)           // 处理遇到的错误,当MQ对象发生了错误,他需要告诉接收者处理错误
	OnReceive(msg Cmessage) bool // 处理收到的消息, 这里需要告知MQ对象消息是否处理成功
	GetTopic() Topic             //消息的topic,queue等信息
}

Receiver 用于接收消息到来的数据

type Topic

type Topic struct {
	Topic     string
	Topics    []string
	Queue     string
	RouterKey string
	Channel   string
	Args      map[string]interface{}
}

Topic 监听的topic

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL