Documentation ¶
Overview ¶
Package gentle defines Stream and Handler interfaces and provides composable resilient implementations of them.
Stream and Handler ¶
Stream and Handler are our fundamental abstractions to achieve back-pressure. Stream has Get() that emits Messages. Handler has Handle() that transforms given Messages.
Stream(https://godoc.org/github.com/cfchou/go-gentle/gentle#Stream) Handler(https://godoc.org/github.com/cfchou/go-gentle/gentle#Handler)
Developers should implement their own logic in the form of a Stream/Handler. For simple cases, the named types SimpleStream and SimpleHandler let a function be used directly as a Stream/Handler.
SimpleStream(https://godoc.org/github.com/cfchou/go-gentle/gentle#SimpleStream) SimpleHandler(https://godoc.org/github.com/cfchou/go-gentle/gentle#SimpleHandler)
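As a rough illustration of using plain functions as a Stream and a Handler, the sketch below declares local copies of the Message, Stream and Handler interfaces documented on this page. StreamFunc, HandlerFunc and textMessage are hypothetical names introduced only for this example; they play the role that SimpleStream and SimpleHandler are described as playing and are not the package's own definitions.

```go
package main

import (
	"errors"
	"fmt"
)

// Message, Stream and Handler are local copies of the documented interfaces.
type Message interface {
	ID() string
}

type Stream interface {
	Get() (Message, error)
}

type Handler interface {
	Handle(msg Message) (Message, error)
}

// StreamFunc and HandlerFunc are hypothetical function adapters for this sketch.
type StreamFunc func() (Message, error)

func (f StreamFunc) Get() (Message, error) { return f() }

type HandlerFunc func(Message) (Message, error)

func (f HandlerFunc) Handle(msg Message) (Message, error) { return f(msg) }

// textMessage is a hypothetical Message carrying an ID and a payload.
type textMessage struct {
	id   string
	text string
}

func (m textMessage) ID() string { return m.id }

// newGreetingStream returns a Stream that emits one fixed Message per Get().
func newGreetingStream() Stream {
	return StreamFunc(func() (Message, error) {
		return textMessage{id: "1", text: "hello"}, nil
	})
}

// newShoutHandler returns a Handler that appends "!" to a textMessage.
// It builds a new Message instead of mutating its input.
func newShoutHandler() Handler {
	return HandlerFunc(func(msg Message) (Message, error) {
		tm, ok := msg.(textMessage)
		if !ok {
			return nil, errors.New("unexpected message type")
		}
		return textMessage{id: tm.id, text: tm.text + "!"}, nil
	})
}

func main() {
	msg, err := newGreetingStream().Get()
	if err != nil {
		fmt.Println("get failed:", err)
		return
	}
	out, err := newShoutHandler().Handle(msg)
	fmt.Println(out, err)
}
```

Both closures are stateless, which keeps them safe to call from several goroutines.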
Our Resilience Streams and Handlers ¶
Resiliency patterns are indispensable in distributed systems because external services are not reliable at all times. We provide some useful patterns in the form of Streams/Handlers. They include rate-limiting, retry (also known as back-off), bulkhead and circuit-breaker. Each of them can be freely composed with other Streams/Handlers as one sees fit.
rateLimitedStream(https://godoc.org/github.com/cfchou/go-gentle/gentle#NewRateLimitedStream) retryStream(https://godoc.org/github.com/cfchou/go-gentle/gentle#NewRetryStream) bulkheadStream(https://godoc.org/github.com/cfchou/go-gentle/gentle#NewBulkheadStream) circuitBreakerStream(https://godoc.org/github.com/cfchou/go-gentle/gentle#NewCircuitBreakerStream) rateLimitedHandler(https://godoc.org/github.com/cfchou/go-gentle/gentle#NewRateLimitedHandler) retryHandler(https://godoc.org/github.com/cfchou/go-gentle/gentle#NewRetryHandler) bulkheadHandler(https://godoc.org/github.com/cfchou/go-gentle/gentle#NewBulkheadHandler) circuitBreakerHandler(https://godoc.org/github.com/cfchou/go-gentle/gentle#NewCircuitBreakerHandler)
Composability ¶
A Stream/Handler can be chained with an arbitrary number of Handlers. The semantics are that if any element in the chain fails, all the remaining elements are skipped. Note that any element can itself be a nested chain.
AppendHandlersStream(https://godoc.org/github.com/cfchou/go-gentle/gentle#AppendHandlersStream) AppendHandlersHandler(https://godoc.org/github.com/cfchou/go-gentle/gentle#AppendHandlersHandler)
If simply appending Streams/Handlers is not enough, developers can define Streams/Handlers with more advanced flow control, like the resilience Streams/Handlers defined in this package.
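The chaining semantics described in this section can be sketched as follows. This is a minimal illustration under stated assumptions, not the package's implementation: chain is a hypothetical stand-in for AppendHandlersHandler, and HandlerFunc, strMessage, suffix and failing are helper names invented for the example.

```go
package main

import (
	"errors"
	"fmt"
)

type Message interface {
	ID() string
}

type Handler interface {
	Handle(msg Message) (Message, error)
}

// HandlerFunc is a hypothetical adapter that turns a function into a Handler.
type HandlerFunc func(Message) (Message, error)

func (f HandlerFunc) Handle(msg Message) (Message, error) { return f(msg) }

// strMessage is a hypothetical Message whose ID is its own content.
type strMessage string

func (m strMessage) ID() string { return string(m) }

var errBoom = errors.New("boom")

// chain runs first and then each Handler in rest, feeding every output into
// the next element. The first error stops the chain and skips the rest.
func chain(first Handler, rest ...Handler) Handler {
	return HandlerFunc(func(msg Message) (Message, error) {
		out, err := first.Handle(msg)
		if err != nil {
			return nil, err
		}
		for _, h := range rest {
			out, err = h.Handle(out)
			if err != nil {
				return nil, err
			}
		}
		return out, nil
	})
}

// suffix returns a Handler that appends s to the incoming message.
func suffix(s string) Handler {
	return HandlerFunc(func(msg Message) (Message, error) {
		return strMessage(msg.ID() + s), nil
	})
}

// failing returns a Handler that always fails with errBoom.
func failing() Handler {
	return HandlerFunc(func(Message) (Message, error) {
		return nil, errBoom
	})
}

func main() {
	// A chain element may itself be a nested chain.
	nested := chain(suffix("a"), chain(suffix("b"), suffix("c")))
	fmt.Println(nested.Handle(strMessage("x")))

	// The failing element skips suffix("c").
	broken := chain(suffix("a"), failing(), suffix("c"))
	fmt.Println(broken.Handle(strMessage("x")))
}
```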
Note ¶
Implementations of Stream.Get() and Handler.Handle() should be thread-safe. A good practice is to make a Stream/Handler stateless. A Message need not be immutable, but it is better if it is. Our resilience Streams/Handlers are all thread-safe and don't mutate Messages.
External References ¶
Some of our implementations make heavy use of third-party packages. It helps to check out their documentation.
Circuit-breaker is based on hystrix-go(https://godoc.org/github.com/afex/hystrix-go/hystrix). Rate-limiting is based on ratelimit(https://godoc.org/github.com/juju/ratelimit). Logging is based on log15(https://godoc.org/gopkg.in/inconshreveable/log15.v2).
Index ¶
- Constants
- Variables
- func IntToMillis(millis int) time.Duration
- func RegisterObservation(key *RegistryKey, observation Observation)
- func UnRegisterObservation(key *RegistryKey)
- type BackOff
- type BackOffFactory
- type BulkheadHandlerOpts
- type BulkheadStreamOpts
- type CircuitBreakerConf
- type CircuitBreakerHandlerOpts
- type CircuitBreakerStreamOpts
- type Clock
- type ConstantBackOffFactory
- type ConstantBackOffFactoryOpts
- type ExponentialBackOffFactory
- type ExponentialBackOffFactoryOpts
- type Handler
- func AppendFallbacksHandler(handler Handler, fallbacks ...HandlerFallback) Handler
- func AppendHandlersHandler(handler Handler, handlers ...Handler) Handler
- func NewBulkheadHandler(opts *BulkheadHandlerOpts, handler Handler) Handler
- func NewCircuitBreakerHandler(opts *CircuitBreakerHandlerOpts, handler Handler) Handler
- func NewRateLimitedHandler(opts *RateLimitedHandlerOpts, handler Handler) Handler
- func NewRetryHandler(opts *RetryHandlerOpts, handler Handler) Handler
- type HandlerFallback
- type Identity
- type Logger
- type Message
- type Metric
- type Names
- type Observation
- type RateLimit
- type RateLimitedHandlerOpts
- type RateLimitedStreamOpts
- type RegistryKey
- type RetryHandlerOpts
- type RetryStreamOpts
- type SimpleHandler
- type SimpleStream
- type Stream
- func AppendFallbacksStream(stream Stream, fallbacks ...StreamFallback) Stream
- func AppendHandlersStream(stream Stream, handlers ...Handler) Stream
- func NewBulkheadStream(opts *BulkheadStreamOpts, upstream Stream) Stream
- func NewCircuitBreakerStream(opts *CircuitBreakerStreamOpts, stream Stream) Stream
- func NewRateLimitedStream(opts *RateLimitedStreamOpts, upstream Stream) Stream
- func NewRetryStream(opts *RetryStreamOpts, upstream Stream) Stream
- type StreamFallback
- type TokenBucketRateLimit
Constants ¶
const (
	// Types of resilience, are most often used as part of RegistryKey.
	HandlerRateLimited = "hRate"
	HandlerRetry       = "hRetry"
	HandlerBulkhead    = "hBulk"
	//HandlerSemaphore = "hSem"
	HandlerCircuitBreaker = "hCircuit"
)
const (
	// Observation supported by all Stream.Get(), it observes the time spent
	// with the label "result" of possible values "ok" or "err"
	MxStreamGet = "get"
	// Observation supported by all Handler.Handle(), it observes the time
	// spent with the label "result" of possible values "ok" or "err"
	MxHandlerHandle = "handle"
	// Observation supported by retryStream.Get(), it observes the total
	// number of tries with the label "result" of possible values "ok" or "err"
	MxStreamRetryTry = "try"
	// Observation supported by retryHandler.Handle(), it observes the total
	// number of tries with the label "result" of possible values "ok" or
	// "err"
	MxHandlerRetryTry = "try"
	// Observation supported by circuitBreakerStream.Get(), when an error is
	// met, it observes 1 with the label "err" of possible values of
	// "ErrCbCircuitOpen", "ErrCbMaxConcurrency", "ErrCbTimeout" or
	// "NonCbErr"
	MxStreamCircuitBreakerHxerr = "hxerr"
	// Observation supported by circuitBreakerHandler.Handle(), when an
	// error is met, it observes 1 with the label "err" of possible values
	// of "ErrCbCircuitOpen", "ErrCbMaxConcurrency", "ErrCbTimeout" or
	// "NonCbErr"
	MxHandlerCircuitBreakerHxerr = "hxerr"
)
const (
	// Types of resilience, are most often used as part of RegistryKey.
	StreamRateLimited = "sRate"
	StreamRetry       = "sRetry"
	StreamBulkhead    = "sBulk"
	//StreamSemaphore = "sSem"
	StreamCircuitBreaker = "sCircuit"
)
const BackOffStop time.Duration = -1
BackOffStop is a sentinel that BackOff.Next() should return to stop backing off.
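To show how such a sentinel is typically consumed, here is a small retry loop. It is a sketch under assumptions: the backOff interface below assumes that Next() returns a time.Duration, and fixedBackOff, retry, flakyOp and noSleep are hypothetical names that exist only in this example.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// backOffStop mirrors the documented BackOffStop sentinel.
const backOffStop time.Duration = -1

// backOff is an assumed shape: Next() returns how long to wait before the
// next try, or backOffStop to give up.
type backOff interface {
	Next() time.Duration
}

// fixedBackOff hands out interval at most max times, then asks to stop.
type fixedBackOff struct {
	interval time.Duration
	max      int
	used     int
}

func (b *fixedBackOff) Next() time.Duration {
	if b.used >= b.max {
		return backOffStop
	}
	b.used++
	return b.interval
}

var errTemporary = errors.New("temporary failure")

// flakyOp returns an operation that fails the given number of times
// before succeeding.
func flakyOp(failures int) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= failures {
			return errTemporary
		}
		return nil
	}
}

// noSleep lets callers skip real waiting, which is handy in tests.
func noSleep(time.Duration) {}

// retry calls op until it succeeds or the backOff returns backOffStop.
// It returns the number of tries made and the last error seen.
func retry(op func() error, b backOff, sleep func(time.Duration)) (int, error) {
	tries := 0
	for {
		tries++
		err := op()
		if err == nil {
			return tries, nil
		}
		wait := b.Next()
		if wait == backOffStop {
			return tries, err
		}
		sleep(wait)
	}
}

func main() {
	b := &fixedBackOff{interval: time.Millisecond, max: 5}
	tries, err := retry(flakyOp(2), b, time.Sleep)
	fmt.Println(tries, err)
}
```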
Variables ¶
var (
	// Log is a package level logger. It's the parent logger of all loggers used
	// by resilience Streams/Handlers defined in this package.
	Log = log15.New()
	// ErrCbOpen suggests the circuit is opened.
	ErrCbOpen = errors.New(hystrix.ErrCircuitOpen.Error())
	// ErrCbMaxConcurrency suggests the circuit has reached its maximum
	// concurrency of operations.
	ErrCbMaxConcurrency = errors.New(hystrix.ErrMaxConcurrency.Error())
	// ErrCbTimeout suggests the operation has run for too long.
	ErrCbTimeout = errors.New(hystrix.ErrTimeout.Error())
	// ErrMaxConcurrency suggests BulkheadStream/BulkheadHandler has reached
	// its maximum concurrency of operations.
	ErrMaxConcurrency = errors.New("Reached Max Concurrency")
)
var (
ErrMxNotFound = errors.New("Metric Not found")
)
Functions ¶
func IntToMillis ¶
func IntToMillis(millis int) time.Duration
IntToMillis converts $millis, an int number of milliseconds, to a time.Duration.
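A conversion of this kind is usually a one-liner. The function below is a local sketch of the described behaviour, named intToMillis to make clear it is not the package's own source.

```go
package main

import (
	"fmt"
	"time"
)

// intToMillis converts a count of milliseconds to a time.Duration.
func intToMillis(millis int) time.Duration {
	return time.Duration(millis) * time.Millisecond
}

func main() {
	fmt.Println(intToMillis(1500)) // prints "1.5s"
}
```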
func RegisterObservation ¶
func RegisterObservation(key *RegistryKey, observation Observation)
Registration helpers. Not thread-safe, so synchronization has to be done by the application if needed.
func UnRegisterObservation ¶
func UnRegisterObservation(key *RegistryKey)
Registration helpers. Not thread-safe, so synchronization has to be done by the application if needed.
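One way an application might add the synchronization these helpers leave out is to guard a registry with a mutex. The sketch below is self-contained and does not call the package: safeRegistry is a hypothetical local map, registryKey copies the documented RegistryKey fields (used by value here rather than by pointer), and the observation interface with a single Observe(float64) method is an assumption made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// registryKey copies the fields of the documented RegistryKey.
type registryKey struct {
	Namespace, Resilience, Name, Mx string
}

// observation is an assumed, simplified shape of an Observation.
type observation interface {
	Observe(value float64)
}

// noOp is an observation that discards every value.
type noOp struct{}

func (noOp) Observe(float64) {}

var errNotFound = errors.New("metric not found")

// safeRegistry serializes access to the underlying map with a RWMutex.
type safeRegistry struct {
	mu sync.RWMutex
	m  map[registryKey]observation
}

func newSafeRegistry() *safeRegistry {
	return &safeRegistry{m: make(map[registryKey]observation)}
}

func (r *safeRegistry) register(key registryKey, o observation) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.m[key] = o
}

func (r *safeRegistry) unregister(key registryKey) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.m, key)
}

func (r *safeRegistry) get(key registryKey) (observation, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	o, ok := r.m[key]
	if !ok {
		return nil, errNotFound
	}
	return o, nil
}

// registerConcurrently registers n distinct keys from n goroutines and
// returns the number of entries in the registry afterwards.
func registerConcurrently(r *safeRegistry, n int) int {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			key := registryKey{
				Namespace:  "app",
				Resilience: "sRetry",
				Name:       fmt.Sprintf("stream-%d", i),
				Mx:         "try",
			}
			r.register(key, noOp{})
		}(i)
	}
	wg.Wait()
	r.mu.RLock()
	defer r.mu.RUnlock()
	return len(r.m)
}

func main() {
	r := newSafeRegistry()
	fmt.Println(registerConcurrently(r, 10))
}
```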
Types ¶
type BackOffFactory ¶
type BackOffFactory interface {
NewBackOff() BackOff
}
type BulkheadHandlerOpts ¶
type BulkheadHandlerOpts struct {
	MaxConcurrency int
	// contains filtered or unexported fields
}
func NewBulkheadHandlerOpts ¶
func NewBulkheadHandlerOpts(namespace, name string, maxConcurrency int) *BulkheadHandlerOpts
type BulkheadStreamOpts ¶
type BulkheadStreamOpts struct {
	MaxConcurrency int
	// contains filtered or unexported fields
}
func NewBulkheadStreamOpts ¶
func NewBulkheadStreamOpts(namespace, name string, maxConcurrency int) *BulkheadStreamOpts
type CircuitBreakerConf ¶
type CircuitBreakerConf struct {
	// Timeout is how long to wait for command to complete
	Timeout time.Duration
	// MaxConcurrent is how many commands of the same type can run
	// at the same time
	MaxConcurrent int
	// VolumeThreshold is the minimum number of requests needed
	// before a circuit can be tripped due to health
	VolumeThreshold int
	// ErrorPercentThreshold causes circuits to open once the
	// rolling measure of errors exceeds this percent of requests
	ErrorPercentThreshold int
	// SleepWindow is how long to wait after a circuit opens before testing
	// for recovery is allowed
	SleepWindow time.Duration
}
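The interplay of VolumeThreshold and ErrorPercentThreshold can be sketched as a simple predicate. This follows a literal reading of the field comments above and is not hystrix-go's actual code; boundary handling in the real library may differ. tripConf and shouldTrip are hypothetical names.

```go
package main

import "fmt"

// tripConf holds the two health-related thresholds from CircuitBreakerConf.
type tripConf struct {
	VolumeThreshold       int
	ErrorPercentThreshold int
}

// shouldTrip reports whether a circuit would open for the given rolling
// counts: enough requests must have been seen, and the error percentage
// must exceed the threshold. Integer arithmetic avoids rounding issues.
func shouldTrip(c tripConf, requests, errs int) bool {
	if requests <= 0 || requests < c.VolumeThreshold {
		return false
	}
	return errs*100 > c.ErrorPercentThreshold*requests
}

func main() {
	c := tripConf{VolumeThreshold: 20, ErrorPercentThreshold: 50}
	fmt.Println(shouldTrip(c, 10, 10)) // too few requests
	fmt.Println(shouldTrip(c, 20, 11)) // 55% errors over 20 requests
}
```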
func NewDefaultCircuitBreakerConf ¶
func NewDefaultCircuitBreakerConf() *CircuitBreakerConf
func (*CircuitBreakerConf) RegisterFor ¶
func (c *CircuitBreakerConf) RegisterFor(circuit string)
type CircuitBreakerHandlerOpts ¶
type CircuitBreakerHandlerOpts struct {
	MetricCbErr Metric
	Circuit     string
	// contains filtered or unexported fields
}
func NewCircuitBreakerHandlerOpts ¶
func NewCircuitBreakerHandlerOpts(namespace, name, circuit string) *CircuitBreakerHandlerOpts
type CircuitBreakerStreamOpts ¶
type CircuitBreakerStreamOpts struct {
	MetricCbErr Metric
	Circuit     string
	// contains filtered or unexported fields
}
func NewCircuitBreakerStreamOpts ¶
func NewCircuitBreakerStreamOpts(namespace, name, circuit string) *CircuitBreakerStreamOpts
type ConstantBackOffFactory ¶
type ConstantBackOffFactory struct {
// contains filtered or unexported fields
}
ConstantBackOffFactory creates non-thread-safe constant BackOff objects.
func NewConstantBackOffFactory ¶
func NewConstantBackOffFactory(opts *ConstantBackOffFactoryOpts) *ConstantBackOffFactory
func (*ConstantBackOffFactory) NewBackOff ¶
func (f *ConstantBackOffFactory) NewBackOff() BackOff
type ConstantBackOffFactoryOpts ¶
type ConstantBackOffFactoryOpts struct {
	Interval time.Duration
	// After MaxElapsedTime or MaxNumBackOffs(whichever comes first) the BackOff
	// stops.
	// If both are 0, it never stops backing off.
	// If only one of them is 0, then the other is checked.
	MaxElapsedTime time.Duration
	MaxNumBackOffs int64
	Clock          Clock
}
ConstantBackOffFactoryOpts holds the options for ConstantBackOffFactory.
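The stopping rules in the field comments (MaxElapsedTime or MaxNumBackOffs, whichever comes first, with 0 meaning "not checked") could be implemented along these lines. This is a sketch, not the package's code: constantBackOff and fakeClock are hypothetical, the now function stands in for the Clock option, and treating the limits as inclusive is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// backOffStop mirrors the documented BackOffStop sentinel.
const backOffStop time.Duration = -1

// constantBackOff is not thread-safe, like the objects the factory creates.
type constantBackOff struct {
	interval       time.Duration
	maxElapsedTime time.Duration
	maxNumBackOffs int64
	now            func() time.Time // hypothetical stand-in for Clock
	start          time.Time
	count          int64
}

func newConstantBackOff(interval, maxElapsed time.Duration, maxNum int64,
	now func() time.Time) *constantBackOff {
	return &constantBackOff{
		interval:       interval,
		maxElapsedTime: maxElapsed,
		maxNumBackOffs: maxNum,
		now:            now,
		start:          now(),
	}
}

// Next returns the constant interval, or backOffStop once a non-zero
// limit has been reached. A limit of 0 is never checked.
func (b *constantBackOff) Next() time.Duration {
	if b.maxNumBackOffs > 0 && b.count >= b.maxNumBackOffs {
		return backOffStop
	}
	if b.maxElapsedTime > 0 && b.now().Sub(b.start) >= b.maxElapsedTime {
		return backOffStop
	}
	b.count++
	return b.interval
}

// fakeClock is a manually advanced clock for deterministic runs.
type fakeClock struct{ t time.Time }

func (c *fakeClock) Now() time.Time          { return c.t }
func (c *fakeClock) Advance(d time.Duration) { c.t = c.t.Add(d) }

func main() {
	b := newConstantBackOff(100*time.Millisecond, 0, 2, time.Now)
	fmt.Println(b.Next(), b.Next(), b.Next() == backOffStop)
}
```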
func NewConstantBackOffFactoryOpts ¶
func NewConstantBackOffFactoryOpts(interval time.Duration, maxElapsedTime time.Duration) *ConstantBackOffFactoryOpts
type ExponentialBackOffFactory ¶
type ExponentialBackOffFactory struct {
// contains filtered or unexported fields
}
ExponentialBackOffFactory creates non-thread-safe exponential BackOff objects.
func NewExponentialBackOffFactory ¶
func NewExponentialBackOffFactory(opts *ExponentialBackOffFactoryOpts) *ExponentialBackOffFactory
func (*ExponentialBackOffFactory) NewBackOff ¶
func (f *ExponentialBackOffFactory) NewBackOff() BackOff
type ExponentialBackOffFactoryOpts ¶
type ExponentialBackOffFactoryOpts struct {
	// Next() returns a randomizedInterval which is:
	// currentInterval * rand(range [1-RandomizationFactor, 1+RandomizationFactor])
	InitialInterval     time.Duration
	RandomizationFactor float64
	Multiplier          float64
	// If currentInterval * Multiplier >= MaxInterval, then currentInterval = b.MaxInterval
	// Otherwise, currentInterval *= Multiplier
	MaxInterval time.Duration
	// After MaxElapsedTime or MaxNumBackOffs(whichever comes first) the BackOff
	// stops.
	// If both are 0, it never stops backing off.
	// If only one of them is 0, then the other is checked.
	MaxElapsedTime time.Duration
	MaxNumBackOffs int64
	Clock          Clock
}
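The interval arithmetic described in the field comments can be worked through in a few lines. The sketch below covers only the randomization, multiplier and MaxInterval cap and leaves out the stopping rules; expBackOff, newExpBackOff and the rnd hook (a function returning a value in [0, 1)) are hypothetical names for this example.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// expBackOff keeps the current interval and the growth parameters.
type expBackOff struct {
	current             time.Duration
	randomizationFactor float64
	multiplier          float64
	maxInterval         time.Duration
	rnd                 func() float64 // returns a value in [0, 1)
}

func newExpBackOff(initial, max time.Duration, randomization, multiplier float64,
	rnd func() float64) *expBackOff {
	return &expBackOff{
		current:             initial,
		randomizationFactor: randomization,
		multiplier:          multiplier,
		maxInterval:         max,
		rnd:                 rnd,
	}
}

// Next returns currentInterval scaled by a random factor drawn from
// [1-RandomizationFactor, 1+RandomizationFactor), then grows
// currentInterval by Multiplier, capped at MaxInterval.
func (b *expBackOff) Next() time.Duration {
	factor := 1 - b.randomizationFactor + 2*b.randomizationFactor*b.rnd()
	randomized := time.Duration(float64(b.current) * factor)
	if float64(b.current)*b.multiplier >= float64(b.maxInterval) {
		b.current = b.maxInterval
	} else {
		b.current = time.Duration(float64(b.current) * b.multiplier)
	}
	return randomized
}

func main() {
	b := newExpBackOff(100*time.Millisecond, time.Second, 0.5, 2, rand.Float64)
	for i := 0; i < 5; i++ {
		fmt.Println(b.Next())
	}
}
```

With RandomizationFactor set to 0 the sequence is deterministic: an initial interval of 100 with Multiplier 2 and MaxInterval 500 yields 100, 200, 400, 500, 500.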
type Handler ¶
type Handler interface {
	// Handle() takes a Message as input and then returns either a Message or
	// an error exclusively.
	Handle(msg Message) (Message, error)
}
Handler transforms a Message.
func AppendFallbacksHandler ¶
func AppendFallbacksHandler(handler Handler, fallbacks ...HandlerFallback) Handler
func AppendHandlersHandler ¶
func AppendHandlersHandler(handler Handler, handlers ...Handler) Handler
func NewBulkheadHandler ¶
func NewBulkheadHandler(opts *BulkheadHandlerOpts, handler Handler) Handler
NewBulkheadHandler creates a bulkheadHandler that allows at most opts.MaxConcurrency calls to Handle() to run concurrently.
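A common way to cap concurrency like this is a buffered channel used as a counting semaphore. The sketch below assumes a fail-fast policy, where a call that finds no free slot returns an error immediately, which is consistent with the documented ErrMaxConcurrency but is not taken from the package's source. bulkhead, newBulkhead and do are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

var errMaxConcurrency = errors.New("reached max concurrency")

// bulkhead limits concurrent calls; each running call holds one slot.
type bulkhead struct {
	slots chan struct{}
}

func newBulkhead(maxConcurrency int) *bulkhead {
	return &bulkhead{slots: make(chan struct{}, maxConcurrency)}
}

// do runs op if a slot is free and releases the slot afterwards.
// If all slots are taken it fails fast instead of waiting.
func (b *bulkhead) do(op func() error) error {
	select {
	case b.slots <- struct{}{}:
		defer func() { <-b.slots }()
		return op()
	default:
		return errMaxConcurrency
	}
}

func main() {
	b := newBulkhead(1)
	started := make(chan struct{})
	release := make(chan struct{})
	done := make(chan error, 1)

	// Occupy the only slot with a call that blocks until released.
	go func() {
		done <- b.do(func() error {
			close(started)
			<-release
			return nil
		})
	}()
	<-started

	// The slot is taken, so this call is rejected immediately.
	fmt.Println(b.do(func() error { return nil }))

	close(release)
	fmt.Println(<-done)
}
```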
func NewCircuitBreakerHandler ¶
func NewCircuitBreakerHandler(opts *CircuitBreakerHandlerOpts, handler Handler) Handler
In hystrix-go, a circuit-breaker must be given a unique name. NewCircuitBreakerHandler() creates a circuitBreakerHandler with a circuit-breaker named $circuit.
func NewRateLimitedHandler ¶
func NewRateLimitedHandler(opts *RateLimitedHandlerOpts, handler Handler) Handler
func NewRetryHandler ¶
func NewRetryHandler(opts *RetryHandlerOpts, handler Handler) Handler
type HandlerFallback ¶
HandlerFallback is a fallback function that is given an error and the Message that caused it.
type Identity ¶
type Identity interface {
GetNames() *Names
}
Identity is supported by resilience Streams/Handlers defined in this package.
type Logger ¶
type Logger interface {
	// Log a message at the given level with key/value pairs. The number of ctx
	// must be a multiple of two(for a key/value pair).
	Debug(msg string, ctx ...interface{})
	Info(msg string, ctx ...interface{})
	Warn(msg string, ctx ...interface{})
	Error(msg string, ctx ...interface{})
	Crit(msg string, ctx ...interface{})
}
Logger provides structured logging.
type Message ¶
type Message interface {
	// ID() returns a unique string that identifies this Message.
	ID() string
}
Message is passed around Streams/Handlers.
type Observation ¶
Instead of the commonly used Counter/Gauge/Timer/Histogram/Percentile, Observation is a general interface that doesn't limit the implementation. An implementation can be whatever is meaningful to the application.
func GetObservation ¶
func GetObservation(key *RegistryKey) (Observation, error)
Registration helpers. Not thread-safe, so synchronization has to be done by the application if needed.
func NoOpObservationIfNonRegistered ¶
func NoOpObservationIfNonRegistered(key *RegistryKey) Observation
type RateLimit ¶
type RateLimit interface {
	// Wait for $count tokens to be granted(return true) or timeout(return
	// false). If $timeout == 0, it would block as long as it needs.
	Wait(count int, timeout time.Duration) bool
}
RateLimit is an interface for a "token bucket" algorithm.
type RateLimitedHandlerOpts ¶
type RateLimitedHandlerOpts struct {
	Limiter RateLimit
	// contains filtered or unexported fields
}
func NewRateLimitedHandlerOpts ¶
func NewRateLimitedHandlerOpts(namespace, name string, limiter RateLimit) *RateLimitedHandlerOpts
type RateLimitedStreamOpts ¶
type RateLimitedStreamOpts struct {
	Limiter RateLimit
	// contains filtered or unexported fields
}
func NewRateLimitedStreamOpts ¶
func NewRateLimitedStreamOpts(namespace, name string, limiter RateLimit) *RateLimitedStreamOpts
type RegistryKey ¶
type RegistryKey struct {
Namespace, Resilience, Name, Mx string
}
type RetryHandlerOpts ¶
type RetryHandlerOpts struct {
	MetricTryNum   Metric
	Clock          Clock
	BackOffFactory BackOffFactory
	// contains filtered or unexported fields
}
func NewRetryHandlerOpts ¶
func NewRetryHandlerOpts(namespace, name string, backOffFactory BackOffFactory) *RetryHandlerOpts
type RetryStreamOpts ¶
type RetryStreamOpts struct {
	MetricTryNum   Metric
	Clock          Clock
	BackOffFactory BackOffFactory
	// contains filtered or unexported fields
}
func NewRetryStreamOpts ¶
func NewRetryStreamOpts(namespace, name string, backOffFactory BackOffFactory) *RetryStreamOpts
type SimpleHandler ¶
type SimpleStream ¶
func (SimpleStream) Get ¶
func (r SimpleStream) Get() (Message, error)
type Stream ¶
type Stream interface {
	// Get() returns either a Message or an error exclusively.
	Get() (Message, error)
}
Stream emits Messages.
func AppendFallbacksStream ¶
func AppendFallbacksStream(stream Stream, fallbacks ...StreamFallback) Stream
func AppendHandlersStream ¶
func AppendHandlersStream(stream Stream, handlers ...Handler) Stream
func NewBulkheadStream ¶
func NewBulkheadStream(opts *BulkheadStreamOpts, upstream Stream) Stream
NewBulkheadStream creates a bulkheadStream that allows at most opts.MaxConcurrency calls to Get() to run concurrently.
func NewCircuitBreakerStream ¶
func NewCircuitBreakerStream(opts *CircuitBreakerStreamOpts, stream Stream) Stream
In hystrix-go, a circuit-breaker must be given a unique name. NewCircuitBreakerStream() creates a circuitBreakerStream with a circuit-breaker named $circuit.
func NewRateLimitedStream ¶
func NewRateLimitedStream(opts *RateLimitedStreamOpts, upstream Stream) Stream
func NewRetryStream ¶
func NewRetryStream(opts *RetryStreamOpts, upstream Stream) Stream
type StreamFallback ¶
StreamFallback is a fallback function that is given an error.
type TokenBucketRateLimit ¶
type TokenBucketRateLimit struct {
// contains filtered or unexported fields
}
func NewTokenBucketRateLimit ¶
func NewTokenBucketRateLimit(requestsInterval time.Duration, maxRequestBurst int) *TokenBucketRateLimit
$requestsInterval is the minimum interval between two consecutive requests, given as a time.Duration. N is the number of requests allowed in a burst that arrives after no request has been seen for N * $requestsInterval. N is capped by $maxRequestBurst. If $maxRequestBurst == 1, no burst is allowed.
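The relationship between the interval and the burst size can be illustrated with a tiny token bucket. This is a simplified, non-blocking and non-thread-safe sketch, not the juju/ratelimit implementation the package builds on. tokenBucket, newTokenBucket and take are hypothetical names, time is passed in as an offset since the bucket was created, the bucket is assumed to start full, and the interval must be positive.

```go
package main

import (
	"fmt"
	"time"
)

// tokenBucket earns one token per interval, up to capacity tokens.
type tokenBucket struct {
	interval time.Duration
	capacity int64
	tokens   int64
	last     time.Duration // offset at which tokens were last credited
}

func newTokenBucket(interval time.Duration, maxBurst int64) *tokenBucket {
	return &tokenBucket{interval: interval, capacity: maxBurst, tokens: maxBurst}
}

// take tries to consume one token at the given offset since creation.
// It credits one token per whole interval elapsed, capped at capacity.
func (b *tokenBucket) take(at time.Duration) bool {
	if elapsed := at - b.last; elapsed >= b.interval {
		n := int64(elapsed / b.interval)
		b.tokens += n
		if b.tokens > b.capacity {
			b.tokens = b.capacity
		}
		b.last += time.Duration(n) * b.interval
	}
	if b.tokens == 0 {
		return false
	}
	b.tokens--
	return true
}

func main() {
	start := time.Now()
	b := newTokenBucket(50*time.Millisecond, 3)
	for i := 0; i < 5; i++ {
		fmt.Println(b.take(time.Since(start)))
	}
}
```

With a burst size of 1 the bucket never holds more than one token, so requests are spaced at least one interval apart.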