Documentation ¶
Index ¶
- func Download(fileUrl string, dstDir string) (bool, error)
- func DownloadFiles(fileUrls []string, dstDir string, sleepMilliSec time.Duration) (int, error)
- func GetFileNameFromUrl(fileUrl string) (string, error)
- func IsExist(filename string) bool
- func RegisterProcessorsFromFlow(dispatcher *Dispatcher, flow *Flow, factory ProcessorFactory) error
- type BaseArgs
- type BaseEvent
- func (b *BaseEvent) Copy() Event
- func (b *BaseEvent) CopyRoutes() []string
- func (b *BaseEvent) GetId() uint64
- func (b *BaseEvent) GetLabels() *Labels
- func (b *BaseEvent) GetPayload() EventPayload
- func (b *BaseEvent) GetRoutes() []string
- func (b *BaseEvent) GetSourceId() uint64
- func (b *BaseEvent) PushRoute(route string)
- func (b *BaseEvent) SetId(id uint64)
- func (b *BaseEvent) SetLabels(labels *Labels)
- func (b *BaseEvent) SetPayload(payload EventPayload)
- func (b *BaseEvent) SetRoutes(routes []string)
- func (b *BaseEvent) SetSourceId(id uint64)
- type BaseProcessor
- type CustomLogic
- type CustomLogicSetting
- type CustomProcessor
- type CustomProcessorArgs
- type CustomProcessorGenerator
- type DelayProcessor
- type DelayProcessorArgs
- type DelayProcessorGenerator
- type Dispatcher
- type DistinctLogic
- type Downloader
- type DownloaderArgs
- type DownloaderGenerator
- type Event
- type EventPayload
- type FilterByJSLogic
- type Flow
- type FlowSetting
- type Job
- type Label
- type Labels
- type LogicType
- type Pipeline
- type Processor
- type ProcessorFactory
- type ProcessorGenerator
- type ProcessorSetting
- type SelectPayloadLogic
- type TransformByJSLogic
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DownloadFiles ¶
func GetFileNameFromUrl ¶
func RegisterProcessorsFromFlow ¶
func RegisterProcessorsFromFlow(dispatcher *Dispatcher, flow *Flow, factory ProcessorFactory) error
Types ¶
type BaseEvent ¶
type BaseEvent struct {
// contains filtered or unexported fields
}
func NewBaseEvent ¶
func (*BaseEvent) CopyRoutes ¶
func (*BaseEvent) GetPayload ¶
func (b *BaseEvent) GetPayload() EventPayload
func (*BaseEvent) GetSourceId ¶
func (*BaseEvent) SetPayload ¶
func (b *BaseEvent) SetPayload(payload EventPayload)
func (*BaseEvent) SetSourceId ¶
type BaseProcessor ¶
type BaseProcessor struct {
Name string
Type string
Process func(resource Event) ([]Event, error)
// contains filtered or unexported fields
}
func NewBaseProcessor ¶
func NewBaseProcessor(queueSize int) *BaseProcessor
func (*BaseProcessor) Enqueue ¶
func (b *BaseProcessor) Enqueue(resource Event)
func (*BaseProcessor) GetName ¶
func (b *BaseProcessor) GetName() string
func (*BaseProcessor) GetType ¶
func (b *BaseProcessor) GetType() string
func (*BaseProcessor) SetName ¶
func (b *BaseProcessor) SetName(name string)
func (*BaseProcessor) SetOutChannel ¶
func (b *BaseProcessor) SetOutChannel(outChan chan Event)
func (*BaseProcessor) Start ¶
func (b *BaseProcessor) Start()
type CustomLogic ¶
type CustomLogic interface {
Run(payload EventPayload) (EventPayload, error)
}
type CustomLogicSetting ¶
type CustomProcessor ¶
type CustomProcessor struct {
*BaseProcessor
// contains filtered or unexported fields
}
func NewCustomProcessor ¶
func NewCustomProcessor(queueSize int, logics []CustomLogic) *CustomProcessor
type CustomProcessorArgs ¶
type CustomProcessorArgs struct {
Logics []CustomLogicSetting
}
type CustomProcessorGenerator ¶
type CustomProcessorGenerator struct{}
func (*CustomProcessorGenerator) Generate ¶
func (g *CustomProcessorGenerator) Generate(intfArgs interface{}) (Processor, error)
func (*CustomProcessorGenerator) GetType ¶
func (g *CustomProcessorGenerator) GetType() string
type DelayProcessor ¶
type DelayProcessor struct {
*BaseProcessor
// contains filtered or unexported fields
}
func NewDelayProcessor ¶
func NewDelayProcessor(args *DelayProcessorArgs) *DelayProcessor
type DelayProcessorArgs ¶
type DelayProcessorGenerator ¶
type DelayProcessorGenerator struct{}
func (*DelayProcessorGenerator) Generate ¶
func (g *DelayProcessorGenerator) Generate(intfArgs interface{}) (Processor, error)
func (*DelayProcessorGenerator) GetType ¶
func (g *DelayProcessorGenerator) GetType() string
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
func NewDispatcher ¶
func NewDispatcher(queueSize int) *Dispatcher
func (*Dispatcher) AddResource ¶
func (d *Dispatcher) AddResource(event Event)
func (*Dispatcher) GetProcessor ¶
func (d *Dispatcher) GetProcessor(name string) (*task, bool)
func (*Dispatcher) RegisterTask ¶
func (d *Dispatcher) RegisterTask(name string, consumeLabels, produceLabels []Label, processor Processor) uint
func (*Dispatcher) Start ¶
func (d *Dispatcher) Start()
func (*Dispatcher) StartProcessors ¶
func (d *Dispatcher) StartProcessors()
type DistinctLogic ¶
type DistinctLogic struct {
// contains filtered or unexported fields
}
func NewDistinctLogic ¶
func NewDistinctLogic(keys []string) *DistinctLogic
func (*DistinctLogic) Run ¶
func (t *DistinctLogic) Run(payload EventPayload) (EventPayload, error)
type Downloader ¶
type Downloader struct {
*BaseProcessor
// contains filtered or unexported fields
}
func NewDownloader ¶
func NewDownloader(args *DownloaderArgs) *Downloader
func (*Downloader) DownloadFromResource ¶
func (p *Downloader) DownloadFromResource(event Event) ([]Event, error)
type DownloaderArgs ¶
type DownloaderGenerator ¶
type DownloaderGenerator struct{}
func (*DownloaderGenerator) Generate ¶
func (g *DownloaderGenerator) Generate(intfArgs interface{}) (Processor, error)
func (*DownloaderGenerator) GetType ¶
func (g *DownloaderGenerator) GetType() string
type Event ¶
type Event interface {
GetId() uint64
SetId(uint64)
GetSourceId() uint64
SetSourceId(uint64)
PushRoute(route string)
//GetRoutes() []string
SetRoutes([]string)
CopyRoutes() []string
//PopLabel() Label
//PushLabel(label Label)
//PushLabels(labels Labels)
//SetLabels(labels Labels)
//CopyLabels() Labels
//GetProduceLabels() Labels
//PushProduceLabels()
//GetLatestLabel() Label
GetLabels() *Labels
SetLabels(labels *Labels)
SetPayload(payload EventPayload)
GetPayload() EventPayload
Copy() Event
}
type EventPayload ¶
type EventPayload map[string]interface{}
func (EventPayload) HasEveryPayloadKeys ¶
func (e EventPayload) HasEveryPayloadKeys(keys []string) bool
func (EventPayload) HasKey ¶
func (e EventPayload) HasKey(key string) bool
func (EventPayload) HasSomePayloadKeys ¶
func (e EventPayload) HasSomePayloadKeys(keys []string) bool
type FilterByJSLogic ¶
type FilterByJSLogic struct {
// contains filtered or unexported fields
}
func NewFilterByJSLogic ¶
func NewFilterByJSLogic(commands []string) *FilterByJSLogic
func (*FilterByJSLogic) Run ¶
func (t *FilterByJSLogic) Run(payload EventPayload) (EventPayload, error)
type FlowSetting ¶
type Job ¶
type Job struct {
ProcessorSetting `yaml:",inline"`
Consume []Label
Produce []Label
}
type ProcessorFactory ¶
type ProcessorFactory struct {
// contains filtered or unexported fields
}
func (*ProcessorFactory) AddGenerator ¶
func (pf *ProcessorFactory) AddGenerator(generator ProcessorGenerator)
func (*ProcessorFactory) Create ¶
func (pf *ProcessorFactory) Create(setting FlowSetting) (Processor, error)
type ProcessorGenerator ¶
type ProcessorSetting ¶
type ProcessorSetting struct {
ProcessorType string `yaml:"processor"`
Name string
Args interface{}
}
func (*ProcessorSetting) GetArgs ¶
func (p *ProcessorSetting) GetArgs() interface{}
func (*ProcessorSetting) GetName ¶
func (p *ProcessorSetting) GetName() string
func (*ProcessorSetting) GetProcessorType ¶
func (p *ProcessorSetting) GetProcessorType() string
type SelectPayloadLogic ¶
type SelectPayloadLogic struct {
// contains filtered or unexported fields
}
func NewSelectPayloadLogic ¶
func NewSelectPayloadLogic(keys []string) *SelectPayloadLogic
func (*SelectPayloadLogic) Run ¶
func (t *SelectPayloadLogic) Run(payload EventPayload) (EventPayload, error)
type TransformByJSLogic ¶
type TransformByJSLogic struct {
// contains filtered or unexported fields
}
func NewTransformByJSLogic ¶
func NewTransformByJSLogic(commands []string) *TransformByJSLogic
func (*TransformByJSLogic) Run ¶
func (t *TransformByJSLogic) Run(payload EventPayload) (EventPayload, error)
Click to show internal directories.
Click to hide internal directories.