work: github.com/gocraft/work Index | Files | Directories

package work

import "github.com/gocraft/work"

Index

Package Files

client.go dead_pool_reaper.go enqueue.go heartbeater.go identifier.go job.go log.go observer.go periodic_enqueuer.go priority_sampler.go redis.go requeuer.go run.go time.go worker.go worker_pool.go

Variables

var ErrNotDeleted = fmt.Errorf("nothing deleted")

ErrNotDeleted is returned by functions that delete jobs to indicate that although the redis commands were successful, no object was actually deleted by those commmands.

var ErrNotRetried = fmt.Errorf("nothing retried")

ErrNotRetried is returned by functions that retry jobs to indicate that although the redis commands were successful, no object was actually retried by those commmands.

type BackoffCalculator Uses

type BackoffCalculator func(job *Job) int64

You may provide your own backoff function for retrying failed jobs or use the builtin one. Returns the number of seconds to wait until the next attempt.

The builtin backoff calculator provides an exponentially increasing wait function.

type Client Uses

type Client struct {
    // contains filtered or unexported fields
}

Client implements all of the functionality of the web UI. It can be used to inspect the status of a running cluster and retry dead jobs.

func NewClient Uses

func NewClient(namespace string, pool *redis.Pool) *Client

NewClient creates a new Client with the specified redis namespace and connection pool.

func (*Client) DeadJobs Uses

func (c *Client) DeadJobs(page uint) ([]*DeadJob, int64, error)

DeadJobs returns a list of DeadJob's. The page param is 1-based; each page is 20 items. The total number of items (not pages) in the list of dead jobs is also returned.

func (*Client) DeleteAllDeadJobs Uses

func (c *Client) DeleteAllDeadJobs() error

DeleteAllDeadJobs deletes all dead jobs.

func (*Client) DeleteDeadJob Uses

func (c *Client) DeleteDeadJob(diedAt int64, jobID string) error

DeleteDeadJob deletes a dead job from Redis.

func (*Client) DeleteRetryJob Uses

func (c *Client) DeleteRetryJob(retryAt int64, jobID string) error

DeleteRetryJob deletes a job in the retry queue.

func (*Client) DeleteScheduledJob Uses

func (c *Client) DeleteScheduledJob(scheduledFor int64, jobID string) error

DeleteScheduledJob deletes a job in the scheduled queue.

func (*Client) Queues Uses

func (c *Client) Queues() ([]*Queue, error)

Queues returns the Queue's it finds.

func (*Client) RetryAllDeadJobs Uses

func (c *Client) RetryAllDeadJobs() error

RetryAllDeadJobs requeues all dead jobs. In other words, it puts them all back on the normal work queue for workers to pull from and process.

func (*Client) RetryDeadJob Uses

func (c *Client) RetryDeadJob(diedAt int64, jobID string) error

RetryDeadJob retries a dead job. The job will be re-queued on the normal work queue for eventual processing by a worker.

func (*Client) RetryJobs Uses

func (c *Client) RetryJobs(page uint) ([]*RetryJob, int64, error)

RetryJobs returns a list of RetryJob's. The page param is 1-based; each page is 20 items. The total number of items (not pages) in the list of retry jobs is also returned.

func (*Client) ScheduledJobs Uses

func (c *Client) ScheduledJobs(page uint) ([]*ScheduledJob, int64, error)

ScheduledJobs returns a list of ScheduledJob's. The page param is 1-based; each page is 20 items. The total number of items (not pages) in the list of scheduled jobs is also returned.

func (*Client) WorkerObservations Uses

func (c *Client) WorkerObservations() ([]*WorkerObservation, error)

WorkerObservations returns all of the WorkerObservation's it finds for all worker pools' workers.

func (*Client) WorkerPoolHeartbeats Uses

func (c *Client) WorkerPoolHeartbeats() ([]*WorkerPoolHeartbeat, error)

WorkerPoolHeartbeats queries Redis and returns all WorkerPoolHeartbeat's it finds (even for those worker pools which don't have a current heartbeat).

type DeadJob Uses

type DeadJob struct {
    DiedAt int64 `json:"died_at"`
    *Job
}

DeadJob represents a job in the dead queue.

type Enqueuer Uses

type Enqueuer struct {
    Namespace string // eg, "myapp-work"
    Pool      *redis.Pool
    // contains filtered or unexported fields
}

Enqueuer can enqueue jobs.

func NewEnqueuer Uses

func NewEnqueuer(namespace string, pool *redis.Pool) *Enqueuer

NewEnqueuer creates a new enqueuer with the specified Redis namespace and Redis pool.

func (*Enqueuer) Enqueue Uses

func (e *Enqueuer) Enqueue(jobName string, args map[string]interface{}) (*Job, error)

Enqueue will enqueue the specified job name and arguments. The args param can be nil if no args ar needed. Example: e.Enqueue("send_email", work.Q{"addr": "test@example.com"})

func (*Enqueuer) EnqueueIn Uses

func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, args map[string]interface{}) (*ScheduledJob, error)

EnqueueIn enqueues a job in the scheduled job queue for execution in secondsFromNow seconds.

func (*Enqueuer) EnqueueUnique Uses

func (e *Enqueuer) EnqueueUnique(jobName string, args map[string]interface{}) (*Job, error)

EnqueueUnique enqueues a job unless a job is already enqueued with the same name and arguments. The already-enqueued job can be in the normal work queue or in the scheduled job queue. Once a worker begins processing a job, another job with the same name and arguments can be enqueued again. Any failed jobs in the retry queue or dead queue don't count against the uniqueness -- so if a job fails and is retried, two unique jobs with the same name and arguments can be enqueued at once. In order to add robustness to the system, jobs are only unique for 24 hours after they're enqueued. This is mostly relevant for scheduled jobs. EnqueueUnique returns the job if it was enqueued and nil if it wasn't

func (*Enqueuer) EnqueueUniqueByKey Uses

func (e *Enqueuer) EnqueueUniqueByKey(jobName string, args map[string]interface{}, keyMap map[string]interface{}) (*Job, error)

EnqueueUniqueByKey enqueues a job unless a job is already enqueued with the same name and key, updating arguments. The already-enqueued job can be in the normal work queue or in the scheduled job queue. Once a worker begins processing a job, another job with the same name and key can be enqueued again. Any failed jobs in the retry queue or dead queue don't count against the uniqueness -- so if a job fails and is retried, two unique jobs with the same name and arguments can be enqueued at once. In order to add robustness to the system, jobs are only unique for 24 hours after they're enqueued. This is mostly relevant for scheduled jobs. EnqueueUniqueByKey returns the job if it was enqueued and nil if it wasn't

func (*Enqueuer) EnqueueUniqueIn Uses

func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, args map[string]interface{}) (*ScheduledJob, error)

EnqueueUniqueIn enqueues a unique job in the scheduled job queue for execution in secondsFromNow seconds. See EnqueueUnique for the semantics of unique jobs.

func (*Enqueuer) EnqueueUniqueInByKey Uses

func (e *Enqueuer) EnqueueUniqueInByKey(jobName string, secondsFromNow int64, args map[string]interface{}, keyMap map[string]interface{}) (*ScheduledJob, error)

EnqueueUniqueInByKey enqueues a job in the scheduled job queue that is unique on specified key for execution in secondsFromNow seconds. See EnqueueUnique for the semantics of unique jobs. Subsequent calls with same key will update arguments

type GenericHandler Uses

type GenericHandler func(*Job) error

GenericHandler is a job handler without any custom context.

type GenericMiddlewareHandler Uses

type GenericMiddlewareHandler func(*Job, NextMiddlewareFunc) error

GenericMiddlewareHandler is a middleware without any custom context.

type Job Uses

type Job struct {
    // Inputs when making a new job
    Name       string                 `json:"name,omitempty"`
    ID         string                 `json:"id"`
    EnqueuedAt int64                  `json:"t"`
    Args       map[string]interface{} `json:"args"`
    Unique     bool                   `json:"unique,omitempty"`
    UniqueKey  string                 `json:"unique_key,omitempty"`

    // Inputs when retrying
    Fails    int64  `json:"fails,omitempty"` // number of times this job has failed
    LastErr  string `json:"err,omitempty"`
    FailedAt int64  `json:"failed_at,omitempty"`
    // contains filtered or unexported fields
}

