amqp

package module
v0.0.0-...-a6ce81c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 9, 2022 License: MIT Imports: 8 Imported by: 0

README

MQ管理器

初始化

  1. 连接字符串
  2. 打开通道channel

声明

  1. 队列、交换机、路由

注册消费者、生产者

  1. 注册
  2. 启动消费

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func SetLogger

func SetLogger(log ILogger)

func ShowDefaultLogger

func ShowDefaultLogger(on bool)

Types

type AMQPManager

type AMQPManager struct {
	TCPSection
	// contains filtered or unexported fields
}

AMQPManager mq manager

func NewManager

func NewManager(host, user, pwd, path string, port int32) *AMQPManager
func (m *AMQPManager) AutoRelink(ctx context.Context)

AutoRelink auto relink consumer/producer in manager

func (*AMQPManager) Consumers

func (m *AMQPManager) Consumers() map[string]IConsumer

func (*AMQPManager) Declare

func (m *AMQPManager) Declare(channel *amqpMeta.Channel, declareFunc ...DeclareFunc) error

Declare declare some element, such as exchange/queue/router

func (*AMQPManager) DisConnect

func (m *AMQPManager) DisConnect() error

DisConnect close connection

func (*AMQPManager) GetConnect

func (m *AMQPManager) GetConnect() (*amqpMeta.Connection, error)

GetConnect get open connection

func (*AMQPManager) GetConsumer

func (m *AMQPManager) GetConsumer(name string) IConsumer

func (*AMQPManager) GetProducer

func (m *AMQPManager) GetProducer(name string) IProducer

func (*AMQPManager) NewChannel

func (m *AMQPManager) NewChannel() (*amqpMeta.Channel, error)

NewChannel get new channel in a living connect

func (m *AMQPManager) NoRelink()

func (*AMQPManager) Producers

func (m *AMQPManager) Producers() map[string]IProducer

func (*AMQPManager) PublishOnce

func (m *AMQPManager) PublishOnce(ctx context.Context, exchange, router string, encoder Encoder, messages ...IMessage) ([]error, error)

PublishOnce publish message in new connect

func (*AMQPManager) Register

func (m *AMQPManager) Register(c ICaller) error

Register register consumer/producer in manager

func (*AMQPManager) URL

func (m *AMQPManager) URL() string

URL build url string

Example
m := &AMQPManager{
	TCPSection: TCPSection{
		host: "localhost",
		port: 5672,
		user: "test",
		pwd:  "123456",
		path: "/test",
	},
}
println(m.URL())
Output:

type CloseHandler

type CloseHandler func(e *amqpMeta.Error)

type ConfirmHandler

type ConfirmHandler func(c amqpMeta.Confirmation)

type Consumer

type Consumer struct {
	Name             string
	Ctx              context.Context
	QueueName        string
	NoLocal          bool
	AutoAck          bool
	Exclusive        bool
	NoWait           bool
	Arguments        map[string]interface{}
	CloseHandlers    []CloseHandler
	DeliveryHandlers []DeliveryHandler
	// contains filtered or unexported fields
}

func (*Consumer) AddCloseHandler

func (e *Consumer) AddCloseHandler(h CloseHandler)

func (*Consumer) AddDeliveryHandler

func (e *Consumer) AddDeliveryHandler(h DeliveryHandler)

func (*Consumer) Identify

func (e *Consumer) Identify() string
func (e *Consumer) Link(m *AMQPManager) error

type DeclareFunc

type DeclareFunc func(channel *amqpMeta.Channel) error

func WithExchange

func WithExchange(e Exchange) DeclareFunc

func WithQueue

func WithQueue(e Queue) DeclareFunc

func WithRouter

func WithRouter(e Router) DeclareFunc

type DeliveryHandler

type DeliveryHandler func(e amqpMeta.Delivery)

type Element

type Element struct {
	Name       string
	Durable    bool
	AutoDelete bool
	NoWait     bool
	Arguments  map[string]interface{}
}

