postgres

package
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 14, 2022 License: MIT Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrZeroType Postgres type OID can not be zero
	ErrZeroType = errors.New("postgres type OID can't be zero")
	// ErrTypeNotFound can not find type OID among discovered types
	ErrTypeNotFound = errors.New("can not find discovered type")
)
View Source
var (
	ErrTimeout  = errors.New("timeout")
	ErrChClosed = errors.New("document channel is closed")
)
View Source
var (
	// ErrColumnOutOfRange means that received result tuple is smaller than expected column position
	ErrColumnOutOfRange = errors.New("column out of range")
)
View Source
var (
	// ErrUnknownType type discovery failed
	ErrUnknownType = errors.New("unknown type")
)

Functions

This section is empty.

Types

type Column

type Column struct {
	// contains filtered or unexported fields
}

func (*Column) MarshalJSON

func (col *Column) MarshalJSON() ([]byte, error)

func (*Column) Omit

func (col *Column) Omit() bool

type Database

type Database struct {
	SlotName string

	Publication    string
	StandbyTimeout time.Duration
	// contains filtered or unexported fields
}

func New

func New(stream *StreamPipe, logger *zap.Logger) *Database

func (*Database) Close

func (db *Database) Close(ctx context.Context) error

func (*Database) Commit

func (db *Database) Commit(ctx context.Context) error

func (*Database) Connect

func (db *Database) Connect(ctx context.Context) error

Connect makes two connections to DB for discovery and replication

func (*Database) CreateReplicationSlot

func (db *Database) CreateReplicationSlot(ctx context.Context)

CreateReplicationSlot creates a replication slot at current position and uses newly created snapshot in current transaction. For the sake of consistency it's important to use this method and initial data copying within transaction db.Tx(ctx) db.CreateReplicationSlot(ctx) ... copy data db.Commit(ctx)

func (*Database) Discover

func (db *Database) Discover(ctx context.Context) error

Discover uses Postgres publication and table or column comments to generate replication config Only tables explosed via Publication will be considered for exporting to ES.

func (*Database) DropReplicationSlot

func (db *Database) DropReplicationSlot(ctx context.Context)

func (*Database) HandleLogical

func (db *Database) HandleLogical(ctx context.Context, lsn pglogrepl.LSN, msg pglogrepl.Message) error

func (*Database) PrintSatus

func (db *Database) PrintSatus()

PrintStatus prints some debug information TODO: remove

func (*Database) RegisterSlotLagMetric

func (db *Database) RegisterSlotLagMetric(ctx context.Context)

func (*Database) Reindex

func (db *Database) Reindex(ctx context.Context) error

Select everything and push (streaming) it into elasticsearch.

func (*Database) StartReplication

func (db *Database) StartReplication(ctx context.Context, at pglogrepl.LSN) error

StartReplication switches replConn into `CopyBoth` mode and starts streaming. decoded logical messages are passed for further processing, while status updates happens here. during the streaming, replConn is locked and can not be used for anything else.

XXX: PrimaryKeepaliveMessage.ServerWALEnd is actually the location up to which the WAL is sent See: https://stackoverflow.com/questions/71016200/proper-standby-status-update-in-streaming-replication-protocol

func (*Database) Tx

func (db *Database) Tx(ctx context.Context) error

type DecoderValue

type DecoderValue interface {
	pgtype.TextDecoder
	pgtype.BinaryDecoder // Not all types support BinaryDecoder.
	pgtype.Value
}

DecoderValue can decode value and return pgtype object which implies Value interface.

type Doc

type Doc interface {
	NDJSON() [][]byte
	LSN() pglogrepl.LSN
}

type Document

type Document struct {
	Position
	Meta []byte // Op type, index and document id
	Data []byte // document content or script
}

Document represents one single operation in bulk request.

func (Document) NDJSON

func (d Document) NDJSON() [][]byte

type ESAction

type ESAction string
const (
	ESInsert ESAction = "insert"
	ESUpdate ESAction = "update"
	ESDelete ESAction = "delete"
	ESIndex  ESAction = "index" // Upsert
)

type Inline

type Inline struct {
	// contains filtered or unexported fields
}

Inline defines table like abstraction for inligning into ES doc

type Position

type Position pglogrepl.LSN

func (Position) LSN

func (p Position) LSN() pglogrepl.LSN

func (Position) NDJSON

func (d Position) NDJSON() [][]byte

type Schema

type Schema struct {
	// contains filtered or unexported fields
}

Schema describes Postgres schema/namespace

type StreamPipe

type StreamPipe struct {
	// contains filtered or unexported fields
}

StreamPipe connects decoded postgres stream with elasticsearch client. There should not be any other interaction.

func NewStreamPipe

func NewStreamPipe(ctx context.Context) *StreamPipe

func (*StreamPipe) CommitPosition

func (p *StreamPipe) CommitPosition(pos pglogrepl.LSN)

func (*StreamPipe) Next

func (p *StreamPipe) Next(ctx context.Context) (Doc, error)

func (*StreamPipe) Position

func (p *StreamPipe) Position() pglogrepl.LSN

type Table

type Table struct {
	// contains filtered or unexported fields
}

func (*Table) Column

func (t *Table) Column(name string) (col *Column)

Column gets (existing or default) column config.

func (*Table) CopyAll

func (t *Table) CopyAll(ctx context.Context, conn *pgconn.PgConn) error

Selects all rown from table, and populates results into Database.results chanel. Copy existing data snapshoted by slot creation, using simple protocol.

func (*Table) EncodeUpdateRowJSON

func (t *Table) EncodeUpdateRowJSON() ([]byte, error)

EncodeUpdateRowJSON wraps row into `{"doc": ... }` object, required by ElasticSearch bulk request syntax for update queries

func (*Table) MarshalJSON

func (t *Table) MarshalJSON() ([]byte, error)

func (*Table) Schema

func (t *Table) Schema() *Schema

Table returns column's owner table

func (*Table) SetRelationID

func (t *Table) SetRelationID(id uint32)

TODO: take RelID set cache out of this tree config

Directories

Path Synopsis
package pgcopy - parser for PostgreSQL `COPY TO ...
package pgcopy - parser for PostgreSQL `COPY TO ...

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL