subscription

package
v0.7.0
Published: Feb 16, 2021 License: MIT Imports: 29 Imported by: 0

Documentation

Overview

Capture changes from a Postgres database by creating and managing a publication, then streaming changes over a logical replication socket. Generates a stream of changelog entries that can be pushed into a sink.

Constants

const PublicationAdvisoryLockPrefix = 0x0096c14500000000

PublicationAdvisoryLockPrefix is the 64-bit bitmask we combine with the 32-bit Postgres publication oid to generate an advisory lock for session control. The value has no special significance.
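
As a minimal sketch of how such a key could be built, the example below assumes the prefix and the oid are combined with a bitwise OR, so the oid occupies the low 32 bits. The helper name advisoryLockKey is hypothetical and is not part of this package.

```go
package main

import "fmt"

// PublicationAdvisoryLockPrefix mirrors the constant documented above.
const PublicationAdvisoryLockPrefix = 0x0096c14500000000

// advisoryLockKey is a hypothetical helper. It assumes the key is the prefix
// OR'd with the 32-bit publication oid, which lands in the low 32 bits.
// The result is an int64 because Postgres advisory locks take a bigint key.
func advisoryLockKey(oid uint32) int64 {
	return int64(PublicationAdvisoryLockPrefix | uint64(oid))
}

func main() {
	// 16384 is an arbitrary example oid.
	fmt.Printf("%#x\n", advisoryLockKey(16384))
}
```

Because the low 32 bits of the prefix are zero, two publications with different oids always map to different keys under this scheme.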

Variables

var NonReplicationConnection = errors.New("connection has not been created with replication=database")

Functions

func BuildChangelog

func BuildChangelog(logger kitlog.Logger, decoder decode.Decoder, stream *Stream) changelog.Changelog

BuildChangelog produces a stream of changelog entries from raw logical messages produced by a subscription.

This function is where we translate Postgres data into Golang types, via the decoder, and marshal the results into changelog entries.

It currently lacks error reporting, which should be added whenever possible as we can fail in many ways. Handling those errors will be key to availability and data durability.

func MarshalTuple added in v0.6.0

func MarshalTuple(decoder decode.Decoder, relation *logical.Relation, tuple []logical.Element) (map[string]interface{}, error)

MarshalTuple generates a map[string]interface{} from a tuple, using the column type information in the relation to find appropriate scanners in the decoder.

This is the function that defines how a logical message is encoded into a changelog entry. It's important that it matches the behaviour of the import code, to prevent imported rows from being represented differently to those that came via the logical stream.

func Sequence

func Sequence(messages <-chan interface{}) <-chan SequencedMessage

Sequence receives a channel containing logical replication messages and produces a channel which annotates each message with commit information. Sequenced structs can be traced back to a specific LSN and logically ordered by sequence number, ensuring we can detect the authoritative row value even if the same row is updated many times within the same transaction.

This will almost always be used like so:

Sequence(stream.Messages())

Where stream is an active Stream.
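
To illustrate the idea, here is a self-contained sketch of sequencing over a channel. All types (begin, commit, change, sequenced) are hypothetical stand-ins for the logical message types, and the sketch assumes the counter restarts at each begin message and that begin and commit messages are not forwarded. The package's actual behaviour may differ on both points.

```go
package main

import "fmt"

// begin, commit and change are hypothetical stand-ins for logical
// replication messages; they are not the package's real types.
type begin struct{ LSN uint64 }
type commit struct{}
type change struct{ Row string }

// sequenced mirrors the shape of SequencedMessage in simplified form.
type sequenced struct {
	Begin    begin
	Sequence uint64
	Entry    interface{}
}

// sequence annotates every non-transaction message with the begin message of
// its transaction and a counter that restarts on each begin (an assumption).
func sequence(messages <-chan interface{}) <-chan sequenced {
	out := make(chan sequenced)
	go func() {
		defer close(out)
		var current begin
		var seq uint64
		for msg := range messages {
			switch m := msg.(type) {
			case begin:
				current, seq = m, 0
			case commit:
				// End of transaction: nothing to annotate.
			default:
				seq++
				out <- sequenced{Begin: current, Sequence: seq, Entry: msg}
			}
		}
	}()
	return out
}

func main() {
	in := make(chan interface{}, 4)
	in <- begin{LSN: 10}
	in <- change{Row: "id=1 v=a"}
	in <- change{Row: "id=1 v=b"} // same row updated again in the same transaction
	in <- commit{}
	close(in)

	for m := range sequence(in) {
		fmt.Println(m.Begin.LSN, m.Sequence, m.Entry)
	}
}
```

A consumer that sees both updates to the same row can keep the one with the higher sequence number for that LSN.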

Types

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager supervises a subscription, adding tables to and removing tables from the publication according to the white/blacklist.

func NewManager

func NewManager(db *sql.DB, opts ManagerOptions) *Manager

func (*Manager) Manage

func (m *Manager) Manage(ctx context.Context, logger kitlog.Logger, sub Subscription) (err error)

Manage begins syncing tables into the publication using the rules configured on the manager options. It will run until the context expires.

func (*Manager) Reconcile

func (m *Manager) Reconcile(ctx context.Context, sub Subscription) (watchedNotPublished changelog.Tables, publishedNotWatched changelog.Tables, err error)

Reconcile ensures all watched tables are added to the subscription, through the Postgres publication.
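
The two returned table lists are named as set differences between the watched and published tables. The sketch below shows that computation over plain strings; the helper difference and the use of []string in place of changelog.Tables are assumptions made for illustration only.

```go
package main

import "fmt"

// difference returns the entries of a that do not appear in b, keeping the
// order of a. It is a hypothetical helper, not part of this package.
func difference(a, b []string) []string {
	seen := make(map[string]bool, len(b))
	for _, name := range b {
		seen[name] = true
	}
	var out []string
	for _, name := range a {
		if !seen[name] {
			out = append(out, name)
		}
	}
	return out
}

func main() {
	watched := []string{"public.users", "public.orders"}
	published := []string{"public.orders", "public.legacy"}

	watchedNotPublished := difference(watched, published) // candidates to add
	publishedNotWatched := difference(published, watched) // candidates to remove
	fmt.Println(watchedNotPublished, publishedNotWatched)
}
```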

func (*Manager) Shutdown

func (m *Manager) Shutdown(ctx context.Context) error

type ManagerOptions

type ManagerOptions struct {
	Schemas      []string      // list of schemas to watch
	Excludes     []string      // optional blacklist
	Includes     []string      // optional whitelist, combined with blacklist
	PollInterval time.Duration // interval to scan Postgres for new matching tables
}
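
The field comments say the whitelist is combined with the blacklist but do not spell out the rule. One plausible reading, sketched below as an assumption rather than the package's confirmed behaviour, is that a table is watched when it is not excluded and either no includes are set or it is listed in the includes. The options type and watched method are hypothetical.

```go
package main

import "fmt"

// options mirrors only the Includes and Excludes fields of ManagerOptions.
type options struct {
	Includes []string
	Excludes []string
}

// contains reports whether name appears in list.
func contains(list []string, name string) bool {
	for _, item := range list {
		if item == name {
			return true
		}
	}
	return false
}

// watched applies the assumed rule: excludes always win, and an empty
// include list means every table is eligible.
func (o options) watched(table string) bool {
	if contains(o.Excludes, table) {
		return false
	}
	return len(o.Includes) == 0 || contains(o.Includes, table)
}

func main() {
	opts := options{Excludes: []string{"public.schema_migrations"}}
	for _, table := range []string{"public.users", "public.schema_migrations"} {
		fmt.Println(table, opts.watched(table))
	}
}
```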

func (*ManagerOptions) Bind

func (opt *ManagerOptions) Bind(cmd *kingpin.CmdClause, prefix string) *ManagerOptions

type Publication

type Publication struct {
	OID  uint32
	Name string
	ID   string
}

Publication represents a Postgres publication, the publishing component of a subscription. It is coupled with a ReplicationSlot as a component of a Subscription.

func CreatePublication

func CreatePublication(ctx context.Context, db *sql.DB, name, id string) (*Publication, error)

CreatePublication transactionally creates and comments on a new publication. The comment will be the unique subscription identifier.

func FindOrCreatePublication

func FindOrCreatePublication(ctx context.Context, logger kitlog.Logger, db *sql.DB, name string) (*Publication, error)

FindOrCreatePublication will attempt to find an existing publication, or create from scratch with a fresh publication ID.

func FindPublication

func FindPublication(ctx context.Context, db *sql.DB, name string) (*Publication, error)

FindPublication attempts to find an existing publication by the given name.

func (Publication) Begin added in v0.6.0

func (p Publication) Begin(ctx context.Context, db *sql.DB, action func(PublicationSession) error) error

Begin ensures we make changes to a publication safely with respect to other concurrent actors. The methods GetTables and SetTables are often used together and are prone to racing. By exposing them only via a session, and making that session accessible only while a lock is held, we prevent callers from accidentally hurting themselves.

func (Publication) GetReplicationSlotName

func (p Publication) GetReplicationSlotName() string

GetReplicationSlotName generates the name of the replication slot associated with the publication. The ID is used to prove a slot was created against the existing publication, and to catch when publications have been dropped/recreated.

func (Publication) GetTables

func (p Publication) GetTables(ctx context.Context, db *sql.DB) (tables changelog.Tables, err error)

GetTables returns a slice of table names that are included on the publication.

func (Publication) String

func (p Publication) String() string

String provides an easy printer for anything using publications.

func (Publication) UnsafeSetTables added in v0.6.0

func (p Publication) UnsafeSetTables(ctx context.Context, db *sql.DB, tables ...changelog.Table) error

UnsafeSetTables resets the publication to include the given tables only. It is unsafe unless used through a session, as set will clobber any concurrent changes.

type PublicationSession added in v0.6.0

type PublicationSession struct {
	GetTables func(ctx context.Context, db *sql.DB) (tables changelog.Tables, err error)
	SetTables func(ctx context.Context, db *sql.DB, tables ...changelog.Table) error
}

type ReplicationSlot

type ReplicationSlot struct {
	Name string
}

ReplicationSlot represents a replication slot inside of Postgres, helping track changes on a publication. These slots are created with just a single output format, which is pgoutput. They are permanent, not temporary.

func (ReplicationSlot) GetConfirmedFlushLSN

func (s ReplicationSlot) GetConfirmedFlushLSN(ctx context.Context, conn *pgx.Conn) (lsn pglogrepl.LSN, err error)

type SequencedMessage

type SequencedMessage struct {
	Begin    logical.Begin
	Sequence uint64
	Entry    logical.Message // the original message
}

SequencedMessage wraps logical messages with the begin message associated with the transaction that the message was contained within.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream represents an on-going replication stream, managed by a subscription. Consumers of the stream can acknowledge processing messages using the Confirm() method.

func (*Stream) Confirm

func (s *Stream) Confirm(pos pglogrepl.LSN) <-chan pglogrepl.LSN

func (*Stream) Messages

func (s *Stream) Messages() <-chan interface{}

func (*Stream) Shutdown

func (s *Stream) Shutdown(ctx context.Context) error

type StreamOptions

type StreamOptions struct {
	HeartbeatInterval time.Duration
}

func (*StreamOptions) Bind

func (opt *StreamOptions) Bind(cmd *kingpin.CmdClause, prefix string) *StreamOptions

type Subscription

type Subscription struct {
	Publication
	ReplicationSlot
	SubscriptionOptions
}

Subscription is a wrapper around a Postgres publication and replication slot, coupled together via a unique identifier. Both replication slot and publication must be created before a subscription is used, to ensure Postgres retains unprocessed WAL from the moment the subscription is started.

This implementation provides similar functionality to the CREATE SUBSCRIPTION command, in terms of managing the replication slot and providing a Start function that will subscribe to changes on the target publication.

https://www.postgresql.org/docs/11/sql-createsubscription.html

func Create

func Create(ctx context.Context, logger kitlog.Logger, db *sql.DB, repconn *pgx.Conn, opts SubscriptionOptions) (*Subscription, error)

Create initialises a subscription once the publication and replication slot have been created. This is the only way to create a subscription, to ensure a replication slot exists before anyone can call Start().

func (*Subscription) GetID

func (s *Subscription) GetID() string

GetID is an easy accessor for code that needs the subscription ID. As we track and store the ID on the publication, it makes sense for the Publication struct to include it, but this is also the subscription ID.

func (*Subscription) Start

func (s *Subscription) Start(ctx context.Context, logger kitlog.Logger, conn *pgx.Conn, opts StreamOptions) (*Stream, error)

Start begins replicating from our remote. We set our WAL position to whatever the server tells us our replication slot was last recorded at, then proceed to heartbeat and replicate our remote.

type SubscriptionOptions

type SubscriptionOptions struct {
	Name string // name of the publication, and prefix of replication slot
}
