task

package
v0.0.0-...-2f8753c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 11, 2019 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	WAITING  = "waiting"
	RUNNING  = "running"
	COMPLETE = "complete"
)

Variables

This section is empty.

Functions

func ClearDAGState

func ClearDAGState(RootRunner TaskRunner)

ClearDAGState ...

func CreateAndSetTaskParamsFromHash

func CreateAndSetTaskParamsFromHash(tr TaskRunner, paramHash string) error

CreateAndSetTaskParamsFromHash ....

func CreateTaskRunnerFromParams

func CreateTaskRunnerFromParams(tr TaskRunner, params []*TaskParam) error

CreateTaskRunnerFromParams ... Given Params and a TaskRunner, sets all TaskRunner fields marked as param Note: The struct satisfying the TaskRunner interface MUST be passed to this function as a reference. See these articles for a more thorough explanation: https://stackoverflow.com/questions/6395076/using-reflect-how-do-you-set-the-value-of-a-struct-field http://speakmy.name/2014/09/14/modifying-interfaced-go-struct/

func ResetDAGResultChannels

func ResetDAGResultChannels(RootRunner TaskRunner)

ResetDAGResultChannels ... Need to recreate result channels after each scheduler run, because they are close in RunTaskRunner

func RunTaskRunner

func RunTaskRunner(tRunner TaskRunner, wg *sync.WaitGroup, TokenReturn chan struct{})

RunTaskRunner ... Runs a TaskRunner, sets state and notifies waiting group when run is done

func SetParents

func SetParents(tRunner TaskRunner)

func SetTaskPriorities

func SetTaskPriorities(rootTask *Task) error

SetTaskPriorities ...

Types

type Task

type Task struct {
	Name           string
	Children       []TaskRunner
	Parent         TaskRunner
	ResultsChannel chan string
	WorkerTokens   chan struct{}
	State          string
	Priority       int
	Params         []*TaskParam

	DataProcessed int
	Logger        *log.Logger
	// contains filtered or unexported fields
}

Task ...

func NewTask

func NewTask(name string) *Task

NewTask ...

func (*Task) AddChildren

func (ts *Task) AddChildren(children ...TaskRunner) []TaskRunner

AddChildren ...

func (*Task) End

func (ts *Task) End() time.Time

End ...

func (*Task) GetHash

func (ts *Task) GetHash() string

GetHash ... Returns a hash representation of the task. The elements that comprise the hash are

  1. Task.Name

  2. Serialized Task Params

  3. All Child Serialized Params

    These elements are joined together and hashed, which creates a fairly unique value. This is used primarily to rebuild a TaskRunner from its metadata table or determine whether two task DAGs are equal.

func (*Task) GetSerializedParams

func (ts *Task) GetSerializedParams() string
Given a Task, return a serialized string to represent it.

This is useful for storing the state of each Task that is run in the dag, and can

   be used to re-create a previously run task for DAG re-runs, backfills, or some other
   use case.

   Example:

    type TestTask struct {
	    *Task
	    N int    `task_param:""`
	    X string `task_param:""`
	    Z int
    }

    new_test_task := TestTask{
	5,
	"HI",
	0,
    }
    serialized_test_task := new_test_task.GetSerializedParams()
    fmt.Println(serialized_test_task)
    > N:INT:5_X:STR:HI

    This function uses a lot of reflection / is kind of tricky. Would be good to document
    exactly how this works because I constantly forget what these variables mean :P

func (*Task) SetEnd

func (ts *Task) SetEnd(e time.Time)

func (*Task) SetStart

func (ts *Task) SetStart(s time.Time)

func (*Task) SetState

func (ts *Task) SetState(newState string) (string, error)

SetState ...

func (*Task) Start

func (ts *Task) Start() time.Time

Start ...

type TaskParam

type TaskParam struct {
	Name string
	Data reflect.Value
}

func CreateAndSetTaskParams

func CreateAndSetTaskParams(tr TaskRunner) ([]*TaskParam, error)

CreateAndSetTaskParams ... Uses reflection to inspect struct elements for 'task_param' tag and sets tr.Task.Params accordingly

func DeserializeTaskParams

func DeserializeTaskParams(serializedTaskParams string) ([]*TaskParam, error)

type TaskRunner

type TaskRunner interface {

	// TODO: have Run() return an error, which could be given to scheduler
	Run()

	// requires TaskRuner to have an embedded Task
	GetTask() *Task
}

TaskRunner ...

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL