reconciler

package
v0.0.0-...-8d8d9d5

Warning: This package is not in the latest version of its module.
Published: Apr 30, 2024 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation


Constants

const (
	OpUpdate = "update"
	OpDelete = "delete"
	OpPrune  = "prune"
)

Variables

This section is empty.

Functions

func NewStatusIndex

func NewStatusIndex[Obj any](getObjectStatus func(Obj) Status) statedb.Index[Obj, StatusKind]

NewStatusIndex creates a status index for a table of reconcilable objects.

func Register

func Register[Obj comparable](cfg Config[Obj], params Params) error

Register creates a new reconciler and registers it with the application lifecycle. Use it with cell.Invoke when the API of the reconciler is not needed.

func WaitForReconciliation

func WaitForReconciliation[Obj any](ctx context.Context, db *statedb.DB, table statedb.Table[Obj], statusIndex statedb.Index[Obj, StatusKind]) error

WaitForReconciliation blocks until all objects have been reconciled or the context is cancelled.

Types

type BatchEntry

type BatchEntry[Obj any] struct {
	Object   Obj
	Revision statedb.Revision
	Result   error
}

type BatchOperations

type BatchOperations[Obj any] interface {
	UpdateBatch(ctx context.Context, txn statedb.ReadTxn, batch []BatchEntry[Obj])
	DeleteBatch(context.Context, statedb.ReadTxn, []BatchEntry[Obj])
}

type Config

type Config[Obj any] struct {
	// Table to reconcile.
	Table statedb.RWTable[Obj]

	// Metrics to use with this reconciler. The metrics capture the duration
	// of operations during incremental and full reconciliation and the errors
	// that occur during either.
	//
	// If nil, then the default metrics are used via Params.
	// A simple implementation of metrics based on the expvar package comes
	// with the reconciler and a custom one can be used by implementing the
	// Metrics interface.
	Metrics Metrics

	// FullReconcilationInterval is the amount of time to wait between full
	// reconciliation rounds. A full reconciliation is Prune() of unexpected
	// objects and Update() of all objects. With full reconciliation we're
	// resilient towards outside changes. If FullReconcilationInterval is
	// 0 then full reconciliation is disabled.
	FullReconcilationInterval time.Duration

	// RetryBackoffMinDuration is the minimum amount of time to wait before
	// retrying a failed Update() or Delete() operation on an object.
	// The retry wait time for an object will increase exponentially on
	// subsequent failures until RetryBackoffMaxDuration is reached.
	RetryBackoffMinDuration time.Duration

	// RetryBackoffMaxDuration is the maximum amount of time to wait before
	// retrying.
	RetryBackoffMaxDuration time.Duration

	// IncrementalRoundSize is the maximum number of objects to reconcile during
	// incremental reconciliation before updating status and refreshing the
	// statedb snapshot. This should be tuned based on the cost of each operation
	// and the rate of expected changes so that health and per-object status
	// updates are not delayed too much. If in doubt, use a value between 100 and 1000.
	IncrementalRoundSize int

	// GetObjectStatus returns the reconciliation status for the object.
	GetObjectStatus func(Obj) Status

	// SetObjectStatus sets the reconciliation status for the object.
	// This is called with a copy of the object returned by CloneObject.
	SetObjectStatus func(Obj, Status) Obj

	// CloneObject returns a shallow copy of the object. This is used to
	// make it possible for the reconciliation operations to mutate
	// the object (for example, to provide additional information that the
	// reconciliation produces) and to be able to set the reconciliation
	// status after the reconciliation.
	CloneObject func(Obj) Obj

	// RateLimiter is optional and if set will use the limiter to wait between
	// reconciliation rounds. This allows trading latency for throughput by
	// waiting longer to collect a batch of objects to reconcile.
	RateLimiter *rate.Limiter

	// Operations defines how an object is reconciled.
	Operations Operations[Obj]

	// BatchOperations is optional and if provided these are used instead of normal operations.
	BatchOperations BatchOperations[Obj]
}

type ExpVarMetrics

type ExpVarMetrics struct {
	IncrementalReconciliationCountVar         *expvar.Map
	IncrementalReconciliationDurationVar      *expvar.Map
	IncrementalReconciliationTotalErrorsVar   *expvar.Map
	IncrementalReconciliationCurrentErrorsVar *expvar.Map

	FullReconciliationCountVar          *expvar.Map
	FullReconciliationDurationVar       *expvar.Map
	FullReconciliationTotalErrorsVar    *expvar.Map
	FullReconciliationCurrentErrorsVar  *expvar.Map
	FullReconciliationOutOfSyncCountVar *expvar.Map
}

func NewExpVarMetrics

func NewExpVarMetrics() *ExpVarMetrics

func NewUnpublishedExpVarMetrics

func NewUnpublishedExpVarMetrics() *ExpVarMetrics

func (*ExpVarMetrics) FullReconciliationDuration

func (m *ExpVarMetrics) FullReconciliationDuration(moduleID cell.FullModuleID, operation string, duration time.Duration)

func (*ExpVarMetrics) FullReconciliationErrors

func (m *ExpVarMetrics) FullReconciliationErrors(moduleID cell.FullModuleID, errs []error)

func (*ExpVarMetrics) FullReconciliationOutOfSync

func (m *ExpVarMetrics) FullReconciliationOutOfSync(moduleID cell.FullModuleID)

func (*ExpVarMetrics) IncrementalReconciliationDuration

func (m *ExpVarMetrics) IncrementalReconciliationDuration(moduleID cell.FullModuleID, operation string, duration time.Duration)

func (*ExpVarMetrics) IncrementalReconciliationErrors

func (m *ExpVarMetrics) IncrementalReconciliationErrors(moduleID cell.FullModuleID, errs []error)

type Metrics

type Metrics interface {
	IncrementalReconciliationDuration(moduleID cell.FullModuleID, operation string, duration time.Duration)
	IncrementalReconciliationErrors(moduleID cell.FullModuleID, errs []error)

	FullReconciliationOutOfSync(moduleID cell.FullModuleID)
	FullReconciliationErrors(moduleID cell.FullModuleID, errs []error)
	FullReconciliationDuration(moduleID cell.FullModuleID, operation string, duration time.Duration)
}

type Operations

type Operations[Obj any] interface {
	// Update the object in the target. If the operation is long-running it should
	// abort if context is cancelled. Should return an error if the operation fails.
	// The reconciler will retry the operation again at a later time, potentially
	// with a new version of the object. The operation should thus be idempotent.
	//
	// Update is used both for incremental and full reconciliation. Incremental
	// reconciliation is performed when the desired state is updated. A full
	// reconciliation is done periodically by calling 'Update' on all objects.
	//
	// If 'changed' is non-nil then the Update must compare the realized state
	// with the desired state and set it to true if they differ, i.e. if
	// the operation resulted in a change to the realized state. This is used
	// during full reconciliation to catch cases where the realized state has
	// gone out of sync due to outside influence. This is tracked in the
	// "full_out_of_sync_total" metric.
	//
	// The object handed to Update is a clone produced by Config.CloneObject
	// and thus Update can mutate the object.
	Update(ctx context.Context, txn statedb.ReadTxn, obj Obj, changed *bool) error

	// Delete the object in the target. Same semantics as with Update.
	// Deleting a non-existing object is not an error and returns nil.
	Delete(context.Context, statedb.ReadTxn, Obj) error

	// Prune undesired state. It is given an iterator for the full set of
	// desired objects. The implementation should diff the desired state against
	// the realized state to find things to prune.
	// Invoked during full reconciliation before the individual objects are Update()'d.
	//
	// Unlike a failed Update(), a failed Prune() operation is not retried until
	// the next full reconciliation round.
	Prune(context.Context, statedb.ReadTxn, statedb.Iterator[Obj]) error
}

Operations defines how to reconcile an object.

Each operation is given a context that limits the lifetime of the operation and a ReadTxn to allow for the option of looking up realized state from another statedb table as an optimization (main use-case is reconciling routes against Table[Route] to avoid a syscall per route).

type Params

type Params struct {
	cell.In

	Lifecycle      cell.Lifecycle
	Log            *slog.Logger
	DB             *statedb.DB
	Jobs           job.Registry
	ModuleID       cell.FullModuleID
	Health         cell.Health
	DefaultMetrics Metrics `optional:"true"`
}

type Reconciler

type Reconciler[Obj any] interface {
	// TriggerFullReconciliation triggers an immediate full reconciliation,
	// i.e. Prune() of unknown objects and Update() of all objects.
	// Implemented as a select+send to a channel of size 1, so N concurrent
	// calls of this method may result in fewer than N full reconciliations.
	//
	// Primarily useful in tests, but may be of use when there's knowledge
	// that something has gone wrong in the reconciliation target and full
	// reconciliation is needed to recover.
	TriggerFullReconciliation()
}
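The "select+send to a channel of size 1" pattern mentioned in the comment is what makes repeated triggers coalesce. A minimal sketch of the pattern on its own follows. The trigger type and its method names are hypothetical and not part of the package.

```go
package main

import "fmt"

// trigger coalesces requests: at most one request is pending at a time.
type trigger struct {
	ch chan struct{}
}

func newTrigger() *trigger {
	return &trigger{ch: make(chan struct{}, 1)}
}

// Trigger requests a run without blocking. If a request is already
// pending, the new one is dropped.
func (tr *trigger) Trigger() {
	select {
	case tr.ch <- struct{}{}:
	default:
	}
}

// pending reports how many requests are waiting to be consumed (0 or 1).
func (tr *trigger) pending() int {
	return len(tr.ch)
}

func main() {
	tr := newTrigger()
	for i := 0; i < 5; i++ {
		tr.Trigger()
	}
	fmt.Println("pending full reconciliations:", tr.pending()) // prints 1

	<-tr.ch      // the consumer picks up the request
	tr.Trigger() // a later trigger is accepted again
	fmt.Println("pending after new trigger:", tr.pending()) // prints 1
}
```

Five calls leave a single pending request, which is why N calls may produce fewer than N full reconciliations.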

func New

func New[Obj comparable](cfg Config[Obj], p Params) (Reconciler[Obj], error)

New creates and registers a new reconciler.

type Status

type Status struct {
	Kind StatusKind

	// Delete is true if the object should be deleted by the reconciler.
	// If an object is deleted outside the reconciler it will not be
	// processed by the incremental reconciliation.
	// We use soft deletes in order to observe and wait for deletions.
	Delete bool

	UpdatedAt time.Time
	Error     string
}

Status is embedded into the reconcilable object. It allows inspecting per-object reconciliation status and waiting for the reconciler.
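An object type that carries its own Status, together with accessor functions shaped like Config.GetObjectStatus, Config.SetObjectStatus and Config.CloneObject, might look as follows. This is a sketch with local copies of Status and StatusKind so it compiles on its own. The backend type and the function names are hypothetical. A pointer type is used for the object because New and Register require Obj to be comparable.

```go
package main

import (
	"fmt"
	"time"
)

// StatusKind and Status mirror the documented types.
type StatusKind string

const (
	StatusKindPending StatusKind = "pending"
	StatusKindDone    StatusKind = "done"
	StatusKindError   StatusKind = "error"
)

type Status struct {
	Kind      StatusKind
	Delete    bool
	UpdatedAt time.Time
	Error     string
}

// backend is a hypothetical reconcilable object carrying its own status.
type backend struct {
	Name   string
	Addr   string
	Status Status
}

// getStatus has the shape of Config.GetObjectStatus.
func getStatus(b *backend) Status { return b.Status }

// setStatus has the shape of Config.SetObjectStatus. Per the docs it is
// called with a clone, so mutating the argument is safe.
func setStatus(b *backend, s Status) *backend {
	b.Status = s
	return b
}

// cloneBackend has the shape of Config.CloneObject: a shallow copy.
func cloneBackend(b *backend) *backend {
	c := *b
	return &c
}

func main() {
	orig := &backend{Name: "web", Addr: "10.0.0.1:80", Status: Status{Kind: StatusKindPending}}

	// What a reconciler could do after a successful Update: clone the
	// object, then set the new status on the clone.
	done := setStatus(cloneBackend(orig), Status{Kind: StatusKindDone, UpdatedAt: time.Now()})

	fmt.Println("original:", getStatus(orig).Kind)
	fmt.Println("clone:   ", getStatus(done).Kind)
}
```

The original stays pending while the clone is marked done, so objects already stored in the table are never mutated in place.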

func StatusDone

func StatusDone() Status

StatusDone constructs the status that marks the object as reconciled.

func StatusError

func StatusError(delete bool, err error) Status

StatusError constructs the status that marks the object as having failed reconciliation.

func StatusPending

func StatusPending() Status

StatusPending constructs the status for marking the object as requiring reconciliation. The reconciler will perform the Update operation and on success transition to Done status, or on failure to Error status.

func StatusPendingDelete

func StatusPendingDelete() Status

StatusPendingDelete constructs the status for marking the object to be deleted.

The reconciler uses soft-deletes in order to be able to retry and to report failed deletions of objects. When the delete operation is successfully performed the reconciler will delete the object from the table.

func (Status) String

func (s Status) String() string

type StatusKind

type StatusKind string

const (
	StatusKindPending StatusKind = "pending"
	StatusKindDone    StatusKind = "done"
	StatusKindError   StatusKind = "error"
)

func (StatusKind) Key

func (s StatusKind) Key() index.Key

Key implements an optimized construction of index.Key for StatusKind to avoid copying and allocation.
