phlaredb

package
v1.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 6, 2024 License: AGPL-3.0 Imports: 66 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultMinFreeDisk                        = 10
	DefaultMinDiskAvailablePercentage         = 0.05
	DefaultRetentionPolicyEnforcementInterval = 5 * time.Minute
	DefaultRetentionExpiry                    = 4 * time.Hour // Same as default `querier.query_store_after`.
)
View Source
const (
	PathLocal = "local"
)
View Source
const StaleGracePeriod = 5 * time.Minute

Variables

View Source
var SplitByFingerprint = func(r profileRow, shardsCount uint64) uint64 {
	return uint64(r.fp) % shardsCount
}
View Source
var SplitByStacktracePartition = func(r profileRow, shardsCount uint64) uint64 {
	return r.row.StacktracePartitionID() % shardsCount
}

Functions

func Compact

func Compact(ctx context.Context, src []BlockReader, dst string) (meta block.Meta, err error)

func CompactWithSplitting added in v1.1.0

func CompactWithSplitting(ctx context.Context, opts CompactWithSplittingOpts) (
	[]block.Meta, error,
)

func ContextWithBlockMetrics added in v1.3.0

func ContextWithBlockMetrics(ctx context.Context, m *BlocksMetrics) context.Context

func HintsToBlockSkipper added in v1.2.0

func HintsToBlockSkipper(hints *ingestv1.Hints) func(ulid string) bool

func InRange

func InRange(q TimeBounded, start, end model.Time) bool

func LabelNames added in v1.2.0

func LabelValues added in v1.2.0

func NewSingleBlockQuerierFromMeta

func NewSingleBlockQuerierFromMeta(phlarectx context.Context, bucketReader phlareobj.Bucket, meta *block.Meta) *singleBlockQuerier

func PostingsForMatchers

func PostingsForMatchers(ix IndexReader, shard *index.ShardAnnotation, ms ...*labels.Matcher) (index.Postings, error)

PostingsForMatchers assembles a single postings iterator against the index reader based on the given matchers. The resulting postings are not ordered by series.

func SelectMatchingProfiles

func SelectMatchingProfiles(ctx context.Context, request *ingestv1.SelectProfilesRequest, queriers Queriers) ([]iter.Iterator[Profile], error)

SelectMatchingProfiles returns a list of iterators over the profiles matching the given request.

func Series added in v1.1.0

func Series(ctx context.Context, req *ingestv1.SeriesRequest, blockGetter BlockGetter) (*ingestv1.SeriesResponse, error)

func SplitFiltersAndMatchers

func SplitFiltersAndMatchers(allMatchers []*labels.Matcher) (filters, matchers []*labels.Matcher)

SplitFiltersAndMatchers splits off empty matchers, which are treated as filters; see #220.

func ValidateLocalBlock added in v1.1.0

func ValidateLocalBlock(ctx context.Context, dir string) error

ValidateLocalBlock validates the block in the given directory is readable.

Types

type BidiServerMerge

type BidiServerMerge[Res any, Req any] interface {
	Send(Res) error
	Receive() (Req, error)
}

type BlockGetter

type BlockGetter func(ctx context.Context, start, end model.Time, hints *ingestv1.Hints) (Queriers, error)

type BlockInfo

type BlockInfo struct {
	ID          ulid.ULID
	MinTime     model.Time
	MaxTime     model.Time
	Profiles    TableInfo
	Stacktraces TableInfo
	Locations   TableInfo
	Functions   TableInfo
	Mappings    TableInfo
	Strings     TableInfo
	Series      uint64
}

type BlockProfile

type BlockProfile struct {
	// contains filtered or unexported fields
}

func (BlockProfile) Fingerprint

func (p BlockProfile) Fingerprint() model.Fingerprint

func (BlockProfile) Labels

func (p BlockProfile) Labels() phlaremodel.Labels

func (BlockProfile) RowNumber

func (p BlockProfile) RowNumber() int64

func (BlockProfile) StacktracePartition

func (p BlockProfile) StacktracePartition() uint64

func (BlockProfile) Timestamp

func (p BlockProfile) Timestamp() model.Time

