pgconn: github.com/jackc/pgconn Index | Examples | Files | Directories

package pgconn

import "github.com/jackc/pgconn"

Package pgconn is a low-level PostgreSQL database driver.

pgconn provides lower level access to a PostgreSQL connection than a database/sql or pgx connection. It operates at nearly the same level is the C library libpq.

Establishing a Connection

Use Connect to establish a connection. It accepts a connection string in URL or DSN and will read the environment for libpq style environment variables.

Executing a Query

ExecParams and ExecPrepared execute a single query. They return readers that iterate over each row. The Read method reads all rows into memory.

Executing Multiple Queries in a Single Round Trip

Exec and ExecBatch can execute multiple queries in a single round trip. They return readers that iterate over each query result. The ReadAll method reads all query results into memory.

Context Support

All potentially blocking operations take a context.Context. If a context is canceled while the method is in progress the method immediately returns. In most circumstances, this will close the underlying connection.

The CancelRequest method may be used to request the PostgreSQL server cancel an in-progress query without forcing the client to abort.

Code:

package main

import (
    "bytes"
    "compress/gzip"
    "context"
    "crypto/tls"
    "fmt"
    "io"
    "io/ioutil"
    "log"
    "math"
    "net"
    "os"
    "strconv"
    "strings"
    "testing"
    "time"

    "github.com/jackc/pgmock"

    "github.com/jackc/pgconn"
    "github.com/jackc/pgproto3/v2"
    errors "golang.org/x/xerrors"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)

func TestConnect(t *testing.T) {
    tests := []struct {
        name string
        env  string
    }{
        {"Unix socket", "PGX_TEST_UNIX_SOCKET_CONN_STRING"},
        {"TCP", "PGX_TEST_TCP_CONN_STRING"},
        {"Plain password", "PGX_TEST_PLAIN_PASSWORD_CONN_STRING"},
        {"MD5 password", "PGX_TEST_MD5_PASSWORD_CONN_STRING"},
        {"SCRAM password", "PGX_TEST_SCRAM_PASSWORD_CONN_STRING"},
    }

    for _, tt := range tests {
        tt := tt
        t.Run(tt.name, func(t *testing.T) {
            connString := os.Getenv(tt.env)
            if connString == "" {
                t.Skipf("Skipping due to missing environment variable %v", tt.env)
            }

            conn, err := pgconn.Connect(context.Background(), connString)
            require.NoError(t, err)

            closeConn(t, conn)
        })
    }
}

// TestConnectTLS is separate from other connect tests because it has an additional test to ensure it really is a secure
// connection.
func TestConnectTLS(t *testing.T) {
    t.Parallel()

    connString := os.Getenv("PGX_TEST_TLS_CONN_STRING")
    if connString == "" {
        t.Skipf("Skipping due to missing environment variable %v", "PGX_TEST_TLS_CONN_STRING")
    }

    conn, err := pgconn.Connect(context.Background(), connString)
    require.NoError(t, err)

    if _, ok := conn.Conn().(*tls.Conn); !ok {
        t.Error("not a TLS connection")
    }

    closeConn(t, conn)
}

type pgmockWaitStep time.Duration

func (s pgmockWaitStep) Step(*pgproto3.Backend) error {
    time.Sleep(time.Duration(s))
    return nil
}

func TestConnectTimeout(t *testing.T) {
    t.Parallel()
    tests := []struct {
        name    string
        connect func(connStr string) error
    }{
        {
            name: "via context that times out",
            connect: func(connStr string) error {
                ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
                defer cancel()
                _, err := pgconn.Connect(ctx, connStr)
                return err
            },
        },
        {
            name: "via config ConnectTimeout",
            connect: func(connStr string) error {
                conf, err := pgconn.ParseConfig(connStr)
                require.NoError(t, err)
                conf.ConnectTimeout = time.Microsecond * 50
                _, err = pgconn.ConnectConfig(context.Background(), conf)
                return err
            },
        },
    }
    for _, tt := range tests {
        tt := tt
        t.Run(tt.name, func(t *testing.T) {
            t.Parallel()
            script := &pgmock.Script{
                Steps: []pgmock.Step{
                    pgmock.ExpectAnyMessage(&pgproto3.StartupMessage{ProtocolVersion: pgproto3.ProtocolVersionNumber, Parameters: map[string]string{}}),
                    pgmock.SendMessage(&pgproto3.AuthenticationOk{}),
                    pgmockWaitStep(time.Millisecond * 500),
                    pgmock.SendMessage(&pgproto3.BackendKeyData{ProcessID: 0, SecretKey: 0}),
                    pgmock.SendMessage(&pgproto3.ReadyForQuery{TxStatus: 'I'}),
                },
            }

            ln, err := net.Listen("tcp", "127.0.0.1:")
            require.NoError(t, err)
            defer ln.Close()

            serverErrChan := make(chan error, 1)
            go func() {
                defer close(serverErrChan)

                conn, err := ln.Accept()
                if err != nil {
                    serverErrChan <- err
                    return
                }
                defer conn.Close()

                err = conn.SetDeadline(time.Now().Add(time.Millisecond * 450))
                if err != nil {
                    serverErrChan <- err
                    return
                }

                err = script.Run(pgproto3.NewBackend(pgproto3.NewChunkReader(conn), conn))
                if err != nil {
                    serverErrChan <- err
                    return
                }
            }()

            parts := strings.Split(ln.Addr().String(), ":")
            host := parts[0]
            port := parts[1]
            connStr := fmt.Sprintf("sslmode=disable host=%s port=%s", host, port)
            tooLate := time.Now().Add(time.Millisecond * 500)

            err = tt.connect(connStr)
            require.True(t, pgconn.Timeout(err), err)
            require.True(t, time.Now().Before(tooLate))
        })
    }
}

func TestConnectInvalidUser(t *testing.T) {
    t.Parallel()

    connString := os.Getenv("PGX_TEST_TCP_CONN_STRING")
    if connString == "" {
        t.Skipf("Skipping due to missing environment variable %v", "PGX_TEST_TCP_CONN_STRING")
    }

    config, err := pgconn.ParseConfig(connString)
    require.NoError(t, err)

    config.User = "pgxinvalidusertest"

    _, err = pgconn.ConnectConfig(context.Background(), config)
    require.Error(t, err)
    pgErr, ok := errors.Unwrap(err).(*pgconn.PgError)
    if !ok {
        t.Fatalf("Expected to receive a wrapped PgError, instead received: %v", err)
    }
    if pgErr.Code != "28000" && pgErr.Code != "28P01" {
        t.Fatalf("Expected to receive a PgError with code 28000 or 28P01, instead received: %v", pgErr)
    }
}

func TestConnectWithConnectionRefused(t *testing.T) {
    t.Parallel()

    // Presumably nothing is listening on 127.0.0.1:1
    conn, err := pgconn.Connect(context.Background(), "host=127.0.0.1 port=1")
    if err == nil {
        conn.Close(context.Background())
        t.Fatal("Expected error establishing connection to bad port")
    }
}

func TestConnectCustomDialer(t *testing.T) {
    t.Parallel()

    config, err := pgconn.ParseConfig(os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)

    dialed := false
    config.DialFunc = func(ctx context.Context, network, address string) (net.Conn, error) {
        dialed = true
        return net.Dial(network, address)
    }

    conn, err := pgconn.ConnectConfig(context.Background(), config)
    require.NoError(t, err)
    require.True(t, dialed)
    closeConn(t, conn)
}

func TestConnectCustomLookup(t *testing.T) {
    t.Parallel()

    connString := os.Getenv("PGX_TEST_TCP_CONN_STRING")
    if connString == "" {
        t.Skipf("Skipping due to missing environment variable %v", "PGX_TEST_TCP_CONN_STRING")
    }

    config, err := pgconn.ParseConfig(connString)
    require.NoError(t, err)

    looked := false
    config.LookupFunc = func(ctx context.Context, host string) (addrs []string, err error) {
        looked = true
        return net.LookupHost(host)
    }

    conn, err := pgconn.ConnectConfig(context.Background(), config)
    require.NoError(t, err)
    require.True(t, looked)
    closeConn(t, conn)
}

func TestConnectWithRuntimeParams(t *testing.T) {
    t.Parallel()

    config, err := pgconn.ParseConfig(os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)

    config.RuntimeParams = map[string]string{
        "application_name": "pgxtest",
        "search_path":      "myschema",
    }

    conn, err := pgconn.ConnectConfig(context.Background(), config)
    require.NoError(t, err)
    defer closeConn(t, conn)

    result := conn.ExecParams(context.Background(), "show application_name", nil, nil, nil, nil).Read()
    require.Nil(t, result.Err)
    assert.Equal(t, 1, len(result.Rows))
    assert.Equal(t, "pgxtest", string(result.Rows[0][0]))

    result = conn.ExecParams(context.Background(), "show search_path", nil, nil, nil, nil).Read()
    require.Nil(t, result.Err)
    assert.Equal(t, 1, len(result.Rows))
    assert.Equal(t, "myschema", string(result.Rows[0][0]))
}

func TestConnectWithFallback(t *testing.T) {
    t.Parallel()

    config, err := pgconn.ParseConfig(os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)

    // Prepend current primary config to fallbacks
    config.Fallbacks = append([]*pgconn.FallbackConfig{
        &pgconn.FallbackConfig{
            Host:      config.Host,
            Port:      config.Port,
            TLSConfig: config.TLSConfig,
        },
    }, config.Fallbacks...)

    // Make primary config bad
    config.Host = "localhost"
    config.Port = 1 // presumably nothing listening here

    // Prepend bad first fallback
    config.Fallbacks = append([]*pgconn.FallbackConfig{
        &pgconn.FallbackConfig{
            Host:      "localhost",
            Port:      1,
            TLSConfig: config.TLSConfig,
        },
    }, config.Fallbacks...)

    conn, err := pgconn.ConnectConfig(context.Background(), config)
    require.NoError(t, err)
    closeConn(t, conn)
}

