pgsink

v0.7.0
Published: Feb 16, 2021 License: MIT

Path to v1.0.0: https://github.com/lawrencejones/pgsink/projects/1

Draft docs can be seen at: docs

pgsink is a Postgres change-capture device that supports high-throughput and low-latency capture to a variety of sinks.

You'd use this project if your primary database is Postgres and you want a stress-free, quick-to-set-up and easy-to-operate tool that replicates your data to other stores such as BigQuery or Elasticsearch, and that works with a Postgres database of any size.

Similar projects

There are many change-capture projects out there, and several support Postgres.

As an example, pgsink is similar to Debezium in its performance and durability goals, but with a much simpler setup (no Kafka required). It is also similar to Netflix's dblog, with the benefit of being open source and available for use.

We win in these comparisons when you want a simple setup with no additional dependencies. We also benefit from focusing solely on Postgres rather than many upstream sources, as we can optimise our data-access pattern for large, high-transaction-volume Postgres databases. Examples of this are keeping transactions short to help vacuums, and traversing tables using efficient indexes.
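To illustrate what "traversing tables using efficient indexes" can look like, here is a minimal sketch of keyset pagination over a primary key. It is an assumption about the general technique, not pgsink's actual import query: the function names nextBatchQuery and quoteIdent are hypothetical. Each batch is a short, index-backed query, so no long-running transaction is needed to walk a large table.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteIdent quotes a Postgres identifier by wrapping it in double quotes
// and doubling any embedded double quote. Hypothetical helper.
func quoteIdent(s string) string {
	return `"` + strings.ReplaceAll(s, `"`, `""`) + `"`
}

// nextBatchQuery builds a keyset-pagination query (hypothetical, not taken
// from pgsink). $1 is the last primary key value seen by the previous
// batch, so Postgres can seek directly into the primary key index instead
// of scanning and discarding rows as OFFSET would.
func nextBatchQuery(schema, table, pk string, limit int) string {
	return fmt.Sprintf(
		"SELECT * FROM %s.%s WHERE %s > $1 ORDER BY %s LIMIT %d",
		quoteIdent(schema), quoteIdent(table),
		quoteIdent(pk), quoteIdent(pk), limit,
	)
}
```

A caller would run the query repeatedly, passing the largest key from the previous batch as $1, and stop when a batch comes back with fewer rows than the limit.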

This makes pgsink a much safer bet for people managing production-critical Postgres databases.

Developing

For those trying to understand the source code, here is an overview of the important packages:

  • changelog: the input to sinks, produced by subscription or import
  • decode: configures decoding of Postgres types into Golang (text -> int64)
  • imports: create, manage and work import jobs, producing changelog entries
  • logical: parsing of the pgoutput logical encoding, used by subscription
  • sinks: implements different types of sink, from files to Google BigQuery
  • subscription: Postgres change capture via replication, generating a changelog
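As a rough illustration of the decode step (text -> int64), the sketch below converts a Postgres text-format value into a Go value based on its type name. This is an assumption for illustration only: the function decodeText is hypothetical and uses strconv as a stand-in, whereas the decode package itself builds its mappings from pgtype.

```go
package main

import (
	"fmt"
	"strconv"
)

// decodeText converts a text-format Postgres value into a Go value.
// Hypothetical sketch: only a few types are handled, and the real decode
// package uses pgtype mappings rather than strconv.
func decodeText(pgType, text string) (interface{}, error) {
	switch pgType {
	case "int2", "int4", "int8":
		// All integer widths are widened to int64 in this sketch.
		v, err := strconv.ParseInt(text, 10, 64)
		if err != nil {
			return nil, err
		}
		return v, nil
	case "bool":
		// Postgres renders booleans as "t" or "f" in text format,
		// both of which strconv.ParseBool accepts.
		v, err := strconv.ParseBool(text)
		if err != nil {
			return nil, err
		}
		return v, nil
	case "text":
		return text, nil
	default:
		return nil, fmt.Errorf("unsupported type %q", pgType)
	}
}
```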

This project comes with a docker-compose development environment. Boot the environment like so:

$ docker-compose up -d
pgsink_prometheus_1 is up-to-date
pgsink_postgres_1 is up-to-date
pgsink_grafana_1 is up-to-date

Then run make recreatedb to create a pgsink database. You can now access your database like so:

$ psql --host localhost --user pgsink pgsink
pgsink=> \q

pgsink will work with this database: try pgsink --sink=file --decode-only.

Database migrations

We use goose to run database migrations. Create new migrations like so:

$ go run internal/migration/cmd/goose.go --dir internal/migration create create_import_jobs_table go
2019/12/29 14:59:51 Created new file: internal/migration/20191229145951_create_import_jobs_table.go

Run migrations using the make target:

$ make migrate structure.sql
$ go run internal/migration/cmd/goose.go --install up
2021/01/09 15:38:29 requested --install, so creating schema 'pgsink'
2021/01/09 15:38:29 goose: no migrations to run. current version: 20210102200953
docker-compose --env-file=/dev/null exec -T postgres pg_dump -U postgres pgsink --schema-only --schema=pgsink >structure.sql

Getting started

Boot a Postgres database, then create an example table.

$ createdb pgsink
$ psql pgsink
psql (11.5)
Type "help" for help.

pgsink=# create table public.example (id bigserial primary key, msg text);
CREATE TABLE

pgsink=# insert into public.example (msg) values ('hello world');
INSERT 0 1

pgsink will stream these changes from the database and send them to the configured sink. Changes are expressed as a stream of messages, each either a Schema that describes the structure of a Postgres table, or a Modification corresponding to an insert/update/delete of a row in Postgres.

Our example would produce the following modification, where timestamp is the time at which the change was committed and sequence the operation index within the transaction:

{
  "timestamp": "2019-10-04T16:05:55.123456+01:00",
  "sequence": 1,
  "namespace": "public",
  "name": "example",
  "before": null,
  "after": {
    "id": "1",
    "msg": "hello world"
  }
}
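A consumer reading modifications from a sink could parse them roughly as follows. This is a sketch under stated assumptions: the Modification struct, ParseModification and Operation are hypothetical names that mirror the JSON above, not pgsink's own Go types. The delete and update cases in Operation are also assumptions, since the example only shows that an insert carries a null before.

```go
package main

import (
	"encoding/json"
	"time"
)

// Modification mirrors the JSON example above. The struct is an
// assumption for illustration, not pgsink's actual type.
type Modification struct {
	Timestamp time.Time              `json:"timestamp"`
	Sequence  uint64                 `json:"sequence"`
	Namespace string                 `json:"namespace"`
	Name      string                 `json:"name"`
	Before    map[string]interface{} `json:"before"`
	After     map[string]interface{} `json:"after"`
}

// ParseModification decodes a single JSON-encoded modification.
func ParseModification(data []byte) (Modification, error) {
	var m Modification
	err := json.Unmarshal(data, &m)
	return m, err
}

// Operation guesses the kind of change from which row images are present.
// Only the insert case is confirmed by the example; the rest are assumed.
func (m Modification) Operation() string {
	switch {
	case m.Before == nil && m.After != nil:
		return "insert"
	case m.Before != nil && m.After == nil:
		return "delete"
	case m.Before != nil && m.After != nil:
		return "update"
	default:
		return "unknown"
	}
}
```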

A schema entry that describes the public.example table is also sent, arriving before the modification. We represent these as Avro schemas, built from the Postgres catalog information.

{
  "timestamp": "2019-10-04T16:05:55.123456+01:00",
  "schema": {
    "namespace": "public.example",
    "type": "record",
    "name": "value",
    "fields": [
      {
        "name": "id",
        "type": ["long", "null"],
        "default": null
      },
      {
        "name": "msg",
        "type": ["string", "null"],
        "default": null
      }
    ]
  }
}

Schemas are published whenever we first discover a relation. Order successive schema events by their timestamp field so that stale messages don't override more recent data.

Directories

Path            Synopsis
api
cmd
internal
pkg
changelog       Defines a changelog Entry, which is a sum type of Schema or Modification.
decode          Implements decoding of Postgres types into Golang, using mappings that are generated from pgtype.
imports         Create, manage and work import jobs.
sinks/bigquery  Implements a sink to Google BigQuery, creating a dataset containing raw tables and de-duplicated views.
sinks/file      Implements a file sink, intended for testing.
sinks/generic   Suite of components that can be composed to make sinks.
subscription    Capture changes from a Postgres database by creating and managing a publication, then streaming changes over a logical replication socket.
