代码之家  ›  专栏  ›  技术社区  ›  hlagos

在气流中从BigQueryOperator获取结果

  •  0
  • hlagos  · 技术社区  · 6 年前

    我正试图从 BigQueryOperator 使用气流,但我找不到方法。我试着打电话给 next() 方法在 bq_cursor 成员(在1.10中可用),但它返回 None . 我就是这么做的

    import datetime
    import logging
    
    from airflow import models
    from airflow.contrib.operators import bigquery_operator
    from airflow.operators import python_operator
    
    
    yesterday = datetime.datetime.combine(
        datetime.datetime.today() - datetime.timedelta(1),
        datetime.datetime.min.time()
    )
    
    def MyChequer(**kwargs):
        big_query_count = bigquery_operator.BigQueryOperator(
            task_id='my_bq_query',
            sql='select count(*) from mydataset.mytable'
        )
    
        big_query_count.execute(context=kwargs)
    
        logging.info(big_query_count)
        logging.info(big_query_count.__dict__)
        logging.info(big_query_count.bq_cursor.next())
    
    default_dag_args = {
        'start_date': yesterday,
        'email_on_failure': False,
        'email_on_retry': False,
        'project_id': 'myproject'
    }
    
    with models.DAG(
            'bigquery_results_execution',
            # Continue to run DAG once per day
            schedule_interval=datetime.timedelta(days=1),
            default_args=default_dag_args) as dag:
    
        myoperator = python_operator.PythonOperator(
            task_id='threshold_operator',
            provide_context=True,
            python_callable=MyChequer
        )
    
        # Define DAG
        myoperator
    

    看看 bigquery_hook.py bigquery_operator.py 这似乎是获取结果的唯一可用方法。

    3 回复  |  直到 6 年前
        1
  •  1
  •   Mike    6 年前

    每当我需要从BigQuery查询中获取数据并将其用于某种用途时,我都会使用BigQuery钩子创建自己的运算符。我通常将其称为BigQueryToxOperator,我们有一堆用于将BigQuery数据发送到其他内部系统的运算符。

    例如,我有一个bigquery to pubsub操作符,作为一个示例,您可能会发现它对于如何查询bigquery,然后逐行处理结果,将它们发送到google pubsub非常有用。请考虑以下通用示例代码,以了解如何单独执行此操作:

    class BigQueryToXOperator(BaseOperator):
        template_fields = ['sql']
        ui_color = '#000000'
    
        @apply_defaults
        def __init__(
                self,
                sql,
                keys,
                bigquery_conn_id='bigquery_default',
                delegate_to=None,
                *args,
                **kwargs):
            super(BigQueryToXOperator, self).__init__(*args, **kwargs)
            self.sql = sql
            self.keys = keys # A list of keys for the columns in the result set of sql
            self.bigquery_conn_id = bigquery_conn_id
            self.delegate_to = delegate_to
    
    
        def execute(self, context):
            """
            Run query and handle results row by row.
            """
            cursor = self._query_bigquery()
            for row in cursor.fetchall():
                # Zip keys and row together because the cursor returns a list of list (not list of dicts)
                row_dict = dumps(dict(zip(self.keys,row))).encode('utf-8')
    
                # Do what you want with the row...
                handle_row(row_dict)
    
    
        def _query_bigquery(self):
            """
            Queries BigQuery and returns a cursor to the results.
            """
            bq = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                              use_legacy_sql=False)
            conn = bq.get_conn()
            cursor = conn.cursor()
            cursor.execute(self.sql)
            return cursor
    
        2
  •  1
  •   hlagos    6 年前

    感谢@kaxil和@mike的回答。我发现了问题。在我的头脑里有一种虫子 BigQueryCursor . 作为其中的一部分 run_with_configuration ,的 running_job_id 正在返回但从未分配给 job_id 用于将结果拉入 next 方法。一个变通方法(如果不想重新实现所有内容,则不是很优雅,但很好)是分配 乔布斯 基于 奔跑 在这个钩子里

    big_query_count.execute(context=kwargs)
    #workaround
    big_query_count.bq_cursor.job_id = big_query_count.bq_cursor.running_job_id
    logging.info(big_query_count.bq_cursor.next())
    

    一是解决问题 用配置运行 在流程结束时分配正确的作业ID,可以删除解决方法后的行。

        3
  •  0
  •   kaxil    6 年前

    你可以使用 BigQueryOperator 将结果保存到临时目标表中,然后使用 BigQueryGetDataOperator 获取以下结果,然后使用 BigQueryTableDeleteOperator 删除表格:

    get_data = BigQueryGetDataOperator(
        task_id='get_data_from_bq',
        dataset_id='test_dataset',
        table_id='Transaction_partitions',
        max_results='100',
        selected_fields='DATE',
        bigquery_conn_id='airflow-service-account'
    )
    

    Docs: