Documentation ¶
Index ¶
- func CreateTopic(addr string, topicName string, topicDetail sarama.TopicDetail, opt Coptions) (err error)
- func GetTopicInfoExclude(addr string, opt Coptions, exclude []string) (topicsInfo map[string][]int32, err error)
- func GetTopicInfoInclude(addr string, opt Coptions, include []string) (topicsInfo map[string][]int32, err error)
- type AMQPConsumer
- type AMQPOpt
- type Cmessage
- type Coptions
- type KafkaConsumer
- type KafkaOpt
- type MqConsumer
- type NsqConsumer
- type NsqOpt
- type OffsetGroup
- type Option
- type Receiver
- type Topic
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CreateTopic ¶ added in v1.0.6
func CreateTopic(addr string, topicName string, topicDetail sarama.TopicDetail, opt Coptions) (err error)
CreateTopic @Description: 创建topic @param addr @param topicName @param topicDetail @param opt @return err
func GetTopicInfoExclude ¶ added in v1.0.4
func GetTopicInfoExclude(addr string, opt Coptions, exclude []string) (topicsInfo map[string][]int32, err error)
GetTopicInfoExclude @Description: 获得topic 信息,包含partitions信息 @param addr @param opt @param exclude 过滤topic 不包含这些的topic @return topicsInfo @return err
Types ¶
type AMQPConsumer ¶
type AMQPConsumer struct {
// contains filtered or unexported fields
}
func NewAMQPConsumer ¶
func NewAMQPConsumer(addr string, opt Coptions) *AMQPConsumer
func (*AMQPConsumer) RegisterReceiver ¶
func (amqpc *AMQPConsumer) RegisterReceiver(r Receiver)
func (*AMQPConsumer) Start ¶
func (amqpc *AMQPConsumer) Start()
func (*AMQPConsumer) Stop ¶
func (amqpc *AMQPConsumer) Stop()
type AMQPOpt ¶
type AMQPOpt struct {
	Exchange            string
	ExchangeType        string
	Heartbeat           time.Duration //心跳检查间隔 s
	Vhost               string
	QueueName           string
	RouterKey           string
	Username            string //鉴权 用户名
	Password            string //鉴权 密码
	AMQPQueueDeclare    mq.AMQPQueueDeclareOPT
	AMQPExchangeDeclare mq.AMQPExchangeDeclareOPT
	AMQPQueueBind       mq.AMQPQueueBindOPT
	AMQPConsumer        mq.AMQPConsumeOPT
}
type Coptions ¶
type Coptions struct {
	Kafka          KafkaOpt
	Nsq            NsqOpt
	AMQP           AMQPOpt
	LookupInterval time.Duration //重连时间
	Address        string
}
Coptions consumer的option
type KafkaConsumer ¶
type KafkaConsumer struct {
// contains filtered or unexported fields
}
func NewKafkaConsumer ¶
func NewKafkaConsumer(addr string, opt Coptions) *KafkaConsumer
func (*KafkaConsumer) RegisterReceiver ¶
func (kac *KafkaConsumer) RegisterReceiver(r Receiver)
func (*KafkaConsumer) Start ¶
func (kac *KafkaConsumer) Start()
func (*KafkaConsumer) Stop ¶
func (kac *KafkaConsumer) Stop()
type MqConsumer ¶
type MqConsumer interface {
	// RegisterReceiver 注册接受者
	RegisterReceiver(Receiver)
	// Start 开始接受消息
	Start()
	// Stop 结束
	Stop()
}
MqConsumer 消费者接口
func NewConsumer ¶
func NewConsumer(mqType mq.MQType, address string, opt ...Option) MqConsumer
type NsqConsumer ¶
type NsqConsumer struct {
	MaxWorkerNum int
	// contains filtered or unexported fields
}
func NewNsqConsumer ¶
func NewNsqConsumer(addr string, opt Coptions) *NsqConsumer
func (*NsqConsumer) RegisterReceiver ¶
func (nsqc *NsqConsumer) RegisterReceiver(r Receiver)
type OffsetGroup ¶
Click to show internal directories.
Click to hide internal directories.