migrator

package module
v0.1.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 24, 2023 License: Apache-2.0 Imports: 16 Imported by: 1

README

MIGRATOR

Build Status Go Report Card GoDoc

ETL / data migrator.

Parameters

Parameter Type Default Description
BatchSize integer 1000 Extractor: Number of rows polled from the source database at a time
Debug bool false Show additional debugging information
InsertBatchSize integer 100 Loader: Number of rows inserted per statement
OnlyPast bool false Extractor(timestamp): Only poll for timestamps in the past ( #1 )
SequentialReplace bool false Loader: Use REPLACE instead of INSERT for sequentially extracted data.
SleepBetweenRuns integer 5 Migrator: Seconds to sleep when no data has been found

Extractors

  • Sequential: Tracks progress via an increasing integer column (typically the table's primary key) to determine which table entries have already been migrated. Useful for read-only data which is written in sequence and never updated.
  • Timestamp: Tracks progress via a DATETIME/TIMESTAMP column recording when each row was written, migrating all entries newer than the last recorded timestamp.
  • Queue: Tracks progress via a queue table (MigratorRecordQueue) whose entries identify rows that need to be migrated. The queue is populated by INSERT and UPDATE triggers, so the source database must be modified to add them. Useful for all kinds of data, at the cost of changing the source database.

Tracking Table

-- Tracking table: stores the replication position of each source table.
-- Its columns correspond to the db tags of the Go TrackingStatus struct.
-- NOTE(review): the TrackingTableName variable defaults to "EtlPosition",
-- not `EtlTracking`; confirm which name CreateTrackingTable actually uses.
CREATE TABLE `EtlTracking` (
	-- Source database / table this row tracks.
	sourceDatabase		VARCHAR(100) DEFAULT '',
	sourceTable		VARCHAR(100) DEFAULT '',
	-- Column used to measure progress (for the timestamp fallback
	-- extractor, two comma-separated column names).
	columnName		VARCHAR(100) DEFAULT '',
	-- Highest key value copied by the sequential extractor.
	sequentialPosition	BIGINT DEFAULT 0,
	-- Highest timestamp copied by the timestamp extractors.
	timestampPosition	TIMESTAMP NULL DEFAULT NULL,
	-- Time of the most recent extraction run.
	lastRun			TIMESTAMP NULL DEFAULT NULL
);

RecordQueue Table

-- Queue table read by ExtractorQueue (RecordQueueTable). Each row names a
-- source row that must be replicated; triggers on the source table are
-- expected to insert rows here.
-- Column order matters: ExtractorQueue runs SELECT * and scans the result
-- positionally as (sourceDatabase, sourceTable, pkColumn, pkValue,
-- timestampUpdated, method), so the method column must exist and be sixth.
-- For composite primary keys, pkColumn and pkValue hold parallel
-- comma-separated lists.
-- method: 'REMOVE' deletes the row at the destination; any other value
-- copies the current source row using REPLACE.
CREATE TABLE `MigratorRecordQueue` (
	sourceDatabase		VARCHAR(100) NOT NULL,
	sourceTable			VARCHAR(100) NOT NULL,
	pkColumn 			VARCHAR(100) NOT NULL,
	pkValue 			VARCHAR(100) NOT NULL,
	timestampUpdated 	TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
	method				VARCHAR(20) NOT NULL DEFAULT 'REPLACE',

	KEY (sourceDatabase, sourceTable),
	KEY (timestampUpdated)
);

Documentation

Index

Constants

View Source
// Migrator lifecycle states. The values are untyped integer constants.
// NOTE(review): presumably these are the MigratorState values used by
// State/SetState — confirm before relying on that.
const (
	// S_NEW is the status of a new migrator instance
	S_NEW = 0
	// S_TERMINATED is the migrator instance not running due to
	// intervention. This differs from S_STOPPED because it cannot be
	// restarted and should only be used for process termination.
	S_TERMINATED = 1
	// S_RUNNING is the status of a migrator which has been initialized
	S_RUNNING = 2
	// S_STOPPING is the status of a migrator when a stop has been
	// requested
	S_STOPPING = 3
	// S_STOPPED is the status of a migrator when it has been stopped
	S_STOPPED = 4
	// S_STARTING is the status of a migrator when a start has been
	// requested
	S_STARTING = 5
	// S_PAUSED is the status of a migrator when a pause has been
	// implemented
	S_PAUSED = 6
	// S_INVALID represents an invalid state
	S_INVALID = -1
)

Variables

View Source
var (
	// DefaultBatchSize represents the default size of extracted batches;
	// it is the fallback used when the BatchSize parameter is not set.
	DefaultBatchSize = 1000
	// TrackingTableName represents the name of the database table used
	// to track TrackingStatus instances, and exists within the target
	// database.
	TrackingTableName = "EtlPosition"
	// TransformerMap is a map of Transformer functions which can be used
	// to instantiate a Transformer based only on a string.
	TransformerMap = make(map[string]Transformer)
	// ExtractorMap is a map of Extractor functions which can be used
	// to instantiate an Extractor based only on a string.
	ExtractorMap = make(map[string]Extractor)
	// RecordQueueTable is the name of the queue table read by
	// ExtractorQueue. Each entry identifies a source row which needs to
	// be replicated (or removed) at the destination.
	RecordQueueTable = "MigratorRecordQueue"
	// ParamMethod is the parameter name which specifies the insert or
	// update method being used by portions of the migrator. The built-in
	// extractors set it ("INSERT" or "REPLACE") after a successful run.
	ParamMethod = "METHOD"
	// ParamInsertBatchSize is the parameter used by the default loader
	// to batch queries (rows per statement). Int, defaults to 100.
	ParamInsertBatchSize = "InsertBatchSize"
	// ParamDebug is the parameter used to enable basic debugging
	// code in modules. Boolean, defaults to false.
	ParamDebug = "Debug"
	// ParamLowLevelDebug is the parameter used to enable lower level
	// debugging code in modules. It is boolean and defaults to false.
	ParamLowLevelDebug = "LowLevelDebug"
	// ParamBatchSize is the parameter used to specify general batch
	// processing size for polling records from the database. Int,
	// defaults to DefaultBatchSize (1000).
	ParamBatchSize = "BatchSize"
	// ParamOnlyPast is the parameter for timestamp-based polling which
	// only polls for timestamps in the past. Boolean, defaults to
	// false.
	ParamOnlyPast = "OnlyPast"
	// ParamSequentialReplace is the parameter for loading which uses
	// REPLACE instead of INSERT for sequentially extracted data. Boolean,
	// defaults to false.
	ParamSequentialReplace = "SequentialReplace"
	// ParamTableName is the parameter for an adjusted table name.
	// String, defaults to "".
	ParamTableName = "TableName"
	// ParamSleepBetweenRuns is the parameter which defines the amount of
	// time between runs in seconds. Int, defaults to 5.
	ParamSleepBetweenRuns = "SleepBetweenRuns"
)
View Source
// DefaultLoader writes each TableData into db. Rows are grouped by their
// SQLRow.Method ("INSERT", "REPLACE", "REMOVE"; anything else falls back
// to REPLACE) and each group is applied in its own transaction, batched by
// the InsertBatchSize parameter (default 100).
//
// It returns the first error from starting, executing, or committing a
// transaction. Groups committed before the failure stay committed.
var DefaultLoader = func(db *sql.DB, tables []TableData, params *Parameters) error {
	size := paramInt(*params, ParamInsertBatchSize, 100)

	for _, table := range tables {
		tag := "DefaultLoader(" + table.DbName + "." + table.TableName + "): "

		// Group rows by method. methods keeps the order in which each
		// method first appears, so groups are applied deterministically.
		// Ranging over the map would apply them in random order, e.g. a
		// REMOVE group sometimes before and sometimes after a REPLACE
		// group.
		var methods []string
		rowsByMethod := make(map[string][]SQLUntypedRow)
		for _, r := range table.Data {
			if _, seen := rowsByMethod[r.Method]; !seen {
				methods = append(methods, r.Method)
			}
			rowsByMethod[r.Method] = append(rowsByMethod[r.Method], r.Data)
		}

		for _, method := range methods {
			rows := rowsByMethod[method]
			tsStart := time.Now()

			logger.Debugf(tag+"Beginning transaction, InsertBatchSize == %d", size)
			tx, err := db.Begin()
			if err != nil {
				logger.Errorf(tag+"Transaction start: %s", err.Error())
				return err
			}

			switch method {
			case "REPLACE":
				logger.Printf("%sMethod REPLACE", tag)
				err = BatchedReplace(tx, table.TableName, rows, size, params)
			case "INSERT":
				logger.Printf("%sMethod INSERT", tag)
				err = BatchedInsert(tx, table.TableName, rows, size, params)
			case "REMOVE":
				logger.Printf("%sMethod REMOVE", tag)
				err = BatchedRemove(tx, table.TableName, rows, size, params)
			default:
				// Rows without a recognized method are upserted.
				logger.Debugf(tag+"Unknown method '%s' present, falling back on REPLACE", method)
				err = BatchedReplace(tx, table.TableName, rows, size, params)
			}
			if err != nil {
				logger.Warnf("%sRolling back transaction", tag)
				if rbErr := tx.Rollback(); rbErr != nil {
					logger.Errorf(tag+"Error during rollback: %s", rbErr.Error())
				}
				return err
			}

			logger.Infof(tag+"Duration to insert %d rows: %s", len(rows), time.Since(tsStart).String())

			logger.Debugf("%sCommitting transaction", tag)
			// A failed commit means the rows were not loaded. Report it so
			// the caller does not treat the batch as delivered.
			err = tx.Commit()
			if err != nil {
				logger.Errorf(tag+"Error during commit: %s", err.Error())
				return err
			}
		}
	}

	return nil
}

DefaultLoader represents a default Loader instance.

View Source
// DefaultTransformer is the identity transformer. It wraps the extracted
// rows, unchanged, in a single TableData for the same database and table.
// The batch method is copied from the METHOD parameter; it is "" when that
// parameter is missing or is not a string.
var DefaultTransformer = func(dbName, tableName string, data []SQLRow, params *Parameters) []TableData {
	var method string
	if m, isString := (*params)[ParamMethod].(string); isString {
		method = m
	}

	td := TableData{
		DbName:    dbName,
		TableName: tableName,
		Data:      data,
		Method:    method,
	}
	return []TableData{td}
}

DefaultTransformer does not transform anything: it returns the extracted rows unchanged, wrapped in a single TableData for the same database and table.

View Source
// ExtractorQueue reads up to BatchSize entries for dbName.tableName from
// RecordQueueTable, oldest first.
//
// For each entry:
//   - Method "REMOVE": emits a REMOVE row keyed by the queued primary key.
//   - Otherwise: copies the current source row as a REPLACE row.
//
// Each processed entry is removed from the queue. The returned bool reports
// whether a full batch of queue entries was consumed, i.e. whether more
// entries may be waiting.
var ExtractorQueue = func(db *sql.DB, dbName, tableName string, ts TrackingStatus, params *Parameters) (bool, []SQLRow, TrackingStatus, error) {
	batchSize := paramInt(*params, ParamBatchSize, DefaultBatchSize)
	debug := paramBool(*params, ParamDebug, false)

	tag := fmt.Sprintf("ExtractorQueue[%s.%s]: ", dbName, tableName)

	if debug {
		logger.Debugf(tag+"Beginning run with params %#v", params)
	}

	data := make([]SQLRow, 0)

	tsStart := time.Now()

	// copySourceRow appends the source row identified by rq to data as a
	// REPLACE row. It returns the number of rows found (0 when the row no
	// longer exists). It is a separate function so that the deferred Close
	// releases each result set, and its pooled connection, before the next
	// queue entry is processed.
	copySourceRow := func(rq RecordQueue) (int, error) {
		var rows *sql.Rows
		var err error
		if strings.Contains(rq.PrimaryKeyColumnName, ",") {
			// Composite key: parallel comma-separated lists of column names
			// and values.
			qs := "SELECT * FROM `" + tableName + "` WHERE "
			for i, col := range strings.Split(rq.PrimaryKeyColumnName, ",") {
				if i != 0 {
					qs += " AND "
				}
				qs += "`" + col + "` = ? "
			}
			qs += " LIMIT 1"
			var args []any
			for _, v := range strings.Split(rq.PrimaryKeyColumnValue, ",") {
				args = append(args, v)
			}
			rows, err = db.Query(qs, args...)
		} else {
			rows, err = db.Query("SELECT * FROM `"+tableName+"` WHERE `"+rq.PrimaryKeyColumnName+"` = ? LIMIT 1", rq.PrimaryKeyColumnValue)
		}
		if err != nil {
			return 0, err
		}
		defer rows.Close()

		cols, err := rows.Columns()
		if err != nil {
			return 0, err
		}
		if debug {
			logger.Debugf(tag+"Columns %v", cols)
		}

		found := 0
		for rows.Next() {
			scanArgs := make([]any, len(cols))
			values := make([]any, len(cols))
			for i := range values {
				scanArgs[i] = &values[i]
			}
			if err := rows.Scan(scanArgs...); err != nil {
				logger.Errorf(tag+"Scan: %s", err.Error())
				return found, err
			}

			rowData := SQLRow{Method: "REPLACE", Data: make(SQLUntypedRow, len(cols))}
			for i, col := range cols {
				rowData.Data[col] = values[i]
			}
			data = append(data, rowData)
			found++
		}
		return found, rows.Err()
	}

	queue, err := db.Query("SELECT * FROM `"+RecordQueueTable+"` WHERE sourceDatabase = ? AND sourceTable = ? ORDER BY timestampUpdated LIMIT ?",
		dbName, tableName, batchSize)
	if err != nil {
		logger.Errorf(tag+"Error extracting queue rows: %s", err.Error())
		return false, data, ts, err
	}
	defer queue.Close()

	queueCount := 0 // queue entries consumed; drives moreData
	dataCount := 0  // source rows copied; reported in the duration log
	for queue.Next() {
		queueCount++
		rq := RecordQueue{Db: db}
		// The column order must match the queue table definition
		// (SELECT * is scanned positionally).
		if err := queue.Scan(
			&(rq.SourceDatabase),
			&(rq.SourceTable),
			&(rq.PrimaryKeyColumnName),
			&(rq.PrimaryKeyColumnValue),
			&(rq.TimestampUpdated),
			&(rq.Method),
		); err != nil {
			logger.Errorf(tag+"Queue Scan: %s", err.Error())
			return false, data, ts, err
		}

		if rq.Method == "REMOVE" {
			if debug {
				logger.Debugf(tag+"Found REMOVE -- processing : %#v", rq)
			}
			// NOTE(review): composite keys are passed through as a single
			// "a,b" column/value pair here — confirm BatchedRemove expects that.
			rowData := SQLRow{Method: "REMOVE", Data: SQLUntypedRow{}}
			rowData.Data[rq.PrimaryKeyColumnName] = rq.PrimaryKeyColumnValue
			data = append(data, rowData)
			if err := rq.Remove(); err != nil {
				return false, data, ts, err
			}
			continue
		}

		found, err := copySourceRow(rq)
		dataCount += found
		if err != nil {
			return false, data, ts, err
		}
		if err := rq.Remove(); err != nil {
			logger.Warnf(tag+"Error removing record queue entry: %s", err.Error())
		}
	}
	if err := queue.Err(); err != nil {
		logger.Errorf(tag+"Error iterating queue rows: %s", err.Error())
		return false, data, ts, err
	}

	logger.Infof(tag+"Duration to extract %d rows: %s", dataCount, time.Since(tsStart).String())

	if queueCount == 0 {
		if debug {
			logger.Debugf(tag+"Batch size %d, row count %d; indicating no more data", batchSize, queueCount)
		}
		return false, data, ts, nil
	}

	// Compare consumed queue entries (not copied source rows) with the
	// batch size. REMOVE entries and entries whose source row has vanished
	// still use up the LIMIT, so counting only copied rows would stop
	// draining a non-empty queue.
	moreData := queueCount >= batchSize
	if debug {
		if moreData {
			logger.Debugf(tag+"Batch size %d == row count %d; indicating more data", batchSize, queueCount)
		} else {
			logger.Debugf(tag+"Batch size %d, row count %d; indicating no more data", batchSize, queueCount)
		}
	}

	newTs := TrackingStatus{
		Db:             ts.Db,
		SourceDatabase: ts.SourceDatabase,
		SourceTable:    ts.SourceTable,
		ColumnName:     ts.ColumnName,

		SequentialPosition: ts.SequentialPosition,
		LastRun:            NullTimeNow(),
	}

	(*params)[ParamMethod] = "REPLACE"
	return moreData, data, newTs, nil
}

ExtractorQueue is an Extractor instance which reads a queue table (RecordQueueTable), populated by INSERT or UPDATE triggers on the source table, to determine which rows need to be replicated. Each processed queue entry is removed from the queue.

View Source
// ExtractorSequential reads up to BatchSize rows whose integer column
// ts.ColumnName is greater than ts.SequentialPosition, in ascending order.
// It returns them as INSERT rows (REPLACE when SequentialReplace is set),
// together with a TrackingStatus advanced to the highest value read.
//
// An error is returned when ts.ColumnName is not scanned as an int64.
//
// NOTE(review): if ts.ColumnName is not unique, rows sharing the boundary
// value can still be skipped when a batch ends among them.
var ExtractorSequential = func(db *sql.DB, dbName, tableName string, ts TrackingStatus, params *Parameters) (bool, []SQLRow, TrackingStatus, error) {
	batchSize := paramInt(*params, ParamBatchSize, DefaultBatchSize)
	sequentialReplace := paramBool(*params, ParamSequentialReplace, false)
	debug := paramBool(*params, ParamDebug, false)

	tag := fmt.Sprintf("ExtractorSequential[%s.%s]: ", dbName, tableName)

	method := "INSERT"
	if sequentialReplace {
		method = "REPLACE"
	}

	if debug {
		logger.Debugf(tag+"Beginning run with params %#v", params)
	}

	data := make([]SQLRow, 0)
	minSeq := int64(math.MaxInt64)
	var maxSeq int64

	tsStart := time.Now()

	// ORDER BY is required for correctness. Without it MySQL may return
	// any batchSize matching rows. Advancing the position to the batch
	// maximum would then permanently skip lower rows that were not returned.
	query := "SELECT * FROM `" + tableName + "` WHERE `" + ts.ColumnName + "` > ? ORDER BY `" + ts.ColumnName + "` LIMIT ?"
	if debug {
		logger.Debugf(tag+"Query: %q, args (%d, %d)", query, ts.SequentialPosition, batchSize)
	}
	rows, err := db.Query(query, ts.SequentialPosition, batchSize)
	if err != nil {
		logger.Errorf(tag+"ERR: %s", err.Error())
		return false, data, ts, err
	}
	defer rows.Close()
	cols, err := rows.Columns()
	if err != nil {
		return false, data, ts, err
	}
	if debug {
		logger.Debugf(tag+"Columns %v", cols)
	}

	dataCount := 0
	for rows.Next() {
		dataCount++
		scanArgs := make([]any, len(cols))
		values := make([]any, len(cols))
		for i := range values {
			scanArgs[i] = &values[i]
		}

		if err := rows.Scan(scanArgs...); err != nil {
			logger.Errorf(tag+"Scan: %s", err.Error())
			return false, data, ts, err
		}

		rowData := SQLRow{Method: method, Data: make(SQLUntypedRow, len(cols))}
		for i, col := range cols {
			rowData.Data[col] = values[i]
		}
		data = append(data, rowData)

		seqno, ok := rowData.Data[ts.ColumnName].(int64)
		if !ok {
			logger.Errorf(tag+"ERROR: Unable to process table %s due to column %s not being an integer", dbName+"."+tableName, ts.ColumnName)
			return false, data, ts, fmt.Errorf("table %s.%s: column %s is not an integer (got %T)",
				dbName, tableName, ts.ColumnName, rowData.Data[ts.ColumnName])
		}
		minSeq = int64min(minSeq, seqno)
		maxSeq = int64max(maxSeq, seqno)
	}
	if err := rows.Err(); err != nil {
		logger.Errorf(tag+"Rows: %s", err.Error())
		return false, data, ts, err
	}

	logger.Infof(tag+"Duration to extract %d rows: %s", dataCount, time.Since(tsStart).String())

	if dataCount == 0 {
		if debug {
			logger.Debugf(tag+"Batch size %d, row count %d; indicating no more data", batchSize, dataCount)
		}
		return false, data, ts, nil
	}

	// A full batch means more rows may be waiting.
	moreData := dataCount >= batchSize
	if debug {
		if moreData {
			logger.Debugf(tag+"Batch size %d == row count %d; indicating more data", batchSize, dataCount)
		} else {
			logger.Debugf(tag+"Batch size %d, row count %d; indicating no more data", batchSize, dataCount)
		}
	}

	logger.Infof(tag+"%s seq value range %d - %d", ts.ColumnName, minSeq, maxSeq)

	newTs := TrackingStatus{
		Db:             ts.Db,
		SourceDatabase: ts.SourceDatabase,
		SourceTable:    ts.SourceTable,
		ColumnName:     ts.ColumnName,

		SequentialPosition: maxSeq,
		LastRun:            NullTimeNow(),
	}

	(*params)[ParamMethod] = method

	return moreData, data, newTs, nil
}

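ExtractorSequential is an Extractor instance which uses an increasing integer column (typically the primary key) to determine which rows should be extracted from the source database table. Each run reads rows whose value is greater than the last recorded position.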

View Source
// ExtractorTimestamp reads up to BatchSize rows whose DATETIME/TIMESTAMP
// column ts.ColumnName is later than ts.TimestampPosition, in ascending
// order. With OnlyPast set, rows stamped after NOW() are excluded.
//
// Rows are returned as REPLACE rows. The highest timestamp read is stored
// with SetTrackingStatusTimestamp and returned in the new TrackingStatus.
// An error is returned when the column is not scanned as a time.Time.
//
// NOTE(review): rows sharing the boundary timestamp can still be skipped
// when a batch ends among them, because the next run asks for strictly
// later timestamps.
var ExtractorTimestamp = func(db *sql.DB, dbName, tableName string, ts TrackingStatus, params *Parameters) (bool, []SQLRow, TrackingStatus, error) {
	batchSize := paramInt(*params, ParamBatchSize, DefaultBatchSize)
	debug := paramBool(*params, ParamDebug, false)
	onlyPast := paramBool(*params, ParamOnlyPast, false)

	tag := fmt.Sprintf("ExtractorTimestamp[%s.%s]: ", dbName, tableName)

	if debug {
		logger.Debugf(tag+"Beginning run with params %#v", params)
	}

	data := make([]SQLRow, 0)
	var maxStamp time.Time

	tsStart := time.Now()

	// ORDER BY is required. Without it, LIMIT may return later rows and
	// advancing to their maximum timestamp would skip earlier ones.
	column := "`" + ts.ColumnName + "`"
	query := "SELECT * FROM `" + tableName + "` WHERE " + column + " > ?"
	if onlyPast {
		query += " AND " + column + " <= NOW()"
	}
	query += " ORDER BY " + column + " LIMIT ?"
	if debug {
		logger.Debugf(tag+"Query: %q, args (%v, %d)", query, ts.TimestampPosition, batchSize)
	}
	rows, err := db.Query(query, ts.TimestampPosition, batchSize)
	if err != nil {
		logger.Errorf(tag+"ERR: %s", err.Error())
		return false, data, ts, err
	}
	defer rows.Close()
	cols, err := rows.Columns()
	if err != nil {
		return false, data, ts, err
	}
	if debug {
		logger.Debugf(tag+"Columns %v", cols)
	}

	dataCount := 0
	for rows.Next() {
		dataCount++
		scanArgs := make([]any, len(cols))
		values := make([]any, len(cols))
		for i := range values {
			scanArgs[i] = &values[i]
		}

		if err := rows.Scan(scanArgs...); err != nil {
			logger.Errorf(tag+"Scan: %s", err.Error())
			return false, data, ts, err
		}

		// Mark rows explicitly as REPLACE, matching the METHOD parameter
		// set below.
		rowData := SQLRow{Method: "REPLACE", Data: make(SQLUntypedRow, len(cols))}
		for i, col := range cols {
			rowData.Data[col] = values[i]
		}
		data = append(data, rowData)

		timestamp, ok := rowData.Data[ts.ColumnName].(time.Time)
		if !ok {
			logger.Errorf(tag+"ERROR: Unable to process table %s due to column %s not being a Time", dbName+"."+tableName, ts.ColumnName)
			return false, data, ts, fmt.Errorf("table %s.%s: column %s is not a time.Time (got %T)",
				dbName, tableName, ts.ColumnName, rowData.Data[ts.ColumnName])
		}
		maxStamp = timemax(maxStamp, timestamp)
	}
	if err := rows.Err(); err != nil {
		logger.Errorf(tag+"Rows: %s", err.Error())
		return false, data, ts, err
	}

	logger.Infof(tag+"Duration to extract %d rows: %s", dataCount, time.Since(tsStart).String())

	if dataCount == 0 {
		if debug {
			logger.Debugf(tag+"Batch size %d, row count %d; indicating no more data", batchSize, dataCount)
		}
		return false, data, ts, nil
	}

	// A full batch means more rows may be waiting.
	moreData := dataCount >= batchSize
	if debug {
		if moreData {
			logger.Debugf(tag+"Batch size %d == row count %d; indicating more data", batchSize, dataCount)
		} else {
			logger.Debugf(tag+"Batch size %d, row count %d; indicating no more data", batchSize, dataCount)
		}
		logger.Debugf(tag+"%s high timestamp value %#v", ts.ColumnName, maxStamp)
	}
	err = SetTrackingStatusTimestamp(ts.Db, dbName, tableName, maxStamp)

	newTs := TrackingStatus{
		Db:             ts.Db,
		SourceDatabase: ts.SourceDatabase,
		SourceTable:    ts.SourceTable,
		ColumnName:     ts.ColumnName,

		TimestampPosition: NullTimeFromTime(maxStamp),
		LastRun:           NullTimeFromTime(tsStart),
	}

	(*params)[ParamMethod] = "REPLACE"

	return moreData, data, newTs, err
}

ExtractorTimestamp is an Extractor instance which uses a DATETIME/TIMESTAMP field to determine which rows to pull from the source database table.

View Source
// ExtractorTimestampFallback works like ExtractorTimestamp, but
// ts.ColumnName must name two comma-separated columns, "primary,fallback".
// The row timestamp is IFNULL(primary, fallback): the primary column when
// it is not NULL, otherwise the fallback column.
//
// Up to BatchSize rows later than ts.TimestampPosition are read in
// timestamp order and returned as REPLACE rows. The highest timestamp is
// stored with SetTrackingStatusTimestamp.
var ExtractorTimestampFallback = func(db *sql.DB, dbName, tableName string, ts TrackingStatus, params *Parameters) (bool, []SQLRow, TrackingStatus, error) {
	batchSize := paramInt(*params, ParamBatchSize, DefaultBatchSize)
	debug := paramBool(*params, ParamDebug, false)

	tag := fmt.Sprintf("ExtractorTimestampFallback[%s.%s]: ", dbName, tableName)

	if debug {
		logger.Debugf(tag+"Beginning run with params %#v", params)
	}

	data := make([]SQLRow, 0)
	var maxStamp time.Time

	tsStart := time.Now()

	colnames := strings.Split(ts.ColumnName, ",")
	if len(colnames) < 2 {
		err := fmt.Errorf("Requires two columns separated by a comma")
		logger.Errorf(tag+"ERR: %s", err.Error())
		return false, data, ts, err
	}
	primaryCol := strings.TrimSpace(colnames[0])
	fallbackCol := strings.TrimSpace(colnames[1])

	// ORDER BY the same expression used for filtering, so that advancing
	// to the batch maximum cannot skip rows that were not returned.
	stampExpr := "IFNULL(`" + primaryCol + "`,`" + fallbackCol + "`)"
	query := "SELECT * FROM `" + tableName + "` WHERE " + stampExpr + " > ? ORDER BY " + stampExpr + " LIMIT ?"
	if debug {
		logger.Debugf(tag+"Query: %q, args (%v, %d)", query, ts.TimestampPosition, batchSize)
	}
	rows, err := db.Query(query, ts.TimestampPosition, batchSize)
	if err != nil {
		logger.Errorf(tag+"ERR: %s", err.Error())
		return false, data, ts, err
	}
	defer rows.Close()
	cols, err := rows.Columns()
	if err != nil {
		return false, data, ts, err
	}
	if debug {
		logger.Debugf(tag+"Columns %v", cols)
	}

	dataCount := 0
	for rows.Next() {
		dataCount++
		scanArgs := make([]any, len(cols))
		values := make([]any, len(cols))
		for i := range values {
			scanArgs[i] = &values[i]
		}

		if err := rows.Scan(scanArgs...); err != nil {
			logger.Errorf(tag+"Scan: %s", err.Error())
			return false, data, ts, err
		}

		rowData := SQLRow{Method: "REPLACE", Data: make(SQLUntypedRow, len(cols))}
		for i, col := range cols {
			rowData.Data[col] = values[i]
		}
		data = append(data, rowData)

		// Mirror IFNULL(primary, fallback) in Go. The row map is keyed by
		// individual column names, never by the combined "a,b" string.
		stamp := rowData.Data[primaryCol]
		if stamp == nil {
			stamp = rowData.Data[fallbackCol]
		}
		timestamp, ok := stamp.(time.Time)
		if !ok {
			logger.Errorf(tag+"ERROR: Unable to process table %s due to column %s not being a Time", dbName+"."+tableName, ts.ColumnName)
			return false, data, ts, fmt.Errorf("table %s.%s: IFNULL(%s, %s) is not a time.Time (got %T)",
				dbName, tableName, primaryCol, fallbackCol, stamp)
		}
		maxStamp = timemax(maxStamp, timestamp)
	}
	if err := rows.Err(); err != nil {
		logger.Errorf(tag+"Rows: %s", err.Error())
		return false, data, ts, err
	}

	logger.Infof(tag+"Duration to extract %d rows: %s", dataCount, time.Since(tsStart).String())

	if dataCount == 0 {
		if debug {
			logger.Debugf(tag+"Batch size %d, row count %d; indicating no more data", batchSize, dataCount)
		}
		return false, data, ts, nil
	}

	// A full batch means more rows may be waiting.
	moreData := dataCount >= batchSize
	if debug {
		if moreData {
			logger.Debugf(tag+"Batch size %d == row count %d; indicating more data", batchSize, dataCount)
		} else {
			logger.Debugf(tag+"Batch size %d, row count %d; indicating no more data", batchSize, dataCount)
		}
		logger.Debugf(tag+"%s high timestamp value %#v", ts.ColumnName, maxStamp)
	}
	err = SetTrackingStatusTimestamp(ts.Db, dbName, tableName, maxStamp)

	newTs := TrackingStatus{
		Db:             ts.Db,
		SourceDatabase: ts.SourceDatabase,
		SourceTable:    ts.SourceTable,
		ColumnName:     ts.ColumnName,

		TimestampPosition: NullTimeFromTime(maxStamp),
		LastRun:           NullTimeFromTime(tsStart),
	}

	(*params)[ParamMethod] = "REPLACE"

	return moreData, data, newTs, err
}

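ExtractorTimestampFallback is an Extractor instance which, like ExtractorTimestamp, uses DATETIME/TIMESTAMP values to determine which rows to pull from the source database table. Its tracking column name must list two comma-separated columns. The first column's value is used when it is not NULL, otherwise the second column's value (SQL IFNULL).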

View Source
// TableRenamerTransformer passes the rows through unchanged, but targets
// the table named by the TableName parameter. When that parameter is
// missing, is not a string, or is empty, the source table name is kept.
// An empty destination name could never be written to.
// The batch method is copied from the METHOD parameter ("" if absent).
var TableRenamerTransformer = func(dbName, tableName string, data []SQLRow, params *Parameters) []TableData {
	debug := paramBool(*params, ParamDebug, false)

	// A failed type assertion yields "", the intended default.
	method, _ := (*params)[ParamMethod].(string)

	newTableName, ok := (*params)[ParamTableName].(string)
	if !ok || newTableName == "" {
		if debug {
			logger.Debugf("TableRenamerTransformer: parameter TableName not passed, retaining %s as name", tableName)
		}
		newTableName = tableName
	}

	return []TableData{
		{
			DbName:    dbName,
			TableName: newTableName,
			Data:      data,
			Method:    method,
		},
	}
}

TableRenamerTransformer adjusts the table name of a destination table based on the "TableName" parameter passed.

Functions

func BatchedInsert

func BatchedInsert(tx *sql.Tx, table string, data []SQLUntypedRow, size int, params *Parameters) error

BatchedInsert takes an array of SQL data rows and creates a series of batched inserts to insert the data into an existing sql.Tx (transaction) object.

func BatchedQuery

func BatchedQuery(tx *sql.Tx, table string, data []SQLUntypedRow, size int, op string, params *Parameters) error

BatchedQuery takes an array of SQL data rows and creates a series of batched queries to insert/replace the data into an existing sql.Tx (transaction) object.

func BatchedRemove

func BatchedRemove(tx *sql.Tx, table string, data []SQLUntypedRow, size int, params *Parameters) error

BatchedRemove takes an array of SQL data rows and creates a series of DELETE FROM statements to remove the data in an existing sql.Tx (transaction) object.

func BatchedReplace

func BatchedReplace(tx *sql.Tx, table string, data []SQLUntypedRow, size int, params *Parameters) error

BatchedReplace takes an array of SQL data rows and creates a series of batched replaces to replace the data into an existing sql.Tx (transaction) object.

func CreateTrackingTable

func CreateTrackingTable(db *sql.DB) error

CreateTrackingTable attempts to create the tracking table for the specified database connection. If the table already exists, this does nothing.

func FileExists

func FileExists(name string) bool

FileExists reports whether the named file or directory exists.

func GetTrackingStatusSequential

func GetTrackingStatusSequential(db *sql.DB, sourceDatabase, sourceTable string) (int64, error)

GetTrackingStatusSequential retrieves the sequentialPosition for a TrackingStatus from its underlying database table.

func ParseDSN

func ParseDSN(name string) apmsql.DSNInfo

ParseDSN parses the given go-sql-driver/mysql datasource name.

func RemoveRecordQueueItem

func RemoveRecordQueueItem(db *sql.DB, sourceDatabase, sourceTable, pkColumn, pkValue string) error

RemoveRecordQueueItem removes an item from the record queue

func SerializeNewTrackingStatus

func SerializeNewTrackingStatus(tt TrackingStatus) error

SerializeNewTrackingStatus serializes a TrackingStatus object to its database table.

func SerializeTrackingStatus

func SerializeTrackingStatus(db *sql.DB, ts TrackingStatus) error

SerializeTrackingStatus serializes a copy of an actively modified TrackingStatus to its underlying database table.

func SetLogger

func SetLogger(l *log.Logger)

SetLogger sets a logrus Logger object used by the migrator

func SetTrackingStatusSequential

func SetTrackingStatusSequential(db *sql.DB, sourceDatabase, sourceTable string, seq int64) error

SetTrackingStatusSequential updates a TrackingStatus object's sequentialPosition in its underlying database table.

func SetTrackingStatusTimestamp

func SetTrackingStatusTimestamp(db *sql.DB, sourceDatabase, sourceTable string, stamp time.Time) error

SetTrackingStatusTimestamp updates a TrackingStatus object's timestampPosition in its underlying database table.

Types

type Extractor

Extractor is a callback function type

type Iteration

// Iteration defines the individual sub-migrator configuration which
// replicates a single table. It specifies the source and destination
// tables and the Extractor, Transformer, and Loader callbacks (plus their
// parameters) that move the data.
type Iteration struct {
	// DestinationTable defines the table name where data will be pushed
	// by the Loader.
	DestinationTable string

	// SourceTable defines the table name where data will be pulled
	// for the Extractor.
	SourceTable string

	// SourceKey is the key field which is used to determine position.
	// This is only specified for the creation of the tracking
	// table if necessary.
	// NOTE(review): presumably this becomes TrackingStatus.ColumnName. That
	// would be an integer column for ExtractorSequential, a timestamp column
	// for ExtractorTimestamp, and "primary,fallback" for
	// ExtractorTimestampFallback — confirm.
	SourceKey string

	// Parameters are a map of arbitrary values / structures which are
	// passed to all of the constituent functions except for Transformer
	// ( Extractor, Loader ) in the Migrator.
	Parameters *Parameters

	// Extractor represents the Extractor callback.
	Extractor Extractor

	// ExtractorName represents the name of the extractor used
	// (presumably a key of ExtractorMap).
	ExtractorName string

	// Transformer represents the Transformer callback. This should be,
	// at a minimum, set to DefaultTransformer if there is no conversion
	// set to take place.
	Transformer Transformer

	// TransformerParameters are a map of arbitrary parameters specific
	// to transformers.
	TransformerParameters *Parameters

	// LoaderName represents the name of the loader used
	LoaderName string

	// Loader represents the Loader callback (e.g. DefaultLoader).
	Loader Loader
}

Iteration defines the individual sub-migrator configuration which replicates a single table

type Loader

// Loader is a callback function type for the "load" step of the ETL
// process. It writes the transformed tables into the destination database
// and returns the first error encountered.
type Loader func(db *sql.DB, tables []TableData, params *Parameters) error

Loader is a callback function type

type Migrator

// Migrator represents an object which encompasses an entire end-to-end
// ETL process: a source and destination MySQL connection configuration
// and the set of per-table Iterations to run between them.
type Migrator struct {
	// SourceDsn represents the DSN (data source name) for the source
	// database connection (a go-sql-driver/mysql Config). Format is:
	// https://github.com/go-sql-driver/mysql#dsn-data-source-name
	SourceDsn *mysql.Config

	// DestinationDsn represents the DSN (data source name) for the
	// destination database connection (a go-sql-driver/mysql Config).
	// Format is:
	// https://github.com/go-sql-driver/mysql#dsn-data-source-name
	DestinationDsn *mysql.Config

	// Iterations represents all of the actual migrations being performed,
	// one per replicated table.
	Iterations []Iteration

	// Apm determines whether APM support will be enabled or disabled
	Apm bool

	// Parameters are a map of arbitrary values / structures which are
	// passed to all of the constituent functions except for Transformer
	// ( Extractor, Loader ) in the Migrator.
	Parameters *Parameters

	// ErrorCallback represents a logging callback for errors; it can also
	// be set with SetErrorCallback.
	ErrorCallback func(map[string]string, error)
	// contains filtered or unexported fields
}

Migrator represents an object which encompasses an entire end-to-end ETL process.

func (*Migrator) Close

func (m *Migrator) Close()

Close forcibly closes the database connections for the Migrator instance and marks it as being uninitialized.

func (*Migrator) GetTrackingStatus

func (m *Migrator) GetTrackingStatus(iter Iteration) (TrackingStatus, error)

GetTrackingStatus retrieves the live tracking status for an Iteration from the destination database tracking table

func (Migrator) GetWaitGroup

func (m Migrator) GetWaitGroup() *sync.WaitGroup

GetWaitGroup returns the wait group instance being used

func (*Migrator) Init

func (m *Migrator) Init() error

Init initializes the underlying MySQL database connections for the Migrator instance.

func (*Migrator) Pause added in v0.1.9

func (m *Migrator) Pause() error

Pause will "pause" the migrator

func (*Migrator) Quit

func (m *Migrator) Quit() error

Quit is the method which should be used as the "preferred method" for terminating a Migrator instance.

func (*Migrator) Run

func (m *Migrator) Run() error

Run spins off a goroutine with a running migrator until the corresponding Quit() method is called.

func (*Migrator) SerializeTrackingStatus

func (m *Migrator) SerializeTrackingStatus(ts TrackingStatus) error

SerializeTrackingStatus serializes a live tracking status for the current migrator.

func (*Migrator) SetErrorCallback

func (m *Migrator) SetErrorCallback(f func(map[string]string, error))

SetErrorCallback sets the error callback function

func (*Migrator) SetState added in v0.1.9

func (m *Migrator) SetState(s MigratorState)

SetState sets the current state of the migrator

func (*Migrator) SetWaitGroup

func (m *Migrator) SetWaitGroup(wg *sync.WaitGroup)

SetWaitGroup sets the wait group instance being used

func (Migrator) State added in v0.1.9

func (m Migrator) State() MigratorState

State returns the current state of the migrator

func (*Migrator) Unpause added in v0.1.10

func (m *Migrator) Unpause() error

Unpause will "un-pause" the migrator

type MigratorState added in v0.1.9

// MigratorState is the lifecycle state of a Migrator, as returned by State
// and accepted by SetState.
// NOTE(review): the S_* constants are declared untyped; presumably they
// are the MigratorState values — confirm.
type MigratorState int

func MigratorStateFromString added in v0.1.9

func MigratorStateFromString(s string) (MigratorState, error)

MigratorStateFromString derives a migrator state from a string

func (MigratorState) String added in v0.1.9

func (m MigratorState) String() string

type NullTime

// NullTime represents a time.Time object which can also represent a NULL
// DATETIME / TIMESTAMP value in MySQL.
type NullTime struct {
	// Time is the wrapped value; presumably meaningful only when Valid.
	Time  time.Time
	// Valid reports whether Time holds a value; false presumably
	// represents SQL NULL.
	Valid bool
}

NullTime represents a time.Time object which can also represent a NULL DATETIME / TIMESTAMP value in MySQL.

func GetTrackingStatusTimestamp

func GetTrackingStatusTimestamp(db *sql.DB, sourceDatabase, sourceTable string) (NullTime, error)

GetTrackingStatusTimestamp retrieves the timestampPosition for a TrackingStatus from its underlying database table.

func NullTimeFromTime

func NullTimeFromTime(t time.Time) NullTime

NullTimeFromTime creates a new NullTime instance with the specified time.

func NullTimeNow

func NullTimeNow() NullTime

NullTimeNow creates a new NullTime instance representing the current time.

func (NullTime) MarshalJSON

func (nt NullTime) MarshalJSON() ([]byte, error)

MarshalJSON implements the json.Marshaler interface for encoding/json.

func (*NullTime) Scan

func (nt *NullTime) Scan(value any) error

Scan implements the Scanner interface.

func (NullTime) Unix

func (nt NullTime) Unix() int64

Unix exposes the underlying Unix() call of the wrapped time.Time structure.

func (*NullTime) UnmarshalJSON

func (nt *NullTime) UnmarshalJSON(data []byte) (err error)

UnmarshalJSON implements the json.Unmarshaler interface for encoding/json.

func (NullTime) Value

func (nt NullTime) Value() (driver.Value, error)

Value implements the driver Valuer interface.

type Parameters

// Parameters is a set of untyped, named values (see the Param* variables)
// passed to Extractors, Transformers, and Loaders. Readers must
// type-assert each value. Extractors may also write to it; for example,
// the built-in extractors set the METHOD parameter.
type Parameters map[string]any

Parameters represents a series of untyped parameters which are passed to Extractors, Transformers, and Loaders. All stages of the ETL process receive the same parameters.

type PersistenceQueue

// PersistenceQueue is a wrapper around goque, a LevelDB-backed queue. It is
// used to persist queue items to disk in case of failure. Create one with
// OpenQueue and release it with Close.
type PersistenceQueue struct {
	// contains filtered or unexported fields
}

PersistenceQueue is a wrapper around goque, a LevelDB-backed queue, with some convenience code. It is used to persist queue items to disk in case of failure.

func OpenQueue

func OpenQueue(path string) (PersistenceQueue, error)

OpenQueue creates an instance of a FIFO queue

func (*PersistenceQueue) AddItem

func (pq *PersistenceQueue) AddItem(item any) error

AddItem adds an item to the FIFO queue

func (*PersistenceQueue) Close

func (pq *PersistenceQueue) Close()

Close closes the underlying FIFO queue

func (*PersistenceQueue) GrabItem

func (pq *PersistenceQueue) GrabItem(item any, fn func(any) error) error

GrabItem takes an item off of the FIFO queue

type RecordQueue

// RecordQueue is one entry of the RecordQueueTable queue table. It names a
// source row, by primary-key column(s) and value(s), that ExtractorQueue
// should replicate, or delete at the destination when Method is "REMOVE".
type RecordQueue struct {
	// Db is the database handle; ExtractorQueue sets it to the handle it
	// was given before calling Remove.
	Db                    *sql.DB
	SourceDatabase        string    `db:"sourceDatabase"`
	SourceTable           string    `db:"sourceTable"`
	// For composite keys, PrimaryKeyColumnName and PrimaryKeyColumnValue
	// hold parallel comma-separated lists.
	PrimaryKeyColumnName  string    `db:"pkColumn"`
	PrimaryKeyColumnValue string    `db:"pkValue"`
	// NOTE(review): sequentialPosition is not a column of the documented
	// MigratorRecordQueue table, and ExtractorQueue never populates it.
	SequentialPosition    int64     `db:"sequentialPosition"`
	// NOTE(review): the queue column is timestampUpdated (ExtractorQueue
	// orders by it), but this tag says timestampPosition. ExtractorQueue
	// scans positionally, so the tag is unused there; confirm before any
	// tag-based mapping relies on it.
	TimestampUpdated      time.Time `db:"timestampPosition"`
	Method                string    `db:"method"`
}

RecordQueue represents an entry in the record queue table (RecordQueueTable). The queue tracks changed rows in tables that do not have a lastUpdated or equivalent timestamp field.

func (RecordQueue) Remove

func (t RecordQueue) Remove() error

Remove removes an entry from the record queue

func (RecordQueue) String

func (t RecordQueue) String() string

String produces a human readable representation of a RecordQueue object.

type SQLRow

// SQLRow represents a single row of SQL data with the action to apply to it.
type SQLRow struct {
	// Data maps column name to the value scanned for that column.
	Data   SQLUntypedRow
	// Method selects how DefaultLoader applies the row: "INSERT",
	// "REPLACE" or "REMOVE"; any other value (including "") falls back to
	// REPLACE.
	Method string
}

SQLRow represents a single row of SQL data with an action associated with it

type SQLUntypedRow

// SQLUntypedRow represents a single row of SQL data, keyed by column name,
// which is not strongly typed to a structure. This avoids declaring a Go
// struct for every replicated table. Values are whatever the extractors
// scanned into an `any`.
type SQLUntypedRow map[string]any

SQLUntypedRow represents a single row of SQL data which is not strongly typed to a structure. This obviates the need to create Golang-level language structures to represent tables.

type TableData

// TableData represents identifying information and data for a table: the
// destination database and table, and the rows to load into it.
type TableData struct {
	DbName    string
	TableName string
	Data      []SQLRow
	// Method is copied from the METHOD parameter by the built-in
	// transformers.
	// NOTE(review): DefaultLoader ignores this field and dispatches on each
	// SQLRow.Method instead.
	Method    string // only used with loader, specifies INSERT/REPLACE
}

TableData represents identifying information and data for a table.

type TrackingStatus

// TrackingStatus is the table definition for the tracking table which
// maintains the ETL positioning of one source table.
type TrackingStatus struct {
	// Db is the database holding the tracking table (not serialized).
	Db                 *sql.DB  `json:"-"`
	SourceDatabase     string   `json:"source-database" db:"sourceDatabase"`
	SourceTable        string   `json:"source-table" db:"sourceTable"`
	// ColumnName is the column used to measure progress. For
	// ExtractorTimestampFallback it is two comma-separated column names.
	ColumnName         string   `json:"column-name" db:"columnName"`
	// SequentialPosition is the highest ColumnName value copied by
	// ExtractorSequential.
	SequentialPosition int64    `json:"sequential-position" db:"sequentialPosition"`
	// TimestampPosition is the highest timestamp copied by the timestamp
	// extractors.
	TimestampPosition  NullTime `json:"timestamp-position" db:"timestampPosition"`
	// LastRun is set by the extractors to the time of their latest
	// productive run.
	LastRun            NullTime `json:"last-run" db:"lastRun"`
}

TrackingStatus is the table definition for the tracking table which maintains the ETL positioning

func GetTrackingStatus

func GetTrackingStatus(db *sql.DB, sourceDatabase, sourceTable string) (TrackingStatus, error)

GetTrackingStatus retrieves a TrackingStatus object from its underlying database table.

func (TrackingStatus) String

func (t TrackingStatus) String() string

String produces a human readable representation of a TrackingStatus object.

type Transformer

// Transformer is the callback type for the "transform" step of the ETL
// process. It receives the source database and table names, the extracted
// rows, and the parameters, and returns the tables and rows to load.
type Transformer func(dbName, tableName string, data []SQLRow, params *Parameters) []TableData

Transformer is a callback function type which transforms an array of untyped information into another array of untyped information. This is used for the "transform" step of the ETL process.

Directories

Path Synopsis
cmd
migrator Module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL