
Split a Spark DataFrame and compute an average based on the value of one column

  •  1
  • SheCodes  · Tech Community  · 8 years ago

    I have two DataFrames. The first one is classRecord:

    Class, Calculation
    first, Average
    Second, Sum
    Third, Average
    

    The second one is studentRecord:

    Name, height, Camp, Class
    Shae, 152, yellow, first
    Joe, 140, yellow, first
    Mike, 149, white, first
    Anne, 142, red, first
    Tim, 154, red, Second
    Jake, 153, white, Second
    Sherley, 153, white, Second
    

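    From the second DataFrame, depending on the class type, I want to run a calculation on height (average for class first, sum for class Second, and so on), separately for each camp. For example, if the class is first, the average should be computed separately for yellow, white, etc.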

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
    
    // function to calculate the average height per name
    def averageOnName(splitFrame: DataFrame): Array[(String, Double)] = {
      val pairedRDD: RDD[(String, Double)] = splitFrame.select($"Name", $"height".cast("double")).as[(String, Double)].rdd
      // accumulate (sum, count) per key, then divide to get the mean
      pairedRDD.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).mapValues(y => 1.0 * y._1 / y._2).collect()
    }
    
    //required schema for further modifications
    val schema = StructType(
    StructField("name", StringType, false) ::
    StructField("avg", DoubleType, false) :: Nil)
    
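    // loop over each class type; collect to the driver first, because DataFrames
    // such as studentRecord cannot be used inside rdd.foreach, which runs on the executors
    classRecord.collect().foreach { classRow =>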
      //filter students based on camps
      var campYellow =studentRecord.filter($"Camp" === "yellow")
      var campWhite =studentRecord.filter($"Camp" === "white")
      var campRed =studentRecord.filter($"Camp" === "red")
    
      // since I know that calculation for first class is average, so representing calculation only for class first
      val avgcampYellow  =  averageOnName(campYellow)
      val avgcampWhite   =  averageOnName(campWhite)
      val avgcampRed   =  averageOnName(campRed)
    
      // union of all
      val rddYellow = sc.parallelize (avgcampYellow).map (x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue()))
      //conversion of rdd to frame
      var dfYellow = sqlContext.createDataFrame(rddYellow, schema)
      //union with yellow camp data
      val rddWhite = sc.parallelize (avgcampWhite).map (x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue()))
      //conversion of rdd to frame
      var dfWhite = sqlContext.createDataFrame(rddWhite, schema)
      var dfYellWhite = dfYellow.union(dfWhite)
      //union with yellow,white camp data
      val rddRed = sc.parallelize (avgcampRed).map (x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue()))
      //conversion of rdd to frame
      var dfRed = sqlContext.createDataFrame(rddRed, schema)
      var dfYellWhiteRed = dfYellWhite .union(dfRed)
      // other modifications and final result to hive
    }
    

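    Here is what I am struggling with:

    1. Yellow, red and white are hard-coded, but there may be other camp types.
    2. The DataFrame is currently filtered several times, which could be improved.

    Thanks for your help.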

    1 answer  |  as of 7 years ago
        1
  •  0
  •   Shaido MadHadders    6 years ago

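    Handle the classRecord DataFrame separately and extract what you need from it afterwards. In Spark you can use the groupBy() method to aggregate the values per Class and Camp in a single pass, without hard-coding the camp names.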
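
    For anyone who wants to follow along, the two DataFrames from the question can be rebuilt from the sample rows roughly as follows. This is a sketch under assumptions: the local master setting and the application name are illustrative and not part of the original post, and height is assumed to be an integer column.

```scala
import org.apache.spark.sql.SparkSession

// Local session for experimentation (e.g. pasted into spark-shell);
// master and appName are assumptions, not taken from the question.
val spark = SparkSession.builder
  .master("local[*]")
  .appName("class-camp-aggregation")
  .getOrCreate()
import spark.implicits._

// Which calculation applies to which class
val classRecord = Seq(
  ("first", "Average"),
  ("Second", "Sum"),
  ("Third", "Average")
).toDF("Class", "Calculation")

// One row per student, using the sample rows from the question
val studentRecord = Seq(
  ("Shae", 152, "yellow", "first"),
  ("Joe", 140, "yellow", "first"),
  ("Mike", 149, "white", "first"),
  ("Anne", 142, "red", "first"),
  ("Tim", 154, "red", "Second"),
  ("Jake", 153, "white", "Second"),
  ("Sherley", 153, "white", "Second")
).toDF("Name", "height", "Camp", "Class")
```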

    Using the example DataFrame:

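    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{avg, collect_list, sum}
    
    val spark = SparkSession.builder.getOrCreate()
    import spark.implicits._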
    
    studentRecord.show()
    
    +-------+------+------+------+
    |   Name|height|  Camp| Class|
    +-------+------+------+------+
    |   Shae|   152|yellow| first|
    |    Joe|   140|yellow| first|
    |   Mike|   149| white| first|
    |   Anne|   142|   red| first|
    |    Tim|   154|   red|Second|
    |   Jake|   153| white|Second|
    |Sherley|   153| white|Second|
    +-------+------+------+------+
    
    val df = studentRecord.groupBy("Class", "Camp")
      .agg(
        sum($"height").as("Sum"), 
        avg($"height").as("Average"), 
        collect_list($"Name").as("Names")
      )
    df.show()
    
    +------+------+---+-------+---------------+
    | Class|  Camp|Sum|Average|          Names|
    +------+------+---+-------+---------------+
    | first| white|149|  149.0|         [Mike]|
    | first|   red|142|  142.0|         [Anne]|
    |Second|   red|154|  154.0|          [Tim]|
    |Second| white|306|  153.0|[Jake, Sherley]|
    | first|yellow|292|  146.0|    [Shae, Joe]|
    +------+------+---+-------+---------------+
    

    Once that is done, you only need to check the result against your first DataFrame, classRecord:

    import org.apache.spark.sql.Row
    
    // Collect the DataFrame to the driver as an Array[(String, String)]
    val classRecs = classRecord.collect().map { case Row(clas: String, calc: String) => (clas, calc) }
    
    for ((clas, calc) <- classRecs) {
      // Pick the aggregated column that matches the requested calculation
      val df2 = calc match {
        case "Average" => df.filter($"Class" === clas).select("Class", "Camp", "Average")
        case "Sum"     => df.filter($"Class" === clas).select("Class", "Camp", "Sum")
        // Fail with a clear message instead of a bare MatchError
        case other     => throw new IllegalArgumentException(s"Unsupported calculation: $other")
      }
    
      // Do something with df2
    }
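
    As an alternative to collecting classRecord and looping on the driver, the selection can probably also be expressed as a join plus a conditional column. A minimal sketch, assuming the column names from the question; the function name aggregatePerClass and the output column Result are made up for illustration:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{avg, col, sum, when}

// Hypothetical helper: returns one row per (Class, Camp) with the value
// requested for that class in classRecord's Calculation column.
def aggregatePerClass(studentRecord: DataFrame, classRecord: DataFrame): DataFrame = {
  // Compute both aggregates once per (Class, Camp) group
  val grouped = studentRecord
    .groupBy("Class", "Camp")
    .agg(
      sum(col("height")).as("Sum"),
      avg(col("height")).as("Average")
    )

  grouped
    // Inner join: classes without students produce no rows
    .join(classRecord, Seq("Class"))
    .withColumn(
      "Result",
      // Cast Sum to double so both branches have the same type;
      // calculations other than Average or Sum yield null
      when(col("Calculation") === "Average", col("Average"))
        .when(col("Calculation") === "Sum", col("Sum").cast("double"))
    )
    .select("Class", "Camp", "Calculation", "Result")
}
```

    Calling aggregatePerClass(studentRecord, classRecord).show() on the sample data should give one row per (Class, Camp) pair, for example 146.0 for first/yellow and 306.0 for Second/white. Classes without students, such as Third, drop out because of the inner join.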
    

    Hope this helps!