singlefleet

Version: v1.0.0
Published: May 2, 2023 License: MIT Imports: 5 Imported by: 0

README

Singlefleet

Batching mechanism for your simple item fetch routines.

Use cases

Singlefleet is useful where any of the following is a significant constraint for your application:

  • network round-trip overhead,
  • number of open connections,
  • database CPU time, and similar per-request costs.

Usage

The example below demonstrates how to use singlefleet in an HTTP API that fetches employee info by employee ID.

package main

import (
	"database/sql"
	"encoding/json"
	"fmt"
	"net/http"
	"time"

	"github.com/dadanhrn/singlefleet"
	"github.com/lib/pq"
)

type Employee struct {
	ID           string    `json:"id"`
	Name         string    `json:"name"`
	Salary       int       `json:"salary"`
	StartingDate time.Time `json:"starting_date"`
}

func main() {
	// Initialize PostgreSQL connection
	db, err := sql.Open("postgres", "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable")
	if err != nil {
		fmt.Println(err)
		return
	}
	defer db.Close()

	// Initialize singlefleet fetcher
	// (define fetch operation and batching params)
	sf := singlefleet.NewFetcher(func(ids []string) (map[string]interface{}, error) {
		// Fetch all IDs in one go using PostgreSQL's ANY()
		rows, err := db.Query(`SELECT id, name, salary, starting_date FROM employee WHERE id=ANY($1)`, pq.Array(ids))
		if err != nil {
			return nil, err
		}
		defer rows.Close()

		// Map results to their corresponding IDs
		result := make(map[string]interface{})
		for rows.Next() {
			var emp Employee
			if err := rows.Scan(&emp.ID, &emp.Name, &emp.Salary, &emp.StartingDate); err != nil {
				// Skip rows that fail to scan; their IDs are left out of the result
				continue
			}
			result[emp.ID] = emp
		}

		// Report any error encountered while iterating over the rows
		if err := rows.Err(); err != nil {
			return nil, err
		}

		return result, nil
	}, 200*time.Millisecond, 10)

	// Define HTTP handler
	http.HandleFunc("/employee", func(w http.ResponseWriter, r *http.Request) {
		employeeID := r.URL.Query().Get("id")
		emp, ok, err := sf.Fetch(employeeID)
		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		if !ok {
			w.WriteHeader(http.StatusNotFound)
			return
		}

		body, err := json.Marshal(emp)
		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		w.Write(body)
	})

	if err := http.ListenAndServe(":8080", nil); err != nil {
		fmt.Println(err)
	}
}

How it works

Singlefleet is like singleflight, except that on top of duplicate call suppression it also waits for multiple calls and executes them together as one batch. So, for example, if your call fetches a single Employee entity, you define the operation as if you were fetching multiple Employees.

Singlefleet is not limited to database calls; it can fetch data from any source that supports multiple get (i.e. fetching "multiple rows" in one go).

One thing to note is that the multiple get mechanism should be natively supported by the data source. If you are, for example, simulating the multiple get by executing multiple single gets in multiple goroutines, then you are missing the entire point of this library.

singlefleet's execution batching takes two params:

  • maxWait or maximum waiting time, and
  • maxBatch or maximum batch size.

Fetcher adds all Fetch calls to its batch pool. The fetch operation is not executed until

  • at least maxWait has passed since the first Fetch call in current batch,
  • there are at least maxBatch calls in current batch, or
  • FetchNow is called;

whichever comes first. If a given call (identified by its id argument) is currently in flight (called and not yet returned), another incoming identical call will neither initiate a new batch nor be added to a newer pending batch; it simply waits for the result of the previous call (think singleflight).
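The three flush rules above can be illustrated with a small, self-contained sketch. Note that `batcher`, `Add`, and `FlushNow` below are hypothetical names for a simplified stand-in; this is not singlefleet's implementation, and it leaves out duplicate call suppression and result delivery entirely.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// batcher is a hypothetical, simplified stand-in that only illustrates the
// three flush rules. It is NOT singlefleet's implementation.
type batcher struct {
	mu       sync.Mutex
	pending  []string
	timer    *time.Timer
	gen      int // incremented on every flush so stale timers can be ignored
	maxWait  time.Duration
	maxBatch int
	run      func(ids []string) // called once per flushed batch
}

// Add queues an id. The first id of a batch starts the maxWait timer, and a
// full batch is flushed immediately.
func (b *batcher) Add(id string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.pending = append(b.pending, id)
	if len(b.pending) == 1 {
		gen := b.gen
		b.timer = time.AfterFunc(b.maxWait, func() {
			b.mu.Lock()
			defer b.mu.Unlock()
			if b.gen == gen { // skip if this batch was already flushed
				b.flushLocked()
			}
		})
	}
	if len(b.pending) >= b.maxBatch {
		b.flushLocked()
	}
}

// FlushNow executes the pending batch, if any, and reports whether it did.
func (b *batcher) FlushNow() bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.flushLocked()
}

// flushLocked runs the batch synchronously while holding the lock, which
// keeps the sketch short; run must therefore not call back into the batcher.
func (b *batcher) flushLocked() bool {
	if len(b.pending) == 0 {
		return false
	}
	b.timer.Stop()
	ids := b.pending
	b.pending = nil
	b.gen++
	b.run(ids)
	return true
}

func main() {
	b := &batcher{
		maxWait:  50 * time.Millisecond,
		maxBatch: 3,
		run:      func(ids []string) { fmt.Println("batch:", ids) },
	}

	b.Add("a")
	b.Add("b")
	b.Add("c") // third call reaches maxBatch, so the batch runs here

	b.Add("d")
	b.FlushNow() // explicit flush, before maxWait or maxBatch is reached

	b.Add("e")
	time.Sleep(200 * time.Millisecond) // maxWait elapses and flushes "e"
}
```

Each `Add` in this sketch returns immediately, whereas a real Fetch call blocks until its batch has run and returns that call's own result.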

Future plans

  • Panic handling
  • Make use of generics to enable custom types for id and return value
  • Handle runtime.Goexit

Documentation

Overview

Package singlefleet provides a batching mechanism that can be incorporated in any simple item fetch routine. It helps in scenarios where network round-trip overhead, database CPU time, etc. are significant constraints.

Types

type Fetcher

type Fetcher struct {
	// contains filtered or unexported fields
}

A Fetcher represents a fetch operation and contains the batch pool. Must be created with NewFetcher.

func NewFetcher

func NewFetcher(job Job, maxWait time.Duration, maxBatch int) *Fetcher

NewFetcher creates a new Fetcher. It holds the execution of jobs in the batch pool until at least maxWait has passed, there are at least maxBatch requested items in the batch pool, or FetchNow is called; whichever comes first.

To ignore the maxWait rule simply set a sufficiently long duration. Likewise, to ignore the maxBatch rule simply set a sufficiently large integer value.

func (*Fetcher) Fetch

func (fc *Fetcher) Fetch(id string) (val interface{}, ok bool, err error)

Fetch places a fetch job in the batch pool and returns the result of the operation.
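Because Fetch returns its value as interface{}, callers that need a concrete type usually have to type-assert it. The sketch below shows one way to wrap that step. `fetchFunc`, `fetchEmployee`, `errNotFound`, and the trimmed-down `Employee` are hypothetical names introduced for illustration; `fetchFunc` is declared locally to mirror the documented Fetch signature so the example compiles without the library.

```go
package main

import (
	"errors"
	"fmt"
)

// Employee is a trimmed-down, hypothetical item type for this sketch.
type Employee struct {
	ID   string
	Name string
}

// fetchFunc mirrors the documented signature of (*Fetcher).Fetch. It is
// declared locally (an assumption of this sketch) so no import is needed.
type fetchFunc func(id string) (val interface{}, ok bool, err error)

var errNotFound = errors.New("employee not found")

// fetchEmployee converts the untyped Fetch result into an Employee.
func fetchEmployee(fetch fetchFunc, id string) (Employee, error) {
	val, ok, err := fetch(id)
	if err != nil {
		return Employee{}, err
	}
	if !ok {
		return Employee{}, errNotFound
	}
	emp, isEmployee := val.(Employee)
	if !isEmployee {
		return Employee{}, fmt.Errorf("unexpected type %T for id %q", val, id)
	}
	return emp, nil
}

func main() {
	// stub stands in for a real Fetcher's Fetch method.
	stub := func(id string) (interface{}, bool, error) {
		if id == "1" {
			return Employee{ID: "1", Name: "Alice"}, true, nil
		}
		return nil, false, nil
	}

	fmt.Println(fetchEmployee(stub, "1"))
	fmt.Println(fetchEmployee(stub, "2"))
}
```

Given the signature documented above, the method value of a real Fetcher (for example `sf.Fetch`) should be usable in place of `stub`.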

func (*Fetcher) FetchNow

func (fc *Fetcher) FetchNow() bool

FetchNow forces the current pending job batch to be executed, disregarding the maxWait and maxBatch rules.

type Job

type Job func(ids []string) (vals map[string]interface{}, err error)

A Job defines a batched fetch operation. ids contains the IDs of the items to be fetched in a given batch (guaranteed to be unique). Values found by the fetch operation must be mapped to their corresponding ID. An error returned by this operation is passed to the calling Fetch(es).
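As a minimal sketch of that contract, the Job below is backed by an in-memory map. `newMapJob` and the map store are hypothetical, and the `Job` type is redeclared locally with the documented signature so the example runs without importing singlefleet. IDs that are not found are left out of the result; judging by the usage example above, those presumably surface as ok == false in Fetch.

```go
package main

import "fmt"

// Job mirrors the documented signature. It is redeclared here (an assumption
// of this sketch) so the example compiles without the library.
type Job func(ids []string) (vals map[string]interface{}, err error)

// newMapJob returns a Job backed by an in-memory map, a hypothetical data
// source used only to show the shape of the result.
func newMapJob(store map[string]string) Job {
	return func(ids []string) (map[string]interface{}, error) {
		vals := make(map[string]interface{}, len(ids))
		for _, id := range ids {
			if v, found := store[id]; found {
				vals[id] = v // key each value by the ID it was requested under
			}
			// IDs that are not found are simply left out of the result
		}
		return vals, nil
	}
}

func main() {
	job := newMapJob(map[string]string{"1": "Alice", "2": "Bob"})

	vals, err := job([]string{"1", "3"})
	fmt.Println(vals, err) // prints "map[1:Alice] <nil>"
}
```

An in-memory map gains nothing from batching; it only stands in for a source with a native multiple get.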
