producer

package
v0.0.0-...-c1e5751 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 11, 2024 License: MIT Imports: 30 Imported by: 0

Documentation

Index

Constants

View Source
const (
	CompressLz4  = iota // 0
	CompressNone        // 1
	CompressGzip
)
View Source
const (
	BasicDateFormat     = "20060102T150405Z"
	HeaderXDate         = "X-Sdk-Date"
	HeaderHost          = "host"
	HeaderAuthorization = "Authorization"
	HeaderContentSha256 = "X-Sdk-Content-Sha256"
	DerivationAlgorithm = "V11-HMAC-SHA256"
	DerivedDateFormat   = "20060102"
)
View Source
const Delimiter = "|"
View Source
const (
	RequestIDHeader = "X-Request-Id"
)
View Source
const (
	TimeoutExecption = "TimeoutExecption"
)

Variables

View Source
var GlobalDebugLevel = 0
View Source
var RetryOnServerErrorEnabled = true

Functions

func CanonicalHeadersBasic

func CanonicalHeadersBasic(r *http.Request, signerHeaders []string) string

func CanonicalQueryStringBasic

func CanonicalQueryStringBasic(r *http.Request) string

func CanonicalRequestBasic

func CanonicalRequestBasic(r *http.Request, signedHeaders []string) (string, error)

func CanonicalURIBasic

func CanonicalURIBasic(r *http.Request) string

func DerivationAuthHeaderValue

func DerivationAuthHeaderValue(signature, accessKey string, info string, signedHeaders []string) string

func GetDerivationKey

func GetDerivationKey(accessKey string, secretKey string, info string) (string, error)

func GetLogListSize

func GetLogListSize(logList []*Log) int

func GetLogSizeCalculate

func GetLogSizeCalculate(log *Log) int

func GetTimeMs

func GetTimeMs(t int64) int64

func HexEncodeSHA256Hash

func HexEncodeSHA256Hash(body []byte) (string, error)

func IsDebugLevelMatched

func IsDebugLevelMatched(level int) bool

func RetryWithCondition

func RetryWithCondition(ctx context.Context, b backoff.BackOff, o ConditionOperation) error

func SignHeaderBasic

func SignHeaderBasic(r *http.Request, ak string, sk string, derivedAuthServiceName string, regionId string) (map[string]string, error)

func SignStringToSign

func SignStringToSign(stringToSign string, signingKey []byte) (string, error)

func SignedHeadersBasic

func SignedHeadersBasic(headers map[string][]string) []string

func StringToSignDerived

func StringToSignDerived(canonicalRequest string, info string, t time.Time) (string, error)

Types

type Attempt

type Attempt struct {
	Success           bool
	HttpCode          int32
	RequestId         string
	ErrorCode         string
	ErrorMessage      string
	TimeStampMs       int64
	LastAttemptCostMs int64
}

type BadResponseError

type BadResponseError struct {
	RespBody   string
	RespHeader map[string][]string
	HTTPCode   int
}

func NewBadResponseError

func NewBadResponseError(body string, header map[string][]string, httpCode int) *BadResponseError

func (BadResponseError) Error

func (e BadResponseError) Error() string

func (BadResponseError) String

func (e BadResponseError) String() string

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

type CallBack

type CallBack interface {
	Success(result *Result)
	Fail(result *Result)
}

type Client

type Client struct {
	Endpoint        string // IP or hostname of SLS endpoint
	AccessKeyID     string
	AccessKeySecret string
	RequestTimeOut  time.Duration
	RetryTimeOut    time.Duration
	HTTPClient      *http.Client
	Region          string
	ProjectId       string
	// contains filtered or unexported fields
}

func (*Client) PutLogs

func (c *Client) PutLogs(groupId, streamId string, lg *LogGroup) (err error)

type ClientInterface

type ClientInterface interface {
	PutLogs(project, logStore string, lg *LogGroup) (err error)
}

func CreateNormalInterface

func CreateNormalInterface(config *Config) ClientInterface

type ConditionOperation

type ConditionOperation func() (bool, error)

type Config

type Config struct {
	TotalSizeLnBytes    int64
	MaxIoWorkers        int64
	MaxBlockSec         int
	MaxBatchSize        int64
	MaxBatchCount       int
	LingerMs            int64
	Retries             int
	MaxReservedAttempts int
	BaseRetryBackoffMs  int64
	MaxRetryBackoffMs   int64
	Buckets             int
	Endpoint            string
	AccessKeyID         string
	AccessKeySecret     string
	ProjectId           string
	RegionId            string
	NoRetryCodeList     []int
	HTTPClient          *http.Client
}

func GetConfig

func GetConfig() *Config

type CustomLog

type CustomLog struct {
	LogTimeNs int64  `json:"log_time_ns"`
	Log       string `json:"log"`
}

type DefaultHttpRequest

type DefaultHttpRequest struct {
	// contains filtered or unexported fields
}

func (*DefaultHttpRequest) GetBodyToBytes

func (httpRequest *DefaultHttpRequest) GetBodyToBytes() (*bytes.Buffer, error)

func (*DefaultHttpRequest) GetEndpoint

func (httpRequest *DefaultHttpRequest) GetEndpoint() string

func (*DefaultHttpRequest) GetHeaderParams

func (httpRequest *DefaultHttpRequest) GetHeaderParams() map[string]string

func (*DefaultHttpRequest) GetMethod

func (httpRequest *DefaultHttpRequest) GetMethod() string

func (*DefaultHttpRequest) GetPath

func (httpRequest *DefaultHttpRequest) GetPath() string

func (*DefaultHttpRequest) GetQueryParams

func (httpRequest *DefaultHttpRequest) GetQueryParams() map[string]interface{}

type Error

type Error struct {
	HTTPCode  int32  `json:"httpCode"`
	Code      string `json:"errorCode"`
	Message   string `json:"errorMessage"`
	RequestID string `json:"requestID"`
}

func NewClientError

func NewClientError(err error) *Error

func (Error) Error

func (e Error) Error() string

func (Error) String

func (e Error) String() string

type HttpRequestBuilder

type HttpRequestBuilder struct {
	// contains filtered or unexported fields
}

type IoThreadPool

type IoThreadPool struct {
	// contains filtered or unexported fields
}

type IoWorker

type IoWorker struct {
	// contains filtered or unexported fields
}

type Log

type Log struct {
	Time      *uint32       `json:"time,omitempty"`
	Contents  []*LogContent `json:"contents,omitempty"`
	Labels    string        `json:"labels"`
	ProjectId string        `json:"tenant_project_id,omitempty"`
}

func GenerateLog

func GenerateLog(addLogMap []string, labels map[string]string) *Log

func GenerateLogWithCustomTime

func GenerateLogWithCustomTime(customLogs []CustomLog, labels map[string]string) *Log

func (*Log) GetContents

func (m *Log) GetContents() []*LogContent

func (*Log) Size

func (m *Log) Size() (n int)

type LogAccumulator

type LogAccumulator struct {
	// contains filtered or unexported fields
}

type LogContent

type LogContent struct {
	LogTimeNs int64  `json:"log_time_ns"`
	Log       string `json:"log"`
}

func (*LogContent) Size

func (m *LogContent) Size() (n int)

type LogGroup

type LogGroup struct {
	Logs     []*Log    `json:"log_items,omitempty"`
	Category *string   `json:"Category,omitempty"`
	LogTags  []*LogTag `json:"LogTags,omitempty"`
}

func (*LogGroup) GetLogs

func (m *LogGroup) GetLogs() []*Log

func (*LogGroup) Size

func (m *LogGroup) Size() (n int)

type LogItem

type LogItem struct {
	Contents        []LogContent `json:"contents"`
	Labels          string       `json:"labels"`
	TenantProjectId string       `json:"tenant_project_id"`
}

type LogItems

type LogItems struct {
	LogItems []LogItem `json:"log_items"`
}

type LogProject

type LogProject struct {
	Region          string // region id
	Endpoint        string // IP or hostname
	AccessKeyID     string
	AccessKeySecret string

	ProjectId string
	// contains filtered or unexported fields
}

func NewLogProject

func NewLogProject(client *Client) (p *LogProject, err error)

func (*LogProject) WithRequestTimeout

func (p *LogProject) WithRequestTimeout(timeout time.Duration) *LogProject

func (*LogProject) WithRetryTimeout

func (p *LogProject) WithRetryTimeout(timeout time.Duration) *LogProject

type LogStore

type LogStore struct {
	GroupId  string
	StreamId string
	// contains filtered or unexported fields
}

func (*LogStore) PutLogs

func (s *LogStore) PutLogs(lg *LogGroup) (err error)

type LogTag

type LogTag struct {
	Key   *string `json:"Key,omitempty"`
	Value *string `json:"Value,omitempty"`
}

func (*LogTag) Size

func (m *LogTag) Size() (n int)

type Mover

type Mover struct {
	// contains filtered or unexported fields
}

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func InitProducer

func InitProducer(producerConfig *Config) *Producer

func (*Producer) Close

func (producer *Producer) Close(timeoutMs int64) error

func (*Producer) SendLog

func (producer *Producer) SendLog(groupId, streamId string, log *Log) error

func (*Producer) SendLogWithCallBack

func (producer *Producer) SendLogWithCallBack(groupId, streamId string, log *Log, callback CallBack) error

func (*Producer) Start

func (producer *Producer) Start()

type Result

type Result struct {
	// contains filtered or unexported fields
}

func (*Result) GetErrorCode

func (result *Result) GetErrorCode() string

func (*Result) GetErrorMessage

func (result *Result) GetErrorMessage() string

func (*Result) GetHttpCode

func (result *Result) GetHttpCode() int32

func (*Result) GetRequestId

func (result *Result) GetRequestId() string

func (*Result) GetReservedAttempts

func (result *Result) GetReservedAttempts() []*Attempt

func (*Result) GetTimeStampMs

func (result *Result) GetTimeStampMs() int64

func (*Result) IsSuccessful

func (result *Result) IsSuccessful() bool

type RetryQueue

type RetryQueue struct {
	// contains filtered or unexported fields
}

RetryQueue cache ProducerBatch and retry latter

func (*RetryQueue) Len

func (retryQueue *RetryQueue) Len() int

func (*RetryQueue) Less

func (retryQueue *RetryQueue) Less(i, j int) bool

func (*RetryQueue) Pop

func (retryQueue *RetryQueue) Pop() interface{}

func (*RetryQueue) Push

func (retryQueue *RetryQueue) Push(x interface{})

func (*RetryQueue) Swap

func (retryQueue *RetryQueue) Swap(i, j int)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL