介绍
Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。managers提供了一种创建数据的方法,可以在不同的进程之间共享数据,包括在不同机器上运行的进程之间通过网络共享。manage对象控制管理共享对象的服务器进程。其他进程可以通过使用代理访问共享对象。
服务进程
我们先来看服务进程,服务进程主要工作:
- 服务进程负责启动Queue
- 把Queue注册到网络上
- 往Queue里面写入任务
我们用代码来看实际效果:
import queue
from multiprocessing.managers import BaseManager
'''
更多Python学习资料以及源码教程资料,可以在群1136201545免费获取
'''
# 创建task_queue和result_queue对了用来存放任务和结果
task_queue = queue.Queue()
result_queue = queue.Queue()
class QueueManager(BaseManager):
"""
继承BaseManager
"""
pass
# 把创建的两个队列注册到网络上,利用register方法,callable参数关联对象
# 注意windows下绑定调用接口不能使用lambda
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定5000端口, 设置密钥
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 启动queue,监听通道
manager.start()
# 获得通过网络访问的对象,注意分布式进程必须通过manager.get_task_queue()获得的Queue接口添加
task = manager.get_task_queue()
result = manager.get_result_queue()
# 添加任务
for url in ['url_' + str(i) for i in range(10)]:
print('put task %s...' % url)
task.put(url)
# 获取返回结果
for i in range(10):
print('result is %s' % result.get(timeout=10))
# 关闭管理
manager.shutdown()
print('master exit')
任务进程
任务进程主要的工作如下:
- 注册获取网络上queue
- 连接服务器
- 从task队列获取任务,并写入结果
示例代码如下:
import time
from multiprocessing.managers import BaseManager
'''
更多Python学习资料以及源码教程资料,可以在群1136201545免费获取
'''
class QueueManager(BaseManager):
"""
继承类BaseManager
"""
pass
# 使用register注册,获取网络queue名称
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 配置服务器ip并连接服务器
server_addr = '127.0.0.1'
print('connect to server %s...' % server_addr)
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 从网络连接
m.connect()
# 获取queue对象
task = m.get_task_queue()
result = m.get_result_queue()
# 从task获取任务并将结果写入result
while(not task.empty()):
image_url = task.get(True, timeout=5)
print('run task download %s...' % image_url)
time.sleep(1)
result.put('%s ------>success' % image_url)
print('worker exit')
先启动服务进程,结果如下:
put task url_0...
put task url_1...
put task url_2...
put task url_3...
put task url_4...
put task url_5...
put task url_6...
put task url_7...
put task url_8...
put task url_9...
再启动任务进程,结果如下:
connect to server 127.0.0.1...
run task download url_0...
run task download url_1...
run task download url_2...
run task download url_3...
run task download url_4...
run task download url_5...
run task download url_6...
run task download url_7...
run task download url_8...
run task download url_9...
worker exit
再回过头来看服务进程端输出
put task url_0...
put task url_1...
put task url_2...
put task url_3...
put task url_4...
put task url_5...
put task url_6...
put task url_7...
put task url_8...
put task url_9...
result is url_0 ------>success
result is url_1 ------>success
result is url_2 ------>success
result is url_3 ------>success
result is url_4 ------>success
result is url_5 ------>success
result is url_6 ------>success
result is url_7 ------>success
result is url_8 ------>success
result is url_9 ------>success
master exit
这个简单的分布式有什么用呢?从例子也可以看出,如果我们设置多个任务进程,就可以把任务分配到多台机器上,服务端来添加url到队列,任务端来下载并返回结果。而Queue之所以能通过网络访问,就是通过QueueManager实现的。