warppipe

Version: v0.0.0-...-2943bdd
Warning

This package is not in the latest version of its module.

Published: Feb 23, 2021 License: MIT Imports: 20 Imported by: 3

README

warp-pipe

NOTE: NOT FOR PRODUCTION USE. THIS IS A WORK IN PROGRESS.

warp-pipe is a simple change data capture (CDC) tool for Postgres.

How it works

warp-pipe supports two modes, depending on the version and configuration of Postgres that you are running:

Logical Replication (LR)
Requirements
  • Postgres >= 9.5
  • wal2json
  • Logical replication enabled (via postgresql.conf)

You need to set at least two parameters in postgresql.conf:

# postgresql.conf (changes require a server restart)
wal_level = logical
max_replication_slots = 1

In LR mode, warp-pipe connects to a replication slot on your database using the wal2json output plugin and emits Changesets over a Go channel.

NOTE: You must set the appropriate REPLICA IDENTITY on your tables if you wish to expose old values in changesets. To learn more, see the REPLICA IDENTITY clause in the Postgres ALTER TABLE documentation.

Audit
Requirements
  • Postgres >= 9.4

In audit mode, warp-pipe creates a new schema (warp_pipe) containing a changesets table in your database, which tracks modifications to your schema's tables. A trigger is registered on each configured table to notify listeners (via LISTEN/NOTIFY) when new changes are ready to be read.

Installation

Install the warp-pipe library with:

go get github.com/perangel/warp-pipe

Install the library and daemon with:

go get github.com/perangel/warp-pipe/...

Demo

The demo shows how data in a Source DB can be consumed as a stream and used to sync a Target DB. In order, it:

  1. Creates two databases in Docker.
  2. Sets up a schema on the Source DB.
  3. Sets up a schema on the Target DB.
  4. Runs warp-pipe setup-db on the Source DB.
  5. Pauses so the user can load a front-end web application that receives warp-pipe Changesets over WebSockets.
  6. Adds data to the Source DB.
  7. Runs axon to sync data from the Source DB to the Target DB.

Run:

make demo

Clean up:

make demo-clean

Usage
Run a warp-pipe and stream changes from a Postgres database.

Usage:
  warp-pipe [flags]
  warp-pipe [command]

Available Commands:
  help        Help about any command
  setup-db    Setup the source database
  teardown-db Teardown the `warp_pipe` schema

Flags:
      --start-from-lsn int         stream all changes starting from the provided LSN (default -1)
      --start-from-id int          stream all changes starting from the provided changeset ID (default -1)
      --start-from-ts int          stream all changes starting from the provided timestamp (default -1)
  -M, --replication-mode string    replication mode (default "lr")
  -i, --ignore-tables strings      tables to ignore during replication
  -w, --whitelist-tables strings   tables to include during replication
  -H, --db-host string             database host
  -d, --db-name string             database name
  -P, --db-pass string             database password
  -p, --db-port int                database port
  -U, --db-user string             database user
  -L, --log-level string           log level (default "info")
  -h, --help                       help for warp-pipe

Use "warp-pipe [command] --help" for more information about a command.

Configuration

Flag                      Environment Variable   Mode   Description
--start-from-lsn          START_FROM_LSN         lr     Sets the log sequence number (LSN) from which to start logical replication.
--start-from-id           START_FROM_ID          audit  Sets the changeset ID from which to start replaying changesets.
--start-from-ts           START_FROM_TIMESTAMP   audit  Sets the timestamp from which to start replaying changesets.
-M, --replication-mode    REPLICATION_MODE       *      Sets the replication mode to either audit or lr (logical replication); see the requirements above.
-i, --ignore-tables       IGNORE_TABLES          *      Specifies tables to exclude from replication.
-w, --whitelist-tables    WHITELIST_TABLES       *      Specifies tables to include during replication.
-H, --db-host             DB_HOST                *      The database host.
-d, --db-name             DB_NAME                *      The database name.
-P, --db-pass             DB_PASS                *      The database password.
-p, --db-port             DB_PORT                *      The database port.
-U, --db-user             DB_USER                *      The database user.
-L, --log-level           LOG_LEVEL              *      Sets the logging level.

(* = applies to both modes)
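The LSN settings take a single integer, while Postgres prints LSNs in text form as two hexadecimal halves separated by a slash (for example, the output of `SELECT pg_current_wal_lsn();` on Postgres 10 and later). The sketch below converts between the two, assuming warp-pipe expects the usual 64-bit encoding in which the part before the slash is the high 32 bits and the part after it is the low 32 bits. `parseLSN` and `formatLSN` are hypothetical helpers, not part of warppipe.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseLSN converts a Postgres LSN in its "XXXXXXXX/XXXXXXXX" text form into a
// 64-bit integer: the hex digits before the slash form the high 32 bits and the
// digits after it form the low 32 bits. Hypothetical helper, not part of warppipe.
func parseLSN(s string) (uint64, error) {
	parts := strings.SplitN(s, "/", 2)
	if len(parts) != 2 {
		return 0, fmt.Errorf("invalid LSN %q: expected XXX/XXX", s)
	}
	hi, err := strconv.ParseUint(parts[0], 16, 32)
	if err != nil {
		return 0, fmt.Errorf("invalid LSN %q: %v", s, err)
	}
	lo, err := strconv.ParseUint(parts[1], 16, 32)
	if err != nil {
		return 0, fmt.Errorf("invalid LSN %q: %v", s, err)
	}
	return hi<<32 | lo, nil
}

// formatLSN turns the integer form back into Postgres' text form.
func formatLSN(lsn uint64) string {
	return fmt.Sprintf("%X/%X", lsn>>32, uint32(lsn))
}

func main() {
	lsn, err := parseLSN("16/B374D848")
	if err != nil {
		panic(err)
	}
	// The decimal value is what a flag or environment variable would carry.
	fmt.Println(lsn, formatLSN(lsn))
}
```

With this encoding, `16/B374D848` corresponds to 97500059720, the kind of value that would be passed as `--start-from-lsn 97500059720` or `START_FROM_LSN=97500059720`.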

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func ParseLogLevel

func ParseLogLevel(level string) (logrus.Level, error)

ParseLogLevel parses a log level string and returns a logrus.Level.

Types

type Axon

type Axon struct {
	Config *AxonConfig
	Logger *logrus.Logger
	// contains filtered or unexported fields
}

Axon listens for warp-pipe changeset events, converts them into SQL statements, and executes those statements on the remote target database.
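The conversion step can be pictured with the sketch below, which turns a changeset into a parameterized Postgres statement. It is not Axon's implementation: the trimmed local types, the `toSQL`, `rowKey`, and `quoteIdent` helpers, and the assumption that every table has a single primary-key column (passed in as `pk`) are all hypothetical. Updates and deletes locate the row by the old key value when one is present; old values may only be available when the table's REPLICA IDENTITY exposes them (see the note in the Logical Replication section).

```go
package main

import (
	"fmt"
	"strings"
)

// Trimmed local copies of the documented types (only the fields used here).
type ChangesetKind string

const (
	ChangesetKindInsert ChangesetKind = "insert"
	ChangesetKindUpdate ChangesetKind = "update"
	ChangesetKindDelete ChangesetKind = "delete"
)

type ChangesetColumn struct {
	Column string
	Value  interface{}
}

type Changeset struct {
	Kind          ChangesetKind
	Schema, Table string
	NewValues     []*ChangesetColumn
	OldValues     []*ChangesetColumn
}

// quoteIdent double-quotes a Postgres identifier, doubling embedded quotes.
func quoteIdent(s string) string { return `"` + strings.ReplaceAll(s, `"`, `""`) + `"` }

// rowKey prefers the old key value (the key itself may have changed), then the new one.
func rowKey(cs *Changeset, pk string) (interface{}, error) {
	for _, cols := range [][]*ChangesetColumn{cs.OldValues, cs.NewValues} {
		for _, c := range cols {
			if c.Column == pk {
				return c.Value, nil
			}
		}
	}
	return nil, fmt.Errorf("%s.%s: no value for key column %q", cs.Schema, cs.Table, pk)
}

// toSQL builds a parameterized statement plus its arguments.
// ASSUMPTION: each row is identified by a single primary-key column, pk.
func toSQL(cs *Changeset, pk string) (string, []interface{}, error) {
	table := quoteIdent(cs.Schema) + "." + quoteIdent(cs.Table)
	var args []interface{}
	param := func(v interface{}) string { // records v, returns its $n placeholder
		args = append(args, v)
		return fmt.Sprintf("$%d", len(args))
	}
	var query string
	switch cs.Kind {
	case ChangesetKindInsert:
		var cols, vals []string
		for _, c := range cs.NewValues {
			cols = append(cols, quoteIdent(c.Column))
			vals = append(vals, param(c.Value))
		}
		query = fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)", table, strings.Join(cols, ", "), strings.Join(vals, ", "))
	case ChangesetKindUpdate:
		key, err := rowKey(cs, pk)
		if err != nil {
			return "", nil, err
		}
		var sets []string
		for _, c := range cs.NewValues {
			sets = append(sets, quoteIdent(c.Column)+" = "+param(c.Value))
		}
		query = fmt.Sprintf("UPDATE %s SET %s WHERE %s = %s", table, strings.Join(sets, ", "), quoteIdent(pk), param(key))
	case ChangesetKindDelete:
		key, err := rowKey(cs, pk)
		if err != nil {
			return "", nil, err
		}
		query = fmt.Sprintf("DELETE FROM %s WHERE %s = %s", table, quoteIdent(pk), param(key))
	default:
		return "", nil, fmt.Errorf("unsupported changeset kind %q", cs.Kind)
	}
	return query, args, nil
}

func main() {
	cs := &Changeset{Kind: ChangesetKindUpdate, Schema: "public", Table: "users",
		NewValues: []*ChangesetColumn{{Column: "id", Value: 7}, {Column: "email", Value: "a@example.com"}}}
	query, args, err := toSQL(cs, "id")
	if err != nil {
		panic(err)
	}
	fmt.Println(query, args)
}
```

Binding values as `$n` parameters, rather than formatting them into the SQL string, leaves value quoting and type conversion to the database driver; only identifiers need manual quoting.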

func (*Axon) Run

func (a *Axon) Run()

Run the Axon worker.

func (*Axon) RunWithPipeline

func (a *Axon) RunWithPipeline(p *Pipeline)

func (*Axon) Shutdown

func (a *Axon) Shutdown()

Shutdown the Axon worker.

func (*Axon) Verify

func (a *Axon) Verify(schemas, includeTables, excludeTables []string) error

type AxonConfig

type AxonConfig struct {
	// source db credentials
	SourceDBHost string `envconfig:"source_db_host"`
	SourceDBPort int    `envconfig:"source_db_port"`
	SourceDBName string `envconfig:"source_db_name"`
	SourceDBUser string `envconfig:"source_db_user"`
	SourceDBPass string `envconfig:"source_db_pass"`

	// target db credentials
	TargetDBHost   string `envconfig:"target_db_host"`
	TargetDBPort   int    `envconfig:"target_db_port"`
	TargetDBName   string `envconfig:"target_db_name"`
	TargetDBUser   string `envconfig:"target_db_user"`
	TargetDBPass   string `envconfig:"target_db_pass"`
	TargetDBSchema string `envconfig:"target_db_schema" default:"public"`

	// force Axon to shutdown after processing the latest changeset
	ShutdownAfterLastChangeset bool `envconfig:"shutdown_after_last_changeset"`
}

AxonConfig stores the configuration for Axon.

func NewAxonConfigFromEnv

func NewAxonConfigFromEnv() (*AxonConfig, error)

NewAxonConfigFromEnv loads the Axon configuration from environment variables.

type Changeset

type Changeset struct {
	ID        int64              `json:"id"`
	Kind      ChangesetKind      `json:"kind"`
	Schema    string             `json:"schema"`
	Table     string             `json:"table"`
	Timestamp time.Time          `json:"timestamp"`
	NewValues []*ChangesetColumn `json:"new_values"`
	OldValues []*ChangesetColumn `json:"old_values"`
}

Changeset represents a changeset for a record on a Postgres table.

func (*Changeset) GetNewColumnValue

func (c *Changeset) GetNewColumnValue(column string) (interface{}, bool)

GetNewColumnValue returns the current value for a column and a bool denoting whether a new value is present in the changeset.

func (*Changeset) GetPreviousColumnValue

func (c *Changeset) GetPreviousColumnValue(column string) (interface{}, bool)

GetPreviousColumnValue returns the previous value for a column and a bool denoting whether an old value is present in the changeset.
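The (value, ok) pair matters because a column can be present with a NULL value or missing entirely, for example when the table's REPLICA IDENTITY does not expose old values. The sketch below re-implements both getters over local copies of the documented types (the linear-scan implementation is an assumption, not the package's code) and uses them in a hypothetical `changedColumns` helper that separates columns known to have changed from columns whose previous value is unknown.

```go
package main

import (
	"fmt"
	"reflect"
)

// Local copies of the documented types (only the fields used here).
type ChangesetColumn struct {
	Column string
	Value  interface{}
	Type   string
}

type Changeset struct {
	NewValues []*ChangesetColumn
	OldValues []*ChangesetColumn
}

func findColumn(cols []*ChangesetColumn, column string) (interface{}, bool) {
	for _, c := range cols {
		if c.Column == column {
			return c.Value, true
		}
	}
	return nil, false
}

// Same signatures as the documented methods; the linear scan is an assumption.
func (c *Changeset) GetNewColumnValue(column string) (interface{}, bool) {
	return findColumn(c.NewValues, column)
}

func (c *Changeset) GetPreviousColumnValue(column string) (interface{}, bool) {
	return findColumn(c.OldValues, column)
}

// changedColumns reports columns whose value differs between old and new, and
// separately the columns that have no old value at all ("absent" is not "NULL").
func changedColumns(c *Changeset) (changed, unknown []string) {
	for _, col := range c.NewValues {
		oldVal, ok := c.GetPreviousColumnValue(col.Column)
		if !ok {
			unknown = append(unknown, col.Column)
			continue
		}
		// reflect.DeepEqual avoids the panic that == would raise on
		// uncomparable values such as maps decoded from JSON columns.
		if !reflect.DeepEqual(oldVal, col.Value) {
			changed = append(changed, col.Column)
		}
	}
	return changed, unknown
}

func main() {
	cs := &Changeset{
		NewValues: []*ChangesetColumn{{Column: "id", Value: 1}, {Column: "email", Value: "new@example.com"}, {Column: "bio", Value: nil}},
		OldValues: []*ChangesetColumn{{Column: "id", Value: 1}, {Column: "email", Value: "old@example.com"}},
	}
	changed, unknown := changedColumns(cs)
	fmt.Println("changed:", changed, "unknown:", unknown)
}
```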

func (*Changeset) String

func (c *Changeset) String() string

String implements Stringer to create a useful string representation of a Changeset.

type ChangesetColumn

type ChangesetColumn struct {
	Column string      `json:"column"`
	Value  interface{} `json:"value"`
	Type   string      `json:"type"`
}

ChangesetColumn represents a type and value for a column in a changeset.

type ChangesetKind

type ChangesetKind string

ChangesetKind is the type for changeset kinds

const (
	ChangesetKindInsert ChangesetKind = "insert"
	ChangesetKindUpdate ChangesetKind = "update"
	ChangesetKindDelete ChangesetKind = "delete"
)

ChangesetKind constants

func ParseChangesetKind

func ParseChangesetKind(kind string) ChangesetKind

ParseChangesetKind parses a changeset kind from a string.

type Config

type Config struct {
	// Database connection settings.
	Database DBConfig

	// If defined, warppipe will only emit changes for the specified tables.
	WhitelistTables []string `envconfig:"WHITELIST_TABLES"`

	// If set, warppipe will suppress changes for any specified tables.
	// Note: This setting takes precedence over the whitelisted tables.
	IgnoreTables []string `envconfig:"IGNORE_TABLES"`

	// Replication mode may be either `lr` (logical replication) or `audit`.
	ReplicationMode string `envconfig:"REPLICATION_MODE" default:"lr"`

	// Specifies the replication slot name to be used. (LR mode only)
	ReplicationSlotName string `envconfig:"REPLICATION_SLOT_NAME"`

	// Start replication from the specified log sequence number (LSN). (LR mode only)
	StartFromLSN uint64 `envconfig:"START_FROM_LSN"`

	// Start replication from the specified changeset ID. (Audit mode only)
	StartFromID int64 `envconfig:"START_FROM_ID"`

	// Start replication from the specified changeset timestamp. (Audit mode only)
	StartFromTimestamp int64 `envconfig:"START_FROM_TIMESTAMP"`

	// Sets the log level
	LogLevel string `envconfig:"LOG_LEVEL" default:"info"`
}

Config represents the warp pipe configuration settings.

func NewConfigFromEnv

func NewConfigFromEnv() (*Config, error)

NewConfigFromEnv returns a new Config initialized with values read from the environment.
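A rough stand-in for NewConfigFromEnv, covering a few of the documented variables, is sketched below. It assumes list variables such as `WHITELIST_TABLES` are comma-separated (the usual convention for envconfig-style struct tags) and applies the documented defaults `lr` and `info` when a variable is unset. The `config` struct, the `loadConfig` function, its injectable `lookup` parameter, and the replication-mode check are hypothetical.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// config mirrors a few documented Config fields; it is not the real struct.
type config struct {
	WhitelistTables []string
	IgnoreTables    []string
	ReplicationMode string
	LogLevel        string
}

// splitList splits a comma-separated value, trimming spaces and dropping empty entries.
func splitList(v string) []string {
	var out []string
	for _, s := range strings.Split(v, ",") {
		if s = strings.TrimSpace(s); s != "" {
			out = append(out, s)
		}
	}
	return out
}

// loadConfig reads variables through lookup (os.LookupEnv in a real program),
// falling back to the documented defaults when a variable is unset.
func loadConfig(lookup func(string) (string, bool)) (*config, error) {
	get := func(key, def string) string {
		if v, ok := lookup(key); ok {
			return v
		}
		return def
	}
	cfg := &config{
		WhitelistTables: splitList(get("WHITELIST_TABLES", "")),
		IgnoreTables:    splitList(get("IGNORE_TABLES", "")),
		ReplicationMode: get("REPLICATION_MODE", "lr"),
		LogLevel:        get("LOG_LEVEL", "info"),
	}
	// Hypothetical validation: the docs list only "lr" and "audit" as modes.
	if cfg.ReplicationMode != "lr" && cfg.ReplicationMode != "audit" {
		return nil, fmt.Errorf("REPLICATION_MODE must be lr or audit, got %q", cfg.ReplicationMode)
	}
	return cfg, nil
}

func main() {
	cfg, err := loadConfig(os.LookupEnv)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", *cfg)
}
```

Passing the lookup function in, instead of calling os.Getenv directly, keeps the loader testable without touching the process environment.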

type DBConfig

type DBConfig struct {
	Host     string `envconfig:"DB_HOST"`
	Port     int    `envconfig:"DB_PORT"`
	User     string `envconfig:"DB_USER"`
	Password string `envconfig:"DB_PASS"`
	Database string `envconfig:"DB_NAME"`
	Schema   string `envconfig:"DB_SCHEMA"`
}

DBConfig represents the database configuration settings.

type LROption

type LROption func(*LogicalReplicationListener)

LROption is a LogicalReplicationListener option function

func HeartbeatInterval

func HeartbeatInterval(seconds int) LROption

HeartbeatInterval is an option for setting the connection heartbeat interval.

func ReplSlotName

func ReplSlotName(name string) LROption

ReplSlotName is an option for setting the replication slot name.

func StartFromLSN

func StartFromLSN(lsn uint64) LROption

StartFromLSN is an option for setting the log sequence number (LSN) to start replication from.
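LROption follows Go's functional-options pattern: each option is a closure that mutates the listener before it is used. The sketch below reproduces the documented option signatures against a hypothetical `lrListener` struct; its field names and default values are invented for illustration and are not the real listener's.

```go
package main

import (
	"fmt"
	"time"
)

// lrListener stands in for LogicalReplicationListener; fields and defaults are hypothetical.
type lrListener struct {
	slotName          string
	heartbeatInterval time.Duration
	startLSN          uint64
}

// LROption mirrors the documented option type.
type LROption func(*lrListener)

func ReplSlotName(name string) LROption {
	return func(l *lrListener) { l.slotName = name }
}

func HeartbeatInterval(seconds int) LROption {
	return func(l *lrListener) { l.heartbeatInterval = time.Duration(seconds) * time.Second }
}

func StartFromLSN(lsn uint64) LROption {
	return func(l *lrListener) { l.startLSN = lsn }
}

// newLRListener sets defaults first, then applies each option in order,
// so later options override earlier ones.
func newLRListener(opts ...LROption) *lrListener {
	l := &lrListener{
		slotName:          "warp_pipe_slot", // hypothetical default
		heartbeatInterval: 10 * time.Second, // hypothetical default
	}
	for _, opt := range opts {
		opt(l)
	}
	return l
}

func main() {
	l := newLRListener(ReplSlotName("my_slot"), HeartbeatInterval(5))
	fmt.Printf("%+v\n", *l)
}
```

Against the real package, the equivalent call would be `warppipe.NewLogicalReplicationListener(warppipe.ReplSlotName("my_slot"), warppipe.HeartbeatInterval(5))`.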

type Listener

type Listener interface {
	Dial(*pgx.ConnConfig) error
	ListenForChanges(context.Context) (chan *Changeset, chan error)
	Close() error
}

Listener is an interface for implementing a changeset listener.

type LogicalReplicationListener

type LogicalReplicationListener struct {
	// contains filtered or unexported fields
}

LogicalReplicationListener is a Listener that uses logical replication slots to listen for changesets.

func NewLogicalReplicationListener

func NewLogicalReplicationListener(opts ...LROption) *LogicalReplicationListener

NewLogicalReplicationListener returns a new LogicalReplicationListener.

func (*LogicalReplicationListener) Close

func (l *LogicalReplicationListener) Close() error

Close closes the database connection.

func (*LogicalReplicationListener) Dial

func (l *LogicalReplicationListener) Dial(connConfig *pgx.ConnConfig) error

Dial connects to the source database.

func (*LogicalReplicationListener) ListenForChanges

func (l *LogicalReplicationListener) ListenForChanges(ctx context.Context) (chan *Changeset, chan error)

ListenForChanges returns a channel that emits database changesets.

type NotifyListener

type NotifyListener struct {
	// contains filtered or unexported fields
}

NotifyListener is a listener that uses Postgres' LISTEN/NOTIFY pattern for subscribing to changesets enqueued in a changesets table. For more details see `pkg/schema/changesets`.

func NewNotifyListener

func NewNotifyListener(opts ...NotifyOption) *NotifyListener

NewNotifyListener returns a new NotifyListener.

func (*NotifyListener) Close

func (l *NotifyListener) Close() error

Close closes the database connection.

func (*NotifyListener) Dial

func (l *NotifyListener) Dial(connConfig *pgx.ConnConfig) error

Dial connects to the source database.

func (*NotifyListener) ListenForChanges

func (l *NotifyListener) ListenForChanges(ctx context.Context) (chan *Changeset, chan error)

ListenForChanges returns a channel that emits database changesets.

type NotifyOption

type NotifyOption func(*NotifyListener)

NotifyOption is a NotifyListener option function

func StartFromID

func StartFromID(changesetID int64) NotifyOption

StartFromID is an option for setting the changeset ID from which to start reading changesets.

func StartFromTimestamp

func StartFromTimestamp(t time.Time) NotifyOption

StartFromTimestamp is an option for setting the changeset timestamp from which to start reading changesets.

type Option

type Option func(*WarpPipe)

Option is a WarpPipe option function

func IgnoreTables

func IgnoreTables(tables []string) Option

IgnoreTables is an option for setting the tables that WarpPipe should ignore. It accepts entries in any of the following formats:

<schema>.<table>
<schema>.*
<table>

Tables in this list are excluded even if they also match an entry set via WhitelistTables().

func LogLevel

func LogLevel(level string) Option

LogLevel is an option for setting the logging level.

func WhitelistTables

func WhitelistTables(tables []string) Option

WhitelistTables is an option for setting the list of tables to listen for changes from. It accepts entries in any of the following formats:

<schema>.<table>
<schema>.*
<table>

Any tables set via IgnoreTables() will be excluded.
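The matching rules for WhitelistTables and IgnoreTables can be sketched as follows. The precedence (ignored tables always win; an empty whitelist admits every table) follows the documentation above and the Config comments, but two details are assumptions: that a bare `<table>` entry matches a table of that name in any schema, and that matching is exact and case-sensitive. `matches`, `matchesAny`, and `shouldEmit` are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// matches reports whether schema.table matches one entry of the form
// "<schema>.<table>", "<schema>.*" or "<table>".
// ASSUMPTION: a bare "<table>" matches that table name in any schema.
func matches(pattern, schema, table string) bool {
	if i := strings.Index(pattern, "."); i >= 0 {
		ps, pt := pattern[:i], pattern[i+1:]
		return ps == schema && (pt == "*" || pt == table)
	}
	return pattern == table
}

func matchesAny(patterns []string, schema, table string) bool {
	for _, p := range patterns {
		if matches(p, schema, table) {
			return true
		}
	}
	return false
}

// shouldEmit applies the documented precedence: ignored tables are always
// dropped; otherwise an empty whitelist allows every table.
func shouldEmit(whitelist, ignore []string, schema, table string) bool {
	if matchesAny(ignore, schema, table) {
		return false
	}
	return len(whitelist) == 0 || matchesAny(whitelist, schema, table)
}

func main() {
	whitelist := []string{"public.*", "billing.invoices"}
	ignore := []string{"public.sessions"}
	for _, t := range [][2]string{{"public", "users"}, {"public", "sessions"}, {"billing", "invoices"}, {"billing", "refunds"}} {
		fmt.Printf("%s.%s emitted=%v\n", t[0], t[1], shouldEmit(whitelist, ignore, t[0], t[1]))
	}
}
```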

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline represents a sequence of stages for processing Changesets.

func NewPipeline

func NewPipeline() *Pipeline

NewPipeline returns a new Pipeline.

func (*Pipeline) AddStage

func (p *Pipeline) AddStage(name string, fn StageFunc)

AddStage adds a new Stage to the pipeline

func (*Pipeline) Start

func (p *Pipeline) Start(ctx context.Context, sourceCh <-chan *Changeset) (<-chan *Changeset, <-chan error)

Start starts the pipeline, consuming from a source channel that emits *Changeset values.

type Stage

type Stage struct {
	Name string
	Fn   stageFn
}

Stage is a pipeline stage.

type StageFunc

type StageFunc func(*Changeset) (*Changeset, error)

StageFunc is a function for processing changesets in a pipeline Stage. It accepts a single argument, a *Changeset, and returns one of:

(*Changeset, nil): if the stage succeeded
(nil, nil): if the changeset should be dropped (useful for filtering)
(nil, error): if an error occurred during the stage

type WarpPipe

type WarpPipe struct {
	// contains filtered or unexported fields
}

WarpPipe is a daemon that listens for database changes and forwards them to downstream consumers.

func NewWarpPipe

func NewWarpPipe(connConfig *pgx.ConnConfig, listener Listener, opts ...Option) (*WarpPipe, error)

NewWarpPipe initializes and returns a new WarpPipe.

func (*WarpPipe) Close

func (w *WarpPipe) Close() error

Close will close the listener and try to gracefully shutdown the WarpPipe.

func (*WarpPipe) IsLatestChangeSet

func (w *WarpPipe) IsLatestChangeSet(id int64) (bool, error)

IsLatestChangeSet returns true if the id argument matches the ID of the last record in the changesets table. TODO: This feature currently supports only the NotifyListener; support for other listeners is still needed.

func (*WarpPipe) ListenForChanges

func (w *WarpPipe) ListenForChanges(ctx context.Context) (<-chan *Changeset, <-chan error)

ListenForChanges starts listening for database changesets. It returns two channels, one for Changesets and one for errors.

func (*WarpPipe) Open

func (w *WarpPipe) Open() error

Open dials the listener's connection to the database.

Directories

Path
build
  demo-service (module)
cmd
internal
  cli
