ha

package
v1.16.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 24, 2024 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInvalidLengthHaTracker = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowHaTracker   = fmt.Errorf("proto: integer overflow")
)

Functions

func GetReplicaDescCodec

func GetReplicaDescCodec() codec.Proto

func ProtoReplicaDescFactory

func ProtoReplicaDescFactory() proto.Message

ProtoReplicaDescFactory makes new InstanceDescs

Types

type HATracker

type HATracker struct {
	services.Service
	// contains filtered or unexported fields
}

Track the replica we're accepting samples from for each HA replica group we know about. nolint:revive

func NewHATracker

func NewHATracker(cfg HATrackerConfig, limits HATrackerLimits, trackerStatusConfig HATrackerStatusConfig, reg prometheus.Registerer, kvNameLabel string, logger log.Logger) (*HATracker, error)

NewHATracker returns a new HA cluster tracker using either Consul or in-memory KV store. Tracker must be started via StartAsync().

func (*HATracker) Cfg

func (c *HATracker) Cfg() HATrackerConfig

func (*HATracker) CheckReplica

func (c *HATracker) CheckReplica(ctx context.Context, userID, replicaGroup, replica string, now time.Time) error

CheckReplica checks the cluster and replica against the backing KVStore and local cache in the tracker c to see if we should accept the incomming sample. It will return an error if the sample should not be accepted. Note that internally this function does checks against the stored values and may modify the stored data, for example to failover between replicas after a certain period of time. ReplicasNotMatchError is returned (from checkKVStore) if we shouldn't store this sample but are accepting samples from another replica for the cluster, so that there isn't a bunch of error's returned to customers clients.

func (*HATracker) CleanupHATrackerMetricsForUser

func (c *HATracker) CleanupHATrackerMetricsForUser(userID string)

func (*HATracker) ServeHTTP

func (h *HATracker) ServeHTTP(w http.ResponseWriter, req *http.Request)

func (*HATracker) SnapshotElectedReplicas

func (c *HATracker) SnapshotElectedReplicas() map[string]ReplicaDesc

Returns a snapshot of the currently elected replicas. Useful for status display

type HATrackerConfig

type HATrackerConfig struct {
	EnableHATracker bool `yaml:"enable_ha_tracker"`
	// We should only update the timestamp if the difference
	// between the stored timestamp and the time we received a sample at
	// is more than this duration.
	UpdateTimeout          time.Duration `yaml:"ha_tracker_update_timeout"`
	UpdateTimeoutJitterMax time.Duration `yaml:"ha_tracker_update_timeout_jitter_max"`
	// We should only failover to accepting samples from a replica
	// other than the replica written in the KVStore if the difference
	// between the stored timestamp and the time we received a sample is
	// more than this duration
	FailoverTimeout time.Duration `yaml:"ha_tracker_failover_timeout"`

	KVStore kv.Config `` /* 190-byte string literal not displayed */
}

HATrackerConfig contains the configuration require to create a HA Tracker. nolint:revive

func (*HATrackerConfig) RegisterFlags

func (cfg *HATrackerConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to config this to the given FlagSet with a specified prefix

func (*HATrackerConfig) RegisterFlagsWithPrefix

func (cfg *HATrackerConfig) RegisterFlagsWithPrefix(flagPrefix string, kvPrefix string, f *flag.FlagSet)

RegisterFlags adds the flags required to config this to the given FlagSet.

func (*HATrackerConfig) Validate

func (cfg *HATrackerConfig) Validate() error

Validate config and returns error on failure

type HATrackerLimits

type HATrackerLimits interface {
	// MaxHAReplicaGroups returns max number of replica groups that HA tracker should track for a user.
	// Samples from additional replicaGroups are rejected.
	MaxHAReplicaGroups(user string) int
}

nolint:revive

type HATrackerStatusConfig

type HATrackerStatusConfig struct {
	Title             string
	ReplicaGroupLabel string
}

nolint:revive

type ReplicaDesc

type ReplicaDesc struct {
	Replica    string `protobuf:"bytes,1,opt,name=replica,proto3" json:"replica,omitempty"`
	ReceivedAt int64  `protobuf:"varint,2,opt,name=received_at,json=receivedAt,proto3" json:"received_at,omitempty"`
	// Unix timestamp in millseconds when this entry was marked for deletion.
	// Reason for doing marking first, and delete later, is to make sure that distributors
	// watching the prefix will receive notification on "marking" -- at which point they can
	// already remove entry from memory. Actual deletion from KV store does *not* trigger
	// "watch" notification with a key for all KV stores.
	DeletedAt int64 `protobuf:"varint,3,opt,name=deleted_at,json=deletedAt,proto3" json:"deleted_at,omitempty"`
}

func NewReplicaDesc

func NewReplicaDesc() *ReplicaDesc

NewReplicaDesc returns an empty *ha.ReplicaDesc.

func (*ReplicaDesc) Descriptor

func (*ReplicaDesc) Descriptor() ([]byte, []int)

func (*ReplicaDesc) Equal

func (this *ReplicaDesc) Equal(that interface{}) bool

func (*ReplicaDesc) GetDeletedAt

func (m *ReplicaDesc) GetDeletedAt() int64

func (*ReplicaDesc) GetReceivedAt

func (m *ReplicaDesc) GetReceivedAt() int64

func (*ReplicaDesc) GetReplica

func (m *ReplicaDesc) GetReplica() string

func (*ReplicaDesc) GoString

func (this *ReplicaDesc) GoString() string

func (*ReplicaDesc) Marshal

func (m *ReplicaDesc) Marshal() (dAtA []byte, err error)

func (*ReplicaDesc) MarshalTo

func (m *ReplicaDesc) MarshalTo(dAtA []byte) (int, error)

func (*ReplicaDesc) MarshalToSizedBuffer

func (m *ReplicaDesc) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*ReplicaDesc) ProtoMessage

func (*ReplicaDesc) ProtoMessage()

func (*ReplicaDesc) Reset

func (m *ReplicaDesc) Reset()

func (*ReplicaDesc) Size

func (m *ReplicaDesc) Size() (n int)

func (*ReplicaDesc) String

func (this *ReplicaDesc) String() string

func (*ReplicaDesc) Unmarshal

func (m *ReplicaDesc) Unmarshal(dAtA []byte) error

func (*ReplicaDesc) XXX_DiscardUnknown

func (m *ReplicaDesc) XXX_DiscardUnknown()

func (*ReplicaDesc) XXX_Marshal

func (m *ReplicaDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*ReplicaDesc) XXX_Merge

func (m *ReplicaDesc) XXX_Merge(src proto.Message)

func (*ReplicaDesc) XXX_Size

func (m *ReplicaDesc) XXX_Size() int

func (*ReplicaDesc) XXX_Unmarshal

func (m *ReplicaDesc) XXX_Unmarshal(b []byte) error

type ReplicasNotMatchError

type ReplicasNotMatchError struct {
	// contains filtered or unexported fields
}

func (ReplicasNotMatchError) Error

func (e ReplicasNotMatchError) Error() string

func (ReplicasNotMatchError) Is

func (e ReplicasNotMatchError) Is(err error) bool

Needed for errors.Is to work properly.

func (ReplicasNotMatchError) IsOperationAborted

func (e ReplicasNotMatchError) IsOperationAborted() bool

IsOperationAborted returns whether the error has been caused by an operation intentionally aborted.

type TooManyReplicaGroupsError

type TooManyReplicaGroupsError struct {
	// contains filtered or unexported fields
}

func (TooManyReplicaGroupsError) Error

func (TooManyReplicaGroupsError) Is

Needed for errors.Is to work properly.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL