storegateway

package
v0.6.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 12, 2023 License: AGPL-3.0 Imports: 46 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetShuffleShardingSubring

func GetShuffleShardingSubring(ring *ring.Ring, userID string, limits storegateway.ShardingLimits) ring.ReadRing

GetShuffleShardingSubring returns the subring to be used for a given user. This function should be used both by store-gateway and querier in order to guarantee the same logic is used.

Types

type Block

type Block struct {
	BlockCloser
	// contains filtered or unexported fields
}

type BlockCloser

type BlockCloser interface {
	phlaredb.Querier
	Close() error
}

type BlockMetaFilter

type BlockMetaFilter interface {
	Filter(ctx context.Context, metas map[ulid.ULID]*block.Meta, synced tsdb_block.GaugeVec) error
}

func NewShardingMetadataFilterAdapter

func NewShardingMetadataFilterAdapter(userID string, strategy ShardingStrategy) BlockMetaFilter

type BucketStore

type BucketStore struct {
	// contains filtered or unexported fields
}

func NewBucketStore

func NewBucketStore(bucket phlareobj.Bucket, tenantID string, syncDir string, filters []BlockMetaFilter, logger log.Logger, Metrics *Metrics) (*BucketStore, error)

func (*BucketStore) InitialSync

func (b *BucketStore) InitialSync(ctx context.Context) error

func (*BucketStore) MergeProfilesLabels

func (store *BucketStore) MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesLabelsRequest, ingestv1.MergeProfilesLabelsResponse]) error

func (*BucketStore) MergeProfilesPprof

func (store *BucketStore) MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesPprofRequest, ingestv1.MergeProfilesPprofResponse]) error

func (*BucketStore) MergeProfilesStacktraces

func (store *BucketStore) MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesStacktracesRequest, ingestv1.MergeProfilesStacktracesResponse]) error

func (*BucketStore) RemoveBlocksAndClose

func (s *BucketStore) RemoveBlocksAndClose() error

RemoveBlocksAndClose removes all blocks from local disk and releases all resources associated with the BucketStore.

func (*BucketStore) Stats

func (*BucketStore) SyncBlocks

func (s *BucketStore) SyncBlocks(ctx context.Context) error

type BucketStoreConfig

type BucketStoreConfig struct {
	SyncDir               string        `yaml:"sync_dir"`
	SyncInterval          time.Duration `yaml:"sync_interval" category:"advanced"`
	TenantSyncConcurrency int           `yaml:"tenant_sync_concurrency" category:"advanced"`
	IgnoreBlocksWithin    time.Duration `yaml:"ignore_blocks_within" category:"advanced"`
}

func (*BucketStoreConfig) RegisterFlags

func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger)

RegisterFlags registers the BucketStore flags

func (*BucketStoreConfig) Validate

func (cfg *BucketStoreConfig) Validate(logger log.Logger) error

Validate validates the config.

type BucketStores

type BucketStores struct {
	// contains filtered or unexported fields
}

func NewBucketStores

func NewBucketStores(cfg BucketStoreConfig, shardingStrategy ShardingStrategy, storageBucket phlareobj.Bucket, limits Limits, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error)

func (*BucketStores) InitialSync

func (bs *BucketStores) InitialSync(ctx context.Context) error

func (*BucketStores) SyncBlocks

func (bs *BucketStores) SyncBlocks(ctx context.Context) error

SyncBlocks synchronizes the stores' state with the bucket store for every user.

type Config

type Config struct {
	storegateway.Config `yaml:",inline"`
	BucketStoreConfig   BucketStoreConfig `yaml:"bucket_store,omitempty"`
}

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger)

RegisterFlags registers the Config flags.

func (*Config) Validate

func (c *Config) Validate(limits validation.Limits) error

type Limits

type Limits interface {
	storegateway.ShardingLimits
}

type Metrics

type Metrics struct {
	Synced *prometheus.GaugeVec
	// contains filtered or unexported fields
}

func NewMetrics

func NewMetrics(reg prometheus.Registerer) *Metrics

type ShardingStrategy

type ShardingStrategy interface {
	// FilterUsers filters the users whose blocks should be loaded by the store-gateway. Returns the list of user IDs
	// that should be synced by the store-gateway.
	FilterUsers(ctx context.Context, userIDs []string) ([]string, error)

	// FilterBlocks filters metas in-place keeping only blocks that should be loaded by the store-gateway.
	// The provided loaded map contains blocks which have been previously returned by this function and
	// are now loaded or loading in the store-gateway.
	FilterBlocks(ctx context.Context, userID string, metas map[ulid.ULID]*block.Meta, loaded map[ulid.ULID]struct{}, synced tsdb_block.GaugeVec) error
}

type ShuffleShardingStrategy

type ShuffleShardingStrategy struct {
	// contains filtered or unexported fields
}

ShuffleShardingStrategy is a shuffle sharding strategy, based on the hash ring formed by store-gateways, where each tenant's blocks are sharded across a subset of store-gateway instances.

func NewShuffleShardingStrategy

func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, limits storegateway.ShardingLimits, logger log.Logger) *ShuffleShardingStrategy

NewShuffleShardingStrategy makes a new ShuffleShardingStrategy.

func (*ShuffleShardingStrategy) FilterBlocks

func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, userID string, metas map[ulid.ULID]*block.Meta, loaded map[ulid.ULID]struct{}, synced tsdb_block.GaugeVec) error

FilterBlocks implements ShardingStrategy.

func (*ShuffleShardingStrategy) FilterUsers

func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []string) ([]string, error)

FilterUsers implements ShardingStrategy.

type StoreGateway

type StoreGateway struct {
	services.Service
	// contains filtered or unexported fields
}

func NewStoreGateway

func NewStoreGateway(gatewayCfg Config, storageBucket phlareobj.Bucket, limits Limits, logger log.Logger, reg prometheus.Registerer) (*StoreGateway, error)

func (*StoreGateway) BlocksHandler

func (s *StoreGateway) BlocksHandler(w http.ResponseWriter, req *http.Request)

func (*StoreGateway) MergeProfilesLabels

func (s *StoreGateway) MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesLabelsRequest, ingestv1.MergeProfilesLabelsResponse]) error

func (*StoreGateway) MergeProfilesPprof

func (s *StoreGateway) MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesPprofRequest, ingestv1.MergeProfilesPprofResponse]) error

func (*StoreGateway) MergeProfilesStacktraces

func (s *StoreGateway) MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesStacktracesRequest, ingestv1.MergeProfilesStacktracesResponse]) error

func (*StoreGateway) RingHandler

func (c *StoreGateway) RingHandler(w http.ResponseWriter, req *http.Request)

func (*StoreGateway) TenantsHandler

func (s *StoreGateway) TenantsHandler(w http.ResponseWriter, req *http.Request)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL