httpd

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 29, 2024 License: Apache-2.0, MIT Imports: 82 Imported by: 0

Documentation

Index

Constants

View Source
// Chunk-size and debug-request-interval limits.
const (
	// DefaultChunkSize specifies the maximum number of points that will
	// be read before sending results back to the engine.
	//
	// This has no relation to the number of bytes that are returned.
	DefaultChunkSize = 10000

	// MaxChunkSize is 50 times DefaultChunkSize, i.e. 500000 points.
	MaxChunkSize = DefaultChunkSize * 50

	// DefaultInnerChunkSize is 1024.
	// NOTE(review): presumably the default size of an internal chunk; the
	// code that consumes it is not visible here - confirm.
	DefaultInnerChunkSize = 1024

	// MaxInnerChunkSize is 4096, four times DefaultInnerChunkSize.
	MaxInnerChunkSize = 4096

	// DefaultDebugRequestsInterval is a time.Duration constant of 10 seconds.
	DefaultDebugRequestsInterval = 10 * time.Second

	// MaxDebugRequestsInterval is a time.Duration constant of 6 hours.
	MaxDebugRequestsInterval = 6 * time.Hour
)
View Source
const (
	// LogReqErr is the default error value for log requests.
	// NOTE(review): the value looks like an error code of the form
	// "<service>.<number>"; presumably it is what gets passed as errCode
	// to ErrorResponse - confirm against callers.
	LogReqErr = "CSSOP.00050001"

	// LogRetryTag is the literal string "symbol:repeatLog".
	// NOTE(review): presumably a marker attached to retried log writes;
	// its consumer is not visible here - confirm.
	LogRetryTag = "symbol:repeatLog"
)
View Source
const (
	// Limits and sizes used when handling log requests:
	//   - MaxTtl (3000) and PermanentSaveTtl (3650) are TTL values.
	//     NOTE(review): the unit is presumably days (3650 is about ten
	//     years) - confirm.
	//   - ScannerBufferSize is 10 * 1024 * 1024 (10485760); presumably bytes.
	//   - MaxRequestBodyLength is 100 * 1024 * 1024 (104857600); presumably
	//     bytes.
	//   - UnixTimestampMaxMs is 4102416000000 ms after the Unix epoch, which
	//     is 2099-12-31T16:00:00Z (2100-01-01 00:00:00 at UTC+8).
	//   - UnixTimestampMinMs is 1e10 ms after the Unix epoch, which is
	//     1970-04-26T17:46:40Z.
	//   - NewlineLen is 1; presumably the byte length of a "\n" separator.
	//   - TagsSplitterChar is the control byte 0x06 (ASCII ACK).
	MaxTtl               int64 = 3000
	PermanentSaveTtl     int64 = 3650
	ScannerBufferSize    int   = 10 * 1024 * 1024
	MaxRequestBodyLength int64 = 100 * 1024 * 1024
	UnixTimestampMaxMs   int64 = 4102416000000
	UnixTimestampMinMs   int64 = 1e10
	NewlineLen           int64 = 1
	TagsSplitterChar           = byte(6)

	// Row-length and aggregated-log-query settings.
	// NOTE(review): units are not visible here; DefaultAggLogQueryTimeout is
	// presumably milliseconds, like the other *TimeoutMs values in this
	// package - confirm.
	MaxRowLen                                = 3500
	DefaultAggLogQueryTimeout                = 500
	IncAggLogQueryRetryCount             int = 3
	DefaultMaxLogStoreAnalyzeResponseNum     = 100

	// MaxSplitCharLen is 128.
	// NOTE(review): presumably the maximum length of a user-supplied split
	// character set - confirm.
	MaxSplitCharLen int = 128
)
View Source
const (
	// JSON is the first LogDataType value (0, via iota).
	JSON LogDataType = iota
	// JSONV2 is the second LogDataType value (1).
	JSONV2

	// Untyped string constants. They carry explicit values, so they are not
	// part of the iota sequence above.
	// NOTE(review): presumably field names in a log payload - confirm.
	TAGS    = "tags"
	Tag     = "tag"
	CONTENT = "content"
	TIME    = "time"
)
View Source
// Query parameter names and related string values.
const (
	EmptyValue = ""
	Reverse    = "reverse"
	TimeoutMs  = "timeout_ms"
	Explain    = "explain"
	IsTruncate = "is_truncate"
	From       = "from"
	To         = "to"
	Scroll     = "scroll"
	ScrollId   = "scroll_id"
	Limit      = "limit"
	Highlight  = "highlight"
	Sql        = "sql"
	Select     = "select"
	Query      = "query"
	Https      = "https"
	Http       = "Http" // NOTE(review): capitalised, unlike Https ("https") - confirm this is intentional.
)

Query parameter

View Source
// Error-message substrings. These are plain string constants, not error
// values, so they cannot be used with errors.Is.
// NOTE(review): presumably matched against error text with a substring
// search - confirm against callers.
const (
	ErrSyntax             = "syntax error"
	ErrParsingQuery       = "error parsing query"
	ErrShardGroupNotFound = "log shard group not found"
	ErrShardNotFound      = "shard not found"
)

Substrings of error messages.

View Source
// Names of fields in a query result.
const (
	Timestamp  = "timestamp"
	Cursor     = "cursor"
	IsOverflow = "is_overflow"
)

Query result fields

View Source
// Default values and bounds for log query parameters.
const (
	// Default, maximum and minimum values for the log limit.
	DefaultLogLimit = 10
	MaxLogLimit     = 1000
	MinLogLimit     = 0

	// MaxQueryLen is 2048; DefaultLogQueryTimeout is 10000.
	// NOTE(review): MaxQueryLen is presumably a length in bytes or
	// characters, and DefaultLogQueryTimeout presumably milliseconds (it
	// lies between MinTimeoutMs and MaxTimeoutMs) - confirm.
	MaxQueryLen            = 2048
	DefaultLogQueryTimeout = 10000

	// Timeout bounds in milliseconds: 60000 (one minute) and 1000 (one
	// second).
	MaxTimeoutMs = 60000
	MinTimeoutMs = 1000

	// MaxToValue equals the largest int64 value (math.MaxInt64);
	// MinFromValue is 0.
	MaxToValue   = 9223372036854775807
	MinFromValue = 0

	// Bounds on the scroll ID length: 400 and 10.
	MaxScrollIdLen = 400
	MinScrollIdLen = 10
)
View Source
// String constants used when handling repository and log-stream requests.
// NOTE(review): the accompanying documentation calls these "URL query
// parameter", but the values mix colon-prefixed names (":repository",
// ":logStream"), header-style names ("X-Content-Length", "Log-Proxy") and
// plain words - confirm how each one is actually used.
const (
	Repository     = ":repository"
	LogStream      = ":logStream"
	Complete       = "Complete"
	InComplete     = "InComplete"
	XContentLength = "X-Content-Length"
	LogProxy       = "Log-Proxy"
	Group          = "group"
	Count          = "count"
)

URL query parameter

View Source
// Cache-related settings:
//   - IncIterNumCacheSize is 1 * 1024 * 1024 (1048576).
//     NOTE(review): the unit (bytes or entries) is not visible here - confirm.
//   - QueryMetaCacheTTL is a time.Duration constant of 10 minutes.
//   - QueryLogAggResponseEntrySize is 343.
//     NOTE(review): presumably the size reported for one
//     QueryLogAggResponseEntry - confirm.
const (
	IncIterNumCacheSize          int64 = 1 * 1024 * 1024
	QueryMetaCacheTTL                  = 10 * time.Minute
	QueryLogAggResponseEntrySize       = 343
)

Variables

View Source
// Sentinel errors returned for bad requests. Compare against them with
// errors.Is rather than by message text.
var (
	// ErrLogRepoEmpty and ErrLogStreamEmpty report a missing repository or
	// log stream name.
	ErrLogRepoEmpty   = errors.New("repository name should not be none")
	ErrLogStreamEmpty = errors.New("logstream name should not be none")

	// ErrLogStreamDeleted and ErrLogStreamInvalid report a log stream that
	// cannot be used. Their messages previously misspelled "logstream" as
	// "logstrem".
	ErrLogStreamDeleted = errors.New("logstream being deleted")
	ErrLogStreamInvalid = errors.New("logstream invalid in retentionPolicy")

	// ErrInvalidRepoName and ErrInvalidLogStreamName report a name that is
	// not valid.
	ErrInvalidRepoName      = errors.New("invalid repository name")
	ErrInvalidLogStreamName = errors.New("invalid logstream name")

	// ErrInvalidWriteNode reports that this data node is not used for
	// writing.
	ErrInvalidWriteNode = errors.New("this data node is not used for writing")
)

Errors returned for bad requests.

View Source
var (
	// ErrBearerAuthDisabled is returned when client specifies bearer auth in
	// a request but bearer auth is disabled.
	//
	// The message previously misspelled "disabled" as "disabld".
	ErrBearerAuthDisabled = errors.New("bearer auth disabled")
)
View Source
// LogMax is a package-level variable initialised to 1000. It is a var, not
// a const, so other code can reassign it.
// NOTE(review): what it bounds is not visible here - confirm against callers.
var LogMax = 1000

Functions

func ErrorResponse

func ErrorResponse(msg string, errCode string) []byte

func GetGzipReader

func GetGzipReader(r io.Reader) (*gzip.Reader, error)

func GetMSByScrollID

func GetMSByScrollID(id string) (int64, error)

func IncQuerySkippingError

func IncQuerySkippingError(err error) bool

func NewResponseLogger

func NewResponseLogger(w http.ResponseWriter) http.ResponseWriter

func ParseCredentials

func ParseCredentials(r *http.Request) (*credentials, error)

ParseCredentials parses a request and returns the authentication credentials. The credentials may be present as URL query params, as a Basic Authentication header, or as a Bearer token or Token in the Authorization header. As params: http://127.0.0.1/query?u=username&p=password. As basic auth: http://username:password@127.0.0.1. As Bearer token in Authorization header: Bearer <JWT_TOKEN_BLOB>. As Token in Authorization header: Token <username:password>.

func Points2Rows

func Points2Rows(points []models.Point) ([]influx.Row, error)

func PutGzipReader

func PutGzipReader(zr *gzip.Reader)

PutGzipReader returns back gzip reader obtained via GetGzipReader.

func QuerySkippingError

func QuerySkippingError(err string) bool

func ReadRequestToInfluxQuery

func ReadRequestToInfluxQuery(req *prompb.ReadRequest) (string, error)

func TagsConverterRemoveInfluxSystemTag

func TagsConverterRemoveInfluxSystemTag(tags map[string]string) models.Tags

func TransYaccSyntaxErr

func TransYaccSyntaxErr(errorInfo string) string

func ValidateLogStream

func ValidateLogStream(streamName string) error

func ValidateRepoAndLogStream

func ValidateRepoAndLogStream(repoName, streamName string) error

func ValidateRepository

func ValidateRepository(repoName string) error

Types

type AuthenticationMethod

type AuthenticationMethod int

AuthenticationMethod defines the type of authentication used.

// Supported authentication methods.
const (
	// UserAuthentication authenticates using basic authentication.
	// It is the zero value of AuthenticationMethod.
	UserAuthentication AuthenticationMethod = iota

	// BearerAuthentication authenticates with a JWT. Its value is 1.
	BearerAuthentication
)

Supported authentication methods.

type Fragment

// Fragment describes a span as a starting Offset and a Length.
// NOTE(review): presumably a byte range inside a log line used for
// highlighting; the unit and the consumer are not visible here - confirm.
type Fragment struct {
	Offset int
	Length int
}

type GzipWriterPool

type GzipWriterPool struct {
	// contains filtered or unexported fields
}

func NewGzipWriterPool

func NewGzipWriterPool() *GzipWriterPool

func (*GzipWriterPool) Get

func (p *GzipWriterPool) Get() *gzip.Writer

func (*GzipWriterPool) Put

func (p *GzipWriterPool) Put(gz *gzip.Writer)

type Handler

