amqp

package module
v1.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 22, 2018 License: MIT Imports: 14 Imported by: 0

README

AMQP

Golang AMQP wrapper is a library that wraps amqp.

This library is a rethinking of the observer library.

Features
  • Auto-reconnect to the broker and automatically redeclare exchanges and queues.
  • Control channels lifecycle: open new on high load and close unused.
  • Declarative style.
    client, err := amqp.NewClient(
    	conn.DefaultConnector("amqp://localhost:5672",
    		conn.WithLogger(lg), // Report connection status and errors.
    	),
    	amqp.TemporaryExchange("example-exchange"), // Declare exchanges and queues.
    	amqp.PersistentExchanges(
    		"exchange-one",
    		"exchange-two",
    		"exchange-three",
    	),
    	amqp.PersistentQueues(
    		"queue for one",
    		"queue for two",
    		"second queue for two",
    	),
    	amqp.Exchange{
    		Name: "declare directly",
    	},
    	amqp.Queue{
    		Name: "", // Left empty: the broker generates a name for you.
    	},
    	amqp.Binding{ // Do not forget to bind the queue to the exchange.
    		Exchange: "exchange-one",
    		Queue:    "queue for one",
    	},
    	amqp.WithLogger{Logger: lg}, // Report AMQP protocol errors.
    )
    
  • Encoding and decoding are hidden inside.
    • Use Codec interface for your format.
    • XML, JSON and Protocol Buffers (protobuf) are already registered.
  • Tons of options.
    • Min and max opened channels per publisher/subscriber.
    • Limit receiving messages.
    • Any amount of data formats.
    • Fill all message fields as you wish.
    • And many more...
  • Everything from AMQP may be used directly.

Contributing

Issues and pull requests are welcome.

Example

package main

import (
	"context"
	"fmt"
	"strconv"
	"time"

	"github.com/devimteam/amqp"
	"github.com/devimteam/amqp/conn"
	"github.com/devimteam/amqp/logger"
)

// Comment is the data that we want to deal with: the value this example
// publishes to the exchange and passes to the subscriber as the data type.
type Comment struct {
	Id      string
	Message string
}

// main connects to a broker on localhost, subscribes to "example-exchange",
// publishes ten comments to that exchange and prints every received event.
// It panics on any connection or publishing error.
func main() {
	ch := make(chan []interface{})
	// Listens for log records and writes them to stdout.
	go func() {
		for l := range ch {
			fmt.Println(l...)
		}
	}()
	lg := logger.NewChanLogger(ch) // Logger interface identical to go-kit Logger.
	client, err := amqp.NewClient(
		conn.DefaultConnector("amqp://localhost:5672",
			conn.WithLogger(lg), // We want to know connection status and errors.
		),
		// Declare exchanges and queues.
		amqp.TemporaryExchange("example-exchange"),
		// We want to know AMQP protocol errors.
		amqp.WithLogger{Logger: lg},
	)
	if err != nil {
		panic(err)
	}
	// Stop the client when the example finishes instead of leaving it running.
	defer client.Stop()

	// The context is used here as the closing mechanism for the subscription,
	// so it must be cancellable: context.Background() alone can never be closed.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	subscriber := client.Subscriber()
	eventChan := subscriber.SubscribeToExchange(ctx, "example-exchange", Comment{}, amqp.Consumer{})
	go func() {
		for event := range eventChan {
			fmt.Println(event.Data) // do something with events
		}
	}()

	publisher := client.Publisher()
	for i := 0; i < 10; i++ {
		// Prepare your data before publishing.
		comment := Comment{
			Id:      strconv.Itoa(i),
			Message: "message " + strconv.Itoa(i),
		}
		// Context is used here for passing data to `before` functions.
		if err := publisher.Publish(context.Background(), "example-exchange", comment, amqp.Publish{}); err != nil {
			panic(err)
		}
		time.Sleep(time.Millisecond * 500)
	}
	time.Sleep(time.Second * 5) // wait for delivering all messages.
}

Documentation

Overview

Easy to use library for AMQP.

It's built on top of https://github.com/streadway/amqp

Example (Common)

This example shows a common use case of the library.

// Comment is the value that this example publishes and passes to the
// subscriber as the data type.
type Comment struct {
	Id      string
	Message string
}

ch := make(chan []interface{})
// Listens for log records and writes them to stdout.
go func() {
	for l := range ch {
		fmt.Println(l...)
	}
}()
lg := logger.NewChanLogger(ch)

// Create the client and declare the temporary exchange used below.
client, err := NewClient(conn.DefaultConnector(
	"amqp://localhost:5672",
	conn.WithLogger(lg), // We want to know connection status and errors.
),
	TemporaryExchange("example-exchange"),
)
if err != nil {
	panic(err)
}
subscriber := client.Subscriber()
// Comment{} is passed as the data type for received events.
eventChan := subscriber.SubscribeToExchange(context.Background(), "example-exchange", Comment{}, Consumer{})
go func() {
	for event := range eventChan {
		fmt.Println(event.Data) // do something with events
	}
}()
publisher := client.Publisher()
for i := 0; i < 10; i++ {
	// Prepare your data before publishing.
	comment := Comment{
		Id:      strconv.Itoa(i),
		Message: "message " + strconv.Itoa(i),
	}
	err := publisher.Publish(context.Background(), "example-exchange", comment, Publish{})
	if err != nil {
		panic(err)
	}
	// Pause between publications.
	time.Sleep(time.Millisecond * 500)
}
time.Sleep(time.Second * 5) // wait for all events to be delivered
Output:

Index

Examples

Constants

View Source
// Highest and lowest message priority values.
// NOTE(review): presumably the bounds checked for NotAllowedPriority — confirm.
const (
	MaxMessagePriority = 9
	MinMessagePriority = 0
)

Variables

View Source
var (
	// NotAllowedPriority occurs when a message was delivered, but its priority is too low or too high.
	NotAllowedPriority = errors.New("not allowed priority")
	// DeliveryChannelWasClosedError is an informational error that is logged to the info logger when the delivery channel was closed.
	DeliveryChannelWasClosedError = errors.New("delivery channel was closed")
	// QueueDeclareWarning warns about declaring a durable or non-auto-delete queue with an empty name:
	// such queues survive after all consumers have finished using them, but no one can connect to them again.
	QueueDeclareWarning = errors.New("declaring durable or non-auto-delete queue with empty name")
)
View Source
// CodecNotFound is the error value for a missing codec.
// NOTE(review): presumably returned when no codec is registered for a content type — confirm.
var CodecNotFound = errors.New("codec not found")

Functions

func CommonMessageIdBuilder

func CommonMessageIdBuilder() string

CommonMessageIdBuilder builds new UUID as message Id.

func CommonTyper

func CommonTyper(v interface{}) string

CommonTyper prints go-style type of value.

func WrapError

func WrapError(errs ...interface{}) error

Types

type Binding

type Binding struct {
	Exchange string
	Queue    string
	Key      string
	NoWait   bool
	Args     amqp.Table
}

Binding is used to bind a queue to an exchange.

type Channel

type Channel struct {
	// contains filtered or unexported fields
}

Channel is a wrapper of *amqp.Channel

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(connector conn.Connector, decls ...Declaration) (cl Client, err error)

func (*Client) Publisher

func (c *Client) Publisher(opts ...PublisherOption) *Publisher

func (*Client) Stop

func (c *Client) Stop()

func (*Client) Subscriber

func (c *Client) Subscriber(opts ...SubscriberOption) *Subscriber

type Consumer

type Consumer struct {
	Consumer   string
	AutoAck    bool
	Exclusive  bool
	NoLocal    bool
	NoWait     bool
	Args       amqp.Table
	LimitCount int
	LimitSize  int
}

type ContentTyper

type ContentTyper interface {
	ContentType() string
}

ContentTyper is an interface that is used to choose a codec.

type Declaration

type Declaration interface {
	// contains filtered or unexported methods
}

type DeliveryBefore

type DeliveryBefore func(context.Context, *amqp.Delivery) context.Context

DeliveryBefore is a function that changes a message before it is delivered.

type ErrorBefore

type ErrorBefore func(amqp.Delivery, error) error

ErrorBefore is a function that changes an error caused by incorrect handling. Common use case: debugging.

type Event

type Event struct {
	// Data is a converted, ready-to-use pointer to a value of the reply type.
	Data interface{}
	// Context is the event's context.
	// It contains context.Background by default and is set up with the DeliveryBefore option.
	Context context.Context
	amqp.Delivery
}

Event represents amqp.Delivery with attached context and data

func (Event) Done

func (e Event) Done()

Done is a shortcut for Ack(false)

type Exchange

type Exchange struct {
	Name       string
	Kind       string
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Args       amqp.Table
}

func LongExchange

func LongExchange(name string) Exchange

LongExchange is a common way to declare exchange with given name.

func TemporaryExchange

func TemporaryExchange(name string) Exchange

TemporaryExchange is a common way to create temporary exchange with given name.

type Exchanges added in v1.1.0

type Exchanges []Exchange

func PersistentExchanges added in v1.1.0

func PersistentExchanges(names ...string) (e Exchanges)

PersistentExchanges allows you to declare a bunch of exchanges with the given names.

type ObserverOption

type ObserverOption func(opts *observerOpts)

func Lifetime

func Lifetime(dur time.Duration) ObserverOption

Lifetime sets the duration between the observer's checks for idle channels. Somewhere between dur and 2*dur the observer will close channels that have not been used for at least `dur` time units. The default value is 15 seconds.

func LimitCount

func LimitCount(count int) ObserverOption

LimitCount limits messages for channel, by calling Qos after each reconnection. Pass zero for unlimited messages.

func LimitSize

func LimitSize(size int) ObserverOption

LimitSize limits the size of messages in bytes for a channel by calling Qos after each reconnection. Pass zero for unlimited size.

func Max

func Max(max int) ObserverOption

Max sets maximum amount of channels, that can be opened at the same time.

func Min

func Min(min int) ObserverOption

Min sets minimum amount of channels, that should be opened at the same time. Min does not open new channels, but forces observer not to close existing ones.

type Publish

type Publish struct {
	Key       string
	Mandatory bool
	Immediate bool
	Priority  uint8
}

Publish is used for AMQP Publish parameters.

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

func (Publisher) Publish

func (p Publisher) Publish(ctx context.Context, exchangeName string, obj interface{}, pub Publish) error

func (Publisher) PublishChannel

func (p Publisher) PublishChannel(ctx context.Context, exchangeName string, pub Publish) chan<- interface{}

type PublisherOption

type PublisherOption func(*Publisher)

func PublisherBefore

func PublisherBefore(before ...PublishingBefore) PublisherOption

PublisherBefore adds functions that should be called before publishing a message to the broker.

func PublisherContentType added in v1.1.2

func PublisherContentType(t string) PublisherOption

func PublisherHandlersAmount

func PublisherHandlersAmount(n int) PublisherOption

PublisherHandlersAmount sets the number of handler processes that receive deliveries from one channel. For n > 1 the client does not guarantee the order of events.

func PublisherLogger

func PublisherLogger(lg logger.Logger) PublisherOption

PublisherLogger option sets the logger that logs warning messages.

func PublisherWaitConnection

func PublisherWaitConnection(should bool, timeout time.Duration) PublisherOption

PublisherWaitConnection tells the client to wait for a connection before executing Subscription or Pub.

func PublisherWithObserverOptions

func PublisherWithObserverOptions(opts ...ObserverOption) PublisherOption

type PublishingBefore

type PublishingBefore func(context.Context, *amqp.Publishing)

PublishingBefore is a function that changes a message before it is published.

type Queue

type Queue struct {
	Name       string
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	Args       amqp.Table
}

func LongQueue

func LongQueue(name string) Queue

LongQueue is a common way to declare queue with given name.

type Queues added in v1.1.0

type Queues []Queue

func PersistentQueues added in v1.1.0

func PersistentQueues(names ...string) (e Queues)

PersistentQueues allows you to declare a bunch of queues with the given names.

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

func (Subscriber) Subscribe

func (s Subscriber) Subscribe(ctx context.Context, exchangeName, queueName string, dataType interface{}, cfg Consumer) <-chan Event

func (Subscriber) SubscribeToExchange

func (s Subscriber) SubscribeToExchange(ctx context.Context, exchangeName string, dataType interface{}, cfg Consumer) <-chan Event

func (Subscriber) SubscribeToQueue

func (s Subscriber) SubscribeToQueue(ctx context.Context, queueName string, dataType interface{}, cfg Consumer) <-chan Event

type SubscriberOption

type SubscriberOption func(*Subscriber)

func SubHandlersAmount

func SubHandlersAmount(n int) SubscriberOption

SubHandlersAmount sets the number of handler processes that receive deliveries from one channel. For n > 1 the client does not guarantee the order of events.

func SubProcessAllDeliveries

func SubProcessAllDeliveries(v bool) SubscriberOption

SubProcessAllDeliveries, when set to true, allows you to handle all deliveries from the current channel, even if Done was sent.

func SubSetDefaultContentType

func SubSetDefaultContentType(t string) SubscriberOption

SubSetDefaultContentType sets the content type whose codec should be used if the ContentType field of a message is empty. JSON is used by default.

func SubWithObserverOptions

func SubWithObserverOptions(opts ...ObserverOption) SubscriberOption

func SubscriberAllowedPriority

func SubscriberAllowedPriority(from, to uint8) SubscriberOption

SubscriberAllowedPriority rejects messages whose priority is not in the given range.

func SubscriberBufferSize

func SubscriberBufferSize(a int) SubscriberOption

SubscriberBufferSize sets the buffer size of the event channel for the Subscription method.

func SubscriberDeliverBefore

func SubscriberDeliverBefore(before ...DeliveryBefore) SubscriberOption

SubscriberDeliverBefore adds functions that should be called before sending an Event to the channel.

func SubscriberLogger

func SubscriberLogger(lg logger.Logger) SubscriberOption

SubscriberLogger option sets logger, which logs error messages.

func SubscriberWaitConnection

func SubscriberWaitConnection(should bool, timeout time.Duration) SubscriberOption

SubscriberWaitConnection tells the client to wait for a connection before executing Subscription or Pub.

type SyncedStringSlice

type SyncedStringSlice struct {
	// contains filtered or unexported fields
}

func (*SyncedStringSlice) Append

func (s *SyncedStringSlice) Append(strs ...string)

func (*SyncedStringSlice) Drop

func (s *SyncedStringSlice) Drop()

func (*SyncedStringSlice) Find

func (s *SyncedStringSlice) Find(str string) int

func (*SyncedStringSlice) Get

func (s *SyncedStringSlice) Get() []string

type WithLogger added in v1.1.1

type WithLogger struct {
	logger.Logger
}

WithLogger sets the logger for the client, which will report declaration problems and so on.

Directories

Path Synopsis
Package conn adds to https://github.com/streadway/amqp Connection ability to reconnect and some optional parameters.
Package conn adds to https://github.com/streadway/amqp Connection ability to reconnect and some optional parameters.
Represents logger interface and common loggers.
Represents logger interface and common loggers.
tracing

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL