代码之家  ›  专栏  ›  技术社区  ›  Dan fatihk

任务失败后气流停止运行

  •  0
  • Dan fatihk  · 技术社区  · 7 年前

    我对气流在气流中的作用感到困惑。我想实现的行为是:

    1. DAG的常规触发器(每小时)
    2. 如果任务失败n次重试,请发送有关失败的电子邮件
    3. 当下一个每小时触发时,触发一个新的dagrun,就像什么都没有失败一样。

    任务默认值:

    'depends_on_past': True,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['email@address.co.uk'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'wait_for_downstream': False,
    

    dag参数:

    schedule_interval=timedelta(minutes=60),
    catchup=False,
    max_active_runs=1
    

    我认为我误解了其中的一些论点,因为在我看来,如果一个任务失败了n次(即dagrun失败),那么下一个dagrun就会被调度,但永远处于运行状态,并且没有其他的dagrun会成功(甚至不会被调度)。例如,这里是dagruns(我不知道在哪里可以找到基于文本的调度程序日志,比如 this question

    enter image description here

    执行每5分钟运行一次,直到失败,之后最后一次执行刚好处于运行状态,并且在过去30分钟内一直处于运行状态。

    我应该补充说,重新启动调度程序没有帮助,也没有手动设置运行任务失败。。。

    1 回复  |  直到 7 年前
        1
  •  5
  •   Farvardin kiran    6 年前

    你有 depends_on_past 设置为True,这将阻止启动下一个DagRun。

    From the docs : 依赖于过去(bool)当设置为true时,任务实例将按顺序运行,同时依赖以前的任务计划来成功。允许运行开始日期的任务实例。

    这意味着您的Dag正在尝试运行,但它正在等待上一个DagRun中的相应任务处于成功状态。

        2
  •  1
  •   alvaro nortes    6 年前

    在我的例子中,上一次执行失败时,下一个DAG的执行没有开始,即使我有depends\u on\u pass=False选项。

    等待下游(bool)-设置为true时,任务X的实例将等待上一个任务X实例下游的任务成功完成,然后再运行。如果一个任务的不同实例>X改变了相同的资源,并且该资源被任务X的下游任务使用,则这一点非常有用。>请注意,无论在哪里使用wait\u for\u down,dependence\u past都将强制为True。

    最后请注意,max\u active\u runs=1选项很重要 是因为在另一种情况下,同一任务可以在随后的几个DAG运行中同时开始运行。

    from datetime import datetime, timedelta
    from airflow.models import DAG
    from airflow.operators.python_operator import PythonOperator
    
    args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'wait_for_downstream': False,
        'start_date': datetime(2019, 7, 20),
    }
    
    dag = DAG(
        dag_id='test_v8',
        default_args=args,
        schedule_interval='* * * * *',
        catchup=False,
        max_active_runs=1
    
    )
    
    from time import sleep
    
    
    def sleep_1():
        sleep(1)
    
    
    def sleep_2():
        sleep(2)
    
    
    sleep_2 = PythonOperator(
        task_id='sleep_2',
        python_callable=sleep_2,
        dag=dag,
    )
    
    sleep_1 = PythonOperator(
        task_id='sleep_1',
        python_callable=sleep_1,
        dag=dag,
    )
    
    sleep_1 >> sleep_2
    

    终于成功了!

    enter image description here