server

package
v1.2.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 26, 2020 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Overview

Copyright [2018] [jc3wish]

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright [2018] [jc3wish]

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright [2018] [jc3wish]

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright [2018] [jc3wish]

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright [2018] [jc3wish]

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright [2018] [jc3wish]

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

This section is empty.

Variables

View Source
// AllSchemaAndTablekey holds the result of GetSchemaAndTableJoin("*", "*").
// NOTE(review): presumably the wildcard key that stands for every schema and
// every table — confirm against the callers.
var AllSchemaAndTablekey string = GetSchemaAndTableJoin("*", "*")
View Source
// DbList is a package-level map from a string key to a *db.
// NOTE(review): presumably keyed by the db Name — confirm against the implementation.
var DbList map[string]*db
View Source
// DbLock is a package-level mutex.
// NOTE(review): presumably guards DbList — confirm against the implementation.
var DbLock sync.Mutex
View Source
// TmpPositioin is a package-level slice of *TmpPositioinStruct.
// NOTE(review): "Positioin" looks like a misspelling of "Position", but the
// identifier is exported, so renaming it would break callers.
var TmpPositioin []*TmpPositioinStruct

Functions

func AddNewDB

func AddNewDB(Name string, ConnectUri string, binlogFileName string, binlogPostion uint32, serverId uint32, maxFileName string, maxPosition uint32, AddTime int64) *db

func AddTable

func AddTable(db string, schema string, tableName string, channelId int) error

func AddTableToServer

func AddTableToServer(db string, schemaName string, tableName string, ToServerInfo ToServer) error

func Close added in v1.2.2

func Close()

func CompareBinlogPositionAndReturnGreater

func CompareBinlogPositionAndReturnGreater(BinlogFileNum1 int, BinlogPosition1 uint32, BinlogFileNum2 int, BinlogPosition2 uint32) (int, uint32)

func CompareBinlogPositionAndReturnLess

func CompareBinlogPositionAndReturnLess(BinlogFileNum1 int, BinlogPosition1 uint32, BinlogFileNum2 int, BinlogPosition2 uint32) (int, uint32)

func DelChannel

func DelChannel(name string, channelID int) bool

func DelDB

func DelDB(Name string) bool

func DelTable

func DelTable(db string, schema string, tableName string) error

func DoRecoveryByBackupData added in v1.2.2

func DoRecoveryByBackupData(fileContent string)

func DoRecoverySnapshotData added in v1.2.2

func DoRecoverySnapshotData()

func DoSaveSnapshotData added in v1.2.2

func DoSaveSnapshotData()

func GetDBObj

func GetDBObj(Name string) *db

func GetFileQueue added in v1.2.2

func GetFileQueue(dbName, SchemaName, tableName, ToServerID string) string

func GetListDb

func GetListDb() map[string]DbListStruct

func GetSchemaAndTableBySplit

func GetSchemaAndTableBySplit(schemaAndTableName string) (schemaName, tableName string)

func GetSchemaAndTableJoin

func GetSchemaAndTableJoin(schema, tableName string) string

func GetSnapshotData added in v1.2.2

func GetSnapshotData() ([]byte, error)

func GetSnapshotData2 added in v1.2.2

func GetSnapshotData2() ([]byte, error)

GetSnapshotData2 returns only the snapshot data of the data sources and the target databases.

func InitStorage

func InitStorage()

func NewConsumeChannel

func NewConsumeChannel(c *Channel) *consume_channel_obj

func NewDb

func NewDb(Name string, ConnectUri string, binlogFileName string, binlogPostion uint32, serverId uint32, maxFileName string, maxPosition uint32, AddTime int64) *db

func Recovery

func Recovery(content *json.RawMessage, isStop bool)

func SaveDBConfigInfo

func SaveDBConfigInfo()

func SaveDBInfoToFileData

func SaveDBInfoToFileData() interface{}

func StopAllChannel

func StopAllChannel()

func UpdateDB

func UpdateDB(Name string, ConnectUri string, binlogFileName string, binlogPostion uint32, serverId uint32, maxFileName string, maxPosition uint32, UpdateTime int64, updateToServer int8) error

Types

type Channel

// Channel holds the exported state of one consume channel: its name, its
// thread limits, and its current status.
//
// It embeds sync.Mutex, so a Channel must not be copied; use it through a
// pointer.
type Channel struct {
	sync.Mutex
	Name string

	MaxThreadNum     int // maximum number of threads for the consume channel
	CurrentThreadNum int // NOTE(review): presumably the number of threads currently running — confirm
	Status           string // one of: stop, starting, running, wait
	// contains filtered or unexported fields
}

func GetChannel

func GetChannel(name string, channelID int) *Channel

func NewChannel

func NewChannel(MaxThreadNum int, Name string, db *db) *Channel

func (*Channel) Close

func (Channel *Channel) Close()

func (*Channel) GetChannel

func (Channel *Channel) GetChannel() chan mysql.EventReslut

func (*Channel) GetChannelMaxThreadNum

func (This *Channel) GetChannelMaxThreadNum() int

func (*Channel) GetCountChan

func (Channel *Channel) GetCountChan() chan *count.FlowCount

func (*Channel) SetChannelMaxThreadNum

func (This *Channel) SetChannelMaxThreadNum(n int)

func (*Channel) SetFlowCountChan

func (Channel *Channel) SetFlowCountChan(flowChan chan *count.FlowCount)

func (*Channel) Start

func (Channel *Channel) Start() chan mysql.EventReslut

func (*Channel) Stop

func (Channel *Channel) Stop()

type DbListStruct

// DbListStruct is a flat, exported summary of one db entry. It is the value
// type of the map returned by GetListDb and the pointee returned by GetDbInfo.
type DbListStruct struct {
	Name                  string
	ConnectUri            string
	ConnStatus            string // one of: close, stop, starting, running
	ConnErr               string // NOTE(review): presumably the last connection error text — confirm
	ChannelCount          int
	LastChannelID         int
	TableCount            int
	BinlogDumpFileName    string // NOTE(review): presumably the binlog file currently being dumped — confirm
	BinlogDumpPosition    uint32 // NOTE(review): presumably the offset inside BinlogDumpFileName — confirm
	BinlogDumpTimestamp   uint32
	MaxBinlogDumpFileName string // NOTE(review): presumably an upper bound (file) for the dump — confirm
	MaxBinlogDumpPosition uint32 // NOTE(review): presumably an upper bound (offset) for the dump — confirm
	ReplicateDoDb         map[string]uint8 // NOTE(review): presumably keyed by schema name — confirm
	ServerId              uint32
	AddTime               int64 // NOTE(review): presumably a Unix timestamp — confirm
}

func GetDbInfo added in v1.2.2

func GetDbInfo(dbname string) *DbListStruct

type Table

// Table holds the per-table state: its name, a channel key, and the list of
// ToServer targets attached to it.
//
// It embeds sync.Mutex, so a Table must not be copied; use it through a
// pointer.
type Table struct {
	sync.Mutex
	Name           string
	ChannelKey     int // NOTE(review): presumably the ID of the channel this table is bound to — confirm
	LastToServerID int // NOTE(review): presumably the most recently assigned ToServerID — confirm
	ToServerList   []*ToServer
}

type TmpPositioinStruct

// TmpPositioinStruct pairs a map of positionStruct values, keyed by string,
// with an embedded sync.RWMutex.
//
// Because of the embedded lock the struct must not be copied; the package
// variable TmpPositioin stores pointers to it.
// NOTE(review): presumably the RWMutex guards Data — confirm against the implementation.
type TmpPositioinStruct struct {
	sync.RWMutex
	Data map[string]positionStruct
}

type ToServer

// ToServer holds the exported configuration and runtime state of one
// delivery target attached to a table: which plugin is used, the binlog
// position reached, error/wait state, and file-queue bookkeeping.
//
// It embeds sync.RWMutex, so a ToServer must not be copied; use it through a
// pointer. Fields tagged `json:"-"` are left out of JSON encoding.
type ToServer struct {
	sync.RWMutex
	ToServerID         int
	PluginName         string
	MustBeSuccess      bool
	FilterQuery        bool
	FilterUpdate       bool
	FieldList          []string
	ToServerKey        string
	BinlogFileNum      int
	BinlogPosition     uint32
	PluginParam        map[string]interface{}
	Status             string
	ToServerChan       *ToServerChan `json:"-"`
	Error              string
	ErrorWaitDeal      int
	ErrorWaitData      interface{}
	LastBinlogFileNum  int    // last position (file number) submitted by the channel to ToServerChan
	LastBinlogPosition uint32 // if BinlogFileNum == LastBinlogFileNum && BinlogPosition == LastBinlogPosition, this position has no problem
	LastBinlogKey      []byte `json:"-"` // key under which the data is saved to "level" (NOTE(review): presumably LevelDB — confirm)
	QueueMsgCount      uint32 // number of messages piled up in the queue

	FileQueueStatus               bool // whether the file queue is started
	Notes                         string
	ThreadCount                   int16                  // number of consumer threads
	FileQueueUsableCount          uint32                 // with the file queue configured: number of times, within the FileQueueUsableCountTimeDiff window, that the queue was full after a write to ToServerChan
	FileQueueUsableCountStartTime int64                  // time at which counting of FileQueueUsableCount started
	CosumerPluginParamMap         map[uint16]interface{} `json:"-"` // used to tell multiple consumers apart
	CosumerIdInrc                 uint16                 // auto-increment consumer ID
	// contains filtered or unexported fields
}

func (*ToServer) AddWaitError

func (This *ToServer) AddWaitError(WaitErr error, WaitData interface{}) bool

func (*ToServer) AppendToFileQueue added in v1.2.2

func (This *ToServer) AppendToFileQueue(data *pluginDriver.PluginDataType) error

AppendToFileQueue flushes the data to the on-disk queue.

func (*ToServer) ConsumeToServer

func (This *ToServer) ConsumeToServer(db *db, SchemaName string, TableName string)

func (*ToServer) DealWaitError

func (This *ToServer) DealWaitError() bool

func (*ToServer) DelWaitError

func (This *ToServer) DelWaitError() bool

func (*ToServer) FileQueueStart added in v1.2.2

func (This *ToServer) FileQueueStart() error

FileQueueStart enables the file queue.

func (*ToServer) GetFileQueueInfo added in v1.2.2

func (This *ToServer) GetFileQueueInfo() (info filequeue.QueueInfo, err error)

GetFileQueueInfo returns basic information about the file queue.

func (*ToServer) GetWaitErrorDeal

func (This *ToServer) GetWaitErrorDeal() int

func (*ToServer) InitFileQueue added in v1.2.2

func (This *ToServer) InitFileQueue(dbName, SchemaName, tableName string) *ToServer

InitFileQueue initializes the file queue.

func (*ToServer) PopFileQueue added in v1.2.2

func (This *ToServer) PopFileQueue() (*pluginDriver.PluginDataType, error)

PopFileQueue takes the front-most item out of the on-disk queue.

func (*ToServer) ReadLastFromFileQueue added in v1.2.2

func (This *ToServer) ReadLastFromFileQueue() (*pluginDriver.PluginDataType, error)

ReadLastFromFileQueue takes the rear-most item out of the on-disk queue.

func (*ToServer) UpdateBinlogPosition

func (This *ToServer) UpdateBinlogPosition(BinlogFileNum int, BinlogPosition uint32) bool

type ToServerChan

// ToServerChan wraps the channel that carries *pluginDriver.PluginDataType
// values; ToServer keeps a pointer to it in its ToServerChan field.
type ToServerChan struct {
	To chan *pluginDriver.PluginDataType
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL