bloomgateway

package
v3.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 8, 2024 License: AGPL-3.0 Imports: 42 Imported by: 0

Documentation

Overview

Bloom Gateway package

The bloom gateway is a component that can be run as a standalone microserivce target and provides capabilities for filtering ChunkRefs based on a given list of line filter expressions.

		     Querier   Query Frontend
		        |           |
		................................... service boundary
		        |           |
		        +----+------+
		             |
		     indexgateway.Gateway
		             |
		   bloomgateway.BloomQuerier
		             |
		   bloomgateway.GatewayClient
		             |
		  logproto.BloomGatewayClient
		             |
		................................... service boundary
		             |
		      bloomgateway.Gateway
		             |
	       queue.RequestQueue
		             |
	       bloomgateway.Worker
		             |
	     bloomgateway.Processor
		             |
         bloomshipper.Store
		             |
         bloomshipper.Client
		             |
		        ObjectClient
		             |
		................................... service boundary
		             |
	         object storage

Index

Constants

View Source
const (
	Day = 24 * time.Hour
)

Variables

This section is empty.

Functions

This section is empty.

Types

type AddressProvider

type AddressProvider interface {
	Addresses() []string
}

type BloomQuerier

type BloomQuerier struct {
	// contains filtered or unexported fields
}

BloomQuerier is a store-level abstraction on top of Client It is used by the index gateway to filter ChunkRefs based on given line fiter expression.

func NewQuerier

func NewQuerier(c Client, r prometheus.Registerer, logger log.Logger) *BloomQuerier

func (*BloomQuerier) FilterChunkRefs

func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, chunkRefs []*logproto.ChunkRef, queryPlan plan.QueryPlan) ([]*logproto.ChunkRef, error)

type CacheConfig

type CacheConfig struct {
	resultscache.Config `yaml:",inline"`
}

func (*CacheConfig) RegisterFlags

func (cfg *CacheConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags registers flags.

func (*CacheConfig) RegisterFlagsWithPrefix

func (cfg *CacheConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

type CacheLimits

type CacheLimits interface {
	resultscache.Limits
	BloomGatewayCacheKeyInterval(tenantID string) time.Duration
}

type Client

type Client interface {
	FilterChunks(ctx context.Context, tenant string, from, through model.Time, groups []*logproto.GroupedChunkRefs, plan plan.QueryPlan) ([]*logproto.GroupedChunkRefs, error)
}

type ClientCache

type ClientCache struct {
	// contains filtered or unexported fields
}

func NewBloomGatewayClientCacheMiddleware

func NewBloomGatewayClientCacheMiddleware(
	logger log.Logger,
	next logproto.BloomGatewayClient,
	c cache.Cache,
	limits CacheLimits,
	cacheGen resultscache.CacheGenNumberLoader,
	retentionEnabled bool,
) *ClientCache

func (*ClientCache) FilterChunkRefs

type ClientConfig

type ClientConfig struct {
	// PoolConfig defines the behavior of the gRPC connection pool used to communicate
	// with the Bloom Gateway.
	PoolConfig PoolConfig `yaml:"pool_config,omitempty" doc:"description=Configures the behavior of the connection pool."`

	// GRPCClientConfig configures the gRPC connection between the Bloom Gateway client and the server.
	GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`

	// Cache configures the cache used to store the results of the Bloom Gateway server.
	Cache        CacheConfig `yaml:"results_cache,omitempty"`
	CacheResults bool        `yaml:"cache_results"`

	// Client sharding using DNS disvovery and jumphash
	Addresses string `yaml:"addresses,omitempty"`
}

IndexGatewayClientConfig configures the Index Gateway client used to communicate with the Index Gateway server.

func (*ClientConfig) RegisterFlags

func (i *ClientConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags registers flags for the Bloom Gateway client configuration.

func (*ClientConfig) RegisterFlagsWithPrefix

func (i *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

RegisterFlagsWithPrefix registers flags for the Bloom Gateway client configuration with a common prefix.

func (*ClientConfig) Validate

func (i *ClientConfig) Validate() error

type Config

type Config struct {
	// Enabled is the global switch to configures whether Bloom Gateways should be used to filter chunks.
	Enabled bool `yaml:"enabled"`
	// Client configures the Bloom Gateway client
	Client ClientConfig `yaml:"client,omitempty" doc:""`

	WorkerConcurrency       int `yaml:"worker_concurrency"`
	BlockQueryConcurrency   int `yaml:"block_query_concurrency"`
	MaxOutstandingPerTenant int `yaml:"max_outstanding_per_tenant"`
	NumMultiplexItems       int `yaml:"num_multiplex_tasks"`
}

Config configures the Bloom Gateway component.

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet)

RegisterFlags registers flags for the Bloom Gateway configuration.

func (*Config) RegisterFlagsWithPrefix

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

RegisterFlagsWithPrefix registers flags for the Bloom Gateway configuration with a common prefix.

func (*Config) Validate

func (cfg *Config) Validate() error

type GRPCPool

GRPCPool represents a pool of gRPC connections to different bloom gateway instances. Interfaces are inlined for simplicity to automatically satisfy interface functions.

func NewBloomGatewayGRPCPool

func NewBloomGatewayGRPCPool(address string, opts []grpc.DialOption) (*GRPCPool, error)

NewBloomGatewayGRPCPool instantiates a new pool of GRPC connections for the Bloom Gateway Internally, it also instantiates a protobuf bloom gateway client and a health client.

type Gateway

type Gateway struct {
	services.Service
	// contains filtered or unexported fields
}

func New

func New(cfg Config, store bloomshipper.Store, logger log.Logger, reg prometheus.Registerer) (*Gateway, error)

New returns a new instance of the Bloom Gateway.

func (*Gateway) FilterChunkRefs

FilterChunkRefs implements BloomGatewayServer

type GatewayClient

type GatewayClient struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(
	cfg ClientConfig,
	limits Limits,
	registerer prometheus.Registerer,
	logger log.Logger,
	cacheGen resultscache.CacheGenNumberLoader,
	retentionEnabled bool,
) (*GatewayClient, error)

func (*GatewayClient) Close

func (c *GatewayClient) Close()

func (*GatewayClient) FilterChunks

func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, through model.Time, groups []*logproto.GroupedChunkRefs, plan plan.QueryPlan) ([]*logproto.GroupedChunkRefs, error)

FilterChunkRefs implements Client

type JumpHashClientPool

type JumpHashClientPool struct {
	*client.Pool
	*jumphash.Selector
	// contains filtered or unexported fields
}

func NewJumpHashClientPool

func NewJumpHashClientPool(pool *client.Pool, dnsProvider AddressProvider, updateInterval time.Duration, logger log.Logger) *JumpHashClientPool

func (*JumpHashClientPool) AddrForFingerprint

func (p *JumpHashClientPool) AddrForFingerprint(fp uint64) (string, error)

func (*JumpHashClientPool) Start

func (p *JumpHashClientPool) Start()

func (*JumpHashClientPool) Stop

func (p *JumpHashClientPool) Stop()

type Limits

type Limits interface {
	CacheLimits
	BloomGatewayShardSize(tenantID string) int
	BloomGatewayEnabled(tenantID string) bool
}

type PoolConfig

type PoolConfig struct {
	CheckInterval             time.Duration `yaml:"check_interval"`
	HealthCheckEnabled        bool          `yaml:"enable_health_check"`
	HealthCheckTimeout        time.Duration `yaml:"health_check_timeout"`
	MaxConcurrentHealthChecks int           `yaml:"-"`
}

PoolConfig is config for creating a Pool. It has the same fields as "github.com/grafana/dskit/ring/client.PoolConfig" so it can be cast.

func (*PoolConfig) RegisterFlagsWithPrefix

func (cfg *PoolConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

RegisterFlags adds the flags required to config this to the given FlagSet.

func (*PoolConfig) Validate

func (cfg *PoolConfig) Validate() error

type Stats

type Stats struct {
	Status                              string
	NumTasks, NumFilters                int
	ChunksRequested, ChunksFiltered     int
	SeriesRequested, SeriesFiltered     int
	QueueTime                           *atomic.Duration
	MetasFetchTime, BlocksFetchTime     *atomic.Duration
	ProcessingTime, TotalProcessingTime *atomic.Duration
	PostProcessingTime                  *atomic.Duration
	ProcessedBlocks                     *atomic.Int32
}

func ContextWithEmptyStats

func ContextWithEmptyStats(ctx context.Context) (*Stats, context.Context)

ContextWithEmptyStats returns a context with empty stats.

func FromContext

func FromContext(ctx context.Context) *Stats

FromContext gets the Stats out of the Context. Returns nil if stats have not been initialised in the context.

func (*Stats) AddBlocksFetchTime

func (s *Stats) AddBlocksFetchTime(t time.Duration)

func (*Stats) AddMetasFetchTime

func (s *Stats) AddMetasFetchTime(t time.Duration)

func (*Stats) AddPostProcessingTime

func (s *Stats) AddPostProcessingTime(t time.Duration)

func (*Stats) AddProcessingTime

func (s *Stats) AddProcessingTime(t time.Duration)

func (*Stats) AddQueueTime

func (s *Stats) AddQueueTime(t time.Duration)

func (*Stats) AddTotalProcessingTime

func (s *Stats) AddTotalProcessingTime(t time.Duration)

func (*Stats) Duration

func (s *Stats) Duration() (dur time.Duration)

aggregates the total duration

func (*Stats) IncProcessedBlocks

func (s *Stats) IncProcessedBlocks()

func (*Stats) KVArgs

func (s *Stats) KVArgs() []any

type Task

type Task struct {
	// ID is a lexcographically sortable unique identifier of the task
	ID ulid.ULID
	// Tenant is the tenant ID
	Tenant string
	// contains filtered or unexported fields
}

Task is the data structure that is enqueued to the internal queue and dequeued by query workers

func NewTask

func NewTask(ctx context.Context, tenantID string, refs seriesWithInterval, filters []syntax.LineFilterExpr) (Task, error)

NewTask returns a new Task that can be enqueued to the task queue. In addition, it returns a result and an error channel, as well as an error if the instantiation fails.

func (Task) Bounds

func (t Task) Bounds() (model.Time, model.Time)

Bounds implements Bounded see pkg/storage/stores/shipper/indexshipper/tsdb.Bounded

func (Task) Close

func (t Task) Close()

func (Task) CloseWithError

func (t Task) CloseWithError(err error)

func (Task) Copy

func (t Task) Copy(series []*logproto.GroupedChunkRefs) Task

Copy returns a copy of the existing task but with a new slice of grouped chunk refs

func (Task) Done

func (t Task) Done() <-chan struct{}

func (Task) Err

func (t Task) Err() error

func (Task) RequestIter

func (t Task) RequestIter(tokenizer *v1.NGramTokenizer) v1.Iterator[v1.Request]

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL