memstore

package v0.0.2

Published: Sep 5, 2019 License: Apache-2.0 Imports: 40 Imported by: 12

Documentation

Overview

Copyright (c) 2017-2018 Uber Technologies, Inc.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Package memstore keeps the test factory here because placing it elsewhere would create a memstore -> utils -> memstore import cycle.

Index

Constants

View Source
const BaseBatchID = int32(math.MinInt32)

BaseBatchID is the starting ID of all batches.
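Archive batch IDs are derived from BaseBatchID plus a zero-based index (see ArchiveStoreVersion.Batches below). A minimal sketch of that arithmetic; these helpers are hypothetical illustrations, not part of this package:

// batchIDForIndex and indexForBatchID illustrate the BaseBatchID+index
// convention used for archive batches.
func batchIDForIndex(index int) int32 {
	return BaseBatchID + int32(index)
}

func indexForBatchID(batchID int32) int {
	return int(batchID - BaseBatchID)
}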

Variables

This section is empty.

Functions

func NewHostMemoryManager

func NewHostMemoryManager(memStore *memStoreImpl, totalMemorySize int64) common.HostMemoryManager

NewHostMemoryManager initializes a HostMemoryManager.

func NewLiveVectorParty

func NewLiveVectorParty(length int, dataType common.DataType, defaultValue common.DataValue, hostMemoryManager common.HostMemoryManager) common.LiveVectorParty

NewLiveVectorParty creates a LiveVectorParty.

func NewPrimaryKey

func NewPrimaryKey(keyBytes int, hasEventTime bool, initNumBuckets int,
	hostMemoryManager memCom.HostMemoryManager) memCom.PrimaryKey

NewPrimaryKey creates a primary key data structure; a usage sketch follows the parameter list. Parameters:

  1. keyBytes: number of bytes of the key
  2. hasEventTime: determines whether the primary key should record event time for expiration
  3. initNumBuckets: determines the starting number of buckets; set to 0 to use the default
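A minimal usage sketch, assuming memCom.Key is a byte slice of keyBytes length and memCom.RecordID is a (BatchID, Index) pair (check memstore/common for the exact definitions); hostMemoryManager and eventTime are assumed in scope:

pk := NewPrimaryKey(
	16,   // keyBytes: 16-byte keys
	true, // hasEventTime: record event times for expiration
	0,    // initNumBuckets: 0 picks the default
	hostMemoryManager,
)

key := memCom.Key(make([]byte, 16))
record := memCom.RecordID{BatchID: BaseBatchID, Index: 0}
existingFound, recordID, err := pk.FindOrInsert(key, record, eventTime)
if err != nil {
	// insertion failed (for example, allocation failure)
}
if existingFound {
	// recordID refers to the previously inserted row
}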

func UnpinVectorParties

func UnpinVectorParties(requestedVPs []common.ArchiveVectorParty)

UnpinVectorParties unpins all vector parties in the slice.

Types

type ArchiveBatch

type ArchiveBatch struct {
	common.Batch

	// Size of the batch (number of rows). Notice that compression changes the
	// length of some columns but does not change the size of the batch.
	Size int

	// Version for archive batches.
	Version uint32

	// SeqNum denotes backfill sequence number
	SeqNum uint32

	// For convenience.
	BatchID int32
	Shard   *TableShard
}

ArchiveBatch represents an archive batch.

func (*ArchiveBatch) BlockingDelete

func (b *ArchiveBatch) BlockingDelete(columnID int) common.ArchiveVectorParty

BlockingDelete blocks until all users are finished with the specified column, and then deletes the column from the batch. Returns the vector party deleted if any.

func (*ArchiveBatch) BuildIndex

func (b *ArchiveBatch) BuildIndex(sortColumns []int, primaryKeyColumns []int, pk common.PrimaryKey) error

BuildIndex builds an index over the primary key columns of this archive batch and inserts the record IDs into the given primary key.

func (*ArchiveBatch) Clone

func (b *ArchiveBatch) Clone() *ArchiveBatch

Clone returns a copy of current batch including all references to underlying vector parties. Caller is responsible for holding the lock (if necessary).

func (*ArchiveBatch) MarshalJSON

func (b *ArchiveBatch) MarshalJSON() ([]byte, error)

MarshalJSON marshals an ArchiveBatch into JSON.

func (*ArchiveBatch) RequestVectorParty

func (b *ArchiveBatch) RequestVectorParty(columnID int) common.ArchiveVectorParty

RequestVectorParty creates (if needed), pins, and returns the requested vector party. On creation it also asynchronously loads the vector party from disk into memory.

Caller must call vp.WaitForDiskLoad() before using it, and call vp.Release() afterwards.
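A minimal sketch of that protocol; batch and columnID are assumed in scope:

vp := batch.RequestVectorParty(columnID)
vp.WaitForDiskLoad() // block until the asynchronous disk load completes
// ... read from vp ...
vp.Release() // unpin so the host memory manager may evict it later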

func (*ArchiveBatch) TryEvict

func (b *ArchiveBatch) TryEvict(columnID int) common.ArchiveVectorParty

TryEvict attempts to evict and destruct the specified column from the archive batch. It fails fast if the column is currently in use so that the host memory manager can immediately try evicting other vector parties. Returns the evicted vector party if it succeeded.

func (*ArchiveBatch) WriteToDisk

func (b *ArchiveBatch) WriteToDisk() error

WriteToDisk writes each column of a batch to disk. It happens during the archiving stage for the merged archive batch, so there is no need to lock it.

type ArchiveJobDetail

type ArchiveJobDetail struct {
	JobDetail

	// Current cutoff.
	CurrentCutoff uint32 `json:"currentCutoff"`
	// Stage the job is currently in.
	Stage ArchivingStage `json:"stage"`
	// New Cutoff.
	RunningCutoff uint32 `json:"runningCutoff"`
	// Cutoff of last completed archiving job.
	LastCutoff uint32 `json:"lastCutoff"`
}

ArchiveJobDetail represents archiving job status of a table Shard.

type ArchiveJobDetailMutator

type ArchiveJobDetailMutator func(jobDetail *ArchiveJobDetail)

ArchiveJobDetailMutator is the mutator functor to change ArchiveJobDetail.

type ArchiveJobDetailReporter

type ArchiveJobDetailReporter func(key string, mutator ArchiveJobDetailMutator)

ArchiveJobDetailReporter is the functor to apply mutator changes to corresponding JobDetail.

type ArchiveStore

type ArchiveStore struct {
	// The mutex protects the pointer pointing to the current version of archived vector version.
	sync.RWMutex

	PurgeManager *PurgeManager
	// Current version points to the most recent version of vector store version for queries to use.
	CurrentVersion *ArchiveStoreVersion
}

ArchiveStore manages versions of archive stores. The archive store version evolves to a new version after archiving. Readers should follow this locking protocol:

archiveStore.Users.Add(1)
// tableShard.ArchiveStore can no longer be accessed directly.
// continue reading from archiveStore
archiveStore.Users.Done()

func NewArchiveStore

func NewArchiveStore(shard *TableShard) *ArchiveStore

NewArchiveStore creates a new archive store. The current version is just a placeholder for tests; it will be replaced during recovery.

func (*ArchiveStore) Destruct

func (s *ArchiveStore) Destruct()

Destruct deletes all vectors allocated in C. Caller must detach the Shard first and wait until all users are finished.

func (*ArchiveStore) GetCurrentVersion

func (s *ArchiveStore) GetCurrentVersion() *ArchiveStoreVersion

GetCurrentVersion returns the current ArchiveStoreVersion and does proper locking. It's used by queries and data browsing. Users need to call version.Users.Done() after their work.
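A minimal sketch of that read pattern; shard (*TableShard) and batchID are assumed in scope, and GetBatchForRead is assumed to read-lock the batch as documented below:

version := shard.ArchiveStore.GetCurrentVersion()
defer version.Users.Done() // release the pin after reading

if batch := version.GetBatchForRead(batchID); batch != nil {
	// ... read from batch ...
	batch.RUnlock()
}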

func (*ArchiveStore) MarshalJSON

func (s *ArchiveStore) MarshalJSON() ([]byte, error)

MarshalJSON marshals an ArchiveStore into JSON.

type ArchiveStoreVersion

type ArchiveStoreVersion struct {
	// The mutex
	// protects the Batches map structure and the archiving cutoff field.
	// It does not protect contents within a batch. Before releasing
	// the VectorStore mutex, user should lock the batch level mutex if necessary
	// to ensure proper protection at batch level.
	sync.RWMutex `json:"-"`

	// Wait group used to prevent this ArchiveStore from being evicted
	Users sync.WaitGroup `json:"-"`

	// Each batch in the map is identified by BaseBatchID+index.
	// A missing entry or nil Batch for an archive batch indicates that none of
	// the columns have been loaded into memory from disk.
	Batches map[int32]*ArchiveBatch `json:"batches"`

	// The archiving cutoff used for this version of the sorted store.
	ArchivingCutoff uint32 `json:"archivingCutoff"`
	// contains filtered or unexported fields
}

ArchiveStoreVersion stores a version of archive batches of columnar data.

func NewArchiveStoreVersion

func NewArchiveStoreVersion(cutoff uint32, shard *TableShard) *ArchiveStoreVersion

NewArchiveStoreVersion creates a new empty archive store version given cutoff.

func (*ArchiveStoreVersion) GetBatchForRead

func (v *ArchiveStoreVersion) GetBatchForRead(batchID int) *ArchiveBatch

GetBatchForRead returns an ArchiveBatch for read; the reader needs to unlock it after use.

func (*ArchiveStoreVersion) MarshalJSON

func (v *ArchiveStoreVersion) MarshalJSON() ([]byte, error)

MarshalJSON marshals an ArchiveStoreVersion into JSON.

func (*ArchiveStoreVersion) RequestBatch

func (v *ArchiveStoreVersion) RequestBatch(batchID int32) *ArchiveBatch

RequestBatch returns the requested archive batch from the archive store version.

type ArchivingJob

type ArchivingJob struct {
	// contains filtered or unexported fields
}

ArchivingJob defines the structure that an archiving job needs.

func (*ArchivingJob) GetIdentifier

func (job *ArchivingJob) GetIdentifier() string

GetIdentifier returns a unique identifier of this job.

func (*ArchivingJob) JobType added in v0.0.2

func (job *ArchivingJob) JobType() common.JobType

JobType returns the job type.

func (*ArchivingJob) Run

func (job *ArchivingJob) Run() error

Run starts the archiving process and waits for it to finish.

func (*ArchivingJob) String

func (job *ArchivingJob) String() string

String gives a meaningful string representation of this job.

type ArchivingStage

type ArchivingStage string

ArchivingStage represents different stages of a running archive job.

const (
	ArchivingCreatePatch ArchivingStage = "create patch"
	ArchivingMerge       ArchivingStage = "merge"
	ArchivingPurge       ArchivingStage = "purge"
	ArchivingComplete    ArchivingStage = "complete"
)

List of ArchivingStages.

type BackfillConfig added in v0.0.2

type BackfillConfig struct {
	// max buffer size to hold backfill data
	MaxBufferSize int64 `json:"maxBufferSize"`

	// threshold to trigger backfill
	BackfillThresholdInBytes int64 `json:"backfillThresholdInBytes"`
}

BackfillConfig defines the configuration for backfill.

type BackfillJob

type BackfillJob struct {
	// contains filtered or unexported fields
}

BackfillJob defines the structure that a backfill job needs.

func (*BackfillJob) GetIdentifier

func (job *BackfillJob) GetIdentifier() string

GetIdentifier returns a unique identifier of this job.

func (*BackfillJob) JobType added in v0.0.2

func (job *BackfillJob) JobType() common.JobType

JobType returns the job type.

func (*BackfillJob) Run

func (job *BackfillJob) Run() error

Run starts the backfill process and waits for it to finish.

func (*BackfillJob) String

func (job *BackfillJob) String() string

String gives a meaningful string representation of this job.

type BackfillJobDetail

type BackfillJobDetail struct {
	JobDetail
	// Stage the job is currently in.
	Stage BackfillStage `json:"stage"`
	// Current redolog file that's being backfilled.
	RedologFile int64 `json:"redologFile"`
	// Batch offset within the RedologFile.
	BatchOffset uint32 `json:"batchOffset"`
}

BackfillJobDetail represents backfill job status of a table Shard.

type BackfillJobDetailMutator

type BackfillJobDetailMutator func(jobDetail *BackfillJobDetail)

BackfillJobDetailMutator is the mutator functor to change BackfillJobDetail.

type BackfillJobDetailReporter

type BackfillJobDetailReporter func(key string, mutator BackfillJobDetailMutator)

BackfillJobDetailReporter is the functor to apply mutator changes to corresponding JobDetail.

type BackfillManager

type BackfillManager struct {
	sync.RWMutex `json:"-"`
	BackfillConfig

	// Name of the table.
	TableName string `json:"-"`

	// The shard id of the table.
	Shard int `json:"-"`

	// queue to hold UpsertBatches to backfill
	UpsertBatches []*memCom.UpsertBatch `json:"-"`

	// keep track of the number of records in backfill queue
	NumRecords int `json:"numRecords"`

	// keep track of the size of the buffer that holds batches to be backfilled
	CurrentBufferSize int64 `json:"currentBufferSize"`

	// keep track of the size of the buffer that holds batches being backfilled
	BackfillingBufferSize int64 `json:"backfillingBufferSize"`

	// keep track of the redo log file of the last batch backfilled
	LastRedoFile int64 `json:"lastRedoFile"`

	// keep track of the offset of the last batch backfilled
	LastBatchOffset uint32 `json:"lastBatchOffset"`

	// keep track of the redo log file of the last batch queued
	CurrentRedoFile int64 `json:"currentRedoFile"`

	// keep track of the offset of the last batch being queued
	CurrentBatchOffset uint32 `json:"currentBatchOffset"`

	AppendCond *sync.Cond `json:"-"`
}

BackfillManager manages the records that need to be put into a backfill queue and merged with sorted batches directly.

func NewBackfillManager

func NewBackfillManager(tableName string, shard int, config BackfillConfig) *BackfillManager

NewBackfillManager creates a new BackfillManager instance.

func (*BackfillManager) Append

func (r *BackfillManager) Append(upsertBatch *memCom.UpsertBatch, redoFile int64, batchOffset uint32) bool

Append appends an upsert batch to the backfill queue. Returns true if the buffer limit has been reached and the caller may need to wait.
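A hedged sketch of the backpressure loop this implies on the ingestion path; upsertBatch, redoFile, and batchOffset are assumed in scope:

if full := backfillManager.Append(upsertBatch, redoFile, batchOffset); full {
	// The queue hit its buffer limit; block until a backfill run drains it.
	backfillManager.WaitForBackfillBufferAvailability()
}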

func (*BackfillManager) Destruct

func (r *BackfillManager) Destruct()

Destruct sets the Go object references used by the backfill manager to nil to trigger GC earlier.

func (*BackfillManager) Done

func (r *BackfillManager) Done(currentRedoFile int64, currentBatchOffset uint32,
	metaStore metaCom.MetaStore) error

Done updates the backfill progress both in memory and in metastore.

func (*BackfillManager) GetLatestRedoFileAndOffset

func (r *BackfillManager) GetLatestRedoFileAndOffset() (int64, uint32)

GetLatestRedoFileAndOffset returns the latest redo file and its batch offset.

func (*BackfillManager) MarshalJSON

func (r *BackfillManager) MarshalJSON() ([]byte, error)

MarshalJSON marshals a BackfillManager into JSON.

func (*BackfillManager) QualifyToTriggerBackfill

func (r *BackfillManager) QualifyToTriggerBackfill() bool

QualifyToTriggerBackfill decides whether it is OK to trigger the size-based backfill process.

func (*BackfillManager) ReadUpsertBatch

func (r *BackfillManager) ReadUpsertBatch(index, start, length int, schema *memCom.TableSchema) (data [][]interface{}, columnNames []string, err error)

ReadUpsertBatch reads an upsert batch in the backfill queue; the user should not lock the schema.

func (*BackfillManager) StartBackfill

func (r *BackfillManager) StartBackfill() ([]*memCom.UpsertBatch, int64, uint32)

StartBackfill gets a slice of UpsertBatches from backfill queue and returns the CurrentRedoFile and CurrentBatchOffset.

func (*BackfillManager) WaitForBackfillBufferAvailability

func (r *BackfillManager) WaitForBackfillBufferAvailability()

WaitForBackfillBufferAvailability blocks until the backfill buffer is available.

type BackfillStage

type BackfillStage string

BackfillStage represents different stages of a running backfill job.

const (
	BackfillCreatePatch BackfillStage = "create patch"
	BackfillApplyPatch  BackfillStage = "apply patch"
	BackfillPurge       BackfillStage = "purge"
	BackfillComplete    BackfillStage = "complete"
)

List of BackfillStages.

type BatchStatsReporter

type BatchStatsReporter struct {
	// contains filtered or unexported fields
}

BatchStatsReporter is used to report batch-level stats like row count.

func NewBatchStatsReporter

func NewBatchStatsReporter(intervalInSeconds int, memStore MemStore, shardOwner topology.ShardOwner) *BatchStatsReporter

NewBatchStatsReporter creates a new BatchStatsReporter instance.

func (*BatchStatsReporter) Run

func (batchStats *BatchStatsReporter) Run()

Run reports batch stats periodically on a ticker.

func (*BatchStatsReporter) Stop

func (batchStats *BatchStatsReporter) Stop()

Stop stops the stats reporter.

type CuckooIndex

type CuckooIndex struct {
	// contains filtered or unexported fields
}

CuckooIndex is an implementation of a hash index using the cuckoo hashing algorithm. Lazy expiration is used to invalidate expired items. CuckooIndex is not thread-safe.

func (*CuckooIndex) AllocatedBytes

func (c *CuckooIndex) AllocatedBytes() uint

AllocatedBytes returns the allocated size of primary key in bytes.

func (*CuckooIndex) Capacity

func (c *CuckooIndex) Capacity() uint

Capacity returns how many items the current primary key can hold.

func (*CuckooIndex) Delete

func (c *CuckooIndex) Delete(key memCom.Key)

Delete deletes the item with the given key.

func (*CuckooIndex) Destruct

func (c *CuckooIndex) Destruct()

Destruct frees all allocated memory

func (*CuckooIndex) Find

func (c *CuckooIndex) Find(key memCom.Key) (memCom.RecordID, bool)

Find looks up a record by the given key.

func (*CuckooIndex) FindOrInsert

func (c *CuckooIndex) FindOrInsert(key memCom.Key, value memCom.RecordID, eventTime uint32) (existingFound bool, recordID memCom.RecordID, err error)

FindOrInsert finds the existing key or inserts a new (key, value) pair.

func (*CuckooIndex) GetEventTimeCutoff

func (c *CuckooIndex) GetEventTimeCutoff() uint32

GetEventTimeCutoff returns the cutoff event time.

func (*CuckooIndex) LockForTransfer

func (c *CuckooIndex) LockForTransfer() memCom.PrimaryKeyData

LockForTransfer locks primary key for transfer and returns PrimaryKeyData

func (*CuckooIndex) Size

func (c *CuckooIndex) Size() uint

Size returns the current number of items stored in the hash table, including expired items not yet known to the system.

func (*CuckooIndex) UnlockAfterTransfer

func (c *CuckooIndex) UnlockAfterTransfer()

UnlockAfterTransfer releases the transfer lock.

func (*CuckooIndex) Update

func (c *CuckooIndex) Update(key memCom.Key, value memCom.RecordID) bool

Update updates a key with a new recordID. Returns whether the key exists in the primary key.

func (*CuckooIndex) UpdateEventTimeCutoff

func (c *CuckooIndex) UpdateEventTimeCutoff(cutoff uint32)

UpdateEventTimeCutoff updates the event time cutoff.

type Job

type Job interface {
	JobType() common.JobType
	Run() error
	GetIdentifier() string
	String() string
}

Job defines the common interface for BackfillJob, ArchivingJob and SnapshotJob

type JobDetail

type JobDetail struct {
	// Status of archiving for current table Shard.
	Status JobStatus `json:"status"`

	// Time of next archiving job
	NextRun time.Time `json:"nextRun,omitempty"`

	// Time when the last backfill job finishes
	LastRun time.Time `json:"lastRun,omitempty"`

	// Error of last run if failed.
	LastError error `json:"lastError,omitempty"`
	// Start time of last archiving job.
	LastStartTime time.Time `json:"lastStartTime,omitempty"`
	// Duration of last archiving job.
	LastDuration time.Duration `json:"lastDuration,omitempty"`

	// Number of records processed.
	NumRecords int `json:"numRecords,omitempty"`
	// Number of days affected.
	NumAffectedDays int `json:"numAffectedDays,omitempty"`

	// Total amount of work
	// ie, archiving merge: number of days
	//     archiving snapshot: number of records
	Total int `json:"total,omitempty"`
	// Current finished work.
	Current int `json:"current,omitempty"`

	// Duration for waiting for lock.
	LockDuration time.Duration `json:"lockDuration,omitempty"`
}

JobDetail represents common job status of a table Shard.

type JobStatus

type JobStatus string

JobStatus represents the job status of a given table Shard.

const (
	JobWaiting   JobStatus = "waiting"
	JobReady     JobStatus = "ready"
	JobRunning   JobStatus = "running"
	JobSucceeded JobStatus = "succeeded"
	JobFailed    JobStatus = "failed"
)

List of JobStatus.

type LiveBatch

type LiveBatch struct {
	// The common data structure holding column data.
	common.Batch

	// Capacity of the batch which is decided at the creation time.
	Capacity int

	// maximum of arrival time
	MaxArrivalTime uint32
	// contains filtered or unexported fields
}

LiveBatch represents a live batch.

func (*LiveBatch) GetOrCreateVectorParty

func (b *LiveBatch) GetOrCreateVectorParty(columnID int, locked bool) common.LiveVectorParty

GetOrCreateVectorParty returns LiveVectorParty for the specified column from the live batch. locked specifies whether the batch has been locked. The lock will be left in the same state after the function returns.

func (*LiveBatch) MarshalJSON

func (b *LiveBatch) MarshalJSON() ([]byte, error)

MarshalJSON marshals a LiveBatch into json.

type LiveStore

type LiveStore struct {
	sync.RWMutex

	// The batch id to batch map.
	Batches map[int32]*LiveBatch

	// Number of rows to create for new batches.
	BatchSize int

	// The upper bound of records (exclusive) that can be read by queries.
	LastReadRecord common.RecordID

	// This is the in memory archiving cutoff time high watermark that gets set by the archiving job
	// before each archiving run. Ingestion will not insert/update records that are older than
	// the archiving cutoff watermark.
	ArchivingCutoffHighWatermark uint32

	// Logs.
	RedoLogManager redolog.RedologManager

	// Manage backfill queue during ingestion.
	BackfillManager *BackfillManager

	// Manage snapshot related stats.
	SnapshotManager *SnapshotManager

	// The writer lock is to guarantee single writer to a Shard at all time. To ensure this, writers
	// (ingestion, archiving etc) need to hold this lock at all times. This lock
	// should be acquired before the VectorStore and Batch locks.
	// TODO: if spinning lock performance is a concern we may need to upgrade this
	// to a designated goroutine with a pc-queue channel.
	WriterLock sync.RWMutex

	// Primary key table of the Shard.
	PrimaryKey common.PrimaryKey

	// The position of the next record to be used for writing. Only used by the ingester.
	NextWriteRecord common.RecordID

	// For convenience.
	HostMemoryManager common.HostMemoryManager `json:"-"`
	// contains filtered or unexported fields
}

LiveStore stores live batches of columnar data.

func NewLiveStore

func NewLiveStore(batchSize int, shard *TableShard) *LiveStore

NewLiveStore creates a new live store.

func (*LiveStore) AdvanceLastReadRecord

func (s *LiveStore) AdvanceLastReadRecord()

AdvanceLastReadRecord advances the high watermark of the rows to the next write record.

func (*LiveStore) AdvanceNextWriteRecord

func (s *LiveStore) AdvanceNextWriteRecord() common.RecordID

AdvanceNextWriteRecord reserves space for a record and returns the next available record position to the caller.

func (*LiveStore) Destruct

func (s *LiveStore) Destruct()

Destruct deletes all vectors allocated in C. Caller must detach the Shard first and wait until all users are finished.

func (*LiveStore) GetBatchForRead

func (s *LiveStore) GetBatchForRead(id int32) *LiveBatch

GetBatchForRead returns and read locks the batch with its ID for reads. Caller must explicitly RUnlock() the returned batch after all reads.

func (*LiveStore) GetBatchForWrite

func (s *LiveStore) GetBatchForWrite(id int32) *LiveBatch

GetBatchForWrite returns and locks the batch with its ID for writes. Caller must explicitly Unlock() the returned batch after all writes.
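A minimal sketch of this locking discipline; liveStore and batchID are assumed in scope:

// Read path: pair GetBatchForRead with RUnlock.
if b := liveStore.GetBatchForRead(batchID); b != nil {
	// ... read column data ...
	b.RUnlock()
}

// Write path: pair GetBatchForWrite with Unlock.
if b := liveStore.GetBatchForWrite(batchID); b != nil {
	// ... mutate column data ...
	b.Unlock()
}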

func (*LiveStore) GetBatchIDs

func (s *LiveStore) GetBatchIDs() (batchIDs []int32, numRecordsInLastBatch int)

GetBatchIDs snapshots the batches and returns a list of batch IDs for read, along with the number of records in the last batch.

func (*LiveStore) GetMemoryUsageForColumn

func (s *LiveStore) GetMemoryUsageForColumn(valueType common.DataType, columnID int) int

GetMemoryUsageForColumn gets the live store memory usage for the given data type and column.

func (*LiveStore) LookupKey

func (s *LiveStore) LookupKey(keyStrs []string) (common.RecordID, bool)

LookupKey looks up the given key in primary key.

func (*LiveStore) MarshalJSON

func (s *LiveStore) MarshalJSON() ([]byte, error)

MarshalJSON marshals a LiveStore into json.

func (*LiveStore) PurgeBatch

func (s *LiveStore) PurgeBatch(id int32)

PurgeBatch purges the specified batch.

func (*LiveStore) PurgeBatches

func (s *LiveStore) PurgeBatches(ids []int32)

PurgeBatches purges the specified batches.

type MemStore

type MemStore interface {
	common.TableSchemaReader
	bootstrap.Bootstrapable

	// GetMemoryUsageDetails returns memory usage details of all table shards.
	GetMemoryUsageDetails() (map[string]TableShardMemoryUsage, error)
	// GetScheduler returns the scheduler for scheduling archiving and backfill jobs.
	GetScheduler() Scheduler
	// GetHostMemoryManager returns the host memory manager
	GetHostMemoryManager() common.HostMemoryManager
	// AddTableShard adds a table shard to the memstore
	AddTableShard(table string, shardID int, needPeerCopy bool)
	// GetTableShard gets the data for a pinned table Shard. Caller needs to unpin after use.
	GetTableShard(table string, shardID int) (*TableShard, error)
	// RemoveTableShard removes a table shard from the memstore
	RemoveTableShard(table string, shardID int)
	// FetchSchema fetches the schema from metaStore, updates the in-memory copy of the table schema,
	// and sets up watch channels for metaStore schema changes; used for bootstrapping the mem store.
	FetchSchema() error
	// InitShards loads/recovers data for shards initially owned by the current instance.
	InitShards(schedulerOff bool, shardOwner topology.ShardOwner)
	// HandleIngestion logs an upsert batch and applies it to the in-memory store.
	HandleIngestion(table string, shardID int, upsertBatch *common.UpsertBatch) error
	// Archive is the process of moving stable records in fact tables from live batches to archive
	// batches.
	Archive(table string, shardID int, cutoff uint32, reporter ArchiveJobDetailReporter) error

	// Backfill is the process of merging records with event time older than cutoff with
	// archive batches.
	Backfill(table string, shardID int, reporter BackfillJobDetailReporter) error

	// Snapshot is the process to write the current content of dimension table live store in memory to disk.
	Snapshot(table string, shardID int, reporter SnapshotJobDetailReporter) error

	// Purge is the process of purging out-of-retention archive batches
	Purge(table string, shardID, batchIDStart, batchIDEnd int, reporter PurgeJobDetailReporter) error
}

MemStore defines the interface for managing multiple table shards in memory. It is defined as an interface so it can be mocked in unit tests.

func NewMemStore

func NewMemStore(metaStore metaCom.MetaStore, diskStore diskstore.DiskStore, options Options) MemStore

NewMemStore creates a MemStore from the specified MetaStore.
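A hedged bring-up sketch combining the interface methods above; metaStore, diskStore, options, and shardOwner are assumed to be constructed elsewhere:

memStore := NewMemStore(metaStore, diskStore, options)
if err := memStore.FetchSchema(); err != nil {
	panic(err) // illustration only; handle the error appropriately
}
memStore.InitShards(false /* schedulerOff */, shardOwner)
memStore.GetScheduler().Start()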

type Option added in v0.0.2

type Option func(o *Options)

func WithNumShards added in v0.0.2

func WithNumShards(numShards int) Option

WithNumShards sets numShards on the memstore Options.

type Options added in v0.0.2

type Options struct {
	// contains filtered or unexported fields
}

Options holds all necessary context objects used in memstore.

func NewOptions added in v0.0.2

func NewOptions(bootstrapToken common.BootStrapToken, redoLogMaster *redolog.RedoLogManagerMaster, setters ...Option) Options

NewOptions creates a new Options instance.
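A minimal sketch of the functional-options pattern used here; bootstrapToken and redoLogMaster are assumed in scope:

options := NewOptions(
	bootstrapToken,
	redoLogMaster,
	WithNumShards(4), // optional setter
)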

type PurgeJob

type PurgeJob struct {
	// contains filtered or unexported fields
}

PurgeJob defines the structure that a purge job needs.

func (*PurgeJob) GetIdentifier

func (job *PurgeJob) GetIdentifier() string

GetIdentifier returns a unique identifier of this job.

func (*PurgeJob) JobType added in v0.0.2

func (job *PurgeJob) JobType() common.JobType

JobType returns the job type.

func (*PurgeJob) Run

func (job *PurgeJob) Run() error

Run starts the purge process and waits for it to finish.

func (*PurgeJob) String

func (job *PurgeJob) String() string

String gives a meaningful string representation of this job.

type PurgeJobDetail

type PurgeJobDetail struct {
	JobDetail
	// Stage the job is currently in.
	Stage PurgeStage `json:"stage"`
	// Number of batches purged
	NumBatches   int `json:"numBatches"`
	BatchIDStart int `json:"batchIDStart"`
	BatchIDEnd   int `json:"batchIDEnd"`
}

PurgeJobDetail represents purge job status of a table shard.

type PurgeJobDetailMutator

type PurgeJobDetailMutator func(jobDetail *PurgeJobDetail)

PurgeJobDetailMutator is the mutator functor to change PurgeJobDetail.

type PurgeJobDetailReporter

type PurgeJobDetailReporter func(key string, mutator PurgeJobDetailMutator)

PurgeJobDetailReporter is the functor to apply mutator changes to corresponding JobDetail.

type PurgeManager

type PurgeManager struct {
	sync.RWMutex `json:"-"`

	// Job Trigger Condition related fields
	// Last purge time.
	LastPurgeTime time.Time `json:"lastPurgeTime"`

	PurgeInterval time.Duration `json:"purgeInterval"`
	// contains filtered or unexported fields
}

PurgeManager manages the purge related stats and progress.

func NewPurgeManager

func NewPurgeManager(shard *TableShard) *PurgeManager

NewPurgeManager creates a new PurgeManager instance.

func (*PurgeManager) QualifyForPurge

func (p *PurgeManager) QualifyForPurge() bool

QualifyForPurge tells whether we can trigger a purge job.

type PurgeStage

type PurgeStage string

PurgeStage represents different stages of a running purge job.

const (
	PurgeMetaData PurgeStage = "purge metadata"
	PurgeDataFile PurgeStage = "purge data file"
	PurgeMemory   PurgeStage = "purge memory"
	PurgeComplete PurgeStage = "complete"
)

List of purge stages

type RedoLogBrowser

type RedoLogBrowser interface {
	ListLogFiles() ([]int64, error)
	ListUpsertBatch(creationTime int64) ([]int64, error)
	ReadData(creationTime int64, upsertBatchOffset int64, start int, length int) (
		[][]interface{}, []string, int, error)
}

RedoLogBrowser is the interface to list redo log files, upsert batches and read upsert batch data.

type Scheduler

type Scheduler interface {
	Start()
	Stop()
	SubmitJob(job Job) (error, chan error)
	DeleteTable(table string, isFactTable bool)
	GetJobDetails(jobType common.JobType) interface{}
	NewBackfillJob(tableName string, shardID int) Job
	NewArchivingJob(tableName string, shardID int, cutoff uint32) Job
	NewSnapshotJob(tableName string, shardID int) Job
	NewPurgeJob(tableName string, shardID int, batchIDStart int, batchIDEnd int) Job
	EnableJobType(jobType common.JobType, enable bool)
	IsJobTypeEnabled(jobType common.JobType) bool
	utils.RWLocker
}

Scheduler is for scheduling archiving jobs (and later backfill jobs) for table shards in memStore. It scans through all tables and shards to generate a list of eligible jobs to run.
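A minimal sketch of submitting a job; note SubmitJob's (error, chan error) return order, taken from the interface above. scheduler and cutoff are assumed in scope, and "my_table" is a hypothetical table name:

job := scheduler.NewArchivingJob("my_table", 0 /* shardID */, cutoff)
if err, errChan := scheduler.SubmitJob(job); err != nil {
	// the job was rejected
} else if runErr := <-errChan; runErr != nil {
	// the job ran and failed
}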

type SnapshotJob

type SnapshotJob struct {
	// contains filtered or unexported fields
}

SnapshotJob defines the structure that a snapshot job needs.

func (*SnapshotJob) GetIdentifier

func (job *SnapshotJob) GetIdentifier() string

GetIdentifier returns a unique identifier of this job.

func (*SnapshotJob) JobType added in v0.0.2

func (job *SnapshotJob) JobType() common.JobType

JobType returns the job type.

func (*SnapshotJob) Run

func (job *SnapshotJob) Run() error

Run starts the snapshot process and waits for it to finish.

func (*SnapshotJob) String

func (job *SnapshotJob) String() string

String gives a meaningful string representation of this job.

type SnapshotJobDetail

type SnapshotJobDetail struct {
	JobDetail
	// Number of mutations in this snapshot.
	NumMutations int `json:"numMutations"`
	// Number of batches written in this snapshot.
	NumBatches int `json:"numBatches"`
	// Current redolog file that's being snapshotted.
	RedologFile int64 `json:"redologFile"`
	// Batch offset within the RedologFile.
	BatchOffset uint32 `json:"batchOffset"`
	// Stage the job is currently in.
	Stage SnapshotStage `json:"stage"`
}

SnapshotJobDetail represents snapshot job status of a table shard.

type SnapshotJobDetailMutator

type SnapshotJobDetailMutator func(jobDetail *SnapshotJobDetail)

SnapshotJobDetailMutator is the mutator functor to change SnapshotJobDetail.

type SnapshotJobDetailReporter

type SnapshotJobDetailReporter func(key string, mutator SnapshotJobDetailMutator)

SnapshotJobDetailReporter is the functor to apply mutator changes to corresponding JobDetail.

type SnapshotManager

type SnapshotManager struct {
	sync.RWMutex `json:"-"`

	// Number of mutations since last snapshot. Measured as number of rows mutated.
	NumMutations int `json:"numMutations"`

	// Last snapshot time.
	LastSnapshotTime time.Time `json:"lastSnapshotTime"`

	// keep track of the redo log file of the last batch snapshotted.
	LastRedoFile int64 `json:"lastRedoFile"`

	// keep track of the offset of the last batch snapshotted
	LastBatchOffset uint32 `json:"lastBatchOffset"`

	// keep track of the record position of the last batch snapshotted
	LastRecord common.RecordID

	// keep track of the redo log file of the last batch queued
	CurrentRedoFile int64 `json:"currentRedoFile"`

	// keep track of the offset of the last batch queued
	CurrentBatchOffset uint32 `json:"currentBatchOffset"`

	// keep track of the record position when last batch queued
	CurrentRecord common.RecordID

	// Configs
	SnapshotInterval time.Duration `json:"snapshotInterval"`

	SnapshotThreshold int `json:"snapshotThreshold"`
	// contains filtered or unexported fields
}

SnapshotManager manages the snapshot related stats and progress.

func NewSnapshotManager

func NewSnapshotManager(shard *TableShard) *SnapshotManager

NewSnapshotManager creates a new SnapshotManager instance.

func (*SnapshotManager) ApplyUpsertBatch

func (s *SnapshotManager) ApplyUpsertBatch(redoFile int64, offset uint32, numMutations int, currentRecord common.RecordID)

ApplyUpsertBatch advances CurrentRedoFile and CurrentBatchOffset and increments NumMutations after applying an upsert batch to the live store.

func (*SnapshotManager) Done

func (s *SnapshotManager) Done(currentRedoFile int64, currentBatchOffset uint32, lastNumMutations int, currentRecord common.RecordID) error

Done updates the snapshot progress both in memory and in metastore and updates number of mutations accordingly.

func (*SnapshotManager) GetLastSnapshotInfo

func (s *SnapshotManager) GetLastSnapshotInfo() (int64, uint32, time.Time, common.RecordID)

GetLastSnapshotInfo gets the last snapshot's redo log file, offset, timestamp, and last record.

func (*SnapshotManager) MarshalJSON

func (s *SnapshotManager) MarshalJSON() ([]byte, error)

MarshalJSON marshals a SnapshotManager into JSON.

func (*SnapshotManager) QualifyForSnapshot

func (s *SnapshotManager) QualifyForSnapshot() bool

QualifyForSnapshot tells whether we can trigger a snapshot job.

func (*SnapshotManager) SetLastSnapshotInfo

func (s *SnapshotManager) SetLastSnapshotInfo(redoLogFile int64, offset uint32, record common.RecordID)

SetLastSnapshotInfo updates the last snapshot's redo log file, offset, timestamp, and last record.

func (*SnapshotManager) StartSnapshot

func (s *SnapshotManager) StartSnapshot() (int64, uint32, int, common.RecordID)

StartSnapshot returns the current redo log file, offset, number of mutations, and current record.
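A hedged sketch of the progress protocol suggested by StartSnapshot and Done; snapshotManager is assumed in scope:

redoFile, offset, numMutations, record := snapshotManager.StartSnapshot()
// ... the snapshot job writes the live store contents to disk ...
if err := snapshotManager.Done(redoFile, offset, numMutations, record); err != nil {
	// handle the metastore update failure
}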

type SnapshotStage

type SnapshotStage string

SnapshotStage represents different stages of a running snapshot job.

const (
	SnapshotSnapshot SnapshotStage = "snapshot"
	SnapshotCleanup  SnapshotStage = "cleanup"
	SnapshotComplete SnapshotStage = "complete"
)

List of SnapshotStages

type TableShard

type TableShard struct {
	// Wait group used to prevent the stores from being prematurely deleted.
	Users sync.WaitGroup `json:"-"`

	ShardID int `json:"-"`

	// For convenience, reference to the table schema struct.
	Schema *common.TableSchema `json:"schema"`

	// Live store. Its locks also cover the primary key.
	LiveStore *LiveStore `json:"liveStore"`

	// Archive store.
	ArchiveStore *ArchiveStore `json:"archiveStore"`

	// For convenience.
	HostMemoryManager common.HostMemoryManager `json:"-"`

	BootstrapState bootstrap.BootstrapState `json:"BootstrapState"`

	// BootstrapDetails shows the details of bootstrap
	BootstrapDetails bootstrap.BootstrapDetails `json:"bootstrapDetails,omitempty"`
	// contains filtered or unexported fields
}

TableShard stores the data for one table shard in memory.

func NewTableShard

func NewTableShard(schema *common.TableSchema, metaStore metaCom.MetaStore,
	diskStore diskstore.DiskStore, hostMemoryManager common.HostMemoryManager, shard int, options Options) *TableShard

NewTableShard creates and initiates a table shard based on the schema.

func (*TableShard) ApplyUpsertBatch

func (shard *TableShard) ApplyUpsertBatch(upsertBatch *common.UpsertBatch, redoLogFile int64, offset uint32, skipBackfillRows bool) (bool, error)

ApplyUpsertBatch applies the upsert batch to the memstore shard. Returns true if the caller needs to wait for backfill buffer availability.

func (*TableShard) Bootstrap added in v0.0.2

func (shard *TableShard) Bootstrap(
	peerSource client.PeerSource,
	origin string,
	topo topology.Topology,
	topoState *topology.StateSnapshot,
	options bootstrap.Options,
) error

Bootstrap executes bootstrap for the table shard.

func (*TableShard) DeleteColumn

func (shard *TableShard) DeleteColumn(columnID int) error

DeleteColumn deletes the data for the specified column.

func (*TableShard) Destruct

func (shard *TableShard) Destruct()

Destruct destructs the table shard. Caller must detach the shard from memstore first.

func (*TableShard) IsBootstrapped added in v0.0.2

func (shard *TableShard) IsBootstrapped() bool

IsBootstrapped returns whether this table shard is bootstrapped.

func (*TableShard) IsDiskDataAvailable added in v0.0.2

func (shard *TableShard) IsDiskDataAvailable() bool

IsDiskDataAvailable returns whether the data is available on disk for the table shard.

func (*TableShard) LoadMetaData

func (shard *TableShard) LoadMetaData() error

LoadMetaData loads metadata for the table Shard from metastore.

func (*TableShard) LoadSnapshot

func (shard *TableShard) LoadSnapshot() error

LoadSnapshot loads shard data from snapshot files.

func (*TableShard) NewRedoLogBrowser

func (shard *TableShard) NewRedoLogBrowser() RedoLogBrowser

NewRedoLogBrowser creates a RedoLogBrowser using fields from the Shard.

func (*TableShard) PlayRedoLog added in v0.0.2

func (shard *TableShard) PlayRedoLog()

PlayRedoLog loads data for the table Shard from disk store and recovers the Shard for serving.

func (*TableShard) PreloadColumn

func (shard *TableShard) PreloadColumn(columnID int, startDay int, endDay int)

PreloadColumn loads the column into memory and waits for the loading to complete within (startDay, endDay]. Note endDay is inclusive but startDay is exclusive.

type TableShardMemoryUsage

type TableShardMemoryUsage struct {
	ColumnMemory     map[string]*common.ColumnMemoryUsage `json:"cols"`
	PrimaryKeyMemory uint                                 `json:"pk"`
}

TableShardMemoryUsage contains memory usage for column memory and primary key memory.

type TestFactoryT

type TestFactoryT struct {
	tests.TestFactoryBase
}

TestFactoryT creates memstore test objects from text files.

func GetFactory added in v0.0.2

func GetFactory() TestFactoryT

func (TestFactoryT) NewMockMemStore

func (t TestFactoryT) NewMockMemStore() *memStoreImpl

NewMockMemStore returns a new memstore with mocked diskstore and metastore.

type TransferableVectorParty

type TransferableVectorParty interface {
	// GetHostVectorPartySlice slice vector party between [startIndex, startIndex+length) before transfer to gpu
	GetHostVectorPartySlice(startIndex, length int) common.HostVectorPartySlice
}

TransferableVectorParty is a vector party that can be transferred to GPU for processing.

Directories

Path Synopsis
mocks
Code generated by mockery v1.0.0.
