client

package
v0.5.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 12, 2023 License: Apache-2.0 Imports: 22 Imported by: 15

Documentation

Overview

Copyright Contributors to the Open Cluster Management project

Package client is an event-driven Go library used when Kubernetes objects need to track when other objects change. The API is heavily based on the popular sigs.k8s.io/controller-runtime library.

Index

Examples

Constants

This section is empty.

Variables

View Source
// Sentinel errors exported by this package. Each is created once with errors.New so that callers can compare a
// returned error against it (for example with errors.Is); the message of each value describes the condition that
// it reports.
var (
	ErrNotStarted           = errors.New("DynamicWatcher must be started to perform this action")
	ErrCacheDisabled        = errors.New("cannot perform this action because the cache is not enabled")
	ErrInvalidInput         = errors.New("invalid input provided")
	ErrNoVersionedResource  = errors.New("the resource version was not found")
	ErrQueryBatchInProgress = errors.New(
		"cannot perform this action; the query batch for this object ID is in progress",
	)
	ErrQueryBatchNotStarted = errors.New("the query batch for this object ID is not started")
	ErrWatchStopping        = errors.New("the watched object has a watch being stopped")
)
View Source
// ErrNoCacheEntry is the error returned by the cache lookup methods (such as GetFromCache, ListFromCache, and the
// ObjectCache getters) when there is no cache entry at all for the request.
var ErrNoCacheEntry = errors.New("there was no populated cache entry")

Functions

This section is empty.

Types

type ControllerRuntimeSourceReconciler

// ControllerRuntimeSourceReconciler is a reconciler to integrate with controller-runtime. Instances are obtained
// from NewControllerRuntimeSource.
type ControllerRuntimeSourceReconciler struct {
	// contains filtered or unexported fields
}

ControllerRuntimeSourceReconciler is a reconciler to integrate with controller-runtime. See NewControllerRuntimeSource.

func NewControllerRuntimeSource

func NewControllerRuntimeSource() (*ControllerRuntimeSourceReconciler, *source.Channel)

NewControllerRuntimeSource returns a reconciler for DynamicWatcher that sends events to a controller-runtime source.Channel. This source.Channel can be used in the controller-runtime builder.Builder.Watches method. This source.Channel will only send event.GenericEvent typed events, so any handlers specified in the builder.Builder.Watches method will need to handle that.

Example
package main

import (
	"context"
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/klog"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/envtest"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	"github.com/stolostron/kubernetes-dependency-watches/client"
)

// ctrlRuntimeReconciler is the stateless controller-runtime reconciler registered by this example.
type ctrlRuntimeReconciler struct{}

// Reconcile prints the reconcile request it was handed and reports success with a zero-value result. The context
// is unused.
func (*ctrlRuntimeReconciler) Reconcile(_ context.Context, req reconcile.Request) (reconcile.Result, error) {
	var result reconcile.Result

	fmt.Printf("The following reconcile request was received: %v\n", req)

	return result, nil
}

// main wires a DynamicWatcher into a controller-runtime manager: events produced by the dynamic watcher are
// delivered through a source.Channel to a controller that also watches ConfigMaps. The example stops itself after
// five seconds by canceling the shared context.
func main() {
	// Bring up a test Kubernetes API for the example to talk to.
	var testEnv envtest.Environment

	restCfg, err := testEnv.Start()
	if err != nil {
		panic(err)
	}

	defer func() {
		if stopErr := testEnv.Stop(); stopErr != nil {
			klog.Errorf("failed to stop the test Kubernetes API, error: %v", stopErr)
		}
	}()

	// This context is shared by the dynamic watcher and the manager and is canceled explicitly below.
	ctx, cancel := context.WithCancel(context.TODO())

	// The generated reconciler publishes the dynamic watcher's events on the returned channel source.
	watchReconciler, events := client.NewControllerRuntimeSource()

	dw, err := client.New(restCfg, watchReconciler, nil)
	if err != nil {
		panic(err)
	}

	// Start blocks until the context is canceled, so it gets its own goroutine.
	go func() {
		if startErr := dw.Start(ctx); startErr != nil {
			panic(startErr)
		}
	}()

	// Do not register watches before the dynamic watcher reports that it has started.
	<-dw.Started()

	configMapID := client.ObjectIdentifier{
		Group:     "",
		Version:   "v1",
		Kind:      "ConfigMap",
		Namespace: "default",
		Name:      "watcher",
	}
	secretID := client.ObjectIdentifier{
		Group:     "",
		Version:   "v1",
		Kind:      "Secret",
		Namespace: "default",
		Name:      "watched1",
	}

	// From here on, an update to the Secret triggers the controller-runtime Reconcile method for the ConfigMap.
	if err := dw.AddOrUpdateWatcher(configMapID, secretID); err != nil {
		panic(err)
	}

	// Build a controller-runtime manager limited to the default namespace.
	mgr, err := ctrl.NewManager(restCfg, ctrl.Options{
		Namespace:              "default",
		Scheme:                 scheme.Scheme,
		MetricsBindAddress:     "0",
		HealthProbeBindAddress: "0",
		LeaderElection:         false,
	})
	if err != nil {
		panic(err)
	}

	// The controller reconciles on ConfigMap changes and, in addition, whenever the dynamic watcher reports that a
	// watched object was updated.
	ctrlBuilder := ctrl.NewControllerManagedBy(mgr).
		For(&corev1.ConfigMap{}).
		Watches(events, &handler.EnqueueRequestForObject{})

	if err := ctrlBuilder.Complete(&ctrlRuntimeReconciler{}); err != nil {
		panic(err)
	}

	// Simulate something canceling the context in 5 seconds so that the example exits.
	time.AfterFunc(5*time.Second, cancel)

	if err := mgr.Start(ctx); err != nil {
		panic(err)
	}
}
Output:

func (*ControllerRuntimeSourceReconciler) Reconcile

Reconcile will convert the input ObjectIdentifier and send a controller-runtime GenericEvent on ControllerRuntimeSourceReconciler's eventChan channel.

type DynamicWatcher

// DynamicWatcher implementations enable a consumer to be notified of updates to Kubernetes objects that other
// Kubernetes objects depend on. It also provides a cache to retrieve the watched objects.
type DynamicWatcher interface {
	// AddWatcher registers watchedObject as an object watched on behalf of watcher.
	// NOTE(review): unlike AddOrUpdateWatcher, this presumably leaves the watcher's other existing watches in
	// place — confirm against the implementation.
	AddWatcher(watcher ObjectIdentifier, watchedObject ObjectIdentifier) error
	// AddOrUpdateWatcher updates the watches for the watcher. When updating, any previously watched objects not
	// specified will stop being watched. If an error occurs, any created watches as part of this method execution will
	// be removed.
	AddOrUpdateWatcher(watcher ObjectIdentifier, watchedObjects ...ObjectIdentifier) error
	// RemoveWatcher removes a watcher and any of its API watches solely referenced by the watcher.
	RemoveWatcher(watcher ObjectIdentifier) error
	// Start will start the DynamicWatcher and block until the input context is canceled.
	Start(ctx context.Context) error
	// GetWatchCount returns the total number of active API watch requests which can be used for metrics.
	GetWatchCount() uint
	// Started returns a channel that is closed when the DynamicWatcher is ready to receive watch requests.
	Started() <-chan struct{}
	// Get will add an additional watch to the started query batch and return the watched object. Note that you must
	// call StartQueryBatch before calling this.
	Get(
		watcher ObjectIdentifier, gvk schema.GroupVersionKind, namespace string, name string,
	) (*unstructured.Unstructured, error)
	// GetFromCache will return the object from the cache. If it's not cached, the ErrNoCacheEntry error will be
	// returned.
	GetFromCache(gvk schema.GroupVersionKind, namespace string, name string) (*unstructured.Unstructured, error)
	// List will add an additional watch to the started query batch and return the watched objects. Note that you must
	// call StartQueryBatch before calling this.
	List(
		watcher ObjectIdentifier, gvk schema.GroupVersionKind, namespace string, selector labels.Selector,
	) ([]unstructured.Unstructured, error)
	// ListFromCache will return the objects from the cache. If it's not cached, the ErrNoCacheEntry error will be
	// returned.
	ListFromCache(
		gvk schema.GroupVersionKind, namespace string, selector labels.Selector,
	) ([]unstructured.Unstructured, error)
	// ListWatchedFromCache will return all watched objects by the watcher in the cache.
	ListWatchedFromCache(watcher ObjectIdentifier) ([]unstructured.Unstructured, error)
	// StartQueryBatch will start a query batch transaction for the watcher. After a series of Get/List calls, calling
	// EndQueryBatch will clean up the non-applicable preexisting watches made from before this query batch.
	StartQueryBatch(watcher ObjectIdentifier) error
	// EndQueryBatch will stop a query batch transaction for the watcher. This will clean up the non-applicable
	// preexisting watches made from before this query batch.
	EndQueryBatch(watcher ObjectIdentifier) error
	// GVKToGVR will convert a GVK to a GVR and cache the result for a default of 10 minutes (configurable) when found,
	// and not cache failed conversions by default (configurable).
	GVKToGVR(gvk schema.GroupVersionKind) (ScopedGVR, error)
}

DynamicWatcher implementations enable a consumer to be notified of updates to Kubernetes objects that other Kubernetes objects depend on. It also provides a cache to retrieve the watched objects.

