functional

package
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 7, 2021 License: BSD-3-Clause Imports: 6 Imported by: 6

Documentation

Overview

Package functional provides functional programming constructs.

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// Done indicates that the end of a Stream has been reached
	Done = errors.New("functional: End of Stream reached.")
	// Filters return Skipped to indicate that the current value should be
	// skipped.
	Skipped = errors.New("functional: Value skipped.")
)

Functions

func EmitAll

func EmitAll(s Stream, e Emitter) (opened bool)

EmitAll emits all of Stream s to Emitter e. If the Stream for e becomes closed, EmitAll returns false. Otherwise EmitAll returns true.

func MultiConsume

func MultiConsume(s Stream, ptr interface{}, copier Copier, consumers ...Consumer) (errors []error)

MultiConsume consumes the values of s, a Stream of T, sending those T values to each Consumer in consumers. MultiConsume consumes values from s until no Consumer in consumers is accepting values. ptr is a *T that receives the values from s. copier is a Copier of T used to copy T values to the Streams sent to each Consumer in consumers. Passing null for copier means use simple assignment. MultiConsume returns all the errors from the individual Consume methods. The order of the returned errors matches the order of the consumers.

func WaitForClose

func WaitForClose(e Emitter)

Use WaitForClose in emitting functions (See description for NewGenerator). WaitForClose yields execution to the caller until caller calls Close on associated Stream while returning Done each time caller calls Next. An emitting function calls WaitForClose when it is done emitting values. but before it does any final cleanup.

Types

type CompositeMapper

type CompositeMapper struct {
	// contains filtered or unexported fields
}

CompositeMapper represents Mappers composed together e.g f(g(x)). Programs using CompositeMapper should typically store and pass them as values, not pointers. A CompositeMapper can be used by multiple goroutines simultaneously if its underlying Mappers can be used by multiple goroutines simultaneously. The zero value for CompositeMapper is a Mapper that maps nothing (the Map method always returns Skipped).

func Compose

func Compose(f Mapper, g Mapper, c Creater) CompositeMapper

Compose composes two Mappers together into one e.g f(g(x)). If g maps type T values to type U values, and f maps type U values to type V values, then Compose returns a CompositeMapper mapping T values to V values. c is a Creater of U. Each time Map is called on returned CompositeMapper, it invokes c to create a U value to receive the intermediate result from g.

func (CompositeMapper) Fast

func (c CompositeMapper) Fast() Mapper

Fast returns a quicker version of this CompositeMapper that cannot be used by multiple goroutines simultaneously as if FastCompose were used.

func (CompositeMapper) Map

func (c CompositeMapper) Map(srcPtr interface{}, destPtr interface{}) error

type Consumer

type Consumer interface {
	// Consume consumes values from Stream s.
	Consume(s Stream) error
}

A Consumer of T consumes the T values from a Stream of T.

func CompositeConsumer

func CompositeConsumer(
	ptr interface{},
	copier Copier,
	consumers ...Consumer) Consumer

CompositeConsumer returns a Consumer that sends values it consumes to each one of consumers. The returned Consumer's Consume method reports an error if the Consume method in any of consumers reports an error. ptr is a *T where T values being consumed are temporarily held; copier knows how to copy the values of type T being consumed (can be nil if simple assignment should be used). If caller passes a slice for consumers, no copy is made of it.

func FilterConsumer

func FilterConsumer(c Consumer, f Filterer) Consumer

FilterConsumer creates a new Consumer whose Consume method applies f to the Stream before passing it onto c.

func MapConsumer

func MapConsumer(c Consumer, m Mapper, ptr interface{}) Consumer

MapConsumer creates a new Consumer whose Consume method applies m to the Stream before passing it onto c. c cnsumes U values; m maps T values to U Values; ptr is *T to temporarily hold T values, and this function returns a consumer of T values.

func ModifyConsumer

func ModifyConsumer(
	c Consumer,
	f func(s Stream) Stream) Consumer

ModifyConsumer returns a new Consumer that applies f to its Stream and then gives the resulting Stream to c. If c is a Consumer of T and f takes a Stream of U and returns a Stream of T, then ModifyConsumer returns a Consumer of U. The Consume method of the returned Consumer will close the Stream that f returns but not the original Stream. It does this by wrapping the original Stream with NoCloseStream.

func NilConsumer

func NilConsumer() Consumer

NilConsumer returns a consumer that consumes no values.

type ConsumerFunc

type ConsumerFunc func(s Stream) error

ConsumerFunc is an adapter that allows ordinary functions to be used as

Consumers.

func (ConsumerFunc) Consume

func (f ConsumerFunc) Consume(s Stream) error

type Copier

type Copier func(src, dest interface{})

Copier of T copies the value at src to the value at dest. src and dest are of type *T and both point to pre-initialized T.

type Creater

type Creater func() interface{}

Creater of T creates a new, pre-initialized, T and returns a pointer to it.

type Emitter

type Emitter interface {

	// EmitPtr returns the pointer supplied to Next of associated Stream.
	// If Close is called on associated Stream, EmitPtr returns nil and false.
	EmitPtr() (ptr interface{}, streamOpened bool)

	// Return causes Next of associated Stream to return err. Return yields
	// execution to the caller of Next blocking until it calls Next again or
	// Close. Finally, Return returns the pointer passed to Next or nil and
	// false if caller called Close.
	Return(err error) (ptr interface{}, streamOpened bool)
}

Emitter allows a function to emit values to an associated Stream.

type Filterer

type Filterer interface {
	// Filter returns nil if value ptr points to should be included or Skipped
	// if value should be skipped. ptr must be a *T.
	Filter(ptr interface{}) error
}

Filterer of T filters values in a Stream of T.

func All

func All(fs ...Filterer) Filterer

All returns a Filterer that returns nil if all of the fs return nil. Otherwise it returns the first error encountered.

func Any

func Any(fs ...Filterer) Filterer

Any returns a Filterer that returns Skipped if all of the fs return Skipped. Otherwise it returns nil or the first error not equal to Skipped.

func NewFilterer

func NewFilterer(f func(ptr interface{}) error) Filterer

NewFilterer returns a new Filterer of T. f takes a *T returning nil if T value pointed to it should be included or Skipped if it should not be included. f can return other errors too.

type Mapper

type Mapper interface {
	// Map does the mapping storing the mapped value at destPtr.
	// If Mapper returns Skipped, then no mapped value is stored at destPtr.
	// Map may return other errors. srcPtr is a *T; destPtr is a *U
	Map(srcPtr interface{}, destPtr interface{}) error
}

Mapper maps a type T value to a type U value in a Stream.

func FastCompose

func FastCompose(f Mapper, g Mapper, ptr interface{}) Mapper

FastCompose works like Compose except that it uses a *U value instead of a Creater of U to link f ang g. ptr is the *U value. Intermediate results from g are stored at ptr. Unlike Compose, the Mapper that FastCompose returns cannot be used by multiple goroutines simultaneously since what ptr points to changes with each call to Map.

func NewMapper

func NewMapper(m func(srcPtr interface{}, destPtr interface{}) error) Mapper

NewMapper returns a new Mapper mapping T values to U Values. In f, srcPtr is a *T and destPtr is a *U pointing to pre-allocated T and U values respectively. f returns Skipped if mapped value should be skipped. f can also return other errors.

type Rows

type Rows interface {
	// Next advances to the next row. Next returns false if there is no next row.
	// Every call to Scan, even the first one, must be preceded by a call to Next.
	Next() bool
	// Reads the values out of the current row. args are pointer types.
	Scan(args ...interface{}) error
}

Rows represents rows in a database table. Most database API already have a type that implements this interface

type Stream

type Stream interface {
	// Next emits the next value in this Stream of T.
	// If Next returns nil, the next value is stored at ptr.
	// If Next returns Done, then the end of the Stream has been reached,
	// and the value ptr points to is unspecified. ptr must be a *T.
	// Once Next returns Done, it should continue to return Done.
	Next(ptr interface{}) error
	// Caller calls Close when it is finished with this Stream.
	// The result of calling Next after Close is unspecified.
	io.Closer
}

Stream is a sequence emitted values. Each call to Next() emits the next value in the stream. A Stream that emits values of type T is a Stream of T.

func Concat

func Concat(s ...Stream) Stream

Concat concatenates multiple Streams into one. If x = (x1, x2, ...) and y = (y1, y2, ...) then Concat(x, y) = (x1, x2, ..., y1, y2, ...). Calling Close on returned Stream closes all underlying streams. If caller passes a slice to Concat, no copy is made of it.

func Count

func Count() Stream

Count returns an infinite Stream of int which emits all values beginning at 0. Calling Close on returned Stream is a no-op.

func CountFrom

func CountFrom(start, step int) Stream

