Documentation ¶
Index ¶
- Constants
- Variables
- func AvroSchema(before, after *pgtypeavro.Record) pgtypeavro.Schema
- func JetstreamOptions(opts ...jetstream.JetStreamOpt) func(c *Client)
- func LoggerOpt(log Logger) func(c *Client)
- func NatsOptions(opts ...nats.Option) func(c *Client)
- type Client
- type Conn
- func (c *Conn) Close()
- func (c *Conn) GetLastSchema(ctx context.Context, database string, table string) (schema SchemaMsg, err error)
- func (c *Conn) GetSchema(ctx context.Context, fingerprint string) (SchemaMsg, error)
- func (c *Conn) GetSnapshot(ctx context.Context, topic string) (*SnapshotReader, error)
- func (c *Conn) ListSnapshots(ctx context.Context, database string, table string) ([]SnapMetadata, error)
- func (c *Conn) Snapshot(ctx context.Context, database string, table string) (*SnapshotReader, error)
- func (c *Conn) StreamWAL(ctx context.Context, database string, table string) (stream *WALStream, err error)
- func (c *Conn) StreamWALFrom(ctx context.Context, database string, table string, timestamp time.Time, ...) (stream *WALStream, err error)
- type Error
- type Logger
- type MessageSource
- type Op
- type SchemaMsg
- type SnapMetadata
- type SnapRow
- type SnapshotHeader
- type SnapshotReader
- type SnapshotRequest
- type StreamType
- type WAL
- type WALStream
Constants ¶
const SnapEOF = "EOF"
SnapEOF message that is sent at end of snapshot
Variables ¶
var ErrNoSchemaFound = Error{Message: "no schema found"}
Functions ¶
func AvroSchema ¶
func AvroSchema(before, after *pgtypeavro.Record) pgtypeavro.Schema
AvroSchema returns a full WAL message Avro schema based on the before and after schemas
func JetstreamOptions ¶
func JetstreamOptions(opts ...jetstream.JetStreamOpt) func(c *Client)
func NatsOptions ¶
func NatsOptions(opts ...nats.Option) func(c *Client)
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func (*Client) WithJetstreamOptions ¶
func (c *Client) WithJetstreamOptions(opts ...jetstream.JetStreamOpt)
func (*Client) WithLogger ¶
func (*Client) WithNatsOptions ¶
func (c *Client) WithNatsOptions(opts ...nats.Option)
type Conn ¶
type Conn struct {
// contains filtered or unexported fields
}
func (*Conn) GetLastSchema ¶
func (c *Conn) GetLastSchema(ctx context.Context, database string, table string) (schema SchemaMsg, err error)
GetLastSchema returns the latest schema if it exists.
func (*Conn) GetSchema ¶
GetSchema requests a schema from Creek. If no schema is found, it will hang. Please use with a context with an appropriate timeout
func (*Conn) GetSnapshot ¶
func (*Conn) ListSnapshots ¶
func (c *Conn) ListSnapshots(ctx context.Context, database string, table string) ([]SnapMetadata, error)
ListSnapshots returns a sorted list (in ascending order by creation date) of existing snapshots for a particular table
func (*Conn) Snapshot ¶
func (c *Conn) Snapshot(ctx context.Context, database string, table string) (*SnapshotReader, error)
Snapshot request a new snapshot from creek. Returns a blocking channel containing snapshot data rows.
func (*Conn) StreamWAL ¶
func (c *Conn) StreamWAL(ctx context.Context, database string, table string) (stream *WALStream, err error)
StreamWAL opens a consumer for the database and table topic. The table topic should be in the form `namespace.table`.
func (*Conn) StreamWALFrom ¶
func (c *Conn) StreamWALFrom(ctx context.Context, database string, table string, timestamp time.Time, lsn string) (stream *WALStream, err error)
StreamWALFrom opens a consumer for the database and table topic. The table topic should be in the form `namespace.table`. Starts streaming from the first message with the timestamp AND log sequence number (lsn) that is greater than the one provided.
type MessageSource ¶
type MessageSource struct { Name string `json:"name" avro:"name"` TxAt time.Time `json:"tx_at" avro:"tx_at"` DB string `json:"db" avro:"db"` Schema string `json:"schema" avro:"schema"` Table string `json:"table" avro:"table"` TxId uint32 `json:"tx_id" avro:"tx_id"` LastLSN string `json:"last_lsn" avro:"last_lsn"` LSN string `json:"lsn" avro:"lsn"` }
MessageSource source information about a WAL message
type SchemaMsg ¶
type SchemaMsg struct { Fingerprint string `json:"fingerprint"` Schema string `json:"schema"` Source string `json:"source"` CreatedAt time.Time `json:"created_at"` }
SchemaMsg messages emitted to the schema stream and returned using the schema API
type SnapshotHeader ¶
type SnapshotHeader struct { Topic string `json:"topic"` Fingerprint string `json:"fingerprint"` Schema string `json:"schema"` TxId uint32 `json:"tx_id"` LSN string `json:"lsn"` At time.Time `json:"at"` ApproxRows int `json:"approx_rows"` }
SnapshotHeader the first message on a snapshot channel
type SnapshotReader ¶
type SnapshotReader struct {
// contains filtered or unexported fields
}
func (*SnapshotReader) Chan ¶
func (sr *SnapshotReader) Chan() <-chan SnapRow
Chan returns a channel over snapshot row data. Blocks until next message is received.
func (*SnapshotReader) Header ¶
func (sr *SnapshotReader) Header() SnapshotHeader
func (*SnapshotReader) Keys ¶
func (sr *SnapshotReader) Keys() []string
Keys returns the primary keys for this table
type SnapshotRequest ¶
type SnapshotRequest struct { Database string `json:"database"` Namespace string `json:"namespace"` Table string `json:"table"` }
SnapshotRequest RPC message when requesting a snapshot
type StreamType ¶
type StreamType string
StreamType types of stream
const SchemaStream StreamType = "schema"
const SnapStream StreamType = "snap"
const WalStream StreamType = "wal"
type WAL ¶
type WAL struct { Fingerprint string `json:"fingerprint" avro:"fingerprint"` Source MessageSource `json:"source" avro:"source"` Op Op `json:"op" avro:"op"` SentAt time.Time `json:"sent_at" avro:"sent_at"` Before *map[string]interface{} `json:"before" avro:"before"` After *map[string]interface{} `json:"after" avro:"after"` }
func (WAL) FullIdentifier ¶
FullIdentifier returns the full identifier of the WAL message, ie db.namespace.table
func (WAL) LocalIdentifier ¶
LocalIdentifier returns the local identifier of the WAL message, ie namespace.table