v1.2.0 · Published: Aug 13, 2019 · License: Apache-2.0

loadshed

Proportional HTTP request rejection based on system load.

This library can shed load either from a service, via an HTTP middleware, or from calls to a dependent service, via an HTTP transport.

Options

This package exports a middleware via the middleware.New() method that returns a func(http.Handler) http.Handler, which should be compatible with virtually any mux implementation or middleware chain management tool. By default, the generated middleware is a passthrough. Load shedding based on system metrics is enabled by passing a load shedder, configured with the options below, to the constructor.

This package also exports a transport via the transport.New() method that returns a func(http.RoundTripper) http.RoundTripper, which should be compatible with any RoundTripper management tool. By default, the generated RoundTripper is a passthrough. Load shedding based on system metrics is enabled by passing a load shedder, configured with the options below, to the constructor.

In addition to the loadshed options, both the middleware and the transport accept their own options for callbacks and error codes. These have been incorporated in the examples below.

CPU

The CPU option enables rejection of new requests based on CPU usage of the host. The example below is for a middleware, with a callback option and cpu loadshedder:


import (
  "net/http"
  "time"

  "github.com/asecurityteam/loadshed"
  loadshedmiddleware "github.com/asecurityteam/loadshed/wrappers/middleware"
)

var lowerThreshold = 0.6
var upperThreshold = 0.8
var pollingInterval = time.Second
var windowSize = 10
var middleware = loadshedmiddleware.New(
  loadshed.New(
    loadshed.CPU(lowerThreshold, upperThreshold, pollingInterval, windowSize)),
  loadshedmiddleware.Callback(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    // Handle rejected requests here.
  })),
)

The above configures the load shedding middleware to record a 10-second rolling window of CPU usage data. As long as the average CPU usage within the window is below the lowerThreshold value, all new requests pass through to the wrapped handler. Once the rolling average exceeds the lowerThreshold, the middleware begins rejecting requests with a 503 response at a rate proportional to the distance of the average between the two thresholds. Once the value exceeds the upperThreshold, all new requests are rejected until it lowers again.
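
The proportional behaviour can be sketched in a few lines of Go. rejectionRate below is a hypothetical helper illustrating the documented behaviour, not the library's internal code:

```go
package main

import "fmt"

// rejectionRate illustrates the documented proportional rejection: below the
// lower threshold nothing is rejected, above the upper threshold everything
// is, and in between the rate scales linearly with the distance of the value
// between the thresholds. Hypothetical helper, not the library's code.
func rejectionRate(value, lower, upper float64) float64 {
	if value < lower {
		return 0
	}
	if value > upper {
		return 1
	}
	return (value - lower) / (upper - lower)
}

func main() {
	fmt.Printf("%.2f\n", rejectionRate(0.5, 0.6, 0.8)) // 0.00: below lower
	fmt.Printf("%.2f\n", rejectionRate(0.7, 0.6, 0.8)) // 0.50: halfway between
	fmt.Printf("%.2f\n", rejectionRate(0.9, 0.6, 0.8)) // 1.00: above upper
}
```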

Concurrency

The Concurrency option enables rejections of new requests when there are too many requests currently in flight.


import (
  "github.com/asecurityteam/loadshed"
  loadshedmiddleware "github.com/asecurityteam/loadshed/wrappers/middleware"
)

var lowerThreshold = 2500
var upperThreshold = 5000
var wg = loadshed.NewWaitGroup()
var middleware = loadshedmiddleware.New(
  loadshed.New(loadshed.Concurrency(lowerThreshold, upperThreshold, wg)),
)

The above configures the load shedding middleware to track in-flight requests being handled by the server. The middleware will begin rejecting a proportional number of new requests between the lower and upper thresholds like the CPU option above.

For convenience, this package exposes a wrapper around sync.WaitGroup from the standard library that makes it compatible with the metric aggregation system used by the middleware. The loadshed.WaitGroup.Add() method is called on every new request and the corresponding Done() as each request completes. This is intended to act as a drop-in replacement for graceful shutdown uses of sync.WaitGroup.
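
The wrapper's role can be illustrated with a stdlib-only miniature. countingWaitGroup below mimics the described behaviour (tracking in-flight operations alongside sync.WaitGroup); it is an illustrative stand-in, not the library's implementation:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countingWaitGroup mirrors, in miniature, what the documented
// loadshed.WaitGroup adds over sync.WaitGroup: it tracks the current number
// of in-flight operations so that value can feed a load metric.
// Illustrative stand-in, not the library's implementation.
type countingWaitGroup struct {
	sync.WaitGroup
	inFlight int64
}

func (c *countingWaitGroup) Add(delta int) {
	atomic.AddInt64(&c.inFlight, int64(delta))
	c.WaitGroup.Add(delta)
}

func (c *countingWaitGroup) Done() {
	atomic.AddInt64(&c.inFlight, -1)
	c.WaitGroup.Done()
}

// Current reports the number of operations started but not yet completed.
func (c *countingWaitGroup) Current() int64 {
	return atomic.LoadInt64(&c.inFlight)
}

func main() {
	var wg countingWaitGroup
	wg.Add(3)                 // three requests begin
	fmt.Println(wg.Current()) // prints 3
	wg.Done()                 // one request completes
	fmt.Println(wg.Current()) // prints 2
	wg.Done()
	wg.Done()
	wg.Wait()                 // returns immediately: all work is done
	fmt.Println(wg.Current()) // prints 0
}
```

Because Add and Done keep sync.WaitGroup's semantics, the same value still works for graceful shutdown via Wait().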

AverageLatency

The AverageLatency option enables rejection of new requests when the average latency of all requests within a rolling time window is too high.


import (
  "net/http"
  "time"

  "github.com/asecurityteam/loadshed"
  loadshedmiddleware "github.com/asecurityteam/loadshed/wrappers/middleware"
)

var lowerThreshold = .2
var upperThreshold = 1.0
var bucketSize = time.Second
var buckets = 10
var preallocationHint = 2000
var requiredPoints = 100 // example value
var middleware = loadshedmiddleware.New(
  loadshed.New(
    loadshed.AverageLatency(lowerThreshold, upperThreshold, bucketSize, buckets, preallocationHint, requiredPoints)),
  loadshedmiddleware.Callback(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    // Handle rejected requests here.
  })),
)

The above configures the load shedding middleware to track the duration of handling requests. It records the information into bucketSize segments of time and keeps a rolling window of buckets number of segments. The above, for example, keeps a 10 second rolling window with a granularity of 1 second intervals.

The upper and lower thresholds are the time, in fractional seconds, that it takes to execute the wrapped handler. As the average latency of all requests in the window grows beyond the lower threshold then the middleware will begin rejecting new requests. If the latency exceeds the upper threshold then all new requests will be rejected until the average drops again. This will happen over time either as outliers expire or until the entire window has rolled.

The requiredPoints value sets the minimum number of data points recorded in the window before the filter takes effect. This is to help ensure that a sufficient number of data points are collected to satisfy the aggregate before a service begins denying new requests.

The preallocationHint is an optional optimisation for the internals of the rolling window. It should be set to the projected number of data points that will be contained within each bucket of the window. For example, the above service expects to receive approximately 2,000 requests per second. This value is only an optimisation and can be left as 0 if the projected rate is not known.
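
The bucketed window the preceding paragraphs describe can be sketched as follows. rollingWindow is an illustrative toy, not the library's rolling package: values land in the newest bucket, and aggregation spans only the most recent buckets:

```go
package main

import "fmt"

// rollingWindow sketches a bucketed rolling window: record() appends to the
// newest time segment, roll() starts a new segment and discards the oldest
// once the window is full. Illustrative only, not the library's code.
type rollingWindow struct {
	buckets [][]float64
	size    int
}

func newRollingWindow(size int) *rollingWindow {
	return &rollingWindow{buckets: [][]float64{{}}, size: size}
}

// roll starts a new time segment, discarding the oldest when full.
func (w *rollingWindow) roll() {
	w.buckets = append(w.buckets, []float64{})
	if len(w.buckets) > w.size {
		w.buckets = w.buckets[1:]
	}
}

func (w *rollingWindow) record(v float64) {
	w.buckets[len(w.buckets)-1] = append(w.buckets[len(w.buckets)-1], v)
}

// average aggregates across every bucket still inside the window.
func (w *rollingWindow) average() float64 {
	var sum float64
	var n int
	for _, b := range w.buckets {
		for _, v := range b {
			sum += v
			n++
		}
	}
	if n == 0 {
		return 0
	}
	return sum / float64(n)
}

func main() {
	w := newRollingWindow(3)
	w.record(1.0)
	w.roll()
	w.record(2.0)
	w.roll()
	w.record(3.0)
	fmt.Println(w.average()) // prints 2: all three buckets in the window
	w.roll()
	w.record(4.0)
	fmt.Println(w.average()) // prints 3: the bucket holding 1.0 rolled out
}
```

This is why outliers "expire" over time: once their bucket leaves the window, they stop contributing to the aggregate.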

PercentileLatency

The PercentileLatency option works exactly the same as the AverageLatency option except that it is based on a rolling percentile calculation rather than an average.


import (
  "time"

  "github.com/asecurityteam/loadshed"
  loadshedmiddleware "github.com/asecurityteam/loadshed/wrappers/middleware"
)

var lowerThreshold = .2
var upperThreshold = 1.0
var bucketSize = time.Second
var buckets = 10
var preallocationHint = 2000
var requiredPoints = 100 // example value
var percentile = 95.0
var middleware = loadshedmiddleware.New(
  loadshed.New(
    loadshed.PercentileLatency(lowerThreshold, upperThreshold, bucketSize, buckets, preallocationHint, requiredPoints, percentile)),
)

ErrorRate

The ErrorRate option enables rejection of new requests when the error rate of all requests within a rolling time window is too high. This requires the transport/middleware ErrorCodes option to specify which response codes count toward the error rate.

import (
  "net/http"
  "time"

  "github.com/asecurityteam/loadshed"
  loadshedtransport "github.com/asecurityteam/loadshed/wrappers/transport"
)

var lowerThreshold = .2
var upperThreshold = 1.0
var bucketSize = time.Second
var buckets = 10
var preallocationHint = 2000
var requiredPoints = 100 // example value
var transport = loadshedtransport.New(
  loadshed.New(
    loadshed.ErrorRate(lowerThreshold, upperThreshold, bucketSize, buckets, preallocationHint, requiredPoints)),
  loadshedtransport.Callback(func(r *http.Request) {}),
  loadshedtransport.ErrorCodes([]int{400, 404, 500, 501, 502, 503}),
)

Aggregator

The Aggregator enables injection of custom metrics that are not already included in this package. The option relies on the Aggregator interface provided by github.com/asecurityteam/rolling and the given aggregator must return a value that is a percentage of requests to reject between 0.0 and 1.0.

// Inject a random amount of chaos when the system is not under load.
type chaosAggregator struct {
  amount float64
}

func (a *chaosAggregator) Aggregate() float64 {
  return a.amount
}

// use it with a middleware
var middleware = loadshedmiddleware.New(
  loadshed.New(
    loadshed.Aggregator(&chaosAggregator{.01})),
)

// or use it with a Transport
var transport = loadshedtransport.New(
  loadshed.New(
    loadshed.Aggregator(&chaosAggregator{.01})),
)

Contributors

Pull requests, issues and comments welcome. For pull requests:

  • Add tests for new features and bug fixes
  • Follow the existing style
  • Separate unrelated changes into multiple pull requests

See the existing issues for things to start contributing.

For bigger changes, make sure you start a discussion first by creating an issue and explaining the intended change.

Atlassian requires contributors to sign a Contributor License Agreement, known as a CLA. This serves as a record stating that the contributor is entitled to contribute the code/documentation/translation to the project and is willing to have it used in distributions and derivative works (or is willing to transfer ownership).

Prior to accepting your contributions we ask that you please follow the appropriate link below to digitally sign the CLA. The Corporate CLA is for those who are contributing as a member of an organization and the individual CLA is for those contributing as an individual.

License

Copyright (c) 2017 Atlassian and others. Apache 2.0 licensed, see LICENSE.txt file.

Documentation


Types

type Doer

type Doer interface {
	Do(func() error) error
}

Doer is the interface, with a single Do method, implemented by load shedders.

type Loadshed

type Loadshed struct {
	// contains filtered or unexported fields
}

Loadshed contains the configured aggregators and rejects a percentage of requests based on an aggregation of system load data.

func New

func New(options ...Option) *Loadshed

New generates a Loadshed struct that sheds load based on some definition of system load.

func (*Loadshed) Do

func (l *Loadshed) Do(runfn func() error) error

Do executes the given function, rejecting the call with a Rejected error when load shedding is in effect.

type Option

type Option func(*Loadshed) *Loadshed

Option is a partial initializer for Loadshed
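
This is the functional options pattern. The sketch below mirrors the shape with hypothetical names (config, option, and newConfig stand in for Loadshed, Option, and New; they are not this library's identifiers):

```go
package main

import "fmt"

// config stands in for the Loadshed struct; option mirrors the documented
// Option shape: a partial initializer that receives and returns the struct.
// Illustrative only - these names are not the library's.
type config struct{ aggregators []string }

type option func(*config) *config

// withAggregator returns an option that appends one aggregator by name.
func withAggregator(name string) option {
	return func(c *config) *config {
		c.aggregators = append(c.aggregators, name)
		return c
	}
}

// newConfig mirrors the pattern of loadshed.New: apply each option in turn.
func newConfig(opts ...option) *config {
	c := &config{}
	for _, o := range opts {
		c = o(c)
	}
	return c
}

func main() {
	c := newConfig(withAggregator("cpu"), withAggregator("concurrency"))
	fmt.Println(len(c.aggregators)) // prints 2
}
```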

func Aggregator

func Aggregator(a rolling.Aggregator) Option

Aggregator adds an arbitrary Aggregator to the evaluation for load shedding. The result of the aggregator will be interpreted as a percentage value between 0.0 and 1.0. This value will be used as the percentage of requests to reject.

func AverageLatency

func AverageLatency(lower float64, upper float64, bucketSize time.Duration, buckets int, preallocHint int, requiredPoints int) Option

AverageLatency generates an option that adds average request latency within a rolling time window to the load shedding calculation. If the average value, in seconds, falls between lower and upper then a percentage of new requests will be rejected. The rolling window is configured by defining a bucket size and number of buckets. The preallocHint is an optimisation for keeping the number of alloc calls low. If the hint is zero then a default value is used.

func CPU

func CPU(lower float64, upper float64, pollingInterval time.Duration, windowSize int) Option

CPU generates an option that adds a rolling average of CPU usage to the load shedding calculation. It will configure the Decorator to reject a percentage of traffic once the average CPU usage is between lower and upper.

func Concurrency

func Concurrency(lower int, upper int, wg *WaitGroup) Option

Concurrency generates an option that adds total concurrent requests to the load shedding calculation. Once the requests in flight reaches a value between lower and upper the Decorator will begin rejecting new requests based on the distance between the threshold values.

func ErrorRate

func ErrorRate(lower float64, upper float64, bucketSize time.Duration, buckets int, preallocHint int, requiredPoints int) Option

ErrorRate generates an option that adds the error rate within a rolling time window to the load shedding calculation. If the error rate value falls between the lower and upper thresholds then a percentage of new requests will be rejected. The rolling window is configured by defining a bucket size and number of buckets. The preallocHint is an optimisation for keeping the number of alloc calls low. If the hint is zero then a default value is used.

func PercentileLatency

func PercentileLatency(lower float64, upper float64, bucketSize time.Duration, buckets int, preallocHint int, requiredPoints int, percentile float64) Option

PercentileLatency generates an option much like AverageLatency except the aggregation is computed as a percentile of the data recorded rather than an average. The percentile should be given as N%. For example, 95.0 or 99.0. Fractional percentiles, like 99.9, are also valid.
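
A nearest-rank percentile of this kind can be computed as below. percentileOf is an illustrative helper; the library's exact percentile method (interpolation, tie handling) may differ:

```go
package main

import (
	"fmt"
	"sort"
)

// percentileOf computes a nearest-rank percentile over a slice of values.
// p is given as N%, e.g. 95.0 or 99.9, matching the documented convention.
// Illustrative helper, not the library's implementation.
func percentileOf(values []float64, p float64) float64 {
	sorted := append([]float64(nil), values...)
	sort.Float64s(sorted)
	rank := int(p / 100 * float64(len(sorted)))
	if rank >= len(sorted) {
		rank = len(sorted) - 1
	}
	return sorted[rank]
}

func main() {
	latencies := []float64{0.1, 0.2, 0.2, 0.3, 0.9}
	fmt.Println(percentileOf(latencies, 50.0)) // prints 0.2
	fmt.Println(percentileOf(latencies, 95.0)) // prints 0.9
}
```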

type Rejected

type Rejected struct {
	Aggregate *rolling.Aggregate
}

Rejected is the error returned when a request is rejected because of load shedding.

func (Rejected) Error

func (r Rejected) Error() string

type WaitGroup

type WaitGroup struct {
	*sync.WaitGroup
	// contains filtered or unexported fields
}

WaitGroup wraps a sync.WaitGroup to make it usable as a load shedding tool.

func NewWaitGroup

func NewWaitGroup() *WaitGroup

NewWaitGroup generates a specialised WaitGroup that tracks the number of concurrent operations. This implementation also satisfies the Aggregator interface from github.com/asecurityteam/rolling so that this can be fed into a calculation of system health.

func (*WaitGroup) Add

func (c *WaitGroup) Add(delta int)

Add some number of concurrent operations.

func (*WaitGroup) Aggregate

func (c *WaitGroup) Aggregate() *rolling.Aggregate

Aggregate returns the current concurrency value.

func (*WaitGroup) Done

func (c *WaitGroup) Done()

Done marks an operation as complete and removes the tracking.

func (*WaitGroup) Wait

func (c *WaitGroup) Wait()

Wait for all operations to complete.
