anyq

package module
v0.0.0-...-48a82cb Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 16, 2016 License: MIT Imports: 12 Imported by: 9

README

AnyQ

GoDoc

This is queue library wrapper for widely popular queues. AnyQ provide one way to handle various queues.

Supporting Queues

Basic usage

Go get:

$ go get -u github.com/jaehue/anyq

Import the package:

import (
	"github.com/jaehue/anyq"
)

Create new queue:

q, _ := anyq.New("nsq", "127.0.0.1:4150")

Create consumer:

c, _ := q.NewConsumer(anyq.NsqConsumerArgs{Topic: "test", Channel: "anyq"})

Create producer:

p, _ := q.NewProducer(anyq.NsqProducerArgs{Topic: "test"})

Consume:

recvCh := make(chan *anyq.Message, 256)
c.BindRecvChan(recvCh)
for m := range recvCh {
	fmt.Println("[receive]", string(m.Body))
}

Produce:

sendCh := make(chan []byte, 1)
p.BindSendChan(sendCh)
sendCh <- []byte("test message")

Close:

q.Close()

Advanced usage

Optional setup function

set QoS and Exchange of RabbitMQ

setQos := func(q *anyq.Rabbitmq) {
	if err := q.Qos(100, 0, false); err != nil {
		log.Fatal(err)
	}
}

setExchange := func(q *anyq.Rabbitmq) {
	if err := q.ExchangeDeclare("test-ex", "direct", false, false, false, false, nil); err != nil {
		log.Fatal(err)
	}
	log.Println("declared Exchange")
}

q, err := anyq.New("rabbitmq", "amqp://guest:guest@127.0.0.1:5672/", setQos, setExchange)

set zookeeper urls of Kafka

q, err := anyq.New("kafka", "localhost:9092", func(q *anyq.Kafka) {
	q.Zookeepers = []string{"localhost:2181", "localhost:2182"}
})
Retrieve original conn object
q, err := anyq.New("nats", "nats://127.0.0.1:4222")
if err != nil {
	panic(err)
}

conn, err := q.Conn()
if err != nil {
	b.Error(err)
}
natsConn, ok := conn.(*nats.Conn)
if !ok {
	log.Fatalf("invalid conn type(%T)\n", conn)
}

natsConn.Subscribe("test", func(m *nats.Msg) {
	natsConn.Publish(m.Reply, m.Data)
	log.Println("[receive and reply]", string(m.Data))
})

Test

Prerequisite

You should install and run each queues

Unit test
$ go test github.com/jaehue/anyq
ok  	github.com/jaehue/anyq	1.136s
Benchmark test
$ go test github.com/jaehue/anyq/benchmark -test.bench=. -test.benchmem
testing: warning: no tests to run
PASS
BenchmarkKafkaAsyncProduce	  300000	      4111 ns/op	     700 B/op	      10 allocs/op
BenchmarkKafkaSyncProduce	   20000	    100699 ns/op	    3080 B/op	      58 allocs/op
BenchmarkKafkaConsume	   30000	    151092 ns/op	   27805 B/op	     405 allocs/op
BenchmarkNatsProduce	  500000	      3468 ns/op	     280 B/op	       4 allocs/op
BenchmarkNatsConsume	  200000	     10199 ns/op	    1429 B/op	      21 allocs/op
BenchmarkNatsReply	    5000	    256335 ns/op	    2128 B/op	      88 allocs/op
BenchmarkNsqProduce	  100000	     14261 ns/op	     852 B/op	      17 allocs/op
BenchmarkNsqConsume	     100	  13415530 ns/op	  824936 B/op	   17322 allocs/op
BenchmarkRabbitmqProduce	   30000	     38698 ns/op	    1739 B/op	      53 allocs/op
BenchmarkRabbitmqConsume	       1	2421170045 ns/op	97861152 B/op	 1966673 allocs/op
ok  	github.com/jaehue/anyq/benchmark	23.944s

Examples

Contributing

  1. Fork it ( https://github.com/jaehue/anyq/fork )
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create a new Pull Request

License

MIT (see LICENSE file)

Documentation

Overview

This is queue library wrapper for widely popular queues. AnyQ provide one way to handle various queues.

Supporting Queues

Visit https://github.com/jaehue/anyq

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Register

func Register(name string, queue Queuer)

Register makes a queue available by the provided name. If Register is called twice with the same name or if queue is nil, it panics.

Types

type Consumer

type Consumer interface {
	// Consumer returns original consumer object
	Consumer() (interface{}, error)

	// BindRecvChan bind a channel for receive operations from queue.
	BindRecvChan(messages chan<- *Message) error
	// contains filtered or unexported methods
}

Consumer process messages from Queue.

type Kafka

type Kafka struct {
	Brokers    []string // The comma separated list of brokers in the Kafka cluster
	Zookeepers []string
	// contains filtered or unexported fields
}

func (*Kafka) Close

func (q *Kafka) Close() error

func (*Kafka) Conn

func (q *Kafka) Conn() (interface{}, error)

func (*Kafka) NewConsumer

func (q *Kafka) NewConsumer(v interface{}) (Consumer, error)

func (*Kafka) NewProducer

func (q *Kafka) NewProducer(v interface{}) (Producer, error)

func (*Kafka) SetLogger

func (q *Kafka) SetLogger(l logger, level LogLevel)

func (*Kafka) Setup

func (q *Kafka) Setup(url string) error

type KafkaConsumerArgs

type KafkaConsumerArgs struct {
	Topic      string `validate:"nonzero"`
	Group      string
	Partitions string
	Offset     string `validate:"regexp=^(oldest|newest)$"`
}

type KafkaProducerArgs

type KafkaProducerArgs struct {
	Topic       string `validate:"nonzero"`
	Key         string
	Partitioner string `validate:"regexp=^(hash|manual|random)$"` // The partitioning scheme to usep
	Partition   int32  // The partition to produce to
	Sync        bool
}

type LogLevel

type LogLevel int

LogLevel specifies the severity of a given log message

const (
	LogLevelDebug LogLevel = iota
	LogLevelInfo
	LogLevelWarning
	LogLevelError
)

Log levels

type Message

type Message struct {
	Body   []byte
	Origin interface{}
}

type Nats

type Nats struct {
	// contains filtered or unexported fields
}

func (*Nats) Close

func (q *Nats) Close() error

func (*Nats) Conn

func (q *Nats) Conn() (interface{}, error)

func (*Nats) NewConsumer

func (q *Nats) NewConsumer(v interface{}) (Consumer, error)

func (*Nats) NewProducer

func (q *Nats) NewProducer(v interface{}) (Producer, error)

func (*Nats) SetLogger

func (q *Nats) SetLogger(l logger, level LogLevel)

func (*Nats) Setup

func (q *Nats) Setup(url string) error

type NatsConsumerArgs

type NatsConsumerArgs struct {
	Subject string
}

type NatsProducerArgs

type NatsProducerArgs struct {
	Subject string
}

type Nsq

type Nsq struct {
	// contains filtered or unexported fields
}

func (*Nsq) Close

func (q *Nsq) Close() error

func (*Nsq) Conn

func (q *Nsq) Conn() (interface{}, error)

func (*Nsq) NewConsumer

func (q *Nsq) NewConsumer(v interface{}) (Consumer, error)

func (*Nsq) NewProducer

func (q *Nsq) NewProducer(v interface{}) (Producer, error)

func (*Nsq) SetLogger

func (q *Nsq) SetLogger(l logger, level LogLevel)

func (*Nsq) Setup

func (q *Nsq) Setup(url string) error

type NsqConsumerArgs

type NsqConsumerArgs struct {
	Topic, Channel string
}

type NsqProducerArgs

type NsqProducerArgs struct {
	Topic string
}

type Producer

type Producer interface {
	// Producer returns original producer object
	Producer() (interface{}, error)

	// Producer bind a channel for send operations to queue.
	BindSendChan(messages <-chan []byte) error
	// contains filtered or unexported methods
}

Producer publish messages to Queue.

type Queuer

type Queuer interface {
	// Conn returns original connection object.
	Conn() (interface{}, error)

	// NewConsumer create new consumer.
	// You MUST pass valid argument such as RabbitmqConsumerArgs, KafkaConsumerArgs, NsqConsumerArgs, and NatsConsumerArgs
	NewConsumer(args interface{}) (Consumer, error)

	// NewProducer create new producer.
	// You MUST pass valid argument such as RabbitmqProducerArgs, KafkaProducerArgs, NsqProducerArgs, and NatsProducerArgs
	NewProducer(args interface{}) (Producer, error)

	// SetLogger assigns the logger to use as well as a level.
	// The logger parameter is an interface that requires the following method to be implemented (such as the the stdlib log.Logger):
	//
	//    Output(calldepth int, s string)
	//
	SetLogger(logger, LogLevel)

	Setup(string) error
	// contains filtered or unexported methods
}

Queuer provide generic method to handle queue

func New

func New(qname, url string, setupFn ...interface{}) (Queuer, error)

New creates a queue specified by its queue name and a queue url,

type Rabbitmq

type Rabbitmq struct {
	*amqp.Channel
	// contains filtered or unexported fields
}

func (*Rabbitmq) Close

func (q *Rabbitmq) Close() error

func (*Rabbitmq) Conn

func (q *Rabbitmq) Conn() (interface{}, error)

func (*Rabbitmq) NewConsumer

func (q *Rabbitmq) NewConsumer(v interface{}) (Consumer, error)

func (*Rabbitmq) NewProducer

func (q *Rabbitmq) NewProducer(v interface{}) (Producer, error)

func (*Rabbitmq) SetLogger

func (q *Rabbitmq) SetLogger(l logger, level LogLevel)

func (*Rabbitmq) Setup

func (q *Rabbitmq) Setup(url string) error

type RabbitmqConsumerArgs

type RabbitmqConsumerArgs struct {
	Queue                               string `validate:"nonzero"`
	RoutingKey                          string
	Exchange                            string
	ConsumerTag                         string
	AutoAck, Exclusive, NoLocal, NoWait bool
	Args                                map[string]interface{}
}

type RabbitmqProducerArgs

type RabbitmqProducerArgs struct {
	Exchange             string `validate:"nonzero"`
	RoutingKey           string `validate:"nonzero"`
	Mandatory, Immediate bool
	DeliveryMode         uint8
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL