Documentation ¶
Index ¶
- Constants
- Variables
- func CheckSchemas(inputTable, targetTable Table) ([]string, []string, []string, error)
- func ConvertDefaultValue(val string) string
- func GetRedshiftDataType(sqlType, debeziumType, sourceColType, sourceColLength string, ...) (string, error)
- type ColInfo
- type Meta
- type QueryTotalRow
- type Redshift
- func (r *Redshift) Begin(ctx context.Context) (*sql.Tx, error)
- func (r *Redshift) Copy(ctx context.Context, tx *sql.Tx, schema string, table string, ...) error
- func (r *Redshift) CreateSchema(ctx context.Context, schema string) error
- func (r *Redshift) CreateTable(ctx context.Context, tx *sql.Tx, table Table) error
- func (r *Redshift) DeDupe(ctx context.Context, tx *sql.Tx, schema string, table string, ...) error
- func (r *Redshift) DeleteColumn(ctx context.Context, tx *sql.Tx, schema string, table string, ...) error
- func (r *Redshift) DeleteCommon(ctx context.Context, tx *sql.Tx, schema string, stagingTable string, ...) error
- func (r *Redshift) DropColumn(ctx context.Context, tx *sql.Tx, schema string, table string, ...) error
- func (r *Redshift) DropTable(ctx context.Context, tx *sql.Tx, schema string, table string) error
- func (r *Redshift) DropTableWithCascade(ctx context.Context, tx *sql.Tx, schema string, table string) error
- func (r *Redshift) GetTableMetadata(ctx context.Context, schema, tableName string) (*Table, error)
- func (r *Redshift) GrantSchemaAccess(ctx context.Context, tx *sql.Tx, schema string, table string, group string) error
- func (r *Redshift) RenameTable(ctx context.Context, tx *sql.Tx, schema string, sourceTableName string, ...) error
- func (r *Redshift) ReplaceTable(ctx context.Context, tx *sql.Tx, unLoadS3Key string, copyS3ManifestKey string, ...) error
- func (r *Redshift) ScanQueryTotal(ctx context.Context) ([]QueryTotalRow, error)
- func (r *Redshift) SchemaExist(ctx context.Context, schema string) (bool, error)
- func (r *Redshift) TableExist(ctx context.Context, schema string, table string) (bool, error)
- func (r *Redshift) Unload(ctx context.Context, tx *sql.Tx, schema string, table string, s3Key string, ...) error
- func (r *Redshift) UpdateTable(ctx context.Context, inputTable, targetTable Table) (bool, error)
- type RedshiftCollector
- type RedshiftConfig
- type SourceType
- type Table
Constants ¶
const ( RedshiftBoolean = "boolean" RedshiftString = "character varying" RedshiftStringMax = "character varying(65535)" RedshiftStringMaxLength = 65535 RedshiftStringDefaultLength = 256 RedshiftMaskedDataType = "character varying(50)" RedshiftMobileColType = "character varying(10)" RedshiftMaskedDataTypeLength = 50 RedshiftNumeric = "numeric" RedshiftNumericMaxLength = 38 RedshiftNumericDefautLength = 18 RedshiftNumericMaxScale = 37 RedshiftNumericDefaultScale = 0 RedshiftDate = "date" RedshiftInteger = "integer" RedshiftTime = "character varying(32)" RedshiftTimeStamp = "timestamp without time zone" // required to support utf8 characters // https://docs.aws.amazon.com/redshift/latest/dg/r_Character_types.html#r_Character_types-varchar-or-character-varying RedshiftToMysqlCharacterRatio = 4.0 )
Variables ¶
var ( Namespace = "redshift" SubSystemScan = "scan" )
Functions ¶
func CheckSchemas ¶
CheckSchemas takes two tables and compares their column schemas to make sure they are compatible. Any mismatched columns are returned in the errors array. It covers most schema-migration scenarios and returns the ALTER commands needed to perform them.
func ConvertDefaultValue ¶
Types ¶
type ColInfo ¶
type ColInfo struct { Name string `json:"name"` Type string `json:"type"` DebeziumType string `json:"debeziumtype"` SourceType SourceType `yaml:"sourceType"` DefaultVal string `json:"defaultval"` NotNull bool `json:"notnull"` PrimaryKey bool `json:"primarykey"` SortOrdinal int `json:"sortord"` DistKey bool `json:"distkey"` }
ColInfo contains information about a column in a Redshift database. The SortOrdinal and DistKey fields are meaningful only for Redshift.
type Meta ¶
type Meta struct {
Schema string `json:"schema"`
}
Meta holds information that may not be stored in Redshift, or that is inconvenient to access there — in this case, the schema that a table belongs to.
type QueryTotalRow ¶
type Redshift ¶
type Redshift struct {
// contains filtered or unexported fields
}
Redshift wraps a dbExecCloser and can be used to perform operations on a Redshift database. Give it a context that lasts for the duration of the job.
func NewRedshift ¶
func NewRedshift(conf RedshiftConfig) (*Redshift, error)
func (*Redshift) Copy ¶
func (r *Redshift) Copy(ctx context.Context, tx *sql.Tx, schema string, table string, s3ManifestURI string, typeJson bool, typeCsv bool, comupdateOff bool, statupdateOff bool) error
Copy loads data from S3 into a Redshift table using a manifest file. It is meant to be run inside a transaction, so a *sql.Tx must be passed as the tx argument.
func (*Redshift) CreateSchema ¶
func (*Redshift) CreateTable ¶
func (*Redshift) DeDupe ¶
func (r *Redshift) DeDupe(ctx context.Context, tx *sql.Tx, schema string, table string, targetTablePrimaryKeys []string, stagingTablePrimaryKey string) error
DeDupe deletes duplicate rows from the Redshift table, keeping only the latest one. It runs within the given transaction.
func (*Redshift) DeleteColumn ¶
func (*Redshift) DeleteCommon ¶
func (r *Redshift) DeleteCommon(ctx context.Context, tx *sql.Tx, schema string, stagingTable string, targetTable string, commonColumns []string) error
DeleteCommon deletes from targetTable the rows it has in common with stagingTable, matched on commonColumns.
func (*Redshift) DropColumn ¶
func (*Redshift) DropTableWithCascade ¶
func (*Redshift) GetTableMetadata ¶
GetTableMetadata looks up a table and returns its Table representation. If the table does not exist, it returns an empty table rather than an error.
func (*Redshift) GrantSchemaAccess ¶
func (*Redshift) RenameTable ¶
func (*Redshift) ReplaceTable ¶
func (r *Redshift) ReplaceTable( ctx context.Context, tx *sql.Tx, unLoadS3Key string, copyS3ManifestKey string, inputTable, targetTable Table) error
ReplaceTable replaces the current table with a new table that has the updated schema. This is required in Redshift because ALTER COLUMN is not supported for all column types. The steps are: 1. Rename the table t1 to t1_migrating. 2. Create table t1 with the new schema. 3. UNLOAD the data from the renamed table t1_migrating to S3. 4. COPY the unloaded data from S3 into the new table t1.
func (*Redshift) ScanQueryTotal ¶
func (r *Redshift) ScanQueryTotal(ctx context.Context) ([]QueryTotalRow, error)
ScanQueryTotal reads the view built on top of the STL_SCAN table to get the total number of queries executed for each table. TODO: it expects the view redshiftsink_operator.scan_query_total to be present.
func (*Redshift) SchemaExist ¶
func (*Redshift) TableExist ¶
func (*Redshift) Unload ¶
func (r *Redshift) Unload(ctx context.Context, tx *sql.Tx, schema string, table string, s3Key string, removeDuplicate bool) error
Unload copies the data present in the table to S3 and generates a manifest file at the s3Key + manifest path.
func (*Redshift) UpdateTable ¶
UpdateTable migrates the table schema using the following three strategies:
- Strategy1: inplace-migration-varchar-type. Changes the length of a VARCHAR column. Executed by this function.
- Strategy2: inplace-migration using ALTER commands. Supports: AddCol and DropCol.
- Strategy3: table-migration using UNLOAD and COPY with a temp table. Supports: all other migration scenarios. Executed by ReplaceTable(), triggered by this function.
type RedshiftCollector ¶
type RedshiftCollector struct {
// contains filtered or unexported fields
}
func NewRedshiftCollector ¶
func NewRedshiftCollector(clients []*Redshift) *RedshiftCollector
func (*RedshiftCollector) Collect ¶
func (c *RedshiftCollector) Collect(ch chan<- prometheus.Metric)
func (*RedshiftCollector) Describe ¶
func (c *RedshiftCollector) Describe(ch chan<- *prometheus.Desc)
type RedshiftConfig ¶
type RedshiftConfig struct { Schema string `yaml:"schema"` TableSuffix string `yaml:"tableSuffix"` Host string `yaml:"host"` Port string `yaml:"port"` Database string `yaml:"database"` User string `yaml:"user"` Password string `yaml:"password"` Timeout int `yaml:"timeout"` S3AccessKeyId string `yaml:"s3AccessKeyId"` S3SecretAccessKey string `yaml:"s3SecretAccessKey"` Stats bool `yaml:"stats"` MaxOpenConns int `yaml:"maxOpenConns"` MaxIdleConns int `yaml:"maxIdleConns"` }