使用主题交换运行多个 Celery 任务

时间:2023-02-14
本文介绍了使用主题交换运行多个 Celery 任务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着跟版网的小编来一起学习吧!

问题描述

我正在用 Celery 替换一些本地代码,但很难复制当前的行为.我想要的行为如下:

I'm replacing some homegrown code with Celery, but having a hard time replicating the current behaviour. My desired behaviour is as follows:

  • 创建新用户时,应使用 user.created 路由键将消息发布到 tasks 交换.
  • 此消息应触发两个 Celery 任务,即 send_user_activate_emailcheck_spam.
  • When creating a new user, a message should be published to the tasks exchange with the user.created routing key.
  • Two Celery tasks should be trigged by this message, namely send_user_activate_email and check_spam.

我尝试通过使用 ignore_result=True 参数定义 user_created 任务以及 send_user_activate_emailcheck_spam 的任务来实现这一点.

I tried implementing this by defining a user_created task with a ignore_result=True argument, plus a task for send_user_activate_email and check_spam.

在我的配置中,我添加了以下路由和队列定义.当消息传递到 user_created 队列时,它不会传递到其他两个队列.

In my configuration, I added the following routes and queues definitions. While the message is delivered to the user_created queue, it is not delivered to the other two queues.

理想情况下,邮件仅传递到 send_user_activate_emailcheck_spam 队列.使用 vanilla RabbitMQ 时,消息会发布到交换器,队列可以绑定到交换器,但 Celery 似乎直接将消息传递到队列.

Ideally, the message is only delivery to the send_user_activate_email and check_spam queues. When using vanilla RabbitMQ, messages are published to an exchange, to which queues can bind, but Celery seems to deliver a message to a queue directly.

如何在 Celery 中实现上述行为?

How would I implement the behaviour outlined above in Celery?

CELERY_QUEUES = {
    'user_created': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
    'send_user_activate_email': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
    'check_spam': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
}

CELERY_ROUTES = {
    'user_created': {
        'queue': 'user_created',
        'routing_key': 'user.created',
        'exchange': 'tasks',
        'exchange_type': 'topic',
    },
    'send_user_activate_email': {
        'queue': 'user_created',
        'routing_key': 'user.created',
        'exchange': 'tasks',
        'exchange_type': 'topic',
    },
    'check_spam': {
        'queue': 'user_created',
        'routing_key': 'user.created',
        'exchange': 'tasks',
        'exchange_type': 'topic',
    },
}

推荐答案

听起来您希望单个消息触发/被两个队列使用,但这不是 Celery 的工作方式.一个 Exchange 会将一个任务发布到符合条件的队列中,但是一旦它被使用,其他队列就会忽略该消息.每个要触发的任务都需要一条消息.

It sounds like you are expecting a single message to trigger/be consumed by two queues but this is not how Celery works. An Exchange will post a task to eligible queues, but once it is consumed, the other Queues ignore the message. You need a message per Task you want to trigger.

Celery 新用户经常会感到困惑,因为在这个系统中队列"有两种用途;Queue() 和文档引用的 Kombu 队列,以及直接保存消息并由工作人员使用的 AMQP 队列.当我们发布到队列时,我们会想到 AMQP 队列,这是不正确的.(感谢下面链接的答案).

There is often confusion with new Celery users because there are two uses of "Queue" in this system; Kombu Queues which the Queue() and documentation refer to, and the AMQP Queues, which hold messages directly and are consumed by workers. When we publish to queues, we think of the AMQP ones, which is incorrect. (thanks to answer linked below).

回到你的问题,如果我理解正确的话,当 user_created 被消费时,你希望它产生另外两个任务;send_user_activate_email 和 check_spam.此外,它们不应相互依赖;它们可以在不同的机器上并行运行,并且不需要知道彼此的状态.

Back to your issue, if I am understanding correctly, when user_created is consumed, you want it to spawn two more tasks; send_user_activate_email and check_spam. Furthermore, these should not be dependent on each other; they can run in parallel on separate machines and do not need to know the status of one another.

在这种情况下,您希望 user_created 应用异步"这两个新任务并返回.这可以直接完成,或者您可以使用包含 check_spam 和 send_user_activate_email 的 Celery组"来实现此目的.该小组提供了一些很好的速记并为您的任务提供了一些结构,所以我个人会向您推荐这个方向.

In this case, you want user_created to "apply_async" these two new Tasks and return. This could be done directly, or you can use a Celery "Group" containing check_spam and send_user_activate_email to achieve this. The group gives some nice shorthand and lends some structure to your tasks, so personally I'd nudge you that direction.

#pseudocode
group(check_spam.s(... checkspam kwargs ...), send_user_activate_email.s(... active email kwargs ...)).apply_async()

此设置将创建四个消息;一个用于您要执行的每个任务加上一个用于 Group(),它本身就会有一个结果.

This setup would create four messages; one for each Task you want to execute plus one for the Group(), which itself will have a result.

在您的情况下,我不确定 Exchange 或 ignore_result 是否必要,但我需要查看任务代码并更多地了解系统才能做出判断.

In your case, I am not sure the Exchange or ignore_result is necessary, but I'd need to see the Task code and understand the system more to make that judgement.

http://docs.celeryproject.org/en/latest/userguide/canvas.html#groupshttp://celery.阅读thedocs.org/en/v2.2.6/userguide/routing.html#exchanges-queues-and-routing-keys为什么 CELERY_ROUTES 都有一个 "排队"还有一个routing_key"?

(如果我离开了,我会删除/删除答案...)

(if I am way off I'll delete/remove the answer...)

这篇关于使用主题交换运行多个 Celery 任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持跟版网!

上一篇:Celery 3.0.1 中的框架错误 下一篇:Django、RabbitMQ 和Celery - 为什么在我更新开发中的 Django 代码后,Celery 会运行旧

相关文章

最新文章