concurrent

package module
v0.0.0-...-344a8b9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 9, 2021 License: BSD-3-Clause Imports: 3 Imported by: 0

README

go-concurrent

This Go package makes it easy to process a list of things concurrently, using a finite number of goroutines, using a simple closure.

Three broad patterns are supported and described below. For each, the package user can easily cap the maximum concurrency width, that is clipped to the maximum number of CPUs on the system in all cases.

This package was created because the author kept finding the need to implement these patterns over and over.

Import

import "code.jpap.org/go-concurrent"

Grouped execution pattern

The idea here is to take n jobs, split them into groups ("batches" or "chunks"). Each invocation i of the closure is given an index range [m_i, n_i) that specifies a non-overlapping group. The union of all invocations covers the index range [0, n).

If one of the invocations returns an error, the first error received is returned, but all invocations are executed before returning.

  num := int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
  var sum uint32
  concurrent.RunGrouped(len(num), 0, func(m, n int) {
    localSum := 0
    for j := m; j < n; j++ {
      localSum += num[j]
    }
    atomic.AddUint32(&sum, localSum)
  })

Sweeped execution pattern

The idea here is similar to the Grouped pattern, except that the closure is invoked once per job, and if any invocation returns an error, no more invocations are scheduled. This allows errors to "short circuit" execution and return earlier than Grouped equivalent.

Like the Grouped pattern, the concurrency width is limited, to reduce goroutine overheads.

  // A trivial example, to contrast to the Grouped pattern example above.
  // You would almost surely not implement a sum in this manner. ;-)
  num := int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
  var sum uint32
  concurrent.RunSweep(len(num), 0, func(j int) {
    atomic.AddUint32(&sum, num[j])
  })

Runner pattern

A Runner is also provided, that allows jobs to be scheduled without having to know how many jobs are required up-front. The implementation of the Sweeped pattern uses this as its implementation.

License

MIT, see the LICENSE.md file.

Documentation

Overview

Package concurrent makes it easy to process a list of things concurrently using a simple closure.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func RunGrouped

func RunGrouped(count, maxThreads int, f func(m, n int))

RunGrouped is like RunGroupedErr but without errors.

func RunGroupedErr

func RunGroupedErr(count, maxThreads int, f func(m, n int) error) error

RunGroupedErr the given func f concurrently using up to the specified number of maxThreads, or equal to the number of CPUs on the system if passed zero. The number of jobs is given by count, and the range of jobs [m, n) are passed to the callback f.

func RunSweep

func RunSweep(count, maxThreads int, f func(index int))

RunSweep is like RunSweepErr, but without errors.

func RunSweepErr

func RunSweepErr(count, maxThreads int, f func(index int) error) error

RunSweepErr will use at most maxThreads (or equal to the number of CPUs on the system if zero), to run func f concurrently, returning the first error received. If an error is reported, some func f may not be executed.

Types

type Runner

type Runner struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Runner runs jobs with a specified maximum limit on concurrency.

func NewRunner

func NewRunner(n int) *Runner

NewRunner returns a new Runner that executes jobs concurrently with at most n jobs running at any one time.

func (*Runner) Errors

func (jr *Runner) Errors() []error

Errors returns all of the errors reported by jobs. The order is given by job completion, not submission.

func (*Runner) Failures

func (jr *Runner) Failures() int

Failures returns the number of errors reported by jobs so far. This is useful for callers to fail-fast and stop submitting jobs if an error has been reported.

func (*Runner) Finish

func (jr *Runner) Finish() int

Finish waits for all executing jobs to complete, and returns the number of successful jobs completed.

func (*Runner) Run

func (jr *Runner) Run(job func())

Run is like RunErr, but the job func does not need to return an error.

func (*Runner) RunErr

func (jr *Runner) RunErr(job func() error)

RunErr runs a job that reports an error if it fails.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL