tasks

package
v0.0.0-...-a3f3dab Latest
Warning

This package is not in the latest version of its module.

Published: Nov 4, 2017 License: AGPL-3.0 Imports: 10 Imported by: 1

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func RegisterTaskdef

func RegisterTaskdef(name string, f NewTaskFunc)

RegisterTaskdef registers a task type. It must be called before a task of that type can be used.
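A minimal sketch of how name-based registration and lookup could fit together. The types below are local copies shaped like the ones documented on this page; the `registry` map, its mutex, the error text, and `noopTask` are assumptions made for illustration and are not taken from the package source.

```go
package main

import (
	"fmt"
	"sync"
)

// Progress is trimmed to the fields this sketch needs.
type Progress struct {
	Done  bool
	Error error
}

// Taskable and NewTaskFunc mirror the shapes documented on this page.
type Taskable interface {
	Valid() error
	Do(updates chan Progress)
}

type NewTaskFunc func() Taskable

// registry is a hypothetical package-level table of task constructors.
// The real package may store registrations differently.
var (
	mu       sync.RWMutex
	registry = map[string]NewTaskFunc{}
)

// RegisterTaskdef stores a constructor under a task type name.
func RegisterTaskdef(name string, f NewTaskFunc) {
	mu.Lock()
	defer mu.Unlock()
	registry[name] = f
}

// NewTaskable looks up a constructor by name and returns a fresh instance.
func NewTaskable(name string) (Taskable, error) {
	mu.RLock()
	f, ok := registry[name]
	mu.RUnlock()
	if !ok {
		return nil, fmt.Errorf("unrecognized task type: %q", name)
	}
	return f(), nil
}

// noopTask is a hypothetical task type used only for this example.
type noopTask struct{}

func (noopTask) Valid() error             { return nil }
func (noopTask) Do(updates chan Progress) { updates <- Progress{Done: true} }

func main() {
	RegisterTaskdef("noop", func() Taskable { return noopTask{} })

	if _, err := NewTaskable("noop"); err == nil {
		fmt.Println("noop task created")
	}
	if _, err := NewTaskable("missing"); err != nil {
		fmt.Println("lookup failed:", err)
	}
}
```

Registering from an `init` function in the file that defines each task type is one common way to guarantee that registration happens before any lookup.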

Types

type DatastoreTaskable

type DatastoreTaskable interface {
	Taskable
	SetDatastore(ds datastore.Datastore)
}

DatastoreTaskable is a task that has a method for assigning a datastore to the task. If your task needs access to a datastore, implement DatastoreTaskable; task-orchestrators will detect this method and call it to set the datastore before calling Taskable.Do.
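The detection step described above can be sketched with a type assertion. This is an illustration only: `Datastore` and `memStore` are hypothetical stand-ins for datastore.Datastore, and `prepare`, `plainTask`, and `storeTask` are invented names rather than part of the package.

```go
package main

import "fmt"

// Datastore is a hypothetical stand-in for datastore.Datastore.
type Datastore interface {
	Get(key string) (string, bool)
}

// memStore is a toy in-memory Datastore used only for this example.
type memStore map[string]string

func (m memStore) Get(key string) (string, bool) {
	v, ok := m[key]
	return v, ok
}

// Progress is trimmed to the fields this sketch needs.
type Progress struct {
	Done  bool
	Error error
}

type Taskable interface {
	Valid() error
	Do(updates chan Progress)
}

type DatastoreTaskable interface {
	Taskable
	SetDatastore(ds Datastore)
}

// prepare shows the check an orchestrator could run before calling Do.
// It reports whether the task accepted the datastore.
func prepare(t Taskable, ds Datastore) bool {
	if dt, ok := t.(DatastoreTaskable); ok {
		dt.SetDatastore(ds)
		return true
	}
	return false
}

// plainTask does not need a datastore.
type plainTask struct{}

func (plainTask) Valid() error             { return nil }
func (plainTask) Do(updates chan Progress) { updates <- Progress{Done: true} }

// storeTask keeps the datastore it is handed for use inside Do.
type storeTask struct{ ds Datastore }

func (t *storeTask) Valid() error              { return nil }
func (t *storeTask) Do(updates chan Progress)  { updates <- Progress{Done: true} }
func (t *storeTask) SetDatastore(ds Datastore) { t.ds = ds }

func main() {
	store := memStore{"greeting": "hello"}
	fmt.Println(prepare(&storeTask{}, store)) // storeTask implements SetDatastore
	fmt.Println(prepare(plainTask{}, store))  // plainTask does not
}
```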

type NewTaskFunc

type NewTaskFunc func() Taskable

NewTaskFunc is a function that creates new task instances. Task-orchestrators use NewTaskFunc to create new Tasks, and then attempt to json.Unmarshal params into the task definition.
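One way the create-then-unmarshal step could look, assuming the params arrive as a map[string]interface{} (as in Task.Params) and are round-tripped through JSON. `configure` and `fetchTask` are hypothetical names; the package's actual orchestration code may differ.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Progress is trimmed to the fields this sketch needs.
type Progress struct {
	Done  bool
	Error error
}

type Taskable interface {
	Valid() error
	Do(updates chan Progress)
}

type NewTaskFunc func() Taskable

// fetchTask is a hypothetical task definition whose exported fields
// are filled in from the params.
type fetchTask struct {
	URL     string `json:"url"`
	Retries int    `json:"retries"`
}

func (t *fetchTask) Valid() error {
	if t.URL == "" {
		return errors.New("url is required")
	}
	return nil
}

func (t *fetchTask) Do(updates chan Progress) { updates <- Progress{Done: true} }

// configure creates a task, encodes the params map as JSON, decodes
// that JSON into the task, and validates the result.
func configure(newTask NewTaskFunc, params map[string]interface{}) (Taskable, error) {
	data, err := json.Marshal(params)
	if err != nil {
		return nil, err
	}
	t := newTask()
	// newTask must return a pointer for Unmarshal to populate the task.
	if err := json.Unmarshal(data, t); err != nil {
		return nil, err
	}
	if err := t.Valid(); err != nil {
		return nil, err
	}
	return t, nil
}

func main() {
	newFetch := func() Taskable { return &fetchTask{} }
	params := map[string]interface{}{"url": "https://example.com", "retries": 3}

	t, err := configure(newFetch, params)
	if err != nil {
		fmt.Println("configure failed:", err)
		return
	}
	fmt.Printf("%+v\n", *t.(*fetchTask))
}
```

Because json.Unmarshal needs a pointer to write into, a NewTaskFunc in this sketch returns a pointer to the task struct rather than a struct value.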

type Progress

type Progress struct {
	Percent float32 `json:"percent"`         // percent complete between 0.0 & 1.0
	Step    int     `json:"step"`            // current Step
	Steps   int     `json:"steps"`           // number of Steps in the task
	Status  string  `json:"status"`          // status string that describes what is currently happening
	Done    bool    `json:"done"`            // complete flag
	Dest    string  `json:"dest"`            // place for sending users, could be a url, could be a relative path
	Error   error   `json:"error,omitempty"` // error message
}

Progress represents the current state of a task. Tasks will be given a Progress channel to send updates.

func (Progress) String

func (p Progress) String() string

type Task

type Task struct {
	// uuid identifier for task
	Id string `json:"id"`
	// created date rounded to seconds
	Created time.Time `json:"created"`
	// updated date rounded to seconds
	Updated time.Time `json:"updated"`
	// human-readable title for the task, meant to be descriptive & varied
	Title string `json:"title"`
	// id of user that submitted this task
	UserId string `json:"userId"`
	// Type of task to be executed
	Type string `json:"type"`
	// parameters supplied to the task, should be json bytes
	Params map[string]interface{} `json:"params"`
	// Status Message
	Status string `json:"status,omitempty"`
	// Error Message
	Error string `json:"error,omitempty"`
	// timestamp for when request was added to the tasks queue
	// nil if request hasn't been sent to the queue
	Enqueued *time.Time `json:"enqueued,omitempty"`
	// timestamp for when the task was removed from the queue
	// and started, nil if the request hasn't been started
	Started *time.Time `json:"started,omitempty"`
	// timestamp for when request succeeded
	// nil if task hasn't succeeded
	Succeeded *time.Time `json:"succeeded,omitempty"`
	// timestamp for when request failed
	// nil if task hasn't failed
	Failed *time.Time `json:"failed,omitempty"`
	// progress of this task's completion
	// progress may not be stored, but instead kept ephemerally
	Progress *Progress `json:"progress,omitempty"`
}

Task represents the storable state of a task. Note that this is not the "task" itself (the function that will be called to do the actual work associated with a task) but the state associated with performing a task. Task holds the type of work to be done, parameters to configure that work, and the status of that work. Different types of "work" are done by implementing the Taskable interface specified in taskdef.go. Many of the methods on Task overlap with Taskable; this is on purpose, as Task wraps phases of task completion to track the state of a task.

func ReadTasks

func ReadTasks(store datastore.Datastore, orderby string, limit, offset int) ([]*Task, error)

ReadTasks reads a list of tasks from store

func TaskFromDelivery

func TaskFromDelivery(store datastore.Datastore, msg amqp.Delivery) (*Task, error)

TaskFromDelivery reads a task from store based on an amqp.Delivery message

func (Task) DatastoreType

func (t Task) DatastoreType() string

DatastoreType is to fulfill the sql_datastore.Model interface. It distinguishes "Task" as a storable type. "Task" is not (yet) intended for use outside of Datatogether servers.

func (*Task) Delete

func (t *Task) Delete(store datastore.Datastore) error

func (*Task) Do

func (task *Task) Do(store datastore.Datastore, tc chan *Task) error

Do performs the

func (*Task) Enqueue

func (task *Task) Enqueue(store datastore.Datastore, amqpurl string) error

Enqueue adds a task to the queue located at amqpurl, writing creates/updates for the task to the given store.

func (Task) GetId

func (t Task) GetId() string

GetId returns a task's canonical identifier.

func (Task) Key

func (t Task) Key() datastore.Key

Key is to fulfill the sql_datastore.Model interface

func (*Task) NewSQLModel

func (t *Task) NewSQLModel(key datastore.Key) sql_datastore.Model

func (*Task) PubSubChannelName

func (t *Task) PubSubChannelName() string

func (*Task) QueueMsg

func (t *Task) QueueMsg() (amqp.Publishing, error)

QueueMsg formats the task as an amqp.Publishing message for adding to a queue

func (*Task) Read

func (t *Task) Read(store datastore.Datastore) error

func (*Task) SQLParams

func (t *Task) SQLParams(cmd sql_datastore.Cmd) []interface{}

func (*Task) SQLQuery

func (t *Task) SQLQuery(cmd sql_datastore.Cmd) string

func (*Task) Save

func (t *Task) Save(store datastore.Datastore) (err error)

func (*Task) StatusString

func (t *Task) StatusString() string

StatusString returns a string representation of the status of a task based on the state of its date stamps.

func (*Task) UnmarshalSQL

func (t *Task) UnmarshalSQL(row sqlutil.Scannable) error

type TaskRequests

type TaskRequests struct {
	// url to amqp server for enqueuing tasks, only required
	// to fulfill requests, not submit them
	AmqpUrl string
	// Store to read / write tasks to, only required
	// to fulfill requests, not submit them
	Store datastore.Datastore
}

TaskRequests encapsulates all types of requests that can be made in relation to tasks, to be made available for RPC calls. TODO - should this internal state be moved into the package level via package-level setter funcs?

func (TaskRequests) Enqueue

func (r TaskRequests) Enqueue(params *TasksEnqueueParams, task *Task) (err error)

Add a task to the queue for completion

func (TaskRequests) Get

func (t TaskRequests) Get(args *TasksGetParams, res *Task) (err error)

func (TaskRequests) List

func (t TaskRequests) List(args *TasksListParams, res *[]*Task) (err error)

type Taskable

type Taskable interface {
	// are these task params valid? return error if not
	// this func will be called before adding the task to
	// the queue, and won't be added on failure.
	Valid() error
	// Do the task, returning incremental progress updates
	// it's expected that the func will send either
	// p.Done == true or p.Error != nil at least once
	// to signal that the task is either done or errored
	Do(updates chan Progress)
}

Taskable is anything that fits on a task queue; it is a type of "work" that can be performed. Lots of things
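The contract in the interface comments (validate before queueing, then send Progress values until one carries Done == true or a non-nil Error) can be sketched as follows. `countTask` and `run` are hypothetical names, and the consumer loop is only one plausible way an orchestrator might drain the channel.

```go
package main

import (
	"errors"
	"fmt"
)

// Progress is trimmed to the fields this sketch needs.
type Progress struct {
	Percent float32
	Step    int
	Steps   int
	Status  string
	Done    bool
	Error   error
}

type Taskable interface {
	Valid() error
	Do(updates chan Progress)
}

// countTask is a hypothetical task that reports one update per step.
type countTask struct{ Steps int }

func (t countTask) Valid() error {
	if t.Steps <= 0 {
		return errors.New("steps must be positive")
	}
	return nil
}

func (t countTask) Do(updates chan Progress) {
	for i := 1; i <= t.Steps; i++ {
		updates <- Progress{
			Percent: float32(i) / float32(t.Steps),
			Step:    i,
			Steps:   t.Steps,
			Status:  fmt.Sprintf("step %d of %d", i, t.Steps),
			Done:    i == t.Steps, // the final update signals completion
		}
	}
}

// run validates the task, starts Do in a goroutine, and collects
// updates until one reports Done or carries an Error.
func run(t Taskable) ([]Progress, error) {
	if err := t.Valid(); err != nil {
		return nil, err
	}
	updates := make(chan Progress)
	go t.Do(updates)

	var seen []Progress
	for {
		p := <-updates
		seen = append(seen, p)
		if p.Error != nil {
			return seen, p.Error
		}
		if p.Done {
			return seen, nil
		}
	}
}

func main() {
	seen, err := run(countTask{Steps: 3})
	if err != nil {
		fmt.Println("task failed:", err)
		return
	}
	for _, p := range seen {
		fmt.Println(p.Status, p.Done)
	}
}
```

In this sketch the consumer stops reading once it sees a terminal update, so a Do implementation should not send anything after Done or Error; with an unbuffered channel a further send would block forever.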

func NewTaskable

func NewTaskable(name string) (Taskable, error)

NewTaskable generates a new Taskable instance from the registered types

type TasksEnqueueParams

type TasksEnqueueParams struct {
	// Title of the task
	// Requesters should generate their own task title for now
	// tasks currently have no way of generating a sensible default title
	Title string
	// Type of task to perform
	Type string
	// User that initiated the request
	UserId string
	// Parameters to feed to the task
	Params map[string]interface{}
}

TasksEnqueueParams are for enqueueing a task.

type TasksGetParams

type TasksGetParams struct {
	Id string
}

Get a single Task; currently only lookup by ID is supported.

type TasksListParams

type TasksListParams struct {
	OrderBy string
	Limit   int
	Offset  int
}
