gobinlog

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 18, 2021 License: Apache-2.0 Imports: 13 Imported by: 0

README

gobinlog

Go Report CardGoDocBuild StatusCoverage StatusLICENSE

gobinlog将自己伪装成slave获取mysql主从复杂流来获取mysql数据库的数据变更,提供轻量级,快速的dump协议交互以及binlog的row模式下的格式解析

Features

  • 轻量级,快速的dump协议交互以及binlog的row模式格式解析
  • 支持mysql 5.6.x,5.7.x,8.0.x的所有数据类型变更
  • 支持使用完整dump协议连接数据库并接受binlog数据
  • 提供函数来接受解析后完整的事务数据
  • 事务数据提供变更的列名,列数据类型,bytes类型的数据

Requests

  • mysql 5.6+
  • golang 1.11+

Installation

第三方库管理已经托管到go mod下,请开启环境变量

Quick Start

Prepare
  • 对于自建MySQL,需要先开启Binlog写入功能,配置binlog-format为ROW模式
  • 授权examle链接MySQL账号具有作为MySQL slave的权限,如果已有账户可直接grant
Coding
  • 检查mysql的binlog格式是否是row模式,并且获取一个正确的binlog位置(以文件名和位移量作定义)
  • 实现MysqlTableMapper接口,该接口是用于获取表信息的,主要是获取列属性
  • 表MysqlTable和列MysqlColumn需要实现,用于MysqlTableMapper接口
  • 生成一个RowStreamer,设置一个正确的binlog位置并使用Stream接受数据,具体可以使用sendTransaction进行具体的行为定义

See the binlogStream and documentation for more details.

Documentation

Overview

Package gobinlog 将自己伪装成slave获取mysql主从复杂流来 获取mysql数据库的数据变更,提供轻量级,快速的dump协议交互 以及binlog的row模式下的格式解析。

gobinlog使用方式非常简单,你要实现一个MysqlTableMapper

type mysqlColumnAttribute struct {
	field         string
	typ           string
}

func (m *mysqlColumnAttribute) Field() string {
	return m.field
}

func (m *mysqlColumnAttribute) IsUnSignedInt() bool {
	return strings.Contains(m.typ, mysqlUnsigned)
}

type mysqlTableInfo struct {
	name    MysqlTableName
	columns []MysqlColumn
}

func (m *mysqlTableInfo) Name() MysqlTableName {
	return m.name
}

func (m *mysqlTableInfo) Columns() []MysqlColumn {
	return m.columns
}

type exampleMysqlTableMapper struct {
	db *sql.DB
}

func (e *exampleMysqlTableMapper) MysqlTable(name MysqlTableName) (MysqlTable, error) {
	info := &mysqlTableInfo{
		name:    name,
		columns: make([]MysqlColumn, 0, 10),
	}

	query := "desc " + name.String()
	rows, err := e.db.Query(query)
	if err != nil {
		return info, fmt.Errorf("query failed query: %s, error: %v", query, err)
	}
	defer rows.Close()

	var null,key,extra string
	var columnDefault []byte
	for i := 0; rows.Next(); i++ {
		column := &mysqlColumnAttribute{}
		err = rows.Scan(&column.field, &column.typ, &null, &key, &columnDefault, &extra)
		if err != nil {
			return info, err
		}
		info.columns = append(info.columns, column)
	}
	return info, nil
}

你需要申请一个NewRowStreamer,数据库连接信息如下

user:password@tcp(ip:port)/db

其中user是mysql的用户名,password是mysql的密码,ip是mysql的ip地址, port是mysql的端口,db是mysql的数据库名,serverID要与主库不同,

s, err := gobinlog.NewStreamer(dsn, 1234, e)
if err != nil {
	_log.Errorf("NewStreamer fail. err: %v", err)
	return
}

SetBinlogPosition的参数可以通过SHOW MASTER STATUS获取,通过这个函数 可以设置同步起始位置

s.SetBinlogPosition(pos)

通过开启Stream,可以在SendTransactionFun用于处理事务信息函数,如打印事务信息

err = s.Stream(ctx, func(t *Transaction) error {
	fmt.Printf("%v", *t)
	return nil
})

如果有需要,你可以通过Error获取Stream过程中的错误

err = s.Error()

当然你如果需要详细的调试信息,你可以通过SetLogger函数设置对应的调试接口

gobinlog.SetLogger(NewDefaultLogger(os.Stdout, DebugLevel))

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func SetLogger

func SetLogger(logger log.Logger)

SetLogger 设置一个符合Logger日志来打印binlog包的调试信息

Types

type ColumnData

type ColumnData struct {
	Filed   string     // 字段信息
	Type    ColumnType // binlog中的列类型
	IsEmpty bool       // data is empty,即该列没有变化
	Data    []byte     // the data
}

ColumnData 单个列的信息

func (*ColumnData) MarshalJSON

func (c *ColumnData) MarshalJSON() ([]byte, error)

MarshalJSON 实现ColumnData的json序列化

type ColumnType

type ColumnType int

ColumnType 从binlog中获取的列类型

func (ColumnType) IsBit

func (c ColumnType) IsBit() bool

IsBit 是否是bit

func (ColumnType) IsBlob

func (c ColumnType) IsBlob() bool

IsBlob 是否是二进制

func (ColumnType) IsDate

func (c ColumnType) IsDate() bool

IsDate 是否是日期

func (ColumnType) IsDateTime

func (c ColumnType) IsDateTime() bool

IsDateTime 是否是日期时间

func (ColumnType) IsDecimal

func (c ColumnType) IsDecimal() bool

IsDecimal 是否是精确实数

func (ColumnType) IsFloat

func (c ColumnType) IsFloat() bool

IsFloat 是否是实数

func (ColumnType) IsGeometry

func (c ColumnType) IsGeometry() bool

IsGeometry 是否是几何

func (ColumnType) IsInteger

func (c ColumnType) IsInteger() bool

IsInteger 是否是整形

func (ColumnType) IsString

func (c ColumnType) IsString() bool

IsString 是否是字符串

func (ColumnType) IsTime

func (c ColumnType) IsTime() bool

IsTime 是否是时间

func (ColumnType) IsTimestamp

func (c ColumnType) IsTimestamp() bool

IsTimestamp 是否是时间戳

func (ColumnType) String

func (c ColumnType) String() string

String 打印

type Error

type Error struct {
	// contains filtered or unexported fields
}

Error gobinlog的错误

func (*Error) Error

func (e *Error) Error() string

Error 获取详细错误信息

func (*Error) Original

func (e *Error) Original() error

Original 获取不带msg的原本的错误

type FormatType

type FormatType string

FormatType binlog格式类型

func (FormatType) IsMixed

func (f FormatType) IsMixed() bool

IsMixed 是否是混合binlog格式类型

func (FormatType) IsRow

func (f FormatType) IsRow() bool

IsRow 是否是列binlog格式类型

func (FormatType) IsStatement

func (f FormatType) IsStatement() bool

IsStatement 是否是语句binlog格式类型

type MysqlColumn

type MysqlColumn interface {
	Field() string       //列字段名
	IsUnSignedInt() bool //是否是无符号整形
}

MysqlColumn 用于实现mysql表列的接口

type MysqlTable

type MysqlTable interface {
	Name() MysqlTableName   //表名
	Columns() []MysqlColumn //所有列
}

MysqlTable 用于实现mysql表的接口

type MysqlTableMapper

type MysqlTableMapper interface {
	MysqlTable(name MysqlTableName) (MysqlTable, error)
}

MysqlTableMapper 用于获取表信息的接口

type MysqlTableName

type MysqlTableName struct {
	DbName    string `json:"db"`    //数据库名
	TableName string `json:"table"` //表名
}

MysqlTableName mysql的表名

func NewMysqlTableName

func NewMysqlTableName(database, table string) MysqlTableName

NewMysqlTableName 创建MysqlTableName

func (*MysqlTableName) String

func (m *MysqlTableName) String() string

String 打印

type Position

type Position struct {
	Filename string `json:"filename"` //binlog文件名
	Offset   int64  `json:"offset"`   //在binlog文件中的位移
}

Position 指定binlog的位置,以文件名和位移

func (Position) IsZero

func (p Position) IsZero() bool

IsZero means Position is existed

type RowData

type RowData struct {
	Columns []*ColumnData
}

RowData 行数据

type SendTransactionFunc

type SendTransactionFunc func(*Transaction) error

SendTransactionFunc 处理事务信息函数,你可以将一个chan注册到这个函数中如

  func getTransaction(tran *Transaction) error{
	     Transactions <- tran
	     return nil
  }

如果这个函数返回错误,那么RowStreamer.Stream会停止dump以及解析binlog且返回错误

type StatementType

type StatementType int

StatementType means the sql statement type

const (
	StatementUnknown  StatementType = iota //不知道的语句
	StatementBegin                         //开始语句
	StatementCommit                        //提交语句
	StatementRollback                      //回滚语句
	StatementInsert                        //插入语句
	StatementUpdate                        //更新语句
	StatementDelete                        //删除语句
	StatementCreate                        //创建表语句
	StatementAlter                         //改变表属性语句
	StatementDrop                          //删除表语句
	StatementTruncate                      //截取表语句
	StatementRename                        //重命名表语句
	StatementSet                           //设置属性语句
)

sql语句类型

func GetStatementCategory

func GetStatementCategory(sql string) StatementType

GetStatementCategory we can get statement type from a SQL

func (StatementType) IsDDL

func (s StatementType) IsDDL() bool

IsDDL 是否是数据定义语句

func (StatementType) String

func (s StatementType) String() string

String 表语句类型的信息

type StreamEvent

type StreamEvent struct {
	Type          StatementType     //语句类型
	Table         MysqlTableName    //表名
	Query         replication.Query //sql
	Timestamp     int64             //执行时间
	RowValues     []*RowData        //which data come to used for StatementInsert and  StatementUpdate
	RowIdentifies []*RowData        //which data come from used for  StatementUpdate and StatementDelete
}

StreamEvent means a SQL or a rows in binlog

func (*StreamEvent) MarshalJSON

func (s *StreamEvent) MarshalJSON() ([]byte, error)

MarshalJSON 实现StreamEvent的json序列化

type Streamer

type Streamer struct {
	// contains filtered or unexported fields
}

Streamer 从github.com/youtube/vitess/go/vt/binlog/binlog_streamer.go的基础上移植过来 专门用来RowStreamer解析row模式的binlog event,将其变为对应的事务

func NewStreamer

func NewStreamer(dsn string, serverID uint32,
	tableMapper MysqlTableMapper) (*Streamer, error)

NewStreamer dsn是mysql数据库的信息,serverID是标识该数据库的信息

func (*Streamer) Error

func (s *Streamer) Error() error

Error 每次使用Stream后需要检测Error

func (*Streamer) SetBinlogPosition

func (s *Streamer) SetBinlogPosition(startPos Position)

SetBinlogPosition 设置开始的binlog位置

func (*Streamer) Stream

func (s *Streamer) Stream(ctx context.Context, sendTransaction SendTransactionFunc) error

Stream 注册一个处理事务信息函数到Stream中

type Transaction

type Transaction struct {
	NowPosition  Position       //在binlog中的当前位置
	NextPosition Position       //在binlog中的下一个位置
	Timestamp    int64          //执行时间
	Events       []*StreamEvent //一组有事务的binlog evnet
}

Transaction 代表一组有事务的binlog evnet

func (*Transaction) MarshalJSON

func (t *Transaction) MarshalJSON() ([]byte, error)

MarshalJSON 实现Transaction的json序列化

Directories

Path Synopsis
cmd
Package replication 用于将binlog解析成可视的数据或者sql语句 是从github.com/youtube/vitess/go/mysql的基础上移植过来,其 主要功能如下:1.完全支持mysql 5.6.x的所有数据格式解析,2.支持 5.7.x的绝大多数数据格式解析,仅仅不支持JSON数据。
Package replication 用于将binlog解析成可视的数据或者sql语句 是从github.com/youtube/vitess/go/mysql的基础上移植过来,其 主要功能如下:1.完全支持mysql 5.6.x的所有数据格式解析,2.支持 5.7.x的绝大多数数据格式解析,仅仅不支持JSON数据。

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL