Difference between revisions of "Python multiprocessing"
From ScientificComputing
Line 11: | Line 11: | ||
== Load modules == | == Load modules == | ||
Switch to the new software stack | Switch to the new software stack | ||
− | $ env2lmod | + | |
+ | [jarunanp@eu-login-05 ~]$ env2lmod | ||
or, set your default software stack to the new software stack | or, set your default software stack to the new software stack | ||
− | $ set_software_stack.sh new | + | [jarunanp@eu-login-05 ~]$ set_software_stack.sh new |
Load a Python module | Load a Python module | ||
+ | |||
+ | [jarunanp@eu-login-05 ~]$ module load gcc/6.3.0 python/3.8.5 | ||
− | $ | + | == Code == |
+ | Create a project folder | ||
+ | [jarunanp@eu-login-05 ~]$ mkdir python_multiprocessing | ||
+ | [jarunanp@eu-login-05 ~]$ cd python_multiprocessing | ||
+ | [jarunanp@eu-login-05 python_multiprocessing]$ | ||
− | |||
Open a new file named ''process.py'' with a text editor and add the following code: | Open a new file named ''process.py'' with a text editor and add the following code: | ||
from concurrent.futures import ProcessPoolExecutor | from concurrent.futures import ProcessPoolExecutor | ||
+ | import sys | ||
+ | import time | ||
+ | import numpy as np | ||
− | def accumulate_sum( | + | def accumulate_sum(v): |
− | + | sumv = 0 | |
− | for i in | + | for i in v: |
− | + | sumv += i | |
− | return | + | return sumv |
− | |||
− | |||
+ | def main(): | ||
n = 50_000_000 | n = 50_000_000 | ||
− | num_processes = 1 | + | vec = np.random.randint(0,1000,n) |
− | n_per_process = | + | # The script requires an input argument which is the number of processes to execute the program |
+ | num_processes = int(sys.argv[1]) | ||
+ | n_per_process = int(n/num_processes) | ||
+ | vec_per_process = [vec[i*n_per_process:(i+1)*n_per_process] for i in range(num_processes)] | ||
+ | |||
+ | # start the stop watch | ||
+ | start = time.time() | ||
with ProcessPoolExecutor(max_workers=num_processes) as executor: | with ProcessPoolExecutor(max_workers=num_processes) as executor: | ||
− | results=executor.map(accumulate_sum, | + | results=executor.map(accumulate_sum, vec_per_process) |
− | print("The accumulated sum is {}".format(sum(results))) | + | # end the stop watch |
+ | end = time.time() | ||
+ | |||
+ | print("The accumulated sum is {:3.2e}".format(sum(results))) | ||
+ | print("Elasped time: {:3.2f}".format(end-start)) | ||
if __name__ == '__main__': | if __name__ == '__main__': | ||
Line 47: | Line 65: | ||
== Request an interactive session on a compute node == | == Request an interactive session on a compute node == | ||
− | + | [jarunanp@eu-login-05 python_multiprocessing]$ bsub -n 5 -Is bash | |
− | [jarunanp@eu-login- | ||
Generic job. | Generic job. | ||
− | Job < | + | Job <176062929> is submitted to queue <normal.4h>. |
<<Waiting for dispatch ...>> | <<Waiting for dispatch ...>> | ||
− | <<Starting on eu- | + | <<Starting on eu-c7-112-13>> |
− | FILE: /sys/fs/cgroup/cpuset/lsf/euler/job. | + | FILE: /sys/fs/cgroup/cpuset/lsf/euler/job.176062929.12449.1624344257/tasks |
− | [jarunanp@eu- | + | [jarunanp@eu-c7-112-13 python_multiprocessing]$ |
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | + | Launch the Python script with num_processes = 1, 2 and 4 | |
− | |||
− | |||
− | |||
− | + | [jarunanp@eu-c7-112-13 python_multiprocessing]$ python process.py 1 | |
+ | The accumulated sum is 2.50e+10 | ||
+ | Elasped time: 14.10 | ||
− | + | [jarunanp@eu-c7-112-13 python_multiprocessing]$ python process.py 2 | |
+ | The accumulated sum is 2.50e+10 | ||
+ | Elasped time: 7.88 | ||
− | [jarunanp@eu- | + | [jarunanp@eu-c7-112-13 python_multiprocessing]$ python process.py 4 |
− | The accumulated sum is | + | The accumulated sum is 2.50e+10 |
− | + | Elasped time: 4.75 | |
− | |||
− | |||
− | |||
− | |||
− | + | From the output, increasing number of processes reduced the runtime to execute the operations. The speedup was around 2 and 3 times for num_processes = 2 and 4, respectively. It is not linear but we could gain a significant factor of runtime. | |
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
{{back_to_tutorials}} | {{back_to_tutorials}} |
Revision as of 07:02, 22 June 2021
< Examples |
In this example we show how to launch parallel tasks in Python by using ProcessPoolExecutor in the concurrent.futures module.
"The concurrent.futures module provides a high-level interface for asynchronously executing callables. The asynchronous execution can be performed with threads, using ThreadPoolExecutor, or separate processes, using ProcessPoolExecutor. Both implement the same interface, which is defined by the abstract Executor class."
Source: https://docs.python.org/3/library/concurrent.futures.html
Load modules
Switch to the new software stack
[jarunanp@eu-login-05 ~]$ env2lmod
or, set your default software stack to the new software stack
[jarunanp@eu-login-05 ~]$ set_software_stack.sh new
Load a Python module
[jarunanp@eu-login-05 ~]$ module load gcc/6.3.0 python/3.8.5
Code
Create a project folder
[jarunanp@eu-login-05 ~]$ mkdir python_multiprocessing [jarunanp@eu-login-05 ~]$ cd python_multiprocessing [jarunanp@eu-login-05 python_multiprocessing]$
Open a new file named process.py with a text editor and add the following code:
from concurrent.futures import ProcessPoolExecutor import sys import time import numpy as np def accumulate_sum(v): sumv = 0 for i in v: sumv += i return sumv def main(): n = 50_000_000 vec = np.random.randint(0,1000,n) # The script requires an input argument which is the number of processes to execute the program num_processes = int(sys.argv[1]) n_per_process = int(n/num_processes) vec_per_process = [vec[i*n_per_process:(i+1)*n_per_process] for i in range(num_processes)] # start the stop watch start = time.time() with ProcessPoolExecutor(max_workers=num_processes) as executor: results=executor.map(accumulate_sum, vec_per_process) # end the stop watch end = time.time()
print("The accumulated sum is {:3.2e}".format(sum(results))) print("Elasped time: {:3.2f}".format(end-start)) if __name__ == '__main__': main()
Request an interactive session on a compute node
[jarunanp@eu-login-05 python_multiprocessing]$ bsub -n 5 -Is bash Generic job. Job <176062929> is submitted to queue <normal.4h>. <<Waiting for dispatch ...>> <<Starting on eu-c7-112-13>> FILE: /sys/fs/cgroup/cpuset/lsf/euler/job.176062929.12449.1624344257/tasks [jarunanp@eu-c7-112-13 python_multiprocessing]$
Launch the Python script with num_processes = 1, 2 and 4
[jarunanp@eu-c7-112-13 python_multiprocessing]$ python process.py 1 The accumulated sum is 2.50e+10 Elasped time: 14.10
[jarunanp@eu-c7-112-13 python_multiprocessing]$ python process.py 2 The accumulated sum is 2.50e+10 Elasped time: 7.88
[jarunanp@eu-c7-112-13 python_multiprocessing]$ python process.py 4 The accumulated sum is 2.50e+10 Elasped time: 4.75
From the output, increasing number of processes reduced the runtime to execute the operations. The speedup was around 2 and 3 times for num_processes = 2 and 4, respectively. It is not linear but we could gain a significant factor of runtime.
< Examples |