parallel: gitlab.com/stone.code/parallel

package parallel

import "gitlab.com/stone.code/parallel"

Package parallel provides a runner to run tasks with limited concurrency. Using this package, it should be straightforward to replace any loop with similar code that provides concurrency.

Code:

// Example of a simple loop.
sum1 := uint32(0)
for i := 0; i < 10; i++ {
    // The use of atomic is unnecessary here, but is used to keep the body
    // the same as the concurrent loop below.
    atomic.AddUint32(&sum1, uint32(i))
}

// Modified loop where iterations can run concurrently.
sum2 := uint32(0)
r := parallel.NewRunner(nil)
for i := 0; i < 10; i++ {
    i := i // Need a copy of the current value for the closure.
    r.Go(func() {
        atomic.AddUint32(&sum2, uint32(i))
    })
}
r.Wait()

fmt.Println(sum1, sum2)

Output:

45 45
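
One common way to bound concurrency in Go is a buffered channel used as a counting semaphore. The following is a minimal sketch of that idea using only the standard library. It is not necessarily how this package is implemented, and the name sumConcurrently and its limit parameter are invented for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// sumConcurrently adds 0..n-1 using at most limit goroutines at a time.
// It is a hypothetical stand-in, not code from the parallel package.
func sumConcurrently(n, limit int) uint32 {
	var (
		sum uint32
		wg  sync.WaitGroup
	)
	sem := make(chan struct{}, limit) // Counting semaphore.
	for i := 0; i < n; i++ {
		i := i
		sem <- struct{}{} // Blocks while limit goroutines are running.
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // Release the slot.
			atomic.AddUint32(&sum, uint32(i))
		}()
	}
	wg.Wait()
	return sum
}

func main() {
	fmt.Println(sumConcurrently(10, 4)) // 45
}
```

The send on sem plays the role of a blocking submit: the loop cannot start another iteration until a slot is free.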

Code:

// Example of a simple loop, which uses a break.
sum1 := uint32(0)
for i := 0; i < 10; i++ {
    if i == 5 {
        break
    }
    atomic.AddUint32(&sum1, uint32(i))
}

// Modified loop where iterations can run concurrently.
sum2 := uint32(0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
r := parallel.NewRunner(ctx)
for i := 0; i < 10; i++ {
    i := i
    r.Go(func() {
        if i >= 5 {
            // Note: using i == 5 to cancel would lead to non-determinism.
            // A separate goroutine may add 6..9 before the call to cancel.
            // Note: this works in practice, but is still non-deterministic.
            // There is no guarantee that numbers smaller than 5 will be
            // executed first.
            cancel()
            return
        }
        atomic.AddUint32(&sum2, uint32(i))
    })
}
r.Wait()

fmt.Println(sum1, sum2)

Output:

10 10

Code:

// Example of a simple loop, which returns early with an error.
sum1, err1 := func() (uint32, error) {
    sum := uint32(0)
    for i := 0; i < 10; i++ {
        if i == 5 {
            return 0, fmt.Errorf("dummy error")
        }
        atomic.AddUint32(&sum, uint32(i))
    }
    return sum, nil
}()

// Modified loop where iterations can run concurrently.
// Note: unlike the loop above, the loop body may run for i > 5.
sum2, err2 := func() (uint32, error) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    sum := uint32(0)
    r := parallel.NewRunner(ctx)
    errs := make(chan error, r.MaxConcurrency())
    for i := 0; i < 10; i++ {
        i := i
        r.Go(func() {
            if i == 5 {
                errs <- fmt.Errorf("dummy error")
                cancel()
                return
            }
            atomic.AddUint32(&sum, uint32(i))
        })
    }
    r.Wait()

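    // All senders have finished, so the channel can be closed.  Without
    // the close, the receive would block forever if no error was sent.
    close(errs)
    err, ok := <-errs
    if ok {
        return 0, err
    }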

    return sum, nil
}()

fmt.Println("A:", sum1, err1)
fmt.Println("B:", sum2, err2)

Output:

A: 0 dummy error
B: 0 dummy error
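
An alternative to a buffered error channel is to record only the first error with sync.Once. The following is a hedged sketch using only the standard library. The names firstError, run, and failAt are hypothetical and are not part of this package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// firstError records only the first error reported to it.
// It is a hypothetical helper, not part of the parallel package.
type firstError struct {
	once sync.Once
	err  error
}

// Set stores err if no error has been stored yet.
func (f *firstError) Set(err error) {
	f.once.Do(func() { f.err = err })
}

// run calls body for 0..n-1 in separate goroutines. It returns the first
// error reported, and cancels the shared context when one is reported.
func run(n int, body func(ctx context.Context, i int) error) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var (
		wg    sync.WaitGroup
		first firstError
	)
	for i := 0; i < n; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := body(ctx, i); err != nil {
				first.Set(err)
				cancel()
			}
		}()
	}
	wg.Wait()
	return first.err // Safe to read: every Set happened before Wait returned.
}

// failAt returns a body that fails only for iteration k.
func failAt(k int) func(context.Context, int) error {
	return func(_ context.Context, i int) error {
		if i == k {
			return errors.New("dummy error")
		}
		return nil
	}
}

func main() {
	fmt.Println(run(10, failAt(5))) // dummy error
}
```

Because the error is stored in a field rather than sent on a channel, the caller never has to decide whether a receive might block.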

Package Files

doc.go pipeline.go runner.go waitgroup.go

func CollectErrors

func CollectErrors(r *Runner) (<-chan []error, chan<- error)

CollectErrors creates a pipeline to collect errors from multiple goroutines into a slice. The caller is responsible for calling close on the input channel.

Receiving from the output channel blocks until all of the errors have been collected.

Code:

// Example of a simple loop, which collects errors.
sum1, err1 := func() (uint32, []error) {
    sum := uint32(0)
    errs := []error(nil)
    for i := 0; i < 10; i++ {
        if i == 5 || i == 7 {
            errs = append(errs, fmt.Errorf("dummy (%d) error", i))
            continue
        }
        atomic.AddUint32(&sum, uint32(i))
    }
    return sum, errs
}()

// Modified loop where iterations can run concurrently.
sum2, err2 := func() (uint32, []error) {
    sum := uint32(0)
    r := parallel.NewRunner(context.Background())
    errout, errin := parallel.CollectErrors(r)
    for i := 0; i < 10; i++ {
        i := i
        r.Go(func() {
            // Note that with enough concurrency, calls with i==5 and calls
            // with i==7 may execute out of order.  To prevent intermittent
            // failures of the test, we deliberately delay i==7.
            if i == 7 {
                time.Sleep(time.Millisecond)
            }
            if i == 5 || i == 7 {
                errin <- fmt.Errorf("dummy (%d) error", i)
                return
            }
            atomic.AddUint32(&sum, uint32(i))
        })
    }
    r.Wait()

    close(errin)
    return sum, <-errout
}()

fmt.Println("A:", sum1, err1)
fmt.Println("B:", sum2, err2)

Output:

A: 33 [dummy (5) error dummy (7) error]
B: 33 [dummy (5) error dummy (7) error]
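
The shape of such a collection pipeline can be sketched with the standard library alone. This is an illustration under assumptions, not the implementation of CollectErrors: the hypothetical collect function takes no Runner argument and starts its own goroutine, and gather is an invented helper.

```go
package main

import (
	"fmt"
	"sync"
)

// collect is a hypothetical stand-in for a CollectErrors-style pipeline.
// Errors sent on the second channel are gathered into a slice, which is
// delivered on the first channel once the input has been closed.
func collect() (<-chan []error, chan<- error) {
	in := make(chan error)
	out := make(chan []error, 1)
	go func() {
		var errs []error
		for err := range in { // Ends when the caller closes in.
			errs = append(errs, err)
		}
		out <- errs
	}()
	return out, in
}

// gather sends n errors from separate goroutines and returns the slice.
func gather(n int) []error {
	out, in := collect()
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			in <- fmt.Errorf("dummy (%d) error", i)
		}()
	}
	wg.Wait() // All senders are done, so closing is safe.
	close(in)
	return <-out
}

func main() {
	fmt.Println(len(gather(3))) // 3
}
```

In this sketch, closing the input only after every sender has finished avoids a send on a closed channel, which would panic.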

type Runner

type Runner struct {
    // contains filtered or unexported fields
}

A Runner waits for a collection of callbacks to finish, while running the callbacks in a limited number of goroutines.

A Runner must not be copied after first use.

func NewRunner

func NewRunner(ctx context.Context) *Runner

NewRunner returns a new runner based on the provided context. The maximum concurrency will be limited to a default value.

See the documentation for NewRunnerWithMax for more details.

func NewRunnerWithMax

func NewRunnerWithMax(ctx context.Context, max int) *Runner

NewRunnerWithMax returns a new runner based on the provided context. The maximum concurrency will be limited to max. As a precondition, the maximum concurrency must be greater than zero. If the maximum concurrency is one, then all functions passed to Go will run sequentially.

Code:

buffer := bytes.NewBuffer(nil)

// Note that we set the maximum concurrency to one.  This is required to
// get deterministic behaviour for the test.  If set to a higher number,
// then the order of two output lines may be reversed.
r := parallel.NewRunnerWithMax(context.Background(), 1)
r.Go(func() {
    buffer.WriteString("one\n")
})
r.Go(func() {
    buffer.WriteString("two\n")
})
r.Wait()

fmt.Println(buffer.String())

Output:

one
two

func (*Runner) Context

func (r *Runner) Context() context.Context

Context returns the runner's context. The returned context is always non-nil; it defaults to the background context.

func (*Runner) Go

func (r *Runner) Go(f func())

Go adds the function f to the queue of actions to be completed. If the runner's maximum concurrency is greater than one, tasks may run in parallel and may execute out of order. Otherwise, tasks will run in the order that they are provided. If there is insufficient concurrency to start the task, the call to Go will block.

If the provided context is cancelled, either explicitly or because it exceeds its deadline, before execution of the function f begins, then the function f will not be called.

This method cannot be called after Wait.
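
The cancellation rule above can be sketched as a check of the context immediately before a callback is invoked. This is a minimal illustration using the standard library. The names runUnlessDone and demo are hypothetical, and the runner's actual check may differ.

```go
package main

import (
	"context"
	"fmt"
)

// runUnlessDone calls f only if ctx has not been cancelled and has not
// passed its deadline. It reports whether f was called.
func runUnlessDone(ctx context.Context, f func()) bool {
	if ctx.Err() != nil {
		return false
	}
	f()
	return true
}

// demo submits one callback before cancellation and one after it.
func demo() (before, after bool, calls int) {
	ctx, cancel := context.WithCancel(context.Background())
	before = runUnlessDone(ctx, func() { calls++ })
	cancel()
	after = runUnlessDone(ctx, func() { calls++ })
	return before, after, calls
}

func main() {
	fmt.Println(demo()) // true false 1
}
```

A check of this kind only prevents a callback from starting. A callback that is already running has to watch the context itself if it needs to stop early.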

func (*Runner) MaxConcurrency

func (r *Runner) MaxConcurrency() int

MaxConcurrency returns the runner's maximum concurrency, which is the maximum number of callbacks that may be running at the same time.

func (*Runner) Wait

func (r *Runner) Wait()

Wait marks the runner as complete, and waits for all of the functions to terminate. This method must be called or the runner will leak goroutines.
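
To see why a missing call to Wait can leak goroutines, consider a fixed-size worker pool. The following sketch is an assumption about the general technique, not the Runner's source. The type pool and the functions newPool and order are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical fixed-size worker pool; it is not the Runner type.
type pool struct {
	tasks chan func()
	wg    sync.WaitGroup
}

// newPool starts max workers that execute submitted tasks.
func newPool(max int) *pool {
	p := &pool{tasks: make(chan func())}
	p.wg.Add(max)
	for i := 0; i < max; i++ {
		go func() {
			defer p.wg.Done()
			for f := range p.tasks { // Exits once tasks is closed.
				f()
			}
		}()
	}
	return p
}

// Go blocks until a worker is free to accept f.
func (p *pool) Go(f func()) { p.tasks <- f }

// Wait closes the queue and waits for the workers to exit. Without this
// call the workers would block on the channel forever.
func (p *pool) Wait() {
	close(p.tasks)
	p.wg.Wait()
}

// order runs two tasks on a single worker and returns the order they ran in.
func order() []string {
	var got []string
	p := newPool(1)
	p.Go(func() { got = append(got, "one") })
	p.Go(func() { got = append(got, "two") })
	p.Wait()
	return got
}

func main() {
	fmt.Println(order()) // [one two]
}
```

In this sketch, calling Go after Wait would send on a closed channel and panic, which is one reason a design like this forbids it.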

type WaitGroup

type WaitGroup struct {
    // contains filtered or unexported fields
}

A WaitGroup waits for a collection of callbacks to finish. Each callback will be executed in a new goroutine.

A WaitGroup must not be copied after first use.

Code:

var wg parallel.WaitGroup
var urls = []string{
    "http://www.golang.org/",
    "http://www.google.com/",
    "http://www.somestupidname.com/",
}
for _, url := range urls {
    url := url
    wg.Go(func() {
        http.Get(url)
    })
}
// Wait for all HTTP fetches to complete.
wg.Wait()

func (*WaitGroup) Go

func (wg *WaitGroup) Go(f func())

Go starts a goroutine to execute the callback f, while also maintaining a count of the running goroutines.

func (*WaitGroup) Wait

func (wg *WaitGroup) Wait()

Wait blocks until all of the callbacks have finished executing.
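
The Go and Wait pair can be sketched as a thin wrapper over sync.WaitGroup that does the Add and Done bookkeeping for the caller. This is an illustration only. The type group and the function countCalls are hypothetical, and the package's own WaitGroup may be built differently.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// group is a hypothetical wrapper around sync.WaitGroup.
type group struct {
	wg sync.WaitGroup
}

// Go runs f in a new goroutine and counts it until f returns.
func (g *group) Go(f func()) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		f()
	}()
}

// Wait blocks until every function passed to Go has returned.
func (g *group) Wait() { g.wg.Wait() }

// countCalls starts n callbacks and returns how many of them ran.
func countCalls(n int) int32 {
	var (
		g     group
		calls int32
	)
	for i := 0; i < n; i++ {
		g.Go(func() { atomic.AddInt32(&calls, 1) })
	}
	g.Wait()
	return atomic.LoadInt32(&calls)
}

func main() {
	fmt.Println(countCalls(3)) // 3
}
```

Pairing Add with a deferred Done inside Go removes the usual risk of a mismatched counter when callers manage it by hand.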

Package parallel imports 3 packages. Updated 2019-08-12.