---
title: "Lab 7b: Efficient and Parallel Evaluation"
output:
  BiocStyle::html_document:
    toc: true
    number_sections: false
vignette: >
  %\VignetteIndexEntry{Lab 7b: Efficient and Parallel Evaluation}
  %\VignetteEngine{knitr::rmarkdown}
  \usepackage[utf8]{inputenc}
---
```{r style, echo=FALSE, results='asis'}
BiocStyle::markdown()
```
```{r setup, echo=FALSE, results='hide'}
library(knitr)
opts_chunk$set(cache=TRUE, error=FALSE)
```
Original Authors: Valerie Obenchain, Martin Morgan
Presenting Author: Martin Morgan (martin.morgan@roswellpark.org)
Date: 15 July, 2016
# Efficient _R_ code
The goal of this section is to highlight practices for writing correct, robust
and efficient R code.
## Priorities
1. Correct: consistent with hand-worked examples (`identical()`, `all.equal()`; a small example follows this list)
2. Robust: supports realistic inputs, e.g., 0-length vectors, `NA`
values, ...
3. Simple: easy to understand next month; easy to describe what it
does to a colleague; easy to spot logical errors; easy to enhance.
4. Fast, or at least reasonable given the speed of modern computers.
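As a small illustration (not from the original lab), `identical()` tests exact equality while `all.equal()` tolerates floating-point error, and robust code behaves sensibly on edge cases such as 0-length input:

```{r correctness-example}
identical(1L, 1)          # FALSE: different types
all.equal(1, 1 + 1e-10)   # TRUE: within numerical tolerance
log(numeric(0))           # robust code handles 0-length input gracefully
```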
## Strategies
1. Profile
    - _Look_ at the script to understand in general terms what it is doing.
    - _Step_ through the code to see how it is executed, and to gain an
      understanding of the speed of each line.
    - _Time_ evaluation of select lines or simple chunks of code with
      `system.time()` or the `r CRANpkg("microbenchmark")` package.
    - _Profile_ the code with a tool that indicates how much time is
      spent in each function call or line -- the built-in `Rprof()`
      function, or packages such as `r CRANpkg("lineprof")` or
      `r CRANpkg("aprof")` (a minimal profiling sketch follows this list).
2. Vectorize -- operate on vectors, rather than explicit loops
```{r vectorize}
x <- 1:10
log(x)  ## NOT: for (i in seq_along(x)) x[i] <- log(x[i])
```
3. Pre-allocate memory, then fill in the result
```{r pre-allocate}
result <- numeric(10)
result[1] <- runif(1)
for (i in 2:length(result))
    result[i] <- runif(1) * result[i - 1]
result
```
4. Hoist common sub-expressions outside of repeated calculations, so
that the sub-expression is only calculated once
    - Simple, e.g., 'hoist' constant multiplications from a `for` loop
    - Higher-level, e.g., use `lm.fit()` rather than repeatedly fitting
      the same design matrix.
5. Re-use existing, tested code
    - Efficient implementations of common operations -- `tabulate()`,
      `rowSums()` and friends, `%in%`, ...
    - Efficient domain-specific implementations, e.g.,
      `r Biocpkg("snpStats")` for GWAS linear models; `r Biocpkg("limma")`
      for microarray linear models; `r Biocpkg("edgeR")`,
      `r Biocpkg("DESeq2")` for negative binomial GLMs relevant to
      RNASeq.
    - Reuse others' work -- `r Biocpkg("DESeq2")`,
      `r Biocpkg("GenomicRanges")`, `r Biocpkg("Biostrings")`, ...,
      `r CRANpkg("dplyr")`, `r CRANpkg("data.table")`, `r CRANpkg("Rcpp")`
### Case Study: Pre-allocate and vectorize
Here's an obviously inefficient function:
```{r inefficient}
f0 <- function(n, a=2) {
    ## stopifnot(is.integer(n) && (length(n) == 1) &&
    ##           !is.na(n) && (n > 0))
    result <- numeric()
    for (i in seq_len(n))
        result[[i]] <- a * log(i)
    result
}
```
Use `system.time()` to investigate how this algorithm scales with `n`,
focusing on elapsed time.
```{r system-time}
system.time(f0(10000))
n <- 1000 * seq(1, 20, 2)
t <- sapply(n, function(i) system.time(f0(i))[[3]])
plot(t ~ n, type="b")
```
Remember the current 'correct' value, and an approximate time
```{r correct-init}
n <- 10000
system.time(expected <- f0(n))
head(expected)
```
Revise the function to hoist the common multiplier, `a`, out of the
loop. Make sure the result of the 'optimization' and the original
calculation are the same. Use the `r CRANpkg("microbenchmark")`
package to compare the two versions
```{r hoist}
f1 <- function(n, a=2) {
    result <- numeric()
    for (i in seq_len(n))
        result[[i]] <- log(i)
    a * result
}
identical(expected, f1(n))
library(microbenchmark)
microbenchmark(f0(n), f1(n), times=5)
```
Adopt a 'pre-allocate and fill' strategy
```{r preallocate-and-fill}
f2 <- function(n, a=2) {
    result <- numeric(n)
    for (i in seq_len(n))
        result[[i]] <- log(i)
    a * result
}
identical(expected, f2(n))
microbenchmark(f0(n), f2(n), times=5)
```
Use an `*apply()` function to avoid having to explicitly pre-allocate,
and make opportunities for vectorization more apparent.
```{r use-apply}
f3 <- function(n, a=2)
    a * sapply(seq_len(n), log)
identical(expected, f3(n))
microbenchmark(f0(n), f2(n), f3(n), times=10)
```
Now that the code is presented in a single line, it is apparent that
it could be easily vectorized. Seize the opportunity to vectorize it:
```{r use-vectorize}
f4 <- function(n, a=2)
    a * log(seq_len(n))
identical(expected, f4(n))
microbenchmark(f0(n), f3(n), f4(n), times=10)
```
`f4()` definitely seems to be the winner. How does it scale with `n`?
(Repeat several times)
```{r vectorized-scale}
n <- 10 ^ (5:8)    # 10x to 10,000x larger than the n used to test f0
t <- sapply(n, function(i) system.time(f4(i))[[3]])
plot(t ~ n, log="xy", type="b")
```
Any explanations for the different pattern of response?
Lessons learned:
1. Vectorizing offers a huge improvement over iteration
2. Pre-allocate-and-fill is very helpful when explicit iteration is
required.
3. `*apply()` functions help avoid the need for explicit pre-allocation
and make opportunities for vectorization more apparent. This may
come at a small performance cost, but is generally worth it.
4. Hoisting common sub-expressions can be helpful for improving
performance when explicit iteration is required.
# Iteration and restriction to manage memory
When data are too large to fit in memory, we can iterate through files in
chunks or subset the data by fields or genomic positions.
Iteration
- Chunk-wise
    - `open()`, read chunk(s), `close()`.
    - e.g., `yieldSize` argument to `Rsamtools::BamFile()`
- Framework: `GenomicFiles::reduceByYield()`
Restriction
- Limit to columns and / or rows of interest
- Exploit domain-specific formats (see the sketch after this list)
    - BAM files and `Rsamtools::ScanBamParam()`
    - BAM files and `Rsamtools::PileupParam()`
    - VCF files and `VariantAnnotation::ScanVcfParam()`
- Use a database
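As a hedged sketch of restriction (the file name is hypothetical and the chunk is not evaluated), `Rsamtools::ScanBamParam()` limits both the fields and the genomic regions read from a BAM file:

```{r restriction-sketch, eval=FALSE}
## Read only selected fields ('columns') and regions ('rows') from a BAM file;
## 'my.bam' is a hypothetical path
library(Rsamtools)
roi <- GRanges("chr6", IRanges(28477797, 29477797))
param <- ScanBamParam(what = c("rname", "pos", "cigar"),  # fields
                      which = roi)                        # regions
aln <- scanBam("my.bam", param = param)
```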
## Exercise: Counting overlaps
Iterate through files with `GenomicFiles::reduceByYield()`, which

1. yields a chunk of data,
2. maps the input chunk to a possibly transformed representation, and
3. reduces mapped chunks into a summary result.

```{r reduceByYield-setup}
suppressPackageStartupMessages({
    library(GenomicFiles)
    library(GenomicAlignments)
    library(Rsamtools)
    library(TxDb.Hsapiens.UCSC.hg19.knownGene)
})

yield <-     # how to input the next chunk of data
    function(X, ...)
{
    readGAlignments(X)
}

map <-       # what to do to each chunk
    function(VALUE, ..., roi)
{
    olaps <- findOverlaps(VALUE, roi, type="within", ignore.strand=TRUE)
    count <- tabulate(subjectHits(olaps), subjectLength(olaps))
    setNames(count, names(roi))
}
reduce <- `+` # how to combine mapped chunks
```
Improvement: a "yield factory" to keep track of how many records have been input.
```{r yieldFactory}
yieldFactory <-  # return a function with local state
    function()
{
    n_records <- 0L
    function(X, ...) {
        aln <- readGAlignments(X)
        n_records <<- n_records + length(aln)
        message(n_records)
        aln
    }
}
```
Regions of interest, named like the chromosomes in the BAM file.
```{r count-overlaps-roi, eval=FALSE}
exByTx <- exonsBy(TxDb.Hsapiens.UCSC.hg19.knownGene, "tx")
fl <- "/home/ubuntu/data/vobencha/LargeData/srarchive/hg19_alias.tab"
map0 <- read.delim(fl, header=FALSE, stringsAsFactors=FALSE)
seqlevels(exByTx, force=TRUE) <- setNames(map0$V1, map0$V2)
```
A function to iterate through a BAM file.
```{r count-overlaps, eval=FALSE}
count1 <- function(filename, roi) {
    message(filename)
    ## Create and open BAM file
    bf <- BamFile(filename, yieldSize=1000000)
    reduceByYield(bf, yieldFactory(), map, reduce, roi=roi)
}
```
In action
```{r count-overlaps-doit, eval=FALSE}
bam <- "/home/ubuntu/data/vobencha/LargeData/srarchive/SRR1039508_sorted.bam"
count <- count1(bam, exByTx)
```
# File management
## File classes
| Type | Example use | Name | Package |
|-------|-----------------------|-----------------------------|----------------------------------|
| .bed  | Range annotations     | `BEDFile()`                 | `r Biocpkg("rtracklayer")`       |
| .wig  | Coverage              | `WIGFile()`, `BigWigFile()` | `r Biocpkg("rtracklayer")`       |
| .gtf | Transcript models | `GTFFile()` | `r Biocpkg("rtracklayer")` |
| | | `makeTxDbFromGFF()` | `r Biocpkg("GenomicFeatures")` |
| .2bit | Genomic Sequence | `TwoBitFile()` | `r Biocpkg("rtracklayer")` |
| .fastq | Reads & qualities | `FastqFile()` | `r Biocpkg("ShortRead")` |
| .bam | Aligned reads | `BamFile()` | `r Biocpkg("Rsamtools")` |
| .tbx | Indexed tab-delimited | `TabixFile()` | `r Biocpkg("Rsamtools")` |
| .vcf | Variant calls | `VcfFile()` | `r Biocpkg("VariantAnnotation")` |
```{r rtracklayer-file-classes}
## rtracklayer menagerie
suppressPackageStartupMessages(library(rtracklayer))
names(getClass("RTLFile")@subclasses)
```
Notes
- Not a consistent interface
- `open()`, `close()`, `import()` / `yield()` / `read*()` (see the sketch below)
- Some support selective import via an index (e.g., `.bai`, the BAM index),
  selection ('columns'), and restriction ('rows')
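The common idiom, sketched below with a hypothetical file (the chunk is not evaluated), is `import()`, which chooses a parser from the file extension or from an explicit file class:

```{r import-sketch, eval=FALSE}
## 'regions.bed' is a hypothetical file; import() returns a GRanges for BED input
library(rtracklayer)
gr <- import("regions.bed")            # format guessed from the extension
gr <- import(BEDFile("regions.bed"))   # or be explicit about the file class
```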
## Managing a collection of files
`*FileList()` classes
- `reduceByYield()` -- iterate through a single large file
- `bplapply()` (`r Biocpkg("BiocParallel")`) -- perform independent
  operations on several files, in parallel (sketched below)
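A minimal sketch of the per-file pattern (hypothetical paths; not evaluated): each file is summarized independently, so the files can be handed to workers with `bplapply()`.

```{r filelist-sketch, eval=FALSE}
## Hypothetical BAM files; countBam() returns a one-row summary for each
library(BiocParallel)
library(Rsamtools)
fls <- dir("/path/to/bams", pattern = "\\.bam$", full.names = TRUE)
counts <- bplapply(fls, countBam)
do.call(rbind, counts)
```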
`GenomicFiles()`
- 'rows' as genomic range restrictions, 'columns' as files
- Each row x column is a _map_ from file data to useful representation
in _R_
- `reduceByRange()`, `reduceByFile()`: collapse maps into summary
representation
- see the GenomicFiles vignette
  [Figure 1](http://bioconductor.org/packages/devel/bioc/vignettes/GenomicFiles/inst/doc/GenomicFiles.pdf)
  and the construction sketch below
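A construction sketch (hypothetical files, not evaluated): ranges become the 'rows' and files the 'columns' of a `GenomicFiles` instance.

```{r genomicfiles-sketch, eval=FALSE}
## Hypothetical files; rows are ranges, columns are files
library(GenomicFiles)
rng <- GRanges("chr6", IRanges(c(1e6, 2e6), width = 1e5))
gf <- GenomicFiles(rowRanges = rng, files = c("a.bam", "b.bam"))
dim(gf)   # 2 ranges x 2 files
```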
`VcfStack()`
- Common practice of splitting VCF files into one per chromosome.
- Easy way to treat these as a single entity
- see `?VcfStack`
# Parallel evaluation
A couple of caveats:
Iteration / restriction techniques keep the memory requirements under control
while parallel evaluation distributes computational load across nodes.
Keep in mind that parallel computations are still restricted by the amount of
memory available on each node.
There is overhead in setting up and tearing down a cluster, and more so when
computing in distributed memory. For small calculations, the parallel overhead
may outweigh the benefits, resulting in no improvement in performance.

Jobs that benefit the most from parallel execution are CPU-intensive
and operate on data chunks that fit into memory.
## BiocParallel
`r Biocpkg("BiocParallel")` provides a standardized interface for parallel
evaluation and supports the major parallel computing styles: forks and
processes on a single computer, ad hoc clusters, batch schedulers and cloud
computing. By default, `r Biocpkg("BiocParallel")` chooses a parallel back-end
appropriate for the OS and is supported across Unix, Mac and Windows.
General ideas:
- Use `bplapply()` instead of `lapply()`
- Argument `BPPARAM` influences how parallel evaluation occurs (see the
  sketch after this list)
    - `MulticoreParam()`: forked processes on a single (non-Windows) machine
    - `SnowParam()`: processes on the same or different machines
    - `BatchJobsParam()`: resource scheduler on a cluster
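A short sketch of these ideas (not one of the original exercises, and not evaluated): inspect the registered back-ends and pass one explicitly.

```{r bpparam-sketch, eval=FALSE}
library(BiocParallel)
registered()                          # available back-ends, best guess first
param <- MulticoreParam(workers = 2)  # use SnowParam() on Windows
bplapply(1:4, sqrt, BPPARAM = param)
```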
### Exercise: Sleeping serially and in parallel
This small example motivates the use of parallel execution and demonstrates how
`bplapply()` can be a drop-in replacement for `lapply()`.
Use `system.time()` to explore how long this takes to execute as `n`
increases from 1 to 10. Use `identical()` and
`r CRANpkg("microbenchmark")` to compare alternatives `f0()` and `f1()`
for both correctness and performance.
`fun` sleeps for 1 second, then returns `i`.
```{r parallel-sleep}
library(BiocParallel)
fun <- function(i) {
    Sys.sleep(1)
    i
}

## serial
f0 <- function(n)
    lapply(seq_len(n), fun)

## parallel
f1 <- function(n)
    bplapply(seq_len(n), fun)
```
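A possible exploration (a sketch, not evaluated here since it deliberately sleeps): with several workers available, `f1()` finishes in roughly one round of sleeps while `f0()` takes about `n` seconds.

```{r parallel-sleep-compare, eval=FALSE}
n <- 4
system.time(res0 <- f0(n))   # ~ n seconds
system.time(res1 <- f1(n))   # ~ 1-2 seconds with >= n workers
identical(res0, res1)        # same result either way

library(microbenchmark)
microbenchmark(f0(n), f1(n), times = 1)
```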
### Exercise: error handling and `BPREDO`
`r Biocpkg("BiocParallel")` "catches and returns" errors along with
successful results. This exercise demonstrates how to access the
`traceback()` of a failed task and how to re-run the failed tasks with
'BPREDO'. Full details on error handling, logging and debugging are
in the [Errors, Logs and Debugging][] vignette.
```{r parallel-bpredo-param}
param <- MulticoreParam(workers = 3)
```
Call the `sqrt()` function across 'X'; the second element is a character
and will throw an error.
```{r parallel-bpredo-bplapply, error=TRUE}
X <- list(1, "2", 3)
res <- bplapply(X, sqrt, BPPARAM = param)
```
It's also possible to catch the error and the partially evaluated result with `bptry()`:
```{r parallel-bptry}
res <- bptry(bplapply(X, sqrt, BPPARAM=param))
res
```
Re-run the failed results by repeating the call to `bplapply()`, this
time with corrected input data and the partial results as
`BPREDO`. Only the failed elements are re-run.
```{r parallel-bpredo}
X.redo <- list(1, 2, 3)
bplapply(X.redo, sqrt, BPREDO = res)
```
Alternatively, switch to a `SerialParam()` and debug the specific
element that caused the error.
```{r parallel-debug, eval=FALSE}
> fun = function(i) { browser(); sqrt(i) }
> bplapply(X, fun, BPREDO=res, BPPARAM=SerialParam())
resuming previous calculation ...
Called from: FUN(...)
Browse[1]>
debug at #1: sqrt(i)
Browse[2]> i
[1] "2"
Browse[2]> i = 2
Browse[2]> c
[[1]]
[1] 1
[[2]]
[1] 1.414214
[[3]]
[1] 1.732051
```
### Exercise: logging
`r Biocpkg("BiocParallel")` uses the
[futile.logger](http://cran.r-project.org/web/packages/futile.logger/index.html)
package for logging. The package has a flexible system for filtering
messages of varying severity, such as INFO, DEBUG and ERROR.
(For a list of all thresholds, see the `?bpthreshold` man page.)
`r Biocpkg("BiocParallel")` captures messages written in
futile.logger format as well as messages written to stdout and stderr.
This function does some argument checking and has DEBUG, WARN and
INFO-level log messages.
```{r logging, eval=FALSE}
FUN <- function(i) {
    flog.debug(paste0("value of 'i': ", i))
    if (!length(i)) {
        flog.warn("'i' is missing")
        NA
    } else if (!is(i, "numeric")) {
        flog.info("coercing to numeric")
        as.numeric(i)
    } else {
        i
    }
}
```
Turn logging on in the param and set the threshold to WARN.
```{r logging-WARN, eval=FALSE}
param <- SnowParam(3, log = TRUE, threshold = "WARN")
bplapply(list(1, "2", integer()), FUN, BPPARAM = param)
```
Lower the threshold to INFO and DEBUG (i.e., use `bpthreshold<-`) to see how
messages are filtered on severity.
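For instance (a sketch, not evaluated), relaxing the threshold on the same param surfaces progressively more detail in the worker logs:

```{r logging-INFO-DEBUG, eval=FALSE}
bpthreshold(param) <- "INFO"    # adds the 'coercing to numeric' message
bplapply(list(1, "2", integer()), FUN, BPPARAM = param)

bpthreshold(param) <- "DEBUG"   # also shows the value of 'i' for each task
bplapply(list(1, "2", integer()), FUN, BPPARAM = param)
```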
### Exercise: Worker timeout
For long running jobs or untested code it can be useful to set a time limit.
The _timeout_ field is the time, in seconds, allowed for each worker to
complete a task. If a task takes longer than _timeout_ the worker returns
an error.
_timeout_ can be set during param construction,
```{r timeout_constructor}
param <- SnowParam(timeout = 20)
param
```
or with the `bptimeout()` setter:
```{r timeout_setter}
bptimeout(param) <- 2
param
```
Use this function to explore different _timeout_s over a numeric vector of 'X'
values with `bplapply()`. 'X' values less than _timeout_ return successfully
while those over _timeout_ return an error.
```{r timeout_bplapply}
fun <- function(i) {
    Sys.sleep(i)
    i
}
```
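One possible exploration (a sketch, not evaluated): with a 3-second timeout the short sleeps succeed while the long one fails, and `bptry()` collects the partial result.

```{r timeout_explore, eval=FALSE}
param <- SnowParam(workers = 2, timeout = 3)
X <- c(1, 2, 5)
res <- bptry(bplapply(X, fun, BPPARAM = param))
res    # elements 1 and 2 succeed; element 3 exceeds the timeout
```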
### Exercise: Counting overlaps in parallel
Distribute files over workers: `GenomicFiles::reduceByFile()`
The previous counting example used `GenomicFiles::reduceByYield()` which
operates on a single file and implements a yield, map, reduce paradigm.
In this exercise we'll use `GenomicFiles::reduceByFile()` which uses
`bplapply()` under the hood to operate on multiple files in parallel.
Primary arguments to `reduceByFile()` are a set of files and a set of ranges.
Files are sent to the workers and data subsets are extracted based on the
ranges. The bulk of the work is done in the _MAP_ function and an optional
_REDUCE_ function combines the output on each worker.
```{r co-setup}
suppressPackageStartupMessages({
    library(BiocParallel)
    library(GenomicFiles)
    library(GenomicAlignments)
    library(Rsamtools)
})
```
On Unix or Mac, configure a `MulticoreParam()` with 4 workers. Turn on
logging and set a timeout of 60 seconds.
```{r co-param}
param <- MulticoreParam(4, log = TRUE, timeout = 60)
```
On Windows do the same with `SnowParam()`:
```{r co-param-snow, eval=FALSE}
param <- SnowParam(4, log = TRUE, timeout = 60)
```
Point to the collection of bam files.
```{r co-bams}
fls <- dir("/home/ubuntu/data/vobencha/LargeData/copynumber", ".bam$", full=TRUE)
names(fls) <- basename(fls)
bfl <- BamFileList(fls)
```
Defining ranges (region of interest) restricts the amount of data on the
workers and keeps memory requirements under control. We'll use a set of ranges
on the Major Histocompatibility Complex locus on chromosome 6.
```{r co-GRanges}
ranges <- GRanges("chr6", IRanges(c(28477797, 29527797, 32448354),
                                  c(29477797, 30527797, 33448354)))
```
The _MAP_ function reads in records and counts overlaps. `readGAlignments()`
reads in bam records that overlap with any portion of the ranges defined in the
_scanBamParam_ (i.e., they could be overlapping the start or the end). Once
we've got the records in _R_, we want to count only those that fall 'within'
the ranges.
```{r co-map, eval=FALSE}
MAP <- function(range, file, ...) {
    library(GenomicAlignments)          ## readGAlignments(), ScanBamParam()
    param = ScanBamParam(which=range)   ## restriction
    gal = readGAlignments(file, param=param)
    ## log messages
    flog.info(paste0("file: ", basename(file)))
    flog.debug(paste0("records: ", length(gal)))
    ## overlaps
    olaps <- findOverlaps(gal, range, type="within", ignore.strand=TRUE)
    tabulate(subjectHits(olaps), subjectLength(olaps))
}
```
Count ...
```{r co-doit, eval=FALSE}
cts <- reduceByFile(ranges, fls, MAP, BPPARAM = param)
```
The result is a list with the same length as the number of files.
```{r co-length, eval=FALSE}
length(cts)
```
Each list element has the same length as the number of ranges.
```{r co-elementlengths, eval=FALSE}
elementLengths(cts)
```
Tables of counts for each range are extracted with `[[`:
```{r co-tables, eval=FALSE}
cts[[1]]
```
## Other resources
- [Bioconductor Amazon AMI](http://bioconductor.org/help/bioconductor-cloud-ami/)
    - easily 'spin up' tens of instances
    - Pre-configured with Bioconductor packages and StarCluster
      management
- `r Biocpkg("GoogleGenomics")` to interact with google compute cloud
and resources
# Resources
- Lawrence M and Morgan M (2014). Scalable Genomics with R and
  Bioconductor. Statistical Science 29(2), 214-226.
  http://arxiv.org/abs/1409.2864v1
- BiocParallel: http://bioconductor.org/packages/release/bioc/html/BiocParallel.html
- GenomicFiles: http://bioconductor.org/packages/release/bioc/html/GenomicFiles.html
[Errors, Logs and Debugging]: http://www.bioconductor.org/packages/3.2/bioc/vignettes/BiocParallel/inst/doc/Errors_Logs_And_Debugging.pdf