flow

package
v0.0.0-...-5d4f8f2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 9, 2020 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Worker event types; these are the values carried in WorkerEvent.Type.
	// Note that WorkerInitError's string value is "WORKER_INIT_FAILED", while
	// every other value is the upper-cased suffix of its constant name.
	WorkerInitError WorkerEventType = "WORKER_INIT_FAILED"
	WorkerAdded     WorkerEventType = "ADDED"
	WorkerModified  WorkerEventType = "MODIFIED"
	WorkerDeleted   WorkerEventType = "DELETED"
	WorkerError     WorkerEventType = "ERROR"
	WorkerFailed    WorkerEventType = "FAILED"
	WorkerSucceeded WorkerEventType = "SUCCEEDED"

	// DefaultChanSize is a default channel size of 100.
	// NOTE(review): presumably the buffer size of the WorkerEvent channel
	// returned by NewWorkPoolWatcher — confirm against its implementation.
	DefaultChanSize int32 = 100
)

Variables

View Source
// FlowStatusKey maps the numeric value of a flow status to its display name.
// The names are listed in ascending status order, so each name's position in
// the slice is its map key (0 through 12).
var FlowStatusKey = func() map[int]string {
	orderedNames := []string{
		"FLOW_CREATED",
		"FLOW_WAITINGTOSTART",
		"FLOW_STARTING",
		"FLOW_STARTED",
		"FLOW_WAITING",
		"FLOW_RUNNING",
		"FLOW_COMPLETING",
		"FLOW_CANCELLING",
		"FLOW_STOPPING",
		"FLOW_FAILED",
		"FLOW_STOPPED",
		"FLOW_COMPLETED",
		"FLOW_CANCELLED",
	}

	lookup := make(map[int]string, len(orderedNames))
	for code, label := range orderedNames {
		lookup[code] = label
	}
	return lookup
}()

Functions

func ErrTaskComplete

func ErrTaskComplete() error

func FlowStatusToString

func FlowStatusToString(key FlowStatus) string

func GetClient

func GetClient(hfConfig *config.KubeConfig) (client *kube.Clientset, fnerr error)

func InvalidFlowId

func InvalidFlowId() error

func InvalidFlowIdError

func InvalidFlowIdError(flowId string) error

func InvalidFlowParamsError

func InvalidFlowParamsError(flowId string) error

func InvalidTaskError

func InvalidTaskError(taskId string) error

func InvalidTaskId

func InvalidTaskId() error

func InvalidTaskStatusError

func InvalidTaskStatusError() error

func InvalidWorkerFlowCombo

func InvalidWorkerFlowCombo() error

func NewFlowEngine

func NewFlowEngine(
	qs *queryServer,
	db db_pkg.DatabaseContext,
	logger storage.ObjectAPIServer,
	c *config.Config) (*flowEngine, error)

func NewQueryServer

func NewQueryServer(db db_pkg.DatabaseContext) *queryServer

func NewWorkPoolWatcher

func NewWorkPoolWatcher() chan WorkerEvent

func TaskWorkerExistsError

func TaskWorkerExistsError(flowId, taskId string) error

Types

type ComputeOptions

// ComputeOptions holds worker/pod details used to generate the Kubernetes
// namespace. Only InCluster is exported; the remaining fields are unexported.
type ComputeOptions struct {

	// InCluster is true when the master runs inside the cluster.
	InCluster bool
	// contains filtered or unexported fields
}

Worker/pod details used to generate the Kubernetes namespace.

type Flow

// Flow identifies a flow by its Id together with a Version string.
type Flow struct {
	Id      string
	Version string
}

func FlowRef

func FlowRef(id string) Flow

type FlowAttrs

// FlowAttrs holds the attributes recorded for a single flow: its identity,
// mounts, status, timestamps, environment variables, tasks and configuration.
type FlowAttrs struct {
	// Flow is the identity (Id and Version) these attributes belong to.
	Flow Flow

	// mounted file systems
	OpenMounts map[string]string

	// Status is one of the FlowStatus constants
	// (FLOW_CREATED through FLOW_CANCELLED).
	Status FlowStatus

	// Timestamps, one per lifecycle event.
	// NOTE(review): presumably each is set when the flow reaches the
	// matching state and is the zero time otherwise — confirm.
	Created   time.Time
	Started   time.Time
	Completed time.Time
	Failed    time.Time

	CompletionText string

	// EnvVars holds environment variables as name/value pairs.
	EnvVars map[string]string

	// support multiple tasks in future releases
	// NOTE(review): the map key is presumably the task id — confirm.
	Tasks map[string]TaskAttrs `json:"Tasks"`

	FlowConfig *FlowConfig `json:"FlowConfig"`
	// contains filtered or unexported fields
}

TODO: Add version to flow at some point.

func NewFlowAttrs

func NewFlowAttrs(fc *FlowConfig) *FlowAttrs

func (*FlowAttrs) AddTask

func (f *FlowAttrs) AddTask(taskConfig *TaskConfig) *TaskAttrs

func (*FlowAttrs) FirstTask

func (f *FlowAttrs) FirstTask() *TaskAttrs

func (*FlowAttrs) FlowStatus

func (f *FlowAttrs) FlowStatus() string

func (*FlowAttrs) IsComplete

func (f *FlowAttrs) IsComplete() bool

func (*FlowAttrs) IsCompleted

func (fi *FlowAttrs) IsCompleted() bool

func (*FlowAttrs) IsCreated

func (fi *FlowAttrs) IsCreated() bool

func (*FlowAttrs) IsFailed

func (fi *FlowAttrs) IsFailed() bool

func (*FlowAttrs) IsStarted

func (fi *FlowAttrs) IsStarted() bool

func (*FlowAttrs) IsStarting

func (fi *FlowAttrs) IsStarting() bool

type FlowAttrsMessage

// FlowAttrsMessage is a message envelope carrying flow attributes together
// with task attributes, either as a slice (TasksAttrs) or as a single value
// (TaskAttrs). Type tags the message with a FlowMessageType.
type FlowAttrsMessage struct {
	Type       FlowMessageType
	FlowAttrs  *FlowAttrs
	TasksAttrs *[]tasks.TaskAttrs
	TaskAttrs  *tasks.TaskAttrs
}

type FlowConfig

// FlowConfig holds per-flow configuration; its only field is the mount map.
type FlowConfig struct {
	MountMap MountMap
}

type FlowEngine

// FlowEngine is the interface for starting and launching flows and for
// reading a flow's log stream.
type FlowEngine interface {
	// StartFlow takes a flow id and a task id and returns the flow's
	// attributes or an error.
	StartFlow(flowId, taskId string) (*FlowAttrs, error)
	// LaunchFlow takes a repo name, branch name, commit id, command string
	// and environment variables, and returns the resulting flow attributes
	// or an error.
	LaunchFlow(repoName string, branchName string, commitId string, cmdString string, evars map[string]string) (*FlowAttrs, error)
	// LogStream returns a reader over the log of the given flow.
	// NOTE(review): the caller is presumably responsible for closing the
	// returned io.ReadCloser — confirm.
	LogStream(flowId string) (io.ReadCloser, error)
}

type FlowMessage

// FlowMessage is a message envelope describing a flow and its tasks. Type
// tags the message with a FlowMessageType; status is carried in string form
// for both the flow (FlowStatusStr) and the task (TaskStatusStr).
type FlowMessage struct {
	Type          FlowMessageType
	Flow          *Flow
	Tasks         *[]tasks.Task
	FlowStatusStr string
	Task          *tasks.Task
	TaskStatusStr string
	EnvVars       map[string]string

	// Repository messages and a command string.
	// NOTE(review): presumably the inputs used when launching the flow —
	// confirm against callers.
	Repos  []*ws.RepoMessage
	CmdStr string
}

type FlowMessageType

// FlowMessageType tags a flow message as a request, a response or data.
type FlowMessageType int
// The values are explicit and spaced by ten (10, 20, 30) rather than
// generated with iota.
const (
	FlowRequest  FlowMessageType = 10
	FlowResponse FlowMessageType = 20
	FlowData     FlowMessageType = 30
)

type FlowOutRepoRequest

// FlowOutRepoRequest is a request type that currently declares no fields.
type FlowOutRepoRequest struct {
}

type FlowOutRepoResponse

// FlowOutRepoResponse carries a repo together with a branch and a commit.
type FlowOutRepoResponse struct {
	Repo   *ws.Repo
	Branch *ws.Branch
	Commit *ws.Commit
}

type FlowServer

// FlowServer exposes flow operations through its methods (for example
// LaunchFlow, LogStream, RegisterWorker and UpdateWorkerTaskStatus). All of
// its fields are unexported.
type FlowServer struct {
	// contains filtered or unexported fields
}

func (*FlowServer) Close

func (fs *FlowServer) Close()

func (*FlowServer) DetachTaskWorker

func (fs *FlowServer) DetachTaskWorker(workerId, flowId, taskId string) error

func (*FlowServer) GetCommandLogPath

func (fs *FlowServer) GetCommandLogPath(taskId string) string

func (*FlowServer) GetFlowAttr

func (fs *FlowServer) GetFlowAttr(flowId string) (*FlowAttrs, error)

func (*FlowServer) GetFlowLogPath

func (fs *FlowServer) GetFlowLogPath(flowId string) string

func (*FlowServer) GetModel

func (fs *FlowServer) GetModel(flow Flow) (repo *ws.Repo, branch *ws.Branch, commit *ws.Commit, fnErr error)

func (*FlowServer) GetOrCreateModel

func (fs *FlowServer) GetOrCreateModel(flow Flow) (repo *ws.Repo, branch *ws.Branch, commit *ws.Commit, fnErr error)

func (*FlowServer) GetOrCreateOutput

func (fs *FlowServer) GetOrCreateOutput(flow Flow) (*ws.Repo, *ws.Branch, *ws.Commit, error)

func (*FlowServer) GetOutput

func (fs *FlowServer) GetOutput(flow Flow) (*ws.Repo, *ws.Branch, *ws.Commit, error)

func (*FlowServer) GetTaskLog

func (fs *FlowServer) GetTaskLog(flowId string) ([]byte, int, error)

func (*FlowServer) GetTaskLogPath

func (fs *FlowServer) GetTaskLogPath(taskId string) string

func (*FlowServer) LaunchFlow

func (fs *FlowServer) LaunchFlow(repoName, branchName, commitId, cmdStr string, evars map[string]string) (*FlowAttrs, error)

func (*FlowServer) LogStream

func (fs *FlowServer) LogStream(flow_id string) (io.ReadCloser, error)

func (*FlowServer) NewModel

func (fs *FlowServer) NewModel(flow Flow) (*ws.Repo, *ws.Branch, *ws.Commit, error)

func (*FlowServer) NewOutput

func (fs *FlowServer) NewOutput(flow Flow) (*ws.Repo, *ws.Branch, *ws.Commit, error)

func (*FlowServer) RegisterWorker

func (fs *FlowServer) RegisterWorker(flowId string, taskId string, ipaddr string) (*WorkerAttrs, error)

func (*FlowServer) UpdateWorkerTaskStatus

func (fs *FlowServer) UpdateWorkerTaskStatus(worker Worker, tsr *TaskStatusChangeRequest) (*TaskStatusChangeResponse, error)

type FlowStatus

// FlowStatus is the integer-coded state of a flow.
type FlowStatus int
// Values are assigned by iota in declaration order, from FLOW_CREATED = 0 to
// FLOW_CANCELLED = 12. FlowStatusKey maps the same numeric values to these
// names, so reordering or inserting a constant here requires updating that
// map as well.
const (
	FLOW_CREATED FlowStatus = iota
	FLOW_WAITINGTOSTART
	FLOW_STARTING
	FLOW_STARTED
	FLOW_WAITING
	FLOW_RUNNING
	FLOW_COMPLETING
	FLOW_CANCELLING
	FLOW_STOPPING
	FLOW_FAILED
	FLOW_STOPPED
	FLOW_COMPLETED
	FLOW_CANCELLED
)

type FlowTaskWorker

// FlowTaskWorker ties a worker to a flow and a task, with a creation time.
// The JSON keys are mixed-case: "Worker", "Flow" and "Task" are capitalized,
// while "created" is lower-case.
type FlowTaskWorker struct {
	Worker  Worker    `json:"Worker"`
	Flow    Flow      `json:"Flow"`
	Task    Task      `json:"Task"`
	Created time.Time `json:"created"`
}

type NewFlowLaunchRequest

// NewFlowLaunchRequest carries a repo, branch and commit (all by value)
// together with a command string.
type NewFlowLaunchRequest struct {
	Repo      ws.Repo
	Branch    ws.Branch
	Commit    ws.Commit
	CmdString string
}

type NewFlowLaunchResponse

// NewFlowLaunchResponse carries a task status, both as a tasks.TaskStatus
// value and in string form, together with the task and the flow.
type NewFlowLaunchResponse struct {
	TaskStatus    tasks.TaskStatus
	TaskStatusStr string
	Task          *tasks.Task
	Flow          *Flow
}

type PodKeeper

// PodKeeper creates and destroys workers. Its pointer type declares every
// method listed in the WorkerPool interface (WorkerExists, AssignWorker,
// ReleaseWorker, Watch, CloseWatch, SaveWorkerLog, LogStream). All of its
// fields are unexported.
type PodKeeper struct {
	// contains filtered or unexported fields
}

generate packages / functions for creating/destroying workers

func NewDefaultPodKeeper

func NewDefaultPodKeeper(
	config *config.Config,
	db db_pkg.DatabaseContext,
	logger storage.ObjectAPIServer) (*PodKeeper, error)

func NewWorkerPool

func NewWorkerPool(config *config.Config, db db_pkg.DatabaseContext, logger storage.ObjectAPIServer) (*PodKeeper, error)

TODO: add worker limits

func (*PodKeeper) AssignWorker

func (pk *PodKeeper) AssignWorker(taskId string, flowAttrs *FlowAttrs, masterIp string, masterPort int32, masterExtPort int32) error

launch a new namespace config with K8s

func (*PodKeeper) CloseWatch

func (pk *PodKeeper) CloseWatch()

func (*PodKeeper) LogStream

func (pk *PodKeeper) LogStream(flowId string) (io.ReadCloser, error)

Create a new channel and a corresponding goroutine to call the pod log; return the channel and let the reader read.

func (*PodKeeper) ReleaseWorker

func (pk *PodKeeper) ReleaseWorker(flow Flow) error

task Id?

func (*PodKeeper) SaveMessageToWorkerLog

func (pk *PodKeeper) SaveMessageToWorkerLog(s string, worker Worker, flow Flow) error

func (*PodKeeper) SavePodLog

func (pk *PodKeeper) SavePodLog(podId, logDir, logName string) error

func (*PodKeeper) SaveWorkerLog

func (pk *PodKeeper) SaveWorkerLog(worker Worker, flow Flow) error

func (*PodKeeper) Watch

func (pk *PodKeeper) Watch(eventCh chan WorkerEvent)

func (*PodKeeper) WorkerExists

func (pk *PodKeeper) WorkerExists(flowId, taskId string) bool

type TaskStatusChangeRequest

