Documentation ¶
Overview ¶
Package pipe includes interfaces and concerete classes for piping objects from inputs to outputs. See the examples below and tests for usage.
Index ¶
- Variables
- func IteratorToWriter(it Iterator, w Writer, ...) error
- func ReadAll(it Iterator) ([]interface{}, error)
- type BatchWriter
- type BufferWriter
- type Builder
- func (b *Builder) CloseOutput(closeOutput bool) *Builder
- func (b *Builder) Error(e func(err error) error) *Builder
- func (b *Builder) Filter(f func(object interface{}) (bool, error)) *Builder
- func (b *Builder) Input(in Iterator) *Builder
- func (b *Builder) InputF(fn func() (interface{}, error)) *Builder
- func (b *Builder) InputLimit(inputLimit int) *Builder
- func (b *Builder) Output(w Writer) *Builder
- func (b *Builder) OutputF(fn func(object interface{}) error) *Builder
- func (b *Builder) OutputLimit(outputLimit int) *Builder
- func (b *Builder) Run() error
- func (b *Builder) Transform(t func(inputObject interface{}) (interface{}, error)) *Builder
- type ChannelIterator
- type ChannelWriter
- type ErrInvalidKind
- type FunctionIterator
- type FunctionWriter
- type Iterator
- type LimitedIterator
- type MultiIterator
- type MultiWriter
- type SetIterator
- type SetWriter
- func (sw *SetWriter) Flush() error
- func (sw *SetWriter) Iterator() *SetIterator
- func (sw *SetWriter) Reset()
- func (sw *SetWriter) SliceInterface() []interface{}
- func (sw *SetWriter) SliceType() interface{}
- func (sw *SetWriter) Values() interface{}
- func (sw *SetWriter) WriteObject(object interface{}) error
- func (sw *SetWriter) WriteObjects(objects interface{}) error
- type SliceIterator
- type SliceWriter
- type SyncWriter
- type TransactionWriter
- type Writer
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( FilterNotNil = func(object interface{}) (bool, error) { return object != nil, nil } FilterString = func(object interface{}) (bool, error) { _, ok := object.(string) return ok, nil } WriterStdout = NewFunctionWriter(func(object interface{}) error { _, err := fmt.Println(object) return err }) WriterStderr = NewFunctionWriter(func(object interface{}) error { _, err := fmt.Fprintln(os.Stderr, object) return err }) )
Functions ¶
func IteratorToWriter ¶
func IteratorToWriter(it Iterator, w Writer, transform func(inputObject interface{}) (interface{}, error), errorHandler func(err error) error, filter func(inputObject interface{}) (bool, error), inputLimit int, outputLimit int, closeOutput bool) error
IteratorToWriter reads objects from the provided iterator and writes them to the provided writer. If a transform function is given, then transforms the input objects before writing them. If an errorHandler is given, then propogates errors returned by the transformed function through the errorHandler. If the errorHandler returns a non-nil error, then processing will halt. If a filter is given, the input object, after transformation if applicable, is filtered. If the filter returns true and no error, then the object is passed to the writer. If the inputLimit >= 0, then reads the given number of objects from the input. If the outputLimit >= 0, then writes the given number of objecst to the writer.
Types ¶
type BatchWriter ¶
BatchWriter contains the WriteObjects and Flush functions for writing a batch of objects.
type BufferWriter ¶
type BufferWriter struct {
// contains filtered or unexported fields
}
BufferWriter wraps a buffer around an underlying writer. Once the buffer reaches capacity, it writes its values to the underlying writer. The Flush method will propagate to the underlying writer.
func NewBufferWriter ¶
func NewBufferWriter(writer Writer, capacity int) *BufferWriter
NewBufferWriter returns a new BufferWriter with the given capacity.
func (*BufferWriter) Close ¶
func (bw *BufferWriter) Close() error
func (*BufferWriter) Flush ¶
func (bw *BufferWriter) Flush() error
func (*BufferWriter) Reset ¶
func (bw *BufferWriter) Reset()
Reset creates a new underlying slice from the type of the original slice.
func (*BufferWriter) WriteObject ¶
func (bw *BufferWriter) WriteObject(object interface{}) error
func (*BufferWriter) WriteObjects ¶
func (bw *BufferWriter) WriteObjects(objects interface{}) error
type Builder ¶
type Builder struct {
// contains filtered or unexported fields
}
Builder helps build a pipeline
Example ¶
This examples hows how to use a builder.
input := []interface{}{"a", "b", "c", 1, 2, 3, false, true} it, err := NewSliceIterator(input) if err != nil { panic(err) } w := NewSliceWriterWithValues([]string{}) b := NewBuilder(). Input(it). Filter(func(inputObject interface{}) (bool, error) { // filter to only include strings _, ok := inputObject.(string) return ok, nil }). Output(w) err = b.Run() if err != nil { panic(err) } // the slice writer preserves the type of the initial values. fmt.Println(strings.Join(w.Values().([]string), "\n"))
Output: a b c
Example (SliceToSet) ¶
This examples hows how to use a builder.
// the initial values with duplicates input := []interface{}{"a", "b", "c", "a", "b"} it, err := NewSliceIterator(input) if err != nil { panic(err) } w := NewSetWriter() err = NewBuilder().Input(it).Output(w).Run() if err != nil { panic(err) } values := w.SliceInterface() // get values written as slice of type []interface{} // Sort the returned values sort.Slice(values, func(i, j int) bool { return fmt.Sprint(values[i]) < fmt.Sprint(values[j]) }) for _, value := range values { fmt.Println(value) }
Output: a b c
func (*Builder) CloseOutput ¶
CloseOutput sets the closeOutput for the pipeline.
func (*Builder) Error ¶
Error sets the error handler for the pipeline that catches errors from the transform function. If the error handler returns nil, then the pipeline continues as normal. If the error handler returns the original error (or a new one), then the pipeline bubbles up the error and exits.
func (*Builder) InputF ¶
InputF sets the input for the pipeline to a function by wrapping the provided function with pipe.FunctionIterator.
func (*Builder) InputLimit ¶
InputLimit sets the inputLimit for the pipeline.
func (*Builder) OutputF ¶
OutputF sets the output for the pipeline to a function by wrapping the provided function with pipe.FunctionWriter.
func (*Builder) OutputLimit ¶
OutputLimit sets the outputLimit for the pipeline.
type ChannelIterator ¶
type ChannelIterator struct {
// contains filtered or unexported fields
}
ChannelIterator iterates over a channel of values.
Example ¶
This examples hows how to use a channel iterator to read from a channel.
c := make(chan interface{}, 1000) c <- "a" c <- "b" c <- "c" close(c) it, err := NewChannelIterator(c) if err != nil { panic(err) } for { obj, err := it.Next() if err != nil { if err == io.EOF { break } panic(err) } fmt.Println(obj) }
Output: a b c
func NewChannelIterator ¶
func NewChannelIterator(values interface{}) (*ChannelIterator, error)
NewChannelIterator returns a new ChannelIterator.
func (*ChannelIterator) Next ¶
func (ci *ChannelIterator) Next() (interface{}, error)
type ChannelWriter ¶
type ChannelWriter struct {
// contains filtered or unexported fields
}
ChannelWriter passes each object to the callback function
Example ¶
This examples hows how to use a channel writer to write to a channel.
c := make(chan interface{}, 1000) w, err := NewChannelWriter(c) if err != nil { panic(err) } err = w.WriteObject("a") if err != nil { panic(err) } err = w.WriteObject("b") if err != nil { panic(err) } err = w.WriteObject("c") if err != nil { panic(err) } close(c) for v := range c { fmt.Println(v) }
Output: a b c
func NewChannelWriter ¶
func NewChannelWriter(channel interface{}) (*ChannelWriter, error)
func (*ChannelWriter) Close ¶
func (cw *ChannelWriter) Close() error
func (*ChannelWriter) Flush ¶
func (cw *ChannelWriter) Flush() error
func (*ChannelWriter) WriteObject ¶
func (cw *ChannelWriter) WriteObject(object interface{}) error
type ErrInvalidKind ¶
func (ErrInvalidKind) Error ¶
func (e ErrInvalidKind) Error() string
Error returns the error formatted as a string.
type FunctionIterator ¶
type FunctionIterator struct {
// contains filtered or unexported fields
}
FunctionIterator provides a simple wrapper over a callback function to allow it to iterate.
func NewFunctionIterator ¶
func NewFunctionIterator(callback func() (interface{}, error)) *FunctionIterator
func (*FunctionIterator) Next ¶
func (fi *FunctionIterator) Next() (interface{}, error)
type FunctionWriter ¶
type FunctionWriter struct {
// contains filtered or unexported fields
}
FunctionWriter passes each object to the callback function
Example ¶
This examples hows how to use a function writer to call a callback with each object.
w := NewFunctionWriter(func(object interface{}) error { fmt.Println(object) // print to stdout return nil }) err := w.WriteObject("a") if err != nil { panic(err) } err = w.WriteObject("b") if err != nil { panic(err) } err = w.WriteObject("c") if err != nil { panic(err) }
Output: a b c
func NewFunctionWriter ¶
func NewFunctionWriter(callback func(object interface{}) error) *FunctionWriter
func (*FunctionWriter) Flush ¶
func (sw *FunctionWriter) Flush() error
func (*FunctionWriter) WriteObject ¶
func (sw *FunctionWriter) WriteObject(object interface{}) error
type Iterator ¶
type Iterator interface {
Next() (interface{}, error)
}
Iterator contains the Next function that returns an object until it returns an io.EOF error.
type LimitedIterator ¶
type LimitedIterator struct {
// contains filtered or unexported fields
}
LimitedIterator returns an iterator that reads up to a given number of objects from the underlying reader. Once the maximum number of objects has been read or the underlying iterator has returned io.EOF, Next will return io.EOF. If the underlying iterator returns a non-nil, non-EOF error, Next will return that error. This approach is similiar to the io.LimitedReader.
Example ¶
si, err := NewSliceIterator([]interface{}{"a", "b", "c"}) if err != nil { panic(err) } li := NewLimitedIterator(si, 2) obj, err := li.Next() if err != nil { fmt.Println(err) } fmt.Println(obj) obj, err = li.Next() if err != nil { fmt.Println(err) } fmt.Println(obj) obj, err = li.Next() if err != nil { fmt.Println(err) } fmt.Println(obj)
Output: a b EOF <nil>
func NewLimitedIterator ¶
func NewLimitedIterator(iterator Iterator, limit int) *LimitedIterator
NewLimitedIterator returns a new LimitIterator that serialy iterators through the given iterators.
func (*LimitedIterator) Count ¶
func (li *LimitedIterator) Count() int
Count returns the current count of objects returned.
func (*LimitedIterator) Next ¶
func (li *LimitedIterator) Next() (interface{}, error)
type MultiIterator ¶
type MultiIterator struct {
// contains filtered or unexported fields
}
MultiIterator returns an Iterator that's the logical concatenation of the provided input iterators. They're read sequentially. Once all iterators have returned EOF, Next will return EOF. If any of the iterators return a non-nil, non-EOF error, Next will return that error. This approach is similiar to the io.MultiReader.
Example ¶
This examples shows how to use a multi iterator to iterate through multiple subiterators.
sia, err := NewSliceIterator([]interface{}{"a", "b", "c"}) if err != nil { panic(err) } sib, err := NewSliceIterator([]interface{}{1, 2, 3}) if err != nil { panic(err) } mi := NewMultiIterator(sia, sib) obj, err := mi.Next() if err != nil { panic(err) } fmt.Println(obj) obj, err = mi.Next() if err != nil { panic(err) } fmt.Println(obj) obj, err = mi.Next() if err != nil { panic(err) } fmt.Println(obj) obj, err = mi.Next() if err != nil { panic(err) } fmt.Println(obj) obj, err = mi.Next() if err != nil { panic(err) } fmt.Println(obj) obj, err = mi.Next() if err != nil { panic(err) } fmt.Println(obj)
Output: a b c 1 2 3
func NewMultiIterator ¶
func NewMultiIterator(iterators ...Iterator) *MultiIterator
NewMultiIterator returns a new MultiIterator that serialy iterators through the given iterators.
func (*MultiIterator) Next ¶
func (mi *MultiIterator) Next() (interface{}, error)
func (*MultiIterator) Push ¶
func (mi *MultiIterator) Push(it ...Iterator)
type MultiWriter ¶
type MultiWriter struct {
// contains filtered or unexported fields
}
MultiWriter creates a writer that duplicates its writes to all the provided writers.
func NewMultiWriter ¶
func NewMultiWriter(writers ...Writer) *MultiWriter
func (*MultiWriter) Close ¶
func (mw *MultiWriter) Close() error
func (*MultiWriter) Flush ¶
func (mw *MultiWriter) Flush() error
func (*MultiWriter) WriteObject ¶
func (mw *MultiWriter) WriteObject(object interface{}) error
func (*MultiWriter) WriteObjects ¶
func (mw *MultiWriter) WriteObjects(objects interface{}) error
type SetIterator ¶
type SetIterator struct {
// contains filtered or unexported fields
}
SetIterator iterates over the keys in a map.
Example ¶
This examples shows how to use a map iterator to iterate through the keys of a map.
input := map[string]struct{}{ "a": struct{}{}, "b": struct{}{}, "c": struct{}{}, } it, err := NewSetIterator(input) if err != nil { panic(err) } for { obj, err := it.Next() if err != nil { if err == io.EOF { break } panic(err) } fmt.Println(obj) }
Output:
func NewSetIterator ¶
func NewSetIterator(values interface{}) (*SetIterator, error)
NewSetIterator returns a new SetIterator.
func (*SetIterator) Next ¶
func (mi *SetIterator) Next() (interface{}, error)
type SetWriter ¶
type SetWriter struct {
// contains filtered or unexported fields
}
SetWriter contains the WriteObject and Flush functions for writing objects as keys to a set.
Example ¶
This examples shows how to use a SetWriter to write values to a set as keys to a map.
w := NewSetWriter() err := w.WriteObject("a") if err != nil { panic(err) } err = w.WriteObject("b") if err != nil { panic(err) } err = w.WriteObject("c") if err != nil { panic(err) } err = w.WriteObject("a") if err != nil { panic(err) } values := w.SliceInterface() // get values written as slice of type []interface{} // Sort the returned values sort.Slice(values, func(i, j int) bool { return fmt.Sprint(values[i]) < fmt.Sprint(values[j]) }) // Print the values fmt.Println(values)
Output: [a b c]
func NewSetWriter ¶
func NewSetWriter() *SetWriter
func NewSetWriterWithValues ¶
func NewSetWriterWithValues(initialValues interface{}) *SetWriter
func (*SetWriter) Flush ¶
Flush has no effect for SetWriter as all objects are immediately written to the underlying set.
func (*SetWriter) Iterator ¶
func (sw *SetWriter) Iterator() *SetIterator
func (*SetWriter) Reset ¶
func (sw *SetWriter) Reset()
Reset creates a new underlying set from the type of the original set.
func (*SetWriter) SliceInterface ¶
func (sw *SetWriter) SliceInterface() []interface{}
func (*SetWriter) WriteObject ¶
func (*SetWriter) WriteObjects ¶
type SliceIterator ¶
type SliceIterator struct {
// contains filtered or unexported fields
}
SliceIterator iterates over an array or slice of values.
Example ¶
This examples shows how to use a slice iterator to iterate through a slice of values.
it, err := NewSliceIterator([]interface{}{"a", "b", "c"}) if err != nil { panic(err) } obj, err := it.Next() if err != nil { panic(err) } fmt.Println(obj) obj, err = it.Next() if err != nil { panic(err) } fmt.Println(obj) obj, err = it.Next() if err != nil { panic(err) } fmt.Println(obj)
Output: a b c
func NewSliceIterator ¶
func NewSliceIterator(values interface{}) (*SliceIterator, error)
NewSliceIterator returns a new SliceIterator.
func (*SliceIterator) Next ¶
func (si *SliceIterator) Next() (interface{}, error)
type SliceWriter ¶
type SliceWriter struct {
// contains filtered or unexported fields
}
SliceWriter contains the WriteObject and Flush functions for writing objects to an underlying slice.
Example ¶
This examples shows how to use a slice writer to write values to a slice.
w := NewSliceWriter() err := w.WriteObject("a") if err != nil { panic(err) } err = w.WriteObject("b") if err != nil { panic(err) } err = w.WriteObject("c") if err != nil { panic(err) } values := w.Values() // get values written to slice fmt.Println(values)
Output: [a b c]
func NewSliceWriter ¶
func NewSliceWriter() *SliceWriter
func NewSliceWriterWithCapacity ¶
func NewSliceWriterWithCapacity(initialCapacity int) *SliceWriter
func NewSliceWriterWithValues ¶
func NewSliceWriterWithValues(initialValues interface{}) *SliceWriter
func (*SliceWriter) Flush ¶
func (sw *SliceWriter) Flush() error
func (*SliceWriter) Iterator ¶
func (sw *SliceWriter) Iterator() *SliceIterator
func (*SliceWriter) Reset ¶
func (sw *SliceWriter) Reset()
Reset creates a new underlying slice from the type of the original slice.
func (*SliceWriter) Values ¶
func (sw *SliceWriter) Values() interface{}
func (*SliceWriter) WriteObject ¶
func (sw *SliceWriter) WriteObject(object interface{}) error
func (*SliceWriter) WriteObjects ¶
func (sw *SliceWriter) WriteObjects(objects interface{}) error
type SyncWriter ¶
type SyncWriter struct {
// contains filtered or unexported fields
}
SyncWriter wraps a mutex around an underlying writer, so writes happen sequentially.
func NewSyncWriter ¶
func NewSyncWriter(writer Writer) *SyncWriter
NewSyncWriter returns a new SyncWriter.
func (*SyncWriter) Close ¶
func (sw *SyncWriter) Close() error
func (*SyncWriter) Flush ¶
func (sw *SyncWriter) Flush() error
func (*SyncWriter) WriteObject ¶
func (sw *SyncWriter) WriteObject(object interface{}) error
func (*SyncWriter) WriteObjects ¶
func (sw *SyncWriter) WriteObjects(objects interface{}) error
type TransactionWriter ¶
type TransactionWriter struct {
// contains filtered or unexported fields
}
TransactionWriter opens up a transaction to the underlying writer and closes the underlying writer after the objects are written.
func NewTransactionWriter ¶
func NewTransactionWriter(opener func() (Writer, error), closer func(w Writer) error, async bool) (*TransactionWriter, error)
NewTransactionWriter returns a new TransactionWriter with the opener function and optional closer function.
func (*TransactionWriter) Close ¶
func (tw *TransactionWriter) Close() error
func (*TransactionWriter) Flush ¶
func (tw *TransactionWriter) Flush() error
func (*TransactionWriter) WriteObject ¶
func (tw *TransactionWriter) WriteObject(object interface{}) error
func (*TransactionWriter) WriteObjects ¶
func (tw *TransactionWriter) WriteObjects(objects interface{}) error
Source Files ¶
- BatchWriter.go
- BufferWriter.go
- Builder.go
- ChannelIterator.go
- ChannelWriter.go
- ErrInvalidKind.go
- FunctionIterator.go
- FunctionWriter.go
- Iterator.go
- IteratorToWriter.go
- LimitedIterator.go
- MultiIterator.go
- MultiWriter.go
- ReadAll.go
- SetIterator.go
- SetWriter.go
- SliceIterator.go
- SliceWriter.go
- SyncWriter.go
- TransactionWriter.go
- Writer.go
- pipe.go