Example
package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/klog"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/envtest"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	"github.com/stolostron/kubernetes-dependency-watches/client"
)

// reconciler is the stateless client.Reconciler implementation handed to client.New by this example.
type reconciler struct{}

// Reconcile prints the identifier of the watcher it was called for and reports success with a zero-value result.
// The context is unused.
func (*reconciler) Reconcile(_ context.Context, watcher client.ObjectIdentifier) (reconcile.Result, error) {
	var result reconcile.Result

	fmt.Printf("An object that this object (%s) was watching was updated\n", watcher)

	return result, nil
}

// main starts a DynamicWatcher against a test Kubernetes API, registers one watcher with two watched Secrets, and
// then waits until the context is canceled, which the example does itself after five seconds.
func main() {
	// Bring up a test Kubernetes API for the example to talk to.
	var testEnv envtest.Environment

	restCfg, err := testEnv.Start()
	if err != nil {
		panic(err)
	}

	defer func() {
		if stopErr := testEnv.Stop(); stopErr != nil {
			klog.Errorf("failed to stop the test Kubernetes API, error: %v", stopErr)
		}
	}()

	// Create the dynamic watcher with the example reconciler and default options.
	dw, err := client.New(restCfg, &reconciler{}, nil)
	if err != nil {
		panic(err)
	}

	// The signal handler context is canceled after a SIGINT signal is received; wrapping it in a child context
	// additionally allows the example to cancel explicitly.
	ctx, cancel := context.WithCancel(ctrl.SetupSignalHandler())

	// Start blocks until the context is canceled, so it gets its own goroutine.
	go func() {
		if startErr := dw.Start(ctx); startErr != nil {
			panic(startErr)
		}
	}()

	// Do not register watches before the dynamic watcher reports that it has started.
	<-dw.Started()

	// Simulate something canceling the context in 5 seconds so that the example exits.
	time.AfterFunc(5*time.Second, cancel)

	// defaultSecret builds the identifier of a core/v1 Secret in the default namespace.
	defaultSecret := func(name string) client.ObjectIdentifier {
		return client.ObjectIdentifier{
			Group:     "",
			Version:   "v1",
			Kind:      "Secret",
			Namespace: "default",
			Name:      name,
		}
	}

	configMapID := client.ObjectIdentifier{
		Group:     "",
		Version:   "v1",
		Kind:      "ConfigMap",
		Namespace: "default",
		Name:      "watcher",
	}

	// Get notified about the ConfigMap when either Secret is updated.
	if err := dw.AddOrUpdateWatcher(configMapID, defaultSecret("watched1"), defaultSecret("watched2")); err != nil {
		panic(err)
	}

	// Run until the context is canceled.
	<-ctx.Done()
}
Output:

func New

func New(config *rest.Config, reconciler Reconciler, options *Options) (DynamicWatcher, error)

New returns an implementation of DynamicWatcher that is ready to be started with the Start method. An error is returned if Kubernetes clients can't be instantiated with the input Kubernetes configuration.

type ObjectCache added in v0.5.0

// ObjectCache provides a concurrency safe cache of unstructured objects. Additionally, it's able to convert GVKs to
// GVRs and cache the results.
type ObjectCache interface {
	// Get returns the object from the cache. A nil value can be returned to indicate a not found is cached. The error
	// ErrNoCacheEntry is returned if there is no cache entry at all.
	Get(gvk schema.GroupVersionKind, namespace string, name string) (*unstructured.Unstructured, error)
	// List returns the objects from the cache, which can be an empty list. The error ErrNoCacheEntry is returned if
	// there is no cache entry.
	List(gvk schema.GroupVersionKind, namespace string, selector labels.Selector) ([]unstructured.Unstructured, error)
	// FromObjectIdentifier returns the objects from the cache, which can be an empty list, based on the input object
	// identifier. The error ErrNoCacheEntry is returned if there is no cache entry at all.
	FromObjectIdentifier(objID ObjectIdentifier) ([]unstructured.Unstructured, error)
	// CacheList will cache a list of objects for the list query.
	CacheList(
		gvk schema.GroupVersionKind, namespace string, selector labels.Selector, objects []unstructured.Unstructured,
	)
	// CacheObject allows to cache an object. The input object can be nil to indicate a cached not found result.
	CacheObject(gvk schema.GroupVersionKind, namespace string, name string, object *unstructured.Unstructured)
	// CacheFromObjectIdentifier will cache a list of objects for the input object identifier.
	CacheFromObjectIdentifier(objID ObjectIdentifier, objects []unstructured.Unstructured)
	// UncacheObject will entirely remove the cache entry of the object.
	UncacheObject(gvk schema.GroupVersionKind, namespace string, name string)
	// UncacheList will entirely remove the cache entries for the list query.
	UncacheList(gvk schema.GroupVersionKind, namespace string, selector labels.Selector)
	// UncacheFromObjectIdentifier will entirely remove the cache entries for the object identifier.
	UncacheFromObjectIdentifier(objID ObjectIdentifier)
	// GVKToGVR will convert a GVK to a GVR and cache the result for a default of 10 minutes (configurable) when found,
	// and not cache failed conversions by default (configurable).
	GVKToGVR(gvk schema.GroupVersionKind) (ScopedGVR, error)
	// Clear will entirely clear the cache.
	Clear()
}

ObjectCache provides a concurrency safe cache of unstructured objects. Additionally, it's able to convert GVKs to GVRs and cache the results.

func NewObjectCache added in v0.5.0

func NewObjectCache(discoveryClient *discovery.DiscoveryClient, options ObjectCacheOptions) ObjectCache

NewObjectCache will create an object cache with the input discovery client.

type ObjectCacheOptions added in v0.5.0

// ObjectCacheOptions holds the time-to-live settings for cached GVK to GVR conversions. It is accepted by
// NewObjectCache and can also be set through Options.ObjectCacheOptions.
type ObjectCacheOptions struct {
	// The time for a GVK to GVR conversion cache entry to be considered fresh (not expired). This excludes missing API
	// resources, which is configured by MissingAPIResourceCacheTTL. The default value is 10 minutes.
	GVKToGVRCacheTTL time.Duration
	// The time for a failed GVK to GVR conversion to not be retried. The default behavior is to not cache failures.
	// Setting this can be useful if you don't want to continuously query the Kubernetes API if a CRD is missing.
	MissingAPIResourceCacheTTL time.Duration
}

type ObjectIdentifier

// ObjectIdentifier identifies an object from the Kubernetes API by its group, version, kind, namespace, and name.
// The Validate method reports an error when a required field is not set.
// NOTE(review): Selector is presumably a label selector string that identifies a set of objects instead of a single
// named object — confirm against the implementation which of Name and Selector is required.
type ObjectIdentifier struct {
	Group     string
	Version   string
	Kind      string
	Namespace string
	Name      string
	Selector  string
}

ObjectIdentifier identifies an object from the Kubernetes API.

func (ObjectIdentifier) GroupVersionKind added in v0.5.0

func (o ObjectIdentifier) GroupVersionKind() schema.GroupVersionKind

GroupVersionKind returns the GroupVersionKind of the ObjectIdentifier.

func (ObjectIdentifier) String

func (o ObjectIdentifier) String() string

String will convert the ObjectIdentifier to a string in a similar format to apimachinery's schema.GroupVersionKind.

func (ObjectIdentifier) Validate

func (o ObjectIdentifier) Validate() error

Validate will return a wrapped ErrInvalidInput error when a required field is not set on the ObjectIdentifier.

type Options

// Options specify the arguments for creating a new DynamicWatcher. New takes a *Options, and the examples pass nil.
type Options struct {
	// RateLimiter is used to limit how frequently requests may be queued.
	// Defaults to client-go's MaxOfRateLimiter which has both overall and per-item rate limiting. The overall is a
	// token bucket and the per-item is exponential.
	RateLimiter ratelimiter.RateLimiter
	// EnableCache causes the watched objects to be cached and retrievable.
	EnableCache bool
	// DisableInitialReconcile causes the initial reconcile from the list request before the watch to not cause a
	// reconcile. This is useful if you are exclusively using the caching query API.
	DisableInitialReconcile bool
	// ObjectCacheOptions configures how long to cache GVK to GVR conversions.
	ObjectCacheOptions ObjectCacheOptions
}

Options specify the arguments for creating a new DynamicWatcher.

type Reconciler

// Reconciler is the callback interface that a consumer supplies to New. Its single method is handed the
// ObjectIdentifier of the watcher, not of the watched object that triggered the call.
type Reconciler interface {
	// Reconcile is called whenever an object has started being watched (if it exists) as well as when it has changed
	// (added, modified, or deleted). If the watch stops prematurely and is restarted, it may cause a duplicate call to
	// this method. If an error is returned, the request is requeued.
	Reconcile(ctx context.Context, watcher ObjectIdentifier) (reconcile.Result, error)
}

type ScopedGVR added in v0.5.0

// ScopedGVR is a schema.GroupVersionResource extended with a Namespaced flag; it is the result type of GVKToGVR.
// NOTE(review): Namespaced presumably is true for namespace-scoped resources and false for cluster-scoped ones —
// confirm against the implementation.
type ScopedGVR struct {
	schema.GroupVersionResource
	Namespaced bool
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL