gocelery

package module
v2.0.0-...-3b07af1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 1, 2022 License: MIT Imports: 15 Imported by: 10

README

gocelery

Task runner in Go

Why?

Go Centrifuge Node needs a task runner with first class support to run jobs with stateful runners.

Storage

Storage is used to store the jobs and other queuing related details Currently supported storage: LevelDb, InMemory

Queue

A Queue would enqueue jobs and dequeues when ready to be executed. A default queue implementation is provided.

Dispatcher

A dispatcher uses Storage and Queue to run tasks asynchronously.

Job

A Job defines a single job. it can encompasses multiple tasks and has a validity time. if fails, backoffs exponentially until either successful or expired. Dispatcher uses gob encoder to encode the job params like arguments, overrides etc... Register your custom types with gob before dispatching a job.

Examples

Checkout examples package for more examples

Contributing

You are more than welcome to make any contributions. Please create Pull Request for any changes.

LICENSE

The gocelery is offered under MIT license.

Documentation

Index

Constants

View Source
const (
	// MaxValidTime signifies how long a job is valid by default
	// Max set to 12 hrs
	MaxValidTime = 12 * time.Hour
)

Variables

View Source
var (
	// ErrNotFound when the a requested resource is not found
	ErrNotFound = errors.New("not found")
)

Functions

This section is empty.

Types

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher coordinates with the workers and queue in executing the jobs.

func NewDispatcher

func NewDispatcher(workerCount int, storage Storage, queue Queue) *Dispatcher

NewDispatcher for a new Dispatcher instance with a storage and Queue

func (*Dispatcher) Dispatch

func (d *Dispatcher) Dispatch(job *Job) (Result, error)

Dispatch dispatches a job and returns a result.

func (*Dispatcher) Job

func (d *Dispatcher) Job(id JobID) (*Job, error)

Job returns the Job associated with ID.

func (*Dispatcher) OnFinished

func (d *Dispatcher) OnFinished() <-chan *Job

OnFinished returns a channel that gets updates for all finished tasks

func (*Dispatcher) RegisterRunner

func (d *Dispatcher) RegisterRunner(name string, runner Runner) bool

RegisterRunner registers a runner

func (*Dispatcher) RegisterRunnerFunc

func (d *Dispatcher) RegisterRunnerFunc(name string, runnerFunc RunnerFunc) bool

RegisterRunnerFunc registers a runnerFunc

func (*Dispatcher) Start

func (d *Dispatcher) Start(ctx context.Context)

Start initiates the dispatcher functions.

func (*Dispatcher) Subscribe

func (d *Dispatcher) Subscribe(jobID JobID, sub chan<- *Job) bool

Subscribe to a specific jobID. Once the job is complete, it is pushed to sub

func (*Dispatcher) UnSubscribe

func (d *Dispatcher) UnSubscribe(jobID JobID, sub chan<- *Job) bool

UnSubscribe from any updates from the jobID

type Job

type Job struct {
	ID         JobID                  `json:"JobID" swaggertype:"primitive,string"` // Job Identifier
	Desc       string                 `json:"desc"`                                 // description of the Job
	Runner     string                 `json:"runner"`                               // name of the Runner
	Overrides  map[string]interface{} `json:"overrides"`                            // overrides for the Job
	Tasks      []*Task                `json:"tasks"`                                // list of tasks ran under this Job
	ValidUntil time.Time              `json:"valid_until"`                          // validity of the job
	FinishedAt time.Time              `json:"finished_at"`                          // Job finished at. If empty, job is not complete yet
	Finished   bool                   `json:"finished"`                             // job status
}

Job represents a single prefix job a job can contain multiple sub-tasks

func NewRunnerFuncJob

func NewRunnerFuncJob(description, task string,
	args []interface{}, overrides map[string]interface{},
	validUntil time.Time) *Job

NewRunnerFuncJob creates a new job with task as the runnerFunc

func NewRunnerJob

func NewRunnerJob(description, runner, task string,
	args []interface{}, overrides map[string]interface{},
	validUntil time.Time) *Job

NewRunnerJob creates a new job with runner and its first task

func (Job) HasCompleted

func (j Job) HasCompleted() bool

HasCompleted checks if the job has finished running.

func (Job) HexID

func (j Job) HexID() string

HexID returns a hex encoded string of 32 byte jobID

func (Job) IsSuccessful

func (j Job) IsSuccessful() bool

IsSuccessful returns true if the job completed successfully

func (Job) IsValid

func (j Job) IsValid() bool

IsValid returns of the job is still valid.

func (Job) LastTask

func (j Job) LastTask() *Task

LastTask returns the last task of the Job

type JobID

type JobID []byte

JobID is a unique ID for a given job

func (JobID) Hex

func (j JobID) Hex() string

Hex returns hex encoded jobID.

func (JobID) MarshalJSON

func (j JobID) MarshalJSON() ([]byte, error)

MarshalJSON marshall bytes to hex.

type Queue

type Queue interface {
	Start(ctx context.Context)
	EnqueueNow(id []byte) error
	EnqueueAfter(id []byte, t time.Time) error
	Dequeue() ([]byte, error)
	Finished(id []byte)
	Len() int
}

Queue is a message queue

func NewQueue

func NewQueue(storage Storage, reQueueTimeout time.Duration) Queue

NewQueue returns an implementation of Queue if timeout is 0, then defaults to 30 minutes

type Result

type Result struct {
	JobID      JobID
	Dispatcher *Dispatcher
}

Result is the result of a Job

func (Result) Await

func (r Result) Await(ctx context.Context) (res interface{}, err error)

Await waits for the job to be finished and return the result.

type Runner

type Runner interface {
	New() Runner
	RunnerFunc(task string) RunnerFunc
	Next(task string) (next string, ok bool) // next task after the task
}

Runner instance to run a stateful job

type RunnerFunc

type RunnerFunc func(args []interface{}, overrides map[string]interface{}) (result interface{}, err error)

RunnerFunc is the func that is called to execute the Job

type Storage

type Storage interface {
	Get(key []byte) ([]byte, error)
	Set(key, value []byte) error
	Delete(key []byte) error
	Iterate(prefix []byte, f func(key, value []byte))
}

Storage for storing any kind of data

func NewInMemoryStorage

func NewInMemoryStorage() Storage

func NewLevelDBStorage

func NewLevelDBStorage(db *leveldb.DB) Storage

NewLevelDBStorage returns an levelDB implementation of CeleryBackend

type Task

type Task struct {
	RunnerFunc string        `json:"runnerFuncs"` // name of the runnerFuncs
	Args       []interface{} `json:"args"`        // arguments passed to this task
	Result     interface{}   `json:"result"`      // result after the task run
	Error      string        `json:"error"`       // error after task run
	Tries      uint          `json:"tries"`       // number of times task was run.
	Delay      time.Time     `json:"delay"`       // delay until ready to be run
}

Task represents a single task in a Job

func (Task) DidRun

func (t Task) DidRun() bool

DidRun returns if the task was run at least once.

func (Task) IsSuccessful

func (t Task) IsSuccessful() bool

IsSuccessful if a task is successful.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL