代码之家  ›  专栏  ›  技术社区  ›  osk

加载a。Scala中HDFS的csv文件

  •  0
  • osk  · 技术社区  · 7 年前

    所以我基本上有以下代码来阅读。csv文件并将其存储在 Array[Array[String]] :

    def load(filepath: String): Array[Array[String]] = {
          var data = Array[Array[String]]()
          val bufferedSource = io.Source.fromFile(filepath)
          for (line <- bufferedSource.getLines) {
            data :+ line.split(",").map(_.trim)
          }
          bufferedSource.close
          return data.slice(1,data.length-1) //skip header
      }
    

    它适用于未存储在HDFS上的文件。然而,当我在HDFS上尝试同样的方法时,我得到

    找不到此类文件或目录

    在HDFS上写入文件时,我还必须更改原始代码并添加一些 FileSystem Path 参数到 PrintWriter ,但这次我完全不知道怎么做。

    我到目前为止:

      def load(filepath: String, sc: SparkContext): Array[Array[String]] = {
          var data = Array[Array[String]]()
          val fs = FileSystem.get(sc.hadoopConfiguration)
          val stream = fs.open(new Path(filepath))
          var line = ""
          while ((line = stream.readLine()) != null) {
            data :+ line.split(",").map(_.trim)
          }
    
          return data.slice(1,data.length-1) //skip header
      }
    

    这应该行得通,但我有一个 NullPointerException 将行与null进行比较时,或者如果其长度超过0。

    2 回复  |  直到 7 年前
        1
  •  1
  •   osk    7 年前

    此代码将读取。HDFS中的csv文件:

      def read(filepath: String, sc: SparkContext): ArrayBuffer[Array[String]] = {
          var data = ArrayBuffer[Array[String]]()
          val fs = FileSystem.get(sc.hadoopConfiguration)
          val stream = fs.open(new Path(filepath))
          var line = stream.readLine()
          while (line != null) {
            val row = line.split(",").map(_.trim)
            data += row
            line = stream.readLine()
          }
          stream.close()
    
          return data // or return data.slice(1,data.length-1) to skip header
      }
    
        2
  •  -1
  •   Nicolas Cailloux    7 年前

    请阅读 this post about reading CSV 《Scala烹饪书》作者阿尔文·亚历山大:

    object CSVDemo extends App {
      println("Month, Income, Expenses, Profit")
      val bufferedSource = io.Source.fromFile("/tmp/finance.csv")
      for (line <- bufferedSource.getLines) {
        val cols = line.split(",").map(_.trim)
        // do whatever you want with the columns here
        println(s"${cols(0)}|${cols(1)}|${cols(2)}|${cols(3)}")
      }
      bufferedSource.close
    }
    

    您只需要从HDF中获取一个InputStream,并在这个代码段中进行替换