// Handler represents an HTTP handler for the InfluxDB server.
//
// Its exported fields are the handler's dependencies. Most are declared as
// anonymous interfaces listing only the methods the handler needs.
type Handler struct {
	// Version and BuildType are plain strings describing the build.
	Version   string
	BuildType string

	// MetaClient provides access to metadata: databases, users and
	// authentication, data nodes, retention policies, measurements and
	// shard groups.
	MetaClient interface {
		Database(name string) (*meta2.DatabaseInfo, error)
		Authenticate(username, password string) (ui meta2.User, err error)
		User(username string) (meta2.User, error)
		AdminUserExists() bool
		ShowShards() models.Rows
		TagArrayEnabled(db string) bool
		DataNode(id uint64) (*meta2.DataNode, error)
		DataNodes() ([]meta2.DataNode, error)

		CreateDatabase(name string, enableTagArray bool, replicaN uint32, options *obs.ObsOptions) (*meta2.DatabaseInfo, error)
		Databases() map[string]*meta2.DatabaseInfo
		MarkDatabaseDelete(name string) error
		Measurements(database string, ms influxql.Measurements) ([]string, error)

		CreateRetentionPolicy(database string, spec *meta2.RetentionPolicySpec, makeDefault bool) (*meta2.RetentionPolicyInfo, error)
		RetentionPolicy(database, name string) (rpi *meta2.RetentionPolicyInfo, err error)
		MarkRetentionPolicyDelete(database, name string) error
		CreateMeasurement(database, retentionPolicy, mst string, shardKey *meta2.ShardKeyInfo, indexR *influxql.IndexRelation, engineType config2.EngineType,
			colStoreInfo *meta2.ColStoreInfo, schemaInfo []*proto2.FieldSchema, options *meta2.Options) (*meta2.MeasurementInfo, error)
		UpdateMeasurement(db, rp, mst string, options *meta2.Options) error
		GetShardGroupByTimeRange(repoName, streamName string, min, max time.Time) ([]*meta2.ShardGroupInfo, error)
	}

	// QueryAuthorizer decides whether user u may run query against database.
	QueryAuthorizer interface {
		AuthorizeQuery(u meta2.User, query *influxql.Query, database string) error
	}

	// WriteAuthorizer decides whether username may write to database.
	WriteAuthorizer interface {
		AuthorizeWrite(username, database string) error
	}

	// ExtSysCtrl sends a system-control request to the node with the given ID
	// and returns a string map together with an error.
	ExtSysCtrl interface {
		SendSysCtrlOnNode(nodID uint64, req netstorage.SysCtrlRequest) (map[string]string, error)
	}

	// QueryExecutor is the executor used for queries.
	QueryExecutor *query2.Executor

	// Monitor declares no methods, so any value satisfies it.
	Monitor interface {
	}

	// PointsWriter writes point rows to a database and retention policy.
	// NOTE(review): the method name suggests it retries on failure - confirm
	// against the implementation.
	PointsWriter interface {
		RetryWritePointRows(database, retentionPolicy string, points []influx.Row) error
	}

	// RecordWriter writes a log record to a measurement.
	// NOTE(review): the method name suggests it retries on failure - confirm
	// against the implementation.
	RecordWriter interface {
		RetryWriteLogRecord(database, retentionPolicy, measurement string, rec *record.Record) error
	}

	// SubscriberManager is embedded, so its methods are promoted to Handler.
	SubscriberManager

	// Configuration and loggers.
	// NOTE(review): CLFLogger is presumably the Common Log Format access
	// logger - confirm.
	Config    *config.Config
	Logger    *logger.Logger
	CLFLogger *zap.Logger

	// StatisticsPusher is the handler's statistics pusher.
	StatisticsPusher *statisticsPusher.StatisticsPusher
	// contains filtered or unexported fields
}

Handler represents an HTTP handler for the InfluxDB server.

func NewHandler

func NewHandler(c config.Config) *Handler

NewHandler returns a new instance of handler with routes.

func (*Handler) AddRoutes

func (h *Handler) AddRoutes(routes ...Route)

AddRoutes sets the provided routes on the handler.

func (*Handler) Close

func (h *Handler) Close()

func (*Handler) IsWriteNode

func (h *Handler) IsWriteNode() bool

func (*Handler) Open

func (h *Handler) Open()

func (*Handler) ResponseAggLogQuery

func (h *Handler) ResponseAggLogQuery(w http.ResponseWriter, para *QueryParam, now time.Time, hist []Histograms, count int64) (b []byte, finished bool)

func (*Handler) ServeHTTP

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP responds to HTTP request to the handler.

func (*Handler) ValidateAndCheckLogStreamExists

func (h *Handler) ValidateAndCheckLogStreamExists(repoName, streamName string) error

type HighlightFragment

// HighlightFragment is a piece of text plus a flag saying whether it is
// highlighted. It is encoded to JSON under the keys "fragment" and
// "highlight"; neither key uses omitempty, so both are always emitted.
type HighlightFragment struct {
	Fragment  string `json:"fragment"`
	Highlight bool   `json:"highlight"`
}

type Histograms

// Histograms is one histogram entry with the JSON keys "from", "to" and
// "count". Despite the plural name, a value holds a single entry; lists are
// passed as []Histograms.
// NOTE(review): presumably From and To bound a time bucket and Count is the
// number of logs in it; the time unit is not visible here - confirm.
type Histograms struct {
	From  int64 `json:"from"`
	To    int64 `json:"to"`
	Count int64 `json:"count"`
}

func GenZeroHistogram

func GenZeroHistogram(opt query2.ProcessorOptions, start, end int64, ascending bool) []Histograms

type JsonMapping

type JsonMapping struct {
	// contains filtered or unexported fields
}

type LogDataType

// LogDataType is a uint8-based enumeration. This package declares the
// values JSON (0) and JSONV2 (1).
type LogDataType uint8

type LogResponse

// LogResponse carries an error code and an error message. It is encoded to
// JSON under the keys "error_code" and "error_msg", both always emitted.
type LogResponse struct {
	ErrorCode string `json:"error_code"`
	ErrorMsg  string `json:"error_msg"`
}

type LogWriteRequest

type LogWriteRequest struct {
	// contains filtered or unexported fields
}

type QueryAggRequest

// QueryAggRequest holds the parameters of an aggregation query request.
//
// Every tagged field uses omitempty, so zero values are left out when
// encoding to JSON. Timeout is carried under the key "timeout_ms".
//
// NOTE(review): IncQuery has no json tag, so encoding/json uses the key
// "IncQuery" for it - confirm whether it is meant to be part of the wire
// format at all.
// NOTE(review): From and To are presumably millisecond Unix timestamps -
// confirm.
type QueryAggRequest struct {
	Query     string `json:"query,omitempty"`
	Timeout   int    `json:"timeout_ms,omitempty"`
	From      int64  `json:"from,omitempty"`
	To        int64  `json:"to,omitempty"`
	Scroll    string `json:"scroll,omitempty"`
	Scroll_id string `json:"scroll_id,omitempty"`
	Sql       bool   `json:"sql,omitempty"`
	IncQuery  bool
	Explain   bool `json:"explain,omitempty"`
}

type QueryLogAggResponse

// QueryLogAggResponse is the JSON response for an aggregation query.
//
// Count is encoded under the key "total_size" and has no omitempty, so it is
// always present. Every other field uses omitempty; in particular a false
// Success is omitted from the output rather than written as "success":false.
type QueryLogAggResponse struct {
	Success    bool         `json:"success,omitempty"`
	Code       string       `json:"code,omitempty"`
	Message    string       `json:"message,omitempty"`
	Request_id string       `json:"request_id,omitempty"`
	Count      int64        `json:"total_size"`
	Progress   string       `json:"progress,omitempty"`
	Histograms []Histograms `json:"histograms,omitempty"`
	Took_ms    int64        `json:"took_ms,omitempty"`
	Scroll_id  string       `json:"scroll_id,omitempty"`
	Explain    string       `json:"explain,omitempty"`
}

type QueryLogAggResponseEntry

type QueryLogAggResponseEntry struct {
	// contains filtered or unexported fields
}

func NewQueryLogAggResponse

func NewQueryLogAggResponse(query string) *QueryLogAggResponseEntry

func (*QueryLogAggResponseEntry) GetKey

func (e *QueryLogAggResponseEntry) GetKey() string

func (*QueryLogAggResponseEntry) GetTime

func (e *QueryLogAggResponseEntry) GetTime() time.Time

func (*QueryLogAggResponseEntry) GetValue

func (e *QueryLogAggResponseEntry) GetValue() interface{}

func (*QueryLogAggResponseEntry) SetTime

func (e *QueryLogAggResponseEntry) SetTime(time time.Time)

func (*QueryLogAggResponseEntry) SetValue

func (e *QueryLogAggResponseEntry) SetValue(value interface{})

func (*QueryLogAggResponseEntry) Size

func (e *QueryLogAggResponseEntry) Size() int64

type QueryLogAnalyticsResponse

// QueryLogAnalyticsResponse is the JSON response for a log analytics query.
//
// Count is encoded under the key "total_size" and is always present; every
// other field uses omitempty. GroupInfo uses the camelCase key "groupInfo",
// unlike the snake_case keys of the other fields.
// NOTE(review): Dataset is presumably a table of rows of string cells, with
// GroupInfo naming the grouping columns - confirm.
type QueryLogAnalyticsResponse struct {
	Success    bool       `json:"success,omitempty"`
	Code       string     `json:"code,omitempty"`
	Message    string     `json:"message,omitempty"`
	Request_id string     `json:"request_id,omitempty"`
	Count      int64      `json:"total_size"`
	Progress   string     `json:"progress,omitempty"`
	Took_ms    int64      `json:"took_ms,omitempty"`
	Scroll_id  string     `json:"scroll_id,omitempty"`
	GroupInfo  []string   `json:"groupInfo,omitempty"`
	Dataset    [][]string `json:"dataset,omitempty"`
}

type QueryLogRequest

// QueryLogRequest holds the parameters of a log query request.
//
// Every field uses omitempty, so zero values are left out when encoding to
// JSON. Timeout is carried under the key "timeout_ms" and IsTruncate under
// "is_truncate".
// NOTE(review): From and To are presumably millisecond Unix timestamps -
// confirm.
type QueryLogRequest struct {
	Explain    bool   `json:"explain,omitempty"`
	Query      string `json:"query,omitempty"`
	Reverse    bool   `json:"reverse,omitempty"`
	Timeout    int    `json:"timeout_ms,omitempty"`
	From       int64  `json:"from,omitempty"`
	To         int64  `json:"to,omitempty"`
	Scroll     string `json:"scroll,omitempty"`
	Scroll_id  string `json:"scroll_id,omitempty"`
	Limit      int    `json:"limit,omitempty"`
	Highlight  bool   `json:"highlight,omitempty"`
	Sql        bool   `json:"sql,omitempty"`
	IsTruncate bool   `json:"is_truncate,omitempty"`
}

type QueryLogResponse

// QueryLogResponse is the JSON response for a log query.
//
// Every field uses omitempty. As a result a zero Count and a false Success
// are omitted from the encoded output instead of being written explicitly.
// Logs holds one map per returned log entry.
type QueryLogResponse struct {
	Success           bool                     `json:"success,omitempty"`
	Code              string                   `json:"code,omitempty"`
	Message           string                   `json:"message,omitempty"`
	Request_id        string                   `json:"request_id,omitempty"`
	Count             int64                    `json:"count,omitempty"`
	Progress          string                   `json:"progress,omitempty"`
	Logs              []map[string]interface{} `json:"logs,omitempty"`
	Took_ms           int64                    `json:"took_ms,omitempty"`
	Cursor_time       int64                    `json:"cursor_time,omitempty"`
	Complete_progress float64                  `json:"complete_progress,omitempty"`
	Scroll_id         string                   `json:"scroll_id,omitempty"`
	Explain           string                   `json:"explain,omitempty"`
}

type QueryParam

// QueryParam holds the parameters of a query. It has no JSON tags.
//
// NOTE(review): GroupBytInterval looks like a misspelling of
// "GroupByInterval"; it is an exported field, so renaming it would break
// callers - confirm before changing.
// NOTE(review): the unit of Timeout is presumably milliseconds, as in the
// "timeout_ms" request key - confirm.
type QueryParam struct {
	Ascending        bool
	Explain          bool
	Highlight        bool
	IncQuery         bool
	Truncate         bool
	IterID           int32
	Timeout          int
	Limit            int
	SeqID            int64
	Process          float64
	Scroll           string
	Query            string
	Scroll_id        string // QueryID-IterID
	QueryID          string
	TimeRange        TimeRange
	GroupBytInterval time.Duration
}

func NewQueryPara

func NewQueryPara(query string, ascending bool, highlight bool, timeout, limit int, from, to int64, scroll, scroll_id string,
	isIncQuery bool, explain bool, isTruncate bool) *QueryParam

func NewQueryParam

func NewQueryParam(queryPara interface{}) *QueryParam

type Response

// Response represents a list of statement results, together with an
// optional error held in Err.
type Response struct {
	Results []*query.Result
	Err     error
}

Response represents a list of statement results.

func (*Response) Error

func (r *Response) Error() error

Error returns the first error from any statement. Returns nil if no errors occurred on any statements.

func (Response) MarshalJSON

func (r Response) MarshalJSON() ([]byte, error)

MarshalJSON encodes a Response struct into JSON.

func (*Response) UnmarshalJSON

func (r *Response) UnmarshalJSON(b []byte) error

UnmarshalJSON decodes the data into the Response struct.

type ResponseWriter

// ResponseWriter is an interface for writing a response. It embeds
// http.ResponseWriter and adds WriteResponse for writing a Response value.
type ResponseWriter interface {
	// WriteResponse writes a response. It returns an int and an error.
	// NOTE(review): the int is presumably the number of bytes written -
	// confirm against the implementations.
	WriteResponse(resp Response) (int, error)

	http.ResponseWriter
}

ResponseWriter is an interface for writing a response.

func NewResponseWriter

func NewResponseWriter(w http.ResponseWriter, r *http.Request) ResponseWriter

NewResponseWriter creates a new ResponseWriter based on the Accept header in the request that wraps the ResponseWriter.

type Route

// Route specifies how to handle an HTTP verb for a given endpoint.
//
// HandlerFunc is declared as interface{}, so the compiler does not check
// its type.
// NOTE(review): the set of function signatures the handler accepts for
// HandlerFunc is not visible here - confirm before adding a route.
// NOTE(review): Gzipped and LoggingEnabled presumably switch gzip
// compression and request logging on for the route - confirm.
type Route struct {
	Name           string
	Method         string
	Pattern        string
	Gzipped        bool
	LoggingEnabled bool
	HandlerFunc    interface{}
}

Route specifies how to handle an HTTP verb for a given endpoint.

type Service

// Service manages the listener and handler for an HTTP endpoint.
type Service struct {
	// Ln holds the service's listeners. It is a slice, so a service may have
	// more than one.
	Ln []net.Listener

	// Handler is the HTTP handler of the service.
	Handler *Handler

	// Logger is the service's zap logger.
	Logger *zap.Logger
	// contains filtered or unexported fields
}

Service manages the listener and handler for an HTTP endpoint.

func NewService

func NewService(c config.Config) *Service

NewService returns a new instance of Service.

func (*Service) Addr

func (s *Service) Addr() net.Addr

Addr returns the listener's address. Returns nil if the listener is closed. It is intended for tests, so it returns the address of the listener at index 0.

func (*Service) BoundHTTPAddr

func (s *Service) BoundHTTPAddr() string

BoundHTTPAddr returns the string version of the address that the HTTP server is listening on. This is useful if you start an ephemeral server in a test with the bind address localhost:0. It is intended for tests, so it returns the address of the listener at index 0.

func (*Service) Close

func (s *Service) Close() error

Close closes the underlying listener.

func (*Service) Err

func (s *Service) Err() <-chan error

Err returns a channel for fatal errors that occur on the listener.

func (*Service) Open

func (s *Service) Open() error

Open starts the service.

func (*Service) Openlistener

func (s *Service) Openlistener(addr string) error

type SubscriberManager

// SubscriberManager accepts line-protocol data for a database (db) and
// retention policy (rp). Send returns nothing, so callers cannot observe
// failures through it.
// NOTE(review): presumably forwards written data to subscribers - confirm
// against the implementation.
type SubscriberManager interface {
	Send(db, rp string, lineProtocol []byte)
}

type Throttler

// Throttler represents an HTTP throttler that limits the number of
// concurrent requests being processed as well as the number of enqueued
// requests.
type Throttler struct {

	// Maximum amount of time requests can wait in queue.
	// Must be set before adding middleware.
	EnqueueTimeout time.Duration

	// Logger is the throttler's zap logger.
	Logger *zap.Logger
	// contains filtered or unexported fields
}

Throttler represents an HTTP throttler that limits the number of concurrent requests being processed as well as the number of enqueued requests.

func NewThrottler

func NewThrottler(concurrentN, maxEnqueueN int, rateValue int) *Throttler

NewThrottler returns a new instance of Throttler that limits to concurrentN requests processed at a time and maxEnqueueN requests waiting to be processed.

func (*Throttler) Handler

func (t *Throttler) Handler(h http.Handler) http.Handler

Handler wraps h in a middleware handler that throttles requests.

type TimeRange

type TimeRange struct {
	// contains filtered or unexported fields
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL