scheduler

package
v2.34.0
Warning

This package is not in the latest version of its module.

Published: Jan 30, 2024 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	NumFairnessQueues = 1 << 8
)

Memory pool for heapRequest(s).

Functions

This section is empty.

Types

type BasicTokenBucket

type BasicTokenBucket struct {
	// contains filtered or unexported fields
}

BasicTokenBucket is a basic token bucket implementation.

func NewBasicTokenBucket

func NewBasicTokenBucket(clk clockwork.Clock, fillRate float64, metrics *TokenBucketMetrics) *BasicTokenBucket

NewBasicTokenBucket creates a new BasicTokenBucket with adjusted fill rate.

func (*BasicTokenBucket) GetFillRate

func (btb *BasicTokenBucket) GetFillRate() float64

GetFillRate returns the fill rate of the BasicTokenBucket.

func (*BasicTokenBucket) GetPassThrough

func (btb *BasicTokenBucket) GetPassThrough() bool

GetPassThrough returns the passThrough flag of the BasicTokenBucket.

func (*BasicTokenBucket) PreprocessRequest

func (btb *BasicTokenBucket) PreprocessRequest(_ context.Context, request *Request) bool

PreprocessRequest decides whether to proactively accept a request.

func (*BasicTokenBucket) Return

func (btb *BasicTokenBucket) Return(_ context.Context, tokens float64, _ string)

Return returns tokens to the basic token bucket.

func (*BasicTokenBucket) SetFillRate

func (btb *BasicTokenBucket) SetFillRate(fillRate float64)

SetFillRate adjusts the fill rate of the BasicTokenBucket.

func (*BasicTokenBucket) SetPassThrough

func (btb *BasicTokenBucket) SetPassThrough(passThrough bool)

SetPassThrough sets the passThrough flag of the BasicTokenBucket.

func (*BasicTokenBucket) Take

Take takes tokens from the basic token bucket even if fewer tokens are available than requested. If the tokens are not available immediately, it returns the wait time, along with a flag indicating whether the operation succeeded.

func (*BasicTokenBucket) TakeIfAvailable

func (btb *BasicTokenBucket) TakeIfAvailable(_ context.Context, tokens float64) (bool, time.Duration, float64, float64, string)

TakeIfAvailable takes tokens from the basic token bucket if they are available; otherwise it returns false.
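The difference between Take and TakeIfAvailable can be illustrated with a small standalone sketch. The `bucket` and `fakeClock` types below are hypothetical and are not the package's BasicTokenBucket; the capacity cap and the way the wait time is derived from the deficit are assumptions made for illustration only.

```go
package main

import (
	"fmt"
	"time"
)

// fakeClock is a hand-rolled stand-in for an injectable clock.
type fakeClock struct{ now time.Time }

// advance moves the fake clock forward by the given number of seconds.
func (c *fakeClock) advance(seconds float64) {
	c.now = c.now.Add(time.Duration(seconds * float64(time.Second)))
}

// bucket is a hypothetical, simplified token bucket. It is not the
// package's BasicTokenBucket.
type bucket struct {
	clk       *fakeClock
	fillRate  float64   // tokens added per second
	capacity  float64   // upper bound on the balance
	available float64   // current balance; negative means tokens are owed
	last      time.Time // time of the last refill
}

func newBucket(clk *fakeClock, fillRate, capacity, initial float64) *bucket {
	return &bucket{clk: clk, fillRate: fillRate, capacity: capacity, available: initial, last: clk.now}
}

// refill credits tokens for the time elapsed since the last refill.
func (b *bucket) refill() {
	now := b.clk.now
	b.available += now.Sub(b.last).Seconds() * b.fillRate
	if b.available > b.capacity {
		b.available = b.capacity
	}
	b.last = now
}

// takeIfAvailable deducts tokens only when the balance covers them.
func (b *bucket) takeIfAvailable(tokens float64) bool {
	b.refill()
	if b.available < tokens {
		return false
	}
	b.available -= tokens
	return true
}

// take deducts tokens even if the balance goes negative and reports how
// long the caller should wait for the balance to climb back to zero.
func (b *bucket) take(tokens float64) (bool, time.Duration) {
	b.refill()
	if b.fillRate <= 0 && b.available < tokens {
		return false, 0 // without a fill rate the debt could never be repaid
	}
	b.available -= tokens
	if b.available >= 0 {
		return true, 0
	}
	wait := time.Duration(-b.available / b.fillRate * float64(time.Second))
	return true, wait
}

func main() {
	demoClock := &fakeClock{}
	tb := newBucket(demoClock, 4, 8, 2)
	fmt.Println(tb.takeIfAvailable(4)) // balance is 2, so the request is refused
	allowed, delay := tb.take(4)       // balance drops to -2 at 4 tokens per second
	fmt.Println(allowed, delay)
}
```

In this sketch a caller that can queue uses `take` and sleeps for the returned duration, while a caller that must decide immediately uses `takeIfAvailable`.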

type GlobalTokenBucket

type GlobalTokenBucket struct {
	// contains filtered or unexported fields
}

GlobalTokenBucket is a distributed rate-limiter token bucket implementation.

func NewGlobalTokenBucket

func NewGlobalTokenBucket(key string, limiter ratelimiter.RateLimiter) *GlobalTokenBucket

NewGlobalTokenBucket creates a new instance of GlobalTokenBucket.

func (*GlobalTokenBucket) GetPassThrough

func (gtb *GlobalTokenBucket) GetPassThrough() bool

GetPassThrough returns the passthrough value.

func (*GlobalTokenBucket) PreprocessRequest

func (gtb *GlobalTokenBucket) PreprocessRequest(_ context.Context, request *Request) bool

PreprocessRequest is a no-op.

func (*GlobalTokenBucket) Return

func (gtb *GlobalTokenBucket) Return(ctx context.Context, tokens float64, _ string)

Return returns tokens.

func (*GlobalTokenBucket) SetPassThrough

func (gtb *GlobalTokenBucket) SetPassThrough(passthrough bool)

SetPassThrough sets the passthrough value.

func (*GlobalTokenBucket) Take

Take takes tokens.

func (*GlobalTokenBucket) TakeIfAvailable

func (gtb *GlobalTokenBucket) TakeIfAvailable(ctx context.Context, tokens float64) (bool, time.Duration, float64, float64, string)

TakeIfAvailable takes tokens if available.

type GlobalTokenCounter added in v2.29.0

type GlobalTokenCounter struct {
	// contains filtered or unexported fields
}

GlobalTokenCounter is a distributed token counter implementation backed by a concurrency limiter.

func NewGlobalTokenCounter added in v2.29.0

func NewGlobalTokenCounter(key string, limiter concurrencylimiter.ConcurrencyLimiter) *GlobalTokenCounter

NewGlobalTokenCounter creates a new instance of GlobalTokenCounter.

func (*GlobalTokenCounter) GetPassThrough added in v2.29.0

func (gtc *GlobalTokenCounter) GetPassThrough() bool

GetPassThrough returns the passthrough value.

func (*GlobalTokenCounter) PreprocessRequest added in v2.29.0

func (gtc *GlobalTokenCounter) PreprocessRequest(_ context.Context, request *Request) bool

PreprocessRequest is a no-op.

func (*GlobalTokenCounter) Return added in v2.29.0

func (gtc *GlobalTokenCounter) Return(ctx context.Context, tokens float64, reqID string)

Return returns tokens.

func (*GlobalTokenCounter) SetPassThrough added in v2.29.0

func (gtc *GlobalTokenCounter) SetPassThrough(passthrough bool)

SetPassThrough sets the passthrough value.

func (*GlobalTokenCounter) Take added in v2.29.0

Take takes tokens.

func (*GlobalTokenCounter) TakeIfAvailable added in v2.29.0

func (gtc *GlobalTokenCounter) TakeIfAvailable(ctx context.Context, tokens float64) (bool, time.Duration, float64, float64, string)

TakeIfAvailable takes tokens if available.

type LoadMultiplierTokenBucket

type LoadMultiplierTokenBucket struct {
	// contains filtered or unexported fields
}

LoadMultiplierTokenBucket is a token bucket with load multiplier.

func NewLoadMultiplierTokenBucket

func NewLoadMultiplierTokenBucket(
	clk clockwork.Clock,
	slotCount uint8,
	slotDuration time.Duration,
	lmGauge prometheus.Gauge,
	tbMetrics *TokenBucketMetrics,
) *LoadMultiplierTokenBucket

NewLoadMultiplierTokenBucket creates a new LoadMultiplierTokenBucket.

func (*LoadMultiplierTokenBucket) GetPassThrough

func (tbls *LoadMultiplierTokenBucket) GetPassThrough() bool

GetPassThrough gets the value of the passThrough flag.

func (*LoadMultiplierTokenBucket) LoadMultiplier

func (tbls *LoadMultiplierTokenBucket) LoadMultiplier() float64

LoadMultiplier returns the current load multiplier.

func (*LoadMultiplierTokenBucket) PreprocessRequest

func (tbls *LoadMultiplierTokenBucket) PreprocessRequest(_ context.Context, request *Request) bool

PreprocessRequest preprocesses a request and decides whether to proactively accept it.

func (*LoadMultiplierTokenBucket) Return

func (tbls *LoadMultiplierTokenBucket) Return(_ context.Context, tokens float64, _ string)

Return returns tokens to the token bucket.

func (*LoadMultiplierTokenBucket) SetContinuousTracking

func (tbls *LoadMultiplierTokenBucket) SetContinuousTracking(continuousTracking bool)

SetContinuousTracking sets whether to continuously track the token rate and adjust the fill rate based on load multiplier.

func (*LoadMultiplierTokenBucket) SetLoadDecisionValues added in v2.5.0

func (tbls *LoadMultiplierTokenBucket) SetLoadDecisionValues(loadDecision *policysyncv1.LoadDecision)

SetLoadDecisionValues sets the load multiplier from the given load decision. A multiplier of 0 accepts no load, 1 accepts up to 100% of the current load, and 2 accepts up to 200% of the current load.
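The multiplier semantics amount to simple scaling arithmetic. The helper below is a hypothetical sketch: `allowedTokenRate` is not part of the package, and treating the admitted rate as the observed token rate times the multiplier is an assumption drawn from the description above.

```go
package main

import "fmt"

// allowedTokenRate is a hypothetical helper: it scales the observed token
// rate by the load multiplier to obtain the rate that would be admitted.
// Negative multipliers are clamped to zero (an assumption of this sketch).
func allowedTokenRate(observedRate, loadMultiplier float64) float64 {
	if loadMultiplier < 0 {
		loadMultiplier = 0
	}
	return observedRate * loadMultiplier
}

func main() {
	// With 200 tokens/s of observed load, print the admitted rate for a
	// few multipliers.
	for _, lm := range []float64{0, 0.5, 1, 2} {
		fmt.Printf("multiplier %.1f -> %.0f tokens/s\n", lm, allowedTokenRate(200, lm))
	}
}
```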

func (*LoadMultiplierTokenBucket) SetPassThrough

func (tbls *LoadMultiplierTokenBucket) SetPassThrough(passThrough bool)

SetPassThrough sets the passThrough flag, which decides whether to pass requests through.

func (*LoadMultiplierTokenBucket) Take

Take takes tokens from the token bucket even if fewer tokens are available than requested. If the tokens are not available immediately, it returns the wait time, along with a flag indicating whether the operation succeeded.

func (*LoadMultiplierTokenBucket) TakeIfAvailable

func (tbls *LoadMultiplierTokenBucket) TakeIfAvailable(_ context.Context, tokens float64) (bool, time.Duration, float64, float64, string)

TakeIfAvailable takes tokens from the token bucket if they are available; otherwise it returns false.

type PreemptionMetrics added in v2.31.0

type PreemptionMetrics struct {
	// contains filtered or unexported fields
}

PreemptionMetrics holds metrics related to preemption and delay for a queuing system.

func NewPreemptionMetrics added in v2.31.0

func NewPreemptionMetrics(
	metricsLabels prometheus.Labels,
	workloadPreemptedTokensSummary *prometheus.SummaryVec,
	workloadDelayedTokensSummary *prometheus.SummaryVec,
	workloadOnTimeCounter *prometheus.CounterVec,
) *PreemptionMetrics

NewPreemptionMetrics creates a new PreemptionMetrics object.

type Request

type Request struct {
	WorkloadLabel string  // for identifying workload
	FairnessLabel string  // for enforcing fairness
	Tokens        float64 // tokens (e.g. expected latency or complexity) for this request
	InvPriority   float64 // larger values represent higher priority
}

Request is the metadata for a request in a flow that is to be allowed or dropped based on controlled delay and queue limits.

func NewRequest added in v2.5.0

func NewRequest(workloadLabel, fairnessLabel string, tokens float64, invPriority float64) *Request

NewRequest calculates the inverse priority and returns a new Request.

type Scheduler

type Scheduler interface {
	// Schedule sends RequestContext to the underlying scheduler and returns a boolean value,
	// where true means accept and false means reject.
	Schedule(ctx context.Context, request *Request) (accept bool, remaining float64, current float64, reqID string)
	// Info returns the last access time and number of requests that are currently in the queue.
	Info() (time.Time, int)
	// Identifiers returns flowID and workloadID for the given request
	Identifiers(workloadLabel, fairnessLabel string, priority float64, generation uint64) (string, string)
}

Scheduler : Interface for schedulers.

func NewWFQScheduler

func NewWFQScheduler(clk clockwork.Clock, tokenManger TokenManager, metrics *WFQMetrics, metricsLabels prometheus.Labels) Scheduler

NewWFQScheduler creates a new weighted fair queue scheduler.
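For readers unfamiliar with weighted fair queuing, the sketch below shows the textbook idea of ordering requests by virtual finish time. It is not the package's implementation: the `request` type and `dispatchOrder` function are hypothetical, and reading the inverse priority as a per-token cost multiplier (smaller values are served sooner) is an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

// request is a hypothetical, trimmed-down stand-in for the package's Request.
type request struct {
	flow        string  // flow the request belongs to
	tokens      float64 // cost of the request
	invPriority float64 // assumed reciprocal of priority: smaller is served sooner
}

// dispatchOrder returns the flows in the order a weighted fair queue would
// serve the given backlog, using virtual finish times.
func dispatchOrder(backlog []request) []string {
	type tagged struct {
		flow   string
		finish float64
	}
	lastFinish := map[string]float64{} // last virtual finish time per flow
	tags := make([]tagged, 0, len(backlog))
	for _, r := range backlog {
		// Each request starts where the previous one in its flow finished.
		finish := lastFinish[r.flow] + r.tokens*r.invPriority
		lastFinish[r.flow] = finish
		tags = append(tags, tagged{r.flow, finish})
	}
	// Serve requests in increasing order of virtual finish time.
	sort.SliceStable(tags, func(i, j int) bool { return tags[i].finish < tags[j].finish })
	order := make([]string, len(tags))
	for i, t := range tags {
		order[i] = t.flow
	}
	return order
}

func main() {
	backlog := []request{
		{"batch", 10, 1.0}, {"batch", 10, 1.0},
		{"interactive", 10, 0.4}, {"interactive", 10, 0.4}, {"interactive", 10, 0.4},
	}
	fmt.Println(dispatchOrder(backlog))
}
```

With this backlog the interactive flow is served twice before the first batch request, yet the batch flow is not starved: its first request is dispatched ahead of the third interactive one.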

type TokenBucketMetrics

type TokenBucketMetrics struct {
	FillRateGauge        prometheus.Gauge
	BucketCapacityGauge  prometheus.Gauge
	AvailableTokensGauge prometheus.Gauge
}

TokenBucketMetrics holds metrics related to internals of TokenBucket.

type TokenManager

type TokenManager interface {
	// Take tokens if available, otherwise return false
	TakeIfAvailable(ctx context.Context, tokens float64) (allowed bool, waitTime time.Duration, remaining float64, current float64, requestID string)
	// Take tokens even if available tokens are less than asked - returns wait time if tokens are not available immediately. The other return value conveys whether the operation was successful or not.
	Take(ctx context.Context, tokens float64) (allowed bool, waitTime time.Duration, remaining float64, current float64, requestID string)
	// Return tokens, useful when requests choose to drop themselves on timeout or cancellation
	Return(ctx context.Context, tokens float64, requestID string)
	// Provides TokenManager with the request that the scheduler is processing. Some TokenManager implementations use this level of visibility for their algorithms. The return value decides whether the request has to be accepted right away in case TokenManager is not yet ready or is configured to accept all traffic (short circuit).
	PreprocessRequest(ctx context.Context, request *Request) (accept bool)
}

TokenManager : Interface for token managers.
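To show the shape of an implementation, here is a minimal sketch of a type that satisfies this interface. `Request` and `TokenManager` are copied from the listing so the example compiles on its own; `fixedBudget` is hypothetical, and the values it reports for `remaining` (budget left) and `current` (tokens in use) are guesses at the intended meaning, which the documentation does not define.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Request and TokenManager are copied from the listing so that the sketch
// compiles on its own.
type Request struct {
	WorkloadLabel string
	FairnessLabel string
	Tokens        float64
	InvPriority   float64
}

type TokenManager interface {
	TakeIfAvailable(ctx context.Context, tokens float64) (allowed bool, waitTime time.Duration, remaining float64, current float64, requestID string)
	Take(ctx context.Context, tokens float64) (allowed bool, waitTime time.Duration, remaining float64, current float64, requestID string)
	Return(ctx context.Context, tokens float64, requestID string)
	PreprocessRequest(ctx context.Context, request *Request) (accept bool)
}

// fixedBudget is a hypothetical TokenManager that hands out tokens from a
// fixed, non-refilling budget.
type fixedBudget struct {
	mu          sync.Mutex
	remaining   float64 // tokens still available
	inUse       float64 // tokens handed out and not yet returned
	passThrough bool    // when true, PreprocessRequest accepts everything
}

// Compile-time check that fixedBudget satisfies the interface.
var _ TokenManager = (*fixedBudget)(nil)

func (f *fixedBudget) TakeIfAvailable(_ context.Context, tokens float64) (bool, time.Duration, float64, float64, string) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if tokens > f.remaining {
		return false, 0, f.remaining, f.inUse, ""
	}
	f.remaining -= tokens
	f.inUse += tokens
	return true, 0, f.remaining, f.inUse, ""
}

// Take falls back to TakeIfAvailable: a budget that never refills cannot
// promise a wait time (a simplification of this sketch).
func (f *fixedBudget) Take(ctx context.Context, tokens float64) (bool, time.Duration, float64, float64, string) {
	return f.TakeIfAvailable(ctx, tokens)
}

// Return gives tokens back, for example after a timeout or cancellation.
func (f *fixedBudget) Return(_ context.Context, tokens float64, _ string) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.remaining += tokens
	f.inUse -= tokens
}

// PreprocessRequest short-circuits acceptance only in pass-through mode.
func (f *fixedBudget) PreprocessRequest(_ context.Context, _ *Request) bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.passThrough
}

func main() {
	ctx := context.Background()
	tm := &fixedBudget{remaining: 10}
	allowed, _, remaining, current, _ := tm.TakeIfAvailable(ctx, 4)
	fmt.Println(allowed, remaining, current)
	tm.Return(ctx, 4, "")
	fmt.Println(tm.PreprocessRequest(ctx, &Request{Tokens: 1}))
}
```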

type WFQMetrics

type WFQMetrics struct {
	IncomingTokensCounter          prometheus.Counter
	AcceptedTokensCounter          prometheus.Counter
	RejectedTokensCounter          prometheus.Counter
	RequestInQueueDurationSummary  *prometheus.SummaryVec
	WorkloadPreemptedTokensSummary *prometheus.SummaryVec
	WorkloadDelayedTokensSummary   *prometheus.SummaryVec
	WorkloadOnTimeCounter          *prometheus.CounterVec
	FairnessPreemptedTokensSummary *prometheus.SummaryVec
	FairnessDelayedTokensSummary   *prometheus.SummaryVec
	FairnessOnTimeCounter          *prometheus.CounterVec
}

WFQMetrics holds metrics related to internal workings of WFQScheduler.

type WFQScheduler

type WFQScheduler struct {
	// contains filtered or unexported fields
}

WFQScheduler : Weighted Fair Queue Scheduler.

func (*WFQScheduler) GetPendingFlows

func (sched *WFQScheduler) GetPendingFlows() int

GetPendingFlows returns the number of flows in the scheduler.

func (*WFQScheduler) GetPendingRequests

func (sched *WFQScheduler) GetPendingRequests() int

GetPendingRequests returns the number of requests in the scheduler.

func (*WFQScheduler) Identifiers added in v2.31.0

func (sched *WFQScheduler) Identifiers(workloadLabel, fairnessLabel string, priority float64, generation uint64) (string, string)

Identifiers computes fairnessQueueID by hashing fairnessLabel and doing a bitwise AND with the number of fairness queues. It constructs workloadID by concatenating workloadLabel, priority, and generation, and flowID by concatenating workloadID and fairnessQueueID.
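The hashing step can be sketched as follows. Everything here is illustrative: the FNV-1a hash, masking with `numFairnessQueues - 1` (the usual way to map a hash onto a power-of-two number of queues), and the underscore-separated ID format are assumptions, not the package's actual choices.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// numFairnessQueues mirrors NumFairnessQueues (1 << 8) from the Variables
// section.
const numFairnessQueues = 1 << 8

// fairnessQueueID hashes the label and keeps the low bits, so the result is
// always in [0, numFairnessQueues). The hash function and the mask are
// assumptions of this sketch.
func fairnessQueueID(fairnessLabel string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(fairnessLabel)) // Write on a hash.Hash never returns an error
	return h.Sum64() & (numFairnessQueues - 1)
}

// identifiers builds the two IDs. The separator and number formatting are
// illustrative only.
func identifiers(workloadLabel, fairnessLabel string, priority float64, generation uint64) (flowID, workloadID string) {
	workloadID = fmt.Sprintf("%s_%v_%d", workloadLabel, priority, generation)
	flowID = fmt.Sprintf("%s_%d", workloadID, fairnessQueueID(fairnessLabel))
	return flowID, workloadID
}

func main() {
	flowID, workloadID := identifiers("checkout", "user-a", 2, 7)
	fmt.Println(workloadID)
	fmt.Println(flowID)
}
```

Because the fairness label is reduced to a fixed number of queue IDs, distinct labels can share a queue; that bounds the number of flows per workload at the cost of occasional collisions.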

func (*WFQScheduler) Info

func (sched *WFQScheduler) Info() (time.Time, int)

Info returns the last access time and number of requests that are currently in the queue.

func (*WFQScheduler) Schedule

func (sched *WFQScheduler) Schedule(ctx context.Context, request *Request) (bool, float64, float64, string)

Schedule blocks until the request is scheduled or until it times out. The first return value is true if the request is accepted and false if it is rejected.

type WindowedCounter

type WindowedCounter struct {
	// contains filtered or unexported fields
}

WindowedCounter is a token bucket with a windowed counter.

func NewWindowedCounter

func NewWindowedCounter(clk clockwork.Clock, totalSlots uint8, slotDuration time.Duration) *WindowedCounter

NewWindowedCounter creates a new WindowedCounter with an extra slot for the current window.

func (*WindowedCounter) AddTokens

func (counter *WindowedCounter) AddTokens(request *Request) bool

AddTokens adds tokens to the counter. The return value is true when the counter has shifted slots and all the slots in the counter are valid.

func (*WindowedCounter) CalculateTokenRate

func (counter *WindowedCounter) CalculateTokenRate() float64

CalculateTokenRate returns the calculated token rate in the current window.

func (*WindowedCounter) IsBootstrapping

func (counter *WindowedCounter) IsBootstrapping() bool

IsBootstrapping checks whether the counter is in bootstrapping mode.
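A slotted window counter of this kind can be sketched as below. The `windowedCounter` type is hypothetical and simplified: time is passed in as plain seconds to avoid clock plumbing, and the definitions of "bootstrapping" (not enough completed slots yet) and of the token rate (sum of completed slots over their total duration) are assumptions rather than the package's exact behavior.

```go
package main

import "fmt"

// windowedCounter is a hypothetical sliding-window counter: it keeps
// totalSlots completed slots plus one slot that is still being filled.
type windowedCounter struct {
	slots       []float64 // completed slots, oldest first
	totalSlots  int
	slotSeconds float64
	current     float64 // tokens in the slot still being filled
	slotStart   float64 // start time of the current slot, in seconds
}

func newWindowedCounter(totalSlots int, slotSeconds float64) *windowedCounter {
	return &windowedCounter{totalSlots: totalSlots, slotSeconds: slotSeconds}
}

// isBootstrapping reports whether the window is still missing completed slots.
func (c *windowedCounter) isBootstrapping() bool {
	return len(c.slots) < c.totalSlots
}

// addTokens records tokens at time now (in seconds). It returns true when
// the call closed at least one slot and the window is fully populated.
func (c *windowedCounter) addTokens(now, tokens float64) bool {
	shifted := false
	// Close every slot whose duration has fully elapsed.
	for now-c.slotStart >= c.slotSeconds {
		c.slots = append(c.slots, c.current)
		if len(c.slots) > c.totalSlots {
			c.slots = c.slots[1:] // drop the oldest slot
		}
		c.current = 0
		c.slotStart += c.slotSeconds
		shifted = true
	}
	c.current += tokens
	return shifted && !c.isBootstrapping()
}

// tokenRate returns tokens per second averaged over the completed slots.
func (c *windowedCounter) tokenRate() float64 {
	if len(c.slots) == 0 {
		return 0
	}
	sum := 0.0
	for _, s := range c.slots {
		sum += s
	}
	return sum / (float64(len(c.slots)) * c.slotSeconds)
}

func main() {
	wc := newWindowedCounter(2, 1.0)
	wc.addTokens(0.0, 10)
	wc.addTokens(0.5, 10)
	wc.addTokens(1.2, 30)
	fmt.Println(wc.isBootstrapping())
	fmt.Println(wc.addTokens(2.1, 5), wc.tokenRate())
}
```

Excluding the in-progress slot from the rate keeps a partially filled slot from dragging the estimate down, which is one plausible reason for the extra slot mentioned above.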

type WorkloadState added in v2.31.0

type WorkloadState struct {
	// contains filtered or unexported fields
}

WorkloadState holds the state of a workload.
