framework

package
v0.14.2-0...-43ec88f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 8, 2015 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Overview

Package framework implements all the grunt work involved in running a simple controller.

Example
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)

// main demonstrates the controller framework end to end: a fake source feeds
// pods into a DeltaFIFO, the controller's Process function mirrors each change
// into a downstream store and deletes every object it sees, and the keys of
// the observed deletions are printed once the controller has been stopped.
func main() {
	// source simulates an apiserver object endpoint.
	source := framework.NewFakeControllerSource()

	// This will hold the downstream state, as we know it.
	downstream := cache.NewStore(cache.MetaNamespaceKeyFunc)

	// This will hold incoming changes. Note how we pass downstream in as a
	// KeyLister, that way resync operations will result in the correct set
	// of update/delete deltas.
	fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, downstream)

	// Let's do threadsafe output to get predictable test results.
	// The zero value of sync.Mutex is an unlocked mutex, ready for use.
	var outputSetLock sync.Mutex
	outputSet := util.StringSet{}

	cfg := &framework.Config{
		Queue:            fifo,
		ListerWatcher:    source,
		ObjectType:       &api.Pod{},
		FullResyncPeriod: time.Millisecond * 100,
		RetryOnError:     false,

		// Let's implement a simple controller that just deletes
		// everything that comes in.
		Process: func(obj interface{}) error {
			// Obj is from the Pop method of the Queue we make above.
			newest := obj.(cache.Deltas).Newest()

			if newest.Type != cache.Deleted {
				// Update our downstream store.
				err := downstream.Add(newest.Object)
				if err != nil {
					return err
				}

				// Deleting from the source produces the Deleted delta
				// that is handled by the branch below.
				source.Delete(newest.Object.(runtime.Object))
			} else {
				// Update our downstream store.
				err := downstream.Delete(newest.Object)
				if err != nil {
					return err
				}

				// fifo's KeyOf is easiest, because it handles
				// DeletedFinalStateUnknown markers.
				key, err := fifo.KeyOf(newest.Object)
				if err != nil {
					return err
				}

				// Record some output.
				outputSetLock.Lock()
				defer outputSetLock.Unlock()
				outputSet.Insert(key)
			}
			return nil
		},
	}

	// Create the controller and run it until we close stop.
	stop := make(chan struct{})
	framework.New(cfg).Run(stop)

	// Let's add a few objects to the source.
	for _, name := range []string{"a-hello", "b-controller", "c-framework"} {
		// Note that these pods are not valid-- the fake source doesn't
		// call validation or anything.
		source.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: name}})
	}

	// Let's wait for the controller to process the things we just added.
	time.Sleep(500 * time.Millisecond)
	close(stop)

	// Hold the lock while reading the set, and release it on return so the
	// mutex is never left locked.
	outputSetLock.Lock()
	defer outputSetLock.Unlock()
	for _, key := range outputSet.List() {
		fmt.Println(key)
	}
}
Output:

a-hello
b-controller
c-framework

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// The queue for your objects; either a cache.FIFO or
	// a cache.DeltaFIFO. Your Process() function should accept
	// the output of this Queue's Pop() method.
	cache.Queue

	// Something that can list and watch your objects.
	cache.ListerWatcher

	// Something that can process your objects.
	Process ProcessFunc

	// The type of your objects.
	ObjectType runtime.Object

	// Reprocess everything at least this often.
	// Note that if it takes longer for you to clear the queue than this
	// period, you will end up processing items in the order determined
	// by cache.FIFO.Replace(). Currently, this is random. If this is a
	// problem, we can change that replacement policy to append new
	// things to the end of the queue instead of replacing the entire
	// queue.
	FullResyncPeriod time.Duration

	// If true, when Process() returns an error, re-enqueue the object.
	// TODO: add interface to let you inject a delay/backoff or drop
	//       the object completely if desired. Pass the object in
	//       question to this interface as a parameter.
	RetryOnError bool
}

Config contains all the settings for a Controller.

type Controller

type Controller struct {
	// contains filtered or unexported fields
}

Controller is a generic controller framework.

func New

func New(c *Config) *Controller

New makes a new Controller from the given Config.

func (*Controller) Run

func (c *Controller) Run(stopCh <-chan struct{})

Run begins processing items, and will continue until a value is sent down stopCh or stopCh is closed (as the example above does). It's an error to call Run more than once. Run does not block.

type FakeControllerSource

type FakeControllerSource struct {
	// contains filtered or unexported fields
}

FakeControllerSource implements listing/watching for testing.

func NewFakeControllerSource

func NewFakeControllerSource() *FakeControllerSource

func (*FakeControllerSource) Add

func (f *FakeControllerSource) Add(obj runtime.Object)

Add adds an object to the set and sends an add event to watchers. obj's ResourceVersion is set.

func (*FakeControllerSource) Delete

func (f *FakeControllerSource) Delete(lastValue runtime.Object)

Delete deletes an object from the set and sends a delete event to watchers. lastValue's ResourceVersion is set.

func (*FakeControllerSource) List

List returns a list object, with its resource version set.

func (*FakeControllerSource) Modify

func (f *FakeControllerSource) Modify(obj runtime.Object)

Modify updates an object in the set and sends a modified event to watchers. obj's ResourceVersion is set.

func (*FakeControllerSource) Watch

func (f *FakeControllerSource) Watch(resourceVersion string) (watch.Interface, error)

Watch returns a watch, which will be pre-populated with all changes after resourceVersion.

type ProcessFunc

type ProcessFunc func(obj interface{}) error

ProcessFunc processes a single object.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL