dbresolver

package module
v2.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 20, 2023 License: MIT Imports: 11 Imported by: 1

README

dbresolver

A Golang database resolver and wrapper for any topology with multiple database connections, e.g. master-slave replicated databases or cross-region applications.

Go Go.Dev

Idea and Inspiration

This DBResolver library routes each operation to the correctly configured DB. E.g., all read queries are routed to the read-only replica DB, and all write operations (Insert, Update, Delete) are routed to the Primary/Master DB.

Read more for the explanation on this blog post

Excalidraw live diagram

Usecase 1: Separated RW and RO Database connection
Click to Expand
  • You have your application deployed
  • Your application is heavy on read operations
  • Your DBs replicated to multiple replicas for faster queries
  • You separate the connections for optimized query
  • readonly-readwrite
Usecase 2: Cross Region Database
Click to Expand
  • Your application deployed to multi regions.
  • You have your Databases configured globally.
  • cross-region
Usecase 3: Multi-Master (Multi-Primary) Database
Click to Expand
  • You're using a Multi-Master database topology eg, Aurora Multi-Master
  • multi-master

Support

You can file an Issue. See documentation in Go.Dev

Getting Started

Download
go get -u github.com/bxcodec/dbresolver/v2

Example

Implementing DB Resolver using *sql.DB
Click to Expand
package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	"github.com/bxcodec/dbresolver/v2"
	_ "github.com/lib/pq"
)

// main wires one read-write primary and one read-only replica into a single
// dbresolver.DB, then runs one write and one read through it.
func main() {
	var (
		host1     = "localhost"
		port1     = 5432
		user1     = "postgresrw"
		password1 = "<password>"
		host2     = "localhost"
		port2     = 5433
		user2     = "postgresro"
		password2 = "<password>"
		dbname    = "<dbname>"
	)
	// Placeholder value bound to the $1 parameter of the queries below.
	const bookID = 1

	// connection string
	rwPrimary := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", host1, port1, user1, password1, dbname)
	readOnlyReplica := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", host2, port2, user2, password2, dbname)

	// open database for primary
	dbPrimary, err := sql.Open("postgres", rwPrimary)
	if err != nil {
		// On failure the returned handle is unusable; stop instead of passing
		// it on to the resolver.
		log.Fatalf("go error when connecting to the DB: %v", err)
	}
	// configure the DBs for other setup eg, tracing, etc
	// eg, tracing.Postgres(dbPrimary)

	// open database for replica
	dbReadOnlyReplica, err := sql.Open("postgres", readOnlyReplica)
	if err != nil {
		log.Fatalf("go error when connecting to the DB: %v", err)
	}
	// configure the DBs for other setup eg, tracing, etc
	// eg, tracing.Postgres(dbReadOnlyReplica)

	connectionDB := dbresolver.New(
		dbresolver.WithPrimaryDBs(dbPrimary),
		dbresolver.WithReplicaDBs(dbReadOnlyReplica),
		dbresolver.WithLoadBalancer(dbresolver.RoundRobinLB))

	defer connectionDB.Close()

	ctx := context.Background()

	// now you can use the connection for all DB operation
	// will use primaryDB
	if _, err = connectionDB.ExecContext(ctx, "DELETE FROM book WHERE id=$1", bookID); err != nil {
		// Printf (not Fatalf) so the deferred Close above still runs.
		log.Printf("go error when executing the query to the DB: %v", err)
	}

	// will use replicaReadOnlyDB
	// Err surfaces a failed query without scanning; real code should Scan the
	// row into concrete destinations instead.
	if err = connectionDB.QueryRowContext(ctx, "SELECT * FROM book WHERE id=$1", bookID).Err(); err != nil {
		log.Printf("go error when executing the query to the DB: %v", err)
	}
}

Important Notes

  • Primary Database will be used when you call these functions
    • Exec
    • ExecContext
    • Begin (transaction will use primary)
    • BeginTx
    • Queries with "RETURNING" clause
      • Query
      • QueryContext
      • QueryRow
      • QueryRowContext
  • Replica Databases will be used when you call these functions
    • Query
    • QueryContext
    • QueryRow
    • QueryRowContext

Contribution


To contribute to this project, you can open a PR or an issue.

Documentation

Index

Examples

Constants

View Source
const Commit = ""

Commit is the HEAD commit of this Version.

View Source
const ReleaseTime = ""

ReleaseTime is the UTC Time of the release of this Version

View Source
const Version = ""

Version is the current release version.

Variables

This section is empty.

Functions

This section is empty.

Types

type Conn added in v2.1.0

// Conn wraps a *sql.Conn. Its main purpose is to hand back this package's Tx
// and Stmt interfaces rather than the database/sql concrete types.
type Conn interface {
	Close() error
	// BeginTx returns this package's Tx interface, not *sql.Tx.
	BeginTx(ctx context.Context, opts *sql.TxOptions) (Tx, error)
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
	PingContext(ctx context.Context) error
	// PrepareContext returns this package's Stmt interface, not *sql.Stmt.
	PrepareContext(ctx context.Context, query string) (Stmt, error)
	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
	Raw(f func(driverConn interface{}) error) (err error)
}

Conn is a *sql.Conn wrapper. Its main purpose is to be able to return the internal Tx and Stmt interfaces.

type DB

// DB is the contract offered by this library: an API aligned with sql.DB for
// setups with multiple database connections. Functions that only make sense
// for a single connection are forwarded to the first primary DB.
type DB interface {
	// Begin starts a transaction; transactions use a primary DB.
	Begin() (Tx, error)
	// BeginTx uses a primary DB.
	BeginTx(ctx context.Context, opts *sql.TxOptions) (Tx, error)
	Close() error
	// Conn only available for the primary db or the first primary db (if using multi-primary)
	Conn(ctx context.Context) (Conn, error)
	Driver() driver.Driver
	// Exec uses a primary DB.
	Exec(query string, args ...interface{}) (sql.Result, error)
	// ExecContext uses a primary DB.
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
	Ping() error
	PingContext(ctx context.Context) error
	Prepare(query string) (Stmt, error)
	PrepareContext(ctx context.Context, query string) (Stmt, error)
	// Query uses a replica DB, unless the query carries a "RETURNING" clause,
	// in which case a primary DB is used.
	Query(query string, args ...interface{}) (*sql.Rows, error)
	// QueryContext follows the same routing as Query.
	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
	// QueryRow follows the same routing as Query.
	QueryRow(query string, args ...interface{}) *sql.Row
	// QueryRowContext follows the same routing as Query.
	QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
	SetConnMaxIdleTime(d time.Duration)
	SetConnMaxLifetime(d time.Duration)
	SetMaxIdleConns(n int)
	SetMaxOpenConns(n int)
	PrimaryDBs() []*sql.DB
	ReplicaDBs() []*sql.DB
	// Stats only available for the primary db or the first primary db (if using multi-primary)
	Stats() sql.DBStats
}

The DB interface is the contract supported by this library. All functions offered by this library are defined here. It is supposed to be aligned with sql.DB, but since some of the functions are not relevant to multi-DB connections, we decided to forward all single-connection DB functions to the first primary DB. For example, functions like `Conn()` or `Stats()` are only available for the primary DB, or the first primary DB (if using multi-primary).

func New

func New(opts ...OptionFunc) DB

New resolves all the passed connections using the configurable parameters.

Example (MultiPrimaryMultiReplicas)
// Example body: two primaries and two replicas behind one resolver, with
// round-robin load balancing.
var (
	host1     = "localhost"
	port1     = 5432
	user1     = "postgresrw"
	password1 = "<password>"
	host2     = "localhost"
	port2     = 5433
	user2     = "postgresro"
	password2 = "<password>"
	dbname    = "<dbname>"
)

// Placeholder value bound to the $1 parameter of the queries below.
const bookID = 1

// connection string
rwPrimary := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", host1, port1, user1, password1, dbname)
readOnlyReplica := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", host2, port2, user2, password2, dbname)

// open database for primary
dbPrimary1, err := sql.Open("postgres", rwPrimary)
if err != nil {
	// On failure the returned handle is unusable; stop instead of passing it
	// on to the resolver.
	log.Fatalf("go error when connecting to the DB: %v", err)
}
// open database for primary
dbPrimary2, err := sql.Open("postgres", rwPrimary)
if err != nil {
	log.Fatalf("go error when connecting to the DB: %v", err)
}

// configure the DBs for other setup eg, tracing, etc
// eg, tracing.Postgres(dbPrimary)

// open database for replica
dbReadOnlyReplica1, err := sql.Open("postgres", readOnlyReplica)
if err != nil {
	log.Fatalf("go error when connecting to the DB: %v", err)
}
// open database for replica
dbReadOnlyReplica2, err := sql.Open("postgres", readOnlyReplica)
if err != nil {
	log.Fatalf("go error when connecting to the DB: %v", err)
}
// configure the DBs for other setup eg, tracing, etc
// eg, tracing.Postgres(dbReadOnlyReplica)

connectionDB := dbresolver.New(
	dbresolver.WithPrimaryDBs(dbPrimary1, dbPrimary2),
	dbresolver.WithReplicaDBs(dbReadOnlyReplica1, dbReadOnlyReplica2),
	dbresolver.WithLoadBalancer(dbresolver.RoundRobinLB))

// Close the resolver once the example is done with it.
defer connectionDB.Close()

ctx := context.Background()

// now you can use the connection for all DB operation
// will use primaryDB
if _, err = connectionDB.ExecContext(ctx, "DELETE FROM book WHERE id=$1", bookID); err != nil {
	log.Printf("go error when executing the query to the DB: %v", err)
}

// will use replicaReadOnlyDB
// Err surfaces a failed query without scanning; real code should Scan the row
// into concrete destinations instead.
if err = connectionDB.QueryRowContext(ctx, "SELECT * FROM book WHERE id=$1", bookID).Err(); err != nil {
	log.Printf("go error when executing the query to the DB: %v", err)
}
Output:

type DBConnection

// DBConnection is the generic type constraint for DB and Stmt operations: a
// type argument must be exactly *sql.DB or *sql.Stmt.
type DBConnection interface {
	*sql.DB | *sql.Stmt
}

DBConnection is the generic type for DB and Stmt operation

type DBLoadBalancer

type DBLoadBalancer LoadBalancer[*sql.DB]

DBLoadBalancer is loadbalancer for physical DBs

type DefaultQueryTypeChecker added in v2.1.0

type DefaultQueryTypeChecker struct {
}

DefaultQueryTypeChecker searches for a "RETURNING" string inside the query to detect a write query.

func (DefaultQueryTypeChecker) Check added in v2.1.0

func (c DefaultQueryTypeChecker) Check(query string) QueryType

type LoadBalancer

// LoadBalancer is the load balancer contract, generic over the connection
// kind T allowed by the DBConnection constraint.
type LoadBalancer[T DBConnection] interface {
	// Resolve returns the resolved connection for the given candidates.
	Resolve([]T) T
	// Name returns the LoadBalancerPolicy name of this balancer.
	Name() LoadBalancerPolicy
	// contains filtered or unexported methods
}

LoadBalancer defines the load balancer contract.

type LoadBalancerPolicy

type LoadBalancerPolicy string

LoadBalancerPolicy define the loadbalancer policy data type

// Supported load balancer policies; these are the values accepted by
// WithLoadBalancer.
const (
	// RoundRobinLB is the round-robin policy; its string value is "ROUND_ROBIN".
	RoundRobinLB LoadBalancerPolicy = "ROUND_ROBIN"
	// RandomLB is the random policy; its string value is "RANDOM".
	RandomLB     LoadBalancerPolicy = "RANDOM"
)

Supported Loadbalancer policy

type Option

// Option defines the option properties; each OptionFunc receives a *Option.
type Option struct {
	// PrimaryDBs and ReplicaDBs hold the *sql.DB handles for each role.
	PrimaryDBs       []*sql.DB
	ReplicaDBs       []*sql.DB
	// StmtLB is the load balancer for prepared statements (*sql.Stmt).
	StmtLB           StmtLoadBalancer
	// DBLB is the load balancer for physical DBs (*sql.DB).
	DBLB             DBLoadBalancer
	// QueryTypeChecker is the checker used to detect the query type.
	QueryTypeChecker QueryTypeChecker
}

Option define the option property

type OptionFunc

type OptionFunc func(opt *Option)

OptionFunc used for option chaining

func WithLoadBalancer

func WithLoadBalancer(lb LoadBalancerPolicy) OptionFunc

WithLoadBalancer configure the loadbalancer for the resolver

func WithPrimaryDBs

