workflow

package
v0.0.0-...-e49e9a2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 15, 2023 License: MIT Imports: 21 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	DefaultSubWorkflowRetryOptions = RetryOptions{

		MaxAttempts: 1,
	}

	DefaultSubWorkflowOptions = SubWorkflowOptions{
		RetryOptions: DefaultSubWorkflowRetryOptions,
	}
)
View Source
var Canceled = sync.Canceled
View Source
var DefaultActivityOptions = ActivityOptions{
	RetryOptions: DefaultRetryOptions,
}
View Source
var DefaultConverter = converter.DefaultConverter
View Source
var DefaultRetryOptions = RetryOptions{
	MaxAttempts:        3,
	BackoffCoefficient: 1,
}

Functions

func CanRetry

func CanRetry(err error) bool

CanRetry returns true if the given error is retryable

func ContinueAsNew

func ContinueAsNew(ctx Context, args ...interface{}) error

ContinueAsNew restarts the current workflow with the given arguments

func Go

func Go(ctx Context, f func(ctx Context))

Go spawns a workflow goroutine

func Logger

func Logger(ctx Context) *slog.Logger

func NewError

func NewError(err error) error

NewError wraps the given error into a workflow error which will be automatically retried

func NewPermanentError

func NewPermanentError(err error) error

NewPermanentError wraps the given error into a workflow error which will not be automatically retried

func Now

func Now(ctx sync.Context) time.Time

func Replaying

func Replaying(ctx sync.Context) bool

func Select

func Select(ctx Context, cases ...SelectCase)

Select is the workflow-save equivalent of the select statement.

func Sleep

func Sleep(ctx sync.Context, d time.Duration) error

func WithCancel

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

WithCancel returns a copy of parent with a new Done channel. The returned context's Done channel is closed when the returned cancel function is called or when the parent context's Done channel is closed, whichever happens first.

Canceling this context releases resources associated with it, so code should call cancel as soon as the operations running in this Context complete.

func WorkflowInstance

func WorkflowInstance(ctx sync.Context) *core.WorkflowInstance

Types

type ActivityOptions

type ActivityOptions struct {
	RetryOptions RetryOptions
}

type CancelFunc

type CancelFunc = sync.CancelFunc

type Channel

type Channel[T any] interface {
	Send(ctx Context, v T)

	SendNonblocking(v T) (ok bool)

	Receive(ctx Context) (v T, ok bool)

	ReceiveNonBlocking() (v T, ok bool)

	Close()
}

func NewBufferedChannel

func NewBufferedChannel[T any](size int) Channel[T]

func NewChannel

func NewChannel[T any]() Channel[T]

func NewSignalChannel

func NewSignalChannel[T any](ctx Context, name string) Channel[T]

type Context

type Context = sync.Context

func NewDisconnectedContext

func NewDisconnectedContext(ctx Context) Context

func WithConverter

func WithConverter(ctx Context, c Converter) Context

func WithValue

func WithValue(parent Context, key, val interface{}) Context

WithValue returns a copy of parent in which the value associated with key is val.

Use context Values only for request-scoped data that transits processes and APIs, not for passing optional parameters to functions.

The provided key must be comparable and should not be of type string or any other built-in type to avoid collisions between packages using context. Users of WithValue should define their own types for keys. To avoid allocating when assigning to an interface{}, context keys often have concrete type struct{}. Alternatively, exported context key variables' static type should be a pointer or interface.

type ContextPropagator

type ContextPropagator interface {
	// Inject injects values from context into metadata
	Inject(context.Context, *Metadata) error

	// Extract extracts values from metadata into context
	Extract(context.Context, *Metadata) (context.Context, error)

	// InjectFromWorkflow injects values from the workflow context into metadata
	InjectFromWorkflow(Context, *Metadata) error

	// ExtractToWorkflow extracts values from metadata into a workflow context
	ExtractToWorkflow(Context, *Metadata) (Context, error)
}

type Converter

type Converter = converter.Converter

func GetConverter

func GetConverter(ctx Context) Converter

type Error

type Error = workflowerrors.Error

type Future

type Future[T any] interface {
	// Get returns the value if set, blocks otherwise
	Get(ctx Context) (T, error)
}

func CreateSubWorkflowInstance

func CreateSubWorkflowInstance[TResult any](ctx sync.Context, options SubWorkflowOptions, workflow interface{}, args ...interface{}) Future[TResult]

func ExecuteActivity

func ExecuteActivity[TResult any](ctx Context, options ActivityOptions, activity interface{}, args ...interface{}) Future[TResult]

ExecuteActivity schedules the given activity to be executed

func ScheduleTimer

func ScheduleTimer(ctx Context, delay time.Duration) Future[struct{}]

func SideEffect

func SideEffect[TResult any](ctx Context, f func(ctx Context) TResult) Future[TResult]

func SignalWorkflow

func SignalWorkflow[T any](ctx Context, instanceID string, name string, arg T) Future[any]

func WithRetries

func WithRetries[T any](ctx Context, retryOptions RetryOptions, fn func(ctx Context, attempt int) Future[T]) Future[T]

type Instance

type Instance = core.WorkflowInstance

type Metadata

type Metadata = core.WorkflowMetadata

type PanicError

type PanicError = workflowerrors.PanicError

type Payload

type Payload = payload.Payload

type RetryOptions

type RetryOptions struct {
	// Maximum number of times to retry
	MaxAttempts int

	// Time to wait before first retry
	FirstRetryInterval time.Duration

	// Maximum delay for any individual retry attempt
	MaxRetryInterval time.Duration

	// Coeffecient for calculation the next retry delay
	BackoffCoefficient float64

	// Timeout after which retries are aborted
	RetryTimeout time.Duration
}

type SelectCase

type SelectCase = sync.SelectCase

func Await

func Await[T any](f Future[T], handler func(Context, Future[T])) SelectCase

Await calls the provided handler when the given future is ready.

func Default

func Default(handler func(Context)) SelectCase

Default calls the given handler if none of the other cases match.

func Receive

func Receive[T any](c Channel[T], handler func(ctx Context, v T, ok bool)) SelectCase

Receive calls the provided handler if the given channel can receive a value. The handler receives the received value, and the ok flag indicating whether the value was received or the channel was closed.

func Send

func Send[T any](c Channel[T], value *T, handler func(ctx Context)) SelectCase

Send calls the provided handler if the given value can be sent to the channel.

type Span

type Span interface {
	End()
}

type SubWorkflowOptions

type SubWorkflowOptions struct {
	InstanceID string

	RetryOptions RetryOptions
}

type WaitGroup

type WaitGroup = sync.WaitGroup

func NewWaitGroup

func NewWaitGroup() WaitGroup

type Workflow

type Workflow = interface{}

type WorkflowTracer

type WorkflowTracer interface {
	Start(ctx Context, name string, opts ...trace.SpanStartOption) (Context, Span)
}

func Tracer

func Tracer(ctx Context) WorkflowTracer

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL