将 asyncio 与多处理结合起来会出现什么样的问题(如果有的话)?

时间:2023-03-14
本文介绍了将 asyncio 与多处理结合起来会出现什么样的问题(如果有的话)?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着跟版网的小编来一起学习吧!

问题描述

当他们第一次看到 Python 中的线程时,几乎每个人都知道,对于那些真正想要并行处理的人来说,GIL 让他们的生活变得悲惨——或者至少给它一个机会.

As almost everyone is aware when they first look at threading in Python, there is the GIL that makes life miserable for people who actually want to do processing in parallel - or at least give it a chance.

我目前正在考虑实现类似反应堆模式的东西.实际上,我想在一个类线程上监听传入的套接字连接,当有人尝试连接时,接受该连接并将其传递给另一个类线程进行处理.

I am currently looking at implementing something like the Reactor pattern. Effectively I want to listen for incoming socket connections on one thread-like, and when someone tries to connect, accept that connection and pass it along to another thread-like for processing.

我(还)不确定我可能面临什么样的负载.我知道目前对传入消息设置了 2MB 的上限.从理论上讲,我们每秒可以得到数千(尽管我不知道实际上我们是否见过类似的东西).处理消息所花费的时间并不非常重要,但显然越快越好.

I'm not (yet) sure what kind of load I might be facing. I know there is currently setup a 2MB cap on incoming messages. Theoretically we could get thousands per second (though I don't know if practically we've seen anything like that). The amount of time spent processing a message isn't terribly important, though obviously quicker would be better.

我正在研究 Reactor 模式,并使用 multiprocessing 库开发了一个小示例,该库(至少在测试中)似乎工作得很好.但是,现在/很快我们将拥有 asyncio 库,它将为我处理事件循环.

I was looking into the Reactor pattern, and developed a small example using the multiprocessing library that (at least in testing) seems to work just fine. However, now/soon we'll have the asyncio library available, which would handle the event loop for me.

asynciomultiprocessing 结合起来有什么可以咬我的吗?

Is there anything that could bite me by combining asyncio and multiprocessing?

推荐答案

你应该能够安全地结合 asynciomultiprocessing 没有太多麻烦,虽然你不应该t 直接使用 multiprocessing.asyncio(以及任何其他基于事件循环的异步框架)的主要罪过是阻塞事件循环.如果您尝试直接使用 multiprocessing,则任何时候您阻塞等待子进程,您都会阻塞事件循环.显然,这很糟糕.

You should be able to safely combine asyncio and multiprocessing without too much trouble, though you shouldn't be using multiprocessing directly. The cardinal sin of asyncio (and any other event-loop based asynchronous framework) is blocking the event loop. If you try to use multiprocessing directly, any time you block to wait for a child process, you're going to block the event loop. Obviously, this is bad.

避免这种情况的最简单方法是使用 BaseEventLoop.run_in_executorconcurrent.futures.ProcessPoolExecutor.ProcessPoolExecutor 是使用 multiprocessing.Process 实现的进程池,但 asyncio 内置支持在其中执行函数而不阻塞事件循环.这是一个简单的例子:

The simplest way to avoid this is to use BaseEventLoop.run_in_executor to execute a function in a concurrent.futures.ProcessPoolExecutor. ProcessPoolExecutor is a process pool implemented using multiprocessing.Process, but asyncio has built-in support for executing a function in it without blocking the event loop. Here's a simple example:

import time
import asyncio
from concurrent.futures import ProcessPoolExecutor

def blocking_func(x):
   time.sleep(x) # Pretend this is expensive calculations
   return x * 5

@asyncio.coroutine
def main():
    #pool = multiprocessing.Pool()
    #out = pool.apply(blocking_func, args=(10,)) # This blocks the event loop.
    executor = ProcessPoolExecutor()
    out = yield from loop.run_in_executor(executor, blocking_func, 10)  # This does not
    print(out)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

在大多数情况下,仅此功能就足够了.如果您发现自己需要来自 multiprocessing 的其他构造,例如 QueueEventManager 等,则有一个名为 aioprocessing 的第三方库(完全公开:我写的),它提供了所有 multiprocessing 数据结构的 asyncio 兼容版本.这是一个演示示例:

For the majority of cases, this is function alone is good enough. If you find yourself needing other constructs from multiprocessing, like Queue, Event, Manager, etc., there is a third-party library called aioprocessing (full disclosure: I wrote it), that provides asyncio-compatible versions of all the multiprocessing data structures. Here's an example demoing that:

import time
import asyncio
import aioprocessing
import multiprocessing

def func(queue, event, lock, items):
    with lock:
        event.set()
        for item in items:
            time.sleep(3)
            queue.put(item+5)
    queue.close()

@asyncio.coroutine
def example(queue, event, lock):
    l = [1,2,3,4,5]
    p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, l)) 
    p.start()
    while True:
        result = yield from queue.coro_get()
        if result is None:
            break
        print("Got result {}".format(result))
    yield from p.coro_join()

@asyncio.coroutine
def example2(queue, event, lock):
    yield from event.coro_wait()
    with (yield from lock):
        yield from queue.coro_put(78)
        yield from queue.coro_put(None) # Shut down the worker

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    queue = aioprocessing.AioQueue()
    lock = aioprocessing.AioLock()
    event = aioprocessing.AioEvent()
    tasks = [ 
        asyncio.async(example(queue, event, lock)),
        asyncio.async(example2(queue, event, lock)),
    ]   
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

这篇关于将 asyncio 与多处理结合起来会出现什么样的问题(如果有的话)?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持跟版网!

上一篇:多处理 AsyncResult.get() 在 Python 3.7.2 中挂起,但在 3.6 中没有 下一篇:多处理模块中的 ThreadPool 与 Pool 有什么区别?

相关文章