controller

package
v0.0.0-...-a6ee7d6
Published: Jun 15, 2023 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Constants

const (
	FromPeriodReport = "period_report"
	FromLowRU        = "low_ru"
)

Source List

Inf is the infinite rate limit; it allows all events (even if burst is zero).

const InfDuration = time.Duration(1<<63 - 1)

InfDuration is the duration returned by Delay when a Reservation is not OK.

Variables

This section is empty.

Functions

func WaitReservations

func WaitReservations(ctx context.Context, now time.Time, reservations []*Reservation) (time.Duration, error)

WaitReservations processes a series of reservations so that, if any one reservation fails, all limiter tokens are returned.
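The all-or-nothing behavior described above can be sketched with stand-in types. This is a minimal illustration and not the package's implementation: bucket, reservation, longestDelay and errNotAllowed are hypothetical names, and the sketch only computes the delay instead of sleeping on a context.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// bucket is a hypothetical stand-in for a Limiter; it only tracks a token balance.
type bucket struct{ tokens float64 }

// reservation is a hypothetical stand-in for *Reservation.
type reservation struct {
	from  *bucket
	n     float64
	delay time.Duration
	ok    bool
}

// reserve takes n tokens when the balance allows it; otherwise it returns a
// failed reservation and leaves the balance untouched.
func (b *bucket) reserve(n float64, delay time.Duration) *reservation {
	if b.tokens < n {
		return &reservation{from: b, n: n}
	}
	b.tokens -= n
	return &reservation{from: b, n: n, delay: delay, ok: true}
}

// cancel returns the reserved tokens; it does nothing for a failed reservation.
func (r *reservation) cancel() {
	if r.ok {
		r.from.tokens += r.n
		r.ok = false
	}
}

var errNotAllowed = errors.New("reservation not allowed")

// longestDelay returns the longest delay among rs. If any reservation failed,
// it cancels all of them so that no tokens stay reserved.
func longestDelay(rs []*reservation) (time.Duration, error) {
	var longest time.Duration
	for _, r := range rs {
		if !r.ok {
			for _, c := range rs {
				c.cancel()
			}
			return 0, errNotAllowed
		}
		if r.delay > longest {
			longest = r.delay
		}
	}
	return longest, nil
}

func main() {
	a, b := &bucket{tokens: 10}, &bucket{tokens: 1}
	rs := []*reservation{a.reserve(4, time.Millisecond), b.reserve(5, 0)}
	_, err := longestDelay(rs)
	// The second reservation fails, so the 4 tokens taken from a are returned.
	fmt.Println(err, a.tokens, b.tokens)
}
```

Cancelling every reservation, including the ones that succeeded, is what keeps a partial failure from leaking tokens.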

Types

type Config

type Config struct {
	// RU model config
	ReadBaseCost   RequestUnit
	ReadBytesCost  RequestUnit
	WriteBaseCost  RequestUnit
	WriteBytesCost RequestUnit
	CPUMsCost      RequestUnit

	DegradedModeWaitDuration time.Duration
	// contains filtered or unexported fields
}

Config is the configuration of the resource units, which gives the read/write request units or request resource cost standards. It should be calculated by a given `RequestUnitConfig` or `RequestResourceConfig`.

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns the default configuration.

func GenerateConfig

func GenerateConfig(config *ControllerConfig) *Config

GenerateConfig generates the configuration by the given request unit configuration.

type ControllerConfig

type ControllerConfig struct {
	// DegradedModeWaitDuration is the wait duration that controls whether the resource control client enables degraded mode when the server is disconnected.
	DegradedModeWaitDuration string `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"`

	// RequestUnit is the configuration that determines the coefficients of the RRU and WRU cost.
	// This configuration should be modified carefully.
	RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"`
}

ControllerConfig is the configuration of the resource manager controller, which includes the options the client needs.

func DefaultControllerConfig

func DefaultControllerConfig() *ControllerConfig

DefaultControllerConfig returns the default resource manager controller configuration.

type KVCalculator

type KVCalculator struct {
	*Config
}

KVCalculator is used to calculate the KV-side consumption.

func (*KVCalculator) AfterKVRequest

func (kc *KVCalculator) AfterKVRequest(consumption *rmpb.Consumption, req RequestInfo, res ResponseInfo)

AfterKVRequest ...

func (*KVCalculator) BeforeKVRequest

func (kc *KVCalculator) BeforeKVRequest(consumption *rmpb.Consumption, req RequestInfo)

BeforeKVRequest ...

func (*KVCalculator) Trickle

func (kc *KVCalculator) Trickle(*rmpb.Consumption)

Trickle ...

type Limit

type Limit float64

Limit defines the maximum frequency of some events. Limit is represented as number of events per second. A zero Limit allows no events.

func Every

func Every(interval time.Duration) Limit

Every converts a minimum time interval between events to a Limit.
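As a rough sketch of the conversion, a minimum interval of d between events corresponds to 1/d events per second. The code below uses local stand-ins (limit, inf, every) and assumes, as golang.org/x/time/rate does, that a non-positive interval maps to the infinite rate; that behavior is not confirmed for this package.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// limit is a local stand-in for Limit: events per second.
type limit float64

// inf is a local stand-in for Inf (assumed to be the largest float64).
const inf = limit(math.MaxFloat64)

// every converts a minimum interval between events into a rate.
// Assumption: a non-positive interval means "no limit".
func every(interval time.Duration) limit {
	if interval <= 0 {
		return inf
	}
	return 1 / limit(interval.Seconds())
}

func main() {
	fmt.Println(every(2 * time.Second))        // one event every 2s is 0.5 events/s
	fmt.Println(every(100 * time.Millisecond)) // roughly 10 events/s
	fmt.Println(every(0) == inf)
}
```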

type Limiter

type Limiter struct {
	// contains filtered or unexported fields
}

A Limiter controls how frequently events are allowed to happen. It implements a "token bucket" of size b, initially full and refilled at rate r tokens per second. Informally, in any large enough time interval, the Limiter limits the rate to r tokens per second, with a maximum burst size of b events. As a special case, if r == Inf (the infinite rate), b is ignored. See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets.

The zero value is a valid Limiter, but it will reject all events. Use NewLimiter to create non-zero Limiters.

Limiter has one main method, Reserve. If no token is available, Reserve returns a reservation for a future token and the amount of time the caller must wait before using it.

Changes to the meaning of burst (b):

  • If b == 0, the limiter has unlimited capacity. This is the default in the resource controller (burst at a rate within an unlimited capacity).
  • If b < 0, the limiter has unlimited capacity and r is ignored; this can be seen as r == Inf (burst within an unlimited capacity).
  • If b > 0, the limiter has limited capacity.
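The three burst cases above can be sketched with a simplified token bucket. This is an illustration under stated assumptions, not the package's Limiter: the bucket type and its advance, allow and refillAfter helpers are hypothetical, and locking and notifications are left out.

```go
package main

import (
	"fmt"
	"time"
)

// bucket is a hypothetical, simplified token bucket.
type bucket struct {
	rate   float64 // tokens added per second (r)
	burst  int64   // capacity rule (b), interpreted as in the list above
	tokens float64
	last   time.Time
}

// advance refills the bucket for the time elapsed up to now.
// Only b > 0 caps the balance; b == 0 lets tokens accumulate without limit.
func (b *bucket) advance(now time.Time) {
	elapsed := now.Sub(b.last)
	if elapsed <= 0 {
		return
	}
	b.tokens += elapsed.Seconds() * b.rate
	if b.burst > 0 && b.tokens > float64(b.burst) {
		b.tokens = float64(b.burst)
	}
	b.last = now
}

// allow reports whether n tokens can be taken at now, and takes them if so.
// With b < 0 the rate is ignored and every request is allowed.
func (b *bucket) allow(now time.Time, n float64) bool {
	if b.burst < 0 {
		return true
	}
	b.advance(now)
	if b.tokens < n {
		return false
	}
	b.tokens -= n
	return true
}

// refillAfter returns the balance of an initially empty bucket after d elapses.
func refillAfter(rate float64, burst int64, d time.Duration) float64 {
	start := time.Unix(0, 0)
	b := &bucket{rate: rate, burst: burst, last: start}
	b.advance(start.Add(d))
	return b.tokens
}

func main() {
	fmt.Println(refillAfter(10, 5, 10*time.Second)) // b > 0: capped at 5
	fmt.Println(refillAfter(10, 0, 10*time.Second)) // b == 0: grows to 100
	free := &bucket{burst: -1}
	fmt.Println(free.allow(time.Now(), 1e9)) // b < 0: always allowed
}
```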

func NewLimiter

func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotifyChan chan<- struct{}) *Limiter

NewLimiter returns a new Limiter that allows events up to rate r and permits bursts of at most b tokens.

func NewLimiterWithCfg

func NewLimiterWithCfg(now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotifyChan chan<- struct{}) *Limiter

NewLimiterWithCfg returns a new Limiter that allows events up to rate r and permits bursts of at most b tokens.

func (*Limiter) AvailableTokens

func (lim *Limiter) AvailableTokens(now time.Time) float64

AvailableTokens returns the number of tokens currently available.

func (*Limiter) GetBurst

func (lim *Limiter) GetBurst() int64

GetBurst returns the burst size of the limiter.

func (*Limiter) IsLowTokens

func (lim *Limiter) IsLowTokens() bool

IsLowTokens reports whether the limiter is low on tokens.

func (*Limiter) Limit

func (lim *Limiter) Limit() Limit

Limit returns the maximum overall event rate.

func (*Limiter) Reconfigure

func (lim *Limiter) Reconfigure(now time.Time,
	args tokenBucketReconfigureArgs,
	opts ...LimiterOption,
)

Reconfigure modifies all settings of the limiter.

func (*Limiter) RemoveTokens

func (lim *Limiter) RemoveTokens(now time.Time, amount float64)

RemoveTokens decreases the amount of tokens currently available.

func (*Limiter) Reserve

func (lim *Limiter) Reserve(ctx context.Context, waitDuration time.Duration, now time.Time, n float64) *Reservation

Reserve returns a Reservation that indicates how long the caller must wait before n events happen. The Limiter takes this Reservation into account when allowing future events. The returned Reservation’s OK() method returns false if the required wait exceeds the maximum wait duration or the deadline. Usage example:

r := lim.Reserve(ctx, waitDuration, time.Now(), 1)
if !r.OK() {
  // Not allowed to act: the tokens cannot be granted within the maximum wait time.
  return
}
time.Sleep(r.Delay())
Act()

Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events.
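To make the OK/Delay contract concrete, the following sketch computes the wait for a token deficit and rejects waits longer than a maximum. It is a simplified model, not the package's Reserve: reserveDelay and infDuration are hypothetical local names, and context handling and state updates are omitted.

```go
package main

import (
	"fmt"
	"time"
)

// infDuration mirrors InfDuration: the delay reported when a reservation is not OK.
const infDuration = time.Duration(1<<63 - 1)

// reserveDelay returns how long a caller must wait for n tokens given the
// current balance and refill rate (tokens per second). It reports ok == false
// when the wait would exceed maxWait.
func reserveDelay(tokens, rate, n float64, maxWait time.Duration) (time.Duration, bool) {
	deficit := n - tokens
	if deficit <= 0 {
		return 0, true // enough tokens: act immediately
	}
	if rate <= 0 {
		return infDuration, false // the deficit can never be refilled
	}
	seconds := deficit / rate
	if seconds > maxWait.Seconds() {
		return infDuration, false
	}
	return time.Duration(seconds * float64(time.Second)), true
}

func main() {
	if d, ok := reserveDelay(0, 10, 5, time.Second); ok {
		fmt.Println("wait", d)
	}
	if _, ok := reserveDelay(0, 10, 50, time.Second); !ok {
		fmt.Println("rejected")
	}
}
```

Comparing in seconds before converting to time.Duration avoids overflowing the integer duration when the deficit is very large.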

func (*Limiter) ResetRemainingNotifyTimes

func (lim *Limiter) ResetRemainingNotifyTimes()

func (*Limiter) SetupNotificationThreshold

func (lim *Limiter) SetupNotificationThreshold(now time.Time, threshold float64)

SetupNotificationThreshold enables the notification at the given threshold.
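One common way to deliver such a notification over a chan<- struct{} (the type of lowTokensNotifyChan in NewLimiter) is a non-blocking send, so the limiter never stalls on a slow receiver. The sketch below assumes that pattern; notifier and maybeNotify are hypothetical names and the package's actual logic may differ.

```go
package main

import "fmt"

// notifier is a hypothetical holder for a low-token threshold and its channel.
type notifier struct {
	threshold float64
	ch        chan<- struct{}
}

// maybeNotify signals the channel when tokens fall below the threshold.
// The send is non-blocking: if a signal is already pending, this one is dropped.
func (n *notifier) maybeNotify(tokens float64) bool {
	if n.threshold <= 0 || tokens >= n.threshold {
		return false
	}
	select {
	case n.ch <- struct{}{}:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan struct{}, 1)
	n := &notifier{threshold: 10, ch: ch}
	fmt.Println(n.maybeNotify(20)) // above the threshold: no signal
	fmt.Println(n.maybeNotify(5))  // below the threshold: signal sent
	fmt.Println(n.maybeNotify(3))  // a signal is already pending: dropped
	<-ch
}
```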

type LimiterOption

type LimiterOption func(*Limiter)

type RequestInfo

type RequestInfo interface {
	IsWrite() bool
	WriteBytes() uint64
	StoreID() uint64
}

RequestInfo is the interface of the request information provider. A request should be able to tell whether it is a write request and, if so, how many bytes it writes.

type RequestUnit

type RequestUnit float64

RequestUnit is the basic unit of the resource request management, which has two types:

  • RRU: read request unit
  • WRU: write request unit

type RequestUnitConfig

type RequestUnitConfig struct {
	// ReadBaseCost is the base cost for a read request. No matter how many bytes are read/written or
	// how much CPU time a request takes, this cost is inevitable.
	ReadBaseCost float64 `toml:"read-base-cost" json:"read-base-cost"`
	// ReadCostPerByte is the cost for each byte read. It's 1 RU = 64 KiB by default.
	ReadCostPerByte float64 `toml:"read-cost-per-byte" json:"read-cost-per-byte"`
	// WriteBaseCost is the base cost for a write request. No matter how many bytes are read/written or
	// how much CPU time a request takes, this cost is inevitable.
	WriteBaseCost float64 `toml:"write-base-cost" json:"write-base-cost"`
	// WriteCostPerByte is the cost for each byte written. It's 1 RU = 1 KiB by default.
	WriteCostPerByte float64 `toml:"write-cost-per-byte" json:"write-cost-per-byte"`
	// CPUMsCost is the cost for each millisecond of CPU time taken.
	// It's 1 RU = 3 milliseconds by default.
	CPUMsCost float64 `toml:"read-cpu-ms-cost" json:"read-cpu-ms-cost"`
}

RequestUnitConfig is the configuration of the request units, which determines the coefficients of the RRU and WRU cost. This configuration should be modified carefully.
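A worked example may help show how such coefficients combine into a cost. The sketch below assumes a simple linear model (base cost plus per-byte cost plus CPU cost); ruConfig, readCost and writeCost are hypothetical names, and the two base costs are made-up values, while the per-byte and CPU ratios follow the field comments above.

```go
package main

import (
	"fmt"
	"time"
)

// ruConfig mirrors the field names of RequestUnitConfig for illustration only.
type ruConfig struct {
	ReadBaseCost     float64
	ReadCostPerByte  float64
	WriteBaseCost    float64
	WriteCostPerByte float64
	CPUMsCost        float64
}

// exampleConfig uses the ratios stated in the field comments.
// The two base costs are hypothetical values chosen for this example.
var exampleConfig = ruConfig{
	ReadBaseCost:     0.5,               // hypothetical
	ReadCostPerByte:  1.0 / (64 * 1024), // 1 RU = 64 KiB read
	WriteBaseCost:    1.0,               // hypothetical
	WriteCostPerByte: 1.0 / 1024,        // 1 RU = 1 KiB written
	CPUMsCost:        1.0 / 3,           // 1 RU = 3 ms of CPU time
}

// readCost is an assumed linear model: base + bytes read + CPU time.
func readCost(c ruConfig, readBytes uint64, cpu time.Duration) float64 {
	cpuMs := float64(cpu) / float64(time.Millisecond)
	return c.ReadBaseCost + c.ReadCostPerByte*float64(readBytes) + c.CPUMsCost*cpuMs
}

// writeCost is an assumed linear model: base + bytes written.
func writeCost(c ruConfig, writeBytes uint64) float64 {
	return c.WriteBaseCost + c.WriteCostPerByte*float64(writeBytes)
}

func main() {
	// 128 KiB read with 3 ms of CPU: 0.5 + 2 + 1 RU.
	fmt.Printf("read:  %.2f RU\n", readCost(exampleConfig, 128*1024, 3*time.Millisecond))
	// 4 KiB written: 1 + 4 RU.
	fmt.Printf("write: %.2f RU\n", writeCost(exampleConfig, 4*1024))
}
```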

func DefaultRequestUnitConfig

func DefaultRequestUnitConfig() RequestUnitConfig

DefaultRequestUnitConfig returns the default request unit configuration.

type Reservation

type Reservation struct {
	// contains filtered or unexported fields
}

A Reservation holds information about events that are permitted by a Limiter to happen after a delay. A Reservation may be canceled, which may enable the Limiter to permit additional events.

func (*Reservation) CancelAt

func (r *Reservation) CancelAt(now time.Time)

CancelAt indicates that the reservation holder will not perform the reserved action and returns the reserved tokens, which are refilled into the limiter.

func (*Reservation) Delay

func (r *Reservation) Delay() time.Duration

Delay is shorthand for DelayFrom(time.Now()).

func (*Reservation) DelayFrom

func (r *Reservation) DelayFrom(now time.Time) time.Duration

DelayFrom returns the duration for which the reservation holder must wait before taking the reserved action. Zero duration means act immediately. InfDuration means the limiter cannot grant the tokens requested in this Reservation within the maximum wait time.
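The three outcomes described above (a positive wait, zero, or InfDuration) can be sketched as follows. The reservation struct and its ok and timeToAct fields are hypothetical stand-ins for the unexported state of Reservation, modeled on golang.org/x/time/rate.

```go
package main

import (
	"fmt"
	"time"
)

// infDuration mirrors InfDuration.
const infDuration = time.Duration(1<<63 - 1)

// reservation is a hypothetical stand-in for the unexported Reservation state.
type reservation struct {
	ok        bool
	timeToAct time.Time
}

// delayFrom returns how long the holder must wait, measured from now.
func (r reservation) delayFrom(now time.Time) time.Duration {
	if !r.ok {
		return infDuration // tokens cannot be granted in time
	}
	if d := r.timeToAct.Sub(now); d > 0 {
		return d
	}
	return 0 // the time to act has already passed: act immediately
}

func main() {
	now := time.Now()
	pending := reservation{ok: true, timeToAct: now.Add(50 * time.Millisecond)}
	fmt.Println(pending.delayFrom(now))
	fmt.Println(pending.delayFrom(now.Add(time.Second)))
	fmt.Println(reservation{}.delayFrom(now) == infDuration)
}
```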

func (*Reservation) OK

func (r *Reservation) OK() bool

OK returns whether the limiter can provide the requested number of tokens within the maximum wait time. If OK is false, Delay returns InfDuration, and CancelAt does nothing.

type ResourceCalculator

type ResourceCalculator interface {
	// Trickle is used to calculate the resource consumption periodically rather than on the request path.
	// It's mainly used to calculate costs such as the SQL CPU cost.
	// Need to check if it is a serverless environment
	Trickle(*rmpb.Consumption)
	// BeforeKVRequest is used to calculate the resource consumption before the KV request.
	// It's mainly used to calculate the base and write request cost.
	BeforeKVRequest(*rmpb.Consumption, RequestInfo)
	// AfterKVRequest is used to calculate the resource consumption after the KV request.
	// It's mainly used to calculate the read request cost and KV CPU cost.
	AfterKVRequest(*rmpb.Consumption, RequestInfo, ResponseInfo)
}

ResourceCalculator is used to calculate the resource consumption of a request.
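The split between the two request-path hooks can be sketched as a small accounting flow: write cost is charged up front, read cost is charged once the response size is known, and the write cost is paid back if the request did not succeed (as the ResponseInfo comment on Succeed describes). Everything here is a simplified assumption: consumption stands in for rmpb.Consumption, and calc with its before and after methods and coefficients is hypothetical.

```go
package main

import "fmt"

// consumption is a simplified stand-in for rmpb.Consumption.
type consumption struct{ RRU, WRU float64 }

// calc holds hypothetical cost coefficients.
type calc struct {
	readBase, readPerByte   float64
	writeBase, writePerByte float64
}

// writeCost is the WRU charged for a write of the given size.
func (c calc) writeCost(writeBytes uint64) float64 {
	return c.writeBase + c.writePerByte*float64(writeBytes)
}

// before charges what is known before the request is sent:
// the full write cost for writes, the base read cost for reads.
func (c calc) before(cons *consumption, isWrite bool, writeBytes uint64) {
	if isWrite {
		cons.WRU += c.writeCost(writeBytes)
		return
	}
	cons.RRU += c.readBase
}

// after charges the bytes read and pays back the write cost of a failed write.
func (c calc) after(cons *consumption, isWrite bool, writeBytes, readBytes uint64, succeed bool) {
	cons.RRU += c.readPerByte * float64(readBytes)
	if isWrite && !succeed {
		cons.WRU -= c.writeCost(writeBytes)
	}
}

func main() {
	c := calc{readBase: 0.5, readPerByte: 1.0 / 1024, writeBase: 1, writePerByte: 1.0 / 1024}
	var cons consumption
	c.before(&cons, true, 2048)
	fmt.Println("WRU after charging:", cons.WRU)
	c.after(&cons, true, 2048, 0, false)
	fmt.Println("WRU after failed write:", cons.WRU)
}
```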

type ResourceControlCreateOption

type ResourceControlCreateOption func(controller *ResourceGroupsController)

ResourceControlCreateOption creates a ResourceGroupsController with the optional settings.

func EnableSingleGroupByKeyspace

func EnableSingleGroupByKeyspace() ResourceControlCreateOption

EnableSingleGroupByKeyspace is the option to enable single group by keyspace feature.

func WithMaxWaitDuration

func WithMaxWaitDuration(d time.Duration) ResourceControlCreateOption

WithMaxWaitDuration is the option to set the max wait duration for acquiring token buckets.
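ResourceControlCreateOption follows Go's functional options pattern: each option is a function that mutates the controller during construction. The sketch below shows the pattern with local stand-ins; controller, createOption, newController, withMaxWait, enableSingleGroup and the defaultMaxWait value are hypothetical and do not reflect the package's fields or defaults.

```go
package main

import (
	"fmt"
	"time"
)

// defaultMaxWait is a hypothetical default used only in this example.
const defaultMaxWait = 30 * time.Second

// controller is a hypothetical stand-in for ResourceGroupsController.
type controller struct {
	maxWait               time.Duration
	singleGroupByKeyspace bool
}

// createOption mirrors the shape of ResourceControlCreateOption.
type createOption func(*controller)

// withMaxWait sets the maximum time to wait when acquiring tokens.
func withMaxWait(d time.Duration) createOption {
	return func(c *controller) { c.maxWait = d }
}

// enableSingleGroup turns on the single-group-by-keyspace behavior.
func enableSingleGroup() createOption {
	return func(c *controller) { c.singleGroupByKeyspace = true }
}

// newController applies the defaults first and then each option in order.
func newController(opts ...createOption) *controller {
	c := &controller{maxWait: defaultMaxWait}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newController(withMaxWait(5*time.Second), enableSingleGroup())
	fmt.Println(c.maxWait, c.singleGroupByKeyspace)
}
```

Because options are applied in order, a later option overrides an earlier one that touches the same field.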

type ResourceGroupKVInterceptor

type ResourceGroupKVInterceptor interface {
	// OnRequestWait is used to check whether the resource group has enough tokens. It may need to wait for some time.
	OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, error)
	// OnResponse is used to consume tokens after receiving the response.
	OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error)
}

ResourceGroupKVInterceptor is used as the quota limit controller for resource groups that use the KV store.

type ResourceGroupProvider

type ResourceGroupProvider interface {
	ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error)
	GetResourceGroup(ctx context.Context, resourceGroupName string) (*rmpb.ResourceGroup, error)
	AddResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
	ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
	DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error)
	AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error)
	LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]tm.GlobalConfigItem, int64, error)
}

ResourceGroupProvider provides APIs to interact with the resource manager server.

type ResourceGroupsController

type ResourceGroupsController struct {
	// contains filtered or unexported fields
}

ResourceGroupsController implements ResourceGroupKVInterceptor.

func NewResourceGroupController

func NewResourceGroupController(
	ctx context.Context,
	clientUniqueID uint64,
	provider ResourceGroupProvider,
	requestUnitConfig *RequestUnitConfig,
	opts ...ResourceControlCreateOption,
) (*ResourceGroupsController, error)

NewResourceGroupController returns a new ResourceGroupsController, which implements ResourceGroupKVInterceptor.

func (*ResourceGroupsController) CheckResourceGroupExist

func (c *ResourceGroupsController) CheckResourceGroupExist(name string) bool

CheckResourceGroupExist checks if groupsController map {rg.name -> resource group controller} contains name. Used for test only.

func (*ResourceGroupsController) GetConfig

func (c *ResourceGroupsController) GetConfig() *Config

GetConfig returns the config of controller. It's only used for test.

func (*ResourceGroupsController) OnRequestWait

func (c *ResourceGroupsController) OnRequestWait(
	ctx context.Context, resourceGroupName string, info RequestInfo,
) (*rmpb.Consumption, *rmpb.Consumption, error)

OnRequestWait is used to check whether the resource group has enough tokens. It may need to wait for some time.

func (*ResourceGroupsController) OnResponse

func (c *ResourceGroupsController) OnResponse(
	resourceGroupName string, req RequestInfo, resp ResponseInfo,
) (*rmpb.Consumption, error)

OnResponse is used to consume tokens after receiving the response.

func (*ResourceGroupsController) Start

Start starts the ResourceGroupsController service.

func (*ResourceGroupsController) Stop

func (c *ResourceGroupsController) Stop() error

Stop stops the ResourceGroupsController service.

type ResponseInfo

type ResponseInfo interface {
	ReadBytes() uint64
	KVCPU() time.Duration
	// Succeed is used to tell whether the request is successfully returned.
	// If not, we need to pay back the WRU cost of the request.
	Succeed() bool
}

ResponseInfo is the interface of the response information provider. A response should be able to tell how many bytes it read and how much KV CPU time it consumed.

type SQLCalculator

type SQLCalculator struct {
	*Config
}

SQLCalculator is used to calculate the SQL-side consumption.

func (*SQLCalculator) AfterKVRequest

func (dsc *SQLCalculator) AfterKVRequest(consumption *rmpb.Consumption, req RequestInfo, res ResponseInfo)

AfterKVRequest ...

func (*SQLCalculator) BeforeKVRequest

func (dsc *SQLCalculator) BeforeKVRequest(consumption *rmpb.Consumption, req RequestInfo)

BeforeKVRequest ...

func (*SQLCalculator) Trickle

func (dsc *SQLCalculator) Trickle(consumption *rmpb.Consumption)

Trickle updates the SQL layer CPU consumption.

type TestRequestInfo

type TestRequestInfo struct {
	// contains filtered or unexported fields
}

TestRequestInfo is used to test the request info interface.

func NewTestRequestInfo

func NewTestRequestInfo(isWrite bool, writeBytes uint64, storeID uint64) *TestRequestInfo

NewTestRequestInfo creates a new TestRequestInfo.

func (*TestRequestInfo) IsWrite

func (tri *TestRequestInfo) IsWrite() bool

IsWrite implements the RequestInfo interface.

func (*TestRequestInfo) StoreID

func (tri *TestRequestInfo) StoreID() uint64

StoreID implements the RequestInfo interface.

func (*TestRequestInfo) WriteBytes

func (tri *TestRequestInfo) WriteBytes() uint64

WriteBytes implements the RequestInfo interface.

type TestResponseInfo

type TestResponseInfo struct {
	// contains filtered or unexported fields
}

TestResponseInfo is used to test the response info interface.

func NewTestResponseInfo

func NewTestResponseInfo(readBytes uint64, kvCPU time.Duration, succeed bool) *TestResponseInfo

func (*TestResponseInfo) KVCPU

func (tri *TestResponseInfo) KVCPU() time.Duration

KVCPU implements the ResponseInfo interface.

func (*TestResponseInfo) ReadBytes

func (tri *TestResponseInfo) ReadBytes() uint64

ReadBytes implements the ResponseInfo interface.

func (*TestResponseInfo) Succeed

func (tri *TestResponseInfo) Succeed() bool

Succeed implements the ResponseInfo interface.
