writer

package
v0.0.0-...-1c6f4db
Published: Apr 26, 2024 License: AGPL-3.0 Imports: 39 Imported by: 17

Documentation


Constants

const (
	MeasFnMinIdx int = iota // must always be zero-based
	MeasFnMaxIdx
	MeasFnSumIdx
	MeasFnCountIdx
	// Note: any time you add a Fn, make sure to adjust the IdxToAgFn array
	// Note: always keep this last since it is used for indexing into aggValues
	TotalMeasFns
)

It is OK for these to be of type int, since they are used as indices into arrays.
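
As a hedged illustration (not from the package source), a caller can size a per-group aggregate slice with TotalMeasFns and index it with the MeasFn*Idx constants; the aggValues slice and its float64 element type here are hypothetical:

	// Sketch only: aggValues is a hypothetical per-group aggregate slice,
	// sized by TotalMeasFns and indexed with the MeasFn*Idx constants.
	aggValues := make([]float64, TotalMeasFns)
	aggValues[MeasFnSumIdx] += 3.2 // accumulate into the sum slot
	aggValues[MeasFnCountIdx]++    // bump the count slot
	_ = aggValues[MeasFnMinIdx]    // min/max slots are updated the same way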

const FPARM_FLOAT64 = float64(0)
const FPARM_INT64 = int64(0)
const FPARM_UINT64 = uint64(0)
const MaxAgileTreeNodeCount = 8_000_000

Variables

var AllUnrotatedSegmentInfo = map[string]*UnrotatedSegmentInfo{}
var KibanaInternalBaseDir string
var RecentlyRotatedSegmentFiles = map[string]*SegfileRotateInfo{}
var SegmetaSuffix = "segmeta.json"
var TotalUnrotatedMetadataSizeBytes uint64
var UnrotatedInfoLock sync.RWMutex = sync.RWMutex{}

Functions

func AddEntryToInMemBuf

func AddEntryToInMemBuf(streamid string, rawJson []byte, ts_millis uint64,
	indexName string, bytesReceived uint64, flush bool, signalType SIGNAL_TYPE, orgid uint64) error

func AddNewRotatedSegment

func AddNewRotatedSegment(segmeta structs.SegMeta)

func AddTimeSeriesEntryToInMemBuf

func AddTimeSeriesEntryToInMemBuf(rawJson []byte, signalType SIGNAL_TYPE, orgid uint64) error

func AgFnToIdx

func AgFnToIdx(fn utils.AggregateFunctions) int

func ApplySearchToDictArrayFilter

func ApplySearchToDictArrayFilter(col []byte, qValDte *DtypeEnclosure, rec []byte, fop FilterOperator, isRegexSearch bool,
	holderDte *DtypeEnclosure) (bool, error)

func ApplySearchToExpressionFilterSimpleCsg

func ApplySearchToExpressionFilterSimpleCsg(qValDte *DtypeEnclosure, fop FilterOperator,
	col []byte, isRegexSearch bool, holderDte *DtypeEnclosure) (bool, error)

func ApplySearchToMatchFilterRawCsg

func ApplySearchToMatchFilterRawCsg(match *MatchFilter, col []byte) (bool, error)

func BackFillPQSSegmetaEntry

func BackFillPQSSegmetaEntry(segsetkey string, newpqid string)

func CheckAndGetColsForUnrotatedSegKey

func CheckAndGetColsForUnrotatedSegKey(key string) (map[string]bool, bool)

Returns a copy of AllColumns seen for a given key from the unrotated segment infos. If no key exists, the second return value is false.
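
A minimal caller sketch (the "writer" import alias and use of fmt are assumptions, not part of these docs):

	// Sketch only, assuming this package is imported as "writer".
	cols, ok := writer.CheckAndGetColsForUnrotatedSegKey(segKey)
	if !ok {
		fmt.Printf("no unrotated segment info for key %s\n", segKey)
		return
	}
	for colName := range cols {
		fmt.Println("unrotated column:", colName)
	}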

func DecodeDictionaryColumn

func DecodeDictionaryColumn(encodedBytes []byte) map[segutils.CValueDictEnclosure][]uint16

func DeleteOldKibanaDoc

func DeleteOldKibanaDoc(indexNameConverted string, idVal string)

func DeleteSegmentsForIndex

func DeleteSegmentsForIndex(segmetaFName, indexName string)

func DeleteVirtualTableSegStore

func DeleteVirtualTableSegStore(virtualTableName string)

func DoesSegKeyHavePqidResults

func DoesSegKeyHavePqidResults(segKey string, pqid string) bool

func EncodeBlocksum

func EncodeBlocksum(bmh *BlockMetadataHolder, bsum *BlockSummary,
	blockSummBuf []byte, blkNum uint16) (uint32, []byte, error)

func EncodeDictionaryColumn

func EncodeDictionaryColumn(columnValueMap map[segutils.CValueDictEnclosure][]uint16, colRis map[string]*RangeIndex, recNum uint16) ([]byte, uint32)

func EncodeRIBlock

func EncodeRIBlock(blockRangeIndex map[string]*Numbers, blkNum uint16) (uint32, []byte, error)

func FilterUnrotatedSegmentsInQuery

func FilterUnrotatedSegmentsInQuery(timeRange *dtu.TimeRange, indexNames []string, orgid uint64) (map[string]map[string]*dtu.TimeRange, uint64, uint64)

Returns a map[table]->map[segKey]->timeRange of segments that pass the index and time-range checks, along with the total number of segments checked and the total number that passed.
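
A hedged caller sketch; the "writer" import alias mirrors the package name, and the dtu time-range type is used only as it appears in the signature above:

	// Sketch only: walk the map[table]->map[segKey]->timeRange result.
	segsByTable, checked, passed := writer.FilterUnrotatedSegmentsInQuery(timeRange, indexNames, orgid)
	fmt.Printf("unrotated segments: %d checked, %d passed\n", checked, passed)
	for table, segKeys := range segsByTable {
		for segKey, tRange := range segKeys {
			fmt.Printf("table=%s segKey=%s timeRange=%+v\n", table, segKey, tRange)
		}
	}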

func FlushWipBufferToFile

func FlushWipBufferToFile(sleepDuration *time.Duration)

func ForceRotateSegmentsForTest

func ForceRotateSegmentsForTest()

func ForcedFlushToSegfile

func ForcedFlushToSegfile()

This function is used when os.Interrupt is caught; meta files need to be updated so that range/bloom/file path info is not lost on node failure.

func GetAllPersistentQueryResults

func GetAllPersistentQueryResults(segKey string, pqid string) (*pqmr.SegmentPQMRResults, error)

func GetBlockSearchInfoForKey

func GetBlockSearchInfoForKey(key string) (map[uint16]*structs.BlockMetadataHolder, error)

returns a copy of the unrotated block search info. This is to prevent concurrent modification

func GetBlockSummaryForKey

func GetBlockSummaryForKey(key string) ([]*structs.BlockSummary, error)

returns the block summary for a segment key

func GetCvalFromRec

func GetCvalFromRec(rec []byte, qid uint64) (CValueEnclosure, uint16, error)
   Caller of this function can confidently cast CValueEnclosure.CVal to one of the following types:
	 bool       (if CValueEnclosure.Dtype = SS_DT_BOOL)
	 uint64     (if CValueEnclosure.Dtype = SS_DT_UNSIGNED_NUM)
	 int64      (if CValueEnclosure.Dtype = SS_DT_SIGNED_NUM)
	 float64    (if CValueEnclosure.Dtype = SS_DT_FLOAT)
	 string     (if CValueEnclosure.Dtype = SS_DT_STRING)
	 array      (if CValueEnclosure.Dtype = SS_DT_ARRAY_DICT)

parameters:

rec: byte slice
qid: query id

returns:

CValueEnclosure: CValue encoding of this column entry
uint16: length of this entry inside the byte slice
error: any error encountered
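
A hedged consumption sketch that follows the cast table above; the Dtype/CVal field names and SS_DT_* constants are used as referenced in these docs, and advancing the slice by the returned length is an assumption:

	// Sketch only: decode one column entry and switch on its Dtype.
	cval, encLen, err := GetCvalFromRec(rec, qid)
	if err != nil {
		return err
	}
	switch cval.Dtype {
	case SS_DT_BOOL:
		_ = cval.CVal.(bool)
	case SS_DT_UNSIGNED_NUM:
		_ = cval.CVal.(uint64)
	case SS_DT_SIGNED_NUM:
		_ = cval.CVal.(int64)
	case SS_DT_FLOAT:
		_ = cval.CVal.(float64)
	case SS_DT_STRING:
		_ = cval.CVal.(string)
	}
	rec = rec[encLen:] // assumption: advance past this entry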

func GetFileNameForRotatedSegment

func GetFileNameForRotatedSegment(seg string) (string, error)

func GetInMemorySize

func GetInMemorySize() uint64

returns the total size used by AllSegStores

func GetLocalSegmetaFName

func GetLocalSegmetaFName() string

returns the current node's segmeta

func GetNumOfSearchedRecordsUnRotated

func GetNumOfSearchedRecordsUnRotated(segKey string) uint64

func GetSearchInfoForPQSQuery

func GetSearchInfoForPQSQuery(key string, spqmr *pqmr.SegmentPQMRResults) (map[uint16]*structs.BlockMetadataHolder,
	[]*structs.BlockSummary, error)

Returns block search info, block summaries, and any errors encountered. Block search info will be loaded for all possible columns.

func GetSizeOfUnrotatedMetadata

func GetSizeOfUnrotatedMetadata() uint64

func GetTSRangeForMissingBlocks

func GetTSRangeForMissingBlocks(segKey string, tRange *dtu.TimeRange, spqmr *pqmr.SegmentPQMRResults) *dtu.TimeRange

Returns the time range of the blocks in the segment that do not exist in spqmr. If the returned time range is nil, no blocks were found in unrotated metadata that do not exist in spqmr.

func GetUnrotatedMetadataInfo

func GetUnrotatedMetadataInfo() (uint64, uint64)

Returns the number of loaded unrotated metadata entries and the total number of unrotated metadata entries.

func GetUnrotatedVTableCounts

func GetUnrotatedVTableCounts(vtable string, orgid uint64) (uint64, int, uint64)

func GetVTableCounts

func GetVTableCounts(vtableName string, orgid uint64) (uint64, int, uint64)

func GetVTableCountsForAll

func GetVTableCountsForAll(orgid uint64) map[string]*structs.VtableCounts

func HostnameDir

func HostnameDir()

func InitKibanaInternalData

func InitKibanaInternalData()

func InitWriterNode

func InitWriterNode()

func IsRecentlyRotatedSegKey

func IsRecentlyRotatedSegKey(key string) bool

func IsSegKeyUnrotated

func IsSegKeyUnrotated(key string) bool

func PackDictEnc

func PackDictEnc(colWip *ColWip)

func ReadAllSegmetas

func ReadAllSegmetas() []*structs.SegMeta

returns all downloaded segmetas, including the current node's segmeta and all global segmetas

func ReadLocalSegmeta

func ReadLocalSegmeta() []*structs.SegMeta

reads only the current node's segmeta

func ReadSegmeta

func ReadSegmeta(smFname string) ([]*structs.SegMeta, error)

func RebalanceUnrotatedMetadata

func RebalanceUnrotatedMetadata(totalAvailableSize uint64) uint64

Removes unrotated metadata from memory based on the available size and returns the new in-memory size. Currently, once we remove an entry we have no way of adding it back. TODO: improve re-loading of unrotated microindices.

func RemoveSegments

func RemoveSegments(segmetaFName string, segmentsToDelete map[string]*structs.SegMeta)

func SetCardinalityLimit

func SetCardinalityLimit(val uint16)

func WriteMockBlockSummary

func WriteMockBlockSummary(file string, blockSums []*BlockSummary,
	allBmh map[uint16]*BlockMetadataHolder)

func WriteMockColSegFile

func WriteMockColSegFile(segkey string, numBlocks int, entryCount int) ([]map[string]*BloomIndex,
	[]*BlockSummary, []map[string]*RangeIndex, map[string]bool, map[uint16]*BlockMetadataHolder,
	map[string]*ColSizeInfo)

func WriteMockMetricsSegment

func WriteMockMetricsSegment(forceRotate bool, entryCount int) ([]*metrics.MetricsSegment, error)

func WriteMockTraceFile

func WriteMockTraceFile(segkey string, numBlocks int, entryCount int) ([]map[string]*BloomIndex,
	[]*BlockSummary, []map[string]*RangeIndex, map[string]bool, map[uint16]*BlockMetadataHolder)

func WriteMockTsRollup

func WriteMockTsRollup(segkey string) error

Types

type BloomIndex

type BloomIndex struct {
	Bf *bloom.BloomFilter

	HistoricalCount []uint32
	// contains filtered or unexported fields
}

type ColWip

type ColWip struct {
	// contains filtered or unexported fields
}

func InitColWip

func InitColWip(segKey string, colName string) *ColWip

func (*ColWip) GetBufAndIdx

func (cw *ColWip) GetBufAndIdx() ([]byte, uint32)

func (*ColWip) SetDeCount

func (cw *ColWip) SetDeCount(val uint16)

func (*ColWip) SetDeMap

func (cw *ColWip) SetDeMap(val map[string][]uint16)

func (*ColWip) WriteSingleString

func (cw *ColWip) WriteSingleString(value string)

type Node

type Node struct {
	// contains filtered or unexported fields
}

type PQTracker

type PQTracker struct {
	PQNodes map[string]*structs.SearchNode // maps pqid to search node
	// contains filtered or unexported fields
}

helper struct to keep track of persistent queries and columns that need to be searched

type RangeIndex

type RangeIndex struct {
	Ranges map[string]*structs.Numbers
}

type RolledRecs

type RolledRecs struct {
	MatchedRes *pqmr.PQMatchResults
	// contains filtered or unexported fields
}

type SegStore

type SegStore struct {

	// segment related data
	SegmentKey string

	VirtualTableName string
	RecordCount      int
	AllSeenColumns   map[string]bool

	BytesReceivedCount uint64
	OnDiskBytes        uint64 // running sum of cmi/csg/bsu file sizes

	AllSst map[string]*structs.SegStats // map[colName] => SegStats_of_each_column

	OrgId uint64
	// contains filtered or unexported fields
}

SegStore is an individual stream buffer.

func InitSegStore

func InitSegStore(
	segmentKey string,
	segbaseDir string,
	suffix uint64,
	virtualTableName string,
	skipDe bool,
	orgId uint64,
	usingSegTree bool,
	highTs uint64,
	lowTs uint64,
) *SegStore

func (*SegStore) AppendWipToSegfile

func (segstore *SegStore) AppendWipToSegfile(streamid string, forceRotate bool, isKibana bool, onTimeRotate bool) error

func (*SegStore) DestroyWipBlock

func (ss *SegStore) DestroyWipBlock()

func (*SegStore) EncodeColumns

func (ss *SegStore) EncodeColumns(rawData []byte, recordTime uint64, tsKey *string,
	signalType segutils.SIGNAL_TYPE) (uint32, bool, error)
   Each column is stored in its own columnar file.
   Each column file format:
	  [ValType-1 1B] [OptionalStringVal-Len-1 2B] [ActualValue-1]
	  [ValType-2 1B] [OptionalStringVal-Len-2 2B] [ActualValue-2]

   This function should not be called by itself; it must be called with the appropriate locks held.

   This function assumes that the record_json has been flattened.

   foundColsInRecord is a map[string]bool of all columns in the WIPBlock. New columns will be added to this map.
   The values of this map are set to false before returning, so subsequent calls can re-use the same map across WIPBlocks.

   returns:
	  1) max index amongst the columns
	  2) bool indicating whether this record matched the column conditions in PQColTracker
	  3) error
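
A hedged decoding sketch of the per-value layout above. It assumes little-endian lengths, assumes the 2-byte length prefix is present only for string values, and uses a placeholder ValType code; none of these details are confirmed by these docs.

	import (
		"encoding/binary"
		"fmt"
	)

	// decodeStringEntry reads one string-typed [ValType 1B][StrLen 2B][ActualValue]
	// entry from buf and returns the value plus the number of bytes consumed.
	func decodeStringEntry(buf []byte) (string, int, error) {
		const valTypeString = 0x02 // hypothetical placeholder, not the real code
		if len(buf) < 3 {
			return "", 0, fmt.Errorf("buffer too small")
		}
		if buf[0] != valTypeString {
			return "", 0, fmt.Errorf("not a string entry: valType=%d", buf[0])
		}
		strLen := int(binary.LittleEndian.Uint16(buf[1:3]))
		if len(buf) < 3+strLen {
			return "", 0, fmt.Errorf("truncated value")
		}
		return string(buf[3 : 3+strLen]), 3 + strLen, nil
	}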

func (*SegStore) FlushSegStats

func (ss *SegStore) FlushSegStats() error

Encoding scheme for the single file containing all columns:

[Version 1B] [CnameLen 2B] [Cname xB] [ColSegEncodingLen 2B] [ColSegEncoding xB]....
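
A hedged parsing sketch of this layout. It assumes little-endian lengths and that the (CnameLen, Cname, ColSegEncodingLen, ColSegEncoding) group repeats after a single leading version byte; both are assumptions.

	import (
		"encoding/binary"
		"fmt"
	)

	// parseSegStatsFile walks the layout sketched above and returns the raw
	// ColSegEncoding bytes keyed by column name.
	func parseSegStatsFile(buf []byte) (map[string][]byte, error) {
		if len(buf) < 1 {
			return nil, fmt.Errorf("empty segstats buffer")
		}
		_ = buf[0] // version byte; handling it is out of scope for this sketch
		out := make(map[string][]byte)
		idx := 1
		for idx < len(buf) {
			if idx+2 > len(buf) {
				return nil, fmt.Errorf("truncated cname length")
			}
			cnameLen := int(binary.LittleEndian.Uint16(buf[idx : idx+2]))
			idx += 2
			if idx+cnameLen > len(buf) {
				return nil, fmt.Errorf("truncated cname")
			}
			cname := string(buf[idx : idx+cnameLen])
			idx += cnameLen
			if idx+2 > len(buf) {
				return nil, fmt.Errorf("truncated encoding length")
			}
			encLen := int(binary.LittleEndian.Uint16(buf[idx : idx+2]))
			idx += 2
			if idx+encLen > len(buf) {
				return nil, fmt.Errorf("truncated encoding")
			}
			out[cname] = buf[idx : idx+encLen]
			idx += encLen
		}
		return out, nil
	}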

func (*SegStore) WritePackedRecord

func (segstore *SegStore) WritePackedRecord(rawJson []byte, ts_millis uint64, signalType utils.SIGNAL_TYPE) error

type SegfileRotateInfo

type SegfileRotateInfo struct {
	FinalName   string
	TimeRotated uint64
}

type StarTree

type StarTree struct {
	Root *Node
}

type StarTreeBuilder

type StarTreeBuilder struct {
	// contains filtered or unexported fields
}

func (*StarTreeBuilder) Aggregate

func (stb *StarTreeBuilder) Aggregate(cur *Node) error

func (*StarTreeBuilder) ComputeStarTree

func (stb *StarTreeBuilder) ComputeStarTree(wip *WipBlock) error

ComputeStarTree

Current assumptions:

All groupBy columns that contain strings are dictionary encoded.
Any column with len(col.deMap) != 0 is assumed to be dictionary encoded.
It is also assumed that no values other than the dictionary-encoded strings appear in that column.

When storing all other values, their raw byte values are converted to an unsigned integer,
and then widened to uint64 to have a consistent size.

parameters:

wipBlock: segstore's wip block

returns:

StarTree: ptr to StarTree
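
As a hedged illustration of the consistent-size point only (the builder's actual conversion is not shown in these docs), values of different numeric Go types can all be carried in a single uint64 slot:

	import "math"

	// widenToUint64 is a hypothetical helper: it reinterprets or widens a
	// numeric value into a fixed 8-byte representation.
	func widenToUint64(v interface{}) uint64 {
		switch t := v.(type) {
		case uint64:
			return t
		case int64:
			return uint64(t) // keeps the bit pattern, including the sign bit
		case float64:
			return math.Float64bits(t) // IEEE-754 bit pattern
		default:
			return 0
		}
	}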

func (*StarTreeBuilder) EncodeStarTree

func (stb *StarTreeBuilder) EncodeStarTree(segKey string) (uint32, error)
   *************** StarTree Encoding Format *****************************

   [FileType 1B] [LenMetaData 4B] [MetaData] [NodeDataDetails]

   [MetaData] :
	  [GroupbyKeys] [MeasureColNames] [DictEncCol-1] [DictEncCol-2] ...[DictEncCol-N]
		[GroupbyKeys] : [LenGrpKeys 2B] [GPK-1] [GPK-2]...
		   [GPK] : [StrLen 2B] [ActualStr xB]

	  [MeasureColNames] : [LenMeasureColNames 2B] [MeasureColName-1] [MeasureColNames-2] ...
		   [MeasureColNames-1] : [StrLen 2B] [McolName xB]

	  [DictEncCol-1] : [ColStrLen 2B] [ColName xB] [NumKeys 4B] [Enc-1] [Enc-2] ...
		   [Enc-1] : [EncStrLen 2B] [EncStr xB]

   [NodeDataDetails]: [NddLen 4B] [LevOffMeta xB] [LevelDetails-1 xB] [LevelDetails-2 xB].... in BFS
	  [LevOffMetas] : [levOff-0 8B] [levSize-0 4B] [levOff-1 8B] [levSize-1 4B] ....
	  [LevelDetails-1] : [LevelNum 2B] [numNodesAtLevel 4B] [NodeAgInfo...]
		  [NodeAgInfo-1] : [nodeKey 4B] [parentKeys xB] [aggValue-1] [aggValue-2] ...
			[parentKeys] : [parKey-0 4B] [parKey-1 4B].... // numOfParents depends on level
			[aggValue]: [dType 1B] [val 8B]
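
A hedged sketch for decoding just the [GroupbyKeys] portion of the metadata above. It assumes little-endian lengths and that LenGrpKeys is the number of group-by keys; both are assumptions.

	import (
		"encoding/binary"
		"fmt"
	)

	// decodeGroupbyKeys reads [LenGrpKeys 2B] followed by LenGrpKeys entries of
	// [StrLen 2B][ActualStr xB], as in the MetaData layout sketched above.
	// It returns the keys and the number of bytes consumed.
	func decodeGroupbyKeys(buf []byte) ([]string, int, error) {
		if len(buf) < 2 {
			return nil, 0, fmt.Errorf("buffer too small")
		}
		numKeys := int(binary.LittleEndian.Uint16(buf[0:2]))
		idx := 2
		keys := make([]string, 0, numKeys)
		for i := 0; i < numKeys; i++ {
			if idx+2 > len(buf) {
				return nil, 0, fmt.Errorf("truncated key length")
			}
			strLen := int(binary.LittleEndian.Uint16(buf[idx : idx+2]))
			idx += 2
			if idx+strLen > len(buf) {
				return nil, 0, fmt.Errorf("truncated key")
			}
			keys = append(keys, string(buf[idx:idx+strLen]))
			idx += strLen
		}
		return keys, idx, nil
	}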

func (*StarTreeBuilder) GetNodeCount

func (stb *StarTreeBuilder) GetNodeCount() int

func (*StarTreeBuilder) ResetSegTree

func (stb *StarTreeBuilder) ResetSegTree(block *WipBlock, groupByKeys []string, mColNames []string)

ResetSegTree

Current assumptions:

All groupBy columns that contain strings are dictionary encoded.
Any column with len(col.deMap) != 0 is assumed to be dictionary encoded.
It is also assumed that no values other than the dictionary-encoded strings appear in that column.

When storing all other values, their raw byte values are converted to an unsigned integer,
and then widened to uint64 to have a consistent size.

parameters:

wipBlock: segstore's wip block
groupByKeys: groupBy column names
mColNames: column names of measure columns

returns:

type UnrotatedSegmentInfo

type UnrotatedSegmentInfo struct {
	TableName string

	RecordCount int
	// contains filtered or unexported fields
}

func (*UnrotatedSegmentInfo) DoCMICheckForUnrotated

func (usi *UnrotatedSegmentInfo) DoCMICheckForUnrotated(currQuery *structs.SearchQuery, tRange *dtu.TimeRange,
	blkTracker *structs.BlockTracker, bloomWords map[string]bool, bloomOp segutils.LogicalOperator, rangeFilter map[string]string,
	rangeOp segutils.FilterOperator, isRange bool, wildcardValue bool,
	qid uint64) (map[uint16]map[string]bool, uint64, uint64, error)

Does a CMI check on unrotated segment info for the given request. Assumes UnrotatedInfoLock has been acquired. Returns the final blocks to search, the total number of unrotated blocks, the number of filtered blocks, and any errors.
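
Because the method assumes UnrotatedInfoLock is held, a caller looks roughly like the fragment below (a hedged sketch; the argument values and the choice of a read lock are assumptions):

	// Sketch only, assuming this package is imported as "writer".
	writer.UnrotatedInfoLock.RLock()
	defer writer.UnrotatedInfoLock.RUnlock()

	toSearch, totalBlocks, filteredBlocks, err := usi.DoCMICheckForUnrotated(
		currQuery, tRange, blkTracker, bloomWords, bloomOp,
		rangeFilter, rangeOp, isRange, wildcardValue, qid)
	if err != nil {
		return err
	}
	_, _, _ = toSearch, totalBlocks, filteredBlocks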

func (*UnrotatedSegmentInfo) GetTimeRange

func (usi *UnrotatedSegmentInfo) GetTimeRange() *dtu.TimeRange

func (*UnrotatedSegmentInfo) GetUnrotatedBlockInfoForQuery

func (usi *UnrotatedSegmentInfo) GetUnrotatedBlockInfoForQuery() ([]*structs.BlockSummary, map[uint16]*structs.BlockMetadataHolder, map[string]bool)

For an unrotated segment info, returns []blockSummaries, map[uint16]*structs.BlockMetadataHolder, and map[string]bool (all columns). This information will be used for unrotated queries. Copies of the in-memory metadata are returned to avoid race conditions.

A copy needs to be returned here as usi.BlockSummaries and usi.BlockInfo may have concurrent writes

This assumes the caller has already acquired the lock on UnrotatedInfoLock

type WipBlock

type WipBlock struct {
	// contains filtered or unexported fields
}

