Documentation ¶
Index ¶
- Constants
- type ActionData
- type ActionKind
- type Begin
- type BinaryParser
- type Column
- type Commit
- type DataType
- type Delete
- type Insert
- type Listener
- type Origin
- type Relation
- type RelationColumn
- type RelationData
- type RepositoryImpl
- func (r RepositoryImpl) Close() error
- func (r RepositoryImpl) CreatePublication(name string) error
- func (r RepositoryImpl) GetSlotLSN(slotName string) (string, error)
- func (r RepositoryImpl) IsAlive() bool
- func (r RepositoryImpl) NewStandbyStatus(walPositions ...uint64) (status *pgx.StandbyStatus, err error)
- type TupleData
- type Update
- type WalTransaction
- func (w *WalTransaction) Clear()
- func (w *WalTransaction) CreateActionData(relationID int32, oldRows []TupleData, newRows []TupleData, kind ActionKind) (a ActionData, err error)
- func (w *WalTransaction) CreateEventsWithFilter(ctx context.Context, tableMap map[string][]string) <-chan *publisher.Event
Constants ¶
const ( Int2OID = 21 Int4OID = 23 Int8OID = 20 TextOID = 25 VarcharOID = 1043 TimestampOID = 1114 TimestamptzOID = 1184 DateOID = 1082 TimeOID = 1083 JSONBOID = 3802 UUIDOID = 2950 BoolOID = 16 )
PostgreSQL OIDs https://github.com/postgres/postgres/blob/master/src/include/catalog/pg_type.dat
const ( // CommitMsgType protocol commit message type. CommitMsgType byte = 'C' // BeginMsgType protocol begin message type. BeginMsgType byte = 'B' // OriginMsgType protocol origin message type. OriginMsgType byte = 'O' // RelationMsgType protocol relation message type. RelationMsgType byte = 'R' // TypeMsgType protocol type message type. TypeMsgType byte = 'Y' // InsertMsgType protocol insert message type. InsertMsgType byte = 'I' // UpdateMsgType protocol update message type. UpdateMsgType byte = 'U' // DeleteMsgType protocol delete message type. DeleteMsgType byte = 'D' // NewTupleDataType protocol new tuple data type. NewTupleDataType byte = 'N' // TextDataType protocol text data type. TextDataType byte = 't' // NullDataType protocol NULL data type. NullDataType byte = 'n' // ToastDataType protocol toast data type. ToastDataType byte = 'u' )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ActionData ¶
type ActionData struct { Schema string Table string Kind ActionKind OldColumns []Column NewColumns []Column }
ActionData kind of WAL message data.
type ActionKind ¶
type ActionKind string
ActionKind kind of action on WAL message.
const ( ActionKindInsert ActionKind = "INSERT" ActionKindUpdate ActionKind = "UPDATE" ActionKindDelete ActionKind = "DELETE" )
kind of WAL message.
type Begin ¶
type Begin struct { // The final LSN of the transaction. LSN int64 // Commit timestamp of the transaction. Timestamp time.Time // Xid of the transaction. XID int32 }
Begin message format.
type BinaryParser ¶
type BinaryParser struct {
// contains filtered or unexported fields
}
BinaryParser represent binary protocol parser.
func NewBinaryParser ¶
func NewBinaryParser(logger *slog.Logger, byteOrder binary.ByteOrder) *BinaryParser
NewBinaryParser create instance of binary parser.
func (*BinaryParser) ParseWalMessage ¶
func (p *BinaryParser) ParseWalMessage(msg []byte, tx *WalTransaction) error
ParseWalMessage parse postgres WAL message.
type Column ¶
type Column struct {
// contains filtered or unexported fields
}
Column of the table with which changes occur.
func (*Column) AssertValue ¶
AssertValue converts bytes to a specific type depending on the type of this data in the database table.
type Commit ¶
type Commit struct { // Flags; currently unused (must be 0). Flags int8 // The LSN of the commit. LSN int64 // The end LSN of the transaction. TransactionLSN int64 // Commit timestamp of the transaction. Timestamp time.Time }
Commit message format.
type DataType ¶
type DataType struct { // ID of the data type. ID int32 // Namespace (empty string for pg_catalog). Namespace string // name of the data type. Name string }
DataType part of WAL message data.
type Delete ¶
type Delete struct { /// ID of the relation corresponding to the ID in the relation message. RelationID int32 // Identifies the following TupleData submessage as a key. KeyTuple bool // Identifies the following TupleData message as an old tuple. OldTuple bool // TupleData message part representing the contents of the old tuple or primary key. OldRow []TupleData }
Delete message format.
type Insert ¶
type Insert struct { /// ID of the relation corresponding to the ID in the relation message. RelationID int32 // Identifies the following TupleData message as a new tuple. NewTuple bool // TupleData message part representing the contents of new tuple. NewRow []TupleData }
Insert message format.
type Listener ¶
type Listener struct {
// contains filtered or unexported fields
}
Listener main service struct.
func NewWalListener ¶
func NewWalListener( cfg *config.Config, log *slog.Logger, repo repository, repl replication, pub eventPublisher, parser parser, monitor monitor, ) *Listener
NewWalListener create and initialize new service instance.
func (*Listener) AckWalMessage ¶
AckWalMessage acknowledge received wal message.
func (*Listener) SendPeriodicHeartbeats ¶
SendPeriodicHeartbeats send periodic keep alive heartbeats to the server.
func (*Listener) SendStandbyStatus ¶
SendStandbyStatus sends a `StandbyStatus` object with the current RestartLSN value to the server.
type Origin ¶
type Origin struct { // The LSN of the commit on the origin server. LSN int64 // name of the origin. Name string }
Origin message format.
type Relation ¶
type Relation struct { // ID of the relation. ID int32 // Namespace (empty string for pg_catalog). Namespace string // Relation name. Name string // Replica identity setting for the relation (same as relreplident in pg_class). Replica int8 Columns []RelationColumn }
Relation message format.
type RelationColumn ¶
type RelationColumn struct { // Flags for the column which marks the column as part of the key. Key bool // name of the column. Name string // ID of the column's data type. TypeID int32 // Type modifier of the column (atttypmod). ModifierType int32 }
RelationColumn part of WAL message data.
type RelationData ¶
RelationData kind of WAL message data.
type RepositoryImpl ¶
type RepositoryImpl struct {
// contains filtered or unexported fields
}
RepositoryImpl service repository.
func NewRepository ¶
func NewRepository(conn *pgx.Conn) *RepositoryImpl
NewRepository returns a new instance of the repository.
func (RepositoryImpl) CreatePublication ¶
func (r RepositoryImpl) CreatePublication(name string) error
CreatePublication creates a publication for all tables.
func (RepositoryImpl) GetSlotLSN ¶
func (r RepositoryImpl) GetSlotLSN(slotName string) (string, error)
GetSlotLSN returns the value of the last offset for a specific slot.
func (RepositoryImpl) IsAlive ¶
func (r RepositoryImpl) IsAlive() bool
IsAlive check database connection problems.
func (RepositoryImpl) NewStandbyStatus ¶ added in v2.5.0
func (r RepositoryImpl) NewStandbyStatus(walPositions ...uint64) (status *pgx.StandbyStatus, err error)
NewStandbyStatus returns a standby status built from the given WAL positions.
type Update ¶
type Update struct { /// ID of the relation corresponding to the ID in the relation message. RelationID int32 // Identifies the following TupleData submessage as a key. KeyTuple bool // Identifies the following TupleData message as an old tuple. OldTuple bool // TupleData message part representing the contents of the old tuple // or primary key (Only present if the previous 'O' or 'K' part is present.) OldRow []TupleData // Identifies the following TupleData message as a new tuple. NewTuple bool // TupleData message part representing the contents of new tuple. NewRow []TupleData }
Update message format.
type WalTransaction ¶
type WalTransaction struct { LSN int64 BeginTime *time.Time CommitTime *time.Time RelationStore map[int32]RelationData Actions []ActionData // contains filtered or unexported fields }
WalTransaction transaction specified WAL message.
func NewWalTransaction ¶
func NewWalTransaction(log *slog.Logger, pool *sync.Pool, monitor transactionMonitor) *WalTransaction
NewWalTransaction create and initialize new WAL transaction.
func (*WalTransaction) CreateActionData ¶
func (w *WalTransaction) CreateActionData( relationID int32, oldRows []TupleData, newRows []TupleData, kind ActionKind, ) (a ActionData, err error)
CreateActionData create action from WAL message data.
func (*WalTransaction) CreateEventsWithFilter ¶
func (w *WalTransaction) CreateEventsWithFilter(ctx context.Context, tableMap map[string][]string) <-chan *publisher.Event
CreateEventsWithFilter filter WAL message by table, action and create events for each value.