Documentation ¶
Index ¶
- Constants
- Variables
- func BuildCallSource(incpaths []string, name string, args map[string]interface{}, ...) (string, error)
- func CheckMinimalSpace(path string) error
- func CompileAll(mroPaths []string, checkSrcPath bool) (int, []*syntax.Ast, error)
- func DefaultRetries() int
- func GetAvailableSpace(path string) (bytes, inodes uint64, err error)
- func GetCPUInfo() (int, int, int, int)
- func GetCallable(mroPaths []string, name string) (syntax.Callable, error)
- func GetMaxFiles() (*unix.Rlimit, error)
- func GetMaxProcs() (*unix.Rlimit, error)
- func GetUserProcessCount() (int, error)
- func MRTBuildPipeline(newinfo *PipestanceSetup, oldinfo *PipestanceSetup, invalidate []string)
- func MakeFQName(pipeline string, psid string) string
- func MapTwoPipestances(newp *Pipestance, oldp *Pipestance) map[*Node]*Node
- func MaximizeMaxFiles() error
- func MaximizeMaxProcs() error
- func ParseFQName(fqname string) (string, string)
- func ParseJobMode(data string) (string, string, string)
- func ParseTimestamp(data string) string
- func ParseVersions(data string) (string, string, error)
- func Pdeathsig(attr *syscall.SysProcAttr, sig syscall.Signal) *syscall.SysProcAttr
- func ScanTree(root *Node)
- func SetMaxFiles(rlim *unix.Rlimit) error
- func SetMaxProcs(rlim *unix.Rlimit) error
- func TaintNode(root *Node, nodemap map[*Node]*Node)
- func VDRTaint(root *Node, nodemap map[*Node]*Node)
- func VerifyOnFinish(onfinish string)
- func VerifyProfileMode(profileMode ProfileMode)
- func VerifyVDRMode(vdrMode string)
- type ArgumentMap
- type Binding
- type BindingInfo
- type Chunk
- type ChunkDef
- type ChunkInfo
- type ChunkPerfInfo
- type DiskSpaceError
- type EdgeInfo
- type Fork
- type ForkBindingsInfo
- type ForkInfo
- type ForkPerfCache
- type ForkPerfInfo
- type ForkStorageEvent
- type InvocationData
- type IoAmount
- type IoRate
- type IoRateValues
- type IoStats
- type IoStatsBuilder
- type IoValues
- type JobInfo
- type JobManager
- type JobManagerJson
- type JobManagerSettings
- type JobModeEnv
- type JobModeJson
- type JobResources
- type LocalJobManager
- func (self *LocalJobManager) Enqueue(shellCmd string, argv []string, envs map[string]string, metadata *Metadata, ...)
- func (self *LocalJobManager) GetMaxCores() int
- func (self *LocalJobManager) GetMaxMemGB() int
- func (self *LocalJobManager) GetSettings() *JobManagerSettings
- func (self *LocalJobManager) GetSystemReqs(threads int, memGB int) (int, int)
- func (self *LocalJobManager) HandleSignal(sig os.Signal)
- type MaxJobsSemaphore
- type Metadata
- func (self *Metadata) AppendAlarm(text string) error
- func (self *Metadata) FilePath(name string) string
- func (self *Metadata) FilesPath() string
- func (self *Metadata) MetadataFilePath(name MetadataFileName) string
- func (self *Metadata) ReadInto(name MetadataFileName, target interface{}) error
- func (self *Metadata) UpdateJournal(name MetadataFileName) error
- func (self *Metadata) Write(name MetadataFileName, object interface{}) error
- func (self *Metadata) WriteAtomic(name MetadataFileName, object interface{}) error
- func (self *Metadata) WriteRaw(name MetadataFileName, text string) error
- func (self *Metadata) WriteTime(name MetadataFileName) error
- type MetadataFileName
- type MetadataInfo
- type MetadataState
- type MroCache
- type Nodable
- type Node
- type NodeByteStamp
- type NodeErrorInfo
- type NodeInfo
- type NodePerfInfo
- type ObservedMemory
- type PerfInfo
- type PerfInfoByStart
- type Pipestance
- func (self *Pipestance) BlacklistMRTNodes(namesToBlacklist []string, nodemap map[*Node]*Node) error
- func (self *Pipestance) Callable() syntax.Callable
- func (self *Pipestance) CheckHeartbeats()
- func (self *Pipestance) ClearUiPort() error
- func (self *Pipestance) ComputeDiskUsage(nodePerf *NodePerfInfo) *NodePerfInfo
- func (self *Pipestance) GetFQName() string
- func (self *Pipestance) GetFailedNodes() []*Node
- func (self *Pipestance) GetFatalError() (string, bool, string, string, MetadataFileName, []string)
- func (self *Pipestance) GetInvocation() interface{}
- func (self *Pipestance) GetPath() string
- func (self *Pipestance) GetPname() string
- func (self *Pipestance) GetPostNodes() map[string]Nodable
- func (self *Pipestance) GetPrenodes() map[string]Nodable
- func (self *Pipestance) GetPsid() string
- func (self *Pipestance) GetState() MetadataState
- func (self *Pipestance) GetTimestamp() string
- func (self *Pipestance) GetUuid() (string, error)
- func (self *Pipestance) GetVersions() (string, string, error)
- func (self *Pipestance) HandleSignal(sig os.Signal)
- func (self *Pipestance) Immortalize(force bool) error
- func (self *Pipestance) IsErrorTransient() (bool, string)
- func (self *Pipestance) Kill()
- func (self *Pipestance) KillWithMessage(message string)
- func (self *Pipestance) LoadMetadata()
- func (self *Pipestance) Lock() error
- func (self *Pipestance) OnFinishHook()
- func (self *Pipestance) PostProcess()
- func (self *Pipestance) RecordUiPort(url string) error
- func (self *Pipestance) RefreshState()
- func (self *Pipestance) Reset() error
- func (self *Pipestance) RestartLocalJobs(jobMode string) error
- func (self *Pipestance) RestartRunningNodes(jobMode string) error
- func (self *Pipestance) Serialize(name MetadataFileName) interface{}
- func (self *Pipestance) SerializePerf() []*NodePerfInfo
- func (self *Pipestance) SerializeState() []*NodeInfo
- func (self *Pipestance) SetUuid(uuid string) error
- func (self *Pipestance) StepNodes() bool
- func (self *Pipestance) Unlock()
- func (self *Pipestance) VDRKill() *VDRKillReport
- func (self *Pipestance) VerifyJobMode() error
- func (self *Pipestance) ZipMetadata(zipPath string) error
- type PipestanceCopyingError
- type PipestanceExistsError
- type PipestanceFactory
- type PipestanceInvocationError
- type PipestanceJobModeError
- type PipestanceLockedError
- type PipestanceNotExistsError
- type PipestanceNotFailedError
- type PipestanceNotRunningError
- type PipestanceOverrides
- type PipestancePathError
- type PipestanceSetup
- type PipestanceSizeError
- type PipestanceWipeError
- type ProfileMode
- type PythonInfo
- type RemoteJobManager
- type ResourceSemaphore
- func (self *ResourceSemaphore) Acquire(n int64) error
- func (self *ResourceSemaphore) Available() int64
- func (self *ResourceSemaphore) CurrentSize() int64
- func (self *ResourceSemaphore) InUse() int64
- func (self *ResourceSemaphore) QueueLength() int
- func (self *ResourceSemaphore) Release(n int64)
- func (self *ResourceSemaphore) Reserved() int64
- func (self *ResourceSemaphore) UpdateActual(n int64) int64
- func (self *ResourceSemaphore) UpdateFreeUsed(free, usedReservation int64) int64
- func (self *ResourceSemaphore) UpdateSize(n int64)
- type Runtime
- func (self *Runtime) BuildCallSource(incpaths []string, name string, args map[string]interface{}, ...) (string, error)
- func (self *Runtime) GetMetadata(pipestancePath string, metadataPath string) (string, error)
- func (self *Runtime) GetSerialization(pipestancePath string, name MetadataFileName) (interface{}, bool)
- func (self *Runtime) GetSerializationInto(pipestancePath string, name MetadataFileName, target interface{}) error
- func (self *Runtime) InvokePipeline(src string, srcPath string, psid string, pipestancePath string, ...) (*Pipestance, error)
- func (self *Runtime) InvokeStage(src string, srcPath string, ssid string, stagestancePath string, ...) (*Stagestance, error)
- func (self *Runtime) ReattachToPipestance(psid string, pipestancePath string, src string, mroPaths []string, ...) (*Pipestance, error)
- func (self *Runtime) ReattachToPipestanceWithMroSrc(psid string, pipestancePath string, src string, mroPaths []string, ...) (*Pipestance, error)
- type RuntimeError
- type RuntimeOptions
- type Rusage
- type RusageInfo
- type StageDefs
- type StageOverride
- type StagePerfInfo
- type Stagestance
- func (self *Stagestance) Callable() syntax.Callable
- func (self *Stagestance) CheckHeartbeats()
- func (self *Stagestance) GetFQName() string
- func (self *Stagestance) GetFatalError() (string, bool, string, string, MetadataFileName, []string)
- func (self *Stagestance) GetPostNodes() map[string]Nodable
- func (self *Stagestance) GetPrenodes() map[string]Nodable
- func (self *Stagestance) GetState() MetadataState
- func (self *Stagestance) LoadMetadata()
- func (self *Stagestance) PostProcess()
- func (self *Stagestance) RefreshState()
- func (self *Stagestance) Step() bool
- type StorageEvent
- type StorageEventByTimestamp
- type TopNode
- type VDRByTimestamp
- type VDRKillReport
- type VersionInfo
- type WallClockInfo
Examples ¶
Constants ¶
const ( ChunksPrefix = "chunks_" SplitPrefix = "split_" JoinPrefix = "join_" CleanupPrefix = "cleanup_" RetryPrefix = "retry_" )
const MetadataFilePrefix string = "_"
const PIPESTANCE_MIN_DISK uint64 = 50 * 1024 * 1024
The minimum amount of available disk space for a pipestance directory. If the available space falls below this at any time during the run, the pipestance is killed.
const PIPESTANCE_MIN_INODES uint64 = 500
The minimum number of inodes available in the pipestance directory below which the pipestance will not run.
const STAGE_TYPE_CHUNK = "chunk"
const STAGE_TYPE_JOIN = "join"
const STAGE_TYPE_SPLIT = "split"
Variables ¶
var LegalOverrideTypes map[string]reflect.Kind = map[string]reflect.Kind{ "force_volatile": reflect.Bool, "join.threads": reflect.Float64, "join.mem_gb": reflect.Float64, "chunk.threads": reflect.Float64, "chunk.mem_gb": reflect.Float64, "split.threads": reflect.Float64, "split.mem_gb": reflect.Float64, }
Specifies the expected types for elements in a stageoverride map. Note that all JSON numeric types look like Float64s when we stick them in an interface.
Functions ¶
func BuildCallSource ¶
func CheckMinimalSpace ¶
Returns an error if the current available space on the disk drive is very low.
func CompileAll ¶
Compile all the MRO files in mroPaths.
func DefaultRetries ¶
func DefaultRetries() int
func GetAvailableSpace ¶
func GetMaxFiles ¶
Gets the current (soft) and maximum (hard) rlimit for number of open files.
See `man getrlimit`
func GetMaxProcs ¶
Gets the current (soft) and maximum (hard) rlimit for number of processes.
See `man getrlimit`
func GetUserProcessCount ¶
Get the number of processes (threads) currently running for the current user.
func MRTBuildPipeline ¶
func MRTBuildPipeline(newinfo *PipestanceSetup, oldinfo *PipestanceSetup, invalidate []string)
This is the main entry point for "mrt".
newinfo corresponds to a new (non-existing) pipestance and oldinfo to an existing pipestance. Invalidate lists stages in the new pipestance that have code differences.
We create a new pipestance directory and link every stage/pipeline from oldinfo that we can. We explicitly don't link anything in |invalidate| or that derives from anything in invalidate.
After this runs, the new directory can be mrp'ed to run the new pipestance.
func MakeFQName ¶
func MapTwoPipestances ¶
func MapTwoPipestances(newp *Pipestance, oldp *Pipestance) map[*Node]*Node
This takes two pipestances and creates a map that associates nodes in one pipestance with the nodes in the other. Nodes are associated if they have the same name.
func MaximizeMaxFiles ¶
func MaximizeMaxFiles() error
Sets the soft rlimit for maximum open files equal to the hard limit.
See `man setrlimit`.
func MaximizeMaxProcs ¶
func MaximizeMaxProcs() error
Sets the soft rlimit for maximum processes equal to the hard limit.
See `man setrlimit`.
func ParseFQName ¶
func ParseTimestamp ¶
func Pdeathsig ¶
func Pdeathsig(attr *syscall.SysProcAttr, sig syscall.Signal) *syscall.SysProcAttr
Add pdeathsig to a SysProcAttr structure, if the operating system supports it, and return the object. On other platforms, do nothing.
func ScanTree ¶
func ScanTree(root *Node)
Iterate over the entire tree and print the names of the nodes that have been blacklisted
func SetMaxFiles ¶
Sets the current (soft) and maximum (hard) rlimit for number of open files.
See `man setrlimit`
func SetMaxProcs ¶
Sets the current (soft) and maximum (hard) rlimit for number of processes.
See `man setrlimit`
func VerifyOnFinish ¶
func VerifyOnFinish(onfinish string)
func VerifyProfileMode ¶
func VerifyProfileMode(profileMode ProfileMode)
func VerifyVDRMode ¶
func VerifyVDRMode(vdrMode string)
Types ¶
type ArgumentMap ¶
type ArgumentMap map[string]interface{}
Mapping from argument or output names to values.
Includes convenience methods to validate the arguments against parameter lists from MRO, and to convert to or from other structured data types.
ArgumentMap always deserializes numbers as json.Number values, in order to prevent loss of precision for integer types.
func MakeArgumentMap ¶
func MakeArgumentMap(binding interface{}) ArgumentMap
Convenience method to convert an arbitrary object type into an ArgumentMap.
This is intended primarily for use by authors of native Go stages.
func (ArgumentMap) Decode ¶
func (self ArgumentMap) Decode(target interface{}) error
Convenience method to convert an ArgumentMap into another kind of object.
This is intended primarily for use by authors of native Go stages.
func (*ArgumentMap) UnmarshalJSON ¶
func (self *ArgumentMap) UnmarshalJSON(b []byte) error
func (ArgumentMap) Validate ¶
func (self ArgumentMap) Validate(expected *syntax.Params, isInput bool, optional ...*syntax.Params) (error, string)
Validate that all of the arguments in the map are declared parameters, and that all declared parameters are set in the arguments to a value of the correct type, or null.
Hard errors are returned as the first return value. "Soft" error messages are returned as the second.
Optional params are values which are permitted to be in the argument map (if they are of the correct type) but which are not required to be present. For example, for a stage defined as
stage STAGE( in int a, out int b, ) split ( in int c, out int d, )
then in the outputs from the chunks, d is required but b is optional.
type Binding ¶
type Binding struct {
// contains filtered or unexported fields
}
Holds information about the value of an input argument, either hard-coded into the MRO or bound to the output of another node.
type BindingInfo ¶
type BindingInfo struct { Id string `json:"id"` Type string `json:"type"` ValExp string `json:"valexp"` Mode string `json:"mode"` Output string `json:"output"` Sweep bool `json:"sweep"` SweepRootId string `json:"sweepRootId"` Node interface{} `json:"node"` MatchedFork interface{} `json:"matchedFork"` Value interface{} `json:"value"` Waiting bool `json:"waiting"` }
An exportable version of Binding.
type Chunk ¶
type Chunk struct {
// contains filtered or unexported fields
}
Represents the state of a stage chunk (the "main" method).
type ChunkDef ¶
type ChunkDef struct { Resources *JobResources Args ArgumentMap }
Defines the resources and arguments of a chunk.
func (*ChunkDef) MarshalJSON ¶
func (*ChunkDef) MergeArguments ¶
func (self *ChunkDef) MergeArguments(bindings ArgumentMap) *ChunkDef
func (*ChunkDef) UnmarshalJSON ¶
type ChunkInfo ¶
type ChunkInfo struct { Index int `json:"index"` ChunkDef *ChunkDef `json:"chunkDef"` State MetadataState `json:"state"` Metadata *MetadataInfo `json:"metadata"` }
Exportable information about a Chunk object.
type ChunkPerfInfo ¶
type DiskSpaceError ¶
func (*DiskSpaceError) Error ¶
func (self *DiskSpaceError) Error() string
type Fork ¶
type Fork struct {
// contains filtered or unexported fields
}
Represents a fork of a stage or pipeline. When sweeping over multiple possible values for an input parameter, there will be more than one fork for a given pipeline or stage.
type ForkBindingsInfo ¶
type ForkBindingsInfo struct { Argument []*BindingInfo `json:"Argument"` Return []*BindingInfo `json:"Return"` }
type ForkInfo ¶
type ForkInfo struct { Index int `json:"index"` ArgPermute map[string]interface{} `json:"argPermute"` JoinDef *JobResources `json:"joinDef"` State MetadataState `json:"state"` Metadata *MetadataInfo `json:"metadata"` SplitMetadata *MetadataInfo `json:"split_metadata"` JoinMetadata *MetadataInfo `json:"join_metadata"` Chunks []*ChunkInfo `json:"chunks"` Bindings *ForkBindingsInfo `json:"bindings"` }
Exportable information from a Fork object.
type ForkPerfCache ¶
type ForkPerfCache struct {
// contains filtered or unexported fields
}
type ForkPerfInfo ¶
type ForkPerfInfo struct { Stages []*StagePerfInfo `json:"stages"` Index int `json:"index"` Chunks []*ChunkPerfInfo `json:"chunks"` SplitStats *PerfInfo `json:"split_stats"` JoinStats *PerfInfo `json:"join_stats"` ForkStats *PerfInfo `json:"fork_stats"` }
type ForkStorageEvent ¶
type ForkStorageEvent struct { Name string ChildNames []string TotalBytes uint64 ChunkBytes uint64 ForkBytes uint64 TotalVDRBytes uint64 ForkVDRBytes uint64 Timestamp time.Time VDRTimestamp time.Time }
This is due to the fact that the VDR bytes/total bytes reported at the fork level are the sum of chunk + split + join plus any additional files. The additional files that are unique to the fork cannot be resolved unless you sub out chunk/split/join and then child stages.
func NewForkStorageEvent ¶
type InvocationData ¶
type InvocationData struct { Call string `json:"call"` Args map[string]interface{} `json:"args"` SweepArgs []string `json:"sweepargs"` IncludePaths []string `json:"incpaths"` }
func BuildCallData ¶
func BuildCallData(src string, srcPath string, mroPaths []string) (*InvocationData, error)
func BuildDataForAst ¶
func BuildDataForAst(incpaths []string, ast *syntax.Ast) (*InvocationData, error)
type IoAmount ¶
Collects a total number of read/write IO operations.
func GetRunningIo ¶
Gets IO statistics for a running process by pid.
type IoRate ¶
type IoRate struct { Read IoRateValues `json:"read"` Write IoRateValues `json:"write"` }
Represents a rate of change for IoAmount
type IoRateValues ¶
type IoRateValues struct { // The rate at which IO syscalls were issued. These may have been against // non-block devices, such as sockets, terminals or a pseudo-filesystem // such as /proc. Syscalls float64 `json:"sysc"` // The rate at which bytes were transferred to or from a block device. // Even when a read or write is made against a block device, it may not be // counted in this number if it was served from cache, or if the file was // truncated or unlinked before it was synced to disk. BlockBytes float64 `json:"bytes"` }
IoValues per second
func (*IoRateValues) Increment ¶
func (self *IoRateValues) Increment(other IoRateValues)
Increment this rate by another.
func (*IoRateValues) TakeMax ¶
func (self *IoRateValues) TakeMax(other IoRateValues)
Update this rate to be the maximum of itself and another.
type IoStats ¶
type IoStats struct { Total IoAmount `json:"total,omitempty"` RateMax IoRate `json:"max,omitempty"` RateDev IoRate `json:"dev,omitempty"` }
Collects statistics based on observed process tree IO usage.
type IoStatsBuilder ¶
type IoStatsBuilder struct { IoStats // contains filtered or unexported fields }
Example ¶
// Set the start time to a known value. t := time.Now() sb := NewIoStatsBuilder() sb.lastMeasurement = t sb.start = t // No writes. // Reads at a constant rate of 2048 bytes every 10 seconds, with // 11, 9, and 10 syscalls in the first, second, and third // 10-second period, respectively. sb.Update(map[int]*IoAmount{ 1: &IoAmount{ Read: IoValues{Syscalls: 1, BlockBytes: 1024}, Write: IoValues{}, }, 2: &IoAmount{ Read: IoValues{Syscalls: 10, BlockBytes: 1024}, Write: IoValues{}, }, }, t.Add(time.Second*10)) sb.Update(map[int]*IoAmount{ 1: &IoAmount{ Read: IoValues{Syscalls: 10, BlockBytes: 3072}, Write: IoValues{}, }, 2: &IoAmount{ Read: IoValues{Syscalls: 10, BlockBytes: 1024}, Write: IoValues{}, }, }, t.Add(time.Second*20)) sb.Update(map[int]*IoAmount{ 1: &IoAmount{ Read: IoValues{Syscalls: 10, BlockBytes: 4096}, Write: IoValues{}, }, 3: &IoAmount{ Read: IoValues{Syscalls: 10, BlockBytes: 1024}, Write: IoValues{}, }, }, t.Add(time.Second*30)) fmt.Println("Read syscalls:") fmt.Println("Total:", sb.Total.Read.Syscalls) fmt.Printf("Rate: %0.1f ± %0.2f (max: %0.1f)\n\n", float64(sb.Total.Read.Syscalls)/30, sb.RateDev.Read.Syscalls, sb.RateMax.Read.Syscalls) fmt.Println("Write syscalls:") fmt.Println("Total:", sb.Total.Write.Syscalls) fmt.Printf("Rate: %0.1f ± %0.2f (max: %0.1f)\n\n", float64(sb.Total.Write.Syscalls)/30, sb.RateDev.Write.Syscalls, sb.RateMax.Write.Syscalls) fmt.Println("Read bytes:") fmt.Println("Total:", sb.Total.Read.BlockBytes) fmt.Printf("Rate: %0.1f ± %0.2f (max: %0.1f)\n\n", float64(sb.Total.Read.BlockBytes)/30, sb.RateDev.Read.BlockBytes, sb.RateMax.Read.BlockBytes) fmt.Println("Write bytes:") fmt.Println("Total:", sb.Total.Write.BlockBytes) fmt.Printf("Rate: %0.1f ± %0.2f (max: %0.1f)\n", float64(sb.Total.Write.BlockBytes)/30, sb.RateDev.Write.BlockBytes, sb.RateMax.Write.BlockBytes)
Output: Read syscalls: Total: 30 Rate: 1.0 ± 0.08 (max: 1.1) Write syscalls: Total: 0 Rate: 0.0 ± 0.00 (max: 0.0) Read bytes: Total: 6144 Rate: 204.8 ± 0.00 (max: 204.8) Write bytes: Total: 0 Rate: 0.0 ± 0.00 (max: 0.0)
func NewIoStatsBuilder ¶
func NewIoStatsBuilder() *IoStatsBuilder
type IoValues ¶
type IoValues struct { // The number of io syscalls issued. These may have been against non-block // devices, such as sockets, terminals or a pseudo-filesystem such as /proc. // For block IO operations, see rusage. Syscalls int64 `json:"sysc"` // The number of bytes transferred to or from a block device. Even when a // read or write is made against a block device, it may not be counted in // this number if it was served from cache, or if the file was truncated // or unlinked before it was synced to disk. BlockBytes int64 `json:"bytes"` }
Stores cumulative values for IO metrics.
type JobInfo ¶
type JobInfo struct { Name string `json:"name"` Pid int `json:"pid,omitempty"` Host string `json:"host,omitempty"` Type string `json:"type,omitempty"` Cwd string `json:"cwd,omitempty"` PythonInfo *PythonInfo `json:"python,omitempty"` RusageInfo *RusageInfo `json:"rusage,omitempty"` MemoryUsage *ObservedMemory `json:"used_bytes,omitempty"` IoStats *IoStats `json:"io,omitempty"` WallClockInfo *WallClockInfo `json:"wallclock,omitempty"` Threads int `json:"threads,omitempty"` MemGB int `json:"memGB,omitempty"` ProfileMode ProfileMode `json:"profile_mode,omitempty"` Stackvars string `json:"stackvars_flag,omitempty"` Monitor string `json:"monitor_flag,omitempty"` Invocation *InvocationData `json:"invocation,omitempty"` Version *VersionInfo `json:"version,omitempty"` ClusterEnv map[string]string `json:"sge,omitempty"` }
type JobManager ¶
type JobManager interface { GetSystemReqs(int, int) (int, int) GetMaxCores() int GetMaxMemGB() int GetSettings() *JobManagerSettings // contains filtered or unexported methods }
Job managers
type JobManagerJson ¶
type JobManagerJson struct { JobSettings *JobManagerSettings `json:"settings"` JobModes map[string]*JobModeJson `json:"jobmodes"` }
type JobManagerSettings ¶
type JobModeEnv ¶
type JobModeJson ¶
type JobResources ¶
type JobResources struct { Threads int `json:"__threads,omitempty"` MemGB int `json:"__mem_gb,omitempty"` Special string `json:"__special,omitempty"` }
Defines resources used by a stage.
func (*JobResources) ToMap ¶
func (self *JobResources) ToMap() ArgumentMap
type LocalJobManager ¶
type LocalJobManager struct {
// contains filtered or unexported fields
}
func NewLocalJobManager ¶
func (*LocalJobManager) GetMaxCores ¶
func (self *LocalJobManager) GetMaxCores() int
func (*LocalJobManager) GetMaxMemGB ¶
func (self *LocalJobManager) GetMaxMemGB() int
func (*LocalJobManager) GetSettings ¶
func (self *LocalJobManager) GetSettings() *JobManagerSettings
func (*LocalJobManager) GetSystemReqs ¶
func (self *LocalJobManager) GetSystemReqs(threads int, memGB int) (int, int)
func (*LocalJobManager) HandleSignal ¶
func (self *LocalJobManager) HandleSignal(sig os.Signal)
type MaxJobsSemaphore ¶
type MaxJobsSemaphore struct { Limit int // contains filtered or unexported fields }
A semaphore limiting the number of unique jobs which are active at a time.
func NewMaxJobsSemaphore ¶
func NewMaxJobsSemaphore(limit int) *MaxJobsSemaphore
func (*MaxJobsSemaphore) Acquire ¶
func (self *MaxJobsSemaphore) Acquire(metadata *Metadata) bool
Wait for this semaphore to have capacity to run this metadata object.
If the object is not in the queued or waiting states, it was canceled between when the job was enqueued and now.
If the object was already in the semaphore, as may be the case in the event of automatic restart if the failure was missed for whatever reason, then we only treat the metadata object as having one job running ever.
func (*MaxJobsSemaphore) Current ¶
func (self *MaxJobsSemaphore) Current() int
func (*MaxJobsSemaphore) FindDone ¶
func (self *MaxJobsSemaphore) FindDone()
Check that each metadata object which holds the semaphore is still actually running.
func (*MaxJobsSemaphore) Release ¶
func (self *MaxJobsSemaphore) Release(metadata *Metadata)
type Metadata ¶
type Metadata struct {
// contains filtered or unexported fields
}
Manages interaction with the filesystem-based "database" of Martian metadata for a pipeline node (pipeline, subpipeline, fork, stage, split, chunk, join).
func NewMetadata ¶
func (*Metadata) AppendAlarm ¶
Add text to the Alarm file for this node.
func (*Metadata) MetadataFilePath ¶
func (self *Metadata) MetadataFilePath(name MetadataFileName) string
Get the absolute path to the given metadata file.
func (*Metadata) ReadInto ¶
func (self *Metadata) ReadInto(name MetadataFileName, target interface{}) error
Reads the content of the given metadata file and deserializes it into the given object.
func (*Metadata) UpdateJournal ¶
func (self *Metadata) UpdateJournal(name MetadataFileName) error
Writes a journal file corresponding to the given metadata file. This is used to notify the runtime of the existence of a new or updated file.
The journal is a performance optimization to prevent the runtime from needing to constantly scan the entire database for changes. Instead, it only scans the journal. This means that when a metadata file is created or modified (except by the runtime itself), the change won't be "noticed" until the journal is updated.
func (*Metadata) Write ¶
func (self *Metadata) Write(name MetadataFileName, object interface{}) error
Serializes the given object and writes it to the given metadata file.
func (*Metadata) WriteAtomic ¶
func (self *Metadata) WriteAtomic(name MetadataFileName, object interface{}) error
Serializes the given object and writes it to the given metadata file in a way that ensures the file is updated atomically and will never be observed in a partially-written form.
func (*Metadata) WriteRaw ¶
func (self *Metadata) WriteRaw(name MetadataFileName, text string) error
Writes the given raw data into the given metadata file.
func (*Metadata) WriteTime ¶
func (self *Metadata) WriteTime(name MetadataFileName) error
Writes the current timestamp into the given metadata file. Generally used for sentinel files.
type MetadataFileName ¶
type MetadataFileName string
const ( AlarmFile MetadataFileName = "alarm" ArgsFile MetadataFileName = "args" Assert MetadataFileName = "assert" ChunkDefsFile MetadataFileName = "chunk_defs" ChunkOutsFile MetadataFileName = "chunk_outs" CompleteFile MetadataFileName = "complete" Errors MetadataFileName = "errors" FinalState MetadataFileName = "finalstate" Heartbeat MetadataFileName = "heartbeat" InvocationFile MetadataFileName = "invocation" JobId MetadataFileName = "jobid" JobInfoFile MetadataFileName = "jobinfo" JobModeFile MetadataFileName = "jobmode" Lock MetadataFileName = "lock" LogFile MetadataFileName = "log" MetadataZip MetadataFileName = "metadata.zip" MroSourceFile MetadataFileName = "mrosource" OutsFile MetadataFileName = "outs" Perf MetadataFileName = "perf" ProgressFile MetadataFileName = "progress" QueuedLocally MetadataFileName = "queued_locally" Stackvars MetadataFileName = "stackvars" StageDefsFile MetadataFileName = "stage_defs" StdErr MetadataFileName = "stderr" StdOut MetadataFileName = "stdout" TagsFile MetadataFileName = "tags" TimestampFile MetadataFileName = "timestamp" UiPort MetadataFileName = "uiport" UuidFile MetadataFileName = "uuid" VdrKill MetadataFileName = "vdrkill" VersionsFile MetadataFileName = "versions" DisabledFile MetadataFileName = "disabled" )
const AnyFile MetadataFileName = "*"
func (MetadataFileName) FileName ¶
func (self MetadataFileName) FileName() string
type MetadataInfo ¶
type MetadataInfo struct { // The filesystem path containing the metadata files. Path string `json:"path"` // The metadata file names which exist for this object. Names []string `json:"names"` }
Basic exportable information from a metadata object.
type MetadataState ¶
type MetadataState string
const ( Complete MetadataState = "complete" Failed MetadataState = "failed" DisabledState MetadataState = "disabled" Running MetadataState = "running" Queued MetadataState = "queued" Ready MetadataState = "ready" Waiting MetadataState = "" ForkWaiting MetadataState = "waiting" )
func (MetadataState) HasPrefix ¶
func (self MetadataState) HasPrefix(prefix string) bool
func (MetadataState) IsQueued ¶
func (self MetadataState) IsQueued() bool
func (MetadataState) IsRunning ¶
func (self MetadataState) IsRunning() bool
func (MetadataState) Prefixed ¶
func (self MetadataState) Prefixed(prefix string) MetadataState
type MroCache ¶
type MroCache struct {
// contains filtered or unexported fields
}
func NewMroCache ¶
func NewMroCache() *MroCache
func (*MroCache) GetCallable ¶
func (*MroCache) GetPipelines ¶
type Nodable ¶
type Nodable interface { // Gets the node's fully-qualified name. GetFQName() string // Returns the set of nodes which serve as prerequisites to this node, // as a mapping from fully-qualified name to node. GetPrenodes() map[string]Nodable // Returns the set of nodes which are able to run once this node // has completed. GetPostNodes() map[string]Nodable // Gets the mro AST object, if any, which will be executed for this node. Callable() syntax.Callable // contains filtered or unexported methods }
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
Represents a node in the pipeline graph.
func (*Node) FindNodeByName ¶
Find a node by a name. |name| may be a "partially" qualified pipestance name (see partiallyQualifiedName()) or just a stage name. If it is a stage name, and that name occurs multiple times in the pipeline, we will panic().
func (*Node) GetPostNodes ¶
func (*Node) GetPrenodes ¶
func (*Node) VDRMurdered ¶
Return true if the data inside a node was VDR'ed.
type NodeByteStamp ¶
type NodeErrorInfo ¶
type NodeErrorInfo struct { FQname string `json:"fqname"` Path string `json:"path"` Summary string `json:"summary,omitempty"` Log string `json:"log,omitempty"` }
Encapsulates information about a node failure.
type NodeInfo ¶
type NodeInfo struct { Name string `json:"name"` Fqname string `json:"fqname"` Type string `json:"type"` Path string `json:"path"` State MetadataState `json:"state"` Metadata *MetadataInfo `json:"metadata"` SweepBindings []*BindingInfo `json:"sweepbindings"` Forks []*ForkInfo `json:"forks"` Edges []EdgeInfo `json:"edges"` StagecodeLang syntax.StageCodeType `json:"stagecodeLang"` StagecodeCmd string `json:"stagecodeCmd"` Error *NodeErrorInfo `json:"error,omitempty"` }
type NodePerfInfo ¶
type NodePerfInfo struct { Name string `json:"name"` Fqname string `json:"fqname"` Type string `json:"type"` Forks []*ForkPerfInfo `json:"forks"` MaxBytes int64 `json:"maxbytes"` BytesHist []*NodeByteStamp `json:"bytehist"` HighMem *ObservedMemory `json:"highmem,omitempty"` }
type ObservedMemory ¶
type ObservedMemory struct { Rss int64 `json:"rss"` Vmem int64 `json:"vmem"` Text int64 `json:"text"` Stack int64 `json:"stack"` Procs int `json:"proc_count"` }
Current observed memory usage.
func GetProcessTreeMemory ¶
func GetProcessTreeMemory(pid int, includeParent bool, io map[int]*IoAmount) (mem ObservedMemory, err error)
Gets the total memory usage for the given process and all of its children. Only errors getting the first process's memory, or the set of children for that process, are reported. includeParent specifies whether the top-level pid is included in the total.
func GetRunningMemory ¶
func GetRunningMemory(pid int) (ObservedMemory, error)
Gets the total vmem and rss memory of a running process by pid.
func (*ObservedMemory) Add ¶
func (self *ObservedMemory) Add(other ObservedMemory)
Add other to this.
func (*ObservedMemory) IncreaseRusage ¶
func (self *ObservedMemory) IncreaseRusage(other *RusageInfo)
Increase this value to the max RSS reported by getrusage, if it is higher.
func (*ObservedMemory) IncreaseTo ¶
func (self *ObservedMemory) IncreaseTo(other ObservedMemory)
Increase this value to max(this,other).
func (*ObservedMemory) IsZero ¶
func (self *ObservedMemory) IsZero() bool
func (*ObservedMemory) RssKb ¶
func (self *ObservedMemory) RssKb() int
func (*ObservedMemory) VmemKb ¶
func (self *ObservedMemory) VmemKb() int
type PerfInfo ¶
type PerfInfo struct { NumJobs int `json:"num_jobs"` NumThreads int `json:"num_threads"` Duration float64 `json:"duration"` CoreHours float64 `json:"core_hours"` MaxRss int `json:"maxrss"` MaxVmem int `json:"maxvmem"` InBlocks int `json:"in_blocks"` OutBlocks int `json:"out_blocks"` TotalBlocks int `json:"total_blocks"` InBlocksRate float64 `json:"in_blocks_rate"` OutBlocksRate float64 `json:"out_blocks_rate"` TotalBlocksRate float64 `json:"total_blocks_rate"` InBytes int64 `json:"in_bytes"` OutBytes int64 `json:"out_bytes"` InBytesRate float64 `json:"in_bytes_rate"` OutBytesRate float64 `json:"out_bytes_rate"` InBytesPeak float64 `json:"in_bytes_peak"` OutBytesPeak float64 `json:"out_bytes_peak"` Start time.Time `json:"start"` End time.Time `json:"end"` WallTime float64 `json:"walltime"` UserTime float64 `json:"usertime"` SystemTime float64 `json:"systemtime"` TotalFiles uint `json:"total_files"` TotalBytes uint64 `json:"total_bytes"` OutputFiles uint `json:"output_files"` OutputBytes uint64 `json:"output_bytes"` VdrFiles uint `json:"vdr_files"` VdrBytes uint64 `json:"vdr_bytes"` // Deviation for a single job is deviation over time as measured by mrjob. // For node aggregates, it's the deviation between child nodes. InBytesDev float64 `json:"in_bytes_dev"` OutBytesDev float64 `json:"out_bytes_dev"` }
func ComputeStats ¶
func ComputeStats(perfInfos []*PerfInfo, outputPaths []string, vdrKillReport *VDRKillReport) *PerfInfo
type PerfInfoByStart ¶
type PerfInfoByStart []*PerfInfo
func (PerfInfoByStart) Len ¶
func (self PerfInfoByStart) Len() int
func (PerfInfoByStart) Less ¶
func (self PerfInfoByStart) Less(i, j int) bool
func (PerfInfoByStart) Swap ¶
func (self PerfInfoByStart) Swap(i, j int)
type Pipestance ¶
type Pipestance struct {
// contains filtered or unexported fields
}
Encapsulates information about an instance of a running (or failed, or completed) pipeline.
func NewPipestance ¶
func (*Pipestance) BlacklistMRTNodes ¶
func (self *Pipestance) BlacklistMRTNodes(namesToBlacklist []string, nodemap map[*Node]*Node) error
This marks a set of nodes as well as any nodes dependent on them as blacklisted. A node is dependent on another node if it uses data that the other node provides (is in postnodes) or if it is a parent of that node.
func (*Pipestance) Callable ¶
func (self *Pipestance) Callable() syntax.Callable
func (*Pipestance) CheckHeartbeats ¶
func (self *Pipestance) CheckHeartbeats()
func (*Pipestance) ClearUiPort ¶
func (self *Pipestance) ClearUiPort() error
func (*Pipestance) ComputeDiskUsage ¶
func (self *Pipestance) ComputeDiskUsage(nodePerf *NodePerfInfo) *NodePerfInfo
func (*Pipestance) GetFQName ¶
func (self *Pipestance) GetFQName() string
func (*Pipestance) GetFailedNodes ¶
func (self *Pipestance) GetFailedNodes() []*Node
func (*Pipestance) GetFatalError ¶
func (self *Pipestance) GetFatalError() (string, bool, string, string, MetadataFileName, []string)
func (*Pipestance) GetInvocation ¶
func (self *Pipestance) GetInvocation() interface{}
func (*Pipestance) GetPath ¶
func (self *Pipestance) GetPath() string
func (*Pipestance) GetPname ¶
func (self *Pipestance) GetPname() string
func (*Pipestance) GetPostNodes ¶
func (self *Pipestance) GetPostNodes() map[string]Nodable
func (*Pipestance) GetPrenodes ¶
func (self *Pipestance) GetPrenodes() map[string]Nodable
func (*Pipestance) GetPsid ¶
func (self *Pipestance) GetPsid() string
func (*Pipestance) GetState ¶
func (self *Pipestance) GetState() MetadataState
func (*Pipestance) GetTimestamp ¶
func (self *Pipestance) GetTimestamp() string
func (*Pipestance) GetUuid ¶
func (self *Pipestance) GetUuid() (string, error)
func (*Pipestance) GetVersions ¶
func (self *Pipestance) GetVersions() (string, string, error)
func (*Pipestance) HandleSignal ¶
func (self *Pipestance) HandleSignal(sig os.Signal)
func (*Pipestance) Immortalize ¶
func (self *Pipestance) Immortalize(force bool) error
Generate the final state file for the pipestance and zip the content up for posterity.
Unless force is true, this is only permitted for locked pipestances.
func (*Pipestance) IsErrorTransient ¶
func (self *Pipestance) IsErrorTransient() (bool, string)
Returns true if there is no error or if the error is one we expect to not recur if the pipeline is rerun, and the log message from the first error found, if any.
func (*Pipestance) Kill ¶
func (self *Pipestance) Kill()
func (*Pipestance) KillWithMessage ¶
func (self *Pipestance) KillWithMessage(message string)
func (*Pipestance) LoadMetadata ¶
func (self *Pipestance) LoadMetadata()
func (*Pipestance) Lock ¶
func (self *Pipestance) Lock() error
func (*Pipestance) OnFinishHook ¶
func (self *Pipestance) OnFinishHook()
Run a script whenever a pipestance finishes.
func (*Pipestance) PostProcess ¶
func (self *Pipestance) PostProcess()
func (*Pipestance) RecordUiPort ¶
func (self *Pipestance) RecordUiPort(url string) error
func (*Pipestance) RefreshState ¶
func (self *Pipestance) RefreshState()
func (*Pipestance) Reset ¶
func (self *Pipestance) Reset() error
func (*Pipestance) RestartLocalJobs ¶
func (self *Pipestance) RestartLocalJobs(jobMode string) error
Resets local nodes which are queued or are running with a PID that is not a running job. If |jobMode| is "local" then all nodes are treated as local. This is necessary for when e.g. mrp is restarted in local mode after ctrl-C kills it and all of its child processes.
func (*Pipestance) RestartRunningNodes ¶
func (self *Pipestance) RestartRunningNodes(jobMode string) error
func (*Pipestance) Serialize ¶
func (self *Pipestance) Serialize(name MetadataFileName) interface{}
func (*Pipestance) SerializePerf ¶
func (self *Pipestance) SerializePerf() []*NodePerfInfo
func (*Pipestance) SerializeState ¶
func (self *Pipestance) SerializeState() []*NodeInfo
func (*Pipestance) SetUuid ¶
func (self *Pipestance) SetUuid(uuid string) error
func (*Pipestance) StepNodes ¶
func (self *Pipestance) StepNodes() bool
Process state updates for nodes. Returns true if there was a change in state which would make it productive to call StepNodes again immediately.
func (*Pipestance) Unlock ¶
func (self *Pipestance) Unlock()
func (*Pipestance) VDRKill ¶
func (self *Pipestance) VDRKill() *VDRKillReport
func (*Pipestance) VerifyJobMode ¶
func (self *Pipestance) VerifyJobMode() error
func (*Pipestance) ZipMetadata ¶
func (self *Pipestance) ZipMetadata(zipPath string) error
type PipestanceCopyingError ¶
type PipestanceCopyingError struct {
Psid string
}
PipestanceCopyingError
func (*PipestanceCopyingError) Error ¶
func (self *PipestanceCopyingError) Error() string
type PipestanceExistsError ¶
type PipestanceExistsError struct {
Psid string
}
PipestanceExistsError
func (*PipestanceExistsError) Error ¶
func (self *PipestanceExistsError) Error() string
type PipestanceFactory ¶
type PipestanceFactory interface { ReattachToPipestance() (*Pipestance, error) InvokePipeline() (*Pipestance, error) }
Encapsulates the information needed to instantiate a pipestance, either by creating one or reattaching to an existing one.
type PipestanceInvocationError ¶
PipestanceInvocationError
func (*PipestanceInvocationError) Error ¶
func (self *PipestanceInvocationError) Error() string
type PipestanceJobModeError ¶
PipestanceJobModeError
func (*PipestanceJobModeError) Error ¶
func (self *PipestanceJobModeError) Error() string
type PipestanceLockedError ¶
PipestanceLockedError
func (*PipestanceLockedError) Error ¶
func (self *PipestanceLockedError) Error() string
type PipestanceNotExistsError ¶
type PipestanceNotExistsError struct {
Psid string
}
PipestanceNotExistsError
func (*PipestanceNotExistsError) Error ¶
func (self *PipestanceNotExistsError) Error() string
type PipestanceNotFailedError ¶
type PipestanceNotFailedError struct {
Psid string
}
PipestanceNotFailedError
func (*PipestanceNotFailedError) Error ¶
func (self *PipestanceNotFailedError) Error() string
type PipestanceNotRunningError ¶
type PipestanceNotRunningError struct {
Psid string
}
PipestanceNotRunningError
func (*PipestanceNotRunningError) Error ¶
func (self *PipestanceNotRunningError) Error() string
type PipestanceOverrides ¶
type PipestanceOverrides struct {
// contains filtered or unexported fields
}
func ReadOverrides ¶
func ReadOverrides(path string) (*PipestanceOverrides, error)
Read the overrides file and produce a pipestance overrides object.
func (*PipestanceOverrides) GetOverride ¶
func (self *PipestanceOverrides) GetOverride(node *Node, what string, def interface{}) interface{}
Compute the value to use for a stage option when that value might be overridden.
|node| is the Node object for the stage
|what| is the name of the override we're considering
|def| is the default value to use if the value is not overridden
type PipestancePathError ¶
type PipestancePathError struct {
Path string
}
PipestancePathError
func (*PipestancePathError) Error ¶
func (self *PipestancePathError) Error() string
type PipestanceSetup ¶
type PipestanceSetup struct { Srcpath string // Path to the mro invocation file Psid string // pipestance ID PipestancePath string // Path to put this pipestance MroPaths []string // Where to look for MROs MroVersion string // mro version Envs map[string]string // mro environment vars to pass through JobMode string // jobmode to use }
PipestanceSetup defines the parameters we need to start a pipestance. It encapsulates the arguments to InvokePipeline and friends.
type PipestanceSizeError ¶
type PipestanceSizeError struct {
Psid string
}
PipestanceSizeError
func (*PipestanceSizeError) Error ¶
func (self *PipestanceSizeError) Error() string
type PipestanceWipeError ¶
type PipestanceWipeError struct {
Psid string
}
PipestanceWipeError
func (*PipestanceWipeError) Error ¶
func (self *PipestanceWipeError) Error() string
type ProfileMode ¶
type ProfileMode string
Defines available profiling modes for stage code.
const ( DisableProfile ProfileMode = "disable" CpuProfile ProfileMode = "cpu" MemProfile ProfileMode = "mem" LineProfile ProfileMode = "line" PyflameProfile ProfileMode = "pyflame" PerfRecordProfile ProfileMode = "perf" )
type PythonInfo ¶
type RemoteJobManager ¶
type RemoteJobManager struct {
// contains filtered or unexported fields
}
func NewRemoteJobManager ¶
func (*RemoteJobManager) GetMaxCores ¶
func (self *RemoteJobManager) GetMaxCores() int
func (*RemoteJobManager) GetMaxMemGB ¶
func (self *RemoteJobManager) GetMaxMemGB() int
func (*RemoteJobManager) GetSettings ¶
func (self *RemoteJobManager) GetSettings() *JobManagerSettings
func (*RemoteJobManager) GetSystemReqs ¶
func (self *RemoteJobManager) GetSystemReqs(threads int, memGB int) (int, int)
type ResourceSemaphore ¶
type ResourceSemaphore struct { Name string // contains filtered or unexported fields }
A semaphore type which allows for the maximum size of things entering the semaphore to be dynamically reduced based on observed resource availability.
func NewResourceSemaphore ¶
func NewResourceSemaphore(size int64, name string) *ResourceSemaphore
Create a new semaphore with the given capacity.
func (*ResourceSemaphore) Acquire ¶
func (self *ResourceSemaphore) Acquire(n int64) error
Reserve n of the resource. Block until it is available. Returns an error if more was requested than is possible to serve.
func (*ResourceSemaphore) Available ¶
func (self *ResourceSemaphore) Available() int64
Get the current amount of available resources.
func (*ResourceSemaphore) CurrentSize ¶
func (self *ResourceSemaphore) CurrentSize() int64
Get the current amount of resources which are reservable (including those already reserved).
func (*ResourceSemaphore) InUse ¶
func (self *ResourceSemaphore) InUse() int64
Get the current amount of resources in use. This includes both reserved resources and resources whose usage is unaccounted for.
func (*ResourceSemaphore) QueueLength ¶
func (self *ResourceSemaphore) QueueLength() int
Get the number of items waiting on the semaphore.
func (*ResourceSemaphore) Release ¶
func (self *ResourceSemaphore) Release(n int64)
Release n of the resource.
func (*ResourceSemaphore) Reserved ¶
func (self *ResourceSemaphore) Reserved() int64
Get the current amount of explicitly reserved resources.
func (*ResourceSemaphore) UpdateActual ¶
func (self *ResourceSemaphore) UpdateActual(n int64) int64
Set the current actual availability, e.g. by checking free memory. Returns the difference between the unreserved and actual. A negative return value indicates either that a job is using more memory than it reserved, or that some other process on the system is using memory.
func (*ResourceSemaphore) UpdateFreeUsed ¶
func (self *ResourceSemaphore) UpdateFreeUsed(free, usedReservation int64) int64
Set the current actual availability based on the current free amount and the amount of the reserved usage which is actually in use. This handles the case where, for example, 30 of 32 GB of memory are reserved, but only 16GB has actually been committed so far, so 16GB appears to be free.
func (*ResourceSemaphore) UpdateSize ¶
func (self *ResourceSemaphore) UpdateSize(n int64)
Change the current semaphore size. This is for cases where the resource limit may change but the current consumption is invisible. It is logically equivalent to self.UpdateActual(n, self.Reserved()), though without the potential race conditions.
type Runtime ¶
type Runtime struct { Config *RuntimeOptions MroCache *MroCache JobManager JobManager LocalJobManager JobManager // contains filtered or unexported fields }
Collects configuration and state required to initialize and run pipestances and stagestances.
func NewRuntime
deprecated
func NewRuntime(jobMode string, vdrMode string, profileMode ProfileMode, martianVersion string) *Runtime
Deprecated: use RuntimeConfig.NewRuntime() instead
func NewRuntimeWithCores
deprecated
func NewRuntimeWithCores(jobMode string, vdrMode string, profileMode ProfileMode, martianVersion string, reqCores int, reqMem int, reqMemPerCore int, maxJobs int, jobFreqMillis int, jobQueues string, fullStageReset bool, enableStackVars bool, enableZip bool, skipPreflight bool, enableMonitor bool, debug bool, stest bool, onFinishExec string, overrides *PipestanceOverrides, limitLoadavg bool) *Runtime
Deprecated: use RuntimeConfig.NewRuntime() instead
func (*Runtime) BuildCallSource ¶
func (*Runtime) GetMetadata ¶
func (*Runtime) GetSerialization ¶
func (self *Runtime) GetSerialization(pipestancePath string, name MetadataFileName) (interface{}, bool)
func (*Runtime) GetSerializationInto ¶
func (self *Runtime) GetSerializationInto(pipestancePath string, name MetadataFileName, target interface{}) error
func (*Runtime) InvokePipeline ¶
func (self *Runtime) InvokePipeline(src string, srcPath string, psid string, pipestancePath string, mroPaths []string, mroVersion string, envs map[string]string, tags []string) (*Pipestance, error)
Invokes a new pipestance.
func (*Runtime) InvokeStage ¶
func (self *Runtime) InvokeStage(src string, srcPath string, ssid string, stagestancePath string, mroPaths []string, mroVersion string, envs map[string]string) (*Stagestance, error)
Instantiate a stagestance.
func (*Runtime) ReattachToPipestance ¶
type RuntimeError ¶
type RuntimeError struct {
Msg string
}
RuntimeError
func (*RuntimeError) Error ¶
func (self *RuntimeError) Error() string
type RuntimeOptions ¶
type RuntimeOptions struct { // The runtime mode (required): either "local" or a named mode from // jobmanagers/config.json JobMode string // The volatile disk recovery mode (required): either "post", // "rolling", or "disable". VdrMode string // The profiling mode (required): "disable" or one of the available // constants. ProfileMode ProfileMode MartianVersion string LocalMem int LocalCores int MemPerCore int MaxJobs int JobFreqMillis int ResourceSpecial string FullStageReset bool StackVars bool Zip bool SkipPreflight bool Monitor bool Debug bool StressTest bool OnFinishHandler string Overrides *PipestanceOverrides LimitLoadavg bool NeverLocal bool }
Configuration required to initialize a Runtime object.
func DefaultRuntimeOptions ¶
func DefaultRuntimeOptions() RuntimeOptions
func (*RuntimeOptions) NewRuntime ¶
func (c *RuntimeOptions) NewRuntime() *Runtime
func (*RuntimeOptions) ToFlags ¶
func (config *RuntimeOptions) ToFlags() []string
Returns the set of command line flags which would set these options.
type Rusage ¶
type Rusage struct { MaxRss int `json:"ru_maxrss"` MinorFaults int `json:"ru_minflt"` MajorFaults int `json:"ru_majflt"` SwapOuts int `json:"ru_nswap"` UserTime float64 `json:"ru_utime"` SystemTime float64 `json:"ru_stime"` InBlocks int `json:"ru_inblock"` OutBlocks int `json:"ru_oublock"` MessagesSent int `json:"ru_msgsnd"` MessagesRcvd int `json:"ru_msgrcv"` SignalsRcvd int `json:"ru_nsignals"` }
type RusageInfo ¶
type RusageInfo struct { Self *Rusage `json:"self,omitempty"` Children *Rusage `json:"children,omitempty"` }
func GetRusage ¶
func GetRusage() *RusageInfo
type StageDefs ¶
type StageDefs struct { ChunkDefs []*ChunkDef `json:"chunks"` JoinDef *JobResources `json:"join,omitempty"` }
func (*StageDefs) UnmarshalJSON ¶
type StageOverride ¶
type StageOverride map[string]interface{}
type StagePerfInfo ¶
type Stagestance ¶
type Stagestance struct {
// contains filtered or unexported fields
}
Similar to a pipestance, except for a single stage. Intended for use during testing and development of pipelines, e.g. with `mrs`.
func NewStagestance ¶
func (*Stagestance) Callable ¶
func (self *Stagestance) Callable() syntax.Callable
func (*Stagestance) CheckHeartbeats ¶
func (self *Stagestance) CheckHeartbeats()
func (*Stagestance) GetFQName ¶
func (self *Stagestance) GetFQName() string
func (*Stagestance) GetFatalError ¶
func (self *Stagestance) GetFatalError() (string, bool, string, string, MetadataFileName, []string)
func (*Stagestance) GetPostNodes ¶
func (self *Stagestance) GetPostNodes() map[string]Nodable
func (*Stagestance) GetPrenodes ¶
func (self *Stagestance) GetPrenodes() map[string]Nodable
func (*Stagestance) GetState ¶
func (self *Stagestance) GetState() MetadataState
func (*Stagestance) LoadMetadata ¶
func (self *Stagestance) LoadMetadata()
func (*Stagestance) PostProcess ¶
func (self *Stagestance) PostProcess()
func (*Stagestance) RefreshState ¶
func (self *Stagestance) RefreshState()
func (*Stagestance) Step ¶
func (self *Stagestance) Step() bool
type StorageEvent ¶
func NewStorageEvent ¶
func NewStorageEvent(timestamp time.Time, delta int64, fqname string) *StorageEvent
type StorageEventByTimestamp ¶
type StorageEventByTimestamp []*StorageEvent
func (StorageEventByTimestamp) Len ¶
func (self StorageEventByTimestamp) Len() int
func (StorageEventByTimestamp) Less ¶
func (self StorageEventByTimestamp) Less(i, j int) bool
func (StorageEventByTimestamp) Swap ¶
func (self StorageEventByTimestamp) Swap(i, j int)
type TopNode ¶
type TopNode struct {
// contains filtered or unexported fields
}
The top-level node for a pipestance.
func NewTopNode ¶
func (*TopNode) GetPostNodes ¶
func (*TopNode) GetPrenodes ¶
type VDRByTimestamp ¶
type VDRByTimestamp []*VDRKillReport
func (VDRByTimestamp) Len ¶
func (self VDRByTimestamp) Len() int
func (VDRByTimestamp) Less ¶
func (self VDRByTimestamp) Less(i, j int) bool
func (VDRByTimestamp) Swap ¶
func (self VDRByTimestamp) Swap(i, j int)
type VDRKillReport ¶
type VDRKillReport struct { Count uint `json:"count"` Size uint64 `json:"size"` Timestamp string `json:"timestamp"` Paths []string `json:"paths"` Errors []string `json:"errors"` }
Volatile Disk Recovery
type VersionInfo ¶
type WallClockInfo ¶
Source Files ¶
- argument_map.go
- binding.go
- cpuinfo_linux.go
- errors.go
- exec_linux.go
- iostats.go
- jobdef.go
- jobinfo.go
- jobmanager.go
- maxjobs_semaphore.go
- metadata.go
- mrt.go
- node.go
- override.go
- perf.go
- perf_unix.go
- pipestance.go
- profile_mode.go
- resource_semaphore.go
- rlimit.go
- runtime.go
- stage.go
- statfs_unix.go
- storage.go