queue

package
v2.5.0
Warning

This package is not in the latest version of its module.

Published: Jun 20, 2016 License: MIT Imports: 4 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func LPOPRPUSH

func LPOPRPUSH(cnx redis.Conn) *redis.Script

Types

type BaseQueue

type BaseQueue struct {
	// contains filtered or unexported fields
}

BaseQueue provides a basic implementation of the Queue interface. It performs updates through a Processor, an interface that itself defines how those updates are handled.

func NewBaseQueue

func NewBaseQueue(pool *redis.Pool, source string) *BaseQueue

func (*BaseQueue) Concat

func (q *BaseQueue) Concat(src string) (moved int, err error)

Concat takes all elements from the source queue and adds them to this one. This can be a long-running operation. If a persistent error occurs while moving items, Concat stops and returns that error along with the number of items moved so far. The operation can be safely resumed at any time.
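The stop-and-resume behavior can be pictured with an in-memory stand-in. This is a minimal sketch, not the package's implementation: `memQueue`, `moveOne`, `errEmpty`, `errTransient`, and the `failAt` fault injection are hypothetical names used only for illustration, with `errEmpty` playing the role that `redis.ErrNil` plays for an empty source.

```go
package main

import "errors"

// errEmpty stands in for redis.ErrNil: the source has no more items.
var errEmpty = errors.New("source queue is empty")

// errTransient is a hypothetical failure injected to interrupt a concat.
var errTransient = errors.New("simulated connection failure")

// memQueue is an in-memory stand-in for a Redis LIST (assumption).
type memQueue struct {
	items [][]byte
}

// moveOne moves the oldest item of src to the end of dst.
func moveOne(src, dst *memQueue) error {
	if len(src.items) == 0 {
		return errEmpty
	}
	dst.items = append(dst.items, src.items[0])
	src.items = src.items[1:]
	return nil
}

// concat moves items from src to dst until src is empty or an error
// occurs. A failAt >= 0 injects one failure once failAt items have been
// moved. The returned count lets a caller see how far the call got
// before it decides to resume.
func concat(src, dst *memQueue, failAt int) (moved int, err error) {
	for {
		if moved == failAt {
			return moved, errTransient
		}
		if err := moveOne(src, dst); err != nil {
			if errors.Is(err, errEmpty) {
				return moved, nil // source drained: normal completion
			}
			return moved, err
		}
		moved++
	}
}
```

Because every item is moved individually, an interrupted call leaves both queues in a consistent state, and calling `concat` again simply continues with whatever is still in the source.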

func (*BaseQueue) Processor

func (q *BaseQueue) Processor() Processor

Processor implements the Processor method on the Queue interface. It requests a read-level lock from the guarding mutex and returns the current processor once the lock is obtained. If no processor is set, the default FIFO implementation is returned.

func (*BaseQueue) Pull

func (q *BaseQueue) Pull(timeout time.Duration) (payload []byte, err error)

Pull implements the Pull method on the Queue interface.

func (*BaseQueue) Push

func (q *BaseQueue) Push(payload []byte) (err error)

Push pushes the given payload (a byte slice) into the specified keyspace by delegating into the `Processor`'s `func Push`. It obtains a connection to Redis using the pool, which is passed into the Processor, and recycles that connection after the function has returned.

If an error occurs during Pushing, it will be returned, and it can be assumed that the payload is not in Redis.

func (*BaseQueue) SetProcessor

func (q *BaseQueue) SetProcessor(processor Processor)

SetProcessor implements the SetProcessor method on the Queue interface. It requests write-level access from the guarding mutex and performs the update atomically.
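The read-lock getter and write-lock setter described for Processor and SetProcessor follow a common `sync.RWMutex` pattern. The sketch below illustrates that pattern with hypothetical stand-ins (`strategy`, `fifo`, `lifo`, `guarded`). It assumes nothing about the package's unexported fields.

```go
package main

import "sync"

// strategy is a hypothetical stand-in for the Processor interface.
type strategy interface {
	Name() string
}

type fifo struct{}

func (fifo) Name() string { return "fifo" }

type lifo struct{}

func (lifo) Name() string { return "lifo" }

// guarded holds a strategy behind a read/write mutex.
type guarded struct {
	mu sync.RWMutex
	s  strategy
}

// get takes a read lock, so concurrent readers do not block each other.
// It falls back to fifo when no strategy has been set.
func (g *guarded) get() strategy {
	g.mu.RLock()
	defer g.mu.RUnlock()
	if g.s == nil {
		return fifo{}
	}
	return g.s
}

// set takes the write lock, so readers see either the old or the new
// strategy and never a partial update.
func (g *guarded) set(s strategy) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.s = s
}
```

A zero-value `guarded` reports `fifo` from `get` until `set` is called, which mirrors the documented default.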

func (*BaseQueue) Source

func (q *BaseQueue) Source() string

Source implements the Source method on the Queue interface.

type ByteQueue

type ByteQueue struct {
	BaseQueue
}

ByteQueue represents either a LIFO or FIFO queue contained in a particular Redis keyspace. It allows callers to push `[]byte` payloads, and receive them back over the `In() <-chan []byte` channel. It is typically used in a distributed setting, where the pusher may not always get the item back.

func NewByteQueue

func NewByteQueue(pool *redis.Pool, name string) *ByteQueue

NewByteQueue allocates and returns a pointer to a new instance of a ByteQueue. It initializes itself using the given *redis.Pool, and the name, which refers to the keyspace wherein these values will be stored.

Internal channels are also initialized here.

type DurableQueue

type DurableQueue struct {

	// DurableQueue extends a BaseQueue
	BaseQueue
	// contains filtered or unexported fields
}

DurableQueue is an implementation of the Queue interface which takes items from a source queue and pushes them into the destination queue when Pull() is called.

func NewDurableQueue

func NewDurableQueue(pool *redis.Pool, source, dest string) *DurableQueue

NewDurableQueue initializes and returns a new pointer to an instance of a DurableQueue. It is initialized with the given Redis pool, and the source and destination queues. By default the FIFO tactic is used, but a call to SetProcessor can change this in a safe fashion.

DurableQueues own no goroutines, so this function does not spawn any goroutines or create any channels.

func (*DurableQueue) Dest

func (q *DurableQueue) Dest() string

Dest returns the destination keyspace in Redis where pulled items end up. It first obtains a read-level lock on the member `dest` variable before returning.

func (*DurableQueue) Pull

func (q *DurableQueue) Pull(timeout time.Duration) (payload []byte, err error)

Pull implements the Pull function on the Queue interface. Unlike common implementations of the Queue type, it mutates the Redis keyspace twice, by removing an item from one LIST and pushing it onto another. It does so by delegating to the processor, and therefore blocks until the processor returns.
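The two-list move can be pictured with an in-memory model. This is a sketch under stated assumptions, not the package's code: `lists`, `push`, `pullTo`, `length`, and `errEmpty` are hypothetical, a Go map stands in for the Redis keyspace, and the sketch returns immediately on an empty source instead of honoring a timeout.

```go
package main

import (
	"errors"
	"sync"
)

// errEmpty stands in for redis.ErrNil (assumption for this sketch).
var errEmpty = errors.New("no item available")

// lists is an in-memory stand-in for a set of Redis LIST keys.
type lists struct {
	mu   sync.Mutex
	data map[string][][]byte
}

func newLists() *lists {
	return &lists{data: make(map[string][][]byte)}
}

// push appends payload to the list stored at key.
func (l *lists) push(key string, payload []byte) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.data[key] = append(l.data[key], payload)
}

// pullTo removes the oldest item from src, appends it to dest, and
// returns it. Both mutations happen inside one critical section, so the
// item is always in exactly one of the two lists.
func (l *lists) pullTo(src, dest string) ([]byte, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	items := l.data[src]
	if len(items) == 0 {
		return nil, errEmpty
	}
	head := items[0]
	l.data[src] = items[1:]
	l.data[dest] = append(l.data[dest], head)
	return head, nil
}

// length reports how many items are stored at key.
func (l *lists) length(key string) int {
	l.mu.Lock()
	defer l.mu.Unlock()
	return len(l.data[key])
}
```

After a successful `pullTo`, the payload is still recorded in the destination list, so a consumer that fails mid-task leaves a trace of the item it was working on.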

func (*DurableQueue) SetDest

func (q *DurableQueue) SetDest(dest string) string

SetDest updates the destination where items are "pulled" to in a safe, blocking manner. It does this by first obtaining a write-level lock on the internal member variable wherein the destination is stored, updating, and then relinquishing the lock.

It returns the new destination that was just set.

type Processor

type Processor interface {
	// Push pushes a given `payload` into the keyspace at `src` over the
	// given `redis.Conn`. This function should block until the item can
	// successfully be confirmed to have been pushed.
	Push(conn redis.Conn, src string, payload []byte) (err error)

	// Pull pulls a given `payload` from the keyspace at `src` over the
	// given `redis.Conn`. This function should block until the given
	// timeout has elapsed, or an item is available. If the timeout has
	// passed, a redis.ErrNil will be returned.
	Pull(conn redis.Conn, src string, timeout time.Duration) (payload []byte, err error)

	// PullTo transfers a given payload from the source (src) keyspace to
	// the destination (dest) keyspace and returns the moved item in the
	// payload space. If an error was encountered, then it will be returned
	// immediately. Timeout semantics are identical to those on Pull, unless
	// noted otherwise in implementation.
	PullTo(conn redis.Conn, src, dest string, timeout time.Duration) (payload []byte, err error)

	// Concat moves all elements from the src queue to the end of the dest
	// queue. It should return a redis.ErrNil when the source queue is empty.
	Concat(conn redis.Conn, src, dest string) (err error)
}

Processor is an interface to a type encapsulating the interaction between a queue.ByteQueue and a data structure in Redis.

var FIFO Processor = &fifoProcessor{}

FIFO is a first in, first out implementation of the Processor interface.

var LIFO Processor = &lifoProcessor{}

LIFO is a last in, first out implementation of the Processor interface.
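The difference between the two processors is only the order in which pushed items come back. A minimal in-memory sketch of that ordering follows. `order`, `fifoOrder`, `lifoOrder`, and `drain` are hypothetical names, and a slice stands in for the Redis list.

```go
package main

// order is a hypothetical stand-in for the choice of Processor.
type order int

const (
	fifoOrder order = iota // first in, first out
	lifoOrder              // last in, first out
)

// drain treats items as the sequence in which payloads were pushed and
// returns the sequence in which they would be pulled under order o.
func drain(items []string, o order) []string {
	list := append([]string(nil), items...) // copy, kept in push order
	out := make([]string, 0, len(list))
	for len(list) > 0 {
		switch o {
		case lifoOrder:
			// Take the most recently pushed item.
			last := len(list) - 1
			out = append(out, list[last])
			list = list[:last]
		default:
			// Take the oldest item.
			out = append(out, list[0])
			list = list[1:]
		}
	}
	return out
}
```

With pushes `a`, `b`, `c`, the FIFO order yields `a`, `b`, `c` and the LIFO order yields `c`, `b`, `a`.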

type Queue

type Queue interface {
	// Source returns the keyspace in Redis from which this queue is
	// populated.
	Source() string

	// Push pushes the given payload (a byte slice) into the specified
	// keyspace by delegating into the `Processor`'s `func Push`. It obtains
	// a connection to Redis using the pool, which is passed into the
	// Processor, and recycles that connection after the function has
	// returned.
	//
	// If an error occurs during Pushing, it will be returned, and it can be
	// assumed that the payload is not in Redis.
	Push(payload []byte) (err error)

	// Pull returns the next available payload, blocking until data can be
	// returned.
	Pull(timeout time.Duration) (payload []byte, err error)

	// Takes all elements from the source queue and adds them to this one. This
	// can be a long-running operation. If a persistent error is returned while
	// moving things, then concat will stop, though the concat operation can
	// be safely resumed at any time.
	//
	// Returns the number of items successfully moved and any error that
	// occurred.
	Concat(src string) (moved int, err error)

	// Processor returns the processor that is being used to push and pull.
	// If no processor is specified, a first-in-first-out will be returned
	// by default.
	Processor() Processor

	// SetProcessor sets the current processor to the specified processor by
	// acquiring a write lock into the mutex guarding that field. The
	// processor will be switched over during the next iteration of a
	// Pull-cycle, or a call to Push.
	SetProcessor(processor Processor)
}
