go-pqueue: github.com/ljosa/go-pqueue/pqueue

package pqueue

import "github.com/ljosa/go-pqueue/pqueue"

Package pqueue provides primitives for processing a simple persistent queue backed by the local filesystem. In many ways, the approach is similar to Maildir. Jobs can be submitted easily from any language by creating a directory and moving it atomically into the queue's `new` subdirectory.

A pqueue is a directory with several subdirectories:

- New files and directories are created in the `tmp` subdirectory before being moved atomically to elsewhere in the directory structure.

- The `new` subdirectory contains jobs that workers should process (see the `Take` method).

- The `cur` subdirectory contains a subdirectory for each worker (named by its process ID) where jobs are placed while the worker processes them.

- When jobs fail or finish successfully, they are moved to the `failed` or `done` subdirectories, respectively. See the `Fail` and `Finish` methods.
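Because a job is just a directory, it can be submitted without this package. The following is a minimal sketch using only the standard library, assuming the queue's `tmp` and `new` subdirectories already exist and live on the same filesystem. The function name `submitByHand`, the job name `job-0001`, and the `url` property are hypothetical; the package may apply its own naming conventions to jobs it creates.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// submitByHand stages a job directory under tmp, fills in its property files,
// and then renames it into new. All names here are illustrative assumptions.
func submitByHand(queueDir, jobName string, props map[string][]byte) error {
	staging := filepath.Join(queueDir, "tmp", jobName)
	if err := os.Mkdir(staging, 0755); err != nil {
		return err
	}
	for name, data := range props {
		if err := os.WriteFile(filepath.Join(staging, name), data, 0644); err != nil {
			return err
		}
	}
	// Rename is atomic when source and destination are on the same filesystem,
	// so workers never observe a half-written job in new.
	return os.Rename(staging, filepath.Join(queueDir, "new", jobName))
}

// demoSubmit builds a throwaway queue, submits one job, and lists new.
func demoSubmit() ([]string, error) {
	queueDir, err := os.MkdirTemp("", "pqueue-demo")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(queueDir)
	for _, sub := range []string{"tmp", "new"} {
		if err := os.Mkdir(filepath.Join(queueDir, sub), 0755); err != nil {
			return nil, err
		}
	}
	props := map[string][]byte{"url": []byte("http://example.com/")}
	if err := submitByHand(queueDir, "job-0001", props); err != nil {
		return nil, err
	}
	entries, err := os.ReadDir(filepath.Join(queueDir, "new"))
	if err != nil {
		return nil, err
	}
	names := make([]string, 0, len(entries))
	for _, e := range entries {
		names = append(names, e.Name())
	}
	return names, nil
}

func main() {
	names, err := demoSubmit()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(names) // [job-0001]
}
```

Staging the directory in `tmp` first is what keeps the final move a single rename; writing files directly under `new` would expose a partially prepared job to workers.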

Jobs have state in the form of properties, which are really just files inside the job's directory. The `Get` and `Set` methods read these properties and set them atomically. Properties are set both by the process submitting a job (to specify the work that is to be done) and by workers (to checkpoint their progress so the job can continue if interrupted).
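A rough sketch of the checkpointing pattern described above. It is written against a small interface whose `Get` and `Set` signatures match those documented for `*Job`, with an in-memory stand-in (`memJob`) so it runs without a queue on disk. The property name `step`, the helper names, and the assumption that reading a missing property yields an error matching `os.ErrNotExist` are all illustrative, not confirmed behavior of the package.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"strconv"
)

// propertyStore is the subset of the job API this sketch relies on.
type propertyStore interface {
	Get(name string) ([]byte, error)
	Set(name string, data []byte) error
}

// memJob is a hypothetical in-memory stand-in for a job directory.
type memJob map[string][]byte

func (m memJob) Get(name string) ([]byte, error) {
	data, ok := m[name]
	if !ok {
		return nil, os.ErrNotExist // assumption: mirrors reading a missing file
	}
	return data, nil
}

func (m memJob) Set(name string, data []byte) error {
	m[name] = data
	return nil
}

// resumeFrom returns the next step to run, or 0 when no checkpoint exists.
func resumeFrom(job propertyStore) (int, error) {
	data, err := job.Get("step")
	if errors.Is(err, os.ErrNotExist) {
		return 0, nil
	}
	if err != nil {
		return 0, err
	}
	return strconv.Atoi(string(data))
}

// runSteps runs the remaining steps up to total, checkpointing after each one,
// and returns how many steps it actually ran.
func runSteps(job propertyStore, total int) (int, error) {
	start, err := resumeFrom(job)
	if err != nil {
		return 0, err
	}
	ran := 0
	for step := start; step < total; step++ {
		// The real work for this step would go here.
		if err := job.Set("step", []byte(strconv.Itoa(step+1))); err != nil {
			return ran, err
		}
		ran++
	}
	return ran, nil
}

func main() {
	// Simulate a job that was interrupted after completing three of five steps.
	job := memJob{"step": []byte("3")}
	ran, err := runSteps(job, 5)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("ran %d steps, checkpoint is now %s\n", ran, job["step"]) // ran 2 steps, checkpoint is now 5
}
```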


type Job

type Job struct {
    Basename string
    // contains filtered or unexported fields
}

func (*Job) Fail

func (job *Job) Fail() error

Move the job to the `failed` subdirectory.

func (*Job) Finish

func (job *Job) Finish() error

Move the job to the `done` subdirectory.

func (*Job) Get

func (j *Job) Get(name string) ([]byte, error)

Read a property of the job. This simply reads a file inside the job's directory.

func (*Job) Set

func (j *Job) Set(name string, data []byte) error

Set a property of the job. This simply creates a file inside the job's directory atomically.
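One common way to create a file atomically is to write it under a temporary name and then rename it into place. The sketch below illustrates that general technique with the standard library; it is not taken from the package's source, and `setAtomically`, its parameters, and the `state` property are hypothetical names.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// setAtomically writes data to a temporary file in tmpDir and renames it to
// jobDir/name. Readers see either the previous value or the complete new one.
// tmpDir and jobDir are assumed to be on the same filesystem.
func setAtomically(tmpDir, jobDir, name string, data []byte) error {
	f, err := os.CreateTemp(tmpDir, name+".*")
	if err != nil {
		return err
	}
	tmpPath := f.Name()
	if _, err := f.Write(data); err != nil {
		f.Close()
		os.Remove(tmpPath)
		return err
	}
	if err := f.Close(); err != nil {
		os.Remove(tmpPath)
		return err
	}
	if err := os.Rename(tmpPath, filepath.Join(jobDir, name)); err != nil {
		os.Remove(tmpPath)
		return err
	}
	return nil
}

// demoSet overwrites one property twice and returns the value read back.
func demoSet() (string, error) {
	root, err := os.MkdirTemp("", "pqueue-set")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(root)
	tmpDir := filepath.Join(root, "tmp")
	jobDir := filepath.Join(root, "job")
	for _, dir := range []string{tmpDir, jobDir} {
		if err := os.Mkdir(dir, 0755); err != nil {
			return "", err
		}
	}
	for _, value := range []string{"queued", "half done"} {
		if err := setAtomically(tmpDir, jobDir, "state", []byte(value)); err != nil {
			return "", err
		}
	}
	data, err := os.ReadFile(filepath.Join(jobDir, "state"))
	return string(data), err
}

func main() {
	value, err := demoSet()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(value) // half done
}
```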

func (*Job) Submit

func (job *Job) Submit() error

Move a job (created by `CreateJob` in the `tmp` subdirectory) to the `new` subdirectory so it becomes available to workers.

type Queue

type Queue struct {
    // contains filtered or unexported fields
}

func OpenQueue

func OpenQueue(dir string) (*Queue, error)

Open a pqueue. The directory `dir` must already exist. The subdirectories (`new`, `cur`, etc.) will be created if they are missing.

func (*Queue) CreateJob

func (q *Queue) CreateJob(prefix string) (*Job, error)

Create a job in the `tmp` directory of the queue. After you finish preparing the job with `Set`, call the `Submit` method to make the job available to workers.

func (*Queue) RescueDeadJobs

func (q *Queue) RescueDeadJobs() error

Go through the `cur` subdirectory, determine which workers are no longer alive, and resubmit the jobs they were processing when they died.
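How the package decides that a worker is dead is not stated here. Since worker directories under `cur` are named by process ID, one plausible approach on Unix is to probe each PID with signal 0, which checks for existence without delivering a signal. The sketch below assumes that approach; `workerPID` and `alive` are hypothetical helpers, and the code does not build on Windows.

```go
package main

import (
	"fmt"
	"strconv"
	"syscall"
)

// workerPID interprets the name of a subdirectory of cur as a process ID.
func workerPID(dirName string) (int, bool) {
	pid, err := strconv.Atoi(dirName)
	if err != nil || pid <= 0 {
		return 0, false
	}
	return pid, true
}

// alive reports whether a process with the given PID currently exists.
func alive(pid int) bool {
	if pid <= 0 {
		// Zero and negative values address process groups, not one process.
		return false
	}
	err := syscall.Kill(pid, 0)
	// EPERM means the process exists but is owned by another user.
	return err == nil || err == syscall.EPERM
}

func main() {
	for _, name := range []string{"1", "not-a-pid"} {
		pid, ok := workerPID(name)
		fmt.Println(name, ok, ok && alive(pid))
	}
}
```

A PID check of this kind cannot tell a surviving worker from an unrelated process that has reused the same PID, so it only makes sense for workers running on the same host as the rescuer.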

func (*Queue) Take

func (q *Queue) Take() (*Job, error)

Find an available job (in the `new` subdirectory) and move it to the `cur` subdirectory for this worker process. Returns `nil` if there are no available jobs.
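Claiming a job by renaming it lets several workers share one queue safely: when two workers try to move the same directory, only one rename succeeds. The following sketch shows that idea with the standard library. It illustrates the technique rather than the package's actual code; `claim`, `demoClaim`, and the job name `job-a` are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"strconv"
)

// claim tries to move one job from new into cur/<pid> and returns its name,
// or "" when no job is available.
func claim(queueDir string, pid int) (string, error) {
	mine := filepath.Join(queueDir, "cur", strconv.Itoa(pid))
	if err := os.MkdirAll(mine, 0755); err != nil {
		return "", err
	}
	entries, err := os.ReadDir(filepath.Join(queueDir, "new"))
	if err != nil {
		return "", err
	}
	for _, e := range entries {
		src := filepath.Join(queueDir, "new", e.Name())
		err := os.Rename(src, filepath.Join(mine, e.Name()))
		if err == nil {
			return e.Name(), nil
		}
		if errors.Is(err, os.ErrNotExist) {
			continue // another worker claimed this job first
		}
		return "", err
	}
	return "", nil
}

// demoClaim creates a queue holding a single job and claims twice.
func demoClaim() (first, second string, err error) {
	queueDir, err := os.MkdirTemp("", "pqueue-claim")
	if err != nil {
		return "", "", err
	}
	defer os.RemoveAll(queueDir)
	if err = os.MkdirAll(filepath.Join(queueDir, "new", "job-a"), 0755); err != nil {
		return "", "", err
	}
	if first, err = claim(queueDir, os.Getpid()); err != nil {
		return "", "", err
	}
	second, err = claim(queueDir, os.Getpid())
	return first, second, err
}

func main() {
	first, second, err := demoClaim()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%q %q\n", first, second) // "job-a" ""
}
```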

Package pqueue imports 7 packages and is imported by 1 package. Updated 2016-07-23.