普通的python爬虫是单进程单线程的,这样在遇到大量重复的操作时就只能逐个进行,我们就很难过了。举个栗子:你有1000个美图的链接,逐个喂给下载器(函数),看着图片只能一个个蹦出来,你不心急吗?于是我们想,能不能同时跑多个下载器,实现多图同时下载?——答案是可以的,使用多进程/多线程,把每个带着不同参数下载器分给每个进程/线程就,然后同时跑多个进程/线程就行了。
本文就介绍如何用多线程和多进程给爬虫加速
补充主线程与子线程(进程同理):
- 一个py程序就有一个主线程,主线程负责整个py程序的代码,当主线程处理到启用多线程的代码时,就会创建若干个子线程,这里就有选择了,主线程是等待子线程的结束再继续处理还是直接继续处理让子线程在外头跑
多进程
Python标准库原本有threading和multiprocessing模块编写相应的多线程/多进程代码。但从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,实现了对threading和multiprocessing的更高级的抽象,对编写线程池/进程池提供了直接的支持。多进程我们介绍futures的ProcessPoolExecutor
注:python 2.7 请安装future模块,pip install future
ProcessPoolExecutor类是Executor类的子类,实例化ProcessPoolExecutor类以创建进程池,在实例化的过程中应指定同时运行的最大进程数
'''
想要学习Python?Python学习交流群:973783996满足你的需求,资料都已经上传群文件,可以自行下载!
'''
from concurrent.futures import ProcessPoolExecutor
pool = ProcessPoolExecutor(max_workers=4) # 运行最大进程数4
#进程池的操作...
pool.shutdown(wait=True) # 关闭进程池,默认等待所有进程的完成。
print('Deep') # 有shutdown的情况下所有进程完成后才会运行下面的print,没有的话会马上运行
'创建进程也可用with,这时会自带shutdown功能
with ProcessPoolExecutor(4) as pool:
#进程池的操作...
'
该类有两种方法对进程池提交任务建立进程(函数及一组参数构成一个任务),分别是submit()
和map()
,如果单纯想多开进程别无他想,用哪个都行,但submit()会有更灵活的用法
map(fn,*iterables)
- fn:函数
- *iterables:函数每个参数的集合,N个参数就接N个集合
可以理解这是python自带map()的多进程版,他返回的是一个迭代器,包含每个任务对应的返回值(有序的),下面用例子来分析
from concurrent.futures import ProcessPoolExecutor
import time
def test(x):
time.sleep(x) # 时间阻塞
print(str(x)+'s')
return x
if __name__ == '__main__':
with ProcessPoolExecutor(4) as pool:
p = pool.map(test,[2,3,10,5,6])
for i in p:
print(i)
输出
2s
2
3s
3
5s
6s
10s
10
5
6
分析(下面以参数代替某个进程):
- 带s的是函数输出的,进程池最大允许4个进程同时运行,所以参数 2,3,10,5 首先一起进去。2最快完成,马上让给6进去,2+6<10 ,所以后进6完成得比10快,最后输出顺序就是
2s,3s,5s,6s,10s
- 不带s的是for循环打印迭代器中的结果,由输出可见,i的值分配是会等待进程完成返回值的,等2的完成返回2,等3的完成返回3,等10的完成返回10,由于10完成前5和6早就完成了,所以返回10后紧接着返回5和6,最后输出顺序为
2,3,10,5,6
,是有序的,对应各任务的返回值
在爬虫中,上面代码中的时间阻塞会对应着网络I/O阻塞,任务中往往包含着网络请求。比如你有很多个图片链接,就写一个下载图片的函数(接收一个图片链接的参数),把函数和图片链接的集合喂给map()就实现多进程了加速了。
submit(fn, *arg)
- fn:函数
- *arg:函数的参数
该方法是往进程池中提交可回调的任务,并返回一个future实例。提交多个任务可用循环实现,返回的future实例用列表存起来,每个future代表一个进程。关于future对象有许多方法:
- future.running():判断某个future(进程)是否运行中
- future.done():判断某个future(进程)是否正常结束
- future.cancel():终止某个future(进程),终止失败返回False,成功返回True
- future.result():获取future对应任务返回的结果。如果future还没完成就会去等待
- future.add_done_callback(fn):接收函数fn,将fn绑定到future对象上。当future对象被终止或完成时,fn将会被调用并接受该future对象
- as_completed(fs):接收futures列表,futures列表中一旦有某个future(进程)完成就将该future对象yield回来,是个迭代器
from concurrent.futures import ProcessPoolExecutor,as_completed
import time
def test(x):
time.sleep(x)
print(str(x)+'s')
return x
if __name__ == '__main__':
with ProcessPoolExecutor(4) as pool:
futures = [pool.submit(test,i) for i in [2,3,10,5,6]]
'''for j in futures:
print(j.result()) # 对应接收参数有序输出,输出2,3,10,5,6
'''
for j in as_completed(futures):
print(j.result()) # 对应进程完成顺序输出,输出2,3,5,6,10
多线程
建议小心使用,虽然多线程能实现高并发,但由于线程资源共享的特性,某个线程操作这些共享的资源时可能操到一半就停止让给另一个线程操作,导致错乱的发生。为避免此情况发生对某些操作需要加锁,所以这里介绍对锁有支持的threading模块,python自带直接导入。
如果你确信这些操作不会发生错乱,可以直接使用concurrent.future 的 ThreadPoolExecutor,方法什么的和ProcessPoolExecutor的一样
线程
创建线程有两种方法:
-
实例化 threading.Thread 类,target接收函数,arg以可迭代形式接收参数。这种方法最简单
import threading
import time
def test(x):
time.sleep(x)
print(str(x)+'s')
return x
t1 = threading.Thread(target=test, args=(1,)) # 创建线程
t2 = threading.Thread(target=test, args=(3,))
t1.start() # 启动线程
t2.start()
继承threading.Thread 类,重写run方法,把函数及参数接收写进自己写的多线程类中。这种方法更灵活,threading.Thread 类并没有供获取线程调用函数返回值的方法,如果需要函数返回值就需要继承该类自己实现
import threading
import time
class TestThread(threading.Thread):
def __init__(self,x):
threading.Thread.__init__(self)
self.x = x # 参数接收
def run(self):
time.sleep(self.x) # 原来的函数写到run中
print(str(self.x)+'s')
def result(self): # 实现获取调用函数的返回值的方法
return self.x
t1 = TestThread(1) #创建线程
t2 = TestThread(3)
t1.start() # 启动线程
t2.start()
t1.join() # 等待线程结束
t2.join()
print(t1.result(),t2.result())
线程相关方法和属性:
- Thread.start():启动线程
- Thread.join():等待线程的结束,没有join的话会接着运行join下面的代码
- Thread.is_alive():判断线程是否在运行,线程未开启/结束时返回 False
- Thread.name:返回线程的名字,默认线程名是Thread-N,N指第N个开启的线程
- Thread.setName(str):给线程命名
- Thread.setDaemon(True/False):设置子线程是否会随主线程结束而结束,原本所有子线程默认是不会随主线程结束而结束的
锁
线程间资源共享,如果多个线程共同对某个数据修改,可能会出现错误,为了保证数据的正确性,需要对多个线程进行同步。这时就需要引入锁了(利用GIL),锁只有一个,一个线程在持有锁的状态下对某些数据进行操作,其他线程就无法对该数据进行操作,直至该线程释放锁让其他线程抢,谁抢到谁就有权修改。
threading提供Lock和RLock两类锁,前者一个线程只能获取获取一次锁,后者允许一个线程能重复获取锁。如果某个线程对全局数据的操作是割裂的(分块的),那就使用RLock。
- acquire():获取锁
- release():释放锁
- 有数据操作放在acquire 和 release 之间,就不会出现多个线程修改同一个数据的风险了
- acquire 和 release 必须成对存在,如果一个线程只拿不放,其他线程没有锁能抢就只能永远阻塞(停止)
一个错乱的例子及锁的使用:
import time, threading
lock = threading.Lock() # rlock = threading.RLock()
balance = [0]
def test(n):
for i in range(100000): # 理想的情况是执行了+n,-n操作后才让另一个线程处理,结果永0
#lock.acquire()
balance[0] = balance[0] + n # 某个线程可能处理到这里就终止让给另一个线程处理了,循环一大,结果可能错乱不为0
balance[0] = balance[0] - n
#lock.release()
t1 = threading.Thread(target=test, args=(5,))
t2 = threading.Thread(target=test, args=(8.0,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance[0])
在不加锁的情况下多跑几次,你会的到不同的结果。但是加了锁之后,+n,-n两个操作完整执行,不会中途中断,结果永0。
限制同时运行线程数
使用 threading.Semaphore 类就行,Semaphore 在内部管理着一个计数器。调用 acquire() 会使这个计数器减1,release() 则是加1。计数器的值永远不会小于 0。当计数器到 0 时,再调用 acquire() 就会阻塞,直到其他线程来调用release(),这样就限制了同时运行线程的数量。
使用上非常简单,实例化Semaphore并指定线程数后,给函数的头加个acquire(),尾加个release()就行。
import threading, time
def test(x):
semaphore.acquire()
time.sleep(x)
print(x)
semaphore.release()
semaphore = threading.Semaphore(4) # 最大4个线程同时进行
ts = [threading.Thread(target=test,args=(i,)) for i in [2,3,5,10,6]]
[t.start() for t in ts]
'输出:2,3,5,6,10
(原理和上面多进程的那个差不多)'
关于threading的其他高级用法本文并未提及,以上都是些常用的用法,如果有更高级的需要,可以参考这文章
应用在爬虫上
讲了这么多,都是模块的用法,没怎么提到爬虫。那么最后大概的讲下如何把多进程/多线程运用到爬虫中,并给个代码实例用作参考。
- 如果爬虫需要重复进行某个操作(如下载一张图片,爬取一张网页的源码,破解一次加密【加密耗cpu最好多进程】),那把这个操作抽象成一个接收相应参数的函数,把函数喂给进程/线程即可。
- 没了,大概就这么用Ծ‸ Ծ
下面给个多进程/多线程结合的网易云音乐评论下载器(下载某首音乐的多页评论),包含加密算法,如不清楚可看之前的文章,我们用多进程加速加密过程,用多线程加速爬取过程。
本代码较长,长到高亮效果都没有了,因此该长代码分为两部分,前半部分是之前文章提到的加密方法,后半部分是本文的多进程多线程重点代码:
import json, re, base64, random, requests, binascii, threading
from Crypto.Cipher import AES#新的加密模块只接受bytes数据,否者报错,密匙明文什么的要先转码
from concurrent.futures import ProcessPoolExecutor
from math import ceil
secret_key = b'0CoJUm6Qyw8W8jud'#第四参数,aes密匙
pub_key ="010001"#第二参数,rsa公匙组成
modulus = "00e0b509f6259df8642dbc35662901477df22677ec152b5ff68ace615bb7b725152b3ab17a876aea8a5aa76d2e417629ec4ee341f56135fccf695280104e0312ecbda92557c93870114af6c9d05c4f7f0c3685b7a46bee255932575cce10b424d813cfe4875d3e82047b97ddef52741d546b8e289dc6935b3ece0462db0a22b8e7"#第三参数,rsa公匙组成
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.146 Safari/537.36'}
def random_16():
return bytes(''.join(random.sample('1234567890DeepDarkFantasy',16)),'utf-8')
#aes加密
def aes_encrypt(text,key):
pad = 16 - len(text)%16#对长度不是16倍数的字符串进行补全,然后在转为bytes数据
try: #如果接到bytes数据(如第一次aes加密得到的密文)要解码再进行补全
text = text.decode()
except:
pass
text = text + pad * chr(pad)
try:
text = text.encode()
except:
pass
encryptor = AES.new(key,AES.MODE_CBC,b'0102030405060708')
ciphertext = encryptor.encrypt(text)
ciphertext = base64.b64encode(ciphertext)#得到的密文还要进行base64编码
return ciphertext
#rsa加密
def rsa_encrypt(ran_16,pub_key,modulus):
text = ran_16[::-1]#明文处理,反序并hex编码
rsa = int(binascii.hexlify(text), 16) ** int(pub_key, 16) % int(modulus, 16)
return format(rsa, 'x').zfill(256)
#返回加密后内容
def encrypt_data(data):
ran_16 = random_16()
text = json.dumps(data)
params = aes_encrypt(text,secret_key)
params = aes_encrypt(params,ran_16)
encSecKey = rsa_encrypt(ran_16,pub_key,modulus)
return {'params':params.decode(),
'encSecKey':encSecKey }
class OnePageComment(threading.Thread): # 下载一页评论的线程类
def __init__(self,post_url, enc_data):
threading.Thread.__init__(self)
self.post_url = post_url
self.enc_data = enc_data
self.comment = '' # 创建一个comment变量储存爬到的数据
def run(self):
semaphore.acquire()
content = requests.post(self.post_url, headers = headers, data = self.enc_data ).json()
if 'hotComments' in content:
if content['hotComments']:
self.comment += '*************精彩评论\n\n'
self.common(content, 'hotComments')
self.comment += '\n\n*************最新评论\n\n'
self.common(content, 'comments')
else:
self.common(content, 'comments')
semaphore.release()
def common(self, content,c_type):
for each in content[c_type]:
if each ['beReplied']:
if each['beReplied'][0]['content']:
self.comment += each['content'] + '\n\t回复:\n\t' + each['beReplied'][0]['content'] + '\n' + '-' * 60 + '\n'
else:
self.comment += each['content'] + '\n' + '-' * 60 + '\n'
def get_comment(self): # 选择返回评论而不是直接写入文件,因为多个线程同时操作一个文件有风险,应先返回,后统一写入
return self.comment
def get_enc_datas(pages, max_workers=4): # 多进程加密
raw_datas = []
for i in range(pages):
if i == 0:
raw_datas.append({'rid':"", 'offset':'0', 'total':"true", 'limit':"20", 'csrf_token':""})
else:
raw_datas.append({'rid':"", 'offset':str(i*20), 'total':"false", 'limit':"20", 'csrf_token':""})
with ProcessPoolExecutor(max_workers) as pool: # 多进程适合计算密集型任务,如加密
result = pool.map(encrypt_data,raw_datas)
return list(result)
def one_song_comment(id_): # 爬取一首歌的评论并写入txt,网络I/O密集使用多线程
post_url = 'http://music.163.com/weapi/v1/resource/comments/R_SO_4_' + str(id_) + '?csrf_token='
ts = [OnePageComment(post_url,i) for i in enc_datas]
[i.start() for i in ts]
[i.join() for i in ts]
comments = [i.get_comment() for i in ts]
with open(id_ + '.txt', 'w', encoding='utf-8') as f:
f.writelines(comments)
if __name__ == '__main__':
semaphore = threading.Semaphore(4) # 最大线程4
enc_datas = get_enc_datas(10) # 获取加密后的数据,对所有歌曲都是通用的,这里有十页的加密数据,对应爬十页评论
one_song_comment('29498682')
效果提升惊人!!不信你跑一下上面的程序,然后和自己写的单线程/单进程比较
cpu和网络都跑到了峰值,网络峰值在cpu峰值之后,因为是先多进程加密数据,后多线程爬取