pgdb

package
v0.1.10
Published: Jan 17, 2024 License: ISC Imports: 12 Imported by: 0

README

pgdb

Initial Postgres Setup

Some initial setup is required: a role for authenticating to the Postgres server, two tablespaces that specify where the bulk data and indexes are stored, and the database itself. The necessary commands are as follows:

Performance tip:

  • The bulk and index tablespaces should ideally be on separate physical media

$ mkdir -p /path/to/bulk_data
$ mkdir -p /path/to/index_data
$ psql -U postgres

At the psql shell:

postgres=# CREATE ROLE brdata WITH LOGIN NOSUPERUSER NOCREATEDB NOCREATEROLE NOINHERIT NOREPLICATION CONNECTION LIMIT -1 PASSWORD 'xxxxxx';
postgres=# CREATE TABLESPACE brbulk OWNER brdata LOCATION '/path/to/bulk_data';
postgres=# CREATE TABLESPACE brindex OWNER brdata LOCATION '/path/to/index_data';
postgres=# CREATE DATABASE brdata OWNER brdata TABLESPACE brbulk;

It is also highly recommended to configure the Postgres server to enable SSL/TLS if that has not already been done. This can be accomplished by setting ssl = on in the server's postgresql.conf file and generating a server certificate and key. The Postgres documentation contains instructions for using openssl if desired. These instructions opt to use the github.com/decred/dcrd/cmd/gencerts utility instead:

Notes:

  • The referenced data path is the same directory that houses postgresql.conf
  • The -H parameter may be specified multiple times and must be the hostnames or IPs that the certificate is valid for
  • The -L option makes the certificate valid for use by localhost as well

$ cd /path/to/postgres/data
$ gencerts -H "server hostname" -L -o "postgres" server.crt server.key

Usage

func example() error {
	// Open a connection to the database.  Use `pgdb.WithHost` to specify a
	// remote host.  See the documentation for other functions that start with
	// the prefix "With" for additional configuration operations.
	ctx := context.Background()
	db, err := pgdb.Open(ctx, pgdb.WithPassphrase("xxxxxx"),
		pgdb.WithTLS("/path/to/server.crt"))
	if err != nil {
		return err
	}
	defer db.Close()

	// Store a payload at a given rendezvous point.  The rv would ordinarily
	// come from an actual derivation.
	rv := [32]byte{0x01}
	payload := []byte{0x0a, 0x0b, 0x0c}
	err = db.StorePayload(ctx, rv[:], payload, time.Now())
	if err != nil {
		return err
	}

	// Store a different payload at a different rendezvous point.
	rv2 := [32]byte{0x02}
	payload2 := []byte{0x0d, 0x0e, 0x0f}
	err = db.StorePayload(ctx, rv2[:], payload2, time.Now())
	if err != nil {
		return err
	}

	// Fetch the payload at a given rendezvous point.
	result, err := db.FetchPayload(ctx, rv[:])
	if err != nil {
		return err
	}
	if result == nil {
		fmt.Printf("no payload for rv %x\n", rv)
	} else {
		fmt.Printf("got payload len %v\n", len(result.Payload))
	}

	// Remove the payload for a given rendezvous point.
	err = db.RemovePayload(ctx, rv[:])
	if err != nil {
		return err
	}

	// All entries inserted on a given day can be expired in bulk.  The
	// caller is expected to call this on a schedule to clean up payloads
	// that have never been retrieved and removed.
	dateToExpire := time.Now().UTC()
	numExpired, err := db.Expire(ctx, dateToExpire)
	if err != nil {
		return err
	}
	fmt.Printf("Expired %d entries for %v\n", numExpired,
		dateToExpire.Format("2006-01-02"))

	return nil
}

func main() {
	if err := example(); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
}

Simulation Tool

A simulation tool for helping test performance and correctness is available in cmd/pgdbsim. See its README.md for further details.

License

pgdb is licensed under the copyfree ISC License.

Documentation

Constants

const (
	// DefaultHost is the default host that serves the backing database.
	DefaultHost = "127.0.0.1"

	// DefaultPort is the default port for the host that serves the backing
	// database.
	DefaultPort = "5432"

	// DefaultDBName is the default name for the backing database.
	DefaultDBName = "brdata"

	// DefaultRoleName is the default name for the role used to access the
	// database.
	DefaultRoleName = "brdata"

	// DefaultIndexTablespaceName is the default name for the tablespace that
	// is used to store the indexes.
	DefaultIndexTablespaceName = "brindex"

	// DefaultBulkDataTablespaceName is the default name for the tablespace that
	// is used to store the bulk payload data.
	DefaultBulkDataTablespaceName = "brbulk"
)
const (
	// ErrMissingDatabase indicates the required backend database does not
	// exist.
	ErrMissingDatabase = ErrorKind("ErrMissingDatabase")

	// ErrConnFailed indicates an error when attempting to connect to the
	// database that is not covered by a more specific connection failure error
	// such as a missing database.
	ErrConnFailed = ErrorKind("ErrConnFailed")

	// ErrBeginTx indicates an error when attempting to start a database
	// transaction.
	ErrBeginTx = ErrorKind("ErrBeginTx")

	// ErrCommitTx indicates an error when attempting to commit a database
	// transaction.
	ErrCommitTx = ErrorKind("ErrCommitTx")

	// ErrQueryFailed indicates an unexpected error happened when executing a
	// SQL query on the database.
	ErrQueryFailed = ErrorKind("ErrQueryFailed")

	// ErrMissingRole indicates the required role does not exist.
	ErrMissingRole = ErrorKind("ErrMissingRole")

	// ErrMissingTablespace indicates a required tablespace does not exist.
	ErrMissingTablespace = ErrorKind("ErrMissingTablespace")

	// ErrBadSetting indicates the database does not have a configuration option
	// set to a required value.
	ErrBadSetting = ErrorKind("ErrBadSetting")

	// ErrMissingTable indicates a required table does not exist.
	ErrMissingTable = ErrorKind("ErrMissingTable")

	// ErrBadDataTablespace indicates a table does not have the expected data
	// tablespace.
	ErrBadDataTablespace = ErrorKind("ErrBadDataTablespace")

	// ErrBadIndexTablespace indicates a table does not have the expected index
	// tablespace.
	ErrBadIndexTablespace = ErrorKind("ErrBadIndexTablespace")

	// ErrMissingProc indicates a required stored procedure does not exist.
	ErrMissingProc = ErrorKind("ErrMissingProc")

	// ErrMissingTrigger indicates a required table constraint does not
	// exist.
	ErrMissingTrigger = ErrorKind("ErrMissingTrigger")

	// ErrOldDatabase indicates a database has been upgraded to a newer version
	// that is no longer compatible with the current version of the software.
	ErrOldDatabase = ErrorKind("ErrOldDatabase")

	// ErrUpgradeV2 indicates an error that happened during the upgrade to
	// the version 2 database.
	ErrUpgradeV2 = ErrorKind("ErrUpgradeV2")

	// ErrUpgradeV3 indicates an error that happened during the upgrade to
	// the version 3 database.
	ErrUpgradeV3 = ErrorKind("ErrUpgradeV3")
)

These constants are used to identify a specific ErrorKind.

Variables

This section is empty.

Functions

This section is empty.

Types

type ContextError

type ContextError struct {
	Err         error
	Description string
	RawErr      error
}

ContextError wraps an error with additional context. It has full support for errors.Is and errors.As, so the caller can ascertain the specific wrapped error.

RawErr contains the original error in the case where an error has been converted.

func (ContextError) As

func (e ContextError) As(target interface{}) bool

As implements the interface to work with the standard library's errors.As.

It calls errors.As on both the Err and RawErr fields, in that order, and returns true when one of them matches the target. Otherwise, it returns false.

This means it keeps all of the same semantics typically provided by As in terms of unwrapping error chains and setting the target to the matched error.

func (ContextError) Error

func (e ContextError) Error() string

Error satisfies the error interface and prints human-readable errors.

func (ContextError) Is

func (e ContextError) Is(err error) bool

Is implements the interface to work with the standard library's errors.Is.

It calls errors.Is on both the Err and RawErr fields, in that order, and returns true when one of them matches the target. Otherwise, it returns false.

This means it keeps all of the same semantics typically provided by Is in terms of unwrapping error chains.

type DB

type DB struct {
	// contains filtered or unexported fields
}

DB provides access to the backend database for storing, retrieving, and removing payloads associated with rendezvous points along with additional support for bulk expiration by date.

func Open

func Open(ctx context.Context, opts ...Option) (*DB, error)

Open opens a connection to a database, potentially creates any necessary data tables and partitions as needed, and returns a backend instance that is safe for concurrent use.

Callers are responsible for calling Close on the returned instance when finished using it to ensure a clean shutdown.

Use the functions that start with the prefix "With" to provide configuration operations.

For example:

db, err := pgdb.Open(ctx, pgdb.WithHost(host), pgdb.WithPassphrase(pass),
	pgdb.WithTLS("./server.crt"))
if err != nil {
	/* handle err */
}
defer db.Close()

func (*DB) Close

func (db *DB) Close() error

Close closes the backend and prevents new queries from starting. It then waits for all queries that have started processing on the server to finish.

func (*DB) CreatePartition

func (db *DB) CreatePartition(ctx context.Context, date time.Time) error

CreatePartition creates a concrete data partition for the day associated with the provided date if it does not already exist. The provided date will be converted to UTC if needed.

Note that it is typically not necessary to call this manually since the partitions are managed internally by the code that handles storage and expiration.

This is primarily provided in order to expose more flexibility for testing purposes.

func (*DB) Expire

func (db *DB) Expire(ctx context.Context, date time.Time) (uint64, error)

Expire removes all entries that were inserted on the same day as the day associated with the provided date. The provided date will be converted to UTC if needed. It returns the number of entries that were removed.

func (*DB) FetchPayload

func (db *DB) FetchPayload(ctx context.Context, rendezvous ratchet.RVPoint) (*serverdb.FetchPayloadResult, error)

FetchPayload attempts to load the payload at the provided rendezvous point along with the time it was inserted.

When there is no payload available at the provided rendezvous point, rather than returning an error which would require an allocation, nil is returned.

In other words, callers must check if the result is nil to determine when no payload is available at the provided rendezvous point.

func (*DB) IsPushPaymentRedeemed added in v0.1.4

func (db *DB) IsPushPaymentRedeemed(ctx context.Context, payID []byte) (bool, error)

IsPushPaymentRedeemed returns whether the payment done with the specified ID was recorded in the database (i.e. redeemed).

func (*DB) IsSubscriptionPaid

func (db *DB) IsSubscriptionPaid(ctx context.Context, rendezvous ratchet.RVPoint) (bool, error)

IsSubscriptionPaid returns true if the subscription for the given rendezvous point was marked as paid in the DB.

func (*DB) RemovePayload

func (db *DB) RemovePayload(ctx context.Context, rendezvous ratchet.RVPoint) error

RemovePayload removes the payload at the provided rendezvous point if it exists.

func (*DB) StorePayload

func (db *DB) StorePayload(ctx context.Context, rendezvous ratchet.RVPoint,
	payload []byte, insertTime time.Time) error

StorePayload stores the provided payload at the given rendezvous point along with the given insert time, which typically should just be the current time. The provided insert time will be converted to UTC if needed. It is an error to attempt to store data at an existing rendezvous point that has not expired.

The data will be stored in the bulk data tablespace while the associated index data later used to efficiently fetch the data will be stored in the index tablespace.

func (*DB) StorePushPaymentRedeemed added in v0.1.4

func (db *DB) StorePushPaymentRedeemed(ctx context.Context, payID []byte, insertTime time.Time) error

StorePushPaymentRedeemed stores the passed payment ID as redeemed at the passed date.

func (*DB) StoreSubscriptionPaid

func (db *DB) StoreSubscriptionPaid(ctx context.Context, rendezvous ratchet.RVPoint, insertTime time.Time) error

StoreSubscriptionPaid marks the specified rendezvous as paid on the specified date (which typically will be the current time). It is not an error to pay multiple times for the same rendezvous.

func (*DB) TableSpacesSizes

func (db *DB) TableSpacesSizes(ctx context.Context) (uint64, uint64, error)

TableSpacesSizes returns the disk size (in bytes) occupied by the bulk and index tablespaces (respectively) as reported by the underlying db.

type ErrorKind

type ErrorKind string

ErrorKind identifies a kind of error. It has full support for errors.Is and errors.As, so the caller can directly check against an error kind when determining the reason for an error.

func (ErrorKind) Error

func (e ErrorKind) Error() string

Error satisfies the error interface and prints human-readable errors.

type Option

type Option func(*options)

Option represents a modification to the configuration parameters used by Open.
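
This is the functional options pattern. The following is a minimal self-contained sketch of how such options are typically applied. The fields of the options struct and the resolve helper are assumptions, since the real struct is unexported; the default values are taken from the constants documented above.

```go
package main

import "fmt"

// options and the functions below are local stand-ins.  The field names are
// assumptions because the real struct is unexported.
type options struct {
	host   string
	port   string
	dbName string
}

// Option mirrors the documented type.
type Option func(*options)

func WithHost(host string) Option { return func(o *options) { o.host = host } }

func WithPort(port string) Option { return func(o *options) { o.port = port } }

// resolve is a hypothetical helper that starts from the documented defaults
// and applies each option in order.
func resolve(opts ...Option) options {
	o := options{host: "127.0.0.1", port: "5432", dbName: "brdata"}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	fmt.Printf("%+v\n", resolve())

	// A UNIX domain socket uses an absolute path as the host and an empty
	// port.  The socket directory shown here is only an example.
	fmt.Printf("%+v\n", resolve(WithHost("/var/run/postgresql"), WithPort("")))
}
```

Options left unspecified keep their defaults, which is why a call to Open may pass only the settings that differ.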

func WithBulkDataTablespace

func WithBulkDataTablespace(tablespace string) Option

WithBulkDataTablespace overrides the default name of the tablespace that is used to store the bulk payload data with a custom value.

func WithDBName

func WithDBName(dbName string) Option

WithDBName overrides the default name for the backing database with a custom value.

func WithHost

func WithHost(host string) Option

WithHost overrides the default host that serves the backing database with a custom value.

The host may be an IP address for a TCP connection, or an absolute path to a UNIX domain socket. When a UNIX socket is used, the port should also be set to an empty string via WithPort.

func WithIndexTablespace

func WithIndexTablespace(tablespace string) Option

WithIndexTablespace overrides the default name of the tablespace that is used to store indexes with a custom value.

func WithPassphrase

func WithPassphrase(passphrase string) Option

WithPassphrase overrides the default passphrase that is used to access the database with a custom value.

func WithPort

func WithPort(port string) Option

WithPort overrides the default port for the host that serves the backing database with a custom value.

func WithRole

func WithRole(roleName string) Option

WithRole overrides the default role name that is used to access the database with a custom value.

func WithTLS

func WithTLS(serverCA string) Option

WithTLS connects to the backing database with TLS and verifies that the certificate presented by the server was signed by the provided CA, which is typically the server certificate itself for self-signed certificates, and that the server host name matches the one in the certificate.

The provided server CA can be an empty string to use the system CAs instead for certs that are signed by one of them.
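
The verification behavior described above maps naturally onto the standard library's crypto/tls settings. The sketch below is one way to build such a configuration and is not taken from the package's source; the tlsConfig helper, the host names, and the minimum TLS version are assumptions.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// tlsConfig is a hypothetical helper.  An empty serverCA leaves RootCAs nil,
// which makes crypto/tls verify against the system roots.  Setting ServerName
// makes the client check the host name in the server certificate.
func tlsConfig(serverCA, host string) (*tls.Config, error) {
	cfg := &tls.Config{ServerName: host, MinVersion: tls.VersionTLS12}
	if serverCA == "" {
		return cfg, nil
	}

	pem, err := os.ReadFile(serverCA)
	if err != nil {
		return nil, err
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(pem) {
		return nil, errors.New("no certificates found in " + serverCA)
	}
	cfg.RootCAs = pool
	return cfg, nil
}

func main() {
	// An empty CA path selects the system roots.
	cfg, err := tlsConfig("", "db.example.com")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(cfg.ServerName, cfg.RootCAs == nil)
}
```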

Directories

Path Synopsis
cmd
