traverse

package
v0.0.11
Warning

This package is not in the latest version of its module.

Published: Mar 7, 2024 License: Apache-2.0 Imports: 10 Imported by: 18

Documentation

Overview

Package traverse provides primitives for concurrent and parallel traversal of slices or user-defined collections.

Example
package main

import (
	"math/rand"

	"github.com/grailbio/base/traverse"
)

func main() {
	// Compute N random numbers in parallel.
	const N = 1e5
	out := make([]float64, N)
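	_ = traverse.Parallel.Range(len(out), func(start, end int) error {
		// Each range gets its own *rand.Rand (which is not safe for
		// concurrent use), seeded from the package-level source.
		r := rand.New(rand.NewSource(rand.Int63()))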
		for i := start; i < end; i++ {
			out[i] = r.Float64()
		}
		return nil
	})
}

Constants

This section is empty.

Variables

var Parallel = T{Limit: 2 * runtime.GOMAXPROCS(0)}

Parallel is the default traverser for parallel traversal, intended for CPU-intensive parallel computation. Parallel limits the number of concurrent invocations to twice the number of processors available to the runtime (2 * runtime.GOMAXPROCS(0)).

Functions

func CPU

func CPU(fn func() error) error

CPU calls fn once for each available system CPU. CPU returns when all calls have completed, or on the first error.

func Each

func Each(n int, fn func(i int) error) error

Each performs concurrent traversal over n elements. It is shorthand for (T{}).Each, that is, a traversal with no concurrency limit.

Types

type Reporter

type Reporter interface {
	// Init is called when processing is about to begin. Parameter
	// n indicates the number of tasks to be executed by the traversal.
	Init(n int)
	// Complete is called after the traversal has completed.
	Complete()

	// Begin is called when task i is begun.
	Begin(i int)
	// End is called when task i has completed.
	End(i int)
}

A Reporter receives events from an ongoing traversal. A Reporter is set through the Reporter field of T and is used to monitor the progress of long-running traversals.
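As an illustration of the interface, the sketch below implements a hypothetical countingReporter that tracks queued, running, and done tasks. The interface is copied from above so the example compiles on its own; the sequential simulate driver stands in for a real traversal, and the mutex is there on the assumption that Begin and End may be called from concurrent invocations.

```go
package main

import (
	"fmt"
	"sync"
)

// Reporter reproduces the documented interface so this sketch compiles
// without importing the traverse package.
type Reporter interface {
	Init(n int)
	Complete()
	Begin(i int)
	End(i int)
}

// countingReporter is a hypothetical Reporter that counts tasks by state.
type countingReporter struct {
	mu                    sync.Mutex
	queued, running, done int
	completed             bool
}

func (r *countingReporter) Init(n int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.queued = n
}

func (r *countingReporter) Begin(i int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.queued--
	r.running++
}

func (r *countingReporter) End(i int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.running--
	r.done++
}

func (r *countingReporter) Complete() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.completed = true
}

// simulate calls the hooks in the order the interface comments describe:
// Init once, Begin/End around each task, then Complete.
func simulate(r Reporter, n int) {
	r.Init(n)
	for i := 0; i < n; i++ {
		r.Begin(i)
		r.End(i)
	}
	r.Complete()
}

func main() {
	r := &countingReporter{}
	simulate(r, 3)
	fmt.Println(r.queued, r.running, r.done, r.completed) // 0 0 3 true
}
```

With the real package, such a reporter would be attached through the struct field, for example traverse.T{Limit: 8, Reporter: r}.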

func NewSimpleReporter

func NewSimpleReporter(name string) Reporter

NewSimpleReporter returns a new reporter that prints the number of queued, running, and completed tasks to stderr.

func NewTimeEstimateReporter

func NewTimeEstimateReporter(name string) Reporter

NewTimeEstimateReporter returns a reporter that reports the number of jobs queued, running, and done, as well as the elapsed running time of the traversal and an estimate of the time remaining. Note: the estimate assumes that jobs have roughly equal running times and are processed in roughly FIFO order; it does not try to account for the bias introduced when shorter jobs finish first and skew the average estimated job running time.

type T

type T struct {
	// Limit is the traverser's concurrency limit: there will be no more
	// than Limit concurrent invocations per traversal. A limit value of
	// zero (the default value) denotes no limit.
	Limit int
	// Sequential indicates that early indexes should be handled before later
	// ones.  E.g. if there are 40000 tasks and Limit == 40, the initial
	// assignment is usually
	//   worker 0 <- tasks 0-999
	//   worker 1 <- tasks 1000-1999
	//   ...
	//   worker 39 <- tasks 39000-39999
	// but when Sequential == true, only tasks 0-39 are initially assigned, then
	// task 40 goes to the first worker to finish, etc.
	// Note that this increases synchronization overhead.  It should not be used
	// with e.g. > 1 billion tiny tasks; in that scenario, the caller should
	// organize such tasks into e.g. 10000-task chunks and perform a
	// sequential-traverse on the chunks.
	// This scheduling algorithm does perform well when tasks are sorted in order
	// of decreasing size.
	Sequential bool
	// Reporter receives status reports for each traversal. It is
	// intended for users who wish to monitor the progress of large
	// traversal jobs.
	Reporter Reporter
}

A T is a traverser: it provides facilities for concurrently invoking functions that traverse collections of data.

func Limit

func Limit(n int) T

Limit returns a traverser with limit n.

func LimitSequential added in v0.0.11

func LimitSequential(n int) T

LimitSequential returns a sequential traverser with limit n.

func (T) Each

func (t T) Each(n int, fn func(i int) error) error

Each performs a traversal on fn. Specifically, Each invokes fn(i) for 0 <= i < n, managing concurrency and error propagation. Each returns when all invocations have completed, or after the first invocation fails, in which case the first invocation error is returned. Each also propagates panics from underlying invocations to the caller. Note that if an invocation panics without releasing shared resources that fn might need in a child traversal, the traversal could deadlock.

func (T) Range

func (t T) Range(n int, fn func(start, end int) error) error

Range performs ranged traversal on fn: n is split into contiguous ranges, and fn is invoked once for each range. The range sizes are determined by the traverser's concurrency limit. Range lets the caller amortize function-call overhead and is typically used when the limit is small and n is large, for example in parallel traversal over large collections where each item's processing time is comparatively small.
