inventory

package
v0.0.0-...-5c79d48

This package is not in the latest version of its module.
Published: Feb 15, 2024 License: AGPL-3.0 Imports: 21 Imported by: 0

Documentation

Constants

const SHARDS = 128

Variables

This section is empty.

Functions

func SendHeartbeat

func SendHeartbeat(ctx context.Context, handle DownstreamHandle, hb proto.InventoryHeartbeat, retry retryutils.Retry)

Types

type Auth

type Auth interface {
	UpsertNode(context.Context, types.Server) (*types.KeepAlive, error)

	KeepAliveServer(context.Context, types.KeepAlive) error

	UpsertInstance(ctx context.Context, instance types.Instance) error
}

Auth is an interface representing the subset of the auth API that must be made available to the controller in order for it to be able to handle control streams.

type Controller

type Controller struct {
	// contains filtered or unexported fields
}

Controller manages the inventory control streams registered with a given auth instance. Incoming messages are processed by invoking the appropriate methods on the Auth interface.

func NewController

func NewController(auth Auth, usageReporter usagereporter.UsageReporter, opts ...ControllerOption) *Controller

NewController sets up a new controller instance.

func (*Controller) Close

func (c *Controller) Close() error

Close terminates all control streams registered with this controller. Control streams registered after Close() is called are closed immediately.
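The "closed immediately after Close()" behavior described above can be illustrated with a small registry. This is only a sketch: the `stream` and `registry` types and their methods are hypothetical stand-ins, not the controller's actual internals.

```go
package main

import (
	"fmt"
	"sync"
)

// stream is a hypothetical stand-in for a control stream.
type stream struct {
	closed bool
}

func (s *stream) close() { s.closed = true }

// registry is a hypothetical stand-in for the controller's stream bookkeeping.
type registry struct {
	mu      sync.Mutex
	closed  bool
	streams []*stream
}

// register tracks the stream, or closes it right away if the registry
// has already been closed.
func (r *registry) register(s *stream) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.closed {
		s.close()
		return
	}
	r.streams = append(r.streams, s)
}

// closeAll closes every tracked stream and marks the registry closed so
// that later registrations are rejected.
func (r *registry) closeAll() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.closed = true
	for _, s := range r.streams {
		s.close()
	}
	r.streams = nil
}

func main() {
	r := &registry{}
	early := &stream{}
	r.register(early)
	r.closeAll()
	late := &stream{}
	r.register(late)
	fmt.Println(early.closed, late.closed)
}
```

Checking the closed flag under the same lock that guards registration means a stream cannot slip in between the flag being set and the existing streams being closed.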

func (*Controller) ConnectedInstances

func (c *Controller) ConnectedInstances() int

ConnectedInstances gets the total number of connected instances. Note that this is the total number of *handles*, not the number of unique instances by id.

func (*Controller) ConnectedServiceCount

func (c *Controller) ConnectedServiceCount(systemRole types.SystemRole) uint64

ConnectedServiceCount returns the number of connected instances of the given service in the inventory.

func (*Controller) ConnectedServiceCounts

func (c *Controller) ConnectedServiceCounts() map[types.SystemRole]uint64

ConnectedServiceCounts returns the number of each connected service seen in the inventory.

func (*Controller) GetControlStream

func (c *Controller) GetControlStream(serverID string) (handle UpstreamHandle, ok bool)

GetControlStream gets a control stream for the given server ID if one exists. If multiple control streams exist, one is selected pseudorandomly.

func (*Controller) Iter

func (c *Controller) Iter(fn func(UpstreamHandle))

Iter iterates across all handles registered with this controller. Note: if multiple handles are registered for a given server, only one of them, selected pseudorandomly, is observed.

func (*Controller) RegisterControlStream

func (c *Controller) RegisterControlStream(stream client.UpstreamInventoryControlStream, hello proto.UpstreamInventoryHello)

RegisterControlStream registers a new control stream with the controller.

type ControllerOption

type ControllerOption func(c *controllerOptions)

func WithAuthServerID

func WithAuthServerID(serverID string) ControllerOption

type DownstreamCreateFunc

type DownstreamCreateFunc func(ctx context.Context) (client.DownstreamInventoryControlStream, error)

DownstreamCreateFunc is a function that creates a downstream inventory control stream.

type DownstreamHandle

type DownstreamHandle interface {
	// Sender is used to asynchronously access a send-only reference to the current control
	// stream instance. If not currently healthy, this blocks indefinitely until a healthy control
	// stream is established.
	Sender() <-chan DownstreamSender
	// RegisterPingHandler registers a handler for downstream ping messages, returning
	// a de-registration function.
	RegisterPingHandler(DownstreamPingHandler) (unregister func())
	// CloseContext gets the close context of the downstream handle.
	CloseContext() context.Context
	// Close closes the downstream handle.
	Close() error
	// GetUpstreamLabels gets the labels received from upstream.
	GetUpstreamLabels(kind proto.LabelUpdateKind) map[string]string
}

DownstreamHandle is a persistent handle used to interact with the current downstream half of the inventory control stream. This handle automatically re-creates the control stream if it fails. The latest (or next, if currently unhealthy) control stream send-half can be accessed/awaited via the Sender() channel. The intended usage pattern is that handlers for incoming messages are registered once, while components that need to send messages should re-acquire a new sender each time the old one fails. If send logic cares about auth server version, make sure to re-check the version *for each* sender, since different streams may be connected to different auth servers.

func NewDownstreamHandle

NewDownstreamHandle creates a new downstream inventory control handle which will create control streams via the supplied create func and manage hello exchange with the supplied upstream hello.

type DownstreamPingHandler

type DownstreamPingHandler func(sender DownstreamSender, msg proto.DownstreamInventoryPing)

DownstreamPingHandler is a function that handles ping messages that come down the inventory control stream.
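RegisterPingHandler returns a de-registration function. One common way to build that shape is a keyed handler map whose register call returns a closure that deletes its own key. The sketch below assumes that design; `pingRegistry` and the simplified `pingHandler` signature are hypothetical and not taken from this package.

```go
package main

import (
	"fmt"
	"sync"
)

// pingHandler is a simplified stand-in for DownstreamPingHandler (hypothetical).
type pingHandler func(id uint64)

// pingRegistry stores handlers under unique keys so each can be removed later.
type pingRegistry struct {
	mu       sync.Mutex
	nextKey  int
	handlers map[int]pingHandler
}

func newPingRegistry() *pingRegistry {
	return &pingRegistry{handlers: make(map[int]pingHandler)}
}

// register adds a handler and returns a function that removes it again.
func (r *pingRegistry) register(h pingHandler) (unregister func()) {
	r.mu.Lock()
	defer r.mu.Unlock()
	key := r.nextKey
	r.nextKey++
	r.handlers[key] = h
	return func() {
		r.mu.Lock()
		defer r.mu.Unlock()
		delete(r.handlers, key)
	}
}

// dispatch snapshots the handlers under lock, then invokes them without
// holding the lock so a handler may safely call unregister.
func (r *pingRegistry) dispatch(id uint64) {
	r.mu.Lock()
	hs := make([]pingHandler, 0, len(r.handlers))
	for _, h := range r.handlers {
		hs = append(hs, h)
	}
	r.mu.Unlock()
	for _, h := range hs {
		h(id)
	}
}

func main() {
	r := newPingRegistry()
	unregister := r.register(func(id uint64) { fmt.Println("ping", id) })
	r.dispatch(1)
	unregister()
	r.dispatch(2) // no handler remains, so nothing is printed
}
```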

type DownstreamSender

type DownstreamSender interface {
	// Send sends a message up the control stream.
	Send(ctx context.Context, msg proto.UpstreamInventoryMessage) error
	// Hello gets the cached downstream hello that was sent by the auth server
	// when the stream was initialized.
	Hello() proto.DownstreamInventoryHello
	// Done signals closure of the underlying stream.
	Done() <-chan struct{}
}

DownstreamSender is a send-only reference to the downstream half of an inventory control stream. Components that require use of the inventory control stream should accept a DownstreamHandle instead, and take a reference to the sender via the Sender() method.

type InstanceStateRef

type InstanceStateRef struct {
	QualifiedPendingControlLog   []types.InstanceControlLogEntry
	UnqualifiedPendingControlLog []types.InstanceControlLogEntry
	LastHeartbeat                types.Instance
}

InstanceStateRef is a helper used to present a copy of the public subset of instanceStateTracker. Used by the VisitInstanceState helper to show callers the current state without risking concurrency issues due to misuse.

type InstanceStateUpdate

type InstanceStateUpdate struct {
	QualifiedPendingControlLog   []types.InstanceControlLogEntry
	UnqualifiedPendingControlLog []types.InstanceControlLogEntry
}

InstanceStateUpdate encodes additional pending control log entries that should be included in future heartbeats. Used by the VisitInstanceState helper to provide a mechanism of appending to the primary pending queues without risking concurrency issues due to misuse.
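The visit-with-copies pattern that InstanceStateRef and InstanceStateUpdate support can be sketched as follows. The `tracker`, `stateRef`, and `stateUpdate` types are simplified, hypothetical stand-ins with a single string queue; the real tracker's fields and locking are not shown in this documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// stateRef is a simplified stand-in for InstanceStateRef (hypothetical).
type stateRef struct {
	Pending []string
}

// stateUpdate is a simplified stand-in for InstanceStateUpdate (hypothetical).
type stateUpdate struct {
	Pending []string
}

// tracker guards a pending queue with a mutex.
type tracker struct {
	mu      sync.Mutex
	pending []string
}

// visit hands the closure a copy of the current state and appends whatever
// entries the closure returns to the real queue.
func (t *tracker) visit(fn func(ref stateRef) stateUpdate) {
	t.mu.Lock()
	defer t.mu.Unlock()
	// Copy the slice so the closure cannot mutate internal state.
	ref := stateRef{Pending: append([]string(nil), t.pending...)}
	update := fn(ref)
	// append copies the returned elements into the tracker's own slice.
	t.pending = append(t.pending, update.Pending...)
}

func main() {
	t := &tracker{pending: []string{"entry-1"}}
	t.visit(func(ref stateRef) stateUpdate {
		ref.Pending[0] = "mutated" // only changes the copy
		return stateUpdate{Pending: []string{"entry-2"}}
	})
	fmt.Println(t.pending)
}
```

Because the closure only ever sees copies, callers cannot hold references into the tracker after the visit returns, which is the misuse the descriptions above aim to prevent.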

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store is a sharded key-value store that manages inventory control handles.

Note: the sharding here may not be strictly necessary. Sharding does improve performance under high combined read/write load, but performance isn't terrible without it (~2.5s unsharded vs ~0.5s sharded in the basic benchmark). We've previously seen outages due to contention on a similar structure in the event fanout system, and I opted to shard here as well since the expected load on startup is similar to that system (though the fanout system performs more memory allocation under lock, which I suspect is why it has worse single-lock performance despite being otherwise quite similar).
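A minimal sketch of a sharded key-value store follows, to show how sharding reduces lock contention. It assumes FNV-1a hashing of the server ID to pick one of 128 shards; the hash function, the `shard` layout, and all names here are assumptions for illustration, not the Store's actual implementation.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// shardCount mirrors the package's SHARDS constant.
const shardCount = 128

// shard is one independently locked partition (hypothetical layout).
type shard struct {
	mu      sync.RWMutex
	entries map[string]string
}

// shardedStore spreads keys across shardCount shards so that operations
// on different shards do not contend on the same lock.
type shardedStore struct {
	shards [shardCount]*shard
}

func newShardedStore() *shardedStore {
	s := &shardedStore{}
	for i := range s.shards {
		s.shards[i] = &shard{entries: make(map[string]string)}
	}
	return s
}

// shardFor hashes the key with FNV-1a (an assumption) and maps it to a shard.
func (s *shardedStore) shardFor(key string) *shard {
	h := fnv.New32a()
	h.Write([]byte(key))
	return s.shards[h.Sum32()%shardCount]
}

func (s *shardedStore) insert(key, value string) {
	sh := s.shardFor(key)
	sh.mu.Lock()
	defer sh.mu.Unlock()
	sh.entries[key] = value
}

func (s *shardedStore) get(key string) (string, bool) {
	sh := s.shardFor(key)
	sh.mu.RLock()
	defer sh.mu.RUnlock()
	v, ok := sh.entries[key]
	return v, ok
}

func main() {
	s := newShardedStore()
	s.insert("server-1", "handle-1")
	v, ok := s.get("server-1")
	fmt.Println(v, ok)
}
```

Each operation takes only the lock of the shard its key hashes to, so a burst of registrations for different servers is spread over many locks instead of serializing on one.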

func NewStore

func NewStore() *Store

NewStore creates a new inventory control handle store.

func (*Store) Get

func (s *Store) Get(serverID string) (handle UpstreamHandle, ok bool)

Get attempts to load a handle for the given server ID. Note: if multiple handles exist for a given server, the returned handle is selected pseudorandomly from the available set.

func (*Store) Insert

func (s *Store) Insert(handle UpstreamHandle)

Insert adds a new handle to the store.

func (*Store) Iter

func (s *Store) Iter(fn func(UpstreamHandle))

Iter iterates across all handles registered with this store. Note: if multiple handles are registered for a given server, only one of them, selected pseudorandomly, is observed.

func (*Store) Len

func (s *Store) Len() int

Len returns the count of currently registered servers (servers with multiple handles registered still only count as one).
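The semantics described for Get, Len, and Remove (several handles per server, pseudorandom selection, counting servers rather than handles) can be sketched with a map from server ID to a slice of handles. The `handle` and `handleSet` types are hypothetical, and locking is omitted to keep the example short.

```go
package main

import (
	"fmt"
	"math/rand"
)

// handle is a hypothetical stand-in for UpstreamHandle.
type handle struct {
	serverID string
	streamID int
}

// handleSet maps a server ID to every handle registered for it.
type handleSet struct {
	byServer map[string][]*handle
}

func newHandleSet() *handleSet {
	return &handleSet{byServer: make(map[string][]*handle)}
}

func (s *handleSet) insert(h *handle) {
	s.byServer[h.serverID] = append(s.byServer[h.serverID], h)
}

// get returns one pseudorandomly chosen handle for the server, if any.
func (s *handleSet) get(serverID string) (*handle, bool) {
	hs := s.byServer[serverID]
	if len(hs) == 0 {
		return nil, false
	}
	return hs[rand.Intn(len(hs))], true
}

// remove drops exactly the given handle and deletes the server entry
// once its last handle is gone.
func (s *handleSet) remove(h *handle) {
	hs := s.byServer[h.serverID]
	for i, cur := range hs {
		if cur == h {
			hs = append(hs[:i], hs[i+1:]...)
			break
		}
	}
	if len(hs) == 0 {
		delete(s.byServer, h.serverID)
		return
	}
	s.byServer[h.serverID] = hs
}

// serverCount counts servers, not handles.
func (s *handleSet) serverCount() int { return len(s.byServer) }

func main() {
	s := newHandleSet()
	s.insert(&handle{serverID: "srv", streamID: 1})
	s.insert(&handle{serverID: "srv", streamID: 2})
	fmt.Println(s.serverCount()) // two handles, one server
}
```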

func (*Store) Remove

func (s *Store) Remove(handle UpstreamHandle)

Remove removes the handle from the store.

type UpstreamHandle

type UpstreamHandle interface {
	client.UpstreamInventoryControlStream
	// Hello gets the cached upstream hello that was used to initialize the stream.
	Hello() proto.UpstreamInventoryHello

	// AgentMetadata is the service's metadata: OS, glibc version, install methods, ...
	AgentMetadata() proto.UpstreamInventoryAgentMetadata

	Ping(ctx context.Context, id uint64) (d time.Duration, err error)
	// HasService is a helper for checking if a given service is associated with this
	// stream.
	HasService(types.SystemRole) bool

	// VisitInstanceState runs the provided closure against a representation of the most
	// recently observed instance state, plus any pending control log entries. The returned
	// value may optionally include additional control log entries to add to the pending
	// queues. Inputs and outputs are deep copied to avoid concurrency issues. See the InstanceStateTracker
	// for an explanation of how this system works.
	VisitInstanceState(func(ref InstanceStateRef) InstanceStateUpdate)

	// HeartbeatInstance triggers an early instance heartbeat. This function does not
	// wait for the instance heartbeat to actually be completed, so calling this and then
	// immediately locking the instanceStateTracker will likely result in observing the
	// pre-heartbeat state.
	HeartbeatInstance()
	// UpdateLabels updates the labels on the instance.
	UpdateLabels(ctx context.Context, kind proto.LabelUpdateKind, labels map[string]string) error
}

UpstreamHandle is the primary mechanism for interacting with a fully initialized upstream control stream. The hello message cached in this handle has already passed through the auth layer, meaning that it represents the verified identity and capabilities of the remote entity.
