
Optimizing a Spark script locally

  • Dimebag · asked 7 years ago

    I have been trying to optimize a Spark script, but it is still unbearably slow (24 minutes for 600 MB of data). The full code is here, but I will try to summarize it in this question; please let me know if you see anything that could speed it up.

    Hardware: a single machine; I tried both local and local[*], but let's focus on local here

    Data: 2 NetCDF files (columnar data); single machine => no HDFS

    Parsing the data: read every column as an Array, then ss.parallelize + zip them together into a DataFrame

    Actions: show(), summary(min, max, mean, stddev), write, groupBy()

    How I run it: sbt assembly to build a fat jar that does not include Spark itself, then:

    spark-submit --master "local" --conf "spark.sql.shuffle.partitions=4" --driver-memory "10g" target/scala-2.11/spark-assembly-1.0.jar --partitions 4 --input ${input} --slice ${slice}
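
    For reference, the "fat jar without Spark itself" setup boils down to marking Spark as provided in the build; below is a minimal build.sbt sketch (it assumes the sbt-assembly plugin is enabled in project/plugins.sbt, and the project name and versions shown are illustrative, not taken from the actual build):

    // build.sbt (illustrative sketch, not the project's actual build file)
    name := "spark-pipeline"
    scalaVersion := "2.11.12"

    // "provided": sbt assembly leaves Spark out of the fat jar,
    // because spark-submit already puts Spark on the classpath at run time.
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.1" % "provided"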

    Optimizations I have tried

    • Different numbers of partitions => 1 seems to freeze, and more than 4 seems to slow things down (following the rules of thumb numPartitions ≈ 4 × number of cores and numPartitions ≈ data / 128 MB)
    • Reading all the data into the driver as Scala Arrays -> transposing -> a single RDD (instead of zipping RDDs) => slower
    • Repartitioning the freshly read DataFrames on the same columns and numPartitions, so that the join does not trigger a shuffle (see the sketch after this list)
    • Caching the DataFrames that are reused
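
    The repartition-before-join idea from the list above, as a stand-alone sketch (joinOnDims is a hypothetical helper; the column names match the code below): both inputs are hash-partitioned on the same key columns with the same numPartitions, so the inner join can reuse that partitioning instead of shuffling each side again.

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.col

    // Hypothetical helper illustrating "repartition on the join keys, then join":
    // both sides end up hash-partitioned the same way, so the join itself
    // should not need another shuffle of either input.
    def joinOnDims(df1: DataFrame, df2: DataFrame, numPartitions: Int): DataFrame = {
      val keys = Seq("longitude", "latitude", "time")
      val left  = df1.repartition(numPartitions, keys.map(col): _*)
      val right = df2.repartition(numPartitions, keys.map(col): _*)
      left.join(right, keys, "inner")
    }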

    Code

    private def readDataRDD(path: String, ss: SparkSession, dims: List[String], createIndex: Boolean, numPartitions: Int): DataFrame = {
      val file: NetcdfFile = NetcdfFile.open(path)
      val vars: util.List[Variable] = file.getVariables
      // split variables into dimensions and regular data
      val dimVars: Map[String, Variable] = vars.filter(v => dims.contains(v.getShortName)).map(v => v.getShortName -> v).toMap
      val colVars: Map[String, Variable] = vars.filter(v => !dims.contains(v.getShortName)).map(v => v.getShortName -> v).toMap
    
      val lon: Array[Float] = readVariable(dimVars(dims(0)))
      val lat: Array[Float] = readVariable(dimVars(dims(1)))
      val tim: Array[Float] = readVariable(dimVars(dims(2)))
      val dimsCartesian: Array[ListBuffer[_]] = cartesian(lon, lat, tim)
    
      // create the RDD of dimension rows (one row per point of the cartesian product)
      var tempRDD: RDD[ListBuffer[_]] = ss.sparkContext.parallelize(dimsCartesian, numPartitions)
      // gather the names of the columns (in order)
      val names: ListBuffer[String] = ListBuffer(dims: _*)
    
      // zip each data column onto the RDD of dimension rows, one column at a time
      for ((name, variable) <- colVars) {
        tempRDD = tempRDD.zip(ss.sparkContext.parallelize(readVariable(variable), numPartitions)).map(t => t._1 :+ t._2)
        names += name
      }
    
      if (createIndex) {
        // append a running row index (as Float, to keep a single column type)
        tempRDD = tempRDD.zipWithIndex().map(t => t._1 :+ t._2.toFloat)
        names += "index"
      }
    
      val finalRDD: RDD[Row] = tempRDD.map(Row.fromSeq(_))
      val df: DataFrame = ss.createDataFrame(finalRDD, StructType(names.map(StructField(_, FloatType, nullable = false))))
    
      val floatTimeToString = udf((time: Float) => {
        val udunits = String.valueOf(time.asInstanceOf[Int]) + " " + UNITS
    
        CalendarDate.parseUdunits(CALENDAR, udunits).toString.substring(0, 10)
      })
    
      df.withColumn("time", floatTimeToString(df("time")))
    }
    
    def main(args: Array[String]): Unit = {
      val spark: SparkSession = SparkSession.builder
        .appName("Spark Pipeline")
        .getOrCreate()
    
      val dimensions: List[String] = List("longitude", "latitude", "time")
      val numberPartitions = options('partitions).asInstanceOf[Int]
      val df1: DataFrame = readDataRDD(options('input) + "data1.nc", spark, dimensions, createIndex = true, numberPartitions)
        .repartition(numberPartitions, col("longitude"), col("latitude"), col("time"))
      val df2: DataFrame = readDataRDD(options('input) + "data2.nc", spark, dimensions, createIndex = false, numberPartitions)
        .repartition(numberPartitions, col("longitude"), col("latitude"), col("time"))
    
      // inner join on the dimension columns; cache because df feeds both outputs below
      var df: DataFrame = df1.join(df2, dimensions, "inner").cache()
    
      df.show()
    
      val slice: Array[String] = options('slice).asInstanceOf[String].split(":")
      // keep only the requested index slice, drop rows holding missing-value sentinels, and add |tx - tn|
      df = df.filter(df("index") >= slice(0).toFloat && df("index") < slice(1).toFloat)
        .filter(df("tg") =!= -99.99f && df("pp") =!= -999.9f && df("rr") =!= -999.9f)
        .drop("pp_stderr", "rr_stderr", "index")
        .withColumn("abs_diff", abs(df("tx") - df("tn"))).cache()
    
      // write overall summary statistics (min/max/mean/stddev) as a single CSV file
      df.drop("longitude", "latitude", "time")
        .summary("min", "max", "mean", "stddev")
        .coalesce(1)
        .write
        .option("header", "true")
        .csv(options('output) + "agg")
    
      val computeYearMonth = udf((time: String) => {
        time.substring(0, 7).replace("-", "")
      })
      df = df.withColumn("year_month", computeYearMonth(df("time")))
    
      // monthly mean of each measurement per (longitude, latitude) cell; the sums of those means are written below
      val columnsToAgg: Array[String] = Array("tg", "tn", "tx", "pp", "rr")
      val groupOn: Seq[String] = Seq("longitude", "latitude", "year_month")
      val grouped_df: DataFrame = df.groupBy(groupOn.head, groupOn.drop(1): _*)
        .agg(columnsToAgg.map(column => column -> "mean").toMap)
        .drop("longitude", "latitude", "year_month")
    
      val columnsToSum: Array[String] = Array("tg_mean", "tn_mean", "tx_mean", "rr_mean", "pp_mean")
      grouped_df
        .agg(columnsToSum.map(column => column -> "sum").toMap)
        .coalesce(1)
        .write
        .option("header", "true")
        .csv(options('output) + "grouped")
    
      spark.stop()
    }
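
    readVariable and cartesian are small helpers defined in the full (linked) code rather than shown here; readVariable presumably looks roughly like the following sketch, which reads a whole NetCDF variable into a flat Float array using the netcdf-java (ucar.nc2 / ucar.ma2) API:

    import ucar.nc2.Variable

    // Rough sketch of the readVariable helper used above (the real one is in the linked repository).
    // Assumes the variable is stored as 32-bit floats.
    private def readVariable(variable: Variable): Array[Float] = {
      val data = variable.read() // ucar.ma2.Array holding all of the variable's values
      data.copyTo1DJavaArray().asInstanceOf[Array[Float]]
    }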
    

    Is there any way to speed this up further?

    Notes

    • local: 24 minutes; local[32]
    • Yes, Spark is not built for a single machine, but the same operations in Java and in pandas (both single-threaded) take 10 seconds and 40 seconds respectively; that is a huge difference
    • I currently cannot open the web UI to visualize the tasks