scipipe: github.com/scipipe/scipipe Index | Examples | Files | Directories

package scipipe

import "github.com/scipipe/scipipe"

Package scipipe is a library for writing scientific workflows (sometimes also called "pipelines") of shell commands that depend on each other, in the Go programming language. It was initially designed for problems in cheminformatics and bioinformatics, but should apply equally well to any domain involving complex pipelines of interdependent shell commands.

Index

Examples

Package Files

audit.go const.go ip.go log.go port.go process.go sink.go task.go utils.go workflow.go

Constants

const (
    // Standard buffer size used for channels connecting processes
    BUFSIZE = 16
)

Variables

var (
    Trace     *log.Logger
    Debug     *log.Logger
    Info      *log.Logger
    Audit     *log.Logger
    Warning   *log.Logger
    Error     *log.Logger
    LogExists bool
)

func Check Uses

func Check(err error, errMsg string)

func CheckErr Uses

func CheckErr(err error)

func Connect Uses

func Connect(port1 *FilePort, port2 *FilePort)

func ExecCmd Uses

func ExecCmd(cmd string) string

func ExpandParams Uses

func ExpandParams(cmd string, params map[string]string) string

ExpandParams takes a command pattern and a map of parameter names mapped to parameter values, and returns the command as a string where any parameter placeholders (of the form `{p:paramname}`) are replaced with the parameter value from the provided parameter values map.

Code:

fmt.Println(ExpandParams("echo {p:msg}", map[string]string{"msg": "Hello"}))

Output:

echo Hello

func InitLog Uses

func InitLog(
    traceHandle io.Writer,
    debugHandle io.Writer,
    infoHandle io.Writer,
    auditHandle io.Writer,
    warningHandle io.Writer,
    errorHandle io.Writer)

Initiate logging

func InitLogAudit Uses

func InitLogAudit()

Initiate logging with level=AUDIT

func InitLogDebug Uses

func InitLogDebug()

Initiate logging with level=DEBUG

func InitLogError Uses

func InitLogError()

Initiate logging with level=ERROR

func InitLogInfo Uses

func InitLogInfo()

Initiate logging with level=INFO

func InitLogWarning Uses

func InitLogWarning()

Initiate logging with level=WARNING

type AuditInfo Uses

type AuditInfo struct {
    Command    string
    Params     map[string]string
    Keys       map[string]string
    ExecTimeMS time.Duration
    Upstream   map[string]*AuditInfo
}

func NewAuditInfo Uses

func NewAuditInfo() *AuditInfo

type ExecMode Uses

type ExecMode int

ExecMode specifies which execution mode should be used for a SciProcess and its corresponding SciTasks

const (
    // ExecModeLocal indicates that commands should be executed on the local computer
    ExecModeLocal ExecMode = iota
    // ExecModeSLURM indicates that commands should be executed on a HPC cluster
    // via a SLURM resource manager
    ExecModeSLURM ExecMode = iota
)

type FilePort Uses

type FilePort struct {
    Port
    InChan chan *InformationPacket
    // contains filtered or unexported fields
}

FilePort

func NewFilePort Uses

func NewFilePort() *FilePort

func (*FilePort) AddInChan Uses

func (pt *FilePort) AddInChan(inChan chan *InformationPacket)

func (*FilePort) AddOutChan Uses

func (pt *FilePort) AddOutChan(outChan chan *InformationPacket)

func (*FilePort) Close Uses

func (pt *FilePort) Close()

func (*FilePort) Connect Uses

func (localPort *FilePort) Connect(remotePort *FilePort)

func (*FilePort) IsConnected Uses

func (pt *FilePort) IsConnected() bool

func (*FilePort) Recv Uses

func (pt *FilePort) Recv() *InformationPacket

func (*FilePort) RunMergeInputs Uses

func (pt *FilePort) RunMergeInputs()

RunMergeInputs merges (multiple) inputs on pt.inChans into pt.InChan. This has to start running when the owning process runs, in order to merge in-ports.

func (*FilePort) Send Uses

func (pt *FilePort) Send(ip *InformationPacket)

func (*FilePort) SetConnectedStatus Uses

func (pt *FilePort) SetConnectedStatus(connected bool)

type IPGen Uses

