consumer

package
v0.0.0-...-c1e5751 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 11, 2024 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const (
	INITIALIZING      = "initializing"
	PROCESSING        = "processing"
	SHUTTING_DOWN     = "shutdown"
	SHUTDOWN_COMPLETE = "shutdown_complete"
)
View Source
const (
	BasicDateFormat     = "20060102T150405Z"
	HeaderXDate         = "X-Sdk-Date"
	HeaderHost          = "host"
	HeaderAuthorization = "Authorization"
	HeaderContentSha256 = "X-Sdk-Content-Sha256"
	DerivationAlgorithm = "V11-HMAC-SHA256"
	DerivedDateFormat   = "20060102"
)

Variables

This section is empty.

Functions

func CanonicalHeadersBasic

func CanonicalHeadersBasic(r *http.Request, signerHeaders []string) string

func CanonicalQueryStringBasic

func CanonicalQueryStringBasic(queryMap map[string]string) string

func CanonicalRequestBasic

func CanonicalRequestBasic(r *http.Request, signedHeaders []string, queryMap map[string]string) (string, error)

func CanonicalURIBasic

func CanonicalURIBasic(r *http.Request) string

func DerivationAuthHeaderValue

func DerivationAuthHeaderValue(signature, accessKey string, info string, signedHeaders []string) string

func GetDerivationKey

func GetDerivationKey(accessKey string, secretKey string, info string) (string, error)

func HexEncodeSHA256Hash

func HexEncodeSHA256Hash(body []byte) (string, error)

func SignHeaderBasic

func SignHeaderBasic(r *http.Request, ak string, sk string, derivedAuthServiceName string, regionId string, queryMap map[string]string) (map[string]string, error)

func SignStringToSign

func SignStringToSign(stringToSign string, signingKey []byte) (string, error)

func SignedHeadersBasic

func SignedHeadersBasic(headers map[string][]string) []string

func StringToSignDerived

func StringToSignDerived(canonicalRequest string, info string, t time.Time) (string, error)

Types

type BatchGetLog

type BatchGetLog struct {
	Count int64     `json:"count"`
	Next  string    `json:"next"`
	Logs  []LogData `json:"logs"`
}

type Client

type Client struct {
	AccessKeyID     string
	AccessKeySecret string
	SecurityToken   string
	RequestTimeOut  time.Duration
	RetryTimeOut    time.Duration
	HTTPClient      *http.Client
	RegionName      string
	ProjectId       string
	// contains filtered or unexported fields
}

type ClientConsumerWorker

type ClientConsumerWorker struct {
	// contains filtered or unexported fields
}

func (*ClientConsumerWorker) Run

func (w *ClientConsumerWorker) Run()

func (*ClientConsumerWorker) Shutdown

func (w *ClientConsumerWorker) Shutdown()

type Cursor

type Cursor struct {
	Cursor string `json:"cursor"`
}

type DefaultLogConsumerCheckPointTracker

type DefaultLogConsumerCheckPointTracker struct {
	// contains filtered or unexported fields
}

func (*DefaultLogConsumerCheckPointTracker) GetCheckPoint

func (tracker *DefaultLogConsumerCheckPointTracker) GetCheckPoint() string

func (*DefaultLogConsumerCheckPointTracker) GetCurrentCursor

func (tracker *DefaultLogConsumerCheckPointTracker) GetCurrentCursor() string

func (*DefaultLogConsumerCheckPointTracker) SaveCheckPoint

func (tracker *DefaultLogConsumerCheckPointTracker) SaveCheckPoint(persistent bool) error

type FetchedLogData

type FetchedLogData struct {
	// contains filtered or unexported fields
}

type ILogConsumerCheckPointTracker

type ILogConsumerCheckPointTracker interface {
	SaveCheckPoint(persistent bool) error
	GetCheckPoint() string
	GetCurrentCursor() string
}

type ILogConsumerProcessor

type ILogConsumerProcessor interface {
	Initialize(shardId string)
	Process(logGroups []LogData, checkPointTracker ILogConsumerCheckPointTracker) string
	Shutdown(checkPointTracker ILogConsumerCheckPointTracker) error
}

type ILogConsumerProcessorFactory

type ILogConsumerProcessorFactory interface {
	GeneratorProcessor() ILogConsumerProcessor
}

type ILogConsumerSTSToken

type ILogConsumerSTSToken interface {
	// GetSTSTokenConfig SDK会定期从此方法中获取临时AK, 临时SK, 临时securityToken. 如果临时认证信息有变化, 在此方法中实现即可
	GetSTSTokenConfig() STSTokenConfig
}

type LogConsumerClientAdapter

type LogConsumerClientAdapter struct {
	ILogConsumerSTSToken ILogConsumerSTSToken
	// contains filtered or unexported fields
}

func GetLogConsumerClientAdapter

func GetLogConsumerClientAdapter(config *LogConsumerConfig) *LogConsumerClientAdapter

type LogConsumerConfig

type LogConsumerConfig struct {
	RegionName        string
	ProjectId         string
	LogGroupId        string
	LogStreamId       string
	AccessKeyId       string
	AccessKeySecret   string
	ConsumerGroupName string

	SecurityToken        string
	ILogConsumerSTSToken ILogConsumerSTSToken
	StartTimeNs          time.Time
	EndTimeNs            time.Time
	FetchIntervalMillis  int64

	BatchSize int
	// contains filtered or unexported fields
}

func GetConsumerConfig

func GetConsumerConfig() *LogConsumerConfig

type LogConsumerHeartBeat

type LogConsumerHeartBeat struct {
	// contains filtered or unexported fields
}

func GetLogConsumerHeartBeat

func GetLogConsumerHeartBeat(client *LogConsumerClientAdapter, config *LogConsumerConfig) *LogConsumerHeartBeat

type LogData

type LogData struct {
	Labels map[string]string `json:"labels"`
}

type LogShardConsumer

type LogShardConsumer struct {
	ConsumerStatus string
	// contains filtered or unexported fields
}

func (*LogShardConsumer) InitializeTask

func (w *LogShardConsumer) InitializeTask()

func (*LogShardConsumer) LogConsumerFetchTask

func (w *LogShardConsumer) LogConsumerFetchTask()

func (*LogShardConsumer) ProcessTask

func (w *LogShardConsumer) ProcessTask(fetchedData []LogData)

func (*LogShardConsumer) ShutDownTask

func (w *LogShardConsumer) ShutDownTask()

type STSTokenConfig

type STSTokenConfig struct {
	AccessKeyId     string
	AccessKeySecret string
	SecurityToken   string
}

type ShardCheckPoint

type ShardCheckPoint struct {
	Shard      string `json:"shard_id"`
	Checkpoint int64  `json:"checkpoint"`
	UpdateTime int64  `json:"update_time"`
	Consumer   string `json:"consumer_name"`
}

type TaskResult

type TaskResult struct {
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL