我在scalaspark中使用了一个简单的groupby查询,目标是在已排序的数据帧中获取组中的第一个值。这是我的spark数据帧
+---------------+------------------------------------------+ |ID |some_flag |some_type | Timestamp | +---------------+------------------------------------------+ | 656565654| true| Type 1|2018-08-10 00:00:00| | 656565654| false| Type 1|2017-08-02 00:00:00| | 656565654| false| Type 2|2016-07-30 00:00:00| | 656565654| false| Type 2|2016-05-04 00:00:00| | 656565654| false| Type 2|2016-04-29 00:00:00| | 656565654| false| Type 2|2015-10-29 00:00:00| | 656565654| false| Type 2|2015-04-29 00:00:00| +---------------+----------+-----------+-------------------+
这是我的汇总查询
val sampleDF = df.sort($"Timestamp".desc).groupBy("ID").agg(first("Timestamp"), first("some_flag"), first("some_type"))
+---------------+-------------+---------+-------------------+ |ID |some_falg |some_type| Timestamp | +---------------+-------------+---------+-------------------+ | 656565654| true| Type 1|2018-08-10 00:00:00| +---------------+-------------+---------+-------------------+
但是得到下面的wierd输出,它像一个随机行一样不断变化
+---------------+-------------+---------+-------------------+ |ID |some_falg |some_type| Timestamp | +---------------+-------------+---------+-------------------+ | 656565654| false| Type 2|2015-10-29 00:00:00| +---------------+-------------+---------+-------------------+
另外请注意,数据帧中没有空值。我在做错事的地方抓狂。需要帮助!
尝试获取所有第一个值的方法返回的结果不正确。每个列值可能来自不同的行。
相反,你应该 order by 按每个组降序排列的时间戳并获取第一行。一种简单的方法是使用 row_number .
order by
row_number
import org.apache.spark.sql.functions._ import org.apache.spark.sql.expressions.Window val sampleDF = df.withColumn("rnum",row_number().over(Window.partitionBy(col("ID")).orderBy(col("Timestamp").desc))) sampleDF.filter(col("rnum") == 1).show
只是为了补充Vamsi的答案;问题是 groupBy first 它为该列找到的第一个非空值 i、 组中该列的几乎所有非空值。
groupBy
first
在 groupBy公司 不会以任何可复制的方式影响组内的顺序。
groupBy公司
也可以看这个 blog post 这就解释了,由于上述行为,从多个 第一 调用甚至不能来自组内的同一行。
第一
用3列k、t、v输入数据
z, 1, null z, 2, 1.5 z, 3, 2.4
df.groupBy("k").agg( $"k", first($"t"), first($"v") )
z, 1, 1.5
这个结果是两个记录的混合!