type BlockQuerier

type BlockQuerier struct {
	// contains filtered or unexported fields
}

func NewBlockQuerier

func NewBlockQuerier(phlarectx context.Context, bucketReader phlareobj.Bucket) *BlockQuerier

func (*BlockQuerier) AddBlockQuerierByMeta

func (b *BlockQuerier) AddBlockQuerierByMeta(m *block.Meta)

func (*BlockQuerier) BlockInfo

func (b *BlockQuerier) BlockInfo() []BlockInfo

func (*BlockQuerier) BlockMetas

func (b *BlockQuerier) BlockMetas(ctx context.Context) (metas []*block.Meta, _ error)

func (*BlockQuerier) Close

func (b *BlockQuerier) Close() error

func (*BlockQuerier) Queriers

func (b *BlockQuerier) Queriers() Queriers

func (*BlockQuerier) Sync

func (b *BlockQuerier) Sync(ctx context.Context) error

Sync gradually scans the available blocks. If anything has changed since the last run, it opens the new blocks and closes the ones that no longer exist.

type BlockReader

type BlockReader interface {
	Open(context.Context) error
	Meta() block.Meta
	Profiles() ProfileReader
	Index() IndexReader
	Symbols() symdb.SymbolsReader
	Close() error
}

type BlocksMetrics added in v1.3.0

type BlocksMetrics struct {
	// contains filtered or unexported fields
}

func NewBlocksMetrics added in v1.3.0

func NewBlocksMetrics(reg prometheus.Registerer) *BlocksMetrics

func (*BlocksMetrics) Unregister added in v1.3.0

func (m *BlocksMetrics) Unregister()

type CompactWithSplittingOpts added in v1.3.0

type CompactWithSplittingOpts struct {
	Src                []BlockReader
	Dst                string
	SplitCount         uint64
	StageSize          uint64
	SplitBy            SplitByFunc
	DownsamplerEnabled bool
	Logger             log.Logger
}

type Config

type Config struct {
	DataPath string `yaml:"data_path,omitempty"`
	// Blocks are generally cut once they reach 1000M of memory size; this sets an upper limit on the duration of data held by a block that is cut by the ingester.
	MaxBlockDuration time.Duration `yaml:"max_block_duration,omitempty"`

	// TODO: docs
	RowGroupTargetSize uint64 `yaml:"row_group_target_size"`

	Parquet *ParquetConfig `yaml:"-"` // Those configs should not be exposed to the user, rather they should be determined by pyroscope itself. Currently, they are solely used for test cases.

	MinFreeDisk                uint64        `yaml:"min_free_disk_gb"`
	MinDiskAvailablePercentage float64       `yaml:"min_disk_available_percentage"`
	EnforcementInterval        time.Duration `yaml:"enforcement_interval"`
	DisableEnforcement         bool          `yaml:"disable_enforcement"`
}

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet)
type Head struct {
	// contains filtered or unexported fields
}

func NewHead

func NewHead(phlarectx context.Context, cfg Config, limiter TenantLimiter) (*Head, error)

func (*Head) BlockID added in v1.2.0

func (h *Head) BlockID() string

func (*Head) Bounds

func (h *Head) Bounds() (mint, maxt model.Time)

func (*Head) Flush

func (h *Head) Flush(ctx context.Context) error

Flush closes the head and writes data to disk. No ingestion requests should be made concurrently with the call, or after it returns. The call is thread-safe for reads.

func (*Head) Ingest

func (h *Head) Ingest(ctx context.Context, p *profilev1.Profile, id uuid.UUID, externalLabels ...*typesv1.LabelPair) error

func (*Head) LabelNames

LabelNames returns the possible label names.

func (*Head) LabelValues

LabelValues returns the possible label values for a given label name.

func (*Head) MemorySize

func (h *Head) MemorySize() uint64

func (*Head) Move

func (h *Head) Move() error

Move moves the head directory to local blocks. The call is not thread-safe: no concurrent reads and writes are allowed.

After the call, head in-memory representation is not valid and should not be accessed for querying.

func (*Head) ProfileTypes

ProfileTypes returns the possible profile types.

