Package sink

Version: v0.7.0
Published: Oct 1, 2023 | License: MIT | Imports: 6 | Imported by: 2

Documentation

Overview

Package sink defines Sink, a kind of task that serves as the terminal point of a data pipeline.

Types

type ElementWiseFn

type ElementWiseFn[S any] func(context.Context, S) error

A function that defines the behavior of an elementwise operator.

type ElementWiseOp

type ElementWiseOp[S any] struct {
	// contains filtered or unexported fields
}

A Sink task that processes elements fed by its upstream task one by one.

func ElementWise

func ElementWise[S any](fn ElementWiseFn[S]) *ElementWiseOp[S]

Create an elementwise operator from an ElementWiseFn.
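As a rough illustration of what "elementwise" means here, the sketch below applies a function with the ElementWiseFn shape to each element of a channel, one at a time. It does not import this package: ElementWiseFn is redeclared locally with the documented signature, and drain and sumUntilNegative are hypothetical helpers written for this example, not part of the library. How the real operator schedules calls is not described in the source.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ElementWiseFn is a local stand-in with the same shape as the documented type.
type ElementWiseFn[S any] func(context.Context, S) error

// drain is a hypothetical helper (not part of the package): it calls fn once
// per element, in order, and stops at the first error or when ctx is done.
func drain[S any](ctx context.Context, in <-chan S, fn ElementWiseFn[S]) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case v, ok := <-in:
			if !ok {
				return nil // upstream closed the channel: nothing left to consume
			}
			if err := fn(ctx, v); err != nil {
				return err
			}
		}
	}
}

// sumUntilNegative feeds xs through an elementwise function that adds each
// value to a running total and rejects negative values.
func sumUntilNegative(xs []int) (int, error) {
	in := make(chan int, len(xs))
	for _, x := range xs {
		in <- x
	}
	close(in)

	total := 0
	err := drain[int](context.Background(), in, func(_ context.Context, x int) error {
		if x < 0 {
			return errors.New("negative element")
		}
		total += x
		return nil
	})
	return total, err
}

func main() {
	total, err := sumUntilNegative([]int{1, 2, 3})
	fmt.Println(total, err) // 6 <nil>
}
```

The function only sees one element per call, so any state it needs across elements (the running total here) has to live outside it.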

func (*ElementWiseOp[S]) AsSink

func (op *ElementWiseOp[S]) AsSink(opts ...task.Option) Sink[S]

Convert this operation into a Sink.

func (*ElementWiseOp[S]) AsTask

func (op *ElementWiseOp[S]) AsTask(opts ...task.Option) task.Task[S, struct{}]

Convert this operation into a Task.

func (*ElementWiseOp[S]) Concurrent

func (op *ElementWiseOp[S]) Concurrent(concurrency int) Sink[S]

Create a concurrent Sink from multiple elementwise operators that have the same behavior.

type ExposeOp

type ExposeOp[S any] struct {
	// contains filtered or unexported fields
}

A Sink task that exposes elements fed by its upstream task via a gRPC service.

func Expose

func Expose[S any](lis net.Listener, m marshal.Spec[S], buffer int) *ExposeOp[S]

Create an expose operator that runs a gRPC service.

The gRPC service is bound to the passed listener, and each message is encoded as defined by the passed marshal spec.

Use source.Pull to receive elements exposed via this operator. Alternatively, consume the exposed elements by communicating with the gRPC service directly. For details on how to communicate with the service, see the Communicator service definition.

func (*ExposeOp[S]) AsSink

func (op *ExposeOp[S]) AsSink(opts ...task.Option) Sink[S]

Convert this operation into a Sink.

func (*ExposeOp[S]) AsTask

func (op *ExposeOp[S]) AsTask(opts ...task.Option) task.Task[S, struct{}]

Convert this operation into a Task.

type Sink

type Sink[S any] task.Task[S, struct{}]

A task that is used as a terminal point of a data pipeline.

A Sink receives elements fed by an upstream task and consumes them, for example by printing them to STDOUT or writing them to a file.

The output channel must be closed without any elements being sent on it.
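The output-channel rule can be pictured with a small sketch. It assumes a task-like shape in which the sink owns an output channel of struct{} and closes it when it finishes; the real task.Task type lives in the task package and may look different. startSink and runSink are hypothetical names used only in this example.

```go
package main

import (
	"context"
	"fmt"
)

// startSink is a hypothetical stand-in for a sink-like task: it consumes in
// on a goroutine and returns an output channel of struct{} that it closes,
// without ever sending on it, once the input is exhausted or ctx is done.
func startSink[S any](ctx context.Context, in <-chan S, consume func(S)) <-chan struct{} {
	out := make(chan struct{})
	go func() {
		defer close(out) // closed without sending: it only signals completion
		for {
			select {
			case <-ctx.Done():
				return
			case v, ok := <-in:
				if !ok {
					return
				}
				consume(v)
			}
		}
	}()
	return out
}

// runSink feeds words into the sketch and reports what was consumed and how
// many values were received from the output channel before it closed.
func runSink(words []string) (consumed []string, sent int) {
	in := make(chan string)
	go func() {
		defer close(in) // the upstream side closes the input channel
		for _, w := range words {
			in <- w
		}
	}()

	out := startSink[string](context.Background(), in, func(w string) {
		consumed = append(consumed, w)
	})
	for range out {
		sent++ // not reached in this sketch: out is closed without any send
	}
	return consumed, sent
}

func main() {
	consumed, sent := runSink([]string{"a", "b"})
	fmt.Println(consumed, sent) // [a b] 0
}
```

Because nothing is ever sent, a downstream reader that ranges over the output channel simply unblocks when the sink is done.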

func Concurrent

func Concurrent[S any](ss []Sink[S], opts ...task.Option) Sink[S]

Create a Sink from multiple Sinks. The passed Sinks will run concurrently.

func ConcurrentFromFn

func ConcurrentFromFn[S any](fn SinkFn[S], concurrency int, opts ...task.Option) Sink[S]

Create a Sink that runs the provided SinkFn concurrently. This is shorthand for creating a concurrent Sink from Sinks that share the same function.
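One way to picture running the same SinkFn concurrently is sketched below. It assumes that all workers read from a single shared input channel, so each element is handled by exactly one worker; the source does not say how elements are distributed, so treat that as an assumption. SinkFn is redeclared locally, and runConcurrently and countAll are hypothetical helpers, not library functions.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// SinkFn is a local stand-in with the same shape as the documented type.
type SinkFn[S any] func(ctx context.Context, in <-chan S) error

// runConcurrently is a hypothetical helper (not part of the package). It
// starts n goroutines that all call fn on the same input channel, waits for
// them to return, and reports the first error observed, if any.
func runConcurrently[S any](ctx context.Context, in <-chan S, fn SinkFn[S], n int) error {
	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := fn(ctx, in); err != nil {
				once.Do(func() { firstErr = err })
			}
		}()
	}
	wg.Wait()
	return firstErr
}

// countAll pushes total elements through workers copies of a counting SinkFn
// and reports how many elements were consumed overall.
func countAll(total, workers int) (int64, error) {
	in := make(chan int)
	go func() {
		defer close(in) // the producer, not the sink, closes the channel
		for i := 0; i < total; i++ {
			in <- i
		}
	}()

	var seen int64
	count := func(_ context.Context, ch <-chan int) error {
		for range ch {
			atomic.AddInt64(&seen, 1) // workers share the counter, so update it atomically
		}
		return nil
	}
	err := runConcurrently[int](context.Background(), in, count, workers)
	return atomic.LoadInt64(&seen), err
}

func main() {
	n, err := countAll(100, 4)
	fmt.Println(n, err) // 100 <nil>
}
```

Any state the function touches is shared between workers in this arrangement, which is why the counter is updated atomically.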

func FromFn

func FromFn[S any](fn SinkFn[S], opts ...task.Option) Sink[S]

Build a Sink from a SinkFn.

type SinkFn

type SinkFn[S any] func(ctx context.Context, in <-chan S) error

A function that defines a Sink's behavior. This function should receive elements via the passed input channel. It must not close that channel, because closing the input channel is the upstream task's responsibility. If the returned error is not nil, the whole pipeline is aborted.
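A function following these rules might look like the sketch below. SinkFn is redeclared locally with the documented signature so the example compiles on its own; writeLines, feed, and errEmptyLine are hypothetical names. The sketch only demonstrates the function's side of the contract (receive, never close, return an error to abort) and does not show how the library reacts to the error.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

// SinkFn is a local stand-in with the same shape as the documented type.
type SinkFn[S any] func(ctx context.Context, in <-chan S) error

// errEmptyLine is a hypothetical error used to show a sink rejecting input.
var errEmptyLine = errors.New("empty line")

// writeLines returns a SinkFn that appends each received line to sb. It only
// receives from in and never closes it; returning a non-nil error is how it
// would ask for the pipeline to be aborted.
func writeLines(sb *strings.Builder) SinkFn[string] {
	return func(ctx context.Context, in <-chan string) error {
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case line, ok := <-in:
				if !ok {
					return nil // upstream closed the channel
				}
				if line == "" {
					return errEmptyLine
				}
				sb.WriteString(line)
				sb.WriteByte('\n')
			}
		}
	}
}

// feed plays the role of the upstream task: it sends the lines, closes the
// channel, and then runs the sink function to completion.
func feed(lines []string) (string, error) {
	in := make(chan string, len(lines))
	for _, l := range lines {
		in <- l
	}
	close(in)

	var sb strings.Builder
	err := writeLines(&sb)(context.Background(), in)
	return sb.String(), err
}

func main() {
	out, err := feed([]string{"alpha", "beta"})
	fmt.Printf("%q %v\n", out, err) // "alpha\nbeta\n" <nil>
}
```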

type ToChanOp added in v0.5.0

type ToChanOp[S any] struct {
	ElementWiseOp[S]
	// contains filtered or unexported fields
}

A Sink task that sends elements fed by its upstream task to the passed channel.

func ToChan added in v0.5.0

func ToChan[S any](c chan<- S) *ToChanOp[S]

Create a ToChanOp from a chan.
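Since ToChanOp embeds ElementWiseOp, its behavior can be approximated by an elementwise function that forwards each element to a channel. The sketch below is that approximation and not the library's implementation: ElementWiseFn is redeclared locally, forwardTo and forwardAll are hypothetical helpers, and the choice to let the caller close the destination channel is an assumption, because the source does not say who closes it.

```go
package main

import (
	"context"
	"fmt"
)

// ElementWiseFn is a local stand-in with the same shape as the documented type.
type ElementWiseFn[S any] func(context.Context, S) error

// forwardTo is a hypothetical helper (not part of the package): it returns an
// elementwise function that sends each element to c and gives up if ctx is
// cancelled while the send is blocked.
func forwardTo[S any](c chan<- S) ElementWiseFn[S] {
	return func(ctx context.Context, v S) error {
		select {
		case c <- v:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// forwardAll sends xs through forwardTo and returns what arrived on the
// destination channel.
func forwardAll(xs []int) ([]int, error) {
	dst := make(chan int, len(xs)) // buffered so the sends below do not block
	send := forwardTo[int](dst)
	for _, x := range xs {
		if err := send(context.Background(), x); err != nil {
			return nil, err
		}
	}
	close(dst) // assumption: in this sketch the caller owns and closes dst

	var got []int
	for v := range dst {
		got = append(got, v)
	}
	return got, nil
}

func main() {
	got, err := forwardAll([]int{1, 2, 3})
	fmt.Println(got, err) // [1 2 3] <nil>
}
```

With an unbuffered destination channel, something must be receiving from it while the pipeline runs, otherwise the send blocks until the context is cancelled.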

func (*ToChanOp[S]) AsSink added in v0.5.0

func (op *ToChanOp[S]) AsSink(opts ...task.Option) Sink[S]

Convert this operation into a Sink.

func (*ToChanOp[S]) AsTask added in v0.5.0

func (op *ToChanOp[S]) AsTask(opts ...task.Option) task.Task[S, struct{}]

Convert this operation into a Task.

type ToSliceOp

type ToSliceOp[S any] struct {
	ElementWiseOp[S]
}

A Sink task that adds elements fed by its upstream task to the passed slice.

func ToSlice

func ToSlice[S any](s *[]S) *ToSliceOp[S]

Create a ToSliceOp from a slice.
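The pointer parameter matters because append may allocate a new backing array, so the caller's slice variable has to be updated through the pointer. The sketch below approximates the idea with an elementwise function that appends to a slice. It is not the library's implementation: ElementWiseFn is redeclared locally, appendTo and collect are hypothetical helpers, and the sketch makes no attempt at concurrency safety.

```go
package main

import (
	"context"
	"fmt"
)

// ElementWiseFn is a local stand-in with the same shape as the documented type.
type ElementWiseFn[S any] func(context.Context, S) error

// appendTo is a hypothetical helper (not part of the package): it returns an
// elementwise function that appends each element to the slice s points to.
func appendTo[S any](s *[]S) ElementWiseFn[S] {
	return func(_ context.Context, v S) error {
		*s = append(*s, v) // write back through the pointer so the caller sees growth
		return nil
	}
}

// collect runs every element of xs through appendTo and returns the result.
func collect(xs []string) ([]string, error) {
	var out []string
	add := appendTo[string](&out)
	for _, x := range xs {
		if err := add(context.Background(), x); err != nil {
			return nil, err
		}
	}
	return out, nil
}

func main() {
	got, err := collect([]string{"a", "b", "c"})
	fmt.Println(got, err) // [a b c] <nil>
}
```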

func (*ToSliceOp[S]) AsSink

func (op *ToSliceOp[S]) AsSink(opts ...task.Option) Sink[S]

Convert this operation into a Sink.

func (*ToSliceOp[S]) AsTask

func (op *ToSliceOp[S]) AsTask(opts ...task.Option) task.Task[S, struct{}]

Convert this operation into a Task.
