Documentation ¶
Index ¶
- type BranchProcessor
- type ErrorFunc
- type FilterProcessor
- type FlatMapProcessor
- type FlatMapper
- type MapProcessor
- type Mapper
- type MergeProcessor
- type Message
- type Node
- type Pipe
- type Predicate
- type PrintProcessor
- type Processor
- type ProcessorNode
- type Pump
- type Source
- type SourceNode
- type SourcePump
- type SourcePumps
- type Stream
- func (s *Stream) Branch(name string, preds ...Predicate) []*Stream
- func (s *Stream) Filter(name string, pred Predicate) *Stream
- func (s *Stream) FlatMap(name string, mapper FlatMapper) *Stream
- func (s *Stream) Map(name string, mapper Mapper) *Stream
- func (s *Stream) Merge(name string, streams ...*Stream) *Stream
- func (s *Stream) Print(name string) *Stream
- func (s *Stream) Process(name string, p Processor) *Stream
- type StreamBuilder
- type Task
- type TimedPipe
- type Topology
- type TopologyBuilder
Types ¶
type BranchProcessor ¶
type BranchProcessor struct {
// contains filtered or unexported fields
}
BranchProcessor is a processor that branches into one or more streams
based on the results of the predicates.
func (*BranchProcessor) Process ¶
func (p *BranchProcessor) Process(msg *Message) error
Process processes the stream Message.
func (*BranchProcessor) WithPipe ¶
func (p *BranchProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
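The routing decision behind a branch step can be sketched as follows. This is an illustration only, not the library's implementation: Message is a local stand-in (Ctx omitted), the Predicate signature is an assumption because this page does not show it, and branch, isEven and isPositive are hypothetical helpers. Matching every accepting predicate, rather than only the first, is also a guess based on the "one or more streams" wording.

```go
package main

import "fmt"

// Message is a local stand-in for the documented Message (Ctx omitted).
type Message struct {
	Key   interface{}
	Value interface{}
}

// Predicate uses an assumed signature; the documentation does not show it.
type Predicate func(*Message) (bool, error)

// branch returns the indexes of all predicates that accept msg. A real
// branch step would presumably call Pipe.ForwardToChild(msg, i) for each.
func branch(msg *Message, preds []Predicate) ([]int, error) {
	var matched []int
	for i, pred := range preds {
		ok, err := pred(msg)
		if err != nil {
			return nil, err
		}
		if ok {
			matched = append(matched, i)
		}
	}
	return matched, nil
}

// isEven accepts messages whose value is an even int.
func isEven(msg *Message) (bool, error) {
	n, ok := msg.Value.(int)
	if !ok {
		return false, fmt.Errorf("expected int, got %T", msg.Value)
	}
	return n%2 == 0, nil
}

// isPositive accepts messages whose value is an int greater than zero.
func isPositive(msg *Message) (bool, error) {
	n, ok := msg.Value.(int)
	if !ok {
		return false, fmt.Errorf("expected int, got %T", msg.Value)
	}
	return n > 0, nil
}

func main() {
	preds := []Predicate{isEven, isPositive}
	for _, v := range []int{4, 3, -2, -1} {
		idx, err := branch(&Message{Value: v}, preds)
		fmt.Println(v, idx, err)
	}
}
```

A message that matches no predicate yields an empty index list, so in this sketch it would simply not be forwarded.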
type FilterProcessor ¶
type FilterProcessor struct {
// contains filtered or unexported fields
}
FilterProcessor is a processor that filters a stream using a predicate function.
func (*FilterProcessor) Process ¶
func (p *FilterProcessor) Process(msg *Message) error
Process processes the stream Message.
func (*FilterProcessor) WithPipe ¶
func (p *FilterProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type FlatMapProcessor ¶
type FlatMapProcessor struct {
// contains filtered or unexported fields
}
FlatMapProcessor is a processor that maps a stream using a flat mapping function.
func (*FlatMapProcessor) Close ¶
func (p *FlatMapProcessor) Close() error
Close closes the processor.
func (*FlatMapProcessor) Process ¶
func (p *FlatMapProcessor) Process(msg *Message) error
Process processes the stream Message.
func (*FlatMapProcessor) WithPipe ¶
func (p *FlatMapProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type FlatMapper ¶
FlatMapper represents a mapping function that returns multiple messages.
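A flat mapping function might look like the sketch below. The FlatMapper signature is an assumption, since this page lists the type without its definition; Message is a local stand-in (Ctx omitted) and splitWords is a hypothetical example function.

```go
package main

import (
	"fmt"
	"strings"
)

// Message is a local stand-in for the documented Message (Ctx omitted).
type Message struct {
	Key   interface{}
	Value interface{}
}

// FlatMapper uses an assumed signature; the documentation does not show it.
type FlatMapper func(*Message) ([]*Message, error)

// splitWords turns one message holding a line of text into one message
// per whitespace-separated word, keeping the original key.
func splitWords(msg *Message) ([]*Message, error) {
	s, ok := msg.Value.(string)
	if !ok {
		return nil, fmt.Errorf("expected string, got %T", msg.Value)
	}
	words := strings.Fields(s)
	out := make([]*Message, 0, len(words))
	for _, w := range words {
		out = append(out, &Message{Key: msg.Key, Value: w})
	}
	return out, nil
}

func main() {
	var fn FlatMapper = splitWords
	msgs, err := fn(&Message{Key: "line-1", Value: "a flat map"})
	if err != nil {
		fmt.Println(err)
		return
	}
	for _, m := range msgs {
		fmt.Println(m.Key, m.Value)
	}
}
```

Returning an empty slice drops the input message, which lets a flat mapper double as a filter.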
type MapProcessor ¶
type MapProcessor struct {
// contains filtered or unexported fields
}
MapProcessor is a processor that maps a stream using a mapping function.
func (*MapProcessor) Process ¶
func (p *MapProcessor) Process(msg *Message) error
Process processes the stream Message.
func (*MapProcessor) WithPipe ¶
func (p *MapProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type MergeProcessor ¶
type MergeProcessor struct {
// contains filtered or unexported fields
}
MergeProcessor is a processor that passes the message on, keeping track of seen metadata.
func (*MergeProcessor) Process ¶
func (p *MergeProcessor) Process(msg *Message) error
Process processes the stream Message.
func (*MergeProcessor) WithPipe ¶
func (p *MergeProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type Message ¶
type Message struct {
	Ctx   context.Context
	Key   interface{}
	Value interface{}
	// contains filtered or unexported fields
}
Message represents data that flows through the stream.
func NewMessageWithContext ¶
NewMessageWithContext creates a Message with the given context.
func (*Message) WithMetadata ¶
WithMetadata adds metadata to the Message from a Source.
type Node ¶
type Node interface {
	// Name gets the node name.
	Name() string
	// AddChild adds a child node to the node.
	AddChild(n Node)
	// Children gets the node's children.
	Children() []Node
	// Processor gets the node's processor.
	Processor() Processor
}
Node represents a topology node.
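To illustrate how nodes form a tree, here is a minimal sketch of a Node implementation with a depth-first walk. simpleNode, walk and buildExample are hypothetical names, and Processor is reduced to an empty placeholder interface so the example compiles on its own; none of this is the library's code.

```go
package main

import "fmt"

// Processor is a placeholder for the documented Processor interface.
type Processor interface{}

// Node mirrors the documented interface.
type Node interface {
	Name() string
	AddChild(n Node)
	Children() []Node
	Processor() Processor
}

// simpleNode is a hypothetical Node that carries no processor.
type simpleNode struct {
	name     string
	children []Node
}

func (n *simpleNode) Name() string         { return n.name }
func (n *simpleNode) AddChild(c Node)      { n.children = append(n.children, c) }
func (n *simpleNode) Children() []Node     { return n.children }
func (n *simpleNode) Processor() Processor { return nil }

// walk visits the nodes depth-first and returns their names in visit order.
// A node reachable through several parents is visited once per parent.
func walk(n Node) []string {
	names := []string{n.Name()}
	for _, c := range n.Children() {
		names = append(names, walk(c)...)
	}
	return names
}

// buildExample wires source -> filter -> map and source -> print.
func buildExample() Node {
	src := &simpleNode{name: "source"}
	filter := &simpleNode{name: "filter"}
	mapper := &simpleNode{name: "map"}
	printer := &simpleNode{name: "print"}
	src.AddChild(filter)
	src.AddChild(printer)
	filter.AddChild(mapper)
	return src
}

func main() {
	fmt.Println(walk(buildExample()))
}
```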
type Pipe ¶
type Pipe interface {
	// Forward queues the data to all processor children in the topology.
	Forward(*Message) error
	// ForwardToChild queues the data to the given child processor in the topology.
	ForwardToChild(*Message, int) error
	// Commit commits the current state in the sources.
	Commit(*Message) error
}
Pipe allows messages to flow through the processors.
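The difference between Forward and ForwardToChild can be sketched with a toy Pipe whose children are plain callbacks. fanOutPipe and collector are hypothetical, Message is a local stand-in (Ctx omitted), and this version calls its children synchronously, whereas the interface comments describe queueing. Treating the int argument as a child index is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a local stand-in for the documented Message (Ctx omitted).
type Message struct {
	Key   interface{}
	Value interface{}
}

// Pipe mirrors the documented interface.
type Pipe interface {
	Forward(*Message) error
	ForwardToChild(*Message, int) error
	Commit(*Message) error
}

// fanOutPipe is a hypothetical Pipe whose children are callbacks.
type fanOutPipe struct {
	children []func(*Message) error
	commits  int // number of Commit calls seen
}

// Forward hands msg to every child, stopping at the first error.
func (p *fanOutPipe) Forward(msg *Message) error {
	for _, child := range p.children {
		if err := child(msg); err != nil {
			return err
		}
	}
	return nil
}

// ForwardToChild hands msg to the child at index only (assumed meaning).
func (p *fanOutPipe) ForwardToChild(msg *Message, index int) error {
	if index < 0 || index >= len(p.children) {
		return errors.New("fanOutPipe: child index out of range")
	}
	return p.children[index](msg)
}

// Commit only counts calls here; a real pipe would commit source state.
func (p *fanOutPipe) Commit(*Message) error {
	p.commits++
	return nil
}

// collector returns a child callback that appends each value to dst.
func collector(dst *[]interface{}) func(*Message) error {
	return func(msg *Message) error {
		*dst = append(*dst, msg.Value)
		return nil
	}
}

func main() {
	var left, right []interface{}
	var pipe Pipe = &fanOutPipe{
		children: []func(*Message) error{collector(&left), collector(&right)},
	}
	_ = pipe.Forward(&Message{Value: "both"})
	_ = pipe.ForwardToChild(&Message{Value: "right only"}, 1)
	fmt.Println(left, right)
}
```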
type PrintProcessor ¶
type PrintProcessor struct {
// contains filtered or unexported fields
}
PrintProcessor is a processor that prints the stream to stdout.
func (*PrintProcessor) Process ¶
func (p *PrintProcessor) Process(msg *Message) error
Process processes the stream Message.
func (*PrintProcessor) WithPipe ¶
func (p *PrintProcessor) WithPipe(pipe Pipe)
WithPipe sets the pipe on the Processor.
type Processor ¶
type Processor interface {
	// WithPipe sets the pipe on the Processor.
	WithPipe(Pipe)
	// Process processes the stream Message.
	Process(*Message) error
	// Close closes the processor.
	Close() error
}
Processor represents a stream processor.
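A custom processor only needs the three methods of the interface. The sketch below is an assumption-laden illustration: Message, Pipe and Processor are local stand-ins that mirror the documented shapes (Ctx omitted), and upperProcessor, recordingPipe and runUpper are hypothetical names introduced for this example.

```go
package main

import (
	"fmt"
	"strings"
)

// Message is a local stand-in for the documented Message (Ctx omitted).
type Message struct {
	Key   interface{}
	Value interface{}
}

// Pipe and Processor mirror the documented interfaces.
type Pipe interface {
	Forward(*Message) error
	ForwardToChild(*Message, int) error
	Commit(*Message) error
}

type Processor interface {
	WithPipe(Pipe)
	Process(*Message) error
	Close() error
}

// recordingPipe is a hypothetical Pipe that collects forwarded messages.
type recordingPipe struct {
	forwarded []*Message
}

func (p *recordingPipe) Forward(msg *Message) error {
	p.forwarded = append(p.forwarded, msg)
	return nil
}
func (p *recordingPipe) ForwardToChild(msg *Message, _ int) error { return p.Forward(msg) }
func (p *recordingPipe) Commit(*Message) error                    { return nil }

// upperProcessor is a hypothetical Processor that upper-cases string values.
type upperProcessor struct {
	pipe Pipe
}

func (p *upperProcessor) WithPipe(pipe Pipe) { p.pipe = pipe }

// Process forwards a copy of msg with its value upper-cased.
func (p *upperProcessor) Process(msg *Message) error {
	s, ok := msg.Value.(string)
	if !ok {
		return fmt.Errorf("upperProcessor: unexpected value type %T", msg.Value)
	}
	return p.pipe.Forward(&Message{Key: msg.Key, Value: strings.ToUpper(s)})
}

func (p *upperProcessor) Close() error { return nil }

// runUpper feeds one value through the processor and returns what it forwarded.
func runUpper(value interface{}) (interface{}, error) {
	pipe := &recordingPipe{}
	var proc Processor = &upperProcessor{}
	proc.WithPipe(pipe)
	if err := proc.Process(&Message{Key: "k", Value: value}); err != nil {
		return nil, err
	}
	return pipe.forwarded[0].Value, proc.Close()
}

func main() {
	out, err := runUpper("hello")
	fmt.Println(out, err)
}
```

Swapping in a recording pipe like this is one way to unit-test a processor without building a full topology.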
func NewBranchProcessor ¶
NewBranchProcessor creates a new BranchProcessor instance.
func NewFilterProcessor ¶
NewFilterProcessor creates a new FilterProcessor instance.
func NewFlatMapProcessor ¶
func NewFlatMapProcessor(fn FlatMapper) Processor
NewFlatMapProcessor creates a new FlatMapProcessor instance.
func NewMapProcessor ¶
NewMapProcessor creates a new MapProcessor instance.
func NewMergeProcessor ¶
func NewMergeProcessor() Processor
NewMergeProcessor creates a new MergeProcessor instance.
func NewPrintProcessor ¶
func NewPrintProcessor() Processor
NewPrintProcessor creates a new PrintProcessor instance.
type ProcessorNode ¶
type ProcessorNode struct {
// contains filtered or unexported fields
}
ProcessorNode represents the topology node for a processor.
func NewProcessorNode ¶
func NewProcessorNode(name string, p Processor) *ProcessorNode
NewProcessorNode creates a new ProcessorNode.
func (*ProcessorNode) AddChild ¶
func (n *ProcessorNode) AddChild(node Node)
AddChild adds a child node to the node.
func (*ProcessorNode) Children ¶
func (n *ProcessorNode) Children() []Node
Children gets the node's children.
func (*ProcessorNode) Processor ¶ added in v1.1.0
func (n *ProcessorNode) Processor() Processor
Processor gets the node's processor.
type Pump ¶ added in v1.1.0
type Pump interface {
	// Process processes a message in the Pump.
	Process(*Message) error
	// Close closes the pump.
	Close() error
}
Pump represents a Message pump.
type Source ¶
type Source interface {
	// Consume gets the next Message from the Source.
	Consume() (*Message, error)
	// Commit marks the consumed Message as processed.
	Commit(interface{}) error
	// Close closes the Source.
	Close() error
}
Source represents a stream source.
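An in-memory Source is a convenient way to see the Consume and Commit cycle. The sketch below rests on several assumptions that this page does not confirm: an exhausted source returns a nil Message with a nil error, and the value passed to Commit is the integer offset of the last processed message. sliceSource and drain are hypothetical, and Message is a local stand-in (Ctx omitted).

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a local stand-in for the documented Message (Ctx omitted).
type Message struct {
	Key   interface{}
	Value interface{}
}

// Source mirrors the documented interface.
type Source interface {
	Consume() (*Message, error)
	Commit(interface{}) error
	Close() error
}

// sliceSource is a hypothetical Source backed by a slice of strings.
type sliceSource struct {
	values    []string
	next      int // index of the next value to hand out
	committed int // number of values marked as processed
	closed    bool
}

// Consume returns the next value, keyed by its offset in the slice.
func (s *sliceSource) Consume() (*Message, error) {
	if s.closed {
		return nil, errors.New("sliceSource: closed")
	}
	if s.next >= len(s.values) {
		// Assumption: an exhausted source yields no message and no error.
		return nil, nil
	}
	msg := &Message{Key: s.next, Value: s.values[s.next]}
	s.next++
	return msg, nil
}

// Commit treats v as the offset of the last processed message (assumed).
func (s *sliceSource) Commit(v interface{}) error {
	offset, ok := v.(int)
	if !ok {
		return fmt.Errorf("sliceSource: unexpected commit value %T", v)
	}
	if offset < 0 || offset >= s.next {
		return errors.New("sliceSource: offset has not been consumed")
	}
	s.committed = offset + 1
	return nil
}

func (s *sliceSource) Close() error {
	s.closed = true
	return nil
}

// drain consumes until the source is exhausted, committing each message.
func drain(src Source) ([]interface{}, error) {
	var out []interface{}
	for {
		msg, err := src.Consume()
		if err != nil {
			return out, err
		}
		if msg == nil {
			return out, nil
		}
		out = append(out, msg.Value)
		if err := src.Commit(msg.Key); err != nil {
			return out, err
		}
	}
}

func main() {
	src := &sliceSource{values: []string{"a", "b", "c"}}
	values, err := drain(src)
	fmt.Println(values, src.committed, err)
	_ = src.Close()
}
```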
type SourceNode ¶
type SourceNode struct {
// contains filtered or unexported fields
}
SourceNode represents a node between the source and the rest of the node tree.
func NewSourceNode ¶
func NewSourceNode(name string) *SourceNode
NewSourceNode creates a new SourceNode.
func (*SourceNode) AddChild ¶
func (n *SourceNode) AddChild(node Node)
AddChild adds a child node to the node.
func (*SourceNode) Children ¶
func (n *SourceNode) Children() []Node
Children gets the node's children.
func (*SourceNode) Processor ¶ added in v1.1.0
func (n *SourceNode) Processor() Processor
Processor gets the node's processor.
type SourcePump ¶ added in v1.1.0
type SourcePump interface {
	// Stop stops the source pump from running.
	Stop()
	// Close closes the source pump.
	Close() error
}
SourcePump represents a Message pump for sources.
func NewSourcePump ¶ added in v1.1.0
func NewSourcePump(name string, source Source, pumps []Pump, errFn ErrorFunc) SourcePump
NewSourcePump creates a new SourcePump.
type SourcePumps ¶ added in v1.1.0
type SourcePumps []SourcePump
SourcePumps represents a set of source pumps.
func (SourcePumps) StopAll ¶ added in v1.1.0
func (p SourcePumps) StopAll()
StopAll stops all source pumps.
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream represents a stream of data.
func (*Stream) FlatMap ¶
func (s *Stream) FlatMap(name string, mapper FlatMapper) *Stream
FlatMap runs a flat mapper on the stream.
type StreamBuilder ¶
type StreamBuilder struct {
// contains filtered or unexported fields
}
StreamBuilder represents a stream builder.
func NewStreamBuilder ¶
func NewStreamBuilder() *StreamBuilder
NewStreamBuilder creates a new StreamBuilder.
func (*StreamBuilder) Build ¶
func (sb *StreamBuilder) Build() *Topology
Build builds the stream Topology.
type Task ¶
type Task interface {
	// Start starts the streams processors.
	Start() error
	// OnError sets the error handler.
	OnError(fn ErrorFunc)
	// Close stops and closes the streams processors.
	Close() error
}
Task represents a streams task.
type TimedPipe ¶ added in v1.1.0
type TimedPipe interface {
	// Reset resets the accumulated pipe duration.
	Reset()
	// Duration returns the accumulated pipe duration.
	Duration() time.Duration
}
TimedPipe represents a pipe that can accumulate execution time.
type Topology ¶
type Topology struct {
// contains filtered or unexported fields
}
Topology represents the streams topology.
func (Topology) Processors ¶
Processors gets the topology Processors.
type TopologyBuilder ¶
type TopologyBuilder struct {
// contains filtered or unexported fields
}
TopologyBuilder represents a topology builder.
func NewTopologyBuilder ¶
func NewTopologyBuilder() *TopologyBuilder
NewTopologyBuilder creates a new TopologyBuilder.
func (*TopologyBuilder) AddProcessor ¶
func (tb *TopologyBuilder) AddProcessor(name string, processor Processor, parents []Node) Node
AddProcessor adds a Processor to the builder, returning the created Node.
func (*TopologyBuilder) AddSource ¶
func (tb *TopologyBuilder) AddSource(name string, source Source) Node
AddSource adds a Source to the builder, returning the created Node.
func (*TopologyBuilder) Build ¶
func (tb *TopologyBuilder) Build() *Topology
Build creates an immutable Topology.