Batchy

A nice little package with no dependencies for fan-in batching of highly concurrent workloads

The throughput of APIs, web services, and background workers can sometimes be improved by orders of magnitude by introducing a small amount of artificial latency to enable concurrent batching. These efficiency gains can increase service stability and total system throughput while lowering infrastructure costs.

This is a general purpose concurrent batching library. It can be used to batch SQL inserts, API calls, disk writes, queue messages, stream records, emails, etc. The batcher hides asynchronous processing behind a synchronous interface.

Examples

The _examples directory contains a few examples illustrating performance improvements for different use cases.

Structure

[architecture diagram]

package batchy

type Processor func(items []interface{}) []error

func New(itemLimit int, waitTime time.Duration, processor Processor) *batcher

type Batcher interface {
    Add(interface{}) error
    Stop()
}

Usage

// 100 max batch size
// 100 milliseconds max batch wait time
var batcher = batchy.New(100, 100*time.Millisecond, func(items []interface{}) (errs []error) {
	q := fmt.Sprintf(`INSERT INTO table1 (data) VALUES %s`,
		strings.Trim(strings.Repeat(`(?),`, len(items)), ","))
	_, err := db.Exec(q, items...)
	if err != nil {
		errs = make([]error, len(items))
		for i := range errs {
			errs[i] = err
		}
	}
	return
})
// A call to Add blocks the calling goroutine for up to 100ms + processing time.
// If batch is filled before wait time expires, blocking will be reduced.
// Wait time begins when the first item is added to a batch.
err := batcher.Add("data")
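
Because the batcher is built for fan-in from highly concurrent workloads, many goroutines can share a single batcher and call Add at the same time; each call returns the error for that caller's own item. A minimal sketch using the batcher above (imports omitted as above; the loop bound and logging are illustrative):

var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
	wg.Add(1)
	go func(n int) {
		defer wg.Done()
		// Each goroutine blocks until its own item's batch has been processed.
		if err := batcher.Add(fmt.Sprintf("data-%d", n)); err != nil {
			log.Println("insert failed:", err)
		}
	}(i)
}
wg.Wait()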

Design

This package makes use of Go's empty interface, interface{}. For this reason, it is best not to export any Batcher directly from your package. Instead, the batcher should be hidden behind an existing synchronous interface.

Suppose you have the following code that writes bytes to a file:

package repo

import (
	"os"
)

type DataWriter interface {
	Write(data []byte) error
}

type dataWriter struct{}

func (r *dataWriter) Write(data []byte) error {
	return os.WriteFile("test1", data, 0644)
}

func NewDataWriter() *dataWriter {
	return &dataWriter{}
}

You could create a batched version that satisfies the same interface:

package repo

import (
	"io/ioutil"
	"time"

	"github.com/kevburnsjr/batchy"
)

type dataWriterBatched struct {
	batcher batchy.Batcher
}

func (r *dataWriterBatched) Write(data []byte) error {
	return r.batcher.Add(data)
}

func NewDataWriterBatched(maxItems int, maxWait time.Duration) *dataWriterBatched {
	return &dataWriterBatched{batchy.New(maxItems, maxWait, func(items []interface{}) []error {
		errs := make([]error, len(items))
		var data []byte
		for _, d := range items {
			data = append(data, d.([]byte)...)
		}
		err := os.WriteFile("test2", data, 0644)
		if err != nil {
			for i := range errs {
				errs[i] = err
			}
		}
		return errs
	})}
}
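
Since the batched writer owns a Batcher, it is worth giving callers a way to stop it during shutdown. A minimal sketch adding a Close method (the method name is an illustrative choice, not part of the package):

// Close stops the underlying batcher. Once stopped, subsequent calls
// to Write return ErrBatcherStopped.
func (r *dataWriterBatched) Close() {
	r.batcher.Stop()
}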

Now during dependency injection, just replace

dw := repo.NewDataWriter()
dw.Write([]byte("data"))

with

dw := repo.NewDataWriterBatched(100, 100*time.Millisecond)
dw.Write([]byte("data"))

and your code shouldn't need to know the difference because you've used interfaces to effectively hide the implementation details (in this case, the use of batching).

Why

I created this repository because:

  1. I frequently see gophers get concurrent batching wrong.
  2. I frequently see gophers avoid batching altogether because concurrency is hard.
  3. I frequently need this sort of batching and I'd rather not solve the same problem differently for every project.

Benchmarks

$ go test ./... -bench=. -benchmem

BenchmarkBatcher/itemLimit_10-12          1000000   1042 ns/op   207 B/op   3 allocs/op
BenchmarkBatcher/itemLimit_20-12          2104995    582 ns/op   110 B/op   2 allocs/op
BenchmarkBatcher/itemLimit_100-12         2558310    479 ns/op    80 B/op   1 allocs/op
BenchmarkBatcher/itemLimit_1000-12        2860184    425 ns/op    66 B/op   1 allocs/op
BenchmarkBatcher100ms/itemLimit_10-12     1002462   1188 ns/op   182 B/op   2 allocs/op
BenchmarkBatcher100ms/itemLimit_20-12     1490853    865 ns/op   110 B/op   1 allocs/op
BenchmarkBatcher100ms/itemLimit_100-12    2189893    592 ns/op    65 B/op   1 allocs/op
BenchmarkBatcher100ms/itemLimit_1000-12   2211993    499 ns/op    51 B/op   1 allocs/op
PASS
ok      github.com/kevburnsjr/batchy    21.737s

Case Study

Benefits of artificial latency
http://kevburnsjr.com/roles/crunchyroll#efficiency

Documentation

Index

Constants

const ErrBatcherStopped = err("Batcher Stopped")

ErrBatcherStopped indicates that the batcher is not accepting new items because it has been stopped.
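
For example, a caller can check for this sentinel once shutdown has begun; a brief sketch (imports omitted; the handling policy is illustrative):

batcher.Stop()
// After Stop, Add rejects new items with ErrBatcherStopped.
if err := batcher.Add("late item"); errors.Is(err, batchy.ErrBatcherStopped) {
	log.Println("batcher stopped; dropping item")
}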

Variables

This section is empty.

Functions

func New

func New(itemLimit int, waitTime time.Duration, processor Processor) *batcher

New returns a new batcher.

  - itemLimit indicates the maximum number of items per batch
  - waitTime indicates the amount of time to wait before processing a non-full batch
  - processor is the processing function to call for the batch

Types

type Batcher

type Batcher interface {
	// Add adds an item to the current batch
	Add(interface{}) error

	// Stop stops the batcher
	Stop()
}

Batcher adds items to batches, returning the corresponding error for each item.

type Processor

type Processor func(items []interface{}) []error

Processor is a function that accepts a batch of items and returns a corresponding slice of errors, one per item.
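
Because the returned errors correspond index-for-index with the input items, a processor can fail individual items while leaving the rest nil. A minimal sketch, assuming string items (the validation rule is illustrative):

var processor batchy.Processor = func(items []interface{}) []error {
	errs := make([]error, len(items))
	for i, item := range items {
		// A nil entry means the item at this index succeeded.
		if s, ok := item.(string); !ok || s == "" {
			errs[i] = fmt.Errorf("invalid item at index %d", i)
		}
	}
	return errs
}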

Directories

Path Synopsis
_examples
    db
