Documentation ¶
Overview ¶
Package binlog 将自己伪装成slave,通过获取mysql主从复制流来获取mysql数据库的数据变更,提供轻量级、快速的dump协议交互以及binlog的row模式下的格式解析。使用方式较为简单,首先你要实现一个MysqlTableMapper:
type mysqlColumnAttribute struct { field string typ string } func (m *mysqlColumnAttribute) Field() string { return m.field } func (m *mysqlColumnAttribute) IsUnSignedInt() bool { return strings.Contains(m.typ, mysqlUnsigned) } type mysqlTableInfo struct { name MysqlTableName columns []MysqlColumn } func (m *mysqlTableInfo) Name() MysqlTableName { return m.name } func (m *mysqlTableInfo) Columns() []MysqlColumn { return m.columns } type exampleMysqlTableMapper struct { db *sql.DB } func (e *exampleMysqlTableMapper) MysqlTable(name MysqlTableName) (MysqlTable, error) { info := &mysqlTableInfo{ name: name, columns: make([]MysqlColumn, 0, 10), } query := "desc " + name.String() rows, err := e.db.Query(query) if err != nil { return info, fmt.Errorf("query failed query: %s, error: %v", query, err) } defer rows.Close() var null,key,extra string var columnDefault []byte for i := 0; rows.Next(); i++ { column := &mysqlColumnAttribute{} err = rows.Scan(&column.field, &column.typ, &null, &key, &columnDefault, &extra) if err != nil { return info, err } info.columns = append(info.columns, column) } return info, nil }
再通过NewRowStreamer创建一个RowStreamer,数据库连接信息的格式为user:password@tcp(ip:port)/db,其中user是mysql的用户名,password是mysql的密码,ip是mysql的ip地址,port是mysql的端口,db是mysql的数据库名。serverID要与主库不同,SetStartBinlogPosition的参数可以通过SHOW MASTER STATUS获取:
dsn := "example:example@tcp(localhost:3306)/mysql" r, err := NewRowStreamer(dsn, 1234, e) if err != nil { fmt.Printf("NewRowStreamer fail. err: %v", err) return } r.SetStartBinlogPosition(pos)
然后开启Stream,可以在SendTransactionFunc(用于处理事务信息的函数)中处理事务信息,如打印事务信息:
ctx := context.Background() err = r.Stream(ctx, func(t *Transaction) error { fmt.Printf("%v", *t) return nil })
最后可以通过ctx的cancel结束本binlog流的同步
Index ¶
- Constants
- Variables
- func SetLogger(logger Logger)
- type ColumnData
- type ColumnType
- func (c ColumnType) IsBit() bool
- func (c ColumnType) IsBlob() bool
- func (c ColumnType) IsDate() bool
- func (c ColumnType) IsDateTime() bool
- func (c ColumnType) IsDecimal() bool
- func (c ColumnType) IsFloat() bool
- func (c ColumnType) IsGeometry() bool
- func (c ColumnType) IsInteger() bool
- func (c ColumnType) IsString() bool
- func (c ColumnType) IsTime() bool
- func (c ColumnType) IsTimestamp() bool
- func (c ColumnType) String() string
- type FormatType
- type LogLevel
- type Logger
- type MysqlColumn
- type MysqlTable
- type MysqlTableMapper
- type MysqlTableName
- type Position
- type RowData
- type RowStreamer
- type SendTransactionFunc
- type StatementType
- type StreamEvent
- type Transaction
Examples ¶
Constants ¶
const ( ColumnTypeDecimal = replication.TypeDecimal //精确实数 ColumnTypeTiny = replication.TypeTiny //int8 ColumnTypeShort = replication.TypeShort //int16 ColumnTypeLong = replication.TypeLong //int32 ColumnTypeFloat = replication.TypeFloat //float32 ColumnTypeDouble = replication.TypeDouble //float64 ColumnTypeNull = replication.TypeNull //null ColumnTypeTimestamp = replication.TypeTimestamp //时间戳 ColumnTypeLongLong = replication.TypeLongLong //int64 ColumnTypeInt24 = replication.TypeInt24 //int24 ColumnTypeDate = replication.TypeDate //日期 ColumnTypeTime = replication.TypeTime //时间 ColumnTypeDateTime = replication.TypeDateTime //日期时间 ColumnTypeYear = replication.TypeYear //year ColumnTypeNewDate = replication.TypeNewDate //日期 ColumnTypeVarchar = replication.TypeVarchar //可变字符串 ColumnTypeBit = replication.TypeBit //bit ColumnTypeTimestamp2 = replication.TypeTimestamp2 //时间戳 ColumnTypeDateTime2 = replication.TypeDateTime2 //日期时间 ColumnTypeTime2 = replication.TypeTime2 //时间 ColumnTypeJSON = replication.TypeJSON //json ColumnTypeNewDecimal = replication.TypeNewDecimal //精确实数 ColumnTypeEnum = replication.TypeEnum //枚举 ColumnTypeSet = replication.TypeSet //字符串 ColumnTypeTinyBlob = replication.TypeTinyBlob //小型二进制 ColumnTypeMediumBlob = replication.TypeMediumBlob //中型二进制 ColumnTypeLongBlob = replication.TypeLongBlob //长型二进制 ColumnTypeBlob = replication.TypeBlob //长型二进制 ColumnTypeVarString = replication.TypeVarString //可变字符串 ColumnTypeString = replication.TypeString //字符串 ColumnTypeGeometry = replication.TypeGeometry //几何 )
列数据类型
Variables ¶
var ( FormatTypeRow = FormatType("ROW") //列 FormatTypeMixed = FormatType("MIXED") //混合 FormatTypeStatement = FormatType("STATEMENT") //语句 )
binlog格式类型
var (
ErrStreamEOF = errors.New("stream reached EOF") //信息流到达EOF
)
信息流到达EOF的错误信息,用于标识binlog流结束
Functions ¶
Types ¶
type ColumnData ¶
type ColumnData struct { Filed string // 字段信息 Type ColumnType // binlog中的列类型 IsEmpty bool // data is empty,即该列没有变化 Data []byte // the data }
ColumnData 单个列的信息
func NewColumnData ¶
func NewColumnData(filed string, typ ColumnType, isEmpty bool) *ColumnData
NewColumnData 创建ColumnData
func (*ColumnData) MarshalJSON ¶
func (c *ColumnData) MarshalJSON() ([]byte, error)
MarshalJSON 实现ColumnData的json序列化
type Logger ¶
type Logger interface { Errorf(string, ...interface{}) //错误日志打印 Infof(string, ...interface{}) //进程日志打印 Debugf(string, ...interface{}) //调试日志打印 Print(args ...interface{}) //打印dump包的错误日志 }
Logger 用于打印binlog包的调试日志
type MysqlColumn ¶
MysqlColumn 用于实现mysql表列的接口
type MysqlTable ¶
type MysqlTable interface { Name() MysqlTableName //表名 Columns() []MysqlColumn //所有列 }
MysqlTable 用于实现mysql表的接口
type MysqlTableMapper ¶
type MysqlTableMapper interface {
MysqlTable(name MysqlTableName) (MysqlTable, error)
}
MysqlTableMapper 用于获取表信息的接口
type MysqlTableName ¶
type MysqlTableName struct { DbName string `json:"db"` //数据库名 TableName string `json:"table"` //表名 }
MysqlTableName mysql的表名
func NewMysqlTableName ¶
func NewMysqlTableName(database, table string) MysqlTableName
NewMysqlTableName 创建MysqlTableName
type Position ¶
type Position struct { Filename string `json:"filename"` //binlog文件名 Offset int64 `json:"offset"` //在binlog文件中的位移 }
Position 以文件名和位移指定binlog的位置
type RowStreamer ¶
type RowStreamer struct {
// contains filtered or unexported fields
}
RowStreamer 在github.com/youtube/vitess/go/vt/binlog/binlog_streamer.go的基础上移植而来,专门用来解析row模式的binlog event,将其变为对应的事务
func NewRowStreamer ¶
func NewRowStreamer(dsn string, serverID uint32, tableMapper MysqlTableMapper) (*RowStreamer, error)
NewRowStreamer 创建RowStreamer。dsn是mysql数据库的连接信息,serverID是本程序伪装成slave时使用的标识,要与主库不同
func (*RowStreamer) SetStartBinlogPosition ¶
func (s *RowStreamer) SetStartBinlogPosition(startPos Position)
SetStartBinlogPosition 设置开始的binlog位置
func (*RowStreamer) Stream ¶
func (s *RowStreamer) Stream(ctx context.Context, sendTransaction SendTransactionFunc) error
Stream 注册一个处理事务信息函数到Stream中
Example ¶
package main import ( "context" "database/sql" "fmt" "log" "os" "os/signal" "strings" //_ "github.com/go-sql-driver/mysql" you need it in you own project ) const ( mysqlUnsigned = "unsigned" //无符号 ) // 列属性 type mysqlColumnAttribute struct { field string //列名 typ string //列类型 null string //是否为空 key string //PRI代表主键,UNI代表唯一索引 columnDefault []byte //默认值 extra string //其他备注信息 } func (m *mysqlColumnAttribute) Field() string { return m.field } func (m *mysqlColumnAttribute) IsUnSignedInt() bool { return strings.Contains(m.typ, mysqlUnsigned) } type mysqlTableInfo struct { name MysqlTableName columns []MysqlColumn } func (m *mysqlTableInfo) Name() MysqlTableName { return m.name } func (m *mysqlTableInfo) Columns() []MysqlColumn { return m.columns } type exampleMysqlTableMapper struct { db *sql.DB } func (e *exampleMysqlTableMapper) GetBinlogFormat() (format FormatType, err error) { query := "SHOW VARIABLES LIKE 'binlog_format'" var name, str string err = e.db.QueryRow(query).Scan(&name, &str) if err != nil { err = fmt.Errorf("QueryRow fail. query: %s, error: %v", query, err) return } format = FormatType(str) return } func (e *exampleMysqlTableMapper) GetBinlogPosition() (pos Position, err error) { query := "SHOW MASTER STATUS" var metaDoDb, metaIgnoreDb, executedGTidSet string err = e.db.QueryRow(query).Scan(&pos.Filename, &pos.Offset, &metaDoDb, &metaIgnoreDb, &executedGTidSet) if err != nil { err = fmt.Errorf("query fail. 
query: %s, error: %v", query, err) return } return } func (e *exampleMysqlTableMapper) MysqlTable(name MysqlTableName) (MysqlTable, error) { info := &mysqlTableInfo{ name: name, columns: make([]MysqlColumn, 0, 10), } query := "desc " + name.String() rows, err := e.db.Query(query) if err != nil { return info, fmt.Errorf("query failed query: %s, error: %v", query, err) } defer rows.Close() for i := 0; rows.Next(); i++ { column := &mysqlColumnAttribute{} err = rows.Scan(&column.field, &column.typ, &column.null, &column.key, &column.columnDefault, &column.extra) if err != nil { return info, err } info.columns = append(info.columns, column) } return info, nil } func showTransaction(t *Transaction) { b, err := t.MarshalJSON() if err != nil { lw.logger().Errorf("MarshalJSON fail. err: %v", err) return } lw.logger().Print("%v", string(b)) } func main() { SetLogger(NewDefaultLogger(os.Stdout, DebugLevel)) dsn := "example:example@tcp(localhost:3306)/mysql?charset=utf8mb4" db, err := sql.Open("mysql", dsn) if err != nil { lw.logger().Errorf("open fail. err: %v", err) return } defer db.Close() db.SetMaxIdleConns(2) db.SetMaxOpenConns(4) e := &exampleMysqlTableMapper{db: db} format, err := e.GetBinlogFormat() if err != nil { lw.logger().Errorf("getBinlogFormat fail. err: %v", err) return } if !format.IsRow() { lw.logger().Errorf("binlog format is not row. format: %v", format) return } pos, err := e.GetBinlogPosition() if err != nil { lw.logger().Errorf("GetBinlogPosition fail. err: %v", err) return } r, err := NewRowStreamer(dsn, 1234, e) if err != nil { lw.logger().Errorf("NewRowStreamer fail. err: %v", err) return } r.SetStartBinlogPosition(pos) ctx := context.Background() ctx, cancel := context.WithCancel(ctx) processWait := make(chan os.Signal, 1) signal.Notify(processWait, os.Kill, os.Interrupt) go func() { select { case <-processWait: cancel() } }() err = r.Stream(ctx, func(t *Transaction) error { showTransaction(t) return nil }) if err != nil { log.Fatalf("Stream fail. 
err: %v", err) return } }
Output:
type SendTransactionFunc ¶
type SendTransactionFunc func(*Transaction) error
SendTransactionFunc 处理事务信息的函数,你可以在这个函数中将事务发送到一个chan,例如:
func getTransaction(tran *Transaction) error{ Transactions <- tran return nil }
如果这个函数返回错误,那么RowStreamer.Stream会停止dump以及解析binlog且返回错误
type StatementType ¶
type StatementType int
StatementType means the sql statement type
const ( StatementUnknown StatementType = iota //不知道的语句 StatementBegin //开始语句 StatementCommit //提交语句 StatementRollback //回滚语句 StatementInsert //插入语句 StatementUpdate //更新语句 StatementDelete //删除语句 StatementCreate //创建表语句 StatementAlter //改变表属性语句 StatementDrop //删除表语句 StatementTruncate //截取表语句 StatementRename //重命名表语句 StatementSet //设置属性语句 )
sql语句类型
func GetStatementCategory ¶
func GetStatementCategory(sql string) StatementType
GetStatementCategory returns the statement type of the given SQL statement
type StreamEvent ¶
type StreamEvent struct { Type StatementType //语句类型 Table MysqlTableName //表名 SQL string //sql Timestamp int64 //执行时间 RowValues []*RowData //which data come to used for StatementInsert and StatementUpdate RowIdentifies []*RowData //which data come from used for StatementUpdate and StatementDelete }
StreamEvent represents a SQL statement or a set of rows in the binlog
func NewStreamEvent ¶
func NewStreamEvent(tranType StatementType, timestamp int64, table MysqlTableName) *StreamEvent
NewStreamEvent 创建StreamEvent
func (*StreamEvent) MarshalJSON ¶
func (s *StreamEvent) MarshalJSON() ([]byte, error)
MarshalJSON 实现StreamEvent的json序列化
type Transaction ¶
type Transaction struct { NowPosition Position //在binlog中的当前位置 NextPosition Position //在binlog中的下一个位置 Timestamp int64 //执行时间 Events []*StreamEvent //一组有事务的binlog evnet }
Transaction 代表一组有事务的binlog event
func NewTransaction ¶
func NewTransaction(now, next Position, timestamp int64, events []*StreamEvent) *Transaction
NewTransaction 创建Transaction
func (*Transaction) MarshalJSON ¶
func (t *Transaction) MarshalJSON() ([]byte, error)
MarshalJSON 实现Transaction的json序列化
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
dump | Package dump 用于dump协议交互的, 从github.com/go-sql-driver/mysql的基础上修改而来,主要功能如下: 1.通过MysqlConn可以执行简单的sql命令,如set命令, 2.通过MysqlConn来和mysql库进行binlog dump github.com/go-sql-driver/mysql已经支持了所有的协议包的读写,但是 由于以下原因需要修改:1.该包不支持dump协议的交互。 |
replication | Package replication 用于将binlog解析成可视的数据或者sql语句 是从github.com/youtube/vitess/go/mysql的基础上移植过来,其 主要功能如下:1.完全支持mysql 5.6.x的所有数据格式解析,2.支持 5.7.x的绝大多数数据格式解析,仅仅不支持JSON数据。 |
tests | |