compactor

package
v2.6.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 12, 2022 License: AGPL-3.0 Imports: 43 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BuildUserID

func BuildUserID(id int) string

func SetupTable

func SetupTable(t *testing.T, path string, commonDBsConfig IndexesConfig, perUserDBsConfig PerUserIndexesConfig)

Types

type CompactedIndex

// CompactedIndex represents an index produced by compaction. It exposes
// retention/delete processing (via the embedded IndexProcessor), conversion to
// an uploadable index file, and cleanup of compaction state.
type CompactedIndex interface {
	// IndexProcessor is used for applying custom retention and processing delete requests.
	retention.IndexProcessor
	// Cleanup should clean up all the state built during compaction.
	// It is typically called at the end or in case of an error.
	Cleanup()
	// ToIndexFile is used to convert the CompactedIndex to an IndexFile for uploading to the object store.
	// Once the IndexFile is uploaded using Index.Reader, the file is closed using Index.Close and removed from disk using Index.Path.
	ToIndexFile() (index.Index, error)
}

CompactedIndex is built by TableCompactor for an IndexSet after compaction. It is used for: (1) applying custom retention and processing delete requests using IndexProcessor; and (2) uploading the compacted index to storage by converting it to index.Index using ToIndexFile. After all the operations have completed successfully, or in case of failure, Cleanup is called to clean up the state.

type Compactor

// Compactor embeds services.Service and exposes the handler used for delete
// requests.
// NOTE(review): only the exported fields are visible in this listing; the
// remaining state is unexported and not shown here.
type Compactor struct {
	services.Service

	// DeleteRequestsHandler is the exported delete-request handler.
	DeleteRequestsHandler *deletion.DeleteRequestHandler
	// contains filtered or unexported fields
}

func NewCompactor

func NewCompactor(cfg Config, objectClient client.ObjectClient, schemaConfig config.SchemaConfig, limits *validation.Overrides, r prometheus.Registerer) (*Compactor, error)

func (*Compactor) CompactTable

func (c *Compactor) CompactTable(ctx context.Context, tableName string, applyRetention bool) error

func (*Compactor) OnRingInstanceHeartbeat

func (c *Compactor) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc)

func (*Compactor) OnRingInstanceRegister

func (c *Compactor) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens)

func (*Compactor) OnRingInstanceStopping

func (c *Compactor) OnRingInstanceStopping(_ *ring.BasicLifecycler)

func (*Compactor) OnRingInstanceTokens

func (c *Compactor) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens)

func (*Compactor) RegisterIndexCompactor

func (c *Compactor) RegisterIndexCompactor(indexType string, indexCompactor IndexCompactor)

func (*Compactor) RunCompaction

func (c *Compactor) RunCompaction(ctx context.Context, applyRetention bool) error

func (*Compactor) ServeHTTP

func (c *Compactor) ServeHTTP(w http.ResponseWriter, req *http.Request)

type Config

// Config holds the compactor's settings. Each field's yaml tag gives the key
// it is read from in the YAML configuration.
//
// Notes on individual fields:
//   - RetentionDeleteWorkCount is read from the key "retention_delete_worker_count"
//     (the Go field name and the YAML key differ slightly).
//   - CompactorRing is tagged omitempty.
//   - RunOnce is tagged yaml:"-", so it is not populated from YAML; presumably it
//     is set programmatically — confirm against callers.
type Config struct {
	WorkingDirectory          string          `yaml:"working_directory"`
	SharedStoreType           string          `yaml:"shared_store"`
	SharedStoreKeyPrefix      string          `yaml:"shared_store_key_prefix"`
	CompactionInterval        time.Duration   `yaml:"compaction_interval"`
	ApplyRetentionInterval    time.Duration   `yaml:"apply_retention_interval"`
	RetentionEnabled          bool            `yaml:"retention_enabled"`
	RetentionDeleteDelay      time.Duration   `yaml:"retention_delete_delay"`
	RetentionDeleteWorkCount  int             `yaml:"retention_delete_worker_count"`
	RetentionTableTimeout     time.Duration   `yaml:"retention_table_timeout"`
	DeleteBatchSize           int             `yaml:"delete_batch_size"`
	DeleteRequestCancelPeriod time.Duration   `yaml:"delete_request_cancel_period"`
	MaxCompactionParallelism  int             `yaml:"max_compaction_parallelism"`
	CompactorRing             util.RingConfig `yaml:"compactor_ring,omitempty"`
	RunOnce                   bool            `yaml:"-"`
}

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet)

RegisterFlags registers flags.

func (*Config) Validate

func (cfg *Config) Validate() error

Validate verifies that the config does not contain inappropriate values.

type IndexCompactor