func WithPrimaryDBs(primaryDBs ...*sql.DB) OptionFunc

WithPrimaryDBs add primaryDBs to the resolver

func WithQueryTypeChecker added in v2.1.0

func WithQueryTypeChecker(checker QueryTypeChecker) OptionFunc

WithQueryTypeChecker sets the query type checker instance. The default one just checks for the presence of the string "RETURNING" in the uppercase query.

func WithReplicaDBs

func WithReplicaDBs(replicaDBs ...*sql.DB) OptionFunc

WithReplicaDBs add replica DBs to the resolver

type QueryType added in v2.1.0

// QueryType is the classification returned by QueryTypeChecker.Check.
type QueryType int
const (
	// QueryTypeUnknown is the zero value of QueryType.
	QueryTypeUnknown QueryType = iota
	// QueryTypeRead is 1. NOTE(review): presumably marks read-only queries —
	// confirm against the Check implementations.
	QueryTypeRead
	// QueryTypeWrite is 2. NOTE(review): presumably what the default checker
	// reports for queries containing "RETURNING" — confirm.
	QueryTypeWrite
)

type QueryTypeChecker added in v2.1.0

// QueryTypeChecker tries to detect the type of a query, e.g. to spot a
// RETURNING clause in an INSERT/UPDATE.
type QueryTypeChecker interface {
	// Check returns the QueryType detected for the given query text.
	Check(query string) QueryType
}

QueryTypeChecker is used to try to detect the query type, e.g. to detect RETURNING clauses in INSERT/UPDATE statements.

type RandomLoadBalancer

type RandomLoadBalancer[T DBConnection] struct {
	// contains filtered or unexported fields
}

RandomLoadBalancer represent for Random LB policy

func (RandomLoadBalancer[T]) Name

Name returns the LB policy name.

func (RandomLoadBalancer[T]) Resolve

func (lb RandomLoadBalancer[T]) Resolve(dbs []T) T

Resolve return the resolved option for Random LB

type RoundRobinLoadBalancer

type RoundRobinLoadBalancer[T DBConnection] struct {
	// contains filtered or unexported fields
}

RoundRobinLoadBalancer represent for RoundRobin LB policy

func (RoundRobinLoadBalancer[T]) Name

Name returns the LB policy name.

func (*RoundRobinLoadBalancer[T]) Resolve

func (lb *RoundRobinLoadBalancer[T]) Resolve(dbs []T) T

Resolve return the resolved option for RoundRobin LB

type Stmt

// Stmt is an aggregate prepared statement. It holds a prepared statement for
// each underlying physical db.
type Stmt interface {
	Close() error
	Exec(...interface{}) (sql.Result, error)
	ExecContext(ctx context.Context, args ...interface{}) (sql.Result, error)
	Query(...interface{}) (*sql.Rows, error)
	QueryContext(ctx context.Context, args ...interface{}) (*sql.Rows, error)
	QueryRow(args ...interface{}) *sql.Row
	QueryRowContext(ctx context.Context, args ...interface{}) *sql.Row
}

Stmt is an aggregate prepared statement. It holds a prepared statement for each underlying physical db.

type StmtLoadBalancer

type StmtLoadBalancer LoadBalancer[*sql.Stmt]

StmtLoadBalancer is loadbalancer for query prepared statements

type Tx added in v2.1.0

// Tx wraps a *sql.Tx. Its main purpose is to hand back this package's Stmt
// interface rather than *sql.Stmt.
type Tx interface {
	Commit() error
	Rollback() error
	Exec(query string, args ...interface{}) (sql.Result, error)
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
	// Prepare returns this package's Stmt interface.
	Prepare(query string) (Stmt, error)
	// PrepareContext returns this package's Stmt interface.
	PrepareContext(ctx context.Context, query string) (Stmt, error)
	Query(query string, args ...interface{}) (*sql.Rows, error)
	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
	QueryRow(query string, args ...interface{}) *sql.Row
	QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
	// Stmt both accepts and returns this package's Stmt interface.
	Stmt(stmt Stmt) Stmt
	// StmtContext both accepts and returns this package's Stmt interface.
	StmtContext(ctx context.Context, stmt Stmt) Stmt
}

Tx is a *sql.Tx wrapper. Its main purpose is to be able to return the internal Stmt interface.

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL