介绍

Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。managers提供了一种创建数据的方法,可以在不同的进程之间共享数据,包括在不同机器上运行的进程之间通过网络共享。manage对象控制管理共享对象的服务器进程。其他进程可以通过使用代理访问共享对象。

服务进程

我们先来看服务进程,服务进程主要工作:

  • 服务进程负责启动Queue
  • 把Queue注册到网络上
  • 往Queue里面写入任务

我们用代码来看实际效果:

import queue
from multiprocessing.managers import BaseManager
'''
更多Python学习资料以及源码教程资料,可以在群1136201545免费获取
'''
# 创建task_queue和result_queue对了用来存放任务和结果
task_queue = queue.Queue()
result_queue = queue.Queue()

class QueueManager(BaseManager):
    """
    继承BaseManager
    """
    pass

# 把创建的两个队列注册到网络上,利用register方法,callable参数关联对象
# 注意windows下绑定调用接口不能使用lambda
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定5000端口, 设置密钥
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 启动queue,监听通道
manager.start()
# 获得通过网络访问的对象,注意分布式进程必须通过manager.get_task_queue()获得的Queue接口添加 
task = manager.get_task_queue()
result = manager.get_result_queue()
# 添加任务
for url in ['url_' + str(i) for i in range(10)]:
    print('put task %s...' % url)
    task.put(url)

# 获取返回结果
for i in range(10):
    print('result is %s' % result.get(timeout=10))
# 关闭管理
manager.shutdown()
print('master exit')

任务进程

任务进程主要的工作如下:

  • 注册获取网络上queue
  • 连接服务器
  • 从task队列获取任务,并写入结果

示例代码如下:

import time
from multiprocessing.managers import BaseManager
'''
更多Python学习资料以及源码教程资料,可以在群1136201545免费获取
'''
class QueueManager(BaseManager):
    """
    继承类BaseManager
    """
    pass

# 使用register注册,获取网络queue名称
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 配置服务器ip并连接服务器
server_addr = '127.0.0.1'
print('connect to server %s...' % server_addr)
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 从网络连接
m.connect()
# 获取queue对象
task = m.get_task_queue()
result = m.get_result_queue()
# 从task获取任务并将结果写入result
while(not task.empty()):
    image_url = task.get(True, timeout=5)
    print('run task download %s...' % image_url)
    time.sleep(1)
    result.put('%s ------>success' % image_url)
print('worker exit')

先启动服务进程,结果如下:

put task url_0...
put task url_1...
put task url_2...
put task url_3...
put task url_4...
put task url_5...
put task url_6...
put task url_7...
put task url_8...
put task url_9...

再启动任务进程,结果如下:

connect to server 127.0.0.1...
run task download url_0...
run task download url_1...
run task download url_2...
run task download url_3...
run task download url_4...
run task download url_5...
run task download url_6...
run task download url_7...
run task download url_8...
run task download url_9...
worker exit

再回过头来看服务进程端输出

put task url_0...
put task url_1...
put task url_2...
put task url_3...
put task url_4...
put task url_5...
put task url_6...
put task url_7...
put task url_8...
put task url_9...
result is url_0 ------>success
result is url_1 ------>success
result is url_2 ------>success
result is url_3 ------>success
result is url_4 ------>success
result is url_5 ------>success
result is url_6 ------>success
result is url_7 ------>success
result is url_8 ------>success
result is url_9 ------>success
master exit

这个简单的分布式有什么用呢?从例子也可以看出,如果我们设置多个任务进程,就可以把任务分配到多台机器上,服务端来添加url到队列,任务端来下载并返回结果。而Queue之所以能通过网络访问,就是通过QueueManager实现的。


本文转载:CSDN博客