Documentation ¶
Overview ¶
easy_amqp defines a lightweight framework for AMQP publishing and subscribing.
Index ¶
- Variables
- func DeclareExchange(channel *amqp.Channel, exchanges ...Exchange) error
- func DeclareQueue(channel *amqp.Channel, queues ...Queue) error
- func NewChannelPool(capacity uint, factory channelFactory) *channelPool
- func NewMessage(raw interface{}, contentType ContentType, opts ...MessageOption) *message
- func NewPublisher(conn *amqp.Connection, ctx context.Context) *publisher
- func NewSubscriber(conn *amqp.Connection, ctx context.Context) *subscriber
- type Bind
- type BindOption
- type ContentType
- type Exchange
- type ExchangeKind
- type ExchangeOption
- func WithExchangeAlias(alias string) ExchangeOption
- func WithExchangeArgs(args amqp.Table) ExchangeOption
- func WithExchangeDelete(autoDelete bool) ExchangeOption
- func WithExchangeDurable(durable bool) ExchangeOption
- func WithExchangeInternal(internal bool) ExchangeOption
- func WithExchangeNoWait(noWait bool) ExchangeOption
- type Handle
- type MessageOption
- type Queue
- type QueueOption
- type SubscribeOption
Constants ¶
This section is empty.
Variables ¶
View Source
var ( DefaultQueueDurable = true DefaultQueueAutoDelete = false DefaultQueueNoWait = false DefaultQueueExclusive = false DefaultExchangeDurable = true DefaultExchangeAutoDelete = false DefaultExchangeNoWait = false DefaultExchangeInternal = false DefaultRouteKey = "" DefaultBindNoWait = false )
View Source
var ( QueueDeclared = make(map[string]Queue) ExchangeDeclared = make(map[string]Exchange) )
View Source
var ( ErrorClosed = errors.New("publisher is closed") ErrorAmqpConnection = errors.New("error amqp connection") ErrorNoChannel = errors.New("not channel gets") ErrorBuildChannel = errors.New("can't build channel") )
View Source
var ( ErrorConnection = errors.New("error connection of amqp") ErrorChanRecvFail = errors.New("channel recv fail") ErrorEmptySubscribeQueue = errors.New("subscribe queues empty") )
View Source
var (
ErrorInvalidContentType = errors.New("invalid content type")
)
Functions ¶
func DeclareExchange ¶
DeclareExchange recursively declares the given exchanges and binds them.
func DeclareQueue ¶
DeclareQueue declares the given queues and binds them to exchanges.
func NewChannelPool ¶
func NewChannelPool(capacity uint, factory channelFactory) *channelPool
func NewMessage ¶
func NewMessage(raw interface{}, contentType ContentType, opts ...MessageOption) *message
func NewPublisher ¶
func NewPublisher(conn *amqp.Connection, ctx context.Context) *publisher
func NewSubscriber ¶
func NewSubscriber(conn *amqp.Connection, ctx context.Context) *subscriber
Types ¶
type BindOption ¶
type BindOption func(*Bind)
func WithBindAllRouter ¶
func WithBindAllRouter() BindOption
func WithBindArgs ¶
func WithBindArgs(args amqp.Table) BindOption
func WithBindNoWait ¶
func WithBindNoWait(noWait bool) BindOption
func WithBindRouteKey ¶
func WithBindRouteKey(key string) BindOption
type ContentType ¶
type ContentType uint8
const ( Json ContentType = iota Text )
func (ContentType) String ¶
func (c ContentType) String() string
type Exchange ¶
type Exchange struct { Name string Alias string Kind ExchangeKind Durable bool AutoDelete bool Internal bool NoWait bool Args amqp.Table sync.Mutex Binds []*Bind }
Exchange defines an AMQP exchange.
func NewExchange ¶
func NewExchange(name string, kind ExchangeKind, opts ...ExchangeOption) Exchange
func (*Exchange) Bind ¶
func (e *Exchange) Bind(ex Exchange, opts ...BindOption)
type ExchangeKind ¶
type ExchangeKind uint8
ExchangeKind identifies the kind of an exchange: Direct, Fanout, Topic, or Headers.
const ( Direct ExchangeKind = iota Fanout Topic Headers )
func (ExchangeKind) String ¶
func (e ExchangeKind) String() string
type ExchangeOption ¶
type ExchangeOption func(*Exchange)
func WithExchangeAlias ¶
func WithExchangeAlias(alias string) ExchangeOption
func WithExchangeArgs ¶
func WithExchangeArgs(args amqp.Table) ExchangeOption
func WithExchangeDelete ¶
func WithExchangeDelete(autoDelete bool) ExchangeOption
func WithExchangeDurable ¶
func WithExchangeDurable(durable bool) ExchangeOption
func WithExchangeInternal ¶
func WithExchangeInternal(internal bool) ExchangeOption
func WithExchangeNoWait ¶
func WithExchangeNoWait(noWait bool) ExchangeOption
type MessageOption ¶
type MessageOption func(*message)
func WithAppId ¶
func WithAppId(appId string) MessageOption
func WithContentType ¶
func WithContentType(contentType ContentType) MessageOption
func WithExchange ¶
func WithExchange(exchange string) MessageOption
func WithMode ¶
func WithMode(mode uint8) MessageOption
func WithRouterKey ¶
func WithRouterKey(routerkey string) MessageOption
func WithType ¶
func WithType(tp string) MessageOption
type Queue ¶
type Queue struct { Name string Alias string Durable bool AutoDelete bool Exclusive bool NoWait bool Args amqp.Table Queue *amqp.Queue sync.Mutex Binds []Bind }
Queue defines an AMQP queue.
func NewQueue ¶
func NewQueue(name string, opts ...QueueOption) Queue
func (*Queue) Bind ¶
func (q *Queue) Bind(ex Exchange, opts ...BindOption)
type QueueOption ¶
type QueueOption func(*Queue)
func WithQueueAlias ¶
func WithQueueAlias(alias string) QueueOption
func WithQueueArgs ¶
func WithQueueArgs(args amqp.Table) QueueOption
func WithQueueAutoDelete ¶
func WithQueueAutoDelete(autoDelete bool) QueueOption
func WithQueueDurable ¶
func WithQueueDurable(durable bool) QueueOption
func WithQueueExclusive ¶
func WithQueueExclusive(exclusive bool) QueueOption
func WithQueueNoWait ¶
func WithQueueNoWait(noWait bool) QueueOption
type SubscribeOption ¶
type SubscribeOption struct { AutoAck bool Exclusive bool NoLocal bool NoWait bool Args amqp.Table Hande Handle }
func NewSUbscribeDefaultOption ¶
func NewSUbscribeDefaultOption(handle Handle) *SubscribeOption
Source Files ¶
Click to show internal directories.
Click to hide internal directories.