cockroach: github.com/cockroachdb/cockroach/pkg/util/quotapool

package quotapool

import "github.com/cockroachdb/cockroach/pkg/util/quotapool"

Package quotapool provides an abstract implementation of a pool of resources to be distributed among concurrent clients.

The library also offers a concrete implementation of such a quota pool for single-dimension integer quota. This IntPool acts like a weighted semaphore that additionally offers FIFO ordering for serving requests.
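To make the "weighted semaphore with FIFO ordering" idea concrete, here is a minimal standard-library sketch. The names fifoSem, waiter, acquire and release are hypothetical and this is not the quotapool implementation; in particular it ignores context cancelation and closing.

```go
package main

import (
	"fmt"
	"sync"
)

// waiter is a queued request for n units; ready is closed once it is granted.
type waiter struct {
	n     uint64
	ready chan struct{}
}

// fifoSem is a hypothetical, simplified weighted semaphore with FIFO ordering.
type fifoSem struct {
	mu    sync.Mutex
	avail uint64
	queue []*waiter
}

func newFifoSem(n uint64) *fifoSem { return &fifoSem{avail: n} }

// acquire blocks until n units are granted. A request is served immediately
// only if nobody is queued, so a small request cannot overtake a large one.
func (s *fifoSem) acquire(n uint64) {
	s.mu.Lock()
	if len(s.queue) == 0 && s.avail >= n {
		s.avail -= n
		s.mu.Unlock()
		return
	}
	w := &waiter{n: n, ready: make(chan struct{})}
	s.queue = append(s.queue, w)
	s.mu.Unlock()
	<-w.ready
}

// release returns n units and grants head-of-queue requests while they fit.
func (s *fifoSem) release(n uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.avail += n
	for len(s.queue) > 0 && s.queue[0].n <= s.avail {
		w := s.queue[0]
		s.queue = s.queue[1:]
		s.avail -= w.n
		close(w.ready)
	}
}

func main() {
	s := newFifoSem(4)
	s.acquire(3)
	done := make(chan struct{})
	go func() {
		s.acquire(2) // cannot proceed while only 1 unit is available
		close(done)
	}()
	s.release(3)
	<-done
	fmt.Println("available:", s.avail) // available: 2
}
```

Granting only from the head of the queue is what gives FIFO ordering; the cost is that a large request at the head can hold back smaller ones behind it.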

Package Files

config.go intpool.go manual_time.go notify_queue.go quotapool.go time.go

Variables

var ErrNotEnoughQuota = fmt.Errorf("not enough quota available")

ErrNotEnoughQuota is returned by IntRequestFuncs when they want to be called again once there are new resources.

func HasErrClosed

func HasErrClosed(err error) bool

HasErrClosed returns true if this error is or contains an ErrClosed error.

func LogSlowAcquisition

func LogSlowAcquisition(ctx context.Context, poolName string, r Request, start time.Time) func()

LogSlowAcquisition is a SlowAcquisitionFunc.

type AcquisitionFunc

type AcquisitionFunc func(
    ctx context.Context, poolName string, r Request, start time.Time,
)

AcquisitionFunc is used to configure a quotapool to call a function after an acquisition has occurred.

type ErrClosed

type ErrClosed struct {
    // contains filtered or unexported fields
}

ErrClosed is returned from Acquire after Close has been called.

func (*ErrClosed) Error

func (ec *ErrClosed) Error() string

Error implements error.

type IntAlloc

type IntAlloc struct {
    // contains filtered or unexported fields
}

IntAlloc is an allocated quantity which should be released.

func (*IntAlloc) Acquired

func (ia *IntAlloc) Acquired() uint64

Acquired returns the quantity that this alloc has acquired.

func (*IntAlloc) Freeze

func (ia *IntAlloc) Freeze()

Freeze informs the quota pool that this allocation will never be Release()ed. Releasing it later is illegal and will lead to panics.

Using Freeze and UpdateCapacity on the same pool may require explicit coordination. It is illegal to freeze allocated capacity which is no longer available; specifically, it is illegal to make the capacity of an IntPool negative. Imagine the case where the capacity of an IntPool is initially 10. An allocation of 10 is acquired. Then, while it is held, the pool's capacity is updated to be 9. Then the outstanding allocation is frozen. This would make the total capacity of the IntPool negative, which is not allowed and will lead to a panic. In general, it's a bad idea to freeze allocated quota from a pool whose capacity may ever be decreased.

AcquireFunc() requests will be woken up with an updated Capacity, and Alloc() requests will be trimmed accordingly.
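The capacity arithmetic in the Freeze scenario above can be traced with a toy model. The toyPool type is hypothetical and tracks only the capacity; it assumes that freezing n units subtracts n from the capacity, and it returns an error where the real pool is documented to panic.

```go
package main

import "fmt"

// toyPool is a hypothetical model that tracks only the pool's capacity. It
// uses a signed integer so the illegal state can be reported, not reached.
type toyPool struct{ capacity int64 }

// updateCapacity sets the capacity to c.
func (p *toyPool) updateCapacity(c int64) { p.capacity = c }

// freeze permanently removes n units from the capacity (an assumption about
// the accounting) and returns an error where the real pool would panic.
func (p *toyPool) freeze(n int64) error {
	if left := p.capacity - n; left < 0 {
		return fmt.Errorf("freezing %d would make capacity %d", n, left)
	}
	p.capacity -= n
	return nil
}

func main() {
	p := &toyPool{capacity: 10}
	const held = 10     // an allocation of 10 is outstanding
	p.updateCapacity(9) // capacity shrinks while the allocation is held
	fmt.Println(p.freeze(held))
	// freezing 10 would make capacity -1
}
```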

func (*IntAlloc) Merge

func (ia *IntAlloc) Merge(other *IntAlloc)

Merge adds the acquired resources in other to ia. Other may not be used after it has been merged. It is illegal to merge allocs from different pools and doing so will result in a panic.

func (*IntAlloc) Release

func (ia *IntAlloc) Release()

Release releases an IntAlloc back into the IntPool. It is safe to release into a closed pool.

func (*IntAlloc) String

func (ia *IntAlloc) String() string

String formats an IntAlloc as a string.

type IntPool

type IntPool struct {
    // contains filtered or unexported fields
}

IntPool manages allocating integer units of quota to clients. Clients may acquire quota in two ways: Acquire, which requires the client to specify the quantity of quota at call time, and AcquireFunc, which allows the client to provide a function that determines whether a quantity of quota is sufficient when it becomes available.

func NewIntPool

func NewIntPool(name string, capacity uint64, options ...Option) *IntPool

NewIntPool creates a new named IntPool.

capacity is the amount of quota initially available. The maximum capacity is math.MaxInt64. If the capacity argument exceeds that value, this function will panic.

func (*IntPool) Acquire

func (p *IntPool) Acquire(ctx context.Context, v uint64) (*IntAlloc, error)

Acquire acquires the specified amount of quota from the pool. On success, a non-nil alloc is returned and Release() must be called on it to return the quota to the pool.

If 'v' is greater than the total capacity of the pool, we instead try to acquire quota equal to the maximum capacity. If the maximum capacity is decreased while this request is ongoing, the request is again truncated to the maximum capacity.

Acquisitions of 0 return immediately with no error, even if the IntPool is closed.

Acquisitions of more than 0 from a pool with 0 capacity always return an ErrNotEnoughQuota.

Safe for concurrent use.
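The special cases listed for Acquire can be summarized as a small decision function. This is only a sketch of the documented rules: plan and errNotEnoughQuota are hypothetical names, and the function says nothing about waiting or about a closed pool beyond the zero-size case.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotEnoughQuota is a stand-in for quotapool.ErrNotEnoughQuota.
var errNotEnoughQuota = errors.New("not enough quota available")

// plan returns the amount a request for v units would actually try to
// acquire from a pool with the given capacity, following the rules above.
func plan(v, capacity uint64) (uint64, error) {
	switch {
	case v == 0:
		return 0, nil // returns immediately, even on a closed pool
	case capacity == 0:
		return 0, errNotEnoughQuota // can never be satisfied
	case v > capacity:
		return capacity, nil // truncated to the pool's capacity
	default:
		return v, nil
	}
}

func main() {
	fmt.Println(plan(0, 0))   // 0 <nil>
	fmt.Println(plan(5, 0))   // 0 not enough quota available
	fmt.Println(plan(15, 10)) // 10 <nil>
	fmt.Println(plan(4, 10))  // 4 <nil>
}
```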

func (*IntPool) AcquireFunc

func (p *IntPool) AcquireFunc(ctx context.Context, f IntRequestFunc) (*IntAlloc, error)

AcquireFunc acquires a quantity of quota determined by a function which is called with a quantity of available quota.

An example use case for AcquireFunc is a pool of workers attempting to acquire resources to run a heterogeneous set of jobs. Imagine, for example, that we have a set of workers and a list of jobs which need to be run. The function might be used to choose the largest job which can be run with the currently available quota.

Code:

const quota = 7
const workers = 3
qp := NewIntPool("work units", quota)
type job struct {
    name string
    cost uint64
}
jobs := []*job{
    {name: "foo", cost: 3},
    {name: "bar", cost: 2},
    {name: "baz", cost: 4},
    {name: "qux", cost: 6},
    {name: "quux", cost: 3},
    {name: "quuz", cost: 3},
}
// sortJobs sorts the jobs in highest-to-lowest order with nil last.
sortJobs := func() {
    sort.Slice(jobs, func(i, j int) bool {
        ij, jj := jobs[i], jobs[j]
        if ij != nil && jj != nil {
            return ij.cost > jj.cost
        }
        return ij != nil
    })
}
// getJob finds the largest job which can be run with the current quota.
getJob := func(
    ctx context.Context, qp *IntPool,
) (j *job, alloc *IntAlloc, err error) {
    alloc, err = qp.AcquireFunc(ctx, func(
        ctx context.Context, pi PoolInfo,
    ) (took uint64, err error) {
        sortJobs()
        // There are no more jobs, take 0 and return.
        if jobs[0] == nil {
            return 0, nil
        }
        // Find the largest job which can be run.
        for i := range jobs {
            if jobs[i] == nil {
                break
            }
            if jobs[i].cost <= pi.Available {
                j, jobs[i] = jobs[i], nil
                return j.cost, nil
            }
        }
        return 0, ErrNotEnoughQuota
    })
    return j, alloc, err
}
runWorker := func(workerNum int) func(ctx context.Context) error {
    return func(ctx context.Context) error {
        for {
            j, alloc, err := getJob(ctx, qp)
            if err != nil {
                return err
            }
            if j == nil {
                return nil
            }
            alloc.Release()
        }
    }
}
g := ctxgroup.WithContext(context.Background())
for i := 0; i < workers; i++ {
    g.GoCtx(runWorker(i))
}
if err := g.Wait(); err != nil {
    panic(err)
}

func (*IntPool) ApproximateQuota

func (p *IntPool) ApproximateQuota() (q uint64)

ApproximateQuota will report approximately the amount of quota available in the pool. It's "approximate" because, if there's an acquisition in progress, this might return an "intermediate" value: one that does not fully reflect the capacity either before that acquisition started or after it will have finished.

func (*IntPool) Capacity

func (p *IntPool) Capacity() uint64

Capacity returns the amount of quota managed by this pool.

func (*IntPool) Close

func (p *IntPool) Close(reason string)

Close signals to all ongoing and subsequent acquisitions that the pool is closed and that an error should be returned.

Safe for concurrent use.

func (*IntPool) Closer

func (p *IntPool) Closer(reason string) IntPoolCloser

Closer returns a struct which implements stop.Closer.

func (*IntPool) Full

func (p *IntPool) Full() bool

Full returns true if no quota is outstanding.

func (*IntPool) Len

func (p *IntPool) Len() int

Len returns the current length of the queue for this IntPool.

func (*IntPool) Release

func (p *IntPool) Release(allocs ...*IntAlloc)

Release will release allocs back to their pool. Allocs which are from p are merged into a single alloc before being added to avoid synchronizing on p multiple times. Allocs which did not come from p are released one at a time. It is legal to pass nil values in allocs.

func (*IntPool) TryAcquire

func (p *IntPool) TryAcquire(ctx context.Context, v uint64) (*IntAlloc, error)

TryAcquire is like Acquire but if there is insufficient quota to acquire immediately the method will return ErrNotEnoughQuota.

func (*IntPool) TryAcquireFunc

func (p *IntPool) TryAcquireFunc(ctx context.Context, f IntRequestFunc) (*IntAlloc, error)

TryAcquireFunc is like AcquireFunc but if insufficient quota exists the method will return ErrNotEnoughQuota rather than waiting for quota to become available.

func (*IntPool) UpdateCapacity

func (p *IntPool) UpdateCapacity(newCapacity uint64)

UpdateCapacity sets the capacity to newCapacity. If the current capacity is higher than the new capacity, currently running requests will not be affected. When the capacity is increased, new quota will be added. The total quantity of outstanding quota will never exceed the maximum value of the capacity which existed when any outstanding quota was acquired.
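One way to picture the UpdateCapacity rules is a model that tracks capacity and outstanding quota separately. The capModel type below is hypothetical and rests on an assumption about the accounting (available quota is capacity minus outstanding, floored at zero); it is not taken from the IntPool source.

```go
package main

import "fmt"

// capModel is a hypothetical model of a pool whose capacity can change while
// quota is outstanding.
type capModel struct{ capacity, outstanding uint64 }

// available is the quota that could be handed out right now.
func (p *capModel) available() uint64 {
	if p.outstanding >= p.capacity {
		return 0
	}
	return p.capacity - p.outstanding
}

// tryAcquire takes n units if they are available.
func (p *capModel) tryAcquire(n uint64) bool {
	if n > p.available() {
		return false
	}
	p.outstanding += n
	return true
}

// release returns n previously acquired units.
func (p *capModel) release(n uint64) { p.outstanding -= n }

// updateCapacity changes the capacity without touching outstanding quota.
func (p *capModel) updateCapacity(c uint64) { p.capacity = c }

func main() {
	p := &capModel{capacity: 10}
	p.tryAcquire(8)
	p.updateCapacity(5)                       // decrease: the holder of 8 is unaffected
	fmt.Println(p.outstanding, p.available()) // 8 0
	p.release(8)
	fmt.Println(p.available()) // 5
	p.updateCapacity(12) // increase: new quota is added
	fmt.Println(p.available()) // 12
}
```

In this model the outstanding total can exceed the current capacity only because it was acquired under an earlier, larger capacity, which matches the last sentence of the description.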

type IntPoolCloser

type IntPoolCloser struct {
    // contains filtered or unexported fields
}

IntPoolCloser implements stop.Closer.

func (IntPoolCloser) Close

func (ipc IntPoolCloser) Close()

Close makes the IntPoolCloser a stop.Closer.

type IntRequestFunc

type IntRequestFunc func(ctx context.Context, p PoolInfo) (took uint64, err error)

IntRequestFunc is used to request a quantity of quota determined when quota is available rather than before requesting.

If the request is satisfied, the function returns the amount of quota consumed and no error. If the request is not satisfied because there's not enough quota currently available, ErrNotEnoughQuota is returned to cause the function to be called again when more quota becomes available; in that case took has to be 0 (i.e. it is not allowed for the request to save some quota for later use). If any other error is returned, took again has to be 0, the function will not be called any more, and the error will be returned from IntPool.AcquireFunc().

type ManualTime

type ManualTime struct {
    // contains filtered or unexported fields
}

ManualTime is a testing implementation of TimeSource.

func NewManualTime

func NewManualTime(initialTime time.Time) *ManualTime

NewManualTime constructs a new ManualTime.

func (*ManualTime) Advance

func (m *ManualTime) Advance(duration time.Duration)

Advance forwards the current time by the given duration.

func (*ManualTime) AdvanceTo

func (m *ManualTime) AdvanceTo(t time.Time)

AdvanceTo advances the current time to t. If t is earlier than the current time then AdvanceTo is a no-op.

func (*ManualTime) NewTimer

func (m *ManualTime) NewTimer() Timer

NewTimer constructs a new Timer.

func (*ManualTime) Now

func (m *ManualTime) Now() time.Time

Now returns the current time.

func (*ManualTime) Timers

func (m *ManualTime) Timers() []time.Time

Timers returns a snapshot of the timestamps of the pending timers.

type Option

type Option interface {
    // contains filtered or unexported methods
}

Option is used to configure a QuotaPool.

func OnAcquisition

func OnAcquisition(f AcquisitionFunc) Option

OnAcquisition creates an Option to configure a callback upon acquisition. It is often useful for recording metrics.

func OnSlowAcquisition

func OnSlowAcquisition(threshold time.Duration, f SlowAcquisitionFunc) Option

OnSlowAcquisition creates an Option to configure a callback upon slow acquisitions. Only one OnSlowAcquisition may be used. If multiple are specified only the last will be used.

func WithTimeSource

func WithTimeSource(ts TimeSource) Option

WithTimeSource is used to configure a quotapool to use the provided TimeSource.

type PoolInfo

type PoolInfo struct {
    // Available is the amount of quota available to be consumed. This is the
    // maximum value that the `took` return value from IntRequestFunc can be set
    // to.
    // Note that Available can be 0. This happens when the IntRequestFunc is
    // called as a result of the pool's capacity decreasing.
    Available uint64

    // Capacity is the maximum capacity available in the pool. This can
    // decrease over time. It can be used to determine that the resources required
    // by a request will never be available.
    Capacity uint64
}

PoolInfo represents the information that the IntRequestFunc gets about the current quota pool conditions.

type QuotaPool

type QuotaPool struct {
    // contains filtered or unexported fields
}

QuotaPool is an abstract implementation of a pool that stores some unit of Resource. The basic idea is that it allows requests to acquire a quantity of Resource from the pool in FIFO order in a way that interacts well with context cancelation.

func New

func New(name string, initialResource Resource, options ...Option) *QuotaPool

New returns a new quota pool initialized with a given quota. The quota is capped at this amount, meaning that callers may return more quota than they acquired without ever making more than the quota capacity available.

func (*QuotaPool) Acquire

func (qp *QuotaPool) Acquire(ctx context.Context, r Request) (err error)

Acquire attempts to fulfill the Request with Resource from the qp. Requests are serviced in a FIFO order; only a single request is ever being offered resources at a time. A Request will be offered the pool's current quantity of Resource until it is fulfilled or its context is canceled.

Safe for concurrent use.

func (*QuotaPool) Add

func (qp *QuotaPool) Add(val interface{})

Add adds the provided Alloc back to the pool. The value will be merged with the existing resources in the QuotaPool if there are any.

Safe for concurrent use.

func (*QuotaPool) ApproximateQuota

func (qp *QuotaPool) ApproximateQuota(f func(Resource))

ApproximateQuota will report approximately the amount of quota available in the pool to f. The provided Resource must not be mutated.

func (*QuotaPool) Close

func (qp *QuotaPool) Close(reason string)

Close signals to all ongoing and subsequent acquisitions that they are free to return to their callers. They will receive an *ErrClosed which contains this reason.

Safe for concurrent use.

func (*QuotaPool) Len

func (qp *QuotaPool) Len() int

Len returns the current length of the queue for this QuotaPool.

func (*QuotaPool) TimeSource

func (qp *QuotaPool) TimeSource() TimeSource

TimeSource returns the TimeSource associated with this QuotaPool.

type Request

type Request interface {

    // Acquire decides whether a Request can be fulfilled by a given quantity of
    // Resource.
    //
    // If it is not fulfilled it must not modify or retain the passed alloc.
    // If it is fulfilled, it should modify the Resource value accordingly.
    //
    // If tryAgainAfter is positive, acquisition will be attempted again after
    // the specified duration. This is critical for the implementation of
    // rate limiters on top of this package.
    Acquire(context.Context, Resource) (fulfilled bool, tryAgainAfter time.Duration)

    // ShouldWait indicates whether this request should be queued. If this method
    // returns false and there is insufficient capacity in the pool when the
    // request is queued then ErrNotEnoughQuota will be returned from calls to
    // Acquire.
    ShouldWait() bool
}

Request is an interface used to acquire quota from the pool. Request is responsible for subdividing a resource into the portion which is retained when the Request is fulfilled and the remainder.

type Resource

type Resource interface {

    // Merge combines val into the current resource.
    // After val is passed to Merge, the QuotaPool will never use
    // that Resource again. This behavior allows clients to pool instances of
    // Resources by creating Resource during Acquisition and destroying them in
    // Merge.
    Merge(val interface{}) (shouldNotify bool)
}

Resource is an interface that represents a quantity which is being pooled and allocated. It is any quantity that can be subdivided and combined.

This library does not provide any concrete implementations of Resource but internally the *IntAlloc is used as a resource.
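As a rough illustration of Merge, the sketch below combines two integer quantities. The intQuota type is hypothetical, and the meaning given to shouldNotify (true when something was added, so a waiting request can be offered the resource again) is an assumption, since the documentation does not define it.

```go
package main

import "fmt"

// intQuota is a hypothetical Resource holding a count of units.
type intQuota struct{ n uint64 }

// Merge adds the units in val to q and empties val, which must not be used
// afterwards. It panics if val is not an *intQuota.
func (q *intQuota) Merge(val interface{}) (shouldNotify bool) {
	other := val.(*intQuota)
	added := other.n
	q.n += added
	other.n = 0
	return added > 0 // assumption: notify only when quota actually grew
}

func main() {
	pool := &intQuota{n: 3}
	fmt.Println(pool.Merge(&intQuota{n: 4}), pool.n) // true 7
	fmt.Println(pool.Merge(&intQuota{}), pool.n)     // false 7
}
```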

type SlowAcquisitionFunc

type SlowAcquisitionFunc func(
    ctx context.Context, poolName string, r Request, start time.Time,
) (onAcquire func())

SlowAcquisitionFunc is used to configure a quotapool to call a function when quota acquisition is slow. The returned callback is called when the acquisition occurs.

type TimeSource

type TimeSource interface {
    Now() time.Time
    NewTimer() Timer
}

TimeSource is used to interact with clocks and timers. Generally exposed for testing.

type Timer

type Timer interface {
    Reset(duration time.Duration)
    Stop() bool
    Ch() <-chan time.Time
    MarkRead()
}

Timer is an interface wrapping timeutil.Timer.

Package quotapool imports 13 packages and is imported by 13 packages. Updated 2020-07-11.