kcache

package module
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 14, 2022 License: MIT Imports: 15 Imported by: 2

README

kcache: kubernetes object cache Build Status codecov

Kcache is a kubernetes object data source similar to k8s.io/client-go/tools/cache which uses channels to create a flexible event-based toolkit. Features include typed producers, joining between multiple producers, and (re)filtering.

Kcache was originally created to drive a Kubernetes monitoring application and it currently powers kail.

Usage

Using kcache involves creating controllers to manage dynamic object sets with the kubernetes API. The monitored objects are cached and events about changing state are broadcast to subscribers.

Controllers

Each controller represents a single kubernetes watch stream. There can be any number of subscribers to each controller, and subscribers can be publishers themselves.

  controller, err := kcache.NewController(ctx,log,client)

  // wait for the initial sync to be complete
  <-controller.Ready()

  fmt.Println("controller has been synced")

Controllers maintain a cache of the objects being watched.

  // fetch the pod named 'pod-1' in the namespace 'default' from the cache.
  pod, err := controller.Cache().Get("default","pod-1")
Channels

There are many ways to subscribe to a controller's events, the most basic is a simple channel-based subscription:

  sub, err := controller.Subscribe()
  <-sub.Ready()

  // fetch cached list of objects
  sub.Cache().List()

  for event := range sub.Events() {
    // handle add/update/delete event for objects
  }
Callbacks

In addition to channels, callbacks can be used to handle events

  handler := kcache.BuildHandler().
    OnInitialize(func(objs []metav1.Object) { /* ... */ }).
    OnCreate(func(obj metav1.Object){ /* ... */ }).
    OnUpdate(func(obj metav1.Object){ /* ... */ }).
    OnDelete(func(obj metav1.Object){ /* ... */ }).
    Create()
  controller

  kcache.NewMonitor(controller,handler)
Types

Typed controllers and subscribers are available to reduce the need for casting objects. Each type has all of the features of the untyped system (channels,callbacks, filtering, caches, etc...)

  controller, err := pod.NewController(ctx,log,client,"default")
  sub, err := controller.Subescribe()
  ...

Currently implemented types are:

  • Pod
  • Node
  • Event
  • Secret
  • Service
  • Ingress
  • Daemonset
  • ReplicaSet
  • Deployment
  • ReplicationController
Filtering

The cache and events that are be exposed to a subscription can be limited by a filter object

The following will return a subscription that only sees the pod named "default/pod-1" pod in its cache and events:

  sub, err := controller.SubscribeWithFilter(filter.NSName("default","pod-1"))

Additionally, new publishers can be created with filters. In the following example, sub_a will only receive events about "default/pod-1" and sub_b will only receive events about "default/pod-2"

  pub_a, err := controller.CloneWithFilter(filter.NSName("default","pod-1"))
  pub_b, err := controller.CloneWithFilter(filter.NSName("default","pod-2"))

  sub_a, err := pub_a.Subscribe()
  sub_b, err := pub_b.Subscribe()
Refiltering

The filter used for filtered publishers and subscribers can be changed at any time. The cache for each will readjust and CREATE, DELETE events will be emitted as necessary.

In the example below, if the pods "default/pod-1" and "default/pod-2" exist, sub_a will receive a delete event for "default/pod-1" and a create event for "default/pod-2"

  pub_a, err := controller.CloneWithFilter(filter.NSName("default","pod-1"))

  sub_a, err := pub_a.Subscribe()

  <-sub_a.Ready()

  go func() {
    for evt := sub_a.Events() {
      fmt.Println(evt)
    }
  }()

  pub_a.Refilter(filter.NSName("default","pod-2"))
Joins

Refiltering allows for joining between different publishers. The join is dynamic -- as the objects of the joined publisher changes, so does the set of objects in the resulting publisher.

In the example below, sub will only know about pods that are targeted by the "default/frontend" service.

  pods, err := pod.NewController(/*...*/)
  services, err := service.NewController(/*...*/)

  frontend, err := services.CloneWithFilter(filter.NSName("default","frontend"))

  sub, err := join.ServicePods(ctx,frontend,pods)

  <- sub.Ready()

  for evt := range sub.Events() {
    /* ... */
  }

Joining can be done by hand but there are a number of utility joins available:

  • ServicePods() - restrict pods to those that match the services available in the given publisher.
  • RCPods() - restrict pods to those that match the replication controllers in the given publisher.
  • RSPods() - restrict pods to those that match the replica sets in the given publisher.
  • DeploymentPods() - restrict pods to those that match the deployments in the given publisher.
  • DaemonSetPods() - restrict pods to those that match the daemonsets in the given publisher.
  • IngressServices() - restrict services to those that match the ingresses in the given publisher.
  • IngressPods() - restrict pods to those that match the services which match the ingresses in the given publisher (double join)

Documentation

Index

Constants

View Source
const (
	EventBufsiz = 100
)

Variables

View Source
var (
	ErrNotRunning = builtin_errors.New("Not running")
)

Functions

This section is empty.

Types

type Builder

type Builder interface {
	Context(context.Context) Builder
	Log(logutil.Log) Builder

	Filter(filter.Filter) Builder

	Client(client.Client) Builder
	Lister() ListerBuilder
	Watcher() WatcherBuilder

	Create() (Controller, error)
}

func NewBuilder

func NewBuilder() Builder

type CacheController

type CacheController interface {
	Cache() CacheReader
	Ready() <-chan struct{}
}

type CacheReader

type CacheReader interface {
	GetObject(obj metav1.Object) (metav1.Object, error)
	Get(ns string, name string) (metav1.Object, error)
	List() ([]metav1.Object, error)
}

type Controller

type Controller interface {
	CacheController
	Publisher
	Done() <-chan struct{}
	Close()
	Error() error
}

func NewController

func NewController(ctx context.Context, log logutil.Log, client client.Client) (Controller, error)

type Event

type Event interface {
	Type() EventType
	Resource() v1.Object
}

func NewEvent

func NewEvent(et EventType, resource v1.Object) Event

type EventType

type EventType string
const (
	EventTypeCreate EventType = "create"
	EventTypeUpdate EventType = "update"
	EventTypeDelete EventType = "delete"
)

type FilterController

type FilterController interface {
	Controller
	Refilter(filter.Filter) error
}

type FilterSubscription

type FilterSubscription interface {
	Subscription
	Refilter(filter.Filter) error
}

type Handler

type Handler interface {
	OnInitialize([]metav1.Object)
	OnCreate(metav1.Object)
	OnUpdate(metav1.Object)
	OnDelete(metav1.Object)
}

type HandlerBuilder

type HandlerBuilder interface {
	OnInitialize(func([]metav1.Object)) HandlerBuilder
	OnCreate(func(metav1.Object)) HandlerBuilder
	OnUpdate(func(metav1.Object)) HandlerBuilder
	OnDelete(func(metav1.Object)) HandlerBuilder
	Create() Handler
}

func BuildHandler

func BuildHandler() HandlerBuilder

type ListerBuilder

type ListerBuilder interface {
	RefreshPeriod(time.Duration) ListerBuilder
	Client(client.ListClient) ListerBuilder
}

type Monitor

type Monitor interface {
	Close()
	Done() <-chan struct{}
	Error() error
}

func NewMonitor

func NewMonitor(publisher Publisher, handler Handler) (Monitor, error)

type Publisher

type Publisher interface {
	Subscribe() (Subscription, error)
	SubscribeWithFilter(filter.Filter) (FilterSubscription, error)
	SubscribeForFilter() (FilterSubscription, error)
	Clone() (Controller, error)
	CloneWithFilter(filter.Filter) (FilterController, error)
	CloneForFilter() (FilterController, error)
}

type Subscription

type Subscription interface {
	CacheController
	Events() <-chan Event
	Close()
	Done() <-chan struct{}
	Error() error
}

type WatcherBuilder

type WatcherBuilder interface {
	Client(client.WatchClient) WatcherBuilder
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL