Documentation ¶
Functions ¶
func InitBinlogSyncer ¶
func InitBinlogSyncer() (err error)
func InitTxInfoSyncer ¶
func InitTxInfoSyncer() (err error)
Types ¶
type BinlogSynchronizer ¶
type BinlogSynchronizer struct {
// contains filtered or unexported fields
}
BinlogSynchronizer streams data from river through the broker into ClickHouse.
var (
BinlogSyncer *BinlogSynchronizer
)
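The river, broker, ClickHouse flow described above can be pictured as a producer and a consumer joined by a channel. The following is only a minimal sketch under stated assumptions: `Row`, `Sink`, `memSink`, `produce`, and `consume` are hypothetical names invented for illustration, a Go channel stands in for the Kafka broker, and an in-memory slice stands in for ClickHouse. It does not reflect the package's unexported implementation.

```go
package main

import "fmt"

// Row is a hypothetical stand-in for one binlog row change emitted by river.
type Row struct {
	Table  string
	Action string
}

// Sink is a hypothetical abstraction over the ClickHouse writer.
type Sink interface {
	Write(row Row) error
}

// memSink collects rows in memory; it exists only for this example.
type memSink struct {
	rows []Row
}

func (m *memSink) Write(row Row) error {
	m.rows = append(m.rows, row)
	return nil
}

// produce plays the role of river: it pushes rows into the broker channel
// and closes the channel once every row has been sent.
func produce(rows []Row, broker chan<- Row) {
	for _, r := range rows {
		broker <- r
	}
	close(broker)
}

// consume plays the role of the synchronizer: it drains the broker channel
// and writes each row to the sink, stopping at the first write error.
func consume(broker <-chan Row, sink Sink) error {
	for r := range broker {
		if err := sink.Write(r); err != nil {
			return fmt.Errorf("write %s/%s: %w", r.Table, r.Action, err)
		}
	}
	return nil
}

func main() {
	broker := make(chan Row)
	sink := &memSink{}

	go produce([]Row{
		{Table: "orders", Action: "insert"},
		{Table: "orders", Action: "update"},
	}, broker)

	if err := consume(broker, sink); err != nil {
		fmt.Println("sync failed:", err)
		return
	}
	fmt.Println("rows written:", len(sink.rows))
}
```

Closing the channel in the producer is what lets the consumer loop terminate; a long-running synchronizer would more likely run until its context is cancelled.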
func NewBinlogSyncer ¶
func NewBinlogSyncer(river *river.River, broker *broker.BinlogKafkaBroker) *BinlogSynchronizer
func (*BinlogSynchronizer) Sync ¶
func (s *BinlogSynchronizer) Sync()
type TxInfoSynchronizer ¶
type TxInfoSynchronizer struct {
	*broker.TxKafkaBroker
	// contains filtered or unexported fields
}
var (
TxInfoSyncer *TxInfoSynchronizer
)
func NewTxInfoSyncer ¶
func NewTxInfoSyncer(broker *broker.TxKafkaBroker) *TxInfoSynchronizer
func (*TxInfoSynchronizer) HandleAuditLog ¶
func (s *TxInfoSynchronizer) HandleAuditLog(fn func(txEvent *types.AuditLog) error)
func (*TxInfoSynchronizer) Sync ¶
func (s *TxInfoSynchronizer) Sync()
Sync reads txInfo data from Kafka, fetches the matching binlogEvent from ClickHouse by GTID, combines the two and sends the result into auditChan, and finally stores the txInfo in ClickHouse.
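The steps Sync performs (read a txInfo, look up its binlog events by GTID, emit the combined record, persist the txInfo) can be sketched as below. This is an illustration only, not the package's code: `TxInfo`, `BinlogEvent`, `AuditLog`, `EventStore`, `memStore`, and `syncTxInfos` are hypothetical names, a channel stands in for the Kafka consumer, and an in-memory map stands in for ClickHouse.

```go
package main

import (
	"errors"
	"fmt"
)

// TxInfo is a hypothetical stand-in for the transaction metadata read from Kafka.
type TxInfo struct {
	GTID string
	User string
}

// BinlogEvent is a hypothetical stand-in for a binlog event stored in ClickHouse.
type BinlogEvent struct {
	GTID  string
	Query string
}

// AuditLog pairs a transaction with its binlog events
// (illustrative; not the package's types.AuditLog).
type AuditLog struct {
	Tx     TxInfo
	Events []BinlogEvent
}

// EventStore abstracts the ClickHouse reads and writes assumed by this sketch.
type EventStore interface {
	EventsByGTID(gtid string) ([]BinlogEvent, error)
	SaveTxInfo(tx TxInfo) error
}

// memStore is an in-memory EventStore used only for this example.
type memStore struct {
	events map[string][]BinlogEvent
	saved  []TxInfo
}

func (m *memStore) EventsByGTID(gtid string) ([]BinlogEvent, error) {
	evs, ok := m.events[gtid]
	if !ok {
		return nil, errors.New("no binlog events for gtid " + gtid)
	}
	return evs, nil
}

func (m *memStore) SaveTxInfo(tx TxInfo) error {
	m.saved = append(m.saved, tx)
	return nil
}

// syncTxInfos drains txs, joins each txInfo with its events by GTID,
// sends the combined record to auditChan, then persists the txInfo.
func syncTxInfos(txs <-chan TxInfo, store EventStore, auditChan chan<- AuditLog) error {
	for tx := range txs {
		events, err := store.EventsByGTID(tx.GTID)
		if err != nil {
			return fmt.Errorf("lookup %s: %w", tx.GTID, err)
		}
		auditChan <- AuditLog{Tx: tx, Events: events}
		if err := store.SaveTxInfo(tx); err != nil {
			return fmt.Errorf("save %s: %w", tx.GTID, err)
		}
	}
	return nil
}

func main() {
	store := &memStore{events: map[string][]BinlogEvent{
		"uuid:1": {{GTID: "uuid:1", Query: "UPDATE t SET a = 1"}},
	}}
	txs := make(chan TxInfo, 1)
	auditChan := make(chan AuditLog, 1)
	txs <- TxInfo{GTID: "uuid:1", User: "alice"}
	close(txs)

	if err := syncTxInfos(txs, store, auditChan); err != nil {
		fmt.Println("sync failed:", err)
		return
	}
	entry := <-auditChan
	fmt.Println(entry.Tx.User, len(entry.Events), len(store.saved))
}
```

In this sketch a missing GTID aborts the loop; whether the real Sync retries, skips, or waits for the binlog events to arrive is not stated in the documentation.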