type IPGen struct {
    Process

    Out       *FilePort
    FilePaths []string
    // contains filtered or unexported fields
}

IPGen is initialized by a set of strings with file paths, and from that will return instantiated (generated) InformationPacket on its Out-port, when run.

func NewIPGen Uses

func NewIPGen(workflow *Workflow, name string, filePaths ...string) (fq *IPGen)

Initialize a new IPGen component from a list of file paths

func (*IPGen) IsConnected Uses

func (ipg *IPGen) IsConnected() bool

Check if the IPGen outport is connected

func (*IPGen) Name Uses

func (ipg *IPGen) Name() string

func (*IPGen) Run Uses

func (ipg *IPGen) Run()

Execute the IPGen, returning instantiated InformationPacket

type InformationPacket Uses

type InformationPacket struct {
    SubStream *FilePort
    // contains filtered or unexported fields
}

InformationPacket contains information and helper methods for a physical file on a normal disk.

func NewInformationPacket Uses

func NewInformationPacket(path string) *InformationPacket

Create new InformationPacket "object"

func (*InformationPacket) AddKey Uses

func (ip *InformationPacket) AddKey(k string, v string)

func (*InformationPacket) AddKeys Uses

func (ip *InformationPacket) AddKeys(keys map[string]string)

func (*InformationPacket) Atomize Uses

func (ip *InformationPacket) Atomize()

Change from the temporary file name to the final file name

func (*InformationPacket) CreateFifo Uses

func (ip *InformationPacket) CreateFifo()

Create FIFO file for the InformationPacket

func (*InformationPacket) Exists Uses

func (ip *InformationPacket) Exists() bool

Check if the file exists (at its final file name)

func (*InformationPacket) GetAuditFilePath Uses

func (ip *InformationPacket) GetAuditFilePath() string

func (*InformationPacket) GetAuditInfo Uses

func (ip *InformationPacket) GetAuditInfo() *AuditInfo

func (*InformationPacket) GetFifoPath Uses

func (ip *InformationPacket) GetFifoPath() string

Get the path to use when a FIFO file is used instead of a normal file

func (*InformationPacket) GetKey Uses

func (ip *InformationPacket) GetKey(k string) string

func (*InformationPacket) GetKeys Uses

func (ip *InformationPacket) GetKeys() map[string]string

func (*InformationPacket) GetParam Uses

func (ip *InformationPacket) GetParam(key string) string

func (*InformationPacket) GetPath Uses

func (ip *InformationPacket) GetPath() string

Get the (final) path of the physical file

func (*InformationPacket) GetSize Uses

func (ip *InformationPacket) GetSize() int64

Get the size of an existing file, in bytes

func (*InformationPacket) GetTempPath Uses

func (ip *InformationPacket) GetTempPath() string

Get the temporary path of the physical file

func (*InformationPacket) Open Uses

func (ip *InformationPacket) Open() *os.File

Open the file and return a file handle (*os.File)

func (*InformationPacket) OpenTemp Uses

func (ip *InformationPacket) OpenTemp() *os.File

Open the temp file and return a file handle (*os.File)

func (*InformationPacket) OpenWriteTemp Uses

func (ip *InformationPacket) OpenWriteTemp() *os.File

Open the file for writing and return a file handle (*os.File)

func (*InformationPacket) Read Uses

func (ip *InformationPacket) Read() []byte

Read the whole content of the file and return as a byte array ([]byte)

func (*InformationPacket) ReadAuditFile Uses

func (ip *InformationPacket) ReadAuditFile() []byte

Read the whole content of the audit file and return as a byte array ([]byte)

func (*InformationPacket) RemoveFifo Uses

func (ip *InformationPacket) RemoveFifo()

Remove the FIFO file, if it exists

func (*InformationPacket) SetAuditInfo Uses

func (ip *InformationPacket) SetAuditInfo(ai *AuditInfo)

func (*InformationPacket) TempFileExists Uses

func (ip *InformationPacket) TempFileExists() bool

Check if the temp-file exists

func (*InformationPacket) UnMarshalJson Uses

func (ip *InformationPacket) UnMarshalJson(v interface{})

func (*InformationPacket) WriteAuditLogToFile Uses

func (ip *InformationPacket) WriteAuditLogToFile()

func (*InformationPacket) WriteTempFile Uses

