parallelizer

package
v0.0.0-...-d323686 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 27, 2024 License: GPL-3.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

const DefaultParallelism int = 16

DefaultParallelism is the default parallelism used in scheduler.

Variables

var ErrWaitTimeout = xerrors.New("timed out waiting for the condition")

ErrWaitTimeout is returned when the condition exited without success.

var PanicHandlers = []func(interface{}){logPanic}

var ReallyCrash = true

Functions

func BackoffUntil

func BackoffUntil(f func(), backoff BackoffManager, sliding bool, stopCh <-chan struct{})

BackoffUntil loops until the stop channel is closed, running f at every interval given by the BackoffManager.

If sliding is true, the period is computed after f runs. If it is false then period includes the runtime for f.

func ContextForChannel

func ContextForChannel(parentCh <-chan struct{}) (context.Context, context.CancelFunc)

ContextForChannel derives a child context from a parent channel.

The derived context's Done channel is closed when the returned cancel function is called or when the parent channel is closed, whichever happens first.

Note the caller must *always* call the CancelFunc, otherwise resources may be leaked.
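The behavior described above can be sketched with the standard library alone. This is a minimal illustration, not the package's actual source: contextForChannel and doneAfterParentClose are hypothetical names introduced here, and the sketch assumes the child context is derived from context.Background().

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// contextForChannel is a hypothetical re-implementation for illustration only.
// The returned context is done when cancel is called or parentCh is closed.
func contextForChannel(parentCh <-chan struct{}) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		select {
		case <-parentCh:
			cancel() // parent closed: propagate to the child context
		case <-ctx.Done():
			// cancel was called first; the goroutine exits and nothing leaks
		}
	}()
	return ctx, cancel
}

// doneAfterParentClose reports whether closing the parent channel
// closes the derived context's Done channel.
func doneAfterParentClose() bool {
	parent := make(chan struct{})
	ctx, cancel := contextForChannel(parent)
	defer cancel() // always call cancel so the helper goroutine can exit
	close(parent)
	select {
	case <-ctx.Done():
		return true
	case <-time.After(time.Second):
		return false
	}
}

func main() {
	fmt.Println(doneAfterParentClose())
}
```

In this sketch the helper goroutine is the resource that would leak if the caller never called cancel and the parent channel never closed.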

func HandleCrash

func HandleCrash(additionalHandlers ...func(interface{}))

func Jitter

func Jitter(duration time.Duration, maxFactor float64) time.Duration

Jitter returns a time.Duration between duration and duration + maxFactor * duration.

This allows clients to avoid converging on periodic behavior. If maxFactor is 0.0, a suggested default value will be chosen.
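As a rough sketch of the range described above, the following stand-alone function adds a random amount between zero and maxFactor * duration. The lowercase jitter name is hypothetical, and the fallback factor of 1.0 is an assumption: the documentation only says a suggested default is chosen.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jitter is a hypothetical stand-in for illustration. It returns a value in
// [d, d + maxFactor*d). The default of 1.0 is an assumption, not taken from
// the package documentation.
func jitter(d time.Duration, maxFactor float64) time.Duration {
	if maxFactor <= 0.0 {
		maxFactor = 1.0 // assumed default
	}
	return d + time.Duration(rand.Float64()*maxFactor*float64(d))
}

func main() {
	// Prints some duration between 1s and 1.5s.
	fmt.Println(jitter(time.Second, 0.5))
}
```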

func JitterUntil

func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{})

JitterUntil loops until stop channel is closed, running f every period.

If jitterFactor is positive, the period is jittered before every run of f. If jitterFactor is not positive, the period is unchanged and not jittered.

If sliding is true, the period is computed after f runs. If it is false then period includes the runtime for f.

Close stopCh to stop. f may not be invoked if the stop channel is already closed. Pass NeverStop if you don't want it to stop.
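The difference between sliding and non-sliding periods comes down to when the timer is started relative to f. The sketch below shows one way to express that; jitterUntil and runThreeTimes are hypothetical names, and the loop is a simplified illustration rather than the package's implementation.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jitterUntil is a hypothetical, simplified loop for illustration.
func jitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
	for {
		// Do not invoke f if the stop channel is already closed.
		select {
		case <-stopCh:
			return
		default:
		}

		wait := period
		if jitterFactor > 0.0 {
			wait += time.Duration(rand.Float64() * jitterFactor * float64(period))
		}

		var timer *time.Timer
		if !sliding {
			timer = time.NewTimer(wait) // period includes the runtime of f
		}
		f()
		if sliding {
			timer = time.NewTimer(wait) // period starts after f returns
		}

		select {
		case <-stopCh:
			timer.Stop()
			return
		case <-timer.C:
		}
	}
}

// runThreeTimes closes the stop channel from inside f on the third run
// and returns how many times f was invoked.
func runThreeTimes(sliding bool) int {
	stop := make(chan struct{})
	runs := 0
	jitterUntil(func() {
		runs++
		if runs == 3 {
			close(stop)
		}
	}, 5*time.Millisecond, 0.0, sliding, stop)
	return runs
}

func main() {
	fmt.Println(runThreeTimes(true), runThreeTimes(false))
}
```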

func JitterUntilWithContext

func JitterUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration, jitterFactor float64, sliding bool)

func ParallelizeUntil

func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc, opts ...Options)

ParallelizeUntil is a framework that allows for parallelizing N independent pieces of work until done or the context is canceled.
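One common way to build this pattern is a fixed pool of workers draining a channel of piece indices and checking the context before each piece. The sketch below assumes that design; parallelizeUntil and sumPieces are hypothetical names, and the package's options (such as chunking) are left out.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// parallelizeUntil is a hypothetical sketch: workers pull piece indices from
// a pre-filled channel until it is drained or ctx is canceled.
func parallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece func(piece int)) {
	if pieces <= 0 {
		return
	}
	if workers > pieces {
		workers = pieces
	}
	toProcess := make(chan int, pieces)
	for i := 0; i < pieces; i++ {
		toProcess <- i
	}
	close(toProcess)

	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for piece := range toProcess {
				select {
				case <-ctx.Done():
					return // stop picking up new pieces once canceled
				default:
					doWorkPiece(piece)
				}
			}
		}()
	}
	wg.Wait()
}

// sumPieces adds up the indices 0..pieces-1 in parallel.
func sumPieces(workers, pieces int) int64 {
	var sum int64
	parallelizeUntil(context.Background(), workers, pieces, func(piece int) {
		atomic.AddInt64(&sum, int64(piece))
	})
	return sum
}

func main() {
	fmt.Println(sumPieces(4, 100)) // 0+1+...+99 = 4950
}
```

Because pieces run concurrently, doWorkPiece has to synchronize any shared state itself, which is why the example uses an atomic add.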

func PollImmediateUntil

func PollImmediateUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error

PollImmediateUntil tries a condition func until it returns true, an error or stopCh is closed.

PollImmediateUntil runs the 'condition' before waiting for the interval. 'condition' will always be invoked at least once.
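The "condition first, then wait" ordering can be sketched as follows. This is a simplified stand-in with hypothetical names (pollImmediateUntil, callsUntilDone, callsWithClosedStop); returning a timeout error when the stop channel closes is an assumption made for this sketch.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errWaitTimeout = errors.New("timed out waiting for the condition")

// pollImmediateUntil is a hypothetical, simplified sketch. It assumes that
// closing stopCh before the condition succeeds yields errWaitTimeout.
func pollImmediateUntil(interval time.Duration, condition func() (bool, error), stopCh <-chan struct{}) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		done, err := condition() // runs before the first wait
		if err != nil {
			return err
		}
		if done {
			return nil
		}
		select {
		case <-stopCh:
			return errWaitTimeout
		case <-ticker.C:
		}
	}
}

// callsUntilDone polls a condition that succeeds on its n-th invocation.
func callsUntilDone(n int) (int, error) {
	calls := 0
	err := pollImmediateUntil(time.Millisecond, func() (bool, error) {
		calls++
		return calls >= n, nil
	}, make(chan struct{}))
	return calls, err
}

// callsWithClosedStop polls with an already-closed stop channel.
func callsWithClosedStop() (int, error) {
	stop := make(chan struct{})
	close(stop)
	calls := 0
	err := pollImmediateUntil(time.Hour, func() (bool, error) {
		calls++
		return false, nil
	}, stop)
	return calls, err
}

func main() {
	fmt.Println(callsUntilDone(3))
	fmt.Println(callsWithClosedStop())
}
```

The second helper shows the "at least once" guarantee: even with a closed stop channel, the condition is evaluated one time before the loop gives up.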

func PollImmediateUntilWithContext

func PollImmediateUntilWithContext(ctx context.Context, interval time.Duration, condition ConditionWithContextFunc) error

PollImmediateUntilWithContext tries a condition func until it returns true, an error or the specified context is cancelled or expired.

PollImmediateUntilWithContext runs the 'condition' before waiting for the interval. 'condition' will always be invoked at least once.

func Until

func Until(f func(), period time.Duration, stopCh <-chan struct{})

Until loops until stop channel is closed, running f every period.

Until is syntactic sugar on top of JitterUntil with zero jitter factor and with sliding = true (which means the timer for period starts after f completes).

func UntilWithContext

func UntilWithContext(ctx context.Context, f func(context.Context), period time.Duration)

UntilWithContext loops until context is done, running f every period.

UntilWithContext is syntactic sugar on top of JitterUntilWithContext with zero jitter factor and with sliding = true (which means the timer for period starts after f completes).

func WaitForWithContext

func WaitForWithContext(ctx context.Context, wait WaitWithContextFunc, fn ConditionWithContextFunc) error

WaitForWithContext continually checks 'fn' as driven by 'wait'.

WaitForWithContext gets a channel from 'wait()', and then invokes 'fn' once for every value placed on the channel and once more when the channel is closed. If the channel is closed and 'fn' returns false without error, WaitForWithContext returns ErrWaitTimeout.

If 'fn' returns an error the loop ends and that error is returned. If 'fn' returns true the loop ends and nil is returned.

context.Canceled will be returned if the ctx.Done() channel is closed without fn ever returning true.

Because Go's `select` statement chooses among ready cases in a "uniform pseudo-random" way, `fn` might still run one or more times after the ctx.Done() channel is closed, though eventually `WaitForWithContext` will return.
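The loop described above can be sketched as a single select over the wait channel and ctx.Done(). The names waitFor, ticks, and countCalls are hypothetical, and returning ctx.Err() on cancellation is an assumption based on the description rather than the package's source.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var errWaitTimeout = errors.New("timed out waiting for the condition")

// waitFor is a hypothetical sketch of the described loop.
func waitFor(ctx context.Context, wait func(context.Context) <-chan struct{}, fn func(context.Context) (bool, error)) error {
	waitCtx, cancel := context.WithCancel(context.Background())
	defer cancel() // tells the wait function to stop sending
	c := wait(waitCtx)
	for {
		select {
		case _, open := <-c:
			done, err := fn(ctx)
			if err != nil {
				return err
			}
			if done {
				return nil
			}
			if !open {
				return errWaitTimeout // channel closed, fn still false
			}
		case <-ctx.Done():
			// select picks randomly when both cases are ready, so fn may
			// already have run again after ctx was canceled.
			return ctx.Err()
		}
	}
}

// ticks returns a wait function that sends n values and then closes.
func ticks(n int) func(context.Context) <-chan struct{} {
	return func(ctx context.Context) <-chan struct{} {
		ch := make(chan struct{})
		go func() {
			defer close(ch)
			for i := 0; i < n; i++ {
				select {
				case ch <- struct{}{}:
				case <-ctx.Done():
					return
				}
			}
		}()
		return ch
	}
}

// countCalls runs waitFor with a condition that never succeeds.
func countCalls(n int) (int, error) {
	calls := 0
	err := waitFor(context.Background(), ticks(n), func(context.Context) (bool, error) {
		calls++
		return false, nil
	})
	return calls, err
}

func main() {
	fmt.Println(countCalls(2)) // fn runs for each value and once more on close
}
```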

func WithChunkSize

func WithChunkSize(c int) func(*options)

WithChunkSize hands work items to the workers in chunks rather than one at a time. It is recommended when the number of pieces is significantly higher than the number of workers and the work done for each item is small.
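To make the idea of chunking concrete, the sketch below splits a number of pieces into half-open index ranges of a given size. chunkBounds is a hypothetical helper for illustration; how the package divides work internally is not shown in this documentation.

```go
package main

import "fmt"

// chunkBounds is a hypothetical helper: it splits pieces into consecutive
// [start, end) ranges holding at most chunkSize items each.
func chunkBounds(pieces, chunkSize int) [][2]int {
	if chunkSize < 1 {
		chunkSize = 1
	}
	var out [][2]int
	for start := 0; start < pieces; start += chunkSize {
		end := start + chunkSize
		if end > pieces {
			end = pieces
		}
		out = append(out, [2]int{start, end})
	}
	return out
}

func main() {
	fmt.Println(chunkBounds(10, 4)) // [[0 4] [4 8] [8 10]]
}
```

With many small pieces, a worker that takes a whole range at once synchronizes with the others once per chunk instead of once per item, which is the motivation given above.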

Types

type Backoff

type Backoff struct {
	// The initial duration.
	Duration time.Duration
	// Duration is multiplied by factor each iteration, if factor is not zero
	// and the limits imposed by Steps and Cap have not been reached.
	// Should not be negative.
	// The jitter does not contribute to the updates to the duration parameter.
	Factor float64
	// The sleep at each iteration is the duration plus an additional
	// amount chosen uniformly at random from the interval between
	// zero and `jitter*duration`.
	Jitter float64
	// The remaining number of iterations in which the duration
	// parameter may change (but progress can be stopped earlier by
	// hitting the cap). If not positive, the duration is not
	// changed. Used for exponential backoff in combination with
	// Factor and Cap.
	Steps int
	// A limit on revised values of the duration parameter. If a
	// multiplication by the factor parameter would make the duration
	// exceed the cap then the duration is set to the cap and the
	// steps parameter is set to zero.
	Cap time.Duration
}

Backoff holds parameters applied to a Backoff function.

func (*Backoff) Step

func (b *Backoff) Step() time.Duration

Step (1) returns an amount of time to sleep determined by the original Duration and Jitter and (2) mutates the provided Backoff to update its Steps and Duration.
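The interaction of Duration, Factor, Steps, and Cap is easier to follow with a worked example. The sketch below re-implements the field comments on a hypothetical lowercase backoff type; it is an interpretation of those comments, not the package's source.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// backoff mirrors the documented fields; the type and its methods are
// hypothetical and written only to illustrate the field comments.
type backoff struct {
	Duration time.Duration
	Factor   float64
	Jitter   float64
	Steps    int
	Cap      time.Duration
}

// step returns the current sleep time and advances the backoff state.
func (b *backoff) step() time.Duration {
	if b.Steps < 1 {
		return b.withJitter(b.Duration) // duration no longer changes
	}
	b.Steps--
	d := b.Duration
	if b.Factor != 0 {
		b.Duration = time.Duration(float64(b.Duration) * b.Factor)
		if b.Cap > 0 && b.Duration > b.Cap {
			b.Duration = b.Cap // clamp to the cap and stop growing
			b.Steps = 0
		}
	}
	return b.withJitter(d)
}

// withJitter adds up to Jitter*d on top of d; jitter never feeds back
// into the stored Duration.
func (b *backoff) withJitter(d time.Duration) time.Duration {
	if b.Jitter <= 0 {
		return d
	}
	return d + time.Duration(rand.Float64()*b.Jitter*float64(d))
}

func main() {
	b := backoff{Duration: 100 * time.Millisecond, Factor: 2, Steps: 5, Cap: 350 * time.Millisecond}
	for i := 0; i < 4; i++ {
		fmt.Println(b.step()) // 100ms, 200ms, 350ms, 350ms
	}
}
```

In the example the second call would grow the duration to 400ms, which exceeds the cap, so the duration is set to 350ms and Steps drops to zero.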

type BackoffManager

type BackoffManager interface {
	Backoff() clock.Timer
}

func NewExponentialBackoffManager

func NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration time.Duration, backoffFactor, jitter float64, c clock.Clock) BackoffManager

NewExponentialBackoffManager returns a manager for managing exponential backoff. Each backoff is jittered and backoff will not exceed the given max. If the backoff is not called within resetDuration, the backoff is reset. This backoff manager is used to reduce load during upstream unhealthiness.

func NewJitteredBackoffManager

func NewJitteredBackoffManager(duration time.Duration, jitter float64, c clock.Clock) BackoffManager

NewJitteredBackoffManager returns a BackoffManager that backs off for the given duration plus the given jitter. If the jitter is negative, the backoff is not jittered.

type ConditionFunc

type ConditionFunc func() (done bool, err error)

ConditionFunc returns true if the condition is satisfied, or an error if the loop should be aborted.

func (ConditionFunc) WithContext

func (cf ConditionFunc) WithContext() ConditionWithContextFunc

WithContext converts a ConditionFunc into a ConditionWithContextFunc.

type ConditionWithContextFunc

type ConditionWithContextFunc func(context.Context) (done bool, err error)

ConditionWithContextFunc returns true if the condition is satisfied, or an error if the loop should be aborted.

The caller passes along a context that can be used by the condition function.

type DoWorkPieceFunc

type DoWorkPieceFunc func(piece int)

type ErrorChannel

type ErrorChannel struct {
	// contains filtered or unexported fields
}

ErrorChannel supports non-blocking send and receive operations for capturing an error. At most one error is kept in the channel; further errors sent are ignored until the existing error is received and the channel becomes empty again.

func NewErrorChannel

func NewErrorChannel() *ErrorChannel

NewErrorChannel returns a new ErrorChannel.

func (*ErrorChannel) ReceiveError

func (e *ErrorChannel) ReceiveError() error

ReceiveError receives an error from channel without blocking on the receiver.

func (*ErrorChannel) SendError

func (e *ErrorChannel) SendError(err error)

SendError sends an error without blocking the sender.

func (*ErrorChannel) SendErrorWithCancel

func (e *ErrorChannel) SendErrorWithCancel(err error, cancel context.CancelFunc)

SendErrorWithCancel sends an error without blocking the sender and calls cancel function.
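A buffered channel of capacity one combined with select/default gives exactly the "keep the first error, never block" behavior described for ErrorChannel. The sketch below assumes that design; the lowercase type and method names are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errorChannel is a hypothetical sketch built on a one-slot buffered channel.
type errorChannel struct {
	ch chan error
}

func newErrorChannel() *errorChannel {
	return &errorChannel{ch: make(chan error, 1)}
}

// sendError stores err if the slot is free and drops it otherwise.
func (e *errorChannel) sendError(err error) {
	select {
	case e.ch <- err:
	default:
	}
}

// sendErrorWithCancel stores err (if possible) and then cancels.
func (e *errorChannel) sendErrorWithCancel(err error, cancel context.CancelFunc) {
	e.sendError(err)
	cancel()
}

// receiveError returns the stored error, or nil if the slot is empty.
func (e *errorChannel) receiveError() error {
	select {
	case err := <-e.ch:
		return err
	default:
		return nil
	}
}

// demo sends two errors and reports what can be received afterwards.
func demo() (first string, emptyAfter bool) {
	e := newErrorChannel()
	_, cancel := context.WithCancel(context.Background())
	e.sendErrorWithCancel(errors.New("first"), cancel)
	e.sendError(errors.New("second")) // dropped: the slot is already taken
	first = e.receiveError().Error()
	return first, e.receiveError() == nil
}

func main() {
	fmt.Println(demo()) // first true
}
```

This shape fits parallel workers that should report the first failure and cancel the remaining work without ever blocking on the error path.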

type Group

type Group struct {
	// contains filtered or unexported fields
}

Group starts a group of goroutines and waits for their completion.

func (*Group) Start

func (g *Group) Start(f func())

Start starts f in a new goroutine in the group.

func (*Group) StartWithChannel

func (g *Group) StartWithChannel(stopCh <-chan struct{}, f func(stopCh <-chan struct{}))

StartWithChannel starts f in a new goroutine in the group. stopCh is passed to f as an argument. f should stop when stopCh is closed.

func (*Group) StartWithContext

func (g *Group) StartWithContext(ctx context.Context, f func(context.Context))

StartWithContext starts f in a new goroutine in the group. ctx is passed to f as an argument. f should stop when ctx.Done() is closed.

func (*Group) Wait

func (g *Group) Wait()
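A type like Group is typically a thin wrapper over sync.WaitGroup. The sketch below assumes that design and assumes that Wait blocks until every started function has returned; the lowercase names are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// group is a hypothetical sketch of a goroutine group backed by a WaitGroup.
type group struct {
	wg sync.WaitGroup
}

// start runs f in a new goroutine tracked by the group.
func (g *group) start(f func()) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		f()
	}()
}

// startWithContext runs f(ctx) in a new goroutine tracked by the group.
func (g *group) startWithContext(ctx context.Context, f func(context.Context)) {
	g.start(func() { f(ctx) })
}

// wait blocks until every started function has returned (assumed behavior).
func (g *group) wait() {
	g.wg.Wait()
}

// runGroup starts n goroutines and returns how many of them finished.
func runGroup(n int) int64 {
	var g group
	var finished int64
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	for i := 0; i < n; i++ {
		g.startWithContext(ctx, func(context.Context) {
			atomic.AddInt64(&finished, 1)
		})
	}
	g.wait()
	return finished
}

func main() {
	fmt.Println(runGroup(10)) // 10
}
```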

type Options

type Options func(*options)

type Parallelizer

type Parallelizer struct {
	// contains filtered or unexported fields
}

Parallelizer holds the parallelism for scheduler.

func NewParallelizer

func NewParallelizer(p int) Parallelizer

NewParallelizer returns an object holding the parallelism.

func (Parallelizer) Until

func (p Parallelizer) Until(ctx context.Context, pieces int, doWorkPiece DoWorkPieceFunc)

Until is a wrapper around workqueue.ParallelizeUntil to use in scheduling algorithms.

type WaitWithContextFunc

type WaitWithContextFunc func(ctx context.Context) <-chan struct{}

WaitWithContextFunc creates a channel that receives an item every time a test should be executed and is closed when the last test should be invoked.

When the specified context gets cancelled or expires, the function stops sending items and returns immediately.
