pipe

package
v0.0.0-...-8c3e90f Latest
Warning

This package is not in the latest version of its module.

Published: Nov 21, 2021 License: MIT Imports: 6 Imported by: 7

Documentation

Overview

Package pipe includes interfaces and concrete types for piping objects from inputs to outputs. See the examples below and the tests for usage.

Constants

This section is empty.

Variables

var (
	FilterNotNil = func(object interface{}) (bool, error) {
		return object != nil, nil
	}
	FilterString = func(object interface{}) (bool, error) {
		_, ok := object.(string)
		return ok, nil
	}
	WriterStdout = NewFunctionWriter(func(object interface{}) error {
		_, err := fmt.Println(object)
		return err
	})
	WriterStderr = NewFunctionWriter(func(object interface{}) error {
		_, err := fmt.Fprintln(os.Stderr, object)
		return err
	})
)

Functions

func IteratorToWriter

func IteratorToWriter(it Iterator, w Writer, transform func(inputObject interface{}) (interface{}, error), errorHandler func(err error) error, filter func(inputObject interface{}) (bool, error), inputLimit int, outputLimit int, closeOutput bool) error

IteratorToWriter reads objects from the provided iterator and writes them to the provided writer. If a transform function is given, it is applied to each input object before writing. If an errorHandler is given, errors returned by the transform function are propagated through the errorHandler; if the errorHandler returns a non-nil error, processing halts. If a filter is given, the input object, after transformation if applicable, is filtered; the object is passed to the writer only if the filter returns true and no error. If inputLimit >= 0, reads up to the given number of objects from the input. If outputLimit >= 0, writes up to the given number of objects to the writer.
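The processing order described above can be sketched as a plain loop. This is a simplified, self-contained illustration and not the package's implementation: the Iterator and Writer interfaces mirror the ones documented on this page, while sliceIterator, sliceWriter, and pipeLoop are hypothetical names. The behavior when a transform fails without an errorHandler, when a filter returns an error, and the exact limit handling are assumptions; closeOutput is omitted.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// Iterator and Writer mirror the interfaces documented on this page.
type Iterator interface {
	Next() (interface{}, error)
}

type Writer interface {
	WriteObject(object interface{}) error
	Flush() error
}

// sliceIterator and sliceWriter are hypothetical helpers for this sketch.
type sliceIterator struct {
	values []interface{}
	pos    int
}

func (s *sliceIterator) Next() (interface{}, error) {
	if s.pos >= len(s.values) {
		return nil, io.EOF
	}
	v := s.values[s.pos]
	s.pos++
	return v, nil
}

type sliceWriter struct{ values []interface{} }

func (s *sliceWriter) WriteObject(object interface{}) error {
	s.values = append(s.values, object)
	return nil
}

func (s *sliceWriter) Flush() error { return nil }

// pipeLoop is a hypothetical, simplified version of the documented flow:
// read, transform, route transform errors through the handler, filter, write.
func pipeLoop(it Iterator, w Writer,
	transform func(interface{}) (interface{}, error),
	errorHandler func(error) error,
	filter func(interface{}) (bool, error),
	inputLimit, outputLimit int) error {
	read, written := 0, 0
	for (inputLimit < 0 || read < inputLimit) && (outputLimit < 0 || written < outputLimit) {
		obj, err := it.Next()
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			return err
		}
		read++
		if transform != nil {
			obj, err = transform(obj)
			if err != nil {
				if errorHandler == nil {
					return err // assumption: no handler means the error halts the loop
				}
				if herr := errorHandler(err); herr != nil {
					return herr // handler returned a non-nil error: halt
				}
				continue // assumption: a handled error skips this object
			}
		}
		if filter != nil {
			ok, err := filter(obj)
			if err != nil {
				return err // assumption: filter errors halt the loop
			}
			if !ok {
				continue
			}
		}
		if err := w.WriteObject(obj); err != nil {
			return err
		}
		written++
	}
	return w.Flush()
}

func main() {
	it := &sliceIterator{values: []interface{}{"a", 1, "b", 2, "c"}}
	w := &sliceWriter{}
	isString := func(o interface{}) (bool, error) { _, ok := o.(string); return ok, nil }
	if err := pipeLoop(it, w, nil, nil, isString, -1, 2); err != nil {
		panic(err)
	}
	fmt.Println(w.values) // [a b]
}
```

With an output limit of 2, the loop stops after "a" and "b" are written, so "c" is never read.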

func ReadAll

func ReadAll(it Iterator) ([]interface{}, error)

Types

type BatchWriter

type BatchWriter interface {
	WriteObjects(objects interface{}) error
	Flush() error
}

BatchWriter contains the WriteObjects and Flush functions for writing a batch of objects.

type BufferWriter

type BufferWriter struct {
	// contains filtered or unexported fields
}

BufferWriter wraps a buffer around an underlying writer. Once the buffer reaches capacity, it writes its values to the underlying writer. The Flush method will propagate to the underlying writer.
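The buffering behavior described above can be illustrated with a small stand-in. This is a sketch under assumptions, not the package's code: Writer mirrors the interface documented on this page, while countingWriter and bufferedWriter are hypothetical types written only for this example.

```go
package main

import "fmt"

// Writer mirrors the interface documented on this page.
type Writer interface {
	WriteObject(object interface{}) error
	Flush() error
}

// countingWriter is a hypothetical sink that records what it receives.
type countingWriter struct {
	values  []interface{}
	flushes int
}

func (c *countingWriter) WriteObject(object interface{}) error {
	c.values = append(c.values, object)
	return nil
}

func (c *countingWriter) Flush() error {
	c.flushes++
	return nil
}

// bufferedWriter is a hypothetical stand-in for the buffering idea:
// hold objects until capacity is reached, then drain to the underlying writer.
type bufferedWriter struct {
	under    Writer
	capacity int
	buf      []interface{}
}

// drain writes every buffered object to the underlying writer and empties the buffer.
func (b *bufferedWriter) drain() error {
	for _, obj := range b.buf {
		if err := b.under.WriteObject(obj); err != nil {
			return err
		}
	}
	b.buf = b.buf[:0]
	return nil
}

func (b *bufferedWriter) WriteObject(object interface{}) error {
	b.buf = append(b.buf, object)
	if len(b.buf) >= b.capacity {
		return b.drain()
	}
	return nil
}

// Flush drains any buffered objects and then flushes the underlying writer.
func (b *bufferedWriter) Flush() error {
	if err := b.drain(); err != nil {
		return err
	}
	return b.under.Flush()
}

func main() {
	sink := &countingWriter{}
	bw := &bufferedWriter{under: sink, capacity: 2}
	for _, v := range []interface{}{"a", "b", "c"} {
		if err := bw.WriteObject(v); err != nil {
			panic(err)
		}
	}
	fmt.Println(len(sink.values)) // 2: "c" is still buffered
	if err := bw.Flush(); err != nil {
		panic(err)
	}
	fmt.Println(len(sink.values), sink.flushes) // 3 1
}
```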

func NewBufferWriter

func NewBufferWriter(writer Writer, capacity int) *BufferWriter

NewBufferWriter returns a new BufferWriter with the given capacity.

func (*BufferWriter) Close

func (bw *BufferWriter) Close() error

func (*BufferWriter) Flush

func (bw *BufferWriter) Flush() error

func (*BufferWriter) Reset

func (bw *BufferWriter) Reset()

Reset creates a new underlying slice from the type of the original slice.

func (*BufferWriter) WriteObject

func (bw *BufferWriter) WriteObject(object interface{}) error

func (*BufferWriter) WriteObjects

func (bw *BufferWriter) WriteObjects(objects interface{}) error

type Builder

type Builder struct {
	// contains filtered or unexported fields
}

Builder helps build a pipeline.

Example

This example shows how to use a builder.

input := []interface{}{"a", "b", "c", 1, 2, 3, false, true}

it, err := NewSliceIterator(input)
if err != nil {
	panic(err)
}

w := NewSliceWriterWithValues([]string{})

b := NewBuilder().
	Input(it).
	Filter(func(inputObject interface{}) (bool, error) {
		// filter to only include strings
		_, ok := inputObject.(string)
		return ok, nil
	}).
	Output(w)

err = b.Run()
if err != nil {
	panic(err)
}
// the slice writer preserves the type of the initial values.
fmt.Println(strings.Join(w.Values().([]string), "\n"))
Output:

a
b
c
Example (SliceToSet)

This example shows how to use a builder to collect the unique values of a slice into a set.

// the initial values with duplicates
input := []interface{}{"a", "b", "c", "a", "b"}

it, err := NewSliceIterator(input)
if err != nil {
	panic(err)
}

w := NewSetWriter()

err = NewBuilder().Input(it).Output(w).Run()
if err != nil {
	panic(err)
}

values := w.SliceInterface() // get values written as slice of type []interface{}

// Sort the returned values
sort.Slice(values, func(i, j int) bool {
	return fmt.Sprint(values[i]) < fmt.Sprint(values[j])
})

for _, value := range values {
	fmt.Println(value)
}
Output:

a
b
c

func NewBuilder

func NewBuilder() *Builder

NewBuilder returns a new builder.

func (*Builder) CloseOutput

func (b *Builder) CloseOutput(closeOutput bool) *Builder

CloseOutput sets the closeOutput for the pipeline.

func (*Builder) Error

func (b *Builder) Error(e func(err error) error) *Builder

Error sets the error handler for the pipeline that catches errors from the transform function. If the error handler returns nil, then the pipeline continues as normal. If the error handler returns the original error (or a new one), then the pipeline bubbles up the error and exits.
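A handler of this shape might tolerate some transform failures and stop on others. The sketch below only exercises functions with the signatures that Transform and Error accept; it does not call the package, and errSkip, parseInt, and tolerant are hypothetical names chosen for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// errSkip is a hypothetical sentinel for recoverable transform failures.
var errSkip = errors.New("skip object")

// parseInt is a hypothetical transform with the signature Transform expects.
func parseInt(inputObject interface{}) (interface{}, error) {
	s, ok := inputObject.(string)
	if !ok {
		return nil, fmt.Errorf("unexpected type %T", inputObject)
	}
	n, err := strconv.Atoi(s)
	if err != nil {
		// Wrap the sentinel so the handler can recognize it with errors.Is.
		return nil, fmt.Errorf("%w: %v", errSkip, err)
	}
	return n, nil
}

// tolerant is a hypothetical handler with the signature Error expects.
// Returning nil lets processing continue; returning an error stops it.
func tolerant(err error) error {
	if errors.Is(err, errSkip) {
		return nil
	}
	return err
}

func main() {
	for _, in := range []interface{}{"1", "x", 2.5} {
		out, err := parseInt(in)
		if err != nil {
			fmt.Println(in, "-> handler returned:", tolerant(err))
			continue
		}
		fmt.Println(in, "->", out)
	}
}
```

Here the unparsable string "x" is swallowed by the handler, while the non-string input 2.5 yields an error that the handler passes through.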

func (*Builder) Filter

func (b *Builder) Filter(f func(object interface{}) (bool, error)) *Builder

Filter sets the filter for the pipeline.

func (*Builder) Input

func (b *Builder) Input(in Iterator) *Builder

Input sets the input for the pipeline.

func (*Builder) InputF

func (b *Builder) InputF(fn func() (interface{}, error)) *Builder

InputF sets the input for the pipeline to a function by wrapping the provided function with pipe.FunctionIterator.

func (*Builder) InputLimit

func (b *Builder) InputLimit(inputLimit int) *Builder

InputLimit sets the inputLimit for the pipeline.

func (*Builder) Output

func (b *Builder) Output(w Writer) *Builder

Output sets the output for the pipeline.

func (*Builder) OutputF

func (b *Builder) OutputF(fn func(object interface{}) error) *Builder

OutputF sets the output for the pipeline to a function by wrapping the provided function with pipe.FunctionWriter.

func (*Builder) OutputLimit

func (b *Builder) OutputLimit(outputLimit int) *Builder

OutputLimit sets the outputLimit for the pipeline.

func (*Builder) Run

func (b *Builder) Run() error

Run runs the pipeline.

func (*Builder) Transform

func (b *Builder) Transform(t func(inputObject interface{}) (interface{}, error)) *Builder

Transform sets the transform for the pipeline.

type ChannelIterator

type ChannelIterator struct {
	// contains filtered or unexported fields
}

ChannelIterator iterates over a channel of values.

Example

This example shows how to use a channel iterator to read from a channel.

c := make(chan interface{}, 1000)

c <- "a"
c <- "b"
c <- "c"
close(c)

it, err := NewChannelIterator(c)
if err != nil {
	panic(err)
}
for {
	obj, err := it.Next()
	if err != nil {
		if err == io.EOF {
			break
		}
		panic(err)
	}
	fmt.Println(obj)
}
Output:

a
b
c

func NewChannelIterator

func NewChannelIterator(values interface{}) (*ChannelIterator, error)

NewChannelIterator returns a new ChannelIterator.

func (*ChannelIterator) Next

func (ci *ChannelIterator) Next() (interface{}, error)

type ChannelWriter

type ChannelWriter struct {
	// contains filtered or unexported fields
}

ChannelWriter writes each object to the underlying channel.

Example

This example shows how to use a channel writer to write to a channel.

c := make(chan interface{}, 1000)

w, err := NewChannelWriter(c)
if err != nil {
	panic(err)
}

err = w.WriteObject("a")
if err != nil {
	panic(err)
}

err = w.WriteObject("b")
if err != nil {
	panic(err)
}

err = w.WriteObject("c")
if err != nil {
	panic(err)
}

close(c)

for v := range c {
	fmt.Println(v)
}
Output:

a
b
c

func NewChannelWriter

func NewChannelWriter(channel interface{}) (*ChannelWriter, error)

func (*ChannelWriter) Close

func (cw *ChannelWriter) Close() error

func (*ChannelWriter) Flush

func (cw *ChannelWriter) Flush() error

func (*ChannelWriter) WriteObject

func (cw *ChannelWriter) WriteObject(object interface{}) error

type ErrInvalidKind

type ErrInvalidKind struct {
	Value    reflect.Type
	Expected []reflect.Kind
}

func (ErrInvalidKind) Error

func (e ErrInvalidKind) Error() string

Error returns the error formatted as a string.

type FunctionIterator

type FunctionIterator struct {
	// contains filtered or unexported fields
}

FunctionIterator provides a simple wrapper over a callback function to allow it to iterate.

func NewFunctionIterator

func NewFunctionIterator(callback func() (interface{}, error)) *FunctionIterator

func (*FunctionIterator) Next

func (fi *FunctionIterator) Next() (interface{}, error)

type FunctionWriter

type FunctionWriter struct {
	// contains filtered or unexported fields
}

FunctionWriter passes each object to the callback function.

Example

This example shows how to use a function writer to call a callback with each object.

w := NewFunctionWriter(func(object interface{}) error {
	fmt.Println(object) // print to stdout
	return nil
})

err := w.WriteObject("a")
if err != nil {
	panic(err)
}

err = w.WriteObject("b")
if err != nil {
	panic(err)
}

err = w.WriteObject("c")
if err != nil {
	panic(err)
}
Output:

a
b
c

func NewFunctionWriter

func NewFunctionWriter(callback func(object interface{}) error) *FunctionWriter

func (*FunctionWriter) Flush

func (sw *FunctionWriter) Flush() error

func (*FunctionWriter) WriteObject

func (sw *FunctionWriter) WriteObject(object interface{}) error

type Iterator

type Iterator interface {
	Next() (interface{}, error)
}

Iterator contains the Next function that returns an object until it returns an io.EOF error.
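Any type with a matching Next method satisfies this interface. The following sketch shows a custom iterator and a loop that drains it until io.EOF; countdown and drain are hypothetical names, and drain is only an illustration of the pattern, not the package's ReadAll.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// Iterator mirrors the interface documented on this page.
type Iterator interface {
	Next() (interface{}, error)
}

// countdown is a hypothetical Iterator that yields n, n-1, ..., 1.
type countdown struct{ n int }

func (c *countdown) Next() (interface{}, error) {
	if c.n <= 0 {
		return nil, io.EOF
	}
	v := c.n
	c.n--
	return v, nil
}

// drain is a hypothetical helper that collects objects until io.EOF.
func drain(it Iterator) ([]interface{}, error) {
	var out []interface{}
	for {
		obj, err := it.Next()
		if errors.Is(err, io.EOF) {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, obj)
	}
}

func main() {
	values, err := drain(&countdown{n: 3})
	if err != nil {
		panic(err)
	}
	fmt.Println(values) // [3 2 1]
}
```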

type LimitedIterator

type LimitedIterator struct {
	// contains filtered or unexported fields
}

LimitedIterator returns an iterator that reads up to a given number of objects from the underlying iterator. Once the maximum number of objects has been read or the underlying iterator has returned io.EOF, Next will return io.EOF. If the underlying iterator returns a non-nil, non-EOF error, Next will return that error. This approach is similar to io.LimitedReader.

Example
si, err := NewSliceIterator([]interface{}{"a", "b", "c"})
if err != nil {
	panic(err)
}

li := NewLimitedIterator(si, 2)

obj, err := li.Next()
if err != nil {
	fmt.Println(err)
}
fmt.Println(obj)

obj, err = li.Next()
if err != nil {
	fmt.Println(err)
}
fmt.Println(obj)

obj, err = li.Next()
if err != nil {
	fmt.Println(err)
}
fmt.Println(obj)
Output:

a
b
EOF
<nil>

func NewLimitedIterator

func NewLimitedIterator(iterator Iterator, limit int) *LimitedIterator

NewLimitedIterator returns a new LimitedIterator that reads up to limit objects from the given iterator.

func (*LimitedIterator) Count

func (li *LimitedIterator) Count() int

Count returns the current count of objects returned.

func (*LimitedIterator) Limit

func (li *LimitedIterator) Limit() int

Limit returns the set limit.

func (*LimitedIterator) Next

func (li *LimitedIterator) Next() (interface{}, error)

type MultiIterator

type MultiIterator struct {
	// contains filtered or unexported fields
}

MultiIterator returns an Iterator that's the logical concatenation of the provided input iterators. They're read sequentially. Once all iterators have returned EOF, Next will return EOF. If any of the iterators return a non-nil, non-EOF error, Next will return that error. This approach is similar to io.MultiReader.

Example

This example shows how to use a multi iterator to iterate through multiple subiterators.

sia, err := NewSliceIterator([]interface{}{"a", "b", "c"})
if err != nil {
	panic(err)
}

sib, err := NewSliceIterator([]interface{}{1, 2, 3})
if err != nil {
	panic(err)
}

mi := NewMultiIterator(sia, sib)

obj, err := mi.Next()
if err != nil {
	panic(err)
}
fmt.Println(obj)

obj, err = mi.Next()
if err != nil {
	panic(err)
}
fmt.Println(obj)

obj, err = mi.Next()
if err != nil {
	panic(err)
}
fmt.Println(obj)

obj, err = mi.Next()
if err != nil {
	panic(err)
}
fmt.Println(obj)

obj, err = mi.Next()
if err != nil {
	panic(err)
}
fmt.Println(obj)

obj, err = mi.Next()
if err != nil {
	panic(err)
}
fmt.Println(obj)
Output:

a
b
c
1
2
3

func NewMultiIterator

func NewMultiIterator(iterators ...Iterator) *MultiIterator

NewMultiIterator returns a new MultiIterator that serially iterates through the given iterators.

func (*MultiIterator) Next

func (mi *MultiIterator) Next() (interface{}, error)

func (*MultiIterator) Push

func (mi *MultiIterator) Push(it ...Iterator)

type MultiWriter

type MultiWriter struct {
	// contains filtered or unexported fields
}

MultiWriter creates a writer that duplicates its writes to all the provided writers.
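The duplication idea can be sketched with a small fan-out type. This is an illustration under assumptions rather than the package's implementation: Writer mirrors the interface documented on this page, memoryWriter and fanOutWriter are hypothetical, and stopping at the first error is an assumed policy.

```go
package main

import "fmt"

// Writer mirrors the interface documented on this page.
type Writer interface {
	WriteObject(object interface{}) error
	Flush() error
}

// memoryWriter is a hypothetical sink that keeps written objects in a slice.
type memoryWriter struct{ values []interface{} }

func (m *memoryWriter) WriteObject(object interface{}) error {
	m.values = append(m.values, object)
	return nil
}

func (m *memoryWriter) Flush() error { return nil }

// fanOutWriter is a hypothetical writer that forwards each call to every target.
type fanOutWriter struct{ writers []Writer }

// WriteObject writes the object to each target in order, stopping at the first error.
func (f *fanOutWriter) WriteObject(object interface{}) error {
	for _, w := range f.writers {
		if err := w.WriteObject(object); err != nil {
			return err
		}
	}
	return nil
}

// Flush flushes each target in order, stopping at the first error.
func (f *fanOutWriter) Flush() error {
	for _, w := range f.writers {
		if err := w.Flush(); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	a, b := &memoryWriter{}, &memoryWriter{}
	fw := &fanOutWriter{writers: []Writer{a, b}}
	for _, v := range []interface{}{"x", "y"} {
		if err := fw.WriteObject(v); err != nil {
			panic(err)
		}
	}
	fmt.Println(a.values, b.values) // [x y] [x y]
}
```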

func NewMultiWriter

func NewMultiWriter(writers ...Writer) *MultiWriter

func (*MultiWriter) Close

func (mw *MultiWriter) Close() error

func (*MultiWriter) Flush

func (mw *MultiWriter) Flush() error

func (*MultiWriter) WriteObject

func (mw *MultiWriter) WriteObject(object interface{}) error

func (*MultiWriter) WriteObjects

func (mw *MultiWriter) WriteObjects(objects interface{}) error

type SetIterator

type SetIterator struct {
	// contains filtered or unexported fields
}

SetIterator iterates over the keys in a map.

Example

This example shows how to use a set iterator to iterate through the keys of a map.

input := map[string]struct{}{
	"a": struct{}{},
	"b": struct{}{},
	"c": struct{}{},
}

it, err := NewSetIterator(input)
if err != nil {
	panic(err)
}

for {
	obj, err := it.Next()
	if err != nil {
		if err == io.EOF {
			break
		}
		panic(err)
	}
	fmt.Println(obj)
}
Output:

func NewSetIterator

func NewSetIterator(values interface{}) (*SetIterator, error)

NewSetIterator returns a new SetIterator.

func (*SetIterator) Next

func (mi *SetIterator) Next() (interface{}, error)

type SetWriter

type SetWriter struct {
	// contains filtered or unexported fields
}

SetWriter contains the WriteObject and Flush functions for writing objects as keys to a set.

Example

This example shows how to use a SetWriter to write values to a set as keys to a map.

w := NewSetWriter()

err := w.WriteObject("a")
if err != nil {
	panic(err)
}

err = w.WriteObject("b")
if err != nil {
	panic(err)
}

err = w.WriteObject("c")
if err != nil {
	panic(err)
}

err = w.WriteObject("a")
if err != nil {
	panic(err)
}

values := w.SliceInterface() // get values written as slice of type []interface{}

// Sort the returned values
sort.Slice(values, func(i, j int) bool {
	return fmt.Sprint(values[i]) < fmt.Sprint(values[j])
})

// Print the values
fmt.Println(values)
Output:

[a b c]

func NewSetWriter

func NewSetWriter() *SetWriter

func NewSetWriterWithValues

func NewSetWriterWithValues(initialValues interface{}) *SetWriter

func (*SetWriter) Flush

func (sw *SetWriter) Flush() error

Flush has no effect for SetWriter as all objects are immediately written to the underlying set.

func (*SetWriter) Iterator

func (sw *SetWriter) Iterator() *SetIterator

func (*SetWriter) Reset

func (sw *SetWriter) Reset()

Reset creates a new underlying set from the type of the original set.

func (*SetWriter) SliceInterface

func (sw *SetWriter) SliceInterface() []interface{}

func (*SetWriter) SliceType

func (sw *SetWriter) SliceType() interface{}

func (*SetWriter) Values

func (sw *SetWriter) Values() interface{}

func (*SetWriter) WriteObject

func (sw *SetWriter) WriteObject(object interface{}) error

func (*SetWriter) WriteObjects

func (sw *SetWriter) WriteObjects(objects interface{}) error

type SliceIterator

type SliceIterator struct {
	// contains filtered or unexported fields
}

SliceIterator iterates over an array or slice of values.

Example

This example shows how to use a slice iterator to iterate through a slice of values.

it, err := NewSliceIterator([]interface{}{"a", "b", "c"})
if err != nil {
	panic(err)
}

obj, err := it.Next()
if err != nil {
	panic(err)
}
fmt.Println(obj)

obj, err = it.Next()
if err != nil {
	panic(err)
}
fmt.Println(obj)

obj, err = it.Next()
if err != nil {
	panic(err)
}
fmt.Println(obj)
Output:

a
b
c

func NewSliceIterator

func NewSliceIterator(values interface{}) (*SliceIterator, error)

NewSliceIterator returns a new SliceIterator.

func (*SliceIterator) Next

func (si *SliceIterator) Next() (interface{}, error)

type SliceWriter

type SliceWriter struct {
	// contains filtered or unexported fields
}

SliceWriter contains the WriteObject and Flush functions for writing objects to an underlying slice.

Example

This example shows how to use a slice writer to write values to a slice.

w := NewSliceWriter()

err := w.WriteObject("a")
if err != nil {
	panic(err)
}

err = w.WriteObject("b")
if err != nil {
	panic(err)
}

err = w.WriteObject("c")
if err != nil {
	panic(err)
}

values := w.Values() // get values written to slice

fmt.Println(values)
Output:

[a b c]

func NewSliceWriter

func NewSliceWriter() *SliceWriter

func NewSliceWriterWithCapacity

func NewSliceWriterWithCapacity(initialCapacity int) *SliceWriter

func NewSliceWriterWithValues

func NewSliceWriterWithValues(initialValues interface{}) *SliceWriter

func (*SliceWriter) Flush

func (sw *SliceWriter) Flush() error

func (*SliceWriter) Iterator

func (sw *SliceWriter) Iterator() *SliceIterator

func (*SliceWriter) Reset

func (sw *SliceWriter) Reset()

Reset creates a new underlying slice from the type of the original slice.

func (*SliceWriter) Values

func (sw *SliceWriter) Values() interface{}

func (*SliceWriter) WriteObject

func (sw *SliceWriter) WriteObject(object interface{}) error

func (*SliceWriter) WriteObjects

func (sw *SliceWriter) WriteObjects(objects interface{}) error

type SyncWriter

type SyncWriter struct {
	// contains filtered or unexported fields
}

SyncWriter wraps a mutex around an underlying writer, so writes happen sequentially.
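Serializing writes with a mutex can be sketched as follows. This is a minimal illustration, not the package's code: Writer mirrors the interface documented on this page, and memoryWriter, lockedWriter, and writeConcurrently are hypothetical names. The underlying memoryWriter is not safe for concurrent use on its own, which is why the lock matters here.

```go
package main

import (
	"fmt"
	"sync"
)

// Writer mirrors the interface documented on this page.
type Writer interface {
	WriteObject(object interface{}) error
	Flush() error
}

// memoryWriter is a hypothetical sink; its append is not safe for concurrent use.
type memoryWriter struct{ values []interface{} }

func (m *memoryWriter) WriteObject(object interface{}) error {
	m.values = append(m.values, object)
	return nil
}

func (m *memoryWriter) Flush() error { return nil }

// lockedWriter is a hypothetical wrapper that holds a mutex around every call.
type lockedWriter struct {
	mu    sync.Mutex
	under Writer
}

func (l *lockedWriter) WriteObject(object interface{}) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.under.WriteObject(object)
}

func (l *lockedWriter) Flush() error {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.under.Flush()
}

// writeConcurrently is a hypothetical helper: n goroutines each write one object.
func writeConcurrently(w Writer, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			_ = w.WriteObject(v) // memoryWriter never returns an error
		}(i)
	}
	wg.Wait()
}

func main() {
	sink := &memoryWriter{}
	writeConcurrently(&lockedWriter{under: sink}, 100)
	fmt.Println(len(sink.values)) // 100
}
```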

func NewSyncWriter

func NewSyncWriter(writer Writer) *SyncWriter

NewSyncWriter returns a new SyncWriter.

func (*SyncWriter) Close

func (sw *SyncWriter) Close() error

func (*SyncWriter) Flush

func (sw *SyncWriter) Flush() error

func (*SyncWriter) WriteObject

func (sw *SyncWriter) WriteObject(object interface{}) error

func (*SyncWriter) WriteObjects

func (sw *SyncWriter) WriteObjects(objects interface{}) error

type TransactionWriter

type TransactionWriter struct {
	// contains filtered or unexported fields
}

TransactionWriter opens up a transaction to the underlying writer and closes the underlying writer after the objects are written.

func NewTransactionWriter

func NewTransactionWriter(opener func() (Writer, error), closer func(w Writer) error, async bool) (*TransactionWriter, error)

NewTransactionWriter returns a new TransactionWriter with the opener function and optional closer function.
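One way to picture the opener and closer functions is a writer that opens a target, writes, flushes, and closes on each call. The sketch below is only a guess at that shape and not the package's implementation: txWriter, eventWriter, and newLoggingTx are hypothetical, the per-object transaction boundary is an assumption, and the async parameter is not modeled.

```go
package main

import "fmt"

// Writer mirrors the interface documented on this page.
type Writer interface {
	WriteObject(object interface{}) error
	Flush() error
}

// eventWriter is a hypothetical writer that records each call in a shared event list.
type eventWriter struct{ events *[]string }

func (e eventWriter) WriteObject(object interface{}) error {
	*e.events = append(*e.events, fmt.Sprintf("write %v", object))
	return nil
}

func (e eventWriter) Flush() error {
	*e.events = append(*e.events, "flush")
	return nil
}

// txWriter is a hypothetical illustration of the open/write/close idea.
type txWriter struct {
	opener func() (Writer, error)
	closer func(w Writer) error
}

// WriteObject opens a writer, writes and flushes one object, then closes the writer.
func (t *txWriter) WriteObject(object interface{}) error {
	w, err := t.opener()
	if err != nil {
		return err
	}
	if err := w.WriteObject(object); err != nil {
		_ = t.closer(w) // best effort; report the write error
		return err
	}
	if err := w.Flush(); err != nil {
		_ = t.closer(w) // best effort; report the flush error
		return err
	}
	return t.closer(w)
}

// Flush is a no-op in this sketch because each write is flushed immediately.
func (t *txWriter) Flush() error { return nil }

// newLoggingTx builds a txWriter whose opener and closer append to events.
func newLoggingTx(events *[]string) *txWriter {
	return &txWriter{
		opener: func() (Writer, error) {
			*events = append(*events, "open")
			return eventWriter{events: events}, nil
		},
		closer: func(w Writer) error {
			*events = append(*events, "close")
			return nil
		},
	}
}

func main() {
	var events []string
	tw := newLoggingTx(&events)
	if err := tw.WriteObject("a"); err != nil {
		panic(err)
	}
	fmt.Println(events) // [open write a flush close]
}
```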

func (*TransactionWriter) Close

func (tw *TransactionWriter) Close() error

func (*TransactionWriter) Flush

func (tw *TransactionWriter) Flush() error

func (*TransactionWriter) WriteObject

func (tw *TransactionWriter) WriteObject(object interface{}) error

func (*TransactionWriter) WriteObjects

func (tw *TransactionWriter) WriteObjects(objects interface{}) error

type Writer

type Writer interface {
	WriteObject(object interface{}) error
	Flush() error
}

Writer contains the WriteObject and Flush functions for writing objects.
