使用 Python 多处理管理器 (BaseManager/SyncManager) 与远程机器共享队列时管道损坏

时间:2023-03-14
本文介绍了使用 Python 多处理管理器 (BaseManager/SyncManager) 与远程机器共享队列时管道损坏的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着跟版网的小编来一起学习吧!

问题描述

上个月,当我们尝试使用 Python 2.6.x 多处理包在多台不同 (linux) 计算机之间共享队列时,我们遇到了一个持续存在的问题.我也直接向 Jesse Noller 提出了这个问题,因为我们还没有在 StackOverflow、Python 文档、源代码或其他在线网站上找到任何说明该问题的内容.

In the last month, we've had a persistent problem with the Python 2.6.x multiprocessing package when we've tried to use it to share a queue among several different (linux) computers. I've posed this question directly to Jesse Noller as well since we haven't yet found anything that elucidates the issue on StackOverflow, Python docs, source code or elsewhere online.

我们的工程师团队无法解决这个问题,我们向 python 用户组中的很多人提出了这个问题,但无济于事.我希望有人能提供一些见解,因为我觉得我们做错了什么,但离问题太近了,看不到它的本质.

Our team of engineers hasn't been able to solve this one, and we've posed the question to quite a few people in python user groups to no avail. I was hoping someone could shed some insight, since I feel like we're doing something incorrect but are too close to the problem to see it for what it is.

这是症状:

Traceback (most recent call last):
  File "/var/django_root/dev/com/brightscope/data/processes/daemons/deferredupdates/servers/queue_server.py", line 65, in get_from_queue
    return queue, queue.get(block=False)
  File "<string>", line 2, in get
  File "/usr/local/lib/python2.6/multiprocessing/managers.py", line 725, in _callmethod
    conn.send((self._id, methodname, args, kwds))
IOError: [Errno 32] Broken pipe

(我展示了我们的代码在共享队列对象上调用 queue.get() 的位置,该对象由扩展 SyncManger 的管理器托管).

(I'm showing where our code calls queue.get() on a shared queue object, hosted by a manager that extends SyncManger).

这个问题的特殊之处在于,如果我们连接到单台机器上的这个共享队列(我们称之为 machine A),即使来自许多并发进程,我们似乎也永远不会遇到问题.只有当我们从其他机器(我们称这些机器 B 和 C)连接到队列(同样,使用扩展多处理 SyncManager 并且目前没有添加额外功能的类)并运行大量在我们遇到问题的同时进入和退出队列.

What's peculiar about the issue is that if we connect to this shared queue on a single machine (let's call this machine A), even from lots of concurrent processes, we never seem to run into an issue. It's only when we connect to the queue (again, using a class that extends multiprocessing SyncManager and currently adds no additional functionality) from other machines (let's call these machines B and C) and run a high volume of items into and out of the queue at the same time that we experience a problem.

就好像 python 的 multiprocessing 包处理本地连接(即使它们仍然使用相同的 manager.connect() 连接方法)以一种从 machine A 工作的方式但是当远程连接是至少从 machines B 或 C 之一同时生成,我们得到了 Broken pipe 错误.

It is as though python's multiprocessing package handles local connections (even though they are still using the same manager.connect() connection method) in a manner that works from machine A but when remote connections are made simultaneously from at least one of machines B or C we get a Broken pipe error.

在我的团队所做的所有阅读中,我们认为问题与锁定有关.我们想也许我们不应该使用 Queue.Queue,而是使用 multiprocessing.Queue,但是我们切换了,问题仍然存在(我们还注意到 SyncManager 自己的共享队列是Queue.Queue 的实例).

In all the reading my team has done, we thought the problem was related to locking. We thought maybe we shouldn't use Queue.Queue, but instead multiprocessing.Queue, but we switched and the problem persisted (we also noticed that SyncManager's own shared Queue is an instance of Queue.Queue).

我们正在努力解决如何调试问题,因为它很难重现,但确实经常发生(如果我们从队列中插入和 .get() 处理大量项目,则每天多次).

We are pulling our hair out about how to even debug the issue, since it's hard to reproduce but does happen fairly frequently (many times per day if we are inserting and .get()ing lots of items from the queue).

我们创建的方法 get_from_queue 尝试以随机睡眠间隔重试从队列中获取项目约 10 次,但似乎如果它失败一次,它将失败十次(这导致我相信 .register() 和 .connect() 连接到管理器可能不会给服务器提供另一个套接字连接,但我无法通过阅读文档或查看 Python 内部源代码来确认这一点).

The method we created get_from_queue attempts to retry acquiring the item from a queue ~10 times with randomized sleep intervals, but it seems like if it fails once, it will fail all ten times (which lead me to believe that .register() and .connect()ing to a manager perhaps doesn't give another socket connection to the server, but I couldn't confirm this either by reading the docs or looking at the Python internal source code).

任何人都可以提供有关我们可能查看的位置或我们如何跟踪实际发生的情况的任何见解吗?

Can anyone provide any insight into where we might look or how we might track what's actually happening?

我们如何使用 multiprocessing.BaseManagermultiprocessing.SyncManager 在管道损坏的情况下启动新连接?

How can we start a new connection in the event of a broken pipe using multiprocessing.BaseManager or multiprocessing.SyncManager?

我们首先如何防止管道破裂?

How can we prevent the broken pipe in the first place?

推荐答案

仅供参考 如果其他人运行同样的错误,在与 Python 核心开发团队的 Ask Solem 和 Jesse Noller 进行广泛咨询后,看起来这实际上是一个当前 python 2.6.x(可能是 2.7+,可能是 3.x)中的错误.他们正在寻找可能的解决方案,并且可能会在 Python 的未来版本中包含一个修复程序.

FYI In case anyone else runs by this same error, after extensive consulting with Ask Solem and Jesse Noller of Python's core dev team, it looks like this is actually a bug in current python 2.6.x (and possibly 2.7+ and possibly 3.x). They are looking at possible solutions and a fix will probably be included in a future version of Python.

这篇关于使用 Python 多处理管理器 (BaseManager/SyncManager) 与远程机器共享队列时管道损坏的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持跟版网!

上一篇:Python Multiprocessing - 将类方法应用于对象列表 下一篇:在 Python 中,我如何知道一个进程何时完成?

相关文章