core

package
v0.7.0
Warning

This package is not in the latest version of its module.

Published: Dec 28, 2021 | License: Apache-2.0 | Imports: 2 | Imported by: 23

Documentation

Overview

Package core provides the core interfaces of the module, such as Limit, Limiter, Strategy and MetricRegistry.

Constants

const (
	// MetricLimit is the name of the metric for current limit
	MetricLimit = "limit"
	// MetricDropped is the name of the metric for dropped counts
	MetricDropped = "dropped"
	// MetricInFlight is the name of the metric for current in flight count
	MetricInFlight = "inflight"
	// MetricPartitionLimit is the name of the metric for a current partition's limit
	MetricPartitionLimit = "limit.partition"
	// MetricRTT is the name of the metric for the sample Round Trip Time distribution
	MetricRTT = "rtt"
	// MetricMinRTT is the name of the metric for the Minimum Round Trip Time
	MetricMinRTT = "min_rtt"
	// MetricWindowMinRTT is the name of the metric for the Window's Minimum Round Trip Time
	MetricWindowMinRTT = "window.min_rtt"
	// MetricWindowQueueSize represents the name of the metric for the Window's Queue Size
	MetricWindowQueueSize = "window.queue_size"
	// MetricQueueSize represents the name of the metric for the size of a lifo queue
	MetricQueueSize = "queue_size"
	// MetricQueueLimit represents the name of the metric for the max size of a lifo queue
	MetricQueueLimit = "queue_limit"
)

Variables

var EmptyMetricRegistryInstance = &EmptyMetricRegistry{}

EmptyMetricRegistryInstance is a singleton empty metric registry instance.

Functions

func PrefixMetricWithName

func PrefixMetricWithName(metric, name string) string

PrefixMetricWithName prefixes the given metric with the given name, producing a string of the form "<name>.<metric>".
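
The documented output form can be illustrated with a stand-alone sketch. The helper `prefixMetric` below is hypothetical: it reproduces only the "<name>.<metric>" form stated above and is not the package's implementation, which may treat edge cases (such as an empty name) differently.

```go
package main

import "fmt"

// prefixMetric is a hypothetical stand-in that reproduces only the documented
// "<name>.<metric>" output form. It is not the package's own function.
func prefixMetric(metric, name string) string {
	return name + "." + metric
}

func main() {
	// "rtt" is the value of the MetricRTT constant listed above.
	fmt.Println(prefixMetric("rtt", "checkout")) // prints "checkout.rtt"
}
```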

Types

type CommonMetricSampler

type CommonMetricSampler struct {
	RTTListener         MetricSampleListener
	DropCounterListener MetricSampleListener
	InFlightListener    MetricSampleListener
}

CommonMetricSampler is a set of common metrics reported by all Limit implementations

func NewCommonMetricSampler

func NewCommonMetricSampler(registry MetricRegistry, limit Limit, name string, tags ...string) *CommonMetricSampler

NewCommonMetricSampler will create a new CommonMetricSampler that will auto-instrument metrics

func (*CommonMetricSampler) Sample

func (s *CommonMetricSampler) Sample(rtt int64, inFlight int, didDrop bool)

Sample records the given RTT, in-flight count and drop flag for metric reporting.

type EmptyMetricRegistry

type EmptyMetricRegistry struct{}

EmptyMetricRegistry implements a no-op metric registry that reports nothing.

func (*EmptyMetricRegistry) RegisterCount

func (*EmptyMetricRegistry) RegisterCount(ID string, tags ...string) MetricSampleListener

RegisterCount will register a count sample to this registry

func (*EmptyMetricRegistry) RegisterDistribution

func (*EmptyMetricRegistry) RegisterDistribution(ID string, tags ...string) MetricSampleListener

RegisterDistribution will register a distribution sample to this registry

func (*EmptyMetricRegistry) RegisterGauge

func (*EmptyMetricRegistry) RegisterGauge(ID string, supplier MetricSupplier, tags ...string)

RegisterGauge will register a gauge sample to this registry

func (*EmptyMetricRegistry) RegisterTiming

func (*EmptyMetricRegistry) RegisterTiming(ID string, tags ...string) MetricSampleListener

RegisterTiming will register a timing distribution sample to this registry

func (*EmptyMetricRegistry) Start

func (*EmptyMetricRegistry) Start()

Start will start the metric registry polling

func (*EmptyMetricRegistry) Stop

func (*EmptyMetricRegistry) Stop()

Stop will stop the metric registry polling

type EmptyMetricSampleListener

type EmptyMetricSampleListener struct{}

EmptyMetricSampleListener implements a sample listener that ignores everything.

func (*EmptyMetricSampleListener) AddSample

func (*EmptyMetricSampleListener) AddSample(value float64, tags ...string)

AddSample will add a metric sample to this listener

type Limit

type Limit interface {
	// EstimatedLimit returns the current estimated limit.
	EstimatedLimit() int

	// NotifyOnChange will register a callback to receive notification whenever the limit is updated to a new value.
	//
	// consumer - the callback
	NotifyOnChange(consumer LimitChangeListener)

	// OnSample updates the concurrency limit using a new rtt sample.
	//
	// startTime - in epoch nanoseconds
	// rtt - round trip time of sample
	// inFlight - in flight observed count during the sample
	// didDrop - true if there was a timeout
	OnSample(startTime int64, rtt int64, inFlight int, didDrop bool)
}

Limit is the contract for an algorithm that calculates a concurrency limit based on rtt measurements.
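
To make the contract concrete, here is a minimal sketch of a type that satisfies it. The `halvingLimit` type and its adjustment rule (halve on a drop, grow by one when the limit is saturated) are assumptions made for illustration and do not correspond to any algorithm shipped with the module. The interface and callback type are re-declared locally so the example compiles on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// Local copies of the documented types, so the sketch is self-contained.
type LimitChangeListener func(limit int)

type Limit interface {
	EstimatedLimit() int
	NotifyOnChange(consumer LimitChangeListener)
	OnSample(startTime int64, rtt int64, inFlight int, didDrop bool)
}

// halvingLimit is a hypothetical loss-based limit: it halves on a drop and
// grows by one when a sample arrives while the limit is fully used.
type halvingLimit struct {
	mu        sync.Mutex
	limit     int
	listeners []LimitChangeListener
}

var _ Limit = (*halvingLimit)(nil)

func (l *halvingLimit) EstimatedLimit() int {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.limit
}

func (l *halvingLimit) NotifyOnChange(consumer LimitChangeListener) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.listeners = append(l.listeners, consumer)
}

func (l *halvingLimit) OnSample(startTime int64, rtt int64, inFlight int, didDrop bool) {
	l.mu.Lock()
	next := l.limit
	switch {
	case didDrop:
		next = l.limit / 2
		if next < 1 {
			next = 1
		}
	case inFlight >= l.limit:
		next = l.limit + 1
	}
	changed := next != l.limit
	l.limit = next
	// Copy the listeners so callbacks run without holding the lock.
	listeners := append([]LimitChangeListener(nil), l.listeners...)
	l.mu.Unlock()

	if changed {
		for _, notify := range listeners {
			notify(next)
		}
	}
}

func main() {
	l := &halvingLimit{limit: 10}
	l.NotifyOnChange(func(limit int) { fmt.Println("limit is now", limit) })
	l.OnSample(0, 1000, 10, false)  // saturated: 10 -> 11
	l.OnSample(0, 1000, 11, true)   // drop: 11 -> 5
	fmt.Println(l.EstimatedLimit()) // prints "5"
}
```

Listeners are invoked after the lock is released so that a callback can safely call EstimatedLimit.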

type LimitChangeListener

type LimitChangeListener func(limit int)

LimitChangeListener is a callback method to receive a notification whenever the limit is updated to a new value.

type Limiter

type Limiter interface {
	// Acquire a token from the limiter.  Returns a nil listener if the limit has been exceeded.
	// If acquired the caller must call one of the Listener methods when the operation has been completed to release
	// the count.
	//
	// ctx - context for the request. The context is used by advanced strategies such as LookupPartitionStrategy.
	Acquire(ctx context.Context) (listener Listener, ok bool)
}

Limiter defines the contract for a concurrency limiter. The caller is expected to call Acquire for each request and must release the returned Listener when the operation completes. Releasing the Listener may trigger an update to the concurrency limit based on error rate or latency measurement.

type LimiterRegistry

type LimiterRegistry interface {
	// Get a limiter given a key.
	Get(key string) Limiter
}

LimiterRegistry is a lookup for integrations that support multiple Limiters, e.g. one per RPC method.

type Listener

type Listener interface {
	// OnSuccess is called as a notification that the operation succeeded and internally measured latency should be
	// used as an RTT sample.
	OnSuccess()
	// OnIgnore is called to indicate the operation failed before any meaningful RTT measurement could be made and
	// should be ignored to not introduce an artificially low RTT.
	OnIgnore()
	// OnDropped is called to indicate the request failed and was dropped due to being rejected by an external limit or
	// hitting a timeout.  Loss based Limit implementations will likely reduce the limit aggressively when this
	// happens.
	OnDropped()
}

Listener is the token listener used to call back to the limiter, signalling when and how the acquired token should be released.

type MeasurementInterface

type MeasurementInterface interface {
	// Add a single sample and update the internal state.
	// Returns the current value, and true if the internal state was updated.
	Add(value float64) (float64, bool)

	// Get the current value.
	Get() float64

	// Reset the internal state as if no samples were ever added.
	Reset()

	// Update will update the value given an operation function
	Update(operation func(value float64) float64)
}

MeasurementInterface defines the contract for tracking a measurement such as a minimum or average of a sample set.
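
As an illustration of this contract, the sketch below tracks the minimum of a sample set. The `minMeasurement` type is a hypothetical example written for this page, not a type from the module, and it is not safe for concurrent use. The interface is re-declared locally so the example compiles on its own.

```go
package main

import "fmt"

// Local copy of the documented interface.
type MeasurementInterface interface {
	Add(value float64) (float64, bool)
	Get() float64
	Reset()
	Update(operation func(value float64) float64)
}

// minMeasurement is a hypothetical implementation that keeps the smallest
// sample seen so far.
type minMeasurement struct {
	value float64
	set   bool
}

var _ MeasurementInterface = (*minMeasurement)(nil)

// Add reports true only when the sample lowered the tracked minimum.
func (m *minMeasurement) Add(value float64) (float64, bool) {
	if !m.set || value < m.value {
		m.value, m.set = value, true
		return m.value, true
	}
	return m.value, false
}

func (m *minMeasurement) Get() float64 { return m.value }

// Reset forgets every sample, so the next Add always updates the state.
func (m *minMeasurement) Reset() { m.value, m.set = 0, false }

func (m *minMeasurement) Update(operation func(value float64) float64) {
	m.value = operation(m.value)
}

func main() {
	m := &minMeasurement{}
	m.Add(5)
	m.Add(7)
	m.Add(3)
	fmt.Println(m.Get()) // prints "3"

	m.Update(func(v float64) float64 { return v * 2 })
	fmt.Println(m.Get()) // prints "6"
}
```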

type MetricRegistry

type MetricRegistry interface {
	// RegisterDistribution will register a sample distribution.  Samples are added to the distribution via the returned
	// MetricSampleListener. Will reuse an existing MetricSampleListener if the distribution already exists.
	RegisterDistribution(ID string, tags ...string) MetricSampleListener

	// RegisterTiming will register a sample timing distribution.  Samples are added to the distribution via the
	// returned MetricSampleListener. Will reuse an existing MetricSampleListener if the distribution already exists.
	RegisterTiming(ID string, tags ...string) MetricSampleListener

	// RegisterCount will register a sample counter.  Samples are added to the counter via the returned
	// MetricSampleListener. Will reuse an existing MetricSampleListener if the counter already exists.
	RegisterCount(ID string, tags ...string) MetricSampleListener

	// RegisterGauge will register a gauge using the provided supplier.  The supplier will be polled whenever the gauge
	// value is flushed by the registry.
	RegisterGauge(ID string, supplier MetricSupplier, tags ...string)

	// Start will start the metric registry polling
	Start()

	// Stop will stop the metric registry polling
	Stop()
}

MetricRegistry is a simple abstraction for tracking metrics in the limiters.
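
A custom registry only needs to satisfy the six methods above. The following sketch is a hypothetical in-memory registry (`memoryRegistry`, `sumListener` and the `total` helper are names invented here) that ignores tags and treats Start and Stop as no-ops. It is meant to show the shape of an implementation, not a production metrics backend. The documented types are re-declared locally so the example compiles on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// Local copies of the documented types.
type MetricSampleListener interface {
	AddSample(value float64, tags ...string)
}

type MetricSupplier func() (value float64, ok bool)

type MetricRegistry interface {
	RegisterDistribution(ID string, tags ...string) MetricSampleListener
	RegisterTiming(ID string, tags ...string) MetricSampleListener
	RegisterCount(ID string, tags ...string) MetricSampleListener
	RegisterGauge(ID string, supplier MetricSupplier, tags ...string)
	Start()
	Stop()
}

// sumListener is a hypothetical listener that keeps a running total.
type sumListener struct {
	mu  sync.Mutex
	sum float64
}

func (s *sumListener) AddSample(value float64, tags ...string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.sum += value
}

// memoryRegistry is a hypothetical registry keyed by metric ID; tags are ignored.
type memoryRegistry struct {
	mu        sync.Mutex
	listeners map[string]*sumListener
	gauges    map[string]MetricSupplier
}

var _ MetricRegistry = (*memoryRegistry)(nil)

func newMemoryRegistry() *memoryRegistry {
	return &memoryRegistry{
		listeners: make(map[string]*sumListener),
		gauges:    make(map[string]MetricSupplier),
	}
}

// listener reuses an existing listener for ID, as the interface comments require.
func (r *memoryRegistry) listener(ID string) *sumListener {
	r.mu.Lock()
	defer r.mu.Unlock()
	if l, ok := r.listeners[ID]; ok {
		return l
	}
	l := &sumListener{}
	r.listeners[ID] = l
	return l
}

func (r *memoryRegistry) RegisterDistribution(ID string, tags ...string) MetricSampleListener {
	return r.listener(ID)
}

func (r *memoryRegistry) RegisterTiming(ID string, tags ...string) MetricSampleListener {
	return r.listener(ID)
}

func (r *memoryRegistry) RegisterCount(ID string, tags ...string) MetricSampleListener {
	return r.listener(ID)
}

func (r *memoryRegistry) RegisterGauge(ID string, supplier MetricSupplier, tags ...string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.gauges[ID] = supplier
}

func (r *memoryRegistry) Start() {}
func (r *memoryRegistry) Stop()  {}

// total returns the running sum recorded for ID.
func (r *memoryRegistry) total(ID string) float64 {
	l := r.listener(ID)
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.sum
}

func main() {
	reg := newMemoryRegistry()
	dropped := reg.RegisterCount("dropped")
	dropped.AddSample(1)
	dropped.AddSample(1)
	fmt.Println(reg.total("dropped")) // prints "2"
}
```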

type MetricSampleListener

type MetricSampleListener interface {
	// AddSample will add a sample metric to the listener
	AddSample(value float64, tags ...string)
}

MetricSampleListener is a listener to receive samples for a distribution

type MetricSupplier

type MetricSupplier func() (value float64, ok bool)

MetricSupplier will return the supplied metric value

func NewFloat64MetricSupplierWrapper

func NewFloat64MetricSupplierWrapper(s func() float64) MetricSupplier

NewFloat64MetricSupplierWrapper wraps a func returning float64 as a MetricSupplier

func NewIntMetricSupplierWrapper

func NewIntMetricSupplierWrapper(s func() int) MetricSupplier

NewIntMetricSupplierWrapper wraps a func returning int as a MetricSupplier

func NewUint64MetricSupplierWrapper (added in v0.6.4)

func NewUint64MetricSupplierWrapper(s func() uint64) MetricSupplier

NewUint64MetricSupplierWrapper wraps a func returning uint64 as a MetricSupplier
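
The wrapper functions adapt a plain getter to the MetricSupplier signature. The sketch below shows how such an adapter could be written. `wrapInt` is a hypothetical local equivalent, and the choice to always report ok as true is an assumption of this example rather than documented behaviour.

```go
package main

import "fmt"

// Local copy of the documented type.
type MetricSupplier func() (value float64, ok bool)

// wrapInt is a hypothetical adapter from an int getter to a MetricSupplier.
// Assumption: the wrapped value is always considered available (ok == true).
func wrapInt(s func() int) MetricSupplier {
	return func() (float64, bool) {
		return float64(s()), true
	}
}

func main() {
	inFlight := 3
	supplier := wrapInt(func() int { return inFlight })

	inFlight = 4 // the supplier reads the value when polled, not when created
	v, ok := supplier()
	fmt.Println(v, ok) // prints "4 true"
}
```

A supplier of this shape is what RegisterGauge expects, so a gauge can be backed by any existing getter.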

type SampleWindow

type SampleWindow interface {
	// StartTimeNanoseconds returns the epoch start time in nanoseconds.
	StartTimeNanoseconds() int64
	// CandidateRTTNanoseconds returns the candidate RTT in the sample window. This is traditionally the minimum rtt.
	CandidateRTTNanoseconds() int64
	// AverageRTTNanoseconds returns the average RTT in the sample window.  Excludes timeouts and dropped rtt.
	AverageRTTNanoseconds() int64
	// MaxInFlight returns the maximum number of in-flight observed during the sample window.
	MaxInFlight() int
	// SampleCount is the number of observed RTTs in the sample window.
	SampleCount() int
	// DidDrop returns true if there was a timeout.
	DidDrop() bool
}

SampleWindow represents the details of the current sample window
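
The getters above imply a small amount of bookkeeping per window. The sketch below is a hypothetical mutable `window` type (with invented `addSample` and `addDrop` methods) showing one way those values could be accumulated. Following the interface comments, dropped samples are excluded from the RTT statistics. The interface is re-declared locally so the example compiles on its own.

```go
package main

import "fmt"

// Local copy of the documented interface.
type SampleWindow interface {
	StartTimeNanoseconds() int64
	CandidateRTTNanoseconds() int64
	AverageRTTNanoseconds() int64
	MaxInFlight() int
	SampleCount() int
	DidDrop() bool
}

// window is a hypothetical accumulator; it is not safe for concurrent use.
type window struct {
	start       int64
	minRTT      int64
	sumRTT      int64
	count       int
	maxInFlight int
	dropped     bool
}

var _ SampleWindow = (*window)(nil)

// addSample records a completed request and its RTT.
func (w *window) addSample(rtt int64, inFlight int) {
	if w.count == 0 || rtt < w.minRTT {
		w.minRTT = rtt
	}
	w.sumRTT += rtt
	w.count++
	w.observe(inFlight)
}

// addDrop records a dropped request; its RTT is not counted.
func (w *window) addDrop(inFlight int) {
	w.dropped = true
	w.observe(inFlight)
}

func (w *window) observe(inFlight int) {
	if inFlight > w.maxInFlight {
		w.maxInFlight = inFlight
	}
}

func (w *window) StartTimeNanoseconds() int64    { return w.start }
func (w *window) CandidateRTTNanoseconds() int64 { return w.minRTT }
func (w *window) MaxInFlight() int               { return w.maxInFlight }
func (w *window) SampleCount() int               { return w.count }
func (w *window) DidDrop() bool                  { return w.dropped }

// AverageRTTNanoseconds returns 0 when no RTT has been observed yet.
func (w *window) AverageRTTNanoseconds() int64 {
	if w.count == 0 {
		return 0
	}
	return w.sumRTT / int64(w.count)
}

func main() {
	w := &window{start: 0}
	w.addSample(100, 1)
	w.addSample(300, 4)
	w.addDrop(2)
	fmt.Println(w.CandidateRTTNanoseconds(), w.AverageRTTNanoseconds(),
		w.MaxInFlight(), w.SampleCount(), w.DidDrop()) // prints "100 200 4 2 true"
}
```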

type StaticStrategyToken

type StaticStrategyToken struct {
	// contains filtered or unexported fields
}

StaticStrategyToken represents a static strategy token, simple but flexible.

func (*StaticStrategyToken) InFlightCount

func (t *StaticStrategyToken) InFlightCount() int

InFlightCount returns the in-flight count as it was snapshotted when the token was created

func (*StaticStrategyToken) IsAcquired

func (t *StaticStrategyToken) IsAcquired() bool

IsAcquired will return true if the token is acquired

func (*StaticStrategyToken) Release

func (t *StaticStrategyToken) Release()

Release releases the current token. Every acquired token must be released.

type Strategy

type Strategy interface {
	// TryAcquire will try to acquire a token from the limiter.
	// ctx - context of the request, used for partitioned limits.
	// returns not ok if limit is exceeded, or a StrategyToken that must be released when the operation completes.
	TryAcquire(ctx context.Context) (token StrategyToken, ok bool)

	// SetLimit will update the strategy with a new limit.
	SetLimit(limit int)
}

Strategy defines how the limiter logic should acquire or not acquire tokens.

type StrategyToken

type StrategyToken interface {
	// IsAcquired returns true if acquired or false if limit has been reached.
	IsAcquired() bool
	// InFlightCount will return the number of pending requests.
	InFlightCount() int
	// Release the acquired token and decrement the current in-flight count.
	Release()
}

StrategyToken represents a token from a limiter algorithm

func NewAcquiredStrategyToken

func NewAcquiredStrategyToken(inFlightCount int, releaseFunc func()) StrategyToken

NewAcquiredStrategyToken will create a new acquired strategy token.

func NewNotAcquiredStrategyToken

func NewNotAcquiredStrategyToken(inFlightCount int) StrategyToken

NewNotAcquiredStrategyToken will create a new un-acquired strategy token.
