sling

package
v1.2.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 22, 2024 License: GPL-3.0 Imports: 40 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ShowProgress = true
View Source
var SourceDBOptionsDefault = SourceOptions{
	EmptyAsNull:    g.Bool(true),
	NullIf:         g.String("NULL"),
	DatetimeFormat: "AUTO",
	MaxDecimals:    g.Int(-1),
}
View Source
var SourceFileOptionsDefault = SourceOptions{
	TrimSpace:      g.Bool(false),
	EmptyAsNull:    g.Bool(true),
	Header:         g.Bool(true),
	Flatten:        g.Bool(false),
	Compression:    iop.CompressorTypePtr(iop.AutoCompressorType),
	NullIf:         g.String("NULL"),
	DatetimeFormat: "AUTO",
	SkipBlankLines: g.Bool(false),

	FieldsPerRec: g.Int(-1),
	MaxDecimals:  g.Int(-1),
}
View Source
var StoreInsert, StoreUpdate func(t *TaskExecution)

Set in the store/store.go file for history keeping

View Source
var TargetDBOptionsDefault = TargetOptions{
	FileMaxRows: lo.Ternary(
		os.Getenv("FILE_MAX_ROWS") != "",
		cast.ToInt64(os.Getenv("FILE_MAX_ROWS")),
		0,
	),
	UseBulk:          g.Bool(true),
	AddNewColumns:    g.Bool(true),
	AdjustColumnType: g.Bool(false),
	DatetimeFormat:   "auto",
	MaxDecimals:      g.Int(-1),
	ColumnCasing:     (*ColumnCasing)(g.String(string(SourceColumnCasing))),
}
View Source
var TargetFileOptionsDefault = TargetOptions{
	Header: g.Bool(true),
	Compression: lo.Ternary(
		os.Getenv("COMPRESSION") != "",
		iop.CompressorTypePtr(iop.CompressorType(os.Getenv("COMPRESSION"))),
		iop.CompressorTypePtr(iop.AutoCompressorType),
	),
	Concurrency: lo.Ternary(
		os.Getenv("CONCURRENCY") != "",
		cast.ToInt(os.Getenv("CONCURRENCY")),
		7,
	),
	FileMaxRows: lo.Ternary(
		os.Getenv("FILE_MAX_ROWS") != "",
		cast.ToInt64(os.Getenv("FILE_MAX_ROWS")),
		0,
	),
	FileMaxBytes: lo.Ternary(
		os.Getenv("FILE_MAX_BYTES") != "",
		cast.ToInt64(os.Getenv("FILE_MAX_BYTES")),
		0,
	),
	Format:         filesys.FileTypeNone,
	UseBulk:        g.Bool(true),
	AddNewColumns:  g.Bool(true),
	DatetimeFormat: "auto",
	Delimiter:      ",",
	MaxDecimals:    g.Int(-1),
	ColumnCasing:   (*ColumnCasing)(g.String(string(SourceColumnCasing))),
}

Functions

func ClientDelete

func ClientDelete(serverURL string, route RouteName, m map[string]interface{}, headers map[string]string) (respStr string, err error)

ClientDelete sends a DELETE request

func ClientGet

func ClientGet(serverURL string, route RouteName, m map[string]interface{}, headers map[string]string) (respStr string, err error)

ClientGet sends a GET request

func ClientOptions

func ClientOptions(serverURL string, route RouteName, m map[string]interface{}, headers map[string]string) (respStr string, err error)

ClientOptions sends an OPTIONS request

func ClientPatch

func ClientPatch(serverURL string, route RouteName, m map[string]interface{}, headers map[string]string) (respStr string, err error)

ClientPatch sends a PATCH request

func ClientPost

func ClientPost(serverURL string, route RouteName, m map[string]interface{}, headers map[string]string) (respStr string, err error)

ClientPost sends a POST request

func ClientPut

func ClientPut(serverURL string, route RouteName, m map[string]interface{}, headers map[string]string) (respStr string, err error)

ClientPut sends a PUT request

func Decode added in v1.0.31

func Decode(sp *iop.StreamProcessor, decoder *encoding.Decoder, val string) (string, error)

func ErrorHelper added in v1.0.61

func ErrorHelper(err error) (helpString string)

func GetJWTFromKey

func GetJWTFromKey(masterServerURL, key string) (string, error)

GetJWTFromKey logs in and returns the JWT based on the provided key

func GetSQLText added in v1.1.14

func GetSQLText(sqlStringPath string) (string, error)

GetSQLText processes the source SQL file / text

func LoadProject

func LoadProject(path string)

func NewExecID added in v1.1.13

func NewExecID() string

func ParseBit added in v1.0.68

func ParseBit(sp *iop.StreamProcessor, val string) (string, error)

func ParseFIX added in v1.1.13

func ParseFIX(sp *iop.StreamProcessor, message string) (string, error)

ParseFIX converts a FIX message into a json format

func ParseUUID added in v1.0.31

func ParseUUID(sp *iop.StreamProcessor, val string) (string, error)

func Replace0x00 added in v1.0.70

func Replace0x00(sp *iop.StreamProcessor, val string) (string, error)

func SHA256 added in v1.1.14

func SHA256(val string) string

func SHA512 added in v1.1.14

func SHA512(val string) string

func SetStreamDefaults added in v0.86.36

func SetStreamDefaults(stream *ReplicationStreamConfig, replicationCfg ReplicationConfig)

func Sling

func Sling(cfg *Config) (err error)

Sling accepts a configuration and runs an Extract-Load task

Types

type ColumnCasing added in v1.0.50

type ColumnCasing string

ColumnCasing is the casing method to use

const (
	SourceColumnCasing ColumnCasing = "source" // keeps source column name casing. The default.
	TargetColumnCasing ColumnCasing = "target" // converts casing according to target database. Lower-case for files.
	SnakeColumnCasing  ColumnCasing = "snake"  // converts snake casing according to target database. Lower-case for files.
)

type Config

type Config struct {
	Source  Source            `json:"source,omitempty" yaml:"source,omitempty"`
	Target  Target            `json:"target" yaml:"target"`
	Mode    Mode              `json:"mode,omitempty" yaml:"mode,omitempty"`
	Options ConfigOptions     `json:"options,omitempty" yaml:"options,omitempty"`
	Env     map[string]string `json:"env,omitempty" yaml:"env,omitempty"`

	StreamName      string                `json:"stream_name,omitempty" yaml:"stream_name,omitempty"`
	SrcConn         connection.Connection `json:"_src_conn,omitempty" yaml:"_src_conn,omitempty"`
	TgtConn         connection.Connection `json:"_tgt_conn,omitempty" yaml:"_tgt_conn,omitempty"`
	Prepared        bool                  `json:"_prepared,omitempty" yaml:"_prepared,omitempty"`
	IncrementalVal  string                `json:"-" yaml:"-"`
	ReplicationMode bool                  `json:"-" yaml:"-"`

	MetadataLoadedAt  bool `json:"-" yaml:"-"`
	MetadataStreamURL bool `json:"-" yaml:"-"`
	MetadataRowNum    bool `json:"-" yaml:"-"`
	MetadataRowID     bool `json:"-" yaml:"-"`
}

Config is the new config struct

func NewConfig

func NewConfig(cfgStr string) (cfg *Config, err error)

NewConfig returns a config object from a YAML / JSON string

func (*Config) AsReplication added in v1.1.14

func (cfg *Config) AsReplication() (rc ReplicationConfig)

func (*Config) DetermineType added in v0.84.9

func (cfg *Config) DetermineType() (Type JobType, err error)

func (*Config) FormatTargetObjectName added in v0.85.54

func (cfg *Config) FormatTargetObjectName() (err error)

func (*Config) GetFormatMap added in v1.0.31

func (cfg *Config) GetFormatMap() (m map[string]any, err error)

