ds

package
v0.0.0-...-8223eb1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 14, 2020 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// This label is applied to pods owned by a DS.
	DSIDLabel                = "daemon_set_id"
	DaemonSetStatusNamespace = statusstore.Namespace("daemon_set_farm")
)
View Source
const (
	Created dsState = "created"
	Same    dsState = "same"
	Updated dsState = "updated"
	Deleted dsState = "deleted"
)
View Source
const (
	DefaultStatusWritingInterval = 15 * time.Second
)

Variables

View Source
var (
	DefaultRetryInterval = 5 * time.Minute
)

Functions

func DSContends

func DSContends(dsFields ds_fields.DaemonSet, scheduler Scheduler, dsStore DaemonSetStore) (ds_fields.DaemonSet, bool, error)

Naive implementation of a guard, this checks if any of the scheduled nodes are used by two daemon sets, does not pre-emptively catch overlaps by selector because of how kubernetes selectors work

Also naively checks the selectors to see if there are any selector overlap if two label selectors are labels.Everything()

Returns [ daemon set contended, contention exists, error ]

Types

type DSFarmConfig

type DSFarmConfig struct {
	// PodBlacklist represents pod IDs that the farm should skip over. In other
	// words, if the daemon set's pod ID matches one of these the farm will
	// refuse to service it
	PodBlacklist []types.PodID `yaml:"pod_blacklist" json:"pod_blacklist"`

	// PodWhitelist contains the set of pod IDs that the farm should exclusively
	// service. If there is at least one entry in here, all daemon sets with pod
	// IDs other than the ones included in the whitelist will be ignored by this
	// farm
	PodWhitelist []types.PodID `yaml:"pod_whitelist" json:"pod_whitelist"`

	StatusWritingInterval time.Duration
}

TODO: move other config options in here to reduce the number of arguments to NewFarm()

type DaemonSet

type DaemonSet interface {
	ID() fields.ID

	IsDisabled() bool

	// Returns the daemon set's pod id
	PodID() types.PodID

	ClusterName() fields.ClusterName

	GetNodeSelector() klabels.Selector

	// Returns a list of all nodes that are selected by this daemon set's selector
	EligibleNodes() ([]types.NodeName, error)

	MetricNames(suffix string) []string

	// WatchDesires watches for changes to its daemon set, then schedule/unschedule
	// pods to to the nodes that it is responsible for
	//
	// Whatever calls WatchDesires is responsible for sending signals for whether
	// the daemon set updated or deleted
	//
	// When this is first called, it assumes that the daemon set is created
	//
	// The caller is responsible for sending signals when something has been changed
	WatchDesires(
		ctx context.Context,
		updatedCh <-chan fields.DaemonSet,
		deletedCh <-chan fields.DaemonSet,
	) <-chan error

	// CurrentPods() returns all nodes that are scheduled by this daemon set
	CurrentPods() (types.PodLocations, error)

	Replicate(context.Context, <-chan []types.NodeName, <-chan struct{}, <-chan struct{}, <-chan manifest.Manifest, <-chan time.Duration)
}

func New

func New(
	fields fields.DaemonSet,
	dsStore DaemonSetStore,
	store store,
	txner transaction.Txner,
	applicator Labeler,
	watcher LabelWatcher,
	labelsAggregationRate time.Duration,
	logger logging.Logger,
	healthChecker *checker.HealthChecker,
	rateLimitInterval time.Duration,
	cachedPodMatch bool,
	healthWatchDelay time.Duration,
	retryInterval time.Duration,
	unlocker consul.TxnUnlocker,
	statusStore StatusStore,
	statusWritingInterval time.Duration,
) DaemonSet

type DaemonSetLocker

type DaemonSetLocker interface {
	LockForOwnershipTxn(
		lockCtx context.Context,
		dsID fields.ID,
		session consul.Session,
	) (consul.TxnUnlocker, error)
}

DaemonSetLocker is necessary to allow coordination between multiple daemon set farm instances. Before processing a daemon set, each farm instance will attempt to acquire a distributed lock and will only proceed if the acquisition was successful. All subsequent consul operations performed as part of servicing the daemon set will ensure that the lock is held within a transaction.

NOTE: this interface is separate from DaemonSetStore because it is specifically tied to consul semantics by requiring a consul session.

type DaemonSetStore

type DaemonSetStore interface {
	List() ([]fields.DaemonSet, error)
	Watch(quitCh <-chan struct{}) <-chan dsstore.WatchedDaemonSets
	Disable(id fields.ID) (fields.DaemonSet, error)
}

type Farm

type Farm struct {
	// contains filtered or unexported fields
}

Farm instatiates and deletes daemon sets as needed

func NewFarm

func NewFarm(
	store store,
	txner transaction.Txner,
	dsStore DaemonSetStore,
	dsLocker DaemonSetLocker,
	statusStore StatusStore,
	labeler Labeler,
	watcher LabelWatcher,
	sessions <-chan string,
	logger logging.Logger,
	alerter alerting.Alerter,
	healthChecker *checker.HealthChecker,
	rateLimitInterval time.Duration,
	monitorHealth bool,
	cachedPodMatch bool,
	healthWatchDelay time.Duration,
	dsRetryInterval time.Duration,
	farmConfig DSFarmConfig,
) *Farm

func (*Farm) Start

func (dsf *Farm) Start(quitCh <-chan struct{})

type LabelWatcher

type LabelWatcher interface {
	WatchMatchDiff(
		selector klabels.Selector,
		labelType labels.Type,
		aggregationRate time.Duration,
		quitCh <-chan struct{},
	) <-chan *labels.LabeledChanges
}

type Labeler

type Labeler interface {
	SetLabelsTxn(ctx context.Context, labelType labels.Type, id string, labels map[string]string) error
	RemoveLabelTxn(
		ctx context.Context,
		labelType labels.Type,
		id string,
		name string,
	) error
	GetMatches(selector klabels.Selector, labelType labels.Type) ([]labels.Labeled, error)
	GetCachedMatches(selector klabels.Selector, labelType labels.Type, aggregationRate time.Duration) ([]labels.Labeled, error)
	GetLabels(labelType labels.Type, id string) (labels.Labeled, error)
}

type Scheduler

type Scheduler interface {
	EligibleNodes(manifest.Manifest, klabels.Selector) ([]types.NodeName, error)
}

type StatusStore

type StatusStore interface {
	Get(dsID fields.ID) (daemonsetstatus.Status, *api.QueryMeta, error)
	SetTxn(ctx context.Context, dsID fields.ID, status daemonsetstatus.Status) error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL