multidb

package module
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 16, 2020 License: BSD-3-Clause Imports: 10 Imported by: 6

README

Build Status codecov PkgGoDev Go Report Card

Multidb

Package multidb provides a sql.DB multiplexer for parallel queries using Go routines.

Multidb automatically polls which of the connected Nodes is a master. If the master fails, multidb will try to find a new master, which might be found after promotion took place on a slave or the old master gets reconnected. Actual management of master and slaves (such as promotion) is considered outside the scope of this package.

The Node and MultiNode types aim to be interface compatible with sql.DB and sql.Tx. More specifically, multidb fully implements SQLBoiler's boil.Executor and boil.ContextExecutor interface types. This makes it an excellent fit for SQLBoiler's auto-generated models. (And perhaps other ORMs?)

Maturity

Beta stage:

  • The intended design is fully implemented and unit tested with the race detector.
  • We are using this library in other projects and it is being tested for production

Dependencies

This package has been developed against Go 1.15, with module support and will not compile against older Go versions. The core package is slim and only depends on the standard Go libraries. Packages in drivers/ usually depend on their SQL driver counterpart. Unit tests pull in some additional packages like go-sqlmock and sqlboiler.

Installation

go get -u github.com/moapis/multidb

Full test suite can be run with:

go test ./...

Documentation and examples

pkg.go.dev

Copyright (c) 2019, Mohlmann Solutions SRL. All rights reserved. Use of this source code is governed by a License that can be found in the LICENSE file.

Documentation

Overview

Package multidb provides a sql.DB multiplexer for parallel queries using Go routines.

Multidb automatically polls which of the connected Nodes is a master. Actual management of master and slaves (such as promotion) is considered outside the scope of this package.

The Node and MultiNode types aim to be interface compatible with sql.DB and sql.Tx. More specifically, multidb fully implements SQLBoiler's boil.Executor and boil.ContextExecutor interface types. This makes it an excellent fit for SQLBoiler. (And perhaps other ORMs?)

Index

Examples

Constants

This section is empty.

Variables

View Source
var DefaultWhiteListErrs = []error{
	sql.ErrNoRows,
	sql.ErrTxDone,
	context.Canceled,
}

DefaultWhiteListErrs are errors which do not signal a DB or connection error.

View Source
var (
	// ErrNoNodes is returned when there are no connected nodes available for the requested operation
	ErrNoNodes = errors.New("No available nodes")
)

Functions

This section is empty.

Types

type AutoMasterSelector added in v0.2.0

type AutoMasterSelector struct {
	// contains filtered or unexported fields
}

AutoMasterSelector provides means of automatically running SelectMaster in case of errors on the Master node.

func (*AutoMasterSelector) CheckErr added in v0.2.0

func (ms *AutoMasterSelector) CheckErr(err error) error

CheckErr checks if the passed error is in WhiteListErrs or nil. If it is neither, mdb.SelectMaster is executed. The returned error is always the original error, which may be nil.

func (*AutoMasterSelector) Close added in v0.2.0

func (ms *AutoMasterSelector) Close()

Close cleans up any running Go routines. Calling MasterErr after Close becomes a no-op.

type ErrCallbackFunc added in v0.2.0

type ErrCallbackFunc func(error)

ErrCallbackFunc is called by the individual routines executing queries on multiple nodes. The function will be called concurently.

Example
var (
	// ErrCallBackFunc is called concurrently.
	// A Mutex is required when accessing a shared object
	mu       sync.Mutex
	counters map[string]int
)

ecb := func(err error) {

	// Ignore unimportant errors
	if errors.Is(err, context.Canceled) ||
		errors.Is(err, sql.ErrNoRows) ||
		errors.Is(err, sql.ErrTxDone) {
		return
	}

	nodeErr := new(NodeError)

	if !errors.As(err, &nodeErr) {
		log.Printf("Unknown error: %v", err)
	}

	mu.Lock()

	counters[nodeErr.name]++
	log.Printf("%v; fail count: %d", nodeErr, counters[nodeErr.name])

	mu.Unlock()
}

mdb := new(MultiDB)
_, _ = mdb.MultiNode(3, ecb)
// Continue running queries
Output:

type MasterFunc added in v0.2.0

type MasterFunc func(context.Context, *sql.DB) (bool, error)

MasterFunc should return true if the passed DB is the Master, typically by executing a driver specific query. False should be returned in all other cases. In case the query fails, an error should be returned.

func IsMaster added in v0.2.0

func IsMaster(query string) MasterFunc

IsMaster returns a MasterFunc that executes passed query against a sql.DB. The query should return one row, with a single boolean field, indicating wether the DB is a master of not. (true when master)

type MultiDB

type MultiDB struct {
	MasterFunc MasterFunc
	// contains filtered or unexported fields
}

MultiDB holds the multiple DB objects, capable of Writing and Reading.

Example
const connStr = "user=pqgotest dbname=pqgotest host=%s.moapis.org"

hosts := []string{
	"db1",
	"db2",
	"db3",
}

mdb := &MultiDB{
	MasterFunc: IsMaster(postgresql.MasterQuery),
}
defer mdb.Close()

for _, host := range hosts {
	db, err := sql.Open("postgres", fmt.Sprintf(connStr, host))
	if err != nil {
		panic(err)
	}

	mdb.Add(host, db)
}
Output:

func (*MultiDB) Add added in v0.2.0

func (mdb *MultiDB) Add(name string, db *sql.DB) (old *sql.DB, ok bool)

Add DB Node to MultiDB.

If a DB with the same name already exists, it is replaced. In that case the "old" DB is returned and "ok" is set to true. It is the responisbilty of the caller to close any obsolete DB.

Example
const connStr = "user=pqgotest dbname=pqgotest host=%s.moapis.org"

hosts := []string{
	"db1",
	"db2",
	"db3",
}

for _, host := range hosts {
	db, err := sql.Open("postgres", fmt.Sprintf(connStr, host))
	if err != nil {
		panic(err)
	}

	if old, ok := mdb.Add(host, db); ok {
		// Close error ignored, we don't want to interupt
		// adding of new, healthy nodes.
		old.Close()
	}
}
Output:

func (*MultiDB) All

func (mdb *MultiDB) All() map[string]*sql.DB

All returns all Nodes.

func (*MultiDB) AutoMasterSelector added in v0.2.0

func (mdb *MultiDB) AutoMasterSelector(onceIn time.Duration, whiteList ...error) *AutoMasterSelector

AutoMasterSelector returned will run mdb.SelectMaster up to once in an interval, after CheckMaster is called with an error not in whiteList.

If whitList errors are ommited, DefaultWhiteListErrs will be used.

Example
ms := mdb.AutoMasterSelector(10 * time.Second)

tx, err := mdb.MasterTx(context.TODO(), nil)
if err != nil {
	log.Fatal(err)
}
defer ms.CheckErr(tx.Rollback())

_, err = tx.Exec("insert something")
if ms.CheckErr(err) != nil {
	// If error is not nil and not in whitelist,
	// mdb.SelectMaster will be run in a Go routine

	log.Println(err)
	return
}

err = ms.CheckErr(tx.Commit())
if err != nil {
	log.Println(err)
	return
}
Output:

func (*MultiDB) Close

func (mdb *MultiDB) Close() error

Close the DB connectors on all nodes.

If all nodes respond with the same error, that exact error is returned as-is. If there is a variety of errors, they will be embedded in a MultiError return.

func (*MultiDB) Delete added in v0.2.0

func (mdb *MultiDB) Delete(names ...string) (deleted map[string]*sql.DB)

Delete DB Nodes from MultiDB, identified by names.

Exisiting DBs are returned in a map, indexed by name. It is the responisbilty of the caller to close any obsolete DB.

Example
deleted := mdb.Delete("db1", "db2", "db3")

for name, db := range deleted {
	if err := db.Close(); err != nil {
		panic(
			fmt.Errorf("Closing %s: %w", name, err),
		)
	}
}
Output:

func (*MultiDB) Master

func (mdb *MultiDB) Master(ctx context.Context) (*sql.DB, error)

Master DB node getter.

If there is no Master set, like after initialization, SelectMaster is ran to find the apropiate Master node.

Example
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

db, err := mdb.Master(ctx)
if err != nil {
	panic(err)
}

_, err = db.ExecContext(ctx, "insert something")
if err != nil {
	panic(err)
}
Output:

func (*MultiDB) MasterTx

func (mdb *MultiDB) MasterTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error)

MasterTx returns the master DB node with an opened transaction

Example
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

tx, err := mdb.MasterTx(ctx, &sql.TxOptions{})
if err != nil {
	panic(err)
}
defer tx.Rollback()

_, err = tx.ExecContext(ctx, "insert something")
if err != nil {
	panic(err)
}

err = tx.Commit()
if err != nil {
	panic(err)
}
Output:

func (*MultiDB) MultiNode

func (mdb *MultiDB) MultiNode(max int, errCallback ErrCallbackFunc) (*MultiNode, error)

MultiNode returns available *Nodes. Nodes are sorted by the division of InUse/MaxOpenConnections. Up to `max` amount of nodes will be in the returned object. If `max` is set to 0, all available nodes are returned. An error is returned in case no nodes are available.

The nodes may be master or slaves and should only be used for read operations.

Example
mn, err := mdb.MultiNode(2, func(err error) { fmt.Println(err) })
if err != nil {
	log.Fatal(err)
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

var i int

err = mn.QueryRowContext(ctx, "select 1").Scan(&i)
if err != nil {
	panic(err)
}
Output:

func (*MultiDB) MultiTx added in v0.1.1

func (mdb *MultiDB) MultiTx(ctx context.Context, opts *sql.TxOptions, max int, errCallback func(error)) (*MultiTx, error)

MultiTx returns a MultiNode with an open transaction

Example
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

mtx, err := mdb.MultiTx(ctx, nil, 2, func(err error) { fmt.Println(err) })
if err != nil {
	log.Fatal(err)
}
defer mtx.Rollback()

var i int

err = mtx.QueryRowContext(ctx, "select 1").Scan(&i)
if err != nil {
	panic(err)
}
Output:

func (*MultiDB) Node

func (mdb *MultiDB) Node() (*sql.DB, error)

Node returns any DB Node with the lowest value after division of InUse/MaxOpenConnections.

The returned node may be master or slave and should only be used for read operations.

func (*MultiDB) NodeTx

func (mdb *MultiDB) NodeTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error)

NodeTx returns any node with an opened transaction. The transaction is created in ReadOnly mode.

func (*MultiDB) SelectMaster added in v0.2.0

func (mdb *MultiDB) SelectMaster(ctx context.Context) (*sql.DB, error)

SelectMaster runs mdb.MasterFunc against all Nodes. The first one to return `true` will be returned and stored as master for future calles to `mdb.Master()`.

This method should be called only if trouble is detected with the current master, which might mean that the master role is shifted to another Node.

type MultiError

type MultiError struct {
	Errors []error
}

MultiError is a collection of errors which can arise from parallel query execution.

func (*MultiError) Error

func (me *MultiError) Error() string

type MultiNode

type MultiNode struct {
	// contains filtered or unexported fields
}

MultiNode holds multiple DB Nodes. All methods on this type run their sql.DB variant in one Go routine per Node.

Example
mn, err := mdb.MultiNode(2, func(err error) { log.Println(err) })
if err != nil {
	panic(err)
}

opCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

ctx, cancel := context.WithCancel(opCtx)
defer cancel()

rows, err := mn.QueryContext(ctx, "select $1;", 1)
if err != nil {
	panic(err)
}

var i int

for rows.Next() {
	if err = rows.Scan(&i); err != nil {
		panic(err)
	}
}

// Cancel the context after rows scanning!
cancel()

ctx, cancel = context.WithCancel(opCtx)
err = mn.QueryRowContext(ctx, "select $1", 2).Scan(&i)
cancel()

if err != nil {
	panic(err)
}
Output:

func (*MultiNode) Begin

func (m *MultiNode) Begin() (*MultiTx, error)

Begin runs BeginTx with context.Background(). It is highly recommended to stick with the contexted variant in parallel executions. This method is primarily included for consistency.

func (*MultiNode) BeginTx

func (m *MultiNode) BeginTx(ctx context.Context, opts *sql.TxOptions) (mtx *MultiTx, err error)

BeginTx runs sql.DB.BeginTx on the Nodes in separate Go routines. The transactions are created in ReadOnly mode. It waits for all the calls to return or the context to expire. If you have enough nodes available, you might want to set short timeout values on the context to fail fast on non-responding database hosts.

If all nodes respond with the same error, that exact error is returned as-is. If there is a variety of errors, they will be embedded in a MultiError return.

Note: this method can return both a valid Tx and an error value, in case any (but not all) node calls fails. Tx will carry fewer amount of entries than requested. This breaks the common `if err != nil` convention, but we want to leave the descission whetter to proceed or not, up to the caller.

func (*MultiNode) Exec

func (m *MultiNode) Exec(query string, args ...interface{}) (sql.Result, error)

Exec runs ExecContext with context.Background(). It is highly recommended to stick with the contexted variant in parallel executions. This method is primarily included to implement boil.Executor.

func (*MultiNode) ExecContext

func (m *MultiNode) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)

ExecContext runs sql.DB.ExecContext on the Nodes in separate Go routines. The first non-error result is returned immediately and errors from the other Nodes will be ignored.

If all nodes respond with the same error, that exact error is returned as-is. If there is a variety of errors, they will be embedded in a MultiError return.

It does not make much sense to run this method against multiple Nodes, as they are usually slaves. This method is primarily included to implement boil.ContextExecutor.

func (*MultiNode) Query

func (m *MultiNode) Query(query string, args ...interface{}) (*sql.Rows, error)

Query runs QueryContext with context.Background(). It is highly recommended to stick with the contexted variant in parallel executions. This method is primarily included to implement boil.Executor.

func (*MultiNode) QueryContext

func (m *MultiNode) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)

QueryContext runs sql.DB.QueryContext on the Nodes in separate Go routines. The first non-error result is returned immediately and errors from the other Nodes will be ignored.

It is important to cancel the context as soon as possible after scanning the rows. Preferably, before any next operation on MultiNode.

If all nodes respond with the same error, that exact error is returned as-is. If there is a variety of errors, they will be embedded in a MultiError return.

Implements boil.ContextExecutor.

func (*MultiNode) QueryRow

func (m *MultiNode) QueryRow(query string, args ...interface{}) *sql.Row

QueryRow runs QueryRowContext with context.Background(). It is highly recommended to stick with the contexted variant in parallel executions. This method is primarily included to implement boil.Executor.

func (*MultiNode) QueryRowContext

func (m *MultiNode) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row

QueryRowContext runs sql.DB.QueryRowContext on the Nodes in separate Go routines. The first error free result is returned immediately. If all resulting sql.Row objects contain an error, only the last Row containing an error is returned.

It is important to cancel the context as soon as possible after scanning the row. Preferably, before any next operation on MultiNode.

type MultiTx

type MultiTx struct {
	// contains filtered or unexported fields
}

MultiTx holds a slice of open transactions to multiple nodes. All methods on this type run their sql.Tx variant in one Go routine per Node.

func (*MultiTx) Commit

func (m *MultiTx) Commit() error

Commit runs sql.Tx.Commit on the transactions in separate Go routines. It waits for all the calls to return.

If all nodes respond with the same error, that exact error is returned as-is. If there is a variety of errors, they will be embedded in a MultiError return.

Note: this method returns an error even if some commits where executed successfully. It is up to the caller to decide what to do with those errors. Typically MultiTx calls should only be run against a set if slave databases. herefore it does not make much sense to Commit. If however, you did run this against multiple hosts and some of them failed, you'll now have to deal with an inconsistent dataset.

This method is primarily included to implement boil.Transactor

func (*MultiTx) Exec

func (m *MultiTx) Exec(query string, args ...interface{}) (sql.Result, error)

Exec runs ExecContext with context.Background(). It is highly recommended to stick with the contexted variant in paralel executions. This method is primarily included to implement boil.Executor.

func (*MultiTx) ExecContext

func (m *MultiTx) ExecContext(ctx context.Context, query string, args ...interface{}) (res sql.Result, err error)

ExecContext runs sql.Tx.ExecContext on the transactions in separate Go routines. The first non-error result is returned immediately and errors from the other transactions will be ignored.

If all nodes respond with the same error, that exact error is returned as-is. If there is a variety of errors, they will be embedded in a MultiError return.

It does not make much sense to run this method against multiple Nodes, as they are ussualy slaves. This method is primarily included to implement boil.ContextExecutor.

func (*MultiTx) Query

func (m *MultiTx) Query(query string, args ...interface{}) (*sql.Rows, error)

Query runs QueryContext with context.Background(). It is highly recommended to stick with the contexted variant in parallel executions. This method is primarily included to implement boil.Executor.

func (*MultiTx) QueryContext

func (m *MultiTx) QueryContext(ctx context.Context, query string, args ...interface{}) (rows *sql.Rows, err error)

QueryContext runs sql.Tx.QueryContext on the tranactions in separate Go routines. The first non-error result is returned immediately and errors from the other Nodes will be ignored.

If all nodes respond with the same error, that exact error is returned as-is. If there is a variety of errors, they will be embedded in a MultiError return.

Implements boil.ContextExecutor.

func (*MultiTx) QueryRow

func (m *MultiTx) QueryRow(query string, args ...interface{}) *sql.Row

QueryRow wrapper around sql.DB.QueryRow. Implements boil.Executor. Since errors are deferred until row.Scan, this package cannot monitor such errors.

func (*MultiTx) QueryRowContext

func (m *MultiTx) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row

QueryRowContext runs sql.Tx.QueryRowContext on the tranactions in separate Go routines. The first result is returned immediately, regardless if that result has an error.

Errors in sql.Tx.QueryRow are deferred until scan and therefore opaque to this package. If you have a choice, stick with a regular QueryContext. This method is primarily included to implement boil.Executor.

func (*MultiTx) Rollback

func (m *MultiTx) Rollback() error

Rollback runs sql.Tx.Rollback on the transactions in separate Go routines. It waits for all the calls to return.

If all nodes respond with the same error, that exact error is returned as-is. If there is a variety of errors, they will be embedded in a MultiError return.

Note: this method returns an error even if some rollbacks where executed successfully. It is up to the caller to decide what to do with those errors. Typically MultiTx calls should only be run against a set of slave databases. In such cases Rollback is only used in a defer to tell the hosts that we are done and errors can safely be ignored.

Implements boil.Transactor

type NoMasterErr added in v0.2.0

type NoMasterErr struct {
	// contains filtered or unexported fields
}

NoMasterErr is returned when there is no master available. The causing error is wrapped and can be obtained through errors.Unwrap().

func (*NoMasterErr) Error added in v0.2.0

func (err *NoMasterErr) Error() string

func (*NoMasterErr) Unwrap added in v0.2.0

func (err *NoMasterErr) Unwrap() error

type NodeError added in v0.2.0

type NodeError struct {
	// contains filtered or unexported fields
}

NodeError is passed to ErrCallbackFunc functions in order to destinguish between DB nodes in concurrent query executions.

func (*NodeError) Error added in v0.2.0

func (ne *NodeError) Error() string

func (*NodeError) Name added in v0.2.0

func (ne *NodeError) Name() string

Name of the node that experienced the error

func (*NodeError) Unwrap added in v0.2.0

func (ne *NodeError) Unwrap() error

Directories

Path Synopsis
drivers
postgresql
Package postgresql defines constants for using PostgreSQL databases with MultiDB.
Package postgresql defines constants for using PostgreSQL databases with MultiDB.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL