dbtools

package module
v2.0.2 Latest
Warning

This package is not in the latest version of its module.

Published: Sep 8, 2022 License: Apache-2.0 Imports: 7 Imported by: 0

README

dbtools

This library contains concurrency-safe helpers that retry transactions until they succeed and handle errors in a developer-friendly way. It also provides helpers for use with go-sqlmock in tests, and a Mocha-inspired reporter for the spec BDD library.

This library supports Go >= 1.18. To use this library use this import path:

github.com/arsham/dbtools/v2

  1. PGX Transaction
  2. SQLMock Helpers
  3. Spec Reports
  4. Development
  5. License

PGX Transaction

The PGX struct helps reduce the amount of code in your logic by taking care of error handling. For example, instead of writing:

tx, err := db.Begin(ctx)
if err != nil {
    return errors.Wrap(err, "starting transaction")
}
// err is already declared above, so assign with = instead of :=.
err = firstQueryCall(tx)
if err != nil {
    e := errors.Wrap(tx.Rollback(ctx), "rolling back transaction")
    return multierror.Append(err, e).ErrorOrNil()
}
err = secondQueryCall(tx)
if err != nil {
    e := errors.Wrap(tx.Rollback(ctx), "rolling back transaction")
    return multierror.Append(err, e).ErrorOrNil()
}
err = thirdQueryCall(tx)
if err != nil {
    e := errors.Wrap(tx.Rollback(ctx), "rolling back transaction")
    return multierror.Append(err, e).ErrorOrNil()
}

return errors.Wrap(tx.Commit(ctx), "committing transaction")

You will write:

// for using with pgx connections:
p, err := dbtools.NewPGX(conn)
// handle the error!
return p.Transaction(ctx, firstQueryCall, secondQueryCall, thirdQueryCall)

If any of the callback functions returns an error at any point, the transaction is rolled back and, after the configured delay, the operation is retried in a new transaction.

You may set the retry count, delays, and the delay method by passing dbtools.ConfigFunc helpers to the constructor. If you don't pass any config, the Transaction method will run only once.

You can prematurely stop retrying by returning a *retry.StopError error:

err = p.Transaction(ctx, func(tx pgx.Tx) error {
    _, err := tx.Exec(ctx, query)
    return &retry.StopError{Err: err}
})

See the retry library for more information.

The callback functions should be of the func(pgx.Tx) error type. To try up to 20 times until your queries succeed:

// conn is a *pgxpool.Pool instance
p, err := dbtools.NewPGX(conn, dbtools.RetryCount(20))
// handle the error
err = p.Transaction(ctx, func(tx pgx.Tx) error {
    // use tx to run your queries
    return someErr
}, func(tx pgx.Tx) error {
    return someErr
}, func(tx pgx.Tx) error {
    return someErr
    // add more callbacks if required.
})
// handle the error!

Common Patterns

Stop retrying when the row is not found:

err := retrier.Do(func() error {
    const query = `SELECT foo FROM bar WHERE id = $1::int`
    err := conn.QueryRow(ctx, query, msgID).Scan(&foo)
    if errors.Is(err, pgx.ErrNoRows) {
        return &retry.StopError{Err: ErrFooNotFound}
    }
    return errors.Wrap(err, "querying database")
})

Stop retrying when there are integrity errors:

// integrityCheckErr returns a *retry.StopError wrapping the err with the msg
// if the query causes integrity constraint violation error. You should use
// this check to stop the retry mechanism, otherwise the transaction repeats.
func integrityCheckErr(err error, msg string) error {
    var v *pgconn.PgError
    if errors.As(err, &v) && isIntegrityConstraintViolation(v.Code) {
        return &retry.StopError{Err: errors.Wrap(err, msg)}
    }
    return errors.Wrap(err, msg)
}

func isIntegrityConstraintViolation(code string) bool {
    switch code {
    case pgerrcode.IntegrityConstraintViolation,
        pgerrcode.RestrictViolation,
        pgerrcode.NotNullViolation,
        pgerrcode.ForeignKeyViolation,
        pgerrcode.CheckViolation,
        pgerrcode.ExclusionViolation:
        return true
    }
    return false
}

err := p.Transaction(ctx, func(tx pgx.Tx) error {
    const query = `INSERT INTO foo (bar) VALUES ($1::text)`
    // Exec returns a command tag and an error; only the error is needed here.
    _, err := tx.Exec(ctx, query, name)
    return integrityCheckErr(err, "creating new record")
}, func(tx pgx.Tx) error {
    const query = `UPDATE baz SET updated_at=NOW()::timestamptz WHERE id = $1::int`
    _, err := tx.Exec(ctx, query, msgID)
    return err
})

This is not a part of the dbtools library, but it deserves a mention. Here is a common pattern for querying for multiple rows:

result := make([]Result, 0, expectedTotal)
err := retrier.Do(func() error {
    rows, err := r.pool.Query(ctx, query, args...)
    if err != nil {
        return errors.Wrap(err, "making query")
    }
    defer rows.Close()

    // make sure you reset the slice, otherwise in the next retry it adds the
    // same data to the slice again.
    result = result[:0]
    for rows.Next() {
        var doc Result
        err := rows.Scan(&doc.A, &doc.B)
        if err != nil {
            return errors.Wrap(err, "scanning rows")
        }
        result = append(result, doc)
    }

    return errors.Wrap(rows.Err(), "row error")
})
// handle the error!

SQLMock Helpers

There are a couple of helpers for use with go-sqlmock test cases, for situations where values are random but it is still important to check the values passed to queries.

ValueRecorder

If you have a value that is used in multiple queries and you want to make sure the queries receive the correct value, you can use the ValueRecorder. Typical examples are UUIDs, timestamps and random values.

For instance, if the first query generates a random number and it is essential to use the same value in the following queries:

import "database/sql"

func TestFoo(t *testing.T) {
    // ...
    // assume num has been generated randomly
    num := 666
    _, err := tx.ExecContext(ctx, "INSERT INTO life (value) VALUES ($1)", num)
    // error check
    _, err = tx.ExecContext(ctx, "INSERT INTO reality (value) VALUES ($1)", num)
    // error check
    _, err = tx.ExecContext(ctx, "INSERT INTO everywhere (value) VALUES ($1)", num)
    // error check
}

Your tests can be checked easily like this:

import (
    "github.com/arsham/dbtools/v2/dbtesting"
    "github.com/DATA-DOG/go-sqlmock"
)

func TestFoo(t *testing.T) {
    // ...
    rec := dbtesting.NewValueRecorder()
    mock.ExpectExec("INSERT INTO life .+").
        WithArgs(rec.Record("truth")).
        WillReturnResult(sqlmock.NewResult(1, 1))
    mock.ExpectExec("INSERT INTO reality .+").
        WithArgs(rec.For("truth")).
        WillReturnResult(sqlmock.NewResult(1, 1))
    mock.ExpectExec("INSERT INTO everywhere .+").
        WithArgs(rec.For("truth")).
        WillReturnResult(sqlmock.NewResult(1, 1))
}

Recorded values can be retrieved with a type assertion to their concrete type:

rec.Value("truth").(string)

There are two rules for using the ValueRecorder:

  1. You can only record for a value once.
  2. You should record a value before you call For or Value.

It will panic if these requirements are not met.
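
To make the two rules concrete, here is a minimal stdlib-only sketch of how such a recorder could behave. It is not the dbtesting implementation: the recorder type, newRecorder and matchFunc are hypothetical names used for illustration, and the only thing borrowed from go-sqlmock is the shape of its Argument interface (a single Match(driver.Value) bool method). The sketch is also not safe for concurrent use.

```go
package main

import (
	"database/sql/driver"
	"fmt"
	"reflect"
)

// matchFunc has the same single-method shape as go-sqlmock's Argument
// interface. It is a hypothetical helper for this sketch.
type matchFunc func(driver.Value) bool

// Match calls the wrapped function.
func (f matchFunc) Match(v driver.Value) bool { return f(v) }

// recorder is a simplified, hypothetical stand-in for a value recorder.
type recorder struct {
	names  map[string]bool         // names registered with Record
	values map[string]driver.Value // values seen by the Record matcher
}

func newRecorder() *recorder {
	return &recorder{names: map[string]bool{}, values: map[string]driver.Value{}}
}

// Record registers name and returns a matcher that accepts any value and
// remembers it. Rule 1: recording the same name twice panics.
func (r *recorder) Record(name string) matchFunc {
	if r.names[name] {
		panic(fmt.Sprintf("value %q is already recorded", name))
	}
	r.names[name] = true
	return func(v driver.Value) bool {
		r.values[name] = v
		return true
	}
}

// For returns a matcher that only accepts the value remembered for name.
// Rule 2: asking for a name that was never registered panics.
func (r *recorder) For(name string) matchFunc {
	if !r.names[name] {
		panic(fmt.Sprintf("value %q is not recorded", name))
	}
	return func(v driver.Value) bool {
		want, ok := r.values[name]
		return ok && reflect.DeepEqual(want, v)
	}
}

// Value returns the remembered value, or panics if name was never registered.
func (r *recorder) Value(name string) driver.Value {
	if !r.names[name] {
		panic(fmt.Sprintf("value %q is not recorded", name))
	}
	return r.values[name]
}

func main() {
	rec := newRecorder()
	first := rec.Record("truth")
	next := rec.For("truth")
	fmt.Println(first.Match(int64(666))) // true, and remembers 666
	fmt.Println(next.Match(int64(666)))  // true
	fmt.Println(next.Match(int64(42)))   // false
	fmt.Println(rec.Value("truth"))      // 666
}
```

Note that For can be called before any query has run, as long as Record was called first for the same name. The comparison itself happens later, when the matcher is invoked.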

OkValue

If you are only interested in checking some of the arguments passed to the Exec/Query functions and you don't want to check everything (maybe because they are not relevant to the current test), you can use OkValue.

import (
    "github.com/arsham/dbtools/v2/dbtesting"
    "github.com/DATA-DOG/go-sqlmock"
)

ok := dbtesting.OkValue
mock.ExpectExec("INSERT INTO life .+").
    WithArgs(
        ok,
        ok,
        ok,
        "important value",
        ok,
        ok,
        ok,
    )

Spec Reports

Mocha is a reporter that prints Mocha-inspired reports when using the spec BDD library.

Usage

import (
    "testing"

    "github.com/arsham/dbtools/v2/dbtesting"
    "github.com/sclevine/spec"
)

func TestFoo(t *testing.T) {
    spec.Run(t, "Foo", func(t *testing.T, when spec.G, it spec.S) {
        // ...
    }, spec.Report(&dbtesting.Mocha{}))
}

You can set an io.Writer to Mocha.Out to redirect the output, otherwise it prints to os.Stdout.

Development

Run the tests target for watching file changes and running tests:

make tests

You can pass flags as such:

make tests flags="-race -v -count=5"

You need to run the dependencies target to install the reflex task runner:

make dependencies

License

Use of this source code is governed by the Apache 2.0 license. The license can be found in the LICENSE file.

Documentation

Overview

Package dbtools contains logic for database transactions, using the retry library.

Constants

This section is empty.

Variables

var (
	// ErrEmptyDatabase is returned when no database connection is set.
	ErrEmptyDatabase = errors.New("no database connection is set")
)

Functions

This section is empty.

Types

type ConfigFunc

type ConfigFunc func(*PGX)

A ConfigFunc function sets up a Transaction.

func DelayMethod

func DelayMethod(m retry.DelayMethod) ConfigFunc

DelayMethod decides how to delay between tries. The default is retry.StandardDelay.

func Retry

func Retry(r retry.Retry) ConfigFunc

Retry sets the retrier.

func RetryCount

func RetryCount(n int) ConfigFunc

RetryCount defines how many times a transaction should be tried. If n is 0, it is set to 1.

func RetryDelay

func RetryDelay(d time.Duration) ConfigFunc

RetryDelay is the amount of delay between unsuccessful tries. Use DelayMethod to set the method that computes the delay duration.

type PGX

type PGX struct {
	// contains filtered or unexported fields
}

PGX is a concurrent-safe object that can retry a transaction on a pgxpool.Pool connection until it succeeds.

The Transaction method tries the provided functions one by one until all of them return nil, then commits the transaction. If any of the functions returns an error other than a *retry.StopError, it retries the transaction until the retry count is exhausted. If a running function returns a *retry.StopError, the transaction is rolled back and retrying stops. Retrying also stops when the passed context is cancelled.

If all attempts return errors, the last error is returned. If a *retry.StopError is returned, transaction is rolled back and the Err inside the *retry.StopError is returned. There will be delays between tries defined by the retry.DelayMethod and Delay duration.

Any panic in functions will be wrapped in an error and will be counted as an error.

func NewPGX

func NewPGX(conn Pool, conf ...ConfigFunc) (*PGX, error)

NewPGX returns an error if conn is nil. It sets the retry attempts to 1 if the value is less than 1. The retry strategy can be set either by providing a retry.Retry method or the individual components. See the ConfigFunc helpers.

Example

// This setup tries the transaction only once.
dbtools.NewPGX(&exampleConn{})

// This setup tries up to 100 times until it succeeds. The delay is set to
// 10ms and it uses the retry.IncrementalDelay method, which means it
// increases the delay between retries each time, with a jitter to avoid the
// thundering herd problem.
dbtools.NewPGX(&exampleConn{},
	dbtools.RetryCount(100),
	dbtools.RetryDelay(10*time.Millisecond),
	dbtools.DelayMethod(retry.IncrementalDelay),
)

func (*PGX) Transaction

func (p *PGX) Transaction(ctx context.Context, fns ...func(pgx.Tx) error) error

Transaction returns an error if the connection is not set, if it can't begin the transaction, if at least one of the fns still returns an error after all retries, or if the context deadline is exceeded.

It wraps the errors of the commit/rollback methods if there are any. If any of the fns panics in the last try, it puts the stack trace of the panic in the returned error.

It stops retrying if any of the errors are wrapped in a *retry.StopError.

Example

tr, err := dbtools.NewPGX(&exampleConn{})
if err != nil {
	panic(err)
}
err = tr.Transaction(context.Background(), func(pgx.Tx) error {
	fmt.Println("Running first query.")
	return nil
}, func(pgx.Tx) error {
	fmt.Println("Running second query.")
	return nil
})
fmt.Printf("Transaction's error: %v", err)
Output:

Running first query.
Running second query.
Transaction's error: <nil>

Example (Panics)

tr, err := dbtools.NewPGX(&exampleConn{}, dbtools.RetryCount(10))
if err != nil {
	panic(err)
}
calls := 0
err = tr.Transaction(context.Background(), func(pgx.Tx) error {
	calls++
	fmt.Printf("Call #%d.\n", calls)
	if calls < 5 {
		panic("We have a panic!")
	}
	fmt.Println("All done.")
	return nil
})
fmt.Printf("Transaction's error: %v\n", err)
fmt.Printf("Called %d times.\n", calls)
Output:

Call #1.
Call #2.
Call #3.
Call #4.
Call #5.
All done.
Transaction's error: <nil>
Called 5 times.

Example (Retries)

tr, err := dbtools.NewPGX(&exampleConn{}, dbtools.RetryCount(10))
if err != nil {
	panic(err)
}
called := false
err = tr.Transaction(context.Background(), func(pgx.Tx) error {
	fmt.Println("Running first query.")
	return nil
}, func(pgx.Tx) error {
	if !called {
		called = true
		fmt.Println("Second query error.")
		return assert.AnError
	}
	fmt.Println("Running second query.")
	return nil
})
fmt.Printf("Transaction's error: %v", err)
Output:

Running first query.
Second query error.
Running first query.
Running second query.
Transaction's error: <nil>

Example (StopTrying)

// This example shows how to stop trying when we know an error is not
// recoverable.
tr, err := dbtools.NewPGX(&exampleConn{},
	dbtools.RetryCount(100),
	dbtools.RetryDelay(time.Second),
)
if err != nil {
	panic(err)
}
err = tr.Transaction(context.Background(), func(pgx.Tx) error {
	fmt.Println("Running first query.")
	return nil
}, func(pgx.Tx) error {
	fmt.Println("Running second query.")
	return &retry.StopError{Err: assert.AnError}
})
fmt.Printf("Transaction returns my error: %t", strings.Contains(err.Error(), assert.AnError.Error()))
Output:

Running first query.
Running second query.
Transaction returns my error: true

type Pool

type Pool interface {
	Begin(ctx context.Context) (pgx.Tx, error)
}

Pool is the contract for beginning a transaction with a pgxpool db connection.

type Tx

type Tx interface {
	Commit() error
	Exec(query string, args ...any) (sql.Result, error)
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	Prepare(query string) (*sql.Stmt, error)
	PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)
	Query(query string, args ...any) (*sql.Rows, error)
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
	QueryRow(query string, args ...any) *sql.Row
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
	Rollback() error
	Stmt(stmt *sql.Stmt) *sql.Stmt
	StmtContext(ctx context.Context, stmt *sql.Stmt) *sql.Stmt
}

Tx is a transaction begun with sql.DB.

Directories

Path       Synopsis
dbtesting  Package dbtesting provides handy tools for using with databases.