GetFormatMap returns a map of variables used to format a string

func (*Config) HasWildcard added in v1.1.14

func (cfg *Config) HasWildcard() bool

func (*Config) MD5 added in v1.1.6

func (cfg *Config) MD5() string

func (*Config) Marshal

func (cfg *Config) Marshal() (cfgBytes []byte, err error)

Marshal marshals into JSON

func (*Config) Prepare

func (cfg *Config) Prepare() (err error)

Prepare prepares the config

func (*Config) Scan

func (cfg *Config) Scan(value interface{}) error

Scan scans the value into Jsonb; it implements the sql.Scanner interface

func (*Config) SetDefault

func (cfg *Config) SetDefault()

SetDefault sets default options

func (*Config) Unmarshal

func (cfg *Config) Unmarshal(cfgStr string) error

Unmarshal parses a configuration file path or config text

func (Config) Value

func (cfg Config) Value() (driver.Value, error)

Value returns the JSON value; it implements the driver.Valuer interface

type ConfigOptions

type ConfigOptions struct {
	Debug  bool `json:"debug,omitempty" yaml:"debug,omitempty"`
	StdIn  bool `json:"-"`                                        // whether stdin is passed
	StdOut bool `json:"stdout,omitempty" yaml:"stdout,omitempty"` // whether to output to stdout
}

ConfigOptions are configuration options

type ExecStatus

type ExecStatus string

ExecStatus is the status of an execution

const (
	// ExecStatusCreated = created
	ExecStatusCreated ExecStatus = "created"
	// ExecStatusQueued = queued
	ExecStatusQueued ExecStatus = "queued"
	// ExecStatusStarted = started
	ExecStatusStarted ExecStatus = "started"
	// ExecStatusRunning = running
	ExecStatusRunning ExecStatus = "running"
	// ExecStatusSuccess = success
	ExecStatusSuccess ExecStatus = "success"
	// ExecStatusTerminated = terminated
	ExecStatusTerminated ExecStatus = "terminated"
	// ExecStatusInterrupted = interrupted
	ExecStatusInterrupted ExecStatus = "interrupted"
	// ExecStatusTimedOut = timed-out (when no heartbeat sent for 30 sec)
	ExecStatusTimedOut ExecStatus = "timed-out"
	// ExecStatusError = error
	ExecStatusError ExecStatus = "error"
	// ExecStatusSkipped = skipped
	ExecStatusSkipped ExecStatus = "skipped"
	// ExecStatusStalled = stalled (when still heartbeating, but rows are unchanged for a while)
	ExecStatusStalled ExecStatus = "stalled"
)

func (ExecStatus) IsFailure

func (s ExecStatus) IsFailure() bool

IsFailure returns true if an execution has failed

func (ExecStatus) IsFinished

func (s ExecStatus) IsFinished() bool

IsFinished returns true if an execution is finished

func (ExecStatus) IsRunning

func (s ExecStatus) IsRunning() bool

IsRunning returns true if an execution is running

func (ExecStatus) IsSuccess

func (s ExecStatus) IsSuccess() bool

IsSuccess returns true if an execution is successful

type ExecutionStatus

type ExecutionStatus struct {
	JobID       int        `json:"job_id,omitempty"`
	ExecID      int64      `json:"exec_id,omitempty"`
	Status      ExecStatus `json:"status,omitempty"`
	Text        string     `json:"text,omitempty"`
	Rows        uint64     `json:"rows,omitempty"`
	Bytes       uint64     `json:"bytes,omitempty"`
	Percent     int        `json:"percent,omitempty"`
	Stalled     bool       `json:"stalled,omitempty"`
	Duration    *int       `json:"duration,omitempty"`
	AvgDuration int        `json:"avg_duration,omitempty"`
}

ExecutionStatus is an execution status object

type JobType

type JobType string

JobType is an enum type for jobs

const ConnDiscover JobType = "conn-discover"

