parallel

package module
v0.2.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 22, 2022 License: Apache-2.0 Imports: 9 Imported by: 12

README

Go library for structured concurrency

go.dev reference

Structured concurrency helps reasoning about the behaviour of parallel programs. parallel implements structured concurrency for Go.

func subtask(ctx context.Context) error {
    // to be run in parallel
}

type subtaskWithData struct { /* ... * / }

func (swd *subtaskWithData) Run(ctx context.Context) error {
    // to be run in parallel
}

err := parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error {
    swd := &subtaskWithData{}

    // do some synchronous initialization here

    spawn("subtask", parallel.Fail, subtask)
    spawn("subtaskWithData", parallel.Fail, swd.Run)
    return nil
})

Runs initializaiton within parallel.Run(), and then waits until context is canceled, or one of spawned tasks finishes. Panics in goroutines are captured.

See the documentation for additional features:

  • subprocess groups without inversion of control
  • tasks that may exit and keep the group running
  • tasks that may exit and cause the group to stop gracefully

Copyright Tectonic Networks, Inc.

Licensed under Apache 2.0 license.

Authors:

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Run

func Run(ctx context.Context, start func(ctx context.Context, spawn SpawnFn) error) error

Run runs a task with several subtasks.

The start function is the start-up sequence of the task. It receives a spawn function that can be used to launch subtasks. If the outer context closes, or any of the subtasks returns an error or panics, the inner context passed to start and to every task will also close, signalling all remaining goroutines to terminate.

The start function should perform all necessary initialization and return. When it returns, it doesn't by itself cause the task context to close, and subtasks to exit.

Run waits until the start function and all subtasks exit.

If start returns an error, it becomes the return value of Run. Otherwise, Run returns the error or panic value from the first failed subtask.

The subtasks can in turn be implemented using parallel.Run and have subtasks of their own.

Example:

err := parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error {
    s1, err := service1.New(...)
    if err != nil {
        return err
    }

    s2, err := service2.New(...)
    if err != nil {
        return err
    }

    if err := s1.HeavyInit(ctx); err != nil {
        return err
    }

    spawn("service1", parallel.Fail, s1.Run)
    spawn("service2", parallel.Fail, s2.Run)
    return nil
})

Types

type Group

type Group struct {
	// contains filtered or unexported fields
}

Group is a facility for running a task with several subtasks without inversion of control. For most ordinary use cases, use Run instead.

return Run(ctx, start)

...is equivalent to:

g := NewGroup(ctx)
if err := start(g.Context(), g.Spawn); err != nil {
    g.Exit(err)
}
return g.Wait()

Group is mostly useful in test suites where starting and finishing the group is controlled by test setup and teardown functions.

func NewGroup

func NewGroup(ctx context.Context) *Group

NewGroup creates a new Group controlled by the given context

func NewSubgroup

func NewSubgroup(spawn SpawnFn, name string, onExit OnExit, fields ...zapcore.Field) *Group

NewSubgroup creates a new Group nested within another. The spawn argument is the spawn function of the parent group.

The subgroup's context is inherited from the parent group. The entire subgroup is treated as a task in the parent group.

Example within parallel.Run:

err := parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error {
    spawn(...)
    spawn(...)
    subgroup := parallel.NewSubgroup(spawn, "updater")
    subgroup.Spawn(...)
    subgroup.Spawn(...)
    return nil
})

Example within an explicit group:

group := parallel.NewGroup(ctx)
group.Spawn(...)
group.Spawn(...)
subgroup := parallel.NewSubgroup(group.Spawn, "updater")
subgroup.Spawn(...)
subgroup.Spawn(...)

func (*Group) Complete

func (g *Group) Complete(ctx context.Context) error

Complete first waits for either the given context to close or the group to exit on its own, then for the group's remaining subtasks to finish.

Returns the group result. If the group result is nil, returns the error from the given context so as to not confuse parallel.Fail if the group is empty.

This is a convenience method useful when attaching a subgroup:

spawn("subgroup", parallel.Fail, subgroup.Complete)

...or:

group.Spawn("subgroup", parallel.Fail, subgroup.Complete)

func (*Group) Context

func (g *Group) Context() context.Context

Context returns the inner context of the group which controls the lifespan of its subtasks

func (*Group) Done

func (g *Group) Done() <-chan struct{}

Done returns a channel that closes when the last running subtask finishes. If no subtasks are running, the returned channel is already closed.

func (*Group) Exit

func (g *Group) Exit(err error)

Exit prompts the group to shut down, if it's not already shutting down or finished. This causes the inner context to close, which should prompt any running subtasks to exit. Use Wait to block until all the subtasks actually finish.

If the group result is not yet set, Exit sets it to err.

func (*Group) Running

func (g *Group) Running() int

Running returns the number of running subtasks

func (*Group) Spawn

func (g *Group) Spawn(name string, onExit OnExit, task Task)

Spawn spawns a subtask. See documentation for SpawnFn.

When a subtask finishes, it sets the result of the group if it's not already set (unless the task returns nil and its OnExit mode is Continue).

func (*Group) Wait

func (g *Group) Wait() error

Wait blocks until no subtasks are running, then returns the group result.

The group result is set by finishing subtasks (see the documentation for OnExit modes) as well as by Exit calls.

type OnExit

type OnExit int

OnExit is an enumeration of exit handling modes. It specifies what should happen to the parent task if the subtask returns nil.

Regardless of the chosen mode, if the subtask returns an error, it causes the parent task to shut down gracefully and return that error.

const (
	// Continue means other subtasks of the parent task should continue to run.
	// Note that the parent task will return nil if its last remaining subtask
	// returns nil, even if Continue is specified.
	//
	// Use this mode for finite jobs that need to run once.
	Continue OnExit = iota

	// Exit means shut down the parent task gracefully.
	//
	// Use this mode for tasks that should be able to initiate graceful
	// shutdown, such as an HTTP server with a /quit endpoint that needs to
	// cause the process to exit.
	//
	// If any of other subtasks return an error, and it is not a (possibly
	// wrapped) context.Canceled, then the parent task will return the error.
	// Only first error from subtasks will be returned, the rest will be
	// discarded.
	//
	// If all other subtasks return nil or context.Canceled, the parent task
	// returns nil.
	Exit

	// Fail means shut down the parent task gracefully and return an error.
	//
	// Use this mode for subtasks that should never return unless their context
	// is closed.
	Fail
)

func (OnExit) String

func (onExit OnExit) String() string

type PanicError

type PanicError struct {
	Value interface{}
	Stack []byte
}

PanicError is the error type that occurs when a subtask panics

func (PanicError) Error

func (err PanicError) Error() string

func (PanicError) Unwrap

func (err PanicError) Unwrap() error

Unwrap returns the error passed to panic, or nil if panic was called with something other than an error

type SpawnFn

type SpawnFn func(name string, onExit OnExit, task Task)

SpawnFn is a function that starts a subtask in a goroutine.

The task name is only for error messages. It is recommended that name is chosen uniquely within the parent task, but it's not enforced.

The onExit mode specifies what happens if the subtask exits, see documentation for OnExit.

type Task

type Task func(ctx context.Context) error

A Task is the main function of a service or component, or any other task.

This is a core concept of this package. The simple signature of a task makes it possible to seamlessly combine code using this package with code that isn't aware of it.

When ctx is closed, the function should finish as soon as possible and return ctx.Err().

A task can also finish for any other reason, returning an error if there was a problem, or nil if it was a finite job that has completed successfully.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL