parallel

Published: Jan 27, 2020 License: BSD-3-Clause Imports: 3 Imported by: 0

README

parallel

Easy concurrent loops.

Package parallel provides a runner to run tasks with limited concurrency. Using this package, it should be straightforward to replace any loop with similar code that provides concurrency.

Installation

As with most Go projects, installation is as easy as go get gitlab.com/stone.code/parallel.

Documentation

Please refer to godoc.org for the API documentation.

An example of how the package can be used is below. The full source can be found on the Go playground.

urls := []string{
    "http://www.golang.org/",
    "http://www.google.com/",
    "http://www.somestupidname.com/",
}

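// Build a runner with the default context. Concurrency is limited to
// the number of CPUs.
r := parallel.NewRunner(nil)
for _, url := range urls {
    url := url // Copy the loop variable for the closure.
    r.Go(func() {
        // Fetch the URL. The error is ignored in this example.
        resp, err := http.Get(url)
        if err == nil {
            resp.Body.Close()
        }
    })
}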
// Wait for all HTTP fetches to complete.
r.Wait()

Contributing

Development of this project is ongoing. If you find a bug or have any suggestions, please open an issue.

If you'd like to contribute, please fork the repository and make changes. Pull requests are welcome.

  • golang.org/x/sync/errgroup: Package errgroup provides synchronization, error propagation, and Context cancelation for groups of goroutines working on subtasks of a common task.
  • github.com/juju/utils/parallel: The parallel package provides utilities for running tasks concurrently.
  • sync.WaitGroup: A WaitGroup waits for a collection of goroutines to finish.
  • Worker Pools: An example on how to implement a worker pool using goroutines and channels, provided by Go by Example.

Licensing

This project is licensed under the 3-Clause BSD License. See the LICENSE in the repository.

Documentation

Overview

Package parallel provides a runner to run tasks with limited concurrency. Using this package, it should be straightforward to replace any loop with similar code that provides concurrency.

Example (Loop)
package main

import (
	"fmt"
	"sync/atomic"

	"gitlab.com/stone.code/parallel"
)

func main() {
	// Example of a simple loop.
	sum1 := uint32(0)
	for i := 0; i < 10; i++ {
		// The use of atomic is unnecessary here, but is used to keep the body
		// the same as the concurrent loop below.
		atomic.AddUint32(&sum1, uint32(i))
	}

	// Modified loop where iterations can run concurrently.
	sum2 := uint32(0)
	r := parallel.NewRunner(nil)
	for i := 0; i < 10; i++ {
		i := i // Need a copy of the current value for the closure.
		r.Go(func() {
			atomic.AddUint32(&sum2, uint32(i))
		})
	}
	r.Wait()

	fmt.Println(sum1, sum2)

}
Output:

45 45
Example (LoopWithBreak)
package main

import (
	"context"
	"fmt"
	"sync/atomic"

	"gitlab.com/stone.code/parallel"
)

func main() {
	// Example of a simple loop, which uses a break.
	sum1 := uint32(0)
	for i := 0; i < 10; i++ {
		if i == 5 {
			break
		}
		atomic.AddUint32(&sum1, uint32(i))
	}

	// Modified loop where iterations can run concurrently.
	sum2 := uint32(0)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	r := parallel.NewRunner(ctx)
	for i := 0; i < 10; i++ {
		i := i
		r.Go(func() {
			if i >= 5 {
				// Note: using i == 5 to cancel would lead to
				// non-determinism.  A separate goroutine may add 6..9 before
				// the call to cancel.
				// Note: this works in practice, but still contains
				// non-determinism.  There is no guarantee that numbers smaller
				// than 5 will be executed first.
				cancel()
				return
			}
			atomic.AddUint32(&sum2, uint32(i))
		})
	}
	r.Wait()

	fmt.Println(sum1, sum2)

}
Output:

10 10
Example (LoopWithError)
package main

import (
	"context"
	"fmt"
	"sync/atomic"

	"gitlab.com/stone.code/parallel"
)

func main() {
	// Example of a simple loop, which returns early with an error.
	sum1, err1 := func() (uint32, error) {
		sum := uint32(0)
		for i := 0; i < 10; i++ {
			if i == 5 {
				return 0, fmt.Errorf("dummy error")
			}
			atomic.AddUint32(&sum, uint32(i))
		}
		return sum, nil
	}()

	// Modified loop where iterations can run concurrently.
	// Note: unlike the loop above, the loop body may run for i > 5.
	sum2, err2 := func() (uint32, error) {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		sum := uint32(0)
		r := parallel.NewRunner(ctx)
		errs := make(chan error, r.MaxConcurrency())
		for i := 0; i < 10; i++ {
			i := i
			r.Go(func() {
				if i == 5 {
					errs <- fmt.Errorf("dummy error")
					cancel()
					return
				}
				atomic.AddUint32(&sum, uint32(i))
			})
		}
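		r.Wait()

		// All callbacks have finished, so no more sends can happen.  Close
		// the channel so that the receive below cannot block when no error
		// was sent.
		close(errs)
		err, ok := <-errs
		if ok {
			return 0, err
		}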

		return sum, nil
	}()

	fmt.Println("A:", sum1, err1)
	fmt.Println("B:", sum2, err2)

}
Output:

A: 0 dummy error
B: 0 dummy error

Functions

func CollectErrors

func CollectErrors(r *Runner) (<-chan []error, chan<- error)

CollectErrors creates a pipeline to collect errors from multiple goroutines into a slice. The caller is responsible for calling close on the input channel.

Receiving from the output channel blocks until the input channel has been closed and all of the errors have been collected.
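The pipeline described above can be illustrated with plain channels. The following is only a sketch using the standard library; collect and gather are hypothetical names, and the code is not the package's implementation of CollectErrors (in particular, it ignores the Runner argument).

```go
package main

import (
	"errors"
	"fmt"
)

// collect is a hypothetical helper (not part of the package).  It returns an
// output channel that yields a single slice, and an input channel for errors.
// The caller must close the input channel to release the slice.
func collect() (<-chan []error, chan<- error) {
	in := make(chan error)
	out := make(chan []error, 1)
	go func() {
		var errs []error
		// Gather errors until the caller closes the input channel.
		for err := range in {
			errs = append(errs, err)
		}
		out <- errs
	}()
	return out, in
}

// gather sends one error per message and returns the collected slice.
func gather(msgs ...string) []error {
	out, in := collect()
	for _, m := range msgs {
		in <- errors.New(m)
	}
	close(in)
	return <-out
}

func main() {
	fmt.Println(gather("first", "second")) // prints [first second]
}
```

Because a single goroutine owns the slice, senders do not need a mutex; the receive from the output channel only completes once the input channel has been closed.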

Example
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	"gitlab.com/stone.code/parallel"
)

func main() {
	// Example of a simple loop, which collects errors.
	sum1, err1 := func() (uint32, []error) {
		sum := uint32(0)
		errs := []error(nil)
		for i := 0; i < 10; i++ {
			if i == 5 || i == 7 {
				errs = append(errs, fmt.Errorf("dummy (%d) error", i))
				continue
			}
			atomic.AddUint32(&sum, uint32(i))
		}
		return sum, errs
	}()

	// Modified loop where iterations can run concurrently.
	sum2, err2 := func() (uint32, []error) {
		sum := uint32(0)
		r := parallel.NewRunner(context.Background())
		errout, errin := parallel.CollectErrors(r)
		for i := 0; i < 10; i++ {
			i := i
			r.Go(func() {
				// Note that with enough concurrency, calls with i==5 and calls
				// with i==7 may execute out of order.  To prevent intermittent
				// failures of the test, we deliberately delay i==7.
				if i == 7 {
					time.Sleep(time.Millisecond)
				}
				if i == 5 || i == 7 {
					errin <- fmt.Errorf("dummy (%d) error", i)
					return
				}
				atomic.AddUint32(&sum, uint32(i))
			})
		}
		r.Wait()

		close(errin)
		return sum, <-errout
	}()

	fmt.Println("A:", sum1, err1)
	fmt.Println("B:", sum2, err2)

}
Output:

A: 33 [dummy (5) error dummy (7) error]
B: 33 [dummy (5) error dummy (7) error]

Types

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

A Runner waits for a collection of callbacks to finish, while running the callbacks in a limited number of goroutines.

A Runner must not be copied after first use.
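To make the idea of "a limited number of goroutines" concrete, here is a minimal sketch built on a buffered channel used as a semaphore. The type limitedRunner and the helper sumTo are hypothetical and exist only for illustration; the package's Runner may be implemented differently.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// limitedRunner is a hypothetical stand-in, not the package's Runner.
// It runs callbacks in goroutines, never more than cap(sem) at a time.
type limitedRunner struct {
	sem chan struct{} // one slot per allowed concurrent callback
	wg  sync.WaitGroup
}

func newLimitedRunner(max int) *limitedRunner {
	return &limitedRunner{sem: make(chan struct{}, max)}
}

// Go blocks until a slot is free, then runs f in a new goroutine.
func (r *limitedRunner) Go(f func()) {
	r.sem <- struct{}{}
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		defer func() { <-r.sem }() // release the slot when f returns
		f()
	}()
}

// Wait blocks until every callback passed to Go has returned.
func (r *limitedRunner) Wait() { r.wg.Wait() }

// sumTo adds 0..n-1 using at most max concurrent callbacks.
func sumTo(n, max int) uint32 {
	var sum uint32
	r := newLimitedRunner(max)
	for i := 0; i < n; i++ {
		i := i // copy the loop variable for the closure
		r.Go(func() { atomic.AddUint32(&sum, uint32(i)) })
	}
	r.Wait()
	return sum
}

func main() {
	fmt.Println(sumTo(10, 2)) // prints 45
}
```

The struct holds a sync.WaitGroup, which is one reason a type like this must not be copied after first use.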

func NewRunner

func NewRunner(ctx context.Context) *Runner

NewRunner returns a new runner based on the provided context. The maximum concurrency will be limited to a default value.

See the documentation for NewRunnerWithMax for more details.

func NewRunnerWithMax

func NewRunnerWithMax(ctx context.Context, max int) *Runner

NewRunnerWithMax returns a new runner based on the provided context. The maximum concurrency will be limited to max. As a precondition, the maximum concurrency must be greater than zero. If the maximum concurrency is one, then all functions passed to Go will run sequentially.

Example
package main

import (
	"bytes"
	"context"
	"fmt"

	"gitlab.com/stone.code/parallel"
)

func main() {
	buffer := bytes.NewBuffer(nil)

	// Note that we set the maximum concurrency to one.  This is required to
	// get deterministic behaviour for the test.  If set to a higher number,
	// then the order of the two output lines may be reversed.
	r := parallel.NewRunnerWithMax(context.Background(), 1)
	r.Go(func() {
		buffer.WriteString("one\n")
	})
	r.Go(func() {
		buffer.WriteString("two\n")
	})
	r.Wait()

	fmt.Println(buffer.String())

}
Output:

one
two

func (*Runner) Context

func (r *Runner) Context() context.Context

Context returns the runner's context. The returned context is always non-nil; it defaults to the background context.

func (*Runner) Go

func (r *Runner) Go(f func())

Go adds the function f to the queue of actions to be completed. If the runner's maximum concurrency is greater than one, tasks may run in parallel and may execute out of order. Otherwise, tasks will run in the order that they are provided. If there is insufficient concurrency to start the task, the call to Go will block.

If the provided context is cancelled, either explicitly or because it exceeds its deadline, before execution of the function f begins, then the function f will not be called.
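The "skip the callback once the context is done" behaviour can be sketched with a non-blocking select on ctx.Done(). The helpers runUnlessDone and countRuns are hypothetical and run sequentially for clarity; they are not the package's code.

```go
package main

import (
	"context"
	"fmt"
)

// runUnlessDone is a hypothetical helper.  It calls f and reports true,
// unless ctx has already been cancelled or has passed its deadline.
func runUnlessDone(ctx context.Context, f func()) bool {
	select {
	case <-ctx.Done():
		return false // cancelled before f started, so f is skipped
	default:
		f()
		return true
	}
}

// countRuns submits n callbacks and cancels the context just before
// callback number cancelAt.  It returns how many callbacks actually ran.
func countRuns(cancelAt, n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	runs := 0
	for i := 0; i < n; i++ {
		if i == cancelAt {
			cancel()
		}
		runUnlessDone(ctx, func() { runs++ })
	}
	return runs
}

func main() {
	fmt.Println(countRuns(3, 10)) // prints 3
}
```

Note that the check only happens before f starts. A callback that is already running is not interrupted; it has to watch the context itself if it needs to stop early.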

This method cannot be called after Wait.

func (*Runner) MaxConcurrency

func (r *Runner) MaxConcurrency() int

MaxConcurrency returns the runner's maximum concurrency, which is the maximum number of callbacks that may run at the same time.

func (*Runner) Wait

func (r *Runner) Wait()

Wait marks the runner as complete, and waits for all of the functions to terminate. This method must be called or the runner will leak goroutines.

type WaitGroup

type WaitGroup struct {
	// contains filtered or unexported fields
}

A WaitGroup waits for a collection of callbacks to finish. Each callback will be executed in a new goroutine.

A WaitGroup must not be copied after first use.

Example
package main

import (
	"net/http"

	"gitlab.com/stone.code/parallel"
)

func main() {
	var wg parallel.WaitGroup
	var urls = []string{
		"http://www.golang.org/",
		"http://www.google.com/",
		"http://www.somestupidname.com/",
	}
	for _, url := range urls {
		url := url
		wg.Go(func() {
			// The error is ignored in this example.
			resp, err := http.Get(url)
			if err == nil {
				resp.Body.Close()
			}
		})
	}
	// Wait for all HTTP fetches to complete.
	wg.Wait()
}

func (*WaitGroup) Go

func (wg *WaitGroup) Go(f func())

Go starts a goroutine to execute the callback f, while also maintaining a count of the running goroutines.

func (*WaitGroup) Wait

func (wg *WaitGroup) Wait()

Wait blocks until all of the callbacks have finished executing.
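The Go and Wait methods described above can be approximated with a thin wrapper around sync.WaitGroup. This is a sketch under that assumption; the type group and the helper countCalls are hypothetical names, and the package's WaitGroup may differ in detail.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// group is a hypothetical wrapper around sync.WaitGroup, written only to
// illustrate the Go/Wait pattern.
type group struct {
	wg sync.WaitGroup
}

// Go increments the counter, then runs f in a new goroutine that
// decrements the counter when f returns.
func (g *group) Go(f func()) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		f()
	}()
}

// Wait blocks until every callback started with Go has returned.
func (g *group) Wait() { g.wg.Wait() }

// countCalls starts n callbacks and returns how many of them ran.
func countCalls(n int) int32 {
	var calls int32
	var g group
	for i := 0; i < n; i++ {
		g.Go(func() { atomic.AddInt32(&calls, 1) })
	}
	g.Wait()
	return calls
}

func main() {
	fmt.Println(countCalls(5)) // prints 5
}
```

Pairing Add and Done inside a single method removes the usual risk of forgetting one of the two calls at the call site.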
