geetplaban

package module
v0.0.0-...-8b48f03 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 10, 2016 License: Apache-2.0 Imports: 19 Imported by: 0

README

geetplaban

Geetplaban is a real time stream processing Engine written in Go.

This is a work in progress.

Documentation

Index

Constants

View Source
const (
	INVALID_VAL_INT    = -1
	INVALID_VAL_STR    = ""
	DEFAULT_LOG_PATH   = "<home>/log"
	DEFAULT_LOG_FILE   = "pipeline.log"
	DEFAULT_HOME       = "/tmp"
	DEFAULT_NUM_ACKER  = 8
	DEFAULT_LOG_BCK    = 10
	LOG_ROLLING_SZ     = 10485760
	MIN_LOG_ROLLING_SZ = 1048576
	MAX_LOG_ROLLING_SZ = 2147483648
	MAX_MAX_UNACKED    = 2000000
	MIN_MAX_UNACKED    = 20
	TIME_OUT           = 120000
	CLUSTER_NAME       = "geetplaban"
	STATS_INTERVAL_SEC = 60
	DEFAULT_LOG_LEVEL  = "info"
	MIN_TICK_MILI_SEC  = 1
	MAX_TICK_MILI_SEC  = 60000
)
View Source
const (
	MAX_LOG_BUFFER    = 100 * 1024
	WRITE_SIZE        = 8000
	MAX_LOG_INTERVAL  = 2 * time.Second
	MAX_ROLLOVER_SIZE = 2 * 1024 * 1024 * 1024
)
View Source
const (
	DEFAULT_NUM_SLOTS     = 8
	MAX_NUM_SLOTS         = 256
	MAX_NUM_SLOTS_MINUS_1 = 255
	MIN_INTERVAL          = 20
	MAX_INTERVAL          = 3600
)

Variables

View Source
var CONFIG_DIR string = ""
View Source
var HOME string = "PIPELINE_HOME"

Functions

func CreateExecutionTree

func CreateExecutionTree()

func GetFileSize

func GetFileSize(path string) int64

func GetFileSizeFile

func GetFileSizeFile(file *os.File) int64

func NewExpiryHeap

func NewExpiryHeap() *expiryHeap

func NewRandomUUIDStr

func NewRandomUUIDStr() string

func Run

func Run()

Types

type AckValue

type AckValue struct {
	// contains filtered or unexported fields
}

func NewAckValue

func NewAckValue(id uint64, value uint64) *AckValue

type Acker

type Acker interface {
	AddAck(id uint64, val uint64)
	AddTracking(ids string, trackerId uint) uint64
	AddTracker(id uint, tracker Tracker)
	SignalFail(id uint64)
	Start()
}

func GetAcker

func GetAcker() Acker

type AllStageStats

type AllStageStats struct {
	// contains filtered or unexported fields
}

func NewAllStageStats

func NewAllStageStats(num_slots uint, interval int64) *AllStageStats

type AllStages

type AllStages struct {
	// contains filtered or unexported fields
}
var All *AllStages

func (*AllStages) Print

func (All *AllStages) Print()

type AllStats

type AllStats struct {
	// contains filtered or unexported fields
}

func NewAllStats

func NewAllStats() *AllStats

func Stats

func Stats() *AllStats

func (*AllStats) Stop

func (asts *AllStats) Stop()

type Collector

type Collector interface {
	Emit(tuple map[string]interface{}, context interface{})
	Ack(context interface{})
	Fail(context interface{})
	AddOutput(outchan chan *OutPut)
	IsForDispatcher() bool
	// contains filtered or unexported methods
}

type Config

type Config struct {
	// contains filtered or unexported fields
}

func GetConfig

func GetConfig() *Config

func (*Config) GetIntVal

func (config *Config) GetIntVal(key string) (int, bool)

func (*Config) GetLogFile

func (config *Config) GetLogFile() string

func (*Config) GetLogLevel

