A new parallel programming package named parallel will be included in R 2.14.0, tentatively scheduled for release on October 31, 2011. It is derived from the snow and multicore packages, providing many of the same functions as those packages. Some of the functions derived from multicore have been renamed by adding the prefix “mc.”, and some of the arguments to mclapply() have been changed a bit, but if you have read the snow and multicore chapters of this book, you will have very little difficulty learning to use parallel.

This is an exciting development, since it makes parallel computing in R more mainstream. Hopefully the parallel package will be used from other standard packages, giving many more users the benefit of parallel computing, perhaps without knowing that they’re using it.[39]
An important feature of parallel is its integration with the L’Ecuyer-CMRG random number generator (RNG), which is also new in R 2.14.0. The seed of this generator can be easily advanced a given number of steps, making it very useful as a parallel RNG. This is accomplished using the same concepts as the rlecuyer package, but it is a completely new implementation, so parallel has no dependency on the rlecuyer package itself.

In particular, the multicore-derived functions in parallel now have true parallel RNG support, solving the biggest “gotcha” in the multicore package.
Warning

This chapter was written using an experimental version of the parallel package with the development version of R 2.14.0. Officially, anything in the package can change or be removed without notice until October 2011, which is just after the “all-in” date for this book. However, this is such an important package for parallel computing with R that I really wanted to include it in this book.
Motivation: You have an R script that spends two days executing a function using lapply() on your laptop.

Solution: Replace lapply() with the mclapply() function from the parallel package, and consider using parLapply() if you have a cluster handy.

Good because: It comes built in as of R 2.14.0, and there isn’t much to learn if you’ve used snow or multicore before.
Note

Since the parallel package has so much in common with the snow and multicore packages, I don’t want to repeat all of the material that I just covered in the last two chapters. Instead, I assume that you’ve either read the snow and multicore chapters of this book, or are already reasonably familiar with those packages.
parallel can be used on Posix-based multicore systems using functions such as mclapply() and mcparallel() that were derived from the multicore package. But parallel can also be used with a “PSOCK” cluster and functions such as parLapply() and clusterApplyLB() to execute on multicore Windows systems, as well as Linux clusters. It can also be used with cluster objects that were created using snow, making it possible to use parallel with MPI as the transport. In other words, it addresses essentially everything addressed by the snow and multicore packages.

This is the real beauty of parallel. If you’re using R 2.14.0 or later, it’s already installed: you don’t need to install any additional packages unless you want to use the MPI, PVM, or NetWorkSpaces transports.
If you have any doubts, you can try loading it:

library(parallel)

If this fails, you should check the version of R that you’re using with:

R.version.string

You need to have R 2.14.0 or better to use parallel.

If you’re using a Posix-based system, such as Linux or Mac OS X, you can use the multicore-derived functions, such as mclapply().
mclapply() is basically the same as the version in the multicore package, except that a couple of the arguments work slightly differently. For example, the mc.cores argument doesn’t automatically detect the number of cores in the machine. However, the parallel package does include a function to do that, called detectCores().[40]
Here’s the parallel K-Means example for the parallel package using mclapply(). It is very similar to the version in the multicore chapter, except that it loads parallel, uses detectCores() to specify the value of the mc.cores argument, and uses the parallel RNG as a bonus:
library(parallel)
library(MASS)

RNGkind("L'Ecuyer-CMRG")
mc.cores <- detectCores()
results <- mclapply(rep(25, 4),
                    function(nstart) kmeans(Boston, 4, nstart=nstart),
                    mc.cores=mc.cores)
i <- sapply(results, function(result) result$tot.withinss)
result <- results[[which.min(i)]]
We’ll discuss the use of RNGkind("L'Ecuyer-CMRG") in Parallel Random Number Generation.

The default value of the mc.cores argument is getOption("mc.cores", 2L),[41] so you might want to add the following line to the beginning of your scripts when converting from multicore to parallel:
options(mc.cores=detectCores())
Then mclapply() and pvec() will work more like they do in multicore.
If you’re using Windows, you need to use the snow-derived API in parallel. The following parallel K-Means example works on any platform supported by the parallel package:
library(parallel)

cl <- makeCluster(detectCores())
clusterSetRNGStream(cl)
clusterEvalQ(cl, library(MASS))
results <- clusterApply(cl, rep(25, 4),
                        function(nstart) kmeans(Boston, 4, nstart=nstart))
i <- sapply(results, function(result) result$tot.withinss)
result <- results[[which.min(i)]]
stopCluster(cl)
This is very similar to the K-Means example in the snow chapter. The difference is in loading parallel, creating the cluster object, and enabling parallel random number generation. As with snow, we use the makeCluster() function, but in parallel, the type argument doesn’t need to be specified. We’ll discuss the parallel version of makeCluster() in more depth in the next section, and parallel random number generation in Parallel Random Number Generation.
If you’re running on Windows or a Linux cluster, you can’t use multicore-derived functions such as mclapply() and pvec(). Instead you’ll need to use snow-derived functions such as parLapply() and clusterApplyLB(). The first argument to these functions is a cluster object, so before you can use one of these functions, you’ll have to create a cluster object.
The parallel package comes with two transports: “PSOCK” and “FORK”. The “PSOCK” transport is a streamlined version of snow’s “SOCK” transport. It starts workers using the Rscript command, and communicates between the master and workers using socket connections.

As in snow, the makeCluster() function creates a cluster object. The default value of the type argument is “PSOCK”, so we can create a “PSOCK” cluster with four local workers using the command:
cl <- makeCluster(4)
It’s often useful to specify the cluster size using the detectCores() function:
cl <- makeCluster(detectCores())
If you have ssh installed, you can specify a list of machines for the first argument:
cl <- makeCluster(c("n1", "n2", "n3", "n4"))
Note that this is nearly identical to the way that socket clusters are created in snow, except that we never need to specify the type argument.
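For comparison, here’s what the corresponding call looks like with snow itself (a sketch based on the snow chapter; the hostnames n1 through n4 are the same placeholders):

library(snow)
# In snow, the cluster type is typically given explicitly
cl <- makeCluster(c("n1", "n2", "n3", "n4"), type="SOCK")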
The “FORK” transport starts workers using the mcfork() function, and communicates between the master and workers using socket connections. To create a “FORK” cluster, use makeCluster() with type set to “FORK”:
cl <- makeCluster(4, type="FORK")
You cannot start workers on remote machines with a “FORK” cluster, since mcfork() is built on the fork() system call, which only creates processes on the local machine. Also, “FORK” clusters are only supported on Posix-based systems, not Windows, since fork() is a Posix system call.

An interesting feature of “FORK” clusters is that the workers inherit the data and environment of the master process. This is like the workers that are automatically started by mclapply(), but unlike the workers started in a “PSOCK” cluster. That can be useful, but it’s important to remember that a “FORK” cluster is persistent, like a “PSOCK” cluster, and unlike the workers started by mclapply(). Thus, variables created on the master after the “FORK” cluster was created will not magically appear on the workers, as they do with mclapply(). To emulate the behaviour of mclapply(), you would have to create a new “FORK” cluster immediately before every call to parLapply(), for example. But since that won’t work with any other type of cluster object, you should probably just use mclapply().
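Here’s a minimal sketch of the inheritance behaviour just described, assuming a Posix-based system (the variables x and y are just for illustration):

library(parallel)
x <- 100                               # created before the cluster: inherited
cl <- makeCluster(2, type="FORK")
y <- 200                               # created after the cluster: not inherited
unlist(clusterEvalQ(cl, x))            # works: each worker returns 100
clusterEvalQ(cl, try(y, silent=TRUE))  # fails on the workers: object 'y' not found
stopCluster(cl)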
Since “FORK” clusters can be created quickly, they can be useful when parallelizing lapply() operations that are deep inside some package, where you don’t want to use a global variable or add an argument to dozens of functions in order to pass the cluster object to the appropriate function. In that case, you can just create the cluster object right where you need it, and shut it down afterwards. Here’s one way that you could create and use a one-shot cluster object with parallel that would be about as fast as using mclapply() on a Posix-based system, but would also work on Windows:
type <- if (exists("mcfork", mode="function")) "FORK" else "PSOCK"
cores <- getOption("mc.cores", detectCores())
cl <- makeCluster(cores, type=type)
results <- parLapply(cl, 1:100, sqrt)
stopCluster(cl)
Of course, you could also use mclapply() instead of a “FORK” cluster if you prefer.
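For instance, the equivalent mclapply() version is just (a sketch; as discussed above, this requires a Posix-based system):

library(parallel)
# Same computation as the one-shot cluster example, with no cluster object
results <- mclapply(1:100, sqrt)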
The parallel random number generation support is perhaps the most interesting and important feature of parallel. It uses the ideas of the rlecuyer package, but not the code. To use this new support in the multicore-derived functions, simply set the random number generator to "L'Ecuyer-CMRG" using the RNGkind() function, and leave mc.set.seed set to TRUE:
RNGkind("L'Ecuyer-CMRG") mclapply(1:2, function(i) rnorm(1))
The first time that one of the multicore-derived, high-level functions is called, the parallel random number generator is initialized. Each worker that is started by any high-level function will get a new random number stream. If the mc.reset.stream() function is called, the parallel random number generator is reinitialized using the current seed on the master.
Warning

At the time of this writing, during the development of parallel, mc.reset.stream() does not reset the state of the RNG to the same state as the first time that a high-level function is called. That may change by the time R 2.14.0 is released.

Here’s one way to use mc.reset.stream() to get reproducible random numbers from two calls to mclapply():[42]
> RNGkind("L'Ecuyer-CMRG")
> set.seed(7777442)
> mc.reset.stream()
> unlist(mclapply(1:2, function(i) rnorm(1)))
[1] -2.0043112  0.9315424
> set.seed(7777442)
> mc.reset.stream()
> unlist(mclapply(1:2, function(i) rnorm(1)))
[1] -2.0043112  0.9315424
Note that the second call to set.seed() is not technically necessary in this case, since the state of the master’s RNG hasn’t changed. It would be necessary if any random numbers were generated on the master between the two calls to mc.reset.stream().
If RNGkind("L'Ecuyer-CMRG") isn’t called on the master and mc.set.seed is TRUE, the workers will be randomly seeded after they are started, since .Random.seed will be removed from the global environment if it exists. Thus, as long as you don’t set mc.set.seed to FALSE, your workers should generate different random numbers, but using L’Ecuyer-CMRG for true parallel RNG support is recommended.

As with multicore, I wouldn’t recommend setting mc.set.seed to FALSE unless you’re sure you know what you’re doing.
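To see why L’Ecuyer-CMRG is recommended, here’s a small sketch (the seed value is arbitrary): without RNGkind("L'Ecuyer-CMRG"), setting the seed on the master does not make the workers reproducible, because they are randomly seeded after they are started:

library(parallel)
set.seed(7777442)
r1 <- unlist(mclapply(1:2, function(i) rnorm(1)))
set.seed(7777442)
r2 <- unlist(mclapply(1:2, function(i) rnorm(1)))
identical(r1, r2)   # almost certainly FALSE: the workers were randomly seeded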
To use the new parallel RNG support in the snow-derived functions, use the new clusterSetRNGStream() function. This replaces the clusterSetupRNGstream() function in snow:
> cl <- makeCluster(4, type = "FORK")
> clusterSetRNGStream(cl, 7777442)
> unlist(clusterEvalQ(cl, rnorm(1)))
[1] -0.9360073 -2.0043112  0.9315424 -0.8751129
> clusterSetRNGStream(cl, 7777442)
> unlist(clusterEvalQ(cl, rnorm(1)))
[1] -0.9360073 -2.0043112  0.9315424 -0.8751129
> stopCluster(cl)
Here the seed is specified as an argument to clusterSetRNGStream(), not using set.seed().
The parallel package also includes utility functions to easily advance the seed. The nextRNGStream() function advances a seed to the next stream of 2^127 random numbers, and the nextRNGSubStream() function advances it to the next sub-stream of 2^76 random numbers.

To advance the L’Ecuyer-CMRG RNG to the next sub-stream, simply reassign the .Random.seed variable in the global environment using nextRNGSubStream():
.Random.seed <<- nextRNGSubStream(.Random.seed)
This will fail if RNGkind("L'Ecuyer-CMRG") hasn’t been called, since nextRNGSubStream() requires a L’Ecuyer-CMRG seed.
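These functions can also be used to hand out independent streams by hand, which is essentially what clusterSetRNGStream() does for you. Here’s a minimal sketch that builds one L’Ecuyer-CMRG seed per worker, each starting a new stream (the number of workers and the seed value are arbitrary):

library(parallel)
RNGkind("L'Ecuyer-CMRG")
set.seed(7777442)

# Build one seed per worker; each call to nextRNGStream() advances
# to the start of the next stream of 2^127 random numbers
nworkers <- 4
seeds <- vector("list", nworkers)
seeds[[1]] <- .Random.seed
for (i in 2:nworkers)
    seeds[[i]] <- nextRNGStream(seeds[[i - 1]])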
As of 9/26/2011, here is a summary of the differences between parallel and multicore or snow:
Differences from multicore:

- fork() function renamed to mcfork()
- exit() function renamed to mcexit()
- kill() function renamed to mckill()
- parallel() function renamed to mcparallel(), but the name “parallel” is still exported for compatibility
- collect() function renamed to mccollect(), but the name “collect” is still exported for compatibility
- Different default value of the mc.cores argument
- New mc.allow.recursive argument can prevent recursive calls to mclapply()
- mc.set.seed argument reimplemented using a real parallel RNG
- New mc.reset.stream() function
- cores option renamed to mc.cores

Differences from snow:

- New function clusterSetRNGStream() initializes the parallel RNG
- setDefaultClusterOptions() not exported
- The namespace doesn’t export every function defined in the package
- makeCluster() supports the additional types “FORK” and “PSOCK”
- New cluster options methods and renice when creating a cluster (although renice doesn’t currently work on my Linux machine as of 9/26/2011)
- Cluster option type defaults to “PSOCK”
- Cluster option port can be set via the environment variable “R_PARALLEL_PORT”
- snow.time() function not included
- Timeout implemented using the new socketConnection() timeout argument, which resolves an obscure problem in snow

New functions useful in both sets of functions:

- detectCores() function now exported
- Additional functions for parallel RNG: nextRNGStream() and nextRNGSubStream()
Since it includes the best features of both snow and multicore, parallel is a very versatile package. Its main limitation is in dealing with huge numbers of tasks and very large datasets.
parallel has basically the same gotchas as the snow and multicore packages, except that it does include support for parallel random number generation in the multicore-derived API, and allows recursive calls to mclapply() to be prevented.
The parallel package is an exciting new development in the world of Parallel R. Traditional parallel computing is finally becoming mainstream. But there are other new packages becoming available for R that use a newer parallel programming paradigm: MapReduce. The rest of this book will show you how to take advantage of many of those packages.
[39] This has already been done to a degree with multithreaded math libraries, but this takes another important step forward.
[40] The detectCores() function is in the multicore package, but as of version 0.1-5, it is not exported.
[41] The multicore version of mclapply() uses the option cores. This is another case where parallel adds the “mc.” prefix.
[42] Note that mc.reset.stream() is called before both calls to mclapply(). That was necessary in the development version of R leading up to R 2.14.0, because mclapply() moves to the next RNG stream if the RNG is already initialized. If the first mc.reset.stream() was skipped, the second mclapply() would use a different set of streams than the first. That may be changed in R 2.14.0, but this example will probably still work.