dbtx

Published: Nov 26, 2023 License: BSD-3-Clause Imports: 4 Imported by: 0

README

dbtx

A Unit of Work implementation in Go. It abstracts the complexity of managing transactions across different repositories. Read more about it in this blog post: Simplying Transactions in Golang with Unit of Work Pattern.

Interface

Note: Implement only the methods you need, nothing more.

// atomic represents the atomic database operations in a transaction.
type atomic interface {
	IsTx() bool
	DBTx(ctx context.Context) DBTX
	DB(ctx context.Context) DBTX
	Tx(ctx context.Context) DBTX

	// In your application service layer (aka usecase layer), this is probably
	// the only interface you need to implement.
	RunInTx(ctx context.Context, fn func(txCtx context.Context) error) (err error)
}

Why DBTX

Option 1: Using WithTx

There are quite a number of examples using WithTx which will return a new pointer to the repository with *sql.Tx as the underlying connection:

func (r *UserRepository) WithTx(tx *sql.Tx) *UserRepository {
	return &UserRepository{
		db: tx,
	}
}

However, this becomes a problem in the application service layer, because the repository interface can no longer be defined cleanly:

type userRepository interface {
	// WARN: WithTx now returns the concrete implementation, not interface!
	// This means all its methods can be accessed.
	WithTx(tx *sql.Tx) *repository.UserRepository
	Create(ctx context.Context, name string) (*domain.User, error)
}

type UserUsecase struct {
	// WARN: Leaking database driver implementation here.
	db *sql.DB
	repo userRepository
}

func (uc *UserUsecase) Create(ctx context.Context, name string) (*domain.User, error) {
	tx, err := uc.db.Begin()
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	var u *domain.User

	// WithTx returns the concrete *UserRepository rather than `userRepository`, so all of its methods can be called.
	repo := uc.repo.WithTx(tx)
	_ = repo.SomeUnallowedMethod()

	return u, tx.Commit()
}

There are several issues:

  1. The details of the database driver (*sql.DB and *sql.Tx) are leaked.
  2. The repository with the transaction driver no longer fulfils the interface.
  3. Passing down *sql.Tx is dangerous, since there is no way to control when the transaction is committed. The commit method should only be called by the parent that initiated the transaction. 🤷
  4. Nesting database transactions makes issue 3 more apparent. For example, you may want to chain multiple application services/repositories to run in a single transaction. To ensure that only the root parent can commit the transaction, you first need to know whether an underlying transaction was passed down. dbtx handles this gracefully by reusing the transaction that is passed down through the context.
  5. It is easy to forget to commit or roll back a transaction in deeply nested layers.

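Also, issue 2 cannot be solved by returning the interface instead, because Go requires method signatures to match exactly and the concrete WithTx still returns *UserRepository 😃: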

type userRepository interface {
	// Replacing *repository.UserRepository with userRepository does not work.
	WithTx(tx *sql.Tx) userRepository
	Create(ctx context.Context, name string) (*domain.User, error)
}
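
The mismatch can be checked at run time with a type assertion. The following is a minimal sketch, assuming a stripped-down UserRepository with only WithTx; the helper satisfies is hypothetical and exists only for this illustration.

```go
package main

import (
	"database/sql"
	"fmt"
)

// UserRepository is a stand-in for the concrete repository.
type UserRepository struct{}

// WithTx returns the concrete type, as in Option 1.
func (r *UserRepository) WithTx(tx *sql.Tx) *UserRepository { return r }

// userRepository is the interface the usecase would like to depend on.
type userRepository interface {
	WithTx(tx *sql.Tx) userRepository
}

// satisfies reports whether v implements userRepository, checked at run time.
func satisfies(v any) bool {
	_, ok := v.(userRepository)
	return ok
}

func main() {
	// The return types differ, so the method sets do not match.
	fmt.Println(satisfies(&UserRepository{})) // false
}
```

Go has no covariant return types, so a method returning *UserRepository is not interchangeable with one returning userRepository.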

Option 2: Passing *sql.DB and *sql.Tx explicitly

One way to solve the issues in Option 1 is to pass the underlying database connection directly as a function or method argument.

func (r *UserRepository) Create(ctx context.Context, tx *sql.Tx, name string) (*domain.User, error) {
	// Do something ...
}

However, this introduces a new problem, because sometimes we don't want to run a repository method in a transaction. A workaround is to use the default *sql.DB if the *sql.Tx is nil:

type UserRepository struct {
	db *sql.DB
}


func (r *UserRepository) Create(ctx context.Context, tx *sql.Tx, name string) (*domain.User, error) {
	if tx == nil {
		// Use r.db
	} else {
		// Use tx
	}
}

However, this complicates the function signature, and passing nil is ugly:

// user_usecase.go
userRepository.Create(ctx, nil, "John Appleseed")

Also, it still leaks the details of the database driver in the usecase layer.

Option 3: Pass the database driver using context.Context

This is the method implemented by dbtx. Many articles claim that passing dependencies through context is not idiomatic Go. There is nothing to refute those claims.

Use dbtx only if you are comfortable with the idea.

With dbtx, the implementation details of the database driver are not leaked in the application service (aka usecase) layer.

// user_usecase.go
type atomic interface {
	RunInTx(ctx context.Context, fn func(txCtx context.Context) error) error
}

type userRepository interface {
	Create(ctx context.Context, name string) (*domain.User, error)
}

type UserUsecase struct {
	atomic
	repo userRepository
}

func (uc *UserUsecase) Create(ctx context.Context, name string) (*domain.User, error) {
	var u *domain.User
	err := uc.RunInTx(ctx, func(txCtx context.Context) error {
		var err error
		// Pass txCtx, not ctx, so the repository picks up the transaction.
		u, err = uc.repo.Create(txCtx, name)
		if err != nil {
			return err
		}

		// Chain several repository methods here that require a transaction ...

		return nil
	})

	return u, err
}

The repository requires minor modifications in order to support both *sql.DB and *sql.Tx:

// user_repository.go


type atomicDBTX interface {
	DBTx(ctx context.Context) dbtx.DBTX
}

type UserRepository struct {
	atomicDBTX
}

func (r *UserRepository) Create(ctx context.Context, name string) (*domain.User, error) {
	// Returns `*sql.DB` if no transaction context is found.
	// Returns `*sql.Tx` if transaction context is found.
	// The variable is named db to avoid shadowing the dbtx package.
	db := r.DBTx(ctx)

	// NOTE: If you want to ensure that this method only uses `*sql.Tx`, then:
	// tx := r.Tx(ctx)
	//
	// On the other hand, if you want this to only use `*sql.DB`, then:
	// db := r.DB(ctx)
	//
	// Both methods above will panic if the underlying type does not match.
	// If you want to handle them yourself, use:
	//
	// dbTx, ok := dbtx.DBTx(ctx)
	// tx, ok := dbtx.Tx(ctx)

	// NOTE: Sample code. Scan into the concrete fields of u in real code.
	var u domain.User
	if err := db.QueryRowContext(ctx, `insert into users ...`, name).Scan(&u); err != nil {
		return nil, err
	}
	return &u, nil
}

Nesting of Transaction

When explicitly passing *sql.Tx/*sql.DB to the repository, we can still chain different repositories together. However, what happens when you need to chain multiple usecases? Now every usecase has to accept the database driver as a method argument.

type AccountUsecase struct {
	db *sql.DB
	userUsecase userUsecase
	adminUsecase adminUsecase
}

func (uc *AccountUsecase) CreateUserAdmin(ctx context.Context, email string) error {
	tx, err := uc.db.Begin()
	if err != nil {
		return err
	}
	defer tx.Rollback()

	if err := uc.userUsecase.CreateUser(ctx, tx, email); err != nil {
		return err
	}

	if err := uc.adminUsecase.CreateAdmin(ctx, tx, email); err != nil {
		return err
	}

	return tx.Commit()
}

Except that it might not work, because in the adminUsecase and/or userUsecase, there is already a method that calls tx.Commit. This could be because originally we already have a usecase where we want to create the user within a transaction:

type UserUsecase struct {
	db *sql.DB
}

func (uc *UserUsecase) CreateUser(ctx context.Context, tx *sql.Tx, email string) error {
	var err error
	if tx == nil {
		tx, err = uc.db.BeginTx(ctx, nil)
		if err != nil {
			return err
		}
	}
	defer tx.Rollback()

	if _, err := tx.ExecContext(ctx, `...`, email); err != nil {
		return err
	}

	// WARN: This commit also ends the transaction started by AccountUsecase,
	// so the subsequent CreateAdmin call fails and no admin is created.
	return tx.Commit()
}

To make it reusable, we need to create multiple methods:

type UserUsecase struct {
	db *sql.DB
}

// CreateUserTx is a method that delegates the Commit/Rollback to the parent.
func (uc *UserUsecase) CreateUserTx(ctx context.Context, tx *sql.Tx, email string) error {
	if _, err := tx.ExecContext(ctx, `...`, email); err != nil {
		return err
	}

	return nil
}

func (uc *UserUsecase) CreateUser(ctx context.Context, email string) error {
	tx, err := uc.db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	// Rollback has no effect once the transaction has been committed.
	defer tx.Rollback()

	if err := uc.CreateUserTx(ctx, tx, email); err != nil {
		return err
	}

	return tx.Commit()
}

With dbtx, you don't need to create multiple methods, since it automatically detects whether the context contains an underlying transaction and reuses it. It also guards against nested transactions.

Enforce Tx

If an operation must absolutely be carried out in a transaction, use dbtx.Tx(ctx) to ensure the context contains the *sql.Tx:

func (r *userRepository) Create(ctx context.Context, name string) error {
	tx, ok := dbtx.Tx(ctx)
	if !ok {
		panic(dbtx.ErrNonTransaction)
	}

	_, err := tx.Exec(`insert into users (name) values ($1)`, name)
	return err
}

Transaction-ready Repository

package main

import (
	"context"
	"database/sql"

	"github.com/alextanhongpin/dbtx"
)

type atomic interface {
	RunInTx(ctx context.Context, fn func(txCtx context.Context) error) (err error)
}

type dbCtx interface {
	DBTx(ctx context.Context) dbtx.DBTX
}

type atomicDBCtx interface {
	atomic
	dbCtx
}

func main() {
	var db *sql.DB
	u := dbtx.New(db)
	userRepo := &userRepository{db: db}
	accountRepo := &accountRepository{dbtx: u}
	uc := &authUseCase{
		dbtx:        u,
		userRepo:    userRepo,
		accountRepo: accountRepo,
	}
	_ = uc
}

type userRepository struct {
	db *sql.DB
}

func (r *userRepository) DB(ctx context.Context) dbtx.DBTX {
	// Obtain either a *sql.DB/*sql.Tx from the context, or use the current
	// repository's *sql.DB.
	// A convenience method, (*dbtx.Atomic).DBTx(ctx), is provided (see the
	// account repository below).
	v, ok := dbtx.DBTx(ctx)
	if ok {
		return v
	}

	return r.db
}

func (r *userRepository) Create(ctx context.Context, name string) error {
	// This obtains the Tx from the context, otherwise it falls back to the *sql.DB.
	db := r.DB(ctx)
	_, err := db.Exec(`insert into users (name) values ($1)`, name)
	return err
}

type accountRepository struct {
	dbtx atomicDBCtx
}

func (r *accountRepository) Create(ctx context.Context, name string) error {
	db := r.dbtx.DBTx(ctx)
	_, err := db.Exec(`insert into accounts (name) values ($1)`, name)
	return err
}

type authUseCase struct {
	dbtx        atomic
	userRepo    *userRepository
	accountRepo *accountRepository
}

func (uc *authUseCase) Create(ctx context.Context, name string) error {
	// You can pass in options to override *sql.TxOptions.
	ctx = dbtx.ReadOnly(ctx, false)
	ctx = dbtx.IsolationLevel(ctx, sql.LevelDefault)

	return uc.dbtx.RunInTx(ctx, func(ctx context.Context) error {
		err := uc.userRepo.Create(ctx, name)
		if err != nil {
			return err
		}

		return uc.accountRepo.Create(ctx, name)
	})
}

Outbox Pattern

One common usecase when wrapping operations in a transaction is to implement the Outbox pattern.

For simple usecases, we can just persist the events in-memory and flush them when the transaction commits. For a more scalable (?) solution, consider using Debezium.

package main

import (
	"context"
	"fmt"
)

type atomic interface {
	IsTx() bool
	RunInTx(ctx context.Context, fn func(txCtx context.Context) error) (err error)
}

func main() {
	repo := newMockOutboxRepo()
	atm := newMockAtomic()
	outbox := newOutboxAtomic(repo, atm)
	uc := &authUsecase{dbtx: outbox}
	fmt.Println(uc.Login(context.Background(), "john@appleseed.com"))
}

type authUsecase struct {
	dbtx atomic
}

func (uc *authUsecase) Login(ctx context.Context, email string) error {
	// NOTE: if passing dependencies through context is not to your liking, you
	// can also pass the outbox as the second argument. Example:
	//
	// return uc.dbtx.RunInTx(ctx, func(txCtx context.Context, outbox Outbox) error {
	return uc.dbtx.RunInTx(ctx, func(txCtx context.Context) error {
		// Retrieve the outbox.
		outbox, ok := outboxValue(txCtx)
		if ok {
			// Fire events. These events will be saved in the same transaction.
			outbox.Fire(
				Event{Type: "user_created", Data: map[string]any{"email": email}},
				Event{Type: "logged_in", Data: map[string]any{"email": email}},
			)
		}

		return nil
	})
}

type Event struct {
	Type string
	Data any
}

type contextKey string

var outboxContextKey contextKey = "outbox"

func withOutboxValue(ctx context.Context, o Outbox) context.Context {
	return context.WithValue(ctx, outboxContextKey, o)
}

func outboxValue(ctx context.Context) (Outbox, bool) {
	o, ok := ctx.Value(outboxContextKey).(Outbox)
	return o, ok
}

// OutboxAtomic is a customized UOW that allows persisting events on transaction commit.
type OutboxAtomic struct {
	atomic
	repo outboxRepo
}

func newOutboxAtomic(repo outboxRepo, atm atomic) *OutboxAtomic {
	return &OutboxAtomic{
		atomic: atm,
		repo:   repo,
	}
}

func (u *OutboxAtomic) RunInTx(ctx context.Context, fn func(ctx context.Context) error) error {
	return u.atomic.RunInTx(ctx, func(txCtx context.Context) error {
		// A new outbox is created per-request.
		o := new(outbox)

		// The context containing the outbox is passed down.
		if err := fn(withOutboxValue(txCtx, o)); err != nil {
			return err
		}

		// Flush events
		return u.repo.Save(txCtx, o.events...)
	})
}

type mockAtomic struct{}

func newMockAtomic() *mockAtomic {
	return new(mockAtomic)
}

func (m *mockAtomic) IsTx() bool { return true }
func (m *mockAtomic) RunInTx(ctx context.Context, fn func(txContext context.Context) error) error {
	return fn(ctx)
}

type mockOutboxRepo struct{}

func newMockOutboxRepo() *mockOutboxRepo {
	return new(mockOutboxRepo)
}

func (r *mockOutboxRepo) Save(ctx context.Context, events ...Event) error {
	if len(events) == 0 {
		return nil
	}

	fmt.Println("[mockOutboxRepo] Save", events)
	return nil
}

var _ atomic = (*OutboxAtomic)(nil)

type outboxRepo interface {
	Save(ctx context.Context, events ...Event) error
}

type Outbox interface {
	Fire(events ...Event)
}

type outbox struct {
	events []Event
}

func (o *outbox) Fire(events ...Event) {
	fmt.Println("fire", events)
	o.events = append(o.events, events...)
}

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrNonTransaction    = errors.New("dbtx: underlying type is not a transaction")
	ErrIsTransaction     = errors.New("dbtx: underlying type is transaction")
	ErrNestedTransaction = errors.New("dbtx: transactions cannot be nested")
)
var ErrContextNotFound = errors.New("dbtx: Atomic not found in context")

Functions

func Depth

func Depth(ctx context.Context) int

func IncDepth

func IncDepth(ctx context.Context) context.Context

func IsTx

func IsTx(ctx context.Context) bool

func IsolationLevel

func IsolationLevel(ctx context.Context, isoLevel sql.IsolationLevel) context.Context

func ReadOnly

func ReadOnly(ctx context.Context, readOnly bool) context.Context

func RunInTx

func RunInTx(ctx context.Context, db *sql.DB, opt *sql.TxOptions, fn func(tx *sql.Tx) error) (err error)

func TxOptions

func TxOptions(ctx context.Context) *sql.TxOptions

func WithValue

func WithValue(ctx context.Context, atm *Atomic) context.Context

Types

type Atomic

type Atomic struct {
	// contains filtered or unexported fields
}

Atomic represents a unit of work.

func MustValue

func MustValue(ctx context.Context) *Atomic

func New

func New(db *sql.DB, opts ...Option) *Atomic

New returns a pointer to Atomic.

func NewTx

func NewTx(tx *sql.Tx, opts ...Option) *Atomic

NewTx returns an Atomic with a transaction.

func Value

func Value(ctx context.Context) (*Atomic, bool)

func (*Atomic) DB

func (a *Atomic) DB(ctx context.Context) DBTX

DB returns the underlying *sql.DB as a DBTX interface, to prevent the caller from starting a new transaction. This also allows wrapping the *sql.DB with other implementations, such as a recorder.

func (*Atomic) DBTx

func (a *Atomic) DBTx(ctx context.Context) DBTX

DBTx returns the DBTX from the context, which can be either *sql.DB or *sql.Tx. Returns the atomic underlying type if the context is empty.

func (*Atomic) IsTx

func (a *Atomic) IsTx() bool

IsTx returns true if the underlying type is a transaction.

func (*Atomic) RunInTx

func (a *Atomic) RunInTx(ctx context.Context, fn func(context.Context) error) (err error)

RunInTx wraps the operation in a transaction. If a context containing tx is passed in, then it will use the context tx. Transaction cannot be nested. The transaction can only be committed by the parent.

func (*Atomic) Tx

func (a *Atomic) Tx(ctx context.Context) DBTX

Tx returns the *sql.Tx from the context. The return type is still a DBTX interface to prevent the client from calling tx.Commit. When dealing with nested transactions, only the parent of the transaction can commit it.

type DBTX

type DBTX interface {
	Exec(query string, args ...any) (sql.Result, error)
	Prepare(query string) (*sql.Stmt, error)
	Query(query string, args ...any) (*sql.Rows, error)
	QueryRow(query string, args ...any) *sql.Row

	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
}

DBTX represents the common db operations for both *sql.DB and *sql.Tx.

func DBTx

func DBTx(ctx context.Context) (DBTX, bool)

DBTx returns the DBTX from the context, which can be either *sql.DB or *sql.Tx.

func Tx

func Tx(ctx context.Context) (DBTX, bool)

Tx returns the DBTX from the context only if the underlying type is a *sql.Tx. The DBTX interface is still returned here to prevent the client from manually calling tx.Commit.

type Event

type Event struct {
	Method  string
	Query   string
	Args    []any
	Err     error
	StartAt time.Time
	EndAt   time.Time
}

type Logger

type Logger struct {
	// contains filtered or unexported fields
}

Logger logs the query and args.

func NewLogger

func NewLogger(dbtx DBTX, l logger) *Logger

func (*Logger) Exec

func (r *Logger) Exec(query string, args ...any) (sql.Result, error)

func (*Logger) ExecContext

func (r *Logger) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)

func (*Logger) Prepare

func (r *Logger) Prepare(query string) (*sql.Stmt, error)

func (*Logger) PrepareContext

func (r *Logger) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)

func (*Logger) Query

func (r *Logger) Query(query string, args ...any) (*sql.Rows, error)

func (*Logger) QueryContext

func (r *Logger) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)

func (*Logger) QueryRow

func (r *Logger) QueryRow(query string, args ...any) *sql.Row

func (*Logger) QueryRowContext

func (r *Logger) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row

type Middleware

type Middleware func(DBTX) DBTX

func WithLogger

func WithLogger(l logger) Middleware

func WithTracer

func WithTracer(t tracer) Middleware

type Option

type Option interface {
	// contains filtered or unexported methods
}

type Tracer

type Tracer struct {
	// contains filtered or unexported fields
}

Tracer logs the query, args as well as the execution time and error.

func NewTracer

func NewTracer(dbtx DBTX, t tracer) *Tracer

func (*Tracer) Exec

func (r *Tracer) Exec(query string, args ...any) (res sql.Result, err error)

func (*Tracer) ExecContext

func (r *Tracer) ExecContext(ctx context.Context, query string, args ...any) (res sql.Result, err error)

func (*Tracer) Prepare

func (r *Tracer) Prepare(query string) (stmt *sql.Stmt, err error)

func (*Tracer) PrepareContext

func (r *Tracer) PrepareContext(ctx context.Context, query string) (stmt *sql.Stmt, err error)

func (*Tracer) Query

func (r *Tracer) Query(query string, args ...any) (rows *sql.Rows, err error)

func (*Tracer) QueryContext

func (r *Tracer) QueryContext(ctx context.Context, query string, args ...any) (rows *sql.Rows, err error)

func (*Tracer) QueryRow

func (r *Tracer) QueryRow(query string, args ...any) *sql.Row

func (*Tracer) QueryRowContext

func (r *Tracer) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row

Directories

Path Synopsis
postgres
