amqp

package module
v0.0.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 28, 2023 License: Apache-2.0 Imports: 15 Imported by: 0

README

fog-amqp

standard-readme compliant

中文 | English

FogCloud AMQP SDK。基于开源库 amqp091-go 开发，增加了重连逻辑和更易用的 API。

安装

go get github.com/fogcloud-io/fog-amqp

快速上手

消费者
// Example settings for the consumer. They are passed to
// amqp.NewFogAMQPClient in main, and FogAccessKey is additionally used as the
// queue name given to ConsumeWithHandler. Replace the sample values with your
// own broker address and credentials.
var (
	AMQPHost        = "localhost"
	AMQPPort        = "5672"
	AMQPTLS         = false
	FogAccessKey    = "xgHc40bf04fb020c"
	FogAccessSecret = "c3bad348bb34390558f7f1aacce17877"
	clientID        = "fog-consumer"
)

// main creates a client from the example settings and consumes from the queue
// named after the access key, logging the body of every delivery it receives.
func main() {
	cli, err := amqp.NewFogAMQPClient(AMQPHost, AMQPPort, FogAccessKey, FogAccessSecret, clientID, AMQPTLS)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when main returns so the connection is not leaked.
	defer func() {
		if cerr := cli.Close(); cerr != nil {
			log.Print(cerr)
		}
	}()

	err = cli.ConsumeWithHandler(
		// Prefetch count.
		100,
		// Queue name: this example consumes from the queue named after the access key.
		FogAccessKey,
		// Handler invoked with each delivery.
		func(d amqp.Delivery) { log.Printf("amqp receive: %s", d.Body) },
		amqp.WithConsumerOptionsConsumerTag("fog-consumer-1"),
		// With autoAck enabled the handler must not call Delivery.Ack.
		amqp.WithConsumerOptionsAutoAck(true),
		amqp.WithConsumerOptionsNoWait(true),
	)
	if err != nil {
		log.Print(err)
	}
}
生产者
// Example settings for the producer. They are passed to
// amqp.NewFogAMQPClient in main. Replace the sample values with your own
// broker address and credentials.
var (
	AMQPHost        = "localhost"
	AMQPPort        = "5672"
	AMQPTLS         = false
	FogAccessKey    = "xgHc40bf04fb020c"
	FogAccessSecret = "c3bad348bb34390558f7f1aacce17877"
	clientID        = "fog-producer"
)

// main creates a client from the example settings and publishes a single
// plain-text message to the "test" exchange with routing key "test".
func main() {
	cli, err := amqp.NewFogAMQPClient(AMQPHost, AMQPPort, FogAccessKey, FogAccessSecret, clientID, AMQPTLS)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when main returns so the connection is not leaked.
	defer func() {
		if cerr := cli.Close(); cerr != nil {
			log.Print(cerr)
		}
	}()

	err = cli.PublishUnsafeWithContext(
		context.Background(),
		// Exchange name.
		"test",
		// Routing key.
		"test",
		[]byte("hello,world"),
		// "text/plain" is the registered media type for plain text.
		amqp.WithPublishOptionsContentType("text/plain"),
		// 2 is the persistent delivery mode in AMQP 0-9-1.
		amqp.WithPublishOptionsDeliveryMode(2),
	)
	if err != nil {
		log.Print(err)
	}
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exported by the package. Each is a distinct value created
// with errors.New, so a returned error can be compared against them (for
// example with errors.Is).
// NOTE(review): the conditions under which each error is returned are not
// visible in this listing; the names and messages suggest shutdown, channel
// initialisation timeout, connection timeout, a nil connection and a
// reconnect in progress — confirm against the implementation.
var (
	ErrAmqpShutdown           = errors.New("amqp shutdown")
	ErrAmqpChannelInitTimeout = errors.New("amqp channel init timeout")
	ErrAmqpConnTimeout        = errors.New("amqp connect timeout")
	ErrAmqpConnNil            = errors.New("amqp conn nil")
	ErrAmqpReconn             = errors.New("amqp reconnecting")
)

Functions

This section is empty.

Types

type AMQPMsgHandler

type AMQPMsgHandler = func(amqp.Delivery)

type ClientOption

type ClientOption func(*RabbitmqClient)

func WithClientOptionsChInitTimeout

func WithClientOptionsChInitTimeout(t time.Duration) ClientOption

func WithClientOptionsConnTimeout

func WithClientOptionsConnTimeout(t time.Duration) ClientOption

func WithClientOptionsLogger

func WithClientOptionsLogger(logger Logger) ClientOption

type ConsumerOption

type ConsumerOption func(*consumerParams)

func WithConsumerOptionsArgs

func WithConsumerOptionsArgs(args Table) ConsumerOption

func WithConsumerOptionsAutoAck

func WithConsumerOptionsAutoAck(autoAck bool) ConsumerOption

When autoAck (also known as noAck) is true, the server will acknowledge deliveries to this consumer prior to writing the delivery to the network. When autoAck is true, the consumer should not call Delivery.Ack. Automatically acknowledging deliveries means that some deliveries may get lost if the consumer is unable to process them after the server delivers them. See http://www.rabbitmq.com/confirms.html for more details.

func WithConsumerOptionsConsumerTag

func WithConsumerOptionsConsumerTag(consumerTag string) ConsumerOption

func WithConsumerOptionsExclusive

func WithConsumerOptionsExclusive(exclusive bool) ConsumerOption

When exclusive is true, the server will ensure that this is the sole consumer from this queue. When exclusive is false, the server will fairly distribute deliveries across multiple consumers.

func WithConsumerOptionsNoWait

func WithConsumerOptionsNoWait(noWait bool) ConsumerOption

When noWait is true, do not wait for the server to confirm the request and immediately begin deliveries. If it is not possible to consume, a channel exception will be raised and the channel will be closed.

type Delivery

type Delivery = amqp.Delivery

type ExchangeDeclareOption

type ExchangeDeclareOption func(*exchangeParams)

func WithExchangeDeclareOptionsArgs

func WithExchangeDeclareOptionsArgs(args Table) ExchangeDeclareOption

func WithExchangeDeclareOptionsAutoDelete

func WithExchangeDeclareOptionsAutoDelete(autoDelete bool) ExchangeDeclareOption

func WithExchangeDeclareOptionsDurable

func WithExchangeDeclareOptionsDurable(durable bool) ExchangeDeclareOption

func WithExchangeDeclareOptionsInternal

func WithExchangeDeclareOptionsInternal(internal bool) ExchangeDeclareOption

func WithExchangeDeclareOptionsNoWait

func WithExchangeDeclareOptionsNoWait(noWait bool) ExchangeDeclareOption

type ExchangeDeleteOption

type ExchangeDeleteOption func(*exchangeDeleteParams)

func WithExchangeDeleteOptionsIfUnused

func WithExchangeDeleteOptionsIfUnused(ifUnused bool) ExchangeDeleteOption

func WithExchangeDeleteOptionsNoWait

func WithExchangeDeleteOptionsNoWait(noWait bool) ExchangeDeleteOption

type Logger

// Logger is the logging interface accepted by WithClientOptionsLogger.
// It declares four levels (Debug, Info, Warn, Fatal), each with a variadic
// form and a template-plus-arguments ("f") form.
// NOTE(review): whether Fatal/Fatalf terminate the process depends on the
// implementation supplied by the caller — confirm how the client uses them.
type Logger interface {
	Debug(args ...interface{})
	Debugf(template string, args ...interface{})
	Info(args ...interface{})
	Infof(template string, args ...interface{})
	Warn(args ...interface{})
	Warnf(template string, args ...interface{})
	Fatal(args ...interface{})
	Fatalf(template string, args ...interface{})
}

type PublishOption

type PublishOption func(*publishParams)

func WithPublishOptionsAppId

func WithPublishOptionsAppId(appId string) PublishOption

func WithPublishOptionsContentType

func WithPublishOptionsContentType(contentType string) PublishOption

func WithPublishOptionsDeliveryMode

func WithPublishOptionsDeliveryMode(deliveryMode uint8) PublishOption

func WithPublishOptionsHeader

func WithPublishOptionsHeader(headers Table) PublishOption

func WithPublishOptionsImmediate

func WithPublishOptionsImmediate(immediate bool) PublishOption

func WithPublishOptionsMandatory

func WithPublishOptionsMandatory(mandatory bool) PublishOption

func WithPublishOptionsMsgExpiration

func WithPublishOptionsMsgExpiration(expiration string) PublishOption

func WithPublishOptionsMsgId

func WithPublishOptionsMsgId(msgId string) PublishOption

func WithPublishOptionsMsgType

func WithPublishOptionsMsgType(msgType string) PublishOption

func WithPublishOptionsPriority

func WithPublishOptionsPriority(priority uint8) PublishOption

func WithPublishOptionsUserId

func WithPublishOptionsUserId(userId string) PublishOption

type Publishing

type Publishing = amqp.Publishing

type QueueBindOption

type QueueBindOption func(*queueBindParams)

func WithQueueBindOptionsArgs

func WithQueueBindOptionsArgs(args Table) QueueBindOption

func WithQueueBindOptionsNoWait

func WithQueueBindOptionsNoWait(noWait bool) QueueBindOption

type QueueDeclareOption

type QueueDeclareOption func(*queueParams)

func WithQueueDeclareOptionsArgs

func WithQueueDeclareOptionsArgs(args Table) QueueDeclareOption

func WithQueueDeclareOptionsAutoDelete

func WithQueueDeclareOptionsAutoDelete(autoDelete bool) QueueDeclareOption

func WithQueueDeclareOptionsDurable

func WithQueueDeclareOptionsDurable(durable bool) QueueDeclareOption

func WithQueueDeclareOptionsInternal

func WithQueueDeclareOptionsInternal(exclusive bool) QueueDeclareOption

func WithQueueDeclareOptionsNoWait

func WithQueueDeclareOptionsNoWait(noWait bool) QueueDeclareOption

type QueueDeleteOption

type QueueDeleteOption func(*queueDeleteParams)

func WithQueueDeleteOptionsIfUnused

func WithQueueDeleteOptionsIfUnused(ifUnused bool) QueueDeleteOption

func WithQueueDeleteOptionsNoWait

func WithQueueDeleteOptionsNoWait(noWait bool) QueueDeleteOption

type RabbitmqClient

// RabbitmqClient is the client type returned by NewAMQPClient and
// NewFogAMQPClient. Its methods declare and delete exchanges and queues, bind
// queues, publish messages, consume with a handler, and close the client.
type RabbitmqClient struct {
	// contains filtered or unexported fields
}

func NewAMQPClient

func NewAMQPClient(endpoint string, opts ...ClientOption) (*RabbitmqClient, error)

func NewFogAMQPClient

func NewFogAMQPClient(host, port, accessKey, accessSecret, clientID string, tls bool, opts ...ClientOption) (*RabbitmqClient, error)

func (*RabbitmqClient) BindQueue

func (rc *RabbitmqClient) BindQueue(queueName, key, exchangeName string, opts ...QueueBindOption) error

func (*RabbitmqClient) Close

func (rc *RabbitmqClient) Close() error

func (*RabbitmqClient) ConsumeWithHandler

func (rc *RabbitmqClient) ConsumeWithHandler(prefetchCnt int, queue string, handler AMQPMsgHandler, opts ...ConsumerOption) error

func (*RabbitmqClient) DeclareExchange

func (rc *RabbitmqClient) DeclareExchange(exchangeName, kind string, opts ...ExchangeDeclareOption) error

func (*RabbitmqClient) DeclareQueue

func (rc *RabbitmqClient) DeclareQueue(queueName string, opts ...QueueDeclareOption) error

func (*RabbitmqClient) DeleteExchange

func (rc *RabbitmqClient) DeleteExchange(exchangeName string, opts ...ExchangeDeleteOption) error

func (*RabbitmqClient) DeleteQueue

func (rc *RabbitmqClient) DeleteQueue(queueName string, opts ...QueueDeleteOption) error

func (*RabbitmqClient) PublishUnsafeWithContext

func (rc *RabbitmqClient) PublishUnsafeWithContext(ctx context.Context, exchangeName, key string, body []byte, opts ...PublishOption) error

type Table

type Table = amqp.Table

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL