redshift

package
v1.0.0-beta.4
Warning

This package is not in the latest version of its module.

Published: Sep 22, 2021 License: Apache-2.0 Imports: 12 Imported by: 0

README

Redshift Library

Fork of s3-to-redshift

Forked because:

  1. It lacks schema migration features other than ADD COLUMN.
  2. The repository owners do not actively respond to issues and pull requests. I have opened issues in s3-to-redshift/issues for this. We will contribute back to s3-to-redshift if they are open to it.
  3. A new feature was added for merging data into the target table using a staging table.

// TODO: The library is also kept in this repo because that makes quick development easier; vendoring slows development down. Later, once we reach production and attain stability, we should move the library out and keep it as a fork if contributing to s3-to-redshift does not work out.

Documentation

Index

Constants

const (
	RedshiftBoolean = "boolean"

	RedshiftString              = "character varying"
	RedshiftStringMax           = "character varying(65535)"
	RedshiftStringMaxLength     = 65535
	RedshiftStringDefaultLength = 256

	RedshiftMaskedDataType       = "character varying(50)"
	RedshiftMobileColType        = "character varying(10)"
	RedshiftMaskedDataTypeLength = 50

	RedshiftNumeric             = "numeric"
	RedshiftNumericMaxLength    = 38
	RedshiftNumericDefautLength = 18
	RedshiftNumericMaxScale     = 37
	RedshiftNumericDefaultScale = 0

	RedshiftDate      = "date"
	RedshiftInteger   = "integer"
	RedshiftTime      = "character varying(32)"
	RedshiftTimeStamp = "timestamp without time zone"

	// required to support UTF-8 characters: Redshift VARCHAR lengths are in
	// bytes, and a UTF-8 character can take up to 4 bytes
	// https://docs.aws.amazon.com/redshift/latest/dg/r_Character_types.html#r_Character_types-varchar-or-character-varying
	RedshiftToMysqlCharacterRatio = 4.0
)
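Because Redshift measures VARCHAR length in bytes while MySQL measures it in characters, a MySQL column length presumably has to be scaled by RedshiftToMysqlCharacterRatio before it is used in Redshift. The sketch below assumes the result is capped at RedshiftStringMaxLength and falls back to RedshiftStringDefaultLength when the source length is unknown; the helper redshiftVarcharLength is hypothetical and not part of this package.

```go
package main

import (
	"fmt"
	"strconv"
)

// Local copies of the package constants so the sketch is self-contained.
const (
	RedshiftStringMaxLength       = 65535
	RedshiftStringDefaultLength   = 256
	RedshiftToMysqlCharacterRatio = 4.0
)

// redshiftVarcharLength (hypothetical) converts a MySQL character length,
// given as a string, into a Redshift VARCHAR byte length.
func redshiftVarcharLength(mysqlLength string) int {
	n, err := strconv.Atoi(mysqlLength)
	if err != nil || n <= 0 {
		// Unknown or invalid length: use the default.
		return RedshiftStringDefaultLength
	}
	bytes := int(float64(n) * RedshiftToMysqlCharacterRatio)
	if bytes > RedshiftStringMaxLength {
		// Redshift cannot store a longer VARCHAR.
		return RedshiftStringMaxLength
	}
	return bytes
}

func main() {
	for _, l := range []string{"50", "20000", ""} {
		fmt.Printf("varchar(%s) -> character varying(%d)\n", l, redshiftVarcharLength(l))
	}
}
```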

Variables

var (
	Namespace     = "redshift"
	SubSystemScan = "scan"
)

Functions

func CheckSchemas

func CheckSchemas(inputTable, targetTable Table) (
	[]string, []string, []string, error)

CheckSchemas takes two tables and compares their column schemas to make sure they are compatible. Any mismatched columns are returned in the errors array. It covers most schema migration scenarios and returns the ALTER commands needed to perform them.
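The meaning of each of the three returned slices is not documented here, so the following is only a hypothetical sketch of the basic column diff behind ADD and DROP migrations. The column type and the diffColumns function are illustrative stand-ins, not this package's API.

```go
package main

import "fmt"

// column is a trimmed-down, hypothetical stand-in for ColInfo.
type column struct {
	Name string
	Type string
}

// diffColumns (hypothetical) compares the input and target column lists.
// Columns only in input become ADD COLUMN commands, columns only in target
// become DROP COLUMN commands, and columns whose types differ are reported
// as mismatches that plain ADD/DROP cannot fix.
func diffColumns(table string, input, target []column) (adds, drops, mismatches []string) {
	targetTypes := make(map[string]string, len(target))
	for _, c := range target {
		targetTypes[c.Name] = c.Type
	}
	inputNames := make(map[string]bool, len(input))
	for _, c := range input {
		inputNames[c.Name] = true
		t, ok := targetTypes[c.Name]
		switch {
		case !ok:
			adds = append(adds, fmt.Sprintf("ALTER TABLE %s ADD COLUMN %s %s", table, c.Name, c.Type))
		case t != c.Type:
			mismatches = append(mismatches, fmt.Sprintf("%s: %s -> %s", c.Name, t, c.Type))
		}
	}
	for _, c := range target {
		if !inputNames[c.Name] {
			drops = append(drops, fmt.Sprintf("ALTER TABLE %s DROP COLUMN %s", table, c.Name))
		}
	}
	return adds, drops, mismatches
}

func main() {
	target := []column{{"id", "integer"}, {"legacy", "boolean"}, {"name", "character varying(256)"}}
	input := []column{{"id", "integer"}, {"name", "character varying(512)"}, {"email", "character varying(256)"}}
	adds, drops, mismatches := diffColumns("public.users", input, target)
	fmt.Println(adds)
	fmt.Println(drops)
	fmt.Println(mismatches)
}
```

Redshift's ALTER TABLE accepts one ADD COLUMN or DROP COLUMN per statement, which is why the sketch emits one command per column.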

func ConvertDefaultValue

func ConvertDefaultValue(val string) string

func GetRedshiftDataType

func GetRedshiftDataType(sqlType, debeziumType, sourceColType,
	sourceColLength string, sourceColScale string,
	columnMasked bool) (string, error)

GetRedshiftDataType returns the Redshift data type that the sqlType's data type maps to.
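The real function also takes the Debezium type into account; the sketch below only illustrates how a MySQL source type, length, and scale could map onto the package constants. The function mapMySQLType and its exact table of mappings are assumptions, and character-length scaling for VARCHAR is left out for brevity.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// atoiOr parses s as an int, returning def when s is empty or invalid.
func atoiOr(s string, def int) int {
	if n, err := strconv.Atoi(s); err == nil {
		return n
	}
	return def
}

// mapMySQLType (hypothetical) maps a MySQL column type plus its length and
// scale to a Redshift type, using values from the package constants.
func mapMySQLType(colType, colLength, colScale string, masked bool) (string, error) {
	if masked {
		return "character varying(50)", nil // RedshiftMaskedDataType
	}
	switch strings.ToLower(colType) {
	case "tinyint", "smallint", "mediumint", "int", "integer":
		return "integer", nil // assumes signed values that fit in 4 bytes
	case "decimal", "numeric":
		precision, scale := atoiOr(colLength, 18), atoiOr(colScale, 0)
		if precision > 38 {
			precision = 38 // RedshiftNumericMaxLength
		}
		if scale > 37 {
			scale = 37 // RedshiftNumericMaxScale
		}
		return fmt.Sprintf("numeric(%d,%d)", precision, scale), nil
	case "date":
		return "date", nil
	case "datetime", "timestamp":
		return "timestamp without time zone", nil
	case "time":
		return "character varying(32)", nil // RedshiftTime
	case "char", "varchar":
		return fmt.Sprintf("character varying(%d)", atoiOr(colLength, 256)), nil
	case "text":
		return "character varying(65535)", nil // RedshiftStringMax
	default:
		return "", fmt.Errorf("unsupported source type %q", colType)
	}
}

func main() {
	t, err := mapMySQLType("decimal", "10", "2", false)
	fmt.Println(t, err)
}
```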

Types

type ColInfo

type ColInfo struct {
	Name         string     `json:"name"`
	Type         string     `json:"type"`
	DebeziumType string     `json:"debeziumtype"`
	SourceType   SourceType `yaml:"sourceType"`
	DefaultVal   string     `json:"defaultval"`
	NotNull      bool       `json:"notnull"`
	PrimaryKey   bool       `json:"primarykey"`
	SortOrdinal  int        `json:"sortord"`
	DistKey      bool       `json:"distkey"`
}

ColInfo is a struct that contains information about a column in a Redshift database. SortOrdinal and DistKey are meaningful only for Redshift.
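To show how the Redshift-specific fields could surface in DDL, here is a hypothetical sketch that renders a CREATE TABLE statement from ColInfo-like values. It assumes columns with SortOrdinal > 0 form the SORTKEY in SortOrdinal order and that the column flagged DistKey becomes the DISTKEY; colInfo and createTableSQL are illustrative names, not this package's CreateTable implementation.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// colInfo mirrors the subset of ColInfo fields used here.
type colInfo struct {
	Name        string
	Type        string
	DefaultVal  string
	NotNull     bool
	PrimaryKey  bool
	SortOrdinal int
	DistKey     bool
}

// quoteIdent double-quotes a SQL identifier, escaping embedded quotes.
func quoteIdent(s string) string {
	return `"` + strings.ReplaceAll(s, `"`, `""`) + `"`
}

// createTableSQL (hypothetical) renders a Redshift CREATE TABLE statement.
func createTableSQL(schema, table string, cols []colInfo) string {
	var defs, pks []string
	var sortCols []colInfo
	distKey := ""
	for _, c := range cols {
		def := quoteIdent(c.Name) + " " + c.Type
		if c.DefaultVal != "" {
			def += " DEFAULT " + c.DefaultVal
		}
		if c.NotNull {
			def += " NOT NULL"
		}
		defs = append(defs, def)
		if c.PrimaryKey {
			pks = append(pks, quoteIdent(c.Name))
		}
		if c.SortOrdinal > 0 {
			sortCols = append(sortCols, c)
		}
		if c.DistKey {
			distKey = c.Name
		}
	}
	if len(pks) > 0 {
		// A table-level constraint handles composite primary keys.
		defs = append(defs, "PRIMARY KEY ("+strings.Join(pks, ", ")+")")
	}
	stmt := fmt.Sprintf("CREATE TABLE %s.%s (%s)", quoteIdent(schema), quoteIdent(table), strings.Join(defs, ", "))
	if distKey != "" {
		stmt += " DISTKEY(" + quoteIdent(distKey) + ")"
	}
	if len(sortCols) > 0 {
		sort.Slice(sortCols, func(i, j int) bool { return sortCols[i].SortOrdinal < sortCols[j].SortOrdinal })
		names := make([]string, len(sortCols))
		for i, c := range sortCols {
			names[i] = quoteIdent(c.Name)
		}
		stmt += " SORTKEY(" + strings.Join(names, ", ") + ")"
	}
	return stmt
}

func main() {
	fmt.Println(createTableSQL("public", "events", []colInfo{
		{Name: "id", Type: "integer", NotNull: true, PrimaryKey: true, SortOrdinal: 2, DistKey: true},
		{Name: "created", Type: "timestamp without time zone", SortOrdinal: 1},
	}))
}
```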

type Meta

type Meta struct {
	Schema string `json:"schema"`
}

Meta holds information that might not be in Redshift or is inconvenient to access there; in this case, the schema that a table belongs to.

type QueryTotalRow

type QueryTotalRow struct {
	Database   string
	Schema     string
	TableName  string
	TableID    string
	QueryTotal float64
}

type Redshift

type Redshift struct {
	// contains filtered or unexported fields
}

Redshift wraps a dbExecCloser and can be used to perform operations on a Redshift database. Give it a context for the duration of the job.

func NewRedshift

func NewRedshift(conf RedshiftConfig) (*Redshift, error)

func (*Redshift) Begin

func (r *Redshift) Begin(ctx context.Context) (*sql.Tx, error)

Begin starts a new transaction within the given context.
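Several methods take a *sql.Tx, so callers typically begin a transaction, run the operations, and then commit or roll back. A minimal sketch of that pattern, using a hypothetical generic helper withTx (not part of this package) that works with *sql.Tx or any type with Commit and Rollback methods:

```go
package main

import (
	"errors"
	"fmt"
)

// txLike is the subset of *sql.Tx that the helper needs.
type txLike interface {
	Commit() error
	Rollback() error
}

// withTx (hypothetical) begins a transaction, runs fn, and commits on
// success or rolls back if fn fails.
func withTx[T txLike](begin func() (T, error), fn func(T) error) error {
	tx, err := begin()
	if err != nil {
		return err
	}
	if err := fn(tx); err != nil {
		if rbErr := tx.Rollback(); rbErr != nil {
			return fmt.Errorf("%w (rollback failed: %v)", err, rbErr)
		}
		return err
	}
	return tx.Commit()
}

// fakeTx records what happened; it stands in for *sql.Tx in this demo.
type fakeTx struct{ committed, rolledBack bool }

func (f *fakeTx) Commit() error   { f.committed = true; return nil }
func (f *fakeTx) Rollback() error { f.rolledBack = true; return nil }

var errCopyFailed = errors.New("copy failed")

func main() {
	ft := &fakeTx{}
	err := withTx(func() (*fakeTx, error) { return ft, nil },
		func(*fakeTx) error { return errCopyFailed })
	fmt.Println(err, ft.committed, ft.rolledBack) // copy failed false true
}
```

With this package, begin would be `func() (*sql.Tx, error) { return r.Begin(ctx) }` and fn would call methods such as Copy with the transaction it receives.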

func (*Redshift) Copy

func (r *Redshift) Copy(ctx context.Context, tx *sql.Tx,
	schema string, table string, s3ManifestURI string,
	typeJson bool, typeCsv bool, comupdateOff bool, statupdateOff bool) error

Copy loads data into Redshift using a manifest file. It is meant to be run in a transaction, so a *sql.Tx must be passed as the tx argument.
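The boolean parameters presumably map onto Redshift COPY options. The sketch below builds such a statement under that assumption, authenticating with the access-key pair from RedshiftConfig; copySQL is a hypothetical helper, not the SQL this package actually emits.

```go
package main

import (
	"fmt"
	"strings"
)

func quoteIdent(s string) string   { return `"` + strings.ReplaceAll(s, `"`, `""`) + `"` }
func quoteLiteral(s string) string { return "'" + strings.ReplaceAll(s, "'", "''") + "'" }

// copySQL (hypothetical) builds a COPY statement that loads every file
// listed in the manifest at manifestURI into schema.table.
func copySQL(schema, table, manifestURI, keyID, secret string,
	typeJSON, typeCSV, compupdateOff, statupdateOff bool) string {
	parts := []string{
		"COPY " + quoteIdent(schema) + "." + quoteIdent(table),
		"FROM " + quoteLiteral(manifestURI),
		"CREDENTIALS " + quoteLiteral("aws_access_key_id="+keyID+";aws_secret_access_key="+secret),
		"MANIFEST", // FROM points at a manifest, not at the data files
	}
	switch {
	case typeJSON:
		parts = append(parts, "FORMAT AS JSON 'auto'")
	case typeCSV:
		parts = append(parts, "CSV")
	}
	if compupdateOff {
		parts = append(parts, "COMPUPDATE OFF") // skip automatic compression analysis
	}
	if statupdateOff {
		parts = append(parts, "STATUPDATE OFF") // skip automatic statistics update
	}
	return strings.Join(parts, " ")
}

func main() {
	fmt.Println(copySQL("public", "users", "s3://bucket/load/manifest", "AKID", "SECRET", true, false, true, false))
}
```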

func (*Redshift) CreateSchema

func (r *Redshift) CreateSchema(ctx context.Context, schema string) error

func (*Redshift) CreateTable

func (r *Redshift) CreateTable(ctx context.Context, tx *sql.Tx, table Table) error

func (*Redshift) DeDupe

func (r *Redshift) DeDupe(ctx context.Context, tx *sql.Tx, schema string, table string,
	targetTablePrimaryKeys []string, stagingTablePrimaryKey string) error

DeDupe deletes duplicate rows from the Redshift table and keeps only the latest one. It runs inside the given transaction.
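DeDupe works in SQL, but its intended result can be illustrated in memory. This sketch assumes "latest" means the row with the highest staging-table id for each target primary key; the row type and dedupeLatest are hypothetical.

```go
package main

import "fmt"

// row is a hypothetical staging row: Key is the target table's primary key
// and Seq is a monotonically increasing staging id (larger means newer).
type row struct {
	Key   string
	Seq   int
	Value string
}

// dedupeLatest keeps only the newest row for each key, preserving the order
// in which keys first appear.
func dedupeLatest(rows []row) []row {
	index := make(map[string]int) // key -> position in out
	var out []row
	for _, r := range rows {
		i, seen := index[r.Key]
		switch {
		case !seen:
			index[r.Key] = len(out)
			out = append(out, r)
		case r.Seq > out[i].Seq:
			out[i] = r // a newer version replaces the older one
		}
	}
	return out
}

func main() {
	fmt.Println(dedupeLatest([]row{{"a", 1, "old"}, {"b", 2, "b1"}, {"a", 3, "new"}}))
}
```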

func (*Redshift) DeleteColumn

func (r *Redshift) DeleteColumn(ctx context.Context, tx *sql.Tx, schema string, table string,
	columnName string, columnValue string) error

func (*Redshift) DeleteCommon

func (r *Redshift) DeleteCommon(ctx context.Context, tx *sql.Tx, schema string, stagingTable string,
	targetTable string, commonColumns []string) error

DeleteCommon deletes the rows of targetTable that match a row of stagingTable on commonColumns.
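Deleting the matching target rows is the first half of the delete-then-insert merge that AWS documents for Redshift staging tables; afterwards the staging rows are inserted into the target. A hypothetical sketch of the DELETE ... USING statement this step could run (deleteCommonSQL is illustrative, not this package's code):

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

func quoteIdent(s string) string { return `"` + strings.ReplaceAll(s, `"`, `""`) + `"` }

// deleteCommonSQL (hypothetical) deletes target rows that share every
// common column value with some staging row.
func deleteCommonSQL(schema, staging, target string, commonColumns []string) (string, error) {
	if len(commonColumns) == 0 {
		// Without a join condition the DELETE would remove every target row.
		return "", errors.New("no common columns given")
	}
	conds := make([]string, len(commonColumns))
	for i, c := range commonColumns {
		conds[i] = fmt.Sprintf("%s.%s = %s.%s", quoteIdent(target), quoteIdent(c), quoteIdent(staging), quoteIdent(c))
	}
	return fmt.Sprintf("DELETE FROM %s.%s USING %s.%s WHERE %s",
		quoteIdent(schema), quoteIdent(target), quoteIdent(schema), quoteIdent(staging),
		strings.Join(conds, " AND ")), nil
}

func main() {
	stmt, err := deleteCommonSQL("public", "users_staged", "users", []string{"id"})
	fmt.Println(stmt, err)
}
```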

func (*Redshift) DropColumn

func (r *Redshift) DropColumn(ctx context.Context, tx *sql.Tx, schema string, table string,
	columnName string) error

func (*Redshift) DropTable

func (r *Redshift) DropTable(ctx context.Context, tx *sql.Tx, schema string, table string) error

func (*Redshift) DropTableWithCascade

func (r *Redshift) DropTableWithCascade(ctx context.Context, tx *sql.Tx, schema string, table string) error

func (*Redshift) GetTableMetadata

func (r *Redshift) GetTableMetadata(ctx context.Context, schema, tableName string) (*Table, error)

GetTableMetadata looks up a table and returns its Table representation. If the table does not exist, it returns an empty table rather than an error.

func (*Redshift) GrantSchemaAccess

func (r *Redshift) GrantSchemaAccess(
	ctx context.Context,
	tx *sql.Tx,
	schema string,
	table string,
	group string,
) error

func (*Redshift) RenameTable

func (r *Redshift) RenameTable(
	ctx context.Context,
	tx *sql.Tx,
	schema string,
	sourceTableName string,
	destTableName string,
) error

func (*Redshift) ReplaceTable

func (r *Redshift) ReplaceTable(
	ctx context.Context, tx *sql.Tx, unLoadS3Key string, copyS3ManifestKey string,
	inputTable, targetTable Table) error

ReplaceTable replaces the current table with a table that has the new schema. This is required in Redshift because ALTER COLUMN is not supported for all column types. The steps are:

  1. Rename the table t1 to t1_migrating.
  2. Create the table t1 with the new schema.
  3. UNLOAD the data of the renamed table t1_migrating to S3.
  4. COPY the unloaded data from S3 into the new table t1.
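The four steps can be sketched as an ordered list of statements. In this hypothetical sketch, createDDL is the CREATE TABLE statement for the new schema and auth is a Redshift authorization clause such as IAM_ROLE '...'; replacePlan is illustrative and not this package's implementation.

```go
package main

import (
	"fmt"
	"strings"
)

func quoteIdent(s string) string   { return `"` + strings.ReplaceAll(s, `"`, `""`) + `"` }
func quoteLiteral(s string) string { return "'" + strings.ReplaceAll(s, "'", "''") + "'" }

// replacePlan (hypothetical) returns the rename, create, unload and copy
// statements in the order ReplaceTable describes.
func replacePlan(schema, table, createDDL, s3Prefix, auth string) []string {
	migrating := table + "_migrating"
	qualified := func(t string) string { return quoteIdent(schema) + "." + quoteIdent(t) }
	return []string{
		// 1. Move the existing table out of the way.
		fmt.Sprintf("ALTER TABLE %s RENAME TO %s", qualified(table), quoteIdent(migrating)),
		// 2. Create the table again with the new schema.
		createDDL,
		// 3. Export the old rows; MANIFEST writes s3Prefix + "manifest".
		fmt.Sprintf("UNLOAD (%s) TO %s %s MANIFEST",
			quoteLiteral("SELECT * FROM "+qualified(migrating)), quoteLiteral(s3Prefix), auth),
		// 4. Load the exported rows into the new table.
		fmt.Sprintf("COPY %s FROM %s %s MANIFEST",
			qualified(table), quoteLiteral(s3Prefix+"manifest"), auth),
	}
}

func main() {
	for _, stmt := range replacePlan("public", "t1", `CREATE TABLE "public"."t1" ("id" bigint)`,
		"s3://bucket/migrate/t1/", "IAM_ROLE 'arn:aws:iam::123456789012:role/demo'") {
		fmt.Println(stmt)
	}
}
```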

func (*Redshift) ScanQueryTotal

func (r *Redshift) ScanQueryTotal(ctx context.Context) ([]QueryTotalRow, error)

ScanQueryTotal reads a view built on top of the STL_SCAN table to get the total number of queries executed against each table.

TODO: expects the view redshiftsink_operator.scan_query_total to be present.

func (*Redshift) SchemaExist

func (r *Redshift) SchemaExist(ctx context.Context, schema string) (bool, error)

func (*Redshift) TableExist

func (r *Redshift) TableExist(ctx context.Context, schema string, table string) (bool, error)

func (*Redshift) Unload

func (r *Redshift) Unload(ctx context.Context, tx *sql.Tx,
	schema string, table string, s3Key string, removeDuplicate bool) error

Unload copies the data in the table to S3 and generates a manifest file at the path s3Key + "manifest".
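With the MANIFEST option, Redshift's UNLOAD writes the manifest to the given prefix followed by "manifest", which matches the path above. The sketch below assumes removeDuplicate is implemented with SELECT DISTINCT and that the access keys from RedshiftConfig are used for authorization; unloadSQL and manifestPath are hypothetical helpers.

```go
package main

import (
	"fmt"
	"strings"
)

func quoteIdent(s string) string   { return `"` + strings.ReplaceAll(s, `"`, `""`) + `"` }
func quoteLiteral(s string) string { return "'" + strings.ReplaceAll(s, "'", "''") + "'" }

// unloadSQL (hypothetical) exports schema.table under s3Key.
func unloadSQL(schema, table, s3Key, keyID, secret string, removeDuplicate bool) string {
	sel := "SELECT"
	if removeDuplicate {
		sel += " DISTINCT" // assumption: duplicates are dropped with DISTINCT
	}
	query := fmt.Sprintf("%s * FROM %s.%s", sel, quoteIdent(schema), quoteIdent(table))
	return fmt.Sprintf("UNLOAD (%s) TO %s CREDENTIALS %s MANIFEST",
		quoteLiteral(query), quoteLiteral(s3Key),
		quoteLiteral("aws_access_key_id="+keyID+";aws_secret_access_key="+secret))
}

// manifestPath returns where UNLOAD ... MANIFEST writes the manifest file.
func manifestPath(s3Key string) string { return s3Key + "manifest" }

func main() {
	fmt.Println(unloadSQL("public", "t1", "s3://bucket/unload/t1/", "AKID", "SECRET", true))
	fmt.Println(manifestPath("s3://bucket/unload/t1/"))
}
```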

func (*Redshift) UpdateTable

func (r *Redshift) UpdateTable(ctx context.Context, inputTable, targetTable Table) (bool, error)

UpdateTable migrates the table schema using the following three strategies:

  1. Strategy1: inplace-migration-varchar-type. Changes the length of a VARCHAR column. Executed by this function.
  2. Strategy2: inplace-migration using ALTER commands. Supports AddCol and DropCol.
  3. Strategy3: table-migration using UNLOAD, COPY and a temporary table. Supports all other migration scenarios. Executed by ReplaceTable(), triggered by this function.
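Redshift documents ALTER COLUMN ... TYPE only for changing the size of VARCHAR columns, which is why other type changes fall back to a table migration. A hypothetical way to pick the strategy is to take the most invasive one that any single change requires; the change classification below is an assumption, not this package's logic.

```go
package main

import "fmt"

type strategy int

const (
	noMigration    strategy = iota // schemas already match
	inplaceVarchar                 // Strategy1: ALTER COLUMN ... TYPE varchar(n)
	inplaceAlter                   // Strategy2: ADD COLUMN / DROP COLUMN
	tableMigration                 // Strategy3: ReplaceTable (UNLOAD + COPY)
)

type changeKind int

const (
	addColumn changeKind = iota
	dropColumn
	varcharResize
	otherChange // any other type or attribute change
)

// chooseStrategy (hypothetical) returns the most invasive strategy needed;
// the strategy constants are ordered from least to most invasive.
func chooseStrategy(changes []changeKind) strategy {
	s := noMigration
	for _, c := range changes {
		need := tableMigration
		switch c {
		case varcharResize:
			need = inplaceVarchar
		case addColumn, dropColumn:
			need = inplaceAlter
		}
		if need > s {
			s = need
		}
	}
	return s
}

func main() {
	fmt.Println(chooseStrategy([]changeKind{varcharResize, addColumn})) // 2
}
```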

type RedshiftCollector

type RedshiftCollector struct {
	// contains filtered or unexported fields
}

func NewRedshiftCollector

func NewRedshiftCollector(clients []*Redshift) *RedshiftCollector

func (*RedshiftCollector) Collect

func (c *RedshiftCollector) Collect(ch chan<- prometheus.Metric)

func (*RedshiftCollector) Describe

func (c *RedshiftCollector) Describe(ch chan<- *prometheus.Desc)

func (*RedshiftCollector) Fetch

func (c *RedshiftCollector) Fetch(ctx context.Context, wg *sync.WaitGroup)

type RedshiftConfig

type RedshiftConfig struct {
	Schema            string `yaml:"schema"`
	TableSuffix       string `yaml:"tableSuffix"`
	Host              string `yaml:"host"`
	Port              string `yaml:"port"`
	Database          string `yaml:"database"`
	User              string `yaml:"user"`
	Password          string `yaml:"password"`
	Timeout           int    `yaml:"timeout"`
	S3AccessKeyId     string `yaml:"s3AccessKeyId"`
	S3SecretAccessKey string `yaml:"s3SecretAccessKey"`
	Stats             bool   `yaml:"stats"`
	MaxOpenConns      int    `yaml:"maxOpenConns"`
	MaxIdleConns      int    `yaml:"maxIdleConns"`
}
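RedshiftConfig carries the usual PostgreSQL-protocol connection settings. The sketch below shows one way they could become a key=value connection string of the kind accepted by drivers such as lib/pq; it assumes Timeout is a connect timeout in seconds, and dsn is a hypothetical helper. In database/sql, MaxOpenConns and MaxIdleConns correspond to DB.SetMaxOpenConns and DB.SetMaxIdleConns.

```go
package main

import (
	"fmt"
	"strings"
)

// redshiftConfig mirrors the connection fields of RedshiftConfig.
type redshiftConfig struct {
	Host, Port, Database, User, Password string
	Timeout                              int
}

// quoteValue quotes a libpq-style connection value, escaping backslashes
// and single quotes with a backslash.
func quoteValue(v string) string {
	v = strings.ReplaceAll(v, `\`, `\\`)
	v = strings.ReplaceAll(v, `'`, `\'`)
	return "'" + v + "'"
}

// dsn (hypothetical) builds a key=value connection string.
func dsn(c redshiftConfig) string {
	return fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s connect_timeout=%d",
		quoteValue(c.Host), quoteValue(c.Port), quoteValue(c.Database),
		quoteValue(c.User), quoteValue(c.Password), c.Timeout)
}

func main() {
	conf := redshiftConfig{Host: "redshift.example.com", Port: "5439", Database: "dev",
		User: "loader", Password: "secret", Timeout: 10}
	fmt.Println(dsn(conf))
	// With a registered driver, the pool limits would then be applied via
	// db.SetMaxOpenConns and db.SetMaxIdleConns on the *sql.DB from sql.Open.
}
```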

type SourceType

type SourceType struct {
	ColumnLength string `yaml:"columnLength"`
	ColumnType   string `yaml:"columnType"`
	ColumnScale  string `yaml:"columnScale"`
}

type Table

type Table struct {
	Name    string    `json:"dest"`
	Columns []ColInfo `json:"columns"`
	Meta    Meta      `json:"meta"`
}

Table is a representation of a Redshift table.

func NewTable

func NewTable(t Table) *Table