func (config *Config) GetLogLevel() string

func (*Config) GetLogRollSize

func (config *Config) GetLogRollSize() int

func (*Config) GetMaxUnacked

func (config *Config) GetMaxUnacked() uint32

func (*Config) GetStrVal

func (config *Config) GetStrVal(key string) (string, bool)

func (*Config) GetTickMilli

func (config *Config) GetTickMilli() time.Duration

type DispPatcherCaller

type DispPatcherCaller struct {
	// contains filtered or unexported fields
}

func (*DispPatcherCaller) Ack

func (dcaller *DispPatcherCaller) Ack(id string)

func (*DispPatcherCaller) Fail

func (dcaller *DispPatcherCaller) Fail(id string)

func (*DispPatcherCaller) NewDispatcher

func (dcaller *DispPatcherCaller) NewDispatcher(d Dispatcher, coll Collector,
	acker Acker, nanodelay int64, id uint) *DispPatcherCaller

func (*DispPatcherCaller) Stop

func (dcaller *DispPatcherCaller) Stop()

func (*DispPatcherCaller) TimedOut

func (dcaller *DispPatcherCaller) TimedOut(id string)

type Dispatcher

type Dispatcher interface {
	LookForWork()
	Fail(string)
	Ack(string)
	TimedOut(string)
	Prepare(col Collector, tr *Tracker)
	Shutdown()
}

type DispatcherRegistry

type DispatcherRegistry struct {
	// contains filtered or unexported fields
}

func GetDispRegistry

func GetDispRegistry() *DispatcherRegistry

func (*DispatcherRegistry) AddType

func (reg *DispatcherRegistry) AddType(name string, disp Dispatcher)

func (*DispatcherRegistry) GetInstance

func (reg *DispatcherRegistry) GetInstance(name string) Dispatcher

type DispatcherStageInfo

type DispatcherStageInfo struct {
	// contains filtered or unexported fields
}

func NewDispatcherStageInfo

func NewDispatcherStageInfo(num_tasks int, dispatcher_class string, name string) *DispatcherStageInfo

func (*DispatcherStageInfo) AddGroupingOutStage

func (dsinfo *DispatcherStageInfo) AddGroupingOutStage(s *StageInfo, groupField []string)

func (*DispatcherStageInfo) AddOutStage

func (dsinfo *DispatcherStageInfo) AddOutStage(s *StageInfo)

func (*DispatcherStageInfo) CheckStage

func (sinfo *DispatcherStageInfo) CheckStage(s *StageInfo)

type Executor

type Executor interface {
	Execute(data map[string]interface{}, context interface{})
	AddCollector(Collector)
	AddIdentity(uint)
}

type ExecutorCaller

type ExecutorCaller struct {
	// contains filtered or unexported fields
}

type ExecutorRegistry

type ExecutorRegistry struct {
	// contains filtered or unexported fields
}

func GetRegistry

func GetRegistry() *ExecutorRegistry

func (*ExecutorRegistry) AddType

func (reg *ExecutorRegistry) AddType(name string, executor Executor)

func (*ExecutorRegistry) GetInstance

func (reg *ExecutorRegistry) GetInstance(name string) Executor

type GraphNode

type GraphNode struct {
	Name string
	// contains filtered or unexported fields
}

func (*GraphNode) AddInNode

func (node *GraphNode) AddInNode(in *GraphNode)

func (*GraphNode) AddOutNode

func (node *GraphNode) AddOutNode(out *GraphNode)

func (*GraphNode) DetectAnyCycle

func (thisnode *GraphNode) DetectAnyCycle(found_in_path map[*GraphNode]bool,
	already_checked map[*GraphNode]bool) bool

func (*GraphNode) String

func (node *GraphNode) String() string

type GraphNodePool

type GraphNodePool struct {
	// contains filtered or unexported fields
}

func GetNewGraphNodePool

func GetNewGraphNodePool() *GraphNodePool

