gentle

package
v2.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 9, 2017 License: MIT Imports: 8 Imported by: 0

Documentation

Overview

Package gentle defines Stream and Handler interfaces and provides composable resilient implementations of them.

Stream and Handler

Stream and Handler are our fundamental abstractions to achieve back-pressure. Stream has Get() that emits Messages. Handler has Handle() that transforms given Messages.

Stream(https://godoc.org/github.com/cfchou/go-gentle/gentle#Stream)
Handler(https://godoc.org/github.com/cfchou/go-gentle/gentle#Handler)

Developers should implement their own logic in the forms of Stream/Handler. For simple cases, these named types SimpleStream and SimpleHandler help to directly use a function as a Stream/Handler.

SimpleStream(https://godoc.org/github.com/cfchou/go-gentle/gentle#SimpleStream)
SimpleHandler(https://godoc.org/github.com/cfchou/go-gentle/gentle#SimpleHandler)

Our Resilience Streams and Handlers

Resiliency patterns are indispensable in distributed systems because external services are not reliable at all time. We provide some useful patterns in the forms of Streams/Handlers. They include rate-limiting, retry(also known as back-off), bulkhead and circuit-breaker. Each of them can be freely composed with other Streams/Handlers as one sees fit.

rateLimitedStream(https://godoc.org/github.com/cfchou/go-gentle/gentle#NewRateLimitedStream)
retryStream(https://godoc.org/github.com/cfchou/go-gentle/gentle#NewRetryStream)
bulkheadStream(https://godoc.org/github.com/cfchou/go-gentle/gentle#NewBulkheadStream)
circuitBreakerStream(https://godoc.org/github.com/cfchou/go-gentle/gentle#NewCircuitBreakerStream)

rateLimitedHandler(https://godoc.org/github.com/cfchou/go-gentle/gentle#NewRateLimitedHandler)
retryHandler(https://godoc.org/github.com/cfchou/go-gentle/gentle#NewRetryHandler)
bulkheadHandler(https://godoc.org/github.com/cfchou/go-gentle/gentle#NewBulkheadHandler)
circuitBreakerHandler(https://godoc.org/github.com/cfchou/go-gentle/gentle#NewCircuitBreakerHandler)

Composability

A Stream/Handler can chain with an arbitrary number of Handlers. Their semantic is that any failing element in the chain would skip the rest of all. Also note that any element can also be a nested chain itself.

AppendHandlersStream(https://godoc.org/github.com/cfchou/go-gentle/gentle#AppendHandlersStream)
AppendHandlersHandler(https://godoc.org/github.com/cfchou/go-gentle/gentle#AppendHandlersHandler)

If simply appending Streams/Handlers is not enough, developers can define Streams/Handlers with advanced flow controls, like these resilience Streams/Handlers defined in this package

Note

The implementation of Stream.Get() and Handler.Handle() should be thread-safe. A good practice is to make Stream/Handler state-less. A Message needs not to be immutable but it's good to be so. Our resilience Streams/Handlers are all thread-safe and don't mutate Messages.

External References

Some of our implementations make heavy use of third-party packages. It helps to checkout their documents.

Circuit-breaker is based on hystrix-go(https://godoc.org/github.com/afex/hystrix-go/hystrix).
Rate-limiting is based on ratelimit(https://godoc.org/github.com/juju/ratelimit).
Logging is based on log15(https://godoc.org/gopkg.in/inconshreveable/log15.v2).

Index

Constants

View Source
const (
	// Types of resilience, are most often used as part of RegistryKey.
	HandlerRateLimited = "hRate"
	HandlerRetry       = "hRetry"
	HandlerBulkhead    = "hBulk"
	//HandlerSemaphore      = "hSem"
	HandlerCircuitBreaker = "hCircuit"
)
View Source
const (
	// Observation supported by all Stream.Get(), it observes the time spent
	// with the label "result" of possible values "ok" or "err"
	MxStreamGet = "get"

	// Observation supported by all Handler.Handle(), it observes the time
	// spent with the label "result" of possible values "ok" or "err"
	MxHandlerHandle = "handle"

	// Observation supported by RetryStreams.Get(), it observes the total
	// number tries with the label "result" of possible values "ok" or "err"
	MxStreamRetryTry = "try"

	// Observation supported by retryHandler.Handle(), it observes the total
	// number of tries with the label "result" of possible values "ok" or
	// "err"
	MxHandlerRetryTry = "try"

	// Observation supported by circuitBreakerStream.Get(), when an error is
	// met, it observes 1 with the label "err" of possible values of
	// "ErrCbCircuitOpen", "ErrCbMaxConcurrency", "ErrCbTimeout" or
	// "NonCbErr"
	MxStreamCircuitBreakerHxerr = "hxerr"

	// Observation supported by circuitBreakerHandler.Handle(), when an
	// error is met, it observes 1 with the label "err" of possible values
	// of "ErrCbCircuitOpen", "ErrCbMaxConcurrency", "ErrCbTimeout" or
	// "NonCbErr"
	MxHandlerCircuitBreakerHxerr = "hxerr"
)
View Source
const (
	// Types of resilience, are most often used as part of RegistryKey.
	StreamRateLimited = "sRate"
	StreamRetry       = "sRetry"
	StreamBulkhead    = "sBulk"
	//StreamSemaphore      = "sSem"
	StreamCircuitBreaker = "sCircuit"
)
View Source
const BackOffStop time.Duration = -1

BackOffStop is a sentinel that BackOff.Next() should return to stop backing off.

Variables

View Source
var (
	// Log is a package level logger. It's the parent logger of all loggers used
	// by resilience Streams/Handlers defined in this package.
	Log = log15.New()

	// ErrCbOpen suggests the circuit is opened.
	ErrCbOpen = errors.New(hystrix.ErrCircuitOpen.Error())
	// ErrCbMaxConcurrency suggests the circuit has reached its maximum
	// concurrency of operations.
	ErrCbMaxConcurrency = errors.New(hystrix.ErrMaxConcurrency.Error())
	// ErrCbTimeout suggests the operation has run for too long.
	ErrCbTimeout = errors.New(hystrix.ErrTimeout.Error())

	// ErrMaxConcurrency suggests BulkheadStream/BulkheadHandler has reached
	// its maximum concurrency of operations.
	ErrMaxConcurrency = errors.New("Reached Max Concurrency")
)
View Source
var (
	ErrMxNotFound = errors.New("Metric Not found")
)

Functions

func IntToMillis

func IntToMillis(millis int) time.Duration

Converts $millis of int to time.Duration.

func RegisterObservation

func RegisterObservation(key *RegistryKey, observation Observation)

Registration helpers. Not thread-safe so synchronization has be done by application if needed.

func UnRegisterObservation

func UnRegisterObservation(key *RegistryKey)

Registration helpers. Not thread-safe so synchronization has be done by application if needed.

Types

type BackOff

type BackOff interface {
	// Next() should immediately return
	Next() time.Duration
}

type BackOffFactory

type BackOffFactory interface {
	NewBackOff() BackOff
}

type BulkheadHandlerOpts

type BulkheadHandlerOpts struct {
	MaxConcurrency int
	// contains filtered or unexported fields
}

func NewBulkheadHandlerOpts

func NewBulkheadHandlerOpts(namespace, name string, maxConcurrency int) *BulkheadHandlerOpts

type BulkheadStreamOpts

type BulkheadStreamOpts struct {
	MaxConcurrency int
	// contains filtered or unexported fields
}

func NewBulkheadStreamOpts

func NewBulkheadStreamOpts(namespace, name string, maxConcurrency int) *BulkheadStreamOpts

type CircuitBreakerConf

type CircuitBreakerConf struct {
	// Timeout is how long to wait for command to complete
	Timeout time.Duration
	// MaxConcurrent is how many commands of the same type can run
	// at the same time
	MaxConcurrent int
	// VolumeThreshold is the minimum number of requests needed
	// before a circuit can be tripped due to health
	VolumeThreshold int
	// ErrorPercentThreshold causes circuits to open once the
	// rolling measure of errors exceeds this percent of requests
	ErrorPercentThreshold int
	// SleepWindow is how long to wait after a circuit opens before testing
	// for recovery is allowed
	SleepWindow time.Duration
}

func NewDefaultCircuitBreakerConf

func NewDefaultCircuitBreakerConf() *CircuitBreakerConf

func (*CircuitBreakerConf) RegisterFor

func (c *CircuitBreakerConf) RegisterFor(circuit string)

type CircuitBreakerHandlerOpts

type CircuitBreakerHandlerOpts struct {
	MetricCbErr Metric
	Circuit     string
	// contains filtered or unexported fields
}

func NewCircuitBreakerHandlerOpts

func NewCircuitBreakerHandlerOpts(namespace, name, circuit string) *CircuitBreakerHandlerOpts

type CircuitBreakerStreamOpts

type CircuitBreakerStreamOpts struct {
	MetricCbErr Metric
	Circuit     string
	// contains filtered or unexported fields
}

func NewCircuitBreakerStreamOpts

func NewCircuitBreakerStreamOpts(namespace, name, circuit string) *CircuitBreakerStreamOpts

type Clock

type Clock interface {
	Now() time.Time
	Sleep(d time.Duration)
}

type ConstantBackOffFactory

type ConstantBackOffFactory struct {
	// contains filtered or unexported fields
}

ConstantBackOffFactory creates non-thread-safe constant BackOff objects.

func (*ConstantBackOffFactory) NewBackOff

func (f *ConstantBackOffFactory) NewBackOff() BackOff

type ConstantBackOffFactoryOpts

type ConstantBackOffFactoryOpts struct {
	Interval time.Duration

	// After MaxElapsedTime or MaxNumBackOffs(whichever comes first) the BackOff
	// stops.
	// If both are 0, it never stops backing off.
	// If only one of them is 0, then the other is checked.
	MaxElapsedTime time.Duration
	MaxNumBackOffs int64
	Clock          Clock
}

ConstantBackOffFactoryOpts is default options for ConstantBackOffFactory.

func NewConstantBackOffFactoryOpts

func NewConstantBackOffFactoryOpts(interval time.Duration,
	maxElapsedTime time.Duration) *ConstantBackOffFactoryOpts

type ExponentialBackOffFactory

type ExponentialBackOffFactory struct {
	// contains filtered or unexported fields
}

ExponentialBackOffFactory creates non-thread-safe exponential BackOff objects

func (*ExponentialBackOffFactory) NewBackOff

func (f *ExponentialBackOffFactory) NewBackOff() BackOff

type ExponentialBackOffFactoryOpts

type ExponentialBackOffFactoryOpts struct {
	// Next() returns a randomizedInterval which is:
	// currentInterval * rand(range [1-RandomizationFactor, 1+RandomizationFactor])
	InitialInterval     time.Duration
	RandomizationFactor float64
	Multiplier          float64

	// If currentInterval * Multiplier >= MaxInterval, then currentInterval = b.MaxInterval
	// Otherwise, currentInterval *= Multiplier
	MaxInterval time.Duration

	// After MaxElapsedTime or MaxNumBackOffs(whichever comes first) the BackOff
	// stops.
	// If both are 0, it never stops backing off.
	// If only one of them is 0, then the other is checked.
	MaxElapsedTime time.Duration
	MaxNumBackOffs int64
	Clock          Clock
}

func NewExponentialBackOffFactoryOpts

func NewExponentialBackOffFactoryOpts(initInterval time.Duration,
	multiplier float64, maxInterval time.Duration, maxElapsedTime time.Duration) *ExponentialBackOffFactoryOpts

type Handler

type Handler interface {
	// Handle() takes a Message as input and then returns either a Message or
	// an error exclusively.
	Handle(msg Message) (Message, error)
}

Handler transforms a Message.

func AppendFallbacksHandler

func AppendFallbacksHandler(handler Handler, fallbacks ...HandlerFallback) Handler

func AppendHandlersHandler

func AppendHandlersHandler(handler Handler, handlers ...Handler) Handler

func NewBulkheadHandler

func NewBulkheadHandler(opts *BulkheadHandlerOpts, handler Handler) Handler

Create a bulkheadHandler that allows at maximum $max_concurrency Handle() to run concurrently.

func NewCircuitBreakerHandler

func NewCircuitBreakerHandler(opts *CircuitBreakerHandlerOpts, handler Handler) Handler

In hystrix-go, a circuit-breaker must be given a unique name. NewCircuitBreakerStream() creates a circuitBreakerStream with a circuit-breaker named $circuit.

func NewRateLimitedHandler

func NewRateLimitedHandler(opts *RateLimitedHandlerOpts, handler Handler) Handler

func NewRetryHandler

func NewRetryHandler(opts *RetryHandlerOpts, handler Handler) Handler

type HandlerFallback

type HandlerFallback func(Message, error) (Message, error)

HandlerFallback is a fallback function of an error and the causal Message of that error.

type Identity

type Identity interface {
	GetNames() *Names
}

Identity is supported by resilience Streams/Handlers defined in this packages.

type Logger

type Logger interface {
	// Log a message at the given level with key/value pairs. The number of ctx
	// must be a multiple of two(for a key/value pair).
	Debug(msg string, ctx ...interface{})
	Info(msg string, ctx ...interface{})
	Warn(msg string, ctx ...interface{})
	Error(msg string, ctx ...interface{})
	Crit(msg string, ctx ...interface{})
}

Logger provides structural logging

type Message

type Message interface {
	// ID() returns a unique string that identifies this Message.
	ID() string
}

Message is passed around Streams/Handlers.

type Metric

type Metric interface {
	Observe(value float64, labels map[string]string)
}

Metric

type Names

type Names struct {
	Namespace  string
	Resilience string
	Name       string
}

Names identifies resilience Streams/Handlers defined in this package.

type Observation

type Observation interface {
	//Metric
	Observe(value float64, labels map[string]string)
}

Instead of commonly used Counter/Gauge/Timer/Histogram/Percentile, Observation is a general interface that doesn't limit the implementation. An implementation can be whatever meaningful.

func GetObservation

func GetObservation(key *RegistryKey) (Observation, error)

Registration helpers. Not thread-safe so synchronization has be done by application if needed.

func NoOpObservationIfNonRegistered

func NoOpObservationIfNonRegistered(key *RegistryKey) Observation

type RateLimit

type RateLimit interface {
	// Wait for $count tokens to be granted(return true) or timeout(return
	// false). If $timeout == 0, it would block as long as it needs.
	Wait(count int, timeout time.Duration) bool
}

RateLimit is an interface for a "token bucket" algorithm.

type RateLimitedHandlerOpts

type RateLimitedHandlerOpts struct {
	Limiter RateLimit
	// contains filtered or unexported fields
}

func NewRateLimitedHandlerOpts

func NewRateLimitedHandlerOpts(namespace, name string, limiter RateLimit) *RateLimitedHandlerOpts

type RateLimitedStreamOpts

type RateLimitedStreamOpts struct {
	Limiter RateLimit
	// contains filtered or unexported fields
}

func NewRateLimitedStreamOpts

func NewRateLimitedStreamOpts(namespace, name string, limiter RateLimit) *RateLimitedStreamOpts

type RegistryKey

type RegistryKey struct {
	Namespace, Resilience, Name, Mx string
}

type RetryHandlerOpts

type RetryHandlerOpts struct {
	MetricTryNum   Metric
	Clock          Clock
	BackOffFactory BackOffFactory
	// contains filtered or unexported fields
}

func NewRetryHandlerOpts

func NewRetryHandlerOpts(namespace, name string, backOffFactory BackOffFactory) *RetryHandlerOpts

type RetryStreamOpts

type RetryStreamOpts struct {
	MetricTryNum   Metric
	Clock          Clock
	BackOffFactory BackOffFactory
	// contains filtered or unexported fields
}

func NewRetryStreamOpts

func NewRetryStreamOpts(namespace, name string, backOffFactory BackOffFactory) *RetryStreamOpts

type SimpleHandler

type SimpleHandler func(Message) (Message, error)

func (SimpleHandler) Handle

func (r SimpleHandler) Handle(msg Message) (Message, error)

type SimpleStream

type SimpleStream func() (Message, error)

func (SimpleStream) Get

func (r SimpleStream) Get() (Message, error)

type Stream

type Stream interface {
	// Get() returns either a Message or an error exclusively.
	Get() (Message, error)
}

Stream emits Message.

func AppendFallbacksStream

func AppendFallbacksStream(stream Stream, fallbacks ...StreamFallback) Stream

func AppendHandlersStream

func AppendHandlersStream(stream Stream, handlers ...Handler) Stream

func NewBulkheadStream

func NewBulkheadStream(opts *BulkheadStreamOpts, upstream Stream) Stream

Create a bulkheadStream that allows at maximum $max_concurrency Get() to run concurrently.

func NewCircuitBreakerStream

func NewCircuitBreakerStream(opts *CircuitBreakerStreamOpts, stream Stream) Stream

In hystrix-go, a circuit-breaker must be given a unique name. NewCircuitBreakerStream() creates a circuitBreakerStream with a circuit-breaker named $circuit.

func NewRateLimitedStream

func NewRateLimitedStream(opts *RateLimitedStreamOpts, upstream Stream) Stream

func NewRetryStream

func NewRetryStream(opts *RetryStreamOpts, upstream Stream) Stream

type StreamFallback

type StreamFallback func(error) (Message, error)

StreamFallback is a fallback function of an error.

type TokenBucketRateLimit

type TokenBucketRateLimit struct {
	// contains filtered or unexported fields
}

func NewTokenBucketRateLimit

func NewTokenBucketRateLimit(requestsInterval time.Duration,
	maxRequestBurst int) *TokenBucketRateLimit

$requestsInterval The minimum interval, in milliseconds, between two consecutive requests. N is the amount of requests allowed when a burst of requests coming in after not seeing requests for N * RequestsInterval. N is capped by $maxRequestBurst. If $maxRequestBurst == 1, then no burst allowed.

func (*TokenBucketRateLimit) Wait

func (rl *TokenBucketRateLimit) Wait(count int, timeout time.Duration) bool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL