python - 多处理:如何在类中定义的函数上使用 Pool.map?

当我运行类似的东西时:

from multiprocessing import Pool

p = Pool(5)
def f(x):
     return x*x

p.map(f, [1,2,3])

它工作正常。然而,把它作为一个类的函数:

class calculate(object):
    def run(self):
        def f(x):
            return x*x

        p = Pool()
        return p.map(f, [1,2,3])

cl = calculate()
print cl.run()

给我以下错误:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/sw/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

我看过 Alex Martelli 的一篇帖子处理了同样的问题,但不够明确。

最佳答案

我无法使用到目前为止发布的代码,因为使用“multiprocessing.Pool”的代码不适用于 lambda 表达式,并且不使用“multiprocessing.Pool”的代码会产生与工作项一样多的进程。

我修改了代码 s.t.它产生预定义数量的工作人员,并且仅在存在空闲工作人员时才遍历输入列表。我还为 worker s.t. 启用了“守护程序”模式。 ctrl-c 按预期工作。

import multiprocessing


def fun(f, q_in, q_out):
    while True:
        i, x = q_in.get()
        if i is None:
            break
        q_out.put((i, f(x)))


def parmap(f, X, nprocs=multiprocessing.cpu_count()):
    q_in = multiprocessing.Queue(1)
    q_out = multiprocessing.Queue()

    proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out))
            for _ in range(nprocs)]
    for p in proc:
        p.daemon = True
        p.start()

    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    [q_in.put((None, None)) for _ in range(nprocs)]
    res = [q_out.get() for _ in range(len(sent))]

    [p.join() for p in proc]

    return [x for i, x in sorted(res)]


if __name__ == '__main__':
    print(parmap(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8]))

https://stackoverflow.com/questions/3288595/

相关文章:

linux - 创建 .tar.gz 文件时排除目录

python - 无法安装 Python 包 [SSL : TLSV1_ALERT_PROTOCOL

linux - 在 bash 中检查当前分区的可用磁盘空间

python - 如何更改绘图背景颜色?

python - 无法使用 Ctrl-C 终止 Python 脚本

linux - ~/.ssh/config 文件中的 SSH 端口转发?

python - 如何让 Flask 在端口 80 上运行?

linux - 抑制makefile中命令调用的回声?

python - 在 Python 2.6 中不推荐使用 BaseException.message

linux - 从 linux shell 脚本发送邮件