Documentation ¶
Index ¶
- Constants
- Variables
- type Backoff
- type Executor
- func (e *Executor) CurrentStacktrace() []byte
- func (e *Executor) ForceStop()
- func (e *Executor) MaxWorker() int
- func (e *Executor) MinWorker() int
- func (e *Executor) Release()
- func (e *Executor) ReleaseAndWait()
- func (e *Executor) Running() int32
- func (e *Executor) SubExecutor() *SubExecutor
- func (e *Executor) Submit(fn Job)
- func (e *Executor) TuneMaxWorker(nextMaxWorker int)
- func (e *Executor) TuneMinWorker(nextMinWorker int)
- func (e *Executor) Workers() int32
- type ExecutorOptionFunc
- func ExecutorCollectStacktrace(enable bool) ExecutorOptionFunc
- func ExecutorContext(ctx context.Context) ExecutorOptionFunc
- func ExecutorMaxCapacity(capacity int) ExecutorOptionFunc
- func ExecutorPanicHandler(handler PanicHandler) ExecutorOptionFunc
- func ExecutorReducderInterval(interval time.Duration) ExecutorOptionFunc
- type Job
- type Loop
- func (lo *Loop) Execute()
- func (lo *Loop) ExecuteTimeout(timeout time.Duration)
- func (lo *Loop) SetDefault(h LoopDefaultHandler)
- func (lo *Loop) SetDequeue(h LoopDequeueHandler, q *Queue)
- func (lo *Loop) SetTicker(h LoopTickerHandler, dur time.Duration)
- func (lo *Loop) Stop()
- func (lo *Loop) StopAndWait()
- type LoopDefaultHandler
- type LoopDequeueHandler
- type LoopNext
- type LoopOptionFunc
- type LoopTickerHandler
- type PanicHandler
- type PanicType
- type Parallel
- type ParallelFuture
- type ParallelFutureResult
- type ParallelJob
- type ParallelOptionFunc
- type Pipeline
- type PipelineInputFunc
- type PipelineOptionFunc
- type PipelineOutputFunc
- type Queue
- func (q *Queue) Cap() int
- func (q *Queue) Chan() <-chan interface{}
- func (q *Queue) Close() (closed bool)
- func (q *Queue) Closed() bool
- func (q *Queue) Dequeue() (val interface{}, found bool)
- func (q *Queue) DequeueNB() (val interface{}, found bool)
- func (q *Queue) DequeueRetry(retryInterval time.Duration, retryLimit int) (val interface{}, found bool)
- func (q *Queue) Enqueue(val interface{}) (write bool)
- func (q *Queue) EnqueueNB(val interface{}) (write bool)
- func (q *Queue) EnqueueRetry(val interface{}, retryInterval time.Duration, retryLimit int) (write bool)
- func (q *Queue) Len() int
- type QueueOptionFunc
- type Retry
- type RetryErrorHandler
- type RetryFunc
- type RetryFuture
- type RetryNext
- type RetryOptionFunc
- type SubExecutor
- type ValueError
- type Wait
- type WaitRendezvous
- type WaitReplyFunc
- type WaitRequest
- type WaitRequestReply
- type WaitSequence
- type Worker
- type WorkerAbortQueueHandlerFunc
- type WorkerHandler
- type WorkerHook
- type WorkerOptionFunc
- func WorkerAbortQueueHandler(handler WorkerAbortQueueHandlerFunc) WorkerOptionFunc
- func WorkerAutoShutdown(enable bool) WorkerOptionFunc
- func WorkerCapacity(capacity int) WorkerOptionFunc
- func WorkerContext(ctx context.Context) WorkerOptionFunc
- func WorkerExecutor(executor *Executor) WorkerOptionFunc
- func WorkerMaxDequeueSize(size int) WorkerOptionFunc
- func WorkerPanicHandler(handler PanicHandler) WorkerOptionFunc
- func WorkerPostHook(hook WorkerHook) WorkerOptionFunc
- func WorkerPreHook(hook WorkerHook) WorkerOptionFunc
- func WorkerTimeout(timeout time.Duration) WorkerOptionFunc
Constants ¶
View Source
const (
Version string = "1.0.22"
)
Variables ¶
View Source
var ( ErrRetryNotComplete = errors.New("RetryFuture retry not complete") ErrRetryContextTimeout = errors.New("Retry context timeout or canceled") )
View Source
var (
ErrPipeClosed = errors.New("pipe queue closed")
)
Functions ¶
This section is empty.
Types ¶
type Backoff ¶ added in v1.0.11
type Backoff struct {
// contains filtered or unexported fields
}
func NewBackoff ¶ added in v1.0.11
func NewBackoffNoJitter ¶ added in v1.0.11
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
func NewExecutor ¶
func NewExecutor(minWorker, maxWorker int, funcs ...ExecutorOptionFunc) *Executor
func (*Executor) CurrentStacktrace ¶ added in v1.0.20
func (*Executor) ReleaseAndWait ¶
func (e *Executor) ReleaseAndWait()
func (*Executor) SubExecutor ¶ added in v1.0.1
func (e *Executor) SubExecutor() *SubExecutor
func (*Executor) TuneMaxWorker ¶ added in v1.0.8
func (*Executor) TuneMinWorker ¶ added in v1.0.8
type ExecutorOptionFunc ¶
type ExecutorOptionFunc func(*optExecutor)
func ExecutorCollectStacktrace ¶ added in v1.0.20
func ExecutorCollectStacktrace(enable bool) ExecutorOptionFunc
func ExecutorContext ¶
func ExecutorContext(ctx context.Context) ExecutorOptionFunc
func ExecutorMaxCapacity ¶
func ExecutorMaxCapacity(capacity int) ExecutorOptionFunc
func ExecutorPanicHandler ¶
func ExecutorPanicHandler(handler PanicHandler) ExecutorOptionFunc
func ExecutorReducderInterval ¶
func ExecutorReducderInterval(interval time.Duration) ExecutorOptionFunc
type Loop ¶ added in v1.0.7
type Loop struct {
// contains filtered or unexported fields
}
func NewLoop ¶ added in v1.0.7
func NewLoop(e *Executor, funcs ...LoopOptionFunc) *Loop
func (*Loop) ExecuteTimeout ¶ added in v1.0.7
func (*Loop) SetDefault ¶ added in v1.0.7
func (lo *Loop) SetDefault(h LoopDefaultHandler)
func (*Loop) SetDequeue ¶ added in v1.0.7
func (lo *Loop) SetDequeue(h LoopDequeueHandler, q *Queue)
func (*Loop) SetTicker ¶ added in v1.0.7
func (lo *Loop) SetTicker(h LoopTickerHandler, dur time.Duration)
func (*Loop) StopAndWait ¶ added in v1.0.7
func (lo *Loop) StopAndWait()
type LoopDefaultHandler ¶ added in v1.0.7
type LoopDefaultHandler func() LoopNext
type LoopDequeueHandler ¶ added in v1.0.7
type LoopOptionFunc ¶ added in v1.0.7
type LoopOptionFunc func(*optLoop)
func LoopContext ¶ added in v1.0.7
func LoopContext(ctx context.Context) LoopOptionFunc
type LoopTickerHandler ¶ added in v1.0.7
type LoopTickerHandler func() LoopNext
type PanicHandler ¶
type PanicHandler func(PanicType, interface{})
type Parallel ¶ added in v1.0.10
type Parallel struct {
// contains filtered or unexported fields
}
func NewParallel ¶ added in v1.0.10
func NewParallel(e *Executor, funcs ...ParallelOptionFunc) *Parallel
func (*Parallel) Queue ¶ added in v1.0.10
func (p *Parallel) Queue(fn ParallelJob)
func (*Parallel) Submit ¶ added in v1.0.10
func (p *Parallel) Submit() *ParallelFuture
type ParallelFuture ¶ added in v1.0.10
type ParallelFuture struct {
// contains filtered or unexported fields
}
func (*ParallelFuture) Result ¶ added in v1.0.10
func (f *ParallelFuture) Result() []ValueError
type ParallelFutureResult ¶ added in v1.0.10
type ParallelFutureResult struct {
// contains filtered or unexported fields
}
func (*ParallelFutureResult) Results ¶ added in v1.0.10
func (r *ParallelFutureResult) Results() []ValueError
type ParallelJob ¶ added in v1.0.10
type ParallelJob func() (result interface{}, err error)
type ParallelOptionFunc ¶ added in v1.0.10
type ParallelOptionFunc func(*optParallel)
func ParallelContext ¶ added in v1.0.10
func ParallelContext(ctx context.Context) ParallelOptionFunc
func Parallelism ¶ added in v1.0.10
func Parallelism(p int) ParallelOptionFunc
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
func NewPipeline ¶
func NewPipeline(inFunc PipelineInputFunc, outFunc PipelineOutputFunc, opts ...PipelineOptionFunc) *Pipeline
func (*Pipeline) CloseEnqueue ¶
func (*Pipeline) ShutdownAndWait ¶
func (p *Pipeline) ShutdownAndWait()
type PipelineInputFunc ¶
type PipelineInputFunc func(parameter interface{}) (result interface{}, err error)
type PipelineOptionFunc ¶
type PipelineOptionFunc func(*optPipeline)
func PipelineContext ¶ added in v1.0.13
func PipelineContext(ctx context.Context) PipelineOptionFunc
func PipelineExecutor ¶ added in v1.0.6
func PipelineExecutor(executor *Executor) PipelineOptionFunc
func PipelinePanicHandler ¶
func PipelinePanicHandler(handler PanicHandler) PipelineOptionFunc
type PipelineOutputFunc ¶
type PipelineOutputFunc func(result interface{}, err error)
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
func NewQueue ¶
func NewQueue(c int, funcs ...QueueOptionFunc) *Queue
func (*Queue) DequeueRetry ¶
func (q *Queue) DequeueRetry(retryInterval time.Duration, retryLimit int) (val interface{}, found bool)
retry w/ dequeue until channel can be read(waiting for channel to write)
type QueueOptionFunc ¶
type QueueOptionFunc func(*optQueue)
func QueuePanicHandler ¶
func QueuePanicHandler(handler PanicHandler) QueueOptionFunc
type Retry ¶ added in v1.0.11
type Retry struct {
// contains filtered or unexported fields
}
func NewRetry ¶ added in v1.0.11
func NewRetry(executor *Executor, funcs ...RetryOptionFunc) *Retry
func NewRetryWithBackoff ¶ added in v1.0.11
func NewRetryWithBackoff(executor *Executor, backoff *Backoff, funcs ...RetryOptionFunc) *Retry
func (*Retry) Retry ¶ added in v1.0.11
func (r *Retry) Retry(fn RetryFunc) *RetryFuture
func (*Retry) RetryWithErrorHandler ¶ added in v1.0.11
func (r *Retry) RetryWithErrorHandler(fn RetryFunc, eh RetryErrorHandler) *RetryFuture
type RetryErrorHandler ¶ added in v1.0.11
type RetryFuture ¶ added in v1.0.11
type RetryFuture struct {
// contains filtered or unexported fields
}
func (*RetryFuture) Result ¶ added in v1.0.11
func (f *RetryFuture) Result() ValueError
type RetryOptionFunc ¶ added in v1.0.11
type RetryOptionFunc func(*optRetry)
func RetryBackoffIntervalMax ¶ added in v1.0.11
func RetryBackoffIntervalMax(dur time.Duration) RetryOptionFunc
func RetryBackoffIntervalMin ¶ added in v1.0.11
func RetryBackoffIntervalMin(dur time.Duration) RetryOptionFunc
func RetryBackoffUseJitter ¶ added in v1.0.11
func RetryBackoffUseJitter(useJitter bool) RetryOptionFunc
func RetryContext ¶ added in v1.0.11
func RetryContext(ctx context.Context) RetryOptionFunc
func RetryMax ¶ added in v1.0.11
func RetryMax(max int) RetryOptionFunc
type SubExecutor ¶ added in v1.0.1
type SubExecutor struct {
// contains filtered or unexported fields
}
func (*SubExecutor) Submit ¶ added in v1.0.1
func (s *SubExecutor) Submit(fn Job)
func (*SubExecutor) Wait ¶ added in v1.0.1
func (s *SubExecutor) Wait()
type ValueError ¶ added in v1.0.10
type ValueError interface { Value() interface{} Err() error }
type WaitRendezvous ¶ added in v1.0.16
type WaitRendezvous struct {
// contains filtered or unexported fields
}
func WaitRendez ¶ added in v1.0.16
func WaitRendez(n int) *WaitRendezvous
func WaitRendezTimeout ¶ added in v1.0.16
func WaitRendezTimeout(dur time.Duration, n int) *WaitRendezvous
func (*WaitRendezvous) Cancel ¶ added in v1.0.16
func (r *WaitRendezvous) Cancel()
func (*WaitRendezvous) Wait ¶ added in v1.0.16
func (r *WaitRendezvous) Wait() error
type WaitReplyFunc ¶ added in v1.0.16
type WaitReplyFunc func(interface{}) (interface{}, error)
type WaitRequest ¶ added in v1.0.16
type WaitRequest struct {
// contains filtered or unexported fields
}
func WaitReq ¶ added in v1.0.16
func WaitReq() *WaitRequest
func WaitReqTimeout ¶ added in v1.0.16
func WaitReqTimeout(dur time.Duration) *WaitRequest
func (*WaitRequest) Cancel ¶ added in v1.0.16
func (r *WaitRequest) Cancel()
func (*WaitRequest) Req ¶ added in v1.0.16
func (r *WaitRequest) Req(v interface{}) error
func (*WaitRequest) Wait ¶ added in v1.0.16
func (r *WaitRequest) Wait() (interface{}, error)
type WaitRequestReply ¶ added in v1.0.16
type WaitRequestReply struct {
// contains filtered or unexported fields
}
func WaitReqReply ¶ added in v1.0.16
func WaitReqReply() *WaitRequestReply
func WaitReqReplyTimeout ¶ added in v1.0.16
func WaitReqReplyTimeout(dur time.Duration) *WaitRequestReply
func (*WaitRequestReply) Cancel ¶ added in v1.0.16
func (r *WaitRequestReply) Cancel()
func (*WaitRequestReply) Reply ¶ added in v1.0.16
func (r *WaitRequestReply) Reply(fn WaitReplyFunc) error
func (*WaitRequestReply) Req ¶ added in v1.0.16
func (r *WaitRequestReply) Req(v interface{}) (interface{}, error)
type WaitSequence ¶ added in v1.0.16
type WaitSequence struct {
// contains filtered or unexported fields
}
func WaitSeq ¶ added in v1.0.16
func WaitSeq(wn ...*Wait) *WaitSequence
func WaitSeqTimeout ¶ added in v1.0.16
func WaitSeqTimeout(dur time.Duration, wn ...*Wait) *WaitSequence
func (*WaitSequence) Cancel ¶ added in v1.0.16
func (w *WaitSequence) Cancel()
func (*WaitSequence) Wait ¶ added in v1.0.16
func (w *WaitSequence) Wait() error
type Worker ¶
type Worker interface { Enqueue(interface{}) bool CloseEnqueue() bool Shutdown() ShutdownAndWait() ForceStop() }
func NewBufferWorker ¶
func NewBufferWorker(handler WorkerHandler, funcs ...WorkerOptionFunc) Worker
func NewDefaultWorker ¶
func NewDefaultWorker(handler WorkerHandler, funcs ...WorkerOptionFunc) Worker
type WorkerAbortQueueHandlerFunc ¶ added in v1.0.13
type WorkerAbortQueueHandlerFunc func(paramter interface{})
type WorkerHandler ¶
type WorkerHandler func(parameter interface{})
type WorkerHook ¶
type WorkerHook func()
type WorkerOptionFunc ¶
type WorkerOptionFunc func(*optWorker)
func WorkerAbortQueueHandler ¶ added in v1.0.13
func WorkerAbortQueueHandler(handler WorkerAbortQueueHandlerFunc) WorkerOptionFunc
func WorkerAutoShutdown ¶ added in v1.0.21
func WorkerAutoShutdown(enable bool) WorkerOptionFunc
func WorkerCapacity ¶ added in v1.0.15
func WorkerCapacity(capacity int) WorkerOptionFunc
func WorkerContext ¶
func WorkerContext(ctx context.Context) WorkerOptionFunc
func WorkerExecutor ¶ added in v1.0.4
func WorkerExecutor(executor *Executor) WorkerOptionFunc
func WorkerMaxDequeueSize ¶ added in v1.0.17
func WorkerMaxDequeueSize(size int) WorkerOptionFunc
func WorkerPanicHandler ¶
func WorkerPanicHandler(handler PanicHandler) WorkerOptionFunc
func WorkerPostHook ¶
func WorkerPostHook(hook WorkerHook) WorkerOptionFunc
func WorkerPreHook ¶
func WorkerPreHook(hook WorkerHook) WorkerOptionFunc
func WorkerTimeout ¶ added in v1.0.21
func WorkerTimeout(timeout time.Duration) WorkerOptionFunc
Click to show internal directories.
Click to hide internal directories.