供遇到类似问题的人参考：下面是我与 Copilot 反复讨论、并根据自己的用例修改之后，准备采用的解决方案（PySpark）。
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, max, lit
from pyspark.sql.window import Window
# Compute rolling features for the new rows in df2 and append them to df1:
#   counter_1 = rolling sum of target, max_1 = rolling max of target.
# The tail of the historical rows in df1 is used as look-back context.
# NOTE: `sum`, `max` and `lit` below are the pyspark.sql.functions versions
# imported at the top of the file (they shadow the Python builtins).

# Number of *preceding* rows covered by the rolling window; the current row is
# included on top of these. The same constant sizes the df1 tail, so the
# context and the window can never drift apart.
LOOKBACK = 4

spark = (
    SparkSession.builder
    .appName("Continuous DataFrame Join and Window Calculations")
    .getOrCreate()
)

# Historical rows: counter_1 and max_1 are already filled in.
df1 = spark.createDataFrame([
    ("2023-08-18T00:00:00.000+0000", 1, 3, 1),
    ("2023-08-18T00:10:00.000+0000", 1, 4, 1),
    ("2023-08-18T00:20:00.000+0000", 1, 3, 1),
    ("2023-08-18T00:30:00.000+0000", 0, 3, 1),
    ("2023-08-18T00:40:00.000+0000", 1, 3, 1),
    ("2023-08-18T00:50:00.000+0000", 0, 3, 1),
    ("2023-08-18T01:00:00.000+0000", 1, 3, 1),
], ["timestamp", "target", "counter_1", "max_1"])

# New rows: only the raw target is known; the features are computed below.
df2 = spark.createDataFrame([
    ("2023-08-18T01:10:00.000+0000", 1),
    ("2023-08-18T01:20:00.000+0000", 1),
    ("2023-08-18T01:30:00.000+0000", 1),
    ("2023-08-18T01:40:00.000+0000", 0),
    ("2023-08-18T01:50:00.000+0000", 1),
    ("2023-08-18T02:00:00.000+0000", 0),
], ["timestamp", "target"])

# The window covers the current row plus the LOOKBACK rows before it, ordered by timestamp.
# NOTE: the sample timestamps are ISO-8601 strings that all share the +0000
# offset, so string order equals time order. If offsets can differ, parse
# the strings with to_timestamp first.
# Without partitionBy, Spark evaluates this window in a single partition.
# That is fine for a small tail plus one batch, but it does not scale.
window_spec = Window.orderBy("timestamp").rowsBetween(-LOOKBACK, 0)

# Only the last LOOKBACK historical rows can fall inside a new row's window.
# Each row is tagged with its origin so the new rows can be picked out
# afterwards without a join. A join on (timestamp, target) would duplicate
# rows on key collisions and lose the row order.
history_tail = (
    df1.select("timestamp", "target")
    .orderBy("timestamp", ascending=False)
    .limit(LOOKBACK)
    .withColumn("_is_new", lit(False))
)
new_rows = df2.select("timestamp", "target").withColumn("_is_new", lit(True))

combined_df = (
    history_tail.unionByName(new_rows)
    .withColumn("counter_1", sum("target").over(window_spec))
    .withColumn("max_1", max("target").over(window_spec))
)

# Keep only the freshly computed rows; df2 now has the same columns as df1.
df2 = combined_df.where(combined_df["_is_new"]).drop("_is_new")

# unionByName matches columns by name rather than by position, and the
# explicit orderBy makes the displayed result chronological.
result = df1.unionByName(df2).orderBy("timestamp")
result.show(truncate=False)
+----------------------------+------+---------+-----+
|timestamp                   |target|counter_1|max_1|
+----------------------------+------+---------+-----+
|2023-08-18T00:00:00.000+0000|1     |3        |1    |
|2023-08-18T00:10:00.000+0000|1     |4        |1    |
|2023-08-18T00:20:00.000+0000|1     |3        |1    |
|2023-08-18T00:30:00.000+0000|0     |3        |1    |
|2023-08-18T00:40:00.000+0000|1     |3        |1    |
|2023-08-18T00:50:00.000+0000|0     |3        |1    |
|2023-08-18T01:00:00.000+0000|1     |3        |1    |
|2023-08-18T01:10:00.000+0000|1     |3        |1    |
|2023-08-18T01:30:00.000+0000|1     |4        |1    |
|2023-08-18T01:20:00.000+0000|1     |4        |1    |
|2023-08-18T01:40:00.000+0000|0     |4        |1    |
|2023-08-18T02:00:00.000+0000|0     |3        |1    |
|2023-08-18T01:50:00.000+0000|1     |4        |1    |
+----------------------------+------+---------+-----+