delayedq

v1.0.0
Published: Nov 5, 2021 License: MIT Imports: 4 Imported by: 0

README

delayed-queue-go-sdk

A delayed queue based on RabbitMQ, using the rabbitmq_delayed_message_exchange plugin.

The SDK also handles disconnections from RabbitMQ: after a connection is lost, it tries every 5 seconds to establish a new one.
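The retry behaviour described above can be pictured as a simple loop. The following is a minimal sketch, not the SDK's actual implementation: `dialFunc`, `reconnect`, `errRefused`, and the `maxAttempts` limit are hypothetical names introduced for illustration, and a fake dialer stands in for the real AMQP connection.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errRefused is a hypothetical error used by the fake dialer below.
var errRefused = errors.New("connection refused")

// dialFunc is a hypothetical stand-in for whatever opens the AMQP connection.
type dialFunc func() error

// reconnect calls dial until it succeeds, sleeping for interval between
// failed attempts. It gives up after maxAttempts and returns the last error.
func reconnect(dial dialFunc, interval time.Duration, maxAttempts int) (int, error) {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = dial(); err == nil {
			return attempt, nil
		}
		if attempt < maxAttempts {
			time.Sleep(interval)
		}
	}
	return maxAttempts, fmt.Errorf("reconnect failed after %d attempts: %w", maxAttempts, err)
}

func main() {
	calls := 0
	// Fake dialer that fails twice, then succeeds.
	dial := func() error {
		calls++
		if calls < 3 {
			return errRefused
		}
		return nil
	}
	// The README states a 5 second interval; a short one keeps the demo fast.
	attempts, err := reconnect(dial, 10*time.Millisecond, 5)
	fmt.Println(attempts, err)
}
```

Whether the SDK caps the number of attempts is not stated in the README; the limit here only keeps the sketch from looping forever.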

Installation

Start RabbitMQ locally:

docker compose up -d

Usage

Publish Message

Publish a message by calling Publish() with the message body and the delay before delivery:

	publisher, err := queue.NewPublisher(
		"amqp://admin:password@localhost:5672/",
		queue.WithPublisherOptionsExchange("test"),
		queue.WithPublisherOptionsQueue("test"),
		queue.WithPublisherOptionsKey("test"),
	)
	if err != nil {
		fmt.Printf("%+v", err)
		return // publisher is not usable after a failed connect
	}

	defer publisher.Disconnect()

	// Deliver "Hello" to the queue after a 3 second delay.
	err = publisher.Publish([]byte("Hello"), 3*time.Second)
	if err != nil {
		fmt.Printf("%+v", err)
	}
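For context, the rabbitmq_delayed_message_exchange plugin reads the delay from an `x-delay` message header, expressed in milliseconds. The sketch below shows how a `time.Duration` could be turned into that header. It is an illustration under the assumption that the SDK does something similar internally; `delayHeaders` is a hypothetical helper, not part of this package.

```go
package main

import (
	"fmt"
	"time"
)

// delayHeaders builds the message headers that the delayed-message plugin
// inspects. The plugin expects "x-delay" as an integer number of milliseconds.
// This helper is hypothetical and only illustrates the conversion.
func delayHeaders(delay time.Duration) map[string]interface{} {
	return map[string]interface{}{
		"x-delay": delay.Milliseconds(),
	}
}

func main() {
	// A 3 second delay, as in the Publish call above.
	fmt.Println(delayHeaders(3 * time.Second)) // map[x-delay:3000]
}
```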

Subscribe Message

	comsumer, err := queue.NewComsumer(
		"amqp://admin:password@localhost:5672/",
		queue.WithComsumerOptionsExchange("test"),
		queue.WithComsumerOptionsQueue("test"),
		queue.WithComsumerOptionsKey("test"),
	)
	if err != nil {
		fmt.Printf("%+v", err)
		return // comsumer is not usable after a failed connect
	}

	defer comsumer.Disconnect()

	// Consume returns a channel of deliveries.
	deliveries := comsumer.Consume()

	for d := range deliveries {
		log.Printf("Received a message: %s", d.Body)
	}

Questions & Suggestions

Please open an issue in the project repository.

License

MIT

Documentation

Constants

const (
	DEFAULT_PREFETCH_COUNT = 1
	DELAYED_EXCHANGE_TYPE  = "x-delayed-message"
	DELAYED_TYPE           = "direct"
)
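The two string constants match how the delayed-message plugin expects an exchange to be declared: the exchange kind is `x-delayed-message`, and the routing behaviour is passed as an `x-delayed-type` argument. The following sketch shows the resulting declaration arguments. It assumes the SDK uses the constants this way; `exchangeArgs` and the lowercase constant names are hypothetical and exist only for this example.

```go
package main

import "fmt"

// Local copies of the package constants, redeclared so the sketch is
// self-contained.
const (
	delayedExchangeType = "x-delayed-message"
	delayedType         = "direct"
)

// exchangeArgs returns the argument table to pass when declaring a delayed
// exchange. The plugin reads "x-delayed-type" to decide how messages are
// routed once their delay has elapsed. This helper is hypothetical.
func exchangeArgs(routingType string) map[string]interface{} {
	return map[string]interface{}{
		"x-delayed-type": routingType,
	}
}

func main() {
	fmt.Printf("kind=%s args=%v\n", delayedExchangeType, exchangeArgs(delayedType))
}
```

With an AMQP client, these values would typically be supplied as the kind and args parameters of the exchange declaration call.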

Variables

This section is empty.

Functions

This section is empty.

Types

type Comsumer

type Comsumer struct {
	DelayedQueue
}

func NewComsumer

func NewComsumer(url string, options ...ComsumerOptions) (*Comsumer, error)

NewComsumer returns a new consumer with an open channel.

func (*Comsumer) Consume

func (c *Comsumer) Consume() <-chan amqp.Delivery

Consume returns a channel of deliveries from the queue.

type ComsumerOptions

type ComsumerOptions func(*Comsumer)
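ComsumerOptions follows the functional options pattern: each With... function returns a closure that mutates the consumer, and the constructor applies them in order. The sketch below illustrates the pattern with hypothetical stand-in types (`consumer`, `option`, `withQueue`, `withPrefetchCount`, `newConsumer`), since the real struct fields are unexported and not documented here.

```go
package main

import "fmt"

// consumer is a hypothetical stand-in for the SDK's Comsumer type.
type consumer struct {
	queue         string
	prefetchCount int
}

// option mirrors the shape of ComsumerOptions: a function that mutates
// the consumer being built.
type option func(*consumer)

// withQueue returns an option that sets the queue name.
func withQueue(queue string) option {
	return func(c *consumer) { c.queue = queue }
}

// withPrefetchCount returns an option that sets the prefetch count.
func withPrefetchCount(n int) option {
	return func(c *consumer) { c.prefetchCount = n }
}

// newConsumer starts from defaults and applies each option in order, so
// later options override earlier ones.
func newConsumer(opts ...option) *consumer {
	c := &consumer{prefetchCount: 1} // assumed default, mirroring DEFAULT_PREFETCH_COUNT
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newConsumer(withQueue("test"))
	fmt.Println(c.queue, c.prefetchCount) // test 1
}
```

The pattern lets callers set only the fields they care about while the constructor keeps sensible defaults for the rest.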

func WithComsumerOptionsExchange

func WithComsumerOptionsExchange(exchange string) ComsumerOptions

WithComsumerOptionsExchange returns a function that sets the exchange.

func WithComsumerOptionsKey

func WithComsumerOptionsKey(key string) ComsumerOptions

WithComsumerOptionsKey returns a function that sets the binding key.

func WithComsumerOptionsPrefetchCount

func WithComsumerOptionsPrefetchCount(prefetchCount int) ComsumerOptions

WithComsumerOptionsPrefetchCount returns a function that sets the prefetch count.

func WithComsumerOptionsQueue

func WithComsumerOptionsQueue(queue string) ComsumerOptions

WithComsumerOptionsQueue returns a function that sets the queue.

type DelayedQueue

type DelayedQueue struct {
	// contains filtered or unexported fields
}

func (*DelayedQueue) Disconnect

func (dq *DelayedQueue) Disconnect()

Disconnect closes the channel and the connection.

type Publisher

type Publisher struct {
	DelayedQueue
	// contains filtered or unexported fields
}

func NewPublisher

func NewPublisher(url string, options ...PublisherOptions) (*Publisher, error)

NewPublisher returns a new publisher with an open channel.

func (*Publisher) Publish

func (p *Publisher) Publish(body []byte, delay time.Duration) (err error)

Publish sends the provided data over the connection, to be delivered after the given delay.

type PublisherOptions

type PublisherOptions func(*Publisher)

func WithPublisherOptionsExchange

func WithPublisherOptionsExchange(exchange string) PublisherOptions

WithPublisherOptionsExchange returns a function that sets the exchange.

func WithPublisherOptionsKey

func WithPublisherOptionsKey(key string) PublisherOptions

WithPublisherOptionsKey returns a function that sets the binding key.

func WithPublisherOptionsPriority

func WithPublisherOptionsPriority(priority int) PublisherOptions

WithPublisherOptionsPriority returns a function that sets the priority.

func WithPublisherOptionsQueue

func WithPublisherOptionsQueue(queue string) PublisherOptions

WithPublisherOptionsQueue returns a function that sets the queue.

Directories

Path Synopsis
example
