hermes

Version: v2.2.0
Published: Jun 29, 2023 License: MIT Imports: 8 Imported by: 0

README

Hermes PGX 2.1.0

Hermes PGX is an update to the https://github.com/sbowman/hermes package that wraps https://github.com/jackc/pgx in place of the older https://github.com/lib/pq package. This package is much lighter weight than the original Hermes, as much of the original wrapping functionality is baked into the newer pgx package.

At its heart, Hermes PGX supplies a common interface, hermes.Conn, to wrap the database connection pool and transactions so they share common functionality, and can be used interchangeably in the context of a test or function. This makes it easier to leverage functions in different combinations to build database APIs for Go applications.

Note that because this package is based on pgx, it only supports PostgreSQL. If you're using another database, https://github.com/sbowman/hermes remains agnostic.

Hermes v2.1.0 upgrades its support for the https://github.com/jackc/pgx package to v5.


go get github.com/sbowman/hermes-pgx/v2

Usage

// Sample can take either a reference to the pgx database connection pool, or to a transaction.
func Sample(ctx context.Context, conn hermes.Conn, name string) error {
    tx, err := conn.Begin(ctx)
    if err != nil {
        return err
    }

    // Will automatically rollback if an error short-circuits the return
    // before tx.Commit() is called...
    defer tx.Close(ctx)

    // Run the insert on tx so it is part of the transaction (or savepoint).
    res, err := tx.Exec(ctx, "insert into samples (name) values ($1)", name)
    if err != nil {
        return err
    }

    // RowsAffected returns only a count; there is no error to check.
    if res.RowsAffected() == 0 {
        return fmt.Errorf("failed to insert row (%s)", name)
    }

    return tx.Commit(ctx)
}

func main() {
    ctx := context.Background()

    // Create a connection pool from a PostgreSQL connection URI...
    conn, err := hermes.Connect("postgres://postgres@127.0.0.1/my_db?sslmode=disable&connect_timeout=10")
    if err != nil {
        panic(err)
    }
    defer conn.Shutdown()

    // This works...
    if err := Sample(ctx, conn, "Bob"); err != nil {
        fmt.Println("Bob failed!", err.Error())
    }

    // So does this...
    tx, err := conn.Begin(ctx)
    if err != nil {
        panic(err)
    }

    // Will automatically rollback if call to sample fails...
    defer tx.Close(ctx)

    if err := Sample(ctx, tx, "Frank"); err != nil {
        fmt.Println("Frank failed!", err.Error())
        return
    }

    // Don't forget to commit, or you'll automatically rollback on
    // "defer tx.Close(ctx)" above!
    if err := tx.Commit(ctx); err != nil {
        fmt.Println("Unable to save changes to the database:", err.Error())
    }
}

Using a hermes.Conn parameter in a function also opens up in situ testing of database functionality. You can create a transaction in the test case and pass it to a function that takes a hermes.Conn, run any tests on the results of that function, and simply let the transaction rollback at the end of the test to clean up.

var DB hermes.Conn

// We'll just open one database connection pool to speed up testing, so
// we're not constantly opening and closing connections.
func TestMain(m *testing.M) {
    conn, err := hermes.Connect(DBTestURI)
    if err != nil {
        fmt.Fprintf(os.Stderr, "Unable to open a database connection: %s\n", err)
        os.Exit(1)
    }

    DB = conn

    // os.Exit skips deferred calls, so shut the pool down explicitly first.
    code := m.Run()
    conn.Shutdown()
    os.Exit(code)
}

// Test getting a user account from the database.  The signature for the
// function is:  `func GetUser(conn hermes.Conn, email string) (User, error)`
// 
// Passing a hermes.Conn value to the function means we can pass in either
// a reference to the database pool and really update the data, or we can
// pass in the same transaction reference to both the SaveUser and GetUser
// functions.  If we use a transaction, we can let the transaction roll back 
// after we test these functions, or at any failure point in the test case,
// and we know the data is cleaned up. 
func TestGetUser(t *testing.T) {
    ctx := context.Background()

    u := User{
        Email: "jdoe@nowhere.com",
        Name:  "John Doe",
    }

    tx, err := DB.Begin(ctx)
    if err != nil {
        t.Fatal(err)
    }
    defer tx.Close(ctx)

    // SaveUser and GetUser are package-level functions that take a hermes.Conn.
    if err := SaveUser(tx, u); err != nil {
        t.Fatalf("Unable to create a new user account: %s", err)
    }

    check, err := GetUser(tx, u.Email)
    if err != nil {
        t.Fatalf("Failed to get user by email address: %s", err)
    }

    if check.Email != u.Email {
        t.Errorf("Expected user email to be %s; was %s", u.Email, check.Email)
    }

    if check.Name != u.Name {
        t.Errorf("Expected user name to be %s; was %s", u.Name, check.Name)
    }

    // Note:  do nothing...when the test case ends, the `defer tx.Close(ctx)`
    // is called, and all the data in this transaction is rolled back out.
}

Using transactions, even if a test case fails and returns prematurely, the database transaction is automatically closed, thanks to defer. The database is cleaned up without any fuss or need to remember to delete the data you created at any point in the test.

Shutting down the connection pool

Note that because Hermes overloads the concept of db.Close() and tx.Close(), db.Close() doesn't actually do anything. In pgx, db.Close() would close the connection pool, which we don't want. So instead, call hermes.DB.Shutdown() to clean up your connection pool when your app shuts down.

db, err := hermes.Connect(DBTestURI)
if err != nil {
    fmt.Fprintf(os.Stderr, "Unable to open a database connection: %s\n", err)
    os.Exit(1)
}
defer db.Shutdown()

Advisory Locks

Hermes provides a few support functions for managing PostgreSQL advisory locks.

  • hermes.Conn.Lock creates a session-wide, exclusive advisory lock when called on the root database pool
  • hermes.Conn.Lock creates a transaction-wide, exclusive advisory lock when called on a transaction connection

Both calls return an AdvisoryLock; call Release on it when you're done with the lock.

db, err := hermes.Connect(DBTestURI)
if err != nil {
    fmt.Fprintf(os.Stderr, "Unable to open a database connection: %s\n", err)
    os.Exit(1)
}
defer db.Shutdown()

// Session-wide advisory lock (lock ID = 22)
lock, err := db.Lock(ctx, 22)
if err != nil {
    fmt.Fprintf(os.Stderr, "Unable to create advisory lock: %s\n", err)
    os.Exit(1)
}
defer lock.Release()

tx, err := db.Begin(ctx)
if err != nil {
    fmt.Fprintf(os.Stderr, "Could not create a transaction: %s\n", err)
    os.Exit(1)
}

// This will release a transaction advisory lock
defer tx.Close(ctx)

// Transaction-level advisory lock, taken on tx rather than db.  It uses a
// different ID (23) because the session-wide lock on 22 above is still held.
txLock, err := tx.Lock(ctx, 23)
if err != nil {
    fmt.Fprintf(os.Stderr, "Unable to create advisory lock: %s\n", err)
    os.Exit(1)
}
// Technically this doesn't release the lock, but it's good practice
defer txLock.Release()

// ...

// This will also release the transactional advisory lock...
tx.Commit(ctx)

Note that technically the transaction-level advisory lock doesn't require a call to Release, as it is released automatically when the transaction ends. However, it's a good idea to call Release regardless: if the conn is acting as a basic connection, the release is required, and if it's a transaction, it doesn't hurt.

You may also "try" a lock, using the TryLock functions:

lock, err := db.TryLock(ctx, 22)   // session-wide, on the pool
txLock, err := tx.TryLock(ctx, 22) // transaction-level, on a transaction

This will either return an advisory lock if it's available, or it will immediately return ErrLocked if it's not. This is useful when several instances of an app compete for the same job, such as cleaning up the database: an instance that finds the lock already taken can safely assume another instance is doing the work.

Timeouts (v2.2.0)

Hermes v2.2.0 adds support for carrying connection timeout information with the hermes.Conn objects. This can make it easier to create connections that don't get stuck if the database goes away.

First, set the timeout on the hermes.Conn or hermes.DB as a default when you connect to the database:

db, err := hermes.Connect(DBTestURI)
if err != nil {
    fmt.Fprintf(os.Stderr, "Unable to open a database connection: %s\n", err)
    os.Exit(1)
}
defer db.Shutdown()

db.SetTimeout(config.DBTimeout)  // if config.DBTimeout refers to a setting somewhere

Then you can leverage the hermes.Conn.WithTimeout method to create a timeout context and a cancel function for you to use when making database requests:

ctx, cancel := conn.WithTimeout(ctx) // you may also pass nil if you don't have a context
defer cancel()

rows, err := conn.Query(ctx, "select * from users")

Transactions also support SetTimeout, if you want to override the default, though it's not typically necessary.

If you want to override the default timeout and support a longer running connection, simply pass in your own context with a deadline and Hermes will "fake" a timeout context and simply use yours:

// Elsewhere...
func GetUser(ctx context.Context, conn hermes.Conn, email string) (User, error) {
    ctx, cancel := conn.WithTimeout(ctx)
    defer cancel()

    row := conn.QueryRow(ctx, "select * from users where email = $1", email)
    
    // ... load the user ...

    return user, nil
}

func main() {
    conn, err := hermes.Connect(DBTestURI)
    if err != nil {
        fmt.Fprintf(os.Stderr, "Unable to open a database connection: %s\n", err)
        os.Exit(1)
    }
    defer conn.Shutdown()

    conn.SetTimeout(time.Second)

    // For some reason it takes a long time to get a user...
    ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
    defer cancel()

    // If the default for db is 1 second, but the context is set to timeout in a minute, this
    // call may take as long as a minute:
    user, err := GetUser(ctx, conn, "jdoe@nowhere.com")
    if err != nil {
        fmt.Fprintf(os.Stderr, "Unable to get the user: %s\n", err)
        return
    }
    fmt.Println(user)
}

ContextualTx prototype

There's also a "contextual" transaction that tries to manage the timeout for you. It's experimental, but may be worth a look. Simply call conn.BeginWithTimeout rather than conn.Begin to create a transaction. You can then skip passing in a context to every request and use the context maintained internally in the transaction:

tx, err := conn.BeginWithTimeout(ctx) // if ctx already has a deadline, that deadline is used
if err != nil {
    return err
}
defer tx.Close() // this will cancel the timeout context

var userID int
row := tx.QueryRow("select id from users where email = $1", email)
if err := row.Scan(&userID); err != nil {
    return err
}

if _, err := tx.Exec("insert into admin_users values ($1)", userID); err != nil {
    return err
}
return tx.Commit()

If ctx above is nil or doesn't have a deadline, BeginWithTimeout will use the default timeout and create a context it carries around with the transaction. If ctx does have a deadline, it'll use that existing context as the underlying context. Every database request will have that context attached to it automatically, and when you call tx.Close() the context.CancelFunc is called as prescribed by the context package.

Deprecated

The pgx package changes how custom types are handled.

References

https://github.com/jackc/pgx

Documentation

Constants

const (
	OperatorIntervention = "57000"
	QueryCanceled        = "57014"
	AdminShutdown        = "57P01"
	CrashShutdown        = "57P02"
	CannotConnectNow     = "57P03"
	DatabaseDropped      = "57P04"
	IdleSessionTimeout   = "57P05"
)

PostgreSQL disconnect errors - https://www.postgresql.org/docs/current/errcodes-appendix.html

Variables

var (
	// Disconnects is the list of PostgreSQL error codes that indicate the connection failed.
	Disconnects = []string{
		OperatorIntervention,
		QueryCanceled,
		AdminShutdown,
		CrashShutdown,
		CannotConnectNow,
		DatabaseDropped,
		IdleSessionTimeout,
	}
)
var ErrLocked = errors.New("advisory lock already acquired")

ErrLocked returned if you try to acquire an advisory lock and it's already in use.

Functions

func IsDisconnected

func IsDisconnected(err error) bool

IsDisconnected returns true if the error is a PostgreSQL disconnect error (SQLSTATE 57P01).

func NoRows

func NoRows(err error) bool

NoRows returns true if the supplied error is one of the NoRows indicators

Types

type AdvisoryLock added in v2.1.0

type AdvisoryLock interface {
	Release() error
}

type Conn

type Conn interface {
	// Begin starts a transaction.  If Conn already represents a transaction, pgx will create a
	// savepoint instead.
	Begin(ctx context.Context) (Conn, error)

	// Commit the transaction.  Does nothing if Conn is a *pgxpool.Pool.  If the transaction is
	// a pseudo-transaction, i.e. a savepoint, releases the savepoint.  Otherwise commits the
	// transaction.
	Commit(ctx context.Context) error

	// Rollback the transaction. Does nothing if Conn is a *pgxpool.Pool.
	Rollback(ctx context.Context) error

	// Close rolls back the transaction if this is a real transaction or rolls back to the
	// savepoint if this is a pseudo nested transaction.  For a *pgxpool.Pool, this call is
	// ignored.
	//
	// Returns ErrTxClosed if the Conn is already closed, but is otherwise safe to call multiple
	// times. Hence, a defer conn.Close() is safe even if conn.Commit() will be called first in
	// a non-error condition.
	//
	// Any other failure of a real transaction will result in the connection being closed.
	Close(ctx context.Context) error

	CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error)
	SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults

	Exec(ctx context.Context, sql string, arguments ...interface{}) (commandTag pgconn.CommandTag, err error)
	Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
	QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row

	// Lock creates a session-wide advisory lock on a connection, and a transactional advisory
	// lock on a transaction.  Will block until the lock is available.  Returns an AdvisoryLock,
	// which must be released when you're done with the lock.
	Lock(ctx context.Context, id uint64) (AdvisoryLock, error)

	// TryLock tries to create a session-wide or transactional advisory lock, based on the
	// connection type.  If successful, returns an AdvisoryLock which must be released when
	// you're done with it.  If unsuccessful (lock is in use), returns an ErrLocked error.
	TryLock(ctx context.Context, id uint64) (AdvisoryLock, error)

	// WithTimeout returns a timeout context configured with the timeout setting configured on
	// the database connection pool.  If ctx already has an expiration, simply returns the
	// existing context.
	WithTimeout(ctx context.Context) (context.Context, context.CancelFunc)

	// SetTimeout sets the default timeout used for WithTimeout calls.
	SetTimeout(dur time.Duration)

	// BeginWithTimeout starts a custom transaction that manages the timeout context for you.
	// If Conn already represents a transaction, pgx will create a savepoint instead.  This is
	// experimental; use at your own risk!
	BeginWithTimeout(ctx context.Context) (*ContextualTx, error)
}

Conn abstracts the *pgxpool.Pool struct and the pgx.Tx interface into a common interface. This can be useful for building domain models more functionally, i.e., the same function can be used for a single database query outside of a transaction, or included in a transaction with other function calls.

It's also useful for testing, as you can pass a transaction into any database-related function, don't commit, and simply Close() at the end of the test to clean up the database.
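The idea behind the shared interface can be illustrated without a database. The sketch below is a toy model, not the Hermes types: conn, pool, tx, store, errTxClosed, and addSample are hypothetical names, and savepoints and contexts are left out. It shows a pool whose Commit and Close do nothing, a transaction whose Close discards uncommitted work, and one function that accepts either.

```go
package main

import (
	"errors"
	"fmt"
)

// errTxClosed is a stand-in sentinel for "this transaction is finished".
var errTxClosed = errors.New("tx is closed")

// conn is a hypothetical, cut-down version of the idea behind hermes.Conn.
type conn interface {
	Exec(stmt string) error
	Commit() error
	Close() error
}

// store plays the database; rows holds what has been durably written.
type store struct{ rows []string }

// pool writes straight through. Commit and Close are no-ops.
type pool struct{ s *store }

func (p *pool) Exec(stmt string) error {
	p.s.rows = append(p.s.rows, stmt)
	return nil
}
func (p *pool) Commit() error { return nil }
func (p *pool) Close() error  { return nil }
func (p *pool) Begin() *tx    { return &tx{s: p.s} }

// tx buffers writes until Commit; Close discards whatever is uncommitted.
type tx struct {
	s       *store
	pending []string
	done    bool
}

func (t *tx) Exec(stmt string) error {
	if t.done {
		return errTxClosed
	}
	t.pending = append(t.pending, stmt)
	return nil
}

func (t *tx) Commit() error {
	if t.done {
		return errTxClosed
	}
	t.s.rows = append(t.s.rows, t.pending...)
	t.done = true
	return nil
}

func (t *tx) Close() error {
	if t.done {
		return errTxClosed // harmless after Commit, so "defer Close" is safe
	}
	t.pending = nil
	t.done = true
	return nil
}

// addSample works unchanged with either a pool or a transaction.
func addSample(c conn, name string) error {
	return c.Exec("insert " + name)
}

func main() {
	s := &store{}
	p := &pool{s: s}

	_ = addSample(p, "Bob") // written immediately

	txn := p.Begin()
	_ = addSample(txn, "Frank")
	_ = txn.Close() // never committed, so Frank is discarded

	fmt.Println(s.rows) // [insert Bob]
}
```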

type ContextualTx added in v2.2.0

type ContextualTx struct {
	pgx.Tx
	// contains filtered or unexported fields
}

ContextualTx is a prototype for starting a transaction using the default timeout and using the context on the transaction for any database calls from then on.

This does not support the hermes.Conn interface. At this point you can only use this transaction in a single function if you stick with hermes.Conn in your function parameters.

func (*ContextualTx) Close added in v2.2.0

func (tx *ContextualTx) Close() error

Close rolls back the transaction if this is a real transaction or rolls back to the savepoint if this is a pseudo nested transaction. It also cancels the context for the transaction.

Returns ErrTxClosed if the Conn is already closed, but is otherwise safe to call multiple times. Hence, a defer conn.Close() is safe even if conn.Commit() will be called first in a non-error condition.

Any other failure of a real transaction will result in the connection being closed.

func (*ContextualTx) Commit added in v2.2.0

func (tx *ContextualTx) Commit() error

Commit the transaction. Does nothing if Conn is a *pgxpool.Pool. If the transaction is a pseudo-transaction, i.e. a savepoint, releases the savepoint. Otherwise commits the transaction.

func (*ContextualTx) CopyFrom added in v2.2.0

func (tx *ContextualTx) CopyFrom(tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error)

CopyFrom uses the context on the transaction.

func (*ContextualTx) Exec added in v2.2.0

func (tx *ContextualTx) Exec(sql string, arguments ...interface{}) (commandTag pgconn.CommandTag, err error)

Exec uses the context on the transaction.

func (*ContextualTx) Prepare added in v2.2.0

func (tx *ContextualTx) Prepare(name, sql string) (*pgconn.StatementDescription, error)

Prepare uses the context on the transaction.

func (*ContextualTx) Query added in v2.2.0

func (tx *ContextualTx) Query(sql string, args ...interface{}) (pgx.Rows, error)

Query uses the context on the transaction.

func (*ContextualTx) QueryRow added in v2.2.0

func (tx *ContextualTx) QueryRow(sql string, args ...interface{}) pgx.Row

QueryRow uses the context on the transaction.

func (*ContextualTx) Rollback added in v2.2.0

func (tx *ContextualTx) Rollback() error

Rollback the transaction. Does nothing if Conn is a *pgxpool.Pool.

func (*ContextualTx) SendBatch added in v2.2.0

func (tx *ContextualTx) SendBatch(b *pgx.Batch) pgx.BatchResults

SendBatch uses the context on the transaction.

type DB

type DB struct {
	*pgxpool.Pool
	// contains filtered or unexported fields
}

DB wraps the *pgxpool.Pool and provides the missing hermes function wrappers.

func Connect

func Connect(uri string) (*DB, error)

Connect creates a pgx database connection pool and returns it.

func ConnectConfig

func ConnectConfig(config *pgxpool.Config) (*DB, error)

ConnectConfig creates a pgx database connection pool based on a pool configuration and returns it.

func (*DB) Begin

func (db *DB) Begin(ctx context.Context) (Conn, error)

Begin a new transaction.

func (*DB) BeginWithTimeout added in v2.2.0

func (db *DB) BeginWithTimeout(ctx context.Context) (*ContextualTx, error)

BeginWithTimeout starts a custom transaction that manages the timeout context for you. If Conn already represents a transaction, pgx will create a savepoint instead. This is experimental; use at your own risk!

func (*DB) Close

func (db *DB) Close(context.Context) error

Close does nothing. Since this Close method is meant to be used interchangeably with transactions, it doesn't actually close anything, because we don't want to close the underlying database pool at the end of every non-transactional request. Instead, see DB.Shutdown.

func (*DB) Commit

func (db *DB) Commit(context.Context) error

Commit does nothing.

func (*DB) Lock added in v2.1.0

func (db *DB) Lock(ctx context.Context, id uint64) (AdvisoryLock, error)

Lock creates a session-wide advisory lock in the database. Call Release() to release the advisory lock.

func (*DB) Rollback

func (db *DB) Rollback(context.Context) error

Rollback does nothing.

func (*DB) SetTimeout added in v2.2.0

func (db *DB) SetTimeout(dur time.Duration)

SetTimeout sets the default timeout for the database connection pool.

func (*DB) Shutdown added in v2.2.0

func (db *DB) Shutdown()

Shutdown the underlying pgx Pool. You should call this when your application is closing to release all the database pool connections.

func (*DB) TryLock added in v2.1.0

func (db *DB) TryLock(ctx context.Context, id uint64) (AdvisoryLock, error)

TryLock tries to create a session-wide advisory lock in the database. If successful, returns the advisory lock. If not, returns ErrLocked. If you acquire the lock, be sure to release it!

func (*DB) WithTimeout added in v2.2.0

func (db *DB) WithTimeout(ctx context.Context) (context.Context, context.CancelFunc)

WithTimeout creates a context with a timeout, assigning ctx as the parent of the timeout context. Returns the new context and its cancel function. The timeout is based on the configured database pool connection timeout (see `WithDefaultTimeout`).

Defaults to a 1 second timeout.

Be sure to call the cancel function when you're done to clean up any resources in use!

type RowScanner

type RowScanner interface {
	Scan(dest ...interface{}) error
}

RowScanner is a shared interface between pgx.Rows and pgx.Row

type SessionAdvisoryLock added in v2.1.0

type SessionAdvisoryLock struct {
	ID uint64
	// contains filtered or unexported fields
}

SessionAdvisoryLock creates a session-wide advisory lock.

func (*SessionAdvisoryLock) Release added in v2.1.0

func (lock *SessionAdvisoryLock) Release() error

Release the session-wide advisory lock.

type Tx

type Tx struct {
	pgx.Tx
	// contains filtered or unexported fields
}

Tx wraps the pgx.Tx interface and provides the missing hermes function wrappers.

func (*Tx) Begin

func (tx *Tx) Begin(ctx context.Context) (Conn, error)

Begin starts a pseudo nested transaction.

func (*Tx) BeginWithTimeout added in v2.2.0

func (tx *Tx) BeginWithTimeout(ctx context.Context) (*ContextualTx, error)

BeginWithTimeout starts a custom transaction that manages the timeout context for you. If Conn already represents a transaction, pgx will create a savepoint instead. This is experimental; use at your own risk!

func (*Tx) Close

func (tx *Tx) Close(ctx context.Context) error

Close rolls back the transaction if this is a real transaction or rolls back to the savepoint if this is a pseudo nested transaction.

Returns ErrTxClosed if the Conn is already closed, but is otherwise safe to call multiple times. Hence, a defer conn.Close() is safe even if conn.Commit() will be called first in a non-error condition.

Any other failure of a real transaction will result in the connection being closed.

func (*Tx) Lock added in v2.1.0

func (tx *Tx) Lock(ctx context.Context, id uint64) (AdvisoryLock, error)

Lock creates a transactional advisory lock in the database. This lock will be released at the end of the transaction, on either commit or rollback. You may call AdvisoryLock.Release(), but it does nothing on this type of advisory lock.

func (*Tx) SetTimeout added in v2.2.0

func (tx *Tx) SetTimeout(dur time.Duration)

SetTimeout sets the default timeout for a transaction. If never set, the transaction uses the timeout of the connection from the database pool.

func (*Tx) TryLock added in v2.1.0

func (tx *Tx) TryLock(ctx context.Context, id uint64) (AdvisoryLock, error)

TryLock tries to create a transactional advisory lock in the database. You may manually call Release() on the AdvisoryLock, or the lock will release automatically on commit or rollback.

func (*Tx) WithTimeout added in v2.2.0

func (tx *Tx) WithTimeout(ctx context.Context) (context.Context, context.CancelFunc)

WithTimeout creates a context with a timeout, assigning ctx as the parent of the timeout context. Returns the new context and its cancel function. The timeout is based on the configured database pool connection timeout (see `WithDefaultTimeout`).

Defaults to a 1 second timeout.

Be sure to call the cancel function when you're done to clean up any resources in use!

type TxAdvisoryLock added in v2.1.0

type TxAdvisoryLock struct {
	ID uint64
}

TxAdvisoryLock is a placeholder so the Lock/Release functionality is the same for the hermes.Conn interface.

func (*TxAdvisoryLock) Release added in v2.1.0

func (lock *TxAdvisoryLock) Release() error

Release does nothing on a transactional advisory lock.
