pool

package
v0.1.0
Published: Jun 1, 2023 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

const (
	// https://www.cockroachlabs.com/docs/stable/common-errors.html#restart-transaction
	CrdbRetryErrCode = "40001"
	// https://www.cockroachlabs.com/docs/stable/common-errors.html#result-is-ambiguous
	CrdbAmbiguousErrorCode = "40003"
	// https://www.cockroachlabs.com/docs/stable/node-shutdown.html#connection-retry-loop
	CrdbServerNotAcceptingClients = "57P01"
	// Error when SqlState is unknown
	CrdbUnknownSQLState = "XXUUU"
	// Error message encountered when crdb nodes have large clock skew
	CrdbClockSkewMessage = "cannot specify timestamp in the future"
)
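The constants above are only useful once a caller maps them to an action. The following is a minimal sketch of one such mapping. The constants are local copies of the documented values, while `action`, its three values, and `classify` are hypothetical names introduced for illustration. Which codes the package actually treats as retryable or resettable is an assumption here, not something this page confirms.

```go
package main

import (
	"fmt"
	"strings"
)

// Local copies of the documented constants so the sketch compiles on its own.
const (
	CrdbRetryErrCode              = "40001"
	CrdbAmbiguousErrorCode        = "40003"
	CrdbServerNotAcceptingClients = "57P01"
	CrdbUnknownSQLState           = "XXUUU"
	CrdbClockSkewMessage          = "cannot specify timestamp in the future"
)

// action is a hypothetical label for what a caller might do next.
type action string

const (
	actionRetry action = "retry on the same connection"
	actionReset action = "retry on a new connection"
	actionFail  action = "return the error"
)

// classify is an assumed mapping from SQLSTATE and message to an action.
// It is not the package's actual classification logic.
func classify(sqlState, message string) action {
	switch sqlState {
	case CrdbRetryErrCode:
		return actionRetry
	case CrdbAmbiguousErrorCode, CrdbServerNotAcceptingClients:
		return actionReset
	case CrdbUnknownSQLState:
		// With no usable SQLSTATE, fall back to matching the message text.
		if strings.Contains(message, CrdbClockSkewMessage) {
			return actionReset
		}
	}
	return actionFail
}

func main() {
	fmt.Println(classify("40001", ""))
	fmt.Println(classify("XXUUU", "cannot specify timestamp in the future"))
	fmt.Println(classify("23505", "duplicate key"))
}
```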

Variables

var CtxDisableRetries ctxDisableRetries

Functions

This section is empty.

Types

type MaxRetryError

type MaxRetryError struct {
	MaxRetries uint8
	LastErr    error
}

MaxRetryError is returned when the retry budget is exhausted.

func (*MaxRetryError) Error

func (e *MaxRetryError) Error() string

func (*MaxRetryError) Unwrap

func (e *MaxRetryError) Unwrap() error

type NodeConnectionBalancer

type NodeConnectionBalancer struct {
	// contains filtered or unexported fields
}

NodeConnectionBalancer attempts to keep the connections managed by a RetryPool balanced between healthy nodes in a Cockroach cluster. It asynchronously processes idle connections and kills connections to nodes that have too many. When the pool reconnects, it will have a different balance of connections, and over time the balancer brings the per-node counts close to equal.

func NewNodeConnectionBalancer

func NewNodeConnectionBalancer(pool *RetryPool, healthTracker *NodeHealthTracker, interval time.Duration) *NodeConnectionBalancer

NewNodeConnectionBalancer builds a new NodeConnectionBalancer for a given connection pool and health tracker.

func (*NodeConnectionBalancer) Prune

func (p *NodeConnectionBalancer) Prune(ctx context.Context)

Prune starts periodically checking idle connections and killing ones that are determined to be unbalanced.
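The balancing idea described for NodeConnectionBalancer can be sketched as a pure function over per-node connection counts. This is a simplified illustration, not the package's algorithm: `excessPerNode`, the even-share rule with ceiling division, and the input shapes are all assumptions.

```go
package main

import "fmt"

// excessPerNode returns, for each healthy node, how many connections it holds
// beyond an even share. The even-share rule is an assumption made for this
// sketch; the real balancer may weigh nodes differently.
func excessPerNode(connsByNode map[uint32]int, healthy []uint32) map[uint32]int {
	excess := make(map[uint32]int)
	if len(healthy) == 0 {
		return excess
	}

	// Count only connections that point at healthy nodes.
	total := 0
	for _, id := range healthy {
		total += connsByNode[id]
	}

	// Ceiling division: rounding up avoids pruning a node that is only
	// above the average because the total does not divide evenly.
	share := (total + len(healthy) - 1) / len(healthy)

	for _, id := range healthy {
		if over := connsByNode[id] - share; over > 0 {
			excess[id] = over
		}
	}
	return excess
}

func main() {
	counts := map[uint32]int{1: 10, 2: 2, 3: 0}
	fmt.Println(excessPerNode(counts, []uint32{1, 2, 3}))
}
```

In this picture, a pruning pass would close up to the reported number of idle connections on each over-subscribed node and let the pool reconnect elsewhere.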

type NodeHealthTracker

type NodeHealthTracker struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

NodeHealthTracker detects changes in the node pool by polling the cluster periodically and recording the node IDs that are seen. This lets it detect nodes that come online, whether they were previously marked unhealthy due to connection errors or are new because of a scale-up.

Consumers can manually mark a node healthy or unhealthy as well.

func NewNodeHealthChecker

func NewNodeHealthChecker(url string) (*NodeHealthTracker, error)

NewNodeHealthChecker builds a health checker that polls the cluster at the given url.

func (*NodeHealthTracker) HealthyNodeCount

func (t *NodeHealthTracker) HealthyNodeCount() int

HealthyNodeCount returns the number of healthy nodes currently tracked.

func (*NodeHealthTracker) IsHealthy

func (t *NodeHealthTracker) IsHealthy(nodeID uint32) bool

IsHealthy returns true if the given nodeID has been marked healthy.

func (*NodeHealthTracker) Poll

func (t *NodeHealthTracker) Poll(ctx context.Context, interval time.Duration)

Poll starts polling the cluster and recording the node IDs that it sees.

func (*NodeHealthTracker) SetNodeHealth

func (t *NodeHealthTracker) SetNodeHealth(nodeID uint32, healthy bool)

SetNodeHealth marks a node as either healthy or unhealthy.
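The three health accessors above can be pictured as operations on a mutex-guarded set of node IDs. The sketch below is a stand-in, not the package's implementation: `healthTracker`, its `healthy` field, and `newHealthTracker` are hypothetical, and only the method names and signatures follow the documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// healthTracker is a hypothetical stand-in for NodeHealthTracker. The
// embedded RWMutex matches the documented struct; the map field is assumed.
type healthTracker struct {
	sync.RWMutex
	healthy map[uint32]struct{}
}

func newHealthTracker() *healthTracker {
	return &healthTracker{healthy: make(map[uint32]struct{})}
}

// SetNodeHealth adds the node to, or removes it from, the healthy set.
func (t *healthTracker) SetNodeHealth(nodeID uint32, healthy bool) {
	t.Lock()
	defer t.Unlock()
	if healthy {
		t.healthy[nodeID] = struct{}{}
		return
	}
	delete(t.healthy, nodeID)
}

// IsHealthy reports whether the node is currently in the healthy set.
func (t *healthTracker) IsHealthy(nodeID uint32) bool {
	t.RLock()
	defer t.RUnlock()
	_, ok := t.healthy[nodeID]
	return ok
}

// HealthyNodeCount returns the size of the healthy set.
func (t *healthTracker) HealthyNodeCount() int {
	t.RLock()
	defer t.RUnlock()
	return len(t.healthy)
}

func main() {
	t := newHealthTracker()
	t.SetNodeHealth(1, true)
	t.SetNodeHealth(2, true)
	t.SetNodeHealth(2, false) // e.g. after a connection error to node 2
	fmt.Println(t.HealthyNodeCount(), t.IsHealthy(1), t.IsHealthy(2))
}
```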

type ResettableError

type ResettableError struct {
	Err error
}

ResettableError is an error that we think may succeed if retried against a new connection.

func (*ResettableError) Error

func (e *ResettableError) Error() string

func (*ResettableError) Unwrap

func (e *ResettableError) Unwrap() error

type RetryPool

type RetryPool struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewRetryPool

func NewRetryPool(ctx context.Context, name string, config *pgxpool.Config, healthTracker *NodeHealthTracker, maxRetries uint8, connectRate time.Duration) (*RetryPool, error)

func (*RetryPool) AcquireAllIdle

func (p *RetryPool) AcquireAllIdle(ctx context.Context) []*pgxpool.Conn

AcquireAllIdle returns all idle connections from the underlying pgxpool.Pool.

func (*RetryPool) BeginFunc

func (p *RetryPool) BeginFunc(ctx context.Context, txFunc func(pgx.Tx) error) error

BeginFunc is a replacement for pgxpool.BeginFunc that allows resetting the connection on error, or retrying on a retryable error.

func (*RetryPool) BeginTxFunc

func (p *RetryPool) BeginTxFunc(ctx context.Context, txOptions pgx.TxOptions, txFunc func(pgx.Tx) error) error

BeginTxFunc is a replacement for pgxpool.BeginTxFunc that allows resetting the connection on error, or retrying on a retryable error.

func (*RetryPool) Close

func (p *RetryPool) Close()

Close closes the underlying pgxpool.Pool.

func (*RetryPool) Config

func (p *RetryPool) Config() *pgxpool.Config

Config returns a copy of the underlying pgxpool.Pool config.

func (*RetryPool) ExecFunc

func (p *RetryPool) ExecFunc(ctx context.Context, tagFunc func(ctx context.Context, tag pgconn.CommandTag, err error) error, sql string, arguments ...any) error

ExecFunc is a replacement for pgxpool.Pool.Exec that allows resetting the connection on error, or retrying on a retryable error.

func (*RetryPool) GC

func (p *RetryPool) GC(conn *pgx.Conn)

GC marks a connection for destruction on the next Acquire. BeforeAcquire can signal to the pool to close the connection and clean up the reference in the pool at the same time, so we lazily GC connections instead of closing the connection directly.
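The lazy approach described for GC can be sketched with a small set of marked connections that a before-acquire hook consults. In pgxpool, a BeforeAcquire hook that returns false tells the pool to destroy the connection and acquire a different one; the sketch mimics that contract with stand-in types. `conn`, `lazyGC`, `newLazyGC`, and `beforeAcquire` are hypothetical names, not the package's internals.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a stand-in for *pgx.Conn so the sketch needs no database driver.
type conn struct{ id int }

// lazyGC is a hypothetical structure that remembers connections marked for
// destruction until the pool next tries to hand them out.
type lazyGC struct {
	mu     sync.Mutex
	marked map[*conn]struct{}
}

func newLazyGC() *lazyGC {
	return &lazyGC{marked: make(map[*conn]struct{})}
}

// GC marks a connection; nothing is closed at this point.
func (g *lazyGC) GC(c *conn) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.marked[c] = struct{}{}
}

// beforeAcquire follows the BeforeAcquire contract: returning false asks the
// pool to destroy the connection instead of handing it to the caller.
func (g *lazyGC) beforeAcquire(c *conn) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	if _, ok := g.marked[c]; ok {
		delete(g.marked, c) // the pool drops its reference, so we drop ours
		return false
	}
	return true
}

func main() {
	g := newLazyGC()
	c1, c2 := &conn{id: 1}, &conn{id: 2}
	g.GC(c1)
	fmt.Println(g.beforeAcquire(c1), g.beforeAcquire(c2))
}
```

Deferring the close to the acquire hook lets the pool remove its own bookkeeping for the connection in the same step.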

func (*RetryPool) ID

func (p *RetryPool) ID() string

ID returns a string identifier for this pool for use in metrics and logs.

func (*RetryPool) MaxConns

func (p *RetryPool) MaxConns() uint32

MaxConns returns the MaxConns configured on the underlying pool.

func (*RetryPool) MinConns

func (p *RetryPool) MinConns() uint32

MinConns returns the MinConns configured on the underlying pool.

func (*RetryPool) Node

func (p *RetryPool) Node(conn *pgx.Conn) uint32

Node returns the node ID for a connection.

func (*RetryPool) QueryFunc

func (p *RetryPool) QueryFunc(ctx context.Context, rowsFunc func(ctx context.Context, rows pgx.Rows) error, sql string, optionsAndArgs ...any) error

QueryFunc is a replacement for pgxpool.Pool.Query that allows resetting the connection on error, or retrying on a retryable error.

func (*RetryPool) QueryRowFunc

func (p *RetryPool) QueryRowFunc(ctx context.Context, rowFunc func(ctx context.Context, row pgx.Row) error, sql string, optionsAndArgs ...any) error

QueryRowFunc is a replacement for pgxpool.Pool.QueryRow that allows resetting the connection on error, or retrying on a retryable error.

func (*RetryPool) Range

func (p *RetryPool) Range(f func(conn *pgx.Conn, nodeID uint32))

Range applies a function to every entry in the connection list.

func (*RetryPool) Stat

func (p *RetryPool) Stat() *pgxpool.Stat

Stat returns the underlying pgxpool.Pool stats.

type RetryableError

type RetryableError struct {
	Err error
}

RetryableError is an error that can be retried against the existing connection.

func (*RetryableError) Error

func (e *RetryableError) Error() string

func (*RetryableError) Unwrap

func (e *RetryableError) Unwrap() error
