workflow

package
v0.0.0-...-fb1d18f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 27, 2022 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Exists

func Exists(name string) bool

Types

type BatchJob

type BatchJob struct {
	JobId   string
	Command []string
	Inputs  []BatchJobInput
	Outputs []BatchJobOutput

	ExitCode int
	Start    time.Time
	End      time.Time
	// contains filtered or unexported fields
}

func (*BatchJob) Abort

func (job *BatchJob) Abort()

func (*BatchJob) Execute

func (job *BatchJob) Execute(wf *Workflow, wg *sync.WaitGroup)

func (*BatchJob) GetId

func (job *BatchJob) GetId() string

func (*BatchJob) GetInputs

func (job *BatchJob) GetInputs() []Input

func (*BatchJob) GetOutputs

func (job *BatchJob) GetOutputs() []Output

func (*BatchJob) GetResult

func (job *BatchJob) GetResult() *JobResult

func (*BatchJob) GetStatus

func (job *BatchJob) GetStatus() JobStatus

type BatchJobInput

type BatchJobInput struct {
	// contains filtered or unexported fields
}

func (*BatchJobInput) Abort

func (s *BatchJobInput) Abort()

func (*BatchJobInput) Clear

func (job *BatchJobInput) Clear()

func (*BatchJobInput) GetWriter

func (s *BatchJobInput) GetWriter() (io.WriteCloser, error)

func (*BatchJobInput) Key

func (s *BatchJobInput) Key() string

func (*BatchJobInput) Label

func (job *BatchJobInput) Label() string

func (*BatchJobInput) UnBlock

func (s *BatchJobInput) UnBlock()

type BatchJobOutput

type BatchJobOutput struct {
	// contains filtered or unexported fields
}

func (*BatchJobOutput) Abort

func (s *BatchJobOutput) Abort()

func (*BatchJobOutput) Clear

func (job *BatchJobOutput) Clear()

func (*BatchJobOutput) GetReader

func (s *BatchJobOutput) GetReader() (io.ReadCloser, error)

func (*BatchJobOutput) IsFailed

func (job *BatchJobOutput) IsFailed() bool

func (*BatchJobOutput) Key

func (s *BatchJobOutput) Key() string

func (*BatchJobOutput) Label

func (job *BatchJobOutput) Label() string

func (*BatchJobOutput) UnBlock

func (s *BatchJobOutput) UnBlock()

type Event

type Event interface {
	GetEventType() EventType
}

type EventType

type EventType int
const (
	WorkflowEvents EventType = iota
	JobEvents
	PipeEvents
)

type FileType

type FileType int

type Input

type Input interface {
	Stream
	GetWriter() (io.WriteCloser, error)
}

type Job

type Job interface {
	GetId() string
	GetInputs() []Input
	GetOutputs() []Output
	Execute(wf *Workflow, wg *sync.WaitGroup)
	Abort()
	GetStatus() JobStatus
	GetResult() *JobResult
}

func CreateBatchJob

func CreateBatchJob(jobDto *JobDto) Job

func CreateObjectStoreJob

func CreateObjectStoreJob(jobDto *JobDto) Job

type JobDto

type JobDto struct {
	JobId    string
	Type     string
	Command  []string
	Inputs   []JobInput
	Outputs  []JobOutput
	Bucket   string
	Key      string
	WriteTo  string
	ReadFrom string
}

type JobEvent

type JobEvent struct {
	JobId   string
	Status  JobStatus
	Occured time.Time

	ExitCode int
	Message  string
	// contains filtered or unexported fields
}

func (*JobEvent) GetEventType

func (*JobEvent) GetEventType() EventType

type JobInput

type JobInput struct {
	Path     string
	ReadFrom string
}

type JobOutput

type JobOutput struct {
	Path    string
	WriteTo string
}

type JobResult

type JobResult struct {
	JobId    string
	Status   JobStatus
	Start    *time.Time
	End      *time.Time
	ExitCode int
	Message  string
}

type JobStatus

type JobStatus int
const (
	Created JobStatus = iota
	Running
	Successed
	Failed
	Aborted
)

func (JobStatus) GetDefaultExitCode

func (r JobStatus) GetDefaultExitCode() int

func (JobStatus) IsFailed

func (j JobStatus) IsFailed() bool

func (JobStatus) IsFinished

func (j JobStatus) IsFinished() bool

func (JobStatus) MarshalJSON

func (r JobStatus) MarshalJSON() ([]byte, error)

func (JobStatus) String

func (r JobStatus) String() string

func (*JobStatus) UnmarshalJSON

func (r *JobStatus) UnmarshalJSON(data []byte) error

type ObjectStore

type ObjectStore struct {
	Region    string
	Endpoint  string
	AccessKey string
	SecretKey string
}

func (*ObjectStore) Init

func (o *ObjectStore) Init() error

type ObjectStoreDownloadJob

type ObjectStoreDownloadJob struct {
	Bucket string

	Start time.Time
	End   time.Time
	// contains filtered or unexported fields
}

func (*ObjectStoreDownloadJob) Abort

func (job *ObjectStoreDownloadJob) Abort()

func (*ObjectStoreDownloadJob) Clear

func (job *ObjectStoreDownloadJob) Clear()

func (*ObjectStoreDownloadJob) Close

func (job *ObjectStoreDownloadJob) Close() error

func (*ObjectStoreDownloadJob) Execute

func (job *ObjectStoreDownloadJob) Execute(wf *Workflow, wg *sync.WaitGroup)

func (*ObjectStoreDownloadJob) GetId

func (job *ObjectStoreDownloadJob) GetId() string

func (*ObjectStoreDownloadJob) GetInputs

func (job *ObjectStoreDownloadJob) GetInputs() []Input

func (*ObjectStoreDownloadJob) GetOutputs

func (job *ObjectStoreDownloadJob) GetOutputs() []Output

func (*ObjectStoreDownloadJob) GetReader

func (job *ObjectStoreDownloadJob) GetReader() (io.ReadCloser, error)

func (*ObjectStoreDownloadJob) GetResult

func (job *ObjectStoreDownloadJob) GetResult() *JobResult

func (*ObjectStoreDownloadJob) GetStatus

func (job *ObjectStoreDownloadJob) GetStatus() JobStatus

func (*ObjectStoreDownloadJob) IsFailed

func (job *ObjectStoreDownloadJob) IsFailed() bool

func (*ObjectStoreDownloadJob) Key

func (job *ObjectStoreDownloadJob) Key() string

func (*ObjectStoreDownloadJob) Label

func (job *ObjectStoreDownloadJob) Label() string

func (*ObjectStoreDownloadJob) Read

func (job *ObjectStoreDownloadJob) Read(p []byte) (n int, err error)

func (*ObjectStoreDownloadJob) UnBlock

func (p *ObjectStoreDownloadJob) UnBlock()

type ObjectStoreUploadJob

type ObjectStoreUploadJob struct {
	Bucket string

	Start time.Time
	End   time.Time
	// contains filtered or unexported fields
}

func (*ObjectStoreUploadJob) Abort

func (p *ObjectStoreUploadJob) Abort()

func (*ObjectStoreUploadJob) Clear

func (job *ObjectStoreUploadJob) Clear()

func (*ObjectStoreUploadJob) Execute

func (job *ObjectStoreUploadJob) Execute(wf *Workflow, wg *sync.WaitGroup)

func (*ObjectStoreUploadJob) GetId

func (p *ObjectStoreUploadJob) GetId() string

func (*ObjectStoreUploadJob) GetInputs

func (p *ObjectStoreUploadJob) GetInputs() []Input

func (*ObjectStoreUploadJob) GetOutputs

func (p *ObjectStoreUploadJob) GetOutputs() []Output

func (*ObjectStoreUploadJob) GetResult

func (job *ObjectStoreUploadJob) GetResult() *JobResult

func (*ObjectStoreUploadJob) GetStatus

func (p *ObjectStoreUploadJob) GetStatus() JobStatus

func (*ObjectStoreUploadJob) GetWriter

func (p *ObjectStoreUploadJob) GetWriter() (io.WriteCloser, error)

func (*ObjectStoreUploadJob) Key

func (p *ObjectStoreUploadJob) Key() string

func (*ObjectStoreUploadJob) Label

func (p *ObjectStoreUploadJob) Label() string

func (*ObjectStoreUploadJob) UnBlock

func (p *ObjectStoreUploadJob) UnBlock()

type ObjectStoreUploader

type ObjectStoreUploader struct {
	// contains filtered or unexported fields
}

func (*ObjectStoreUploader) Abort

func (p *ObjectStoreUploader) Abort()

func (*ObjectStoreUploader) Close

func (p *ObjectStoreUploader) Close() error

func (*ObjectStoreUploader) Key

func (p *ObjectStoreUploader) Key() string

func (*ObjectStoreUploader) Write

func (p *ObjectStoreUploader) Write(data []byte) (n int, err error)

type Output

type Output interface {
	Stream
	GetReader() (io.ReadCloser, error)
	IsFailed() bool
}

type PipeHandler

type PipeHandler struct {
	Status JobStatus
	// contains filtered or unexported fields
}

func CreateHandlers

func CreateHandlers(jobs []Job) []*PipeHandler

func (*PipeHandler) AbortAll

func (p *PipeHandler) AbortAll()

func (*PipeHandler) AbortWriters

func (p *PipeHandler) AbortWriters()

func (*PipeHandler) Finished

func (p *PipeHandler) Finished()

func (*PipeHandler) Handle

func (p *PipeHandler) Handle()

func (*PipeHandler) Init

func (p *PipeHandler) Init()

func (*PipeHandler) UnBlock

func (p *PipeHandler) UnBlock()

type Stream

type Stream interface {
	Abort()
	Clear()
	Key() string
	Label() string
	UnBlock()
}

type Workflow

type Workflow struct {
	Objectstore *ObjectStore
	Jobs        []Job

	Status JobStatus
	// contains filtered or unexported fields
}

func CreateWorkflow

func CreateWorkflow(dto *WorkflowDto) *Workflow

func LoadWorkflow

func LoadWorkflow(reader io.Reader) (*Workflow, error)

func (*Workflow) Execute

func (w *Workflow) Execute(status_ch chan Event) *WorkflowResult

func (*Workflow) GetStatus

func (w *Workflow) GetStatus() JobStatus

func (*Workflow) UnBlock

func (w *Workflow) UnBlock()

type WorkflowDto

type WorkflowDto struct {
	Objectstore *ObjectStore
	Jobs        []*JobDto

	Status JobStatus
	// contains filtered or unexported fields
}

type WorkflowEvent

type WorkflowEvent struct {
	Status    JobStatus
	ExecError error
	ExitCode  int
	Message   string
}

func (*WorkflowEvent) GetEventType

func (we *WorkflowEvent) GetEventType() EventType

type WorkflowResult

type WorkflowResult struct {
	Status  JobStatus
	Results []*JobResult
	Start   *time.Time
	End     *time.Time
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL