% File src/library/parallel/vignettes/parallel.Rnw
% Part of the R package, https://www.R-project.org
% Copyright 2011-2017 R Core Team
% Distributed under GPL 2 or later

\documentclass[a4paper]{article}

\usepackage{Rd, parskip, amsmath, enumerate}
\usepackage[round]{natbib}
\usepackage{hyperref}
\usepackage{color}
\definecolor{Blue}{rgb}{0,0,0.8}
\hypersetup{%
  colorlinks,%
  plainpages=true,%
  linkcolor=black,%
  citecolor=black,%
  urlcolor=Blue,%
  pdfstartview={XYZ null null 1},% 100%
  pdfview={XYZ null null null},%
  pdfpagemode=UseNone,% for no outline
  pdfauthor={R Core Team},%
  pdftitle={Package `parallel'}% Could also have pdfusetitle
}

%\VignetteIndexEntry{Package 'parallel'}
%\VignettePackage{parallel}

\newcommand{\I}[1]{#1}

\title{Package `parallel'}
\author{R Core Team}

\begin{document}

\maketitle

\section{Introduction}

Package \pkg{parallel} was first included in \R{} 2.14.0. It builds
on the work done for CRAN packages \CRANpkg{multicore} \citep{multicore}
and \CRANpkg{snow} \citep{snow} and provides drop-in replacements for most
of the functionality of those packages, with integrated handling of
random-number generation.

Parallelism can be done in computation at many different levels: this
package is principally concerned with `coarse-grained
parallelization'. At the lowest level, modern CPUs can do several
basic operations simultaneously (e.g.{} integer and floating-point
arithmetic), and several implementations of external \abbr{BLAS} libraries
use multiple threads to do parts of basic vector/matrix operations in
parallel. Several contributed \R{} packages use multiple threads at C
level \emph{via} \abbr{OpenMP} or \I{pthreads}.

This package handles running much larger chunks of computations in
parallel. A typical example is to evaluate the same \R{} function on
many different sets of data: often simulated data as in bootstrap
computations (or with `data' being the random-number stream). The
crucial point is that these chunks of computation are unrelated and do
not need to communicate in any way. It is often the case that the
chunks take approximately the same length of time. The basic
computational model is
\begin{enumerate}[(a)]
\item Start up $M$ `worker' processes, and do any initialization
  needed on the workers.
\item Send any data required for each task to the workers.
\item Split the task into $M$ roughly equally-sized chunks, and send
  the chunks (including the \R{} code needed) to the workers.
\item Wait for all the workers to complete their tasks, and ask them
  for their results.
\item Repeat steps (b--d) for any further tasks.
\item Shut down the worker processes.
\end{enumerate}
Amongst the initializations which may be needed are to load packages
and initialize the random-number stream.

There are implementations of this model in the functions
\code{mclapply} and \code{parLapply} as near-drop-in replacements for
\code{lapply}.
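
For example, a serial call \code{lapply(X, fun)} might be converted as
follows (a minimal sketch: \code{X} and \code{fun} are placeholders,
and the worker counts should be chosen to suit the hardware):
<<intro-ex, eval=FALSE>>=
## fork-based (not on Windows): workers created just for this call
res <- mclapply(X, fun, mc.cores = 4)
## socket-based: a persistent pool of workers
cl <- makeCluster(4)
res <- parLapply(cl, X, fun)
stopCluster(cl)
@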

A slightly different model is to split the task into $M_1 > M$ chunks,
send the first $M$ chunks to the workers, then repeatedly wait for any
worker to complete and send it the next remaining task: see the
section on `load balancing'.

In principle the workers could be implemented by threads\footnote{only
  `in principle' since the \R{} interpreter is not thread-safe.} or
lightweight processes, but in the current implementation they are full
processes. They can be created in one of three ways:
\begin{enumerate}
\item \emph{Via} \code{system("Rscript")} or similar to launch a new
  process on the current machine or a similar machine with an identical
  \R{} installation. This then needs a way to communicate between
  master and worker processes, which is usually done \emph{via}
  sockets.

  This should be available on all \R{} platforms, although it is
  conceivable that zealous security measures could block the
  inter-process communication \emph{via} sockets. Users of Windows
  and \abbr{macOS} may expect pop-up dialog boxes from the firewall asking
  if an \R{} process should accept incoming connections.

  Following \CRANpkg{snow}, a pool of worker processes listening
  \emph{via} sockets for commands from the master is called a
  `cluster' of nodes.

\item \emph{Via} forking. \emph{Fork} is a
  concept\footnote{\url{https://en.wikipedia.org/wiki/Fork_(operating_system)}}
  from POSIX operating systems, and should be available on all \R{}
  platforms except Windows. This creates a new \R{} process by taking
  a complete copy of the master process, including the workspace and
  state of the random-number stream. However, the copy will (in any
  reasonable OS) share memory pages with the master until modified so
  forking is very fast.

  The use of forking was pioneered by package \CRANpkg{multicore}.

  Note that as it does share the complete process, it also shares any
  GUI elements, for example an \R{} console and on-screen devices.
  This can cause havoc.\footnote{Some precautions are taken on \abbr{macOS}:
    for example the event loops for \command{R.app} and the
    \code{quartz} device are inhibited in the child. This information
    is available at C level in the \code{Rboolean} variable
    \code{R\_isForkedChild}.}

  There needs to be a way to communicate between master and worker.
  Once again there are several possibilities since master and workers
  share memory. In \CRANpkg{multicore} the initial fork sends an \R{}
  expression to be evaluated to the worker, and the master process
  opens a pipe for reading that is used by the worker to return the
  results. Both that and creating a cluster of nodes communicating
  \emph{via} sockets are supported in package \pkg{parallel}.

\item Using OS-level facilities to set up a means to send tasks to
  other members of a group of machines. There are several ways to do
  that, and for example package \CRANpkg{snow} can make use of MPI
  (`message passing interface') using \R{} package \CRANpkg{Rmpi}.
  Communication overheads can dominate computation times in this
  approach, so it is most often used on tightly-coupled networks of
  computers with high-speed interconnects.

  CRAN packages following this approach include \CRANpkg{GridR} (using
  \I{Condor} or \I{Globus}) and \CRANpkg{Rsge} (using \I{SGE}, currently
  called `Oracle Grid Engine').

  This approach will not be considered further in this vignette, but
  those parts of \pkg{parallel} which provide \CRANpkg{snow}-like
  functions will accept \CRANpkg{snow} clusters including MPI clusters.
\end{enumerate}

The landscape of parallel computing has changed with the advent of
shared-memory computers with multiple (and often many) CPU cores.
Until the late 2000's parallel computing was mainly done on clusters
of large numbers of single- or dual-CPU computers: nowadays even
laptops have two or four cores, and servers with 8, 32 or more cores
are commonplace. It is such hardware that package \pkg{parallel} is
designed to exploit. It can also be used with several computers
running the same version of \R{} connected by (reasonable-speed)
ethernet: the computers need not be running the same OS.

Note that all these methods of communication use
\code{serialize}/\code{unserialize} to send \R{} objects between
processes. This has limits (typically hundreds of millions of
elements) which a well-designed parallelized algorithm should not
approach.

\section{Numbers of CPUs/cores}

In setting up parallel computations it can be helpful to have some
idea of the number of CPUs or cores available, but this is a rather
slippery concept. Nowadays almost all physical CPUs contain two or
more cores that run more-or-less independently (they may share parts
of the cache memory, and they do share access to RAM). However, on
some processors these cores may themselves be able to run multiple
tasks simultaneously, and some OSes (e.g.{} Windows) have the concept
of \emph{logical} CPUs which may exceed the number of cores.

Note that all a program can possibly determine is the total number of
CPUs and/or cores available. This is not necessarily the same as the
number of CPUs available \emph{to the current user} which may well be
restricted by system policies on multi-user systems. Nor does it give
much idea of a reasonable number of CPUs to use for the current task:
the user may be running many \R{} processes simultaneously, and those
processes may themselves be using multiple threads through a
multi-threaded \abbr{BLAS}, compiled code using \abbr{OpenMP} or other low-level
forms of parallelism. We have even seen instances of
\CRANpkg{multicore}'s \code{mclapply} being called
recursively,\footnote{\code{parallel::mclapply} detects this and runs
  nested calls serially.} generating $2n + n^2$ processes on a machine
estimated to have $n = 16$ cores.

But in so far as it is a useful guideline, function
\code{detectCores()} tries to determine the number of CPU cores in the
machine on which \R{} is running: it has ways to do so on all known
current \R{} platforms. What exactly it measures is OS-specific: we
try where possible to report the number of logical cores available.

On Windows the default is to report the number of logical CPUs. On
modern hardware (e.g.{} Intel \emph{Core i7}) the latter may not be
unreasonable as hyper-threading does give a significant extra
throughput. What \code{detectCores(logical = FALSE)} reports is
OS-version-dependent: on recent versions of Windows it reports the
number of physical cores but on older versions it might report the
number of physical CPU packages.
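
For instance (what is reported is machine- and OS-specific, as
described above; the values in the comments are purely illustrative):
<<detectCores-ex, eval=FALSE>>=
detectCores()                 # logical CPUs: e.g. 8 on a 4-core
                              # machine with hyper-threading
detectCores(logical = FALSE)  # physical cores, where knowable
@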

\section{Analogues of apply functions}

By far the most common direct applications of packages \CRANpkg{multicore}
and \CRANpkg{snow} have been to provide parallelized replacements of
\code{lapply}, \code{sapply}, \code{apply} and related functions.

As analogues of \code{lapply} there are
\begin{verbatim}
  parLapply(cl, x, FUN, ...)
  mclapply(X, FUN, ..., mc.cores)
\end{verbatim}
where \code{mclapply} is not available\footnote{except as a stub
  which simply calls \code{lapply}.} on Windows and has further
arguments discussed on its help page. They differ slightly in
philosophy: \code{mclapply} sets up a pool of \code{mc.cores} workers
just for this computation, whereas \code{parLapply} uses a less
ephemeral pool specified by the object \code{cl} created by a call to
\code{makeCluster} (which \emph{\I{inter alia}} specifies the size of the
pool). So the workflow is
\begin{verbatim}
  cl <- makeCluster(<size of pool>)
  # one or more parLapply calls
  stopCluster(cl)
\end{verbatim}
% we could arrange to stop a cluster when cl is gc-ed

For matrices there are the rarely used \code{parApply} and
\code{parCapply} functions, and the more commonly used
\code{parRapply}, a parallel row \code{apply} for a matrix.
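
For example (a small sketch: the matrix and the worker count are
illustrative):
<<parRapply-ex, eval=FALSE>>=
m <- matrix(runif(1e6), nrow = 1000)
cl <- makeCluster(2)
rs <- parRapply(cl, m, sum)   # parallel equivalent of apply(m, 1, sum)
stopCluster(cl)
@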

\section{SNOW Clusters}

The package contains a slightly revised copy of much of \CRANpkg{snow},
and the functions it contains can also be used with clusters created
by \CRANpkg{snow} (provided the package is on the search path).

Two functions are provided to create SNOW clusters,
\code{makePSOCKcluster} (a streamlined version of
\code{snow::makeSOCKcluster}) and (except on Windows)
\code{makeForkCluster}. They differ only in the way they spawn worker
processes: \code{makePSOCKcluster} uses \code{Rscript} to launch
further copies of \R{} (on the same host or optionally elsewhere)
whereas \code{makeForkCluster} forks the workers on the current host
(which thus inherit the environment of the current session).

These functions would normally be called \emph{via} \code{makeCluster}.

Both \code{stdout()} and \code{stderr()} of the workers are
redirected, by default being discarded but they can be logged using
the \code{outfile} option. Note that the previous sentence refers to
the \emph{connections} of those names, not the C-level file handles.
Thus properly written \R{} packages using \code{Rprintf} will have
their output redirected, but not direct C-level output.
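
For example (the log-file name here is illustrative):
<<outfile-ex, eval=FALSE>>=
cl <- makeCluster(2, outfile = "")            # "" sends workers' output
                                              # to the master's terminal
cl <- makeCluster(2, outfile = "workers.log") # or log it to a file
@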

A default cluster can be registered by a call to
\code{setDefaultCluster()}: this is then used whenever one of the
higher-level functions such as \code{parApply} is called without an
explicit cluster. A little care is needed when repeatedly re-using a
pool of workers, as their workspaces will accumulate objects from past
usage, and packages may get added to the search path.

If clusters are to be created on a host other than the current machine
(\samp{localhost}), \code{makeCluster} may need to be given more
information in the shape of extra arguments, as sketched after this list.
\begin{itemize}
\item If the worker machines are not set up in exactly the same way
  as the master (for example if they are of a different
  architecture), use \code{homogeneous = FALSE} and perhaps set
  \code{rscript} to the full path to \command{Rscript} on the workers.
\item The worker machines need to know how to communicate with the
  master: normally this can be done using the hostname found by
  \code{Sys.info()}, but on private networks this need not be the
  case and \code{master} may need to be supplied as a name or IP
  address, e.g. \code{master = "192.168.1.111"}.
\item By default \command{ssh} is used to launch R on the workers.
  If it is known by some other name, use e.g.{}
  \code{rshcmd = "plink.exe"} for a Windows box using PuTTY.
  SSH should be set
  up to use silent authentication: setups which require a password
  to be supplied may or may not work.
\item Socket communication is done over a randomly chosen port
  in the range \code{11000:11999}: site policies
  might require some other port to be used, in which case set
  argument \code{port} or environment variable \env{R\_PARALLEL\_PORT}.
\end{itemize}
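
Putting some of these together, setting up workers on two remote
machines might look like this (a sketch only: the host names, user
name, address and port are illustrative and site-specific):
<<remote-ex, eval=FALSE>>=
cl <- makeCluster(c("node1", "node2"),
                  master = "192.168.1.111", # how the workers reach the master
                  port = 11500,             # a fixed port, if policy requires
                  user = "me")
stopCluster(cl)
@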

\section{Forking}

Except on Windows, the package contains a copy of \CRANpkg{multicore}:
a few names have the added prefix \code{mc}, e.g.{}
\code{mccollect} and \code{mcparallel}. (Package \CRANpkg{multicore}
used these names, but also the versions without the prefix which are
too easily masked: e.g.{} package \CRANpkg{lattice} used to have a
function \code{parallel}.)
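
These two functions form the basic job interface: \code{mcparallel}
forks a child process to evaluate an expression and returns
immediately, and \code{mccollect} waits for and retrieves the
results. A minimal illustration (not on Windows):
<<mcparallel-ex, eval=FALSE>>=
p1 <- mcparallel(sum(sqrt(1:1e6)))   # fork two jobs ...
p2 <- mcparallel(mean(rnorm(1e6)))
mccollect(list(p1, p2))              # ... then wait for both results
@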

The low-level functions from \CRANpkg{multicore} are provided but not
exported from the namespace.

There are high-level functions \code{mclapply} and \code{pvec}: unlike
the versions in \CRANpkg{multicore} these default to 2 cores, but this can
be controlled by setting \code{options("mc.cores")}, and that takes
its default from environment variable \code{MC\_CORES} when the
package is loaded. (Setting this to \code{1} inhibits parallel
operation: there are stub versions of these functions on Windows which
force \code{mc.cores = 1}.)

Functions \code{mcmapply} and \code{mcMap} provide analogues of
\code{mapply} and \code{Map}.

Note the earlier comments about using forking in a GUI environment.

The parent and forked \R{} processes share the per-session temporary
directory \code{tempdir()}, which can be a problem as a lot of code
has assumed it is private to the \R{} process. Further, prior to \R{}
2.14.1 it was possible for \code{tempfile} in two processes to select
the same filename in that temporary directory, and do it sufficiently
simultaneously that neither saw it as being in use.

The forked workers share file handles with the master: this means that
any output from the worker should go to the same place as
\file{stdout} and \file{stderr} of the master process. (This does not
work reliably on all OSes: problems have also been noted when forking
a session that is processing batch input from \file{stdin}.) Setting
argument \code{mc.silent = TRUE} silences \file{stdout} for the child:
\file{stderr} is not affected.

Sharing file handles also impacts graphics devices as forked workers
inherit all open graphics devices of the parent: they should not
attempt to make use of them.

\section{Random-number generation}

Some care is needed with parallel computation using (pseudo-)random
numbers: the processes/threads which run separate parts of the
computation need to run independent (and preferably reproducible)
random-number streams. One way to avoid any difficulties is (where
possible) to do all the randomization in the master process: this is
done where possible in package \CRANpkg{boot} (version 1.3-1 and later).

When an \R{} process is started up it takes the random-number seed
from the object \code{.Random.seed} in a saved workspace or constructs
one from the clock time and process ID when random-number generation
is first used (see the help on \code{RNG}). Thus worker processes
might get the same seed because a workspace containing
\code{.Random.seed} was restored or the random number generator has
been used before forking: otherwise these get a non-reproducible seed
(but with very high probability a different seed for each worker).

The alternative is to set separate seeds for each worker process in
some reproducible way from the seed in the master process. This is
generally plenty safe enough, but there have been worries that the
random-number streams in the workers might somehow get into step. One
approach is to take the seeds a long way apart in the random-number
stream: note that random numbers taken a long (fixed) distance apart
in a single stream are not necessarily (and often are not) as
independent as those taken a short distance apart. Yet another idea
(as used by e.g.{} \pkg{JAGS}) is to use different random-number
generators for each separate run/process.

Package \pkg{parallel} contains an implementation of the ideas of
\citet{lecuyer.2002}: this uses a single RNG and makes \emph{streams}
with seeds $2^{127}$ steps apart in the random number stream (which
has period approximately $2^{191}$). This is based on the generator
of \citet{lecuyer.1999}; the reason for choosing that
generator\footnote{apart from the commonality of authors!} is that it
has a fairly long period with a small seed (6 integers), and unlike
\R{}'s default \code{"Mersenne-Twister"} RNG, it is simple to advance
the seed by a fixed number of steps. The generator is the combination
of two:
\begin{eqnarray*}
  x_n &=& 1403580 \times x_{n-2} - 810728 \times x_{n-3} \mod{(2^{32} - 209)}\\
  y_n &=& 527612 \times y_{n-1} - 1370589 \times y_{n-3} \mod{(2^{32} - 22853)}\\
  z_n &=& (x_n - y_n) \mod{4294967087}\\
  u_n &=& z_n/4294967088\ \mbox{unless $z_n = 0$}
\end{eqnarray*}
The `seed' then consists of $(x_{n-3}, x_{n-2}, x_{n-1}, y_{n-3}, y_{n-2}, y_{n-1})$,
and the recursion for each of $x_n$ and $y_n$ can have
pre-computed coefficients for $k$ steps ahead. For $k = 2^{127}$, the
seed is advanced by $k$ steps by \R{} call
\code{.Random.seed <- nextRNGStream(.Random.seed)}.

% use \verb to get consistent quote mapping.
The \citet{lecuyer.1999} generator is available \emph{via}
\code{RNGkind("L'Ecuyer-CMRG")}. Thus using the ideas of
\citet{lecuyer.2002} is as simple as
<<Ecuyer-ex, eval=FALSE>>=
RNGkind("L'Ecuyer-CMRG")
set.seed(2002) # something
M <- 16 ## start M workers
s <- .Random.seed
for (i in 1:M) {
    s <- nextRNGStream(s)
    # send s to worker i as .Random.seed
}
@
and this is implemented for SNOW clusters in function
\code{clusterSetRNGStream}, and as part of \code{mcparallel} and
\code{mclapply} (by default).
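
So for a SNOW cluster, reproducible simulations need only something
like the following (a minimal sketch):
<<rng-ex, eval=FALSE>>=
cl <- makeCluster(2)
clusterSetRNGStream(cl, 123)        # give each worker its own stream
unlist(clusterEvalQ(cl, runif(1)))  # reproducible across identical runs
stopCluster(cl)
@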

Apart from \emph{streams} ($2^{127}$ steps apart), there is the concept of
\emph{sub-streams} starting from seeds $2^{76}$ steps apart. Function
\code{nextRNGSubStream} advances to the next substream.

A direct \R{} interface to the (clunkier) original C implementation is
available in CRAN package \CRANpkg{rlecuyer} \citep{rlecuyer}. That works
with named streams, each of which have three 6-element seeds
associated with them. This can easily be emulated in \R{} by storing
\code{.Random.seed} at suitable times. There is another interface
using S4 classes in package \CRANpkg{rstream} \citep{rstream}.

\section{Load balancing}

The introduction mentioned a different strategy which dynamically
allocates tasks to workers: this is sometimes known as `load
balancing' and is implemented in \code{mclapply(mc.preschedule =
  FALSE)}, \code{clusterApplyLB} and wrappers.

Load balancing is potentially advantageous when the tasks take quite
dissimilar amounts of computation time, or where the nodes are of
disparate capabilities. But some \emph{caveats} are in order:
\begin{enumerate}[(a)]
\item Random number streams are allocated to nodes, so if the tasks
  involve random numbers they are likely to be non-repeatable (as the
  allocation of tasks to nodes depends on the workloads of the nodes).
  It would however take only slightly more work to allocate a stream to
  each task.
\item More care is needed in allocating the tasks. If 1000 tasks need
  to be allocated to 10 nodes, the standard approach sends chunks of
  100 tasks to each of the nodes. The load-balancing approach sends
  tasks one at a time to a node, and the communication overhead may be
  high. So it makes sense to have substantially more tasks than
  nodes, but not by a factor of 100 (and maybe not by 10).
\end{enumerate}
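
As an illustration of point (b), one might group 1000 tasks into, say,
40 chunks and balance those across the nodes (a sketch only:
\code{do\_task} is a placeholder for the real computation, and the
chunk count is illustrative):
<<lb-ex, eval=FALSE>>=
cl <- makeCluster(10)
## 40 chunks of 25 tasks: enough to balance the load without
## paying a per-task communication overhead
chunks <- split(1:1000, rep(1:40, each = 25))
res <- clusterApplyLB(cl, chunks,
                      function(ch) lapply(ch, do_task))
stopCluster(cl)
@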

\section{Setting the CPU Affinity with \code{mclapply}}
%%\author{Helena Kotthaus, Andreas Lang \\ LS12 Department of Computer Science, TU Dortmund}

The parameter \code{affinity.list} of the \code{mclapply} function
can be used to run elements of the input vector \code{X} of \code{mclapply} on
specific CPUs.
\code{affinity.list} is a vector (atomic or list) containing the CPU
affinity mask for each element of \code{X}: it describes on which CPU
(core or hyper-thread unit) a given item is allowed to run, see
\code{? mcaffinity}.
This can be helpful if the elements of \code{X} (parallel jobs) have a
high variance of completion time or if the hardware architecture
is heterogeneous. It also enables the development of scheduling
strategies for optimizing the overall runtime of independent parallel jobs.
If \code{affinity.list} is set, the \code{mc.cores} parameter is
replaced with the number of CPU ids used in the affinity masks.
To use this parameter, prescheduling has to be deactivated
(\code{mc.preschedule = FALSE}).
For each value of the input vector \code{X} a separate job is forked.
The master process only forks one child process for each selected CPU
at a time, to ensure that the number of forked jobs does not exceed the
number of selected CPUs. As soon as a child process has finished, the
next available job for the CPU is forked.

The following code example demonstrates how the execution
time can be reduced by assigning the elements of \code{X} to specific
CPUs (not on Windows, where \code{mc.cores} must remain at 1).

%% setting eval=TRUE would cost ca. 1.5 minutes every time this is built
%% also, .Platform$OS.type == "Windows" cannot use mc.cores = 2
<<affinity-ex, eval=FALSE>>=
## Exemplary variance filter executed on three different matrices in parallel.
## Can be used in gene expression analysis as a prefilter
## for the number of covariates.

library(parallel)
n <- 300 # observations
p <- 20000 # covariates

## Different sized matrices as filter inputs
## Matrix A and B form smaller work loads
## while matrix C forms a bigger workload (2*p)
library(stats)
A <- matrix(replicate( p, rnorm(n, sd = runif(1, 0.1, 10))), n, p)
B <- matrix(replicate( p, rnorm(n, sd = runif(1, 0.1, 10))), n, p)
C <- matrix(replicate(2*p, rnorm(n, sd = runif(1, 0.1, 10))), n, 2*p)

varFilter <- function (X, nSim = 20) {
    for (i in 1:nSim) {
        train <- sample(nrow(X), 2 / 3 * nrow(X))
        colVars <- apply(X[train, ], 2, var)
        keep <- names(head(sort(colVars, decreasing = TRUE), 100))
        # myAlgorithm(X[, keep])
    }
}

## Runtime comparison -----------------------------------

## mclapply with affinity.list
## CPU mapping: A and B run on CPU 1 while C runs on CPU 2:
affinity <- c(1,1,2)
system.time(
    mclapply(X = list(A,B,C), FUN = varFilter,
             mc.preschedule = FALSE, affinity.list = affinity))
##  user  system elapsed
## 34.909  0.873  36.720


## mclapply without affinity.list
system.time(
    mclapply(X = list(A,B,C), FUN = varFilter, mc.cores = 2,
             mc.preschedule = FALSE) )
##  user  system elapsed
## 72.893  1.588  55.982


## mclapply with prescheduling
system.time(
    mclapply(X = list(A,B,C), FUN = varFilter, mc.cores = 2,
             mc.preschedule = TRUE) )
##  user  system elapsed
## 53.455  1.326  53.399
@

Instead of using \code{affinity.list} for runtime optimization,
it can also be used to simply restrict the computation
to specific CPUs as in the following example.

<<eval = FALSE>>=
## Restricts all elements of X to run on CPU 1 and 2.
X <- list(1, 2, 3)
affinity.list <- list(c(1,2), c(1,2), c(1,2))
mclapply(X = X, FUN = function (i) i*i,
         mc.preschedule = FALSE, affinity.list = affinity.list)
@

\section{Portability considerations}

People wanting to provide parallel facilities in their code need to
decide how hard they want to try to be portable and efficient: no
approach works optimally on all platforms.

Using \code{mclapply} is usually the simplest approach, but will run
serial versions of the code on Windows. This may suffice where
parallel computation is only required for use on a single multi-core
Unix-alike server---for \code{mclapply} can only run on a single
shared-memory system. There is fallback to serial use when needed, by
setting \code{mc.cores = 1}.
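
One portable pattern (a sketch, not code from the package) is to make
the core count an argument and fall back to serial evaluation:
<<portable-ex, eval=FALSE>>=
runs <- function(X, FUN, ncpus = 1L) {
    if (ncpus > 1L && .Platform$OS.type == "unix")
        mclapply(X, FUN, mc.cores = ncpus)
    else
        lapply(X, FUN)   # serial fallback, including on Windows
}
@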

Using \code{parLapply} will work everywhere that socket communication
works, and can be used, for example, to harness all the CPU cores in a
lab of machines that are not otherwise in use. But socket
communication may be blocked even when using a single machine and is
quite likely to be blocked between machines in a lab. There is not
currently any fallback to serial use, nor could there easily be (as
the workers start with a different \R{} environment from the one
current on the master).

An example of providing access to both approaches as well as serial
code is package \CRANpkg{boot}, version \code{1.3-3} or later.

\section{Extended examples}

\SweaveOpts{eval=FALSE}
<<hide=TRUE>>=
library(parallel)
@

Probably the most common use of coarse-grained parallelization in
statistics is to do multiple simulation runs, for example to do large
numbers of bootstrap replicates or several runs of an MCMC simulation.
We show an example of each.

Note that some of the examples will only work serially on Windows and
some actually are computer-intensive.

\subsection{Bootstrapping}

Package \CRANpkg{boot} \citep{boot} is support software for the monograph
by \citet{Davison.Hinkley.97}. Bootstrapping is often used as an
example of easy parallelization, and some methods of producing
confidence intervals require many thousands of bootstrap samples.
As from version \code{1.3-1} the package itself has parallel support
within its main functions, but we illustrate how to use the original
(serial) functions in parallel computations.

We consider two examples using the \code{cd4} dataset from package
\CRANpkg{boot} where the interest is in the correlation between before and
after measurements. The first is a straight simulation, often called
a \emph{parametric bootstrap}. The non-parallel form is
<<>>=
library(boot)
cd4.rg <- function(data, mle) MASS::mvrnorm(nrow(data), mle$m, mle$v)
cd4.mle <- list(m = colMeans(cd4), v = var(cd4))
cd4.boot <- boot(cd4, corr, R = 999, sim = "parametric",
                 ran.gen = cd4.rg, mle = cd4.mle)
boot.ci(cd4.boot, type = c("norm", "basic", "perc"),
        conf = 0.9, h = atanh, hinv = tanh)
@

To do this with \code{mclapply} we need to break this into separate
runs, and we will illustrate two runs of 500 simulations each:
<<>>=
cd4.rg <- function(data, mle) MASS::mvrnorm(nrow(data), mle$m, mle$v)
cd4.mle <- list(m = colMeans(cd4), v = var(cd4))
run1 <- function(...) boot(cd4, corr, R = 500, sim = "parametric",
                           ran.gen = cd4.rg, mle = cd4.mle)
mc <- 2 # set as appropriate for your hardware
## To make this reproducible:
set.seed(123, "L'Ecuyer")
cd4.boot <- do.call(c, mclapply(seq_len(mc), run1) )
boot.ci(cd4.boot, type = c("norm", "basic", "perc"),
        conf = 0.9, h = atanh, hinv = tanh)
@
There are many ways to program things like this: often the neatest is
to encapsulate the computation in a function, so this is the parallel
form of
<<eval=FALSE>>=
do.call(c, lapply(seq_len(mc), run1))
@

To run this with \code{parLapply} we could take a similar approach by
<<>>=
run1 <- function(...) {
    library(boot)
    cd4.rg <- function(data, mle) MASS::mvrnorm(nrow(data), mle$m, mle$v)
    cd4.mle <- list(m = colMeans(cd4), v = var(cd4))
    boot(cd4, corr, R = 500, sim = "parametric",
         ran.gen = cd4.rg, mle = cd4.mle)
}
cl <- makeCluster(mc)
## make this reproducible
clusterSetRNGStream(cl, 123)
library(boot) # needed for c() method on master
cd4.boot <- do.call(c, parLapply(cl, seq_len(mc), run1) )
boot.ci(cd4.boot, type = c("norm", "basic", "perc"),
        conf = 0.9, h = atanh, hinv = tanh)
stopCluster(cl)
@
Note that whereas with \code{mclapply} all the packages and objects
we use are automatically available on the workers, this is not in
general\footnote{it is with clusters set up with
  \code{makeForkCluster}.} the case with the \code{parLapply}
approach. There is often a delicate choice of where to do the
computations: for example we could compute \code{cd4.mle} on the
workers (as above) or on the master and send the value to the
workers. We illustrate the latter by the following code
<<>>=
cl <- makeCluster(mc)
cd4.rg <- function(data, mle) MASS::mvrnorm(nrow(data), mle$m, mle$v)
cd4.mle <- list(m = colMeans(cd4), v = var(cd4))
clusterExport(cl, c("cd4.rg", "cd4.mle"))
junk <- clusterEvalQ(cl, library(boot)) # discard result
clusterSetRNGStream(cl, 123)
res <- clusterEvalQ(cl, boot(cd4, corr, R = 500,
    sim = "parametric", ran.gen = cd4.rg, mle = cd4.mle))
library(boot) # needed for c() method on master
cd4.boot <- do.call(c, res)
boot.ci(cd4.boot, type = c("norm", "basic", "perc"),
        conf = 0.9, h = atanh, hinv = tanh)
stopCluster(cl)
@

Running the double bootstrap on the same problem is far more
computer-intensive. The standard version is
<<fig=TRUE>>=
R <- 999; M <- 999 ## we would like at least 999 each
cd4.nest <- boot(cd4, nested.corr, R=R, stype="w", t0=corr(cd4), M=M)
## nested.corr is a function in package boot
op <- par(pty = "s", xaxs = "i", yaxs = "i")
qqplot((1:R)/(R+1), cd4.nest$t[, 2], pch = ".", asp = 1,
       xlab = "nominal", ylab = "estimated")
abline(a = 0, b = 1, col = "grey")
abline(h = 0.05, col = "grey")
abline(h = 0.95, col = "grey")
par(op)

nominal <- (1:R)/(R+1)
actual <- cd4.nest$t[, 2]
100*nominal[c(sum(actual <= 0.05), sum(actual < 0.95))]
@
which took about 55 secs on one core of an 8-core Linux server.

Using \code{mclapply} we could use
<<eval=FALSE>>=
mc <- 9
R <- 999; M <- 999; RR <- floor(R/mc)
run2 <- function(...)
    cd4.nest <- boot(cd4, nested.corr, R=RR, stype="w", t0=corr(cd4), M=M)
cd4.nest <- do.call(c, mclapply(seq_len(mc), run2, mc.cores = mc) )
nominal <- (1:R)/(R+1)
actual <- cd4.nest$t[, 2]
100*nominal[c(sum(actual <= 0.05), sum(actual < 0.95))]
@
which ran in 11 secs (elapsed) using all of that server.

\subsection{MCMC runs}

\citet{Ripley.88} discusses the maximum-likelihood estimation of the
Strauss process, which is done by solving a moment equation
\[
  E_c T = t
\]
where $T$ is the number of $R$-close pairs and $t$ is the observed
value, $30$ in the following example. A serial approach to the
initial exploration might be
<<>>=
library(spatial)
towns <- ppinit("towns.dat")
tget <- function(x, r=3.5) sum(dist(cbind(x$x, x$y)) < r)
t0 <- tget(towns)
R <- 1000
c <- seq(0, 1, 0.1)
## res[1] = 0
res <- c(0, sapply(c[-1], function(c)
    mean(replicate(R, tget(Strauss(69, c=c, r=3.5))))))
plot(c, res, type="l", ylab="E t")
abline(h=t0, col="grey")
@
which takes about 20 seconds today, but many hours when first done
in 1985. A parallel version might be
<<>>=
run3 <- function(c) {
    library(spatial)
    towns <- ppinit("towns.dat") # has side effects
    mean(replicate(R, tget(Strauss(69, c=c, r=3.5))))
}
cl <- makeCluster(10, methods = FALSE)
clusterExport(cl, c("R", "towns", "tget"))
res <- c(0, parSapply(cl, c[-1], run3)) # 10 tasks
stopCluster(cl)
@
which took about 4.5 secs, plus 2 secs to set up the cluster. Using a
fork cluster (not on Windows) makes the startup much faster and setup easier:
<<eval=FALSE>>=
cl <- makeForkCluster(10) # fork after the variables have been set up
run4 <- function(c) mean(replicate(R, tget(Strauss(69, c=c, r=3.5))))
res <- c(0, parSapply(cl, c[-1], run4))
stopCluster(cl)
@
As one might expect, the \code{mclapply} version is slightly simpler:
<<eval=FALSE>>=
run4 <- function(c) mean(replicate(R, tget(Strauss(69, c=c, r=3.5))))
res <- c(0, unlist(mclapply(c[-1], run4, mc.cores = 10)))
@
If you do not have as many as 10 cores, you might want to consider
load-balancing in a task like this as the time taken per simulation
does vary with \code{c}. This can be done using
\code{mclapply(mc.preschedule = FALSE)} or \code{parSapplyLB}. The
disadvantage is that the results would not be reproducible (which does
not matter here).

\subsection{Package installation}

With over 4000 \R{} packages available, it is often helpful to do a
comprehensive install in parallel. We provide facilities to do so in
\code{install.packages} \emph{via} a parallel \command{make}, but that
approach may not be suitable.\footnote{A parallel \command{make} might
  not be available, and we have seen a couple of instances of package
  installation not working correctly when run from \command{make}.}
Some of the tasks take many times longer than others, and we do not
know in advance which these are.

We illustrate an approach using package \pkg{parallel} which is used
on part of the CRAN check farm. Suppose that there is a function
\code{do\_one(pkg)} which installs a single package and then returns.
Then the task is to run \code{do\_one} on as many of the \code{M}
workers as possible whilst ensuring that all of the direct and
indirect dependencies of \code{pkg} are installed before \code{pkg}
itself. As the installation of a single package can block several
others, we do need to allow the number of installs running
simultaneously to vary: the following code achieves that, but needs to
use low-level functions to do so.

<<eval=FALSE>>=
pkgs <- "<names of packages to be installed>"
M <- 20 # number of parallel installs
M <- min(M, length(pkgs))
library(parallel)
unlink("install_log")
cl <- makeCluster(M, outfile = "install_log")
clusterExport(cl, c("tars", "fakes", "gcc")) # variables needed by do_one

## set up available via a call to available.packages() for
## repositories containing all the packages involved and all their
## dependencies.
DL <- utils:::.make_dependency_list(pkgs, available, recursive = TRUE)
DL <- lapply(DL, function(x) x[x %in% pkgs])
lens <- sapply(DL, length)
ready <- names(DL[lens == 0L])
done <- character() # packages already installed
n <- length(ready)
submit <- function(node, pkg)
    parallel:::sendCall(cl[[node]], do_one, list(pkg), tag = pkg)
for (i in 1:min(n, M)) submit(i, ready[i])
DL <- DL[!names(DL) %in% ready[1:min(n, M)]]
av <- if(n < M) (n+1L):M else integer() # available workers
while(length(done) < length(pkgs)) {
    d <- parallel:::recvOneResult(cl)
    av <- c(av, d$node)
    done <- c(done, d$tag)
    OK <- unlist(lapply(DL, function(x) all(x %in% done) ))
    if (!any(OK)) next
    p <- names(DL)[OK]
    m <- min(length(p), length(av)) # >= 1
    for (i in 1:m) submit(av[i], p[i])
    av <- av[-(1:m)]
    DL <- DL[!names(DL) %in% p[1:m]]
}
@

\subsection{Passing \code{...}}

The semantics of \code{...} do not fit well with parallel operation,
since lazy evaluation may be delayed until the tasks have been sent to
the workers. This is no problem in the forking approach, as the
information needed for lazy evaluation will be present in the forked workers.

For \CRANpkg{snow}-like clusters the trick is to ensure that \code{...}
has any promises forced whilst the information is still available.
This is how \CRANpkg{boot} does it:
<<eval=FALSE>>=
fn <- function(r) statistic(data, i[r, ], ...)
RR <- sum(R)
res <- if (ncpus > 1L && (have_mc || have_snow)) {
    if (have_mc) {
        parallel::mclapply(seq_len(RR), fn, mc.cores = ncpus)
    } else if (have_snow) {
        list(...) # evaluate any promises
        if (is.null(cl)) {
            cl <- parallel::makePSOCKcluster(rep("localhost", ncpus))
            if(RNGkind()[1L] == "L'Ecuyer-CMRG")
                parallel::clusterSetRNGStream(cl)
            res <- parallel::parLapply(cl, seq_len(RR), fn)
            parallel::stopCluster(cl)
            res
        } else parallel::parLapply(cl, seq_len(RR), fn)
    }
} else lapply(seq_len(RR), fn)
@
Note that \code{...} is an argument to \code{boot}, and so after
<<eval=FALSE>>=
list(...) # evaluate any promises
@
it refers to objects within the evaluation frame of \code{boot} and
hence the environment of \code{fn} which will therefore be sent to the
workers along with \code{fn}.

\section{Differences from earlier versions}

The support of parallel RNG differs from \CRANpkg{snow}: \CRANpkg{multicore}
had no support.

\subsection{Differences from multicore}

\CRANpkg{multicore} had quite elaborate code to inhibit the Aqua event
loop for \command{R.app} and the event loop for the \code{quartz}
graphics device. This has been replaced by recording a flag in the
\R{} executable for a child process.

Functions \code{fork} and \code{kill} have \code{mc} prefixes and are
not exported. This avoids confusion with other packages (such as
package \CRANpkg{fork}), and note that \code{mckill} is not as general as
\code{tools::pskill}.

Aliased functions \code{collect} and \code{parallel} are no longer provided.

\subsection{Differences from snow}

\CRANpkg{snow} set a timeout that exceeded the maximum value required by
POSIX, and did not provide a means to set the timeout on the workers.
This resulted in process interlocks on Solaris.

\code{makeCluster} creates MPI %% or NWS
clusters by calling \CRANpkg{snow}.

\code{makePSOCKcluster} has been streamlined, as package \pkg{parallel} is in
a known location on all systems, and \command{Rscript} is these days
always available. Logging of workers is set up to append to the file,
so multiple processes can be logged.

\code{parSapply} has been brought into line with \code{sapply}.

\code{clusterMap()} gains \code{SIMPLIFY} and \code{USE.NAMES}
arguments to make it a parallel version of \code{mapply} and \code{Map}.

The timing interface has not been copied.

\bibliographystyle{jss}
\bibliography{parallel}

\end{document}

%%% Local Variables:
%%% mode: latex
%%% TeX-master: t
%%% End: