ergoq

v0.1.0
Published: Feb 3, 2015 License: MIT Imports: 9 Imported by: 0

README

Warning!

This library is under heavy development and the API may change. I hope the API will stabilize within a couple of days, after which I will continue developing other drivers.

What is this repository for?

Ergoq is a small, lightweight message queue abstraction. Redis and AMQP implementations are currently available; more implementations are planned.

Usage

All code snippets assume the library has been imported:

	import (
		"github.com/phonkee/ergoq"
	)

Ergoq uses a driver system similar to the one in the database/sql package. Every driver uses its own connection type (for Redis it is redis.Pool). To open an ergoq message queue, call the Open function with a DSN. Each driver may interpret the DSN slightly differently, but the usual form is

<driverName>://<host>:<port>/<database>?params

Example:

dsn := "redis://localhost:6379/0?max_idle=100&max_active=100&idle_timeout=200"
dsnAmqp := "amqp://guest:guest@localhost:5672/test?auto_ack=true&prefix=queues"

Each driver can support its own params.
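To make the DSN layout concrete, here is a minimal sketch that splits a DSN into its parts using only the standard library. The dsnParts type and splitDSN function are hypothetical names for illustration; ergoq's own parsing is internal and may differ.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// dsnParts is a hypothetical holder for the pieces of a DSN.
// It is not part of ergoq.
type dsnParts struct {
	Driver   string
	Host     string
	Database string
	Params   url.Values
}

// splitDSN breaks "<driverName>://<host>:<port>/<database>?params"
// into its components with net/url.
func splitDSN(dsn string) (dsnParts, error) {
	u, err := url.Parse(dsn)
	if err != nil {
		return dsnParts{}, err
	}
	return dsnParts{
		Driver:   u.Scheme,
		Host:     u.Host,
		Database: strings.TrimPrefix(u.Path, "/"),
		Params:   u.Query(),
	}, nil
}

func main() {
	p, err := splitDSN("redis://localhost:6379/0?max_idle=100&max_active=100&idle_timeout=200")
	if err != nil {
		panic(err)
	}
	fmt.Println(p.Driver, p.Host, p.Database, p.Params.Get("max_idle"))
}
```

The scheme selects the driver, the path names the database (or virtual host for AMQP), and everything after the question mark is handed to the driver as params.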

Drivers

RedisMessageQueueDriver

connection: &redis.Pool

DSN params:

  • max_idle - default is 10
  • max_active - default is 10
  • idle_timeout - default is 300
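If the params come from configuration rather than a hard-coded string, the DSN can be assembled with net/url so that values are escaped correctly. This is only a sketch: redisDSN is a hypothetical helper, and it assumes that params left out fall back to the driver defaults listed above.

```go
package main

import (
	"fmt"
	"net/url"
)

// redisDSN is a hypothetical helper that assembles a Redis DSN.
// Keys in params are encoded in sorted order by url.Values.Encode.
func redisDSN(hostPort, database string, params map[string]string) string {
	q := url.Values{}
	for k, v := range params {
		q.Set(k, v)
	}
	u := url.URL{
		Scheme:   "redis",
		Host:     hostPort,
		Path:     "/" + database,
		RawQuery: q.Encode(),
	}
	return u.String()
}

func main() {
	dsn := redisDSN("localhost:6379", "0", map[string]string{
		"max_idle":   "20",
		"max_active": "50",
	})
	fmt.Println(dsn)
}
```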

Open message queue

You can open a message queue in two ways.

a. Provide a DSN string to ergoq.Open and let ergoq create the connections for you

mq, err := ergoq.Open("redis://localhost:6379/0")
if err != nil {
	panic(err)
}

b. Provide an existing connection to OpenConnection

pool := redis.Pool{
	Dial: func() (redis.Conn, error) {
		return redis.Dial("tcp", ":6379")
	},
}
mq, err := ergoq.OpenConnection("redis", &pool, "auto_ack=true")
if err != nil {
	panic(err)
}

API

MessageQueuer interface says it all.

type MessageQueuer interface {
	// Pushes message to queue
	Push(queue string, message []byte) error

	// Pops message from queue
	Pop(queue string) (QueueMessage, error)

	// Publishes message to queue (all subscribers)
	// Fanout
	Publish(queue string, message []byte) error

	// Subscribes to queue(s)
	Subscribe(quit <-chan struct{}, queues ...string) (chan SubscribeMessage, chan error)
}

Examples:

// Error checking is omitted for brevity, but please check errors in real code!
mq, _ := ergoq.Open("redis://localhost:6379/0")

// Push to a queue (direct): only the first consumer that pops this value
// will process it.
_ = mq.Push("queue", []byte("message"))

// Pop a message from the queue.
data, _ := mq.Pop("queue")
fmt.Printf("popped %s\n", data.Message())

// To publish a message to all subscribers of a given queue,
// call the Publish method.
errPub := mq.Publish("user:1", []byte("logged_in"))
if errPub != nil {
	panic(errPub)
}

// Subscribing to channels can be done in the following way.
// You need to provide a "quit" channel, which is used to stop the subscription.
// Subscribe returns 2 channels: results and errors.
quit := make(chan struct{})
results, errs := mq.Subscribe(quit, "user:1", "admins")

go func() {
	for {
		select {
		case r := <-results:
			fmt.Printf("result %+v\n", r)
		case e := <-errs:
			panic(e)
		}
	}
}()
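
The quit-channel pattern used by Subscribe can be tried without a broker. The sketch below is self-contained: fakeSubscribe is a hypothetical stand-in for mq.Subscribe, and it assumes the subscription stops when quit is closed (whether ergoq expects a close or a send is not stated in this README).

```go
package main

import "fmt"

// fakeSubscribe is a hypothetical stand-in for mq.Subscribe. It emits the
// given payloads on the results channel and stops once quit is closed.
func fakeSubscribe(quit <-chan struct{}, payloads ...string) (chan string, chan error) {
	results := make(chan string)
	errs := make(chan error)
	go func() {
		defer close(results)
		for _, p := range payloads {
			select {
			case results <- p:
			case <-quit:
				return
			}
		}
	}()
	return results, errs
}

// consume reads at most max messages, then closes quit so the producer
// goroutine can exit instead of blocking forever.
func consume(max int, payloads ...string) []string {
	quit := make(chan struct{})
	defer close(quit)
	results, errs := fakeSubscribe(quit, payloads...)

	var got []string
	for len(got) < max {
		select {
		case r, ok := <-results:
			if !ok {
				return got // producer finished
			}
			got = append(got, r)
		case err := <-errs:
			fmt.Println("subscription error:", err)
			return got
		}
	}
	return got
}

func main() {
	fmt.Println(consume(2, "logged_in", "logged_out", "deleted"))
}
```

Closing quit from the consumer side keeps the shutdown decision with the caller, which is why the loop above returns values rather than panicking on errors.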
Contribute

Welcome!

Documentation

Overview

AMQP driver implementation

Constants

const (
	RELIABLE_URL_PARAM = "reliable"
	AUTO_ACK_URL_PARAM = "auto_ack"
	EXCHANGE_URL_PARAM = "exchange"

	DEF_AMQP_RELIABLE = true
	DEF_AMQP_AUTO_ACK = false
	DEF_AMQP_PREFIX   = "errgoq"
	DEF_EXCHANGE      = "errgo"
)
const (
	URL_PARAM_NAME_AUTO_ACK = "auto_ack"
	URL_PARAM_PREFIX        = "prefix"

	// default values
	DEFAULT_AUTO_ACK = false
	DEFAULT_PREFIX   = "errgoq"
)
const (
	// default values for redisMessageQueue
	MAX_IDLE                = 10
	MAX_ACTIVE              = 10
	IDLE_TIMEOUT            = 500
	RETRY_NON_ACKED_TIMEOUT = 600
	AUTO_ACK                = false
	REQUEUE_NON_ACKED_NUM   = 10
)
const (
	RETRY_QUEUE     = "ergoq-retry:"
	MESSAGE_COUNTER = "ergoq-counter:"
)

Variables

This section is empty.

Functions

func Drivers

func Drivers() []string

Drivers returns a list of the available driver names.

func GetBool

func GetBool(values url.Values, key string, def bool) bool

GetBool returns the bool stored under key in url.Values; if the key is not found, it returns the default value.

func GetInt

func GetInt(values url.Values, key string, def int) int

GetInt returns the int stored under key in url.Values; if the key is not found, it returns the default value.

func GetString

func GetString(values url.Values, key string, def string) string

GetString returns the string stored under key in url.Values; if the key is not found, it returns the default value.

func NewAMQPURLSettings

func NewAMQPURLSettings(values url.Values) *amqpURLSettings

func Register

func Register(name string, driver MessageQueueDriverer)

Register makes a message queue driver available by the provided name. If Register is called twice with the same name or if driver is nil, it panics.
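
The registration rules described for Register follow the same pattern as database/sql. A minimal sketch of such a registry, under the assumption that it is a mutex-guarded map, might look like this; the registry type and its methods are hypothetical and the driver type is a placeholder for MessageQueueDriverer.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// driver is a placeholder for the MessageQueueDriverer interface.
type driver interface{}

// registry is a hypothetical driver registry guarded by a mutex.
type registry struct {
	mu      sync.RWMutex
	drivers map[string]driver
}

func newRegistry() *registry {
	return &registry{drivers: make(map[string]driver)}
}

// register stores d under name. It panics if d is nil or if the name
// has already been registered.
func (r *registry) register(name string, d driver) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if d == nil {
		panic("register: driver is nil")
	}
	if _, dup := r.drivers[name]; dup {
		panic("register: called twice for driver " + name)
	}
	r.drivers[name] = d
}

// names returns the registered driver names in sorted order.
func (r *registry) names() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]string, 0, len(r.drivers))
	for name := range r.drivers {
		out = append(out, name)
	}
	sort.Strings(out)
	return out
}

func main() {
	r := newRegistry()
	r.register("redis", struct{}{})
	r.register("amqp", struct{}{})
	fmt.Println(r.names())
}
```

Panicking on a duplicate name surfaces wiring mistakes at program start, which is typically when drivers register themselves from init functions.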

Types

type AMQPDSN

type AMQPDSN struct {
	// contains filtered or unexported fields
}

AMQPDSN holds the information parsed from an AMQP DSN.

func ParseAMQPDSN

func ParseAMQPDSN(dsn string) (*AMQPDSN, error)

ParseAMQPDSN parses an AMQP DSN string.

type AMQPDriver

type AMQPDriver struct{}

func (*AMQPDriver) Open

func (a *AMQPDriver) Open(dsn string) (MessageQueuer, error)

Open by DSN

func (*AMQPDriver) OpenConnection

func (a *AMQPDriver) OpenConnection(connection interface{}, settings string) (MessageQueuer, error)

OpenConnection opens a message queue using an already instantiated connection.

type MessageQueueDriverer

type MessageQueueDriverer interface {

	// "opens" message queuer
	Open(dsn string) (MessageQueuer, error)

	// Opens queuer by connection
	// 	settings is url encoded params e.g. "auto_ack=true&exchange=exchange"
	OpenConnection(connection interface{}, settings string) (MessageQueuer, error)
}

type MessageQueuer

type MessageQueuer interface {

	// Pushes message to queue
	Push(queue string, message []byte) error

	// Pops message from queue
	Pop(queue string) (QueueMessage, error)

	// Publishes message to queue(fanout for all subscribers)
	Publish(queue string, message []byte) error

	// Subscribes to queue(s)
	Subscribe(quit <-chan struct{}, queues ...string) (chan SubscribeMessage, chan error)
}

func Open

func Open(dsn string) (MessageQueuer, error)

Open opens a message queue identified by the DSN.

func OpenConnection

func OpenConnection(driverName string, connection interface{}, settings ...string) (MessageQueuer, error)

OpenConnection opens a message queue for the named driver using an existing connection.

type QueueMessage

type QueueMessage interface {
	// queue
	Queue() string

	// returns contents of message
	Message() []byte

	// Acknowledges message
	Ack() error

	// returns id of message
	Id() string
}

QueueMessage interface

type SubscribeMessage

type SubscribeMessage interface {
	// returns queue where was message published
	Queue() string

	// returns content of message
	Message() []byte
}

SubscribeMessage interface

func NewSubscriberMessage

func NewSubscriberMessage(queue string, message []byte) SubscribeMessage
