cloudflow

package module
v0.0.0-...-7b56046 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 28, 2017 License: MIT Imports: 7 Imported by: 0

README

cloudflow

Build Status GoDoc Go Report Card

Description

cloudflow is a workflow engine written in Go, designed to run on cloud computing platforms.

Installation

Install the dependency libraries.

go get github.com/aws/aws-sdk-go
go get github.com/hashicorp/go-multierror

Install cloudflow.

go get github.com/yonekawa/cloudflow

Usage

Define & Run workflow
import "github.com/yonekawa/cloudflow"

wf := cloudflow.NewWorkflow()
wf.AddTask("download", &DownloadTask{...})
wf.AddTask("process", &ProcessTask{...})

// parallel execution
pt := cloudflow.NewParallelTask()
pt.AddTask("parallel-1", &ProcessTask{})
pt.AddTask("parallel-2", &ProcessTask{})
wf.AddTask("parallel", pt)

wf.AddTask("output", &OutputTask{...})

// Show task summary
fmt.Print(wf.Summary())

wf.Run()
wf.RunFrom("process")
wf.RunOnly("output")

Builtin tasks

task.CommandTask

CommandTask executes a local command using exec.Command.

import "github.com/yonekawa/cloudflow/task"

cmd := task.NewCommandTask("go", "help", "build")
err := cmd.Execute()
aws.S3BulkUploadTask & aws.S3BulkDownloadTask

aws.S3BulkUploadTask uploads the local files in the src directory to the S3 dst folder.

import "github.com/aws/aws-sdk-go/aws/session"
import "github.com/yonekawa/cloudflow/platform/aws"

sess := session.Must(session.NewSession())
// Upload ./src files into s3://s3-bucket/s3dst/
task := aws.NewS3BulkUploadTask(sess, "./src", "/s3dst", "s3-bucket")
err := task.Execute()

aws.S3BulkDownloadTask downloads the files in the S3 src folder to the local dst directory.

import "github.com/aws/aws-sdk-go/aws/session"
import "github.com/yonekawa/cloudflow/platform/aws"

sess := session.Must(session.NewSession())
// Download s3://s3-bucket/s3src/ files into ./dst
task := aws.NewS3BulkDownloadTask(sess, "/s3src", "./dst", "s3-bucket")
err := task.Execute()
aws.BatchJobTask

aws.BatchJobTask submits an AWS Batch job and waits for the job to complete.

import "github.com/aws/aws-sdk-go/aws/session"
import "github.com/aws/aws-sdk-go/service/batch"
import "github.com/yonekawa/cloudflow/platform/aws"

sess := session.Must(session.NewSession())
task := aws.NewBatchJobTask(sess, &batch.SubmitJobInput{
  JobDefinition: aws.String("job definition ARN"),
  JobQueue:      aws.String("job queue ARN"),
  JobName:       aws.String("job name"),
})
err := task.Execute()

You can change the polling interval and the timeout.

task := aws.NewBatchJobTask(sess, &batch.SubmitJobInput{
  JobDefinition: aws.String("job definition ARN"),
  JobQueue:      aws.String("job queue ARN"),
  JobName:       aws.String("job name"),
})
task.PollingTime = 5 * time.Minute
task.Timeout = 10 * time.Hour
aws.LambdaInvokeTask

aws.LambdaInvokeTask invokes an AWS Lambda function.

import awssdk "github.com/aws/aws-sdk-go/aws"
import "github.com/aws/aws-sdk-go/aws/session"
import "github.com/aws/aws-sdk-go/service/lambda"
import "github.com/yonekawa/cloudflow/platform/aws"

sess := session.Must(session.NewSession())
task := aws.NewLambdaInvokeTask(sess, &lambda.InvokeInput{
  FunctionName: awssdk.String("function ARN"),
})
err := task.Execute()

License

This library is distributed under the MIT license found in the LICENSE file.

Documentation

Overview

Package cloudflow provides a workflow engine for cloud computing platforms.

Installation

go get github.com/yonekawa/cloudflow

Dependencies

go get github.com/aws/aws-sdk-go
go get github.com/hashicorp/go-multierror

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ParallelTask

type ParallelTask struct {
	// contains filtered or unexported fields
}

ParallelTask represents parallel task on workflow.

func NewParallelTask

func NewParallelTask() *ParallelTask

NewParallelTask creates a new parallel task.

func (*ParallelTask) AddTask

func (pt *ParallelTask) AddTask(name string, task Task)

AddTask adds a task with the given name to the parallel task.

func (*ParallelTask) Execute

func (pt *ParallelTask) Execute() error

Execute implements Task.Execute.

func (*ParallelTask) Summary

func (pt *ParallelTask) Summary() string

Summary returns parallel task summary.

type Task

type Task interface {
	Execute() error
}

Task represents task interface of workflow.

type Workflow

type Workflow struct {
	// contains filtered or unexported fields
}

Workflow contains tasks list of workflow definition.

func NewWorkflow

func NewWorkflow() *Workflow

NewWorkflow creates a new workflow definition.

func (*Workflow) AddTask

func (wf *Workflow) AddTask(name string, task Task)

AddTask adds a task with the given name.

func (*Workflow) Execute

func (wf *Workflow) Execute() error

Execute implements Task.Execute, so a Workflow can be used as a Task in another workflow definition.

func (*Workflow) Run

func (wf *Workflow) Run() error

Run runs the defined workflow tasks.

func (*Workflow) RunFrom

func (wf *Workflow) RunFrom(name string) error

RunFrom runs the workflow starting from the specified task.

func (*Workflow) RunOnly

func (wf *Workflow) RunOnly(name string) error

RunOnly runs only the specified task of the workflow.

func (*Workflow) SetLogger

func (wf *Workflow) SetLogger(logger *log.Logger)

SetLogger sets the workflow's logger.

func (*Workflow) Summary

func (wf *Workflow) Summary() string

Summary returns task flow summary.

Directories

Path Synopsis
platform
aws

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL