phlaredb

package
v0.6.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 12, 2023 License: AGPL-3.0 Imports: 59 Imported by: 0

Documentation

Overview

nolint unused

nolint unused

nolint unused

nolint unused

nolint unused

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func InRange added in v0.6.1

func InRange(q Querier, start, end model.Time) bool

func MergeProfilesLabels added in v0.6.1

func MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesLabelsRequest, ingestv1.MergeProfilesLabelsResponse], blockGetter BlockGetter) error

func MergeProfilesPprof added in v0.6.1

func MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesPprofRequest, ingestv1.MergeProfilesPprofResponse], blockGetter BlockGetter) error

func MergeProfilesStacktraces added in v0.6.1

func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesStacktracesRequest, ingestv1.MergeProfilesStacktracesResponse], blockGetter BlockGetter) error

func NewSingleBlockQuerierFromMeta added in v0.6.1

func NewSingleBlockQuerierFromMeta(phlarectx context.Context, bucketReader phlareobj.Bucket, meta *block.Meta) *singleBlockQuerier

func PostingsForMatchers

func PostingsForMatchers(ix IndexReader, shard *index.ShardAnnotation, ms ...*labels.Matcher) (index.Postings, error)

PostingsForMatchers assembles a single postings iterator against the index reader based on the given matchers. The resulting postings are not ordered by series.

func SelectMatchingProfiles added in v0.6.1

func SelectMatchingProfiles(ctx context.Context, request *ingestv1.SelectProfilesRequest, queriers Queriers) ([]iter.Iterator[Profile], error)

SelectMatchingProfiles returns a list iterator of profiles matching the given request.

func SplitFiltersAndMatchers

func SplitFiltersAndMatchers(allMatchers []*labels.Matcher) (filters, matchers []*labels.Matcher)

SplitFiltersAndMatchers splits off empty matchers, which are treated as filters; see #220.

Types

type BidiServerMerge

type BidiServerMerge[Res any, Req any] interface {
	Send(Res) error
	Receive() (Req, error)
}

type BlockGetter added in v0.6.1

type BlockGetter func(ctx context.Context, start, end model.Time) (Queriers, error)

type BlockInfo

type BlockInfo struct {
	ID          ulid.ULID
	MinTime     model.Time
	MaxTime     model.Time
	Profiles    TableInfo
	Stacktraces TableInfo
	Locations   TableInfo
	Functions   TableInfo
	Mappings    TableInfo
	Strings     TableInfo
	Series      uint64
}

type BlockProfile

type BlockProfile struct {
	RowNum int64
	// contains filtered or unexported fields
}

func (BlockProfile) Fingerprint

func (p BlockProfile) Fingerprint() model.Fingerprint

func (BlockProfile) Labels

func (p BlockProfile) Labels() phlaremodel.Labels

func (BlockProfile) RowNumber

func (p BlockProfile) RowNumber() int64

func (BlockProfile) Timestamp

func (p BlockProfile) Timestamp() model.Time

type BlockQuerier

type BlockQuerier struct {
	// contains filtered or unexported fields
}

func NewBlockQuerier

func NewBlockQuerier(phlarectx context.Context, bucketReader phlareobj.Bucket) *BlockQuerier

func (*BlockQuerier) AddBlockQuerierByMeta added in v0.6.0

func (b *BlockQuerier) AddBlockQuerierByMeta(m *block.Meta)

func (*BlockQuerier) BlockInfo

func (b *BlockQuerier) BlockInfo() []BlockInfo

func (*BlockQuerier) BlockMetas

func (b *BlockQuerier) BlockMetas(ctx context.Context) (metas []*block.Meta, _ error)

func (*BlockQuerier) Close

func (b *BlockQuerier) Close() error

func (*BlockQuerier) Queriers added in v0.3.0

func (b *BlockQuerier) Queriers() Queriers

func (*BlockQuerier) Sync

func (b *BlockQuerier) Sync(ctx context.Context) error

Sync gradually scans the available blocks. If anything has changed since the last run, it opens new blocks and closes those that no longer exist.

type Config

type Config struct {
	DataPath string `yaml:"data_path,omitempty"`
	// Blocks are generally cut once they reach 1000M of memory size; this sets an upper limit on the duration of data in a block that is cut by the ingester.
	MaxBlockDuration time.Duration `yaml:"max_block_duration,omitempty"`

	// TODO: docs
	RowGroupTargetSize uint64 `yaml:"row_group_target_size"`

	Parquet *ParquetConfig `yaml:"-"` // Those configs should not be exposed to the user, rather they should be determined by phlare itself. Currently, they are solely used for test cases.
}

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet)

type Head

type Head struct {
	// contains filtered or unexported fields
}

func NewHead

func NewHead(phlarectx context.Context, cfg Config, limiter TenantLimiter) (*Head, error)

func (*Head) Bounds added in v0.6.1

func (h *Head) Bounds() (mint, maxt model.Time)

func (*Head) Close

func (h *Head) Close() error

Close closes the head.

func (*Head) Flush

func (h *Head) Flush(ctx context.Context) error

Flush closes the head and writes data to disk. No ingestion requests should be made concurrently with the call, or after it returns. The call is thread-safe for reads.

func (*Head) Ingest

func (h *Head) Ingest(ctx context.Context, p *profilev1.Profile, id uuid.UUID, externalLabels ...*typesv1.LabelPair) error

func (*Head) LabelNames

func (h *Head) LabelNames(ctx context.Context, req *connect.Request[typesv1.LabelNamesRequest]) (*connect.Response[typesv1.LabelNamesResponse], error)

LabelNames returns the possible label names.

func (*Head) LabelValues

func (h *Head) LabelValues(ctx context.Context, req *connect.Request[typesv1.LabelValuesRequest]) (*connect.Response[typesv1.LabelValuesResponse], error)

LabelValues returns the possible label values for a given label name.

func (*Head) MemorySize added in v0.3.0

func (h *Head) MemorySize() uint64

func (*Head) Move added in v0.6.0

func (h *Head) Move() error

Move moves the head directory to local blocks. The call is not thread-safe: no concurrent reads and writes are allowed.

After the call, head in-memory representation is not valid and should not be accessed for querying.

func (*Head) ProfileTypes

func (h *Head) ProfileTypes(ctx context.Context, req *connect.Request[ingestv1.ProfileTypesRequest]) (*connect.Response[ingestv1.ProfileTypesResponse], error)

ProfileTypes returns the possible profile types.

func (*Head) Queriers added in v0.3.0

func (h *Head) Queriers() Queriers

Queriers returns the underlying queriers; the queriers should be roughly ordered by increasing timestamp.

func (*Head) Series

func (h *Head) Series(ctx context.Context, req *connect.Request[ingestv1.SeriesRequest]) (*connect.Response[ingestv1.SeriesResponse], error)

func (*Head) Size

func (h *Head) Size() uint64

func (*Head) Sort

func (h *Head) Sort(in []Profile) []Profile

type Helper

type Helper[M Models, K comparable] interface {
	// contains filtered or unexported methods
}

type IndexReader

type IndexReader interface {
	// Bounds returns the earliest and latest samples in the index
	Bounds() (int64, int64)

	Checksum() uint32

	// Symbols returns an iterator over sorted string symbols that may occur in
	// series' labels and indices. It is not safe to use the returned strings
	// beyond the lifetime of the index reader.
	Symbols() index.StringIter

	// SortedLabelValues returns sorted possible label values.
	SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error)

	// LabelValues returns possible label values which may not be sorted.
	LabelValues(name string, matchers ...*labels.Matcher) ([]string, error)

	// Postings returns the postings list iterator for the label pairs.
	// The Postings here contain the offsets to the series inside the index.
	// Found IDs are not strictly required to point to a valid Series, e.g.
	// during background garbage collections. Input values must be sorted.
	Postings(name string, shard *index.ShardAnnotation, values ...string) (index.Postings, error)

	// Series populates the given labels and chunk metas for the series identified
	// by the reference.
	// Returns storage.ErrNotFound if the ref does not resolve to a known series.
	Series(ref storage.SeriesRef, lset *phlaremodel.Labels, chks *[]index.ChunkMeta) (uint64, error)

	// LabelNames returns all the unique label names present in the index in sorted order.
	LabelNames(matchers ...*labels.Matcher) ([]string, error)

	// LabelValueFor returns label value for the given label name in the series referred to by ID.
	// If the series couldn't be found or the series doesn't have the requested label a
	// storage.ErrNotFound is returned as error.
	LabelValueFor(id storage.SeriesRef, label string) (string, error)

	// LabelNamesFor returns all the label names for the series referred to by IDs.
	// The names returned are sorted.
	LabelNamesFor(ids ...storage.SeriesRef) ([]string, error)

	// Close releases the underlying resources of the reader.
	Close() error
}

IndexReader provides reading access of serialized index data.

type ParquetConfig

type ParquetConfig struct {
	MaxBufferRowCount int
	MaxRowGroupBytes  uint64 // This is the maximum row group size in bytes that the raw data uses in memory.
	MaxBlockBytes     uint64 // This is the size of all parquet tables in memory after which a new block is cut
}

type PhlareDB

type PhlareDB struct {
	services.Service
	// contains filtered or unexported fields
}

func New

func New(phlarectx context.Context, cfg Config, limiter TenantLimiter) (*PhlareDB, error)

func (*PhlareDB) BlockMetas

func (f *PhlareDB) BlockMetas(ctx context.Context) ([]*block.Meta, error)

func (*PhlareDB) Close

func (f *PhlareDB) Close() error

func (*PhlareDB) Evict added in v0.6.0

func (f *PhlareDB) Evict(blockID ulid.ULID, fn func() error) (bool, error)

Evict removes the given local block from the PhlareDB. Note that the block files are not deleted from the disk. No evictions should be done during or after the Close call.

func (*PhlareDB) Flush

func (f *PhlareDB) Flush(ctx context.Context) (err error)

func (*PhlareDB) Head

func (f *PhlareDB) Head() *Head

func (*PhlareDB) Ingest added in v0.6.0

func (f *PhlareDB) Ingest(ctx context.Context, p *profilev1.Profile, id uuid.UUID, externalLabels ...*typesv1.LabelPair) error

func (*PhlareDB) LabelNames added in v0.6.0

func (f *PhlareDB) LabelNames(ctx context.Context, req *connect.Request[typesv1.LabelNamesRequest]) (*connect.Response[typesv1.LabelNamesResponse], error)

LabelNames returns the possible label names.

func (*PhlareDB) LabelValues added in v0.6.0

func (f *PhlareDB) LabelValues(ctx context.Context, req *connect.Request[typesv1.LabelValuesRequest]) (*connect.Response[typesv1.LabelValuesResponse], error)

LabelValues returns the possible label values for a given label name.

func (*PhlareDB) LocalDataPath

func (f *PhlareDB) LocalDataPath() string

func (*PhlareDB) MergeProfilesLabels

func (f *PhlareDB) MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesLabelsRequest, ingestv1.MergeProfilesLabelsResponse]) error

func (*PhlareDB) MergeProfilesPprof added in v0.1.2

func (f *PhlareDB) MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesPprofRequest, ingestv1.MergeProfilesPprofResponse]) error

func (*PhlareDB) MergeProfilesStacktraces

func (f *PhlareDB) MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesStacktracesRequest, ingestv1.MergeProfilesStacktracesResponse]) error

func (*PhlareDB) ProfileTypes added in v0.6.0

func (f *PhlareDB) ProfileTypes(ctx context.Context, req *connect.Request[ingestv1.ProfileTypesRequest]) (*connect.Response[ingestv1.ProfileTypesResponse], error)

ProfileTypes returns the possible profile types.

func (*PhlareDB) Series added in v0.6.0

func (f *PhlareDB) Series(ctx context.Context, req *connect.Request[ingestv1.SeriesRequest]) (*connect.Response[ingestv1.SeriesResponse], error)

Series returns labels series for the given set of matchers.

type Profile

type Profile interface {
	Timestamp() model.Time
	Fingerprint() model.Fingerprint
	Labels() phlaremodel.Labels
}

type ProfileSelectorIterator

type ProfileSelectorIterator struct {
	// contains filtered or unexported fields
}

func NewProfileSelectorIterator

func NewProfileSelectorIterator() *ProfileSelectorIterator

func (*ProfileSelectorIterator) At

func (it *ProfileSelectorIterator) At() Profile

func (*ProfileSelectorIterator) Close

func (it *ProfileSelectorIterator) Close() error

func (*ProfileSelectorIterator) Err

func (it *ProfileSelectorIterator) Err() error

func (*ProfileSelectorIterator) Next

func (it *ProfileSelectorIterator) Next() bool

func (*ProfileSelectorIterator) Push

func (it *ProfileSelectorIterator) Push(batch []Profile)

type ProfileWithIndex added in v0.6.1

type ProfileWithIndex struct {
	Profile
	Index int
}

type ProfileWithLabels

type ProfileWithLabels struct {
	*schemav1.Profile
	// contains filtered or unexported fields
}

func (ProfileWithLabels) Fingerprint

func (p ProfileWithLabels) Fingerprint() model.Fingerprint

func (ProfileWithLabels) Labels

func (p ProfileWithLabels) Labels() phlaremodel.Labels

func (ProfileWithLabels) Samples added in v0.3.0

func (p ProfileWithLabels) Samples() []*schemav1.Sample

func (ProfileWithLabels) Timestamp

func (p ProfileWithLabels) Timestamp() model.Time

type Querier

type Querier interface {
	Bounds() (model.Time, model.Time)
	SelectMatchingProfiles(ctx context.Context, params *ingestv1.SelectProfilesRequest) (iter.Iterator[Profile], error)
	MergeByStacktraces(ctx context.Context, rows iter.Iterator[Profile]) (*ingestv1.MergeProfilesStacktracesResult, error)
	MergeByLabels(ctx context.Context, rows iter.Iterator[Profile], by ...string) ([]*typesv1.Series, error)
	MergePprof(ctx context.Context, rows iter.Iterator[Profile]) (*profile.Profile, error)
	Open(ctx context.Context) error
	// Sorts profiles for retrieval.
	Sort([]Profile) []Profile
}

type Queriers

type Queriers []Querier

func (Queriers) ForTimeRange added in v0.3.0

func (queriers Queriers) ForTimeRange(_ context.Context, start, end model.Time) (Queriers, error)

func (Queriers) Open added in v0.6.1

func (queriers Queriers) Open(ctx context.Context) error

func (Queriers) SelectMatchingProfiles added in v0.3.0

func (queriers Queriers) SelectMatchingProfiles(ctx context.Context, params *ingestv1.SelectProfilesRequest) (iter.Iterator[Profile], error)

type ResultWithRowNum

type ResultWithRowNum[M any] struct {
	Result M
	RowNum int64
}

type SeriesIterator

type SeriesIterator struct {
	iter.Iterator[*schemav1.Profile]
	// contains filtered or unexported fields
}

func NewSeriesIterator

func NewSeriesIterator(labels phlaremodel.Labels, fingerprint model.Fingerprint, it iter.Iterator[*schemav1.Profile]) *SeriesIterator

func (*SeriesIterator) At

func (it *SeriesIterator) At() Profile

func (*SeriesIterator) Next

func (it *SeriesIterator) Next() bool

type Source added in v0.3.0

type Source interface {
	Schema() *parquet.Schema
	RowGroups() []parquet.RowGroup
}

type Table

type Table interface {
	Name() string
	Size() uint64       // Size estimates the uncompressed byte size of the table in memory and on disk.
	MemorySize() uint64 // MemorySize estimates the uncompressed byte size of the table in memory.
	Init(path string, cfg *ParquetConfig, metrics *headMetrics) error
	Flush(context.Context) (numRows uint64, numRowGroups uint64, err error)
	Close() error
}

type TableInfo

type TableInfo struct {
	Rows      uint64
	RowGroups uint64
	Bytes     uint64
}

type TenantLimiter added in v0.5.0

type TenantLimiter interface {
	AllowProfile(fp model.Fingerprint, lbs phlaremodel.Labels, tsNano int64) error
	Stop()
}

Directories

Path Synopsis
schemas
v1
originally from https://github.com/cortexproject/cortex/blob/868898a2921c662dcd4f90683e8b95c927a8edd8/pkg/ingester/index/index.go but modified to support sharding queries.
originally from https://github.com/cortexproject/cortex/blob/868898a2921c662dcd4f90683e8b95c927a8edd8/pkg/ingester/index/index.go but modified to support sharding queries.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL