搜 索

Python并行处理学习小记

  • 270阅读
  • 2022年05月22日
  • 0评论
首页 / 科技区 / 正文

“并行”指的是将一个大任务分解成多个子任务,同一时间一起解决的过程。Python 依靠它万金油的特性被各行各业拿去处理各种各样的数据,但是它缓慢的运行速度也历来被人诟病。在数据量较大时,巧妙运用并行处理可以省下海量的时间。

并行的问题包含诸多的概念,细细钻研起来有许多值得探索的内容。但从日常使用与提速这一角度看,我们可以直接借鉴包装好了的方法,而不用太过纠结内部机理。在下文,我总结了一些常见、易用的多进程并行处理方法(指多 CPU 并行,而非“多线程”!),分享了一些自己给代码加速的心得,仅做引子与概述。

常见的并行处理方法

目前,常用且较为简洁的 Python 多进程实现方法主要有以下几种:

multiprocessing.Pool()

Pool即为进程池,可以提供指定数量的进程供用户调用。当有新的请求提交到Pool时,如果池还没有满,那么就会创建一个新的进程用来执行该请求。最简洁的用法是使用map()方法映射单个参数给函数:

from multiprocessing import Pool

def f(x:int):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

在上述方法里使用with关键字是为了避免使用p.close()方法。

如果需要映射多个参数,则只能对包含多个参数的方法进行封装,这个方法的缺点在于传递给函数的参数必须存储在序列里,例如tupledict

from multiprocessing import Pool

def f1(y:tuple):
    return y[0] * y[1]  # 注意参数

def f2(z:dict):
    return z['arg1'] * z['arg2']  # 注意参数

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f1, [(1, 2), (2, 3), (3, 5)]))
    with Pool(5) as p: 
        print(p.map(f2, [('arg': 1, 'arg': 2), ('arg': 3, 'arg': 5)]

上述情况里,进程池里最多能容纳 5 个进程,而实际上只用了 2~3 个。如果请求的进程数大于池所能容纳的最大值,那么该请求就会等待,直到池中有进程结束。apply_async()方法用于不阻塞的异步执行(无需等待当前进程执行完毕),能根据系统调度自动进行进程切换。

from multiprocessing import Pool

def f(x):
    return x

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=5)
    params= ['param1', 'param2', 'param3', 'param4', 'param5']
    for param in params:
        pool.apply_async(f, args=(param, ))
     pool.close()

此外还有Pool.startmap_async()Pool.map_async()方法,可以自行研究,这里不作赘述。

multiprocessing.Process()

Process表示一个子进程中的任务,经常可以见到开启多个Process对象进行并行。一个简单的案例是:

from multiprocessing import Process

def f(name):
    print('Hello', name)

if __name__ == '__main__':
    process_list = []
    for i in range(5):
        p = Process(target=f, args=(i,))
        p.start()
        process_list.append(p)
    for i in process_list:
        p.join()

其中,p.start()表示创建子进程pp.join()表示主线程等待p终止。使用Process的一个常见实例是结合timeout机制进行多进程爬虫。

joblib

joblib里面最常用到Parallel()类和delayed()方法。Parallel用于指定CPU数量,而delayed则用来指定需要执行的函数被并行的参数

官网上的案例如下:

from joblib import Parallel, delayed
from math import sqrt

if __name__ == '__main__':
    Parallel(n_jobs=2)(delayed(sqrt)(i**2) for i in range(10))

concurrent.futures

concurrent.futures常用来实现多线程(ThreadPoolExecutor)和多进程(ProcessPoolExecutor)的异步并发,支持map()方法。这也是一个简洁的方法。

import os
from concurrent.futures import ProcessPoolExecutor

def f(n):
    print("I'm number {} with process id {}".format(n, os.getpid()))

with ProcessPoolExecutor(max_workers=10) as executor:
    executor.map(f, range(10))

偶然发现的小众包——pqdm

pqdm结合了进度条包tqdmconcurrent.futures,由此可以使用多个 CPU 进行高效的多进程并行。这个包是我在学习tqdm时偶然发现的,结果发现它意外地简洁易用。这个包既支持传递多个参数给函数并行,也能够显示进度条,非常好用,强烈推荐。

基本的使用方法如下:

from pqdm.processes import pqdm

args = [
    {'a': 1, 'b': 2},
    {'a': 2, 'b': 3},
    {'a': 3, 'b': 4},
    {'a': 4, 'b': 5}
]

def multiply(a, b):
    return a * b

results = pqdm(args, multiply, n_jobs=3, argument_type='kwargs')
print(results)

如果函数返回的是一个对象,那么可以使用从results中逐个提取返回的对象并进行后续操作。个人认为,这个包的语法与上面几种相比更加精炼,用起来不容易出错。

我自己是在并行分块导出栅格时用了这个包。友情提醒:在遇到栅格读写这种内存密集型操作时,一定要关注内存的占用量。我在我的 MacBook 上开了 6 个核,每个核按np.array的形式读取大小为 12000*12000 的栅格,直接把我的 32GB 内存给占爆了,而调成 5 个核就刚刚好。

关于代码提速的小建议

网上相关的内容很多,我在这儿列一些自己的心得感悟。

1、涉及到数据读取或交换时,使用pickle库。例如,我在自己拯救者电脑上使用geopandas读取一个几十万条记录的 shp 点文件需要约 40s,而我有数 10 个这样的文件。如果以后每次运行程序时都读取一次这该死的 shp 文件会占很多时间。后来发现,我可以一起读取所有的shp文件,然后把它们一起转成pickle格式。同样内容的文件,读取pickle格式只需要 4s,这极大地节约了我的时间。

2、使用del关键字,手动删掉一些数据内存占用大的中间变量。这也有助于在 CPU 上进行更密集的计算。

3、涉及到索引时巧妙使用字典dict,这个是最快的,千万别逐行遍历pandas.DataFrame

4、少写全局变量,保持良好的变量命名习惯!在 Jupyter 下,我以前经常会把函数的参数名与外面代码块的变量名写成相同的值。这样写特别特别容易出错,说多了都是泪。这里的代码提速提的是调试的速。

5、题外话:GPU 加速,也就是常见的 CUDA。Python 与 CUDA 结合的产物是numba库,它既支持 CPU 也支持 GPU。不过目前只能加速 Python 原生函数和部分 numpy 函数。网上有很多“一行代码让你的 python 加速10倍”这样的帖子,用的就是numba下的@jit。这个库我并不太熟,以后有时间再钻研。

参考资料

https://cainiaojiaocheng.com/Python/docs/3.10/library/multiprocessing

https://www.zhangshengrong.com/p/8AaYmbVba2/

https://joblib.readthedocs.io/en/latest/parallel.html

https://pqdm.readthedocs.io/en/latest/usage.html#different-ways-to-pass-arguments-to-function

评论区
暂无评论
avatar