distributor

package
v0.0.0-...-748a726 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 26, 2024 License: AGPL-3.0 Imports: 81 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// ExemplarMaxLabelSetLength is the maximum combined length of the label names and
	// values of an exemplar's LabelSet. Per the OpenMetrics specification this combined
	// length MUST NOT exceed 128 UTF-8 characters.
	// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars
	ExemplarMaxLabelSetLength = 128
)
View Source
const (
	// SkipLabelNameValidationHeader is the header name "X-Mimir-SkipLabelNameValidation".
	// NOTE(review): presumably an HTTP request header asking the distributor to skip label
	// name validation when that is allowed (see the allowSkipLabelNameValidation parameter
	// of Handler and OTLPHandler) — confirm against the handler implementation.
	SkipLabelNameValidationHeader = "X-Mimir-SkipLabelNameValidation"
)
View Source
const (
	// StatusServiceOverloaded is 529, a non-standard status code used by some services
	// to signal that "The service is overloaded".
	StatusServiceOverloaded = 529
)

Variables

View Source
var (
	// ErrInvalidLengthHaTracker reports that a negative length was found while
	// unmarshaling a protobuf message.
	ErrInvalidLengthHaTracker = fmt.Errorf("proto: negative length found during unmarshaling")
	// ErrIntOverflowHaTracker reports an integer overflow in protobuf handling.
	// NOTE(review): both errors look like generated protobuf code for the HA tracker
	// messages (see ReplicaDesc) — confirm before editing by hand.
	ErrIntOverflowHaTracker   = fmt.Errorf("proto: integer overflow")
)
View Source
// ErrResponseTooLarge is a sentinel error with the message "response too large".
// NOTE(review): the condition under which it is returned is not visible here —
// confirm at the call sites. Compare against it with errors.Is.
var ErrResponseTooLarge = errors.New("response too large")

Functions

func GetReplicaDescCodec

func GetReplicaDescCodec() codec.Proto

func Handler

func Handler(
	maxRecvMsgSize int,
	sourceIPs *middleware.SourceIPExtractor,
	allowSkipLabelNameValidation bool,
	limits *validation.Overrides,
	retryCfg RetryConfig,
	push PushFunc,
	logger log.Logger,
) http.Handler

Handler is an http.Handler which accepts WriteRequests.

func NewPool

func NewPool(cfg PoolConfig, ring ring.ReadRing, factory ring_client.PoolFactory, logger log.Logger) *ring_client.Pool

func OTLPHandler

func OTLPHandler(
	maxRecvMsgSize int,
	sourceIPs *middleware.SourceIPExtractor,
	allowSkipLabelNameValidation bool,
	enableOtelMetadataStorage bool,
	limits *validation.Overrides,
	retryCfg RetryConfig,
	reg prometheus.Registerer,
	push PushFunc,
	logger log.Logger,
) http.Handler

OTLPHandler is an http.Handler accepting OTLP write requests.

func ProtoReplicaDescFactory

func ProtoReplicaDescFactory() proto.Message

ProtoReplicaDescFactory makes new ReplicaDescs.

func SetDefaultInstanceLimitsForYAMLUnmarshalling

func SetDefaultInstanceLimitsForYAMLUnmarshalling(l InstanceLimits)

func TimeseriesToOTLPRequest

func TimeseriesToOTLPRequest(timeseries []prompb.TimeSeries, metadata []mimirpb.MetricMetadata) pmetricotlp.ExportRequest

TimeseriesToOTLPRequest is used in tests.

Types

type Config

// Config contains the configuration required to create a Distributor.
// Fields tagged `yaml:"-"` are not read from the YAML configuration.
type Config struct {
	// Settings for the client pool (see PoolConfig and NewPool), under the "pool" YAML key.
	PoolConfig PoolConfig `yaml:"pool"`

	// Retry and HA tracker settings, under the "retry_after_header" and "ha_tracker" YAML keys.
	RetryConfig     RetryConfig     `yaml:"retry_after_header"`
	HATrackerConfig HATrackerConfig `yaml:"ha_tracker"`

	// NOTE(review): MaxRecvMsgSize presumably bounds the size of a received message (it
	// mirrors the maxRecvMsgSize parameter of Handler and OTLPHandler) and RemoteTimeout
	// presumably bounds remote calls; units and defaults are not visible here — confirm
	// in RegisterFlags.
	MaxRecvMsgSize int           `yaml:"max_recv_msg_size" category:"advanced"`
	RemoteTimeout  time.Duration `yaml:"remote_timeout" category:"advanced"`

	// Distributors ring
	DistributorRing RingConfig `yaml:"ring"`

	// for testing and for extending the ingester by adding calls to the client
	IngesterClientFactory ring_client.PoolFactory `yaml:"-"`

	// when true the distributor does not validate the label name, Mimir doesn't directly use
	// this (and should never use it) but this feature is used by other projects built on top of it
	SkipLabelNameValidation bool `yaml:"-"`

	// This config is dynamically injected because it is defined in the querier config.
	// NOTE(review): "Minimize" and "Minimise" are spelled differently in the two field
	// names below; both are exported, so renaming either would break callers.
	ShuffleShardingLookbackPeriod              time.Duration `yaml:"-"`
	StreamingChunksPerIngesterSeriesBufferSize uint64        `yaml:"-"`
	MinimizeIngesterRequests                   bool          `yaml:"-"`
	MinimiseIngesterRequestsHedgingDelay       time.Duration `yaml:"-"`
	PreferAvailabilityZone                     string        `yaml:"-"`

	// IngestStorageConfig is dynamically injected because defined outside of distributor config.
	IngestStorageConfig ingest.Config `yaml:"-"`

	// Limits for distributor
	// DefaultLimits is read from the "instance_limits" YAML key; InstanceLimitsFn is not
	// read from YAML. NOTE(review): presumably InstanceLimitsFn, when set, supplies
	// limits at runtime that take precedence over DefaultLimits — confirm.
	DefaultLimits    InstanceLimits         `yaml:"instance_limits"`
	InstanceLimitsFn func() *InstanceLimits `yaml:"-"`

	// This allows downstream projects to wrap the distributor push function
	// and access the deserialized write requests before/after they are pushed.
	// These functions will only receive samples that don't get dropped by HA deduplication.
	PushWrappers []PushWrapper `yaml:"-"`

	// The category tag on each option below marks it as experimental, deprecated or advanced.
	WriteRequestsBufferPoolingEnabled           bool `yaml:"write_requests_buffer_pooling_enabled" category:"experimental"`
	LimitInflightRequestsUsingGrpcMethodLimiter bool `yaml:"limit_inflight_requests_using_grpc_method_limiter" category:"deprecated"` // TODO Remove the configuration option in Mimir 2.14, keeping the same behavior as if it's enabled
	ReusableIngesterPushWorkers                 int  `yaml:"reusable_ingester_push_workers" category:"advanced"`
}

Config contains the configuration required to create a Distributor

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger)

RegisterFlags adds the flags required to config this to the given FlagSet

func (*Config) Validate

func (cfg *Config) Validate(limits validation.Limits) error

Validate validates the config and returns an error on failure.

type Distributor

// Distributor forwards appends and queries to individual ingesters.
type Distributor struct {
	// Embedded service interface; its methods are promoted onto Distributor.
	services.Service

	// For handling HA replicas.
	HATracker *haTracker

	// PushWithMiddlewares is a PushFunc.
	// NOTE(review): presumably the push function wrapped with the distributor's
	// middlewares and the configured Config.PushWrappers — confirm in New.
	PushWithMiddlewares PushFunc
	// contains filtered or unexported fields
}

Distributor forwards appends and queries to individual ingesters.

func New

func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, activeGroupsCleanupService *util.ActiveGroupsCleanupService, ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error)

New constructs a new Distributor

func (*Distributor) ActiveSeries

