Documentation ¶
Index ¶
- Constants
- Variables
- func GetQueueConsumerMode(mode QueueConsumerMode) string
- func GetQueueConsumerModeContains(modes []QueueConsumerMode, compareMode QueueConsumerMode) bool
- func QueueConsumerModeList() map[QueueConsumerMode]string
- func RandomAround(min, max int64) (int64, error)
- func RandomNum(length int) string
- type ConsumeReceive
- type QueueConsumerMode
- type RabbitAttribute
- type RabbitLoadBalance
- type RabbitMqData
- type RabbitMqError
- type RabbitPool
- func (r *RabbitPool) BindQueue(rconn *rConn, channel *rChannel, queueName, routingKey, exchange string, ...) error
- func (r *RabbitPool) CloseConn(conn *rConn) error
- func (r *RabbitPool) Connect(host string, port int, user string, password string) error
- func (r *RabbitPool) ConnectUrl(url string) error
- func (r *RabbitPool) ConnectVirtualHost(host string, port int, user string, password string, virtualHost string) error
- func (r *RabbitPool) CreateChannel(conn *rConn) (*amqp.Channel, error)
- func (r *RabbitPool) CreateRChannel(channel *amqp.Channel, num int32) *rChannel
- func (r *RabbitPool) DeclareExchange(rconn *rConn, channel *rChannel, exChangeName, exChangeType string, ...) error
- func (r *RabbitPool) DeclareQueue(rconn *rConn, channel *rChannel, queueName string, args amqp.Table) (string, error)
- func (r *RabbitPool) GetErrorChannel() chan *amqp.Error
- func (r *RabbitPool) GetHost() string
- func (r *RabbitPool) GetMaxConsumerChannel() int32
- func (r *RabbitPool) GetPort() int
- func (r *RabbitPool) GetRChannelCh(rChannel *rChannel) *amqp.Channel
- func (r *RabbitPool) GetRConnection() *rConn
- func (r *RabbitPool) Push(data *RabbitMqData) *RabbitMqError
- func (r *RabbitPool) RegisterConsumeReceive(consumeReceive *ConsumeReceive)
- func (r *RabbitPool) RunConsume() error
- func (r *RabbitPool) SetConnectionBalance(balance int)
- func (r *RabbitPool) SetConnectionError(code int, msg string)
- func (r *RabbitPool) SetMaxConnection(maxConnection int32)
- func (r *RabbitPool) SetMaxConsumeChannel(maxConsume int32)
- func (r *RabbitPool) SetRandomRetryTime(min, max int64)
- type RetryClientInterface
- type RetryTool
- type RetryToolInterface
Constants ¶
const ( DEFAULT_MAX_CONNECTION = 5 //rabbitmq tcp 最大连接数 DEFAULT_MAX_CONSUME_CHANNEL = 25 //最大消费channel数(一般指消费者) DEFAULT_MAX_CONSUME_RETRY = 5 //消费者断线重连最大次数 DEFAULT_PUSH_MAX_TIME = 5 //最大重发次数 DEFAULT_MAX_PRODUCT_RETRY = 5 //生产者断线重连最大次数 //轮循-连接池负载算法 LOAD_BALANCE_ROUND = 1 )
const ( RABBITMQ_TYPE_PUBLISH = 1 //生产者 RABBITMQ_TYPE_CONSUME = 2 //消费者 DEFAULT_RETRY_MIN_RANDOM_TIME = 5000 //最小重试时间随机数 DEFAULT_RETRY_MAX_RADNOM_TIME = 15000 //最大重试时间随机数 )
const ( EXCHANGE_TYPE_FANOUT = "fanout" // Fanout:广播,将消息交给所有绑定到交换机的队列 EXCHANGE_TYPE_DIRECT = "direct" //Direct:定向,把消息交给符合指定routing key 的队列 EXCHANGE_TYPE_TOPIC = "topic" //Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列 )
const ( RCODE_PUSH_MAX_ERROR = 501 //发送超过最大重试次数 RCODE_GET_CHANNEL_ERROR = 502 //获取信道失败 RCODE_CHANNEL_QUEUE_EXCHANGE_BIND_ERROR = 503 //交换机/队列/绑定失败 RCODE_CONNECTION_ERROR = 504 //连接失败 RCODE_PUSH_ERROR = 505 //消息推送失败 RCODE_CHANNEL_CREATE_ERROR = 506 //信道创建失败 RCODE_RETRY_MAX_ERROR = 507 //超过最大重试次数 )
* 错误码
Variables ¶
var (
ACK_DATA_NIL = errors.New("ack data nil")
)
var UTFALL_SECOND = "2006-01-02 15:04:05"
Functions ¶
func GetQueueConsumerMode ¶
func GetQueueConsumerMode(mode QueueConsumerMode) string
func GetQueueConsumerModeContains ¶
func GetQueueConsumerModeContains(modes []QueueConsumerMode, compareMode QueueConsumerMode) bool
func QueueConsumerModeList ¶
func QueueConsumerModeList() map[QueueConsumerMode]string
func RandomAround ¶
func RandomAround(min, max int64) (int64, error)
Types ¶
type ConsumeReceive ¶
type ConsumeReceive struct { TagName string //标识处理方法 ExchangeName string //交换机 ExchangeType string //交换机类型 Route string //路由 QueueName string //队列名称 EventSuccess func(data *amqp.Delivery, retryClient RetryClientInterface) bool //成功事件回调 EventFail func(int, error, []byte) //失败回调 IsTry bool //是否重试 MaxReTry int32 //最大重试次数 IsAutoAck bool //是否自动确认 }
消费者注册接收数据
type QueueConsumerMode ¶
type QueueConsumerMode string
const ( Normal QueueConsumerMode = "Normal" Retry QueueConsumerMode = "Retry" Fail QueueConsumerMode = "Fail" )
QueueConsumerMode
func (QueueConsumerMode) ToString ¶
func (p QueueConsumerMode) ToString() string
type RabbitAttribute ¶
type RabbitAttribute struct { //交换机名称 ExchangeName string `json:"exchangeName"` // 交换机模式[fanout 发布订阅模式,direct 路由模式,topic 主题模式,x-delayed-message 延时队列模式] WorkModel string `json:"workModel"` //路由键《路由键和队列名称配合使用》 RoutingKey string `json:"routingKey"` //队列名称《队列名称和路由键配合使用》 Queue string `json:"queue"` //队列订阅消费的模式 Modes []QueueConsumerMode `json:"modes"` }
事件event配置的基本应用交换机队列数据
type RabbitLoadBalance ¶
type RabbitLoadBalance struct { }
* 连接负载处理
func NewRabbitLoadBalance ¶
func NewRabbitLoadBalance() *RabbitLoadBalance
func (*RabbitLoadBalance) RoundRobin ¶
func (r *RabbitLoadBalance) RoundRobin(cIndex, max int32) int32
* 负载均衡 轮循
type RabbitMqData ¶
type RabbitMqData struct { ExchangeName string //交换机名称 ExchangeType string //交换机类型 见RabbitmqPool.go 常量 QueueName string //队列名称 Route string //路由 Data string //发送数据 }
* 发送数据 消息发送
func GetRabbitMqDataFormat ¶
func GetRabbitMqDataFormat(exChangeName string, exChangeType string, queueName string, route string, data string) *RabbitMqData
* 获取发送数据模板 @param exChangeName 交换机名称 @param exChangeType 交换机类型 @param queueName string 队列名称 @param route string 路由 @param data string 发送的数据
func GetRabbitMqDataFormatExpire ¶
func GetRabbitMqDataFormatExpire(exChangeName string, exChangeType string, queueName string, route string, data string) *RabbitMqData
* 获取发送数据模板 过期设置(死信队列) @param exChangeName 交换机名称 @param exChangeType 交换机类型 @param queueName string 队列名称 @param route string 路由 @param data string 发送的数据
type RabbitMqError ¶
* 错误返回
func NewRabbitMqError ¶
func NewRabbitMqError(code int, message string, detail string) *RabbitMqError
func (RabbitMqError) Error ¶
func (e RabbitMqError) Error() string
type RabbitPool ¶
type RabbitPool struct {
// contains filtered or unexported fields
}
func (*RabbitPool) BindQueue ¶
func (r *RabbitPool) BindQueue(rconn *rConn, channel *rChannel, queueName, routingKey, exchange string, args amqp.Table) error
绑定队列
func (*RabbitPool) CloseConn ¶
func (r *RabbitPool) CloseConn(conn *rConn) error
func (*RabbitPool) Connect ¶
func (r *RabbitPool) Connect(host string, port int, user string, password string) error
* 连接rabbitmq @param host string 服务器地址 @param port int 服务端口 @param user string 用户名 @param password 密码
func (*RabbitPool) ConnectVirtualHost ¶
func (r *RabbitPool) ConnectVirtualHost(host string, port int, user string, password string, virtualHost string) error
* 自定义虚拟机连接 @param host string 服务器地址 @param port int 服务端口 @param user string 用户名 @param password 密码 @param virtualHost 虚拟机路径
func (*RabbitPool) CreateChannel ¶
func (r *RabbitPool) CreateChannel(conn *rConn) (*amqp.Channel, error)
对外创建channel
func (*RabbitPool) CreateRChannel ¶
func (r *RabbitPool) CreateRChannel(channel *amqp.Channel, num int32) *rChannel
对外创建rChannel
func (*RabbitPool) DeclareExchange ¶
func (r *RabbitPool) DeclareExchange(rconn *rConn, channel *rChannel, exChangeName, exChangeType string, args amqp.Table) error
尝试创建交换机,不存在创建
func (*RabbitPool) DeclareQueue ¶
func (r *RabbitPool) DeclareQueue(rconn *rConn, channel *rChannel, queueName string, args amqp.Table) (string, error)
尝试创建队列,不存在创建
func (*RabbitPool) GetErrorChannel ¶
func (r *RabbitPool) GetErrorChannel() chan *amqp.Error
func (*RabbitPool) GetHost ¶
func (r *RabbitPool) GetHost() string
func (*RabbitPool) GetMaxConsumerChannel ¶
func (r *RabbitPool) GetMaxConsumerChannel() int32
获取消费者最大信道数
func (*RabbitPool) GetPort ¶
func (r *RabbitPool) GetPort() int
func (*RabbitPool) GetRChannelCh ¶
func (r *RabbitPool) GetRChannelCh(rChannel *rChannel) *amqp.Channel
对外获取channel
func (*RabbitPool) RegisterConsumeReceive ¶
func (r *RabbitPool) RegisterConsumeReceive(consumeReceive *ConsumeReceive)
* 注册消费接收
func (*RabbitPool) SetConnectionBalance ¶
func (r *RabbitPool) SetConnectionBalance(balance int)
* 设置连接池负载算法 默认轮循
func (*RabbitPool) SetConnectionError ¶
func (r *RabbitPool) SetConnectionError(code int, msg string)
对外设置连接错误消息发送channel
func (*RabbitPool) SetMaxConnection ¶
func (r *RabbitPool) SetMaxConnection(maxConnection int32)
* 设置最大连接数
func (*RabbitPool) SetMaxConsumeChannel ¶
func (r *RabbitPool) SetMaxConsumeChannel(maxConsume int32)
* 设置消费者最大信道数
func (*RabbitPool) SetRandomRetryTime ¶
func (r *RabbitPool) SetRandomRetryTime(min, max int64)
* 设置随机重试时间,避免同一时刻一次重试过多
type RetryClientInterface ¶
type RetryClientInterface interface { Push(pushData []byte) *RabbitMqError Ack() error }
type RetryToolInterface ¶
type RetryToolInterface interface {
// contains filtered or unexported methods
}