// IndexCompactor is the per-index-type hook used by the compactor: it builds a
// TableCompactor for a table and opens an existing compacted index file as a
// CompactedIndex.
type IndexCompactor interface {
	// NewTableCompactor returns a new TableCompactor for compacting a table.
	// commonIndexSet refers to common index files or in other words multi-tenant index.
	// existingUserIndexSet refers to existing user specific index files in the storage.
	// makeEmptyUserIndexSetFunc can be used for creating an empty indexSet for a user
	// who does not have an index for it in existingUserIndexSet.
	// periodConfig holds the PeriodConfig for the table.
	NewTableCompactor(
		ctx context.Context,
		commonIndexSet IndexSet,
		existingUserIndexSet map[string]IndexSet,
		makeEmptyUserIndexSetFunc MakeEmptyUserIndexSetFunc,
		periodConfig config.PeriodConfig,
	) TableCompactor

	// OpenCompactedIndexFile opens a compressed index file at given path.
	// It returns the opened file as a CompactedIndex, or an error.
	OpenCompactedIndexFile(
		ctx context.Context,
		path,
		tableName,
		userID,
		workingDir string,
		periodConfig config.PeriodConfig,
		logger log.Logger,
	) (
		CompactedIndex,
		error,
	)
}

type IndexFileConfig

// IndexFileConfig carries per-file options for an index file.
type IndexFileConfig struct {
	// CompressFile: NOTE(review): presumably selects whether the file is
	// compressed — confirm against the code that consumes this config.
	CompressFile bool
}

type IndexRecords

// IndexRecords pairs a starting value with a record count.
// NOTE(review): presumably describes a contiguous run of records
// (first record number and how many follow) — confirm against callers.
type IndexRecords struct {
	// Start is the first field; positional literals rely on this order.
	Start int
	// NumRecords is the number of records.
	NumRecords int
}

type IndexSet

// IndexSet exposes a set of source index files for a table, along with the
// logger and working directory to use, and accepts the CompactedIndex produced
// from them.
type IndexSet interface {
	// GetTableName returns the name of the table.
	GetTableName() string
	// ListSourceFiles returns the source index files in this set.
	ListSourceFiles() []storage.IndexFile
	// GetSourceFile returns a string for the given source file, or an error.
	// NOTE(review): the string is presumably a local path to the file — confirm.
	GetSourceFile(indexFile storage.IndexFile) (string, error)
	// GetLogger returns the logger for this set.
	GetLogger() log.Logger
	// GetWorkingDir returns the working directory for this set.
	GetWorkingDir() string
	// SetCompactedIndex sets the CompactedIndex for upload/applying retention and making the compactor remove the source files.
	// CompactedIndex can be nil only in case of all the source files in common index set being compacted away to per tenant index.
	// It would return an error if the CompactedIndex is nil and removeSourceFiles is true in case of user index set since
	// compaction should either create new files or can be a noop if there is nothing to compact.
	// There is no need to call SetCompactedIndex if no changes were made to the index for this IndexSet.
	SetCompactedIndex(compactedIndex CompactedIndex, removeSourceFiles bool) error
}

type IndexesConfig

// IndexesConfig holds two file counts: uncompacted and compacted.
// NOTE(review): it is passed to SetupTable, so it presumably describes how many
// files of each kind a test table should contain — confirm.
type IndexesConfig struct {
	// NumUnCompactedFiles is the number of uncompacted files.
	NumUnCompactedFiles int
	// NumCompactedFiles is the number of compacted files.
	NumCompactedFiles int
}

func (IndexesConfig) String

func (c IndexesConfig) String() string

type MakeEmptyUserIndexSetFunc

// MakeEmptyUserIndexSetFunc creates an empty IndexSet for the given userID. It
// is meant for users who do not yet have an existing user index set.
type MakeEmptyUserIndexSetFunc func(userID string) (IndexSet, error)

type PerUserIndexesConfig

// PerUserIndexesConfig extends IndexesConfig with a user count.
// NOTE(review): presumably the embedded file counts apply to each of the
// NumUsers users — confirm against SetupTable.
type PerUserIndexesConfig struct {
	// IndexesConfig is embedded, so its fields are promoted onto this type.
	IndexesConfig
	// NumUsers is the number of users.
	NumUsers int
}

func (PerUserIndexesConfig) String

func (c PerUserIndexesConfig) String() string

type TableCompactor

// TableCompactor compacts a single table. Instances are obtained from
// IndexCompactor.NewTableCompactor.
type TableCompactor interface {
	// CompactTable compacts the table.
	// After compaction is done successfully, it should set the new/updated CompactedIndex for relevant IndexSets.
	// It returns a non-nil error if compaction fails.
	CompactTable() (err error)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL