Documentation ¶
Index ¶
- func NewConsoleLogger() *consoleLogger
- func RegisterEventName(eventID EventID, name string)
- func ResumeWhenHasError() func(p *ChannelStream)
- func RunInParallel(ctx context.Context, inputs []interface{}, ...) ([]interface{}, error)
- func SetWorkers(workers int) func(p *ChannelStream)
- func StopWhenHasError() func(p *ChannelStream)
- type Actor
- type Aggregator
- type AggregatorOption
- type BatchProcessFunc
- type ChannelStream
- func (p *ChannelStream) Cancel()
- func (p *ChannelStream) Drain() (bool, []error)
- func (p *ChannelStream) Harvest(harvestFunc HarvestFunc) (bool, []error)
- func (p *ChannelStream) Pipe(dataPipeFunc PipeFunc, optionFuncs ...SetStreamOptionFunc) *ChannelStream
- func (p *ChannelStream) Race(raceFunc RaceFunc)
- type ErrorHandler
- type ErrorHandlerFunc
- type Event
- type EventBus
- type EventHandler
- type EventID
- type EventJob
- type HarvestFunc
- type Item
- type JobStatus
- type Logger
- type ParallelRunner
- type PipeFunc
- type Promise
- type RaceFunc
- type SeedFunc
- type SetActorOptionFunc
- type SetAggregatorOptionFunc
- type SetParallelRunner
- type SetStreamOptionFunc
- type SuccessHandler
- type WorkFunc
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewConsoleLogger ¶
func NewConsoleLogger() *consoleLogger
func RegisterEventName ¶ added in v0.3.0
RegisterEventName registers event name
func ResumeWhenHasError ¶
func ResumeWhenHasError() func(p *ChannelStream)
An option means resume the stream when has error
func RunInParallel ¶ added in v0.2.0
func RunInParallel(ctx context.Context, inputs []interface{}, worker func(context.Context, interface{}) (interface{}, error), workers int) ([]interface{}, error)
RunInParallel is the short cut for ParallelRunner's Run
func SetWorkers ¶
func SetWorkers(workers int) func(p *ChannelStream)
An option to set the count of go routines in the stream
func StopWhenHasError ¶
func StopWhenHasError() func(p *ChannelStream)
An option means stop the stream when has error
Types ¶
type Actor ¶ added in v0.2.0
type Actor struct {
// contains filtered or unexported fields
}
Represents the actor
func NewActor ¶ added in v0.2.0
func NewActor(setActorOptionFuncs ...SetActorOptionFunc) *Actor
Creates a new actor
type Aggregator ¶
type Aggregator struct {
// contains filtered or unexported fields
}
Represents the aggregator
func NewAggregator ¶
func NewAggregator(batchProcessor BatchProcessFunc, optionFuncs ...SetAggregatorOptionFunc) *Aggregator
Creates a new aggregator
func (*Aggregator) Enqueue ¶
func (agt *Aggregator) Enqueue(item interface{})
Enqueue an item, will be blocked if the queue is full
func (*Aggregator) SafeStop ¶
func (agt *Aggregator) SafeStop()
Stop the aggregator safely, the difference with Stop is it guarantees no item is missed during stop
func (*Aggregator) TryEnqueue ¶
func (agt *Aggregator) TryEnqueue(item interface{}) bool
Try enqueue an item, and it is non-blocked
type AggregatorOption ¶
type AggregatorOption struct { BatchSize int Workers int ChannelBufferSize int LingerTime time.Duration ErrorHandler ErrorHandlerFunc Logger Logger }
Represents the aggregator option
type BatchProcessFunc ¶
type BatchProcessFunc func([]interface{}) error
the func to batch process items
type ChannelStream ¶
type ChannelStream struct {
// contains filtered or unexported fields
}
Represent the channel stream
func NewChannelStream ¶
func NewChannelStream(seedFunc SeedFunc, optionFuncs ...SetStreamOptionFunc) *ChannelStream
Create a new channel stream
func (*ChannelStream) Drain ¶
func (p *ChannelStream) Drain() (bool, []error)
Drain the output of current stream to make sure all the items got processed
func (*ChannelStream) Harvest ¶
func (p *ChannelStream) Harvest(harvestFunc HarvestFunc) (bool, []error)
Harvest the output of current stream
func (*ChannelStream) Pipe ¶
func (p *ChannelStream) Pipe(dataPipeFunc PipeFunc, optionFuncs ...SetStreamOptionFunc) *ChannelStream
Pipe current steam output as another stream's input
func (*ChannelStream) Race ¶
func (p *ChannelStream) Race(raceFunc RaceFunc)
Set race condition of current stream's output
type ErrorHandler ¶ added in v0.2.0
type ErrorHandler func(error) interface{}
Represent the error handler
type ErrorHandlerFunc ¶
type ErrorHandlerFunc func(err error, items []interface{}, batchProcessFunc BatchProcessFunc, aggregator *Aggregator)
the func to handle error
type EventBus ¶ added in v0.3.0
type EventBus struct {
// contains filtered or unexported fields
}
EventBus ...
func NewEventBus ¶ added in v0.3.0
func NewEventBus( logger Logger, chanBuffer, eventWorkers, autoRetryTimes int, retryInterval, timeout time.Duration, ) *EventBus
NewEventBus ...
func (*EventBus) Subscribe ¶ added in v0.3.0
func (eb *EventBus) Subscribe(eventID EventID, handlers ...EventHandler)
Subscribe ...
func (*EventBus) Unsubscribe ¶ added in v0.3.0
func (eb *EventBus) Unsubscribe(eventID EventID, handlers ...EventHandler)
Unsubscribe ...
type EventHandler ¶ added in v0.3.0
type EventHandler interface { OnEvent(ctx context.Context, event Event) error Logger() Logger CanAutoRetry(err error) bool }
EventHandler ...
type EventJob ¶ added in v0.3.0
type EventJob struct {
// contains filtered or unexported fields
}
EventJob ...
type ParallelRunner ¶ added in v0.2.1
type ParallelRunner struct {
// contains filtered or unexported fields
}
ParallelRunner represents the runner configurations
func NewParallelRunner ¶ added in v0.2.1
func NewParallelRunner(workers int, configurations ...SetParallelRunner) *ParallelRunner
NewParallelRunner creates a new ParallelRunner
type Promise ¶ added in v0.2.0
type Promise struct {
// contains filtered or unexported fields
}
Represent the promise
func (*Promise) Then ¶ added in v0.2.0
func (p *Promise) Then(successHandler SuccessHandler, errorHandler ErrorHandler) *Promise
Chain the promise with success handler and error handler
func (*Promise) ThenError ¶ added in v0.2.0
func (p *Promise) ThenError(errorHandler ErrorHandler) *Promise
Chain the promise with error handler
func (*Promise) ThenSuccess ¶ added in v0.2.0
func (p *Promise) ThenSuccess(successHandler SuccessHandler) *Promise
Chain the promise with success handler
type SeedFunc ¶
type SeedFunc func(seedChan chan<- Item, quitChannel chan struct{})
The func to generate the seed in NewChannelStream
type SetActorOptionFunc ¶ added in v0.2.0
type SetActorOptionFunc func(actor *Actor)
Represents the func to set actor option
func SetActorBuffer ¶ added in v0.2.0
func SetActorBuffer(buffer int) SetActorOptionFunc
Set actor buffer
type SetAggregatorOptionFunc ¶ added in v0.2.0
type SetAggregatorOptionFunc func(option AggregatorOption) AggregatorOption
the func to set option for aggregator
type SetParallelRunner ¶ added in v0.2.1
type SetParallelRunner func(runner *ParallelRunner)
SetParallelRunner represents the runner configuration setter
func QuitOnError ¶ added in v0.2.1
func QuitOnError() SetParallelRunner
QuitOnError set runner quit on error
func QuitWhenAllFinished ¶ added in v0.2.1
func QuitWhenAllFinished() SetParallelRunner
QuitWhenAllFinished sets runner quit when all request are finished
type SetStreamOptionFunc ¶ added in v0.2.0
type SetStreamOptionFunc func(cs *ChannelStream)
The func to set option in NewChannelStream/Pipe
type SuccessHandler ¶ added in v0.2.0
type SuccessHandler func(interface{}) (interface{}, error)
Represent the success handler