warehouse

package
v0.0.0-...-0ade494
Published: Aug 26, 2022 License: AGPL-3.0 Imports: 60 Imported by: 0

Documentation

Index

Constants

const (
	STATS_WORKER_IDLE_TIME                  = "worker_idle_time"
	STATS_WORKER_CLAIM_PROCESSING_TIME      = "worker_claim_processing_time"
	STATS_WORKER_CLAIM_PROCESSING_FAILED    = "worker_claim_processing_failed"
	STATS_WORKER_CLAIM_PROCESSING_SUCCEEDED = "worker_claim_processing_succeeded"
	TAG_WORKERID                            = "workerId"
)

const (
	Waiting                   = "waiting"
	GeneratedUploadSchema     = "generated_upload_schema"
	CreatedTableUploads       = "created_table_uploads"
	GeneratedLoadFiles        = "generated_load_files"
	UpdatedTableUploadsCounts = "updated_table_uploads_counts"
	CreatedRemoteSchema       = "created_remote_schema"
	ExportedUserTables        = "exported_user_tables"
	ExportedData              = "exported_data"
	ExportedIdentities        = "exported_identities"
	Aborted                   = "aborted"
)

Upload Status

const (
	GeneratingStagingFileFailedState        = "generating_staging_file_failed"
	GeneratedStagingFileState               = "generated_staging_file"
	PopulatingHistoricIdentitiesState       = "populating_historic_identities"
	PopulatingHistoricIdentitiesStateFailed = "populating_historic_identities_failed"
	FetchingRemoteSchemaFailed              = "fetching_remote_schema_failed"
	InternalProcessingFailed                = "internal_processing_failed"
)

const (
	TableUploadExecuting               = "executing"
	TableUploadUpdatingSchema          = "updating_schema"
	TableUploadUpdatingSchemaFailed    = "updating_schema_failed"
	TableUploadUpdatedSchema           = "updated_schema"
	TableUploadExporting               = "exporting_data"
	TableUploadExportingFailed         = "exporting_data_failed"
	UserTableUploadExportingFailed     = "exporting_user_tables_failed"
	IdentityTableUploadExportingFailed = "exporting_identities_failed"
	TableUploadExported                = "exported_data"
)

Table Upload Status

const (
	UploadStatusField          = "status"
	UploadStartLoadFileIDField = "start_load_file_id"
	UploadEndLoadFileIDField   = "end_load_file_id"
	UploadUpdatedAtField       = "updated_at"
	UploadTimingsField         = "timings"
	UploadSchemaField          = "schema"
	MergedSchemaField          = "mergedschema"
	UploadLastExecAtField      = "last_exec_at"
	UploadInProgress           = "in_progress"
)

const (
	MasterMode        = "master"
	SlaveMode         = "slave"
	MasterSlaveMode   = "master_and_slave"
	EmbeddedMode      = "embedded"
	PooledWHSlaveMode = "embedded_master"
)

Warehouse worker modes

const (
	CloudSourceCateogry = "cloud"
)

const (
	DegradedMode = "degraded"
)

const (
	WorkerProcessingDownloadStagingFileFailed = "worker_processing_download_staging_file_failed"
)

Variables

var (
	ShouldForceSetLowerVersion bool
)

Functions

func CheckCurrentTimeExistsInExcludeWindow

func CheckCurrentTimeExistsInExcludeWindow(currentTime time.Time, windowStartTime, windowEndTime string) bool
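
A minimal usage sketch. The import path is assumed from the module layout, and the "HH:MM" format for the window bounds is an assumption based on GetExludeWindowStartEndTimes returning plain strings:

package main

import (
	"fmt"
	"time"

	"github.com/rudderlabs/rudder-server/warehouse"
)

func main() {
	// 02:30 falls inside an exclude window of 01:00-05:00.
	now := time.Date(2022, 8, 26, 2, 30, 0, 0, time.UTC)
	fmt.Println(warehouse.CheckCurrentTimeExistsInExcludeWindow(now, "01:00", "05:00")) // expected: true
}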

func CheckForWarehouseEnvVars

func CheckForWarehouseEnvVars() bool

CheckForWarehouseEnvVars checks whether all the environment variables required by Warehouse are present.
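
A short sketch of guarding startup on the required variables (same assumed imports as above, plus log):

	if !warehouse.CheckForWarehouseEnvVars() {
		log.Fatal("required warehouse environment variables are missing")
	}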

func CheckPGHealth

func CheckPGHealth(dbHandle *sql.DB) bool

func GetExludeWindowStartEndTimes

func GetExludeWindowStartEndTimes(excludeWindow map[string]interface{}) (string, string)

func GetPrevScheduledTime

func GetPrevScheduledTime(syncFrequency, syncStartAt string, currTime time.Time) time.Time

GetPrevScheduledTime returns the closest previous scheduled time. For example, when syncing every 3 hours starting at 13:00 (scheduled times: 13:00, 16:00, 19:00, 22:00, 01:00, 04:00, 07:00, 10:00), the previous scheduled time for 18:00 is 16:00 the same day, and for 00:30 it is 22:00 the previous day.
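
A sketch of the documented example, assuming syncFrequency is expressed in minutes ("180" for every three hours), as in RudderStack sync configuration:

	// Every 3 hours starting at 13:00; at 18:00 the closest previous
	// scheduled time is 16:00 the same day.
	now := time.Date(2022, 8, 26, 18, 0, 0, 0, time.UTC)
	prev := warehouse.GetPrevScheduledTime("180", "13:00", now)
	fmt.Println(prev) // expected: 2022-08-26 16:00:00 +0000 UTC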

func HandleSchemaChange

func HandleSchemaChange(existingDataType, columnType string, columnVal interface{}) (newColumnVal interface{}, ok bool)
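
A hedged illustration of the signature only: the assumption is that the incoming value is coerced to the existing warehouse data type when possible, with ok reporting whether the conversion succeeded:

	// Existing column type is string, incoming value is an int.
	newVal, ok := warehouse.HandleSchemaChange("string", "int", 1)
	fmt.Println(newVal, ok) // expected under that assumption: "1" true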

func Init

func Init()

func Init2

func Init2()

func Init3

func Init3()

func Init4

func Init4()

func Init5

func Init5()

func Init6

func Init6()

func InitWarehouseAPI

func InitWarehouseAPI(dbHandle *sql.DB, log logger.LoggerI)

func PickupStagingConfiguration

func PickupStagingConfiguration(job *PayloadT) bool

func ScheduledTimes

func ScheduledTimes(syncFrequency, syncStartAt string) []int

ScheduledTimes returns all possible start times (minutes from the start of the day) for the given schedule. For example, syncing every 3 hours starting at 13:00 yields the scheduled times 13:00, 16:00, 19:00, 22:00, 01:00, 04:00, 07:00, and 10:00.
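
The same schedule expressed as minutes from the start of the day (syncFrequency in minutes and ascending order are assumptions):

	times := warehouse.ScheduledTimes("180", "13:00")
	fmt.Println(times) // expected: [60 240 420 600 780 960 1140 1320]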

func Start

func Start(ctx context.Context, app app.Interface) error

func TriggerUploadHandler

func TriggerUploadHandler(sourceID, destID string) error
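
A minimal sketch of forcing a sync for one source-destination pair; the IDs are placeholders:

	if err := warehouse.TriggerUploadHandler("source-id", "destination-id"); err != nil {
		log.Fatal(err)
	}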

Types

type BatchRouterEventT

type BatchRouterEventT struct {
	Metadata MetadataT `json:"metadata"`
	Data     DataT     `json:"data"`
}

func (*BatchRouterEventT) GetColumnInfo

func (event *BatchRouterEventT) GetColumnInfo(columnName string) (columnInfo ColumnInfoT, ok bool)

type ColumnInfoT

type ColumnInfoT struct {
	ColumnVal  interface{}
	ColumnType string
}

type ConfigurationTestInput

type ConfigurationTestInput struct {
	DestID string
}

type ConfigurationTestOutput

type ConfigurationTestOutput struct {
	Valid bool
	Error string
}

type ConstraintsI

type ConstraintsI interface {
	// contains filtered or unexported methods
}

type ConstraintsViolationT

type ConstraintsViolationT struct {
	// contains filtered or unexported fields
}

func ViolatedConstraints

func ViolatedConstraints(destinationType string, brEvent *BatchRouterEventT, columnName string) (cv *ConstraintsViolationT)

type DB

type DB struct {
	// contains filtered or unexported fields
}

DB encapsulate interactions of warehouse operations with the database.

func NewWarehouseDB

func NewWarehouseDB(handle *sql.DB) *DB

func (*DB) GetLatestUploadStatus

func (db *DB) GetLatestUploadStatus(ctx context.Context, destType, sourceID, destinationID string) (int64, string, int, error)
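
A hedged end-to-end sketch; the Postgres DSN and IDs are placeholders, and reading the three results as the upload ID, its status, and its priority is an assumption from the surrounding API:

package main

import (
	"context"
	"database/sql"
	"log"

	_ "github.com/lib/pq" // Postgres driver, assumed

	"github.com/rudderlabs/rudder-server/warehouse"
)

func main() {
	handle, err := sql.Open("postgres", "postgres://user:pass@localhost:5432/jobsdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer handle.Close()

	db := warehouse.NewWarehouseDB(handle)
	id, status, priority, err := db.GetLatestUploadStatus(context.Background(), "POSTGRES", "source-id", "destination-id")
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("upload %d: status=%s priority=%d", id, status, priority)
}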

type DataT

type DataT map[string]interface{}

type ErrorResponseT

type ErrorResponseT struct {
	Error string
}

type HandleT

type HandleT struct {
	// contains filtered or unexported fields
}

func (*HandleT) Disable

func (wh *HandleT) Disable()

Disable disables a router.

func (*HandleT) Enable

func (wh *HandleT) Enable()

Enable enables a router.

func (*HandleT) Setup

func (wh *HandleT) Setup(whType string)

func (*HandleT) Shutdown

func (wh *HandleT) Shutdown()

type IndexConstraintT

type IndexConstraintT struct {
	TableName    string
	ColumnName   string
	IndexColumns []string
	Limit        int
}

type JobIDT

type JobIDT int64

type JobRunT

type JobRunT struct {
	// contains filtered or unexported fields
}

JobRunT is a temporary store used while processing staging files into load files.

func (*JobRunT) GetWriter

func (jobRun *JobRunT) GetWriter(tableName string) (warehouseutils.LoadFileWriterI, error)

type LoadFileJobT

type LoadFileJobT struct {
	Upload                     UploadT
	StagingFile                *StagingFileT
	Schema                     map[string]map[string]string
	Warehouse                  warehouseutils.WarehouseT
	Wg                         *misc.WaitGroup
	LoadFileIDsChan            chan []int64
	TableToBucketFolderMap     map[string]string
	TableToBucketFolderMapLock *sync.RWMutex
}

type MetadataT

type MetadataT struct {
	Table        string            `json:"table"`
	Columns      map[string]string `json:"columns"`
	IsMergeRule  bool              `json:"isMergeRule"`
	ReceivedAt   time.Time         `json:"receivedAt"`
	MergePropOne string            `json:"mergePropOne"`
	MergePropTwo string            `json:"mergePropTwo"`
}

type PayloadT

type PayloadT struct {
	BatchID                      string
	UploadID                     int64
	StagingFileID                int64
	StagingFileLocation          string
	UploadSchema                 map[string]map[string]string
	SourceID                     string
	SourceName                   string
	DestinationID                string
	DestinationName              string
	DestinationType              string
	DestinationNamespace         string
	DestinationRevisionID        string
	StagingDestinationRevisionID string
	DestinationConfig            interface{}
	StagingDestinationConfig     interface{}
	UseRudderStorage             bool
	StagingUseRudderStorage      bool
	UniqueLoadGenID              string
	RudderStoragePrefix          string
	Output                       []loadFileUploadOutputT
	LoadFilePrefix               string // prefix for the load file name
	LoadFileType                 string
}

type ProcessStagingFilesJobT

type ProcessStagingFilesJobT struct {
	Upload    UploadT
	List      []*StagingFileT
	Warehouse warehouseutils.WarehouseT
}

type QueryInput

type QueryInput struct {
	DestID       string
	SourceID     string
	SQLStatement string
}

type RetryRequest

type RetryRequest struct {
	WorkspaceID     string
	SourceID        string
	DestinationID   string
	DestinationType string
	IntervalInHours int64   // Optional, if provided we will retry based on the interval provided
	UploadIds       []int64 // Optional, if provided we will retry the upload ids provided
	ForceRetry      bool
	API             UploadAPIT
}

func (*RetryRequest) RetryWHUploads

func (retryReq *RetryRequest) RetryWHUploads() (response RetryResponse, err error)
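
A hedged sketch of retrying recent failed uploads for one destination; it assumes UploadAPI has already been initialised via InitWarehouseAPI, and the IDs are placeholders:

	retryReq := warehouse.RetryRequest{
		WorkspaceID:     "workspace-id",
		SourceID:        "source-id",
		DestinationID:   "destination-id",
		DestinationType: "POSTGRES",
		IntervalInHours: 24, // retry uploads from the last 24 hours
		API:             warehouse.UploadAPI,
	}
	resp, err := retryReq.RetryWHUploads()
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("%d: %s", resp.StatusCode, resp.Message)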

type RetryResponse

type RetryResponse struct {
	Message    string
	StatusCode int32
}

type SchemaHandleT

type SchemaHandleT struct {
	// contains filtered or unexported fields
}

type StagingFileT

type StagingFileT struct {
	ID                    int64
	Location              string
	SourceID              string
	Schema                json.RawMessage
	Status                string // enum
	CreatedAt             time.Time
	FirstEventAt          time.Time
	LastEventAt           time.Time
	UseRudderStorage      bool
	DestinationRevisionID string
	// cloud sources specific info
	SourceBatchID   string
	SourceTaskID    string
	SourceTaskRunID string
	SourceJobID     string
	SourceJobRunID  string
	TimeWindow      time.Time
}

type TableSkipError

type TableSkipError struct {
	// contains filtered or unexported fields
}

TableSkipError is a custom error type indicating that a table load was skipped because of a previously failed table load.

func (*TableSkipError) Error

func (tse *TableSkipError) Error() string

type TableUploadIDInfoT

type TableUploadIDInfoT struct {
	// contains filtered or unexported fields
}

TableUploadIDInfoT captures the upload ID and error, keyed by [uploadID][tableName].

type TableUploadReqT

type TableUploadReqT struct {
	UploadID int64
	Name     string
	API      UploadAPIT
}

func (TableUploadReqT) GetWhTableUploads

func (tableUploadReq TableUploadReqT) GetWhTableUploads() ([]*proto.WHTable, error)

type TableUploadResT

type TableUploadResT struct {
	ID         int64     `json:"id"`
	UploadID   int64     `json:"upload_id"`
	Name       string    `json:"name"`
	Error      string    `json:"error"`
	Status     string    `json:"status"`
	Count      int32     `json:"count"`
	LastExecAt time.Time `json:"last_exec_at"`
	Duration   int32     `json:"duration"`
}

type TableUploadStatusInfoT

type TableUploadStatusInfoT struct {
	// contains filtered or unexported fields
}

TableUploadStatusInfoT captures the status and error, keyed by [uploadID][tableName].

type TableUploadStatusT

type TableUploadStatusT struct {
	// contains filtered or unexported fields
}

TableUploadStatusT captures the status of each table upload along with its parent upload job's info, such as destination_id and namespace.

type TableUploadT

type TableUploadT struct {
	// contains filtered or unexported fields
}

func NewTableUpload

func NewTableUpload(uploadID int64, tableName string) *TableUploadT

type TablesResT

type TablesResT struct {
	Tables []TableUploadResT `json:"tables,omitempty"`
}

type UploadAPIT

type UploadAPIT struct {
	// contains filtered or unexported fields
}
var UploadAPI UploadAPIT

type UploadColumnT

type UploadColumnT struct {
	Column string
	Value  interface{}
}

type UploadColumnsOpts

type UploadColumnsOpts struct {
	Fields []UploadColumnT
	Txn    *sql.Tx
}

type UploadJobT

type UploadJobT struct {
	// contains filtered or unexported fields
}

func (*UploadJobT) Aborted

func (job *UploadJobT) Aborted(attempts int, startTime time.Time) bool

Aborted reports whether the job should be moved to the aborted state, given the number of attempts and the start time.

func (*UploadJobT) GetFirstLastEvent

func (job *UploadJobT) GetFirstLastEvent() (time.Time, time.Time)

func (*UploadJobT) GetLoadFileGenStartTIme

func (job *UploadJobT) GetLoadFileGenStartTIme() time.Time

func (*UploadJobT) GetLoadFileType

func (job *UploadJobT) GetLoadFileType() string

func (*UploadJobT) GetLoadFilesMetadata

func (job *UploadJobT) GetLoadFilesMetadata(options warehouseutils.GetLoadFilesOptionsT) (loadFiles []warehouseutils.LoadFileT)

func (*UploadJobT) GetLocalSchema

func (job *UploadJobT) GetLocalSchema() warehouseutils.SchemaT

func (*UploadJobT) GetSampleLoadFileLocation

func (job *UploadJobT) GetSampleLoadFileLocation(tableName string) (location string, err error)

func (*UploadJobT) GetSchemaInWarehouse

func (job *UploadJobT) GetSchemaInWarehouse() (schema warehouseutils.SchemaT)

func (*UploadJobT) GetSingleLoadFile

func (job *UploadJobT) GetSingleLoadFile(tableName string) (warehouseutils.LoadFileT, error)

func (*UploadJobT) GetTableSchemaInUpload

func (job *UploadJobT) GetTableSchemaInUpload(tableName string) warehouseutils.TableSchemaT

func (*UploadJobT) GetTableSchemaInWarehouse

func (job *UploadJobT) GetTableSchemaInWarehouse(tableName string) warehouseutils.TableSchemaT

func (*UploadJobT) ShouldOnDedupUseNewRecord

func (job *UploadJobT) ShouldOnDedupUseNewRecord() bool

func (*UploadJobT) UpdateLocalSchema

func (job *UploadJobT) UpdateLocalSchema(schema warehouseutils.SchemaT) error

func (*UploadJobT) UseRudderStorage

func (job *UploadJobT) UseRudderStorage() bool

type UploadPagination

type UploadPagination struct {
	Total  int32 `json:"total"`
	Limit  int32 `json:"limit"`
	Offset int32 `json:"offset"`
}

type UploadReqT

type UploadReqT struct {
	WorkspaceID string
	UploadId    int64
	API         UploadAPIT
}

func (UploadReqT) GetWHUpload

func (uploadReq UploadReqT) GetWHUpload() (*proto.WHUploadResponse, error)

func (UploadReqT) TriggerWHUpload

func (uploadReq UploadReqT) TriggerWHUpload() (response *proto.TriggerWhUploadsResponse, err error)

type UploadResT

type UploadResT struct {
	ID              int64             `json:"id"`
	Namespace       string            `json:"namespace"`
	SourceID        string            `json:"source_id"`
	DestinationID   string            `json:"destination_id"`
	DestinationType string            `json:"destination_type"`
	Status          string            `json:"status"`
	Error           string            `json:"error"`
	Attempt         int32             `json:"attempt"`
	Duration        int32             `json:"duration"`
	NextRetryTime   string            `json:"nextRetryTime"`
	FirstEventAt    time.Time         `json:"first_event_at"`
	LastEventAt     time.Time         `json:"last_event_at"`
	Tables          []TableUploadResT `json:"tables,omitempty"`
}

type UploadStatusOpts

type UploadStatusOpts struct {
	Status           string
	AdditionalFields []UploadColumnT
	ReportingMetric  types.PUReportedMetric
}

type UploadT

type UploadT struct {
	ID                   int64
	Namespace            string
	SourceID             string
	SourceType           string
	SourceCategory       string
	DestinationID        string
	DestinationType      string
	StartStagingFileID   int64
	EndStagingFileID     int64
	StartLoadFileID      int64
	EndLoadFileID        int64
	Status               string
	UploadSchema         warehouseutils.SchemaT
	MergedSchema         warehouseutils.SchemaT
	Error                json.RawMessage
	Timings              []map[string]string
	FirstAttemptAt       time.Time
	LastAttemptAt        time.Time
	Attempts             int64
	Metadata             json.RawMessage
	FirstEventAt         time.Time
	LastEventAt          time.Time
	UseRudderStorage     bool
	LoadFileGenStartTime time.Time
	TimingsObj           sql.NullString
	Priority             int
	// cloud sources specific info
	SourceBatchID   string
	SourceTaskID    string
	SourceTaskRunID string
	SourceJobID     string
	SourceJobRunID  string
	LoadFileType    string
}

type UploadsReqT

type UploadsReqT struct {
	WorkspaceID     string
	SourceID        string
	DestinationID   string
	DestinationType string
	Status          string
	Limit           int32
	Offset          int32
	API             UploadAPIT
}

func (*UploadsReqT) GetWhUploads

func (uploadsReq *UploadsReqT) GetWhUploads() (uploadsRes *proto.WHUploadsResponse, err error)

func (*UploadsReqT) TriggerWhUploads

func (uploadsReq *UploadsReqT) TriggerWhUploads() (response *proto.TriggerWhUploadsResponse, err error)

type UploadsResT

type UploadsResT struct {
	Uploads    []UploadResT     `json:"uploads"`
	Pagination UploadPagination `json:"pagination"`
}

type WarehouseAdmin

type WarehouseAdmin struct{}

func (*WarehouseAdmin) ConfigurationTest

func (wh *WarehouseAdmin) ConfigurationTest(s ConfigurationTestInput, reply *ConfigurationTestOutput) error

ConfigurationTest tests the underlying warehouse destination.

func (*WarehouseAdmin) Query

Query queries the underlying warehouse.

func (*WarehouseAdmin) TriggerUpload

func (wh *WarehouseAdmin) TriggerUpload(off bool, reply *string) error

TriggerUpload sets uploads to start without delay

type WorkerIdentifierT

type WorkerIdentifierT string
