Documentation ¶
Overview ¶
Capture changes from a Postgres database by creating and managing a publication, then streaming changes over a logical replication socket. Generates a stream of changelog entries that can be pushed into a sink.
Index ¶
- Constants
- Variables
- func BuildChangelog(logger kitlog.Logger, decoder decode.Decoder, stream *Stream) changelog.Changelog
- func MarshalTuple(decoder decode.Decoder, relation *logical.Relation, tuple []logical.Element) (map[string]interface{}, error)
- func Sequence(messages <-chan interface{}) <-chan SequencedMessage
- type Manager
- func (m *Manager) Manage(ctx context.Context, logger kitlog.Logger, sub Subscription) (err error)
- func (m *Manager) Reconcile(ctx context.Context, sub Subscription) (watchedNotPublished changelog.Tables, publishedNotWatched changelog.Tables, ...)
- func (m *Manager) Shutdown(ctx context.Context) error
- type ManagerOptions
- type Publication
- func CreatePublication(ctx context.Context, db *sql.DB, name, id string) (*Publication, error)
- func FindOrCreatePublication(ctx context.Context, logger kitlog.Logger, db *sql.DB, name string) (*Publication, error)
- func FindPublication(ctx context.Context, db *sql.DB, name string) (*Publication, error)
- func (p Publication) Begin(ctx context.Context, db *sql.DB, action func(PublicationSession) error) error
- func (p Publication) GetReplicationSlotName() string
- func (p Publication) GetTables(ctx context.Context, db *sql.DB) (tables changelog.Tables, err error)
- func (p Publication) String() string
- func (p Publication) UnsafeSetTables(ctx context.Context, db *sql.DB, tables ...changelog.Table) error
- type PublicationSession
- type ReplicationSlot
- type SequencedMessage
- type Stream
- type StreamOptions
- type Subscription
- type SubscriptionOptions
Constants ¶
const PublicationAdvisoryLockPrefix = 0x0096c14500000000
PublicationAdvisoryLockPrefix is the 64-bit bitmask we combine with the 32-bit Postgres publication oid to generate an advisory lock key for session control. The value has no special significance.
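As a rough illustration of how such a key could be formed, the sketch below ORs the oid into the low 32 bits of the prefix. The helper name advisoryLockKey and the use of a bitwise OR are assumptions made for this example, not something the package documents; the only grounded facts are the constant's value and that Postgres advisory lock functions accept a single bigint key.

```go
package main

import "fmt"

// Same value as the package constant, repeated so the sketch is self-contained.
const publicationAdvisoryLockPrefix uint64 = 0x0096c14500000000

// advisoryLockKey is a hypothetical helper (not part of the package API).
// It places the 32-bit oid in the low half of the prefix and returns a
// signed 64-bit value, matching the bigint key Postgres advisory locks take.
func advisoryLockKey(oid uint32) int64 {
	return int64(publicationAdvisoryLockPrefix | uint64(oid))
}

func main() {
	// 16384 is an arbitrary example oid.
	fmt.Printf("%#x\n", advisoryLockKey(16384))
}
```

Because the low 32 bits of the prefix are zero, two different oids can never produce the same key under this scheme.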
Variables ¶
var NonReplicationConnection = errors.New("connection has not been created with replication=database")
Functions ¶
func BuildChangelog ¶
func BuildChangelog(logger kitlog.Logger, decoder decode.Decoder, stream *Stream) changelog.Changelog
BuildChangelog produces a stream of changelog entries from raw logical messages produced by a subscription.
This function is where we translate Postgres data into Golang types, via the decoder, and marshal the results into changelog entries.
It currently lacks error reporting, which should be added whenever possible as we can fail in many ways. Handling those errors will be key to availability and data durability.
func MarshalTuple ¶ added in v0.6.0
func MarshalTuple(decoder decode.Decoder, relation *logical.Relation, tuple []logical.Element) (map[string]interface{}, error)
MarshalTuple generates a map[string]interface{} from a tuple, using the column type information in the relation to find appropriate scanners in the decoder.
This is the function that defines how a logical message is encoded into a changelog entry. It's important that it matches the behaviour of the import code, to prevent imported rows from being represented differently to those that came via the logical stream.
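The shape of that mapping can be sketched without the real types. In the example below, column, scanner and marshalTuple are hypothetical stand-ins for the relation's column metadata, the decoder's scanners and MarshalTuple itself; values are plain strings purely to keep the sketch short.

```go
package main

import (
	"fmt"
	"strconv"
)

// column is a hypothetical stand-in for the column metadata in a relation.
type column struct {
	Name string
	Type string
}

// scanner is a hypothetical stand-in for a decoder scanner: it turns the
// textual value of one column into a Go value.
type scanner func(raw string) (interface{}, error)

// marshalTuple pairs each tuple value with its column and decodes it with
// the scanner registered for that column's type.
func marshalTuple(scanners map[string]scanner, columns []column, tuple []string) (map[string]interface{}, error) {
	if len(columns) != len(tuple) {
		return nil, fmt.Errorf("tuple has %d values but relation has %d columns", len(tuple), len(columns))
	}
	row := make(map[string]interface{}, len(columns))
	for idx, col := range columns {
		scan, ok := scanners[col.Type]
		if !ok {
			return nil, fmt.Errorf("no scanner for column %q of type %q", col.Name, col.Type)
		}
		value, err := scan(tuple[idx])
		if err != nil {
			return nil, fmt.Errorf("column %q: %w", col.Name, err)
		}
		row[col.Name] = value
	}
	return row, nil
}

// exampleScanners registers two illustrative type names.
func exampleScanners() map[string]scanner {
	return map[string]scanner{
		"int8": func(raw string) (interface{}, error) {
			parsed, err := strconv.ParseInt(raw, 10, 64)
			return parsed, err
		},
		"text": func(raw string) (interface{}, error) { return raw, nil },
	}
}

func main() {
	columns := []column{{Name: "id", Type: "int8"}, {Name: "name", Type: "text"}}
	row, err := marshalTuple(exampleScanners(), columns, []string{"42", "alice"})
	fmt.Println(row, err)
}
```

Routing both the import path and the logical stream through one function like this is what keeps the two representations identical.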
func Sequence ¶
func Sequence(messages <-chan interface{}) <-chan SequencedMessage
Sequence receives a channel containing logical replication messages and produces a channel which annotates each message with commit information. Sequenced structs can be traced back to a specific LSN and logically ordered by sequence number, ensuring we can detect the authoritative row value even if the same row is updated many times within the same transaction.
This will almost always be used like so:
Sequence(stream.Messages())
Where stream is an active Stream.
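A minimal sketch of the annotation step follows, using hypothetical stand-in types (begin, change, sequenced) instead of the logical package. It assumes the counter restarts at every begin message; the package may number messages differently.

```go
package main

import "fmt"

// begin and change are hypothetical stand-ins for logical replication messages.
type begin struct{ LSN uint64 }
type change struct{ Row string }

// sequenced mirrors the shape of SequencedMessage for this sketch.
type sequenced struct {
	Begin    begin
	Sequence uint64
	Entry    interface{} // the original message
}

// sequence tags every message with the begin of its transaction and a counter.
func sequence(messages <-chan interface{}) <-chan sequenced {
	out := make(chan sequenced)
	go func() {
		defer close(out)
		var current begin
		var seq uint64
		for msg := range messages {
			if b, ok := msg.(begin); ok {
				// Assumption: the counter restarts with each transaction.
				current, seq = b, 0
			}
			seq++
			out <- sequenced{Begin: current, Sequence: seq, Entry: msg}
		}
	}()
	return out
}

func main() {
	in := make(chan interface{}, 3)
	in <- begin{LSN: 100}
	in <- change{Row: "id=1 value=a"}
	in <- change{Row: "id=1 value=b"}
	close(in)

	for m := range sequence(in) {
		fmt.Printf("lsn=%d seq=%d entry=%v\n", m.Begin.LSN, m.Sequence, m.Entry)
	}
}
```

Both updates to the same row share an LSN here, so only the sequence number tells a consumer that the second value is the one to keep.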
Types ¶
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager supervises a subscription, adding tables to and removing tables from the publication according to the white/blacklist.
func NewManager ¶
func NewManager(db *sql.DB, opts ManagerOptions) *Manager
func (*Manager) Manage ¶
Manage begins syncing tables into publication using the rules configured on the manager options. It will run until the context expires.
type ManagerOptions ¶
type ManagerOptions struct {
	Schemas      []string      // list of schemas to watch
	Excludes     []string      // optional blacklist
	Includes     []string      // optional whitelist, combined with blacklist
	PollInterval time.Duration // interval to scan Postgres for new matching tables
}
func (*ManagerOptions) Bind ¶
func (opt *ManagerOptions) Bind(cmd *kingpin.CmdClause, prefix string) *ManagerOptions
type Publication ¶
Publication represents a Postgres publication, the publishing component of a subscription. It is coupled with a ReplicationSlot as a component of a Subscription.
func CreatePublication ¶
CreatePublication transactionally creates and comments on a new publication. The comment will be the unique subscription identifier.
func FindOrCreatePublication ¶
func FindOrCreatePublication(ctx context.Context, logger kitlog.Logger, db *sql.DB, name string) (*Publication, error)
FindOrCreatePublication will attempt to find an existing publication, or create from scratch with a fresh publication ID.
func FindPublication ¶
FindPublication attempts to find an existing publication by the given name.
func (Publication) Begin ¶ added in v0.6.0
func (p Publication) Begin(ctx context.Context, db *sql.DB, action func(PublicationSession) error) error
Begin ensures we make changes to a publication safely with respect to other concurrent actors. The methods GetTables and SetTables are often used together and are prone to racing. By exposing them only via a session, and making that session accessible only while a lock is held, we can prevent callers from accidentally hurting themselves.
func (Publication) GetReplicationSlotName ¶
func (p Publication) GetReplicationSlotName() string
GetReplicationSlotName generates the name of the replication slot associated with the publication. The ID is used to prove a slot was created against the existing publication, and to catch when publications have been dropped/recreated.
func (Publication) GetTables ¶
func (p Publication) GetTables(ctx context.Context, db *sql.DB) (tables changelog.Tables, err error)
GetTables returns a slice of the table names that are included in the publication.
func (Publication) String ¶
func (p Publication) String() string
String provides an easy printer for anything using publications.
func (Publication) UnsafeSetTables ¶ added in v0.6.0
func (p Publication) UnsafeSetTables(ctx context.Context, db *sql.DB, tables ...changelog.Table) error
UnsafeSetTables resets the publication to include the given tables only. It is unsafe unless used through a session, as set will clobber any concurrent changes.
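Postgres replaces a publication's table list with ALTER PUBLICATION ... SET TABLE, which is why a set clobbers concurrent changes: whatever another actor added since the last read is simply absent from the new list. The sketch below only builds such a statement; table, quoteIdentifier and setTablesQuery are hypothetical helpers, and the SQL the package actually issues is not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// table is a hypothetical stand-in for a schema-qualified table reference.
type table struct {
	Schema string
	Name   string
}

// quoteIdentifier wraps a name in double quotes, doubling any embedded quotes.
func quoteIdentifier(name string) string {
	return `"` + strings.ReplaceAll(name, `"`, `""`) + `"`
}

// setTablesQuery builds a statement that resets the publication to exactly
// the given tables. SET TABLE needs at least one table, so an empty list is
// rejected rather than producing invalid SQL.
func setTablesQuery(publication string, tables ...table) (string, error) {
	if len(tables) == 0 {
		return "", errors.New("at least one table is required")
	}
	names := make([]string, 0, len(tables))
	for _, t := range tables {
		names = append(names, quoteIdentifier(t.Schema)+"."+quoteIdentifier(t.Name))
	}
	return fmt.Sprintf("ALTER PUBLICATION %s SET TABLE %s;",
		quoteIdentifier(publication), strings.Join(names, ", ")), nil
}

func main() {
	query, err := setTablesQuery("pgsink", table{"public", "users"}, table{"public", "orders"})
	fmt.Println(query, err)
}
```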
type PublicationSession ¶ added in v0.6.0
type ReplicationSlot ¶
type ReplicationSlot struct {
Name string
}
ReplicationSlot represents a replication slot inside Postgres, helping track changes on a publication. These slots are created with a single output plugin, pgoutput. They are permanent, not temporary.
func (ReplicationSlot) GetConfirmedFlushLSN ¶
type SequencedMessage ¶
type SequencedMessage struct {
	Begin    logical.Begin
	Sequence uint64
	Entry    logical.Message // the original message
}
SequencedMessage wraps logical messages with the begin message associated with the transaction that the message was contained within.
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream represents an on-going replication stream, managed by a subscription. Consumers of the stream can acknowledge processing messages using the Confirm() method.
type StreamOptions ¶
func (*StreamOptions) Bind ¶
func (opt *StreamOptions) Bind(cmd *kingpin.CmdClause, prefix string) *StreamOptions
type Subscription ¶
type Subscription struct {
	Publication
	ReplicationSlot
	SubscriptionOptions
}
Subscription is a wrapper around a Postgres publication and replication slot, coupled together via a unique identifier. Both replication slot and publication must be created before a subscription is used, to ensure Postgres retains unprocessed WAL from the moment the subscription is started.
This implementation provides similar functionality to the CREATE SUBSCRIPTION command, in terms of managing the replication slot and providing a Start function that will subscribe to changes on the target publication.
https://www.postgresql.org/docs/11/sql-createsubscription.html
func Create ¶
func Create(ctx context.Context, logger kitlog.Logger, db *sql.DB, repconn *pgx.Conn, opts SubscriptionOptions) (*Subscription, error)
Create initialises a subscription once the publication and replication slot have been created. This is the only way to create a subscription, to ensure a replication slot exists before anyone can call Start().
func (*Subscription) GetID ¶
func (s *Subscription) GetID() string
GetID is an easy accessor for code that needs the subscription ID. As we track and store the ID on the publication, it makes sense for the Publication struct to include it, but this is also the subscription ID.
func (*Subscription) Start ¶
func (s *Subscription) Start(ctx context.Context, logger kitlog.Logger, conn *pgx.Conn, opts StreamOptions) (*Stream, error)
Start begins replicating from our remote. We set our WAL position to whatever the server reports as the last recorded position of our replication slot, then proceed to send heartbeats and replicate from the remote.
type SubscriptionOptions ¶
type SubscriptionOptions struct {
Name string // name of the publication, and prefix of replication slot
}