stream

package
v0.0.0-...-3d178fd Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 24, 2016 License: MIT Imports: 4 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Accumulator

type Accumulator struct {
	// contains filtered or unexported fields
}

Accumulator used to calculate statistics for a period of time by processing provided values

func NewAccumulator

func NewAccumulator(streamKey string, intervalStart int64, intervalEnd int64, intervalType IntervalType, targetSampleCount uint32) *Accumulator

NewAccumulator creates an accumulator

func (*Accumulator) Accumulate

func (accumulator *Accumulator) Accumulate(input chan OrdinalValue, output chan IntervalStatistics, done chan bool)

Accumulate values from a channel

func (*Accumulator) Finalise

func (accumulator *Accumulator) Finalise() IntervalStatistics

Finalise calculates statistics from the accumulator and prevents any further accumulation

func (*Accumulator) Include

func (accumulator *Accumulator) Include(ordinalValue OrdinalValue)

Include a new value within the accumulation

type CachedStatisticsRepository

type CachedStatisticsRepository struct {
	// contains filtered or unexported fields
}

CachedStatisticsRepository provides access to generated interval statistics

func NewCachedStatisticsRepository

func NewCachedStatisticsRepository() CachedStatisticsRepository

NewCachedStatisticsRepository creates an empty statistics repository

func (*CachedStatisticsRepository) EmitIntervalStatisticsForKeys

func (cachedStatisticsRepository *CachedStatisticsRepository) EmitIntervalStatisticsForKeys(keys []string, fromOrdinal int64, untilOrdinal int64, output chan IntervalStatistics) error

EmitIntervalStatisticsForKeys outputs matching interval statistics to a provided channel

func (*CachedStatisticsRepository) RegisterCache

func (cachedStatisticsRepository *CachedStatisticsRepository) RegisterCache(key string, cache *IntervalStatisticsCache) error

RegisterCache includes a new key and cache within the repository

func (*CachedStatisticsRepository) UnregisterCache

func (cachedStatisticsRepository *CachedStatisticsRepository) UnregisterCache(key string) error

UnregisterCache removes a cache and key from the repository

type ClearParams

type ClearParams struct {
	// Identifies the stream
	Stream string `json:"stream,omitempty" xml:"stream,omitempty"`
	// If true, all data for the stream(s) will be cleared
	ClearAll bool `json:"clearAll,omitempty" xml:"clearAll,omitempty"`
	// An array of tags to be matched
	WithTags []string `json:"tagsToAssign,omitempty" xml:"tagsToAssign,omitempty"`
	// An array of tags that exclude a stream from a match
	ExcludingTags []string `json:"tagsToUnassign,omitempty" xml:"tagsToUnassign,omitempty"`
	// Specifies a maximum ordinal value used to restrict the clear operation.
	MaxOrdinal int `json:"maxOrdinal,omitempty" xml:"maxOrdinal,omitempty"`
}

ClearParams encapsulates the information required for the clear action

type Controller

type Controller struct {
	// contains filtered or unexported fields
}

Controller is used to dispatch all operations related to stream processing

func NewController

func NewController(contextKey string) *Controller

NewController creates a controller used to manage streams

func (*Controller) ClearStatisticsCache

func (controller *Controller) ClearStatisticsCache(params *ClearParams) (err error)

ClearStatisticsCache removes existing statistics data from cache

func (*Controller) GetStatistics

func (controller *Controller) GetStatistics(params *StatisticsParams) (err error)

GetStatistics returns statistics matching search parameters

func (*Controller) ModifyTags

func (controller *Controller) ModifyTags(params *TagParams) (err error)

ModifyTags modifies the tags associated with a stream

func (*Controller) Push

func (controller *Controller) Push(params *PushParams) (err error)

Push new values onto a stream

func (*Controller) Register

func (controller *Controller) Register(params *RegisterParams) (err error)

Register a stream within the controller

func (*Controller) Unregister

func (controller *Controller) Unregister(params *UnregisterParams) (err error)

Unregister streams from the controller

type IntervalRouter

type IntervalRouter struct {
	// contains filtered or unexported fields
}

IntervalRouter sends stream data to an accumulator based matching the ordinal value to an interval

func NewIntervalRouter

func NewIntervalRouter(key string,
	intervalSize int64,
	intervalType IntervalType,
	maxIntervalLag uint32,
	targetSampleCount uint32) *IntervalRouter

NewIntervalRouter creates a new router used to assign values to intervals

func (*IntervalRouter) Accumulate

func (intervalRouter *IntervalRouter) Accumulate(ordinalValue OrdinalValue, output chan IntervalStatistics)

Accumulate directs a value to the appropriate accumulator for an ordinal value

func (*IntervalRouter) AccumulateFromChannel

func (intervalRouter *IntervalRouter) AccumulateFromChannel(input chan OrdinalValue, output chan IntervalStatistics, done chan bool)

AccumulateFromChannel directs a value to the appropriate accumulator for an ordinal value

func (*IntervalRouter) FinaliseAll

func (intervalRouter *IntervalRouter) FinaliseAll()

FinaliseAll causes all accumulators to be finalised

func (*IntervalRouter) FinalisePriorTo

func (intervalRouter *IntervalRouter) FinalisePriorTo(ordinal int64)

FinalisePriorTo causes all accumulators for intervals prior to that related to the specified ordinal and removes them from the intervalRouter

func (*IntervalRouter) FinalisePriorToTime

func (intervalRouter *IntervalRouter) FinalisePriorToTime(t time.Time)

FinalisePriorToTime causes all accumulators for intervals prior to that related to the specified time, and removes them from the intervalRouter

type IntervalStatistics

type IntervalStatistics struct {
	StreamKey               string
	IntervalStart           int64
	IntervalEnd             int64
	IntervalType            IntervalType
	Minimum                 float64
	Maximum                 float64
	Mean                    float64
	Count                   uint64
	Sum                     float64
	SampleMean              float64
	SampleSum               float64
	SampleCount             uint32
	SampleStandardDeviation float64
	CoefficientOfVariation  float64
}

IntervalStatistics is a set of statistics generated based on processed events within a period of time or over a range or ordinal positions

type IntervalStatisticsCache

type IntervalStatisticsCache struct {
	// contains filtered or unexported fields
}

IntervalStatisticsCache stores a limited set of interval statistics for a stream

func NewIntervalStatisticsCache

func NewIntervalStatisticsCache(streamKey string, size uint32) *IntervalStatisticsCache

NewIntervalStatisticsCache creates a new cache for interval statistics

func (*IntervalStatisticsCache) EmitIntervalStatistics

func (intervalStatisticsCache *IntervalStatisticsCache) EmitIntervalStatistics(fromOrdinal int64, untilOrdinal int64, output chan IntervalStatistics) error

EmitIntervalStatistics outputs matching interval statistics to a provided channel

func (*IntervalStatisticsCache) GetFromOrdinal

func (intervalStatisticsCache *IntervalStatisticsCache) GetFromOrdinal(fromOrdinal int64) []IntervalStatistics

GetFromOrdinal returns the cached statistics that have an ordinal value equal or greater than the value provided

func (*IntervalStatisticsCache) GetLast

func (intervalStatisticsCache *IntervalStatisticsCache) GetLast(maxCount int) []IntervalStatistics

GetLast returns the most recent statistics from the cache up to the specified maxCount

func (*IntervalStatisticsCache) GetOrdinalRange

func (intervalStatisticsCache *IntervalStatisticsCache) GetOrdinalRange(fromOrdinal int64, untilOrdinal int64) []IntervalStatistics

GetOrdinalRange returns the cached statistics that exist between a range of ordinal values

func (*IntervalStatisticsCache) ProcessAndForward

func (intervalStatisticsCache *IntervalStatisticsCache) ProcessAndForward(input chan IntervalStatistics, output chan IntervalStatistics)

ProcessAndForward stores values from an input channel and then forwards values to an output channel. Only the most recent set of results are stored

type IntervalType

type IntervalType int

IntervalType is a specifier used to differentiate between Ordinal and Time intervals

const (
	//OrdinalInterval indicates that intervals are determined based on ordinal position
	OrdinalInterval IntervalType = iota
	//TimeInterval indicates that intervals are determined based on time
	TimeInterval
)

type OrdinalValue

type OrdinalValue struct {
	StreamKey string
	Ordinal   int64
	Value     float64
}

OrdinalValue represents a value with an ordinal position within a stream

func NewOrdinalValue

func NewOrdinalValue(streamKey string, ordinal int64, value float64) OrdinalValue

NewOrdinalValue creates a new ordinal value

func NewOrdinalValueForTime

func NewOrdinalValueForTime(streamKey string, t time.Time, value float64) OrdinalValue

NewOrdinalValueForTime creates a new ordinal value for a time

type PushParams

type PushParams struct {
	// The ordinal position within the stream
	Ordinal int `json:"ordinal,omitempty" xml:"ordinal,omitempty"`
	// Identifies the stream that the ordinal value relates to
	Stream string `json:"stream,omitempty" xml:"stream,omitempty"`
	// The value at the ordinal position
	Value float64 `json:"value,omitempty" xml:"value,omitempty"`
}

PushParams encapsulates the information required for the push action

type RegisterParams

type RegisterParams struct {
	// Identifies the stream that the definition relates to
	Stream string `json:"stream,omitempty" xml:"stream,omitempty"`
	// The ordinal position within the stream
	IntervalSize int `json:"intervalSize,omitempty" xml:"intervalSize,omitempty"`
	// The value at the ordinal position
	MaxIntervalLag int `json:"maxIntervalLag,omitempty" xml:"maxIntervalLag,omitempty"`
	// A set of tag values to be assigned to the stream
	Tags []string `json:"tags,omitempty" xml:"tags,omitempty"`
	// The value at the ordinal position
	TargetSampleSize int `json:"targetSampleSize,omitempty" xml:"targetSampleSize,omitempty"`
}

RegisterParams encapsulates the information required for the register action

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router sends data to an appropriate channel based on key

func NewRouter

func NewRouter() *Router

NewRouter creates a router for an input channel and begins reading immediately

func (*Router) Register

func (router *Router) Register(streamKey string, c chan OrdinalValue)

Register the channel for a stream based on key

func (*Router) Route

func (router *Router) Route(input chan OrdinalValue, unassigned chan OrdinalValue)

Route values from an input channel

func (*Router) StopAndUnregister

func (router *Router) StopAndUnregister(streamKey string)

StopAndUnregister removes a stream key from the router and closes the stream channel

type StatisticsParams

type StatisticsParams struct {
	// Specifies a maximum date time used to restrict the interval statistics returned.  Only statistics for intervals that are for a time range up until this date time value will be returned.
	MaxDateTime time.Time `json:"maxDateTime,omitempty" xml:"maxDateTime,omitempty"`
	// Specifies a maximum ordinal value used to restrict the interval statistics returned.  Only statistics for intervals that end on or before this ordinal value will be returned.
	MaxOrdinal int `json:"maxOrdinal,omitempty" xml:"maxOrdinal,omitempty"`
	// If true, results across multiple intervals will be merged together to produce a summary result.
	MergeIntervals bool `json:"mergeIntervals,omitempty" xml:"mergeIntervals,omitempty"`
	// If true, results from multiple streams will be merged together to produce a summary result.
	MergeStreams bool `json:"mergeStreams,omitempty" xml:"mergeStreams,omitempty"`
	// Specifies a minimum date time used to restrict the interval statistics returned.  Only statistics for intervals that are for a time range on or after this date time value will be returned.
	MinDateTime time.Time `json:"minDateTime,omitempty" xml:"minDateTime,omitempty"`
	// Specifies a minimum ordinal value used to restrict the interval statistics returned.  Only statistics for intervals that begin on or after this ordinal value will be returned.
	MinOrdinal int `json:"minOrdinal,omitempty" xml:"minOrdinal,omitempty"`
	// Specifies the criteria by which streams are to be matched
	StreamMatchSearchParams streamMatchSearchParams `json:"streamMatchCriteria,omitempty" xml:"streamMatchCriteria,omitempty"`
}

StatisticsParams encapsulates the information required for the GetStatistics action

type TagParams

type TagParams struct {
	// If true, previously assigned tags will be cleared
	ClearAll bool `json:"clearAll,omitempty" xml:"clearAll,omitempty"`
	// Identifies the stream that the definition relates to
	Stream string `json:"stream,omitempty" xml:"stream,omitempty"`
	// An array of tags to be assigned
	TagsToAssign []string `json:"tagsToAssign,omitempty" xml:"tagsToAssign,omitempty"`
	// An array of tags to be unassigned
	TagsToUnassign []string `json:"tagsToUnassign,omitempty" xml:"tagsToUnassign,omitempty"`
}

TagParams encapsulates the information required for the ModifyTags action

type TagRepository

type TagRepository struct {
	// contains filtered or unexported fields
}

TagRepository manages sets of tags by key

func NewTagRepository

func NewTagRepository() TagRepository

NewTagRepository creates an empty tag repository

func (*TagRepository) ApplyTags

func (tagRepository *TagRepository) ApplyTags(key string, tags []string, removeExisting bool)

ApplyTags applies a set of tags to a key

func (*TagRepository) Clear

func (tagRepository *TagRepository) Clear()

Clear removes all tag information

func (*TagRepository) ClearForKey

func (tagRepository *TagRepository) ClearForKey(key string)

ClearForKey removes all tag information for akey

func (*TagRepository) GetMatchingKeys

func (tagRepository *TagRepository) GetMatchingKeys(withTags []string, excludingTags []string) []string

GetMatchingKeys returns all keys that have all tags within the withTags slice and no tags within the excludingTags slice

func (*TagRepository) GetTagsForKey

func (tagRepository *TagRepository) GetTagsForKey(key string) []string

GetTagsForKey returns a set of tags for a key

func (*TagRepository) RemoveTags

func (tagRepository *TagRepository) RemoveTags(key string, tags []string)

RemoveTags removes a set of tags from a key

type UnregisterParams

type UnregisterParams struct {
	// Identifies the stream
	Stream string `json:"stream,omitempty" xml:"stream,omitempty"`
	// An array of tags to be matched
	WithTags []string `json:"tagsToAssign,omitempty" xml:"tagsToAssign,omitempty"`
	// An array of tags that exclude a stream from a match
	ExcludingTags []string `json:"tagsToUnassign,omitempty" xml:"tagsToUnassign,omitempty"`
}

UnregisterParams encapsulates the information required for the unregister action

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL