queue

package
v0.0.0-...-f43c62e
Warning

This package is not in the latest version of its module.

Published: May 1, 2024 | License: AGPL-3.0 | Imports: 15 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	ErrInvalidTenantID     = errors.New("invalid tenant id")
	ErrTooManyRequests     = errors.New("too many outstanding requests")
	ErrStopped             = errors.New("queue is stopped")
	ErrQuerierShuttingDown = errors.New("querier has informed the scheduler it is shutting down")
)
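
Callers can tell these sentinel errors apart with errors.Is, including when they have been wrapped. The following is a minimal sketch: the two error variables are local copies redeclared so the snippet compiles on its own, and classify is a hypothetical helper, not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for two of the package's sentinel errors, redeclared here
// only so that the sketch is self-contained.
var (
	ErrTooManyRequests = errors.New("too many outstanding requests")
	ErrStopped         = errors.New("queue is stopped")
)

// classify is a hypothetical helper that maps an enqueue error to an action.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrTooManyRequests):
		return "reject: tenant queue full"
	case errors.Is(err, ErrStopped):
		return "reject: queue stopped"
	default:
		return "reject: unexpected error"
	}
}

func main() {
	// errors.Is still matches after the sentinel has been wrapped with %w.
	wrapped := fmt.Errorf("enqueue failed: %w", ErrTooManyRequests)
	fmt.Println(classify(wrapped)) // reject: tenant queue full
}
```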

Functions

This section is empty.

Types

type QuerierID

type QuerierID string

type QueueIndex

type QueueIndex int //nolint:revive // disallows types beginning with package name

type QueuePath

type QueuePath []string //nolint:revive // disallows types beginning with package name

type Request

type Request interface{}

Request is a request stored in the queue.

type RequestQueue

type RequestQueue struct {
	services.Service
	// contains filtered or unexported fields
}

RequestQueue holds incoming requests in per-user queues. It also assigns each user a specified number of queriers, and when a querier asks for the next request to handle (using GetNextRequestForQuerier), it returns requests in a fair fashion.
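
This listing does not show how queriers are assigned to a user. As an illustration only, one plausible approach is a deterministic shuffle seeded by a hash of the tenant ID. The pickQueriers helper below is hypothetical, and treating a limit of 0 as "all queriers" is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"math/rand"
	"sort"
)

// pickQueriers is a hypothetical helper. It deterministically selects up to
// maxQueriers of the given queriers for a tenant. In this sketch, a limit of
// 0 (or one that is not smaller than the querier count) selects all queriers.
func pickQueriers(tenantID string, queriers []string, maxQueriers int) []string {
	// Work on a sorted copy so the result does not depend on input order
	// and the caller's slice is left untouched.
	sorted := append([]string(nil), queriers...)
	sort.Strings(sorted)
	if maxQueriers <= 0 || maxQueriers >= len(sorted) {
		return sorted
	}

	// Seed the shuffle from the tenant ID so the same tenant always gets
	// the same subset for the same set of queriers.
	h := fnv.New64a()
	_, _ = h.Write([]byte(tenantID))
	rng := rand.New(rand.NewSource(int64(h.Sum64())))
	rng.Shuffle(len(sorted), func(i, j int) { sorted[i], sorted[j] = sorted[j], sorted[i] })

	picked := sorted[:maxQueriers]
	sort.Strings(picked)
	return picked
}

func main() {
	queriers := []string{"querier-1", "querier-2", "querier-3", "querier-4"}
	fmt.Println(pickQueriers("tenant-a", queriers, 2))
	fmt.Println(pickQueriers("tenant-b", queriers, 2))
}
```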

func NewRequestQueue

func NewRequestQueue(
	log log.Logger,
	maxOutstandingPerTenant int,
	additionalQueueDimensionsEnabled bool,
	forgetDelay time.Duration,
	queueLength *prometheus.GaugeVec,
	discardedRequests *prometheus.CounterVec,
	enqueueDuration prometheus.Histogram,
) *RequestQueue

func (*RequestQueue) EnqueueRequestToDispatcher

func (q *RequestQueue) EnqueueRequestToDispatcher(tenantID string, req Request, maxQueriers int, successFn func()) error

EnqueueRequestToDispatcher handles a request from the query frontend and submits it to the initial dispatcher queue.

maxQueriers is a tenant-specific value used to compute which queriers should handle requests for this tenant. It is passed to each EnqueueRequestToDispatcher call because it can change between calls.

If the request is successfully enqueued, successFn is called before any querier can receive the request.
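
The ordering guarantee for successFn can be illustrated with a simplified model. The toyQueue type below is a hypothetical stand-in that uses a mutex rather than a dispatcher, and its per-tenant limit check is an assumption modeled on the maxOutstandingPerTenant parameter and ErrTooManyRequests. It is a sketch of the contract, not the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errTooManyRequests is a local stand-in for the package's sentinel error.
var errTooManyRequests = errors.New("too many outstanding requests")

// toyQueue is a hypothetical, simplified model of a per-tenant request queue.
type toyQueue struct {
	mu             sync.Mutex
	maxOutstanding int
	perTenant      map[string][]any
}

func newToyQueue(maxOutstanding int) *toyQueue {
	return &toyQueue{maxOutstanding: maxOutstanding, perTenant: map[string][]any{}}
}

// enqueue appends req to the tenant's queue. successFn runs while the lock is
// still held, so no consumer taking the same lock can see the request before
// successFn has returned. On failure successFn is not called.
func (q *toyQueue) enqueue(tenantID string, req any, successFn func()) error {
	q.mu.Lock()
	defer q.mu.Unlock()

	if len(q.perTenant[tenantID]) >= q.maxOutstanding {
		return errTooManyRequests
	}
	q.perTenant[tenantID] = append(q.perTenant[tenantID], req)
	if successFn != nil {
		successFn()
	}
	return nil
}

func main() {
	q := newToyQueue(1)
	err := q.enqueue("tenant-a", "query-1", func() { fmt.Println("query-1 accepted") })
	fmt.Println(err)
	fmt.Println(q.enqueue("tenant-a", "query-2", nil))
}
```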

func (*RequestQueue) GetConnectedQuerierWorkersMetric

func (q *RequestQueue) GetConnectedQuerierWorkersMetric() float64

func (*RequestQueue) GetNextRequestForQuerier

func (q *RequestQueue) GetNextRequestForQuerier(ctx context.Context, last UserIndex, querierID string) (Request, UserIndex, error)

GetNextRequestForQuerier finds the next user queue and takes the next request off of it. It blocks if there are no requests. By passing the user index returned by the previous call to this method, the querier guarantees that it iterates over all users fairly. If the querier finds that the request from the user has already expired, it can get another request for the same user by using UserIndex.ReuseLastUser.
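
The index-passing pattern can be sketched with a toy model. All names below (userIndex, toyQueues, next) are hypothetical stand-ins, and unlike the real method this version returns false instead of blocking when every queue is empty.

```go
package main

import "fmt"

// userIndex is a hypothetical stand-in for the package's opaque UserIndex.
type userIndex struct{ last int }

// firstUser starts iteration just before the first user.
func firstUser() userIndex { return userIndex{last: -1} }

// reuseLastUser steps back by one so that the same user is tried again.
func (ui userIndex) reuseLastUser() userIndex {
	if ui.last >= 0 {
		return userIndex{last: ui.last - 1}
	}
	return ui
}

// toyQueues holds one FIFO per user, visited in a fixed user order.
type toyQueues struct {
	users  []string
	queues map[string][]string
}

// next takes a request from the first non-empty user queue after the position
// recorded in last, wrapping around the user list. It reports false when all
// queues are empty.
func (t *toyQueues) next(last userIndex) (string, userIndex, bool) {
	n := len(t.users)
	for i := 1; i <= n; i++ {
		pos := (last.last + i) % n
		user := t.users[pos]
		if q := t.queues[user]; len(q) > 0 {
			t.queues[user] = q[1:]
			return q[0], userIndex{last: pos}, true
		}
	}
	return "", last, false
}

func main() {
	tq := &toyQueues{
		users:  []string{"a", "b"},
		queues: map[string][]string{"a": {"a1", "a2"}, "b": {"b1"}},
	}
	idx := firstUser()
	for {
		req, nextIdx, ok := tq.next(idx)
		if !ok {
			break
		}
		fmt.Println(req) // a1, then b1, then a2
		idx = nextIdx    // resume after the user that was just served
	}
}
```

Passing idx.reuseLastUser() instead of idx on the next call would serve user "a" again before moving on to "b".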

func (*RequestQueue) NotifyQuerierShutdown

func (q *RequestQueue) NotifyQuerierShutdown(querierID string)

func (*RequestQueue) RegisterQuerierConnection

func (q *RequestQueue) RegisterQuerierConnection(querierID string)

func (*RequestQueue) UnregisterQuerierConnection

func (q *RequestQueue) UnregisterQuerierConnection(querierID string)

type SchedulerRequest

type SchedulerRequest struct {
	FrontendAddress           string
	UserID                    string
	QueryID                   uint64
	Request                   *httpgrpc.HTTPRequest
	StatsEnabled              bool
	AdditionalQueueDimensions []string

	EnqueueTime time.Time

	Ctx        context.Context
	CancelFunc context.CancelCauseFunc
	QueueSpan  opentracing.Span

	// This is only used for testing.
	ParentSpanContext opentracing.SpanContext
}
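
The Ctx and CancelFunc fields pair a context with a context.CancelCauseFunc, which lets the canceling side record why a request was canceled. The sketch below uses a trimmed-down, hypothetical request type and a made-up cause. How the package itself populates these fields is not shown in this listing.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// request is a hypothetical, trimmed-down stand-in for SchedulerRequest.
type request struct {
	QueryID     uint64
	EnqueueTime time.Time
	Ctx         context.Context
	CancelFunc  context.CancelCauseFunc
}

// errQuerierGone is a hypothetical cancellation cause used for illustration.
var errQuerierGone = errors.New("querier disconnected")

// newRequest creates a request whose context can be canceled with a cause.
func newRequest(id uint64) *request {
	ctx, cancel := context.WithCancelCause(context.Background())
	return &request{QueryID: id, EnqueueTime: time.Now(), Ctx: ctx, CancelFunc: cancel}
}

// cause returns the recorded cancellation cause, or nil if not canceled.
func cause(r *request) error { return context.Cause(r.Ctx) }

func main() {
	r := newRequest(1)
	r.CancelFunc(errQuerierGone)
	// Err reports the generic cancellation; Cause reports the specific reason.
	fmt.Println(r.Ctx.Err()) // context canceled
	fmt.Println(cause(r))    // querier disconnected
}
```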

type TenantID

type TenantID string

type TreeQueue

type TreeQueue struct {
	// contains filtered or unexported fields
}

TreeQueue is a hierarchical queue implementation with an arbitrary number of child queues.

TreeQueue internally maintains round-robin fair queuing across all of its queue dimensions. Each queuing dimension is modeled as a node in the tree, internally reachable through a QueuePath.

The QueuePath is an ordered array of strings describing the path through the tree to the node, which contains the FIFO local queue of all items enqueued for that queuing dimension.

When dequeuing from a given node, the node will round-robin equally between dequeuing directly from its own local queue and dequeuing recursively from its list of child TreeQueues. No queue at a given level of the tree is dequeued from consecutively unless all others at the same level of the tree are empty down to the leaf node.

func NewTreeQueue

func NewTreeQueue(name string) *TreeQueue

func (*TreeQueue) Dequeue

func (q *TreeQueue) Dequeue() any

Dequeue removes and returns an item from the front of the next nonempty queue node in the tree.

Dequeuing from a node follows the round-robin order of the node's childQueueOrder, dequeuing either from the node's localQueue or selecting the next child node in the order and recursively calling Dequeue on the child nodes until a nonempty queue is found.

Nodes that empty down to the leaf after being dequeued from are deleted as the recursion returns up the stack. This maintains structural guarantees relied on to make IsEmpty() non-recursive.
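
The dequeue behavior described above can be sketched with a simplified tree. The node type below is a hypothetical stand-in, not the package's TreeQueue. It treats the local queue as one slot in the round-robin order (marked by an empty name), which is an assumption of this sketch, and it uses a nil return to signal an empty tree.

```go
package main

import "fmt"

// node is a hypothetical, simplified stand-in for a TreeQueue node.
type node struct {
	local    []any            // FIFO of items enqueued directly at this node
	children map[string]*node // child nodes by name
	order    []string         // round-robin order; "" stands for the local queue
	pos      int              // next slot in order to try
}

func newNode() *node {
	return &node{children: map[string]*node{}, order: []string{""}}
}

// enqueue follows path from n, creating nodes as needed, and appends v.
func (n *node) enqueue(path []string, v any) {
	if len(path) == 0 {
		n.local = append(n.local, v)
		return
	}
	child, ok := n.children[path[0]]
	if !ok {
		child = newNode()
		n.children[path[0]] = child
		n.order = append(n.order, path[0])
	}
	child.enqueue(path[1:], v)
}

// isEmpty is non-recursive: empty children are always deleted by dequeue.
func (n *node) isEmpty() bool { return len(n.local) == 0 && len(n.children) == 0 }

// dequeue tries each slot once, starting after the slot used last time.
func (n *node) dequeue() any {
	for tries := len(n.order); tries > 0; tries-- {
		if n.pos >= len(n.order) {
			n.pos = 0
		}
		slot := n.order[n.pos]
		if slot == "" {
			n.pos++
			if len(n.local) > 0 {
				v := n.local[0]
				n.local = n.local[1:]
				return v
			}
			continue
		}
		child := n.children[slot]
		v := child.dequeue()
		if child.isEmpty() {
			// Delete the emptied child; the following slot shifts into n.pos.
			delete(n.children, slot)
			n.order = append(n.order[:n.pos], n.order[n.pos+1:]...)
		} else {
			n.pos++
		}
		if v != nil {
			return v
		}
	}
	return nil
}

func main() {
	root := newNode()
	root.enqueue(nil, "root-1")
	root.enqueue([]string{"a"}, "a-1")
	root.enqueue([]string{"a"}, "a-2")
	root.enqueue([]string{"b"}, "b-1")
	for v := root.dequeue(); v != nil; v = root.dequeue() {
		fmt.Println(v) // root-1, a-1, b-1, a-2
	}
}
```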

func (*TreeQueue) DequeueByPath

func (q *TreeQueue) DequeueByPath(childPath QueuePath) any

DequeueByPath selects a child node by a given relative child path and calls Dequeue on the node.

While the child node will recursively clean up its own empty children during dequeue, nodes cannot delete themselves; DequeueByPath cleans up the child node as well if it is empty. This maintains structural guarantees relied on to make IsEmpty() non-recursive.

childPath is relative to the receiver node; pass a zero-length path to refer to the node itself.

func (*TreeQueue) EnqueueBackByPath

func (q *TreeQueue) EnqueueBackByPath(childPath QueuePath, v any) error

EnqueueBackByPath enqueues an item in the back of the local queue of the node located at a given path through the tree; nodes for the path are created as needed.

childPath must be relative to the receiver node; providing a QueuePath beginning with the receiver/parent node name will create a child node of the same name as the parent.
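
The relative-path rule is easy to get wrong, so here is a small sketch of the pitfall. The node type and enqueueBack method are hypothetical stand-ins that only model path-based node creation.

```go
package main

import "fmt"

// node is a hypothetical, minimal stand-in for a TreeQueue node.
type node struct {
	name     string
	local    []any
	children map[string]*node
}

func newNode(name string) *node {
	return &node{name: name, children: map[string]*node{}}
}

// enqueueBack appends v to the local queue of the node reached by following
// path from n, creating any missing nodes along the way. The path is relative
// to n, so it must not start with n's own name.
func (n *node) enqueueBack(path []string, v any) {
	cur := n
	for _, name := range path {
		child, ok := cur.children[name]
		if !ok {
			child = newNode(name)
			cur.children[name] = child
		}
		cur = child
	}
	cur.local = append(cur.local, v)
}

func main() {
	root := newNode("root")

	// Correct: the path is relative, giving root -> tenant-a.
	root.enqueueBack([]string{"tenant-a"}, "q1")

	// Pitfall: repeating the receiver's name gives root -> root -> tenant-a.
	root.enqueueBack([]string{"root", "tenant-a"}, "q2")

	_, nested := root.children["root"]
	fmt.Println(len(root.children), nested) // 2 true
}
```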

func (*TreeQueue) EnqueueFrontByPath

func (q *TreeQueue) EnqueueFrontByPath(childPath QueuePath, v any) error

EnqueueFrontByPath enqueues an item in the front of the local queue of the node located at a given path through the tree; nodes for the path are created as needed.

Max queue length check is skipped; enqueueing to the front is intended only for items which were first enqueued to the back and then dequeued after reaching the front.

Re-enqueueing to the front is intended for cases where a queue consumer fails to complete operations on the dequeued item, but failure is not yet final, and the operations should be retried by a subsequent queue consumer.

childPath must be relative to the receiver node; providing a QueuePath beginning with the receiver/parent node name will create a child node of the same name as the parent.
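
The retry pattern can be sketched with a single bounded FIFO standing in for one node's local queue. The fifo type, its maxLen field, and errQueueFull are hypothetical. Where the real length limit is enforced is not shown in this listing.

```go
package main

import (
	"errors"
	"fmt"
)

// errQueueFull is a hypothetical error used only in this sketch.
var errQueueFull = errors.New("queue full")

// fifo is a hypothetical bounded queue standing in for one node's local queue.
type fifo struct {
	items  []string
	maxLen int
}

// enqueueBack enforces the length limit.
func (f *fifo) enqueueBack(v string) error {
	if len(f.items) >= f.maxLen {
		return errQueueFull
	}
	f.items = append(f.items, v)
	return nil
}

// enqueueFront skips the length check: the item already passed it when it
// was first enqueued to the back.
func (f *fifo) enqueueFront(v string) {
	f.items = append([]string{v}, f.items...)
}

// dequeue removes and returns the item at the front, if any.
func (f *fifo) dequeue() (string, bool) {
	if len(f.items) == 0 {
		return "", false
	}
	v := f.items[0]
	f.items = f.items[1:]
	return v, true
}

func main() {
	q := &fifo{maxLen: 2}
	_ = q.enqueueBack("q1")
	_ = q.enqueueBack("q2")

	item, _ := q.dequeue()  // a consumer takes q1
	_ = q.enqueueBack("q3") // meanwhile the queue fills up again
	q.enqueueFront(item)    // the consumer failed, so q1 returns to the front

	next, _ := q.dequeue()
	fmt.Println(next, len(q.items)) // q1 2
}
```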

func (*TreeQueue) IsEmpty

func (q *TreeQueue) IsEmpty() bool

func (*TreeQueue) ItemCount

func (q *TreeQueue) ItemCount() int

ItemCount counts the queue items in the TreeQueue node and in all its children, recursively.

func (*TreeQueue) LocalQueueLen

func (q *TreeQueue) LocalQueueLen() int

func (*TreeQueue) NodeCount

func (q *TreeQueue) NodeCount() int

NodeCount counts the TreeQueue node and all its children, recursively.
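
The difference between the local and the recursive counts can be sketched as follows. The node type is a hypothetical stand-in, and the method names mirror LocalQueueLen, ItemCount, and NodeCount for illustration only.

```go
package main

import "fmt"

// node is a hypothetical, minimal stand-in for a TreeQueue node.
type node struct {
	local    []any
	children []*node
}

// localQueueLen counts only the items held directly by this node.
func (n *node) localQueueLen() int { return len(n.local) }

// itemCount counts items in this node and in all of its descendants.
func (n *node) itemCount() int {
	count := len(n.local)
	for _, c := range n.children {
		count += c.itemCount()
	}
	return count
}

// nodeCount counts this node and all of its descendants.
func (n *node) nodeCount() int {
	count := 1
	for _, c := range n.children {
		count += c.nodeCount()
	}
	return count
}

func main() {
	leaf := &node{local: []any{"c1", "c2"}}
	mid := &node{local: []any{"b1"}, children: []*node{leaf}}
	root := &node{children: []*node{mid}}

	fmt.Println(root.localQueueLen(), root.itemCount(), root.nodeCount()) // 0 3 3
}
```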

type UserIndex

type UserIndex struct {
	// contains filtered or unexported fields
}

UserIndex is an opaque type that allows resuming iteration over users between successive calls to RequestQueue.GetNextRequestForQuerier.

func FirstUser

func FirstUser() UserIndex

FirstUser returns a UserIndex that starts iteration over user queues from the very first user.

func (UserIndex) ReuseLastUser

func (ui UserIndex) ReuseLastUser() UserIndex

ReuseLastUser modifies the index so that iteration starts again at the same user whose queue was returned last.
