Chapter 4. parallel

A new parallel programming package named parallel will be included in R 2.14.0, tentatively scheduled for release on October 31, 2011. It is derived from the snow and multicore packages, providing many of the same functions as those packages. Some of the functions derived from multicore have been renamed by adding the prefix “mc”, and some of the arguments to mclapply() have been changed a bit, but if you have read the snow and multicore chapters of this book, you will have very little difficulty learning to use parallel.

This is an exciting development, since it makes parallel computing in R more mainstream. Hopefully the parallel package will be used by other standard packages, giving many more users the benefit of parallel computing, perhaps without their knowing that they’re using it.[39]

An important feature of parallel is its integration with the new L’Ecuyer-CMRG random number generator (RNG), also new in R 2.14.0. The seed of this generator can be easily advanced a given number of steps, making it very useful as a parallel RNG. This is accomplished using the same concepts used in the rlecuyer package, but it is a completely new implementation, so parallel has no dependency on the rlecuyer package itself.

In particular, the multicore derived functions in parallel now have true parallel RNG support, solving the biggest “gotcha” in the multicore package.

Warning

This chapter was written using an experimental version of the parallel package using the development version of R 2.14.0. Officially, anything in the package can change or be removed without notice until October 2011, which is just after the “all-in” date for this book. However, this is such an important package for parallel computing with R that I really wanted to include it in this book.

Quick Look

Motivation: You have an R script that spends two days executing a function using lapply() on your laptop.

Solution: Replace lapply() with the mclapply() function from the parallel package, and consider using parLapply() if you have a cluster handy.

Good because: It comes built in as of R 2.14.0, and there isn’t much to learn if you’ve used snow or multicore before.

How It Works

Note

Since the parallel package has so much in common with the snow and multicore packages, I don’t want to repeat all of the material that I just covered in the last two chapters. Instead, I assume that you’ve either read the snow and multicore chapters of this book, or are already reasonably familiar with those packages.

parallel can be used to run on Posix-based multicore systems using functions such as mclapply() and mcparallel() that were derived from the multicore package. But parallel can also be used with a “PSOCK” cluster and functions such as parLapply() and clusterApplyLB() to execute on multicore Windows systems, as well as Linux clusters. It can also be used with cluster objects that were created using snow, making it possible to use parallel with MPI as the transport.

In other words, it addresses essentially everything addressed by the snow and multicore packages.

Setting Up

This is the real beauty of parallel. If you’re using R 2.14.0 or later, it’s already installed: you don’t need to install any additional packages unless you want to use the MPI, PVM, or NetWorkSpaces transports.

If you have any doubts, you can try loading it:

library(parallel)

If this fails, you should check the version of R that you’re using with:

R.version.string

You need to have R 2.14.0 or later to use parallel.
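If you want a script to check this for itself, here is a minimal sketch that compares the running version against 2.14.0 before loading the package. The error message wording is my own; getRversion() and detectCores() are the only calls it relies on.

```r
# Check the running R version before relying on the parallel package.
if (getRversion() >= "2.14.0") {
  library(parallel)
  message("parallel loaded; cores detected: ", detectCores())
} else {
  stop("parallel requires R 2.14.0 or later; found ", R.version.string)
}
```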

Working with It

Getting Started

If you’re using a Posix-based system, such as Linux or Mac OS X, you can use the multicore derived functions, such as mclapply(). The mclapply() function is basically the same as the version in the multicore package, except that a couple of the arguments work slightly differently. For example, the mc.cores argument doesn’t automatically detect the number of cores in the machine. However, the parallel package does include a function to do that, called detectCores().[40]

Here’s the parallel K-Means example for the parallel package using mclapply(). It is very similar to the version in the multicore chapter, except that it loads parallel, uses detectCores() to specify the value of the mc.cores argument, and uses the parallel RNG as a bonus:

library(parallel)
library(MASS)
RNGkind("L'Ecuyer-CMRG")
mc.cores <- detectCores()
results <- mclapply(rep(25, 4),
                    function(nstart) kmeans(Boston, 4, nstart=nstart),
                    mc.cores=mc.cores)
i <- sapply(results, function(result) result$tot.withinss)
result <- results[[which.min(i)]]

We’ll discuss the use of RNGkind("L'Ecuyer-CMRG") in Parallel Random Number Generation.

The default value of the mc.cores argument is getOption("mc.cores", 2L),[41] so you might want to add the following line to the beginning of your scripts when converting from multicore to parallel:

options(mc.cores=detectCores())

Then mclapply() and pvec() will work more like they do in multicore.
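One caveat is that detectCores() can return NA when it cannot determine the number of cores. Here is a slightly more defensive sketch of the same idea; the fallback value of 2 is my assumption, chosen to match the default of mc.cores.

```r
library(parallel)

# detectCores() may return NA when the core count cannot be determined,
# so fall back to 2, the default that mclapply() would use anyway.
n <- detectCores()
if (is.na(n)) n <- 2L
options(mc.cores = n)

# Functions that take an mc.cores argument now pick up this value by default.
getOption("mc.cores", 2L)
```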

If you’re using Windows, you need to use the snow derived API in parallel. The following parallel K-Means example works on any platform supported by the parallel package:

library(parallel)
cl <- makeCluster(detectCores())
clusterSetRNGStream(cl)
clusterEvalQ(cl, library(MASS))
results <- clusterApply(cl, rep(25, 4), function(nstart) kmeans(Boston, 4, 
    nstart=nstart))
i <- sapply(results, function(result) result$tot.withinss)
result <- results[[which.min(i)]]
stopCluster(cl)

This is very similar to the K-Means example in the snow chapter. The difference is in loading parallel, creating the cluster object, and enabling parallel random number generation. As with snow, we use the makeCluster() function, but in parallel, the type argument doesn’t need to be specified. We’ll discuss the parallel version of makeCluster() in more depth in the next section, and parallel random number generation in Parallel Random Number Generation.

Creating Clusters with makeCluster

If you’re running on Windows or a Linux cluster, you can’t use multicore derived functions such as mclapply() and pvec(). Instead you’ll need to use snow derived functions such as parLapply() and clusterApplyLB(). The first argument to these functions is a cluster object, so before you can use one of these functions, you’ll have to create a cluster object.

The parallel package comes with two transports: “PSOCK” and “FORK”. The “PSOCK” transport is a streamlined version of snow’s “SOCK” transport. It starts workers using the Rscript command, and communicates between the master and workers using socket connections.

As in snow, the makeCluster() function creates a cluster object. The default value of the type argument is “PSOCK”, so we can create a “PSOCK” cluster with four local workers using the command:

cl <- makeCluster(4)

It’s often useful to specify the cluster size using the detectCores() function:

cl <- makeCluster(detectCores())

If you have ssh installed, you can specify a list of machines for the first argument:

cl <- makeCluster(c("n1", "n2", "n3", "n4"))

Note that this is nearly identical to the way that socket clusters are created in snow, except that we never need to specify the type argument.
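As a quick illustration of using such a cluster, here is a minimal sketch that runs parLapply() and clusterApplyLB() on two local “PSOCK” workers. The worker count and the toy functions are arbitrary choices for the example.

```r
library(parallel)

cl <- makeCluster(2)   # two local "PSOCK" workers

# parLapply() splits the input into one chunk per worker.
squares <- parLapply(cl, 1:8, function(x) x^2)

# clusterApplyLB() hands out one task at a time, which helps when task
# times vary a lot.
cubes <- clusterApplyLB(cl, 1:8, function(x) x^3)

stopCluster(cl)
```

Both calls return a list with one element per input value, just as lapply() would.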

The “FORK” transport starts workers using the mcfork() function, and communicates between the master and workers using socket connections.

To create a “FORK” cluster, use makeCluster() with type set to “FORK”:

cl <- makeCluster(4, type="FORK")

You cannot start workers on remote machines with a “FORK” cluster, since mcfork() is built on the fork() system call, which only creates processes on the local machine. Also, “FORK” clusters are only supported on Posix-based systems, not Windows, since fork() is a Posix system call.

An interesting feature of “FORK” clusters is that the workers inherit the data and environment of the master process. This is like the workers that are automatically started by mclapply(), but unlike the workers started in a “PSOCK” cluster. That can be useful, but it’s important to remember that a “FORK” cluster is persistent, like a “PSOCK” cluster, and unlike the workers started by mclapply(). Thus, variables created on the master after creating the “FORK” cluster will not magically appear on the workers, as they do with mclapply(). To emulate the behaviour of mclapply(), you would always have to create a new “FORK” cluster immediately before calling parLapply(), for example. But since that won’t work with any other type of cluster object, you should probably just use mclapply().
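The persistence of a “FORK” cluster can be seen with a small experiment. This is only a sketch: the variable names x_before and x_after are made up for the illustration, and I assign them explicitly in the global environment because that is where clusterEvalQ() evaluates its expression on the workers. It is guarded so that it only runs on Posix-based systems.

```r
library(parallel)

if (.Platform$OS.type == "unix") {
  # Hypothetical variables, created in the master's global environment.
  assign("x_before", "defined before the fork", envir = .GlobalEnv)
  cl <- makeCluster(2, type = "FORK")
  assign("x_after", "defined after the fork", envir = .GlobalEnv)

  # Each worker reports whether it can see the two variables.
  seen_before <- unlist(clusterEvalQ(cl, exists("x_before")))
  seen_after  <- unlist(clusterEvalQ(cl, exists("x_after")))
  stopCluster(cl)

  print(seen_before)   # workers inherited x_before when they were forked
  print(seen_after)    # x_after was created too late to be inherited
}
```

If you need x_after on the workers of an existing cluster, you would have to send it explicitly, for example with clusterExport().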

Since “FORK” clusters can be created quickly, they can be useful when parallelizing lapply() operations that are deep in some package, but you don’t want to use a global variable or add an argument to dozens of functions in order to pass the cluster object to the appropriate function. In that case, you can just create the cluster object right where you need it, and shut it down afterwards. Here’s one way that you could create and use a one shot cluster object with parallel that would be about as fast as using mclapply() on a Posix-based system, but would also work on Windows:

type <- if (exists("mcfork", mode="function")) "FORK" else "PSOCK"
cores <- getOption("mc.cores", detectCores())
cl <- makeCluster(cores, type=type)
results <- parLapply(cl, 1:100, sqrt)
stopCluster(cl)

Of course, you could also use mclapply() instead of a “FORK” cluster if you prefer.
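If you use the one shot approach in several places, it may be worth wrapping it in a function. The following is a sketch, not part of parallel: one_shot_lapply() is a hypothetical helper name, it picks the cluster type by testing .Platform$OS.type rather than looking for mcfork(), and it defaults to two workers if the mc.cores option isn’t set.

```r
library(parallel)

# Hypothetical helper: run one parallel lapply() on a short-lived cluster.
one_shot_lapply <- function(X, FUN, ..., cores = getOption("mc.cores", 2L)) {
  type <- if (.Platform$OS.type == "unix") "FORK" else "PSOCK"
  cl <- makeCluster(cores, type = type)
  on.exit(stopCluster(cl))   # shut the cluster down even if FUN fails
  parLapply(cl, X, FUN, ...)
}

results <- one_shot_lapply(1:100, sqrt)
```

Using on.exit() means that the workers are cleaned up even when the worker function throws an error, which is easy to forget when the cluster is created deep inside a package.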

Parallel Random Number Generation

The parallel random number generation support is perhaps the most interesting and important feature of parallel. It uses the ideas of the rlecuyer package, but not the code.

To use this new support in the multicore derived functions, simply set the random number generator to "L'Ecuyer-CMRG" using the RNGkind() function, and leave mc.set.seed set to TRUE:

RNGkind("L'Ecuyer-CMRG")
mclapply(1:2, function(i) rnorm(1))

The first time that one of the multicore derived, high-level functions is called, the parallel random number generator is initialized. Each worker that is started by any high-level function will get a new random number stream. If the mc.reset.stream() function is called, the parallel random number generator is reinitialized using the current seed on the master.

Warning

At the time of this writing, during the development of parallel, mc.reset.stream() does not reset the state of the RNG to the same state as the first time that a high-level function is called. That may change by the time R 2.14.0 is released.

Here’s one way to use mc.reset.stream() to get reproducible random numbers from two calls to mclapply():[42]

> RNGkind("L'Ecuyer-CMRG")
> set.seed(7777442)
> mc.reset.stream()
> unlist(mclapply(1:2, function(i) rnorm(1)))
[1] -2.0043112  0.9315424
> set.seed(7777442)
> mc.reset.stream()
> unlist(mclapply(1:2, function(i) rnorm(1)))
[1] -2.0043112  0.9315424

Note that the second call to set.seed() is not technically necessary in this case, since the state of the master’s RNG hasn’t changed. It would be necessary if any random numbers were generated on the master between the two calls to mc.reset.stream().

If RNGkind("L'Ecuyer-CMRG") isn’t called on the master and mc.set.seed is TRUE, the workers will be randomly seeded after they are started since .Random.seed will be removed from the global environment if it exists. Thus, as long as you don’t set mc.set.seed to FALSE, your workers should generate different random numbers, but using L’Ecuyer-CMRG for true parallel RNG support is recommended.

As with multicore, I wouldn’t recommend setting mc.set.seed to FALSE unless you’re sure you know what you’re doing.

To use the new parallel RNG support in the snow derived functions, use the new clusterSetRNGStream() function. This replaces the clusterSetupRNGstream() function in snow:

> cl <- makeCluster(4, type = "FORK")
> clusterSetRNGStream(cl, 7777442)
> unlist(clusterEvalQ(cl, rnorm(1)))
[1] -0.9360073 -2.0043112  0.9315424 -0.8751129
> clusterSetRNGStream(cl, 7777442)
> unlist(clusterEvalQ(cl, rnorm(1)))
[1] -0.9360073 -2.0043112  0.9315424 -0.8751129
> stopCluster(cl)

Here the seed is specified as an argument to clusterSetRNGStream(), not using set.seed().

The parallel package also includes utility functions to easily advance the seed. The nextRNGStream() function advances a seed to the next stream of 2^127 random numbers, and the nextRNGSubStream() function advances it to the next sub-stream of 2^76 random numbers.

To advance the L’Ecuyer-CMRG RNG to the next sub-stream, simply reassign the .Random.seed variable in the global environment using nextRNGSubStream():

.Random.seed <<- nextRNGSubStream(.Random.seed)

This will fail if RNGkind("L'Ecuyer-CMRG") hasn’t been called, since nextRNGSubStream() requires a L’Ecuyer-CMRG seed.
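The same functions can be used to hand out a separate stream to each task yourself. Here is a minimal sketch using nextRNGStream(); the number of tasks and the seed value are arbitrary, and the tasks are run sequentially with lapply() just to keep the example small.

```r
library(parallel)

RNGkind("L'Ecuyer-CMRG")
set.seed(7777442)

# Build one seed per task, each one a full stream ahead of the previous one.
n_tasks <- 4
seeds <- vector("list", n_tasks)
seeds[[1]] <- .Random.seed
for (i in seq_len(n_tasks - 1)) {
  seeds[[i + 1]] <- nextRNGStream(seeds[[i]])
}

# Each task installs its own seed before drawing any random numbers.
draws <- lapply(seeds, function(seed) {
  assign(".Random.seed", seed, envir = .GlobalEnv)
  rnorm(2)
})
```

In a real program, each seed would be sent to a different worker along with its task, so that the results don’t depend on which worker happens to execute which task.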

Summary of Differences

As of 9/26/2011, here is a summary of the differences between parallel and multicore or snow:

Differences from multicore

  • fork() function renamed to mcfork()

  • exit() function renamed to mcexit()

  • kill() function renamed to mckill()

  • parallel() function renamed to mcparallel(), but the name “parallel” is still exported for compatibility

  • collect() function renamed to mccollect(), but the name “collect” is still exported for compatibility

  • Different default value of mc.cores argument

  • New mc.allow.recursive argument can prevent recursive calls to mclapply()

  • mc.set.seed argument reimplemented using a real parallel RNG

  • New mc.reset.stream() function

  • cores option renamed to mc.cores

Differences from snow

  • New function clusterSetRNGStream() initializes parallel RNG

  • setDefaultClusterOptions() not exported

  • The namespace doesn’t export every defined function in the package

  • makeCluster() supports additional types “FORK” and “PSOCK”

  • New cluster options methods and renice when creating a cluster (although renice doesn’t currently work on my Linux machine as of 9/26/2011).

  • Cluster option type defaults to “PSOCK”

  • Cluster option port can be set via the environment variable “R_PARALLEL_PORT”

  • snow.time() function not included

  • Timeout implemented using new socketConnection() timeout argument, which resolves obscure problem in snow

New functions useful in both sets of functions

  • detectCores() function now exported

  • Additional functions for parallel RNG: nextRNGStream(), nextRNGSubStream()

When It Works…

Since it includes the best features of both snow and multicore, parallel is a very versatile package. Its main limitation is in dealing with huge numbers of tasks and very large datasets.

…And When It Doesn’t

parallel has basically the same gotchas as the snow and multicore packages, except that it does include support for parallel random number generation in the multicore-derived API, and allows recursive calls to mclapply() to be prevented.

The Wrap-up

The parallel package is an exciting new development in the world of Parallel R. Traditional parallel computing is finally becoming mainstream. But there are other new packages becoming available for R that use a newer parallel programming paradigm: MapReduce. The rest of this book will show you how to take advantage of many of those packages.



[39] This has already been done to a degree with multithreaded math libraries, but this takes another important step forward.

[40] The detectCores() function is in the multicore package, but as of version 0.1-5, is not exported.

[41] The multicore version of mclapply() uses the option cores. This is another case where parallel adds the “mc.” prefix.

[42] Note that mc.reset.stream() is called before both calls to mclapply(). That was necessary in the development version of R leading up to R 2.14.0, because mclapply() moves to the next RNG stream if the RNG is already initialized. If the first mc.reset.stream() was skipped, the second mclapply() would use a different set of streams than the first. That may be changed in R 2.14.0, but this example will probably still work.
