common

package
v1.0.1-0...-386defc Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 18, 2023 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package common creates an outline for common functionality across the multiple source databases we support. While adding new methods or code here

  1. Ensure that the changes do not adversely impact any source that uses the common code
  2. Test cases might not sufficiently cover all cases, so integration and manual testing should be done ensure no functionality is breaking. Most of the test cases that cover the code in this package will lie in the implementing source databases, so it might not be required to have unit tests for each method here.
  3. Any functions added here should be used by two or more databases
  4. If it looks like the code is getting more complex due to refactoring, it is probably better off leaving the functionality out of common

Index

Constants

View Source
const DefaultWorkers = 20 // Default to 20 - observed diminishing returns above this value

Variables

View Source
var DATATYPE_TO_STORAGE_SIZE = map[string]int{
	ddl.Bool:      1,
	ddl.Date:      4,
	ddl.Float64:   8,
	ddl.Int64:     8,
	ddl.JSON:      ddl.StringMaxLength,
	ddl.Numeric:   22,
	ddl.Timestamp: 12,
}

Data type sizes are referred from https://cloud.google.com/spanner/docs/reference/standard-sql/data-types#storage_size_for_data_types

Functions

func ComputeNonKeyColumnSize

func ComputeNonKeyColumnSize(conv *internal.Conv, tableId string)

func CvtForeignKeysHelper

func CvtForeignKeysHelper(conv *internal.Conv, spTableName string, srcTableId string, srcKey schema.ForeignKey, isRestore bool) (ddl.Foreignkey, error)

func CvtIndexHelper

func CvtIndexHelper(conv *internal.Conv, tableId string, srcIndex schema.Index, spColIds []string, spColDef map[string]ddl.ColumnDef) ddl.CreateIndex

func GenerateSrcSchema

func GenerateSrcSchema(conv *internal.Conv, infoSchema InfoSchema, numWorkers int) (int, error)

func GetColsAndSchemas

func GetColsAndSchemas(conv *internal.Conv, tableId string) (schema.Table, string, []string, ddl.CreateTable, error)

GetColsAndSchemas provides information about columns and schema for a table.

func GetCommonColumnIds

func GetCommonColumnIds(conv *internal.Conv, tableId string, colIds []string) []string

func GetIncludedSrcTablesFromConv

func GetIncludedSrcTablesFromConv(conv *internal.Conv) (tableList []string, err error)

getIncludedSrcTablesFromConv fetches the list of tables from the source database that need to be migrated.

func GetSortedTableIdsBySrcName

func GetSortedTableIdsBySrcName(srcSchema map[string]schema.Table) []string

func IntersectionOfTwoStringSlices

func IntersectionOfTwoStringSlices(a []string, b []string) []string

func PrepareColumns

func PrepareColumns(conv *internal.Conv, tableId string, srcCols []string) ([]string, error)

func PrepareValues

func PrepareValues[T interface{}](conv *internal.Conv, tableId string, colNameIdMap map[string]string, commonColIds, srcCols []string, values []T) ([]T, error)

func ProcessData

func ProcessData(conv *internal.Conv, infoSchema InfoSchema, additionalAttributes internal.AdditionalDataAttributes)

ProcessData performs data conversion for source database 'db'. For each table, we extract and convert the data to Spanner data (based on the source and Spanner schemas), and write it to Spanner. If we can't get/process data for a table, we skip that table and process the remaining tables.

func ProcessDbDump

func ProcessDbDump(conv *internal.Conv, r *internal.Reader, dbDump DbDump) error

ProcessDbDump reads dump data from r and does schema or data conversion, depending on whether conv is configured for schema mode or data mode. In schema mode, this method incrementally builds a schema (updating conv). In data mode, this method uses this schema to convert data and writes it to Spanner, using the data sink specified in conv.

func ProcessSchema

func ProcessSchema(conv *internal.Conv, infoSchema InfoSchema, numWorkers int, attributes internal.AdditionalSchemaAttributes) error

ProcessSchema performs schema conversion for source database 'db'. Information schema tables are a broadly supported ANSI standard, and we use them to obtain source database's schema information.

func SchemaToSpannerDDL

func SchemaToSpannerDDL(conv *internal.Conv, toddl ToDdl) error

SchemaToSpannerDDL performs schema conversion from the source DB schema to Spanner. It uses the source schema in conv.SrcSchema, and writes the Spanner schema to conv.SpSchema.

func SchemaToSpannerDDLHelper

func SchemaToSpannerDDLHelper(conv *internal.Conv, toddl ToDdl, srcTable schema.Table, isRestore bool) error

func SetRowStats

func SetRowStats(conv *internal.Conv, infoSchema InfoSchema)

SetRowStats populates conv with the number of rows in each table.

func SrcTableToSpannerDDL

func SrcTableToSpannerDDL(conv *internal.Conv, toddl ToDdl, srcTable schema.Table) error

func ToNotNull

func ToNotNull(conv *internal.Conv, isNullable string) bool

ToNotNull returns true if a column is not nullable and false if it is.

func ToPGDialectType

func ToPGDialectType(standardType ddl.Type) ddl.Type

Types

type DbDump

type DbDump interface {
	GetToDdl() ToDdl
	ProcessDump(conv *internal.Conv, r *internal.Reader) error
}

DbDump common interface for database dump functions.

type FkConstraint

type FkConstraint struct {
	Name    string
	Table   string
	Refcols []string
	Cols    []string
}

FkConstraint contains foreign key constraints

type InfoSchema

type InfoSchema interface {
	GetToDdl() ToDdl
	GetTableName(schema string, tableName string) string
	GetTables() ([]SchemaAndName, error)
	GetColumns(conv *internal.Conv, table SchemaAndName, constraints map[string][]string, primaryKeys []string) (map[string]schema.Column, []string, error)
	GetRowsFromTable(conv *internal.Conv, srcTable string) (interface{}, error)
	GetRowCount(table SchemaAndName) (int64, error)
	GetConstraints(conv *internal.Conv, table SchemaAndName) ([]string, map[string][]string, error)
	GetForeignKeys(conv *internal.Conv, table SchemaAndName) (foreignKeys []schema.ForeignKey, err error)
	GetIndexes(conv *internal.Conv, table SchemaAndName, colNameIdMp map[string]string) ([]schema.Index, error)
	ProcessData(conv *internal.Conv, tableId string, srcSchema schema.Table, spCols []string, spSchema ddl.CreateTable, additionalAttributes internal.AdditionalDataAttributes) error
	StartChangeDataCapture(ctx context.Context, conv *internal.Conv) (map[string]interface{}, error)
	StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamInfo map[string]interface{}) (internal.DataflowOutput, error)
}

InfoSchema contains database information.

type SchemaAndName

type SchemaAndName struct {
	Schema string
	Name   string
}

SchemaAndName contains the schema and name for a table

type TaskResult

type TaskResult[O any] struct {
	Result O
	Err    error
}

func RunParallelTasks

func RunParallelTasks[I any, O any](input []I, numWorkers int, f func(i I, mutex *sync.Mutex) TaskResult[O],
	fastExit bool) ([]TaskResult[O], error)

Run multiple tasks in parallel. The tasks are expected to be thread safe input: List of inputs numWorkers: Size of worker pool f: Function to execute the task fastExit: If an error is encountered, gracefully exit all running or queued tasks Returns an array of TaskResults and last error

type ToDdl

type ToDdl interface {
	ToSpannerType(conv *internal.Conv, spType string, srcType schema.Type) (ddl.Type, []internal.SchemaIssue)
}

ToDdl interface is meant to be implemented by all sources. When support for a new target database is added, please add a new method here with the output type expected. In case a particular source to target transoformation is not supported, an error is to be returned by the corresponding method.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL