schema

package
v0.0.0-...-8aeb8a1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 29, 2023 License: MIT Imports: 29 Imported by: 0

Documentation

Index

Constants

View Source
const (
	JitsuEnvelopParameter    = "JITSU_ENVELOP"
	JitsuUserRecognizedEvent = "JITSU_UR_EVENT"
)
View Source
const SqlTypeKeyword = "__sql_type_"

Variables

View Source
var (
	JSONMarshallerInstance = JSONMarshaller{}
	CSVMarshallerInstance  = CSVMarshaller{}
)
View Source
var ErrSkipObject = errors.New("Transform or table name filter marked object to be skipped. This object will be skipped.")

Functions

func ClearTypeMetaFields

func ClearTypeMetaFields(object map[string]interface{})

func ConvertOldMappings

func ConvertOldMappings(mappingType config.FieldMappingType, oldStyleMappings []string) (*config.Mapping, error)

ConvertOldMappings converts old style mappings into new style return new style mappings or error

func IsLetterOrNumber

func IsLetterOrNumber(symbol int32) bool

IsLetterOrNumber returns true if input symbol is:

A - Z: 65-90
a - z: 97-122

func NewFieldMapper

func NewFieldMapper(newStyleMappings *config.Mapping) (events.Mapper, typing.SQLTypes, error)

NewFieldMapper return FieldMapper, sql typecast and err

func Reformat

func Reformat(key string) string

Reformat makes all keys to lower case and replaces all special symbols with '_'

Types

type BatchHeader

type BatchHeader struct {
	TableName string
	Fields    Fields
	Partition DatePartition
}

BatchHeader is the schema result of parsing JSON objects

func (*BatchHeader) Exists

func (bh *BatchHeader) Exists() bool

Exists returns true if there is at least one field

type CSVMarshaller

type CSVMarshaller struct {
}

func (CSVMarshaller) Marshal

func (cm CSVMarshaller) Marshal(fields []string, object map[string]interface{}, buf *bytes.Buffer) error

Marshal marshals input object as csv values string with delimiter

func (CSVMarshaller) NeedHeader

func (cm CSVMarshaller) NeedHeader() bool

type DatePartition

type DatePartition struct {
	Field       string
	Granularity Granularity
}

type DummyFlattener

type DummyFlattener struct {
}

func NewDummyFlattener

func NewDummyFlattener() *DummyFlattener

func (*DummyFlattener) FlattenObject

func (df *DummyFlattener) FlattenObject(json map[string]interface{}) (map[string]interface{}, error)

FlattenObject return the same json object

type DummyMapper

type DummyMapper struct{}

func (DummyMapper) Map

func (DummyMapper) Map(object map[string]interface{}) (map[string]interface{}, error)

Map returns object as is

type DummyTypeResolver

type DummyTypeResolver struct {
}

DummyTypeResolver doesn't do anything

func NewDummyTypeResolver

func NewDummyTypeResolver() *DummyTypeResolver

NewDummyTypeResolver return DummyTypeResolver

func (*DummyTypeResolver) Resolve

func (dtr *DummyTypeResolver) Resolve(object map[string]interface{}) (Fields, error)

Resolve return one dummy field and Fields becomes not empty. (it is used in Facebook destination)

type Envelope

type Envelope struct {
	Header        *BatchHeader
	Event         events.Event
	OriginalEvent string
}

type Field

type Field struct {
	// contains filtered or unexported fields
}

Field is a data type holder with occurrences

func NewField

func NewField(t typing.DataType) Field

NewField returns Field instance

func NewFieldWithSQLType

func NewFieldWithSQLType(t typing.DataType, sqlTypeSuggestion *SQLTypeSuggestion) Field

NewFieldWithSQLType returns Field instance with configured suggested sql types

func (Field) GetSuggestedSQLType

func (f Field) GetSuggestedSQLType(destinationType string) (typing.SQLColumn, bool)

GetSuggestedSQLType returns suggested SQL type if configured is used in case when source overrides destination type

func (Field) GetType

func (f Field) GetType() typing.DataType

GetType get field type based on occurrence in one file lazily get common ancestor type (typing.GetCommonAncestorType)

func (*Field) Merge

func (f *Field) Merge(anotherField *Field)

Merge adds new type occurrences wipes field.type if new type was added

type FieldMapper

type FieldMapper struct {
	// contains filtered or unexported fields
}

func (*FieldMapper) Map

func (fm *FieldMapper) Map(object map[string]interface{}) (map[string]interface{}, error)

Map changes input object and applies deletes and mappings

type Fields

type Fields map[string]Field

func (Fields) Add

func (f Fields) Add(other Fields)

Add all new fields from other to current instance if field exists - skip it

func (Fields) Clone

func (f Fields) Clone() Fields

Clone copies fields into a new Fields object

func (Fields) Header

func (f Fields) Header() (header []string)

Header return fields names as a string slice

func (Fields) Merge

func (f Fields) Merge(other Fields)

Merge adds all fields from other to current instance or merge if exists

func (Fields) OverrideTypes

func (f Fields) OverrideTypes(other Fields)

OverrideTypes check if field exists in other then put its type

type Flattener

type Flattener interface {
	FlattenObject(map[string]interface{}) (map[string]interface{}, error)
}

func NewFlattener

func NewFlattener() Flattener

type FlattenerImpl

type FlattenerImpl struct {
	// contains filtered or unexported fields
}

func (*FlattenerImpl) FlattenObject

func (f *FlattenerImpl) FlattenObject(json map[string]interface{}) (map[string]interface{}, error)

FlattenObject flatten object e.g. from {"key1":{"key2":123}} to {"key1_key2":123} from {"$key1":1} to {"_key1":1} from {"(key1)":1} to {"_key1_":1}

type Granularity

type Granularity string

Granularity is a granularity of TimeInterval

const (
	HOUR    Granularity = "HOUR"
	DAY     Granularity = "DAY"
	WEEK    Granularity = "WEEK"
	MONTH   Granularity = "MONTH"
	QUARTER Granularity = "QUARTER"
	YEAR    Granularity = "YEAR"
	ALL     Granularity = "ALL"
)

func (Granularity) Format

func (g Granularity) Format(t time.Time) string

Format returns formatted string value representation

func (Granularity) Lower

func (g Granularity) Lower(t time.Time) time.Time

Lower returns the lower value of interval

func (Granularity) String

func (g Granularity) String() string

String returns string value representation

func (Granularity) Upper

func (g Granularity) Upper(t time.Time) time.Time

Upper returns the upper value of interval

type JSLogListener

type JSLogListener struct {
	// contains filtered or unexported fields
}

func (*JSLogListener) Data

func (j *JSLogListener) Data(data []byte)

func (*JSLogListener) Log

func (j *JSLogListener) Log(level, message string)

func (*JSLogListener) Timeout

func (j *JSLogListener) Timeout() time.Duration

type JSONMarshaller

type JSONMarshaller struct {
}

func (JSONMarshaller) Marshal

func (jm JSONMarshaller) Marshal(fields []string, object map[string]interface{}, buf *bytes.Buffer) error

Marshal object as json

func (JSONMarshaller) NeedHeader

func (jm JSONMarshaller) NeedHeader() bool

type MappingRule

type MappingRule struct {
	// contains filtered or unexported fields
}

type Marshaller

type Marshaller interface {
	Marshal([]string, map[string]interface{}, *bytes.Buffer) error
	NeedHeader() bool
}

type ParquetMarshaller

type ParquetMarshaller struct {
	GoroutinesCount int64
	UseGZIP         bool
}

func (*ParquetMarshaller) Marshal

func (pm *ParquetMarshaller) Marshal(bh *BatchHeader, data []map[string]interface{}) ([]byte, error)

type ProcessedFile

type ProcessedFile struct {
	FileName    string
	BatchHeader *BatchHeader

	RecognitionPayload bool
	// contains filtered or unexported fields
}

ProcessedFile collect data in payload and return it in two formats

func (*ProcessedFile) GetEventsPerSrc

func (pf *ProcessedFile) GetEventsPerSrc() map[string]int

GetEventsPerSrc returns events quantity per src

func (*ProcessedFile) GetOriginalRawEvents

func (pf *ProcessedFile) GetOriginalRawEvents() []string

GetOriginalRawEvents return payload as is

func (*ProcessedFile) GetPayload

func (pf *ProcessedFile) GetPayload() []map[string]interface{}

GetPayload return payload as is

func (*ProcessedFile) GetPayloadBytes

func (pf *ProcessedFile) GetPayloadBytes(marshaller Marshaller) ([]byte, error)

GetPayloadBytes returns marshaling by marshaller func, joined with \n, bytes assume that payload can't be empty

func (*ProcessedFile) GetPayloadBytesWithHeader

func (pf *ProcessedFile) GetPayloadBytesWithHeader(marshaller Marshaller) ([]byte, []string, error)

GetPayloadBytesWithHeader returns marshaling by marshaller func, joined with \n, bytes assume that payload can't be empty

func (*ProcessedFile) GetPayloadLen

func (pf *ProcessedFile) GetPayloadLen() int

GetPayloadLen return count of rows(objects)

func (*ProcessedFile) GetPayloadUsingStronglyTypedMarshaller

func (pf *ProcessedFile) GetPayloadUsingStronglyTypedMarshaller(stm StronglyTypedMarshaller) ([]byte, error)

GetPayloadUsingStronglyTypedMarshaller returns bytes, containing marshalled payload StronglyTypedMarshaller needs to know payload schema (types of fields) to convert payload to byte slice

type Processor

type Processor struct {
	MappingStyle string
	// contains filtered or unexported fields
}

func NewProcessor

func NewProcessor(destinationID string, destinationConfig *config.DestinationConfig, isSQLType bool, tableNameFuncExpression string, fieldMapper events.Mapper, enrichmentRules []enrichment.Rule, flattener Flattener, typeResolver TypeResolver, uniqueIDField *identifiers.UniqueID, maxColumnNameLen int, mappingStyle string, userRecognitionEnabled bool) (*Processor, error)

func (*Processor) AddJavaScript

func (p *Processor) AddJavaScript(js string)

AddJavaScript loads javascript to transformation template's vm

func (*Processor) AddJavaScriptVariables

func (p *Processor) AddJavaScriptVariables(jsVar map[string]interface{})

AddJavaScriptVariables loads variable to globalThis object of transformation template's vm

func (*Processor) Close

func (p *Processor) Close()

func (*Processor) CloseJavaScriptTemplates

func (p *Processor) CloseJavaScriptTemplates()

func (*Processor) DestinationType

func (p *Processor) DestinationType() string

func (*Processor) GetTransformer

func (p *Processor) GetTransformer() templates.TemplateExecutor

func (*Processor) InitJavaScriptTemplates

func (p *Processor) InitJavaScriptTemplates() (err error)

InitJavaScriptTemplates loads destination transform javascript, inits context variables. and sets up template executor

func (*Processor) ProcessEvent

func (p *Processor) ProcessEvent(event map[string]interface{}, needCopyEvent bool) ([]Envelope, error)

ProcessEvent returns table representation, processed flatten object

func (*Processor) ProcessEvents

func (p *Processor) ProcessEvents(fileName string, objects []map[string]interface{}, alreadyUploadedTables map[string]bool, needCopyEvent bool) (flatData map[string]*ProcessedFile, recognizedFlatData map[string]*ProcessedFile, failedEvents *events.FailedEvents, skippedEvents *events.SkippedEvents, err error)

ProcessEvents processes events objects returns array of processed objects per table like {"table1": []objects, "table2": []objects}, All failed events are moved to separate collection for sending to fallback

func (*Processor) ProcessPulledEvents

func (p *Processor) ProcessPulledEvents(tableName string, objects []map[string]interface{}) (map[string]*ProcessedFile, error)

ProcessPulledEvents processes events objects without applying mapping rules returns array of processed objects under tablename or error if at least 1 was occurred

func (*Processor) SetBuiltinTransformer

func (p *Processor) SetBuiltinTransformer(builtinTransformer templates.TemplateExecutor)

SetBuiltinTransformer javascript executor for builtin js code (e.g. npm destination)

func (*Processor) SetDefaultUserTransform

func (p *Processor) SetDefaultUserTransform(defaultUserTransform string)

SetDefaultUserTransform set default transformation code that will be used if no transform or mapping settings provided

type SQLTypeSuggestion

type SQLTypeSuggestion struct {
	// contains filtered or unexported fields
}

SQLTypeSuggestion is a struct which keeps certain SQL types per certain destination type

func NewSQLTypeSuggestion

func NewSQLTypeSuggestion(sqlType typing.SQLColumn, sqlTypePerDestination map[string]typing.SQLColumn) *SQLTypeSuggestion

NewSQLTypeSuggestion returns configured SQLTypeSuggestion instance

type StronglyTypedMarshaller

type StronglyTypedMarshaller interface {
	Marshal(bh *BatchHeader, data []map[string]interface{}) ([]byte, error)
}

func NewParquetMarshaller

func NewParquetMarshaller(useGZIP bool) StronglyTypedMarshaller

type TableNameExtractor

type TableNameExtractor struct {
	Expression string
	// contains filtered or unexported fields
}

TableNameExtractor extracts table name from every JSON event

func NewTableNameExtractor

func NewTableNameExtractor(tableNameExtractExpression string, funcMap template.FuncMap) (*TableNameExtractor, error)

NewTableNameExtractor returns configured TableNameExtractor

func (*TableNameExtractor) Close

func (tne *TableNameExtractor) Close()

func (*TableNameExtractor) Extract

func (tne *TableNameExtractor) Extract(object map[string]interface{}) (result string, err error)

Extract returns table name string. Extracts it from JSON event with text/template expression or javascript code. replaces all empty fields with 'null': {{.field1}} with object {'field2':2} => returns 'null'

func (*TableNameExtractor) Format

func (tne *TableNameExtractor) Format() string

type TypeResolver

type TypeResolver interface {
	Resolve(map[string]interface{}) (Fields, error)
}

TypeResolver resolves schema.Fields from input object

type TypeResolverImpl

type TypeResolverImpl struct {
}

TypeResolverImpl resolves types based on converter.go rules

func NewTypeResolver

func NewTypeResolver() *TypeResolverImpl

NewTypeResolver returns TypeResolverImpl

func (*TypeResolverImpl) Resolve

func (tr *TypeResolverImpl) Resolve(object map[string]interface{}) (Fields, error)

Resolve return Fields representation of input object apply default typecast and define column types reformat from json.Number into int64 or float64 and put back reformat from string with timestamp into time.Time and put back

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL