Documentation ¶
Index ¶
- func DelaySecondsLevel(seconds int) int
- type BackoffCalculator
- type ConsumerOptions
- type GenericHandler
- type GenericMiddlewareHandler
- type Message
- type NextMiddlewareFunc
- type Publisher
- func (e *Publisher) Publish(routingKey string, msg interface{}) (*Message, error)
- func (e *Publisher) PublishIn(routingKey string, secondsFromNow int64, msg map[string]interface{}) (*Message, error)
- func (e *Publisher) PublishInMessage(job *Message, secondsFromNow int64) error
- func (e *Publisher) PublishMessage(msg *Message) error
- type Q
- type WorkerPool
- func (wp *WorkerPool) Consumer(name string, fn interface{}) *WorkerPool
- func (wp *WorkerPool) ConsumerWithOptions(routingKey string, consumerOpts ConsumerOptions, fn interface{}) *WorkerPool
- func (wp *WorkerPool) Drain()
- func (wp *WorkerPool) Middleware(fn interface{}) *WorkerPool
- func (wp *WorkerPool) Start()
- func (wp *WorkerPool) Stop()
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DelaySecondsLevel ¶ added in v0.6.0
func DelaySecondsLevel(seconds int) int
DelaySecondsLevel returns the concrete delay level for the given number of seconds. The supported levels are: 5s, 10s, 20s, 30s, 60s, 120s, 180s, 240s, 300s, 360s, 420s, 480s, 540s, 600s, 1200s, 1800s, 2400s, 3600s, 7200s, 10800s.
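The documentation does not state how a requested delay is mapped onto a level. As a minimal sketch, assuming the request is rounded up to the smallest level that covers it and clamped to the largest level, the lookup might look like the following. `roundUpToLevel` and `delayLevels` are hypothetical names, not part of the package, and the real function may return a level index rather than a duration.

```go
package main

import "fmt"

// delayLevels holds the delay levels, in seconds, listed in the documentation.
var delayLevels = []int{
	5, 10, 20, 30, 60, 120, 180, 240, 300, 360,
	420, 480, 540, 600, 1200, 1800, 2400, 3600, 7200, 10800,
}

// roundUpToLevel is a hypothetical helper, not the package's implementation.
// Assumption: a request is rounded up to the smallest level that is at least
// `seconds`, and anything beyond the largest level is clamped to it.
func roundUpToLevel(seconds int) int {
	for _, level := range delayLevels {
		if seconds <= level {
			return level
		}
	}
	return delayLevels[len(delayLevels)-1]
}

func main() {
	for _, s := range []int{3, 7, 61, 5000, 99999} {
		fmt.Printf("requested %ds, level %ds\n", s, roundUpToLevel(s))
	}
}
```

Fixed levels like these are commonly used when delays are implemented with a small set of pre-declared delay queues, which is why an arbitrary delay has to be snapped to one of them.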
Types ¶
type BackoffCalculator ¶ added in v0.6.0
You may provide your own backoff function for retrying failed jobs or use the builtin one. Returns the number of seconds to wait until the next attempt.
The builtin backoff calculator provides an exponentially increasing wait function.
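The exact signature of BackoffCalculator is not shown here, so the following is only a sketch of what a custom exponential backoff could look like. `backoffFunc` is a hypothetical stand-in type that takes the failure count directly, and `cappedExponential` is an illustrative helper; neither is the builtin calculator.

```go
package main

import "fmt"

// backoffFunc is a hypothetical stand-in for BackoffCalculator. The real
// type's signature is not shown in this documentation.
type backoffFunc func(fails int64) int64

// cappedExponential returns a backoff that starts at base seconds, doubles
// with each additional failure, and never exceeds limit seconds.
func cappedExponential(base, limit int64) backoffFunc {
	return func(fails int64) int64 {
		wait := base
		for i := int64(1); i < fails; i++ {
			wait *= 2
			if wait >= limit {
				// Returning early also avoids integer overflow for large fails.
				return limit
			}
		}
		if wait > limit {
			return limit
		}
		return wait
	}
}

func main() {
	backoff := cappedExponential(5, 300)
	for fails := int64(1); fails <= 8; fails++ {
		fmt.Printf("fails=%d wait=%ds\n", fails, backoff(fails))
	}
}
```

Capping the wait keeps a job that fails repeatedly from being pushed arbitrarily far into the future.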
type ConsumerOptions ¶ added in v0.6.0
type ConsumerOptions struct {
	QueueName      string            // Name of a specific queue to consume messages from, if any
	Prefetch       int               // Prefetch count for the queue
	Priority       uint              // Priority from 1 to 10000
	MaxFails       uint              // 1: send straight to dead (unless SkipDead)
	SkipDead       bool              // If true, don't send failed jobs to the dead queue when retries are exhausted.
	MaxConcurrency uint              // Max number of jobs to keep in flight (default is 0, meaning no max)
	Backoff        BackoffCalculator // If not set, uses the default backoff algorithm
}
ConsumerOptions can be passed to ConsumerWithOptions.
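The comments on MaxFails and SkipDead imply a three-way outcome for a failed job: retry it, move it to the dead queue, or discard it. The sketch below spells out that decision under the assumption that a job is retried while its failure count is below MaxFails. `retryPolicy` and `afterFailure` are hypothetical names, and the sketch does not model any default the package may apply when MaxFails is left at zero.

```go
package main

import "fmt"

// retryPolicy copies only the two ConsumerOptions fields this sketch needs.
type retryPolicy struct {
	MaxFails uint
	SkipDead bool
}

// afterFailure is a hypothetical helper, not the package's code. It reports
// what would happen to a job that has now failed `fails` times:
// "retry", "dead" (moved to the dead queue), or "drop" (discarded).
// Assumption: retries continue while fails < MaxFails.
func afterFailure(p retryPolicy, fails uint) string {
	if fails < p.MaxFails {
		return "retry"
	}
	if p.SkipDead {
		return "drop"
	}
	return "dead"
}

func main() {
	fmt.Println(afterFailure(retryPolicy{MaxFails: 1}, 1))
	fmt.Println(afterFailure(retryPolicy{MaxFails: 1, SkipDead: true}, 1))
	fmt.Println(afterFailure(retryPolicy{MaxFails: 3}, 2))
}
```

With MaxFails set to 1, the first failure already exhausts the retries, which matches the "send straight to dead (unless SkipDead)" comment.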
type GenericHandler ¶
GenericHandler is a job handler without any custom context.
type GenericMiddlewareHandler ¶
type GenericMiddlewareHandler func(*Message, NextMiddlewareFunc) error
GenericMiddlewareHandler is a middleware without any custom context.
type Message ¶ added in v0.6.0
type Message struct {
	*amqp.Delivery `json:"-"`

	// Inputs when making a new job
	Name string `json:"-"`
	// contains filtered or unexported fields
}
Message represents a job.
type NextMiddlewareFunc ¶
type NextMiddlewareFunc func() error
NextMiddlewareFunc is a function type (whose instances are named 'next') that you call to advance to the next middleware.
type Publisher ¶ added in v0.6.0
type Publisher struct {
	Namespace string // eg, "myapp-work"
	// contains filtered or unexported fields
}
Publisher can enqueue jobs.
func NewPublisher ¶ added in v0.6.0
func NewPublisherWithExchange ¶ added in v0.6.0
NewPublisher creates a new enqueuer with the specified Redis namespace and Redis pool.
func (*Publisher) Publish ¶ added in v0.6.0
func (e *Publisher) Publish(routingKey string, msg interface{}) (*Message, error)
Publish enqueues a job with the specified name and arguments. The msg parameter can be nil if no arguments are needed. Example: e.Publish("send_email", work.Q{"addr": "test@example.com"})
func (*Publisher) PublishIn ¶ added in v0.6.0
func (e *Publisher) PublishIn(routingKey string, secondsFromNow int64, msg map[string]interface{}) (*Message, error)
PublishIn enqueues a job in the scheduled job queue for execution in secondsFromNow seconds.
func (*Publisher) PublishInMessage ¶ added in v0.6.0
func (e *Publisher) PublishInMessage(job *Message, secondsFromNow int64) error
PublishInMessage enqueues a delayed job.
func (*Publisher) PublishMessage ¶ added in v0.6.0
func (e *Publisher) PublishMessage(msg *Message) error
PublishMessage enqueues a job.
type Q ¶
type Q map[string]interface{}
Q is a shortcut to easily specify arguments for jobs when enqueueing them. Example: e.Publish("send_email", work.Q{"addr": "test@example.com", "track": true})
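Because Q is a plain map, it can be built with a literal and handled like any other map. The sketch below redeclares Q locally and encodes it as JSON to show what the arguments would look like on the wire. That the package serializes arguments as JSON is an assumption, and `encodeArgs` is a hypothetical helper, not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Q mirrors the declaration shown above so the example is self-contained.
type Q map[string]interface{}

// encodeArgs is a hypothetical helper that renders job arguments as JSON.
// Assumption: the package sends arguments in a JSON-encoded message body.
func encodeArgs(args Q) (string, error) {
	b, err := json.Marshal(args)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	body, err := encodeArgs(Q{"addr": "test@example.com", "track": true})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(body)
}
```

Values stored in a Q must be encodable by whatever serializer is in use; channels and functions, for example, cannot be marshalled by encoding/json.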
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool represents a pool of workers and forms the primary public API of gocraft/work. You can attach jobs and middleware to a pool, and you can start and stop it. Based on its concurrency setting, it spins up N worker goroutines.
func NewWorkerPool ¶
func NewWorkerPoolWithExchangeName ¶ added in v0.6.0
func NewWorkerPoolWithExchangeName(ctx interface{}, concurrency uint, namespace, exchangeName string, puber *Publisher, opts ...cony.ClientOpt) *WorkerPool
NewWorkerPool creates a new worker pool. ctx should be a struct literal whose type will be used for middleware and handlers. concurrency specifies how many workers to spin up; each worker can process jobs concurrently. The cli and the enqueuer are expected to use two separate connections, to avoid contention under concurrency.
func (*WorkerPool) Consumer ¶ added in v0.6.0
func (wp *WorkerPool) Consumer(name string, fn interface{}) *WorkerPool
Consumer registers the job name to the specified handler fn. For instance, when workers pull jobs from the name queue, they'll be processed by the specified handler function. fn can take one of these forms:
- (*ContextType).func(*Message) error (ContextType matches the type of ctx specified when creating a pool)
- func(*Message) error, for the generic handler format
The routing key is used as the key for the consumerType.
func (*WorkerPool) ConsumerWithOptions ¶ added in v0.6.0
func (wp *WorkerPool) ConsumerWithOptions(routingKey string, consumerOpts ConsumerOptions, fn interface{}) *WorkerPool
ConsumerWithOptions adds a handler for 'name' jobs as per the Consumer function, but permits you to specify additional options such as a job's priority, retry count, and whether to send dead jobs to the dead job queue or trash them. name is case-insensitive. TODO: allow Consumers to be added dynamically, so that new consumers can be registered at runtime.
func (*WorkerPool) Drain ¶
func (wp *WorkerPool) Drain()
Drain drains all jobs in the queue before returning. Note that if jobs are added faster than they can be processed, this function will not return. TODO: this method is not yet correct and needs to be adjusted.
func (*WorkerPool) Middleware ¶
func (wp *WorkerPool) Middleware(fn interface{}) *WorkerPool
Middleware appends the specified function to the middleware chain. The fn can take one of these forms:
- (*ContextType).func(*Message, NextMiddlewareFunc) error (ContextType matches the type of ctx specified when creating a pool)
- func(*Message, NextMiddlewareFunc) error, for the generic middleware format
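To make the role of next concrete, here is a self-contained sketch of how a chain in the generic middleware format can be executed. It redeclares minimal stand-ins for Message, NextMiddlewareFunc, and GenericMiddlewareHandler, and `runChain`, `logging`, and `trace` are hypothetical names; the package's own dispatch code is not shown in this documentation and may differ.

```go
package main

import "fmt"

// Message is a minimal local stand-in for the package's Message type.
type Message struct{ Name string }

// These two types mirror the declarations shown in this documentation.
type NextMiddlewareFunc func() error
type GenericMiddlewareHandler func(*Message, NextMiddlewareFunc) error

// runChain is a hypothetical helper: it invokes the middlewares in order and
// runs handler only after every middleware has called next.
func runChain(msg *Message, mws []GenericMiddlewareHandler, handler func(*Message) error) error {
	var call func(i int) error
	call = func(i int) error {
		if i == len(mws) {
			return handler(msg)
		}
		return mws[i](msg, func() error { return call(i + 1) })
	}
	return call(0)
}

// trace records events so the order of execution is visible.
var trace []string

// logging is a middleware in the generic format. It wraps the rest of the
// chain and passes the downstream error back up unchanged.
func logging(msg *Message, next NextMiddlewareFunc) error {
	trace = append(trace, "before "+msg.Name)
	err := next()
	trace = append(trace, "after "+msg.Name)
	return err
}

func main() {
	handler := func(m *Message) error {
		trace = append(trace, "handle "+m.Name)
		return nil
	}
	msg := &Message{Name: "send_email"}
	if err := runChain(msg, []GenericMiddlewareHandler{logging}, handler); err != nil {
		fmt.Println("error:", err)
	}
	fmt.Println(trace)
}
```

A middleware that returns without calling next short-circuits the chain, so the handler never runs. That is a convenient place for checks such as validation or rate limiting.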
func (*WorkerPool) Start ¶
func (wp *WorkerPool) Start()
Start starts the workers and associated processes.
func (*WorkerPool) Stop ¶
func (wp *WorkerPool) Stop()
Stop stops the workers and associated processes.