CountFrom returns an infinite Stream of int emitting values beginning at start and increasing by step. Calling Close on returned Stream is a no-op.

func Cycle

func Cycle(f func() Stream) Stream

Cycle(f) is equivalent to Flatten(NewStreamFromStreamFunc(f))

func Deferred

func Deferred(f func() Stream) Stream

Deferred(f) is equivalent to Flatten(Slice(NewStreamFromStreamFunc(f), 0, 1))

func DropWhile

func DropWhile(f Filterer, s Stream) Stream

DropWhile returns a Stream that emits the values in s starting at the first value where the Filter method of f returns Skipped. The returned Stream's Next method reports any errors that the Filter method of f returns until it returns Skipped. f is a Filterer of T; s is a Stream of T. Calling Close on returned Stream closes s.

func Filter

func Filter(f Filterer, s Stream) Stream

Filter filters values from s, returning a new Stream of T. The returned Stream's Next method reports any errors besides Skipped that the Filter method of f returns. f is a Filterer of T; s is a Stream of T. Calling Close on returned Stream closes s.

func Flatten

func Flatten(s Stream) Stream

Flatten converts a Stream of Stream of T into a Stream of T. The returned Stream automatically closes each emitted Stream from s propagating any error from closing through Next. Calling Close on returned Stream closes s and the last emitted Stream from s currently being read.

func Map

func Map(f Mapper, s Stream, ptr interface{}) Stream

Map applies f, which maps a type T value to a type U value, to a Stream of T producing a new Stream of U. If s is (x1, x2, x3, ...), Map returns the Stream (f(x1), f(x2), f(x3), ...). If f returns Skipped for a T value, then the corresponding U value is left out of the returned stream. ptr is a *T providing storage for emitted values from s. If f is a CompositeMapper, Fast() is called on it automatically. Calling Close on returned Stream closes s.

func Merge

func Merge(
	creater Creater,
	copier Copier,
	before func(lhs, rhs interface{}) bool,
	streams ...Stream) Stream

Merge merges multiple streams that emit their elments in order into a single stream that emits all the elements in order. Calling Close on returned Stream closes all underlying streams. If caller passes a slice to Merge, no copy is made of it. Each Stream in streams must emit elements in order according to before, and the resulting Stream emits all elements in order according to before. before returns true if the T element at lhs comes before the T element at rhs. lhs and rhs are *T.

func NewGenerator

func NewGenerator(f func(e Emitter) error) Stream

NewGenerator creates a Stream that emits the values from emitting function f. First, f emits values by calling EmitPtr and Return on the Emitter passed to it. When When f is through emitting values or when EmitPtr or Return returns false for streamOpened, f calls WaitForClose(), performs any necessary cleanup and finally returns the error that Close() on the associated Stream will return. Its very important that f calls WaitForClose() before performing cleanup to ensure that the cleanup is done after Close() is called on the associated Stream. Caller must call Close() on returned Stream or else the goroutine operating the Stream will never exit. Note that execution of f begins the first time the caller calls Next() or Close() on associated Stream.

Example
package main

import (
	"fmt"
	"github.com/keep94/gofunctional3/functional"
	"time"
)

// DateCount represents a count of some occurrence by date
type DateCount struct {
	Date  time.Time
	Count int64
}

// YearCount represents a count of some occurrence by year
type YearCount struct {
	Year  int
	Count int64
}

// ByYear takes a Stream of DateCount instances and returns a Stream of
// YearCount instances by totaling the counts in the DateCount instances
// by year. The DateCount instances must be ordered by Date. Caller must Close
// the returned Stream. Calling Close on returned Stream also closes s.
func ByYear(s functional.Stream) functional.Stream {
	return functional.NewGenerator(func(e functional.Emitter) error {
		var ptr interface{}
		var opened bool
		var incoming DateCount

		// As soon as caller calls Close() we quit without pulling unnecessary
		// values from underlying stream.
		if ptr, opened = e.EmitPtr(); !opened {
			return s.Close()
		}
		var err error

		// We get the first date while propagating any errors to the caller.
		// Note that we stay in this loop until either we get a first date or
		// caller calls Close()
		for err = s.Next(&incoming); err != nil; err = s.Next(&incoming) {
			// Propagate error and yield execution to caller. Then get the next
			// pointer caller pases to Next().
			if ptr, opened = e.Return(err); !opened {
				return s.Close()
			}
		}
		currentYear := incoming.Date.Year()

		// Running total for current year
		sum := incoming.Count

		// When Done marker is reached, we have to emit the count of the final year.
		for err = s.Next(&incoming); err != functional.Done; err = s.Next(&incoming) {

			// Propagate any errors to caller.
			if err != nil {
				if ptr, opened = e.Return(err); !opened {
					return s.Close()
				}
				continue
			}
			year := incoming.Date.Year()

			// If year changed, emit the count for the current year.
			// Then change currentYear and and make sum be the running total for
			// that year.
			if year != currentYear {
				*ptr.(*YearCount) = YearCount{Year: currentYear, Count: sum}
				if ptr, opened = e.Return(nil); !opened {
					return s.Close()
				}
				sum = incoming.Count
				currentYear = year
			} else {
				sum += incoming.Count
			}
		}
		// Emit the final year.
		*ptr.(*YearCount) = YearCount{Year: currentYear, Count: sum}

		// Note that we return nil, not functional.Done, since we are emitting the
		// count for the final year. The call to WaitForClose() takes
		// care of returning functional.Done to the caller until the caller calls
		// Close()
		e.Return(nil)
		functional.WaitForClose(e)
		return s.Close()
	})
}

func YMD(year, month, day int) time.Time {
	return time.Date(year, time.Month(month), day, 0, 0, 0, 0, time.UTC)
}

func main() {
	s := functional.NewStreamFromPtrs(
		[]*DateCount{
			{YMD(2013, 5, 24), 13},
			{YMD(2013, 4, 1), 5},
			{YMD(2013, 1, 1), 8},
			{YMD(2012, 12, 31), 24},
			{YMD(2012, 5, 26), 10}},
		nil)
	s = ByYear(s)
	defer s.Close()
	var yc YearCount
	for err := s.Next(&yc); err == nil; err = s.Next(&yc) {
		fmt.Printf("%d: %d\n", yc.Year, yc.Count)
	}
}
Output:

2013: 26
2012: 34

func NewStreamFromPtrs

func NewStreamFromPtrs(aSlice interface{}, c Copier) Stream

NewStreamFromPtrs converts a []*T into a Stream of T. aSlice is a []*T. c is a Copier of T. If c is nil, regular assignment is used. Calling Close on returned Stream is a no-op.

func NewStreamFromStreamFunc

func NewStreamFromStreamFunc(f func() Stream) Stream

NewStreamFromStreamFunc creates a Stream of Streams by repeatedly calling f. Calling Close on returned Stream is a no-op.

func NewStreamFromValues

func NewStreamFromValues(aSlice interface{}, c Copier) Stream

NewStreamFromValues converts a []T into a Stream of T. aSlice is a []T. c is a Copier of T. If c is nil, regular assignment is used. Calling Close on returned Stream is a no-op.

func NilStream

func NilStream() Stream

NilStream returns a Stream that emits no values. Calling Close on returned Stream is a no-op.

func NoCloseStream

func NoCloseStream(s Stream) Stream

NoCloseStream returns a Stream just like s but with a Close method that does nothing. This function is useful for preventing a stream from closing its underlying stream.

func ReadLines

func ReadLines(r io.Reader) Stream

ReadLines returns the lines of text in r separated by either "\n" or "\r\n" as a Stream of string. The emitted string types do not contain the end of line characters. Calling Close on returned Stream does nothing.

func ReadLinesAndClose

func ReadLinesAndClose(r io.ReadCloser) Stream

ReadLinesAndClose works just like ReadLines except that calling Close on returned Stream closes r.

func ReadRows

func ReadRows(r Rows) Stream

ReadRows returns the rows in a database table as a Stream of Tuple. Calling Close on returned stream does nothing.

func Slice

func Slice(s Stream, start int, end int) Stream

Slice returns a Stream that will emit elements in s starting at index start and continuing to but not including index end. Indexes are 0 based. If end is negative, it means go to the end of s. Calling Close on returned Stream closes s.

func TakeWhile

func TakeWhile(f Filterer, s Stream) Stream

TakeWhile returns a Stream that emits the values in s until the Filter method of f returns Skipped. The returned Stream's Next method reports any errors besides Skipped that the Filter method of f returns. Calling Close on returned Stream closes s. f is a Filterer of T; s is a Stream of T.

type Tuple

type Tuple interface {
	// Ptrs returns a pointer to each field in the tuple.
	Ptrs() []interface{}
}

Tuple represents a tuple of values that ReadRows emits

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL