Documentation ¶
Index ¶
- Constants
- Variables
- func ItemMarshalJSON(item Item) ([]byte, error)
- func NewLogScanner(input io.Reader, buffer []byte) *bufio.Scanner
- func ParseBindParameters(input string, buffer []byte) ([]interface{}, error)
- func ParseErrlog(errlog io.Reader) (items chan Item, errs chan error, done chan error)
- func ParseJSON(jsonlog io.Reader) (items chan Item, errs chan error, done chan error)
- type BoundExecute
- type Conn
- type Connect
- type Database
- type Details
- type Disconnect
- type Execute
- type Item
- type ParserFunc
- type ReplayType
- type SessionID
- type Statement
- type Streamer
Constants ¶
const ( LogConnectionAuthorized = "LOG: connection authorized: " LogConnectionReceived = "LOG: connection received: " LogConnectionDisconnect = "LOG: disconnection: " LogStatement = "LOG: statement: " LogDuration = "LOG: duration: " LogExtendedProtocolExecute = "LOG: execute <unnamed>: " LogExtendedProtocolParameters = "DETAIL: parameters: " LogNamedPrepareExecute = "LOG: execute " LogDetail = "DETAIL: " LogError = "ERROR: " )
const ( ConnectLabel = "Connect" StatementLabel = "Statement" BoundExecuteLabel = "BoundExecute" DisconnectLabel = "Disconnect" )
Variables ¶
var ( ConnectionsActive = prometheus.NewGauge( prometheus.GaugeOpts{ Name: "pgreplay_connections_active", Help: "Number of connections currently open against Postgres", }, ) ConnectionsEstablishedTotal = prometheus.NewCounter( prometheus.CounterOpts{ Name: "pgreplay_connections_established_total", Help: "Number of connections established against Postgres", }, ) ItemsProcessedTotal = prometheus.NewCounter( prometheus.CounterOpts{ Name: "pgreplay_items_processed_total", Help: "Total count of replay items that have been sent to the database", }, ) ItemsMostRecentTimestamp = prometheus.NewGauge( prometheus.GaugeOpts{ Name: "pgreplay_items_most_recent_timestamp", Help: "Most recent timestamp of processed items", }, ) )
var ( LogLinesParsedTotal = prometheus.NewCounter( prometheus.CounterOpts{ Name: "pgreplay_log_lines_parsed_total", Help: "Number of log lines parsed since boot", }, ) LogLinesErrorTotal = prometheus.NewCounter( prometheus.CounterOpts{ Name: "pgreplay_log_lines_error_total", Help: "Number of log lines that failed to parse", }, ) )
var ( ItemsFilteredTotal = prometheus.NewCounter( prometheus.CounterOpts{ Name: "pgreplay_items_filtered_total", Help: "Number of items filtered by start/finish range", }, ) ItemsFilterProgressFraction = prometheus.NewGauge( prometheus.GaugeOpts{ Name: "pgreplay_items_filter_progress_fraction", Help: "Fractional progress through filter range, assuming linear distribution", }, ) ItemsLastStreamedTimestamp = prometheus.NewGauge( prometheus.GaugeOpts{ Name: "pgreplay_items_last_streamed_timestamp", Help: "Timestamp of last streamed item", }, ) )
var InitialScannerBufferSize = 10 * 10
var ItemBufferSize = 100
ItemBufferSize defines the size of the channel buffer when parsing Items. Allowing the channel to buffer makes a significant throughput improvement to the parsing.
var MaxLogLineSize = 10 * 1024 * 1024
MaxLogLineSize denotes the maximum size, in bytes, that we can scan in a single log line. It is possible to pass really large arrays of parameters to Postgres queries which is why this has to be so large.
var PostgresTimestampFormat = "2006-01-02 15:04:05.000 MST"
PostgresTimestampFormat is the Go template format that we expect to find our errlog
var StreamFilterBufferSize = 100
StreamFilterBufferSize is the size of the channel buffer when filtering items for a time range
Functions ¶
func ItemMarshalJSON ¶
func NewLogScanner ¶
NewLogScanner constructs a scanner that will produce a single token per errlog line from Postgres logs. Postgres errlog format looks like this:
2018-05-03|gc|LOG: duration: 0.096 ms parse <unnamed>:
DELETE FROM que_jobs WHERE queue = $1::text
...where a log line can spill over multiple lines, with trailing lines marked with a preceding \t.
func ParseBindParameters ¶
ParseBindParameters constructs an interface slice from the suffix of a DETAIL parameter Postgres errlog. An example input to this function would be:
$1 = ”, $2 = '30', $3 = '2018-05-03 10:26:27.905086+00'
...and this would be parsed into []interface{"", "30", "2018-05-03 10:26:27.905086+00"}
func ParseErrlog ¶
ParseErrlog generates a stream of Items from the given PostgreSQL errlog. Log line parsing errors are returned down the errs channel, and we signal having finished our parsing by sending a value down the done channel.
Types ¶
type BoundExecute ¶
type BoundExecute struct { Execute Parameters []interface{} `json:"parameters"` }
BoundExecute represents an Execute that is now successfully bound with parameters
type Database ¶
type Database struct { pgx.ConnConfig *pgtype.ConnInfo // contains filtered or unexported fields }
func NewDatabase ¶
func NewDatabase(cfg pgx.ConnConfig) (*Database, error)
func (*Database) Connect ¶
Connect establishes a new connection to the database, reusing the ConnInfo that was generated when the Database was constructed. The wg is incremented whenever we establish a new connection and decremented when we disconnect.
func (*Database) Consume ¶
Consume iterates through all the items in the given channel and attempts to process them against the item's session connection. Consume returns two error channels, the first for per item errors that should be used for diagnostics only, and the second to indicate unrecoverable failures.
Once all items have finished processing, both channels will be closed.
type Details ¶
type Details struct { Timestamp time.Time `json:"timestamp"` SessionID SessionID `json:"session_id"` User string `json:"user"` Database string `json:"database"` }
func (Details) GetDatabase ¶
func (Details) GetSessionID ¶
func (Details) GetTimestamp ¶
type Disconnect ¶
type Disconnect struct{ Details }
type Execute ¶
Execute is parsed and awaiting arguments. It deliberately lacks a Handle method as it shouldn't be possible this statement to have been parsed without a following duration or detail line that bound it.
func (Execute) Bind ¶
func (e Execute) Bind(parameters []interface{}) BoundExecute
type Item ¶
type Item interface { GetTimestamp() time.Time GetSessionID() SessionID GetUser() string GetDatabase() string Handle(*pgx.Conn) error }
func ItemUnmarshalJSON ¶
func ParseItem ¶
ParseItem constructs a Item from Postgres errlogs. The format we accept is log_line_prefix='%m|%u|%d|%c|', so we can split by | to discover each component.
The unbounds map allows retrieval of an Execute that was previously parsed for a session, as we expect following log lines to complete the Execute with the parameters it should use.
type ParserFunc ¶
ParserFunc is the standard interface to provide items from a parsing source
type ReplayType ¶
type ReplayType int
type Streamer ¶
type Streamer struct {
// contains filtered or unexported fields
}
func NewStreamer ¶
func (Streamer) Filter ¶
Filter takes a Item stream and filters all items that don't match the desired time range, along with any items that are nil. Filtering of items before our start happens synchronously on first call, which will block initially until matching items are found.
This function assumes that items are pushed down the channel in chronological order.