cff

package module
Published: Apr 10, 2024 License: Apache-2.0 Imports: 5 Imported by: 2

README

cff: a concurrency toolkit for Go


cff (pronounced "caff", as in caffeine) is a library and code generator that makes it easy to write concurrent code in Go.

It gives you:

  • Bounded resource consumption: cff uses a pool of goroutines to run all operations, preventing issues arising from unbounded goroutine growth in your application.
  • Panic-safety: cff recovers from panics in your concurrent code and reports them as errors, preventing them from crashing your application.

cff can be useful when you are trying to:

  1. Run interdependent functions concurrently, with a guarantee that a function does not run before its dependencies.
flowchart LR
  A; B; C; D; E; F; G
  dots1[...]; dots2[...]
  X; Y;

  A & B --> C
  B --> D & E
  A & C --> F
  C & D & E --> G
  F & G --> dots1
  G & E --> dots2

  dots1 --> X
  dots2 --> Y

  style dots1 fill:none,stroke:none
  style dots2 fill:none,stroke:none
  2. Run independent functions concurrently.
flowchart TD
  A; B; dots[...]; H

  done(( Done ))

  A --> done
  B --Error--x done
  dots -.-> done
  H --> done

  style done fill:none,stroke:none
  style dots fill:none,stroke:none
  3. Run the same function on every element of a map or a slice, without risk of unbounded goroutine growth.
flowchart RL
  subgraph Slice ["[]T"]
    i0["x1"]; i1["x2"]; dots1[...]; iN["xN"]
    style dots1 fill:none,stroke:none
  end

  subgraph Map ["map[K]V"]
    m1["(k1, v1)"]; m2["(k2, v2)"]; dots2[...]; mN["(kN, vN)"]
    style dots2 fill:none,stroke:none
  end

  subgraph Workers
    direction LR
    1; 2
  end

  Slice & Map -.-> Workers

See our documentation at https://uber-go.github.io/cff for more information.

Installation

go get -u go.uber.org/cff

Project status

At Uber, we've been using cff in production for several years. We're confident in the stability of its core functionality.

Although its APIs have satisfied a majority of our needs, we expect to add or modify some of these once the project is public.

That said, we intend to make these changes in compliance with Semantic Versioning.

License

cff is licensed under the Apache 2.0 license. See the LICENSE file for more information.

Documentation

Overview

Package cff, together with the cff CLI, provides a means of writing common concurrent code patterns in a type-safe manner.

Code generation directives

Some APIs in this package are marked as "code generation directives." If you use a code generation directive in a file, that file must have the 'cff' build constraint at the top:

//go:build cff

You must then run the following command before using 'go build' or 'go test' with that file.

cff ./...
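For illustration, a file that uses a directive might look like the sketch below before code generation (the package, function, and task are hypothetical):

//go:build cff

package example

import (
	"context"

	"go.uber.org/cff"
)

// Greet is a hypothetical wrapper around a cff.Flow call.
// Running `cff ./...` generates the concrete implementation for this file.
func Greet(ctx context.Context) (string, error) {
	var out string
	err := cff.Flow(ctx,
		cff.Results(&out),
		cff.Task(func() string { return "hello" }),
	)
	return out, err
}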

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Flow

func Flow(ctx context.Context, opts ...Option) error

Flow specifies a single Flow for execution with cff. A child of the provided context is made available to all tasks in the Flow if they request it.

A Flow MUST have at least one task (specified with Task), and at least one result (specified with Results).

var result *Result
cff.Flow(ctx,
	cff.Results(&result),
	cff.Task(
		// ...
	),
)

Tasks may be specified to a Flow in any order. They will be connected based on their inputs and outputs. If a task fails with an error, the entire Flow terminates and the error is returned.

Flow tasks

Within a cff.Flow, each task has:

  • zero or more inputs, specified by its parameters
  • one or more outputs, specified by its return values
  • optionally, a context.Context as the first parameter
  • optionally, an error as the last return value

This is roughly expressed as:

func([context.Context], I1, I2, ...) (R1, R2, ..., [error])

The types of the inputs specify the dependencies of this task. cff will run other tasks that provide these dependencies and feed their results back into this task to run it. Similarly, it will feed the results of this task into other tasks that depend on them.

Tasks may use the optional context argument to cancel operations early in case of failures: the context is valid only as long as the flow is running. If the flow terminates early because of a failure, the context is invalidated.

func(context.Context, I1, I2, ...) (R1, R2, ...)

Fallible tasks may declare an error as their last return value. If a task fails, the flow is terminated and all ongoing tasks are canceled.

func(I1, I2, ...) (R1, R2, ..., error)

Task behaviors may further be customized with TaskOption.
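Putting these pieces together, a sketch of a flow whose second task depends on the output of the first might look like this (the UserID, User, and UserProfile types, userClient, and userID are hypothetical):

var res *UserProfile
err := cff.Flow(ctx,
	cff.Params(userID),
	cff.Results(&res),
	// Runs first: consumes the UserID parameter and produces a *User.
	cff.Task(func(ctx context.Context, id UserID) (*User, error) {
		return userClient.GetUser(ctx, id)
	}),
	// Runs after the task above because it depends on its *User output.
	cff.Task(func(u *User) *UserProfile {
		return &UserProfile{Name: u.Name}
	}),
)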

This is a code generation directive.

func NewScheduler

func NewScheduler(p SchedulerParams) *scheduler.Scheduler

NewScheduler starts up a cff scheduler for use by Flow or Parallel.

sched := cff.NewScheduler(..)
j1 := sched.Enqueue(cff.Job{...})
j2 := sched.Enqueue(cff.Job{..., Dependencies: []*cff.ScheduledJob{j1}})
// ...
err := sched.Wait()

This is intended to be used by cff's generated code. Do not use directly. This can change without warning.

func Parallel

func Parallel(ctx context.Context, opts ...Option) error

Parallel specifies a parallel operation for execution with cff.

A Parallel must have at least one Task, Tasks, Map, or Slice.

cff.Parallel(ctx,
	cff.Task(/* ... */),
	cff.Slice(/* ... */),
	cff.Map(/* ... */),
)

Tasks inside a Parallel are all independent. They run concurrently with bounded parallelism.

If any of the tasks fail with an error or panic, Parallel terminates the entire operation. You can change this with ContinueOnError. With ContinueOnError(true), Parallel will run through all provided tasks despite errors, and return an aggregated error object representing all encountered failures.

A child of the provided context is made available to all tasks in the parallel if they request it. If the context is canceled or otherwise errors, Parallel does not run further tasks. This behavior cannot be changed.

Parallel tasks

Within a Parallel, each task has:

  • optionally, a context.Context as the first parameter
  • optionally, an error as the last return value

Note that tasks inside Parallel cannot have dependencies. Use Flow for that.

This is roughly expressed as:

func([context.Context]) ([error])

Tasks may use the context argument to cancel operations early in case of failures. Fallible tasks may return a non-nil error to signal failure.

Task behaviors may further be customized with TaskOption.
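For illustration, a Parallel mixing the allowed shapes might look like this sketch (warmCache and flushLogs are hypothetical):

err := cff.Parallel(ctx,
	// A task with no context and no error.
	cff.Task(func() {
		warmCache()
	}),
	// A fallible, context-aware task.
	cff.Task(func(ctx context.Context) error {
		return flushLogs(ctx)
	}),
)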

This is a code generation directive.

Types

type AtomicBool

type AtomicBool = atomic.Bool

AtomicBool is a type-safe means of reading and writing boolean values.

This is intended to be used by cff's generated code. Do not use directly. This can change without warning.

type DirectiveInfo

type DirectiveInfo struct {
	Name string
	// Directive is the type of directive (e.g. flow or parallel).
	Directive    DirectiveType
	File         string
	Line, Column int
}

DirectiveInfo provides information to uniquely identify a cff Directive.

type DirectiveType

type DirectiveType int

DirectiveType identifies the type of code generation directive for Emitter operations.

const (
	// UnknownDirective is an invalid value for a DirectiveType.
	UnknownDirective DirectiveType = iota

	// FlowDirective marks a Flow.
	FlowDirective

	// ParallelDirective marks a Parallel.
	ParallelDirective
)

func (DirectiveType) String

func (d DirectiveType) String() string

String returns the directive string.

type Emitter

type Emitter interface {
	// TaskInit returns a TaskEmitter which could be memoized based on task name.
	TaskInit(*TaskInfo, *DirectiveInfo) TaskEmitter
	// FlowInit returns a FlowEmitter which could be memoized based on flow name.
	FlowInit(*FlowInfo) FlowEmitter
	// ParallelInit returns a ParallelEmitter which could be memoized based on
	// parallel name.
	ParallelInit(*ParallelInfo) ParallelEmitter
	// SchedulerInit returns an emitter for the cff scheduler.
	SchedulerInit(s *SchedulerInfo) SchedulerEmitter
}

Emitter initializes Task, Flow, and Parallel emitters.

WARNING: Do not use this API. We intend to replace it in an upcoming release.

func EmitterStack

func EmitterStack(emitters ...Emitter) Emitter

EmitterStack combines multiple emitters together into one.

Events are sent to the emitters in an unspecified order. Emitters should not assume the ordering of events.

func NopEmitter

func NopEmitter() Emitter

NopEmitter is a cff emitter that does not do anything.

type FlowEmitter

type FlowEmitter interface {
	// FlowSuccess is called when a flow runs successfully.
	FlowSuccess(context.Context)
	// FlowError is called when a flow fails due to a task error.
	FlowError(context.Context, error)
	// FlowDone is called when a flow finishes.
	FlowDone(context.Context, time.Duration)
}

FlowEmitter is notified of flow events as they occur, for the purpose of emitting metrics.

WARNING: Do not use this API. We intend to replace it in an upcoming release.

func NopFlowEmitter

func NopFlowEmitter() FlowEmitter

NopFlowEmitter is a Flow emitter that does not do anything.

type FlowInfo

type FlowInfo struct {
	Name         string
	File         string
	Line, Column int
}

FlowInfo provides information to uniquely identify a flow.

type Job

type Job = scheduler.Job

Job is a job prepared to be enqueued to the cff scheduler.

This is intended to be used by cff's generated code. Do not use directly. This can change without warning.

type MapOption

type MapOption interface {
	// contains filtered or unexported methods
}

MapOption customizes the execution behavior of Map.

func MapEnd

func MapEnd(fn interface{}) MapOption

MapEnd specifies a function for execution when a Map operation finishes. This function will run after all items in the map have been processed.

As with parallel tasks, the function passed to MapEnd may have:

  • an optional context.Context parameter
  • an optional error return value

Therefore, these are all valid:

cff.MapEnd(func() {...})
cff.MapEnd(func() error {...})
cff.MapEnd(func(ctx context.Context) {...})
cff.MapEnd(func(ctx context.Context) error {...})

MapEnd cannot be used with ContinueOnError.
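As a usage sketch, MapEnd typically pairs with Map inside a Parallel; here the processed counter, the process function, and the usersByID map are hypothetical, and the counter uses sync/atomic:

var processed atomic.Int64
err := cff.Parallel(ctx,
	cff.Map(
		func(id string, u *User) {
			process(id, u)
			processed.Add(1)
		},
		usersByID,
		cff.MapEnd(func() {
			log.Printf("processed %d users", processed.Load())
		}),
	),
)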

This is a code generation directive.

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option is an argument for a Flow or Parallel.

See individual option documentation for details.

func Concurrency

func Concurrency(n int) Option

Concurrency specifies the maximum number of goroutines cff should use to execute tasks of this Flow or Parallel.

The default value is:

max(GOMAXPROCS, 4)

That is, by default cff will use runtime.GOMAXPROCS goroutines, with a minimum of 4.
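For example, to cap a Parallel at two workers (a sketch; users and enrich are hypothetical):

err := cff.Parallel(ctx,
	cff.Slice(
		func(u *User) { enrich(u) },
		users,
	),
	cff.Concurrency(2), // at most two goroutines process the slice
)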

This is a code generation directive.

func ContinueOnError

func ContinueOnError(bool) Option

ContinueOnError configures a Parallel to keep running all other tasks despite errors returned by tasks over the course of its execution. By default, Parallel will stop execution at the first error it encounters.

err = cff.Parallel(ctx,
	cff.Task(task1),
	cff.Task(task2),
	// ...
	cff.ContinueOnError(true),
)

If one or more tasks return errors with ContinueOnError(true), Parallel will still run all the other tasks, and accumulate and combine the errors together into a single error object. You can access the full list of errors with go.uber.org/multierr.Errors.
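As a sketch of inspecting the combined error with go.uber.org/multierr (task1 and task2 are hypothetical):

err := cff.Parallel(ctx,
	cff.Task(task1),
	cff.Task(task2),
	cff.ContinueOnError(true),
)
for _, taskErr := range multierr.Errors(err) {
	// multierr.Errors returns nil for a nil error, so this loop
	// only runs if at least one task failed.
	log.Printf("task failed: %v", taskErr)
}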

ContinueOnError(true) is incompatible with Flow, SliceEnd and MapEnd.

This is a code generation directive.

func InstrumentFlow

func InstrumentFlow(name string) Option

InstrumentFlow specifies that this Flow should be instrumented for observability. The provided name will be passed to the Emitter you passed into WithEmitter.

This is a code generation directive.

func InstrumentParallel

func InstrumentParallel(name string) Option

InstrumentParallel specifies that this Parallel should be instrumented for observability. The provided name will be passed to the Emitter you passed into WithEmitter.

This is a code generation directive.

func Map

func Map(fn interface{}, m interface{}, opts ...MapOption) Option

Map runs fn in parallel on elements of the provided map with a bounded number of goroutines.

cff.Parallel(ctx,
	cff.Map(
		func(k string, v *User) { /* ... */ },
		map[string]*User{ /* ... */ },
	),
)

For a map map[K]V, fn has the following signature:

func([ctx context.Context,] k K, v V) ([error])

That is, it has the following parameters in-order:

  • an optional context.Context
  • a key in the map
  • the value of that key in the map

And if the operation is fallible, it may have an error return value. A non-nil error returned by the function halts the entire Parallel operation. Use ContinueOnError to change this.
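For example, a fallible, context-aware variant might look like this sketch (client, UpdateUser, and usersByID are hypothetical):

err := cff.Parallel(ctx,
	cff.Map(
		func(ctx context.Context, id string, u *User) error {
			return client.UpdateUser(ctx, id, u)
		},
		usersByID,
	),
	// Keep updating the remaining entries even if some updates fail.
	cff.ContinueOnError(true),
)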

Map may only be used with Parallel.

This is a code generation directive.

func Params

func Params(args ...interface{}) Option

Params specifies inputs for a Flow that do not have any dependencies. These values are made available to the Flow as-is.

For example:

var req *GetUserRequest = // ...
cff.Flow(ctx,
	cff.Params(req),
	// ...
)

This is a code generation directive.

func Results

func Results(results ...interface{}) Option

Results specifies one or more outputs for a Flow. Arguments to Results must be pointers to variables that will hold the result values.

For example:

var result *GetUserResponse
err := cff.Flow(ctx,
	cff.Results(&result),
	// ...
)

This is a code generation directive.

func Slice

func Slice(fn interface{}, slice interface{}, opts ...SliceOption) Option

Slice runs fn in parallel on elements of the provided slice with a bounded number of goroutines.

cff.Parallel(ctx,
	cff.Slice(
		func(el someType) { ... },
		[]someType{...},
	),
)

For a slice []T, fn has the following signature:

func([ctx context.Context,] [idx int,] value T) ([error])

That is, it has the following parameters in-order:

  • an optional context.Context
  • an optional integer holding the index of the element in the slice
  • a value in the slice

And if the operation is fallible, it may have an error return value. A non-nil error returned by the function halts the entire Parallel operation. Use ContinueOnError to change this.
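For example, a variant that uses the index to collect results might look like this sketch (Input, Result, inputs, and transform are hypothetical):

results := make([]Result, len(inputs))
err := cff.Parallel(ctx,
	cff.Slice(
		func(ctx context.Context, i int, in Input) error {
			r, err := transform(ctx, in)
			if err != nil {
				return err
			}
			// Each invocation writes only to its own index,
			// so no additional locking is needed.
			results[i] = r
			return nil
		},
		inputs,
	),
)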

Slice may only be used with Parallel.

This is a code generation directive.

func Task

func Task(fn interface{}, opts ...TaskOption) Option

Task specifies a task for execution with a Flow or Parallel. A task can be a reference to:

  • a top-level function; or
  • a bound method; or
  • an anonymous function

For example:

// Given,
//   var client *Client
//   func (*Client) GetUser(...) (...)
// The following is a bound method reference.
cff.Task(client.GetUser)

// Given,
//   func bindUser(...) (...)
// The following is a top-level function reference.
cff.Task(bindUser)

// The following is an anonymous function reference.
cff.Task(func(...) (..., error) {
	// ...
})

A Task's usage and constraints change based on whether you're using it inside a Flow or a Parallel. See the documentation for Flow or Parallel for more details.

This is a code generation directive.

func Tasks

func Tasks(fn ...interface{}) Option

Tasks specifies multiple functions for execution with Parallel. As with Task, each argument to Tasks is a reference to:

  • a top-level function; or
  • a bound method; or
  • an anonymous function

They must all match the signature specified for parallel tasks (see Parallel).

Tasks cannot be used with Flow. Use Task for that.
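For example (refreshCache, rotateLogs, and pingUpstream are hypothetical):

err := cff.Parallel(ctx,
	cff.Tasks(
		refreshCache, // func()
		rotateLogs,   // func() error
		func(ctx context.Context) error {
			return pingUpstream(ctx)
		},
	),
)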

This is a code generation directive.

func WithEmitter

func WithEmitter(Emitter) Option

WithEmitter provides an optional observer for Flow or Parallel events. Emitters can track metrics, logs, or other observability data.

cff.Flow(ctx,
	// ...
	cff.WithEmitter(em),
)

Provide this option multiple times to use multiple emitters.

WARNING: Do not use this API. We intend to replace it in an upcoming release.

This is a code generation directive.

type PanicError

type PanicError struct {
	// Value contains the value recovered from the panic that caused this error.
	Value any

	// Stacktrace contains the call stack captured when the panic happened.
	// This is populated by calling runtime/debug.Stack() when a non-nil value is
	// recovered from a cff-scheduled job.
	Stacktrace []byte
}

PanicError is the error produced when a task panics. It contains the value recovered from the panic and the stack trace captured where the panic happened. For example, the following code checks whether an error from Flow is due to a panic:

var r string
err := cff.Flow(
	context.Background(),
	cff.Results(&r),
	cff.Task(
		func() string {
			panic("panic")
		},
	),
)
var panicError *cff.PanicError
if errors.As(err, &panicError) {
	// err is from a panic
	fmt.Printf("recovered: %s\n", panicError.Value)
} else {
	// err is not from a panic
}

func (*PanicError) Error

func (pe *PanicError) Error() string

type ParallelEmitter

type ParallelEmitter interface {
	// ParallelSuccess is called when a parallel runs successfully.
	ParallelSuccess(context.Context)
	// ParallelError is called when a parallel fails due to a task error.
	ParallelError(context.Context, error)
	// ParallelDone is called when a parallel finishes.
	ParallelDone(context.Context, time.Duration)
}

ParallelEmitter is notified of parallel events as they occur, for the purpose of emitting metrics.

WARNING: Do not use this API. We intend to replace it in an upcoming release.

func NopParallelEmitter

func NopParallelEmitter() ParallelEmitter

NopParallelEmitter is a Parallel emitter that does not do anything.

type ParallelInfo

type ParallelInfo struct {
	Name         string
	File         string
	Line, Column int
}

ParallelInfo provides information to uniquely identify a Parallel operation.

type ScheduledJob

type ScheduledJob = scheduler.ScheduledJob

ScheduledJob is a job that has been scheduled for execution with the cff scheduler.

This is intended to be used by cff's generated code. Do not use directly. This can change without warning.

type SchedulerEmitter

type SchedulerEmitter interface {
	// EmitScheduler emits the state of the cff scheduler.
	EmitScheduler(s SchedulerState)
}

SchedulerEmitter provides observability into the state of the cff scheduler.

WARNING: Do not use this API. We intend to replace it in an upcoming release.

type SchedulerInfo

type SchedulerInfo struct {
	// Name of the directive the scheduler runs tasks for.
	Name string
	// Directive is the type of directive the scheduler is running for
	// (e.g. flow, parallel).
	Directive    DirectiveType
	File         string
	Line, Column int
}

SchedulerInfo provides information about the context the scheduler is running in.

type SchedulerParams

type SchedulerParams struct {
	// Concurrency specifies the number of concurrent workers
	// used by the scheduler to run jobs.
	Concurrency int
	// Emitter provides an emitter for the scheduler.
	Emitter SchedulerEmitter
	// ContinueOnError when true directs the scheduler to continue running
	// through job errors.
	ContinueOnError bool
}

SchedulerParams configures the cff scheduler.

This is intended to be used by cff's generated code. Do not use directly. This can change without warning.

type SchedulerState

type SchedulerState = scheduler.State

SchedulerState describes the status of jobs managed by the cff scheduler.

type SliceOption

type SliceOption interface {
	// contains filtered or unexported methods
}

SliceOption customizes the execution behavior of Slice.

func SliceEnd

func SliceEnd(fn interface{}) SliceOption

SliceEnd specifies a function for execution when a Slice operation finishes. This function will run after all items in the slice have been processed.

As with parallel tasks, the function passed to SliceEnd may have:

  • an optional context.Context parameter
  • an optional error return value

Therefore, these are all valid:

cff.SliceEnd(func() {...})
cff.SliceEnd(func() error {...})
cff.SliceEnd(func(ctx context.Context) {...})
cff.SliceEnd(func(ctx context.Context) error {...})

SliceEnd cannot be used with ContinueOnError.

This is a code generation directive.

type TaskEmitter

type TaskEmitter interface {
	// TaskSuccess is called when a task runs successfully.
	TaskSuccess(context.Context)
	// TaskError is called when a task fails due to a task error.
	TaskError(context.Context, error)
	// TaskErrorRecovered is called when a task fails due to a task error
	// and recovers in a FallbackWith.
	TaskErrorRecovered(context.Context, error)
	// TaskSkipped is called when a task is skipped because its predicate
	// returned false or because of an earlier task error.
	TaskSkipped(context.Context, error)
	// TaskPanic is called when a task panics.
	TaskPanic(context.Context, interface{})
	// TaskPanicRecovered is called when a task panics but is recovered by
	// a FallbackWith.
	TaskPanicRecovered(context.Context, interface{})
	// TaskDone is called when a task finishes.
	TaskDone(context.Context, time.Duration)
}

TaskEmitter is notified of task events as they occur, for the purpose of emitting metrics.

WARNING: Do not use this API. We intend to replace it in an upcoming release.

func NopTaskEmitter

func NopTaskEmitter() TaskEmitter

NopTaskEmitter is a Task emitter that does not do anything.

type TaskInfo

type TaskInfo struct {
	Name         string
	File         string
	Line, Column int
}

TaskInfo provides information to uniquely identify a task.

type TaskOption

type TaskOption interface {
	// contains filtered or unexported methods
}

TaskOption customizes the behavior of a single Task.

func FallbackWith

func FallbackWith(results ...interface{}) TaskOption

FallbackWith specifies that if the corresponding task fails with an error or panics, we should recover from that failure and return the provided values instead.

This function accepts the same number of values as the task returns, with exactly the same types, not including the error return value (if any).

For example:

// Given,
//   func (*Client) ListUsers(context.Context) ([]*User, error)
// And,
//   var cachedUserList []*User = ...
cff.Task(client.ListUsers, cff.FallbackWith(cachedUserList))

If client.ListUsers returns an error or panics, cff will return cachedUserList instead.

This is a code generation directive.

func Instrument

func Instrument(name string) TaskOption

Instrument specifies that this Task should be instrumented for observability. The provided name will be passed to the Emitter you passed into WithEmitter.

This is a code generation directive.

func Invoke

func Invoke(enable bool) TaskOption

Invoke specifies that a flow task must be executed even if no other tasks consume its output.

By default, flow tasks have the following restrictions:

  • they must have at least one non-error return value (output)
  • their outputs must be consumed by another task or a flow result (via Results)

A task tagged with Invoke(true) loses these restrictions. It may have zero outputs, or, if it has outputs, other tasks and flow results do not have to consume them.

cff.Task(func(ctx context.Context, req *Request) {
	res, err := shadowClient.Send(req)
	log.Info("shadowed request", "response", res, "error", err)
}, cff.Invoke(true))

This is a code generation directive.

func Predicate

func Predicate(fn interface{}) TaskOption

Predicate specifies a function that determines if the corresponding task should run.

The predicate function has the following signature:

func(I1, I2, ...) bool

Where the arguments I1, I2, ... are inputs, just like a task's. Arguments added to the predicate become dependencies of the task, so neither the predicate nor the task will run until those values are available.

When specified, the corresponding task will be executed only if this function returns true. If the function evaluates to false, cff will skip execution of this task. If any other tasks depend on this task, cff will give them zero values for this task's outputs.

For example:

cff.Task(
	authorizeUser,
	cff.Predicate(func(cfg *Config) bool {
		return cfg.Production == true
	}),
)

This is a code generation directive.

Directories

Path            Synopsis
cmd
    cff         cff is a library and code generator for Go that makes it easy to write concurrent code.
docs            module
examples        module
internal        Package internal is the internal implementation of the cff compiler.
    emittertest Package emittertest provides testing utilities for cff emitters.
    flag        Package flag implements command line flag utilities for the cff command.
    gendirectives
                gendirectives generates a file with the following API:
    modifier    Package modifier implements modifier-based code generation for cff directives.
    pkg         Package pkg defines the interface for loading Go packages.
    tests       module
scheduler       Package scheduler implements a runtime scheduler for cff with support for interdependent jobs.
