stream

package
v0.0.0-...-758c02f
Warning

This package is not in the latest version of its module.

Published: Dec 24, 2020 License: MIT Imports: 10 Imported by: 0

Documentation

Constants

const (
	DefaultDelayThreshold       = 2 * time.Second
	DefaultBundleCountThreshold = 1000
	DefaultBufferedByteLimit    = 10 * 1e6 // 10 MB (10,000,000 bytes)
)

Variables

This section is empty.

Functions

func NewBasicFlow

func NewBasicFlow(mapFunc MapFn, filterFunc1 FilterFn, groupFunc GroupFn, filterFunc2 GroupFilterFn, opts ...bbx.Option) streams.Flow

Types

type Aggregator

type Aggregator struct {
	GroupF GroupFunc
	// contains filtered or unexported fields
}

Aggregator groups the incoming elements using a function. The elements are grouped by the key returned by the function.

For example:

	GroupF(1,2,3) = a
	GroupF(4,5)   = b

	in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --
	       |    |      |    |        |
	    [---------- AggregatorFunc --------]
	                     |              |
	out --------------[1,2,3]---------[4,5] --

func NewAggregator

func NewAggregator(groupFunc GroupFunc, opts ...bbx.Option) *Aggregator

NewAggregator returns a new Aggregator instance. groupFunc is the grouping function.

func (*Aggregator) In

func (a *Aggregator) In() chan<- interface{}

In returns an input channel for receiving data

func (*Aggregator) Out

func (a *Aggregator) Out() <-chan interface{}

Out returns an output channel for sending data

func (*Aggregator) To

func (a *Aggregator) To(sink streams.Sink)

To streams data to the given sink

func (*Aggregator) Via

func (a *Aggregator) Via(flow streams.Flow) streams.Flow

Via streams data through the given flow

type FilterFn

type FilterFn func(Metadata) bool

type GroupFilterFn

type GroupFilterFn func([]Metadata) bool

type GroupFn

type GroupFn func(Metadata) string

type GroupFunc

type GroupFunc func(interface{}) string

GroupFunc is a grouping function. It returns the key under which an element is grouped.

type MapFn

type MapFn func([]byte) (Metadata, []byte)

type Metadata

type Metadata map[string]string

type Pipe

type Pipe struct {
	// contains filtered or unexported fields
}

Pipe collapses multiple flows into a single one. Pipe can be used to replace

	Source.
		Via(f1).
		Via(f2).
		Via(f3).
		To(Sink)

with

	Source.
		Via(NewPipe(f1, f2, f3)).
		To(Sink)

func NewPipe

func NewPipe(flows ...streams.Flow) *Pipe

NewPipe returns a new Pipe instance.

func (*Pipe) In

func (p *Pipe) In() chan<- interface{}

In returns an input channel for receiving data

func (*Pipe) Out

func (p *Pipe) Out() <-chan interface{}

Out returns an output channel for sending data

func (*Pipe) To

func (p *Pipe) To(sink streams.Sink)

To streams data to the given sink

func (*Pipe) Via

func (p *Pipe) Via(flow streams.Flow) streams.Flow

Via streams data through the given flow
