Distributed computing in R with Rmpi


Revision as of 07:34, 9 December 2020

== Load modules ==

Change to the new software stack and load the required modules:

 $ env2lmod
 $ module load gcc/6.3.0 openmpi/4.0.2 r/4.0.2

== Run R in an interactive session ==

Request 5 processors. Rmpi assigns one processor to be the master and the other processors to be workers.

 $ bsub -n 5 -W 02:00 -I bash
 Generic job.
 Job <155200980> is submitted to queue <normal.4h>.
 <<Waiting for dispatch ...>>
 <<Starting on eu-c7-105-05>>

Define the globally available number of processors with the environment variable MPI_UNIVERSE_SIZE:

 $ export MPI_UNIVERSE_SIZE=5

Start R

 $ R
 > 

== Use Rmpi ==

1. Load Rmpi, which calls mpi.initialize()

 > library(Rmpi)

2. Spawn R slaves on the host; nslaves = requested number of processors - 1

 > usize <- as.numeric(Sys.getenv("MPI_UNIVERSE_SIZE"))
 > ns <- usize - 1
 > mpi.spawn.Rslaves(nslaves=ns)

3. Set up a variable array

 > var <- c(11.0, 22.0, 33.0)

4. Root sends state variables and parameters to the other ranks

 > mpi.bcast.data2slave(var, comm = 1, buffunit = 100)

5. Have each rank retrieve and store its own rank number

 > mpi.bcast.cmd(id <- mpi.comm.rank())

6. Check that each rank can access its own value

 > mpi.remote.exec(paste("The variable on rank ",id," is ", var[id]))

7. Root orders the other ranks to calculate

 > mpi.bcast.cmd(output <- var[id]*2)

8. Root orders the other ranks to send their output via mpi.gather() (the type argument 2 selects the double data type)

 > mpi.bcast.cmd(mpi.gather(output, 2, double(1)))

9. Root gathers the output from the other ranks

 > mpi.gather(double(1), 2, double(usize))

10. Close down and quit

 > mpi.close.Rslaves(dellog = FALSE)
 > mpi.quit()
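
For non-interactive use, the interactive steps above can be collected into a single R script and submitted as a batch job. A minimal sketch, assuming the illustrative script name rmpi_test.R and the same module and MPI_UNIVERSE_SIZE setup as above:

```r
# rmpi_test.R -- the interactive steps above as one script (illustrative sketch)
library(Rmpi)                                        # loading Rmpi initializes MPI

usize <- as.numeric(Sys.getenv("MPI_UNIVERSE_SIZE"))
mpi.spawn.Rslaves(nslaves = usize - 1)               # one master + (usize - 1) workers

var <- c(11.0, 22.0, 33.0)
mpi.bcast.data2slave(var, comm = 1, buffunit = 100)  # send the array to the workers
mpi.bcast.cmd(id <- mpi.comm.rank())                 # each worker stores its rank
mpi.bcast.cmd(output <- var[id] * 2)                 # each worker computes its share
mpi.bcast.cmd(mpi.gather(output, 2, double(1)))      # workers join the gather
result <- mpi.gather(double(1), 2, double(usize))    # root collects all results
print(result)

mpi.close.Rslaves(dellog = FALSE)
mpi.quit()
```

The script could then be submitted with, for example, bsub -n 5 -W 02:00 "R --vanilla -f rmpi_test.R" after exporting MPI_UNIVERSE_SIZE=5 as shown above (the exact submission command depends on the cluster setup).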

== Exercise ==

What happens when you replace mpi.bcast.data2slave() with mpi.scatter.Robj() in step 4?