func TestConnectWithValidateConnect(t *testing.T) {
    t.Parallel()

    config, err := pgconn.ParseConfig(os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)

    dialCount := 0
    config.DialFunc = func(ctx context.Context, network, address string) (net.Conn, error) {
        dialCount++
        return net.Dial(network, address)
    }

    acceptConnCount := 0
    config.ValidateConnect = func(ctx context.Context, conn *pgconn.PgConn) error {
        acceptConnCount++
        if acceptConnCount < 2 {
            return errors.New("reject first conn")
        }
        return nil
    }

    // Append current primary config to fallbacks
    config.Fallbacks = append(config.Fallbacks, &pgconn.FallbackConfig{
        Host:      config.Host,
        Port:      config.Port,
        TLSConfig: config.TLSConfig,
    })

    // Repeat fallbacks
    config.Fallbacks = append(config.Fallbacks, config.Fallbacks...)

    conn, err := pgconn.ConnectConfig(context.Background(), config)
    require.NoError(t, err)
    closeConn(t, conn)

    assert.True(t, dialCount > 1)
    assert.True(t, acceptConnCount > 1)
}

func TestConnectWithValidateConnectTargetSessionAttrsReadWrite(t *testing.T) {
    t.Parallel()

    config, err := pgconn.ParseConfig(os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)

    config.ValidateConnect = pgconn.ValidateConnectTargetSessionAttrsReadWrite
    config.RuntimeParams["default_transaction_read_only"] = "on"

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    conn, err := pgconn.ConnectConfig(ctx, config)
    if !assert.NotNil(t, err) {
        conn.Close(ctx)
    }
}

func TestConnectWithAfterConnect(t *testing.T) {
    t.Parallel()

    config, err := pgconn.ParseConfig(os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)

    config.AfterConnect = func(ctx context.Context, conn *pgconn.PgConn) error {
        _, err := conn.Exec(ctx, "set search_path to foobar;").ReadAll()
        return err
    }

    conn, err := pgconn.ConnectConfig(context.Background(), config)
    require.NoError(t, err)

    results, err := conn.Exec(context.Background(), "show search_path;").ReadAll()
    require.NoError(t, err)
    defer closeConn(t, conn)

    assert.Equal(t, []byte("foobar"), results[0].Rows[0][0])
}

func TestConnectConfigRequiresConfigFromParseConfig(t *testing.T) {
    t.Parallel()

    config := &pgconn.Config{}

    require.PanicsWithValue(t, "config must be created by ParseConfig", func() { pgconn.ConnectConfig(context.Background(), config) })
}

func TestConnPrepareSyntaxError(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    psd, err := pgConn.Prepare(context.Background(), "ps1", "SYNTAX ERROR", nil)
    require.Nil(t, psd)
    require.NotNil(t, err)

    ensureConnValid(t, pgConn)
}

func TestConnPrepareContextPrecanceled(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    ctx, cancel := context.WithCancel(context.Background())
    cancel()
    psd, err := pgConn.Prepare(ctx, "ps1", "select 1", nil)
    assert.Nil(t, psd)
    assert.Error(t, err)
    assert.True(t, errors.Is(err, context.Canceled))
    assert.True(t, pgconn.SafeToRetry(err))

    ensureConnValid(t, pgConn)
}

func TestConnExec(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    results, err := pgConn.Exec(context.Background(), "select 'Hello, world'").ReadAll()
    assert.NoError(t, err)

    assert.Len(t, results, 1)
    assert.Nil(t, results[0].Err)
    assert.Equal(t, "SELECT 1", string(results[0].CommandTag))
    assert.Len(t, results[0].Rows, 1)
    assert.Equal(t, "Hello, world", string(results[0].Rows[0][0]))

    ensureConnValid(t, pgConn)
}

func TestConnExecEmpty(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    multiResult := pgConn.Exec(context.Background(), ";")

    resultCount := 0
    for multiResult.NextResult() {
        resultCount++
        multiResult.ResultReader().Close()
    }
    assert.Equal(t, 0, resultCount)
    err = multiResult.Close()
    assert.NoError(t, err)

    ensureConnValid(t, pgConn)
}

func TestConnExecMultipleQueries(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    results, err := pgConn.Exec(context.Background(), "select 'Hello, world'; select 1").ReadAll()
    assert.NoError(t, err)

    assert.Len(t, results, 2)

    assert.Nil(t, results[0].Err)
    assert.Equal(t, "SELECT 1", string(results[0].CommandTag))
    assert.Len(t, results[0].Rows, 1)
    assert.Equal(t, "Hello, world", string(results[0].Rows[0][0]))

    assert.Nil(t, results[1].Err)
    assert.Equal(t, "SELECT 1", string(results[1].CommandTag))
    assert.Len(t, results[1].Rows, 1)
    assert.Equal(t, "1", string(results[1].Rows[0][0]))

    ensureConnValid(t, pgConn)
}

func TestConnExecMultipleQueriesEagerFieldDescriptions(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    mrr := pgConn.Exec(context.Background(), "select 'Hello, world' as msg; select 1 as num")

    require.True(t, mrr.NextResult())
    require.Len(t, mrr.ResultReader().FieldDescriptions(), 1)
    assert.Equal(t, []byte("msg"), mrr.ResultReader().FieldDescriptions()[0].Name)
    _, err = mrr.ResultReader().Close()
    require.NoError(t, err)

    require.True(t, mrr.NextResult())
    require.Len(t, mrr.ResultReader().FieldDescriptions(), 1)
    assert.Equal(t, []byte("num"), mrr.ResultReader().FieldDescriptions()[0].Name)
    _, err = mrr.ResultReader().Close()
    require.NoError(t, err)

    require.False(t, mrr.NextResult())

    require.NoError(t, mrr.Close())

    ensureConnValid(t, pgConn)
}

func TestConnExecMultipleQueriesError(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    results, err := pgConn.Exec(context.Background(), "select 1; select 1/0; select 1").ReadAll()
    require.NotNil(t, err)
    if pgErr, ok := err.(*pgconn.PgError); ok {
        assert.Equal(t, "22012", pgErr.Code)
    } else {
        t.Errorf("unexpected error: %v", err)
    }

    assert.Len(t, results, 1)
    assert.Len(t, results[0].Rows, 1)
    assert.Equal(t, "1", string(results[0].Rows[0][0]))

    ensureConnValid(t, pgConn)
}

func TestConnExecDeferredError(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    setupSQL := `create temporary table t (
		id text primary key,
		n int not null,
		unique (n) deferrable initially deferred
	);

	insert into t (id, n) values ('a', 1), ('b', 2), ('c', 3);`

    _, err = pgConn.Exec(context.Background(), setupSQL).ReadAll()
    assert.NoError(t, err)

    _, err = pgConn.Exec(context.Background(), `update t set n=n+1 where id='b' returning *`).ReadAll()
    require.NotNil(t, err)

    var pgErr *pgconn.PgError
    require.True(t, errors.As(err, &pgErr))
    require.Equal(t, "23505", pgErr.Code)

    ensureConnValid(t, pgConn)
}

func TestConnExecContextCanceled(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
    defer cancel()
    multiResult := pgConn.Exec(ctx, "select 'Hello, world', pg_sleep(1)")

    for multiResult.NextResult() {
    }
    err = multiResult.Close()
    assert.True(t, pgconn.Timeout(err))
    assert.True(t, pgConn.IsClosed())
    select {
    case <-pgConn.CleanupDone():
    case <-time.After(5 * time.Second):
        t.Fatal("Connection cleanup exceeded maximum time")
    }
}

func TestConnExecContextPrecanceled(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    ctx, cancel := context.WithCancel(context.Background())
    cancel()
    _, err = pgConn.Exec(ctx, "select 'Hello, world'").ReadAll()
    assert.Error(t, err)
    assert.True(t, errors.Is(err, context.Canceled))
    assert.True(t, pgconn.SafeToRetry(err))

    ensureConnValid(t, pgConn)
}

func TestConnExecParams(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    result := pgConn.ExecParams(context.Background(), "select $1::text as msg", [][]byte{[]byte("Hello, world")}, nil, nil, nil)
    require.Len(t, result.FieldDescriptions(), 1)
    assert.Equal(t, []byte("msg"), result.FieldDescriptions()[0].Name)

    rowCount := 0
    for result.NextRow() {
        rowCount += 1
        assert.Equal(t, "Hello, world", string(result.Values()[0]))
    }
    assert.Equal(t, 1, rowCount)
    commandTag, err := result.Close()
    assert.Equal(t, "SELECT 1", string(commandTag))
    assert.NoError(t, err)

    ensureConnValid(t, pgConn)
}

func TestConnExecParamsDeferredError(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    setupSQL := `create temporary table t (
		id text primary key,
		n int not null,
		unique (n) deferrable initially deferred
	);

	insert into t (id, n) values ('a', 1), ('b', 2), ('c', 3);`

    _, err = pgConn.Exec(context.Background(), setupSQL).ReadAll()
    assert.NoError(t, err)

    result := pgConn.ExecParams(context.Background(), `update t set n=n+1 where id='b' returning *`, nil, nil, nil, nil).Read()
    require.NotNil(t, result.Err)
    var pgErr *pgconn.PgError
    require.True(t, errors.As(result.Err, &pgErr))
    require.Equal(t, "23505", pgErr.Code)

    ensureConnValid(t, pgConn)
}

func TestConnExecParamsMaxNumberOfParams(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    paramCount := math.MaxUint16
    params := make([]string, 0, paramCount)
    args := make([][]byte, 0, paramCount)
    for i := 0; i < paramCount; i++ {
        params = append(params, fmt.Sprintf("($%d::text)", i+1))
        args = append(args, []byte(strconv.Itoa(i)))
    }
    sql := "values" + strings.Join(params, ", ")

    result := pgConn.ExecParams(context.Background(), sql, args, nil, nil, nil).Read()
    require.NoError(t, result.Err)
    require.Len(t, result.Rows, paramCount)

    ensureConnValid(t, pgConn)
}

func TestConnExecParamsTooManyParams(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    paramCount := math.MaxUint16 + 1
    params := make([]string, 0, paramCount)
    args := make([][]byte, 0, paramCount)
    for i := 0; i < paramCount; i++ {
        params = append(params, fmt.Sprintf("($%d::text)", i+1))
        args = append(args, []byte(strconv.Itoa(i)))
    }
    sql := "values" + strings.Join(params, ", ")

    result := pgConn.ExecParams(context.Background(), sql, args, nil, nil, nil).Read()
    require.Error(t, result.Err)
    require.Equal(t, "extended protocol limited to 65535 parameters", result.Err.Error())

    ensureConnValid(t, pgConn)
}

func TestConnExecParamsCanceled(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
    defer cancel()
    result := pgConn.ExecParams(ctx, "select current_database(), pg_sleep(1)", nil, nil, nil, nil)
    rowCount := 0
    for result.NextRow() {
        rowCount += 1
    }
    assert.Equal(t, 0, rowCount)
    commandTag, err := result.Close()
    assert.Equal(t, pgconn.CommandTag(nil), commandTag)
    assert.True(t, pgconn.Timeout(err))

    assert.True(t, pgConn.IsClosed())
    select {
    case <-pgConn.CleanupDone():
    case <-time.After(5 * time.Second):
        t.Fatal("Connection cleanup exceeded maximum time")
    }
}

func TestConnExecParamsPrecanceled(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    ctx, cancel := context.WithCancel(context.Background())
    cancel()
    result := pgConn.ExecParams(ctx, "select $1::text", [][]byte{[]byte("Hello, world")}, nil, nil, nil).Read()
    require.Error(t, result.Err)
    assert.True(t, errors.Is(result.Err, context.Canceled))
    assert.True(t, pgconn.SafeToRetry(result.Err))

    ensureConnValid(t, pgConn)
}

func TestConnExecParamsEmptySQL(t *testing.T) {
    t.Parallel()

    ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
    defer cancel()

    pgConn, err := pgconn.Connect(ctx, os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    result := pgConn.ExecParams(ctx, "", nil, nil, nil, nil).Read()
    assert.Nil(t, result.CommandTag)
    assert.Len(t, result.Rows, 0)
    assert.NoError(t, result.Err)

    ensureConnValid(t, pgConn)
}

// https://github.com/jackc/pgx/issues/859
func TestResultReaderValuesHaveSameCapacityAsLength(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    result := pgConn.ExecParams(context.Background(), "select $1::text as msg", [][]byte{[]byte("Hello, world")}, nil, nil, nil)
    require.Len(t, result.FieldDescriptions(), 1)
    assert.Equal(t, []byte("msg"), result.FieldDescriptions()[0].Name)

    rowCount := 0
    for result.NextRow() {
        rowCount += 1
        assert.Equal(t, "Hello, world", string(result.Values()[0]))
        assert.Equal(t, len(result.Values()[0]), cap(result.Values()[0]))
    }
    assert.Equal(t, 1, rowCount)
    commandTag, err := result.Close()
    assert.Equal(t, "SELECT 1", string(commandTag))
    assert.NoError(t, err)

    ensureConnValid(t, pgConn)
}

func TestConnExecPrepared(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    psd, err := pgConn.Prepare(context.Background(), "ps1", "select $1::text as msg", nil)
    require.NoError(t, err)
    require.NotNil(t, psd)
    assert.Len(t, psd.ParamOIDs, 1)
    assert.Len(t, psd.Fields, 1)

    result := pgConn.ExecPrepared(context.Background(), "ps1", [][]byte{[]byte("Hello, world")}, nil, nil)
    require.Len(t, result.FieldDescriptions(), 1)
    assert.Equal(t, []byte("msg"), result.FieldDescriptions()[0].Name)

    rowCount := 0
    for result.NextRow() {
        rowCount += 1
        assert.Equal(t, "Hello, world", string(result.Values()[0]))
    }
    assert.Equal(t, 1, rowCount)
    commandTag, err := result.Close()
    assert.Equal(t, "SELECT 1", string(commandTag))
    assert.NoError(t, err)

    ensureConnValid(t, pgConn)
}

func TestConnExecPreparedMaxNumberOfParams(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    paramCount := math.MaxUint16
    params := make([]string, 0, paramCount)
    args := make([][]byte, 0, paramCount)
    for i := 0; i < paramCount; i++ {
        params = append(params, fmt.Sprintf("($%d::text)", i+1))
        args = append(args, []byte(strconv.Itoa(i)))
    }
    sql := "values" + strings.Join(params, ", ")

    psd, err := pgConn.Prepare(context.Background(), "ps1", sql, nil)
    require.NoError(t, err)
    require.NotNil(t, psd)
    assert.Len(t, psd.ParamOIDs, paramCount)
    assert.Len(t, psd.Fields, 1)

    result := pgConn.ExecPrepared(context.Background(), "ps1", args, nil, nil).Read()
    require.NoError(t, result.Err)
    require.Len(t, result.Rows, paramCount)

    ensureConnValid(t, pgConn)
}

func TestConnExecPreparedTooManyParams(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    paramCount := math.MaxUint16 + 1
    params := make([]string, 0, paramCount)
    args := make([][]byte, 0, paramCount)
    for i := 0; i < paramCount; i++ {
        params = append(params, fmt.Sprintf("($%d::text)", i+1))
        args = append(args, []byte(strconv.Itoa(i)))
    }
    sql := "values" + strings.Join(params, ", ")

    psd, err := pgConn.Prepare(context.Background(), "ps1", sql, nil)
    require.NoError(t, err)
    require.NotNil(t, psd)
    assert.Len(t, psd.ParamOIDs, paramCount)
    assert.Len(t, psd.Fields, 1)

    result := pgConn.ExecPrepared(context.Background(), "ps1", args, nil, nil).Read()
    require.Error(t, result.Err)
    require.Equal(t, "extended protocol limited to 65535 parameters", result.Err.Error())

    ensureConnValid(t, pgConn)
}

func TestConnExecPreparedCanceled(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    _, err = pgConn.Prepare(context.Background(), "ps1", "select current_database(), pg_sleep(1)", nil)
    require.NoError(t, err)

    ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
    defer cancel()
    result := pgConn.ExecPrepared(ctx, "ps1", nil, nil, nil)
    rowCount := 0
    for result.NextRow() {
        rowCount += 1
    }
    assert.Equal(t, 0, rowCount)
    commandTag, err := result.Close()
    assert.Equal(t, pgconn.CommandTag(nil), commandTag)
    assert.True(t, pgconn.Timeout(err))
    assert.True(t, pgConn.IsClosed())
    select {
    case <-pgConn.CleanupDone():
    case <-time.After(5 * time.Second):
        t.Fatal("Connection cleanup exceeded maximum time")
    }
}

func TestConnExecPreparedPrecanceled(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    _, err = pgConn.Prepare(context.Background(), "ps1", "select current_database(), pg_sleep(1)", nil)
    require.NoError(t, err)

    ctx, cancel := context.WithCancel(context.Background())
    cancel()
    result := pgConn.ExecPrepared(ctx, "ps1", nil, nil, nil).Read()
    require.Error(t, result.Err)
    assert.True(t, errors.Is(result.Err, context.Canceled))
    assert.True(t, pgconn.SafeToRetry(result.Err))

    ensureConnValid(t, pgConn)
}

func TestConnExecPreparedEmptySQL(t *testing.T) {
    t.Parallel()

    ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
    defer cancel()

    pgConn, err := pgconn.Connect(ctx, os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    _, err = pgConn.Prepare(ctx, "ps1", "", nil)
    require.NoError(t, err)

    result := pgConn.ExecPrepared(ctx, "ps1", nil, nil, nil).Read()
    assert.Nil(t, result.CommandTag)
    assert.Len(t, result.Rows, 0)
    assert.NoError(t, result.Err)

    ensureConnValid(t, pgConn)
}

func TestConnExecBatch(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    _, err = pgConn.Prepare(context.Background(), "ps1", "select $1::text", nil)
    require.NoError(t, err)

    batch := &pgconn.Batch{}

    batch.ExecParams("select $1::text", [][]byte{[]byte("ExecParams 1")}, nil, nil, nil)
    batch.ExecPrepared("ps1", [][]byte{[]byte("ExecPrepared 1")}, nil, nil)
    batch.ExecParams("select $1::text", [][]byte{[]byte("ExecParams 2")}, nil, nil, nil)
    results, err := pgConn.ExecBatch(context.Background(), batch).ReadAll()
    require.NoError(t, err)
    require.Len(t, results, 3)

    require.Len(t, results[0].Rows, 1)
    require.Equal(t, "ExecParams 1", string(results[0].Rows[0][0]))
    assert.Equal(t, "SELECT 1", string(results[0].CommandTag))

    require.Len(t, results[1].Rows, 1)
    require.Equal(t, "ExecPrepared 1", string(results[1].Rows[0][0]))
    assert.Equal(t, "SELECT 1", string(results[1].CommandTag))

    require.Len(t, results[2].Rows, 1)
    require.Equal(t, "ExecParams 2", string(results[2].Rows[0][0]))
    assert.Equal(t, "SELECT 1", string(results[2].CommandTag))
}

func TestConnExecBatchDeferredError(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    setupSQL := `create temporary table t (
		id text primary key,
		n int not null,
		unique (n) deferrable initially deferred
	);

	insert into t (id, n) values ('a', 1), ('b', 2), ('c', 3);`

    _, err = pgConn.Exec(context.Background(), setupSQL).ReadAll()
    assert.NoError(t, err)

    batch := &pgconn.Batch{}

    batch.ExecParams(`update t set n=n+1 where id='b' returning *`, nil, nil, nil, nil)
    _, err = pgConn.ExecBatch(context.Background(), batch).ReadAll()
    require.NotNil(t, err)
    var pgErr *pgconn.PgError
    require.True(t, errors.As(err, &pgErr))
    require.Equal(t, "23505", pgErr.Code)

    ensureConnValid(t, pgConn)
}

func TestConnExecBatchPrecanceled(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    _, err = pgConn.Prepare(context.Background(), "ps1", "select $1::text", nil)
    require.NoError(t, err)

    batch := &pgconn.Batch{}

    batch.ExecParams("select $1::text", [][]byte{[]byte("ExecParams 1")}, nil, nil, nil)
    batch.ExecPrepared("ps1", [][]byte{[]byte("ExecPrepared 1")}, nil, nil)
    batch.ExecParams("select $1::text", [][]byte{[]byte("ExecParams 2")}, nil, nil, nil)

    ctx, cancel := context.WithCancel(context.Background())
    cancel()
    _, err = pgConn.ExecBatch(ctx, batch).ReadAll()
    require.Error(t, err)
    assert.True(t, errors.Is(err, context.Canceled))
    assert.True(t, pgconn.SafeToRetry(err))

    ensureConnValid(t, pgConn)
}

// Without concurrent reading and writing large batches can deadlock.
//
// See https://github.com/jackc/pgx/issues/374.
func TestConnExecBatchHuge(t *testing.T) {
    if testing.Short() {
        t.Skip("skipping test in short mode.")
    }

    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    batch := &pgconn.Batch{}

    queryCount := 100000
    args := make([]string, queryCount)

    for i := range args {
        args[i] = strconv.Itoa(i)
        batch.ExecParams("select $1::text", [][]byte{[]byte(args[i])}, nil, nil, nil)
    }

    results, err := pgConn.ExecBatch(context.Background(), batch).ReadAll()
    require.NoError(t, err)
    require.Len(t, results, queryCount)

    for i := range args {
        require.Len(t, results[i].Rows, 1)
        require.Equal(t, args[i], string(results[i].Rows[0][0]))
        assert.Equal(t, "SELECT 1", string(results[i].CommandTag))
    }
}

func TestConnExecBatchImplicitTransaction(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    _, err = pgConn.Exec(context.Background(), "create temporary table t(id int)").ReadAll()
    require.NoError(t, err)

    batch := &pgconn.Batch{}

    batch.ExecParams("insert into t(id) values(1)", nil, nil, nil, nil)
    batch.ExecParams("insert into t(id) values(2)", nil, nil, nil, nil)
    batch.ExecParams("insert into t(id) values(3)", nil, nil, nil, nil)
    batch.ExecParams("select 1/0", nil, nil, nil, nil)
    _, err = pgConn.ExecBatch(context.Background(), batch).ReadAll()
    require.Error(t, err)

    result := pgConn.ExecParams(context.Background(), "select count(*) from t", nil, nil, nil, nil).Read()
    require.Equal(t, "0", string(result.Rows[0][0]))
}

func TestConnLocking(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    mrr := pgConn.Exec(context.Background(), "select 'Hello, world'")
    _, err = pgConn.Exec(context.Background(), "select 'Hello, world'").ReadAll()
    assert.Error(t, err)
    assert.Equal(t, "conn busy", err.Error())
    assert.True(t, pgconn.SafeToRetry(err))

    results, err := mrr.ReadAll()
    assert.NoError(t, err)
    assert.Len(t, results, 1)
    assert.Nil(t, results[0].Err)
    assert.Equal(t, "SELECT 1", string(results[0].CommandTag))
    assert.Len(t, results[0].Rows, 1)
    assert.Equal(t, "Hello, world", string(results[0].Rows[0][0]))

    ensureConnValid(t, pgConn)
}

func TestCommandTag(t *testing.T) {
    t.Parallel()

    var tests = []struct {
        commandTag   pgconn.CommandTag
        rowsAffected int64
        isInsert     bool
        isUpdate     bool
        isDelete     bool
        isSelect     bool
    }{
        {commandTag: pgconn.CommandTag("INSERT 0 5"), rowsAffected: 5, isInsert: true},
        {commandTag: pgconn.CommandTag("UPDATE 0"), rowsAffected: 0, isUpdate: true},
        {commandTag: pgconn.CommandTag("UPDATE 1"), rowsAffected: 1, isUpdate: true},
        {commandTag: pgconn.CommandTag("DELETE 0"), rowsAffected: 0, isDelete: true},
        {commandTag: pgconn.CommandTag("DELETE 1"), rowsAffected: 1, isDelete: true},
        {commandTag: pgconn.CommandTag("DELETE 1234567890"), rowsAffected: 1234567890, isDelete: true},
        {commandTag: pgconn.CommandTag("SELECT 1"), rowsAffected: 1, isSelect: true},
        {commandTag: pgconn.CommandTag("SELECT 99999999999"), rowsAffected: 99999999999, isSelect: true},
        {commandTag: pgconn.CommandTag("CREATE TABLE"), rowsAffected: 0},
        {commandTag: pgconn.CommandTag("ALTER TABLE"), rowsAffected: 0},
        {commandTag: pgconn.CommandTag("DROP TABLE"), rowsAffected: 0},
    }

    for i, tt := range tests {
        ct := tt.commandTag
        assert.Equalf(t, tt.rowsAffected, ct.RowsAffected(), "%d. %v", i, tt.commandTag)
        assert.Equalf(t, tt.isInsert, ct.Insert(), "%d. %v", i, tt.commandTag)
        assert.Equalf(t, tt.isUpdate, ct.Update(), "%d. %v", i, tt.commandTag)
        assert.Equalf(t, tt.isDelete, ct.Delete(), "%d. %v", i, tt.commandTag)
        assert.Equalf(t, tt.isSelect, ct.Select(), "%d. %v", i, tt.commandTag)
    }
}

func TestConnOnNotice(t *testing.T) {
    t.Parallel()

    config, err := pgconn.ParseConfig(os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)

    var msg string
    config.OnNotice = func(c *pgconn.PgConn, notice *pgconn.Notice) {
        msg = notice.Message
    }

    pgConn, err := pgconn.ConnectConfig(context.Background(), config)
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    multiResult := pgConn.Exec(context.Background(), `do $$
begin
  raise notice 'hello, world';
end$$;`)
    err = multiResult.Close()
    require.NoError(t, err)
    assert.Equal(t, "hello, world", msg)

    ensureConnValid(t, pgConn)
}

func TestConnOnNotification(t *testing.T) {
    t.Parallel()

    config, err := pgconn.ParseConfig(os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)

    var msg string
    config.OnNotification = func(c *pgconn.PgConn, n *pgconn.Notification) {
        msg = n.Payload
    }

    pgConn, err := pgconn.ConnectConfig(context.Background(), config)
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    _, err = pgConn.Exec(context.Background(), "listen foo").ReadAll()
    require.NoError(t, err)

    notifier, err := pgconn.ConnectConfig(context.Background(), config)
    require.NoError(t, err)
    defer closeConn(t, notifier)
    _, err = notifier.Exec(context.Background(), "notify foo, 'bar'").ReadAll()
    require.NoError(t, err)

    _, err = pgConn.Exec(context.Background(), "select 1").ReadAll()
    require.NoError(t, err)

    assert.Equal(t, "bar", msg)

    ensureConnValid(t, pgConn)
}

func TestConnWaitForNotification(t *testing.T) {
    t.Parallel()

    config, err := pgconn.ParseConfig(os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)

    var msg string
    config.OnNotification = func(c *pgconn.PgConn, n *pgconn.Notification) {
        msg = n.Payload
    }

    pgConn, err := pgconn.ConnectConfig(context.Background(), config)
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    _, err = pgConn.Exec(context.Background(), "listen foo").ReadAll()
    require.NoError(t, err)

    notifier, err := pgconn.ConnectConfig(context.Background(), config)
    require.NoError(t, err)
    defer closeConn(t, notifier)
    _, err = notifier.Exec(context.Background(), "notify foo, 'bar'").ReadAll()
    require.NoError(t, err)

    err = pgConn.WaitForNotification(context.Background())
    require.NoError(t, err)

    assert.Equal(t, "bar", msg)

    ensureConnValid(t, pgConn)
}

func TestConnWaitForNotificationPrecanceled(t *testing.T) {
    t.Parallel()

    config, err := pgconn.ParseConfig(os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)

    pgConn, err := pgconn.ConnectConfig(context.Background(), config)
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    ctx, cancel := context.WithCancel(context.Background())
    cancel()
    err = pgConn.WaitForNotification(ctx)
    require.Equal(t, context.Canceled, err)

    ensureConnValid(t, pgConn)
}

func TestConnWaitForNotificationTimeout(t *testing.T) {
    t.Parallel()

    config, err := pgconn.ParseConfig(os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)

    pgConn, err := pgconn.ConnectConfig(context.Background(), config)
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
    err = pgConn.WaitForNotification(ctx)
    cancel()
    assert.True(t, pgconn.Timeout(err))

    ensureConnValid(t, pgConn)
}

func TestConnCopyToSmall(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    _, err = pgConn.Exec(context.Background(), `create temporary table foo(
		a int2,
		b int4,
		c int8,
		d varchar,
		e text,
		f date,
		g json
	)`).ReadAll()
    require.NoError(t, err)

    _, err = pgConn.Exec(context.Background(), `insert into foo values (0, 1, 2, 'abc', 'efg', '2000-01-01', '{"abc":"def","foo":"bar"}')`).ReadAll()
    require.NoError(t, err)

    _, err = pgConn.Exec(context.Background(), `insert into foo values (null, null, null, null, null, null, null)`).ReadAll()
    require.NoError(t, err)

    inputBytes := []byte("0\t1\t2\tabc\tefg\t2000-01-01\t{\"abc\":\"def\",\"foo\":\"bar\"}\n" +
        "\\N\t\\N\t\\N\t\\N\t\\N\t\\N\t\\N\n")

    outputWriter := bytes.NewBuffer(make([]byte, 0, len(inputBytes)))

    res, err := pgConn.CopyTo(context.Background(), outputWriter, "copy foo to stdout")
    require.NoError(t, err)

    assert.Equal(t, int64(2), res.RowsAffected())
    assert.Equal(t, inputBytes, outputWriter.Bytes())

    ensureConnValid(t, pgConn)
}

func TestConnCopyToLarge(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    _, err = pgConn.Exec(context.Background(), `create temporary table foo(
		a int2,
		b int4,
		c int8,
		d varchar,
		e text,
		f date,
		g json,
		h bytea
	)`).ReadAll()
    require.NoError(t, err)

    inputBytes := make([]byte, 0)

    for i := 0; i < 1000; i++ {
        _, err = pgConn.Exec(context.Background(), `insert into foo values (0, 1, 2, 'abc', 'efg', '2000-01-01', '{"abc":"def","foo":"bar"}', 'oooo')`).ReadAll()
        require.NoError(t, err)
        inputBytes = append(inputBytes, "0\t1\t2\tabc\tefg\t2000-01-01\t{\"abc\":\"def\",\"foo\":\"bar\"}\t\\\\x6f6f6f6f\n"...)
    }

    outputWriter := bytes.NewBuffer(make([]byte, 0, len(inputBytes)))

    res, err := pgConn.CopyTo(context.Background(), outputWriter, "copy foo to stdout")
    require.NoError(t, err)

    assert.Equal(t, int64(1000), res.RowsAffected())
    assert.Equal(t, inputBytes, outputWriter.Bytes())

    ensureConnValid(t, pgConn)
}

func TestConnCopyToQueryError(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    outputWriter := bytes.NewBuffer(make([]byte, 0))

    res, err := pgConn.CopyTo(context.Background(), outputWriter, "cropy foo to stdout")
    require.Error(t, err)
    assert.IsType(t, &pgconn.PgError{}, err)
    assert.Equal(t, int64(0), res.RowsAffected())

    ensureConnValid(t, pgConn)
}

func TestConnCopyToCanceled(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    outputWriter := &bytes.Buffer{}

    ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
    defer cancel()
    res, err := pgConn.CopyTo(ctx, outputWriter, "copy (select *, pg_sleep(0.01) from generate_series(1,1000)) to stdout")
    assert.Error(t, err)
    assert.Equal(t, pgconn.CommandTag(nil), res)

    assert.True(t, pgConn.IsClosed())
    select {
    case <-pgConn.CleanupDone():
    case <-time.After(5 * time.Second):
        t.Fatal("Connection cleanup exceeded maximum time")
    }
}

func TestConnCopyToPrecanceled(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    outputWriter := &bytes.Buffer{}

    ctx, cancel := context.WithCancel(context.Background())
    cancel()
    res, err := pgConn.CopyTo(ctx, outputWriter, "copy (select * from generate_series(1,1000)) to stdout")
    require.Error(t, err)
    assert.True(t, errors.Is(err, context.Canceled))
    assert.True(t, pgconn.SafeToRetry(err))
    assert.Equal(t, pgconn.CommandTag(nil), res)

    ensureConnValid(t, pgConn)
}

func TestConnCopyFrom(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    _, err = pgConn.Exec(context.Background(), `create temporary table foo(
		a int4,
		b varchar
	)`).ReadAll()
    require.NoError(t, err)

    srcBuf := &bytes.Buffer{}

    inputRows := [][][]byte{}
    for i := 0; i < 1000; i++ {
        a := strconv.Itoa(i)
        b := "foo " + a + " bar"
        inputRows = append(inputRows, [][]byte{[]byte(a), []byte(b)})
        _, err = srcBuf.Write([]byte(fmt.Sprintf("%s,\"%s\"\n", a, b)))
        require.NoError(t, err)
    }

    ct, err := pgConn.CopyFrom(context.Background(), srcBuf, "COPY foo FROM STDIN WITH (FORMAT csv)")
    require.NoError(t, err)
    assert.Equal(t, int64(len(inputRows)), ct.RowsAffected())

    result := pgConn.ExecParams(context.Background(), "select * from foo", nil, nil, nil, nil).Read()
    require.NoError(t, result.Err)

    assert.Equal(t, inputRows, result.Rows)

    ensureConnValid(t, pgConn)
}

func TestConnCopyFromCanceled(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    _, err = pgConn.Exec(context.Background(), `create temporary table foo(
		a int4,
		b varchar
	)`).ReadAll()
    require.NoError(t, err)

    r, w := io.Pipe()
    go func() {
        for i := 0; i < 1000000; i++ {
            a := strconv.Itoa(i)
            b := "foo " + a + " bar"
            _, err := w.Write([]byte(fmt.Sprintf("%s,\"%s\"\n", a, b)))
            if err != nil {
                return
            }
            time.Sleep(time.Microsecond)
        }
    }()

    ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
    ct, err := pgConn.CopyFrom(ctx, r, "COPY foo FROM STDIN WITH (FORMAT csv)")
    cancel()
    assert.Equal(t, int64(0), ct.RowsAffected())
    assert.Error(t, err)

    assert.True(t, pgConn.IsClosed())
    select {
    case <-pgConn.CleanupDone():
    case <-time.After(5 * time.Second):
        t.Fatal("Connection cleanup exceeded maximum time")
    }
}

func TestConnCopyFromPrecanceled(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    _, err = pgConn.Exec(context.Background(), `create temporary table foo(
		a int4,
		b varchar
	)`).ReadAll()
    require.NoError(t, err)

    r, w := io.Pipe()
    go func() {
        for i := 0; i < 1000000; i++ {
            a := strconv.Itoa(i)
            b := "foo " + a + " bar"
            _, err := w.Write([]byte(fmt.Sprintf("%s,\"%s\"\n", a, b)))
            if err != nil {
                return
            }
            time.Sleep(time.Microsecond)
        }
    }()

    ctx, cancel := context.WithCancel(context.Background())
    cancel()
    ct, err := pgConn.CopyFrom(ctx, r, "COPY foo FROM STDIN WITH (FORMAT csv)")
    require.Error(t, err)
    assert.True(t, errors.Is(err, context.Canceled))
    assert.True(t, pgconn.SafeToRetry(err))
    assert.Equal(t, pgconn.CommandTag(nil), ct)

    ensureConnValid(t, pgConn)
}

func TestConnCopyFromGzipReader(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    _, err = pgConn.Exec(context.Background(), `create temporary table foo(
		a int4,
		b varchar
	)`).ReadAll()
    require.NoError(t, err)

    f, err := ioutil.TempFile("", "*")
    require.NoError(t, err)

    gw := gzip.NewWriter(f)

    inputRows := [][][]byte{}
    for i := 0; i < 1000; i++ {
        a := strconv.Itoa(i)
        b := "foo " + a + " bar"
        inputRows = append(inputRows, [][]byte{[]byte(a), []byte(b)})
        _, err = gw.Write([]byte(fmt.Sprintf("%s,\"%s\"\n", a, b)))
        require.NoError(t, err)
    }

    err = gw.Close()
    require.NoError(t, err)

    _, err = f.Seek(0, 0)
    require.NoError(t, err)

    gr, err := gzip.NewReader(f)
    require.NoError(t, err)

    ct, err := pgConn.CopyFrom(context.Background(), gr, "COPY foo FROM STDIN WITH (FORMAT csv)")
    require.NoError(t, err)
    assert.Equal(t, int64(len(inputRows)), ct.RowsAffected())

    err = gr.Close()
    require.NoError(t, err)

    err = f.Close()
    require.NoError(t, err)

    err = os.Remove(f.Name())
    require.NoError(t, err)

    result := pgConn.ExecParams(context.Background(), "select * from foo", nil, nil, nil, nil).Read()
    require.NoError(t, result.Err)

    assert.Equal(t, inputRows, result.Rows)

    ensureConnValid(t, pgConn)
}

func TestConnCopyFromQuerySyntaxError(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    _, err = pgConn.Exec(context.Background(), `create temporary table foo(
		a int4,
		b varchar
	)`).ReadAll()
    require.NoError(t, err)

    srcBuf := &bytes.Buffer{}

    res, err := pgConn.CopyFrom(context.Background(), srcBuf, "cropy foo to stdout")
    require.Error(t, err)
    assert.IsType(t, &pgconn.PgError{}, err)
    assert.Equal(t, int64(0), res.RowsAffected())

    ensureConnValid(t, pgConn)
}

func TestConnCopyFromQueryNoTableError(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    srcBuf := &bytes.Buffer{}

    res, err := pgConn.CopyFrom(context.Background(), srcBuf, "copy foo to stdout")
    require.Error(t, err)
    assert.IsType(t, &pgconn.PgError{}, err)
    assert.Equal(t, int64(0), res.RowsAffected())

    ensureConnValid(t, pgConn)
}

// https://github.com/jackc/pgconn/issues/21
func TestConnCopyFromNoticeResponseReceivedMidStream(t *testing.T) {
    t.Parallel()

    ctx := context.Background()
    pgConn, err := pgconn.Connect(ctx, os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    _, err = pgConn.Exec(ctx, `create temporary table sentences(
		t text,
		ts tsvector
	)`).ReadAll()
    require.NoError(t, err)

    _, err = pgConn.Exec(ctx, `create function pg_temp.sentences_trigger() returns trigger as $$
	begin
	  new.ts := to_tsvector(new.t);
		return new;
	end
	$$ language plpgsql;`).ReadAll()
    require.NoError(t, err)

    _, err = pgConn.Exec(ctx, `create trigger sentences_update before insert on sentences for each row execute procedure pg_temp.sentences_trigger();`).ReadAll()
    require.NoError(t, err)

    longString := make([]byte, 10001)
    for i := range longString {
        longString[i] = 'x'
    }

    buf := &bytes.Buffer{}
    for i := 0; i < 1000; i++ {
        buf.Write([]byte(fmt.Sprintf("%s\n", string(longString))))
    }

    _, err = pgConn.CopyFrom(ctx, buf, "COPY sentences(t) FROM STDIN WITH (FORMAT csv)")
    require.NoError(t, err)
}

func TestConnEscapeString(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    tests := []struct {
        in  string
        out string
    }{
        {in: "", out: ""},
        {in: "42", out: "42"},
        {in: "'", out: "''"},
        {in: "hi'there", out: "hi''there"},
        {in: "'hi there'", out: "''hi there''"},
    }

    for i, tt := range tests {
        value, err := pgConn.EscapeString(tt.in)
        if assert.NoErrorf(t, err, "%d.", i) {
            assert.Equalf(t, tt.out, value, "%d.", i)
        }
    }

    ensureConnValid(t, pgConn)
}

func TestConnCancelRequest(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    multiResult := pgConn.Exec(context.Background(), "select 'Hello, world', pg_sleep(2)")

    // This test flickers without the Sleep. It appears that since Exec only sends the query and returns without awaiting a
    // response that the CancelRequest can race it and be received before the query is running and cancellable. So wait a
    // few milliseconds.
    time.Sleep(50 * time.Millisecond)

    err = pgConn.CancelRequest(context.Background())
    require.NoError(t, err)

    for multiResult.NextResult() {
    }
    err = multiResult.Close()

    require.IsType(t, &pgconn.PgError{}, err)
    require.Equal(t, "57014", err.(*pgconn.PgError).Code)

    ensureConnValid(t, pgConn)
}

// https://github.com/jackc/pgx/issues/659
func TestConnContextCanceledCancelsRunningQueryOnServer(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    pid := pgConn.PID()

    ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
    defer cancel()
    multiResult := pgConn.Exec(ctx, "select 'Hello, world', pg_sleep(30)")

    for multiResult.NextResult() {
    }
    err = multiResult.Close()
    assert.True(t, pgconn.Timeout(err))
    assert.True(t, pgConn.IsClosed())
    select {
    case <-pgConn.CleanupDone():
    case <-time.After(5 * time.Second):
        t.Fatal("Connection cleanup exceeded maximum time")
    }

    otherConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, otherConn)

    ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
    defer cancel()

    for {
        result := otherConn.ExecParams(ctx,
            `select 1 from pg_stat_activity where pid=$1`,
            [][]byte{[]byte(strconv.FormatInt(int64(pid), 10))},
            nil,
            nil,
            nil,
        ).Read()
        require.NoError(t, result.Err)

        if len(result.Rows) == 0 {
            break
        }
    }
}

func TestConnSendBytesAndReceiveMessage(t *testing.T) {
    t.Parallel()

    ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
    defer cancel()

    pgConn, err := pgconn.Connect(ctx, os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)
    defer closeConn(t, pgConn)

    queryMsg := pgproto3.Query{String: "select 42"}
    buf := queryMsg.Encode(nil)

    err = pgConn.SendBytes(ctx, buf)
    require.NoError(t, err)

    msg, err := pgConn.ReceiveMessage(ctx)
    require.NoError(t, err)
    _, ok := msg.(*pgproto3.RowDescription)
    require.True(t, ok)

    msg, err = pgConn.ReceiveMessage(ctx)
    require.NoError(t, err)
    _, ok = msg.(*pgproto3.DataRow)
    require.True(t, ok)

    msg, err = pgConn.ReceiveMessage(ctx)
    require.NoError(t, err)
    _, ok = msg.(*pgproto3.CommandComplete)
    require.True(t, ok)

    msg, err = pgConn.ReceiveMessage(ctx)
    require.NoError(t, err)
    _, ok = msg.(*pgproto3.ReadyForQuery)
    require.True(t, ok)

    ensureConnValid(t, pgConn)
}

func TestHijackAndConstruct(t *testing.T) {
    t.Parallel()

    origConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)

    hc, err := origConn.Hijack()
    require.NoError(t, err)

    _, err = origConn.Exec(context.Background(), "select 'Hello, world'").ReadAll()
    require.Error(t, err)

    newConn, err := pgconn.Construct(hc)
    require.NoError(t, err)

    defer closeConn(t, newConn)

    results, err := newConn.Exec(context.Background(), "select 'Hello, world'").ReadAll()
    assert.NoError(t, err)

    assert.Len(t, results, 1)
    assert.Nil(t, results[0].Err)
    assert.Equal(t, "SELECT 1", string(results[0].CommandTag))
    assert.Len(t, results[0].Rows, 1)
    assert.Equal(t, "Hello, world", string(results[0].Rows[0][0]))

    ensureConnValid(t, newConn)
}

func TestConnCloseWhileCancellableQueryInProgress(t *testing.T) {
    t.Parallel()

    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    require.NoError(t, err)

    ctx, _ := context.WithCancel(context.Background())
    pgConn.Exec(ctx, "select n from generate_series(1,10) n")

    closeCtx, _ := context.WithCancel(context.Background())
    pgConn.Close(closeCtx)
    select {
    case <-pgConn.CleanupDone():
    case <-time.After(5 * time.Second):
        t.Fatal("Connection cleanup exceeded maximum time")
    }
}

// https://github.com/jackc/pgx/issues/800
func TestFatalErrorReceivedAfterCommandComplete(t *testing.T) {
    t.Parallel()

    steps := pgmock.AcceptUnauthenticatedConnRequestSteps()
    steps = append(steps, pgmock.ExpectAnyMessage(&pgproto3.Parse{}))
    steps = append(steps, pgmock.ExpectAnyMessage(&pgproto3.Bind{}))
    steps = append(steps, pgmock.ExpectAnyMessage(&pgproto3.Describe{}))
    steps = append(steps, pgmock.ExpectAnyMessage(&pgproto3.Execute{}))
    steps = append(steps, pgmock.ExpectAnyMessage(&pgproto3.Sync{}))
    steps = append(steps, pgmock.SendMessage(&pgproto3.RowDescription{Fields: []pgproto3.FieldDescription{
        {Name: []byte("mock")},
    }}))
    steps = append(steps, pgmock.SendMessage(&pgproto3.CommandComplete{CommandTag: []byte("SELECT 0")}))
    steps = append(steps, pgmock.SendMessage(&pgproto3.ErrorResponse{Severity: "FATAL", Code: "57P01"}))

    script := &pgmock.Script{Steps: steps}

    ln, err := net.Listen("tcp", "127.0.0.1:")
    require.NoError(t, err)
    defer ln.Close()

    serverErrChan := make(chan error, 1)
    go func() {
        defer close(serverErrChan)

        conn, err := ln.Accept()
        if err != nil {
            serverErrChan <- err
            return
        }
        defer conn.Close()

        err = conn.SetDeadline(time.Now().Add(5 * time.Second))
        if err != nil {
            serverErrChan <- err
            return
        }

        err = script.Run(pgproto3.NewBackend(pgproto3.NewChunkReader(conn), conn))
        if err != nil {
            serverErrChan <- err
            return
        }
    }()

    parts := strings.Split(ln.Addr().String(), ":")
    host := parts[0]
    port := parts[1]
    connStr := fmt.Sprintf("sslmode=disable host=%s port=%s", host, port)

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    conn, err := pgconn.Connect(ctx, connStr)
    require.NoError(t, err)

    rr := conn.ExecParams(ctx, "mocked...", nil, nil, nil, nil)

    for rr.NextRow() {
    }

    _, err = rr.Close()
    require.Error(t, err)
}

func main() {
    pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
    if err != nil {
        log.Fatalln(err)
    }
    defer pgConn.Close(context.Background())

    result := pgConn.ExecParams(context.Background(), "select generate_series(1,3)", nil, nil, nil, nil).Read()
    if result.Err != nil {
        log.Fatalln(result.Err)
    }

    for _, row := range result.Rows {
        fmt.Println(string(row[0]))
    }

    fmt.Println(result.CommandTag)
}

Index

Examples

Package Files

auth_scram.go config.go doc.go errors.go pgconn.go

func NetworkAddress Uses

func NetworkAddress(host string, port uint16) (network, address string)

NetworkAddress converts a PostgreSQL host and port into network and address suitable for use with net.Dial.

func SafeToRetry Uses

func SafeToRetry(err error) bool

SafeToRetry checks if the err is guaranteed to have occurred before sending any data to the server.

func Timeout Uses

func Timeout(err error) bool

Timeout checks if err was was caused by a timeout. To be specific, it is true if err is or was caused by a context.Canceled, context.DeadlineExceeded or an implementer of net.Error where Timeout() is true.

func ValidateConnectTargetSessionAttrsReadWrite Uses

func ValidateConnectTargetSessionAttrsReadWrite(ctx context.Context, pgConn *PgConn) error

ValidateConnectTargetSessionAttrsReadWrite is an ValidateConnectFunc that implements libpq compatible target_session_attrs=read-write.

type AfterConnectFunc Uses

type AfterConnectFunc func(ctx context.Context, pgconn *PgConn) error

type Batch Uses

type Batch struct {
    // contains filtered or unexported fields
}

Batch is a collection of queries that can be sent to the PostgreSQL server in a single round-trip.

func (*Batch) ExecParams Uses

func (batch *Batch) ExecParams(sql string, paramValues [][]byte, paramOIDs []uint32, paramFormats []int16, resultFormats []int16)

ExecParams appends an ExecParams command to the batch. See PgConn.ExecParams for parameter descriptions.

func (*Batch) ExecPrepared Uses

func (batch *Batch) ExecPrepared(stmtName string, paramValues [][]byte, paramFormats []int16, resultFormats []int16)

ExecPrepared appends an ExecPrepared e command to the batch. See PgConn.ExecPrepared for parameter descriptions.

type BuildFrontendFunc Uses

type BuildFrontendFunc func(r io.Reader, w io.Writer) Frontend

BuildFrontendFunc is a function that can be used to create Frontend implementation for connection.

type CommandTag Uses

type CommandTag []byte

CommandTag is the result of an Exec function

func (CommandTag) Delete Uses

func (ct CommandTag) Delete() bool

Delete is true if the command tag starts with "DELETE".

func (CommandTag) Insert Uses

func (ct CommandTag) Insert() bool

Insert is true if the command tag starts with "INSERT".

func (CommandTag) RowsAffected Uses

func (ct CommandTag) RowsAffected() int64

RowsAffected returns the number of rows affected. If the CommandTag was not for a row affecting command (e.g. "CREATE TABLE") then it returns 0.

func (CommandTag) Select Uses

func (ct CommandTag) Select() bool

Select is true if the command tag starts with "SELECT".

func (CommandTag) String Uses

func (ct CommandTag) String() string

func (CommandTag) Update Uses

func (ct CommandTag) Update() bool

Update is true if the command tag starts with "UPDATE".

type Config Uses

type Config struct {
    Host           string // host (e.g. localhost) or absolute path to unix domain socket directory (e.g. /private/tmp)
    Port           uint16
    Database       string
    User           string
    Password       string
    TLSConfig      *tls.Config // nil disables TLS
    ConnectTimeout time.Duration
    DialFunc       DialFunc   // e.g. net.Dialer.DialContext
    LookupFunc     LookupFunc // e.g. net.Resolver.LookupHost
    BuildFrontend  BuildFrontendFunc
    RuntimeParams  map[string]string // Run-time parameters to set on connection as session default values (e.g. search_path or application_name)

    Fallbacks []*FallbackConfig

    // ValidateConnect is called during a connection attempt after a successful authentication with the PostgreSQL server.
    // It can be used to validate that the server is acceptable. If this returns an error the connection is closed and the next
    // fallback config is tried. This allows implementing high availability behavior such as libpq does with target_session_attrs.
    ValidateConnect ValidateConnectFunc

    // AfterConnect is called after ValidateConnect. It can be used to set up the connection (e.g. Set session variables
    // or prepare statements). If this returns an error the connection attempt fails.
    AfterConnect AfterConnectFunc

    // OnNotice is a callback function called when a notice response is received.
    OnNotice NoticeHandler

    // OnNotification is a callback function called when a notification from the LISTEN/NOTIFY system is received.
    OnNotification NotificationHandler
    // contains filtered or unexported fields
}

Config is the settings used to establish a connection to a PostgreSQL server. It must be created by ParseConfig. A manually initialized Config will cause ConnectConfig to panic.

func ParseConfig Uses

func ParseConfig(connString string) (*Config, error)

ParseConfig builds a *Config with similar behavior to the PostgreSQL standard C library libpq. It uses the same defaults as libpq (e.g. port=5432) and understands most PG* environment variables. connString may be a URL or a DSN. It also may be empty to only read from the environment. If a password is not supplied it will attempt to read the .pgpass file.

# Example DSN
user=jack password=secret host=pg.example.com port=5432 dbname=mydb sslmode=verify-ca

# Example URL
postgres://jack:secret@pg.example.com:5432/mydb?sslmode=verify-ca

The returned *Config may be modified. However, it is strongly recommended that any configuration that can be done through the connection string be done there. In particular the fields Host, Port, TLSConfig, and Fallbacks can be interdependent (e.g. TLSConfig needs knowledge of the host to validate the server certificate). These fields should not be modified individually. They should all be modified or all left unchanged.

ParseConfig supports specifying multiple hosts in similar manner to libpq. Host and port may include comma separated values that will be tried in order. This can be used as part of a high availability system. See https://www.postgresql.org/docs/11/libpq-connect.html#LIBPQ-MULTIPLE-HOSTS for more information.

# Example URL
postgres://jack:secret@foo.example.com:5432,bar.example.com:5432/mydb

ParseConfig currently recognizes the following environment variable and their parameter key word equivalents passed via database URL or DSN:

PGHOST
PGPORT
PGDATABASE
PGUSER
PGPASSWORD
PGPASSFILE
PGSERVICE
PGSERVICEFILE
PGSSLMODE
PGSSLCERT
PGSSLKEY
PGSSLROOTCERT
PGAPPNAME
PGCONNECT_TIMEOUT
PGTARGETSESSIONATTRS

See http://www.postgresql.org/docs/11/static/libpq-envars.html for details on the meaning of environment variables.

See https://www.postgresql.org/docs/11/libpq-connect.html#LIBPQ-PARAMKEYWORDS for parameter key word names. They are usually but not always the environment variable name downcased and without the "PG" prefix.

Important Security Notes:

ParseConfig tries to match libpq behavior with regard to PGSSLMODE. This includes defaulting to "prefer" behavior if not set.

See http://www.postgresql.org/docs/11/static/libpq-ssl.html#LIBPQ-SSL-PROTECTION for details on what level of security each sslmode provides.

The sslmode "prefer" (the default), sslmode "allow", and multiple hosts are implemented via the Fallbacks field of the Config struct. If TLSConfig is manually changed it will not affect the fallbacks. For example, in the case of sslmode "prefer" this means it will first try the main Config settings which use TLS, then it will try the fallback which does not use TLS. This can lead to an unexpected unencrypted connection if the main TLS config is manually changed later but the unencrypted fallback is present. Ensure there are no stale fallbacks when manually setting TLCConfig.

Other known differences with libpq:

If a host name resolves into multiple addresses, libpq will try all addresses. pgconn will only try the first.

When multiple hosts are specified, libpq allows them to have different passwords set via the .pgpass file. pgconn does not.

In addition, ParseConfig accepts the following options:

min_read_buffer_size
	The minimum size of the internal read buffer. Default 8192.
servicefile
	libpq only reads servicefile from the PGSERVICEFILE environment variable. ParseConfig accepts servicefile as a
	part of the connection string.

func (*Config) Copy Uses

func (c *Config) Copy() *Config

Copy returns a deep copy of the config that is safe to use and modify. The only exception is the TLSConfig field: according to the tls.Config docs it must not be modified after creation.

type DialFunc Uses

type DialFunc func(ctx context.Context, network, addr string) (net.Conn, error)

DialFunc is a function that can be used to connect to a PostgreSQL server.

type FallbackConfig Uses

type FallbackConfig struct {
    Host      string // host (e.g. localhost) or path to unix domain socket directory (e.g. /private/tmp)
    Port      uint16
    TLSConfig *tls.Config // nil disables TLS
}

FallbackConfig is additional settings to attempt a connection with when the primary Config fails to establish a network connection. It is used for TLS fallback such as sslmode=prefer and high availability (HA) connections.

type Frontend Uses

type Frontend interface {
    Receive() (pgproto3.BackendMessage, error)
}

Frontend used to receive messages from backend.

type HijackedConn Uses

type HijackedConn struct {
    Conn              net.Conn          // the underlying TCP or unix domain socket connection
    PID               uint32            // backend pid
    SecretKey         uint32            // key to use to send a cancel query message to the server
    ParameterStatuses map[string]string // parameters that have been reported by the server
    TxStatus          byte
    Frontend          Frontend
    Config            *Config
}

HijackedConn is the result of hijacking a connection.

Due to the necessary exposure of internal implementation details, it is not covered by the semantic versioning compatibility.

type LookupFunc Uses

type LookupFunc func(ctx context.Context, host string) (addrs []string, err error)

LookupFunc is a function that can be used to lookup IPs addrs from host.

type MultiResultReader Uses

type MultiResultReader struct {
    // contains filtered or unexported fields
}

MultiResultReader is a reader for a command that could return multiple results such as Exec or ExecBatch.

func (*MultiResultReader) Close Uses

func (mrr *MultiResultReader) Close() error

Close closes the MultiResultReader and returns the first error that occurred during the MultiResultReader's use.

func (*MultiResultReader) NextResult Uses

func (mrr *MultiResultReader) NextResult() bool

NextResult returns advances the MultiResultReader to the next result and returns true if a result is available.

func (*MultiResultReader) ReadAll Uses

func (mrr *MultiResultReader) ReadAll() ([]*Result, error)

ReadAll reads all available results. Calling ReadAll is mutually exclusive with all other MultiResultReader methods.

func (*MultiResultReader) ResultReader Uses

func (mrr *MultiResultReader) ResultReader() *ResultReader

ResultReader returns the current ResultReader.

type Notice Uses

type Notice PgError

Notice represents a notice response message reported by the PostgreSQL server. Be aware that this is distinct from LISTEN/NOTIFY notification.

type NoticeHandler Uses

type NoticeHandler func(*PgConn, *Notice)

NoticeHandler is a function that can handle notices received from the PostgreSQL server. Notices can be received at any time, usually during handling of a query response. The *PgConn is provided so the handler is aware of the origin of the notice, but it must not invoke any query method. Be aware that this is distinct from LISTEN/NOTIFY notification.

type Notification Uses

type Notification struct {
    PID     uint32 // backend pid that sent the notification
    Channel string // channel from which notification was received
    Payload string
}

Notification is a message received from the PostgreSQL LISTEN/NOTIFY system

type NotificationHandler Uses

type NotificationHandler func(*PgConn, *Notification)

NotificationHandler is a function that can handle notifications received from the PostgreSQL server. Notifications can be received at any time, usually during handling of a query response. The *PgConn is provided so the handler is aware of the origin of the notice, but it must not invoke any query method. Be aware that this is distinct from a notice event.

type PgConn Uses

type PgConn struct {
    // contains filtered or unexported fields
}

PgConn is a low-level PostgreSQL connection handle. It is not safe for concurrent usage.

func Connect Uses

func Connect(ctx context.Context, connString string) (*PgConn, error)

Connect establishes a connection to a PostgreSQL server using the environment and connString (in URL or DSN format) to provide configuration. See documention for ParseConfig for details. ctx can be used to cancel a connect attempt.

func ConnectConfig Uses

func ConnectConfig(ctx context.Context, config *Config) (pgConn *PgConn, err error)

Connect establishes a connection to a PostgreSQL server using config. config must have been constructed with ParseConfig. ctx can be used to cancel a connect attempt.

If config.Fallbacks are present they will sequentially be tried in case of error establishing network connection. An authentication error will terminate the chain of attempts (like libpq: https://www.postgresql.org/docs/11/libpq-connect.html#LIBPQ-MULTIPLE-HOSTS) and be returned as the error. Otherwise, if all attempts fail the last error is returned.

func Construct Uses

func Construct(hc *HijackedConn) (*PgConn, error)

Construct created a PgConn from an already established connection to a PostgreSQL server. This is the inverse of PgConn.Hijack. The connection must be in an idle state.

Due to the necessary exposure of internal implementation details, it is not covered by the semantic versioning compatibility.

func (*PgConn) CancelRequest Uses

func (pgConn *PgConn) CancelRequest(ctx context.Context) error

CancelRequest sends a cancel request to the PostgreSQL server. It returns an error if unable to deliver the cancel request, but lack of an error does not ensure that the query was canceled. As specified in the documentation, there is no way to be sure a query was canceled. See https://www.postgresql.org/docs/11/protocol-flow.html#id-1.10.5.7.9

func (*PgConn) CleanupDone Uses

func (pgConn *PgConn) CleanupDone() chan (struct{})

CleanupDone returns a channel that will be closed after all underlying resources have been cleaned up. A closed connection is no longer usable, but underlying resources, in particular the net.Conn, may not have finished closing yet. This is because certain errors such as a context cancellation require that the interrupted function call return immediately, but the error may also cause the connection to be closed. In these cases the underlying resources are closed asynchronously.

This is only likely to be useful to connection pools. It gives them a way avoid establishing a new connection while an old connection is still being cleaned up and thereby exceeding the maximum pool size.

func (*PgConn) Close Uses

func (pgConn *PgConn) Close(ctx context.Context) error

Close closes a connection. It is safe to call Close on a already closed connection. Close attempts a clean close by sending the exit message to PostgreSQL. However, this could block so ctx is available to limit the time to wait. The underlying net.Conn.Close() will always be called regardless of any other errors.

func (*PgConn) Conn Uses

func (pgConn *PgConn) Conn() net.Conn

Conn returns the underlying net.Conn.

func (*PgConn) CopyFrom Uses

func (pgConn *PgConn) CopyFrom(ctx context.Context, r io.Reader, sql string) (CommandTag, error)

CopyFrom executes the copy command sql and copies all of r to the PostgreSQL server.

Note: context cancellation will only interrupt operations on the underlying PostgreSQL network connection. Reads on r could still block.

func (*PgConn) CopyTo Uses

func (pgConn *PgConn) CopyTo(ctx context.Context, w io.Writer, sql string) (CommandTag, error)

CopyTo executes the copy command sql and copies the results to w.

func (*PgConn) EscapeString Uses

func (pgConn *PgConn) EscapeString(s string) (string, error)

EscapeString escapes a string such that it can safely be interpolated into a SQL command string. It does not include the surrounding single quotes.

The current implementation requires that standard_conforming_strings=on and client_encoding="UTF8". If these conditions are not met an error will be returned. It is possible these restrictions will be lifted in the future.

func (*PgConn) Exec Uses

func (pgConn *PgConn) Exec(ctx context.Context, sql string) *MultiResultReader

Exec executes SQL via the PostgreSQL simple query protocol. SQL may contain multiple queries. Execution is implicitly wrapped in a transaction unless a transaction is already in progress or SQL contains transaction control statements.

Prefer ExecParams unless executing arbitrary SQL that may contain multiple queries.

func (*PgConn) ExecBatch Uses

func (pgConn *PgConn) ExecBatch(ctx context.Context, batch *Batch) *MultiResultReader

ExecBatch executes all the queries in batch in a single round-trip. Execution is implicitly transactional unless a transaction is already in progress or SQL contains transaction control statements.

func (*PgConn) ExecParams Uses

func (pgConn *PgConn) ExecParams(ctx context.Context, sql string, paramValues [][]byte, paramOIDs []uint32, paramFormats []int16, resultFormats []int16) *ResultReader

ExecParams executes a command via the PostgreSQL extended query protocol.

sql is a SQL command string. It may only contain one query. Parameter substitution is positional using $1, $2, $3, etc.

paramValues are the parameter values. It must be encoded in the format given by paramFormats.

paramOIDs is a slice of data type OIDs for paramValues. If paramOIDs is nil, the server will infer the data type for all parameters. Any paramOID element that is 0 that will cause the server to infer the data type for that parameter. ExecParams will panic if len(paramOIDs) is not 0, 1, or len(paramValues).

paramFormats is a slice of format codes determining for each paramValue column whether it is encoded in text or binary format. If paramFormats is nil all params are text format. ExecParams will panic if len(paramFormats) is not 0, 1, or len(paramValues).

resultFormats is a slice of format codes determining for each result column whether it is encoded in text or binary format. If resultFormats is nil all results will be in text format.

ResultReader must be closed before PgConn can be used again.

func (*PgConn) ExecPrepared Uses

func (pgConn *PgConn) ExecPrepared(ctx context.Context, stmtName string, paramValues [][]byte, paramFormats []int16, resultFormats []int16) *ResultReader

ExecPrepared enqueues the execution of a prepared statement via the PostgreSQL extended query protocol.

paramValues are the parameter values. It must be encoded in the format given by paramFormats.

paramFormats is a slice of format codes determining for each paramValue column whether it is encoded in text or binary format. If paramFormats is nil all params are text format. ExecPrepared will panic if len(paramFormats) is not 0, 1, or len(paramValues).

resultFormats is a slice of format codes determining for each result column whether it is encoded in text or binary format. If resultFormats is nil all results will be in text format.

ResultReader must be closed before PgConn can be used again.

func (*PgConn) Hijack Uses

func (pgConn *PgConn) Hijack() (*HijackedConn, error)

Hijack extracts the internal connection data. pgConn must be in an idle state. pgConn is unusable after hijacking. Hijacking is typically only useful when using pgconn to establish a connection, but taking complete control of the raw connection after that (e.g. a load balancer or proxy).

Due to the necessary exposure of internal implementation details, it is not covered by the semantic versioning compatibility.

func (*PgConn) IsBusy Uses

func (pgConn *PgConn) IsBusy() bool

IsBusy reports if the connection is busy.

func (*PgConn) IsClosed Uses

func (pgConn *PgConn) IsClosed() bool

IsClosed reports if the connection has been closed.

CleanupDone() can be used to determine if all cleanup has been completed.

func (*PgConn) PID Uses

func (pgConn *PgConn) PID() uint32

PID returns the backend PID.

func (*PgConn) ParameterStatus Uses

func (pgConn *PgConn) ParameterStatus(key string) string

ParameterStatus returns the value of a parameter reported by the server (e.g. server_version). Returns an empty string for unknown parameters.

func (*PgConn) Prepare Uses

func (pgConn *PgConn) Prepare(ctx context.Context, name, sql string, paramOIDs []uint32) (*StatementDescription, error)

Prepare creates a prepared statement. If the name is empty, the anonymous prepared statement will be used. This allows Prepare to also to describe statements without creating a server-side prepared statement.

func (*PgConn) ReceiveMessage Uses

func (pgConn *PgConn) ReceiveMessage(ctx context.Context) (pgproto3.BackendMessage, error)

ReceiveMessage receives one wire protocol message from the PostgreSQL server. It must only be used when the connection is not busy. e.g. It is an error to call ReceiveMessage while reading the result of a query. The messages are still handled by the core pgconn message handling system so receiving a NotificationResponse will still trigger the OnNotification callback.

This is a very low level method that requires deep understanding of the PostgreSQL wire protocol to use correctly. See https://www.postgresql.org/docs/current/protocol.html.

func (*PgConn) ReceiveResults Uses

func (pgConn *PgConn) ReceiveResults(ctx context.Context) *MultiResultReader

ReceiveResults reads the result that might be returned by Postgres after a SendBytes (e.a. after sending a CopyDone in a copy-both situation).

This is a very low level method that requires deep understanding of the PostgreSQL wire protocol to use correctly. See https://www.postgresql.org/docs/current/protocol.html.

func (*PgConn) SecretKey Uses

func (pgConn *PgConn) SecretKey() uint32

SecretKey returns the backend secret key used to send a cancel query message to the server.

func (*PgConn) SendBytes Uses

func (pgConn *PgConn) SendBytes(ctx context.Context, buf []byte) error

SendBytes sends buf to the PostgreSQL server. It must only be used when the connection is not busy. e.g. It is as error to call SendBytes while reading the result of a query.

This is a very low level method that requires deep understanding of the PostgreSQL wire protocol to use correctly. See https://www.postgresql.org/docs/current/protocol.html.

func (*PgConn) TxStatus Uses

func (pgConn *PgConn) TxStatus() byte

TxStatus returns the current TxStatus as reported by the server.

func (*PgConn) WaitForNotification Uses

func (pgConn *PgConn) WaitForNotification(ctx context.Context) error

WaitForNotification waits for a LISTON/NOTIFY message to be received. It returns an error if a notification was not received.

type PgError Uses

type PgError struct {
    Severity         string
    Code             string
    Message          string
    Detail           string
    Hint             string
    Position         int32
    InternalPosition int32
    InternalQuery    string
    Where            string
    SchemaName       string
    TableName        string
    ColumnName       string
    DataTypeName     string
    ConstraintName   string
    File             string
    Line             int32
    Routine          string
}

PgError represents an error reported by the PostgreSQL server. See http://www.postgresql.org/docs/11/static/protocol-error-fields.html for detailed field description.

func ErrorResponseToPgError Uses

func ErrorResponseToPgError(msg *pgproto3.ErrorResponse) *PgError

ErrorResponseToPgError converts a wire protocol error message to a *PgError.

func (*PgError) Error Uses

func (pe *PgError) Error() string

func (*PgError) SQLState Uses

func (pe *PgError) SQLState() string

SQLState returns the SQLState of the error.

type Result Uses

type Result struct {
    FieldDescriptions []pgproto3.FieldDescription
    Rows              [][][]byte
    CommandTag        CommandTag
    Err               error
}

Result is the saved query response that is returned by calling Read on a ResultReader.

type ResultReader Uses

type ResultReader struct {
    // contains filtered or unexported fields
}

ResultReader is a reader for the result of a single query.

func (*ResultReader) Close Uses

func (rr *ResultReader) Close() (CommandTag, error)

Close consumes any remaining result data and returns the command tag or error.

func (*ResultReader) FieldDescriptions Uses

func (rr *ResultReader) FieldDescriptions() []pgproto3.FieldDescription

FieldDescriptions returns the field descriptions for the current result set. The returned slice is only valid until the ResultReader is closed.

func (*ResultReader) NextRow Uses

func (rr *ResultReader) NextRow() bool

NextRow advances the ResultReader to the next row and returns true if a row is available.

func (*ResultReader) Read Uses

func (rr *ResultReader) Read() *Result

Read saves the query response to a Result.

func (*ResultReader) Values Uses

func (rr *ResultReader) Values() [][]byte

Values returns the current row data. NextRow must have been previously been called. The returned [][]byte is only valid until the next NextRow call or the ResultReader is closed. However, the underlying byte data is safe to retain a reference to and mutate.

type StatementDescription Uses

type StatementDescription struct {
    Name      string
    SQL       string
    ParamOIDs []uint32
    Fields    []pgproto3.FieldDescription
}

type ValidateConnectFunc Uses

type ValidateConnectFunc func(ctx context.Context, pgconn *PgConn) error

Directories

PathSynopsis
internal/ctxwatch
stmtcachePackage stmtcache is a cache that can be used to implement lazy prepared statements.

Package pgconn imports 34 packages (graph) and is imported by 52 packages. Updated 2020-11-12. Refresh now. Tools for package owners.