% File src/library/parallel/vignettes/parallel.Rnw
% Part of the R package, https://www.R-project.org
% Copyright 2011-2017 R Core Team
% Distributed under GPL 2 or later
\documentclass[a4paper]{article}
\usepackage{Rd, parskip, amsmath, enumerate}
\usepackage[round]{natbib}
\usepackage{hyperref}
\usepackage{color}
\definecolor{Blue}{rgb}{0,0,0.8}
\hypersetup{%
colorlinks,%
plainpages=true,%
linkcolor=black,%
citecolor=black,%
urlcolor=Blue,%
pdfstartview={XYZ null null 1},% 100%
pdfview={XYZ null null null},%
pdfpagemode=UseNone,% for no outline
pdfauthor={R Core Team},%
pdftitle={Package `parallel'}% Could also have pdfusetitle
}
%\VignetteIndexEntry{Package 'parallel'}
%\VignettePackage{parallel}
\newcommand{\I}[1]{#1}
\title{Package `parallel'}
\author{R Core Team}
\begin{document}
\maketitle
\section{Introduction}
Package \pkg{parallel} was first included in \R{} 2.14.0. It builds
on the work done for CRAN packages \CRANpkg{multicore} \citep{multicore}
and \CRANpkg{snow} \citep{snow} and provides drop-in replacements for most
of the functionality of those packages, with integrated handling of
random-number generation.

Parallelism in computation can occur at many different levels: this
package is principally concerned with `coarse-grained
parallelization'. At the lowest level, modern CPUs can do several
basic operations simultaneously (e.g.{} integer and floating-point
arithmetic), and several implementations of external \abbr{BLAS} libraries
use multiple threads to do parts of basic vector/matrix operations in
parallel. Several contributed \R{} packages use multiple threads at C
level \emph{via} \abbr{OpenMP} or \I{pthreads}.
This package handles running much larger chunks of computations in
parallel. A typical example is to evaluate the same \R{} function on
many different sets of data: often simulated data as in bootstrap
computations (or with `data' being the random-number stream). The
crucial point is that these chunks of computation are unrelated and do
not need to communicate in any way. It is often the case that the
chunks take approximately the same length of time. The basic
computational model is
\begin{enumerate}[(a)]
\item Start up $M$ `worker' processes, and do any initialization
needed on the workers.
\item Send any data required for each task to the workers.
\item Split the task into $M$ roughly equally-sized chunks, and send
the chunks (including the \R{} code needed) to the workers.
\item Wait for all the workers to complete their tasks, and ask them
for their results.
\item Repeat steps (b--d) for any further tasks.
\item Shut down the worker processes.
\end{enumerate}
Amongst the initializations which may be needed are to load packages
and initialize the random-number stream.
There are implementations of this model in the functions
\code{mclapply} and \code{parLapply} as near-drop-in replacements for
\code{lapply}.
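
As a rough illustration of steps (a)--(f) with a socket cluster, the
following sketch runs a small function on two workers. The pool size,
the exported object \code{offset} and the function \code{f} are
assumptions made purely for this example.
<<eval=FALSE>>=
library(parallel)
cl <- makeCluster(2)                      # (a) start M = 2 workers
junk <- clusterEvalQ(cl, library(stats))  # (a) initialization on the workers
offset <- 10                              # hypothetical data needed by the tasks
clusterExport(cl, "offset")               # (b) send the data to the workers
f <- function(i) i^2 + offset             # hypothetical task
res <- parLapply(cl, 1:6, f)              # (c)-(d) split, send, wait, collect
## (e) further parLapply() calls could re-use the same workers
stopCluster(cl)                           # (f) shut down the workers
unlist(res)
@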
A slightly different model is to split the task into $M_1 > M$ chunks,
send the first $M$ chunks to the workers, then repeatedly wait for any
worker to complete and send it the next remaining task: see the
section on `load balancing'.
In principle the workers could be implemented by threads\footnote{only
`in principle' since the \R{} interpreter is not thread-safe.} or
lightweight processes, but in the current implementation they are full
processes. They can be created in one of three ways:
\begin{enumerate}
\item \emph{Via} \code{system("Rscript")} or similar to launch a new
process on the current machine or a similar machine with an identical
\R{} installation. This then needs a way to communicate between
master and worker processes, which is usually done \emph{via}
sockets.
This should be available on all \R{} platforms, although it is
conceivable that zealous security measures could block the
inter-process communication \emph{via} sockets. Users of Windows
and \abbr{macOS} may expect pop-up dialog boxes from the firewall asking
if an \R{} process should accept incoming connections.
Following \CRANpkg{snow}, a pool of worker processes listening
\emph{via} sockets for commands from the master is called a
`cluster' of nodes.
\item \emph{Via} forking. \emph{Fork} is a
concept\footnote{\url{https://en.wikipedia.org/wiki/Fork_(operating_system)}}
from POSIX operating systems, and should be available on all \R{}
platforms except Windows. This creates a new \R{} process by taking
a complete copy of the master process, including the workspace and
state of the random-number stream. However, the copy will (in any
reasonable OS) share memory pages with the master until modified so
forking is very fast.
The use of forking was pioneered by package \CRANpkg{multicore}.
Note that as it does share the complete process, it also shares any
GUI elements, for example an \R{} console and on-screen devices.
This can cause havoc.\footnote{Some precautions are taken on \abbr{macOS}:
for example the event loops for \command{R.app} and the
\code{quartz} device are inhibited in the child. This information
is available at C level in the \code{Rboolean} variable
\code{R\_isForkedChild}.}
There needs to be a way to communicate between master and worker.
Once again there are several possibilities since master and workers
share memory. In \CRANpkg{multicore} the initial fork sends an \R{}
expression to be evaluated to the worker, and the master process
opens a pipe for reading that is used by the worker to return the
results. Both that and creating a cluster of nodes communicating
\emph{via} sockets are supported in package \pkg{parallel}.
\item Using OS-level facilities to set up a means to send tasks to
other members of a group of machines. There are several ways to do
that, and for example package \CRANpkg{snow} can make use of MPI
(`message passing interface') using \R{} package \CRANpkg{Rmpi}.
Communication overheads can dominate computation times in this
approach, so it is most often used on tightly-coupled networks of
computers with high-speed interconnects.
CRAN packages following this approach include \CRANpkg{GridR} (using
\I{Condor} or \I{Globus}) and \CRANpkg{Rsge} (using \I{SGE}, currently
called `Oracle Grid Engine').
It will not be considered further in this vignette, but those parts
of \pkg{parallel} which provide \CRANpkg{snow}-like functions will
accept \CRANpkg{snow} clusters including MPI clusters.
\end{enumerate}
The landscape of parallel computing has changed with the advent of
shared-memory computers with multiple (and often many) CPU cores.
Until the late 2000s parallel computing was mainly done on clusters
of large numbers of single- or dual-CPU computers: nowadays even
laptops have two or four cores, and servers with 8, 32 or more cores
are commonplace. It is such hardware that package \pkg{parallel} is
designed to exploit. It can also be used with several computers
running the same version of \R{} connected by (reasonable-speed)
ethernet: the computers need not be running the same OS.
Note that all these methods of communication use
\code{serialize}/\code{unserialize} to send \R{} objects between
processes. This has limits (typically hundreds of millions of
elements) which a well-designed parallelized algorithm should not
approach.
\section{Numbers of CPUs/cores}
In setting up parallel computations it can be helpful to have some
idea of the number of CPUs or cores available, but this is a rather
slippery concept. Nowadays almost all physical CPUs contain two or
more cores that run more-or-less independently (they may share parts
of the cache memory, and they do share access to RAM). However, on
some processors these cores may themselves be able to run multiple
tasks simultaneously, and some OSes (e.g.{} Windows) have the concept
of \emph{logical} CPUs which may exceed the number of cores.
Note that all a program can possibly determine is the total number of
CPUs and/or cores available. This is not necessarily the same as the
number of CPUs available \emph{to the current user} which may well be
restricted by system policies on multi-user systems. Nor does it give
much idea of a reasonable number of CPUs to use for the current task:
the user may be running many \R{} processes simultaneously, and those
processes may themselves be using multiple threads through a
multi-threaded \abbr{BLAS}, compiled code using \abbr{OpenMP} or other low-level
forms of parallelism. We have even seen instances of
\CRANpkg{multicore}'s \code{mclapply} being called
recursively,\footnote{\code{parallel::mclapply} detects this and runs
nested calls serially.} generating $2n + n^2$ processes on a machine
estimated to have $n = 16$ cores.
But in so far as it is a useful guideline, function
\code{detectCores()} tries to determine the number of CPU cores in the
machine on which \R{} is running: it has ways to do so on all known
current \R{} platforms. What exactly it measures is OS-specific: we
try where possible to report the number of logical cores available.
On Windows the default is to report the number of logical CPUs. On
modern hardware (e.g.{} Intel \emph{Core i7}) the latter may not be
unreasonable as hyper-threading does give a significant extra
throughput. What \code{detectCores(logical = FALSE)} reports is
OS-version-dependent: on recent versions of Windows it reports the
number of physical cores but on older versions it might report the
number of physical CPU packages.
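
A minimal sketch of how the reported counts might be turned into a
worker count is shown below. The policy of leaving one logical CPU
free, and the name \code{n\_workers}, are assumptions for illustration
only; \code{detectCores} can return \code{NA}, which the sketch
guards against.
<<eval=FALSE>>=
library(parallel)
n_logical  <- detectCores()                 # logical CPUs where the OS reports them
n_physical <- detectCores(logical = FALSE)  # OS-dependent, see the text above
## Hypothetical policy: leave one CPU free, but always use at least one worker
n_workers <- max(1L, n_logical - 1L, na.rm = TRUE)
@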
\section{Analogues of apply functions}
By far the most common direct applications of packages \CRANpkg{multicore}
and \CRANpkg{snow} have been to provide parallelized replacements of
\code{lapply}, \code{sapply}, \code{apply} and related functions.
As analogues of \code{lapply} there are
\begin{verbatim}
parLapply(cl, X, fun, ...)
mclapply(X, FUN, ..., mc.cores)
\end{verbatim}
where \code{mclapply} is not available\footnote{except as a stub
which simply calls \code{lapply}.} on Windows and has further
arguments discussed on its help page. They differ slightly in
philosophy: \code{mclapply} sets up a pool of \code{mc.cores} workers
just for this computation, whereas \code{parLapply} uses a less
ephemeral pool specified by the object \code{cl} created by a call to
\code{makeCluster} (which \emph{\I{inter alia}} specifies the size of the
pool). So the workflow is
\begin{verbatim}
cl <- makeCluster(<size of pool>)
# one or more parLapply calls
stopCluster(cl)
\end{verbatim}
% we could arrange to stop a cluster when cl is gc-ed
For matrices there are the rarely used \code{parApply} and
\code{parCapply} functions, and the more commonly used
\code{parRapply}, a parallel row \code{apply} for a matrix.
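
The matrix analogues can be tried on a toy example such as the
following sketch; the matrix \code{m} and the pool of two workers are
chosen only for illustration.
<<eval=FALSE>>=
library(parallel)
cl <- makeCluster(2)
m <- matrix(1:6, nrow = 2)           # a 2 x 3 matrix
row_sums <- parRapply(cl, m, sum)    # one value per row
col_sums <- parCapply(cl, m, sum)    # one value per column
same     <- parApply(cl, m, 1, sum)  # general form, here over rows
stopCluster(cl)
@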
\section{SNOW Clusters}
The package contains a slightly revised copy of much of \CRANpkg{snow},
and the functions it contains can also be used with clusters created
by \CRANpkg{snow} (provided the package is on the search path).
Two functions are provided to create SNOW clusters,
\code{makePSOCKcluster} (a streamlined version of
\code{snow::makeSOCKcluster}) and (except on Windows)
\code{makeForkCluster}. They differ only in the way they spawn worker
processes: \code{makePSOCKcluster} uses \code{Rscript} to launch
further copies of \R{} (on the same host or optionally elsewhere)
whereas \code{makeForkCluster} forks the workers on the current host
(which thus inherit the environment of the current session).
These functions would normally be called \emph{via} \code{makeCluster}.
Both \code{stdout()} and \code{stderr()} of the workers are
redirected, by default being discarded but they can be logged using
the \code{outfile} option. Note that the previous sentence refers to
the \emph{connections} of those names, not the C-level file handles.
Thus properly written \R{} packages using \code{Rprintf} will have
their output redirected, but not direct C-level output.
A default cluster can be registered by a call to
\code{setDefaultCluster()}: this is then used whenever one of the
higher-level functions such as \code{parApply} is called without an
explicit cluster. A little care is needed when repeatedly re-using a
pool of workers, as their workspaces will accumulate objects from past
usage, and packages may get added to the search path.
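
The following sketch puts these points together: worker output is
logged \emph{via} \code{outfile}, a default cluster is registered, and
the workers' workspaces are cleared before the pool is shut down. The
log file location is a hypothetical choice.
<<eval=FALSE>>=
library(parallel)
log_file <- tempfile(fileext = ".log")    # hypothetical log location
cl <- makeCluster(2, outfile = log_file)  # log worker stdout()/stderr()
setDefaultCluster(cl)
roots <- parSapply(X = 1:4, FUN = sqrt)   # no 'cl': the default cluster is used
## remove objects accumulated in the workers' global environments
junk <- clusterEvalQ(cl, rm(list = ls()))
setDefaultCluster(NULL)                   # unregister before stopping
stopCluster(cl)
@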
If clusters are to be created on a host other than the current machine
(\samp{localhost}), \code{makeCluster} may need to be given more
information in the shape of extra arguments.
\begin{itemize}
\item If the worker machines are not set up in exactly the same way
as the master (for example if they are of a different
architecture), use \code{homogeneous = FALSE} and perhaps set
\code{rscript} to the full path to \command{Rscript} on the workers.
\item The worker machines need to know how to communicate with the
master: normally this can be done using the hostname found by
\code{Sys.info()}, but on private networks this need not be the
case and \code{master} may need to be supplied as a name or IP
address, e.g.{} \code{master = "192.168.1.111"}.
\item By default \command{ssh} is used to launch \R{} on the workers.
If it is known by some other name, use e.g.{}
\code{rshcmd = "plink.exe"} for a Windows box using PuTTY.
SSH should be set
up to use silent authentication: setups which require a password
to be supplied may or may not work.
\item Socket communication is done over a randomly chosen port
in the range \code{11000:11999}: site policies
might require some other port to be used, in which case set
argument \code{port} or environment variable \env{R\_PARALLEL\_PORT}.
\end{itemize}
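
A call combining the arguments listed above might look like the
following sketch. The host names, IP address, \command{Rscript} path
and port number are all hypothetical and would need to be replaced by
values appropriate to the site.
<<eval=FALSE>>=
library(parallel)
## Hypothetical host names, address, path and port: adjust to the site
hosts <- c("node1", "node2")
cl <- makeCluster(hosts,
                  master = "192.168.1.111",   # address the workers connect back to
                  homogeneous = FALSE,        # workers are set up differently
                  rscript = "/usr/local/bin/Rscript",
                  rshcmd = "ssh",             # the default launcher
                  port = 11500)
stopCluster(cl)
@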
\section{Forking}
Except on Windows, the package contains a copy of \CRANpkg{multicore}:
there are a few names with the added prefix \code{mc}, e.g.{}
\code{mccollect} and \code{mcparallel}. (Package \CRANpkg{multicore}
used these names, but also the versions without the prefix which are
too easily masked: e.g.{} package \CRANpkg{lattice} used to have a
function \code{parallel}.)
The low-level functions from \CRANpkg{multicore} are provided but not
exported from the namespace.
There are high-level functions \code{mclapply} and \code{pvec}: unlike
the versions in \CRANpkg{multicore} these default to 2 cores, but this can
be controlled by setting \code{options("mc.cores")}, and that takes
its default from environment variable \code{MC\_CORES} when the
package is loaded. (Setting this to \code{1} inhibits parallel
operation: there are stub versions of these functions on Windows which
force \code{mc.cores = 1}.)
Functions \code{mcmapply} and \code{mcMap} provide analogues of
\code{mapply} and \code{Map}.
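
The forking functions can be exercised with a toy example such as the
sketch below (not on Windows, where forking is unavailable). The
inputs are arbitrary and chosen only for illustration.
<<eval=FALSE>>=
library(parallel)
options(mc.cores = 2L)                        # default for the calls below
squares <- mclapply(1:4, function(i) i^2)     # analogue of lapply
roots   <- pvec(1:10, sqrt)                   # vectorized function split over cores
sums    <- mcmapply(function(a, b) a + b, 1:3, 4:6)  # analogue of mapply
## lower level: start a job in a forked child, then collect its result
job <- mcparallel(sum(1:100))
res <- mccollect(job)                         # a list with one element per job
@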
Note the earlier comments about using forking in a GUI environment.
The parent and forked \R{} processes share the per-session temporary
directory \code{tempdir()}, which can be a problem as a lot of code
has assumed it is private to the \R{} process. Further, prior to \R{}
2.14.1 it was possible for \code{tempfile} in two processes to select
the same filename in that temporary directory, and do it sufficiently
simultaneously that neither saw it as being in use.
The forked workers share file handles with the master: this means that
any output from the worker should go to the same place as
\file{stdout} and \file{stderr} of the master process. (This does not
work reliably on all OSes: problems have also been noted when forking
a session that is processing batch input from \file{stdin}.) Setting
argument \code{mc.silent = TRUE} silences \file{stdout} for the child:
\file{stderr} is not affected.
Sharing file handles also impacts graphics devices as forked workers
inherit all open graphics devices of the parent: they should not
attempt to make use of them.
\section{Random-number generation}
Some care is needed with parallel computation using (pseudo-)random
numbers: the processes/threads which run separate parts of the
computation need to run independent (and preferably reproducible)
random-number streams. One way to avoid any difficulties is (where
possible) to do all the randomization in the master process: this is
done where possible in package \CRANpkg{boot} (version 1.3-1 and later).
When an \R{} process is started up it takes the random-number seed
from the object \code{.Random.seed} in a saved workspace or constructs
one from the clock time and process ID when random-number generation
is first used (see the help on \code{RNG}). Thus worker processes
might get the same seed because a workspace containing
\code{.Random.seed} was restored or the random number generator has
been used before forking: otherwise these get a non-reproducible seed
(but with very high probability a different seed for each worker).
The alternative is to set separate seeds for each worker process in
some reproducible way from the seed in the master process. This is
generally plenty safe enough, but there have been worries that the
random-number streams in the workers might somehow get into step. One
approach is to take the seeds a long way apart in the random-number
stream: note that random numbers taken a long (fixed) distance apart
in a single stream are not necessarily (and often are not) as
independent as those taken a short distance apart. Yet another idea
(as used by e.g.{} \pkg{JAGS}) is to use different random-number
generators for each separate run/process.
Package \pkg{parallel} contains an implementation of the ideas of
\citet{lecuyer.2002}: this uses a single RNG and makes \emph{streams}
with seeds $2^{127}$ steps apart in the random number stream (which
has period approximately $2^{191}$). This is based on the generator
of \citet{lecuyer.1999}; the reason for choosing that
generator\footnote{apart from the commonality of authors!} is that it
has a fairly long period with a small seed (6 integers), and unlike
\R{}'s default \code{"Mersenne-Twister"} RNG, it is simple to advance
the seed by a fixed number of steps. The generator is the combination
of two:
\begin{eqnarray*}
x_n &=& 1403580 \times x_{n-2} - 810728 \times x_{n-3} \mod{(2^{32} - 209)}\\
y_n &=& 527612 \times y_{n-1} - 1370589 \times y_{n-3} \mod{(2^{32} - 22853)}\\
z_n &=& (x_n - y_n) \mod{4294967087}\\
u_n &=& z_n/4294967088\ \mbox{unless $z_n = 0$}
\end{eqnarray*}
The `seed' then consists of $(x_{n-3}, x_{n-2}, x_{n-1}, y_{n-3}, y_{n-2}, y_{n-1})$,
and the recursion for each of $x_n$ and $y_n$ can have
pre-computed coefficients for $k$ steps ahead. For $k = 2^{127}$, the
seed is advanced by $k$ steps by \R{} call
\code{.Random.seed <- nextRNGStream(.Random.seed)}.
% use \verb to get consistent quote mapping.
The \citet{lecuyer.1999} generator is available \emph{via}
\code{RNGkind("L'Ecuyer-CMRG")}. Thus using the ideas of
\citet{lecuyer.2002} is as simple as
<<Ecuyer-ex, eval=FALSE>>=
RNGkind("L'Ecuyer-CMRG")
set.seed(2002) # something
M <- 16 ## start M workers
s <- .Random.seed
for (i in 1:M) {
    s <- nextRNGStream(s)
    # send s to worker i as .Random.seed
}
@
and this is implemented for SNOW clusters in function
\code{clusterSetRNGStream}, and as part of \code{mcparallel} and
\code{mclapply} (by default).
Apart from \emph{streams} ($2^{127}$ steps apart), there is the concept of
\emph{sub-streams} starting from seeds $2^{76}$ steps apart. Function
\code{nextRNGSubStream} advances to the next substream.
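
The sketch below illustrates both functions and checks that resetting
the streams on a cluster with the same seed reproduces the results.
The seed values and the pool of two workers are arbitrary choices for
the example.
<<eval=FALSE>>=
library(parallel)
RNGkind("L'Ecuyer-CMRG")
set.seed(2002)
s1  <- .Random.seed
s2  <- nextRNGStream(s1)      # start of the next stream
s1a <- nextRNGSubStream(s1)   # start of the next sub-stream of the first stream

cl <- makeCluster(2)
clusterSetRNGStream(cl, iseed = 123)
a <- parSapply(cl, 1:4, function(i) runif(1))
clusterSetRNGStream(cl, iseed = 123)   # reset the workers' streams
b <- parSapply(cl, 1:4, function(i) runif(1))
stopCluster(cl)
identical(a, b)
@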
A direct \R{} interface to the (clunkier) original C implementation is
available in CRAN package \CRANpkg{rlecuyer} \citep{rlecuyer}. That works
with named streams, each of which has three 6-element seeds
associated with them. This can easily be emulated in \R{} by storing
\code{.Random.seed} at suitable times. There is another interface
using S4 classes in package \CRANpkg{rstream} \citep{rstream}.
\section{Load balancing}
The introduction mentioned a different strategy which dynamically
allocates tasks to workers: this is sometimes known as `load
balancing' and is implemented in \code{mclapply(mc.preschedule =
FALSE)}, \code{clusterApplyLB} and wrappers.
Load balancing is potentially advantageous when the tasks take quite
dissimilar amounts of computation time, or where the nodes are of
disparate capabilities. But some \emph{caveats} are in order:
\begin{enumerate}[(a)]
\item Random number streams are allocated to nodes, so if the tasks
involve random numbers they are likely to be non-repeatable (as the
allocation of tasks to nodes depends on the workloads of the nodes).
It would however take only slightly more work to allocate a stream to
each task.
\item More care is needed in allocating the tasks. If 1000 tasks need
to be allocated to 10 nodes, the standard approach sends chunks of
100 tasks to each of the nodes. The load-balancing approach sends
tasks one at a time to a node, and the communication overhead may be
high. So it makes sense to have substantially more tasks than
nodes, but not by a factor of 100 (and maybe not by 10).
\end{enumerate}
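
Caveat (a) notes that a stream could instead be allocated to each
task. A minimal sketch of that idea follows: the master creates one
stream per task and each task installs its own seed before drawing
random numbers. The function \code{task}, the number of tasks and the
seed are hypothetical, and this is not what \code{clusterApplyLB}
does by itself.
<<eval=FALSE>>=
library(parallel)
RNGkind("L'Ecuyer-CMRG")
set.seed(42)
n_tasks <- 8
seeds <- vector("list", n_tasks)
s <- .Random.seed
for (i in seq_len(n_tasks)) {
    s <- nextRNGStream(s)       # one stream per task, created on the master
    seeds[[i]] <- s
}
task <- function(seed) {
    ## install the task's own stream on whichever worker runs it
    assign(".Random.seed", seed, envir = globalenv())
    mean(rnorm(100))
}
cl <- makeCluster(2)
r1 <- clusterApplyLB(cl, seeds, task)
r2 <- clusterApplyLB(cl, seeds, task)  # allocation of tasks to nodes may differ
stopCluster(cl)
identical(r1, r2)
@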
\section{Setting the CPU Affinity with \code{mclapply}}
%%\author{Helena Kotthaus, Andreas Lang \\ LS12 Department of Computer Science, TU Dortmund}
The parameter \code{affinity.list} of the \code{mclapply} function
can be used to run elements of the input vector \code{X} of \code{mclapply} on
specific CPUs.
\code{affinity.list} is a vector (atomic or list) containing the CPU
affinity mask for each element of \code{X}: it describes on which CPU
(core or hyper-thread unit) a given item is allowed to run, see
\code{? mcaffinity}.
This can be helpful if the elements of \code{X} (parallel jobs) have a
high variance of completion time or if the hardware architecture
is heterogeneous. It also enables the development of scheduling
strategies for optimizing the overall runtime of independent parallel jobs.
If \code{affinity.list} is set, the \code{mc.cores} parameter is
replaced with the number of CPU ids used in the affinity masks.
To use this parameter prescheduling has to be deactivated
(\code{mc.preschedule = FALSE}).
For each value of the input vector \code{X} a separate job is forked.
The master process only forks one child process for each selected CPU
at once, to ensure that the number of forked jobs does not exceed the number
of selected CPUs. As soon as a child process has finished, the next
available job for the CPU is forked.
The following code example demonstrates how the execution
time can be reduced by assigning the elements of \code{X} to specific CPUs.
(It cannot be run on Windows, where \code{mc.cores} must remain at 1.)
%% setting eval=TRUE would cost ca. 1.5 minutes every time this is built
%% also, .Platform$OS.type == "windows" cannot use mc.cores = 2
<<affinity-ex, eval=FALSE>>=
## Exemplary variance filter executed on three different matrices in parallel.
## Can be used in gene expression analysis as a prefilter
## for the number of covariates.
library(parallel)
n <- 300 # observations
p <- 20000 # covariates
## Different sized matrices as filter inputs
## Matrix A and B form smaller work loads
## while matrix C forms a bigger workload (2*p)
library(stats)
A <- matrix(replicate( p, rnorm(n, sd = runif(1, 0.1, 10))), n, p)
B <- matrix(replicate( p, rnorm(n, sd = runif(1, 0.1, 10))), n, p)
C <- matrix(replicate(2*p, rnorm(n, sd = runif(1, 0.1, 10))), n, 2*p)
varFilter <- function (X, nSim = 20) {
  for (i in 1:nSim) {
    train <- sample(nrow(X), 2 / 3 * nrow(X))
    colVars <- apply(X[train, ], 2, var)
    ## indices of the 100 most variable columns (X has no column names)
    keep <- head(order(colVars, decreasing = TRUE), 100)
    # myAlgorithm(X[, keep])
  }
}
## Runtime comparison -----------------------------------
## mclapply with affinity.list
## CPU mapping: A and B run on CPU 1 while C runs on CPU 2:
affinity <- c(1,1,2)
system.time(
  mclapply(X = list(A,B,C), FUN = varFilter,
           mc.preschedule = FALSE, affinity.list = affinity))
##   user  system elapsed
## 34.909   0.873  36.720
## mclapply without affinity.list
system.time(
  mclapply(X = list(A,B,C), FUN = varFilter, mc.cores = 2,
           mc.preschedule = FALSE) )
##   user  system elapsed
## 72.893   1.588  55.982
## mclapply with prescheduling
system.time(
  mclapply(X = list(A,B,C), FUN = varFilter, mc.cores = 2,
           mc.preschedule = TRUE) )
##   user  system elapsed
## 53.455   1.326  53.399
@
Instead of using \code{affinity.list} for runtime optimization,
it can also be used to simply restrict the computation
to specific CPUs as in the following example.
<<eval = FALSE>>=
## Restricts all elements of X to run on CPU 1 and 2.
X <- list(1, 2, 3)
affinity.list <- list(c(1,2), c(1,2), c(1,2))
mclapply(X = X, FUN = function (i) i*i,
         mc.preschedule = FALSE, affinity.list = affinity.list)
@
\section{Portability considerations}
People wanting to provide parallel facilities in their code need to
decide how hard they want to try to be portable and efficient: no
approach works optimally on all platforms.
Using \code{mclapply} is usually the simplest approach, but will run
serial versions of the code on Windows. This may suffice where
parallel computation is only required for use on a single multi-core
Unix-alike server---for \code{mclapply} can only run on a single
shared-memory system. There is fallback to serial use when needed, by
setting \code{mc.cores = 1}.
Using \code{parLapply} will work everywhere that socket communication
works, and can be used, for example, to harness all the CPU cores in a
lab of machines that are not otherwise in use. But socket
communication may be blocked even when using a single machine and is
quite likely to be blocked between machines in a lab. There is not
currently any fallback to serial use, nor could there easily be (as
the workers start with a different \R{} environment from the one
current on the master).
An example of providing access to both approaches as well as serial
code is package \CRANpkg{boot}, version \code{1.3-3} or later.
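
One way to offer both approaches together with a serial fallback is
sketched below. The wrapper \code{portable\_lapply} and its argument
\code{cores} are hypothetical names, not part of package
\pkg{parallel}. On the socket-cluster branch \code{FUN} must not rely
on objects in the master's workspace, for the reason given above.
<<eval=FALSE>>=
library(parallel)
## Hypothetical wrapper: serial, forked or socket-based lapply
portable_lapply <- function(X, FUN, ..., cores = 2L) {
    if (cores <= 1L)
        return(lapply(X, FUN, ...))                  # serial fallback
    if (.Platform$OS.type != "windows")
        return(mclapply(X, FUN, ..., mc.cores = cores))  # forking
    cl <- makeCluster(cores)                         # socket cluster on Windows
    on.exit(stopCluster(cl))
    parLapply(cl, X, FUN, ...)
}
doubled <- portable_lapply(1:3, function(i) i * 2, cores = 1L)
@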
\section{Extended examples}
\SweaveOpts{eval=FALSE}
<<hide=TRUE>>=
library(parallel)
@
Probably the most common use of coarse-grained parallelization in
statistics is to do multiple simulation runs, for example to do large
numbers of bootstrap replicates or several runs of an MCMC simulation.
We show an example of each.
Note that some of the examples will only work serially on Windows and
some actually are computer-intensive.
\subsection{Bootstrapping}
Package \CRANpkg{boot} \citep{boot} is support software for the monograph
by \citet{Davison.Hinkley.97}. Bootstrapping is often used as an
example of easy parallelization, and some methods of producing
confidence intervals require many thousands of bootstrap samples.
As from version \code{1.3-1} the package itself has parallel support
within its main functions, but we illustrate how to use the original
(serial) functions in parallel computations.
We consider two examples using the \code{cd4} dataset from package
\CRANpkg{boot} where the interest is in the correlation between before and
after measurements. The first is a straight simulation, often called
a \emph{parametric bootstrap}. The non-parallel form is
<<>>=
library(boot)
cd4.rg <- function(data, mle) MASS::mvrnorm(nrow(data), mle$m, mle$v)
cd4.mle <- list(m = colMeans(cd4), v = var(cd4))
cd4.boot <- boot(cd4, corr, R = 999, sim = "parametric",
                 ran.gen = cd4.rg, mle = cd4.mle)
boot.ci(cd4.boot, type = c("norm", "basic", "perc"),
        conf = 0.9, h = atanh, hinv = tanh)
@
To do this with \code{mclapply} we need to break this into separate
runs, and we will illustrate two runs of 500 simulations each:
<<>>=
cd4.rg <- function(data, mle) MASS::mvrnorm(nrow(data), mle$m, mle$v)
cd4.mle <- list(m = colMeans(cd4), v = var(cd4))
run1 <- function(...) boot(cd4, corr, R = 500, sim = "parametric",
                           ran.gen = cd4.rg, mle = cd4.mle)
mc <- 2 # set as appropriate for your hardware
## To make this reproducible:
set.seed(123, "L'Ecuyer-CMRG")
cd4.boot <- do.call(c, mclapply(seq_len(mc), run1, mc.cores = mc))
boot.ci(cd4.boot, type = c("norm", "basic", "perc"),
        conf = 0.9, h = atanh, hinv = tanh)
@
There are many ways to program things like this: often the neatest is
to encapsulate the computation in a function, so this is the parallel
form of
<<eval=FALSE>>=
do.call(c, lapply(seq_len(mc), run1))
@
To run this with \code{parLapply} we could take a similar approach by
<<>>=
run1 <- function(...) {
    library(boot)
    cd4.rg <- function(data, mle) MASS::mvrnorm(nrow(data), mle$m, mle$v)
    cd4.mle <- list(m = colMeans(cd4), v = var(cd4))
    boot(cd4, corr, R = 500, sim = "parametric",
         ran.gen = cd4.rg, mle = cd4.mle)
}
cl <- makeCluster(mc)
## make this reproducible
clusterSetRNGStream(cl, 123)
library(boot) # needed for c() method on master
cd4.boot <- do.call(c, parLapply(cl, seq_len(mc), run1) )
boot.ci(cd4.boot, type = c("norm", "basic", "perc"),
        conf = 0.9, h = atanh, hinv = tanh)
stopCluster(cl)
@
Note that whereas with \code{mclapply} all the packages and objects
we use are automatically available on the workers, this is not in
general\footnote{it is with clusters set up with
\code{makeForkCluster}.} the case with the \code{parLapply}
approach. There is often a delicate choice of where to do the
computations: for example we could compute \code{cd4.mle} on the
workers (as above) or on the master and send the value to the
workers. We illustrate the latter by the following code
<<>>=
cl <- makeCluster(mc)
cd4.rg <- function(data, mle) MASS::mvrnorm(nrow(data), mle$m, mle$v)
cd4.mle <- list(m = colMeans(cd4), v = var(cd4))
clusterExport(cl, c("cd4.rg", "cd4.mle"))
junk <- clusterEvalQ(cl, library(boot)) # discard result
clusterSetRNGStream(cl, 123)
res <- clusterEvalQ(cl, boot(cd4, corr, R = 500,
                    sim = "parametric", ran.gen = cd4.rg, mle = cd4.mle))
library(boot) # needed for c() method on master
cd4.boot <- do.call(c, res)
boot.ci(cd4.boot, type = c("norm", "basic", "perc"),
        conf = 0.9, h = atanh, hinv = tanh)
stopCluster(cl)
@
Running the double bootstrap on the same problem is far more
computer-intensive. The standard version is
<<fig=TRUE>>=
R <- 999; M <- 999 ## we would like at least 999 each
cd4.nest <- boot(cd4, nested.corr, R=R, stype="w", t0=corr(cd4), M=M)
## nested.corr is a function in package boot
op <- par(pty = "s", xaxs = "i", yaxs = "i")
qqplot((1:R)/(R+1), cd4.nest$t[, 2], pch = ".", asp = 1,
       xlab = "nominal", ylab = "estimated")
abline(a = 0, b = 1, col = "grey")
abline(h = 0.05, col = "grey")
abline(h = 0.95, col = "grey")
par(op)
nominal <- (1:R)/(R+1)
actual <- cd4.nest$t[, 2]
100*nominal[c(sum(actual <= 0.05), sum(actual < 0.95))]
@
which took about 55 secs on one core of an 8-core Linux server.
Using \code{mclapply} we could use
<<eval=FALSE>>=
mc <- 9
R <- 999; M <- 999; RR <- floor(R/mc)
run2 <- function(...)
    boot(cd4, nested.corr, R=RR, stype="w", t0=corr(cd4), M=M)
cd4.nest <- do.call(c, mclapply(seq_len(mc), run2, mc.cores = mc) )
nominal <- (1:R)/(R+1)
actual <- cd4.nest$t[, 2]
100*nominal[c(sum(actual <= 0.05), sum(actual < 0.95))]
@
which ran in 11 secs (elapsed) using all of that server.
\subsection{MCMC runs}
\citet{Ripley.88} discusses the maximum-likelihood estimation of the
Strauss process, which is done by solving a moment equation
\[
E_c T = t
\]
where $T$ is the number of $R$-close pairs and $t$ is the observed
value, $30$ in the following example. A serial approach to the
initial exploration might be
<<>>=
library(spatial)
towns <- ppinit("towns.dat")
tget <- function(x, r=3.5) sum(dist(cbind(x$x, x$y)) < r)
t0 <- tget(towns)
R <- 1000
c <- seq(0, 1, 0.1)
## res[1] = 0
res <- c(0, sapply(c[-1], function(c)
                   mean(replicate(R, tget(Strauss(69, c=c, r=3.5))))))
plot(c, res, type="l", ylab="E t")
abline(h=t0, col="grey")
@
which takes about 20 seconds today, but took many hours when first done
in 1985. A parallel version might be
<<>>=
run3 <- function(c) {
    library(spatial)
    towns <- ppinit("towns.dat") # has side effects
    mean(replicate(R, tget(Strauss(69, c=c, r=3.5))))
}
cl <- makeCluster(10, methods = FALSE)
clusterExport(cl, c("R", "towns", "tget"))
res <- c(0, parSapply(cl, c[-1], run3)) # 10 tasks
stopCluster(cl)
@
which took about 4.5 secs, plus 2 secs to set up the cluster. Using a
fork cluster (not on Windows) makes the startup much faster and setup easier:
<<eval=FALSE>>=
cl <- makeForkCluster(10) # fork after the variables have been set up
run4 <- function(c) mean(replicate(R, tget(Strauss(69, c=c, r=3.5))))
res <- c(0, parSapply(cl, c[-1], run4))
stopCluster(cl)
@
As one might expect, the \code{mclapply} version is slightly simpler:
<<eval=FALSE>>=
run4 <- function(c) mean(replicate(R, tget(Strauss(69, c=c, r=3.5))))
res <- c(0, unlist(mclapply(c[-1], run4, mc.cores = 10)))
@
If you do not have as many as 10 cores, you might want to consider
load-balancing in a task like this as the time taken per simulation
does vary with \code{c}. This can be done using
\code{mclapply(mc.preschedule = FALSE)} or \code{parSapplyLB}. The
disadvantage is that the results would not be reproducible (which does
not matter here).
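
As a minimal sketch of the two load-balancing calls, the following
uses a hypothetical stand-in task \code{slow} (not part of the example
above) whose run time grows with its argument. The choice of 4 workers
is likewise only an assumption for illustration.
<<eval=FALSE>>=
library(parallel)
## hypothetical stand-in for a task whose run time varies with its argument
slow <- function(c) { Sys.sleep(c / 10); c^2 }
cs <- seq(0.1, 1, 0.1)
## forking (not on Windows): without prescheduling, a new process is
## forked for each task
res1 <- unlist(mclapply(cs, slow, mc.cores = 4, mc.preschedule = FALSE))
## cluster: load-balancing version of parSapply
cl <- makeCluster(4)
res2 <- parSapplyLB(cl, cs, slow)
stopCluster(cl)
@
Both calls return the results in the order of \code{cs}; only the
assignment of tasks to workers varies from run to run.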
\subsection{Package installation}
With over 4000 \R{} packages available, it is often helpful to do a
comprehensive install in parallel. We provide facilities to do so in
\code{install.packages} \emph{via} a parallel \command{make}, but that
approach may not be suitable.\footnote{A parallel \command{make} might
not be available, and we have seen a couple of instances of package
installation not working correctly when run from \command{make}.}
Some of the tasks take many times longer than others, and we do not
know in advance which these are.
We illustrate an approach using package \pkg{parallel} which is used
on part of the CRAN check farm. Suppose that there is a function
\code{do\_one(pkg)} which installs a single package and then returns.
Then the task is to run \code{do\_one} on as many of the \code{M}
workers as possible whilst ensuring that all of the direct and
indirect dependencies of \code{pkg} are installed before \code{pkg}
itself. As the installation of a single package can block several
others, we do need to allow the number of installs running
simultaneously to vary: the following code achieves that, but needs to
use low-level functions to do so.
<<eval=FALSE>>=
pkgs <- "<names of packages to be installed>"
M <- 20 # number of parallel installs
M <- min(M, length(pkgs))
library(parallel)
unlink("install_log")
cl <- makeCluster(M, outfile = "install_log")
clusterExport(cl, c("tars", "fakes", "gcc")) # variables needed by do_one
## set up available via a call to available.packages() for
## repositories containing all the packages involved and all their
## dependencies.
DL <- utils:::.make_dependency_list(pkgs, available, recursive = TRUE)
DL <- lapply(DL, function(x) x[x %in% pkgs])
lens <- sapply(DL, length)
ready <- names(DL[lens == 0L])
done <- character() # packages already installed
n <- length(ready)
submit <- function(node, pkg)
    parallel:::sendCall(cl[[node]], do_one, list(pkg), tag = pkg)
for (i in 1:min(n, M)) submit(i, ready[i])
DL <- DL[!names(DL) %in% ready[1:min(n, M)]]
av <- if(n < M) (n+1L):M else integer() # available workers
while(length(done) < length(pkgs)) {
    d <- parallel:::recvOneResult(cl)
    av <- c(av, d$node)
    done <- c(done, d$tag)
    OK <- unlist(lapply(DL, function(x) all(x %in% done) ))
    if (!any(OK)) next
    p <- names(DL)[OK]
    m <- min(length(p), length(av)) # >= 1
    for (i in 1:m) submit(av[i], p[i])
    av <- av[-(1:m)]
    DL <- DL[!names(DL) %in% p[1:m]]
}
@
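
The readiness test used in the loop (a package may be submitted once
all of its dependencies are in \code{done}) can be illustrated
serially on a toy dependency list. The package names \code{A} to
\code{D} are hypothetical and no worker processes are involved: this
is only a sketch of the ordering logic, not of the scheduler itself.
<<eval=FALSE>>=
## toy dependency list (hypothetical names): each element lists the
## packages that must be installed before the named one
DL <- list(A = character(), B = "A", C = c("A", "B"), D = character())
done <- character()       # packages 'installed' so far, in order
while (length(DL)) {
    ## a package is ready when all its dependencies are done
    OK <- vapply(DL, function(x) all(x %in% done), NA)
    if (!any(OK)) stop("circular dependencies")
    done <- c(done, names(DL)[OK])
    DL <- DL[!OK]
}
done
@
Here \code{A} and \code{D} are ready immediately, \code{B} becomes
ready once \code{A} is done, and \code{C} only after \code{B}.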
\subsection{Passing \code{...}}
The semantics of \code{...} do not fit well with parallel operation,
since lazy evaluation may be delayed until the tasks have been sent to
the workers. This is no problem in the forking approach, as the
information needed for lazy evaluation will be present in the forked workers.
For \CRANpkg{snow}-like clusters the trick is to ensure that \code{...}
has any promises forced whilst the information is still available.
This is how \CRANpkg{boot} does it:
<<eval=FALSE>>=
fn <- function(r) statistic(data, i[r, ], ...)
RR <- sum(R)
res <- if (ncpus > 1L && (have_mc || have_snow)) {
    if (have_mc) {
        parallel::mclapply(seq_len(RR), fn, mc.cores = ncpus)
    } else if (have_snow) {
        list(...) # evaluate any promises
        if (is.null(cl)) {
            cl <- parallel::makePSOCKcluster(rep("localhost", ncpus))
            if(RNGkind()[1L] == "L'Ecuyer-CMRG")
                parallel::clusterSetRNGStream(cl)
            res <- parallel::parLapply(cl, seq_len(RR), fn)
            parallel::stopCluster(cl)
            res
        } else parallel::parLapply(cl, seq_len(RR), fn)
    }
} else lapply(seq_len(RR), fn)
@
Note that \code{...} is an argument to \code{boot}, and so after
<<eval=FALSE>>=
list(...) # evaluate any promises
@
it refers to objects within the evaluation frame of \code{boot}, and
hence within the environment of \code{fn}, which will therefore be
sent to the workers along with \code{fn}.
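
The same idea can be tried in isolation. The following is a minimal
sketch, not code from \CRANpkg{boot}: the wrapper \code{par\_apply},
its arguments and the variable \code{p} are all hypothetical names
introduced for illustration.
<<eval=FALSE>>=
library(parallel)
## hypothetical wrapper: applies stat(x[i], ...) on the workers of 'cl'
par_apply <- function(cl, x, stat, ...) {
    force(x); force(stat)   # evaluate the named arguments
    list(...)               # evaluate any promises in '...'
    fn <- function(i) stat(x[i], ...)
    ## the environment of fn (this frame) is sent along with fn
    parLapply(cl, seq_along(x), fn)
}
cl <- makeCluster(2)
p <- 2                      # defined on the master only
res <- par_apply(cl, 1:3, function(v, power) v^power, power = p)
stopCluster(cl)
unlist(res)
@
Because \code{power = p} is evaluated on the master before \code{fn}
is sent, the workers never need to look up \code{p} themselves.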
\section{Differences from earlier versions}
The support of parallel RNG differs from that in \CRANpkg{snow};
\CRANpkg{multicore} had no such support.
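
As a brief reminder of what this support looks like for a cluster, the
sketch below resets the workers' random-number streams with
\code{clusterSetRNGStream} and checks that two runs agree. The seed
\code{123} and the use of 2 workers are arbitrary choices made for
this illustration.
<<eval=FALSE>>=
library(parallel)
cl <- makeCluster(2)
clusterSetRNGStream(cl, iseed = 123)
r1 <- parSapply(cl, 1:4, function(i) runif(1))
clusterSetRNGStream(cl, iseed = 123)   # reset the streams
r2 <- parSapply(cl, 1:4, function(i) runif(1))
stopCluster(cl)
identical(r1, r2)
@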
\subsection{Differences from multicore}
\CRANpkg{multicore} had quite elaborate code to inhibit the Aqua event
loop for \command{R.app} and the event loop for the \code{quartz}
graphics device. This has been replaced by recording a flag in the
\R{} executable for a child process.

Functions \code{fork} and \code{kill} have \code{mc} prefixes and are
not exported. This avoids confusion with other packages (such as
package \CRANpkg{fork}). Note that \code{mckill} is not as general as
\code{tools::pskill}.

Aliased functions \code{collect} and \code{parallel} are no longer provided.
\subsection{Differences from snow}
\CRANpkg{snow} set a timeout that exceeded the maximum value required by
POSIX, and did not provide a means to set the timeout on the workers.
This resulted in process interlocks on Solaris.

\code{makeCluster} creates MPI %% or NWS
clusters by calling \CRANpkg{snow}.

\code{makePSOCKcluster} has been streamlined, as package \pkg{parallel} is in
a known location on all systems, and \command{Rscript} is nowadays
always available. Logging of workers is set up to append to the file,
so multiple processes can be logged.

\code{parSapply} has been brought into line with \code{sapply}.

\code{clusterMap()} gains \code{SIMPLIFY} and \code{USE.NAMES}
arguments to make it a parallel version of \code{mapply} and \code{Map}.
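
A small sketch of the correspondence, using an arbitrary 2-worker
cluster and a trivial function chosen only for illustration:
<<eval=FALSE>>=
library(parallel)
cl <- makeCluster(2)
add <- function(x, y) x + y
## default SIMPLIFY = FALSE: a list, as from Map
res_list <- clusterMap(cl, add, 1:3, 4:6)
## SIMPLIFY = TRUE: simplified to a vector, as from mapply
res_vec <- clusterMap(cl, add, 1:3, 4:6, SIMPLIFY = TRUE)
stopCluster(cl)
@
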
The timing interface has not been copied.
\bibliographystyle{jss}
\bibliography{parallel}
\end{document}
%%% Local Variables:
%%% mode: latex
%%% TeX-master: t
%%% End: