dataflow

package
v0.0.0-...-549aca6 Latest
Published: May 23, 2022 | License: BSD-2-Clause | Imports: 10 | Imported by: 0

Documentation

Overview

Package dataflow combines the workflow and task-management concepts most popular in data science. The project was inspired chiefly by Nextflow and SciPipe, two excellent tools from the scientific-workflow, bioinformatics-pipeline, and workflow-engine space. Most importantly, it is implemented in Go.

Constants

This section is empty.

Variables

var ErrorTimeOut = fmt.Errorf("TimeOut")

ErrorTimeOut is the error returned when task execution times out.

var (
	// ErrorUsingActuator is the error returned when the goroutine pool fails.
	ErrorUsingActuator = fmt.Errorf("ErrorUsingActuator")
)

Functions

func DurationPtr

func DurationPtr(t time.Duration) *time.Duration

DurationPtr returns a pointer to the given duration.

func Exec

func Exec(tasks ...Task) bool

Exec runs the tasks concurrently. It returns true if all tasks complete successfully; otherwise it returns false.

func ExecWithError

func ExecWithError(tasks ...Task) error

ExecWithError runs the tasks concurrently. It returns nil if all tasks complete successfully; otherwise it returns a custom error.

func HeapProcess

func HeapProcess(inputChan <-chan WorkFunction, options *Settings) <-chan OrderedOutput

HeapProcess processes the work functions read from inputChan. It accepts a receive-only WorkFunction channel and a Settings value that gives the goroutine pool size and the output channel buffer. It returns a receive-only OrderedOutput channel.
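The OrderedOutput name suggests that results come back in input order even though the work runs concurrently; the documentation does not confirm this, so treat it as an assumption. The sketch below shows one way to get that behavior. It uses a queue of per-item result channels rather than a heap, and workFunction, square, and processOrdered are hypothetical local names.

```go
package main

import "fmt"

// workFunction mirrors the documented WorkFunction interface (local stand-in).
type workFunction interface {
	Run() interface{}
}

// square is a hypothetical work item.
type square int

func (s square) Run() interface{} { return int(s) * int(s) }

// processOrdered runs work items on up to poolSize goroutines and emits the
// results in input order.
func processOrdered(in <-chan workFunction, poolSize int) <-chan interface{} {
	if poolSize < 1 {
		poolSize = 1
	}
	out := make(chan interface{})
	pending := make(chan chan interface{}, poolSize) // result slots, in input order
	sem := make(chan struct{}, poolSize)             // limits running goroutines

	// Dispatcher: reserve a slot per item, then start the work.
	go func() {
		for w := range in {
			slot := make(chan interface{}, 1)
			sem <- struct{}{}
			pending <- slot
			go func(w workFunction) {
				defer func() { <-sem }()
				slot <- w.Run()
			}(w)
		}
		close(pending)
	}()

	// Collector: read slots in the order they were reserved.
	go func() {
		for slot := range pending {
			out <- <-slot
		}
		close(out)
	}()
	return out
}

func main() {
	in := make(chan workFunction)
	go func() {
		for i := 1; i <= 5; i++ {
			in <- square(i)
		}
		close(in)
	}()
	for v := range processOrdered(in, 3) {
		fmt.Println(v)
	}
}
```

The collector blocks on the oldest outstanding slot, so a slow early item delays later results but never reorders them.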

Types

type Actuator

type Actuator struct {
	// contains filtered or unexported fields
}

Actuator is the base struct

func NewActuator

func NewActuator(opt ...*Options) *Actuator

NewActuator creates an Actuator instance

func (*Actuator) Exec

func (c *Actuator) Exec(tasks ...Task) error

Exec is used to run tasks concurrently

func (*Actuator) ExecWithContext

func (c *Actuator) ExecWithContext(ctx context.Context, tasks ...Task) error

ExecWithContext runs tasks concurrently. It returns nil when all tasks complete successfully, or an error when something goes wrong, such as a timeout.

func (*Actuator) GetTimeout

func (c *Actuator) GetTimeout() *time.Duration

GetTimeout returns the configured timeout.

type BaseActuator

type BaseActuator interface {
	Exec(tasks ...Task) error
	ExecWithContext(ctx context.Context, tasks ...Task) error
}

BaseActuator is the actuator interface

type GoroutinePool

type GoroutinePool interface {
	Submit(f func()) error
	Release()
}

GoroutinePool is the base goroutine pool interface. Users can supply a custom goroutine pool by implementing it.

type Job

type Job func()

type JobManager

type JobManager struct {
	Wg sync.WaitGroup
	// contains filtered or unexported fields
}

JobManager is an actuator that has a worker pool.

func NewJobManager

func NewJobManager(workerNum int, opt ...*Options) *JobManager

NewJobManager creates a JobManager instance.

func (*JobManager) Exec

func (c *JobManager) Exec(tasks ...Task) error

Exec is used to run tasks concurrently

func (*JobManager) ExecWithContext

func (c *JobManager) ExecWithContext(ctx context.Context, tasks ...Task) error

ExecWithContext uses a goroutine pool to run tasks concurrently. It returns nil when all tasks complete successfully, or an error when something goes wrong, such as a timeout.

func (*JobManager) GetTimeout

func (c *JobManager) GetTimeout() *time.Duration

GetTimeout returns the configured timeout.

func (*JobManager) Release

func (c *JobManager) Release()

Release releases the pool.

func (*JobManager) WithPool

func (c *JobManager) WithPool(pool GoroutinePool) *JobManager

WithPool sets a custom goroutine pool for the JobManager to use.

type Operator

type Operator interface {
	Operation() (interface{}, error)
}

Operator is a type that performs an operation on itself, returning a result and any error.
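Any type with an Operation method of this shape satisfies the interface. The sketch below is illustrative only: the interface is copied locally so the example is self-contained, and divide is a hypothetical type.

```go
package main

import (
	"errors"
	"fmt"
)

// operator is a local copy of the documented Operator interface.
type operator interface {
	Operation() (interface{}, error)
}

// divide is a hypothetical Operator that carries its own operands.
type divide struct{ a, b float64 }

// Operation returns a/b, or an error when b is zero.
func (d divide) Operation() (interface{}, error) {
	if d.b == 0 {
		return nil, errors.New("division by zero")
	}
	return d.a / d.b, nil
}

func main() {
	ops := []operator{divide{6, 3}, divide{1, 0}}
	for _, op := range ops {
		v, err := op.Operation()
		fmt.Println(v, err)
	}
}
```

Because the result is an interface{}, callers need a type assertion such as v.(float64) before using the value.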

type Options

type Options struct {
	TimeOut *time.Duration
}

Options is used to initialize an actuator.

type OrderedOutput

type OrderedOutput struct {
	Value     interface{}
	Remaining func() int
}

OrderedOutput is the element type of the channel returned by HeapProcess.

type Pool

type Pool struct {
	JobQueue chan Job
	// contains filtered or unexported fields
}

func NewPool

func NewPool(numWorkers int, jobQueueLen int) *Pool

NewPool creates a pool of goroutine workers. numWorkers is the number of workers created for the pool; jobQueueLen is the number of jobs the queue accepts before sends block.

The returned Pool exposes JobQueue, which you can use to send jobs to the pool.

func (*Pool) JobDone

func (p *Pool) JobDone()

If you are using WaitAll, call JobDone every time one of your jobs finishes.

If you are not using WaitAll then we assume you have your own way of synchronizing.

func (*Pool) Release

func (p *Pool) Release()

Release releases the resources used by the pool.

func (*Pool) WaitAll

func (p *Pool) WaitAll()

WaitAll waits for all jobs to finish.

func (*Pool) WaitCount

func (p *Pool) WaitCount(count int)

WaitCount sets how many jobs WaitAll should wait for. It uses sync.WaitGroup Add/Done/Wait.

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

The Processor type manages a number of concurrent Processes.

func NewProcessor

func NewProcessor(queue chan Operator, buffer int, threads int) (p *Processor)

NewProcessor returns a new Processor that runs Operators taken from queue on the specified number of threads and places the results in a buffer of the given size. threads is limited by GOMAXPROCS: if threads is greater than GOMAXPROCS or less than 1, it is set to GOMAXPROCS.

func (*Processor) Close

func (p *Processor) Close()

Close the queue.

func (*Processor) Process

func (p *Processor) Process(value ...Operator)

Submit values for processing.

func (*Processor) Result

func (p *Processor) Result() (interface{}, error)

Get the next available result.

func (*Processor) Stop

func (p *Processor) Stop()

Terminate the goroutines.

func (*Processor) Wait

func (p *Processor) Wait()

Wait for all running processes to finish.

func (*Processor) Working

func (p *Processor) Working() int

Return the number of working goroutines.

type Result

type Result struct {
	Value interface{}
	Err   error
}

type Settings

type Settings struct {
	PoolSize         int
	OutChannelBuffer int
}

Settings holds the options for HeapProcess.

type Task

type Task func() error

Task is the function type for a unit of work; it returns an error on failure.

type TimedActuator

type TimedActuator interface {
	BaseActuator
	GetTimeout() *time.Duration
	// contains filtered or unexported methods
}

TimedActuator is the actuator interface with a timeout method.

type WorkFunction

type WorkFunction interface {
	Run() interface{}
}

WorkFunction is the interface for a unit of work passed to HeapProcess; Run returns its result.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker is a goroutine instance that accepts client jobs.
