import "github.com/scipipe/scipipe"
Package scipipe is a library for writing scientific workflows (sometimes also called "pipelines") of shell commands that depend on each other, in the Go programming language. It was initially designed for problems in cheminformatics and bioinformatics, but should apply equally well to any domain involving complex pipelines of interdependent shell commands.
audit.go baseprocess.go common.go const.go ip.go log.go port.go process.go sink.go task.go workflow.go
const (
	// BUFSIZE is the standard buffer size used for channels connecting processes
	BUFSIZE = 128
	// Version is the SciPipe version in string format
	Version = "0.9.10"
)
const FSRootPlaceHolder = "__fsroot__"
FSRootPlaceHolder is a string to use instead of an initial '/', to indicate a path that belongs to the absolute root
var (
	// Trace is a log handler for extremely detailed level logs. It is so far
	// sparsely used in scipipe.
	Trace *log.Logger
	// Debug is a log handler for debugging level logs
	Debug *log.Logger
	// Info is a log handler for information level logs
	Info *log.Logger
	// Audit is a log handler for audit level logs
	Audit *log.Logger
	// Warning is a log handler for warning level logs
	Warning *log.Logger
	// Error is a log handler for error level logs
	Error *log.Logger
)
AtomizeIPs renames temporary output files/directories to their proper paths. It is called both from Task, and from Processes that implement a custom execution schedule.
Check checks the error err, and prints the message in the error
CheckWithMsg checks the error err, and prints both the original error message, and a custom one provided in errMsg
ExecCmd executes the command cmd, as a shell command via bash
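Running a command string "as a shell command via bash" can be sketched with the standard library alone. The helper name runInBash and its return values are assumptions made for illustration; they are not the signature of ExecCmd, which is not shown on this page.

```go
package main

import (
	"fmt"
	"os/exec"
)

// runInBash is a hypothetical stand-in for an ExecCmd-style helper. It runs
// cmd through "bash -c" and returns the combined stdout and stderr.
func runInBash(cmd string) (string, error) {
	out, err := exec.Command("bash", "-c", cmd).CombinedOutput()
	return string(out), err
}

func main() {
	out, err := runInBash("echo hello | tr a-z A-Z")
	if err != nil {
		fmt.Println("command failed:", err)
		return
	}
	fmt.Print(out)
}
```

Passing the whole string to bash means pipes, redirects and globbing behave as they would on the command line, which is presumably why a shell is used rather than executing the binary directly.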
func Fail(vs ...interface{})
Fail logs the error message, so that it will be possible to improve error messages in one place
Failf is like Fail but with msg being a formatter string for the message and vs being items to format into the message
func InitLog( traceHandle io.Writer, debugHandle io.Writer, infoHandle io.Writer, auditHandle io.Writer, warningHandle io.Writer, errorHandle io.Writer)
InitLog initiates logging handlers
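The relation between InitLog and the level-specific helpers below can be illustrated with a small stand-in built on the standard log package. This is a sketch only: initLog and initLogInfo are local, hypothetical functions, and the prefixes and flags are assumptions rather than what scipipe actually prints.

```go
package main

import (
	"io"
	"log"
	"os"
)

// Package-level loggers, mirroring the variables documented above.
var Trace, Debug, Info, Audit, Warning, Error *log.Logger

// initLog is a local stand-in with the same parameter list as InitLog.
// The prefixes and flags are assumptions made for this sketch.
func initLog(traceHandle, debugHandle, infoHandle, auditHandle, warningHandle, errorHandle io.Writer) {
	Trace = log.New(traceHandle, "TRACE   ", log.LstdFlags)
	Debug = log.New(debugHandle, "DEBUG   ", log.LstdFlags)
	Info = log.New(infoHandle, "INFO    ", log.LstdFlags)
	Audit = log.New(auditHandle, "AUDIT   ", log.LstdFlags)
	Warning = log.New(warningHandle, "WARNING ", log.LstdFlags)
	Error = log.New(errorHandle, "ERROR   ", log.LstdFlags)
}

// initLogInfo shows how a level such as INFO could be expressed: every level
// below it is sent to io.Discard, the rest to stdout or stderr.
func initLogInfo() {
	initLog(io.Discard, io.Discard, os.Stdout, os.Stdout, os.Stdout, os.Stderr)
}

func main() {
	initLogInfo()
	Debug.Println("this message is discarded")
	Info.Println("this message goes to stdout")
}
```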
func InitLogAudit()
InitLogAudit initiates logging with level=AUDIT
InitLogAuditToFile initiates logging with level=AUDIT, and writes that to fileName
func InitLogDebug()
InitLogDebug initiates logging with level=DEBUG
func InitLogError()
InitLogError initiates logging with level=ERROR
func InitLogInfo()
InitLogInfo initiates logging with level=INFO
func InitLogWarning()
InitLogWarning initiates logging with level=WARNING
LogAuditf logs a pretty printed log message with the AUDIT log level, where componentName is a name of a process, task, workflow or similar that generates the message, while message and values are formatted in the manner of fmt.Printf
LogAuditln logs a pretty printed log message with the AUDIT log level, where componentName is a name of a process, task, workflow or similar that generates the message, while message is a custom message (can be specified as multiple strings, which will then be formatted in the manner of fmt.Println).
type AuditInfo struct {
	ID          string
	ProcessName string
	Command     string
	Params      map[string]string
	Tags        map[string]string
	StartTime   time.Time
	FinishTime  time.Time
	ExecTimeNS  time.Duration
	OutFiles    map[string]string
	Upstream    map[string]*AuditInfo
}
AuditInfo contains structured audit/provenance logging information for a particular task (invocation), to go with all outgoing IPs from that task
NewAuditInfo returns a new AuditInfo struct
UnmarshalAuditInfoJSONFile returns an AuditInfo object from an AuditInfo .json file
type BaseIP struct {
// contains filtered or unexported fields
}
BaseIP contains foundational functionality which all IPs need to implement. It is meant to be embedded into other IP implementations.
NewBaseIP creates a new BaseIP
ID returns a globally unique ID for the IP
type BaseProcess struct {
// contains filtered or unexported fields
}
BaseProcess provides a skeleton for processes, such as the main Process component, and the custom components in the scipipe/components library
func NewBaseProcess(wf *Workflow, name string) BaseProcess
NewBaseProcess returns a new BaseProcess, connected to the provided workflow, and with the name name
func (p *BaseProcess) CloseAllOutPorts()
CloseAllOutPorts closes all normal and parameter out-ports
func (p *BaseProcess) CloseOutParamPorts()
CloseOutParamPorts closes all parameter out-ports
func (p *BaseProcess) CloseOutPorts()
CloseOutPorts closes all (normal) out-ports
func (p *BaseProcess) DeleteInParamPort(portName string)
DeleteInParamPort deletes an InParamPort object from the process
func (p *BaseProcess) DeleteInPort(portName string)
DeleteInPort deletes an InPort object from the process
func (p *BaseProcess) DeleteOutParamPort(portName string)
DeleteOutParamPort deletes an OutParamPort object from the process
func (p *BaseProcess) DeleteOutPort(portName string)
DeleteOutPort deletes an OutPort object from the process
func (p *BaseProcess) InParamPort(portName string) *InParamPort
InParamPort returns the parameter port with name portName
func (p *BaseProcess) InParamPorts() map[string]*InParamPort
InParamPorts returns all parameter in-ports of the process
func (p *BaseProcess) InPort(portName string) *InPort
InPort returns the in-port with name portName
func (p *BaseProcess) InPorts() map[string]*InPort
InPorts returns a map of all the in-ports of the process, keyed by their names
func (p *BaseProcess) InitInParamPort(proc WorkflowProcess, portName string)
InitInParamPort adds a parameter in-port with name portName to the process
func (p *BaseProcess) InitInPort(proc WorkflowProcess, portName string)
InitInPort adds an in-port with name portName to the process
func (p *BaseProcess) InitOutParamPort(proc WorkflowProcess, portName string)
InitOutParamPort adds a parameter out-port with name portName to the process. We need to supply the concrete process used here as well, since this method might be used as part of an embedded struct, meaning that the process in the receiver is just the *BaseProcess, which doesn't suffice.
func (p *BaseProcess) InitOutPort(proc WorkflowProcess, portName string)
InitOutPort adds an out-port with name portName to the process
func (p *BaseProcess) Name() string
Name returns the name of the process
func (p *BaseProcess) OutParamPort(portName string) *OutParamPort
OutParamPort returns the parameter port with name portName
func (p *BaseProcess) OutParamPorts() map[string]*OutParamPort
OutParamPorts returns all parameter out-ports of the process
func (p *BaseProcess) OutPort(portName string) *OutPort
OutPort returns the out-port with name portName
func (p *BaseProcess) OutPorts() map[string]*OutPort
OutPorts returns a map of all the out-ports of the process, keyed by their names
func (p *BaseProcess) Ready() (isReady bool)
Ready checks whether all the process' ports are connected
func (p *BaseProcess) Workflow() *Workflow
Workflow returns the workflow the process is connected to
FileIP contains information and helper methods for a physical file on a normal disk. IP is short for "Information Packet" in Flow-Based Programming terminology.
NewFileIP creates a new FileIP
AddTag adds the tag k with value v
AddTags adds a map of tags to the IP's audit info
Atomize renames the temporary file name to the final file name, which makes it possible to tell unfinished files from finished ones
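The write-to-temp-then-rename idea behind Atomize can be sketched with the standard library. This is not scipipe's implementation: writeAtomically, exists and runDemo are hypothetical helpers, and the ".tmp" suffix is an assumption.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// writeAtomically writes data to a temporary path first and renames it to the
// final path only after the write has succeeded. The ".tmp" suffix is an
// assumption made for this sketch.
func writeAtomically(finalPath string, data []byte) error {
	tempPath := finalPath + ".tmp"
	if err := os.WriteFile(tempPath, data, 0644); err != nil {
		return err
	}
	return os.Rename(tempPath, finalPath)
}

// exists reports whether something is present at path.
func exists(path string) bool {
	_, err := os.Stat(path)
	return err == nil
}

// runDemo writes one file atomically inside a scratch directory and reports
// which of the two paths exist afterwards.
func runDemo() (finalExists, tempExists bool, err error) {
	dir, err := os.MkdirTemp("", "atomize")
	if err != nil {
		return false, false, err
	}
	defer os.RemoveAll(dir)
	final := filepath.Join(dir, "out.txt")
	if err := writeAtomically(final, []byte("done\n")); err != nil {
		return false, false, err
	}
	return exists(final), exists(final + ".tmp"), nil
}

func main() {
	finalExists, tempExists, err := runDemo()
	if err != nil {
		fmt.Println("demo failed:", err)
		return
	}
	fmt.Println("final:", finalExists, "temp:", tempExists)
}
```

Because a file only appears at its final path after the rename, a check like Exists can treat its presence as a sign that the producing task finished.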
AuditFilePath returns the file path of the audit info file for the FileIP
AuditInfo returns the AuditInfo struct for the FileIP
CreateFifo creates a FIFO file for the FileIP
Exists checks if the file exists (at its final file name)
FifoFileExists checks if the FIFO-file (named pipe file) exists
FifoPath returns the path to use when a FIFO file is used instead of a normal file
Open opens the file and returns a file handle (*os.File)
OpenTemp opens the temp file and returns a file handle (*os.File)
OpenWriteTemp opens the temp file for writing, and returns a file handle (*os.File)
Param returns the parameter named key, from the IP's audit info
Path returns the (final) path of the physical file
Read reads the whole content of the file and returns the content as a byte array
RemoveFifo removes the FIFO file, if it exists
SetAuditInfo sets the AuditInfo struct for the FileIP
Size returns the size of an existing file, in bytes
Tag returns the value of the tag with key k from the IP's audit info
Tags returns the audit info's tags
TempDir returns the path to a temporary directory where outputs are written
TempFileExists checks if the temp-file exists
TempPath returns the temporary path of the physical file
UnMarshalJSON is a helper function to unmarshal the content of the IP's file to the interface v
Write writes a byte array ([]byte) to the file's temp file path
WriteAuditLogToFile writes the audit log to its designated file
IP is the base interface which all other IPs need to adhere to
type InParamPort struct {
	Chan        chan string
	RemotePorts map[string]*OutParamPort
	// contains filtered or unexported fields
}
InParamPort is an in-port for parameter values of string type
func NewInParamPort(name string) *InParamPort
NewInParamPort returns a new InParamPort
func (pip *InParamPort) AddRemotePort(pop *OutParamPort)
AddRemotePort adds a remote OutParamPort to the InParamPort
func (pip *InParamPort) CloseConnection(popName string)
CloseConnection closes the connection to the remote out-port with name popName, on the InParamPort
func (pip *InParamPort) From(pop *OutParamPort)
From connects one parameter port with another one
func (pip *InParamPort) FromFloat(floats ...float64)
FromFloat feeds one or more parameters of type float64 to the param port
func (pip *InParamPort) FromInt(ints ...int)
FromInt feeds one or more parameters of type int to the param port
func (pip *InParamPort) FromStr(strings ...string)
FromStr feeds one or more parameters of type string to a port
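Since parameter ports carry values of string type, feeding ints or floats implies a conversion to strings before sending. A minimal sketch of that idea follows, where feedInts is a hypothetical stand-in and the decimal formatting, the goroutine and the closing of the channel are all assumptions:

```go
package main

import (
	"fmt"
	"strconv"
)

// feedInts is a hypothetical stand-in for a FromInt-style method. It converts
// each int to its decimal string form, sends it on a buffered channel from a
// separate goroutine, and closes the channel when all values are sent.
func feedInts(ints ...int) <-chan string {
	ch := make(chan string, 128) // same size as BUFSIZE in this package
	go func() {
		defer close(ch)
		for _, i := range ints {
			ch <- strconv.Itoa(i)
		}
	}()
	return ch
}

func main() {
	for param := range feedInts(1, 2, 3) {
		fmt.Println(param)
	}
}
```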
func (pip *InParamPort) Name() string
Name returns the name of the InParamPort
func (pip *InParamPort) Process() WorkflowProcess
Process returns the process that is connected to the port
func (pip *InParamPort) Ready() bool
Ready tells whether the port is ready or not
func (pip *InParamPort) Recv() string
Recv receives a param value over the port's connection
func (pip *InParamPort) Send(param string)
Send sends a parameter value to the in-port, and is supposed to be called from the remote (out-) port, to send to this in-port
func (pip *InParamPort) SetProcess(p WorkflowProcess)
SetProcess sets the process of the port to p
func (pip *InParamPort) SetReady(ready bool)
SetReady sets the ready status of the InParamPort
type InPort struct {
	Chan        chan *FileIP
	RemotePorts map[string]*OutPort
	// contains filtered or unexported fields
}
InPort represents a pluggable connection from its own process to multiple out-ports of other processes, with which it communicates via channels under the hood
NewInPort returns a new InPort struct
AddRemotePort adds a remote OutPort to the InPort
CloseConnection closes the connection to the remote out-port with name rptName, on the InPort
Disconnect disconnects the (out-)port with name rptName, from the InPort
From connects an OutPort to the InPort
Name returns the name of the InPort
func (pt *InPort) Process() WorkflowProcess
Process returns the process connected to the port
Ready tells whether the port is ready or not
Recv receives IPs from the port
Send sends IPs to the in-port, and is supposed to be called from the remote (out-) port, to send to this in-port
func (pt *InPort) SetProcess(p WorkflowProcess)
SetProcess sets the process of the port to p
SetReady sets the ready status of the InPort
type OutParamPort struct {
	RemotePorts map[string]*InParamPort
	// contains filtered or unexported fields
}
OutParamPort is an out-port for parameter values of string type
func NewOutParamPort(name string) *OutParamPort
NewOutParamPort returns a new OutParamPort
func (pop *OutParamPort) AddRemotePort(pip *InParamPort)
AddRemotePort adds a remote InParamPort to the OutParamPort
func (pop *OutParamPort) Close()
Close closes the connection between this port and all the ports it is connected to. If this port is the last connected port to an in-port, that in-port's channel will also be closed.
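The "last connected port closes the channel" rule can be sketched with plain channels. The types inPort and outPort below are simplified, hypothetical stand-ins for illustration; they do not reproduce the fields or locking of the real port types.

```go
package main

import (
	"fmt"
	"sync"
)

// inPort is a simplified in-port: one channel plus the set of connected
// remote out-ports, keyed by name.
type inPort struct {
	mu      sync.Mutex
	ch      chan string
	remotes map[string]bool
}

func newInPort() *inPort {
	return &inPort{ch: make(chan string, 16), remotes: map[string]bool{}}
}

// closeConnection forgets one remote out-port and closes the channel once no
// remote out-ports remain.
func (ip *inPort) closeConnection(name string) {
	ip.mu.Lock()
	defer ip.mu.Unlock()
	if !ip.remotes[name] {
		return
	}
	delete(ip.remotes, name)
	if len(ip.remotes) == 0 {
		close(ip.ch)
	}
}

// outPort is a simplified out-port that can be connected to many in-ports.
type outPort struct {
	name    string
	remotes []*inPort
}

// to connects the out-port to an in-port and registers it there.
func (op *outPort) to(ip *inPort) {
	ip.mu.Lock()
	ip.remotes[op.name] = true
	ip.mu.Unlock()
	op.remotes = append(op.remotes, ip)
}

// send delivers v to every connected in-port.
func (op *outPort) send(v string) {
	for _, ip := range op.remotes {
		ip.ch <- v
	}
}

// close closes this port's connection on every connected in-port.
func (op *outPort) close() {
	for _, ip := range op.remotes {
		ip.closeConnection(op.name)
	}
}

func main() {
	in := newInPort()
	a := &outPort{name: "a"}
	b := &outPort{name: "b"}
	a.to(in)
	b.to(in)

	a.send("from a")
	a.close() // channel stays open: b is still connected
	b.send("from b")
	b.close() // last remote port: the channel is closed here

	for v := range in.ch {
		fmt.Println(v)
	}
}
```

Closing only when the last sender disconnects lets the receiving side use a plain range loop without risking a send on a closed channel from a sender that is still active.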
func (pop *OutParamPort) Disconnect(pipName string)
Disconnect disconnects the (in-)port with name pipName, from the OutParamPort
func (pop *OutParamPort) Name() string
Name returns the name of the OutParamPort
func (pop *OutParamPort) Process() WorkflowProcess
Process returns the process that is connected to the port
func (pop *OutParamPort) Ready() bool
Ready tells whether the port is ready or not
func (pop *OutParamPort) Send(param string)
Send sends a parameter value to all the in-ports connected to the OutParamPort
func (pop *OutParamPort) SetProcess(p WorkflowProcess)
SetProcess sets the process of the port to p
func (pop *OutParamPort) SetReady(ready bool)
SetReady sets the ready status of the OutParamPort
func (pop *OutParamPort) To(pip *InParamPort)
To connects an InParamPort to the OutParamPort
OutPort represents a pluggable connection from its own process to multiple in-ports of other processes, with which it communicates via channels under the hood
NewOutPort returns a new OutPort struct
AddRemotePort adds a remote InPort to the OutPort
Close closes the connection between this port and all the ports it is connected to. If this port is the last connected port to an in-port, that in-port's channel will also be closed.
Disconnect disconnects the (in-)port with name rptName, from the OutPort
Name returns the name of the OutPort
func (pt *OutPort) Process() WorkflowProcess
Process returns the process connected to the port
Ready tells whether the port is ready or not
Send sends a FileIP to all the in-ports connected to the OutPort
func (pt *OutPort) SetProcess(p WorkflowProcess)
SetProcess sets the process of the port to p
SetReady sets the ready status of the OutPort
To connects an InPort to the OutPort
type PortInfo struct {
// contains filtered or unexported fields
}
PortInfo is a container for various information about process ports
type Process struct {
	BaseProcess
	CommandPattern string
	PathFuncs      map[string]func(*Task) string
	CustomExecute  func(*Task)
	CoresPerTask   int
	Prepend        string
	Spawn          bool
	PortInfo       map[string]*PortInfo
}
Process is the central component in SciPipe after Workflow. Processes are long-running "services" that schedule and execute Tasks based on the IPs and parameters received on their in-ports and parameter ports
NewProc returns a new Process, and initializes its ports based on the command pattern.
In is a short-form for InPort() (of BaseProcess), which works only on Process processes
func (p *Process) InParam(portName string) *InParamPort
InParam is a short-form for InParamPort() (of BaseProcess), which works only on Process processes
Out is a short-form for OutPort() (of BaseProcess), which works only on Process processes
func (p *Process) OutParam(portName string) *OutParamPort
OutParam is a short-form for OutParamPort() (of BaseProcess), which works only on Process processes
Run runs the process by instantiating and executing Tasks for all inputs and parameter values on its in-ports. In the case when there are no inputs or parameter values on the in-ports, it will run just once before it terminates. Note that the actual execution of shell commands is done inside Task.Execute, not here.
SetOut initializes a port (if it does not already exist), and takes a configuration for its output paths via a pattern similar to the command pattern used to create new processes, with placeholder tags. Available placeholder tags to use are:
{i:inport_name}
{p:param_name}
{t:tag_name}
An example might be: {i:foo}.replace_with_{p:replacement}.txt ... given that the process contains an in-port named 'foo', and a parameter named 'replacement'. If an out-port with the specified name does not exist, it will be created. This makes it possible to create out-ports for filenames that are created without explicitly stating a filename on the command line, such as when only submitting a prefix.
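How such a pattern might turn into a concrete path can be sketched with a regular expression. The helper expandPattern and its map arguments are hypothetical and only illustrate the placeholder syntax described above; scipipe's own expansion may differ, for example in how unknown names are treated.

```go
package main

import (
	"fmt"
	"regexp"
)

// placeholder matches tags of the form {i:name}, {p:name} and {t:name}.
var placeholder = regexp.MustCompile(`\{([ipt]):([^{}]+)\}`)

// expandPattern is a hypothetical helper that fills in a path pattern from
// maps of in-port paths, parameter values and tag values. Placeholders with
// unknown names are left untouched (an assumption made for this sketch).
func expandPattern(pattern string, inPaths, params, tags map[string]string) string {
	return placeholder.ReplaceAllStringFunc(pattern, func(match string) string {
		parts := placeholder.FindStringSubmatch(match)
		kind, name := parts[1], parts[2]
		var source map[string]string
		switch kind {
		case "i":
			source = inPaths
		case "p":
			source = params
		case "t":
			source = tags
		}
		if value, ok := source[name]; ok {
			return value
		}
		return match
	})
}

func main() {
	path := expandPattern(
		"{i:foo}.replace_with_{p:replacement}.txt",
		map[string]string{"foo": "data/input.txt"},
		map[string]string{"replacement": "bar"},
		nil,
	)
	fmt.Println(path) // prints "data/input.txt.replace_with_bar.txt"
}
```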
SetOutFunc takes a function which produces a file path based on data available in *Task, such as concrete file paths and parameter values,
type Sink struct { BaseProcess }
Sink is a simple component that just receives IPs on its In-port without doing anything with them. It is used to drive pipelines of processes
NewSink returns a new Sink component
From connects an out-port to the sink's in-port
func (p *Sink) FromParam(outParamPort *OutParamPort)
FromParam connects a param-out-port to the sink's param-in-port
Run runs the Sink process
type Task struct {
	Name          string
	Command       string
	CustomExecute func(*Task)
	InIPs         map[string]*FileIP
	OutIPs        map[string]*FileIP
	Params        map[string]string
	Tags          map[string]string
	Done          chan int
	Process       *Process
	// contains filtered or unexported fields
}
Task represents a single static shell command, or Go function, to be executed, and is scheduled and managed by a corresponding Process
func NewTask(workflow *Workflow, process *Process, name string, cmdPat string, inIPs map[string]*FileIP, outPathFuncs map[string]func(*Task) string, portInfos map[string]*PortInfo, params map[string]string, tags map[string]string, prepend string, customExecute func(*Task), cores int) *Task
NewTask instantiates and initializes a new Task
Execute executes the task (the shell command or Go function in CustomExecute)
InIP returns an IP for the in-port with name portName
InPath returns the path name of an input file for the task
OutIP returns an IP for the out-port with name portName
OutPath returns the path name of an output file for the task
Param returns the value of a param, for the task
Tag returns the value of a tag, for the task
TempDir returns a string that is unique to a task, suitable for use in file paths. It is built up by merging all input filenames and parameter values that a task takes as input, joined with dots.
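The idea of deriving a per-task name from its inputs can be sketched as below. This is only an illustration of "merge and join with dots": uniqueTaskDir and sortedValues are hypothetical, and the key ordering and the replacement of path separators are assumptions, not a description of what TempDir actually returns.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// sortedValues returns the values of m ordered by key, so that the result
// does not depend on Go's randomized map iteration order.
func sortedValues(m map[string]string) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	vals := make([]string, 0, len(m))
	for _, k := range keys {
		vals = append(vals, m[k])
	}
	return vals
}

// uniqueTaskDir is a hypothetical helper that joins all input paths and
// parameter values with dots.
func uniqueTaskDir(inPaths, params map[string]string) string {
	parts := append(sortedValues(inPaths), sortedValues(params)...)
	name := strings.Join(parts, ".")
	// Assumption: path separators are replaced so that the result can be
	// used as a single directory name.
	return strings.ReplaceAll(name, "/", "_")
}

func main() {
	dir := uniqueTaskDir(
		map[string]string{"in": "data/a.txt"},
		map[string]string{"k": "5"},
	)
	fmt.Println(dir) // prints "data_a.txt.5"
}
```

Two tasks that differ in any input file or parameter value get different names, so their temporary outputs cannot collide.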
type Workflow struct {
	PlotConf WorkflowPlotConf
	// contains filtered or unexported fields
}
Workflow is the centerpiece of the functionality in SciPipe, and is a container for a pipeline of processes making up a workflow. It has various methods for coordinating the execution of the pipeline as a whole, such as keeping track of the maximum number of concurrent tasks, as well as helper methods for creating new processes, that automatically get plugged in to the workflow on creation
NewWorkflow returns a new Workflow
NewWorkflowCustomLogFile returns a new Workflow, with
func (wf *Workflow) AddProc(proc WorkflowProcess)
AddProc adds a Process to the workflow, to be run when the workflow runs
func (wf *Workflow) AddProcs(procs ...WorkflowProcess)
AddProcs takes one or many Processes and adds them to the workflow, to be run when the workflow runs.
DecConcurrentTasks decreases the counter for how many concurrent tasks are currently running in the workflow
DotGraph generates a graph description in DOT format (see https://en.wikipedia.org/wiki/DOT_%28graph_description_language%29). If Workflow.PlotConf.EdgeLabels is set to true, a label containing the in-port and out-port that each edge connects will be printed.
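For readers unfamiliar with DOT, the kind of text such a method produces can be sketched as follows. The edge type, the dotGraph function and the exact label format are hypothetical; the output of the real DotGraph may be laid out differently.

```go
package main

import (
	"fmt"
	"strings"
)

// edge describes one connection between an out-port and an in-port.
type edge struct {
	fromProc, outPort, toProc, inPort string
}

// dotGraph renders the edges as a DOT digraph. When edgeLabels is true, each
// edge gets a label naming the out-port and in-port it connects.
func dotGraph(name string, edges []edge, edgeLabels bool) string {
	var sb strings.Builder
	fmt.Fprintf(&sb, "digraph %q {\n", name)
	for _, e := range edges {
		fmt.Fprintf(&sb, "  %q -> %q", e.fromProc, e.toProc)
		if edgeLabels {
			fmt.Fprintf(&sb, " [label=%q]", e.outPort+" -> "+e.inPort)
		}
		sb.WriteString(";\n")
	}
	sb.WriteString("}\n")
	return sb.String()
}

func main() {
	edges := []edge{
		{"hello", "out", "world", "in"},
		{"world", "out", "sink", "in"},
	}
	fmt.Print(dotGraph("helloworld", edges, true))
}
```

The resulting text can be saved to a file and rendered with the graphviz dot command.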
IncConcurrentTasks increases the counter for how many concurrent tasks are currently running in the workflow
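One common way to cap the number of concurrently running tasks with an increase/decrease pair is a buffered channel used as a counting semaphore. The sketch below assumes that technique; limiter, inc, dec and runAll are hypothetical names, and nothing here is taken from the Workflow implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// limiter caps concurrency with a buffered channel: each running task holds
// one slot.
type limiter struct {
	slots chan struct{}
}

func newLimiter(max int) *limiter {
	return &limiter{slots: make(chan struct{}, max)}
}

// inc takes a slot, blocking while the maximum number of tasks is running.
func (l *limiter) inc() { l.slots <- struct{}{} }

// dec gives the slot back.
func (l *limiter) dec() { <-l.slots }

// runAll starts n tasks under a limit of max and returns the highest number
// of tasks that were observed running at the same time.
func runAll(n, max int) int {
	l := newLimiter(max)
	var mu sync.Mutex
	var wg sync.WaitGroup
	running, peak := 0, 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			l.inc()
			defer l.dec()

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			// The task's actual work would go here.

			mu.Lock()
			running--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println("peak within limit:", runAll(20, 4) <= 4)
}
```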
Name returns the name of the workflow
NewProc returns a new process based on a commandPattern (see the documentation for scipipe.NewProc for more details about the pattern) and connects the process to the workflow
PlotGraph writes the workflow structure to a dot file
PlotGraphPDF writes the workflow structure to a dot file, and also runs the graphviz dot command to produce a PDF file (requires graphviz, with the dot command, installed on the system)
func (wf *Workflow) Proc(procName string) WorkflowProcess
Proc returns the process with name procName from the workflow
func (wf *Workflow) Procs() map[string]WorkflowProcess
Procs returns a map of all processes keyed by their names in the workflow
func (wf *Workflow) ProcsSorted() []WorkflowProcess
ProcsSorted returns the processes of the workflow, in an array, sorted by the process names
Run runs all the processes of the workflow
RunTo runs all processes upstream of, and including, the processes with the names provided as arguments
func (wf *Workflow) RunToProcs(finalProcs ...WorkflowProcess)
RunToProcs runs all processes upstream of, and including, the process structs provided as arguments
RunToRegex runs all processes upstream of, and including, the processes whose names match any of the provided regexp patterns
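Selecting "everything upstream of, and including" a set of matching processes amounts to a graph traversal. A minimal sketch under assumed data structures follows: upstreamOf and the map of process names to upstream names are hypothetical, and the process names in main are made up for illustration.

```go
package main

import (
	"fmt"
	"regexp"
	"sort"
)

// upstreamOf returns the sorted names of all processes whose name matches any
// of the patterns, together with everything upstream of them. The upstream
// map lists, for each process name, the processes it receives input from.
func upstreamOf(upstream map[string][]string, patterns ...string) ([]string, error) {
	selected := map[string]bool{}
	var visit func(name string)
	visit = func(name string) {
		if selected[name] {
			return
		}
		selected[name] = true
		for _, up := range upstream[name] {
			visit(up)
		}
	}
	for _, pattern := range patterns {
		re, err := regexp.Compile(pattern)
		if err != nil {
			return nil, err
		}
		for name := range upstream {
			if re.MatchString(name) {
				visit(name)
			}
		}
	}
	names := make([]string, 0, len(selected))
	for name := range selected {
		names = append(names, name)
	}
	sort.Strings(names)
	return names, nil
}

func main() {
	upstream := map[string][]string{
		"download":      nil,
		"align":         {"download"},
		"call_variants": {"align"},
		"plot":          {"call_variants"},
	}
	names, err := upstreamOf(upstream, "^align$")
	if err != nil {
		fmt.Println("bad pattern:", err)
		return
	}
	fmt.Println(names) // prints "[align download]"
}
```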
SetSink sets the sink of the workflow to the provided sink process
Sink returns the sink process of the workflow
WorkflowPlotConf contains configuration for plotting the workflow as a graph with graphviz
type WorkflowProcess interface {
	Name() string
	InPorts() map[string]*InPort
	OutPorts() map[string]*OutPort
	InParamPorts() map[string]*InParamPort
	OutParamPorts() map[string]*OutParamPort
	Ready() bool
	Run()
}
WorkflowProcess is an interface for processes to be handled by Workflow
Path | Synopsis |
---|---|
components | |
examples/custom_execution_function | |
examples/experiment_synchronized_multiple_sends | |
examples/experiment_with_new_api | |
examples/fifo | |
examples/filegen | |
examples/helloworld | |
examples/helloworld_paths | |
examples/param_channels | |
examples/prepend_feature | |
examples/resequencing | Implementation (work in progress) of the resequencing analysis pipeline used to teach the introductory NGS bioinformatics analysis course at SciLifeLab as described on this page: https://scilifelab.github.io/courses/ngsintro/1502/labs/resequencing-analysis |
examples/revcomplement | |
examples/run_specific_procs | |
examples/scatter_gather | |
examples/subworkflow | An example that shows how to create a sub-network / sub-workflow that can be used as a component |
examples/wrapper_procs | |
Package scipipe imports 19 packages and is imported by 16 packages. Updated 2020-10-19.