<bdo id='pkkRQ'></bdo><ul id='pkkRQ'></ul>

  • <tfoot id='pkkRQ'></tfoot>

      1. <i id='pkkRQ'><tr id='pkkRQ'><dt id='pkkRQ'><q id='pkkRQ'><span id='pkkRQ'><b id='pkkRQ'><form id='pkkRQ'><ins id='pkkRQ'></ins><ul id='pkkRQ'></ul><sub id='pkkRQ'></sub></form><legend id='pkkRQ'></legend><bdo id='pkkRQ'><pre id='pkkRQ'><center id='pkkRQ'></center></pre></bdo></b><th id='pkkRQ'></th></span></q></dt></tr></i><div id='pkkRQ'><tfoot id='pkkRQ'></tfoot><dl id='pkkRQ'><fieldset id='pkkRQ'></fieldset></dl></div>
        <legend id='pkkRQ'><style id='pkkRQ'><dir id='pkkRQ'><q id='pkkRQ'></q></dir></style></legend>

      2. <small id='pkkRQ'></small><noframes id='pkkRQ'>

      3. 如何在 Airflow 中创建条件任务

        时间:2023-08-29

        <small id='4Nh9e'></small><noframes id='4Nh9e'>

          <tbody id='4Nh9e'></tbody>

          1. <legend id='4Nh9e'><style id='4Nh9e'><dir id='4Nh9e'><q id='4Nh9e'></q></dir></style></legend>

              <tfoot id='4Nh9e'></tfoot>
                • <bdo id='4Nh9e'></bdo><ul id='4Nh9e'></ul>

                  <i id='4Nh9e'><tr id='4Nh9e'><dt id='4Nh9e'><q id='4Nh9e'><span id='4Nh9e'><b id='4Nh9e'><form id='4Nh9e'><ins id='4Nh9e'></ins><ul id='4Nh9e'></ul><sub id='4Nh9e'></sub></form><legend id='4Nh9e'></legend><bdo id='4Nh9e'><pre id='4Nh9e'><center id='4Nh9e'></center></pre></bdo></b><th id='4Nh9e'></th></span></q></dt></tr></i><div id='4Nh9e'><tfoot id='4Nh9e'></tfoot><dl id='4Nh9e'><fieldset id='4Nh9e'></fieldset></dl></div>
                  本文介绍了如何在 Airflow 中创建条件任务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着跟版网的小编来一起学习吧!

                  问题描述

                  我想在 Airflow 中创建一个条件任务,如下面的架构中所述.预期的情况如下:

                  I would like to create a conditional task in Airflow as described in the schema below. The expected scenario is the following:

                  • 任务 1 执行
                  • 如果任务 1 成功,则执行任务 2a
                  • Else 如果任务 1 失败,则执行任务 2b
                  • 最终执行任务 3

                  以上所有任务都是 SSHExecuteOperator.我猜我应该使用 ShortCircuitOperator 和/或 XCom 来管理条件,但我不清楚如何实现它.你能描述一下解决方案吗?

                  All tasks above are SSHExecuteOperator. I'm guessing I should be using the ShortCircuitOperator and / or XCom to manage the condition but I am not clear on how to implement that. Could you please describe the solution?

                  推荐答案

                  你必须使用 气流触发规则

                  所有操作符都有一个 trigger_rule 参数,它定义了触发生成任务的规则.

                  All operators have a trigger_rule argument which defines the rule by which the generated task get triggered.

                  触发规则的可能性:

                  ALL_SUCCESS = 'all_success'
                  ALL_FAILED = 'all_failed'
                  ALL_DONE = 'all_done'
                  ONE_SUCCESS = 'one_success'
                  ONE_FAILED = 'one_failed'
                  DUMMY = 'dummy'
                  

                  这是解决您问题的想法:

                  Here is the idea to solve your problem:

                  from airflow.operators.ssh_execute_operator import SSHExecuteOperator
                  from airflow.utils.trigger_rule import TriggerRule
                  from airflow.contrib.hooks import SSHHook
                  
                  sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)
                  
                  task_1 = SSHExecuteOperator(
                          task_id='task_1',
                          bash_command=<YOUR COMMAND>,
                          ssh_hook=sshHook,
                          dag=dag)
                  
                  task_2 = SSHExecuteOperator(
                          task_id='conditional_task',
                          bash_command=<YOUR COMMAND>,
                          ssh_hook=sshHook,
                          dag=dag)
                  
                  task_2a = SSHExecuteOperator(
                          task_id='task_2a',
                          bash_command=<YOUR COMMAND>,
                          trigger_rule=TriggerRule.ALL_SUCCESS,
                          ssh_hook=sshHook,
                          dag=dag)
                  
                  task_2b = SSHExecuteOperator(
                          task_id='task_2b',
                          bash_command=<YOUR COMMAND>,
                          trigger_rule=TriggerRule.ALL_FAILED,
                          ssh_hook=sshHook,
                          dag=dag)
                  
                  task_3 = SSHExecuteOperator(
                          task_id='task_3',
                          bash_command=<YOUR COMMAND>,
                          trigger_rule=TriggerRule.ONE_SUCCESS,
                          ssh_hook=sshHook,
                          dag=dag)
                  
                  
                  task_2.set_upstream(task_1)
                  task_2a.set_upstream(task_2)
                  task_2b.set_upstream(task_2)
                  task_3.set_upstream(task_2a)
                  task_3.set_upstream(task_2b)
                  

                  这篇关于如何在 Airflow 中创建条件任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持跟版网!

                  上一篇:“或"Python 中的条件问题 下一篇:使用条件在 pandas 数据框中生成新列

                  相关文章

                  1. <legend id='fSgO6'><style id='fSgO6'><dir id='fSgO6'><q id='fSgO6'></q></dir></style></legend>

                    <small id='fSgO6'></small><noframes id='fSgO6'>

                  2. <i id='fSgO6'><tr id='fSgO6'><dt id='fSgO6'><q id='fSgO6'><span id='fSgO6'><b id='fSgO6'><form id='fSgO6'><ins id='fSgO6'></ins><ul id='fSgO6'></ul><sub id='fSgO6'></sub></form><legend id='fSgO6'></legend><bdo id='fSgO6'><pre id='fSgO6'><center id='fSgO6'></center></pre></bdo></b><th id='fSgO6'></th></span></q></dt></tr></i><div id='fSgO6'><tfoot id='fSgO6'></tfoot><dl id='fSgO6'><fieldset id='fSgO6'></fieldset></dl></div>
                    <tfoot id='fSgO6'></tfoot>
                    • <bdo id='fSgO6'></bdo><ul id='fSgO6'></ul>