假设您从以下数据帧开始,
df
:
+----------+-------+-------+
| Date| id|numbers|
+----------+-------+-------+
|2017-03-02|group 1| 98|
|2017-04-01|group 1| 50|
|2018-03-02|group 1| 5|
|2016-03-01|group 2| 49|
|2016-12-22|group 2| 81|
|2017-12-31|group 2| 91|
|2018-08-08|group 2| 19|
|2018-09-25|group 2| 52|
|2017-01-01|group 3| 75|
|2018-12-12|group 3| 17|
+----------+-------+-------+
订购数据帧
首先添加
row_number
正如您在示例中所做的,并将输出分配给一个新的数据帧
df2
:
import pyspark.sql.functions as f
from pyspark.sql import Window
df2 = df.select(
"*", f.row_number().over(Window.partitionBy("id").orderBy("Date")).alias("row_number")
)
df2.show()
#+----------+-------+-------+----------+
#| Date| id|numbers|row_number|
#+----------+-------+-------+----------+
#|2017-03-02|group 1| 98| 1|
#|2017-04-01|group 1| 50| 2|
#|2018-03-02|group 1| 5| 3|
#|2016-03-01|group 2| 49| 1|
#|2016-12-22|group 2| 81| 2|
#|2017-12-31|group 2| 91| 3|
#|2018-08-08|group 2| 19| 4|
#|2018-09-25|group 2| 52| 5|
#|2017-01-01|group 3| 75| 1|
#|2018-12-12|group 3| 17| 2|
#+----------+-------+-------+----------+
收集中值值
现在你可以加入了
df2型
对自己说
id
列的条件是
row number
是
1
或者它大于右边的
行数
. 然后按左边数据框的
("id", "Date", "row_number")
然后收集
numbers
从右边的数据框进入列表。
在这种情况下
行数
等于1,我们只想保留此收集列表的第一个元素。否则保留所有的数字,但要对它们进行排序,因为我们需要对它们进行排序以计算中值。
调用此中间数据帧
df3
:
df3 = df2.alias("l").join(df2.alias("r"), on="id", how="left")\
.where("l.row_number = 1 OR (r.row_number < l.row_number)")\
.groupBy("l.id", "l.Date", "l.row_number")\
.agg(f.collect_list("r.numbers").alias("numbers"))\
.select(
"id",
"Date",
"row_number",
f.when(
f.col("row_number") == 1,
f.array([f.col("numbers").getItem(0)])
).otherwise(f.sort_array("numbers")).alias("numbers")
)
df3.show()
#+-------+----------+----------+----------------+
#| id| Date|row_number| numbers|
#+-------+----------+----------+----------------+
#|group 1|2017-03-02| 1| [98]|
#|group 1|2017-04-01| 2| [98]|
#|group 1|2018-03-02| 3| [50, 98]|
#|group 2|2016-03-01| 1| [49]|
#|group 2|2016-12-22| 2| [49]|
#|group 2|2017-12-31| 3| [49, 81]|
#|group 2|2018-08-08| 4| [49, 81, 91]|
#|group 2|2018-09-25| 5|[19, 49, 81, 91]|
#|group 3|2017-01-01| 1| [75]|
#|group 3|2018-12-12| 2| [75]|
#+-------+----------+----------+----------------+
请注意
数字
列
df3型
有一个适当值的列表,我们希望找到其中值。
计算中值
因为您的Spark版本大于2.1,所以可以使用
pyspark.sql.functions.posexplode()
从这个值列表中计算中值。对于较低版本的spark,您需要使用
udf
.
首先在
df3型
:
-
isEven
:一个布尔值,指示
数字
数组有偶数个元素
-
middle
:数组中间的索引,它是长度为/2的地板
创建这些列后,使用
posexplode()
,它将返回两个新列:
pos
和
col
. 然后我们过滤出结果数据帧,只保留计算中值所需的位置。
保持位置的逻辑如下:
-
如果
伊塞文
是
False
,我们只保留中间位置
-
如果
伊塞文
是
True
,我们保持中间位置和中间位置-1。
最后按
身份证件
和
Date
平均剩余的
数字
.
df3.select(
"*",
f.when(
(f.size("numbers") % 2) == 0,
f.lit(True)
).otherwise(f.lit(False)).alias("isEven"),
f.floor(f.size("numbers")/2).alias("middle")
).select(
"id",
"Date",
"middle",
f.posexplode("numbers")
).where(
"(isEven=False AND middle=pos) OR (isEven=True AND pos BETWEEN middle-1 AND middle)"
).groupby("id", "Date").agg(f.avg("col").alias("median")).show()
#+-------+----------+------+
#| id| Date|median|
#+-------+----------+------+
#|group 1|2017-03-02| 98.0|
#|group 1|2017-04-01| 98.0|
#|group 1|2018-03-02| 74.0|
#|group 2|2016-03-01| 49.0|
#|group 2|2016-12-22| 49.0|
#|group 2|2017-12-31| 65.0|
#|group 2|2018-08-08| 81.0|
#|group 2|2018-09-25| 65.0|
#|group 3|2017-01-01| 75.0|
#|group 3|2018-12-12| 75.0|
#+-------+----------+------+