Documentation ¶
Index ¶
- Variables
- func RegisterTransform(hub *Hub, spec TransformSpec)
- type Datum
- type DatumKey
- type DatumSeqFunc
- type ExportSaw
- type Hub
- type JSONDecoder
- type JSONEncoder
- type MergeSaw
- type ProtoDecoder
- type ProtoEncoder
- type Saw
- type SawNoResult
- type SeqID
- type TopicID
- type TransformFunc
- type TransformSpec
- type ValueDecoder
- type ValueEncoder
- type VarFloat
- type VarInt
- type Window
- type WindowFrameFactory
- type WindowSpec
Constants ¶
This section is empty.
Variables ¶
var GlobalHub = NewHub("global")
Functions ¶
func RegisterTransform ¶
func RegisterTransform(hub *Hub, spec TransformSpec)
RegisterTransform creates a Transform Saw instance, register it on hub with spec.Inputs topics subscribed.
Types ¶
type Datum ¶
type Datum struct { Key DatumKey Value interface{} }
Datum is the data item passed between saws. Datum is a key-value pair where key must be string and value can by anything. The optional SortOrder specifies optimal order when datums with same key get aggregated.
type DatumSeqFunc ¶
type ExportSaw ¶
type ExportSaw interface {
Export() (interface{}, error)
}
Saw can optionally provide Export() interface, it provides a snapshot of its current state, which can be later merged to another saw
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
Hub is a simple pubsub to allow loosely coupled communication between saws Saws can Register with topic(s) it subscribes to, or Publish datum to a topic.
It's a simple local, sync implementation only for better pipeline program structure, and it should keep it as it is. parallel, async computing, should be addressed by Queues and Pars, implemented by each individual Saw.
type JSONDecoder ¶
func NewJSONDecoder ¶
func NewJSONDecoder(example interface{}) JSONDecoder
func (JSONDecoder) DecodeValue ¶
func (jd JSONDecoder) DecodeValue(buf []byte) (interface{}, error)
type JSONEncoder ¶
type JSONEncoder struct{}
func (JSONEncoder) EncodeValue ¶
func (je JSONEncoder) EncodeValue(value interface{}, buf []byte) ([]byte, error)
type MergeSaw ¶
type MergeSaw interface {
MergeFrom(other interface{}) error
}
Saw can merge Export() result of other saws, so that Saws can be aggregated or migrated between instances
type ProtoDecoder ¶
func NewProtoDecoder ¶
func NewProtoDecoder(example interface{}) ProtoDecoder
func (ProtoDecoder) DecodeValue ¶
func (pd ProtoDecoder) DecodeValue(buf []byte) (interface{}, error)
type ProtoEncoder ¶
type ProtoEncoder struct{}
func (ProtoEncoder) EncodeValue ¶
func (pe ProtoEncoder) EncodeValue(value interface{}, buf []byte) ([]byte, error)
type Saw ¶
type Saw interface { // Feeds a new data point into Saw. Emit(v Datum) error // Release all resources and returns final computation result. Result(ctx context.Context) (interface{}, error) }
Saw is the basic computation unit, it's largely a state machine.
In general, implementation should allow concurrent call to Emit() for good parallelism, it can be archive by having a stateless saw (see Transfrom), or make it managed by concurrent tables like table.MemTable (see table subpackage).
type SawNoResult ¶
type SawNoResult struct{}
type TransformFunc ¶
type TransformSpec ¶
type TransformSpec struct { Name string Transfrom TransformFunc Inputs []TopicID Outputs []TopicID }
TransformSpec configures a Transform.
Transform is a stateless Saw, that for each Datum input --- normally from Hub, it calls a pure TransformFunc to get a Datum output, then publish the output to a set of Hub topics.
Use RegisterTransform() to create a Transform saw and register it to a Hub.
type ValueDecoder ¶
type ValueEncoder ¶
type VarFloat ¶
func ReportFloat ¶
Creates float var for reporting. see ReportInt() for usage detail.
type Window ¶
type Window struct {
// contains filtered or unexported fields
}
Window implements a sliding window of saws. Window keeps finite set of saws, called frames, each corresponded with a SeqID. Window keeps WindowSpec.WindowSize of latest frames with largest, contintued SeqID.
For every input, it calls WindowSpec.SeqFunc to get SeqID of a datum and route it to the frame if it's still in sliding window. Window assumes datum's SeqID are dense, rougly incremental overtime, or you will get too many datums dropped.
When window needed to be slided, Result() of old frames will be called in their seperate gorountines. In Window.Result(), all frames' Result() will be called in this maner as they all been slide away. Window doesn't care frame's Result() return.
func NewWindow ¶
func NewWindow(spec WindowSpec) *Window
func (*Window) AllFrames ¶
Gets the all frame the Window currently holds. returned frames are not locked, would be Emit()-ing or even Result()-ing when caller gets the return.
func (*Window) LatestFrame ¶
Gets the latest frame or nil when there's no data yet. returned frame is not locked, would be Emit()-ing or even Result()-ing when caller gets the return.
type WindowSpec ¶
type WindowSpec struct { Name string FrameFactory WindowFrameFactory SeqFunc DatumSeqFunc WindowSize int MaxSeqAdvance int }