ConnDiscover is for a connection discover

const ConnExec JobType = "conn-exec"

ConnExec is for a connection exec

const ConnTest JobType = "conn-test"

ConnTest is for a connection test

const DbSQL JobType = "db-sql"

DbSQL is for a db sql query

const DbToDb JobType = "db-db"

DbToDb is from db to db

const DbToFile JobType = "db-file"

DbToFile is from db to file

const FileToDB JobType = "file-db"

FileToDB is from file to db

const FileToFile JobType = "file-file"

FileToFile is from file to file

type Mode

type Mode string

Mode is a load mode

const (
	// TruncateMode is to truncate
	TruncateMode Mode = "truncate"
	// FullRefreshMode is to drop
	FullRefreshMode Mode = "full-refresh"
	// IncrementalMode is to incremental
	IncrementalMode Mode = "incremental"
	// SnapshotMode is to snapshot
	SnapshotMode Mode = "snapshot"
	// BackfillMode is to backfill
	BackfillMode Mode = "backfill"
)

type NotificationConfig

type NotificationConfig struct {
	Name        string   `json:"name"`
	Emails      []string `json:"emails"`
	Slack       bool     `json:"slack"`
	MsTeams     bool     `json:"msteams"`
	WebhookURLs []string `json:"webhook_urls"` // urls
	OnSuccess   bool     `json:"on_success"`
	OnFailure   bool     `json:"on_failure"`
	OnLinger    bool     `json:"on_linger"`
	OnEmpty     bool     `json:"on_empty"`
}

type ProgressBar

type ProgressBar struct {
	// contains filtered or unexported fields
}

func NewPBar

func NewPBar(d time.Duration) *ProgressBar

NewPBar creates a new progress bar

func (*ProgressBar) Finish

func (pb *ProgressBar) Finish()

func (*ProgressBar) SetStatus

func (pb *ProgressBar) SetStatus(status string)

SetStatus sets the progress bar status

func (*ProgressBar) Start

func (pb *ProgressBar) Start()

type Project

type Project struct {
	Config      ProjectConfig
	TaskConfigs map[string]Config
}

type ProjectConfig

type ProjectConfig struct {
	Project          string                        `json:"project" yaml:"project"`
	TaskPaths        []string                      `json:"task-paths" yaml:"task-paths"`
	Defaults         map[string]interface{}        `json:"defaults" yaml:"defaults"`
	NotificationTags map[string]NotificationConfig `json:"notification_tags" yaml:"notification_tags"`
}

type ReplicationConfig added in v0.86.36

type ReplicationConfig struct {
	Source   string                              `json:"source,omitempty" yaml:"source,omitempty"`
	Target   string                              `json:"target,omitempty" yaml:"target,omitempty"`
	Defaults ReplicationStreamConfig             `json:"defaults,omitempty" yaml:"defaults,omitempty"`
	Streams  map[string]*ReplicationStreamConfig `json:"streams,omitempty" yaml:"streams,omitempty"`
	Env      map[string]any                      `json:"env,omitempty" yaml:"env,omitempty"`
	// contains filtered or unexported fields
}

func LoadReplicationConfig added in v0.87.17

func LoadReplicationConfig(cfgPath string) (config ReplicationConfig, err error)

func UnmarshalReplication added in v0.86.39

func UnmarshalReplication(replicYAML string) (config ReplicationConfig, err error)

UnmarshalReplication converts a yaml file to a replication

func (*ReplicationConfig) AddStream added in v1.1.15

func (rd *ReplicationConfig) AddStream(key string, cfg *ReplicationStreamConfig)

func (*ReplicationConfig) Compile added in v1.2.2

func (rd *ReplicationConfig) Compile(cfgOverwrite *Config, selectStreams ...string) (tasks []Config, err error)

TODO: Compile

func (*ReplicationConfig) DeleteStream added in v1.1.15

func (rd *ReplicationConfig) DeleteStream(key string)

func (ReplicationConfig) GetStream added in v1.1.15

func (rd ReplicationConfig) GetStream(name string) (streamName string, cfg *ReplicationStreamConfig, found bool)

GetStream returns the stream if it exists

func (*ReplicationConfig) MD5 added in v1.1.6

func (rd *ReplicationConfig) MD5() string

MD5 returns a md5 hash of the config

func (ReplicationConfig) MatchStreams added in v1.1.15

func (rd ReplicationConfig) MatchStreams(pattern string) (streams map[string]*ReplicationStreamConfig)

MatchStreams returns the streams that match the given pattern

func (ReplicationConfig) Normalize added in v1.1.14

func (rd ReplicationConfig) Normalize(n string) string

Normalize normalizes the name

func (*ReplicationConfig) OriginalCfg added in v1.0.63

func (rd *ReplicationConfig) OriginalCfg() string

OriginalCfg returns original config

func (*ReplicationConfig) ProcessWildcards added in v0.87.18

func (rd *ReplicationConfig) ProcessWildcards() (err error)

ProcessWildcards processes the streams using wildcards such as `my_schema.*` or `my_schema.my_prefix_*` or `my_schema.*_my_suffix`

func (*ReplicationConfig) ProcessWildcardsDatabase added in v1.0.68

func (rd *ReplicationConfig) ProcessWildcardsDatabase(c connection.ConnEntry, wildcardNames []string) (err error)

func (*ReplicationConfig) ProcessWildcardsFile added in v1.0.68

func (rd *ReplicationConfig) ProcessWildcardsFile(c connection.ConnEntry, wildcardNames []string) (err error)

func (*ReplicationConfig) Scan added in v0.86.36

func (rd *ReplicationConfig) Scan(value interface{}) error

Scan scans the value into Jsonb; it implements the sql.Scanner interface

func (ReplicationConfig) StreamsOrdered added in v1.0.6

func (rd ReplicationConfig) StreamsOrdered() []string

StreamsOrdered returns the stream names as ordered in the YAML file

func (ReplicationConfig) Value added in v0.86.36

func (rd ReplicationConfig) Value() (driver.Value, error)

Value returns the JSON value; it implements the driver.Valuer interface

type ReplicationStreamConfig added in v0.86.36

type ReplicationStreamConfig struct {
	Mode          Mode           `json:"mode,omitempty" yaml:"mode,omitempty"`
	Object        string         `json:"object,omitempty" yaml:"object,omitempty"`
	Select        []string       `json:"select,omitempty" yaml:"select,flow,omitempty"`
	PrimaryKeyI   any            `json:"primary_key,omitempty" yaml:"primary_key,flow,omitempty"`
	UpdateKey     string         `json:"update_key,omitempty" yaml:"update_key,omitempty"`
	SQL           string         `json:"sql,omitempty" yaml:"sql,omitempty"`
	Schedule      []string       `json:"schedule,omitempty" yaml:"schedule,omitempty"`
	SourceOptions *SourceOptions `json:"source_options,omitempty" yaml:"source_options,omitempty"`
	TargetOptions *TargetOptions `json:"target_options,omitempty" yaml:"target_options,omitempty"`
	Disabled      bool           `json:"disabled,omitempty" yaml:"disabled,omitempty"`

	State *StreamIncrementalState `json:"state,omitempty" yaml:"state,omitempty"`
}

func (*ReplicationStreamConfig) PrimaryKey added in v0.86.36

func (s *ReplicationStreamConfig) PrimaryKey() []string

type RouteName

type RouteName string

RouteName is the name of a route

const (
	RouteStatus         RouteName = "/status"
	RouteNotice         RouteName = "/notice"
	RouteError          RouteName = "/error"
	RouteSignUpUser     RouteName = "/sign-up"
	RouteUser           RouteName = "/user"
	RouteForgotPassword RouteName = "/forgot-password"
	RouteResetPassword  RouteName = "/reset-password"
	RouteLogin          RouteName = "/login"
	RouteLogout         RouteName = "/logout"
	RouteProxy          RouteName = "/p"

	RouteAppIndex  RouteName = "/app"
	RouteAppLogin  RouteName = "/app/login"
	RouteAppLogout RouteName = "/app/logout"
	RouteAppAPIKey RouteName = "/app/apikey"

	RouteAPI               RouteName = "/api/v1"
	RouteMasterStatus      RouteName = "/api/v1/master-status"
	RouteMasterDBReset     RouteName = "/api/v1/master-db-reset"
	RouteUploads           RouteName = "/api/v1/uploads"
	RouteAPIAccounts       RouteName = "/api/v1/accounts"
	RouteAPIProjects       RouteName = "/api/v1/projects"
	RouteAPIKey            RouteName = "/api/v1/apikey"
	RouteAPIUsers          RouteName = "/api/v1/users"
	RouteAPIJobs           RouteName = "/api/v1/jobs"
	RouteAPILogs           RouteName = "/api/v1/logs"
	RouteAPIExecutions     RouteName = "/api/v1/executions"
	RouteAPIConnections    RouteName = "/api/v1/connections"
	RouteAPIConnectionTest RouteName = "/api/v1/connection-test"
	RouteAPIResetPassword  RouteName = "/api/v1/reset-password"
	RouteAPIDataRequest    RouteName = "/api/v1/data-request"
	RouteAPIWorkers        RouteName = "/api/v1/workers"
	RouteAPISettings       RouteName = "/api/v1/settings"
	RouteAlertLog          RouteName = "/alert/log"

	RouteWs       RouteName = "/ws"
	RouteWsClient RouteName = "/ws/client"
	RouteWsWorker RouteName = "/ws/worker"
)

type Source

type Source struct {
	Conn        string                 `json:"conn,omitempty" yaml:"conn,omitempty"`
	Stream      string                 `json:"stream,omitempty" yaml:"stream,omitempty"`
	Select      []string               `json:"select,omitempty" yaml:"select,omitempty"` // Select or exclude columns. Exclude with prefix "-".
	PrimaryKeyI any                    `json:"primary_key,omitempty" yaml:"primary_key,omitempty"`
	UpdateKey   string                 `json:"update_key,omitempty" yaml:"update_key,omitempty"`
	Options     *SourceOptions         `json:"options,omitempty" yaml:"options,omitempty"`
	Data        map[string]interface{} `json:"data,omitempty" yaml:"data,omitempty"`
	// contains filtered or unexported fields
}

Source is a source of data

func (*Source) HasPrimaryKey added in v1.0.50

func (s *Source) HasPrimaryKey() bool

func (*Source) HasUpdateKey added in v1.0.50

func (s *Source) HasUpdateKey() bool

func (*Source) Limit

func (s *Source) Limit() int

func (*Source) MD5 added in v1.1.6

func (s *Source) MD5() string

func (*Source) PrimaryKey added in v0.84.0

func (s *Source) PrimaryKey() []string

type SourceOptions

type SourceOptions struct {
	TrimSpace      *bool               `json:"trim_space,omitempty" yaml:"trim_space,omitempty"`
	EmptyAsNull    *bool               `json:"empty_as_null,omitempty" yaml:"empty_as_null,omitempty"`
	Header         *bool               `json:"header,omitempty" yaml:"header,omitempty"`
	Flatten        *bool               `json:"flatten,omitempty" yaml:"flatten,omitempty"`
	FieldsPerRec   *int                `json:"fields_per_rec,omitempty" yaml:"fields_per_rec,omitempty"`
	Compression    *iop.CompressorType `json:"compression,omitempty" yaml:"compression,omitempty"`
	Format         *filesys.FileType   `json:"format,omitempty" yaml:"format,omitempty"`
	NullIf         *string             `json:"null_if,omitempty" yaml:"null_if,omitempty"`
	DatetimeFormat string              `json:"datetime_format,omitempty" yaml:"datetime_format,omitempty"`
	SkipBlankLines *bool               `json:"skip_blank_lines,omitempty" yaml:"skip_blank_lines,omitempty"`
	Delimiter      string              `json:"delimiter,omitempty" yaml:"delimiter,omitempty"`
	Escape         string              `json:"escape,omitempty" yaml:"escape,omitempty"`
	MaxDecimals    *int                `json:"max_decimals,omitempty" yaml:"max_decimals,omitempty"`
	JmesPath       *string             `json:"jmespath,omitempty" yaml:"jmespath,omitempty"`
	Sheet          *string             `json:"sheet,omitempty" yaml:"sheet,omitempty"`
	Range          *string             `json:"range,omitempty" yaml:"range,omitempty"`
	Limit          *int                `json:"limit,omitempty" yaml:"limit,omitempty"`
	Columns        any                 `json:"columns,omitempty" yaml:"columns,omitempty"`
	Transforms     any                 `json:"transforms,omitempty" yaml:"transforms,omitempty"`
	// contains filtered or unexported fields
}

SourceOptions are connection and stream processing options

func (*SourceOptions) SetDefaults added in v1.0.31

func (o *SourceOptions) SetDefaults(sourceOptions SourceOptions)

type StreamIncrementalState added in v1.2.2

type StreamIncrementalState struct {
	Value int64            `json:"value,omitempty" yaml:"value,omitempty"`
	Files map[string]int64 `json:"files,omitempty" yaml:"files,omitempty"`
}

type Target

type Target struct {
	Conn    string                 `json:"conn,omitempty" yaml:"conn,omitempty"`
	Object  string                 `json:"object,omitempty" yaml:"object,omitempty"`
	Options *TargetOptions         `json:"options,omitempty" yaml:"options,omitempty"`
	Data    map[string]interface{} `json:"data,omitempty" yaml:"data,omitempty"`

	TmpTableCreated bool `json:"-" yaml:"-"`
	// contains filtered or unexported fields
}

Target is a target of data

func (*Target) MD5 added in v1.1.6

func (t *Target) MD5() string

type TargetOptions

type TargetOptions struct {
	Header           *bool               `json:"header,omitempty" yaml:"header,omitempty"`
	Compression      *iop.CompressorType `json:"compression,omitempty" yaml:"compression,omitempty"`
	Concurrency      int                 `json:"concurrency,omitempty" yaml:"concurrency,omitempty"`
	DatetimeFormat   string              `json:"datetime_format,omitempty" yaml:"datetime_format,omitempty"`
	Delimiter        string              `json:"delimiter,omitempty" yaml:"delimiter,omitempty"`
	FileMaxRows      int64               `json:"file_max_rows,omitempty" yaml:"file_max_rows,omitempty"`
	FileMaxBytes     int64               `json:"file_max_bytes,omitempty" yaml:"file_max_bytes,omitempty"`
	Format           filesys.FileType    `json:"format,omitempty" yaml:"format,omitempty"`
	MaxDecimals      *int                `json:"max_decimals,omitempty" yaml:"max_decimals,omitempty"`
	UseBulk          *bool               `json:"use_bulk,omitempty" yaml:"use_bulk,omitempty"`
	AddNewColumns    *bool               `json:"add_new_columns,omitempty" yaml:"add_new_columns,omitempty"`
	AdjustColumnType *bool               `json:"adjust_column_type,omitempty" yaml:"adjust_column_type,omitempty"`
	ColumnCasing     *ColumnCasing       `json:"column_casing,omitempty" yaml:"column_casing,omitempty"`

	TableKeys database.TableKeys `json:"table_keys,omitempty" yaml:"table_keys,omitempty"`
	TableTmp  string             `json:"table_tmp,omitempty" yaml:"table_tmp,omitempty"`
	TableDDL  string             `json:"table_ddl,omitempty" yaml:"table_ddl,omitempty"`
	PreSQL    string             `json:"pre_sql,omitempty" yaml:"pre_sql,omitempty"`
	PostSQL   string             `json:"post_sql,omitempty" yaml:"post_sql,omitempty"`
}

TargetOptions are target connection and stream processing options

func (*TargetOptions) SetDefaults added in v1.0.31

func (o *TargetOptions) SetDefaults(targetOptions TargetOptions)

type TaskExecution

type TaskExecution struct {
	ExecID    string     `json:"exec_id"`
	Config    *Config    `json:"config"`
	Type      JobType    `json:"type"`
	Status    ExecStatus `json:"status"`
	Err       error      `json:"error"`
	StartTime *time.Time `json:"start_time"`
	EndTime   *time.Time `json:"end_time"`
	Bytes     uint64     `json:"bytes"`
	Context   *g.Context `json:"-"`
	Progress  string     `json:"progress"`

	Output string `json:"-"`

	Replication    *ReplicationConfig `json:"replication"`
	ProgressHist   []string           `json:"progress_hist"`
	PBar           *ProgressBar       `json:"-"`
	ProcStatsStart g.ProcStats        `json:"-"` // process stats at beginning
	// contains filtered or unexported fields
}

TaskExecution is a sling ELT task run, synonymous to an execution

func NewTask

func NewTask(execID string, cfg *Config) (t *TaskExecution)

NewTask creates a Sling task with given configuration

func (*TaskExecution) AddCleanupTaskFirst added in v1.1.8

func (t *TaskExecution) AddCleanupTaskFirst(f func())

func (*TaskExecution) AddCleanupTaskLast added in v1.1.8

func (t *TaskExecution) AddCleanupTaskLast(f func())

func (*TaskExecution) AppendOutput added in v1.1.6

func (t *TaskExecution) AppendOutput(text string)

func (*TaskExecution) Cleanup added in v0.84.3

func (t *TaskExecution) Cleanup()

func (*TaskExecution) Execute

func (t *TaskExecution) Execute() error

Execute runs a Sling task. This may be a file/db to file/db transfer

func (*TaskExecution) GetBytes

func (t *TaskExecution) GetBytes() (inBytes, outBytes uint64)

GetBytes returns the current total of bytes processed

func (*TaskExecution) GetBytesString

func (t *TaskExecution) GetBytesString() (s string)

func (*TaskExecution) GetCount

func (t *TaskExecution) GetCount() (count uint64)

GetCount returns the current count of rows processed

func (*TaskExecution) GetRate

func (t *TaskExecution) GetRate(secWindow int) (rowRate, byteRate int64)

GetRate returns the speed of flow (rows / sec and bytes / sec). secWindow is how many seconds back to measure (0 is since beginning).

func (*TaskExecution) GetTotalBytes

func (t *TaskExecution) GetTotalBytes() (rcBytes, txBytes uint64)

GetTotalBytes gets the inbound/outbound bytes of the task

func (*TaskExecution) IsStalled

func (t *TaskExecution) IsStalled(window float64) bool

IsStalled determines if the task has stalled (no row increment)

func (*TaskExecution) ReadFromDB

func (t *TaskExecution) ReadFromDB(cfg *Config, srcConn database.Connection) (df *iop.Dataflow, err error)

ReadFromDB reads from a source database

func (*TaskExecution) ReadFromFile

func (t *TaskExecution) ReadFromFile(cfg *Config) (df *iop.Dataflow, err error)

ReadFromFile reads from a source file

func (*TaskExecution) SetProgress

func (t *TaskExecution) SetProgress(progressText string, args ...interface{})

SetProgress sets the progress

func (*TaskExecution) WriteToDb

func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConn database.Connection) (cnt uint64, err error)

WriteToDb writes to a target DB: create temp table, load into temp table, then insert / incremental / replace into target table.

func (*TaskExecution) WriteToFile

func (t *TaskExecution) WriteToFile(cfg *Config, df *iop.Dataflow) (cnt uint64, err error)

WriteToFile writes to a target file

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL