2025-01-12 04:36:52 +08:00

304 lines
11 KiB
R

### R code from vignette source 'parallel.Rnw'
###################################################
### code chunk number 1: Ecuyer-ex (eval = FALSE)
###################################################
## RNGkind("L'Ecuyer-CMRG")
## set.seed(2002) # something
## M <- 16 ## start M workers
## s <- .Random.seed
## for (i in 1:M) {
## s <- nextRNGStream(s)
## # send s to worker i as .Random.seed
## }
###################################################
### code chunk number 2: affinity-ex (eval = FALSE)
###################################################
## ## Exemplary variance filter executed on three different matrices in parallel.
## ## Can be used in gene expression analysis as a prefilter
## ## for the number of covariates.
##
## library(parallel)
## n <- 300 # observations
## p <- 20000 # covariates
##
## ## Different sized matrices as filter inputs
## ## Matrix A and B form smaller work loads
## ## while matrix C forms a bigger workload (2*p)
## library(stats)
## A <- matrix(replicate( p, rnorm(n, sd = runif(1, 0.1, 10))), n, p)
## B <- matrix(replicate( p, rnorm(n, sd = runif(1, 0.1, 10))), n, p)
## C <- matrix(replicate(2*p, rnorm(n, sd = runif(1, 0.1, 10))), n, 2*p)
##
## varFilter <- function (X, nSim = 20) {
## for (i in 1:nSim) {
## train <- sample(nrow(X), 2 / 3 * nrow(X))
## colVars <- apply(X[train, ], 2, var)
## keep <- names(head(sort(colVars, decreasing = TRUE), 100))
## # myAlgorithm(X[, keep])
## }
## }
##
## ## Runtime comparison -----------------------------------
##
## ## mclapply with affinity.list
## ## CPU mapping: A and B run on CPU 1 while C runs on CPU 2:
## affinity <- c(1,1,2)
## system.time(
## mclapply(X = list(A,B,C), FUN = varFilter,
## mc.preschedule = FALSE, affinity.list = affinity))
## ## user system elapsed
## ## 34.909 0.873 36.720
##
##
## ## mclapply without affinity.list
## system.time(
## mclapply(X = list(A,B,C), FUN = varFilter, mc.cores = 2,
## mc.preschedule = FALSE) )
## ## user system elapsed
## ## 72.893 1.588 55.982
##
##
## ## mclapply with prescheduling
## system.time(
## mclapply(X = list(A,B,C), FUN = varFilter, mc.cores = 2,
## mc.preschedule = TRUE) )
## ## user system elapsed
## ## 53.455 1.326 53.399
###################################################
### code chunk number 3: parallel.Rnw:531-536 (eval = FALSE)
###################################################
## ## Restricts all elements of X to run on CPU 1 and 2.
## X <- list(1, 2, 3)
## affinity.list <- list(c(1,2), c(1,2), c(1,2))
## mclapply(X = X, FUN = function (i) i*i,
## mc.preschedule = FALSE, affinity.list = affinity.list)
###################################################
### code chunk number 4: parallel.Rnw:568-569 (eval = FALSE)
###################################################
## library(parallel)
###################################################
### code chunk number 5: parallel.Rnw:594-601 (eval = FALSE)
###################################################
## library(boot)
## cd4.rg <- function(data, mle) MASS::mvrnorm(nrow(data), mle$m, mle$v)
## cd4.mle <- list(m = colMeans(cd4), v = var(cd4))
## cd4.boot <- boot(cd4, corr, R = 999, sim = "parametric",
## ran.gen = cd4.rg, mle = cd4.mle)
## boot.ci(cd4.boot, type = c("norm", "basic", "perc"),
## conf = 0.9, h = atanh, hinv = tanh)
###################################################
### code chunk number 6: parallel.Rnw:606-616 (eval = FALSE)
###################################################
## cd4.rg <- function(data, mle) MASS::mvrnorm(nrow(data), mle$m, mle$v)
## cd4.mle <- list(m = colMeans(cd4), v = var(cd4))
## run1 <- function(...) boot(cd4, corr, R = 500, sim = "parametric",
## ran.gen = cd4.rg, mle = cd4.mle)
## mc <- 2 # set as appropriate for your hardware
## ## To make this reproducible:
## set.seed(123, "L'Ecuyer")
## cd4.boot <- do.call(c, mclapply(seq_len(mc), run1) )
## boot.ci(cd4.boot, type = c("norm", "basic", "perc"),
## conf = 0.9, h = atanh, hinv = tanh)
###################################################
### code chunk number 7: parallel.Rnw:621-622 (eval = FALSE)
###################################################
## do.call(c, lapply(seq_len(mc), run1))
###################################################
### code chunk number 8: parallel.Rnw:626-641 (eval = FALSE)
###################################################
## run1 <- function(...) {
## library(boot)
## cd4.rg <- function(data, mle) MASS::mvrnorm(nrow(data), mle$m, mle$v)
## cd4.mle <- list(m = colMeans(cd4), v = var(cd4))
## boot(cd4, corr, R = 500, sim = "parametric",
## ran.gen = cd4.rg, mle = cd4.mle)
## }
## cl <- makeCluster(mc)
## ## make this reproducible
## clusterSetRNGStream(cl, 123)
## library(boot) # needed for c() method on master
## cd4.boot <- do.call(c, parLapply(cl, seq_len(mc), run1) )
## boot.ci(cd4.boot, type = c("norm", "basic", "perc"),
## conf = 0.9, h = atanh, hinv = tanh)
## stopCluster(cl)
###################################################
### code chunk number 9: parallel.Rnw:651-664 (eval = FALSE)
###################################################
## cl <- makeCluster(mc)
## cd4.rg <- function(data, mle) MASS::mvrnorm(nrow(data), mle$m, mle$v)
## cd4.mle <- list(m = colMeans(cd4), v = var(cd4))
## clusterExport(cl, c("cd4.rg", "cd4.mle"))
## junk <- clusterEvalQ(cl, library(boot)) # discard result
## clusterSetRNGStream(cl, 123)
## res <- clusterEvalQ(cl, boot(cd4, corr, R = 500,
## sim = "parametric", ran.gen = cd4.rg, mle = cd4.mle))
## library(boot) # needed for c() method on master
## cd4.boot <- do.call(c, res)
## boot.ci(cd4.boot, type = c("norm", "basic", "perc"),
## conf = 0.9, h = atanh, hinv = tanh)
## stopCluster(cl)
###################################################
### code chunk number 10: parallel.Rnw:669-683 (eval = FALSE)
###################################################
## R <- 999; M <- 999 ## we would like at least 999 each
## cd4.nest <- boot(cd4, nested.corr, R=R, stype="w", t0=corr(cd4), M=M)
## ## nested.corr is a function in package boot
## op <- par(pty = "s", xaxs = "i", yaxs = "i")
## qqplot((1:R)/(R+1), cd4.nest$t[, 2], pch = ".", asp = 1,
## xlab = "nominal", ylab = "estimated")
## abline(a = 0, b = 1, col = "grey")
## abline(h = 0.05, col = "grey")
## abline(h = 0.95, col = "grey")
## par(op)
##
## nominal <- (1:R)/(R+1)
## actual <- cd4.nest$t[, 2]
## 100*nominal[c(sum(actual <= 0.05), sum(actual < 0.95))]
###################################################
### code chunk number 11: parallel.Rnw:688-696 (eval = FALSE)
###################################################
## mc <- 9
## R <- 999; M <- 999; RR <- floor(R/mc)
## run2 <- function(...)
## cd4.nest <- boot(cd4, nested.corr, R=RR, stype="w", t0=corr(cd4), M=M)
## cd4.nest <- do.call(c, mclapply(seq_len(mc), run2, mc.cores = mc) )
## nominal <- (1:R)/(R+1)
## actual <- cd4.nest$t[, 2]
## 100*nominal[c(sum(actual <= 0.05), sum(actual < 0.95))]
###################################################
### code chunk number 12: parallel.Rnw:710-721 (eval = FALSE)
###################################################
## library(spatial)
## towns <- ppinit("towns.dat")
## tget <- function(x, r=3.5) sum(dist(cbind(x$x, x$y)) < r)
## t0 <- tget(towns)
## R <- 1000
## c <- seq(0, 1, 0.1)
## ## res[1] = 0
## res <- c(0, sapply(c[-1], function(c)
## mean(replicate(R, tget(Strauss(69, c=c, r=3.5))))))
## plot(c, res, type="l", ylab="E t")
## abline(h=t0, col="grey")
###################################################
### code chunk number 13: parallel.Rnw:725-734 (eval = FALSE)
###################################################
## run3 <- function(c) {
## library(spatial)
## towns <- ppinit("towns.dat") # has side effects
## mean(replicate(R, tget(Strauss(69, c=c, r=3.5))))
## }
## cl <- makeCluster(10, methods = FALSE)
## clusterExport(cl, c("R", "towns", "tget"))
## res <- c(0, parSapply(cl, c[-1], run3)) # 10 tasks
## stopCluster(cl)
###################################################
### code chunk number 14: parallel.Rnw:738-742 (eval = FALSE)
###################################################
## cl <- makeForkCluster(10) # fork after the variables have been set up
## run4 <- function(c) mean(replicate(R, tget(Strauss(69, c=c, r=3.5))))
## res <- c(0, parSapply(cl, c[-1], run4))
## stopCluster(cl)
###################################################
### code chunk number 15: parallel.Rnw:745-747 (eval = FALSE)
###################################################
## run4 <- function(c) mean(replicate(R, tget(Strauss(69, c=c, r=3.5))))
## res <- c(0, unlist(mclapply(c[-1], run4, mc.cores = 10)))
###################################################
### code chunk number 16: parallel.Rnw:778-812 (eval = FALSE)
###################################################
## pkgs <- "<names of packages to be installed>"
## M <- 20 # number of parallel installs
## M <- min(M, length(pkgs))
## library(parallel)
## unlink("install_log")
## cl <- makeCluster(M, outfile = "install_log")
## clusterExport(cl, c("tars", "fakes", "gcc")) # variables needed by do_one
##
## ## set up available via a call to available.packages() for
## ## repositories containing all the packages involved and all their
## ## dependencies.
## DL <- utils:::.make_dependency_list(pkgs, available, recursive = TRUE)
## DL <- lapply(DL, function(x) x[x %in% pkgs])
## lens <- sapply(DL, length)
## ready <- names(DL[lens == 0L])
## done <- character() # packages already installed
## n <- length(ready)
## submit <- function(node, pkg)
## parallel:::sendCall(cl[[node]], do_one, list(pkg), tag = pkg)
## for (i in 1:min(n, M)) submit(i, ready[i])
## DL <- DL[!names(DL) %in% ready[1:min(n, M)]]
## av <- if(n < M) (n+1L):M else integer() # available workers
## while(length(done) < length(pkgs)) {
## d <- parallel:::recvOneResult(cl)
## av <- c(av, d$node)
## done <- c(done, d$tag)
## OK <- unlist(lapply(DL, function(x) all(x %in% done) ))
## if (!any(OK)) next
## p <- names(DL)[OK]
## m <- min(length(p), length(av)) # >= 1
## for (i in 1:m) submit(av[i], p[i])
## av <- av[-(1:m)]
## DL <- DL[!names(DL) %in% p[1:m]]
## }
###################################################
### code chunk number 17: parallel.Rnw:825-842 (eval = FALSE)
###################################################
## fn <- function(r) statistic(data, i[r, ], ...)
## RR <- sum(R)
## res <- if (ncpus > 1L && (have_mc || have_snow)) {
## if (have_mc) {
## parallel::mclapply(seq_len(RR), fn, mc.cores = ncpus)
## } else if (have_snow) {
## list(...) # evaluate any promises
## if (is.null(cl)) {
## cl <- parallel::makePSOCKcluster(rep("localhost", ncpus))
## if(RNGkind()[1L] == "L'Ecuyer-CMRG")
## parallel::clusterSetRNGStream(cl)
## res <- parallel::parLapply(cl, seq_len(RR), fn)
## parallel::stopCluster(cl)
## res
## } else parallel::parLapply(cl, seq_len(RR), fn)
## }
## } else lapply(seq_len(RR), fn)
###################################################
### code chunk number 18: parallel.Rnw:845-846 (eval = FALSE)
###################################################
## list(...) # evaluate any promises