luci: go.chromium.org/luci/dm/appengine/mutate Index | Files

package mutate

import "go.chromium.org/luci/dm/appengine/mutate"

Package mutate includes the main logic of DM's state machine. The package is a series of "go.chromium.org/luci/tumble".Mutation implementations. Each mutation operates on a single entity group in DM's datastore model, advancing the state machine for the dependency graph by one edge.

Index

Package Files

ack_fwd_dep.go activate_execution.go add_backdep.go add_deps.go add_finished_deps.go doc.go ensure_attempt.go ensure_quest_attempts.go filter_existing.go finish_attempt.go finish_execution.go merge_quest.go record_completion.go schedule_execution.go timeout_execution.go

Constants

const MaxEnsureAttempts = 10

MaxEnsureAttempts limits the maximum number of EnsureAttempt entities that the EnsureQuestAttempts mutation will emit. If there are more AttemptIDs than this maximum, then EnsureQuestAttempts will do tail-recursion to process the remainder.

func FinishExecutionFn Uses

func FinishExecutionFn(c context.Context, eid *dm.Execution_ID, rslt *dm.Result) ([]tumble.Mutation, error)

FinishExecutionFn is the implementation of distributor.FinishExecutionFn. It's defined here to avoid a circular dependency.

func ResetExecutionTimeout Uses

func ResetExecutionTimeout(c context.Context, e *model.Execution) error

ResetExecutionTimeout schedules a Timeout for this Execution. It inspects the Execution's State to determine which timeout should be set, if any. If no timeout should be active, this will cancel any existing timeouts for this Execution.

type AckFwdDep Uses

type AckFwdDep struct {
    Dep *model.FwdEdge
}

AckFwdDep records the fact that a dependency was completed.

func (*AckFwdDep) RollForward Uses

func (f *AckFwdDep) RollForward(c context.Context) (muts []tumble.Mutation, err error)

RollForward implements tumble.Mutation.

func (*AckFwdDep) Root Uses

func (f *AckFwdDep) Root(c context.Context) *ds.Key

Root implements tumble.Mutation.

type ActivateExecution Uses

type ActivateExecution struct {
    Auth   *dm.Execution_Auth
    NewTok []byte
}

ActivateExecution executes an execution, moving it from the SCHEDULING->RUNNING state, and resetting the execution timeout (if any).

func (*ActivateExecution) RollForward Uses

func (a *ActivateExecution) RollForward(c context.Context) (muts []tumble.Mutation, err error)

RollForward implements tumble.Mutation

func (*ActivateExecution) Root Uses

func (a *ActivateExecution) Root(c context.Context) *datastore.Key

Root implements tumble.Mutation.

type AddBackDep Uses

type AddBackDep struct {
    Dep      *model.FwdEdge
    NeedsAck bool // make AckFwdDep iff true
}

AddBackDep adds a BackDep (and possibly a BackDepGroup). If NeedsAck is true, this mutation will chain to an AckFwdDep. It should only be false if this AddBackDep is spawned from an AddFinishedDeps, where the originating Attempt already knows that this dependency is Finished.

func (*AddBackDep) RollForward Uses

func (a *AddBackDep) RollForward(c context.Context) (muts []tumble.Mutation, err error)

RollForward implements tumble.Mutation.

func (*AddBackDep) Root Uses

func (a *AddBackDep) Root(c context.Context) *ds.Key

Root implements tumble.Mutation.

type AddDeps Uses

type AddDeps struct {
    Auth   *dm.Execution_Auth
    Quests []*model.Quest

    // Attempts is attempts we think are missing from the global graph.
    Attempts *dm.AttemptList

    // Deps are fwddeps we think are missing from the auth'd attempt.
    Deps *dm.AttemptList
}

AddDeps transactionally stops the current execution and adds one or more dependencies.

func (*AddDeps) RollForward Uses

func (a *AddDeps) RollForward(c context.Context) (muts []tumble.Mutation, err error)

RollForward implements tumble.Mutation

This mutation is called directly.

func (*AddDeps) Root Uses

func (a *AddDeps) Root(c context.Context) *ds.Key

Root implements tumble.Mutation

type AddFinishedDeps Uses

type AddFinishedDeps struct {
    Auth *dm.Execution_Auth

    // MergeQuests lists quests which need their BuiltBy lists merged. The Quests
    // here must be a subset of the quests mentioned in FinishedAttempts.
    MergeQuests []*model.Quest

    // FinishedAttempts are a list of attempts that we already know are in the
    // Finished state.
    FinishedAttempts *dm.AttemptList
}

AddFinishedDeps adds a bunch of dependencies which are known in advance to already be in the Finished state.

func (*AddFinishedDeps) RollForward Uses

func (f *AddFinishedDeps) RollForward(c context.Context) (muts []tumble.Mutation, err error)

RollForward implements tumble.Mutation

func (*AddFinishedDeps) Root Uses

func (f *AddFinishedDeps) Root(c context.Context) *ds.Key

Root implements tumble.Mutation

type EnsureAttempt Uses

type EnsureAttempt struct {
    ID *dm.Attempt_ID
}

EnsureAttempt ensures that the given Attempt exists. If it doesn't, it's created in a NeedsExecution state.

func (*EnsureAttempt) RollForward Uses

func (e *EnsureAttempt) RollForward(c context.Context) (muts []tumble.Mutation, err error)

RollForward implements tumble.Mutation.

func (*EnsureAttempt) Root Uses

func (e *EnsureAttempt) Root(c context.Context) *ds.Key

Root implements tumble.Mutation.

type EnsureQuestAttempts Uses

