
Airflow EMR: execute step from Sensor

  •  3
  • spaghettifunk Thomas Weller  · Tech Community  · 8 years ago

    I have the following DAG in Airflow, in which I execute a set of EMR steps to run my pipeline.

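    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.exceptions import AirflowException
    # Import paths for the Airflow 1.x contrib modules in use when this was posted.
    from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
    from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

    # cluster_id and create_step() are defined elsewhere and are not shown here.

    default_args = {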
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2017, 7, 20, 10, 0),
        'email': ['airflow@airflow.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 5,
        'retry_delay': timedelta(minutes=2),
    }
    
    dag = DAG('dag_import_match_hourly',
          default_args=default_args,
          description='Fancy Description',
          schedule_interval=timedelta(hours=1),
          dagrun_timeout=timedelta(hours=2))
    
    try:
        merge_s3_match_step = EmrAddStepsOperator(
            task_id='merge_s3_match_step',
            job_flow_id=cluster_id,
            aws_conn_id='aws_default',
            steps=create_step('Merge S3 Match'),
            dag=dag
        )
    
        mapreduce_step = EmrAddStepsOperator(
            task_id='mapreduce_match_step',
            job_flow_id=cluster_id,
            aws_conn_id='aws_default',
            steps=create_step('MapReduce Match Hourly'),
            dag=dag
        )
    
        merge_hdfs_step = EmrAddStepsOperator(
            task_id='merge_hdfs_step',
            job_flow_id=cluster_id,
            aws_conn_id='aws_default',
            steps=create_step('Merge HDFS Match Hourly'),
            dag=dag
        )
    
        # Sensors: each one polls the EMR step id pushed to XCom by the matching step task
        check_merge_s3 = EmrStepSensor(
            task_id='watch_merge_s3',
            job_flow_id=cluster_id,
            step_id="{{ task_instance.xcom_pull('merge_s3_match_step', key='return_value')[0] }}",
            aws_conn_id='aws_default',
            dag=dag
        )
    
        check_mapreduce = EmrStepSensor(
            task_id='watch_mapreduce',
            job_flow_id=cluster_id,
            step_id="{{ task_instance.xcom_pull('mapreduce_match_step', key='return_value')[0] }}",
            aws_conn_id='aws_default',
            dag=dag
        )
    
        check_merge_hdfs = EmrStepSensor(
            task_id='watch_merge_hdfs',
            job_flow_id=cluster_id,
            step_id="{{ task_instance.xcom_pull('merge_hdfs_step', key='return_value')[0] }}",
            aws_conn_id='aws_default',
            dag=dag
        )
    
        mapreduce_step.set_upstream(merge_s3_match_step)
        merge_s3_match_step.set_downstream(check_merge_s3)
    
        mapreduce_step.set_downstream(check_mapreduce)
    
        merge_hdfs_step.set_upstream(mapreduce_step)
        merge_hdfs_step.set_downstream(check_merge_hdfs)
    
    except AirflowException as ae:
        print(ae)
    

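    The DAG works fine, but I would like to use the sensors to make sure that the next step is executed if and only if the previous EMR job has completed correctly. I tried a few things, but none of them worked. The code above does not do this correctly. Does anyone know how to use EmrStepSensor to achieve my goal?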

    1 answer  |  last activity 8 years ago
  •  6
  •   Chengzhi    8 years ago

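    It looks like your EmrStepSensor tasks need the correct dependencies set. Take check_mapreduce as an example: if you want the next step to wait for check_mapreduce to complete, it should be wired as merge_hdfs_step.set_upstream(check_mapreduce) or, equivalently, check_mapreduce.set_downstream(merge_hdfs_step). The chain then becomes TaskA >> SensorA >> TaskB >> SensorB >> TaskC >> SensorC. Try setting up the dependencies this way.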
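
The ordering described above can be sketched as follows. This is only an illustration, not code from the original answer: FakeTask, chain_in_order and walk are hypothetical names, and FakeTask stands in for the real operators so the example runs without Airflow installed. The task ids are the ones from the question's DAG.

```python
# Sketch: place every EMR step behind the sensor of the step before it.
# FakeTask is a hypothetical stand-in; the operators in the question
# expose the same set_downstream() method.

class FakeTask:
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def set_downstream(self, other):
        # Record that `other` may only start after this task succeeds.
        self.downstream.append(other)


def chain_in_order(tasks):
    """Make each task the direct upstream of the task that follows it."""
    for upstream, downstream in zip(tasks, tasks[1:]):
        upstream.set_downstream(downstream)


def walk(task):
    """Return the task ids of a purely linear chain in execution order."""
    order = [task.task_id]
    while task.downstream:
        task = task.downstream[0]
        order.append(task.task_id)
    return order


# Step, sensor, step, sensor, step, sensor.
ids = [
    'merge_s3_match_step', 'watch_merge_s3',
    'mapreduce_match_step', 'watch_mapreduce',
    'merge_hdfs_step', 'watch_merge_hdfs',
]
tasks = [FakeTask(task_id) for task_id in ids]
chain_in_order(tasks)

execution_order = walk(tasks[0])
print(' >> '.join(execution_order))
```

In the question's DAG, the same wiring would presumably replace the existing set_upstream/set_downstream lines with a call such as chain_in_order([merge_s3_match_step, check_merge_s3, mapreduce_step, check_mapreduce, merge_hdfs_step, check_merge_hdfs]), so that each step starts only after the sensor watching the previous step has succeeded.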