func (d *Distributor) ActiveSeries(ctx context.Context, matchers []*labels.Matcher) ([]labels.Labels, error)

ActiveSeries queries the ingester replication set for active series matching the given selector. It combines and deduplicates the results.

func (*Distributor) AllUserStats

func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error)

AllUserStats returns statistics about all users. Note that, unlike UserStats(), it does not divide by the ReplicationFactor.

func (*Distributor) AllUserStatsHandler

func (d *Distributor) AllUserStatsHandler(w http.ResponseWriter, r *http.Request)

AllUserStatsHandler shows stats for all users.

func (*Distributor) FinishPushRequest

func (d *Distributor) FinishPushRequest(ctx context.Context)

FinishPushRequest is the counterpart to StartPushRequest, and must be called exactly once while handling the push request, on the same goroutine as the push method itself.

func (*Distributor) HealthyInstancesCount

func (d *Distributor) HealthyInstancesCount() int

HealthyInstancesCount implements the ReadLifecycler interface

We use a ring lifecycler delegate to count the number of members of the ring. The count is then used to enforce rate limiting correctly for each distributor. $EFFECTIVE_RATE_LIMIT = $GLOBAL_RATE_LIMIT / $NUM_INSTANCES

func (*Distributor) LabelNames

func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) ([]string, error)

LabelNames returns the names of all labels from series with sample timestamps between from and to, and matching the optional input series label matchers. The returned label names are sorted.

func (*Distributor) LabelNamesAndValues

func (d *Distributor) LabelNamesAndValues(ctx context.Context, matchers []*labels.Matcher, countMethod cardinality.CountMethod) (*ingester_client.LabelNamesAndValuesResponse, error)

LabelNamesAndValues returns the label name and value pairs for series matching the input label matchers.

The actual series considered eligible depend on countMethod:

  • inmemory: in-memory series in ingesters.
  • active: in-memory series in ingesters which are also tracked as active ones.

func (*Distributor) LabelValuesCardinality

func (d *Distributor) LabelValuesCardinality(ctx context.Context, labelNames []model.LabelName, matchers []*labels.Matcher, countMethod cardinality.CountMethod) (uint64, *ingester_client.LabelValuesCardinalityResponse, error)

LabelValuesCardinality performs the following two operations in parallel:

  • queries ingesters for label values cardinality of a set of labelNames
  • queries ingesters for user stats to get the ingester's series head count

func (*Distributor) LabelValuesForLabelName

func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error)

LabelValuesForLabelName returns the label values associated with the given labelName, among all series with sample timestamps between from and to, and series labels matching the optional matchers.

func (*Distributor) MetricsForLabelMatchers

func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error)

MetricsForLabelMatchers returns a list of series with sample timestamps between from and through, and series labels matching the optional label matchers. The returned series are not sorted.

func (*Distributor) MetricsMetadata

MetricsMetadata returns the metrics metadata based on the provided req.

func (*Distributor) Push

Push is the gRPC method registered as client.IngesterServer and distributor.DistributorServer.

func (*Distributor) QueryExemplars

func (d *Distributor) QueryExemplars(ctx context.Context, from, to model.Time, matchers ...[]*labels.Matcher) (*ingester_client.ExemplarQueryResponse, error)

QueryExemplars returns exemplars with timestamp between from and to, for the series matching the input series label matchers. The exemplars in the response are sorted by series labels.

func (*Distributor) QueryStream

func (d *Distributor) QueryStream(ctx context.Context, queryMetrics *stats.QueryMetrics, from, to model.Time, matchers ...*labels.Matcher) (ingester_client.CombinedQueryStreamResponse, error)

QueryStream queries multiple ingesters via the streaming interface and returns a big ol' set of chunks.

func (*Distributor) RemoveGroupMetricsForUser

func (d *Distributor) RemoveGroupMetricsForUser(userID, group string)

func (*Distributor) ServeHTTP

func (d *Distributor) ServeHTTP(w http.ResponseWriter, req *http.Request)

func (*Distributor) StartPushRequest

