agent

package
v1.0.0-beta1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 24, 2022 License: MPL-2.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetConfig

func GetConfig(network string, rpcClient *rpc.Client, addressesExt []string, isEnvStaging bool, componentTag int32, archTag int32, cluster string, logF func(format string, args ...interface{})) tlstatshouse.GetConfigResult

func SpareShardReplica

func SpareShardReplica(shardReplica int, timestamp uint32) int

func ValidateConfigSource

func ValidateConfigSource(c Config) error

Types

type Agent

type Agent struct {
	Shards          []*Shard                     // Actually those are shard-replicas
	GetConfigResult tlstatshouse.GetConfigResult // for ingress proxy

	// Used for builtin metrics when running inside aggregator
	AggregatorShardKey   int32
	AggregatorReplicaKey int32
	AggregatorHost       int32
	// contains filtered or unexported fields
}

func MakeAgent

func MakeAgent(network string, storageDir string, aesPwd string, config Config, hostName string, componentTag int32, metricStorage format.MetaStorageInterface, logF func(format string, args ...interface{}),
	beforeFlushBucketFunc func(now time.Time), getConfigResult *tlstatshouse.GetConfigResult) (*Agent, error)

All shard aggregators must be on the same network

func (*Agent) AddCounter

func (s *Agent) AddCounter(key data_model.Key, count float64)

count should be > 0 and not NaN

func (*Agent) AddCounterHost

func (s *Agent) AddCounterHost(key data_model.Key, count float64, hostTag int32, metricInfo *format.MetricMetaValue)

func (*Agent) AddCounterHostStringBytes

func (s *Agent) AddCounterHostStringBytes(key data_model.Key, str []byte, count float64, hostTag int32, metricInfo *format.MetricMetaValue)

str should be reasonably short. Empty string will be undistinguishable from "the rest" count should be > 0 and not NaN

func (*Agent) AddUniqueHostStringBytes

func (s *Agent) AddUniqueHostStringBytes(key data_model.Key, hostTag int32, str []byte, hashes []int64, count float64, metricInfo *format.MetricMetaValue)

func (*Agent) AddValueArrayCounterHostStringBytes

func (s *Agent) AddValueArrayCounterHostStringBytes(key data_model.Key, values []float64, mult float64, hostTag int32, str []byte, metricInfo *format.MetricMetaValue)

func (*Agent) AddValueCounter

func (s *Agent) AddValueCounter(key data_model.Key, value float64, counter float64, metricInfo *format.MetricMetaValue)

value should be not NaN.

func (*Agent) AddValueCounterHost

func (s *Agent) AddValueCounterHost(key data_model.Key, value float64, counter float64, hostTag int32)

func (*Agent) AddValueCounterHostArray

func (s *Agent) AddValueCounterHostArray(key data_model.Key, values []float64, mult float64, hostTag int32, metricInfo *format.MetricMetaValue)

func (*Agent) AddValueCounterHostStringBytes

func (s *Agent) AddValueCounterHostStringBytes(key data_model.Key, value float64, counter float64, hostTag int32, str []byte)

func (*Agent) AggKey

func (s *Agent) AggKey(time uint32, metricID int32, keys [format.MaxTags]int32) data_model.Key

func (*Agent) ApplyMetric

func (s *Agent) ApplyMetric(m tlstatshouse.MetricBytes, h data_model.MappedMetricHeader, ingestionStatusOKTag int32)

func (*Agent) AutoCreateMetric

func (s *Agent) AutoCreateMetric(ctx context.Context, args tlstatshouse.AutoCreate) error

func (*Agent) Close

func (s *Agent) Close()

func (*Agent) CreateBuiltInItemValue

func (s *Agent) CreateBuiltInItemValue(key data_model.Key) *BuiltInItemValue

Do not create too many. Shards will iterate through values before flushing bucket Useful for watermark metrics.

func (*Agent) GetTagMappingBootstrap

func (s *Agent) GetTagMappingBootstrap(ctxParent context.Context) ([]tlstatshouse.Mapping, time.Duration, error)

func (*Agent) LoadMetaMetricJournal

func (s *Agent) LoadMetaMetricJournal(ctxParent context.Context, version int64, returnIfEmpty bool) ([]tlmetadata.Event, int64, error)

func (*Agent) LoadOrCreateMapping

func (s *Agent) LoadOrCreateMapping(ctxParent context.Context, key string, floodLimitKey interface{}) (pcache.Value, time.Duration, error)

func (*Agent) LoadPromTargets

func (s *Agent) LoadPromTargets(ctxParent context.Context, version string) (res *tlstatshouse.GetTargetsResult, versionHash string, err error)

func (*Agent) MergeItemValue

func (s *Agent) MergeItemValue(key data_model.Key, item *data_model.ItemValue, metricInfo *format.MetricMetaValue)

func (*Agent) NumShardReplicas

func (s *Agent) NumShardReplicas() int

func (*Agent) Run

func (s *Agent) Run(aggHost int32, aggShardKey int32, aggReplicaKey int32)

separated so we can set AggregatorHost, which is dependent on tagMapper which uses agent to wirte statistics

type BuiltInItemValue

type BuiltInItemValue struct {
	// contains filtered or unexported fields
}

func (*BuiltInItemValue) AddValueCounter

func (s *BuiltInItemValue) AddValueCounter(value float64, count float64)

For counters, use AddValueCounter(0, 1)

func (*BuiltInItemValue) Merge

func (s *BuiltInItemValue) Merge(s2 *data_model.ItemValue)

func (*BuiltInItemValue) SetValueCounter

func (s *BuiltInItemValue) SetValueCounter(value float64, count float64)

type Config

type Config struct {
	AggregatorAddresses []string
	// Shard Sampling Algorithm
	SampleBudget        int   // for all shards, in bytes
	SampleGroups        bool  // use group weights. Experimental, will be turned on unconditionally later
	MaxHistoricDiskSize int64 // for all shards, in bytes

	// How much strings (per key) is stored and sent to aggregator
	StringTopCapacity  int
	StringTopCountSend int

	// Liveness detector to switch between original and spare
	LivenessResponsesWindowLength    int
	LivenessResponsesWindowSuccesses int
	KeepAliveSuccessTimeout          time.Duration // LivenessResponsesWindowLength subsequent keepalives must takes < this

	SaveSecondsImmediately bool // If false, will only go to disk if first send fails
	StatsHouseEnv          string
	Cluster                string
	SkipFirstNShards       int // if cluster is extended, first shard might be almost full, so we can skip them for some time.

	RemoteWriteEnabled bool
	RemoteWriteAddr    string
	RemoteWritePath    string

	AutoCreate bool
}

func DefaultConfig

func DefaultConfig() Config

type DiskBucketStorage

type DiskBucketStorage struct {
	// contains filtered or unexported fields
}

func MakeDiskBucketStorage

func MakeDiskBucketStorage(dirPath string, numShards int, logf func(format string, args ...interface{})) (*DiskBucketStorage, error)

func (*DiskBucketStorage) Close

func (d *DiskBucketStorage) Close() error

func (*DiskBucketStorage) EraseBucket

func (d *DiskBucketStorage) EraseBucket(shardID int, time uint32) error

func (*DiskBucketStorage) GetBucket

func (d *DiskBucketStorage) GetBucket(shardID int, time uint32, scratchPad *[]byte) ([]byte, error)

func (*DiskBucketStorage) PutBucket

func (d *DiskBucketStorage) PutBucket(shardID int, time uint32, data []byte) error

func (*DiskBucketStorage) ReadNextTailSecond

func (d *DiskBucketStorage) ReadNextTailSecond(shardID int) (uint32, bool)

func (*DiskBucketStorage) TotalFileSize

func (d *DiskBucketStorage) TotalFileSize(shardID int) int64

type Shard

type Shard struct {
	ShardReplicaNum int

	CurrentTime    uint32
	CurrentBuckets [][]*data_model.MetricsBucket // [resolution][shard]. All disallowed resolutions are always skipped
	MissedSeconds  uint32                        // If disk is slow or computer sleeps/slows, several seconds can get into single bucket
	FutureQueue    [][]*data_model.MetricsBucket // 60 seconds long circular buffer.

	CurentLowResBucket [][]*data_model.MetricsBucket // [resolution][shard]
	LowResFutureQueue  []*data_model.MetricsBucket   // Max 60 seconds long. Shorter if max resolution is lower.

	BucketsToSend     chan compressedBucketData
	BuiltInItemValues []*BuiltInItemValue // Moved into CurrentBuckets before flush

	PreprocessingBucketTime    uint32
	PreprocessingBuckets       []*data_model.MetricsBucket // CurrentBuckets is moved here, if PreviousBucket empty
	PreprocessingMissedSeconds uint32                      // copy of MissedSeconds for bucket being processed

	HistoricBucketsToSend   []compressedBucketData // Slightly out of order here
	HistoricBucketsDataSize int                    // if too many are with data, will put without data, which will be read from disk
	// contains filtered or unexported fields
}

Shard gets data after initial hashing and shard number

func (*Shard) AddCounterHost

func (s *Shard) AddCounterHost(key data_model.Key, keyHash uint64, count float64, hostTag int32, metricInfo *format.MetricMetaValue)

func (*Shard) AddCounterHostStringBytes

func (s *Shard) AddCounterHostStringBytes(key data_model.Key, keyHash uint64, str []byte, count float64, hostTag int32, metricInfo *format.MetricMetaValue)

func (*Shard) AddUniqueHostStringBytes

func (s *Shard) AddUniqueHostStringBytes(key data_model.Key, hostTag int32, str []byte, keyHash uint64, hashes []int64, count float64, metricInfo *format.MetricMetaValue)

func (*Shard) AddValueArrayCounterHost

func (s *Shard) AddValueArrayCounterHost(key data_model.Key, keyHash uint64, values []float64, mult float64, hostTag int32, metricInfo *format.MetricMetaValue)

func (*Shard) AddValueArrayCounterHostStringBytes

func (s *Shard) AddValueArrayCounterHostStringBytes(key data_model.Key, keyHash uint64, values []float64, mult float64, hostTag int32, str []byte, metricInfo *format.MetricMetaValue)

func (*Shard) AddValueCounterHost

func (s *Shard) AddValueCounterHost(key data_model.Key, keyHash uint64, value float64, counter float64, hostTag int32, metricInfo *format.MetricMetaValue)

func (*Shard) AddValueCounterHostStringBytes

func (s *Shard) AddValueCounterHostStringBytes(key data_model.Key, keyHash uint64, value float64, count float64, hostTag int32, str []byte, metricInfo *format.MetricMetaValue)

func (*Shard) ApplyCounter

func (s *Shard) ApplyCounter(key data_model.Key, keyHash uint64, str []byte, count float64, hostTag int32, metricInfo *format.MetricMetaValue)

func (*Shard) ApplyUnique

func (s *Shard) ApplyUnique(key data_model.Key, keyHash uint64, str []byte, hashes []int64, count float64, hostTag int32, metricInfo *format.MetricMetaValue)

func (*Shard) ApplyValues

func (s *Shard) ApplyValues(key data_model.Key, keyHash uint64, str []byte, values []float64, count float64, hostTag int32, metricInfo *format.MetricMetaValue)

func (*Shard) CreateBuiltInItemValue

func (s *Shard) CreateBuiltInItemValue(key data_model.Key) *BuiltInItemValue

func (*Shard) HistoricBucketsDataSizeDisk

func (s *Shard) HistoricBucketsDataSizeDisk() int64

func (*Shard) HistoricBucketsDataSizeMemory

func (s *Shard) HistoricBucketsDataSizeMemory() int

func (*Shard) IsAlive

func (s *Shard) IsAlive() bool

func (*Shard) MergeItemValue

func (s *Shard) MergeItemValue(key data_model.Key, keyHash uint64, item *data_model.ItemValue, metricInfo *format.MetricMetaValue)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL