Documentation ¶
Index ¶
- Constants
- Variables
- func BatchedInsert(tx *sql.Tx, table string, data []SQLUntypedRow, size int, params *Parameters) error
- func BatchedQuery(tx *sql.Tx, table string, data []SQLUntypedRow, size int, op string, ...) error
- func BatchedRemove(tx *sql.Tx, table string, data []SQLUntypedRow, size int, params *Parameters) error
- func BatchedReplace(tx *sql.Tx, table string, data []SQLUntypedRow, size int, params *Parameters) error
- func CreateTrackingTable(db *sql.DB) error
- func FileExists(name string) bool
- func GetTrackingStatusSequential(db *sql.DB, sourceDatabase, sourceTable string) (int64, error)
- func ParseDSN(name string) apmsql.DSNInfo
- func RemoveRecordQueueItem(db *sql.DB, sourceDatabase, sourceTable, pkColumn, pkValue string) error
- func SerializeNewTrackingStatus(tt TrackingStatus) error
- func SerializeTrackingStatus(db *sql.DB, ts TrackingStatus) error
- func SetLogger(l *log.Logger)
- func SetTrackingStatusSequential(db *sql.DB, sourceDatabase, sourceTable string, seq int64) error
- func SetTrackingStatusTimestamp(db *sql.DB, sourceDatabase, sourceTable string, stamp time.Time) error
- type Extractor
- type Iteration
- type Loader
- type Migrator
- func (m *Migrator) Close()
- func (m *Migrator) GetTrackingStatus(iter Iteration) (TrackingStatus, error)
- func (m Migrator) GetWaitGroup() *sync.WaitGroup
- func (m *Migrator) Init() error
- func (m *Migrator) Pause() error
- func (m *Migrator) Quit() error
- func (m *Migrator) Run() error
- func (m *Migrator) SerializeTrackingStatus(ts TrackingStatus) error
- func (m *Migrator) SetErrorCallback(f func(map[string]string, error))
- func (m *Migrator) SetState(s MigratorState)
- func (m *Migrator) SetWaitGroup(wg *sync.WaitGroup)
- func (m Migrator) State() MigratorState
- func (m *Migrator) Unpause() error
- type MigratorState
- type NullTime
- type Parameters
- type PersistenceQueue
- type RecordQueue
- type SQLRow
- type SQLUntypedRow
- type TableData
- type TrackingStatus
- type Transformer
Constants ¶
const (
    // S_NEW is the status of a new migrator instance
    S_NEW = 0
    // S_TERMINATED is the migrator instance not running due to
    // intervention. This differs from S_STOPPED because it cannot be
    // restarted and should only be used for process termination.
    S_TERMINATED = 1
    // S_RUNNING is the status of a migrator which has been initialized
    S_RUNNING = 2
    // S_STOPPING is the status of a migrator when a stop has been
    // requested
    S_STOPPING = 3
    // S_STOPPED is the status of a migrator when it has been stopped
    S_STOPPED = 4
    // S_STARTING is the status of a migrator when a start has been
    // requested
    S_STARTING = 5
    // S_PAUSED is the status of a migrator when a pause has been
    // implemented
    S_PAUSED = 6
    // S_INVALID represents an invalid state
    S_INVALID = -1
)
Variables ¶
var (
    // DefaultBatchSize represents the default size of extracted batches
    DefaultBatchSize = 1000

    // TrackingTableName represents the name of the database table used
    // to track TrackingStatus instances, and exists within the target
    // database.
    TrackingTableName = "EtlPosition"

    // TransformerMap is a map of Transformer functions which can be used
    // to instantiate a Transformer based only on a string.
    TransformerMap = make(map[string]Transformer)

    // ExtractorMap is a map of Extractor functions which can be used
    // to instantiate an Extractor based only on a string.
    ExtractorMap = make(map[string]Extractor)

    // RecordQueueTable is the table name for the non-update field
    // capable entries.
    RecordQueueTable = "MigratorRecordQueue"

    // ParamMethod is the parameter name which specifies the insert or
    // update method being used by portions of the migrator.
    ParamMethod = "METHOD"

    // ParamInsertBatchSize is the parameter used by the default loader
    // to batch queries. Int, defaults to 1000.
    ParamInsertBatchSize = "InsertBatchSize"

    // ParamDebug is the parameter used to enable basic debugging
    // code in modules. Boolean, defaults to false.
    ParamDebug = "Debug"

    // ParamLowLevelDebug is the parameter used to enable lower level
    // debugging code in modules. Boolean, defaults to false.
    ParamLowLevelDebug = "LowLevelDebug"

    // ParamBatchSize is the parameter used to specify general batch
    // processing size for polling records from the database. Int,
    // defaults to 100.
    ParamBatchSize = "BatchSize"

    // ParamOnlyPast is the parameter for timestamp-based polling which
    // only polls for timestamps in the past. Boolean, defaults to
    // false.
    ParamOnlyPast = "OnlyPast"

    // ParamSequentialReplace is the parameter for loading which uses
    // REPLACE instead of INSERT for sequentially extracted data. Boolean,
    // defaults to false.
    ParamSequentialReplace = "SequentialReplace"

    // ParamTableName is the parameter for an adjusted table name.
    // String, defaults to "".
    ParamTableName = "TableName"

    // ParamSleepBetweenRuns is the parameter which defines the amount of
    // time between runs in seconds. Int, defaults to 5.
    ParamSleepBetweenRuns = "SleepBetweenRuns"
)
var DefaultLoader = func(db *sql.DB, tables []TableData, params *Parameters) error {
    var err error
    size := paramInt(*params, ParamInsertBatchSize, 100)
    for _, table := range tables {
        tag := "DefaultLoader(" + table.DbName + "." + table.TableName + "): "
        tsStart := time.Now()
        rowsByMethod := make(map[string][]SQLUntypedRow, 0)
        for _, r := range table.Data {
            if _, ok := rowsByMethod[r.Method]; !ok {
                rowsByMethod[r.Method] = make([]SQLUntypedRow, 0)
            }
            rowsByMethod[r.Method] = append(rowsByMethod[r.Method], r.Data)
        }
        for method := range rowsByMethod {
            logger.Debugf(tag+"Beginning transaction, InsertBatchSize == %d", size)
            tx, err := db.Begin()
            if err != nil {
                logger.Errorf(tag + "Transaction start: " + err.Error())
                return err
            }
            switch method {
            case "REPLACE":
                logger.Printf(tag + "Method REPLACE")
                err = BatchedReplace(tx, table.TableName, rowsByMethod[method], size, params)
            case "INSERT":
                logger.Printf(tag + "Method INSERT")
                err = BatchedInsert(tx, table.TableName, rowsByMethod[method], size, params)
            case "REMOVE":
                logger.Printf(tag + "Method REMOVE")
                err = BatchedRemove(tx, table.TableName, rowsByMethod[method], size, params)
            default:
                logger.Debugf(tag+"Unknown method '%s' present, falling back on REPLACE", method)
                err = BatchedReplace(tx, table.TableName, rowsByMethod[method], size, params)
            }
            if err != nil {
                logger.Warnf(tag + "Rolling back transaction")
                err2 := tx.Rollback()
                if err2 != nil {
                    logger.Errorf(tag + "Error during rollback: " + err2.Error())
                }
                return err
            }
            logger.Infof(tag+"Duration to insert %d rows: %s", len(table.Data), time.Since(tsStart).String())
            logger.Debugf(tag + "Committing transaction")
            err = tx.Commit()
            if err != nil {
                logger.Errorf(tag + "Error during commit: " + err.Error())
            }
        }
    }
    return err
}
DefaultLoader represents a default Loader instance.
var DefaultTransformer = func(dbName, tableName string, data []SQLRow, params *Parameters) []TableData {
    method, ok := (*params)[ParamMethod].(string)
    if !ok {
        method = ""
    }
    return []TableData{
        {
            DbName:    dbName,
            TableName: tableName,
            Data:      data,
            Method:    method,
        },
    }
}
DefaultTransformer by default does nothing -- the data is not transformed.
var ExtractorQueue = func(db *sql.DB, dbName, tableName string, ts TrackingStatus, params *Parameters) (bool, []SQLRow, TrackingStatus, error) {
    batchSize := paramInt(*params, "BatchSize", DefaultBatchSize)
    debug := paramBool(*params, ParamDebug, false)
    tag := fmt.Sprintf("ExtractorQueue[%s.%s]: ", dbName, tableName)
    moreData := false
    if debug {
        logger.Debugf(tag+"Beginning run with params %#v", params)
    }
    data := make([]SQLRow, 0)
    tsStart := time.Now()
    rowsToProcess, err := db.Query("SELECT * FROM `"+RecordQueueTable+"` WHERE sourceDatabase = ? AND sourceTable = ? ORDER BY timestampUpdated LIMIT ?", dbName, tableName, DefaultBatchSize)
    if err != nil {
        logger.Errorf(tag+"Error extracting queue rows: %s", err.Error())
        return false, data, ts, err
    }
    dataCount := 0
    for rowsToProcess.Next() {
        rq := RecordQueue{Db: db}
        err := rowsToProcess.Scan(
            &(rq.SourceDatabase),
            &(rq.SourceTable),
            &(rq.PrimaryKeyColumnName),
            &(rq.PrimaryKeyColumnValue),
            &(rq.TimestampUpdated),
            &(rq.Method),
        )
        if err != nil {
            logger.Errorf(tag + "Queue Scan: " + err.Error())
            return false, data, ts, err
        }
        if rq.Method == "REMOVE" {
            if debug {
                logger.Debugf(tag+"Found REMOVE -- processing : %#v", rq)
            }
            rowData := SQLRow{}
            rowData.Method = "REMOVE"
            rowData.Data = SQLUntypedRow{}
            rowData.Data[rq.PrimaryKeyColumnName] = rq.PrimaryKeyColumnValue
            data = append(data, rowData)
            err = rq.Remove()
            if err != nil {
                return false, data, ts, err
            }
            continue
        }
        var rows *sql.Rows
        if strings.Index(rq.PrimaryKeyColumnName, ",") != -1 {
            qs := "SELECT * FROM `" + tableName + "` WHERE "
            for iter, x := range strings.Split(rq.PrimaryKeyColumnName, ",") {
                if iter != 0 {
                    qs += " AND "
                }
                qs += "`" + x + "` = ? "
            }
            qs += " LIMIT 1"
            qvRaw := strings.Split(rq.PrimaryKeyColumnValue, ",")
            qv := []any{}
            for _, v := range qvRaw {
                qv = append(qv, v)
            }
            rows, err = db.Query(qs, qv...)
        } else {
            rows, err = db.Query("SELECT * FROM `"+tableName+"` WHERE `"+rq.PrimaryKeyColumnName+"` = ? LIMIT 1", rq.PrimaryKeyColumnValue)
        }
        if err != nil {
            return false, data, ts, err
        }
        defer rows.Close()
        cols, err := rows.Columns()
        if err != nil {
            return false, data, ts, err
        }
        if debug {
            logger.Debugf(tag+"Columns %v", cols)
        }
        for rows.Next() {
            dataCount++
            scanArgs := make([]any, len(cols))
            values := make([]any, len(cols))
            for i := range values {
                scanArgs[i] = &values[i]
            }
            err = rows.Scan(scanArgs...)
            if err != nil {
                logger.Errorf(tag + "Scan: " + err.Error())
                return false, data, ts, err
            }
            rowData := SQLRow{}
            rowData.Method = "REPLACE"
            rowData.Data = make(SQLUntypedRow, len(cols))
            for i := range cols {
                rowData.Data[cols[i]] = values[i]
            }
            data = append(data, rowData)
        }
        err = rq.Remove()
        if err != nil {
            logger.Warnf(tag+"Error removing record queue entry: %s", err.Error())
        }
    }
    logger.Infof(tag+"Duration to extract %d rows: %s", dataCount, time.Since(tsStart).String())
    if dataCount == 0 {
        if debug {
            logger.Debugf(tag+"Batch size %d, row count %d; indicating no more data", batchSize, dataCount)
        }
        return false, data, ts, nil
    }
    if dataCount < batchSize {
        if debug {
            logger.Debugf(tag+"Batch size %d, row count %d; indicating no more data", batchSize, dataCount)
        }
        moreData = false
    } else {
        if debug {
            logger.Debugf(tag+"Batch size %d == row count %d; indicating more data", batchSize, dataCount)
        }
        moreData = true
    }
    newTs := &TrackingStatus{
        Db:                 ts.Db,
        SourceDatabase:     ts.SourceDatabase,
        SourceTable:        ts.SourceTable,
        ColumnName:         ts.ColumnName,
        SequentialPosition: ts.SequentialPosition,
        LastRun:            NullTimeNow(),
    }
    (*params)[ParamMethod] = "REPLACE"
    return moreData, data, *newTs, nil
}
ExtractorQueue is an Extractor instance which uses a queue table, populated by INSERT or UPDATE triggers on the source table, to notify the extractor that a row needs to be replicated.
var ExtractorSequential = func(db *sql.DB, dbName, tableName string, ts TrackingStatus, params *Parameters) (bool, []SQLRow, TrackingStatus, error) {
    batchSize := paramInt(*params, ParamBatchSize, DefaultBatchSize)
    sequentialReplace := paramBool(*params, ParamSequentialReplace, false)
    debug := paramBool(*params, ParamDebug, false)
    tag := fmt.Sprintf("ExtractorSequential[%s.%s]: ", dbName, tableName)
    moreData := false
    if debug {
        logger.Debugf(tag+"Beginning run with params %#v", params)
    }
    data := make([]SQLRow, 0)
    minSeq := int64(math.MaxInt64)
    var maxSeq int64
    tsStart := time.Now()
    if debug {
        logger.Printf(tag+"Query: \"SELECT * FROM `"+tableName+"` WHERE `"+ts.ColumnName+"` > %d LIMIT %d\"", ts.SequentialPosition, batchSize)
    }
    rows, err := db.Query("SELECT * FROM `"+tableName+"` WHERE `"+ts.ColumnName+"` > ? LIMIT ?", ts.SequentialPosition, batchSize)
    if err != nil {
        logger.Errorf(tag + "ERR: " + err.Error())
        return false, data, ts, err
    }
    defer rows.Close()
    cols, err := rows.Columns()
    if err != nil {
        return false, data, ts, err
    }
    if debug {
        logger.Debugf(tag+"Columns %v", cols)
    }
    dataCount := 0
    for rows.Next() {
        dataCount++
        scanArgs := make([]any, len(cols))
        values := make([]any, len(cols))
        for i := range values {
            scanArgs[i] = &values[i]
        }
        err = rows.Scan(scanArgs...)
        if err != nil {
            logger.Errorf(tag + "Scan: " + err.Error())
            return false, data, ts, err
        }
        rowData := SQLRow{}
        if sequentialReplace {
            rowData.Method = "REPLACE"
        } else {
            rowData.Method = "INSERT"
        }
        rowData.Data = make(SQLUntypedRow, len(cols))
        for i := range cols {
            rowData.Data[cols[i]] = values[i]
        }
        data = append(data, rowData)
        seqno, ok := rowData.Data[ts.ColumnName].(int64)
        if !ok {
            logger.Errorf(tag+"ERROR: Unable to process table %s due to column %s not being an integer", dbName+"."+tableName, ts.ColumnName)
            return false, data, ts, nil
        }
        minSeq = int64min(minSeq, seqno)
        maxSeq = int64max(maxSeq, seqno)
    }
    logger.Infof(tag+"Duration to extract %d rows: %s", dataCount, time.Since(tsStart).String())
    if dataCount == 0 {
        if debug {
            logger.Debugf(tag+"Batch size %d, row count %d; indicating no more data", batchSize, dataCount)
        }
        return false, data, ts, nil
    }
    if dataCount < batchSize {
        if debug {
            logger.Debugf(tag+"Batch size %d, row count %d; indicating no more data", batchSize, dataCount)
        }
        moreData = false
    } else {
        if debug {
            logger.Debugf(tag+"Batch size %d == row count %d; indicating more data", batchSize, dataCount)
        }
        moreData = true
    }
    logger.Infof(tag+"%s seq value range %d - %d", ts.ColumnName, minSeq, maxSeq)
    newTs := &TrackingStatus{
        Db:                 ts.Db,
        SourceDatabase:     ts.SourceDatabase,
        SourceTable:        ts.SourceTable,
        ColumnName:         ts.ColumnName,
        SequentialPosition: maxSeq,
        LastRun:            NullTimeNow(),
    }
    if sequentialReplace {
        (*params)[ParamMethod] = "REPLACE"
    } else {
        (*params)[ParamMethod] = "INSERT"
    }
    return moreData, data, *newTs, nil
}
ExtractorSequential is an Extractor instance which uses the primary key sequence to determine which rows should be extracted from the source database table.
var ExtractorTimestamp = func(db *sql.DB, dbName, tableName string, ts TrackingStatus, params *Parameters) (bool, []SQLRow, TrackingStatus, error) {
    batchSize := paramInt(*params, ParamBatchSize, DefaultBatchSize)
    debug := paramBool(*params, ParamDebug, false)
    onlyPast := paramBool(*params, ParamOnlyPast, false)
    tag := fmt.Sprintf("ExtractorTimestamp[%s.%s]: ", dbName, tableName)
    moreData := false
    if debug {
        logger.Debugf(tag+"Beginning run with params %#v", params)
    }
    data := make([]SQLRow, 0)
    var maxStamp time.Time
    tsStart := time.Now()
    if debug {
        if onlyPast {
            logger.Debugf(tag+"Query: \"SELECT * FROM `"+tableName+"` WHERE `"+ts.ColumnName+"` > %v AND `"+ts.ColumnName+"` <= NOW() LIMIT %d\"", ts.TimestampPosition, batchSize)
        } else {
            logger.Debugf(tag+"Query: \"SELECT * FROM `"+tableName+"` WHERE `"+ts.ColumnName+"` > %v LIMIT %d\"", ts.TimestampPosition, batchSize)
        }
    }
    var rows *sql.Rows
    var err error
    if onlyPast {
        rows, err = db.Query("SELECT * FROM `"+tableName+"` WHERE `"+ts.ColumnName+"` > ? AND `"+ts.ColumnName+"` <= NOW() LIMIT ?", ts.TimestampPosition, batchSize)
    } else {
        rows, err = db.Query("SELECT * FROM `"+tableName+"` WHERE `"+ts.ColumnName+"` > ? LIMIT ?", ts.TimestampPosition, batchSize)
    }
    if err != nil {
        logger.Errorf(tag + "ERR: " + err.Error())
        return false, data, ts, err
    }
    defer rows.Close()
    cols, err := rows.Columns()
    if err != nil {
        return false, data, ts, err
    }
    if debug {
        logger.Debugf(tag+"Columns %v", cols)
    }
    dataCount := 0
    for rows.Next() {
        dataCount++
        scanArgs := make([]any, len(cols))
        values := make([]any, len(cols))
        for i := range values {
            scanArgs[i] = &values[i]
        }
        err = rows.Scan(scanArgs...)
        if err != nil {
            logger.Errorf(tag + "Scan: " + err.Error())
            return false, data, ts, err
        }
        rowData := SQLRow{}
        rowData.Data = make(SQLUntypedRow, len(cols))
        for i := range cols {
            rowData.Data[cols[i]] = values[i]
        }
        data = append(data, rowData)
        timestamp, ok := rowData.Data[ts.ColumnName].(time.Time)
        if !ok {
            logger.Errorf(tag+"ERROR: Unable to process table %s due to column %s not being a Time", dbName+"."+tableName, ts.ColumnName)
            return false, data, ts, err
        }
        maxStamp = timemax(maxStamp, timestamp)
    }
    logger.Infof(tag+"Duration to extract %d rows: %s", dataCount, time.Since(tsStart).String())
    if dataCount == 0 {
        if debug {
            logger.Debugf(tag+"Batch size %d, row count %d; indicating no more data", batchSize, dataCount)
        }
        return false, data, ts, nil
    }
    if dataCount < batchSize {
        if debug {
            logger.Debugf(tag+"Batch size %d, row count %d; indicating no more data", batchSize, dataCount)
        }
        moreData = false
    } else {
        if debug {
            logger.Debugf(tag+"Batch size %d == row count %d; indicating more data", batchSize, dataCount)
        }
        moreData = true
    }
    if debug {
        logger.Debugf(tag+"%s high timestamp value %#v", ts.ColumnName, maxStamp)
    }
    err = SetTrackingStatusTimestamp(ts.Db, dbName, tableName, maxStamp)
    newTs := &TrackingStatus{
        Db:                ts.Db,
        SourceDatabase:    ts.SourceDatabase,
        SourceTable:       ts.SourceTable,
        ColumnName:        ts.ColumnName,
        TimestampPosition: NullTimeFromTime(maxStamp),
        LastRun:           NullTimeFromTime(tsStart),
    }
    (*params)[ParamMethod] = "REPLACE"
    return moreData, data, *newTs, err
}
ExtractorTimestamp is an Extractor instance which uses a DATETIME/TIMESTAMP field to determine which rows to pull from the source database table.
var ExtractorTimestampFallback = func(db *sql.DB, dbName, tableName string, ts TrackingStatus, params *Parameters) (bool, []SQLRow, TrackingStatus, error) {
    batchSize := paramInt(*params, ParamBatchSize, DefaultBatchSize)
    debug := paramBool(*params, ParamDebug, false)
    tag := fmt.Sprintf("ExtractorTimestampFallback[%s.%s]: ", dbName, tableName)
    moreData := false
    if debug {
        logger.Debugf(tag+"Beginning run with params %#v", params)
    }
    data := make([]SQLRow, 0)
    var maxStamp time.Time
    tsStart := time.Now()
    colnames := strings.Split(ts.ColumnName, ",")
    if len(colnames) < 2 {
        err := fmt.Errorf("Requires two columns separated by a comma")
        logger.Errorf(tag + "ERR: " + err.Error())
        return false, data, ts, err
    }
    if debug {
        logger.Printf(tag+"Query: \"SELECT * FROM `"+tableName+"` WHERE IFNULL(`"+colnames[0]+"`,`"+colnames[1]+"`) > %v LIMIT %d\"", ts.TimestampPosition, batchSize)
    }
    rows, err := db.Query("SELECT * FROM `"+tableName+"` WHERE IFNULL(`"+colnames[0]+"`,`"+colnames[1]+"`) > ? LIMIT ?", ts.TimestampPosition, batchSize)
    if err != nil {
        logger.Errorf(tag + "ERR: " + err.Error())
        return false, data, ts, err
    }
    defer rows.Close()
    cols, err := rows.Columns()
    if err != nil {
        return false, data, ts, err
    }
    if debug {
        logger.Debugf(tag+"Columns %v", cols)
    }
    dataCount := 0
    for rows.Next() {
        dataCount++
        scanArgs := make([]any, len(cols))
        values := make([]any, len(cols))
        for i := range values {
            scanArgs[i] = &values[i]
        }
        err = rows.Scan(scanArgs...)
        if err != nil {
            logger.Errorf(tag + "Scan: " + err.Error())
            return false, data, ts, err
        }
        rowData := SQLRow{}
        rowData.Data = make(SQLUntypedRow, len(cols))
        for i := range cols {
            rowData.Data[cols[i]] = values[i]
        }
        data = append(data, rowData)
        timestamp, ok := rowData.Data[ts.ColumnName].(time.Time)
        if !ok {
            logger.Errorf(tag+"ERROR: Unable to process table %s due to column %s not being a Time", dbName+"."+tableName, ts.ColumnName)
            return false, data, ts, err
        }
        maxStamp = timemax(maxStamp, timestamp)
    }
    logger.Infof(tag+"Duration to extract %d rows: %s", dataCount, time.Since(tsStart).String())
    if dataCount == 0 {
        if debug {
            logger.Debugf(tag+"Batch size %d, row count %d; indicating no more data", batchSize, dataCount)
        }
        return false, data, ts, nil
    }
    if dataCount < batchSize {
        if debug {
            logger.Debugf(tag+"Batch size %d, row count %d; indicating no more data", batchSize, dataCount)
        }
        moreData = false
    } else {
        if debug {
            logger.Debugf(tag+"Batch size %d == row count %d; indicating more data", batchSize, dataCount)
        }
        moreData = true
    }
    if debug {
        logger.Debugf(tag+"%s high timestamp value %#v", ts.ColumnName, maxStamp)
    }
    err = SetTrackingStatusTimestamp(ts.Db, dbName, tableName, maxStamp)
    newTs := &TrackingStatus{
        Db:                ts.Db,
        SourceDatabase:    ts.SourceDatabase,
        SourceTable:       ts.SourceTable,
        ColumnName:        ts.ColumnName,
        TimestampPosition: NullTimeFromTime(maxStamp),
        LastRun:           NullTimeFromTime(tsStart),
    }
    (*params)[ParamMethod] = "REPLACE"
    return moreData, data, *newTs, err
}
ExtractorTimestampFallback is an Extractor instance which uses two comma-separated DATETIME/TIMESTAMP columns, falling back on the second column when the first is NULL, to determine which rows to pull from the source database table.
var TableRenamerTransformer = func(dbName, tableName string, data []SQLRow, params *Parameters) []TableData {
    debug := paramBool(*params, ParamDebug, false)
    method, ok := (*params)[ParamMethod].(string)
    if !ok {
        method = ""
    }
    newTableName, ok := (*params)[ParamTableName].(string)
    if !ok {
        if debug {
            logger.Debugf("TableRenamerTransformer: parameter TableName not passed, retaining %s as name", tableName)
        }
        newTableName = tableName
    }
    return []TableData{
        {
            DbName:    dbName,
            TableName: newTableName,
            Data:      data,
            Method:    method,
        },
    }
}
TableRenamerTransformer adjusts the table name of a destination table based on the "TableName" parameter passed.
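For illustration only, here is a minimal sketch of calling TableRenamerTransformer directly; the import path, database, table, and column names below are hypothetical, and in normal use the transformer is invoked by the Migrator rather than by hand.

// Sketch: rename the destination table by passing ParamTableName.
package main

import (
    "fmt"

    migrator "github.com/example/migrator" // hypothetical import path
)

func main() {
    rows := []migrator.SQLRow{
        {Method: "REPLACE", Data: migrator.SQLUntypedRow{"id": int64(1), "name": "example"}},
    }
    params := &migrator.Parameters{
        migrator.ParamTableName: "users_archive", // hypothetical destination name
    }
    out := migrator.TableRenamerTransformer("appdb", "users", rows, params)
    fmt.Println(out[0].TableName) // expected: "users_archive"
}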
Functions ¶
func BatchedInsert ¶
func BatchedInsert(tx *sql.Tx, table string, data []SQLUntypedRow, size int, params *Parameters) error
BatchedInsert takes an array of SQL data rows and creates a series of batched inserts to insert the data into an existing sql.Tx (transaction) object.
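As a rough usage sketch, assuming only the documented signature (the DSN, table name, and import path are hypothetical), BatchedInsert might be driven like this:

// Sketch: insert a small set of untyped rows inside a transaction.
package main

import (
    "database/sql"
    "log"

    _ "github.com/go-sql-driver/mysql"
    migrator "github.com/example/migrator" // hypothetical import path
)

func main() {
    db, err := sql.Open("mysql", "user:pass@tcp(localhost:3306)/appdb") // hypothetical DSN
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    rows := []migrator.SQLUntypedRow{
        {"id": 1, "name": "alpha"},
        {"id": 2, "name": "beta"},
    }

    tx, err := db.Begin()
    if err != nil {
        log.Fatal(err)
    }
    // Insert in batches of up to 100 rows per statement.
    if err := migrator.BatchedInsert(tx, "users", rows, 100, &migrator.Parameters{}); err != nil {
        tx.Rollback()
        log.Fatal(err)
    }
    if err := tx.Commit(); err != nil {
        log.Fatal(err)
    }
}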
func BatchedQuery ¶
func BatchedQuery(tx *sql.Tx, table string, data []SQLUntypedRow, size int, op string, params *Parameters) error
BatchedQuery takes an array of SQL data rows and creates a series of batched queries to insert/replace the data into an existing sql.Tx (transaction) object.
func BatchedRemove ¶
func BatchedRemove(tx *sql.Tx, table string, data []SQLUntypedRow, size int, params *Parameters) error
BatchedRemove takes an array of SQL data rows and creates a series of DELETE FROM statements to remove the data in an existing sql.Tx (transaction) object.
func BatchedReplace ¶
func BatchedReplace(tx *sql.Tx, table string, data []SQLUntypedRow, size int, params *Parameters) error
BatchedReplace takes an array of SQL data rows and creates a series of batched replaces to replace the data into an existing sql.Tx (transaction) object.
func CreateTrackingTable ¶
CreateTrackingTable attempts to create the tracking table for the specified database connection. If the table already exists, this does nothing.
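A minimal sketch of ensuring the tracking table exists on the destination database before running a migration; the DSN and import path are hypothetical.

package main

import (
    "database/sql"
    "log"

    _ "github.com/go-sql-driver/mysql"
    migrator "github.com/example/migrator" // hypothetical import path
)

func main() {
    db, err := sql.Open("mysql", "user:pass@tcp(dest-host:3306)/warehouse") // hypothetical DSN
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // No-op if the tracking table (TrackingTableName, "EtlPosition") already exists.
    if err := migrator.CreateTrackingTable(db); err != nil {
        log.Fatal(err)
    }
}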
func FileExists ¶
FileExists reports whether the named file or directory exists.
func GetTrackingStatusSequential ¶
GetTrackingStatusSequential retrieves the sequentialPosition for a TrackingStatus from its underlying database table.
func RemoveRecordQueueItem ¶
RemoveRecordQueueItem removes an item from the record queue
func SerializeNewTrackingStatus ¶
func SerializeNewTrackingStatus(tt TrackingStatus) error
SerializeNewTrackingStatus serializes a TrackingStatus object to its database table.
func SerializeTrackingStatus ¶
func SerializeTrackingStatus(db *sql.DB, ts TrackingStatus) error
SerializeTrackingStatus serializes a copy of an actively modified TrackingStatus to its underlying database table.
func SetTrackingStatusSequential ¶
SetTrackingStatusSequential updates a TrackingStatus object's sequentialPosition in its underlying database table.
Types ¶
type Extractor ¶
type Extractor func(*sql.DB, string, string, TrackingStatus, *Parameters) (bool, []SQLRow, TrackingStatus, error)
Extractor is a callback function type which pulls rows from a source database table and reports updated tracking information.
type Iteration ¶
type Iteration struct {
    // DestinationTable defines the table name where data will be pushed
    // by the Loader.
    DestinationTable string

    // SourceTable defines the table name where data will be pulled
    // for the Extractor.
    SourceTable string

    // SourceKey is the key field which is used to determine position.
    // This is only specified for the creation of the tracking
    // table if necessary.
    SourceKey string

    // Parameters are a map of arbitrary values / structures which are
    // passed to all of the constituent functions except for Transformer
    // ( Extractor, Loader ) in the Migrator.
    Parameters *Parameters

    // Extractor represents the Extractor callback.
    Extractor Extractor

    // ExtractorName represents the name of the extractor used
    ExtractorName string

    // Transformer represents the Transformer callback. This should be,
    // at a minimum, set to DefaultTransformer if there is no conversion
    // set to take place.
    Transformer Transformer

    // TransformerParameters are a map of arbitrary parameters specific
    // to transformers.
    TransformerParameters *Parameters

    // LoaderName represents the name of the loader used
    LoaderName string

    // Loader represents the Loader callback.
    Loader Loader
}
Iteration defines an individual sub-migrator configuration which replicates a single table. A sketch of a populated Iteration follows.
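The following is a hedged sketch of populating an Iteration for a sequentially keyed table; the import path, table, and column names are hypothetical, and the exact set of fields a given deployment needs may differ.

// Sketch: one Iteration that replicates a hypothetical "orders" table keyed
// by an auto-increment "id" column, using the sequential extractor.
package main

import (
    migrator "github.com/example/migrator" // hypothetical import path
)

func ordersIteration() migrator.Iteration {
    params := &migrator.Parameters{
        migrator.ParamBatchSize:         1000,
        migrator.ParamSequentialReplace: true,
    }
    return migrator.Iteration{
        SourceTable:           "orders",
        DestinationTable:      "orders",
        SourceKey:             "id", // used when the tracking table row is created
        Extractor:             migrator.ExtractorSequential,
        Transformer:           migrator.DefaultTransformer,
        Loader:                migrator.DefaultLoader,
        Parameters:            params,
        TransformerParameters: &migrator.Parameters{},
    }
}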
type Loader ¶
type Loader func(*sql.DB, []TableData, *Parameters) error
Loader is a callback function type which loads transformed TableData into the destination database.
type Migrator ¶
type Migrator struct {
    // SourceDsn represents the DSN (data source name) for the source
    // table. Format is:
    // https://github.com/go-sql-driver/mysql#dsn-data-source-name
    SourceDsn *mysql.Config

    // DestinationDsn represents the DSN (data source name) for the
    // destination table. Format is:
    // https://github.com/go-sql-driver/mysql#dsn-data-source-name
    DestinationDsn *mysql.Config

    // Iterations represents all of the actual migrations being performed.
    Iterations []Iteration

    // Apm determines whether APM support will be enabled or disabled
    Apm bool

    // Parameters are a map of arbitrary values / structures which are
    // passed to all of the constituent functions except for Transformer
    // ( Extractor, Loader ) in the Migrator.
    Parameters *Parameters

    // ErrorCallback represents a logging callback for errors
    ErrorCallback func(map[string]string, error)
    // contains filtered or unexported fields
}
Migrator represents an object which encompasses an entire end-to-end ETL process.
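A rough lifecycle sketch follows; the DSNs and import path are placeholders, it reuses the Iteration sketch shown above, and the ordering of SetWaitGroup/Init/Run/Quit is an assumption based on the method docs below.

package main

import (
    "log"
    "sync"
    "time"

    "github.com/go-sql-driver/mysql"
    migrator "github.com/example/migrator" // hypothetical import path
)

func main() {
    src := mysql.NewConfig()
    src.Net, src.Addr = "tcp", "source-host:3306" // hypothetical source DSN
    src.User, src.Passwd, src.DBName = "user", "pass", "appdb"

    dst := mysql.NewConfig()
    dst.Net, dst.Addr = "tcp", "dest-host:3306" // hypothetical destination DSN
    dst.User, dst.Passwd, dst.DBName = "user", "pass", "warehouse"

    m := &migrator.Migrator{
        SourceDsn:      src,
        DestinationDsn: dst,
        Iterations: []migrator.Iteration{
            ordersIteration(), // e.g. the Iteration sketch shown above
        },
        Parameters: &migrator.Parameters{
            migrator.ParamSleepBetweenRuns: 5,
        },
    }

    var wg sync.WaitGroup
    m.SetWaitGroup(&wg)
    if err := m.Init(); err != nil {
        log.Fatal(err)
    }
    defer m.Close()

    if err := m.Run(); err != nil { // Run spins off a goroutine
        log.Fatal(err)
    }
    time.Sleep(time.Minute) // let it replicate for a while
    if err := m.Quit(); err != nil {
        log.Fatal(err)
    }
    wg.Wait()
}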
func (*Migrator) Close ¶
func (m *Migrator) Close()
Close forcibly closes the database connections for the Migrator instance and marks it as being uninitialized.
func (*Migrator) GetTrackingStatus ¶
func (m *Migrator) GetTrackingStatus(iter Iteration) (TrackingStatus, error)
GetTrackingStatus retrieves the live tracking status for an Iteration from the destination database tracking table
func (Migrator) GetWaitGroup ¶
GetWaitGroup returns the wait group instance being used
func (*Migrator) Init ¶
Init initializes the underlying MySQL database connections for the Migrator instance.
func (*Migrator) Quit ¶
Quit is the method which should be used as the "preferred method" for terminating a Migrator instance.
func (*Migrator) Run ¶
Run spins off a goroutine with a running migrator until the corresponding Quit() method is called.
func (*Migrator) SerializeTrackingStatus ¶
func (m *Migrator) SerializeTrackingStatus(ts TrackingStatus) error
SerializeTrackingStatus serializes a live tracking status for the current migrator.
func (*Migrator) SetErrorCallback ¶
SetErrorCallback sets the error callback function
func (*Migrator) SetState ¶ added in v0.1.9
func (m *Migrator) SetState(s MigratorState)
SetState sets the current state of the migrator
func (*Migrator) SetWaitGroup ¶
SetWaitGroup sets the wait group instance being used
func (Migrator) State ¶ added in v0.1.9
func (m Migrator) State() MigratorState
State returns the current state of the migrator
type MigratorState ¶ added in v0.1.9
type MigratorState int
func MigratorStateFromString ¶ added in v0.1.9
func MigratorStateFromString(s string) (MigratorState, error)
MigratorStateFromString derives a migrator state from a string
func (MigratorState) String ¶ added in v0.1.9
func (m MigratorState) String() string
type NullTime ¶
NullTime represents a time.Time object which can also represent a NULL DATETIME / TIMESTAMP value in MySQL.
func GetTrackingStatusTimestamp ¶
GetTrackingStatusTimestamp retrieves the timestampPosition for a TrackingStatus from its underlying database table.
func NullTimeFromTime ¶
NullTimeFromTime creates a new NullTime instance with the specified time.
func NullTimeNow ¶
func NullTimeNow() NullTime
NullTimeNow creates a new NullTime instance representing the current time.
func (NullTime) MarshalJSON ¶
MarshalJSON implements the json.Marshaler interface for encoding/json.
func (*NullTime) UnmarshalJSON ¶
UnmarshalJSON implements the json.Unmarshaler interface for encoding/json.
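A small sketch of constructing NullTime values and marshaling them to JSON; the exact JSON produced for an unset (zero-value) NullTime is an assumption (null is expected), since that behavior is not spelled out here.

package main

import (
    "encoding/json"
    "fmt"
    "time"

    migrator "github.com/example/migrator" // hypothetical import path
)

func main() {
    set := migrator.NullTimeFromTime(time.Date(2020, 1, 2, 3, 4, 5, 0, time.UTC))
    var unset migrator.NullTime // zero value; expected to marshal as null

    a, _ := json.Marshal(set)
    b, _ := json.Marshal(unset)
    fmt.Println(string(a), string(b))
}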
type Parameters ¶
Parameters represents a series of untyped parameters which are passed to Extractors, Transformers, and Loaders. All stages of the ETL process receive the same parameters.
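Because Parameters is untyped, values are supplied with the types the Param* constants document and read back via type assertion. A minimal sketch, assuming Parameters is a string-keyed map as its use in the extractors above suggests:

package main

import (
    migrator "github.com/example/migrator" // hypothetical import path
)

func exampleParameters() *migrator.Parameters {
    return &migrator.Parameters{
        migrator.ParamBatchSize:        500,   // int
        migrator.ParamOnlyPast:         true,  // bool
        migrator.ParamDebug:            false, // bool
        migrator.ParamSleepBetweenRuns: 10,    // int, seconds between runs
    }
}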
type PersistenceQueue ¶
type PersistenceQueue struct {
    // contains filtered or unexported fields
}
PersistenceQueue is a wrapper around goque, a LevelDB-backed queue. It is used to serialize queue items to disk in case of failure.
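A minimal usage sketch; the on-disk path and queued payload are hypothetical, and only the documented OpenQueue/AddItem/Close signatures are assumed.

package main

import (
    "log"

    migrator "github.com/example/migrator" // hypothetical import path
)

func main() {
    q, err := migrator.OpenQueue("/var/lib/myapp/queue") // hypothetical path
    if err != nil {
        log.Fatal(err)
    }
    defer q.Close()

    item := map[string]string{"table": "orders", "pk": "42"} // hypothetical payload
    if err := q.AddItem(item); err != nil {
        log.Fatal(err)
    }
}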
func OpenQueue ¶
func OpenQueue(path string) (PersistenceQueue, error)
OpenQueue creates an instance of a FIFO queue
func (*PersistenceQueue) AddItem ¶
func (pq *PersistenceQueue) AddItem(item any) error
AddItem adds an item to the FIFO queue
func (*PersistenceQueue) Close ¶
func (pq *PersistenceQueue) Close()
Close closes the underlying FIFO queue
type RecordQueue ¶
type RecordQueue struct {
    Db                    *sql.DB
    SourceDatabase        string    `db:"sourceDatabase"`
    SourceTable           string    `db:"sourceTable"`
    PrimaryKeyColumnName  string    `db:"pkColumn"`
    PrimaryKeyColumnValue string    `db:"pkValue"`
    SequentialPosition    int64     `db:"sequentialPosition"`
    TimestampUpdated      time.Time `db:"timestampPosition"`
    Method                string    `db:"method"`
}
RecordQueue is the table definition for the record queue table, which is used to track changes for tables that do not have a lastUpdated or equivalent timestamp field.
func (RecordQueue) Remove ¶
func (t RecordQueue) Remove() error
Remove removes an entry from the record queue
func (RecordQueue) String ¶
func (t RecordQueue) String() string
String produces a human readable representation of a RecordQueue object.
type SQLRow ¶
type SQLRow struct {
    Data   SQLUntypedRow
    Method string
}
SQLRow represents a single row of SQL data with an action associated with it
type SQLUntypedRow ¶
SQLUntypedRow represents a single row of SQL data which is not strongly typed to a structure. This obviates the need to create Golang-level language structures to represent tables.
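A small sketch of building SQLRow values by hand, for instance from a custom Extractor; the column names are hypothetical.

package main

import (
    migrator "github.com/example/migrator" // hypothetical import path
)

func exampleRows() []migrator.SQLRow {
    return []migrator.SQLRow{
        {
            Method: "INSERT",
            Data: migrator.SQLUntypedRow{
                "id":   int64(1),
                "name": "example",
            },
        },
    }
}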
type TableData ¶
type TableData struct {
    DbName    string
    TableName string
    Data      []SQLRow
    Method    string // only used with loader, specifies INSERT/REPLACE
}
TableData represents identifying information and data for a table.
type TrackingStatus ¶
type TrackingStatus struct {
    Db                 *sql.DB  `json:"-"`
    SourceDatabase     string   `json:"source-database" db:"sourceDatabase"`
    SourceTable        string   `json:"source-table" db:"sourceTable"`
    ColumnName         string   `json:"column-name" db:"columnName"`
    SequentialPosition int64    `json:"sequential-position" db:"sequentialPosition"`
    TimestampPosition  NullTime `json:"timestamp-position" db:"timestampPosition"`
    LastRun            NullTime `json:"last-run" db:"lastRun"`
}
TrackingStatus is the table definition for the tracking table which maintains the ETL positioning
func GetTrackingStatus ¶
func GetTrackingStatus(db *sql.DB, sourceDatabase, sourceTable string) (TrackingStatus, error)
GetTrackingStatus retrieves a TrackingStatus object from its underlying database table.
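A brief sketch of reading the tracking position from the destination database; the DSN and table names are hypothetical.

package main

import (
    "database/sql"
    "fmt"
    "log"

    _ "github.com/go-sql-driver/mysql"
    migrator "github.com/example/migrator" // hypothetical import path
)

func main() {
    db, err := sql.Open("mysql", "user:pass@tcp(dest-host:3306)/warehouse") // hypothetical DSN
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    ts, err := migrator.GetTrackingStatus(db, "appdb", "orders")
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(ts.String())
}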
func (TrackingStatus) String ¶
func (t TrackingStatus) String() string
String produces a human readable representation of a TrackingStatus object.
type Transformer ¶
type Transformer func(string, string, []SQLRow, *Parameters) []TableData
Transformer is a callback function type which transforms an array of untyped information into another array of untyped information. This is used for the "transform" step of the ETL process.
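As an illustrative sketch (not part of the package), a custom Transformer can wrap DefaultTransformer and be registered in TransformerMap so it can be selected by name; the column name and map key below are hypothetical.

package main

import (
    migrator "github.com/example/migrator" // hypothetical import path
)

// scrubTransformer drops a hypothetical "password" column from every row
// before delegating to the package's DefaultTransformer.
var scrubTransformer migrator.Transformer = func(dbName, tableName string, data []migrator.SQLRow, params *migrator.Parameters) []migrator.TableData {
    for i := range data {
        delete(data[i].Data, "password") // hypothetical sensitive column
    }
    return migrator.DefaultTransformer(dbName, tableName, data, params)
}

func init() {
    migrator.TransformerMap["scrub"] = scrubTransformer
}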