pgoutput

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 22, 2018 License: MIT Imports: 8 Imported by: 7

README

pgoutput

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/jackc/pgx"
	"github.com/kyleconroy/pgoutput"
)

func main() {
	ctx := context.Background()
	config := pgx.ConnConfig{Database: "opsdash", User: "replicant"}
	conn, err := pgx.ReplicationConnect(config)
	if err != nil {
		log.Fatal(err)
	}

  // Create a slot if it doesn't already exist
	// if err := conn.CreateReplicationSlot("sub2", "pgoutput"); err != nil {
	// 	log.Fatalf("Failed to create replication slot: %v", err)
	// }

	set := pgoutput.NewRelationSet()

	dump := func(relation uint32, row []pgoutput.Tuple) error {
		values, err := set.Values(relation, row)
		if err != nil {
			return fmt.Errorf("error parsing values: %s", err)
		}
		for name, value := range values {
			val := value.Get()
			log.Printf("%s (%T): %#v", name, val, val)
		}
		return nil
	}

	handler := func(m pgoutput.Message) error {
		switch v := m.(type) {
		case pgoutput.Relation:
			log.Printf("RELATION")
			set.Add(v)
		case pgoutput.Insert:
			log.Printf("INSERT")
			return dump(v.RelationID, v.Row)
		case pgoutput.Update:
			log.Printf("UPDATE")
			return dump(v.RelationID, v.Row)
		case pgoutput.Delete:
			log.Printf("DELETE")
			return dump(v.RelationID, v.Row)
		}
		return nil
	}

	replication := pgoutput.LogicalReplication{
		Subscription:  "sub2",
		Publication:   "pub2",
		WaitTimeout:   time.Second * 10,
		StatusTimeout: time.Second * 10,
		Handler:       handler,
	}

	if err := replication.Start(ctx, conn); err != nil {
		log.Fatal(err)
	}
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Begin

type Begin struct {
	// The final LSN of the transaction.
	LSN uint64
	// Commit timestamp of the transaction. The value is in number of
	// microseconds since PostgreSQL epoch (2000-01-01).
	Timestamp time.Time
	// 	Xid of the transaction.
	XID int32
}

type Column

type Column struct {
	Key  bool
	Name string
	Type uint32
	Mode uint32
}

func (Column) Decoder

func (c Column) Decoder() DecoderValue

type Commit

type Commit struct {
	Flags uint8
	// The final LSN of the transaction.
	LSN uint64
	// The final LSN of the transaction.
	TransactionLSN uint64
	Timestamp      time.Time
}

type DecoderValue

type DecoderValue interface {
	pgtype.TextDecoder
	pgtype.Value
}

type Delete

type Delete struct {
	/// ID of the relation corresponding to the ID in the relation message.
	RelationID uint32
	// Identifies the following TupleData message as a new tuple.
	Key bool // TODO
	Old bool // TODO
	Row []Tuple
}

type Handler

type Handler func(Message) error

type Insert

type Insert struct {
	/// ID of the relation corresponding to the ID in the relation message.
	RelationID uint32
	// Identifies the following TupleData message as a new tuple.
	New bool
	Row []Tuple
}

type Message

type Message interface {
	// contains filtered or unexported methods
}

func Parse

func Parse(src []byte) (Message, error)

Parse a logical replication message. See https://www.postgresql.org/docs/current/static/protocol-logicalrep-message-formats.html

type Origin

type Origin struct {
	LSN  uint64
	Name string
}

type Relation

type Relation struct {
	// ID of the relation.
	ID uint32
	// Namespace (empty string for pg_catalog).
	Namespace string
	Name      string
	Replica   uint8
	Columns   []Column
}

type RelationSet

type RelationSet struct {
	// contains filtered or unexported fields
}

func NewRelationSet

func NewRelationSet() *RelationSet

func (*RelationSet) Add

func (rs *RelationSet) Add(r Relation)

func (*RelationSet) Values

func (rs *RelationSet) Values(id uint32, row []Tuple) (map[string]pgtype.Value, error)

type Subscription

type Subscription struct {
	Name          string
	Publication   string
	WaitTimeout   time.Duration
	StatusTimeout time.Duration
	CopyData      bool
}

func NewSubscription

func NewSubscription(name, publication string) *Subscription

func (*Subscription) Start

func (s *Subscription) Start(ctx context.Context, conn *pgx.ReplicationConn, h Handler) error

type Tuple

type Tuple struct {
	Flag  int8
	Value []byte
}

type Type

type Type struct {
	// ID of the data type
	ID        uint32
	Namespace string
	Name      string
}

type Update

type Update struct {
	/// ID of the relation corresponding to the ID in the relation message.
	RelationID uint32
	// Identifies the following TupleData message as a new tuple.
	Old    bool
	Key    bool
	New    bool
	OldRow []Tuple
	Row    []Tuple
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL