goamqp

package module
v0.0.0-...-de203b7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 8, 2021 License: GPL-3.0 Imports: 13 Imported by: 0

README

goamqp

介绍

goamqp是一个基于 github.com/streadway/amqp 的连接池包,其有以下特性

  • 支持对多个 amqp broker 进行连接
  • 支持并发
  • 支持阻塞模式
  • 支持断开重连
  • 伸缩性,对长期不使用的连接和通道进行回收,降低资源占用

todo

  • 基本连接池功能
  • 增加对Block的支持 *
  • 支持连接重连
  • 增加消费的快捷使用
  • 增加对队列、交换机、死信队列、延时队列等快捷定义方法
  • 支持空闲自动回收

packages

consumer

提供了快捷的消费者方法,其实现了 Looper 接口,可以在上下文未结束时持续进行消费。

declare

提供了针对队列定义、交换机定义以及绑定关系的快捷方法。

publisher

提供了发布的快捷方法

retry_policy

定义了重试策略,此策略是是针对 connection 断开时,如何进行重试操作的定义。

NewDefaultPolicy 将创建一个基于 attemptsMaximum (最大重试次数) 以及 interval (重试间隔) 的重试操作,一旦超过最大重试次数,则会调用 final 方法,并传入重试连接错误

Documentation

Index

Constants

View Source
const (
	DefaultConnectionTimeout       = time.Second * 5  // 默认连接超时时间
	DefaultConnectionAliveDuration = time.Minute * 5  // 默认连接存活时间
	DefaultChannelAliveDuration    = time.Minute * 5  // 默认通道存活时间
	DefaultRetryMaximumAttempts    = 3                // 断开连接后重试次数
	DefaultRetryInterval           = time.Second * 10 // 断开连接后重试间隔
	DefaultScanIdleDuration        = time.Minute      // 默认扫描回收间隔
)
View Source
const (
	Idle   int32 = 0 // 通道空闲
	Used   int32 = 1 // 通道使用
	Closed int32 = 2 // 关闭
)

定义通道的状态

Variables

View Source
var (
	ErrChannelMaximum = errors.New("channel is up to maximum") // 通道到达配置的最大值
	ErrIllegalOptions = errors.New("illegal options")          // 非法的配置
	ErrPoolClosed     = errors.New("pool is closed")           // 连接池已关闭
)
View Source
var AmqpConnectionPrefix = "GoAMQP#"

AmqpConnectionPrefix amqp连接名的前缀

Functions

func NewIllegalOptionsError

func NewIllegalOptionsError(msg string) error

NewIllegalOptionsError 新建一个非法配置的错误

Types

type Channel

type Channel struct {
	*amqp.Channel
	// contains filtered or unexported fields
}

Channel 定义通道

func (*Channel) Close

func (c *Channel) Close() error

Close 关闭通道

func (*Channel) IsClosed

func (c *Channel) IsClosed() bool

IsClosed 是否关闭了

type Option

type Option func(options *Options)

Option 选项,用于使用函数方式来修改配置

func WithAMQPConfig

func WithAMQPConfig(cfg amqp.Config) Option

WithAMQPConfig 用于配置AMQP的配置项

func WithBlocking

func WithBlocking(blocking bool) Option

WithBlocking 是否使用阻塞模式

func WithChannelAliveDuration

func WithChannelAliveDuration(duration time.Duration) Option

WithChannelAliveDuration 用于配置通道的存活时间,当超过这个时间且空闲的通道数超过配置的空闲通道数,则通道会被回收

func WithConnectionAliveDuration

func WithConnectionAliveDuration(duration time.Duration) Option

WithConnectionAliveDuration 用于配置连接的存活时间,当超过这个时间且空闲连接数超过配置的空闲连接数,则连接会被回收关闭

func WithConnectionTimeout

func WithConnectionTimeout(timeout time.Duration) Option

WithConnectionTimeout 用于配置连接超时时间

func WithEndpoints

func WithEndpoints(endpoints ...string) Option

WithEndpoints 用于配置节点地址,此函数会覆盖现有的节点地址

func WithIdleChannelCountPerConnection

func WithIdleChannelCountPerConnection(count int) Option

WithIdleChannelCountPerConnection 用于配置每个连接闲置的通道数量

