rabbitmq

package module
v1.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 24, 2022 License: Apache-2.0 Imports: 14 Imported by: 0

README

rabbitmq.v3

rabbitmq client for rabbitmq

Documentation

Index

Constants

View Source
const (
	KindDirect  kind = amqp.ExchangeDirect
	KindFanout  kind = amqp.ExchangeFanout
	KindHeaders kind = amqp.ExchangeHeaders
	KindTopic   kind = amqp.ExchangeTopic
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func New

func New(addr string, options ...Option) (*Client, error)

func (*Client) Close

func (c *Client) Close()

func (*Client) ParseDefiniteMessage

func (c *Client) ParseDefiniteMessage(msg amqp.Delivery) (dm DefiniteMessage, ok bool)

func (*Client) Publish

func (c *Client) Publish(exchange, routeKey string, opts ...PublishOption) (err error)

Publish send message

func (*Client) PublishDefinite

func (c *Client) PublishDefinite(exchange, routeKey string, opts ...PublishOption) (err error)

PublishDefinite send definite message

func (*Client) SetConsumer

func (c *Client) SetConsumer(consumer func(c *Client, msg amqp.Delivery))

type Config

type Config struct {
	Addr     string
	Username string
	Password string
	//deprecated
	PrefetchCount int
	Qos           Qos
	Exchanges     []*exchange
	Queue         *queue
	QueueDisable  bool
	Confirm       confirm
	Consumer      func(c *Client, msg amqp.Delivery)
	ConsumerTag   string
	// if ConsumeInOrder set to true,consumer will process message in order,not use goroutine
	ConsumeInOrder bool
	Recovery       recovery

	Amqp amqp.Config
	// contains filtered or unexported fields
}

func NewConfig

func NewConfig() *Config

type DefiniteMessage

type DefiniteMessage struct {
	Id       string `json:"id"`
	From     string `json:"from"`
	Exchange string `json:"exchange"`
}

type ExchangeOption

type ExchangeOption func(ex *exchange)

func ExchangeArgs

func ExchangeArgs(args amqp.Table) ExchangeOption

func ExchangeAutoDelete

func ExchangeAutoDelete() ExchangeOption

func ExchangeDurable

func ExchangeDurable() ExchangeOption

func ExchangeInternal

func ExchangeInternal() ExchangeOption

func ExchangeNoWait

func ExchangeNoWait() ExchangeOption

type Option

type Option func(cfg *Config)

func AmqpConfig

func AmqpConfig(config amqp.Config) Option

func Auth

func Auth(username, password string) Option

Auth set rabbitmq login username and password

func Confirm

func Confirm(chSize int, timeout time.Duration, noWait bool) Option

Confirm Set channel confirm

func ConsumeInOrder

func ConsumeInOrder() Option

ConsumeInOrder consume message in order

func Consumer

func Consumer(consumer func(c *Client, msg amqp.Delivery)) Option

Consumer set queue consumer

func ConsumerTag

func ConsumerTag(consumerTag string) Option

ConsumerTag set queue consumer tag

func Debug

func Debug() Option

Debug display debug log

func DefaultConsumer

func DefaultConsumer() Option

DefaultConsumer add default consumer to queue

func Exchange

func Exchange(name string, kind kind, opts ...ExchangeOption) Option

Exchange add exchange,support multiple

func Heartbeat

func Heartbeat(ht time.Duration) Option

func PrefetchCount

func PrefetchCount(count int) Option

PrefetchCount deprecated

func QosGlobal added in v1.0.4

func QosGlobal(global bool) Option

func QosPrefetchCount added in v1.0.4

func QosPrefetchCount(count int) Option

func QosPrefetchSize added in v1.0.4

func QosPrefetchSize(size int) Option

func Queue

func Queue(name, routeKey string, opts ...QueueOption) Option

Queue set queue properties

func QueueDisable

func QueueDisable() Option

QueueDisable do not create queue

func Recovery

func Recovery(retryInterval time.Duration) Option

func Tls

func Tls(tls *tls.Config) Option

func TlsCert

func TlsCert(caFile, certFile, keyFile, keyFilePassword string) Option

func VHost

func VHost(vh string) Option

type PublishOption

type PublishOption func(pub *amqp.Publishing)

func PublishAppId

func PublishAppId(appId string) PublishOption

PublishAppId creating application id

func PublishBody

func PublishBody(body []byte) PublishOption

PublishBody The application specific payload of the message

func PublishContentEncoding

func PublishContentEncoding(contentEncoding string) PublishOption

PublishContentEncoding MIME content encoding

func PublishContentType

func PublishContentType(contentType string) PublishOption

PublishContentType MIME content type

func PublishCorrelationId

func PublishCorrelationId(correlationId string) PublishOption

PublishCorrelationId correlation identifier

func PublishDeliveryMode

func PublishDeliveryMode(deliveryMode uint8) PublishOption

PublishDeliveryMode Transient (0 or 1) or Persistent (2)

func PublishExpiration

func PublishExpiration(expire string) PublishOption

PublishExpiration Expiration // message expiration spec

func PublishHeaders

func PublishHeaders(keyPairs ...interface{}) PublishOption

PublishHeaders Application or exchange specific fields, the headers exchange will inspect this field.

func PublishMessageId

func PublishMessageId(messageId string) PublishOption

PublishMessageId message identifier

func PublishPriority

func PublishPriority(priority uint8) PublishOption

PublishPriority 0 to 9

func PublishReplyTo

func PublishReplyTo(replyTo string) PublishOption

PublishReplyTo address to to reply to (ex: RPC)

func PublishTimestamp

func PublishTimestamp(timestamp time.Time) PublishOption

PublishTimestamp message timestamp

func PublishType

func PublishType(typ string) PublishOption

PublishType message type name

func PublishUserId

func PublishUserId(userId string) PublishOption

PublishUserId creating user id - ex: "guest"

type Qos added in v1.0.4

type Qos struct {
	PrefetchCount int
	PrefetchSize  int
	Global        bool
}

type QueueOption

type QueueOption func(queue *queue)

func QueueArgs

func QueueArgs(args amqp.Table) QueueOption

func QueueAutoDelete

func QueueAutoDelete() QueueOption

func QueueConsumeAutoAck

func QueueConsumeAutoAck() QueueOption

func QueueDurable

func QueueDurable() QueueOption

func QueueExclusive

func QueueExclusive() QueueOption

func QueueNoWait

func QueueNoWait() QueueOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL