async

package
v2.0.3
Published: May 30, 2023 License: MIT Imports: 9 Imported by: 4

README

Async

Why you want to use this package

Package async simplifies the implementation of orchestration patterns for concurrent systems. It is similar to Java's Future or JavaScript's Promise, which make asynchronous operations and concurrent processing much easier to work with. Go is excellent for concurrent programming. However, dealing with goroutines and channels directly can become a headache when business logic gets complicated. Wrapping them in higher-level functions significantly improves code readability and makes it easier for engineers to reason about the system's behaviour.

Currently, this package includes:

  • Asynchronous tasks with cancellations, context propagation and state.
  • Task chaining by using continuations.
  • Fork/join pattern - running a batch of tasks in parallel and blocking until all finish.

Concept

A Task is the basic concept, similar to a Future in Java. You create a Task from an executable function that takes a context.Context and returns an error and, optionally, a result.

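task := NewTask(func(ctx context.Context) (animal, error) {
    var res animal // animal is a placeholder result type
    var err error
    // run the job, populating res and err
    return res, err
})

silentTask := NewSilentTask(func(ctx context.Context) error {
    var err error
    // run the job, populating err
    return err
})

Get the result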

Once a task is started, for example with task.Run(ctx) or by creating it with Invoke, the function executes asynchronously. You can check whether it has completed by calling task.State(), which is non-blocking. Alternatively, you can wait for the result with task.Outcome() or silentTask.Wait(), both of which block until the task is done. These functions are similar to Java's Future.isDone() and Future.get().
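The difference between the blocking and non-blocking accessors can be illustrated with a minimal sketch that uses only the standard library. This is not the package's implementation: the future type and the start, completed, outcome and computeAnswer names are hypothetical and chosen purely for illustration.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

// future is a hypothetical, minimal stand-in for a task. It is NOT the
// package's Task type.
type future[T any] struct {
	done   chan struct{} // closed once the work has finished
	isDone atomic.Bool   // allows a non-blocking completion check
	result T
	err    error
}

// start runs work in a new goroutine and returns immediately.
func start[T any](ctx context.Context, work func(context.Context) (T, error)) *future[T] {
	f := &future[T]{done: make(chan struct{})}
	go func() {
		f.result, f.err = work(ctx)
		f.isDone.Store(true)
		close(f.done)
	}()
	return f
}

// completed reports whether the work has finished, without blocking.
func (f *future[T]) completed() bool { return f.isDone.Load() }

// outcome blocks until the work has finished, then returns its result.
func (f *future[T]) outcome() (T, error) {
	<-f.done
	return f.result, f.err
}

// computeAnswer starts a trivial job and blocks for its outcome.
func computeAnswer() (int, error) {
	f := start(context.Background(), func(context.Context) (int, error) {
		return 42, nil
	})
	// f.completed() could be polled here without blocking.
	return f.outcome()
}

func main() {
	v, err := computeAnswer()
	fmt.Println(v, err) // prints: 42 <nil>
}
```

Closing a channel gives every waiter the same completion signal, which is why any number of callers can block on outcome at the same time.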

Cancelling

Sometimes we no longer care about the result after execution has started. In that case, the task can be aborted by calling task.Cancel().
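For cancellation to stop work early, the job itself has to watch its context. The sketch below shows that general pattern with the standard library only. How the package wires Cancel to the context is not described here, so treat this as an assumption-laden illustration; slowJob and runAndCancel are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// slowJob is a hypothetical unit of work that checks ctx between steps so
// that it can stop early once the context is cancelled.
func slowJob(ctx context.Context) (int, error) {
	for step := 0; step < 1000; step++ {
		select {
		case <-ctx.Done():
			return step, ctx.Err() // abandon the remaining steps
		case <-time.After(time.Millisecond):
			// one step of work finished; carry on
		}
	}
	return 1000, nil
}

// runAndCancel starts slowJob, cancels it shortly afterwards, and returns
// the error the job finished with.
func runAndCancel() error {
	ctx, cancel := context.WithCancel(context.Background())
	errc := make(chan error, 1)
	go func() {
		_, err := slowJob(ctx)
		errc <- err
	}()
	time.Sleep(5 * time.Millisecond)
	cancel() // the result is no longer needed
	return <-errc
}

func main() {
	err := runAndCancel()
	fmt.Println(errors.Is(err, context.Canceled))
}
```

A job that never checks ctx.Done() keeps running to the end even after cancellation, so long-running work should check it at regular intervals.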

Chaining

To run a follow-up action after a task is done, use the provided family of Continue functions (ContinueWith, ContinueWithResult, ContinueWithNoResult and ContinueInSilence). This is useful for building a chain of processing steps or for running teardown at the end of a task.
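The data flow of a continuation can be sketched as plain function composition: the follow-up action receives both the result and the error of the previous step. The example below is synchronous and uses only the standard library; the work type and the then and describe helpers are hypothetical and are not part of the package.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
)

// work mirrors the shape of a job: it takes a context and returns a value
// and an error.
type work[T any] func(context.Context) (T, error)

// then is a hypothetical helper that composes first with next. next always
// runs after first and receives both its result and its error, so it can
// transform the value or perform teardown.
func then[T, S any](first work[T], next func(context.Context, T, error) (S, error)) work[S] {
	return func(ctx context.Context) (S, error) {
		v, err := first(ctx)
		return next(ctx, v, err)
	}
}

// describe builds a three-step chain and runs it.
func describe() (string, error) {
	fetch := work[int](func(context.Context) (int, error) { return 21, nil })
	double := then(fetch, func(_ context.Context, v int, err error) (int, error) {
		return v * 2, err
	})
	format := then(double, func(_ context.Context, v int, err error) (string, error) {
		if err != nil {
			return "", err
		}
		return "value=" + strconv.Itoa(v), nil
	})
	return format(context.Background())
}

func main() {
	fmt.Println(describe()) // prints: value=42 <nil>
}
```

Passing the error into the next step, instead of short-circuiting, is what lets a continuation act as a teardown hook that runs whether or not the previous step succeeded.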

Fork join

ForkJoin is meant for running multiple subtasks concurrently. They could be different parts of the main task which can be executed independently. The following code example illustrates how you can send files to S3 concurrently with a few lines of code.

func uploadFilesConcurrently(files []string) {
    var tasks []Task[string]
    for _, file := range files {
        f := file // per-iteration copy for the closure (required before Go 1.22)
        tasks = append(tasks, NewTask(func(ctx context.Context) (string, error) {
            return upload(ctx, f)
        }))
    }

    // Runs all tasks in parallel and blocks until every one has finished.
    ForkJoin(context.Background(), tasks)
}

func upload(ctx context.Context, file string) (string, error) {
    // do file uploading
    return "", nil
}
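For comparison, the same fork/join shape can be written by hand with sync.WaitGroup. The sketch below uses only the standard library and is not how the package implements ForkJoin; forkJoin and uploadAll are hypothetical helpers, and the "upload" merely builds a string.

```go
package main

import (
	"fmt"
	"sync"
)

// forkJoin is a hypothetical helper: it runs every job in its own goroutine
// and returns only after all of them have finished. Results are stored by
// index, so no two goroutines write to the same slice element.
func forkJoin(jobs []func() (string, error)) ([]string, []error) {
	results := make([]string, len(jobs))
	errs := make([]error, len(jobs))
	var wg sync.WaitGroup
	for i, job := range jobs {
		wg.Add(1)
		go func(i int, job func() (string, error)) {
			defer wg.Done()
			results[i], errs[i] = job()
		}(i, job)
	}
	wg.Wait() // join: block until every job has called Done
	return results, errs
}

// uploadAll pretends to upload each file and returns the resulting keys.
func uploadAll(files []string) []string {
	jobs := make([]func() (string, error), 0, len(files))
	for _, file := range files {
		f := file // capture a per-iteration copy
		jobs = append(jobs, func() (string, error) { return "uploaded/" + f, nil })
	}
	keys, _ := forkJoin(jobs) // errors are ignored in this toy example
	return keys
}

func main() {
	fmt.Println(uploadAll([]string{"a.txt", "b.txt"})) // prints: [uploaded/a.txt uploaded/b.txt]
}
```

The hand-written version needs explicit bookkeeping for results and errors, which is the boilerplate a task abstraction is meant to hide.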

Documentation

Variables

var ErrDefaultCancelReason = errors.New("no reason provided")

ErrDefaultCancelReason is the default cancellation reason, used when none is provided.

Functions

func CancelAll

func CancelAll[T SilentTask](tasks []T)

CancelAll cancels all given tasks.

Note: no task in the slice may be nil.

func ForkJoin

func ForkJoin[T SilentTask](ctx context.Context, tasks []T)

ForkJoin executes given tasks in parallel and waits for ALL to complete before returning.

Note: no task in the slice may be nil.

Example

first := NewTask(
	func(context.Context) (int, error) {
		return 1, nil
	},
)

second := NewTask(
	func(context.Context) (interface{}, error) {
		return nil, errors.New("some error")
	},
)

ForkJoin(context.Background(), []SilentTask{first, second})

fmt.Println(first.Outcome())
fmt.Println(second.Outcome())

Output:

1 <nil>
<nil> some error

func ForkJoinFailFast

func ForkJoinFailFast[T SilentTask](ctx context.Context, tasks []T) error

ForkJoinFailFast executes the given tasks in parallel. It returns as soon as the first task fails, or once ALL tasks have completed successfully.

Note: no task in the slice may be nil.
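The fail-fast rule can be sketched with a buffered channel: collect results as they arrive and stop waiting at the first error. This uses only the standard library and is not the package's implementation. failFast and firstFailure are hypothetical names, and the sketch does not cancel the jobs that are still running; whether ForkJoinFailFast does so is not stated here.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// failFast is a hypothetical helper: it runs all jobs concurrently and
// returns the first error it observes, or nil once every job has succeeded.
func failFast(jobs []func() error) error {
	errc := make(chan error, len(jobs)) // buffered so late jobs never block
	for _, job := range jobs {
		go func(job func() error) { errc <- job() }(job)
	}
	for range jobs {
		if err := <-errc; err != nil {
			return err // do not wait for the remaining jobs
		}
	}
	return nil
}

// firstFailure pairs a slow successful job with a job that fails at once.
func firstFailure() error {
	slow := func() error { time.Sleep(200 * time.Millisecond); return nil }
	bad := func() error { return errors.New("boom") }
	return failFast([]func() error{slow, bad})
}

func main() {
	start := time.Now()
	err := firstFailure()
	fmt.Println(err, "after", time.Since(start).Round(time.Millisecond))
}
```

Sizing the channel buffer to the number of jobs matters: without it, goroutines that finish after the early return would block forever on their send.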

func WaitAll

func WaitAll[T SilentTask](tasks []T)

WaitAll waits for all executed tasks to finish.

Note: no task in the slice may be nil.

Types

type MockSilentTask

type MockSilentTask struct {
	mock.Mock
}

MockSilentTask is an autogenerated mock type for the SilentTask type

func NewMockSilentTask

func NewMockSilentTask(t mockConstructorTestingTNewMockSilentTask) *MockSilentTask

NewMockSilentTask creates a new instance of MockSilentTask. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations.

func (*MockSilentTask) Cancel

func (_m *MockSilentTask) Cancel()

Cancel provides a mock function with given fields:

func (*MockSilentTask) CancelWithReason

func (_m *MockSilentTask) CancelWithReason(_a0 error)

CancelWithReason provides a mock function with given fields: _a0

func (*MockSilentTask) Duration

func (_m *MockSilentTask) Duration() time.Duration

Duration provides a mock function with given fields:

func (*MockSilentTask) Error

func (_m *MockSilentTask) Error() error

Error provides a mock function with given fields:

func (*MockSilentTask) Execute

func (_m *MockSilentTask) Execute(ctx context.Context) SilentTask

Execute provides a mock function with given fields: ctx

func (*MockSilentTask) ExecuteSync

func (_m *MockSilentTask) ExecuteSync(ctx context.Context) SilentTask

ExecuteSync provides a mock function with given fields: ctx

func (*MockSilentTask) State

func (_m *MockSilentTask) State() State

State provides a mock function with given fields:

func (*MockSilentTask) Wait

func (_m *MockSilentTask) Wait()

Wait provides a mock function with given fields:

func (*MockSilentTask) WithRecoverAction

func (_m *MockSilentTask) WithRecoverAction(recoverAction PanicRecoverWork)

WithRecoverAction provides a mock function with given fields: recoverAction

type MockTask

type MockTask[T interface{}] struct {
	mock.Mock
}

MockTask is an autogenerated mock type for the Task type

func NewMockTask

func NewMockTask[T interface{}](t mockConstructorTestingTNewMockTask) *MockTask[T]

NewMockTask creates a new instance of MockTask. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations.

func (*MockTask[T]) Cancel

func (_m *MockTask[T]) Cancel()

Cancel provides a mock function with given fields:

func (*MockTask[T]) CancelWithReason

func (_m *MockTask[T]) CancelWithReason(_a0 error)

CancelWithReason provides a mock function with given fields: _a0

func (*MockTask[T]) Duration

func (_m *MockTask[T]) Duration() time.Duration

Duration provides a mock function with given fields:

func (*MockTask[T]) Error

func (_m *MockTask[T]) Error() error

Error provides a mock function with given fields:

func (*MockTask[T]) Execute

func (_m *MockTask[T]) Execute(ctx context.Context) SilentTask

Execute provides a mock function with given fields: ctx

func (*MockTask[T]) ExecuteSync

func (_m *MockTask[T]) ExecuteSync(ctx context.Context) SilentTask

ExecuteSync provides a mock function with given fields: ctx

func (*MockTask[T]) Outcome

func (_m *MockTask[T]) Outcome() (T, error)

Outcome provides a mock function with given fields:

func (*MockTask[T]) ResultOrDefault

func (_m *MockTask[T]) ResultOrDefault(_a0 T) T

ResultOrDefault provides a mock function with given fields: _a0

func (*MockTask[T]) Run

func (_m *MockTask[T]) Run(ctx context.Context) Task[T]

Run provides a mock function with given fields: ctx

func (*MockTask[T]) RunSync

func (_m *MockTask[T]) RunSync(ctx context.Context) Task[T]

RunSync provides a mock function with given fields: ctx

func (*MockTask[T]) State

func (_m *MockTask[T]) State() State

State provides a mock function with given fields:

func (*MockTask[T]) Wait

func (_m *MockTask[T]) Wait()

Wait provides a mock function with given fields:

func (*MockTask[T]) WithRecoverAction

func (_m *MockTask[T]) WithRecoverAction(recoverAction PanicRecoverWork)

WithRecoverAction provides a mock function with given fields: recoverAction

type PanicRecoverWork

type PanicRecoverWork func(any)

PanicRecoverWork represents a unit of work to be executed when a panic occurs.
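The general mechanism behind a recover action is a deferred recover inside the goroutine that runs the job. The sketch below shows that mechanism with the standard library only. It is an illustration under assumptions, not the package's code: recoverWork copies the documented func(any) shape, while runGuarded and capturePanic are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// recoverWork matches the shape of PanicRecoverWork: it receives the value
// that was passed to panic.
type recoverWork func(any)

// runGuarded is a hypothetical helper that runs job in a goroutine and, if
// the job panics, hands the recovered value to onPanic instead of crashing
// the program. It returns once the goroutine has finished.
func runGuarded(job func(), onPanic recoverWork) {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer func() {
			if r := recover(); r != nil {
				onPanic(r)
			}
		}()
		job()
	}()
	wg.Wait()
}

// capturePanic returns the value recovered from a deliberately panicking job.
func capturePanic() any {
	var got any
	runGuarded(func() { panic("job exploded") }, func(r any) { got = r })
	return got
}

func main() {
	fmt.Println("recovered:", capturePanic()) // prints: recovered: job exploded
}
```

The recover must be deferred in the same goroutine as the job, because a panic in one goroutine cannot be recovered from another.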

type SilentTask

type SilentTask interface {
	// WithRecoverAction attaches the given recover action with task so that
	// it can be executed when a panic occurs.
	WithRecoverAction(recoverAction PanicRecoverWork)
	// Execute starts this task asynchronously.
	Execute(ctx context.Context) SilentTask
	// ExecuteSync starts this task synchronously.
	ExecuteSync(ctx context.Context) SilentTask
	// Wait waits for this task to complete.
	Wait()
	// Cancel changes the state of this task to `Cancelled`.
	Cancel()
	// CancelWithReason changes the state of this task to `Cancelled` with the given reason.
	CancelWithReason(error)
	// Error returns the error that occurred when this task was executed.
	Error() error
	// State returns the current state of this task. This operation is non-blocking.
	State() State
	// Duration returns the duration of this task.
	Duration() time.Duration
}

SilentTask represents a unit of work that completes silently, such as background work that returns no value.

func ContinueInSilence

func ContinueInSilence(currentTask SilentTask, nextAction func(context.Context, error) error) SilentTask

ContinueInSilence proceeds with the next task once the current one is finished. When we have a chain of tasks like A -> B -> C, executing C will trigger A & B as well. However, executing A will NOT trigger B & C.
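The triggering rule for chains (running the last element runs its predecessors, but running the first does not run its successors) can be modelled with a backward link and sync.Once. The sketch below is synchronous, uses only the standard library, and is not the package's implementation; node, chain and order are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// node is a hypothetical chain element: running a node first runs its
// predecessor, but never its successors.
type node struct {
	name string
	prev *node
	once sync.Once
	log  *[]string
}

// run executes the predecessor chain and then this node, each at most once.
func (n *node) run() {
	n.once.Do(func() {
		if n.prev != nil {
			n.prev.run()
		}
		*n.log = append(*n.log, n.name)
	})
}

// chain builds A -> B -> C sharing one execution log.
func chain(log *[]string) (a, b, c *node) {
	a = &node{name: "A", log: log}
	b = &node{name: "B", prev: a, log: log}
	c = &node{name: "C", prev: b, log: log}
	return a, b, c
}

// order returns the names executed when only the selected node is run.
func order(runLast bool) []string {
	var log []string
	a, _, c := chain(&log)
	if runLast {
		c.run()
	} else {
		a.run()
	}
	return log
}

func main() {
	fmt.Println(order(true))  // prints: [A B C]
	fmt.Println(order(false)) // prints: [A]
}
```

Because each element only knows its predecessor, a chain has to be started from its final element to run in full.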

func ContinueWithNoResult

func ContinueWithNoResult[T any](currentTask Task[T], nextAction func(context.Context, T, error) error) SilentTask

ContinueWithNoResult proceeds with the next task once the current one is finished. When we have a chain of tasks like A -> B -> C, executing C will trigger A & B as well. However, executing A will NOT trigger B & C.

func InvokeInSilence

func InvokeInSilence(ctx context.Context, action SilentWork) SilentTask

InvokeInSilence creates a new SilentTask and runs it asynchronously.

func NewSilentTask

func NewSilentTask(action SilentWork) SilentTask

NewSilentTask creates a new SilentTask.

func NewSilentTasks

func NewSilentTasks(actions ...SilentWork) []SilentTask

NewSilentTasks creates a group of new SilentTasks, one per action.

type SilentWork

type SilentWork func(context.Context) error

SilentWork represents a unit of work that executes silently, such as background work that returns no value.

type State

type State byte

State represents the state enumeration for a task.

const (
	IsCreated   State = iota // IsCreated represents a newly created task
	IsRunning                // IsRunning represents a task which is currently running
	IsCompleted              // IsCompleted represents a task which was completed successfully or errored out
	IsCancelled              // IsCancelled represents a task which was cancelled or has timed out
)

Various task states.
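The numeric values that iota assigns, and the fact that the completed state covers both success and failure, can be seen in a small standalone sketch. The lowercase state type below mirrors the enumeration listed above; the String method and the finished helper are illustrative additions and are not part of the documented API.

```go
package main

import "fmt"

// state mirrors the State enumeration listed above. The String method is an
// illustrative addition and is not part of the documented API.
type state byte

const (
	isCreated   state = iota // 0: newly created
	isRunning                // 1: currently running
	isCompleted              // 2: finished, successfully or with an error
	isCancelled              // 3: cancelled or timed out
)

func (s state) String() string {
	switch s {
	case isCreated:
		return "created"
	case isRunning:
		return "running"
	case isCompleted:
		return "completed"
	case isCancelled:
		return "cancelled"
	default:
		return "unknown"
	}
}

// finished reports whether a task in state s will make no further progress.
func finished(s state) bool { return s == isCompleted || s == isCancelled }

func main() {
	for s := isCreated; s <= isCancelled; s++ {
		fmt.Println(byte(s), s, finished(s))
	}
}
```

Since the completed state does not distinguish success from failure, a caller that needs to know which one occurred also has to inspect the task's error.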

type Task

type Task[T any] interface {
	SilentTask
	// Run starts this task asynchronously.
	Run(ctx context.Context) Task[T]
	// RunSync starts this task synchronously.
	RunSync(ctx context.Context) Task[T]
	// Outcome waits for this task to complete and returns the final result & error.
	Outcome() (T, error)
	// ResultOrDefault waits for this task to complete and returns the final result if
	// there's no error or the default result if there's an error.
	ResultOrDefault(T) T
}

Task represents a unit of work that is expected to return a value of a particular type.
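The fallback rule described for ResultOrDefault amounts to a single branch on the error. The sketch below shows that rule as a hypothetical free function using only the standard library; it does not block on a task and is not the package's method. errLookup is an invented error value.

```go
package main

import (
	"errors"
	"fmt"
)

// errLookup is an invented error used only for this illustration.
var errLookup = errors.New("lookup failed")

// resultOrDefault is a hypothetical free function showing the rule described
// for ResultOrDefault: keep the result when err is nil, otherwise fall back.
func resultOrDefault[T any](result T, err error, fallback T) T {
	if err != nil {
		return fallback
	}
	return result
}

func main() {
	fmt.Println(resultOrDefault(7, nil, -1))       // prints: 7
	fmt.Println(resultOrDefault(0, errLookup, -1)) // prints: -1
}
```

This style suits call sites where the error has already been logged elsewhere and a sensible default exists; otherwise Outcome keeps the error visible.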

func Completed

func Completed[T any](result T, err error) Task[T]

Completed returns a completed task with the given result and error.

func ContinueWith

func ContinueWith[T any, S any](currentTask Task[T], nextAction func(context.Context, T, error) (S, error)) Task[S]

ContinueWith proceeds with the next task once the current one is finished. When we have a chain of tasks like A -> B -> C, executing C will trigger A & B as well. However, executing A will NOT trigger B & C.

func ContinueWithResult

func ContinueWithResult[T any](currentTask SilentTask, nextAction func(context.Context, error) (T, error)) Task[T]

ContinueWithResult proceeds with the next task once the current one is finished. When we have a chain of tasks like A -> B -> C, executing C will trigger A & B as well. However, executing A will NOT trigger B & C.

func Invoke

func Invoke[T any](ctx context.Context, action Work[T]) Task[T]

Invoke creates a new Task and runs it asynchronously.

func NewTask

func NewTask[T any](action Work[T]) Task[T]

NewTask creates a new Task.

func NewTasks

func NewTasks[T any](actions ...Work[T]) []Task[T]

NewTasks creates a group of new Tasks, one per action.

type Work

type Work[T any] func(context.Context) (T, error)

Work represents a unit of work to execute that is expected to return a value of a particular type.
