Documentation ¶
Index ¶
- Constants
- Variables
- func ErrorResponse(msg string, errCode string) []byte
- func GetGzipReader(r io.Reader) (*gzip.Reader, error)
- func GetMSByScrollID(id string) (int64, error)
- func IncQuerySkippingError(err error) bool
- func NewResponseLogger(w http.ResponseWriter) http.ResponseWriter
- func ParseCredentials(r *http.Request) (*credentials, error)
- func Points2Rows(points []models.Point) ([]influx.Row, error)
- func PutGzipReader(zr *gzip.Reader)
- func QuerySkippingError(err string) bool
- func ReadRequestToInfluxQuery(req *prompb.ReadRequest) (string, error)
- func TagsConverterRemoveInfluxSystemTag(tags map[string]string) models.Tags
- func TransYaccSyntaxErr(errorInfo string) string
- func ValidateLogStream(streamName string) error
- func ValidateRepoAndLogStream(repoName, streamName string) error
- func ValidateRepository(repoName string) error
- type AuthenticationMethod
- type Fragment
- type GzipWriterPool
- type Handler
- func (h *Handler) AddRoutes(routes ...Route)
- func (h *Handler) Close()
- func (h *Handler) IsWriteNode() bool
- func (h *Handler) Open()
- func (h *Handler) ResponseAggLogQuery(w http.ResponseWriter, para *QueryParam, now time.Time, hist []Histograms, ...) (b []byte, finished bool)
- func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request)
- func (h *Handler) ValidateAndCheckLogStreamExists(repoName, streamName string) error
- type HighlightFragment
- type Histograms
- type JsonMapping
- type LogDataType
- type LogResponse
- type LogWriteRequest
- type QueryAggRequest
- type QueryLogAggResponse
- type QueryLogAggResponseEntry
- func (e *QueryLogAggResponseEntry) GetKey() string
- func (e *QueryLogAggResponseEntry) GetTime() time.Time
- func (e *QueryLogAggResponseEntry) GetValue() interface{}
- func (e *QueryLogAggResponseEntry) SetTime(time time.Time)
- func (e *QueryLogAggResponseEntry) SetValue(value interface{})
- func (e *QueryLogAggResponseEntry) Size() int64
- type QueryLogAnalyticsResponse
- type QueryLogRequest
- type QueryLogResponse
- type QueryParam
- type Response
- type ResponseWriter
- type Route
- type Service
- type SubscriberManager
- type Throttler
- type TimeRange
Constants ¶
const ( // DefaultChunkSize specifies the maximum number of points that will // be read before sending results back to the engine. // // This has no relation to the number of bytes that are returned. DefaultChunkSize = 10000 MaxChunkSize = DefaultChunkSize * 50 DefaultInnerChunkSize = 1024 MaxInnerChunkSize = 4096 DefaultDebugRequestsInterval = 10 * time.Second MaxDebugRequestsInterval = 6 * time.Hour )
const ( // LogReqErr default error LogReqErr = "CSSOP.00050001" LogRetryTag = "symbol:repeatLog" )
const ( MaxTtl int64 = 3000 PermanentSaveTtl int64 = 3650 ScannerBufferSize int = 10 * 1024 * 1024 MaxRequestBodyLength int64 = 100 * 1024 * 1024 UnixTimestampMaxMs int64 = 4102416000000 UnixTimestampMinMs int64 = 1e10 NewlineLen int64 = 1 TagsSplitterChar = byte(6) MaxRowLen = 3500 DefaultAggLogQueryTimeout = 500 IncAggLogQueryRetryCount int = 3 DefaultMaxLogStoreAnalyzeResponseNum = 100 MaxSplitCharLen int = 128 )
const ( JSON LogDataType = iota JSONV2 TAGS = "tags" Tag = "tag" CONTENT = "content" TIME = "time" )
const ( EmptyValue = "" Reverse = "reverse" TimeoutMs = "timeout_ms" Explain = "explain" IsTruncate = "is_truncate" From = "from" To = "to" Scroll = "scroll" ScrollId = "scroll_id" Limit = "limit" Highlight = "highlight" Sql = "sql" Select = "select" Query = "query" Https = "https" Http = "Http" )
Query parameter
const ( ErrSyntax = "syntax error" ErrParsingQuery = "error parsing query" ErrShardGroupNotFound = "log shard group not found" ErrShardNotFound = "shard not found" )
Error message substrings
const ( Timestamp = "timestamp" Cursor = "cursor" IsOverflow = "is_overflow" )
Query result fields
const ( DefaultLogLimit = 10 MaxLogLimit = 1000 MinLogLimit = 0 MaxQueryLen = 2048 DefaultLogQueryTimeout = 10000 MaxTimeoutMs = 60000 MinTimeoutMs = 1000 MaxToValue = 9223372036854775807 MinFromValue = 0 MaxScrollIdLen = 400 MinScrollIdLen = 10 )
const ( Repository = ":repository" LogStream = ":logStream" Complete = "Complete" InComplete = "InComplete" XContentLength = "X-Content-Length" LogProxy = "Log-Proxy" Group = "group" Count = "count" )
URL query parameter
const ( IncIterNumCacheSize int64 = 1 * 1024 * 1024 QueryMetaCacheTTL = 10 * time.Minute QueryLogAggResponseEntrySize = 343 )
Variables ¶
var ( ErrLogRepoEmpty = errors.New("repository name should not be none") ErrLogStreamEmpty = errors.New("logstream name should not be none") ErrLogStreamDeleted = errors.New("logstrem being deleted") ErrLogStreamInvalid = errors.New("logstrem invalid in retentionPolicy") ErrInvalidRepoName = errors.New("invalid repository name") ErrInvalidLogStreamName = errors.New("invalid logstream name") ErrInvalidWriteNode = errors.New("this data node is not used for writing") )
Errors returned for bad requests
var ( // ErrBearerAuthDisabled is returned when client specifies bearer auth in // a request but bearer auth is disabled. ErrBearerAuthDisabled = errors.New("bearer auth disabld") )
var (
LogMax = 1000
)
var QueryAggResultCache = cache.NewCache(IncIterNumCacheSize, QueryMetaCacheTTL)
Functions ¶
func ErrorResponse ¶
func GetMSByScrollID ¶
func IncQuerySkippingError ¶
func NewResponseLogger ¶
func NewResponseLogger(w http.ResponseWriter) http.ResponseWriter
func ParseCredentials ¶
ParseCredentials parses a request and returns the authentication credentials. The credentials may be present as URL query params, or as a Basic Authentication header. As params: http://127.0.0.1/query?u=username&p=password As basic auth: http://username:password@127.0.0.1 As Bearer token in Authorization header: Bearer <JWT_TOKEN_BLOB> As Token in Authorization header: Token <username:password>
func PutGzipReader ¶
PutGzipReader gives back a gzip reader previously obtained via GetGzipReader.
func QuerySkippingError ¶
func ReadRequestToInfluxQuery ¶
func ReadRequestToInfluxQuery(req *prompb.ReadRequest) (string, error)
func TransYaccSyntaxErr ¶
func ValidateLogStream ¶
func ValidateRepository ¶
Types ¶
type AuthenticationMethod ¶
type AuthenticationMethod int
AuthenticationMethod defines the type of authentication used.
const ( // Authenticate using basic authentication. UserAuthentication AuthenticationMethod = iota // Authenticate with jwt. BearerAuthentication )
Supported authentication methods.
type GzipWriterPool ¶
type GzipWriterPool struct {
// contains filtered or unexported fields
}
func NewGzipWriterPool ¶
func NewGzipWriterPool() *GzipWriterPool
func (*GzipWriterPool) Get ¶
func (p *GzipWriterPool) Get() *gzip.Writer
func (*GzipWriterPool) Put ¶
func (p *GzipWriterPool) Put(gz *gzip.Writer)
type Handler ¶
type Handler struct { Version string BuildType string MetaClient interface { Database(name string) (*meta2.DatabaseInfo, error) Authenticate(username, password string) (ui meta2.User, err error) User(username string) (meta2.User, error) AdminUserExists() bool ShowShards() models.Rows TagArrayEnabled(db string) bool DataNode(id uint64) (*meta2.DataNode, error) DataNodes() ([]meta2.DataNode, error) CreateDatabase(name string, enableTagArray bool, replicaN uint32, options *obs.ObsOptions) (*meta2.DatabaseInfo, error) Databases() map[string]*meta2.DatabaseInfo MarkDatabaseDelete(name string) error Measurements(database string, ms influxql.Measurements) ([]string, error) CreateRetentionPolicy(database string, spec *meta2.RetentionPolicySpec, makeDefault bool) (*meta2.RetentionPolicyInfo, error) RetentionPolicy(database, name string) (rpi *meta2.RetentionPolicyInfo, err error) MarkRetentionPolicyDelete(database, name string) error CreateMeasurement(database, retentionPolicy, mst string, shardKey *meta2.ShardKeyInfo, indexR *influxql.IndexRelation, engineType config2.EngineType, colStoreInfo *meta2.ColStoreInfo, schemaInfo []*proto2.FieldSchema, options *meta2.Options) (*meta2.MeasurementInfo, error) UpdateMeasurement(db, rp, mst string, options *meta2.Options) error GetShardGroupByTimeRange(repoName, streamName string, min, max time.Time) ([]*meta2.ShardGroupInfo, error) } QueryAuthorizer interface { AuthorizeQuery(u meta2.User, query *influxql.Query, database string) error } WriteAuthorizer interface { AuthorizeWrite(username, database string) error } ExtSysCtrl interface { SendSysCtrlOnNode(nodID uint64, req netstorage.SysCtrlRequest) (map[string]string, error) } QueryExecutor *query2.Executor Monitor interface { } PointsWriter interface { RetryWritePointRows(database, retentionPolicy string, points []influx.Row) error } RecordWriter interface { RetryWriteLogRecord(database, retentionPolicy, measurement string, rec *record.Record) error } SubscriberManager Config 
*config.Config Logger *logger.Logger CLFLogger *zap.Logger StatisticsPusher *statisticsPusher.StatisticsPusher // contains filtered or unexported fields }
Handler represents an HTTP handler for the InfluxDB server.
func NewHandler ¶
NewHandler returns a new instance of handler with routes.
func (*Handler) IsWriteNode ¶
func (*Handler) ResponseAggLogQuery ¶
func (h *Handler) ResponseAggLogQuery(w http.ResponseWriter, para *QueryParam, now time.Time, hist []Histograms, count int64) (b []byte, finished bool)
func (*Handler) ServeHTTP ¶
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP responds to HTTP requests made to the handler.
func (*Handler) ValidateAndCheckLogStreamExists ¶
type HighlightFragment ¶
type Histograms ¶
func GenZeroHistogram ¶
func GenZeroHistogram(opt query2.ProcessorOptions, start, end int64, ascending bool) []Histograms
type JsonMapping ¶
type JsonMapping struct {
// contains filtered or unexported fields
}
type LogDataType ¶
type LogDataType uint8
type LogResponse ¶
type LogWriteRequest ¶
type LogWriteRequest struct {
// contains filtered or unexported fields
}
type QueryAggRequest ¶
type QueryAggRequest struct { Query string `json:"query,omitempty"` Timeout int `json:"timeout_ms,omitempty"` From int64 `json:"from,omitempty"` To int64 `json:"to,omitempty"` Scroll string `json:"scroll,omitempty"` Scroll_id string `json:"scroll_id,omitempty"` Sql bool `json:"sql,omitempty"` IncQuery bool Explain bool `json:"explain,omitempty"` }
type QueryLogAggResponse ¶
type QueryLogAggResponse struct { Success bool `json:"success,omitempty"` Code string `json:"code,omitempty"` Message string `json:"message,omitempty"` Request_id string `json:"request_id,omitempty"` Count int64 `json:"total_size"` Progress string `json:"progress,omitempty"` Histograms []Histograms `json:"histograms,omitempty"` Took_ms int64 `json:"took_ms,omitempty"` Scroll_id string `json:"scroll_id,omitempty"` Explain string `json:"explain,omitempty"` }
type QueryLogAggResponseEntry ¶
type QueryLogAggResponseEntry struct {
// contains filtered or unexported fields
}
func NewQueryLogAggResponse ¶
func NewQueryLogAggResponse(query string) *QueryLogAggResponseEntry
func (*QueryLogAggResponseEntry) GetKey ¶
func (e *QueryLogAggResponseEntry) GetKey() string
func (*QueryLogAggResponseEntry) GetTime ¶
func (e *QueryLogAggResponseEntry) GetTime() time.Time
func (*QueryLogAggResponseEntry) GetValue ¶
func (e *QueryLogAggResponseEntry) GetValue() interface{}
func (*QueryLogAggResponseEntry) SetTime ¶
func (e *QueryLogAggResponseEntry) SetTime(time time.Time)
func (*QueryLogAggResponseEntry) SetValue ¶
func (e *QueryLogAggResponseEntry) SetValue(value interface{})
func (*QueryLogAggResponseEntry) Size ¶
func (e *QueryLogAggResponseEntry) Size() int64
type QueryLogAnalyticsResponse ¶
type QueryLogAnalyticsResponse struct { Success bool `json:"success,omitempty"` Code string `json:"code,omitempty"` Message string `json:"message,omitempty"` Request_id string `json:"request_id,omitempty"` Count int64 `json:"total_size"` Progress string `json:"progress,omitempty"` Took_ms int64 `json:"took_ms,omitempty"` Scroll_id string `json:"scroll_id,omitempty"` GroupInfo []string `json:"groupInfo,omitempty"` Dataset [][]string `json:"dataset,omitempty"` }
type QueryLogRequest ¶
type QueryLogRequest struct { Explain bool `json:"explain,omitempty"` Query string `json:"query,omitempty"` Reverse bool `json:"reverse,omitempty"` Timeout int `json:"timeout_ms,omitempty"` From int64 `json:"from,omitempty"` To int64 `json:"to,omitempty"` Scroll string `json:"scroll,omitempty"` Scroll_id string `json:"scroll_id,omitempty"` Limit int `json:"limit,omitempty"` Highlight bool `json:"highlight,omitempty"` Sql bool `json:"sql,omitempty"` IsTruncate bool `json:"is_truncate,omitempty"` }
type QueryLogResponse ¶
type QueryLogResponse struct { Success bool `json:"success,omitempty"` Code string `json:"code,omitempty"` Message string `json:"message,omitempty"` Request_id string `json:"request_id,omitempty"` Count int64 `json:"count,omitempty"` Progress string `json:"progress,omitempty"` Logs []map[string]interface{} `json:"logs,omitempty"` Took_ms int64 `json:"took_ms,omitempty"` Cursor_time int64 `json:"cursor_time,omitempty"` Complete_progress float64 `json:"complete_progress,omitempty"` Scroll_id string `json:"scroll_id,omitempty"` Explain string `json:"explain,omitempty"` }
type QueryParam ¶
type QueryParam struct { Ascending bool Explain bool Highlight bool IncQuery bool Truncate bool IterID int32 Timeout int Limit int SeqID int64 Process float64 Scroll string Query string Scroll_id string // QueryID-IterID QueryID string TimeRange TimeRange GroupBytInterval time.Duration }
func NewQueryPara ¶
func NewQueryParam ¶
func NewQueryParam(queryPara interface{}) *QueryParam
type Response ¶
Response represents a list of statement results.
func (*Response) Error ¶
Error returns the first error from any statement. Returns nil if no errors occurred on any statements.
func (Response) MarshalJSON ¶
MarshalJSON encodes a Response struct into JSON.
func (*Response) UnmarshalJSON ¶
UnmarshalJSON decodes the data into the Response struct.
type ResponseWriter ¶
type ResponseWriter interface { // WriteResponse writes a response. WriteResponse(resp Response) (int, error) http.ResponseWriter }
ResponseWriter is an interface for writing a response.
func NewResponseWriter ¶
func NewResponseWriter(w http.ResponseWriter, r *http.Request) ResponseWriter
NewResponseWriter creates a new ResponseWriter based on the Accept header in the request that wraps the ResponseWriter.
type Route ¶
type Route struct { Name string Method string Pattern string Gzipped bool LoggingEnabled bool HandlerFunc interface{} }
Route specifies how to handle a HTTP verb for a given endpoint.
type Service ¶
type Service struct { Ln []net.Listener Handler *Handler Logger *zap.Logger // contains filtered or unexported fields }
Service manages the listener and handler for an HTTP endpoint.
func NewService ¶
NewService returns a new instance of Service.
func (*Service) Addr ¶
Addr returns the listener's address, or nil if the listener is closed. It is intended for tests, so it returns the address of the listener at index 0.
func (*Service) BoundHTTPAddr ¶
BoundHTTPAddr returns the string version of the address that the HTTP server is listening on. This is useful if you start an ephemeral server in a test with the bind address localhost:0. It is intended for tests, so it returns the address of the listener at index 0.
func (*Service) Openlistener ¶
type SubscriberManager ¶
type Throttler ¶
type Throttler struct { // Maximum amount of time requests can wait in queue. // Must be set before adding middleware. EnqueueTimeout time.Duration Logger *zap.Logger // contains filtered or unexported fields }
Throttler represents an HTTP throttler that limits the number of concurrent requests being processed as well as the number of enqueued requests.
func NewThrottler ¶
NewThrottler returns a new instance of Throttler that limits processing to concurrentN requests at a time, with up to maxEnqueueN requests waiting to be processed.