Documentation ¶
Index ¶
- Variables
- func DefaultWatchErrorHandler(r *Reflector, err error)
- func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error)
- func IsTooManyRequests(_ error) bool
- func ListAll(store Store, selector interface{}, appendFn AppendFunc) error
- func MetaNamespaceKeyFunc(obj interface{}) (string, error)
- func Pop(queue Queue) interface{}
- func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool
- func WaitForNamedCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool
- type AppendFunc
- type Config
- type Controller
- type DeletedFinalStateUnknown
- type Delta
- type DeltaFIFO
- func (f *DeltaFIFO) Add(obj interface{}) error
- func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error
- func (f *DeltaFIFO) Close()
- func (f *DeltaFIFO) Delete(obj interface{}) error
- func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error)
- func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error)
- func (f *DeltaFIFO) HasSynced() bool
- func (f *DeltaFIFO) IsClosed() bool
- func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error)
- func (f *DeltaFIFO) List() []interface{}
- func (f *DeltaFIFO) ListKeys() []string
- func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error)
- func (f *DeltaFIFO) Replace(list []interface{}, _ string) error
- func (f *DeltaFIFO) Resync() error
- func (f *DeltaFIFO) Update(obj interface{}) error
- type DeltaFIFOOptions
- type DeltaType
- type Deltas
- type ErrRequeue
- type ExplicitKey
- type FIFO
- func (f *FIFO) Add(obj interface{}) error
- func (f *FIFO) AddIfNotPresent(obj interface{}) error
- func (f *FIFO) Close()
- func (f *FIFO) Delete(obj interface{}) error
- func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error)
- func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error)
- func (f *FIFO) HasSynced() bool
- func (f *FIFO) IsClosed() bool
- func (f *FIFO) List() []interface{}
- func (f *FIFO) ListKeys() []string
- func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error)
- func (f *FIFO) Replace(list []interface{}, _ string) error
- func (f *FIFO) Resync() error
- func (f *FIFO) Update(obj interface{}) error
- type FilteringResourceEventHandler
- type GenericLister
- type Index
- type IndexFunc
- type Indexer
- type Indexers
- type Indices
- type InformerSynced
- type KeyError
- type KeyFunc
- type KeyGetter
- type KeyLister
- type KeyListerGetter
- type ListFunc
- type ListWatch
- type Lister
- type ListerWatcher
- type MutationDetector
- type PopProcessFunc
- type ProcessFunc
- type Queue
- type Reflector
- type ResourceEventHandler
- type ResourceEventHandlerFuncs
- type ResourceVersionUpdater
- type SharedIndexInformer
- type SharedInformer
- type ShouldResyncFunc
- type Store
- type ThreadSafeStore
- type TransformFunc
- type WatchErrorHandler
- type WatchFunc
- type Watcher
Constants ¶
This section is empty.
Variables ¶
var ErrFIFOClosed = xerrors.New("DeltaFIFO: manipulating with closed queue")
ErrFIFOClosed used when FIFO is closed
var ErrNotFound = xerrors.New("not found")
var ( // ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas // object with zero length is encountered (should be impossible, // but included for completeness). ErrZeroLengthDeltasObject = xerrors.New("0 length Deltas object; can't get key") )
Functions ¶
func DefaultWatchErrorHandler ¶
DefaultWatchErrorHandler is the default implementation of WatchErrorHandler
func DeletionHandlingMetaNamespaceKeyFunc ¶
DeletionHandlingMetaNamespaceKeyFunc checks for DeletedFinalStateUnknown objects before calling MetaNamespaceKeyFunc.
func IsTooManyRequests ¶
func ListAll ¶
func ListAll(store Store, selector interface{}, appendFn AppendFunc) error
ListAll calls appendFn with each value retrieved from store which matches the selector.
func MetaNamespaceKeyFunc ¶
func Pop ¶
func Pop(queue Queue) interface{}
Pop is helper function for popping from Queue. WARNING: Do NOT use this function in non-test code to avoid races unless you really really really really know what you are doing.
func WaitForCacheSync ¶
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool
WaitForCacheSync waits for caches to populate. It returns true if it was successful, false if the controller should shutdown callers should prefer WaitForNamedCacheSync()
func WaitForNamedCacheSync ¶
func WaitForNamedCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool
WaitForNamedCacheSync is a wrapper around WaitForCacheSync that generates log messages indicating that the caller identified by name is waiting for syncs, followed by either a successful or failed sync.
Types ¶
type AppendFunc ¶
type AppendFunc func(interface{})
AppendFunc is used to add a matching item to whatever list the caller is using
type Config ¶
type Config struct { // The queue for your objects - has to be a DeltaFIFO due to // assumptions in the implementation. Your Process() function // should accept the output of this Queue's Pop() method. Queue // Something that can list and watch your objects. ListerWatcher // Something that can process a popped Deltas. Process ProcessFunc // ObjectType is an example object of the type this controller is // expected to handle. Only the type needs to be right, except // that when that is `unstructured.Unstructured` the object's // `"apiVersion"` and `"kind"` must also be right. ObjectType runtime.Object // FullResyncPeriod is the period at which ShouldResync is considered. FullResyncPeriod time.Duration // ShouldResync is periodically used by the reflector to determine // whether to Resync the Queue. If ShouldResync is `nil` or // returns true, it means the reflector should proceed with the // resync. ShouldResync ShouldResyncFunc // If true, when Process() returns an error, re-enqueue the object. // add interface to let you inject a delay/backoff or drop // the object completely if desired. Pass the object in // question to this interface as a parameter. This is probably moot // now that this functionality appears at a higher level. RetryOnError bool // Called whenever the ListAndWatch drops the connection with an error. WatchErrorHandler WatchErrorHandler }
Config contains all the settings for one of these low-level controllers.
type Controller ¶
type Controller interface { // Run does two things. One is to construct and run a Reflector // to pump objects/notifications from the Config's ListerWatcher // to the Config's Queue and possibly invoke the occasional Resync // on that Queue. The other is to repeatedly Pop from the Queue // and process with the Config's ProcessFunc. Both of these // continue until `stopCh` is closed. Run(stopCh <-chan struct{}) // HasSynced delegates to the Config's Queue HasSynced() bool // LastSyncResourceVersion delegates to the Reflector when there // is one, otherwise returns the empty string LastSyncResourceVersion() string }
Controller is a low-level controller that is parameterized by a Config and used in sharedIndexInformer.
func New ¶
func New(config *Config) Controller
type DeletedFinalStateUnknown ¶
type DeletedFinalStateUnknown struct { Key string Obj interface{} }
DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where an object was deleted but the watch deletion event was missed while disconnected from apiserver. In this case we don't know the final "resting" state of the object, so there's a chance the included `Obj` is stale.
type Delta ¶
type Delta struct { Type DeltaType Object interface{} }
Delta is a member of Deltas (a list of Delta objects) which in its turn is the type stored by a DeltaFIFO. It tells you what change happened, and the object's state after* that change.
[*] Unless the change is a deletion, and then you'll get the final state of the object before it was deleted.
type DeltaFIFO ¶
type DeltaFIFO struct {
// contains filtered or unexported fields
}
DeltaFIFO is like FIFO, but differs in two ways. One is that the accumulator associated with a given object's key is not that object but rather a Deltas, which is a slice of Delta values for that object. Applying an object to a Deltas means to append a Delta except when the potentially appended Delta is a Deleted and the Deltas already ends with a Deleted. In that case the Deltas does not grow, although the terminal Deleted will be replaced by the new Deleted if the older Deleted's object is a DeletedFinalStateUnknown.
The other difference is that DeltaFIFO has two additional ways that an object can be applied to an accumulator: Replaced and Sync. If EmitDeltaTypeReplaced is not set to true, Sync will be used in replace events for backwards compatibility. Sync is used for periodic resync events.
DeltaFIFO is a producer-consumer queue, where a Reflector is intended to be the producer, and the consumer is whatever calls the Pop() method.
DeltaFIFO solves this use case:
- You want to process every object change (delta) at most once.
- When you process an object, you want to see everything that's happened to it since you last processed it.
- You want to process the deletion of some of the objects.
- You might want to periodically reprocess objects.
DeltaFIFO's Pop(), Get(), and GetByKey() methods return interface{} to satisfy the Store/Queue interfaces, but they will always return an object of type Deltas. List() returns the newest object from each accumulator in the FIFO.
A DeltaFIFO's knownObjects KeyListerGetter provides the abilities to list Store keys and to get objects by Store key. The objects in question are called "known objects" and this set of objects modifies the behavior of the Delete, Replace, and Resync methods (each in a different way).
A note on threading: If you call Pop() in parallel from multiple threads, you could end up with multiple threads processing slightly different versions of the same object.
func NewDeltaFIFO
deprecated
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO
NewDeltaFIFO returns a Queue which can be used to process changes to items.
keyFunc is used to figure out what key an object should have. (It is exposed in the returned DeltaFIFO's KeyOf() method, with additional handling around deleted objects and queue state).
'knownObjects' may be supplied to modify the behavior of Delete, Replace, and Resync. It may be nil if you do not need those modifications.
consider merging keyLister with this object, tracking a list of "known" keys when Pop() is called. Have to think about how that affects error retrying.
NOTE: It is possible to misuse this and cause a race when using an external known object source. Whether there is a potential race depends on how the consumer modifies knownObjects. In Pop(), process function is called under lock, so it is safe to update data structures in it that need to be in sync with the queue (e.g. knownObjects). Example: In case of sharedIndexInformer being a consumer there is no race as knownObjects (s.indexer) is modified safely under DeltaFIFO's lock. The only exceptions are GetStore() and GetIndexer() methods, which expose ways to modify the underlying storage. Currently these two methods are used for creating Lister and internal tests.
Also see the comment on DeltaFIFO.
Warning: This constructs a DeltaFIFO that does not differentiate between events caused by a call to Replace (e.g., from a relist, which may contain object updates), and synthetic events caused by a periodic resync (which just emit the existing object). See https://issue.k8s.io/86015 for details.
Use `NewDeltaFIFOWithOptions(DeltaFIFOOptions{..., EmitDeltaTypeReplaced: true})` instead to receive a `Replaced` event depending on the type.
Deprecated: Equivalent to NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: keyFunc, KnownObjects: knownObjects})
func NewDeltaFIFOWithOptions ¶
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO
NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to items. See also the comment on DeltaFIFO.
func (*DeltaFIFO) Add ¶
Add inserts an item, and puts it in the queue. The item is only enqueued if it doesn't already exist in the set.
func (*DeltaFIFO) AddIfNotPresent ¶
AddIfNotPresent inserts an item, and puts it in the queue. If the item is already present in the set, it is neither enqueued nor added to the set.
This is useful in a single producer/consumer scenario so that the consumer can safely retry items without contending with the producer and potentially enqueueing stale items.
Important: obj must be a Deltas (the output of the Pop() function). Yes, this is different from the Add/Update/Delete functions.
func (*DeltaFIFO) Delete ¶
Delete is just like Add, but makes a Deleted Delta. If the given object does not already exist, it will be ignored. (It may have already been deleted by a Replace (re-list), for example.) In this method `f.knownObjects`, if not nil, provides (via GetByKey) _additional_ objects that are considered to already exist.
func (*DeltaFIFO) Get ¶
Get returns the complete list of deltas for the requested item, or sets exists=false. You should treat the items returned inside the deltas as immutable.
func (*DeltaFIFO) GetByKey ¶
GetByKey returns the complete list of deltas for the requested item, setting exists=false if that list is empty. You should treat the items returned inside the deltas as immutable.
func (*DeltaFIFO) HasSynced ¶
HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first, or the first batch of items inserted by Replace() has been popped.
func (*DeltaFIFO) KeyOf ¶
KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or DeletedFinalStateUnknown objects.
func (*DeltaFIFO) List ¶
func (f *DeltaFIFO) List() []interface{}
List returns a list of all the items; it returns the object from the most recent Delta. You should treat the items returned inside the deltas as immutable.
func (*DeltaFIFO) ListKeys ¶
ListKeys returns a list of all the keys of the objects currently in the FIFO.
func (*DeltaFIFO) Pop ¶
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error)
Pop blocks until the queue has some items, and then returns one. If multiple items are ready, they are returned in the order in which they were added/updated. The item is removed from the queue (and the store) before it is returned, so if you don't successfully process it, you need to add it back with AddIfNotPresent(). process function is called under lock, so it is safe to update data structures in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc may return an instance of ErrRequeue with a nested error to indicate the current item should be requeued (equivalent to calling AddIfNotPresent under the lock). process should avoid expensive I/O operation so that other queue operations, i.e. Add() and Get(), won't be blocked for too long.
Pop returns a 'Deltas', which has a complete list of all the things that happened to the object (deltas) while it was sitting in the queue.
func (*DeltaFIFO) Replace ¶
Replace atomically does two things: (1) it adds the given objects using the Sync or Replace DeltaType and then (2) it does some deletions. In particular: for every pre-existing key K that is not the key of an object in `list` there is the effect of `Delete(DeletedFinalStateUnknown{K, O})` where O is current object of K. If `f.knownObjects == nil` then the pre-existing keys are those in `f.items` and the current object of K is the `.Newest()` of the Deltas associated with K. Otherwise the pre-existing keys are those listed by `f.knownObjects` and the current object of K is what `f.knownObjects.GetByKey(K)` returns.
type DeltaFIFOOptions ¶
type DeltaFIFOOptions struct { // KeyFunction is used to figure out what key an object should have. (It's // exposed in the returned DeltaFIFO's KeyOf() method, with additional // handling around deleted objects and queue state). // Optional, the default is MetaNamespaceKeyFunc. KeyFunction KeyFunc // KnownObjects is expected to return a list of keys that the consumer of // this queue "knows about". It is used to decide which items are missing // when Replace() is called; 'Deleted' deltas are produced for the missing items. // KnownObjects may be nil if you can tolerate missing deletions on Replace(). KnownObjects KeyListerGetter // EmitDeltaTypeReplaced indicates that the queue consumer // understands the Replaced DeltaType. Before the `Replaced` event type was // added, calls to Replace() were handled the same as Sync(). For // backwards-compatibility purposes, this is false by default. // When true, `Replaced` events will be sent for items passed to a Replace() call. // When false, `Sync` events will be sent instead. EmitDeltaTypeReplaced bool }
DeltaFIFOOptions is the configuration parameters for DeltaFIFO. All are optional.
type DeltaType ¶
type DeltaType string
DeltaType is the type of a change (addition, deletion, etc)
const ( Added DeltaType = "Added" Updated DeltaType = "Updated" Deleted DeltaType = "Deleted" // Replaced is emitted when we encountered watch errors and had to do a // relist. We don't know if the replaced object has changed. // // NOTE: Previous versions of DeltaFIFO would use Sync for Replace events // as well. Hence, Replaced is only emitted when the option // EmitDeltaTypeReplaced is true. Replaced DeltaType = "Replaced" // Sync is for synthetic events during a periodic resync. Sync DeltaType = "Sync" )
Change type definition
type Deltas ¶
type Deltas []Delta
Deltas is a list of one or more 'Delta's to an individual object. The oldest delta is at index 0, the newest delta is the last one.
type ErrRequeue ¶
type ErrRequeue struct { // Err is returned by the Pop function Err error }
ErrRequeue may be returned by a PopProcessFunc to safely requeue the current item. The value of Err will be returned from Pop.
func (ErrRequeue) Error ¶
func (e ErrRequeue) Error() string
type ExplicitKey ¶
type ExplicitKey string
ExplicitKey can be passed to MetaNamespaceKeyFunc if you have the key for the object but not the object itself.
type FIFO ¶
type FIFO struct {
// contains filtered or unexported fields
}
FIFO is a Queue in which (a) each accumulator is simply the most recently provided object and (b) the collection of keys to process is a FIFO. The accumulators all start out empty, and deleting an object from its accumulator empties the accumulator. The Resync operation is a no-op.
Thus: if multiple adds/updates of a single object happen while that object's key is in the queue before it has been processed then it will only be processed once, and when it is processed the most recent version will be processed. This can't be done with a channel
FIFO solves this use case:
- You want to process every object (exactly) once.
- You want to process the most recent version of the object when you process it.
- You do not want to process deleted objects, they should be removed from the queue.
- You do not want to periodically reprocess objects.
Compare with DeltaFIFO for other use cases.
func (*FIFO) Add ¶
Add inserts an item, and puts it in the queue. The item is only enqueued if it doesn't already exist in the set.
func (*FIFO) AddIfNotPresent ¶
AddIfNotPresent inserts an item, and puts it in the queue. If the item is already present in the set, it is neither enqueued nor added to the set.
This is useful in a single producer/consumer scenario so that the consumer can safely retry items without contending with the producer and potentially enqueueing stale items.
func (*FIFO) Delete ¶
Delete removes an item. It doesn't add it to the queue, because this implementation assumes the consumer only cares about the objects, not the order in which they were created/added.
func (*FIFO) HasSynced ¶
HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first, or the first batch of items inserted by Replace() has been popped.
func (*FIFO) ListKeys ¶
ListKeys returns a list of all the keys of the objects currently in the FIFO.
func (*FIFO) Pop ¶
func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error)
Pop waits until an item is ready and processes it. If multiple items are ready, they are returned in the order in which they were added/updated. The item is removed from the queue (and the store) before it is processed, so if you don't successfully process it, it should be added back with AddIfNotPresent(). process function is called under lock, so it is safe update data structures in it that need to be in sync with the queue.
func (*FIFO) Replace ¶
Replace will delete the contents of 'f', using instead the given map. 'f' takes ownership of the map, you should not reference the map again after calling this function. f's queue is reset, too; upon return, it will contain the items in the map, in no particular order.
type FilteringResourceEventHandler ¶
type FilteringResourceEventHandler struct { FilterFunc func(obj interface{}) bool Handler ResourceEventHandler }
FilteringResourceEventHandler applies the provided filter to all events coming in, ensuring the appropriate nested handler method is invoked. An object that starts passing the filter after an update is considered an add, and an object that stops passing the filter after an update is considered a delete. Like the handlers, the filter MUST NOT modify the objects it is given.
func (FilteringResourceEventHandler) OnAdd ¶
func (r FilteringResourceEventHandler) OnAdd(obj interface{})
OnAdd calls the nested handler only if the filter succeeds
func (FilteringResourceEventHandler) OnDelete ¶
func (r FilteringResourceEventHandler) OnDelete(obj interface{})
OnDelete calls the nested handler only if the filter succeeds
func (FilteringResourceEventHandler) OnUpdate ¶
func (r FilteringResourceEventHandler) OnUpdate(oldObj, newObj interface{})
OnUpdate ensures the proper handler is called depending on whether the filter matches
type GenericLister ¶
type GenericLister interface { // List will return all objects across namespaces List(selector interface{}) (ret []runtime.Object, err error) // Get will attempt to retrieve assuming that name==key Get(name string) (runtime.Object, error) }
GenericLister is a lister skin on a generic Indexer
func NewGenericLister ¶
func NewGenericLister(indexer Indexer, resource schema.GroupResource) GenericLister
NewGenericLister creates a new instance for the genericLister.
type Indexer ¶
type Indexer interface { Store // Index returns the stored objects whose set of indexed values // intersects the set of indexed values of the given object, for // the named index Index(indexName string, obj interface{}) ([]interface{}, error) // IndexKeys returns the storage keys of the stored objects whose // set of indexed values for the named index includes the given // indexed value IndexKeys(indexName, indexedValue string) ([]string, error) // ListIndexFuncValues returns all the indexed values of the given index ListIndexFuncValues(indexName string) []string // ByIndex returns the stored objects whose set of indexed values // for the named index includes the given indexed value ByIndex(indexName, indexedValue string) ([]interface{}, error) // GetIndexers return the indexers GetIndexers() Indexers // AddIndexers adds more indexers to this store. If you call this after you already have data // in the store, the results are undefined. AddIndexers(newIndexers Indexers) error }
Indexer extends Store with multiple indices and restricts each accumulator to simply hold the current object (and be empty after Delete).
There are three kinds of strings here:
- a storage key, as defined in the Store interface,
- a name of an index, and
- an "indexed value", which is produced by an IndexFunc and can be a field value or any other string computed from the object.
func NewIndexer ¶
NewIndexer returns an Indexer implemented simply with a map and a lock.
type InformerSynced ¶
type InformerSynced func() bool
InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced.
type KeyError ¶
type KeyError struct { Obj interface{} Err error }
KeyError will be returned any time a KeyFunc gives an error; it includes the object at fault.
type KeyFunc ¶
KeyFunc knows how to make a key from an object. Implementations should be deterministic.
type KeyGetter ¶
type KeyGetter interface { // GetByKey returns the value associated with the key, or sets exists=false. GetByKey(key string) (value interface{}, exists bool, err error) }
A KeyGetter is anything that knows how to get the value stored under a given key.
type KeyLister ¶
type KeyLister interface {
ListKeys() []string
}
A KeyLister is anything that knows how to list its keys.
type KeyListerGetter ¶
A KeyListerGetter is anything that knows how to list its keys and look up by key.
type ListFunc ¶
type ListFunc func(options meta.ListOptions) (runtime.Object, error)
ListFunc knows how to list resources
type ListWatch ¶
type ListWatch struct { ListFunc ListFunc WatchFunc WatchFunc // DisableChunking requests no chunking for this list watcher. DisableChunking bool }
ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface. It is a convenience function for users of NewReflector, etc. ListFunc and WatchFunc must not be nil
type Lister ¶
type Lister interface { // List should return a list type object; the Items field will be extracted, and the // ResourceVersion field will be used to start the watch in the right place. List(options meta.ListOptions) (runtime.Object, error) }
Lister is any object that knows how to perform an initial list.
type ListerWatcher ¶
ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type MutationDetector ¶
type MutationDetector interface { // AddObject adds the given object to the set being monitored for a while from now AddObject(obj interface{}) // Run starts the monitoring and does not return until the monitoring is stopped. Run(stopCh <-chan struct{}) }
MutationDetector is able to monitor objects for mutation within a limited window of time
func NewCacheMutationDetector ¶
func NewCacheMutationDetector(name string) MutationDetector
NewCacheMutationDetector creates a new instance for the defaultCacheMutationDetector.
type PopProcessFunc ¶
type PopProcessFunc func(interface{}) error
PopProcessFunc is passed to Pop() method of Queue interface. It is supposed to process the accumulator popped from the queue.
type ProcessFunc ¶
type ProcessFunc func(obj interface{}) error
ProcessFunc processes a single object.
type Queue ¶
type Queue interface { Store // Pop blocks until there is at least one key to process or the // Queue is closed. In the latter case Pop returns with an error. // In the former case Pop atomically picks one key to process, // removes that (key, accumulator) association from the Store, and // processes the accumulator. Pop returns the accumulator that // was processed and the result of processing. The PopProcessFunc // may return an ErrRequeue{inner} and in this case Pop will (a) // return that (key, accumulator) association to the Queue as part // of the atomic processing and (b) return the inner error from // Pop. Pop(PopProcessFunc) (interface{}, error) // AddIfNotPresent puts the given accumulator into the Queue (in // association with the accumulator's key) if and only if that key // is not already associated with a non-empty accumulator. AddIfNotPresent(interface{}) error // HasSynced returns true if the first batch of keys have all been // popped. The first batch of keys are those of the first Replace // operation if that happened before any Add, AddIfNotPresent, // Update, or Delete; otherwise the first batch is empty. HasSynced() bool // Close the queue Close() }
Queue extends Store with a collection of Store keys to "process". Every Add, Update, or Delete may put the object's key in that collection. A Queue has a way to derive the corresponding key given an accumulator. A Queue can be accessed concurrently from multiple goroutines. A Queue can be "closed", after which Pop operations return an error.
type Reflector ¶
type Reflector struct { // ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked ShouldResync func() bool // contains filtered or unexported fields }
func NewNamedReflector ¶
func NewReflector ¶
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector
func (*Reflector) LastSyncResourceVersion ¶
func (*Reflector) ListAndWatch ¶
type ResourceEventHandler ¶
type ResourceEventHandler interface { OnAdd(obj interface{}) OnUpdate(oldObj, newObj interface{}) OnDelete(obj interface{}) }
ResourceEventHandler can handle notifications for events that happen to a resource. The events are informational only, so you can't return an error. The handlers MUST NOT modify the objects received; this concerns not only the top level of structure but all the data structures reachable from it.
- OnAdd is called when an object is added.
- OnUpdate is called when an object is modified. Note that oldObj is the last known state of the object-- it is possible that several changes were combined together, so you can't use this to see every single change. OnUpdate is also called when a re-list happens, and it will get called even if nothing changed. This is useful for periodically evaluating or syncing something.
- OnDelete will get the final state of the item if it is known, otherwise it will get an object of type DeletedFinalStateUnknown. This can happen if the watch is closed and misses the delete event and we don't notice the deletion until the subsequent re-list.
type ResourceEventHandlerFuncs ¶
type ResourceEventHandlerFuncs struct { AddFunc func(obj interface{}) UpdateFunc func(oldObj, newObj interface{}) DeleteFunc func(obj interface{}) }
ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or as few of the notification functions as you want while still implementing ResourceEventHandler. This adapter does not remove the prohibition against modifying the objects.
func (ResourceEventHandlerFuncs) OnAdd ¶
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{})
OnAdd calls AddFunc if it's not nil.
func (ResourceEventHandlerFuncs) OnDelete ¶
func (r ResourceEventHandlerFuncs) OnDelete(obj interface{})
OnDelete calls DeleteFunc if it's not nil.
func (ResourceEventHandlerFuncs) OnUpdate ¶
func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{})
OnUpdate calls UpdateFunc if it's not nil.
type ResourceVersionUpdater ¶
type ResourceVersionUpdater interface { // UpdateResourceVersion is called each time current resource version of the reflector // is updated. UpdateResourceVersion(resourceVersion string) }
ResourceVersionUpdater is an interface that allows store implementation to track the current resource version of the reflector. This is especially important if storage bookmarks are enabled.
type SharedIndexInformer ¶
type SharedIndexInformer interface { SharedInformer // AddIndexers add indexers to the informer before it starts. }
SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
func NewSharedIndexInformer ¶
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer
type SharedInformer ¶
type SharedInformer interface { // period. Events to a single handler are delivered sequentially, but there is no coordination // between different handlers. AddEventHandler(handler ResourceEventHandler) // shared informer with the requested resync period; zero means // this handler does not care about resyncs. The resync operation // consists of delivering to the handler an update notification // for every object in the informer's local cache; it does not add // any interactions with the authoritative storage. Some // informers do no resyncs at all, not even for handlers added // with a non-zero resyncPeriod. For an informer that does // resyncs, and for each handler that requests resyncs, that // informer develops a nominal resync period that is no shorter // than the requested period but may be longer. The actual time // between any two resyncs may be longer than the nominal period // because the implementation takes time to do work and there may // be competing load and scheduling noise. AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) GetStore() Store GetController() Controller // The informer will be stopped when stopCh is closed. Run(stopCh <-chan struct{}) // informed by at least one full LIST of the authoritative state // of the informer's object collection. This is unrelated to "resync". HasSynced() bool // store. The value returned is not synchronized with access to the underlying store and is not // thread-safe. LastSyncResourceVersion() string // The WatchErrorHandler is called whenever ListAndWatch drops the // connection with an error. After calling this handler, the informer // will backoff and retry. // // The default implementation looks at the error type and tries to log // the error message at an appropriate level. // // There's only one handler, so if you call this multiple times, last one // wins; calling after the informer has been started returns an error. // // The handler is intended for visibility, not to e.g. pause the consumers. // The handler should return quickly - any expensive processing should be // offloaded. SetWatchErrorHandler(handler WatchErrorHandler) error // The TransformFunc is called for each object which is about to be stored. // // This function is intended for you to take the opportunity to // remove, transform, or normalize fields. One use case is to strip unused // metadata fields out of objects to save on RAM cost. // // Must be set before starting the informer. // // Note: Since the object given to the handler may be already shared with // other goroutines, it is advisable to copy the object being // transform before mutating it at all and returning the copy to prevent // data races. SetTransform(handler TransformFunc) error }
type ShouldResyncFunc ¶
type ShouldResyncFunc func() bool
ShouldResyncFunc is a type of function that indicates if a reflector should perform a resync or not. It can be used by a shared informer to support multiple event handlers with custom resync periods.
type Store ¶
type Store interface { // Add adds the given object to the accumulator associated with the given object's key Add(obj interface{}) error // Update updates the given object in the accumulator associated with the given object's key Update(obj interface{}) error // Delete deletes the given object from the accumulator associated with the given object's key Delete(obj interface{}) error // List returns a list of all the currently non-empty accumulators List() []interface{} // ListKeys returns a list of all the keys currently associated with non-empty accumulators ListKeys() []string // Get returns the accumulator associated with the given object's key Get(obj interface{}) (item interface{}, exists bool, err error) // GetByKey returns the accumulator associated with the given key GetByKey(key string) (item interface{}, exists bool, err error) // Replace will delete the contents of the store, using instead the // given list. Store takes ownership of the list, you should not reference // it after calling this function. Replace([]interface{}, string) error // Resync is meaningless in the terms appearing here but has // meaning in some implementations that have non-trivial // additional behavior (e.g., DeltaFIFO). Resync() error }
Store is a generic object storage and processing interface. A Store holds a map from string keys to accumulators, and has operations to add, update, and delete a given object to/from the accumulator currently associated with a given key. A Store also knows how to extract the key from a given object, so many operations are given only the object.
In the simplest Store implementations each accumulator is simply the last given object, or empty after Delete, and thus the Store's behavior is simple storage.
Reflector knows how to watch a server and update a Store. This package provides a variety of implementations of Store.
type ThreadSafeStore ¶
type ThreadSafeStore interface { Add(key string, obj interface{}) Update(key string, obj interface{}) Delete(key string) Get(key string) (item interface{}, exists bool) List() []interface{} ListKeys() []string Replace(map[string]interface{}, string) Index(indexName string, obj interface{}) ([]interface{}, error) IndexKeys(indexName, indexedValue string) ([]string, error) ListIndexFuncValues(name string) []string ByIndex(indexName, indexedValue string) ([]interface{}, error) GetIndexers() Indexers // AddIndexers adds more indexers to this store. If you call this after you already have data // in the store, the results are undefined. AddIndexers(newIndexers Indexers) error // Resync is a no-op and is deprecated Resync() error }
ThreadSafeStore is an interface that allows concurrent indexed access to a storage backend. It is like Indexer but does not (necessarily) know how to extract the Store key from a given object.
TL;DR caveats: you must not modify anything returned by Get or List as it will break the indexing feature in addition to not being thread safe.
The guarantees of thread safety provided by List/Get are only valid if the caller treats returned items as read-only. For example, a pointer inserted in the store through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get` on the same key and modify the pointer in a non-thread-safe way. Also note that modifying objects stored by the indexers (if any) will *not* automatically lead to a re-index. So it's not a good idea to directly modify the objects returned by Get/List, in general.
func NewThreadSafeStore ¶
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore
NewThreadSafeStore creates a new instance of ThreadSafeStore.
type TransformFunc ¶
type TransformFunc func(interface{}) (interface{}, error)
TransformFunc allows for transforming an object before it will be processed and put into the controller cache and before the corresponding handlers will be called on it. TransformFunc (similarly to ResourceEventHandler functions) should be able to correctly handle the tombstone of type cache.DeletedFinalStateUnknown
The most common usage pattern is to clean-up some parts of the object to reduce component memory usage if a given component doesn't care about them. given controller doesn't care for them
type WatchErrorHandler ¶
The WatchErrorHandler is called whenever ListAndWatch drops the connection with an error. After calling this handler, the informer will backoff and retry.
The default implementation looks at the error type and tries to log the error message at an appropriate level.
Implementations of this handler may display the error message in other ways. Implementations should return quickly - any expensive processing should be offloaded.