Documentation ¶
Index ¶
- Constants
- Variables
- func AddEntryToInMemBuf(streamid string, rawJson []byte, ts_millis uint64, indexName string, ...) error
- func AddNewRotatedSegment(segmeta structs.SegMeta)
- func AddTimeSeriesEntryToInMemBuf(rawJson []byte, signalType SIGNAL_TYPE, orgid uint64) error
- func AgFnToIdx(fn utils.AggregateFunctions) int
- func ApplySearchToDictArrayFilter(col []byte, qValDte *DtypeEnclosure, rec []byte, fop FilterOperator, ...) (bool, error)
- func ApplySearchToExpressionFilterSimpleCsg(qValDte *DtypeEnclosure, fop FilterOperator, col []byte, isRegexSearch bool, ...) (bool, error)
- func ApplySearchToMatchFilterRawCsg(match *MatchFilter, col []byte) (bool, error)
- func BackFillPQSSegmetaEntry(segsetkey string, newpqid string)
- func CheckAndGetColsForUnrotatedSegKey(key string) (map[string]bool, bool)
- func DecodeDictionaryColumn(encodedBytes []byte) map[segutils.CValueDictEnclosure][]uint16
- func DeleteOldKibanaDoc(indexNameConverted string, idVal string)
- func DeleteSegmentsForIndex(segmetaFName, indexName string)
- func DeleteVirtualTableSegStore(virtualTableName string)
- func DoesSegKeyHavePqidResults(segKey string, pqid string) bool
- func EncodeBlocksum(bmh *BlockMetadataHolder, bsum *BlockSummary, blockSummBuf []byte, ...) (uint32, []byte, error)
- func EncodeDictionaryColumn(columnValueMap map[segutils.CValueDictEnclosure][]uint16, ...) ([]byte, uint32)
- func EncodeRIBlock(blockRangeIndex map[string]*Numbers, blkNum uint16) (uint32, []byte, error)
- func FilterUnrotatedSegmentsInQuery(timeRange *dtu.TimeRange, indexNames []string, orgid uint64) (map[string]map[string]*dtu.TimeRange, uint64, uint64)
- func FlushWipBufferToFile(sleepDuration *time.Duration)
- func ForceRotateSegmentsForTest()
- func ForcedFlushToSegfile()
- func GetAllPersistentQueryResults(segKey string, pqid string) (*pqmr.SegmentPQMRResults, error)
- func GetBlockSearchInfoForKey(key string) (map[uint16]*structs.BlockMetadataHolder, error)
- func GetBlockSummaryForKey(key string) ([]*structs.BlockSummary, error)
- func GetCvalFromRec(rec []byte, qid uint64) (CValueEnclosure, uint16, error)
- func GetFileNameForRotatedSegment(seg string) (string, error)
- func GetInMemorySize() uint64
- func GetLocalSegmetaFName() string
- func GetNumOfSearchedRecordsUnRotated(segKey string) uint64
- func GetSearchInfoForPQSQuery(key string, spqmr *pqmr.SegmentPQMRResults) (map[uint16]*structs.BlockMetadataHolder, []*structs.BlockSummary, error)
- func GetSizeOfUnrotatedMetadata() uint64
- func GetTSRangeForMissingBlocks(segKey string, tRange *dtu.TimeRange, spqmr *pqmr.SegmentPQMRResults) *dtu.TimeRange
- func GetUnrotatedMetadataInfo() (uint64, uint64)
- func GetUnrotatedVTableCounts(vtable string, orgid uint64) (uint64, int, uint64)
- func GetVTableCounts(vtableName string, orgid uint64) (uint64, int, uint64)
- func GetVTableCountsForAll(orgid uint64) map[string]*structs.VtableCounts
- func HostnameDir()
- func InitKibanaInternalData()
- func InitWriterNode()
- func IsRecentlyRotatedSegKey(key string) bool
- func IsSegKeyUnrotated(key string) bool
- func PackDictEnc(colWip *ColWip)
- func ReadAllSegmetas() []*structs.SegMeta
- func ReadLocalSegmeta() []*structs.SegMeta
- func ReadSegmeta(smFname string) ([]*structs.SegMeta, error)
- func RebalanceUnrotatedMetadata(totalAvailableSize uint64) uint64
- func RemoveSegments(segmetaFName string, segmentsToDelete map[string]*structs.SegMeta)
- func SetCardinalityLimit(val uint16)
- func WriteMockBlockSummary(file string, blockSums []*BlockSummary, allBmh map[uint16]*BlockMetadataHolder)
- func WriteMockColSegFile(segkey string, numBlocks int, entryCount int) ([]map[string]*BloomIndex, []*BlockSummary, []map[string]*RangeIndex, ...)
- func WriteMockMetricsSegment(forceRotate bool, entryCount int) ([]*metrics.MetricsSegment, error)
- func WriteMockTraceFile(segkey string, numBlocks int, entryCount int) ([]map[string]*BloomIndex, []*BlockSummary, []map[string]*RangeIndex, ...)
- func WriteMockTsRollup(segkey string) error
- type BloomIndex
- type ColWip
- type Node
- type PQTracker
- type RangeIndex
- type RolledRecs
- type SegStore
- func (segstore *SegStore) AppendWipToSegfile(streamid string, forceRotate bool, isKibana bool, onTimeRotate bool) error
- func (ss *SegStore) DestroyWipBlock()
- func (ss *SegStore) EncodeColumns(rawData []byte, recordTime uint64, tsKey *string, ...) (uint32, bool, error)
- func (ss *SegStore) FlushSegStats() error
- func (segstore *SegStore) WritePackedRecord(rawJson []byte, ts_millis uint64, signalType utils.SIGNAL_TYPE) error
- type SegfileRotateInfo
- type StarTree
- type StarTreeBuilder
- func (stb *StarTreeBuilder) Aggregate(cur *Node) error
- func (stb *StarTreeBuilder) ComputeStarTree(wip *WipBlock) error
- func (stb *StarTreeBuilder) EncodeStarTree(segKey string) (uint32, error)
- func (stb *StarTreeBuilder) GetNodeCount() int
- func (stb *StarTreeBuilder) ResetSegTree(block *WipBlock, groupByKeys []string, mColNames []string)
- type UnrotatedSegmentInfo
- func (usi *UnrotatedSegmentInfo) DoCMICheckForUnrotated(currQuery *structs.SearchQuery, tRange *dtu.TimeRange, ...) (map[uint16]map[string]bool, uint64, uint64, error)
- func (usi *UnrotatedSegmentInfo) GetTimeRange() *dtu.TimeRange
- func (usi *UnrotatedSegmentInfo) GetUnrotatedBlockInfoForQuery() ([]*structs.BlockSummary, map[uint16]*structs.BlockMetadataHolder, ...)
- type WipBlock
Constants ¶
const ( MeasFnMinIdx int = iota // has to be always zero based MeasFnMaxIdx MeasFnSumIdx MeasFnCountIdx // Note: anytimes you add a Fn, make sure to adjust the IdxToAgFn array // Note: always keep this last since it is used for indexing into aggValues TotalMeasFns )
It's OK for this to be an int, since it will be used as an index into arrays.
const FPARM_FLOAT64 = float64(0)
const FPARM_INT64 = int64(0)
const FPARM_UINT64 = uint64(0)
const MaxAgileTreeNodeCount = 8_000_000
Variables ¶
var AllUnrotatedSegmentInfo = map[string]*UnrotatedSegmentInfo{}
var IdxToAgFn []utils.AggregateFunctions = []utils.AggregateFunctions{ utils.Min, utils.Max, utils.Sum, utils.Count}
var KibanaInternalBaseDir string
var RecentlyRotatedSegmentFiles = map[string]*SegfileRotateInfo{}
var SegmetaSuffix = "segmeta.json"
var TotalUnrotatedMetadataSizeBytes uint64
var UnrotatedInfoLock sync.RWMutex = sync.RWMutex{}
Functions ¶
func AddEntryToInMemBuf ¶
func AddNewRotatedSegment ¶
func AgFnToIdx ¶
func AgFnToIdx(fn utils.AggregateFunctions) int
func BackFillPQSSegmetaEntry ¶
func CheckAndGetColsForUnrotatedSegKey ¶
Returns a copy of AllColumns seen for a given key from the unrotated segment infos. If no such key exists, this is reported through the bool return value (the function does not return an error).
func DecodeDictionaryColumn ¶
func DecodeDictionaryColumn(encodedBytes []byte) map[segutils.CValueDictEnclosure][]uint16
func DeleteOldKibanaDoc ¶
func DeleteSegmentsForIndex ¶
func DeleteSegmentsForIndex(segmetaFName, indexName string)
func DeleteVirtualTableSegStore ¶
func DeleteVirtualTableSegStore(virtualTableName string)
func EncodeBlocksum ¶
func EncodeDictionaryColumn ¶
func EncodeDictionaryColumn(columnValueMap map[segutils.CValueDictEnclosure][]uint16, colRis map[string]*RangeIndex, recNum uint16) ([]byte, uint32)
func EncodeRIBlock ¶
func FilterUnrotatedSegmentsInQuery ¶
func FilterUnrotatedSegmentsInQuery(timeRange *dtu.TimeRange, indexNames []string, orgid uint64) (map[string]map[string]*dtu.TimeRange, uint64, uint64)
returns map[table]->map[segKey]->timeRange that pass index & time range check, total checked, total passed
func FlushWipBufferToFile ¶
func ForceRotateSegmentsForTest ¶
func ForceRotateSegmentsForTest()
func ForcedFlushToSegfile ¶
func ForcedFlushToSegfile()
This function is used when os.Interrupt is caught. Meta files need to be updated so that range/bloom/file path info is not lost on node failure.
func GetAllPersistentQueryResults ¶
func GetAllPersistentQueryResults(segKey string, pqid string) (*pqmr.SegmentPQMRResults, error)
func GetBlockSearchInfoForKey ¶
func GetBlockSearchInfoForKey(key string) (map[uint16]*structs.BlockMetadataHolder, error)
returns a copy of the unrotated block search info. This is to prevent concurrent modification
func GetBlockSummaryForKey ¶
func GetBlockSummaryForKey(key string) ([]*structs.BlockSummary, error)
returns the block summary for a segment key
func GetCvalFromRec ¶
Caller of this function can confidently cast the CValueEnclosure.CVal to one of the following types: bool (if CValueEnclosure.Dtype = SS_DT_BOOL), uint64 (if CValueEnclosure.Dtype = SS_DT_UNSIGNED_NUM), int64 (if CValueEnclosure.Dtype = SS_DT_SIGNED_NUM), float64 (if CValueEnclosure.Dtype = SS_DT_FLOAT), string (if CValueEnclosure.Dtype = SS_DT_STRING), array (if CValueEnclosure.Dtype = SS_DT_ARRAY_DICT).
parameters:
rec: byte slice; qid: uint64
returns:
CValueEnclosure: Cval encoding of this col entry; uint16: length of this entry inside the byte slice; error: any error encountered
func GetSearchInfoForPQSQuery ¶
func GetSearchInfoForPQSQuery(key string, spqmr *pqmr.SegmentPQMRResults) (map[uint16]*structs.BlockMetadataHolder, []*structs.BlockSummary, error)
Returns block search info, block summaries, and any errors encountered. Block search info will be loaded for all possible columns.
func GetSizeOfUnrotatedMetadata ¶
func GetSizeOfUnrotatedMetadata() uint64
func GetTSRangeForMissingBlocks ¶
func GetTSRangeForMissingBlocks(segKey string, tRange *dtu.TimeRange, spqmr *pqmr.SegmentPQMRResults) *dtu.TimeRange
Returns the time range of the blocks in the segment that do not exist in spqmr. If the returned time range is nil, no blocks were found in unrotated metadata that do not exist in spqmr.
func GetUnrotatedMetadataInfo ¶
Returns number of loaded unrotated metadata, and total number of unrotated metadata
func GetVTableCountsForAll ¶
func GetVTableCountsForAll(orgid uint64) map[string]*structs.VtableCounts
func HostnameDir ¶
func HostnameDir()
func InitKibanaInternalData ¶
func InitKibanaInternalData()
func InitWriterNode ¶
func InitWriterNode()
func IsRecentlyRotatedSegKey ¶
func IsSegKeyUnrotated ¶
func PackDictEnc ¶
func PackDictEnc(colWip *ColWip)
func ReadAllSegmetas ¶
Returns all segmetas downloaded, including the current node's segmeta and all global segmetas.
func ReadLocalSegmeta ¶
Reads only the current node's segmeta.
func RebalanceUnrotatedMetadata ¶
Removes unrotated metadata from memory based on the available size and returns the new in-memory size. Currently, once we remove an entry we have no way of adding it back. TODO: improve on re-loading of unrotated microindices.
func RemoveSegments ¶
func SetCardinalityLimit ¶
func SetCardinalityLimit(val uint16)
func WriteMockBlockSummary ¶
func WriteMockColSegFile ¶
func WriteMockMetricsSegment ¶
func WriteMockMetricsSegment(forceRotate bool, entryCount int) ([]*metrics.MetricsSegment, error)
func WriteMockTraceFile ¶
func WriteMockTraceFile(segkey string, numBlocks int, entryCount int) ([]map[string]*BloomIndex, []*BlockSummary, []map[string]*RangeIndex, map[string]bool, map[uint16]*BlockMetadataHolder)
func WriteMockTsRollup ¶
Types ¶
type BloomIndex ¶
type BloomIndex struct { Bf *bloom.BloomFilter HistoricalCount []uint32 // contains filtered or unexported fields }
type ColWip ¶
type ColWip struct {
// contains filtered or unexported fields
}
func InitColWip ¶
func (*ColWip) GetBufAndIdx ¶
func (*ColWip) SetDeCount ¶
func (*ColWip) WriteSingleString ¶
type PQTracker ¶
type PQTracker struct { PQNodes map[string]*structs.SearchNode // maps pqid to search node // contains filtered or unexported fields }
helper struct to keep track of persistent queries and columns that need to be searched
type RangeIndex ¶
type RolledRecs ¶
type RolledRecs struct { MatchedRes *pqmr.PQMatchResults // contains filtered or unexported fields }
type SegStore ¶
type SegStore struct { // segment related data SegmentKey string VirtualTableName string RecordCount int AllSeenColumns map[string]bool BytesReceivedCount uint64 OnDiskBytes uint64 // running sum of cmi/csg/bsu file sizes AllSst map[string]*structs.SegStats // map[colName] => SegStats_of_each_column OrgId uint64 // contains filtered or unexported fields }
SegStore Individual stream buffer
func InitSegStore ¶
func (*SegStore) AppendWipToSegfile ¶
func (*SegStore) DestroyWipBlock ¶
func (ss *SegStore) DestroyWipBlock()
func (*SegStore) EncodeColumns ¶
func (ss *SegStore) EncodeColumns(rawData []byte, recordTime uint64, tsKey *string, signalType segutils.SIGNAL_TYPE) (uint32, bool, error)
Each column is stored in its own columnar file. Each column file format: [ValType-1 1B] [OptionalStringVal-Len-1 2B] [ActualValue-1] [ValType-2 1B] [OptionalStringVal-Len-2 2B] [ActualValue-2]. This function should not be called by itself; it must be called via locks. This function assumes that the record_json has been flattened. foundColsInRecord is a map[string]bool of all columns in the WIPBlock. New columns will be added to this map. The values of this map will be set to false before returning for subsequent calls. This lets us re-use the same map across WIPBlock. Returns: 1) max index amongst the columns, 2) bool indicating whether this record matched the column conditions in PQColTracker, 3) error.
func (*SegStore) FlushSegStats ¶
Encoding Scheme for all columns single file
[Version 1B] [CnameLen 2B] [Cname xB] [ColSegEncodingLen 2B] [ColSegEncoding xB]....
func (*SegStore) WritePackedRecord ¶
type SegfileRotateInfo ¶
type StarTreeBuilder ¶
type StarTreeBuilder struct {
// contains filtered or unexported fields
}
func (*StarTreeBuilder) Aggregate ¶
func (stb *StarTreeBuilder) Aggregate(cur *Node) error
func (*StarTreeBuilder) ComputeStarTree ¶
func (stb *StarTreeBuilder) ComputeStarTree(wip *WipBlock) error
ComputeStarTree
Current assumptions: All groupBy columns that contain strings are dictionary-encoded. Any column with len(col.deMap) != 0 is assumed to be dictionary-encoded. It is also assumed that no values other than the dictionary-encoded strings appear in that column. When storing all other values, their raw byte values are converted to an unsigned integer, and then converted to uint64 to have a consistent size.
parameters:
wip: segstore's wip block
returns:
error: any error encountered while computing the star tree (per the signature, no StarTree pointer is returned)
func (*StarTreeBuilder) EncodeStarTree ¶
func (stb *StarTreeBuilder) EncodeStarTree(segKey string) (uint32, error)
*************** StarTree Encoding Format *****************************
[FileType 1B] [LenMetaData 4B] [MetaData] [NodeDataDetails]
[MetaData] : [GroupbyKeys] [MeasureColNames] [DictEncCol-1] [DictEncCol-2] ... [DictEncCol-N]
[GroupbyKeys] : [LenGrpKeys 2B] [GPK-1] [GPK-2] ...
[GPK] : [StrLen 2B] [ActualStr xB]
[MeasureColNames] : [LenMeasureColNames 2B] [MeasureColName-1] [MeasureColName-2] ...
[MeasureColName-1] : [StrLen 2B] [McolName xB]
[DictEncCol-1] : [ColStrLen 2B] [ColName xB] [NumKeys 4B] [Enc-1] [Enc-2] ...
[Enc-1] : [EncStrLen 2B] [EncStr xB]
[NodeDataDetails] : [NddLen 4B] [LevOffMetas xB] [LevelDetails-1 xB] [LevelDetails-2 xB] ... in BFS
[LevOffMetas] : [levOff-0 8B] [levSize-0 4B] [levOff-1 8B] [levSize-1 4B] ...
[LevelDetails-1] : [LevelNum 2B] [numNodesAtLevel 4B] [NodeAgInfo...]
[NodeAgInfo-1] : [nodeKey 4B] [parentKeys xB] [aggValue-1] [aggValue-2] ...
[parentKeys] : [parKey-0 4B] [parKey-1 4B] ... // numOfParents depends on level
[aggValue] : [dType 1B] [val 8B]
func (*StarTreeBuilder) GetNodeCount ¶
func (stb *StarTreeBuilder) GetNodeCount() int
func (*StarTreeBuilder) ResetSegTree ¶
func (stb *StarTreeBuilder) ResetSegTree(block *WipBlock, groupByKeys []string, mColNames []string)
ResetSegTree
Current assumptions: All groupBy columns that contain strings are dictionary-encoded. Any column with len(col.deMap) != 0 is assumed to be dictionary-encoded. It is also assumed that no values other than the dictionary-encoded strings appear in that column. When storing all other values, their raw byte values are converted to an unsigned integer, and then converted to uint64 to have a consistent size.
parameters:
block: segstore's wip block; groupByKeys: groupBy column names; mColNames: column names of measure columns
returns:
type UnrotatedSegmentInfo ¶
type UnrotatedSegmentInfo struct { TableName string RecordCount int // contains filtered or unexported fields }
func (*UnrotatedSegmentInfo) DoCMICheckForUnrotated ¶
func (usi *UnrotatedSegmentInfo) DoCMICheckForUnrotated(currQuery *structs.SearchQuery, tRange *dtu.TimeRange, blkTracker *structs.BlockTracker, bloomWords map[string]bool, bloomOp segutils.LogicalOperator, rangeFilter map[string]string, rangeOp segutils.FilterOperator, isRange bool, wildcardValue bool, qid uint64) (map[uint16]map[string]bool, uint64, uint64, error)
Does a CMI check on the unrotated segment info for the given request. Assumes UnrotatedInfoLock has been acquired. Returns the final blocks to search, total unrotated blocks, number of filtered blocks, and errors if any.
func (*UnrotatedSegmentInfo) GetTimeRange ¶
func (usi *UnrotatedSegmentInfo) GetTimeRange() *dtu.TimeRange
func (*UnrotatedSegmentInfo) GetUnrotatedBlockInfoForQuery ¶
func (usi *UnrotatedSegmentInfo) GetUnrotatedBlockInfoForQuery() ([]*structs.BlockSummary, map[uint16]*structs.BlockMetadataHolder, map[string]bool)
For an unrotated segment info, returns []blockSummaries, map[uint16]*structs.BlockMetadataHolder, and map[string]bool (all columns). This information will be used for unrotated queries. This will return copies of in-memory metadata to avoid race conditions.
A copy needs to be returned here as usi.BlockSummaries and usi.BlockInfo may have concurrent writes ¶
This assumes the caller has already acquired the lock on UnrotatedInfoLock