mfr

package
v0.3.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 13, 2021 License: MIT Imports: 4 Imported by: 2

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func SinkFilter

func SinkFilter(sink luigi.Sink, f FilterFunc) luigi.Sink

SinkFilter returns a new Sink whose values are selected according to the given FilterFunc.

func SinkMap

func SinkMap(sink luigi.Sink, f MapFunc) luigi.Sink

SinkMap returns a Sink which writes converted values to its argument according to a given MapFunc.

func SourceFilter

func SourceFilter(src luigi.Source, f FilterFunc) luigi.Source

SinkFilter returns a new Source whose values are filtered according to the given FilterFunc.

Example
isInt := func(
	_ context.Context,
	v interface{},
) (bool, error) {
	_, ok := v.(int)
	return ok, nil
}

numbers := luigi.SliceSource([]interface{}{0, "one", 2, "three", 4})
ints := SourceFilter(&numbers, isInt)

for {
	v, err := ints.Next(context.Background())
	if luigi.IsEOS(err) {
		break
	}
	fmt.Print(v.(int))
}
Output:

024

func SourceMap

func SourceMap(src luigi.Source, f MapFunc) luigi.Source

SinkMap returns a new Source which produces converted values according to a given MapFunc.

Example
toRune := func(
	_ context.Context,
	v interface{},
) (interface{}, error) {
	return rune(v.(int) + 97), nil
}

numbers := luigi.SliceSource([]interface{}{0, 1, 2, 3, 4})
runes := SourceMap(&numbers, toRune)

for {
	v, err := runes.Next(context.Background())
	if luigi.IsEOS(err) {
		break
	}
	fmt.Print(string(v.(rune)))
}
Output:

abcde

Types

type FilterFunc

type FilterFunc func(ctx context.Context, v interface{}) (bool, error)

FilterFunc is used as a predicate to select values in a stream.

type MapFunc

type MapFunc func(context.Context, interface{}) (interface{}, error)

MapFunc is used to convert or 'map' values in streams.

type ReduceFunc

type ReduceFunc func(ctx context.Context, acc, v interface{}) (interface{}, error)

ReduceFunc is a function that reduces a value v and an accumulator to the next accumulator value.

type ReduceSink

type ReduceSink interface {
	luigi.Sink
	luigi.Observable
}

ReduceSink is a type that reduces values into an accumulator that can be observed using the Observable methods.

Example
// adder sums all numbers in the stream
adder := func(
	_ context.Context,
	acc, v interface{},
) (interface{}, error) {
	if acc == nil {
		acc = 0
	}
	sum := v.(int) + acc.(int)
	return interface{}(sum), nil
}
reducer := NewReduceSink(adder)

numbers := luigi.SliceSource([]interface{}{0, 1, 2, 3, 4})
_ = luigi.Pump(context.Background(), reducer, &numbers)

total, _ := reducer.Value()
fmt.Println(total)
Output:

10

func NewReduceSink

func NewReduceSink(f ReduceFunc) ReduceSink

NewReduceSink returns a ReduceSink that uses the passed reduce function.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL