pglogrepl

package module
v1.1.0-fork2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 14, 2019 License: MIT Imports: 10 Imported by: 0

README

Build Status

pglogrepl

pglogrepl is a Go package for PostgreSQL logical replication.

pglogrepl uses package github.com/jackc/pgconn as its underlying PostgreSQL connection.

Proper use of this package requires understanding the underlying PostgreSQL concepts. See https://www.postgresql.org/docs/current/protocol-replication.html.

Example

In example/pglogrepl_demo, there is an example demo program that connects to a database and logs all messages sent over logical replication.

Testing

Testing requires a user with replication permission, a database to replicate, access allowed in pg_hba.conf, and logical replication enabled in postgresql.conf.

Create a database:

create database pglogrepl;

Create a user:

create user pglogrepl with replication password 'secret';

Add a replication line to your pg_hba.conf:

host replication pglogrepl 127.0.0.1/32 md5

Change the following settings in your postgresql.conf:

wal_level=logical
max_wal_senders=5
max_replication_slots=5

To run the tests set PGLOGREPL_TEST_CONN_STRING environment variable with a replication connection string (URL or DSN).

Example:

PGLOGREPL_TEST_CONN_STRING=postgres://pglogrepl:secret@127.0.0.1/pglogrepl?replication=database go test

Documentation

Overview

pglogrepl package implements PostgreSQL logical replication client functionality.

pglogrepl uses package github.com/jackc/pgconn as its underlying PostgreSQL connection. Use pgconn to establish a connection to PostgreSQL and then use the pglogrepl functions on that connection.

Proper use of this package requires understanding the underlying PostgreSQL concepts. See https://www.postgresql.org/docs/current/protocol-replication.html.

Index

Constants

View Source
const (
	XLogDataByteID                = 'w'
	PrimaryKeepaliveMessageByteID = 'k'
	StandbyStatusUpdateByteID     = 'r'
)

Variables

This section is empty.

Functions

func DropReplicationSlot

func DropReplicationSlot(ctx context.Context, conn *pgconn.PgConn, slotName string, options DropReplicationSlotOptions) error

DropReplicationSlot drops a logical replication slot.

func SendStandbyStatusUpdate

func SendStandbyStatusUpdate(ctx context.Context, conn *pgconn.PgConn, ssu StandbyStatusUpdate) error

SendStandbyStatusUpdate sends a StandbyStatusUpdate to the PostgreSQL server.

The only required field in ssu is WALWritePosition. If WALFlushPosition is 0 then WALWritePosition will be assigned to it. If WALApplyPosition is 0 then WALWritePosition will be assigned to it. If ClientTime is the zero value then the current time will be assigned to it.

func StartReplication

func StartReplication(ctx context.Context, conn *pgconn.PgConn, slotName string, startLSN LSN, options StartReplicationOptions) error

StartReplication begins the replication process by executing the START_REPLICATION command.

Types

type CreateReplicationSlotOptions

type CreateReplicationSlotOptions struct {
	Temporary      bool
	SnapshotAction string
}

type CreateReplicationSlotResult

type CreateReplicationSlotResult struct {
	SlotName        string
	ConsistentPoint string
	SnapshotName    string
	OutputPlugin    string
}

CreateReplicationSlotResult is the parsed results the CREATE_REPLICATION_SLOT command.

func CreateReplicationSlot

func CreateReplicationSlot(
	ctx context.Context,
	conn *pgconn.PgConn,
	slotName string,
	outputPlugin string,
	options CreateReplicationSlotOptions,
) (CreateReplicationSlotResult, error)

CreateReplicationSlot creates a logical replication slot.

func ParseCreateReplicationSlot

func ParseCreateReplicationSlot(mrr *pgconn.MultiResultReader) (CreateReplicationSlotResult, error)

ParseCreateReplicationSlot parses the result of the CREATE_REPLICATION_SLOT command.

type DropReplicationSlotOptions

type DropReplicationSlotOptions struct {
	Wait bool
}

type IdentifySystemResult

type IdentifySystemResult struct {
	SystemID string
	Timeline int32
	XLogPos  LSN
	DBName   string
}

IdentifySystemResult is the parsed result of the IDENTIFY_SYSTEM command.

func IdentifySystem

func IdentifySystem(ctx context.Context, conn *pgconn.PgConn) (IdentifySystemResult, error)

IdentifySystem executes the IDENTIFY_SYSTEM command.

func ParseIdentifySystem

func ParseIdentifySystem(mrr *pgconn.MultiResultReader) (IdentifySystemResult, error)

ParseIdentifySystem parses the result of the IDENTIFY_SYSTEM command.

type LSN

type LSN uint64

LSN is a PostgreSQL Log Sequence Number. See https://www.postgresql.org/docs/current/datatype-pg-lsn.html.

func ParseLSN

func ParseLSN(s string) (LSN, error)

Parse the given XXX/XXX text format LSN used by PostgreSQL.

func (LSN) String

func (lsn LSN) String() string

String formats the LSN value into the XXX/XXX format which is the text format used by PostgreSQL.

type PrimaryKeepaliveMessage

type PrimaryKeepaliveMessage struct {
	ServerWALEnd   LSN
	ServerTime     time.Time
	ReplyRequested bool
}

func ParsePrimaryKeepaliveMessage

func ParsePrimaryKeepaliveMessage(buf []byte) (PrimaryKeepaliveMessage, error)

ParsePrimaryKeepaliveMessage parses a Primary keepalive message from the server.

type StandbyStatusUpdate

type StandbyStatusUpdate struct {
	WALWritePosition LSN       // The WAL position that's been locally written
	WALFlushPosition LSN       // The WAL position that's been locally flushed
	WALApplyPosition LSN       // The WAL position that's been locally applied
	ClientTime       time.Time // Client system clock time
	ReplyRequested   bool      // Request server to reply immediately.
}

StandbyStatusUpdate is a message sent from the client that acknowledges receipt of WAL records.

type StartReplicationOptions

type StartReplicationOptions struct {
	Timeline        int32 // 0 means current server timeline
	PluginArguments []string
}

type XLogData

type XLogData struct {
	WALStart     LSN
	ServerWALEnd LSN
	ServerTime   time.Time
	WALData      []byte
}

func ParseXLogData

func ParseXLogData(buf []byte) (XLogData, error)

ParseXLogData parses a XLogData message from the server.

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL