Reflow

Reflow is a system for incremental data processing in the cloud. Reflow enables scientists and engineers to compose existing tools (packaged in Docker images) using ordinary programming constructs. Reflow then evaluates these programs in a cloud environment, transparently parallelizing work and memoizing results. Reflow was created at GRAIL to manage our NGS (next generation sequencing) bioinformatics workloads on AWS, but has also been used for many other applications, including model training and ad-hoc data analyses.

Reflow comprises:

  • a functional, lazy, type-safe domain specific language for writing workflow programs;
  • a runtime for evaluating Reflow programs incrementally, coordinating cluster execution, and transparently memoizing results;
  • a cluster scheduler to dynamically provision and tear down resources from a cloud provider (AWS currently supported).

Reflow thus allows scientists and engineers to write straightforward programs and then have them transparently executed in a cloud environment. Programs are automatically parallelized and distributed across multiple machines, and redundant computations (even across runs and users) are eliminated by its memoization cache. Reflow evaluates its programs incrementally: whenever the input data or program changes, only those outputs that depend on the changed data or code are recomputed.

In addition to the default cluster computing mode, Reflow programs can also be run locally, making use of the local machine's Docker daemon (including Docker for Mac).

Reflow was designed to support sophisticated, large-scale bioinformatics workflows, but should be widely applicable to scientific and engineering computing workloads. It was built using Go.

Reflow joins a long list of systems designed to tackle bioinformatics workloads, but differs from these in important ways:

  • it is a vertically integrated system with a minimal set of external dependencies; this allows Reflow to be "plug-and-play": bring your cloud credentials, and you're off to the races;
  • it defines a strict data model which is used for transparent memoization and other optimizations;
  • it takes workflow software seriously: the Reflow DSL provides type checking, modularity, and other constructs that are commonplace in general-purpose programming languages;
  • because of its high-level data model and use of caching, Reflow computes incrementally: it is always able to compute the smallest set of operations given what has been computed previously.

Getting Reflow

You can get binaries (macOS/amd64, Linux/amd64) for the latest release at the GitHub release page.

Reflow is implemented in Go, and its packages are go-gettable. You can retrieve Reflow and its dependencies with

% go get [-u] github.com/grailbio/reflow

and build the "reflow" binary using

% go install github.com/grailbio/reflow/cmd/reflow

Note that Reflow makes use of its own agent binaries, called reflowlets. These are compiled for Linux/amd64, and are invoked by the cluster manager during instance bootstrapping. Thus, when you make changes to the code, you may also have to update the agent Docker image. This process is handled by the command releasereflow, which can be installed by

% go install github.com/grailbio/reflow/cmd/releasereflow

Releasereflow builds target agent binaries and uploads them to a Docker repository (the release builds use the grailbio/reflowlet repository). Releasereflow then updates the file version.go which is compiled into the standard reflow binary.

Quickstart - AWS

Reflow is distributed with an EC2 cluster manager, and a memoization cache implementation based on S3. These must be configured before use. Reflow maintains a configuration file in $HOME/.reflow/config.yaml by default (this can be overridden with the -config option). Reflow's setup commands modify this file directly. After each step, the current configuration can be examined by running reflow config.

Note that Reflow must have access to AWS credentials in the environment (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY) while running these commands.

% reflow setup-ec2
% reflow config
cluster: ec2cluster
ec2cluster:
  ami: ami-d0e54eb0
  diskspace: 100
  disktype: gp2
  instancetypes:
  - c1.medium
  - c1.xlarge
  - c3.2xlarge
  - c3.4xlarge
  - c3.8xlarge
  - c3.large
  - c3.xlarge
  - c4.2xlarge
  - c4.4xlarge
  - c4.8xlarge
  - c4.large
  - c4.xlarge
  - cc2.8xlarge
  - m1.large
  - m1.medium
  - m1.small
  - m1.xlarge
  - m2.2xlarge
  - m2.4xlarge
  - m2.xlarge
  - m3.2xlarge
  - m3.large
  - m3.medium
  - m3.xlarge
  - m4.16xlarge
  - m4.4xlarge
  - m4.xlarge
  - r4.xlarge
  - t1.micro
  - t2.large
  - t2.medium
  - t2.micro
  - t2.nano
  - t2.small
  keyname: ""
  maxinstances: 10
  region: us-west-2
  securitygroup: <a newly created security group here>
  sshkey: <your public SSH key here>
https: httpsca,$HOME/.reflow/reflow.pem

After running reflow setup-ec2, we see that Reflow created a new security group (associated with the account's default VPC), and configured the cluster to use some default settings. Feel free to edit the configuration file ($HOME/.reflow/config.yaml) to your taste. If you want to use spot instances, add a new key, spot: true, under the ec2cluster section.

Reflow only configures one security group per account: Reflow will reuse a previously created security group if reflow setup-ec2 is run anew. See reflow setup-ec2 -help for more details.

Next, we'll set up a cache. This isn't strictly necessary, but we'll need it in order to use many of Reflow's sophisticated caching and incremental computation features. On AWS, Reflow implements a cache based on S3 and DynamoDB. A new S3-based cache is provisioned by reflow setup-s3-repository and reflow setup-dynamodb-assoc, each of which takes one argument naming the S3 bucket and the DynamoDB table to be used, respectively. The S3 bucket is used to store file objects, while the DynamoDB table is used to store associations between logically named computations and their concrete output. Note that S3 bucket names are global, so pick a name that's likely to be unique.

% reflow setup-s3-repository reflow-quickstart-cache
2017/10/18 15:09:10 creating s3 bucket reflow-quickstart-cache
2017/10/18 15:09:12 created s3 bucket reflow-quickstart-cache
% reflow setup-dynamodb-assoc reflow-quickstart
2017/10/18 15:09:40 creating DynamoDB table reflow-quickstart
2017/10/18 15:09:40 created DynamoDB table reflow-quickstart
% reflow config
assoc: dynamodb,reflow-quickstart
repository: s3,reflow-quickstart-cache

<rest is same as before>

The setup commands created the S3 bucket and DynamoDB table as needed, and modified the configuration accordingly.

We're now ready to run our first "hello world" program!

Create a file called "hello.rf" with the following contents:

val Main = exec(image := "ubuntu", mem := GiB) (out file) {"
	echo hello world >>{{out}}
"}

and run it:

% reflow run hello.rf
2017/10/18 15:11:05 run name: marius@localhost/e08374e8
2017/10/18 15:11:08 ec2cluster: launched instance i-0bd7d189617e53767: t2.small: 1.9GiB 1 100.0GiB
2017/10/18 15:11:54 -> hello.Main   3dca1cc0 run    exec ubuntu echo hello world >>{{out}}
2017/10/18 15:12:02 <- hello.Main   3dca1cc0 ok     exec 0s 12B
2017/10/18 15:12:02 total n=1 time=8s
	ident      n   ncache runtime(m) cpu mem(GiB) disk(GiB) tmp(GiB)
	hello.Main 1   0                                        
	
file(sha256=sha256:a948904f2f0f479b8f8197694b30184b0d2ed1c1cd2a1ec0fb85d299a192a447, size=12)

Here, Reflow started a new t2.small instance (Reflow matches the workload with available instance types), ran "echo hello world" inside of an Ubuntu container, placed the output in a file, and returned its SHA256 digest. (Reflow represents file contents using their SHA256 digest.)
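
Because Reflow identifies files purely by the SHA-256 of their contents, the printed value can be reproduced locally with a few lines of ordinary Go; this is a sketch only, hashing the 12-byte output ("hello world" followed by a newline) directly:

package main

import (
	"crypto/sha256"
	"fmt"
)

func main() {
	// The exec's output file holds "hello world" followed by a newline: 12 bytes.
	data := []byte("hello world\n")
	sum := sha256.Sum256(data)
	fmt.Printf("file(sha256=sha256:%x, size=%d)\n", sum, len(data))
}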

We're now ready to explore Reflow more fully.

Simple bioinformatics workflow

Let's explore some of Reflow's features through a simple task: aligning NGS read data from the 1000genomes project. Create a file called "align.rf" with the following. The code is commented inline for clarity.

// In order to align raw NGS data, we first need to construct an index
// against which to perform the alignment. We're going to be using
// the BWA aligner, and so we'll need to retrieve a reference sequence
// and create an index that's usable from BWA.

// g1kv37 is a human reference FASTA sequence. (All
// chromosomes.) Reflow has a static type system, but most type
// annotations can be omitted: they are inferred by Reflow. In this
// case, we're creating a file: a reference to the contents of the
// named URL. We're retrieving data from the public 1000genomes S3
// bucket.
val g1kv37 = file("s3://1000genomes/technical/reference/human_g1k_v37.fasta.gz")

// Here we create an indexed version of the g1kv37 reference. It is
// created using the "bwa index" command with the raw FASTA data as
// input. Here we encounter another way to produce data in reflow:
// the exec. An exec runs a (Bash) script inside of a Docker image,
// placing the output in files or directories (or both: execs can
// return multiple values). In this case, we're returning a
// directory since BWA stores multiple index files alongside the raw
// reference. We also declare that the image to be used is
// "biocontainers/bwa" (the BWA image maintained by the
// biocontainers project).
//
// Inside of an exec template (delimited by {" and "}) we refer to
// (interpolate) values in our environment by placing expressions
// inside of the {{ and }} delimiters. In this case we're referring
// to the file g1kv37 declared above, and our output, named out.
//
// Many types of expressions can be interpolated inside of an exec,
// for example strings, integers, files, and directories. Strings
// and integers are rendered using their normal representation,
// files and directories are materialized to a local path before
// starting execution. Thus, in this case, {{g1kv37}} is replaced at
// runtime by a path on disk with a file with the contents of the
// file g1kv37 (i.e.,
// s3://1000genomes/technical/reference/human_g1k_v37.fasta.gz)
val reference = exec(image := "biocontainers/bwa", mem := GiB, cpu := 1) (out dir) {"
	# Ignore failures here. The file from 1000genomes has a trailer
	# that isn't recognized by gunzip. (This is not recommended practice!)
	gunzip -c {{g1kv37}} > {{out}}/g1k_v37.fa || true
	cd {{out}}
	bwa index -a bwtsw g1k_v37.fa
"}

// Now that we have defined a reference, we can define a function to
// align a pair of reads against the reference, producing an output
// SAM-formatted file. Functions compute expressions over a set of
// abstract parameters, in this case, a pair of read files. Unlike almost
// everywhere else in Reflow, function parameters must be explicitly
// typed.
//
// (Note that we're using a syntactic short-hand here: parameter lists can 
// be abbreviated. "r1, r2 file" is equivalent to "r1 file, r2 file".)
//
// The implementation of align is a straightforward invocation of "bwa mem".
// Note that "r1" and "r2" inside of the exec refer to the function arguments,
// thus align can be invoked for any set of r1, r2.
func align(r1, r2 file) = 
	exec(image := "biocontainers/bwa", mem := 20*GiB, cpu := 16) (out file) {"
		bwa mem -M -t 16 {{reference}}/g1k_v37.fa {{r1}} {{r2}} > {{out}}
	"}

// We're ready to test our workflow now. We pick an arbitrary read
// pair from the 1000genomes data set, and invoke align. There are a
// few things of note here. First is the identifier "Main". This
// names the expression that's evaluated by "reflow run" -- the
// entry point of the computation. Second, we've defined Main to be
// a block. A block is an expression that contains one or more
// definitions followed by an expression. The value of a block is the
// final expression. Finally, Main contains a @requires annotation.
// This instructs Reflow how many resources to reserve for the work
// being done. Note that, because Reflow is able to distribute work,
// if a single instance is too small to execute fully in parallel,
// Reflow will provision additional compute instances to help along.
// @requires thus denotes the smallest possible instance
// configuration that's required for the program.
@requires(cpu := 16, mem := 24*GiB, disk := 50*GiB)	
val Main = {
	r1 := file("s3://1000genomes/phase3/data/HG00103/sequence_read/SRR062640_1.filt.fastq.gz")
	r2 := file("s3://1000genomes/phase3/data/HG00103/sequence_read/SRR062640_2.filt.fastq.gz")
	align(r1, r2)
}

Now we're ready to run our module. First, let's run reflow doc. This does two things. First, it typechecks the module (and any dependent modules), and second, it prints documentation for the public declarations in the module. Identifiers that begin with an uppercase letter are public (and may be used from other modules); others are not.

% reflow doc align.rf
Declarations

val Main (out file)
    We're ready to test our workflow now. We pick an arbitrary read pair from the
    1000genomes data set, and invoke align. There are a few things of note here.
    First is the identifier "Main". This names the expression that's evaluated by
    "reflow run" -- the entry point of the computation. Second, we've defined Main
    to be a block. A block is an expression that contains one or more definitions
    followed by an expression. The value of a block is the final expression. Finally,
    Main contains a @requires annotation. This instructs Reflow how many resources
    to reserve for the work being done. Note that, because Reflow is able to
    distribute work, if a single instance is too small to execute fully in parallel,
    Reflow will provision additional compute instances to help along. @requires thus
    denotes the smallest possible instance configuration that's required for the
    program.

Then let's run it:

% reflow run align.rf
2017/10/18 15:47:51 run name: marius@localhost/fb77159e
2017/10/18 15:47:54 ec2cluster: launched instance i-0a6a6901865001c4c: c4.4xlarge: 28.5GiB 16 100.0GiB

Reflow launched a new instance: the previously launched instance (a t2.small) was not big enough to fit the requirements of align.rf. Note also that Reflow assigns a run name for each "reflow run" invocation. This can be used to look up run details with the "reflow info" command. In this case:

% reflow info marius@localhost/fb77159e
marius@localhost/fb77159e
    time:    Wed Oct 18 15:47:52 2017
    program: /Users/marius/align.rf
    phase:   Eval
    alloc:   ec2-34-210-106-90.us-west-2.compute.amazonaws.com:9000/a79b63ddf0952cff
    log:     /Users/marius/.reflow/runs/marius@localhost/fb77159efc639b14aaff55fcd5229484ec78694ada7973bd0af862fd4f06adbd.execlog

Here we see that the run is currently being performed on the alloc named ec2-34-210-106-90.us-west-2.compute.amazonaws.com:9000/a79b63ddf0952cff. An alloc is a resource reservation on a single machine. A run can make use of multiple allocs to distribute work across multiple machines. The alloc is a URI, and the first component is the real hostname. You can ssh into the host in order to inspect what's going on. Reflow launched the instance with your public SSH key (as long as it was set up by reflow setup-ec2, and $HOME/.ssh/id_rsa.pub existed at that time).

% ssh core@ec2-34-210-106-90.us-west-2.compute.amazonaws.com
...

As the run progresses, Reflow prints execution status on the console. Lines beginning with "->" indicate that a task was started (e.g., download a file, start an exec), while lines beginning with "<-" indicate that a task finished.

...
2017/10/18 16:03:07 -> example.Main.r2 a8788913 run  intern ..hase3/data/HG00103/sequence_read/SRR062640_2.filt.fastq.gz
2017/10/18 16:03:07 -> example.Main.r1 882d8d39 run  intern ..hase3/data/HG00103/sequence_read/SRR062640_1.filt.fastq.gz
2017/10/18 16:03:14 -> example.reference a4a82ba4 run    exec biocontainers/bwa # Ignore failures here. The f..bwa index -a bwtsw g1k_v37.fa

Here, Reflow started downloading r1 and r2 in parallel with creating the reference. Creating the reference is an expensive operation. We can examine it while it's running with "reflow ps":

% reflow ps 
marius@localhost/220ab625 example.reference 4:03PM 0:00 running 272.8MiB 8.0 4.4GiB bwa

This tells us that the only task that's currently running is bwa to compute the reference. It's currently using 272MiB of memory, 8 cores, and 4.4 GiB of disk space. By passing the -l option, reflow ps also prints the task's exec URI.

% reflow ps -l
marius@localhost/220ab625 example.reference 4:03PM 0:00 running 302.8MiB 8.0 4.4GiB bwa ec2-52-11-236-67.us-west-2.compute.amazonaws.com:9000/6b2879b9c282a12f/a4a82ba4b5c6d82985d31e79be5a8fe568d75a86db284d78f9972df525cf70d3

An exec URI is a handle to the actual task being executed. It globally identifies all tasks, and can be examined with "reflow info":

% reflow info ec2-52-11-236-67.us-west-2.compute.amazonaws.com:9000/6b2879b9c282a12f/a4a82ba4b5c6d82985d31e79be5a8fe568d75a86db284d78f9972df525cf70d3
ec2-52-11-236-67.us-west-2.compute.amazonaws.com:9000/6b2879b9c282a12f/a4a82ba4b5c6d82985d31e79be5a8fe568d75a86db284d78f9972df525cf70d3
    state: running
    type:  exec
    ident: example.reference
    image: biocontainers/bwa
    cmd:   "\n\t# Ignore failures here. The file from 1000genomes has a trailer\n\t# that isn't recognized by gunzip. (This is not recommended practice!)\n\tgunzip -c {{arg[0]}} > {{arg[1]}}/g1k_v37.fa || true\n\tcd {{arg[2]}}\n\tbwa index -a bwtsw g1k_v37.fa\n"
      arg[0]:
        .: sha256:8b6c538abf0dd92d3f3020f36cc1dd67ce004ffa421c2781205f1eb690bdb442 (851.0MiB)
      arg[1]: output 0
      arg[2]: output 0
    top:
         bwa index -a bwtsw g1k_v37.fa

Here, Reflow tells us that the currently running process is "bwa index...", its template command, and the SHA256 digest of its inputs. Programs often print helpful output to standard error while working; this output can be examined with "reflow logs":

% reflow logs ec2-52-11-236-67.us-west-2.compute.amazonaws.com:9000/6b2879b9c282a12f/a4a82ba4b5c6d82985d31e79be5a8fe568d75a86db284d78f9972df525cf70d3
gzip: /arg/0/0: decompression OK, trailing garbage ignored
[bwa_index] Pack FASTA... 22.36 sec
[bwa_index] Construct BWT for the packed sequence...
[BWTIncCreate] textLength=6203609478, availableWord=448508744
[BWTIncConstructFromPacked] 10 iterations done. 99999990 characters processed.
[BWTIncConstructFromPacked] 20 iterations done. 199999990 characters processed.
[BWTIncConstructFromPacked] 30 iterations done. 299999990 characters processed.
[BWTIncConstructFromPacked] 40 iterations done. 399999990 characters processed.
[BWTIncConstructFromPacked] 50 iterations done. 499999990 characters processed.
[BWTIncConstructFromPacked] 60 iterations done. 599999990 characters processed.
%

At this point, it looks like everything is running as expected. There's not much more to do than wait. Note that, while creating an index takes a long time, Reflow only has to compute it once. When it's done, Reflow memoizes the result, uploading the resulting data directly to the configured S3 cache bucket. The next time the reference expression is encountered, Reflow will use the previously computed result. If the input file changes (e.g., we decide to use another reference sequence), Reflow will recompute the index again. The same will happen if the command (or Docker image) used to compute the index changes. Reflow keeps track of all the dependencies of a particular subcomputation, and recomputes it only when those dependencies have changed. This way, we always know that what is being computed is correct (the result is the same as if we had computed it from scratch), but we avoid paying the cost of redundant computation.

After a little while, the reference will have finished generating, and Reflow begins alignment. Here, Reflow reports that the reference took 56 minutes to compute, and produced 8 GiB of output.

2017/10/19 10:04:39 <- align.reference a4a82ba4 ok     exec 56m49s 8.0GiB
2017/10/19 10:04:39 -> align.align  80b24fa0 run    exec biocontainers/bwa bwa mem -M -t 16 {{reference}..37.fa {{r1}} {{r2}} > {{out}}

If we query ("info") the reference exec again, Reflow reports precisely what was produced:

% reflow info ec2-34-215-254-146.us-west-2.compute.amazonaws.com:9000/e34ae0cf7c0482f6/a4a82ba4b5c6d82985d31e79be5a8fe568d75a86db284d78f9972df525cf70d3
ec2-34-215-254-146.us-west-2.compute.amazonaws.com:9000/e34ae0cf7c0482f6/a4a82ba4b5c6d82985d31e79be5a8fe568d75a86db284d78f9972df525cf70d3
    state: complete
    type:  exec
    ident: align.reference
    image: biocontainers/bwa
    cmd:   "\n\t# Ignore failures here. The file from 1000genomes has a trailer\n\t# that isn't recognized by gunzip. (This is not recommended practice!)\n\tgunzip -c {{arg[0]}} > {{arg[1]}}/g1k_v37.fa || true\n\tcd {{arg[2]}}\n\tbwa index -a bwtsw g1k_v37.fa\n"
      arg[0]:
        .: sha256:8b6c538abf0dd92d3f3020f36cc1dd67ce004ffa421c2781205f1eb690bdb442 (851.0MiB)
      arg[1]: output 0
      arg[2]: output 0
    result:
      list[0]:
        g1k_v37.fa:     sha256:2f9cd9e853a9284c53884e6a551b1c7284795dd053f255d630aeeb114d1fa81f (2.9GiB)
        g1k_v37.fa.amb: sha256:dd51a07041a470925c1ebba45c2f534af91d829f104ade8fc321095f65e7e206 (6.4KiB)
        g1k_v37.fa.ann: sha256:68928e712ef48af64c5b6a443f2d2b8517e392ae58b6a4ab7191ef7da3f7930e (6.7KiB)
        g1k_v37.fa.bwt: sha256:2aec938930b8a2681eb0dfbe4f865360b98b2b6212c1fb9f7991bc74f72d79d8 (2.9GiB)
        g1k_v37.fa.pac: sha256:d62039666da85d859a29ea24af55b3c8ffc61ddf02287af4d51b0647f863b94c (739.5MiB)
        g1k_v37.fa.sa:  sha256:99eb6ff6b54fba663c25e2642bb2a6c82921c931338a7144327c1e3ee99a4447 (1.4GiB)

In this case, "bwa index" produced a number of auxiliary index files. These are the contents of the "reference" directory.

We can again query Reflow for running execs, and examine the alignment. We see now that the reference is passed in (argument 0), alongside the read pairs (arguments 1 and 2).

% reflow ps -l
marius@localhost/e22cbf4c align.align 10:04AM 0:00 running 7.8GiB 127.9 5.8GiB bwa ec2-34-215-254-146.us-west-2.compute.amazonaws.com:9000/e34ae0cf7c0482f6/80b24fa01b988ca666d4c1eae0fa21d1dbd68e9cd18a82fc4cd6da20fec5abbd
% reflow info ec2-34-215-254-146.us-west-2.compute.amazonaws.com:9000/e34ae0cf7c0482f6/80b24fa01b988ca666d4c1eae0fa21d1dbd68e9cd18a82fc4cd6da20fec5abbd
ec2-34-215-254-146.us-west-2.compute.amazonaws.com:9000/e34ae0cf7c0482f6/80b24fa01b988ca666d4c1eae0fa21d1dbd68e9cd18a82fc4cd6da20fec5abbd
    state: running
    type:  exec
    ident: align.align
    image: biocontainers/bwa
    cmd:   "\n\t\tbwa mem -M -t 16 {{arg[0]}}/g1k_v37.fa {{arg[1]}} {{arg[2]}} > {{arg[3]}}\n\t"
      arg[0]:
        g1k_v37.fa:     sha256:2f9cd9e853a9284c53884e6a551b1c7284795dd053f255d630aeeb114d1fa81f (2.9GiB)
        g1k_v37.fa.amb: sha256:dd51a07041a470925c1ebba45c2f534af91d829f104ade8fc321095f65e7e206 (6.4KiB)
        g1k_v37.fa.ann: sha256:68928e712ef48af64c5b6a443f2d2b8517e392ae58b6a4ab7191ef7da3f7930e (6.7KiB)
        g1k_v37.fa.bwt: sha256:2aec938930b8a2681eb0dfbe4f865360b98b2b6212c1fb9f7991bc74f72d79d8 (2.9GiB)
        g1k_v37.fa.pac: sha256:d62039666da85d859a29ea24af55b3c8ffc61ddf02287af4d51b0647f863b94c (739.5MiB)
        g1k_v37.fa.sa:  sha256:99eb6ff6b54fba663c25e2642bb2a6c82921c931338a7144327c1e3ee99a4447 (1.4GiB)
      arg[1]:
        .: sha256:0c1f85aa9470b24d46d9fc67ba074ca9695d53a0dee580ec8de8ed46ef347a85 (1.8GiB)
      arg[2]:
        .: sha256:47f5e749123d8dda92b82d5df8e32de85273989516f8e575d9838adca271f630 (1.7GiB)
      arg[3]: output 0
    top:
         /bin/bash -e -l -o pipefail -c ..bwa mem -M -t 16 /arg/0/0/g1k_v37.fa /arg/1/0 /arg/2/0 > /return/0 .
         bwa mem -M -t 16 /arg/0/0/g1k_v37.fa /arg/1/0 /arg/2/0
% 

Note that the read pairs are files. Files in Reflow do not have names; they are just blobs of data. When Reflow runs a process that requires input files, those anonymous files are materialized on disk, but the filenames are not meaningful. In this case, we can see from the "top" output (these are the actual running processes, as reported by the OS) that r1 ended up being called "/arg/1/0" and r2 "/arg/2/0". The output is a file named "/return/0".

Finally, alignment is complete. Aligning a single read pair took around 19m, and produced 13.2 GiB of output. Upon completion, Reflow prints runtime statistics and the result.

2017/10/19 10:25:13 <- align.align  80b24fa0 ok     exec 19m27s 13.2GiB
2017/10/19 10:25:13 total n=2 time=1h18m24s
	ident           n   ncache runtime(m) cpu               mem(GiB)    disk(GiB)      tmp(GiB)
	align.align     1   0      19/19/19   123.9/123.9/123.9 7.8/7.8/7.8 12.9/12.9/12.9 0.0/0.0/0.0
	align.reference 1   0      57/57/57   7.8/7.8/7.8       4.4/4.4/4.4 8.0/8.0/8.0    0.0/0.0/0.0
	
file(sha256=sha256:becb04856590893048e698b1c4bc26192105a44866a06ad8af4f7bda0104c43b, size=14196491221)
% 

Reflow represents file values by the SHA256 digest of the file's content. In this case, that's not very useful: you want the file, not its digest. Reflow provides mechanisms to export data. In this case let's copy the resulting file to an S3 bucket.

We'll make use of the "files" system module to copy the aligned file to an external S3 bucket. Modify align.rf's Main to the following (but pick an S3 bucket you own), and then run it again. Commentary is inline for clarity.

@requires(cpu := 16, mem := 24*GiB, disk := 50*GiB)	
val Main = {
	r1 := file("s3://1000genomes/phase3/data/HG00103/sequence_read/SRR062640_1.filt.fastq.gz")
	r2 := file("s3://1000genomes/phase3/data/HG00103/sequence_read/SRR062640_2.filt.fastq.gz")
	// Instantiate the system modules "files" (system modules begin
	// with $), assigning its instance to the "files" identifier. To
	// view the documentation for this module, run "reflow doc
	// $/files".
	files := make("$/files")
	// As before.
	aligned := align(r1, r2)
	// Use the files module's Copy function to copy the aligned file to
	// the provided destination.
	files.Copy(aligned, "s3://marius-test-bucket/aligned.sam")
}

And run it again:

% reflow run align.rf
2017/10/19 10:46:01 run name: marius@localhost/e18f1312
2017/10/19 10:46:02 -> align.align  80b24fa0 xfer   exec biocontainers/bwa bwa mem -M -t 16 {{reference}..37.fa {{r1}} {{r2}} > {{out}}
2017/10/19 10:47:27 <- align.align  80b24fa0 ok     exec 0s 13.2GiB
2017/10/19 10:47:28 -> align.Main   9e252400 run  extern s3://marius-test-bucket/aligned.sam 13.2GiB
2017/10/19 10:49:42 <- align.Main   9e252400 ok   extern 2m6s 0B
2017/10/19 10:49:42 total n=2 time=3m40s
	ident       n   ncache runtime(m) cpu mem(GiB) disk(GiB) tmp(GiB)
	align.Main  1   0                                        
	align.align 1   1                                        
	
val<>

Here we see that Reflow did not need to recompute the aligned file; it was instead retrieved from the cache. The reference index generation is skipped altogether. Status lines that indicate "xfer" (instead of "run") mean that Reflow is performing a cache transfer in place of running the computation. Reflow claims to have transferred a 13.2 GiB file to s3://marius-test-bucket/aligned.sam. Indeed it did:

% aws s3 ls s3://marius-test-bucket/aligned.sam
2017-10-19 10:47:37 14196491221 aligned.sam

1000align

This code was modularized and generalized in 1000align. Here, fastq, bam, and alignment utilities are split into their own parameterized modules. The toplevel module, 1000align, is instantiated from the command line. Command line invocations (reflow run) can pass module parameters through flags (strings, booleans, and integers):

% reflow run 1000align.rf -help
usage of 1000align.rf:
  -out string
    	out is the target of the output merged BAM file (required)
  -sample string
    	sample is the name of the 1000genomes phase 3 sample (required)

For example, to align the full sample from above, we can invoke 1000align.rf with the following arguments:

% reflow run 1000align.rf  -sample HG00103 -out s3://marius-test-bucket/HG00103.bam

In this case, if your account limits allow it, Reflow will launch additional EC2 instances in order to further parallelize the work to be done, since we're aligning multiple pairs of FASTQ files. In this run, we can see that Reflow is aligning 5 pairs in parallel across 2 instances (four fit on the initial m4.16xlarge instance).

% reflow ps -l
marius@localhost/66986b84 align.align.sam 1:43PM 0:00 running 7.8GiB  4.7  6.5GiB   bwa ec2-52-40-140-59.us-west-2.compute.amazonaws.com:9000/21d327bb4da2b7cf/0551e1353c385dc420c00d93ff5b645c5b6dd022986d42fabb4003d9c632f383
marius@localhost/66986b84 align.align.sam 1:43PM 0:00 running 10.5GiB 58.1 4.2GiB   bwa ec2-52-40-140-59.us-west-2.compute.amazonaws.com:9000/21d327bb4da2b7cf/a39f5f6d8a8ed8b3200cc1ca6b6497dd6bc05ad501bd3f54587139e255972f21
marius@localhost/66986b84 align.align.sam 1:44PM 0:00 running 8.6GiB  50.8 925.0MiB bwa ec2-52-40-140-59.us-west-2.compute.amazonaws.com:9000/5cbba84c0ca1cb4e/5a956304a2cd3ec37db6240c82a97b4660803f2ea30f2b69139a384cd0664f68
marius@localhost/66986b84 align.align.sam 1:44PM 0:00 running 9.1GiB  3.3  0B       bwa ec2-52-40-140-59.us-west-2.compute.amazonaws.com:9000/5cbba84c0ca1cb4e/5ebd233dc5495c5c090c5177c7bfdecc1882972cf3504b532d22be1200e7e5f1
marius@localhost/66986b84 align.align.sam 1:48PM 0:00 running 6.8GiB  31.5 0B       bwa ec2-34-212-44-28.us-west-2.compute.amazonaws.com:9000/f128adf14a7a3d5a/6c1266c7c3e8fc9f2a56e370b552a163e14bf6f4ae73b7b5b067d408eb38dbcf

When it completes, an approximately 17 GiB BAM file is deposited to S3:

% aws s3 ls s3://marius-test-bucket/HG00103.bam
2017-10-24 20:25:48 18752460252 HG00103.bam

A note on Reflow's cluster manager

Reflow comes with a built-in cluster manager, which is responsible for elastically increasing or decreasing required compute resources. The AWS EC2 cluster manager keeps track of instance type availability and account limits, and uses these to launch the most appropriate set of instances for a given job. Instances terminate themselves if they remain idle for more than 10 minutes; idle instances are reused when possible.

Support and community

Please join us on Gitter or on the mailing list to discuss Reflow.

Documentation

Overview

Package reflow implements the core data structures and (abstract) runtime for Reflow.

Reflow is a system for distributed program execution. The programs are described by Flows, which are an abstract specification of the program's execution. Each Flow node can take any number of other Flows as dependent inputs and perform some (local) execution over these inputs in order to compute some output value.

Reflow supports a limited form of dynamic dependencies: a Flow may evaluate to a list of values, each of which may be executed independently. This mechanism also provides parallelism.

The system orchestrates Flow execution by evaluating the flow in the manner of an abstract syntax tree; see Eval for more details.

Index

Constants

This section is empty.

Variables

Digester is the Digester used throughout reflow. We use a SHA256 digest.

var Universe string

Universe is the global namespace for digest computation.

Functions

func WithBackground

func WithBackground(ctx context.Context, wg WaitGroup) (context.Context, context.CancelFunc)

WithBackground returns a new context.Context with an affiliated Context, accessible via Background. The background context may be canceled with the returned cancellation function. The supplied WaitGroup is used to inform the caller of pending background operations: wg.Add(1) is called for each call to Background; wg.Done is called when the context returned from Background is disposed of through (Context).Complete.
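
For illustration, a minimal sketch of the intended lifecycle, assuming that a *sync.WaitGroup satisfies the package's WaitGroup interface (it supplies the Add and Done methods referred to above):

package main

import (
	"context"
	"sync"

	"github.com/grailbio/reflow"
)

func main() {
	var wg sync.WaitGroup
	ctx, cancel := reflow.WithBackground(context.Background(), &wg)
	defer cancel()

	// Deep in the call tree, a background operation obtains its own Context;
	// this calls wg.Add(1).
	bg := reflow.Background(ctx)
	go func() {
		// Complete disposes of the background context and calls wg.Done.
		defer bg.Complete()
		// ... perform the background operation using bg ...
	}()

	// The caller can wait for all pending background operations to finish.
	wg.Wait()
}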

Types

type Arg

type Arg struct {
	// Out is true if this is an output argument.
	Out bool
	// Fileset is the fileset used as an input argument.
	Fileset *Fileset `json:",omitempty"`
	// Index is the output argument index.
	Index int
}

Arg represents an exec argument (either input or output).

type Cache

type Cache interface {
	// Lookup returns the value associated with a (digest) key.
	// Lookup returns an error flagged errors.NotExist when there
	// is no such value.
	//
	// Lookup should also check to make sure that the objects
	// actually exist, and provide a reasonable guarantee that they'll
	// be available for transfer.
	//
	// TODO(marius): allow the caller to maintain a lease on the desired
	// objects so that garbage collection can (safely) be run
	// concurrently with flows. This isn't a correctness concern (the
	// flows may be restarted), but rather one of efficiency.
	Lookup(context.Context, digest.Digest) (Fileset, error)

	// Transfer transmits the file objects associated with value v
	// (usually retrieved by Lookup) to the repository dst. Transfer
	// should be used in place of direct (cache) repository access since
	// it may apply additional policies (e.g., rate limiting, etc.)
	Transfer(ctx context.Context, dst Repository, v Fileset) error

	// NeedTransfer returns the set of files in the Fileset v that are absent
	// in the provided repository.
	NeedTransfer(ctx context.Context, dst Repository, v Fileset) ([]File, error)

	// Write stores the Value v, whose file objects exist in Repository repo,
	// under the key id. If the repository is nil no objects are transferred.
	Write(ctx context.Context, id digest.Digest, v Fileset, repo Repository) error

	// Delete removes the value named by id from this cache.
	Delete(ctx context.Context, id digest.Digest) error

	// Repository returns this cache's underlying repository. It should
	// not be used for data transfer during the course of evaluation; see
	// Transfer.
	Repository() Repository
}

A Cache stores Values and their associated File objects for later retrieval. Caches may be temporary: objects are not guaranteed to persist.
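
As a sketch of the read path a caller might follow, using only the methods documented above (the digest import path is assumed):

package cacheexample

import (
	"context"

	"github.com/grailbio/base/digest" // digest package used by reflow (assumed import path)
	"github.com/grailbio/reflow"
)

// lookupAndTransfer fetches the fileset cached under key and ensures its
// file objects are present in dst before use.
func lookupAndTransfer(ctx context.Context, cache reflow.Cache, dst reflow.Repository, key digest.Digest) (reflow.Fileset, error) {
	fs, err := cache.Lookup(ctx, key)
	if err != nil {
		// An error flagged errors.NotExist indicates a cache miss; the
		// caller would compute the value instead of transferring it.
		return reflow.Fileset{}, err
	}
	// Use Transfer (rather than direct repository access) so that the cache
	// can apply its own policies, e.g. rate limiting.
	if err := cache.Transfer(ctx, dst, fs); err != nil {
		return reflow.Fileset{}, err
	}
	return fs, nil
}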

type CacheMode

type CacheMode int

CacheMode is a bitmask that tells how caching is to be used in the evaluator.

const (
	// CacheOff is CacheMode's default value and indicates
	// no caching (read or write) is to be performed.
	CacheOff CacheMode = 0
	// CacheRead indicates that cache lookups should be performed
	// during evaluation.
	CacheRead CacheMode = 1 << iota
	// CacheWrite indicates that the evaluator should write evaluation
	// results to the cache.
	CacheWrite
)

func (CacheMode) Reading

func (m CacheMode) Reading() bool

Reading returns whether the cache mode contains CacheRead.

func (CacheMode) Writing

func (m CacheMode) Writing() bool

Writing returns whether the cache mode contains CacheWrite.
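
A small illustrative example of composing modes, using only the constants and methods above:

package main

import (
	"fmt"

	"github.com/grailbio/reflow"
)

func main() {
	// The CacheMode constants form a bitmask, so reads and writes can be
	// enabled independently or together.
	mode := reflow.CacheRead | reflow.CacheWrite
	fmt.Println(mode.Reading(), mode.Writing()) // true true
	fmt.Println(reflow.CacheOff.Reading())      // false
}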

type Config

type Config struct {
	// HashV1 should be set to true if the flow should use the legacy
	// "v1" hash algorithm.
	HashV1 bool
}

Config stores flow configuration information. Configs modulate Flow behavior.

func (Config) IsZero

func (c Config) IsZero() bool

IsZero tells whether this config stores any non-default config.

func (*Config) Merge

func (c *Config) Merge(d Config)

Merge merges config d into config c.

func (Config) String

func (c Config) String() string

String returns a summary of the configuration c.

type Context

type Context struct {
	context.Context
	// contains filtered or unexported fields
}

Context is a context.Context that is used for background operations within reflow. In addition to providing a common background context for operations, it also carries a WaitGroup, so that the caller can wait for background operation completion.

func Background

func Background(ctx context.Context) Context

Background returns the Context associated with the given (parent) context.Context. If there is no associated context, it returns a fresh Context without an affiliated WaitGroup.

func (Context) Complete

func (c Context) Complete()

Complete should be called when the operation is complete.

type Eval

type Eval struct {
	// EvalConfig is the evaluation configuration used in this
	// evaluation.
	EvalConfig
	// contains filtered or unexported fields
}

Eval is an evaluator for Flows.

func NewEval

func NewEval(root *Flow, config EvalConfig) *Eval

NewEval creates and initializes a new evaluator using the provided evaluation configuration and root flow.
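
A hedged sketch of wiring an evaluation together and running it; the executor, repository, assoc, and transferer are assumed to be configured elsewhere, and the assoc import path is assumed:

package evalexample

import (
	"context"

	"github.com/grailbio/reflow"
	"github.com/grailbio/reflow/assoc" // assumed import path for assoc.Assoc
)

// run evaluates root and returns its value.
func run(ctx context.Context, root *reflow.Flow, x reflow.Executor, repo reflow.Repository, a assoc.Assoc, tx reflow.Transferer) (interface{}, error) {
	eval := reflow.NewEval(root, reflow.EvalConfig{
		Executor:   x,
		// CacheMode is nonzero, so Assoc, Repository, and Transferer must
		// be non-nil (see EvalConfig).
		Repository: repo,
		Assoc:      a,
		Transferer: tx,
		CacheMode:  reflow.CacheRead | reflow.CacheWrite,
		BottomUp:   true, // evaluate bottom-up; see (*Eval).Do
	})
	if err := eval.Do(ctx); err != nil {
		return nil, err
	}
	if err := eval.Err(); err != nil {
		return nil, err
	}
	return eval.Value(), nil
}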

func (*Eval) CacheWrite

func (e *Eval) CacheWrite(ctx context.Context, f *Flow, repo Repository) error

CacheWrite writes the cache entry for flow f, with objects in the provided source repository. CacheWrite returns nil on success, or else the first error encountered.

func (*Eval) Do

func (e *Eval) Do(ctx context.Context) error

Do evaluates a flow (as provided in Init) and returns its value, or error.

There are two evaluation modes, configured by EvalConfig.BottomUp.

When BottomUp is true, the Flow is evaluated in bottom-up mode. Each node's dependencies are evaluated (recursively); a node is evaluated when all of its dependencies are complete (and error free). Before a node is run, its result is first looked up in the configured cache. If there is a cache hit, evaluation proceeds without any work being done. Only the node's value is downloaded; its objects are fetched lazily. When a node is ready to be evaluated, we check that all of the objects that it depends on are present in the executor's repository; missing objects are retrieved from cache. If these objects are not present in the cache (this can happen if the object is removed from the cache's repository after the cache lookup was done but before the transfer began), evaluation fails with a restartable error.

When BottomUp is false, the flow is evaluated first top-down, and then bottom-up. In this mode, objects are looked up first in the top-down phase; a node's dependencies are explored only on a cache miss. Once this phase is complete, evaluation proceeds in bottom-up mode. Object retrieval is as in bottom-up mode.

Eval keeps track of the evaluation state of each node; these are described in the documentation for FlowState.

Evaluation is performed by simplification: ready nodes are added to a todo list. Single-step evaluation yields either a fully evaluated node (where (*Flow).Value is set to its result) or a new Flow node (whose (*Flow).Parent is always set to its ancestor). Evaluations are restartable.

Eval permits supplementary workers to steal nodes to evaluate. These workers are responsible for transferring any necessary data between the Eval's repository and the worker's. Once a Flow node has been stolen, it is owned by the worker until it is returned; the worker must set the Flow node's state appropriately.

This provides a simple evaluation scheme that also does not leave any parallelism "on the ground".

Eval employs a conservative admission controller to ensure that we do not exceed available resources.

The root flow is canonicalized before evaluation.

Eval reclaims unreachable objects after each exec has completed and e.GC is set to true.

TODO(marius): wait for all nodes to complete before returning (early) when cancelling...

TODO(marius): explore making use of CAS flow states, so that we don't have to separately track pending nodes internally (so we don't clobber stolen nodes).

TODO(marius): permit "steal-only" mode. The only provision for this setup is that the parent must contain some sort of global repository (e.g., S3).

func (*Eval) Err

func (e *Eval) Err() error

Err returns the root evaluation error, if any.

func (*Eval) Flow

func (e *Eval) Flow() *Flow

Flow returns the root flow of this eval.

func (*Eval) LogFlow

func (e *Eval) LogFlow(ctx context.Context, f *Flow)

LogFlow logs flow f's state, and then tracks it for future logging.

func (*Eval) LogSummary

func (e *Eval) LogSummary(log *log.Logger)

LogSummary prints an execution summary to the provided logger.

func (*Eval) Mutate

func (e *Eval) Mutate(f *Flow, muts ...interface{})

Mutate safely applies a set of mutations vis-a-vis the garbage collector. Mutations may be applied concurrently with each other; mutations are not applied during garbage collection.

func (*Eval) Need

func (e *Eval) Need() Requirements

Need returns the total resource requirements needed in order to avoid queueing work.

func (*Eval) Requirements

func (e *Eval) Requirements() Requirements

Requirements returns the minimum and maximum resource requirements for this Eval's flow.

func (*Eval) Stealer

func (e *Eval) Stealer() *Stealer

Stealer returns a Stealer from which flow nodes may be stolen. This permits an external worker to perform the work implied by the returned Flow, which is always in FlowReady state. When the external worker has completed processing (or decided not to process after all), the node must be returned via Return.

func (*Eval) Value

func (e *Eval) Value() values.T

Value returns the root value of this eval.

type EvalConfig

type EvalConfig struct {
	// The executor to which execs are submitted.
	Executor Executor

	// An (optional) logger to which the evaluation transcript is printed.
	Log *log.Logger

	// Status gets evaluation status reports.
	Status *status.Group

	// An (optional) logger to print evaluation trace.
	Trace *log.Logger

	// Transferer is used to arrange transfers between repositories,
	// including nodes and caches.
	Transferer Transferer

	// Repository is the main, shared repository between evaluations.
	Repository Repository

	// Assoc is the main, shared assoc that is used to store cache and
	// metadata associations.
	Assoc assoc.Assoc

	// CacheMode determines whether the evaluator reads from
	// or writes to the cache. If CacheMode is nonzero, Assoc,
	// Repository, and Transferer must be non-nil.
	CacheMode CacheMode

	// NoCacheExtern determines whether externs are cached.
	NoCacheExtern bool

	// GC tells whether Eval should perform garbage collection
	// after each exec has completed.
	GC bool

	// RecomputeEmpty determines whether cached empty values
	// are recomputed.
	RecomputeEmpty bool

	// BottomUp determines whether we perform bottom-up only
	// evaluation, skipping the top-down phase.
	BottomUp bool

	// Config stores the flow config to be used.
	Config Config

	// CacheLookupTimeout is the timeout for cache lookups.
	// After the timeout expires, a cache lookup is considered
	// a miss.
	CacheLookupTimeout time.Duration

	// Invalidate is a function that determines whether or not f's cached
	// results should be invalidated.
	Invalidate func(f *Flow) bool
}

EvalConfig provides runtime configuration for evaluation instances.

func (EvalConfig) String

func (e EvalConfig) String() string

String returns a human-readable form of the evaluation configuration.

type Exec

type Exec interface {
	// ID returns the digest of the exec. This is equivalent to the Digest of the value computed
	// by the Exec.
	ID() digest.Digest

	// URI names execs in a process-agnostic fashion.
	URI() string

	// Result returns the exec's result after it has been completed.
	Result(ctx context.Context) (Result, error)

	// Inspect inspects the exec. It can be called at any point in the Exec's lifetime.
	Inspect(ctx context.Context) (ExecInspect, error)

	// Wait awaits completion of the Exec.
	Wait(ctx context.Context) error

	// Logs returns the standard error and/or standard output of the Exec.
	// If it is called during execution, and if follow is true, it follows
	// the logs until completion of execution.
	// Completed Execs return the full set of available logs.
	Logs(ctx context.Context, stdout, stderr, follow bool) (io.ReadCloser, error)

	// Shell invokes /bin/bash inside an Exec. It can be invoked only when
	// the Exec is executing. r provides the shell input. The returned read
	// closer has the shell output. The caller has to close the read closer
	// once done.
	// TODO(pgopal) - Implement shell for zombie execs.
	Shell(ctx context.Context) (io.ReadWriteCloser, error)

	// Promote installs this exec's objects into the alloc's repository.
	Promote(context.Context) error
}

An Exec computes a Value. It is created from an ExecConfig; the Exec interface permits waiting on completion, and inspection of results as well as ongoing execution.
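
A sketch of how a caller might drive an Exec to completion and read its logs, using only the methods above:

package execexample

import (
	"context"
	"fmt"
	"io"
	"os"

	"github.com/grailbio/reflow"
)

// await waits for x to finish, copies its logs to standard output, and
// prints its result.
func await(ctx context.Context, x reflow.Exec) error {
	if err := x.Wait(ctx); err != nil {
		return err
	}
	// The exec is complete, so fetch both stdout and stderr without following.
	logs, err := x.Logs(ctx, true, true, false)
	if err != nil {
		return err
	}
	defer logs.Close()
	if _, err := io.Copy(os.Stdout, logs); err != nil {
		return err
	}
	res, err := x.Result(ctx)
	if err != nil {
		return err
	}
	fmt.Println("result:", res)
	return nil
}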

type ExecArg

type ExecArg struct {
	// Out tells whether this argument is an output argument.
	Out bool
	// Index is the dependency index represented by this argument.
	Index int
}

ExecArg indexes arguments to dependencies.

type ExecConfig

type ExecConfig struct {
	// The type of exec: "exec", "intern", "extern"
	Type string

	// A human-readable name for the exec.
	Ident string

	// intern, extern: the URL from which data is fetched or to which
	// data is pushed.
	URL string

	// exec: the docker image used to perform an exec
	Image string

	// exec: the Sprintf-able command that is to be run inside of the
	// Docker image.
	Cmd string

	// exec: the set of arguments (one per %s in Cmd) passed to the command
	// extern: the single argument which is to be exported
	Args []Arg

	// exec: the resource requirements for the exec
	Resources

	// NeedAWSCreds indicates the exec needs AWS credentials defined in
	// its environment: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and
	// AWS_SESSION_TOKEN will be available with the user's default
	// credentials.
	NeedAWSCreds bool

	// OutputIsDir tells whether an output argument (by index)
	// is a directory.
	OutputIsDir []bool `json:",omitempty"`
}

ExecConfig contains all the necessary information to perform an exec.
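
For illustration, a hypothetical configuration assembled from the fields above; the command, image, and input fileset are made up, and the embedded Resources are omitted:

package execconfig

import "github.com/grailbio/reflow"

// gunzipConfig returns a sketch of an exec configuration with one input
// fileset argument and a single (non-directory) output.
func gunzipConfig(input reflow.Fileset) reflow.ExecConfig {
	return reflow.ExecConfig{
		Type:  "exec",
		Ident: "example.gunzip",
		Image: "ubuntu",
		// One %s per entry in Args, in order: the input path, then the output path.
		Cmd: "gunzip -c %s > %s",
		Args: []reflow.Arg{
			{Fileset: &input},     // input argument
			{Out: true, Index: 0}, // output argument 0
		},
		OutputIsDir: []bool{false},
	}
}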

func (ExecConfig) String

func (e ExecConfig) String() string

type ExecInspect

type ExecInspect struct {
	Created time.Time
	Config  ExecConfig
	State   string        // "created", "waiting", "running", .., "zombie"
	Status  string        // human readable status
	Error   *errors.Error `json:",omitempty"` // non-nil on runtime error
	Profile Profile

	// Gauges are used to export realtime exec stats. They are used only
	// while the Exec is in running state.
	Gauges Gauges
	// Commands running from top, for live inspection.
	Commands []string

	Docker types.ContainerJSON // Docker inspect output.
}

ExecInspect describes the current state of an Exec.

func (ExecInspect) Runtime

func (e ExecInspect) Runtime() time.Duration

Runtime computes the exec's runtime based on Docker's timestamps.

type Executor

type Executor interface {
	// Put creates a new Exec at id. It is idempotent.
	Put(ctx context.Context, id digest.Digest, exec ExecConfig) (Exec, error)

	// Get retrieves the Exec named id.
	Get(ctx context.Context, id digest.Digest) (Exec, error)

	// Remove deletes an Exec.
	Remove(ctx context.Context, id digest.Digest) error

	// Execs lists all Execs known to the Executor.
	Execs(ctx context.Context) ([]Exec, error)

	// Resources indicates the total amount of resources available at the Executor.
	Resources() Resources

	// Repository returns the Repository associated with this Executor.
	Repository() Repository
}

Executor manages Execs and their values.
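
A sketch of typical use, relying only on the methods documented above; the digest import path is assumed:

package executorexample

import (
	"context"
	"fmt"

	"github.com/grailbio/base/digest" // digest package used by reflow (assumed import path)
	"github.com/grailbio/reflow"
)

// submit installs the exec named id and returns a handle to it. Put is
// idempotent, so resubmitting the same id returns the existing exec.
func submit(ctx context.Context, x reflow.Executor, id digest.Digest, cfg reflow.ExecConfig) (reflow.Exec, error) {
	return x.Put(ctx, id, cfg)
}

// dump prints the executor's total resources and every exec it knows about.
func dump(ctx context.Context, x reflow.Executor) error {
	fmt.Println("resources:", x.Resources())
	execs, err := x.Execs(ctx)
	if err != nil {
		return err
	}
	for _, e := range execs {
		fmt.Println(e.URI(), e.ID())
	}
	return nil
}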

type File

type File struct {
	// The digest of the contents of the file.
	ID digest.Digest

	// The size of the file.
	Size int64
}

File represents a name-by-hash file.

type Fileset

type Fileset struct {
	List []Fileset       `json:",omitempty"`
	Map  map[string]File `json:"Fileset,omitempty"`
}

Fileset is the result of an evaluated flow. Values may either be lists of values or Filesets. Filesets are a map of paths to Files.
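
An illustrative fileset built by hand; the file digests are left at their zero values:

package main

import (
	"fmt"

	"github.com/grailbio/reflow"
)

func main() {
	// A fileset mapping paths to files. The IDs would normally hold the
	// SHA256 digests of the file contents; they are left as zero digests
	// in this illustration.
	fs := reflow.Fileset{
		Map: map[string]reflow.File{
			"sample.fastq.gz":  {Size: 52428800},
			"sample.fastq.idx": {Size: 1024},
		},
	}
	fmt.Println(fs.N())     // 2: number of files
	fmt.Println(fs.Size())  // 52429824: total size in bytes
	fmt.Println(fs.Empty()) // false: the fileset contains files
}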

func (Fileset) AnyEmpty

func (v Fileset) AnyEmpty() bool

AnyEmpty tells whether this value, or any of its constituent values contain no files.

func (Fileset) Digest

func (v Fileset) Digest() digest.Digest

Digest returns a digest representing the value. Digests preserve semantics: two values with the same digest are considered to be equivalent.

func (Fileset) Empty

func (v Fileset) Empty() bool

Empty tells whether this value is empty, that is, it contains no files.

func (Fileset) Equal

func (v Fileset) Equal(w Fileset) bool

Equal reports whether v is equal to w. Two values are equal when they produce the same digest.

func (Fileset) Files

func (v Fileset) Files() []File

Files returns the set of Files that comprise the value.

func (Fileset) Flatten

func (v Fileset) Flatten() []Fileset

Flatten is a convenience function to flatten (shallowly) the value v, returning a list of Values. If the value is a list value, the list is returned; otherwise a unary list of the value v is returned.

func (Fileset) Flow

func (v Fileset) Flow() *Flow

Flow returns the Flow which evaluates to the constant Value v.

func (Fileset) N

func (v Fileset) N() int

N returns the number of files (not necessarily unique) in this value.

func (Fileset) Pullup

func (v Fileset) Pullup() Fileset

Pullup merges this value (tree) into a single toplevel fileset.

func (Fileset) Short

func (v Fileset) Short() string

Short returns a short, human-readable string representing the value. Its intended use is for pretty-printed output. In particular, hashes are abbreviated, and lists display only the first member, followed by ellipsis. For example, a list of values is printed as:

list<val<sample.fastq.gz=f2c59c40>, ...50MB>

func (Fileset) Size

func (v Fileset) Size() int64

Size returns the total size of this value.

func (Fileset) String

func (v Fileset) String() string

String returns a full, human-readable string representing the value v. Unlike Short, string is fully descriptive: it contains the full digest and lists are complete. For example:

list<sample.fastq.gz=sha256:f2c59c40a1d71c0c2af12d38a2276d9df49073c08360d72320847efebc820160>,
  sample2.fastq.gz=sha256:59eb82c49448e349486b29540ad71f4ddd7f53e5a204d50997f054d05c939adb>>

func (Fileset) WriteDigest

func (v Fileset) WriteDigest(w io.Writer)

WriteDigest writes the digestible material for v to w. The io.Writer is assumed to be produced by a Digester, and hence infallible. Errors are not checked.

type Flow

type Flow struct {
	// The operation represented by this node. See Op
	// for definitions.
	Op Op

	// Parent is set when a node is Forked.
	Parent *Flow

	// Deps holds this Flow's data dependencies.
	Deps []*Flow

	// Config stores this Flow's config.
	Config Config

	Image   string                           // OpExec
	Cmd     string                           // OpExec
	URL     *url.URL                         // OpIntern, OpExtern
	Re      *regexp.Regexp                   // OpGroupby, OpCollect
	Repl    string                           // OpCollect
	MapFunc func(*Flow) *Flow                // OpMap, OpMapMerge
	MapFlow *Flow                            // OpMap
	K       func(vs []values.T) *Flow        // OpK
	Coerce  func(values.T) (values.T, error) // OpCoerce
	// ArgMap maps exec arguments to dependencies. (OpExec).
	Argmap []ExecArg
	// OutputIsDir tells whether the output i is a directory.
	OutputIsDir []bool

	// Argstrs stores a symbolic argument name, used for pretty printing
	// and debugging.
	Argstrs []string

	// FlowDigest stores, for OpVal and OpK, a digest representing
	// just the operation or value.
	FlowDigest digest.Digest

	// A human-readable identifier for the node, for use in
	// debugging output, etc.
	Ident string

	// Source code position of this node.
	Position string

	// State stores the evaluation state of the node; see FlowState
	// for details.
	State FlowState

	// Resources indicates the expected resource usage of this node.
	// Currently it is only defined for OpExec.
	Resources Resources

	// Reserved stores the amount of resources that have been reserved
	// on behalf of this node.
	Reserved Resources

	// FlowRequirements stores the requirements indicated by
	// OpRequirements.
	FlowRequirements Requirements

	// Value stores the Value to which the node was evaluated.
	Value values.T
	// Err stores any evaluation error that occurred during flow evaluation.
	Err *errors.Error

	// The total runtime for evaluating this node.
	Runtime time.Duration

	// The current owning executor of this Flow.
	Owner Executor

	// The exec working on this node.
	Exec Exec

	// Cached stores whether the flow was retrieved from cache.
	Cached bool

	// The amount of data to be transferred.
	TransferSize data.Size

	// Inspect stores an exec's inspect output.
	Inspect ExecInspect

	Tracked bool

	Status *status.Task

	Data []byte // OpData
	// contains filtered or unexported fields
}

Flow defines an AST for data flows. It is a logical union of ops as defined by type Op. Child nodes witness computational dependencies and must therefore be evaluated before their parents.
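
For orientation, here is a hand-built two-node flow: an intern node feeding an exec node. This is a sketch only: the Op constant names (OpIntern, OpExec) follow the field comments above, and real flows are normally produced by the Reflow DSL front end rather than constructed by hand.

package flowexample

import (
	"net/url"

	"github.com/grailbio/reflow"
)

// exampleFlow builds a flow that interns a file from S3 and decompresses it.
func exampleFlow() *reflow.Flow {
	u, _ := url.Parse("s3://1000genomes/technical/reference/human_g1k_v37.fasta.gz")
	intern := &reflow.Flow{
		Op:    reflow.OpIntern,
		URL:   u,
		Ident: "example.fasta",
	}
	return &reflow.Flow{
		Op:    reflow.OpExec,
		Deps:  []*reflow.Flow{intern},
		Image: "ubuntu",
		// Cmd is Sprintf-able (see ExecConfig): one %s per exec argument.
		Cmd: "gunzip -c %s > %s",
		Argmap: []reflow.ExecArg{
			{Index: 0},            // argument 0: the interned file (dependency 0)
			{Out: true, Index: 0}, // argument 1: output 0
		},
		OutputIsDir: []bool{false},
		Ident:       "example.gunzip",
	}
}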

func (*Flow) CacheKeys

func (f *Flow) CacheKeys() []digest.Digest

CacheKeys returns all the valid cache keys for this flow node. They are returned in order from most concrete to least concrete.

func (*Flow) Canonicalize

func (f *Flow) Canonicalize(config Config) *Flow

Canonicalize returns a canonical version of Flow f, where semantically equivalent flows (as per Flow.Digest) are collapsed into one.

func (*Flow) Copy

func (f *Flow) Copy() *Flow

Copy performs a shallow copy of the Flow.

func (*Flow) DebugString

func (f *Flow) DebugString() string

DebugString returns a human readable representation of the flow appropriate for debugging.

func (*Flow) Digest

func (f *Flow) Digest() digest.Digest

Digest produces a digest of Flow f. The digest captures the entirety of the Flow's semantics: two flows with the same digest must evaluate to the same value. OpMap Flows are canonicalized by passing a no-op Flow to its MapFunc.

func (*Flow) ExecArg

func (f *Flow) ExecArg(i int) ExecArg

ExecArg returns the ith ExecArg. It is drawn from f.Argmap if it is defined, or else it is just the i'th input argument.

ExecArg panics if i >= f.NExecArg().

func (*Flow) ExecString

func (f *Flow) ExecString(cache bool) string

ExecString renders a string representing the operation performed by this node. Cache should be set to true if the result was retrieved from cache; in this case, values from dependencies are not rendered since they may not be available. The returned string has the following format:

how:digest(ident) shortvalue = execstring (runtime transfer rate)

where "execstring" is a string indicating the operation performed.

For example, the ExecString of a node that interns a directory of FASTQs looks like this:

ecae46a4(inputfastq) val<161216_E00472_0063_AH7LWNALXX/CNVS-LUAD-120587007-cfDNA-WGS-Rep1_S1_L001_R1_001.fastq.gz=87f7ca18, ...492.45GB> = intern "s3://grail-avalon/samples/CNVS-LUAD-120587007-cfDNA-WGS-Rep1/fastq/" (1h29m33.908587144s 93.83MB/s)

func (*Flow) Fork

func (f *Flow) Fork(flow *Flow)

Fork creates a new fork of this flow. The current version of Flow f becomes the parent flow.

func (*Flow) Label

func (f *Flow) Label(ident string)

Label labels this flow with ident. It then recursively labels its ancestors. Labeling stops when a node is already labeled.

func (*Flow) MapInit

func (f *Flow) MapInit()

MapInit initializes Flow.MapFlow from the supplied MapFunc.

func (*Flow) NExecArg

func (f *Flow) NExecArg() int

NExecArg returns the number of exec arguments of this node. If f.Argmap is defined, it returns the length of the argument map, or else the number of dependencies.

func (*Flow) PhysicalDigest

func (f *Flow) PhysicalDigest() digest.Digest

PhysicalDigest computes the physical digest of the Flow f, reflecting the actual underlying operation to be performed, and not the logical one.

It is an error to call PhysicalDigest on nodes whose dependencies are not fully resolved (i.e., state FlowDone, contains a Fileset value), or on nodes not of type OpExec, OpIntern, or OpExtern. This is because the physical input values must be available to compute the digest.

PhysicalDigest returns an empty digest if a physical digest is not computable for node f.

func (*Flow) Requirements

func (f *Flow) Requirements() (req Requirements)

Requirements computes the minimum and maximum resource requirements for this flow. It currently assumes that the width of any map operation is infinite.

BUG(marius): Requirements cannot accurately determine disk space requirements as they depend on object liveness. They are currently treated as entirely ephemeral.

TODO(marius): include width hints on map nodes

TODO(marius): account for static parallelism too.

func (*Flow) String

func (f *Flow) String() string

String returns a shallow and human-readable string representation of the flow.

func (*Flow) Visitor

func (f *Flow) Visitor() *FlowVisitor

Visitor returns a new FlowVisitor rooted at this node.

func (*Flow) WriteDigest

func (f *Flow) WriteDigest(w io.Writer)

WriteDigest writes the digestible material of f to w. The io.Writer is assumed to be produced by a Digester, and hence infallible. Errors are not checked.

type FlowState

type FlowState int64

FlowState is an enum representing the state of a Flow node during evaluation.

const (
	// FlowInit indicates that the flow is initialized but not evaluated
	FlowInit FlowState = iota

	// FlowNeedLookup indicates that the evaluator should perform a
	// cache lookup on the flow node.
	FlowNeedLookup
	// FlowLookup indicates that the evaluator is currently performing a
	// cache lookup of the flow node. After a successful cache lookup,
	// the node is transferred to FlowDone, and the (cached) value is
	// attached to the flow node. The objects may not be transferred into
	// the evaluator's repository.
	FlowLookup

	// FlowTODO indicates that the evaluator should consider the node
	// for evaluation once its dependencies are completed.
	FlowTODO

	// FlowNeedTransfer indicates that the evaluator should transfer all
	// objects needed for execution into the evaluator's repository.
	FlowNeedTransfer
	// FlowTransfer indicates that the evaluator is currently
	// transferring the flow's dependent objects from cache.
	FlowTransfer

	// FlowReady indicates that the node is ready for evaluation and should
	// be scheduled by the evaluator. A node is ready only once all of its
	// dependent objects are available in the evaluator's repository.
	FlowReady
	// FlowRunning indicates that the node is currently being evaluated by
	// the evaluator.
	FlowRunning

	// FlowDone indicates that the node has completed evaluation.
	FlowDone

	// FlowMax is the number of flow states.
	FlowMax
)

FlowState denotes a Flow node's state during evaluation. Flows begin their life in FlowInit, where they remain until they are examined by the evaluator. The precise state transitions depend on the evaluation mode (whether it is evaluating bottom-up or top-down, and whether a cache is used), but generally follow the order in which they are laid out here.
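
The declaration order above mirrors this progression, so the states can be enumerated directly. A small runnable sketch (assuming this module's top-level reflow package):

package main

import (
	"fmt"

	"github.com/grailbio/reflow"
)

func main() {
	// Print each FlowState in declaration order, which is also the
	// rough order in which a node progresses during evaluation.
	for s := reflow.FlowInit; s < reflow.FlowMax; s++ {
		fmt.Println(int(s), s.Name())
	}
}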

func (FlowState) Name

func (s FlowState) Name() string

Name returns the FlowState's string name.

func (FlowState) String

func (i FlowState) String() string

type FlowVisitor

type FlowVisitor struct {
	*Flow
	// contains filtered or unexported fields
}

FlowVisitor implements a convenient visitor for flow graphs.

func (*FlowVisitor) Push

func (v *FlowVisitor) Push(f *Flow)

Push pushes node f onto visitor stack.

func (*FlowVisitor) Visit

func (v *FlowVisitor) Visit()

Visit pushes the current node's children on to the visitor stack, including both data and control dependencies.

func (*FlowVisitor) Walk

func (v *FlowVisitor) Walk() bool

Walk visits the next flow node on the stack. Walk returns false when it runs out of nodes to visit; it also guarantees that each node is visited only once.
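
A typical traversal pairs Walk with Visit: Walk pops the next node (exposed through the embedded *Flow) and Visit pushes that node's dependencies. A minimal sketch, assuming a root *Flow named root:

package example

import (
	"fmt"

	"github.com/grailbio/reflow"
)

// printAll prints every node reachable from root exactly once,
// following both data and control dependencies.
func printAll(root *reflow.Flow) {
	v := root.Visitor()
	for v.Walk() {
		fmt.Println(v.Flow) // FlowVisitor embeds the *Flow being visited
		v.Visit()           // push this node's dependencies onto the stack
	}
}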

type Fork

type Fork *Flow

Fork is an argument to (*Eval).Mutate to indicate a fork mutation.

type Gauges

type Gauges map[string]float64

Gauges stores a set of named gauges.

func (Gauges) Snapshot

func (g Gauges) Snapshot() Gauges

Snapshot returns a snapshot of the gauge values g.
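
A small sketch of the intended use, assuming only that the snapshot is independent of later updates to g:

package example

import (
	"fmt"

	"github.com/grailbio/reflow"
)

func gaugesExample() {
	g := reflow.Gauges{"mem": 0.5}
	snap := g.Snapshot()
	g["mem"] = 0.9 // later update to the live gauges
	// Under the stated assumption, snap still reports the earlier value.
	fmt.Println(snap["mem"], g["mem"])
}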

type Liveset

type Liveset interface {
	// Contains returns true if the given object definitely is in the
	// set; it may rarely return true when the object is not.
	Contains(digest.Digest) bool
}

A Liveset contains a possibly approximate judgement about live objects.
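
The contract permits false positives but not false negatives, so an exact, map-backed set is the simplest conforming implementation. A hypothetical sketch (the digest import path is an assumption; a production Liveset might instead use a compact approximate structure such as a Bloom filter):

package example

// NOTE: the digest import path below is assumed; use whichever digest
// package this module's Liveset and Repository signatures reference.
import "github.com/grailbio/base/digest"

// mapLiveset is a hypothetical exact Liveset backed by a Go map. It
// never returns a false positive, which trivially satisfies the
// Liveset contract.
type mapLiveset map[digest.Digest]struct{}

// Contains reports whether d is in the set.
func (m mapLiveset) Contains(d digest.Digest) bool {
	_, ok := m[d]
	return ok
}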

type Mutation

type Mutation int

Mutation is a type of mutation.

const (
	// Incr is the mutation that increments the reference count used for
	// GC.
	Incr Mutation = iota
	// Decr is the mutation that decrements the reference count used for
	// GC.
	Decr
	// Cached is the mutation that sets the flow's cached flag.
	Cached
	// Refresh is the mutation that refreshes the status of the flow node.
	Refresh
)

func (Mutation) String

func (i Mutation) String() string

type Op

type Op int

Op is an enum representing operations that may be performed in a Flow.

const (
	// OpExec runs a command in a docker container on the inputs
	// represented by the Flow's dependencies.
	OpExec Op = 1 + iota
	// OpIntern imports datasets from URLs.
	OpIntern
	// OpExtern exports values to URLs.
	OpExtern
	// OpGroupby applies a regular expression to group an input value.
	OpGroupby
	// OpMap applies a function (which returns a Flow) to each element
	// in the input.
	OpMap
	// OpCollect filters and rewrites values.
	OpCollect
	// OpMerge merges a set of flows.
	OpMerge
	// OpVal returns a value.
	OpVal
	// OpPullup merges a set of results into one value.
	OpPullup
	// OpK is a flow continuation.
	OpK
	// OpCoerce is a flow value coercion. (Errors allowed.)
	OpCoerce
	// OpRequirements modifies the flow's requirements.
	OpRequirements
	// OpData evaluates to a literal (inline) piece of data.
	OpData
)

func (Op) DigestString

func (i Op) DigestString() string

func (Op) String

func (o Op) String() string

type Profile

type Profile map[string]struct{ Max, Mean float64 }

Profile stores keyed statistical summaries (currently: mean, max).

func (Profile) String

func (p Profile) String() string

type Repository

type Repository interface {
	// Collect removes from this repository any objects not in the
	// Liveset.
	Collect(context.Context, Liveset) error

	// Stat returns the File metadata for the blob with the given digest.
	// It returns errors.NotExist if the blob does not exist in this
	// repository.
	Stat(context.Context, digest.Digest) (File, error)

	// Get streams the blob named by the given Digest.
	// If it does not exist in this repository, an error with code
	// errors.NotFound will be returned.
	Get(context.Context, digest.Digest) (io.ReadCloser, error)

	// Put streams a blob to the repository and returns its
	// digest when completed.
	Put(context.Context, io.Reader) (digest.Digest, error)

	// WriteTo writes a blob identified by a Digest directly to a
	// foreign repository named by a URL. If the repository is
	// unable to write directly to the foreign repository, an error
	// with flag errors.NotSupported is returned.
	WriteTo(context.Context, digest.Digest, *url.URL) error

	// ReadFrom reads a blob identified by a Digest directly from a
	// foreign repository named by a URL. If the repository is
	// unable to read directly from the foreign repository, an error
	// with flag errors.NotSupported is returned.
	ReadFrom(context.Context, digest.Digest, *url.URL) error

	// URL returns the URL of this repository, or nil if it does not
	// have one. The returned URL may be used for direct transfers via
	// WriteTo or ReadFrom.
	URL() *url.URL
}

Repository defines an interface used for servicing blobs of data that are named by hash.
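
The methods compose naturally: a blob written with Put can later be checked with Stat and streamed back with Get using the digest that Put returned. A minimal sketch written against the interface only, with no particular implementation assumed:

package example

import (
	"context"
	"io"
	"os"
	"strings"

	"github.com/grailbio/reflow"
)

// roundTrip stores a small blob in repo and streams it back to stdout.
func roundTrip(ctx context.Context, repo reflow.Repository) error {
	d, err := repo.Put(ctx, strings.NewReader("hello, reflow"))
	if err != nil {
		return err
	}
	// Stat confirms the blob exists and reports its metadata.
	if _, err := repo.Stat(ctx, d); err != nil {
		return err
	}
	rc, err := repo.Get(ctx, d)
	if err != nil {
		return err
	}
	defer rc.Close()
	_, err = io.Copy(os.Stdout, rc)
	return err
}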

type Requirements

type Requirements struct {
	// Min is the smallest amount of resources that must be allocated
	// to satisfy the requirements.
	Min Resources
	// Width is the width of the requirements. A width of zero indicates
	// a "narrow" job: minimum describes the exact resources needed.
	// Widths greater than zero are "wide" requests: they require some
	// multiple of the minimum requirement. The distinction between a
	// width of zero and a width of one is a little subtle: width
	// represents the smallest acceptable width, and thus a width of 1
	// can be taken as a hint to allocate a higher multiple of the
	// minimum requirements, whereas a width of 0 represents a precise
	// requirement: allocating any more is likely to be wasteful.
	Width int
}

Requirements stores resource requirements, comprising the minimum amount of acceptable resources and a width.

func (*Requirements) Add

func (r *Requirements) Add(s Requirements)

Add adds the provided requirements s to the requirements r. R's minimum requirements are set to the larger of the two; the two widths are added.

func (*Requirements) AddParallel

func (r *Requirements) AddParallel(s Resources)

AddParallel adds the provided resources s to the requirements, and also increases the requirement's width by one.

func (*Requirements) AddSerial

func (r *Requirements) AddSerial(s Resources)

AddSerial adds the provided resources s to the requirements.
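
To make the width semantics concrete, the sketch below builds requirements for one serial step followed by a fan-out of identical tasks; the resource keys and values are illustrative only:

package example

import "github.com/grailbio/reflow"

// pipelineRequirements describes an 8 GiB, 2-CPU serial step followed
// by four identical 4 GiB, 1-CPU tasks that may run in parallel.
func pipelineRequirements() reflow.Requirements {
	var req reflow.Requirements
	// A narrow request: exactly these resources are required.
	req.AddSerial(reflow.Resources{"mem": 8 << 30, "cpu": 2})
	// Each AddParallel call adds the task's resources and increments
	// Width, marking the request as wide.
	for i := 0; i < 4; i++ {
		req.AddParallel(reflow.Resources{"mem": 4 << 30, "cpu": 1})
	}
	return req
}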

func (Requirements) Equal

func (r Requirements) Equal(s Requirements) bool

Equal reports whether r and s represent the same requirements.

func (*Requirements) Max

func (r *Requirements) Max() Resources

Max is the maximum amount of resources represented by this resource request.

func (Requirements) String

func (r Requirements) String() string

String renders a human-readable representation of r.

func (*Requirements) Wide

func (r *Requirements) Wide() bool

Wide returns whether these requirements represent a wide resource request.

type Reserve

type Reserve Resources

Reserve adds resources to the flow's reservation.

type Resources

type Resources map[string]float64

Resources describes a set of labeled resources. Each resource is described by a string label and assigned a value. The zero value of Resources represents the resources with zeros for all labels.

func (*Resources) Add

func (r *Resources) Add(x, y Resources) *Resources

Add sets r to the sum x[key]+y[key] for all keys and returns r.

func (Resources) Available

func (r Resources) Available(s Resources) bool

Available reports whether the resources s are available from r.

func (Resources) Equal

func (r Resources) Equal(s Resources) bool

Equal tells whether the resources r and s are equal in all dimensions of both r and s.

func (*Resources) Max

func (r *Resources) Max(x, y Resources) *Resources

Max sets r to the maximum max(x[key], y[key]) for all keys and returns r.

func (*Resources) Min

func (r *Resources) Min(x, y Resources) *Resources

Min sets r to the minimum min(x[key], y[key]) for all keys and returns r.

func (*Resources) Scale

func (r *Resources) Scale(s Resources, factor float64) *Resources

Scale sets r to the scaled resources s[key]*factor for all keys and returns r.

func (Resources) ScaledDistance

func (r Resources) ScaledDistance(u Resources) float64

ScaledDistance returns the distance between two resources computed as a sum of the differences in memory, cpu and disk with some predefined scaling.

func (*Resources) Set

func (r *Resources) Set(s Resources) *Resources

Set sets r[key]=s[key] for all keys and returns r.

func (Resources) String

func (r Resources) String() string

String renders a Resources. All nonzero-valued labels are included; mem, cpu, and disk are always included regardless of their value.

func (*Resources) Sub

func (r *Resources) Sub(x, y Resources) *Resources

Sub sets r to the difference x[key]-y[key] for all keys and returns r.
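
These methods share a destination-receiver convention: the receiver is set to the result and also returned. A small illustrative sketch:

package example

import (
	"fmt"

	"github.com/grailbio/reflow"
)

func resourcesExample() {
	need := reflow.Resources{"mem": 4 << 30, "cpu": 2, "disk": 10 << 30}
	have := reflow.Resources{"mem": 16 << 30, "cpu": 8, "disk": 100 << 30}

	// Remaining capacity if need were reserved out of have.
	rest := make(reflow.Resources)
	rest.Sub(have, need)

	fmt.Println(have.Available(need)) // true: have covers need in every dimension
	fmt.Println(rest)                 // rendered via Resources.String
}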

type Result

type Result struct {
	// Fileset is the fileset produced by an exec.
	Fileset Fileset `json:",omitempty"`

	// Err is the error produced by an exec.
	Err *errors.Error `json:",omitempty"`
}

Result is the result of an exec.

func (Result) Equal

func (r Result) Equal(s Result) bool

Equal tells whether r is equal to s.

func (Result) Short

func (r Result) Short() string

Short renders an abbreviated human-readable string of this result.

func (Result) String

func (r Result) String() string

String renders a human-readable string of this result.

type Stealer

type Stealer struct {
	// contains filtered or unexported fields
}

A Stealer coordinates work stealing with an Eval. A Stealer instance is obtained by (*Eval).Stealer.

func (*Stealer) Admit

func (s *Stealer) Admit(max Resources) <-chan *Flow

Admit returns a channel that will return a stolen Flow node that makes use of at most max resources. Only one Admit can be active at a time: if Admit is called while another is outstanding, the first Admit is cancelled, closing its channel.

func (*Stealer) Close

func (s *Stealer) Close()

Close discards the Stealer, returning any pending Flows to the evaluator. Admit should not be called after Close.

func (*Stealer) Return

func (s *Stealer) Return(f *Flow)

Return returns a stolen Flow to the evaluator.
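
Putting Admit, Return, and Close together, a hypothetical helper steals a single node within a resource budget and hands it back when done (a sketch only; a real caller would execute the stolen node on some executor):

package example

import (
	"context"

	"github.com/grailbio/reflow"
)

// stealOne waits for one Flow node that fits within max, "processes"
// it, and returns it to the evaluator.
func stealOne(ctx context.Context, s *reflow.Stealer, max reflow.Resources) {
	defer s.Close()
	select {
	case f := <-s.Admit(max):
		if f == nil {
			return // channel closed: nothing was admitted
		}
		// ... run f elsewhere ...
		s.Return(f)
	case <-ctx.Done():
	}
}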

type Transferer

type Transferer interface {
	// Transfer transfers a set of files from the src to the dst
	// repository. A transfer manager may apply policies (e.g., rate
	// limits and concurrency limits) to these transfers.
	Transfer(ctx context.Context, dst, src Repository, files ...File) error

	NeedTransfer(ctx context.Context, dst Repository, files ...File) ([]File, error)
}

Transferer defines an interface used for management of transfers between multiple repositories.

type Unreserve

type Unreserve Resources

Unreserve subtracts resources from the flow's reservation.

type Value

type Value struct{ Value values.T }

Value is an argument to (*Eval).Mutate to indicate a set-value mutation.

type WaitGroup

type WaitGroup interface {
	Add(int)
	Done()
}

WaitGroup defines a subset of sync.WaitGroup's interface for use with Context.
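
Since *sync.WaitGroup provides both Add(int) and Done(), it satisfies this interface; a compile-time assertion makes that explicit (assuming this module's top-level reflow package):

package example

import (
	"sync"

	"github.com/grailbio/reflow"
)

// Compile-time check that *sync.WaitGroup implements reflow.WaitGroup.
var _ reflow.WaitGroup = (*sync.WaitGroup)(nil)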

Notes

Bugs

  • Requirements cannot accurately determine disk space requirements as they depend on object liveness. They are currently treated as entirely ephemeral.

Directories

Path Synopsis
Package assoc defines data types for associative maps used within Reflow.
dydbassoc
Package dydbassoc implements an assoc.Assoc based on AWS's DynamoDB.
Package batch implements support for running batches of reflow (stateful) evaluations.
cmd
reflowlet
Reflowlet is the agent process that is run on nodes in a Reflow cluster.
Package config defines an interface for configuring a Reflow instance.
all
Package all imports all standard configuration providers in Reflow.
awsenvconfig
Package awsenvconfig configures AWS credentials to be derived from the user's environment.
ec2metadataconfig
Package ec2config defines and registers configuration providers using Amazon's EC2 metadata service.
httpscaconfig
Package httpscaconfig defines a configuration provider named "httpsca" which can be used to configure HTTPS certificates via an on-disk certificate authority.
httpsconfig
Package httpsconfig defines a configuration provider named "file" which can be used to configure HTTPS certificates.
s3config
Package s3config defines a configuration provider named "s3" which can be used to configure S3-based caches.
Package ec2cluster implements support for maintaining elastic clusters of Reflow instances on EC2.
Package errors provides a standard error definition for use in Reflow.
internal
ec2authenticator
Package ec2authenticator implements Docker repository authentication for ECR using an AWS SDK session and a root.
ecrauth
Package ecrauth provides an interface and utilities for authenticating AWS EC2 ECR Docker repositories.
fs
scanner
Package scanner provides a scanner and tokenizer for UTF-8-encoded text.
status
Package status provides facilities for reporting statuses from a number of tasks working towards a common goal.
wg
Package wg implements a channel-enabled WaitGroup.
Package lang implements the reflow language.
pooltest
Package pooltest tests pools.
testutil
Package testutil provides utilities for testing code that involves pools.
Package log implements leveling and teeing on top of Go's standard logs package.
Package pool implements resource pools for reflow.
client
Package client implements a remoting client for reflow pools.
server
Package server exposes a pool implementation for remote access.
Package repository provides common ways to dial reflow.Repository implementations; it also provides some common utilities for working with repositories.
client
Package client implements repository REST client.
file
Package file implements a filesystem-backed repository.
s3
Package s3 implements an S3-backed repository.
server
Package server implements a Repository REST server.
Package rest provides a framework for serving and accessing hierarchical resource-based APIs.
Package syntax implements the Reflow language.
test
flow
Package flow contains a number of constructors for Flow nodes that are convenient for testing.
testutil
Package testutil contains various utilities for testing Reflow functionality.
Package tool implements the reflow command.
Package trace provides a tracing system for Reflow events.
Package types contains data structures and algorithms for dealing with value types in Reflow.
Package values defines data structures for representing (runtime) values in Reflow.
