Documentation ¶
Index ¶
- Constants
- Variables
- func AddMinKNOWReportAnnotation(obj *ex.DataObject, report MinKNOWReport) error
- func CalculateFileMD5(path FilePath) (md5sum []byte, err error)
- func CompressFile(path FilePath) (err error)
- func CreateMD5ChecksumFile(path FilePath) error
- func CreateOrUpdateMD5ChecksumFile(path FilePath) error
- func DefaultIgnorePatterns(dataDir string) ([]string, error)
- func DoNothing(path FilePath) error
- func DoProcessFiles(paths <-chan FilePath, workPlan WorkPlan, maxThreads int) error
- func FindFiles(ctx context.Context, root string, pred FilePredicate, pruneFn FilePredicate) (<-chan FilePath, <-chan error)
- func FindFilesInterval(ctx context.Context, root string, pred FilePredicate, pruneFn FilePredicate, ...) (<-chan FilePath, <-chan error)
- func HasChecksumFile(path FilePath) (bool, error)
- func HasCompressedVersion(path FilePath) (bool, error)
- func HasStaleChecksumFile(path FilePath) (bool, error)
- func HasValidReportAnnotation(obj *ex.DataObject, report MinKNOWReport) (bool, error)
- func IsCompressed(path FilePath) (bool, error)
- func IsDir(path FilePath) (bool, error)
- func IsFalse(_ FilePath) (bool, error)
- func IsMinKNOWReport(path FilePath) (bool, error)
- func IsMinKNOWRunDir(path FilePath) (bool, error)
- func IsMinKNOWRunID(name string) bool
- func IsRegular(path FilePath) (bool, error)
- func IsTrue(_ FilePath) (bool, error)
- func MergeErrorChannels(x <-chan error, y <-chan error) chan error
- func MergeFileChannels(x <-chan FilePath, y <-chan FilePath) chan FilePath
- func ProcessFiles(cancelCtx context.Context, params ProcessParams) error
- func ReadMD5ChecksumFile(path FilePath) (md5sum []byte, err error)
- func RemoveDirectory(path FilePath) error
- func RemoveFile(path FilePath) error
- func RemoveMD5ChecksumFile(path FilePath) error
- func SortFilePaths(paths []FilePath)
- func UpdateMD5ChecksumFile(path FilePath) error
- func WatchFiles(cancelCtx context.Context, root string, pred FilePredicate, ...) (<-chan FilePath, <-chan error)
- type FilePath
- type FilePredicate
- func And(predicates ...FilePredicate) FilePredicate
- func MakeDefaultPruneFunc(dataDir string) (FilePredicate, error)
- func MakeGlobPruneFunc(patterns []string) (FilePredicate, error)
- func MakeIsAnnotated(localBase string, remoteBase string, cPool *ex.ClientPool) FilePredicate
- func MakeIsCopied(localBase string, remoteBase string, cPool *ex.ClientPool) FilePredicate
- func MakeIsOlderThan(duration time.Duration) FilePredicate
- func MakeRequiresRemoval(duration time.Duration) FilePredicate
- func Not(predicate FilePredicate) FilePredicate
- func Or(predicates ...FilePredicate) FilePredicate
- type FileResource
- type MinKNOWReport
- type ProcessParams
- type Work
- type WorkArr
- type WorkFunc
- type WorkMatch
- type WorkPlan
- func ArchiveFilesWorkPlan(localBase string, remoteBase string, cPool *ex.ClientPool, deleteLocal bool, ...) WorkPlan
- func ChecksumStateWorkPlan(countFunc WorkFunc) WorkPlan
- func CreateChecksumWorkPlan() WorkPlan
- func DryRunWorkPlan() WorkPlan
- func RemoveDirectoryWorkPlan(duration time.Duration) WorkPlan
Constants ¶
const BAISuffix string = "bai"
const BAMSuffix string = "bam"
const BEDSuffix string = "bed"
const CSVSuffix string = "csv"
const DefaultCleanupDelay = 14 * 24 * time.Hour
const DefaultSweepInterval = 5 * time.Minute
const Fast5Suffix string = "fast5"
const FastqSuffix string = "fastq"
const GzipSuffix string = "gz"
const HTMLSuffix string = "html"
const JSONSuffix string = "json"
const MD5Suffix string = "md5" // The recognised suffix for MD5 checksum files
const MarkdownSuffix string = "md"
const MinCleanupDelay = 60 * time.Second
const MinSweepInterval = 30 * time.Second
const OxfordNanoporeNamespace string = "ont"
const PDFSuffix string = "pdf"
const POD5Suffix string = "pod5"
const TSVSuffix string = "tsv"
const TxtSuffix string = "txt"
Variables ¶
var HasValidChecksumFile = Not(HasStaleChecksumFile)
var IsBAI = makeNoCompFilePredicate(baiRegex)
IsBAI returns true if path matches the recognised BAI file pattern.
var IsBAM = makeNoCompFilePredicate(bamRegex)
IsBAM returns true if path matches the recognised BAM file pattern.
var IsBED = makeCompFilePredicate(bedRegex)
IsBED returns true if path matches the recognised BED file pattern. Supports compressed versions.
var IsCSV = makeCompFilePredicate(csvRegex)
IsCSV returns true if path matches the recognised CSV file pattern. Supports compressed versions.
var IsFast5 = makeNoCompFilePredicate(fast5Regex)
IsFast5 returns true if path matches the recognised fast5 pattern.
var IsFastq = makeCompFilePredicate(fastqRegex)
IsFastq returns true if path matches the recognised fastq pattern. Supports compressed versions.
var IsHTML = makeCompFilePredicate(htmlRegex)
IsHTML returns true if path matches the recognised HTML file pattern. Supports compressed versions.
var IsJSON = makeCompFilePredicate(jsonRegex)
IsJSON returns true if path matches the recognised JSON file pattern. Supports compressed versions.
var IsMarkdown = makeCompFilePredicate(markdownRegex)
IsMarkdown returns true if path matches the recognised markdown file pattern. Supports compressed versions.
var IsPDF = makeNoCompFilePredicate(pdfRegex)
IsPDF returns true if path matches the recognised PDF file pattern.
var IsPOD5 = makeNoCompFilePredicate(pod5Regex)
IsPOD5 returns true if path matches the recognised pod5 pattern.
var IsTSV = makeCompFilePredicate(tsvRegex)
IsTSV returns true if path matches the recognised TSV file pattern. Supports compressed versions.
var IsTxt = makeCompFilePredicate(txtRegex)
IsTxt returns true if path matches the recognised text file pattern. Supports compressed versions.
var MinKNOWIgnore = []string{
"core-dump-db",
"devices",
"epi2me_inside",
"intermediate",
"Install_logs",
"logs",
"lost+found",
"network",
"npg",
"persistence",
"pings",
"queued_reads",
"reads",
"reports",
"user_scripts",
}
Directory names within the root MinKNOW data directory (typically /data) that we will ignore by default.
var MinKNOWRunIDRegex = regexp.MustCompile(`^\d+_\d+_\S+_[A-Za-z0-9]+_[A-Za-z0-9]+$`)
MinKNOWRunIDRegex matches the run ID of MinKNOW c. August 2019 for GridION and PromethION i.e. of the form:
20190701_1522_GA10000_FAK83493_3bba1763
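As a minimal sketch, the documented pattern can be checked against the example run ID above. The helper name isRunID is hypothetical and only stands in for IsMinKNOWRunID; the regular expression is copied from the declaration above.

```go
package main

import (
	"fmt"
	"regexp"
)

// runIDRegex copies the pattern documented for MinKNOWRunIDRegex.
var runIDRegex = regexp.MustCompile(`^\d+_\d+_\S+_[A-Za-z0-9]+_[A-Za-z0-9]+$`)

// isRunID is a hypothetical stand-in for IsMinKNOWRunID.
func isRunID(name string) bool {
	return runIDRegex.MatchString(name)
}

func main() {
	fmt.Println(isRunID("20190701_1522_GA10000_FAK83493_3bba1763")) // true
	fmt.Println(isRunID("reads"))                                   // false
}
```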
var RequiresAnnotation = IsMinKNOWReport
var RequiresChecksum = And(IsRegular, RequiresCopying, Or(Not(HasChecksumFile), HasStaleChecksumFile))
RequiresChecksum returns true if the argument is a regular file that is recognised as a checksum target and either has no checksum file, or has a checksum file that is stale.
var RequiresCompression = And(
	Or(
		IsBED,
		IsCSV,
		IsFastq,
		IsJSON,
		IsTxt,
	),
	Not(IsCompressed),
	Not(HasCompressedVersion))
var RequiresCopying = Or(
	And(IsBED, IsCompressed),
	And(IsCSV, IsCompressed),
	And(IsFastq, IsCompressed),
	And(IsJSON, IsCompressed),
	And(IsTxt, IsCompressed),
	IsBAI,
	IsBAM,
	IsFast5,
	IsHTML,
	IsMarkdown,
	IsPDF,
	IsPOD5,
	IsTSV,
)
var Version string
Version is the valet version reported by the --version CLI option.
Functions ¶
func AddMinKNOWReportAnnotation ¶
func AddMinKNOWReportAnnotation(obj *ex.DataObject, report MinKNOWReport) error
AddMinKNOWReportAnnotation adds annotation from report to the parent collection of the archived report obj.
func CalculateFileMD5 ¶
CalculateFileMD5 returns the MD5 checksum of the file at path.
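One common way to compute a file's MD5 in Go is to stream it through a hash, which the sketch below illustrates. It is not the package's implementation: fileMD5 and md5HexOfString are hypothetical names, and plain string paths are used in place of FilePath.

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"io"
	"os"
)

// fileMD5 streams the file at path through an MD5 hash and returns the raw digest.
func fileMD5(path string) ([]byte, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	h := md5.New()
	if _, err := io.Copy(h, f); err != nil {
		return nil, err
	}
	return h.Sum(nil), nil
}

// md5HexOfString writes content to a temporary file and returns the
// hex-encoded MD5 of that file.
func md5HexOfString(content string) (string, error) {
	f, err := os.CreateTemp("", "md5-sketch-*")
	if err != nil {
		return "", err
	}
	defer os.Remove(f.Name())
	if _, err := f.WriteString(content); err != nil {
		f.Close()
		return "", err
	}
	if err := f.Close(); err != nil {
		return "", err
	}
	sum, err := fileMD5(f.Name())
	if err != nil {
		return "", err
	}
	return hex.EncodeToString(sum), nil
}

func main() {
	sum, err := md5HexOfString("hello")
	if err != nil {
		panic(err)
	}
	fmt.Println(sum) // 5d41402abc4b2a76b9719d911017c592
}
```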
func CompressFile ¶
CompressFile compresses the target file using gzip. While doing so, it tees both the uncompressed and the compressed data to make MD5 checksums of each, then writes checksum files for both the original, uncompressed file and the new compressed file.
func CreateMD5ChecksumFile ¶
CreateMD5ChecksumFile calculates a checksum for the data file at path and writes it to a new checksum file as a hex-encoded string. It returns an error if the checksum file already exists.
func CreateOrUpdateMD5ChecksumFile ¶
CreateOrUpdateMD5ChecksumFile calculates a checksum for the file at path and writes it to a new checksum file as a hex-encoded string. This function only operates when there is no existing checksum file, or when the existing checksum file is stale (its last modified time is older than the last modified time of path). If the checksum file is stale this function deletes it before creating a new one.
func DefaultIgnorePatterns ¶
DefaultIgnorePatterns returns glob patterns matching directories in the root MinKNOW data directory that will be ignored by default.
func DoNothing ¶
DoNothing does nothing apart from log at debug level that it has been called. It is used to implement dry-run operations.
func DoProcessFiles ¶
DoProcessFiles operates by applying workPlan to each FilePath in the paths channel. Each WorkPlan is executed in its own goroutine, with no more than maxThreads goroutines running in parallel.
This function keeps track of the FilePaths being worked on. If a FilePath is passed in subsequently, but before existing work has finished, it is skipped.
If any WorkPlan encounters an error, the error is logged and counted. When DoProcessFiles exits, it will return an error if the error count across all the WorkPlans was greater than 0.
func FindFiles ¶
func FindFiles(ctx context.Context, root string, pred FilePredicate, pruneFn FilePredicate) (<-chan FilePath, <-chan error)
FindFiles walks the directory tree under root recursively, except into directories where pruneFn returns filepath.SkipDir, which prunes the directory traversal at that point.
Files encountered are reported to the caller on the first returned (output) channel and any errors on the second (error) channel. Files are filtered by testing with the predicate pred; only where the predicate returns true are the files sent to the channel.
The walking goroutine will continue to run until the directory tree is fully traversed, or the cancel function of ctx is called. Either will close the output and error channels and exit the goroutine cleanly.
func FindFilesInterval ¶
func FindFilesInterval(ctx context.Context, root string, pred FilePredicate, pruneFn FilePredicate, interval time.Duration) (<-chan FilePath, <-chan error)
FindFilesInterval executes FindFiles once every interval. Aside from having the additional interval parameter, it behaves in the same way as FindFiles.
func HasChecksumFile ¶
HasChecksumFile returns true if the argument has a corresponding checksum file.
func HasCompressedVersion ¶
HasCompressedVersion returns true if the argument is not a compressed file and has a corresponding compressed version.
func HasStaleChecksumFile ¶
HasStaleChecksumFile returns true if the argument has a checksum file with a timestamp older than the argument file i.e. the argument file appears to have been modified since the checksum file was last modified.
If the argument path does not exist, or has no checksum file, this function returns false.
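The staleness rule amounts to a comparison of modification times, sketched below. The names hasStaleChecksum and demoStale are hypothetical, and the sketch assumes the checksum file is named by appending ".md5" to the data file's path, which the package determines through ChecksumFilename.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"time"
)

// hasStaleChecksum reports whether path+".md5" exists and was last modified
// before path itself. It returns false if either file is missing.
func hasStaleChecksum(path string) (bool, error) {
	dataInfo, err := os.Stat(path)
	if os.IsNotExist(err) {
		return false, nil
	}
	if err != nil {
		return false, err
	}
	sumInfo, err := os.Stat(path + ".md5") // assumed naming convention
	if os.IsNotExist(err) {
		return false, nil
	}
	if err != nil {
		return false, err
	}
	return sumInfo.ModTime().Before(dataInfo.ModTime()), nil
}

// demoStale creates a data file and its checksum file in a temporary
// directory, sets their modification times explicitly and runs the check.
func demoStale(dataModifiedLater bool) (bool, error) {
	dir, err := os.MkdirTemp("", "stale-sketch-")
	if err != nil {
		return false, err
	}
	defer os.RemoveAll(dir)

	data := filepath.Join(dir, "reads.fastq.gz")
	for _, p := range []string{data, data + ".md5"} {
		if err := os.WriteFile(p, []byte("x"), 0o644); err != nil {
			return false, err
		}
	}
	sumTime := time.Now().Add(-time.Hour)
	dataTime := sumTime.Add(-time.Minute)
	if dataModifiedLater {
		dataTime = sumTime.Add(time.Minute)
	}
	if err := os.Chtimes(data, dataTime, dataTime); err != nil {
		return false, err
	}
	if err := os.Chtimes(data+".md5", sumTime, sumTime); err != nil {
		return false, err
	}
	return hasStaleChecksum(data)
}

func main() {
	fmt.Println(demoStale(true))  // true <nil>
	fmt.Println(demoStale(false)) // false <nil>
	fmt.Println(hasStaleChecksum(filepath.Join(os.TempDir(), "no-such-file-for-sketch")))
}
```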
func HasValidReportAnnotation ¶
func HasValidReportAnnotation(obj *ex.DataObject, report MinKNOWReport) (bool, error)
HasValidReportAnnotation returns true if the metadata in report, which has been archived as obj, is up-to-date in the remote archive.
func IsCompressed ¶
IsCompressed returns true if the path matches the recognised compressed file pattern (simply *.gz at the moment).
func IsMinKNOWReport ¶
IsMinKNOWReport returns true if path is a MinKNOW run report file. This file is Markdown that contains a section of JSON metadata describing details of the run.
func IsMinKNOWRunDir ¶
IsMinKNOWRunDir returns true if path is a MinKNOW run directory. This type of directory is located two levels down from the data directory, within an experiment and a sample directory and its name is a MinKNOW run identifier.
func IsMinKNOWRunID ¶
IsMinKNOWRunID returns true if name is in the form of a MinKNOW run identifier (matches MinKNOWRunIDRegex).
func MergeErrorChannels ¶
MergeErrorChannels merges values from its two input channels x and y for as long as at least one of them is open. Once both x and y have been closed, the channel returned will be closed by this function. The caller should not close the returned channel themselves.
func MergeFileChannels ¶
MergeFileChannels merges values from its two input channels x and y for as long as at least one of them is open. Once both x and y have been closed, the channel returned will be closed by this function. The caller should not close the returned channel themselves.
func ProcessFiles ¶
func ProcessFiles(cancelCtx context.Context, params ProcessParams) error
ProcessFiles detects files to work on, dispatches any files found to suitable work functions and monitors any errors that occur during the detection and processing steps. The function will continue to run until cancelled.
Errors that occur in detection are logged as warnings, but do not cause this function to return an error itself. Errors that occur during processing are counted. If, when cancelled, this function has counted any processing errors, it will return an error itself.
func ReadMD5ChecksumFile ¶
ReadMD5ChecksumFile reads and returns a checksum from a local file created by CreateMD5ChecksumFile. It trims any whitespace (including any newline) from the beginning and end of the checksum.
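Reading a checksum file back is mostly a matter of trimming whitespace, as the sketch below shows. The names readChecksum and roundTrip are hypothetical, the result is returned as a string rather than a byte slice, and the check that the text decodes as a 16-byte MD5 is an addition of this sketch rather than documented behaviour.

```go
package main

import (
	"encoding/hex"
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// readChecksum returns the hex checksum stored in the file at path with
// surrounding whitespace (including any trailing newline) removed.
func readChecksum(path string) (string, error) {
	content, err := os.ReadFile(path)
	if err != nil {
		return "", err
	}
	sum := strings.TrimSpace(string(content))

	// Sketch-only validation: an MD5 digest is 16 bytes, i.e. 32 hex digits.
	raw, err := hex.DecodeString(sum)
	if err != nil {
		return "", fmt.Errorf("invalid checksum in %s: %w", path, err)
	}
	if len(raw) != 16 {
		return "", fmt.Errorf("invalid MD5 length in %s: %d bytes", path, len(raw))
	}
	return sum, nil
}

// roundTrip writes content to a temporary checksum file and reads it back.
func roundTrip(content string) (string, error) {
	dir, err := os.MkdirTemp("", "checksum-sketch-")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)

	p := filepath.Join(dir, "reads.fastq.gz.md5")
	if err := os.WriteFile(p, []byte(content), 0o644); err != nil {
		return "", err
	}
	return readChecksum(p)
}

func main() {
	fmt.Println(roundTrip("  5d41402abc4b2a76b9719d911017c592\n"))
	// 5d41402abc4b2a76b9719d911017c592 <nil>
}
```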
func RemoveDirectory ¶ added in v1.7.0
RemoveDirectory removes directories under a root, recursively. It skips any that contain files, or whose descendants contain files.
func RemoveMD5ChecksumFile ¶
RemoveMD5ChecksumFile removes the MD5 checksum file corresponding to path. If the file does not exist by the time removal is attempted, no error is raised.
func UpdateMD5ChecksumFile ¶
UpdateMD5ChecksumFile removes the existing checksum file, if it exists, and creates a new one.
func WatchFiles ¶
func WatchFiles(cancelCtx context.Context, root string, pred FilePredicate, pruneFn FilePredicate) (<-chan FilePath, <-chan error)
WatchFiles reports filesystem events on the directories below root. Watches are set up recursively on every directory, except those for which pruneFn returns filepath.SkipDir, which prunes the directory traversal at that point. WatchFiles uses an internal event handler to add watches to any new directories added to the tree while it is operating, except those pruned as described.
Events on files are reported to the caller on the first returned (output) channel and any errors on the second (error) channel. Events are filtered by testing the event file with the predicate pred; only where the predicate returns true are the events sent to the channel.
The watching goroutine will continue to run until the cancel function of cancelCtx is called. This will close the output and error channels and exit the goroutine cleanly.
Types ¶
type FilePath ¶
type FilePath struct {
	FileResource
	Info os.FileInfo
}
FilePath is a FileResource that is on a local filesystem. It represents a file that is present at the time the instance is created.
func NewFilePath ¶
NewFilePath returns a new instance where the path has been cleaned and made absolute and the FileInfo populated by os.Stat. FilePaths should be created using this constructor to ensure that they always have a clean, absolute path and populated FileInfo.
func (*FilePath) ChecksumFilename ¶
ChecksumFilename returns the expected path of the checksum file belonging to the path.
func (*FilePath) CompressedFilename ¶
CompressedFilename returns the expected path of the compressed version of this file.
func (*FilePath) UncompressedFilename ¶
UncompressedFilename returns the expected path of the uncompressed version of this file. If the file is not compressed, returns the path of this file.
type FilePredicate ¶
func And ¶
func And(predicates ...FilePredicate) FilePredicate
And returns a predicate that returns true if all its arguments return true, or returns false otherwise.
func MakeDefaultPruneFunc ¶
func MakeDefaultPruneFunc(dataDir string) (FilePredicate, error)
MakeDefaultPruneFunc returns a directory pruning function for MinKNOW data directory dataDir. This will exclude directories matching DefaultIgnorePatterns.
func MakeGlobPruneFunc ¶
func MakeGlobPruneFunc(patterns []string) (FilePredicate, error)
MakeGlobPruneFunc returns a FilePredicate that will return false for any directory matching at least one of the glob pattern arguments. The returned function is intended for use as a pruning function argument to the valet.WatchFiles and valet.FindFiles functions.
func MakeIsAnnotated ¶
func MakeIsAnnotated(localBase string, remoteBase string, cPool *ex.ClientPool) FilePredicate
MakeIsAnnotated returns a predicate that will return true if its argument has had its associated metadata annotated in iRODS, and no errors occur while confirming this.
The criteria for annotated state are:
1. The metadata associated with the file has been obtained e.g. parsed from a file.
2. The metadata are annotated in iRODS.
Note that this is not testing for the presence of a specific data object e.g. the report file that contained the metadata. That is achieved using the IsCopied predicate.
func MakeIsCopied ¶
func MakeIsCopied(localBase string, remoteBase string, cPool *ex.ClientPool) FilePredicate
MakeIsCopied returns a predicate that will return true if its argument has been successfully copied from localBase to remoteBase, and no errors occur while confirming this.
The criteria for copied state are:
1. The file has a valid checksum file (not stale), otherwise there could be no way to test the checksum against the checksum in the archive.
2. The data object exists in the archive.
3. The checksum of the data object in the archive matches the expected checksum.
4. The data object has metadata under the "md5" key whose value matches the checksum.
func MakeIsOlderThan ¶ added in v1.7.0
func MakeIsOlderThan(duration time.Duration) FilePredicate
MakeIsOlderThan returns a predicate that will return true if its argument is older than the specified duration.
func MakeRequiresRemoval ¶ added in v1.7.0
func MakeRequiresRemoval(duration time.Duration) FilePredicate
MakeRequiresRemoval returns a predicate that will return true if its argument is a run directory that may be removed because it is older than the specified duration.
func Not ¶
func Not(predicate FilePredicate) FilePredicate
Not returns a predicate that returns true if its argument returns false, or returns false otherwise.
func Or ¶
func Or(predicates ...FilePredicate) FilePredicate
Or returns a predicate that returns true if any of its arguments return true, or returns false otherwise.
type FileResource ¶
type FileResource struct {
Location string // Raw URL or file path
}
FileResource is a locatable file.
type MinKNOWReport ¶
type MinKNOWReport struct {
	Path string // The path of the report
	DeviceID string `json:"device_id"` // The device ID (flowcell position)
	DeviceType string `json:"device_type"` // The device type e.g. promethion
	DistributionVersion string `json:"distribution_version"` // The MinKNOW version
	FlowcellID string `json:"flow_cell_id"` // The flowcell ID
	GuppyVersion string `json:"guppy_version"` // The Guppy basecaller version
	Hostname string `json:"hostname"` // The sequencing instrument hostname
	ProtocolGroupID string `json:"protocol_group_id"` // The user-supplied experiment name
	RunID string `json:"run_id"` // The automatically generated run ID
	SampleID string `json:"sample_id"` // The user-supplied sample ID
}
func ParseMinKNOWReport ¶
func ParseMinKNOWReport(path string) (MinKNOWReport, error)
ParseMinKNOWReport parses a file at path and extracts MinKNOW run metadata from it.
func (MinKNOWReport) AsEnhancedMetadata ¶
func (report MinKNOWReport) AsEnhancedMetadata() ([]ex.AVU, error)
AsEnhancedMetadata returns the report as iRODS AVUs. It returns all the AVUs of AsMetadata with some additional members:
The value of 'protocol_group_id' is duplicated under the attribute 'experiment_name'.
The value of 'device_id' is normalized to a position (in the range 1-5 for GridION, representing slot position on the instrument). The device ID may be of the form "GAn0000" or "Xn" (for GridION), where n is the position. The value is added under the attribute 'instrument_slot'.
Slot positions are more complex for the PromethION as they are arranged in a grid and therefore have an X and Y position. The PromethION beta and PromethION-24 have different nomenclature.
For the PromethION-24 we are following the column-major order used by ONT's MinKNOW API i.e. 1A - 1H, 2A - 2H, 3A - 3H.
func (MinKNOWReport) AsMetadata ¶
func (report MinKNOWReport) AsMetadata() []ex.AVU
AsMetadata returns the report content as iRODS AVUs.
type ProcessParams ¶
type ProcessParams struct {
	Root string // The local root directory to work on.
	MatchFunc FilePredicate // The file selecting predicate.
	PruneFunc FilePredicate // The local directory tree pruning predicate.
	Plan WorkPlan // The plan for selected files.
	SweepInterval time.Duration // The interval between sweeps of the local directory tree.
	MaxProc int // The maximum number of threads to run.
}
type Work ¶
Work describes a function to be executed and the rank of the execution. When there is a choice of Work to be executed, Work with the smallest Rank value (i.e. the highest rank) is performed first. In the case of a tie, either Work may be selected for execution.
type WorkArr ¶
type WorkArr []Work
WorkArr is a series of Work to be executed in ascending rank order.
type WorkFunc ¶
WorkFunc is a worker function used by DoProcessFiles.
func MakeAnnotator ¶
func MakeAnnotator(localBase string, remoteBase string, cPool *ex.ClientPool) WorkFunc
MakeAnnotator returns a WorkFunc that will add to iRODS any annotation associated with local files. Each file passed to the WorkFunc will be examined to see if it has associated metadata e.g. it might contain metadata itself, or be somehow linked to some metadata. Any relevant metadata will be copied to iRODS e.g. it might be added to the file's data object in iRODS, or to some other data object or collection.
The capabilities are listed below:
- MinKNOW report files.
The metadata contained in MinKNOW report files is parsed and added to the collection containing the report data object in iRODS.
func MakeCopier ¶
func MakeCopier(localBase string, remoteBase string, cPool *ex.ClientPool) WorkFunc
MakeCopier returns a WorkFunc capable of copying files to iRODS. Each file passed to the WorkFunc will have its path relative to localBase calculated. This relative path will then be appended to remoteBase to give the full destination path in iRODS. E.g.
localBase = /a/b/c
remoteBase = /zone1/x/y
file path = /a/b/c/d/e/f.fast5
therefore:
relative path = ./d/e/f.fast5
destination path = /zone1/x/y/d/e/f.fast5
Any leading iRODS collections will be created by the WorkFunc as required.
WorkFunc prerequisites: CreateOrUpdateMD5ChecksumFile
i.e. files for copying are expected to have an MD5 checksum file.
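The path mapping described for MakeCopier can be sketched with filepath.Rel and path.Join. The function name remotePath is hypothetical, and rejecting files outside localBase is an assumption of this sketch rather than documented behaviour. The example assumes a Unix-like local filesystem.

```go
package main

import (
	"fmt"
	"path"
	"path/filepath"
	"strings"
)

// remotePath maps a local file under localBase to its destination under
// remoteBase by re-rooting the file's path relative to localBase.
func remotePath(localBase, remoteBase, file string) (string, error) {
	rel, err := filepath.Rel(localBase, file)
	if err != nil {
		return "", err
	}
	// Sketch-only check: refuse files that are not below localBase.
	if rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) {
		return "", fmt.Errorf("%s is not under %s", file, localBase)
	}
	// Remote paths use forward slashes regardless of the local separator.
	return path.Join(remoteBase, filepath.ToSlash(rel)), nil
}

func main() {
	dest, err := remotePath("/a/b/c", "/zone1/x/y", "/a/b/c/d/e/f.fast5")
	fmt.Println(dest, err) // /zone1/x/y/d/e/f.fast5 <nil>
}
```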
type WorkMatch ¶
type WorkMatch struct {
// contains filtered or unexported fields
}
WorkMatch is an association between a FilePredicate and Work to be done. If the predicate returns true then the work will be done.
type WorkPlan ¶
type WorkPlan []WorkMatch
WorkPlan is a slice of WorkMatches. Where more than one Work is matched, they will be done in rank order.
func ArchiveFilesWorkPlan ¶
func ArchiveFilesWorkPlan(localBase string, remoteBase string, cPool *ex.ClientPool, deleteLocal bool, cleanup time.Duration) WorkPlan
ArchiveFilesWorkPlan copies files and metadata to iRODS via the following steps:
1. Compresses local files where needed
2. Creates or updates checksum files
3. Copies files to iRODS
4. Annotates metadata in iRODS
Additional steps are done if deleteLocal is true:
5. Uncompressed copies of local compressed files are removed
6. Successfully archived local files are removed
7. Redundant local checksum files are removed
8. Empty run directories are removed, after a delay
func ChecksumStateWorkPlan ¶
ChecksumStateWorkPlan counts files that do not have a checksum.
func CreateChecksumWorkPlan ¶
func CreateChecksumWorkPlan() WorkPlan
CreateChecksumWorkPlan manages checksum files.
func DryRunWorkPlan ¶
func DryRunWorkPlan() WorkPlan
DryRunWorkPlan matches any FilePath and does DoNothing Work.
func RemoveDirectoryWorkPlan ¶ added in v1.7.0
RemoveDirectoryWorkPlan removes empty work directories that are older than the specified duration.