进程介绍
python开发中,进程与线程是非常重要的,打造分布式爬虫,提高工作效率都离不开进程与线程。
进程
进程就是一个程序在一个数据集上的一次动态执行过程。 进程一般由程序、数据集、进程控制块三部分组成。我们编写的程序用来描述进程要完成哪些功能以及如何完成;数据集则是程序在执行过程中所需要使用的资源;进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志。
多进程
python实现多进程的方式有2种,一种为os模块中的fork方法,另一种为multiprocessing模块,两种方式 的区别是fork方法只支持Unix/Linux系统,不支持Windows,而后一种方法是跨平台的。
fork方式实现多进程
fork方法比较特殊,普通方法调用一次,返回一次,但fork方法调用一次,返回两次,因为操作系统将当前父进程复制出一个子进程,这两个进程几乎完全相同,于是fork方法分别在父进程和子进程中返回,子进程中永远返回0,父进程中返回的是子进程的id,os模块的getpid用于获取子进程id,getppid用于获取父进程id,见下面示例:
import os
'''
更多Python学习资料以及源码教程资料,可以在群1136201545免费获取
'''
if __name__ == '__main__':
print 'current Process %s start ...' % (os.getpid())
pid = os.fork()
print pid
if pid < 0:
print 'error in fork'
elif pid == 0:
print 'I am child process %s and my parent process is %s' % (os.getpid(), os.getppid())
else:
print 'I %s created a child process %s.' %(os.getpid(), pid)
返回结果如下:
# 父进程
current Process 11989 start ...
# 子进程pid
11990
I 11989 created a child process 11990.
# 子进程pid此时返回0
0
I am child process 11990 and my parent process is 11989
multiprocessing方式多进程
multiprocessing的一个类Process来描述进程对象,创建子进程时只需要传入一个函数和函数参数就可以创建一个Process实例,用start()方法启动进程,join()方法实现进程间同步,见下例:
import os
from multiprocessing import Process
def run_proc(name):
"""
定义函数
"""
print('child process %s (%s) running...' % (name, os.getpid()))
if __name__ == '__main__':
# 打印当前进程pid
print('Parent process %s.' % os.getpid())
# 循环方式生成process实例
for i in range(5):
p = Process(target=run_proc, args=str(i))
print('Process will start.')
p.start()
# 这里的join要注意,默认父进程执行完自己的任务以后,就退出了,
# 此时子进程会继续执行自己的任务,直到自己的任务结束,而加上
# join后join所完成的工作就是进程同步,即父进程任务结束之后,
# 进入阻塞状态,一直等待其他的子进程执行结束之后,主进程再终止
p.join()
print('Process end.')
结果如下:
Parent process 3488.
Process will start.
Process will start.
Process will start.
Process will start.
Process will start.
child process 0 (3148) running...
child process 3 (12580) running...
child process 2 (2668) running...
child process 1 (13664) running...
child process 4 (13916) running...
Process end.
以上两种方式生成少量进程可以实现,但是如果要有成百上千的进程生成就不那么好实现了,这是进程池Pool就发挥作用了。
Pool
multiprocessing提供了一个Pool类来代表进程池对象,Pool可以提供指定数量的进程供用户调用,默认的大小是cpu核数,如果有新请求提交到Pool后,如果进程池没有满,就会创建一个新进程来执行该请求,如果进程池已经满了,那么请求就会等待,直到进程池中有进程结束,才会创建新进程,见示例:
import os
import time
import random
from multiprocessing import Pool
def run_task(name):
print('Tast %s (%s) is running' % (name, os.getpid()))
# 等待时间
time.sleep(random.random() * 3)
print('Task %s end.' % name)
if __name__ == '__main__':
print('current process %s.' % os.getpid())
# 创建容量为3的进程池
p = Pool(processes=3)
# 依次执行5个进程
for i in range(5):
# apply_async非阻塞方式
p.apply_async(run_task, args=str(i))
print('Writing for all subprocesses done...')
# 调用close后就不能再添加新进程了
p.close()
# 调用join方法会等待所有子进程结束,要在close之后执行join
p.join()
print('All subprocessed done.')
程序执行结果如下:
current process 18044.
Writing for all subprocesses done...
Tast 0 (15000) is running
Tast 1 (16796) is running
Tast 2 (12500) is running
Task 0 end.
Tast 3 (15000) is running
Task 3 end.
Tast 4 (15000) is running
Task 2 end.
Task 1 end.
Task 4 end.
All subprocessed done.
进程间通信
如果创建的大量进程,那么进程间通信是必不可少的,python提供了多种进程间通信方式,我们主要讲Queue和Pipe两种方式,Pipe常用来在两个进程间通信,Queue用来在多个进程间通信。
Queue
有两个方法put和get可以进行Queue操作:
put方法用来插入数据到队列中,get方法用来读取并删除队列内容,下面通过列子来说明:
import os
import time
import random
from multiprocessing import Process, Queue
def proc_write(q, urls):
"""
写数据进程执行的代码
"""
print('Process %s is writing...' % os.getpid())
for url in urls:
q.put(url)
print('Put %s to queue...' % url)
time.sleep(random.random())
def proc_read(q):
"""
读数据进程执行的代码
"""
print('Process %s is reading...' % os.getpid())
while True:
url = q.get(True)
print('Get %s from queue.' % url)
if __name__ == '__main__':
# 父进程创建queue,并传给各个子进程
q = Queue()
proc_write1 = Process(target=proc_write, args=(q, ['url1', 'url2', 'url3']))
proc_write2 = Process(target=proc_write, args=(q, ['url4', 'url5', 'url6']))
proc_read1 = Process(target=proc_read, args=(q,))
# 启动子进程写入
proc_write1.start()
proc_write2.start()
# 启动子进程读取
proc_read1.start()
# 等待proc_write结束
proc_write1.join()
proc_write2.join()
# proc_read1进程是死循环,无法等待结束,强制结束
proc_read1.terminate()
执行结果如下:
Process 13088 is writing...
Put url1 to queue...
Process 7700 is writing...
Put url4 to queue...
Process 11688 is reading...
Get url1 from queue.
Get url4 from queue.
Put url5 to queue...
Get url5 from queue.
Put url2 to queue...
Get url2 from queue.
Put url6 to queue...
Get url6 from queue.
Put url3 to queue...
Get url3 from queue.
可以看到上面示例,两个进程往里写数据,一个进程往外读数据。
最后介绍一下Pipe通信,Pipe常用于两个进程间通信,两个进程分别位于管道两端,返回的conn1, conn2代表管道的两端,注意duplex参数,如果为True表示全双工模式,两端均可以send和recv,如果为False表示conn1只负责接收,conn2只负责发,示例如下:
import os
import time
import random
import multiprocessing
'''
更多Python学习资料以及源码教程资料,可以在群1136201545免费获取
'''
def proc_send(pipe, urls):
"""
发送端函数
"""
for url in urls:
print('Process %s send: %s ' % (os.getpid(), url))
pipe.send(url)
time.sleep(random.random())
def proc_recv(pipe):
"""
接收端函数
"""
while True:
print('Process %s rev: %s ' % (os.getpid(), pipe.recv()))
time.sleep(random.random())
if __name__ == '__main__':
# 创建实例
pipe = multiprocessing.Pipe()
# 建立发送进程
p1 = multiprocessing.Process(target=proc_send, args=(pipe[0], ['url_' + str(i) for i in range(10)]))
# 建立接收进程
p2 = multiprocessing.Process(target=proc_recv, args=(pipe[1],))
# 启动
p1.start()
p2.start()
# 等待执行完成
p1.join()
p2.join()
同样是创建两个子进程,一个发数据,一个收数据,结果如下:
Process 16904 send: url_0
Process 2016 rev: url_0
Process 16904 send: url_1
Process 2016 rev: url_1
Process 16904 send: url_2
Process 2016 rev: url_2
Process 16904 send: url_3
Process 2016 rev: url_3
Process 16904 send: url_4
Process 16904 send: url_5
Process 2016 rev: url_4
Process 16904 send: url_6
Process 2016 rev: url_5
Process 2016 rev: url_6
Process 16904 send: url_7
Process 2016 rev: url_7
Process 16904 send: url_8
Process 2016 rev: url_8
Process 16904 send: url_9
Process 2016 rev: url_9