Documentation ¶
Index ¶
- Constants
- Variables
- type BucketRateLimiter
- type DelayingInterface
- type DelayingQueue
- type DelayingQueueConf
- type DelayingQueueWithPool
- type DelayingQueueWorkerFunc
- type ExecFuncs
- type FeatureOpts
- type Interface
- type ItemExponentialFailureRateLimiter
- type ItemFastSlowRateLimiter
- type MaxOfRateLimiter
- type Queue
- type QueueConf
- type QueueWithPool
- type QueueWorkerFunc
- type RateLimiter
- func DefaultBucketRateLimiter() RateLimiter
- func DefaultControllerRateLimiter() RateLimiter
- func DefaultItemBasedRateLimiter() RateLimiter
- func NewBucketRateLimiter(limit, burst uint32) RateLimiter
- func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter
- func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter
- func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter
- func NewWithMaxWaitRateLimiter(limiter RateLimiter, maxDelay time.Duration) RateLimiter
- type RateLimitingInterface
- type RateLimitingQueue
- type RateLimitingQueueConf
- type RateLimitingQueueWithPool
- type RateLimitingQueueWorkerFunc
- type WithMaxWaitRateLimiter
Constants ¶
View Source
const MaxMessageRetryCount = math.MaxInt64 - 1
Variables ¶
View Source
var (
	QueueIsClosedError       = errors.New("queue is closed")
	QueueIsFullError         = errors.New("queue is full")
	SubmitMessageIsNullError = errors.New("submit message is null")
	WorkFuncIsNullError      = errors.New("workFunc is null")
)
View Source
var (
NeedRetrySubmitMessageCountError = errors.New("need retry submit message count")
)
View Source
var (
NeedRetrySubmitMessageDelayError = errors.New("need retry submit message delay")
)
View Source
var (
NeedRetrySubmitMessageError = errors.New("need retry submit message immediately")
)
Functions ¶
This section is empty.
Types ¶
type BucketRateLimiter ¶
type BucketRateLimiter = iwq.BucketRateLimiter
type DelayingInterface ¶
type DelayingInterface = iwq.DelayingInterface
type DelayingQueue ¶
type DelayingQueue struct {
// contains filtered or unexported fields
}
func NewDelayingQueue ¶
func NewDelayingQueue(queueConf *DelayingQueueConf, featureOpts *FeatureOpts, workFunc DelayingQueueWorkerFunc) (*DelayingQueue, error)
func (*DelayingQueue) Start ¶
func (q *DelayingQueue) Start()
func (*DelayingQueue) Submit ¶
func (q *DelayingQueue) Submit(value interface{}) error
func (*DelayingQueue) SubmitAfter ¶
func (q *DelayingQueue) SubmitAfter(value interface{}, delay time.Duration) error
type DelayingQueueConf ¶
type DelayingQueueConf struct {
	QueueCap   uint32             `json:"queueCap,omitempty" yaml:"queueCap,omitempty"`     // 队列容量
	WorkersNum uint16             `json:"workersNum,omitempty" yaml:"workersNum,omitempty"` // 工作协程数量
	Logger     *zap.SugaredLogger `json:"-" yaml:"-"`
}
func NewDefaultDelayingQueueConfig ¶
func NewDefaultDelayingQueueConfig() *DelayingQueueConf
type DelayingQueueWithPool ¶
type DelayingQueueWithPool struct {
	DelayingQueue
	// contains filtered or unexported fields
}
func NewDelayingQueueWithPool ¶
func NewDelayingQueueWithPool(queueConf *DelayingQueueConf, featureOpts *FeatureOpts, workFunc DelayingQueueWorkerFunc) (*DelayingQueueWithPool, error)
func (*DelayingQueueWithPool) Start ¶
func (q *DelayingQueueWithPool) Start()
func (*DelayingQueueWithPool) Stop ¶
func (q *DelayingQueueWithPool) Stop()
type DelayingQueueWorkerFunc ¶
type FeatureOpts ¶
type FeatureOpts struct {
EnableMetrics bool `json:"enableMetrics,omitempty" yaml:"enableMetrics,omitempty"` // 开启 metrics 记录
}
func NewDefaultFeatureOpts ¶
func NewDefaultFeatureOpts() *FeatureOpts
type ItemExponentialFailureRateLimiter ¶
type ItemExponentialFailureRateLimiter = iwq.ItemExponentialFailureRateLimiter
type ItemFastSlowRateLimiter ¶
type ItemFastSlowRateLimiter = iwq.ItemFastSlowRateLimiter
type MaxOfRateLimiter ¶
type MaxOfRateLimiter = iwq.MaxOfRateLimiter
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
func NewQueue ¶
func NewQueue(queueConf *QueueConf, featureOpts *FeatureOpts, workFunc QueueWorkerFunc) (*Queue, error)
type QueueConf ¶
type QueueConf struct {
	QueueCap   uint32             `json:"queueCap,omitempty" yaml:"queueCap,omitempty"`     // 队列容量
	WorkersNum uint16             `json:"workersNum,omitempty" yaml:"workersNum,omitempty"` // 工作协程数量
	Logger     *zap.SugaredLogger `json:"-" yaml:"-"`
}
func NewDefaultQueueConfig ¶
func NewDefaultQueueConfig() *QueueConf
type QueueWithPool ¶
type QueueWithPool struct {
	Queue
	// contains filtered or unexported fields
}
func NewQueueWithPool ¶
func NewQueueWithPool(queueConf *QueueConf, featureOpts *FeatureOpts, workFunc QueueWorkerFunc) (*QueueWithPool, error)
func (*QueueWithPool) Start ¶
func (q *QueueWithPool) Start()
func (*QueueWithPool) Stop ¶
func (q *QueueWithPool) Stop()
type QueueWorkerFunc ¶
type QueueWorkerFunc func(interface{}) error
type RateLimiter ¶
type RateLimiter = iwq.RateLimiter
ratelimiter 结构体
func DefaultBucketRateLimiter ¶
func DefaultBucketRateLimiter() RateLimiter
func DefaultControllerRateLimiter ¶
func DefaultControllerRateLimiter() RateLimiter
func DefaultItemBasedRateLimiter ¶
func DefaultItemBasedRateLimiter() RateLimiter
func NewBucketRateLimiter ¶
func NewBucketRateLimiter(limit, burst uint32) RateLimiter
func NewItemExponentialFailureRateLimiter ¶
func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter
func NewItemFastSlowRateLimiter ¶
func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter
func NewMaxOfRateLimiter ¶
func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter
func NewWithMaxWaitRateLimiter ¶
func NewWithMaxWaitRateLimiter(limiter RateLimiter, maxDelay time.Duration) RateLimiter
type RateLimitingInterface ¶
type RateLimitingInterface = iwq.RateLimitingInterface
type RateLimitingQueue ¶
type RateLimitingQueue struct {
// contains filtered or unexported fields
}
func NewRateLimitingQueue ¶
func NewRateLimitingQueue(
	queueConf *RateLimitingQueueConf,
	featureOpts *FeatureOpts,
	rl RateLimiter,
	workFunc RateLimitingQueueWorkerFunc,
) (*RateLimitingQueue, error)
func (*RateLimitingQueue) Start ¶
func (q *RateLimitingQueue) Start()
func (*RateLimitingQueue) Submit ¶
func (q *RateLimitingQueue) Submit(value interface{}) error
func (*RateLimitingQueue) SubmitAfter ¶
func (q *RateLimitingQueue) SubmitAfter(value interface{}, delay time.Duration) error
func (*RateLimitingQueue) SubmitRateLimited ¶
func (q *RateLimitingQueue) SubmitRateLimited(value interface{}) error
type RateLimitingQueueConf ¶
type RateLimitingQueueConf struct {
	QueueCap   uint32             `json:"queueCap,omitempty" yaml:"queueCap,omitempty"`     // 队列容量
	WorkersNum uint16             `json:"workersNum,omitempty" yaml:"workersNum,omitempty"` // 工作协程数量
	Logger     *zap.SugaredLogger `json:"-" yaml:"-"`
}
func NewDefaultRateLimitingQueueConfig ¶
func NewDefaultRateLimitingQueueConfig() *RateLimitingQueueConf
type RateLimitingQueueWithPool ¶
type RateLimitingQueueWithPool struct {
	RateLimitingQueue
	// contains filtered or unexported fields
}
func NewRateLimitingQueueWithPool ¶
func NewRateLimitingQueueWithPool(
	queueConf *RateLimitingQueueConf,
	featureOpts *FeatureOpts,
	rl RateLimiter,
	workFunc RateLimitingQueueWorkerFunc,
) (*RateLimitingQueueWithPool, error)
func (*RateLimitingQueueWithPool) Start ¶
func (q *RateLimitingQueueWithPool) Start()
func (*RateLimitingQueueWithPool) Stop ¶
func (q *RateLimitingQueueWithPool) Stop()
type WithMaxWaitRateLimiter ¶
type WithMaxWaitRateLimiter = iwq.WithMaxWaitRateLimiter
Click to show internal directories.
Click to hide internal directories.