source

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 1, 2023 License: MIT Imports: 7 Imported by: 2

Documentation

Overview

Package defines Source which is a type of a task. A Source is used as a starting point of a data pipeline.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ChanSourceOp added in v0.5.0

type ChanSourceOp[T any] struct {
	// contains filtered or unexported fields
}

A Source task that emits elements received from the passed chan.

func FromChan added in v0.5.0

func FromChan[T any](c <-chan T) *ChanSourceOp[T]

Create a ChanSourceOp from a slice.

func (*ChanSourceOp) AsSource added in v0.5.0

func (op *ChanSourceOp) AsSource(opts ...task.Option) Source[T]

Convert this operation as a Source.

func (*ChanSourceOp) AsTask added in v0.5.0

func (op *ChanSourceOp) AsTask(opts ...task.Option) task.Task[struct{}, T]

Convert this operation as a Task.

type PullOp

type PullOp[T any] struct {
	// contains filtered or unexported fields
}

A Source task that reads elements from a gRPC Communicator service and emits them.

This Source task can be used to pull data from another process that exposes data via a Communicator service.

func Pull

func Pull[T any](conn grpc.ClientConnInterface, m marshal.Spec[T], batchSize int) *PullOp[T]

Create a PullOp with a gRPC connection and a marshal spec.

The gRPC connection needs to be a connection with a Communicator service. And, the marshal spec defines how elements exposed via the Communicator service should be decoded. To successfully decode elements pulled from the Communicator service, the marshal spec needs to be the same one used to expose the data.

The batchSize argument defines the size of batches when pulling data from a Communicator service. The larger batch size reduces the number of communication over a network, but also, it let the process to wait for a large batch is fulfilled.

func (*PullOp) AsSource

func (op *PullOp) AsSource(opts ...task.Option) Source[T]

Convert this operation as a Source.

func (*PullOp) AsTask

func (op *PullOp) AsTask(opts ...task.Option) task.Task[struct{}, T]

Convert this operation as a Task.

type SliceSourceOp

type SliceSourceOp[T any] struct {
	// contains filtered or unexported fields
}

A Source task that emits elements in the passed slice.

func FromSlice

func FromSlice[T any](items []T) *SliceSourceOp[T]

Create a SliceSourceOp from a slice.

func (*SliceSourceOp) AsSource

func (op *SliceSourceOp) AsSource(opts ...task.Option) Source[T]

Convert this operation as a Source.

func (*SliceSourceOp) AsTask

func (op *SliceSourceOp) AsTask(opts ...task.Option) task.Task[struct{}, T]

Convert this operation as a Task.

type Source

type Source[T any] task.Task[struct{}, T]

A task that is used as a starting point of a data pipeline.

A Source usually generates multiple elements and feeds them to a downstream task, for example, from a provided slice or by reading a file.

The input channel needs to be closed without sending any elements.

func Concurrent

func Concurrent[T any](ss []Source[T], opts ...task.Option) Source[T]

Create a Source from multiple Sources. The passed Sources will run concurrently, and those outputs will be merged as outputs of the created Source.

func FromFn

func FromFn[T any](fn SourceFn[T], opts ...task.Option) Source[T]

Build a Source with a SourceFn.

type SourceFn

type SourceFn[T any] func(ctx context.Context, out chan<- T) error

A function that defines a Source's behavior. This function should send elements to the passed output channel. Please note that this function should not close the output channel because source.FromFn automatically closes the channel. The whole pipeline will be aborted when the returned error is not nil.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL