Python multiprocessing

Latest revision as of 12:20, 12 October 2021

< Examples

In this example, we show how to launch parallel tasks in Python using the ProcessPoolExecutor from the concurrent.futures module.

"The concurrent.futures module provides a high-level interface for asynchronously executing callables.

 The asynchronous execution can be performed with threads, using ThreadPoolExecutor, or separate processes, using ProcessPoolExecutor. Both implement the same interface, which is defined by the abstract Executor class."

Source: https://docs.python.org/3/library/concurrent.futures.html
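Because both executor classes implement the same abstract Executor interface, switching between threads and processes is a one-line change. A minimal sketch illustrating this (the function square and the worker counts here are illustrative, not part of the example below):

```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def square(x):
    return x * x

if __name__ == '__main__':
    # Both executors expose the same interface (map, submit, shutdown),
    # so swapping one for the other changes only the class name.
    with ThreadPoolExecutor(max_workers=2) as executor:
        print(list(executor.map(square, range(5))))   # threads: [0, 1, 4, 9, 16]

    with ProcessPoolExecutor(max_workers=2) as executor:
        print(list(executor.map(square, range(5))))   # processes: [0, 1, 4, 9, 16]
```

For CPU-bound work such as the summation in this example, ProcessPoolExecutor is the right choice, since separate processes are not limited by Python's global interpreter lock.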

Load modules

Switch to the new software stack

[jarunanp@eu-login-05 ~]$ env2lmod

or, set your default software stack to the new software stack

[jarunanp@eu-login-05 ~]$ set_software_stack.sh new

Load a Python module

[jarunanp@eu-login-05 ~]$ module load gcc/6.3.0 python/3.8.5

Code

Create a project folder

[jarunanp@eu-login-05 ~]$ mkdir python_multiprocessing
[jarunanp@eu-login-05 ~]$ cd python_multiprocessing
[jarunanp@eu-login-05 python_multiprocessing]$

We use ProcessPoolExecutor from the concurrent.futures module, which is called in the code as follows:

from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor(max_workers=num_processes) as executor:
    results = executor.map(function, input_argument)
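Note that executor.map returns results in the order of the inputs. If you prefer to consume results as soon as each task finishes, concurrent.futures also provides submit together with as_completed. A small sketch under that alternative (the function name work is illustrative):

```python
from concurrent.futures import ProcessPoolExecutor, as_completed

def work(x):
    return x * x

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=2) as executor:
        # submit returns a Future immediately; as_completed yields
        # futures in the order they finish, not the order submitted.
        futures = [executor.submit(work, x) for x in range(4)]
        total = sum(f.result() for f in as_completed(futures))
        print(total)  # 0 + 1 + 4 + 9 = 14
```

For the summation example below, map is sufficient because all partial results are needed before the final sum anyway.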

Here is a code example. Open a new file named process.py with a text editor and add the code below. The code generates a vector of 50 million random integers and calculates their sum. The number of worker processes is given as an input argument.

import sys
import time
import numpy as np
from concurrent.futures import ProcessPoolExecutor

def accumulate_sum(v):
    sumv = 0
    for i in v:
        sumv += i
    return sumv

def main(): 
    n = 50_000_000
    vec = np.random.randint(0,1000,n)
    # The script requires an input argument which is the number of processes to execute the program
    num_processes = int(sys.argv[1])
    n_per_process = n // num_processes
    vec_per_process = [vec[i*n_per_process:(i+1)*n_per_process] for i in range(num_processes)]
   
    # start the stop watch
    start = time.time()

    with ProcessPoolExecutor(max_workers=num_processes) as executor:
        results = executor.map(accumulate_sum, vec_per_process)

    # end the stop watch
    end = time.time()

    print("The accumulated sum is {:3.2e}".format(sum(results)))
    print("Elapsed time: {:3.2f}s".format(end-start))
 
if __name__ == '__main__':
    main()
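One caveat in the chunking above: if n is not divisible by num_processes, the trailing n mod num_processes elements are silently dropped. A sketch of a safer split using numpy.array_split, which distributes the remainder over the chunks (the small n here is just for illustration):

```python
import numpy as np

n = 10
num_processes = 3
vec = np.arange(n)

# array_split tolerates uneven division: here it yields
# chunks of sizes 4, 3 and 3 instead of dropping elements.
chunks = np.array_split(vec, num_processes)
print([len(c) for c in chunks])        # [4, 3, 3]
print(sum(int(c.sum()) for c in chunks))  # 45, nothing lost
```

In the example above this does not matter, because 50,000,000 is divisible by 1, 2 and 4, but it is worth keeping in mind for other sizes.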

Request an interactive session on a compute node

[jarunanp@eu-login-05 python_multiprocessing]$ bsub -n 5 -Is bash
Generic job.
Job <176062929> is submitted to queue <normal.4h>.
<<Waiting for dispatch ...>>
<<Starting on eu-c7-112-13>>
FILE: /sys/fs/cgroup/cpuset/lsf/euler/job.176062929.12449.1624344257/tasks
[jarunanp@eu-c7-112-13 python_multiprocessing]$ 

Launch the Python script with num_processes = 1, 2 and 4

[jarunanp@eu-c7-112-13 python_multiprocessing]$ python process.py 1
The accumulated sum is 2.50e+10
Elapsed time: 14.10s
[jarunanp@eu-c7-112-13 python_multiprocessing]$ python process.py 2
The accumulated sum is 2.50e+10
Elapsed time: 7.88s
[jarunanp@eu-c7-112-13 python_multiprocessing]$ python process.py 4
The accumulated sum is 2.50e+10
Elapsed time: 4.75s


From the output, increasing the number of processes reduces the runtime. The speedup is roughly 1.8x for num_processes = 2 and 3x for num_processes = 4. The scaling is not linear, but we still gain a significant reduction in runtime.
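The speedup and parallel efficiency quoted above can be computed directly from the measured timings:

```python
# Measured elapsed times from the runs above (seconds)
times = {1: 14.10, 2: 7.88, 4: 4.75}

for p, t in times.items():
    speedup = times[1] / t        # relative to the single-process run
    efficiency = speedup / p      # how well each extra process is used
    print(f"{p} processes: speedup {speedup:.2f}x, efficiency {efficiency:.0%}")
```

The drop in efficiency with more processes is expected: splitting the vector, starting worker processes and sending the chunks to them all add overhead that the single-process run does not pay.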
