我一直在试验 python 中的多处理模块,我想知道如何通过生成的进程处理不同并行化方法的参数。这是我使用的代码:
import os
import time
import multiprocessing
class StateClass:
def __init__(self):
self.state = 0
def __call__(self):
return f"I am {id(self)}: {self.state}"
CONTEXT = multiprocessing.get_context("fork")
nb_workers = 2
stato = StateClass()
def wrapped_work_function(a1, a2, sss, qqq):
time.sleep(a1 + 1)
if a1 == 0:
sss.state = 0
else:
sss.state = 123
for eee in a2:
time.sleep(a1 + 1)
sss.state += eee
print(
f"Worker {a1} in process {os.getpid()} (parent process {os.getppid()}): {eee}, {sss()}"
)
return sss
print("main", id(stato), stato)
manager = CONTEXT.Manager()
master_workers_queue = manager.Queue()
work_args_list = [
(
worker_index,
[iii for iii in range(4)],
stato,
master_workers_queue,
)
for worker_index in range(nb_workers)
]
pool = CONTEXT.Pool(nb_workers)
result = pool.starmap_async(wrapped_work_function, work_args_list)
pool.close()
pool.join()
print("Finish")
bullo = result.get(timeout=100)
bullo.append(stato)
for sss in bullo:
print(sss, id(sss), sss.state)
例如,我从中得到以下输出:
main 140349939506416 <__main__.StateClass object at 0x7fa5c449dcf0>
Worker 0 in process 9075 (parent process 9047): 0, I am 140350069832528: 0
Worker 0 in process 9075 (parent process 9047): 1, I am 140350069832528: 1
Worker 1 in process 9077 (parent process 9047): 0, I am 140350069832528: 123
Worker 0 in process 9075 (parent process 9047): 2, I am 140350069832528: 3
Worker 0 in process 9075 (parent process 9047): 3, I am 140350069832528: 6
Worker 1 in process 9077 (parent process 9047): 1, I am 140350069832528: 124
Worker 1 in process 9077 (parent process 9047): 2, I am 140350069832528: 126
Worker 1 in process 9077 (parent process 9047): 3, I am 140350069832528: 129
Finish
<__main__.StateClass object at 0x7fa5c43ac190> 140349938516368 6
<__main__.StateClass object at 0x7fa5c43ac4c0> 140349938517184 129
<__main__.StateClass object at 0x7fa5c449dcf0> 140349939506416 0
初始类实例 stato
的 ID 140349939506416
,并且如我所料在其整个生命周期中保持不变。在 starmap_async
方法中,我确实得到了同一类的两个不同实例(每个工作人员/进程一个),我可以修改它们并保留它们的 state
属性直到结束的脚本。无论如何,这些实例的 id 最初是相同的 (140350069832528
),并且在脚本的末尾,它们都有另一个 id,这也与原始实例的 id 不同。
具有相同的 id 并不意味着它们在内存中具有相同的地址?那么他们怎么可能保持不同的状态
呢?
此行为与 fork 上下文有关吗?
最佳答案
首先,当我运行这个(Debian Linux,Python 3.9.7)时,我没有发现 sss
实例的 id 对于两个子进程是相同的:
main 140614771273680 <__main__.StateClass object at 0x7fe36d7defd0>
Worker 0 in process 19 (parent process 13): 0, I am 140614770671776: 0
Worker 0 in process 19 (parent process 13): 1, I am 140614770671776: 1
Worker 1 in process 20 (parent process 13): 0, I am 140614761373648: 123
Worker 0 in process 19 (parent process 13): 2, I am 140614770671776: 3
Worker 0 in process 19 (parent process 13): 3, I am 140614770671776: 6
Worker 1 in process 20 (parent process 13): 1, I am 140614761373648: 124
Worker 1 in process 20 (parent process 13): 2, I am 140614761373648: 126
Worker 1 in process 20 (parent process 13): 3, I am 140614761373648: 129
Finish
<__main__.StateClass object at 0x7fe36ce7b7f0> 140614761428976 6
<__main__.StateClass object at 0x7fe36ce7b520> 140614761428256 129
<__main__.StateClass object at 0x7fe36d7defd0> 140614771273680 0
即使您正在 fork 新进程,work_args_list
列表中的 stato
实例也会作为 sss
传递给您的辅助函数。传递给在不同进程/地址空间中运行的池工作函数的参数由 pickle
完成,它序列化然后反序列化实例,从而制作一个副本,将 general 在反序列化时有不同的 id。在这种特殊情况下,每个进程在使用 fork 方法时都继承全局变量 stato
并且它在所有进程/地址空间中应该具有相同的 id
。如果我们修改 wrapped_work_function
打印出 stato
的 id,我们可以验证这一点:
def wrapped_work_function(a1, a2, sss, qqq):
print('The id of the inherited stato is', id(stato))
time.sleep(a1 + 1)
if a1 == 0:
sss.state = 0
else:
sss.state = 123
for eee in a2:
time.sleep(a1 + 1)
sss.state += eee
print(
f"Worker {a1} in process {os.getpid()} (parent process {os.getppid()}): {eee}, {sss()}"
)
return sss
那么打印出来的是:
main 140456701534160 <__main__.StateClass object at 0x7fbe9fcd1fd0>
The id of the inherited stato is 140456701534160
The id of the inherited stato is 140456701534160
Worker 0 in process 43 (parent process 37): 0, I am 140456700920112: 0
Worker 0 in process 43 (parent process 37): 1, I am 140456700920112: 1
Worker 1 in process 44 (parent process 37): 0, I am 140456700920112: 123
Worker 0 in process 43 (parent process 37): 2, I am 140456700920112: 3
Worker 0 in process 43 (parent process 37): 3, I am 140456700920112: 6
Worker 1 in process 44 (parent process 37): 1, I am 140456700920112: 124
Worker 1 in process 44 (parent process 37): 2, I am 140456700920112: 126
Worker 1 in process 44 (parent process 37): 3, I am 140456700920112: 129
Finish
<__main__.StateClass object at 0x7fbe9f36e880> 140456691689600 6
<__main__.StateClass object at 0x7fbe9f36eb20> 140456691690272 129
<__main__.StateClass object at 0x7fbe9fcd1fd0> 140456701534160 0
所有地址空间都看到相同的 stato
id,即 140456701534160。如果每个地址空间看到相同的继承 stato
id,则 sss 的 id
,它应该是 stato 的单独副本,
不能与 stato
具有相同的 ID。当我运行代码时,它们的 ID 与我预期的不同。但是在不同地址空间中运行的每个 sss
可能具有彼此相同的 id,但这并不能保证(在第二次运行时它们是相同的)。
但即使 sss
实例具有相同的 id 和相同的地址,这两个实例存在于两个不同的进程中,因此是两个不同的地址空间。这就是它们可以保持不同状态的原因。顺便说一句,当您的辅助函数返回 sss
时,它会使用 pickle 传递回主进程,它序列化和反序列化实例,因此实际上制作了原始副本。这就是您看到返回的 ID 不同的原因。
另外:您有 bullo = result.get(timeout=100)
测试可能的超时。但此语句之前调用了 pool.close()
和 pool.join()
。这两个调用将等待所有提交的任务完成。因此,当您调用 result.get
时,任务保证已完成并且永远不会导致超时异常。
https://stackoverflow.com/questions/72445734/
相关文章:
kubernetes - 由于 'No such file or directory',Kubect
python - Gitlab CI 在 Python 脚本输出中换行
reactjs - RTK 查询维持 `isLoading` 缓存失效后自动重新获取
rust - "the size for values of type cannot be know
ruby-on-rails - Ruby on rails query 不知道怎么写
flutter - 如何在 ListView.builder flutter 中正确使用 findC
java - 这个官方 Xml 模式是否无效或者是 Jaxb 的错误限制