streams

Version: v1.1.0
Warning

This package is not in the latest version of its module.

Published: Oct 17, 2018 | License: MIT | Imports: 7 | Imported by: 9

README

Streams

Streams is a lightweight, simple stream processing library. While Kafka is the main use case for Streams, it is flexible enough to process data from any source.

Note: This is currently a work in progress.

Installation

You can install streams using go get:

go get github.com/msales/streams

Concepts

Streams breaks processing into the following basic parts.

  • Message is a message in the system, consisting of a key, value and context.

  • Source reads from a data source and tracks the position in it.

  • Processor processes the data, optionally passing it on or marking the source's position. A sink is just a processor that does not forward the data on.

  • Pipe gives processors an abstract view of the current state, allowing Messages to flow through the system.

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BranchProcessor

type BranchProcessor struct {
	// contains filtered or unexported fields
}

BranchProcessor is a processor that branches into one or more streams based on the results of the predicates.

func (*BranchProcessor) Close

func (p *BranchProcessor) Close() error

Close closes the processor.

func (*BranchProcessor) Process

func (p *BranchProcessor) Process(msg *Message) error

Process processes the stream Message.

func (*BranchProcessor) WithPipe

func (p *BranchProcessor) WithPipe(pipe Pipe)

WithPipe sets the pipe on the Processor.

type ErrorFunc

type ErrorFunc func(error)

ErrorFunc represents a streams error handling function.

type FilterProcessor

type FilterProcessor struct {
	// contains filtered or unexported fields
}

FilterProcessor is a processor that filters a stream using a predicate function.

func (*FilterProcessor) Close

func (p *FilterProcessor) Close() error

Close closes the processor.

func (*FilterProcessor) Process

func (p *FilterProcessor) Process(msg *Message) error

Process processes the stream Message.

func (*FilterProcessor) WithPipe

func (p *FilterProcessor) WithPipe(pipe Pipe)

WithPipe sets the pipe on the Processor.

type FlatMapProcessor

type FlatMapProcessor struct {
	// contains filtered or unexported fields
}

FlatMapProcessor is a processor that maps a stream using a flat mapping function.

func (*FlatMapProcessor) Close

func (p *FlatMapProcessor) Close() error

Close closes the processor.

func (*FlatMapProcessor) Process

func (p *FlatMapProcessor) Process(msg *Message) error

Process processes the stream Message.

func (*FlatMapProcessor) WithPipe

func (p *FlatMapProcessor) WithPipe(pipe Pipe)

WithPipe sets the pipe on the Processor.

type FlatMapper

type FlatMapper func(*Message) ([]*Message, error)

FlatMapper represents a mapping function that returns multiple messages.
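As an illustration of the FlatMapper shape, here is a minimal sketch that splits a string value into one message per word. To stay runnable without the library, it declares local stand-ins for Message and FlatMapper that mirror the signatures documented here; SplitWords is a hypothetical name, and the real Message also carries unexported fields that this stand-in omits.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Message is a local stand-in mirroring the exported fields of streams.Message.
type Message struct {
	Ctx   context.Context
	Key   interface{}
	Value interface{}
}

// FlatMapper mirrors the documented signature.
type FlatMapper func(*Message) ([]*Message, error)

// SplitWords is a hypothetical FlatMapper that emits one Message per
// whitespace-separated word of a string value, reusing the input key.
func SplitWords(msg *Message) ([]*Message, error) {
	s, ok := msg.Value.(string)
	if !ok {
		return nil, fmt.Errorf("splitwords: expected string value, got %T", msg.Value)
	}
	words := strings.Fields(s)
	out := make([]*Message, 0, len(words))
	for _, w := range words {
		out = append(out, &Message{Ctx: msg.Ctx, Key: msg.Key, Value: w})
	}
	return out, nil
}

// Compile-time check that SplitWords has the FlatMapper shape.
var _ FlatMapper = SplitWords

func main() {
	msgs, err := SplitWords(&Message{Ctx: context.Background(), Key: "k", Value: "a b c"})
	if err != nil {
		panic(err)
	}
	for _, m := range msgs {
		fmt.Println(m.Key, m.Value)
	}
}
```

With the real package, a function of this shape would presumably be passed to Stream.FlatMap or NewFlatMapProcessor.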

type MapProcessor

type MapProcessor struct {
	// contains filtered or unexported fields
}

MapProcessor is a processor that maps a stream using a mapping function.

func (*MapProcessor) Close

func (p *MapProcessor) Close() error

Close closes the processor.

func (*MapProcessor) Process

func (p *MapProcessor) Process(msg *Message) error

Process processes the stream Message.

func (*MapProcessor) WithPipe

func (p *MapProcessor) WithPipe(pipe Pipe)

WithPipe sets the pipe on the Processor.

type Mapper

type Mapper func(*Message) (*Message, error)

Mapper represents a mapping function.
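A Mapper takes one message and returns one message. The sketch below upper-cases a string value; it uses local stand-in types that mirror the documented signatures so that it runs on its own, and UpperValue is a hypothetical name. It mutates and returns the incoming message instead of allocating a new one, on the assumption that this keeps any unexported state of the real Message intact.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Message is a local stand-in mirroring the exported fields of streams.Message.
type Message struct {
	Ctx   context.Context
	Key   interface{}
	Value interface{}
}

// Mapper mirrors the documented signature.
type Mapper func(*Message) (*Message, error)

// UpperValue is a hypothetical Mapper that upper-cases a string value in place.
func UpperValue(msg *Message) (*Message, error) {
	s, ok := msg.Value.(string)
	if !ok {
		return nil, fmt.Errorf("uppervalue: expected string value, got %T", msg.Value)
	}
	msg.Value = strings.ToUpper(s)
	return msg, nil
}

// Compile-time check that UpperValue has the Mapper shape.
var _ Mapper = UpperValue

func main() {
	out, err := UpperValue(&Message{Ctx: context.Background(), Key: 1, Value: "hello"})
	if err != nil {
		panic(err)
	}
	fmt.Println(out.Key, out.Value) // prints "1 HELLO"
}
```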

type MergeProcessor

type MergeProcessor struct {
	// contains filtered or unexported fields
}

MergeProcessor is a processor that passes the message on, keeping track of seen metadata.

func (*MergeProcessor) Close

func (p *MergeProcessor) Close() error

Close closes the processor.

func (*MergeProcessor) Process

func (p *MergeProcessor) Process(msg *Message) error

Process processes the stream Message.

func (*MergeProcessor) WithPipe

func (p *MergeProcessor) WithPipe(pipe Pipe)

WithPipe sets the pipe on the Processor.

type Message

type Message struct {
	Ctx   context.Context
	Key   interface{}
	Value interface{}
	// contains filtered or unexported fields
}

Message represents data that flows through the stream.

func NewMessage

func NewMessage(k, v interface{}) *Message

NewMessage creates a Message.

func NewMessageWithContext

func NewMessageWithContext(ctx context.Context, k, v interface{}) *Message

NewMessageWithContext creates a Message with the given context.

func (Message) Empty

func (m Message) Empty() bool

Empty determines if the Message is empty.

func (*Message) Metadata

func (m *Message) Metadata() map[Source]interface{}

Metadata returns the Message metadata.

func (*Message) WithMetadata

func (m *Message) WithMetadata(s Source, v interface{}) *Message

WithMetadata adds metadata to the Message from a Source.

type Node

type Node interface {
	// Name gets the node name.
	Name() string
	// AddChild adds a child node to the node.
	AddChild(n Node)
	// Children gets the node's children.
	Children() []Node
	// Processor gets the node's processor.
	Processor() Processor
}

Node represents a topology node.

type Pipe

type Pipe interface {
	// Forward queues the data to all processor children in the topology.
	Forward(*Message) error
	// ForwardToChild queues the data to the given processor child in the topology.
	ForwardToChild(*Message, int) error
	// Commit commits the current state in the sources.
	Commit(*Message) error
}

Pipe allows messages to flow through the processors.

func NewPipe added in v1.1.0

func NewPipe(children []Pump) Pipe

NewPipe creates a new processorPipe instance.

type Predicate

type Predicate func(*Message) (bool, error)

Predicate represents a stream filter function.
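Per the signatures on this page, Predicate is the function type accepted by Stream.Filter, Stream.Branch, NewFilterProcessor and NewBranchProcessor. The following sketch builds a predicate with a closure; Message and Predicate are local stand-ins that mirror the documented signatures, and MinLength is a hypothetical helper, not part of the library.

```go
package main

import (
	"context"
	"fmt"
)

// Message is a local stand-in mirroring the exported fields of streams.Message.
type Message struct {
	Ctx   context.Context
	Key   interface{}
	Value interface{}
}

// Predicate mirrors the documented signature.
type Predicate func(*Message) (bool, error)

// MinLength returns a hypothetical Predicate that accepts string values
// that are at least n bytes long.
func MinLength(n int) Predicate {
	return func(msg *Message) (bool, error) {
		s, ok := msg.Value.(string)
		if !ok {
			return false, fmt.Errorf("minlength: expected string value, got %T", msg.Value)
		}
		return len(s) >= n, nil
	}
}

func main() {
	pred := MinLength(3)
	for _, v := range []string{"go", "kafka"} {
		ok, err := pred(&Message{Ctx: context.Background(), Value: v})
		if err != nil {
			panic(err)
		}
		fmt.Println(v, ok)
	}
}
```

Returning an error for an unexpected value type, instead of silently returning false, leaves the decision about bad input to whatever error handling the caller has set up.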

type PrintProcessor

type PrintProcessor struct {
	// contains filtered or unexported fields
}

PrintProcessor is a processor that prints the stream to stdout.

func (*PrintProcessor) Close

func (p *PrintProcessor) Close() error

Close closes the processor.

func (*PrintProcessor) Process

func (p *PrintProcessor) Process(msg *Message) error

Process processes the stream Message.

func (*PrintProcessor) WithPipe

func (p *PrintProcessor) WithPipe(pipe Pipe)

WithPipe sets the pipe on the Processor.

type Processor

type Processor interface {
	// WithPipe sets the pipe on the Processor.
	WithPipe(Pipe)
	// Process processes the stream Message.
	Process(*Message) error
	// Close closes the processor.
	Close() error
}

Processor represents a stream processor.
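A custom processor only needs the three methods above. The sketch below shows one possible shape: a processor that drops messages with a nil value and forwards everything else through its pipe. Message, Pipe and Processor are local stand-ins copied from the signatures on this page so the example runs by itself; DropNilProcessor and recordingPipe are hypothetical names introduced for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// Message is a local stand-in mirroring the exported fields of streams.Message.
type Message struct {
	Ctx   context.Context
	Key   interface{}
	Value interface{}
}

// Pipe and Processor mirror the documented interfaces.
type Pipe interface {
	Forward(*Message) error
	ForwardToChild(*Message, int) error
	Commit(*Message) error
}

type Processor interface {
	WithPipe(Pipe)
	Process(*Message) error
	Close() error
}

// recordingPipe is a hypothetical test double that records forwarded messages.
type recordingPipe struct {
	forwarded []*Message
	commits   int
}

func (p *recordingPipe) Forward(m *Message) error {
	p.forwarded = append(p.forwarded, m)
	return nil
}
func (p *recordingPipe) ForwardToChild(m *Message, _ int) error { return p.Forward(m) }
func (p *recordingPipe) Commit(*Message) error                  { p.commits++; return nil }

// DropNilProcessor is a hypothetical Processor that forwards only
// messages whose Value is non-nil.
type DropNilProcessor struct {
	pipe Pipe
}

func (p *DropNilProcessor) WithPipe(pipe Pipe) { p.pipe = pipe }

func (p *DropNilProcessor) Process(msg *Message) error {
	if msg.Value == nil {
		return nil // drop: nothing is forwarded downstream
	}
	return p.pipe.Forward(msg)
}

func (p *DropNilProcessor) Close() error { return nil }

// Compile-time check that DropNilProcessor satisfies Processor.
var _ Processor = (*DropNilProcessor)(nil)

func main() {
	pipe := &recordingPipe{}
	proc := &DropNilProcessor{}
	proc.WithPipe(pipe)
	defer proc.Close()

	for _, m := range []*Message{
		{Ctx: context.Background(), Key: "a", Value: 1},
		{Ctx: context.Background(), Key: "b"},
	} {
		if err := proc.Process(m); err != nil {
			panic(err)
		}
	}
	fmt.Println(len(pipe.forwarded)) // prints "1"
}
```

A sink, as described in the Concepts section, would follow the same pattern but never call Forward.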

func NewBranchProcessor

func NewBranchProcessor(fns []Predicate) Processor

NewBranchProcessor creates a new BranchProcessor instance.

func NewFilterProcessor

func NewFilterProcessor(fn Predicate) Processor

NewFilterProcessor creates a new FilterProcessor instance.

func NewFlatMapProcessor

func NewFlatMapProcessor(fn FlatMapper) Processor

NewFlatMapProcessor creates a new FlatMapProcessor instance.

func NewMapProcessor

func NewMapProcessor(fn Mapper) Processor

NewMapProcessor creates a new MapProcessor instance.

func NewMergeProcessor

func NewMergeProcessor() Processor

NewMergeProcessor creates a new MergeProcessor instance.

func NewPrintProcessor

func NewPrintProcessor() Processor

NewPrintProcessor creates a new PrintProcessor instance.

type ProcessorNode

type ProcessorNode struct {
	// contains filtered or unexported fields
}

ProcessorNode represents the topology node for a processor.

func NewProcessorNode

func NewProcessorNode(name string, p Processor) *ProcessorNode

NewProcessorNode creates a new ProcessorNode.

func (*ProcessorNode) AddChild

func (n *ProcessorNode) AddChild(node Node)

AddChild adds a child node to the node.

func (*ProcessorNode) Children

func (n *ProcessorNode) Children() []Node

Children gets the node's children.

func (*ProcessorNode) Name

func (n *ProcessorNode) Name() string

Name gets the node name.

func (*ProcessorNode) Processor added in v1.1.0

func (n *ProcessorNode) Processor() Processor

Processor gets the node's processor.

type Pump added in v1.1.0

type Pump interface {
	// Process processes a message in the Pump.
	Process(*Message) error
	// Close closes the pump.
	Close() error
}

Pump represents a Message pump.

func NewPump added in v1.1.0

func NewPump(node Node, pipe TimedPipe, errFn ErrorFunc) Pump

NewPump creates a new processorPump instance.

type Source

type Source interface {
	// Consume gets the next Message from the Source.
	Consume() (*Message, error)
	// Commit marks the consumed Message as processed.
	Commit(interface{}) error
	// Close closes the Source.
	Close() error
}

Source represents a stream source.
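To make the Source contract concrete, here is a minimal in-memory sketch. It is built on assumptions that this page does not confirm: that a source with nothing left to read returns an empty Message, that Empty means "no key and no value", and that Commit receives some source-specific position value. Message and Source are local stand-ins mirroring the documented signatures, and SliceSource is a hypothetical type.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Message is a local stand-in mirroring the exported fields of streams.Message.
type Message struct {
	Ctx   context.Context
	Key   interface{}
	Value interface{}
}

// Empty reports whether the stand-in has neither key nor value.
// The real method's exact rule is an assumption here.
func (m Message) Empty() bool { return m.Key == nil && m.Value == nil }

// Source mirrors the documented interface.
type Source interface {
	Consume() (*Message, error)
	Commit(interface{}) error
	Close() error
}

// SliceSource is a hypothetical Source that serves values from a slice,
// using the slice index as the message key.
type SliceSource struct {
	mu        sync.Mutex
	values    []interface{}
	next      int
	committed interface{}
}

func NewSliceSource(values ...interface{}) *SliceSource {
	return &SliceSource{values: values}
}

// Consume returns the next value, or an empty Message once exhausted.
func (s *SliceSource) Consume() (*Message, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.next >= len(s.values) {
		return &Message{Ctx: context.Background()}, nil
	}
	msg := &Message{Ctx: context.Background(), Key: s.next, Value: s.values[s.next]}
	s.next++
	return msg, nil
}

// Commit records the last committed position value.
func (s *SliceSource) Commit(v interface{}) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.committed = v
	return nil
}

func (s *SliceSource) Close() error { return nil }

// Compile-time check that SliceSource satisfies Source.
var _ Source = (*SliceSource)(nil)

func main() {
	src := NewSliceSource("x", "y")
	defer src.Close()
	for {
		msg, err := src.Consume()
		if err != nil {
			panic(err)
		}
		if msg.Empty() {
			break
		}
		fmt.Println(msg.Key, msg.Value)
	}
}
```

The mutex is there because Consume and Commit may be called from different goroutines in a running task; whether the library actually does so is not stated on this page.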

type SourceNode

type SourceNode struct {
	// contains filtered or unexported fields
}

SourceNode represents a node between the source and the rest of the node tree.

func NewSourceNode

func NewSourceNode(name string) *SourceNode

NewSourceNode creates a new SourceNode.

func (*SourceNode) AddChild

func (n *SourceNode) AddChild(node Node)

AddChild adds a child node to the node.

func (*SourceNode) Children

func (n *SourceNode) Children() []Node

Children gets the node's children.

func (*SourceNode) Name

func (n *SourceNode) Name() string

Name gets the node name.

func (*SourceNode) Processor added in v1.1.0

func (n *SourceNode) Processor() Processor

Processor gets the node's processor.

type SourcePump added in v1.1.0

type SourcePump interface {
	// Stop stops the source pump from running.
	Stop()
	// Close closes the source pump.
	Close() error
}

SourcePump represents a Message pump for sources.

func NewSourcePump added in v1.1.0

func NewSourcePump(name string, source Source, pumps []Pump, errFn ErrorFunc) SourcePump

NewSourcePump creates a new SourcePump.

type SourcePumps added in v1.1.0

type SourcePumps []SourcePump

SourcePumps represents a set of source pumps.

func (SourcePumps) StopAll added in v1.1.0

func (p SourcePumps) StopAll()

StopAll stops all source pumps.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream represents a stream of data.

func (*Stream) Branch

func (s *Stream) Branch(name string, preds ...Predicate) []*Stream

Branch branches a stream based on the given Predicates.

func (*Stream) Filter

func (s *Stream) Filter(name string, pred Predicate) *Stream

Filter filters the stream using a Predicate.

func (*Stream) FlatMap

func (s *Stream) FlatMap(name string, mapper FlatMapper) *Stream

FlatMap runs a flat mapper on the stream.

func (*Stream) Map

func (s *Stream) Map(name string, mapper Mapper) *Stream

Map runs a Mapper on the stream.

func (*Stream) Merge

func (s *Stream) Merge(name string, streams ...*Stream) *Stream

Merge merges one or more streams into this stream.

func (*Stream) Print

func (s *Stream) Print(name string) *Stream

Print prints the data in the stream.

func (*Stream) Process

func (s *Stream) Process(name string, p Processor) *Stream

Process runs a custom processor on the stream.

type StreamBuilder

type StreamBuilder struct {
	// contains filtered or unexported fields
}

StreamBuilder represents a stream builder.

func NewStreamBuilder

func NewStreamBuilder() *StreamBuilder

NewStreamBuilder creates a new StreamBuilder.

func (*StreamBuilder) Build

func (sb *StreamBuilder) Build() *Topology

Build builds the stream Topology.

func (*StreamBuilder) Source

func (sb *StreamBuilder) Source(name string, source Source) *Stream

Source adds a Source to the stream, returning the Stream.

type Task

type Task interface {
	// Start starts the streams processors.
	Start() error
	// OnError sets the error handler.
	OnError(fn ErrorFunc)
	// Close stops and closes the streams processors.
	Close() error
}

Task represents a streams task.

func NewTask

func NewTask(topology *Topology) Task

NewTask creates a new streams task.

type TimedPipe added in v1.1.0

type TimedPipe interface {
	// Reset resets the accumulative pipe duration.
	Reset()
	// Duration returns the accumulative pipe duration.
	Duration() time.Duration
}

TimedPipe represents a pipe that can accumulate execution time.

type Topology

type Topology struct {
	// contains filtered or unexported fields
}

Topology represents the streams topology.

func (Topology) Processors

func (t Topology) Processors() []Node

Processors gets the topology Processors.

func (Topology) Sources

func (t Topology) Sources() map[Source]Node

Sources gets the topology Sources.

type TopologyBuilder

type TopologyBuilder struct {
	// contains filtered or unexported fields
}

TopologyBuilder represents a topology builder.

func NewTopologyBuilder

func NewTopologyBuilder() *TopologyBuilder

NewTopologyBuilder creates a new TopologyBuilder.

func (*TopologyBuilder) AddProcessor

func (tb *TopologyBuilder) AddProcessor(name string, processor Processor, parents []Node) Node

AddProcessor adds a Processor to the builder, returning the created Node.

func (*TopologyBuilder) AddSource

func (tb *TopologyBuilder) AddSource(name string, source Source) Node

AddSource adds a Source to the builder, returning the created Node.

func (*TopologyBuilder) Build

func (tb *TopologyBuilder) Build() *Topology

Build creates an immutable Topology.

Directories

Path Synopsis
example