type EnsureQuestAttempts struct {
    Quest *model.Quest
    AIDs  []uint32

    // DoNotMergeQuest causes this mutation to not attempt to merge the BuiltBy of
    // Quest.
    DoNotMergeQuest bool
}

EnsureQuestAttempts ensures that the given Attempt exists. If it doesn't, it's created in a NeedsExecution state.

func (*EnsureQuestAttempts) RollForward Uses

func (e *EnsureQuestAttempts) RollForward(c context.Context) (muts []tumble.Mutation, err error)

RollForward implements tumble.Mutation.

func (*EnsureQuestAttempts) Root Uses

func (e *EnsureQuestAttempts) Root(c context.Context) *datastore.Key

Root implements tumble.Mutation.

type FinishAttempt Uses

type FinishAttempt struct {
    dm.FinishAttemptReq
}

FinishAttempt does a couple things:

Invalidates the current Execution
Moves the state to Finished
Creates a new AttemptResult
Starts RecordCompletion state machine.

func (*FinishAttempt) RollForward Uses

func (f *FinishAttempt) RollForward(c context.Context) (muts []tumble.Mutation, err error)

RollForward implements tumble.Mutation

This mutation is called directly from FinishAttempt.

func (*FinishAttempt) Root Uses

func (f *FinishAttempt) Root(c context.Context) *ds.Key

Root implements tumble.Mutation

type FinishExecution Uses

type FinishExecution struct {
    EID    *dm.Execution_ID
    Result *dm.Result
}

FinishExecution records the final state of the Execution, and advances the Attempt state machine.

func NewFinishExecutionAbnormal Uses

func NewFinishExecutionAbnormal(eid *dm.Execution_ID, status dm.AbnormalFinish_Status, reason string) *FinishExecution

NewFinishExecutionAbnormal is a shorthand to make a FinishExecution mutation with some abnomal result.

func (*FinishExecution) RollForward Uses

func (f *FinishExecution) RollForward(c context.Context) (muts []tumble.Mutation, err error)

RollForward implements tumble.Mutation

func (*FinishExecution) Root Uses

func (f *FinishExecution) Root(c context.Context) *ds.Key

Root implements tumble.Mutation

type MergeQuest Uses

type MergeQuest struct {
    Quest   *model.Quest
    AndThen []tumble.Mutation
}

MergeQuest ensures that the given Quest exists and contains the merged set of BuiltBy entries.

func (*MergeQuest) RollForward Uses

func (m *MergeQuest) RollForward(c context.Context) (muts []tumble.Mutation, err error)

RollForward implements tumble.Mutation.

func (*MergeQuest) Root Uses

func (m *MergeQuest) Root(c context.Context) *ds.Key

Root implements tumble.Mutation.

type RecordCompletion Uses

type RecordCompletion struct {
    For *dm.Attempt_ID
}

RecordCompletion marks that fact that an Attempt is completed (Finished) on its corresponding BackDepGroup, and fires off additional AckFwdDep mutations for each incoming dependency that is blocked.

In the case where an Attempt has hundreds or thousands of incoming dependencies, the naive implementation of this mutation could easily overfill a single datastore transaction. For that reason, the implementation here unblocks things 64 edges at a time, and keeps returning itself as a mutation until it unblocks less than 64 things (e.g. it does a tail-call).

This relies on tumble's tail-call optimization to be performant in terms of the number of transactions, otherwise this would take 1 transaction per 64 dependencies. With the TCO, it could do hundreds or thousands of dependencies, but it will also be fair to other work (e.g. it will allow other Attempts to take dependencies on this Attempt while RecordCompletion is in between tail-calls).

func (*RecordCompletion) RollForward Uses

func (r *RecordCompletion) RollForward(c context.Context) (muts []tumble.Mutation, err error)

RollForward implements tumble.Mutation.

func (*RecordCompletion) Root Uses

func (r *RecordCompletion) Root(c context.Context) *ds.Key

Root implements tumble.Mutation.

type ScheduleExecution Uses

type ScheduleExecution struct {
    For *dm.Attempt_ID
}

ScheduleExecution is a placeholder mutation that will be an entry into the Distributor scheduling state-machine.

func (*ScheduleExecution) RollForward Uses

func (s *ScheduleExecution) RollForward(c context.Context) (muts []tumble.Mutation, err error)

RollForward implements tumble.Mutation

func (*ScheduleExecution) Root Uses

func (s *ScheduleExecution) Root(c context.Context) *ds.Key

Root implements tumble.Mutation

type TimeoutExecution Uses

type TimeoutExecution struct {
    For   *dm.Execution_ID
    State dm.Execution_State
    // TimeoutAttempt is the number of attempts to stop a STOPPING execution,
    // since this potentially requires an RPC to the distributor to enact.
    TimeoutAttempt uint
    Deadline       time.Time
}

TimeoutExecution is a named mutation which triggers on a delay. If the execution is in the noted state when the trigger hits, this sets the Execution to have an AbnormalFinish status of TIMED_OUT.

func (*TimeoutExecution) HighPriority Uses

func (t *TimeoutExecution) HighPriority() bool

HighPriority implements tumble.DelayedMutation

func (*TimeoutExecution) ProcessAfter Uses

func (t *TimeoutExecution) ProcessAfter() time.Time

ProcessAfter implements tumble.DelayedMutation

func (*TimeoutExecution) RollForward Uses

func (t *TimeoutExecution) RollForward(c context.Context) (muts []tumble.Mutation, err error)

RollForward implements tumble.Mutation

func (*TimeoutExecution) Root Uses

func (t *TimeoutExecution) Root(c context.Context) *ds.Key

Root implements tumble.Mutation

Package mutate imports 16 packages (graph) and is imported by 4 packages. Updated 2018-08-21. Refresh now. Tools for package owners.