py

楼主: sustainer123 (caster)   2024-09-28 19:01:43
利用多进程实现简单的分布式运算
process可以分散到多台机器运行
multiprocessing的manager可以实现这功能
他把queue丢到网络上并被其他机器读取
py:
import random, os, time
from multiprocessing import Queue
from multiprocessing.managers import BaseManager
task_queue = Queue()
result_queue = Queue()
def get_task_queue():
return task_queue
def get_result_queue():
return result_queue
class QueueManager(BaseManager):
pass
QueueManager.register('get_task_queue', callable=get_task_queue)
QueueManager.register('get_result_queue', callable=get_result_queue)
manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')
print(os.getpid())
s = manager.get_server()
s.serve_forever()
这边先建立服务器管理任务
这边分成四个步骤:
1.建立queue queue负责进程间的交流
2.将创建的queue丢到网上,callable是能调用的对象,其他机器可以透过网络直接
调用这函数,然后透过函数取得queue
3.建立manager,设定端口与密码,密码需要二进制,所以前面有个b
4.启动服务器
运行这段程式码 其他机器就能连进服务器
py:
import random, time, os
from multiprocessing import Queue
from multiprocessing.managers import BaseManager
class QueueManager(BaseManager):
pass
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
server_addr = '127.0.0.1'
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
m.connect()
task = m.get_task_queue()
result = m.get_result_queue()
for i in range(10):
n = random.randint(0, 10000)
print(f'put task {n}')
task.put(n)
print('少女祈祷中')
for i in range(10):
try:
r = result.get(timeout=10)
print(f'result {r}')
except Exception as e:
print(e)
print(os.getpid())
这段程式码的功能是产生资料,存入任务queue,等待,读取运算结果
这边是负责执行任务的机器
首先使用QueueManager注册要调用的函数
第二步 连结服务器
第三步 取得queue
第四步 执行任务
py:
import time, queue, os
from multiprocessing.managers import BaseManager
class QueueManager(BaseManager):
pass
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
server_addr = '127.0.0.1'
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
m.connect()
task = m.get_task_queue()
result = m.get_result_queue()
for i in range(10):
try:
n = task.get(timeout=1)
print(f'run task {n}')
r = f'{n} * {n} = {n * n}'
time.sleep(1)
result.put(r)
except queue.Empty:
print('task queue is empty.')
print(os.getpid() )
print('worker exit.')
这段程式码的功能是对task queue里面的数字平方
完成后放入 result queue
大致步骤跟上面一样 不赘述
透过这三段程式码 我们就完成了一个简单的分布式架构
我印象Celery dask之类的好像更常用
但我没修过分布式系统 就请其他大师说明
另外有错的部分欢迎指正 感谢
程式码参考下面两个网站 不过直接照抄跑不起来 所以我有另外修改
参考资料:
https://liaoxuefeng.com/books/python/process-thread/process-manager/index.html
https://docs.python.org/3.10/library/multiprocessing.html

Links booklink

Contact Us: admin [ a t ] ucptt.com