informer

package
v0.0.0-...-a134451
Published: Feb 28, 2024 License: MIT Imports: 19 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ErrFIFOClosed = xerrors.New("DeltaFIFO: manipulating with closed queue")

ErrFIFOClosed is used when an operation is attempted on a closed queue.

var ErrNotFound = xerrors.New("not found")
var (
	// ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas
	// object with zero length is encountered (should be impossible,
	// but included for completeness).
	ErrZeroLengthDeltasObject = xerrors.New("0 length Deltas object; can't get key")
)

Functions

func DefaultWatchErrorHandler

func DefaultWatchErrorHandler(r *Reflector, err error)

DefaultWatchErrorHandler is the default implementation of WatchErrorHandler

func DeletionHandlingMetaNamespaceKeyFunc

func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error)

DeletionHandlingMetaNamespaceKeyFunc checks for DeletedFinalStateUnknown objects before calling MetaNamespaceKeyFunc.
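The tombstone check described above can be sketched with stand-in types. This is a minimal illustration, not this package's code: `object`, `tombstone`, `metaKey`, and `deletionHandlingKey` are hypothetical names, and the `namespace/name` key format is an assumption borrowed from the usual Kubernetes convention, since this page does not document the format.

```go
package main

import (
	"errors"
	"fmt"
)

// object is a hypothetical stand-in for an API object with metadata.
type object struct {
	Namespace, Name string
}

// tombstone mirrors the shape of DeletedFinalStateUnknown:
// a recorded key plus a possibly stale object.
type tombstone struct {
	Key string
	Obj interface{}
}

// metaKey builds "namespace/name" (assumed format), or just "name"
// when the namespace is empty.
func metaKey(obj interface{}) (string, error) {
	o, ok := obj.(object)
	if !ok {
		return "", errors.New("object has no namespace/name metadata")
	}
	if o.Namespace == "" {
		return o.Name, nil
	}
	return o.Namespace + "/" + o.Name, nil
}

// deletionHandlingKey returns the recorded key for tombstones and
// falls back to metaKey for every other object.
func deletionHandlingKey(obj interface{}) (string, error) {
	if t, ok := obj.(tombstone); ok {
		return t.Key, nil
	}
	return metaKey(obj)
}

func main() {
	k1, _ := deletionHandlingKey(object{Namespace: "default", Name: "web"})
	k2, _ := deletionHandlingKey(tombstone{Key: "default/gone"})
	fmt.Println(k1, k2)
}
```

Checking for the tombstone first matters because a tombstone carries no metadata of its own; only its recorded key identifies the deleted object.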

func IsTooManyRequests

func IsTooManyRequests(_ error) bool

func ListAll

func ListAll(store Store, selector interface{}, appendFn AppendFunc) error

ListAll calls appendFn with each value retrieved from store which matches the selector.

func MetaNamespaceKeyFunc

func MetaNamespaceKeyFunc(obj interface{}) (string, error)

func Pop

func Pop(queue Queue) interface{}

Pop is a helper function for popping from a Queue. WARNING: Do NOT use this function in non-test code to avoid races unless you really really really really know what you are doing.

func WaitForCacheSync

func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool

WaitForCacheSync waits for caches to populate. It returns true if it was successful, and false if the controller should shut down. Callers should prefer WaitForNamedCacheSync().

func WaitForNamedCacheSync

func WaitForNamedCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool

WaitForNamedCacheSync is a wrapper around WaitForCacheSync that generates log messages indicating that the caller identified by name is waiting for syncs, followed by either a successful or failed sync.
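The contract of these two functions (true once every cache reports synced, false if the stop channel closes first) can be illustrated with a small polling loop. This is a sketch under assumptions: `syncedFunc` and `waitForSync` are hypothetical names, and the explicit polling interval is an invention of the example; the page does not say how the real functions wait.

```go
package main

import (
	"fmt"
	"time"
)

// syncedFunc has the same shape as InformerSynced.
type syncedFunc func() bool

// waitForSync polls until every sync function reports true (returns true)
// or stopCh is closed (returns false).
func waitForSync(stopCh <-chan struct{}, interval time.Duration, syncs ...syncedFunc) bool {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		all := true
		for _, s := range syncs {
			if !s() {
				all = false
				break
			}
		}
		if all {
			return true
		}
		select {
		case <-stopCh:
			return false
		case <-ticker.C:
			// Poll again on the next tick.
		}
	}
}

func main() {
	stop := make(chan struct{})
	calls := 0
	ready := func() bool { calls++; return calls >= 3 }
	fmt.Println(waitForSync(stop, time.Millisecond, ready))
}
```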

Types

type AppendFunc

type AppendFunc func(interface{})

AppendFunc is used to add a matching item to whatever list the caller is using

type Config

type Config struct {
	// The queue for your objects - has to be a DeltaFIFO due to
	// assumptions in the implementation. Your Process() function
	// should accept the output of this Queue's Pop() method.
	Queue

	// Something that can list and watch your objects.
	ListerWatcher

	// Something that can process a popped Deltas.
	Process ProcessFunc

	// ObjectType is an example object of the type this controller is
	// expected to handle.  Only the type needs to be right, except
	// that when that is `unstructured.Unstructured` the object's
	// `"apiVersion"` and `"kind"` must also be right.
	ObjectType runtime.Object

	// FullResyncPeriod is the period at which ShouldResync is considered.
	FullResyncPeriod time.Duration

	// ShouldResync is periodically used by the reflector to determine
	// whether to Resync the Queue. If ShouldResync is `nil` or
	// returns true, it means the reflector should proceed with the
	// resync.
	ShouldResync ShouldResyncFunc

	// If true, when Process() returns an error, re-enqueue the object.
	// TODO: add interface to let you inject a delay/backoff or drop
	//       the object completely if desired. Pass the object in
	//       question to this interface as a parameter.  This is probably moot
	//       now that this functionality appears at a higher level.
	RetryOnError bool

	// Called whenever the ListAndWatch drops the connection with an error.
	WatchErrorHandler WatchErrorHandler
}

Config contains all the settings for one of these low-level controllers.

type Controller

type Controller interface {
	// Run does two things.  One is to construct and run a Reflector
	// to pump objects/notifications from the Config's ListerWatcher
	// to the Config's Queue and possibly invoke the occasional Resync
	// on that Queue.  The other is to repeatedly Pop from the Queue
	// and process with the Config's ProcessFunc.  Both of these
	// continue until `stopCh` is closed.
	Run(stopCh <-chan struct{})

	// HasSynced delegates to the Config's Queue
	HasSynced() bool

	// LastSyncResourceVersion delegates to the Reflector when there
	// is one, otherwise returns the empty string
	LastSyncResourceVersion() string
}

Controller is a low-level controller that is parameterized by a Config and used in sharedIndexInformer.

func New

func New(config *Config) Controller

type DeletedFinalStateUnknown

type DeletedFinalStateUnknown struct {
	Key string
	Obj interface{}
}

DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where an object was deleted but the watch deletion event was missed while disconnected from apiserver. In this case we don't know the final "resting" state of the object, so there's a chance the included `Obj` is stale.

type Delta

type Delta struct {
	Type   DeltaType
	Object interface{}
}

Delta is a member of Deltas (a list of Delta objects) which in its turn is the type stored by a DeltaFIFO. It tells you what change happened, and the object's state after* that change.

[*] Unless the change is a deletion, and then you'll get the final state of the object before it was deleted.

type DeltaFIFO

type DeltaFIFO struct {
	// contains filtered or unexported fields
}

DeltaFIFO is like FIFO, but differs in two ways. One is that the accumulator associated with a given object's key is not that object but rather a Deltas, which is a slice of Delta values for that object. Applying an object to a Deltas means to append a Delta except when the potentially appended Delta is a Deleted and the Deltas already ends with a Deleted. In that case the Deltas does not grow, although the terminal Deleted will be replaced by the new Deleted if the older Deleted's object is a DeletedFinalStateUnknown.

The other difference is that DeltaFIFO has two additional ways that an object can be applied to an accumulator: Replaced and Sync. If EmitDeltaTypeReplaced is not set to true, Sync will be used in replace events for backwards compatibility. Sync is used for periodic resync events.

DeltaFIFO is a producer-consumer queue, where a Reflector is intended to be the producer, and the consumer is whatever calls the Pop() method.

DeltaFIFO solves this use case:

  • You want to process every object change (delta) at most once.
  • When you process an object, you want to see everything that's happened to it since you last processed it.
  • You want to process the deletion of some of the objects.
  • You might want to periodically reprocess objects.

DeltaFIFO's Pop(), Get(), and GetByKey() methods return interface{} to satisfy the Store/Queue interfaces, but they will always return an object of type Deltas. List() returns the newest object from each accumulator in the FIFO.

A DeltaFIFO's knownObjects KeyListerGetter provides the abilities to list Store keys and to get objects by Store key. The objects in question are called "known objects" and this set of objects modifies the behavior of the Delete, Replace, and Resync methods (each in a different way).

A note on threading: If you call Pop() in parallel from multiple threads, you could end up with multiple threads processing slightly different versions of the same object.
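The accumulator rule from the first paragraph of this description (append every Delta, except that a Deleted following a Deleted does not grow the list) can be sketched as follows. The lowercase types are stand-ins for Delta, DeltaType, and DeletedFinalStateUnknown, and `appendDelta` is a hypothetical helper that follows the prose rule literally; the actual implementation may differ in detail.

```go
package main

import "fmt"

type deltaType string

const (
	added   deltaType = "Added"
	updated deltaType = "Updated"
	deleted deltaType = "Deleted"
)

// delta mirrors the shape of Delta.
type delta struct {
	Type   deltaType
	Object interface{}
}

// tombstone stands in for DeletedFinalStateUnknown.
type tombstone struct {
	Key string
	Obj interface{}
}

// appendDelta appends d unless both d and the current last entry are
// Deleted. In that case the list does not grow, and the last entry is
// replaced only if the older Deleted holds a tombstone.
func appendDelta(ds []delta, d delta) []delta {
	n := len(ds)
	if n > 0 && d.Type == deleted && ds[n-1].Type == deleted {
		if _, stale := ds[n-1].Object.(tombstone); stale {
			ds[n-1] = d
		}
		return ds
	}
	return append(ds, d)
}

func main() {
	var ds []delta
	ds = appendDelta(ds, delta{Type: added, Object: "v1"})
	ds = appendDelta(ds, delta{Type: deleted, Object: tombstone{Key: "ns/a", Obj: "v1"}})
	ds = appendDelta(ds, delta{Type: deleted, Object: "v2"})
	fmt.Println(len(ds), ds[1].Object)
}
```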

func NewDeltaFIFO deprecated

func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO

NewDeltaFIFO returns a Queue which can be used to process changes to items.

keyFunc is used to figure out what key an object should have. (It is exposed in the returned DeltaFIFO's KeyOf() method, with additional handling around deleted objects and queue state).

'knownObjects' may be supplied to modify the behavior of Delete, Replace, and Resync. It may be nil if you do not need those modifications.

TODO: consider merging keyLister with this object, tracking a list of "known" keys when Pop() is called. Have to think about how that affects error retrying.

NOTE: It is possible to misuse this and cause a race when using an external known object source. Whether there is a potential race depends on how the consumer modifies knownObjects. In Pop(), the process function is called under lock, so it is safe to update data structures in it that need to be in sync with the queue (e.g. knownObjects).

Example: When sharedIndexInformer is the consumer, there is no race, because knownObjects (s.indexer) is modified safely under DeltaFIFO's lock. The only exceptions are the GetStore() and GetIndexer() methods, which expose ways to modify the underlying storage. Currently these two methods are used for creating Lister and internal tests.

Also see the comment on DeltaFIFO.

Warning: This constructs a DeltaFIFO that does not differentiate between events caused by a call to Replace (e.g., from a relist, which may contain object updates), and synthetic events caused by a periodic resync (which just emit the existing object). See https://issue.k8s.io/86015 for details.

Use `NewDeltaFIFOWithOptions(DeltaFIFOOptions{..., EmitDeltaTypeReplaced: true})` instead to receive a `Replaced` event depending on the type.

Deprecated: Equivalent to NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: keyFunc, KnownObjects: knownObjects})

func NewDeltaFIFOWithOptions

func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO

NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to items. See also the comment on DeltaFIFO.

func (*DeltaFIFO) Add

func (f *DeltaFIFO) Add(obj interface{}) error

Add inserts an item, and puts it in the queue. The item is only enqueued if it doesn't already exist in the set.

func (*DeltaFIFO) AddIfNotPresent

func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error

AddIfNotPresent inserts an item, and puts it in the queue. If the item is already present in the set, it is neither enqueued nor added to the set.

This is useful in a single producer/consumer scenario so that the consumer can safely retry items without contending with the producer and potentially enqueueing stale items.

Important: obj must be a Deltas (the output of the Pop() function). Yes, this is different from the Add/Update/Delete functions.

func (*DeltaFIFO) Close

func (f *DeltaFIFO) Close()

Close the queue.

func (*DeltaFIFO) Delete

func (f *DeltaFIFO) Delete(obj interface{}) error

Delete is just like Add, but makes a Deleted Delta. If the given object does not already exist, it will be ignored. (It may have already been deleted by a Replace (re-list), for example.) In this method `f.knownObjects`, if not nil, provides (via GetByKey) _additional_ objects that are considered to already exist.

func (*DeltaFIFO) Get

func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error)

Get returns the complete list of deltas for the requested item, or sets exists=false. You should treat the items returned inside the deltas as immutable.

func (*DeltaFIFO) GetByKey

func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error)

GetByKey returns the complete list of deltas for the requested item, setting exists=false if that list is empty. You should treat the items returned inside the deltas as immutable.

func (*DeltaFIFO) HasSynced

func (f *DeltaFIFO) HasSynced() bool

HasSynced returns true if an Add/Update/Delete/AddIfNotPresent is called first, or the first batch of items inserted by Replace() has been popped.

func (*DeltaFIFO) IsClosed

func (f *DeltaFIFO) IsClosed() bool

IsClosed checks if the queue is closed

func (*DeltaFIFO) KeyOf

func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error)

KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or DeletedFinalStateUnknown objects.

func (*DeltaFIFO) List

func (f *DeltaFIFO) List() []interface{}

List returns a list of all the items; it returns the object from the most recent Delta. You should treat the items returned inside the deltas as immutable.

func (*DeltaFIFO) ListKeys

func (f *DeltaFIFO) ListKeys() []string

ListKeys returns a list of all the keys of the objects currently in the FIFO.

func (*DeltaFIFO) Pop

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error)

Pop blocks until the queue has some items, and then returns one. If multiple items are ready, they are returned in the order in which they were added/updated. The item is removed from the queue (and the store) before it is returned, so if you don't successfully process it, you need to add it back with AddIfNotPresent(). The process function is called under lock, so it is safe to update data structures in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc may return an instance of ErrRequeue with a nested error to indicate that the current item should be requeued (equivalent to calling AddIfNotPresent under the lock). The process function should avoid expensive I/O operations so that other queue operations, such as Add() and Get(), are not blocked for too long.

Pop returns a 'Deltas', which has a complete list of all the things that happened to the object (deltas) while it was sitting in the queue.
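The pop, process, and requeue cycle described above can be sketched with a toy queue. Everything here is a stand-in: `miniQueue`, `errRequeue`, and `errTryLater` are hypothetical, and unlike the documented Pop this `pop` returns an error on an empty queue instead of blocking, purely to keep the example short.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errRequeue mirrors the shape of ErrRequeue: it wraps the error
// that pop should hand back to the caller.
type errRequeue struct{ Err error }

func (e errRequeue) Error() string {
	if e.Err == nil {
		return "item should be requeued"
	}
	return e.Err.Error()
}

var errTryLater = errors.New("try later")

type miniQueue struct {
	mu    sync.Mutex
	keys  []string               // keys in processing order
	items map[string]interface{} // key -> accumulator
}

// pop removes the first item, runs process while holding the lock, and
// puts the item back if process asks for a requeue.
func (q *miniQueue) pop(process func(interface{}) error) (interface{}, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.keys) == 0 {
		return nil, errors.New("queue is empty")
	}
	key := q.keys[0]
	q.keys = q.keys[1:]
	item := q.items[key]
	delete(q.items, key)

	err := process(item)
	var rq errRequeue
	if errors.As(err, &rq) {
		// Same effect as AddIfNotPresent under the lock.
		if _, exists := q.items[key]; !exists {
			q.items[key] = item
			q.keys = append(q.keys, key)
		}
		err = rq.Err
	}
	return item, err
}

func main() {
	q := &miniQueue{keys: []string{"a"}, items: map[string]interface{}{"a": "obj-a"}}
	_, err := q.pop(func(interface{}) error { return errRequeue{Err: errTryLater} })
	fmt.Println(err, len(q.keys))
}
```

Because process runs while the lock is held, a slow callback stalls every other operation on the queue, which is why the text above advises against expensive I/O inside it.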

func (*DeltaFIFO) Replace

func (f *DeltaFIFO) Replace(list []interface{}, _ string) error

Replace atomically does two things: (1) it adds the given objects using the Sync or Replaced DeltaType and then (2) it does some deletions. In particular: for every pre-existing key K that is not the key of an object in `list` there is the effect of `Delete(DeletedFinalStateUnknown{K, O})` where O is the current object of K. If `f.knownObjects == nil` then the pre-existing keys are those in `f.items` and the current object of K is the `.Newest()` of the Deltas associated with K. Otherwise the pre-existing keys are those listed by `f.knownObjects` and the current object of K is what `f.knownObjects.GetByKey(K)` returns.
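Step (2) amounts to a set difference between the pre-existing keys and the keys of the new list. A minimal sketch, assuming the pre-existing state is available as a plain map: `tombstone` stands in for DeletedFinalStateUnknown and `deletionsForReplace` is a hypothetical helper, not part of this package.

```go
package main

import (
	"fmt"
	"sort"
)

// tombstone stands in for DeletedFinalStateUnknown.
type tombstone struct {
	Key string
	Obj interface{}
}

// deletionsForReplace returns one tombstone for every pre-existing key
// that is absent from newKeys, carrying that key's current object.
func deletionsForReplace(existing map[string]interface{}, newKeys []string) []tombstone {
	keep := make(map[string]struct{}, len(newKeys))
	for _, k := range newKeys {
		keep[k] = struct{}{}
	}
	var gone []tombstone
	for k, obj := range existing {
		if _, ok := keep[k]; !ok {
			gone = append(gone, tombstone{Key: k, Obj: obj})
		}
	}
	// Sort for deterministic output; map iteration order is random.
	sort.Slice(gone, func(i, j int) bool { return gone[i].Key < gone[j].Key })
	return gone
}

func main() {
	existing := map[string]interface{}{"ns/a": "a@v3", "ns/b": "b@v1", "ns/c": "c@v9"}
	for _, t := range deletionsForReplace(existing, []string{"ns/b", "ns/d"}) {
		fmt.Println(t.Key, t.Obj)
	}
}
```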

func (*DeltaFIFO) Resync

func (f *DeltaFIFO) Resync() error

Resync adds, with a Sync type of Delta, every object listed by `f.knownObjects` whose key is not already queued for processing. If `f.knownObjects` is `nil` then Resync does nothing.

func (*DeltaFIFO) Update

func (f *DeltaFIFO) Update(obj interface{}) error

Update is just like Add, but makes an Updated Delta.

type DeltaFIFOOptions

type DeltaFIFOOptions struct {

	// KeyFunction is used to figure out what key an object should have. (It's
	// exposed in the returned DeltaFIFO's KeyOf() method, with additional
	// handling around deleted objects and queue state).
	// Optional, the default is MetaNamespaceKeyFunc.
	KeyFunction KeyFunc

	// KnownObjects is expected to return a list of keys that the consumer of
	// this queue "knows about". It is used to decide which items are missing
	// when Replace() is called; 'Deleted' deltas are produced for the missing items.
	// KnownObjects may be nil if you can tolerate missing deletions on Replace().
	KnownObjects KeyListerGetter

	// EmitDeltaTypeReplaced indicates that the queue consumer
	// understands the Replaced DeltaType. Before the `Replaced` event type was
	// added, calls to Replace() were handled the same as Sync(). For
	// backwards-compatibility purposes, this is false by default.
	// When true, `Replaced` events will be sent for items passed to a Replace() call.
	// When false, `Sync` events will be sent instead.
	EmitDeltaTypeReplaced bool
}

DeltaFIFOOptions is the configuration parameters for DeltaFIFO. All are optional.

type DeltaType

type DeltaType string

DeltaType is the type of a change (addition, deletion, etc)

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	// Replaced is emitted when we encountered watch errors and had to do a
	// relist. We don't know if the replaced object has changed.
	//
	// NOTE: Previous versions of DeltaFIFO would use Sync for Replace events
	// as well. Hence, Replaced is only emitted when the option
	// EmitDeltaTypeReplaced is true.
	Replaced DeltaType = "Replaced"
	// Sync is for synthetic events during a periodic resync.
	Sync DeltaType = "Sync"
)

Change type definition

type Deltas

type Deltas []Delta

Deltas is a list of one or more 'Delta's to an individual object. The oldest delta is at index 0, the newest delta is the last one.

func (Deltas) Newest

func (d Deltas) Newest() *Delta

Newest is a convenience function that returns the newest delta, or nil if there are no deltas.

func (Deltas) Oldest

func (d Deltas) Oldest() *Delta

Oldest is a convenience function that returns the oldest delta, or nil if there are no deltas.

type ErrRequeue

type ErrRequeue struct {
	// Err is returned by the Pop function
	Err error
}

ErrRequeue may be returned by a PopProcessFunc to safely requeue the current item. The value of Err will be returned from Pop.

func (ErrRequeue) Error

func (e ErrRequeue) Error() string

type ExplicitKey

type ExplicitKey string

ExplicitKey can be passed to MetaNamespaceKeyFunc if you have the key for the object but not the object itself.

type FIFO

type FIFO struct {
	// contains filtered or unexported fields
}

FIFO is a Queue in which (a) each accumulator is simply the most recently provided object and (b) the collection of keys to process is a FIFO. The accumulators all start out empty, and deleting an object from its accumulator empties the accumulator. The Resync operation is a no-op.

Thus: if multiple adds/updates of a single object happen while that object's key is in the queue before it has been processed then it will only be processed once, and when it is processed the most recent version will be processed. This can't be done with a channel.

FIFO solves this use case:

  • You want to process every object (exactly) once.
  • You want to process the most recent version of the object when you process it.
  • You do not want to process deleted objects, they should be removed from the queue.
  • You do not want to periodically reprocess objects.

Compare with DeltaFIFO for other use cases.
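The coalescing behaviour listed above (one processing pass per key, always with the latest version, and nothing for deleted objects) can be sketched with a map plus a key list. `keyedFIFO` is a hypothetical toy; it is not safe for concurrent use and omits blocking, closing, and Replace.

```go
package main

import "fmt"

// keyedFIFO keeps the most recent value per key and the order in which
// keys first arrived.
type keyedFIFO struct {
	items map[string]string // key -> most recent value (the accumulator)
	queue []string          // keys awaiting processing, oldest first
}

func newKeyedFIFO() *keyedFIFO {
	return &keyedFIFO{items: map[string]string{}}
}

// add stores the latest value; the key is enqueued only if it is new.
func (f *keyedFIFO) add(key, value string) {
	if _, exists := f.items[key]; !exists {
		f.queue = append(f.queue, key)
	}
	f.items[key] = value
}

// remove empties the accumulator; pop later skips the stale key.
func (f *keyedFIFO) remove(key string) {
	delete(f.items, key)
}

// pop returns the value of the oldest key that still has an accumulator.
func (f *keyedFIFO) pop() (string, bool) {
	for len(f.queue) > 0 {
		key := f.queue[0]
		f.queue = f.queue[1:]
		if v, ok := f.items[key]; ok {
			delete(f.items, key)
			return v, true
		}
	}
	return "", false
}

func main() {
	f := newKeyedFIFO()
	f.add("a", "a@v1")
	f.add("b", "b@v1")
	f.add("a", "a@v2") // coalesces with the first add of "a"
	f.remove("b")      // "b" will never be processed
	v, ok := f.pop()
	fmt.Println(v, ok)
}
```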

func NewFIFO

func NewFIFO(keyFunc KeyFunc) *FIFO

NewFIFO returns a Store which can be used to queue up items to process.

func (*FIFO) Add

func (f *FIFO) Add(obj interface{}) error

Add inserts an item, and puts it in the queue. The item is only enqueued if it doesn't already exist in the set.

func (*FIFO) AddIfNotPresent

func (f *FIFO) AddIfNotPresent(obj interface{}) error

AddIfNotPresent inserts an item, and puts it in the queue. If the item is already present in the set, it is neither enqueued nor added to the set.

This is useful in a single producer/consumer scenario so that the consumer can safely retry items without contending with the producer and potentially enqueueing stale items.

func (*FIFO) Close

func (f *FIFO) Close()

Close the queue.

func (*FIFO) Delete

func (f *FIFO) Delete(obj interface{}) error

Delete removes an item. It doesn't add it to the queue, because this implementation assumes the consumer only cares about the objects, not the order in which they were created/added.

func (*FIFO) Get

func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error)

Get returns the requested item, or sets exists=false.

func (*FIFO) GetByKey

func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error)

GetByKey returns the requested item, or sets exists=false.

func (*FIFO) HasSynced

func (f *FIFO) HasSynced() bool

HasSynced returns true if an Add/Update/Delete/AddIfNotPresent is called first, or the first batch of items inserted by Replace() has been popped.

func (*FIFO) IsClosed

func (f *FIFO) IsClosed() bool

IsClosed checks if the queue is closed

func (*FIFO) List

func (f *FIFO) List() []interface{}

List returns a list of all the items.

func (*FIFO) ListKeys

func (f *FIFO) ListKeys() []string

ListKeys returns a list of all the keys of the objects currently in the FIFO.

func (*FIFO) Pop

func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error)

Pop waits until an item is ready and processes it. If multiple items are ready, they are returned in the order in which they were added/updated. The item is removed from the queue (and the store) before it is processed, so if you don't successfully process it, it should be added back with AddIfNotPresent(). The process function is called under lock, so it is safe to update data structures in it that need to be in sync with the queue.

func (*FIFO) Replace

func (f *FIFO) Replace(list []interface{}, _ string) error

Replace will delete the contents of 'f', using instead the given list. 'f' takes ownership of the list; you should not reference it again after calling this function. f's queue is reset, too; upon return, it will contain the items in the list, in no particular order.

func (*FIFO) Resync

func (f *FIFO) Resync() error

Resync will ensure that every object in the Store has its key in the queue. This should be a no-op, because that property is maintained by all operations.

func (*FIFO) Update

func (f *FIFO) Update(obj interface{}) error

Update is the same as Add in this implementation.

type FilteringResourceEventHandler

type FilteringResourceEventHandler struct {
	FilterFunc func(obj interface{}) bool
	Handler    ResourceEventHandler
}

FilteringResourceEventHandler applies the provided filter to all events coming in, ensuring the appropriate nested handler method is invoked. An object that starts passing the filter after an update is considered an add, and an object that stops passing the filter after an update is considered a delete. Like the handlers, the filter MUST NOT modify the objects it is given.

func (FilteringResourceEventHandler) OnAdd

func (r FilteringResourceEventHandler) OnAdd(obj interface{})

OnAdd calls the nested handler only if the filter succeeds

func (FilteringResourceEventHandler) OnDelete

func (r FilteringResourceEventHandler) OnDelete(obj interface{})

OnDelete calls the nested handler only if the filter succeeds

func (FilteringResourceEventHandler) OnUpdate

func (r FilteringResourceEventHandler) OnUpdate(oldObj, newObj interface{})

OnUpdate ensures the proper handler is called depending on whether the filter matches
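The update rule from the type description (an object that starts passing the filter counts as an add, one that stops passing counts as a delete) reduces to a four-way decision. In this sketch `classifyUpdate` is a hypothetical helper that only names the nested handler method that would be invoked; it does not reproduce the package's implementation.

```go
package main

import "fmt"

// classifyUpdate reports which nested handler method an update maps to,
// based on whether the old and new objects pass the filter.
func classifyUpdate(filter func(interface{}) bool, oldObj, newObj interface{}) string {
	older, newer := filter(oldObj), filter(newObj)
	switch {
	case newer && older:
		return "update" // both pass: a plain update
	case newer && !older:
		return "add" // started passing the filter
	case !newer && older:
		return "delete" // stopped passing the filter
	default:
		return "ignore" // never passed: nothing is forwarded
	}
}

func main() {
	// Hypothetical filter: only positive integers are of interest.
	positive := func(obj interface{}) bool {
		n, ok := obj.(int)
		return ok && n > 0
	}
	fmt.Println(classifyUpdate(positive, -1, 5))
	fmt.Println(classifyUpdate(positive, 5, -1))
}
```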

type GenericLister

type GenericLister interface {
	// List will return all objects across namespaces
	List(selector interface{}) (ret []runtime.Object, err error)
	// Get will attempt to retrieve assuming that name==key
	Get(name string) (runtime.Object, error)
}

GenericLister is a lister skin on a generic Indexer

func NewGenericLister

func NewGenericLister(indexer Indexer, resource schema.GroupResource) GenericLister

NewGenericLister creates a new instance for the genericLister.

type Index

type Index map[string]sets.String

Index maps the indexed value to a set of keys in the store that match on that value

type IndexFunc

type IndexFunc func(obj interface{}) ([]string, error)

IndexFunc knows how to compute the set of indexed values for an object.

type Indexer

type Indexer interface {
	Store
	// Index returns the stored objects whose set of indexed values
	// intersects the set of indexed values of the given object, for
	// the named index
	Index(indexName string, obj interface{}) ([]interface{}, error)
	// IndexKeys returns the storage keys of the stored objects whose
	// set of indexed values for the named index includes the given
	// indexed value
	IndexKeys(indexName, indexedValue string) ([]string, error)
	// ListIndexFuncValues returns all the indexed values of the given index
	ListIndexFuncValues(indexName string) []string
	// ByIndex returns the stored objects whose set of indexed values
	// for the named index includes the given indexed value
	ByIndex(indexName, indexedValue string) ([]interface{}, error)
	// GetIndexers returns the indexers
	GetIndexers() Indexers

	// AddIndexers adds more indexers to this store.  If you call this after you already have data
	// in the store, the results are undefined.
	AddIndexers(newIndexers Indexers) error
}

Indexer extends Store with multiple indices and restricts each accumulator to simply hold the current object (and be empty after Delete).

There are three kinds of strings here:

  1. a storage key, as defined in the Store interface,
  2. a name of an index, and
  3. an "indexed value", which is produced by an IndexFunc and can be a field value or any other string computed from the object.
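To make the three kinds of strings concrete, the sketch below builds a single index by hand. `pod`, `byNode`, `buildIndex`, and `keysFor` are hypothetical names, and the `namespace/name` storage key is an assumed format; in the sketch `"byNode"` would be the index name, a node name is an indexed value, and `default/web` is a storage key.

```go
package main

import (
	"fmt"
	"sort"
)

// pod is a hypothetical object type used only for this illustration.
type pod struct {
	Namespace, Name, Node string
}

// byNode plays the role of an IndexFunc: it computes the indexed values
// for one object.
func byNode(p pod) []string {
	return []string{p.Node}
}

// buildIndex maps each indexed value to the set of storage keys whose
// objects produced that value.
func buildIndex(pods []pod, indexFn func(pod) []string) map[string]map[string]struct{} {
	idx := map[string]map[string]struct{}{}
	for _, p := range pods {
		key := p.Namespace + "/" + p.Name // storage key (assumed format)
		for _, v := range indexFn(p) {
			if idx[v] == nil {
				idx[v] = map[string]struct{}{}
			}
			idx[v][key] = struct{}{}
		}
	}
	return idx
}

// keysFor returns the sorted storage keys recorded under an indexed value.
func keysFor(idx map[string]map[string]struct{}, value string) []string {
	keys := make([]string, 0, len(idx[value]))
	for k := range idx[value] {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	pods := []pod{
		{Namespace: "default", Name: "web", Node: "node-1"},
		{Namespace: "default", Name: "db", Node: "node-2"},
		{Namespace: "kube-system", Name: "dns", Node: "node-1"},
	}
	idx := buildIndex(pods, byNode)
	fmt.Println(keysFor(idx, "node-1"))
}
```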

func NewIndexer

func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer

NewIndexer returns an Indexer implemented simply with a map and a lock.

type Indexers

type Indexers map[string]IndexFunc

Indexers maps a name to an IndexFunc

type Indices

type Indices map[string]Index

Indices maps a name to an Index

type InformerSynced

type InformerSynced func() bool

InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced.

type KeyError

type KeyError struct {
	Obj interface{}
	Err error
}

KeyError will be returned any time a KeyFunc gives an error; it includes the object at fault.

func (KeyError) Error

func (k KeyError) Error() string

Error gives a human-readable description of the error.

func (KeyError) Unwrap

func (k KeyError) Unwrap() error

Unwrap implements errors.Unwrap
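Because KeyError exposes Unwrap, callers can inspect it with the standard `errors.Is` and `errors.As` helpers. The sketch below uses a local `keyError` with the same two fields; the error message format, `errNoName`, `keyOf`, `isNoName`, and `offender` are all assumptions made for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// keyError mirrors the shape of KeyError.
type keyError struct {
	Obj interface{}
	Err error
}

func (k keyError) Error() string {
	return fmt.Sprintf("couldn't create key for object %+v: %v", k.Obj, k.Err)
}

// Unwrap lets errors.Is and errors.As see the underlying cause.
func (k keyError) Unwrap() error { return k.Err }

var errNoName = errors.New("object has no name")

// keyOf is a hypothetical key function that rejects empty names.
func keyOf(name string) (string, error) {
	if name == "" {
		return "", keyError{Obj: name, Err: errNoName}
	}
	return name, nil
}

// isNoName reports whether err is, or wraps, errNoName.
func isNoName(err error) bool {
	return errors.Is(err, errNoName)
}

// offender extracts the object at fault if err is, or wraps, a keyError.
func offender(err error) (interface{}, bool) {
	var ke keyError
	if errors.As(err, &ke) {
		return ke.Obj, true
	}
	return nil, false
}

func main() {
	_, err := keyOf("")
	obj, ok := offender(err)
	fmt.Println(isNoName(err), ok, obj == "")
}
```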

type KeyFunc

type KeyFunc func(obj interface{}) (string, error)

KeyFunc knows how to make a key from an object. Implementations should be deterministic.

type KeyGetter

type KeyGetter interface {
	// GetByKey returns the value associated with the key, or sets exists=false.
	GetByKey(key string) (value interface{}, exists bool, err error)
}

A KeyGetter is anything that knows how to get the value stored under a given key.

type KeyLister

type KeyLister interface {
	ListKeys() []string
}

A KeyLister is anything that knows how to list its keys.

type KeyListerGetter

type KeyListerGetter interface {
	KeyLister
	KeyGetter
}

A KeyListerGetter is anything that knows how to list its keys and look up by key.

type ListFunc

type ListFunc func(options meta.ListOptions) (runtime.Object, error)

ListFunc knows how to list resources

type ListWatch

type ListWatch struct {
	ListFunc  ListFunc
	WatchFunc WatchFunc
	// DisableChunking requests no chunking for this list watcher.
	DisableChunking bool
}

ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface. It is a convenience type for users of NewReflector, etc. ListFunc and WatchFunc must not be nil.

func (*ListWatch) List

func (lw *ListWatch) List(options meta.ListOptions) (runtime.Object, error)

List a set of apiserver resources

func (*ListWatch) Watch

func (lw *ListWatch) Watch(options meta.ListOptions) (watch.Interface, error)

Watch a set of apiserver resources

type Lister

type Lister interface {
	// List should return a list type object; the Items field will be extracted, and the
	// ResourceVersion field will be used to start the watch in the right place.
	List(options meta.ListOptions) (runtime.Object, error)
}

Lister is any object that knows how to perform an initial list.

type ListerWatcher

type ListerWatcher interface {
	Lister
	Watcher
}

ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.

type MutationDetector

type MutationDetector interface {
	// AddObject adds the given object to the set being monitored for a while from now
	AddObject(obj interface{})

	// Run starts the monitoring and does not return until the monitoring is stopped.
	Run(stopCh <-chan struct{})
}

MutationDetector is able to monitor objects for mutation within a limited window of time

func NewCacheMutationDetector

func NewCacheMutationDetector(name string) MutationDetector

NewCacheMutationDetector creates a new instance for the defaultCacheMutationDetector.

type PopProcessFunc

type PopProcessFunc func(interface{}) error

PopProcessFunc is passed to Pop() method of Queue interface. It is supposed to process the accumulator popped from the queue.

type ProcessFunc

type ProcessFunc func(obj interface{}) error

ProcessFunc processes a single object.

type Queue

type Queue interface {
	Store

	// Pop blocks until there is at least one key to process or the
	// Queue is closed.  In the latter case Pop returns with an error.
	// In the former case Pop atomically picks one key to process,
	// removes that (key, accumulator) association from the Store, and
	// processes the accumulator.  Pop returns the accumulator that
	// was processed and the result of processing.  The PopProcessFunc
	// may return an ErrRequeue{inner} and in this case Pop will (a)
	// return that (key, accumulator) association to the Queue as part
	// of the atomic processing and (b) return the inner error from
	// Pop.
	Pop(PopProcessFunc) (interface{}, error)

	// AddIfNotPresent puts the given accumulator into the Queue (in
	// association with the accumulator's key) if and only if that key
	// is not already associated with a non-empty accumulator.
	AddIfNotPresent(interface{}) error

	// HasSynced returns true if the first batch of keys have all been
	// popped.  The first batch of keys are those of the first Replace
	// operation if that happened before any Add, AddIfNotPresent,
	// Update, or Delete; otherwise the first batch is empty.
	HasSynced() bool

	// Close the queue
	Close()
}

Queue extends Store with a collection of Store keys to "process". Every Add, Update, or Delete may put the object's key in that collection. A Queue has a way to derive the corresponding key given an accumulator. A Queue can be accessed concurrently from multiple goroutines. A Queue can be "closed", after which Pop operations return an error.

type Reflector

type Reflector struct {

	// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
	ShouldResync func() bool
	// contains filtered or unexported fields
}

func NewNamedReflector

func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector

func NewReflector

func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector

func (*Reflector) LastSyncResourceVersion

func (r *Reflector) LastSyncResourceVersion() string

func (*Reflector) ListAndWatch

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error

func (*Reflector) Run

func (r *Reflector) Run(stopCh <-chan struct{})

type ResourceEventHandler

type ResourceEventHandler interface {
	OnAdd(obj interface{})
	OnUpdate(oldObj, newObj interface{})
	OnDelete(obj interface{})
}

ResourceEventHandler can handle notifications for events that happen to a resource. The events are informational only, so you can't return an error. The handlers MUST NOT modify the objects received; this concerns not only the top level of structure but all the data structures reachable from it.

  • OnAdd is called when an object is added.
  • OnUpdate is called when an object is modified. Note that oldObj is the last known state of the object; it is possible that several changes were combined together, so you can't use this to see every single change. OnUpdate is also called when a re-list happens, and it will get called even if nothing changed. This is useful for periodically evaluating or syncing something.
  • OnDelete will get the final state of the item if it is known, otherwise it will get an object of type DeletedFinalStateUnknown. This can happen if the watch is closed and misses the delete event and we don't notice the deletion until the subsequent re-list.

type ResourceEventHandlerFuncs

type ResourceEventHandlerFuncs struct {
	AddFunc    func(obj interface{})
	UpdateFunc func(oldObj, newObj interface{})
	DeleteFunc func(obj interface{})
}

ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or as few of the notification functions as you want while still implementing ResourceEventHandler. This adapter does not remove the prohibition against modifying the objects.

func (ResourceEventHandlerFuncs) OnAdd

func (r ResourceEventHandlerFuncs) OnAdd(obj interface{})

OnAdd calls AddFunc if it's not nil.

func (ResourceEventHandlerFuncs) OnDelete

func (r ResourceEventHandlerFuncs) OnDelete(obj interface{})

OnDelete calls DeleteFunc if it's not nil.

func (ResourceEventHandlerFuncs) OnUpdate

func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{})

OnUpdate calls UpdateFunc if it's not nil.
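As a rough illustration of the nil checks described above, the following sketch re-declares the adapter locally and sets only AddFunc. It is a stand-in written for this example, not the package's source.

```go
package main

import "fmt"

// ResourceEventHandlerFuncs is a local copy of the struct documented above.
type ResourceEventHandlerFuncs struct {
	AddFunc    func(obj interface{})
	UpdateFunc func(oldObj, newObj interface{})
	DeleteFunc func(obj interface{})
}

// OnAdd calls AddFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
	if r.AddFunc != nil {
		r.AddFunc(obj)
	}
}

// OnUpdate calls UpdateFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
	if r.UpdateFunc != nil {
		r.UpdateFunc(oldObj, newObj)
	}
}

// OnDelete calls DeleteFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
	if r.DeleteFunc != nil {
		r.DeleteFunc(obj)
	}
}

func main() {
	added := 0
	h := ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) { added++ },
	}
	h.OnAdd("x")
	h.OnUpdate("x", "y") // UpdateFunc is nil, so this is a no-op
	h.OnDelete("y")      // DeleteFunc is nil, so this is a no-op
	fmt.Println("adds seen:", added)
}
```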

type ResourceVersionUpdater

type ResourceVersionUpdater interface {
	// UpdateResourceVersion is called each time current resource version of the reflector
	// is updated.
	UpdateResourceVersion(resourceVersion string)
}

ResourceVersionUpdater is an interface that allows a store implementation to track the current resource version of the reflector. This is especially important if storage bookmarks are enabled.
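A store can opt in by implementing the single method. The sketch below assumes the caller detects the optional interface with a type assertion; versionTracker and notifyVersion are hypothetical names, and whether the reflector in this package does exactly this is not confirmed by this page.

```go
package main

import (
	"fmt"
	"sync"
)

// ResourceVersionUpdater mirrors the interface documented above.
type ResourceVersionUpdater interface {
	UpdateResourceVersion(resourceVersion string)
}

// versionTracker is a hypothetical store component that remembers the most
// recent resource version it was told about.
type versionTracker struct {
	mu      sync.RWMutex
	version string
}

func (v *versionTracker) UpdateResourceVersion(resourceVersion string) {
	v.mu.Lock()
	defer v.mu.Unlock()
	v.version = resourceVersion
}

// Current returns the last recorded resource version.
func (v *versionTracker) Current() string {
	v.mu.RLock()
	defer v.mu.RUnlock()
	return v.version
}

// notifyVersion is a hypothetical helper: it forwards the version only when
// the store implements the optional interface, and reports whether it did.
func notifyVersion(store interface{}, resourceVersion string) bool {
	u, ok := store.(ResourceVersionUpdater)
	if !ok {
		return false
	}
	u.UpdateResourceVersion(resourceVersion)
	return true
}

func main() {
	tracker := &versionTracker{}
	fmt.Println(notifyVersion(tracker, "42"), tracker.Current())
	fmt.Println(notifyVersion(struct{}{}, "43"))
}
```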

type SharedIndexInformer

type SharedIndexInformer interface {
	SharedInformer
	// AddIndexers adds indexers to the informer before it starts.
	AddIndexers(indexers Indexers) error
	GetIndexer() Indexer
}

SharedIndexInformer extends SharedInformer with the ability to add Indexers and to get the underlying Indexer.

func NewSharedIndexInformer

func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer

type SharedInformer

type SharedInformer interface {
	// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
	// period.  Events to a single handler are delivered sequentially, but there is no coordination
	// between different handlers.
	AddEventHandler(handler ResourceEventHandler)
	// AddEventHandlerWithResyncPeriod adds an event handler to the
	// shared informer with the requested resync period; zero means
	// this handler does not care about resyncs.  The resync operation
	// consists of delivering to the handler an update notification
	// for every object in the informer's local cache; it does not add
	// any interactions with the authoritative storage.  Some
	// informers do no resyncs at all, not even for handlers added
	// with a non-zero resyncPeriod.  For an informer that does
	// resyncs, and for each handler that requests resyncs, that
	// informer develops a nominal resync period that is no shorter
	// than the requested period but may be longer.  The actual time
	// between any two resyncs may be longer than the nominal period
	// because the implementation takes time to do work and there may
	// be competing load and scheduling noise.
	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
	// GetStore returns the informer's local cache as a Store.
	GetStore() Store
	// GetController is deprecated; it does nothing useful.
	GetController() Controller
	// Run starts and runs the shared informer, returning after it stops.
	// The informer will be stopped when stopCh is closed.
	Run(stopCh <-chan struct{})
	// HasSynced returns true if the shared informer's store has been
	// informed by at least one full LIST of the authoritative state
	// of the informer's object collection.  This is unrelated to "resync".
	HasSynced() bool
	// LastSyncResourceVersion is the resource version observed when last synced with the underlying
	// store. The value returned is not synchronized with access to the underlying store and is not
	// thread-safe.
	LastSyncResourceVersion() string

	// SetWatchErrorHandler
	// The WatchErrorHandler is called whenever ListAndWatch drops the
	// connection with an error. After calling this handler, the informer
	// will back off and retry.
	//
	// The default implementation looks at the error type and tries to log
	// the error message at an appropriate level.
	//
	// There's only one handler, so if you call this multiple times, last one
	// wins; calling after the informer has been started returns an error.
	//
	// The handler is intended for visibility, not to e.g. pause the consumers.
	// The handler should return quickly - any expensive processing should be
	// offloaded.
	SetWatchErrorHandler(handler WatchErrorHandler) error

	// SetTransform
	// The TransformFunc is called for each object which is about to be stored.
	//
	// This function is intended for you to take the opportunity to
	// remove, transform, or normalize fields. One use case is to strip unused
	// metadata fields out of objects to save on RAM cost.
	//
	// Must be set before starting the informer.
	//
	// Note: Since the object given to the handler may already be shared with
	// other goroutines, it is advisable to copy the object being
	// transformed before mutating it at all, and to return the copy, to
	// prevent data races.
	SetTransform(handler TransformFunc) error
}

type ShouldResyncFunc

type ShouldResyncFunc func() bool

ShouldResyncFunc is a type of function that indicates if a reflector should perform a resync or not. It can be used by a shared informer to support multiple event handlers with custom resync periods.
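One way to read "multiple event handlers with custom resync periods" is that the function reports true whenever at least one handler is due. The sketch below illustrates that idea under stated assumptions; fakeClock, handlerSchedule, and newShouldResync are hypothetical, and the shared informer's real bookkeeping may differ.

```go
package main

import (
	"fmt"
	"time"
)

// ShouldResyncFunc mirrors the type documented above.
type ShouldResyncFunc func() bool

// fakeClock is a hypothetical manual clock that keeps the sketch deterministic.
type fakeClock struct {
	now time.Time
}

func (c *fakeClock) advance(d time.Duration) { c.now = c.now.Add(d) }

// handlerSchedule tracks when one handler next wants a resync.
type handlerSchedule struct {
	period time.Duration
	next   time.Time
}

// newShouldResync returns a ShouldResyncFunc that reports true when any
// handler with a non-zero period is due, and reschedules the due handlers.
func newShouldResync(clock *fakeClock, periods ...time.Duration) ShouldResyncFunc {
	schedules := make([]*handlerSchedule, 0, len(periods))
	for _, p := range periods {
		schedules = append(schedules, &handlerSchedule{period: p, next: clock.now.Add(p)})
	}
	return func() bool {
		due := false
		for _, s := range schedules {
			// A zero period means the handler does not care about resyncs.
			if s.period > 0 && !clock.now.Before(s.next) {
				due = true
				s.next = clock.now.Add(s.period)
			}
		}
		return due
	}
}

func main() {
	clock := &fakeClock{now: time.Unix(0, 0)}
	shouldResync := newShouldResync(clock, 0, 10*time.Second, 30*time.Second)

	fmt.Println("at start:", shouldResync())
	clock.advance(10 * time.Second)
	fmt.Println("after 10s:", shouldResync())
	fmt.Println("asked again immediately:", shouldResync())
}
```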

type Store

type Store interface {

	// Add adds the given object to the accumulator associated with the given object's key
	Add(obj interface{}) error

	// Update updates the given object in the accumulator associated with the given object's key
	Update(obj interface{}) error

	// Delete deletes the given object from the accumulator associated with the given object's key
	Delete(obj interface{}) error

	// List returns a list of all the currently non-empty accumulators
	List() []interface{}

	// ListKeys returns a list of all the keys currently associated with non-empty accumulators
	ListKeys() []string

	// Get returns the accumulator associated with the given object's key
	Get(obj interface{}) (item interface{}, exists bool, err error)

	// GetByKey returns the accumulator associated with the given key
	GetByKey(key string) (item interface{}, exists bool, err error)

	// Replace will delete the contents of the store, using instead the
	// given list. Store takes ownership of the list, you should not reference
	// it after calling this function.
	Replace([]interface{}, string) error

	// Resync is meaningless in the terms appearing here but has
	// meaning in some implementations that have non-trivial
	// additional behavior (e.g., DeltaFIFO).
	Resync() error
}

Store is a generic object storage and processing interface. A Store holds a map from string keys to accumulators, and has operations to add, update, and delete a given object to/from the accumulator currently associated with a given key. A Store also knows how to extract the key from a given object, so many operations are given only the object.

In the simplest Store implementations each accumulator is simply the last given object, or empty after Delete, and thus the Store's behavior is simple storage.

Reflector knows how to watch a server and update a Store. This package provides a variety of implementations of Store.

func NewStore

func NewStore(keyFunc KeyFunc) Store

NewStore returns a Store implemented simply with a map and a lock.
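To make "a map and a lock" concrete, here is a minimal sketch of such a store covering a subset of the Store methods. It is an illustration, not this package's implementation: mapStore, thing, and nameKey are hypothetical, and KeyFunc is assumed to have the same signature as MetaNamespaceKeyFunc.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// KeyFunc extracts a key from an object (signature assumed for this sketch).
type KeyFunc func(obj interface{}) (string, error)

// mapStore keeps the last object given for each key; Delete empties the slot.
type mapStore struct {
	mu      sync.RWMutex
	items   map[string]interface{}
	keyFunc KeyFunc
}

func newMapStore(keyFunc KeyFunc) *mapStore {
	return &mapStore{items: map[string]interface{}{}, keyFunc: keyFunc}
}

func (s *mapStore) Add(obj interface{}) error {
	key, err := s.keyFunc(obj)
	if err != nil {
		return err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.items[key] = obj
	return nil
}

// Update is the same as Add here: the accumulator is simply the last object.
func (s *mapStore) Update(obj interface{}) error { return s.Add(obj) }

func (s *mapStore) Delete(obj interface{}) error {
	key, err := s.keyFunc(obj)
	if err != nil {
		return err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.items, key)
	return nil
}

func (s *mapStore) GetByKey(key string) (item interface{}, exists bool, err error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	item, exists = s.items[key]
	return item, exists, nil
}

// ListKeys returns the keys sorted so the output is stable.
func (s *mapStore) ListKeys() []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	keys := make([]string, 0, len(s.items))
	for k := range s.items {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// Replace swaps the whole contents for the given list.
func (s *mapStore) Replace(list []interface{}, resourceVersion string) error {
	items := make(map[string]interface{}, len(list))
	for _, obj := range list {
		key, err := s.keyFunc(obj)
		if err != nil {
			return err
		}
		items[key] = obj
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.items = items
	return nil
}

// thing and nameKey are hypothetical helpers for the demonstration.
type thing struct{ Name string }

func nameKey(obj interface{}) (string, error) {
	t, ok := obj.(*thing)
	if !ok {
		return "", fmt.Errorf("unexpected type %T", obj)
	}
	return t.Name, nil
}

func main() {
	s := newMapStore(nameKey)
	_ = s.Add(&thing{Name: "b"})
	_ = s.Add(&thing{Name: "a"})
	_ = s.Delete(&thing{Name: "b"})
	fmt.Println(s.ListKeys())
}
```

Because the key is derived from the object, callers pass the object itself to Add, Update, and Delete, as the Store description notes.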

type ThreadSafeStore

type ThreadSafeStore interface {
	Add(key string, obj interface{})
	Update(key string, obj interface{})
	Delete(key string)
	Get(key string) (item interface{}, exists bool)
	List() []interface{}
	ListKeys() []string
	Replace(map[string]interface{}, string)
	Index(indexName string, obj interface{}) ([]interface{}, error)
	IndexKeys(indexName, indexedValue string) ([]string, error)
	ListIndexFuncValues(name string) []string
	ByIndex(indexName, indexedValue string) ([]interface{}, error)
	GetIndexers() Indexers

	// AddIndexers adds more indexers to this store.  If you call this after you already have data
	// in the store, the results are undefined.
	AddIndexers(newIndexers Indexers) error
	// Resync is a no-op and is deprecated
	Resync() error
}

ThreadSafeStore is an interface that allows concurrent indexed access to a storage backend. It is like Indexer but does not (necessarily) know how to extract the Store key from a given object.

TL;DR caveats: you must not modify anything returned by Get or List as it will break the indexing feature in addition to not being thread safe.

The guarantees of thread safety provided by List/Get are only valid if the caller treats returned items as read-only. For example, a pointer inserted in the store through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get` on the same key and modify the pointer in a non-thread-safe way. Also note that modifying objects stored by the indexers (if any) will *not* automatically lead to a re-index. So it's not a good idea to directly modify the objects returned by Get/List, in general.
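The read-only rule can be followed by copying before changing anything. In the sketch below a plain map stands in for the store, and widget, deepCopy, and getWidget are hypothetical; the point is only that Get hands back the stored pointer itself, so edits belong on a private copy.

```go
package main

import "fmt"

// widget is a hypothetical stored object with a reference-typed field.
type widget struct {
	Name   string
	Labels map[string]string
}

// deepCopy returns a copy that shares no memory with the original.
func (w *widget) deepCopy() *widget {
	out := &widget{Name: w.Name, Labels: make(map[string]string, len(w.Labels))}
	for k, v := range w.Labels {
		out.Labels[k] = v
	}
	return out
}

// getWidget stands in for a Get call: it returns the stored pointer as is.
func getWidget(store map[string]interface{}, key string) (*widget, bool) {
	item, exists := store[key]
	if !exists {
		return nil, false
	}
	w, ok := item.(*widget)
	return w, ok
}

func main() {
	store := map[string]interface{}{
		"a": &widget{Name: "a", Labels: map[string]string{"tier": "web"}},
	}

	shared, _ := getWidget(store, "a")

	// Modify a private copy; the stored object (and any index built from
	// it) stays untouched.
	private := shared.deepCopy()
	private.Labels["tier"] = "db"

	fmt.Println("stored:", shared.Labels["tier"], "copy:", private.Labels["tier"])
}
```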

func NewThreadSafeStore

func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore

NewThreadSafeStore creates a new instance of ThreadSafeStore.

type TransformFunc

type TransformFunc func(interface{}) (interface{}, error)

TransformFunc allows for transforming an object before it is processed and put into the controller cache and before the corresponding handlers are called on it. TransformFunc (similarly to ResourceEventHandler functions) should be able to correctly handle the tombstone of type cache.DeletedFinalStateUnknown.

The most common usage pattern is to clean up some parts of the object to reduce component memory usage if a given component doesn't care about them.
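A transform of this kind might look like the sketch below, which drops one bulky field, copies before mutating, and passes through a tombstone. The record type, its ManagedFields field, and stripManagedFields are hypothetical, and the Key and Obj fields of the locally declared DeletedFinalStateUnknown are assumed.

```go
package main

import "fmt"

// TransformFunc mirrors the type documented above.
type TransformFunc func(interface{}) (interface{}, error)

// DeletedFinalStateUnknown is a local stand-in for the tombstone type; its
// fields are assumed for this sketch.
type DeletedFinalStateUnknown struct {
	Key string
	Obj interface{}
}

// record is a hypothetical object with a field the component never reads.
type record struct {
	Name          string
	ManagedFields []string
}

// stripManagedFields returns a copy of the object without ManagedFields.
func stripManagedFields(obj interface{}) (interface{}, error) {
	switch v := obj.(type) {
	case DeletedFinalStateUnknown:
		// Transform the object wrapped by the tombstone, keeping the wrapper.
		inner, err := stripManagedFields(v.Obj)
		if err != nil {
			return nil, err
		}
		v.Obj = inner
		return v, nil
	case *record:
		// Shallow copy first: the input may be shared with other goroutines.
		c := *v
		c.ManagedFields = nil
		return &c, nil
	default:
		// Unknown types are passed through unchanged.
		return obj, nil
	}
}

func main() {
	var transform TransformFunc = stripManagedFields

	original := &record{Name: "a", ManagedFields: []string{"manager-1"}}
	out, err := transform(original)
	if err != nil {
		fmt.Println("transform failed:", err)
		return
	}
	fmt.Println(len(out.(*record).ManagedFields), len(original.ManagedFields))
}
```

Returning a copy rather than editing the input follows the SetTransform note about objects that may already be shared.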

type WatchErrorHandler

type WatchErrorHandler func(r *Reflector, err error)

The WatchErrorHandler is called whenever ListAndWatch drops the connection with an error. After calling this handler, the informer will back off and retry.

The default implementation looks at the error type and tries to log the error message at an appropriate level.

Implementations of this handler may display the error message in other ways. Implementations should return quickly - any expensive processing should be offloaded.
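One way to keep a handler quick is to hand the error to a buffered channel and let another goroutine do the slow work. This is a sketch under assumptions: Reflector is an empty local stand-in, and newOffloadingHandler and errWatchClosed are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// Reflector is an empty local stand-in for the package's Reflector type.
type Reflector struct{}

// WatchErrorHandler mirrors the type documented above.
type WatchErrorHandler func(r *Reflector, err error)

// errWatchClosed is a hypothetical error used for the demonstration.
var errWatchClosed = errors.New("watch closed")

// newOffloadingHandler returns a handler that never blocks: it queues the
// error for a separate consumer and drops it when the queue is full.
func newOffloadingHandler(buffer int) (WatchErrorHandler, <-chan error) {
	queue := make(chan error, buffer)
	handler := func(_ *Reflector, err error) {
		select {
		case queue <- err:
		default:
			// Queue full: drop the error rather than stall the informer.
		}
	}
	return handler, queue
}

func main() {
	handler, queue := newOffloadingHandler(1)

	handler(&Reflector{}, errWatchClosed)
	handler(&Reflector{}, errWatchClosed) // dropped, the buffer holds one

	// A real consumer would run in its own goroutine; here we just drain.
	fmt.Println("queued errors:", len(queue))
	fmt.Println("first:", <-queue)
}
```

Dropping on a full queue is a deliberate trade-off in this sketch: the handler is meant for visibility, so losing a repeated error is preferable to delaying the retry loop.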

type WatchFunc

type WatchFunc func(options meta.ListOptions) (watch.Interface, error)

WatchFunc knows how to watch resources.

type Watcher

type Watcher interface {
	// Watch should begin a watch at the specified version.
	Watch(options meta.ListOptions) (watch.Interface, error)
}

Watcher is any object that knows how to start a watch on a resource.

Directories

Path Synopsis
listers
