Documentation ¶
Index ¶
- Constants
- Variables
- type Dispatcher
- func (d *Dispatcher) Dispatch(job *Job) (Result, error)
- func (d *Dispatcher) Job(id JobID) (*Job, error)
- func (d *Dispatcher) OnFinished() <-chan *Job
- func (d *Dispatcher) RegisterRunner(name string, runner Runner) bool
- func (d *Dispatcher) RegisterRunnerFunc(name string, runnerFunc RunnerFunc) bool
- func (d *Dispatcher) Start(ctx context.Context)
- func (d *Dispatcher) Subscribe(jobID JobID, sub chan<- *Job) bool
- func (d *Dispatcher) UnSubscribe(jobID JobID, sub chan<- *Job) bool
- type Job
- type JobID
- type Queue
- type Result
- type Runner
- type RunnerFunc
- type Storage
- type Task
Constants ¶
const (
	// MaxValidTime signifies how long a job is valid by default
	// Max set to 12 hrs
	MaxValidTime = 12 * time.Hour
)
Variables ¶
var (
	// ErrNotFound when a requested resource is not found
	ErrNotFound = errors.New("not found")
)
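Callers would typically test for this sentinel with `errors.Is` rather than comparing strings. The sketch below is self-contained and only illustrative: `lookup`, `isNotFound`, and the map-based store are hypothetical helpers, not part of this package; only the `ErrNotFound` declaration mirrors the one above.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNotFound mirrors the package-level sentinel documented above.
var ErrNotFound = errors.New("not found")

// lookup is a hypothetical helper that wraps ErrNotFound with context.
func lookup(store map[string]string, key string) (string, error) {
	v, ok := store[key]
	if !ok {
		return "", fmt.Errorf("lookup %q: %w", key, ErrNotFound)
	}
	return v, nil
}

// isNotFound reports whether err is, or wraps, ErrNotFound.
func isNotFound(err error) bool {
	return errors.Is(err, ErrNotFound)
}

func main() {
	store := map[string]string{"a": "1"}
	if _, err := lookup(store, "missing"); isNotFound(err) {
		fmt.Println("resource not found")
	}
}
```

Because `errors.Is` unwraps the error chain, the check keeps working even when intermediate layers add context with `%w`.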
Functions ¶
This section is empty.
Types ¶
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher coordinates with the workers and queue in executing the jobs.
func NewDispatcher ¶
func NewDispatcher(workerCount int, storage Storage, queue Queue) *Dispatcher
NewDispatcher returns a new Dispatcher instance with the given worker count, Storage, and Queue.
func (*Dispatcher) Dispatch ¶
func (d *Dispatcher) Dispatch(job *Job) (Result, error)
Dispatch dispatches a job and returns a result.
func (*Dispatcher) Job ¶
func (d *Dispatcher) Job(id JobID) (*Job, error)
Job returns the Job associated with ID.
func (*Dispatcher) OnFinished ¶
func (d *Dispatcher) OnFinished() <-chan *Job
OnFinished returns a channel that receives an update for every finished job.
func (*Dispatcher) RegisterRunner ¶
func (d *Dispatcher) RegisterRunner(name string, runner Runner) bool
RegisterRunner registers a Runner under the given name.
func (*Dispatcher) RegisterRunnerFunc ¶
func (d *Dispatcher) RegisterRunnerFunc(name string, runnerFunc RunnerFunc) bool
RegisterRunnerFunc registers a RunnerFunc under the given name.
func (*Dispatcher) Start ¶
func (d *Dispatcher) Start(ctx context.Context)
Start initiates the dispatcher functions.
func (*Dispatcher) Subscribe ¶
func (d *Dispatcher) Subscribe(jobID JobID, sub chan<- *Job) bool
Subscribe subscribes sub to a specific jobID. Once the job is complete, it is pushed to sub.
func (*Dispatcher) UnSubscribe ¶
func (d *Dispatcher) UnSubscribe(jobID JobID, sub chan<- *Job) bool
UnSubscribe removes sub from any updates for the jobID.
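The subscription methods can be pictured as a small registry keyed by job ID. The following is a minimal sketch under stated assumptions, not the package's implementation: `registry`, `newRegistry`, and `finish` are hypothetical names, the `Job` struct is trimmed to two fields, the meaning of the `bool` results (false for a nil or duplicate subscriber) is a guess, and the non-blocking send is a design choice made only for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// JobID and Job are trimmed stand-ins for the documented types.
type JobID []byte

type Job struct {
	ID       JobID
	Finished bool
}

// registry is a hypothetical subscription table keyed by job ID.
type registry struct {
	mu   sync.Mutex
	subs map[string][]chan<- *Job
}

func newRegistry() *registry {
	return &registry{subs: make(map[string][]chan<- *Job)}
}

// Subscribe adds sub for jobID. Assumption: false means nil or duplicate.
func (r *registry) Subscribe(jobID JobID, sub chan<- *Job) bool {
	if sub == nil {
		return false
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	key := string(jobID)
	for _, s := range r.subs[key] {
		if s == sub {
			return false
		}
	}
	r.subs[key] = append(r.subs[key], sub)
	return true
}

// UnSubscribe removes sub and reports whether it was registered.
func (r *registry) UnSubscribe(jobID JobID, sub chan<- *Job) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	key := string(jobID)
	for i, s := range r.subs[key] {
		if s == sub {
			r.subs[key] = append(r.subs[key][:i], r.subs[key][i+1:]...)
			return true
		}
	}
	return false
}

// finish marks the job finished, clears its subscribers, and pushes the job
// to each of them without blocking. It returns the number of deliveries.
func (r *registry) finish(job *Job) int {
	r.mu.Lock()
	subs := r.subs[string(job.ID)]
	delete(r.subs, string(job.ID))
	r.mu.Unlock()

	job.Finished = true
	delivered := 0
	for _, s := range subs {
		select {
		case s <- job:
			delivered++
		default: // subscriber not ready; dropped in this sketch
		}
	}
	return delivered
}

func main() {
	r := newRegistry()
	sub := make(chan *Job, 1) // buffered so the non-blocking send succeeds
	id := JobID{0x01}
	r.Subscribe(id, sub)
	r.finish(&Job{ID: id})
	fmt.Println((<-sub).Finished) // prints true
}
```

A buffered subscriber channel is used here so that a slow receiver cannot stall the notifier; whether the real Dispatcher blocks on delivery is not stated in this documentation.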
type Job ¶
type Job struct {
	ID JobID `json:"JobID" swaggertype:"primitive,string"` // Job Identifier
	Desc string `json:"desc"` // description of the Job
	Runner string `json:"runner"` // name of the Runner
	Overrides map[string]interface{} `json:"overrides"` // overrides for the Job
	Tasks []*Task `json:"tasks"` // list of tasks run under this Job
	ValidUntil time.Time `json:"valid_until"` // validity of the job
	FinishedAt time.Time `json:"finished_at"` // Job finished at. If empty, job is not complete yet
	Finished bool `json:"finished"` // job status
}
Job represents a single prefix job. A job can contain multiple sub-tasks.
func NewRunnerFuncJob ¶
func NewRunnerFuncJob(description, task string, args []interface{}, overrides map[string]interface{}, validUntil time.Time) *Job
NewRunnerFuncJob creates a new job with task as the runnerFunc
func NewRunnerJob ¶
func NewRunnerJob(description, runner, task string, args []interface{}, overrides map[string]interface{}, validUntil time.Time) *Job
NewRunnerJob creates a new job with runner and its first task
func (Job) HasCompleted ¶
HasCompleted checks if the job has finished running.
func (Job) IsSuccessful ¶
IsSuccessful returns true if the job completed successfully
type JobID ¶
type JobID []byte
JobID is a unique ID for a given job
func (JobID) MarshalJSON ¶
MarshalJSON marshals the bytes to hex.
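To make the hex encoding concrete, here is a minimal sketch of a `MarshalJSON` method on a byte-slice type. It assumes plain lowercase hex with no `0x` prefix, which this documentation does not confirm; `encodeID` is a hypothetical helper added only for the example.

```go
package main

import (
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// JobID mirrors the type documented above.
type JobID []byte

// MarshalJSON encodes the ID as a JSON string of lowercase hex digits.
// Assumption: no "0x" prefix; the real package may format it differently.
func (j JobID) MarshalJSON() ([]byte, error) {
	return json.Marshal(hex.EncodeToString(j))
}

// encodeID is a hypothetical helper returning the JSON form as a string.
func encodeID(id JobID) string {
	b, err := json.Marshal(id)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	fmt.Println(encodeID(JobID{0xde, 0xad, 0xbe, 0xef})) // prints "deadbeef" including the JSON quotes
}
```

Without a custom marshaler, `encoding/json` would encode a `[]byte` as base64, so a method like this is what makes the ID readable as hex in JSON output.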
type Queue ¶
type Queue interface {
	Start(ctx context.Context)
	EnqueueNow(id []byte) error
	EnqueueAfter(id []byte, t time.Time) error
	Dequeue() ([]byte, error)
	Finished(id []byte)
	Len() int
}
Queue is a message queue
type Result ¶
type Result struct {
	JobID JobID
	Dispatcher *Dispatcher
}
Result is the result of a Job
type Runner ¶
type Runner interface {
	New() Runner
	RunnerFunc(task string) RunnerFunc
	Next(task string) (next string, ok bool) // next task after the task
}
Runner instance to run a stateful job
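The interface suggests a chain of named tasks that share state within one Runner instance. The sketch below illustrates that reading with a hypothetical two-step runner. `counterRunner` and `runAll` are invented for the example, and the driver loop is only an assumption about how a dispatcher might walk the chain (including passing the same args to every task).

```go
package main

import "fmt"

// RunnerFunc and Runner mirror the documented declarations.
type RunnerFunc func(args []interface{}, overrides map[string]interface{}) (result interface{}, err error)

type Runner interface {
	New() Runner
	RunnerFunc(task string) RunnerFunc
	Next(task string) (next string, ok bool)
}

// counterRunner is a hypothetical stateful runner: "add" then "double".
type counterRunner struct{ total int }

// New returns a fresh instance so each job starts with clean state.
func (r *counterRunner) New() Runner { return &counterRunner{} }

// RunnerFunc returns the function for a task, or nil if the task is unknown.
func (r *counterRunner) RunnerFunc(task string) RunnerFunc {
	switch task {
	case "add":
		return func(args []interface{}, _ map[string]interface{}) (interface{}, error) {
			for _, a := range args {
				n, ok := a.(int)
				if !ok {
					return nil, fmt.Errorf("add: want int, got %T", a)
				}
				r.total += n
			}
			return r.total, nil
		}
	case "double":
		return func(_ []interface{}, _ map[string]interface{}) (interface{}, error) {
			r.total *= 2
			return r.total, nil
		}
	}
	return nil
}

// Next names the task that follows the given one.
func (r *counterRunner) Next(task string) (string, bool) {
	if task == "add" {
		return "double", true
	}
	return "", false
}

// runAll is a hypothetical driver that runs tasks until Next reports ok=false.
func runAll(proto Runner, first string, args []interface{}) (interface{}, error) {
	r := proto.New()
	var res interface{}
	for task, ok := first, true; ok; task, ok = r.Next(task) {
		fn := r.RunnerFunc(task)
		if fn == nil {
			return nil, fmt.Errorf("unknown task %q", task)
		}
		var err error
		if res, err = fn(args, nil); err != nil {
			return nil, err
		}
	}
	return res, nil
}

func main() {
	res, err := runAll(&counterRunner{}, "add", []interface{}{1, 2})
	fmt.Println(res, err) // prints 6 <nil>
}
```

Calling `New` before each run keeps the registered prototype untouched, which is one plausible reason the interface includes it.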
type RunnerFunc ¶
type RunnerFunc func(args []interface{}, overrides map[string]interface{}) (result interface{}, err error)
RunnerFunc is the func that is called to execute the Job
type Storage ¶
type Storage interface {
	Get(key []byte) ([]byte, error)
	Set(key, value []byte) error
	Delete(key []byte) error
	Iterate(prefix []byte, f func(key, value []byte))
}
Storage for storing any kind of data
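As an illustration of what satisfying this interface involves, here is a minimal map-backed sketch. It is not the package's in-memory storage: `mapStorage`, `newMapStorage`, and `countPrefix` are hypothetical, and returning `ErrNotFound` from `Get` for a missing key is an assumption.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"sync"
)

// ErrNotFound and Storage mirror the documented declarations.
var ErrNotFound = errors.New("not found")

type Storage interface {
	Get(key []byte) ([]byte, error)
	Set(key, value []byte) error
	Delete(key []byte) error
	Iterate(prefix []byte, f func(key, value []byte))
}

// mapStorage is a hypothetical map-backed Storage guarded by a mutex.
type mapStorage struct {
	mu   sync.RWMutex
	data map[string][]byte
}

func newMapStorage() *mapStorage {
	return &mapStorage{data: make(map[string][]byte)}
}

// Get returns the stored value. Assumption: a missing key yields ErrNotFound.
func (s *mapStorage) Get(key []byte) ([]byte, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[string(key)]
	if !ok {
		return nil, ErrNotFound
	}
	return v, nil
}

// Set stores a copy of value so later caller mutations do not leak in.
func (s *mapStorage) Set(key, value []byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[string(key)] = append([]byte(nil), value...)
	return nil
}

// Delete removes the key; deleting a missing key is not an error here.
func (s *mapStorage) Delete(key []byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.data, string(key))
	return nil
}

// Iterate calls f for every entry whose key starts with prefix, in no
// particular order. f must not call Set or Delete on the same storage,
// because the read lock is held for the duration of the walk.
func (s *mapStorage) Iterate(prefix []byte, f func(key, value []byte)) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	for k, v := range s.data {
		if bytes.HasPrefix([]byte(k), prefix) {
			f([]byte(k), v)
		}
	}
}

// countPrefix returns how many stored keys start with prefix.
func countPrefix(s Storage, prefix string) int {
	n := 0
	s.Iterate([]byte(prefix), func(_, _ []byte) { n++ })
	return n
}

func main() {
	var s Storage = newMapStorage()
	_ = s.Set([]byte("job/1"), []byte("a"))
	_ = s.Set([]byte("job/2"), []byte("b"))
	_ = s.Set([]byte("task/1"), []byte("c"))
	fmt.Println(countPrefix(s, "job/")) // prints 2
}
```

Prefix iteration is what lets a caller keep different record kinds in one key space and still enumerate just one kind.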
func NewInMemoryStorage ¶
func NewInMemoryStorage() Storage
func NewLevelDBStorage ¶
NewLevelDBStorage returns a LevelDB implementation of CeleryBackend
type Task ¶
type Task struct {
	RunnerFunc string `json:"runnerFuncs"` // name of the runnerFuncs
	Args []interface{} `json:"args"` // arguments passed to this task
	Result interface{} `json:"result"` // result after the task run
	Error string `json:"error"` // error after task run
	Tries uint `json:"tries"` // number of times task was run.
	Delay time.Time `json:"delay"` // delay until ready to be run
}
Task represents a single task in a Job