parallel

package module
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 13, 2023 License: Apache-2.0 Imports: 4 Imported by: 0

README

go-parallel

parallelism in Go using generics

Parallel functions

  • MapBatches
  • QueueWorkers
  • ArrayWorkers1
  • BatchedChannel

Concurrency helpers

  • Concurrent: run n functions concurrently
  • TrySend
  • TryRecv

Error handling

This library relies on go-recovery to trap panics that occur in go routines. go-recovery by default will log panics but can be configured to send them to an error monitoring service.

For maximum flexibility with error handling, many of the parallel functions return an error channel. Any errors that occur in work functions will be put into the error channel. There are helpers for common patterns for dealing with the errors:

  • CollectErrors
  • CancelAfterFirstError

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ArrayWorkers1

func ArrayWorkers1[T any](nParallel int, objects []T, cancel <-chan struct{}, fn func(int, T) error) <-chan error

Just operate on one object at a time Processing continues until completion or a value is read from the cancel channel Returns a channel of errors Uses QueueWorkers under the hood

func BatchWorkers added in v0.3.0

func BatchWorkers[T any](bw BatchWork, objects []T, worker func([]T) error) error

BatchWorkers combines BatchedChannel, QueueWorkers, and CancelAfterFirstError The given objects are batched up and worked in parallel

func BatchedChannel

func BatchedChannel[T any](bw BatchWork, objects []T) <-chan []T

BatchedChannel sends slices of Batchwork.Size objects to the resulting channel

func CancelAfterFirstError

func CancelAfterFirstError(cancel chan struct{}, errors <-chan error) error

For functions that take a cancel channel and return an error channel. This helper will cancel on the first error. Waits for all processing to complete. Returns a multierror of any resulting errors.

This does not immediately stop existing processing (which requires panicing). This cancels future work distribution.

func CollectErrors

func CollectErrors(errors <-chan error) error

Combine all errors from a channel of errors into a single multierr error.

func Concurrent

func Concurrent(n int, fn func(int) error) error

n is the number of go routines to spawn errors are returned as a combined multierr panics in the given function are recovered and converted to an error

func MapBatches

func MapBatches[T any, U any](nParallel int, objects []T, fn func(T) (U, error)) ([]U, error)

func QueueWorkers

func QueueWorkers[T any](n int, queue <-chan T, fn func(T) error) <-chan error

spawn N parallel workers that apply fn to T. Any panics in the given fn are recovered and returned as errors. Work is taken out of the given queue and given to the first available worker. Once all workers are full, reflect.Select is used to select the next worker. Close the given queue to shutdown the workers.

Workers will continue to work after encountering an error. Errors are sent to the returned error channel. When the given queue is closed and the work is processed, the returned error channel will be closed.

func TryRecv

func TryRecv[T any](c <-chan T) (receivedObject T, received bool)

try to send to a channel, return true if sent, false if not

func TrySend

func TrySend[T any](c chan<- T, obj T) bool

try to send to a channel, return true if sent, false if not

Types

type BatchWork

type BatchWork struct {
	Size        int
	Parallelism int
	Cancel      chan struct{}
}

func (*BatchWork) AdjustForSmallLength

func (bw *BatchWork) AdjustForSmallLength(total int) int

If the length is too small, decrease the batch size

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL