goconnect

package module
v0.0.1
Published: Aug 19, 2019 License: Apache-2.0 Imports: 12 Imported by: 3

README

Go Connect

Go Connect is a framework that sits somewhere between Kafka Connect and Apache Beam. It is implemented in Go instead of on the JVM, so it is a lot more efficient and has a low package and memory footprint - it can run happily even on tiny chips.

  • it builds linear pipelines, similar to Kafka Connect, so its goal is data connectivity, not general data processing
  • it is more general than Kafka Connect and can build file-for-a-file pipelines
  • but it is a bit less general than Beam: it only builds linear chains of transforms, not graphs
  • it is also stateless at the moment
  • like Beam, it has an internal concept of parallelism and coders
  • additionally it features vertical and horizontal parallelism out-of-the-box
    • (roots cannot have vertical parallelism greater than 1)
  • it scales similarly to Kafka Connect by simply running multiple identical instances
  • it guarantees at-least-once processing and is capable of exactly-once, with a choice of optimistic or pessimistic checkpointing depending on whether the source supports some notion of offsets
  • it has a concept of EventTime built-in
  • it is a unified data processing framework in terms of batch/stream semantics: if the input data is bounded, the pipeline terminates when all input elements are fully processed; if the input data is unbounded, the pipeline runs indefinitely
  • it has first-class support for Avro with Schema Registry
  • pipelines have distinct declaration and materialization phases

(NOTE: THE PROTOTYPE IN THIS CODEBASE DOESN'T HAVE ALL THE FEATURES LISTED ABOVE BUT THOSE ARE THE AIM AND WILL APPEAR SOON)

Implemented Features

  • Generalized at-least-once processing guarantees for all element-wise processing, i.e. covers all features below
  • Declaration vs Materialization - pipeline is declared as a graph of Defs and then materialized into a runnable graph
  • Bounded and Unbounded sources are treated identically both in the API and in the runner context - stream and batch are totally unified
  • Network Runner: network.Runner(pipeline, peers...) - allows stages to place network constraints by requesting receivers/senders and applying network logic
  • Single Runner: Pipeline.Run() - for pipelines whose stages don't have any network constraints and can run in parallel without coordination
  • TriggerEach / TriggerEvery for SinkFn - either may be used optionally for a performance gain; the default is no trigger, i.e. a single flush at the end if the pipeline is bounded (see the sketch after this list)
  • TriggerEach / TriggerEvery for FoldFn - one of these must be used in order for the fold to emit anything
  • Configurable stage buffers: .Buffer(numElements int) - used to control pending/ack buffers, not output buffers
  • Any stage except Root can have vertical parallelism set by .Par(n), with guaranteed ordering
    • i.e. how many routines run the fn in parallel and push to the output channel
  • Stage output can be limited to n elements with .Limit(n) - this makes any pipeline bounded
  • Coders are injected recursively using coder.Registry()
  • Coder vertical parallelism can be controlled by Pipeline.CoderPar(n) default setting
  • Coders
    • binary > xml > binary
    • gzip > binary > gzip
    • string > binary > string
    • avro generic decoder (binary)
    • avro generic encoder (binary or json)
    • avro generic projector
    • avro schema registry decoder
    • avro schema registry encoder
  • Sources (Root Transforms)
    • List Source / RoundRobin Source
    • Amqp09 Source
    • Kafka Source
    • File Source
  • Transforms
    • SinkFn(func)
    • FoldFn(func)
    • FilterFn + UserFilterFn(func)
    • MapFn + UserMapFn(func)
    • FlatMapFn + UserFlatMapFn(func)
    • FoldFn + UserFoldFn(func) + .Count()
    • Text (used to flatMap files into lines)
  • Sinks
    • Kafka Sink
    • Amqp09 Sink
    • StdOut Sink
  • Network
    • RoundRobin
    • MergeOrdered
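
To make the API features above concrete, here is a minimal end-to-end sketch. The import path and the inline Root/Sink implementations are illustrative assumptions (stamping and checkpointing details are omitted); the Pipeline and Def methods used are the ones documented below.

package main

import (
	"fmt"
	"reflect"
	"strings"
	"time"

	"github.com/amient/goconnect" // import path is an assumption
)

// listSource is a hypothetical bounded Root emitting a fixed list of strings.
type listSource struct{ data []string }

func (s *listSource) OutType() reflect.Type { return goconnect.StringType }

func (s *listSource) Run(c *goconnect.Context) {
	for _, v := range s.data {
		c.Emit(&goconnect.Element{Value: v}) // stamping/checkpoint details omitted
	}
	c.Close() // bounded source: closing lets the pipeline terminate
}

func (s *listSource) Commit(w goconnect.Watermark, c *goconnect.Context) error { return nil }

// printSink is a hypothetical Sink that prints and acks each element.
type printSink struct{}

func (s *printSink) InType() reflect.Type { return goconnect.StringType }

func (s *printSink) Process(e *goconnect.Element, c *goconnect.Context) {
	fmt.Println(e.Value)
	e.Ack()
}

func (s *printSink) Flush(c *goconnect.Context) error { return nil }

func main() {
	p := goconnect.NewPipeline()
	upper := p.Root(&listSource{data: []string{"a", "b", "c"}}).
		Map(func(in string) string { return strings.ToUpper(in) }). // reflective user fn
		Par(2).       // vertical parallelism with guaranteed ordering
		Buffer(1024). // pending/ack buffer size
		Limit(3)      // makes any pipeline bounded
	p.Sink(upper, &printSink{}).TriggerEvery(time.Second)
	p.Run() // single runner: no network constraints in this pipeline
}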

Features In Progress/TODO

  • refactor root to materialized
    • migrate all transforms which use Context.Put/Get to Materialization
  • file source using network split internally to spread the URLs
  • subflows
    • file-for-a-file use case
    • avro applied to kv pairs: goconnect.KVBinary -> avro.KVGenericDecoder -> avro.KVBinary -> SchemaRegistryKVEncoder(topic) -> goconnect.KVBinary
  • persistent checkpoints
  • avro: specific decoder and encoder
  • avro-parquet converter
  • converter abstraction for decoupling source and sink schemas
  • s3 sink
  • sftp source/sink
  • processing epochs: dynamic node join/leave potentially using Raft algo
  • coder injection shortest path in case there are multiple combinations satisfying the in/out types
  • analyse pipeline network constraints and decide warn/recommend single/network runners
  • Indirect type handling in coder injection - this will enable using *KVBytes instead of KVBytes and less copying
  • Exactly-Once processing as an extension to the existing at-least-once
  • Develop a general concept of stateful processing (parallel with guarantees, and not just kv-stores but also prefix trees, etc.)

Test Cases

  • Root committer works with backpressure, i.e. the longer the commits take the less frequent they become
  • Final commit is precise when limit is applied on a pipeline with unbounded root
  • Pipeline with network nodes fails if network runner is not used
  • Roots can select an exclusive node when running in a network
  • Sinks with timed triggers have Flush called on each trigger
  • Sinks with no trigger will trigger once at the end
  • Sinks and Folds with .TriggerEach(1) will emit and flush on every element respectively
  • Sinks without timed triggers still checkpoint correctly and, on a bounded/limit condition, call Flush once at the end
  • Folds with timed triggers always emit values at the correct intervals (sketched after this list)
  • Pipelines with a bounded root always terminate with the correct result
  • Filters, maps, flatmaps and sinks preserve order with Par(>1)
  • Limits work on roots, filters, maps, flatmaps, folds, sinks with or without Par(>1)
  • Limits work on stages that follow after a fold
  • Multiple folds work correctly, e.g. a user fold fn followed by a counter
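
As a sketch of the trigger behaviour above: a counting fold emits nothing by default, so a trigger is attached (p and src as in the sketch under Implemented Features):

counted := p.Root(src).Count().TriggerEvery(time.Second)
// or, to emit and flush on every single element:
// counted := p.Root(src).Count().TriggerEach(1)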

The Concept

The API is partially based on reflection, but this is only used in-stream for user-defined functions - implementations of transform interfaces use type casting in the worst case, no reflection.

The aim is to be able to inject coders and network stages behind the scenes, so it looks something like this:

A. pipeline is declared statically with only those transforms that are required logically for the job

B. pipeline's distributed/parallel aspect is then analysed and network coders are injected accordingly

C. pipeline type analysis is performed and type coders are injected accordingly

D. final pipeline is materialized which starts all routines that wrap around the stage channels

E. materialized pipeline is drained and if all sources are bounded it will eventually terminate

Example pipeline based on the reflection API:

Declared pipeline:

:  AMQP-QUEUE:[Bytes]                                                   (1) logical
>> [Node]:XML-FILTER:[Node]                                             (2) logical
>> [Bytes]:STD-OUT:[]                                                   (3) logical

Pipeline with injected network coders

:  AMQP-QUEUE:[Bytes]                                                   (1) logical
>> [Bytes]:NET-SPLIT:[Bytes]                                             -  net-out
>> [Node]:XML-FILTER:[Node]                                             (2) logical
>> [Bytes]:NET-MERGE-SORT:[Bytes]                                        -  net-in
>> [Bytes]:STD-OUT:[]                                                   (3) logical

Final Pipeline with injected type coders

:  AMQP-QUEUE:[Bytes]                                                   (1) logical
>> [Bytes]:NET-SPLIT:[Bytes]                                             -  net-out
>> [Bytes]:XML-DECODER:[Node]                                            -  decode
>> [Node]:XML-FILTER:[Node]                                             (2) logical
>> [Node]:XML-ENCODER:[Bytes]                                            -  encode
>> [Bytes]:NET-MERGE-SORT:[Bytes]                                        -  net-in
>> [Bytes]:STD-OUT:[]                                                   (3) logical
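
In code, only the three logical stages are declared; the net and type coders are injected during analysis and materialization. A minimal sketch, assuming hypothetical constructors in the amqp09, xml and std sub-packages (only NewPipeline, WithCoders, coder.Registry and the Pipeline methods come from this package):

p := goconnect.NewPipeline().WithCoders(coder.Registry())
queue := p.Root(amqp09.NewSource("amqp://localhost:5672", "in-queue")) // (1) logical; hypothetical constructor
filtered := p.Filter(queue, xml.NewNodeFilter("/root/item"))           // (2) logical; hypothetical constructor
p.Sink(filtered, std.NewOutSink())                                     // (3) logical; hypothetical constructor
p.Run() // NET-SPLIT, XML-DECODER/ENCODER and NET-MERGE-SORT would be injected here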

Say 3 instances of this pipeline are executed; this is what has to happen in terms of networking:

(1) AMQP Source - can be executed symmetrically without coordination, but only one of the instances will be active

  • Net-Split - stamps the input elements and distributes them among the instances using a round-robin strategy

(2) XML Filter - a pure map transform, so it can also be executed symmetrically and run in parallel without coordination

  • Net-Merge-Sort - receives all the outputs and ensures the ordering per source instance
    • it runs on only one node - which one is informed by the logical stage; for the STD-OUT sink it would make sense to run it on the instance that joined the group last, as that's where the user will prefer to see the output

(3) STD-OUT - simply runs everywhere, since it is the Net-Merge that controls where the output is

So there are several implications of this:

  • the default mode of coordination is no coordination - the stage runs everywhere unless constrained otherwise
  • Network coders are informed by their surrounding logical stages
  • all records must be stamped to: a) preserve absolute ordering if required by a network merge stage b) provide a means for optimistic checkpointing
  • Network In Coders are informed by the succeeding stage, but there are many subtleties to consider, e.g.:
    • JMS SOURCE - runs on all and fans out to all, but has to use pessimistic checkpointing because in some implementations it is not possible to ack "up to a point"
    • AMQP QUEUE SOURCE - runs on all and fans out to all - it will have only one active instance by the protocol, with optimistic checkpointing, as it supports delivery tagging
    • KAFKA SOURCE - runs on all with no coordination - the kafka consumer is partitioned
    • LOCAL FILE SOURCE - runs on one specific node and fans the file contents out to all
    • REMOTE FILE SOURCE - will be divided into 2 sub-stages (either explicitly or implicitly)
      • a) the list of URLs runs on any one node and fans out to all
      • b) the file contents are retrieved from the listed URLs - runs on all by default
    • STD-OUT - runs on one specific node selected by the user
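
Such a pipeline has network constraints, so it cannot use the single runner; it needs the network runner mentioned earlier. A minimal sketch, with hypothetical peer addresses:

// every instance runs the identical binary with the identical peer list
network.Runner(p, "10.0.0.1:7800", "10.0.0.2:7800", "10.0.0.3:7800")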

Documentation

Constants

This section is empty.

Variables

var AnyType = reflect.TypeOf([]interface{}{}).Elem()
var BinaryType = reflect.TypeOf([]byte{})
var ByteType = reflect.TypeOf(byte(0))
var ErrorType = reflect.TypeOf(fmt.Errorf("{}"))
var Int16Type = reflect.TypeOf(int16(0))
var Int32Type = reflect.TypeOf(int32(0))
var Int64Type = reflect.TypeOf(int64(0))
var IntType = reflect.TypeOf(int(0))
var KVBinaryType = reflect.TypeOf(&KVBinary{})
var StringType = reflect.TypeOf("")

Functions

func RunGraphs

func RunGraphs(graphs ...Graph)

Types

type Checkpoint

type Checkpoint struct {
	Part int
	Data interface{}
}

type Closeable

type Closeable interface {
	Close(*Context) error
}

type Connector

type Connector interface {
	GetNodeID() uint16
	MakeReceiver(stage uint16) Receiver
	GetNumPeers() uint16
	NewSender(nodeId uint16, stage uint16) Sender
}

type Context

type Context struct {
	Emit func(element *Element)

	Close func()
	// contains filtered or unexported fields
}

func NewContext

func NewContext(connector Connector, stageId uint16, def *Def) *Context

func (*Context) Get

func (c *Context) Get(index int) interface{}

func (*Context) GetNodeID

func (c *Context) GetNodeID() uint16

func (*Context) GetNumPeers

func (c *Context) GetNumPeers() uint16

func (*Context) GetReceiver

func (c *Context) GetReceiver() Receiver

func (*Context) GetStage

func (c *Context) GetStage() uint16

func (*Context) MakeSender

func (c *Context) MakeSender(targetNodeId uint16) Sender

func (*Context) MakeSenders

func (c *Context) MakeSenders() []Sender

func (*Context) Put

func (c *Context) Put(index int, data interface{})

FIXME: instead of Put and Get on Context, migrate all transforms that need them to materialized forms

func (*Context) Start

func (c *Context) Start()

func (*Context) Termination

func (c *Context) Termination() <-chan bool

type Def

type Def struct {
	Type reflect.Type
	Fn   Fn
	Id   int
	Up   *Def
	// contains filtered or unexported fields
}

func (*Def) Apply

func (def *Def) Apply(f Fn) *Def

func (*Def) Buffer

func (def *Def) Buffer(i int) *Def

func (*Def) Count

func (def *Def) Count() *Def

func (*Def) Filter

func (def *Def) Filter(f interface{}) *Def

func (*Def) FlatMap

func (def *Def) FlatMap(f interface{}) *Def

func (*Def) Fold

func (def *Def) Fold(init interface{}, acc interface{}) *Def

func (*Def) Limit

func (def *Def) Limit(i uint64) *Def

func (*Def) Map

func (def *Def) Map(f interface{}) *Def

func (*Def) Par

func (def *Def) Par(i int) *Def

func (*Def) TriggerEach

func (def *Def) TriggerEach(i int) *Def

func (*Def) TriggerEvery

func (def *Def) TriggerEvery(i time.Duration) *Def

type Element

type Element struct {
	Checkpoint Checkpoint
	Value      interface{}
	Stamp      Stamp
	FromNodeId uint16
	// contains filtered or unexported fields
}

func (*Element) Ack

func (e *Element) Ack()

type Filter

type Filter interface {
	Type() reflect.Type
	Materialize() func(input interface{}) bool
}

func UserFilterFn

func UserFilterFn(f interface{}) Filter
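
A sketch of a user filter; the func(T) bool shape is inferred from Filter.Materialize and the reflection-based API, not separately documented.

keepShort := goconnect.UserFilterFn(func(s string) bool { return len(s) < 80 })
// applied as: p.Filter(upstream, keepShort)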

type FilterProcessor

type FilterProcessor struct {
	// contains filtered or unexported fields
}

func (*FilterProcessor) Materialize

func (p *FilterProcessor) Materialize() func(input *Element, context PContext)

type Fn

type Fn interface{}

type FoldFn

type FoldFn interface {
	InType() reflect.Type
	OutType() reflect.Type
	Process(interface{})
	Collect() Element
}

func UserFoldFn

func UserFoldFn(initial interface{}, f interface{}) FoldFn
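
A sketch of a summing fold; the (accumulator, input) func shape is an assumption. Note that a fold emits nothing unless a trigger is set.

sum := goconnect.UserFoldFn(0, func(acc int, x int) int { return acc + x })
// applied as: p.Fold(upstream, sum).TriggerEvery(time.Second)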

type Graph

type Graph []*Context

func ConnectStages

func ConnectStages(connector Connector, pipeline *Pipeline) Graph
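
This is the lower-level path the runners build on: materialize a declared pipeline into a Graph against a Connector and run it. A minimal sketch; newLocalConnector is a hypothetical Connector implementation.

var conn goconnect.Connector = newLocalConnector() // hypothetical
graph := goconnect.ConnectStages(conn, pipeline)
goconnect.RunGraphs(graph) // runs all stage contexts of the materialized graph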

type Input

type Input interface {
	InType() reflect.Type
}

type KVBinary

type KVBinary struct {
	Key   []byte
	Value []byte
}

type MapProcessor

type MapProcessor struct {
	// contains filtered or unexported fields
}

func (*MapProcessor) Materialize

func (p *MapProcessor) Materialize() func(input *Element, context PContext)

type Mapper

type Mapper interface {
	InType() reflect.Type
	OutType() reflect.Type
	Materialize() func(input interface{}) interface{}
}

func UserMapFn

func UserMapFn(f interface{}) Mapper
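
A sketch of a reflective mapper; the func(In) Out shape is inferred from Mapper.Materialize.

lengths := goconnect.UserMapFn(func(s string) int { return len(s) })
// applied as: p.Mapper(upstream, lengths), or via reflection: upstream.Map(func(s string) int { return len(s) })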

type NetTransform

type NetTransform interface {
	InType() reflect.Type
	OutType() reflect.Type
	Run(<-chan *Element, *Context)
}
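
A minimal sketch of a round-robin net-split stage built on the documented Sender API; stamping, acking and checkpoint handling are simplified assumptions.

type netRoundRobin struct{}

func (t *netRoundRobin) InType() reflect.Type  { return goconnect.BinaryType }
func (t *netRoundRobin) OutType() reflect.Type { return goconnect.BinaryType }

func (t *netRoundRobin) Run(input <-chan *goconnect.Element, c *goconnect.Context) {
	senders := c.MakeSenders() // one Sender per peer in the group
	i := 0
	for e := range input {
		senders[i%len(senders)].Send(e)
		i++
	}
	for _, s := range senders {
		s.Eos() // signal end-of-stream to each peer
	}
}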

type OrderedElementSet

type OrderedElementSet struct {
	// contains filtered or unexported fields
}

func NewOrderedElementSet

func NewOrderedElementSet(cap int) *OrderedElementSet

func (*OrderedElementSet) AddElement

func (set *OrderedElementSet) AddElement(elementToAdd *Element, context *Context)

type Output

type Output interface {
	OutType() reflect.Type
}

type PC

type PC struct {
	Uniq           uint64
	UpstreamNodeId uint16
	Checkpoint     *Checkpoint
}

type PContext

type PContext interface {
	//TODO EmitTime(ts time.Time)
	//TODO Checkpoint(partition int, data interface{})
	//TODO Emit(value interface{})
	Emit(*Element)
}

type Pipeline

type Pipeline struct {
	Defs []*Def
	// contains filtered or unexported fields
}

func NewPipeline

func NewPipeline() *Pipeline

func (*Pipeline) Apply

func (p *Pipeline) Apply(def *Def, f Fn) *Def

func (*Pipeline) Filter

func (p *Pipeline) Filter(that *Def, fn Filter) *Def

func (*Pipeline) Fold

func (p *Pipeline) Fold(that *Def, fn FoldFn) *Def

func (*Pipeline) Mapper

func (p *Pipeline) Mapper(that *Def, fn Mapper) *Def

func (*Pipeline) Par

func (p *Pipeline) Par(defaultPar int) *Pipeline

func (*Pipeline) Processor

func (p *Pipeline) Processor(that *Def, fn Processor) *Def

func (*Pipeline) Root

func (p *Pipeline) Root(source Root) *Def

func (*Pipeline) Run

func (p *Pipeline) Run()

func (*Pipeline) Sink

func (p *Pipeline) Sink(that *Def, fn Sink) *Def

func (*Pipeline) Transform

func (p *Pipeline) Transform(that *Def, fn NetTransform) *Def

func (*Pipeline) WithCoders

func (p *Pipeline) WithCoders(coders []Transform) *Pipeline

type Processor

type Processor interface {
	//Materialize() creates a single-routine context that will not be shared
	Materialize() func(input *Element, context PContext)
}

func UserFlatMapFn

func UserFlatMapFn(f interface{}) Processor
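
A sketch of a flatMap splitting binary chunks into lines, similar in spirit to the Text transform mentioned in the README; the func-returning-slice shape is an assumption.

lines := goconnect.UserFlatMapFn(func(b []byte) []string {
	return strings.Split(string(b), "\n")
})
// applied as: p.Processor(upstream, lines)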

type Receiver

type Receiver interface {
	Elements() <-chan *Element
	Ack(upstreamNodeId uint16, uniq uint64) error
}

type Root

type Root interface {
	OutType() reflect.Type
	//TODO Materialize() func(context PContext)
	Run(*Context)
	Commit(Watermark, *Context) error
}

type Sender

type Sender interface {
	Acks() <-chan uint64
	Send(element *Element)
	Eos()
	Close() error
}

type Sink

type Sink interface {
	InType() reflect.Type
	Process(*Element, *Context)
	Flush(*Context) error
}

type Stamp

type Stamp struct {
	Unix int64
	Uniq uint64
}

func (*Stamp) String

func (s *Stamp) String() string

func (*Stamp) Valid

func (s *Stamp) Valid() bool

type Transform

type Transform interface {
	InType() reflect.Type
	OutType() reflect.Type
}

type Watermark

type Watermark map[int]interface{}

type Work

type Work struct {
	// contains filtered or unexported fields
}

func (*Work) Emit

func (w *Work) Emit(e *Element)

type WorkResult

type WorkResult struct {
	// contains filtered or unexported fields
}

type WorkerGroup

type WorkerGroup struct {
	// contains filtered or unexported fields
}

func NewWorkerGroup

func NewWorkerGroup(c *Context, p Processor) *WorkerGroup

func (*WorkerGroup) Start

func (g *WorkerGroup) Start(input chan *Element) *WorkerGroup

Directories

Path Synopsis
kv
str
url
xml
avro Module
io
std
amqp09 Module
kafka1 Module
