ingester

package

README

ingester

The ingester is responsible for collecting data from the distributors and storing it on disk. This module has a few components:

  • WAL - This is the Write Ahead Log that tries to ensure zero data loss
  • tenantBlockManager - This is responsible for managing the data for each tenant
  • search - This is an optional route to search live snapshots (snapshots that have been ingested but not yet flushed to storage)

Overview

The ingester accepts data from the distributors via the 'IngesterService' service. This data is then passed to an instance of the 'tenantBlockManager', which stores the data in memory. Once in memory, it is periodically flushed to the WAL (and also on shutdown). These WAL blocks are then periodically flushed to a local 'backend block', which is a local version of how the data is stored in the backend (e.g. S3). These local blocks are then periodically flushed to the backend storage.

On start-up we replay any data that has been written to the local disk. This ensures that any data that was accepted, but not yet flushed to storage, is not lost during a crash.
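
A minimal sketch of that replay step, assuming a flat WAL directory of block folders; replayWAL and replayBlock are hypothetical names, not this package's API:

package sketch

import (
	"fmt"
	"os"
	"path/filepath"
)

// replayWAL scans the WAL directory on start-up and replays every
// block found there, recovering data accepted before a crash.
func replayWAL(walDir string) error {
	entries, err := os.ReadDir(walDir)
	if err != nil {
		return fmt.Errorf("read WAL dir: %w", err)
	}
	for _, e := range entries {
		// replayBlock stands in for whatever per-block recovery
		// the ingester actually performs.
		if err := replayBlock(filepath.Join(walDir, e.Name())); err != nil {
			return fmt.Errorf("replay %s: %w", e.Name(), err)
		}
	}
	return nil
}

func replayBlock(path string) error {
	fmt.Println("replaying", path)
	return nil
}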

IngesterService

This is the main entry point for data; it is defined in ingester.go. It simply finds or creates the tenantBlockManager for the tenant and passes the data to it for processing.
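
A minimal sketch of that find-or-create routing, using hypothetical types (the real tenantBlockManager is unexported and far more involved):

package sketch

import "sync"

// tenantManager is a hypothetical stand-in for tenantBlockManager.
type tenantManager struct {
	tenantID string
}

// router finds or creates the manager for a tenant, as ingester.go does.
type router struct {
	mu      sync.Mutex
	tenants map[string]*tenantManager
}

func newRouter() *router {
	return &router{tenants: make(map[string]*tenantManager)}
}

func (r *router) getOrCreate(tenantID string) *tenantManager {
	r.mu.Lock()
	defer r.mu.Unlock()
	m, ok := r.tenants[tenantID]
	if !ok {
		m = &tenantManager{tenantID: tenantID}
		r.tenants[tenantID] = m
	}
	return m
}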

tenantBlockManager

This deals with reading and writing data to the various forms of blocks that are used in the ingester:

  • liveSnapshot - An in memory 'block' that has not been written to disk yet
  • WAL Block - A collection of smaller blocks, each being the contents of liveSnapshots
  • local backend Block - This is the compacted WAL block that is created prior to flushing to storage
  • backend block - The block that is a copy of the local backend block but is sent to storage

Each stage is written to and flushed to the next using periodic checks.
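
The four forms can be read as a pipeline; a hypothetical enumeration of the stages, in the order data moves through them:

// blockStage enumerates the block forms above in pipeline order.
type blockStage int

const (
	stageLiveSnapshot blockStage = iota // in memory only, not yet on disk
	stageWALBlock                       // on local disk, safe across crashes
	stageLocalBackend                   // compacted into backend format, still local
	stageBackend                        // uploaded to backend storage (e.g. S3)
)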

ingest to liveSnapshots

This is the first step of ingestion: as soon as the data is accepted from the IngesterService, it is written via the tenantBlockManager into the liveSnapshots.

liveSnapshots to WAL Block

This is the main way to prevent data loss on crash or restart: periodically (using the FlushCheckPeriod config value, default 10 seconds) the data from the liveSnapshots is written to a new file in the current WAL block.
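
A minimal sketch of that periodic cut, assuming a cutToWAL callback that writes the current liveSnapshots to a new file in the WAL block:

package sketch

import "time"

// flushLoop cuts liveSnapshots to the WAL every period, and once more
// on shutdown, mirroring the behaviour described above.
func flushLoop(period time.Duration, stop <-chan struct{}, cutToWAL func()) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			cutToWAL()
		case <-stop:
			cutToWAL() // flush once more on shutdown
			return
		}
	}
}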

WAL Block to local backend Block

Every time the WAL block is updated, it is checked for readiness, determined by its age and size. Once it is ready, it is marked as such and a new WAL block is started for the next set of liveSnapshots. An 'opKindComplete' operation is then enqueued so the completed block can be handled.

The operations are continuously checked, and as soon as the processors have capacity they will process the completed block. This entails compacting the data, writing it into a backend block, and storing it locally, at which point it becomes a 'local backend block'. Once complete, another operation, 'opKindFlush', is enqueued.
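
A sketch of the two operations as a simple queue, assuming a buffered channel so the worker can re-enqueue the follow-up op; the real queue has capacity limits and retry handling this sketch omits:

type opKind int

const (
	opKindComplete opKind = iota // compact a full WAL block into a local backend block
	opKindFlush                  // upload a local backend block to storage
)

type op struct {
	kind    opKind
	blockID string
}

// worker drains the queue; completing a block enqueues its flush.
// ops must be buffered, or the re-enqueue below can block the worker.
func worker(ops chan op, complete, flush func(blockID string)) {
	for o := range ops {
		switch o.kind {
		case opKindComplete:
			complete(o.blockID) // compact + write the local backend block
			ops <- op{kind: opKindFlush, blockID: o.blockID}
		case opKindFlush:
			flush(o.blockID) // write to backend storage (e.g. S3)
		}
	}
}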

local backend Block to backend Block

When the 'opKindFlush' operation is processed, the tenantBlockManager writes the local block to the backend storage. The local block is then marked as flushed, and will be deleted once the 'CompleteBlockTimeout' has expired.
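
A sketch of that retention step; localBlock and its fields are hypothetical:

package sketch

import (
	"os"
	"time"
)

// localBlock records when a local backend block was flushed to storage.
type localBlock struct {
	path      string
	flushedAt time.Time
}

// deleteIfExpired removes a flushed block once CompleteBlockTimeout has
// passed; until then the block stays on local disk.
func deleteIfExpired(b *localBlock, completeBlockTimeout time.Duration) error {
	if time.Since(b.flushedAt) < completeBlockTimeout {
		return nil // still inside the retention window
	}
	return os.RemoveAll(b.path)
}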

Documentation

Index

Constants

This section is empty.

Variables

var ErrReadOnly = errors.New("Ingester is shutting down")

ErrReadOnly is returned when the ingester is shutting down and a push was attempted.
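
Callers can detect the sentinel with errors.Is; a sketch with hypothetical push and retry callbacks:

package sketch

import (
	"context"
	"errors"
)

// errReadOnly mirrors ingester.ErrReadOnly for this self-contained sketch.
var errReadOnly = errors.New("Ingester is shutting down")

// pushWithFallback retries against another instance when the target
// ingester reports it is shutting down.
func pushWithFallback(ctx context.Context, push, retryElsewhere func(context.Context) error) error {
	err := push(ctx)
	if errors.Is(err, errReadOnly) {
		return retryElsewhere(ctx)
	}
	return err
}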

Functions

This section is empty.

Types

type ComparableTagValue

type ComparableTagValue struct {
	Type  string
	Value string
}

type Config

type Config struct {
	LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`

	ConcurrentFlushes    int           `yaml:"concurrent_flushes"`
	FlushCheckPeriod     time.Duration `yaml:"flush_check_period"`
	FlushOpTimeout       time.Duration `yaml:"flush_op_timeout"`
	MaxSnapshotIdle      time.Duration `yaml:"snapshot_idle_period"`
	MaxBlockDuration     time.Duration `yaml:"max_block_duration"`
	MaxBlockBytes        uint64        `yaml:"max_block_bytes"`
	CompleteBlockTimeout time.Duration `yaml:"complete_block_timeout"`
	OverrideRingKey      string        `yaml:"override_ring_key"`
}

Config for an ingester.
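
A sketch of filling this struct directly, written as if inside the package; only the FlushCheckPeriod default (10 seconds, per the README above) is documented, every other value here is purely illustrative:

package ingester

import "time"

// exampleCfg sketches the shape of a Config.
var exampleCfg = Config{
	ConcurrentFlushes:    4,                 // illustrative
	FlushCheckPeriod:     10 * time.Second,  // documented default
	FlushOpTimeout:       5 * time.Minute,   // illustrative
	MaxSnapshotIdle:      30 * time.Second,  // illustrative
	MaxBlockDuration:     30 * time.Minute,  // illustrative
	MaxBlockBytes:        500 * 1024 * 1024, // illustrative: 500 MiB
	CompleteBlockTimeout: 15 * time.Minute,  // illustrative
	OverrideRingKey:      "ingester",        // illustrative
}

LifecyclerConfig is left at its zero value here; RegisterFlagsAndApplyDefaults below is the intended way to obtain real defaults.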

func (*Config) RegisterFlagsAndApplyDefaults

func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)

RegisterFlagsAndApplyDefaults registers the flags.

type Ingester

type Ingester struct {
	services.Service
	deeppb.UnimplementedIngesterServiceServer
	deeppb.UnimplementedQuerierServiceServer
	// contains filtered or unexported fields
}

Ingester builds blocks out of incoming snapshots

func New

func New(cfg Config, store storage.Store, limits *overrides.Overrides, reg prometheus.Registerer) (*Ingester, error)

New makes a new Ingester.

func (*Ingester) CheckReady

func (i *Ingester) CheckReady(ctx context.Context) error

func (*Ingester) FindSnapshotByID

func (*Ingester) Flush

func (i *Ingester) Flush()

Flush triggers a flush of all in memory snapshots to disk. This is called by the lifecycler on shutdown and will put our snapshots in the WAL to be replayed.

func (*Ingester) FlushHandler

func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request)

FlushHandler calls sweepAllInstances(true) which will force push all snapshots into the WAL and force mark all head blocks as ready to flush.

func (*Ingester) PushBytes

PushBytes accepts ingest requests containing snapshots as raw bytes

func (*Ingester) SearchBlock

SearchBlock only exists here to fulfill the protobuf interface. The ingester will never support backend search

func (*Ingester) SearchRecent

func (i *Ingester) SearchRecent(ctx context.Context, req *deeppb.SearchRequest) (*deeppb.SearchResponse, error)

func (*Ingester) SearchTagValues

func (*Ingester) SearchTagValuesV2

func (*Ingester) SearchTags

func (*Ingester) ShutdownHandler

func (i *Ingester) ShutdownHandler(w http.ResponseWriter, _ *http.Request)

ShutdownHandler handles a graceful shutdown for an ingester. It does the following things in order:

  • Stop incoming writes by exiting from the ring
  • Flush all blocks to backend

func (*Ingester) TransferOut

func (i *Ingester) TransferOut(_ context.Context) error

TransferOut implements ring.Lifecycler.

type Limiter

type Limiter struct {
	// contains filtered or unexported fields
}

Limiter implements primitives to get the maximum number of snapshots an ingester can handle for a specific tenant

func NewLimiter

func NewLimiter(limits *overrides.Overrides, ring RingCount, replicationFactor int) *Limiter

NewLimiter makes a new limiter

func (*Limiter) AssertMaxSnapshotsPerTenant

func (l *Limiter) AssertMaxSnapshotsPerTenant(tenantID string, snapshots int) error

AssertMaxSnapshotsPerTenant checks the given tenant's snapshot count against its limit and returns an error if the limit has been reached.

type RingCount

type RingCount interface {
	HealthyInstancesCount() int
}

RingCount is the interface exposed by a ring implementation which allows counting its members
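
Since the interface has a single method, a fixed-count stub is enough for tests; a sketch (limits is assumed to come from the overrides package):

// staticRing reports a fixed number of healthy ingesters.
type staticRing struct{ healthy int }

func (r staticRing) HealthyInstancesCount() int { return r.healthy }

// Usage sketch:
//   limiter := NewLimiter(limits, staticRing{healthy: 3}, 3)
//   if err := limiter.AssertMaxSnapshotsPerTenant("tenant-a", n); err != nil {
//       // reject the push: the tenant is over its snapshot limit
//   }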
