rabbit

package module
v1.2.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 16, 2021 License: Apache-2.0 Imports: 9 Imported by: 0

README

rabbit test

个人写着玩 (A personal project, written for fun.)

Documentation

Index

Constants

View Source
// Package-level default settings. DefaultConnNum and DefaultChannelNum are
// both 1.
// NOTE(review): where these defaults are applied is not visible here;
// presumably they back-fill unset Options fields — confirm in NewRabbit.
const (
	DefaultConnNum    = 1
	DefaultChannelNum = 1

	// DefaultHost is an AMQP URL for the guest/guest account on localhost,
	// port 5672.
	DefaultHost = "amqp://guest:guest@localhost:5672/"
)
View Source
// RabbitAttempt is the string key "rabbit_attempt".
// NOTE(review): presumably the message-header key under which the delivery
// attempt count is stored — confirm against its usage.
const RabbitAttempt = "rabbit_attempt"

Variables

This section is empty.

Functions

This section is empty.

Types

type Channel

// Channel has no exported fields. NewChannel builds one from an
// *amqp.Channel; its methods are Consume, ExchangeDeclare, Publish,
// QueueBind and QueueDeclare.
type Channel struct {
	// contains filtered or unexported fields
}

func NewChannel

func NewChannel(ch *amqp.Channel) *Channel

func (*Channel) Consume

func (c *Channel) Consume(name string) (<-chan amqp.Delivery, error)

func (*Channel) ExchangeDeclare

func (c *Channel) ExchangeDeclare(ex *Exchange) error

func (*Channel) Publish

func (c *Channel) Publish(data *PublishData) error

func (*Channel) QueueBind

func (c *Channel) QueueBind(queueName, key, exchangeName string, args amqp.Table) error

func (*Channel) QueueDeclare

func (c *Channel) QueueDeclare(queue *Queue) (amqp.Queue, error)

type ChannelPool

// ChannelPool has no exported fields. NewChannelPool builds one from a
// *list.List. Get returns a *Channel or an error, and Release accepts a
// *Channel (presumably handing it back to the pool — confirm).
type ChannelPool struct {
	// contains filtered or unexported fields
}

func NewChannelPool

func NewChannelPool(pool *list.List) *ChannelPool

func (*ChannelPool) Get

func (p *ChannelPool) Get() (*Channel, error)

func (*ChannelPool) Release

func (p *ChannelPool) Release(channel *Channel)

type ChannelService

// ChannelService is the publishing capability: a single Publish method that
// takes a *PublishData and returns an error. *Channel declares a method with
// this signature.
type ChannelService interface {
	Publish(data *PublishData) error
}

type ConnPoolOptions

// ConnPoolOptions is the configuration passed to NewConnectionPool.
// NOTE(review): field meanings are inferred from their names (connection
// count, AMQP URL, channel count) — confirm against NewConnectionPool.
type ConnPoolOptions struct {
	ConnNum    int
	Url        string
	ChannelNum int
}

type Connection

// Connection has no exported fields. NewConnection builds one from a
// *ConnectionConfig, and GetChannelPool returns a *ChannelPool.
type Connection struct {
	// contains filtered or unexported fields
}

func NewConnection

func NewConnection(config *ConnectionConfig) *Connection

func (*Connection) GetChannelPool

func (c *Connection) GetChannelPool() *ChannelPool

type ConnectionConfig

// ConnectionConfig is the configuration passed to NewConnection.
// NOTE(review): ChanNum is presumably the number of channels and Url the
// AMQP URL — confirm against NewConnection.
type ConnectionConfig struct {
	ChanNum int
	Url     string
}

type ConnectionPool

// ConnectionPool has no exported fields. NewConnectionPool builds one from a
// *ConnPoolOptions. Get returns a *Connection or an error, and Release
// accepts a *Connection (presumably handing it back to the pool — confirm).
type ConnectionPool struct {
	// contains filtered or unexported fields
}

func NewConnectionPool

func NewConnectionPool(opts *ConnPoolOptions) *ConnectionPool

func (*ConnectionPool) Get

func (c *ConnectionPool) Get() (*Connection, error)

func (*ConnectionPool) Release

func (c *ConnectionPool) Release(conn *Connection)

type Consumer

// Consumer is satisfied by any type with a Consume(name string) error
// method; *ConsumerService declares one.
// NOTE(review): name is presumably a queue name — confirm.
type Consumer interface {
	Consume(name string) error
}

type ConsumerService

// ConsumerService holds a RabbitService in its exported Rabbit field, plus
// unexported state. NewConsumerService builds one from a RabbitService; its
// methods are Consume and Listen.
type ConsumerService struct {
	Rabbit RabbitService
	// contains filtered or unexported fields
}

func NewConsumerService

func NewConsumerService(rabbit RabbitService) *ConsumerService

func (*ConsumerService) Consume

func (c *ConsumerService) Consume(name string) error

func (*ConsumerService) Listen

func (c *ConsumerService) Listen(name string)

type Exchange

// Exchange groups the settings for an exchange. It is the argument type of
// Channel.ExchangeDeclare, Rabbit.ExchangeDeclare and Producer.Publish.
// NOTE(review): the fields appear to mirror the parameters of the amqp
// library's ExchangeDeclare (name, kind, durable, autoDelete, internal,
// noWait, args) — confirm.
type Exchange struct {
	Name       string
	Kind       string
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Args       amqp.Table
}

The fields mirror the parameter list of the amqp library's ExchangeDeclare: name, kind string, durable, autoDelete, internal, noWait bool, args Table.

type Job

// Job is an interface that embeds RabbitService and adds no methods of its
// own. NewJobService returns a Job.
type Job interface {
	RabbitService
}

func NewJobService

func NewJobService(m *Message, rabbitService RabbitService) Job

type JobService

// JobService embeds a RabbitService and carries additional unexported state.
// Its own methods are Ack, Attempt and Retry.
type JobService struct {
	RabbitService
	// contains filtered or unexported fields
}

func (*JobService) Ack

func (j *JobService) Ack() error

func (*JobService) Attempt

func (j *JobService) Attempt() int

func (*JobService) Retry

func (j *JobService) Retry(ctx context.Context) error

type Message

// Message is the message value passed to Producer.Publish and NewJobService.
// D holds an amqp.Delivery.
// NOTE(review): the remaining field meanings are inferred from their names
// (identifier, payload bytes, content type, routing key, headers, attempt
// count) — confirm against the code that fills them in.
type Message struct {
	ID          string
	Body        []byte
	ContentType string
	RoutingKey  string
	Headers     amqp.Table
	Attempt     int
	D           amqp.Delivery
}

type Options

// Options is the configuration passed to NewRabbit.
// NOTE(review): field meanings are inferred from their names (maximum
// connections, AMQP host URL, maximum channels) — confirm against NewRabbit.
type Options struct {
	MaxConnection int
	Host          string
	MaxChannels   int
}

type Producer

// Producer is the publishing interface: Publish takes a context, an
// *Exchange and a *Message and returns an error. NewProducerService returns
// a Producer.
type Producer interface {
	Publish(ctx context.Context, exchange *Exchange, message *Message) error
}

func NewProducerService

func NewProducerService(rabbit RabbitService) Producer

type ProducerService

// ProducerService holds a RabbitService in its exported Rabbit field.
// *ProducerService declares a Publish method with the signature required by
// Producer.
type ProducerService struct {
	Rabbit RabbitService
}

func (*ProducerService) Publish

func (p *ProducerService) Publish(ctx context.Context, exchange *Exchange, message *Message) error

type PublishData

// PublishData is the argument of Channel.Publish and ChannelService.Publish.
// Msg points at the amqp.Publishing to send.
type PublishData struct {
	Exchange  string
	Key       string
	Mandatory bool // when no queue matches: true returns the message to the producer, false drops it
	Immediate bool // when no consumer is available: true returns the message to the producer, false puts it on the queue
	Msg       *amqp.Publishing
}

type Queue

// Queue groups the settings for a queue. It is the argument type of
// Channel.QueueDeclare and Rabbit.QueueDeclare.
// NOTE(review): the fields appear to mirror the parameters of the amqp
// library's QueueDeclare (name, durable, autoDelete, exclusive, noWait,
// args) — confirm.
type Queue struct {
	Name       string
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	Args       amqp.Table
}

The fields mirror the parameter list of the amqp library's QueueDeclare: name string, durable, autoDelete, exclusive, noWait bool, args Table.

type Rabbit

// Rabbit has no exported fields. NewRabbit builds one from an *Options. Its
// methods (BindQueue, ExchangeDeclare, GetConnectionPool, QueueDeclare) have
// the signatures listed in RabbitService.
type Rabbit struct {
	// contains filtered or unexported fields
}

func NewRabbit

func NewRabbit(opts *Options) *Rabbit

func (*Rabbit) BindQueue

func (r *Rabbit) BindQueue(ch *Channel, exchangeName, key, queueName string) error

BindQueue binds the named queue to the named exchange using key.

func (*Rabbit) ExchangeDeclare

func (r *Rabbit) ExchangeDeclare(ch *Channel, exchange *Exchange) error

ExchangeDeclare declares the exchange if it does not already exist.

func (*Rabbit) GetConnectionPool

func (r *Rabbit) GetConnectionPool() *ConnectionPool

func (*Rabbit) QueueDeclare

func (r *Rabbit) QueueDeclare(ch *Channel, queue *Queue) error

QueueDeclare declares the queue if it does not already exist.

type RabbitService

// RabbitService is the set of broker operations used by the producer,
// consumer and job services: declaring an exchange, declaring a queue,
// binding a queue to an exchange, and obtaining the *ConnectionPool.
// *Rabbit declares methods with these signatures.
type RabbitService interface {
	ExchangeDeclare(ch *Channel, exchange *Exchange) error
	QueueDeclare(ch *Channel, queue *Queue) error
	BindQueue(ch *Channel, exchangeName, key, queueName string) error
	GetConnectionPool() *ConnectionPool
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL