common

package
v0.1.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 16, 2022 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ReceiverTableID             = 1
	SequenceGeneratorTableID    = 2
	LocksTableID                = 3
	LastLogIndexReceivedTableID = 4
	SyncTableID                 = 5
	SchemaTableID               = 6 // SchemaTableID stores table schemas
	ProtobufTableID             = 7
	IndexTableID                = 8
	ToDeleteTableID             = 9
	LocalConfigTableID          = 10
	ForwardDedupTableID         = 11
	ShardLeaderTableID          = 12
	DummyTableID                = 13
	UserTableIDBase             = 1000
)

System table ids

View Source
const SignBitMask uint64 = 1 << 63

Variables

View Source
var (
	TinyIntColumnType   = ColumnType{Type: TypeTinyInt}
	IntColumnType       = ColumnType{Type: TypeInt}
	BigIntColumnType    = ColumnType{Type: TypeBigInt}
	DoubleColumnType    = ColumnType{Type: TypeDouble}
	VarcharColumnType   = ColumnType{Type: TypeVarchar}
	TimestampColumnType = ColumnType{Type: TypeTimestamp}
	UnknownColumnType   = ColumnType{Type: TypeUnknown}

	// ColumnTypesByType allows lookup of non-parameterised ColumnType by Type.
	ColumnTypesByType = map[Type]ColumnType{
		TypeTinyInt: TinyIntColumnType,
		TypeInt:     IntColumnType,
		TypeBigInt:  BigIntColumnType,
		TypeDouble:  DoubleColumnType,
		TypeVarchar: VarcharColumnType,
	}
)
View Source
var (
	KafkaEncodingUnknown     = KafkaEncoding{Encoding: EncodingUnknown}
	KafkaEncodingRaw         = KafkaEncoding{Encoding: EncodingRaw}
	KafkaEncodingCSV         = KafkaEncoding{Encoding: EncodingCSV}
	KafkaEncodingJSON        = KafkaEncoding{Encoding: EncodingJSON}
	KafkaEncodingFloat32BE   = KafkaEncoding{Encoding: EncodingFloat32BE}
	KafkaEncodingFloat64BE   = KafkaEncoding{Encoding: EncodingFloat64BE}
	KafkaEncodingInt32BE     = KafkaEncoding{Encoding: EncodingInt32BE}
	KafkaEncodingInt64BE     = KafkaEncoding{Encoding: EncodingInt64BE}
	KafkaEncodingInt16BE     = KafkaEncoding{Encoding: EncodingInt16BE}
	KafkaEncodingStringBytes = KafkaEncoding{Encoding: EncodingStringBytes}
)
View Source
var IsLittleEndian = isLittleEndian()

Functions

func AppendFloat32ToBufferBE

func AppendFloat32ToBufferBE(buffer []byte, value float32) []byte

func AppendFloat64ToBufferBE

func AppendFloat64ToBufferBE(buffer []byte, value float64) []byte

func AppendFloat64ToBufferLE

func AppendFloat64ToBufferLE(buffer []byte, value float64) []byte

func AppendStringToBufferLE

func AppendStringToBufferLE(buffer []byte, value string) []byte

func AppendTimestampToBuffer

func AppendTimestampToBuffer(buffer []byte, ts Timestamp) ([]byte, error)

func AppendUint16ToBufferBE

func AppendUint16ToBufferBE(buffer []byte, v uint16) []byte

func AppendUint32ToBufferBE

func AppendUint32ToBufferBE(buffer []byte, v uint32) []byte

func AppendUint32ToBufferLE

func AppendUint32ToBufferLE(buffer []byte, v uint32) []byte

func AppendUint64ToBufferBE

func AppendUint64ToBufferBE(buffer []byte, v uint64) []byte

func AppendUint64ToBufferLE

func AppendUint64ToBufferLE(buffer []byte, v uint64) []byte

func ByteSliceToStringZeroCopy

func ByteSliceToStringZeroCopy(buffer []byte) string

func ConvertPranaTypeToTiDBType

func ConvertPranaTypeToTiDBType(columnType ColumnType) *types.FieldType

func CopyByteSlice

func CopyByteSlice(buff []byte) []byte

func DecodeIndexOrPKCol

func DecodeIndexOrPKCol(buffer []byte, offset int, colType ColumnType, outputColIndex int, pkCol bool, rows *Rows) (int, error)

func DecodeIndexOrPKCols

func DecodeIndexOrPKCols(buffer []byte, offset int, pk bool, indexOrPKColTypes []ColumnType, indexOrPKOutputCols []int, rows *Rows) (int, error)

func DecodeRow

func DecodeRow(buffer []byte, colTypes []ColumnType, rows *Rows) error

func DecodeRowWithIgnoredCols

func DecodeRowWithIgnoredCols(buffer []byte, colTypes []ColumnType, includeCol []bool, rows *Rows) error

func DoDumpStacks

func DoDumpStacks(filterSpam bool)

func DumpDataKey

func DumpDataKey(bytes []byte) string

func DumpStacks

func DumpStacks()

DumpStacks dumps stacks for all goroutines to stdout, useful when debugging

func EncodeIndexKeyCols

func EncodeIndexKeyCols(row *Row, colIndexes []int, colTypes []ColumnType, buffer []byte) ([]byte, error)

func EncodeKey

func EncodeKey(key Key, colTypes []ColumnType, keyColIndexes []int, buffer []byte) ([]byte, error)

func EncodeKeyCol

func EncodeKeyCol(row *Row, colIndex int, colType ColumnType, buffer []byte) ([]byte, error)

func EncodeKeyCols

func EncodeKeyCols(row *Row, colIndexes []int, colTypes []ColumnType, buffer []byte) ([]byte, error)

func EncodeKeyElement

func EncodeKeyElement(value interface{}, colType ColumnType, buffer []byte) ([]byte, error)

func EncodeRow

func EncodeRow(row *Row, colTypes []ColumnType, buffer []byte) ([]byte, error)

func GetOrDefaultIntProperty added in v0.1.6

func GetOrDefaultIntProperty(propName string, props map[string]string, def int) (int, error)

func IncrementBytesBigEndian

func IncrementBytesBigEndian(bytes []byte) []byte

IncrementBytesBigEndian returns a new byte slice which is 1 larger than the provided slice when represented in big endian layout, but without changing the key length

func InvokeCloser

func InvokeCloser(closer io.Closer)

func KeyDecodeFloat64

func KeyDecodeFloat64(buffer []byte, offset int) (float64, int)

func KeyDecodeInt64 added in v0.1.1

func KeyDecodeInt64(buffer []byte, offset int) (int64, int)

func KeyDecodeString

func KeyDecodeString(buffer []byte, offset int) (string, int, error)

func KeyEncodeDecimal

func KeyEncodeDecimal(buffer []byte, val Decimal, precision int, scale int) ([]byte, error)

func KeyEncodeFloat64

func KeyEncodeFloat64(buffer []byte, val float64) []byte

func KeyEncodeInt64

func KeyEncodeInt64(buffer []byte, val int64) []byte

func KeyEncodeString

func KeyEncodeString(buff []byte, val string) []byte

func KeyEncodeTimestamp

func KeyEncodeTimestamp(buffer []byte, val Timestamp) ([]byte, error)

func LogInternalError added in v0.1.1

func LogInternalError(err error) errors.PranaError

func NanoTime added in v0.1.6

func NanoTime() uint64

NanoTime returns the current time in nanoseconds from a monotonic clock. The time returned is based on some arbitrary platform-specific point in the past. The time returned is guaranteed to increase monotonically at a constant rate, unlike time.Now() from the Go standard library, which may slow down, speed up, jump forward or backward, due to NTP activity or leap seconds.

func PanicHandler

func PanicHandler()

func ReadFloat32FromBufferBE

func ReadFloat32FromBufferBE(buffer []byte, offset int) (val float32, off int)

func ReadFloat64FromBufferBE

func ReadFloat64FromBufferBE(buffer []byte, offset int) (val float64, off int)

func ReadFloat64FromBufferLE

