blocks

package
v0.1.1-0...-48b70db Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 7, 2014 License: Apache-2.0 Imports: 29 Imported by: 0

Documentation

Index

Constants

View Source
const (
	CREATE_OUT_CHAN = iota
	DELETE_OUT_CHAN = iota
)

Variables

View Source
var (
	Library     BlockLibrary
	LibraryBlob string
)

Functions

func Blocked

func Blocked(b *Block)

func BuildLibrary

func BuildLibrary()

func Connection

func Connection(b *Block)

Connection accepts the input from a block and outputs it to another block. This block is a special case in that it requires an input and an output block to be created.

func Count

func Count(b *Block)

Count uses a priority queue to count the number of messages that have been sent to the count block over a duration of time in seconds.

Note that this is an exact count and therefore has O(N) memory requirements.

func Date

func Date(b *Block)

func Filter

func Filter(b *Block)

func FromHTTPStream

func FromHTTPStream(b *Block)

creates a persistent HTTP connection, emitting all messages from the stream into streamtools

func FromNSQ

func FromNSQ(b *Block)

connects to an NSQ topic and emits each message into streamtools.

func FromPost

func FromPost(b *Block)

accepts JSON through POSTs to the /in endpoint and broadcasts to other blocks.

func FromSQS

func FromSQS(b *Block)

hooks into an Amazon SQS, and emits every message it sees into streamtools

func GenRandom

func GenRandom(b *Block)

emits a JSON blob full of random stuff. Set the Interval using a rule.

func GenTicker

func GenTicker(b *Block)

emits the time. Specify the period - the time between emissions - in seconds as a rule.

func Get

func Get(msg interface{}, branch ...string) (interface{}, error)

func GetHTTP

func GetHTTP(b *Block)

Get, on any inbound message, GETs an external JSON and emits it

func GetS3

func GetS3(b *Block)

Gets the key specified in the inbound message. Specify the bucket using a rule.

func GroupHistogram

func GroupHistogram(b *Block)

GroupHistogram is a group of histograms, where each histogram is indexed by one field in the incoming message, and the histogram captures a distrbuition over another field.

func Histogram

func Histogram(b *Block)

creates a histogram of a specified key

func Learn

func Learn(b *Block)

func LinearModel

func LinearModel(b *Block)

func ListS3

func ListS3(b *Block)

lists an S3 bucket, within a specified time inteval, starting with a specified prefix.

func Map

func Map(b *Block)

func Mask

func Mask(b *Block)

Mask modifies a JSON stream with an additive key filter. Mask uses the JSON object recieved through the rule channel to determine which keys should be included in the resulting object. An empty JSON object ({}) is used as the notation to include all values for a key.

For instance, if the JSON rule is:

{"a":{}, "b":{"d":{}},"x":{}}

And an incoming message looks like:

{"a":24, "b":{"c":"test", "d":[1,3,4]}, "f":5, "x":{"y":5, "z":10}}

The resulting object after the application of Mask would be:

{"a":24, "b":{"d":[1,3,4]}, "x":{"y":5, "z":10}}

func Mean

func Mean(b *Block)

Mean() is an online mean The mean for a stream of data is updated 1 data point at a time. Formula: mu_i+1 = mu_i * (n - 1) /n + (1/n) * x_i

func MovingAverage

func MovingAverage(b *Block)

func Pack

func Pack(b *Block)

Pack groups messages together by testing equality of keys, and emits the group of messages after no new messages have been added to the group for a specified amount of time.

func PostHTTP

func PostHTTP(b *Block)

POSTs an input message to an HTTP endpoint and emits the response

func Sd

func Sd(b *Block)

Sd calculates standard deviation in an online fashion using Welford's Algorithm. Ref: http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.302.7503&rep=rep1&type=pdf

func Set

func Set(m interface{}, key string, val interface{}) error

func SkeletonState

func SkeletonState(b *Block)

this is a skeleton state block. It doesn't do anything, but can be used as a template to make new state blocks.

func SkeletonTransfer

func SkeletonTransfer(b *Block)

func Sync

func Sync(b *Block)

The Sync block delays a message by a specified lag relative to a timestamp internal to the message. Specify which key holds the timestamp, and how long the lag should be, using a rule.

func Ticker

func Ticker(b *Block)

emits the time. Specify the period - the time between emissions - in seconds as a rule.

func Timeseries

func Timeseries(b *Block)

stores a specified key from the last `NumSamples` messages

func ToBeanstalkd

func ToBeanstalkd(b *Block)

func ToElasticsearch

func ToElasticsearch(b *Block)

Posts a message to a specified Elasticsearch index with the given type.

func ToFile

func ToFile(b *Block)

Writes messages to a specified file, one line per message

func ToLog

func ToLog(b *Block)

ToLog prints recieved messages to the stream tools logger.

func ToNSQ

func ToNSQ(b *Block)

writes messages to an NSQ topic

func ToWebsocket

func ToWebsocket(b *Block)

ToWebsocket stands up and writes to a websocket

func Unpack

func Unpack(b *Block)

unpack an array into seperate messages, and emit them in order

func Var

func Var(b *Block)

Var calculates variance in an online fashion using Welford's Algorithm. The Var() block is the Sd() block with the squared correction. Ref: http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.302.7503&rep=rep1&type=pdf

Types

type BMsg

type BMsg struct {
	Msg          interface{}
	ResponseChan chan interface{}
}

type Block

type Block struct {
	BlockType string
	ID        string
	InChan    chan *BMsg
	OutChans  map[string]chan *BMsg
	Routes    map[string]chan *BMsg
	AddChan   chan *OutChanMsg
	InBlocks  map[string]string // contains the route, if specified
	OutBlocks map[string]string // contains the route, if specified
	QuitChan  chan bool
}

func NewBlock

func NewBlock(name string, ID string) (*Block, error)

type BlockLibrary

type BlockLibrary map[string]*BlockTemplate

A block library is a collection of possible block templates

type BlockRoutine

type BlockRoutine func(*Block)

type BlockTemplate

type BlockTemplate struct {
	BlockType  string
	RouteNames []string
	// BlockRoutine is the central processing routine for a block. All the work gets done in here
	Routine BlockRoutine
}

Block is the basic interface for processing units in streamtools

type Message

type Message struct {
	// this is a list in case I'm ever brave enough to up the "MaxNumberOfMessages" away from 1
	Body          []string `xml:"ReceiveMessageResult>Message>Body"`
	ReceiptHandle []string `xml:"ReceiveMessageResult>Message>ReceiptHandle"`
}

type OutChanMsg

type OutChanMsg struct {
	// type of action to perform
	Action int
	// new channel to introduce to a block's outChan array
	OutChan chan *BMsg
	// ID of the connection block
	ID string
}

type PQMessage

type PQMessage struct {
	// contains filtered or unexported fields
}

type PriorityQueue

type PriorityQueue []*PQMessage

A PriorityQueue implements heap.Interface and holds Items.

func (PriorityQueue) Len

func (pq PriorityQueue) Len() int

func (PriorityQueue) Less

func (pq PriorityQueue) Less(i, j int) bool

func (*PriorityQueue) PeekAndShift

func (pq *PriorityQueue) PeekAndShift(max time.Time, lag time.Duration) (interface{}, time.Duration)

func (*PriorityQueue) Pop

func (pq *PriorityQueue) Pop() interface{}

func (*PriorityQueue) Push

func (pq *PriorityQueue) Push(x interface{})

func (PriorityQueue) Swap

func (pq PriorityQueue) Swap(i, j int)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL