So I've been struggling with this problem for a while now and I'm out of ideas...

I have a DAG that queries data from BigQuery and, based on the results, creates dynamically mapped tasks that insert entries into another BigQuery table using `BigQueryInsertJobOperator`.

For example:
import logging

from airflow import DAG, XComArg
from airflow.decorators import task, task_group
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.utils.dates import days_ago

logger = logging.getLogger(__name__)

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'retries': 1,
}

with DAG(
    dag_id='bigquery_data_transfer_mapped_correct',
    default_args=default_args,
    schedule_interval="@daily",
    catchup=False,
    tags=['example'],
):

    # Fetch rows from BigQuery; each returned dict becomes one set of params
    # for the mapped task group below.
    @task
    def get_data(sql):
        bq_hook = BigQueryHook(...)  # connection details elided
        logger.info('Fetching data from BigQuery')
        logger.info('Query: %s', sql)
        bq_client = bq_hook.get_client()
        query_job = bq_client.query(sql)
        client_results = query_job.result()  # waits for the query to finish
        results = [dict(row) for row in client_results]
        logger.info('Retrieved %d rows from BigQuery', len(results))
        logger.info('Response: %s', results)
        return results

    query_data = get_data("SELECT * FROM some_table WHERE some_conditions;")

    # Task group that is dynamically mapped over the rows returned by get_data.
    @task_group
    def tasks(params):
        insert_job = BigQueryInsertJobOperator(
            task_id="insert_data",
            configuration={
                'query': {
                    'query': "INSERT INTO `project.dataset.table` (field1, field2) "
                             "VALUES ('{{ params.field1 }}', '{{ params.field2 }}')",
                    'useLegacySql': False,
                }
            },
            params=params,
        )

    bq_tasks = tasks.expand(params=XComArg(query_data))
    query_data >> bq_tasks
Note that this code is just a basic example I threw together for the question; in my actual use case I have a task_group that is expanded and receives an argument, and that argument is passed to `params` of the `BigQueryInsertJobOperator` task inside the group.

When I use it without the task group (i.e. calling `expand` directly on `BigQueryInsertJobOperator`), it works.
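For reference, the working variant without the task group looks roughly like this (just a sketch reusing the names and query from the example above; my real DAG differs):

```python
# Sketch: mapping BigQueryInsertJobOperator directly over the query results,
# without wrapping it in a task group. Same query/field names as above.
bq_insert = BigQueryInsertJobOperator.partial(
    task_id="insert_data",
    configuration={
        'query': {
            'query': "INSERT INTO `project.dataset.table` (field1, field2) "
                     "VALUES ('{{ params.field1 }}', '{{ params.field2 }}')",
            'useLegacySql': False,
        }
    },
).expand(params=query_data)  # query_data is the XComArg returned by get_data()
```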
When I run my DAG, I get an error that says:
Broken DAG: [/opt/airflow/dags/src/dag.py] Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 407, in apply_defaults
default_args, merged_params = get_merged_defaults(
^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 167, in get_merged_defaults
raise TypeError("params must be a mapping")
TypeError: params must be a mapping
The Airflow version is:
Version: [v2.8.1](https://pypi.python.org/pypi/apache-airflow/2.8.1)
Git Version: .release:c0ffa9c5d96625c68ded9562632674ed366b5eb3