// TaskStatusChangeRequest names a flow and a task, and carries a task status
// and a free-form message. It is the request argument of
// FlowServer.UpdateWorkerTaskStatus.
type TaskStatusChangeRequest struct {
	Flow       Flow
	Task       tasks.Task
	TaskStatus tasks.TaskStatus
	Message    string
}

type TaskStatusChangeResponse

// TaskStatusChangeResponse is the result type of
// FlowServer.UpdateWorkerTaskStatus; it carries flow attributes.
type TaskStatusChangeResponse struct {
	FlowAttrs *FlowAttrs
}

type Worker

// Worker identifies a worker by its Id, along with a pod id and a pod phase
// string.
// NOTE(review): PodPhase presumably mirrors the Kubernetes pod phase —
// confirm where it is populated.
type Worker struct {
	Id       string
	PodId    string
	PodPhase string
}

type WorkerAttrs

// WorkerAttrs holds the attributes recorded for a worker: the flow and task
// it is associated with, its IP address, start and completion times, an
// error string and its status. The JSON keys are mixed-case.
type WorkerAttrs struct {
	Worker    Worker       `json:"Worker"`
	Flow      Flow         `json:"Flow"`
	Ip        string       `json:"ip"`
	Task      Task         `json:"Task"`
	Started   time.Time    `json:"started"`
	Completed time.Time    `json:"completed"`
	Error     string       `json:"error"`
	Status    WorkerStatus `json:"WorkerStatus"` // WORKER_REGISTERED, WORKER_RUNNING, WORKER_FAILED, WORKER_COMPLETED
}

type WorkerEvent

// WorkerEvent is the value sent on the channels used by NewWorkPoolWatcher
// and Watch. Type says what kind of event it is; Worker, Flow and Task name
// the entities it refers to.
type WorkerEvent struct {
	Type WorkerEventType

	Worker Worker
	Flow   Flow
	Task   tsk_pkg.Task
}

type WorkerEventType

// WorkerEventType is the string-valued kind of a WorkerEvent; the defined
// values are the Worker* constants (WorkerInitError through WorkerSucceeded).
type WorkerEventType string

func WorkerEventFromStr

func WorkerEventFromStr(evt string) WorkerEventType

type WorkerPool

// WorkerPool is the interface for assigning workers to flows, watching
// worker events and accessing worker logs. *PodKeeper declares all of these
// methods.
type WorkerPool interface {
	// WorkerExists reports a boolean for the given flow id and task id.
	// NOTE(review): presumably true when a worker is already assigned to
	// that flow/task pair — confirm.
	WorkerExists(flowId, taskId string) bool
	// AssignWorker takes a task id, the flow's attributes, and the master's
	// IP address and two ports (masterPort, masterExtPort).
	AssignWorker(taskId string, flowAttrs *FlowAttrs, masterIp string, masterPort, masterExtPort int32) error
	// ReleaseWorker takes the flow whose worker is to be released.
	ReleaseWorker(flow Flow) error

	// Watch takes the channel used for WorkerEvent values; CloseWatch takes
	// no arguments.
	// NOTE(review): presumably Watch sends events on eventCh until
	// CloseWatch is called — confirm.
	Watch(eventCh chan WorkerEvent)
	CloseWatch()

	// SaveWorkerLog takes a worker and a flow; LogStream returns a reader
	// for the given flow id.
	SaveWorkerLog(worker Worker, flow Flow) error
	LogStream(flowId string) (io.ReadCloser, error)
}

type WorkerStatus

// WorkerStatus is the integer-coded state of a worker.
type WorkerStatus int
// Values are assigned by iota in declaration order, from
// WORKER_REGISTERED = 0 to WORKER_COMPLETED = 3.
const (
	WORKER_REGISTERED WorkerStatus = iota
	WORKER_RUNNING
	WORKER_FAILED
	WORKER_COMPLETED
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL