kniv

package
v0.0.0-...-37e3ad1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 29, 2018 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Download

func Download(fileUrl string, dstDir string) (bool, error)

func DownloadFiles

func DownloadFiles(fileUrls []string, dstDir string, sleepMilliSec time.Duration) (int, error)

func GetFileNameFromUrl

func GetFileNameFromUrl(fileUrl string) (string, error)

func IsExist

func IsExist(filename string) bool

func RegisterProcessorsFromFlow

func RegisterProcessorsFromFlow(dispatcher *Dispatcher, flow *Flow, factory ProcessorFactory) error

Types

type BaseArgs

type BaseArgs struct {
	QueueSize int
}

type BaseEvent

type BaseEvent struct {
	// contains filtered or unexported fields
}

func NewBaseEvent

func NewBaseEvent(labelsCapacity, routesCapacity int) *BaseEvent

func (*BaseEvent) Copy

func (b *BaseEvent) Copy() Event

func (*BaseEvent) CopyRoutes

func (b *BaseEvent) CopyRoutes() []string

func (*BaseEvent) GetId

func (b *BaseEvent) GetId() uint64

func (*BaseEvent) GetLabels

func (b *BaseEvent) GetLabels() *Labels

func (*BaseEvent) GetPayload

func (b *BaseEvent) GetPayload() EventPayload

func (*BaseEvent) GetRoutes

func (b *BaseEvent) GetRoutes() []string

func (*BaseEvent) GetSourceId

func (b *BaseEvent) GetSourceId() uint64

func (*BaseEvent) PushRoute

func (b *BaseEvent) PushRoute(route string)

func (*BaseEvent) SetId

func (b *BaseEvent) SetId(id uint64)

func (*BaseEvent) SetLabels

func (b *BaseEvent) SetLabels(labels *Labels)

func (*BaseEvent) SetPayload

func (b *BaseEvent) SetPayload(payload EventPayload)

func (*BaseEvent) SetRoutes

func (b *BaseEvent) SetRoutes(routes []string)

func (*BaseEvent) SetSourceId

func (b *BaseEvent) SetSourceId(id uint64)

type BaseProcessor

type BaseProcessor struct {
	Name string
	Type string

	Process func(resource Event) ([]Event, error)
	// contains filtered or unexported fields
}

func NewBaseProcessor

func NewBaseProcessor(queueSize int) *BaseProcessor

func (*BaseProcessor) Enqueue

func (b *BaseProcessor) Enqueue(resource Event)

func (*BaseProcessor) GetName

func (b *BaseProcessor) GetName() string

func (*BaseProcessor) GetType

func (b *BaseProcessor) GetType() string

func (*BaseProcessor) SetName

func (b *BaseProcessor) SetName(name string)

func (*BaseProcessor) SetOutChannel

func (b *BaseProcessor) SetOutChannel(outChan chan Event)

func (*BaseProcessor) Start

func (b *BaseProcessor) Start()

type CustomLogic

type CustomLogic interface {
	Run(payload EventPayload) (EventPayload, error)
}

type CustomLogicSetting

type CustomLogicSetting struct {
	Type     logicType
	Commands []string
	Keys     []string
}

type CustomProcessor

type CustomProcessor struct {
	*BaseProcessor
	// contains filtered or unexported fields
}

func NewCustomProcessor

func NewCustomProcessor(queueSize int, logics []CustomLogic) *CustomProcessor

func (*CustomProcessor) Process

func (p *CustomProcessor) Process(event Event) ([]Event, error)

type CustomProcessorArgs

type CustomProcessorArgs struct {
	Logics []CustomLogicSetting
}

type CustomProcessorGenerator

type CustomProcessorGenerator struct{}

func (*CustomProcessorGenerator) Generate

func (g *CustomProcessorGenerator) Generate(intfArgs interface{}) (Processor, error)

func (*CustomProcessorGenerator) GetType

func (g *CustomProcessorGenerator) GetType() string

type DelayProcessor

type DelayProcessor struct {
	*BaseProcessor
	// contains filtered or unexported fields
}

func NewDelayProcessor

func NewDelayProcessor(args *DelayProcessorArgs) *DelayProcessor

type DelayProcessorArgs

type DelayProcessorArgs struct {
	BaseArgs         `mapstructure:",squash"`
	IntervalMilliSec time.Duration
	Group            []string
}

type DelayProcessorGenerator

type DelayProcessorGenerator struct{}

func (*DelayProcessorGenerator) Generate

func (g *DelayProcessorGenerator) Generate(intfArgs interface{}) (Processor, error)

func (*DelayProcessorGenerator) GetType

func (g *DelayProcessorGenerator) GetType() string

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

func NewDispatcher

func NewDispatcher(queueSize int) *Dispatcher

func (*Dispatcher) AddResource

func (d *Dispatcher) AddResource(event Event)

func (*Dispatcher) GetProcessor

func (d *Dispatcher) GetProcessor(name string) (*task, bool)

func (*Dispatcher) RegisterTask

func (d *Dispatcher) RegisterTask(name string, consumeLabels, produceLabels []Label, processor Processor) uint

func (*Dispatcher) Start

func (d *Dispatcher) Start()

func (*Dispatcher) StartProcessors

func (d *Dispatcher) StartProcessors()

type DistinctLogic

type DistinctLogic struct {
	// contains filtered or unexported fields
}

func NewDistinctLogic

func NewDistinctLogic(keys []string) *DistinctLogic

func (*DistinctLogic) Run

func (t *DistinctLogic) Run(payload EventPayload) (EventPayload, error)

type Downloader

type Downloader struct {
	*BaseProcessor
	// contains filtered or unexported fields
}

func NewDownloader

func NewDownloader(args *DownloaderArgs) *Downloader

func (*Downloader) DownloadFromResource

func (p *Downloader) DownloadFromResource(event Event) ([]Event, error)

type DownloaderArgs

type DownloaderArgs struct {
	BaseArgs
	RootDestination string
}

type DownloaderGenerator

type DownloaderGenerator struct{}

func (*DownloaderGenerator) Generate

func (g *DownloaderGenerator) Generate(intfArgs interface{}) (Processor, error)

func (*DownloaderGenerator) GetType

func (g *DownloaderGenerator) GetType() string

type Event

type Event interface {
	GetId() uint64
	SetId(uint64)
	GetSourceId() uint64
	SetSourceId(uint64)
	PushRoute(route string)
	//GetRoutes() []string
	SetRoutes([]string)
	CopyRoutes() []string
	//PopLabel() Label
	//PushLabel(label Label)
	//PushLabels(labels Labels)
	//SetLabels(labels Labels)
	//CopyLabels() Labels
	//GetProduceLabels() Labels
	//PushProduceLabels()
	//GetLatestLabel() Label
	GetLabels() *Labels
	SetLabels(labels *Labels)
	SetPayload(payload EventPayload)
	GetPayload() EventPayload
	Copy() Event
}

type EventPayload

type EventPayload map[string]interface{}

func (EventPayload) GetString

func (e EventPayload) GetString(key string) (string, error)

func (EventPayload) HasEveryPayloadKeys

func (e EventPayload) HasEveryPayloadKeys(keys []string) bool

func (EventPayload) HasKey

func (e EventPayload) HasKey(key string) bool

func (EventPayload) HasSomePayloadKeys

func (e EventPayload) HasSomePayloadKeys(keys []string) bool

type FilterByJSLogic

type FilterByJSLogic struct {
	// contains filtered or unexported fields
}

func NewFilterByJSLogic

func NewFilterByJSLogic(commands []string) *FilterByJSLogic

func (*FilterByJSLogic) Run

func (t *FilterByJSLogic) Run(payload EventPayload) (EventPayload, error)

type Flow

type Flow struct {
	Pipelines []*Pipeline
}

func LoadFlowFromFile

func LoadFlowFromFile(filepath string) *Flow

type FlowSetting

type FlowSetting interface {
	GetProcessorType() string
	GetName() string
	GetArgs() interface{}
}

type Job

type Job struct {
	ProcessorSetting `yaml:",inline"`
	Consume          []Label
	Produce          []Label
}

type Label

type Label string

type Labels

type Labels []Label

func (*Labels) Copy

func (ls *Labels) Copy() *Labels

func (*Labels) GetLatest

func (ls *Labels) GetLatest() Label

func (*Labels) Pop

func (ls *Labels) Pop() Label

func (*Labels) Push

func (ls *Labels) Push(label Label)

func (*Labels) PushAll

func (ls *Labels) PushAll(labels *Labels)

type LogicType

type LogicType string

type Pipeline

type Pipeline struct {
	Name string
	Jobs []*Job
}

func (*Pipeline) GetJob

func (p *Pipeline) GetJob(jobName string) (*Job, bool)

type Processor

type Processor interface {
	GetType() string
	GetName() string
	SetName(name string)
	Enqueue(resource Event)
	SetOutChannel(outChan chan Event)
	Start()
}

type ProcessorFactory

type ProcessorFactory struct {
	// contains filtered or unexported fields
}

func (*ProcessorFactory) AddGenerator

func (pf *ProcessorFactory) AddGenerator(generator ProcessorGenerator)

func (*ProcessorFactory) Create

func (pf *ProcessorFactory) Create(setting FlowSetting) (Processor, error)

type ProcessorGenerator

type ProcessorGenerator interface {
	GetType() string
	Generate(intfArgs interface{}) (Processor, error)
}

type ProcessorSetting

type ProcessorSetting struct {
	ProcessorType string `yaml:"processor"`
	Name          string
	Args          interface{}
}

func (*ProcessorSetting) GetArgs

func (p *ProcessorSetting) GetArgs() interface{}

func (*ProcessorSetting) GetName

func (p *ProcessorSetting) GetName() string

func (*ProcessorSetting) GetProcessorType

func (p *ProcessorSetting) GetProcessorType() string

type SelectPayloadLogic

type SelectPayloadLogic struct {
	// contains filtered or unexported fields
}

func NewSelectPayloadLogic

func NewSelectPayloadLogic(keys []string) *SelectPayloadLogic

func (*SelectPayloadLogic) Run

type TransformByJSLogic

type TransformByJSLogic struct {
	// contains filtered or unexported fields
}

func NewTransformByJSLogic

func NewTransformByJSLogic(commands []string) *TransformByJSLogic

func (*TransformByJSLogic) Run

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL