代码之家  ›  专栏  ›  技术社区  ›  hummingBird

如何使spark etl更加并行化,而不丢失信息(在文件名中)

  •  1
  • hummingBird  · 技术社区  · 6 年前

    部分 保持 文件名信息。我需要名字能够做一个MD5和“确认”没有发生信息丢失。

    这是我的代码:

    import org.apache.hadoop.fs.{FileSystem, Path, FileUtil}
    import org.apache.spark.deploy.SparkHadoopUtil
    import org.apache.spark.sql._ 
    import org.apache.spark.sql.functions._ 
    import org.apache.spark.sql.functions.broadcast 
    import org.apache.spark.sql.types._ 
    import org.apache.spark.{SparkConf, SparkContext} 
    
    sc.getConf.set("spark.hadoop.mapred.output.compress", "true")
    sc.getConf.set("spark.hadoop.mapred.output.compression.codec", "true")
    sc.getConf.set("spark.hadoop.mapred.output.compression.codec", 
                   "org.apache.hadoop.io.compress.BZip2Codec")
    sc.getConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK")
    
    val hdfsConf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
    val hdfs = FileSystem.get(hdfsConf)
    val sourcePath = new Path("/source/*20180801*") 
    
    hdfs.globStatus(sourcePath).foreach( fileStatus => {
      val fileName = fileStatus.getPath().getName()
      val filePathName = fileStatus.getPath().toString
      if (fileName.contains(".done")) {
        /* open, then save compressed */
        val myFile = sc.textFile(filePathName)
        val compressedBasePath = "/destination/compressed/"
        /* use tmp_ to store folder w/ parts in it */
        val compressedPath = compressedBasePath + "tmp_/" + fileName
        myFile.saveAsTextFile(compressedPath, 
                              classOf[org.apache.hadoop.io.compress.BZip2Codec])
        /* merge part* -> old_name.bzip */
        FileUtil.copyMerge(hdfs, new Path(compressedPath), hdfs, 
                           new Path(compressedBasePath + "/" + fileName + ".bzip2"), 
                           true, hdfsConf, null)
        myFile.unpersist()
      }
    })
    

    在我意识到 需要

    val myFile = sc.textFile("/source/*20180801*")
    myFile.saveAsTextFile(compressedPath, 
                          classOf[org.apache.hadoop.io.compress.BZip2Codec])
    

    但是我不能做重命名部分,我需要名字。 你知道我能做什么吗?

    感谢评论中的建议,以及 this particular question ,我可以使用并行集合解决问题。唯一真正的变化是 import scala.collection.parallel.immutable.ParVector 加上 par foreach .

    关于并行集合的完整文章: https://docs.scala-lang.org/overviews/parallel-collections/overview.html

    谢谢

    1 回复  |  直到 6 年前
        1
  •  0
  •   hummingBird    6 年前

    在最初问题的评论中有两种可能的解决方案:

    TBH,我只测试了第二种方法,因为这是一种更快的方法(警告更少)。最终的解决方案只需要最小的更改-导入适当的lib并并行化数组 hdfs.globStatus(sourcePath) 电话回来了。这是最后一个代码,删除了注释,添加了两个注释,以便更容易地发现更改。

    import org.apache.hadoop.fs.{FileSystem, Path, FileUtil}
    import org.apache.spark.deploy.SparkHadoopUtil
    import org.apache.spark.sql._ 
    import org.apache.spark.sql.functions._ 
    import org.apache.spark.sql.functions.broadcast 
    import org.apache.spark.sql.types._ 
    import org.apache.spark.{SparkConf, SparkContext} 
    import scala.collection.parallel.immutable.ParVector /* added */
    
    sc.getConf.set("spark.hadoop.mapred.output.compress", "true")
    sc.getConf.set("spark.hadoop.mapred.output.compression.codec", "true")
    sc.getConf.set("spark.hadoop.mapred.output.compression.codec", 
                   "org.apache.hadoop.io.compress.BZip2Codec")
    sc.getConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK")
    
    val hdfsConf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
    val hdfs = FileSystem.get(hdfsConf)
    val sourcePath = new Path("/source/*20180801*") 
    
    /* note the par method call below */
    hdfs.globStatus(sourcePath).par.foreach( fileStatus => {
      val fileName = fileStatus.getPath().getName()
      val filePathName = fileStatus.getPath().toString
      if (fileName.contains(".done")) {
        val myFile = sc.textFile(filePathName)
        val compressedBasePath = "/destination/compressed/"
        val compressedPath = compressedBasePath + "tmp_/" + fileName
        myFile.saveAsTextFile(compressedPath, 
                              classOf[org.apache.hadoop.io.compress.BZip2Codec])
        FileUtil.copyMerge(hdfs, new Path(compressedPath), hdfs, 
                           new Path(compressedBasePath + "/" + 
                                    fileName.replace(".done", ".done.bz2")), 
                           true, hdfsConf, null)
        myFile.unpersist()
      }
    })