func ReadFloat64FromBufferLE(buffer []byte, offset int) (val float64, off int)

func ReadInt64FromBufferLE

func ReadInt64FromBufferLE(buffer []byte, offset int) (int64, int)

func ReadStringFromBufferLE

func ReadStringFromBufferLE(buffer []byte, offset int) (val string, off int)

func ReadUint16FromBufferBE

func ReadUint16FromBufferBE(buffer []byte, offset int) (uint16, int)

func ReadUint32FromBufferBE

func ReadUint32FromBufferBE(buffer []byte, offset int) (uint32, int)

func ReadUint32FromBufferLE

func ReadUint32FromBufferLE(buffer []byte, offset int) (uint32, int)

func ReadUint64FromBufferBE

func ReadUint64FromBufferBE(buffer []byte, offset int) (uint64, int)

func ReadUint64FromBufferLE

func ReadUint64FromBufferLE(buffer []byte, offset int) (uint64, int)

func RoundTimestampToFSP

func RoundTimestampToFSP(ts *Timestamp, fsp int8) error

func StringToByteSliceZeroCopy

func StringToByteSliceZeroCopy(str string) []byte

func TiDBValueToPranaValue

func TiDBValueToPranaValue(tidbValue interface{}) interface{}

Types

type AtomicBool

type AtomicBool struct {
	// contains filtered or unexported fields
}

func (*AtomicBool) CompareAndSet

func (a *AtomicBool) CompareAndSet(expected bool, val bool) bool

func (*AtomicBool) Get

func (a *AtomicBool) Get() bool

func (*AtomicBool) Set

func (a *AtomicBool) Set(val bool)

type ByteSliceMap

type ByteSliceMap struct {
	TheMap map[string][]byte
}

func NewByteSliceMap

func NewByteSliceMap() *ByteSliceMap

func (*ByteSliceMap) Get

func (b *ByteSliceMap) Get(key []byte) (v []byte, ok bool)

func (*ByteSliceMap) Put

func (b *ByteSliceMap) Put(key []byte, value []byte)

type ColumnInfo

type ColumnInfo struct {
	Name string
	ColumnType
}

type ColumnType

type ColumnType struct {
	Type         Type
	DecPrecision int
	DecScale     int
	FSP          int8 // fractional seconds precision for time types
}

func ConvertTiDBTypeToPranaType

func ConvertTiDBTypeToPranaType(columnType *types.FieldType) (ColumnType, error)

func NewDecimalColumnType

func NewDecimalColumnType(precision int, scale int) ColumnType

func NewTimestampColumnType

func NewTimestampColumnType(fsp int8) ColumnType

func (*ColumnType) String

func (t *ColumnType) String() string

type Decimal

type Decimal struct {
	// contains filtered or unexported fields
}

func KeyDecodeDecimal added in v0.1.1

func KeyDecodeDecimal(buffer []byte, offset int, precision int, scale int) (Decimal, int, error)

func NewDecFromFloat64

func NewDecFromFloat64(f float64) (*Decimal, error)

func NewDecFromInt64

func NewDecFromInt64(i int64) *Decimal

func NewDecFromString

func NewDecFromString(s string) (*Decimal, error)

func NewDecFromUint64

func NewDecFromUint64(i uint64) *Decimal

func NewDecimal

func NewDecimal(dec *types.MyDecimal) *Decimal

func ReadDecimalFromBuffer

func ReadDecimalFromBuffer(buffer []byte, offset int, precision int, scale int) (val Decimal, off int, err error)

func ZeroDecimal

func ZeroDecimal() *Decimal

func (*Decimal) Add

func (d *Decimal) Add(other *Decimal) (*Decimal, error)

func (*Decimal) CompareTo

func (d *Decimal) CompareTo(dec *Decimal) int

func (*Decimal) Decode

func (d *Decimal) Decode(buffer []byte, offset int, precision int, scale int) (int, error)

func (*Decimal) Encode

func (d *Decimal) Encode(buffer []byte, precision int, scale int) ([]byte, error)

func (*Decimal) String

func (d *Decimal) String() string

func (*Decimal) Subtract

func (d *Decimal) Subtract(other *Decimal) (*Decimal, error)

type Encoding

type Encoding int
const (
	EncodingUnknown  Encoding = iota
	EncodingRaw               // No encoding - value retained as []byte
	EncodingCSV               // Comma separated
	EncodingJSON              // JSON
	EncodingProtobuf          // Protobuf
	EncodingFloat32BE
	EncodingFloat64BE
	EncodingInt32BE
	EncodingInt64BE
	EncodingInt16BE
	EncodingStringBytes
)

func EncodingFormatFromString

func EncodingFormatFromString(str string) Encoding

type Expression

type Expression struct {
	// contains filtered or unexported fields
}

func NewColumnExpression

func NewColumnExpression(colIndex int, colType ColumnType) *Expression

func NewConstantDouble

func NewConstantDouble(colType ColumnType, val float64) *Expression

func NewConstantInt

func NewConstantInt(colType ColumnType, val int64) *Expression

func NewConstantVarchar

func NewConstantVarchar(colType ColumnType, val string) *Expression

func NewExpression

func NewExpression(exp expression.Expression, ctx sessionctx.Context) *Expression

func NewScalarFunctionExpression

func NewScalarFunctionExpression(colType ColumnType, funcName string, args ...*Expression) (*Expression, error)

func (*Expression) EvalBoolean

func (e *Expression) EvalBoolean(row *Row) (bool, bool, error)

func (*Expression) EvalDecimal

func (e *Expression) EvalDecimal(row *Row) (Decimal, bool, error)

func (*Expression) EvalFloat64

func (e *Expression) EvalFloat64(row *Row) (val float64, null bool, err error)

func (*Expression) EvalInt64

func (e *Expression) EvalInt64(row *Row) (val int64, null bool, err error)

func (*Expression) EvalString

func (e *Expression) EvalString(row *Row) (val string, null bool, err error)

func (*Expression) EvalTimestamp

func (e *Expression) EvalTimestamp(row *Row) (Timestamp, bool, error)

func (*Expression) GetColumnIndex

func (e *Expression) GetColumnIndex() (int, bool)

func (*Expression) ReturnType

func (e *Expression) ReturnType(colTypes []ColumnType) (ColumnType, error)

type IndexInfo

type IndexInfo struct {
	SchemaName string
	ID         uint64
	TableName  string
	Name       string
	IndexCols  []int
	// contains filtered or unexported fields
}

func NewIndexInfo added in v0.1.6

func NewIndexInfo(schemaName string, id uint64, tableName string, name string, indexCols []int) *IndexInfo

func (*IndexInfo) CalcColsSet added in v0.1.6

func (i *IndexInfo) CalcColsSet()

func (*IndexInfo) ContainsColIndex

func (i *IndexInfo) ContainsColIndex(colIndex int) bool

type InternalTableInfo

type InternalTableInfo struct {
	*TableInfo
	// For aggregation tables that are implicit tables of materialized views with group by clauses.
	MaterializedViewName string
}

type KafkaEncoding

type KafkaEncoding struct {
	Encoding   Encoding
	SchemaName string
}

func KafkaEncodingFromString

func KafkaEncodingFromString(str string) KafkaEncoding

KafkaEncodingFromString decodes an encoding and an optional schema name from the string, in the format "<encoding>[:<schema>]". For example, for a "com.squareup.cash.Payment" protobuf, encoding should be specified as "protobuf:com.squareup.cash.Payment"

type Key

type Key []interface{}

type MaterializedViewInfo

type MaterializedViewInfo struct {
	*TableInfo
	OriginInfo *MaterializedViewOriginInfo
	Query      string
}

func (*MaterializedViewInfo) String

func (i *MaterializedViewInfo) String() string

type MaterializedViewOriginInfo added in v0.1.1

type MaterializedViewOriginInfo struct {
	InitialState string
}

type MetaTableInfo

type MetaTableInfo struct {
	*TableInfo
}

MetaTableInfo describes a system table that is neither a source or mv.

func (*MetaTableInfo) String

func (i *MetaTableInfo) String() string

type PreAllocatedSeqGenerator

type PreAllocatedSeqGenerator struct {
	// contains filtered or unexported fields
}

PreAllocatedSeqGenerator is a sequence generator that enumerates a fixed, already obtained sequence IDs. We need to reserve the table sequences required for the DDL statement *before* we broadcast the DDL across the cluster, and those same table sequence values have to be used on every node for consistency.

func NewPreallocSeqGen

func NewPreallocSeqGen(seq []uint64) *PreAllocatedSeqGenerator

func (*PreAllocatedSeqGenerator) GenerateSequence

func (p *PreAllocatedSeqGenerator) GenerateSequence() uint64

type Row

type Row struct {
	// contains filtered or unexported fields
}

func (*Row) ColCount

func (r *Row) ColCount() int

func (*Row) ColumnTypes

func (r *Row) ColumnTypes() []ColumnType

func (*Row) GetByte

func (r *Row) GetByte(colIndex int) byte

func (*Row) GetDecimal

func (r *Row) GetDecimal(colIndex int) Decimal

func (*Row) GetFloat64

func (r *Row) GetFloat64(colIndex int) float64

func (*Row) GetInt64

func (r *Row) GetInt64(colIndex int) int64

func (*Row) GetString

func (r *Row) GetString(colIndex int) string

func (*Row) GetTimestamp

func (r *Row) GetTimestamp(colIndex int) Timestamp

func (*Row) IsNull

func (r *Row) IsNull(colIndex int) bool

func (*Row) String

func (r *Row) String() string

type Rows

type Rows struct {
	// contains filtered or unexported fields
}

func NewRows

func NewRows(columnTypes []ColumnType, capacity int) *Rows

func (*Rows) AppendAll

func (r *Rows) AppendAll(other *Rows)

func (*Rows) AppendDecimalToColumn

func (r *Rows) AppendDecimalToColumn(colIndex int, val Decimal)

func (*Rows) AppendFloat64ToColumn

func (r *Rows) AppendFloat64ToColumn(colIndex int, val float64)

func (*Rows) AppendInt64ToColumn

func (r *Rows) AppendInt64ToColumn(colIndex int, val int64)

func (*Rows) AppendNullToColumn

func (r *Rows) AppendNullToColumn(colIndex int)

func (*Rows) AppendRow

func (r *Rows) AppendRow(row Row)

func (*Rows) AppendStringToColumn

func (r *Rows) AppendStringToColumn(colIndex int, val string)

func (*Rows) AppendTimestampToColumn

func (r *Rows) AppendTimestampToColumn(colIndex int, val Timestamp)

func (*Rows) ColumnTypes

func (r *Rows) ColumnTypes() []ColumnType

func (*Rows) Deserialize

func (r *Rows) Deserialize(buff []byte)

func (*Rows) GetRow

func (r *Rows) GetRow(rowIndex int) Row

func (*Rows) RowCount

func (r *Rows) RowCount() int

func (*Rows) Serialize

func (r *Rows) Serialize() []byte

func (*Rows) String

func (r *Rows) String() string

type RowsFactory

type RowsFactory struct {
	ColumnTypes []ColumnType
	// contains filtered or unexported fields
}

RowsFactory caches the field types so we don't have to calculate them each time we create a new Rows

func NewRowsFactory

func NewRowsFactory(columnTypes []ColumnType) *RowsFactory

func (*RowsFactory) NewRows

func (rf *RowsFactory) NewRows(capacity int) *Rows

type Schema

type Schema struct {
	Name string
	// contains filtered or unexported fields
}

func NewSchema

func NewSchema(name string) *Schema

func (*Schema) DeleteIndex

func (s *Schema) DeleteIndex(tableName string, indexName string) error

func (*Schema) DeleteTable

func (s *Schema) DeleteTable(name string)

func (*Schema) Equal

func (s *Schema) Equal(other *Schema) bool

func (*Schema) GetAllTableInfos

func (s *Schema) GetAllTableInfos() map[string]*TableInfo

func (*Schema) GetTable

func (s *Schema) GetTable(name string) (Table, bool)

func (*Schema) LenTables

func (s *Schema) LenTables() int

func (*Schema) PutIndex

func (s *Schema) PutIndex(indexInfo *IndexInfo) error

func (*Schema) PutTable

func (s *Schema) PutTable(name string, table Table)

type SeqGenerator

type SeqGenerator interface {
	GenerateSequence() uint64
}

type SimpleQueryExec

type SimpleQueryExec interface {
	ExecuteQuery(schemaName string, query string) (rows *Rows, err error)
}

type SinkInfo

type SinkInfo struct {
	Name      string
	Query     string
	TopicInfo *SourceOriginInfo
}

type SourceInfo

type SourceInfo struct {
	*TableInfo
	OriginInfo *SourceOriginInfo
}

func (*SourceInfo) String

func (i *SourceInfo) String() string

type SourceOriginInfo added in v0.1.1

type SourceOriginInfo struct {
	BrokerName      string
	TopicName       string
	KeyEncoding     KafkaEncoding
	ValueEncoding   KafkaEncoding
	HeaderEncoding  KafkaEncoding
	ColSelectors    []selector.ColumnSelector
	Properties      map[string]string
	IngestFilter    string
	InitialState    string
	ConsumerGroupID string
	Transient       bool
}

type SpinLock added in v0.1.6

type SpinLock struct {
	// contains filtered or unexported fields
}

SpinLock is a spinlock implementation.

A SpinLock must not be copied after first use.

func (*SpinLock) Lock added in v0.1.6

func (l *SpinLock) Lock()

Lock locks l. If the lock is already in use, the calling goroutine blocks until the locker is available.

func (*SpinLock) Unlock added in v0.1.6

func (l *SpinLock) Unlock()

Unlock unlocks l.

type Table

type Table interface {
	GetTableInfo() *TableInfo
}

type TableInfo

type TableInfo struct {
	ID             uint64
	SchemaName     string
	Name           string
	PrimaryKeyCols []int
	ColumnNames    []string
	ColumnTypes    []ColumnType
	IndexInfos     map[string]*IndexInfo
	ColsVisible    []bool
	Internal       bool
	// contains filtered or unexported fields
}

func NewTableInfo added in v0.1.6

func NewTableInfo(id uint64, schemaName string, name string, pkCols []int, colNames []string, colTypes []ColumnType) *TableInfo

func (*TableInfo) CalcPKColsSet added in v0.1.6

func (t *TableInfo) CalcPKColsSet()

func (*TableInfo) GetTableInfo

func (t *TableInfo) GetTableInfo() *TableInfo

func (*TableInfo) IsPrimaryKeyCol

func (t *TableInfo) IsPrimaryKeyCol(colIndex int) bool

func (*TableInfo) String

func (t *TableInfo) String() string

type Timestamp

type Timestamp = types.Time

func KeyDecodeTimestamp added in v0.1.1

func KeyDecodeTimestamp(buffer []byte, offset int, fsp int8) (Timestamp, int, error)

func NewTimestampFromGoTime

func NewTimestampFromGoTime(t time.Time) Timestamp

func NewTimestampFromString

func NewTimestampFromString(str string) Timestamp

NewTimestampFromString parses a Timestamp from a string in MySQL datetime format.

func NewTimestampFromUnixEpochMillis

func NewTimestampFromUnixEpochMillis(v int64) Timestamp

func ReadTimestampFromBuffer

func ReadTimestampFromBuffer(buffer []byte, offset int, fsp int8) (val Timestamp, off int, err error)

func ReadTimestampFromBufferBE

func ReadTimestampFromBufferBE(buffer []byte, offset int, fsp int8) (val Timestamp, off int, err error)

type Type

type Type int
const (
	TypeUnknown Type = iota
	TypeTinyInt
	TypeInt
	TypeBigInt
	TypeDouble
	TypeDecimal
	TypeVarchar
	TypeTimestamp
)

func (*Type) Capture

func (t *Type) Capture(tokens []string) error

func (Type) String

func (t Type) String() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL