go-pipe: github.com/spatialcurrent/go-pipe/pkg/pipe

package pipe

import "github.com/spatialcurrent/go-pipe/pkg/pipe"

Package pipe includes interfaces and concrete types for piping objects from inputs to outputs. See the examples below and the tests for usage.

Package Files

BatchWriter.go BufferWriter.go Builder.go ChannelIterator.go ChannelWriter.go ErrInvalidKind.go FunctionIterator.go FunctionWriter.go Iterator.go IteratorToWriter.go LimitedIterator.go MultiIterator.go SetIterator.go SetWriter.go SliceIterator.go SliceWriter.go Writer.go pipe.go

Variables

var (
    FilterNotNil = func(object interface{}) (bool, error) {
        return object != nil, nil
    }
    FilterString = func(object interface{}) (bool, error) {
        _, ok := object.(string)
        return ok, nil
    }
    WriterStdout = NewFunctionWriter(func(object interface{}) error {
        _, err := fmt.Println(object)
        return err
    })
    WriterStderr = NewFunctionWriter(func(object interface{}) error {
        _, err := fmt.Fprintln(os.Stderr, object)
        return err
    })
)

func IteratorToWriter

func IteratorToWriter(it Iterator, w Writer, transform func(inputObject interface{}) (interface{}, error), errorHandler func(err error) error, filter func(inputObject interface{}) (bool, error), inputLimit int, outputLimit int, closeOutput bool) error

IteratorToWriter reads objects from the provided iterator and writes them to the provided writer. If a transform function is given, it is applied to each input object before writing. If an errorHandler is given, errors returned by the transform function are propagated through the errorHandler. If the errorHandler returns a non-nil error, processing halts. If a filter is given, the input object, after transformation if applicable, is filtered. If the filter returns true and no error, the object is passed to the writer. If inputLimit >= 0, the function reads up to the given number of objects from the input. If outputLimit >= 0, it writes up to the given number of objects to the writer.
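The flow described above can be approximated with a short, self-contained sketch. It is not the package's implementation: the interfaces are copied from this page, while pipeSketch, sliceSource, sliceSink, and runSketch are hypothetical names. The sketch assumes that an object whose transform error is cleared by the errorHandler is skipped, and it leaves out the closeOutput parameter.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// Iterator and Writer mirror the interfaces documented on this page.
type Iterator interface{ Next() (interface{}, error) }

type Writer interface {
	WriteObject(object interface{}) error
	Flush() error
}

// sliceSource and sliceSink are hypothetical stand-ins for a real input and output.
type sliceSource struct{ values []interface{} }

func (s *sliceSource) Next() (interface{}, error) {
	if len(s.values) == 0 {
		return nil, io.EOF
	}
	v := s.values[0]
	s.values = s.values[1:]
	return v, nil
}

type sliceSink struct{ values []interface{} }

func (s *sliceSink) WriteObject(object interface{}) error {
	s.values = append(s.values, object)
	return nil
}
func (s *sliceSink) Flush() error { return nil }

// pipeSketch approximates the documented flow; it is not the library's code.
func pipeSketch(it Iterator, w Writer, transform func(interface{}) (interface{}, error),
	errorHandler func(error) error, filter func(interface{}) (bool, error),
	inputLimit, outputLimit int) error {
	read, written := 0, 0
	for (inputLimit < 0 || read < inputLimit) && (outputLimit < 0 || written < outputLimit) {
		obj, err := it.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		read++
		if transform != nil {
			if obj, err = transform(obj); err != nil {
				if errorHandler != nil {
					err = errorHandler(err)
				}
				if err != nil {
					return err // the handler did not clear the error: halt
				}
				continue // assumption: a cleared error skips this object
			}
		}
		if filter != nil {
			keep, err := filter(obj)
			if err != nil {
				return err
			}
			if !keep {
				continue
			}
		}
		if err := w.WriteObject(obj); err != nil {
			return err
		}
		written++
	}
	return w.Flush()
}

// runSketch doubles integers, drops non-integers, keeps results above 2, and stops after two writes.
func runSketch() ([]interface{}, error) {
	in := &sliceSource{values: []interface{}{1, "x", 2, 3, 4}}
	out := &sliceSink{}
	double := func(o interface{}) (interface{}, error) {
		if n, ok := o.(int); ok {
			return n * 2, nil
		}
		return nil, errors.New("not an integer")
	}
	swallow := func(err error) error { return nil }
	above2 := func(o interface{}) (bool, error) { return o.(int) > 2, nil }
	err := pipeSketch(in, out, double, swallow, above2, -1, 2)
	return out.values, err
}

func main() {
	values, err := runSketch()
	fmt.Println(values, err) // prints [4 6] <nil>
}
```

In this run the value 1 is doubled to 2 and rejected by the filter, "x" fails the transform and is skipped because the handler returns nil, and the loop stops once two objects have been written.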

type BatchWriter

type BatchWriter interface {
    WriteObjects(objects interface{}) error
    Flush() error
}

BatchWriter contains the WriteObjects and Flush functions for writing a batch of objects.

type BufferWriter

type BufferWriter struct {
    // contains filtered or unexported fields
}

BufferWriter wraps a buffer around an underlying writer. Once the buffer reaches capacity, it writes its values to the underlying writer. The Flush method will propagate to the underlying writer.
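The buffering behaviour can be illustrated with a small stand-alone sketch. The types bufferSketch and recorder are hypothetical and written only for this illustration; the real BufferWriter may store and forward its values differently (for example through WriteObjects), so treat this as one plausible reading of the description above.

```go
package main

import "fmt"

// Writer mirrors the interface documented on this page.
type Writer interface {
	WriteObject(object interface{}) error
	Flush() error
}

// recorder is a hypothetical underlying writer that logs every call it receives.
type recorder struct{ calls []string }

func (r *recorder) WriteObject(object interface{}) error {
	r.calls = append(r.calls, fmt.Sprintf("write %v", object))
	return nil
}

func (r *recorder) Flush() error {
	r.calls = append(r.calls, "flush")
	return nil
}

// bufferSketch holds objects until capacity is reached, then forwards them.
type bufferSketch struct {
	w        Writer
	capacity int
	pending  []interface{}
}

// drain forwards every pending object to the underlying writer.
func (b *bufferSketch) drain() error {
	for _, v := range b.pending {
		if err := b.w.WriteObject(v); err != nil {
			return err
		}
	}
	b.pending = b.pending[:0]
	return nil
}

func (b *bufferSketch) WriteObject(object interface{}) error {
	b.pending = append(b.pending, object)
	if len(b.pending) >= b.capacity {
		return b.drain()
	}
	return nil
}

// Flush forwards anything still pending, then flushes the underlying writer.
func (b *bufferSketch) Flush() error {
	if err := b.drain(); err != nil {
		return err
	}
	return b.w.Flush()
}

func main() {
	rec := &recorder{}
	bw := &bufferSketch{w: rec, capacity: 2}

	for _, v := range []interface{}{"a", "b", "c"} {
		if err := bw.WriteObject(v); err != nil {
			panic(err)
		}
		fmt.Println(len(rec.calls)) // prints 0, then 2, then 2
	}

	if err := bw.Flush(); err != nil {
		panic(err)
	}
	fmt.Println(rec.calls) // prints [write a write b write c flush]
}
```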

func NewBufferWriter

func NewBufferWriter(writer Writer, capacity int) *BufferWriter

NewBufferWriter returns a new BufferWriter with the given capacity.

func (*BufferWriter) Flush

func (bw *BufferWriter) Flush() error

func (*BufferWriter) Reset

func (bw *BufferWriter) Reset()

Reset creates a new underlying slice from the type of the original slice.

func (*BufferWriter) WriteObject

func (bw *BufferWriter) WriteObject(object interface{}) error

func (*BufferWriter) WriteObjects

func (bw *BufferWriter) WriteObjects(objects interface{}) error

type Builder

type Builder struct {
    // contains filtered or unexported fields
}

Builder helps build a pipeline.

This example shows how to use a builder.

Code:

input := []interface{}{"a", "b", "c", 1, 2, 3, false, true}

it, err := NewSliceIterator(input)
if err != nil {
    panic(err)
}

w := NewSliceWriterWithValues([]string{})

b := NewBuilder().
    Input(it).
    Filter(func(inputObject interface{}) (bool, error) {
        // filter to only include strings
        _, ok := inputObject.(string)
        return ok, nil
    }).
    Output(w)

err = b.Run()
if err != nil {
    panic(err)
}
// the slice writer preserves the type of the initial values.
fmt.Println(strings.Join(w.Values().([]string), "\n"))

Output:

a
b
c

This example shows how to use a builder with a SetWriter to remove duplicate values.

Code:

// the initial values with duplicates
input := []interface{}{"a", "b", "c", "a", "b"}

it, err := NewSliceIterator(input)
if err != nil {
    panic(err)
}

w := NewSetWriter()

err = NewBuilder().Input(it).Output(w).Run()
if err != nil {
    panic(err)
}

values := w.SliceInterface() // get values written as slice of type []interface{}

// Sort the returned values
sort.Slice(values, func(i, j int) bool {
    return fmt.Sprint(values[i]) < fmt.Sprint(values[j])
})

for _, value := range values {
    fmt.Println(value)
}

Output:

a
b
c

func NewBuilder

func NewBuilder() *Builder

NewBuilder returns a new builder.

func (*Builder) CloseOutput

func (b *Builder) CloseOutput(closeOutput bool) *Builder

CloseOutput sets the closeOutput for the pipeline.

func (*Builder) Error

func (b *Builder) Error(e func(err error) error) *Builder

Error sets the error handler for the pipeline that catches errors from the transform function. If the error handler returns nil, then the pipeline continues as normal. If the error handler returns the original error (or a new one), then the pipeline bubbles up the error and exits.

func (*Builder) Filter

func (b *Builder) Filter(f func(object interface{}) (bool, error)) *Builder

Filter sets the filter for the pipeline.

func (*Builder) Input

func (b *Builder) Input(in Iterator) *Builder

Input sets the input for the pipeline.

func (*Builder) InputF

func (b *Builder) InputF(fn func() (interface{}, error)) *Builder

InputF sets the input for the pipeline to a function by wrapping the provided function with pipe.FunctionIterator.

func (*Builder) InputLimit

func (b *Builder) InputLimit(inputLimit int) *Builder

InputLimit sets the inputLimit for the pipeline.

func (*Builder) Output

func (b *Builder) Output(w Writer) *Builder

Output sets the output for the pipeline.

func (*Builder) OutputF

func (b *Builder) OutputF(fn func(object interface{}) error) *Builder

OutputF sets the output for the pipeline to a function by wrapping the provided function with pipe.FunctionWriter.

func (*Builder) OutputLimit

func (b *Builder) OutputLimit(outputLimit int) *Builder

OutputLimit sets the outputLimit for the pipeline.

func (*Builder) Run

func (b *Builder) Run() error

Run runs the pipeline.

func (*Builder) Transform

func (b *Builder) Transform(t func(inputObject interface{}) (interface{}, error)) *Builder

Transform sets the transform for the pipeline.

type ChannelIterator

type ChannelIterator struct {
    // contains filtered or unexported fields
}

ChannelIterator iterates over a channel of values.

This example shows how to use a channel iterator to read from a channel.

Code:

c := make(chan interface{}, 1000)

c <- "a"
c <- "b"
c <- "c"
close(c)

it, err := NewChannelIterator(c)
if err != nil {
    panic(err)
}
for {
    obj, err := it.Next()
    if err != nil {
        if err == io.EOF {
            break
        }
        panic(err)
    }
    fmt.Println(obj)
}

Output:

a
b
c

func NewChannelIterator

func NewChannelIterator(values interface{}) (*ChannelIterator, error)

NewChannelIterator returns a new ChannelIterator.

func (*ChannelIterator) Next

func (ci *ChannelIterator) Next() (interface{}, error)

type ChannelWriter

type ChannelWriter struct {
    // contains filtered or unexported fields
}

ChannelWriter writes each object to the underlying channel.

This example shows how to use a channel writer to write to a channel.

Code:

c := make(chan interface{}, 1000)

w, err := NewChannelWriter(c)
if err != nil {
    panic(err)
}

err = w.WriteObject("a")
if err != nil {
    panic(err)
}

err = w.WriteObject("b")
if err != nil {
    panic(err)
}

err = w.WriteObject("c")
if err != nil {
    panic(err)
}

close(c)

for v := range c {
    fmt.Println(v)
}

Output:

a
b
c

func NewChannelWriter

func NewChannelWriter(channel interface{}) (*ChannelWriter, error)

func (*ChannelWriter) Close

func (cw *ChannelWriter) Close() error

func (*ChannelWriter) Flush

func (cw *ChannelWriter) Flush() error

func (*ChannelWriter) WriteObject

func (cw *ChannelWriter) WriteObject(object interface{}) error

type ErrInvalidKind

type ErrInvalidKind struct {
    Value    reflect.Type
    Expected []reflect.Kind
}

func (ErrInvalidKind) Error

func (e ErrInvalidKind) Error() string

Error returns the error formatted as a string.

type FunctionIterator

type FunctionIterator struct {
    // contains filtered or unexported fields
}

FunctionIterator provides a simple wrapper over a callback function to allow it to iterate.
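As an illustration of the idea, the sketch below wraps a callback in a type whose Next method simply calls it. The names funcIterator, countdown, and collect are hypothetical and defined here only for the example; the sketch assumes the callback signals the end of input by returning io.EOF, as the Iterator interface on this page describes.

```go
package main

import (
	"fmt"
	"io"
)

// Iterator mirrors the interface documented on this page.
type Iterator interface {
	Next() (interface{}, error)
}

// funcIterator is a hypothetical adapter: Next simply calls the wrapped callback.
type funcIterator struct {
	callback func() (interface{}, error)
}

func (fi *funcIterator) Next() (interface{}, error) {
	return fi.callback()
}

// countdown returns a callback that yields n, n-1, ..., 1 and then io.EOF.
func countdown(n int) func() (interface{}, error) {
	return func() (interface{}, error) {
		if n <= 0 {
			return nil, io.EOF
		}
		v := n
		n--
		return v, nil
	}
}

// collect reads from the iterator until io.EOF or another error.
func collect(it Iterator) ([]interface{}, error) {
	var out []interface{}
	for {
		obj, err := it.Next()
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, obj)
	}
}

func main() {
	values, err := collect(&funcIterator{callback: countdown(3)})
	if err != nil {
		panic(err)
	}
	fmt.Println(values) // prints [3 2 1]
}
```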

func NewFunctionIterator

func NewFunctionIterator(callback func() (interface{}, error)) *FunctionIterator

func (*FunctionIterator) Next

func (fi *FunctionIterator) Next() (interface{}, error)

type FunctionWriter

type FunctionWriter struct {
    // contains filtered or unexported fields
}

FunctionWriter passes each object to the callback function.

This example shows how to use a function writer to call a callback with each object.

Code:

w := NewFunctionWriter(func(object interface{}) error {
    fmt.Println(object) // print to stdout
    return nil
})

err := w.WriteObject("a")
if err != nil {
    panic(err)
}

err = w.WriteObject("b")
if err != nil {
    panic(err)
}

err = w.WriteObject("c")
if err != nil {
    panic(err)
}

Output:

a
b
c

func NewFunctionWriter

func NewFunctionWriter(callback func(object interface{}) error) *FunctionWriter

func (*FunctionWriter) Flush

func (sw *FunctionWriter) Flush() error

func (*FunctionWriter) WriteObject

func (sw *FunctionWriter) WriteObject(object interface{}) error

type Iterator

type Iterator interface {
    Next() (interface{}, error)
}

Iterator contains the Next function that returns an object until it returns an io.EOF error.
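Any type with a matching Next method satisfies this interface. As a sketch, the hypothetical lineIterator below (not part of the package) yields one line of text per call using the standard library's bufio.Scanner, and returns io.EOF once the input is exhausted.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// Iterator mirrors the interface documented on this page.
type Iterator interface {
	Next() (interface{}, error)
}

// lineIterator is a hypothetical Iterator that yields one line of text per call.
type lineIterator struct {
	scanner *bufio.Scanner
}

func newLineIterator(r io.Reader) *lineIterator {
	return &lineIterator{scanner: bufio.NewScanner(r)}
}

// Next returns the next line, a scanner error, or io.EOF when no lines remain.
func (li *lineIterator) Next() (interface{}, error) {
	if li.scanner.Scan() {
		return li.scanner.Text(), nil
	}
	if err := li.scanner.Err(); err != nil {
		return nil, err
	}
	return nil, io.EOF
}

// lines drains a lineIterator over the given text.
func lines(text string) ([]interface{}, error) {
	var it Iterator = newLineIterator(strings.NewReader(text))
	var out []interface{}
	for {
		obj, err := it.Next()
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, obj)
	}
}

func main() {
	values, err := lines("a\nb\nc\n")
	if err != nil {
		panic(err)
	}
	fmt.Println(values) // prints [a b c]
}
```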

type LimitedIterator

type LimitedIterator struct {
    // contains filtered or unexported fields
}

LimitedIterator returns an iterator that reads up to a given number of objects from the underlying iterator. Once the maximum number of objects has been read or the underlying iterator has returned io.EOF, Next will return io.EOF. If the underlying iterator returns a non-nil, non-EOF error, Next will return that error. This approach is similar to io.LimitedReader.

- https://godoc.org/io#LimitedReader

Code:

si, err := NewSliceIterator([]interface{}{"a", "b", "c"})
if err != nil {
    panic(err)
}

li := NewLimitedIterator(si, 2)

obj, err := li.Next()
if err != nil {
    fmt.Println(err)
}
fmt.Println(obj)

obj, err = li.Next()
if err != nil {
    fmt.Println(err)
}
fmt.Println(obj)

obj, err = li.Next()
if err != nil {
    fmt.Println(err)
}
fmt.Println(obj)

Output:

a
b
EOF
<nil>
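For readers curious how such a wrapper can work, here is a minimal sketch in the spirit of io.LimitedReader. It is not the package's implementation; limitedSketch and sliceSource are hypothetical types defined only for this illustration.

```go
package main

import (
	"fmt"
	"io"
)

// Iterator mirrors the interface documented on this page.
type Iterator interface {
	Next() (interface{}, error)
}

// sliceSource is a hypothetical stand-in for an underlying iterator.
type sliceSource struct{ values []interface{} }

func (s *sliceSource) Next() (interface{}, error) {
	if len(s.values) == 0 {
		return nil, io.EOF
	}
	v := s.values[0]
	s.values = s.values[1:]
	return v, nil
}

// limitedSketch stops after limit objects have been returned.
type limitedSketch struct {
	it    Iterator
	limit int
	count int
}

func (l *limitedSketch) Next() (interface{}, error) {
	if l.count >= l.limit {
		return nil, io.EOF // limit reached
	}
	obj, err := l.it.Next()
	if err != nil {
		return nil, err // io.EOF or any other error from the underlying iterator
	}
	l.count++
	return obj, nil
}

func main() {
	li := &limitedSketch{
		it:    &sliceSource{values: []interface{}{"a", "b", "c"}},
		limit: 2,
	}
	for i := 0; i < 3; i++ {
		obj, err := li.Next()
		fmt.Println(obj, err)
	}
	// prints:
	// a <nil>
	// b <nil>
	// <nil> EOF
}
```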

func NewLimitedIterator

func NewLimitedIterator(iterator Iterator, limit int) *LimitedIterator

NewLimitedIterator returns a new LimitedIterator that reads up to limit objects from the given iterator.

func (*LimitedIterator) Count

func (li *LimitedIterator) Count() int

Count returns the current count of objects returned.

func (*LimitedIterator) Limit

func (li *LimitedIterator) Limit() int

Limit returns the set limit.

func (*LimitedIterator) Next

func (li *LimitedIterator) Next() (interface{}, error)

type MultiIterator

type MultiIterator struct {
    // contains filtered or unexported fields
}

MultiIterator returns an Iterator that's the logical concatenation of the provided input iterators. They're read sequentially. Once all iterators have returned EOF, Next will return EOF. If any of the iterators return a non-nil, non-EOF error, Next will return that error. This approach is similar to io.MultiReader.

- https://godoc.org/io#MultiReader

This example shows how to use a multi iterator to iterate through multiple subiterators.

Code:

sia, err := NewSliceIterator([]interface{}{"a", "b", "c"})
if err != nil {
    panic(err)
}

sib, err := NewSliceIterator([]interface{}{1, 2, 3})
if err != nil {
    panic(err)
}

mi := NewMultiIterator(sia, sib)

obj, err := mi.Next()
if err != nil {
    panic(err)
}
fmt.Println(obj)

obj, err = mi.Next()
if err != nil {
    panic(err)
}
fmt.Println(obj)

obj, err = mi.Next()
if err != nil {
    panic(err)
}
fmt.Println(obj)

obj, err = mi.Next()
if err != nil {
    panic(err)
}
fmt.Println(obj)

obj, err = mi.Next()
if err != nil {
    panic(err)
}
fmt.Println(obj)

obj, err = mi.Next()
if err != nil {
    panic(err)
}
fmt.Println(obj)

Output:

a
b
c
1
2
3
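The concatenation logic can be sketched in a few lines, following the same pattern as io.MultiReader. This is an illustration under stated assumptions rather than the package's code; multiSketch, sliceSource, and concat are hypothetical names.

```go
package main

import (
	"fmt"
	"io"
)

// Iterator mirrors the interface documented on this page.
type Iterator interface {
	Next() (interface{}, error)
}

// sliceSource is a hypothetical stand-in for an input iterator.
type sliceSource struct{ values []interface{} }

func (s *sliceSource) Next() (interface{}, error) {
	if len(s.values) == 0 {
		return nil, io.EOF
	}
	v := s.values[0]
	s.values = s.values[1:]
	return v, nil
}

// multiSketch reads each iterator to exhaustion before moving to the next.
type multiSketch struct{ iterators []Iterator }

func (m *multiSketch) Next() (interface{}, error) {
	for len(m.iterators) > 0 {
		obj, err := m.iterators[0].Next()
		if err == io.EOF {
			m.iterators = m.iterators[1:] // current iterator is done; try the next one
			continue
		}
		return obj, err // a value, or a non-EOF error
	}
	return nil, io.EOF // every iterator is exhausted
}

// concat drains a multiSketch built from the given groups of values.
func concat(groups ...[]interface{}) ([]interface{}, error) {
	m := &multiSketch{}
	for _, g := range groups {
		m.iterators = append(m.iterators, &sliceSource{values: g})
	}
	var out []interface{}
	for {
		obj, err := m.Next()
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, obj)
	}
}

func main() {
	values, err := concat([]interface{}{"a", "b", "c"}, []interface{}{1, 2, 3})
	if err != nil {
		panic(err)
	}
	fmt.Println(values) // prints [a b c 1 2 3]
}
```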

func NewMultiIterator

func NewMultiIterator(iterators ...Iterator) *MultiIterator

NewMultiIterator returns a new MultiIterator that serially iterates through the given iterators.

func (*MultiIterator) Next

func (mi *MultiIterator) Next() (interface{}, error)

func (*MultiIterator) Push

func (mi *MultiIterator) Push(it ...Iterator)

type SetIterator

type SetIterator struct {
    // contains filtered or unexported fields
}

SetIterator iterates over the keys in a map.

This example shows how to use a set iterator to iterate through the keys of a map.

Code:


input := map[string]struct{}{
    "a": struct{}{},
    "b": struct{}{},
    "c": struct{}{},
}

it, err := NewSetIterator(input)
if err != nil {
    panic(err)
}

for {
    obj, err := it.Next()
    if err != nil {
        if err == io.EOF {
            break
        }
        panic(err)
    }
    fmt.Println(obj)
}

func NewSetIterator

func NewSetIterator(values interface{}) (*SetIterator, error)

NewSetIterator returns a new SetIterator.

func (*SetIterator) Next

func (mi *SetIterator) Next() (interface{}, error)

type SetWriter

type SetWriter struct {
    // contains filtered or unexported fields
}

SetWriter contains the WriteObject and Flush functions for writing objects as keys to a set.

This example shows how to use a SetWriter to write values to a set as keys to a map.

Code:

w := NewSetWriter()

err := w.WriteObject("a")
if err != nil {
    panic(err)
}

err = w.WriteObject("b")
if err != nil {
    panic(err)
}

err = w.WriteObject("c")
if err != nil {
    panic(err)
}

err = w.WriteObject("a")
if err != nil {
    panic(err)
}

values := w.SliceInterface() // get values written as slice of type []interface{}

// Sort the returned values
sort.Slice(values, func(i, j int) bool {
    return fmt.Sprint(values[i]) < fmt.Sprint(values[j])
})

// Print the values
fmt.Println(values)

Output:

[a b c]

func NewSetWriter

func NewSetWriter() *SetWriter

func NewSetWriterWithValues

func NewSetWriterWithValues(initialValues interface{}) *SetWriter

func (*SetWriter) Flush

func (sw *SetWriter) Flush() error

Flush has no effect for SetWriter as all objects are immediately written to the underlying set.

func (*SetWriter) Iterator

func (sw *SetWriter) Iterator() *SetIterator

func (*SetWriter) Reset

func (sw *SetWriter) Reset()

Reset creates a new underlying set from the type of the original set.

func (*SetWriter) SliceInterface

func (sw *SetWriter) SliceInterface() []interface{}

func (*SetWriter) SliceType

func (sw *SetWriter) SliceType() interface{}

func (*SetWriter) Values

func (sw *SetWriter) Values() interface{}

func (*SetWriter) WriteObject

func (sw *SetWriter) WriteObject(object interface{}) error

func (*SetWriter) WriteObjects

func (sw *SetWriter) WriteObjects(objects interface{}) error

type SliceIterator

type SliceIterator struct {
    // contains filtered or unexported fields
}

SliceIterator iterates over an array or slice of values.

This example shows how to use a slice iterator to iterate through a slice of values.

Code:

it, err := NewSliceIterator([]interface{}{"a", "b", "c"})
if err != nil {
    panic(err)
}

obj, err := it.Next()
if err != nil {
    panic(err)
}
fmt.Println(obj)

obj, err = it.Next()
if err != nil {
    panic(err)
}
fmt.Println(obj)

obj, err = it.Next()
if err != nil {
    panic(err)
}
fmt.Println(obj)

Output:

a
b
c

func NewSliceIterator

func NewSliceIterator(values interface{}) (*SliceIterator, error)

NewSliceIterator returns a new SliceIterator.

func (*SliceIterator) Next

func (si *SliceIterator) Next() (interface{}, error)

type SliceWriter

type SliceWriter struct {
    // contains filtered or unexported fields
}

SliceWriter contains the WriteObject and Flush functions for writing objects to an underlying slice.

This example shows how to use a slice writer to write values to a slice.

Code:

w := NewSliceWriter()

err := w.WriteObject("a")
if err != nil {
    panic(err)
}

err = w.WriteObject("b")
if err != nil {
    panic(err)
}

err = w.WriteObject("c")
if err != nil {
    panic(err)
}

values := w.Values() // get values written to slice

fmt.Println(values)

Output:

[a b c]

func NewSliceWriter

func NewSliceWriter() *SliceWriter

func NewSliceWriterWithCapacity

func NewSliceWriterWithCapacity(initialCapacity int) *SliceWriter

func NewSliceWriterWithValues

func NewSliceWriterWithValues(initialValues interface{}) *SliceWriter

func (*SliceWriter) Flush

func (sw *SliceWriter) Flush() error

func (*SliceWriter) Iterator

func (sw *SliceWriter) Iterator() *SliceIterator

func (*SliceWriter) Reset

func (sw *SliceWriter) Reset()

Reset creates a new underlying slice from the type of the original slice.

func (*SliceWriter) Values

func (sw *SliceWriter) Values() interface{}

func (*SliceWriter) WriteObject

func (sw *SliceWriter) WriteObject(object interface{}) error

func (*SliceWriter) WriteObjects

func (sw *SliceWriter) WriteObjects(objects interface{}) error

type Writer

type Writer interface {
    WriteObject(object interface{}) error
    Flush() error
}

Writer contains the WriteObject and Flush functions for writing objects.
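A custom output only needs these two methods. The sketch below shows a hypothetical jsonLinesWriter (not part of the package) that encodes each object as one line of JSON into a buffered writer and pushes the buffered bytes out on Flush. It uses only the standard library's bufio and encoding/json packages.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// Writer mirrors the interface documented on this page.
type Writer interface {
	WriteObject(object interface{}) error
	Flush() error
}

// jsonLinesWriter is a hypothetical Writer that emits one JSON document per line.
type jsonLinesWriter struct {
	buf *bufio.Writer
	enc *json.Encoder
}

func newJSONLinesWriter(w io.Writer) *jsonLinesWriter {
	buf := bufio.NewWriter(w)
	return &jsonLinesWriter{buf: buf, enc: json.NewEncoder(buf)}
}

// WriteObject encodes the object into the buffer; Encode appends a newline.
func (j *jsonLinesWriter) WriteObject(object interface{}) error {
	return j.enc.Encode(object)
}

// Flush pushes any buffered bytes to the destination.
func (j *jsonLinesWriter) Flush() error {
	return j.buf.Flush()
}

// encodeAll writes each object and flushes once at the end.
func encodeAll(objects ...interface{}) (string, error) {
	var sb strings.Builder
	var w Writer = newJSONLinesWriter(&sb)
	for _, o := range objects {
		if err := w.WriteObject(o); err != nil {
			return "", err
		}
	}
	if err := w.Flush(); err != nil {
		return "", err
	}
	return sb.String(), nil
}

func main() {
	out, err := encodeAll("a", 1, map[string]int{"n": 2})
	if err != nil {
		panic(err)
	}
	fmt.Print(out)
	// prints:
	// "a"
	// 1
	// {"n":2}
}
```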

Package pipe imports 6 packages and is imported by 5 packages. Updated 2019-11-21.