core

package
v0.0.0-...-267b159 Latest
Warning

This package is not in the latest version of its module.

Published: Dec 17, 2023 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation


Constants

const (
	ErrorWorkQueue        errors.ErrorCode = "CATALOG_READER_QUEUE_FAILED"
	ErrorInternalMismatch errors.ErrorCode = "ARRAY_MISMATCH"
	ErrorK8sArrayGeneric  errors.ErrorCode = "ARRAY_JOB_GENERIC_FAILURE"
)

Variables

This section is empty.

Functions

func CalculateOriginalIndex

func CalculateOriginalIndex(childIdx int, toCache *bitarray.BitSet) int

CalculateOriginalIndex computes the original index of a sub-task.
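The index translation can be illustrated with a minimal sketch. It assumes that a set bit marks an original index that still has to be executed and that execution indexes are assigned to those set bits in order. The function `originalIndex` and the `[]bool` stand-in for `*bitarray.BitSet` are hypothetical and are not part of this package.

```go
package main

import "fmt"

// originalIndex is a hypothetical stand-in for CalculateOriginalIndex.
// It uses a []bool instead of *bitarray.BitSet. Assumption: a true entry
// marks an original index whose sub-task still needs to run.
func originalIndex(childIdx int, needsRun []bool) int {
	if childIdx < 0 {
		return -1
	}
	seen := 0
	for i, set := range needsRun {
		if set {
			seen++
		}
		// The (childIdx+1)-th set bit is the original position.
		if seen == childIdx+1 {
			return i
		}
	}
	return -1 // childIdx exceeds the number of set bits
}

func main() {
	// Original indexes 1, 3 and 4 still need to run; 0 and 2 do not.
	needsRun := []bool{false, true, false, true, true}
	for child := 0; child < 3; child++ {
		fmt.Printf("execution index %d -> original index %d\n",
			child, originalIndex(child, needsRun))
	}
}
```

Under these assumptions, execution indexes 0, 1 and 2 map to original indexes 1, 3 and 4.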

func InitializeExternalResources

func InitializeExternalResources(ctx context.Context, tCtx core.TaskExecutionContext, state *State,
	generateSubTaskID func(core.TaskExecutionContext, int) string) ([]*core.ExternalResource, error)

InitializeExternalResources constructs an ExternalResource array in which each element describes the initial state of one subtask. Cached subtasks are labeled as successful with a cache hit; all others are initialized to an undefined state.
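The initialization described above might look roughly like the following sketch. The `resource` type, the string phase names, and the `initResources` helper are hypothetical simplifications; the real function works with `core.ExternalResource`, `State`, and a task execution context.

```go
package main

import "fmt"

// resource is a hypothetical, reduced stand-in for core.ExternalResource.
type resource struct {
	ID       string
	Index    int
	CacheHit bool
	Phase    string
}

// initResources sketches the described behavior. Assumption: needsRun[i]
// is false when sub-task i was found in the cache.
func initResources(needsRun []bool, subTaskID func(int) string) []resource {
	out := make([]resource, 0, len(needsRun))
	for i, run := range needsRun {
		r := resource{ID: subTaskID(i), Index: i, Phase: "UNDEFINED"}
		if !run {
			// Cached sub-tasks start out as successful cache hits.
			r.CacheHit = true
			r.Phase = "SUCCEEDED"
		}
		out = append(out, r)
	}
	return out
}

func main() {
	id := func(i int) string { return fmt.Sprintf("task-%d", i) }
	for _, r := range initResources([]bool{false, true, true}, id) {
		fmt.Printf("%+v\n", r)
	}
}
```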

func InvertBitSet

func InvertBitSet(input *bitarray.BitSet, limit uint) *bitarray.BitSet
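The page gives no description for InvertBitSet. Judging only from the name and signature, a plausible reading is that it returns a new bit set of `limit` bits in which every bit is the complement of the input. The sketch below illustrates that reading with a `[]bool`; the `invert` helper is hypothetical and the behavior is an assumption, not a statement about the actual implementation.

```go
package main

import "fmt"

// invert is a hypothetical stand-in for InvertBitSet that works on []bool
// rather than *bitarray.BitSet. Assumption: positions beyond the end of
// input count as unset, so they come out set.
func invert(input []bool, limit int) []bool {
	out := make([]bool, limit)
	for i := 0; i < limit; i++ {
		set := i < len(input) && input[i]
		out[i] = !set
	}
	return out
}

func main() {
	fmt.Println(invert([]bool{true, false, true}, 4)) // prints [false true false true]
}
```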

func MapArrayStateToPluginPhase

func MapArrayStateToPluginPhase(_ context.Context, state *State, logLinks []*idlCore.TaskLog, externalResources []*core.ExternalResource) (core.PhaseInfo, error)

Every plugin state must map to a core.PhaseInfo (which in turn maps to Admin events) so that the rest of the Nebula platform can understand what is happening. That is, each state that the plugin state machine can return should map to a unique (core.Phase, core.PhaseInfo.version) pair. Info fields are always nil because log links are sent individually. This simplifies state handling: there is no need to keep an ever-growing list of log links, which would take up a lot of space since batch jobs can have 5000 sub-tasks.
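One way to keep the (phase, version) pair unique is to offset the version by the internal phase, so that two internal phases sharing a platform phase still produce different versions. The sketch below is an assumption about how such a scheme could look, not the package's actual mapping; `phaseInfo`, `versionStride`, `toPhaseInfo`, and the string phase names are all hypothetical.

```go
package main

import "fmt"

type phase uint8

const (
	phaseStart phase = iota
	phaseLaunch
	phaseCheckingSubTasks
	phaseSuccess
)

// phaseInfo is a hypothetical, reduced stand-in for core.PhaseInfo.
type phaseInfo struct {
	CorePhase string
	Version   uint32
}

// versionStride is an assumed spacing between internal phases. Versions
// stay unique as long as phaseVersion is smaller than the stride.
const versionStride = 1000

// toPhaseInfo maps an internal phase and its version to a platform phase
// plus a version that is distinct for every internal phase.
func toPhaseInfo(p phase, phaseVersion uint32) (phaseInfo, error) {
	version := uint32(p)*versionStride + phaseVersion
	switch p {
	case phaseStart:
		return phaseInfo{CorePhase: "INITIALIZING", Version: version}, nil
	case phaseLaunch, phaseCheckingSubTasks:
		return phaseInfo{CorePhase: "RUNNING", Version: version}, nil
	case phaseSuccess:
		return phaseInfo{CorePhase: "SUCCESS", Version: version}, nil
	}
	return phaseInfo{}, fmt.Errorf("unmapped phase %d", p)
}

func main() {
	for _, p := range []phase{phaseLaunch, phaseCheckingSubTasks} {
		info, err := toPhaseInfo(p, 0)
		fmt.Println(info.CorePhase, info.Version, err)
	}
}
```

In this sketch both internal phases report the same platform phase, and the version alone tells them apart.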

func NewPhasesCompactArray

func NewPhasesCompactArray(count uint) bitarray.CompactArray

func ToArrayJob

func ToArrayJob(structObj *structpb.Struct, taskTypeVersion int32) (*idlPlugins.ArrayJob, error)

Types

type Phase

type Phase uint8
const (
	PhaseStart Phase = iota
	PhasePreLaunch
	PhaseLaunch
	PhaseWaitingForResources
	PhaseCheckingSubTaskExecutions
	PhaseAssembleFinalOutput
	PhaseWriteToDiscovery
	PhaseWriteToDiscoveryThenFail
	PhaseSuccess
	PhaseAssembleFinalError
	PhaseRetryableFailure
	PhasePermanentFailure
)

func PhaseString

func PhaseString(s string) (Phase, error)

PhaseString retrieves an enum value from the string name of an enum constant. It returns an error if the parameter is not part of the enum.

func PhaseValues

func PhaseValues() []Phase

PhaseValues returns all values of the enum.

func SummaryToPhase

func SummaryToPhase(ctx context.Context, minSuccesses int64, summary arraystatus.ArraySummary) Phase

func (Phase) IsAPhase

func (i Phase) IsAPhase() bool

IsAPhase returns true if the value is listed in the enum definition, and false otherwise.

func (Phase) String

func (i Phase) String() string
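The enum helpers above (PhaseString, PhaseValues, IsAPhase, String) follow a common pattern for Go enums. The following is a local, reduced re-implementation covering only three of the constants, written to show how such helpers typically behave. It is not the package's code, and the exact error text and the fallback string format are assumptions.

```go
package main

import "fmt"

// Phase is a local, reduced copy of the enum with only three values.
type Phase uint8

const (
	PhaseStart Phase = iota
	PhasePreLaunch
	PhaseLaunch
)

var phaseNames = []string{"PhaseStart", "PhasePreLaunch", "PhaseLaunch"}

// IsAPhase reports whether p is one of the declared constants.
func (p Phase) IsAPhase() bool { return int(p) < len(phaseNames) }

// String returns the constant's name, or a numeric fallback (assumed format).
func (p Phase) String() string {
	if !p.IsAPhase() {
		return fmt.Sprintf("Phase(%d)", uint8(p))
	}
	return phaseNames[p]
}

// PhaseString looks up a constant by name and returns an error for unknown names.
func PhaseString(s string) (Phase, error) {
	for i, name := range phaseNames {
		if name == s {
			return Phase(i), nil
		}
	}
	return 0, fmt.Errorf("%q is not a valid Phase name", s)
}

// PhaseValues returns every declared constant in order.
func PhaseValues() []Phase {
	return []Phase{PhaseStart, PhasePreLaunch, PhaseLaunch}
}

func main() {
	p, err := PhaseString("PhaseLaunch")
	fmt.Println(p, err)
	_, err = PhaseString("NotAPhase")
	fmt.Println(err != nil, Phase(42).IsAPhase(), PhaseValues())
}
```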

type State

type State struct {
	CurrentPhase         Phase                   `json:"phase"`
	PhaseVersion         uint32                  `json:"phaseVersion"`
	Reason               string                  `json:"reason"`
	ExecutionErr         *idlCore.ExecutionError `json:"err"`
	ExecutionArraySize   int                     `json:"arraySize"`
	OriginalArraySize    int64                   `json:"originalArraySize"`
	ArrayStatus          arraystatus.ArrayStatus `json:"arrayStatus"`
	OriginalMinSuccesses int64                   `json:"minSuccess"`

	// Which sub-tasks to cache, (using the original index, that is, the length is ArrayJob.size)
	IndexesToCache *bitarray.BitSet `json:"indexesToCache"`

	// Tracks the number of subtask retries using the execution index
	RetryAttempts bitarray.CompactArray `json:"retryAttempts"`

	// Tracks the number of system failures for each subtask using the execution index
	SystemFailures bitarray.CompactArray `json:"systemFailures"`
}
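The JSON tags determine the key names used when the state is serialized; note, for example, that OriginalMinSuccesses is stored under `minSuccess`. The sketch below uses a hypothetical reduced copy of the struct that keeps only the scalar fields and represents the phase as a plain `uint8`. Whether the real Phase type serializes as a number or as a name is not stated on this page.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// state is a hypothetical reduced copy of State with the same JSON tags.
// Fields that depend on other packages are left out.
type state struct {
	CurrentPhase         uint8  `json:"phase"`
	PhaseVersion         uint32 `json:"phaseVersion"`
	Reason               string `json:"reason"`
	ExecutionArraySize   int    `json:"arraySize"`
	OriginalArraySize    int64  `json:"originalArraySize"`
	OriginalMinSuccesses int64  `json:"minSuccess"`
}

// encode marshals the reduced state to a JSON string.
func encode(s state) (string, error) {
	b, err := json.Marshal(s)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	out, err := encode(state{
		CurrentPhase:         2,
		PhaseVersion:         1,
		Reason:               "launched",
		ExecutionArraySize:   3,
		OriginalArraySize:    5,
		OriginalMinSuccesses: 3,
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(out)
}
```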

func (State) GetArrayStatus

func (s State) GetArrayStatus() arraystatus.ArrayStatus

func (State) GetExecutionArraySize

func (s State) GetExecutionArraySize() int

func (*State) GetExecutionErr

func (s *State) GetExecutionErr() *idlCore.ExecutionError

func (*State) GetIndexesToCache

func (s *State) GetIndexesToCache() *bitarray.BitSet

func (*State) GetOriginalArraySize

func (s *State) GetOriginalArraySize() int64

func (*State) GetOriginalMinSuccesses

func (s *State) GetOriginalMinSuccesses() int64

func (State) GetPhase

func (s State) GetPhase() (phase Phase, version uint32)

func (State) GetReason

func (s State) GetReason() string

func (*State) SetArrayStatus

func (s *State) SetArrayStatus(state arraystatus.ArrayStatus) *State

func (*State) SetExecutionArraySize

func (s *State) SetExecutionArraySize(size int) *State

func (*State) SetExecutionErr

func (s *State) SetExecutionErr(err *idlCore.ExecutionError) *State

func (*State) SetIndexesToCache

func (s *State) SetIndexesToCache(set *bitarray.BitSet) *State

func (*State) SetOriginalArraySize

func (s *State) SetOriginalArraySize(size int64) *State

func (*State) SetOriginalMinSuccesses

func (s *State) SetOriginalMinSuccesses(size int64) *State

func (*State) SetPhase

func (s *State) SetPhase(phase Phase, phaseVersion uint32) *State

func (*State) SetReason

func (s *State) SetReason(reason string) *State

func (*State) SetRetryAttempts

func (s *State) SetRetryAttempts(retryAttempts bitarray.CompactArray) *State
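Because every setter returns `*State`, calls can be chained. The sketch below shows the pattern on a hypothetical reduced copy of the type with three fields; the method bodies are assumptions based on the signatures listed above.

```go
package main

import "fmt"

type Phase uint8

const (
	PhaseStart Phase = iota
	PhasePreLaunch
	PhaseLaunch
)

// State is a hypothetical reduced copy with only three fields.
type State struct {
	CurrentPhase Phase
	PhaseVersion uint32
	Reason       string
}

// SetPhase updates phase and version, then returns the receiver for chaining.
func (s *State) SetPhase(phase Phase, phaseVersion uint32) *State {
	s.CurrentPhase = phase
	s.PhaseVersion = phaseVersion
	return s
}

// SetReason updates the reason, then returns the receiver for chaining.
func (s *State) SetReason(reason string) *State {
	s.Reason = reason
	return s
}

// GetPhase returns the current phase together with its version.
func (s State) GetPhase() (phase Phase, version uint32) {
	return s.CurrentPhase, s.PhaseVersion
}

// GetReason returns the stored reason.
func (s State) GetReason() string { return s.Reason }

func main() {
	s := (&State{}).SetPhase(PhaseLaunch, 1).SetReason("all sub-tasks submitted")
	phase, version := s.GetPhase()
	fmt.Println(phase, version, s.GetReason())
}
```

Returning the receiver keeps state transitions compact, at the cost of mutating the state in place rather than producing a copy.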
