代码之家  ›  专栏  ›  技术社区  ›  y2k-shubham

将顶层DAG连接在一起

  •  4
  • y2k-shubham  · 技术社区  · 7 年前

    我需要几个相同的(只是在论点上不同) 顶层 DAG S公司 也可以与以下约束/假设一起触发:

    • 单个顶级DAG将具有 schedule_interval=None 因为他们只需要偶尔 手动触发
    • 然而,这一系列的dag需要每天运行
    • 排序 序列中的DAG数是固定的(在编写代码之前就知道了),很少更改(几个月一次)
    • 不管DAG是失败还是成功,触发链都不能断开
    • 目前它们必须串联运行;将来可能需要并行触发

    所以我为每个dag创建了一个文件 dags 现在我必须 将它们连接起来以便按顺序执行 是的。我已经确定了两种方法:

    1. SubDagOperator

    2. TriggerDagRunOperator

      • 工作于 my demo 但是跑进去了 平行的 (不是 按顺序 )事实并非如此 等待触发的DAG 在进入下一个之前完成
      • ExternalTaskSensor 可能有助于克服上述限制,但会使事情变得非常混乱

    我的问题是

    • 如何克服 parent_id 前缀输入 达格ID 属于 SubDag S?
    • 如何强迫 触发器 S至 等待DAG完成 是吗?
    • 任何替代/更好的方法 接通 独立(顶级)DAG在一起?
    • 有没有办法让我 创建单独的文件 (对于仅在输入上不同的DAG)对于每个顶级DAG?

    我在用 puckel/docker-airflow 具有

    • Airflow 1.9.0-4
    • Python 3.6-slim
    • CeleryExecutor 具有 redis:3.2.7

    编辑-1

    澄清 @维拉杰·帕雷克 queries

    你能详细说明一下你所说的等待完成是什么意思吗 在被触发之前?

    当我触发 import_parent_v1 DAG,它应该用的3个外部DAG 触发器 开始并行运行,即使我按顺序链接它们。实际上 原木 指示当它们一个接一个被触发时,执行将移动到下一个dag( 触发器 )在上一个完成之前。 enter image description here enter image description here 注意:在本例中,顶级DAG命名为 importer_child_v1_db_X 以及相应的 task_id S(代表 触发器 )命名为 importer_v1_db_X

    能不能让TriggerDagRunOperator 最后一个任务?

    我必须把几个相似的(只在论点上不同的)dag连在一起 逐个触发它们的工作流 是的。所以不仅仅是 触发器 我最后可以说,有很多(这里有3个,但在生产中会达到15个)

    2 回复  |  直到 7 年前
        1
  •  5
  •   y2k-shubham    6 年前

    从中获取提示 @维拉杰·帕雷克 answer ,我能够 TriggerDagRunOperator 以预定的方式工作。我在此张贴我的(部分)回答;将在事情变得清楚时更新。


    如何克服 parent_id 前缀输入 dag_id 属于 SubDag S?

    正如@viraj所说,没有直接的方法可以做到这一点。延伸 SubDagOperator 删除 this check 可能有用,但我决定避开它


    如何强迫 触发器 S至 等待DAG完成 是吗?

    • 看着 implementation ,很明显 触发器 只是为了 触发 外部DAG;就这样。默认情况下,它不是 应该是 等待DAG完成。因此我观察到的行为是可以理解的。

    • ExternalTaskSensor 是显而易见的出路。然而,在学习 Airflow 我在依赖 手动触发 达格的( schedule_interval=None )中。在这种情况下, 外部任务传感器 很难准确说明 execution_date 对于外部任务(正在等待谁完成),如果失败, sensor gets stuck 是的。

    • 所以接受暗示 implementation ,我做的 minor adjustment 行为 外部任务传感器 等待完成 全部的 task_instance S公司 有关任务的

      execution_date[external_task] >= execution_date[TriggerDagRunOperator] + execution_delta

      这就实现了 desired result :外部DAG按顺序依次运行。


    有没有办法让我 创建单独的文件 (对于仅在输入上不同的DAG)对于每个顶级DAG?

    再次通过@viraj这可以通过 assigning DAGs to global scope 使用 globals()[dag_id] = DAG(..)


    编辑-1

    也许我指的是不正确的资源 link 上面已经死了),但是 ExternalTaskSensor 已经包含参数 execution_delta &安培 execution_date_fn 容易限制 执行日期 (s)任务是 感觉到 是的。

        2
  •  1
  •   Viraj Parekh    7 年前
    • 你能给出更多细节,你的意思是等待完成的DAG之前,被触发?能不能让triggerdagrunoperator成为dag中的最后一个任务?

    • 要创建类似的dag,可以从一个python文件动态生成dag。你可以这样做:

    从气流导入DAG

    from airflow.operators.python_operator import PythonOperator
    
    
    def create_dag(dag_id,
                   schedule,
                   dag_number,
                   default_args):
    
    def hello_world_py(*args):
        print('Hello World')
        print('This is DAG: {}'.format(str(dag_number)))
    
    dag = DAG(dag_id,
              schedule_interval=schedule,
              default_args=default_args)
    
    with dag:
        t1 = PythonOperator(
            task_id='hello_world',
            python_callable=hello_world_py,
            dag_number=dag_number)
    
    return dag
    
    
    # build a dag for each number in range(10)
    for n in range(1, 10):
    dag_id = 'hello_world_{}'.format(str(n))
    
    default_args = {'owner': 'airflow',
                    'start_date': datetime(2018, 1, 1)
                    }
    
    schedule = '@daily'
    
    dag_number = n
    
    globals()[dag_id] = create_dag(dag_id,
                                  schedule,
                                  dag_number,
                                  default_args)
    

    你可以在这里阅读更多关于这种方法的内容。如果大多数正在生成的DAG非常相似,您可能需要考虑将配置存储在一个airlow变量中 enter link description here

    您可能无法克服subdag操作符的前缀限制-我建议将子dag完全从工作流中删除,并将其作为单独的dag运行-如果您发现自己必须这样做,则可以更轻松地返回并重新运行旧的dag运行。