task

package
v0.7.0 Latest
Warning

This package is not in the latest version of its module.

Published: Oct 1, 2023 License: MIT Imports: 7 Imported by: 2

Documentation

Overview

Package task defines the Task interface, which is the core component for building a data pipeline.

Index

Constants

This section is empty.

Variables

var GetName = metadata.GetName

GetName returns the current task's name from a context. If the task is running as part of another task, it returns the name of the closest task.

Functions

func Emit added in v0.2.0

func Emit[T any](ctx context.Context, out chan<- T, el T) error

Emit sends an element to the provided channel. Unlike a plain channel send, it also respects the provided context: if the context is canceled, Emit returns an error explaining why it was canceled, without sending any value to the channel.

Types

type Connection

type Connection[S, M, T any] struct {
	Src  Task[S, M] // The first task that is contained in this Connection.
	Dest Task[M, T] // The second task that is contained in this Connection.
	// contains filtered or unexported fields
}

Connection is a task that represents two connected tasks.

Type parameters:

S: Type of elements fed by an upstream task
M: Type of elements that are sent from Src to Dest
T: Type of elements that are passed to a downstream task

type InOutOption added in v0.3.0

type InOutOption func(*inout.Options)

An option for a Task's input or output.

func WithTimeout added in v0.3.0

func WithTimeout(d time.Duration) InOutOption

Return an option to configure a timeout. The timeout applies to each individual send to or receive from a channel, not to an entire Task execution. For a Task input, if receiving a value from the input channel takes longer than the timeout, the input channel is closed and the context passed to the task is canceled. For a Task output, if sending a value to the output channel takes longer than the timeout, the context passed to the task is canceled.

type Option added in v0.3.0

type Option func(opts *options)

An option for a task.

func WithInputOptions added in v0.3.0

func WithInputOptions(opts ...InOutOption) Option

An Option to set input options.

func WithName added in v0.3.0

func WithName(name string) Option

An Option to set the name of a task. The name can be used for debugging purposes.

func WithOutputOptions added in v0.3.0

func WithOutputOptions(opts ...InOutOption) Option

An Option to set output options.

type Task

type Task[S, T any] interface {
	// Run this task.
	// Inputs for this task are received from the `in` channel, and outputs should be passed
	// to a downstream task by sending them to the `out` channel.
	//
	// Run must return once the passed context has been canceled, because the context is canceled
	// when a downstream task finishes without consuming all elements in its input channel.
	// In that case, an upstream task may keep trying to send values to that channel without knowing
	// that its downstream task has already finished, and it can get stuck once the channel buffer is full.
	// To avoid this, check ctx.Done() in a `select` statement when sending a value to the output channel.
	// Emit is a convenient shorthand for doing this.
	Run(ctx context.Context, in <-chan S, out chan<- T) error

	// Return this task as a Task.
	// Usually, calling this method returns the task itself.
	// It is used to convert other types of tasks into a Task with the proper type parameters.
	AsTask() Task[S, T]

	// Register a function to be called after this task has completed.
	// More specifically, the registered function is called just before Run returns its result.
	// This can be used, for example, to close a file or a database connection when this task has completed.
	Defer(func())
}

Task is an interface that represents a component of a data pipeline.

Each task takes an input channel and an output channel, and communicates with other tasks through them.

Type parameters:

S: Type of elements fed by an upstream task
T: Type of elements that are passed to a downstream task

func Connect

func Connect[S, M, T any](src Task[S, M], dest Task[M, T], buf int, opts ...Option) Task[S, T]

Connect two tasks as a Connection.

func FromFn added in v0.3.0

func FromFn[S, T any](fn TaskFn[S, T], opts ...Option) Task[S, T]

Build a Task with a TaskFn.

type TaskFn added in v0.3.0

type TaskFn[S, T any] func(ctx context.Context, in <-chan S, out chan<- T) error

A function that defines a Task's behavior. For details, see the Run method of the Task interface. Note that this function should close the output channel when it finishes, because task.FromFn does not close the channel automatically. If the function returns a non-nil error, the whole pipeline is aborted.

Directories

Path Synopsis
internal
