前面在《单线程、多线程和协程的爬虫性能对比》一文中已经介绍过,协程和多线程。本文再继续介绍多进程的使用方法,并在文末将上次的爬虫改造成多进程。
相关概念
并发和并行的区别
在 Python 中,并发并不是指同一时刻有多个操作同时进行。相反,某个特定的时刻,它只允许有一个操作发生,只不过线程 / 任务之间会互相切换,直到完成。
并行(multi-processing)指的才是同一时刻、同时发生。比如你的电脑是 6 核处理器,那么同时开 6 个进程执行程序就可以加快运行速度。
适用场景:
- 并发通常应用于 I/O 操作频繁的场景,比如下载多个文件,I/O 操作的时间远远高于 CPU 运行处理的时间。
- 并行则更多应用于更消耗 CPU 的场景,比如复杂的3D渲染算法,将渲染任务分配到多个CPU并行运行可以提升渲染速度。
进程,是计算机用来描述程序运行状态的名词。一个进程在运行时需要消耗一定的资源,包括 CPU 的时间、内存、设备 I/O 等。
Python 的GIL全局解释器锁
Python 的并发是通过多线程的切换完成的,主程序在同一时刻只允许有一个线程执行。
事实上,Python 的解释器并不是线程安全的,为了解决由此带来的 race condition 等问题,Python 便引入了全局解释器锁,也就是同一时刻,只允许一个线程执行。当然,在执行 I/O 操作时,如果一个线程被 block 了,全局解释器锁便会被释放,从而让另一个线程能够继续执行。
我们使用的Python解释器通常都是CPython 解释器,CPython 使用引用计数来管理内存,所有对象实例,都会有一个引用计数,来记录有多少个指针指向它。当引用计数只有 0 时,则会自动释放内存。
GIL(Global Interpreter Lock),即全局解释器锁。本质上是类似操作系统的 Mutex。每一个 Python 线程,在 CPython 解释器中执行时,都会先锁住自己的线程,阻止别的线程执行。
如果没有GIL,有两个线程同时引用对象 a,就会造成引用计数可能最终只增加 1,这时第一个线程结束时,会把引用计数减少 1,第二个线程再试图访问 a 时就找不到有效的内存了。
总之,CPython 引进 GIL 的原因有:
- CPython 主要使用 C 语言库,都不是原生线程安全的
- 为了规避复杂的引用计数竞争风险问题
GIL 的工作机制:
看下图,Thread 1、2、3 轮流执行,每一个线程在开始执行时,都会获取GIL锁锁住,执行一段后,会释放 GIL,以允许别的线程开始利用资源。
如果 Python 线程锁住 GIL后不去释放 GIL,那别的线程就都没有运行的机会。因此CPython 解释器会去轮询检查线程 GIL 的锁住情况,强制当前线程去释放 GIL,这个机制叫check_interval
。
GIL本身只是为了方便 CPython 解释器层面的编写者,但由于check_interval这样的抢占机制的存在,python的多线程并不能保证应用程序层面的线程安全,所有必要的时候还是需要lock 等工具,来确保线程安全。
绕过 GIL 的两种思路:
- 使用 JPython(Java 实现的 Python 解释器);
- 把关键性能代码,放到C/C++等别的语言中实现。
Python中如何实现并行计算
由于默认Cpython解释器中GIL的存在,要做Python实现并行计算,除了更换解释器外,就只能使用多进程库。
在Python中多进程库有Futures和multiprocessing。下面分别介绍一下:
Futures 模块
位于 concurrent.futures 中,表示带有延迟的操作。Futures 会将处于等待状态的操作包裹起来放到队列中,这些操作的状态随时可以查询,当然,它们的结果或是异常,也能够在操作完成后被获取。
**如何使用Futures 模块的多进程呢?**答案是使用concurrent.futures.ProcessPoolExecutor。
对于Futures 中的 Executor 类,executer.map(f,iters)方法表示对iters中的每个元素并发执行f方法,直到完成后返回结果。
比如我们需要分别计算1-1000的平方并最终求和:
from concurrent.futures import ProcessPoolExecutor
# # 用于计算平方
def f(x):
return x * x
if __name__ == '__main__':
with ProcessPoolExecutor() as p:
res = p.map(f, range(1, 1001))
print("结果:", sum(res))
经测试,使用多进程时,必须申明在
if __name__ == '__main__':
中调用,无法会报错:concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
当我们执行 executor.submit(func) 时,它便会执行里面的 func() 函数并返回 future 实例,以便之后查询调用。as_completed(fs)传入一个future的列表或迭代器返回完成后的迭代器。每个future 对象都可以通过result()获取对应的结果或异常。
所以另一种用法就是:
from concurrent.futures import ProcessPoolExecutor, as_completed
# # 用于计算平方
def f(x):
return x * x
if __name__ == '__main__':
with ProcessPoolExecutor() as p:
futures = [p.submit(f, i) for i in range(1, 1001)]
res = [future.result() for future in as_completed(futures)]
print("结果:", sum(res))
对于Future另一种等待全部完成的方式使用wait和ALL_COMPLETED:
from concurrent.futures import ProcessPoolExecutor, wait, ALL_COMPLETED
# # 用于计算平方
def f(x):
return x * x
if __name__ == '__main__':
with ProcessPoolExecutor() as p:
futures = [p.submit(f, i) for i in range(1, 1001)]
wait(futures, return_when=ALL_COMPLETED)
res = [future.result() for future in futures]
print("结果:", sum(res))
其他方法:
- Futures 中的方法 done()用于查询操作是否已经完成。
- add_done_callback(fn)方法添加钩子,表示完成后调用fn函数。
可以测试一下这两个方法:
import time
from concurrent.futures import ProcessPoolExecutor
import random
# 用于计算平方
def f(x):
time.sleep(random.randint(1, 3))
return x * x
def done_func(future):
print("执行完毕,结果:", future.result())
if __name__ == '__main__':
with ProcessPoolExecutor() as p:
futures = []
for i in range(1, 5):
future = p.submit(f, i)
futures.append(future)
future.add_done_callback(done_func)
print(f"{i}: {future.done()}", end=",")
print()
for i, future in enumerate(futures, 1):
print(f"{i}: {future.done()}")
结果:
1: False,2: False,3: False,4: False,
1: False
2: False
3: False
4: False
执行完毕,结果: 16
执行完毕,结果: 1
执行完毕,结果: 4
执行完毕,结果: 9
future.result()方法能够指定超时时间:
import time
from concurrent.futures import ProcessPoolExecutor
import random
# 用于计算平方
def f(x):
time.sleep(random.randint(1, 3))
return x * x
def done_func(future):
print("执行完毕,结果:", future.result())
if __name__ == '__main__':
with ProcessPoolExecutor() as p:
futures = []
for i in range(1, 6):
future = p.submit(f, i)
futures.append(future)
future.add_done_callback(done_func)
print(f"{i}: {future.done()}", end=",")
print()
for i, future in enumerate(futures, 1):
try:
print(f"{i}: {future.result(2)} {future.done()}")
except Exception:
print(i, "超时")
1: False,2: False,3: False,4: False,5: False,
执行完毕,结果: 1
1: 1 True
执行完毕,结果: 9
执行完毕,结果: 25
执行完毕,结果: 16
2 超时
3: 9 True
4: 16 True
5: 25 True
执行完毕,结果: 4
Futures 模块方法总结
上面分别介绍了Executor 类的map和submit方法,个人感觉对于大部分需求map方法使用起来最方便,直接获取每个进程的最终结果。而submit则会提交不会阻塞当前程序的运行,我们无法确定程序是否已经运行完毕,可以通过as_completed或wait+ALL_COMPLETED来阻塞程序直到所有进程完成。
对于ThreadPoolExecutor(workers)中的参数workers表示进程池的大小,但上面的用法中均已省略该参数,此时系统会使用CPU的数量作为可调用的进程数。
multiprocessing 多进程库
实现程序的并行计算,另一种方式是使用标准库中的 multiprocessing 多进程库。
相对对于前面Futures的map方法,multiprocessing的map方法只需替换一下类名即可,几乎一样的用法:
from multiprocessing import Pool
def f(x):
return x * x
if __name__ == '__main__':
with Pool() as p:
res = p.map(f, range(1, 1001))
print("结果:", sum(res))
对于multiprocessing的其他方法,使用方式与Futures 不太一样:
- map提交一批任务同步执行
- apply提交一个任务同步执行
- map_async提交一批任务异步执行
- apply_async提交一个任务异步执行
下面分别看看:
if __name__ == '__main__':
with Pool() as p:
for i in range(1, 11):
res = p.apply(f, (i,))
print(res, end=",")
1,4,9,16,25,36,49,64,81,100,
多任务的异步执行:
if __name__ == '__main__':
with Pool() as p:
futures = p.map_async(f, range(1, 11))
print(futures.get())
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
单个任务的异步提交:
if __name__ == '__main__':
with Pool() as p:
futures = [p.apply_async(f, (i,)) for i in range(1, 1001)]
res = [future.get() for future in futures]
print("结果:", sum(res))
结果: 333833500
Futures和multiprocessing 的多线程库
在Futures模块中:
- concurrent.futures.ProcessPoolExecutor表示多进程
- concurrent.futures.ThreadPoolExecutor表示多线程
在multiprocessing 模块中:
- multiprocessing.Pool表示多进程
- multiprocessing.dummy.Pool表示多线程
同一模块下的用法都一致。
Futures和multiprocessing多进程性能对比
首先看看futures的100个计算任务的并行性能:
from concurrent.futures import ProcessPoolExecutor
import time
def f(x):
time.sleep(0.1)
return x * x
if __name__ == '__main__':
with ProcessPoolExecutor() as p:
max_count = 100
time1 = time.time()
res = p.map(f, range(1, max_count + 1))
print(sum(res))
time2 = time.time()
print(f"{max_count},耗时:{time2 - time1:.2f}秒")
338350
100,耗时:2.07秒
再看看multiprocessing的100个计算任务的并行性能:
import time
from multiprocessing import Pool
def f(x):
time.sleep(0.1)
return x * x
if __name__ == '__main__':
with Pool() as p:
max_count = 100
time1 = time.time()
res = p.map(f, range(1, max_count + 1))
print(sum(res))
time2 = time.time()
print(f"{max_count},耗时:{time2 - time1:.2f}秒")
338350
100,耗时:2.32秒
但是当程序增长任务数,减少耗时时,multiprocessing似乎更快。下面我们将耗时改为0.001,执行10000个任务:
from concurrent.futures import ProcessPoolExecutor
import time
def f(x):
time.sleep(0.001)
return x * x
if __name__ == '__main__':
with ProcessPoolExecutor() as p:
max_count = 10000
time1 = time.time()
res = p.map(f, range(1, max_count + 1))
print(sum(res))
time2 = time.time()
print(f"{max_count},耗时:{time2 - time1:.2f}秒")
333383335000
10000,耗时:6.86秒
再看看multiprocessing:
import time
from multiprocessing import Pool
def f(x):
time.sleep(0.001)
return x * x
if __name__ == '__main__':
with Pool() as p:
max_count = 10000
time1 = time.time()
res = p.map(f, range(1, max_count + 1))
print(sum(res))
time2 = time.time()
print(f"{max_count},耗时:{time2 - time1:.2f}秒")
333383335000
10000,耗时:3.49秒
最后测试10个并行任务耗时1秒时,前者耗时均为2.32秒,后者为2.36秒。
综合来看,在任务数量较少时,concurrent.futures会更快一点;任务数量过多时,multiprocessing会更快一些。当然目前测试的方面非常局限,整体来说两个类都各有千秋都可以使用。相信对于真实的需要并行处理的任务,两个类的性能会相差越来越小。到底用哪个就取决于个人习惯了。
深圳影讯的多进程爬虫代码
下面我们将之前的爬虫代码再用多进程方式实现一遍:
"""
小小明的代码
CSDN主页:https://blog.csdn.net/as604049322
"""
__author__ = '小小明'
__time__ = '2021/7/14 9:43'
import time
import requests
from lxml import etree
import pandas as pd
import re
from concurrent.futures import ProcessPoolExecutor
def fetch_content(url):
print(url)
headers = {
"Accept-Encoding": "Gzip", # 使用gzip压缩传输数据让访问更快
"User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36"
}
r = requests.get(url, headers=headers)
return r.text
def main():
url = "https://movie.douban.com/cinema/later/shenzhen/"
init_page = fetch_content(url)
html = etree.HTML(init_page)
all_movies = html.xpath("//div[@id='showing-soon']/div")
result = []
for e in all_movies:
# imgurl, = e.xpath(".//img/@src")
name, = e.xpath(".//div[@class='intro']/h3/a/text()")
url, = e.xpath(".//div[@class='intro']/h3/a/@href")
# date, movie_type, pos = e.xpath(".//div[@class='intro']/ul/li[@class='dt']/text()")
like_num, = e.xpath(
".//div[@class='intro']/ul/li[@class='dt last']/span/text()")
result.append((name, int(like_num[:like_num.find("人")]), url))
main_df = pd.DataFrame(result, columns=["影名", "想看人数", "url"])
max_workers = main_df.shape[0]
with ProcessPoolExecutor(max_workers=max_workers) as executor:
pages = executor.map(fetch_content, main_df.url)
result = []
for url, html_text in zip(main_df.url, pages):
html = etree.HTML(html_text)
row = {}
for line in re.split("[\n ]*\n[\n ]*", "".join(html.xpath("//div[@id='info']//text()")).strip()):
line = line.strip()
arr = line.split(": ", maxsplit=1)
if len(arr) != 2:
continue
k, v = arr
row[k] = v
row["url"] = url
result.append(row)
detail_df = pd.DataFrame(result)
df = main_df.merge(detail_df, on="url")
df.drop(columns=["url"], inplace=True)
df.sort_values("想看人数", ascending=False, inplace=True)
df.to_csv("shenzhen_movie.csv", index=False)
if __name__ == '__main__':
start_time = time.time()
main()
print("多进程总耗时:", f"{time.time() - start_time:.2f}s")
运行结果:
https://movie.douban.com/cinema/later/shenzhen/
https://movie.douban.com/subject/30330751/
https://movie.douban.com/subject/35489675/
https://movie.douban.com/subject/34961041/
https://movie.douban.com/subject/27191370/
https://movie.douban.com/subject/35256235/
https://movie.douban.com/subject/35182279/
https://movie.douban.com/subject/30463527/
https://movie.douban.com/subject/35400472/
https://movie.douban.com/subject/30435124/
https://movie.douban.com/subject/35132974/
https://movie.douban.com/subject/33437509/
https://movie.douban.com/subject/33428740/
https://movie.douban.com/subject/35445373/
https://movie.douban.com/subject/35215390/
https://movie.douban.com/subject/30174085/
https://movie.douban.com/subject/27599400/
https://movie.douban.com/subject/26371812/
https://movie.douban.com/subject/34906768/
https://movie.douban.com/subject/35081773/
https://movie.douban.com/subject/35158124/
https://movie.douban.com/subject/27624770/
https://movie.douban.com/subject/30312070/
https://movie.douban.com/subject/33973077/
https://movie.douban.com/subject/35030151/
多进程总耗时: 6.55s
爬取结果:
从耗时来说会比之前的协程爬虫和多线程爬虫稍微快一点,但综合来看不推荐爬虫使用多进程,因为性能提升微乎其微,但性能多消耗了非常多,而且要求代码必须在方法内部。
总结
在python的concurrent.futures和multiprocessing多进程库中,对于需要使用多进程并行加速的场景,基本上只需要用map方法就满足要求,可以利用多核CPU加速执行任务了。
今天本文将两个库放在一起演示,目的就是为了防止记忆混淆,通过这种对比的方法,理清其中的逻辑关系。