func (d *Distributor) StartPushRequest(ctx context.Context, httpgrpcRequestSize int64) (context.Context, error)

func (*Distributor) UserStats

func (d *Distributor) UserStats(ctx context.Context, countMethod cardinality.CountMethod) (*UserStats, error)

UserStats returns statistics about the current user.

func (*Distributor) UserStatsHandler

func (d *Distributor) UserStatsHandler(w http.ResponseWriter, r *http.Request)

UserStatsHandler handles user stats requests to the Distributor.

type HATrackerConfig

// HATrackerConfig contains the configuration required to create an HA tracker.
type HATrackerConfig struct {
	// EnableHATracker is read from the "enable_ha_tracker" YAML key.
	EnableHATracker bool `yaml:"enable_ha_tracker"`
	// We should only update the timestamp if the difference
	// between the stored timestamp and the time we received a sample at
	// is more than this duration.
	UpdateTimeout          time.Duration `yaml:"ha_tracker_update_timeout" category:"advanced"`
	// NOTE(review): presumably the maximum jitter applied to UpdateTimeout — confirm.
	UpdateTimeoutJitterMax time.Duration `yaml:"ha_tracker_update_timeout_jitter_max" category:"advanced"`
	// We should only failover to accepting samples from a replica
	// other than the replica written in the KVStore if the difference
	// between the stored timestamp and the time we received a sample is
	// more than this duration.
	FailoverTimeout time.Duration `yaml:"ha_tracker_failover_timeout" category:"advanced"`

	// KVStore configures the key-value store in which, per the comment on
	// FailoverTimeout, the accepted replica is written.
	KVStore kv.Config `` /* 190-byte string literal not displayed */
}

HATrackerConfig contains the configuration required to create an HA tracker.

func (*HATrackerConfig) RegisterFlags

func (cfg *HATrackerConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to config this to the given FlagSet.

func (*HATrackerConfig) Validate

func (cfg *HATrackerConfig) Validate() error

Validate validates the config and returns an error on failure.

type InstanceLimits

// InstanceLimits holds the limits for a distributor (see Config.DefaultLimits and
// Config.InstanceLimitsFn).
// NOTE(review): units (e.g. samples per second for the rate) and the meaning of a
// zero value are not visible here — confirm in RegisterFlags.
type InstanceLimits struct {
	// Maximum ingestion rate ("max_ingestion_rate" in YAML).
	MaxIngestionRate             float64 `yaml:"max_ingestion_rate" category:"advanced"`
	// Maximum number of inflight push requests ("max_inflight_push_requests" in YAML).
	MaxInflightPushRequests      int     `yaml:"max_inflight_push_requests" category:"advanced"`
	// Maximum total size in bytes of inflight push requests ("max_inflight_push_requests_bytes" in YAML).
	MaxInflightPushRequestsBytes int     `yaml:"max_inflight_push_requests_bytes" category:"advanced"`
}

func (*InstanceLimits) RegisterFlags

func (l *InstanceLimits) RegisterFlags(f *flag.FlagSet)

func (*InstanceLimits) UnmarshalYAML

func (l *InstanceLimits) UnmarshalYAML(value *yaml.Node) error

UnmarshalYAML implements the yaml.Unmarshaler interface.

type PoolConfig

// PoolConfig is config for creating a Pool (see NewPool).
type PoolConfig struct {
	// NOTE(review): presumably how often unused clients are cleaned up — confirm.
	ClientCleanupPeriod  time.Duration `yaml:"client_cleanup_period" category:"advanced"`
	// NOTE(review): presumably enables health checks against ingesters — confirm.
	HealthCheckIngesters bool          `yaml:"health_check_ingesters" category:"advanced"`
	// RemoteTimeout is not read from YAML (`yaml:"-"`).
	// NOTE(review): presumably copied from Config.RemoteTimeout by the caller — confirm.
	RemoteTimeout        time.Duration `yaml:"-"`
}

PoolConfig is config for creating a Pool.

func (*PoolConfig) RegisterFlags

func (cfg *PoolConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to config this to the given FlagSet.

type PushFunc

// PushFunc defines the type of the push. It is similar to http.HandlerFunc:
// it receives the request context and the push Request, and returns an error.
type PushFunc func(ctx context.Context, req *Request) error

PushFunc defines the type of the push. It is similar to http.HandlerFunc.

type PushWrapper

// PushWrapper wraps around a push. It is similar to middleware.Interface:
// it takes the next PushFunc and returns a PushFunc.
type PushWrapper func(next PushFunc) PushFunc

PushWrapper wraps around a push. It is similar to middleware.Interface.

type ReadLifecycler

// ReadLifecycler represents the read interface to the lifecycler.
type ReadLifecycler interface {
	// HealthyInstancesCount returns an instance count. *Distributor implements this
	// method; its documentation describes the count as the number of members of the
	// ring, used to enforce rate limiting per distributor.
	HealthyInstancesCount() int
}

ReadLifecycler represents the read interface to the lifecycler.

type ReplicaDesc

// ReplicaDesc is a protobuf message with three fields: replica (1), received_at (2)
// and deleted_at (3).
type ReplicaDesc struct {
	// NOTE(review): presumably the name of the HA replica this entry describes — confirm.
	Replica    string `protobuf:"bytes,1,opt,name=replica,proto3" json:"replica,omitempty"`
	// NOTE(review): presumably a Unix timestamp in milliseconds, like DeletedAt — confirm.
	ReceivedAt int64  `protobuf:"varint,2,opt,name=received_at,json=receivedAt,proto3" json:"received_at,omitempty"`
	// Unix timestamp in milliseconds when this entry was marked for deletion.
	// Reason for doing marking first, and delete later, is to make sure that distributors
	// watching the prefix will receive notification on "marking" -- at which point they can
	// already remove entry from memory. Actual deletion from KV store does *not* trigger
	// "watch" notification with a key for all KV stores.
	DeletedAt int64 `protobuf:"varint,3,opt,name=deleted_at,json=deletedAt,proto3" json:"deleted_at,omitempty"`
}

func NewReplicaDesc

func NewReplicaDesc() *ReplicaDesc

NewReplicaDesc returns an empty *distributor.ReplicaDesc.

func (*ReplicaDesc) Descriptor

func (*ReplicaDesc) Descriptor() ([]byte, []int)

func (*ReplicaDesc) Equal

func (this *ReplicaDesc) Equal(that interface{}) bool

func (*ReplicaDesc) GetDeletedAt

func (m *ReplicaDesc) GetDeletedAt() int64

func (*ReplicaDesc) GetReceivedAt

func (m *ReplicaDesc) GetReceivedAt() int64

func (*ReplicaDesc) GetReplica

func (m *ReplicaDesc) GetReplica() string

func (*ReplicaDesc) GoString

func (this *ReplicaDesc) GoString() string

func (*ReplicaDesc) Marshal

func (m *ReplicaDesc) Marshal() (dAtA []byte, err error)

func (*ReplicaDesc) MarshalTo

func (m *ReplicaDesc) MarshalTo(dAtA []byte) (int, error)

func (*ReplicaDesc) MarshalToSizedBuffer

func (m *ReplicaDesc) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*ReplicaDesc) ProtoMessage

func (*ReplicaDesc) ProtoMessage()

func (*ReplicaDesc) Reset

func (m *ReplicaDesc) Reset()

func (*ReplicaDesc) Size

func (m *ReplicaDesc) Size() (n int)

func (*ReplicaDesc) String

func (this *ReplicaDesc) String() string

func (*ReplicaDesc) Unmarshal

func (m *ReplicaDesc) Unmarshal(dAtA []byte) error

func (*ReplicaDesc) XXX_DiscardUnknown

func (m *ReplicaDesc) XXX_DiscardUnknown()

func (*ReplicaDesc) XXX_Marshal

func (m *ReplicaDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*ReplicaDesc) XXX_Merge

func (m *ReplicaDesc) XXX_Merge(src proto.Message)

func (*ReplicaDesc) XXX_Size

func (m *ReplicaDesc) XXX_Size() int

func (*ReplicaDesc) XXX_Unmarshal

func (m *ReplicaDesc) XXX_Unmarshal(b []byte) error

type Request

// Request represents a push request. It allows lazy body reading from the underlying
// http request and adding cleanup functions that should be called after the request
// has been handled.
type Request struct {
	// contains filtered or unexported fields
}

Request represents a push request. It allows lazy body reading from the underlying http request and adding cleanup functions that should be called after the request has been handled.

func NewParsedRequest

func NewParsedRequest(r *mimirpb.WriteRequest) *Request

func (*Request) AddCleanup

func (r *Request) AddCleanup(f func())

AddCleanup adds a function that will be called once CleanUp is called. If f is nil, it will not be invoked.

func (*Request) CleanUp

func (r *Request) CleanUp()

CleanUp calls all added cleanups in reverse order - the last added is the first invoked. CleanUp removes each called cleanup function from the list of cleanups. So subsequent calls to CleanUp will not invoke the same cleanup functions.

func (*Request) WriteRequest

func (r *Request) WriteRequest() (*mimirpb.WriteRequest, error)

WriteRequest returns request from supplier function. Function is only called once, and subsequent calls to WriteRequest return the same value.

type RetryConfig

// RetryConfig holds the experimental retry settings that Config exposes under the
// "retry_after_header" YAML key.
// NOTE(review): presumably BaseSeconds and MaxBackoffExponent parameterize an
// exponential backoff used to compute a Retry-After response header — confirm
// against the handler implementation.
type RetryConfig struct {
	Enabled            bool `yaml:"enabled" category:"experimental"`
	BaseSeconds        int  `yaml:"base_seconds" category:"experimental"`
	MaxBackoffExponent int  `yaml:"max_backoff_exponent" category:"experimental"`
}

func (*RetryConfig) RegisterFlags

func (cfg *RetryConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to config this to the given FlagSet.

func (*RetryConfig) Validate

func (cfg *RetryConfig) Validate() error

type RingConfig

// RingConfig masks the ring lifecycler config which contains many options not really
// required by the distributors ring. This config is used to strip down the config to
// the minimum, and avoid confusion to the user.
type RingConfig struct {
	// Common is inlined in YAML (`yaml:",inline"`), so its keys appear directly under
	// the ring section rather than under a nested key.
	Common util.CommonRingConfig `yaml:",inline"`
}

RingConfig masks the ring lifecycler config which contains many options not really required by the distributors ring. This config is used to strip down the config to the minimum, and avoid confusion to the user.

func (*RingConfig) RegisterFlags

func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger)

func (*RingConfig) ToBasicLifecyclerConfig

func (cfg *RingConfig) ToBasicLifecyclerConfig(logger log.Logger) (ring.BasicLifecyclerConfig, error)

type UserIDStats

// UserIDStats models ingestion statistics for one user, including the user ID.
type UserIDStats struct {
	// UserID is serialized under the JSON key "userID".
	UserID string `json:"userID"`
	// UserStats is embedded, so its fields are promoted onto UserIDStats.
	UserStats
}

UserIDStats models ingestion statistics for one user, including the user ID

type UserStats

// UserStats models ingestion statistics for one user.
// NOTE(review): the JSON keys mix lower camel case ("ingestionRate", "numSeries") with
// upper-case initials ("APIIngestionRate", "RuleIngestionRate"). They are part of the
// serialized output, so changing them would break consumers.
type UserStats struct {
	IngestionRate     float64 `json:"ingestionRate"`
	NumSeries         uint64  `json:"numSeries"`
	// NOTE(review): presumably the portions of the ingestion rate coming from the
	// API and from rules, respectively — confirm.
	APIIngestionRate  float64 `json:"APIIngestionRate"`
	RuleIngestionRate float64 `json:"RuleIngestionRate"`
}

UserStats models ingestion statistics for one user.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL