model

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 21, 2022 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
// Identifier strings for the supported upstream source kinds.
// NOTE(review): presumably these are the values reported by GetCategory on
// the corresponding types — confirm against the implementation.
const (
	// DBZUMMYSQL is the identifier for the Debezium MySQL source.
	DBZUMMYSQL = "debezium-mysql"
	// DBZUMMONGO is the identifier for the Debezium MongoDB source.
	DBZUMMONGO = "debezium-mongodb"
	// CANALMYSQL is the identifier for the canal MySQL source.
	CANALMYSQL = "canal-mysql"
	// CONNMONGO is the identifier for the MongoDB connector source.
	CONNMONGO  = "connector-mongodb"
)
View Source
// Supported data types, numbered consecutively from 0 via iota.
// NOTE(review): these constants are untyped here; they are used as keys of a
// map[DataType]interface{} (NullValMap), so they convert to DataType at use.
const (
	// DataTypeString marks string values (0).
	DataTypeString = iota
	// DataTypeInt marks integer values (1).
	DataTypeInt
	// DataTypeFloat marks floating-point values (2).
	DataTypeFloat
	// DataTypeTime marks time values (3).
	DataTypeTime
)

Supported data types.

View Source
// Time layout strings written in Go's reference-time format.
const (
	// SQLDateCanalTimeLayout is a date-time layout with a space separator
	// and second precision.
	SQLDateCanalTimeLayout = "2006-01-02 15:04:05"
	// MySQLTimeLayout is a date-time layout with a "T" separator and a
	// trailing "Z".
	MySQLTimeLayout        = "2006-01-02T15:04:05Z"
	// DateTimeLayout is, despite its name, a date-only layout
	// (year-month-day, no time of day).
	DateTimeLayout         = "2006-01-02"
)

Time layout strings (Go reference-time format) for date and date-time values.

Variables

View Source
var (
	// ErrAction signals wrong type information (translated from the original comment).
	// NOTE(review): the message "not except action" looks like a typo for
	// "not expected action" — confirm no caller matches on the text before changing it.
	ErrAction = errors.New("not except action")
	// ErrEmptyPayload is the error whose message is "payload is empty".
	ErrEmptyPayload = errors.New("payload is empty")
)
View Source
// NullValMap records the default value for each data type: the empty string,
// integer 0, float 0.0, and the Unix epoch respectively.
var NullValMap = map[DataType]interface{}{
	DataTypeString: "",
	DataTypeInt:    0,
	DataTypeFloat:  0.0,
	// time.Unix(0, 0) is the Unix epoch (0 seconds, 0 nanoseconds).
	DataTypeTime:   time.Unix(0, 0),
}

NullValMap 记录该类型的默认值

Functions

func FormatDate

func FormatDate(timeStr string) time.Time

func ParseSQLValueByType

func ParseSQLValueByType(typ DataType, str string) (interface{}, error)

ParseSQLValueByType 按类型 typ 解析字符串 str,返回解析后的值;解析失败时返回 error

Types

type CanalMysql

// CanalMysql is the message type for the canal MySQL source.
type CanalMysql struct {
	// Before is decoded from the JSON key "old".
	Before []ValueMap              `json:"old"`
	// Cache holds the raw messages found under the JSON key "data".
	Cache  []originJson.RawMessage `json:"data"`
	// After carries no JSON tag.
	// NOTE(review): presumably populated from Cache during parsing — confirm.
	After  []ValueMap
	// Op is decoded from the JSON key "type".
	Op     string `json:"type"`
	// contains filtered or unexported fields
}

CanalMysql 新的类型

func (*CanalMysql) GetCacheMap

func (mysql *CanalMysql) GetCacheMap() *ValueMap

GetCacheMap 获取解析好的map数据

func (*CanalMysql) GetCategory

func (mysql *CanalMysql) GetCategory() string

GetCategory 获取类型,是mysql还是mongo

func (*CanalMysql) GetExistsKeys

func (mysql *CanalMysql) GetExistsKeys() []int8

GetExistsKeys 返回用于标记该字段是否存在,主要用于Mongo更新, 0 不存在,1 存在

func (*CanalMysql) GetOp

func (mysql *CanalMysql) GetOp() string

GetOp 获取 opLog/binLog 的操作类型

func (*CanalMysql) GetValues

func (mysql *CanalMysql) GetValues() []interface{}

GetValues 用于获取 准备好插入db的[]interface{}

func (*CanalMysql) ParseToMap

func (mysql *CanalMysql) ParseToMap(table *SQLTable) (ValueMap, error)

ParseToMap 将json解析成map

func (*CanalMysql) SetCacheMap

func (mysql *CanalMysql) SetCacheMap(m *ValueMap)

SetCacheMap 将json解析成map并放进来

func (*CanalMysql) SetExistsKeys

func (mysql *CanalMysql) SetExistsKeys(val []int8)

SetExistsKeys 设置用于标记该字段是否存在的标志,主要用于Mongo更新

func (*CanalMysql) SetOp

func (mysql *CanalMysql) SetOp(s string)

SetOp 写入opLog/binLog 的类型

func (*CanalMysql) SetValues

func (mysql *CanalMysql) SetValues(val []interface{})

SetValues 存放将插入db的 []interface{}

func (*CanalMysql) UnmarshalFromByte

func (mysql *CanalMysql) UnmarshalFromByte(b []byte, pool *MapPool) error

UnmarshalFromByte unmarshals the message from a byte slice.

func (*CanalMysql) UnmarshalFromStr

func (mysql *CanalMysql) UnmarshalFromStr(str string, pool *MapPool) error

UnmarshalFromStr unmarshals the message from a string.

func (*CanalMysql) Unpack

func (mysql *CanalMysql) Unpack() []DataInterface

Unpack 展开,防止 canal 用 batch 方式提交

type ConnectorMongo

// ConnectorMongo is the message type for the MongoDB connector source.
type ConnectorMongo struct {
	// Payload is decoded from the JSON key "fullDocument".
	Payload           ValueMap `json:"fullDocument"`
	// DocumentKey is decoded from the JSON key "documentKey".
	DocumentKey       ValueMap `json:"documentKey"`
	// UpdateDescription is decoded from the JSON key "updateDescription".
	UpdateDescription struct {
		// UpdatedFields is decoded from the JSON key "updatedFields".
		UpdatedFields ValueMap `json:"updatedFields"`
		// RemovedFields is decoded from the JSON key "removedFields".
		RemovedFields []string `json:"removedFields"`
	} `json:"updateDescription"`

	// OperationType is decoded from the JSON key "operationType".
	OperationType string `json:"operationType"`

	// FullDocument carries no JSON tag, while Payload above is tagged "fullDocument".
	// NOTE(review): confirm which of the two is meant to receive the document.
	FullDocument ValueMap
	// contains filtered or unexported fields
}

ConnectorMongo mongo type of connector

func (*ConnectorMongo) GetCacheMap

func (mongo *ConnectorMongo) GetCacheMap() *ValueMap

GetCacheMap 获取解析好的map数据

func (*ConnectorMongo) GetCategory

func (mongo *ConnectorMongo) GetCategory() string

GetCategory 获取类型,是mysql还是mongo

func (*ConnectorMongo) GetExistsKeys

func (mongo *ConnectorMongo) GetExistsKeys() []int8

GetExistsKeys 返回用于标记该字段是否存在,主要用于Mongo更新, 0 不存在,1 存在

func (*ConnectorMongo) GetOp

func (mongo *ConnectorMongo) GetOp() string

GetOp 获取操作类型

func (*ConnectorMongo) GetValues

func (mongo *ConnectorMongo) GetValues() []interface{}

GetValues 用于获取 准备好插入db的[]interface{}

func (*ConnectorMongo) ParseToMap

func (mongo *ConnectorMongo) ParseToMap(table *SQLTable) (ValueMap, error)

ParseToMap 将json解析成map

func (*ConnectorMongo) SetCacheMap

func (mongo *ConnectorMongo) SetCacheMap(m *ValueMap)

SetCacheMap 将json解析成map并放进来

func (*ConnectorMongo) SetExistsKeys

func (mongo *ConnectorMongo) SetExistsKeys(val []int8)

SetExistsKeys 设置用于标记该字段是否存在的标志,主要用于Mongo更新

func (*ConnectorMongo) SetOp

func (mongo *ConnectorMongo) SetOp(s string)

SetOp 写入opLog/binLog 的类型

func (*ConnectorMongo) SetValues

func (mongo *ConnectorMongo) SetValues(val []interface{})

SetValues 存放将插入db的 []interface{}

func (*ConnectorMongo) UnmarshalFromByte

func (mongo *ConnectorMongo) UnmarshalFromByte(b []byte, mp *MapPool) error

UnmarshalFromByte unmarshals the message from a byte slice.

func (*ConnectorMongo) UnmarshalFromStr

func (mongo *ConnectorMongo) UnmarshalFromStr(str string, mp *MapPool) error

UnmarshalFromStr unmarshals the message from a string.

func (*ConnectorMongo) Unpack

func (mongo *ConnectorMongo) Unpack() []DataInterface

Unpack returns the message as a slice of DataInterface values.

type Data

// Data is one record of relational data.
type Data struct {
	// Keys holds the key names of the record.
	Keys                  []string
	Values                []interface{} // a slice is used instead of a map because a slice performs better when there is little data
	// CheckPoint and Operation are plain string fields.
	// NOTE(review): their exact meaning is not visible here — confirm with the callers.
	CheckPoint, Operation string
}

Data 关系型数据的一条记录

func NewData

func NewData() *Data

NewData 新建Data

func (*Data) Exists

func (s *Data) Exists(k string) bool

Exists 判断键是否存在

func (*Data) Get

func (s *Data) Get(k string) interface{}

Get 获取键值

func (*Data) GetValues

func (s *Data) GetValues(keys []string) []interface{}

GetValues 获取指定键的值

func (*Data) Set

func (s *Data) Set(k string, v interface{})

Set 设置键值

func (*Data) String

func (s *Data) String() string

String 转换成string,方便打印

type DataInterface

// DataInterface is the common wrapper over the mysql and mongo message types.
type DataInterface interface {
	// GetCategory returns the category, i.e. whether the source is mysql or mongo.
	GetCategory() string
	// GetOp returns the opLog/binLog operation type.
	GetOp() string
	// SetOp stores the opLog/binLog operation type.
	SetOp(s string)

	// SetValues stores the []interface{} that will be inserted into the db.
	SetValues(val []interface{})
	// GetValues returns the []interface{} prepared for insertion into the db.
	GetValues() []interface{}
	// ParseToMap parses the JSON into a map.
	ParseToMap(table *SQLTable) (ValueMap, error)
	// SetExistsKeys stores the per-field existence flags (mainly used for Mongo updates).
	SetExistsKeys([]int8)
	// GetExistsKeys returns the per-field existence flags: 0 means absent, 1 means present.
	GetExistsKeys() []int8
	// SetCacheMap stores the map parsed from the JSON.
	SetCacheMap(m *ValueMap)
	// GetCacheMap returns the already-parsed map data.
	GetCacheMap() *ValueMap

	// UnmarshalFromStr unmarshals the message from a string.
	UnmarshalFromStr(str string, mappool *MapPool) error
	// UnmarshalFromByte unmarshals the message from a byte slice.
	UnmarshalFromByte(b []byte, mappool *MapPool) error

	// Unpack returns the message as a slice of DataInterface values.
	Unpack() []DataInterface
}

DataInterface mysql 和 mongo 的封装

type DataType

// DataType is the data type used for conversions inside Go.
type DataType int

DataType 用于Go内部转换的数据类型

func ParseTypeByCkType

func ParseTypeByCkType(ckType string) DataType

ParseTypeByCkType 将ClickHouse的数据类型转换为Go语言内部转换用的DataType

func ParseTypeByMysqlType

func ParseTypeByMysqlType(sqlType string) DataType

ParseTypeByMysqlType 将MySQL的数据类型转换为Go语言内部转换用的DataType

type DebeziumMongo

// DebeziumMongo is the message type for the Debezium MongoDB source.
type DebeziumMongo struct {
	// Payload is decoded from the JSON key "payload"; its type is unexported.
	Payload *debeziumMongoPayload `json:"payload"`
	// contains filtered or unexported fields
}

DebeziumMongo 类型

func (*DebeziumMongo) GetCacheMap

func (mongo *DebeziumMongo) GetCacheMap() *ValueMap

GetCacheMap 获取解析好的map数据

func (*DebeziumMongo) GetCategory

func (mongo *DebeziumMongo) GetCategory() string

GetCategory 获取类型,是mysql还是mongo

func (*DebeziumMongo) GetExistsKeys

func (mongo *DebeziumMongo) GetExistsKeys() []int8

GetExistsKeys 返回用于标记该字段是否存在,主要用于Mongo更新, 0 不存在,1 存在

func (*DebeziumMongo) GetOp

func (mongo *DebeziumMongo) GetOp() string

GetOp 获取 opLog/binLog 的操作类型

func (*DebeziumMongo) GetValues

func (mongo *DebeziumMongo) GetValues() []interface{}

GetValues 用于获取 准备好插入db的[]interface{}

func (*DebeziumMongo) ParseToMap

func (mongo *DebeziumMongo) ParseToMap(table *SQLTable) (ValueMap, error)

ParseToMap 将json解析成map

func (*DebeziumMongo) SetCacheMap

func (mongo *DebeziumMongo) SetCacheMap(m *ValueMap)

SetCacheMap 将json解析成map并放进来

func (*DebeziumMongo) SetExistsKeys

func (mongo *DebeziumMongo) SetExistsKeys(val []int8)

SetExistsKeys 设置用于标记该字段是否存在的标志,主要用于Mongo更新

func (*DebeziumMongo) SetOp

func (mongo *DebeziumMongo) SetOp(s string)

SetOp 写入opLog/binLog 的类型

func (*DebeziumMongo) SetValues

func (mongo *DebeziumMongo) SetValues(val []interface{})

SetValues 存放将插入db的 []interface{}

func (*DebeziumMongo) UnmarshalFromByte

func (mongo *DebeziumMongo) UnmarshalFromByte(b []byte, mp *MapPool) error

UnmarshalFromByte unmarshals the message from a byte slice.

func (*DebeziumMongo) UnmarshalFromStr

func (mongo *DebeziumMongo) UnmarshalFromStr(str string, mp *MapPool) error

UnmarshalFromStr unmarshals the message from a string.

func (*DebeziumMongo) Unpack

func (mongo *DebeziumMongo) Unpack() []DataInterface

Unpack returns the message as a slice of DataInterface values.

type DebeziumMySQL

// DebeziumMySQL is the message type for the Debezium MySQL source.
type DebeziumMySQL struct {
	// Payload is decoded from the JSON key "payload"; its type is unexported.
	Payload *debeziumMysqlPayload `json:"payload"`
	// contains filtered or unexported fields
}

DebeziumMySQL 类型

func (*DebeziumMySQL) GetCacheMap

func (mysql *DebeziumMySQL) GetCacheMap() *ValueMap

GetCacheMap 获取解析好的map数据

func (*DebeziumMySQL) GetCategory

func (mysql *DebeziumMySQL) GetCategory() string

GetCategory 获取类型,是mysql还是mongo

func (*DebeziumMySQL) GetExistsKeys

func (mysql *DebeziumMySQL) GetExistsKeys() []int8

GetExistsKeys 返回用于标记该字段是否存在,主要用于Mongo更新, 0 不存在,1 存在

func (*DebeziumMySQL) GetOp

func (mysql *DebeziumMySQL) GetOp() string

GetOp 获取 opLog/binLog 的操作类型

func (*DebeziumMySQL) GetValues

func (mysql *DebeziumMySQL) GetValues() []interface{}

GetValues 用于获取 准备好插入db的[]interface{}

func (*DebeziumMySQL) ParseToMap

func (mysql *DebeziumMySQL) ParseToMap(table *SQLTable) (ValueMap, error)

ParseToMap 将json解析成map

func (*DebeziumMySQL) SetCacheMap

func (mysql *DebeziumMySQL) SetCacheMap(m *ValueMap)

SetCacheMap 将json解析成map并放进来

func (*DebeziumMySQL) SetExistsKeys

func (mysql *DebeziumMySQL) SetExistsKeys(val []int8)

SetExistsKeys 设置用于标记该字段是否存在的标志,主要用于Mongo更新

func (*DebeziumMySQL) SetOp

func (mysql *DebeziumMySQL) SetOp(s string)

SetOp 写入opLog/binLog 的类型

func (*DebeziumMySQL) SetValues

func (mysql *DebeziumMySQL) SetValues(val []interface{})

SetValues 存放将插入db的 []interface{}

func (*DebeziumMySQL) UnmarshalFromByte

func (mysql *DebeziumMySQL) UnmarshalFromByte(b []byte, mp *MapPool) error

UnmarshalFromByte unmarshals the message from a byte slice.

func (*DebeziumMySQL) UnmarshalFromStr

func (mysql *DebeziumMySQL) UnmarshalFromStr(str string, mp *MapPool) error

UnmarshalFromStr unmarshals the message from a string.

func (*DebeziumMySQL) Unpack

func (mysql *DebeziumMySQL) Unpack() []DataInterface

Unpack returns the message as a slice of DataInterface values.

type MapPool

// MapPool is a pool of ValueMap values: Get hands out a ValueMap and Put
// takes back one that is no longer in use.
type MapPool struct {
	// contains filtered or unexported fields
}

MapPool map pool

func NewMapPool

func NewMapPool() *MapPool

NewMapPool 生成新的 MapPool

func (*MapPool) Get

func (p *MapPool) Get() ValueMap

Get 获取新的 ValueMap

func (*MapPool) Put

func (p *MapPool) Put(m ValueMap)

Put 放入不用的 ValueMap

type SQLTable

// SQLTable records the various pieces of information about a ClickHouse table.
type SQLTable struct {
	DbName              string
	Table               string
	Types               map[string]DataType // mapping from column name to type
	PrimaryKey          string
	Columns             []string      // column information (ordered)
	ColumnsDefaultValue []interface{} //  used to fill in for nil
	// NOTE(review): presumably the index of PrimaryKey within Columns — confirm.
	PrimaryKeyIndex     int

	InsertSQL string
	QuerySQL  string
	QueryNode *sql.DB
}

SQLTable 记录 clickhouse table 各种信息

type ValueMap

// ValueMap stores the parsed map: each key is a field name and each value is
// that field's data.
type ValueMap map[string]interface{}

ValueMap 存储的解析出来的map, k 是字段名,v是数据

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL