creek

v0.0.0-...-c45e23a
Published: Nov 1, 2023 License: MIT Imports: 19 Imported by: 2

README

Creek

A PostgreSQL Change-Data-Capture (CDC) based system for event sourcing.

Motivation

Many services inside Modular Finance depend on the same core data. This data is useful for many services, but keeping it in sync can be cumbersome. Some projects have tried to break away from the core database while still needing some of its data. Currently, no standardized way of synchronizing data is used. A CDC system allows capturing changes from a database in real-time and propagating those changes to downstream consumers. This can have many uses beyond keeping two SQL databases in sync, such as streaming changes to a specialized search database.

Architecture

Creek consists of two major parts: producers and consumers. A producer is responsible for listening for change events on a PostgreSQL database and publishing events on a Message Queue (MQ). The MQ used is NATS JetStream. Events are published to topics corresponding to the table name. Generally, a practical system will only consist of one producer database that acts as a single source of truth, but creek is flexible and allows using multiple producers, as long as they have different table names.

A system may consist of multiple consumers, and even different types of consumers. A PostgreSQL consumer has been implemented. This consumer reads the changes to a source table from its topic and applies them to a specified table in a consumer PostgreSQL database.

graph  TD
  source_db  -->|changes|source_db
  source_db[(Source DB)]  -->  |Postgres Streaming Logical Replication|producer
  producer[Producer]  -->  |Pub: CREEK.db.wal.namespace.table|nats[NATS JetStream]
  producer[Producer]  -->  |Pub: CREEK.db.schema.namespace.table|nats
  nats --> |Sub: CREEK.db.wal.namespace.table|Consumer

Producer architecture

The producer works by leveraging Postgres Logical Replication. It directly hooks into the Postgres Logical Replication Protocol emitted from pgoutput via the pglogrepl Go library. It can then listen to events from the PostgreSQL Write-Ahead Log (WAL) on specific tables, and emit messages to a NATS JetStream MQ.

Postgres logical replication can be started for a Replication Slot, which corresponds to one consumer of a Postgres Publication. A publication can be defined for specific tables. The replication slot contains information about the current position in the WAL. As such, when restarting the producer, it will continue from the last processed WAL location, and include events that may have happened while the producer was offline.

The messages produced are encoded using the binary Avro data serialization format. This means that messages are encoded in an efficient format that allows simple serialization and deserialization. Avro relies on a schema both when encoding and when decoding messages, and the same schema that was used to encode a message must be used when decoding it. Schemas can be identified by a 64-bit fingerprint.
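
To make the fingerprint idea concrete, the sketch below computes the 64-bit CRC-64-AVRO (Rabin) fingerprint described in the Avro specification and renders it as URL-safe Base64. It is only an illustration: whether Creek uses exactly this algorithm, which canonical form of the schema it hashes, and which byte order it encodes are assumptions, and `fingerprint64` and `encodeFingerprint` are hypothetical names.

```go
package main

import (
	"encoding/base64"
	"encoding/binary"
	"fmt"
)

// emptyFP is the seed value of the CRC-64-AVRO fingerprint in the Avro specification.
const emptyFP uint64 = 0xc15d213aa4d7a795

// fpTable is the 256-entry lookup table derived from emptyFP.
var fpTable = buildTable()

func buildTable() [256]uint64 {
	var t [256]uint64
	for i := range t {
		fp := uint64(i)
		for j := 0; j < 8; j++ {
			// -(fp & 1) is all ones when the low bit is set, zero otherwise.
			fp = (fp >> 1) ^ (emptyFP & -(fp & 1))
		}
		t[i] = fp
	}
	return t
}

// fingerprint64 returns the CRC-64-AVRO fingerprint of buf.
func fingerprint64(buf []byte) uint64 {
	fp := emptyFP
	for _, b := range buf {
		fp = (fp >> 8) ^ fpTable[byte(fp)^b]
	}
	return fp
}

// encodeFingerprint renders the 8 fingerprint bytes as padded URL-safe Base64.
// Little-endian byte order is an assumption made for this sketch.
func encodeFingerprint(fp uint64) string {
	var raw [8]byte
	binary.LittleEndian.PutUint64(raw[:], fp)
	return base64.URLEncoding.EncodeToString(raw[:])
}

func main() {
	schema := []byte(`"string"`) // a primitive Avro schema, used as sample input
	fmt.Println(encodeFingerprint(fingerprint64(schema)))
}
```

Eight bytes always encode to 12 Base64 characters, which matches the shape of the fingerprints shown in the JSON examples in this README.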

The creek producer automatically generates Avro schemas based on the columns of the producer PostgreSQL database, and uses them to encode its messages. The producer is responsible for publishing the schemas used to encode its messages, and for persisting them so that it can provide them to clients that request a schema.

WAL

The creek producer publishes WAL events for each table on the topic [creek-ns].[db].wal.[ns].[table], where creek-ns is a global namespace for creek (by default CREEK), db is the database name, ns is the Postgres namespace (aka. schema) for the table, and table is the name of the table. Messages are encoded using Avro, and the messages will have differing schemas. For example, a message for a table with the following Postgres schema:

CREATE TABLE integration_tests (
    id uuid PRIMARY KEY,
    data text
);

Will have the following corresponding Avro schema:

{
    "name": "publish_message",
    "type": "record",
    "fields": [
        {
            "name": "fingerprint",
            "type": "string"
        },
        {
            "name": "source",
            "type": {
                "name": "source",
                "type": "record",
                "fields": [
                    {
                        "name": "name",
                        "type": "string"
                    },
                    {
                        "name": "tx_at",
                        "type": {
                            "type": "long",
                            "logicalType": "timestamp-micros"
                        }
                    },
                    {
                        "name": "db",
                        "type": "string"
                    },
                    {
                        "name": "schema",
                        "type": "string"
                    },
                    {
                        "name": "table",
                        "type": "string"
                    },
                    {
                        "name": "tx_id",
                        "type": "long"
                    },
                    {
                        "name": "last_lsn",
                        "type": "string"
                    },
                    {
                        "name": "lsn",
                        "type": "string"
                    }
                ]
            }
        },
        {
            "name": "op",
            "type": {
                "name": "op",
                "type": "enum",
                "symbols": [
                    "c",
                    "u",
                    "u_pk",
                    "d",
                    "t",
                    "r"
                ]
            }
        },
        {
            "name": "sent_at",
            "type": {
                "type": "long",
                "logicalType": "timestamp-micros"
            }
        },
        {
            "name": "before",
            "type": [
                "null",
                {
                    "name": "integration_tests",
                    "type": "record",
                    "fields": [
                        {
                            "name": "id",
                            "type": {
                                "type": "string",
                                "logicalType": "uuid"
                            },
                            "pgKey": true,
                            "pgType": "uuid"
                        }
                    ]
                }
            ]
        },
        {
            "name": "after",
            "type": [
                "null",
                {
                    "name": "integration_tests",
                    "type": "record",
                    "fields": [
                        {
                            "name": "id",
                            "type": {
                                "type": "string",
                                "logicalType": "uuid"
                            },
                            "pgKey": true,
                            "pgType": "uuid"
                        },
                        {
                            "name": "data",
                            "type": [
                                "null",
                                "string"
                            ],
                            "pgKey": false,
                            "pgType": "text"
                        }
                    ]
                }
            ]
        }
    ]
}

Schemas

The creek producer publishes the Avro schemas for each table on the topic [creek-ns].[db].schema.[ns].[table], where creek-ns is a global namespace for creek (by default CREEK), db is the database name, ns is the Postgres namespace for the table, and table is the name of the table. These messages are sent as plain JSON with the following structure:

{
    "fingerprint": "Sykce18MgAQ=", // Base64 url-encoded fingerprint
    "schema": "...",
    "source": "namespace.table",
    "created_at": "YYYY..."
}

In addition, the producer persists schemas in the database that it is connected to. Clients can request these schemas using NATS Request-Reply. A client sends a message to [creek-ns]._schemas containing the fingerprint of the schema, and will (if available) receive the schema from a producer that has it.
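
The topic layout described above can be assembled with plain string joins. The following is a minimal sketch; `topic` and `schemaRequestSubject` are hypothetical helper names, and `public.users` is an example table that does not come from Creek.

```go
package main

import (
	"fmt"
	"strings"
)

// topic builds [creek-ns].[db].[kind].[ns].[table].
// kind is "wal" or "schema"; table is given in the form "namespace.table".
func topic(creekNS, db, kind, table string) string {
	return strings.Join([]string{creekNS, db, kind, table}, ".")
}

// schemaRequestSubject is the Request-Reply subject used to look up a schema
// by fingerprint.
func schemaRequestSubject(creekNS string) string {
	return creekNS + "._schemas"
}

func main() {
	fmt.Println(topic("CREEK", "db", "wal", "public.users"))
	fmt.Println(topic("CREEK", "db", "schema", "public.users"))
	fmt.Println(schemaRequestSubject("CREEK"))
}
```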

Snapshots

The WAL does not contain all data in the database, so getting a consistent view of a table requires a snapshot of its current data. Snapshots are taken by the producer. Each snapshot is written to a separate topic named [creek-ns].[db].snap.[ns].[table].[ts]_[id]. Here, creek-ns is the global namespace for creek (by default CREEK), db is the database name, ns is the Postgres namespace for the table, table is the name of the table, ts is a timestamp of when the snapshot was taken in the form YYYYMMDDHHMMSS_MS, and id is a 4-character ID of the snapshot.

On each snapshot topic, the first message will be a snapshot header containing a JSON record in the following form:

{
    "fingerprint": "Sykce18MgAQ=", // Base64 url-encoded fingerprint
    "schema": "...",
    "tx_id": 6550070, // Postgres transaction id
    "lsn": "54/D8213EB8", // Postgres WAL log sequence number (lsn)
    "at": "YYYY...", // Timestamp
    "approx_rows": 2312
}

The header is followed by n messages, one for each row in the table. These are followed by an end message containing the bytes 0x45 0x4f 0x46 (EOF).
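
The message sequence on a snapshot topic (header, rows, EOF marker) can be handled roughly as sketched below. The sketch works on an in-memory slice of messages instead of a NATS subscription and leaves row payloads undecoded. `snapshotHeader` and `readSnapshot` are hypothetical names, and the `at` field is kept as a string here because the example above only shows a placeholder timestamp.

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
)

// snapshotHeader mirrors the JSON header documented above.
type snapshotHeader struct {
	Fingerprint string `json:"fingerprint"`
	Schema      string `json:"schema"`
	TxID        uint32 `json:"tx_id"`
	LSN         string `json:"lsn"`
	At          string `json:"at"`
	ApproxRows  int    `json:"approx_rows"`
}

// snapEOF is the end-of-snapshot marker: the bytes 0x45 0x4f 0x46.
var snapEOF = []byte("EOF")

// readSnapshot splits the messages of one snapshot topic into the header and
// the still-encoded row payloads. It fails if the EOF marker never arrives.
func readSnapshot(msgs [][]byte) (snapshotHeader, [][]byte, error) {
	var hdr snapshotHeader
	if len(msgs) == 0 {
		return hdr, nil, errors.New("empty snapshot topic")
	}
	if err := json.Unmarshal(msgs[0], &hdr); err != nil {
		return hdr, nil, fmt.Errorf("decode header: %w", err)
	}
	var rows [][]byte
	for _, m := range msgs[1:] {
		if bytes.Equal(m, snapEOF) {
			return hdr, rows, nil
		}
		rows = append(rows, m)
	}
	return hdr, rows, errors.New("snapshot ended without EOF marker")
}

func main() {
	msgs := [][]byte{
		[]byte(`{"fingerprint":"Sykce18MgAQ=","schema":"...","tx_id":6550070,"lsn":"54/D8213EB8","at":"YYYY...","approx_rows":2}`),
		[]byte("row-1"), // placeholder for an encoded row
		[]byte("row-2"),
		snapEOF,
	}
	hdr, rows, err := readSnapshot(msgs)
	fmt.Println(hdr.LSN, len(rows), err) // 54/D8213EB8 2 <nil>
}
```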

Clients can request a snapshot using NATS Request-Reply. Clients send a JSON message on the channel [creek-ns]._snapshot in the following form:

{
    "database": "db",
    "namespace": "namespace",
    "table": "table"
}

If the database, namespace, and table exist, a producer will respond with the topic to which the snapshot will be written. The client can then begin reading from this topic.
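
As an illustration of the request body, the sketch below builds the JSON payload from a database name and a table reference in the form `namespace.table`. The struct mirrors the documented fields; `newSnapshotRequest` is a hypothetical helper, and actually sending the payload over NATS is left out.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// snapshotRequest mirrors the JSON request body documented above.
type snapshotRequest struct {
	Database  string `json:"database"`
	Namespace string `json:"namespace"`
	Table     string `json:"table"`
}

// newSnapshotRequest builds a request from a database name and a table
// reference in the form "namespace.table".
func newSnapshotRequest(db, table string) (snapshotRequest, error) {
	ns, name, ok := strings.Cut(table, ".")
	if !ok || ns == "" || name == "" {
		return snapshotRequest{}, fmt.Errorf("table %q is not in the form namespace.table", table)
	}
	return snapshotRequest{Database: db, Namespace: ns, Table: name}, nil
}

func main() {
	req, err := newSnapshotRequest("db", "namespace.table")
	if err != nil {
		panic(err)
	}
	payload, err := json.Marshal(req) // the body to send to [creek-ns]._snapshot
	if err != nil {
		panic(err)
	}
	fmt.Println(string(payload))
}
```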

Postgres setup

The producer database requires a user with replication permission, access allowed in pg_hba.conf, and logical replication enabled in postgresql.conf.

Add a replication line to your pg_hba.conf:

host replication [user] [ip address]/32 md5

Make sure that the following is set in your postgresql.conf:

wal_level=logical
max_wal_senders=5
max_replication_slots=5

Also, it is a (very) good idea to set a maximum size for the WAL retained by replication slots; otherwise the WAL will grow without bound while the producer is offline. This option is only available in Postgres 13 and later.

max_slot_wal_keep_size=16GB

Configuring

The producer is configured using the following environment variables:

PG_URI
PG_PUBLICATION_NAME
PG_PUBLICATION_SLOT
PG_MESSAGE_TIMEOUT
PG_TABLES

NATS_NAMESPACE
NATS_URI
NATS_TIMEOUT
NATS_MAX_PENDING


LOG_LEVEL
PROMETHEUS_PORT
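
A small pre-flight check can report which of these variables are unset before the producer is started. This is only a sketch: the source does not say which variables are mandatory or what their defaults are, so the check merely lists the unset ones. `producerEnv` and `unsetVars` are hypothetical names.

```go
package main

import (
	"fmt"
	"os"
)

// producerEnv lists the environment variables documented above.
var producerEnv = []string{
	"PG_URI", "PG_PUBLICATION_NAME", "PG_PUBLICATION_SLOT", "PG_MESSAGE_TIMEOUT", "PG_TABLES",
	"NATS_NAMESPACE", "NATS_URI", "NATS_TIMEOUT", "NATS_MAX_PENDING",
	"LOG_LEVEL", "PROMETHEUS_PORT",
}

// unsetVars returns the documented variables for which lookup reports no
// value or an empty value.
func unsetVars(lookup func(string) (string, bool)) []string {
	var missing []string
	for _, name := range producerEnv {
		if v, ok := lookup(name); !ok || v == "" {
			missing = append(missing, name)
		}
	}
	return missing
}

func main() {
	fmt.Println("unset:", unsetVars(os.LookupEnv))
}
```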

It is also possible to add tables to listen to while the producer is running, using a PostgreSQL API. Run the following on the database that the producer is connected to:

-- Add table to listen to
SELECT _creek.add_table('publication_name', 'namespace.table');

-- Remove a table to listen to
SELECT _creek.remove_table('publication_name', 'namespace.table');

Usage

This project includes a client library for consumers written in Go. Refer to the documentation for more information. Example usage of the client:

package main

import (
  "context"
  "encoding/json"
  "fmt"
  "github.com/modfin/creek"
  "github.com/nats-io/nats.go"
)

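func main() {
  // Connect to NATS using the default URL and the default creek namespace.
  conn, err := creek.NewClient(nats.DefaultURL, "CREEK").Connect()
  if err != nil {
    panic(fmt.Errorf("failed to connect to creek: %w", err))
  }
  defer conn.Close()

  stream, err := conn.StreamWAL(context.Background(), "db", "namespace.table")
  if err != nil {
    panic(fmt.Errorf("failed to stream WAL: %w", err))
  }
  defer stream.Close()

  for {
    // Next blocks until a message is received.
    msg, err := stream.Next(context.Background())
    if err != nil {
      panic(fmt.Errorf("failed to get next wal message: %w", err))
    }

    b, err := json.Marshal(msg)
    if err != nil {
      panic(fmt.Errorf("failed to marshal wal message: %w", err))
    }
    fmt.Println(string(b))
  }
}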

Security considerations

There is currently no authentication built into Creek, so you will probably want to enable authentication on your NATS cluster. Also, be careful about which tables you export from the producer: every client connected to NATS can stream WAL events for an exported table and request full snapshots of it, even if the table contains sensitive data.

Metrics

The producer produces Prometheus metrics that are available on ip:PROMETHEUS_PORT/metrics.

Limitations

Due to the scope of the project, not all Postgres types are supported. Refer to the pgtype-avro README for a full list of supported types.

Documentation

Constants

const SnapEOF = "EOF"

SnapEOF message that is sent at end of snapshot

Variables

var ErrNoSchemaFound = Error{Message: "no schema found"}

Functions

func AvroSchema

func AvroSchema(before, after *pgtypeavro.Record) pgtypeavro.Schema

AvroSchema returns a full WAL message Avro schema based on the before and after schemas

func JetstreamOptions

func JetstreamOptions(opts ...jetstream.JetStreamOpt) func(c *Client)

func LoggerOpt

func LoggerOpt(log Logger) func(c *Client)

func NatsOptions

func NatsOptions(opts ...nats.Option) func(c *Client)

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(natsUri string, rootNamespace string) *Client

NewClient Creates a new creek client

func (*Client) Connect

func (c *Client) Connect() (*Conn, error)

Connect Connects the Client to nats

func (*Client) With

func (c *Client) With(opts ...func(c *Client)) *Client

func (*Client) WithJetstreamOptions

func (c *Client) WithJetstreamOptions(opts ...jetstream.JetStreamOpt)

func (*Client) WithLogger

func (c *Client) WithLogger(log Logger)

func (*Client) WithNatsOptions

func (c *Client) WithNatsOptions(opts ...nats.Option)

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

func (*Conn) Close

func (c *Conn) Close()

func (*Conn) GetLastSchema

func (c *Conn) GetLastSchema(ctx context.Context, database string, table string) (schema SchemaMsg, err error)

GetLastSchema returns the latest schema if it exists.

func (*Conn) GetSchema

func (c *Conn) GetSchema(ctx context.Context, fingerprint string) (SchemaMsg, error)

GetSchema requests a schema from Creek. If no schema is found, it will hang, so use a context with an appropriate timeout.

func (*Conn) GetSnapshot

func (c *Conn) GetSnapshot(ctx context.Context, topic string) (*SnapshotReader, error)

func (*Conn) ListSnapshots

func (c *Conn) ListSnapshots(ctx context.Context, database string, table string) ([]SnapMetadata, error)

ListSnapshots returns a sorted list (in ascending order by creation date) of existing snapshots for a particular table

func (*Conn) Snapshot

func (c *Conn) Snapshot(ctx context.Context, database string, table string) (*SnapshotReader, error)

Snapshot requests a new snapshot from creek. Returns a SnapshotReader; its Next method and Chan channel block until the next snapshot data row is received.

func (*Conn) StreamWAL

func (c *Conn) StreamWAL(ctx context.Context, database string, table string) (stream *WALStream, err error)

StreamWAL opens a consumer for the database and table topic. The table topic should be in the form `namespace.table`.

func (*Conn) StreamWALFrom

func (c *Conn) StreamWALFrom(ctx context.Context, database string, table string, timestamp time.Time, lsn string) (stream *WALStream, err error)

StreamWALFrom opens a consumer for the database and table topic. The table topic should be in the form `namespace.table`. Starts streaming from the first message whose timestamp AND log sequence number (lsn) are greater than those provided.
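
Because an LSN is written as two hexadecimal halves separated by a slash (for example `54/D8213EB8`), comparing LSNs as plain strings gives wrong results. The sketch below parses the textual form into a 64-bit position and compares numerically. `parseLSN` and `lsnAfter` are hypothetical helpers and not part of the creek package.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseLSN converts a textual Postgres LSN such as "54/D8213EB8" into a
// 64-bit WAL position: the high 32 bits precede the slash, the low 32 follow it.
func parseLSN(s string) (uint64, error) {
	hi, lo, ok := strings.Cut(s, "/")
	if !ok {
		return 0, fmt.Errorf("invalid LSN %q", s)
	}
	h, err := strconv.ParseUint(hi, 16, 32)
	if err != nil {
		return 0, fmt.Errorf("invalid LSN %q: %w", s, err)
	}
	l, err := strconv.ParseUint(lo, 16, 32)
	if err != nil {
		return 0, fmt.Errorf("invalid LSN %q: %w", s, err)
	}
	return h<<32 | l, nil
}

// lsnAfter reports whether LSN a lies strictly after LSN b in the WAL.
func lsnAfter(a, b string) (bool, error) {
	x, err := parseLSN(a)
	if err != nil {
		return false, err
	}
	y, err := parseLSN(b)
	if err != nil {
		return false, err
	}
	return x > y, nil
}

func main() {
	// "10/0" is later than "A/0", although it sorts first as a string.
	after, err := lsnAfter("10/0", "A/0")
	fmt.Println(after, err) // true <nil>
}
```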

type Error

type Error struct {
	Message string
}

func (Error) Error

func (err Error) Error() string

type Logger

type Logger interface {
	Info(format string, args ...interface{})
	Debug(format string, args ...interface{})
	Error(format string, args ...interface{})
}
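
Any type with these three methods satisfies the interface. The following is a minimal sketch of an adapter that formats each call into a single line and hands it to a callback. The interface is repeated locally so that the example compiles on its own, and `lineLogger` is a hypothetical name.

```go
package main

import (
	"fmt"
	"log"
)

// Logger repeats the interface documented above.
type Logger interface {
	Info(format string, args ...interface{})
	Debug(format string, args ...interface{})
	Error(format string, args ...interface{})
}

// lineLogger formats every log call into one line and passes it to emit.
type lineLogger struct {
	emit  func(line string)
	debug bool // when false, Debug messages are dropped
}

func (l lineLogger) write(level, format string, args ...interface{}) {
	l.emit(level + " " + fmt.Sprintf(format, args...))
}

func (l lineLogger) Info(format string, args ...interface{}) { l.write("INFO", format, args...) }

func (l lineLogger) Debug(format string, args ...interface{}) {
	if l.debug {
		l.write("DEBUG", format, args...)
	}
}

func (l lineLogger) Error(format string, args ...interface{}) { l.write("ERROR", format, args...) }

// Compile-time check that lineLogger satisfies Logger.
var _ Logger = lineLogger{}

func main() {
	logger := lineLogger{emit: func(line string) { log.Println(line) }}
	logger.Info("connected to %s", "nats://127.0.0.1:4222")
	logger.Debug("dropped because debug is off")
}
```

Judging by the signatures listed in this documentation, such a value could presumably be passed to the client through LoggerOpt and Client.With.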

type MessageSource

type MessageSource struct {
	Name    string    `json:"name" avro:"name"`
	TxAt    time.Time `json:"tx_at" avro:"tx_at"`
	DB      string    `json:"db" avro:"db"`
	Schema  string    `json:"schema" avro:"schema"`
	Table   string    `json:"table" avro:"table"`
	TxId    uint32    `json:"tx_id" avro:"tx_id"`
	LastLSN string    `json:"last_lsn" avro:"last_lsn"`
	LSN     string    `json:"lsn" avro:"lsn"`
}

MessageSource source information about a WAL message

type Op

type Op string

const (
	OpInsert   Op = "c"
	OpUpdate   Op = "u"
	OpUpdatePk Op = "u_pk" // Update operation that changed the primary key
	OpDelete   Op = "d"
	OpTruncate Op = "t"
)
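
To show how a consumer might act on these operation codes, here is a minimal sketch that applies events to an in-memory table keyed by a single primary key column named `id`. The `event` type is a simplified stand-in for a WAL message. The sketch assumes that the before record carries the old key for delete and primary key update events; the real PostgreSQL consumer may work differently.

```go
package main

import "fmt"

// Op and its constants repeat the documented operation codes.
type Op string

const (
	OpInsert   Op = "c"
	OpUpdate   Op = "u"
	OpUpdatePk Op = "u_pk"
	OpDelete   Op = "d"
	OpTruncate Op = "t"
)

// event is a simplified stand-in for a WAL message.
type event struct {
	Op     Op
	Before map[string]any
	After  map[string]any
}

// apply mutates table (keyed by the "id" column) according to one event.
func apply(table map[string]map[string]any, ev event) error {
	key := func(row map[string]any) string { return fmt.Sprint(row["id"]) }
	switch ev.Op {
	case OpInsert, OpUpdate:
		table[key(ev.After)] = ev.After
	case OpUpdatePk:
		delete(table, key(ev.Before)) // the row moves to a new key
		table[key(ev.After)] = ev.After
	case OpDelete:
		delete(table, key(ev.Before))
	case OpTruncate:
		for k := range table {
			delete(table, k)
		}
	default:
		return fmt.Errorf("unsupported op %q", ev.Op)
	}
	return nil
}

func main() {
	table := map[string]map[string]any{}
	events := []event{
		{Op: OpInsert, After: map[string]any{"id": 1, "data": "a"}},
		{Op: OpUpdate, Before: map[string]any{"id": 1}, After: map[string]any{"id": 1, "data": "b"}},
		{Op: OpUpdatePk, Before: map[string]any{"id": 1}, After: map[string]any{"id": 2, "data": "b"}},
	}
	for _, ev := range events {
		if err := apply(table, ev); err != nil {
			panic(err)
		}
	}
	fmt.Println(table) // map[2:map[data:b id:2]]
}
```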

type SchemaMsg

type SchemaMsg struct {
	Fingerprint string    `json:"fingerprint"`
	Schema      string    `json:"schema"`
	Source      string    `json:"source"`
	CreatedAt   time.Time `json:"created_at"`
}

SchemaMsg messages emitted to the schema stream and returned using the schema API
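
A schema message can be decoded with the standard library alone. The sketch below parses the JSON payload into a struct with the same fields and decodes the Base64 url-encoded fingerprint into raw bytes. `decodeSchemaMsg` is a hypothetical helper, and the `created_at` value in the sample payload is a made-up RFC 3339 timestamp; the source only shows a placeholder for it.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
	"time"
)

// schemaMsg repeats the documented SchemaMsg fields.
type schemaMsg struct {
	Fingerprint string    `json:"fingerprint"`
	Schema      string    `json:"schema"`
	Source      string    `json:"source"`
	CreatedAt   time.Time `json:"created_at"`
}

// decodeSchemaMsg parses a schema message and returns it together with the
// raw fingerprint bytes.
func decodeSchemaMsg(payload []byte) (schemaMsg, []byte, error) {
	var m schemaMsg
	if err := json.Unmarshal(payload, &m); err != nil {
		return m, nil, fmt.Errorf("decode schema message: %w", err)
	}
	raw, err := base64.URLEncoding.DecodeString(m.Fingerprint)
	if err != nil {
		return m, nil, fmt.Errorf("decode fingerprint: %w", err)
	}
	return m, raw, nil
}

func main() {
	payload := []byte(`{"fingerprint":"Sykce18MgAQ=","schema":"{}","source":"namespace.table","created_at":"2023-11-01T00:00:00Z"}`)
	m, raw, err := decodeSchemaMsg(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(m.Source, len(raw)) // namespace.table 8
}
```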

type SnapMetadata

type SnapMetadata struct {
	Name     string
	At       time.Time
	Messages uint64
}

type SnapRow

type SnapRow map[string]any

SnapRow a snapshot data row

type SnapshotHeader

type SnapshotHeader struct {
	Topic       string    `json:"topic"`
	Fingerprint string    `json:"fingerprint"`
	Schema      string    `json:"schema"`
	TxId        uint32    `json:"tx_id"`
	LSN         string    `json:"lsn"`
	At          time.Time `json:"at"`
	ApproxRows  int       `json:"approx_rows"`
}

SnapshotHeader the first message on a snapshot channel

type SnapshotReader

type SnapshotReader struct {
	// contains filtered or unexported fields
}

func (*SnapshotReader) Chan

func (sr *SnapshotReader) Chan() <-chan SnapRow

Chan returns a channel over snapshot row data. Blocks until next message is received.

func (*SnapshotReader) Header

func (sr *SnapshotReader) Header() SnapshotHeader

func (*SnapshotReader) Keys

func (sr *SnapshotReader) Keys() []string

Keys returns the primary keys for this table

func (*SnapshotReader) Next

func (sr *SnapshotReader) Next() (map[string]any, error)

Next returns the next snapshot row. Blocks until next message is received.

type SnapshotRequest

type SnapshotRequest struct {
	Database  string `json:"database"`
	Namespace string `json:"namespace"`
	Table     string `json:"table"`
}

SnapshotRequest RPC message when requesting a snapshot

type StreamType

type StreamType string

StreamType types of stream

const SchemaStream StreamType = "schema"
const SnapStream StreamType = "snap"
const WalStream StreamType = "wal"

type WAL

type WAL struct {
	Fingerprint string                  `json:"fingerprint" avro:"fingerprint"`
	Source      MessageSource           `json:"source" avro:"source"`
	Op          Op                      `json:"op" avro:"op"`
	SentAt      time.Time               `json:"sent_at" avro:"sent_at"`
	Before      *map[string]interface{} `json:"before" avro:"before"`
	After       *map[string]interface{} `json:"after" avro:"after"`
}

func (WAL) FullIdentifier

func (w WAL) FullIdentifier() string

FullIdentifier returns the full identifier of the WAL message, i.e. db.namespace.table

func (WAL) LocalIdentifier

func (w WAL) LocalIdentifier() string

LocalIdentifier returns the local identifier of the WAL message, i.e. namespace.table

type WALStream

type WALStream struct {
	// contains filtered or unexported fields
}

func (*WALStream) Close

func (ws *WALStream) Close()

Close closes the WALStream. Can only be called once

func (*WALStream) Next

func (ws *WALStream) Next(ctx context.Context) (msg WAL, err error)

Next Returns the next message in the WAL stream. Blocks until a message is received.

Directories

Path Synopsis
dbc
env
internal
dao
mq
