queue

package
v0.10.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 1, 2024 License: Apache-2.0 Imports: 6 Imported by: 1

Documentation

Overview

Package queue provides helpers for working with client-go's `workqueues`.

`queue.OperationsContext` can be used from within a `Handler` to control the behavior of the queue that has called the handler.

The queue operations are:

- Done (stop processing the current key) - Requeue (requeue the current key) - RequeueAfter (wait for some period of time before requeuing the current key) - ReqeueueErr (record an error and requeue) - RequeueAPIError (requeue after waiting according to the priority and fairness response from the apiserver)

If calling these controls from a handler, it's important to `return` immediately so that the handler does not continue processing a key that the queue thinks has stopped.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func ShouldRetry

func ShouldRetry(err error) (bool, time.Duration)

ShouldRetry returns true if the error is transient. It returns a delay if the server suggested one.

Types

type Interface

type Interface interface {
	Done()
	RequeueAfter(duration time.Duration)
	Requeue()
	RequeueErr(err error)
	RequeueAPIErr(err error)
	Error() error
}

Interface is the standard queue control interface

type Operations

type Operations struct {
	// contains filtered or unexported fields
}

Operations deals with the current queue key and provides controls for requeueing or stopping reconciliation.

func NewOperations

func NewOperations(done func(), requeueAfter func(time.Duration), cancel context.CancelFunc) *Operations
Example
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

// queue has an object in it
queue.Add("current_key")
key, _ := queue.Get()

// operations are per-key
operations := NewOperations(func() {
	queue.Done(key)
}, func(duration time.Duration) {
	queue.AddAfter(key, duration)
}, cancel)

// typically called from a handler
handler.NewHandlerFromFunc(func(_ context.Context) {
	// do some work
	operations.Done()
}, "example").Handle(ctx)
fmt.Println(queue.Len())

operations.Requeue()
fmt.Println(queue.Len())
Output:

0
1

func (*Operations) Done

func (c *Operations) Done()

Done marks the current key as finished. Note that processing should stop as soon as possible after calling `Done`, since marking it as done frees the queue to potentially process the same key again.

func (*Operations) Error added in v0.5.0

func (c *Operations) Error() error

Error returns the last recorded error, if any

func (*Operations) Requeue

func (c *Operations) Requeue()

Requeue requeues the current key immediately.

func (*Operations) RequeueAPIErr

func (c *Operations) RequeueAPIErr(err error)

RequeueAPIErr checks to see if `err` is a kube api error with retry data. If so, it requeues after the wait period, otherwise, it requeues immediately.

func (*Operations) RequeueAfter

func (c *Operations) RequeueAfter(duration time.Duration)

RequeueAfter requeues the current key after duration.

func (*Operations) RequeueErr

func (c *Operations) RequeueErr(err error)

RequeueErr sets err on the object and requeues the current key.

type OperationsContext

type OperationsContext struct {
	*typedctx.Key[Interface]
}

OperationsContext is like Interface, but fetches the object from a context.

func NewQueueOperationsCtx

func NewQueueOperationsCtx() OperationsContext

NewQueueOperationsCtx returns a new OperationsContext

Example
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

// queue has an object in it
queue.Add("current_key")

key, _ := queue.Get()

// operations are per-key
CtxQueue := NewQueueOperationsCtx().WithValue(ctx, NewOperations(func() {
	queue.Done(key)
}, func(duration time.Duration) {
	queue.AddAfter(key, duration)
}, cancel))

// queue controls are passed via context
handler.NewHandlerFromFunc(func(_ context.Context) {
	// do some work
	CtxQueue.Done()
}, "example").Handle(ctx)

fmt.Println(queue.Len())
Output:

0

func (OperationsContext) Done

func (h OperationsContext) Done(ctx context.Context)

func (OperationsContext) Error added in v0.5.0

func (h OperationsContext) Error(ctx context.Context) error

func (OperationsContext) Requeue

func (h OperationsContext) Requeue(ctx context.Context)

func (OperationsContext) RequeueAPIErr

func (h OperationsContext) RequeueAPIErr(ctx context.Context, err error)

func (OperationsContext) RequeueAfter

func (h OperationsContext) RequeueAfter(ctx context.Context, duration time.Duration)

func (OperationsContext) RequeueErr

func (h OperationsContext) RequeueErr(ctx context.Context, err error)

Directories

Path Synopsis
Code generated by counterfeiter.
Code generated by counterfeiter.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL