代码之家  ›  专栏  ›  技术社区  ›  Andrew Cassidy

获取最后一次dag运行执行时间的ApacheAirflow宏

  •  1
  • Andrew Cassidy  · 技术社区  · 6 年前

    prev_execution_date 上市的 here 会给我最后一次DAG运行的执行日期,但看看源代码,它似乎只得到基于DAG计划的最后日期。

    prev_execution_date = task.dag.previous_schedule(self.execution_date)
    

    如果DAG不按计划运行,是否可以通过宏获取它的执行日期?

    2 回复  |  直到 6 年前
        1
  •  6
  •   Charlie Gelman    6 年前

    是的,您可以为此定义自己的自定义宏,如下所示:

    # custom macro function
    def get_last_dag_run(dag):
        last_dag_run = dag.get_last_dagrun()
        if last_dag_run is None:
            return "no prev run"
        else:
            return last_dag_run.execution_date.strftime("%Y-%m-%d")
    
    # add macro in user_defined_macros in dag definition
    dag = DAG(dag_id="my_test_dag",
          schedule_interval='@daily',
          user_defined_macros={
              'last_dag_run_execution_date': get_last_dag_run
          }
    )
    
    # example of using it in practice
    print_vals = BashOperator(
        task_id='print_vals',
        bash_command='echo {{ last_dag_run_execution_date(dag) }}',
        dag=dag
    )
    

    注意,dag.get_last_run()只是dag对象上可用的许多函数之一。我在这里找到的: https://github.com/apache/incubator-airflow/blob/v1-10-stable/airflow/models.py#L3396

    您还可以调整日期格式字符串的格式,以及在没有上一次运行时要输出的内容。

        2
  •  0
  •   Fan    6 年前

    您可以制作自己的用户自定义宏函数,使用气流模型搜索元数据库。

    def get_last_dag_run(dag_id):
      //TODO search DB
      return xxx
    
    dag = DAG(
        'example',
        schedule_interval='0 1 * * *',
        user_defined_macros={
            'last_dag_run_execution_date': get_last_dag_run,
        }
    )
    

    然后使用模板中的密钥。

    推荐文章