parallel

package
v2.2.2
Warning

This package is not in the latest version of its module.

Published: Aug 13, 2023 | License: MIT | Imports: 5 | Imported by: 0

Documentation

Overview

Package parallel contains generic, type-safe functions for managing several kinds of concurrent logic.

Functions

func Consumers

func Consumers[F ~func(context.Context, int, T) error, T any](ctx context.Context, n int, f F) (func(T) error, func() error)

Consumers launches n parallel workers, each consuming values supplied by the caller.

When a value is available, an available worker calls the function f to consume it. This callback receives the worker's number (in the range 0 through n-1) and the value.

The caller receives two callbacks: one for sending a value to the workers via an internal channel, and one for closing that channel, signaling the end of input and causing the workers to exit normally.

The value-sending callback may block until a worker is available to consume the value.

An error from any worker cancels them all. This error is returned from the close-channel callback. After any error, the value-sending callback also returns an error, but not the original one. To retrieve the original error, the caller should still invoke the close callback.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bobg/go-generics/v2/parallel"
)

func main() {
	ctx := context.Background()

	// One of three goroutines prints incoming values.
	send, closefn := parallel.Consumers(ctx, 3, func(_ context.Context, _, val int) error {
		fmt.Println(val)
		return nil
	})

	// Caller produces values.
	for i := 1; i <= 5; i++ {
		err := send(i)
		if err != nil {
			panic(err)
		}
	}
	if err := closefn(); err != nil {
		panic(err)
	}
}
Output (order may vary):

1
2
3
4
5

func Pool

func Pool[F ~func(T) (U, error), T, U any](n int, f F) func(T) (U, error)

Pool permits up to n concurrent calls to a function f. The caller receives a callback for requesting a worker from this pool. When no worker is available, the callback blocks until one becomes available. Then it invokes f and returns the result.

Each call of the callback is synchronous. Any desired concurrency is the responsibility of the caller.

Example
package main

import (
	"fmt"
	"sync"

	"github.com/bobg/go-generics/v2/parallel"
)

func main() {
	// Three workers available, each negating its input.
	pool := parallel.Pool(3, func(n int) (int, error) {
		return -n, nil
	})

	var wg sync.WaitGroup

	// Ten goroutines requesting work from those three workers.
	for i := 1; i <= 10; i++ {
		i := i // copy the loop variable for the closure (needed before Go 1.22)
		wg.Add(1)
		go func() {
			neg, err := pool(i)
			if err != nil {
				panic(err)
			}
			fmt.Println(neg)
			wg.Done()
		}()
	}

	wg.Wait()

}
Output (order may vary):

-1
-2
-3
-4
-5
-6
-7
-8
-9
-10
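
The blocking behavior described for Pool resembles a counting semaphore. The following sketch, using only the standard library, shows one way such a limit can be enforced with a buffered channel. It is an assumption made for illustration, not the package's implementation, and the names pool and maxConcurrency are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// pool is a simplified, assumed stand-in: a buffered channel of capacity n
// acts as a counting semaphore around calls to f.
func pool[T, U any](n int, f func(T) (U, error)) func(T) (U, error) {
	sem := make(chan struct{}, n)
	return func(val T) (U, error) {
		sem <- struct{}{}        // blocks while n calls are already in flight
		defer func() { <-sem }() // release the slot when f returns
		return f(val)
	}
}

// maxConcurrency pushes the given number of calls through a pool of size n
// and reports the highest number of simultaneous calls to f that it observed.
func maxConcurrency(n, calls int) int64 {
	var inFlight, peak atomic.Int64

	p := pool(n, func(x int) (int, error) {
		cur := inFlight.Add(1)
		defer inFlight.Add(-1)
		for {
			old := peak.Load()
			if cur <= old || peak.CompareAndSwap(old, cur) {
				break
			}
		}
		return -x, nil
	})

	var wg sync.WaitGroup
	for i := 0; i < calls; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			p(i) // synchronous from this goroutine's point of view
		}(i)
	}
	wg.Wait()
	return peak.Load()
}

func main() {
	// The observed peak never exceeds the pool size of 3.
	fmt.Println(maxConcurrency(3, 20) <= 3)
}
```

As in the documented function, each call of the returned callback is synchronous. The concurrency comes from the goroutines started by the caller.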

func Producers

func Producers[F ~func(context.Context, int, func(T) error) error, T any](ctx context.Context, n int, f F) iter.Of[T]

Producers launches n parallel workers, each running the function f.

Each worker receives its worker number (in the range 0 through n-1) and a callback to use for producing a value. If the callback returns an error, the worker should exit with that error.

The callback that the worker uses to produce a value may block until the caller is able to consume the value.

An error from any worker cancels them all.

The caller gets an iterator over the values produced.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bobg/go-generics/v2/parallel"
)

func main() {
	ctx := context.Background()

	// Five goroutines each produce their worker number and then exit.
	it := parallel.Producers(ctx, 5, func(_ context.Context, n int, send func(int) error) error {
		return send(n)
	})

	// Caller consumes the produced values.
	for it.Next() {
		fmt.Println(it.Val())
	}
	if err := it.Err(); err != nil {
		panic(err)
	}
}
Output (order may vary):

0
1
2
3
4

func Protect

func Protect[T any](val T) (reader func() T, writer func(T), closer func())

Protect offers safe concurrent access to a protected value. It is a "share memory by communicating" alternative to protecting the value with sync.RWMutex.

The caller gets back three functions: a reader for getting the protected value, a writer for updating it, and a closer for releasing resources when no further reads or writes are needed.

Any number of calls to the reader may run concurrently. If T is a "reference type" (see below) then the caller should not make any changes to the value it receives from the reader.

A call to the writer prevents other reader and writer calls from running until it is done. It waits for pending calls to finish before it executes. After a call to the writer, future reader calls will receive the updated value.

The closer should be called to release resources when no more reader or writer calls are needed. Calling any of the functions (reader, writer, or closer) after a call to the closer may cause a panic.

The term "reference type" here means a type (such as pointer, slice, map, channel, function, and interface) that allows a caller C to make changes that will be visible to other callers outside of C's scope. In other words, if the type is int and caller A does this:

val := reader()
val++

it will not affect the value that caller B sees when it does its own call to reader(). But if the type is *int and caller A does this:

val := reader()
*val++

then the change in the pointed-to value _will_ be seen by caller B.

For more on the fuzzy concept of "reference types" in Go, see https://github.com/go101/go101/wiki/About-the-terminology-%22reference-type%22-in-Go

Example
package main

import (
	"fmt"
	"sync"

	"github.com/bobg/go-generics/v2/parallel"
)

func main() {
	// A caller is supplied with a reader and a writer
	// for purposes of accessing and updating the protected value safely
	// (in this case an int, initially 4).
	reader, writer, closer := parallel.Protect(4)
	defer closer()

	// Call the reader in three concurrent goroutines, each printing the protected value.
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			fmt.Println(reader())
			wg.Done()
		}()
	}
	wg.Wait()

	// Increment the protected value.
	writer(reader() + 1)

	// Call the reader in three concurrent goroutines, each printing the protected value.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			fmt.Println(reader())
			wg.Done()
		}()
	}
	wg.Wait()

}
Output:

4
4
4
5
5
5
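
The phrase "share memory by communicating" can be illustrated with a goroutine that owns the value and serves it over channels. The sketch below uses only the standard library and is an assumption for illustration, not the package's implementation. Unlike the documented Protect, it serializes reads through the owning goroutine, and calls made after the closer block instead of panicking. The names protect and demo are hypothetical.

```go
package main

import "fmt"

// protect is a simplified, assumed stand-in: one goroutine owns val,
// and all access goes through channels instead of a mutex.
func protect[T any](val T) (reader func() T, writer func(T), closer func()) {
	reads := make(chan T)
	writes := make(chan T)
	done := make(chan struct{})

	go func() {
		for {
			select {
			case reads <- val: // hand the current value to a reader
			case v := <-writes: // accept an update from a writer
				val = v
			case <-done:
				return
			}
		}
	}()

	reader = func() T { return <-reads }
	writer = func(v T) { writes <- v }
	closer = func() { close(done) }
	return reader, writer, closer
}

// demo reads the protected value, increments it, and reads it again.
func demo() (before, after int) {
	reader, writer, closer := protect(4)
	defer closer()

	before = reader()
	writer(before + 1)
	after = reader()
	return before, after
}

func main() {
	before, after := demo()
	fmt.Println(before, after)
}
```

Because the owning goroutine evaluates the value afresh on every pass through its loop, a read that starts after a write has returned always sees the updated value.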

func Values

func Values[F ~func(context.Context, int) (T, error), T any](ctx context.Context, n int, f F) ([]T, error)

Values produces a slice of n values using n parallel workers, each running the function f.

Each worker receives its worker number (in the range 0 through n-1).

An error from any worker cancels them all. The first error is returned to the caller.

The resulting slice has length n. The value at position i comes from worker i.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bobg/go-generics/v2/parallel"
)

func main() {
	ctx := context.Background()

	// Five goroutines, each placing its worker number in the corresponding slot of the result slice.
	values, err := parallel.Values(ctx, 5, func(_ context.Context, n int) (int, error) {
		return n, nil
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(values)
}
Output:

[0 1 2 3 4]
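
The pattern that Values describes (one worker per slot, with the first error canceling the rest) can be sketched with the standard library alone. This is an assumed illustration, not the package's implementation. The names values and squares are hypothetical, and returning a nil slice on error is a choice made for this sketch.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// values is a simplified, assumed stand-in: worker i writes only to slot i,
// so the workers never contend for the same element of the result slice.
func values[T any](ctx context.Context, n int, f func(context.Context, int) (T, error)) ([]T, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	result := make([]T, n)

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			val, err := f(ctx, i)
			if err != nil {
				once.Do(func() {
					firstErr = err // keep only the first error
					cancel()       // signal the other workers to stop
				})
				return
			}
			result[i] = val
		}(i)
	}
	wg.Wait()

	if firstErr != nil {
		return nil, firstErr
	}
	return result, nil
}

// squares computes i*i for each worker number i in the range 0 through n-1.
func squares(n int) ([]int, error) {
	return values(context.Background(), n, func(_ context.Context, i int) (int, error) {
		return i * i, nil
	})
}

func main() {
	got, err := squares(5)
	if err != nil {
		panic(err)
	}
	fmt.Println(got)
}
```

Cancellation here is cooperative: a worker stops early only if its function checks the context it receives.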

Types

type Error

type Error struct {
	N   int
	Err error
}

Error is an error type for wrapping errors returned from worker goroutines. It contains the worker number of the goroutine that produced the error.

func (Error) Error

func (e Error) Error() string

func (Error) Unwrap

func (e Error) Unwrap() error
