mutate

package
v0.0.0-...-ddee7fb
Warning

This package is not in the latest version of its module.

Published: Nov 9, 2021 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package mutate includes the main logic of DM's state machine. The package is a series of "go.chromium.org/luci/tumble".Mutation implementations. Each mutation operates on a single entity group in DM's datastore model, advancing the state machine for the dependency graph by one edge.

Constants

const MaxEnsureAttempts = 10

MaxEnsureAttempts caps the number of EnsureAttempt entities that a single EnsureQuestAttempts mutation will emit. If there are more AttemptIDs than this maximum, EnsureQuestAttempts processes the remainder through tail recursion.
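
A minimal sketch of that chunking, assuming attempt IDs are handled in order. The `splitAIDs` helper is hypothetical and not part of the package API; only the constant's value comes from the documentation.

```go
package main

import "fmt"

// maxEnsureAttempts mirrors the documented value of MaxEnsureAttempts.
const maxEnsureAttempts = 10

// splitAIDs returns the attempt IDs to handle now and the remainder that a
// follow-up (tail-recursive) mutation would handle. Hypothetical helper.
func splitAIDs(aids []uint32) (now, rest []uint32) {
	if len(aids) <= maxEnsureAttempts {
		return aids, nil
	}
	return aids[:maxEnsureAttempts], aids[maxEnsureAttempts:]
}

func main() {
	rest := make([]uint32, 25)
	for i := range rest {
		rest[i] = uint32(i + 1)
	}
	for round := 1; len(rest) > 0; round++ {
		var now []uint32
		now, rest = splitAIDs(rest)
		fmt.Printf("round %d: emit %d EnsureAttempt mutations, %d left\n",
			round, len(now), len(rest))
	}
}
```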

Variables

This section is empty.

Functions

func FinishExecutionFn

func FinishExecutionFn(c context.Context, eid *dm.Execution_ID, rslt *dm.Result) ([]tumble.Mutation, error)

FinishExecutionFn is the implementation of distributor.FinishExecutionFn. It's defined here to avoid a circular dependency.

func ResetExecutionTimeout

func ResetExecutionTimeout(c context.Context, e *model.Execution) error

ResetExecutionTimeout schedules a Timeout for this Execution. It inspects the Execution's State to determine which timeout should be set, if any. If no timeout should be active, this will cancel any existing timeouts for this Execution.

Types

type AckFwdDep

type AckFwdDep struct {
	Dep *model.FwdEdge
}

AckFwdDep records the fact that a dependency was completed.

func (*AckFwdDep) RollForward

func (f *AckFwdDep) RollForward(c context.Context) (muts []tumble.Mutation, err error)

RollForward implements tumble.Mutation.

func (*AckFwdDep) Root

func (f *AckFwdDep) Root(c context.Context) *ds.Key

Root implements tumble.Mutation.

type ActivateExecution

type ActivateExecution struct {
	Auth   *dm.Execution_Auth
	NewTok []byte
}

ActivateExecution activates an execution, moving it from the SCHEDULING state to the RUNNING state and resetting the execution timeout (if any).

func (*ActivateExecution) RollForward

func (a *ActivateExecution) RollForward(c context.Context) (muts []tumble.Mutation, err error)

RollForward implements tumble.Mutation.

func (*ActivateExecution) Root

func (a *ActivateExecution) Root(c context.Context) *ds.Key

Root implements tumble.Mutation.

type AddBackDep

type AddBackDep struct {
	Dep      *model.FwdEdge
	NeedsAck bool // make AckFwdDep iff true
}

AddBackDep adds a BackDep (and possibly a BackDepGroup). If NeedsAck is true, this mutation will chain to an AckFwdDep. It should only be false if this AddBackDep is spawned from an AddFinishedDeps, where the originating Attempt already knows that this dependency is Finished.

func (*AddBackDep) RollForward

func (a *AddBackDep) RollForward(c context.Context) (muts []tumble.Mutation, err error)

RollForward implements tumble.Mutation.

func (*AddBackDep) Root

func (a *AddBackDep) Root(c context.Context) *ds.Key

Root implements tumble.Mutation.

type AddDeps

type AddDeps struct {
	Auth   *dm.Execution_Auth
	Quests []*model.Quest

	// Attempts lists the attempts we think are missing from the global graph.
	Attempts *dm.AttemptList

	// Deps lists the forward dependencies we think are missing from the
	// authenticated attempt.
	Deps *dm.AttemptList
}

AddDeps transactionally stops the current execution and adds one or more dependencies.

func (*AddDeps) RollForward

func (a *AddDeps) RollForward(c context.Context) (muts []tumble.Mutation, err error)

RollForward implements tumble.Mutation.

This mutation is called directly.

func (*AddDeps) Root

func (a *AddDeps) Root(c context.Context) *ds.Key

Root implements tumble.Mutation.

type AddFinishedDeps

type AddFinishedDeps struct {
	Auth *dm.Execution_Auth

	// MergeQuests lists quests which need their BuiltBy lists merged. The Quests
	// here must be a subset of the quests mentioned in FinishedAttempts.
	MergeQuests []*model.Quest

	// FinishedAttempts is a list of attempts that we already know are in the
	// Finished state.
	FinishedAttempts *dm.AttemptList
}

AddFinishedDeps adds a bunch of dependencies which are known in advance to already be in the Finished state.

func (*AddFinishedDeps) RollForward

func (f *AddFinishedDeps) RollForward(c context.Context) (muts []tumble.Mutation, err error)

RollForward implements tumble.Mutation.

func (*AddFinishedDeps) Root

func (f *AddFinishedDeps) Root(c context.Context) *ds.Key

Root implements tumble.Mutation.

type EnsureAttempt

type EnsureAttempt struct {
	ID *dm.Attempt_ID
}

EnsureAttempt ensures that the given Attempt exists. If it doesn't, it's created in a NeedsExecution state.
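
The create-if-missing behavior can be sketched as follows. This is an illustration under simplifying assumptions: the `attemptState` type, the string attempt IDs, and the in-memory map are hypothetical stand-ins for the datastore model.

```go
package main

import "fmt"

// attemptState is a hypothetical stand-in for the Attempt state enum.
type attemptState string

const needsExecution attemptState = "NeedsExecution"

// ensureAttempt creates the attempt in the NeedsExecution state only when it
// is absent, and reports whether it created anything. Existing attempts are
// left untouched, so running it twice is harmless.
func ensureAttempt(store map[string]attemptState, id string) bool {
	if _, ok := store[id]; ok {
		return false
	}
	store[id] = needsExecution
	return true
}

func main() {
	store := map[string]attemptState{}
	fmt.Println(ensureAttempt(store, "quest|1"), store["quest|1"])
	fmt.Println(ensureAttempt(store, "quest|1"), store["quest|1"])
}
```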

func (*EnsureAttempt) RollForward

func (e *EnsureAttempt) RollForward(c context.Context) (muts []tumble.Mutation, err error)

RollForward implements tumble.Mutation.

func (*EnsureAttempt) Root

func (e *EnsureAttempt) Root(c context.Context) *ds.Key

Root implements tumble.Mutation.

type EnsureQuestAttempts

type EnsureQuestAttempts struct {
	Quest *model.Quest
	AIDs  []uint32

	// DoNotMergeQuest causes this mutation to not attempt to merge the BuiltBy of
	// Quest.
	DoNotMergeQuest bool
}

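EnsureQuestAttempts ensures that the given Quest's Attempts (listed in AIDs) exist, emitting up to MaxEnsureAttempts EnsureAttempt mutations at a time. Any Attempt that doesn't exist is created in a NeedsExecution state.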

func (*EnsureQuestAttempts) RollForward

func (e *EnsureQuestAttempts) RollForward(c context.Context) (muts []tumble.Mutation, err error)

RollForward implements tumble.Mutation.

func (*EnsureQuestAttempts) Root

func (e *EnsureQuestAttempts) Root(c context.Context) *ds.Key

Root implements tumble.Mutation.

type FinishAttempt

type FinishAttempt struct {
	dm.FinishAttemptReq
}

FinishAttempt does several things:

  - Invalidates the current Execution.
  - Moves the Attempt state to Finished.
  - Creates a new AttemptResult.
  - Starts the RecordCompletion state machine.

func (*FinishAttempt) RollForward

func (f *FinishAttempt) RollForward(c context.Context) (muts []tumble.Mutation, err error)

RollForward implements tumble.Mutation.

This mutation is called directly from FinishAttempt.

func (*FinishAttempt) Root

func (f *FinishAttempt) Root(c context.Context) *ds.Key

Root implements tumble.Mutation.

type FinishExecution

type FinishExecution struct {
	EID    *dm.Execution_ID
	Result *dm.Result
}

FinishExecution records the final state of the Execution, and advances the Attempt state machine.

func NewFinishExecutionAbnormal

func NewFinishExecutionAbnormal(eid *dm.Execution_ID, status dm.AbnormalFinish_Status, reason string) *FinishExecution

NewFinishExecutionAbnormal is a shorthand to make a FinishExecution mutation with some abnormal result.

func (*FinishExecution) RollForward

func (f *FinishExecution) RollForward(c context.Context) (muts []tumble.Mutation, err error)

RollForward implements tumble.Mutation.

func (*FinishExecution) Root

func (f *FinishExecution) Root(c context.Context) *ds.Key

Root implements tumble.Mutation.

type MergeQuest

type MergeQuest struct {
	Quest   *model.Quest
	AndThen []tumble.Mutation
}

MergeQuest ensures that the given Quest exists and contains the merged set of BuiltBy entries.
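
As a rough illustration of "merged set", the sketch below computes a sorted union of two BuiltBy lists. Treating BuiltBy entries as plain strings is an assumption made for brevity, and `mergeBuiltBy` is a hypothetical helper rather than the package's own routine.

```go
package main

import (
	"fmt"
	"sort"
)

// mergeBuiltBy returns the sorted union of two BuiltBy lists, dropping
// duplicates. Entries are plain strings here (an assumption).
func mergeBuiltBy(existing, incoming []string) []string {
	seen := make(map[string]struct{}, len(existing)+len(incoming))
	for _, s := range existing {
		seen[s] = struct{}{}
	}
	for _, s := range incoming {
		seen[s] = struct{}{}
	}
	out := make([]string, 0, len(seen))
	for s := range seen {
		out = append(out, s)
	}
	sort.Strings(out) // deterministic order makes repeated merges comparable
	return out
}

func main() {
	fmt.Println(mergeBuiltBy([]string{"tmpl-b", "tmpl-a"}, []string{"tmpl-c", "tmpl-a"}))
}
```

Because a union is idempotent, merging the same Quest twice yields the same list.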

func (*MergeQuest) RollForward

func (m *MergeQuest) RollForward(c context.Context) (muts []tumble.Mutation, err error)

RollForward implements tumble.Mutation.

func (*MergeQuest) Root

func (m *MergeQuest) Root(c context.Context) *ds.Key

Root implements tumble.Mutation.

type RecordCompletion

type RecordCompletion struct {
	For *dm.Attempt_ID
}

RecordCompletion marks the fact that an Attempt is completed (Finished) on its corresponding BackDepGroup, and fires off additional AckFwdDep mutations for each incoming dependency that is blocked.

In the case where an Attempt has hundreds or thousands of incoming dependencies, the naive implementation of this mutation could easily overfill a single datastore transaction. For that reason, the implementation here unblocks things 64 edges at a time, and keeps returning itself as a mutation until it unblocks fewer than 64 things (i.e., it does a tail call).

This relies on tumble's tail-call optimization to keep the number of transactions low; without it, this would take one transaction per 64 dependencies. With the optimization, it can handle hundreds or thousands of dependencies while remaining fair to other work (e.g. it allows other Attempts to take dependencies on this Attempt while RecordCompletion is between tail calls).
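
The batching rule described above can be sketched in isolation. The `unblockBatch` helper and the string edge names are hypothetical; only the batch size of 64 and the "repeat until a short batch" rule come from the text.

```go
package main

import "fmt"

// batchSize is the number of edges unblocked per round, per the text above.
const batchSize = 64

// unblockBatch takes up to batchSize blocked edges off the front of the list.
// again is true only when a full batch was processed, which is the signal to
// return the mutation once more; a short batch means the work is done.
func unblockBatch(blocked []string) (acked, remaining []string, again bool) {
	n := len(blocked)
	if n > batchSize {
		n = batchSize
	}
	return blocked[:n], blocked[n:], n == batchSize
}

func main() {
	blocked := make([]string, 150)
	for i := range blocked {
		blocked[i] = fmt.Sprintf("attempt-%d", i)
	}
	for round, again := 1, true; again; round++ {
		var acked []string
		acked, blocked, again = unblockBatch(blocked)
		fmt.Printf("round %d: acked %d, %d left\n", round, len(acked), len(blocked))
	}
}
```

Note that a list whose length is an exact multiple of 64 takes one extra, empty round, because only a short batch ends the chain.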

func (*RecordCompletion) RollForward

func (r *RecordCompletion) RollForward(c context.Context) (muts []tumble.Mutation, err error)

RollForward implements tumble.Mutation.

func (*RecordCompletion) Root

func (r *RecordCompletion) Root(c context.Context) *ds.Key

Root implements tumble.Mutation.

type ScheduleExecution

type ScheduleExecution struct {
	For *dm.Attempt_ID
}

ScheduleExecution is a placeholder mutation that will be an entry into the Distributor scheduling state-machine.

func (*ScheduleExecution) RollForward

func (s *ScheduleExecution) RollForward(c context.Context) (muts []tumble.Mutation, err error)

RollForward implements tumble.Mutation.

func (*ScheduleExecution) Root

func (s *ScheduleExecution) Root(c context.Context) *ds.Key

Root implements tumble.Mutation.

type TimeoutExecution

type TimeoutExecution struct {
	For   *dm.Execution_ID
	State dm.Execution_State
	// TimeoutAttempt is the number of attempts to stop a STOPPING execution,
	// since this potentially requires an RPC to the distributor to enact.
	TimeoutAttempt uint
	Deadline       time.Time
}

TimeoutExecution is a named mutation which triggers on a delay. If the execution is in the noted state when the trigger hits, this sets the Execution to have an AbnormalFinish status of TIMED_OUT.

func (*TimeoutExecution) HighPriority

func (t *TimeoutExecution) HighPriority() bool

HighPriority implements tumble.DelayedMutation.

func (*TimeoutExecution) ProcessAfter

func (t *TimeoutExecution) ProcessAfter() time.Time

ProcessAfter implements tumble.DelayedMutation.

func (*TimeoutExecution) RollForward

func (t *TimeoutExecution) RollForward(c context.Context) (muts []tumble.Mutation, err error)

RollForward implements tumble.Mutation.

func (*TimeoutExecution) Root

func (t *TimeoutExecution) Root(c context.Context) *ds.Key

Root implements tumble.Mutation.
