service

package
v0.0.0-...-0f49ba3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 30, 2024 License: Apache-2.0 Imports: 38 Imported by: 0

Documentation

Index

Constants

View Source
const (
	PROMETHEUS_METRICS_NAME    = "__name__"
	PROMETHEUS_NATIVE_TAG_NAME = "tag"
	PROMETHEUS_TIME_COLUMNS    = "timestamp"
	PROMETHEUS_METRIC_VALUE    = "value"
	ENUM_TAG_SUFFIX            = "_enum"

	FUNCTION_TOPK    = "topk"
	FUNCTION_BOTTOMK = "bottomk"

	PROMETHEUS_WINDOW_FIRST_TIME  = "_first_timestamp"
	PROMETHEUS_WINDOW_FIRST_VALUE = "_first_value"
	PROMETHEUS_WINDOW_LAST_TIME   = "_last_timestamp"
)
View Source
const (
	EXT_METRICS_TABLE         = "metrics"
	PROMETHEUS_TABLE          = "samples"
	L4_FLOW_LOG_TABLE         = "l4_flow_log"
	L7_FLOW_LOG_TABLE         = "l7_flow_log"
	VTAP_APP_PORT_TABLE       = "vtap_app_port"
	VTAP_FLOW_PORT_TABLE      = "vtap_flow_port"
	VTAP_APP_EDGE_PORT_TABLE  = "vtap_app_edge_port"
	VTAP_FLOW_EDGE_PORT_TABLE = "vtap_flow_edge_port"
)
View Source
const (
	TIME_INDEX int = iota
	TAG_INDEX
	METRICS_INDEX
	LABELS_INDEX
	WINDOW_FIRST_TIME_INDEX
	WINDOW_FIRST_VALUE_INDEX
	WINDOW_LAST_TIME_INDEX
)

indexes to column indexes

View Source
const (
	LABEL_NAME_METRICS      = "__name__"
	DB_NAME_EXT_METRICS     = "ext_metrics"
	DB_NAME_DEEPFLOW_SYSTEM = "deepflow_system"
	DB_NAME_FLOW_METRICS    = "flow_metrics"
	TABLE_NAME_METRICS      = "metrics"
	TABLE_NAME_L7_FLOW_LOG  = "l7_flow_log"
	TABLE_NAME_SAMPLES      = "samples"
	METRICS_CATEGORY_TAG    = "Tag"
)
View Source
const (
	// map tag will be extract in other tags
	// e.g.: tag `k8s.label` will extract in tag `k8s.label.app` (or other)
	IGNORABLE_TAG_TYPE = "map"
)

Variables

View Source
var QueryFuncCall = map[string]QueryFunc{

	"avg_over_time":   simpleCallMatrixFunc("avg_over_time", "AAvg"),
	"count_over_time": simpleSelectMatrix("count_over_time", "Count(row)"),
	"last_over_time":  simpleCallMatrixFunc("last_over_time", "Last"),
	"max_over_time":   simpleCallMatrixFunc("max_over_time", "Max"),
	"min_over_time": func(metric string, query, order, group *[]string, req model.QueryRequest, queryType model.QueryType, handleLabelsMatch func(string) string) {

		*query = append(*query, fmt.Sprintf("`%s`", PROMETHEUS_NATIVE_TAG_NAME))
		*group = append(*group, fmt.Sprintf("`%s`", PROMETHEUS_NATIVE_TAG_NAME))

		if queryType == model.Instant {
			resetQueryInterval(query, 1, 0)
			*query = append(*query, fmt.Sprintf("%s(%s)", "Min", metric))
		} else if queryType == model.Range {
			interval := getRangeInterval(req, "min_over_time")

			resetQueryInterval(query, interval/1e3, getRangeOffset(req, interval)/1e3)
			*query = append(*query, fmt.Sprintf("Percentile(%s, 0)", metric))
		}
	},
	"stddev_over_time": func(metric string, query, order, group *[]string, req model.QueryRequest, queryType model.QueryType, handleLabelsMatch func(string) string) {
		*query = append(*query, fmt.Sprintf("`%s`", PROMETHEUS_NATIVE_TAG_NAME))
		*group = append(*group, fmt.Sprintf("`%s`", PROMETHEUS_NATIVE_TAG_NAME))

		if queryType == model.Instant {
			if len(*query) > 0 {
				(*query)[0] = fmt.Sprintf("%d AS %s", req.GetEnd()/1e3, PROMETHEUS_TIME_COLUMNS)
			} else {
				*query = append(*query, fmt.Sprintf("%d AS %s", req.GetEnd()/1e3, PROMETHEUS_TIME_COLUMNS))
			}
			*query = append(*query, fmt.Sprintf("%s(%s)", "Stddev", metric))
		} else if queryType == model.Range {

			if len(*query) > 0 {
				(*query)[0] = fmt.Sprintf("toUnixTimestamp(time) AS %s", PROMETHEUS_TIME_COLUMNS)
			} else {
				*query = append(*query, fmt.Sprintf("toUnixTimestamp(time) AS %s", PROMETHEUS_TIME_COLUMNS))
			}
			*query = append(*query, fmt.Sprintf("%s(%s)", "Last", metric))
		}
	},
	"sum_over_time":     simpleCallMatrixFunc("sum_over_time", "Sum"),
	"present_over_time": simpleSelectMatrix("present_over_time", "1"),
	"quantile_over_time": func(metric string, query, order, group *[]string, req model.QueryRequest, queryType model.QueryType, handleLabelsMatch func(string) string) {
		*group = append(*group, fmt.Sprintf("`%s`", PROMETHEUS_NATIVE_TAG_NAME))
		*query = append(*query, fmt.Sprintf("`%s`", PROMETHEUS_NATIVE_TAG_NAME))

		if queryType == model.Range {
			interval := getRangeInterval(req, "quantile_over_time")
			resetQueryInterval(query, interval/1e3, getRangeOffset(req, interval)/1e3)
		}

		quantile_param := req.GetFuncParam("quantile_over_time")
		*query = append(*query, fmt.Sprintf("%s(%s, %g)", "Percentile", metric, quantile_param))
	},

	"sum": simpleCallFunc("sum", "Sum"),
	"min": func(metric string, query, order, group *[]string, req model.QueryRequest, queryType model.QueryType, handleLabelsMatch func(string) string) {
		resetQueryInterval(query, 1, 0)
		*query = append(*query, fmt.Sprintf("%s as %s", _prometheus_tag_key, model.PROMETHEUS_LABELS_INDEX))
		*query = append(*query, fmt.Sprintf("%s(%s)", "Min", metric))
		*group = append(*group, model.PROMETHEUS_LABELS_INDEX)

		for _, tag := range req.GetGrouping("min") {
			*group = append(*group, handleLabelsMatch(tag))
		}
	},
	"max":          simpleCallFunc("max", "Max"),
	"avg":          simpleCallFunc("avg", "AAvg"),
	"stddev":       nil,
	"group":        simpleSelection("group", "1"),
	"count":        simpleSelection("count", "Count(row)"),
	"count_values": simpleCallFunc("count_values", "Last"),

	"topk": func(metric string, query, order, group *[]string, req model.QueryRequest, queryType model.QueryType, handleLabelsMatch func(string) string) {
		*group = append(*group, fmt.Sprintf("`%s`", PROMETHEUS_NATIVE_TAG_NAME))
		*query = append(*query, fmt.Sprintf("`%s`", PROMETHEUS_NATIVE_TAG_NAME))
		*query = append(*query, fmt.Sprintf("Max(%s)", metric))

	},

	"bottomk": nil,

	"quantile": func(metric string, query, order, group *[]string, req model.QueryRequest, queryType model.QueryType, handleLabelsMatch func(string) string) {
		*group = append(*group, model.PROMETHEUS_LABELS_INDEX)
		*query = append(*query, fmt.Sprintf("%s as %s", _prometheus_tag_key, model.PROMETHEUS_LABELS_INDEX))

		quantile_param := req.GetFuncParam("quantile")
		*query = append(*query, fmt.Sprintf("Percentile(%s, %g)", metric, quantile_param))
		for _, tag := range req.GetGrouping("quantile") {
			*group = append(*group, handleLabelsMatch(tag))
		}
	},

	"idelta":   nil,
	"delta":    nil,
	"increase": offloadRate("increase"),
	"irate": func(metric string, query, order, group *[]string, req model.QueryRequest, queryType model.QueryType, handleLabelsMatch func(string) string) {
		if queryType == model.Range {

			resetQueryInterval(query, int64(min_interval.Seconds()), getRangeOffset(req, min_interval.Milliseconds())/1e3)
		}

		*group = append(*group, fmt.Sprintf("`%s`", PROMETHEUS_NATIVE_TAG_NAME))
		*query = append(*query, fmt.Sprintf("`%s`", PROMETHEUS_NATIVE_TAG_NAME))

		*query = append(*query, fmt.Sprintf("Derivative(%s,%s)", metric, PROMETHEUS_NATIVE_TAG_NAME))
	},
	"rate": offloadRate("rate"),
}

NOTICE: query `value` should be LAST index of `query`, it will append `query` as value outside of QueryFuncCall

Functions

func NewPrometheusExecutor

func NewPrometheusExecutor(delta time.Duration) *prometheusExecutor

Types

type CtxKeyShowTag

type CtxKeyShowTag struct{}

define `showtag` flag, it passed when and only api/v1/series been called

type OffloadQuerier

type OffloadQuerier struct {
	// contains filtered or unexported fields
}

func (*OffloadQuerier) Close

func (q *OffloadQuerier) Close() error

func (*OffloadQuerier) LabelNames

func (o *OffloadQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error)

func (*OffloadQuerier) LabelValues

func (o *OffloadQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error)

func (*OffloadQuerier) Select

func (o *OffloadQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet

type OffloadQuerierable

type OffloadQuerierable struct {
	// contains filtered or unexported fields
}

func NewOffloadQueriable

func NewOffloadQueriable(args *model.PromQueryParams, opts ...OffloadQuerierableOpts) *OffloadQuerierable

func (*OffloadQuerierable) AfterQueryExec

func (o *OffloadQuerierable) AfterQueryExec(promql.Query)

func (*OffloadQuerierable) BindSelectedCallBack

func (o *OffloadQuerierable) BindSelectedCallBack(q promql.Query)

func (*OffloadQuerierable) GetSQLQuery

func (o *OffloadQuerierable) GetSQLQuery() []model.PromQueryStats

func (*OffloadQuerierable) Querier

func (o *OffloadQuerierable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error)

type OffloadQuerierableOpts

type OffloadQuerierableOpts func(*OffloadQuerierable)

func WithKeyGenerator

func WithKeyGenerator(generator func(model.QueryRequest) string) OffloadQuerierableOpts

func WithPrometheuReader

func WithPrometheuReader(reader *prometheusReader) OffloadQuerierableOpts

func WithQueryRequests

func WithQueryRequests(queryReq []model.QueryRequest) OffloadQuerierableOpts

func WithQueryType

func WithQueryType(queryType model.QueryType) OffloadQuerierableOpts

type PrometheusService

type PrometheusService struct {

	// prometheus query rate limit
	QPSLeakyBucket *datastructure.LeakyBucket
	// contains filtered or unexported fields
}

func NewPrometheusService

func NewPrometheusService() *PrometheusService

func (*PrometheusService) FormatData

func (s *PrometheusService) FormatData(data *model.PromQueryWrapper) *bytes.Buffer

func (*PrometheusService) PromInstantQueryService

func (s *PrometheusService) PromInstantQueryService(args *model.PromQueryParams, ctx context.Context) (*model.PromQueryResponse, error)

func (*PrometheusService) PromLabelValuesService

func (s *PrometheusService) PromLabelValuesService(args *model.PromMetaParams, ctx context.Context) (*model.PromQueryResponse, error)

func (*PrometheusService) PromQLAdapter

func (*PrometheusService) PromQLAnalysis

func (s *PrometheusService) PromQLAnalysis(ctx context.Context, metric string, targetLabels []string, appLabels []string, startTime string, endTime string) (*common.Result, error)

func (*PrometheusService) PromQLParse

func (s *PrometheusService) PromQLParse(query string) (*model.PromQueryWrapper, error)

func (*PrometheusService) PromQLParseFilter

func (s *PrometheusService) PromQLParseFilter(query string, filters map[string]string) (*model.PromQueryWrapper, error)

func (*PrometheusService) PromRangeQueryService

func (s *PrometheusService) PromRangeQueryService(args *model.PromQueryParams, ctx context.Context) (*model.PromQueryResponse, error)

func (*PrometheusService) PromRemoteReadService

func (s *PrometheusService) PromRemoteReadService(req *prompb.ReadRequest, ctx context.Context, offloading bool) (resp *prompb.ReadResponse, err error)

func (*PrometheusService) PromSeriesQueryService

func (s *PrometheusService) PromSeriesQueryService(args *model.PromQueryParams, ctx context.Context) (*model.PromQueryResponse, error)

type QueryFunc

type QueryFunc func(metric string, query, order, group *[]string, req model.QueryRequest, queryType model.QueryType, handleLabelsMatch func(string) string)

type QueryHint

type QueryHint struct {
	// contains filtered or unexported fields
}

use QueryHint to get multiple `func` in promql query

func (*QueryHint) GetBy

func (q *QueryHint) GetBy() bool

func (*QueryHint) GetEnd

func (q *QueryHint) GetEnd() int64

func (*QueryHint) GetFunc

func (q *QueryHint) GetFunc() []string

func (*QueryHint) GetFuncParam

func (q *QueryHint) GetFuncParam(f string) float64

func (*QueryHint) GetGrouping

func (q *QueryHint) GetGrouping(f string) []string

func (*QueryHint) GetLabels

func (q *QueryHint) GetLabels() []*labels.Matcher

func (*QueryHint) GetMetric

func (q *QueryHint) GetMetric() string

func (*QueryHint) GetQuery

func (q *QueryHint) GetQuery() string

func (*QueryHint) GetRange

func (q *QueryHint) GetRange(f string) int64

func (*QueryHint) GetStart

func (q *QueryHint) GetStart() int64

func (*QueryHint) GetStep

func (q *QueryHint) GetStep() int64

func (*QueryHint) GetSubStep

func (q *QueryHint) GetSubStep(f string) int64

type RemoteReadQuerier

type RemoteReadQuerier struct {
	Args        *model.PromQueryParams
	Ctx         context.Context
	Querierable *RemoteReadQuerierable
	// contains filtered or unexported fields
}

func (*RemoteReadQuerier) Close

func (q *RemoteReadQuerier) Close() error

func (*RemoteReadQuerier) LabelNames

func (q *RemoteReadQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error)

func (*RemoteReadQuerier) LabelValues

func (q *RemoteReadQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error)

func (*RemoteReadQuerier) Select

func (q *RemoteReadQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet

type RemoteReadQuerierable

type RemoteReadQuerierable struct {
	Args *model.PromQueryParams
	Ctx  context.Context
	// contains filtered or unexported fields
}

func (*RemoteReadQuerierable) AfterQueryExec

func (q *RemoteReadQuerierable) AfterQueryExec(qry promql.Query)

func (*RemoteReadQuerierable) BindSelectedCallBack

func (q *RemoteReadQuerierable) BindSelectedCallBack(qry promql.Query)

func (*RemoteReadQuerierable) GetSQLQuery

func (q *RemoteReadQuerierable) GetSQLQuery() []model.PromQueryStats

func (*RemoteReadQuerierable) Querier

func (q *RemoteReadQuerierable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL