package quotapool

import "github.com/cockroachdb/cockroach/pkg/util/quotapool"

Package quotapool provides an abstract implementation of a pool of resources to be distributed among concurrent clients.

The library also offers a concrete implementation of such a quota pool for single-dimension integer quota. This IntPool acts like a weighted semaphore that additionally offers FIFO ordering for serving requests.
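To make the "weighted semaphore with FIFO ordering" idea concrete, here is a minimal standard-library sketch. The fifoSem type and its methods are hypothetical names invented for this illustration; this is not how IntPool is implemented, and it omits context cancellation, closing, and capacity changes.

```go
package main

import (
	"fmt"
	"sync"
)

// waiter is a queued request for n units of quota.
type waiter struct {
	n     uint64
	ready chan struct{}
}

// fifoSem is a hypothetical weighted semaphore that serves waiters strictly
// in arrival order. It is an illustration, not the IntPool implementation.
type fifoSem struct {
	mu    sync.Mutex
	avail uint64
	queue []*waiter
}

func newFifoSem(capacity uint64) *fifoSem { return &fifoSem{avail: capacity} }

// acquire blocks until n units are available and every earlier waiter has
// been served.
func (s *fifoSem) acquire(n uint64) {
	s.mu.Lock()
	if len(s.queue) == 0 && s.avail >= n {
		s.avail -= n
		s.mu.Unlock()
		return
	}
	w := &waiter{n: n, ready: make(chan struct{})}
	s.queue = append(s.queue, w)
	s.mu.Unlock()
	<-w.ready
}

// release returns n units and wakes waiters from the head of the queue for
// as long as the head fits. A large request at the head blocks smaller
// requests queued behind it, which is what FIFO ordering means here.
func (s *fifoSem) release(n uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.avail += n
	for len(s.queue) > 0 && s.queue[0].n <= s.avail {
		w := s.queue[0]
		s.queue = s.queue[1:]
		s.avail -= w.n
		close(w.ready)
	}
}

// available reports the quota that is currently unallocated.
func (s *fifoSem) available() uint64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.avail
}

func main() {
	s := newFifoSem(5)
	s.acquire(3)
	done := make(chan struct{})
	go func() {
		s.acquire(4) // waits if the release below has not happened yet
		close(done)
	}()
	s.release(3)
	<-done
	fmt.Println("available:", s.available()) // 5 - 4 = 1
}
```

The sketch trades throughput for fairness: a small request never overtakes a larger one that arrived first.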

Package Files

config.go intpool.go notify_queue.go quotapool.go

Variables

var ErrNotEnoughQuota = fmt.Errorf("not enough quota available")

ErrNotEnoughQuota is returned by IntRequestFuncs when they want to be called again once new resources are available.

var LogSlowAcquisition = OnSlowAcquisition(base.SlowRequestThreshold, logSlowAcquire)

LogSlowAcquisition is an Option to log slow acquisitions.

type AcquisitionFunc

type AcquisitionFunc func(
    ctx context.Context, poolName string, r Request, start time.Time,
)

AcquisitionFunc is used to configure a quotapool to call a function after an acquisition has occurred.

type ErrClosed

type ErrClosed struct {
    // contains filtered or unexported fields
}

ErrClosed is returned from Acquire after Close has been called.

func (*ErrClosed) Error

func (ec *ErrClosed) Error() string

Error implements error.
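Because ErrClosed's methods use a pointer receiver, callers would typically detect it with errors.As and a *ErrClosed target. The sketch below uses a local stand-in type named errClosed; its fields, its message format, and the acquire and wrappedAcquire helpers are assumptions made for illustration, since the real type's fields are unexported.

```go
package main

import (
	"errors"
	"fmt"
)

// errClosed is a local stand-in for the package's *ErrClosed. The fields and
// message format are assumptions made for this example.
type errClosed struct {
	poolName string
	reason   string
}

func (ec *errClosed) Error() string {
	return fmt.Sprintf("%s pool closed: %s", ec.poolName, ec.reason)
}

// acquire is a hypothetical stand-in for an Acquire call on a pool that may
// already have been closed.
func acquire(closed bool) error {
	if closed {
		return &errClosed{poolName: "work units", reason: "shutting down"}
	}
	return nil
}

// wrappedAcquire adds caller context to the error, as application code
// often does.
func wrappedAcquire(closed bool) error {
	if err := acquire(closed); err != nil {
		return fmt.Errorf("scheduling job: %w", err)
	}
	return nil
}

// isClosed reports whether err is, or wraps, an *errClosed.
func isClosed(err error) bool {
	var ec *errClosed
	return errors.As(err, &ec)
}

func main() {
	fmt.Println(isClosed(wrappedAcquire(true)))           // true
	fmt.Println(isClosed(wrappedAcquire(false)))          // false
	fmt.Println(isClosed(errors.New("context canceled"))) // false
}
```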

type IntAlloc

type IntAlloc struct {
    // contains filtered or unexported fields
}

IntAlloc is an allocated quantity which should be released.

func (*IntAlloc) Acquired

func (ia *IntAlloc) Acquired() uint64

Acquired returns the quantity that this alloc has acquired.

func (*IntAlloc) Freeze

func (ia *IntAlloc) Freeze()

Freeze informs the quota pool that this allocation will never be Release()ed. Releasing it later is illegal.

AcquireFunc() requests will be woken up with an updated Capacity, and Acquire() requests will be trimmed accordingly.

func (*IntAlloc) Merge

func (ia *IntAlloc) Merge(other *IntAlloc)

Merge adds the acquired resources in other to ia. other must not be used after it has been merged. Merging allocs from different pools is illegal and will result in a panic.

func (*IntAlloc) Release

func (ia *IntAlloc) Release()

Release releases an IntAlloc back into the IntPool. It is safe to release into a closed pool.

func (*IntAlloc) String

func (ia *IntAlloc) String() string

String formats an IntAlloc as a string.

type IntPool

type IntPool struct {
    // contains filtered or unexported fields
}

IntPool manages allocating integer units of quota to clients. Clients may acquire quota in two ways: Acquire, which requires the client to specify the quantity of quota at call time, and AcquireFunc, which lets the client provide a function that decides whether a quantity of quota is sufficient once it becomes available.

func NewIntPool

func NewIntPool(name string, capacity uint64, options ...Option) *IntPool

NewIntPool creates a new named IntPool.

capacity is the amount of quota initially available.

func (*IntPool) Acquire

func (p *IntPool) Acquire(ctx context.Context, v uint64) (*IntAlloc, error)

Acquire acquires the specified amount of quota from the pool. On success, a non-nil alloc is returned and Release() must be called on it to return the quota to the pool.

If 'v' is greater than the total capacity of the pool, we instead try to acquire quota equal to the maximum capacity. If the maximum capacity is decreased while this request is ongoing, the request is again truncated to the maximum capacity.
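The truncation rule amounts to clamping the requested amount to the pool's capacity at the moment the request is evaluated. A small worked sketch follows; clampRequest is a hypothetical helper written for this example and is not part of the package.

```go
package main

import "fmt"

// clampRequest is a hypothetical helper that mirrors the documented rule:
// a request larger than the pool's capacity is truncated to that capacity.
func clampRequest(v, capacity uint64) uint64 {
	if v > capacity {
		return capacity
	}
	return v
}

func main() {
	fmt.Println(clampRequest(10, 7)) // request exceeds capacity: 7
	fmt.Println(clampRequest(3, 7))  // request fits: 3
	fmt.Println(clampRequest(10, 4)) // capacity shrank while waiting: 4
}
```

A consequence of this rule is that the Acquired() value of the returned alloc may be smaller than the v that was requested.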

Acquisitions of 0 return immediately with no error, even if the IntPool is closed.

Safe for concurrent use.

func (*IntPool) AcquireFunc

func (p *IntPool) AcquireFunc(ctx context.Context, f IntRequestFunc) (*IntAlloc, error)

AcquireFunc acquires a quantity of quota determined by a function which is called with a quantity of available quota.

An example use case for AcquireFunc is a pool of workers attempting to acquire resources to run a heterogeneous set of jobs. Imagine, for example, that we have a set of workers and a list of jobs which need to be run. The function might be used to choose the largest job which can be run with the currently available quota.

Code:

const quota = 7
const workers = 3
qp := NewIntPool("work units", quota)
type job struct {
    name string
    cost uint64
}
jobs := []*job{
    {name: "foo", cost: 3},
    {name: "bar", cost: 2},
    {name: "baz", cost: 4},
    {name: "qux", cost: 6},
    {name: "quux", cost: 3},
    {name: "quuz", cost: 3},
}
// sortJobs sorts the jobs in highest-to-lowest order with nil last.
sortJobs := func() {
    sort.Slice(jobs, func(i, j int) bool {
        ij, jj := jobs[i], jobs[j]
        if ij != nil && jj != nil {
            return ij.cost > jj.cost
        }
        return ij != nil
    })
}
// getJob finds the largest job which can be run with the current quota.
getJob := func(
    ctx context.Context, qp *IntPool,
) (j *job, alloc *IntAlloc, err error) {
    alloc, err = qp.AcquireFunc(ctx, func(
        ctx context.Context, pi PoolInfo,
    ) (took uint64, err error) {
        sortJobs()
        // There are no more jobs, take 0 and return.
        if jobs[0] == nil {
            return 0, nil
        }
        // Find the largest job which can be run.
        for i := range jobs {
            if jobs[i] == nil {
                break
            }
            if jobs[i].cost <= pi.Available {
                j, jobs[i] = jobs[i], nil
                return j.cost, nil
            }
        }
        return 0, ErrNotEnoughQuota
    })
    return j, alloc, err
}
runWorker := func(workerNum int) func(ctx context.Context) error {
    return func(ctx context.Context) error {
        for {
            j, alloc, err := getJob(ctx, qp)
            if err != nil {
                return err
            }
            if j == nil {
                return nil
            }
            alloc.Release()
        }
    }
}
g := ctxgroup.WithContext(context.Background())
for i := 0; i < workers; i++ {
    g.GoCtx(runWorker(i))
}
if err := g.Wait(); err != nil {
    panic(err)
}

func (*IntPool) ApproximateQuota

func (p *IntPool) ApproximateQuota() (q uint64)

ApproximateQuota reports approximately the amount of quota available in the pool.

func (*IntPool) Capacity

func (p *IntPool) Capacity() uint64

Capacity returns the amount of quota managed by this pool.

func (*IntPool) Close

func (p *IntPool) Close(reason string)

Close signals to all ongoing and subsequent acquisitions that the pool is closed and that an error should be returned.

Safe for concurrent use.

func (*IntPool) Len

func (p *IntPool) Len() int

Len returns the current length of the queue for this IntPool.

func (*IntPool) Release

func (p *IntPool) Release(allocs ...*IntAlloc)

Release will release allocs back to their pool. Allocs which are from p are merged into a single alloc before being added to avoid synchronizing on p multiple times. Allocs which did not come from p are released one at a time. It is legal to pass nil values in allocs.
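The batching described above can be sketched with plain counters. The pool and alloc types below are hypothetical stand-ins, and the syncs field exists only to count how often each pool would have to synchronize; the real implementation may differ.

```go
package main

import "fmt"

// pool is a hypothetical stand-in for IntPool. syncs counts how many times
// the pool had to be updated, standing in for lock acquisitions.
type pool struct {
	avail uint64
	syncs int
}

// alloc is a hypothetical stand-in for IntAlloc: n units owned by pool p.
type alloc struct {
	n uint64
	p *pool
}

// add returns n units to the pool in one synchronized step.
func (p *pool) add(n uint64) {
	p.syncs++
	p.avail += n
}

// release merges every alloc that belongs to p into one quantity and adds it
// in a single step. Allocs from other pools are returned one at a time, and
// nil entries are skipped.
func (p *pool) release(allocs ...*alloc) {
	var merged uint64
	for _, a := range allocs {
		switch {
		case a == nil:
			continue
		case a.p == p:
			merged += a.n
		default:
			a.p.add(a.n)
		}
	}
	if merged > 0 {
		p.add(merged)
	}
}

func main() {
	p, q := &pool{}, &pool{}
	p.release(&alloc{n: 2, p: p}, nil, &alloc{n: 3, p: p}, &alloc{n: 1, p: q})
	fmt.Println(p.avail, p.syncs) // 5 1
	fmt.Println(q.avail, q.syncs) // 1 1
}
```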

type IntRequestFunc

type IntRequestFunc func(ctx context.Context, p PoolInfo) (took uint64, err error)

IntRequestFunc is used to request a quantity of quota determined when quota is available rather than before requesting.

If the request is satisfied, the function returns the amount of quota consumed and no error. If the request is not satisfied because there is not enough quota currently available, ErrNotEnoughQuota is returned to cause the function to be called again when more quota becomes available; in that case took must be 0 (i.e. the request is not allowed to save some quota for later use). If any other error is returned, took must again be 0; the function will not be called again and the error will be returned from IntPool.AcquireFunc().
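The three outcomes of this contract (satisfied, retry later, fail permanently) can be simulated without the real pool. In the sketch below, poolInfo, requestFunc, and errNotEnoughQuota are local stand-ins for PoolInfo, IntRequestFunc, and ErrNotEnoughQuota, and offerUntilDone, simulate, and needExactly are hypothetical helpers; the loop only imitates a pool re-offering quota as availability changes.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local stand-ins that mirror the documented shapes of PoolInfo and
// IntRequestFunc. They are not the package's own types.
type poolInfo struct{ Available, Capacity uint64 }

type requestFunc func(ctx context.Context, p poolInfo) (took uint64, err error)

var errNotEnoughQuota = errors.New("not enough quota available")

// offerUntilDone imitates a pool that calls f once per availability snapshot.
// It stops at the first result that is not errNotEnoughQuota and reports how
// many calls were made.
func offerUntilDone(
	ctx context.Context, f requestFunc, capacity uint64, snapshots []uint64,
) (took uint64, calls int, err error) {
	for _, avail := range snapshots {
		calls++
		took, err = f(ctx, poolInfo{Available: avail, Capacity: capacity})
		if errors.Is(err, errNotEnoughQuota) {
			continue // took is 0 here; try again with the next snapshot
		}
		return took, calls, err
	}
	return 0, calls, errNotEnoughQuota
}

// simulate runs offerUntilDone with a background context.
func simulate(f requestFunc, capacity uint64, snapshots ...uint64) (uint64, int, error) {
	return offerUntilDone(context.Background(), f, capacity, snapshots)
}

// needExactly returns a request that wants exactly n units. It fails
// permanently when n can never fit within the pool's capacity.
func needExactly(n uint64) requestFunc {
	return func(_ context.Context, p poolInfo) (uint64, error) {
		if n > p.Capacity {
			return 0, fmt.Errorf("request for %d exceeds capacity %d", n, p.Capacity)
		}
		if n > p.Available {
			return 0, errNotEnoughQuota
		}
		return n, nil
	}
}

func main() {
	took, calls, err := simulate(needExactly(4), 8, 1, 3, 6)
	fmt.Println(took, calls, err) // 4 3 <nil>

	took, calls, err = simulate(needExactly(9), 8, 1, 3, 6)
	fmt.Println(took, calls, err != nil) // 0 1 true
}
```

Checking Capacity before Available lets a request give up immediately instead of waiting for quota that can never arrive.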

type Option

type Option interface {
    // contains filtered or unexported methods
}

Option is used to configure a QuotaPool.

func OnAcquisition

func OnAcquisition(f AcquisitionFunc) Option

OnAcquisition creates an Option to configure a callback upon acquisition. It is often useful for recording metrics.

func OnSlowAcquisition

func OnSlowAcquisition(threshold time.Duration, f SlowAcquisitionFunc) Option

OnSlowAcquisition creates an Option to configure a callback upon slow acquisitions. Only one OnSlowAcquisition may be in effect; if several are specified, only the last is used.
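The "last one wins" behavior falls out naturally when options are applied in order and each one overwrites the same fields. The sketch below uses function-valued options for brevity; config, option, onSlowAcquisition, and newConfig are hypothetical names, and the real Option is an interface with unexported methods whose internals are not shown on this page.

```go
package main

import (
	"fmt"
	"time"
)

// config is a hypothetical stand-in for a pool's internal configuration.
type config struct {
	slowThreshold time.Duration
	onSlow        func(poolName string)
}

// option is a hypothetical function-valued option.
type option func(*config)

// onSlowAcquisition overwrites any previously configured slow-acquisition
// callback and threshold.
func onSlowAcquisition(threshold time.Duration, f func(poolName string)) option {
	return func(c *config) {
		c.slowThreshold = threshold
		c.onSlow = f
	}
}

// newConfig applies the options in the order given.
func newConfig(opts ...option) config {
	var c config
	for _, o := range opts {
		o(&c)
	}
	return c
}

func main() {
	c := newConfig(
		onSlowAcquisition(time.Second, func(string) {}),
		onSlowAcquisition(5*time.Second, func(string) {}),
	)
	fmt.Println(c.slowThreshold) // 5s
}
```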

type PoolInfo

type PoolInfo struct {
    // Available is the amount of quota available to be consumed. This is the
    // maximum value that the `took` return value from IntRequestFunc can be set
    // to.
    // Note that Available can be 0. This happens when the IntRequestFunc is
    // called as a result of the pool's capacity decreasing.
    Available uint64

    // Capacity is the maximum capacity available in the pool. This can
    // decrease over time. It can be used to determine that the resources required
    // by a request will never be available.
    Capacity uint64
}

PoolInfo represents the information that the IntRequestFunc gets about the current quota pool conditions.

type QuotaPool

type QuotaPool struct {
    // contains filtered or unexported fields
}

QuotaPool is an abstract implementation of a pool that stores some unit of Resource. The basic idea is that it allows requests to acquire a quantity of Resource from the pool in FIFO order in a way that interacts well with context cancelation.

func New

func New(name string, initialResource Resource, options ...Option) *QuotaPool

New returns a new quota pool initialized with a given quota. The quota is capped at this amount, meaning that callers may return more quota than they acquired without ever making more than the quota capacity available.
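For an integer quota, the capping behavior can be pictured as a saturating add. The cappedQuota type below is a hypothetical illustration under that assumption, not the package's Resource implementation.

```go
package main

import "fmt"

// cappedQuota is a hypothetical integer quota that never exceeds capacity.
// The invariant avail <= capacity holds before and after every add.
type cappedQuota struct{ avail, capacity uint64 }

// add returns v units to the quota, saturating at capacity. Comparing v to
// the remaining headroom avoids overflow for very large v.
func (q *cappedQuota) add(v uint64) {
	if v > q.capacity-q.avail {
		q.avail = q.capacity
		return
	}
	q.avail += v
}

func main() {
	q := &cappedQuota{avail: 2, capacity: 5}
	q.add(2)
	fmt.Println(q.avail) // 4
	q.add(10)            // more than was ever acquired
	fmt.Println(q.avail) // 5, capped at capacity
}
```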

func (*QuotaPool) Acquire

func (qp *QuotaPool) Acquire(ctx context.Context, r Request) (err error)

Acquire attempts to fulfill the Request with Resource from the qp. Requests are serviced in a FIFO order; only a single request is ever being offered resources at a time. A Request will be offered the pool's current quantity of Resource until it is fulfilled or its context is canceled.

Safe for concurrent use.

func (*QuotaPool) Add

func (qp *QuotaPool) Add(v Resource)

Add adds the provided Resource back to the pool. The value will be merged with the existing resources in the QuotaPool if there are any.

Safe for concurrent use.

func (*QuotaPool) ApproximateQuota

func (qp *QuotaPool) ApproximateQuota(f func(Resource))

ApproximateQuota will report approximately the amount of quota available in the pool to f. The provided Resource must not be mutated.

func (*QuotaPool) Close

func (qp *QuotaPool) Close(reason string)

Close signals to all ongoing and subsequent acquisitions that they are free to return to their callers. They will receive an *ErrClosed which contains this reason.

Safe for concurrent use.

func (*QuotaPool) Len

func (qp *QuotaPool) Len() int

Len returns the current length of the queue for this QuotaPool.

type Request

type Request interface {

    // Acquire decides whether a Request can be fulfilled by a given quantity of
    // Resource.
    //
    // If it is not fulfilled it must not modify or retain the passed alloc.
    // If it is fulfilled, it should return any portion of the Alloc it does
    // not intend to use.
    //
    // It is up to the implementer to decide if it makes sense to return a
    // zero-valued, non-nil Resource or nil as unused when acquiring all of the
    // passed Resource. If nil is returned and there is a notion of acquiring a
    // zero-valued Resource unit from the pool then those acquisitions may need to
    // wait until the pool is non-empty before proceeding. Those zero-valued
    // acquisitions will still need to wait to be at the front of the queue. It
    // may make sense for implementers to special case zero-valued acquisitions
    // entirely as IntPool does.
    Acquire(context.Context, Resource) (fulfilled bool, unused Resource)
}

Request is an interface used to acquire quota from the pool. Request is responsible for subdividing a resource into the portion which is retained when the Request is fulfilled and the remainder.

type Resource

type Resource interface {

    // Merge combines other into the current resource.
    // After a Resource (other) is passed to Merge, the QuotaPool will never use
    // that Resource again. This behavior allows clients to pool instances of
    // Resources by creating Resource during Acquisition and destroying them in
    // Merge.
    Merge(other Resource)
}

Resource is an interface that represents a quantity which is being pooled and allocated. It is any quantity that can be subdivided and combined.

This library does not provide any concrete implementations of Resource but internally the *IntAlloc is used as a resource.
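To show how a Request subdivides a Resource, the sketch below declares local interfaces with the same method shapes as Request and Resource above and implements them for a simple token count. The resource and request interfaces are lowercase stand-ins, and tokens, takeN, and tryAcquire are hypothetical names for this example; the code does not use the package itself.

```go
package main

import (
	"context"
	"fmt"
)

// resource and request are local stand-ins with the documented method shapes.
type resource interface{ Merge(other resource) }

type request interface {
	Acquire(ctx context.Context, r resource) (fulfilled bool, unused resource)
}

// tokens is a hypothetical resource: a count of interchangeable units.
type tokens struct{ n uint64 }

// Merge folds other into t. other is not used again afterwards.
func (t *tokens) Merge(other resource) { t.n += other.(*tokens).n }

// takeN is a hypothetical request for exactly want units.
type takeN struct {
	want uint64
	got  *tokens
}

// Acquire keeps want units and hands back the remainder. When the offered
// resource is too small it leaves the resource untouched and unretained.
func (r *takeN) Acquire(_ context.Context, res resource) (bool, resource) {
	t := res.(*tokens)
	if t.n < r.want {
		return false, nil
	}
	r.got = &tokens{n: r.want}
	return true, &tokens{n: t.n - r.want}
}

var _ request = (*takeN)(nil)

// tryAcquire offers res to r once with a background context.
func tryAcquire(r request, res resource) (bool, resource) {
	return r.Acquire(context.Background(), res)
}

func main() {
	small := &takeN{want: 4}
	ok, unused := tryAcquire(small, &tokens{n: 10})
	fmt.Println(ok, small.got.n, unused.(*tokens).n) // true 4 6

	big := &takeN{want: 7}
	ok, _ = tryAcquire(big, unused)
	fmt.Println(ok) // false: only 6 units are on offer

	unused.Merge(small.got) // returning the acquired units
	fmt.Println(unused.(*tokens).n) // 10
}
```

Returning a zero-valued remainder rather than nil when everything is taken is one of the choices the Request comment leaves to the implementer; this sketch always returns a non-nil remainder on success.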

type SlowAcquisitionFunc

type SlowAcquisitionFunc func(
    ctx context.Context, poolName string, r Request, start time.Time,
) (onAcquire func())

SlowAcquisitionFunc is used to configure a quotapool to call a function when quota acquisition is slow. The returned callback is called when the acquisition occurs.
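The two-stage shape (one call when the wait becomes slow, a returned callback for when the acquisition finally happens) can be sketched with a timer and a channel. waitWithSlowHook is a hypothetical helper, and its simplified callback signature drops the ctx, poolName, Request, and start arguments that the real SlowAcquisitionFunc receives.

```go
package main

import (
	"fmt"
	"time"
)

// waitWithSlowHook blocks until acquired is closed. If threshold elapses
// first, it calls slow once and remembers the returned onAcquire callback,
// which it invokes when the acquisition finally happens.
func waitWithSlowHook(
	acquired <-chan struct{}, threshold time.Duration, slow func() (onAcquire func()),
) {
	timer := time.NewTimer(threshold)
	defer timer.Stop()
	var onAcquire func()
	for {
		select {
		case <-acquired:
			if onAcquire != nil {
				onAcquire()
			}
			return
		case <-timer.C:
			// The timer fires at most once, so slow is called at most once.
			onAcquire = slow()
		}
	}
}

func main() {
	acquired := make(chan struct{})
	start := time.Now()
	waitWithSlowHook(acquired, time.Millisecond, func() func() {
		fmt.Println("acquisition is slow")
		// Simulate quota arriving shortly after the slow threshold.
		go func() {
			time.Sleep(time.Millisecond)
			close(acquired)
		}()
		return func() {
			fmt.Println("acquired after at least", time.Since(start) >= time.Millisecond)
		}
	})
}
```

Returning the completion callback from the slow hook lets the hook pair its "still waiting" log line with a matching "finally acquired" line without any shared state.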

Package quotapool imports 11 packages and is imported by 3 packages. Updated 2019-08-17.