core

package
v3.0.0+incompatible
Warning

This package is not in the latest version of its module.

Published: Apr 30, 2018 License: MIT Imports: 28 Imported by: 0

Documentation

Constants

const (
	ChunksPrefix  = "chunks_"
	SplitPrefix   = "split_"
	JoinPrefix    = "join_"
	CleanupPrefix = "cleanup_"
	RetryPrefix   = "retry_"
)
const MetadataFilePrefix string = "_"
const PIPESTANCE_MIN_DISK uint64 = 50 * 1024 * 1024

The minimum amount of available disk space for a pipestance directory. If the available space falls below this at any time during the run, the pipestance is killed.

const PIPESTANCE_MIN_INODES uint64 = 500

The minimum number of inodes available in the pipestance directory below which the pipestance will not run.

const STAGE_TYPE_CHUNK = "chunk"
const STAGE_TYPE_JOIN = "join"
const STAGE_TYPE_SPLIT = "split"

Variables

var LegalOverrideTypes map[string]reflect.Kind = map[string]reflect.Kind{
	"force_volatile": reflect.Bool,
	"join.threads":   reflect.Float64,
	"join.mem_gb":    reflect.Float64,
	"chunk.threads":  reflect.Float64,
	"chunk.mem_gb":   reflect.Float64,
	"split.threads":  reflect.Float64,
	"split.mem_gb":   reflect.Float64,
}

Specifies the expected types for elements in a stage override map. Note that encoding/json decodes every JSON number into a float64 when the destination is an interface{}, so all numeric overrides are listed as reflect.Float64.
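The note about JSON numbers can be seen with the standard library alone. The following is a minimal sketch, not the package's own validation code: legalTypes copies a few entries from LegalOverrideTypes, and decodeOverrides and checkOverride are hypothetical helpers introduced only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// legalTypes copies a few entries from LegalOverrideTypes for illustration.
var legalTypes = map[string]reflect.Kind{
	"force_volatile": reflect.Bool,
	"chunk.threads":  reflect.Float64,
	"chunk.mem_gb":   reflect.Float64,
}

// decodeOverrides (hypothetical) parses a JSON object into a generic map.
// Every JSON number ends up as a float64 inside the interface{} values.
func decodeOverrides(data string) (map[string]interface{}, error) {
	var m map[string]interface{}
	err := json.Unmarshal([]byte(data), &m)
	return m, err
}

// checkOverride (hypothetical) reports whether val has the kind that the
// table expects for key.
func checkOverride(key string, val interface{}) error {
	want, ok := legalTypes[key]
	if !ok {
		return fmt.Errorf("unknown override %q", key)
	}
	if val == nil {
		return fmt.Errorf("override %q is null, want %v", key, want)
	}
	if got := reflect.TypeOf(val).Kind(); got != want {
		return fmt.Errorf("override %q has kind %v, want %v", key, got, want)
	}
	return nil
}

func main() {
	m, err := decodeOverrides(`{"chunk.threads": 4, "force_volatile": true}`)
	if err != nil {
		panic(err)
	}
	for key, val := range m {
		fmt.Printf("%s: kind=%v err=%v\n", key, reflect.TypeOf(val).Kind(), checkOverride(key, val))
	}
}
```

An integer literal such as 4 therefore passes a reflect.Float64 check, while a quoted "4" does not.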

Functions

func BuildCallSource

func BuildCallSource(incpaths []string,
	name string,
	args map[string]interface{},
	sweepargs []string,
	callable syntax.Callable) (string, error)

func CheckMinimalSpace

func CheckMinimalSpace(path string) error

Returns an error if the current available space on the disk drive is very low.
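A check of this kind can be sketched with statfs on Unix-like systems. This is only an illustration under stated assumptions: availableSpace and checkSpace are hypothetical names, the thresholds mirror PIPESTANCE_MIN_DISK and PIPESTANCE_MIN_INODES, and whether CheckMinimalSpace itself also tests the inode count is an assumption here.

```go
package main

import (
	"fmt"
	"os"
	"syscall"
)

const (
	minDisk   uint64 = 50 * 1024 * 1024 // mirrors PIPESTANCE_MIN_DISK
	minInodes uint64 = 500              // mirrors PIPESTANCE_MIN_INODES
)

// availableSpace (hypothetical) returns the bytes and inodes available on
// the filesystem containing path, as reported by statfs.
func availableSpace(path string) (bytes, inodes uint64, err error) {
	var fs syscall.Statfs_t
	if err = syscall.Statfs(path, &fs); err != nil {
		return 0, 0, err
	}
	// Bavail counts blocks available to unprivileged users.
	return uint64(fs.Bavail) * uint64(fs.Bsize), uint64(fs.Ffree), nil
}

// checkSpace (hypothetical) compares the measurements to the thresholds.
func checkSpace(bytes, inodes uint64) error {
	if bytes < minDisk {
		return fmt.Errorf("%d bytes available, need at least %d", bytes, minDisk)
	}
	if inodes < minInodes {
		return fmt.Errorf("%d inodes available, need at least %d", inodes, minInodes)
	}
	return nil
}

func main() {
	bytes, inodes, err := availableSpace(os.TempDir())
	if err != nil {
		fmt.Println("statfs failed:", err)
		return
	}
	fmt.Println("bytes:", bytes, "inodes:", inodes, "check:", checkSpace(bytes, inodes))
}
```

Some filesystems report zero inodes, so a real check may need to special-case that value.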

func CompileAll

func CompileAll(mroPaths []string, checkSrcPath bool) (int, []*syntax.Ast, error)

Compile all the MRO files in mroPaths.

func DefaultRetries

func DefaultRetries() int

func GetAvailableSpace

func GetAvailableSpace(path string) (bytes, inodes uint64, err error)

func GetCPUInfo

func GetCPUInfo() (int, int, int, int)

func GetCallable

func GetCallable(mroPaths []string, name string) (syntax.Callable, error)

func GetMaxFiles

func GetMaxFiles() (*unix.Rlimit, error)

Gets the current (soft) and maximum (hard) rlimit for number of open files.

See `man getrlimit`

func GetMaxProcs

func GetMaxProcs() (*unix.Rlimit, error)

Gets the current (soft) and maximum (hard) rlimit for number of processes.

See `man getrlimit`

func GetUserProcessCount

func GetUserProcessCount() (int, error)

Get the number of processes (threads) currently running for the current user.

func MRTBuildPipeline

func MRTBuildPipeline(newinfo *PipestanceSetup, oldinfo *PipestanceSetup, invalidate []string)

This is the main entry point for "mrt".

newinfo corresponds to a new (non-existing) pipestance and oldinfo to an existing pipestance. Invalidate lists stages in the new pipestance that have code differences.

We create a new pipestance directory and link every stage/pipeline from oldinfo that we can. We explicitly don't link anything in |invalidate| or that derives from anything in invalidate.

After this runs, the new directory can be mrp'ed to run the new pipestance.
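The rule that nothing in |invalidate|, and nothing derived from it, is linked can be sketched as a reachability computation over the stage dependency graph. The graph representation, the stage names, and the function reusable below are hypothetical; the sketch assumes the graph is acyclic and that every stage appears as a key.

```go
package main

import (
	"fmt"
	"sort"
)

// reusable (hypothetical) returns the stages that could be linked from an
// old pipestance: those that are neither listed in invalidate nor depend,
// directly or transitively, on an invalidated stage.
// deps maps each stage name to the names of the stages it consumes.
func reusable(deps map[string][]string, invalidate []string) []string {
	tainted := make(map[string]bool, len(invalidate))
	for _, name := range invalidate {
		tainted[name] = true
	}
	clean := make(map[string]bool) // memo of stages proven untainted
	var isTainted func(name string) bool
	isTainted = func(name string) bool {
		if tainted[name] {
			return true
		}
		if clean[name] {
			return false
		}
		for _, dep := range deps[name] {
			if isTainted(dep) {
				tainted[name] = true
				return true
			}
		}
		clean[name] = true
		return false
	}
	var keep []string
	for name := range deps {
		if !isTainted(name) {
			keep = append(keep, name)
		}
	}
	sort.Strings(keep)
	return keep
}

func main() {
	deps := map[string][]string{
		"SETUP": nil,
		"ALIGN": {"SETUP"},
		"CALL":  {"ALIGN"},
		"QC":    {"SETUP"},
	}
	// ALIGN changed, so ALIGN and CALL must be rerun.
	fmt.Println(reusable(deps, []string{"ALIGN"}))
}
```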

func MakeFQName

func MakeFQName(pipeline string, psid string) string

func MapTwoPipestances

func MapTwoPipestances(newp *Pipestance, oldp *Pipestance) map[*Node]*Node

This takes two pipestances and creates a map that associates nodes in one pipestance with the nodes in the other. Nodes are associated if they have the same name.
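Matching by name can be sketched as follows. The node type here is a hypothetical stand-in for Node, and the direction of the resulting map (new node to old node) is an assumption; the source only says that nodes with the same name are associated.

```go
package main

import "fmt"

// node is a hypothetical stand-in for a pipestance graph node.
type node struct {
	name     string
	children []*node
}

// walk visits n and all of its descendants.
func walk(n *node, visit func(*node)) {
	visit(n)
	for _, c := range n.children {
		walk(c, visit)
	}
}

// mapByName (hypothetical) associates each node in the new tree with the
// node of the same name in the old tree, if there is one.
func mapByName(newRoot, oldRoot *node) map[*node]*node {
	byName := make(map[string]*node)
	walk(oldRoot, func(n *node) { byName[n.name] = n })

	result := make(map[*node]*node)
	walk(newRoot, func(n *node) {
		if old, ok := byName[n.name]; ok {
			result[n] = old
		}
	})
	return result
}

func main() {
	oldRoot := &node{name: "PIPE", children: []*node{{name: "PIPE.A"}, {name: "PIPE.C"}}}
	newRoot := &node{name: "PIPE", children: []*node{{name: "PIPE.A"}, {name: "PIPE.B"}}}
	for n, o := range mapByName(newRoot, oldRoot) {
		fmt.Println(n.name, "->", o.name)
	}
}
```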

func MaximizeMaxFiles

func MaximizeMaxFiles() error

Sets the soft rlimit for maximum open files equal to the hard limit.

See `man setrlimit`.
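Raising the soft limit to the hard limit can be sketched with the standard syscall package on Unix-like systems. This is an illustration only: the signatures above use golang.org/x/sys/unix, and raiseSoftToHard is a hypothetical name.

```go
package main

import (
	"fmt"
	"syscall"
)

// raiseSoftToHard (hypothetical) sets the soft RLIMIT_NOFILE to the hard
// limit and returns the limit as re-read from the kernel afterwards.
func raiseSoftToHard() (syscall.Rlimit, error) {
	var lim syscall.Rlimit
	if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &lim); err != nil {
		return lim, err
	}
	// An unprivileged process may raise its soft limit up to the hard limit.
	lim.Cur = lim.Max
	if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &lim); err != nil {
		return lim, err
	}
	err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &lim)
	return lim, err
}

func main() {
	lim, err := raiseSoftToHard()
	if err != nil {
		fmt.Println("could not raise limit:", err)
		return
	}
	fmt.Printf("open files: soft=%d hard=%d\n", lim.Cur, lim.Max)
}
```

Some platforms reject a soft limit above a system-wide maximum, so the Setrlimit error should not be ignored.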

func MaximizeMaxProcs

func MaximizeMaxProcs() error

Sets the soft rlimit for maximum processes equal to the hard limit.

See `man setrlimit`.

func ParseFQName

func ParseFQName(fqname string) (string, string)

func ParseJobMode

func ParseJobMode(data string) (string, string, string)

func ParseTimestamp

func ParseTimestamp(data string) string

func ParseVersions

func ParseVersions(data string) (string, string, error)

func Pdeathsig

func Pdeathsig(attr *syscall.SysProcAttr, sig syscall.Signal) *syscall.SysProcAttr

Add pdeathsig to a SysProcAttr structure, if the operating system supports it, and return the object. On other platforms, do nothing.

func ScanTree

func ScanTree(root *Node)

Iterate over the entire tree and print the names of the nodes that have been blacklisted


func SetMaxFiles

func SetMaxFiles(rlim *unix.Rlimit) error

Sets the current (soft) and maximum (hard) rlimit for number of open files.

See `man setrlimit`

func SetMaxProcs

func SetMaxProcs(rlim *unix.Rlimit) error

Sets the current (soft) and maximum (hard) rlimit for number of processes.

See `man setrlimit`

func TaintNode

func TaintNode(root *Node, nodemap map[*Node]*Node)

Recursively blacklist nodes.

func VDRTaint

func VDRTaint(root *Node, nodemap map[*Node]*Node)

Blacklist dependencies that have been VDR'ed.

func VerifyOnFinish

func VerifyOnFinish(onfinish string)

func VerifyProfileMode

func VerifyProfileMode(profileMode ProfileMode)

func VerifyVDRMode

func VerifyVDRMode(vdrMode string)

Types

type ArgumentMap

type ArgumentMap map[string]interface{}

Mapping from argument or output names to values.

Includes convenience methods to validate the arguments against parameter lists from MRO, and to convert to or from other structured data types.

ArgumentMap always deserializes numbers as json.Number values, in order to prevent loss of precision for integer types.
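The precision issue can be illustrated with the standard library's json.Decoder.UseNumber. This is a sketch of the general technique, not ArgumentMap's actual UnmarshalJSON; decodeArgs and asInt64 are hypothetical helpers and the argument name "reads" is made up.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// decodeArgs (hypothetical) decodes a JSON object, keeping every number as
// a json.Number instead of converting it to float64.
func decodeArgs(data []byte) (map[string]interface{}, error) {
	dec := json.NewDecoder(bytes.NewReader(data))
	dec.UseNumber()
	var m map[string]interface{}
	err := dec.Decode(&m)
	return m, err
}

// asInt64 (hypothetical) extracts an integer from a decoded value.
func asInt64(v interface{}) (int64, error) {
	n, ok := v.(json.Number)
	if !ok {
		return 0, fmt.Errorf("not a json.Number: %T", v)
	}
	return n.Int64()
}

func main() {
	// 2^53 + 1 cannot be represented exactly as a float64.
	src := []byte(`{"reads": 9007199254740993}`)

	args, err := decodeArgs(src)
	if err != nil {
		panic(err)
	}
	n, err := asInt64(args["reads"])
	fmt.Println("with UseNumber:", n, err)

	// For contrast, the default decoding goes through float64.
	var plain map[string]interface{}
	if err := json.Unmarshal(src, &plain); err != nil {
		panic(err)
	}
	fmt.Printf("default decoding: %.0f\n", plain["reads"])
}
```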

func MakeArgumentMap

func MakeArgumentMap(binding interface{}) ArgumentMap

Convenience method to convert an arbitrary object type into an ArgumentMap.

This is intended primarily for use by authors of native Go stages.

func (ArgumentMap) Decode

func (self ArgumentMap) Decode(target interface{}) error

Convenience method to convert an ArgumentMap into another kind of object.

This is intended primarily for use by authors of native Go stages.

func (*ArgumentMap) UnmarshalJSON

func (self *ArgumentMap) UnmarshalJSON(b []byte) error

func (ArgumentMap) Validate

func (self ArgumentMap) Validate(expected *syntax.Params, isInput bool, optional ...*syntax.Params) (error, string)

Validate that all of the arguments in the map are declared parameters, and that all declared parameters are set in the arguments to a value of the correct type, or null.

Hard errors are returned as the first return value. "Soft" error messages are returned in the second.

Optional params are values which are permitted to be in the argument map (if they are of the correct type) but which are not required to be present. For example, for a stage defined as

stage STAGE(
    in  int a,
    out int b,
) split (
    in  int c,
    out int d,
)

then in the outputs from the chunks, d is required but b is optional.

type Binding

type Binding struct {
	// contains filtered or unexported fields
}

Holds information about the value of an input argument, either hard-coded into the MRO or bound to the output of another node.

func NewBinding

func NewBinding(node *Node, bindStm *syntax.BindStm) *Binding

func NewReturnBinding

func NewReturnBinding(node *Node, bindStm *syntax.BindStm) *Binding

type BindingInfo

type BindingInfo struct {
	Id          string      `json:"id"`
	Type        string      `json:"type"`
	ValExp      string      `json:"valexp"`
	Mode        string      `json:"mode"`
	Output      string      `json:"output"`
	Sweep       bool        `json:"sweep"`
	SweepRootId string      `json:"sweepRootId"`
	Node        interface{} `json:"node"`
	MatchedFork interface{} `json:"matchedFork"`
	Value       interface{} `json:"value"`
	Waiting     bool        `json:"waiting"`
}

An exportable version of Binding.

type Chunk

type Chunk struct {
	// contains filtered or unexported fields
}

Represents the state of a stage chunk (the "main" method).

func NewChunk

func NewChunk(fork *Fork, index int,
	chunkDef *ChunkDef, chunkIndexWidth int) *Chunk

func (*Chunk) Stage

func (self *Chunk) Stage() *syntax.Stage

Get the stage definition for this chunk. Panics if this is not a stage fork.

type ChunkDef

type ChunkDef struct {
	Resources *JobResources
	Args      ArgumentMap
}

Defines the resources and arguments of a chunk.

func (*ChunkDef) MarshalJSON

func (self *ChunkDef) MarshalJSON() ([]byte, error)

func (*ChunkDef) Merge

func (self *ChunkDef) Merge(bindings interface{}) *ChunkDef

func (*ChunkDef) MergeArguments

func (self *ChunkDef) MergeArguments(bindings ArgumentMap) *ChunkDef

func (*ChunkDef) UnmarshalJSON

func (self *ChunkDef) UnmarshalJSON(b []byte) error

type ChunkInfo

type ChunkInfo struct {
	Index    int           `json:"index"`
	ChunkDef *ChunkDef     `json:"chunkDef"`
	State    MetadataState `json:"state"`
	Metadata *MetadataInfo `json:"metadata"`
}

Exportable information about a Chunk object.

type ChunkPerfInfo

type ChunkPerfInfo struct {
	Index      int       `json:"index"`
	ChunkStats *PerfInfo `json:"chunk_stats"`
}

type DiskSpaceError

type DiskSpaceError struct {
	Bytes   uint64
	Inodes  uint64
	Message string
}

func (*DiskSpaceError) Error

func (self *DiskSpaceError) Error() string

type EdgeInfo

type EdgeInfo struct {
	From string `json:"from"`
	To   string `json:"to"`
}

Represents an edge in the pipeline graph.

type Fork

type Fork struct {
	// contains filtered or unexported fields
}

Represents a fork of a stage or pipeline. When sweeping over multiple possible values for an input parameter, there will be more than one fork for a given pipeline or stage.

func NewFork

func NewFork(nodable Nodable, index int, argPermute map[string]interface{}) *Fork

func (*Fork) OutParams

func (self *Fork) OutParams() *syntax.Params

Get the fork's output parameter list.

func (*Fork) Split

func (self *Fork) Split() bool

type ForkBindingsInfo

type ForkBindingsInfo struct {
	Argument []*BindingInfo `json:"Argument"`
	Return   []*BindingInfo `json:"Return"`
}

type ForkInfo

type ForkInfo struct {
	Index         int                    `json:"index"`
	ArgPermute    map[string]interface{} `json:"argPermute"`
	JoinDef       *JobResources          `json:"joinDef"`
	State         MetadataState          `json:"state"`
	Metadata      *MetadataInfo          `json:"metadata"`
	SplitMetadata *MetadataInfo          `json:"split_metadata"`
	JoinMetadata  *MetadataInfo          `json:"join_metadata"`
	Chunks        []*ChunkInfo           `json:"chunks"`
	Bindings      *ForkBindingsInfo      `json:"bindings"`
}

Exportable information from a Fork object.

type ForkPerfCache

type ForkPerfCache struct {
	// contains filtered or unexported fields
}

type ForkPerfInfo

type ForkPerfInfo struct {
	Stages     []*StagePerfInfo `json:"stages"`
	Index      int              `json:"index"`
	Chunks     []*ChunkPerfInfo `json:"chunks"`
	SplitStats *PerfInfo        `json:"split_stats"`
	JoinStats  *PerfInfo        `json:"join_stats"`
	ForkStats  *PerfInfo        `json:"fork_stats"`
}

type ForkStorageEvent

type ForkStorageEvent struct {
	Name          string
	ChildNames    []string
	TotalBytes    uint64
	ChunkBytes    uint64
	ForkBytes     uint64
	TotalVDRBytes uint64
	ForkVDRBytes  uint64
	Timestamp     time.Time
	VDRTimestamp  time.Time
}

This is due to the fact that the VDR bytes/total bytes reported at the fork level are the sum of chunk + split + join plus any additional files. The additional files that are unique to the fork cannot be resolved unless you subtract out chunk/split/join and then the child stages.

func NewForkStorageEvent

func NewForkStorageEvent(timestamp time.Time, totalBytes uint64, vdrBytes uint64, fqname string) *ForkStorageEvent

type InvocationData

type InvocationData struct {
	Call         string                 `json:"call"`
	Args         map[string]interface{} `json:"args"`
	SweepArgs    []string               `json:"sweepargs"`
	IncludePaths []string               `json:"incpaths"`
}

func BuildCallData

func BuildCallData(src string, srcPath string, mroPaths []string) (*InvocationData, error)

func BuildDataForAst

func BuildDataForAst(incpaths []string, ast *syntax.Ast) (*InvocationData, error)

type IoAmount

type IoAmount struct {
	Read  IoValues `json:"read"`
	Write IoValues `json:"write"`
}

Collects a total number of read/write IO operations.

func GetRunningIo

func GetRunningIo(pid int) (*IoAmount, error)

Gets IO statistics for a running process by pid.

func (*IoAmount) Increment

func (self *IoAmount) Increment(other *IoAmount)

Increment this value by another.

type IoRate

type IoRate struct {
	Read  IoRateValues `json:"read"`
	Write IoRateValues `json:"write"`
}

Represents a rate of change for IoAmount.

func (*IoRate) Increment

func (self *IoRate) Increment(other *IoRate)

Increment this rate by another.

func (*IoRate) TakeMax

func (self *IoRate) TakeMax(other *IoRate)

Update this rate to be the maximum of itself and another.

type IoRateValues

type IoRateValues struct {
	// The rate at which IO syscalls were issued.  These may have been against
	// non-block devices, such as sockets, terminals or a pseudo-filesystem
	// such as /proc.
	Syscalls float64 `json:"sysc"`

	// The rate at which bytes were transferred to or from a block device.
	// Even when a read or write is made against a block device, it may not be
	// counted in this number if it was served from cache, or if the file was
	// truncated or unlinked before it was synced to disk.
	BlockBytes float64 `json:"bytes"`
}

IoValues per second

func (*IoRateValues) Increment

func (self *IoRateValues) Increment(other IoRateValues)

Increment this rate by another.

func (*IoRateValues) TakeMax

func (self *IoRateValues) TakeMax(other IoRateValues)

Update this rate to be the maximum of itself and another.

type IoStats

type IoStats struct {
	Total   IoAmount `json:"total,omitempty"`
	RateMax IoRate   `json:"max,omitempty"`
	RateDev IoRate   `json:"dev,omitempty"`
}

Collects statistics based on observed process tree IO usage.

type IoStatsBuilder

type IoStatsBuilder struct {
	IoStats
	// contains filtered or unexported fields
}
Example
// Set the start time to a known value.
t := time.Now()
sb := NewIoStatsBuilder()
sb.lastMeasurement = t
sb.start = t

// No writes.
// Reads at a constant rate of 2048 bytes every 10 seconds, with
// 11, 9, and 10 syscalls in the first, second, and third
// 10-second period, respectively.
sb.Update(map[int]*IoAmount{
	1: &IoAmount{
		Read:  IoValues{Syscalls: 1, BlockBytes: 1024},
		Write: IoValues{},
	},
	2: &IoAmount{
		Read:  IoValues{Syscalls: 10, BlockBytes: 1024},
		Write: IoValues{},
	},
}, t.Add(time.Second*10))
sb.Update(map[int]*IoAmount{
	1: &IoAmount{
		Read:  IoValues{Syscalls: 10, BlockBytes: 3072},
		Write: IoValues{},
	},
	2: &IoAmount{
		Read:  IoValues{Syscalls: 10, BlockBytes: 1024},
		Write: IoValues{},
	},
}, t.Add(time.Second*20))
sb.Update(map[int]*IoAmount{
	1: &IoAmount{
		Read:  IoValues{Syscalls: 10, BlockBytes: 4096},
		Write: IoValues{},
	},
	3: &IoAmount{
		Read:  IoValues{Syscalls: 10, BlockBytes: 1024},
		Write: IoValues{},
	},
}, t.Add(time.Second*30))
fmt.Println("Read syscalls:")
fmt.Println("Total:", sb.Total.Read.Syscalls)
fmt.Printf("Rate: %0.1f ± %0.2f (max: %0.1f)\n\n",
	float64(sb.Total.Read.Syscalls)/30,
	sb.RateDev.Read.Syscalls,
	sb.RateMax.Read.Syscalls)
fmt.Println("Write syscalls:")
fmt.Println("Total:", sb.Total.Write.Syscalls)
fmt.Printf("Rate: %0.1f ± %0.2f (max: %0.1f)\n\n",
	float64(sb.Total.Write.Syscalls)/30,
	sb.RateDev.Write.Syscalls,
	sb.RateMax.Write.Syscalls)
fmt.Println("Read bytes:")
fmt.Println("Total:", sb.Total.Read.BlockBytes)
fmt.Printf("Rate: %0.1f ± %0.2f (max: %0.1f)\n\n",
	float64(sb.Total.Read.BlockBytes)/30,
	sb.RateDev.Read.BlockBytes,
	sb.RateMax.Read.BlockBytes)
fmt.Println("Write bytes:")
fmt.Println("Total:", sb.Total.Write.BlockBytes)
fmt.Printf("Rate: %0.1f ± %0.2f (max: %0.1f)\n",
	float64(sb.Total.Write.BlockBytes)/30,
	sb.RateDev.Write.BlockBytes,
	sb.RateMax.Write.BlockBytes)
Output:

Read syscalls:
Total: 30
Rate: 1.0 ± 0.08 (max: 1.1)

Write syscalls:
Total: 0
Rate: 0.0 ± 0.00 (max: 0.0)

Read bytes:
Total: 6144
Rate: 204.8 ± 0.00 (max: 204.8)

Write bytes:
Total: 0
Rate: 0.0 ± 0.00 (max: 0.0)

func NewIoStatsBuilder

func NewIoStatsBuilder() *IoStatsBuilder

func (*IoStatsBuilder) Update

func (self *IoStatsBuilder) Update(current map[int]*IoAmount, now time.Time)

Update the stats object with the current per-pid IO amounts.

type IoValues

type IoValues struct {
	// The number of io syscalls issued.  These may have been against non-block
	// devices, such as sockets, terminals or a pseudo-filesystem such as /proc.
	// For block IO operations, see rusage.
	Syscalls int64 `json:"sysc"`

	// The number of bytes transferred to or from a block device.  Even when a
	// read or write is made against a block device, it may not be counted in
	// this number if it was served from cache, or if the file was truncated
	// or unlinked before it was synced to disk.
	BlockBytes int64 `json:"bytes"`
}

Stores cumulative values for IO metrics.

func (*IoValues) Increment

func (self *IoValues) Increment(other IoValues)

Increment this value by another.

type JobInfo

type JobInfo struct {
	Name          string            `json:"name"`
	Pid           int               `json:"pid,omitempty"`
	Host          string            `json:"host,omitempty"`
	Type          string            `json:"type,omitempty"`
	Cwd           string            `json:"cwd,omitempty"`
	PythonInfo    *PythonInfo       `json:"python,omitempty"`
	RusageInfo    *RusageInfo       `json:"rusage,omitempty"`
	MemoryUsage   *ObservedMemory   `json:"used_bytes,omitempty"`
	IoStats       *IoStats          `json:"io,omitempty"`
	WallClockInfo *WallClockInfo    `json:"wallclock,omitempty"`
	Threads       int               `json:"threads,omitempty"`
	MemGB         int               `json:"memGB,omitempty"`
	ProfileMode   ProfileMode       `json:"profile_mode,omitempty"`
	Stackvars     string            `json:"stackvars_flag,omitempty"`
	Monitor       string            `json:"monitor_flag,omitempty"`
	Invocation    *InvocationData   `json:"invocation,omitempty"`
	Version       *VersionInfo      `json:"version,omitempty"`
	ClusterEnv    map[string]string `json:"sge,omitempty"`
}

type JobManager

type JobManager interface {
	GetSystemReqs(int, int) (int, int)
	GetMaxCores() int
	GetMaxMemGB() int
	GetSettings() *JobManagerSettings
	// contains filtered or unexported methods
}

Job managers

type JobManagerJson

type JobManagerJson struct {
	JobSettings *JobManagerSettings     `json:"settings"`
	JobModes    map[string]*JobModeJson `json:"jobmodes"`
}

type JobManagerSettings

type JobManagerSettings struct {
	ThreadsPerJob int      `json:"threads_per_job"`
	MemGBPerJob   int      `json:"memGB_per_job"`
	ThreadEnvs    []string `json:"thread_envs"`
}

type JobModeEnv

type JobModeEnv struct {
	Name        string `json:"name"`
	Description string `json:"description"`
}

type JobModeJson

type JobModeJson struct {
	Cmd             string        `json:"cmd"`
	Args            []string      `json:"args,omitempty"`
	QueueQuery      string        `json:"queue_query,omitempty"`
	QueueQueryGrace int           `json:"queue_query_grace_secs,omitempty"`
	ResourcesOpt    string        `json:"resopt"`
	JobEnvs         []*JobModeEnv `json:"envs"`
}

type JobResources

type JobResources struct {
	Threads int    `json:"__threads,omitempty"`
	MemGB   int    `json:"__mem_gb,omitempty"`
	Special string `json:"__special,omitempty"`
}

Defines resources used by a stage.

func (*JobResources) ToMap

func (self *JobResources) ToMap() ArgumentMap

type LocalJobManager

type LocalJobManager struct {
	// contains filtered or unexported fields
}

func NewLocalJobManager

func NewLocalJobManager(userMaxCores int, userMaxMemGB int,
	debug bool, limitLoadavg bool, clusterMode bool) *LocalJobManager

func (*LocalJobManager) Enqueue

func (self *LocalJobManager) Enqueue(shellCmd string, argv []string,
	envs map[string]string, metadata *Metadata, threads int, memGB int,
	fqname string, retries int, waitTime int, localpreflight bool)

func (*LocalJobManager) GetMaxCores

func (self *LocalJobManager) GetMaxCores() int

func (*LocalJobManager) GetMaxMemGB

func (self *LocalJobManager) GetMaxMemGB() int

func (*LocalJobManager) GetSettings

func (self *LocalJobManager) GetSettings() *JobManagerSettings

func (*LocalJobManager) GetSystemReqs

func (self *LocalJobManager) GetSystemReqs(threads int, memGB int) (int, int)

func (*LocalJobManager) HandleSignal

func (self *LocalJobManager) HandleSignal(sig os.Signal)

type MaxJobsSemaphore

type MaxJobsSemaphore struct {
	Limit int
	// contains filtered or unexported fields
}

A semaphore limiting the number of unique jobs which are active at a time.

func NewMaxJobsSemaphore

func NewMaxJobsSemaphore(limit int) *MaxJobsSemaphore

func (*MaxJobsSemaphore) Acquire

func (self *MaxJobsSemaphore) Acquire(metadata *Metadata) bool

Wait for this semaphore to have capacity to run this metadata object.

If the object is not in the queued or waiting states, it was canceled between when the job was enqueued and now.

If the object was already in the semaphore, as may be the case in the event of automatic restart if the failure was missed for whatever reason, then we only treat the metadata object as having one job running ever.
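The "unique jobs" behavior described above can be sketched as a semaphore keyed by job identity. This is a simplified illustration, not the package's implementation: jobSemaphore and its string ids are hypothetical, and the check of queued/waiting states and the FindDone sweep are left out.

```go
package main

import (
	"fmt"
	"sync"
)

// jobSemaphore (hypothetical) limits how many distinct job ids may hold a
// slot at the same time.
type jobSemaphore struct {
	mu      sync.Mutex
	cond    *sync.Cond
	limit   int
	running map[string]struct{}
}

func newJobSemaphore(limit int) *jobSemaphore {
	s := &jobSemaphore{limit: limit, running: make(map[string]struct{})}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// Acquire blocks until there is capacity for id. An id that already holds
// a slot returns immediately and is still counted only once.
func (s *jobSemaphore) Acquire(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for {
		if _, ok := s.running[id]; ok {
			return
		}
		if len(s.running) < s.limit {
			s.running[id] = struct{}{}
			return
		}
		s.cond.Wait()
	}
}

// Release frees the slot held by id, if any, and wakes waiters.
func (s *jobSemaphore) Release(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.running, id)
	s.cond.Broadcast()
}

// Current returns the number of ids holding a slot.
func (s *jobSemaphore) Current() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.running)
}

func main() {
	sem := newJobSemaphore(2)
	sem.Acquire("job-a")
	sem.Acquire("job-a") // restart of the same job: no extra slot
	sem.Acquire("job-b")
	fmt.Println("running:", sem.Current())
	sem.Release("job-a")
	fmt.Println("running:", sem.Current())
}
```

Keying the slots by identity is what keeps a restarted job from consuming two slots.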

func (*MaxJobsSemaphore) Current

func (self *MaxJobsSemaphore) Current() int

func (*MaxJobsSemaphore) FindDone

func (self *MaxJobsSemaphore) FindDone()

Check that each metadata object which holds the semaphore is still actually running.

func (*MaxJobsSemaphore) Release

func (self *MaxJobsSemaphore) Release(metadata *Metadata)

type Metadata

type Metadata struct {
	// contains filtered or unexported fields
}

Manages interaction with the filesystem-based "database" of Martian metadata for a pipeline node (pipeline, subpipeline, fork, stage, split, chunk, join).

func NewMetadata

func NewMetadata(fqname string, p string) *Metadata

func NewMetadataRunWithJournalPath

func NewMetadataRunWithJournalPath(fqname string, p string, filesPath string, journalPath string, runType string) *Metadata

func NewMetadataWithJournalPath

func NewMetadataWithJournalPath(fqname string, p string, journalPath string) *Metadata

func (*Metadata) AppendAlarm

func (self *Metadata) AppendAlarm(text string) error

Add text to the Alarm file for this node.

func (*Metadata) FilePath

func (self *Metadata) FilePath(name string) string

Get the absolute path to the named file in the stage's files path.

func (*Metadata) FilesPath

func (self *Metadata) FilesPath() string

The path containing output files for this node.

func (*Metadata) MetadataFilePath

func (self *Metadata) MetadataFilePath(name MetadataFileName) string

Get the absolute path to the given metadata file.

func (*Metadata) ReadInto

func (self *Metadata) ReadInto(name MetadataFileName, target interface{}) error

Reads the content of the given metadata file and deserializes it into the given object.

func (*Metadata) UpdateJournal

func (self *Metadata) UpdateJournal(name MetadataFileName) error

Writes a journal file corresponding to the given metadata file. This is used to notify the runtime of the existence of a new or updated file.

The journal is a performance optimization to prevent the runtime from needing to constantly scan the entire database for changes. Instead, it only scans the journal. This means that when a metadata file is created or modified (except by the runtime itself), the change won't be "noticed" until the journal is updated.

func (*Metadata) Write

func (self *Metadata) Write(name MetadataFileName, object interface{}) error

Serializes the given object and writes it to the given metadata file.

func (*Metadata) WriteAtomic

func (self *Metadata) WriteAtomic(name MetadataFileName, object interface{}) error

Serializes the given object and writes it to the given metadata file in a way that ensures the file is updated atomically and will never be observed in a partially-written form.
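A common way to get this guarantee on POSIX filesystems is to write a temporary file in the same directory and then rename it over the target. The sketch below shows that general technique; it is an assumption that WriteAtomic works this way, and writeAtomic, demo, and the "_outs" file name are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// writeAtomic (hypothetical) serializes object as JSON and replaces path
// with the result, so readers see either the old or the new content.
func writeAtomic(path string, object interface{}) error {
	data, err := json.Marshal(object)
	if err != nil {
		return err
	}
	// The temporary file must be on the same filesystem as the target for
	// the rename to be atomic, so it is created in the same directory.
	tmp, err := os.CreateTemp(filepath.Dir(path), filepath.Base(path)+".tmp*")
	if err != nil {
		return err
	}
	tmpName := tmp.Name()
	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		os.Remove(tmpName)
		return err
	}
	if err := tmp.Close(); err != nil {
		os.Remove(tmpName)
		return err
	}
	if err := os.Rename(tmpName, path); err != nil {
		os.Remove(tmpName)
		return err
	}
	return nil
}

// demo writes a file twice in a scratch directory and returns its final
// content.
func demo() (string, error) {
	dir, err := os.MkdirTemp("", "atomic-demo")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "_outs")
	if err := writeAtomic(path, map[string]int{"a": 0}); err != nil {
		return "", err
	}
	if err := writeAtomic(path, map[string]int{"a": 1}); err != nil {
		return "", err
	}
	data, err := os.ReadFile(path)
	return string(data), err
}

func main() {
	content, err := demo()
	fmt.Println(content, err)
}
```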

func (*Metadata) WriteRaw

func (self *Metadata) WriteRaw(name MetadataFileName, text string) error

Writes the given raw data into the given metadata file.

func (*Metadata) WriteTime

func (self *Metadata) WriteTime(name MetadataFileName) error

Writes the current timestamp into the given metadata file. Generally used for sentinel files.

type MetadataFileName

type MetadataFileName string
const (
	AlarmFile      MetadataFileName = "alarm"
	ArgsFile       MetadataFileName = "args"
	Assert         MetadataFileName = "assert"
	ChunkDefsFile  MetadataFileName = "chunk_defs"
	ChunkOutsFile  MetadataFileName = "chunk_outs"
	CompleteFile   MetadataFileName = "complete"
	Errors         MetadataFileName = "errors"
	FinalState     MetadataFileName = "finalstate"
	Heartbeat      MetadataFileName = "heartbeat"
	InvocationFile MetadataFileName = "invocation"
	JobId          MetadataFileName = "jobid"
	JobInfoFile    MetadataFileName = "jobinfo"
	JobModeFile    MetadataFileName = "jobmode"
	Lock           MetadataFileName = "lock"
	LogFile        MetadataFileName = "log"
	MetadataZip    MetadataFileName = "metadata.zip"
	MroSourceFile  MetadataFileName = "mrosource"
	OutsFile       MetadataFileName = "outs"
	Perf           MetadataFileName = "perf"
	ProgressFile   MetadataFileName = "progress"
	QueuedLocally  MetadataFileName = "queued_locally"
	Stackvars      MetadataFileName = "stackvars"
	StageDefsFile  MetadataFileName = "stage_defs"
	StdErr         MetadataFileName = "stderr"
	StdOut         MetadataFileName = "stdout"
	TagsFile       MetadataFileName = "tags"
	TimestampFile  MetadataFileName = "timestamp"
	UiPort         MetadataFileName = "uiport"
	UuidFile       MetadataFileName = "uuid"
	VdrKill        MetadataFileName = "vdrkill"
	VersionsFile   MetadataFileName = "versions"
	DisabledFile   MetadataFileName = "disabled"
)
const AnyFile MetadataFileName = "*"

func (MetadataFileName) FileName

func (self MetadataFileName) FileName() string

type MetadataInfo

type MetadataInfo struct {
	// The filesystem path containing the metadata files.
	Path string `json:"path"`

	// The metadata file names which exist for this object.
	Names []string `json:"names"`
}

Basic exportable information from a metadata object.

type MetadataState

type MetadataState string
const (
	Complete      MetadataState = "complete"
	Failed        MetadataState = "failed"
	DisabledState MetadataState = "disabled"
	Running       MetadataState = "running"
	Queued        MetadataState = "queued"
	Ready         MetadataState = "ready"
	Waiting       MetadataState = ""
	ForkWaiting   MetadataState = "waiting"
)

func (MetadataState) HasPrefix

func (self MetadataState) HasPrefix(prefix string) bool

func (MetadataState) IsQueued

func (self MetadataState) IsQueued() bool

func (MetadataState) IsRunning

func (self MetadataState) IsRunning() bool

func (MetadataState) Prefixed

func (self MetadataState) Prefixed(prefix string) MetadataState

type MroCache

type MroCache struct {
	// contains filtered or unexported fields
}

func NewMroCache

func NewMroCache() *MroCache

func (*MroCache) CacheMros

func (self *MroCache) CacheMros(mroPaths []string)

func (*MroCache) GetCallable

func (self *MroCache) GetCallable(mroPaths []string, name string) (syntax.Callable, error)

func (*MroCache) GetPipelines

func (self *MroCache) GetPipelines() []string

type Nodable

type Nodable interface {

	// Gets the node's fully-qualified name.
	GetFQName() string

	// Returns the set of nodes which serve as prerequisites to this node,
	// as a mapping from fully-qualified name to node.
	GetPrenodes() map[string]Nodable

	// Returns the set of nodes which are able to run once this node
	// has completed.
	GetPostNodes() map[string]Nodable

	// Gets the mro AST object, if any, which will be executed for this node.
	Callable() syntax.Callable
	// contains filtered or unexported methods
}

type Node

type Node struct {
	// contains filtered or unexported fields
}

Represents a node in the pipeline graph.

func NewNode

func NewNode(parent Nodable, kind string, callStm *syntax.CallStm, callables *syntax.Callables) *Node

func (*Node) Callable

func (self *Node) Callable() syntax.Callable

func (*Node) FindNodeByName

func (n *Node) FindNodeByName(name string, out **Node)

Find a node by a name. |name| may be a "partially" qualified pipestance name (see partiallyQualifiedName()) or just a stage name. If it is a stage name, and that name occurs multiple times in the pipeline, we will panic().

func (*Node) GetFQName

func (self *Node) GetFQName() string

func (*Node) GetPostNodes

func (self *Node) GetPostNodes() map[string]Nodable

func (*Node) GetPrenodes

func (self *Node) GetPrenodes() map[string]Nodable

func (*Node) VDRMurdered

func (n *Node) VDRMurdered() bool

Return true if the data inside a node was VDR'ed.

type NodeByteStamp

type NodeByteStamp struct {
	Timestamp   time.Time `json:"ts"`
	Bytes       int64     `json:"bytes"`
	Description string    `json:"desc"`
}

type NodeErrorInfo

type NodeErrorInfo struct {
	FQname  string `json:"fqname"`
	Path    string `json:"path"`
	Summary string `json:"summary,omitempty"`
	Log     string `json:"log,omitempty"`
}

Encapsulates information about a node failure.

type NodeInfo

type NodeInfo struct {
	Name          string               `json:"name"`
	Fqname        string               `json:"fqname"`
	Type          string               `json:"type"`
	Path          string               `json:"path"`
	State         MetadataState        `json:"state"`
	Metadata      *MetadataInfo        `json:"metadata"`
	SweepBindings []*BindingInfo       `json:"sweepbindings"`
	Forks         []*ForkInfo          `json:"forks"`
	Edges         []EdgeInfo           `json:"edges"`
	StagecodeLang syntax.StageCodeType `json:"stagecodeLang"`
	StagecodeCmd  string               `json:"stagecodeCmd"`
	Error         *NodeErrorInfo       `json:"error,omitempty"`
}

type NodePerfInfo

type NodePerfInfo struct {
	Name      string           `json:"name"`
	Fqname    string           `json:"fqname"`
	Type      string           `json:"type"`
	Forks     []*ForkPerfInfo  `json:"forks"`
	MaxBytes  int64            `json:"maxbytes"`
	BytesHist []*NodeByteStamp `json:"bytehist"`
	HighMem   *ObservedMemory  `json:"highmem,omitempty"`
}

type ObservedMemory

type ObservedMemory struct {
	Rss    int64 `json:"rss"`
	Shared int64 `json:"shared"`
	Vmem   int64 `json:"vmem"`
	Text   int64 `json:"text"`
	Stack  int64 `json:"stack"`
	Procs  int   `json:"proc_count"`
}

Current observed memory usage.

func GetProcessTreeMemory

func GetProcessTreeMemory(pid int, includeParent bool, io map[int]*IoAmount) (mem ObservedMemory, err error)

Gets the total memory usage for the given process and all of its children. Only errors getting the first process's memory, or the set of children for that process, are reported. includeParent specifies whether the top-level pid is included in the total.

func GetRunningMemory

func GetRunningMemory(pid int) (ObservedMemory, error)

Gets the total vmem and rss memory of a running process by pid.

func (*ObservedMemory) Add

func (self *ObservedMemory) Add(other ObservedMemory)

Add other to this.

func (*ObservedMemory) IncreaseRusage

func (self *ObservedMemory) IncreaseRusage(other *RusageInfo)

Increase this value to the max RSS reported by getrusage, if it is higher.

func (*ObservedMemory) IncreaseTo

func (self *ObservedMemory) IncreaseTo(other ObservedMemory)

Increase this value to max(this,other).

func (*ObservedMemory) IsZero

func (self *ObservedMemory) IsZero() bool

func (*ObservedMemory) RssKb

func (self *ObservedMemory) RssKb() int

func (*ObservedMemory) VmemKb

func (self *ObservedMemory) VmemKb() int

type PerfInfo

type PerfInfo struct {
	NumJobs         int       `json:"num_jobs"`
	NumThreads      int       `json:"num_threads"`
	Duration        float64   `json:"duration"`
	CoreHours       float64   `json:"core_hours"`
	MaxRss          int       `json:"maxrss"`
	MaxVmem         int       `json:"maxvmem"`
	InBlocks        int       `json:"in_blocks"`
	OutBlocks       int       `json:"out_blocks"`
	TotalBlocks     int       `json:"total_blocks"`
	InBlocksRate    float64   `json:"in_blocks_rate"`
	OutBlocksRate   float64   `json:"out_blocks_rate"`
	TotalBlocksRate float64   `json:"total_blocks_rate"`
	InBytes         int64     `json:"in_bytes"`
	OutBytes        int64     `json:"out_bytes"`
	InBytesRate     float64   `json:"in_bytes_rate"`
	OutBytesRate    float64   `json:"out_bytes_rate"`
	InBytesPeak     float64   `json:"in_bytes_peak"`
	OutBytesPeak    float64   `json:"out_bytes_peak"`
	Start           time.Time `json:"start"`
	End             time.Time `json:"end"`
	WallTime        float64   `json:"walltime"`
	UserTime        float64   `json:"usertime"`
	SystemTime      float64   `json:"systemtime"`
	TotalFiles      uint      `json:"total_files"`
	TotalBytes      uint64    `json:"total_bytes"`
	OutputFiles     uint      `json:"output_files"`
	OutputBytes     uint64    `json:"output_bytes"`
	VdrFiles        uint      `json:"vdr_files"`
	VdrBytes        uint64    `json:"vdr_bytes"`

	// Deviation for a single job is deviation over time as measured by mrjob.
	// For node aggregates, it's the deviation between child nodes.
	InBytesDev  float64 `json:"in_bytes_dev"`
	OutBytesDev float64 `json:"out_bytes_dev"`
}

func ComputeStats

func ComputeStats(perfInfos []*PerfInfo, outputPaths []string, vdrKillReport *VDRKillReport) *PerfInfo

type PerfInfoByStart

type PerfInfoByStart []*PerfInfo

func (PerfInfoByStart) Len

func (self PerfInfoByStart) Len() int

func (PerfInfoByStart) Less

func (self PerfInfoByStart) Less(i, j int) bool

func (PerfInfoByStart) Swap

func (self PerfInfoByStart) Swap(i, j int)
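
The Len, Less and Swap methods make PerfInfoByStart usable with the standard library's sort package. The sketch below shows the pattern with local stand-in types (`perfInfo` and `perfInfoByStart` are hypothetical). It assumes Less orders by ascending Start time, which the type name suggests but the documentation does not state.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// perfInfo is a local stand-in carrying only the field needed for ordering.
type perfInfo struct {
	Name  string
	Start time.Time
}

// perfInfoByStart implements sort.Interface, ordering by Start (assumed).
type perfInfoByStart []*perfInfo

func (s perfInfoByStart) Len() int           { return len(s) }
func (s perfInfoByStart) Less(i, j int) bool { return s[i].Start.Before(s[j].Start) }
func (s perfInfoByStart) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }

// sortedNames sorts infos in place by start time and returns the names in order.
func sortedNames(infos []*perfInfo) []string {
	sort.Sort(perfInfoByStart(infos))
	names := make([]string, len(infos))
	for i, p := range infos {
		names[i] = p.Name
	}
	return names
}

// sampleInfos returns three entries deliberately out of start order.
func sampleInfos() []*perfInfo {
	base := time.Date(2018, time.April, 30, 12, 0, 0, 0, time.UTC)
	return []*perfInfo{
		{Name: "join", Start: base.Add(2 * time.Minute)},
		{Name: "split", Start: base},
		{Name: "chunk", Start: base.Add(time.Minute)},
	}
}

func main() {
	fmt.Println(sortedNames(sampleInfos()))
}
```

The same three-method pattern applies to the other sortable slice types in this package, such as StorageEventByTimestamp and VDRByTimestamp.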

type Pipestance

type Pipestance struct {
	// contains filtered or unexported fields
}

Encapsulates information about an instance of a running (or failed, or completed) pipeline.

func NewPipestance

func NewPipestance(parent Nodable, callStm *syntax.CallStm, callables *syntax.Callables) (*Pipestance, error)

func (*Pipestance) BlacklistMRTNodes

func (self *Pipestance) BlacklistMRTNodes(namesToBlacklist []string, nodemap map[*Node]*Node) error

This marks a set of nodes, as well as any nodes dependent on them, as blacklisted. A node is dependent on another node if it uses data that the other node provides (is in postnodes) or if it is a parent of that node.

func (*Pipestance) Callable

func (self *Pipestance) Callable() syntax.Callable

func (*Pipestance) CheckHeartbeats

func (self *Pipestance) CheckHeartbeats()

func (*Pipestance) ClearUiPort

func (self *Pipestance) ClearUiPort() error

func (*Pipestance) ComputeDiskUsage

func (self *Pipestance) ComputeDiskUsage(nodePerf *NodePerfInfo) *NodePerfInfo

func (*Pipestance) GetFQName

func (self *Pipestance) GetFQName() string

func (*Pipestance) GetFailedNodes

func (self *Pipestance) GetFailedNodes() []*Node

func (*Pipestance) GetFatalError

func (self *Pipestance) GetFatalError() (string, bool, string, string, MetadataFileName, []string)

func (*Pipestance) GetInvocation

func (self *Pipestance) GetInvocation() interface{}

func (*Pipestance) GetPath

func (self *Pipestance) GetPath() string

func (*Pipestance) GetPname

func (self *Pipestance) GetPname() string

func (*Pipestance) GetPostNodes

func (self *Pipestance) GetPostNodes() map[string]Nodable

func (*Pipestance) GetPrenodes

func (self *Pipestance) GetPrenodes() map[string]Nodable

func (*Pipestance) GetPsid

func (self *Pipestance) GetPsid() string

func (*Pipestance) GetState

func (self *Pipestance) GetState() MetadataState

func (*Pipestance) GetTimestamp

func (self *Pipestance) GetTimestamp() string

func (*Pipestance) GetUuid

func (self *Pipestance) GetUuid() (string, error)

func (*Pipestance) GetVersions

func (self *Pipestance) GetVersions() (string, string, error)

func (*Pipestance) HandleSignal

func (self *Pipestance) HandleSignal(sig os.Signal)

func (*Pipestance) Immortalize

func (self *Pipestance) Immortalize(force bool) error

Generate the final state file for the pipestance and zip the content up for posterity.

Unless force is true, this is only permitted for locked pipestances.

func (*Pipestance) IsErrorTransient

func (self *Pipestance) IsErrorTransient() (bool, string)

Returns true if there is no error, or if the error is one we do not expect to recur when the pipeline is rerun. Also returns the log message from the first error found, if any.

func (*Pipestance) Kill

func (self *Pipestance) Kill()

func (*Pipestance) KillWithMessage

func (self *Pipestance) KillWithMessage(message string)

func (*Pipestance) LoadMetadata

func (self *Pipestance) LoadMetadata()

func (*Pipestance) Lock

func (self *Pipestance) Lock() error

func (*Pipestance) OnFinishHook

func (self *Pipestance) OnFinishHook()

Run a script whenever a pipestance finishes.

func (*Pipestance) PostProcess

func (self *Pipestance) PostProcess()

func (*Pipestance) RecordUiPort

func (self *Pipestance) RecordUiPort(url string) error

func (*Pipestance) RefreshState

func (self *Pipestance) RefreshState()

func (*Pipestance) Reset

func (self *Pipestance) Reset() error

func (*Pipestance) RestartLocalJobs

func (self *Pipestance) RestartLocalJobs(jobMode string) error

Resets local nodes which are queued or are running with a PID that is not a running job. If |jobMode| is "local" then all nodes are treated as local. This is necessary when, e.g., mrp is restarted in local mode after ctrl-C kills it and all of its child processes.

func (*Pipestance) RestartRunningNodes

func (self *Pipestance) RestartRunningNodes(jobMode string) error

func (*Pipestance) Serialize

func (self *Pipestance) Serialize(name MetadataFileName) interface{}

func (*Pipestance) SerializePerf

func (self *Pipestance) SerializePerf() []*NodePerfInfo

func (*Pipestance) SerializeState

func (self *Pipestance) SerializeState() []*NodeInfo

func (*Pipestance) SetUuid

func (self *Pipestance) SetUuid(uuid string) error

func (*Pipestance) StepNodes

func (self *Pipestance) StepNodes() bool

Process state updates for nodes. Returns true if there was a change in state which would make it productive to call StepNodes again immediately.
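
The return value suggests a driving loop that keeps stepping while progress is being made. The following is a minimal sketch of that idea only. The `stepper` interface and `fakePipestance` type are hypothetical stand-ins, and nothing here reflects how mrp actually schedules its calls.

```go
package main

import "fmt"

// stepper captures the one method this sketch needs.
type stepper interface {
	StepNodes() bool
}

// fakePipestance reports a state change for a fixed number of steps.
type fakePipestance struct {
	pending int
}

func (f *fakePipestance) StepNodes() bool {
	if f.pending == 0 {
		return false
	}
	f.pending--
	return true
}

// stepUntilIdle calls StepNodes repeatedly while it reports that another
// immediate call would be productive, and returns the number of calls made.
func stepUntilIdle(s stepper) int {
	calls := 1
	for s.StepNodes() {
		calls++
	}
	return calls
}

func main() {
	ps := &fakePipestance{pending: 3}
	fmt.Println("calls made:", stepUntilIdle(ps))
}
```

A real runner would presumably wait or poll for job updates once StepNodes returns false, rather than spinning.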

func (*Pipestance) Unlock

func (self *Pipestance) Unlock()

func (*Pipestance) VDRKill

func (self *Pipestance) VDRKill() *VDRKillReport

func (*Pipestance) VerifyJobMode

func (self *Pipestance) VerifyJobMode() error

func (*Pipestance) ZipMetadata

func (self *Pipestance) ZipMetadata(zipPath string) error

type PipestanceCopyingError

type PipestanceCopyingError struct {
	Psid string
}

PipestanceCopyingError

func (*PipestanceCopyingError) Error

func (self *PipestanceCopyingError) Error() string

type PipestanceExistsError

type PipestanceExistsError struct {
	Psid string
}

PipestanceExistsError

func (*PipestanceExistsError) Error

func (self *PipestanceExistsError) Error() string

type PipestanceFactory

type PipestanceFactory interface {
	ReattachToPipestance() (*Pipestance, error)
	InvokePipeline() (*Pipestance, error)
}

Encapsulates the information needed to instantiate a pipestance, either by creating one or reattaching to an existing one.
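
One plausible way to combine the two methods is to try invoking first and fall back to reattaching when the pipestance already exists. The sketch below uses local stand-ins for Pipestance, PipestanceExistsError and the factory interface (all lowercase names are hypothetical). The assumption that InvokePipeline reports an existing pipestance through a PipestanceExistsError is not confirmed by this page.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's types.
type pipestance struct{ psid string }

type pipestanceExistsError struct{ Psid string }

func (e *pipestanceExistsError) Error() string {
	return fmt.Sprintf("pipestance '%s' already exists", e.Psid)
}

type pipestanceFactory interface {
	ReattachToPipestance() (*pipestance, error)
	InvokePipeline() (*pipestance, error)
}

// invokeOrReattach tries to create a pipestance and reattaches if one
// already exists. The bool reports whether a reattach happened.
func invokeOrReattach(f pipestanceFactory) (*pipestance, bool, error) {
	ps, err := f.InvokePipeline()
	if err == nil {
		return ps, false, nil
	}
	var exists *pipestanceExistsError
	if errors.As(err, &exists) {
		ps, err = f.ReattachToPipestance()
		return ps, true, err
	}
	return nil, false, err
}

// fakeFactory simulates a pipestance directory that may already exist.
type fakeFactory struct {
	psid   string
	exists bool
}

func (f *fakeFactory) InvokePipeline() (*pipestance, error) {
	if f.exists {
		return nil, &pipestanceExistsError{Psid: f.psid}
	}
	return &pipestance{psid: f.psid}, nil
}

func (f *fakeFactory) ReattachToPipestance() (*pipestance, error) {
	return &pipestance{psid: f.psid}, nil
}

func main() {
	ps, reattached, err := invokeOrReattach(&fakeFactory{psid: "sample1", exists: true})
	fmt.Println(ps.psid, reattached, err)
}
```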

func NewRuntimePipestanceFactory

func NewRuntimePipestanceFactory(rt *Runtime,
	invocationSrc string,
	invocationPath string,
	psid string,
	mroPaths []string,
	pipestancePath string,
	mroVersion string,
	envs map[string]string,
	checkSrc bool,
	readOnly bool,
	tags []string) PipestanceFactory

type PipestanceInvocationError

type PipestanceInvocationError struct {
	Psid           string
	InvocationPath string
}

PipestanceInvocationError

func (*PipestanceInvocationError) Error

func (self *PipestanceInvocationError) Error() string

type PipestanceJobModeError

type PipestanceJobModeError struct {
	Psid    string
	JobMode string
}

PipestanceJobModeError

func (*PipestanceJobModeError) Error

func (self *PipestanceJobModeError) Error() string

type PipestanceLockedError

type PipestanceLockedError struct {
	Psid           string
	PipestancePath string
}

PipestanceLockedError

func (*PipestanceLockedError) Error

func (self *PipestanceLockedError) Error() string

type PipestanceNotExistsError

type PipestanceNotExistsError struct {
	Psid string
}

PipestanceNotExistsError

func (*PipestanceNotExistsError) Error

func (self *PipestanceNotExistsError) Error() string

type PipestanceNotFailedError

type PipestanceNotFailedError struct {
	Psid string
}

PipestanceNotFailedError

func (*PipestanceNotFailedError) Error

func (self *PipestanceNotFailedError) Error() string

type PipestanceNotRunningError

type PipestanceNotRunningError struct {
	Psid string
}

PipestanceNotRunningError

func (*PipestanceNotRunningError) Error

func (self *PipestanceNotRunningError) Error() string

type PipestanceOverrides

type PipestanceOverrides struct {
	// contains filtered or unexported fields
}

func ReadOverrides

func ReadOverrides(path string) (*PipestanceOverrides, error)

Read the overrides file and produce a pipestance overrides object.

func (*PipestanceOverrides) GetOverride

func (self *PipestanceOverrides) GetOverride(node *Node, what string, def interface{}) interface{}

Compute the value to use for a stage option when that value might be overridden.

|node| is the Node object for the stage

|what| is the name of the override we're considering

|def| is the default value to use if the value is not overridden
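
The lookup-with-default pattern, combined with the kind check implied by LegalOverrideTypes, might look roughly like the sketch below. The JSON layout (a map from stage name to option map), the stage name `ID.PIPELINE.STAGE`, and the helper names are assumptions made for illustration. The real overrides file format and resolution rules are not described on this page.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// legalKinds mirrors a few entries of LegalOverrideTypes. JSON numbers
// decode to float64 when unmarshaled into an interface{}.
var legalKinds = map[string]reflect.Kind{
	"force_volatile": reflect.Bool,
	"chunk.threads":  reflect.Float64,
	"chunk.mem_gb":   reflect.Float64,
}

// stageOverrides maps a stage name to its option overrides (assumed layout).
type stageOverrides map[string]map[string]interface{}

// parseOverrides decodes the JSON and rejects unknown or mistyped options.
func parseOverrides(data []byte) (stageOverrides, error) {
	var parsed stageOverrides
	if err := json.Unmarshal(data, &parsed); err != nil {
		return nil, err
	}
	for stage, opts := range parsed {
		for what, val := range opts {
			want, ok := legalKinds[what]
			if !ok {
				return nil, fmt.Errorf("%s: unknown override %q", stage, what)
			}
			if val == nil || reflect.TypeOf(val).Kind() != want {
				return nil, fmt.Errorf("%s: override %q must be of kind %v", stage, what, want)
			}
		}
	}
	return parsed, nil
}

// getOverride returns the overridden value for a stage option, or def
// when no override is present.
func (o stageOverrides) getOverride(fqname, what string, def interface{}) interface{} {
	if val, ok := o[fqname][what]; ok {
		return val
	}
	return def
}

func main() {
	data := []byte(`{"ID.PIPELINE.STAGE": {"chunk.mem_gb": 8, "force_volatile": true}}`)
	o, err := parseOverrides(data)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(o.getOverride("ID.PIPELINE.STAGE", "chunk.mem_gb", 4.0))
	fmt.Println(o.getOverride("ID.PIPELINE.STAGE", "chunk.threads", 1.0))
}
```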

type PipestancePathError

type PipestancePathError struct {
	Path string
}

PipestancePathError

func (*PipestancePathError) Error

func (self *PipestancePathError) Error() string

type PipestanceSetup

type PipestanceSetup struct {
	Srcpath        string            // Path to the mro invocation file
	Psid           string            // pipestance ID
	PipestancePath string            // Path to put this pipestance
	MroPaths       []string          // Where to look for MROs
	MroVersion     string            // mro version
	Envs           map[string]string // mro environment vars to pass through
	JobMode        string            // jobmode to use
}

PipestanceSetup defines the parameters we need to start a pipestance. It encapsulates the arguments to InvokePipeline and friends.

type PipestanceSizeError

type PipestanceSizeError struct {
	Psid string
}

PipestanceSizeError

func (*PipestanceSizeError) Error

func (self *PipestanceSizeError) Error() string

type PipestanceWipeError

type PipestanceWipeError struct {
	Psid string
}

PipestanceWipeError

func (*PipestanceWipeError) Error

func (self *PipestanceWipeError) Error() string

type ProfileMode

type ProfileMode string

Defines available profiling modes for stage code.

const (
	DisableProfile    ProfileMode = "disable"
	CpuProfile        ProfileMode = "cpu"
	MemProfile        ProfileMode = "mem"
	LineProfile       ProfileMode = "line"
	PyflameProfile    ProfileMode = "pyflame"
	PerfRecordProfile ProfileMode = "perf"
)
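
Because ProfileMode is a plain string type, any string converts to it without complaint. A caller that accepts the mode from a flag or config file may want to validate it first. The helper below is a hypothetical sketch using local copies of the constants, not a function provided by this package.

```go
package main

import "fmt"

// profileMode and its constants are local copies of the values listed above.
type profileMode string

const (
	disableProfile    profileMode = "disable"
	cpuProfile        profileMode = "cpu"
	memProfile        profileMode = "mem"
	lineProfile       profileMode = "line"
	pyflameProfile    profileMode = "pyflame"
	perfRecordProfile profileMode = "perf"
)

// parseProfileMode returns the mode named by s, or an error if s is not
// one of the known modes.
func parseProfileMode(s string) (profileMode, error) {
	switch m := profileMode(s); m {
	case disableProfile, cpuProfile, memProfile,
		lineProfile, pyflameProfile, perfRecordProfile:
		return m, nil
	}
	return disableProfile, fmt.Errorf("unknown profile mode %q", s)
}

func main() {
	mode, err := parseProfileMode("cpu")
	fmt.Println(mode, err)

	_, err = parseProfileMode("gpu")
	fmt.Println(err)
}
```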

type PythonInfo

type PythonInfo struct {
	BinPath string `json:"binpath"`
	Version string `json:"version"`
}

type RemoteJobManager

type RemoteJobManager struct {
	// contains filtered or unexported fields
}

func NewRemoteJobManager

func NewRemoteJobManager(jobMode string, memGBPerCore int, maxJobs int, jobFreqMillis int,
	jobResources string, debug bool) *RemoteJobManager

func (*RemoteJobManager) GetMaxCores

func (self *RemoteJobManager) GetMaxCores() int

func (*RemoteJobManager) GetMaxMemGB

func (self *RemoteJobManager) GetMaxMemGB() int

func (*RemoteJobManager) GetSettings

func (self *RemoteJobManager) GetSettings() *JobManagerSettings

func (*RemoteJobManager) GetSystemReqs

func (self *RemoteJobManager) GetSystemReqs(threads int, memGB int) (int, int)

type ResourceSemaphore

type ResourceSemaphore struct {
	Name string
	// contains filtered or unexported fields
}

A semaphore type which allows for the maximum size of things entering the semaphore to be dynamically reduced based on observed resource availability.

func NewResourceSemaphore

func NewResourceSemaphore(size int64, name string) *ResourceSemaphore

Create a new semaphore with the given capacity.

func (*ResourceSemaphore) Acquire

func (self *ResourceSemaphore) Acquire(n int64) error

Reserve n of the resource. Block until it is available. Returns an error if more was requested than is possible to serve.
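
The blocking and error behavior described here can be sketched with a mutex and a condition variable. This is a simplified illustration under stated assumptions, not the package's implementation: the `sketchSemaphore` type and its methods are hypothetical, there is no tracking of unaccounted usage, and waiters are not queued in any particular order.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errTooLarge = errors.New("request exceeds semaphore capacity")

// sketchSemaphore is a weighted semaphore whose capacity can change at runtime.
type sketchSemaphore struct {
	mu       sync.Mutex
	cond     *sync.Cond
	size     int64 // current capacity, may be lowered or raised
	reserved int64 // amount currently held by callers
}

func newSketchSemaphore(size int64) *sketchSemaphore {
	s := &sketchSemaphore{size: size}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// acquire blocks until n units are available. It fails if n can never be
// served at the current capacity.
func (s *sketchSemaphore) acquire(n int64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	for {
		if n > s.size {
			return errTooLarge
		}
		if s.size-s.reserved >= n {
			s.reserved += n
			return nil
		}
		s.cond.Wait()
	}
}

// release returns n units and wakes any waiters.
func (s *sketchSemaphore) release(n int64) {
	s.mu.Lock()
	s.reserved -= n
	s.mu.Unlock()
	s.cond.Broadcast()
}

// updateSize changes the capacity and wakes waiters so they can re-check.
func (s *sketchSemaphore) updateSize(n int64) {
	s.mu.Lock()
	s.size = n
	s.mu.Unlock()
	s.cond.Broadcast()
}

// available reports capacity minus reservations. It can be negative after
// the capacity has been reduced below what is already reserved.
func (s *sketchSemaphore) available() int64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.size - s.reserved
}

func main() {
	sem := newSketchSemaphore(8)
	fmt.Println(sem.acquire(6), sem.available())

	done := make(chan error)
	go func() { done <- sem.acquire(4) }() // waits until 6 units are released
	sem.release(6)
	fmt.Println(<-done, sem.available())

	sem.updateSize(3) // shrink below the size of a 4-unit request
	fmt.Println(sem.acquire(4))
}
```

Re-checking the capacity inside the wait loop matters in this sketch: a waiter whose request was feasible when it started must fail, not block forever, if the capacity later shrinks below the requested amount.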

func (*ResourceSemaphore) Available

func (self *ResourceSemaphore) Available() int64

Get the current amount of available resources.

func (*ResourceSemaphore) CurrentSize

func (self *ResourceSemaphore) CurrentSize() int64

Get the current amount of resources which are reservable (including those already reserved).

func (*ResourceSemaphore) InUse

func (self *ResourceSemaphore) InUse() int64

Get the current amount of resources in use. This includes both reserved resources and resources whose usage is unaccounted for.

func (*ResourceSemaphore) QueueLength

func (self *ResourceSemaphore) QueueLength() int

Get the number of items waiting on the semaphore.

func (*ResourceSemaphore) Release

func (self *ResourceSemaphore) Release(n int64)

Release n of the resource.

func (*ResourceSemaphore) Reserved

func (self *ResourceSemaphore) Reserved() int64

Get the current amount of explicitly reserved resources.

func (*ResourceSemaphore) UpdateActual

func (self *ResourceSemaphore) UpdateActual(n int64) int64

Set the current actual availability, e.g. by checking free memory. Returns the difference between the unreserved and actual amounts. A negative return value indicates either that a job is using more memory than it reserved, or that some other process on the system is using memory.

func (*ResourceSemaphore) UpdateFreeUsed

func (self *ResourceSemaphore) UpdateFreeUsed(free, usedReservation int64) int64

Set the current actual availability based on the current free amount and the amount of the reserved usage which is actually in use. This handles the case where, for example, 30 of 32 GB of memory are reserved, but only 16GB has actually been committed so far, so 16GB appears to be free.

func (*ResourceSemaphore) UpdateSize

func (self *ResourceSemaphore) UpdateSize(n int64)

Change the current semaphore size. This is for cases where the resource limit may change but the current consumption is invisible. It is logically equivalent to self.UpdateActual(n, self.Reserved()), though without the potential race conditions.

type Runtime

type Runtime struct {
	Config *RuntimeOptions

	MroCache        *MroCache
	JobManager      JobManager
	LocalJobManager JobManager
	// contains filtered or unexported fields
}

Collects configuration and state required to initialize and run pipestances and stagestances.

func NewRuntime deprecated

func NewRuntime(jobMode string, vdrMode string, profileMode ProfileMode, martianVersion string) *Runtime

Deprecated: use RuntimeOptions.NewRuntime() instead

func NewRuntimeWithCores deprecated

func NewRuntimeWithCores(jobMode string, vdrMode string, profileMode ProfileMode, martianVersion string,
	reqCores int, reqMem int, reqMemPerCore int, maxJobs int, jobFreqMillis int, jobQueues string,
	fullStageReset bool, enableStackVars bool, enableZip bool, skipPreflight bool, enableMonitor bool,
	debug bool, stest bool, onFinishExec string, overrides *PipestanceOverrides, limitLoadavg bool) *Runtime

Deprecated: use RuntimeOptions.NewRuntime() instead

func (*Runtime) BuildCallSource

func (self *Runtime) BuildCallSource(incpaths []string, name string, args map[string]interface{},
	sweepargs []string, mroPaths []string) (string, error)

func (*Runtime) GetMetadata

func (self *Runtime) GetMetadata(pipestancePath string, metadataPath string) (string, error)

func (*Runtime) GetSerialization

func (self *Runtime) GetSerialization(pipestancePath string, name MetadataFileName) (interface{}, bool)

func (*Runtime) GetSerializationInto

func (self *Runtime) GetSerializationInto(pipestancePath string, name MetadataFileName, target interface{}) error

func (*Runtime) InvokePipeline

func (self *Runtime) InvokePipeline(src string, srcPath string, psid string,
	pipestancePath string, mroPaths []string, mroVersion string,
	envs map[string]string, tags []string) (*Pipestance, error)

Invokes a new pipestance.

func (*Runtime) InvokeStage

func (self *Runtime) InvokeStage(src string, srcPath string, ssid string,
	stagestancePath string, mroPaths []string, mroVersion string,
	envs map[string]string) (*Stagestance, error)

Instantiate a stagestance.

func (*Runtime) ReattachToPipestance

func (self *Runtime) ReattachToPipestance(psid string, pipestancePath string, src string, mroPaths []string,
	mroVersion string, envs map[string]string, checkSrc bool, readOnly bool) (*Pipestance, error)

func (*Runtime) ReattachToPipestanceWithMroSrc

func (self *Runtime) ReattachToPipestanceWithMroSrc(psid string, pipestancePath string, src string, mroPaths []string,
	mroVersion string, envs map[string]string, checkSrc bool, readOnly bool) (*Pipestance, error)

type RuntimeError

type RuntimeError struct {
	Msg string
}

RuntimeError

func (*RuntimeError) Error

func (self *RuntimeError) Error() string

type RuntimeOptions

type RuntimeOptions struct {
	// The runtime mode (required): either "local" or a named mode from
	// jobmanagers/config.json
	JobMode string

	// The volatile disk recovery mode (required): either "post",
	// "rolling", or "disable".
	VdrMode string

	// The profiling mode (required): "disable" or one of the available
	// constants.
	ProfileMode     ProfileMode
	MartianVersion  string
	LocalMem        int
	LocalCores      int
	MemPerCore      int
	MaxJobs         int
	JobFreqMillis   int
	ResourceSpecial string
	FullStageReset  bool
	StackVars       bool
	Zip             bool
	SkipPreflight   bool
	Monitor         bool
	Debug           bool
	StressTest      bool
	OnFinishHandler string
	Overrides       *PipestanceOverrides
	LimitLoadavg    bool
	NeverLocal      bool
}

Configuration required to initialize a Runtime object.

func DefaultRuntimeOptions

func DefaultRuntimeOptions() RuntimeOptions

func (*RuntimeOptions) NewRuntime

func (c *RuntimeOptions) NewRuntime() *Runtime

func (*RuntimeOptions) ToFlags

func (config *RuntimeOptions) ToFlags() []string

Returns the set of command line flags which would set these options.

type Rusage

type Rusage struct {
	MaxRss       int     `json:"ru_maxrss"`
	SharedRss    int     `json:"ru_ixrss"`
	UnsharedRss  int     `json:"ru_idrss"`
	MinorFaults  int     `json:"ru_minflt"`
	MajorFaults  int     `json:"ru_majflt"`
	SwapOuts     int     `json:"ru_nswap"`
	UserTime     float64 `json:"ru_utime"`
	SystemTime   float64 `json:"ru_stime"`
	InBlocks     int     `json:"ru_inblock"`
	OutBlocks    int     `json:"ru_oublock"`
	MessagesSent int     `json:"ru_msgsnd"`
	MessagesRcvd int     `json:"ru_msgrcv"`
	SignalsRcvd  int     `json:"ru_nsignals"`
}

type RusageInfo

type RusageInfo struct {
	Self     *Rusage `json:"self,omitempty"`
	Children *Rusage `json:"children,omitempty"`
}

func GetRusage

func GetRusage() *RusageInfo

type StageDefs

type StageDefs struct {
	ChunkDefs []*ChunkDef   `json:"chunks"`
	JoinDef   *JobResources `json:"join,omitempty"`
}

func (*StageDefs) UnmarshalJSON

func (self *StageDefs) UnmarshalJSON(b []byte) error

type StageOverride

type StageOverride map[string]interface{}

type StagePerfInfo

type StagePerfInfo struct {
	Name   string `json:"name"`
	Fqname string `json:"fqname"`
	Forki  int    `json:"forki"`
}

type Stagestance

type Stagestance struct {
	// contains filtered or unexported fields
}

Similar to a pipestance, except for a single stage. Intended for use during testing and development of pipelines, e.g. with `mrs`.

func NewStagestance

func NewStagestance(parent Nodable, callStm *syntax.CallStm, callables *syntax.Callables) (*Stagestance, error)

func (*Stagestance) Callable

func (self *Stagestance) Callable() syntax.Callable

func (*Stagestance) CheckHeartbeats

func (self *Stagestance) CheckHeartbeats()

func (*Stagestance) GetFQName

func (self *Stagestance) GetFQName() string

func (*Stagestance) GetFatalError

func (self *Stagestance) GetFatalError() (string, bool, string, string, MetadataFileName, []string)

func (*Stagestance) GetPostNodes

func (self *Stagestance) GetPostNodes() map[string]Nodable

func (*Stagestance) GetPrenodes

func (self *Stagestance) GetPrenodes() map[string]Nodable

func (*Stagestance) GetState

func (self *Stagestance) GetState() MetadataState

func (*Stagestance) LoadMetadata

func (self *Stagestance) LoadMetadata()

func (*Stagestance) PostProcess

func (self *Stagestance) PostProcess()

func (*Stagestance) RefreshState

func (self *Stagestance) RefreshState()

func (*Stagestance) Step

func (self *Stagestance) Step() bool

type StorageEvent

type StorageEvent struct {
	Timestamp time.Time
	Delta     int64
	Name      string
}

func NewStorageEvent

func NewStorageEvent(timestamp time.Time, delta int64, fqname string) *StorageEvent

type StorageEventByTimestamp

type StorageEventByTimestamp []*StorageEvent

func (StorageEventByTimestamp) Len

func (self StorageEventByTimestamp) Len() int

func (StorageEventByTimestamp) Less

func (self StorageEventByTimestamp) Less(i, j int) bool

func (StorageEventByTimestamp) Swap

func (self StorageEventByTimestamp) Swap(i, j int)

type TopNode

type TopNode struct {
	// contains filtered or unexported fields
}

The top-level node for a pipestance.

func NewTopNode

func NewTopNode(rt *Runtime, psid string, p string, mroPaths []string, mroVersion string,
	envs map[string]string, j *InvocationData) *TopNode

func (*TopNode) Callable

func (self *TopNode) Callable() syntax.Callable

func (*TopNode) GetFQName

func (self *TopNode) GetFQName() string

func (*TopNode) GetPostNodes

func (self *TopNode) GetPostNodes() map[string]Nodable

func (*TopNode) GetPrenodes

func (self *TopNode) GetPrenodes() map[string]Nodable

type VDRByTimestamp

type VDRByTimestamp []*VDRKillReport

func (VDRByTimestamp) Len

func (self VDRByTimestamp) Len() int

func (VDRByTimestamp) Less

func (self VDRByTimestamp) Less(i, j int) bool

func (VDRByTimestamp) Swap

func (self VDRByTimestamp) Swap(i, j int)

type VDRKillReport

type VDRKillReport struct {
	Count     uint     `json:"count"`
	Size      uint64   `json:"size"`
	Timestamp string   `json:"timestamp"`
	Paths     []string `json:"paths"`
	Errors    []string `json:"errors"`
}

Volatile Disk Recovery

type VersionInfo

type VersionInfo struct {
	Martian   string `json:"martian"`
	Pipelines string `json:"pipelines"`
}

type WallClockInfo

type WallClockInfo struct {
	Start    string  `json:"start"`
	End      string  `json:"end,omitempty"`
	Duration float64 `json:"duration_seconds,omitempty"`
}
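
The `omitempty` tags mean that End and Duration are left out of the JSON while they hold their zero values, which is presumably the case for a job that is still running. The sketch below uses a local copy of the struct with the same tags. The timestamp string format shown is an assumption, since the fields are plain strings and this page does not document their layout.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// wallClockInfo is a local copy of WallClockInfo with identical JSON tags.
type wallClockInfo struct {
	Start    string  `json:"start"`
	End      string  `json:"end,omitempty"`
	Duration float64 `json:"duration_seconds,omitempty"`
}

// encode marshals w to a JSON string, returning "" on error.
func encode(w wallClockInfo) string {
	b, err := json.Marshal(w)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	// Only Start is set, so the other two keys are omitted.
	running := wallClockInfo{Start: "2018-04-30 12:00:00"}
	fmt.Println(encode(running))
	// prints {"start":"2018-04-30 12:00:00"}

	finished := wallClockInfo{
		Start:    "2018-04-30 12:00:00",
		End:      "2018-04-30 12:01:30",
		Duration: 90,
	}
	fmt.Println(encode(finished))
	// prints {"start":"2018-04-30 12:00:00","end":"2018-04-30 12:01:30","duration_seconds":90}
}
```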
