migration_tools

package
v1.1.4
Warning

This package is not in the latest version of its module.

Published: Jun 1, 2022 License: AGPL-3.0 Imports: 43 Imported by: 0

Documentation


Constants

const (
	LOGRUS_FILE           = "LOGRUS_FILE"
	LOGRUS_LEVEL          = "LOGRUS_LEVEL"
	LOG_READ_GAPS_DIR     = "LOG_READ_GAPS_DIR"
	LOG_WRITE_GAPS_DIR    = "LOG_WRITE_GAPS_DIR"
	LOG_TRANSFER_GAPS_DIR = "LOG_TRANSFER_GAPS_DIR"

	MIGRATION_START                   = "MIGRATION_START"
	MIGRATION_STOP                    = "MIGRATION_STOP"
	MIGRATION_TABLE_NAMES             = "MIGRATION_TABLE_NAMES"
	MIGRATION_WORKERS_PER_TABLE       = "MIGRATION_WORKERS_PER_TABLE"
	MIGRATION_AUTO_RANGE              = "MIGRATION_AUTO_RANGE"
	MIGRATION_AUTO_RANGE_SEGMENT_SIZE = "MIGRATION_AUTO_RANGE_SEGMENT_SIZE"

	TRANSFER_TABLE_NAME     = "TRANSFER_TABLE_NAME"
	TRANSFER_SEGMENT_SIZE   = "TRANSFER_SEGMENT_SIZE"
	TRANSFER_SEGMENT_OFFSET = "TRANSFER_SEGMENT_OFFSET"
	TRANSFER_MAX_PAGE       = "TRANSFER_MAX_PAGE"

	OLD_DATABASE_NAME                 = "OLD_DATABASE_NAME"
	OLD_DATABASE_HOSTNAME             = "OLD_DATABASE_HOSTNAME"
	OLD_DATABASE_PORT                 = "OLD_DATABASE_PORT"
	OLD_DATABASE_USER                 = "OLD_DATABASE_USER"
	OLD_DATABASE_PASSWORD             = "OLD_DATABASE_PASSWORD"
	OLD_DATABASE_MAX_IDLE_CONNECTIONS = "OLD_DATABASE_MAX_IDLE_CONNECTIONS"
	OLD_DATABASE_MAX_OPEN_CONNECTIONS = "OLD_DATABASE_MAX_OPEN_CONNECTIONS"
	OLD_DATABASE_MAX_CONN_LIFETIME    = "OLD_DATABASE_MAX_CONN_LIFETIME"

	NEW_DATABASE_NAME                 = "NEW_DATABASE_NAME"
	NEW_DATABASE_HOSTNAME             = "NEW_DATABASE_HOSTNAME"
	NEW_DATABASE_PORT                 = "NEW_DATABASE_PORT"
	NEW_DATABASE_USER                 = "NEW_DATABASE_USER"
	NEW_DATABASE_PASSWORD             = "NEW_DATABASE_PASSWORD"
	NEW_DATABASE_MAX_IDLE_CONNECTIONS = "NEW_DATABASE_MAX_IDLE_CONNECTIONS"
	NEW_DATABASE_MAX_OPEN_CONNECTIONS = "NEW_DATABASE_MAX_OPEN_CONNECTIONS"
	NEW_DATABASE_MAX_CONN_LIFETIME    = "NEW_DATABASE_MAX_CONN_LIFETIME"
)

ENV bindings
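
These keys are read from the process environment at startup. A minimal sketch of consuming one of them directly, assuming plain os.Getenv lookups (the package's actual binding layer is not shown in this documentation):

package main

import (
	"fmt"
	"os"
	"strconv"
)

// readWorkersPerTable is a hypothetical helper illustrating how the
// MIGRATION_WORKERS_PER_TABLE binding above might be consumed.
func readWorkersPerTable() (int, error) {
	v := os.Getenv("MIGRATION_WORKERS_PER_TABLE")
	if v == "" {
		return 1, nil // fallback default; not specified by this package
	}
	return strconv.Atoi(v)
}

func main() {
	n, err := readWorkersPerTable()
	if err != nil {
		fmt.Fprintln(os.Stderr, "invalid MIGRATION_WORKERS_PER_TABLE:", err)
		os.Exit(1)
	}
	fmt.Println("workers per table:", n)
}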

const (
	TOML_LOGRUS_FILE           = "log.file"
	TOML_LOGRUS_LEVEL          = "log.level"
	TOML_LOG_READ_GAPS_DIR     = "log.readGapsDir"
	TOML_LOG_WRITE_GAPS_DIR    = "log.writeGapsDir"
	TOML_LOG_TRANSFER_GAPS_DIR = "log.transferGapDir"

	TOML_MIGRATION_RANGES                  = "migrator.ranges"
	TOML_MIGRATION_START                   = "migrator.start"
	TOML_MIGRATION_STOP                    = "migrator.stop"
	TOML_MIGRATION_TABLE_NAMES             = "migrator.migrationTableNames"
	TOML_MIGRATION_WORKERS_PER_TABLE       = "migrator.workersPerTable"
	TOML_MIGRATION_AUTO_RANGE              = "migrator.autoRange"
	TOML_MIGRATION_AUTO_RANGE_SEGMENT_SIZE = "migrator.segmentSize"

	TOML_TRANSFER_TABLE_NAME     = "migrator.transferTableName"
	TOML_TRANSFER_SEGMENT_SIZE   = "migrator.pagesPerTx"
	TOML_TRANSFER_SEGMENT_OFFSET = "migrator.segmentOffset"
	TOML_TRANSFER_MAX_PAGE       = "migrator.maxPage"

	TOML_OLD_DATABASE_NAME                 = "old.databaseName"
	TOML_OLD_DATABASE_HOSTNAME             = "old.databaseHostName"
	TOML_OLD_DATABASE_PORT                 = "old.databasePort"
	TOML_OLD_DATABASE_USER                 = "old.databaseUser"
	TOML_OLD_DATABASE_PASSWORD             = "old.databasePassword"
	TOML_OLD_DATABASE_MAX_IDLE_CONNECTIONS = "old.databaseMaxIdleConns"
	TOML_OLD_DATABASE_MAX_OPEN_CONNECTIONS = "old.databaseMaxOpenConns"
	TOML_OLD_DATABASE_MAX_CONN_LIFETIME    = "old.databaseMaxConnLifetime"

	TOML_NEW_DATABASE_NAME                 = "new.databaseName"
	TOML_NEW_DATABASE_HOSTNAME             = "new.databaseHostName"
	TOML_NEW_DATABASE_PORT                 = "new.databasePort"
	TOML_NEW_DATABASE_USER                 = "new.databaseUser"
	TOML_NEW_DATABASE_PASSWORD             = "new.databasePassword"
	TOML_NEW_DATABASE_MAX_IDLE_CONNECTIONS = "new.databaseMaxIdleConns"
	TOML_NEW_DATABASE_MAX_OPEN_CONNECTIONS = "new.databaseMaxOpenConns"
	TOML_NEW_DATABASE_MAX_CONN_LIFETIME    = "new.databaseMaxConnLifetime"
)

TOML mappings
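
Read together, these keys imply a config file with [log], [migrator], [old], and [new] tables. A hypothetical TOML excerpt (key names are taken from the mappings above; values, value types, and the list format are illustrative only):

[log]
	file = "migration.log"
	level = "info"
	readGapsDir = "./gaps/read"
	writeGapsDir = "./gaps/write"
	transferGapDir = "./gaps/transfer"

[migrator]
	start = 0
	stop = 1000000
	migrationTableNames = ["header_cids", "transaction_cids"]
	workersPerTable = 4
	autoRange = true
	segmentSize = 10000

[old]
	databaseName = "vulcanize_v2"
	databaseHostName = "localhost"
	databasePort = 5432
	databaseUser = "postgres"

[new]
	databaseName = "vulcanize_v3"
	databaseHostName = "localhost"
	databasePort = 5432
	databaseUser = "postgres"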

const (
	CLI_LOGRUS_FILE           = "log-file"
	CLI_LOGRUS_LEVEL          = "log-level"
	CLI_LOG_READ_GAPS_DIR     = "read-gaps-dir"
	CLI_LOG_WRITE_GAPS_DIR    = "write-gaps-dir"
	CLI_LOG_TRANSFER_GAPS_DIR = "transfer-gap-dir"

	CLI_MIGRATION_START                   = "start-height"
	CLI_MIGRATION_STOP                    = "stop-height"
	CLI_MIGRATION_TABLE_NAMES             = "migration-table-names"
	CLI_MIGRATION_WORKERS_PER_TABLE       = "workers-per-table"
	CLI_MIGRATION_AUTO_RANGE              = "auto-range"
	CLI_MIGRATION_AUTO_RANGE_SEGMENT_SIZE = "migration-segment-size"

	CLI_TRANSFER_TABLE_NAME     = "transfer-table-name"
	CLI_TRANSFER_SEGMENT_SIZE   = "transfer-segment-size"
	CLI_TRANSFER_SEGMENT_OFFSET = "transfer-segment-offset"
	CLI_TRANSFER_MAX_PAGE       = "transfer-max-page"

	CLI_OLD_DATABASE_NAME                 = "old-db-name"
	CLI_OLD_DATABASE_HOSTNAME             = "old-db-hostname"
	CLI_OLD_DATABASE_PORT                 = "old-db-port"
	CLI_OLD_DATABASE_USER                 = "old-db-username"
	CLI_OLD_DATABASE_PASSWORD             = "old-db-password"
	CLI_OLD_DATABASE_MAX_IDLE_CONNECTIONS = "old-db-max-idle"
	CLI_OLD_DATABASE_MAX_OPEN_CONNECTIONS = "old-db-max-open"
	CLI_OLD_DATABASE_MAX_CONN_LIFETIME    = "old-db-max-lifetime"

	CLI_NEW_DATABASE_NAME                 = "new-db-name"
	CLI_NEW_DATABASE_HOSTNAME             = "new-db-hostname"
	CLI_NEW_DATABASE_PORT                 = "new-db-port"
	CLI_NEW_DATABASE_USER                 = "new-db-username"
	CLI_NEW_DATABASE_PASSWORD             = "new-db-password"
	CLI_NEW_DATABASE_MAX_IDLE_CONNECTIONS = "new-db-max-idle"
	CLI_NEW_DATABASE_MAX_OPEN_CONNECTIONS = "new-db-max-open"
	CLI_NEW_DATABASE_MAX_CONN_LIFETIME    = "new-db-max-lifetime"
)

CLI flags
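
A hypothetical invocation using a subset of these flags. The binary and subcommand names are illustrative only; only the flag names come from the constants above, and the comma-separated list format for --migration-table-names is an assumption:

./migration-tools migrate \
	--old-db-name vulcanize_v2 --old-db-hostname localhost --old-db-port 5432 \
	--new-db-name vulcanize_v3 --new-db-hostname localhost --new-db-port 5432 \
	--start-height 0 --stop-height 1000000 \
	--migration-table-names header_cids,transaction_cids \
	--workers-per-table 4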

const PgReadMinAndMaxBlockNumbers = `SELECT MIN(block_number) min, MAX(block_number) max
									FROM eth.header_cids`

PgReadMinAndMaxBlockNumbers is a SQL statement for finding the min and max block heights in the DB
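
A minimal sketch of executing this statement with sqlx and scanning the result into the MinAndMux type defined below. The connection string is illustrative, the lib/pq driver choice is an assumption, and the local MinAndMux mirror exists only to keep the example self-contained:

package main

import (
	"fmt"
	"log"

	"github.com/jmoiron/sqlx"
	_ "github.com/lib/pq" // postgres driver; driver choice is an assumption
)

// MinAndMux mirrors the exported type documented below; its db tags match
// the "min" and "max" column aliases in the query.
type MinAndMux struct {
	Min string `db:"min"`
	Max string `db:"max"`
}

const pgReadMinAndMaxBlockNumbers = `SELECT MIN(block_number) min, MAX(block_number) max
	FROM eth.header_cids`

func main() {
	db, err := sqlx.Connect("postgres", "postgres://user:pass@localhost:5432/vulcanize_v2?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	var mm MinAndMux
	if err := db.Get(&mm, pgReadMinAndMaxBlockNumbers); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("min=%s max=%s\n", mm.Min, mm.Max)
}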

Variables

var (
	// block data
	TestConfig  = params.RopstenChainConfig
	BlockNumber = TestConfig.LondonBlock
	MockHeader  = types.Header{
		Time:        0,
		Number:      new(big.Int).Set(BlockNumber),
		Root:        common.HexToHash("0x0"),
		TxHash:      common.HexToHash("0x0"),
		ReceiptHash: common.HexToHash("0x0"),
		Difficulty:  big.NewInt(5000000),
		Extra:       []byte{},
		BaseFee:     big.NewInt(params.InitialBaseFee),
		Coinbase:    common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476777"),
	}
	MockTransactions, MockReceipts, SenderAddr = createTransactionsAndReceipts(TestConfig, BlockNumber)
	MockBlock                                  = types.NewBlock(&MockHeader, MockTransactions, nil, MockReceipts, new(trie.Trie))
	Address                                    = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476592")
	AnotherAddress                             = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476593")
	ContractAddress                            = crypto.CreateAddress(SenderAddr, MockTransactions[2].Nonce())
	MockContractByteCode                       = []byte{0, 1, 2, 3, 4, 5}

	MockLog1 = &types.Log{
		Address: Address,
		Topics:  []common.Hash{mockTopic11, mockTopic12},
		Data:    []byte{},
	}
	MockLog2 = &types.Log{
		Address: AnotherAddress,
		Topics:  []common.Hash{mockTopic21, mockTopic22},
		Data:    []byte{},
	}
	MockLog3 = &types.Log{
		Address: Address,
		Topics:  []common.Hash{mockTopic11, mockTopic22},
		Data:    []byte{},
	}
	MockLog4 = &types.Log{
		Address: AnotherAddress,
		Topics:  []common.Hash{mockTopic21, mockTopic12},
		Data:    []byte{},
	}
	ShortLog1 = &types.Log{
		Address: AnotherAddress,
		Topics:  []common.Hash{},
		Data:    []byte{},
	}
	ShortLog1RLP, _ = rlp.EncodeToBytes(ShortLog1)
	ShortLog1CID, _ = ipld.RawdataToCid(ipld.MEthLog, ShortLog1RLP, multihash.KECCAK_256)
	ShotLog1MhKey   = blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(ShortLog1CID.Hash()).String()
	ShortLog2       = &types.Log{
		Address: Address,
		Topics:  []common.Hash{},
		Data:    []byte{},
	}
	ShortLog2RLP, _ = rlp.EncodeToBytes(ShortLog2)
	ShortLog2CID, _ = ipld.RawdataToCid(ipld.MEthLog, ShortLog2RLP, multihash.KECCAK_256)
	ShotLog2MhKey   = blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(ShortLog2CID.Hash()).String()

	// access list entries
	AccessListEntry1 = types.AccessTuple{
		Address: Address,
	}
	AccessListEntry2 = types.AccessTuple{
		Address:     AnotherAddress,
		StorageKeys: []common.Hash{common.BytesToHash(StorageLeafKey), common.BytesToHash(MockStorageLeafKey)},
	}

	StorageLeafKey = crypto.Keccak256Hash(storageLocation[:]).Bytes()

	MockStorageLeafKey = crypto.Keccak256Hash(mockStorageLocation[:]).Bytes()
	StorageValue       = common.Hex2Bytes("01")
	StoragePartialPath = common.Hex2Bytes("20290decd9548b62a8d60345a988386fc84ba6bc95484008f6362f93160ef3e563")
	StorageLeafNode, _ = rlp.EncodeToBytes([]interface{}{
		StoragePartialPath,
		StorageValue,
	})

	ContractRoot       = "0x821e2556a290c86405f8160a2d662042a431ba456b9db265c79bb837c04be5f0"
	ContractCodeHash   = common.HexToHash("0x753f98a8d4328b15636e46f66f2cb4bc860100aa17967cc145fcd17d1d4710ea")
	ContractLeafKey    = test_helpers.AddressToLeafKey(ContractAddress)
	ContractAccount, _ = rlp.EncodeToBytes(&types.StateAccount{
		Nonce:    nonce1,
		Balance:  big.NewInt(0),
		CodeHash: ContractCodeHash.Bytes(),
		Root:     common.HexToHash(ContractRoot),
	})
	ContractPartialPath = common.Hex2Bytes("3114658a74d9cc9f7acf2c5cd696c3494d7c344d78bfec3add0d91ec4e8d1c45")
	ContractLeafNode, _ = rlp.EncodeToBytes([]interface{}{
		ContractPartialPath,
		ContractAccount,
	})

	AccountRoot     = "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"
	AccountCodeHash = common.HexToHash("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470")
	AccountLeafKey  = test_helpers.Account2LeafKey
	RemovedLeafKey  = test_helpers.Account1LeafKey
	Account, _      = rlp.EncodeToBytes(&types.StateAccount{
		Nonce:    nonce0,
		Balance:  big.NewInt(1000),
		CodeHash: AccountCodeHash.Bytes(),
		Root:     common.HexToHash(AccountRoot),
	})
	AccountPartialPath = common.Hex2Bytes("3957f3e2f04a0764c3a0491b175f69926da61efbcc8f61fa1455fd2d2b4cdd45")
	AccountLeafNode, _ = rlp.EncodeToBytes([]interface{}{
		AccountPartialPath,
		Account,
	})

	StateDiffs = []sdtypes.StateNode{
		{
			Path:      []byte{'\x06'},
			NodeType:  sdtypes.Leaf,
			LeafKey:   ContractLeafKey,
			NodeValue: ContractLeafNode,
			StorageNodes: []sdtypes.StorageNode{
				{
					Path:      []byte{},
					NodeType:  sdtypes.Leaf,
					LeafKey:   StorageLeafKey,
					NodeValue: StorageLeafNode,
				},
				{
					Path:      []byte{'\x03'},
					NodeType:  sdtypes.Removed,
					LeafKey:   RemovedLeafKey,
					NodeValue: []byte{},
				},
			},
		},
		{
			Path:         []byte{'\x0c'},
			NodeType:     sdtypes.Leaf,
			LeafKey:      AccountLeafKey,
			NodeValue:    AccountLeafNode,
			StorageNodes: []sdtypes.StorageNode{},
		},
		{
			Path:      []byte{'\x02'},
			NodeType:  sdtypes.Removed,
			LeafKey:   RemovedLeafKey,
			NodeValue: []byte{},
		},
	}
)

Test variables

Functions

func DetectAndSegmentRangeByChunkSize

func DetectAndSegmentRangeByChunkSize(readConf postgres.Config, chunkSize uint64) ([][2]uint64, error)

DetectAndSegmentRangeByChunkSize finds the min and max block heights in the DB, and breaks the range up into segments based on the provided chunk size
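
A hedged fragment showing how the resulting segments might feed the Migrate work chan. The migration_tools import path is hypothetical, the postgres.Config parameter type is taken from the signature above, and the helper itself is invented for illustration:

// Assumed imports: the migration_tools package (path hypothetical) and the
// postgres config package referenced by the signature above.
//
// buildWorkChan is a hypothetical helper: it auto-detects the DB's block
// range, segments it, and returns a pre-filled, closed work chan.
func buildWorkChan(readConf postgres.Config) (chan [2]uint64, error) {
	ranges, err := migration_tools.DetectAndSegmentRangeByChunkSize(readConf, 10000)
	if err != nil {
		return nil, err
	}
	out := make(chan [2]uint64, len(ranges))
	for _, rng := range ranges {
		out <- rng
	}
	close(out)
	return out, nil
}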

func NewDB

func NewDB(ctx context.Context, conf postgres.Config) (*sqlx.DB, error)

NewDB returns a new sqlx.DB instance using the provided config

func NewTableReadModels

func NewTableReadModels(tableName TableName) (interface{}, error)

NewTableReadModels returns a newly allocated set of read DB models for the provided table

func NewTableTransformer

func NewTableTransformer(table TableName) interfaces.Transformer

NewTableTransformer initializes and returns a Transformer for the provided table

func NewTableTransformerSet

func NewTableTransformerSet(tables []TableName) map[TableName]interfaces.Transformer

NewTableTransformerSet initializes and returns a set of Transformers for the provided tables

func SegmentRangeByChunkSize

func SegmentRangeByChunkSize(chunkSize, start, stop uint64) [][2]uint64

SegmentRangeByChunkSize splits the provided range into segments of the desired chunk size
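
The exact boundary semantics are not documented; a plausible sketch, assuming inclusive [start, stop] pairs, chunkSize > 0, and a shorter final segment:

// segmentRangeByChunkSize is a hypothetical re-implementation for
// illustration only; the package's actual boundary handling may differ.
func segmentRangeByChunkSize(chunkSize, start, stop uint64) [][2]uint64 {
	var segments [][2]uint64
	for lo := start; lo <= stop; lo += chunkSize {
		hi := lo + chunkSize - 1
		if hi > stop {
			hi = stop
		}
		segments = append(segments, [2]uint64{lo, hi})
	}
	return segments
}

Under these assumptions, segmentRangeByChunkSize(100, 0, 250) yields [0 99], [100 199], [200 250].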

Types

type Config

type Config struct {
	ReadDB          postgres.Config
	WriteDB         postgres.Config
	WorkersPerTable int
}

Config struct holds the configuration params for a Migrator

func NewConfig

func NewConfig() *Config

NewConfig returns a new Config

type Migrator

type Migrator interface {
	Migrate(wg *sync.WaitGroup, tableName TableName, blockRanges <-chan [2]uint64) (chan [2]uint64, chan [2]uint64, chan struct{}, chan struct{}, chan error)
	Transfer(wg *sync.WaitGroup, fdwTableName string, segmentSize, segmentOffset, maxPage uint64) (chan [2]uint64, chan struct{}, chan error, error)
	TransformToCSV(csvWriter csv.Writer, wg *sync.WaitGroup, tableName TableName, blockRanges <-chan [2]uint64) (chan [2]uint64, chan [2]uint64, chan struct{}, chan struct{}, chan error)
	io.Closer
}

Migrator interface for migrating from v2 DB to v3 DB

func NewMigrator

func NewMigrator(ctx context.Context, conf *Config) (Migrator, error)

NewMigrator returns a new Migrator from the given Config

type MinAndMux

type MinAndMux struct {
	Min string `db:"min"`
	Max string `db:"max"`
}

MinAndMux holds the min and max block_number values returned by PgReadMinAndMaxBlockNumbers

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader struct for reading v2 DB eth.log_cids models

func NewReader

func NewReader(db *sqlx.DB) *Reader

NewReader satisfies interfaces.ReaderConstructor for eth.log_cids

func (*Reader) Close

func (r *Reader) Close() error

Close satisfies io.Closer

func (*Reader) Read

func (r *Reader) Read(blockRange [2]uint64, pgStr sql.ReadPgStr, models interface{}) error

Read satisfies interfaces.Reader for eth.log_cids. Read is safe for concurrent use, as the only shared state is the concurrency-safe *sqlx.DB

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service struct underpinning the Migrator interface

func (*Service) Close

func (s *Service) Close() error

Close satisfies io.Closer. Close shuts down the Migrator: it quits all Migrate goroutines that are currently running, whereas closing the quit chan returned by Migrate only shuts down the goroutines spun up by that method call

func (*Service) Migrate

func (s *Service) Migrate(wg *sync.WaitGroup, tableName TableName, blockRanges <-chan [2]uint64) (chan [2]uint64,
	chan [2]uint64, chan struct{}, chan struct{}, chan error)

Migrate satisfies Migrator. Migrate spins up a goroutine to process the block ranges provided through the blockRanges work chan for the specified table. It returns channels for emitting read gaps and failed write ranges, a channel for signaling completion of the process, a quitChan for closing the single process, and a channel for writing out errors
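
A hedged end-to-end sketch wiring NewMigrator and Migrate together. The import path is hypothetical, the return-channel ordering is assumed from the doc comment above, and DB configuration is elided:

package main

import (
	"context"
	"log"
	"sync"

	migration_tools "example.com/migration_tools" // hypothetical import path
)

func main() {
	conf := migration_tools.NewConfig()
	conf.WorkersPerTable = 4
	// conf.ReadDB and conf.WriteDB would be populated from the ENV/TOML/CLI
	// bindings documented above.

	migrator, err := migration_tools.NewMigrator(context.Background(), conf)
	if err != nil {
		log.Fatal(err)
	}
	defer migrator.Close()

	wg := new(sync.WaitGroup)
	blockRanges := make(chan [2]uint64)
	// Return order assumed from the doc comment: read gaps, failed write
	// ranges, completion signal, quit chan, errors.
	readGaps, failedWrites, done, quit, errs := migrator.Migrate(wg, migration_tools.EthHeaders, blockRanges)
	_ = quit // close(quit) would shut down just this Migrate process; Close shuts down everything

	go func() {
		defer close(blockRanges)
		for _, rng := range migration_tools.SegmentRangeByChunkSize(10000, 0, 1000000) {
			blockRanges <- rng
		}
	}()

	for {
		select {
		case gap := <-readGaps:
			log.Printf("read gap: %v", gap)
		case failed := <-failedWrites:
			log.Printf("failed write range: %v", failed)
		case err := <-errs:
			log.Printf("migration error: %v", err)
		case <-done:
			wg.Wait()
			return
		}
	}
}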

func (*Service) Transfer added in v1.1.0

func (s *Service) Transfer(wg *sync.WaitGroup, fdwTableName string, segmentSize, segmentOffset, maxPage uint64) (chan [2]uint64,
	chan struct{}, chan error, error)

Transfer transfers public.blocks to a new DB page-by-page. Transfer assumes the targeted postgres_fdw is already present in the DB. It returns a chan for logging failed transfer page ranges, a chan for the errors that caused them, a chan for signaling success, and any error encountered during initialization

func (*Service) TransformToCSV added in v1.1.0

func (s *Service) TransformToCSV(csvWriter csv.Writer, wg *sync.WaitGroup, tableName TableName,
	blockRanges <-chan [2]uint64) (chan [2]uint64, chan [2]uint64, chan struct{}, chan struct{}, chan error)

TransformToCSV satisfies Migrator. TransformToCSV spins up a goroutine to process the block ranges provided through the blockRanges work chan for the specified table, writing the transformed rows to the provided csv.Writer. It returns channels for emitting read gaps and failed write ranges, a channel for signaling completion of the process, a quitChan for closing the single process, and a channel for writing out errors

type TableName

type TableName string

TableName explicitly types table name strings

const (
	PublicNodes           TableName = "nodes"
	EthHeaders            TableName = "header_cids"
	EthUncles             TableName = "uncle_cids"
	EthTransactions       TableName = "transaction_cids"
	EthAccessListElements TableName = "access_list_elements"
	EthReceipts           TableName = "receipt_cids"
	EthLogs               TableName = "log_cids"
	EthLogsRepair         TableName = "log_cids_repair"
	EthState              TableName = "state_cids"
	EthAccounts           TableName = "state_accounts"
	EthStorage            TableName = "storage_cids"
	Unknown               TableName = "unknown"
)

func NewTableNameFromString

func NewTableNameFromString(tableNameStr string) (TableName, error)

NewTableNameFromString returns the TableName from the provided string
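
A hedged fragment validating configured table-name strings and building transformers from them (migration_tools import path hypothetical; assumes the standard library log package is imported):

names := []string{"header_cids", "log_cids"}
tables := make([]migration_tools.TableName, 0, len(names))
for _, s := range names {
	tbl, err := migration_tools.NewTableNameFromString(s)
	if err != nil {
		log.Fatalf("unrecognized table name %q: %v", s, err)
	}
	tables = append(tables, tbl)
}
transformers := migration_tools.NewTableTransformerSet(tables)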
