Documentation ¶
Overview ¶
Package parallel provides a runner to run tasks with limited concurrency. Using this package, it should be straightforward to replace any loop with similar code that provides concurrency.
Example (Loop) ¶
package main import ( "fmt" "sync/atomic" "gitlab.com/stone.code/parallel" ) func main() { // Example of a simple loop. sum1 := uint32(0) for i := 0; i < 10; i++ { // The use of atomic is unnecessary here, but is used to keep the body // the same as the concurrent loop below. atomic.AddUint32(&sum1, uint32(i)) } // Modified loop where iterations can run concurrently. sum2 := uint32(0) r := parallel.NewRunner(nil) for i := 0; i < 10; i++ { i := i // Need a copy of the current value for the closure. r.Go(func() { atomic.AddUint32(&sum2, uint32(i)) }) } r.Wait() fmt.Println(sum1, sum2) }
Output: 45 45
Example (LoopWithBreak) ¶
package main import ( "context" "fmt" "sync/atomic" "gitlab.com/stone.code/parallel" ) func main() { // Example of a simple loop, which uses a break. sum1 := uint32(0) for i := 0; i < 10; i++ { if i == 5 { break } atomic.AddUint32(&sum1, uint32(i)) } // Modified loop where iterations can run concurrently. sum2 := uint32(0) ctx, cancel := context.WithCancel(context.Background()) defer cancel() r := parallel.NewRunner(ctx) for i := 0; i < 10; i++ { i := i r.Go(func() { if i >= 5 { // Note: that using i == 5 to cancel would lead to non- // determinism. A separate goroutine may add 6..10 before // the call to cancel. // Note: this works is practice, but still contains non- // determinism. There is no guarantee that numbers smaller // than 5 will be executed first. cancel() return } atomic.AddUint32(&sum2, uint32(i)) }) } r.Wait() fmt.Println(sum1, sum2) }
Output: 10 10
Example (LoopWithError) ¶
package main import ( "context" "fmt" "sync/atomic" "gitlab.com/stone.code/parallel" ) func main() { // Example of a simple loop, which uses a break. sum1, err1 := func() (uint32, error) { sum := uint32(0) for i := 0; i < 10; i++ { if i == 5 { return 0, fmt.Errorf("dummy error") } atomic.AddUint32(&sum, uint32(i)) } return sum, nil }() // Modified loop where iterations can run concurrently. // Note, unlike above loop, it is possible that loop body will run for i>5. sum2, err2 := func() (uint32, error) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() sum := uint32(0) r := parallel.NewRunner(ctx) errs := make(chan error, r.MaxConcurrency()) for i := 0; i < 10; i++ { i := i r.Go(func() { if i == 5 { errs <- fmt.Errorf("dummy error") cancel() return } atomic.AddUint32(&sum, uint32(i)) }) } r.Wait() err, ok := <-errs if ok { return 0, err } return sum, nil }() fmt.Println("A:", sum1, err1) fmt.Println("B:", sum2, err2) }
Output: A: 0 dummy error B: 0 dummy error
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CollectErrors ¶
CollectErrors creates a pipeline to collect errors from multiple goroutines into a slice. The caller is responsible for calling close on the input channel.
The output channel will be blocked until all of the errors have been collected.
Example ¶
package main import ( "context" "fmt" "sync/atomic" "time" "gitlab.com/stone.code/parallel" ) func main() { // Example of a simple loop, which collects errors. sum1, err1 := func() (uint32, []error) { sum := uint32(0) errs := []error(nil) for i := 0; i < 10; i++ { if i == 5 || i == 7 { errs = append(errs, fmt.Errorf("dummy (%d) error", i)) continue } atomic.AddUint32(&sum, uint32(i)) } return sum, errs }() // Modified loop where iterations can run concurrently. sum2, err2 := func() (uint32, []error) { sum := uint32(0) r := parallel.NewRunner(context.Background()) errout, errin := parallel.CollectErrors(r) for i := 0; i < 10; i++ { i := i r.Go(func() { // Note that with enough concurrency, calls with i==5 and calls // with i==7 may execute out of order. To prevent intermittent // failures of the test, we deliberately delay i==7. if i == 7 { time.Sleep(time.Millisecond) } if i == 5 || i == 7 { errin <- fmt.Errorf("dummy (%d) error", i) return } atomic.AddUint32(&sum, uint32(i)) }) } r.Wait() close(errin) return sum, <-errout }() fmt.Println("A:", sum1, err1) fmt.Println("B:", sum2, err2) }
Output: A: 33 [dummy (5) error dummy (7) error] B: 33 [dummy (5) error dummy (7) error]
Types ¶
type Runner ¶
type Runner struct {
// contains filtered or unexported fields
}
A Runner waits for a collection of callbacks to finish, while running the callbacks in a limited number of goroutines.
A Runner must not be copied after first use.
func NewRunner ¶
NewRunner returns a new runner based on the provided context. The maximum concurrency will be limited to a default value.
See the documentation for NewRunnerWithMax for more details.
func NewRunnerWithMax ¶
NewRunnerWithMax returns a new runner based on the provided context. The maximum concurrency will be limited to max. As a precondition, the maximum concurrency must be greater than zero. If the maximum concurrency is one, then all functions passed to Do will run sequentially.
Example ¶
package main import ( "bytes" "context" "fmt" "gitlab.com/stone.code/parallel" ) func main() { buffer := bytes.NewBuffer(nil) // Note that we set the maximum concurrency to one. This is required to // get deterministic behaviour for the test. If set to a higher number, // then the order of two output lines may be reversed. r := parallel.NewRunnerWithMax(context.Background(), 1) r.Go(func() { buffer.WriteString("one\n") }) r.Go(func() { buffer.WriteString("two\n") }) r.Wait() fmt.Println(buffer.String()) }
Output: one two
func (*Runner) Context ¶
Context returns the runners's context. The returned context is always non-nil; it defaults to the background context.
func (*Runner) Go ¶
func (r *Runner) Go(f func())
Go adds the function f to the queue of actions to be completed. Depending on the runner's maximum concurrency, tasks may be run in parallel, and may execute out of order. Otherwise, tasks will run in the order that they are provided. If there is insufficiency concurrency to start the task, the call to Do will block.
If the provided context is either cancelled, either explicitly or because it exceeds its deadline, before execution of the function f begins, then the function f will not be called.
This method cannot be called after Wait.
func (*Runner) MaxConcurrency ¶
MaxConcurrency returns the concurrency, which is the maximum number of concurrently running routines.
type WaitGroup ¶
type WaitGroup struct {
// contains filtered or unexported fields
}
A WaitGroup waits for a collection of callbacks to finish. Each callback will be executed in a new goroutine.
A WaitGroup must not be copied after first use.
Example ¶
package main import ( "net/http" "gitlab.com/stone.code/parallel" ) func main() { var wg parallel.WaitGroup var urls = []string{ "http://www.golang.org/", "http://www.google.com/", "http://www.somestupidname.com/", } for _, url := range urls { url := url wg.Go(func() { // Error ignored in example _, _ = http.Get(url) }) } // Wait for all HTTP fetches to complete. wg.Wait() }
Output: