cdc

package module
v0.0.0-...-fa04c6a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 19, 2021 License: MIT Imports: 12 Imported by: 0

README

postgresql-cdc

postgresql-cdc is a simple Golang library that allows you to stream postgresql changes over a configurable streaming interface. The library comes with a ChannelStream that allows you to listen to postgres changes over a golanng channel.

Documentation

Index

Constants

View Source
const (
	DEBUG = iota
	INFO
	WARNING
	ERROR
	PANIC
	FATAL
	NEVER_LOG
)
View Source
const MsgTimestampFormat = "2006-01-02 15:04:05.999999-07"

Variables

View Source
var LEVEL_TO_STR = map[int]string{
	DEBUG:     "debug",
	INFO:      "info",
	WARNING:   "warning",
	ERROR:     "error",
	PANIC:     "panic",
	FATAL:     "fatal",
	NEVER_LOG: "never_log",
}
View Source
var STR_TO_LEVEL = map[string]int{
	"debug":   DEBUG,
	"info":    INFO,
	"warning": WARNING,
	"warn":    WARNING,
	"error":   ERROR,
	"panic":   PANIC,
	"fatal":   FATAL,
}

Functions

func LevelToString

func LevelToString(level int) string

func OnMessageProcessed

func OnMessageProcessed(
	ctx context.Context,
	name string,
	walMessage *Wal2JsonMessage,
) error

func ParseLevel

func ParseLevel(level string) int

func StartSession

func StartSession(
	ctx context.Context,
	loggr Logger,
	dsn string,
	identifier string,
	connector CDCStreamer,
	startLSN pglogrepl.LSN,
	busyWaitInterval time.Duration,
	keepAlive time.Duration) (string, error)

Types

type CDCStreamer

type CDCStreamer interface {
	Send(walMessage *Wal2JsonMessage)
	Receive() (*Wal2JsonMessage, error)
	Close() error
}

type ChannelStream

type ChannelStream struct {
	// contains filtered or unexported fields
}

func NewChannelStream

func NewChannelStream() *ChannelStream

func (*ChannelStream) Close

func (stream *ChannelStream) Close() error

func (*ChannelStream) Receive

func (stream *ChannelStream) Receive() (*Wal2JsonMessage, error)

func (*ChannelStream) Send

func (stream *ChannelStream) Send(walMessage *Wal2JsonMessage)

type Logger

type Logger interface {
	Debug(...interface{})
	Info(...interface{})
	Warn(...interface{})
	Error(...interface{})
	Fatal(...interface{})
	Panic(...interface{})
	SetLevel(int)
	Debugf(string, ...interface{})
	Infof(string, ...interface{})
	Warnf(string, ...interface{})
	Errorf(string, ...interface{})
	Fatalf(string, ...interface{})
	Panicf(string, ...interface{})
}

type Wal2JsonChange

type Wal2JsonChange struct {
	Kind         string          `json:"kind"`
	Schema       string          `json:"schema"`
	Table        string          `json:"table"`
	ColumnNames  []string        `json:"columnnames"`
	ColumnTypes  []string        `json:"columntypes"`
	ColumnValues []interface{}   `json:"columnvalues"`
	OldKeys      Wal2JsonOldKeys `json:"oldkeys"`
}

Wal2JsonChange defines children of root documents

type Wal2JsonMessage

type Wal2JsonMessage struct {
	Change    []Wal2JsonChange `json:"change"`
	Timestamp time.Time        `json:"timestamp"`
	NextLSN   pglogrepl.LSN    `json:"nextlsn"`
	WALStart  pglogrepl.LSN    `json:"-"`
}

func (*Wal2JsonMessage) UnmarshalJSON

func (w *Wal2JsonMessage) UnmarshalJSON(data []byte) error

type Wal2JsonOldKeys

type Wal2JsonOldKeys struct {
	KeyNames  []string      `json:"keynames"`
	KeyTypes  []string      `json:"keytypes"`
	KeyValues []interface{} `json:"keyvalues"`
}

Wal2JsonOldKeys defines children of OldKeys

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL