Loading a CSV file into BigQuery using Dataproc (Spark)

  • G.Saleh  · 8 years ago

    I am trying to read data from a CSV file in GCS and save it to a BigQuery table.

    This is my CSV file:

    1,Marc,B12,2017-03-24
    2,Marc,B12,2018-01-31
    3,Marc,B21,2017-03-17
    4,Jeam,B12,2017-12-30
    5,Jeam,B12,2017-09-02
    6,Jeam,B11,2018-06-30
    7,Jeam,B21,2018-03-02
    8,Olivier,B20,2017-12-30
    

    This is my code:

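    import com.google.cloud.hadoop.io.bigquery.{BigQueryConfiguration, BigQueryFileFormat}
    import com.google.cloud.hadoop.io.bigquery.output.{BigQueryOutputConfiguration, IndirectBigQueryOutputFormat}
    import com.google.gson.Gson
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession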
        .builder()
        .appName("Hyp-session-bq")
        .config("spark.master","local")
        .getOrCreate()
      val sc : SparkContext = spark.sparkContext
    
    
      val conf=sc.hadoopConfiguration
    
      //Input Parameters
      val projectId = conf.get("fs.gs.project.id")
      val bucket = conf.get("fs.gs.system.bucket")
      val inputTable = s"$projectId:rpc.testBig"
    
      //Input Configuration
      conf.set(BigQueryConfiguration.PROJECT_ID_KEY,projectId)
      conf.set(BigQueryConfiguration.GCS_BUCKET_KEY,bucket)
      BigQueryConfiguration.configureBigQueryInput(conf,inputTable)
    
      //Output Parameters
      val outPutTable = s"$projectId:rpc.outTestBig"
    
      // Temp output bucket that is deleted upon completion of job
      val outPutGcsPath = ("gs://"+bucket+"/hadoop/tmp/outTestBig")
    
      BigQueryOutputConfiguration.configure(conf,
        outPutTable,
        null,
        outPutGcsPath,
        BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
        classOf[TextOutputFormat[_,_]])
    
      conf.set("mapreduce.job.outputformat.class", classOf[IndirectBigQueryOutputFormat[_,_]].getName)
    
      // Truncate the table before writing output to allow multiple runs.
      conf.set(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION_KEY,"WRITE_TRUNCATE")
    
      val text_file = sc.textFile("gs://test_files/csvfiles/test.csv")
      val lignes = text_file.flatMap(x=>x.split(" "))
      case class schemaFile(id: Int, name: String, symbole: String, date: String)
    
      // Build the case class from the comma-separated fields (named arguments, no `val`).
      def parseStringWithCaseClass(str: String): schemaFile = schemaFile(
          id = str.split(",")(0).toInt,
          name = str.split(",")(1),
          symbole = str.split(",")(2),
          date = str.split(",")(3)
        )
    
        val result1 = lignes.map(x=>parseStringWithCaseClass(x))
        val x =result1.map(elem =>(null,new Gson().toJsonTree(elem)))
        val y = x.saveAsNewAPIHadoopDataset(conf)  
    

    When I run the code, I get the following error:

    ERROR org.apache.spark.internal.io.SparkHadoopMapReduceWriter: Aborting job job_20180226083501_0008.
    com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
    {
      "code" : 400,
      "errors" : [ {
        "domain" : "global",
        "message" : "Load configuration must specify at least one source URI",
        "reason" : "invalid"
      } ],
      "message" : "Load configuration must specify at least one source URI"
    }
            at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:145)
            at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
            at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
            at com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:321)
            at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1056)
            at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
            at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
            at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
            at com.google.cloud.hadoop.io.bigquery.BigQueryHelper.insertJobOrFetchDuplicate(BigQueryHelper.java:306)
            at com.google.cloud.hadoop.io.bigquery.BigQueryHelper.importFromGcs(BigQueryHelper.java:160)
            at com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputCommitter.commitJob(IndirectBigQueryOutputCommitter.java:57)
            at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:128)
            at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:101)
            at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
            at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
            at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
            at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
            at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
            at jeam.BigQueryIO$.main(BigQueryIO.scala:115)
            at jeam.BigQueryIO.main(BigQueryIO.scala)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
            at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
            at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
            at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
            at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)  
    

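    I think the problem lies in the case class and parseStringWithCaseClass, but I don't know how to solve it. My configuration is not the problem, because I got the expected result when I tried the wordcount example: https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example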

    2 answers  |  last activity 8 years ago
        1
  •   jean-marc    8 years ago

    Try using a Tuple4:

    def parseStringWithTuple(str: String): Tuple4[Int, String, String, String] = {
      // Split the line once and reuse the fields.
      val fields = str.split(",")
      val id = fields(0).toInt
      val name = fields(1)
      val symbole = fields(2)
      val date = fields(3)
      (id, name, symbole, date)
    }
    val result1 = lignes.map(x => parseStringWithTuple(x))
    

    That said, I tested your code and it works fine.
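
    Whichever shape is used (case class or tuple), the parsing step can be checked on its own, outside Spark and BigQuery. The following is a minimal sketch under stated assumptions: `CsvParsing`, `CsvRow`, and `parseCsvLine` are hypothetical names that do not appear in the question, and returning `None` for malformed lines is a design choice of this sketch rather than something the original code does.

```scala
import scala.util.Try

object CsvParsing {
  // Hypothetical row type mirroring the four columns of the sample CSV.
  final case class CsvRow(id: Int, name: String, symbole: String, date: String)

  // Split the line once; return None when the line does not have exactly
  // four fields or when the id is not an integer, instead of throwing.
  def parseCsvLine(line: String): Option[CsvRow] =
    line.split(",", -1) match {
      case Array(id, name, symbole, date) =>
        Try(id.trim.toInt).toOption
          .map(n => CsvRow(n, name.trim, symbole.trim, date.trim))
      case _ => None
    }
}
```

    Running it against the sample rows from the question, for example `CsvParsing.parseCsvLine("1,Marc,B12,2017-03-24")`, helps rule out the parsing step before looking at the BigQuery output configuration.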

        2
  •   dsesto    8 years ago

    I have been running your code with my own BigQuery table and CSV file, and it works for me without any additional modification.

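    I see that when you changed the case class to a Tuple4, as suggested by @jean-marc, your code started working. That is strange behavior, all the more so because for both him and me your code works without any further modification. The error "Load configuration must specify at least one source URI" usually appears when the load job in BigQuery is not configured properly and does not receive a valid Cloud Storage object URL. However, if the exact same code works only after changing to Tuple4, and the CSV file you are using is the same and has not changed (i.e. the URL is valid), this may have been a transient issue, possibly related to Cloud Storage or BigQuery rather than to the Dataproc job itself.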
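
    One way to picture this error: the stack trace in the question shows the load being issued from IndirectBigQueryOutputCommitter.commitJob through BigQueryHelper.importFromGcs, that is, after the tasks have written their files to the temporary Cloud Storage path. The sketch below is a simplified model of that step, not the connector's actual code. `LoadSourceUris`, `sourceUris`, `validate`, and the rule for skipping marker files are all assumptions made for illustration.

```scala
object LoadSourceUris {
  // Hypothetical model of the commit step: each data file found under the
  // temporary GCS path becomes one source URI of the BigQuery load job.
  // Marker or hidden files (names starting with "_" or ".") are skipped;
  // this naming rule is an assumption, not taken from the connector.
  def sourceUris(tempPath: String, objectNames: Seq[String]): Seq[String] =
    objectNames
      .filterNot(name => name.startsWith("_") || name.startsWith("."))
      .map(name => s"${tempPath.stripSuffix("/")}/$name")

  // Mirrors the rule quoted in the error message: an empty list is rejected.
  def validate(uris: Seq[String]): Either[String, Seq[String]] =
    if (uris.isEmpty) Left("Load configuration must specify at least one source URI")
    else Right(uris)
}
```

    Under this model, if the temporary path (`outPutGcsPath` in the question) ends up with no data files, the list of URIs is empty and the load request would be rejected. Inspecting that path after a failed run may therefore be a reasonable first check.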

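    Finally, if this issue turns out to be specific to you (the same code has worked for at least two other users), then once you have checked that nothing is wrong with the Cloud Storage object (permissions, wrong location, etc.), you may want to file a bug in the Public Issue Tracker.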