type Encoder

type Encoder interface {
	GetContentType() string
	GetEncoding() string
	Encode(v interface{}) ([]byte, error)
	Decode(data []byte, v interface{}) error
}

type Exchange

type Exchange struct {
	Element
	Kind     ExchangeType
	Internal bool
}

func (*Exchange) Declare

func (e *Exchange) Declare(channel *amqpMeta.Channel) error

type ExchangeType

type ExchangeType int32
const (
	Nil ExchangeType = iota
	Direct
	Topic
	FanOut
	Headers
)

func (ExchangeType) String

func (i ExchangeType) String() string

type ICaller

type ICaller interface {
	Identify() string
	Link(m *AMQPManager) error
}

type IConsumer

type IConsumer interface {
	ICaller
	AddCloseHandler(h CloseHandler)
	AddDeliveryHandler(h DeliveryHandler)
}

func NewConsumer

func NewConsumer(ctx context.Context, name, queue string) IConsumer

type ILogger

type ILogger interface {
	DebugCtxf(ctx context.Context, format string, args ...interface{})
	InfoCtxf(ctx context.Context, format string, args ...interface{})
	WarnCtxf(ctx context.Context, format string, args ...interface{})
	ErrorCtxf(ctx context.Context, format string, args ...interface{})
}

func GetLogger

func GetLogger() ILogger

type IMessage

type IMessage interface {
	GetMessageID() string
	GetTimeout() time.Duration
}

type IProducer

type IProducer interface {
	ICaller
	Publish(ctx context.Context, router string, encoder Encoder, messages ...IMessage) ([]error, error)
}

func NewProducer

func NewProducer(name, exchange string) IProducer

type JSONEncoder

type JSONEncoder struct{}

func (JSONEncoder) Decode

func (e JSONEncoder) Decode(data []byte, v interface{}) error

func (JSONEncoder) Encode

func (e JSONEncoder) Encode(v interface{}) ([]byte, error)

func (JSONEncoder) GetContentType

func (e JSONEncoder) GetContentType() string

func (JSONEncoder) GetEncoding

func (e JSONEncoder) GetEncoding() string

type Producer

type Producer struct {
	Name     string
	Exchange string
	// contains filtered or unexported fields
}

func (*Producer) Identify

func (p *Producer) Identify() string
func (p *Producer) Link(m *AMQPManager) error

func (*Producer) Publish

func (p *Producer) Publish(ctx context.Context, router string, encoder Encoder, messages ...IMessage) ([]error, error)

Publish send message

type Queue

type Queue struct {
	Element
	Exclusive bool
}

func (*Queue) Declare

func (e *Queue) Declare(channel *amqpMeta.Channel) (amqpMeta.Queue, error)

type ReturnHandler

type ReturnHandler func(r amqpMeta.Return)

type Router

type Router struct {
	Ex        *Exchange
	Q         *Queue
	Name      string
	NoWait    bool
	Arguments map[string]interface{}
}

func (*Router) Bind

func (e *Router) Bind(channel *amqpMeta.Channel) error

type TCPSection

type TCPSection struct {
	// contains filtered or unexported fields
}

func (*TCPSection) GetHost

func (m *TCPSection) GetHost() string

func (*TCPSection) GetPath

func (m *TCPSection) GetPath() string

func (*TCPSection) GetPort

func (m *TCPSection) GetPort() int32

func (*TCPSection) GetPwd

func (m *TCPSection) GetPwd() string

func (*TCPSection) GetUser

func (m *TCPSection) GetUser() string

func (*TCPSection) SetHost

func (m *TCPSection) SetHost(v string)

func (*TCPSection) SetPath

func (m *TCPSection) SetPath(v string)

func (*TCPSection) SetPort

func (m *TCPSection) SetPort(v int32)

func (*TCPSection) SetPwd

func (m *TCPSection) SetPwd(v string)

func (*TCPSection) SetUser

func (m *TCPSection) SetUser(v string)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL