memstore

package v0.0.1

Published: Nov 27, 2018 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Overview

Package memstore keeps the test factory here because placing it elsewhere would create a memstore -> utils -> memstore import cycle.

Index

Constants

const (
	// IgnoreCount skip setting value counts.
	IgnoreCount common.ValueCountsUpdateMode = iota
	// IncrementCount only increment count.
	IncrementCount
	// CheckExistingCount also check existing count.
	CheckExistingCount
)
const BaseBatchID = int32(math.MinInt32)

BaseBatchID is the starting id of all batches.

const UpsertHeader uint32 = 0xADDAFEED

UpsertHeader is the magic header written into the beginning of each redo log file.

const VectorPartyHeader uint32 = 0xFADEFACE

VectorPartyHeader is the magic header written into the beginning of each vector party file.

Variables

This section is empty.

Functions

func CalculateVectorBytes

func CalculateVectorBytes(dataType common.DataType, size int) int

CalculateVectorBytes calculates the bytes a vector will occupy given its data type and size, without actually allocating it.

func CalculateVectorPartyBytes

func CalculateVectorPartyBytes(dataType common.DataType, size int, hasNulls bool, hasCounts bool) int

CalculateVectorPartyBytes calculates the bytes a vector party will occupy. Note: data types backed by Go memory report memory usage only when a value is actually set, so 0 is reported for them here.
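
As a rough illustration, the estimate can be obtained before allocating anything; the sketch below assumes common.Uint32 is one of the package's DataType constants.

// Hypothetical sketch: estimate the footprint of a 1,000,000-row uint32
// column with a null vector but no count vector, without allocating it.
estimatedBytes := CalculateVectorPartyBytes(common.Uint32, 1000000, true /* hasNulls */, false /* hasCounts */)
_ = estimatedBytes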

func GetPrimaryKeyBytes

func GetPrimaryKeyBytes(primaryKeyValues []common.DataValue, key []byte) error

GetPrimaryKeyBytes returns primary key bytes for a given row.

func MarshalPrimaryKey

func MarshalPrimaryKey(pk PrimaryKey) ([]byte, error)

MarshalPrimaryKey marshals a PrimaryKey into json. We cannot define MarshalJSON on PrimaryKey since a pointer cannot be the receiver.

func NewHostMemoryManager

func NewHostMemoryManager(memStore *memStoreImpl, totalMemorySize int64) common.HostMemoryManager

NewHostMemoryManager is used to init a HostMemoryManager.

func NewLiveVectorParty

func NewLiveVectorParty(length int, dataType common.DataType, defaultValue common.DataValue, hostMemoryManager common.HostMemoryManager) common.LiveVectorParty

NewLiveVectorParty creates LiveVectorParty

func NewVectorPartyArchiveSerializer

func NewVectorPartyArchiveSerializer(hostMemManager common.HostMemoryManager, diskStore diskstore.DiskStore, table string, shardID int, columnID int, batchID int, batchVersion uint32, seqNum uint32) common.VectorPartySerializer

NewVectorPartyArchiveSerializer returns a new VectorPartySerializer

func NewVectorPartySnapshotSerializer

func NewVectorPartySnapshotSerializer(
	shard *TableShard, columnID, batchID int, batchVersion uint32, seqNum uint32, redoLogFile int64, offset uint32) common.VectorPartySerializer

NewVectorPartySnapshotSerializer returns a new VectorPartySerializer

func UnpinVectorParties

func UnpinVectorParties(requestedVPs []common.ArchiveVectorParty)

UnpinVectorParties unpins all vector parties in the slice.

func VectorPartyEquals

func VectorPartyEquals(v1 common.VectorParty, v2 common.VectorParty) bool

VectorPartyEquals compares two VectorParties; it also covers the nil VectorParty case.

Types

type ArchiveBatch

type ArchiveBatch struct {
	Batch

	// Size of the batch (number of rows). Notice that compression changes the
	// length of some columns, but does not change the size of the batch.
	Size int

	// Version for archive batches.
	Version uint32

	// SeqNum denotes backfill sequence number
	SeqNum uint32

	// For convenience.
	BatchID int32
	Shard   *TableShard
}

ArchiveBatch represents an archive batch.

func (*ArchiveBatch) BlockingDelete

func (b *ArchiveBatch) BlockingDelete(columnID int) common.ArchiveVectorParty

BlockingDelete blocks until all users are finished with the specified column, and then deletes the column from the batch. Returns the vector party deleted if any.

func (*ArchiveBatch) BuildIndex

func (b *ArchiveBatch) BuildIndex(sortColumns []int, primaryKeyColumns []int, pk PrimaryKey) error

BuildIndex builds an index over the primary key columns of this archive batch and inserts the record IDs into the given primary key.

func (*ArchiveBatch) Clone

func (b *ArchiveBatch) Clone() *ArchiveBatch

Clone returns a copy of current batch including all references to underlying vector parties. Caller is responsible for holding the lock (if necessary).

func (*ArchiveBatch) MarshalJSON

func (b *ArchiveBatch) MarshalJSON() ([]byte, error)

MarshalJSON marshals an ArchiveBatch into json.

func (*ArchiveBatch) RequestVectorParty

func (b *ArchiveBatch) RequestVectorParty(columnID int) common.ArchiveVectorParty

RequestVectorParty creates (if needed), pins, and returns the requested vector party. On creation it also asynchronously loads the vector party from disk into memory.

Caller must call vp.WaitForDiskLoad() before using it, and call vp.Release() afterwards.
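
A minimal sketch of that request/pin/release protocol; the batch and columnID values are assumed to come from the surrounding context, and WaitForDiskLoad/Release are the methods named above.

// Sketch: request a vector party, wait for its disk load, then release it.
vp := batch.RequestVectorParty(columnID)
vp.WaitForDiskLoad() // block until the column is in memory
// ... read from vp ...
vp.Release() // unpin when done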

func (*ArchiveBatch) TryEvict

func (b *ArchiveBatch) TryEvict(columnID int) common.ArchiveVectorParty

TryEvict attempts to evict and destruct the specified column from the archive batch. It will fail fast if the column is currently in use so that host memory manager can try evicting other VPs immediately. Returns vector party evicted if succeeded.

func (*ArchiveBatch) WriteToDisk

func (b *ArchiveBatch) WriteToDisk() error

WriteToDisk writes each column of a batch to disk. It happens during the archiving stage for the merged archive batch, so there is no need to lock it.

type ArchiveJobDetail

type ArchiveJobDetail struct {
	JobDetail

	// Current cutoff.
	CurrentCutoff uint32 `json:"currentCutoff"`
	// Stage the job is currently in.
	Stage ArchivingStage `json:"stage"`
	// New Cutoff.
	RunningCutoff uint32 `json:"runningCutoff"`
	// Cutoff of last completed archiving job.
	LastCutoff uint32 `json:"lastCutoff"`
}

ArchiveJobDetail represents archiving job status of a table Shard.

type ArchiveJobDetailMutator

type ArchiveJobDetailMutator func(jobDetail *ArchiveJobDetail)

ArchiveJobDetailMutator is the mutator functor to change ArchiveJobDetail.

type ArchiveJobDetailReporter

type ArchiveJobDetailReporter func(key string, mutator ArchiveJobDetailMutator)

ArchiveJobDetailReporter is the functor to apply mutator changes to corresponding JobDetail.

type ArchiveStore

type ArchiveStore struct {
	// The mutex protects the pointer pointing to the current version of archived vector version.
	sync.RWMutex

	PurgeManager *PurgeManager
	// Current version points to the most recent version of vector store version for queries to use.
	CurrentVersion *ArchiveStoreVersion
}

ArchiveStore manages archive store versions. The archive store version evolves to a new version after archiving. Readers should follow the following locking protocol:

archiveStore.Users.Add(1)
// tableShard.ArchiveStore can no longer be accessed directly.
// continue reading from archiveStore
archiveStore.Users.Done()

func NewArchiveStore

func NewArchiveStore(shard *TableShard) ArchiveStore

NewArchiveStore creates a new archive store. The current version is just a placeholder for tests. It will be replaced during recovery.

func (*ArchiveStore) Destruct

func (s *ArchiveStore) Destruct()

Destruct deletes all vectors allocated in C. Caller must detach the Shard first and wait until all users are finished.

func (*ArchiveStore) GetCurrentVersion

func (s *ArchiveStore) GetCurrentVersion() *ArchiveStoreVersion

GetCurrentVersion returns the current ArchiveStoreVersion and does proper locking. It's used by queries and data browsing. Users need to call version.Users.Done() after their work.
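
A hedged sketch of that reader protocol, assuming a pinned *TableShard and a batchID from context; GetBatchForRead is documented below and is assumed here to take a read lock that the reader must release.

// Sketch: read from the current archive store version.
version := shard.ArchiveStore.GetCurrentVersion()
defer version.Users.Done() // signal that this reader is finished
if batch := version.GetBatchForRead(batchID); batch != nil {
	// ... read columns from batch ...
	batch.RUnlock() // release the lock taken by GetBatchForRead
}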

func (*ArchiveStore) MarshalJSON

func (s *ArchiveStore) MarshalJSON() ([]byte, error)

MarshalJSON marshals an ArchiveStore into json.

type ArchiveStoreVersion

type ArchiveStoreVersion struct {
	// The mutex
	// protects the Batches map structure and the archiving cutoff field.
	// It does not protect contents within a batch. Before releasing
	// the VectorStore mutex, user should lock the batch level mutex if necessary
	// to ensure proper protection at batch level.
	sync.RWMutex `json:"-"`

	// Wait group used to prevent this ArchiveStore from being evicted
	Users sync.WaitGroup `json:"-"`

	// Each batch in the slice is identified by BaseBatchID+index.
	// Index out of bound and nil Batch for archive batches indicates that none of
	// the columns have been loaded into memory from disk.
	Batches map[int32]*ArchiveBatch `json:"batches"`

	// The archiving cutoff used for this version of the sorted store.
	ArchivingCutoff uint32 `json:"archivingCutoff"`
	// contains filtered or unexported fields
}

ArchiveStoreVersion stores a version of archive batches of columnar data.

func NewArchiveStoreVersion

func NewArchiveStoreVersion(cutoff uint32, shard *TableShard) *ArchiveStoreVersion

NewArchiveStoreVersion creates a new empty archive store version given cutoff.

func (*ArchiveStoreVersion) GetBatchForRead

func (v *ArchiveStoreVersion) GetBatchForRead(batchID int) *ArchiveBatch

GetBatchForRead returns an ArchiveBatch for read; the reader needs to unlock it after use.

func (*ArchiveStoreVersion) MarshalJSON

func (v *ArchiveStoreVersion) MarshalJSON() ([]byte, error)

MarshalJSON marshals an ArchiveStoreVersion into json.

func (*ArchiveStoreVersion) RequestBatch

func (v *ArchiveStoreVersion) RequestBatch(batchID int32) *ArchiveBatch

RequestBatch returns the requested archive batch from the archive store version.

type ArchivingJob

type ArchivingJob struct {
	// contains filtered or unexported fields
}

ArchivingJob defines the structure that an archiving job needs.

func (*ArchivingJob) GetIdentifier

func (job *ArchivingJob) GetIdentifier() string

GetIdentifier returns a unique identifier of this job.

func (*ArchivingJob) Run

func (job *ArchivingJob) Run() error

Run starts the archiving process and waits for it to finish.

func (*ArchivingJob) String

func (job *ArchivingJob) String() string

String gives a meaningful string representation of this job.

type ArchivingStage

type ArchivingStage string

ArchivingStage represents different stages of a running archive job.

const (
	ArchivingCreatePatch ArchivingStage = "create patch"
	ArchivingMerge       ArchivingStage = "merge"
	ArchivingPurge       ArchivingStage = "purge"
	ArchivingComplete    ArchivingStage = "complete"
)

List of ArchivingStages.

type BackfillJob

type BackfillJob struct {
	// contains filtered or unexported fields
}

BackfillJob defines the structure that a backfill job needs.

func (*BackfillJob) GetIdentifier

func (job *BackfillJob) GetIdentifier() string

GetIdentifier returns a unique identifier of this job.

func (*BackfillJob) Run

func (job *BackfillJob) Run() error

Run starts the backfill process and waits for it to finish.

func (*BackfillJob) String

func (job *BackfillJob) String() string

String gives a meaningful string representation of this job.

type BackfillJobDetail

type BackfillJobDetail struct {
	JobDetail
	// Stage the job is currently in.
	Stage BackfillStage `json:"stage"`
	// Current redolog file that's being backfilled.
	RedologFile int64 `json:"redologFile"`
	// Batch offset within the RedologFile.
	BatchOffset uint32 `json:"batchOffset"`
}

BackfillJobDetail represents backfill job status of a table Shard.

type BackfillJobDetailMutator

type BackfillJobDetailMutator func(jobDetail *BackfillJobDetail)

BackfillJobDetailMutator is the mutator functor to change BackfillJobDetail.

type BackfillJobDetailReporter

type BackfillJobDetailReporter func(key string, mutator BackfillJobDetailMutator)

BackfillJobDetailReporter is the functor to apply mutator changes to corresponding JobDetail.

type BackfillManager

type BackfillManager struct {
	sync.RWMutex `json:"-"`
	// Name of the table.
	TableName string `json:"-"`

	// The shard id of the table.
	Shard int `json:"-"`

	// queue to hold UpsertBatches to backfill
	UpsertBatches []*UpsertBatch `json:"-"`

	// keep track of the number of records in backfill queue
	NumRecords int `json:"numRecords"`

	// keep track of the size of the buffer that holds batches to be backfilled
	CurrentBufferSize int64 `json:"currentBufferSize"`

	// keep track of the size of the buffer that holds batches being backfilled
	BackfillingBufferSize int64 `json:"backfillingBufferSize"`

	// max buffer size to hold backfill data
	MaxBufferSize int64 `json:"maxBufferSize"`

	// threshold to trigger backfill
	BackfillThresholdInBytes int64 `json:"backfillThresholdInBytes"`

	// keep track of the redo log file of the last batch backfilled
	LastRedoFile int64 `json:"lastRedoFile"`

	// keep track of the offset of the last batch backfilled
	LastBatchOffset uint32 `json:"lastBatchOffset"`

	// keep track of the redo log file of the last batch queued
	CurrentRedoFile int64 `json:"currentRedoFile"`

	// keep track of the offset of the last batch being queued
	CurrentBatchOffset uint32 `json:"currentBatchOffset"`

	AppendCond *sync.Cond `json:"-"`
}

BackfillManager manages the records that need to be put into a backfill queue and merged with sorted batches directly.

func NewBackfillManager

func NewBackfillManager(tableName string, shard int, tableConfig metaCom.TableConfig) *BackfillManager

NewBackfillManager creates a new BackfillManager instance.

func (*BackfillManager) Append

func (r *BackfillManager) Append(upsertBatch *UpsertBatch, redoFile int64, batchOffset uint32) bool

Append appends an upsert batch into the backfill queue. Returns true if the buffer limit has been reached and the caller may need to wait.
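
A hedged sketch of the backpressure loop this return value implies, using WaitForBackfillBufferAvailability (documented below); the upsert batch and redo log position are assumed from context.

// Sketch: queue a batch for backfill and block when the buffer is full.
if backfillManager.Append(upsertBatch, redoFile, batchOffset) {
	// Buffer limit reached; wait until the running backfill frees space.
	backfillManager.WaitForBackfillBufferAvailability()
}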

func (*BackfillManager) Destruct

func (r *BackfillManager) Destruct()

Destruct sets the Go object references used by the backfill manager to nil to trigger GC earlier.

func (*BackfillManager) Done

func (r *BackfillManager) Done(currentRedoFile int64, currentBatchOffset uint32,
	metaStore metastore.MetaStore) error

Done updates the backfill progress both in memory and in metastore.

func (*BackfillManager) GetLatestRedoFileAndOffset

func (r *BackfillManager) GetLatestRedoFileAndOffset() (int64, uint32)

GetLatestRedoFileAndOffset returns the latest redo log file and its batch offset.

func (*BackfillManager) MarshalJSON

func (r *BackfillManager) MarshalJSON() ([]byte, error)

MarshalJSON marshals a BackfillManager into json.

func (*BackfillManager) QualifyToTriggerBackfill

func (r *BackfillManager) QualifyToTriggerBackfill() bool

QualifyToTriggerBackfill decides whether it is OK to trigger the size-based backfill process.

func (*BackfillManager) ReadUpsertBatch

func (r *BackfillManager) ReadUpsertBatch(index, start, length int, schema *TableSchema) (data [][]interface{}, columnNames []string, err error)

ReadUpsertBatch reads an upsert batch in the backfill queue; the caller should not lock the schema.

func (*BackfillManager) StartBackfill

func (r *BackfillManager) StartBackfill() ([]*UpsertBatch, int64, uint32)

StartBackfill gets a slice of UpsertBatches from backfill queue and returns the CurrentRedoFile and CurrentBatchOffset.

func (*BackfillManager) WaitForBackfillBufferAvailability

func (r *BackfillManager) WaitForBackfillBufferAvailability()

WaitForBackfillBufferAvailability blocks until backfill buffer is available

type BackfillStage

type BackfillStage string

BackfillStage represents different stages of a running backfill job.

const (
	BackfillCreatePatch BackfillStage = "create patch"
	BackfillApplyPatch  BackfillStage = "apply patch"
	BackfillPurge       BackfillStage = "purge"
	BackfillComplete    BackfillStage = "complete"
)

List of BackfillStages.

type Batch

type Batch struct {
	// Batch mutex is locked in reader mode by queries during the entire transfer
	// to ensure row level consistent read. It is locked in writer mode only for
	// updates from ingestion, and for modifications to the columns slice itself
	// (e.g., adding new columns). Appends will update LastReadBatchID and
	// NumRecordsInLastWriteBatch to make newly added records visible only at the last
	// step, therefore the batch does not need to be locked for appends.
	// For sorted batches this is also locked in writer mode for initiating loading
	// from disk (vector party creation, Loader/Users initialization), and for
	// vector party detaching during eviction.
	sync.RWMutex
	// For live batches, index out of bound and nil VectorParty indicates
	// mode 0 for the corresponding VectorParty.
	// For archive batches, index out of bound and nil VectorParty indicates that
	// the corresponding VectorParty has not been loaded into memory from disk.
	Columns []common.VectorParty
}

Batch represents a sorted or live batch.

func (*Batch) Equals

func (b *Batch) Equals(other *Batch) bool

Equals checks whether two batches are the same. Note that both batches should have all their columns loaded into memory before comparison; therefore this function should only be called for unit-test purposes.

func (*Batch) GetDataValue

func (b *Batch) GetDataValue(row, columnID int) common.DataValue

GetDataValue reads a value from the underlying columns.

func (*Batch) GetDataValueWithDefault

func (b *Batch) GetDataValueWithDefault(row, columnID int, defaultValue common.DataValue) common.DataValue

GetDataValueWithDefault reads a value from the underlying columns; if it's missing, it returns the passed default value instead.

func (*Batch) GetVectorParty

func (b *Batch) GetVectorParty(columnID int) common.VectorParty

GetVectorParty returns the VectorParty for the specified column from the batch. It requires the batch to be locked for reading.

func (*Batch) ReplaceVectorParty

func (b *Batch) ReplaceVectorParty(columnID int, vp common.VectorParty)

ReplaceVectorParty replaces the VectorParty for the specified column in the archive batch. Existing copies will be destructed. vp can be specified as nil to purge the existing copy. It requires the batch to be locked for writing.

func (*Batch) SafeDestruct

func (b *Batch) SafeDestruct()

SafeDestruct destructs all vector parties of this batch.

type BatchReader

type BatchReader interface {
	GetDataValue(row, columnID int) common.DataValue
	GetDataValueWithDefault(row, columnID int, defaultValue common.DataValue) common.DataValue
}

BatchReader defines the interface to retrieve a DataValue given a row index and column index.

type BatchStatsReporter

type BatchStatsReporter struct {
	// contains filtered or unexported fields
}

BatchStatsReporter is used to report batch level stats like row count

func NewBatchStatsReporter

func NewBatchStatsReporter(intervalInSeconds int, memStore MemStore, metaStore metastore.MetaStore) *BatchStatsReporter

NewBatchStatsReporter creates a new BatchStatsReporter instance.

func (*BatchStatsReporter) Run

func (batchStats *BatchStatsReporter) Run()

Run is a ticker function that reports stats periodically.

func (*BatchStatsReporter) Stop

func (batchStats *BatchStatsReporter) Stop()

Stop stops the stats reporter.

type CuckooIndex

type CuckooIndex struct {
	// contains filtered or unexported fields
}

CuckooIndex is an implementation of a hash index using the cuckoo hashing algorithm. Lazy expiration is used to invalidate expired items. CuckooIndex is not thread-safe.

func (*CuckooIndex) AllocatedBytes

func (c *CuckooIndex) AllocatedBytes() uint

AllocatedBytes returns the allocated size of primary key in bytes.

func (*CuckooIndex) Capacity

func (c *CuckooIndex) Capacity() uint

Capacity returns how many items current primary key can hold.

func (*CuckooIndex) Delete

func (c *CuckooIndex) Delete(key Key)

Delete deletes an item with the given key.

func (*CuckooIndex) Destruct

func (c *CuckooIndex) Destruct()

Destruct frees all allocated memory

func (*CuckooIndex) Find

func (c *CuckooIndex) Find(key Key) (RecordID, bool)

Find looks up a record given a key.

func (*CuckooIndex) FindOrInsert

func (c *CuckooIndex) FindOrInsert(key Key, value RecordID, eventTime uint32) (existingFound bool, recordID RecordID, err error)

FindOrInsert finds the existing key or inserts a new (key, value) pair.

func (*CuckooIndex) GetEventTimeCutoff

func (c *CuckooIndex) GetEventTimeCutoff() uint32

GetEventTimeCutoff returns the cutoff event time.

func (*CuckooIndex) LockForTransfer

func (c *CuckooIndex) LockForTransfer() PrimaryKeyData

LockForTransfer locks primary key for transfer and returns PrimaryKeyData

func (*CuckooIndex) Size

func (c *CuckooIndex) Size() uint

Size returns the current number of items stored in the hash table, including expired items not yet known to the system.

func (*CuckooIndex) UnlockAfterTransfer

func (c *CuckooIndex) UnlockAfterTransfer()

UnlockAfterTransfer releases the transfer lock.

func (*CuckooIndex) Update

func (c *CuckooIndex) Update(key Key, value RecordID) bool

Update updates a key with a new recordID. Returns whether the key exists in the primary key.

func (*CuckooIndex) UpdateEventTimeCutoff

func (c *CuckooIndex) UpdateEventTimeCutoff(cutoff uint32)

UpdateEventTimeCutoff updates eventTimeCutoff

type EnumDict

type EnumDict struct {
	// Either 0x100 for small_enum, or 0x10000 for big_enum.
	Capacity    int            `json:"capacity"`
	Dict        map[string]int `json:"dict"`
	ReverseDict []string       `json:"reverseDict"`
}

EnumDict contains the mappings between enum strings and numbers.

type Job

type Job interface {
	Run() error
	GetIdentifier() string
	String() string
}

Job defines the common interface for BackfillJob, ArchivingJob and SnapshotJob

type JobDetail

type JobDetail struct {
	// Status of archiving for current table Shard.
	Status JobStatus `json:"status"`

	// Time of next archiving job
	NextRun time.Time `json:"nextRun,omitempty"`

	// Time when the last backfill job finishes
	LastRun time.Time `json:"lastRun,omitempty"`

	// Error of last run if failed.
	LastError error `json:"lastError,omitempty"`
	// Start time of last archiving job.
	LastStartTime time.Time `json:"lastStartTime,omitempty"`
	// Duration of last archiving job.
	LastDuration time.Duration `json:"lastDuration,omitempty"`

	// Number of records processed.
	NumRecords int `json:"numRecords,omitempty"`
	// Number of days affected.
	NumAffectedDays int `json:"numAffectedDays,omitempty"`

	// Total amount of work
	// ie, archiving merge: number of days
	//     archiving snapshot: number of records
	Total int `json:"total,omitempty"`
	// Current finished work.
	Current int `json:"current,omitempty"`

	// Duration for waiting for lock.
	LockDuration time.Duration `json:"lockDuration,omitempty"`
}

JobDetail represents common job status of a table Shard.

type JobStatus

type JobStatus string

JobStatus represents the job status of a given table Shard.

const (
	JobWaiting   JobStatus = "waiting"
	JobReady     JobStatus = "ready"
	JobRunning   JobStatus = "running"
	JobSucceeded JobStatus = "succeeded"
	JobFailed    JobStatus = "failed"
)

List of JobStatus.

type Key

type Key []byte

Key represents the key for the item

type LiveBatch

type LiveBatch struct {
	// The common data structure holding column data.
	Batch

	// Capacity of the batch which is decided at the creation time.
	Capacity int

	// maximum of arrival time
	MaxArrivalTime uint32
	// contains filtered or unexported fields
}

LiveBatch represents a live batch.

func (*LiveBatch) GetOrCreateVectorParty

func (b *LiveBatch) GetOrCreateVectorParty(columnID int, locked bool) common.LiveVectorParty

GetOrCreateVectorParty returns LiveVectorParty for the specified column from the live batch. locked specifies whether the batch has been locked. The lock will be left in the same state after the function returns.

func (*LiveBatch) MarshalJSON

func (b *LiveBatch) MarshalJSON() ([]byte, error)

MarshalJSON marshals a LiveBatch into json.

type LiveStore

type LiveStore struct {
	sync.RWMutex

	// The batch id to batch map.
	Batches map[int32]*LiveBatch

	// Number of rows to create for new batches.
	BatchSize int

	// The upper bound of records (exclusive) that can be read by queries.
	LastReadRecord RecordID

	// This is the in memory archiving cutoff time high watermark that gets set by the archiving job
	// before each archiving run. Ingestion will not insert/update records that are older than
	// the archiving cutoff watermark.
	ArchivingCutoffHighWatermark uint32

	// Logs.
	RedoLogManager RedoLogManager

	// Manage backfill queue during ingestion.
	BackfillManager *BackfillManager

	// Manage snapshot related stats.
	SnapshotManager *SnapshotManager

	// The writer lock is to guarantee single writer to a Shard at all time. To ensure this, writers
	// (ingestion, archiving etc) need to hold this lock at all times. This lock
	// should be acquired before the VectorStore and Batch locks.
	// TODO: if spinning lock performance is a concern we may need to upgrade this
	// to a designated goroutine with a pc-queue channel.
	WriterLock sync.RWMutex

	// Primary key table of the Shard.
	PrimaryKey PrimaryKey

	// The position of the next record to be used for writing. Only used by the ingester.
	NextWriteRecord RecordID

	// For convenience.
	HostMemoryManager common.HostMemoryManager `json:"-"`
	// contains filtered or unexported fields
}

LiveStore stores live batches of columnar data.

func NewLiveStore

func NewLiveStore(batchSize int, shard *TableShard) *LiveStore

NewLiveStore creates a new live store.

func (*LiveStore) AdvanceLastReadRecord

func (s *LiveStore) AdvanceLastReadRecord()

AdvanceLastReadRecord advances the high watermark of the rows to the next write record.

func (*LiveStore) AdvanceNextWriteRecord

func (s *LiveStore) AdvanceNextWriteRecord() RecordID

AdvanceNextWriteRecord reserves space for a record and returns the next available record position to the caller.

func (*LiveStore) Destruct

func (s *LiveStore) Destruct()

Destruct deletes all vectors allocated in C. Caller must detach the Shard first and wait until all users are finished.

func (*LiveStore) GetBatchForRead

func (s *LiveStore) GetBatchForRead(id int32) *LiveBatch

GetBatchForRead returns and read locks the batch with its ID for reads. Caller must explicitly RUnlock() the returned batch after all reads.

func (*LiveStore) GetBatchForWrite

func (s *LiveStore) GetBatchForWrite(id int32) *LiveBatch

GetBatchForWrite returns and locks the batch with its ID for writes. Caller must explicitly Unlock() the returned batch after all writes.

func (*LiveStore) GetBatchIDs

func (s *LiveStore) GetBatchIDs() (batchIDs []int32, numRecordsInLastBatch int)

GetBatchIDs snapshots the batches and returns a list of batch IDs for read, along with the number of records in the last batch.
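
A hedged sketch combining GetBatchIDs and GetBatchForRead; treating Capacity as the row count for all but the last batch is an assumption made for illustration, and liveStore is an assumed *LiveStore.

// Sketch: scan all readable rows of a live store.
batchIDs, numRecordsInLastBatch := liveStore.GetBatchIDs()
for i, id := range batchIDs {
	batch := liveStore.GetBatchForRead(id)
	if batch == nil {
		continue
	}
	rows := batch.Capacity
	if i == len(batchIDs)-1 {
		rows = numRecordsInLastBatch
	}
	for row := 0; row < rows; row++ {
		_ = batch.GetDataValue(row, 0 /* columnID */)
	}
	batch.RUnlock() // release the read lock taken by GetBatchForRead
}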

func (*LiveStore) GetMemoryUsageForColumn

func (s *LiveStore) GetMemoryUsageForColumn(valueType common.DataType, columnID int) int

GetMemoryUsageForColumn gets the live store memory usage for the given data type and column.

func (*LiveStore) LookupKey

func (s *LiveStore) LookupKey(keyStrs []string) (RecordID, bool)

LookupKey looks up the given key in the primary key.

func (*LiveStore) MarshalJSON

func (s *LiveStore) MarshalJSON() ([]byte, error)

MarshalJSON marshals a LiveStore into json.

func (*LiveStore) PurgeBatch

func (s *LiveStore) PurgeBatch(id int32)

PurgeBatch purges the specified batch.

func (*LiveStore) PurgeBatches

func (s *LiveStore) PurgeBatches(ids []int32)

PurgeBatches purges the specified batches.

type MemStore

type MemStore interface {
	// GetMemoryUsageDetails returns the memory usage details of all table shards.
	GetMemoryUsageDetails() (map[string]TableShardMemoryUsage, error)
	// GetScheduler returns the scheduler for scheduling archiving and backfill jobs.
	GetScheduler() Scheduler
	// GetTableShard gets the data for a pinned table Shard. Caller needs to unpin after use.
	GetTableShard(table string, shardID int) (*TableShard, error)
	// GetSchema returns schema for a table.
	GetSchema(table string) (*TableSchema, error)
	// GetSchemas returns all table schemas.
	GetSchemas() map[string]*TableSchema
	// FetchSchema fetches schema from metaStore and updates in-memory copy of table schema,
	// and set up watch channels for metaStore schema changes, used for bootstrapping mem store.
	FetchSchema() error
	// InitShards loads/recovers data for shards initially owned by the current instance.
	InitShards(schedulerOff bool)
	// HandleIngestion logs an upsert batch and applies it to the in-memory store.
	HandleIngestion(table string, shardID int, upsertBatch *UpsertBatch) error
	// Archive is the process moving stable records in fact tables from live batches to archive
	// batches.
	Archive(table string, shardID int, cutoff uint32, reporter ArchiveJobDetailReporter) error

	// Backfill is the process of merging records with event time older than cutoff with
	// archive batches.
	Backfill(table string, shardID int, reporter BackfillJobDetailReporter) error

	// Snapshot is the process to write the current content of dimension table live store in memory to disk.
	Snapshot(table string, shardID int, reporter SnapshotJobDetailReporter) error

	// Purge is the process to purge out of retention archive batches
	Purge(table string, shardID, batchIDStart, batchIDEnd int, reporter PurgeJobDetailReporter) error

	// Provide exclusive access to read/write data protected by MemStore.
	utils.RWLocker
}

MemStore defines the interface for managing multiple table shards in memory. This is for mocking in unit tests

func NewMemStore

func NewMemStore(metaStore metastore.MetaStore, diskStore diskstore.DiskStore) MemStore

NewMemStore creates a MemStore from the specified MetaStore.
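
A hedged bootstrap sketch using the interface above; the metaStore, diskStore, and upsertBatch arguments as well as the table name are placeholders supplied by the caller.

// Sketch only: bring up a MemStore and ingest one upsert batch.
func bootstrapExample(metaStore metastore.MetaStore, diskStore diskstore.DiskStore, upsertBatch *UpsertBatch) error {
	memStore := NewMemStore(metaStore, diskStore)
	if err := memStore.FetchSchema(); err != nil {
		return err
	}
	memStore.InitShards(false /* schedulerOff */)
	return memStore.HandleIngestion("my_table" /* hypothetical table */, 0 /* shardID */, upsertBatch)
}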

type PrimaryKey

type PrimaryKey interface {
	// Find looks up a value given key
	Find(key Key) (RecordID, bool)
	// FindOrInsert finds or inserts a (key, value) pair into the index.
	FindOrInsert(key Key, value RecordID, eventTime uint32) (existingFound bool, recordID RecordID, err error)
	// Update updates a key with a new recordID. Returns whether the key exists in the primary key.
	Update(key Key, value RecordID) bool
	// Delete deletes a key if it exists
	Delete(key Key)
	// Update the cutoff event time.
	UpdateEventTimeCutoff(eventTimeCutoff uint32)
	// GetEventTimeCutoff returns the cutoff event time.
	GetEventTimeCutoff() uint32
	// LockForTransfer locks the primary key for transferring data;
	// the caller should unlock by calling UnlockAfterTransfer when done.
	LockForTransfer() PrimaryKeyData
	// UnlockAfterTransfer unlocks the primary key.
	UnlockAfterTransfer()
	// Destruct cleans up all existing resources used by the primary key.
	Destruct()
	// Size returns the current number of items.
	Size() uint
	// Capacity returns how many items current primary key can hold.
	Capacity() uint
	// AllocatedBytes returns the size of primary key in bytes.
	AllocatedBytes() uint
}

PrimaryKey is the interface for the primary key index.

func NewPrimaryKey

func NewPrimaryKey(keyBytes int, hasEventTime bool, initNumBuckets int,
	hostMemoryManager common.HostMemoryManager) PrimaryKey

NewPrimaryKey creates a primary key data structure (a usage sketch follows the parameter list). Params:

  1. keyBytes, the number of bytes of the key
  2. hasEventTime, whether the primary key should record event time for expiration
  3. initNumBuckets, the starting number of buckets; set to 0 to use the default
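
A minimal sketch of creating the index and doing a lookup; the 4-byte key, event time, and hostMemoryManager are placeholder assumptions.

// Sketch: create a primary key index, insert a record, and look it up.
pk := NewPrimaryKey(4 /* keyBytes */, true /* hasEventTime */, 0 /* default buckets */, hostMemoryManager)
defer pk.Destruct()

key := Key([]byte{0, 0, 0, 1}) // hypothetical 4-byte key
recordID := RecordID{BatchID: BaseBatchID, Index: 0}
existing, _, err := pk.FindOrInsert(key, recordID, 1543276800 /* eventTime */)
if err == nil && !existing {
	// The key was newly inserted.
}
if rid, found := pk.Find(key); found {
	_ = rid // rid.BatchID / rid.Index locate the record
}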

type PrimaryKeyData

type PrimaryKeyData struct {
	Data       unsafe.Pointer
	NumBytes   int
	Seeds      [numHashes]uint32
	KeyBytes   int
	NumBuckets int
}

PrimaryKeyData holds the data for transferring to GPU for query purposes

type PurgeJob

type PurgeJob struct {
	// contains filtered or unexported fields
}

PurgeJob defines the structure that a purge job needs.

func (*PurgeJob) GetIdentifier

func (job *PurgeJob) GetIdentifier() string

GetIdentifier returns a unique identifier of this job.

func (*PurgeJob) Run

func (job *PurgeJob) Run() error

Run starts the purge process and waits for it to finish.

func (*PurgeJob) String

func (job *PurgeJob) String() string

String gives a meaningful string representation of this job.

type PurgeJobDetail

type PurgeJobDetail struct {
	JobDetail
	// Stage the job is currently in.
	Stage PurgeStage `json:"stage"`
	// Number of batches purged
	NumBatches   int `json:"numBatches"`
	BatchIDStart int `json:"batchIDStart"`
	BatchIDEnd   int `json:"batchIDEnd"`
}

PurgeJobDetail represents purge job status of a table shard.

type PurgeJobDetailMutator

type PurgeJobDetailMutator func(jobDetail *PurgeJobDetail)

PurgeJobDetailMutator is the mutator functor to change PurgeJobDetail.

type PurgeJobDetailReporter

type PurgeJobDetailReporter func(key string, mutator PurgeJobDetailMutator)

PurgeJobDetailReporter is the functor to apply mutator changes to corresponding JobDetail.

type PurgeManager

type PurgeManager struct {
	sync.RWMutex `json:"-"`

	// Job Trigger Condition related fields
	// Last purge time.
	LastPurgeTime time.Time `json:"lastPurgeTime"`

	PurgeInterval time.Duration `json:"purgeInterval"`
	// contains filtered or unexported fields
}

PurgeManager manages the purge related stats and progress.

func NewPurgeManager

func NewPurgeManager(shard *TableShard) *PurgeManager

NewPurgeManager creates a new PurgeManager instance.

func (*PurgeManager) QualifyForPurge

func (p *PurgeManager) QualifyForPurge() bool

QualifyForPurge tells whether we can trigger a purge job.

type PurgeStage

type PurgeStage string

PurgeStage represents different stages of a running purge job.

const (
	PurgeMetaData PurgeStage = "purge metadata"
	PurgeDataFile PurgeStage = "purge data file"
	PurgeMemory   PurgeStage = "purge memory"
	PurgeComplete PurgeStage = "complete"
)

List of purge stages

type RecordID

type RecordID struct {
	BatchID int32  `json:"batchID"`
	Index   uint32 `json:"index"`
}

RecordID represents a record location, with BatchID as the inflated vector ID and Index as the offset of the record inside the vector.

type RedoLogBrowser

type RedoLogBrowser interface {
	ListLogFiles() ([]int64, error)
	ListUpsertBatch(creationTime int64) ([]int64, error)
	ReadData(creationTime int64, upsertBatchOffset int64, start int, length int) (
		[][]interface{}, []string, int, error)
}

RedoLogBrowser is the interface to list redo log files, upsert batches and read upsert batch data.

type RedoLogManager

type RedoLogManager struct {
	// The lock is to protect MaxEventTimePerFile.
	sync.RWMutex `json:"-"`

	// The time interval of redo file rotations.
	RotationInterval int64 `json:"rotationInterval"`

	// The limit of redo file size to trigger rotations.
	MaxRedoLogSize int64 `json:"maxRedoLogSize"`

	// Current redo log size
	CurrentRedoLogSize uint32 `json:"currentRedoLogSize"`

	// size of all redologs
	TotalRedoLogSize uint `json:"totalRedologSize"`

	// The map with redo log creation time as the key and max event time as the value. Readers
	// need to hold the reader lock in accessing the field.
	MaxEventTimePerFile map[int64]uint32 `json:"maxEventTimePerFile"`

	// redo log creation time -> batch count mapping.
	// Readers need to hold the reader lock in accessing the field.
	BatchCountPerFile map[int64]uint32 `json:"batchCountPerFile"`

	// SizePerFile maps redo log creation time to the size of the file.
	SizePerFile map[int64]uint32 `json:"sizePerFile"`

	// Current file creation time in milliseconds.
	CurrentFileCreationTime int64 `json:"currentFileCreationTime"`
	// contains filtered or unexported fields
}

RedoLogManager manages the redo log file append, rotation, purge. It is used by ingestion, recovery and archiving. Accessor must hold the TableShard.WriterLock to access it.

func NewRedoLogManager

func NewRedoLogManager(rotationInterval int64, maxRedoLogSize int64, diskStore diskstore.DiskStore, tableName string, shard int) RedoLogManager

NewRedoLogManager creates a new RedoLogManager instance.

func (*RedoLogManager) Close

func (r *RedoLogManager) Close()

Close closes the current log file.

func (*RedoLogManager) MarshalJSON

func (r *RedoLogManager) MarshalJSON() ([]byte, error)

MarshalJSON marshals a RedoLogManager into json.

func (*RedoLogManager) NextUpsertBatch

func (r *RedoLogManager) NextUpsertBatch() func() (*UpsertBatch, int64, uint32)

NextUpsertBatch returns a functor that can be used to iterate over redo logs on disk and returns one UpsertBatch at each call. It returns nil to indicate the end of the upsert batch stream.

Any failure in file reading and upsert batch creation will trigger system panic.
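
A hedged sketch of the replay loop this functor enables during recovery; redoLogManager is an assumed *RedoLogManager, and what is done with each batch is illustrative only.

// Sketch: iterate over all upsert batches persisted in the redo logs.
next := redoLogManager.NextUpsertBatch()
for {
	upsertBatch, redoFile, batchOffset := next()
	if upsertBatch == nil {
		break // end of the redo log stream
	}
	// Re-apply the batch (e.g. via TableShard.ApplyUpsertBatch) during recovery.
	_, _ = redoFile, batchOffset
}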

func (*RedoLogManager) PurgeRedologFileAndData

func (r *RedoLogManager) PurgeRedologFileAndData(cutoff uint32, redoFileCheckpointed int64, batchOffset uint32) error

PurgeRedologFileAndData purges disk files and in memory data of redologs that are eligible to be purged.

func (*RedoLogManager) UpdateBatchCount

func (r *RedoLogManager) UpdateBatchCount(redoFile int64) uint32

UpdateBatchCount saves/updates batch counts for the given redolog

func (*RedoLogManager) UpdateMaxEventTime

func (r *RedoLogManager) UpdateMaxEventTime(eventTime uint32, redoFile int64)

UpdateMaxEventTime updates the max event time of the current redo log file. redoFile is the key to the corresponding redo file that needs to have the maxEventTime updated. redoFile == 0 is used in serving ingestion requests where the current file's max event time is updated. redoFile != 0 is used in recovery where the redo log file loaded from disk needs to get its max event time calculated.

func (*RedoLogManager) WriteUpsertBatch

func (r *RedoLogManager) WriteUpsertBatch(upsertBatch *UpsertBatch) (int64, uint32)

WriteUpsertBatch saves an upsert batch into disk before applying it. Any errors from diskStore will trigger system panic.

type Scheduler

type Scheduler interface {
	Start()
	Stop()
	SubmitJob(job Job) chan error
	DeleteTable(table string, isFactTable bool)
	GetJobDetails(jobType common.JobType) interface{}
	NewBackfillJob(tableName string, shardID int) Job
	NewArchivingJob(tableName string, shardID int, cutoff uint32) Job
	NewSnapshotJob(tableName string, shardID int) Job
	NewPurgeJob(tableName string, shardID int, batchIDStart int, batchIDEnd int) Job
	utils.RWLocker
}

Scheduler is for scheduling archiving jobs (and later backfill jobs) for table shards in memStore. It scans through all tables and shards to generate a list of eligible jobs to run.
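
A hedged sketch of submitting a job through the Scheduler and waiting on its result channel; the table name, shard ID, and cutoff are placeholders.

// Sketch: schedule an archiving job and block until it completes.
job := scheduler.NewArchivingJob("my_fact_table", 0 /* shardID */, cutoff)
if err := <-scheduler.SubmitJob(job); err != nil {
	// archiving failed; err carries the reason
}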

type SnapshotJob

type SnapshotJob struct {
	// contains filtered or unexported fields
}

SnapshotJob defines the structure that a snapshot job needs.

func (*SnapshotJob) GetIdentifier

func (job *SnapshotJob) GetIdentifier() string

GetIdentifier returns a unique identifier of this job.

func (*SnapshotJob) Run

func (job *SnapshotJob) Run() error

Run starts the snapshot process and waits for it to finish.

func (*SnapshotJob) String

func (job *SnapshotJob) String() string

String gives a meaningful string representation of this job.

type SnapshotJobDetail

type SnapshotJobDetail struct {
	JobDetail
	// Number of mutations in this snapshot.
	NumMutations int `json:"numMutations"`
	// Number of batches written in this snapshot.
	NumBatches int `json:"numBatches"`
	// Current redolog file that's being snapshotted.
	RedologFile int64 `json:"redologFile"`
	// Batch offset within the RedologFile.
	BatchOffset uint32 `json:"batchOffset"`
	// Stage the job is currently in.
	Stage SnapshotStage `json:"stage"`
}

SnapshotJobDetail represents snapshot job status of a table shard.

type SnapshotJobDetailMutator

type SnapshotJobDetailMutator func(jobDetail *SnapshotJobDetail)

SnapshotJobDetailMutator is the mutator functor to change SnapshotJobDetail.

type SnapshotJobDetailReporter

type SnapshotJobDetailReporter func(key string, mutator SnapshotJobDetailMutator)

SnapshotJobDetailReporter is the functor to apply mutator changes to corresponding JobDetail.

type SnapshotManager

type SnapshotManager struct {
	sync.RWMutex `json:"-"`

	// Number of mutations since last snapshot. Measured as number of rows mutated.
	NumMutations int `json:"numMutations"`

	// Last snapshot time.
	LastSnapshotTime time.Time `json:"'lastSnapshotTime'"`

	// keep track of the redo log file of the last batch snapshotted.
	LastRedoFile int64 `json:"lastRedoFile"`

	// keep track of the offset of the last batch snapshotted
	LastBatchOffset uint32 `json:"lastBatchOffset"`

	// keep track of the record position of the last batch snapshotted
	LastRecord RecordID

	// keep track of the redo log file of the last batch queued
	CurrentRedoFile int64 `json:"currentRedoFile"`

	// keep track of the offset of the last batch queued
	CurrentBatchOffset uint32 `json:"currentBatchOffset"`

	// keep track of the record position when last batch queued
	CurrentRecord RecordID

	// Configs
	SnapshotInterval time.Duration `json:"snapshotInterval"`

	SnapshotThreshold int `json:"snapshotThreshold"`
	// contains filtered or unexported fields
}

SnapshotManager manages the snapshot related stats and progress.

func NewSnapshotManager

func NewSnapshotManager(shard *TableShard) *SnapshotManager

NewSnapshotManager creates a new SnapshotManager instance.

func (*SnapshotManager) ApplyUpsertBatch

func (s *SnapshotManager) ApplyUpsertBatch(redoFile int64, offset uint32, numMutations int, currentRecord RecordID)

ApplyUpsertBatch advances CurrentRedoLogFile and CurrentBatchOffset and increments NumMutations after applying an upsert batch to live store.

func (*SnapshotManager) Done

func (s *SnapshotManager) Done(currentRedoFile int64, currentBatchOffset uint32, lastNumMutations int, currentRecord RecordID) error

Done updates the snapshot progress both in memory and in metastore and updates number of mutations accordingly.

func (*SnapshotManager) GetLastSnapshotInfo

func (s *SnapshotManager) GetLastSnapshotInfo() (int64, uint32, time.Time, RecordID)

GetLastSnapshotInfo gets the last snapshot's redo log file, offset, timestamp, and last record.

func (*SnapshotManager) MarshalJSON

func (s *SnapshotManager) MarshalJSON() ([]byte, error)

MarshalJSON marshals a SnapshotManager into json.

func (*SnapshotManager) QualifyForSnapshot

func (s *SnapshotManager) QualifyForSnapshot() bool

QualifyForSnapshot tells whether we can trigger a snapshot job.

func (*SnapshotManager) SetLastSnapshotInfo

func (s *SnapshotManager) SetLastSnapshotInfo(redoLogFile int64, offset uint32, record RecordID)

SetLastSnapshotInfo updates the last snapshot's redo log file, offset, timestamp, and last record.

func (*SnapshotManager) StartSnapshot

func (s *SnapshotManager) StartSnapshot() (int64, uint32, int, RecordID)

StartSnapshot returns the current redo log file, batch offset, number of mutations, and current record.

type SnapshotStage

type SnapshotStage string

SnapshotStage represents different stages of a running snapshot job.

const (
	SnapshotSnapshot SnapshotStage = "snapshot"
	SnapshotCleanup  SnapshotStage = "cleanup"
	SnapshotComplete SnapshotStage = "complete"
)

List of SnapshotStages

type TableSchema

type TableSchema struct {
	sync.RWMutex `json:"-"`
	// Main schema of the table. Mutable.
	Schema metaCom.Table `json:"schema"`
	// Maps from column names to their IDs. Mutable.
	ColumnIDs map[string]int `json:"columnIDs"`
	// Maps from enum column names to their case dictionaries. Mutable.
	EnumDicts map[string]EnumDict `json:"enumDicts"`
	// DataType for each column ordered by column ID. Mutable.
	ValueTypeByColumn []memCom.DataType `json:"valueTypeByColumn"`
	// Number of bytes in the primary key. Immutable.
	PrimaryKeyBytes int `json:"primaryKeyBytes"`
	// Types of each primary key column. Immutable.
	PrimaryKeyColumnTypes []memCom.DataType `json:"primaryKeyColumnTypes"`
	// Default values of each column. Mutable. Nil means default value is not set.
	DefaultValues []*memCom.DataValue `json:"-"`
}

TableSchema stores metadata of the table such as columns and primary keys. It also stores the dictionaries for enum columns.

func NewTableSchema

func NewTableSchema(table *metaCom.Table) *TableSchema

NewTableSchema creates a new table schema object from a metaStore table object; this does not set enum cases.

func (*TableSchema) GetArchivingSortColumns

func (t *TableSchema) GetArchivingSortColumns() []int

GetArchivingSortColumns makes a copy of the Schema.ArchivingSortColumns so callers don't have to hold a read lock to access it.

func (*TableSchema) GetColumnDeletions

func (t *TableSchema) GetColumnDeletions() []bool

GetColumnDeletions returns a boolean slice that indicates whether a column has been deleted. Callers need to hold a read lock.

func (*TableSchema) GetPrimaryKeyColumns

func (t *TableSchema) GetPrimaryKeyColumns() []int

GetPrimaryKeyColumns makes a copy of the Schema.PrimaryKeyColumns so callers don't have to hold a read lock to access it.

func (*TableSchema) GetValueTypeByColumn

func (t *TableSchema) GetValueTypeByColumn() []memCom.DataType

GetValueTypeByColumn makes a copy of the ValueTypeByColumn so callers don't have to hold a read lock to access it.

func (*TableSchema) MarshalJSON

func (t *TableSchema) MarshalJSON() ([]byte, error)

MarshalJSON marshals TableSchema into json.

func (*TableSchema) SetDefaultValue

func (t *TableSchema) SetDefaultValue(columnID int)

SetDefaultValue parses the default value string if present and sets it on the TableSchema. The schema lock should be acquired and released by the caller, and the enum dict should already be created/updated before this function is called.

func (*TableSchema) SetTable

func (t *TableSchema) SetTable(table *metaCom.Table)

SetTable sets an updated table and updates the TableSchema; the caller should acquire the lock before calling.

type TableShard

type TableShard struct {
	// Wait group used to prevent the stores from being prematurely deleted.
	Users sync.WaitGroup `json:"-"`

	ShardID int `json:"-"`

	// For convenience, reference to the table schema struct.
	Schema *TableSchema `json:"schema"`

	// Live store. Its locks also cover the primary key.
	LiveStore *LiveStore `json:"liveStore"`

	// Archive store.
	ArchiveStore ArchiveStore `json:"archiveStore"`

	// For convenience.
	HostMemoryManager common.HostMemoryManager `json:"-"`
	// contains filtered or unexported fields
}

TableShard stores the data for one table shard in memory.

func NewTableShard

func NewTableShard(schema *TableSchema, metaStore metastore.MetaStore,
	diskStore diskstore.DiskStore, hostMemoryManager common.HostMemoryManager, shard int) *TableShard

NewTableShard creates and initializes a table shard based on the schema.

func (*TableShard) ApplyUpsertBatch

func (shard *TableShard) ApplyUpsertBatch(upsertBatch *UpsertBatch, redoLogFile int64, offset uint32, skipBackfillRows bool) (bool, error)

ApplyUpsertBatch applies the upsert batch to the memstore shard. Returns true if caller needs to wait for availability of backfill buffer

func (*TableShard) DeleteColumn

func (shard *TableShard) DeleteColumn(columnID int) error

DeleteColumn deletes the data for the specified column.

func (*TableShard) Destruct

func (shard *TableShard) Destruct()

Destruct destructs the table shard. Caller must detach the shard from memstore first.

func (*TableShard) LoadMetaData

func (shard *TableShard) LoadMetaData()

LoadMetaData loads metadata for the table Shard from metastore.

func (*TableShard) LoadSnapshot

func (shard *TableShard) LoadSnapshot() error

LoadSnapshot loads shard data from snapshot files.

func (*TableShard) NewRedoLogBrowser

func (shard *TableShard) NewRedoLogBrowser() RedoLogBrowser

NewRedoLogBrowser creates a RedoLogBrowser using fields from the Shard.

func (*TableShard) PreloadColumn

func (shard *TableShard) PreloadColumn(columnID int, startDay int, endDay int)

PreloadColumn loads the column into memory and waits for the loading to complete within (startDay, endDay]. Note endDay is inclusive but startDay is exclusive.

func (*TableShard) ReplayRedoLogs

func (shard *TableShard) ReplayRedoLogs()

ReplayRedoLogs loads data for the table Shard from disk store and recovers the Shard for serving.

type TableShardMemoryUsage

type TableShardMemoryUsage struct {
	ColumnMemory     map[string]*common.ColumnMemoryUsage `json:"cols"`
	PrimaryKeyMemory uint                                 `json:"pk"`
}

TableShardMemoryUsage contains the column memory and primary key memory usage of a table shard.

type TestFactoryT

type TestFactoryT struct {
	RootPath string
	utils.FileSystem
}

TestFactoryT creates memstore test objects from text files.

func (TestFactoryT) NewMockMemStore

func (t TestFactoryT) NewMockMemStore() *memStoreImpl

NewMockMemStore returns a new memstore with mocked diskstore and metastore.

func (TestFactoryT) ReadArchiveBatch

func (t TestFactoryT) ReadArchiveBatch(name string) (*Batch, error)

ReadArchiveBatch reads a batch and prunes every column.

func (TestFactoryT) ReadArchiveVectorParty

func (t TestFactoryT) ReadArchiveVectorParty(name string, locker sync.Locker) (*archiveVectorParty, error)

ReadArchiveVectorParty loads a vector party and prunes it after construction.

func (TestFactoryT) ReadBatch

func (t TestFactoryT) ReadBatch(name string) (*Batch, error)

ReadBatch returns a batch given a batch name. The batch will be searched for under the testing/data/batches folder. Prune tells whether to prune the columns after construction.

func (TestFactoryT) ReadLiveBatch

func (t TestFactoryT) ReadLiveBatch(name string) (*Batch, error)

ReadLiveBatch reads a batch and skips pruning for every column.

func (TestFactoryT) ReadLiveVectorParty

func (t TestFactoryT) ReadLiveVectorParty(name string) (*cLiveVectorParty, error)

ReadLiveVectorParty loads a vector party and skips pruning.

func (TestFactoryT) ReadUpsertBatch

func (t TestFactoryT) ReadUpsertBatch(name string) (*UpsertBatch, error)

ReadUpsertBatch returns a pointer to UpsertBatch given the upsert batch name.

func (TestFactoryT) ReadVector

func (t TestFactoryT) ReadVector(name string) (*Vector, error)

ReadVector returns a vector given a vector name. The vector will be searched for under the testing/data/vectors folder.

func (TestFactoryT) ReadVectorParty

func (t TestFactoryT) ReadVectorParty(name string) (*cVectorParty, error)

ReadVectorParty returns a vector party given a vector party name. The vector party will be searched for under the testing/data/vps folder. Prune tells whether to prune this column.

type TransferableVectorParty

type TransferableVectorParty interface {
	// GetHostVectorPartySlice slice vector party between [startIndex, startIndex+length) before transfer to gpu
	GetHostVectorPartySlice(startIndex, length int) common.HostVectorPartySlice
}

TransferableVectorParty is a vector party that can be transferred to the GPU for processing.

type UpsertBatch

type UpsertBatch struct {
	// Number of rows in the batch, must be between 0 and 65535.
	NumRows int

	// Number of columns.
	NumColumns int

	// Arrival Time of Upsert Batch
	ArrivalTime uint32
	// contains filtered or unexported fields
}

UpsertBatch stores and indexes a serialized upsert batch of data on a particular table. It is used for both client-server data transfer and redo logging. In redo logs each batch is prepended by a 4-byte buffer size. The serialized buffer of the batch is in the following format:

[uint32] magic_number
[uint32] buffer_size

<begin of buffer>
[int32]  version_number
[int32]  num_of_rows
[uint16] num_of_columns
<reserve 14 bytes>
[uint32] arrival_time
[uint32] column_offset_0 ... [uint32] column_offset_x+1
[uint32] column_reserved_field1_0 ... [uint32] column_reserved_field1_x
[uint32] column_reserved_field2_0 ... [uint32] column_reserved_field2_x
[uint32] column_data_type_0 ... [uint32] column_data_type_x
[uint16] column_id_0 ... [uint16] column_id_x
[uint8] column_mode_0 ... [uint8] column_mode_x

(optional) [uint8] null_vector_0
(optional) [padding to 4 byte alignment uint32] offset_vector_0
[padding for 8 byte alignment] value_vector_0
...

[padding for 8 byte alignment]
<end of buffer>

Each component in the serialized buffer is byte aligned (not pointer aligned or bit aligned). All serialized numbers are written in little-endian. The struct is used for both client serialization and server deserialization. See https://github.com/uber/aresdb/wiki/redo_logs for more details.

Note: only fixed size values are supported currently.

func NewUpsertBatch

func NewUpsertBatch(buffer []byte) (*UpsertBatch, error)

NewUpsertBatch deserializes an upsert batch on the server. buffer does not contain the 4-byte buffer size.
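
A minimal sketch of deserializing a buffer and walking its values; what is done with each DataValue depends on the column's data type and is left out.

// Sketch: deserialize an upsert batch (buffer has no 4-byte size prefix)
// and visit every (row, column) value.
func readUpsertBatchExample(buffer []byte) error {
	upsertBatch, err := NewUpsertBatch(buffer)
	if err != nil {
		return err
	}
	for row := 0; row < upsertBatch.NumRows; row++ {
		for col := 0; col < upsertBatch.NumColumns; col++ {
			value, err := upsertBatch.GetDataValue(row, col)
			if err != nil {
				return err
			}
			_ = value // interpret according to the column's data type
		}
	}
	return nil
}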

func (*UpsertBatch) ExtractBackfillBatch

func (u *UpsertBatch) ExtractBackfillBatch(backfillRows []int) *UpsertBatch

ExtractBackfillBatch extracts the given rows and stores them in a new UpsertBatch. The returned UpsertBatch is not fully serialized and can only be used for structured reads.

func (*UpsertBatch) GetBool

func (u *UpsertBatch) GetBool(row int, col int) (bool, bool, error)

GetBool returns the data (boolean type) stored at (row, col), and the validity of the value.

func (*UpsertBatch) GetBuffer

func (u *UpsertBatch) GetBuffer() []byte

GetBuffer returns the underlying buffer used to construct the upsert batch.

func (*UpsertBatch) GetColumnID

func (u *UpsertBatch) GetColumnID(col int) (int, error)

GetColumnID returns the logical id of a column.

func (*UpsertBatch) GetColumnIndex

func (u *UpsertBatch) GetColumnIndex(columnID int) (int, error)

GetColumnIndex returns the local index of a column given a logical index id.

func (*UpsertBatch) GetColumnNames

func (u *UpsertBatch) GetColumnNames(schema *TableSchema) ([]string, error)

GetColumnNames reads the column names in the UpsertBatch; the caller should not lock the schema.

func (*UpsertBatch) GetColumnType

func (u *UpsertBatch) GetColumnType(col int) (common.DataType, error)

GetColumnType returns the data type of a column.

func (*UpsertBatch) GetDataValue

func (u *UpsertBatch) GetDataValue(row, col int) (common.DataValue, error)

GetDataValue returns the DataValue for the given row and col index. It first checks the validity of the value, then checks whether it's a boolean column to decide whether to load a bool value or another value type.

func (*UpsertBatch) GetEventColumnIndex

func (u *UpsertBatch) GetEventColumnIndex() int

GetEventColumnIndex returns the column index of event time

func (*UpsertBatch) GetPrimaryKeyBytes

func (u *UpsertBatch) GetPrimaryKeyBytes(row int, primaryKeyCols []int, key []byte) error

GetPrimaryKeyBytes returns the primary key bytes for a given row. Note that primaryKeyCols is a list of column indices within this batch, not primary key columnIDs.

func (*UpsertBatch) GetPrimaryKeyCols

func (u *UpsertBatch) GetPrimaryKeyCols(primaryKeyColumnIDs []int) ([]int, error)

GetPrimaryKeyCols converts primary key columnIDs to cols in this upsert batch.
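
A hedged sketch of combining GetPrimaryKeyCols and GetPrimaryKeyBytes with the schema fields documented elsewhere in this package (TableSchema.PrimaryKeyBytes and GetPrimaryKeyColumns).

// Sketch: serialize the primary key bytes for every row of an upsert batch.
func primaryKeyBytesExample(upsertBatch *UpsertBatch, schema *TableSchema) error {
	// Map primary key columnIDs to column indices within this batch.
	primaryKeyCols, err := upsertBatch.GetPrimaryKeyCols(schema.GetPrimaryKeyColumns())
	if err != nil {
		return err
	}
	key := make([]byte, schema.PrimaryKeyBytes)
	for row := 0; row < upsertBatch.NumRows; row++ {
		if err := upsertBatch.GetPrimaryKeyBytes(row, primaryKeyCols, key); err != nil {
			return err
		}
		// key now holds the serialized primary key for this row.
	}
	return nil
}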

func (*UpsertBatch) GetValue

func (u *UpsertBatch) GetValue(row int, col int) (unsafe.Pointer, bool, error)

GetValue returns the data (fixed sized) stored at (row, col), including the pointer to the data, and the validity of the value.

func (*UpsertBatch) ReadData

func (u *UpsertBatch) ReadData(start int, length int) ([][]interface{}, error)

ReadData reads data from the upsert batch and converts values to meaningful representations given their data types.

type Vector

type Vector struct {
	// The data type of the value stored in the vector.
	DataType common.DataType

	// Max number of values that can be stored in the vector.
	Size int
	// Allocated size of the vector in bytes.
	Bytes int
	// contains filtered or unexported fields
}

Vector stores a batch of columnar data (values, nulls, or counts) for a column.

func NewVector

func NewVector(dataType common.DataType, size int) *Vector

NewVector creates a vector with the specified data type and size (capacity). The majority of its storage space is managed in C.

func (*Vector) Buffer

func (v *Vector) Buffer() unsafe.Pointer

Buffer returns the pointer to the underlying buffer.

func (*Vector) CheckAllValid

func (v *Vector) CheckAllValid() bool

CheckAllValid checks whether all bits are 1 in a bool typed vector.

func (*Vector) GetBool

func (v *Vector) GetBool(index int) bool

GetBool returns the bool value for the specified index. index bound is not checked!

func (*Vector) GetSliceBytesAligned

func (v *Vector) GetSliceBytesAligned(lowerBound int, upperBound int) (buffer unsafe.Pointer, startIndex int, bytes int)

GetSliceBytesAligned calculates the number of bytes of a slice of the vector, represented by [lowerBound, upperBound), aligned to 64 bytes. It returns the buffer pointer, the new start index (start entry in the vector), and the length in bytes.

func (*Vector) GetValue

func (v *Vector) GetValue(index int) unsafe.Pointer

GetValue returns the data value for the specified index. index bound is not checked! The return value points to the internal buffer location that stores the value.

func (*Vector) LowerBound

func (v *Vector) LowerBound(first int, last int, value unsafe.Pointer) int

LowerBound returns the index of the first element in vector[first, last) that is greater than or equal to the given value. The result is only valid if vector[first, last) is fully sorted in ascending order. If all values in the given range are less than the given value, LowerBound returns last. Note that first/last is not checked against the vector bound.
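
A hedged sketch of a binary search over a small sorted vector; common.Uint32 is assumed to be one of the package's DataType constants.

// Sketch: populate a sorted uint32 vector and locate an insertion point.
v := NewVector(common.Uint32, 4)
defer v.SafeDestruct()
for i, val := range []uint32{10, 20, 30, 40} {
	val := val
	v.SetValue(i, unsafe.Pointer(&val))
}
target := uint32(25)
idx := v.LowerBound(0, 4, unsafe.Pointer(&target)) // idx == 2: first element >= 25
_ = idx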

func (*Vector) SafeDestruct

func (v *Vector) SafeDestruct()

SafeDestruct destructs this vector's storage space managed in C.

func (*Vector) SetAllValid

func (v *Vector) SetAllValid()

SetAllValid sets all bits to 1 in a bool typed vector.

func (*Vector) SetBool

func (v *Vector) SetBool(index int, value bool)

SetBool sets the bool value for the specified index.

func (*Vector) SetValue

func (v *Vector) SetValue(index int, data unsafe.Pointer)

SetValue sets the data value for the specified index. index bound is not checked! data points to a buffer (in UpsertBatch for instance) that contains the value to be set.

func (*Vector) UpperBound

func (v *Vector) UpperBound(first int, last int, value unsafe.Pointer) int

UpperBound returns the index of the first element in vector[first, last) that is greater than the given value. The result is only valid if vector[first, last) is fully sorted in ascending order. If all values in the given range are less than or equal to the given value, UpperBound returns last. Note that first/last is not checked against the vector bound.

Directories

Path Synopsis
mocks
Code generated by mockery v1.0.0.