func WithIdleConnectionCount

func WithIdleConnectionCount(count int) Option

WithIdleConnectionCount 用于配置闲置连接数量

func WithLogger

func WithLogger(logger logrus.FieldLogger) Option

WithLogger 用于配置日志

func WithMaximumChannelCountPerConnection

func WithMaximumChannelCountPerConnection(count int) Option

WithMaximumChannelCountPerConnection 用于配置每个连接最大的通道数

func WithMaximumConnectionCount

func WithMaximumConnectionCount(count int) Option

WithMaximumConnectionCount 用于配置最大连接数

func WithRetryPolicy

func WithRetryPolicy(policy retry_policy.RetryPolicy) Option

WithRetryPolicy 配置重试策略

func WithScanConnectionIdleDuration

func WithScanConnectionIdleDuration(duration time.Duration) Option

WithScanConnectionIdleDuration 扫描连接空闲间隔

type Options

type Options struct {
	Endpoints                        []string                 // amqp节点地址,可传入多个
	ConnectTimeout                   time.Duration            // 连接超时时间
	MaximumConnectionCount           int                      // 最大连接数,0为不限制
	MaximumChannelCountPerConnection int                      // 每个连接最大通道数,0为不限制
	ConnectionAliveDuration          time.Duration            // 连接存活时间,超过这个时间的空闲连接在保证空闲连接数情况下会被回收关闭,若配置为0则不回收
	ChannelAliveDuration             time.Duration            // 通道存活时间,超过这个时间的空闲通道在保证空闲连接数情况下会被回收关闭,若配置为0则不回收
	IdleConnectionCount              int                      // 空闲连接数
	IdleChannelCountPerConnection    int                      // 每个连接空闲通道数
	AMQPConfig                       amqp.Config              // 可传入amqp的配置,其中最大通道限制、连接地址、连接超时时间将被上面的节点地址、连接超时时间、每个连接最大通道数所覆盖
	Logger                           logrus.FieldLogger       // 可传入实现此接口的日志
	Blocking                         bool                     // 是否阻塞模式,非阻塞模式将在获取通道时等候
	RetryPolicy                      retry_policy.RetryPolicy // 重试策略
	ScanConnectionIdleDuration       time.Duration            // 扫描连接空闲间隔
}

Options 配置

func (Options) Validate

func (o Options) Validate() error

Validate 用于校验参数是否有效 无效将返回一个 ErrIllegalOptions 错误,可以用 errors.Is 来判断

type Pool

type Pool interface {

	// GetChannel 从连接池中获取一个通道
	// 使用后需要 PutChannel 放回
	// 当连接池中没有空闲的通道时,将会建立一个新的连接,并生成通道返回
	// 但当连接已经达到最大的连接数限制,则会返回一个 ErrChannelMaximum 的错误
	GetChannel() (*Channel, error)

	// PutChannel 将通道放回连接池中
	PutChannel(channel *Channel) bool

	// Execute 通过传入函数,使用一个通道,此方式将自动的获取通道并使用后自动放回
	// 其调用过程等于 GetChannel() -> fn() -> PutChannel
	Execute(fn func(channel *Channel) error) error

	// Close 实现io.Closer接口,完成对连接池的关闭
	Close() error

	// Cap 连接池容量
	Cap() int

	// Size 连接池当前连接数
	Size() int
}

Pool 定义了连接池接口

func NewPool

func NewPool(opts ...Option) (Pool, error)

NewPool 新建一个连接池

func NewPoolByOptions

func NewPoolByOptions(opt Options) (Pool, error)

NewPoolByOptions 通过选项建立连接池

Directories

Path Synopsis
Package consumer 消费者包,提供快捷的消费方法
Package consumer 消费者包,提供快捷的消费方法
Package declare 定义包,用于快速完成定义队列、交换机、绑定关系的操作
Package declare 定义包,用于快速完成定义队列、交换机、绑定关系的操作
Package publisher 发布包,提供快捷的广播发布功能
Package publisher 发布包,提供快捷的广播发布功能
Package retry_policy 重试策略包,提供重试策略接口,主要用于对AMQP连接断开时自动重连时的策略定义
Package retry_policy 重试策略包,提供重试策略接口,主要用于对AMQP连接断开时自动重连时的策略定义

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL