microbatch

Version: v0.4.1
Published: Mar 7, 2024 License: Apache-2.0 Imports: 7 Imported by: 1

README

Fillmore Labs Micro Batcher

Micro-batching is a technique often used in stream processing to achieve near real-time computation with less overhead than processing each record individually. It balances latency against throughput and simplifies parallelization while making better use of resources.

See also the definition in the Hazelcast Glossary and explanation by Jakob Jenkov. Popular examples are Spark Structured Streaming and Apache Kafka. It is also used in other contexts, like the Facebook DataLoader.
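To make the idea concrete, here is a minimal sketch of size-or-timeout batching in plain Go. It does not use this package; the `collect` function and its parameters are hypothetical names chosen for illustration. A batch is emitted when it is full or when the timeout has elapsed since its first item arrived.

```go
package main

import (
	"fmt"
	"time"
)

// collect reads from in and groups items into batches. A batch is emitted
// when it holds size items or when timeout has elapsed since its first item
// arrived, whichever comes first. Remaining items are flushed when in is
// closed.
func collect(in <-chan int, size int, timeout time.Duration) [][]int {
	var (
		batches [][]int
		batch   []int
		timer   <-chan time.Time // nil (never ready) while the batch is empty
	)

	flush := func() {
		if len(batch) > 0 {
			batches = append(batches, batch)
			batch = nil
		}
		timer = nil
	}

	for {
		select {
		case item, ok := <-in:
			if !ok {
				flush()
				return batches
			}
			if len(batch) == 0 {
				timer = time.After(timeout) // start timing at the first item
			}
			batch = append(batch, item)
			if len(batch) == size {
				flush()
			}
		case <-timer:
			flush()
		}
	}
}

func main() {
	in := make(chan int, 5)
	for i := 1; i <= 5; i++ {
		in <- i
	}
	close(in)

	// The long timeout keeps this demo deterministic: only the size limit
	// and the closed channel trigger a flush.
	fmt.Println(collect(in, 3, time.Minute)) // prints [[1 2 3] [4 5]]
}
```

A shorter timeout lowers the latency of partially filled batches at the cost of smaller, more frequent batches.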

Usage

Try the example at the Go Playground.

Implement Job and JobResult
type (
	Job struct {
		ID      string `json:"id"`
		Request string `json:"body"`
	}

	JobResult struct {
		ID       string `json:"id"`
		Response string `json:"body"`
		Error    string `json:"error"`
	}

	Jobs       []*Job
	JobResults []*JobResult

	RemoteError struct{ msg string }
)

func (q *Job) JobID() string       { return q.ID }
func (r *JobResult) JobID() string { return r.ID }

func (e RemoteError) Error() string { return e.msg }

// unwrap unwraps a JobResult to payload and error.
func unwrap(r *JobResult, err error) (string, error) {
	if err != nil {
		return "", err
	}

	if r.Error != "" {
		return "", RemoteError{r.Error}
	}

	return r.Response, nil
}
Implement the Batch Processor
func processJobs(jobs Jobs) (JobResults, error) {
	results := make(JobResults, 0, len(jobs))
	for _, job := range jobs {
		result := &JobResult{
			ID:       job.ID,
			Response: "Processed job " + job.ID,
		}
		results = append(results, result)
	}

	return results, nil
}
Use the Batcher
	const (
		batchSize        = 3
		maxBatchDuration = 10 * time.Millisecond
		iterations       = 5
	)

	// Initialize
	batcher := microbatch.NewBatcher(
		processJobs,
		(*Job).JobID,
		(*JobResult).JobID,
		microbatch.WithSize(batchSize),
		microbatch.WithTimeout(maxBatchDuration),
	)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var wg sync.WaitGroup
	for i := 1; i <= iterations; i++ {
		future := batcher.Submit(&Job{ID: strconv.Itoa(i)})

		wg.Add(1)
		go func(i int) {
			defer wg.Done()

			result, err := unwrap(future.Await(ctx))
			if err == nil {
				fmt.Println(result)
			} else {
				fmt.Printf("Error executing job %d: %v\n", i, err)
			}
		}(i)
	}

	// Shut down
	batcher.Send()
	wg.Wait()

Design

The package is designed to handle request batching efficiently, with a focus on code testability and modular architecture. The codebase is organized around two components: the public microbatch.Batcher type and an internal helper, processor.Processor.

Motivation

The primary design goal is to enhance code testability, enabling unit testing of individual components in isolation, with less emphasis on immediate performance gains.

While an alternative approach might involve constructing the correlation map during batch collection for performance reasons, the current design prioritizes testability and separation of concerns. In this context, the batcher remains independent of correlation IDs, focusing solely on batch size and timing decisions. The responsibility of correlating requests and responses is encapsulated within the processor, contributing to a cleaner and more modular architecture.

Component Description

The modular structure and careful handling of concurrency are intended to give good testability while maintaining high performance and leaving room for future optimizations. The deliberate use of channels and immutability makes execution more straightforward and reliable.

Public Interface (microbatch.Batcher)

The public interface is the entry point for users of the batching functionality. It is thread-safe, so its methods can be called from any goroutine. The batcher maintains a queue of submitted requests and, when a batch is complete or the maximum collection time is reached, spawns a processor. The processor takes ownership of the queued requests and correlates individual requests and responses.
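The hand-off described here can be sketched as follows. This is not the package's implementation: the `queue` type, its fields, and its methods are hypothetical, and the sketch tracks only the batch size, not the collection time. It shows how a mutex makes submission safe from any goroutine and how a full batch is handed over to a separate goroutine that owns it from then on.

```go
package main

import (
	"fmt"
	"sync"
)

// queue is a hypothetical, simplified batcher that only tracks batch size.
type queue struct {
	mu      sync.Mutex
	wg      sync.WaitGroup
	size    int
	pending []int
	process func(batch []int)
}

// submit may be called from any goroutine.
func (q *queue) submit(item int) {
	q.mu.Lock()
	defer q.mu.Unlock()

	q.pending = append(q.pending, item)
	if len(q.pending) >= q.size {
		q.spawn()
	}
}

// send flushes an incomplete batch early.
func (q *queue) send() {
	q.mu.Lock()
	defer q.mu.Unlock()

	q.spawn()
}

// spawn hands the pending items to a new goroutine. It must be called with
// q.mu held. After the hand-off the queue no longer references the batch.
func (q *queue) spawn() {
	if len(q.pending) == 0 {
		return
	}
	batch := q.pending
	q.pending = nil

	q.wg.Add(1)
	go func() {
		defer q.wg.Done()
		q.process(batch)
	}()
}

func main() {
	var (
		mu    sync.Mutex
		sizes []int
	)
	q := &queue{size: 3, process: func(batch []int) {
		mu.Lock()
		defer mu.Unlock()
		sizes = append(sizes, len(batch))
	}}

	var submitters sync.WaitGroup
	for i := 1; i <= 5; i++ {
		submitters.Add(1)
		go func(i int) {
			defer submitters.Done()
			q.submit(i)
		}(i)
	}
	submitters.Wait()

	q.send()    // flush the incomplete batch
	q.wg.Wait() // wait for all processors to finish

	fmt.Println(len(sizes), sizes[0]+sizes[1]) // prints 2 5
}
```

Because each processor goroutine receives its own slice, several batches can be processed in parallel without further locking.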

Processor (processor.Processor)

The processor wraps the user-supplied processor, handling the correlation of requests and responses. Once constructed, the fields are accessed read-only, ensuring immutability. This enables multiple processors to operate in parallel without conflicts. By encapsulating the responsibility of correlation, the processor contributes to a modular and clean architecture, promoting separation of concerns.
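As an illustration of what correlation means here, the following sketch matches results to jobs by ID using a map. The types, the `correlate` function, and `errNoResult` are local stand-ins invented for this example and are not the internal processor's actual code.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoResult is a local stand-in for the package's ErrNoResult.
var errNoResult = errors.New("no result")

type (
	job    struct{ ID int }
	result struct {
		ID   int
		Body string
	}
)

// correlate matches each job with the result carrying the same ID. Jobs
// without a matching result are reported with errNoResult.
func correlate(jobs []job, results []result) (map[int]string, map[int]error) {
	byID := make(map[int]result, len(results))
	for _, r := range results {
		byID[r.ID] = r
	}

	bodies := make(map[int]string)
	errs := make(map[int]error)
	for _, j := range jobs {
		if r, ok := byID[j.ID]; ok {
			bodies[j.ID] = r.Body
		} else {
			errs[j.ID] = errNoResult
		}
	}

	return bodies, errs
}

func main() {
	jobs := []job{{ID: 1}, {ID: 2}, {ID: 3}}
	// Results arrive out of order and the one for job 2 is missing.
	results := []result{{ID: 3, Body: "c"}, {ID: 1, Body: "a"}}

	bodies, errs := correlate(jobs, results)
	fmt.Println(bodies[1], bodies[3], errs[2]) // prints a c no result
}
```

Keying by ID means the batch processor is free to return results in any order.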

Documentation

Overview

Package microbatch simplifies asynchronous microbatching.

Example (Asynchronous)

Example (Asynchronous) demonstrates how to use [Batcher.Submit] and await the results asynchronously. Note that you can shut down the batcher without waiting for the jobs to finish.

package main

import (
	"context"
	"fmt"
	"sync"

	"fillmore-labs.com/microbatch"
)

type (
	JobID int

	Job struct {
		ID JobID
	}

	JobResult struct {
		ID   JobID
		Body string
	}

	Jobs       []*Job
	JobResults []*JobResult
)

func (j *Job) JobID() JobID       { return j.ID }
func (j *JobResult) JobID() JobID { return j.ID }

// unwrap unwraps a JobResult to payload and error.
func unwrap(r *JobResult, err error) (string, error) {
	if err != nil {
		return "", err
	}

	return r.Body, nil
}

type RemoteProcessor struct{}

func (p *RemoteProcessor) ProcessJobs(jobs Jobs) (JobResults, error) {
	results := make(JobResults, 0, len(jobs))
	for _, job := range jobs {
		result := &JobResult{
			ID:   job.ID,
			Body: fmt.Sprintf("Processed job %d", job.ID),
		}
		results = append(results, result)
	}

	return results, nil
}

func main() {
	// Initialize
	processor := &RemoteProcessor{}
	batcher := microbatch.NewBatcher(
		processor.ProcessJobs,
		(*Job).JobID,
		(*JobResult).JobID,
		microbatch.WithSize(3),
	)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	const iterations = 5

	var wg sync.WaitGroup
	for i := 1; i <= iterations; i++ {
		future := batcher.Submit(&Job{ID: JobID(i)})

		wg.Add(1)
		go func(i int) {
			defer wg.Done()

			result, err := unwrap(future.Await(ctx))
			if err == nil {
				fmt.Println(result)
			} else {
				fmt.Printf("Error executing job %d: %v\n", i, err)
			}
		}(i)
	}

	// Shut down
	batcher.Send()
	wg.Wait()
}
Output:

Processed job 1
Processed job 2
Processed job 3
Processed job 4
Processed job 5
Example (Blocking)

Example (Blocking) demonstrates how to use [Batcher.Execute] to submit a job and wait for its result in a single line.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"fillmore-labs.com/microbatch"
)

type (
	JobID int

	Job struct {
		ID JobID
	}

	JobResult struct {
		ID   JobID
		Body string
	}

	Jobs       []*Job
	JobResults []*JobResult
)

func (j *Job) JobID() JobID       { return j.ID }
func (j *JobResult) JobID() JobID { return j.ID }

// unwrap unwraps a JobResult to payload and error.
func unwrap(r *JobResult, err error) (string, error) {
	if err != nil {
		return "", err
	}

	return r.Body, nil
}

type RemoteProcessor struct{}

func (p *RemoteProcessor) ProcessJobs(jobs Jobs) (JobResults, error) {
	results := make(JobResults, 0, len(jobs))
	for _, job := range jobs {
		result := &JobResult{
			ID:   job.ID,
			Body: fmt.Sprintf("Processed job %d", job.ID),
		}
		results = append(results, result)
	}

	return results, nil
}

func main() {
	// Initialize
	processor := &RemoteProcessor{}
	batcher := microbatch.NewBatcher(
		processor.ProcessJobs,
		(*Job).JobID,
		(*JobResult).JobID,
		microbatch.WithSize(3),
		microbatch.WithTimeout(10*time.Millisecond),
	)

	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	const iterations = 5
	var wg sync.WaitGroup

	// Submit jobs
	for i := 1; i <= iterations; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if result, err := unwrap(batcher.Execute(ctx, &Job{ID: JobID(i)})); err == nil {
				fmt.Println(result)
			}
		}(i) // https://go.dev/doc/faq#closures_and_goroutines
	}

	// Shut down
	wg.Wait()
	batcher.Send()
}
Output:

Processed job 1
Processed job 2
Processed job 3
Processed job 4
Processed job 5

Constants

This section is empty.

Variables

var (
	// ErrNoResult is returned when the response from processJobs is missing a
	// matching correlation ID.
	ErrNoResult = errors.New("no result")
	// ErrDuplicateID is returned when a job has an already existing correlation ID.
	ErrDuplicateID = errors.New("duplicate correlation ID")
)

Functions

This section is empty.

Types

type Batcher

type Batcher[Q, R any] struct {
	// contains filtered or unexported fields
}

Batcher handles submitting requests in batches and returning results through channels.

func NewBatcher

func NewBatcher[Q, R any, C comparable, QQ ~[]Q, RR ~[]R](
	processJobs func(jobs QQ) (RR, error),
	correlateRequest func(request Q) C,
	correlateResult func(result R) C,
	opts ...Option,
) *Batcher[Q, R]

NewBatcher creates a new Batcher.

  • processJobs is used to process batches of jobs.
  • correlateRequest and correlateResult functions are used to get a common key from a job and result for correlating results back to jobs.
  • opts are used to configure the batch size and timeout.

The batch collector is run in a goroutine which must be terminated with [Batcher.Shutdown].
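The `QQ ~[]Q` and `RR ~[]R` constraints in the signature are what allow a processor declared with named slice types, such as `Jobs` and `JobResults` in the examples, to be passed without conversion. The following self-contained sketch shows the same constraint on a hypothetical `run` function; it is an illustration of the language feature, not code from the package.

```go
package main

import "fmt"

type (
	Job  struct{ ID int }
	Jobs []*Job // named slice type, as in the examples
)

// run accepts a function over any slice type whose underlying type is []Q,
// so a processor taking Jobs can be passed directly.
func run[Q any, QQ ~[]Q](process func(QQ) int, items ...Q) int {
	// []Q converts to QQ because every type in QQ's type set has the
	// underlying type []Q.
	return process(QQ(items))
}

// countJobs takes the named type Jobs, not []*Job.
func countJobs(jobs Jobs) int { return len(jobs) }

func main() {
	// Q is inferred as *Job and QQ as Jobs.
	n := run(countJobs, &Job{ID: 1}, &Job{ID: 2})
	fmt.Println(n) // prints 2
}
```

Without the `~`, the function would only accept a processor declared over the unnamed type `[]*Job`.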

func (*Batcher[Q, R]) Execute added in v0.4.0

func (b *Batcher[Q, R]) Execute(ctx context.Context, request Q) (R, error)

Execute submits a job and waits for the result.

func (*Batcher[_, _]) Send added in v0.4.0

func (b *Batcher[_, _]) Send()

Send sends a batch early.

func (*Batcher[Q, R]) Submit added in v0.4.0

func (b *Batcher[Q, R]) Submit(request Q) promise.Future[R]

Submit submits a job without waiting for the result.
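To illustrate the general idea of a future that can be awaited with a context, here is a minimal channel-based sketch. The `future`, `outcome`, `submit`, and `demo` names are hypothetical and do not reflect how promise.Future is implemented; in this simplified version a future can be awaited only once.

```go
package main

import (
	"context"
	"fmt"
)

// outcome carries either a value or an error.
type outcome[R any] struct {
	value R
	err   error
}

// future is a hypothetical, minimal stand-in for a future type.
type future[R any] <-chan outcome[R]

// await blocks until the result is available or ctx is done.
func (f future[R]) await(ctx context.Context) (R, error) {
	select {
	case o := <-f:
		return o.value, o.err
	case <-ctx.Done():
		var zero R
		return zero, ctx.Err()
	}
}

// submit starts work in the background and returns immediately.
func submit[R any](work func() (R, error)) future[R] {
	ch := make(chan outcome[R], 1) // buffered, so the worker never blocks
	go func() {
		v, err := work()
		ch <- outcome[R]{value: v, err: err}
	}()

	return ch
}

// demo awaits one completed future and one that never completes.
func demo() (int, error) {
	f := submit(func() (int, error) { return 42, nil })
	v, _ := f.await(context.Background())

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // the context is already done when await is called

	pending := future[int](make(chan outcome[int]))
	_, err := pending.await(ctx)

	return v, err
}

func main() {
	fmt.Println(demo()) // prints 42 context canceled
}
```

Separating submission from awaiting lets the caller queue many jobs first and decide later where and how long to wait for each result.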

type Option added in v0.1.0

type Option interface {
	// contains filtered or unexported methods
}

Option defines configurations for NewBatcher.

func WithSize added in v0.1.0

func WithSize(size int) Option

WithSize is an option to configure the batch size.

func WithTimeout added in v0.1.0

func WithTimeout(timeout time.Duration) Option

WithTimeout is an option to configure the batch timeout.
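An interface with only unexported methods is a common way to implement functional options that cannot be implemented outside the package. The sketch below shows one way this pattern can look. The `settings` struct, the `apply` method, and the lowercase constructor names are assumptions made for illustration, not the package's internals.

```go
package main

import (
	"fmt"
	"time"
)

// settings is a hypothetical configuration struct.
type settings struct {
	size    int
	timeout time.Duration
}

// option can only be implemented inside this package because apply is
// unexported.
type option interface {
	apply(s *settings)
}

type (
	sizeOption    int
	timeoutOption time.Duration
)

func (o sizeOption) apply(s *settings)    { s.size = int(o) }
func (o timeoutOption) apply(s *settings) { s.timeout = time.Duration(o) }

// withSize configures the batch size.
func withSize(size int) option { return sizeOption(size) }

// withTimeout configures the batch timeout.
func withTimeout(timeout time.Duration) option { return timeoutOption(timeout) }

// newSettings applies the options in order to zero-valued settings.
func newSettings(opts ...option) settings {
	var s settings
	for _, o := range opts {
		o.apply(&s)
	}

	return s
}

func main() {
	s := newSettings(withSize(3), withTimeout(10*time.Millisecond))
	fmt.Println(s.size, s.timeout) // prints 3 10ms
}
```

Options left out keep their zero value in this sketch; what the package does with an unset size or timeout is not specified here.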

Directories

Path Synopsis
internal
mocks
Package mocks contains generated test code.