speculatively

package module
v1.2.0
Published: Jan 15, 2024 License: MIT Imports: 2 Imported by: 0

README

speculatively


Package speculatively provides a simple mechanism to speculatively execute a task in parallel only after some initial timeout has elapsed:

// An example task that will wait for a random amount of time before returning
task := func(ctx context.Context) (string, error) {
    delay := time.Duration(float64(250*time.Millisecond) * rand.Float64())
    select {
    case <-time.After(delay):
        return "success", nil
    case <-ctx.Done():
        return "timeout", ctx.Err()
    }
}

ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()

// If task doesn't return within 20ms, it will be executed again in parallel
result, err := speculatively.Do(ctx, 20*time.Millisecond, task)

This was inspired by the "Defeat your 99th percentile with speculative task" blog post, which describes the idea nicely:

The inspiration came from the BigData world. In Spark, when a task runs suspiciously long, the application master speculatively starts the same task on a different executor, but lets the long-running task continue. The solution looked elegant:

  • Service response time limit is 50ms.

  • If the first attempt doesn’t finish within 25ms, start a new one, but keep the first thread running.

  • Wait for either thread to finish and take the result from the first one ready.

The speculative tasks implemented here are similar to "hedged requests" as described in The Tail at Scale and implemented in the Query example function in Go Concurrency Patterns: Timing out, moving on, but they a) have no knowledge of different replicas and b) wait for a caller-controlled timeout before launching additional tasks.

Documentation

Overview

Package speculatively provides a simple mechanism to speculatively execute a task in parallel only after some initial timeout has elapsed.

Example

Example demonstrates use of speculatively.Do to execute an expensive task one or more times in parallel.

Every time the task is executed, it will wait half as long before returning.

Given an operation timeout of 100ms and an initial task latency of 256ms, the first execution of our expensive task will always time out. Every 20ms, the task will be executed again, until it returns successfully or the timeout expires.

The overall timings of each execution look like this:

call 0:    0s wait + 256ms latency = 256ms overall
call 1:  20ms wait + 128ms latency = 148ms overall
call 2:  40ms wait +  64ms latency = 104ms overall
call 3:  60ms wait +  32ms latency =  92ms overall <-- winner
call 4:  80ms wait +  16ms latency =  96ms overall
package main

import (
	"context"
	"fmt"
	"math"
	"sync"
	"time"

	"github.com/mccutchen/speculatively"
)

// ExpensiveTask is an example of a task that takes some time to execute
type ExpensiveTask struct {
	initialLatency time.Duration
	patience       time.Duration
	callCount      int
	mu             sync.Mutex
}

// Execute will wait for some time before returning its own call count.  Every
// call will decrease the amount of time it waits by half.
func (t *ExpensiveTask) Execute(ctx context.Context) (int, error) {
	t.mu.Lock()
	initialLatency := t.initialLatency
	patience := t.patience
	call := t.callCount
	t.callCount++
	t.mu.Unlock()

	latency := initialLatency / time.Duration(int64(math.Exp2(float64(call))))
	wait := patience * time.Duration(call)

	fmt.Printf("call %d: %5s wait + %5s latency = %5s overall\n", call, wait, latency, wait+latency)

	select {
	case <-time.After(latency):
		return call, nil
	case <-ctx.Done():
		return 0, ctx.Err()
	}
}

// Example demonstrates use of speculatively.Do to execute an expensive task
// one or more times in parallel.
//
// Every time the task is executed, it will wait half as long before returning.
//
// Given an operation timeout of 100ms and an initial task latency of 256ms,
// the first execution of our expensive task will always time out.  Every 20ms,
// the task will be executed again, until it returns successfully or the
// timeout expires.
//
// The overall timings of each execution look like this:
//
//	call 0:    0s wait + 256ms latency = 256ms overall
//	call 1:  20ms wait + 128ms latency = 148ms overall
//	call 2:  40ms wait +  64ms latency = 104ms overall
//	call 3:  60ms wait +  32ms latency =  92ms overall <-- winner
//	call 4:  80ms wait +  16ms latency =  96ms overall
func main() {
	var (
		timeout        = 100 * time.Millisecond
		initialLatency = 256 * time.Millisecond
		patience       = 20 * time.Millisecond
	)

	expensiveTask := &ExpensiveTask{
		initialLatency: initialLatency,
		patience:       patience,
	}

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	successfulCall, err := speculatively.Do(ctx, patience, func(ctx context.Context) (int, error) {
		return expensiveTask.Execute(ctx)
	})
	if err != nil {
		fmt.Printf("unexpected error: %s\n", err)
		return
	}
	fmt.Printf("succeeded on call number %d\n", successfulCall)
}
Output:

call 0:    0s wait + 256ms latency = 256ms overall
call 1:  20ms wait + 128ms latency = 148ms overall
call 2:  40ms wait +  64ms latency = 104ms overall
call 3:  60ms wait +  32ms latency =  92ms overall
call 4:  80ms wait +  16ms latency =  96ms overall
succeeded on call number 3


Functions

func Do

func Do[T any](ctx context.Context, patience time.Duration, thunk Thunk[T]) (T, error)

Do speculatively executes a Thunk one or more times in parallel, waiting for the given patience duration between subsequent executions.

Note that for Do to respect context cancelations, the given Thunk must respect them.

Types

type Thunk

type Thunk[T any] func(context.Context) (T, error)

Thunk is a computation to be speculatively executed.