func (*GraphNodePool) DetectAnyCycle

func (npl *GraphNodePool) DetectAnyCycle() bool

func (*GraphNodePool) Exists

func (npl *GraphNodePool) Exists(node *GraphNode) bool

func (*GraphNodePool) Get

func (npl *GraphNodePool) Get(name string) *GraphNode

type LocalAcker

type LocalAcker struct {
	// contains filtered or unexported fields
}

func NewLocalAcker

func NewLocalAcker(parallel uint, timeout int64) *LocalAcker

func (*LocalAcker) AddAck

func (acker *LocalAcker) AddAck(id uint64, val uint64)

func (*LocalAcker) AddTracker

func (acker *LocalAcker) AddTracker(id uint, tracker Tracker)

func (*LocalAcker) AddTracking

func (acker *LocalAcker) AddTracking(ids string, tracker_id uint) uint64

func (*LocalAcker) SignalFail

func (acker *LocalAcker) SignalFail(id uint64)

func (*LocalAcker) Start

func (acker *LocalAcker) Start()

type LogWriter

type LogWriter struct {
	// contains filtered or unexported fields
}

func NewLogWriter

func NewLogWriter(inp chan []byte) *LogWriter

func (*LogWriter) Write

func (mywriter *LogWriter) Write(p []byte) (n int, err error)

type Logger

type Logger struct {
	// contains filtered or unexported fields
}

func NewLogger

func NewLogger() *Logger

func (*Logger) RollOver

func (logger *Logger) RollOver()

type NextIdGetter

type NextIdGetter struct {
	// contains filtered or unexported fields
}

func NewNextIdGettr

func NewNextIdGettr(initial_value uint32) *NextIdGetter

func (*NextIdGetter) NextId

func (next *NextIdGetter) NextId() uint

type OutPut

type OutPut struct {
	// contains filtered or unexported fields
}

type StageInfo

type StageInfo struct {
	// contains filtered or unexported fields
}

func NewStageInfo

func NewStageInfo(num_tasks int, executor_class string, name string) *StageInfo

func (*StageInfo) AddDispGroupingStage

func (sinfo *StageInfo) AddDispGroupingStage(disp *DispatcherStageInfo)

func (*StageInfo) AddGroupingStage

func (sinfo *StageInfo) AddGroupingStage(s *StageInfo, groupFields []string, isinput bool)

func (*StageInfo) AddStage

func (sinfo *StageInfo) AddStage(s *StageInfo, isinput bool)

func (*StageInfo) CheckStage

func (sinfo *StageInfo) CheckStage(s *StageInfo, isinput bool)

type StageStats

type StageStats struct {
	// contains filtered or unexported fields
}

func NewStageStats

func NewStageStats(stage uint) *StageStats

func (*StageStats) String

func (ss *StageStats) String() string

type Tracker

type Tracker interface {
	Fail(id string)
	Ack(id string)
	TimedOut(id string)
}

type TupleCollector

type TupleCollector struct {
	// contains filtered or unexported fields
}

func NewTupleCollector

func NewTupleCollector(stage string, id uint, isdisp uint) *TupleCollector

func (*TupleCollector) Ack

func (tc *TupleCollector) Ack(context interface{})

func (*TupleCollector) AddOutput

func (tc *TupleCollector) AddOutput(chn chan *OutPut)

func (*TupleCollector) Copy

func (tc *TupleCollector) Copy() *TupleCollector

func (*TupleCollector) Emit

func (tc *TupleCollector) Emit(tuple map[string]interface{}, context interface{})

func (*TupleCollector) Fail

func (tc *TupleCollector) Fail(context interface{})

func (*TupleCollector) IsForDispatcher

func (tc *TupleCollector) IsForDispatcher() bool

type UUID

type UUID struct {
	// contains filtered or unexported fields
}

func NewRandomUUID

func NewRandomUUID() *UUID

func NewUUID

func NewUUID(inp []byte) *UUID

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL