Documentation ¶
Index ¶
- Constants
- Variables
- func NewIllegalOptionsError(msg string) error
- type Channel
- type Option
- func WithAMQPConfig(cfg amqp.Config) Option
- func WithBlocking(blocking bool) Option
- func WithChannelAliveDuration(duration time.Duration) Option
- func WithConnectionAliveDuration(duration time.Duration) Option
- func WithConnectionTimeout(timeout time.Duration) Option
- func WithEndpoints(endpoints ...string) Option
- func WithIdleChannelCountPerConnection(count int) Option
- func WithIdleConnectionCount(count int) Option
- func WithLogger(logger logrus.FieldLogger) Option
- func WithMaximumChannelCountPerConnection(count int) Option
- func WithMaximumConnectionCount(count int) Option
- func WithRetryPolicy(policy retry_policy.RetryPolicy) Option
- func WithScanConnectionIdleDuration(duration time.Duration) Option
- type Options
- type Pool
Constants ¶
View Source
const ( DefaultConnectionTimeout = time.Second * 5 // 默认连接超时时间 DefaultConnectionAliveDuration = time.Minute * 5 // 默认连接存活时间 DefaultChannelAliveDuration = time.Minute * 5 // 默认通道存活时间 DefaultRetryMaximumAttempts = 3 // 断开连接后重试次数 DefaultRetryInterval = time.Second * 10 // 断开连接后重试间隔 DefaultScanIdleDuration = time.Minute // 默认扫描回收间隔 )
View Source
const ( Idle int32 = 0 // 通道空闲 Used int32 = 1 // 通道使用 Closed int32 = 2 // 关闭 )
定义通道的状态
Variables ¶
View Source
var ( ErrChannelMaximum = errors.New("channel is up to maximum") // 通道到达配置的最大值 ErrIllegalOptions = errors.New("illegal options") // 非法的配置 ErrPoolClosed = errors.New("pool is closed") // 连接池已关闭 )
View Source
var AmqpConnectionPrefix = "GoAMQP#"
AmqpConnectionPrefix amqp连接名的前缀
Functions ¶
func NewIllegalOptionsError ¶
NewIllegalOptionsError 新建一个非法配置的错误
Types ¶
type Option ¶
type Option func(options *Options)
Option 选项,用于使用函数方式来修改配置
func WithChannelAliveDuration ¶
WithChannelAliveDuration 用于配置通道的存活时间,当超过这个时间且空闲的通道数超过配置的空闲通道数,则通道会被回收
func WithConnectionAliveDuration ¶
WithConnectionAliveDuration 用于配置连接的存活时间,当超过这个时间且空闲连接数超过配置的空闲连接数,则连接会被回收关闭
func WithConnectionTimeout ¶
WithConnectionTimeout 用于配置连接超时时间
func WithEndpoints ¶
WithEndpoints 用于配置节点地址,此函数会覆盖现有的节点地址
func WithIdleChannelCountPerConnection ¶
WithIdleChannelCountPerConnection 用于配置每个连接闲置的通道数量
func WithIdleConnectionCount ¶
WithIdleConnectionCount 用于配置闲置连接数量
func WithMaximumChannelCountPerConnection ¶
WithMaximumChannelCountPerConnection 用于配置每个连接最大的通道数
func WithMaximumConnectionCount ¶
WithMaximumConnectionCount 用于配置最大连接数
func WithRetryPolicy ¶
func WithRetryPolicy(policy retry_policy.RetryPolicy) Option
WithRetryPolicy 配置重试策略
func WithScanConnectionIdleDuration ¶
WithScanConnectionIdleDuration 扫描连接空闲间隔
type Options ¶
type Options struct { Endpoints []string // amqp节点地址,可传入多个 ConnectTimeout time.Duration // 连接超时时间 MaximumConnectionCount int // 最大连接数,0为不限制 MaximumChannelCountPerConnection int // 每个连接最大通道数,0为不限制 ConnectionAliveDuration time.Duration // 连接存活时间,超过这个时间的空闲连接在保证空闲连接数情况下会被回收关闭,若配置为0则不回收 ChannelAliveDuration time.Duration // 通道存活时间,超过这个时间的空闲通道在保证空闲连接数情况下会被回收关闭,若配置为0则不回收 IdleConnectionCount int // 空闲连接数 IdleChannelCountPerConnection int // 每个连接空闲通道数 AMQPConfig amqp.Config // 可传入amqp的配置,其中最大通道限制、连接地址、连接超时时间将被上面的节点地址、连接超时时间、每个连接最大通道数所覆盖 Logger logrus.FieldLogger // 可传入实现此接口的日志 Blocking bool // 是否阻塞模式,非阻塞模式将在获取通道时等候 RetryPolicy retry_policy.RetryPolicy // 重试策略 ScanConnectionIdleDuration time.Duration // 扫描连接空闲间隔 }
Options 配置
type Pool ¶
type Pool interface { // GetChannel 从连接池中获取一个通道 // 使用后需要 PutChannel 放回 // 当连接池中没有空闲的通道时,将会建立一个新的连接,并生成通道返回 // 但当连接已经达到最大的连接数限制,则会返回一个 ErrChannelMaximum 的错误 GetChannel() (*Channel, error) // PutChannel 将通道放回连接池中 PutChannel(channel *Channel) bool // Execute 通过传入函数,使用一个通道,此方式将自动的获取通道并使用后自动放回 // 其调用过程等于 GetChannel() -> fn() -> PutChannel Execute(fn func(channel *Channel) error) error // Close 实现io.Closer接口,完成对连接池的关闭 Close() error // Cap 连接池容量 Cap() int // Size 连接池当前连接数 Size() int }
Pool 定义了连接池接口
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package consumer 消费者包,提供快捷的消费方法
|
Package consumer 消费者包,提供快捷的消费方法 |
Package declare 定义包,用于快速完成定义队列、交换机、绑定关系的操作
|
Package declare 定义包,用于快速完成定义队列、交换机、绑定关系的操作 |
Package publisher 发布包,提供快捷的广播发布功能
|
Package publisher 发布包,提供快捷的广播发布功能 |
Package retry_policy 重试策略包,提供重试策略接口,主要用于对AMQP连接断开时自动重连时的策略定义
|
Package retry_policy 重试策略包,提供重试策略接口,主要用于对AMQP连接断开时自动重连时的策略定义 |
Click to show internal directories.
Click to hide internal directories.