代码之家  ›  专栏  ›  技术社区  ›  C.S.Reddy Gadipally

基于第二个数据集的Spark聚合

  •  0
  • C.S.Reddy Gadipally  · 技术社区  · 6 年前

    我有两个数据集(数据帧)

    1. idPeersDS - 包含一个 id 列,以及一个保存该 id 的所有对等方(peers)id 的数组列。
    2. infoDS-它有两个类型列(type1,type2)和一个metric列。

    --

    idPeersDS
    +---+---------+
    | id|    peers|
    +---+---------+
    |  1|[1, 2, 3]|
    |  2|[2, 1, 6]|
    |  3|[3, 1, 2]|
    |  4|[4, 5, 6]|
    |  5|[5, 4, 6]|
    |  6|[6, 1, 2]|
    +---+---------+
    
    
    infoDS
    +---+-----+-----+------+
    | id|type1|type2|metric|
    +---+-----+-----+------+
    |  1|    A|    X|  10.0|
    |  1|    A|    Y|  20.0|
    |  1|    B|    X|  30.0|
    |  1|    B|    Y|  40.0|
    |  2|    A|    Y|  10.0|
    |  2|    B|    X|  20.0|
    |  2|    B|    Y|  30.0|
    |  3|    A|    X|  40.0|
    |  4|    B|    Y|  10.0|
    |  5|    A|    X|  20.0|
    |  5|    B|    X|  30.0|
    |  6|    A|    Y|  40.0|
    |  6|    B|    Y|  10.0|
    +---+-----+-----+------+
    

    例子:对于组(“A”,“X”)和 id=1,对等方为(1,2,3),用于计算 zscore 的度量值为(10,0,40);因为 id=2 在组(“A”,“X”)中不存在,所以它的度量值取 0。id=5 不是 id=1 的对等方,因此不参与 zscore 的计算。

    +---+------+---------+-----+-----+
    | id|metric|    peers|type1|type2|
    +---+------+---------+-----+-----+
    |  1|  10.0|[1, 2, 3]|    A|    X|
    |  3|  40.0|[3, 1, 2]|    A|    X|
    |  5|  20.0|[5, 4, 6]|    A|    X|
    +---+------+---------+-----+-----+

    Z = (X - μ) / σ,其中 μ = (10 + 0 + 40) / 3 = 16.66666,σ 为(10,0,40)的总体标准差 = 16.99673
    Z = (10 - 16.66666) / 16.99673
    
    Z = -0.39223
    
     The output should be the following table. Producing the `peersmetrics` column instead of the `zScoreValue` column would also be enough, because I can compute the z-score from it the same way my code does.
        +---+------+---------+-----------+-----+-----+
        | id|metric|    peers|zScoreValue|type1|type2|    peersmetrics
        +---+------+---------+-----------+-----+-----+
        |  1|  10.0|[1, 2, 3]|      -0.39|    A|    X|    [10, 0, 40]
        |  3|  40.0|[3, 1, 2]|       1.37|    A|    X|    [40, 10, 0]
        |  5|  20.0|[5, 4, 6]|       1.41|    A|    X|    [20, 0 , 0]
        |  1|  40.0|[1, 2, 3]|       0.98|    B|    Y|    [40, 30, 0]
        |  2|  30.0|[2, 1, 6]|       0.27|    B|    Y|    [30, 40, 10]
        |  4|  10.0|[4, 5, 6]|       0.71|    B|    Y|
        |  6|  10.0|[6, 1, 2]|      -1.34|    B|    Y|
        |  1|  30.0|[1, 2, 3]|       1.07|    B|    X|
        |  2|  20.0|[2, 1, 6]|       0.27|    B|    X|
        |  5|  30.0|[5, 4, 6]|       1.41|    B|    X|
        |  1|  20.0|[1, 2, 3]|       1.22|    A|    Y|
        |  2|  10.0|[2, 1, 6]|      -1.07|    A|    Y|
        |  6|  40.0|[6, 1, 2]|       1.34|    A|    Y|
        +---+------+---------+-----------+-----+-----+
    

    下面是我的解决方案,但计算时间比我希望的要长。真实数据集的大小:idPeersDS 有 17000 行,infoDS 有 17000*6*15 行。

    // Sample peers table: one row per id, with the array of ids that count as its peers.
    val idPeersDS = Seq(
      1 -> Seq(1, 2, 3),
      2 -> Seq(2, 1, 6),
      3 -> Seq(3, 1, 2),
      4 -> Seq(4, 5, 6),
      5 -> Seq(5, 4, 6),
      6 -> Seq(6, 1, 2)
    ).toDF("id", "peers")

    // Sample metrics table: one row per (id, type1, type2); the integer literals are
    // cast so that the metric column ends up as a double.
    val infoDS = Seq(
      (1, "A", "X", 10),
      (1, "A", "Y", 20),
      (1, "B", "X", 30),
      (1, "B", "Y", 40),
      (2, "A", "Y", 10),
      (2, "B", "X", 20),
      (2, "B", "Y", 30),
      (3, "A", "X", 40),
      (4, "B", "Y", 10),
      (5, "A", "X", 20),
      (5, "B", "X", 30),
      (6, "A", "Y", 40),
      (6, "B", "Y", 10)
    ).toDF("id", "type1", "type2", "metric")
      .withColumn("metric", $"metric".cast("double"))
    
    
    
    
    /**
     * Adds a `zScoreValue` column holding, for each id, the z-score of its own metric
     * relative to the metrics of its peers.
     *
     * A peer that has no row in `idMetricDS` contributes a metric of 0.0. The population
     * standard deviation is used; when it is 0 the z-score is reported as 0.0.
     *
     * @param idMetricDS rows whose first column is an Int id and whose second column is a
     *                   Double metric
     * @param irPeersDS  rows with an `id` column and a `peers` column holding an array of Int ids
     * @param roundTo    number of decimal places the z-score is rounded to
     * @param spark      session used for the encoders and for creating the broadcast
     * @return `idMetricDS` joined with `irPeersDS` on `id`, plus the rounded `zScoreValue` column
     */
    def calculateZScoreGivenPeers(idMetricDS: DataFrame, irPeersDS: DataFrame, roundTo: Int = 2)
    (implicit spark: SparkSession): DataFrame = {

      import spark.implicits._

      // Attach each id's peer list to its metric row.
      val fir = idMetricDS.join(irPeersDS, "id")

      // id -> metric lookup, collected on the driver and broadcast to the executors.
      val fsMapBroadcast = spark.sparkContext.broadcast(
        idMetricDS.toDF.map((r: Row) => r.getInt(0) -> r.getDouble(1)).rdd.collectAsMap)

      val funUdf = udf((currId: Int, xs: WrappedArray[Int]) => {
        // Dereference the broadcast inside the UDF so the closure captures only the
        // broadcast handle, not the whole map.
        val fsMap = fsMapBroadcast.value
        val zScoreMetrics: Array[Double] = xs.toArray.map(x => fsMap.getOrElse(x, 0.0))
        val ds = new DescriptiveStatistics(zScoreMetrics)
        val mean = ds.getMean()
        val sd = Math.sqrt(ds.getPopulationVariance())
        // A zero spread would divide by zero; report a neutral z-score instead.
        if (sd == 0.0) 0.0 else (fsMap.getOrElse(currId, 0.0) - mean) / sd
      })

      // The returned DataFrame is evaluated lazily, so the broadcast has to outlive this
      // method; it is deliberately not unpersisted or destroyed here.
      fir.withColumn("zScoreValue", round(funUdf(fir("id"), fir("peers")), roundTo))
    }
    
    // Every distinct (type1, type2) pair present in infoDS, collected to the driver.
    val typesComb = infoDS.select("type1", "type2").dropDuplicates.collect

    // Builds one z-score DataFrame per (type1, type2) pair and unions them into one result.
    val zScoreDS = {
      val perGroup = for (combo <- typesComb) yield {
        val t1 = combo.getString(0)
        val t2 = combo.getString(1)
        // Only the id/metric rows that belong to the current pair.
        val groupMetrics = infoDS
          .where($"type1" === lit(t1) && $"type2" === lit(t2))
          .select($"id", $"metric")
        calculateZScoreGivenPeers(groupMetrics, idPeersDS)(spark)
          .select($"id", $"metric", $"peers", $"zScoreValue", lit(t1) as "type1", lit(t2) as "type2")
      }
      perGroup.reduce(_ union _)
    }
    
    
    scala> idPeersDS.show(100)
    +---+---------+
    | id|    peers|
    +---+---------+
    |  1|[1, 2, 3]|
    |  2|[2, 1, 6]|
    |  3|[3, 1, 2]|
    |  4|[4, 5, 6]|
    |  5|[5, 4, 6]|
    |  6|[6, 1, 2]|
    +---+---------+
    
    
    scala> infoDS.show(100)
    +---+-----+-----+------+
    | id|type1|type2|metric|
    +---+-----+-----+------+
    |  1|    A|    X|  10.0|
    |  1|    A|    Y|  20.0|
    |  1|    B|    X|  30.0|
    |  1|    B|    Y|  40.0|
    |  2|    A|    Y|  10.0|
    |  2|    B|    X|  20.0|
    |  2|    B|    Y|  30.0|
    |  3|    A|    X|  40.0|
    |  4|    B|    Y|  10.0|
    |  5|    A|    X|  20.0|
    |  5|    B|    X|  30.0|
    |  6|    A|    Y|  40.0|
    |  6|    B|    Y|  10.0|
    +---+-----+-----+------+
    
    
    scala> typesComb
    res3: Array[org.apache.spark.sql.Row] = Array([A,X], [B,Y], [B,X], [A,Y])
    
    scala> zScoreDS.show(100)
    +---+------+---------+-----------+-----+-----+
    | id|metric|    peers|zScoreValue|type1|type2|
    +---+------+---------+-----------+-----+-----+
    |  1|  10.0|[1, 2, 3]|      -0.39|    A|    X|
    |  3|  40.0|[3, 1, 2]|       1.37|    A|    X|
    |  5|  20.0|[5, 4, 6]|       1.41|    A|    X|
    |  1|  40.0|[1, 2, 3]|       0.98|    B|    Y|
    |  2|  30.0|[2, 1, 6]|       0.27|    B|    Y|
    |  4|  10.0|[4, 5, 6]|       0.71|    B|    Y|
    |  6|  10.0|[6, 1, 2]|      -1.34|    B|    Y|
    |  1|  30.0|[1, 2, 3]|       1.07|    B|    X|
    |  2|  20.0|[2, 1, 6]|       0.27|    B|    X|
    |  5|  30.0|[5, 4, 6]|       1.41|    B|    X|
    |  1|  20.0|[1, 2, 3]|       1.22|    A|    Y|
    |  2|  10.0|[2, 1, 6]|      -1.07|    A|    Y|
    |  6|  40.0|[6, 1, 2]|       1.34|    A|    Y|
    +---+------+---------+-----------+-----+-----+
    
    0 回复  |  直到 6 年前
        1
  •  1
  •   C.S.Reddy Gadipally    6 年前

    我解决了,下面是我的答案。在我的真实数据集上,这个解决方案的运行时间不到问题中原方案的 1/10。我避免了把数据 collect 到 driver,也避免了先 map 再在 reduce 中对各个数据集做 union。

    // Sample peers table: one row per id, with the array of ids that count as its peers.
    val idPeersDS = Seq(
      1 -> Seq(1, 2, 3),
      2 -> Seq(2, 1, 6),
      3 -> Seq(3, 1, 2),
      4 -> Seq(4, 5, 6),
      5 -> Seq(5, 4, 6),
      6 -> Seq(6, 1, 2)
    ).toDF("id", "peers")

    // Sample metrics table: one row per (id, type1, type2); the integer literals are
    // cast so that the metric column ends up as a double.
    val infoDS = Seq(
      (1, "A", "X", 10),
      (1, "A", "Y", 20),
      (1, "B", "X", 30),
      (1, "B", "Y", 40),
      (2, "A", "Y", 10),
      (2, "B", "X", 20),
      (2, "B", "Y", 30),
      (3, "A", "X", 40),
      (4, "B", "Y", 10),
      (5, "A", "X", 20),
      (5, "B", "X", 30),
      (6, "A", "Y", 40),
      (6, "B", "Y", 10)
    ).toDF("id", "type1", "type2", "metric")
      .withColumn("metric", $"metric".cast("double"))
    
    
    // Exiting paste mode, now interpreting.
    
    idPeersDS: org.apache.spark.sql.DataFrame = [id: int, peers: array<int>]
    infoDS: org.apache.spark.sql.DataFrame = [id: int, type1: string ... 2 more fields]
    
    scala> idPeersDS.show
    +---+---------+
    | id|    peers|
    +---+---------+
    |  1|[1, 2, 3]|
    |  2|[2, 1, 6]|
    |  3|[3, 1, 2]|
    |  4|[4, 5, 6]|
    |  5|[5, 4, 6]|
    |  6|[6, 1, 2]|
    +---+---------+
    
    
    scala> infoDS.show
    +---+-----+-----+------+
    | id|type1|type2|metric|
    +---+-----+-----+------+
    |  1|    A|    X|  10.0|
    |  1|    A|    Y|  20.0|
    |  1|    B|    X|  30.0|
    |  1|    B|    Y|  40.0|
    |  2|    A|    Y|  10.0|
    |  2|    B|    X|  20.0|
    |  2|    B|    Y|  30.0|
    |  3|    A|    X|  40.0|
    |  4|    B|    Y|  10.0|
    |  5|    A|    X|  20.0|
    |  5|    B|    X|  30.0|
    |  6|    A|    Y|  40.0|
    |  6|    B|    Y|  10.0|
    +---+-----+-----+------+
    
    
    // Attach the peers array of each id to every infoDS row by joining on the id column.
    scala> val infowithpeers = infoDS.join(idPeersDS, "id")
    infowithpeers: org.apache.spark.sql.DataFrame = [id: int, type1: string ... 3 more fields]
    
    scala> infowithpeers.show
    +---+-----+-----+------+---------+
    | id|type1|type2|metric|    peers|
    +---+-----+-----+------+---------+
    |  1|    A|    X|  10.0|[1, 2, 3]|
    |  1|    A|    Y|  20.0|[1, 2, 3]|
    |  1|    B|    X|  30.0|[1, 2, 3]|
    |  1|    B|    Y|  40.0|[1, 2, 3]|
    |  2|    A|    Y|  10.0|[2, 1, 6]|
    |  2|    B|    X|  20.0|[2, 1, 6]|
    |  2|    B|    Y|  30.0|[2, 1, 6]|
    |  3|    A|    X|  40.0|[3, 1, 2]|
    |  4|    B|    Y|  10.0|[4, 5, 6]|
    |  5|    A|    X|  20.0|[5, 4, 6]|
    |  5|    B|    X|  30.0|[5, 4, 6]|
    |  6|    A|    Y|  40.0|[6, 1, 2]|
    |  6|    B|    Y|  10.0|[6, 1, 2]|
    +---+-----+-----+------+---------+
    
    
    // UDF: flattens a sequence of Int -> Double maps into a single map.
    scala> val joinMap = udf { values: Seq[Map[Int,Double]] => values.flatten.toMap }
    joinMap: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,MapType(IntegerType,DoubleType,false),Some(List(ArrayType(MapType(IntegerType,DoubleType,false),true))))
    
    // UDF: z-score of `metric` against the values in `zScoreMetrics`, using their mean and
    // population standard deviation; yields 0.0 when that standard deviation is 0.
    scala> val zScoreCal = udf { (metric: Double, zScoreMetrics: WrappedArray[Double]) =>
        |   val ds = new DescriptiveStatistics(zScoreMetrics.toArray)
        |   val mean = ds.getMean()
        |   val sd = Math.sqrt(ds.getPopulationVariance())
        |   val zScore = if (sd == 0.0) {0.0} else {(metric - mean) / sd}
        |   zScore
        | }
    zScoreCal: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,DoubleType,Some(List(DoubleType, ArrayType(DoubleType,false))))
    
    scala> :paste
    // Entering paste mode (ctrl-D to finish)
    
    
    // Tag every row with a one-entry map from its id to its metric.
    val infowithpeersidmetric = infowithpeers.withColumn("idmetric", map($"id", $"metric"))

    // For each (type1, type2) group, merge the one-entry maps into a single id -> metric lookup.
    val idsingrpdf = infowithpeersidmetric
      .groupBy("type1", "type2")
      .agg(joinMap(collect_list($"idmetric")).as("idsingrp"))

    // UDF: looks up the metric of each peer in the given map; peers missing from it count as 0.0.
    val metricsMap = udf { (peers: Seq[Int], values: Map[Int, Double]) =>
      peers.map(peer => values.getOrElse(peer, 0.0))
    }
    
    // Exiting paste mode, now interpreting.
    
    infowithpeersidmetric: org.apache.spark.sql.DataFrame = [id: int, type1: string ... 4 more fields]
    idsingrpdf: org.apache.spark.sql.DataFrame = [type1: string, type2: string ... 1 more field]
    metricsMap: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,ArrayType(DoubleType,false),Some(List(ArrayType(IntegerType,false), MapType(IntegerType,DoubleType,false))))
    
    // Join each row with the id -> metric map of its (type1, type2) group, look up the peers'
    // metrics into zScoreMetrics, then compute the z-score rounded to 2 decimal places.
    scala> val infoWithMap = infowithpeers.join(idsingrpdf, Seq("type1","type2")).withColumn("zScoreMetrics", metricsMap($"peers", $"idsingrp")).withColumn("zscore", round(zScoreCal($"metric",$"zScoreMetrics"),2))
    infoWithMap: org.apache.spark.sql.DataFrame = [type1: string, type2: string ... 6 more fields]
    
    scala> infoWithMap.show
    +-----+-----+---+------+---------+--------------------+------------------+------+
    |type1|type2| id|metric|    peers|            idsingrp|     zScoreMetrics|zscore|
    +-----+-----+---+------+---------+--------------------+------------------+------+
    |    A|    X|  1|  10.0|[1, 2, 3]|[3 -> 40.0, 5 -> ...| [10.0, 0.0, 40.0]| -0.39|
    |    A|    Y|  1|  20.0|[1, 2, 3]|[2 -> 10.0, 6 -> ...| [20.0, 10.0, 0.0]|  1.22|
    |    B|    X|  1|  30.0|[1, 2, 3]|[1 -> 30.0, 2 -> ...| [30.0, 20.0, 0.0]|  1.07|
    |    B|    Y|  1|  40.0|[1, 2, 3]|[4 -> 10.0, 1 -> ...| [40.0, 30.0, 0.0]|  0.98|
    |    A|    Y|  2|  10.0|[2, 1, 6]|[2 -> 10.0, 6 -> ...|[10.0, 20.0, 40.0]| -1.07|
    |    B|    X|  2|  20.0|[2, 1, 6]|[1 -> 30.0, 2 -> ...| [20.0, 30.0, 0.0]|  0.27|
    |    B|    Y|  2|  30.0|[2, 1, 6]|[4 -> 10.0, 1 -> ...|[30.0, 40.0, 10.0]|  0.27|
    |    A|    X|  3|  40.0|[3, 1, 2]|[3 -> 40.0, 5 -> ...| [40.0, 10.0, 0.0]|  1.37|
    |    B|    Y|  4|  10.0|[4, 5, 6]|[4 -> 10.0, 1 -> ...| [10.0, 0.0, 10.0]|  0.71|
    |    A|    X|  5|  20.0|[5, 4, 6]|[3 -> 40.0, 5 -> ...|  [20.0, 0.0, 0.0]|  1.41|
    |    B|    X|  5|  30.0|[5, 4, 6]|[1 -> 30.0, 2 -> ...|  [30.0, 0.0, 0.0]|  1.41|
    |    A|    Y|  6|  40.0|[6, 1, 2]|[2 -> 10.0, 6 -> ...|[40.0, 20.0, 10.0]|  1.34|
    |    B|    Y|  6|  10.0|[6, 1, 2]|[4 -> 10.0, 1 -> ...|[10.0, 40.0, 30.0]| -1.34|
    +-----+-----+---+------+---------+--------------------+------------------+------+