Job represents a job.

func (*Job) ArgBool Uses

func (j *Job) ArgBool(key string) bool

ArgBool returns j.Args[key] typed to a bool. If the key is missing or of the wrong type, it sets an argument error on the job. This function is meant to be used in the body of a job handling function while extracting arguments, followed by a single call to j.ArgError().

func (*Job) ArgError Uses

func (j *Job) ArgError() error

ArgError returns the last error generated when extracting typed params. Returns nil if extracting the args went fine.

func (*Job) ArgFloat64 Uses

func (j *Job) ArgFloat64(key string) float64

ArgFloat64 returns j.Args[key] typed to a float64. If the key is missing or of the wrong type, it sets an argument error on the job. This function is meant to be used in the body of a job handling function while extracting arguments, followed by a single call to j.ArgError().

func (*Job) ArgInt64 Uses

func (j *Job) ArgInt64(key string) int64

ArgInt64 returns j.Args[key] typed to an int64. If the key is missing or of the wrong type, it sets an argument error on the job. This function is meant to be used in the body of a job handling function while extracting arguments, followed by a single call to j.ArgError().

func (*Job) ArgString Uses

func (j *Job) ArgString(key string) string

ArgString returns j.Args[key] typed to a string. If the key is missing or of the wrong type, it sets an argument error on the job. This function is meant to be used in the body of a job handling function while extracting arguments, followed by a single call to j.ArgError().

func (*Job) Checkin Uses

func (j *Job) Checkin(msg string)

Checkin will update the status of the executing job to the specified messages. This message is visible within the web UI. This is useful for indicating some sort of progress on very long running jobs. For instance, on a job that has to process a million records over the course of an hour, the job could call Checkin with the current job number every 10k jobs.

type JobOptions Uses

type JobOptions struct {
    Priority       uint              // Priority from 1 to 10000
    MaxFails       uint              // 1: send straight to dead (unless SkipDead)
    SkipDead       bool              // If true, don't send failed jobs to the dead queue when retries are exhausted.
    MaxConcurrency uint              // Max number of jobs to keep in flight (default is 0, meaning no max)
    Backoff        BackoffCalculator // If not set, uses the default backoff algorithm
}

JobOptions can be passed to JobWithOptions.

type NextMiddlewareFunc Uses

type NextMiddlewareFunc func() error

NextMiddlewareFunc is a function type (whose instances are named 'next') that you call to advance to the next middleware.

type Q Uses

type Q map[string]interface{}

Q is a shortcut to easily specify arguments for jobs when enqueueing them. Example: e.Enqueue("send_email", work.Q{"addr": "test@example.com", "track": true})

type Queue Uses

type Queue struct {
    JobName string `json:"job_name"`
    Count   int64  `json:"count"`
    Latency int64  `json:"latency"`
}

Queue represents a queue that holds jobs with the same name. It indicates their name, count, and latency (in seconds). Latency is a measurement of how long ago the next job to be processed was enqueued.

type RetryJob Uses

type RetryJob struct {
    RetryAt int64 `json:"retry_at"`
    *Job
}

RetryJob represents a job in the retry queue.

type ScheduledJob Uses

type ScheduledJob struct {
    RunAt int64 `json:"run_at"`
    *Job
}

ScheduledJob represents a job in the scheduled queue.

type WorkerObservation Uses

type WorkerObservation struct {
    WorkerID string `json:"worker_id"`
    IsBusy   bool   `json:"is_busy"`

    // If IsBusy:
    JobName   string `json:"job_name"`
    JobID     string `json:"job_id"`
    StartedAt int64  `json:"started_at"`
    ArgsJSON  string `json:"args_json"`
    Checkin   string `json:"checkin"`
    CheckinAt int64  `json:"checkin_at"`
}

WorkerObservation represents the latest observation taken from a worker. The observation indicates whether the worker is busy processing a job, and if so, information about that job.

type WorkerPool Uses

type WorkerPool struct {
    // contains filtered or unexported fields
}

WorkerPool represents a pool of workers. It forms the primary API of gocraft/work. WorkerPools provide the public API of gocraft/work. You can attach jobs and middlware to them. You can start and stop them. Based on their concurrency setting, they'll spin up N worker goroutines.

func NewWorkerPool Uses

func NewWorkerPool(ctx interface{}, concurrency uint, namespace string, pool *redis.Pool) *WorkerPool

NewWorkerPool creates a new worker pool. ctx should be a struct literal whose type will be used for middleware and handlers. concurrency specifies how many workers to spin up - each worker can process jobs concurrently.

func NewWorkerPoolWithOptions Uses

func NewWorkerPoolWithOptions(ctx interface{}, concurrency uint, namespace string, pool *redis.Pool, workerPoolOpts WorkerPoolOptions) *WorkerPool

NewWorkerPoolWithOptions creates a new worker pool as per the NewWorkerPool function, but permits you to specify additional options such as sleep backoffs.

func (*WorkerPool) Drain Uses

func (wp *WorkerPool) Drain()

Drain drains all jobs in the queue before returning. Note that if jobs are added faster than we can process them, this function wouldn't return.

func (*WorkerPool) Job Uses

func (wp *WorkerPool) Job(name string, fn interface{}) *WorkerPool

Job registers the job name to the specified handler fn. For instance, when workers pull jobs from the name queue they'll be processed by the specified handler function. fn can take one of these forms: (*ContextType).func(*Job) error, (ContextType matches the type of ctx specified when creating a pool) func(*Job) error, for the generic handler format.

func (*WorkerPool) JobWithOptions Uses

func (wp *WorkerPool) JobWithOptions(name string, jobOpts JobOptions, fn interface{}) *WorkerPool

JobWithOptions adds a handler for 'name' jobs as per the Job function, but permits you specify additional options such as a job's priority, retry count, and whether to send dead jobs to the dead job queue or trash them.

func (*WorkerPool) Middleware Uses

func (wp *WorkerPool) Middleware(fn interface{}) *WorkerPool

Middleware appends the specified function to the middleware chain. The fn can take one of these forms: (*ContextType).func(*Job, NextMiddlewareFunc) error, (ContextType matches the type of ctx specified when creating a pool) func(*Job, NextMiddlewareFunc) error, for the generic middleware format.

func (*WorkerPool) PeriodicallyEnqueue Uses

func (wp *WorkerPool) PeriodicallyEnqueue(spec string, jobName string) *WorkerPool

PeriodicallyEnqueue will periodically enqueue jobName according to the cron-based spec. The spec format is based on https://godoc.org/github.com/robfig/cron, which is a relatively standard cron format. Note that the first value is the seconds! If you have multiple worker pools on different machines, they'll all coordinate and only enqueue your job once.

func (*WorkerPool) Start Uses

func (wp *WorkerPool) Start()

Start starts the workers and associated processes.

func (*WorkerPool) Stop Uses

func (wp *WorkerPool) Stop()

Stop stops the workers and associated processes.

type WorkerPoolHeartbeat Uses

type WorkerPoolHeartbeat struct {
    WorkerPoolID string   `json:"worker_pool_id"`
    StartedAt    int64    `json:"started_at"`
    HeartbeatAt  int64    `json:"heartbeat_at"`
    JobNames     []string `json:"job_names"`
    Concurrency  uint     `json:"concurrency"`
    Host         string   `json:"host"`
    Pid          int      `json:"pid"`
    WorkerIDs    []string `json:"worker_ids"`
}

WorkerPoolHeartbeat represents the heartbeat from a worker pool. WorkerPool's write a heartbeat every 5 seconds so we know they're alive and includes config information.

type WorkerPoolOptions Uses

type WorkerPoolOptions struct {
    SleepBackoffs []int64 // Sleep backoffs in milliseconds
}

WorkerPoolOptions can be passed to NewWorkerPoolWithOptions.

Directories

PathSynopsis
webui
webui/internal/assets

Package work imports 16 packages (graph) and is imported by 30 packages. Updated 2019-07-08. Refresh now. Tools for package owners.