store

package
v1.15.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 11, 2024 License: Apache-2.0 Imports: 20 Imported by: 23

Documentation

Overview

Package store implements a shared store backed by a kvstore or similar with the following properties:

  • A single type is used to represent all keys
  • Any number of collaborators can join the store. Typically a collaborator is an individual Cilium agent running on each node.
  • All collaborators can own and contribute keys to the store. Each key is owned by exactly one collaborator. It is the responsibility of each collaborator to pick a key name which is guaranteed to be unique.
  • All collaborate desire to see all keys within the scope of a store. The scope of the store is defined by a common key prefix. For this purpose, each collaborator maintains a local cache of all keys in the store by subscribing to change events.

Index

Constants

This section is empty.

Variables

View Source
var Cell = cell.Module(
	"kvstore-utils",
	"Provides factory for kvstore related synchronizers",

	cell.Provide(NewFactory),

	cell.Metric(MetricsProvider),
)

Functions

This section is empty.

Types

type Configuration

type Configuration struct {
	// Prefix is the key prefix of the store shared by all keys. The prefix
	// is the unique identification of the store. Multiple collaborators
	// connected to the same kvstore cluster configuring stores with
	// matching prefixes will automatically form a shared store. This
	// parameter is required.
	Prefix string

	// SynchronizationInterval is the interval in which locally owned keys
	// are synchronized with the kvstore. This parameter is optional.
	SynchronizationInterval time.Duration

	// SharedKeyDeleteDelay is the delay before a shared key delete is
	// handled. This parameter is optional, and defaults to 0 if unset.
	SharedKeyDeleteDelay time.Duration

	// KeyCreator is called to allocate a Key instance when a new shared
	// key is discovered. This parameter is required.
	KeyCreator KeyCreator

	// Backend is the kvstore to use as a backend. If no backend is
	// specified, kvstore.Client() is being used.
	Backend kvstore.BackendOperations

	// Observer is the observe that will receive events on key mutations
	Observer Observer

	Context context.Context
}

Configuration is the set of configuration parameters of a shared store.

type Factory

type Factory interface {
	NewSyncStore(clusterName string, backend SyncStoreBackend, prefix string, opts ...WSSOpt) SyncStore
	NewWatchStore(clusterName string, keyCreator KeyCreator, observer Observer, opts ...RWSOpt) WatchStore
	NewWatchStoreManager(backend WatchStoreBackend, clusterName string) WatchStoreManager
}

func NewFactory

func NewFactory(storeMetrics *Metrics) Factory

type KVPair

type KVPair struct{ Key, Value string }

KVPair represents a basic implementation of the LocalKey interface

func NewKVPair

func NewKVPair(key, value string) *KVPair

func (*KVPair) DeepKeyCopy

func (kv *KVPair) DeepKeyCopy() LocalKey

func (*KVPair) GetKeyName

func (kv *KVPair) GetKeyName() string

func (*KVPair) Marshal

func (kv *KVPair) Marshal() ([]byte, error)

func (*KVPair) Unmarshal

func (kv *KVPair) Unmarshal(key string, data []byte) error

type Key

type Key interface {
	NamedKey

	// Marshal is called to retrieve the byte slice representation of the
	// data represented by the key to store it in the kvstore. The function
	// must ensure that the underlying datatype is properly locked. It is
	// typically a good idea to use json.Marshal to implement this
	// function.
	Marshal() ([]byte, error)

	// Unmarshal is called when an update from the kvstore is received. The
	// prefix configured for the store is removed from the key, and the
	// byte slice passed to the function is coming from the Marshal
	// function from another collaborator. The function must unmarshal and
	// update the underlying data type. It is typically a good idea to use
	// json.Unmarshal to implement this function.
	Unmarshal(key string, data []byte) error
}

Key is the interface that a data structure must implement in order to be stored and shared as a key in a SharedStore.

func KVPairCreator

func KVPairCreator() Key

type KeyCreator

type KeyCreator func() Key

KeyCreator is the function to create a new empty Key instances. Store collaborators must implement this interface and provide the implementation in the Configuration structure.

type LocalKey

type LocalKey interface {
	Key

	// DeepKeyCopy must return a deep copy of the key
	DeepKeyCopy() LocalKey
}

LocalKey is a Key owned by the local store instance

type Metrics

type Metrics struct {
	KVStoreSyncQueueSize        metric.Vec[metric.Gauge]
	KVStoreSyncErrors           metric.Vec[metric.Counter]
	KVStoreInitialSyncCompleted metric.Vec[metric.Gauge]
}

func MetricsProvider

func MetricsProvider() *Metrics

type NamedKey

type NamedKey interface {
	// GetKeyName must return the name of the key. The name of the key must
	// be unique within the store and stable for a particular key. The name
	// of the key must be identical across agent restarts as the keys
	// remain in the kvstore.
	GetKeyName() string
}

NamedKey is an interface that a data structure must implement in order to be deleted from a SharedStore.

type Observer

type Observer interface {
	// OnDelete is called when the key has been deleted from the shared store
	OnDelete(k NamedKey)

	// OnUpdate is called whenever a change has occurred in the data
	// structure represented by the key
	OnUpdate(k Key)
}

Observer receives events when objects in the store mutate

type RWSOpt

type RWSOpt func(*restartableWatchStore)

func RWSWithEntriesMetric

func RWSWithEntriesMetric(gauge prometheus.Gauge) RWSOpt

WSWithEntriesGauge registers a Prometheus gauge metric that is kept in sync with the number of entries synchronized from the kvstore.

func RWSWithOnSyncCallback

func RWSWithOnSyncCallback(callback func(ctx context.Context)) RWSOpt

WSWithOnSyncCallback registers a function to be executed after listing all keys from the kvstore for the first time. Multiple callback functions can be registered.

type SharedStore

type SharedStore struct {
	// contains filtered or unexported fields
}

SharedStore is an instance of a shared store. It is created with JoinSharedStore() and released with the SharedStore.Close() function.

func JoinSharedStore

func JoinSharedStore(c Configuration) (*SharedStore, error)

JoinSharedStore creates a new shared store based on the provided configuration. An error is returned if the configuration is invalid. The store is initialized with the contents of the kvstore. An error is returned if the contents cannot be retrieved synchronously from the kvstore. Starts a controller to continuously synchronize the store with the kvstore.

func (*SharedStore) Close

func (s *SharedStore) Close(ctx context.Context)

Close stops participation with a shared store and removes all keys owned by this node in the kvstore. This stops the controller started by JoinSharedStore().

func (*SharedStore) DeleteLocalKey

func (s *SharedStore) DeleteLocalKey(ctx context.Context, key NamedKey)

DeleteLocalKey removes a key from being synchronized with the kvstore

func (*SharedStore) NumEntries

func (s *SharedStore) NumEntries() int

NumEntries returns the number of entries in the store

func (*SharedStore) Release

func (s *SharedStore) Release()

Release frees all resources own by the store but leaves all keys in the kvstore intact

func (*SharedStore) SharedKeysMap

func (s *SharedStore) SharedKeysMap() map[string]Key

SharedKeysMap returns a copy of the SharedKeysMap, the returned map can be safely modified but the values of the map represent the actual data stored in the internal SharedStore SharedKeys map.

func (*SharedStore) UpdateKeySync

func (s *SharedStore) UpdateKeySync(ctx context.Context, key LocalKey, lease bool) error

UpdateKeySync synchronously synchronizes a key with the kvstore.

func (*SharedStore) UpdateLocalKeySync

func (s *SharedStore) UpdateLocalKeySync(ctx context.Context, key LocalKey) error

UpdateLocalKeySync synchronously synchronizes a local key with the kvstore and adds it to the list of local keys to be synchronized if the initial synchronous synchronization was successful

type SyncStore

type SyncStore interface {
	// Run starts the SyncStore logic, blocking until the context is closed.
	Run(ctx context.Context)

	// UpsertKey upserts a key/value pair into the kvstore.
	UpsertKey(ctx context.Context, key Key) error

	// DeleteKey removes a key from the kvstore.
	DeleteKey(ctx context.Context, key NamedKey) error

	// Synced triggers the insertion of the "synced" key associated with this
	// store into the kvstore once all upsertions already issued have completed
	// successfully, eventually executing all specified callbacks (if any).
	// Only the first invocation takes effect.
	Synced(ctx context.Context, callbacks ...func(ctx context.Context)) error
}

SyncStore abstracts the operations allowing to synchronize key/value pairs into a kvstore.

type SyncStoreBackend

type SyncStoreBackend interface {
	// Update creates or updates a key.
	Update(ctx context.Context, key string, value []byte, lease bool) error
	// Delete deletes a key.
	Delete(ctx context.Context, key string) error

	// RegisterLeaseExpiredObserver registers a function which is executed when
	// the lease associated with a key having the given prefix is detected as expired.
	RegisterLeaseExpiredObserver(prefix string, fn func(key string))
}

SyncStoreBackend represents the subset kvstore.BackendOperations leveraged by SyncStore implementations.

type WSMFunc

type WSMFunc func(context.Context)

WSMFunc if a function which can be registered in the WatchStoreManager.

type WSSOpt

type WSSOpt func(*wqSyncStore)

func WSSWithRateLimiter

func WSSWithRateLimiter(limiter workqueue.RateLimiter) WSSOpt

WSSWithRateLimiter sets the rate limiting algorithm to be used when requeueing failed events.

func WSSWithSyncedKeyOverride

func WSSWithSyncedKeyOverride(key string) WSSOpt

WSSWithSyncedKeyOverride overrides the "synced" key inserted into the kvstore when initial synchronization completed (by default it corresponds to the prefix).

func WSSWithWorkers

func WSSWithWorkers(workers uint) WSSOpt

WSSWithWorkers configures the number of workers spawned by Run() to handle update/delete operations.

func WSSWithoutLease

func WSSWithoutLease() WSSOpt

WSSWithoutLease disables attaching the lease to upserted keys.

type WatchStore

type WatchStore interface {
	// Watch starts watching the specified kvstore prefix, blocking until the context is closed.
	// Depending on the implementation, it might be executed multiple times.
	Watch(ctx context.Context, backend WatchStoreBackend, prefix string)

	// NumEntries returns the number of entries synchronized from the store.
	NumEntries() uint64

	// Synced returns whether the initial list of entries has been retrieved from
	// the kvstore, and new events are currently being watched.
	Synced() bool

	// Drain emits a deletion event for each known key. It shall be called only
	// when no watch operation is in progress.
	Drain()
}

WatchStore abstracts the operations allowing to synchronize key/value pairs from a kvstore, emitting the corresponding events.

type WatchStoreBackend

type WatchStoreBackend interface {
	// ListAndWatch creates a new watcher for the given prefix after listing the existing keys.
	ListAndWatch(ctx context.Context, prefix string, chanSize int) *kvstore.Watcher
}

WatchStoreBackend represents the subset of kvstore.BackendOperations leveraged by WatchStore implementations.

type WatchStoreManager

type WatchStoreManager interface {
	// Register registers a function associated with a given kvstore prefix.
	// It cannot be called once Run() has started.
	Register(prefix string, function WSMFunc)
	// Run starts the manager, blocking until the context is closed and all
	// started functions terminated.
	Run(ctx context.Context)
}

WatchStoreManager enables to register a set of functions to be asynchronously executed when the corresponding kvstore prefixes are synchronized (based on the implementation).

func NewWatchStoreManagerImmediate

func NewWatchStoreManagerImmediate(clusterName string) WatchStoreManager

NewWatchStoreManagerImmediate implements the WatchStoreManager interface, immediately starting the registered functions once Run() is executed.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL