saw

package module
v0.0.0-...-17386ec
Published: May 25, 2016 License: MIT Imports: 7 Imported by: 0

README

Saw: log processing the right way

Work in progress; see the design notes for motivation and goals.

Documentation

Constants

This section is empty.

Variables

var GlobalHub = NewHub("global")

Functions

func RegisterTransform

func RegisterTransform(hub *Hub, spec TransformSpec)

RegisterTransform creates a Transform saw and registers it on hub, subscribed to the topics listed in spec.Inputs.

Types

type Datum

type Datum struct {
	Key   DatumKey
	Value interface{}
}

Datum is the data item passed between saws. It is a key-value pair: the key must be a string and the value can be anything. The optional SortOrder specifies the optimal order when datums with the same key are aggregated.

type DatumKey

type DatumKey string

type DatumSeqFunc

type DatumSeqFunc func(datum Datum) SeqID

type ExportSaw

type ExportSaw interface {
	Export() (interface{}, error)
}

A Saw can optionally implement the Export() interface, which provides a snapshot of its current state that can later be merged into another saw.

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub is a simple pubsub that allows loosely coupled communication between saws. A saw can Register with the topic(s) it subscribes to, or Publish a datum to a topic.

It is a simple, local, synchronous implementation meant only to give pipeline programs a better structure, and it should stay that way. Parallel and asynchronous computation should be addressed by Queues and Pars, implemented by each individual Saw.

func NewHub

func NewHub(varPrefix string) *Hub

func (*Hub) Publish

func (hub *Hub) Publish(id TopicID, datum Datum)

Publish publishes datum to a topic, which emits it to every subscribed saw in sequence. Concurrent calls to Publish() are not synchronized; subscribers are expected to handle concurrent Emit() calls.

func (*Hub) Register

func (hub *Hub) Register(saw Saw, subscribes ...TopicID)

Register registers a saw that subscribes to a list of topics.
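The Register and Publish behaviour described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the package's own code: `miniHub`, `emitter`, and `recorder` are hypothetical names, the types are local stand-ins that mirror the documented signatures, and error handling is omitted.

```go
package main

import "fmt"

// Local stand-ins mirroring the documented types; not the package's own code.
type TopicID string
type DatumKey string

type Datum struct {
	Key   DatumKey
	Value interface{}
}

// emitter is the subset of the Saw interface this sketch needs.
type emitter interface {
	Emit(d Datum) error
}

// miniHub is a hypothetical, simplified hub: a map from topic to subscribers.
type miniHub struct {
	subs map[TopicID][]emitter
}

func newMiniHub() *miniHub {
	return &miniHub{subs: make(map[TopicID][]emitter)}
}

// Register subscribes saw to every listed topic.
func (h *miniHub) Register(saw emitter, topics ...TopicID) {
	for _, t := range topics {
		h.subs[t] = append(h.subs[t], saw)
	}
}

// Publish emits d to each subscriber of id, one after another, on the
// caller's goroutine. Errors from Emit are ignored in this sketch.
func (h *miniHub) Publish(id TopicID, d Datum) {
	for _, s := range h.subs[id] {
		_ = s.Emit(d)
	}
}

// recorder is a toy saw that remembers the keys it has seen. It is not safe
// for concurrent Emit calls; a real subscriber would need to be.
type recorder struct {
	keys []DatumKey
}

func (r *recorder) Emit(d Datum) error {
	r.keys = append(r.keys, d.Key)
	return nil
}

func main() {
	hub := newMiniHub()
	rec := &recorder{}
	hub.Register(rec, "access", "error")

	hub.Publish("access", Datum{Key: "GET /", Value: 200})
	hub.Publish("error", Datum{Key: "GET /x", Value: 500})
	hub.Publish("other", Datum{Key: "ignored"}) // no subscribers, dropped

	fmt.Println(len(rec.keys), "datums received")
}
```

Because Publish runs subscribers on the caller's goroutine, a slow saw blocks the publisher; that matches the "local, sync" design stated above and leaves queuing to individual saws.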

type JSONDecoder

type JSONDecoder struct {
	ValueType reflect.Type
}

func NewJSONDecoder

func NewJSONDecoder(example interface{}) JSONDecoder

func (JSONDecoder) DecodeValue

func (jd JSONDecoder) DecodeValue(buf []byte) (interface{}, error)
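The `example` argument and the `ValueType` field suggest a reflection-based decoder. The sketch below shows one way such a decoder could work using only the standard library. It is an assumption about the approach, not the package's implementation: `jsonDecoder`, `newJSONDecoder`, and `accessLog` are hypothetical names, and whether the real decoder returns a value or a pointer is not stated in this documentation.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// jsonDecoder is a local stand-in with the same shape as the documented type.
type jsonDecoder struct {
	ValueType reflect.Type
}

// newJSONDecoder records the dynamic type of example. Passing a nil example
// would leave ValueType nil and make DecodeValue panic.
func newJSONDecoder(example interface{}) jsonDecoder {
	return jsonDecoder{ValueType: reflect.TypeOf(example)}
}

// DecodeValue allocates a fresh value of ValueType, unmarshals buf into it,
// and returns the value (not the pointer).
func (jd jsonDecoder) DecodeValue(buf []byte) (interface{}, error) {
	ptr := reflect.New(jd.ValueType) // ptr holds a *T
	if err := json.Unmarshal(buf, ptr.Interface()); err != nil {
		return nil, err
	}
	return ptr.Elem().Interface(), nil
}

// accessLog is a hypothetical value type used only for this example.
type accessLog struct {
	Path   string `json:"path"`
	Status int    `json:"status"`
}

func main() {
	dec := newJSONDecoder(accessLog{})
	v, err := dec.DecodeValue([]byte(`{"path":"/index","status":200}`))
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	entry := v.(accessLog)
	fmt.Println(entry.Path, entry.Status)
}
```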

type JSONEncoder

type JSONEncoder struct{}

func (JSONEncoder) EncodeValue

func (je JSONEncoder) EncodeValue(value interface{}, buf []byte) ([]byte, error)

type MergeSaw

type MergeSaw interface {
	MergeFrom(other interface{}) error
}

A Saw can merge the Export() result of other saws, so that saws can be aggregated or migrated between instances.
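As an illustration of how Export() and MergeFrom() might pair up, here is a minimal sketch of a counting type that implements both method signatures. The `counter` type, its `Add` method, and the choice of `map[string]int64` as the snapshot format are assumptions made for this example; the package does not prescribe a snapshot type.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// counter is a hypothetical saw-like type that counts occurrences per key.
type counter struct {
	mu     sync.Mutex
	counts map[string]int64
}

func newCounter() *counter {
	return &counter{counts: make(map[string]int64)}
}

// Add records one occurrence of key.
func (c *counter) Add(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.counts[key]++
}

// Export returns a copy of the current counts, so later changes to the
// counter do not affect the snapshot.
func (c *counter) Export() (interface{}, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	snap := make(map[string]int64, len(c.counts))
	for k, v := range c.counts {
		snap[k] = v
	}
	return snap, nil
}

// MergeFrom adds a snapshot produced by Export into this counter.
func (c *counter) MergeFrom(other interface{}) error {
	snap, ok := other.(map[string]int64)
	if !ok {
		return errors.New("counter: unexpected snapshot type")
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	for k, v := range snap {
		c.counts[k] += v
	}
	return nil
}

func main() {
	a, b := newCounter(), newCounter()
	a.Add("GET")
	b.Add("GET")
	b.Add("POST")

	snap, _ := b.Export()
	if err := a.MergeFrom(snap); err != nil {
		fmt.Println("merge failed:", err)
		return
	}
	merged, _ := a.Export()
	fmt.Println(merged)
}
```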

type ProtoDecoder

type ProtoDecoder struct {
	ValueType reflect.Type
}

func NewProtoDecoder

func NewProtoDecoder(example interface{}) ProtoDecoder

func (ProtoDecoder) DecodeValue

func (pd ProtoDecoder) DecodeValue(buf []byte) (interface{}, error)

type ProtoEncoder

type ProtoEncoder struct{}

func (ProtoEncoder) EncodeValue

func (pe ProtoEncoder) EncodeValue(value interface{}, buf []byte) ([]byte, error)

type Saw

type Saw interface {
	// Feeds a new data point into Saw.
	Emit(v Datum) error

	// Releases all resources and returns the final computation result.
	Result(ctx context.Context) (interface{}, error)
}

Saw is the basic computation unit; it is largely a state machine.

In general, an implementation should allow concurrent calls to Emit() for good parallelism. This can be achieved by making the saw stateless (see Transform) or by having its state managed by concurrent tables such as table.MemTable (see the table subpackage).
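A third option for small state, not mentioned above, is to guard it with atomics. The following sketch shows a saw whose Emit() is safe to call from many goroutines. `countSaw`, `emitConcurrently`, and `resultInt64` are hypothetical names, the types are local stand-ins, and the standard library `context` package is assumed here.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// Local stand-ins mirroring the documented types.
type DatumKey string

type Datum struct {
	Key   DatumKey
	Value interface{}
}

type Saw interface {
	Emit(v Datum) error
	Result(ctx context.Context) (interface{}, error)
}

// countSaw is a hypothetical saw that counts the datums it receives.
// Its only state is an int64 updated atomically, so Emit needs no lock.
type countSaw struct {
	n int64
}

func (s *countSaw) Emit(v Datum) error {
	atomic.AddInt64(&s.n, 1)
	return nil
}

func (s *countSaw) Result(ctx context.Context) (interface{}, error) {
	return atomic.LoadInt64(&s.n), nil
}

// emitConcurrently feeds perWorker datums from each of workers goroutines
// and waits for all of them to finish.
func emitConcurrently(s Saw, workers, perWorker int) {
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				_ = s.Emit(Datum{Key: "hit", Value: i})
			}
		}()
	}
	wg.Wait()
}

// resultInt64 calls Result and returns the count, or -1 if the result is
// missing or not an int64.
func resultInt64(s Saw) int64 {
	v, err := s.Result(context.Background())
	if err != nil {
		return -1
	}
	n, ok := v.(int64)
	if !ok {
		return -1
	}
	return n
}

func main() {
	s := &countSaw{}
	emitConcurrently(s, 8, 1000)
	fmt.Println("datums counted:", resultInt64(s))
}
```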

type SawNoResult

type SawNoResult struct{}

func (SawNoResult) Result

func (snr SawNoResult) Result(ctx context.Context) (interface{}, error)

type SeqID

type SeqID int64

func (SeqID) Advance

func (seq SeqID) Advance(x int) SeqID

func (SeqID) DistanceFrom

func (seq SeqID) DistanceFrom(other SeqID) int

type TopicID

type TopicID string

type TransformFunc

type TransformFunc func(input Datum) (output Datum, err error)

type TransformSpec

type TransformSpec struct {
	Name      string
	Transfrom TransformFunc
	Inputs    []TopicID
	Outputs   []TopicID
}

TransformSpec configures a Transform.

Transform is a stateless Saw: for each input Datum (normally from a Hub), it calls a pure TransformFunc to get an output Datum, then publishes the output to a set of Hub topics.

Use RegisterTransform() to create a Transform saw and register it to a Hub.
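To make the flow concrete, the sketch below defines a pure TransformFunc and a helper that mimics what a single Emit on a Transform saw is described as doing: call the function, then publish the output to each output topic. `statusClass` and `runTransform` are hypothetical, the `publish` callback stands in for Hub.Publish, and stopping on a transform error is an assumption of this example.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the documented types.
type TopicID string
type DatumKey string

type Datum struct {
	Key   DatumKey
	Value interface{}
}

type TransformFunc func(input Datum) (output Datum, err error)

// statusClass is a hypothetical pure TransformFunc. It expects an int HTTP
// status code as the value and re-keys the datum by status class ("2xx",
// "4xx", ...), carrying the original key as the new value.
func statusClass(in Datum) (Datum, error) {
	code, ok := in.Value.(int)
	if !ok || code < 100 || code > 599 {
		return Datum{}, errors.New("statusClass: value is not an HTTP status code")
	}
	key := DatumKey(fmt.Sprintf("%dxx", code/100))
	return Datum{Key: key, Value: in.Key}, nil
}

// runTransform mimics one Emit on a Transform saw: it applies fn and, on
// success, hands the output to publish once per output topic.
func runTransform(fn TransformFunc, outputs []TopicID, publish func(TopicID, Datum), in Datum) error {
	out, err := fn(in)
	if err != nil {
		return err
	}
	for _, topic := range outputs {
		publish(topic, out)
	}
	return nil
}

func main() {
	publish := func(topic TopicID, d Datum) {
		fmt.Printf("topic=%s key=%s value=%v\n", topic, d.Key, d.Value)
	}
	outputs := []TopicID{"by_class", "audit"}

	if err := runTransform(statusClass, outputs, publish, Datum{Key: "GET /", Value: 404}); err != nil {
		fmt.Println("transform failed:", err)
	}
}
```

Keeping the function pure is what lets the saw stay stateless and therefore safe under concurrent Emit() calls.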

type ValueDecoder

type ValueDecoder interface {
	DecodeValue(buf []byte) (interface{}, error)
}

type ValueEncoder

type ValueEncoder interface {
	EncodeValue(value interface{}, buf []byte) ([]byte, error)
}

type VarFloat

type VarFloat interface {
	Add(delta float64)
	Set(value float64)
}

func ReportFloat

func ReportFloat(ns, name string) VarFloat

ReportFloat creates a float var for reporting. See ReportInt() for usage details.

type VarInt

type VarInt interface {
	Add(delta int64)
	Set(value int64)
}

func ReportInt

func ReportInt(ns, name string) VarInt

ReportInt creates or fetches an int var for reporting. Unlike the underlying expvar, ReportInt is expected to be called when saws are created dynamically (in a TableItemFactory, for example), so that all saws inside a single table can share the same reporting metric.
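The difference from plain expvar matters because `expvar.NewInt` panics when a name is registered twice. A get-or-create wrapper along the following lines would give the behaviour described above. This is only a sketch: `reportInt` is a hypothetical helper, and joining `ns` and `name` with a dot is an assumption, not the package's documented naming scheme.

```go
package main

import (
	"expvar"
	"fmt"
	"sync"
)

// reportMu serialises the check-then-create step so that two goroutines
// cannot both try to register the same name.
var reportMu sync.Mutex

// reportInt returns the expvar.Int published under ns+"."+name, creating it
// on first use. If the name is already taken by a var of another type,
// expvar.NewInt panics; this sketch does not handle that case.
func reportInt(ns, name string) *expvar.Int {
	fullName := ns + "." + name

	reportMu.Lock()
	defer reportMu.Unlock()

	if v, ok := expvar.Get(fullName).(*expvar.Int); ok {
		return v
	}
	return expvar.NewInt(fullName)
}

func main() {
	// Two dynamically created saws asking for the same metric share one var.
	first := reportInt("table", "items")
	second := reportInt("table", "items")

	first.Add(2)
	second.Add(1)

	fmt.Println(first == second, second.Value())
}
```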

type Window

type Window struct {
	// contains filtered or unexported fields
}

Window implements a sliding window of saws. It keeps a finite set of saws, called frames, each corresponding to a SeqID. Window keeps the WindowSpec.WindowSize latest frames, those with the largest contiguous SeqIDs.

For every input, it calls WindowSpec.SeqFunc to get the datum's SeqID and routes the datum to the matching frame if that frame is still inside the sliding window. Window assumes that SeqIDs are dense and roughly increasing over time; otherwise too many datums will be dropped.

When the window needs to slide, Result() of each old frame is called in its own goroutine. Window.Result() calls Result() on all remaining frames in the same manner, as if they had all slid away. Window ignores the return values of frames' Result().
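A dense, roughly increasing SeqID is easy to derive from timestamps. The sketch below shows a possible DatumSeqFunc that buckets events by minute, plus a check for whether a SeqID still falls inside a window of a given size. `event`, `eventAt`, `minuteSeq`, and `stillInWindow` are hypothetical, and the window check is one reading of the description above rather than the package's actual routing logic.

```go
package main

import (
	"fmt"
	"time"
)

// Local stand-ins mirroring the documented types.
type SeqID int64
type DatumKey string

type Datum struct {
	Key   DatumKey
	Value interface{}
}

type DatumSeqFunc func(datum Datum) SeqID

// event is a hypothetical value type that carries a timestamp.
type event struct {
	At time.Time
}

// eventAt builds a datum whose event happened at the given Unix second.
func eventAt(unixSec int64) Datum {
	return Datum{Key: "evt", Value: event{At: time.Unix(unixSec, 0)}}
}

// minuteSeq maps a datum to the Unix minute of its event, so consecutive
// minutes yield consecutive SeqIDs. Values that are not events map to 0,
// which is a simplification for this sketch.
func minuteSeq(d Datum) SeqID {
	ev, ok := d.Value.(event)
	if !ok {
		return 0
	}
	return SeqID(ev.At.Unix() / 60)
}

// stillInWindow reports whether seq belongs to one of the size newest
// frames ending at latest. A seq greater than latest is not covered here:
// under the description above it would make the window slide instead.
func stillInWindow(latest, seq SeqID, size int) bool {
	return seq <= latest && latest-seq < SeqID(size)
}

func main() {
	var seqFunc DatumSeqFunc = minuteSeq

	latest := seqFunc(eventAt(600)) // minute 10
	late := seqFunc(eventAt(485))   // minute 8
	stale := seqFunc(eventAt(400))  // minute 6

	fmt.Println(stillInWindow(latest, late, 3))
	fmt.Println(stillInWindow(latest, stale, 3))
}
```

With a window size of 3 and the latest frame at minute 10, the datum from minute 8 is still routable while the one from minute 6 would be dropped, which is why sparse or badly ordered SeqIDs lose data.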

func NewWindow

func NewWindow(spec WindowSpec) *Window

func (*Window) AllFrames

func (win *Window) AllFrames() map[SeqID]Saw

AllFrames returns all frames the Window currently holds. The returned frames are not locked and may be in the middle of Emit() or even Result() by the time the caller receives them.

func (*Window) Emit

func (win *Window) Emit(datum Datum) error

func (*Window) LatestFrame

func (win *Window) LatestFrame() (seq SeqID, frame Saw)

LatestFrame returns the latest frame, or nil when there is no data yet. The returned frame is not locked and may be in the middle of Emit() or even Result() by the time the caller receives it.

func (*Window) Result

func (win *Window) Result(ctx context.Context) (result interface{}, err error)

Result finalizes all frames the Window currently manages. It returns after every frame sent for finalization has finished, including earlier frames finalized because of sliding.

type WindowFrameFactory

type WindowFrameFactory func(name string, seq SeqID) (Saw, error)

type WindowSpec

type WindowSpec struct {
	Name          string
	FrameFactory  WindowFrameFactory
	SeqFunc       DatumSeqFunc
	WindowSize    int
	MaxSeqAdvance int
}
