worker

package
v2.5.0+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 20, 2016 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrAlreadyResolved = errors.New("Attempted to resolve an already-resolved task.")

ErrAlreadyResolved is returned from Task.Fail or Task.Succeed if the Task was already marked as having failed or resolved.

View Source
var ErrNotFound = errors.New("Attempted to complete a task that we aren't working on.")

ErrNotFound is returned if you attempt to mark a task as complete or abandoned that wasn't registered in the lifecycle.

Functions

This section is empty.

Types

type DefaultLifecycle

type DefaultLifecycle struct {
	// contains filtered or unexported fields
}

DefaultLifecycle provides a default implementation of the Lifecycle interface. It moves tasks from a source, which provides available tasks, into a specific worker queue, which is the list of items that this worker is currently working on.

Completed tasks leave the individualized worker queue, while abandoned tasks move back to the task source.

func NewLifecycle

func NewLifecycle(pool *redis.Pool) *DefaultLifecycle

NewLifecycle allocates and returns a pointer to a new instance of DefaultLifecycle. It uses the specified pool to make connections into Redis and a queue of available tasks along with a second working tasks queue, which stores the items the lifecycle is currently processing.

func (*DefaultLifecycle) Abandon

func (l *DefaultLifecycle) Abandon(task *Task) error

Abandon marks a task as having failed, pushing it back onto the primary task queue and removing it from our worker queue.

func (*DefaultLifecycle) AbandonAll

func (l *DefaultLifecycle) AbandonAll() error

Marks *all* tasks in the queue as having been abandoned. Called by the worker in the Halt() method.

func (*DefaultLifecycle) Await

func (l *DefaultLifecycle) Await()

Await implements Lifecycle's Await func, blocking until all tasks are either completed or abandoned. If Await in conjunction with a AbandonAll, it will wait until all tasks have been successfully abandoned before returning.

func (*DefaultLifecycle) Complete

func (l *DefaultLifecycle) Complete(task *Task) error

Complete marks a task as having been completed, removing it from the worker's queue.

func (*DefaultLifecycle) Listen

func (l *DefaultLifecycle) Listen() (<-chan *Task, <-chan error)

Listen returns a channel of tasks and error that are pulled from the main processing queue. Once a *Task is able to be read from the <-chan *Task, that Task is ready to be worked on and is in the appropriate locations in Redis. StopListening() can be called to terminate.

func (*DefaultLifecycle) SetQueues

func (l *DefaultLifecycle) SetQueues(availableTasks queue.Queue,
	workingTasks *queue.DurableQueue)

func (*DefaultLifecycle) StopListening

func (l *DefaultLifecycle) StopListening()

StopListening closes the queue "pull" task at the next opportunity. Tasks can still be marked as completed or abandoned, but no new tasks will be generated.

type DefaultWorker

type DefaultWorker struct {
	// contains filtered or unexported fields
}

A DefaultWorker is the bridge between Redutil Queues and the worker pattern. Items can be moved around between different queues using a lifecycle (see Lifecycle, DefaultLifecycle), and worked on by clients. "Dead" workers' items are recovered by other, living ones, providing an in-order, reliable, distributed implementation of the worker pattern.

func New

func New(pool *redis.Pool, src, id string) *DefaultWorker

New creates and returns a pointer to a new instance of a DefaultWorker. It uses the given redis.Pool, the main queue to pull from (`src`), and is given a unique ID through the `id` paramter.

func (*DefaultWorker) Close

func (w *DefaultWorker) Close()

Close stops polling the queue immediately and waits for all tasks to complete before stopping the heartbeat.

func (*DefaultWorker) Halt

func (w *DefaultWorker) Halt()

Halt stops the heartbeat and queue polling goroutines immediately and cancels all tasks, marking them as FAILED before returning.

func (*DefaultWorker) SetJanitor

func (w *DefaultWorker) SetJanitor(janitor Janitor)

Sets the Janitor interface used to dispose of old workers. This is optional; if you do not need to hook in extra functionality, you don't need to provide a janitor.

func (*DefaultWorker) SetLifecycle

func (w *DefaultWorker) SetLifecycle(lf Lifecycle)

Sets the Lifecycle used for managing job states. Note: this is only safe to call BEFORE calling Start()

func (*DefaultWorker) Start

func (w *DefaultWorker) Start() (<-chan *Task, <-chan error)

Start signals the worker to begin receiving tasks from the main queue.

type Janitor

type Janitor interface {
	// Called when a worker dies after we have acquired a lock and before
	// we start moving the worker's queue back to the main processing
	// queue. Note that if we does before the worker's queue is moved
	// over, this function *can be called multiple times on the
	// same worker*
	//
	// If an error is returned from the function, the queue concatenation
	// will be aborted and we'll release the lock.
	OnPreConcat(cnx redis.Conn, worker string) error

	// Called when a worker dies after we have acquired a lock and finished
	// moving the worker's queue back to the main processing queue. Note
	// that, in the result of a panic or power failure, this function
	// may never be called, and errors resulting from this function
	// will not roll-back the concatenation.
	OnPostConcat(cnx redis.Conn, worker string) error
}

The Janitor is used to assist in the tear down of dead workers. It can be provided to the worker to hook additional functionality that will occur when the worker dies.

type Lifecycle

type Lifecycle interface {
	// Sets the queues for the lifecycle's tasks. The lifecycle should
	// pull jobs from the `available` queue, then add them to the
	// `working` queue until they're complete. When they're complete,
	// the jobs can be deleted from that queue.
	SetQueues(available queue.Queue, working *queue.DurableQueue)

	// Marks a task as being completed. This is called by the Task.Complete
	// method; you should not use this directly.
	Complete(task *Task) (err error)

	// Abandon marks a task as having failed, pushing it back onto the
	// primary task queue and removing it from our worker queue. This is
	// called by the Task.Abandon method; you should not use this directly.
	Abandon(task *Task) (err error)

	// Marks *all* tasks in the queue as having been abandoned. Called by
	// the worker in the Halt() method.
	AbandonAll() (err error)

	// Starts pulling from the processing queue, returning a channel of
	// tasks and errors. Can be halted with StopListening()
	Listen() (<-chan *Task, <-chan error)

	// Stops an ongoing listening loop.
	StopListening()

	// Await blocks until all tasks currently being worked on by the Worker
	// are completed. If there are no tasks being worked on, this method
	// will return instantly.
	Await()
}

type Task

type Task struct {
	// contains filtered or unexported fields
}

A Task encapsulates a item coming off the main queue that needs to be "worked" on. Upon receiving a Task, callers may either Complete() or Fail() the task, which will be delegated into the lifecycle appropriately.

Should execution be halted, the Closer() channel will get an empty "message" that can be read off, which means that execution has indeed been halted.

func NewTask

func NewTask(lifecycle Lifecycle, payload []byte) *Task

NewTask initializes and returns a pointer to a new Task instance. It is constructed with the given payload, progress, and closer channels respectively, all of which should be open when either Succeed(), Fail() or Closer() is called.

func (*Task) Bytes

func (t *Task) Bytes() []byte

Bytes returns the bytes that this Task is holding, and is the "data" to be worked on.

func (*Task) Fail

func (t *Task) Fail() error

Fail signals the lifecycle that work on this task has failed, causing it to return the task to the main processing queue to be retried.

func (*Task) HexDump

func (t *Task) HexDump() string

HexDump returns a byte dump of the task, in the same format as `hexdump -C`. This is useful for debugging/logging purposes.

func (*Task) IsResolved

func (t *Task) IsResolved() bool

IsResolved returns true if the task has already been marked as having succeeded or failed.

func (*Task) String

func (t *Task) String() string

String returns the strinigified contents of the task payload.

func (*Task) Succeed

func (t *Task) Succeed() error

Succeed signals the lifecycle that work on this task has been completed, and removes the task from the worker queue.

type Worker

type Worker interface {
	// Start begins the process of pulling []byte from the Queue, returning
	// them as `*Task`s on a channel of Tasks (`<-chan *Task`).
	//
	// If any errors are encountered along the way, they are sent across the
	// (unbuffered) `<-chan error`.
	Start() (<-chan *Task, <-chan error)

	// Close closes the pulling goroutine and waits for all tasks to finish
	// before returning.
	Close()

	// Halt closes the pulling goroutine, but does not wait for all tasks to
	// finish before returning.
	Halt()
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL