pgreplay

package
v0.1.3
Published: May 17, 2019 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

const (
	LogConnectionAuthorized       = "LOG:  connection authorized: "
	LogConnectionReceived         = "LOG:  connection received: "
	LogConnectionDisconnect       = "LOG:  disconnection: "
	LogStatement                  = "LOG:  statement: "
	LogDuration                   = "LOG:  duration: "
	LogExtendedProtocolExecute    = "LOG:  execute <unnamed>: "
	LogExtendedProtocolParameters = "DETAIL:  parameters: "
	LogNamedPrepareExecute        = "LOG:  execute "
	LogDetail                     = "DETAIL:  "
	LogError                      = "ERROR:  "
)
const (
	ConnectLabel      = "Connect"
	StatementLabel    = "Statement"
	BoundExecuteLabel = "BoundExecute"
	DisconnectLabel   = "Disconnect"
)
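
To make the relationship between the two groups of constants concrete, here is a sketch of a hypothetical helper (not part of this package; the import path is assumed) that maps a raw errlog line onto one of the labels:

package main

import (
	"fmt"
	"strings"

	"github.com/gocardless/pgreplay-go/pkg/pgreplay" // import path assumed
)

// classify is a hypothetical helper showing how the LOG prefix constants
// relate to the label constants. Other prefixes (durations, DETAIL lines,
// errors) map to "" in this sketch.
func classify(line string) string {
	switch {
	case strings.Contains(line, pgreplay.LogConnectionAuthorized):
		return pgreplay.ConnectLabel
	case strings.Contains(line, pgreplay.LogConnectionDisconnect):
		return pgreplay.DisconnectLabel
	case strings.Contains(line, pgreplay.LogStatement):
		return pgreplay.StatementLabel
	case strings.Contains(line, pgreplay.LogExtendedProtocolExecute):
		return pgreplay.BoundExecuteLabel
	default:
		return ""
	}
}

func main() {
	fmt.Println(classify("2018-05-03 10:26:27.905 GMT|alice|payments|5aeb523f.1|LOG:  statement: select 1;"))
	// Output: Statement
}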

Variables

var (
	ConnectionsActive = prometheus.NewGauge(
		prometheus.GaugeOpts{
			Name: "pgreplay_connections_active",
			Help: "Number of connections currently open against Postgres",
		},
	)
	ConnectionsEstablishedTotal = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "pgreplay_connections_established_total",
			Help: "Number of connections established against Postgres",
		},
	)
	ItemsProcessedTotal = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "pgreplay_items_processed_total",
			Help: "Total count of replay items that have been sent to the database",
		},
	)
	ItemsMostRecentTimestamp = prometheus.NewGauge(
		prometheus.GaugeOpts{
			Name: "pgreplay_items_most_recent_timestamp",
			Help: "Most recent timestamp of processed items",
		},
	)
)
var (
	LogLinesParsedTotal = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "pgreplay_log_lines_parsed_total",
			Help: "Number of log lines parsed since boot",
		},
	)
	LogLinesErrorTotal = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "pgreplay_log_lines_error_total",
			Help: "Number of log lines that failed to parse",
		},
	)
)
var (
	ItemsFilteredTotal = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "pgreplay_items_filtered_total",
			Help: "Number of items filtered by start/finish range",
		},
	)
	ItemsFilterProgressFraction = prometheus.NewGauge(
		prometheus.GaugeOpts{
			Name: "pgreplay_items_filter_progress_fraction",
			Help: "Fractional progress through filter range, assuming linear distribution",
		},
	)
	ItemsLastStreamedTimestamp = prometheus.NewGauge(
		prometheus.GaugeOpts{
			Name: "pgreplay_items_last_streamed_timestamp",
			Help: "Timestamp of last streamed item",
		},
	)
)
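
These collectors are plain package-level variables; the sketch below wires them into Prometheus's default registry with prometheus.MustRegister (imports elided, and it assumes the package does not already register them itself):

// Assumption: the package leaves metric registration to the caller.
prometheus.MustRegister(
	pgreplay.ConnectionsActive,
	pgreplay.ConnectionsEstablishedTotal,
	pgreplay.ItemsProcessedTotal,
	pgreplay.ItemsMostRecentTimestamp,
	pgreplay.LogLinesParsedTotal,
	pgreplay.LogLinesErrorTotal,
	pgreplay.ItemsFilteredTotal,
	pgreplay.ItemsFilterProgressFraction,
	pgreplay.ItemsLastStreamedTimestamp,
)
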
var InitialScannerBufferSize = 10 * 10
var ItemBufferSize = 100

ItemBufferSize defines the size of the channel buffer when parsing Items. Allowing the channel to buffer significantly improves parsing throughput.

var MaxLogLineSize = 10 * 1024 * 1024

MaxLogLineSize denotes the maximum size, in bytes, that we can scan in a single log line. It is possible to pass very large arrays of parameters to Postgres queries, which is why this limit has to be so large.

var PostgresTimestampFormat = "2006-01-02 15:04:05.000 MST"

PostgresTimestampFormat is the Go reference time layout in which we expect to find timestamps in our errlog

var StreamFilterBufferSize = 100

StreamFilterBufferSize is the size of the channel buffer when filtering items for a time range
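
Because these knobs are plain package variables, they can be adjusted before parsing begins. A minimal sketch (values are illustrative only, not recommendations):

// Tune the parser before calling ParseErrlog or ParseJSON.
pgreplay.MaxLogLineSize = 32 * 1024 * 1024 // tolerate even larger parameter lists
pgreplay.ItemBufferSize = 1000             // buffer more parsed items
pgreplay.StreamFilterBufferSize = 1000     // buffer more items during time-range filtering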

Functions

func ItemMarshalJSON

func ItemMarshalJSON(item Item) ([]byte, error)

func NewLogScanner

func NewLogScanner(input io.Reader, buffer []byte) *bufio.Scanner

NewLogScanner constructs a scanner that will produce a single token per errlog line from Postgres logs. Postgres errlog format looks like this:

2018-05-03|gc|LOG: duration: 0.096 ms parse <unnamed>:

DELETE FROM que_jobs
WHERE queue    = $1::text

...where a log line can spill over multiple lines, with trailing lines marked with a preceding \t.
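
A minimal sketch of walking an errlog with the scanner (imports elided; the file name is illustrative, and the initial buffer simply mirrors InitialScannerBufferSize above):

file, err := os.Open("postgresql.log") // illustrative path
if err != nil {
	log.Fatal(err)
}
defer file.Close()

scanner := pgreplay.NewLogScanner(file, make([]byte, pgreplay.InitialScannerBufferSize))
for scanner.Scan() {
	fmt.Println(scanner.Text()) // one token per logical errlog line, continuations folded in
}
if err := scanner.Err(); err != nil {
	log.Fatal(err)
}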

func ParseBindParameters

func ParseBindParameters(input string, buffer []byte) ([]interface{}, error)

ParseBindParameters constructs an interface slice from the suffix of a Postgres errlog DETAIL parameters line. An example input to this function would be:

$1 = '', $2 = '30', $3 = '2018-05-03 10:26:27.905086+00'

...and this would be parsed into []interface{}{"", "30", "2018-05-03 10:26:27.905086+00"}
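
For example (a sketch with imports elided; passing nil for the scratch buffer is an assumption):

params, err := pgreplay.ParseBindParameters(
	"$1 = '', $2 = '30', $3 = '2018-05-03 10:26:27.905086+00'",
	nil, // scratch buffer; nil is assumed to be acceptable here
)
if err != nil {
	log.Fatal(err)
}
fmt.Printf("%q\n", params) // ["" "30" "2018-05-03 10:26:27.905086+00"]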

func ParseErrlog

func ParseErrlog(errlog io.Reader) (items chan Item, errs chan error, done chan error)

ParseErrlog generates a stream of Items from the given PostgreSQL errlog. Log line parsing errors are returned down the errs channel, and we signal having finished our parsing by sending a value down the done channel.
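
A caller should drain all three channels. A minimal sketch, where errlogFile is an already-open io.Reader and the items channel is assumed to be closed once parsing completes:

items, errs, done := pgreplay.ParseErrlog(errlogFile)

// Collect parse failures as diagnostics without stopping the parse.
go func() {
	for err := range errs {
		log.Printf("parse error: %v", err)
	}
}()

// Consume items as they arrive; here we simply count them.
count := 0
for range items {
	count++
}

<-done // parsing has finished
fmt.Printf("parsed %d items\n", count)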

func ParseJSON

func ParseJSON(jsonlog io.Reader) (items chan Item, errs chan error, done chan error)

ParseJSON operates on a file of JSON serialized Item elements, and pushes the parsed items down the returned channel.

Types

type BoundExecute

type BoundExecute struct {
	Execute
	Parameters []interface{} `json:"parameters"`
}

BoundExecute represents an Execute that is now successfully bound with parameters

func (BoundExecute) Handle

func (e BoundExecute) Handle(conn *pgx.Conn) error

type Conn

type Conn struct {
	*pgx.Conn
	channels.Channel
	sync.Once
}

Conn represents a single database connection handling a stream of work Items

func (*Conn) Close

func (c *Conn) Close()

func (*Conn) Start

func (c *Conn) Start() error

Start begins to process the items that are placed into the Conn's channel. We'll finish once the connection has died or we run out of items to process.

type Connect

type Connect struct{ Details }

func (Connect) Handle

func (_ Connect) Handle(_ *pgx.Conn) error

type Database

type Database struct {
	pgx.ConnConfig
	*pgtype.ConnInfo
	// contains filtered or unexported fields
}

func NewDatabase

func NewDatabase(cfg pgx.ConnConfig) (*Database, error)

func (*Database) Connect

func (d *Database) Connect(item Item) (*Conn, error)

Connect establishes a new connection to the database, reusing the ConnInfo that was generated when the Database was constructed. The internal wait group is incremented whenever we establish a new connection and decremented when we disconnect.

func (*Database) Consume

func (d *Database) Consume(items chan Item) (chan error, chan error)

Consume iterates through all the items in the given channel and attempts to process them against the item's session connection. Consume returns two error channels, the first for per-item errors that should be used for diagnostics only, and the second to indicate unrecoverable failures.

Once all items have finished processing, both channels will be closed.
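
End to end, parsing and consuming fit together roughly as below. This is a sketch rather than the package's canonical usage: the import paths are assumed, the connection settings and file path are illustrative, and it presumes the pgx v3-style ConnConfig matching the signatures above.

package main

import (
	"log"
	"os"

	"github.com/gocardless/pgreplay-go/pkg/pgreplay" // import path assumed
	"github.com/jackc/pgx"                           // pgx v3, matching the ConnConfig above
)

func main() {
	errlogFile, err := os.Open("postgresql.log") // illustrative path
	if err != nil {
		log.Fatal(err)
	}
	defer errlogFile.Close()

	// Connection settings are illustrative.
	database, err := pgreplay.NewDatabase(pgx.ConnConfig{
		Host:     "127.0.0.1",
		Database: "replay_target",
		User:     "postgres",
	})
	if err != nil {
		log.Fatal(err)
	}

	items, parseErrs, parseDone := pgreplay.ParseErrlog(errlogFile)

	// Drain parse errors and the parser's completion signal so they never
	// block the replay itself.
	go func() {
		for err := range parseErrs {
			log.Printf("parse error: %v", err)
		}
		if err := <-parseDone; err != nil {
			log.Printf("parsing finished with error: %v", err)
		}
	}()

	// Per-item errors are diagnostics only; fatal errors end the replay.
	itemErrs, fatalErrs := database.Consume(items)
	go func() {
		for err := range itemErrs {
			log.Printf("item error: %v", err)
		}
	}()
	for err := range fatalErrs {
		log.Fatal(err)
	}
}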

type Details

type Details struct {
	Timestamp time.Time `json:"timestamp"`
	SessionID SessionID `json:"session_id"`
	User      string    `json:"user"`
	Database  string    `json:"database"`
}

func (Details) GetDatabase

func (e Details) GetDatabase() string

func (Details) GetSessionID

func (e Details) GetSessionID() SessionID

func (Details) GetTimestamp

func (e Details) GetTimestamp() time.Time

func (Details) GetUser

func (e Details) GetUser() string

type Disconnect

type Disconnect struct{ Details }

func (Disconnect) Handle

func (_ Disconnect) Handle(conn *pgx.Conn) error

type Execute

type Execute struct {
	Details
	Query string `json:"query"`
}

Execute is parsed and awaiting arguments. It deliberately lacks a Handle method, as it shouldn't be possible for this statement to have been parsed without a following duration or detail line that bound it.

func (Execute) Bind

func (e Execute) Bind(parameters []interface{}) BoundExecute

type Item

type Item interface {
	GetTimestamp() time.Time
	GetSessionID() SessionID
	GetUser() string
	GetDatabase() string
	Handle(*pgx.Conn) error
}
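
Each of the concrete event types in this package satisfies Item. An illustrative compile-time check (not part of the package):

// Execute is deliberately excluded: it has no Handle method until it is
// bound into a BoundExecute.
var (
	_ pgreplay.Item = pgreplay.Connect{}
	_ pgreplay.Item = pgreplay.Statement{}
	_ pgreplay.Item = pgreplay.BoundExecute{}
	_ pgreplay.Item = pgreplay.Disconnect{}
)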

func ItemUnmarshalJSON

func ItemUnmarshalJSON(payload []byte) (Item, error)

func ParseItem

func ParseItem(logline string, unbounds map[SessionID]*Execute, buffer []byte) (Item, error)

ParseItem constructs an Item from Postgres errlogs. The format we accept is log_line_prefix='%m|%u|%d|%c|', so we can split by | to discover each component.

The unbounds map allows retrieval of an Execute that was previously parsed for a session, as we expect following log lines to complete the Execute with the parameters it should use.
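
A sketch of parsing one such line (imports elided; the session ID, timestamp, and nil scratch buffer are illustrative assumptions):

// unbounds carries Executes still waiting for their parameters, keyed by
// session, so later DETAIL lines can complete them.
unbounds := map[pgreplay.SessionID]*pgreplay.Execute{}

item, err := pgreplay.ParseItem(
	"2018-05-03 10:26:27.905 GMT|alice|payments|5aeb523f.1|LOG:  statement: select 1;",
	unbounds,
	nil, // scratch buffer; nil is assumed to be acceptable here
)
if err != nil {
	log.Fatal(err)
}
fmt.Println(item.GetUser(), item.GetDatabase()) // alice payments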

type ParserFunc

type ParserFunc func(io.Reader) (items chan Item, errs chan error, done chan error)

ParserFunc is the standard interface to provide items from a parsing source
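
Both ParseErrlog and ParseJSON match this signature, so the input format can be chosen at runtime. A small sketch (the map keys are illustrative):

var parsers = map[string]pgreplay.ParserFunc{
	"errlog": pgreplay.ParseErrlog,
	"json":   pgreplay.ParseJSON,
}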

type ReplayType

type ReplayType int

type SessionID

type SessionID string

type Statement

type Statement struct {
	Details
	Query string `json:"query"`
}

func (Statement) Handle

func (s Statement) Handle(conn *pgx.Conn) error

type Streamer

type Streamer struct {
	// contains filtered or unexported fields
}

func NewStreamer

func NewStreamer(start, finish *time.Time) Streamer

func (Streamer) Filter

func (s Streamer) Filter(items chan Item) chan Item

Filter takes an Item stream and filters out all items that don't match the desired time range, along with any items that are nil. Filtering of items before our start happens synchronously on first call, which will block initially until matching items are found.

This function assumes that items are pushed down the channel in chronological order.

func (Streamer) Stream

func (s Streamer) Stream(items chan Item, rate float64) (chan Item, error)

Stream takes all the items from the given items channel and returns a channel that will receive those items, simulating their original timing at the given rate.
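
A sketch tying the Streamer into the replay pipeline, reusing the errlogFile and database values from the Consume sketch above (the time range and rate are illustrative; a rate of 1.0 is assumed to preserve the original pacing):

start := time.Date(2019, 5, 1, 0, 0, 0, 0, time.UTC)
finish := time.Date(2019, 5, 2, 0, 0, 0, 0, time.UTC)
streamer := pgreplay.NewStreamer(&start, &finish)

items, parseErrs, _ := pgreplay.ParseErrlog(errlogFile)
go func() {
	for err := range parseErrs {
		log.Printf("parse error: %v", err)
	}
}()

// Drop items outside the start/finish range, then replay the remainder.
stream, err := streamer.Stream(streamer.Filter(items), 1.0)
if err != nil {
	log.Fatal(err)
}

itemErrs, fatalErrs := database.Consume(stream)
go func() {
	for err := range itemErrs {
		log.Printf("item error: %v", err)
	}
}()
for err := range fatalErrs {
	log.Fatal(err)
}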