func (ip *InformationPacket) WriteTempFile(dat []byte)

Write a byte array ([]byte) to the file (first to its temp path, and then atomize)

type ParamPort Uses

type ParamPort struct {
    Chan chan string
    // contains filtered or unexported fields
}

ParamPort

func NewParamPort Uses

func NewParamPort() *ParamPort

func (*ParamPort) Close Uses

func (pp *ParamPort) Close()

func (*ParamPort) Connect Uses

func (pp *ParamPort) Connect(otherParamPort *ParamPort)

func (*ParamPort) ConnectStr Uses

func (pp *ParamPort) ConnectStr(strings ...string)

func (*ParamPort) IsConnected Uses

func (pp *ParamPort) IsConnected() bool

func (*ParamPort) Recv Uses

func (pp *ParamPort) Recv() string

func (*ParamPort) Send Uses

func (pp *ParamPort) Send(param string)

func (*ParamPort) SetConnectedStatus Uses

func (pp *ParamPort) SetConnectedStatus(connected bool)

type Port Uses

type Port interface {
    Connect(Port)
    IsConnected() bool
    SetConnectedStatus(bool)
}

type Process Uses

type Process interface {
    Name() string
    IsConnected() bool // Sanity check, to see whether all ports are connected
    Run()
}

Base interface for all processes

type SciProcess Uses

type SciProcess struct {
    Process

    CommandPattern string
    ExecMode       ExecMode
    Prepend        string
    Spawn          bool

    OutPortsDoStream map[string]bool
    PathFormatters   map[string]func(*SciTask) string

    CustomExecute func(*SciTask)

    CoresPerTask int
    // contains filtered or unexported fields
}

func NewProc Uses

func NewProc(workflow *Workflow, name string, cmd string) *SciProcess

func NewSciProcess Uses

func NewSciProcess(workflow *Workflow, name string, command string) *SciProcess

func ShellExpand Uses

func ShellExpand(workflow *Workflow, name string, cmd string, inPaths map[string]string, outPaths map[string]string, params map[string]string) *SciProcess

func (*SciProcess) GetInPorts Uses

func (p *SciProcess) GetInPorts() map[string]*FilePort

func (*SciProcess) GetOutPorts Uses

func (p *SciProcess) GetOutPorts() map[string]*FilePort

func (*SciProcess) GetParamPorts Uses

func (p *SciProcess) GetParamPorts() map[string]*ParamPort

func (*SciProcess) In Uses

func (p *SciProcess) In(portName string) *FilePort

func (*SciProcess) IsConnected Uses

func (proc *SciProcess) IsConnected() (isConnected bool)

------- Sanity checks -------

func (*SciProcess) Name Uses

func (p *SciProcess) Name() string

func (*SciProcess) Out Uses

func (p *SciProcess) Out(portName string) *FilePort

func (*SciProcess) PP Uses

func (p *SciProcess) PP(paramPortName string) *ParamPort

func (*SciProcess) ParamPort Uses

func (p *SciProcess) ParamPort(paramPortName string) *ParamPort

func (*SciProcess) Run Uses

func (p *SciProcess) Run()

Run runs the process by instantiating and executing SciTasks for all inputs and parameter values on its in-ports. In the case when there are no inputs or parameter values on the in-ports, it will run just once before it terminates. Note that the actual execution of shell commands is done inside SciTask.Execute, not here.

func (*SciProcess) SetInPort Uses

func (p *SciProcess) SetInPort(portName string, port *FilePort)

func (*SciProcess) SetOutPort Uses

func (p *SciProcess) SetOutPort(portName string, port *FilePort)

func (*SciProcess) SetParamPort Uses

func (p *SciProcess) SetParamPort(paramPortName string, paramPort *ParamPort)

func (*SciProcess) SetPathCustom Uses

func (p *SciProcess) SetPathCustom(outPortName string, pathFmtFunc func(task *SciTask) (path string))

SetPathCustom takes a function which produces a file path based on data available in *SciTask, such as concrete file paths and parameter values.

func (*SciProcess) SetPathExtend Uses

func (p *SciProcess) SetPathExtend(inPortName string, outPortName string, extension string)

SetPathExtend creates an (output) path formatter that extends the path of an input InformationPacket

func (*SciProcess) SetPathReplace Uses

func (p *SciProcess) SetPathReplace(inPortName string, outPortName string, old string, new string)

SetPathReplace creates an (output) path formatter that uses an input's path but replaces parts of it.

func (*SciProcess) SetPathStatic Uses

func (p *SciProcess) SetPathStatic(outPortName string, path string)

SetPathStatic creates an (output) path formatter returning a static string file name

type SciTask Uses

type SciTask struct {
    Name          string
    Command       string
    ExecMode      ExecMode
    CustomExecute func(*SciTask)
    InTargets     map[string]*InformationPacket
    OutTargets    map[string]*InformationPacket
    Params        map[string]string
    Done          chan int
    Image         string
    DataFolder    string
    // contains filtered or unexported fields
}

func NewSciTask Uses

func NewSciTask(workflow *Workflow, name string, cmdPat string, inTargets map[string]*InformationPacket, outPathFuncs map[string]func(*SciTask) string, outPortsDoStream map[string]bool, params map[string]string, prepend string, execMode ExecMode, cores int) *SciTask

func (*SciTask) Execute Uses

func (t *SciTask) Execute()

func (*SciTask) InPath Uses

func (t *SciTask) InPath(portName string) string

func (*SciTask) Param Uses

func (t *SciTask) Param(portName string) string

type ShellProcess Uses

type ShellProcess interface {
    Process

    In(string) *FilePort
    GetInPorts() map[string]*FilePort

    Out(string) *FilePort
    GetOutPorts() map[string]*FilePort

    SetPathStatic(outPortName string, path string)
    SetPathExtend(inPortName string, outPortName string, extension string)
    SetPathReplace(inPortName string, outPortName string, old string, new string)
    SetPathCustom(outPortName string, pathFmtFunc func(task *SciTask) (path string))
}

type Sink Uses

type Sink struct {
    Process
    // contains filtered or unexported fields
}

Sink is a simple component that just receives InformationPacket on its In-port without doing anything with them

func NewSink Uses

func NewSink(name string) (s *Sink)

Instantiate a Sink component

func (*Sink) Connect Uses

func (p *Sink) Connect(outPort *FilePort)

func (*Sink) IsConnected Uses

func (p *Sink) IsConnected() bool

func (*Sink) Name Uses

func (p *Sink) Name() string

func (*Sink) Run Uses

func (p *Sink) Run()

Execute the Sink component

type Workflow Uses

type Workflow struct {
    // contains filtered or unexported fields
}

func NewWorkflow Uses

func NewWorkflow(name string, maxConcurrentTasks int) *Workflow

func (*Workflow) AddProc Uses

func (wf *Workflow) AddProc(proc Process)

func (*Workflow) AddProcs Uses

func (wf *Workflow) AddProcs(procs ...Process)

func (*Workflow) ConnectLast Uses

func (wf *Workflow) ConnectLast(outPort *FilePort)

ConnectLast connects the last (most downstream) out-ports in the workflow to an implicit sink process which will be used to drive the workflow. This can be used instead of manually creating a sink, connecting it, and setting it as the driver process of the workflow.

func (*Workflow) DecConcurrentTasks Uses

func (wf *Workflow) DecConcurrentTasks(slots int)

func (*Workflow) Driver Uses

func (wf *Workflow) Driver() Process

func (*Workflow) IncConcurrentTasks Uses

func (wf *Workflow) IncConcurrentTasks(slots int)

func (*Workflow) NewProc Uses

func (wf *Workflow) NewProc(procName string, commandPattern string) *SciProcess

func (*Workflow) Proc Uses

func (wf *Workflow) Proc(procName string) Process

func (*Workflow) Procs Uses

func (wf *Workflow) Procs() map[string]Process

func (*Workflow) Run Uses

func (wf *Workflow) Run()

func (*Workflow) SetDriver Uses

func (wf *Workflow) SetDriver(driver Process)

func (*Workflow) SetSink Uses

func (wf *Workflow) SetSink(sink *Sink)

func (*Workflow) Sink Uses

func (wf *Workflow) Sink() *Sink

Directories

Path | Synopsis
components

Package scipipe imports 15 packages (graph) and is imported by 1 package. Updated 2017-11-17. Refresh now. Tools for package owners.