inmq

package module
v0.0.0-...-c790eee Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 16, 2022 License: GPL-3.0 Imports: 9 Imported by: 0

README

A Simple But Practical RabbitMq Client

A simple AMQP 0.9.1 client in Golang

Demo

import mq "github.com/qingxudarcy/in-mq"
Connect Rabbitmq
dsn := mq.NewDsn(host, port, vhost, user, password)
conn := mq.NewConnetion(dsn, mq.NewExchange(exchangeName, exchangeType))
Open Up Consumers

func consumerHandler(s []byte) {
	fmt.Println(string(s))
}

queue := mq.NewQueue(exchangeName, queueName, []string{routingKey})
consumer := mq.NewConsumer(conn, queue, workers)
consumer.Start(consumerHandler)
Push Message
producer := mq.NewProducer(routingKey, []byte("in-mq"), conn)
producer.Push()   // Producers and consumers are advised not to share the same connection object.

Installation

go install github.com/qingxudarcy/in-mq

Features

V2 already supports:

  • queue support the configuration of multiple routingkey
  • prefetch count

v3……

Support

For support, email darcychang@88.com or join our Slack channel.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewDsn

func NewDsn(host, port, vhost, user, pass string) string

func NewExchange

func NewExchange(name, etype string, opts ...*ecOption) *exchange

func NewQueue

func NewQueue(exchange, name string, routingKey []string, opts ...queOption) *queue

func WithCsAutoAck

func WithCsAutoAck(autoAck bool) *csOption

func WithCsExclusive

func WithCsExclusive(exclusive bool) *csOption

func WithCsNoLocal

func WithCsNoLocal(noLocal bool) *csOption

func WithCsNoWait

func WithCsNoWait(noWait bool) *csOption

func WithCsQueueType

func WithCsQueueType(queueType string) *csOption

func WithEcAutoDelete

func WithEcAutoDelete(autoDelete bool) *ecOption

func WithEcDurable

func WithEcDurable(durable bool) *ecOption

func WithEcInternal

func WithEcInternal(internal bool) *ecOption

func WithEcNoWait

func WithEcNoWait(noWait bool) *ecOption

func WithQueAutoDeleteOption

func WithQueAutoDeleteOption(autoDelete bool) *queOption

func WithQueDurable

func WithQueDurable(durable bool) *queOption

func WithQueExclusive

func WithQueExclusive(exclusive bool) *queOption

func WithQueNowait

func WithQueNowait(nowait bool) *queOption

func WithRcHeartbeat

func WithRcHeartbeat(heartbeat time.Duration) *connOption

func WithRcLogger

func WithRcLogger(logger Logx) *connOption

func WithRcQos

func WithRcQos(prefetchCount int) *connOption

func WithRcTimeOut

func WithRcTimeOut(timeout time.Duration) *connOption

Types

type ConnConfig

type ConnConfig struct {
	PrefetchCount int
	Heartbeat     time.Duration // default: 20s
	TimeOut       time.Duration // default: 30s
	Logger        Logx
}

func ConnCfgParse

func ConnCfgParse(rcOpts ...*connOption) *ConnConfig

type Connection

type Connection struct {
	DSN string
	// contains filtered or unexported fields
}

func NewConnetion

func NewConnetion(dsn string, exchange *exchange, connOpts ...*connOption) *Connection

func (*Connection) Close

func (connection *Connection) Close() error

Close will cleanly shutdown the channel and connection.

type Consumer

type Consumer struct {
	Workers int
	Queue   *queue
	Option  *consumerOption
	Conn    *Connection
}

func NewConsumer

func NewConsumer(conn *Connection, que *queue, workers int, opts ...*csOption) *Consumer

func (*Consumer) Start

func (consumer *Consumer) Start(handler ConsumerHandler)

StartConsumer will open specified consumers When queue is empty, the queue uses default configuration

type ConsumerHandler

type ConsumerHandler func(d []byte)

type Logx

type Logx interface {
	Printf(format string, v ...interface{})
	Println(v ...interface{})
	Panicf(format string, v ...interface{})
	Panic(v ...interface{})
}

type Producer

type Producer struct {
	RoutingKey string
	Body       []byte
	Conn       *Connection
}

func NewProducer

func NewProducer(routingKey string, body []byte, conn *Connection) *Producer

func (*Producer) Push

func (producer *Producer) Push() error

Push will push data onto the queue, and wait for a confirm. If no confirms are received until within the resendTimeout, it continuously re-sends messages until a confirm is received. This will block until the server sends a confirm. Errors are only returned if the push action itself fails, see UnsafePush.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL