Documentation ¶
Index ¶
- Constants
- Variables
- func ErrRescheduleJobAt(t time.Time, reason string) error
- func ErrRescheduleJobIn(d time.Duration, reason string) error
- func GetWorkerIdx(ctx context.Context) int
- func RandomStringID() string
- type Backoff
- type Client
- func (c *Client) Enqueue(ctx context.Context, j *Job) error
- func (c *Client) EnqueueBatch(ctx context.Context, jobs []*Job) error
- func (c *Client) EnqueueBatchTx(ctx context.Context, jobs []*Job, tx pgx.Tx) error
- func (c *Client) EnqueueTx(ctx context.Context, j *Job, tx pgx.Tx) error
- func (c *Client) LockNextScheduledJob(ctx context.Context, limits []QueueLimit) (jobs []*Job, err error)
- func (c *Client) RestoreStuck(ctx context.Context, runAfter time.Duration, queue ...QueueLimit) (err error)
- type ErrJobReschedule
- type HookFunc
- type Job
- type PollStrategy
- type QueueLimit
- type WorkFunc
- type WorkMap
- type WorkerPool
- type WorkerPoolOption
- func WithBackoff(backoff Backoff) WorkerPoolOption
- func WithLogger(l *zap.Logger) WorkerPoolOption
- func WithPoolGracefulShutdown(handlerCtx func() context.Context) WorkerPoolOption
- func WithPoolHooksJobDone(hooks ...HookFunc) WorkerPoolOption
- func WithPoolHooksUnknownJobType(hooks ...HookFunc) WorkerPoolOption
- func WithPoolID(id string) WorkerPoolOption
- func WithPoolInterval(d time.Duration) WorkerPoolOption
- func WithPoolPanicStackBufSize(size int) WorkerPoolOption
- func WithPoolQueueRestore(restoreAfter, interval time.Duration) WorkerPoolOption
- func WithWorkerPanicStackBufSize(size int) WorkerPoolOption
- func WithWorkerPanicWorkerMap(workMap WorkMap) WorkerPoolOption
- func WithWorkerPoolHandler(jobType string, h WorkFunc) WorkerPoolOption
- func WithWorkerPoolQueue(queue ...QueueLimit) WorkerPoolOption
Constants ¶
const (
// WorkerIdxUnknown is returned when the worker index in the pool is not set for some reason.
WorkerIdxUnknown = -1
)
Variables ¶
var (
	// DefaultExponentialBackoff is the exponential Backoff implementation with default config applied
	DefaultExponentialBackoff = NewExponentialBackoff(exp.Config{
		BaseDelay:  1.0 * time.Second,
		Multiplier: 1.6,
		Jitter:     0.2,
		MaxDelay:   1.0 * time.Hour,
	})

	// BackoffNever is the Backoff implementation that never returns errored job to the queue for retry,
	// but discards it in case of the error.
	BackoffNever = func(retries int) time.Duration { return -1 }
)
var (
	Meter        = otel.Meter("guex")
	EnqueueMeter metric.Int64Counter
)
var ErrDiscard = errors.New("error discard")
var ErrMissingType = errors.New("job type must be specified")
ErrMissingType is returned when you attempt to enqueue a job with no Type specified.
Functions ¶
func ErrRescheduleJobAt ¶
ErrRescheduleJobAt spawns an error that reschedules a job to run at some predefined time.
func ErrRescheduleJobIn ¶
ErrRescheduleJobIn spawns an error that reschedules a job to run after some predefined duration.
func GetWorkerIdx ¶
GetWorkerIdx gets the index of the worker in the pool from the worker context. Returns WorkerIdxUnknown if the context is not set or the value is not found there.
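The lookup described above can be sketched with the standard `context` package. This is only an illustration of the pattern, not the library's implementation: `ctxKey`, `newWorkerCtx`, and `workerIdx` are hypothetical names, and the real context key is unexported.

```go
package main

import (
	"context"
	"fmt"
)

// workerIdxUnknown mirrors the documented WorkerIdxUnknown constant.
const workerIdxUnknown = -1

// ctxKey is a hypothetical unexported key type used only in this sketch.
type ctxKey struct{}

// newWorkerCtx is a hypothetical helper that returns a context tagged with idx.
func newWorkerCtx(idx int) context.Context {
	return context.WithValue(context.Background(), ctxKey{}, idx)
}

// workerIdx returns the index stored in ctx, or workerIdxUnknown when ctx is
// nil or carries no index.
func workerIdx(ctx context.Context) int {
	if ctx == nil {
		return workerIdxUnknown
	}
	if idx, ok := ctx.Value(ctxKey{}).(int); ok {
		return idx
	}
	return workerIdxUnknown
}

func main() {
	fmt.Println(workerIdx(context.Background())) // no index stored in this context
	fmt.Println(workerIdx(newWorkerCtx(3)))      // index stored by the helper
}
```

In a real handler, the equivalent call would presumably be `GetWorkerIdx(ctx)` on the context passed to the WorkFunc.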
func RandomStringID ¶
func RandomStringID() string
RandomStringID returns a random alphanumeric string that can be used as an ID.
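One way to produce such an ID is sketched below, assuming a cryptographically random source and a caller-chosen length. The alphabet, the length parameter, and the `randomID` name are assumptions; the documentation does not state how RandomStringID generates its result or how long it is.

```go
package main

import (
	"crypto/rand"
	"fmt"
	"math/big"
)

// alphabet is the set of characters the sketch draws from (an assumption).
const alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

// randomID is a hypothetical helper that returns n random alphanumeric characters.
func randomID(n int) (string, error) {
	limit := big.NewInt(int64(len(alphabet)))
	out := make([]byte, n)
	for i := range out {
		// rand.Int returns a uniform value in [0, limit).
		k, err := rand.Int(rand.Reader, limit)
		if err != nil {
			return "", err
		}
		out[i] = alphabet[k.Int64()]
	}
	return string(out), nil
}

func main() {
	id, err := randomID(16)
	if err != nil {
		panic(err)
	}
	fmt.Println(id)
}
```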
Types ¶
type Backoff ¶
Backoff is the interface for the backoff implementation that is used to reschedule errored jobs to a later time. If the Backoff implementation returns a negative duration, the job is discarded.
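The retry-or-discard rule can be sketched as follows. The `backoff` type here copies the function shape of the BackoffNever value shown under Variables; `constantBackoff`, `neverBackoff`, and `decide` are hypothetical names introduced for illustration and are not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// backoff mirrors the function shape of the documented BackoffNever value.
type backoff func(retries int) time.Duration

// constantBackoff always returns d, regardless of the retry count.
func constantBackoff(d time.Duration) backoff {
	return func(int) time.Duration { return d }
}

// neverBackoff returns a negative duration, which signals "discard the job".
func neverBackoff(int) time.Duration { return -1 }

// decide is a hypothetical helper that turns a backoff result into a decision:
// retry after the returned delay, or discard when the delay is negative.
func decide(b backoff, retries int) (delay time.Duration, retry bool) {
	d := b(retries)
	if d < 0 {
		return 0, false
	}
	return d, true
}

func main() {
	for _, b := range []backoff{constantBackoff(5 * time.Second), neverBackoff} {
		delay, retry := decide(b, 3)
		fmt.Println(delay, retry)
	}
}
```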
func NewConstantBackoff ¶
NewConstantBackoff instantiates a new backoff implementation with the constant retry duration that does not depend on the retry count.
func NewExponentialBackoff ¶
NewExponentialBackoff instantiates a new exponential Backoff implementation with the given config.
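To give a feel for how the default values (BaseDelay 1s, Multiplier 1.6, MaxDelay 1h) grow, here is a minimal sketch of a capped exponential delay. The formula `BaseDelay * Multiplier^retries` is an assumption about how such a config is commonly interpreted, not a statement of what `exp.Config` does, and jitter is left out so the output is deterministic. `expConfig` and `exponentialBackoff` are hypothetical names.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// expConfig is a hypothetical stand-in for the fields shown in exp.Config.
type expConfig struct {
	BaseDelay  time.Duration
	Multiplier float64
	MaxDelay   time.Duration
}

// defaultConfig copies the documented default values (jitter omitted).
var defaultConfig = expConfig{BaseDelay: time.Second, Multiplier: 1.6, MaxDelay: time.Hour}

// exponentialBackoff returns BaseDelay * Multiplier^retries, capped at MaxDelay.
func exponentialBackoff(cfg expConfig) func(retries int) time.Duration {
	return func(retries int) time.Duration {
		d := float64(cfg.BaseDelay) * math.Pow(cfg.Multiplier, float64(retries))
		if d > float64(cfg.MaxDelay) {
			return cfg.MaxDelay
		}
		return time.Duration(d)
	}
}

func main() {
	b := exponentialBackoff(defaultConfig)
	for _, retries := range []int{0, 1, 5, 10, 30} {
		fmt.Println(retries, b(retries))
	}
}
```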
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a Gue client that can add jobs to the queue and remove jobs from the queue.
func (*Client) EnqueueBatch ¶
EnqueueBatch adds a batch of jobs. Operation is atomic, so either all jobs are added, or none.
func (*Client) EnqueueBatchTx ¶
EnqueueBatchTx adds a batch of jobs within the scope of the transaction. This allows you to guarantee that an enqueued batch will either be committed or rolled back atomically with other changes in the course of this transaction.
It is the caller's responsibility to Commit or Rollback the transaction after this function is called.
func (*Client) EnqueueTx ¶
EnqueueTx adds a job to the queue within the scope of the transaction. This allows you to guarantee that an enqueued job will either be committed or rolled back atomically with other changes in the course of this transaction.
It is the caller's responsibility to Commit or Rollback the transaction after this function is called.
func (*Client) LockNextScheduledJob ¶
func (c *Client) LockNextScheduledJob(ctx context.Context, limits []QueueLimit) (jobs []*Job, err error)
LockNextScheduledJob attempts to retrieve the earliest scheduled Job from the database in the specified queue. If a job is found, it will be locked on the transactional level, so other workers will be skipping it. If no job is found, nil will be returned instead of an error.
This function considers the scheduled time first, so the jobs that are due earliest are locked first, even if there are higher-priority jobs that are scheduled for a later time but are already eligible for execution.
Because Gue uses transaction-level locks, we have to hold the same transaction throughout the process of getting a job, working it, deleting it, and releasing the lock.
After the Job has been worked, you must call either Job.Done() or Job.Error() on it in order to commit transaction to persist Job changes (remove or update it).
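The "scheduled time first" ordering can be illustrated in memory. The sketch below is not the SQL the library runs and ignores locking and queue limits; the `job` struct, `pickNext`, and `demo` are hypothetical, and treating a lower Priority value as more urgent is an assumption used only to break ties.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// job is a hypothetical, simplified stand-in for a queued job.
type job struct {
	ID       int
	RunAt    time.Time
	Priority int
}

// pickNext returns the eligible job (RunAt not after now) with the earliest
// RunAt. Priority is consulted only when two jobs share the same RunAt.
func pickNext(jobs []job, now time.Time) (job, bool) {
	eligible := make([]job, 0, len(jobs))
	for _, j := range jobs {
		if !j.RunAt.After(now) {
			eligible = append(eligible, j)
		}
	}
	if len(eligible) == 0 {
		return job{}, false
	}
	sort.SliceStable(eligible, func(a, b int) bool {
		if !eligible[a].RunAt.Equal(eligible[b].RunAt) {
			return eligible[a].RunAt.Before(eligible[b].RunAt)
		}
		return eligible[a].Priority < eligible[b].Priority
	})
	return eligible[0], true
}

// demo builds a small sample queue and picks the next job from it.
func demo() (job, bool) {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	jobs := []job{
		{ID: 1, RunAt: now.Add(-1 * time.Minute), Priority: 0},   // eligible, more urgent priority
		{ID: 2, RunAt: now.Add(-10 * time.Minute), Priority: 10}, // eligible, scheduled earliest
		{ID: 3, RunAt: now.Add(time.Hour), Priority: 0},          // not yet eligible
	}
	return pickNext(jobs, now)
}

func main() {
	j, ok := demo()
	fmt.Println(j.ID, ok)
}
```

In the sample, job 2 is chosen because it was scheduled earliest, even though job 1 has the more urgent priority value.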
func (*Client) RestoreStuck ¶
type ErrJobReschedule ¶
ErrJobReschedule interface implementation allows errors to reschedule jobs on an individual basis.
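The general pattern of an error that carries its own reschedule delay can be sketched with `errors.As`. The method set of ErrJobReschedule is not shown in this documentation, so the `rescheduler` interface, its `rescheduleIn` method, and the helper names below are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// rescheduler is a hypothetical interface for errors that carry a delay.
type rescheduler interface {
	error
	rescheduleIn() time.Duration
}

// rescheduleErr is a hypothetical error that asks for a retry after a delay.
type rescheduleErr struct {
	in     time.Duration
	reason string
}

func (e rescheduleErr) Error() string {
	return fmt.Sprintf("reschedule in %s: %s", e.in, e.reason)
}

func (e rescheduleErr) rescheduleIn() time.Duration { return e.in }

// newRescheduleIn builds an error that requests a retry after d.
func newRescheduleIn(d time.Duration, reason string) error {
	return rescheduleErr{in: d, reason: reason}
}

// errPlain is an ordinary error that carries no reschedule information.
var errPlain = errors.New("plain failure")

// delayFor returns the delay requested by err, or fallback when err does not
// carry one. errors.As also finds the error when it is wrapped.
func delayFor(err error, fallback time.Duration) time.Duration {
	var r rescheduler
	if errors.As(err, &r) {
		return r.rescheduleIn()
	}
	return fallback
}

func main() {
	wrapped := fmt.Errorf("handler failed: %w", newRescheduleIn(30*time.Second, "rate limited"))
	fmt.Println(delayFor(wrapped, 5*time.Second))
	fmt.Println(delayFor(errPlain, 5*time.Second))
}
```

With the real package, a WorkFunc would presumably return `ErrRescheduleJobIn(d, reason)` or `ErrRescheduleJobAt(t, reason)` and leave the detection to the worker.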
type HookFunc ¶
HookFunc is a function that may react to Job lifecycle events. All the callbacks are executed synchronously, so be careful with long-running or locking operations. Hooks do not return an error, therefore they cannot and must not be used to affect the Job execution flow, e.g. to cancel it; this is the WorkFunc's responsibility. Modifying Job fields or calling any methods that modify its state within hooks may lead to undefined behaviour. Please never do this.
Depending on the event, the err parameter may be empty or not; check the event description for its meaning.
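A rough sketch of synchronous hook invocation follows. The `hookFunc` type is simplified (it takes no context), and `job`, `runWithHooks`, and `errBoom` are hypothetical; the point is only that hooks run in order on the worker's goroutine and observe the handler's error without being able to change it.

```go
package main

import (
	"errors"
	"fmt"
)

// job is a hypothetical, simplified stand-in for a queued job.
type job struct{ Type string }

// hookFunc is a simplified hook shape for this sketch (no context parameter).
type hookFunc func(j *job, err error)

// runWithHooks runs the handler, then calls each hook in order on the same
// goroutine. Hooks only observe the result; the handler's error is returned
// unchanged.
func runWithHooks(j *job, work func(*job) error, hooks ...hookFunc) error {
	err := work(j)
	for _, h := range hooks {
		h(j, err) // synchronous: a slow hook delays the worker
	}
	return err
}

// errBoom is a sample handler error.
var errBoom = errors.New("boom")

func main() {
	logHook := func(j *job, err error) {
		fmt.Printf("job %q finished, err=%v\n", j.Type, err)
	}
	_ = runWithHooks(&job{Type: "send_email"}, func(*job) error { return nil }, logHook)
	_ = runWithHooks(&job{Type: "send_email"}, func(*job) error { return errBoom }, logHook)
}
```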
type Job ¶
Job is a single unit of work for Gue to perform.
func (*Job) Done ¶
Done commits the transaction that marks the job as done. If you got the job from the worker, the worker takes care of cleaning up the job and its resources, so there is no need to do this manually in a WorkFunc.
func (*Job) Error ¶
Error marks the job as failed and schedules it to be reworked. An error message or backtrace can be provided as msg, which will be saved on the job. It will also increase the error count.
This call marks the job as done and releases (commits) the transaction, so calling Done() is not required, although calling it will not cause any issues. If you got the job from the worker, the worker takes care of cleaning up the job and its resources, so there is no need to do this manually in a WorkFunc.
type PollStrategy ¶
type PollStrategy string
PollStrategy determines how the DB is queried for the next job to work on.
type QueueLimit ¶
type WorkFunc ¶
WorkFunc is the handler function that performs the Job. If an error is returned, the Job is either re-enqueued with the given backoff or is discarded based on the worker backoff strategy and returned error.
Modifying Job fields and calling any methods that are modifying its state within the handler may lead to undefined behaviour. Please never do this.
type WorkMap ¶
WorkMap is a map of Job names to WorkFuncs that are used to perform Jobs of a given type.
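Dispatching by job type can be sketched as a plain map lookup. The types below are local stand-ins with assumed shapes (`workFunc` taking a context and a job pointer), and `errUnknownJobType`, `handlers`, and `dispatch` are hypothetical names; how the real worker reacts to an unknown type is not described here beyond the existence of the unknown-job-type hooks.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// job is a hypothetical, simplified stand-in for a queued job.
type job struct {
	Type string
	Args []byte
}

// workFunc and workMap are local stand-ins with assumed shapes.
type workFunc func(ctx context.Context, j *job) error
type workMap map[string]workFunc

// errUnknownJobType is a hypothetical sentinel for jobs without a handler.
var errUnknownJobType = errors.New("unknown job type")

// handlers maps job type names to the functions that perform them.
var handlers = workMap{
	"send_email": func(ctx context.Context, j *job) error {
		fmt.Printf("sending email with args %s\n", j.Args)
		return nil
	},
}

// dispatch looks up the handler for the job's type and runs it.
func dispatch(ctx context.Context, wm workMap, j *job) error {
	h, ok := wm[j.Type]
	if !ok {
		return errUnknownJobType
	}
	return h(ctx, j)
}

func main() {
	ctx := context.Background()
	fmt.Println(dispatch(ctx, handlers, &job{Type: "send_email", Args: []byte(`{"to":"a@example.com"}`)}))
	fmt.Println(dispatch(ctx, handlers, &job{Type: "resize_image"}))
}
```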
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool is a pool of Workers, each working jobs from the queue at the specified interval using the WorkMap.
func NewWorkerPool ¶
func NewWorkerPool(c *Client, options ...WorkerPoolOption) (*WorkerPool, error)
NewWorkerPool creates a new WorkerPool using the Client c.
Each Worker in the pool defaults to a poll interval of 5 seconds, which can be overridden by the WithPoolInterval option. The default queue is the nameless queue "", which can be overridden by the WithWorkerPoolQueue option.
func (*WorkerPool) Client ¶ added in v0.1.1
func (w *WorkerPool) Client() *Client
func (*WorkerPool) Run ¶
func (w *WorkerPool) Run(parentCtx context.Context) (err error)
Run runs all the Workers in the WorkerPool in their own goroutines. Run blocks until all workers exit. Use context cancellation for shutdown.
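The blocking behaviour and cancellation-based shutdown can be sketched with a `sync.WaitGroup`. This is a generic illustration of the pattern rather than the pool's actual code: `runPool` and `demo` are hypothetical, and the `poll` callback stands in for "lock and work the next job".

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runPool starts the given number of workers, each in its own goroutine, and
// blocks until every worker has exited. Workers poll on every tick and stop
// once ctx is cancelled.
func runPool(ctx context.Context, workers int, interval time.Duration, poll func(workerIdx int)) {
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			ticker := time.NewTicker(interval)
			defer ticker.Stop()
			for {
				select {
				case <-ctx.Done():
					return
				case <-ticker.C:
					poll(idx)
				}
			}
		}(i)
	}
	wg.Wait()
}

// demo runs three workers and cancels the context after at least five polls.
func demo() int64 {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var polls int64
	runPool(ctx, 3, time.Millisecond, func(int) {
		if atomic.AddInt64(&polls, 1) >= 5 {
			cancel() // shutdown is requested through the context
		}
	})
	return atomic.LoadInt64(&polls)
}

func main() {
	fmt.Println("polls before shutdown:", demo())
}
```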
func (*WorkerPool) Stop ¶
func (w *WorkerPool) Stop()
func (*WorkerPool) WorkMap ¶
func (w *WorkerPool) WorkMap(wm WorkMap)
type WorkerPoolOption ¶
type WorkerPoolOption func(pool *WorkerPool)
WorkerPoolOption defines a type that allows setting worker pool properties at construction time.
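WorkerPoolOption follows the functional-options pattern. The sketch below shows the mechanics on a local, hypothetical `pool` struct: the constructor sets defaults (5 seconds for the interval, as documented for NewWorkerPool) and then applies each option in order. The names `pool`, `poolOption`, `withID`, `withInterval`, and `newPool` are illustrative only.

```go
package main

import (
	"fmt"
	"time"
)

// pool is a hypothetical stand-in for the fields a worker pool might hold.
type pool struct {
	id       string
	interval time.Duration
}

// poolOption mirrors the documented shape: a function that mutates the pool.
type poolOption func(p *pool)

// withID sets the pool ID.
func withID(id string) poolOption {
	return func(p *pool) { p.id = id }
}

// withInterval overrides the default poll interval.
func withInterval(d time.Duration) poolOption {
	return func(p *pool) { p.interval = d }
}

// newPool applies defaults first, then every option in the order given.
func newPool(opts ...poolOption) *pool {
	p := &pool{interval: 5 * time.Second}
	for _, opt := range opts {
		opt(p)
	}
	return p
}

func main() {
	fmt.Println(newPool().interval)
	p := newPool(withID("emails"), withInterval(2*time.Second))
	fmt.Println(p.id, p.interval)
}
```

With the real package, the call would presumably look like `NewWorkerPool(client, WithPoolID("emails"), WithPoolInterval(2*time.Second))`.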
func WithBackoff ¶ added in v0.1.2
func WithBackoff(backoff Backoff) WorkerPoolOption
WithBackoff sets backoff implementation that will be applied to errored jobs within current client session.
func WithLogger ¶
func WithLogger(l *zap.Logger) WorkerPoolOption
func WithPoolGracefulShutdown ¶
func WithPoolGracefulShutdown(handlerCtx func() context.Context) WorkerPoolOption
WithPoolGracefulShutdown enables graceful shutdown mode for all workers in the pool. See WithWorkerGracefulShutdown for details.
func WithPoolHooksJobDone ¶
func WithPoolHooksJobDone(hooks ...HookFunc) WorkerPoolOption
WithPoolHooksJobDone calls WithWorkerHooksJobDone for every worker in the pool.
func WithPoolHooksUnknownJobType ¶
func WithPoolHooksUnknownJobType(hooks ...HookFunc) WorkerPoolOption
WithPoolHooksUnknownJobType calls WithWorkerHooksUnknownJobType for every worker in the pool.
func WithPoolID ¶
func WithPoolID(id string) WorkerPoolOption
WithPoolID sets the worker pool ID for easier identification in logs.
func WithPoolInterval ¶
func WithPoolInterval(d time.Duration) WorkerPoolOption
WithPoolInterval overrides default poll interval with the given value. Poll interval is the "sleep" duration if there were no jobs found in the DB.
func WithPoolPanicStackBufSize ¶
func WithPoolPanicStackBufSize(size int) WorkerPoolOption
WithPoolPanicStackBufSize sets the maximum size of the stack trace buffer for panicking jobs. The default value is 1024, which is enough for most cases. Be careful when setting the buffer size to large values, as this may affect overall performance.
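The effect of the buffer size can be seen in a small sketch that recovers a panic and captures the stack with `runtime.Stack`, which writes at most `len(buf)` bytes. `safeCall` is a hypothetical helper; whether the library captures the stack in exactly this way is an assumption.

```go
package main

import (
	"fmt"
	"runtime"
)

// safeCall runs fn and, if it panics, recovers and returns the stack trace
// truncated to at most stackBufSize bytes.
func safeCall(stackBufSize int, fn func()) (stack string, panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			buf := make([]byte, stackBufSize)
			// runtime.Stack fills buf with the current goroutine's trace and
			// returns the number of bytes written, never more than len(buf).
			n := runtime.Stack(buf, false)
			stack, panicked = string(buf[:n]), true
		}
	}()
	fn()
	return "", false
}

func main() {
	stack, panicked := safeCall(1024, func() { panic("boom") })
	fmt.Println(panicked, len(stack))
}
```

A larger buffer keeps more frames of the trace at the cost of a bigger allocation on every recovered panic.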
func WithPoolQueueRestore ¶
func WithPoolQueueRestore(restoreAfter, interval time.Duration) WorkerPoolOption
func WithWorkerPanicStackBufSize ¶
func WithWorkerPanicStackBufSize(size int) WorkerPoolOption
WithWorkerPanicStackBufSize sets the maximum size of the stack trace buffer for panicking jobs. The default value is 1024, which is enough for most cases. Be careful when setting the buffer size to large values, as this may affect overall performance.
func WithWorkerPanicWorkerMap ¶
func WithWorkerPanicWorkerMap(workMap WorkMap) WorkerPoolOption
func WithWorkerPoolHandler ¶
func WithWorkerPoolHandler(jobType string, h WorkFunc) WorkerPoolOption
func WithWorkerPoolQueue ¶
func WithWorkerPoolQueue(queue ...QueueLimit) WorkerPoolOption
WithWorkerPoolQueue overrides default worker queue name with the given value.