radish

package module
v0.0.0-...-4f80c14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 22, 2016 License: MIT Imports: 8 Imported by: 0

README

radish

Broker agnostic work library, inspired by github.com/armon/relay but with far more than AMQP support

Highly WIP, pm me (@fortytw2 on gophers.slack.com, or fortytw2 (at) gmail) for info on what's happening/if you want to use it, should be mostly cleaned up and ready for use in the next week or so

LICENSE

MIT

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DefaultTimeout = time.Second

DefaultTimeout is the length a new worker will wait to get a task

Functions

func ConsumeBench

func ConsumeBench(br Broker, n int, b *testing.B)

ConsumeBench is a benchmark helper for consumers

func ParallelPublishBench

func ParallelPublishBench(br Broker, b *testing.B)

ParallelPublishBench benchmarks publisher.Publish in parallel

func PublishBench

func PublishBench(br Broker, b *testing.B)

PublishBench benchmarks publisher.Publish

func RandomQueue

func RandomQueue() string

RandomQueue generates a random queue name - most useful for testing

Types

type Broker

type Broker interface {
	// Close shuts down the broker
	Close() error
	// Consumer returns _a_ Consumer
	Consumer(queue string) (Consumer, error)
	// Publisher returns _a_ Publisher
	Publisher(queue string) (Publisher, error)
	// Len returns the length of a given queue
	Len(queue string) int
}

Broker implements a high level interface that can provide Consumer or Publisher

func NewMemBroker

func NewMemBroker() Broker

NewMemBroker returns a pure in-mem broker

type Consumer

type Consumer interface {
	// Consume will consume the next available message or times out waiting. The
	// message must be acknowledged with Ack() or Nack() before
	// the next call to Consume unless EnableMultiAck is true.
	Consume(out interface{}) error

	// ConsumeAck will consume the next message and acknowledge
	// that the message has been received. This prevents the message
	// from being redelivered, and no call to Ack() or Nack() is needed.
	ConsumeAck(out interface{}) error

	// ConsumeTimeout will consume the next available message. The
	// message must be acknowledged with Ack() or Nack() before
	// the next call to Consume unless EnableMultiAck is true.
	ConsumeTimeout(out interface{}, timeout time.Duration) error

	// Ack will send an acknowledgement to the server that the
	// last message returned by Consume was processed.
	Ack() error

	// Nack will send a negative acknowledgement to the server that the
	// last message returned by Consume was not processed and should be
	// redelivered. If EnableMultiAck is true, then all messages up to
	// the last consumed one will be negatively acknowledged
	Nack() error

	// Close will shutdown the Consumer. Any messages that are still
	// in flight will be Nack'ed.
	Close() error
}

Consumer is used to consume messages from a queue

type Pool

type Pool struct {
	*sync.RWMutex
	// contains filtered or unexported fields
}

A Pool is a set of workers that all function on the same queue

func NewPool

func NewPool(b Broker, queue string, fn WorkFunc, l log.Logger) *Pool

NewPool returns a configurable pool

func (*Pool) AddWorkers

func (p *Pool) AddWorkers(n int) error

AddWorkers changes the current number of workers in a pool

func (*Pool) Len

func (p *Pool) Len() int

Len returns the total number of workers in this group

func (*Pool) Stop

func (p *Pool) Stop() error

Stop turns off all workers in the pool

func (*Pool) TotalTimeSinceWork

func (p *Pool) TotalTimeSinceWork() time.Duration

TotalTimeSinceWork returns the time since any worker has seen work

type Publisher

type Publisher interface {
	// Close will shutdown the publisher
	Close() error

	// Publish will send the message to the server to be consumed
	Publish(in interface{}) error
}

Publisher is used to push messages into a queue

type WorkFunc

type WorkFunc func(i interface{}) error

A WorkFunc processes the data passed to a Worker

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL