Documentation ¶
Overview ¶
Example ¶
Example is a function that demonstrates how to use the Registry and Repository types to perform database operations within a transaction. It uses the Customer, Blog, and BlogPost structs as the entities to be stored and manipulated in the database. It also prints "OK" if everything succeeds, or an error message otherwise.
db, err := openTestConnection(true) if err != nil { fmt.Println(err.Error()) return } defer db.Close() // Registry code. registry := NewRegistry(db) // Create a new Registry instance with the DB instance. // Execute a function within a database transaction, passing it a Registry instance that uses the transactional DB instance. err = registry.InTransaction(context.Background(), func(registry *Registry) error { customers := registry.Customers() // Get the CustomerRepository instance from the Registry. customerToInsert := Customer{ // Create a Customer struct to be inserted into the database. CognitoUserID: "373f90eb-00ac-410f-9fe0-1a7058d090ba", Email: "kataras2006@hotmail.com", Name: "kataras", Username: "kataras", } // Insert the customer into the database and get its ID. err = customers.InsertSingle(context.Background(), customerToInsert, &customerToInsert.ID) if err != nil { return fmt.Errorf("insert single: %w", err) } // Modify cognito user id. newCognitoUserID := "1e6a93d0-6276-4a43-b90a-4badad8407bb" // Update specific columns by id: updated, err := customers.UpdateOnlyColumns( context.Background(), []string{"cognito_user_id"}, Customer{ BaseEntity: BaseEntity{ ID: customerToInsert.ID, }, CognitoUserID: newCognitoUserID, }) // Full update of the object by id (except id and created_at, updated_at columns): // updated, err := customers.Update(context.Background(), // Customer{ // BaseEntity: BaseEntity{ // ID: customerToInsert.ID, // }, // CognitoUserID: newCognitoUserID, // Email: customerToInsert.Email, // Name: customerToInsert.Name, // }) if err != nil { return fmt.Errorf("update: %w", err) } else if updated == 0 { return fmt.Errorf("update: no record was updated") } // Update a default column to its zero value. 
updated, err = customers.UpdateOnlyColumns( context.Background(), []string{"username"}, Customer{ BaseEntity: BaseEntity{ ID: customerToInsert.ID, }, Username: "", }) if err != nil { return fmt.Errorf("update username: %w", err) } else if updated == 0 { return fmt.Errorf("update username: no record was updated") } // Select the customer from the database by its ID. customer, err := customers.SelectSingle(context.Background(), `SELECT * FROM customers WHERE id = $1;`, customerToInsert.ID) if err != nil { return fmt.Errorf("select single: %w", err) } if customer.CognitoUserID != newCognitoUserID { return fmt.Errorf("expected cognito user id to be updated but it wasn't ('%s' vs '%s')", newCognitoUserID, customer.CognitoUserID) } if customer.Email == "" { return fmt.Errorf("expected email field not be removed after update") } if customer.Name == "" { return fmt.Errorf("expected name field not be removed after update") } // Test Upsert by modifying the email. customerToUpsert := Customer{ CognitoUserID: customer.CognitoUserID, Email: "kataras2023@hotmail.com", Name: "kataras2023", } // Manually passing a column as the conflict column: // err = customers.UpsertSingle(context.Background(), "email", customerToUpsert, &customerToUpsert.ID) // // Automatically find the conflict column or expression by setting it to empty value: // err = customers.UpsertSingle(context.Background(), "", customerToUpsert, &customerToUpsert.ID) // Manually passing a unique index name, pg will resolve the conflict columns: err = customers.UpsertSingle(context.Background(), "customer_unique_idx", customerToUpsert, &customerToUpsert.ID) if err != nil { return fmt.Errorf("upsert single: %w", err) } if customerToUpsert.ID == "" { return fmt.Errorf("expected customer id to be filled after upsert") } // Delete the customer from the database by its struct value. 
deleted, err := customers.Delete(context.Background(), customer) if err != nil { return fmt.Errorf("delete: %w", err) } else if deleted == 0 { return fmt.Errorf("delete: was not removed") } exists, err := customers.Exists(context.Background(), customer.CognitoUserID) if err != nil { return fmt.Errorf("exists: %w", err) } if exists { return fmt.Errorf("exists: customer should not exist") } // Do something else with customers. return nil }) if err != nil { fmt.Println(fmt.Errorf("in transaction: %w", err)) return } // Insert a blog. blogs := registry.Blogs() newBlog := Blog{ Name: "test_blog_1", } err = blogs.InsertSingle(context.Background(), newBlog, &newBlog.ID) if err != nil { fmt.Println(fmt.Errorf("insert single: blog: %w", err)) return } // Insert a blog post to the blog. blogPosts := registry.BlogPosts() newBlogPost := BlogPost{ BlogID: newBlog.ID, Title: "test_blog_post_1", PhotoURL: "https://test.com/test_blog_post_1.png", SourceURL: "https://test.com/test_blog_post_1.html", ReadTimeMinutes: 5, Category: 1, SearchTerms: []string{ "test_search_blog_post_1", "test_search_blog_post_2", }, ReadDurations: []time.Duration{ 5 * time.Minute, 10 * time.Minute, }, Feature: Feature{ IsFeatured: true, }, OtherFeatures: Features{ Feature{ IsFeatured: true, }, Feature{ IsFeatured: false, }, }, Tags: []Tag{ {"test_tag_1", "test_tag_value_1"}, {"test_tag_2", 42}, }, } err = blogPosts.InsertSingle(context.Background(), newBlogPost, &newBlogPost.ID) if err != nil { fmt.Println(fmt.Errorf("insert single: blog post: %w", err)) return } query := `SELECT * FROM blog_posts WHERE id = $1 LIMIT 1;` existingBlogPost, err := blogPosts.SelectSingle(context.Background(), query, newBlogPost.ID) if err != nil { fmt.Println(fmt.Errorf("select single: blog post: %s: %w", newBlogPost.ID, err)) return } // Test select single jsonb column of a custom type of array of custom types. 
// var otherFeatures Features err = blogPosts.QueryRow( context.Background(), `SELECT other_features FROM blog_posts WHERE id = $1 LIMIT 1;`, newBlogPost.ID, ).Scan(&otherFeatures) // OR // otherFeatures, err := QuerySingle[Features]( // context.Background(), // db, // `SELECT other_features FROM blog_posts WHERE id = $1 LIMIT 1;`, // newBlogPost.ID, // ) if err != nil { fmt.Println(fmt.Errorf("select single jsonb column of custom array type of custom type: blog post: %s: %w", newBlogPost.ID, err)) return } if expected, got := len(otherFeatures), len(existingBlogPost.OtherFeatures); expected != got { fmt.Printf("expected %d other_features but got %d", expected, got) return } if !reflect.DeepEqual(otherFeatures, existingBlogPost.OtherFeatures) { fmt.Printf("expected other_features to be equal but got %#+v and %#+v", otherFeatures, existingBlogPost.OtherFeatures) return }
Output:
Example (Notify_JSON) ¶
schema := NewSchema() db, err := Open(context.Background(), schema, getTestConnString()) if err != nil { fmt.Println(err) return } // defer db.Close() const channel = "chat_json" conn, err := db.Listen(context.Background(), channel) if err != nil { fmt.Println(fmt.Errorf("listen: %w", err)) } go func() { // To just terminate this listener's connection and unlisten from the channel: defer conn.Close(context.Background()) for { notification, err := conn.Accept(context.Background()) if err != nil { fmt.Println(fmt.Errorf("accept: %w\n", err)) return } payload, err := UnmarshalNotification[Message](notification) if err != nil { fmt.Println(fmt.Errorf("N: %w", err)) return } fmt.Printf("channel: %s, payload.sender: %s, payload.body: %s\n", notification.Channel, payload.Sender, payload.Body) } }() firstMessage := Message{ Sender: "kataras", Body: "hello", } if err = db.Notify(context.Background(), channel, firstMessage); err != nil { fmt.Println(fmt.Errorf("notify: first message: %w", err)) return } secondMessage := Message{ Sender: "kataras", Body: "world", } if err = db.Notify(context.Background(), channel, secondMessage); err != nil { fmt.Println(fmt.Errorf("notify: second message: %w", err)) return } time.Sleep(5 * time.Second) // give it sometime to receive the notifications, this is too much though.
Output: channel: chat_json, payload.sender: kataras, payload.body: hello channel: chat_json, payload.sender: kataras, payload.body: world
Index ¶
- Constants
- Variables
- func IsErrColumnNotExists(err error, col string) bool
- func IsErrDuplicate(err error) (string, bool)
- func IsErrForeignKey(err error) (string, bool)
- func IsErrInputSyntax(err error) (string, bool)
- func QuerySingle[T any](ctx context.Context, db *DB, query string, args ...any) (entry T, err error)
- func QuerySlice[T any](ctx context.Context, db *DB, query string, args ...any) ([]T, error)
- func QueryTwoSlices[T, V any](ctx context.Context, db *DB, query string, args ...any) ([]T, []V, error)
- func SetDefaultColumnNameMapper(fn func(field reflect.StructField) string)
- func SetDefaultSearchPath(searchPath string)
- func SetDefaultTag(tag string)
- func UnmarshalNotification[T any](n *Notification) (T, error)
- type Closer
- type Column
- type ColumnFilter
- type ConnectionOption
- type DB
- func (db *DB) Begin(ctx context.Context) (*DB, error)
- func (db *DB) CheckSchema(ctx context.Context) error
- func (db *DB) Close()
- func (db *DB) Commit(ctx context.Context) error
- func (db *DB) CreateSchema(ctx context.Context) error
- func (db *DB) CreateSchemaDumpSQL(ctx context.Context) (string, error)
- func (db *DB) Delete(ctx context.Context, values ...any) (int64, error)
- func (db *DB) DeleteSchema(ctx context.Context) error
- func (db *DB) DisableAutoVacuum(ctx context.Context) error
- func (db *DB) DisableTableAutoVacuum(ctx context.Context, tableName string) error
- func (db *DB) Duplicate(ctx context.Context, value any, idPtr any) error
- func (db *DB) Exec(ctx context.Context, query string, args ...any) (pgconn.CommandTag, error)
- func (db *DB) ExecFiles(ctx context.Context, fileReader interface{ ... }, filenames ...string) error
- func (db *DB) Exists(ctx context.Context, value any) (bool, error)
- func (db *DB) GetSize(ctx context.Context) (t SizeInfo, err error)
- func (db *DB) GetVersion(ctx context.Context) (string, error)
- func (db *DB) InTransaction(ctx context.Context, fn func(*DB) error) error
- func (db *DB) Insert(ctx context.Context, values ...any) error
- func (db *DB) InsertSingle(ctx context.Context, value any, idPtr any) error
- func (db *DB) IsAutoVacuumEnabled(ctx context.Context) (enabled bool, err error)
- func (db *DB) IsTransaction() bool
- func (db *DB) ListColumns(ctx context.Context, tableNames ...string) ([]*desc.Column, error)
- func (db *DB) ListColumnsInformationSchema(ctx context.Context, tableNames ...string) ([]*desc.ColumnBasicInfo, error)
- func (db *DB) ListConstraints(ctx context.Context, tableNames ...string) ([]*desc.Constraint, error)
- func (db *DB) ListTableSizes(ctx context.Context) ([]TableSizeInfo, error)
- func (db *DB) ListTables(ctx context.Context, opts ListTablesOptions) ([]*desc.Table, error)
- func (db *DB) ListTriggers(ctx context.Context) ([]*desc.Trigger, error)
- func (db *DB) ListUniqueIndexes(ctx context.Context, tableNames ...string) ([]*desc.UniqueIndex, error)
- func (db *DB) Listen(ctx context.Context, channel string) (*Listener, error)
- func (db *DB) ListenTable(ctx context.Context, opts *ListenTableOptions, ...) (Closer, error)
- func (db *DB) Notify(ctx context.Context, channel string, payload any) error
- func (db *DB) PoolStat() PoolStat
- func (db *DB) PrepareListenTable(ctx context.Context, opts *ListenTableOptions) error
- func (db *DB) Query(ctx context.Context, query string, args ...any) (Rows, error)
- func (db *DB) QueryBoolean(ctx context.Context, query string, args ...any) (ok bool, err error)
- func (db *DB) QueryRow(ctx context.Context, query string, args ...any) Row
- func (db *DB) Rollback(ctx context.Context) error
- func (db *DB) Schema() *Schema
- func (db *DB) SearchPath() string
- func (db *DB) Select(ctx context.Context, scannerFunc func(Rows) error, query string, args ...any) error
- func (db *DB) SelectByID(ctx context.Context, destPtr any, id any) error
- func (db *DB) SelectByUsernameAndPassword(ctx context.Context, destPtr any, username, plainPassword string) error
- func (db *DB) Unlisten(ctx context.Context, channel string) error
- func (db *DB) Update(ctx context.Context, values ...any) (int64, error)
- func (db *DB) UpdateExceptColumns(ctx context.Context, columnsToExcept []string, values ...any) (int64, error)
- func (db *DB) UpdateJSONB(ctx context.Context, tableName, columnName, rowID string, ...) (int64, error)
- func (db *DB) UpdateOnlyColumns(ctx context.Context, columnsToUpdate []string, values ...any) (int64, error)
- func (db *DB) Upsert(ctx context.Context, forceOnConflictExpr string, values ...any) error
- func (db *DB) UpsertSingle(ctx context.Context, value any, idPtr any, forceOnConflictExpr string) error
- type DataType
- type ListTablesOptions
- type ListenTableOptions
- type Listener
- type MapTypeFilter
- type Notification
- type PoolStat
- type Repository
- func (repo *Repository[T]) DB() *DB
- func (repo *Repository[T]) Delete(ctx context.Context, values ...T) (int64, error)
- func (repo *Repository[T]) DeleteByID(ctx context.Context, id any) (bool, error)
- func (repo *Repository[T]) Duplicate(ctx context.Context, id any, newIDPtr any) error
- func (repo *Repository[T]) Exec(ctx context.Context, query string, args ...any) (pgconn.CommandTag, error)
- func (repo *Repository[T]) Exists(ctx context.Context, value T) (bool, error)
- func (repo *Repository[T]) InTransaction(ctx context.Context, fn func(*Repository[T]) error) error
- func (repo *Repository[T]) Insert(ctx context.Context, values ...T) error
- func (repo *Repository[T]) InsertSingle(ctx context.Context, value T, idPtr any) error
- func (repo *Repository[T]) IsReadOnly() bool
- func (repo *Repository[T]) IsTransaction() bool
- func (repo *Repository[T]) ListenTable(ctx context.Context, callback func(TableNotification[T], error) error) (Closer, error)
- func (repo *Repository[T]) Query(ctx context.Context, query string, args ...any) (Rows, error)
- func (repo *Repository[T]) QueryBoolean(ctx context.Context, query string, args ...any) (bool, error)
- func (repo *Repository[T]) QueryRow(ctx context.Context, query string, args ...any) Row
- func (repo *Repository[T]) Select(ctx context.Context, query string, args ...any) ([]T, error)
- func (repo *Repository[T]) SelectByID(ctx context.Context, id any) (T, error)
- func (repo *Repository[T]) SelectByUsernameAndPassword(ctx context.Context, username, plainPassword string) (T, error)
- func (repo *Repository[T]) SelectSingle(ctx context.Context, query string, args ...any) (T, error)
- func (repo *Repository[T]) Table() *desc.Table
- func (repo *Repository[T]) Update(ctx context.Context, values ...T) (int64, error)
- func (repo *Repository[T]) UpdateExceptColumns(ctx context.Context, columnsToExcept []string, values ...T) (int64, error)
- func (repo *Repository[T]) UpdateOnlyColumns(ctx context.Context, columnsToUpdate []string, values ...T) (int64, error)
- func (repo *Repository[T]) Upsert(ctx context.Context, forceOnConflictExpr string, values ...T) error
- func (repo *Repository[T]) UpsertSingle(ctx context.Context, forceOnConflictExpr string, value T, idPtr any) error
- type Row
- type Rows
- type Schema
- func (s *Schema) Get(typ reflect.Type) (*desc.Table, error)
- func (s *Schema) GetByTableName(tableName string) (*desc.Table, error)
- func (s *Schema) HandlePassword(handler desc.PasswordHandler) *Schema
- func (s *Schema) HasColumnType(dataTypes ...desc.DataType) bool
- func (s *Schema) HasPassword() bool
- func (s *Schema) Last() *desc.Table
- func (s *Schema) MustRegister(tableName string, emptyStructValue any, opts ...TableFilterFunc) *Schema
- func (s *Schema) Register(tableName string, emptyStructValue any, opts ...TableFilterFunc) (*desc.Table, error)
- func (s *Schema) TableNames(types ...desc.TableType) []string
- func (s *Schema) Tables(types ...desc.TableType) []*desc.Table
- type SizeInfo
- type Table
- type TableChangeType
- type TableFilterFunc
- type TableNotification
- type TableNotificationJSON
- type TableSizeInfo
Examples ¶
Constants ¶
// DoNothing can be passed as the forceOnConflictExpr argument of
// Upsert/UpsertSingle to do nothing on conflict.
const DoNothing = "DO NOTHING"
DoNothing is a constant that can be used as the forceOnConflictExpr argument of Upsert/UpsertSingle to do nothing on conflict.
Variables ¶
var (
	// NoColumnNameMapper is a column name conversion function.
	// It converts the column name to the same as its struct field name.
	NoColumnNameMapper = func(field reflect.StructField) string { return field.Name }

	// JSONColumnNameMapper is a column name conversion function.
	// It converts the column name to the same as its json tag name
	// and falls back to the field name when the json tag is missing,
	// is exactly "-", or carries options but no name (e.g. `json:",omitempty"`).
	JSONColumnNameMapper = func(field reflect.StructField) string {
		jsonTag, ok := field.Tag.Lookup("json")
		if !ok || jsonTag == "-" {
			return field.Name
		}

		// Only the part before the first comma is the name; the rest are options.
		name, _, _ := strings.Cut(jsonTag, ",")
		if name == "" {
			// An empty name would otherwise produce an empty column name.
			return field.Name
		}

		return name
	}
)
// ErrEmptyPayload is returned when the notification payload is empty.
// It is a plain sentinel error (no formatting involved), so it is created
// with errors.New like the other sentinel errors of the package.
var ErrEmptyPayload = errors.New("empty payload")
ErrEmptyPayload is returned when the notification payload is empty.
// ErrIntentionalRollback is an error that can be returned by a transaction
// function to rollback the transaction.
var ErrIntentionalRollback = errors.New("skip error: intentional rollback")
ErrIntentionalRollback is an error that can be returned by a transaction function to rollback the transaction.
// ErrIsReadOnly is returned by Insert and InsertSingle if the repository is read-only.
var ErrIsReadOnly = errors.New("repository is read-only")
ErrIsReadOnly is returned by Insert and InsertSingle if the repository is read-only.
var (
	// ErrNoRows is returned from a query when no results came back.
	// Usually it's ignored and an empty json array is sent to the client instead.
	ErrNoRows = pgx.ErrNoRows
)
var Presenter = func(td *desc.Table) bool { td.Type = desc.TableTypePresenter return true }
Presenter is a TableFilterFunc that sets the table type to "presenter" and returns true. A presenter is a table that is used to present data from one or more tables with custom select queries. It is neither a base table nor a view. Example:
schema.MustRegister("customer_presenter", CustomerPresenter{}, pg.Presenter)
var View = func(td *desc.Table) bool { td.Type = desc.TableTypeView return true }
View is a TableFilterFunc that sets the table type to "view" and returns true.
Example:
schema.MustRegister("customer_master", FullCustomer{}, pg.View)
var WithLogger = func(logger tracelog.Logger) ConnectionOption { return func(poolConfig *pgxpool.Config) error { tracer := &tracelog.TraceLog{ Logger: logger, LogLevel: tracelog.LogLevelTrace, } poolConfig.ConnConfig.Tracer = tracer return nil } }
WithLogger is a ConnectionOption. It sets the logger for the connection pool.
Functions ¶
func IsErrColumnNotExists ¶
IsErrColumnNotExists reports whether the error was caused because the "col" defined in a select query does not exist in a row. There is no typed error available in the driver itself.
func IsErrDuplicate ¶
IsErrDuplicate reports whether the error returned from the `Insert` method was caused by a violation of a unique constraint (it's not a typed error in the underlying driver). It returns the constraint key if it's true.
func IsErrForeignKey ¶
IsErrForeignKey reports whether an insert or update command failed due to an invalid foreign key: a foreign key is missing or its source was not found. E.g. ERROR: insert or update on table "food_user_friendly_units" violates foreign key constraint "fk_food" (SQLSTATE 23503)
func IsErrInputSyntax ¶
IsErrInputSyntax reports whether the error returned from the `Insert` method was caused by invalid input syntax for a specific postgres column type.
func QuerySingle ΒΆ
func QuerySingle[T any](ctx context.Context, db *DB, query string, args ...any) (entry T, err error)
QuerySingle executes the given query and returns a single T entry.
Example:
names, err := QuerySingle[MyType](ctx, db, "SELECT a_json_field FROM users;")
func QuerySlice ¶
QuerySlice executes the given query and returns a list of T entries. Note that the rows scanner will directly scan an element of T, meaning that the type of T should be a database scannable type (e.g. string, int, time.Time, etc.).
The ErrNoRows is discarded, an empty list and a nil error will be returned instead. If a string column is empty then it's skipped from the returning list. Example:
names, err := QuerySlice[string](ctx, db, "SELECT name FROM users;")
func QueryTwoSlices ΒΆ added in v1.0.6
func QueryTwoSlices[T, V any](ctx context.Context, db *DB, query string, args ...any) ([]T, []V, error)
QueryTwoSlices executes the given query and returns two lists of T and V entries. Same behavior as QuerySlice but with two lists.
func SetDefaultColumnNameMapper ΒΆ added in v1.0.3
func SetDefaultColumnNameMapper(fn func(field reflect.StructField) string)
SetDefaultColumnNameMapper sets the default column name conversion function. This is used when the "name" pg tag option is missing for one or more struct fields. Set to nil function to use the default column name conversion function.
func SetDefaultSearchPath ΒΆ
func SetDefaultSearchPath(searchPath string)
SetDefaultSearchPath sets the default search path for the database.
func SetDefaultTag ΒΆ
func SetDefaultTag(tag string)
SetDefaultTag sets the default tag name for the struct fields.
func UnmarshalNotification ΒΆ
func UnmarshalNotification[T any](n *Notification) (T, error)
UnmarshalNotification returns the notification payload as a custom type of T.
Types ¶
type Closer ¶ added in v1.0.7
Closer is the interface which is implemented by the Listener. It's used to close the underlying connection.
type ColumnFilter ΒΆ
type ColumnFilter = desc.ColumnFilter
ColumnFilter is a type alias for desc.ColumnFilter.
type ConnectionOption ΒΆ
ConnectionOption is a function that takes a *pgxpool.Config and returns an error. It is used to set the connection options for the connection pool. It is used by the Open function.
See `WithLogger` package-level function too.
type DB ΒΆ
type DB struct { Pool *pgxpool.Pool ConnectionOptions *pgx.ConnConfig // contains filtered or unexported fields }
DB represents a database connection that can execute queries and transactions. It wraps a pgxpool.Pool and a pgx.ConnConfig to manage the connection options and the search path. It also holds a reference to a Schema that defines the database schema and migrations.
func Open ¶
func Open(ctx context.Context, schema *Schema, connString string, opts ...ConnectionOption) (*DB, error)
Open creates a new DB instance by parsing the connection string and establishing a connection pool. It also sets the search path to the one specified in the connection string or to the default one if not specified. It takes a context and a schema as arguments and returns the DB instance or an error if any.
Example Code:
const ( host = "localhost" // The host name or IP address of the database server. port = 5432 // The port number of the database server. user = "postgres" // The user name to connect to the database with. password = "admin!123" // The password to connect to the database with. schema = "public" // The schema name to use in the database. dbname = "test_db" // The database name to connect to. sslMode = "disable" // The SSL mode to use for the connection. Can be disable, require, verify-ca or verify-full. ) connString := fmt.Sprintf("host=%s port=%d user=%s password=%s search_path=%s dbname=%s sslmode=%s pool_max_conns=%d pool_min_conns=%d pool_max_conn_lifetime=%s pool_max_conn_idle_time=%s pool_health_check_period=%s", ...) OR connString := "postgres://postgres:admin!123@localhost:5432/test_db?sslmode=disable&search_path=public" db, err := Open(context.Background(), schema, connString)
Example ¶
package main import ( "context" "fmt" ) func main() { db, err := openTestConnection(true) if err != nil { handleExampleError(err) return } defer db.Close() // Work with the database... } func openTestConnection(resetSchema bool) (*DB, error) { // Database code. schema := NewSchema() schema.MustRegister("customers", Customer{}) // Register the Customer struct as a table named "customers". schema.MustRegister("blogs", Blog{}) // Register the Blog struct as a table named "blogs". schema.MustRegister("blog_posts", BlogPost{}) // Register the BlogPost struct as a table named "blog_posts". // Open a connection to the database using the schema and the connection string. db, err := Open(context.Background(), schema, getTestConnString()) if err != nil { return nil, err } // Let the caller close the database connection pool: defer db.Close() if resetSchema { // Let's clear the schema, so we can run the tests even if already ran once in the past. if err = db.DeleteSchema(context.Background()); err != nil { // DON'T DO THIS ON PRODUCTION. return nil, fmt.Errorf("delete schema: %w", err) } if err = db.CreateSchema(context.Background()); err != nil { // Create the schema in the database if it does not exist. return nil, fmt.Errorf("create schema: %w", err) } if err = db.CheckSchema(context.Background()); err != nil { // Check if the schema in the database matches the schema in the code. return nil, fmt.Errorf("check schema: %w", err) } } return db, nil } func openEmptyTestConnection() (*DB, error) { // without a schema. schema := NewSchema() // Open a connection to the database using the schema and the connection string. return Open(context.Background(), schema, getTestConnString()) } func createTestConnectionSchema() error { db, err := openTestConnection(true) if err != nil { return err } db.Close() return nil } // getTestConnString returns a connection string for connecting to a test database. 
// It uses constants to define the host, port, user, password, schema, dbname, and sslmode parameters. func getTestConnString() string { const ( host = "localhost" // The host name or IP address of the database server. port = 5432 // The port number of the database server. user = "postgres" // The user name to connect to the database with. password = "admin!123" // The password to connect to the database with. schema = "public" // The schema name to use in the database. dbname = "test_db" // The database name to connect to. sslMode = "disable" // The SSL mode to use for the connection. Can be disable, require, verify-ca or verify-full. ) connString := fmt.Sprintf("host=%s port=%d user=%s password=%s search_path=%s dbname=%s sslmode=%s", host, port, user, password, schema, dbname, sslMode) // Format the connection string using the parameters. return connString // Return the connection string. }
Output:
func OpenPool ¶
OpenPool creates a new DB instance with the given context, schema and pool. It copies the connection config from the pool and sets the search path and schema fields of the DB instance. It returns a pointer to the DB instance.
Use the `Open` function to create a new DB instance of a connection string instead.
func (*DB) Begin ΒΆ
Begin starts a new database transaction and returns a new DB instance that operates within that transaction.
func (*DB) CheckSchema ΒΆ
CheckSchema checks if the database schema matches the expected table definitions by querying the information schema and comparing the results.
func (*DB) Close ΒΆ
func (db *DB) Close()
Close closes the database connection pool and its transactions.
func (*DB) Commit ΒΆ
Commit commits the current database transaction and returns any error that occurs.
func (*DB) CreateSchema ΒΆ
CreateSchema creates the database schema by executing a series of SQL commands in a transaction.
func (*DB) CreateSchemaDumpSQL ΒΆ
CreateSchemaDumpSQL dumps the SQL commands for creating the database schema.
func (*DB) Delete ΒΆ
Delete deletes one or more values from the database by building and executing an SQL query based on the values and the table definition.
func (*DB) DeleteSchema ΒΆ
DeleteSchema drops the database schema.
func (*DB) DisableAutoVacuum ΒΆ
DisableAutoVacuum disables autovacuum for the whole database.
func (*DB) DisableTableAutoVacuum ΒΆ
DisableTableAutoVacuum disables autovacuum for a specific table.
func (*DB) Duplicate ΒΆ added in v1.0.6
Duplicate duplicates a row in the database by building and executing an SQL query based on the value's primary key (uses SELECT for insert column values). The idPtr parameter can be used to get the primary key value of the inserted row. If idPtr is nil, the primary key value is not returned. If the value is nil, the method returns nil.
func (*DB) Exec ΒΆ
Exec executes SQL. The query can be either a prepared statement name or an SQL string. Arguments should be referenced positionally from the sql "query" string as $1, $2, etc.
func (*DB) ExecFiles ΒΆ
func (db *DB) ExecFiles(ctx context.Context, fileReader interface { ReadFile(name string) ([]byte, error) }, filenames ...string) error
ExecFiles executes the SQL statements in the given files.
Example:
//go:embed _embed var embedDir embed.FS [...] err := db.ExecFiles(context.Background(), embedDir, "_embed/triggers.sql", "_embed/functions.sql")
func (*DB) Exists ΒΆ
Exists returns true if a row exists in the table that matches the given value's non-zero fields or false otherwise.
func (*DB) GetVersion ΒΆ
GetVersion returns the version number of the PostgreSQL database as a string.
func (*DB) InTransaction ΒΆ
InTransaction runs a function within a database transaction and commits or rolls back depending on the error value returned by the function. Note that: After the first error in the transaction, the transaction is rolled back. After the first error in query execution, the transaction is aborted and no new commands should be sent through the same transaction.
func (*DB) Insert ΒΆ
Insert inserts one or more values into the database by calling db.InsertSingle for each value within a transaction.
func (*DB) InsertSingle ΒΆ
InsertSingle inserts a single value into the database by building and executing an SQL query based on the value and the table definition.
func (*DB) IsAutoVacuumEnabled ΒΆ
IsAutoVacuumEnabled returns true if autovacuum is enabled for the database.
Read more about autovacuum at: https://www.postgresql.org/docs/current/runtime-config-autovacuum.html.
func (*DB) IsTransaction ΒΆ
IsTransaction reports whether this database instance is in transaction.
func (*DB) ListColumns ΒΆ
ListColumns returns a list of columns definitions for the given table names.
Example ΒΆ
db, err := openTestConnection(true) if err != nil { handleExampleError(err) return } defer db.Close() columns, err := db.ListColumns(context.Background()) if err != nil { handleExampleError(err) return } expectedTags := []string{ `[blog_posts.id] pg:"name=id,type=uuid,primary,default=gen_random_uuid()"`, `[blog_posts.created_at] pg:"name=created_at,type=timestamp,default=clock_timestamp()"`, `[blog_posts.updated_at] pg:"name=updated_at,type=timestamp,default=clock_timestamp()"`, `[blog_posts.blog_id] pg:"name=blog_id,type=uuid,ref=blogs(id CASCADE deferrable),index=btree"`, `[blog_posts.title] pg:"name=title,type=varchar(255),unique_index=uk_blog_post"`, `[blog_posts.photo_url] pg:"name=photo_url,type=varchar(255)"`, `[blog_posts.source_url] pg:"name=source_url,type=varchar(255),unique_index=uk_blog_post"`, `[blog_posts.read_time_minutes] pg:"name=read_time_minutes,type=smallint,default=1,check=read_time_minutes > 0"`, `[blog_posts.category] pg:"name=category,type=smallint,default=0"`, `[blog_posts.search_terms] pg:"name=search_terms,type=varchar[]"`, `[blog_posts.read_durations] pg:"name=read_durations,type=bigint[]"`, `[blog_posts.feature] pg:"name=feature,type=jsonb"`, `[blog_posts.other_features] pg:"name=other_features,type=jsonb"`, `[blog_posts.tags] pg:"name=tags,type=jsonb"`, `[blogs.id] pg:"name=id,type=uuid,primary,default=gen_random_uuid()"`, `[blogs.created_at] pg:"name=created_at,type=timestamp,default=clock_timestamp()"`, `[blogs.updated_at] pg:"name=updated_at,type=timestamp,default=clock_timestamp()"`, `[blogs.name] pg:"name=name,type=varchar(255)"`, `[customers.id] pg:"name=id,type=uuid,primary,default=gen_random_uuid()"`, `[customers.created_at] pg:"name=created_at,type=timestamp,default=clock_timestamp()"`, `[customers.updated_at] pg:"name=updated_at,type=timestamp,default=clock_timestamp()"`, `[customers.cognito_user_id] pg:"name=cognito_user_id,type=uuid,unique_index=customer_unique_idx"`, `[customers.email] 
pg:"name=email,type=varchar(255),unique_index=customer_unique_idx"`, `[customers.name] pg:"name=name,type=varchar(255),index=btree"`, `[customers.username] pg:"name=username,type=varchar(255),default=''::character varying"`, // before columns convert from struct field should match this. } if len(columns) != len(expectedTags) { fmt.Printf("expected %d columns but got %d\n%s", len(expectedTags), len(columns), strings.Join(expectedTags, "\n")) fmt.Println("\n=========") for _, c := range columns { fmt.Println(c.Name) } return } for i, column := range columns { expected := expectedTags[i] got := fmt.Sprintf("[%s.%s] %s", column.TableName, column.Name, column.FieldTagString(true)) if expected != got { fmt.Printf("expected tag:\n%s\nbut got:\n%s\n", expected, got) } } fmt.Println("OK")
Output: OK
func (*DB) ListColumnsInformationSchema ¶
func (db *DB) ListColumnsInformationSchema(ctx context.Context, tableNames ...string) ([]*desc.ColumnBasicInfo, error)
ListColumnsInformationSchema returns a list of basic columns information for the given table names or all.
Example ¶
if err := createTestConnectionSchema(); err != nil { handleExampleError(err) return } db, err := openEmptyTestConnection() if err != nil { handleExampleError(err) return } defer db.Close() columns, err := db.ListColumnsInformationSchema(context.Background()) if err != nil { handleExampleError(err) return } for _, column := range columns { fmt.Printf("%#+v\n", column) }
Output: &desc.ColumnBasicInfo{TableName:"blog_posts", TableDescription:"", TableType:0x0, Name:"id", OrdinalPosition:1, Description:"", Default:"gen_random_uuid()", DataType:0x31, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false} &desc.ColumnBasicInfo{TableName:"blog_posts", TableDescription:"", TableType:0x0, Name:"created_at", OrdinalPosition:2, Description:"", Default:"clock_timestamp()", DataType:0x2c, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false} &desc.ColumnBasicInfo{TableName:"blog_posts", TableDescription:"", TableType:0x0, Name:"updated_at", OrdinalPosition:3, Description:"", Default:"clock_timestamp()", DataType:0x2c, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false} &desc.ColumnBasicInfo{TableName:"blog_posts", TableDescription:"", TableType:0x0, Name:"blog_id", OrdinalPosition:4, Description:"", Default:"", DataType:0x31, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false} &desc.ColumnBasicInfo{TableName:"blog_posts", TableDescription:"", TableType:0x0, Name:"title", OrdinalPosition:5, Description:"", Default:"", DataType:0xb, DataTypeArgument:"255", IsNullable:false, IsIdentity:false, IsGenerated:false} &desc.ColumnBasicInfo{TableName:"blog_posts", TableDescription:"", TableType:0x0, Name:"photo_url", OrdinalPosition:6, Description:"", Default:"", DataType:0xb, DataTypeArgument:"255", IsNullable:false, IsIdentity:false, IsGenerated:false} &desc.ColumnBasicInfo{TableName:"blog_posts", TableDescription:"", TableType:0x0, Name:"source_url", OrdinalPosition:7, Description:"", Default:"", DataType:0xb, DataTypeArgument:"255", IsNullable:false, IsIdentity:false, IsGenerated:false} &desc.ColumnBasicInfo{TableName:"blog_posts", TableDescription:"", TableType:0x0, Name:"read_time_minutes", OrdinalPosition:8, Description:"", Default:"1", DataType:0x24, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false} 
&desc.ColumnBasicInfo{TableName:"blog_posts", TableDescription:"", TableType:0x0, Name:"category", OrdinalPosition:9, Description:"", Default:"0", DataType:0x24, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false} &desc.ColumnBasicInfo{TableName:"blog_posts", TableDescription:"", TableType:0x0, Name:"search_terms", OrdinalPosition:10, Description:"", Default:"", DataType:0xc, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false} &desc.ColumnBasicInfo{TableName:"blog_posts", TableDescription:"", TableType:0x0, Name:"read_durations", OrdinalPosition:11, Description:"", Default:"", DataType:0x2, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false} &desc.ColumnBasicInfo{TableName:"blog_posts", TableDescription:"", TableType:0x0, Name:"feature", OrdinalPosition:12, Description:"", Default:"", DataType:0x18, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false} &desc.ColumnBasicInfo{TableName:"blog_posts", TableDescription:"", TableType:0x0, Name:"other_features", OrdinalPosition:13, Description:"", Default:"", DataType:0x18, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false} &desc.ColumnBasicInfo{TableName:"blog_posts", TableDescription:"", TableType:0x0, Name:"tags", OrdinalPosition:14, Description:"", Default:"", DataType:0x18, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false} &desc.ColumnBasicInfo{TableName:"blogs", TableDescription:"", TableType:0x0, Name:"id", OrdinalPosition:1, Description:"", Default:"gen_random_uuid()", DataType:0x31, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false} &desc.ColumnBasicInfo{TableName:"blogs", TableDescription:"", TableType:0x0, Name:"created_at", OrdinalPosition:2, Description:"", Default:"clock_timestamp()", DataType:0x2c, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false} &desc.ColumnBasicInfo{TableName:"blogs", TableDescription:"", 
TableType:0x0, Name:"updated_at", OrdinalPosition:3, Description:"", Default:"clock_timestamp()", DataType:0x2c, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false} &desc.ColumnBasicInfo{TableName:"blogs", TableDescription:"", TableType:0x0, Name:"name", OrdinalPosition:4, Description:"", Default:"", DataType:0xb, DataTypeArgument:"255", IsNullable:false, IsIdentity:false, IsGenerated:false} &desc.ColumnBasicInfo{TableName:"customers", TableDescription:"", TableType:0x0, Name:"id", OrdinalPosition:1, Description:"", Default:"gen_random_uuid()", DataType:0x31, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false} &desc.ColumnBasicInfo{TableName:"customers", TableDescription:"", TableType:0x0, Name:"created_at", OrdinalPosition:2, Description:"", Default:"clock_timestamp()", DataType:0x2c, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false} &desc.ColumnBasicInfo{TableName:"customers", TableDescription:"", TableType:0x0, Name:"updated_at", OrdinalPosition:3, Description:"", Default:"clock_timestamp()", DataType:0x2c, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false} &desc.ColumnBasicInfo{TableName:"customers", TableDescription:"", TableType:0x0, Name:"cognito_user_id", OrdinalPosition:4, Description:"", Default:"", DataType:0x31, DataTypeArgument:"", IsNullable:false, IsIdentity:false, IsGenerated:false} &desc.ColumnBasicInfo{TableName:"customers", TableDescription:"", TableType:0x0, Name:"email", OrdinalPosition:5, Description:"", Default:"", DataType:0xb, DataTypeArgument:"255", IsNullable:false, IsIdentity:false, IsGenerated:false} &desc.ColumnBasicInfo{TableName:"customers", TableDescription:"", TableType:0x0, Name:"name", OrdinalPosition:6, Description:"", Default:"", DataType:0xb, DataTypeArgument:"255", IsNullable:false, IsIdentity:false, IsGenerated:false} &desc.ColumnBasicInfo{TableName:"customers", TableDescription:"", TableType:0x0, Name:"username", 
OrdinalPosition:7, Description:"", Default:"''::character varying", DataType:0xb, DataTypeArgument:"255", IsNullable:false, IsIdentity:false, IsGenerated:false}
func (*DB) ListConstraints ¶
func (db *DB) ListConstraints(ctx context.Context, tableNames ...string) ([]*desc.Constraint, error)
ListConstraints returns a list of constraint definitions in the database schema by querying the pg_constraint table.
Example ¶
// Open a connection using the test connection string and a fresh schema.
connString := getTestConnString()
schema := NewSchema()
db, err := Open(context.Background(), schema, connString)
if err != nil {
	handleExampleError(err)
	return
}
defer db.Close()

/*
	table_name column_name constraint_name constraint_type constraint_definition index_type

	blog_posts blog_posts_blog_id_fkey i CREATE INDEX blog_posts_blog_id_fkey ON public.blog_posts USING btree (blog_id)
	blog_posts blog_id blog_posts_blog_id_fkey f FOREIGN KEY (blog_id) REFERENCES blogs(id) ON DELETE CASCADE DEFERRABLE
	blog_posts id blog_posts_pkey p PRIMARY KEY (id) btree
	blog_posts read_time_minutes blog_posts_read_time_minutes_check c CHECK ((read_time_minutes > 0))
	blog_posts source_url uk_blog_post u UNIQUE (title, source_url) btree
	blog_posts title uk_blog_post u UNIQUE (title, source_url) btree
	blogs id blogs_pkey p PRIMARY KEY (id) btree
	customers customers_name_idx i CREATE INDEX customers_name_idx ON public.customers USING btree (name)
	customers cognito_user_id customer_unique_idx u UNIQUE (cognito_user_id, email) btree
	customers email customer_unique_idx u UNIQUE (cognito_user_id, email) btree
	customers id customers_pkey p PRIMARY KEY (id) btree
*/
var expectedConstraints = []*desc.Constraint{
	{
		TableName:      "blog_posts",
		ColumnName:     "blog_id",
		ConstraintName: "blog_posts_blog_id_fkey",
		ConstraintType: desc.IndexConstraintType,
		IndexType:      desc.Btree,
	},
	{
		TableName:      "blog_posts",
		ColumnName:     "blog_id",
		ConstraintName: "blog_posts_blog_id_fkey",
		ConstraintType: desc.ForeignKeyConstraintType,
		ForeignKey: &desc.ForeignKeyConstraint{
			ColumnName:          "blog_id",
			ReferenceTableName:  "blogs",
			ReferenceColumnName: "id",
			OnDelete:            "CASCADE",
			Deferrable:          true,
		},
	},
	{
		TableName:      "blog_posts",
		ColumnName:     "id",
		ConstraintName: "blog_posts_pkey",
		ConstraintType: desc.PrimaryKeyConstraintType,
		IndexType:      desc.Btree,
	},
	{
		TableName:      "blog_posts",
		ColumnName:     "read_time_minutes",
		ConstraintName: "blog_posts_read_time_minutes_check",
		ConstraintType: desc.CheckConstraintType,
		Check: &desc.CheckConstraint{
			Expression: "read_time_minutes > 0",
		},
	},
	{
		TableName:      "blog_posts",
		ColumnName:     "source_url",
		ConstraintName: "uk_blog_post",
		ConstraintType: desc.UniqueConstraintType,
		IndexType:      desc.Btree,
		Unique: &desc.UniqueConstraint{
			Columns: []string{"title", "source_url"},
		},
	},
	{
		TableName:      "blog_posts",
		ColumnName:     "title",
		ConstraintName: "uk_blog_post",
		ConstraintType: desc.UniqueConstraintType,
		IndexType:      desc.Btree,
		Unique: &desc.UniqueConstraint{
			Columns: []string{"title", "source_url"},
		},
	},
	{
		TableName:      "blogs",
		ColumnName:     "id",
		ConstraintName: "blogs_pkey",
		ConstraintType: desc.PrimaryKeyConstraintType,
		IndexType:      desc.Btree,
	},
	{
		TableName:      "customers",
		ColumnName:     "name",
		ConstraintName: "customers_name_idx",
		ConstraintType: desc.IndexConstraintType,
		IndexType:      desc.Btree,
	},
	{
		TableName:      "customers",
		ColumnName:     "cognito_user_id",
		ConstraintName: "customer_unique_idx",
		ConstraintType: desc.UniqueConstraintType,
		IndexType:      desc.Btree,
		Unique: &desc.UniqueConstraint{
			Columns: []string{"cognito_user_id", "email"},
		},
	},
	{
		TableName:      "customers",
		ColumnName:     "email",
		ConstraintName: "customer_unique_idx",
		ConstraintType: desc.UniqueConstraintType,
		IndexType:      desc.Btree,
		Unique: &desc.UniqueConstraint{
			Columns: []string{"cognito_user_id", "email"},
		},
	},
	{
		TableName:      "customers",
		ColumnName:     "id",
		ConstraintName: "customers_pkey",
		ConstraintType: desc.PrimaryKeyConstraintType,
		IndexType:      desc.Btree,
	},
}

constraints, err := db.ListConstraints(context.Background())
if err != nil {
	handleExampleError(err)
	return
}

// The comparison below pairs results with expectations by index, so a
// different count would either panic (too many results) or silently pass
// (too few). Report it and stop instead.
if len(constraints) != len(expectedConstraints) {
	fmt.Printf("expected %d constraints but got %d\n", len(expectedConstraints), len(constraints))
	return
}

// Only print "OK" when no mismatch was reported.
matched := true
for i, got := range constraints {
	expected := expectedConstraints[i]
	if reflect.DeepEqual(expected, got) {
		continue
	}

	// When both sides carry the same kind of detail (foreign key, unique or
	// check), only that detail is compared; other field differences are
	// tolerated, as in the original example.
	if expected.ForeignKey != nil && got.ForeignKey != nil {
		if !reflect.DeepEqual(expected.ForeignKey, got.ForeignKey) {
			matched = false
			fmt.Printf("expected foreign key:\n%#+v\nbut got:\n%#+v", expected.ForeignKey, got.ForeignKey)
		}
		continue
	}

	if expected.Unique != nil && got.Unique != nil {
		if !reflect.DeepEqual(expected.Unique, got.Unique) {
			matched = false
			fmt.Printf("expected unique:\n%#+v\nbut got:\n%#+v", expected.Unique, got.Unique)
		}
		continue
	}

	if expected.Check != nil && got.Check != nil {
		if !reflect.DeepEqual(expected.Check, got.Check) {
			matched = false
			fmt.Printf("expected check:\n%#+v\nbut got:\n%#+v", expected.Check, got.Check)
		}
		continue
	}

	matched = false
	fmt.Printf("expected:\n%#+v\nbut got:\n%#+v", expected, got)
}

if matched {
	fmt.Println("OK")
}
Output: OK
func (*DB) ListTableSizes ¶ added in v1.0.7
func (db *DB) ListTableSizes(ctx context.Context) ([]TableSizeInfo, error)
ListTableSizes lists the disk size of tables (not only the registered ones) in the database.
func (*DB) ListTables ¶
ListTables returns a list of converted table definitions from the remote database schema.
func (*DB) ListTriggers ¶
ListTriggers returns a list of triggers in the database for a given set of tables. The method takes a context and returns a slice of Trigger pointers, and an error if any.
func (*DB) ListUniqueIndexes ¶ added in v1.0.6
func (db *DB) ListUniqueIndexes(ctx context.Context, tableNames ...string) ([]*desc.UniqueIndex, error)
ListUniqueIndexes returns a list of unique indexes in the database schema by querying the pg_index table and filtering the results to only include unique indexes.
func (*DB) Listen ¶
Listen listens for notifications on the given channel and returns a Listener instance.
Example Code:
// Start listening for notifications on the channel.
conn, err := db.Listen(context.Background(), channel)
if err != nil {
	// No trailing newline in the error text: fmt.Println already adds one.
	fmt.Println(fmt.Errorf("listen: %w", err))
	return
}

// To just terminate this listener's connection and unlisten from the channel:
defer conn.Close(context.Background())

// Accept notifications until Accept returns an error.
for {
	notification, err := conn.Accept(context.Background())
	if err != nil {
		fmt.Println(fmt.Errorf("accept: %w", err))
		return
	}

	fmt.Printf("channel: %s, payload: %s\n", notification.Channel, notification.Payload)
}
Example ¶
go test -coverprofile=cov go tool cover -html=cov
db, err := openEmptyTestConnection() if err != nil { handleExampleError(err) return } // defer db.Close() const channel = "chat_db" conn, err := db.Listen(context.Background(), channel) if err != nil { fmt.Println(fmt.Errorf("listen: %w\n", err)) return } go func() { // To just terminate this listener's connection and unlisten from the channel: defer conn.Close(context.Background()) for { notification, err := conn.Accept(context.Background()) if err != nil { fmt.Println(fmt.Errorf("accept: %w\n", err)) return } fmt.Printf("channel: %s, payload: %s\n", notification.Channel, notification.Payload) } }() if err = db.Notify(context.Background(), channel, "hello"); err != nil { fmt.Println(fmt.Errorf("notify: hello: %w", err)) return } if err = db.Notify(context.Background(), channel, "world"); err != nil { fmt.Println(fmt.Errorf("notify: world: %w", err)) return } time.Sleep(5 * time.Second) // give it sometime to receive the notifications.
Output: channel: chat_db, payload: hello channel: chat_db, payload: world
func (*DB) ListenTable ¶ added in v1.0.7
func (db *DB) ListenTable(ctx context.Context, opts *ListenTableOptions, callback func(TableNotificationJSON, error) error) (Closer, error)
ListenTable registers a function which notifies on the given "table" changes (INSERT, UPDATE, DELETE), the subscribed postgres channel is named 'table_change_notifications'.
The callback function can return a non-nil error to stop the listener. The callback function can return nil to continue listening.
TableNotification's New and Old fields are raw json values, use the "json.Unmarshal" to decode them to the actual type.
Example ¶
db, err := openTestConnection(true)
if err != nil {
	handleExampleError(err)
	return
}
defer db.Close()

// Watch the "customers" table for the default set of changes.
opts := &ListenTableOptions{
	Tables: map[string][]TableChangeType{"customers": defaultChangesToWatch},
}

// onChange prints every received notification; a received error is printed
// and returned back to the listener.
onChange := func(evt TableNotificationJSON, err error) error {
	if err != nil {
		fmt.Printf("received error: %v\n", err)
		return err
	}

	switch evt.Change {
	case "INSERT":
		// The new value can't be predicted because of its ID and timestamps,
		// so only the old one is printed.
		fmt.Printf("table: %s, event: %s, old: %s\n", evt.Table, evt.Change, string(evt.Old))
	default:
		fmt.Printf("table: %s, event: %s\n", evt.Table, evt.Change)
	}

	return nil
}

closer, err := db.ListenTable(context.Background(), opts, onChange)
if err != nil {
	fmt.Println(err)
	return
}
defer closer.Close(context.Background())

// Insert a customer and then rename it, so that the callback above is
// notified for both changes.
newCustomer := Customer{
	CognitoUserID: "766064d4-a2a7-442d-aa75-33493bb4dbb9",
	Email:         "kataras2024@hotmail.com",
	Name:          "Makis",
}

if err = db.InsertSingle(context.Background(), newCustomer, &newCustomer.ID); err != nil {
	fmt.Println(err)
	return
}

newCustomer.Name = "Makis_UPDATED"
if _, err = db.UpdateOnlyColumns(context.Background(), []string{"name"}, newCustomer); err != nil {
	fmt.Println(err)
	return
}

time.Sleep(8 * time.Second) // give it some time to receive the notifications.
Output: table: customers, event: INSERT, old: null table: customers, event: UPDATE
func (*DB) Notify ¶
Notify sends a notification using pg_notify to the database.
See the `Listen` package-level function too.
func (*DB) PoolStat ¶ added in v1.0.6
PoolStat returns a snapshot of the database pool statistics. The returned structure can be represented through JSON.
func (*DB) PrepareListenTable ¶ added in v1.0.7
func (db *DB) PrepareListenTable(ctx context.Context, opts *ListenTableOptions) error
PrepareListenTable prepares the table for listening for live table updates. See "db.ListenTable" method for more.
func (*DB) Query ¶
Query executes the given "query" with args. If there is an error the returned Rows will be returned in an error state.
func (*DB) QueryBoolean ¶ added in v1.0.6
QueryBoolean executes a query that returns a single boolean value and returns it as a bool and an error.
func (*DB) QueryRow ¶
QueryRow is a convenience wrapper over Query. Any error that occurs while querying is deferred until calling Scan on the returned Row. That Row will error with ErrNoRows if no rows are returned.
func (*DB) Rollback ¶
Rollback rolls back the current database transaction and returns any error that occurs.
func (*DB) Schema ¶ added in v1.0.5
Schema returns the Schema instance of the database. It should NOT be modified by the caller.
func (*DB) SearchPath ¶
SearchPath returns the search path of the database.
func (*DB) Select ¶
func (db *DB) Select(ctx context.Context, scannerFunc func(Rows) error, query string, args ...any) error
Select executes a query that returns rows and calls the scanner function on them. It takes a context, a query string, a scanner function and a variadic list of arguments as parameters. It returns an error if the query fails, the scanner function returns an error or the rows have an error.
func (*DB) SelectByID ¶
SelectByID selects a row from a table by matching the primary key column with the given argument.
func (*DB) SelectByUsernameAndPassword ¶
func (db *DB) SelectByUsernameAndPassword(ctx context.Context, destPtr any, username, plainPassword string) error
SelectByUsernameAndPassword selects a row from a table by matching the username and password columns with the given arguments and scans the row into the destPtr variable.
func (*DB) Unlisten ¶
Unlisten removes the given channel from the list of channels that the database is listening on. Available channels: - Any custom one - * (for all)
func (*DB) Update ¶
Update updates one or more values in the database by building and executing an SQL query based on the values and the table definition.
func (*DB) UpdateExceptColumns ¶ added in v1.0.3
func (db *DB) UpdateExceptColumns(ctx context.Context, columnsToExcept []string, values ...any) (int64, error)
UpdateExceptColumns updates one or more values in the database by building and executing an SQL query based on the values and the table definition.
The columnsToExcept parameter can be used to specify which columns should NOT be updated.
func (*DB) UpdateJSONB ¶ added in v1.0.8
func (db *DB) UpdateJSONB(ctx context.Context, tableName, columnName, rowID string, values map[string]any, fieldsToUpdate []string) (int64, error)
UpdateJSONB updates a JSONB column (full or partial) in the database by building and executing an SQL query based on the provided values and the given tableName and columnName. The values parameter is a map of key-value pairs where the key is the json field name and the value is its new value, new keys are accepted. Note that tableName and columnName are not escaped.
func (*DB) UpdateOnlyColumns ¶
func (db *DB) UpdateOnlyColumns(ctx context.Context, columnsToUpdate []string, values ...any) (int64, error)
UpdateOnlyColumns updates one or more values in the database by building and executing an SQL query based on the values and the table definition.
The columnsToUpdate parameter can be used to specify which columns should be updated.
type ListTablesOptions ¶
type ListTablesOptions struct { TableNames []string Filter desc.TableFilter // Filter allows to customize the StructName and its Column field types. }
ListTablesOptions are the options for listing tables.
type ListenTableOptions ¶ added in v1.0.7
type ListenTableOptions struct {
	// Tables maps a table name to the changes to listen for.
	//
	// Key is the table to listen on for changes.
	// Value is the list of table changes to listen for.
	// Defaults to {"*": ["INSERT", "UPDATE", "DELETE"] }.
	Tables map[string][]TableChangeType

	// Channel is the name of the postgres channel to listen on.
	// Default: "table_change_notifications".
	Channel string

	// Function is the name of the postgres function
	// which is used to notify on table changes; the
	// trigger name is <table_name>_<Function>.
	// Defaults to "table_change_notify".
	Function string
}
ListenTableOptions is the options for the "DB.ListenTable" method.
type Listener ¶
type Listener struct {
// contains filtered or unexported fields
}
Listener represents a postgres database LISTEN connection.
type MapTypeFilter ¶
MapTypeFilter is a map of expressions inputs text to field type. It's a TableFilter.
Example on ListTablesOptions of the ListTables method:
Filter: pg.MapTypeFilter{ "customer_profiles.fields.jsonb": entity.Fields{}, },
func (MapTypeFilter) FilterTable ¶
func (m MapTypeFilter) FilterTable(t *Table) bool
FilterTable implements the TableFilter interface.
type Notification ¶
type Notification = pgconn.Notification
Notification is a type alias of pgconn.Notification type.
type PoolStat ¶ added in v1.0.6
type PoolStat struct { // AcquireCount is the cumulative count of successful acquires from the pool. AcquireCount int64 `json:"acquire_count"` // AcquireDuration is the total duration of all successful acquires from // the pool. AcquireDuration time.Duration `json:"acquire_duration"` // AcquiredConns is the number of currently acquired connections in the pool. AcquiredConns int32 `json:"acquired_conns"` // CanceledAcquireCount is the cumulative count of acquires from the pool // that were canceled by a context. CanceledAcquireCount int64 `json:"canceled_acquire_count"` // ConstructingConns is the number of conns with construction in progress in // the pool. ConstructingConns int32 `json:"constructing_conns"` // EmptyAcquireCount is the cumulative count of successful acquires from the pool // that waited for a resource to be released or constructed because the pool was // empty. EmptyAcquireCount int64 `json:"empty_acquire_count"` // IdleConns is the number of currently idle conns in the pool. IdleConns int32 `json:"idle_conns"` // MaxConns is the maximum size of the pool. MaxConns int32 `json:"max_conns"` // TotalConns is the total number of resources currently in the pool. // The value is the sum of ConstructingConns, AcquiredConns, and // IdleConns. TotalConns int32 `json:"total_conns"` }
PoolStat holds the database pool's statistics.
type Repository ¶
type Repository[T any] struct { // contains filtered or unexported fields }
Repository is a generic type that represents a repository for a specific type T.
func NewRepository ¶
func NewRepository[T any](db *DB) *Repository[T]
NewRepository creates and returns a new Repository instance for a given type T and a DB instance. It panics if the T was not registered to the schema.
Example ¶
package main

import (
	"context"
	"fmt"
)

// Repositories.

// CustomerRepository embeds a generic Repository bound to the Customer type,
// so custom, customer-specific methods can be declared next to the generic ones.
type CustomerRepository struct {
	*Repository[Customer]
}

// NewCustomerRepository builds a CustomerRepository on top of the given DB instance.
func NewCustomerRepository(db *DB) *CustomerRepository {
	repo := NewRepository[Customer](db)
	return &CustomerRepository{Repository: repo}
}

// InTransaction shadows the embedded Repository's InTransaction method so the
// callback receives a *CustomerRepository instead of the generic repository.
// When the repository's DB is already in a transaction the callback runs on the
// receiver itself; otherwise it runs with a new CustomerRepository created from
// the transactional DB instance.
func (r *CustomerRepository) InTransaction(ctx context.Context, fn func(*CustomerRepository) error) (err error) {
	if !r.DB().IsTransaction() {
		return r.DB().InTransaction(ctx, func(db *DB) error {
			return fn(NewCustomerRepository(db))
		})
	}

	return fn(r)
}

// Exists is a custom method which reports whether a customer row with the
// given cognitoUserID exists, by delegating to the embedded Repository's Exists.
func (r *CustomerRepository) Exists(ctx context.Context, cognitoUserID string) (exists bool, err error) {
	// The same check written as a raw query:
	// query := `SELECT EXISTS(SELECT 1 FROM customers WHERE cognito_user_id = $1)`
	// err = r.QueryRow(ctx, query, cognitoUserID).Scan(&exists)
	probe := Customer{CognitoUserID: cognitoUserID}
	return r.Repository.Exists(ctx, probe)
}

// Registry is an (optional) holder of the repositories used to access and
// manipulate data in the database. It keeps the DB instance together with the
// customers, blogs and blog posts repositories.
type Registry struct {
	db *DB

	customers *CustomerRepository
	blogs     *Repository[Blog]
	blogPosts *Repository[BlogPost]
}

// NewRegistry builds a Registry whose repositories all use the given DB instance.
func NewRegistry(db *DB) *Registry {
	reg := &Registry{db: db}
	reg.customers = NewCustomerRepository(db)
	reg.blogs = NewRepository[Blog](db)
	reg.blogPosts = NewRepository[BlogPost](db)
	return reg
}

// InTransaction runs fn with a Registry that operates inside a database transaction.
// When the Registry's DB is already in a transaction fn receives the receiver
// itself; otherwise fn receives a new Registry created from the transactional
// DB instance.
func (r *Registry) InTransaction(ctx context.Context, fn func(*Registry) error) (err error) {
	if !r.db.IsTransaction() {
		return r.db.InTransaction(ctx, func(db *DB) error {
			return fn(NewRegistry(db))
		})
	}

	return fn(r)
}

// Customers returns the CustomerRepository instance of the Registry.
func (r *Registry) Customers() *CustomerRepository {
	return r.customers
}

// Blogs returns the Repository instance of the Blog entity.
func (r *Registry) Blogs() *Repository[Blog] {
	return r.blogs
}

// BlogPosts returns the Repository instance of the BlogPost entity.
func (r *Registry) BlogPosts() *Repository[BlogPost] {
	return r.blogPosts
}

func main() {
	db, err := openTestConnection(true)
	if err != nil {
		handleExampleError(err)
		return
	}
	defer db.Close()

	// Build the Registry and take its customers repository.
	registry := NewRegistry(db)
	customers := registry.Customers()

	// Repository example code: insert a customer and print the ID that
	// InsertSingle stored through the given pointer.
	customerToInsert := Customer{
		CognitoUserID: "373f90eb-00ac-410f-9fe0-1a7058d090ba",
		Email:         "kataras2006@hotmail.com",
		Name:          "kataras",
	}

	if err = customers.InsertSingle(context.Background(), customerToInsert, &customerToInsert.ID); err != nil {
		handleExampleError(err)
		return
	}

	fmt.Println(customerToInsert.ID)
}
Output:
func (*Repository[T]) DB ¶
func (repo *Repository[T]) DB() *DB
DB returns the DB instance associated with the Repository instance.
func (*Repository[T]) Delete ¶
func (repo *Repository[T]) Delete(ctx context.Context, values ...T) (int64, error)
Delete deletes one or more values of type T from the database by their primary key values.
func (*Repository[T]) DeleteByID ¶ added in v1.0.4
DeleteByID deletes a single row from a table by matching the id column with the given argument and reports whether the entry was removed or not.
The difference between Delete and DeleteByID is that DeleteByID accepts just the id value instead of the whole entity structure value.
func (*Repository[T]) Duplicate ¶ added in v1.0.6
Duplicate duplicates a row from a table by matching the id column with the given argument. The idPtr parameter can be used to get the primary key value of the inserted row. If idPtr is nil, the primary key value is not returned. If the value is nil, the method returns nil.
func (*Repository[T]) Exec ¶
func (repo *Repository[T]) Exec(ctx context.Context, query string, args ...any) (pgconn.CommandTag, error)
Exec executes a query that does not return rows and returns a command tag and an error.
func (*Repository[T]) Exists ¶
func (repo *Repository[T]) Exists(ctx context.Context, value T) (bool, error)
Exists returns true if a row exists in the table that matches the given value's non-zero fields or false otherwise.
func (*Repository[T]) InTransaction ¶
func (repo *Repository[T]) InTransaction(ctx context.Context, fn func(*Repository[T]) error) error
InTransaction runs a function within a database transaction and commits or rolls back depending on the error value returned by the function.
func (*Repository[T]) Insert ¶
func (repo *Repository[T]) Insert(ctx context.Context, values ...T) error
Insert inserts one or more values of type T into the database by calling repo.InsertSingle for each value within a transaction.
func (*Repository[T]) InsertSingle ¶
func (repo *Repository[T]) InsertSingle(ctx context.Context, value T, idPtr any) error
InsertSingle inserts a single value of type T into the database by calling repo.db.InsertSingle with the value and the idPtr.
If idPtr is not nil, it is set to the primary key value of the inserted row.
func (*Repository[T]) IsReadOnly ¶
func (repo *Repository[T]) IsReadOnly() bool
IsReadOnly returns true if the underlying repository's table is read-only or false otherwise.
func (*Repository[T]) IsTransaction ¶
func (repo *Repository[T]) IsTransaction() bool
IsTransaction returns true if the underlying database is already in a transaction or false otherwise.
func (*Repository[T]) ListenTable ¶ added in v1.0.7
func (repo *Repository[T]) ListenTable(ctx context.Context, callback func(TableNotification[T], error) error) (Closer, error)
ListenTable registers a function which notifies on the current table's changes (INSERT, UPDATE, DELETE), the subscribed postgres channel is named 'table_change_notifications'. The callback function is called on a separate goroutine.
The callback function can return ErrStop to stop the listener without actual error. The callback function can return any other error to stop the listener and return the error. The callback function can return nil to continue listening.
Example ¶
db, err := openTestConnection(true)
if err != nil {
	handleExampleError(err)
	return
}
defer db.Close()

customers := NewRepository[Customer](db)

// onChange prints the table, the change type and the old and new customer
// names of every received notification; a received error is printed and
// returned back to the listener.
onChange := func(evt TableNotification[Customer], err error) error {
	if err != nil {
		fmt.Printf("received error: %v\n", err)
		return err
	}

	fmt.Printf("table: %s, event: %s, old name: %s new name: %s\n", evt.Table, evt.Change, evt.Old.Name, evt.New.Name)
	return nil
}

closer, err := customers.ListenTable(context.Background(), onChange)
if err != nil {
	fmt.Println(err)
	return
}
defer closer.Close(context.Background())

// Insert a customer and then rename it, so that the callback above is
// notified for both changes.
newCustomer := Customer{
	CognitoUserID: "766064d4-a2a7-442d-aa75-33493bb4dbb9",
	Email:         "kataras2024@hotmail.com",
	Name:          "Makis",
}

if err = customers.InsertSingle(context.Background(), newCustomer, &newCustomer.ID); err != nil {
	fmt.Println(err)
	return
}

newCustomer.Name = "Makis_UPDATED"
if _, err = customers.UpdateOnlyColumns(context.Background(), []string{"name"}, newCustomer); err != nil {
	fmt.Println(err)
	return
}

time.Sleep(8 * time.Second) // give it some time to receive the notifications.
Output: table: customers, event: INSERT, old name: new name: Makis table: customers, event: UPDATE, old name: Makis new name: Makis_UPDATED
func (*Repository[T]) Query ¶
Query executes a query that returns multiple rows and returns them as a Rows instance and an error.
func (*Repository[T]) QueryBoolean ¶ added in v1.0.6
func (repo *Repository[T]) QueryBoolean(ctx context.Context, query string, args ...any) (bool, error)
QueryBoolean executes a query that returns a single boolean value and returns it as a bool and an error.
func (*Repository[T]) QueryRow ¶
QueryRow executes a query that returns at most one row and returns it as a Row instance.
func (*Repository[T]) Select ¶
Select executes a SQL query and returns a slice of values of type T that match the query results.
func (*Repository[T]) SelectByID ¶
func (repo *Repository[T]) SelectByID(ctx context.Context, id any) (T, error)
SelectByID selects a row from a table by matching the id column with the given argument and returns the row or ErrNoRows.
func (*Repository[T]) SelectByUsernameAndPassword ¶
func (repo *Repository[T]) SelectByUsernameAndPassword(ctx context.Context, username, plainPassword string) (T, error)
SelectByUsernameAndPassword selects a row from a table by matching the username and password columns with the given arguments and returns the row or ErrNoRows.
func (*Repository[T]) SelectSingle ¶
SelectSingle executes a SQL query and returns a single value of type T that matches the query result.
func (*Repository[T]) Table ¶ added in v1.0.5
func (repo *Repository[T]) Table() *desc.Table
Table returns the Table definition instance associated with the Repository instance. It should NOT be modified by the caller.
func (*Repository[T]) Update ¶
func (repo *Repository[T]) Update(ctx context.Context, values ...T) (int64, error)
Update updates one or more values of type T in the database by their primary key values.
func (*Repository[T]) UpdateExceptColumns ¶ added in v1.0.3
func (repo *Repository[T]) UpdateExceptColumns(ctx context.Context, columnsToExcept []string, values ...T) (int64, error)
UpdateExceptColumns updates one or more values of type T in the database by their primary key values. The columnsToExcept parameter can be used to specify which columns should NOT be updated.
func (*Repository[T]) UpdateOnlyColumns ¶
func (repo *Repository[T]) UpdateOnlyColumns(ctx context.Context, columnsToUpdate []string, values ...T) (int64, error)
UpdateOnlyColumns updates one or more values of type T in the database by their primary key values.
The columnsToUpdate parameter can be used to specify which columns should be updated.
func (*Repository[T]) Upsert ¶
func (repo *Repository[T]) Upsert(ctx context.Context, forceOnConflictExpr string, values ...T) error
Upsert inserts or updates one or more values of type T into the database.
func (*Repository[T]) UpsertSingle ¶
func (repo *Repository[T]) UpsertSingle(ctx context.Context, forceOnConflictExpr string, value T, idPtr any) error
UpsertSingle inserts or updates a single value of type T into the database.
If idPtr is not nil then the value is updated by its primary key value.
type Schema ¶
type Schema struct { // The name of the "updated_at" column. Defaults to "updated_at" but it can be modified, // this is useful to set when triggers should be registered automatically. // // If set to empty then triggers will not be registered automatically. UpdatedAtColumnName string // Set the name of the trigger that sets the "updated_at" column, defaults to "set_timestamp". // // If set to empty then triggers will not be registered automatically. SetTimestampTriggerName string // Strict reports whether the schema should be strict on the database side. // It's enabled by default. Strict bool // contains filtered or unexported fields }
Schema is a type that represents a schema for the database.
func NewSchema ¶
func NewSchema() *Schema
NewSchema creates and returns a new Schema with an initialized struct cache.
Example ¶
package main import ( "fmt" "time" ) // Structs. // BaseEntity is a struct that defines common fields for all entities in the database. // It has an ID field of type uuid that is the primary key, and two timestamp fields // for tracking the creation and update times of each row. type BaseEntity struct { ID string `pg:"type=uuid,primary"` CreatedAt time.Time `pg:"type=timestamp,default=clock_timestamp()"` UpdatedAt time.Time `pg:"type=timestamp,default=clock_timestamp()"` } // Customer is a struct that represents a customer entity in the database. // It embeds the BaseEntity struct and adds a CognitoUserID field of type uuid // that is required and unique. It also specifies a conflict resolution strategy // for the CognitoUserID field in case of duplicate values. type Customer struct { BaseEntity // CognitoUserID string `pg:"type=uuid,unique,conflict=DO UPDATE SET cognito_user_id=EXCLUDED.cognito_user_id"` CognitoUserID string `pg:"type=uuid,unique_index=customer_unique_idx"` Email string `pg:"type=varchar(255),unique_index=customer_unique_idx"` // ^ optional: unique to allow upsert by "email"-only column confliction instead of the unique_index. Name string `pg:"type=varchar(255),index=btree"` Username string `pg:"type=varchar(255),default=''"` } // Blog is a struct that represents a blog entity in the database. // It embeds the BaseEntity struct and has no other fields. type Blog struct { BaseEntity Name string `pg:"type=varchar(255)"` } // BlogPost is a struct that represents a blog post entity in the database. // It embeds the BaseEntity struct and adds several fields for the blog post details, // such as BlogID, Title, PhotoURL, SourceURL, ReadTimeMinutes, and Category. // The BlogID field is a foreign key that references the ID field of the blogs table, // with cascade option for deletion and deferrable option for constraint checking. 
// The Title and SourceURL fields are part of a unique index named uk_blog_post, // which ensures that no two blog posts have the same title or source URL. // The ReadTimeMinutes field is a smallint with a default value of 1 and a check constraint // that ensures it is positive. The Category field is a smallint with a default value of 0. type BlogPost struct { BaseEntity BlogID string `pg:"type=uuid,index,ref=blogs(id cascade deferrable)"` Title string `pg:"type=varchar(255),unique_index=uk_blog_post"` PhotoURL string `pg:"type=varchar(255)"` SourceURL string `pg:"type=varchar(255),unique_index=uk_blog_post"` ReadTimeMinutes int `pg:"type=smallint,default=1,check=read_time_minutes > 0"` Category int `pg:"type=smallint,default=0"` SearchTerms []string `pg:"type=varchar[]"` // Test a slice of strings. ReadDurations []time.Duration `pg:"type=bigint[]"` // Test a slice of time.Duration based on an int64. // Custom types. Feature Feature `pg:"type=jsonb"` // Test a JSON structure. OtherFeatures Features `pg:"type=jsonb"` // Test a JSON array of structures behind a custom type. Tags []Tag `pg:"type=jsonb"` // Test a JSON array of structures. } type Features []Feature type Feature struct { IsFeatured bool `json:"is_featured"` } type Tag struct { Name string `json:"name"` Value any `json:"value"` } func main() { // Database code. schema := NewSchema() schema.MustRegister("customers", Customer{}) // Register the Customer struct as a table named "customers". schema.MustRegister("blogs", Blog{}) // Register the Blog struct as a table named "blogs". schema.MustRegister("blog_posts", BlogPost{}) // Register the BlogPost struct as a table named "blog_posts". fmt.Println("OK") }
Output: OK
func (*Schema) Get ¶
Get takes a reflect.Type that represents a struct type and returns a pointer to a Table that represents the table definition for the database or an error if the type is not registered in the schema.
func (*Schema) GetByTableName ¶
GetByTableName takes a table name as a string and returns a pointer to a Table that represents the table definition for the database or an error if the table name is not registered in the schema.
func (*Schema) HandlePassword ¶
func (s *Schema) HandlePassword(handler desc.PasswordHandler) *Schema
HandlePassword sets the password handler.
func (*Schema) HasColumnType ¶
HasColumnType takes a DataType that represents a data type for the database and returns true if any of the tables in the schema has a column with that data type.
func (*Schema) HasPassword ¶
HasPassword reports whether the tables in the schema have a column with the password feature enabled.
func (*Schema) MustRegister ¶
func (s *Schema) MustRegister(tableName string, emptyStructValue any, opts ...TableFilterFunc) *Schema
MustRegister is the same as "Register", but it panics on errors and returns the Schema instance instead of the Table one.
func (*Schema) Register ¶
func (s *Schema) Register(tableName string, emptyStructValue any, opts ...TableFilterFunc) (*desc.Table, error)
Register registers a database model (a struct value) mapped to a specific database table name. Returns the generated Table definition.
func (*Schema) TableNames ¶
TableNames returns a slice of strings that represents all the table names in the schema.
type SizeInfo ¶ added in v1.0.7
type SizeInfo struct { SizePretty string `json:"size_pretty"` // The on-disk size in bytes of one fork of that relation. // A fork is a variant of the main data file that stores additional information, // such as the free space map, the visibility map, or the initialization fork. // By default, this is the size of the main data fork only. Size float64 `json:"size"` SizeTotalPretty string `json:"size_total_pretty"` // The total on-disk space used for that table, including all associated indexes. This is equivalent to pg_table_size + pg_indexes_size. SizeTotal float64 `json:"size_total"` }
SizeInfo is a struct which contains the size information (for an individual table or the whole database).
type TableChangeType ¶ added in v1.0.7
type TableChangeType string
TableChangeType is the type of the table change. Available values: INSERT, UPDATE, DELETE.
const ( // TableChangeTypeInsert is the INSERT table change type. TableChangeTypeInsert TableChangeType = "INSERT" // TableChangeTypeUpdate is the UPDATE table change type. TableChangeTypeUpdate TableChangeType = "UPDATE" // TableChangeTypeDelete is the DELETE table change type. TableChangeTypeDelete TableChangeType = "DELETE" )
type TableFilterFunc ¶
type TableFilterFunc = desc.TableFilterFunc
TableFilterFunc is a type alias for desc.TableFilterFunc.
type TableNotification ¶ added in v1.0.7
type TableNotification[T any] struct { Table string `json:"table"` Change TableChangeType `json:"change"` // INSERT, UPDATE, DELETE. New T `json:"new"` Old T `json:"old"` // contains filtered or unexported fields }
TableNotification is the notification message sent by the postgresql server when a table change occurs. The subscribed postgres channel is named 'table_change_notifications'. The "old" and "new" fields are the old and new values of the row. The "old" field is only available for UPDATE and DELETE table change types. The "new" field is only available for INSERT and UPDATE table change types. When T is json.RawMessage (see TableNotificationJSON), the "old" and "new" fields are raw json values; use "json.Unmarshal" to decode them. See the "DB.ListenTable" method.
func (TableNotification[T]) GetPayload ¶ added in v1.0.7
func (tn TableNotification[T]) GetPayload() string
GetPayload returns the raw payload of the notification.
type TableNotificationJSON ¶ added in v1.0.7
type TableNotificationJSON = TableNotification[json.RawMessage]
TableNotificationJSON is the TableNotification type instantiated with json.RawMessage, so its "old" and "new" fields hold the raw JSON values of the row.
type TableSizeInfo ¶ added in v1.0.7
TableSizeInfo is a struct which contains the table size information used as an output parameter of the `db.ListTableSizes` method.