pg

v1.0.9
Published: Jan 13, 2024 | License: MIT | Imports: 20 | Imported by: 1

README

kataras/pg

A high-performance Go library that provides a simple and elegant way to interact with PostgreSQL databases (works perfectly with the latest 16.x versions). It allows you to define your entities as structs with pg tags, register them in a schema, and perform CRUD operations using a repository pattern. It also handles database connection, schema creation and verification, and query generation and execution. You can use PG to write concise and readable code that works with PostgreSQL databases.


💻 Installation

The only requirement is the Go Programming Language.

Create a new project
$ mkdir myapp
$ cd myapp
$ go mod init myapp
$ go get github.com/kataras/pg@latest
Install on existing project
$ cd myapp
$ go get github.com/kataras/pg@latest

Run

$ go mod tidy -compat=1.21 # -compat="1.21" for windows.
$ go run .

📖 Example

PG contains extensive and thorough documentation making it easy to get started with the library.

package main

import (
  "context"
  "fmt"
  "log"
  "time"

  "github.com/kataras/pg"
)

// Base is a struct that contains common fields for all entities.
type Base struct {
  ID        string    `pg:"type=uuid,primary"` // UUID as primary key
  CreatedAt time.Time `pg:"type=timestamp,default=clock_timestamp()"` // Timestamp of creation
  UpdatedAt time.Time `pg:"type=timestamp,default=clock_timestamp()"` // Last update
}

// Customer is a struct that represents a customer entity.
type Customer struct {
  Base // Embed Base struct

  Firstname string `pg:"type=varchar(255)"` // First name of the customer
}

func main() {
  // Default value for struct field tag `pg`.
  // It can be modified to a custom one as well, e.g.
  // pg.SetDefaultTag("db")

  // Create Schema instance.
  schema := pg.NewSchema()
  // First argument is the table name, second is the struct entity.
  schema.MustRegister("customers", Customer{})

  // Create Database instance.
  connString := "postgres://postgres:admin!123@localhost:5432/test_db?sslmode=disable"
  db, err := pg.Open(context.Background(), schema, connString)
  if err != nil {
    log.Fatal(err)
  }
  defer db.Close()

  // If needed, create and verify the database schema
  // based on the pg tags of the structs.
  //
  // Alternatively, you can generate
  // Go schema files from an existing database:
  // see the ./gen sub-package for more details.
  if err = db.CreateSchema(context.Background()); err != nil {
    log.Fatal(err)
  }

  if err = db.CheckSchema(context.Background()); err != nil {
    log.Fatal(err)
  }

  // Create a Repository of Customer type.
  customers := pg.NewRepository[Customer](db)

  var newCustomer = Customer{
    Firstname: "John",
  }

  // Insert a new Customer.
  err = customers.InsertSingle(context.Background(), newCustomer, &newCustomer.ID)
  if err != nil {
    log.Fatal(err)
  }

  // Get by id.
  /*
  query := `SELECT * FROM customers WHERE id = $1 LIMIT 1;`
  existing, err := customers.SelectSingle(context.Background(), query, newCustomer.ID)
  OR:
  */
  existing, err := customers.SelectByID(context.Background(), newCustomer.ID)
  if err != nil {
    log.Fatal(err)
  }

  log.Printf("Existing Customer (SelectByID):\n%#+v\n", existing)

  // List all.
  query := `SELECT * FROM customers ORDER BY created_at DESC;`
  allCustomers, err := customers.Select(context.Background(), query)
  if err != nil {
    log.Fatal(err)
  }

  log.Printf("All Customers (%d):", len(allCustomers))
  for _, customer := range allCustomers {
    fmt.Printf("- (%s) %s\n", customer.ID, customer.Firstname)
  }
}

If you already have a database, you can use the gen sub-package to create structs that match its schema.

βœ’οΈ ASCII art

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  NewSchema() *Schema  β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                                   β”‚
β”‚                                                           β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”‚
β”‚  Schema                                             β”‚     β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€     β”‚
β”‚  - MustRegister(tableName string, emptyStruct any)  β”‚     β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β”‚
                                                            β”‚
                                                            β”‚
                                                            β”‚
                                β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                                β”‚                                 β”‚                         β”‚
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”             β”‚
β”‚  Open(ctx context.Context, schema *Schema, connString string) (*DB, error)  β”‚             β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜             β”‚
β”‚                                                                                           β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”‚
β”‚  DB                                                                                 β”‚     β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€     β”‚
β”‚                                                                                     β”‚     β”‚
β”‚  - CreateSchema(ctx context.Context) error                                          β”‚     β”‚
β”‚  - CheckSchema(ctx context.Context) error                                           β”‚     β”‚
β”‚                                                                                     β”‚     β”‚
β”‚  - InTransaction(ctx context.Context, fn (*DB) error) error                         β”‚     β”‚
β”‚  - IsTransaction() bool                                                             β”‚     β”‚
β”‚                                                                                     β”‚     β”‚
β”‚  - Query(ctx context.Context, query string, args ...any) (Rows, error)              β”‚     β”‚
β”‚  - QueryRow(ctx context.Context, query string, args ...any) Row                     β”‚     β”‚
β”‚                                                                                     β”‚     β”‚
β”‚  - Exec(ctx context.Context, query string, args... any) (pgconn.CommandTag, error)  β”‚     β”‚
β”‚                                                                                     β”‚     β”‚
β”‚  - Listen(ctx context.Context, channel string) (*Listener, error)                   β”‚     β”‚
β”‚  - Notify(ctx context.Context, channel string, payload any) error                   β”‚     β”‚
β”‚  - Unlisten(ctx context.Context, channel string) error                              β”‚     β”‚
β”‚                                                                                     β”‚     β”‚
β”‚  - Close() error                                                                    β”‚     β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β”‚
                                                                                            β”‚
                                                                                            β”‚
                                                                                            β”‚
                                                                                            β”‚
                      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                      β”‚
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  NewRepository[T](db *DB) *Repository[T]  β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  Repository[T]                                                                             β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                                            β”‚
β”‚  - InTransaction(ctx context.Context, fn func(*Repository[T]) error) error                 β”‚
β”‚  - IsTransaction() bool                                                                    β”‚
β”‚                                                                                            β”‚
β”‚  - Select(ctx context.Context, query string, args ...any) ([]T, error)                     β”‚
β”‚  - SelectSingle(ctx context.Context, query string, args ...any) (T, error)                 β”‚
β”‚  - SelectByID(ctx context.Context, id any) (T, error)                                      β”‚
β”‚  - SelectByUsernameAndPassword(ctx context.Context, username, plainPwd string) (T, error)  β”‚
β”‚                                                                                            β”‚
β”‚  - Insert(ctx context.Context, values ...T) error                                          β”‚
β”‚  - InsertSingle(ctx context.Context, value T, destIdPtr any) error                         β”‚
β”‚                                                                                            β”‚
β”‚  - Update(ctx context.Context, values ...T) (int64, error)                                 β”‚
β”‚  - UpdateOnlyColumns(ctx context.Context, columns []string, values ...T) (int64, error)    β”‚
β”‚                                                                                            β”‚
β”‚  - Upsert(ctx context.Context, uniqueIndex string, values ...T) error                      β”‚
β”‚  - UpsertSingle(ctx context.Context, uniqueIndex string, value T, destIdPtr any) error     β”‚
β”‚                                                                                            β”‚
β”‚  - Delete(ctx context.Context, values ...T) (int64, error)                                 β”‚
β”‚                                                                                            β”‚
β”‚  - ListenTable(ctx context.Context, cb func(notification, error) error) (Closer, error)    β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

🛄 Data types

PostgreSQL data type    Struct field tag type options
BigInt                  bigint, int8
BigIntArray             bigint[], int8[]
BigSerial               bigserial, serial8
Bit                     bit
BitVarying              bit varying, varbit
Boolean                 boolean, bool
Box                     box
Bytea                   bytea
Character               character, char
CharacterArray          character[], char[]
CharacterVarying        character varying, varchar
CharacterVaryingArray   character varying[], varchar[]
Cidr                    cidr
Circle                  circle
Date                    date
DoublePrecision         double precision, float8
Inet                    inet
Integer                 integer, int, int4
IntegerArray            integer[], int[], int4[]
IntegerDoubleArray      integer[][], int[][], int4[][]
Interval                interval
JSON                    json
JSONB                   jsonb
Line                    line
Lseg                    lseg
MACAddr                 macaddr
MACAddr8                macaddr8
Money                   money
Numeric                 numeric, decimal
Path                    path
PgLSN                   pg_lsn
Point                   point
Polygon                 polygon
Real                    real, float4
SmallInt                smallint, int2
SmallSerial             smallserial, serial2
Serial                  serial4
Text                    text
TextArray               text[]
TextDoubleArray         text[][]
Time                    time, time without time zone
TimeTZ                  timetz, time with time zone
Timestamp               timestamp, timestamp without time zone
TimestampTZ             timestamptz, timestamp with time zone
TsQuery                 tsquery
TsVector                tsvector
TxIDSnapshot            txid_snapshot
UUID                    uuid
UUIDArray               []uuid
XML                     xml
Int4Range               int4range
Int4MultiRange          int4multirange
Int8Range               int8range
Int8MultiRange          int8multirange
NumRange                numrange
NumMultiRange           nummultirange
TsRange                 tsrange
TsMultirange            tsmultirange
TsTzRange               tstzrange
TsTzMultiRange          tstzmultirange
DateRange               daterange
DateMultiRange          datemultirange
CIText                  citext
HStore                  hstore

Data type examples

UUID

type Entity struct {
  ID string `pg:"type=uuid,primary"`
}

Timestamp

type Entity struct {
  CreatedAt time.Time `pg:"type=timestamp,default=clock_timestamp()"`
}

Varchar

type Entity struct {
  PhotoURL string `pg:"type=varchar(255)"`
}

Varchar Array

type Entity struct {
  SearchTerms []string `pg:"type=varchar[]"`
}

Integer

type Entity struct {
  ReadTimeMinutes int `pg:"type=smallint,default=1,check=read_time_minutes > 0"`
}

Custom JSON Object

type Entity struct {
  Feature Feature `pg:"type=jsonb"`
}

Array of custom JSON objects

type Entity struct {
  Tags []Tag `pg:"type=jsonb"`
}
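
The tag strings in the examples above all share one shape: comma-separated options, each either a bare flag (primary) or a key=value pair (type=uuid, default=1). As a rough illustration of how such a tag can be read with the standard reflect package, here is a minimal sketch. parseTag is a hypothetical helper written for this example; it is not the library's parser and it assumes option values contain no commas.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// parseTag splits a tag value such as "type=uuid,primary" into options.
// Flags without "=" (e.g. "primary") are stored with an empty value.
// Hypothetical helper for illustration only; not the library's parser.
func parseTag(tag string) map[string]string {
	opts := make(map[string]string)
	for _, part := range strings.Split(tag, ",") {
		part = strings.TrimSpace(part)
		if part == "" {
			continue
		}
		// Cut at the first "=" only, so "check=a > 0" keeps its expression intact.
		key, value, _ := strings.Cut(part, "=")
		opts[key] = value
	}
	return opts
}

// Entity reuses two of the field declarations shown above.
type Entity struct {
	ID              string `pg:"type=uuid,primary"`
	ReadTimeMinutes int    `pg:"type=smallint,default=1,check=read_time_minutes > 0"`
}

func main() {
	t := reflect.TypeOf(Entity{})
	for i := 0; i < t.NumField(); i++ {
		field := t.Field(i)
		fmt.Println(field.Name, parseTag(field.Tag.Get("pg")))
	}
}
```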

📦 3rd-party Packages

List of 3rd-party packages based on PG.

🛡 Security Vulnerabilities

If you discover a security vulnerability within pg, please send an e-mail to kataras2006@hotmail.com. All security vulnerabilities will be promptly addressed.

πŸ“ License

This project is licensed under the MIT license.

Documentation

Overview

Example

Example is a function that demonstrates how to use the Registry and Repository types to perform database operations within a transaction. It uses the Customer, Blog, and BlogPost structs as the entities to be stored and manipulated in the database. It also prints "OK" if everything succeeds, or an error message otherwise.

db, err := openTestConnection(true)
if err != nil {
	fmt.Println(err.Error())
	return
}
defer db.Close()

// Registry code.
registry := NewRegistry(db) // Create a new Registry instance with the DB instance.

// Execute a function within a database transaction, passing it a Registry instance that uses the transactional DB instance.
err = registry.InTransaction(context.Background(), func(registry *Registry) error {
	customers := registry.Customers() // Get the CustomerRepository instance from the Registry.

	customerToInsert := Customer{ // Create a Customer struct to be inserted into the database.
		CognitoUserID: "373f90eb-00ac-410f-9fe0-1a7058d090ba",
		Email:         "kataras2006@hotmail.com",
		Name:          "kataras",
		Username:      "kataras",
	}

	// Insert the customer into the database and get its ID.
	err = customers.InsertSingle(context.Background(), customerToInsert, &customerToInsert.ID)
	if err != nil {
		return fmt.Errorf("insert single: %w", err)
	}

	// Modify cognito user id.
	newCognitoUserID := "1e6a93d0-6276-4a43-b90a-4badad8407bb"
	// Update specific columns by id:
	updated, err := customers.UpdateOnlyColumns(
		context.Background(),
		[]string{"cognito_user_id"},
		Customer{
			BaseEntity: BaseEntity{
				ID: customerToInsert.ID,
			},
			CognitoUserID: newCognitoUserID,
		})
	// Full update of the object by id (except id and created_at, updated_at columns):
	// updated, err := customers.Update(context.Background(),
	// 	Customer{
	// 		BaseEntity: BaseEntity{
	// 			ID: customerToInsert.ID,
	// 		},
	// 		CognitoUserID: newCognitoUserID,
	// 		Email:         customerToInsert.Email,
	// 		Name:          customerToInsert.Name,
	// 	})
	if err != nil {
		return fmt.Errorf("update: %w", err)
	} else if updated == 0 {
		return fmt.Errorf("update: no record was updated")
	}

	// Update a default column to its zero value.
	updated, err = customers.UpdateOnlyColumns(
		context.Background(),
		[]string{"username"},
		Customer{
			BaseEntity: BaseEntity{
				ID: customerToInsert.ID,
			},
			Username: "",
		})
	if err != nil {
		return fmt.Errorf("update username: %w", err)
	} else if updated == 0 {
		return fmt.Errorf("update username: no record was updated")
	}
	// Select the customer from the database by its ID.
	customer, err := customers.SelectSingle(context.Background(), `SELECT * FROM customers WHERE id = $1;`, customerToInsert.ID)
	if err != nil {
		return fmt.Errorf("select single: %w", err)
	}

	if customer.CognitoUserID != newCognitoUserID {
		return fmt.Errorf("expected cognito user id to be updated but it wasn't ('%s' vs '%s')",
			newCognitoUserID, customer.CognitoUserID)
	}
	if customer.Email == "" {
		return fmt.Errorf("expected email field not be removed after update")
	}
	if customer.Name == "" {
		return fmt.Errorf("expected name field not be removed after update")
	}

	// Test Upsert by modifying the email.
	customerToUpsert := Customer{
		CognitoUserID: customer.CognitoUserID,
		Email:         "kataras2023@hotmail.com",
		Name:          "kataras2023",
	}

	// Manually passing a column as the conflict column:
	// err = customers.UpsertSingle(context.Background(), "email", customerToUpsert, &customerToUpsert.ID)
	//
	// Automatically find the conflict column or expression by setting it to empty value:
	// err = customers.UpsertSingle(context.Background(), "", customerToUpsert, &customerToUpsert.ID)
	// Manually passing a unique index name, pg will resolve the conflict columns:
	err = customers.UpsertSingle(context.Background(), "customer_unique_idx", customerToUpsert, &customerToUpsert.ID)
	if err != nil {
		return fmt.Errorf("upsert single: %w", err)
	}

	if customerToUpsert.ID == "" {
		return fmt.Errorf("expected customer id to be filled after upsert")
	}

	// Delete the customer from the database by its struct value.
	deleted, err := customers.Delete(context.Background(), customer)
	if err != nil {
		return fmt.Errorf("delete: %w", err)
	} else if deleted == 0 {
		return fmt.Errorf("delete: was not removed")
	}

	exists, err := customers.Exists(context.Background(), customer.CognitoUserID)
	if err != nil {
		return fmt.Errorf("exists: %w", err)
	}
	if exists {
		return fmt.Errorf("exists: customer should not exist")
	}

	// Do something else with customers.
	return nil
})

if err != nil {
	fmt.Println(fmt.Errorf("in transaction: %w", err))
	return
}

// Insert a blog.
blogs := registry.Blogs()
newBlog := Blog{
	Name: "test_blog_1",
}
err = blogs.InsertSingle(context.Background(), newBlog, &newBlog.ID)
if err != nil {
	fmt.Println(fmt.Errorf("insert single: blog: %w", err))
	return
}

// Insert a blog post to the blog.
blogPosts := registry.BlogPosts()
newBlogPost := BlogPost{
	BlogID:          newBlog.ID,
	Title:           "test_blog_post_1",
	PhotoURL:        "https://test.com/test_blog_post_1.png",
	SourceURL:       "https://test.com/test_blog_post_1.html",
	ReadTimeMinutes: 5,
	Category:        1,
	SearchTerms: []string{
		"test_search_blog_post_1",
		"test_search_blog_post_2",
	},
	ReadDurations: []time.Duration{
		5 * time.Minute,
		10 * time.Minute,
	},
	Feature: Feature{
		IsFeatured: true,
	},
	OtherFeatures: Features{
		Feature{
			IsFeatured: true,
		},
		Feature{
			IsFeatured: false,
		},
	},
	Tags: []Tag{
		{"test_tag_1", "test_tag_value_1"},
		{"test_tag_2", 42},
	},
}
err = blogPosts.InsertSingle(context.Background(), newBlogPost, &newBlogPost.ID)
if err != nil {
	fmt.Println(fmt.Errorf("insert single: blog post: %w", err))
	return
}

query := `SELECT * FROM blog_posts WHERE id = $1 LIMIT 1;`
existingBlogPost, err := blogPosts.SelectSingle(context.Background(), query, newBlogPost.ID)
if err != nil {
	fmt.Println(fmt.Errorf("select single: blog post: %s: %w", newBlogPost.ID, err))
	return
}

// Test select single jsonb column of a custom type of array of custom types.
//
var otherFeatures Features
err = blogPosts.QueryRow(
	context.Background(),
	`SELECT other_features FROM blog_posts WHERE id = $1 LIMIT 1;`,
	newBlogPost.ID,
).Scan(&otherFeatures)
// OR
// otherFeatures, err := QuerySingle[Features](
// 	context.Background(),
// 	db,
// 	`SELECT other_features FROM blog_posts WHERE id = $1 LIMIT 1;`,
// 	newBlogPost.ID,
// )
if err != nil {
	fmt.Println(fmt.Errorf("select single jsonb column of custom array type of custom type: blog post: %s: %w", newBlogPost.ID, err))
	return
}

if expected, got := len(otherFeatures), len(existingBlogPost.OtherFeatures); expected != got {
	fmt.Printf("expected %d other_features but got %d", expected, got)
	return
}

if !reflect.DeepEqual(otherFeatures, existingBlogPost.OtherFeatures) {
	fmt.Printf("expected other_features to be equal but got %#+v and %#+v", otherFeatures, existingBlogPost.OtherFeatures)
	return
}
Output:

Example (Notify_JSON)
schema := NewSchema()
db, err := Open(context.Background(), schema, getTestConnString())
if err != nil {
	fmt.Println(err)
	return
}
// defer db.Close()

const channel = "chat_json"

conn, err := db.Listen(context.Background(), channel)
if err != nil {
	fmt.Println(fmt.Errorf("listen: %w", err))
	return // conn is nil here; continuing would panic in the goroutine below.
}

go func() {
	// To just terminate this listener's connection and unlisten from the channel:
	defer conn.Close(context.Background())

	for {
		notification, err := conn.Accept(context.Background())
		if err != nil {
			fmt.Println(fmt.Errorf("accept: %w", err))
			return
		}

		payload, err := UnmarshalNotification[Message](notification)
		if err != nil {
			fmt.Println(fmt.Errorf("unmarshal notification: %w", err))
			return
		}

		fmt.Printf("channel: %s, payload.sender: %s, payload.body: %s\n",
			notification.Channel, payload.Sender, payload.Body)
	}
}()

firstMessage := Message{
	Sender: "kataras",
	Body:   "hello",
}
if err = db.Notify(context.Background(), channel, firstMessage); err != nil {
	fmt.Println(fmt.Errorf("notify: first message: %w", err))
	return
}

secondMessage := Message{
	Sender: "kataras",
	Body:   "world",
}

if err = db.Notify(context.Background(), channel, secondMessage); err != nil {
	fmt.Println(fmt.Errorf("notify: second message: %w", err))
	return
}

time.Sleep(5 * time.Second) // Give the listener some time to receive the notifications; 5 seconds is more than enough.
Output:

channel: chat_json, payload.sender: kataras, payload.body: hello
channel: chat_json, payload.sender: kataras, payload.body: world

Constants

const DoNothing = "DO NOTHING"

DoNothing is a constant that can be used as the forceOnConflictExpr argument of Upsert/UpsertSingle to do nothing on conflict.
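
To make the role of such an expression concrete, the sketch below assembles a PostgreSQL INSERT ... ON CONFLICT statement by hand. buildUpsert, its parameters and the local doNothing constant are hypothetical names used for illustration; the text above does not describe how the library generates its SQL, so treat this as a picture of the resulting statement rather than of the library's internals.

```go
package main

import (
	"fmt"
	"strings"
)

// doNothing mirrors the value of the DoNothing constant documented above.
const doNothing = "DO NOTHING"

// buildUpsert returns an INSERT ... ON CONFLICT statement.
// Hypothetical helper: when onConflict is empty it falls back to updating
// every non-conflict column from the incoming row (EXCLUDED).
func buildUpsert(table string, columns []string, conflictColumn, onConflict string) string {
	placeholders := make([]string, len(columns))
	for i := range columns {
		placeholders[i] = fmt.Sprintf("$%d", i+1)
	}

	if onConflict == "" {
		sets := make([]string, 0, len(columns))
		for _, c := range columns {
			if c != conflictColumn {
				sets = append(sets, c+" = EXCLUDED."+c)
			}
		}
		onConflict = "DO UPDATE SET " + strings.Join(sets, ", ")
	}

	return fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) %s;",
		table,
		strings.Join(columns, ", "),
		strings.Join(placeholders, ", "),
		conflictColumn,
		onConflict,
	)
}

func main() {
	cols := []string{"email", "name"}
	fmt.Println(buildUpsert("customers", cols, "email", doNothing))
	fmt.Println(buildUpsert("customers", cols, "email", ""))
}
```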

Variables

var (

	// NoColumnNameMapper is a column name conversion function.
	// It converts the column name to the same as its struct field name.
	NoColumnNameMapper = func(field reflect.StructField) string { return field.Name }
	// JSONColumnNameMapper is a column name conversion function.
	// It converts the column name to the same as its json tag name
	// and fallbacks to field name (if json tag is missing or "-").
	JSONColumnNameMapper = func(field reflect.StructField) string {
		jsonTag := field.Tag.Get("json")
		if jsonTag == "-" {
			return field.Name
		}

		return strings.SplitN(jsonTag, ",", 2)[0]
	}
)

var ErrEmptyPayload = fmt.Errorf("empty payload")

ErrEmptyPayload is returned when the notification payload is empty.

var ErrIntentionalRollback = errors.New("skip error: intentional rollback")

ErrIntentionalRollback is an error that can be returned by a transaction function to rollback the transaction.
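
The pattern behind such a sentinel can be sketched with the standard errors package alone. In the example below, runInTx and errIntentionalRollback are local, hypothetical stand-ins. The sketch assumes the runner rolls back and reports no failure when it sees the sentinel, which the "skip error" wording suggests but the text above does not confirm.

```go
package main

import (
	"errors"
	"fmt"
)

// errIntentionalRollback is a local stand-in for a rollback sentinel.
var errIntentionalRollback = errors.New("skip error: intentional rollback")

// runInTx is a hypothetical transaction runner. It "commits" when fn
// returns nil and "rolls back" otherwise. Assumption: the sentinel asks for
// a rollback without being reported to the caller as a failure.
func runInTx(fn func() error) (committed bool, err error) {
	if err = fn(); err != nil {
		if errors.Is(err, errIntentionalRollback) {
			return false, nil // Rolled back on purpose.
		}
		return false, err // Rolled back because of a real error.
	}
	return true, nil
}

func main() {
	// A dry run: do the work, then discard it deliberately.
	committed, err := runInTx(func() error {
		return fmt.Errorf("dry run: %w", errIntentionalRollback)
	})
	fmt.Println(committed, err)
}
```

Wrapping the sentinel with %w still works because the runner checks it with errors.Is rather than a direct comparison.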

var ErrIsReadOnly = errors.New("repository is read-only")

ErrIsReadOnly is returned by Insert and InsertSingle if the repository is read-only.

var (
	// ErrNoRows is returned by a query when no rows come back.
	// Usually it's ignored and an empty json array is sent to the client instead.
	ErrNoRows = pgx.ErrNoRows
)

var Presenter = func(td *desc.Table) bool {
	td.Type = desc.TableTypePresenter
	return true
}

Presenter is a TableFilterFunc that sets the table type to "presenter" and returns true. A presenter is a table that is used to present data from one or more tables with custom select queries. It is neither a base table nor a view.

Example:

schema.MustRegister("customer_presenter", CustomerPresenter{}, pg.Presenter)

var View = func(td *desc.Table) bool {
	td.Type = desc.TableTypeView
	return true
}

View is a TableFilterFunc that sets the table type to "view" and returns true.

Example:

schema.MustRegister("customer_master", FullCustomer{}, pg.View)

var WithLogger = func(logger tracelog.Logger) ConnectionOption {
	return func(poolConfig *pgxpool.Config) error {
		tracer := &tracelog.TraceLog{
			Logger:   logger,
			LogLevel: tracelog.LogLevelTrace,
		}

		poolConfig.ConnConfig.Tracer = tracer
		return nil
	}
}

WithLogger is a ConnectionOption. It sets the logger for the connection pool.

Functions

func IsErrColumnNotExists

func IsErrColumnNotExists(err error, col string) bool

IsErrColumnNotExists reports whether the error was caused by the column "col", named in a select query, not existing in a row. The driver itself provides no typed error for this case.

func IsErrDuplicate

func IsErrDuplicate(err error) (string, bool)

IsErrDuplicate reports whether the error returned from the `Insert` method was caused by a violation of a unique constraint (the underlying driver provides no typed error for it). If so, it also returns the constraint key.

func IsErrForeignKey

func IsErrForeignKey(err error) (string, bool)

IsErrForeignKey reports whether an insert or update command failed due to an invalid foreign key: a foreign key is missing or its source was not found. E.g. ERROR: insert or update on table "food_user_friendly_units" violates foreign key constraint "fk_food" (SQLSTATE 23503)
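
For readers who want to see what is being detected, the sketch below pulls the constraint name and SQLSTATE out of the message quoted above with a regular expression. This is a string-based illustration only: foreignKeyConstraint and fkPattern are hypothetical names, and the library may well inspect a structured driver error instead of the message text.

```go
package main

import (
	"fmt"
	"regexp"
)

// fkPattern matches the tail of a PostgreSQL foreign key violation message
// and captures the constraint name and the five-character SQLSTATE.
var fkPattern = regexp.MustCompile(`violates foreign key constraint "([^"]+)" \(SQLSTATE ([0-9A-Z]{5})\)`)

// foreignKeyConstraint returns the violated constraint name when msg is a
// foreign key violation (SQLSTATE 23503). Hypothetical helper for illustration.
func foreignKeyConstraint(msg string) (string, bool) {
	m := fkPattern.FindStringSubmatch(msg)
	if m == nil || m[2] != "23503" {
		return "", false
	}
	return m[1], true
}

func main() {
	msg := `ERROR: insert or update on table "food_user_friendly_units" violates foreign key constraint "fk_food" (SQLSTATE 23503)`
	name, ok := foreignKeyConstraint(msg)
	fmt.Println(name, ok)
}
```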

func IsErrInputSyntax

func IsErrInputSyntax(err error) (string, bool)

IsErrInputSyntax reports whether the return error from `Insert` method was caused because of invalid input syntax for a specific postgres column type.

func QuerySingle

func QuerySingle[T any](ctx context.Context, db *DB, query string, args ...any) (entry T, err error)

QuerySingle executes the given query and returns a single T entry.

Example:

names, err := QuerySingle[MyType](ctx, db, "SELECT a_json_field FROM users;")

func QuerySlice

func QuerySlice[T any](ctx context.Context, db *DB, query string, args ...any) ([]T, error)

QuerySlice executes the given query and returns a list of T entries. Note that the rows scanner scans directly into an element of T, so T should be a type the database driver can scan into (e.g. string, int, time.Time, etc.).

ErrNoRows is discarded; an empty list and a nil error are returned instead. If a string column is empty, it is skipped and does not appear in the returned list.

Example:

names, err := QuerySlice[string](ctx, db, "SELECT name FROM users;")

func QueryTwoSlices (added in v1.0.6)

func QueryTwoSlices[T, V any](ctx context.Context, db *DB, query string, args ...any) ([]T, []V, error)

QueryTwoSlices executes the given query and returns two lists of T and V entries. Same behavior as QuerySlice but with two lists.

func SetDefaultColumnNameMapper (added in v1.0.3)

func SetDefaultColumnNameMapper(fn func(field reflect.StructField) string)

SetDefaultColumnNameMapper sets the default column name conversion function. This is used when the "name" pg tag option is missing for one or more struct fields. Pass a nil function to restore the built-in default conversion.
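
A mapper is just a function from reflect.StructField to a column name, so a custom one is easy to write. The sketch below shows one possible snake_case conversion. toSnakeCase and snakeCaseMapper are hypothetical names written for this example, and the conversion rules are an assumption about what a caller might want, not a description of the library's default.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
	"unicode"
)

// toSnakeCase converts a Go field name such as "CreatedAt" to "created_at".
// Runs of capitals ("ID", "URL") are treated as a single word.
func toSnakeCase(name string) string {
	runes := []rune(name)
	var b strings.Builder
	for i, r := range runes {
		if unicode.IsUpper(r) {
			// A new word starts when the previous rune is lower-case, or when
			// this capital is followed by a lower-case rune.
			startsWord := i > 0 && (unicode.IsLower(runes[i-1]) ||
				(i+1 < len(runes) && unicode.IsLower(runes[i+1])))
			if startsWord {
				b.WriteByte('_')
			}
			r = unicode.ToLower(r)
		}
		b.WriteRune(r)
	}
	return b.String()
}

// snakeCaseMapper has the signature expected by a column name mapper.
func snakeCaseMapper(field reflect.StructField) string {
	return toSnakeCase(field.Name)
}

type blogPost struct {
	ID        string
	PhotoURL  string
	CreatedAt string
}

func main() {
	t := reflect.TypeOf(blogPost{})
	for i := 0; i < t.NumField(); i++ {
		fmt.Println(t.Field(i).Name, "->", snakeCaseMapper(t.Field(i)))
	}
}
```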

func SetDefaultSearchPath

func SetDefaultSearchPath(searchPath string)

SetDefaultSearchPath sets the default search path for the database.

func SetDefaultTag

func SetDefaultTag(tag string)

SetDefaultTag sets the default tag name for the struct fields.

func UnmarshalNotification

func UnmarshalNotification[T any](n *Notification) (T, error)

UnmarshalNotification returns the notification payload as a custom type of T.
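
The idea of decoding a payload into a caller-chosen type can be sketched with generics and encoding/json. Everything in the example below is a local stand-in: notification, message, errEmptyPayload and unmarshalPayload are hypothetical, and the sketch assumes the payload is a JSON string, as in the Notify_JSON example.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// notification is a simplified stand-in carrying a channel and a text payload.
type notification struct {
	Channel string
	Payload string
}

// message is the payload type used in this sketch.
type message struct {
	Sender string `json:"sender"`
	Body   string `json:"body"`
}

var errEmptyPayload = errors.New("empty payload")

// unmarshalPayload decodes the JSON payload of n into a value of type T.
func unmarshalPayload[T any](n *notification) (T, error) {
	var v T
	if n == nil || n.Payload == "" {
		return v, errEmptyPayload
	}
	err := json.Unmarshal([]byte(n.Payload), &v)
	return v, err
}

func main() {
	raw, err := json.Marshal(message{Sender: "kataras", Body: "hello"})
	if err != nil {
		fmt.Println(err)
		return
	}

	n := &notification{Channel: "chat_json", Payload: string(raw)}
	msg, err := unmarshalPayload[message](n)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("channel: %s, sender: %s, body: %s\n", n.Channel, msg.Sender, msg.Body)
}
```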

Types

type Closer (added in v1.0.7)

type Closer interface {
	Close(ctx context.Context) error
}

Closer is the interface implemented by the Listener. It is used to close the underlying connection.

type Column

type Column = desc.Column

Column is a type alias for desc.Column.

type ColumnFilter

type ColumnFilter = desc.ColumnFilter

ColumnFilter is a type alias for desc.ColumnFilter.

type ConnectionOption

type ConnectionOption func(*pgxpool.Config) error

ConnectionOption is a function that takes a *pgxpool.Config and returns an error. It is used to set the connection options for the connection pool. It is used by the Open function.
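
This is the functional options pattern: each option is a small function that mutates a config and may fail. The sketch below reproduces the shape with local types so it runs on its own. poolConfig, connectionOption, withMaxConns and applyOptions are hypothetical stand-ins; they are not part of pg or pgxpool.

```go
package main

import (
	"errors"
	"fmt"
)

// poolConfig is a simplified, hypothetical stand-in for a pool configuration.
type poolConfig struct {
	MaxConns int32
}

// connectionOption mirrors the shape described above: it receives the config
// and returns an error if the option cannot be applied.
type connectionOption func(*poolConfig) error

// withMaxConns is a hypothetical option that caps the pool size.
func withMaxConns(n int32) connectionOption {
	return func(c *poolConfig) error {
		if n <= 0 {
			return errors.New("max conns must be positive")
		}
		c.MaxConns = n
		return nil
	}
}

// applyOptions runs the options in order and stops at the first error,
// which is how an Open-like function would typically consume them.
func applyOptions(c *poolConfig, opts ...connectionOption) error {
	for _, opt := range opts {
		if err := opt(c); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	cfg := &poolConfig{}
	if err := applyOptions(cfg, withMaxConns(8)); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("max conns:", cfg.MaxConns)
}
```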

See `WithLogger` package-level function too.

type DB

type DB struct {
	Pool              *pgxpool.Pool
	ConnectionOptions *pgx.ConnConfig
	// contains filtered or unexported fields
}

DB represents a database connection that can execute queries and transactions. It wraps a pgxpool.Pool and a pgx.ConnConfig to manage the connection options and the search path. It also holds a reference to a Schema that defines the database schema and migrations.

func Open

func Open(ctx context.Context, schema *Schema, connString string, opts ...ConnectionOption) (*DB, error)

Open creates a new DB instance by parsing the connection string and establishing a connection pool. It also sets the search path to the one specified in the connection string or to the default one if not specified. It takes a context and a schema as arguments and returns the DB instance or an error if any.

Example Code:

const (

	host     = "localhost" // The host name or IP address of the database server.
	port     = 5432        // The port number of the database server.
	user     = "postgres"  // The user name to connect to the database with.
	password = "admin!123" // The password to connect to the database with.
	schema   = "public"    // The schema name to use in the database.
	dbname   = "test_db"   // The database name to connect to.
	sslMode  = "disable"   // The SSL mode to use for the connection. Can be disable, require, verify-ca or verify-full.

)

connString := fmt.Sprintf("host=%s port=%d user=%s password=%s search_path=%s dbname=%s sslmode=%s pool_max_conns=%d pool_min_conns=%d pool_max_conn_lifetime=%s pool_max_conn_idle_time=%s pool_health_check_period=%s", ...)
OR
connString := "postgres://postgres:admin!123@localhost:5432/test_db?sslmode=disable&search_path=public"

db, err := Open(context.Background(), schema, connString)
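
When the URL form is built from separate values, passwords containing characters such as "@" or "/" need escaping. A minimal sketch using only the standard net/url package follows. buildConnString and passwordOf are hypothetical helpers for this example, and the query parameter names are the ones used in the URL above.

```go
package main

import (
	"fmt"
	"net/url"
)

// buildConnString assembles a postgres:// URL and lets net/url escape the
// user, password and query parameters. Hypothetical helper for illustration.
func buildConnString(user, password, host string, port int, dbname string, params map[string]string) string {
	q := url.Values{}
	for k, v := range params {
		q.Set(k, v)
	}

	u := url.URL{
		Scheme:   "postgres",
		User:     url.UserPassword(user, password),
		Host:     fmt.Sprintf("%s:%d", host, port),
		Path:     "/" + dbname,
		RawQuery: q.Encode(),
	}
	return u.String()
}

// passwordOf parses a connection URL and returns the decoded password,
// which is handy for checking that escaping round-trips.
func passwordOf(connString string) (string, error) {
	u, err := url.Parse(connString)
	if err != nil {
		return "", err
	}
	pw, _ := u.User.Password()
	return pw, nil
}

func main() {
	connString := buildConnString("postgres", "p@ss/word", "localhost", 5432, "test_db", map[string]string{
		"sslmode":     "disable",
		"search_path": "public",
	})
	fmt.Println(connString)

	pw, err := passwordOf(connString)
	fmt.Println(pw, err)
}
```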
Example
package main

import (
	"context"
	"fmt"
)

func main() {
	db, err := openTestConnection(true)
	if err != nil {
		handleExampleError(err)
		return
	}
	defer db.Close()

	// Work with the database...
}

func openTestConnection(resetSchema bool) (*DB, error) {
	// Database code.
	schema := NewSchema()
	schema.MustRegister("customers", Customer{})  // Register the Customer struct as a table named "customers".
	schema.MustRegister("blogs", Blog{})          // Register the Blog struct as a table named "blogs".
	schema.MustRegister("blog_posts", BlogPost{}) // Register the BlogPost struct as a table named "blog_posts".

	// Open a connection to the database using the schema and the connection string.
	db, err := Open(context.Background(), schema, getTestConnString())
	if err != nil {
		return nil, err
	}
	// Let the caller close the database connection pool: defer db.Close()

	if resetSchema {
		// Let's clear the schema, so we can run the tests even if already ran once in the past.
		if err = db.DeleteSchema(context.Background()); err != nil { // DON'T DO THIS ON PRODUCTION.
			return nil, fmt.Errorf("delete schema: %w", err)
		}

		if err = db.CreateSchema(context.Background()); err != nil { // Create the schema in the database if it does not exist.
			return nil, fmt.Errorf("create schema: %w", err)
		}

		if err = db.CheckSchema(context.Background()); err != nil { // Check if the schema in the database matches the schema in the code.
			return nil, fmt.Errorf("check schema: %w", err)
		}
	}

	return db, nil
}

func openEmptyTestConnection() (*DB, error) { // without a schema.
	schema := NewSchema()
	// Open a connection to the database using the schema and the connection string.
	return Open(context.Background(), schema, getTestConnString())
}

func createTestConnectionSchema() error {
	db, err := openTestConnection(true)
	if err != nil {
		return err
	}

	db.Close()
	return nil
}

// getTestConnString returns a connection string for connecting to a test database.
// It uses constants to define the host, port, user, password, schema, dbname, and sslmode parameters.
func getTestConnString() string {
	const (
		host     = "localhost" // The host name or IP address of the database server.
		port     = 5432        // The port number of the database server.
		user     = "postgres"  // The user name to connect to the database with.
		password = "admin!123" // The password to connect to the database with.
		schema   = "public"    // The schema name to use in the database.
		dbname   = "test_db"   // The database name to connect to.
		sslMode  = "disable"   // The SSL mode to use for the connection. Can be disable, require, verify-ca or verify-full.
	)

	connString := fmt.Sprintf("host=%s port=%d user=%s password=%s search_path=%s dbname=%s sslmode=%s",
		host, port, user, password, schema, dbname, sslMode) // Format the connection string using the parameters.

	return connString // Return the connection string.
}
Output:

func OpenPool

func OpenPool(schema *Schema, pool *pgxpool.Pool) *DB

OpenPool creates a new DB instance with the given schema and pool. It copies the connection config from the pool and sets the search path and schema fields of the DB instance. It returns a pointer to the DB instance.

Use the `Open` function instead to create a new DB instance from a connection string.

func (*DB) Begin

func (db *DB) Begin(ctx context.Context) (*DB, error)

Begin starts a new database transaction and returns a new DB instance that operates within that transaction.

func (*DB) CheckSchema

func (db *DB) CheckSchema(ctx context.Context) error

CheckSchema checks if the database schema matches the expected table definitions by querying the information schema and comparing the results.

func (*DB) Close

func (db *DB) Close()

Close closes the database connection pool and its transactions.

func (*DB) Commit

func (db *DB) Commit(ctx context.Context) error

Commit commits the current database transaction and returns any error that occurs.

func (*DB) CreateSchema

func (db *DB) CreateSchema(ctx context.Context) error

CreateSchema creates the database schema by executing a series of SQL commands in a transaction.

func (*DB) CreateSchemaDumpSQL

func (db *DB) CreateSchemaDumpSQL(ctx context.Context) (string, error)

CreateSchemaDumpSQL dumps the SQL commands for creating the database schema.

func (*DB) Delete

func (db *DB) Delete(ctx context.Context, values ...any) (int64, error)

Delete deletes one or more values from the database by building and executing an SQL query based on the values and the table definition.

func (*DB) DeleteSchema

func (db *DB) DeleteSchema(ctx context.Context) error

DeleteSchema drops the database schema.

func (*DB) DisableAutoVacuum

func (db *DB) DisableAutoVacuum(ctx context.Context) error

DisableAutoVacuum disables autovacuum for the whole database.

func (*DB) DisableTableAutoVacuum

func (db *DB) DisableTableAutoVacuum(ctx context.Context, tableName string) error

DisableTableAutoVacuum disables autovacuum for a specific table.

func (*DB) Duplicate (added in v1.0.6)

func (db *DB) Duplicate(ctx context.Context, value any, idPtr any) error

Duplicate duplicates a row in the database by building and executing an SQL query based on the value's primary key (uses SELECT for insert column values). The idPtr parameter can be used to get the primary key value of the inserted row. If idPtr is nil, the primary key value is not returned. If the value is nil, the method returns nil.

func (*DB) Exec

func (db *DB) Exec(ctx context.Context, query string, args ...any) (pgconn.CommandTag, error)

Exec executes SQL. The query can be either a prepared statement name or an SQL string. Arguments should be referenced positionally from the sql "query" string as $1, $2, etc.
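To illustrate positional references, here is a small sketch that generates the $1, $2, ... placeholders for an INSERT statement. The buildInsert helper is hypothetical and not part of this package; the table and column names are passed through unescaped, so they are assumed to be trusted.

```go
package main

import (
	"fmt"
	"strings"
)

// buildInsert is a hypothetical helper, not part of the pg package.
// It produces one positional placeholder ($1, $2, ...) per column.
func buildInsert(table string, columns []string) string {
	placeholders := make([]string, len(columns))
	for i := range columns {
		placeholders[i] = fmt.Sprintf("$%d", i+1) // placeholders are 1-based
	}
	return fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
		table, strings.Join(columns, ", "), strings.Join(placeholders, ", "))
}

func main() {
	query := buildInsert("customers", []string{"email", "name"})
	fmt.Println(query)
	// The arguments would then be passed in the same order as the columns,
	// for example: db.Exec(ctx, query, "makis@example.com", "Makis")
}
```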

func (*DB) ExecFiles

func (db *DB) ExecFiles(ctx context.Context, fileReader interface {
	ReadFile(name string) ([]byte, error)
}, filenames ...string) error

ExecFiles executes the SQL statements in the given files.

Example:

//go:embed _embed
var embedDir embed.FS

[...]
err := db.ExecFiles(context.Background(), embedDir, "_embed/triggers.sql", "_embed/functions.sql")
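Because the fileReader parameter only requires a ReadFile method, any value with that method can be passed, not just an embed.FS. The sketch below uses fstest.MapFS from the standard library as an in-memory source, which can be handy in tests. The loadSQL function is a hypothetical stand-in that only reads the files in order; it does not execute anything and is not how ExecFiles is implemented.

```go
package main

import (
	"fmt"
	"testing/fstest"
)

// fileReader mirrors the anonymous interface in the ExecFiles signature.
type fileReader interface {
	ReadFile(name string) ([]byte, error)
}

// loadSQL is a hypothetical stand-in for illustration only: it reads each
// file in the given order and stops at the first read error.
func loadSQL(fr fileReader, filenames ...string) ([]string, error) {
	statements := make([]string, 0, len(filenames))
	for _, name := range filenames {
		b, err := fr.ReadFile(name)
		if err != nil {
			return nil, fmt.Errorf("read %s: %w", name, err)
		}
		statements = append(statements, string(b))
	}
	return statements, nil
}

// sampleFiles returns an in-memory file set that satisfies fileReader.
func sampleFiles() fileReader {
	return fstest.MapFS{
		"_embed/triggers.sql":  {Data: []byte("-- triggers")},
		"_embed/functions.sql": {Data: []byte("-- functions")},
	}
}

func main() {
	stmts, err := loadSQL(sampleFiles(), "_embed/triggers.sql", "_embed/functions.sql")
	fmt.Println(len(stmts), err)

	_, err = loadSQL(sampleFiles(), "_embed/missing.sql")
	fmt.Println(err != nil)
}
```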

func (*DB) Exists

func (db *DB) Exists(ctx context.Context, value any) (bool, error)

Exists returns true if a row exists in the table that matches the given value's non-zero fields or false otherwise.

func (*DB) GetSize (added in v1.0.7)

func (db *DB) GetSize(ctx context.Context) (t SizeInfo, err error)

GetSize returns the total size of all the database tables.

func (*DB) GetVersion

func (db *DB) GetVersion(ctx context.Context) (string, error)

GetVersion returns the version number of the PostgreSQL database as a string.

func (*DB) InTransaction

func (db *DB) InTransaction(ctx context.Context, fn func(*DB) error) error

InTransaction runs a function within a database transaction and commits or rolls back depending on the error value returned by the function. Note that the transaction is rolled back after the first error. After the first error in query execution, the transaction is aborted and no new commands should be sent through it.
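The commit-or-rollback behaviour can be sketched with an in-memory stand-in. Everything below (the tx interface, runInTx, fakeTx and errBoom) is hypothetical and exists only to show the control flow; it is not this package's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// tx is a hypothetical minimal transaction interface used only in this sketch.
type tx interface {
	Commit() error
	Rollback() error
}

// runInTx commits when fn returns nil and rolls back otherwise.
// The error from fn is always reported to the caller.
func runInTx(t tx, fn func() error) error {
	if err := fn(); err != nil {
		if rbErr := t.Rollback(); rbErr != nil {
			return errors.Join(err, rbErr)
		}
		return err
	}
	return t.Commit()
}

// fakeTx records which method was called instead of talking to a database.
type fakeTx struct{ calls []string }

func (f *fakeTx) Commit() error   { f.calls = append(f.calls, "commit"); return nil }
func (f *fakeTx) Rollback() error { f.calls = append(f.calls, "rollback"); return nil }

// errBoom is a sample failure returned by the transactional function.
var errBoom = errors.New("insert failed")

func main() {
	ok := &fakeTx{}
	fmt.Println(runInTx(ok, func() error { return nil }), ok.calls)

	failed := &fakeTx{}
	fmt.Println(runInTx(failed, func() error { return errBoom }), failed.calls)
}
```

With the real API the function passed to InTransaction receives a *DB bound to the transaction, so all statements inside it should go through that value.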

func (*DB) Insert

func (db *DB) Insert(ctx context.Context, values ...any) error

Insert inserts one or more values into the database by calling db.InsertSingle for each value within a transaction.

func (*DB) InsertSingle

func (db *DB) InsertSingle(ctx context.Context, value any, idPtr any) error

InsertSingle inserts a single value into the database by building and executing an SQL query based on the value and the table definition.

func (*DB) IsAutoVacuumEnabled

func (db *DB) IsAutoVacuumEnabled(ctx context.Context) (enabled bool, err error)

IsAutoVacuumEnabled returns true if autovacuum is enabled for the database.

Read more about autovacuum at: https://www.postgresql.org/docs/current/runtime-config-autovacuum.html.

func (*DB) IsTransaction

func (db *DB) IsTransaction() bool

IsTransaction reports whether this database instance is in a transaction.

func (*DB) ListColumns

func (db *DB) ListColumns(ctx context.Context, tableNames ...string) ([]*desc.Column, error)

ListColumns returns a list of column definitions for the given table names.

Example
db, err := openTestConnection(true)
if err != nil {
	handleExampleError(err)
	return
}
defer db.Close()

columns, err := db.ListColumns(context.Background())
if err != nil {
	handleExampleError(err)
	return
}

expectedTags := []string{
	`[blog_posts.id] pg:"name=id,type=uuid,primary,default=gen_random_uuid()"`,
	`[blog_posts.created_at] pg:"name=created_at,type=timestamp,default=clock_timestamp()"`,
	`[blog_posts.updated_at] pg:"name=updated_at,type=timestamp,default=clock_timestamp()"`,
	`[blog_posts.blog_id] pg:"name=blog_id,type=uuid,ref=blogs(id CASCADE deferrable),index=btree"`,
	`[blog_posts.title] pg:"name=title,type=varchar(255),unique_index=uk_blog_post"`,
	`[blog_posts.photo_url] pg:"name=photo_url,type=varchar(255)"`,
	`[blog_posts.source_url] pg:"name=source_url,type=varchar(255),unique_index=uk_blog_post"`,
	`[blog_posts.read_time_minutes] pg:"name=read_time_minutes,type=smallint,default=1,check=read_time_minutes > 0"`,
	`[blog_posts.category] pg:"name=category,type=smallint,default=0"`,
	`[blog_posts.search_terms] pg:"name=search_terms,type=varchar[]"`,
	`[blog_posts.read_durations] pg:"name=read_durations,type=bigint[]"`,
	`[blog_posts.feature] pg:"name=feature,type=jsonb"`,
	`[blog_posts.other_features] pg:"name=other_features,type=jsonb"`,
	`[blog_posts.tags] pg:"name=tags,type=jsonb"`,
	`[blogs.id] pg:"name=id,type=uuid,primary,default=gen_random_uuid()"`,
	`[blogs.created_at] pg:"name=created_at,type=timestamp,default=clock_timestamp()"`,
	`[blogs.updated_at] pg:"name=updated_at,type=timestamp,default=clock_timestamp()"`,
	`[blogs.name] pg:"name=name,type=varchar(255)"`,
	`[customers.id] pg:"name=id,type=uuid,primary,default=gen_random_uuid()"`,
	`[customers.created_at] pg:"name=created_at,type=timestamp,default=clock_timestamp()"`,
	`[customers.updated_at] pg:"name=updated_at,type=timestamp,default=clock_timestamp()"`,
	`[customers.cognito_user_id] pg:"name=cognito_user_id,type=uuid,unique_index=customer_unique_idx"`,
	`[customers.email] pg:"name=email,type=varchar(255),unique_index=customer_unique_idx"`,
	`[customers.name] pg:"name=name,type=varchar(255),index=btree"`,
	`[customers.username] pg:"name=username,type=varchar(255),default=''::character varying"`, // before columns convert from struct field should match this.
}

if len(columns) != len(expectedTags) {
	fmt.Printf("expected %d columns but got %d\n%s", len(expectedTags), len(columns), strings.Join(expectedTags, "\n"))
	fmt.Println("\n=========")
	for _, c := range columns {
		fmt.Println(c.Name)
	}
	return
}

for i, column := range columns {
	expected := expectedTags[i]
	got := fmt.Sprintf("[%s.%s] %s", column.TableName, column.Name, column.FieldTagString(true))

	if expected != got {
		fmt.Printf("expected tag:\n%s\nbut got:\n%s\n", expected, got)
	}
}

fmt.Println("OK")
Output:

OK

func (*DB) ListColumnsInformationSchema

func (db *DB) ListColumnsInformationSchema(ctx context.Context, tableNames ...string) ([]*desc.ColumnBasicInfo, error)

ListColumnsInformationSchema returns a list of basic column information for the given table names, or for all tables.

Example
if err := createTestConnectionSchema(); err != nil {
	handleExampleError(err)
	return
}

db, err := openEmptyTestConnection()
if err != nil {
	handleExampleError(err)
	return
}
defer db.Close()

columns, err := db.ListColumnsInformationSchema(context.Background())
if err != nil {
	handleExampleError(err)
	return
}

for _, column := range columns {
	fmt.Printf("%#+v\n", column)
}
Output:

&desc.ColumnBasicInfo{TableName:"blog_posts", TableDescription:"", TableType:0x0, Name:"id", OrdinalPosition:1, Description:"", Default:"gen_random_uuid()", DataType:0x31, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false}
&desc.ColumnBasicInfo{TableName:"blog_posts", TableDescription:"", TableType:0x0, Name:"created_at", OrdinalPosition:2, Description:"", Default:"clock_timestamp()", DataType:0x2c, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false}
&desc.ColumnBasicInfo{TableName:"blog_posts", TableDescription:"", TableType:0x0, Name:"updated_at", OrdinalPosition:3, Description:"", Default:"clock_timestamp()", DataType:0x2c, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false}
&desc.ColumnBasicInfo{TableName:"blog_posts", TableDescription:"", TableType:0x0, Name:"blog_id", OrdinalPosition:4, Description:"", Default:"", DataType:0x31, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false}
&desc.ColumnBasicInfo{TableName:"blog_posts", TableDescription:"", TableType:0x0, Name:"title", OrdinalPosition:5, Description:"", Default:"", DataType:0xb, DataTypeArgument:"255", IsNullable:false, IsIdentity:false, IsGenerated:false}
&desc.ColumnBasicInfo{TableName:"blog_posts", TableDescription:"", TableType:0x0, Name:"photo_url", OrdinalPosition:6, Description:"", Default:"", DataType:0xb, DataTypeArgument:"255", IsNullable:false, IsIdentity:false, IsGenerated:false}
&desc.ColumnBasicInfo{TableName:"blog_posts", TableDescription:"", TableType:0x0, Name:"source_url", OrdinalPosition:7, Description:"", Default:"", DataType:0xb, DataTypeArgument:"255", IsNullable:false, IsIdentity:false, IsGenerated:false}
&desc.ColumnBasicInfo{TableName:"blog_posts", TableDescription:"", TableType:0x0, Name:"read_time_minutes", OrdinalPosition:8, Description:"", Default:"1", DataType:0x24, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false}
&desc.ColumnBasicInfo{TableName:"blog_posts", TableDescription:"", TableType:0x0, Name:"category", OrdinalPosition:9, Description:"", Default:"0", DataType:0x24, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false}
&desc.ColumnBasicInfo{TableName:"blog_posts", TableDescription:"", TableType:0x0, Name:"search_terms", OrdinalPosition:10, Description:"", Default:"", DataType:0xc, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false}
&desc.ColumnBasicInfo{TableName:"blog_posts", TableDescription:"", TableType:0x0, Name:"read_durations", OrdinalPosition:11, Description:"", Default:"", DataType:0x2, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false}
&desc.ColumnBasicInfo{TableName:"blog_posts", TableDescription:"", TableType:0x0, Name:"feature", OrdinalPosition:12, Description:"", Default:"", DataType:0x18, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false}
&desc.ColumnBasicInfo{TableName:"blog_posts", TableDescription:"", TableType:0x0, Name:"other_features", OrdinalPosition:13, Description:"", Default:"", DataType:0x18, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false}
&desc.ColumnBasicInfo{TableName:"blog_posts", TableDescription:"", TableType:0x0, Name:"tags", OrdinalPosition:14, Description:"", Default:"", DataType:0x18, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false}
&desc.ColumnBasicInfo{TableName:"blogs", TableDescription:"", TableType:0x0, Name:"id", OrdinalPosition:1, Description:"", Default:"gen_random_uuid()", DataType:0x31, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false}
&desc.ColumnBasicInfo{TableName:"blogs", TableDescription:"", TableType:0x0, Name:"created_at", OrdinalPosition:2, Description:"", Default:"clock_timestamp()", DataType:0x2c, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false}
&desc.ColumnBasicInfo{TableName:"blogs", TableDescription:"", TableType:0x0, Name:"updated_at", OrdinalPosition:3, Description:"", Default:"clock_timestamp()", DataType:0x2c, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false}
&desc.ColumnBasicInfo{TableName:"blogs", TableDescription:"", TableType:0x0, Name:"name", OrdinalPosition:4, Description:"", Default:"", DataType:0xb, DataTypeArgument:"255", IsNullable:false, IsIdentity:false, IsGenerated:false}
&desc.ColumnBasicInfo{TableName:"customers", TableDescription:"", TableType:0x0, Name:"id", OrdinalPosition:1, Description:"", Default:"gen_random_uuid()", DataType:0x31, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false}
&desc.ColumnBasicInfo{TableName:"customers", TableDescription:"", TableType:0x0, Name:"created_at", OrdinalPosition:2, Description:"", Default:"clock_timestamp()", DataType:0x2c, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false}
&desc.ColumnBasicInfo{TableName:"customers", TableDescription:"", TableType:0x0, Name:"updated_at", OrdinalPosition:3, Description:"", Default:"clock_timestamp()", DataType:0x2c, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false}
&desc.ColumnBasicInfo{TableName:"customers", TableDescription:"", TableType:0x0, Name:"cognito_user_id", OrdinalPosition:4, Description:"", Default:"", DataType:0x31, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false}
&desc.ColumnBasicInfo{TableName:"customers", TableDescription:"", TableType:0x0, Name:"email", OrdinalPosition:5, Description:"", Default:"", DataType:0xb, DataTypeArgument:"255", IsNullable:false, IsIdentity:false, IsGenerated:false}
&desc.ColumnBasicInfo{TableName:"customers", TableDescription:"", TableType:0x0, Name:"name", OrdinalPosition:6, Description:"", Default:"", DataType:0xb, DataTypeArgument:"255", IsNullable:false, IsIdentity:false, IsGenerated:false}
&desc.ColumnBasicInfo{TableName:"customers", TableDescription:"", TableType:0x0, Name:"username", OrdinalPosition:7, Description:"", Default:"''::character varying", DataType:0xb, DataTypeArgument:"255", IsNullable:false, IsIdentity:false, IsGenerated:false}

func (*DB) ListConstraints

func (db *DB) ListConstraints(ctx context.Context, tableNames ...string) ([]*desc.Constraint, error)

ListConstraints returns a list of constraint definitions in the database schema by querying the pg_constraint table.

Example
connString := getTestConnString()
schema := NewSchema()
db, err := Open(context.Background(), schema, connString)
if err != nil {
	handleExampleError(err)
	return
}
defer db.Close()

/*
	table_name	column_name	constraint_name	constraint_type	constraint_definition	index_type
	blog_posts		blog_posts_blog_id_fkey	i	CREATE INDEX blog_posts_blog_id_fkey ON public.blog_posts USING btree (blog_id)
	blog_posts	blog_id	blog_posts_blog_id_fkey	f	FOREIGN KEY (blog_id) REFERENCES blogs(id) ON DELETE CASCADE DEFERRABLE
	blog_posts	id	blog_posts_pkey	p	PRIMARY KEY (id)	btree
	blog_posts	read_time_minutes	blog_posts_read_time_minutes_check	c	CHECK ((read_time_minutes > 0))
	blog_posts	source_url	uk_blog_post	u	UNIQUE (title, source_url)	btree
	blog_posts	title	uk_blog_post	u	UNIQUE (title, source_url)	btree
	blogs	id	blogs_pkey	p	PRIMARY KEY (id)	btree
	customers		customers_name_idx	i	CREATE INDEX customers_name_idx ON public.customers USING btree (name)
	customers	cognito_user_id	customer_unique_idx	u	UNIQUE (cognito_user_id, email)	btree
	customers	email	customer_unique_idx	u	UNIQUE (cognito_user_id, email)	btree
	customers	id	customers_pkey	p	PRIMARY KEY (id)	btree
*/
var expectedConstraints = []*desc.Constraint{
	{
		TableName:      "blog_posts",
		ColumnName:     "blog_id",
		ConstraintName: "blog_posts_blog_id_fkey",
		ConstraintType: desc.IndexConstraintType,
		IndexType:      desc.Btree,
	},
	{
		TableName:      "blog_posts",
		ColumnName:     "blog_id",
		ConstraintName: "blog_posts_blog_id_fkey",
		ConstraintType: desc.ForeignKeyConstraintType,
		ForeignKey: &desc.ForeignKeyConstraint{
			ColumnName:          "blog_id",
			ReferenceTableName:  "blogs",
			ReferenceColumnName: "id",
			OnDelete:            "CASCADE",
			Deferrable:          true,
		},
	},
	{
		TableName:      "blog_posts",
		ColumnName:     "id",
		ConstraintName: "blog_posts_pkey",
		ConstraintType: desc.PrimaryKeyConstraintType,
		IndexType:      desc.Btree,
	},
	{
		TableName:      "blog_posts",
		ColumnName:     "read_time_minutes",
		ConstraintName: "blog_posts_read_time_minutes_check",
		ConstraintType: desc.CheckConstraintType,
		Check: &desc.CheckConstraint{
			Expression: "read_time_minutes > 0",
		},
	},
	{
		TableName:      "blog_posts",
		ColumnName:     "source_url",
		ConstraintName: "uk_blog_post",
		ConstraintType: desc.UniqueConstraintType,
		IndexType:      desc.Btree,
		Unique: &desc.UniqueConstraint{
			Columns: []string{"title", "source_url"},
		},
	},
	{
		TableName:      "blog_posts",
		ColumnName:     "title",
		ConstraintName: "uk_blog_post",
		ConstraintType: desc.UniqueConstraintType,
		IndexType:      desc.Btree,
		Unique: &desc.UniqueConstraint{
			Columns: []string{"title", "source_url"},
		},
	},
	{
		TableName:      "blogs",
		ColumnName:     "id",
		ConstraintName: "blogs_pkey",
		ConstraintType: desc.PrimaryKeyConstraintType,
		IndexType:      desc.Btree,
	},
	{
		TableName:      "customers",
		ColumnName:     "name",
		ConstraintName: "customers_name_idx",
		ConstraintType: desc.IndexConstraintType,
		IndexType:      desc.Btree,
	},
	{
		TableName:      "customers",
		ColumnName:     "cognito_user_id",
		ConstraintName: "customer_unique_idx",
		ConstraintType: desc.UniqueConstraintType,
		IndexType:      desc.Btree,
		Unique: &desc.UniqueConstraint{
			Columns: []string{"cognito_user_id", "email"},
		},
	},
	{
		TableName:      "customers",
		ColumnName:     "email",
		ConstraintName: "customer_unique_idx",
		ConstraintType: desc.UniqueConstraintType,
		IndexType:      desc.Btree,
		Unique: &desc.UniqueConstraint{
			Columns: []string{"cognito_user_id", "email"},
		},
	},
	{
		TableName:      "customers",
		ColumnName:     "id",
		ConstraintName: "customers_pkey",
		ConstraintType: desc.PrimaryKeyConstraintType,
		IndexType:      desc.Btree,
	},
}

columns, err := db.ListConstraints(context.Background())
if err != nil {
	handleExampleError(err)
	return
}

for i, got := range columns {
	expected := expectedConstraints[i]
	if !reflect.DeepEqual(expected, got) {

		if expected.ForeignKey != nil && got.ForeignKey != nil {
			if !reflect.DeepEqual(expected.ForeignKey, got.ForeignKey) {
				fmt.Printf("expected foreign key:\n%#+v\nbut got:\n%#+v", expected.ForeignKey, got.ForeignKey)
			}
			continue
		}
		if expected.Unique != nil && got.Unique != nil {
			if !reflect.DeepEqual(expected.Unique, got.Unique) {
				fmt.Printf("expected unique:\n%#+v\nbut got:\n%#+v", expected.Unique, got.Unique)
			}
			continue
		}

		if expected.Check != nil && got.Check != nil {
			if !reflect.DeepEqual(expected.Check, got.Check) {
				fmt.Printf("expected check:\n%#+v\nbut got:\n%#+v", expected.Check, got.Check)
			}
			continue
		}

		fmt.Printf("expected:\n%#+v\nbut got:\n%#+v", expected, got)
	}
}

fmt.Println("OK")
Output:

OK

func (*DB) ListTableSizes (added in v1.0.7)

func (db *DB) ListTableSizes(ctx context.Context) ([]TableSizeInfo, error)

ListTableSizes lists the disk size of tables (not only the registered ones) in the database.

func (*DB) ListTables

func (db *DB) ListTables(ctx context.Context, opts ListTablesOptions) ([]*desc.Table, error)

ListTables returns a list of converted table definitions from the remote database schema.

func (*DB) ListTriggers

func (db *DB) ListTriggers(ctx context.Context) ([]*desc.Trigger, error)

ListTriggers returns a list of triggers in the database for a given set of tables. The method takes a context and returns a slice of Trigger pointers and an error, if any.

func (*DB) ListUniqueIndexes (added in v1.0.6)

func (db *DB) ListUniqueIndexes(ctx context.Context, tableNames ...string) ([]*desc.UniqueIndex, error)

ListUniqueIndexes returns a list of unique indexes in the database schema by querying the pg_index table and filtering the results to only include unique indexes.

func (*DB) Listen

func (db *DB) Listen(ctx context.Context, channel string) (*Listener, error)

Listen listens for notifications on the given channel and returns a Listener instance.

Example Code:

conn, err := db.Listen(context.Background(), channel)
if err != nil {
	fmt.Println(fmt.Errorf("listen: %w\n", err))
	return
}

// To just terminate this listener's connection and unlisten from the channel:
defer conn.Close(context.Background())

for {
	notification, err := conn.Accept(context.Background())
	if err != nil {
		fmt.Println(fmt.Errorf("accept: %w\n", err))
		return
	}

	fmt.Printf("channel: %s, payload: %s\n", notification.Channel, notification.Payload)
}
Example

db, err := openEmptyTestConnection()
if err != nil {
	handleExampleError(err)
	return
}
// defer db.Close()

const channel = "chat_db"

conn, err := db.Listen(context.Background(), channel)
if err != nil {
	fmt.Println(fmt.Errorf("listen: %w\n", err))
	return
}

go func() {
	// To just terminate this listener's connection and unlisten from the channel:
	defer conn.Close(context.Background())

	for {
		notification, err := conn.Accept(context.Background())
		if err != nil {
			fmt.Println(fmt.Errorf("accept: %w\n", err))
			return
		}

		fmt.Printf("channel: %s, payload: %s\n", notification.Channel, notification.Payload)
	}
}()

if err = db.Notify(context.Background(), channel, "hello"); err != nil {
	fmt.Println(fmt.Errorf("notify: hello: %w", err))
	return
}

if err = db.Notify(context.Background(), channel, "world"); err != nil {
	fmt.Println(fmt.Errorf("notify: world: %w", err))
	return
}

time.Sleep(5 * time.Second) // give it some time to receive the notifications.
Output:

channel: chat_db, payload: hello
channel: chat_db, payload: world

func (*DB) ListenTable (added in v1.0.7)

func (db *DB) ListenTable(ctx context.Context, opts *ListenTableOptions, callback func(TableNotificationJSON, error) error) (Closer, error)

ListenTable registers a callback that is notified of changes (INSERT, UPDATE, DELETE) on the given tables. The subscribed postgres channel is named 'table_change_notifications' by default.

The callback function can return nil to continue listening, or any error to stop the listener.

The New and Old fields of TableNotificationJSON are raw JSON values; use json.Unmarshal to decode them to the actual type.
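A minimal sketch of decoding such raw JSON values with encoding/json. The tableEvent and customer types, the decodeRow helper and the JSON field names ("id", "name") are assumptions made for illustration; they are not this package's types. An INSERT carries no old row, so the sketch treats an empty or null value as "no row".

```go
package main

import (
	"encoding/json"
	"fmt"
)

// tableEvent is a hypothetical stand-in for the notification value;
// only the raw JSON fields matter for this sketch.
type tableEvent struct {
	Table  string
	Change string
	New    json.RawMessage
	Old    json.RawMessage
}

// customer is an assumed shape of the row; the JSON keys are illustrative.
type customer struct {
	ID   string `json:"id"`
	Name string `json:"name"`
}

// decodeRow decodes a raw JSON row. It reports false when the value is
// empty or the JSON literal null (for example the old row of an INSERT).
func decodeRow(raw json.RawMessage) (customer, bool, error) {
	var c customer
	if len(raw) == 0 || string(raw) == "null" {
		return c, false, nil
	}
	if err := json.Unmarshal(raw, &c); err != nil {
		return c, false, err
	}
	return c, true, nil
}

func main() {
	evt := tableEvent{
		Table:  "customers",
		Change: "INSERT",
		New:    json.RawMessage(`{"id":"1","name":"Makis"}`),
		Old:    json.RawMessage(`null`),
	}

	newRow, hasNew, err := decodeRow(evt.New)
	fmt.Println(newRow.Name, hasNew, err)

	_, hasOld, err := decodeRow(evt.Old)
	fmt.Println(hasOld, err)
}
```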

Example
db, err := openTestConnection(true)
if err != nil {
	handleExampleError(err)
	return
}
defer db.Close()

opts := &ListenTableOptions{
	Tables: map[string][]TableChangeType{"customers": defaultChangesToWatch},
}
closer, err := db.ListenTable(context.Background(), opts, func(evt TableNotificationJSON, err error) error {
	if err != nil {
		fmt.Printf("received error: %v\n", err)
		return err
	}

	if evt.Change == "INSERT" {
		fmt.Printf("table: %s, event: %s, old: %s\n", evt.Table, evt.Change, string(evt.Old)) // new can't be predicted because of its generated ID and timestamps.
	} else {
		fmt.Printf("table: %s, event: %s\n", evt.Table, evt.Change)
	}

	return nil
})
if err != nil {
	fmt.Println(err)
	return
}
defer closer.Close(context.Background())

newCustomer := Customer{
	CognitoUserID: "766064d4-a2a7-442d-aa75-33493bb4dbb9",
	Email:         "kataras2024@hotmail.com",
	Name:          "Makis",
}
err = db.InsertSingle(context.Background(), newCustomer, &newCustomer.ID)
if err != nil {
	fmt.Println(err)
	return
}

newCustomer.Name = "Makis_UPDATED"
_, err = db.UpdateOnlyColumns(context.Background(), []string{"name"}, newCustomer)
if err != nil {
	fmt.Println(err)
	return
}
time.Sleep(8 * time.Second) // give it some time to receive the notifications.
Output:

table: customers, event: INSERT, old: null
table: customers, event: UPDATE

func (*DB) Notify

func (db *DB) Notify(ctx context.Context, channel string, payload any) error

Notify sends a notification using pg_notify to the database.

See the `Listen` method too.

func (*DB) PoolStat (added in v1.0.6)

func (db *DB) PoolStat() PoolStat

PoolStat returns a snapshot of the database pool statistics. The returned structure can be represented through JSON.

func (*DB) PrepareListenTable (added in v1.0.7)

func (db *DB) PrepareListenTable(ctx context.Context, opts *ListenTableOptions) error

PrepareListenTable prepares the table for listening for live table updates. See "db.ListenTable" method for more.

func (*DB) Query

func (db *DB) Query(ctx context.Context, query string, args ...any) (Rows, error)

Query executes the given "query" with args. If an error occurs, the returned Rows is in an error state.

func (*DB) QueryBoolean (added in v1.0.6)

func (db *DB) QueryBoolean(ctx context.Context, query string, args ...any) (ok bool, err error)

QueryBoolean executes a query that returns a single boolean value and returns it as a bool and an error.

func (*DB) QueryRow

func (db *DB) QueryRow(ctx context.Context, query string, args ...any) Row

QueryRow is a convenience wrapper over Query. Any error that occurs while querying is deferred until calling Scan on the returned Row. That Row will error with ErrNoRows if no rows are returned.

func (*DB) Rollback

func (db *DB) Rollback(ctx context.Context) error

Rollback rolls back the current database transaction and returns any error that occurs.

func (*DB) Schema (added in v1.0.5)

func (db *DB) Schema() *Schema

Schema returns the Schema instance of the database. It should NOT be modified by the caller.

func (*DB) SearchPath

func (db *DB) SearchPath() string

SearchPath returns the search path of the database.

func (*DB) Select

func (db *DB) Select(ctx context.Context, scannerFunc func(Rows) error, query string, args ...any) error

Select executes a query that returns rows and calls the scanner function on them. It takes a context, a scanner function, a query string and a variadic list of arguments as parameters. It returns an error if the query fails, the scanner function returns an error, or the rows have an error.

func (*DB) SelectByID

func (db *DB) SelectByID(ctx context.Context, destPtr any, id any) error

SelectByID selects a row from a table by matching the primary key column with the given argument.

func (*DB) SelectByUsernameAndPassword

func (db *DB) SelectByUsernameAndPassword(ctx context.Context, destPtr any, username, plainPassword string) error

SelectByUsernameAndPassword selects a row from a table by matching the username and password columns with the given arguments and scans the row into the destPtr variable.

func (*DB) Unlisten

func (db *DB) Unlisten(ctx context.Context, channel string) error

Unlisten removes the given channel from the list of channels that the database is listening on. Available channels:

- Any custom one
- * (for all)

func (*DB) Update

func (db *DB) Update(ctx context.Context, values ...any) (int64, error)

Update updates one or more values in the database by building and executing an SQL query based on the values and the table definition.

func (*DB) UpdateExceptColumns (added in v1.0.3)

func (db *DB) UpdateExceptColumns(ctx context.Context, columnsToExcept []string, values ...any) (int64, error)

UpdateExceptColumns updates one or more values in the database by building and executing an SQL query based on the values and the table definition.

The columnsToExcept parameter can be used to specify which columns should NOT be updated.

func (*DB) UpdateJSONB (added in v1.0.8)

func (db *DB) UpdateJSONB(ctx context.Context, tableName, columnName, rowID string, values map[string]any, fieldsToUpdate []string) (int64, error)

UpdateJSONB updates a JSONB column (full or partial) in the database by building and executing an SQL query based on the provided values and the given tableName and columnName. The values parameter is a map of key-value pairs where the key is the json field name and the value is its new value, new keys are accepted. Note that tableName and columnName are not escaped.
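Since tableName and columnName are not escaped, a caller that derives them from external input may want to validate them first. The sketch below is one conservative way to do that; isSafeIdentifier is a hypothetical helper, not part of this package, and its pattern is deliberately stricter than what PostgreSQL itself accepts for identifiers.

```go
package main

import (
	"fmt"
	"regexp"
)

// identRe accepts only ASCII letters, digits and underscores, and the
// first character must not be a digit. This is an assumed, conservative rule.
var identRe = regexp.MustCompile(`^[A-Za-z_][A-Za-z0-9_]*$`)

// isSafeIdentifier is a hypothetical helper that reports whether name can be
// used as a table or column name without quoting or escaping.
func isSafeIdentifier(name string) bool {
	return identRe.MatchString(name)
}

func main() {
	for _, name := range []string{"blog_posts", "feature", "feature; DROP TABLE blogs", ""} {
		fmt.Printf("%q safe: %v\n", name, isSafeIdentifier(name))
	}
}
```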

func (*DB) UpdateOnlyColumns

func (db *DB) UpdateOnlyColumns(ctx context.Context, columnsToUpdate []string, values ...any) (int64, error)

UpdateOnlyColumns updates one or more values in the database by building and executing an SQL query based on the values and the table definition.

The columnsToUpdate parameter can be used to specify which columns should be updated.
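
UpdateOnlyColumns and UpdateExceptColumns differ only in how the column list is narrowed before the query is built. The sketch below illustrates that idea with plain strings; filterColumns and buildSetClause are hypothetical helpers, and the SQL the library actually generates may look different.

```go
package main

import (
	"fmt"
	"strings"
)

// filterColumns is a hypothetical helper (not part of kataras/pg).
// If only is non-empty, just those columns are kept (UpdateOnlyColumns style);
// columns listed in except are always dropped (UpdateExceptColumns style).
func filterColumns(all, only, except []string) []string {
	contains := func(list []string, name string) bool {
		for _, v := range list {
			if v == name {
				return true
			}
		}
		return false
	}

	var out []string
	for _, col := range all {
		if len(only) > 0 && !contains(only, col) {
			continue
		}
		if contains(except, col) {
			continue
		}
		out = append(out, col)
	}
	return out
}

// buildSetClause renders the SET part of an UPDATE statement
// with positional placeholders ($1, $2, ...).
func buildSetClause(cols []string) string {
	parts := make([]string, len(cols))
	for i, col := range cols {
		parts[i] = fmt.Sprintf(`"%s" = $%d`, col, i+1)
	}
	return strings.Join(parts, ", ")
}

func main() {
	all := []string{"email", "name", "username"}
	fmt.Println(buildSetClause(filterColumns(all, []string{"name"}, nil)))
	fmt.Println(buildSetClause(filterColumns(all, nil, []string{"email"})))
}
```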

func (*DB) Upsert

func (db *DB) Upsert(ctx context.Context, forceOnConflictExpr string, values ...any) error

Upsert inserts or updates one or more values into the database by calling db.UpsertSingle for each value within a transaction.

func (*DB) UpsertSingle

func (db *DB) UpsertSingle(ctx context.Context, value any, idPtr any, forceOnConflictExpr string) error

UpsertSingle inserts or updates a single value into the database by building and executing an SQL query based on the value and the table definition.

type DataType

type DataType = desc.DataType

DataType is a type alias for desc.DataType.

type ListTablesOptions

type ListTablesOptions struct {
	TableNames []string

	Filter desc.TableFilter // Filter allows to customize the StructName and its Column field types.
}

ListTablesOptions are the options for listing tables.

type ListenTableOptions added in v1.0.7

type ListenTableOptions struct {
	// Tables map of table name and changes to listen for.
	//
	// Key is the table to listen on for changes.
	// Value is the list of table changes to listen for.
	// Defaults to {"*": ["INSERT", "UPDATE", "DELETE"] }.
	Tables map[string][]TableChangeType

	// Channel is the name of the postgres channel to listen on.
	// Default: "table_change_notifications".
	Channel string

	// Function is the name of the postgres function
	// which is used to notify on table changes, the
	// trigger name is <table_name>_<Function>.
	// Defaults to "table_change_notify".
	Function string
}

ListenTableOptions holds the options for the "DB.ListenTable" method.
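
The defaults and the trigger naming rule described in the field comments can be sketched as follows. This uses a local stand-in struct so that it runs without a database; listenTableOptions, withDefaults and triggerName are hypothetical names, and how the library applies its defaults internally is not shown in this documentation.

```go
package main

import "fmt"

// listenTableOptions is a local stand-in that mirrors the documented fields.
// The real type uses []TableChangeType for the map values.
type listenTableOptions struct {
	Tables   map[string][]string
	Channel  string
	Function string
}

// withDefaults fills empty fields with the defaults stated in the field comments.
func withDefaults(o listenTableOptions) listenTableOptions {
	if len(o.Tables) == 0 {
		o.Tables = map[string][]string{"*": {"INSERT", "UPDATE", "DELETE"}}
	}
	if o.Channel == "" {
		o.Channel = "table_change_notifications"
	}
	if o.Function == "" {
		o.Function = "table_change_notify"
	}
	return o
}

// triggerName follows the documented <table_name>_<Function> convention.
func triggerName(table, function string) string {
	return table + "_" + function
}

func main() {
	opts := withDefaults(listenTableOptions{})
	fmt.Println(opts.Channel)
	fmt.Println(triggerName("customers", opts.Function))
}
```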

type Listener

type Listener struct {
	// contains filtered or unexported fields
}

Listener represents a postgres database LISTEN connection.

func (*Listener) Accept

func (l *Listener) Accept(ctx context.Context) (*Notification, error)

Accept waits for a notification and returns it.

func (*Listener) Close

func (l *Listener) Close(ctx context.Context) error

Close closes the listener connection.

type MapTypeFilter

type MapTypeFilter map[string]any

MapTypeFilter maps filter expressions (as text) to field types. It implements TableFilter.

Example, used in the ListTablesOptions of the ListTables method:

Filter: pg.MapTypeFilter{
	"customer_profiles.fields.jsonb": entity.Fields{},
},

func (MapTypeFilter) FilterTable

func (m MapTypeFilter) FilterTable(t *Table) bool

FilterTable implements the TableFilter interface.

type Notification

type Notification = pgconn.Notification

Notification is a type alias of pgconn.Notification type.

type PoolStat added in v1.0.6

type PoolStat struct {
	// AcquireCount is the cumulative count of successful acquires from the pool.
	AcquireCount int64 `json:"acquire_count"`
	// AcquireDuration is the total duration of all successful acquires from
	// the pool.
	AcquireDuration time.Duration `json:"acquire_duration"`
	// AcquiredConns is the number of currently acquired connections in the pool.
	AcquiredConns int32 `json:"acquired_conns"`
	// CanceledAcquireCount is the cumulative count of acquires from the pool
	// that were canceled by a context.
	CanceledAcquireCount int64 `json:"canceled_acquire_count"`
	// ConstructingConns is the number of conns with construction in progress in
	// the pool.
	ConstructingConns int32 `json:"constructing_conns"`
	// EmptyAcquireCount is the cumulative count of successful acquires from the pool
	// that waited for a resource to be released or constructed because the pool was
	// empty.
	EmptyAcquireCount int64 `json:"empty_acquire_count"`
	// IdleConns is the number of currently idle conns in the pool.
	IdleConns int32 `json:"idle_conns"`
	// MaxConns is the maximum size of the pool.
	MaxConns int32 `json:"max_conns"`
	// TotalConns is the total number of resources currently in the pool.
	// The value is the sum of ConstructingConns, AcquiredConns, and
	// IdleConns.
	TotalConns int32 `json:"total_conns"`
}

PoolStat holds the database pool's statistics.

type Repository

type Repository[T any] struct {
	// contains filtered or unexported fields
}

Repository is a generic type that represents a repository for a specific type T.

func NewRepository

func NewRepository[T any](db *DB) *Repository[T]

NewRepository creates and returns a new Repository instance for a given type T and a DB instance. It panics if the T was not registered to the schema.

Example
package main

import (
	"context"
	"fmt"
)

// Repositories.

// CustomerRepository is a struct that wraps a generic Repository instance with the Customer type parameter.
// It provides methods for accessing and manipulating customer data in the database.
type CustomerRepository struct {
	*Repository[Customer]
}

// NewCustomerRepository creates and returns a new CustomerRepository instance with the given DB instance.
func NewCustomerRepository(db *DB) *CustomerRepository {
	return &CustomerRepository{
		Repository: NewRepository[Customer](db),
	}
}

// InTransaction overrides the pg Repository's InTransaction method to include the custom type of CustomerRepository.
// It takes a context and a function as arguments and executes the function within a database transaction,
// passing it a CustomerRepository instance that uses the transactional DB instance.
func (r *CustomerRepository) InTransaction(ctx context.Context, fn func(*CustomerRepository) error) (err error) {
	if r.DB().IsTransaction() {
		return fn(r)
	}

	return r.DB().InTransaction(ctx, func(db *DB) error {
		txRepository := NewCustomerRepository(db)
		return fn(txRepository)
	})
}

// Exists is a custom method that uses the pg repository's Database instance to execute a query and return a result.
// It takes a context and a cognitoUserID as arguments and checks if there is any customer row with that cognitoUserID in the database.
func (r *CustomerRepository) Exists(ctx context.Context, cognitoUserID string) (exists bool, err error) {
	// query := `SELECT EXISTS(SELECT 1 FROM customers WHERE cognito_user_id = $1)`
	// err = r.QueryRow(ctx, query, cognitoUserID).Scan(&exists)
	// OR:

	exists, err = r.Repository.Exists(ctx, Customer{CognitoUserID: cognitoUserID})
	return
}

// Registry is (optional) a struct that holds references to different repositories for accessing and manipulating data in the database.
// It has a db field that is a pointer to a DB instance, and a customers field that is a pointer to a CustomerRepository instance.
type Registry struct {
	db *DB

	customers *CustomerRepository
	blogs     *Repository[Blog]
	blogPosts *Repository[BlogPost]
}

// NewRegistry creates and returns a new Registry instance with the given DB instance.
// It also initializes the customers field with a new CustomerRepository instance that uses the same DB instance.
func NewRegistry(db *DB) *Registry {
	return &Registry{
		db: db,

		customers: NewCustomerRepository(db),
		blogs:     NewRepository[Blog](db),
		blogPosts: NewRepository[BlogPost](db),
	}
}

// InTransaction overrides the pg Repository's InTransaction method to include the custom type of Registry.
// It takes a context and a function as arguments and executes the function within a database transaction,
// passing it a Registry instance that uses the transactional DB instance.
func (r *Registry) InTransaction(ctx context.Context, fn func(*Registry) error) (err error) {
	if r.db.IsTransaction() {
		return fn(r)
	}

	return r.db.InTransaction(ctx, func(db *DB) error {
		txRegistry := NewRegistry(db)
		return fn(txRegistry)
	})
}

// Customers returns the CustomerRepository instance of the Registry.
func (r *Registry) Customers() *CustomerRepository {
	return r.customers
}

// Blogs returns the Repository instance of the Blog entity.
func (r *Registry) Blogs() *Repository[Blog] {
	return r.blogs
}

// BlogPosts returns the Repository instance of the BlogPost entity.
func (r *Registry) BlogPosts() *Repository[BlogPost] {
	return r.blogPosts
}

func main() {
	db, err := openTestConnection(true)
	if err != nil {
		handleExampleError(err)
		return
	}
	defer db.Close()

	registry := NewRegistry(db)       // Create a new Registry instance with the DB instance.
	customers := registry.Customers() // Get the CustomerRepository instance from the Registry.

	// Repository example code.
	customerToInsert := Customer{ // Create a Customer struct to be inserted into the database.
		CognitoUserID: "373f90eb-00ac-410f-9fe0-1a7058d090ba",
		Email:         "kataras2006@hotmail.com",
		Name:          "kataras",
	}

	err = customers.InsertSingle(context.Background(), customerToInsert, &customerToInsert.ID)
	if err != nil {
		handleExampleError(err)
		return
	}

	fmt.Println(customerToInsert.ID)
}
Output:

func (*Repository[T]) DB

func (repo *Repository[T]) DB() *DB

DB returns the DB instance associated with the Repository instance.

func (*Repository[T]) Delete

func (repo *Repository[T]) Delete(ctx context.Context, values ...T) (int64, error)

Delete deletes one or more values of type T from the database by their primary key values.

func (*Repository[T]) DeleteByID added in v1.0.4

func (repo *Repository[T]) DeleteByID(ctx context.Context, id any) (bool, error)

DeleteByID deletes a single row from a table by matching the id column with the given argument and reports whether the entry was removed or not.

The difference between Delete and DeleteByID is that DeleteByID accepts just the id value instead of the whole entity structure value.

func (*Repository[T]) Duplicate added in v1.0.6

func (repo *Repository[T]) Duplicate(ctx context.Context, id any, newIDPtr any) error

Duplicate duplicates a row from a table by matching the id column with the given argument. The newIDPtr parameter can be used to get the primary key value of the inserted row. If newIDPtr is nil, the primary key value is not returned. If the value is nil, the method returns nil.

func (*Repository[T]) Exec

func (repo *Repository[T]) Exec(ctx context.Context, query string, args ...any) (pgconn.CommandTag, error)

Exec executes a query that does not return rows and returns a command tag and an error.

func (*Repository[T]) Exists

func (repo *Repository[T]) Exists(ctx context.Context, value T) (bool, error)

Exists returns true if a row exists in the table that matches the given value's non-zero fields or false otherwise.
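
Matching on non-zero fields means that fields left at their zero value (an empty string, 0, false) cannot act as criteria. The sketch below shows one way such a field selection could work using reflection; nonZeroFields and the local customer type are hypothetical and do not reproduce the library's implementation.

```go
package main

import (
	"fmt"
	"reflect"
)

// customer is a local stand-in for an entity struct.
type customer struct {
	CognitoUserID string
	Email         string
	Name          string
}

// nonZeroFields is a hypothetical helper (not part of kataras/pg).
// It returns the names of the struct fields that hold a non-zero value,
// i.e. the fields that would take part in the match.
// It expects a struct value and panics on other kinds.
func nonZeroFields(v any) []string {
	rv := reflect.ValueOf(v)
	rt := rv.Type()

	var names []string
	for i := 0; i < rt.NumField(); i++ {
		if !rv.Field(i).IsZero() {
			names = append(names, rt.Field(i).Name)
		}
	}
	return names
}

func main() {
	// Only CognitoUserID is set, so only it would be used as a criterion.
	fmt.Println(nonZeroFields(customer{CognitoUserID: "373f90eb-00ac-410f-9fe0-1a7058d090ba"}))
}
```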

func (*Repository[T]) InTransaction

func (repo *Repository[T]) InTransaction(ctx context.Context, fn func(*Repository[T]) error) error

InTransaction runs a function within a database transaction and commits or rolls back depending on the error value returned by the function.

func (*Repository[T]) Insert

func (repo *Repository[T]) Insert(ctx context.Context, values ...T) error

Insert inserts one or more values of type T into the database by calling repo.InsertSingle for each value within a transaction.

func (*Repository[T]) InsertSingle

func (repo *Repository[T]) InsertSingle(ctx context.Context, value T, idPtr any) error

InsertSingle inserts a single value of type T into the database by calling repo.db.InsertSingle with the value and the idPtr.

If idPtr is not nil, it receives the primary key value of the inserted row.

func (*Repository[T]) IsReadOnly

func (repo *Repository[T]) IsReadOnly() bool

IsReadOnly returns true if the underlying repository's table is read-only or false otherwise.

func (*Repository[T]) IsTransaction

func (repo *Repository[T]) IsTransaction() bool

IsTransaction returns true if the underlying database is already in a transaction or false otherwise.

func (*Repository[T]) ListenTable added in v1.0.7

func (repo *Repository[T]) ListenTable(ctx context.Context, callback func(TableNotification[T], error) error) (Closer, error)

ListenTable registers a function which notifies on the current table's changes (INSERT, UPDATE, DELETE), the subscribed postgres channel is named 'table_change_notifications'. The callback function is called on a separate goroutine.

The callback function can return ErrStop to stop the listener without actual error. The callback function can return any other error to stop the listener and return the error. The callback function can return nil to continue listening.

Example
db, err := openTestConnection(true)
if err != nil {
	handleExampleError(err)
	return
}
defer db.Close()

customers := NewRepository[Customer](db)

closer, err := customers.ListenTable(context.Background(), func(evt TableNotification[Customer], err error) error {
	if err != nil {
		fmt.Printf("received error: %v\n", err)
		return err
	}

	fmt.Printf("table: %s, event: %s, old name: %s new name: %s\n", evt.Table, evt.Change, evt.Old.Name, evt.New.Name)
	return nil
})
if err != nil {
	fmt.Println(err)
	return
}
defer closer.Close(context.Background())

newCustomer := Customer{
	CognitoUserID: "766064d4-a2a7-442d-aa75-33493bb4dbb9",
	Email:         "kataras2024@hotmail.com",
	Name:          "Makis",
}
err = customers.InsertSingle(context.Background(), newCustomer, &newCustomer.ID)
if err != nil {
	fmt.Println(err)
	return
}

newCustomer.Name = "Makis_UPDATED"
_, err = customers.UpdateOnlyColumns(context.Background(), []string{"name"}, newCustomer)
if err != nil {
	fmt.Println(err)
	return
}
time.Sleep(8 * time.Second) // give it some time to receive the notifications.
Output:

table: customers, event: INSERT, old name:  new name: Makis
table: customers, event: UPDATE, old name: Makis new name: Makis_UPDATED

func (*Repository[T]) Query

func (repo *Repository[T]) Query(ctx context.Context, query string, args ...any) (Rows, error)

Query executes a query that returns multiple rows and returns them as a Rows instance and an error.

func (*Repository[T]) QueryBoolean added in v1.0.6

func (repo *Repository[T]) QueryBoolean(ctx context.Context, query string, args ...any) (bool, error)

QueryBoolean executes a query that returns a single boolean value and returns it as a bool and an error.

func (*Repository[T]) QueryRow

func (repo *Repository[T]) QueryRow(ctx context.Context, query string, args ...any) Row

QueryRow executes a query that returns at most one row and returns it as a Row instance.

func (*Repository[T]) Select

func (repo *Repository[T]) Select(ctx context.Context, query string, args ...any) ([]T, error)

Select executes a SQL query and returns a slice of values of type T that match the query results.

func (*Repository[T]) SelectByID

func (repo *Repository[T]) SelectByID(ctx context.Context, id any) (T, error)

SelectByID selects a row from a table by matching the id column with the given argument and returns the row or ErrNoRows.

func (*Repository[T]) SelectByUsernameAndPassword

func (repo *Repository[T]) SelectByUsernameAndPassword(ctx context.Context, username, plainPassword string) (T, error)

SelectByUsernameAndPassword selects a row from a table by matching the username and password columns with the given arguments and returns the row or ErrNoRows.

func (*Repository[T]) SelectSingle

func (repo *Repository[T]) SelectSingle(ctx context.Context, query string, args ...any) (T, error)

SelectSingle executes a SQL query and returns a single value of type T that matches the query result.

func (*Repository[T]) Table added in v1.0.5

func (repo *Repository[T]) Table() *desc.Table

Table returns the Table definition instance associated with the Repository instance. It should NOT be modified by the caller.

func (*Repository[T]) Update

func (repo *Repository[T]) Update(ctx context.Context, values ...T) (int64, error)

Update updates one or more values of type T in the database by their primary key values.

func (*Repository[T]) UpdateExceptColumns added in v1.0.3

func (repo *Repository[T]) UpdateExceptColumns(ctx context.Context, columnsToExcept []string, values ...T) (int64, error)

UpdateExceptColumns updates one or more values of type T in the database by their primary key values. The columnsToExcept parameter can be used to specify which columns should NOT be updated.

func (*Repository[T]) UpdateOnlyColumns

func (repo *Repository[T]) UpdateOnlyColumns(ctx context.Context, columnsToUpdate []string, values ...T) (int64, error)

UpdateOnlyColumns updates one or more values of type T in the database by their primary key values.

The columnsToUpdate parameter can be used to specify which columns should be updated.

func (*Repository[T]) Upsert

func (repo *Repository[T]) Upsert(ctx context.Context, forceOnConflictExpr string, values ...T) error

Upsert inserts or updates one or more values of type T into the database.

func (*Repository[T]) UpsertSingle

func (repo *Repository[T]) UpsertSingle(ctx context.Context, forceOnConflictExpr string, value T, idPtr any) error

UpsertSingle inserts or updates a single value of type T into the database.

If idPtr is not nil, it receives the primary key value of the inserted or updated row.

type Row

type Row = pgx.Row

Row is a type alias for pgx.Row.

type Rows

type Rows = pgx.Rows

Rows is a type alias for pgx.Rows.

type Schema

type Schema struct {

	// The name of the "updated_at" column. Defaults to "updated_at" but it can be modified,
	// this is useful to set when triggers should be registered automatically.
	//
	// If set to empty then triggers will not be registered automatically.
	UpdatedAtColumnName string
	// Set the name of the trigger that sets the "updated_at" column, defaults to "set_timestamp".
	//
	// If set to empty then triggers will not be registered automatically.
	SetTimestampTriggerName string

	// Strict reports whether the schema should be strict on the database side.
	// It's enabled by default.
	Strict bool
	// contains filtered or unexported fields
}

Schema is a type that represents a schema for the database.

func NewSchema

func NewSchema() *Schema

NewSchema creates and returns a new Schema with an initialized struct cache.

Example
package main

import (
	"fmt"
	"time"
)

// Structs.

// BaseEntity is a struct that defines common fields for all entities in the database.
// It has an ID field of type uuid that is the primary key, and two timestamp fields
// for tracking the creation and update times of each row.
type BaseEntity struct {
	ID        string    `pg:"type=uuid,primary"`
	CreatedAt time.Time `pg:"type=timestamp,default=clock_timestamp()"`
	UpdatedAt time.Time `pg:"type=timestamp,default=clock_timestamp()"`
}

// Customer is a struct that represents a customer entity in the database.
// It embeds the BaseEntity struct and adds CognitoUserID (uuid) and Email fields
// that share the unique index customer_unique_idx, an indexed Name field
// and a Username field that defaults to an empty string.
type Customer struct {
	BaseEntity
	// CognitoUserID string `pg:"type=uuid,unique,conflict=DO UPDATE SET cognito_user_id=EXCLUDED.cognito_user_id"`

	CognitoUserID string `pg:"type=uuid,unique_index=customer_unique_idx"`
	Email         string `pg:"type=varchar(255),unique_index=customer_unique_idx"`
	// ^ optional: unique to allow upsert by "email"-only column confliction instead of the unique_index.
	Name string `pg:"type=varchar(255),index=btree"`

	Username string `pg:"type=varchar(255),default=''"`
}

// Blog is a struct that represents a blog entity in the database.
// It embeds the BaseEntity struct and adds a Name field.
type Blog struct {
	BaseEntity

	Name string `pg:"type=varchar(255)"`
}

// BlogPost is a struct that represents a blog post entity in the database.
// It embeds the BaseEntity struct and adds several fields for the blog post details,
// such as BlogID, Title, PhotoURL, SourceURL, ReadTimeMinutes, and Category.
// The BlogID field is a foreign key that references the ID field of the blogs table,
// with cascade option for deletion and deferrable option for constraint checking.
// The Title and SourceURL fields are part of a unique index named uk_blog_post,
// which ensures that no two blog posts share the same combination of title and source URL.
// The ReadTimeMinutes field is a smallint with a default value of 1 and a check constraint
// that ensures it is positive. The Category field is a smallint with a default value of 0.
type BlogPost struct {
	BaseEntity

	BlogID          string `pg:"type=uuid,index,ref=blogs(id cascade deferrable)"`
	Title           string `pg:"type=varchar(255),unique_index=uk_blog_post"`
	PhotoURL        string `pg:"type=varchar(255)"`
	SourceURL       string `pg:"type=varchar(255),unique_index=uk_blog_post"`
	ReadTimeMinutes int    `pg:"type=smallint,default=1,check=read_time_minutes > 0"`
	Category        int    `pg:"type=smallint,default=0"`

	SearchTerms   []string        `pg:"type=varchar[]"` // Test a slice of strings.
	ReadDurations []time.Duration `pg:"type=bigint[]"`  // Test a slice of time.Duration based on an int64.

	// Custom types.
	Feature       Feature  `pg:"type=jsonb"` // Test a JSON structure.
	OtherFeatures Features `pg:"type=jsonb"` // Test a JSON array of structures behind a custom type.
	Tags          []Tag    `pg:"type=jsonb"` // Test a JSON array of structures.
}

type Features []Feature

type Feature struct {
	IsFeatured bool `json:"is_featured"`
}

type Tag struct {
	Name  string `json:"name"`
	Value any    `json:"value"`
}

func main() {
	// Database code.
	schema := NewSchema()
	schema.MustRegister("customers", Customer{})  // Register the Customer struct as a table named "customers".
	schema.MustRegister("blogs", Blog{})          // Register the Blog struct as a table named "blogs".
	schema.MustRegister("blog_posts", BlogPost{}) // Register the BlogPost struct as a table named "blog_posts".

	fmt.Println("OK")
}
Output:

OK
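
The `pg` struct tags used in the example are comma-separated options, some of them in key=value form. The sketch below reads such a tag with reflection and splits it naively; parseTag is a hypothetical helper, and the library's own parser is likely more thorough (a plain comma split would break on values that contain commas, such as numeric(10,2)).

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// blogPost is a local stand-in with one tagged field taken from the example.
type blogPost struct {
	BlogID string `pg:"type=uuid,index,ref=blogs(id cascade deferrable)"`
}

// parseTag is a hypothetical helper (not part of kataras/pg).
// It splits a tag on commas and each option on the first "=".
// Options without a value (e.g. "index") map to an empty string.
func parseTag(tag string) map[string]string {
	opts := make(map[string]string)
	for _, part := range strings.Split(tag, ",") {
		key, value, _ := strings.Cut(strings.TrimSpace(part), "=")
		if key != "" {
			opts[key] = value
		}
	}
	return opts
}

func main() {
	field, _ := reflect.TypeOf(blogPost{}).FieldByName("BlogID")
	opts := parseTag(field.Tag.Get("pg"))

	_, indexed := opts["index"]
	fmt.Println(opts["type"], indexed, opts["ref"])
}
```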

func (*Schema) Get

func (s *Schema) Get(typ reflect.Type) (*desc.Table, error)

Get takes a reflect.Type that represents a struct type and returns a pointer to a Table that represents the table definition for the database or an error if the type is not registered in the schema.

func (*Schema) GetByTableName

func (s *Schema) GetByTableName(tableName string) (*desc.Table, error)

GetByTableName takes a table name as a string and returns a pointer to a Table that represents the table definition for the database or an error if the table name is not registered in the schema.

func (*Schema) HandlePassword

func (s *Schema) HandlePassword(handler desc.PasswordHandler) *Schema

HandlePassword sets the password handler.

func (*Schema) HasColumnType

func (s *Schema) HasColumnType(dataTypes ...desc.DataType) bool

HasColumnType takes a DataType that represents a data type for the database and returns true if any of the tables in the schema has a column with that data type.

func (*Schema) HasPassword

func (s *Schema) HasPassword() bool

HasPassword reports whether the tables in the schema have a column with the password feature enabled.

func (*Schema) Last added in v1.0.6

func (s *Schema) Last() *desc.Table

Last returns the last registered table definition.

func (*Schema) MustRegister

func (s *Schema) MustRegister(tableName string, emptyStructValue any, opts ...TableFilterFunc) *Schema

MustRegister is the same as "Register" but it panics on errors and returns the Schema instance instead of the Table one.

func (*Schema) Register

func (s *Schema) Register(tableName string, emptyStructValue any, opts ...TableFilterFunc) (*desc.Table, error)

Register registers a database model (a struct value) mapped to a specific database table name. Returns the generated Table definition.

func (*Schema) TableNames

func (s *Schema) TableNames(types ...desc.TableType) []string

TableNames returns a slice of strings that represents all the table names in the schema.

func (*Schema) Tables

func (s *Schema) Tables(types ...desc.TableType) []*desc.Table

Tables returns a slice of pointers to Table that represents all the table definitions in the schema sorted by their registered position.

type SizeInfo added in v1.0.7

type SizeInfo struct {
	SizePretty string `json:"size_pretty"`
	// The on-disk size in bytes of one fork of that relation.
	// A fork is a variant of the main data file that stores additional information,
	// such as the free space map, the visibility map, or the initialization fork.
	// By default, this is the size of the main data fork only.
	Size float64 `json:"size"`

	SizeTotalPretty string `json:"size_total_pretty"`
	// The total on-disk space used for that table, including all associated indexes. This is equivalent to pg_table_size + pg_indexes_size.
	SizeTotal float64 `json:"size_total"`
}

SizeInfo is a struct which contains the size information (for individual table or the whole database).

type Table

type Table = desc.Table

Table is a type alias for desc.Table.

type TableChangeType added in v1.0.7

type TableChangeType string

TableChangeType is the type of the table change. Available values: INSERT, UPDATE, DELETE.

const (
	// TableChangeTypeInsert is the INSERT table change type.
	TableChangeTypeInsert TableChangeType = "INSERT"
	// TableChangeTypeUpdate is the UPDATE table change type.
	TableChangeTypeUpdate TableChangeType = "UPDATE"
	// TableChangeTypeDelete is the DELETE table change type.
	TableChangeTypeDelete TableChangeType = "DELETE"
)

type TableFilterFunc

type TableFilterFunc = desc.TableFilterFunc

TableFilterFunc is a type alias for desc.TableFilterFunc.

type TableNotification added in v1.0.7

type TableNotification[T any] struct {
	Table  string          `json:"table"`
	Change TableChangeType `json:"change"` // INSERT, UPDATE, DELETE.

	New T `json:"new"`
	Old T `json:"old"`
	// contains filtered or unexported fields
}

TableNotification is the notification message sent by the postgresql server when a table change occurs. The subscribed postgres channel is named 'table_change_notifications'. The "old" and "new" fields are the old and new values of the row. The "old" field is only available for UPDATE and DELETE table change types. The "new" field is only available for INSERT and UPDATE table change types. The "old" and "new" fields are raw json values, use the "json.Unmarshal" to decode them. See "DB.ListenTable" method.
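
As a rough illustration of the payload shape implied by the struct tags above, the following sketch decodes a notification payload into a typed value. The local tableNotification and customer types, the decode helper and the sample payload are all assumptions made for this example; the exact JSON sent by the server-side function is not shown in this documentation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// tableNotification is a local stand-in that mirrors the documented JSON tags.
type tableNotification[T any] struct {
	Table  string `json:"table"`
	Change string `json:"change"` // INSERT, UPDATE, DELETE.
	New    T      `json:"new"`
	Old    T      `json:"old"`
}

// customer is a hypothetical row type; its JSON field name is an assumption.
type customer struct {
	Name string `json:"name"`
}

// decode is a hypothetical helper that unmarshals a raw payload into a typed notification.
func decode[T any](payload string) (tableNotification[T], error) {
	var n tableNotification[T]
	err := json.Unmarshal([]byte(payload), &n)
	return n, err
}

func main() {
	// Assumed payload for an UPDATE; for an INSERT the "old" value would be absent or null.
	payload := `{"table":"customers","change":"UPDATE","old":{"name":"Makis"},"new":{"name":"Makis_UPDATED"}}`

	n, err := decode[customer](payload)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("%s %s: %q -> %q\n", n.Table, n.Change, n.Old.Name, n.New.Name)
}
```

A null or missing "old" or "new" value leaves the corresponding field at its zero value, which matches the empty old name printed for INSERT in the ListenTable example.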

func (TableNotification[T]) GetPayload added in v1.0.7

func (tn TableNotification[T]) GetPayload() string

GetPayload returns the raw payload of the notification.

type TableNotificationJSON added in v1.0.7

type TableNotificationJSON = TableNotification[json.RawMessage]

TableNotificationJSON is the TableNotification type instantiated with json.RawMessage, so its "old" and "new" fields hold raw JSON values.

type TableSizeInfo added in v1.0.7

type TableSizeInfo struct {
	TableName string `json:"table_name"`
	SizeInfo
}

TableSizeInfo is a struct which contains the table size information used as an output parameter of the `db.ListTableSizes` method.

Directories

Path Synopsis
_examples
