python pool apply_async 和 map_async 不会阻塞完整队列

时间:2023-03-14
本文介绍了python pool apply_async 和 map_async 不会阻塞完整队列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着跟版网的小编来一起学习吧!

问题描述

我对 python 还很陌生.我正在使用多处理模块读取标准输入上的文本行,以某种方式转换它们并将它们写入数据库.这是我的代码片段:

I am fairly new to python. I am using the multiprocessing module for reading lines of text on stdin, converting them in some way and writing them into a database. Here's a snippet of my code:

batch = []
pool = multiprocessing.Pool(20)
i = 0
for i, content in enumerate(sys.stdin):
    batch.append(content)
    if len(batch) >= 10000:
        pool.apply_async(insert, args=(batch,i+1))
        batch = []
pool.apply_async(insert, args=(batch,i))
pool.close()
pool.join()

现在一切正常,直到我开始处理巨大的输入文件(数亿行),然后通过管道传输到我的 python 程序中.在某些时候,当我的数据库变慢时,我会看到内存已满.

Now that all works fine, until I get to process huge input files (hundreds of millions of lines) that i pipe into my python program. At some point, when my database gets slower, I see the memory getting full.

玩了一会儿,发现 pool.apply_async 和 pool.map_async 从来没有阻塞过,所以要处理的调用队列越来越大.

After some playing, it turned out that pool.apply_async as well as pool.map_async never ever block, so that the queue of the calls to be processed grows bigger and bigger.

解决我的问题的正确方法是什么?我希望我可以设置一个参数,一旦达到某个队列长度,它将阻止 pool.apply_async 调用.Java 中的 AFAIR 可以为此目的为 ThreadPoolExecutor 提供一个具有固定长度的 BlockingQueue.

What is the correct approach to my problem? I would expect a parameter that I can set, that will block the pool.apply_async call, as soon as a certain queue length has been reached. AFAIR in Java one can give the ThreadPoolExecutor a BlockingQueue with a fixed length for that purpose.

谢谢!

推荐答案

apply_asyncmap_async 函数旨在不阻塞主进程.为了做到这一点,Pool 维护了一个内部 Queue,遗憾的是它的大小无法更改.

The apply_async and map_async functions are designed not to block the main process. In order to do so, the Pool maintains an internal Queue which size is unfortunately impossible to change.

解决问题的方法是使用 Semaphore 以您希望队列的大小进行初始化.在为池提供数据之前以及在工作人员完成任务之后获取和释放信号量.

The way the problem can be solved is by using a Semaphore initialized with the size you want the queue to be. You acquire and release the semaphore before feeding the pool and after a worker has completed the task.

这是一个使用 Python 2.6 或更高版本的示例.

Here's an example working with Python 2.6 or greater.

from threading import Semaphore
from multiprocessing import Pool

def task_wrapper(f):
    """Python2 does not allow a callback for method raising exceptions,
    this wrapper ensures the code run into the worker will be exception free.

    """
    try:
        return f()
    except:
        return None

class TaskManager(object):
    def __init__(self, processes, queue_size):
        self.pool = Pool(processes=processes)
        self.workers = Semaphore(processes + queue_size)

    def new_task(self, f):
        """Start a new task, blocks if queue is full."""
        self.workers.acquire()
        self.pool.apply_async(task_wrapper, args=(f, ), callback=self.task_done))

    def task_done(self):
        """Called once task is done, releases the queue is blocked."""
        self.workers.release()

另一个使用 concurrent.futures 池实现的示例.

Another example using concurrent.futures pools implementation.

这篇关于python pool apply_async 和 map_async 不会阻塞完整队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持跟版网!

上一篇:multiprocessing.value 语法清晰吗? 下一篇:如何在进程之间共享一个类?

相关文章