Two possible solutions were suggested in the comments on the original question.
TBH, I only tested the second one, since it was the quicker approach (fewer warnings). The final solution required only minimal changes: importing the appropriate library and parallelizing the array returned by the hdfs.globStatus(sourcePath) call. Here is the final code, with the old comments removed and two new comments added to make the changes easier to spot.
import org.apache.hadoop.fs.{FileSystem, Path, FileUtil}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.types._
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.parallel.immutable.ParVector  // new import for the parallel collections (.par) call below
sc.getConf.set("spark.hadoop.mapred.output.compress", "true")
sc.getConf.set("spark.hadoop.mapred.output.compression.codec", "true")
sc.getConf.set("spark.hadoop.mapred.output.compression.codec",
"org.apache.hadoop.io.compress.BZip2Codec")
sc.getConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK")
val hdfsConf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
val hdfs = FileSystem.get(hdfsConf)
val sourcePath = new Path("/source/*20180801*")
hdfs.globStatus(sourcePath).par.foreach( fileStatus => {  // added .par to process the matched files in parallel
  val fileName = fileStatus.getPath().getName()
  val filePathName = fileStatus.getPath().toString
  if (fileName.contains(".done")) {
    val myFile = sc.textFile(filePathName)
    val compressedBasePath = "/destination/compressed/"
    val compressedPath = compressedBasePath + "tmp_/" + fileName
    myFile.saveAsTextFile(compressedPath,
      classOf[org.apache.hadoop.io.compress.BZip2Codec])
    FileUtil.copyMerge(hdfs, new Path(compressedPath), hdfs,
      new Path(compressedBasePath +
        fileName.replace(".done", ".done.bz2")),
      true, hdfsConf, null)
    myFile.unpersist()
  }
})
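
One portability note: the code above relies on Scala's parallel collections being part of the standard library, which holds through Scala 2.12. On Scala 2.13+ they were moved into the separate scala-parallel-collections module, and the .par call needs a converters import instead of ParVector. A minimal sketch of the same pattern under that assumption (the glob and the per-file body are just placeholders taken from the code above):

import scala.collection.parallel.CollectionConverters._  // Scala 2.13+: needs the org.scala-lang.modules:scala-parallel-collections dependency
import org.apache.hadoop.fs.{FileSystem, Path}

val hdfs = FileSystem.get(sc.hadoopConfiguration)

// Same idea as above: .par turns the Array[FileStatus] into a parallel
// collection, so each matched file is handled by a separate driver-side thread.
hdfs.globStatus(new Path("/source/*20180801*")).par.foreach { fileStatus =>
  println(s"would compress ${fileStatus.getPath}")  // per-file work goes here
}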