代码之家  ›  专栏  ›  技术社区  ›  user2707590

创建动态任务映射时,如何在BigQueryInsertJobOperator“params”中使用XComArg?

  •  0
  • user2707590  · 技术社区  · 1 年前

    所以我已经处理这个问题一段时间了,没有任何线索。。。 我有一个DAG,它从BigQuery查询数据,并根据结果创建一些动态任务映射,以使用在另一个BigQuery表中插入条目 BigQueryInsertJobOperator ...

    例如

    from airflow import DAG
    from airflow.providers.google.cloud.operators.bigquery import BigQueryGetDataOperator, BigQueryInsertJobOperator
    from airflow.utils.dates import days_ago
    from airflow.decorators import task
    from airflow import XComArg
    
    default_args = {
        'owner': 'airflow',
        'start_date': days_ago(1),
        'retries': 1,
    }
    
    dag = DAG(
        dag_id='bigquery_data_transfer_mapped_correct',
        default_args=default_args,
        schedule_interval="@daily",
        catchup=False,
        tags=['example'],
    )
    
      @task
      def get_data(sql):
          bq_hook = BigQueryHook(...)
      
          self.log.info('Fetching Data from:')
          self.log.info('Query: %s', sql)
      
          bq_client = bq_hook.get_client()
          query_job = bq_client.query(sql)
          client_results = query_job.result()  # Waits for the query to finish
      
          results = list(dict(result) for  result in client_results)
      
          self.log.info(f"Retrieved {len(results)} rows from BigQuery")
          self.log.info('Response: %s', results)
      
          return results
    
      query_data = get_data("SELECT * FROM some_table WHERE some_conditions;")
      
      @task_group
      def tasks(params):
          insert_job = BigQueryInsertJobOperator(
              task_id=f"insert_data",
              configuration={
                  'query': {
                      'query': "INSERT INTO `project.dataset.table` (field1, field2) VALUES ('{{ params.field1 }}', '{{ params.field2 }}')",
                      'useLegacySql': False,
                  }
              },
              params=params
          )
    
          insert_job
    
      bq_tasks = tasks.expand(params=XComArg(query_data))
      
      query_data >> bq_tasks
    
    

    请注意,这段代码只是我刚刚编写的一个基本示例,在我的用例中,我实际上有一个task_group,它展开并接收一个参数,该参数将发送到BigQueryInsertJobOperator任务中的一个params。

    当我在没有任务组的情况下使用它时(即直接用expand调用BigQueryInsertJobOperator),它就可以工作了。

    运行我的DAG后,我收到一个错误,说:

    Broken DAG: [/opt/airflow/dags/src/dag.py] Traceback (most recent call last):
      File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 407, in apply_defaults
        default_args, merged_params = get_merged_defaults(
                                      ^^^^^^^^^^^^^^^^^^^^
      File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 167, in get_merged_defaults
        raise TypeError("params must be a mapping")
    TypeError: params must be a mapping
    

    气流版本为:

    Version: [v2.8.1](https://pypi.python.org/pypi/apache-airflow/2.8.1)
    Git Version: .release:c0ffa9c5d96625c68ded9562632674ed366b5eb3
    
    0 回复  |  直到 1 年前