代码之家  ›  专栏  ›  技术社区  ›  muazfaiz

scala aggregate first函数提供意外结果

  •  1
  • muazfaiz  · 技术社区  · 7 年前

    我在scalaspark中使用了一个简单的groupby查询,目标是在已排序的数据帧中获取组中的第一个值。这是我的spark数据帧

    +---------------+------------------------------------------+
    |ID             |some_flag |some_type  |  Timestamp        |
    +---------------+------------------------------------------+
    |      656565654|      true|     Type 1|2018-08-10 00:00:00|
    |      656565654|     false|     Type 1|2017-08-02 00:00:00|
    |      656565654|     false|     Type 2|2016-07-30 00:00:00|
    |      656565654|     false|     Type 2|2016-05-04 00:00:00|
    |      656565654|     false|     Type 2|2016-04-29 00:00:00|
    |      656565654|     false|     Type 2|2015-10-29 00:00:00|
    |      656565654|     false|     Type 2|2015-04-29 00:00:00|
    +---------------+----------+-----------+-------------------+
    

    这是我的汇总查询

    val sampleDF = df.sort($"Timestamp".desc).groupBy("ID").agg(first("Timestamp"), first("some_flag"), first("some_type"))
    

    +---------------+-------------+---------+-------------------+
    |ID             |some_falg    |some_type|  Timestamp        |
    +---------------+-------------+---------+-------------------+
    |      656565654|         true|   Type 1|2018-08-10 00:00:00|
    +---------------+-------------+---------+-------------------+
    

    但是得到下面的wierd输出,它像一个随机行一样不断变化

    +---------------+-------------+---------+-------------------+
    |ID             |some_falg    |some_type|  Timestamp        |
    +---------------+-------------+---------+-------------------+
    |      656565654|        false|   Type 2|2015-10-29 00:00:00|
    +---------------+-------------+---------+-------------------+
    

    另外请注意,数据帧中没有空值。我在做错事的地方抓狂。需要帮助!

    2 回复  |  直到 7 年前
        1
  •  3
  •   Vamsi Prabhala    7 年前

    尝试获取所有第一个值的方法返回的结果不正确。每个列值可能来自不同的行。

    相反,你应该 order by 按每个组降序排列的时间戳并获取第一行。一种简单的方法是使用 row_number .

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.Window
    
    val sampleDF = df.withColumn("rnum",row_number().over(Window.partitionBy(col("ID")).orderBy(col("Timestamp").desc)))
    
    sampleDF.filter(col("rnum") == 1).show
    
        2
  •  3
  •   DNA    7 年前

    只是为了补充Vamsi的答案;问题是 groupBy first 它为该列找到的第一个非空值 i、 组中该列的几乎所有非空值。

    groupBy公司 不会以任何可复制的方式影响组内的顺序。

    也可以看这个 blog post 这就解释了,由于上述行为,从多个 第一 调用甚至不能来自组内的同一行。

    用3列k、t、v输入数据

    z, 1, null
    z, 2, 1.5
    z, 3, 2.4
    

    df.groupBy("k").agg(
      $"k",
      first($"t"),
      first($"v")
    )
    

    z, 1, 1.5
    

    这个结果是两个记录的混合!

    推荐文章