代码之家  ›  专栏  ›  技术社区  ›  arcee123

如何将表导出到以字段值作为文件名而不是部分的文件中

  •  1
  • arcee123  · 技术社区  · 6 年前

    我有一个非常大的表出来的大查询通过谷歌云存储。

    是否仍要通过zipcode查询表,并将结果导出到文件中,其中邮政编码是文件名。每个文件都有该邮政编码的记录。

    这能做到吗?

    2 回复  |  直到 6 年前
        1
  •  1
  •   Lefteris S Mikhail Berlyant    6 年前

    这里有一个python解决方案,我没有使用BigQuery导出。尽管如此,最终结果还是保存在存储器中,作为新行分隔的json文件(这样就可以加载回BigQuery)。它涉及一个查询,但对于非常大的表来说可能会很昂贵。我以一个表为例,它有一个ZipCode列和两个以上的列(col1,col2),但这并不重要。此外,我还硬编码了身份验证部分。

    #!/usr/bin/python
    
    from argparse import ArgumentParser
    from google.cloud import bigquery
    from google.cloud import storage
    
    def main(project_id, dataset_id, table_id, bucket_name):
    
        client = bigquery.Client.from_service_account_json('service_account.json',project=project_id)
        dataset = client.dataset(dataset_id)
        # Create a table for intermediate results
        table_ref = client.dataset(dataset_id).table('tmp')
    
        # Query job with 'tmp' as destination
        # Group by non grouped/aggregated field ZipCode using ARRAY_AGG
        job_config = bigquery.QueryJobConfig()
        job_config.destination = table_ref
        sql = 'SELECT ZipCode, ARRAY_AGG(STRUCT(col1, col2)) FROM `{}.{}.{}` GROUP BY ZipCode'.format(project_id, dataset_id, table_id)
        query_job = client.query(
            sql,
            location='US',
            job_config=job_config)
        query_job.result()
    
        storage_client = storage.Client()
        bucket = storage_client.get_bucket(bucket_name)
    
        rows = client.list_rows(client.get_table(table_ref))
        for row in rows:
            record=''
            # Rest of row is a list of dictionaries with unicode items
            for r in row[1:][0]:
                r = {str(k):str(v) for k,v in r.items()}
                record+=(str(r))+'\n'
            # row[0] will have ZipCode which we want to use to name the exported files
            filename=row[0]+'.json'
            blob = bucket.blob(filename)
            print 'Exporting to gs://{}/{}'.format(bucket_name,filename)
            blob.upload_from_string(record)
    
        # Delete the tmp table
        client.delete_table(table_ref)
    
    if __name__ == '__main__':
        parser = ArgumentParser()
        parser.add_argument('-p','--project', help="project where the ZipCode table resides", dest='project_id')
        parser.add_argument('-d','--dataset', help="dataset with the ZipCode table", dest='dataset_id')
        parser.add_argument('-t','--table', help="ZipCode table", dest='table_id')
        parser.add_argument('-b','--bucket', help="destination bucket", dest='bucket')
    
        args = parser.parse_args()
        main(args.project_id,args.dataset_id,args.table_id,args.bucket)
    
    推荐文章