storegateway

package
v1.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 6, 2024 License: AGPL-3.0 Imports: 42 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// RingKey is the key under which we store the store gateways ring in the KVStore.
	RingKey = "store-gateway"

	// RingNameForServer is the name of the ring used by the store gateway server.
	RingNameForServer = "store-gateway"

	// RingNameForClient is the name of the ring used by the store gateway client (we need
	// a different name to avoid clashing Prometheus metrics when running in single-binary).
	RingNameForClient = "store-gateway-client"

	// RingNumTokens is the fixed number of ring tokens. We use a safe default instead of
	// exposing a config option to the user, in order to simplify the config.
	// NOTE(review): presumably this is the token count per store-gateway instance — confirm
	// against the lifecycler configuration.
	RingNumTokens = 512
)

Variables

View Source
var (
	// BlocksOwnerSync is the operation used to check the authoritative owners of a block
	// (replicas included).
	BlocksOwnerSync = ring.NewOp([]ring.InstanceState{ring.JOINING, ring.ACTIVE, ring.LEAVING}, nil)

	// BlocksOwnerRead is the operation used to check the authoritative owners of a block
	// (replicas included) that are available for queries (a store-gateway is available for
	// queries only when ACTIVE).
	BlocksOwnerRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)

	// BlocksRead is the operation run by the querier to query blocks via the store-gateway.
	BlocksRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, func(s ring.InstanceState) bool {
		// The callback returns true for every state other than ACTIVE.
		// NOTE(review): in ring.NewOp the second argument is understood to decide whether the
		// replication set is extended for an instance in state s, so a non-ACTIVE owner would
		// make the ring look for an additional instance — confirm against the ring package docs.
		return s != ring.ACTIVE
	})
)

Functions

func GetShuffleShardingSubring

func GetShuffleShardingSubring(ring *ring.Ring, userID string, limits ShardingLimits) ring.ReadRing

GetShuffleShardingSubring returns the subring to be used for a given user. This function should be used both by store-gateway and querier in order to guarantee the same logic is used.

func NewShardingMetadataFilterAdapter

func NewShardingMetadataFilterAdapter(userID string, strategy ShardingStrategy) block.MetadataFilter

Types

type Block

// Block embeds a BlockCloser, so the methods of that interface (the
// phlaredb.Querier set plus Close) are promoted onto Block.
type Block struct {
	BlockCloser
	// contains filtered or unexported fields
}

type BlockCloser

// BlockCloser is a phlaredb.Querier that can additionally be closed.
type BlockCloser interface {
	phlaredb.Querier
	// Close reports any failure through the returned error.
	// NOTE(review): presumably releases the resources held by the block — confirm
	// against the implementations.
	Close() error
}

type BucketIndexConfig added in v1.2.0

// BucketIndexConfig groups the bucket index settings. Every field is a duration
// exposed through YAML and tagged as an advanced option.
//
// NOTE(review): the meanings below are inferred from the field names only —
// confirm against the flag help text in RegisterFlagsWithPrefix:
//   - UpdateOnErrorInterval: presumably the retry interval after a failed index load.
//   - IdleTimeout: presumably how long an unused loaded index is kept.
//   - MaxStalePeriod: presumably the maximum accepted age of the index.
type BucketIndexConfig struct {
	UpdateOnErrorInterval time.Duration `yaml:"update_on_error_interval" category:"advanced"`
	IdleTimeout           time.Duration `yaml:"idle_timeout" category:"advanced"`
	MaxStalePeriod        time.Duration `yaml:"max_stale_period" category:"advanced"`
}

func (*BucketIndexConfig) RegisterFlagsWithPrefix added in v1.2.0

func (cfg *BucketIndexConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string)

type BucketIndexMetadataFetcher added in v1.2.0

type BucketIndexMetadataFetcher struct {
	// contains filtered or unexported fields
}

BucketIndexMetadataFetcher is a Thanos MetadataFetcher implementation that leverages the Mimir bucket index.

func NewBucketIndexMetadataFetcher added in v1.2.0

func NewBucketIndexMetadataFetcher(
	userID string,
	bkt objstore.Bucket,
	cfgProvider objstore.TenantConfigProvider,
	logger log.Logger,
	reg prometheus.Registerer,
	filters []block.MetadataFilter,
) *BucketIndexMetadataFetcher

func (*BucketIndexMetadataFetcher) Fetch added in v1.2.0

func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*block.Meta, partial map[ulid.ULID]error, err error)

Fetch implements block.MetadataFetcher. Not goroutine-safe.

type BucketStore

type BucketStore struct {
	// contains filtered or unexported fields
}

func NewBucketStore

func NewBucketStore(bucket phlareobj.Bucket, fetcher block.MetadataFetcher, tenantID string, syncDir string, logger log.Logger, reg prometheus.Registerer) (*BucketStore, error)

func (*BucketStore) BlockMetadata added in v1.2.0

func (*BucketStore) InitialSync

func (b *BucketStore) InitialSync(ctx context.Context) error

func (*BucketStore) MergeSpanProfile added in v1.2.0

func (*BucketStore) RemoveBlocksAndClose

func (s *BucketStore) RemoveBlocksAndClose() error

RemoveBlocksAndClose removes all blocks from local disk and releases all resources associated with the BucketStore.

func (*BucketStore) Stats

func (b *BucketStore) Stats() BucketStoreStats

func (*BucketStore) SyncBlocks

func (s *BucketStore) SyncBlocks(ctx context.Context) error

type BucketStoreConfig

// BucketStoreConfig groups the bucket store settings. All fields except SyncDir
// are tagged as advanced options.
//
// Note that the YAML key for IgnoreDeletionMarksDelay is the singular
// "ignore_deletion_mark_delay"; the key is part of the user-facing config, so
// it must not be "corrected" to match the Go field name.
//
// NOTE(review): per-field semantics (sync directory, sync period, concurrency
// limits, ignore windows) are inferred from the names — confirm against the
// flag help text in RegisterFlags.
type BucketStoreConfig struct {
	SyncDir                  string        `yaml:"sync_dir"`
	SyncInterval             time.Duration `yaml:"sync_interval" category:"advanced"`
	TenantSyncConcurrency    int           `yaml:"tenant_sync_concurrency" category:"advanced"`
	IgnoreBlocksWithin       time.Duration `yaml:"ignore_blocks_within" category:"advanced"`
	MetaSyncConcurrency      int           `yaml:"meta_sync_concurrency" category:"advanced"`
	IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay" category:"advanced"`
}

func (*BucketStoreConfig) RegisterFlags

func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger)

RegisterFlags registers the BucketStore flags.

func (*BucketStoreConfig) Validate

func (cfg *BucketStoreConfig) Validate(logger log.Logger) error

Validate validates the config.

type BucketStoreStats

// BucketStoreStats carries statistics about a bucket store; it is the value
// returned by BucketStore.Stats.
type BucketStoreStats struct {
	// BlocksLoaded is the number of blocks currently loaded in the bucket store.
	BlocksLoaded int
}

type BucketStores

type BucketStores struct {
	// contains filtered or unexported fields
}

func NewBucketStores

func NewBucketStores(cfg BucketStoreConfig, shardingStrategy ShardingStrategy, storageBucket phlareobj.Bucket, limits Limits, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error)

func (*BucketStores) InitialSync

func (bs *BucketStores) InitialSync(ctx context.Context) error

func (*BucketStores) SyncBlocks

func (bs *BucketStores) SyncBlocks(ctx context.Context) error

SyncBlocks synchronizes the stores state with the Bucket store for every user.

type Config

// Config is the store-gateway configuration passed to NewStoreGateway. It
// combines the hash ring settings with the bucket store settings.
type Config struct {
	// ShardingRing is the hash ring configuration (YAML key "sharding_ring").
	ShardingRing      RingConfig        `yaml:"sharding_ring" doc:"description=The hash ring configuration."`
	// BucketStoreConfig is serialized under "bucket_store" and omitted when empty.
	BucketStoreConfig BucketStoreConfig `yaml:"bucket_store,omitempty"`
}

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger)

RegisterFlags registers the Config flags.

func (*Config) Validate

func (c *Config) Validate(limits validation.Limits) error

type IgnoreDeletionMarkFilter added in v1.2.0

type IgnoreDeletionMarkFilter struct {
	// contains filtered or unexported fields
}

IgnoreDeletionMarkFilter is like the Thanos IgnoreDeletionMarkFilter, but it also implements the MetadataFilterWithBucketIndex interface.

func NewIgnoreDeletionMarkFilter added in v1.2.0

func NewIgnoreDeletionMarkFilter(logger log.Logger, bkt objstore.BucketReader, delay time.Duration, concurrency int) *IgnoreDeletionMarkFilter

NewIgnoreDeletionMarkFilter creates IgnoreDeletionMarkFilter.

func (*IgnoreDeletionMarkFilter) DeletionMarkBlocks added in v1.2.0

func (f *IgnoreDeletionMarkFilter) DeletionMarkBlocks() map[ulid.ULID]*block.DeletionMark

DeletionMarkBlocks returns blocks that were marked for deletion.

func (*IgnoreDeletionMarkFilter) Filter added in v1.2.0

func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*block.Meta, synced block.GaugeVec) error

Filter implements block.MetadataFilter.

func (*IgnoreDeletionMarkFilter) FilterWithBucketIndex added in v1.2.0

func (f *IgnoreDeletionMarkFilter) FilterWithBucketIndex(_ context.Context, metas map[ulid.ULID]*block.Meta, idx *bucketindex.Index, synced block.GaugeVec) error

FilterWithBucketIndex implements MetadataFilterWithBucketIndex.

type Limits

// Limits is the union of ShardingLimits and phlareobj.TenantConfigProvider; it
// is the limits type accepted by NewBucketStores and NewStoreGateway.
type Limits interface {
	ShardingLimits
	phlareobj.TenantConfigProvider
}

type MetadataFilterWithBucketIndex added in v1.2.0

// MetadataFilterWithBucketIndex is a metadata filter that also receives the
// bucket index when filtering.
type MetadataFilterWithBucketIndex interface {
	// FilterWithBucketIndex is like Thanos MetadataFilter.Filter(), but it also receives the bucket index as input.
	FilterWithBucketIndex(ctx context.Context, metas map[ulid.ULID]*block.Meta, idx *bucketindex.Index, synced block.GaugeVec) error
}

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

func NewBucketStoreMetrics added in v1.3.0

func NewBucketStoreMetrics(reg prometheus.Registerer) *Metrics

func (*Metrics) Unregister added in v1.3.0

func (m *Metrics) Unregister()

type RingConfig

// RingConfig holds the store-gateway hash ring settings. It embeds the common
// ring options inline in YAML and adds the store-gateway specific ones.
type RingConfig struct {
	Ring                 util.CommonRingConfig `yaml:",inline"`
	ReplicationFactor    int                   `yaml:"replication_factor" category:"advanced"`
	TokensFilePath       string                `yaml:"tokens_file_path"`
	ZoneAwarenessEnabled bool                  `yaml:"zone_awareness_enabled"`

	// Minimum and maximum durations to wait for ring stability.
	// NOTE(review): presumably applied at startup — confirm against the caller.
	WaitStabilityMinDuration time.Duration `yaml:"wait_stability_min_duration" category:"advanced"`
	WaitStabilityMaxDuration time.Duration `yaml:"wait_stability_max_duration" category:"advanced"`

	// Instance details. The YAML key is "instance_availability_zone".
	InstanceZone string `yaml:"instance_availability_zone"`

	UnregisterOnShutdown bool `yaml:"unregister_on_shutdown"`

	// Injected internally: excluded from YAML (tag "-"), so it cannot be set from the config file.
	RingCheckPeriod time.Duration `yaml:"-"`
}

func (*RingConfig) RegisterFlags

func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger)

RegisterFlags adds the flags required to configure this to the given FlagSet.

func (*RingConfig) ToLifecyclerConfig

func (cfg *RingConfig) ToLifecyclerConfig(logger log.Logger) (ring.BasicLifecyclerConfig, error)

func (*RingConfig) ToRingConfig

func (cfg *RingConfig) ToRingConfig() ring.Config

type ShardingLimits

// ShardingLimits exposes the per-tenant limits needed by the sharding strategies.
type ShardingLimits interface {
	// StoreGatewayTenantShardSize returns an int for the given tenant.
	// NOTE(review): presumably the shuffle-sharding shard size for that tenant — confirm
	// against the limits implementation.
	StoreGatewayTenantShardSize(tenantID string) int
}

ShardingLimits is the interface that should be implemented by the limits provider, limiting the scope of the limits to the ones required by sharding strategies.

type ShardingStrategy

// ShardingStrategy decides which users and which blocks should be loaded by a
// store-gateway.
type ShardingStrategy interface {
	// FilterUsers whose blocks should be loaded by the store-gateway. Returns the list of user IDs
	// that should be synced by the store-gateway.
	FilterUsers(ctx context.Context, userIDs []string) ([]string, error)

	// FilterBlocks filters metas in-place keeping only blocks that should be loaded by the store-gateway.
	// The provided loaded map contains blocks which have been previously returned by this function and
	// are now loaded or loading in the store-gateway.
	FilterBlocks(ctx context.Context, userID string, metas map[ulid.ULID]*block.Meta, loaded map[ulid.ULID]struct{}, synced block.GaugeVec) error
}

type ShuffleShardingStrategy

type ShuffleShardingStrategy struct {
	// contains filtered or unexported fields
}

ShuffleShardingStrategy is a shuffle sharding strategy, based on the hash ring formed by store-gateways, where each tenant's blocks are sharded across a subset of store-gateway instances.

func NewShuffleShardingStrategy

func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, limits ShardingLimits, logger log.Logger) *ShuffleShardingStrategy

NewShuffleShardingStrategy makes a new ShuffleShardingStrategy.

func (*ShuffleShardingStrategy) FilterBlocks

func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, userID string, metas map[ulid.ULID]*block.Meta, loaded map[ulid.ULID]struct{}, synced block.GaugeVec) error

FilterBlocks implements ShardingStrategy.

func (*ShuffleShardingStrategy) FilterUsers

func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []string) ([]string, error)

FilterUsers implements ShardingStrategy.

type StoreGateway

// StoreGateway embeds services.Service, so the methods of that interface are
// promoted onto StoreGateway. Instances are created with NewStoreGateway.
type StoreGateway struct {
	services.Service
	// contains filtered or unexported fields
}

func NewStoreGateway

func NewStoreGateway(gatewayCfg Config, storageBucket phlareobj.Bucket, limits Limits, logger log.Logger, reg prometheus.Registerer) (*StoreGateway, error)

func (*StoreGateway) BlockMetadata added in v1.2.0

func (*StoreGateway) BlocksHandler

func (s *StoreGateway) BlocksHandler(w http.ResponseWriter, req *http.Request)

func (*StoreGateway) LabelNames added in v1.2.0

func (*StoreGateway) LabelValues added in v1.2.0

func (*StoreGateway) MergeSpanProfile added in v1.2.0

func (*StoreGateway) ProfileTypes added in v1.2.0

func (*StoreGateway) RingHandler

func (c *StoreGateway) RingHandler(w http.ResponseWriter, req *http.Request)

func (*StoreGateway) Series added in v1.1.0

func (*StoreGateway) TenantsHandler

func (s *StoreGateway) TenantsHandler(w http.ResponseWriter, req *http.Request)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL