easy_amqp

package module
v0.0.0-...-8458fb1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 23, 2019 License: GPL-3.0 Imports: 13 Imported by: 0

README

easy-amqp

a lightweight amqp framework

Documentation

Overview

easy_amqp define a lightweight framework

Index

Constants

This section is empty.

Variables

View Source
var (
	DefaultQueueDurable       = true
	DefaultQueueAutoDelete    = false
	DefaultQueueNoWait        = false
	DefaultQueueExclusive     = false
	DefaultExchangeDurable    = true
	DefaultExchangeAutoDelete = false
	DefaultExchangeNoWait     = false
	DefaultExchangeInternal   = false
	DefaultRouteKey           = ""
	DefaultBindNoWait         = false
)
View Source
var (
	QueueDeclared    = make(map[string]Queue)
	ExchangeDeclared = make(map[string]Exchange)
)
View Source
var (
	ErrorClosed         = errors.New("publisher is closed")
	ErrorAmqpConnection = errors.New("error amqp connection")
	ErrorNoChannel      = errors.New("not channel gets")
	ErrorBuildChannel   = errors.New("can't build channel")
)
View Source
var (
	ErrorConnection          = errors.New("error connection of amqp")
	ErrorChanRecvFail        = errors.New("channel recv fail")
	ErrorEmptySubscribeQueue = errors.New("subscribe queues empty")
)
View Source
var (
	ErrorInvalidContentType = errors.New("invalid content type")
)

Functions

func DeclareExchange

func DeclareExchange(channel *amqp.Channel, exchanges ...Exchange) error

DeclareExchange is recursive declare exchanges and bind them

func DeclareQueue

func DeclareQueue(channel *amqp.Channel, queues ...Queue) error

DeclareQueue is declare queues and bind exchange

func NewChannelPool

func NewChannelPool(capacity uint, factory channelFactory) *channelPool

func NewMessage

func NewMessage(raw interface{}, contentType ContentType, opts ...MessageOption) *message

func NewPublisher

func NewPublisher(conn *amqp.Connection, ctx context.Context) *publisher

func NewSubscriber

func NewSubscriber(conn *amqp.Connection, ctx context.Context) *subscriber

Types

type Bind

type Bind struct {
	Exg       Exchange
	RouterKey string
	NoWait    bool
	Args      amqp.Table
}

Bind is define exchange relation to queue or exchange relation to exchange

type BindOption

type BindOption func(*Bind)

func WithBindAllRouter

func WithBindAllRouter() BindOption

func WithBindArgs

func WithBindArgs(args amqp.Table) BindOption

func WithBindNoWait

func WithBindNoWait(noWait bool) BindOption

func WithBindRouteKey

func WithBindRouteKey(key string) BindOption

type ContentType

type ContentType uint8
const (
	Json ContentType = iota
	Text
)

func (ContentType) String

func (c ContentType) String() string

type Exchange

type Exchange struct {
	Name       string
	Alias      string
	Kind       ExchangeKind
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Args       amqp.Table
	sync.Mutex
	Binds []*Bind
}

Exchange is define amqp exchange

func NewExchange

func NewExchange(name string, kind ExchangeKind, opts ...ExchangeOption) Exchange

func (*Exchange) Bind

func (e *Exchange) Bind(ex Exchange, opts ...BindOption)

type ExchangeKind

type ExchangeKind uint8

ExchangeKind

const (
	Direct ExchangeKind = iota
	Fanout
	Topic
	Headers
)

func (ExchangeKind) String

func (e ExchangeKind) String() string

type ExchangeOption

type ExchangeOption func(*Exchange)

func WithExchangeAlias

func WithExchangeAlias(alias string) ExchangeOption

func WithExchangeArgs

func WithExchangeArgs(args amqp.Table) ExchangeOption

func WithExchangeDelete

func WithExchangeDelete(autoDelete bool) ExchangeOption

func WithExchangeDurable

func WithExchangeDurable(durable bool) ExchangeOption

func WithExchangeInternal

func WithExchangeInternal(internal bool) ExchangeOption

func WithExchangeNoWait

func WithExchangeNoWait(noWait bool) ExchangeOption

type Handle

type Handle func(delivery *amqp.Delivery)

type MessageOption

type MessageOption func(*message)

func WithAppId

func WithAppId(appId string) MessageOption

func WithContentType

func WithContentType(contentType ContentType) MessageOption

func WithExchange

func WithExchange(exchange string) MessageOption

func WithMode

func WithMode(mode uint8) MessageOption

func WithRouterKey

func WithRouterKey(routerkey string) MessageOption

func WithType

func WithType(tp string) MessageOption

type Queue

type Queue struct {
	Name       string
	Alias      string
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	Args       amqp.Table
	Queue      *amqp.Queue
	sync.Mutex
	Binds []Bind
}

Queue is define amqp queue

func NewQueue

func NewQueue(name string, opts ...QueueOption) Queue

func (*Queue) Bind

func (q *Queue) Bind(ex Exchange, opts ...BindOption)

type QueueOption

type QueueOption func(*Queue)

func WithQueueAlias

func WithQueueAlias(alias string) QueueOption

func WithQueueArgs

func WithQueueArgs(args amqp.Table) QueueOption

func WithQueueAutoDelete

func WithQueueAutoDelete(autoDelete bool) QueueOption

func WithQueueDurable

func WithQueueDurable(durable bool) QueueOption

func WithQueueExclusive

func WithQueueExclusive(exclusive bool) QueueOption

func WithQueueNoWait

func WithQueueNoWait(noWait bool) QueueOption

type SubscribeOption

type SubscribeOption struct {
	AutoAck   bool
	Exclusive bool
	NoLocal   bool
	NoWait    bool
	Args      amqp.Table
	Hande     Handle
}

func NewSUbscribeDefaultOption

func NewSUbscribeDefaultOption(handle Handle) *SubscribeOption

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL