Documentation ¶
Index ¶
- Variables
- type QuerierID
- type QueueIndex
- type QueuePath
- type Request
- type RequestQueue
- func (q *RequestQueue) EnqueueRequestToDispatcher(tenantID string, req Request, maxQueriers int, successFn func()) error
- func (q *RequestQueue) GetConnectedQuerierWorkersMetric() float64
- func (q *RequestQueue) GetNextRequestForQuerier(ctx context.Context, last UserIndex, querierID string) (Request, UserIndex, error)
- func (q *RequestQueue) NotifyQuerierShutdown(querierID string)
- func (q *RequestQueue) RegisterQuerierConnection(querierID string)
- func (q *RequestQueue) UnregisterQuerierConnection(querierID string)
- type SchedulerRequest
- type TenantID
- type TreeQueue
- func (q *TreeQueue) Dequeue() any
- func (q *TreeQueue) DequeueByPath(childPath QueuePath) any
- func (q *TreeQueue) EnqueueBackByPath(childPath QueuePath, v any) error
- func (q *TreeQueue) EnqueueFrontByPath(childPath QueuePath, v any) error
- func (q *TreeQueue) IsEmpty() bool
- func (q *TreeQueue) ItemCount() int
- func (q *TreeQueue) LocalQueueLen() int
- func (q *TreeQueue) NodeCount() int
- type UserIndex
Constants ¶
This section is empty.
Variables ¶
Functions ¶
This section is empty.
Types ¶
type QueueIndex ¶
type QueueIndex int //nolint:revive // disallows types beginning with package name
type QueuePath ¶
type QueuePath []string //nolint:revive // disallows types beginning with package name
type RequestQueue ¶
RequestQueue holds incoming requests in per-user queues. It also assigns each user specified number of queriers, and when querier asks for next request to handle (using GetNextRequestForQuerier), it returns requests in a fair fashion.
func NewRequestQueue ¶
func NewRequestQueue( log log.Logger, maxOutstandingPerTenant int, additionalQueueDimensionsEnabled bool, forgetDelay time.Duration, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec, enqueueDuration prometheus.Histogram, ) *RequestQueue
func (*RequestQueue) EnqueueRequestToDispatcher ¶
func (q *RequestQueue) EnqueueRequestToDispatcher(tenantID string, req Request, maxQueriers int, successFn func()) error
EnqueueRequestToDispatcher handles a request from the query frontend and submits it to the initial dispatcher queue
maxQueries is tenant-specific value to compute which queriers should handle requests for this tenant. It is passed to each EnqueueRequestToDispatcher, because it can change between calls.
If request is successfully enqueued, successFn is called before any querier can receive the request.
func (*RequestQueue) GetConnectedQuerierWorkersMetric ¶
func (q *RequestQueue) GetConnectedQuerierWorkersMetric() float64
func (*RequestQueue) GetNextRequestForQuerier ¶
func (q *RequestQueue) GetNextRequestForQuerier(ctx context.Context, last UserIndex, querierID string) (Request, UserIndex, error)
GetNextRequestForQuerier find next user queue and takes the next request off of it. Will block if there are no requests. By passing user index from previous call of this method, querier guarantees that it iterates over all users fairly. If querier finds that request from the user is already expired, it can get a request for the same user by using UserIndex.ReuseLastUser.
func (*RequestQueue) NotifyQuerierShutdown ¶
func (q *RequestQueue) NotifyQuerierShutdown(querierID string)
func (*RequestQueue) RegisterQuerierConnection ¶
func (q *RequestQueue) RegisterQuerierConnection(querierID string)
func (*RequestQueue) UnregisterQuerierConnection ¶
func (q *RequestQueue) UnregisterQuerierConnection(querierID string)
type SchedulerRequest ¶
type SchedulerRequest struct { FrontendAddress string UserID string QueryID uint64 Request *httpgrpc.HTTPRequest StatsEnabled bool AdditionalQueueDimensions []string EnqueueTime time.Time Ctx context.Context CancelFunc context.CancelCauseFunc QueueSpan opentracing.Span // This is only used for testing. ParentSpanContext opentracing.SpanContext }
type TreeQueue ¶
type TreeQueue struct {
// contains filtered or unexported fields
}
TreeQueue is a hierarchical queue implementation with an arbitrary amount of child queues.
TreeQueue internally maintains round-robin fair queuing across all of its queue dimensions. Each queuing dimension is modeled as a node in the tree, internally reachable through a QueuePath.
The QueuePath is an ordered array of strings describing the path through the tree to the node, which contains the FIFO local queue of all items enqueued for that queuing dimension.
When dequeuing from a given node, the node will round-robin equally between dequeuing directly from its own local queue and dequeuing recursively from its list of child TreeQueues. No queue at a given level of the tree is dequeued from consecutively unless all others at the same level of the tree are empty down to the leaf node.
func NewTreeQueue ¶
func (*TreeQueue) Dequeue ¶
Dequeue removes and returns an item from the front of the next nonempty queue node in the tree.
Dequeuing from a node follows the round-robin order of the node's childQueueOrder, dequeuing either from the node's localQueue or selecting the next child node in the order and recursively calling Dequeue on the child nodes until a nonempty queue is found.
Nodes that empty down to the leaf after being dequeued from are deleted as the recursion returns up the stack. This maintains structural guarantees relied on to make IsEmpty() non-recursive.
func (*TreeQueue) DequeueByPath ¶
DequeueByPath selects a child node by a given relative child path and calls Dequeue on the node.
While the child node will recursively clean up its own empty children during dequeue, nodes cannot delete themselves; DequeueByPath cleans up the child node as well if it is empty. This maintains structural guarantees relied on to make IsEmpty() non-recursive.
childPath is relative to the receiver node; pass a zero-length path to refer to the node itself.
func (*TreeQueue) EnqueueBackByPath ¶
EnqueueBackByPath enqueues an item in the back of the local queue of the node located at a given path through the tree; nodes for the path are created as needed.
childPath must be relative to the receiver node; providing a QueuePath beginning with the receiver/parent node name will create a child node of the same name as the parent.
func (*TreeQueue) EnqueueFrontByPath ¶
EnqueueFrontByPath enqueues an item in the front of the local queue of the node located at a given path through the tree; nodes for the path are created as needed.
Max queue length check is skipped; enqueueing to the front is intended only for items which were first enqueued to the back and then dequeued after reaching the front.
Re-enqueueing to the front is intended for cases where a queue consumer fails to complete operations on the dequeued item, but failure is not yet final, and the operations should be retried by a subsequent queue consumer.
childPath must be relative to the receiver node; providing a QueuePath beginning with the receiver/parent node name will create a child node of the same name as the parent.
func (*TreeQueue) ItemCount ¶
ItemCount counts the queue items in the TreeQueue node and in all its children, recursively.
func (*TreeQueue) LocalQueueLen ¶
type UserIndex ¶
type UserIndex struct {
// contains filtered or unexported fields
}
UserIndex is opaque type that allows to resume iteration over users between successive calls of RequestQueue.GetNextRequestForQuerier method.
func FirstUser ¶
func FirstUser() UserIndex
FirstUser returns UserIndex that starts iteration over user queues from the very first user.
func (UserIndex) ReuseLastUser ¶
ReuseLastUser modifies index to start iteration on the same user, for which last queue was returned.