Documentation ¶
Index ¶
- func CreateOutputTable(dbpool *pgxpool.Pool, tableName pgx.Identifier, tableSpec *TableSpec) error
- func EvalHash(key interface{}, partitions uint64) *uint64
- func Hash(key []byte, partitions uint64) uint64
- func MakeJetsPartitionLabel(jetsPartitionKey interface{}) string
- func OverpunchNumber(value string, decimalPlaces int) (string, error)
- func ParseDate(date string) (*time.Time, error)
- func ParseDatetime(datetime string) (*time.Time, error)
- func PrepareOutoutTable(dbpool *pgxpool.Pool, tableIdentifier pgx.Identifier, tableSpec *TableSpec) error
- func SplitTableName(tableName string) (pgx.Identifier, error)
- func StartComputePipes(dbpool *pgxpool.Pool, inputHeaders []string, done chan struct{}, ...)
- func ToBool(b interface{}) bool
- func ToDouble(d interface{}) (float64, error)
- type AggregateTransformationPipe
- type BuilderContext
- func (ctx *BuilderContext) FileKey() string
- func (ctx *BuilderContext) JetsPartition() string
- func (ctx *BuilderContext) NewAggregateTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*AggregateTransformationPipe, error)
- func (ctx *BuilderContext) NewMapRecordTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*MapRecordTransformationPipe, error)
- func (ctx *BuilderContext) NewPartitionWriterTransformationPipe(source *InputChannel, jetsPartitionKey interface{}, outputCh *OutputChannel, ...) (*PartitionWriterTransformationPipe, error)
- func (ctx *BuilderContext) ReportMetrics()
- func (ctx *BuilderContext) SessionId() string
- func (ctx *BuilderContext) StartClusterMap(spec *PipeSpec, source *InputChannel, ...)
- func (ctx *BuilderContext) StartFanOutPipe(spec *PipeSpec, source *InputChannel)
- func (ctx *BuilderContext) StartSplitterPipe(spec *PipeSpec, source *InputChannel, ...)
- type CaseExpression
- type Channel
- type ChannelRegistry
- type ChannelResults
- type ChannelSpec
- type ClusterSpec
- type ComputePipesConfig
- type ComputePipesResult
- type ConcatFunctionArg
- type ContextSpec
- type ExpressionNode
- type FindReplaceFunctionArg
- type HashExpression
- type Input2PipeSet
- type InputChannel
- type LoadFromS3FilesResult
- type MapExpression
- type MapRecordTransformationPipe
- type Metric
- type MetricsSpec
- type OutputChannel
- type Overpunch
- type PartitionWriterTransformationPipe
- type PathSubstitution
- type Peer
- type PeerRecordMessage
- type PeerReply
- type PeerServer
- type PipeSet
- type PipeSpec
- type PipeTransformationEvaluator
- type S3DeviceWriter
- type SaveResultsContext
- type SubStringFunctionArg
- type TableColumnSpec
- type TableSpec
- type TransformationColumnEvaluator
- type TransformationColumnSpec
- type TransformationSpec
- type WriteTableSource
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CreateOutputTable ¶
Create the Staging Table
func MakeJetsPartitionLabel ¶
func MakeJetsPartitionLabel(jetsPartitionKey interface{}) string
func PrepareOutoutTable ¶
func SplitTableName ¶
func StartComputePipes ¶
func StartComputePipes(dbpool *pgxpool.Pool, inputHeaders []string, done chan struct{}, errCh chan error, computePipesInputCh <-chan []interface{}, chResults *ChannelResults, cpConfig *ComputePipesConfig, envSettings map[string]interface{}, fileKeyComponents map[string]interface{})
Function to write transformed rows to the database.
Types ¶
type AggregateTransformationPipe ¶
type AggregateTransformationPipe struct {
// contains filtered or unexported fields
}
type BuilderContext ¶
type BuilderContext struct {
// contains filtered or unexported fields
}
func (*BuilderContext) FileKey ¶
func (ctx *BuilderContext) FileKey() string
func (*BuilderContext) JetsPartition ¶
func (ctx *BuilderContext) JetsPartition() string
func (*BuilderContext) NewAggregateTransformationPipe ¶
func (ctx *BuilderContext) NewAggregateTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*AggregateTransformationPipe, error)
func (*BuilderContext) NewMapRecordTransformationPipe ¶
func (ctx *BuilderContext) NewMapRecordTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*MapRecordTransformationPipe, error)
func (*BuilderContext) NewPartitionWriterTransformationPipe ¶
func (ctx *BuilderContext) NewPartitionWriterTransformationPipe(source *InputChannel, jetsPartitionKey interface{}, outputCh *OutputChannel, copy2DeviceResultCh chan ComputePipesResult, spec *TransformationSpec) (*PartitionWriterTransformationPipe, error)
Create a new jets_partition writer; the partition is identified by the jetsPartitionKey argument.
func (*BuilderContext) ReportMetrics ¶
func (ctx *BuilderContext) ReportMetrics()
func (*BuilderContext) SessionId ¶
func (ctx *BuilderContext) SessionId() string
func (*BuilderContext) StartClusterMap ¶
func (ctx *BuilderContext) StartClusterMap(spec *PipeSpec, source *InputChannel, clusterMapResultCh chan chan ComputePipesResult)
Cluster nodes sharding data using splitter key
func (*BuilderContext) StartFanOutPipe ¶
func (ctx *BuilderContext) StartFanOutPipe(spec *PipeSpec, source *InputChannel)
func (*BuilderContext) StartSplitterPipe ¶
func (ctx *BuilderContext) StartSplitterPipe(spec *PipeSpec, source *InputChannel, writePartitionsResultCh chan chan ComputePipesResult)
type CaseExpression ¶
type CaseExpression struct { When ExpressionNode `json:"when"` Then ExpressionNode `json:"then"` }
type ChannelRegistry ¶
type ChannelRegistry struct {
// contains filtered or unexported fields
}
func (*ChannelRegistry) AddDistributionChannel ¶
func (r *ChannelRegistry) AddDistributionChannel(input string) string
func (*ChannelRegistry) CloseChannel ¶
func (r *ChannelRegistry) CloseChannel(name string)
func (*ChannelRegistry) GetInputChannel ¶
func (r *ChannelRegistry) GetInputChannel(name string) (*InputChannel, error)
func (*ChannelRegistry) GetOutputChannel ¶
func (r *ChannelRegistry) GetOutputChannel(name string) (*OutputChannel, error)
type ChannelResults ¶
type ChannelResults struct { LoadFromS3FilesResultCh chan LoadFromS3FilesResult Copy2DbResultCh chan chan ComputePipesResult WritePartitionsResultCh chan chan chan ComputePipesResult MapOnClusterResultCh chan chan chan ComputePipesResult }
type ChannelSpec ¶
type ClusterSpec ¶
type ClusterSpec struct { CpipesMode string `json:"cpipes_mode"` ReadTimeout int `json:"read_timeout"` WriteTimeout int `json:"write_timeout"` PeerRegistrationTimeout int `json:"peer_registration_timeout"` NbrNodes int `json:"nbr_nodes"` ShardingNbrNodes int `json:"sharding_nbr_nodes"` ReducingNbrNodes int `json:"reducing_nbr_nodes"` NbrSubClusters int `json:"nbr_sub_clusters"` NbrJetsPartitions uint64 `json:"nbr_jets_partitions"` PeerBatchSize int `json:"peer_batch_size"` NodeId int // calculated field SubClusterId int // calculated field NbrSubClusterNodes int // calculated field SubClusterNodeId int // calculated field }
Config for peer2peer communication
type ComputePipesConfig ¶
type ComputePipesConfig struct { MetricsConfig *MetricsSpec `json:"metrics_config"` ClusterConfig *ClusterSpec `json:"cluster_config"` OutputTables []TableSpec `json:"output_tables"` Channels []ChannelSpec `json:"channels"` Context *[]ContextSpec `json:"context"` PipesConfig []PipeSpec `json:"pipes_config"` ShardingPipesConfig []PipeSpec `json:"sharding_pipes_config"` ReducingPipesConfig []PipeSpec `json:"reducing_pipes_config"` }
This file contains the Compute Pipes configuration model
func UnmarshalComputePipesConfig ¶
func UnmarshalComputePipesConfig(computePipesJson *string, nodeId, nbrNodes int) (*ComputePipesConfig, error)
type ComputePipesResult ¶
type ConcatFunctionArg ¶
type ContextSpec ¶
type ExpressionNode ¶
type ExpressionNode struct { // Type for leaf node: select, value, eval Type *string `json:"type"` Expr *string `json:"expr"` EvalExpr *ExpressionNode `json:"eval_expr"` Arg *ExpressionNode `json:"arg"` Lhs *ExpressionNode `json:"lhs"` Op *string `json:"op"` Rhs *ExpressionNode `json:"rhs"` }
type FindReplaceFunctionArg ¶
func ParseFindReplaceFunctionArgument ¶
func ParseFindReplaceFunctionArgument(rawArg *string, functionName string, cache map[string]interface{}) (*FindReplaceFunctionArg, error)
type HashExpression ¶
type HashExpression struct { Expr string `json:"expr"` Format *string `json:"format"` NbrJetsPartitions *uint64 `json:"nbr_jets_partitions"` DefaultExpr *ExpressionNode `json:"default_expr"` }
type Input2PipeSet ¶
type InputChannel ¶
type InputChannel struct {
// contains filtered or unexported fields
}
type LoadFromS3FilesResult ¶
type MapExpression ¶
type MapRecordTransformationPipe ¶
type MapRecordTransformationPipe struct {
// contains filtered or unexported fields
}
type MetricsSpec ¶
type OutputChannel ¶
type OutputChannel struct {
// contains filtered or unexported fields
}
type Overpunch ¶
OverpunchNumber: Overpunch allows representation of positive and negative values in a numeric field without having to expand the size of the field for a plus or minus sign. The overpunch character replaces the right-most character in a numeric field. In a six-digit field, 99.95 would be represented as 00999E. In the same field, a negative 99.95 would be represented as 00999N. Lack of an overpunch character implies a positive amount - in other words, a positive 99.95 could be sent on the file as 009995.
Number, Positive Overpunch, Negative Overpunch
0, {, }
1, A, J
2, B, K
3, C, L
4, D, M
5, E, N
6, F, O
7, G, P
8, H, Q
9, I, R
type PartitionWriterTransformationPipe ¶
type PartitionWriterTransformationPipe struct {
// contains filtered or unexported fields
}
type PathSubstitution ¶
type PeerRecordMessage ¶
Message used to send records to a remote peer. Sender is the subClusterNodeId of the client peer.
type PeerServer ¶
type PeerServer struct {
// contains filtered or unexported fields
}
The server handling incoming requests from peer nodes
func (*PeerServer) ClientDone ¶
func (ps *PeerServer) ClientDone(args *PeerRecordMessage, reply *PeerReply) error
func (*PeerServer) ClientReady ¶
func (ps *PeerServer) ClientReady(args *PeerRecordMessage, reply *PeerReply) error
func (*PeerServer) PushRecords ¶
func (ps *PeerServer) PushRecords(args *PeerRecordMessage, reply *PeerReply) error
type PipeSpec ¶
type PipeSpec struct { // Type range: fan_out, splitter, distribute_data Type string `json:"type"` Input string `json:"input"` Column *string `json:"column"` // splitter column Apply []TransformationSpec `json:"apply"` }
type PipeTransformationEvaluator ¶
type PipeTransformationEvaluator interface {
// contains filtered or unexported methods
}
type S3DeviceWriter ¶
type S3DeviceWriter struct {
// contains filtered or unexported fields
}
func (*S3DeviceWriter) WritePartition ¶
func (ctx *S3DeviceWriter) WritePartition(s3WriterResultCh chan<- ComputePipesResult)
type SaveResultsContext ¶
type SaveResultsContext struct { JetsPartition string NodeId int SessionId string // contains filtered or unexported fields }
func NewSaveResultsContext ¶
func NewSaveResultsContext(dbpool *pgxpool.Pool) *SaveResultsContext
func (*SaveResultsContext) Save ¶
func (ctx *SaveResultsContext) Save(category string, result *ComputePipesResult)
type SubStringFunctionArg ¶
func ParseSubStringFunctionArgument ¶
func ParseSubStringFunctionArgument(rawArg *string, functionName string, cache map[string]interface{}) (*SubStringFunctionArg, error)
type TableColumnSpec ¶
type TableSpec ¶
type TableSpec struct { Key string `json:"key"` Name string `json:"name"` Columns []TableColumnSpec `json:"columns"` }
type TransformationColumnEvaluator ¶
type TransformationColumnEvaluator interface {
// contains filtered or unexported methods
}
type TransformationColumnSpec ¶
type TransformationColumnSpec struct { // Type range: select, value, eval, map, hash // (applicable to aggregate) count, distinct_count, sum, min, // case, map_reduce Name string `json:"name"` Type string `json:"type"` Expr *string `json:"expr"` MapExpr *MapExpression `json:"map_expr"` EvalExpr *ExpressionNode `json:"eval_expr"` HashExpr *HashExpression `json:"hash_expr"` Where *ExpressionNode `json:"where"` CaseExpr []CaseExpression `json:"case_expr"` ElseExpr *ExpressionNode `json:"else_expr"` MapOn *string `json:"map_on"` ApplyMap *[]TransformationColumnSpec `json:"apply_map"` ApplyReduce *[]TransformationColumnSpec `json:"apply_reduce"` }
type TransformationSpec ¶
type TransformationSpec struct { // Type range: map_record, aggregate, partition_writer Type string `json:"type"` PartitionSize *int `json:"partition_size"` FilePathSubstitutions *[]PathSubstitution `json:"file_path_substitutions"` Columns []TransformationColumnSpec `json:"columns"` Output string `json:"output"` }
type WriteTableSource ¶
type WriteTableSource struct {
// contains filtered or unexported fields
}
func NewWriteTableSource ¶
func NewWriteTableSource(source <-chan []interface{}, tableIdentifier pgx.Identifier, columns []string) *WriteTableSource
func (*WriteTableSource) Err ¶
func (wt *WriteTableSource) Err() error
func (*WriteTableSource) Next ¶
func (wt *WriteTableSource) Next() bool
pgx.CopyFromSource interface
func (*WriteTableSource) Values ¶
func (wt *WriteTableSource) Values() ([]interface{}, error)
func (*WriteTableSource) WriteTable ¶
func (wt *WriteTableSource) WriteTable(dbpool *pgxpool.Pool, done chan struct{}, copy2DbResultCh chan<- ComputePipesResult)
Methods for writing output entity records to postgres
Source Files ¶
- cleansing_functions.go
- cluster_map_server.go
- column_evaluators.go
- column_evaluators_aggregate.go
- column_evaluators_case_expr.go
- column_evaluators_hash.go
- column_evaluators_map_reduce.go
- column_evaluators_mapping.go
- compute_pipes.go
- compute_pipes_results.go
- datacleansing.go
- datetime_parser.go
- eval_expression.go
- eval_operators.go
- eval_operators_other.go
- pipe_executor_cluster_map.go
- pipe_executor_fan_out.go
- pipe_executor_splitter.go
- pipe_transformation_aggregate.go
- pipe_transformation_map_record.go
- pipe_transformation_partition_writer.go
- pipes_model.go
- pipes_runtime.go
- runtime_metrics.go
- s3_device_writter.go
- write_table.go