workman

package module
v0.0.0-...-ac9fb95 Latest
Warning

This package is not in the latest version of its module.

Published: Dec 9, 2016 License: MIT Imports: 6 Imported by: 0

README

workman

Package workman provides constructs for parallel processing.

Usage

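// getHTTP returns its error so that WaitForCompletion can report it.
getHTTP := func(i int) error {
    res, err := http.Get(fmt.Sprintf("http://httpbin.org/get?n=%d", i))
    if err != nil {
        return err
    }
    defer res.Body.Close()
    data, err := ioutil.ReadAll(res.Body)
    fmt.Printf("%s", data)
    return err
}

wm, err := NewWorkManager(4)
if err != nil {
    log.Fatal(err)
}
wm.StartWorkers(getHTTP)
for i := 0; i < 10; i++ {
    wm.SendWork(i)
}
if err := wm.WaitForCompletion(); err != nil {
    fmt.Println(err.All())
}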

Specifying Workers

Construct a new work manager with the desired number of parallel workers. The number must exceed 0:

wm, err := NewWorkManager(10)

Sending Work

When using the WorkManager, it is important to invoke SendWork as if you were invoking your work function directly. That is, if you start workers using a function with the following signature:

wm.StartWorkers(func(n int, line string, active bool) {})

Then you should send it work as follows:

wm.SendWork(10, "Townsend", true)
wm.SendWork(12, "Folsom/Pacific", false)
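
The page does not show how SendWork matches its arguments to the work function. As a rough sketch of one way such a check could work, the hypothetical helper callWork below uses the reflect package to verify the argument count and types before calling the function. The names callWork and errBadArgs are illustrative and are not part of workman.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// errBadArgs is a hypothetical stand-in for an argument-mismatch error.
var errBadArgs = errors.New("args can't be passed to work function")

// callWork invokes fn with args after checking arity and assignability.
// For simplicity it rejects variadic functions and nil arguments.
func callWork(fn interface{}, args ...interface{}) error {
	v := reflect.ValueOf(fn)
	if v.Kind() != reflect.Func {
		return errBadArgs
	}
	t := v.Type()
	if t.IsVariadic() || t.NumIn() != len(args) {
		return errBadArgs
	}
	in := make([]reflect.Value, len(args))
	for i, a := range args {
		if a == nil || !reflect.TypeOf(a).AssignableTo(t.In(i)) {
			return errBadArgs
		}
		in[i] = reflect.ValueOf(a)
	}
	v.Call(in)
	return nil
}

func main() {
	work := func(n int, line string, active bool) {
		fmt.Println(n, line, active)
	}
	fmt.Println(callWork(work, 10, "Townsend", true)) // arguments match the signature
	fmt.Println(callWork(work, "Townsend", 10))       // wrong count and wrong types
}
```

The second call fails the check instead of panicking inside reflect, which is the reason to validate before calling.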

Handling Errors

Your work function may return an error or nothing at all; no other return values are supported:

wm.StartWorkers(func() error {
    return errors.New("bad run")
})

The error returned from WaitForCompletion is non-nil if at least one error occurred during processing. The individual errors can be enumerated as follows:

if err := wm.WaitForCompletion(); err != nil {
    errs := err.All()
    for _, e := range errs {
        fmt.Println(e)
    }
}
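
For readers curious how errors from parallel workers can be gathered into one value, here is a minimal sketch using only the standard library. The type multiError and the functions runAll and failOdd are hypothetical, loosely modeled on the All() accessor above. This is not the package's actual implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// multiError is a hypothetical aggregate of worker errors.
type multiError struct{ errs []error }

// All returns every collected error.
func (m *multiError) All() []error { return m.errs }

// runAll runs work once per input, each in its own goroutine, and gathers
// the failures. It returns nil when every call succeeds.
func runAll(inputs []int, work func(int) error) *multiError {
	var (
		mu   sync.Mutex
		wg   sync.WaitGroup
		errs []error
	)
	for _, in := range inputs {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			if err := work(n); err != nil {
				mu.Lock() // guard the shared slice
				errs = append(errs, err)
				mu.Unlock()
			}
		}(in)
	}
	wg.Wait()
	if len(errs) == 0 {
		return nil
	}
	return &multiError{errs: errs}
}

// failOdd is a sample work function that fails on odd inputs.
func failOdd(n int) error {
	if n%2 == 1 {
		return fmt.Errorf("bad run: %d", n)
	}
	return nil
}

func main() {
	if err := runAll([]int{1, 2, 3, 4}, failOdd); err != nil {
		for _, e := range err.All() {
			fmt.Println(e)
		}
	}
}
```

The order of the collected errors depends on goroutine scheduling, so callers should not rely on it.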

Rate Limiting

Set a rate limit by passing an int: the number of requests per second allowed across all workers combined.

wm.SetRateLimit(5)
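
How SetRateLimit is enforced internally is not documented here. One common way to share a single limit across several workers is to gate dispatch on a time.Ticker, as in the sketch below. The function runLimited and its parameters are assumptions made for illustration only.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runLimited runs work for each job on `workers` goroutines and hands out
// roughly perSecond jobs per second in total. perSecond must be positive.
// It returns the total elapsed time.
func runLimited(jobs []int, workers, perSecond int, work func(int)) time.Duration {
	start := time.Now()
	tick := time.NewTicker(time.Second / time.Duration(perSecond))
	defer tick.Stop()

	queue := make(chan int)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range queue {
				work(j)
			}
		}()
	}
	for _, j := range jobs {
		<-tick.C // one shared token per interval, whichever worker takes the job
		queue <- j
	}
	close(queue)
	wg.Wait()
	return time.Since(start)
}

func main() {
	elapsed := runLimited([]int{1, 2, 3, 4, 5}, 4, 50, func(n int) {
		fmt.Println("job", n)
	})
	fmt.Println("elapsed:", elapsed)
}
```

Because the limit sits in front of the queue rather than inside each worker, adding workers raises concurrency without raising the request rate.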

Happy processing!

Documentation

Overview

Package workman provides constructs for parallel processing.

Index

Constants

This section is empty.

Variables

var (
	// The user provided an invalid number of workers
	ErrTooFewWorkers = errors.New("number of workers must exceed 0")

	// The user provided an invalid work function
	ErrInvalidWorkFuncType = errors.New("invalid work function type")

	// Workers have already been started and cannot start again
	ErrAlreadyStarted = errors.New("workers already started")

	// Work has already completed and no more work can be done
	ErrWorkCompleted = errors.New("work has completed")

	// Some workers encountered errors during processing
	ErrWorkerErrors = errors.New("some workers encountered errors")

	// A worker timed out before completing its work
	ErrWorkerTimeout = errors.New("worker timeout")

	// Args are of the wrong type or length to be passed to the work function
	ErrBadWorkArgs = errors.New("args can't be passed to work function")
)
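
Sentinel errors like these are normally checked by comparison or with errors.Is. The sketch below shows that pattern together with one possible way to produce a timeout error. The names errTimeout, runWithTimeout, slow, and fast are hypothetical, and the page does not say how the package itself enforces WorkerMaxTimeout.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimeout is a local stand-in for a sentinel such as ErrWorkerTimeout.
var errTimeout = errors.New("worker timeout")

// runWithTimeout runs work and returns errTimeout if it takes longer than max.
// The work goroutine is not cancelled; its late result is simply discarded.
func runWithTimeout(max time.Duration, work func() error) error {
	done := make(chan error, 1) // buffered so a late worker never blocks
	go func() { done <- work() }()
	select {
	case err := <-done:
		return err
	case <-time.After(max):
		return errTimeout
	}
}

// slow and fast are sample work functions.
func slow() error { time.Sleep(200 * time.Millisecond); return nil }
func fast() error { return nil }

func main() {
	err := runWithTimeout(20*time.Millisecond, slow)
	if errors.Is(err, errTimeout) {
		fmt.Println("timed out:", err)
	}
	fmt.Println(runWithTimeout(time.Second, fast))
}
```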

Functions

This section is empty.

Types

type WorkManager

type WorkManager struct {
	// WorkerMaxTimeout is the maximum allowed time a worker can run
	WorkerMaxTimeout time.Duration
	// contains filtered or unexported fields
}

WorkManager manages a pool of parallel workers and provides an API for sending work and collecting errors. It offers a layer of abstraction over the concept of workers and the asynchronous nature of their processing.

Using a WorkManager requires calling three methods, in this order:

  • StartWorkers(func(args...)): Pass a function (or a method with a pointer receiver) that tells the WorkManager how to process the work you will send it. It is an ordinary function taking any number of arguments. Note: if the function returns anything, it *must* return only an error. It is also unsafe for the function to read or write any shared state.
  • SendWork(args...): Pass the arguments exactly as you would pass them to the function given to StartWorkers.
  • WaitForCompletion(): Wait for all workers to finish their tasks and receive a list of any errors encountered during processing.

The WorkManager is stateful. It is an error to run its methods out of order or to send it work after it has already completed all work.
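
To make the ordering rule concrete, here is a deliberately small sketch of a pool that rejects out-of-order calls. The type pool, its methods, and the error errNotStarted are hypothetical; the sketch assumes all three methods are called from a single goroutine and is not how workman itself is written.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	errNotStarted = errors.New("workers not started") // hypothetical
	errStarted    = errors.New("workers already started")
	errCompleted  = errors.New("work has completed")
)

// pool is a simplified manager that enforces Start, then Send, then Wait.
type pool struct {
	n       int
	queue   chan int
	wg      sync.WaitGroup
	started bool
	done    bool
}

func newPool(n int) *pool { return &pool{n: n, queue: make(chan int)} }

// Start launches n workers that each pull values from the queue.
func (p *pool) Start(work func(int)) error {
	if p.done {
		return errCompleted
	}
	if p.started {
		return errStarted
	}
	p.started = true
	for i := 0; i < p.n; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for v := range p.queue {
				work(v)
			}
		}()
	}
	return nil
}

// Send queues one value, or reports why it cannot.
func (p *pool) Send(v int) error {
	if p.done {
		return errCompleted
	}
	if !p.started {
		return errNotStarted
	}
	p.queue <- v
	return nil
}

// Wait closes the queue and blocks until every worker has drained it.
func (p *pool) Wait() {
	if !p.started || p.done {
		return
	}
	p.done = true
	close(p.queue)
	p.wg.Wait()
}

func main() {
	p := newPool(2)
	fmt.Println(p.Send(1)) // rejected: workers not started yet
	p.Start(func(v int) {})
	fmt.Println(p.Start(func(v int) {})) // rejected: already started
	p.Send(1)
	p.Wait()
	fmt.Println(p.Send(2)) // rejected: work has completed
}
```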

func NewWorkManager

func NewWorkManager(n int) (WorkManager, error)

NewWorkManager returns a WorkManager with n parallel workers.

func (*WorkManager) SendWork

func (wm *WorkManager) SendWork(args ...interface{}) error

SendWork provides the args necessary for the workers to run their workFunc.

func (*WorkManager) SetRateLimit

func (wm *WorkManager) SetRateLimit(r int)

SetRateLimit sets the workers to be collectively limited to r requests per second.

func (*WorkManager) StartWorkers

func (wm *WorkManager) StartWorkers(workFunc interface{}) error

StartWorkers starts a pool of workers that will run workFunc.

func (*WorkManager) WaitForCompletion

func (wm *WorkManager) WaitForCompletion() *WorkManagerError

WaitForCompletion blocks until all workers have completed their work. The returned error is non-nil if any worker encountered an error. Call err.All() to get the exact list of errors.
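
Because the return type is a concrete pointer rather than the error interface, a general Go pitfall is worth keeping in mind: a nil pointer stored in an error interface is not equal to nil. The sketch below demonstrates this with a hypothetical poolError type. Whether the real WorkManagerError has an Error method is an assumption; the page does not list one.

```go
package main

import "fmt"

// poolError is a hypothetical type standing in for a pointer-typed error result.
type poolError struct{ errs []error }

func (e *poolError) Error() string { return fmt.Sprintf("%d worker errors", len(e.errs)) }
func (e *poolError) All() []error  { return e.errs }

// wait stands in for a method that returns a typed pointer, nil on success.
func wait() *poolError { return nil }

// asError shows the pitfall: the nil pointer becomes a non-nil interface value.
func asError() error { return wait() }

// asErrorSafe converts explicitly so that success stays a nil interface.
func asErrorSafe() error {
	if e := wait(); e != nil {
		return e
	}
	return nil
}

func main() {
	fmt.Println(wait() == nil)        // true
	fmt.Println(asError() == nil)     // false
	fmt.Println(asErrorSafe() == nil) // true
}
```

Checking the result directly, as in the README's `if err := wm.WaitForCompletion(); err != nil`, avoids the problem because no interface conversion takes place.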

type WorkManagerError

type WorkManagerError struct {
	// contains filtered or unexported fields
}

WorkManagerError stores a list of errors encountered during worker processing.

func (*WorkManagerError) All

func (e *WorkManagerError) All() []error

All returns a list of all errors encountered by workers.
