Python multiprocessing

In this example, we show how to launch parallel tasks in Python using the ProcessPoolExecutor class from the concurrent.futures module.

"The concurrent.futures module provides a high-level interface for asynchronously executing callables.

 The asynchronous execution can be performed with threads, using ThreadPoolExecutor, or separate processes, using ProcessPoolExecutor. Both implement the same interface, which is defined by the abstract Executor class."

Source: https://docs.python.org/3/library/concurrent.futures.html
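
Both executors implement the same interface, so switching between threads and processes is a one-line change. Here is a minimal sketch (the square function is only an illustration, not part of the tutorial code):

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def square(x):
    return x * x

if __name__ == '__main__':
    # Same call pattern for both executor types
    with ThreadPoolExecutor(max_workers=2) as ex:
        print(list(ex.map(square, range(5))))  # [0, 1, 4, 9, 16]
    with ProcessPoolExecutor(max_workers=2) as ex:
        print(list(ex.map(square, range(5))))  # same result, in separate processes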

Load modules

Switch to the new software stack

[jarunanp@eu-login-05 ~]$ env2lmod

or, set your default software stack to the new software stack

[jarunanp@eu-login-05 ~]$ set_software_stack.sh new

Load a Python module

[jarunanp@eu-login-05 ~]$ module load gcc/6.3.0 python/3.8.5
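
To verify that the correct interpreter is now active, you can check the version from Python (a quick sanity check, not a required step):

import sys
print(sys.version)  # should start with 3.8.5 after the module is loaded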

Code

Create a project folder

[jarunanp@eu-login-05 ~]$ mkdir python_multiprocessing
[jarunanp@eu-login-05 ~]$ cd python_multiprocessing
[jarunanp@eu-login-05 python_multiprocessing]$

Open a new file named process.py with a text editor and add the following code:

from concurrent.futures import ProcessPoolExecutor
import sys
import time
import numpy as np

def accumulate_sum(v):
    sumv = 0
    for i in v:
        sumv += i
    return sumv

def main():
    n = 50_000_000
    vec = np.random.randint(0, 1000, n)
    # The script requires one input argument: the number of processes
    num_processes = int(sys.argv[1])
    # Split the vector into one equally sized chunk per process
    n_per_process = n // num_processes
    vec_per_process = [vec[i*n_per_process:(i+1)*n_per_process] for i in range(num_processes)]

    # start the stop watch
    start = time.time()

    with ProcessPoolExecutor(max_workers=num_processes) as executor:
        results = executor.map(accumulate_sum, vec_per_process)

    # end the stop watch
    end = time.time()
    print("The accumulated sum is {:3.2e}".format(sum(results)))
    print("Elapsed time: {:3.2f}".format(end - start))

if __name__ == '__main__':
    main()
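
Note that the slicing in main drops the last n mod num_processes elements whenever n is not divisible by num_processes (for the values used here, 1, 2 and 4, it divides evenly). If you need chunking that works for any number of processes, numpy.array_split is an alternative; a short sketch:

import numpy as np

# array_split spreads the remainder over the first chunks instead of dropping it
vec = np.random.randint(0, 1000, 10)
vec_per_process = np.array_split(vec, 3)
print([len(chunk) for chunk in vec_per_process])  # [4, 3, 3]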

Request an interactive session on a compute node

[jarunanp@eu-login-05 python_multiprocessing]$ bsub -n 5 -Is bash
Generic job.
Job <176062929> is submitted to queue <normal.4h>.
<<Waiting for dispatch ...>>
<<Starting on eu-c7-112-13>>
FILE: /sys/fs/cgroup/cpuset/lsf/euler/job.176062929.12449.1624344257/tasks
[jarunanp@eu-c7-112-13 python_multiprocessing]$ 
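
LSF confines the job to the granted cores through the cpuset cgroup shown in the output above. From Python you can check how many cores the job is actually allowed to use (Linux-only, shown here as an optional check):

import os

# Number of CPUs this process may run on; respects the LSF cpuset
print(len(os.sched_getaffinity(0)))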

Launch the Python script with num_processes = 1, 2 and 4

[jarunanp@eu-c7-112-13 python_multiprocessing]$ python process.py 1
The accumulated sum is 2.50e+10
Elapsed time: 14.10
[jarunanp@eu-c7-112-13 python_multiprocessing]$ python process.py 2
The accumulated sum is 2.50e+10
Elapsed time: 7.88
[jarunanp@eu-c7-112-13 python_multiprocessing]$ python process.py 4
The accumulated sum is 2.50e+10
Elapsed time: 4.75


The output shows that increasing the number of processes reduces the runtime: the speedup is about 1.8x for num_processes = 2 and about 3x for num_processes = 4. The scaling is not linear, but the gain in runtime is still significant.
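
As a quick check, the speedup (t_1/t_N) and parallel efficiency (speedup/N) follow directly from the timings above:

# Timings from the runs above, in seconds
t = {1: 14.10, 2: 7.88, 4: 4.75}
for p in (2, 4):
    speedup = t[1] / t[p]
    print(f"{p} processes: speedup {speedup:.2f}x, efficiency {speedup / p:.0%}")
# 2 processes: speedup 1.79x, efficiency 89%
# 4 processes: speedup 2.97x, efficiency 74%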

< Examples