Documentation ¶
Overview ¶
Package sampler contains all the logic of the agent-side trace sampling.
The current implementation scores the "signature" of each trace. Based on the score, we get a sample rate to apply to the given trace.
The current score implementation is deliberately simple: it is a counter with polynomial decay per signature. We increment it for each incoming trace, then divide the score by two every X seconds. Right after the division, the score approximates the number of signatures received over X seconds. It is different from the scoring in the Agent.
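The decaying counter described above can be sketched as follows. This is a minimal illustration, not the package's MemoryBackend: the names decayCounter, count, decay and steadyScore are hypothetical, and the decay is driven by explicit calls rather than a timer.

```go
package main

import "fmt"

// decayCounter is a hypothetical per-signature counter used only for
// illustration; it is not the package's MemoryBackend.
type decayCounter struct {
	scores map[uint64]float64
}

func newDecayCounter() *decayCounter {
	return &decayCounter{scores: make(map[uint64]float64)}
}

// count records one incoming trace for the given signature.
func (c *decayCounter) count(sig uint64) {
	c.scores[sig]++
}

// decay halves every score; it is meant to be called once per period.
func (c *decayCounter) decay() {
	for sig := range c.scores {
		c.scores[sig] /= 2
	}
}

// steadyScore simulates `periods` periods receiving `perPeriod` traces each
// and returns the score observed right after the last division.
func steadyScore(perPeriod, periods int) float64 {
	c := newDecayCounter()
	const sig = uint64(42)
	for p := 0; p < periods; p++ {
		for i := 0; i < perPeriod; i++ {
			c.count(sig)
		}
		c.decay()
	}
	return c.scores[sig]
}

func main() {
	// With a steady 100 traces per period, the score right after each
	// division converges towards 100.
	fmt.Printf("%.2f\n", steadyScore(100, 20))
}
```

With a steady input of c traces per period, the score s right after a division satisfies s = (s + c) / 2, hence s = c, which is why the score approximates the per-period count.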
Since the sampling can happen at different levels (client, agent, server) or depending on different rules, we have to track the sample rate applied at previous steps. This way, sampling twice at 50% can result in an effective 25% sampling. The rate is stored as a metric in the trace root.
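As a rough sketch of this rate tracking, the example below multiplies each newly applied rate into a metric stored on the trace root. The span type and the helpers globalRate and applyRate are hypothetical stand-ins, and defaulting the cumulative rate to 1 when nothing is stored is an assumption; only the "_sample_rate" key is taken from the constants of this package.

```go
package main

import "fmt"

// keyGlobalRate mirrors the value of KeySamplingRateGlobal in this package.
const keyGlobalRate = "_sample_rate"

// span is a hypothetical stand-in for a trace root; only its metrics matter here.
type span struct {
	Metrics map[string]float64
}

// globalRate returns the cumulative rate stored on the root.
// Assumption: a root without a stored rate has not been sampled yet (rate 1).
func globalRate(root *span) float64 {
	if rate, ok := root.Metrics[keyGlobalRate]; ok {
		return rate
	}
	return 1
}

// applyRate records that one more sampling step was applied at the given rate.
func applyRate(root *span, rate float64) {
	if root.Metrics == nil {
		root.Metrics = make(map[string]float64)
	}
	root.Metrics[keyGlobalRate] = globalRate(root) * rate
}

func main() {
	root := &span{}
	applyRate(root, 0.5) // e.g. sampled by the client
	applyRate(root, 0.5) // e.g. sampled again by the agent
	fmt.Println(globalRate(root)) // prints 0.25
}
```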
Index ¶
- Constants
- func GetClientRate(s *pb.Span) float64
- func GetEventExtractionRate(s *pb.Span) float64
- func GetGlobalRate(s *pb.Span) float64
- func GetMaxEPSRate(s *pb.Span) float64
- func GetPreSampleRate(s *pb.Span) float64
- func IsAnalyzedSpan(s *pb.Span) bool
- func SampleByRate(traceID uint64, rate float64) bool
- func SetAnalyzedSpan(s *pb.Span)
- func SetClientRate(s *pb.Span, rate float64)
- func SetEventExtractionRate(s *pb.Span, rate float64)
- func SetMaxEPSRate(s *pb.Span, rate float64)
- func SetPreSampleRate(s *pb.Span, rate float64)
- type DynamicConfig
- type EngineType
- type ErrorsSampler
- type MemoryBackend
- func (b *MemoryBackend) AddTotalScore(n float64)
- func (b *MemoryBackend) CountSample()
- func (b *MemoryBackend) CountSignature(signature Signature)
- func (b *MemoryBackend) CountWeightedSig(signature Signature, n float64)
- func (b *MemoryBackend) DecayScore()
- func (b *MemoryBackend) GetCardinality() int64
- func (b *MemoryBackend) GetSampledScore() float64
- func (b *MemoryBackend) GetSignatureScore(signature Signature) float64
- func (b *MemoryBackend) GetSignatureScores() map[Signature]float64
- func (b *MemoryBackend) GetTotalScore() float64
- func (b *MemoryBackend) GetUpperSampledScore() float64
- type NoPrioritySampler
- type PrioritySampler
- func (s *PrioritySampler) CountClientDroppedP0s(dropped int64)
- func (s *PrioritySampler) CountSampled(root *pb.Span, clientDroppedP0s bool, signature Signature, rate float64)
- func (s *PrioritySampler) CountSignature(root *pb.Span, signature Signature)
- func (s *PrioritySampler) Sample(trace *pb.TraceChunk, root *pb.Span, env string, clientDroppedP0s bool) bool
- func (s *PrioritySampler) Start()
- func (s *PrioritySampler) Stop()
- type RareSampler
- type RateByService
- type RemoteRates
- func (r *RemoteRates) AdjustScoring()
- func (r *RemoteRates) CountSample(root *pb.Span, sig Signature)
- func (r *RemoteRates) CountSignature(sig Signature)
- func (r *RemoteRates) CountWeightedSig(sig Signature, weight float64)
- func (r *RemoteRates) DecayScores()
- func (r *RemoteRates) GetAllSignatureSampleRates() map[Signature]float64
- func (r *RemoteRates) GetSignatureSampleRate(sig Signature) (float64, bool)
- func (r *RemoteRates) Start()
- func (r *RemoteRates) Stop()
- type Sampler
- func (s *Sampler) AdjustScoring()
- func (s *Sampler) GetAllCountScores() map[Signature]float64
- func (s *Sampler) GetAllSignatureSampleRates() map[Signature]float64
- func (s *Sampler) GetCountScore(signature Signature) float64
- func (s *Sampler) GetDefaultCountScore() float64
- func (s *Sampler) GetDefaultSampleRate() float64
- func (s *Sampler) GetSampleRate(trace pb.Trace, root *pb.Span, signature Signature) float64
- func (s *Sampler) GetSignatureSampleRate(signature Signature) float64
- func (s *Sampler) GetTargetTPSSampleRate() float64
- func (s *Sampler) SetSignatureCoefficients(offset float64, slope float64)
- func (s *Sampler) Start()
- func (s *Sampler) Stop()
- func (s *Sampler) UpdateExtraRate(extraRate float64)
- func (s *Sampler) UpdateTargetTPS(targetTPS float64)
- type SamplingPriority
- type ScoreSampler
- type ServiceSignature
- type Signature
Constants ¶
const (
	// KeySamplingRateGlobal is a metric key holding the global sampling rate.
	KeySamplingRateGlobal = "_sample_rate"
	// KeySamplingRateClient is a metric key holding the client-set sampling rate for APM events.
	KeySamplingRateClient = "_dd1.sr.rcusr"
	// KeySamplingRatePreSampler is a metric key holding the API rate limiter's rate for APM events.
	KeySamplingRatePreSampler = "_dd1.sr.rapre"
	// KeySamplingRateEventExtraction is the key of the metric storing the event extraction rate on an APM event.
	KeySamplingRateEventExtraction = "_dd1.sr.eausr"
	// KeySamplingRateMaxEPSSampler is the key of the metric storing the max eps sampler rate on an APM event.
	KeySamplingRateMaxEPSSampler = "_dd1.sr.eamax"
	// KeyErrorType is the key of the error type in the meta map
	KeyErrorType = "error.type"
	// KeyAnalyzedSpans is the metric key which specifies if a span is analyzed.
	KeyAnalyzedSpans = "_dd.analyzed"
	// KeyHTTPStatusCode is the key of the http status code in the meta map
	KeyHTTPStatusCode = "http.status_code"
)
Variables ¶
This section is empty.
Functions ¶
func GetClientRate ¶
GetClientRate gets the rate at which the trace this span belongs to was sampled by the tracer. NOTE: This defaults to 1 if no rate is stored.
func GetEventExtractionRate ¶
GetEventExtractionRate gets the rate at which the trace from which we extracted this event was sampled at the tracer. This defaults to 1 if no rate is stored.
func GetGlobalRate ¶
GetGlobalRate gets the cumulative sample rate of the trace to which this span belongs.
func GetMaxEPSRate ¶
GetMaxEPSRate gets the rate at which this event was sampled by the max eps event sampler.
func GetPreSampleRate ¶
GetPreSampleRate returns the rate at which the trace this span belongs to was sampled by the agent's presampler. NOTE: This defaults to 1 if no rate is stored.
func IsAnalyzedSpan ¶
IsAnalyzedSpan checks if a span is analyzed
func SampleByRate ¶
SampleByRate reports whether a trace (identified by its ID) should be sampled at the given rate. It uses Knuth multiplicative hashing to cope with imbalanced traceID generators.
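A minimal sketch of rate-based sampling with multiplicative hashing is shown below. It is an illustration under assumptions, not necessarily the package's implementation: the multiplier knuthFactor is an arbitrary large odd constant chosen for the example, and sampleByRate and keptFraction are hypothetical names.

```go
package main

import (
	"fmt"
	"math"
)

// knuthFactor is an assumed large odd multiplier; the constant actually used
// by the package may differ.
const knuthFactor = uint64(1111111111111111111)

// sampleByRate keeps a trace when the hash of its ID falls below a threshold
// proportional to the rate. The multiplication wraps around modulo 2^64,
// which spreads sequential or clustered IDs over the whole uint64 range.
func sampleByRate(traceID uint64, rate float64) bool {
	if rate >= 1 {
		return true
	}
	if rate <= 0 {
		return false
	}
	return traceID*knuthFactor < uint64(rate*math.MaxUint64)
}

// keptFraction returns the fraction of the IDs 1..n kept at the given rate.
func keptFraction(n uint64, rate float64) float64 {
	kept := 0
	for id := uint64(1); id <= n; id++ {
		if sampleByRate(id, rate) {
			kept++
		}
	}
	return float64(kept) / float64(n)
}

func main() {
	// Sequential IDs are a worst case for a plain threshold on the raw ID,
	// but the hashed decision still keeps roughly half of them at rate 0.5.
	fmt.Printf("%.3f\n", keptFraction(10000, 0.5))
}
```

Because the decision depends only on the trace ID and the rate, every component that sees the same trace at the same rate reaches the same decision without coordination.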
func SetClientRate ¶
SetClientRate sets the rate at which the trace this span belongs to was sampled by the tracer.
func SetEventExtractionRate ¶
SetEventExtractionRate sets the rate at which the trace from which we extracted this event was sampled at the tracer.
func SetMaxEPSRate ¶
SetMaxEPSRate sets the rate at which this event was sampled by the max eps event sampler.
func SetPreSampleRate ¶
SetPreSampleRate sets the rate at which the trace this span belongs to was sampled by the agent's presampler.
Types ¶
type DynamicConfig ¶
type DynamicConfig struct {
	// RateByService contains the rate for each service/env tuple,
	// used in priority sampling by client libs.
	RateByService RateByService
}
DynamicConfig contains configuration items which may change dynamically over time.
func NewDynamicConfig ¶
func NewDynamicConfig(env string) *DynamicConfig
NewDynamicConfig creates a new dynamic config object which maps service signatures to their corresponding sampling rates. Each service will have a default assigned matching the service rate of the specified env.
type EngineType ¶
type EngineType int
EngineType represents the type of a sampler engine.
const (
	// NormalScoreEngineType is the type of the ScoreEngine sampling non-error traces.
	NormalScoreEngineType EngineType = iota
	// ErrorsScoreEngineType is the type of the ScoreEngine sampling error traces.
	ErrorsScoreEngineType
	// PriorityEngineType is the type of the priority sampler engine.
	PriorityEngineType
)
type ErrorsSampler ¶
type ErrorsSampler struct{ ScoreSampler }
ErrorsSampler is dedicated to catching traces containing spans with errors.
func NewErrorsSampler ¶
func NewErrorsSampler(conf *config.AgentConfig) *ErrorsSampler
NewErrorsSampler returns an initialized Sampler dedicated to errors. It behaves just like the normal ScoreEngine except for its GetType method (useful for reporting).
type MemoryBackend ¶
type MemoryBackend struct {
	// DecayPeriod is the time period between each score decay.
	// A lower value is more reactive, but forgets quicker.
	DecayPeriod time.Duration
	// contains filtered or unexported fields
}
MemoryBackend stores any state required to run the sampling algorithms.
The current implementation is only based on counters with polynomial decay. Its bias with steady counts is 1 * decayFactor. The stored scores represent an approximation of the real count values (with a countScaleFactor factor).
func NewMemoryBackend ¶
func NewMemoryBackend(decayPeriod time.Duration, decayFactor float64) *MemoryBackend
NewMemoryBackend returns an initialized Backend.
func (*MemoryBackend) AddTotalScore ¶
func (b *MemoryBackend) AddTotalScore(n float64)
AddTotalScore adds to the total score.
func (*MemoryBackend) CountSample ¶
func (b *MemoryBackend) CountSample()
CountSample counts a trace sampled by the sampler.
func (*MemoryBackend) CountSignature ¶
func (b *MemoryBackend) CountSignature(signature Signature)
CountSignature counts an incoming signature.
func (*MemoryBackend) CountWeightedSig ¶
func (b *MemoryBackend) CountWeightedSig(signature Signature, n float64)
CountWeightedSig counts an incoming signature with a weight of n.
func (*MemoryBackend) DecayScore ¶
func (b *MemoryBackend) DecayScore()
DecayScore applies the decay to the rolling counters.
func (*MemoryBackend) GetCardinality ¶
func (b *MemoryBackend) GetCardinality() int64
GetCardinality returns the number of different signatures seen recently.
func (*MemoryBackend) GetSampledScore ¶
func (b *MemoryBackend) GetSampledScore() float64
GetSampledScore returns the global score of all sampled traces.
func (*MemoryBackend) GetSignatureScore ¶
func (b *MemoryBackend) GetSignatureScore(signature Signature) float64
GetSignatureScore returns the score of a signature. It is normalized to represent a number of signatures per second.
func (*MemoryBackend) GetSignatureScores ¶
func (b *MemoryBackend) GetSignatureScores() map[Signature]float64
GetSignatureScores returns the scores for all signatures. It is normalized to represent a number of signatures per second.
func (*MemoryBackend) GetTotalScore ¶
func (b *MemoryBackend) GetTotalScore() float64
GetTotalScore returns the global score of all traces, sampled or not.
func (*MemoryBackend) GetUpperSampledScore ¶
func (b *MemoryBackend) GetUpperSampledScore() float64
GetUpperSampledScore returns a certain upper bound of the global count of all sampled traces.
type NoPrioritySampler ¶
type NoPrioritySampler struct{ ScoreSampler }
NoPrioritySampler is dedicated to catching traces with no priority set.
func NewNoPrioritySampler ¶
func NewNoPrioritySampler(conf *config.AgentConfig) *NoPrioritySampler
NewNoPrioritySampler returns an initialized Sampler dedicated to traces with no priority set.
type PrioritySampler ¶
type PrioritySampler struct {
// contains filtered or unexported fields
}
PrioritySampler computes priority rates per (env, service) to apply in a feedback loop with trace-agent clients. Computed rates are sent to the clients in the trace-agent's HTTP responses. The rates are continuously adjusted according to the received traffic to match a targetTPS (target traces per second). In order of priority, the sampler matches a targetTPS set remotely (remoteRates), then the local targetTPS.
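To make the feedback loop concrete, here is a deliberately simplified sketch: a proportional rule that lowers the rate when the observed throughput exceeds the target. This rule, and the names rateForTarget and ratesForServices, are assumptions for illustration; the sampler's real adjustment goes through its scoring coefficients and may behave differently.

```go
package main

import "fmt"

// rateForTarget returns the rate that would bring observedTPS down to
// targetTPS, capped at 1. Hypothetical helper, not part of the package.
func rateForTarget(observedTPS, targetTPS float64) float64 {
	if observedTPS <= targetTPS {
		return 1 // traffic is already under the target: keep everything
	}
	return targetTPS / observedTPS
}

// ratesForServices applies the rule to every (env, service) key; the result
// is the kind of table that could be returned to clients in a response.
func ratesForServices(observed map[string]float64, targetTPS float64) map[string]float64 {
	rates := make(map[string]float64, len(observed))
	for key, tps := range observed {
		rates[key] = rateForTarget(tps, targetTPS)
	}
	return rates
}

func main() {
	observed := map[string]float64{
		"service:web,env:prod": 100, // above the target
		"service:job,env:prod": 5,   // below the target
	}
	rates := ratesForServices(observed, 10)
	fmt.Println(rates["service:web,env:prod"], rates["service:job,env:prod"])
}
```

Clients applying these rates send less traffic for busy services on the next round, which in turn changes the observed throughput used for the following adjustment.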
func NewPrioritySampler ¶
func NewPrioritySampler(conf *config.AgentConfig, dynConf *DynamicConfig) *PrioritySampler
NewPrioritySampler returns an initialized Sampler
func (*PrioritySampler) CountClientDroppedP0s ¶
func (s *PrioritySampler) CountClientDroppedP0s(dropped int64)
CountClientDroppedP0s counts client-dropped traces. They are added to the totalScore, allowing them to weigh on sampling rates during adjust calls.
func (*PrioritySampler) CountSampled ¶
func (s *PrioritySampler) CountSampled(root *pb.Span, clientDroppedP0s bool, signature Signature, rate float64)
CountSampled counts sampled chunks with local chunk root signature.
func (*PrioritySampler) CountSignature ¶
func (s *PrioritySampler) CountSignature(root *pb.Span, signature Signature)
CountSignature counts all chunks received with local chunk root signature.
func (*PrioritySampler) Sample ¶
func (s *PrioritySampler) Sample(trace *pb.TraceChunk, root *pb.Span, env string, clientDroppedP0s bool) bool
Sample counts an incoming trace and returns the trace sampling decision.
func (*PrioritySampler) Start ¶
func (s *PrioritySampler) Start()
Start runs and blocks on the Sampler main loop.
type RareSampler ¶
type RareSampler struct {
// contains filtered or unexported fields
}
RareSampler samples traces that are not caught by the Priority sampler. It ensures that we sample traces for each combination of (env, service, name, resource, error type, http status) seen on a top-level or measured span for which we did not see any span with a priority > 0 (sampled by Priority). The resulting sampled traces will likely be incomplete and will be flagged with an exceptioKey metric set to 1.
func NewRareSampler ¶
func NewRareSampler() *RareSampler
NewRareSampler returns a RareSampler that ensures that we sample combinations of env, service, name, resource, http-status, error type for each top-level or measured span.
func (*RareSampler) Sample ¶
func (e *RareSampler) Sample(t *pb.TraceChunk, env string) bool
Sample samples a trace and returns true if the trace was sampled (should be kept).
type RateByService ¶
type RateByService struct {
// contains filtered or unexported fields
}
RateByService stores the sampling rate per service. It is thread-safe, so one can read/write on it concurrently, using getters and setters.
func (*RateByService) GetAll ¶
func (rbs *RateByService) GetAll() map[string]float64
GetAll returns all sampling rates for all services.
func (*RateByService) SetAll ¶
func (rbs *RateByService) SetAll(rates map[ServiceSignature]float64)
SetAll sets the sampling rate for all services. If a service/env is not in the map, then the entry is removed.
type RemoteRates ¶
type RemoteRates struct {
// contains filtered or unexported fields
}
RemoteRates computes rates per (env, service) to apply in trace-agent clients. The rates are adjusted to match a targetTPS per (env, service) received from remote configurations. RemoteRates listens for new remote configurations with a grpc subscriber. On reception, new tps targets replace the previous ones.
func (*RemoteRates) AdjustScoring ¶
func (r *RemoteRates) AdjustScoring()
AdjustScoring adjusts the scores of all samplers.
func (*RemoteRates) CountSample ¶
func (r *RemoteRates) CountSample(root *pb.Span, sig Signature)
CountSample counts the number of sampled root spans matching a signature.
func (*RemoteRates) CountSignature ¶
func (r *RemoteRates) CountSignature(sig Signature)
CountSignature counts the number of root spans seen matching a signature.
func (*RemoteRates) CountWeightedSig ¶
func (r *RemoteRates) CountWeightedSig(sig Signature, weight float64)
CountWeightedSig counts weighted root spans seen for a signature. This function is called when trace-agent clients drop unsampled spans, as dropped root spans are no longer accounted for in CountSignature calls.
func (*RemoteRates) DecayScores ¶
func (r *RemoteRates) DecayScores()
DecayScores decays scores of all samplers
func (*RemoteRates) GetAllSignatureSampleRates ¶
func (r *RemoteRates) GetAllSignatureSampleRates() map[Signature]float64
GetAllSignatureSampleRates returns sampling rates to apply for all registered signatures.
func (*RemoteRates) GetSignatureSampleRate ¶
func (r *RemoteRates) GetSignatureSampleRate(sig Signature) (float64, bool)
GetSignatureSampleRate returns the sampling rate to apply for a registered signature.
func (*RemoteRates) Start ¶
func (r *RemoteRates) Start()
Start runs and adjusts rates per signature following remote TPS targets.
type Sampler ¶
type Sampler struct {
	// Storage of the state of the sampler
	Backend *MemoryBackend
	// contains filtered or unexported fields
}
Sampler is the main component of the sampling logic
func (*Sampler) AdjustScoring ¶
func (s *Sampler) AdjustScoring()
AdjustScoring modifies sampler coefficients to better fit the `targetTPS` condition.
func (*Sampler) GetAllCountScores ¶
GetAllCountScores scores all signatures based on their recent throughput. The score value can be seen as the sample rate if the count were the only factor. Since other factors can intervene (such as extra global sampling), its value can be larger than 1.
func (*Sampler) GetAllSignatureSampleRates ¶
GetAllSignatureSampleRates gives the sample rate to apply to all signatures. For now, only based on count score.
func (*Sampler) GetCountScore ¶
GetCountScore scores any signature based on its recent throughput. The score value can be seen as the sample rate if the count were the only factor. Since other factors can intervene (such as extra global sampling), its value can be larger than 1.
func (*Sampler) GetDefaultCountScore ¶
GetDefaultCountScore returns a default score to use when the signature is unknown. Since other factors can intervene (such as extra global sampling), its value can be larger than 1.
func (*Sampler) GetDefaultSampleRate ¶
GetDefaultSampleRate gives the sample rate to apply to an unknown signature. For now, only based on count score.
func (*Sampler) GetSampleRate ¶
GetSampleRate returns the sample rate to apply to a trace.
func (*Sampler) GetSignatureSampleRate ¶
GetSignatureSampleRate gives the sample rate to apply to any signature. For now, only based on count score.
func (*Sampler) GetTargetTPSSampleRate ¶
GetTargetTPSSampleRate returns an extra sample rate to apply if we are above targetTPS.
func (*Sampler) SetSignatureCoefficients ¶
SetSignatureCoefficients updates the internal scoring coefficients used by the signature scoring
func (*Sampler) UpdateExtraRate ¶
UpdateExtraRate updates the extra sample rate
func (*Sampler) UpdateTargetTPS ¶
UpdateTargetTPS updates the max TPS limit
type SamplingPriority ¶
type SamplingPriority int8
SamplingPriority is the type encoding a priority sampling decision.
const (
	// PriorityNone is the value for SamplingPriority when no priority sampling decision could be found.
	PriorityNone SamplingPriority = math.MinInt8
	// PriorityUserDrop is the value set by a user to explicitly drop a trace.
	PriorityUserDrop SamplingPriority = -1
	// PriorityAutoDrop is the value set by a tracer to suggest dropping a trace.
	PriorityAutoDrop SamplingPriority = 0
	// PriorityAutoKeep is the value set by a tracer to suggest keeping a trace.
	PriorityAutoKeep SamplingPriority = 1
	// PriorityUserKeep is the value set by a user to explicitly keep a trace.
	PriorityUserKeep SamplingPriority = 2
)
func GetSamplingPriority ¶
func GetSamplingPriority(t *pb.TraceChunk) (SamplingPriority, bool)
GetSamplingPriority returns the value of the sampling priority metric set on this span and a boolean indicating if such a metric was actually found or not.
type ScoreSampler ¶
type ScoreSampler struct {
	*Sampler
	// contains filtered or unexported fields
}
ScoreSampler samples pieces of traces by computing a signature based on spans (service, name, rsc, http.status, error.type), scoring it, and applying a rate. The rates are applied on the TraceID to maximize the number of chunks with errors caught for the same traceID. For a set traceID: P(chunk1 kept and chunk2 kept) = min(P(chunk1 kept), P(chunk2 kept))
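The min property follows when each decision compares the same hash of the traceID against a threshold proportional to the rate: the IDs kept at a lower rate are a subset of the IDs kept at a higher rate. The sketch below checks this on sequential IDs. The hash multiplier and the names keep and bothKeptFraction are assumptions for the example, not the package's code.

```go
package main

import (
	"fmt"
	"math"
)

// knuthFactor is an assumed large odd multiplier used to hash trace IDs.
const knuthFactor = uint64(1111111111111111111)

// keep decides from the trace ID alone whether a chunk is kept at the rate.
func keep(traceID uint64, rate float64) bool {
	if rate >= 1 {
		return true
	}
	if rate <= 0 {
		return false
	}
	return traceID*knuthFactor < uint64(rate*math.MaxUint64)
}

// bothKeptFraction returns the fraction of trace IDs 1..n for which two
// chunks of the same trace, sampled at rates r1 and r2, are both kept.
func bothKeptFraction(n uint64, r1, r2 float64) float64 {
	both := 0
	for id := uint64(1); id <= n; id++ {
		if keep(id, r1) && keep(id, r2) {
			both++
		}
	}
	return float64(both) / float64(n)
}

func main() {
	// Both values are equal: keeping both chunks is exactly as likely as
	// keeping the chunk with the lower rate.
	fmt.Println(bothKeptFraction(10000, 0.2, 0.6))
	fmt.Println(bothKeptFraction(10000, 0.2, 0.2))
}
```

With independent random draws per chunk, the joint probability would instead be the product of the two rates (0.12 here), so fewer traces would have all of their chunks kept.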
type ServiceSignature ¶
type ServiceSignature struct{ Name, Env string }
ServiceSignature represents a unique way to identify a service.
func (ServiceSignature) Hash ¶
func (s ServiceSignature) Hash() Signature
Hash generates the signature of a trace from minimal information such as service and env. It is typically used by distributed sampling based on priority, as a key to store the desired rate for a given (service, env) tuple.
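As an illustration of deriving a compact key from a (service, env) pair, the sketch below hashes both fields with FNV-1a from the standard library. The choice of FNV-1a, the comma separator, and the lower-case type names are assumptions; the package's Hash may be computed differently.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// signature and serviceSignature are hypothetical stand-ins for the
// package's Signature and ServiceSignature types.
type signature uint64

type serviceSignature struct{ Name, Env string }

// hash combines the service name and env into one value.
// Assumption: FNV-1a (32-bit) over "name,env".
func (s serviceSignature) hash() signature {
	h := fnv.New32a()
	h.Write([]byte(s.Name))
	h.Write([]byte{','})
	h.Write([]byte(s.Env))
	return signature(h.Sum32())
}

func main() {
	// The hash can then key a table of desired rates per (service, env).
	rates := map[signature]float64{}
	web := serviceSignature{Name: "web", Env: "prod"}
	rates[web.hash()] = 0.5

	// An equal (service, env) pair hashes to the same key.
	lookup := serviceSignature{Name: "web", Env: "prod"}
	fmt.Println(rates[lookup.hash()]) // prints 0.5
}
```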
func (ServiceSignature) String ¶
func (s ServiceSignature) String() string