fuego

v11.3.0
Published: Mar 16, 2023 License: Apache-2.0 Imports: 6 Imported by: 0

README

ƒuego - Functional Experiment in Go

Overview

Making Go come to functional programming.

This is a research project in functional programming which I hope will prove useful!

ƒuego brings a few functional paradigms to Go. The intent is to save development time while promoting code readability and reducing the risk of complex bugs.

I hope you will find it useful!

Have fun!!

Type Parameters

Starting with version v11.0.0, ƒuego uses Go 1.18's Type Parameters.

It is a drastic design change and fundamentally incompatible with previous versions of ƒuego.

Use v10 or prior if you need the pre-Go1.18 version of ƒuego that is based on interface Entry.

Documentation

The code documentation and some examples can be found on godoc.

The tests form the best source of documentation. ƒuego comes with a good collection of unit tests and testable Go examples. Don't be shy, open them up and read them and tinker with them!

Note:
Most tests use unbuffered channels to help detect deadlocks. In real life scenarios, it is recommended to use buffered channels for increased performance.

Installation

Download

go get github.com/seborama/fuego

Or for a specific version:

go get gopkg.in/seborama/fuego.v11

Import in your code

You can import the package in the usual Go fashion.

To simplify usage, you can use an alias:

package sample

import ƒ "gopkg.in/seborama/fuego.v11"

...or import as a blank import:

package sample

import _ "gopkg.in/seborama/fuego.v11"

Note: dot imports should work just fine but the logger may be disabled, unless you initialised the zap global logger yourself.

Debugging

Set environment variable FUEGO_LOG_LEVEL to enable logging to the desired level.

Example Stream

strs := []string{
    "a",
    "b",
    "bb",
    "bb",
    "cc",
    "ddd",
}
    
Collect(
  NewStreamFromSlice[string](strs, 100).
    Filter(isString).
    Distinct(stringHash),
  GroupingBy(
    stringLength,
    Mapping(
      stringToUpper,
      Filtering(
        stringLengthGreaterThan(1),
        ToSlice[string](),
      ),
    ),
  ),
)

// result: map[1:[] 2:[BB CC] 3:[DDD]]
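
For readers new to collectors, the pipeline above can be read as the plain-Go loop sketched below. This only illustrates the semantics and is not ƒuego's implementation. The helpers used in the example (isString, stringHash, stringLength, stringToUpper, stringLengthGreaterThan) are not shown in this README, so their behaviour is assumed from their names and from the printed result; groupUpperByLength is a hypothetical name.

```go
package main

import (
	"fmt"
	"strings"
)

// groupUpperByLength mimics the example pipeline with ordinary loops:
// de-duplicate, group by length, upper-case, keep strings longer than 1.
func groupUpperByLength(strs []string) map[int][]string {
	seen := map[string]bool{}
	groups := map[int][]string{}
	for _, s := range strs {
		if seen[s] { // Distinct
			continue
		}
		seen[s] = true

		key := len(s) // GroupingBy(stringLength, ...)
		if _, ok := groups[key]; !ok {
			groups[key] = []string{} // a group exists even if it stays empty
		}

		upper := strings.ToUpper(s) // Mapping(stringToUpper, ...)
		if len(upper) > 1 {         // Filtering(stringLengthGreaterThan(1), ...)
			groups[key] = append(groups[key], upper)
		}
	}
	return groups
}

func main() {
	strs := []string{"a", "b", "bb", "bb", "cc", "ddd"}
	fmt.Println(groupUpperByLength(strs)) // map[1:[] 2:[BB CC] 3:[DDD]]
}
```

Note that the group for length 1 is present but empty: grouping happens before the downstream filter is applied.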

Contributions

Contributions and feedback are welcome.

For contributions, you must develop in TDD fashion and ideally provide Go testable examples (if meaningful).

If you have an idea to improve ƒuego, please share it via an issue.

And if you like ƒuego give it a star to show your support for the project - it will put a smile on my face! 😊

Thanks!!

The Golden rules of the game

  1. Producers close their channel.

  2. Consumers do not close channels.

  3. Producers and consumers should be running in separate Go routines to prevent deadlocks when the channels' buffers fill up.
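
The three rules can be sketched with plain channels, independently of ƒuego. The function names produce and consume are hypothetical and exist only for this illustration.

```go
package main

import "fmt"

// produce owns the channel it returns and is the only place that closes it (rule 1).
func produce(values []int, bufsize int) <-chan int {
	out := make(chan int, bufsize)
	go func() { // rule 3: the producer runs in its own goroutine
		defer close(out)
		for _, v := range values {
			out <- v
		}
	}()
	return out
}

// consume only reads; it never closes the channel (rule 2).
func consume(in <-chan int) int {
	sum := 0
	for v := range in { // the loop ends when the producer closes the channel
		sum += v
	}
	return sum
}

func main() {
	// Works with an unbuffered channel because producer and consumer
	// run in separate goroutines.
	fmt.Println(consume(produce([]int{1, 2, 3, 4}, 0))) // 10
}
```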

Pressure

Go channels support buffering, which affects behaviour when channels are combined in a pipeline.

When the buffer of a consumer's Stream channel is full, the producer cannot send more data to it until the consumer catches up. This protects downstream operations from being overloaded.

Presently, a Go channel cannot change its buffer size dynamically, which prevents the stream from adapting flexibly. Constructs that use 'select' on channels on the producer side can offer opportunities for mitigation.
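
One possible shape for such a producer-side 'select' is a non-blocking send, sketched below with plain channels. This is an assumption about what a mitigation could look like, not something ƒuego provides; trySend and offerAll are hypothetical names. A real producer might retry, slow down, or divert rejected values instead of merely counting them.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether the value was accepted.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default: // buffer full and no receiver ready
		return false
	}
}

// offerAll offers every value once and counts accepted and rejected sends.
func offerAll(ch chan<- int, values []int) (accepted, rejected int) {
	for _, v := range values {
		if trySend(ch, v) {
			accepted++
		} else {
			rejected++
		}
	}
	return accepted, rejected
}

func main() {
	ch := make(chan int, 2) // small buffer, nobody is consuming yet
	accepted, rejected := offerAll(ch, []int{1, 2, 3, 4, 5})
	fmt.Println(accepted, rejected) // 2 3
}
```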

Features summary

Streams:

  • Stream:
    • Filter
    • Map / FlatMap
    • Reduce
    • GroupBy
    • All/Any/None -Match
    • Intersperse
    • Distinct
    • Head* / Last* / Take* / Drop*
    • StartsWith / EndsWith
    • ForEach / Peek
    • ...
  • ComparableStream
  • MathableStream

Functional Types:

  • Optional
  • Predicate

Functions:

  • Consumer / BiConsumer
  • Function / BiFunction
  • StreamFunction

Collectors:

  • GroupingBy
  • Mapping
  • FlatMapping
  • Filtering
  • Reducing
  • ToSlice
  • ToMap*

Check the godoc for full details.

Concurrency

As of v8.0.0, a new concurrent model offers to process a stream concurrently while preserving order.

This is not possible yet with all Stream methods but it is available with e.g. Stream.Map.
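
To give an idea of how order can be preserved while work runs concurrently, here is a stand-alone sketch using plain channels. It is one well-known technique (a queue of per-element result channels) and is not claimed to be how ƒuego implements Stream.Map; orderedMap, numbers and squaresInOrder are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// orderedMap applies f concurrently but emits results in input order.
// The buffer size n bounds how many results may be queued at once.
func orderedMap[T, R any](in <-chan T, n int, f func(T) R) <-chan R {
	out := make(chan R)
	pending := make(chan chan R, n) // result slots, queued in input order

	go func() {
		defer close(pending)
		for v := range in {
			res := make(chan R, 1)
			pending <- res // blocks while the queue is full
			go func(v T) { res <- f(v) }(v)
		}
	}()

	go func() {
		defer close(out)
		for res := range pending {
			out <- <-res // wait for the oldest slot first
		}
	}()

	return out
}

// numbers produces 1..n and closes its channel.
func numbers(n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			out <- i
		}
	}()
	return out
}

func squaresInOrder() []int {
	slowSquare := func(i int) int {
		time.Sleep(time.Duration(6-i) * time.Millisecond) // earlier items take longer
		return i * i
	}
	var got []int
	for v := range orderedMap(numbers(5), 3, slowSquare) {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(squaresInOrder()) // [1 4 9 16 25]
}
```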

Notes on concurrency

Concurrent streams are challenging to implement owing to ordering issues in parallel processing. At the moment, the view is that the most sensible approach is to delegate control to users. Multiple ƒuego streams can be created and data distributed across as desired. This empowers users of ƒuego to implement the desired behaviour of their pipelines.

Stream has some methods that fan out (e.g. ForEachC). See the godoc for further information and limitations.

I recommend Rob Pike's slides on Go concurrency patterns.

As a proof of concept and for convenience, ƒuego has a CStream implementation to concurrently manage a collection of Streams.

Collectors

A Collector is a mutable reduction operation, optionally transforming the accumulated result.

Collectors can be combined to express complex operations in a concise manner.
Simply put, a collector allows the creation of bespoke actions on a Stream.

ƒuego exposes a number of functional methods such as MapToInt, Head, LastN, Filter, etc...
Collectors also provide a few functional methods.

But... what if you need something else, and combining the existing methods ƒuego offers is neither straightforward nor readable?

Enter Collector: implement your own requirement functionally!
Focus on what needs doing in your streams (and delegate the details of the how to the implementation of your Collector).

Golang, Receivers and Functions

Some tests (e.g. TestCollector_Filtering) use receiver getter methods in the guise of Function[T, R any]. Here is how this is possible.

Function[T, R any] is defined as func(T) R.

A getter method is typically defined as func (T) Property() R {...}.

With t as the receiver, Go allows t.Property() to be called as T.Property(t) (a method expression). This is a func(T) R and hence a Function[T, R any].

Example - TestCollector_Filtering:

employee.Department(employees[0]) is the same as employees[0].Department(), and of course, they both return a string.

The first syntax has one advantage for our purpose though: it is a Function[T, R any].
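
The following stand-alone sketch shows the mechanism. The Function type is re-declared locally to mirror the definition quoted above; the employee struct, its fields and the apply helper are hypothetical and are not taken from ƒuego's tests.

```go
package main

import "fmt"

// Function mirrors the shape of ƒuego's Function[T, R any] type.
type Function[T, R any] func(T) R

// employee is a hypothetical type with a getter method.
type employee struct {
	name       string
	department string
}

// Department is a getter with a value receiver.
func (e employee) Department() string { return e.department }

// apply calls f on v; it accepts any Function[T, R].
func apply[T, R any](f Function[T, R], v T) R { return f(v) }

func main() {
	e := employee{name: "Ada", department: "Engineering"}

	// Method expression: employee.Department has type func(employee) string,
	// so it can be assigned to a Function[employee, string].
	var f Function[employee, string] = employee.Department

	fmt.Println(e.Department() == f(e))                          // true
	fmt.Println(apply[employee, string](employee.Department, e)) // Engineering
}
```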

Known limitations

  • several operations may be memory intensive or poorly performing.

No parameterised method in Go

Go 1.18 brings type parameters. However, parameterised methods are not allowed.

This prevents the Map() method of Stream from mapping to, and returning, a new type parameter.

To circumvent this, we need to use a decorator function to re-map the Stream.

This can lead to a leftward-growing chain of decorator function calls that makes the intent opaque:

ReStream(
  ReStream(is, Stream[int]{}).Map(float2int),
  Stream[string]{}).Map(int2string)
// This is actually performing: Stream.Map(float2int).Map(int2string)

ƒuego includes a set of casting functions that reduce the visually leftward-growing chain of decorators while preserving a natural functional flow expression:

C(C(C(
  s.
    Map(float2int_), Int).
    Map(int2string_), String).
    Map(string2int_), Int).
    ForEach(print[int])
// This is actually performing: s.Map(float2int).Map(int2string).Map(string2int).ForEach(print)

While not perfect, this is the best workable compromise I have obtained thus far.

Performance issues when using numerous parameterised methods in Go 1.18

As a result of this issue, an experiment to add MapTo<native_type>() Stream[<native_type>] is disabled.

Instead, use function CC (ComparableStream Cast) to access Min(), Max(), and MC (MathableStream Cast) to access Sum().

Documentation

Overview

Package fuego provides various functional facilities.

Important note:

Go does not yet support parameterised methods: https://go.googlesource.com/proposal/+/master/design/43651-type-parameters.md#no-parameterized-methods

The below construct is not currently possible:

func (s Stream[T]) Map[R any](mapper Function[T, R]) Stream[R] {...}
                      ^^^^^^^ no!

One option would be to make `Map` a function rather than a method but constructs would be chained right-to-left rather than left-to-right, which I think is awkward. Example: "Map(Map(stream,f1),f2)" instead of "stream.Map(f1).Map(f2)".
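
The function-based alternative described above can be sketched as follows. The Stream type here is a deliberately simplified, slice-backed stand-in (ƒuego's Stream wraps a channel), and float2int and int2string are illustrative helpers.

```go
package main

import (
	"fmt"
	"strconv"
)

// Stream is a simplified slice-backed stand-in, not ƒuego's channel-based Stream.
type Stream[T any] struct{ items []T }

// Map must be a top-level function: a method cannot declare the extra type parameter R.
func Map[T, R any](s Stream[T], f func(T) R) Stream[R] {
	out := make([]R, 0, len(s.items))
	for _, v := range s.items {
		out = append(out, f(v))
	}
	return Stream[R]{items: out}
}

func float2int(f float64) int { return int(f) }
func int2string(i int) string { return strconv.Itoa(i) }

func main() {
	s := Stream[float64]{items: []float64{1.5, 2.5, 3.5}}
	// Reads inside-out: float2int is applied first, then int2string.
	result := Map(Map(s, float2int), int2string)
	fmt.Printf("%q\n", result.items) // ["1" "2" "3"]
}
```

The types are fully checked by the compiler in this form, at the cost of the inside-out reading order noted above.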

A syntactically lighter approach is provided with `SC` and `C`. See functions `SC` and `C` for casting Stream[Any] to a typed Stream[T any].

Go 1.18 suffers from a performance issue:

Constants

const (
	Bool       = false
	Int        = int(0)
	Int8       = int8(0)
	Int16      = int16(0)
	Int32      = int32(0)
	Int64      = int64(0)
	Uint       = uint(0)
	Uint8      = uint8(0)
	Uint16     = uint16(0)
	Uint32     = uint32(0)
	Uint64     = uint64(0)
	Float32    = float32(0)
	Float64    = float64(0)
	Complex64  = complex64(0)
	Complex128 = complex128(0)
	String     = ""
)

Useful constants that represent native Go types.

const PanicCollectorMissingAccumulator = "collector missing accumulator"

PanicCollectorMissingAccumulator signifies that the accumulator of a Collector was not provided.

const PanicCollectorMissingFinisher = "collector missing finisher"

PanicCollectorMissingFinisher signifies that the Finisher of a Collector was not provided.

const PanicCollectorMissingSupplier = "collector missing supplier"

PanicCollectorMissingSupplier signifies that the Supplier of a Collector was not provided.

const PanicDuplicateKey = "duplicate key"

PanicDuplicateKey signifies that an attempt was made to duplicate a key in a container (such as a map).

const PanicMissingChannel = "stream requires a channel"

PanicMissingChannel signifies that the Stream is missing a channel.

const PanicNilNotPermitted = "nil not permitted"

PanicNilNotPermitted signifies that the `nil` value is not allowed in the context.

const PanicNoSuchElement = "no such element"

PanicNoSuchElement signifies that the requested element is not present. Examples: when the Stream is empty, or when an Optional does not have a value.

Variables

var (
	SBool       = []bool{}
	SInt        = []int{}
	SInt8       = []int8{}
	SInt16      = []int16{}
	SInt32      = []int32{}
	SInt64      = []int64{}
	SUint       = []uint{}
	SUint8      = []uint8{}
	SUint16     = []uint16{}
	SUint32     = []uint32{}
	SUint64     = []uint64{}
	SFloat32    = []float32{}
	SFloat64    = []float64{}
	SComplex64  = []complex64{}
	SComplex128 = []complex128{}
	SString     = []string{}
)

Useful variables that represent slices of native Go types.

var (
	BoolPtr       = ptr(Bool)
	IntPtr        = ptr(Int)
	Int8Ptr       = ptr(Int8)
	Int16Ptr      = ptr(Int16)
	Int32Ptr      = ptr(Int32)
	Int64Ptr      = ptr(Int64)
	UintPtr       = ptr(Uint)
	Uint8Ptr      = ptr(Uint8)
	Uint16Ptr     = ptr(Uint16)
	Uint32Ptr     = ptr(Uint32)
	Uint64Ptr     = ptr(Uint64)
	Float32Ptr    = ptr(Float32)
	Float64Ptr    = ptr(Float64)
	Complex64Ptr  = ptr(Complex64)
	Complex128Ptr = ptr(Complex128)
	StringPtr     = ptr(String)
)

Useful variables that represent pointers to native Go types.

var (
	SBoolPtr       = []*bool{}
	SIntPtr        = []*int{}
	SInt8Ptr       = []*int8{}
	SInt16Ptr      = []*int16{}
	SInt32Ptr      = []*int32{}
	SInt64Ptr      = []*int64{}
	SUintPtr       = []*uint{}
	SUint8Ptr      = []*uint8{}
	SUint16Ptr     = []*uint16{}
	SUint32Ptr     = []*uint32{}
	SUint64Ptr     = []*uint64{}
	SFloat32Ptr    = []*float32{}
	SFloat64Ptr    = []*float64{}
	SComplex64Ptr  = []*complex64{}
	SComplex128Ptr = []*complex128{}
	SStringPtr     = []*string{}
)

Useful variables that represent slices of pointers to native Go types.

Functions

func Collect

func Collect[T, A, R any](s Stream[T], c Collector[T, A, R]) R

Collect reduces and optionally mutates the stream with the supplied Collector.

This is a continuous terminal operation and hence expects the producer to close the stream in order to complete.

func Concatenate

func Concatenate[T string](a, b T) T

Concatenate is a BiFunction that returns the concatenation of the two strings passed to it.

func Identity

func Identity[T any](v T) T

Identity is a basic Function that returns the original value passed to it, unmodified.

func IdentityFinisher

func IdentityFinisher[T any](t T) T

IdentityFinisher is a basic finisher that returns the original value passed to it, unmodified.

func Max

func Max[T Comparable](a, b T) T

Max is a BiFunction that returns the greatest of 2 values.

func Min

func Min[T Comparable](a, b T) T

Min is a BiFunction that returns the smallest of 2 values.

func Sum

func Sum[T Mathable](a, b T) T

Sum is a BiFunction that returns the sum of 2 values.

Types

type Any

type Any any

Any is a defined type with underlying type `any` (declared as `type Any any`, so it is not a true type alias).

func ToAny

func ToAny[T any](v T) Any

ToAny is a basic Function that returns the original value passed to it, cast to an 'Any' type.

type BiConsumer

type BiConsumer[T, U any] func(T, U)

BiConsumer that accepts two arguments and does not return any value.

type BiFunction

type BiFunction[T, U, R any] func(T, U) R

BiFunction that accepts two arguments and produces a result.

type BinaryOperator

type BinaryOperator[T any] func(T, T) T

BinaryOperator that accepts two arguments of the same type and produces a result of the same type. This is a special case of BiFunction.

type Collector

type Collector[T, A, R any] struct {
	// contains filtered or unexported fields
}

A Collector is a mutable reduction operation, optionally transforming the accumulated result.

Collectors can be combined to express complex operations in a concise manner.

In other words, a collector allows creating custom actions on a Stream.

fuego ships with a number of methods such as `MapToInt`, `Head`, `LastN`, `Filter`, etc., and Collectors also provide a few additional methods.

But what if you need something else, and combining the existing methods fuego offers is neither straightforward nor readable? Enter `Collector`: implement your own requirement functionally! Focus on what needs to be done in your streams (and delegate the details of the how to the implementation of your `Collector`).

Type T: type of input elements to the reduction operation
Type A: mutable accumulation type of the reduction operation (often hidden as an implementation detail)
Type R: result type of the reduction operation

func Filtering

func Filtering[T, A, R any](predicate Predicate[T], collector Collector[T, A, R]) Collector[T, A, R]

Filtering restricts the entries a Collector accepts to the subset that satisfies the given predicate.

func FlatMapping

func FlatMapping[U, T, A, R any](mapper StreamFunction[T, U], collector Collector[U, A, R]) Collector[T, A, R]

FlatMapping adapts the Entries a Collector accepts to another type by applying a flat mapping function which maps input elements to a `Stream`.

func GroupingBy

func GroupingBy[T any, K comparable, A, D any](classifier Function[T, K], downstream Collector[T, A, D]) Collector[T, map[K]A, map[K]D]

GroupingBy groups the elements of the downstream Collector by classifying them with the provided classifier function.

Type T: the type of the input elements
Type K: the type of the keys
Type A: the intermediate accumulation type of the downstream collector
Type D: the result type of the downstream reduction

classifier: a classifier function mapping input elements to keys
downstream: a Collector implementing the downstream reduction

func Mapping

func Mapping[T, U, A, R any](mapper Function[T, U], downstream Collector[U, A, R]) Collector[T, A, R]

Mapping adapts a Collector with elements of type U to a collector with elements of type T.

func NewCollector

func NewCollector[T, A, R any](supplier Supplier[A], accumulator BiFunction[A, T, A], finisher Function[A, R]) Collector[T, A, R]

NewCollector creates a new Collector.
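
To illustrate how the three parts fit together, here is a stand-alone, slice-based sketch. It does not use ƒuego: collector, collect, sumCount and averaging are hypothetical names, and the supplier is assumed to have the shape func() A since the Supplier type is not shown in this section.

```go
package main

import "fmt"

// collector is a hypothetical stand-in for the three parts NewCollector takes.
type collector[T, A, R any] struct {
	supplier    func() A     // creates the empty accumulation container
	accumulator func(A, T) A // folds one element into the container
	finisher    func(A) R    // converts the container into the final result
}

// collect runs the collector over the elements.
func collect[T, A, R any](items []T, c collector[T, A, R]) R {
	acc := c.supplier()
	for _, v := range items {
		acc = c.accumulator(acc, v)
	}
	return c.finisher(acc)
}

// sumCount is the intermediate accumulation type (A) of the averaging collector.
type sumCount struct {
	sum   float64
	count int
}

// averaging is a bespoke collector: it accumulates sum and count, then divides.
func averaging() collector[float64, sumCount, float64] {
	return collector[float64, sumCount, float64]{
		supplier: func() sumCount { return sumCount{} },
		accumulator: func(a sumCount, v float64) sumCount {
			return sumCount{sum: a.sum + v, count: a.count + 1}
		},
		finisher: func(a sumCount) float64 {
			if a.count == 0 {
				return 0 // sketch choice: an empty input averages to 0
			}
			return a.sum / float64(a.count)
		},
	}
}

func main() {
	fmt.Println(collect([]float64{1, 2, 3, 6}, averaging())) // 3
}
```

The accumulation type (sumCount) differs from both the element type and the result type, which is the reason a Collector carries three type parameters.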

func Reducing

func Reducing[T any](f2 BiFunction[T, T, T]) Collector[T, Optional[T], T]

Reducing returns a collector that performs a reduction of its input elements using the provided BiFunction.

func ToMap

func ToMap[T any, K comparable, V any](keyMapper Function[T, K], valueMapper Function[T, V]) Collector[T, map[K]V, map[K]V]

ToMap returns a collector that accumulates the input entries into a Go map.

Type T: type from which the elements are accumulated in the map.
Type K: type of the keys derived from T.
Type V: type of the values derived from T.

func ToMapWithMerge

func ToMapWithMerge[T any, K comparable, V any](keyMapper Function[T, K], valueMapper Function[T, V], mergeFn BiFunction[V, V, V]) Collector[T, map[K]V, map[K]V]

ToMapWithMerge returns a collector that accumulates the input entries into a Go map. Key collision strategy is managed by mergeFn.

Type T: type from which the elements are accumulated in the map.
Type K: type of the keys derived from T.
Type V: type of the values derived from T.
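
The role of mergeFn can be sketched with a plain slice-based function. This is not ƒuego's code: toMapWithMerge and wordCountsByInitial are hypothetical names, and passing the existing value first and the new value second to mergeFn is an assumption.

```go
package main

import "fmt"

// toMapWithMerge is a slice-based sketch of the behaviour described above.
func toMapWithMerge[T any, K comparable, V any](
	items []T,
	keyMapper func(T) K,
	valueMapper func(T) V,
	mergeFn func(V, V) V,
) map[K]V {
	out := map[K]V{}
	for _, item := range items {
		k, v := keyMapper(item), valueMapper(item)
		if existing, ok := out[k]; ok {
			v = mergeFn(existing, v) // key collision: mergeFn decides the stored value
		}
		out[k] = v
	}
	return out
}

// wordCountsByInitial counts words per first letter (words must be non-empty).
func wordCountsByInitial(words []string) map[string]int {
	return toMapWithMerge(words,
		func(w string) string { return w[:1] },
		func(string) int { return 1 },
		func(a, b int) int { return a + b },
	)
}

func main() {
	fmt.Println(wordCountsByInitial([]string{"apple", "avocado", "banana"})) // map[a:2 b:1]
}
```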

func ToSlice

func ToSlice[T any]() Collector[T, []T, []T]

ToSlice returns a collector that accumulates the input entries into a Go slice.

Type T: type of the elements accumulated in the slice.

type Comparable

type Comparable interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64 |
		~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 |
		~float32 | ~float64 |
		string
}

Comparable is a constraint that matches any type that supports the operators: >= <= > < == != .
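
A short sketch of how such a constraint is used. The constraint is copied from the declaration above under the hypothetical name Ordered so the example is self-contained; maxOf and Celsius are also hypothetical.

```go
package main

import "fmt"

// Ordered copies the constraint shown above under a different, hypothetical name.
type Ordered interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64 |
		~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 |
		~float32 | ~float64 |
		string
}

// maxOf returns the greater of two values of any type in the constraint.
func maxOf[T Ordered](a, b T) T {
	if a > b {
		return a
	}
	return b
}

// Celsius has float64 as its underlying type, so it satisfies ~float64.
type Celsius float64

func main() {
	fmt.Println(maxOf(3, 7))                       // 7
	fmt.Println(maxOf("apple", "pear"))            // pear
	fmt.Println(maxOf(Celsius(21.5), Celsius(19))) // 21.5
}
```

Because `string` appears without a tilde, a defined type such as `type Name string` would not satisfy this constraint, whereas defined numeric types such as Celsius do.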

type ComparableStream

type ComparableStream[T Comparable] struct {
	Stream[T]
}

ComparableStream is a stream of Comparable type.

func CC

func CC[U Comparable](from Stream[Any], to U) ComparableStream[U]

CC is a typed cast function from a non-parameterised Stream[Any] to a parameterised type ComparableStream[U]. CC receives a type U and creates a ComparableStream[U].

CC exists to address the current lack of support in Go for parameterised methods and a performance issue with Go 1.18. See doc.go for more details.

func (ComparableStream[T]) Max

func (s ComparableStream[T]) Max() T

func (ComparableStream[T]) Min

func (s ComparableStream[T]) Min() T

type Consumer

type Consumer[T any] func(T)

Consumer that accepts one argument and does not return any value.

type Function

type Function[T, R any] func(T) R

Function that accepts one argument and produces a result.

type Logger

type Logger interface {
	Debug(msg string, fields ...zap.Field)
	Info(msg string, fields ...zap.Field)
	Warn(msg string, fields ...zap.Field)
	Error(msg string, fields ...zap.Field)
	With(fields ...zap.Field) *zap.Logger
}

Logger exposes basic logging methods. This is provided as a sheer convenience.

type Mathable

type Mathable interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64 |
		~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 |
		~float32 | ~float64 |
		~complex64 | ~complex128
}

Mathable is a constraint that matches any type that supports math operations.

type MathableStream

type MathableStream[T Mathable] struct {
	Stream[T]
}

MathableStream is a Stream of Mathable type.

func MC

func MC[U Mathable](from Stream[Any], to U) MathableStream[U]

MC is a typed cast function from a non-parameterised Stream[Any] to a parameterised type MathableStream[U]. MC receives a type U and creates a MathableStream[U].

MC exists to address the current lack of support in Go for parameterised methods and a performance issue with Go 1.18. See doc.go for more details.

func (MathableStream[T]) Average

func (s MathableStream[T]) Average() T

Average returns the arithmetic average of the numbers in the stream. Panics if the channel is nil or the stream is empty. This is a special case of a reduction. This is a terminal operation and hence expects the producer to close the stream in order to complete.

func (MathableStream[T]) Sum

func (s MathableStream[T]) Sum() T

Sum returns the sum of all items on the stream. Panics if the channel is nil or the stream is empty. This is a special case of a reduction. This is a terminal operation and hence expects the producer to close the stream in order to complete.

type Option

type Option func(*zap.Config)

Option is a function to provide the Logger creation with extra initialisation options.

type Optional

type Optional[T any] struct {
	// contains filtered or unexported fields
}

Optional is a container object which may or may not contain a value. Note that nil is not treated as the absence of a value. See IsPresent().

Additional methods that depend on the presence or absence of a contained value are provided, such as OrElse() (returns a default value if no value is present) and IfPresent() (performs an action if a value is present).

This is a value-based class; programmers should treat instances that are equal as interchangeable.
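
The idea can be sketched with a minimal stand-alone type. This is not ƒuego's Optional (whose fields are unexported and not shown here); optional, of and empty are hypothetical names that only mirror the behaviour described for OrElse and IfPresent.

```go
package main

import "fmt"

// optional is a hypothetical, minimal stand-in for a value that may be absent.
type optional[T any] struct {
	value   T
	present bool
}

func of[T any](v T) optional[T] { return optional[T]{value: v, present: true} }

func empty[T any]() optional[T] { return optional[T]{} }

// OrElse returns the value if present, otherwise other.
func (o optional[T]) OrElse(other T) T {
	if o.present {
		return o.value
	}
	return other
}

// IfPresent calls c with the value only when one is present.
func (o optional[T]) IfPresent(c func(T)) {
	if o.present {
		c(o.value)
	}
}

func main() {
	fmt.Println(of(42).OrElse(0))        // 42
	fmt.Println(empty[int]().OrElse(-1)) // -1
	of("hello").IfPresent(func(s string) { fmt.Println(s) }) // hello
	empty[string]().IfPresent(func(s string) { fmt.Println("never printed") })
}
```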

func OptionalEmpty

func OptionalEmpty[T any]() Optional[T]

OptionalEmpty returns an empty Optional instance. No value is present for this Optional.

func OptionalOf

func OptionalOf[T any](val T) Optional[T]

OptionalOf returns an Optional describing the given value (which is not required to be non-nil). Currently, Go 1.18 does not permit nil generic types. See: https://github.com/golang/go/issues/22729

func (Optional[T]) Filter

func (o Optional[T]) Filter(p Predicate[T]) Optional[T]

Filter returns an Optional describing the value if a value is present, and the value matches the given predicate, otherwise returns an empty Optional.

func (Optional[T]) FlatMap

func (o Optional[T]) FlatMap(f Function[T, Optional[Any]]) Optional[Any]

FlatMap returns the result of applying the given Optional-bearing mapping function to the value if present, otherwise returns an empty Optional.

func (Optional[T]) Get

func (o Optional[T]) Get() T

Get returns the value if present, otherwise panics.

func (Optional[T]) IfPresent

func (o Optional[T]) IfPresent(c Consumer[T])

IfPresent performs the given action with the value if a value is present, otherwise does nothing.

func (Optional[T]) IsPresent

func (o Optional[T]) IsPresent() bool

IsPresent returns true if a value is present. If no value is present, the object is considered empty and IsPresent() returns false.

func (Optional[T]) Map

func (o Optional[T]) Map(f Function[T, Any]) Optional[Any]

Map returns an Optional describing (as if by ofNullable(T)) the result of applying the given mapping function to the value if present, otherwise returns an empty Optional. If the mapping function returns a nil result then this method returns an empty Optional.

func (Optional[T]) Or

func (o Optional[T]) Or(s Supplier[Optional[T]]) Optional[T]

Or returns an Optional describing the value if present, otherwise returns an Optional produced by the supplying function.

func (Optional[T]) OrElse

func (o Optional[T]) OrElse(other T) T

OrElse returns the value if present, otherwise returns other.

func (Optional[T]) OrElseGet

func (o Optional[T]) OrElseGet(other Supplier[T]) T

OrElseGet returns the value if present, otherwise returns the result produced by the supplying function.

type Predicate

type Predicate[T any] func(t T) bool

Predicate represents a predicate (boolean-valued function) of one argument. Could also be: `type Predicate[T any] Function[T, bool]`.

func False

func False[T any]() Predicate[T]

False returns a predicate that always returns false.

func True

func True[T any]() Predicate[T]

True returns a predicate that always returns true.

func (Predicate[T]) And

func (p Predicate[T]) And(other Predicate[T]) Predicate[T]

And is a composed predicate that represents a short-circuiting logical AND of this predicate and another.

func (Predicate[T]) Negate

func (p Predicate[T]) Negate() Predicate[T]

Negate is an alias for Not().

func (Predicate[T]) Not

func (p Predicate[T]) Not() Predicate[T]

Not is the logical negation of a predicate.

func (Predicate[T]) Or

func (p Predicate[T]) Or(other Predicate[T]) Predicate[T]

Or is a composed predicate that represents a short-circuiting logical OR of two predicates.

func (Predicate[T]) Xor

func (p Predicate[T]) Xor(other Predicate[T]) Predicate[T]

Xor is a composed predicate that represents a logical XOR of two predicates. Unlike AND and OR, a XOR cannot short-circuit: both predicates must be evaluated to determine the result.
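
Predicate composition can be sketched on a locally declared function type. The lowercase predicate type and its method bodies below are an illustration written for this example, not ƒuego's source.

```go
package main

import "fmt"

// predicate mirrors the shape of Predicate[T]; the methods are a sketch only.
type predicate[T any] func(T) bool

func (p predicate[T]) And(other predicate[T]) predicate[T] {
	return func(t T) bool { return p(t) && other(t) } // other is skipped when p is false
}

func (p predicate[T]) Or(other predicate[T]) predicate[T] {
	return func(t T) bool { return p(t) || other(t) } // other is skipped when p is true
}

func (p predicate[T]) Not() predicate[T] {
	return func(t T) bool { return !p(t) }
}

func (p predicate[T]) Xor(other predicate[T]) predicate[T] {
	return func(t T) bool { return p(t) != other(t) } // both sides are evaluated
}

func main() {
	var isEven predicate[int] = func(i int) bool { return i%2 == 0 }
	var isPositive predicate[int] = func(i int) bool { return i > 0 }

	fmt.Println(isEven.And(isPositive)(4)) // true
	fmt.Println(isEven.Or(isPositive)(-3)) // false
	fmt.Println(isEven.Xor(isPositive)(4)) // false
	fmt.Println(isEven.Not()(4))           // false
}
```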

type Stream

type Stream[T any] struct {
	// contains filtered or unexported fields
}

Stream is a sequence of elements supporting sequential and (in specific circumstances) parallel operations.

A Stream is a wrapper over a Go channel ('nil' channels are prohibited).

NOTE:

Concurrent streams are challenging to implement owing to ordering issues in parallel processing. At the moment, the view is that the most sensible approach is to delegate control to users. Multiple fuego streams can be created and data distributed across as desired. This empowers users of fuego to implement the desired behaviour of their pipelines.

As of v8.0.0, fuego offers ordered concurrency for some linear operations such as Map().

Creation

When providing a Go channel to create a Stream, the Stream remains open until the channel is closed by the producer. Should the producer unintentionally fail to close the channel, the goroutine consuming the Stream will stray (i.e. block forever).

Streams created from a slice are bounded since the slice has finite content.

func C

func C[U any](from Stream[Any], to U) Stream[U]

C is a typed cast function from a non-parameterised Stream[Any] to a parameterised type Stream[U]. C receives a type U and creates a Stream[U].

C exists to address the current lack of support in Go for parameterised methods. See doc.go for more details.

See SC for a Stream cast.

func NewConcurrentStream

func NewConcurrentStream[T any](c chan T, n int) Stream[T]

NewConcurrentStream creates a new Stream with a degree of concurrency of n.

func NewStream

func NewStream[T any](c chan T) Stream[T]

NewStream creates a new Stream.

This function does not close the provided channel.

func NewStreamFromSlice

func NewStreamFromSlice[T any](slice []T, bufsize int) Stream[T]

NewStreamFromSlice creates a new Stream from a Go slice.

The slice data is published to the stream after which the stream is closed.

func SC

func SC[U any](from Stream[Any], to Stream[U]) Stream[U]

SC is a typed Stream cast function from a non-parameterised Stream[Any] to a parameterised Stream[U]. SC receives a typed Stream[U].

SC exists to address the current lack of support in Go for parameterised methods. See doc.go for more details.

See C for a typed cast.

func (Stream[T]) AllMatch

func (s Stream[T]) AllMatch(p Predicate[T]) bool

AllMatch returns whether all of the elements in the stream satisfy the predicate.

This is a continuous terminal operation and hence expects the producer to close the stream in order to complete (or it will block).

func (Stream[T]) AnyMatch

func (s Stream[T]) AnyMatch(p Predicate[T]) bool

AnyMatch returns whether any of the elements in the stream satisfies the predicate.

This is a continuous terminal operation and hence expects the producer to close the stream in order to complete (or it will block).

func (Stream[T]) Concurrency

func (s Stream[T]) Concurrency() int

Concurrency returns the stream's concurrency level (i.e. parallelism).

func (Stream[T]) Concurrent

func (s Stream[T]) Concurrent(n int) Stream[T]

Concurrent sets the level of concurrency for this Stream.

This is used for concurrent methods such as Stream.Map.

Note that to switch off concurrency, you should provide n = 0. With n = 1, concurrency is internal whereby the Stream writer will not block on writing a single element (i.e. buffered channel of 1). This already provides significant processing gains.

Performance:

Channels are inherently expensive to use owing to their internal mutex lock.

Benefits will ONLY be observed when the execution has a degree of latency (at the very least, several dozens of nanoseconds). The higher the latency, the better the gains from concurrency (even on a single CPU core).

If latency is too low or next to none, using concurrency will likely be slower than without, particularly when no CPU core is available.

func (Stream[T]) Count

func (s Stream[T]) Count() int

Count returns the number of elements in the stream.

This is a continuous terminal operation and hence expects the producer to close the stream in order to complete (or it will block).

func (Stream[T]) Distinct

func (s Stream[T]) Distinct(hashFn func(T) uint32) Stream[T]

Distinct returns a stream of the distinct elements of this stream. Distinctiveness is determined via the provided hashFn.

This operation is costly both in time and in memory. It is strongly recommended to use buffered channels for this operation.

This function streams continuously until the in-stream is closed at which point the out-stream will be closed too.
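
A hash function with the required func(T) uint32 shape can be built from the standard library, for example with FNV-1a as sketched below. The slice-based distinct helper is a hypothetical illustration of the idea and not ƒuego's implementation; in this sketch, two different elements whose hashes collide would be treated as duplicates.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// stringHash has the func(T) uint32 shape that Distinct expects, with T = string.
func stringHash(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s)) // Write on a hash.Hash never returns an error
	return h.Sum32()
}

// distinct is a slice-based sketch: elements whose hash was already seen are dropped.
func distinct[T any](items []T, hashFn func(T) uint32) []T {
	seen := map[uint32]struct{}{}
	var out []T
	for _, v := range items {
		h := hashFn(v)
		if _, dup := seen[h]; dup {
			continue
		}
		seen[h] = struct{}{}
		out = append(out, v)
	}
	return out
}

func main() {
	fmt.Println(distinct([]string{"a", "b", "bb", "bb", "cc"}, stringHash))
}
```

The set of seen hashes grows with the number of distinct elements, which is consistent with the memory cost noted above.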

func (Stream[T]) Drop added in v11.1.0

func (s Stream[T]) Drop(n uint64) Stream[T]

Drop drops the first 'n' elements of this stream and returns a new stream.

This function streams continuously until the in-stream is closed at which point the out-stream will be closed too.

func (Stream[T]) DropUntil added in v11.1.0

func (s Stream[T]) DropUntil(p Predicate[T]) Stream[T]

DropUntil drops the first elements of this stream until the predicate is satisfied and returns a new stream.

This function streams continuously until the in-stream is closed at which point the out-stream will be closed too.

func (Stream[T]) DropWhile added in v11.1.0

func (s Stream[T]) DropWhile(p Predicate[T]) Stream[T]

DropWhile drops the first elements of this stream while the predicate is satisfied and returns a new stream.

This function streams continuously until the in-stream is closed at which point the out-stream will be closed too.

func (Stream[T]) EndsWith added in v11.1.0

func (s Stream[T]) EndsWith(slice []T) bool

EndsWith returns true when this stream ends with the supplied elements.

This is a potentially expensive method since it has to consume all the elements in the Stream.

This is a continuous terminal operation and hence expects the producer to close the stream in order to complete.

func (Stream[T]) Filter

func (s Stream[T]) Filter(predicate Predicate[T]) Stream[T]

Filter returns a stream consisting of the elements of this stream that match the given predicate.

This function streams continuously until the in-stream is closed at which point the out-stream will be closed too.

func (Stream[T]) FlatMap

func (s Stream[T]) FlatMap(mapper StreamFunction[T, Any]) Stream[Any]

FlatMap takes a StreamFunction to flatten the entries in this stream and produce a new stream.

This function streams continuously until the in-stream is closed at which point the out-stream will be closed too.

func (Stream[T]) ForEach

func (s Stream[T]) ForEach(c Consumer[T])

ForEach executes the given consumer function for each entry in this stream.

This is a continuous terminal operation. It will only complete if the producer closes the stream.

func (Stream[T]) GroupBy

func (s Stream[T]) GroupBy(classifier Function[T, Any]) map[Any][]T

GroupBy groups the elements of this Stream by classifying them.

This is a continuous terminal operation and hence expects the producer to close the stream in order to complete.

func (Stream[T]) Head added in v11.1.0

func (s Stream[T]) Head() T

Head returns the first element in this stream.

This function only consumes at most one element from the stream.

func (Stream[T]) HeadN added in v11.1.0

func (s Stream[T]) HeadN(n uint64) []T

HeadN returns a slice of the first n elements in this stream.

This function only consumes at most 'n' elements from the stream.

func (Stream[T]) Intersperse

func (s Stream[T]) Intersperse(e T) Stream[T]

Intersperse inserts an element between all elements of this Stream.

This function streams continuously until the in-stream is closed at which point the out-stream will be closed too.

func (Stream[T]) Last added in v11.1.0

func (s Stream[T]) Last() T

Last returns the last element in this stream.

This is a continuous terminal operation: it has to consume the whole stream and will only complete once the producer closes it.

func (Stream[T]) LastN added in v11.1.0

func (s Stream[T]) LastN(n uint64) []T

LastN returns a slice of the last n elements in this stream.

This is a continuous terminal operation: it has to consume the whole stream and will only complete once the producer closes it.
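Since the last elements are only known once the stream ends, an implementation has to drain the stream while remembering a sliding window. The sketch below shows one way to do that over a plain channel. `lastN` is a hypothetical helper that takes an `int` for simplicity, and it is not ƒuego's code.

```go
package main

import "fmt"

// lastN is a hypothetical helper: it drains in and keeps only the most
// recent n elements. It returns once the producer closes in.
func lastN[T any](in <-chan T, n int) []T {
	var window []T
	for v := range in {
		window = append(window, v)
		if len(window) > n {
			window = window[1:] // drop the oldest element
		}
	}
	return window
}

func main() {
	in := make(chan int, 5)
	for i := 1; i <= 5; i++ {
		in <- i
	}
	close(in)

	fmt.Println(lastN[int](in, 2))
}
```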

func (Stream[T]) LeftReduce

func (s Stream[T]) LeftReduce(f2 BiFunction[T, T, T]) T

LeftReduce accumulates the elements of this Stream by applying the given function.

This is a continuous terminal operation. It will only complete if the producer closes the stream.
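A left reduction combines elements from left to right, so for a non-commutative function the order matters. The sketch below uses subtraction to make that visible. `leftReduce` is a hypothetical helper over a plain channel; returning the zero value for an empty stream is an assumption of this sketch, not a documented ƒuego behaviour.

```go
package main

import "fmt"

// leftReduce is a hypothetical helper: it seeds the accumulator with the
// first element and then folds the rest from left to right.
// For an empty stream it returns the zero value of T (an assumption).
func leftReduce[T any](in <-chan T, f func(acc, v T) T) T {
	var acc T
	first := true
	for v := range in {
		if first {
			acc = v
			first = false
			continue
		}
		acc = f(acc, v)
	}
	return acc
}

func main() {
	in := make(chan int, 3)
	in <- 10
	in <- 3
	in <- 2
	close(in)

	sub := func(acc, v int) int { return acc - v }
	// Evaluated as (10 - 3) - 2.
	fmt.Println(leftReduce[int](in, sub))
}
```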

func (Stream[T]) Limit added in v11.1.0

func (s Stream[T]) Limit(n uint64) Stream[T]

Limit is a synonym for Take.

func (Stream[T]) Map

func (s Stream[T]) Map(mapper Function[T, Any]) Stream[Any]

Map returns a Stream consisting of the result of applying the given function to the elements of this stream.

This function streams continuously until the in-stream is closed at which point the out-stream will be closed too.

func (Stream[T]) NoneMatch

func (s Stream[T]) NoneMatch(p Predicate[T]) bool

NoneMatch returns whether none of the elements in the stream satisfies the predicate. It is the opposite of AnyMatch.

This is a continuous terminal operation and hence expects the producer to close the stream in order to complete (or it will block).

func (Stream[T]) Peek

func (s Stream[T]) Peek(consumer Consumer[T]) Stream[T]

Peek is akin to ForEach but returns the Stream.

This is useful e.g. for debugging.

This function streams continuously until the in-stream is closed at which point the out-stream will be closed too.

func (Stream[T]) Reduce

func (s Stream[T]) Reduce(f2 BiFunction[T, T, T]) T

Reduce is an alias for LeftReduce.

See LeftReduce for more info.

func (Stream[T]) StartsWith added in v11.1.0

func (s Stream[T]) StartsWith(slice []T) bool

StartsWith returns true when this stream starts with the elements in the supplied slice.

This function only consumes as much data from the stream as is necessary to prove (or disprove) that it starts with the supplied slice data.
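The early-exit behaviour can be sketched over a plain channel as follows. `startsWith` and `buffered` are hypothetical helpers; the sketch restricts `T` to `comparable` so it can use `!=`, which is a simplification and not necessarily how ƒuego compares elements.

```go
package main

import "fmt"

// startsWith is a hypothetical helper: it reads one element per prefix
// entry and stops at the first mismatch or when the stream ends early.
func startsWith[T comparable](in <-chan T, prefix []T) bool {
	for _, want := range prefix {
		got, ok := <-in
		if !ok || got != want {
			return false
		}
	}
	return true
}

// buffered is a hypothetical helper that returns a closed, pre-filled channel.
func buffered(xs ...int) chan int {
	ch := make(chan int, len(xs))
	for _, x := range xs {
		ch <- x
	}
	close(ch)
	return ch
}

func main() {
	in := buffered(1, 2, 3, 4, 5)
	fmt.Println(startsWith[int](in, []int{1, 2}), len(in))

	in = buffered(1, 2, 3, 4, 5)
	fmt.Println(startsWith[int](in, []int{1, 3}), len(in))
}
```

In both calls only two elements are read: the second call stops as soon as `2 != 3` disproves the prefix.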

func (Stream[T]) StreamAny

func (s Stream[T]) StreamAny() Stream[Any]

StreamAny returns this stream as a Stream[Any].

func (Stream[T]) Take added in v11.1.0

func (s Stream[T]) Take(n uint64) Stream[T]

Take returns a stream of the first 'n' elements of this stream.

This function streams continuously until the 'n' elements are picked or the in-stream is closed at which point the out-stream will be closed too.

func (Stream[T]) TakeUntil added in v11.1.0

func (s Stream[T]) TakeUntil(p Predicate[T]) Stream[T]

TakeUntil returns a stream of the first elements of this stream until the predicate is satisfied.

This function streams continuously until the in-stream is closed at which point the out-stream will be closed too.

func (Stream[T]) TakeWhile added in v11.1.0

func (s Stream[T]) TakeWhile(p Predicate[T]) Stream[T]

TakeWhile returns a stream of the first elements of this stream while the predicate is satisfied.

This function streams continuously until the in-stream is closed at which point the out-stream will be closed too.
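The sketch below illustrates a take-while stage over a plain channel: elements are forwarded until the first one that fails the predicate, and nothing after that point is emitted even if it would match. `takeWhile` is a hypothetical helper, not ƒuego's implementation. Note that in this sketch the first failing element is read from the input and discarded.

```go
package main

import "fmt"

// takeWhile is a hypothetical stage: it forwards elements while p holds
// and closes out at the first element that fails p (or when in closes).
func takeWhile[T any](in <-chan T, p func(T) bool) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for v := range in {
			if !p(v) {
				return // stop early; later elements are never forwarded
			}
			out <- v
		}
	}()
	return out
}

func main() {
	// A buffered, closed input keeps the producer from blocking when the
	// stage stops reading early.
	in := make(chan int, 5)
	for _, v := range []int{1, 2, 3, 10, 4} {
		in <- v
	}
	close(in)

	small := func(n int) bool { return n < 5 }
	for v := range takeWhile[int](in, small) {
		fmt.Println(v)
	}
}
```

A take-until stage would be the same loop with the predicate negated.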

func (Stream[T]) ToSlice

func (s Stream[T]) ToSlice() []T

ToSlice extracts the elements of the stream into a []T.

This is a special case of a reduction.

This is a continuous terminal operation and hence expects the producer to close the stream in order to complete (or it will block).

type StreamFunction

type StreamFunction[T, R any] func(T) Stream[R]

StreamFunction is a function that accepts one argument and produces a Stream[R].

It is used when "flat mapping" a `Stream`. In effect, this is a one-to-many operation, such as exploding the individual values of a slice into a Stream[R].

func FlattenSlice

func FlattenSlice[T any](bufsize int) StreamFunction[[]T, Any]

FlattenSlice is a StreamFunction that flattens a []T slice to a Stream[Any] of its elements.

func FlattenTypedSlice

func FlattenTypedSlice[T any](bufsize int) StreamFunction[[]T, T]

FlattenTypedSlice is a StreamFunction that flattens a []T slice to a Stream[T] of its elements.

type Supplier

type Supplier[T any] func() T

Supplier accepts no argument and returns a T.
