keryxlib

package module
v0.0.0-...-c42b739 Latest
Published: Oct 2, 2018 License: BSD-3-Clause Imports: 8 Imported by: 1

README

keryxlib - parse postgres WAL logs as json events

Deprecated - this library is no longer actively maintained. Move to logical decoding in postgres 9.4 or higher.

Keryxlib is a system for parsing postgres WAL logs and turning them into json events. The basic algorithm for this is:

  • Parse the WAL log and create a wal.Entry record for each entry. This record holds the basic WAL data.
  • Entry records are buffered until either a commit or a rollback is detected. Entries that are rolled back are discarded.
  • On commit each entry is turned into a message.Message record.
  • Each message is populated by querying the postgres RDBMS.
  • Messages are added to a message.Transaction record for delivery.
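
The buffering step in the list above can be sketched as follows. This is only an illustration under stated assumptions: Entry, EntryKind and Buffer are hypothetical, simplified types declared here for the example and are not the real wal.Entry or keryxlib internals.

```go
package main

import "fmt"

// EntryKind is a hypothetical classification of a WAL record.
type EntryKind int

const (
	Change   EntryKind = iota // an insert, update or delete
	Commit                    // the transaction committed
	Rollback                  // the transaction was rolled back
)

// Entry is a simplified, hypothetical stand-in for wal.Entry.
type Entry struct {
	TxnID uint32
	Kind  EntryKind
	Table string
}

// Buffer holds entries of transactions that have not finished yet,
// grouped by transaction id.
type Buffer struct {
	pending map[uint32][]Entry
}

// NewBuffer returns an empty Buffer.
func NewBuffer() *Buffer {
	return &Buffer{pending: make(map[uint32][]Entry)}
}

// Add buffers change entries. On commit it returns the buffered entries of
// that transaction and true. On rollback the buffered entries are discarded.
func (b *Buffer) Add(e Entry) ([]Entry, bool) {
	switch e.Kind {
	case Commit:
		entries := b.pending[e.TxnID]
		delete(b.pending, e.TxnID)
		return entries, true
	case Rollback:
		delete(b.pending, e.TxnID)
		return nil, false
	default:
		b.pending[e.TxnID] = append(b.pending[e.TxnID], e)
		return nil, false
	}
}

func main() {
	b := NewBuffer()
	stream := []Entry{
		{TxnID: 1, Kind: Change, Table: "users"},
		{TxnID: 2, Kind: Change, Table: "orders"},
		{TxnID: 2, Kind: Rollback},
		{TxnID: 1, Kind: Commit},
	}
	for _, e := range stream {
		if entries, ok := b.Add(e); ok {
			fmt.Printf("txn %d committed with %d entries\n", e.TxnID, len(entries))
		}
	}
}
```

In this sketch only committed transactions ever leave the buffer, which mirrors the rule that rolled-back entries are never turned into messages.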

Example usage

msgChan, err := keryxlib.TransactionChannel("<ID STRING EMBEDDED IN TRANSACTIONS>", config)

The config passed into this function can be loaded from a JSON file such as:

{
	"data_dir": "/opt/postgresql/data",
	"max_message_per_txn":1000, 
	"pg_conn_strings": [
		"user=user1 password=password1 host=/var/run/postgresql port=5432 dbname=db1 sslmode=disable"
	],
	"buffer_max": 100000,
	"buffer_directory": "/var/tmp/keryx/buffer",
	"exclude": {
		"db1.public.users":["password"],
		"db1.schema1.boo":["*"],
		"db1.schema2.moo":["*"],
		"db1.schema3.goo":["*"]
	}
}
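
As a rough illustration of how the JSON above maps onto the documented Config fields, the sketch below decodes it with encoding/json. The Config struct is redeclared locally, copied from the documented type, so that the example compiles without importing keryxlib; parseConfig is a hypothetical helper, and how ConfigFromFile works internally is not shown in this document.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Config is a local copy of the documented keryxlib.Config fields,
// redeclared here so the sketch is self-contained.
type Config struct {
	DataDir          string              `json:"data_dir"`
	PGConnStrings    []string            `json:"pg_conn_strings"`
	BufferMax        int                 `json:"buffer_max"`
	ExcludeRelations map[string][]string `json:"exclude,omitempty"`
	IncludeRelations map[string][]string `json:"include,omitempty"`
	BufferDirectory  string              `json:"buffer_directory"`
	MaxMessagePerTxn uint                `json:"max_message_per_txn"`
}

// parseConfig is a hypothetical helper that decodes raw JSON into a Config.
func parseConfig(raw []byte) (*Config, error) {
	var c Config
	if err := json.Unmarshal(raw, &c); err != nil {
		return nil, err
	}
	return &c, nil
}

// sample is the example configuration from the README.
const sample = `{
	"data_dir": "/opt/postgresql/data",
	"max_message_per_txn": 1000,
	"pg_conn_strings": [
		"user=user1 password=password1 host=/var/run/postgresql port=5432 dbname=db1 sslmode=disable"
	],
	"buffer_max": 100000,
	"buffer_directory": "/var/tmp/keryx/buffer",
	"exclude": {
		"db1.public.users": ["password"],
		"db1.schema1.boo": ["*"],
		"db1.schema2.moo": ["*"],
		"db1.schema3.goo": ["*"]
	}
}`

func main() {
	cfg, err := parseConfig([]byte(sample))
	if err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Println("data dir:", cfg.DataDir)
	fmt.Println("excluded relations:", len(cfg.ExcludeRelations))
}
```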

Filters

It is often useful to keep certain output out of the keryx channel. To support this, keryxlib can filter out whole tables before the WAL entry is buffered, and can filter out specific columns at the population step. The filter format is "dbname.schemaname.tablename":["columnname1", "columnname2"]. A * in the column name array means all columns.

Inclusive vs Exclusive Filtering

The filters package supports both inclusive and exclusive filtering. With inclusive filtering, only tables and columns that are explicitly listed are sent into the channel. With exclusive filtering, all tables and columns are sent by default; any column explicitly listed in the filter is removed from the published messages, and any table with * columns excluded is dropped entirely.
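
The two filtering modes can be illustrated with a small sketch. The Filter type and its methods are hypothetical and written only for this example; they model the behaviour described above and are not the API of the filters package.

```go
package main

import (
	"fmt"
	"strings"
)

// Filter is a hypothetical model of an include or exclude map keyed by
// "dbname.schemaname.tablename".
type Filter struct {
	Inclusive bool
	Relations map[string][]string
}

// key builds the "dbname.schemaname.tablename" lookup key.
func key(db, schema, table string) string {
	return strings.Join([]string{db, schema, table}, ".")
}

// matches reports whether the column list names the column or contains "*".
func matches(cols []string, column string) bool {
	for _, c := range cols {
		if c == "*" || c == column {
			return true
		}
	}
	return false
}

// hasWildcard reports whether the column list contains "*".
func hasWildcard(cols []string) bool {
	for _, c := range cols {
		if c == "*" {
			return true
		}
	}
	return false
}

// TableAllowed reports whether entries for the table should be buffered.
func (f Filter) TableAllowed(db, schema, table string) bool {
	cols, listed := f.Relations[key(db, schema, table)]
	if f.Inclusive {
		return listed // only explicitly listed tables pass
	}
	return !(listed && hasWildcard(cols)) // "*" excludes the whole table
}

// ColumnAllowed reports whether a column should appear in a message.
func (f Filter) ColumnAllowed(db, schema, table, column string) bool {
	cols, listed := f.Relations[key(db, schema, table)]
	matched := listed && matches(cols, column)
	if f.Inclusive {
		return matched
	}
	return !matched
}

func main() {
	exclude := Filter{Relations: map[string][]string{
		"db1.public.users": {"password"},
		"db1.schema1.boo":  {"*"},
	}}
	fmt.Println(exclude.ColumnAllowed("db1", "public", "users", "password"))
	fmt.Println(exclude.ColumnAllowed("db1", "public", "users", "email"))
	fmt.Println(exclude.TableAllowed("db1", "schema1", "boo"))
}
```

The split into a table check and a column check follows the text: tables are filtered before buffering, columns at the population step.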

Database Connections

The postgres WAL log contains records for every database in the postgres instance, but a connection for querying the RDBMS is bound to a single database. Therefore, to send messages for multiple databases in the message channel, you must provide multiple connection strings, one for each database. Any message for a database that has no connection string is automatically filtered from the message channel.
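
One way to picture the rule above is an index from database name to connection string. The sketch below is an assumption about how such a lookup could work, not keryxlib's implementation; dbName and connectionsByDB are hypothetical helpers, and the parsing assumes simple key=value pairs without quoting or embedded spaces.

```go
package main

import (
	"fmt"
	"strings"
)

// dbName extracts the dbname value from a key=value connection string.
// Assumption: values contain no spaces and no quoting.
func dbName(connString string) (string, bool) {
	for _, field := range strings.Fields(connString) {
		k, v, ok := strings.Cut(field, "=")
		if ok && k == "dbname" {
			return v, true
		}
	}
	return "", false
}

// connectionsByDB indexes connection strings by the database they target.
func connectionsByDB(connStrings []string) map[string]string {
	byDB := make(map[string]string, len(connStrings))
	for _, cs := range connStrings {
		if name, ok := dbName(cs); ok {
			byDB[name] = cs
		}
	}
	return byDB
}

func main() {
	conns := connectionsByDB([]string{
		"user=user1 password=password1 host=/var/run/postgresql port=5432 dbname=db1 sslmode=disable",
	})
	// Messages for databases without a connection would be filtered out.
	for _, db := range []string{"db1", "db2"} {
		_, ok := conns[db]
		fmt.Printf("%s has a connection: %v\n", db, ok)
	}
}
```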

Big Transactions

Some transactions become very large, and populating every message in them is expensive. In some cases this cost is not worth the effort. If "max_message_per_txn" is set, any transaction containing more messages than that value will not have its messages field populated; instead it lists the tables affected by the transaction along with a count of its messages.
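
A sketch of this cutoff is shown below. The Message and Transaction types and their field names are hypothetical simplifications, not the real message package, and treating a limit of 0 as "not set" is an assumption made for the example.

```go
package main

import (
	"fmt"
	"sort"
)

// Message is a hypothetical, minimal message carrying only its table name.
type Message struct {
	Table string
}

// Transaction is a hypothetical, simplified transaction record.
type Transaction struct {
	Messages     []Message // populated only for transactions within the limit
	Tables       []string  // affected tables, set for oversized transactions
	MessageCount int
}

// buildTransaction keeps the messages when the transaction is within the
// limit. Otherwise it reports only the affected tables and the message count.
// Assumption: maxMessages == 0 means no limit is configured.
func buildTransaction(msgs []Message, maxMessages uint) Transaction {
	txn := Transaction{MessageCount: len(msgs)}
	if maxMessages == 0 || uint(len(msgs)) <= maxMessages {
		txn.Messages = msgs
		return txn
	}
	seen := make(map[string]bool)
	for _, m := range msgs {
		if !seen[m.Table] {
			seen[m.Table] = true
			txn.Tables = append(txn.Tables, m.Table)
		}
	}
	sort.Strings(txn.Tables)
	return txn
}

func main() {
	msgs := []Message{{Table: "orders"}, {Table: "users"}, {Table: "orders"}}
	big := buildTransaction(msgs, 2)
	fmt.Println(len(big.Messages), big.Tables, big.MessageCount)
}
```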

Keryxlib misses data when...

Keryxlib will miss data in certain known cases.

Deletes

By the time keryxlib sees deletes from the WAL log, the information about the fields that were deleted is already gone. Therefore delete messages will not have any field level information, including any IDs of the row in question. The tuple id will be available. This means that if your system needs to publish the ID of a deleted row, you will need to augment keryxlib with an external mapping between tuple id and entity id.
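
The external mapping mentioned above could look roughly like the sketch below. Everything in it is hypothetical: TupleKey, IDIndex and the string form of the tuple id are assumptions made for illustration. Because postgres writes a new row version on update, such an index would also have to be refreshed on updates, not only on inserts.

```go
package main

import "fmt"

// TupleKey identifies a row version by relation and tuple id.
// The string form of the tuple id is an assumption for this sketch.
type TupleKey struct {
	Relation string
	TupleID  string
}

// IDIndex is a hypothetical external mapping from tuple id to entity id.
type IDIndex struct {
	ids map[TupleKey]int64
}

// NewIDIndex returns an empty index.
func NewIDIndex() *IDIndex {
	return &IDIndex{ids: make(map[TupleKey]int64)}
}

// Record remembers the entity id seen in an insert or update message.
func (ix *IDIndex) Record(k TupleKey, entityID int64) {
	ix.ids[k] = entityID
}

// ResolveDelete looks up the entity id for a deleted tuple and forgets it.
func (ix *IDIndex) ResolveDelete(k TupleKey) (int64, bool) {
	id, ok := ix.ids[k]
	if ok {
		delete(ix.ids, k)
	}
	return id, ok
}

func main() {
	ix := NewIDIndex()
	k := TupleKey{Relation: "db1.public.users", TupleID: "(0,7)"}
	ix.Record(k, 42) // learned from an earlier insert message

	if id, ok := ix.ResolveDelete(k); ok {
		fmt.Println("deleted entity id:", id)
	}
}
```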

Population lag causes missed message population

Keryxlib runs behind the postgres replication application. If a second update or delete is applied to a row that keryxlib is trying to populate before keryxlib can populate it, that message will have a population error applied to it, and message field information will not be available for that message. In cases where lots of WAL log entries are written, keryxlib will fall behind and the lag between it and postgres will increase. In that case the chance of an overwrite and subsequent population error increases.

WAL log files removed before keryxlib can read them

If WAL log rotation happens on files that keryxlib has not read then that data will be missed by keryxlib. In some degenerate cases the WAL log rotation happens very fast and keryxlib cannot keep up. Conversely, in some cases keryxlib is reading too fast and encounters WAL log files that are not yet populated with new replication data. In that case it will wait for the WAL log application to catch up.

Insufficient query privileges to populate a message

While keryxlib will filter any messages for databases it does not have a connection for, if a message comes in for a database with a connection, but for a schema or table that the connection's user cannot read, the message will be published with a population error.

Documentation

Constants

const BufferDirectoryDefaultBase = "/var/tmp/keryx"

BufferDirectoryDefaultBase is the root directory in which to attempt to create buffer files if nothing else is specified

Variables

This section is empty.

Functions

func StartSummaryChannel

func StartSummaryChannel(ctx context.Context, serverVersion string, kc *Config) (<-chan message.TxnSummary, error)

StartSummaryChannel sets up a keryx summary stream and schema reader with the provided configuration and returns it as a channel of message.TxnSummary

func StartTransactionChannel

func StartTransactionChannel(serverVersion string, kc *Config, stopper WaitForStop) (<-chan *message.Transaction, error)

StartTransactionChannel sets up a keryx stream and schema reader with the provided configuration and returns it as a channel. The channel can be stopped with the provided stopper

func TransactionChannel

func TransactionChannel(serverVersion string, kc *Config) (<-chan *message.Transaction, error)

TransactionChannel sets up a keryx stream and schema reader with the provided configuration and returns it as a channel

Types

type Config

type Config struct {
	DataDir          string              `json:"data_dir"`
	PGConnStrings    []string            `json:"pg_conn_strings"`
	BufferMax        int                 `json:"buffer_max"`
	ExcludeRelations map[string][]string `json:"exclude,omitempty"`
	IncludeRelations map[string][]string `json:"include,omitempty"`
	BufferDirectory  string              `json:"buffer_directory"`
	MaxMessagePerTxn uint                `json:"max_message_per_txn"`
}

Config contains necessary information to start a keryx stream

func ConfigFromFile

func ConfigFromFile(path string) (*Config, error)

ConfigFromFile loads a config object from a json file

func (*Config) ExcludedTables

func (config *Config) ExcludedTables() []message.Table

ExcludedTables returns message.Tables from the config

func (*Config) GetBufferDirectoryOrTemp

func (config *Config) GetBufferDirectoryOrTemp() (bufferWorkingDirectory string, err error)

GetBufferDirectoryOrTemp gets the buffer directory out of the config file. If it is not defined, it creates one in BufferDirectoryDefaultBase
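
The fallback described here might be sketched as follows. bufferDirOrTemp is a hypothetical helper, not the library's code; in particular, creating a uniquely named subdirectory under the base is an assumption, since the documentation only says the directory is created in BufferDirectoryDefaultBase.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// bufferDirOrTemp returns the configured directory when one is set.
// Otherwise it creates a fresh directory under base and returns its path.
// Assumption: a uniquely named subdirectory is acceptable as the fallback.
func bufferDirOrTemp(configured, base string) (string, error) {
	if configured != "" {
		return configured, nil
	}
	if err := os.MkdirAll(base, 0o700); err != nil {
		return "", err
	}
	return os.MkdirTemp(base, "buffer-")
}

func main() {
	// Use the system temp dir as the base so the example needs no
	// special permissions. keryxlib documents "/var/tmp/keryx" as its base.
	base := filepath.Join(os.TempDir(), "keryx-example")
	dir, err := bufferDirOrTemp("", base)
	if err != nil {
		fmt.Println("could not create buffer directory:", err)
		return
	}
	defer os.RemoveAll(dir)
	fmt.Println("buffering in", dir)
}
```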

func (*Config) IncludedTables

func (config *Config) IncludedTables() []message.Table

IncludedTables returns message.Tables from the config

type FullStream

type FullStream struct {
	MaxMessageCount uint
	// contains filtered or unexported fields
}

FullStream is a facade around the full process of taking WAL entries and publishing them as txn messages.

func NewKeryxStream

func NewKeryxStream(sr *pg.SchemaReader, maxMessageCount uint) *FullStream

NewKeryxStream takes a schema reader and returns a FullStream

func (*FullStream) StartKeryxStream

func (fs *FullStream) StartKeryxStream(serverVersion string, filters filters.MessageFilter, dataDir string, bufferWorkingDirectory string) (<-chan *message.Transaction, error)

StartKeryxStream will start all the streams necessary to go from WAL entries to txn messages.

func (*FullStream) Stop

func (fs *FullStream) Stop()

Stop will end the reading on the WAL log and subsequent streams will therefore end.

type TxnChannelStopper

type TxnChannelStopper struct {
	// contains filtered or unexported fields
}

TxnChannelStopper can be used to stop a Transaction channel

func NewTxnChannelStopper

func NewTxnChannelStopper() *TxnChannelStopper

NewTxnChannelStopper creates a TxnChannelStopper

func (*TxnChannelStopper) Stop

func (t *TxnChannelStopper) Stop()

Stop will initiate a Transaction channel shutdown

func (*TxnChannelStopper) Wait

func (t *TxnChannelStopper) Wait()

Wait will block until Stop is called

type WaitForStop

type WaitForStop interface {
	Wait()
}

WaitForStop is implemented by types whose Wait method blocks until the stream should stop, such as TxnChannelStopper
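
A type satisfying this interface can be built on a closed channel. The sketch below is one possible shape, not the implementation of TxnChannelStopper: Stopper and NewStopper are hypothetical names, and WaitForStop is redeclared locally so the example compiles on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// WaitForStop is redeclared locally, matching the documented interface.
type WaitForStop interface {
	Wait()
}

// Stopper is a hypothetical stop signal built on a channel that is closed
// exactly once.
type Stopper struct {
	once sync.Once
	done chan struct{}
}

// NewStopper returns a Stopper that has not been stopped yet.
func NewStopper() *Stopper {
	return &Stopper{done: make(chan struct{})}
}

// Stop signals shutdown. Calling it more than once is safe.
func (s *Stopper) Stop() {
	s.once.Do(func() { close(s.done) })
}

// Wait blocks until Stop has been called.
func (s *Stopper) Wait() {
	<-s.done
}

func main() {
	s := NewStopper()
	var w WaitForStop = s // a stream would receive this as its stopper

	finished := make(chan struct{})
	go func() {
		w.Wait()
		fmt.Println("stop requested, shutting down stream")
		close(finished)
	}()

	s.Stop()
	<-finished
}
```

Closing a channel wakes every waiter at once, so several goroutines can block in Wait on the same stopper.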

Directories

Path Synopsis
pg
wal
