代码之家  ›  专栏  ›  技术社区  ›  mad_

如何查找气流中失败的上游任务数?

  •  1
  • mad_  · 技术社区  · 7 年前

    我很难弄清楚如何在相同的dag运行中找到失败的任务 同一天( 同一执行日 )

    举个例子 dag_id=1 第一次运行时失败(可能是由于任何原因,比如连接超时),任务失败。当我们试图查询失败的任务时,taskinstance表将包含该任务的条目。伟大的!!

    但是,如果我重新运行相同的dag(注意dag_id仍然是1),那么在最后一个任务中(它有 ALL_DONE 因此,无论上游任务是失败还是成功,它都将被执行)我想计算当前dag_运行中失败的任务数,忽略以前的dag_运行。我遇到了dag_run id,如果我们能将它与taskinstance关联起来,它可能会很有用,但我不能。如有任何建议/帮助,我们将不胜感激。

    1 回复  |  直到 7 年前
        1
  •  3
  •   joebeeson    7 年前

    你可以创建一个 PythonOperator 查询气流数据库以查找所需信息的任务。这样做的另一个好处是可以传递查询所需数据的信息:

    from contextlib import closing
    from airflow import models, settings
    from airflow.utils.state import State
    
    def your_python_operator_callable(**context):    
      with closing(settings.Session()) as session:
        print("There are {} failed tasks in this execution".format(
          session.query(
            models.TaskInstance
          ).filter(
            models.TaskInstance.dag_id == context["dag"].dag_id, 
            models.TaskInstance.execution_date == context["execution_date"],
            models.TaskInstance.state == State.FAILED).count()
          )
    

    然后将任务添加到 DAG 用一个 蟒蛇 .

    ( 我没有测试过上面的内容,但希望能让你走上正确的道路 )