base: github.com/grailbio/base/sync/workerpool Index | Files

package workerpool

import "github.com/grailbio/base/sync/workerpool"

Index

Package Files

workerpool.go

type Task Uses

type Task interface {
    Do(grp *TaskGroup) error
}

Task provides an interface for an individual task. Tasks are executed by workers by calling the Do function.

type TaskGroup Uses

type TaskGroup struct {
    Name       string
    ErrHandler *multierror.MultiError
    Wp         *WorkerPool
    // contains filtered or unexported fields
}

TaskGroup is used group Tasks together so the consumer can wait for a specific subgroup of Tasks to Wait.

func (*TaskGroup) Enqueue Uses

func (grp *TaskGroup) Enqueue(t Task, block bool) bool

Enqueue puts a Task in the queue. If block is true and the channel is full, then the function blocks. If block is false and the channel is full, then the function returns false.

func (*TaskGroup) Wait Uses

func (grp *TaskGroup) Wait()

Wait blocks until all Tasks in this TaskGroup have completed.

type WorkerPool Uses

type WorkerPool struct {
    Ctx         context.Context
    Concurrency int
    // contains filtered or unexported fields
}

WorkerPool provides a mechanism for executing Tasks with a specific concurrency. A Task is an interface containing a single function Do. A TaskGroup allows Tasks to be grouped together so the parent process can wait for all Tasks in a TaskGroup to Wait. Tasks can create new Tasks and add them to the TaskGroup or new TaskGroups and add them to the WorkerPool. A simple example looks like this:

wp := fileset.WorkerPool(context.Background(), 3) tg1 := wp.NewTaskGroup("context1") tg1.Enqueue(MyFirstTask, true) tg2 := wp.NewTaskGroup("context2") tg2.Enqueue(MyFourthTask, true) tg1.Enqueue(MySecondTask, true) tg2.Enqueue(MyFifthTask, true) tg1.Enqueue(MyThirdTask, true) tg1.Wait() tg2.Enqueue(MySixthTask, true) tg2.Wait() wp.Wait()

TaskGroups can come and go until wp.Wait() has been called. Tasks can come and go in a TaskGroup until tg.Wait() has been called. All the Tasks in this example are executed by 3 go routines.

Note: Each WorkerPool will create a goroutine to keep track of active TaskGroups. Each TaskGroup will create a goroutine to keep track of pending/active tasks.

func New Uses

func New(ctx context.Context, concurrency int) *WorkerPool

New creates a WorkerPool with the given concurrency.

TODO(pknudsgaard): Should return a closure calling Wait.

func (*WorkerPool) Err Uses

func (wp *WorkerPool) Err() error

Err returns the context.Context error to determine if WorkerPool Waitd due to the context.

func (*WorkerPool) NewTaskGroup Uses

func (wp *WorkerPool) NewTaskGroup(name string, errHandler *multierror.MultiError) *TaskGroup

NewTaskGroup creates a TaskGroup for Tasks to be executed in.

TODO(pknudsgaard): TaskGroup should have a context.Context which is separate from the WorkerPool context.Context.

TODO(pknudsgaard): Should return a closure calling Wait.

func (*WorkerPool) Wait Uses

func (wp *WorkerPool) Wait()

Wait blocks until all TaskGroups in the WorkerPool have Waitd.

Package workerpool imports 4 packages (graph). Updated 2019-01-08. Refresh now. Tools for package owners.