relyq

package
v0.0.0-...-3ca95c8
Warning

This package is not in the latest version of its module.

Published: Dec 7, 2013 License: MIT Imports: 10 Imported by: 2

Documentation

Overview

Package relyq provides a reliable queue backed by Redis.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ArbitraryTask

type ArbitraryTask map[string]interface{}

An arbitrary task object that can be directly used by applications

func (ArbitraryTask) Id

func (t ArbitraryTask) Id() []byte
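The Ider contract documented on this page says that Id ensures an id exists, creating it if necessary. A minimal sketch of how a map-based task could meet that contract follows. The type mapTask, the helper newID, the "id" key and the random hex format are assumptions made for illustration; the page does not say how ArbitraryTask generates ids.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// mapTask is a local stand-in for a map-based task such as ArbitraryTask.
type mapTask map[string]interface{}

// newID returns 16 random bytes encoded as hex (hypothetical id format).
func newID() string {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		panic(err) // no sensible recovery if the system RNG fails
	}
	return hex.EncodeToString(b)
}

// Id returns the value stored under "id", creating it first if it is missing.
func (t mapTask) Id() []byte {
	if s, ok := t["id"].(string); ok && s != "" {
		return []byte(s)
	}
	id := newID()
	t["id"] = id // maps are reference types, so the caller sees the new id
	return []byte(id)
}

func main() {
	task := mapTask{"payload": "resize image"}
	first := task.Id()
	second := task.Id()
	fmt.Println(string(first) == string(second)) // true: the id is stable once created
}
```

A value receiver is enough here because a map is already a reference to shared data; a struct-based task would need a pointer receiver to store a generated id.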

type Config

type Config struct {
	// Required: Prefix on simpleq.Queue key names for redis
	Prefix string
	// Field in any object which contains a unique identifier
	// Defaults to "id"
	IdField string
	// Redis delimiter. Defaults to ":"
	Delimiter string
	// Whether finished tasks are moved to a Done queue.
	// Defaults to false, which gives a clean finish (i.e. no Done queue)
	UseDoneQueue bool
	// Should tasks be kept in storage after they are done?
	// Defaults to false
	KeepDoneTasks bool
}

Configuration for Relyq

func (*Config) Defaults

func (cfg *Config) Defaults()
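The Config comments state that IdField defaults to "id" and Delimiter defaults to ":". A sketch of what a defaults-filling method of this kind typically looks like is shown here, using a local config type rather than the real one. Whether the library's Defaults does anything beyond this is not documented on this page.

```go
package main

import "fmt"

// config mirrors the Config fields whose defaults are documented above.
type config struct {
	Prefix    string
	IdField   string
	Delimiter string
}

// defaults fills in zero-valued fields; values set by the caller are kept.
func (cfg *config) defaults() {
	if cfg.IdField == "" {
		cfg.IdField = "id"
	}
	if cfg.Delimiter == "" {
		cfg.Delimiter = ":"
	}
}

func main() {
	cfg := &config{Prefix: "jobs", Delimiter: "/"}
	cfg.defaults()
	fmt.Println(cfg.IdField, cfg.Delimiter) // id /
}
```

Prefix has no default because it is marked as required.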

type Ider

type Ider interface {
	// Ensure an id exists by creating it if necessary. Always return it.
	Id() []byte
}

Ider is the interface every task implements; the queue methods accept tasks as Ider values.

type Listener

type Listener struct {
	Errors              chan error
	Tasks, Fail, Finish chan Ider
	// contains filtered or unexported fields
}

func NewListener

func NewListener(rq *Queue, sql *simpleq.Listener, example Ider) *Listener

func (*Listener) Close

func (l *Listener) Close() error
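The Listener struct exposes Tasks, Fail and Finish channels of Ider plus an Errors channel. The page does not describe their semantics, so the following worker loop is only a guess at the intended usage: it assumes tasks arrive on Tasks and that the outcome is reported by sending the task to Finish or Fail. The names ider, job, handle and work are hypothetical, and plain buffered channels stand in for a real Listener.

```go
package main

import (
	"errors"
	"fmt"
)

// ider is a local copy of the Ider interface.
type ider interface {
	Id() []byte
}

// job is a hypothetical task type used only for this sketch.
type job string

func (j job) Id() []byte { return []byte(j) }

// handle is a hypothetical task handler that rejects the task named "bad".
func handle(task ider) error {
	if string(task.Id()) == "bad" {
		return errors.New("cannot handle task")
	}
	return nil
}

// work drains tasks, runs fn on each one, and reports the outcome on
// finish or fail. It returns once tasks is closed.
func work(tasks <-chan ider, finish, fail chan<- ider, fn func(ider) error) {
	for task := range tasks {
		if err := fn(task); err != nil {
			fail <- task
			continue
		}
		finish <- task
	}
}

func main() {
	// Buffered channels stand in for Listener.Tasks, .Finish and .Fail.
	tasks := make(chan ider, 2)
	finish := make(chan ider, 2)
	fail := make(chan ider, 2)

	tasks <- job("a")
	tasks <- job("bad")
	close(tasks)

	work(tasks, finish, fail, handle)
	fmt.Println(len(finish), len(fail)) // 1 1
}
```

With a real Listener, a separate goroutine would usually also read from Errors so that reported errors are not left unread.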

type Queue

type Queue struct {
	// The underlying simpleqs
	Todo, Doing, Done, Failed *simpleq.Queue
	Storage                   Storage
	Cfg                       *Config
	// contains filtered or unexported fields
}

A reliable Redis-backed queue

func New

func New(pool *redis.Pool, storage Storage, cfg *Config) *Queue

Create a reliable queue

func NewRedisJson

func NewRedisJson(pool *redis.Pool, cfg *Config) *Queue

func (*Queue) BProcess

func (q *Queue) BProcess(timeout_secs int, task Ider) error

Block and process the next task.

func (*Queue) Close

func (q *Queue) Close() error

End the queue

func (*Queue) Fail

func (q *Queue) Fail(task Ider) error

Move a task to the Failed queue

func (*Queue) Finish

func (q *Queue) Finish(task Ider) error

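Move a task to the Done queue if that queue is in use. If the Done queue is not in use, the task is deleted when CleanFinishKeepStorage is false (that name does not appear in Config; KeepDoneTasks appears to be the corresponding field). Sometimes a task is already in the Failed queue (for example after a timeout), so the Failed queue is checked as well if the task is not found where expected.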

func (*Queue) Listen

func (q *Queue) Listen(example Ider) *Listener

Start a listener

func (*Queue) Process

func (q *Queue) Process(task Ider) (ok bool, err error)

Move the next task to the Doing queue and decode it into task. Returns ok as false if nothing happened.

func (*Queue) Push

func (q *Queue) Push(task Ider) error

Push a task onto the queue
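Push, Process, Finish and Fail move a task between the Todo, Doing, Done and Failed queues. The following in-memory model sketches that lifecycle so the transitions are easier to follow. It is not the library's implementation: memQueue and its methods are hypothetical, slices of task ids stand in for the Redis-backed simpleq queues, and a Done list is always kept.

```go
package main

import "fmt"

// memQueue is an in-memory model of the four sub-queues, holding task ids.
type memQueue struct {
	todo, doing, done, failed []string
}

// push appends a task id to the todo list.
func (q *memQueue) push(id string) { q.todo = append(q.todo, id) }

// process moves the oldest todo id to doing. ok is false if todo is empty.
func (q *memQueue) process() (id string, ok bool) {
	if len(q.todo) == 0 {
		return "", false
	}
	id, q.todo = q.todo[0], q.todo[1:]
	q.doing = append(q.doing, id)
	return id, true
}

// take removes id from list and reports whether it was present.
func take(list []string, id string) ([]string, bool) {
	for i, v := range list {
		if v == id {
			return append(list[:i], list[i+1:]...), true
		}
	}
	return list, false
}

// finish moves id to done. It looks in doing first and then in failed,
// mirroring the documented fallback for tasks that already failed.
func (q *memQueue) finish(id string) bool {
	var ok bool
	if q.doing, ok = take(q.doing, id); ok {
		q.done = append(q.done, id)
		return true
	}
	if q.failed, ok = take(q.failed, id); ok {
		q.done = append(q.done, id)
		return true
	}
	return false
}

// fail moves id from doing to failed.
func (q *memQueue) fail(id string) bool {
	var ok bool
	if q.doing, ok = take(q.doing, id); !ok {
		return false
	}
	q.failed = append(q.failed, id)
	return true
}

func main() {
	q := &memQueue{}
	q.push("a")
	q.push("b")
	first, _ := q.process()
	second, _ := q.process()
	q.finish(first)
	q.fail(second)
	_, ok := q.process()
	fmt.Println(q.done, q.failed, ok) // [a] [b] false
}
```

The point of the intermediate Doing list is that a task being worked on is always recorded somewhere, so a crashed worker leaves the task visible instead of losing it.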

func (*Queue) Remove

func (q *Queue) Remove(subq *simpleq.Queue, task Ider, keepInStorage ...bool) error

Remove a task from a queue. If keepInStorage (the single optional extra argument) is true, no delete call is made for the task.
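Remove takes keepInStorage as a variadic bool, which is a common Go idiom for an optional argument. A small sketch of how such a flag is usually interpreted follows. The helper shouldDelete is hypothetical, and treating an omitted flag as "delete" is an assumption inferred from the description above.

```go
package main

import "fmt"

// shouldDelete reports whether a task's stored object should be deleted,
// given an optional keepInStorage flag passed as a variadic argument.
// Only the first value is consulted; extra values are ignored.
func shouldDelete(keepInStorage ...bool) bool {
	return len(keepInStorage) == 0 || !keepInStorage[0]
}

func main() {
	fmt.Println(shouldDelete())      // true: flag omitted
	fmt.Println(shouldDelete(true))  // false: keep the stored task
	fmt.Println(shouldDelete(false)) // true
}
```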

type Storage

type Storage interface {
	// Get a task object
	Get(taskid []byte, task interface{}) error
	// Save a task object
	Set(task interface{}, taskid []byte) error
	// Delete the task object in the storage
	Del(taskid []byte) error
	// End the Storage connection
	io.Closer
}

Storage interface
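Because Storage is an interface, any backend with these four methods can be passed to New. As an illustration, here is a sketch of an in-memory implementation that stores tasks as JSON. The names storage, memStorage, newMemStorage and errNotFound are hypothetical, the interface is copied locally (with io.Closer written out as Close) so the example compiles on its own, and the choice of JSON encoding is an assumption.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"sync"
)

// storage is a local copy of the Storage interface.
type storage interface {
	Get(taskid []byte, task interface{}) error
	Set(task interface{}, taskid []byte) error
	Del(taskid []byte) error
	Close() error
}

var errNotFound = errors.New("task not found")

// memStorage keeps JSON-encoded tasks in a map guarded by a mutex.
type memStorage struct {
	mu    sync.Mutex
	tasks map[string][]byte
}

func newMemStorage() *memStorage {
	return &memStorage{tasks: map[string][]byte{}}
}

// Get decodes the stored task into task, which must be a pointer.
func (s *memStorage) Get(taskid []byte, task interface{}) error {
	s.mu.Lock()
	data, ok := s.tasks[string(taskid)]
	s.mu.Unlock()
	if !ok {
		return errNotFound
	}
	return json.Unmarshal(data, task)
}

// Set encodes task as JSON and stores it under taskid.
func (s *memStorage) Set(task interface{}, taskid []byte) error {
	data, err := json.Marshal(task)
	if err != nil {
		return err
	}
	s.mu.Lock()
	s.tasks[string(taskid)] = data
	s.mu.Unlock()
	return nil
}

// Del removes the task; deleting a missing task is not an error here.
func (s *memStorage) Del(taskid []byte) error {
	s.mu.Lock()
	delete(s.tasks, string(taskid))
	s.mu.Unlock()
	return nil
}

// Close has nothing to release for an in-memory map.
func (s *memStorage) Close() error { return nil }

// Compile-time check that memStorage satisfies the interface.
var _ storage = (*memStorage)(nil)

func main() {
	s := newMemStorage()
	defer s.Close()

	in := map[string]interface{}{"id": "42", "url": "http://example.com"}
	if err := s.Set(in, []byte("42")); err != nil {
		panic(err)
	}
	out := map[string]interface{}{}
	if err := s.Get([]byte("42"), &out); err != nil {
		panic(err)
	}
	fmt.Println(out["url"]) // http://example.com
}
```

Note the argument order: Set takes the task first and the id second, while Get takes the id first.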

type StructuredTask

type StructuredTask struct {
	RqId string `json:"id"`
}

A struct that implements Ider to be used in task objects for applications. Use like so:

type MyTask struct {
  StructuredTask
  OtherFields string
}

func (*StructuredTask) Id

func (t *StructuredTask) Id() []byte
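Two consequences of the embedding pattern above are worth spelling out: Id has a pointer receiver, so it is *MyTask rather than MyTask that satisfies Ider, and encoding/json flattens the embedded struct so its "id" field appears at the top level. The sketch below uses local copies of Ider and StructuredTask so it compiles on its own. The fixed "generated-id" placeholder and the toJSON helper are assumptions of the sketch; a real implementation would need unique ids.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Ider is a local copy of the interface documented above.
type Ider interface {
	Id() []byte
}

// StructuredTask is a local copy so the sketch compiles alone.
type StructuredTask struct {
	RqId string `json:"id"`
}

// Id returns the task id, filling in a placeholder if none is set.
// The placeholder is NOT unique; it only keeps the sketch deterministic.
func (t *StructuredTask) Id() []byte {
	if t.RqId == "" {
		t.RqId = "generated-id"
	}
	return []byte(t.RqId)
}

// MyTask embeds StructuredTask and so inherits its Id method.
type MyTask struct {
	StructuredTask
	OtherFields string
}

// Compile-time check: the pointer type satisfies Ider.
var _ Ider = (*MyTask)(nil)

// toJSON is a small helper that encodes v and panics on failure.
func toJSON(v interface{}) string {
	data, err := json.Marshal(v)
	if err != nil {
		panic(err)
	}
	return string(data)
}

func main() {
	task := &MyTask{OtherFields: "resize"}
	fmt.Println(string(task.Id())) // generated-id
	fmt.Println(toJSON(task))      // {"id":"generated-id","OtherFields":"resize"}
}
```

Passing a MyTask value instead of a pointer to a function that expects Ider would fail to compile, which is why the example works with &MyTask{...} throughout.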