func (*Head) Queriers

func (h *Head) Queriers() Queriers

Queriers returns the underlying queriers; the queriers should be roughly ordered by increasing timestamp.

func (*Head) Size

func (h *Head) Size() uint64

func (*Head) Sort

func (h *Head) Sort(in []Profile) []Profile

type IndexReader

type IndexReader interface {
	// Bounds returns the earliest and latest samples in the index
	Bounds() (int64, int64)

	Checksum() uint32

	// Symbols returns an iterator over sorted string symbols that may occur in
	// series' labels and indices. It is not safe to use the returned strings
	// beyond the lifetime of the index reader.
	Symbols() index.StringIter

	// SortedLabelValues returns sorted possible label values.
	SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error)

	// LabelValues returns possible label values which may not be sorted.
	LabelValues(name string, matchers ...*labels.Matcher) ([]string, error)

	// Postings returns the postings list iterator for the label pairs.
	// The Postings here contain the offsets to the series inside the index.
	// Found IDs are not strictly required to point to a valid Series, e.g.
	// during background garbage collections. Input values must be sorted.
	Postings(name string, shard *index.ShardAnnotation, values ...string) (index.Postings, error)

	// Series populates the given labels and chunk metas for the series identified
	// by the reference.
	// Returns storage.ErrNotFound if the ref does not resolve to a known series.
	Series(ref storage.SeriesRef, lset *phlaremodel.Labels, chks *[]index.ChunkMeta) (uint64, error)

	// LabelNames returns all the unique label names present in the index in sorted order.
	LabelNames(matchers ...*labels.Matcher) ([]string, error)

	// LabelValueFor returns label value for the given label name in the series referred to by ID.
	// If the series couldn't be found or the series doesn't have the requested label a
	// storage.ErrNotFound is returned as error.
	LabelValueFor(id storage.SeriesRef, label string) (string, error)

	// LabelNamesFor returns all the label names for the series referred to by IDs.
	// The names returned are sorted.
	LabelNamesFor(ids ...storage.SeriesRef) ([]string, error)

	// Close releases the underlying resources of the reader.
	Close() error
}

IndexReader provides reading access of serialized index data.

type ParquetConfig

type ParquetConfig struct {
	MaxBufferRowCount int
	MaxRowGroupBytes  uint64 // This is the maximum row group size in bytes that the raw data uses in memory.
	MaxBlockBytes     uint64 // This is the size of all parquet tables in memory after which a new block is cut
}

type PhlareDB

type PhlareDB struct {
	services.Service
	// contains filtered or unexported fields
}

func New

func New(phlarectx context.Context, cfg Config, limiter TenantLimiter, fs phlareobj.Bucket) (*PhlareDB, error)

func (*PhlareDB) BlockMetadata added in v1.2.0

func (*PhlareDB) BlockMetas

func (f *PhlareDB) BlockMetas(ctx context.Context) ([]*block.Meta, error)

func (*PhlareDB) Close

func (f *PhlareDB) Close() error

func (*PhlareDB) Evict

func (f *PhlareDB) Evict(blockID ulid.ULID, fn func() error) (bool, error)

Evict removes the given local block from the PhlareDB. Note that the block files are not deleted from the disk. No evictions should be done during or after the Close call.

func (*PhlareDB) Flush

func (f *PhlareDB) Flush(ctx context.Context, force bool, reason string) (err error)

Flush starts flushing heads to disk. When force is true, all heads are flushed. When force is false, only stale heads are flushed. See Head.isStale for the definition of stale.

func (*PhlareDB) Ingest

func (f *PhlareDB) Ingest(ctx context.Context, p *profilev1.Profile, id uuid.UUID, externalLabels ...*typesv1.LabelPair) (err error)

func (*PhlareDB) LabelNames

LabelNames returns the possible label names.

func (*PhlareDB) LabelValues

LabelValues returns the possible label values for a given label name.

func (*PhlareDB) LocalDataPath

func (f *PhlareDB) LocalDataPath() string

func (*PhlareDB) MergeSpanProfile added in v1.2.0

func (*PhlareDB) ProfileTypes

ProfileTypes returns the possible profile types.

func (*PhlareDB) Series

Series returns labels series for the given set of matchers.

type Profile

type Profile interface {
	RowNumber() int64
	StacktracePartition() uint64
	Timestamp() model.Time
	Fingerprint() model.Fingerprint
	Labels() phlaremodel.Labels
}

type ProfileReader added in v1.2.1

type ProfileReader interface {
	io.ReaderAt
	Schema() *parquet.Schema
	Root() *parquet.Column
	RowGroups() []parquet.RowGroup
}

type ProfileSelectorIterator

type ProfileSelectorIterator struct {
	// contains filtered or unexported fields
}

func NewProfileSelectorIterator

func NewProfileSelectorIterator() *ProfileSelectorIterator

func (*ProfileSelectorIterator) At

func (*ProfileSelectorIterator) Close

func (it *ProfileSelectorIterator) Close() error

func (*ProfileSelectorIterator) Err

func (it *ProfileSelectorIterator) Err() error

func (*ProfileSelectorIterator) Next

func (it *ProfileSelectorIterator) Next() bool

func (*ProfileSelectorIterator) Push

func (it *ProfileSelectorIterator) Push(batch []Profile)

type ProfileWithIndex

type ProfileWithIndex struct {
	Profile
	Index int
}

type ProfileWithLabels

type ProfileWithLabels struct {
	// contains filtered or unexported fields
}

func (ProfileWithLabels) Fingerprint

func (p ProfileWithLabels) Fingerprint() model.Fingerprint

func (ProfileWithLabels) Labels

func (ProfileWithLabels) RowNumber added in v1.4.0

func (p ProfileWithLabels) RowNumber() int64

func (ProfileWithLabels) Samples

func (p ProfileWithLabels) Samples() schemav1.Samples

func (ProfileWithLabels) StacktracePartition

func (p ProfileWithLabels) StacktracePartition() uint64

func (ProfileWithLabels) Timestamp

func (p ProfileWithLabels) Timestamp() model.Time

func (ProfileWithLabels) Total

func (p ProfileWithLabels) Total() int64

type Querier

type Querier interface {
	// BlockID returns the block ID of the querier, when it is representing a single block.
	BlockID() string
	Bounds() (model.Time, model.Time)
	Open(ctx context.Context) error
	Sort([]Profile) []Profile

	MergeByStacktraces(ctx context.Context, rows iter.Iterator[Profile]) (*phlaremodel.Tree, error)
	MergeBySpans(ctx context.Context, rows iter.Iterator[Profile], spans phlaremodel.SpanSelector) (*phlaremodel.Tree, error)
	MergeByLabels(ctx context.Context, rows iter.Iterator[Profile], s *typesv1.StackTraceSelector, by ...string) ([]*typesv1.Series, error)
	MergePprof(ctx context.Context, rows iter.Iterator[Profile], maxNodes int64, s *typesv1.StackTraceSelector) (*profilev1.Profile, error)
	Series(ctx context.Context, params *ingestv1.SeriesRequest) ([]*typesv1.Labels, error)

	SelectMatchingProfiles(ctx context.Context, params *ingestv1.SelectProfilesRequest) (iter.Iterator[Profile], error)
	SelectMergeByStacktraces(ctx context.Context, params *ingestv1.SelectProfilesRequest) (*phlaremodel.Tree, error)
	SelectMergeByLabels(ctx context.Context, params *ingestv1.SelectProfilesRequest, s *typesv1.StackTraceSelector, by ...string) ([]*typesv1.Series, error)
	SelectMergeBySpans(ctx context.Context, params *ingestv1.SelectSpanProfileRequest) (*phlaremodel.Tree, error)
	SelectMergePprof(ctx context.Context, params *ingestv1.SelectProfilesRequest, maxNodes int64, s *typesv1.StackTraceSelector) (*profilev1.Profile, error)

	ProfileTypes(context.Context, *connect.Request[ingestv1.ProfileTypesRequest]) (*connect.Response[ingestv1.ProfileTypesResponse], error)
	LabelValues(ctx context.Context, req *connect.Request[typesv1.LabelValuesRequest]) (*connect.Response[typesv1.LabelValuesResponse], error)
	LabelNames(ctx context.Context, req *connect.Request[typesv1.LabelNamesRequest]) (*connect.Response[typesv1.LabelNamesResponse], error)
}

type Queriers

type Queriers []Querier

func (Queriers) LabelNames added in v1.2.0

func (Queriers) LabelValues added in v1.2.0

func (Queriers) MergeProfilesLabels added in v1.2.0

func (Queriers) MergeProfilesPprof added in v1.2.0

func (Queriers) MergeProfilesStacktraces added in v1.2.0

func (Queriers) MergeSpanProfile added in v1.2.0

func (Queriers) Open

func (queriers Queriers) Open(ctx context.Context) error

func (Queriers) ProfileTypes added in v1.2.0

func (Queriers) SelectMatchingProfiles

func (queriers Queriers) SelectMatchingProfiles(ctx context.Context, params *ingestv1.SelectProfilesRequest) (iter.Iterator[Profile], error)

func (Queriers) Series added in v1.2.0

type ResultWithRowNum

type ResultWithRowNum[M any] struct {
	Result M
	RowNum int64
}

type RowsIterator added in v1.2.1

type RowsIterator[T any] struct {
	// contains filtered or unexported fields
}

RowsIterator is an iterator over rows of a parquet table. It is a wrapper over query.Iterator to transform its results into a desired type.

func (*RowsIterator[T]) At added in v1.2.1

func (it *RowsIterator[T]) At() T

func (*RowsIterator[T]) Close added in v1.2.1

func (it *RowsIterator[T]) Close() error

func (*RowsIterator[T]) Err added in v1.2.1

func (it *RowsIterator[T]) Err() error

func (*RowsIterator[T]) Next added in v1.2.1

func (it *RowsIterator[T]) Next() bool

type SeriesIterator

type SeriesIterator struct {
	iter.Iterator[*schemav1.InMemoryProfile]
	// contains filtered or unexported fields
}

func NewSeriesIterator

func NewSeriesIterator(labels phlaremodel.Labels, fingerprint model.Fingerprint, it iter.Iterator[*schemav1.InMemoryProfile]) *SeriesIterator

func (*SeriesIterator) At

func (it *SeriesIterator) At() Profile

func (*SeriesIterator) Next

func (it *SeriesIterator) Next() bool

type Source

type Source interface {
	Schema() *parquet.Schema
	RowGroups() []parquet.RowGroup
}

type SplitByFunc added in v1.2.0

type SplitByFunc func(r profileRow, shardsCount uint64) uint64

type SymbolsRewriter added in v1.3.0

type SymbolsRewriter interface {
	ReWriteRow(profile profileRow) error
	Close() (uint64, error)
}

type SymbolsRewriterFn added in v1.3.0

type SymbolsRewriterFn func(blockPath string) SymbolsRewriter

type Table

type Table interface {
	Name() string
	Size() uint64       // Size estimates the uncompressed byte size of the table in memory and on disk.
	MemorySize() uint64 // MemorySize estimates the uncompressed byte size of the table in memory.
	Init(path string, cfg *ParquetConfig, metrics *headMetrics) error
	Flush(context.Context) (numRows uint64, numRowGroups uint64, err error)
	Close() error
}

type TableInfo

type TableInfo struct {
	Rows      uint64
	RowGroups uint64
	Bytes     uint64
}

type TenantLimiter

type TenantLimiter interface {
	AllowProfile(fp model.Fingerprint, lbs phlaremodel.Labels, tsNano int64) error
	Stop()
}

type TimeBounded added in v1.2.0

type TimeBounded interface {
	Bounds() (model.Time, model.Time)
}

Directories

Path Synopsis
schemas
v1
originally from https://github.com/cortexproject/cortex/blob/868898a2921c662dcd4f90683e8b95c927a8edd8/pkg/ingester/index/index.go but modified to support sharding queries.
originally from https://github.com/cortexproject/cortex/blob/868898a2921c662dcd4f90683e8b95c927a8edd8/pkg/ingester/index/index.go but modified to support sharding queries.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL