gentle

package
v3.0.8+incompatible
Published: Oct 24, 2017 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package gentle defines Stream and Handler interfaces and provides composable resilient implementations of them.

Stream and Handler

Stream and Handler are our fundamental abstractions. Stream has Get() that emits Messages. Handler has Handle() that transforms given Messages.

Message(https://godoc.org/github.com/cfchou/go-gentle/gentle#Message)
Stream(https://godoc.org/github.com/cfchou/go-gentle/gentle#Stream)
Handler(https://godoc.org/github.com/cfchou/go-gentle/gentle#Handler)

Developers should implement their own logic in the form of a Stream/Handler. For simple cases, the named types SimpleStream and SimpleHandler turn a plain function directly into a Stream/Handler.

SimpleStream(https://godoc.org/github.com/cfchou/go-gentle/gentle#SimpleStream)
SimpleHandler(https://godoc.org/github.com/cfchou/go-gentle/gentle#SimpleHandler)

Example:

// GameScore implements gentle.Message interface
type GameScore struct {
	id string // better to be unique for tracing it in log
	Score int
}

// a gentle.Message must support ID
func (s GameScore) ID() string {
	return s.id
}

// scoreStream is a gentle.Stream that wraps an API call to an external service for
// getting game scores.
// For simple cases that the logic can be defined entirely in a function, we can
// simply define it to be a gentle.SimpleStream.
var scoreStream gentle.SimpleStream = func(_ context.Context) (gentle.Message, error) {
	// simulate a result from an external service
	return &GameScore{
		id: "",
		Score: rand.Intn(100),
	}, nil
}

// DbWriter is a gentle.Handler that writes scores to the database.
// Instead of using gentle.SimpleHandler, we define a struct explicitly
// implementing gentle.Handler interface.
type DbWriter struct {
	db *sql.DB
	table string
}

func (h *DbWriter) Handle(_ context.Context, msg gentle.Message) (gentle.Message, error) {
	gameScore := msg.(*GameScore)
	statement := fmt.Sprintf("INSERT INTO %s (score, date) VALUES (?, DATETIME());", h.table)
	_, err := h.db.Exec(statement, gameScore.Score)
	if err != nil {
		return nil, err
	}
	return msg, nil
}

// example continues in the next section

Gentle-ments -- our resilience Streams and Handlers

Resiliency patterns are indispensable in distributed systems because external services are not always reliable. This package provides several useful patterns in the form of Streams/Handlers (we call them gentle-ments, pun intended). They include rate-limiting, retry (back-off), bulkhead and circuit-breaker. Each of them can be freely composed with other Streams/Handlers as one sees fit.

RateLimitedStream(https://godoc.org/github.com/cfchou/go-gentle/gentle#RateLimitedStream)
RetryStream(https://godoc.org/github.com/cfchou/go-gentle/gentle#RetryStream)
BulkheadStream(https://godoc.org/github.com/cfchou/go-gentle/gentle#BulkheadStream)
CircuitStream(https://godoc.org/github.com/cfchou/go-gentle/gentle#CircuitStream)

RateLimitedHandler(https://godoc.org/github.com/cfchou/go-gentle/gentle#RateLimitedHandler)
RetryHandler(https://godoc.org/github.com/cfchou/go-gentle/gentle#RetryHandler)
BulkheadHandler(https://godoc.org/github.com/cfchou/go-gentle/gentle#BulkheadHandler)
CircuitHandler(https://godoc.org/github.com/cfchou/go-gentle/gentle#CircuitHandler)

Generally, users call one of the option constructors, such as NewRetryHandlerOpts, to get an option object filled with default values, which can be mutated if necessary. They then pass it to one of the gentle-ment constructors above.

Example cont.(error handling is omitted for brevity):

func main() {
	db, _ := sql.Open("sqlite3", "scores.sqlite")
	defer db.Close()
	db.Exec("DROP TABLE IF EXISTS game;")
	db.Exec("CREATE TABLE game (score INTEGER, date DATETIME);")

	dbWriter := &DbWriter{
		db: db,
		table: "game",
	}

	// Rate-limit the queries while allowing some bursts
	gentleScoreStream := gentle.NewRateLimitedStream(
		gentle.NewRateLimitedStreamOpts("myApp", "rlQuery",
			gentle.NewTokenBucketRateLimit(500*time.Millisecond, 5)),
		scoreStream)

	// Limit concurrent writes to Db
	limitedDbWriter := gentle.NewBulkheadHandler(
		gentle.NewBulkheadHandlerOpts("myApp", "bkWrite", 16),
		dbWriter)

	// Constantly backing off when limitedDbWriter returns an error
	backoffFactory := gentle.NewConstBackOffFactory(
		gentle.NewConstBackOffFactoryOpts(500*time.Millisecond, 5*time.Minute))
	gentleDbWriter := gentle.NewRetryHandler(
		gentle.NewRetryHandlerOpts("myApp", "rtWrite", backoffFactory),
		limitedDbWriter)

	// Compose the final Stream
	stream := gentle.AppendHandlersStream(gentleScoreStream, gentleDbWriter)

	// Keep fetching scores from the remote service to our database.
	// The number of simultaneous goroutines is capped by the size of ticketPool.
	ticketPool := make(chan struct{}, 1000)
	for {
		ticketPool <- struct{}{}
		go func() {
			defer func(){ <-ticketPool }()
			stream.Get(context.Background())
		}()
	}
}

Full example(https://gist.github.com/c2ac4060aaf0fcada38a3d85b3c07a71)

Convention

Throughout the package, we follow a convention for creating Streams/Handlers and other constructs. First, we call NewXxxOpts() to obtain an option object initialized with default values. An option object is open for mutation. Once its values are settled, we pass it to the corresponding gentle-ment constructor.

Note that generally an option object is one-off. That is, it should not be reused to construct more than one gentle-ment instance.

Composability

Like gentle-ments, users may define Streams/Handlers that compose other ones in whatever way they want. For simple cases, there are helpers for chaining Streams/Handlers. Their semantics are that any failing element in the chain causes all remaining elements to be skipped. Note that any element can itself be a nested chain.

AppendHandlersStream(https://godoc.org/github.com/cfchou/go-gentle/gentle#AppendHandlersStream)
AppendHandlersHandler(https://godoc.org/github.com/cfchou/go-gentle/gentle#AppendHandlersHandler)
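The skip-on-failure semantics of a chain can be illustrated with a small self-contained sketch. It does not use this package: Message and Handler are redeclared locally with the documented shapes, while HandlerFunc, textMessage, chain and runChain are hypothetical names introduced only for the illustration. The real helpers may be implemented differently.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Message and Handler mirror the shapes documented for this package; they are
// redeclared here so that the sketch compiles on its own.
type Message interface{ ID() string }

type Handler interface {
	Handle(context.Context, Message) (Message, error)
}

// HandlerFunc is a hypothetical adapter, similar in spirit to SimpleHandler.
type HandlerFunc func(context.Context, Message) (Message, error)

func (f HandlerFunc) Handle(ctx context.Context, m Message) (Message, error) {
	return f(ctx, m)
}

// textMessage is a hypothetical Message whose ID is its own text.
type textMessage string

func (m textMessage) ID() string { return string(m) }

// chain is a hypothetical stand-in for a chaining helper: it runs the handlers
// in order, feeds each one the previous output, and stops at the first error.
func chain(handlers ...Handler) Handler {
	return HandlerFunc(func(ctx context.Context, m Message) (Message, error) {
		for _, h := range handlers {
			out, err := h.Handle(ctx, m)
			if err != nil {
				return nil, err // the remaining handlers are skipped
			}
			m = out
		}
		return m, nil
	})
}

// runChain builds a three-element chain whose second element fails and
// reports which elements actually ran.
func runChain() (ran []string, err error) {
	step := func(name string, fail bool) Handler {
		return HandlerFunc(func(_ context.Context, m Message) (Message, error) {
			ran = append(ran, name)
			if fail {
				return nil, errors.New(name + " failed")
			}
			return m, nil
		})
	}
	h := chain(step("first", false), step("second", true), step("third", false))
	_, err = h.Handle(context.Background(), textMessage("abc"))
	return ran, err
}

func main() {
	ran, err := runChain()
	fmt.Println(ran, err) // prints "[first second] second failed"
}
```

Because a chain is itself a Handler in this sketch, it can be passed as an element of another chain, which is how nesting falls out naturally.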

We also define fallback types and related helpers for appending a chain of fallbacks to a Stream/Handler. Any successful fallback in the chain causes all remaining fallbacks to be skipped.

StreamFallback(https://godoc.org/github.com/cfchou/go-gentle/gentle#StreamFallback)
AppendFallbacksStream(https://godoc.org/github.com/cfchou/go-gentle/gentle#AppendFallbacksStream)

HandlerFallback(https://godoc.org/github.com/cfchou/go-gentle/gentle#HandlerFallback)
AppendFallbacksHandler(https://godoc.org/github.com/cfchou/go-gentle/gentle#AppendFallbacksHandler)

Context Support

Stream.Get() and Handler.Handle() both take a context.Context. One common use of Context is to coordinate request-scoped timeouts. Our gentle-ments respect timeouts as much as possible and faithfully pass the context to the user-defined upstreams or up-handlers, which may also respect the context's timeout.

Thread Safety

Stream.Get() and Handler.Handle() should be thread-safe. A good practice is to make Streams/Handlers stateless. A Message need not be immutable, but it is better if it is. That said, gentle-ments' Get()/Handle() are all thread-safe and don't mutate Messages.

Logging

Users may plug in whatever logging library they like as long as it supports interface Logger(https://godoc.org/github.com/cfchou/go-gentle/gentle#Logger). Fans of log15 and logrus may check out the sibling package extra/log(https://godoc.org/github.com/cfchou/go-gentle/extra/log) for ready-made adapters.

There's a root logger gentle.Log which, if not specified, is a no-op logger. Every gentle-ment has its own logger. Users can get/set the logger in the option object, which is then used to initialize a gentle-ment. By default, each of these loggers is a child returned by gentle.Log.New(fields) where fields are key-value pairs of:

"namespace": "namespace of this Stream/Handler"
"name": "name of this Stream/Handler"
"gentle": "type of this Stream/Handler"

The Logger interface doesn't have methods like SetHandler or SetLevel, because such functions are often implementation-dependent. Instead, you set up the logger and then assign it to gentle.Log or a gentle-ment's option. That way, you keep fine-grained control over logging. Check out the sibling package extra/log for examples.

Metrics

Currently there are three interfaces for the metrics collectors of gentle-ments:

Metric for RateLimitedStream/Handler, BulkheadStream/Handler(https://godoc.org/github.com/cfchou/go-gentle/gentle#Metric)
RetryMetric for RetryStream/Handler(https://godoc.org/github.com/cfchou/go-gentle/gentle#RetryMetric)
CbMetric for CircuitStream/Handler(https://godoc.org/github.com/cfchou/go-gentle/gentle#CbMetric)

In the sibling package extra/metric(https://godoc.org/github.com/cfchou/go-gentle/extra/metric), we provide implementations for Prometheus and statsd, along with examples. As with Logger, one can change an option's metrics collector before creating a gentle-ment. By default, metrics collectors are all no-op.

OpenTracing

Gentle-ments integrate OpenTracing(https://github.com/opentracing/opentracing-go). Users may create a span in the root context, which is then passed around by Streams/Handlers. Gentle-ments' options come with an opentracing.Tracer, which is by default the global tracer. There's an example of using Uber's jaeger as the backend(https://github.com/cfchou/go-gentle/blob/master/extra/tracing/jaeger.go).

Constants

const (
	// BackOffStop is a sentinel returned by BackOff.Next() indicating that no
	// more retries should be made.
	BackOffStop time.Duration = -1

	// DefaultMaxNumBackOffs is the default for ConstBackOffFactoryOpts.MaxNumBackOffs
	// and ExpBackOffFactoryOpts.MaxNumBackOffs
	DefaultMaxNumBackOffs = 0

	// DefaultRandomizationFactor is the default for ExpBackOffFactoryOpts.RandomizationFactor
	DefaultRandomizationFactor = 0.5
)

const (
	// StreamRateLimited and other constants are types of resilience.
	// They are most often used with namespace & name to form an identifier in
	// logging/metric/tracing.
	StreamRateLimited  = "sRate"
	StreamRetry        = "sRetry"
	StreamBulkhead     = "sBulk"
	StreamCircuit      = "sCircuit"
	HandlerRateLimited = "hRate"
	HandlerRetry       = "hRetry"
	HandlerBulkhead    = "hBulk"
	HandlerCircuit     = "hCircuit"
)

const (
	// Default values of a CircuitConf
	DefaultCbTimeout             = 10 * time.Second
	DefaultCbMaxConcurrent       = 1024
	DefaultCbVolumeThreshold     = 20
	DefaultCbErrPercentThreshold = 50
	DefaultCbSleepWindow         = 5 * time.Second
)

Variables

var (

	// ErrCbOpen suggests the circuit is opened.
	ErrCbOpen = errors.New(hystrix.ErrCircuitOpen.Error())

	// ErrCbMaxConcurrency suggests the circuit has reached its maximum
	// concurrency of operations.
	ErrCbMaxConcurrency = errors.New(hystrix.ErrMaxConcurrency.Error())

	// ErrCbTimeout suggests the operation has run for too long.
	ErrCbTimeout = errors.New(hystrix.ErrTimeout.Error())

	// ErrMaxConcurrency suggests BulkheadStream/BulkheadHandler has reached
	// its maximum concurrency of operations.
	ErrMaxConcurrency = errors.New("Reached Max Concurrency")
)

Functions

func CircuitReset

func CircuitReset()

CircuitReset resets all states(incl. metrics) of all circuits.

Types

type BackOff

type BackOff interface {
	// Next() returns duration-to-wait.
	Next() time.Duration
}

BackOff provides a series of back-offs. Once Next() returns BackOffStop, Next() will not be called again.

type BackOffFactory

type BackOffFactory interface {
	NewBackOff() BackOff
}

BackOffFactory creates a BackOff each time RetryStream.Get() or RetryHandler.Handle() needs a series of back-offs.

type BulkheadHandler

type BulkheadHandler struct {
	// contains filtered or unexported fields
}

BulkheadHandler is a Handler that limits concurrent access to the up-handler.

func NewBulkheadHandler

func NewBulkheadHandler(opts *BulkheadHandlerOpts, upHandler Handler) *BulkheadHandler

NewBulkheadHandler creates a BulkheadHandler to guard the up-handler.

Example
var fakeHandler SimpleHandler = func(_ context.Context, msg Message) (Message, error) {
	return msg, nil
}

count := 10
// limit concurrent access to fakeHandler
handler := NewBulkheadHandler(
	NewBulkheadHandlerOpts("", "test", 2),
	fakeHandler)

wg := &sync.WaitGroup{}
wg.Add(count)
var msgID int64
for i := 0; i < count; i++ {
	go func() {
		defer wg.Done()
		id := atomic.AddInt64(&msgID, 1)
		msg, err := handler.Handle(context.Background(),
			SimpleMessage(strconv.FormatInt(id, 10)))
		if err != nil {
			if err == ErrMaxConcurrency {
				fmt.Println("Reached MaxConcurrency")
			} else {
				fmt.Println("Other err:", err)
			}
		} else {
			fmt.Println("msg:", msg.ID())
		}
	}()
}
wg.Wait()

func (*BulkheadHandler) Handle

func (r *BulkheadHandler) Handle(ctx context.Context, msg Message) (Message, error)

Handle returns ErrMaxConcurrency when the number of concurrent calls exceeds MaxConcurrency.

type BulkheadHandlerOpts

type BulkheadHandlerOpts struct {
	HandlerOpts
	Metric Metric
	// MaxConcurrency limits the number of concurrent Handle() calls
	MaxConcurrency int
}

BulkheadHandlerOpts contains options that'll be used by NewBulkheadHandler.

func NewBulkheadHandlerOpts

func NewBulkheadHandlerOpts(namespace, name string, maxConcurrency int) *BulkheadHandlerOpts

NewBulkheadHandlerOpts returns BulkheadHandlerOpts with default values.

type BulkheadStream

type BulkheadStream struct {
	// contains filtered or unexported fields
}

BulkheadStream is a Stream that limits concurrent access to the upstream.

func NewBulkheadStream

func NewBulkheadStream(opts *BulkheadStreamOpts, upstream Stream) *BulkheadStream

NewBulkheadStream creates a BulkheadStream to guard the upstream.

Example
var msgID int64
var fakeStream SimpleStream = func(_ context.Context) (Message, error) {
	id := atomic.AddInt64(&msgID, 1)
	return SimpleMessage(strconv.FormatInt(id, 10)), nil
}

count := 10
// limit concurrent access to fakeStream
stream := NewBulkheadStream(
	NewBulkheadStreamOpts("", "test", 2),
	fakeStream)

wg := &sync.WaitGroup{}
wg.Add(count)
for i := 0; i < count; i++ {
	go func() {
		defer wg.Done()
		msg, err := stream.Get(context.Background())
		if err != nil {
			if err == ErrMaxConcurrency {
				fmt.Println("Reached MaxConcurrency")
			} else {
				fmt.Println("Other err:", err)
			}
		} else {
			fmt.Println("msg:", msg.ID())
		}
	}()
}
wg.Wait()

func (*BulkheadStream) Get

func (r *BulkheadStream) Get(ctx context.Context) (Message, error)

Get returns ErrMaxConcurrency when the number of concurrent calls exceeds MaxConcurrency.

type BulkheadStreamOpts

type BulkheadStreamOpts struct {
	StreamOpts
	Metric         Metric
	MaxConcurrency int
}

BulkheadStreamOpts contains options that'll be used by NewBulkheadStream.

func NewBulkheadStreamOpts

func NewBulkheadStreamOpts(namespace, name string, maxConcurrency int) *BulkheadStreamOpts

NewBulkheadStreamOpts returns BulkheadStreamOpts with default values.

type CbMetric

type CbMetric interface {
	// Successful Stream.Get()/Handler.Handle() with timespan
	ObserveOk(timespan time.Duration)
	// Failed Stream.Get()/Handler.Handle() with timespan and error.
	ObserveErr(timespan time.Duration, err error)
}

CbMetric is an interface for collecting metrics by CircuitStream/CircuitHandler

type CircuitConf

type CircuitConf struct {
	// Timeout is how long to wait for command to complete
	Timeout time.Duration
	// MaxConcurrent is how many commands of the same type can run
	// at the same time
	MaxConcurrent int
	// VolumeThreshold is the minimum number of requests needed
	// before a circuit can be tripped due to health
	VolumeThreshold int
	// ErrorPercentThreshold causes circuits to open once the
	// rolling measure of errors exceeds this percent of requests
	ErrorPercentThreshold int
	// SleepWindow is how long to wait after a circuit opens before testing
	// for recovery is allowed
	SleepWindow time.Duration
}

CircuitConf is the configuration of a circuit-breaker.

func (*CircuitConf) RegisterFor

func (c *CircuitConf) RegisterFor(circuit string)

RegisterFor applies the CircuitConf to a circuit-breaker identified by its name.

type CircuitHandler

type CircuitHandler struct {
	// contains filtered or unexported fields
}

CircuitHandler is a Handler that guards the up-handler with a circuit-breaker.

func NewCircuitHandler

func NewCircuitHandler(opts *CircuitHandlerOpts, upHandler Handler) *CircuitHandler

NewCircuitHandler creates a CircuitHandler to guard the up-handler.

Example
rand.Seed(time.Now().UnixNano())
fakeErr := errors.New("fake err")
// fakeHandler with randomized outcome
var fakeHandler SimpleHandler = func(_ context.Context, msg Message) (Message, error) {
	if rand.Intn(10)%10 == 0 {
		// 1/10 chances to return fakeErr
		return nil, fakeErr
	}
	if rand.Intn(10)%10 == 1 {
		// 1/10 chances to sleep until circuit's timeout
		time.Sleep(DefaultCbTimeout + 10*time.Millisecond)
	}
	return msg, nil
}

// resets all states(incl. metrics) of all circuits.
CircuitReset()
// create CircuitHandler to protect fakeHandler
handler := NewCircuitHandler(
	NewCircuitHandlerOpts("", "test", xid.New().String()),
	fakeHandler)

count := 100
wg := &sync.WaitGroup{}
wg.Add(count)
var msgID int64
for i := 0; i < count; i++ {
	go func() {
		defer wg.Done()
		id := atomic.AddInt64(&msgID, 1)
		msg, err := handler.Handle(context.Background(),
			SimpleMessage(strconv.FormatInt(id, 10)))
		if err != nil {
			switch err {
			case ErrCbMaxConcurrency:
				fmt.Println("Reached Circuit's MaxConcurrency")
			case ErrCbTimeout:
				fmt.Println("Reached Circuit's Timeout")
			case ErrCbOpen:
				fmt.Println("Reached Circuit's threshold so it opens")
			default:
				fmt.Println("Other err:", err)
			}
			return
		}
		fmt.Println("msg:", msg.ID())
	}()
}
wg.Wait()

Example (CustomCircuit)
rand.Seed(time.Now().UnixNano())
fakeErr := errors.New("fake err")
// fakeHandler with randomized outcome
var fakeHandler SimpleHandler = func(_ context.Context, msg Message) (Message, error) {
	if rand.Intn(10)%10 == 0 {
		// 1/10 chances to return fakeErr
		return nil, fakeErr
	}
	if rand.Intn(10)%10 == 1 {
		// 1/10 chances to sleep until circuit's timeout
		time.Sleep(DefaultCbTimeout + 10*time.Millisecond)
	}
	return msg, nil
}

// customize circuit's setting
circuit := xid.New().String()
opts := NewCircuitHandlerOpts("", "test", circuit)
opts.CircuitConf.MaxConcurrent = 20
// resets all states(incl. metrics) of all circuits.
CircuitReset()
// create CircuitHandler to protect fakeHandler
handler := NewCircuitHandler(opts, fakeHandler)

count := 100
wg := &sync.WaitGroup{}
wg.Add(count)
var msgID int64
for i := 0; i < count; i++ {
	go func() {
		defer wg.Done()
		id := atomic.AddInt64(&msgID, 1)
		msg, err := handler.Handle(context.Background(),
			SimpleMessage(strconv.FormatInt(id, 10)))
		if err != nil {
			switch err {
			case ErrCbMaxConcurrency:
				fmt.Println("Reached Circuit's MaxConcurrency")
			case ErrCbTimeout:
				fmt.Println("Reached Circuit's Timeout")
			case ErrCbOpen:
				fmt.Println("Reached Circuit's threshold so it opens")
			default:
				fmt.Println("Other err:", err)
			}
			return
		}
		fmt.Println("msg:", msg.ID())
	}()
}
wg.Wait()

func (*CircuitHandler) GetCircuitName

func (r *CircuitHandler) GetCircuitName() string

GetCircuitName returns the name of the internal circuit.

func (*CircuitHandler) Handle

func (r *CircuitHandler) Handle(ctx context.Context, msg Message) (Message, error)

Handle may return errors generated by the up-handler as well as errors from the circuit itself, including ErrCbOpen, ErrCbMaxConcurrency and ErrCbTimeout.

type CircuitHandlerOpts

type CircuitHandlerOpts struct {
	HandlerOpts
	// CbMetric is the circuit's metric collector. Default is no-op.
	CbMetric CbMetric
	// Circuit is the name of the circuit-breaker to create. Each circuit-breaker
	// must have a unique name associated with its CircuitConf and internal hystrix metrics.
	Circuit     string
	CircuitConf CircuitConf
}

CircuitHandlerOpts contains options that'll be used by NewCircuitHandler.

func NewCircuitHandlerOpts

func NewCircuitHandlerOpts(namespace, name, circuit string) *CircuitHandlerOpts

NewCircuitHandlerOpts returns CircuitHandlerOpts with default values. Note that circuit is the name of the circuit. Every circuit must have a unique name that maps to its CircuitConf.

type CircuitStream

type CircuitStream struct {
	// contains filtered or unexported fields
}

CircuitStream is a Stream that guards the upstream with a circuit-breaker.

func NewCircuitStream

func NewCircuitStream(opts *CircuitStreamOpts, upstream Stream) *CircuitStream

NewCircuitStream creates a CircuitStream to guard the upstream.

Example
rand.Seed(time.Now().UnixNano())
fakeErr := errors.New("fake err")
var msgID int64
// fakeStream with randomized outcome
var fakeStream SimpleStream = func(_ context.Context) (Message, error) {
	if rand.Intn(10)%10 == 0 {
		// 1/10 chances to return fakeErr
		return nil, fakeErr
	}
	id := atomic.AddInt64(&msgID, 1)
	if rand.Intn(10)%10 == 1 {
		// 1/10 chances to sleep until circuit's timeout
		time.Sleep(DefaultCbTimeout + 10*time.Millisecond)
	}
	return SimpleMessage(strconv.FormatInt(id, 10)), nil
}

// resets all states(incl. metrics) of all circuits.
CircuitReset()
// create CircuitStream to protect fakeStream
stream := NewCircuitStream(
	NewCircuitStreamOpts("", "test", xid.New().String()),
	fakeStream)

count := 100
wg := &sync.WaitGroup{}
wg.Add(count)
for i := 0; i < count; i++ {
	go func() {
		defer wg.Done()
		msg, err := stream.Get(context.Background())
		if err != nil {
			switch err {
			case ErrCbMaxConcurrency:
				fmt.Println("Reached Circuit's MaxConcurrency")
			case ErrCbTimeout:
				fmt.Println("Reached Circuit's Timeout")
			case ErrCbOpen:
				fmt.Println("Reached Circuit's threshold so it opens")
			default:
				fmt.Println("Other err:", err)
			}
			return
		}
		fmt.Println("msg:", msg.ID())
	}()
}
wg.Wait()

Example (CustomCircuit)
rand.Seed(time.Now().UnixNano())
fakeErr := errors.New("fake err")
var msgID int64
// fakeStream with randomized outcome
var fakeStream SimpleStream = func(_ context.Context) (Message, error) {
	if rand.Intn(10)%10 == 0 {
		// 1/10 chances to return fakeErr
		return nil, fakeErr
	}
	id := atomic.AddInt64(&msgID, 1)
	if rand.Intn(10)%10 == 1 {
		// 1/10 chances to sleep until circuit's timeout
		time.Sleep(DefaultCbTimeout + 10*time.Millisecond)
	}
	return SimpleMessage(strconv.FormatInt(id, 10)), nil
}

// customize circuit's setting
circuit := xid.New().String()
opts := NewCircuitStreamOpts("", "test", circuit)
opts.CircuitConf.MaxConcurrent = 20
// resets all states(incl. metrics) of all circuits.
CircuitReset()
// create CircuitStream to protect fakeStream
stream := NewCircuitStream(opts, fakeStream)

count := 100
wg := &sync.WaitGroup{}
wg.Add(count)
for i := 0; i < count; i++ {
	go func() {
		defer wg.Done()
		msg, err := stream.Get(context.Background())
		if err != nil {
			switch err {
			case ErrCbMaxConcurrency:
				fmt.Println("Reached Circuit's MaxConcurrency")
			case ErrCbTimeout:
				fmt.Println("Reached Circuit's Timeout")
			case ErrCbOpen:
				fmt.Println("Reached Circuit's threshold so it opens")
			default:
				fmt.Println("Other err:", err)
			}
			return
		}
		fmt.Println("msg:", msg.ID())
	}()
}
wg.Wait()

func (*CircuitStream) Get

func (r *CircuitStream) Get(ctx context.Context) (Message, error)

Get may return errors generated by the upstream as well as errors from the circuit itself, including ErrCbOpen, ErrCbMaxConcurrency and ErrCbTimeout.

func (*CircuitStream) GetCircuitName

func (r *CircuitStream) GetCircuitName() string

GetCircuitName returns the name of the internal circuit.

type CircuitStreamOpts

type CircuitStreamOpts struct {
	StreamOpts
	CbMetric CbMetric

	// Circuit is the name of the circuit-breaker to create. Each circuit-breaker
	// must have a unique name associated with its CircuitConf and internal hystrix metrics.
	Circuit     string
	CircuitConf CircuitConf
}

CircuitStreamOpts contains options that'll be used by NewCircuitStream.

func NewCircuitStreamOpts

func NewCircuitStreamOpts(namespace, name, circuit string) *CircuitStreamOpts

NewCircuitStreamOpts returns CircuitStreamOpts with default values. Note that circuit is the name of the circuit. Every circuit must have a unique name that maps to its CircuitConf.

type ConstBackOffFactory

type ConstBackOffFactory struct {
	// contains filtered or unexported fields
}

ConstBackOffFactory creates non-thread-safe constant BackOffs.

func NewConstBackOffFactory

func NewConstBackOffFactory(opts *ConstBackOffFactoryOpts) *ConstBackOffFactory

NewConstBackOffFactory creates a ConstBackOffFactory.

func (*ConstBackOffFactory) NewBackOff

func (f *ConstBackOffFactory) NewBackOff() BackOff

NewBackOff creates a constant BackOff object.

type ConstBackOffFactoryOpts

type ConstBackOffFactoryOpts struct {
	Interval time.Duration

	// After MaxElapsedTime or MaxNumBackOffs(whichever comes first) the BackOff
	// stops.
	// If both are 0, it never stops backing off.
	// If only one of them is 0, then the other is checked.
	MaxElapsedTime time.Duration
	MaxNumBackOffs int64
	// Clock provides a way to mock time. Mainly used in tests.
	Clock clock.Clock
}

ConstBackOffFactoryOpts holds the settings for ConstBackOffFactory.

func NewConstBackOffFactoryOpts

func NewConstBackOffFactoryOpts(interval time.Duration,
	maxElapsedTime time.Duration) *ConstBackOffFactoryOpts

NewConstBackOffFactoryOpts creates a default ConstBackOffFactoryOpts.

type ExpBackOffFactory

type ExpBackOffFactory struct {
	// contains filtered or unexported fields
}

ExpBackOffFactory creates non-thread-safe exponential BackOffs.

func NewExpBackOffFactory

func NewExpBackOffFactory(opts *ExpBackOffFactoryOpts) *ExpBackOffFactory

NewExpBackOffFactory creates an ExpBackOffFactory.

func (*ExpBackOffFactory) NewBackOff

func (f *ExpBackOffFactory) NewBackOff() BackOff

NewBackOff creates an exponential BackOff object.

type ExpBackOffFactoryOpts

type ExpBackOffFactoryOpts struct {
	// Next() returns a randomizedInterval which is:
	// currentInterval * rand(range [1-RandomizationFactor, 1+RandomizationFactor])
	InitialInterval     time.Duration
	RandomizationFactor float64
	Multiplier          float64

	// If currentInterval * Multiplier >= MaxInterval, then currentInterval = b.MaxInterval
	// Otherwise, currentInterval *= Multiplier
	MaxInterval time.Duration

	// After MaxElapsedTime or MaxNumBackOffs(whichever comes first) the BackOff
	// stops.
	// If both are 0, it never stops backing off.
	// If only one of them is 0, then the other is checked.
	MaxElapsedTime time.Duration
	MaxNumBackOffs int64
	// Clock provides a way to mock time. Mainly used in tests.
	Clock clock.Clock
}

ExpBackOffFactoryOpts holds the settings for ExpBackOffFactory.

func NewExpBackOffFactoryOpts

func NewExpBackOffFactoryOpts(initInterval time.Duration,
	multiplier float64, maxInterval time.Duration, maxElapsedTime time.Duration) *ExpBackOffFactoryOpts

NewExpBackOffFactoryOpts creates a default ExpBackOffFactoryOpts.

type Handler

type Handler interface {
	// Handle() takes a Message as input and then returns either a Message or
	// an error exclusively.
	Handle(context.Context, Message) (Message, error)
}

Handler transforms a Message.

func AppendFallbacksHandler

func AppendFallbacksHandler(handler Handler, fallbacks ...HandlerFallback) Handler

AppendFallbacksHandler appends multiple HandlerFallbacks to a Handler. HandlerFallbacks run in sequence. The first fallback takes the error from handler.Handle() and every subsequent fallback accepts an error returned by the previous one. Moreover every fallback also takes the same Message passed to handler.

Example
var firstHandler SimpleHandler = func(_ context.Context, msg Message) (Message, error) {
	fmt.Println("handler err caused by msg:", msg.ID())
	return nil, errors.New("handler err")
}
var fallbackA HandlerFallback = func(_ context.Context, msg Message, err error) (Message, error) {
	fmt.Println("fallbackA fails to deal with msg:", msg.ID())
	return nil, errors.New("fallback err")
}
var fallbackB HandlerFallback = func(_ context.Context, msg Message, err error) (Message, error) {
	msgOut := SimpleMessage("xyz")
	fmt.Printf("fallbackB successfully deals with msg: %s, returns msg: %s\n",
		msg.ID(), msgOut.ID())
	return msgOut, nil
}

handler := AppendFallbacksHandler(firstHandler, fallbackA, fallbackB)
msg, _ := handler.Handle(context.Background(), SimpleMessage("abc"))
fmt.Println("msg finally returned:", msg.ID())
Output:

handler err caused by msg: abc
fallbackA fails to deal with msg: abc
fallbackB successfully deals with msg: abc, returns msg: xyz
msg finally returned: xyz

func AppendHandlersHandler

func AppendHandlersHandler(handler Handler, handlers ...Handler) Handler

AppendHandlersHandler appends multiple Handlers to a Handler. Handlers run in sequence and every handler accepts a Message returned by the previous handler.

Example
var repeatHandler SimpleHandler = func(_ context.Context, msg Message) (Message, error) {
	msgOut := SimpleMessage(msg.ID() + "+")
	fmt.Printf("handle msg: %s, returns msg: %s\n", msg.ID(), msgOut.ID())
	return msgOut, nil
}
handler := AppendHandlersHandler(repeatHandler, repeatHandler, repeatHandler)
msg, _ := handler.Handle(context.Background(), SimpleMessage("abc"))
fmt.Println("msg finally returned:", msg.ID())
Output:

handle msg: abc, returns msg: abc+
handle msg: abc+, returns msg: abc++
handle msg: abc++, returns msg: abc+++
msg finally returned: abc+++

type HandlerFallback

type HandlerFallback func(context.Context, Message, error) (Message, error)

HandlerFallback is a function that takes the error returned by Handler.Handle() and the Message that caused that error. It then returns either a Message or an error.

type HandlerOpts

type HandlerOpts struct {
	// Namespace and Name are for logically organizing Streams/Handlers. They
	// appear along with the type of resilience in log or tracing
	Namespace  string
	Name       string
	Log        Logger
	Tracer     opentracing.Tracer
	TracingRef TracingRef
}

HandlerOpts contains the options that every XxxHandlerOpts must have.

type Logger

type Logger interface {
	// Log a message at the given level with key/value pairs. The number of
	// fields must be a multiple of two, one key and one value per pair.
	Debug(msg string, fields ...interface{})
	Info(msg string, fields ...interface{})
	Warn(msg string, fields ...interface{})
	Error(msg string, fields ...interface{})
	Crit(msg string, fields ...interface{})

	// Logger may support structured/contextual logging. Our Streams/Handlers
	// by default will acquire a logger by calling package root logger
	// gentle.Log.New(
	//   "namespace", "namespace of this Stream/Handler",
	//   "name", "name of this Stream/Handler",
	//   "gentle", "type of this stream/handler")
	New(fields ...interface{}) Logger
}

Logger provides a structured logging interface.

var Log Logger

Log is a package level logger. It's the parent logger of all loggers used by resilience Streams/Handlers defined in this package.

type Message

type Message interface {
	// ID() should return a unique string representing this Message.
	ID() string
}

Message is passed around Streams/Handlers.

type Metric

type Metric interface {
	// Successful Stream.Get()/Handler.Handle() with timespan
	ObserveOk(timespan time.Duration)
	// Failed Stream.Get()/Handler.Handle() with timespan
	ObserveErr(timespan time.Duration)
}

Metric is an interface for collecting metrics by RateLimitedStream/RateLimitedHandler/BulkheadStream/BulkheadHandler.

type RateLimit

type RateLimit interface {
	// Wait for $count tokens to be granted (returns true) or for $timeout to
	// expire (returns false). If $timeout == 0, it blocks for as long as needed.
	Wait(count int, timeout time.Duration) bool
}

RateLimit is an interface for a "token bucket" algorithm.
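For readers who want to plug in their own limiter, here is a small token-bucket sketch that satisfies the Wait contract described above. It is not the package's TokenBucketRateLimit: tokenBucket and newTokenBucket are hypothetical names, and the reservation strategy (letting the token count go negative and sleeping off the deficit) is just one possible design.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// RateLimit is a local copy of the interface shown above.
type RateLimit interface {
	Wait(count int, timeout time.Duration) bool
}

// tokenBucket is a hypothetical limiter: one token is refilled every interval,
// and at most burst tokens are stored.
type tokenBucket struct {
	mu       sync.Mutex
	interval time.Duration
	burst    int
	tokens   float64
	last     time.Time
}

var _ RateLimit = (*tokenBucket)(nil)

func newTokenBucket(interval time.Duration, burst int) *tokenBucket {
	return &tokenBucket{interval: interval, burst: burst, tokens: float64(burst), last: time.Now()}
}

// Wait reserves count tokens and sleeps until they would have been refilled.
// It returns false without reserving if that takes longer than a non-zero timeout.
func (b *tokenBucket) Wait(count int, timeout time.Duration) bool {
	b.mu.Lock()
	now := time.Now()
	// Refill according to the time elapsed since the last call, capped by burst.
	b.tokens += float64(now.Sub(b.last)) / float64(b.interval)
	if b.tokens > float64(b.burst) {
		b.tokens = float64(b.burst)
	}
	b.last = now
	var wait time.Duration
	if deficit := float64(count) - b.tokens; deficit > 0 {
		wait = time.Duration(deficit * float64(b.interval))
	}
	if timeout > 0 && wait > timeout {
		b.mu.Unlock()
		return false
	}
	b.tokens -= float64(count) // may go negative: later callers wait longer
	b.mu.Unlock()
	time.Sleep(wait)
	return true
}

// demoBucket exercises an immediate grant, a timeout and a blocking grant.
func demoBucket() (first, second, third bool) {
	b := newTokenBucket(50*time.Millisecond, 1)
	first = b.Wait(1, 0)                 // the bucket starts full
	second = b.Wait(1, time.Millisecond) // a refill takes far longer than 1ms
	third = b.Wait(1, 0)                 // blocks until a token is refilled
	return first, second, third
}

func main() {
	fmt.Println(demoBucket())
}
```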

type RateLimitedHandler

type RateLimitedHandler struct {
	// contains filtered or unexported fields
}

RateLimitedHandler is a Handler that runs the up-handler in a rate-limited manner.

func NewRateLimitedHandler

func NewRateLimitedHandler(opts *RateLimitedHandlerOpts, upHandler Handler) *RateLimitedHandler

NewRateLimitedHandler creates a RateLimitedHandler to guard the up-handler.

Example
var fakeHandler SimpleHandler = func(_ context.Context, msg Message) (Message, error) {
	return msg, nil
}

count := 5
interval := 100 * time.Millisecond
minimum := time.Duration(count-1) * interval

// limit the rate to access fakeHandler
handler := NewRateLimitedHandler(
	NewRateLimitedHandlerOpts("", "test",
		NewTokenBucketRateLimit(interval, 1)),
	fakeHandler)

begin := time.Now()
wg := &sync.WaitGroup{}
wg.Add(count)
for i := 0; i < count; i++ {
	go func() {
		defer wg.Done()
		handler.Handle(context.Background(), SimpleMessage("abc"))
	}()
}
wg.Wait()
fmt.Printf("Spend more than %s? %t\n", minimum,
	time.Now().After(begin.Add(minimum)))
Output:

Spend more than 400ms? true

func (*RateLimitedHandler) Handle

func (r *RateLimitedHandler) Handle(ctx context.Context, msg Message) (Message, error)

Handle blocks when requests come in too fast.

type RateLimitedHandlerOpts

type RateLimitedHandlerOpts struct {
	HandlerOpts
	Metric  Metric
	Limiter RateLimit
}

RateLimitedHandlerOpts contains options that'll be used by NewRateLimitedHandler.

func NewRateLimitedHandlerOpts

func NewRateLimitedHandlerOpts(namespace, name string, limiter RateLimit) *RateLimitedHandlerOpts

NewRateLimitedHandlerOpts returns RateLimitedHandlerOpts with default values.

type RateLimitedStream

type RateLimitedStream struct {
	// contains filtered or unexported fields
}

RateLimitedStream is a Stream that runs the upstream in a rate-limited manner.

func NewRateLimitedStream

func NewRateLimitedStream(opts *RateLimitedStreamOpts, upstream Stream) *RateLimitedStream

NewRateLimitedStream creates a RateLimitedStream to guard the upstream.

Example
var msgID int64
var fakeStream SimpleStream = func(_ context.Context) (Message, error) {
	id := atomic.AddInt64(&msgID, 1)
	return SimpleMessage(strconv.FormatInt(id, 10)), nil
}

count := 5
interval := 100 * time.Millisecond
minimum := time.Duration(count-1) * interval

// limit the rate to access fakeStream
stream := NewRateLimitedStream(
	NewRateLimitedStreamOpts("", "test",
		NewTokenBucketRateLimit(interval, 1)),
	fakeStream)

begin := time.Now()
wg := &sync.WaitGroup{}
wg.Add(count)
for i := 0; i < count; i++ {
	go func() {
		defer wg.Done()
		stream.Get(context.Background())
	}()
}
wg.Wait()
fmt.Printf("Spend more than %s? %t\n", minimum,
	time.Now().After(begin.Add(minimum)))
Output:

Spend more than 400ms? true

func (*RateLimitedStream) Get

func (r *RateLimitedStream) Get(ctx context.Context) (Message, error)

Get blocks when requests come in too fast.

type RateLimitedStreamOpts

type RateLimitedStreamOpts struct {
	StreamOpts
	Metric  Metric
	Limiter RateLimit
}

RateLimitedStreamOpts contains options that'll be used by NewRateLimitedStream.

func NewRateLimitedStreamOpts

func NewRateLimitedStreamOpts(namespace, name string, limiter RateLimit) *RateLimitedStreamOpts

NewRateLimitedStreamOpts returns RateLimitedStreamOpts with default values.

type RetryHandler

type RetryHandler struct {
	// contains filtered or unexported fields
}

RetryHandler is a Handler that retries the up-handler with back-offs.

func NewRetryHandler

func NewRetryHandler(opts *RetryHandlerOpts, upHandler Handler) *RetryHandler

NewRetryHandler creates a RetryHandler to guard the up-handler.

Example (ConstantBackOff)
fakeErr := errors.New("fake err")
// fakeHandler keeps triggering back-offs.
var fakeHandler SimpleHandler = func(_ context.Context, _ Message) (Message, error) {
	return nil, fakeErr
}

// No more back-off when total execution + back-offs elapsed more than 1s.
backOffOpts := NewConstBackOffFactoryOpts(100*time.Millisecond, time.Second)
backOffFactory := NewConstBackOffFactory(backOffOpts)
opts := NewRetryHandlerOpts("", "test", backOffFactory)
// Retry with back-offs to access fakeHandler
handler := NewRetryHandler(opts, fakeHandler)
_, err := handler.Handle(context.Background(), SimpleMessage("abc"))
fmt.Println(err)
Output:

fake err

Example (ExpBackOff)
fakeErr := errors.New("fake err")
// fakeHandler keeps triggering back-offs.
var fakeHandler SimpleHandler = func(_ context.Context, _ Message) (Message, error) {
	return nil, fakeErr
}

// No more back-off when total execution + back-offs elapsed more than 2s.
backOffOpts := NewExpBackOffFactoryOpts(100*time.Millisecond, 2,
	time.Second, 2*time.Second)
backOffFactory := NewExpBackOffFactory(backOffOpts)
opts := NewRetryHandlerOpts("", "test", backOffFactory)
// Retry with back-offs to access fakeHandler
handler := NewRetryHandler(opts, fakeHandler)
_, err := handler.Handle(context.Background(), SimpleMessage("abc"))
fmt.Println(err)
Output:

fake err

func (*RetryHandler) Handle

func (r *RetryHandler) Handle(ctx context.Context, msg Message) (Message, error)

Handle retries with back-offs when up-handler returns an error.
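The retry behaviour can be pictured with the following sketch. It is not the package's implementation: it uses a fixed back-off and an overall time budget in place of a BackOffFactory, and handleFunc, retryConst and their parameters are hypothetical names introduced for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// handleFunc is a hypothetical stand-in for an up-handler working on strings.
type handleFunc func(ctx context.Context, msg string) (string, error)

// retryConst calls h until it succeeds, the context is done, or the next
// back-off would push the total elapsed time past maxElapsed. It returns the
// result, the number of retries performed and the last error.
func retryConst(ctx context.Context, h handleFunc, msg string, backOff, maxElapsed time.Duration) (string, int, error) {
	begin := time.Now()
	for retries := 0; ; retries++ {
		out, err := h(ctx, msg)
		if err == nil {
			return out, retries, nil
		}
		if time.Since(begin)+backOff > maxElapsed {
			return "", retries, err
		}
		select {
		case <-time.After(backOff):
		case <-ctx.Done():
			return "", retries, ctx.Err()
		}
	}
}

// demoRetry runs a handler that fails twice and then succeeds.
func demoRetry() (string, int, error) {
	calls := 0
	flaky := func(_ context.Context, msg string) (string, error) {
		calls++
		if calls < 3 {
			return "", errors.New("fake err")
		}
		return msg + "!", nil
	}
	return retryConst(context.Background(), flaky, "abc", 5*time.Millisecond, time.Second)
}

func main() {
	out, retries, err := demoRetry()
	fmt.Println(out, retries, err)
}
```

If the handler never succeeds, the sketch returns the last error it saw once the time budget is spent.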

type RetryHandlerOpts

type RetryHandlerOpts struct {
	HandlerOpts
	RetryMetric RetryMetric
	// TODO
	// remove the dependency on package clock for this exported symbol
	Clock clock.Clock
	// BackOffFactory for instantiating BackOff objects
	BackOffFactory BackOffFactory
}

RetryHandlerOpts contains options that'll be used by NewRetryHandler.

func NewRetryHandlerOpts

func NewRetryHandlerOpts(namespace, name string, backOffFactory BackOffFactory) *RetryHandlerOpts

NewRetryHandlerOpts returns RetryHandlerOpts with default values.

type RetryMetric

type RetryMetric interface {
	// Successful Stream.Get()/Handler.Handle() with timespan and the number of retries
	ObserveOk(timespan time.Duration, retry int)
	// Failed Stream.Get()/Handler.Handle() with timespan and the number of retries
	ObserveErr(timespan time.Duration, retry int)
}

RetryMetric is an interface for collecting metrics from RetryStream/RetryHandler.
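One possible use of the extra retry argument is to bucket outcomes by the number of retries they needed. The sketch below does that in memory. RetryMetric is copied locally, and retryHistogram is a hypothetical type rather than anything shipped with the package.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// RetryMetric is a local copy of the interface shown above.
type RetryMetric interface {
	ObserveOk(timespan time.Duration, retry int)
	ObserveErr(timespan time.Duration, retry int)
}

// retryHistogram is a hypothetical RetryMetric that counts outcomes per number
// of retries. The timespan is ignored to keep the sketch short.
type retryHistogram struct {
	mu   sync.Mutex
	ok   map[int]int
	errs map[int]int
}

var _ RetryMetric = (*retryHistogram)(nil)

func newRetryHistogram() *retryHistogram {
	return &retryHistogram{ok: map[int]int{}, errs: map[int]int{}}
}

// ObserveOk counts a success that needed the given number of retries.
func (h *retryHistogram) ObserveOk(_ time.Duration, retry int) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.ok[retry]++
}

// ObserveErr counts a failure that gave up after the given number of retries.
func (h *retryHistogram) ObserveErr(_ time.Duration, retry int) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.errs[retry]++
}

// demoHistogram returns: successes after 2 retries, failures after 5 retries,
// and successes after 1 retry (none were recorded).
func demoHistogram() (int, int, int) {
	h := newRetryHistogram()
	h.ObserveOk(10*time.Millisecond, 0)
	h.ObserveOk(250*time.Millisecond, 2)
	h.ObserveOk(300*time.Millisecond, 2)
	h.ObserveErr(time.Second, 5)
	return h.ok[2], h.errs[5], h.ok[1]
}

func main() {
	fmt.Println(demoHistogram())
}
```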

type RetryStream

type RetryStream struct {
	// contains filtered or unexported fields
}

RetryStream is a Stream that, when Get() encounters an error, backs off for some time and then retries.

func NewRetryStream

func NewRetryStream(opts *RetryStreamOpts, upstream Stream) *RetryStream

NewRetryStream creates a RetryStream to guard the upstream.

Example (ConstantBackOff)
fakeErr := errors.New("fake err")
// fakeStream keeps triggering back-offs.
var fakeStream SimpleStream = func(_ context.Context) (Message, error) {
	return nil, fakeErr
}

// No more back-off when total execution + back-offs elapsed more than 1s.
backOffOpts := NewConstBackOffFactoryOpts(100*time.Millisecond, time.Second)
backOffFactory := NewConstBackOffFactory(backOffOpts)
opts := NewRetryStreamOpts("", "test", backOffFactory)
// Retry with back-offs to access fakeStream
stream := NewRetryStream(opts, fakeStream)
_, err := stream.Get(context.Background())
fmt.Println(err)
Output:

fake err

Example (ExpBackOff)
fakeErr := errors.New("fake err")
// fakeStream keeps triggering back-offs.
var fakeStream SimpleStream = func(_ context.Context) (Message, error) {
	return nil, fakeErr
}

// No more back-off when total execution + back-offs elapsed more than 2s.
backOffOpts := NewExpBackOffFactoryOpts(100*time.Millisecond, 2,
	time.Second, 2*time.Second)
backOffFactory := NewExpBackOffFactory(backOffOpts)
opts := NewRetryStreamOpts("", "test", backOffFactory)
// Retry with back-offs to access fakeStream
stream := NewRetryStream(opts, fakeStream)
_, err := stream.Get(context.Background())
fmt.Println(err)
Output:

fake err

func (*RetryStream) Get

func (r *RetryStream) Get(ctx context.Context) (Message, error)

Get retries with back-offs when upstream returns an error.

type RetryStreamOpts

type RetryStreamOpts struct {
	StreamOpts
	RetryMetric RetryMetric
	// TODO
	// remove the dependency on package clock for this exported symbol
	Clock          clock.Clock
	BackOffFactory BackOffFactory
}

RetryStreamOpts contains options that'll be used by NewRetryStream.

func NewRetryStreamOpts

func NewRetryStreamOpts(namespace, name string, backOffFactory BackOffFactory) *RetryStreamOpts

NewRetryStreamOpts returns RetryStreamOpts with default values.

type SimpleHandler

type SimpleHandler func(context.Context, Message) (Message, error)

SimpleHandler turns a function into a Handler.

Example
var handler SimpleHandler = func(_ context.Context, msg Message) (Message, error) {
	return msg, nil
}

msg, _ := handler.Handle(context.Background(), SimpleMessage("abc"))
fmt.Println("msg:", msg.ID())
Output:

msg: abc

func (SimpleHandler) Handle

func (r SimpleHandler) Handle(ctx context.Context, msg Message) (Message, error)

Handle handles the incoming message.
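The conversion works in the same way as http.HandlerFunc in the standard library: the function type carries a method that calls the function itself. The sketch below illustrates the pattern with local stand-in types. funcHandler, textMessage and process are hypothetical names, and the method body is an assumption about how such an adapter is usually written, not a copy of the package source.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Message and Handler are local stand-ins for the package's interfaces.
type Message interface{ ID() string }

type Handler interface {
	Handle(ctx context.Context, msg Message) (Message, error)
}

// funcHandler mirrors the shape of SimpleHandler: a named function type.
type funcHandler func(context.Context, Message) (Message, error)

// Handle makes funcHandler satisfy Handler by calling the function itself.
func (f funcHandler) Handle(ctx context.Context, msg Message) (Message, error) {
	return f(ctx, msg)
}

type textMessage string

func (m textMessage) ID() string { return string(m) }

// process accepts any Handler, including a converted plain function.
func process(h Handler, id string) string {
	out, err := h.Handle(context.Background(), textMessage(id))
	if err != nil {
		return ""
	}
	return out.ID()
}

// demoAdapter converts a function literal into a Handler and runs it.
func demoAdapter() string {
	upper := funcHandler(func(_ context.Context, msg Message) (Message, error) {
		return textMessage(strings.ToUpper(msg.ID())), nil
	})
	return process(upper, "abc")
}

func main() {
	fmt.Println(demoAdapter())
}
```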

type SimpleMessage

type SimpleMessage string

SimpleMessage essentially wraps a string to be a Message.

func (SimpleMessage) ID

func (m SimpleMessage) ID() string

ID returns the identifier of the SimpleMessage.

type SimpleStream

type SimpleStream func(context.Context) (Message, error)

SimpleStream turns a function into a Stream.

Example
msgID := 0
var stream SimpleStream = func(_ context.Context) (Message, error) {
	msgID++
	return SimpleMessage(strconv.Itoa(msgID)), nil
}

for i := 0; i < 5; i++ {
	msg, _ := stream.Get(context.Background())
	fmt.Println("msg:", msg.ID())
}
Output:

msg: 1
msg: 2
msg: 3
msg: 4
msg: 5

func (SimpleStream) Get

func (r SimpleStream) Get(ctx context.Context) (Message, error)

Get emits a message.

type Stream

type Stream interface {
	// Get() returns either a Message or an error exclusively.
	Get(context.Context) (Message, error)
}

Stream emits Messages.

func AppendFallbacksStream

func AppendFallbacksStream(stream Stream, fallbacks ...StreamFallback) Stream

AppendFallbacksStream appends multiple StreamFallbacks to a Stream. StreamFallbacks run in sequence. The first fallback accepts the error returned by the Stream, and every subsequent fallback accepts the error returned by the previous fallback.

Example
var upstream SimpleStream = func(_ context.Context) (Message, error) {
	fmt.Println("upstream fails to return a msg")
	return nil, errors.New("stream error")
}

var fallbackA StreamFallback = func(_ context.Context, err error) (Message, error) {
	fmt.Println("fallbackA fails to correct the error:", err)
	return nil, errors.New("fallbackA error")
}

var fallbackB StreamFallback = func(_ context.Context, err error) (Message, error) {
	fmt.Println("fallbackB successfully deals with the error:", err)
	return SimpleMessage("abc"), nil
}

stream := AppendFallbacksStream(upstream, fallbackA, fallbackB)
msg, _ := stream.Get(context.Background())
fmt.Println("msg finally returned:", msg.ID())
Output:

upstream fails to return a msg
fallbackA fails to correct the error: stream error
fallbackB successfully deals with the error: fallbackA error
msg finally returned: abc

func AppendHandlersStream

func AppendHandlersStream(stream Stream, handlers ...Handler) Stream

AppendHandlersStream appends multiple Handlers to a Stream. Handlers run in sequence. The first handler accepts the Message returned by the stream, and every subsequent handler accepts the Message returned by the previous handler.

Example
var upstream SimpleStream = func(_ context.Context) (Message, error) {
	msg := SimpleMessage("abc")
	fmt.Println("stream returns msg:", msg.ID())
	return msg, nil
}

var repeatHandler SimpleHandler = func(_ context.Context, msg Message) (Message, error) {
	msgOut := SimpleMessage(msg.ID() + "+")
	fmt.Printf("handle msg: %s, returns msg: %s\n", msg.ID(), msgOut.ID())
	return msgOut, nil
}

stream := AppendHandlersStream(upstream, repeatHandler, repeatHandler)
msg, _ := stream.Get(context.Background())
fmt.Println("msg finally returned:", msg.ID())
Output:

stream returns msg: abc
handle msg: abc, returns msg: abc+
handle msg: abc+, returns msg: abc++
msg finally returned: abc++

type StreamFallback

type StreamFallback func(context.Context, error) (Message, error)

StreamFallback is a function that takes the error returned by Stream.Get() and then returns either a Message or an error.
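A common use is to substitute a default Message when the upstream fails. The sketch below builds such a fallback with local stand-in types. withDefault and textMessage are hypothetical names, and the choice to let context cancellation pass through is a design assumption rather than documented package behaviour.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Message and StreamFallback are local stand-ins with the documented shapes.
type Message interface{ ID() string }

type StreamFallback func(context.Context, error) (Message, error)

type textMessage string

func (m textMessage) ID() string { return string(m) }

// withDefault returns a StreamFallback that replaces any error with def,
// except context cancellation and deadline errors, which are passed through.
func withDefault(def Message) StreamFallback {
	return func(_ context.Context, err error) (Message, error) {
		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
			return nil, err
		}
		return def, nil
	}
}

// demoDefault reports the substituted ID and whether cancellation is preserved.
func demoDefault() (string, bool) {
	ctx := context.Background()
	fallback := withDefault(textMessage("default"))
	msg, _ := fallback(ctx, errors.New("stream error"))
	_, err := fallback(ctx, context.Canceled)
	return msg.ID(), errors.Is(err, context.Canceled)
}

func main() {
	fmt.Println(demoDefault())
}
```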

type StreamOpts

type StreamOpts struct {
	Namespace  string
	Name       string
	Log        Logger
	Tracer     opentracing.Tracer
	TracingRef TracingRef
}

StreamOpts holds the options that every XxxStreamOpts must have.

type TokenBucketRateLimit

type TokenBucketRateLimit struct {
	// contains filtered or unexported fields
}

func NewTokenBucketRateLimit

func NewTokenBucketRateLimit(requestsInterval time.Duration,
	maxRequestBurst int) *TokenBucketRateLimit

NewTokenBucketRateLimit creates a TokenBucketRateLimit. $requestsInterval is the minimum interval between two consecutive requests, given as a time.Duration. When a burst of requests arrives after no requests have been seen for N * $requestsInterval, N requests are allowed at once. N is capped by $maxRequestBurst; if $maxRequestBurst == 1, no burst is allowed.
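To make the burst rule concrete, here is a small worked sketch of the arithmetic only. allowedBurst is a hypothetical helper, not part of the package, and it assumes that no tokens were left over before the idle period began.

```go
package main

import (
	"fmt"
	"time"
)

// allowedBurst returns N, the number of requests that may pass at once after
// the limiter has been idle for the given duration. N grows by one per
// requestsInterval of idleness and is capped by maxRequestBurst.
func allowedBurst(idle, requestsInterval time.Duration, maxRequestBurst int) int {
	if requestsInterval <= 0 {
		return 0 // guard against division by zero in this sketch
	}
	n := int(idle / requestsInterval)
	if n > maxRequestBurst {
		n = maxRequestBurst
	}
	return n
}

// demoBurst evaluates three cases with a 100ms interval.
func demoBurst() (int, int, int) {
	interval := 100 * time.Millisecond
	a := allowedBurst(350*time.Millisecond, interval, 5) // 3 whole intervals elapsed
	b := allowedBurst(2*time.Second, interval, 5)        // 20 intervals, capped at 5
	c := allowedBurst(2*time.Second, interval, 1)        // cap of 1: no burst
	return a, b, c
}

func main() {
	fmt.Println(demoBurst())
}
```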

func (*TokenBucketRateLimit) Wait

func (rl *TokenBucketRateLimit) Wait(count int, timeout time.Duration) bool

type TracingRef

type TracingRef int

TracingRef is the opentracing causal reference type, currently either ChildOf or FollowsFrom.

const (
	// TracingDisabled indicates that tracing is not enabled regardless of
	// whether a span exists in the given context.
	TracingDisabled TracingRef = iota
	// TracingChildOf represents tracing causal reference ChildOf.
	TracingChildOf
	// TracingFollowsFrom represents tracing causal reference FollowsFrom.
	TracingFollowsFrom
)
