crew

package
v1.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 21, 2023 License: MIT Imports: 27 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BuildRestApi

func BuildRestApi(e *echo.Echo, prefix string, controller *TaskController, authMiddleware echo.MiddlewareFunc, loginFunc func(c echo.Context) error, inShutdown *bool, watchers map[string]TaskGroupWatcher)

func ServeRestApi

func ServeRestApi(wg *sync.WaitGroup, controller *TaskController, authMiddleware echo.MiddlewareFunc, loginFunc func(c echo.Context) error) (*http.Server, *echo.Echo)

ServeRestApi starts the REST API server. Parameters: wg is a WaitGroup that the server can use to signal when it is done; controller is the root task controller used to manage all tasks and task groups; authMiddleware is the echo middleware function used to authenticate API calls; loginFunc is the function used to handle login requests.

Types

type ChildTask

// ChildTask is the element type of WorkerResponse.Children. Its fields are a
// subset of Task's fields, with the same names, types and JSON tags.
// NOTE(review): presumably describes a follow-up task requested by a worker —
// confirm against the controller's handling of WorkerResponse.
type ChildTask struct {
	Id                  string      `json:"id"`
	Name                string      `json:"name"`
	Worker              string      `json:"worker"`
	Workgroup           string      `json:"workgroup"`
	Key                 string      `json:"key"`
	RemainingAttempts   int         `json:"remainingAttempts"`
	IsPaused            bool        `json:"isPaused"`
	RunAfter            time.Time   `json:"runAfter"`
	ErrorDelayInSeconds int         `json:"errorDelayInSeconds"`
	Input               interface{} `json:"input"`
	ParentIds           []string    `json:"parentIds"`
}

type HttpPostClient

// HttpPostClient delivers tasks to workers via http post.
type HttpPostClient struct {
	// UrlForTask maps a task to a URL or an error. It is excluded from JSON
	// serialization (tag "-").
	// NOTE(review): presumably the address Post sends the task to — confirm.
	UrlForTask func(task *Task) (url string, err error) `json:"-"`
}

HttpPostClient delivers tasks to workers via http post.

func NewHttpPostClient

func NewHttpPostClient() *HttpPostClient

NewHttpPostClient creates a new HttpPostClient.

func (*HttpPostClient) Post

func (client *HttpPostClient) Post(task *Task, parents []*Task) (response WorkerResponse, err error)

Post delivers a task to a worker.

type MemoryTaskStorage

// MemoryTaskStorage is a task storage that only stores state in memory.
// All of its fields are unexported; create one with NewMemoryTaskStorage.
type MemoryTaskStorage struct {
	// contains filtered or unexported fields
}

MemoryTaskStorage is a task storage that only stores state in memory.

func NewMemoryTaskStorage

func NewMemoryTaskStorage() *MemoryTaskStorage

NewMemoryTaskStorage creates a new MemoryTaskStorage.

func (*MemoryTaskStorage) AllTaskGroups

func (storage *MemoryTaskStorage) AllTaskGroups() (taskGroups []*TaskGroup, err error)

AllTaskGroups returns all task groups.

func (*MemoryTaskStorage) AllTasksInGroup

func (storage *MemoryTaskStorage) AllTasksInGroup(taskGroupId string) (tasks []*Task, err error)

AllTasksInGroup returns all tasks within a group.

func (*MemoryTaskStorage) DeleteTask

func (storage *MemoryTaskStorage) DeleteTask(taskId string) (err error)

DeleteTask deletes a task by task id.

func (*MemoryTaskStorage) DeleteTaskGroup

func (storage *MemoryTaskStorage) DeleteTaskGroup(taskGroupId string) (err error)

DeleteTaskGroup deletes a task group by task group id.

func (*MemoryTaskStorage) FindTask

func (storage *MemoryTaskStorage) FindTask(taskId string) (task *Task, err error)

FindTask finds a task by task id.

func (*MemoryTaskStorage) FindTaskGroup

func (storage *MemoryTaskStorage) FindTaskGroup(taskGroupId string) (taskGroup *TaskGroup, err error)

FindTaskGroup finds a task group by task group id.

func (*MemoryTaskStorage) GetTaskChildren

func (storage *MemoryTaskStorage) GetTaskChildren(taskId string) (tasks []*Task, err error)

GetTaskChildren returns the children of a task.

func (*MemoryTaskStorage) GetTaskParents

func (storage *MemoryTaskStorage) GetTaskParents(taskId string) (tasks []*Task, err error)

GetTaskParents returns the parents of a task.

func (*MemoryTaskStorage) GetTasksInWorkgroup

func (storage *MemoryTaskStorage) GetTasksInWorkgroup(workgroup string) (tasks []*Task, err error)

func (*MemoryTaskStorage) GetTasksWithKey

func (storage *MemoryTaskStorage) GetTasksWithKey(key string) (tasks []*Task, err error)

func (*MemoryTaskStorage) SaveTask

func (storage *MemoryTaskStorage) SaveTask(task *Task, create bool) (err error)

SaveTask saves a task.

func (*MemoryTaskStorage) SaveTaskGroup

func (storage *MemoryTaskStorage) SaveTaskGroup(taskGroup *TaskGroup, create bool) (err error)

SaveTaskGroup doesn't do anything for memory storage.

func (*MemoryTaskStorage) TryLockTask

func (storage *MemoryTaskStorage) TryLockTask(taskId string) (unlocker func() error, err error)

type RedisTaskStorage

// RedisTaskStorage is a task storage that holds a Redis client and a Redsync
// handle. Create one with NewRedisTaskStorage.
type RedisTaskStorage struct {
	// NOTE(review): presumably the connection used for all task and task
	// group reads/writes — confirm.
	Client  *goredislib.Client
	// NOTE(review): presumably backs the per-task locks handed out by
	// TryLockTask (see TaskMutexKey) — confirm.
	RedSync *redsync.Redsync
}

RedisTaskStorage is a task storage backed by Redis.

func NewRedisTaskStorage

func NewRedisTaskStorage(Addr string, Password string, DB int) *RedisTaskStorage

NewRedisTaskStorage creates a new RedisTaskStorage.

func (*RedisTaskStorage) AllTaskGroups

func (storage *RedisTaskStorage) AllTaskGroups() (taskGroups []*TaskGroup, err error)

AllTaskGroups returns all task groups.

func (*RedisTaskStorage) AllTasksInGroup

func (storage *RedisTaskStorage) AllTasksInGroup(taskGroupId string) (tasks []*Task, err error)

AllTasksInGroup returns all tasks within a group.

func (*RedisTaskStorage) AllTasksInList

func (storage *RedisTaskStorage) AllTasksInList(path string) (tasks []*Task, err error)

func (*RedisTaskStorage) DeleteTask

func (storage *RedisTaskStorage) DeleteTask(taskId string) (err error)

DeleteTask deletes a task by task id.

func (*RedisTaskStorage) DeleteTaskGroup

func (storage *RedisTaskStorage) DeleteTaskGroup(taskGroupId string) (err error)

DeleteTaskGroup deletes a task group by task group id.

func (*RedisTaskStorage) FindTask

func (storage *RedisTaskStorage) FindTask(taskId string) (task *Task, err error)

FindTask finds a task by task id.

func (*RedisTaskStorage) FindTaskAtPath

func (storage *RedisTaskStorage) FindTaskAtPath(path string) (task *Task, err error)

func (*RedisTaskStorage) FindTaskGroup

func (storage *RedisTaskStorage) FindTaskGroup(taskGroupId string) (taskGroup *TaskGroup, err error)

FindTaskGroup finds a task group by task group id.

func (*RedisTaskStorage) FindTaskGroupAtPath

func (storage *RedisTaskStorage) FindTaskGroupAtPath(path string) (taskGroup *TaskGroup, err error)

func (*RedisTaskStorage) GetExpiration

func (storage *RedisTaskStorage) GetExpiration() time.Duration

func (*RedisTaskStorage) GetLockExpiration

func (storage *RedisTaskStorage) GetLockExpiration() time.Duration

func (*RedisTaskStorage) GetTaskChildren

func (storage *RedisTaskStorage) GetTaskChildren(taskId string) (tasks []*Task, err error)

GetTaskChildren returns the children of a task.

func (*RedisTaskStorage) GetTaskParents

func (storage *RedisTaskStorage) GetTaskParents(taskId string) (tasks []*Task, err error)

GetTaskParents returns the parents of a task.

func (*RedisTaskStorage) GetTasksInWorkgroup

func (storage *RedisTaskStorage) GetTasksInWorkgroup(workgroup string) (tasks []*Task, err error)

func (*RedisTaskStorage) GetTasksWithKey

func (storage *RedisTaskStorage) GetTasksWithKey(key string) (tasks []*Task, err error)

func (*RedisTaskStorage) SaveTask

func (storage *RedisTaskStorage) SaveTask(task *Task, create bool) (err error)

SaveTask saves a task.

func (*RedisTaskStorage) SaveTaskGroup

func (storage *RedisTaskStorage) SaveTaskGroup(taskGroup *TaskGroup, create bool) (err error)

SaveTaskGroup saves a task group.

func (*RedisTaskStorage) TaskGroupKey

func (storage *RedisTaskStorage) TaskGroupKey(taskGroupId string) string

TaskGroupKey returns the key for a task group.

func (*RedisTaskStorage) TaskGroupsPrefix

func (storage *RedisTaskStorage) TaskGroupsPrefix() string

func (*RedisTaskStorage) TaskKey

func (storage *RedisTaskStorage) TaskKey(taskId string) string

TaskKey returns the key for a task.

func (*RedisTaskStorage) TaskMutexKey

func (storage *RedisTaskStorage) TaskMutexKey(taskId string) string

TaskMutexKey returns the key for a task's lock.

func (*RedisTaskStorage) TryLockTask

func (storage *RedisTaskStorage) TryLockTask(taskId string) (unlocker func() error, err error)

type Task

// A Task represents a unit of work that can be completed by a worker.
//
// IMPORTANT: if you change Task's fields, also update Task.ts in
// crew-go-javascript.
type Task struct {
	Id                  string      `json:"id"`
	TaskGroupId         string      `json:"taskGroupId"`
	Name                string      `json:"name"`
	Worker              string      `json:"worker"`
	Workgroup           string      `json:"workgroup"`
	Key                 string      `json:"key"`
	RemainingAttempts   int         `json:"remainingAttempts"`
	IsPaused            bool        `json:"isPaused"`
	IsComplete          bool        `json:"isComplete"`
	RunAfter            time.Time   `json:"runAfter"`
	IsSeed              bool        `json:"isSeed"`
	ErrorDelayInSeconds int         `json:"errorDelayInSeconds"`
	Input               interface{} `json:"input"`
	Output              interface{} `json:"output"`
	Errors              []string    `json:"errors"`
	CreatedAt           time.Time   `json:"createdAt"`
	ParentIds           []string    `json:"parentIds"`
	BusyExecuting       bool        `json:"busyExecuting"`
	// Storage is excluded from JSON serialization (tag "-").
	Storage             TaskStorage `json:"-"`
}

A Task represents a unit of work that can be completed by a worker. IMPORTANT! If you change task's fields, also update Task.ts in crew-go-javascript

func NewTask

func NewTask() *Task

NewTask creates a new Task.

func (*Task) CanExecute

func (task *Task) CanExecute(parents []*Task) bool

CanExecute determines if a Task is in a state where it can be executed.

type TaskClient

// TaskClient defines the interface for delivering tasks to workers.
// HttpPostClient provides a Post method with this signature.
type TaskClient interface {
	// Post delivers a task to a worker, passing along the task's parents, and
	// returns the worker's response or an error.
	Post(task *Task, parents []*Task) (response WorkerResponse, err error)
}

TaskClient defines the interface for delivering tasks to workers.

type TaskController

// TaskController is the controller used to manage tasks and task groups.
// Create one with NewTaskController, which takes the storage, client and
// throttler.
type TaskController struct {
	Storage                 TaskStorage
	Client                  TaskClient
	// NOTE(review): presumably carries the TaskFeedEvent / TaskGroupFeedEvent
	// values produced by EmitTaskFeedEvent and EmitTaskGroupFeedEvent — confirm.
	Feed                    chan interface{}
	Throttler               *Throttler
	// NOTE(review): presumably tracks in-flight work so Shutdown can wait for
	// it — confirm.
	Pending                 *sync.WaitGroup
	AbandonedCheckScheduler *gocron.Scheduler
	AbandonedCheckMutex     *sync.Mutex
}

TaskController manages tasks and task groups.

func NewTaskController

func NewTaskController(storage TaskStorage, client TaskClient, throttler *Throttler) *TaskController

NewTaskController returns a new TaskController.

func (*TaskController) CreateTask

func (controller *TaskController) CreateTask(task *Task) (err error)

func (*TaskController) CreateTaskGroup

func (controller *TaskController) CreateTaskGroup(taskGroup *TaskGroup) (err error)

func (*TaskController) DeleteTask

func (controller *TaskController) DeleteTask(id string) (err error)

func (*TaskController) DeleteTaskGroup

func (controller *TaskController) DeleteTaskGroup(id string) (err error)

func (*TaskController) EmitTaskFeedEvent

func (controller *TaskController) EmitTaskFeedEvent(event string, task *Task)

func (*TaskController) EmitTaskGroupFeedEvent

func (controller *TaskController) EmitTaskGroupFeedEvent(event string, taskGroup *TaskGroup)

func (*TaskController) Evaluate

func (controller *TaskController) Evaluate(task *Task)

func (*TaskController) Execute

func (controller *TaskController) Execute(taskToExecute *Task)

func (*TaskController) GetTask

func (controller *TaskController) GetTask(id string) (task *Task, err error)

func (*TaskController) GetTaskGroup

func (controller *TaskController) GetTaskGroup(id string) (taskGroup *TaskGroup, err error)

func (*TaskController) GetTaskGroupProgress

func (controller *TaskController) GetTaskGroupProgress(id string) (completedPercent float64, err error)

func (*TaskController) GetTaskGroups

func (controller *TaskController) GetTaskGroups(page int, pageSize int, search string) (taskGroups []*TaskGroup, total int, err error)

func (*TaskController) GetTasksInGroup

func (controller *TaskController) GetTasksInGroup(taskGroupId string, page int, pageSize int, search string, skipCompleted bool) (tasks []*Task, total int, err error)

func (*TaskController) HandleExecuteError

func (controller *TaskController) HandleExecuteError(task *Task, message string)

func (*TaskController) PauseOrResumeTaskGroup

func (controller *TaskController) PauseOrResumeTaskGroup(id string, isPaused bool) (err error)

func (*TaskController) ResetTask

func (controller *TaskController) ResetTask(task *Task, remainingAttempts int)

func (*TaskController) ResetTaskById

func (controller *TaskController) ResetTaskById(id string, remainingAttempts int) (task *Task, err error)

func (*TaskController) ResetTaskGroup

func (controller *TaskController) ResetTaskGroup(id string, remainingAttempts int) (err error)

func (*TaskController) RetryTaskById

func (controller *TaskController) RetryTaskById(id string, remainingAttempts int) (task *Task, err error)

func (*TaskController) RetryTaskGroup

func (controller *TaskController) RetryTaskGroup(id string, remainingAttempts int) (err error)

func (*TaskController) Shutdown

func (controller *TaskController) Shutdown() (err error)

func (*TaskController) Startup

func (controller *TaskController) Startup() (err error)

func (*TaskController) TriggerTaskEvaluate

func (controller *TaskController) TriggerTaskEvaluate(id string) (err error)

func (*TaskController) UpdateTask

func (controller *TaskController) UpdateTask(id string, update map[string]interface{}) (updatedTask *Task, err error)

func (*TaskController) UpdateTaskGroup

func (controller *TaskController) UpdateTaskGroup(id string, update map[string]interface{}) (taskGroup *TaskGroup, err error)

type TaskFeedEvent

// TaskFeedEvent pairs an event name with the task it concerns.
type TaskFeedEvent struct {
	// Event is serialized under the JSON key "type", not "event".
	Event string `json:"type"`
	Task  *Task  `json:"task"`
}

type TaskGroup

// TaskGroup represents a group of tasks.
//
// IMPORTANT: if you change TaskGroup's fields, also update TaskGroup.ts in
// crew-go-javascript.
type TaskGroup struct {
	Id        string    `json:"id"`
	Name      string    `json:"name"`
	CreatedAt time.Time `json:"createdAt"`
}

TaskGroup represents a group of tasks. IMPORTANT! If you change TaskGroup's fields, also update TaskGroup.ts in crew-go-javascript.

func NewTaskGroup

func NewTaskGroup(id string, name string) *TaskGroup

NewTaskGroup creates a new TaskGroup.

type TaskGroupFeedEvent

// TaskGroupFeedEvent pairs an event name with the task group it concerns.
type TaskGroupFeedEvent struct {
	// Event is serialized under the JSON key "type", not "event".
	Event     string     `json:"type"`
	TaskGroup *TaskGroup `json:"taskGroup"`
}

type TaskGroupWatcher

// TaskGroupWatcher is used to collect events from the task group controller
// and deliver them to a websocket.
type TaskGroupWatcher struct {
	TaskGroupId string
	// NOTE(review): presumably carries the events destined for Socket —
	// confirm the payload format.
	Channel     chan string
	RequestId   string
	Socket      *websocket.Conn
}

TaskGroupWatcher is used to collect events from the task group controller and deliver them to a websocket.

type TaskStorage

// TaskStorage defines the methods required for implementing crew's task
// storage interface. MemoryTaskStorage and RedisTaskStorage both provide
// methods with these signatures.
type TaskStorage interface {
	// SaveTask saves a task.
	// NOTE(review): create presumably distinguishes insert from update — confirm.
	SaveTask(task *Task, create bool) (err error)
	// FindTask finds a task by task id.
	FindTask(taskId string) (task *Task, err error)
	// TryLockTask returns an unlocker function or an error.
	// NOTE(review): presumably a non-blocking attempt that errors when the
	// task is already locked — confirm.
	TryLockTask(taskId string) (unlocker func() error, err error)
	// UnlockTask(taskId string) (err error)
	// DeleteTask deletes a task by task id.
	DeleteTask(taskId string) (err error)
	// GetTaskChildren returns the children of a task.
	GetTaskChildren(taskId string) (tasks []*Task, err error)
	// GetTaskParents returns the parents of a task.
	GetTaskParents(taskId string) (tasks []*Task, err error)
	// NOTE(review): presumably returns tasks whose Workgroup field equals
	// workgroup — confirm.
	GetTasksInWorkgroup(workgroup string) (tasks []*Task, err error)
	// NOTE(review): presumably returns tasks whose Key field equals key — confirm.
	GetTasksWithKey(key string) (tasks []*Task, err error)

	// SaveTaskGroup takes the same create flag as SaveTask.
	SaveTaskGroup(taskGroup *TaskGroup, create bool) (err error)
	// AllTaskGroups returns all task groups.
	AllTaskGroups() (taskGroups []*TaskGroup, err error)
	// AllTasksInGroup returns all tasks within a group.
	AllTasksInGroup(taskGroupId string) (tasks []*Task, err error)
	// FindTaskGroup finds a task group by task group id.
	FindTaskGroup(taskGroupId string) (taskGroup *TaskGroup, err error)
	// DeleteTaskGroup deletes a task group by task group id.
	DeleteTaskGroup(taskGroupId string) (err error)
}

TaskStorage defines the methods required for implementing crew's task storage interface.

type ThrottlePopQuery

// ThrottlePopQuery is a request to the throttler to notify that a worker is
// done. It is the element type of Throttler.Pop.
type ThrottlePopQuery struct {
	TaskId string
	Worker string
}

ThrottlePopQuery is a request to the throttler to notify that a worker is done.

type ThrottlePushQuery

// A ThrottlePushQuery is a request to the throttler to see if there is enough
// bandwidth for a worker to run. It is the element type of Throttler.Push.
type ThrottlePushQuery struct {
	TaskId string
	Worker string
	// NOTE(review): presumably the channel on which the throttler replies
	// (true = may run) — confirm.
	Resp   chan bool
}

A ThrottlePushQuery is a request to the throttler to see if there is enough bandwidth for a worker to run.

type Throttler

// Throttler exposes two channels: Push carries ThrottlePushQuery values and
// Pop carries ThrottlePopQuery values.
type Throttler struct {
	Push chan ThrottlePushQuery
	Pop  chan ThrottlePopQuery
}

type WorkerPayload

// WorkerPayload defines the input sent to a worker (post body).
type WorkerPayload struct {
	Input   interface{}                 `json:"input"`
	Worker  string                      `json:"worker"`
	// Parents holds one WorkerPayloadParentResult per entry.
	Parents []WorkerPayloadParentResult `json:"parents"`
	TaskId  string                      `json:"taskId"`
}

WorkerPayload defines the input sent to a worker (post body).

type WorkerPayloadParentResult

// WorkerPayloadParentResult is the element type of WorkerPayload.Parents. It
// carries a task id and worker name together with an input and an output.
// NOTE(review): presumably populated from each parent Task passed to
// TaskClient.Post — confirm.
type WorkerPayloadParentResult struct {
	TaskId string      `json:"taskId"`
	Worker string      `json:"worker"`
	Input  interface{} `json:"input"`
	Output interface{} `json:"output"`
}

WorkerPayloadParentResult defines the schema for a parent task's result as included in the payload sent to a worker (see WorkerPayload.Parents).

type WorkerResponse

// WorkerResponse defines the schema of output returned from workers. It is
// the response type returned by TaskClient.Post.
type WorkerResponse struct {
	Output                  interface{}  `json:"output"`
	// NOTE(review): presumably tasks to create as children of the task that
	// just ran — confirm.
	Children                []*ChildTask `json:"children"`
	WorkgroupDelayInSeconds int          `json:"workgroupDelayInSeconds"`
	ChildrenDelayInSeconds  int          `json:"childrenDelayInSeconds"`
	Error                   interface{}  `json:"error"`
}

WorkerResponse defines the schema of output returned from workers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL