tork

package module
v0.1.73
Published: May 6, 2024 License: MIT Imports: 3 Imported by: 1

README

tork

Tork is a highly scalable, general-purpose workflow engine.

Documentation

See tork.run for the full documentation.

Quick Start

  1. Ensure you have Docker with API Version >= 1.42 (use docker version | grep API to check).

  2. Download the binary for your system from the releases page.

Hello World

Start in standalone mode:

./tork run standalone

Submit a job in another terminal:

# hello.yaml
---
name: hello job
tasks:
  - name: say hello
    image: ubuntu:mantic #docker image
    run: |
      echo -n hello world
  - name: say goodbye
    image: ubuntu:mantic
    run: |
      echo -n bye world

JOB_ID=$(curl \
  -s \
  -X POST \
  --data-binary @hello.yaml \
  -H "Content-type: text/yaml" \
  http://localhost:8000/jobs | jq -r .id)
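
The same request can be built from Go with the standard library. This is a minimal sketch rather than a Tork client: newSubmitRequest is a hypothetical helper, and the base URL, path, and content type are taken from the curl command above.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

// newSubmitRequest is a hypothetical helper (not part of Tork). It builds the
// same request as the curl command: POST <baseURL>/jobs with a YAML body.
func newSubmitRequest(baseURL string, jobYAML []byte) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, baseURL+"/jobs", bytes.NewReader(jobYAML))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "text/yaml")
	return req, nil
}

func main() {
	// A shortened copy of hello.yaml, inlined so the sketch needs no file.
	job := []byte("name: hello job\ntasks:\n  - name: say hello\n    image: ubuntu:mantic\n    run: |\n      echo -n hello world\n")

	req, err := newSubmitRequest("http://localhost:8000", job)
	if err != nil {
		fmt.Println("building request:", err)
		return
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		// Expected when no Tork instance is listening on localhost:8000.
		fmt.Println("sending request:", err)
		return
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		fmt.Println("reading response:", err)
		return
	}
	fmt.Println(resp.Status)
	fmt.Println(string(body)) // JSON; the curl example extracts its "id" field
}
```

Keeping the request construction in its own function makes it possible to inspect the method, URL, and headers without a running server.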

Query for the status of the job:

curl -s http://localhost:8000/jobs/$JOB_ID | jq .

{
  "id": "ed0dba93d262492b8cf26e6c1c4f1c98",
  "state": "COMPLETED",
  ...
  "execution": [
    {
      ...
      "state": "COMPLETED",
    }
  ],
}
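
A Go program can decode this response with encoding/json. The sketch below assumes only the fields visible in the output above; jobStatus and parseJobStatus are hypothetical names, and the sample body is the response with its elided fields left out.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jobStatus mirrors only the response fields shown above. The real response
// carries more fields (see the Job type in the package documentation).
type jobStatus struct {
	ID        string `json:"id"`
	State     string `json:"state"`
	Execution []struct {
		State string `json:"state"`
	} `json:"execution"`
}

// parseJobStatus is a hypothetical helper that decodes a job status body.
func parseJobStatus(body []byte) (jobStatus, error) {
	var js jobStatus
	err := json.Unmarshal(body, &js)
	return js, err
}

func main() {
	body := []byte(`{"id":"ed0dba93d262492b8cf26e6c1c4f1c98","state":"COMPLETED","execution":[{"state":"COMPLETED"}]}`)
	js, err := parseJobStatus(body)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Println(js.ID, js.State, len(js.Execution))
}
```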

A slightly more interesting example

The following job:

  1. Downloads a remote video file using a pre task to a shared /tmp volume.
  2. Converts the first 5 seconds of the downloaded video using ffmpeg.
  3. Uploads the converted video to a destination using a post task.

# convert.yaml
---
name: convert a video
inputs:
  source: https://upload.wikimedia.org/wikipedia/commons/1/18/Big_Buck_Bunny_Trailer_1080p.ogv
tasks:
  - name: convert the first 5 seconds of a video
    image: jrottenberg/ffmpeg:3.4-alpine
    run: |
      ffmpeg -i /tmp/input.ogv -t 5 /tmp/output.mp4
    mounts:
      - type: volume
        target: /tmp
    pre:
      - name: download the remote file
        image: alpine:3.18.3
        env:
          SOURCE_URL: "{{ inputs.source }}"
        run: |
          wget \
          $SOURCE_URL \
          -O /tmp/input.ogv
    post:
      - name: upload the converted file
        image: alpine:3.18.3
        run: |
          wget \
          --post-file=/tmp/output.mp4 \
          https://devnull-as-a-service.com/dev/null

Submit the job in another terminal:

JOB_ID=$(curl \
  -s \
  -X POST \
  --data-binary @convert.yaml \
  -H "Content-type: text/yaml" \
  http://localhost:8000/jobs | jq -r .id)

More examples

Check out the examples folder.

REST API

See the REST API documentation.

Swagger Docs

Make sure you have CORS configured in your config file:

[middleware.web.cors]
enabled = true

Start Tork in standalone or coordinator mode.

go run cmd/main.go run standalone

Serve the Swagger Docs

docker compose up -d swagger

Visit http://localhost:9000

Web UI

Tork Web is a web-based tool for interacting with Tork.

License

Copyright (c) 2023-present Arik Cohen. Tork is free and open-source software licensed under the MIT License.

Documentation

Index

Constants

const (
	MountTypeVolume string = "volume"
	MountTypeBind   string = "bind"
	MountTypeTmpfs  string = "tmpfs"
)
const (
	Version = "0.1.73"
)

Variables

var (
	GitCommit string = "develop"
)
var HEARTBEAT_RATE = time.Second * 30
var LAST_HEARTBEAT_TIMEOUT = time.Minute * 5

Functions

This section is empty.

Types

type EachTask

type EachTask struct {
	Var         string `json:"var,omitempty"`
	List        string `json:"list,omitempty"`
	Task        *Task  `json:"task,omitempty"`
	Size        int    `json:"size,omitempty"`
	Completions int    `json:"completions,omitempty"`
}

func (*EachTask) Clone

func (e *EachTask) Clone() *EachTask

type Job

type Job struct {
	ID          string            `json:"id,omitempty"`
	ParentID    string            `json:"parentId,omitempty"`
	Name        string            `json:"name,omitempty"`
	Description string            `json:"description,omitempty"`
	State       JobState          `json:"state,omitempty"`
	CreatedAt   time.Time         `json:"createdAt,omitempty"`
	StartedAt   *time.Time        `json:"startedAt,omitempty"`
	CompletedAt *time.Time        `json:"completedAt,omitempty"`
	FailedAt    *time.Time        `json:"failedAt,omitempty"`
	Tasks       []*Task           `json:"tasks"`
	Execution   []*Task           `json:"execution"`
	Position    int               `json:"position"`
	Inputs      map[string]string `json:"inputs,omitempty"`
	Context     JobContext        `json:"context,omitempty"`
	TaskCount   int               `json:"taskCount,omitempty"`
	Output      string            `json:"output,omitempty"`
	Result      string            `json:"result,omitempty"`
	Error       string            `json:"error,omitempty"`
	Defaults    *JobDefaults      `json:"defaults,omitempty"`
	Webhooks    []*Webhook        `json:"webhooks,omitempty"`
}

func (*Job) Clone

func (j *Job) Clone() *Job

type JobContext

type JobContext struct {
	Job    map[string]string `json:"job,omitempty"`
	Inputs map[string]string `json:"inputs,omitempty"`
	Tasks  map[string]string `json:"tasks,omitempty"`
}

func (JobContext) AsMap

func (c JobContext) AsMap() map[string]any

func (JobContext) Clone

func (c JobContext) Clone() JobContext
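
The README's convert.yaml refers to {{ inputs.source }}, which suggests that the context is exposed to expressions as a map keyed by section name. The sketch below assumes AsMap uses the same keys as the JSON tags (job, inputs, tasks); jobContext and asMap are local stand-ins, not the package's implementation.

```go
package main

import "fmt"

// jobContext is a local mirror of tork.JobContext so the sketch is
// self-contained.
type jobContext struct {
	Job    map[string]string
	Inputs map[string]string
	Tasks  map[string]string
}

// asMap is an assumed equivalent of JobContext.AsMap: each section is
// exposed under the key used by its JSON tag.
func (c jobContext) asMap() map[string]any {
	return map[string]any{
		"job":    c.Job,
		"inputs": c.Inputs,
		"tasks":  c.Tasks,
	}
}

func main() {
	ctx := jobContext{
		Inputs: map[string]string{"source": "https://example.com/video.ogv"},
	}
	// Look up what an expression such as inputs.source would resolve to.
	inputs := ctx.asMap()["inputs"].(map[string]string)
	fmt.Println(inputs["source"])
}
```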

type JobDefaults added in v0.1.13

type JobDefaults struct {
	Retry   *TaskRetry  `json:"retry,omitempty"`
	Limits  *TaskLimits `json:"limits,omitempty"`
	Timeout string      `json:"timeout,omitempty"`
	Queue   string      `json:"queue,omitempty"`
}

func (*JobDefaults) Clone added in v0.1.13

func (d *JobDefaults) Clone() *JobDefaults

type JobMetrics added in v0.1.5

type JobMetrics struct {
	Running int `json:"running"`
}

type JobState

type JobState string
const (
	JobStatePending   JobState = "PENDING"
	JobStateScheduled JobState = "SCHEDULED"
	JobStateRunning   JobState = "RUNNING"
	JobStateCancelled JobState = "CANCELLED"
	JobStateCompleted JobState = "COMPLETED"
	JobStateFailed    JobState = "FAILED"
	JobStateRestart   JobState = "RESTART"
)

type JobSummary added in v0.1.6

type JobSummary struct {
	ID          string            `json:"id,omitempty"`
	ParentID    string            `json:"parentId,omitempty"`
	Inputs      map[string]string `json:"inputs,omitempty"`
	Name        string            `json:"name,omitempty"`
	Description string            `json:"description,omitempty"`
	State       JobState          `json:"state,omitempty"`
	CreatedAt   time.Time         `json:"createdAt,omitempty"`
	StartedAt   *time.Time        `json:"startedAt,omitempty"`
	CompletedAt *time.Time        `json:"completedAt,omitempty"`
	FailedAt    *time.Time        `json:"failedAt,omitempty"`
	Position    int               `json:"position"`
	TaskCount   int               `json:"taskCount,omitempty"`
	Result      string            `json:"result,omitempty"`
	Error       string            `json:"error,omitempty"`
}

func NewJobSummary added in v0.1.6

func NewJobSummary(j *Job) *JobSummary

type Metrics added in v0.1.5

type Metrics struct {
	Jobs  JobMetrics  `json:"jobs"`
	Tasks TaskMetrics `json:"tasks"`
	Nodes NodeMetrics `json:"nodes"`
}

type Mount added in v0.1.14

type Mount struct {
	Type   string `json:"type,omitempty"`
	Source string `json:"source,omitempty"`
	Target string `json:"target,omitempty"`
}
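
As an illustration of how the struct tags shape the wire format, the sketch below encodes the volume mount used in the README's convert.yaml. mount and mountTypeVolume are local mirrors of Mount and MountTypeVolume so the example runs without the module; encodeMount is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local mirror of tork.MountTypeVolume.
const mountTypeVolume = "volume"

// mount is a local mirror of tork.Mount with the same JSON tags.
type mount struct {
	Type   string `json:"type,omitempty"`
	Source string `json:"source,omitempty"`
	Target string `json:"target,omitempty"`
}

// encodeMount is a hypothetical helper that returns the JSON form of m.
func encodeMount(m mount) (string, error) {
	b, err := json.Marshal(m)
	return string(b), err
}

func main() {
	s, err := encodeMount(mount{Type: mountTypeVolume, Target: "/tmp"})
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	fmt.Println(s) // prints {"type":"volume","target":"/tmp"}
}
```

Because every field is tagged omitempty, the unset Source is dropped from the output.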

type Node

type Node struct {
	ID              string     `json:"id,omitempty"`
	Name            string     `json:"name,omitempty"`
	StartedAt       time.Time  `json:"startedAt,omitempty"`
	CPUPercent      float64    `json:"cpuPercent,omitempty"`
	LastHeartbeatAt time.Time  `json:"lastHeartbeatAt,omitempty"`
	Queue           string     `json:"queue,omitempty"`
	Status          NodeStatus `json:"status,omitempty"`
	Hostname        string     `json:"hostname,omitempty"`
	TaskCount       int        `json:"taskCount,omitempty"`
	Version         string     `json:"version"`
}

func (*Node) Clone added in v0.1.4

func (n *Node) Clone() *Node

type NodeMetrics added in v0.1.5

type NodeMetrics struct {
	Running    int     `json:"online"`
	CPUPercent float64 `json:"cpuPercent"`
}

type NodeStatus

type NodeStatus string
const (
	NodeStatusUP      NodeStatus = "UP"
	NodeStatusDown    NodeStatus = "DOWN"
	NodeStatusOffline NodeStatus = "OFFLINE"
)

type ParallelTask

type ParallelTask struct {
	Tasks       []*Task `json:"tasks,omitempty"`
	Completions int     `json:"completions,omitempty"`
}

func (*ParallelTask) Clone

func (p *ParallelTask) Clone() *ParallelTask

type Registry added in v0.1.11

type Registry struct {
	Username string `json:"username,omitempty"`
	Password string `json:"password,omitempty"`
}

func (*Registry) Clone added in v0.1.11

func (r *Registry) Clone() *Registry

type SubJobTask

type SubJobTask struct {
	ID          string            `json:"id,omitempty"`
	Name        string            `json:"name,omitempty"`
	Description string            `json:"description,omitempty"`
	Tasks       []*Task           `json:"tasks,omitempty"`
	Inputs      map[string]string `json:"inputs,omitempty"`
	Output      string            `json:"output,omitempty"`
	Detached    bool              `json:"detached,omitempty"`
	Webhooks    []*Webhook        `json:"webhooks,omitempty"`
}

func (*SubJobTask) Clone

func (s *SubJobTask) Clone() *SubJobTask

type Task

type Task struct {
	ID          string            `json:"id,omitempty"`
	JobID       string            `json:"jobId,omitempty"`
	ParentID    string            `json:"parentId,omitempty"`
	Position    int               `json:"position,omitempty"`
	Name        string            `json:"name,omitempty"`
	Description string            `json:"description,omitempty"`
	State       TaskState         `json:"state,omitempty"`
	CreatedAt   *time.Time        `json:"createdAt,omitempty"`
	ScheduledAt *time.Time        `json:"scheduledAt,omitempty"`
	StartedAt   *time.Time        `json:"startedAt,omitempty"`
	CompletedAt *time.Time        `json:"completedAt,omitempty"`
	FailedAt    *time.Time        `json:"failedAt,omitempty"`
	CMD         []string          `json:"cmd,omitempty"`
	Entrypoint  []string          `json:"entrypoint,omitempty"`
	Run         string            `json:"run,omitempty"`
	Image       string            `json:"image,omitempty"`
	Registry    *Registry         `json:"registry,omitempty"`
	Env         map[string]string `json:"env,omitempty"`
	Files       map[string]string `json:"files,omitempty"`
	Queue       string            `json:"queue,omitempty"`
	Error       string            `json:"error,omitempty"`
	Pre         []*Task           `json:"pre,omitempty"`
	Post        []*Task           `json:"post,omitempty"`
	Mounts      []Mount           `json:"mounts,omitempty"`
	Networks    []string          `json:"networks,omitempty"`
	NodeID      string            `json:"nodeId,omitempty"`
	Retry       *TaskRetry        `json:"retry,omitempty"`
	Limits      *TaskLimits       `json:"limits,omitempty"`
	Timeout     string            `json:"timeout,omitempty"`
	Result      string            `json:"result,omitempty"`
	Var         string            `json:"var,omitempty"`
	If          string            `json:"if,omitempty"`
	Parallel    *ParallelTask     `json:"parallel,omitempty"`
	Each        *EachTask         `json:"each,omitempty"`
	SubJob      *SubJobTask       `json:"subjob,omitempty"`
	GPUs        string            `json:"gpus,omitempty"`
	Tags        []string          `json:"tags,omitempty"`
	Workdir     string            `json:"workdir,omitempty"`
}

Task is the basic unit of work that a Worker can handle.

func CloneTasks

func CloneTasks(tasks []*Task) []*Task

func (*Task) Clone

func (t *Task) Clone() *Task

type TaskLimits

type TaskLimits struct {
	CPUs   string `json:"cpus,omitempty"`
	Memory string `json:"memory,omitempty"`
}

func (*TaskLimits) Clone

func (l *TaskLimits) Clone() *TaskLimits

type TaskLogPart added in v0.1.63

type TaskLogPart struct {
	Number    int        `json:"number,omitempty"`
	TaskID    string     `json:"taskId,omitempty"`
	Contents  string     `json:"contents,omitempty"`
	CreatedAt *time.Time `json:"createdAt,omitempty"`
}

type TaskMetrics added in v0.1.5

type TaskMetrics struct {
	Running int `json:"running"`
}

type TaskRetry

type TaskRetry struct {
	Limit    int `json:"limit,omitempty"`
	Attempts int `json:"attempts,omitempty"`
}

func (*TaskRetry) Clone

func (r *TaskRetry) Clone() *TaskRetry
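
The fields suggest a simple counter: Limit caps the number of retries and Attempts records how many have been made. The sketch below assumes a failed task may be retried while Attempts is below Limit; taskRetry is a local mirror and canRetry is a hypothetical helper, not something the package exports.

```go
package main

import "fmt"

// taskRetry is a local mirror of tork.TaskRetry.
type taskRetry struct {
	Limit    int
	Attempts int
}

// canRetry is a hypothetical helper. It assumes a failed task may be
// retried while Attempts is still below Limit, and that a nil policy
// means no retries.
func canRetry(r *taskRetry) bool {
	return r != nil && r.Attempts < r.Limit
}

func main() {
	r := &taskRetry{Limit: 2}
	// Simulate a task that keeps failing until its retries run out.
	for canRetry(r) {
		r.Attempts++
		fmt.Printf("retry attempt %d of %d\n", r.Attempts, r.Limit)
	}
}
```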

type TaskState

type TaskState string

TaskState defines the list of states that a task can be in at any given moment.

const (
	TaskStatePending   TaskState = "PENDING"
	TaskStateScheduled TaskState = "SCHEDULED"
	TaskStateRunning   TaskState = "RUNNING"
	TaskStateCancelled TaskState = "CANCELLED"
	TaskStateStopped   TaskState = "STOPPED"
	TaskStateCompleted TaskState = "COMPLETED"
	TaskStateFailed    TaskState = "FAILED"
	TaskStateSkipped   TaskState = "SKIPPED"
)

func (TaskState) IsActive

func (s TaskState) IsActive() bool
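
The page does not say which states count as active. The sketch below assumes the natural reading, that a task is active while it is pending, scheduled, or running, and that every other state is final. taskState and isActive are local stand-ins for TaskState and IsActive.

```go
package main

import "fmt"

// taskState is a local mirror of tork.TaskState.
type taskState string

const (
	taskStatePending   taskState = "PENDING"
	taskStateScheduled taskState = "SCHEDULED"
	taskStateRunning   taskState = "RUNNING"
	taskStateCancelled taskState = "CANCELLED"
	taskStateStopped   taskState = "STOPPED"
	taskStateCompleted taskState = "COMPLETED"
	taskStateFailed    taskState = "FAILED"
	taskStateSkipped   taskState = "SKIPPED"
)

// isActive is an assumed equivalent of TaskState.IsActive: it treats
// PENDING, SCHEDULED, and RUNNING as active and everything else as final.
func (s taskState) isActive() bool {
	switch s {
	case taskStatePending, taskStateScheduled, taskStateRunning:
		return true
	}
	return false
}

func main() {
	states := []taskState{
		taskStatePending, taskStateScheduled, taskStateRunning,
		taskStateCancelled, taskStateStopped, taskStateCompleted,
		taskStateFailed, taskStateSkipped,
	}
	for _, s := range states {
		fmt.Printf("%-9s active=%v\n", s, s.isActive())
	}
}
```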

type TaskSummary added in v0.1.62

type TaskSummary struct {
	ID          string     `json:"id,omitempty"`
	JobID       string     `json:"jobId,omitempty"`
	Position    int        `json:"position,omitempty"`
	Name        string     `json:"name,omitempty"`
	Description string     `json:"description,omitempty"`
	State       TaskState  `json:"state,omitempty"`
	CreatedAt   *time.Time `json:"createdAt,omitempty"`
	ScheduledAt *time.Time `json:"scheduledAt,omitempty"`
	StartedAt   *time.Time `json:"startedAt,omitempty"`
	CompletedAt *time.Time `json:"completedAt,omitempty"`
	Error       string     `json:"error,omitempty"`
	Result      string     `json:"result,omitempty"`
	Var         string     `json:"var,omitempty"`
	Tags        []string   `json:"tags,omitempty"`
}

func NewTaskSummary added in v0.1.62

func NewTaskSummary(t *Task) *TaskSummary

type Webhook added in v0.1.20

type Webhook struct {
	URL     string            `json:"url,omitempty"`
	Headers map[string]string `json:"headers,omitempty"`
	Event   string            `json:"event,omitempty"`
}

func CloneWebhooks added in v0.1.20

func CloneWebhooks(webhooks []*Webhook) []*Webhook

func (Webhook) Clone added in v0.1.20

func (w Webhook) Clone() *Webhook
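
Webhook holds a map, so a useful clone has to copy the Headers map rather than share it. The sketch below shows one way such a deep copy could look; webhook, clone, and cloneWebhooks are local stand-ins, and the package's own implementation may differ.

```go
package main

import "fmt"

// webhook is a local mirror of tork.Webhook.
type webhook struct {
	URL     string
	Headers map[string]string
	Event   string
}

// clone returns a copy of w whose Headers map is independent of the
// original (an illustrative deep copy, not Tork's code).
func (w webhook) clone() *webhook {
	headers := make(map[string]string, len(w.Headers))
	for k, v := range w.Headers {
		headers[k] = v
	}
	return &webhook{URL: w.URL, Headers: headers, Event: w.Event}
}

// cloneWebhooks clones every element of the slice.
func cloneWebhooks(webhooks []*webhook) []*webhook {
	out := make([]*webhook, len(webhooks))
	for i, w := range webhooks {
		out[i] = w.clone()
	}
	return out
}

func main() {
	orig := []*webhook{{
		URL:     "http://example.com/hook",
		Headers: map[string]string{"X-Token": "a"},
	}}
	cp := cloneWebhooks(orig)
	cp[0].Headers["X-Token"] = "b" // must not affect the original
	fmt.Println(orig[0].Headers["X-Token"], cp[0].Headers["X-Token"])
}
```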

Directories

Path Synopsis
db
internal
reexec
Package reexec facilitates the busybox style reexec of the docker binary that we require because of the forking limitations of using Go.
middleware
job
web
