sink

package
v0.0.59 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 14, 2024 License: Apache-2.0 Imports: 30 Imported by: 0

Documentation

Index

Constants

View Source
const (
	UpdateSourceSQL = "update pgcapture.sources set commit=$1,seq=$2,mid=$3,commit_ts=$4,apply_ts=now() where id=$5"
)

Variables

View Source
var (
	LogLSNRegex    = regexp.MustCompile(`(?:consistent recovery state reached at|redo done at) ([0-9A-F]{1,8}\/[0-9A-F]{1,8})`)
	LogTxTimeRegex = regexp.MustCompile(`last completed transaction was at log time (.*)\.?$`)
)

Functions

func PGTime2Time

func PGTime2Time(ts uint64) time.Time

func ScanCheckpointFromLog

func ScanCheckpointFromLog(f io.Reader) (lsn, ts string, err error)

Types

type ApplyFn

type ApplyFn func(sourceRemaining int, message source.Change, committed chan cursor.Checkpoint) error

type BaseSink

type BaseSink struct {
	CleanFn CleanFn
	// contains filtered or unexported fields
}

func (*BaseSink) Apply

func (b *BaseSink) Apply(changes chan source.Change) (committed chan cursor.Checkpoint)

func (*BaseSink) Error

func (b *BaseSink) Error() error

func (*BaseSink) Setup

func (b *BaseSink) Setup() (cp cursor.Checkpoint, err error)

func (*BaseSink) Stop

func (b *BaseSink) Stop() error

type CleanFn

type CleanFn func()

type PGXSink

type PGXSink struct {
	BaseSink

	ConnStr     string
	SourceID    string
	Renice      int64
	LogReader   io.Reader
	BatchTXSize int
	// contains filtered or unexported fields
}

func (*PGXSink) Apply

func (p *PGXSink) Apply(changes chan source.Change) chan cursor.Checkpoint

func (*PGXSink) ReplicationLagMilliseconds

func (p *PGXSink) ReplicationLagMilliseconds() int64

func (*PGXSink) Setup

func (p *PGXSink) Setup() (cp cursor.Checkpoint, err error)

type PulsarSink

type PulsarSink struct {
	BaseSink

	PulsarOption pulsar.ClientOptions
	PulsarTopic  string
	// For overriding the cluster list to be replicated to
	ReplicatedClusters []string

	SetupTracker SetupTracker
	// contains filtered or unexported fields
}

func (*PulsarSink) Apply

func (p *PulsarSink) Apply(changes chan source.Change) chan cursor.Checkpoint

func (*PulsarSink) Setup

func (p *PulsarSink) Setup() (cp cursor.Checkpoint, err error)

type SetupTracker

type SetupTracker func(client pulsar.Client, topic string) (cursor.Tracker, error)

type Sink

type Sink interface {
	Setup() (cp cursor.Checkpoint, err error)
	Apply(changes chan source.Change) (committed chan cursor.Checkpoint)
	Error() error
	Stop() error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL