
Concatenate 2 pyspark dataframes and continue a running window sum and max

  •  0
  • Aaron Brazier  · Tech Community  · 1 year ago

    I have two Spark dataframes.

    dataframe_1:

    +----------------------------+------+---------+-----+
    |timestamp                   |target|counter_1|max_1|
    +----------------------------+------+---------+-----+
    |2023-08-18T00:00:00.000+0000|0     |0        |0    |
    |2023-08-18T00:10:00.000+0000|1     |1        |1    |
    |2023-08-18T00:20:00.000+0000|1     |2        |1    |
    |2023-08-18T00:30:00.000+0000|0     |2        |1    |
    |2023-08-18T00:40:00.000+0000|1     |3        |1    |
    |2023-08-18T00:50:00.000+0000|0     |3        |1    |
    |2023-08-18T01:00:00.000+0000|1     |3        |1    |
    +----------------------------+------+---------+-----+
    

    dataframe_2:

    +----------------------------+------+
    |timestamp                   |target|
    +----------------------------+------+
    |2023-08-18T01:10:00.000+0000|1     |
    |2023-08-18T01:20:00.000+0000|1     |
    |2023-08-18T01:30:00.000+0000|1     |
    |2023-08-18T01:40:00.000+0000|0     |
    |2023-08-18T01:50:00.000+0000|1     |
    |2023-08-18T02:00:00.000+0000|0     |
    +----------------------------+------+
    

    As you can see from the timestamps, both dataframes hold data aggregated into 10-minute intervals, and dataframe_2 is the next batch in sequence (imagine this running live every few hours).

    I want to concatenate the two dataframes while keeping the window sum and window max calculations running across the boundary.

    I am calculating two columns, called counter_1 and max_1. These are just a window sum and a window max over target.

    The window I am using is the following (though in reality it could be any number of rows):

    window = (Window.partitionBy().orderBy("timestamp").rowsBetween(-4, 0))
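
    For clarity, this is roughly how the two columns are derived on a single dataframe (a minimal sketch, assuming dataframe_1 already holds the timestamp and target columns):

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    window = Window.partitionBy().orderBy("timestamp").rowsBetween(-4, 0)

    dataframe_1 = (
        dataframe_1
        .withColumn("counter_1", F.sum("target").over(window))  # running window sum
        .withColumn("max_1", F.max("target").over(window))      # running window max
    )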

    The expected output is:

    +----------------------------+------+---------+-----+
    |timestamp                   |target|counter_1|max_1|
    +----------------------------+------+---------+-----+
    |2023-08-18T00:00:00.000+0000|0     |0        |0    |
    |2023-08-18T00:10:00.000+0000|1     |1        |1    |
    |2023-08-18T00:20:00.000+0000|1     |2        |1    |
    |2023-08-18T00:30:00.000+0000|0     |2        |1    |
    |2023-08-18T00:40:00.000+0000|1     |3        |1    |
    |2023-08-18T00:50:00.000+0000|0     |3        |1    |
    |2023-08-18T01:00:00.000+0000|1     |3        |1    |
    |2023-08-18T01:10:00.000+0000|1     |3        |1    |
    |2023-08-18T01:20:00.000+0000|1     |4        |1    |
    |2023-08-18T01:30:00.000+0000|1     |4        |1    |
    |2023-08-18T01:40:00.000+0000|0     |4        |1    |
    |2023-08-18T01:50:00.000+0000|1     |4        |1    |
    |2023-08-18T02:00:00.000+0000|0     |3        |1    |
    +----------------------------+------+---------+-----+
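
    To spell out the boundary row: the window for 2023-08-18T01:10 covers 00:30 through 01:10, so counter_1 = 0 + 1 + 0 + 1 + 1 = 3 and max_1 = 1; in other words, the first rows of dataframe_2 must see the tail of dataframe_1.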
    

    I have tried multiple ways of grouping the data and aggregating the sum and max across the original dataframes, but I have not been successful.

    Edit 3/29/24

    I forgot to mention how this should actually work from a design perspective.

    The first dataframe does not always have to start from 0 for counter_1 and max_1. There may be previous data that has already set those values, e.g. counter_1 at some running window sum and max_1 at 1.

    An example of this as dataframe_1:

    +----------------------------+------+---------+-----+
    |timestamp                   |target|counter_1|max_1|
    +----------------------------+------+---------+-----+
    |2023-08-18T00:00:00.000+0000|1     |3        |1    |
    |2023-08-18T00:10:00.000+0000|1     |4        |1    |
    |2023-08-18T00:20:00.000+0000|1     |3        |1    |
    |2023-08-18T00:30:00.000+0000|0     |3        |1    |
    |2023-08-18T00:40:00.000+0000|1     |3        |1    |
    |2023-08-18T00:50:00.000+0000|0     |3        |1    |
    |2023-08-18T01:00:00.000+0000|1     |3        |1    |
    +----------------------------+------+---------+-----+
    

    And this as dataframe_2:

    +----------------------------+------+
    |timestamp                   |target|
    +----------------------------+------+
    |2023-08-18T01:10:00.000+0000|1     |
    |2023-08-18T01:20:00.000+0000|1     |
    |2023-08-18T01:30:00.000+0000|1     |
    |2023-08-18T01:40:00.000+0000|0     |
    |2023-08-18T01:50:00.000+0000|1     |
    |2023-08-18T02:00:00.000+0000|0     |
    +----------------------------+------+
    

    Using the solution below, you get the following result:

    +----------------------------+------+---------+-----+
    |timestamp                   |target|counter_1|max_1|
    +----------------------------+------+---------+-----+
    |2023-08-18T00:00:00.000+0000|1     |1        |1    |
    |2023-08-18T00:10:00.000+0000|1     |2        |1    |
    |2023-08-18T00:20:00.000+0000|1     |3        |1    |
    |2023-08-18T00:30:00.000+0000|0     |3        |1    |
    |2023-08-18T00:40:00.000+0000|1     |4        |1    |
    |2023-08-18T00:50:00.000+0000|0     |3        |1    |
    |2023-08-18T01:00:00.000+0000|1     |3        |1    |
    |2023-08-18T01:10:00.000+0000|1     |3        |1    |
    |2023-08-18T01:20:00.000+0000|1     |4        |1    |
    |2023-08-18T01:30:00.000+0000|1     |4        |1    |
    |2023-08-18T01:40:00.000+0000|0     |4        |1    |
    |2023-08-18T01:50:00.000+0000|1     |4        |1    |
    |2023-08-18T02:00:00.000+0000|0     |3        |1    |
    +----------------------------+------+---------+-----+
    

    This is incorrect: ideally the values in the first dataframe should not be recalculated, since they were already computed; the new data should simply be appended, with the window continuing from where it left off.

    The expected result for these two is:

    +----------------------------+------+---------+-----+
    |timestamp                   |target|counter_1|max_1|
    +----------------------------+------+---------+-----+
    |2023-08-18T00:00:00.000+0000|1     |3        |1    |
    |2023-08-18T00:10:00.000+0000|1     |4        |1    |
    |2023-08-18T00:20:00.000+0000|1     |3        |1    |
    |2023-08-18T00:30:00.000+0000|0     |3        |1    |
    |2023-08-18T00:40:00.000+0000|1     |3        |1    |
    |2023-08-18T00:50:00.000+0000|0     |3        |1    |
    |2023-08-18T01:00:00.000+0000|1     |3        |1    |
    |2023-08-18T01:10:00.000+0000|1     |3        |1    |
    |2023-08-18T01:20:00.000+0000|1     |4        |1    |
    |2023-08-18T01:30:00.000+0000|1     |4        |1    |
    |2023-08-18T01:40:00.000+0000|0     |4        |1    |
    |2023-08-18T01:50:00.000+0000|1     |4        |1    |
    |2023-08-18T02:00:00.000+0000|0     |3        |1    |
    +----------------------------+------+---------+-----+
    
    0 replies  |  1 year ago
        1
  •  0
  •   Prathik Kini  · 1 year ago
    from pyspark.sql.functions import lit, sum, max
    from pyspark.sql.window import Window
    
    window_spec = Window.orderBy("timestamp").rowsBetween(-4, 0)
    
    # Give dataframe_2 placeholder columns so its schema lines up with dataframe_1
    dataframe_2 = dataframe_2.withColumn("counter_1", lit(0)).withColumn("max_1", lit(0))
    union_df = dataframe_1.union(dataframe_2)
    
    # Calculate the running sum and max over the window
    result_df = union_df \
        .withColumn("counter_1", sum("target").over(window_spec)) \
        .withColumn("max_1", max("target").over(window_spec))
    
    result_df.show(truncate=False)
    
    +----------------------------+------+---------+-----+
    |timestamp                   |target|counter_1|max_1|
    +----------------------------+------+---------+-----+
    |2023-08-18T00:00:00.000+0000|0     |0        |0    |
    |2023-08-18T00:10:00.000+0000|1     |1        |1    |
    |2023-08-18T00:20:00.000+0000|1     |2        |1    |
    |2023-08-18T00:30:00.000+0000|0     |2        |1    |
    |2023-08-18T00:40:00.000+0000|1     |3        |1    |
    |2023-08-18T00:50:00.000+0000|0     |3        |1    |
    |2023-08-18T01:00:00.000+0000|1     |3        |1    |
    |2023-08-18T01:10:00.000+0000|1     |3        |1    |
    |2023-08-18T01:20:00.000+0000|1     |4        |1    |
    |2023-08-18T01:30:00.000+0000|1     |4        |1    |
    |2023-08-18T01:40:00.000+0000|0     |4        |1    |
    |2023-08-18T01:50:00.000+0000|1     |4        |1    |
    |2023-08-18T02:00:00.000+0000|0     |3        |1    |
    +----------------------------+------+---------+-----+
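
    One caveat: union matches columns by position, not by name, so the placeholder columns on dataframe_2 must be added in the same order as dataframe_1's schema. If that ordering is fragile in your pipeline, unionByName sidesteps it (a minimal sketch of the same step):

    # unionByName resolves columns by name, so the order in which the
    # placeholder columns were added no longer matters
    dataframe_2 = dataframe_2.withColumn("counter_1", lit(0)).withColumn("max_1", lit(0))
    union_df = dataframe_1.unionByName(dataframe_2)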
    
        2
  •  0
  •   Aaron Brazier  · 1 year ago

    For anyone else who finds this and has a similar problem, this is the solution I ended up going with after going back and forth with Copilot and modifying it to fit my use case.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import sum, max, lit
    from pyspark.sql.window import Window
    
    # Initialize SparkSession
    spark = SparkSession.builder \
        .appName("Continuous DataFrame Join and Window Calculations") \
        .getOrCreate()
    
    # Define the dataframes
    df1 = spark.createDataFrame([
        ("2023-08-18T00:00:00.000+0000", 1, 3, 1),
        ("2023-08-18T00:10:00.000+0000", 1, 4, 1),
        ("2023-08-18T00:20:00.000+0000", 1, 3, 1),
        ("2023-08-18T00:30:00.000+0000", 0, 3, 1),
        ("2023-08-18T00:40:00.000+0000", 1, 3, 1),
        ("2023-08-18T00:50:00.000+0000", 0, 3, 1),
        ("2023-08-18T01:00:00.000+0000", 1, 3, 1)
    ], ["timestamp", "target", "counter_1", "max_1"])
    
    df2 = spark.createDataFrame([
        ("2023-08-18T01:10:00.000+0000", 1),
        ("2023-08-18T01:20:00.000+0000", 1),
        ("2023-08-18T01:30:00.000+0000", 1),
        ("2023-08-18T01:40:00.000+0000", 0),
        ("2023-08-18T01:50:00.000+0000", 1),
        ("2023-08-18T02:00:00.000+0000", 0)
    ], ["timestamp", "target"])
    
    # Define the window specification
    window_spec = Window.orderBy("timestamp").rowsBetween(-4, 0)
    
    # Add placeholder 'counter_1' and 'max_1' columns to df2, cast to long so
    # the positional union below does not have to coerce a NullType column
    df2 = df2.withColumn("counter_1", lit(None).cast("long"))
    df2 = df2.withColumn("max_1", lit(None).cast("long"))
    
    # Combine the last 4 rows of df1 (the window lookback) with all rows of df2
    combined_df = df1.orderBy("timestamp", ascending=False).limit(4).union(df2).sort('timestamp')
    
    # Calculate 'counter_1' and 'max_1' for the combined dataframe
    combined_df = combined_df.withColumn("counter_1", sum("target").over(window_spec))
    combined_df = combined_df.withColumn("max_1",max("target").over(window_spec))
    
    # Replace the placeholder 'counter_1' and 'max_1' values in df2 with the
    # values calculated over the combined window
    df2 = df2.drop("counter_1", "max_1")
    df2 = df2.join(combined_df, ["timestamp", "target"], how="left")

    # Concatenate the untouched df1 with the newly computed df2
    result = df1.union(df2)
    result.orderBy("timestamp").show(truncate=False)
    
    +----------------------------+------+---------+-----+
    |timestamp                   |target|counter_1|max_1|
    +----------------------------+------+---------+-----+
    |2023-08-18T00:00:00.000+0000|1     |3        |1    |
    |2023-08-18T00:10:00.000+0000|1     |4        |1    |
    |2023-08-18T00:20:00.000+0000|1     |3        |1    |
    |2023-08-18T00:30:00.000+0000|0     |3        |1    |
    |2023-08-18T00:40:00.000+0000|1     |3        |1    |
    |2023-08-18T00:50:00.000+0000|0     |3        |1    |
    |2023-08-18T01:00:00.000+0000|1     |3        |1    |
    |2023-08-18T01:10:00.000+0000|1     |3        |1    |
    |2023-08-18T01:20:00.000+0000|1     |4        |1    |
    |2023-08-18T01:30:00.000+0000|1     |4        |1    |
    |2023-08-18T01:40:00.000+0000|0     |4        |1    |
    |2023-08-18T01:50:00.000+0000|1     |4        |1    |
    |2023-08-18T02:00:00.000+0000|0     |3        |1    |
    +----------------------------+------+---------+-----+
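
    Since the window could span any number of rows, the number of seed rows taken from df1 has to stay in step with the window's lower bound. One way to keep the two in sync (a sketch; LOOKBACK is a name introduced here purely for illustration):

    from pyspark.sql.functions import desc

    # The window spans LOOKBACK preceding rows plus the current row, so exactly
    # LOOKBACK tail rows of df1 are needed to seed the new batch
    LOOKBACK = 4
    window_spec = Window.orderBy("timestamp").rowsBetween(-LOOKBACK, 0)
    seed_rows = df1.orderBy(desc("timestamp")).limit(LOOKBACK)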