代码之家  ›  专栏  ›  技术社区  ›  maximodesousadias

如何根据条件删除日期后的记录

  •  0
  • maximodesousadias  · 技术社区  · 10 月前

    我正在寻找一种优雅的方法来删除DataFrame中发生在最新事件之前的所有记录 '测试组件' 存在 “不满意” ,基于他们 'TEST_DT' 每个ID的值。

    例如,给定ID 5000的以下DataFrame:

    | ID   | TEST_ID | TEST_COMPONENT | TEST_DT                       |
    |------|---------|----------------|-------------------------------|
    | 5000 | ENGL    | SATISFACTORY   | 2023-01-04T00:00:00.000+11:00 |
    | 5000 | ENGL    | SATISFACTORY   | 2022-09-07T00:00:00.000+10:00 |
    | 5000 | OTHER   | NONE           | 2022-09-07T00:00:00.000+10:00 |
    | 5000 | ENGL    | UNSATISFACTORY | 2016-05-23T00:00:00.000+10:00 |
    | 5000 | OTHER   | NONE           | 2016-05-23T00:00:00.000+10:00 |
    | 5000 | OTHER   | NONE           | 2016-05-23T00:00:00.000+10:00 |
    | 5000 | OTHER   | NONE           | 2016-02-09T00:00:00.000+11:00 |
    | 5000 | OTHER   | NONE           | 2016-02-09T00:00:00.000+11:00 |
    | 5000 | OTHER   | NONE           | 2016-02-09T00:00:00.000+11:00 |
    | 5000 | ENGL    | UNSATISFACTORY | 2014-05-29T00:00:00.000+10:00 |
    | 5000 | OTHER   | NONE           | 2013-09-27T00:00:00.000+10:00 |
    

    我只想保留最新的行 “不满意” 记录向前。此示例的期望输出为:

    | ID   | TEST_ID | TEST_COMPONENT | TEST_DT                       |
    |------|---------|----------------|-------------------------------|
    | 5000 | ENGL    | SATISFACTORY   | 2023-01-04T00:00:00.000+11:00 |
    | 5000 | ENGL    | SATISFACTORY   | 2022-09-07T00:00:00.000+10:00 |
    | 5000 | OTHER   | NONE           | 2022-09-07T00:00:00.000+10:00 |
    | 5000 | ENGL    | UNSATISFACTORY | 2016-05-23T00:00:00.000+10:00 |
    

    我怎样才能有效地利用 PySpark ?

    2 回复  |  直到 10 月前
        1
  •  0
  •   ManishPrajapati    10 月前

    以下是我使用Window函数的尝试:

    window_spec = Window.partitionBy("ID")
    
    df = df.withColumn("MAX_DATE",F.max(F.when(df['TEST_COMPONENT']=='UNSATISFACTORY',df['TEST_DT']).otherwise(None)).over(window_spec))
    df_drop = df.filter((df['TEST_DT']>F.col('MAX_DATE')) | ((df['TEST_DT']==F.col('MAX_DATE')) & (df['TEST_COMPONENT']=='UNSATISFACTORY')))
    
    df_drop.show(truncate=False)
    
        2
  •  0
  •   maximodesousadias    10 月前

    我通过在PySpark中执行以下3个步骤实现了这一点:

    df_max = df.filter(col("TEST_COMPONENT")=="UNSATISFACTORY")\
    .groupBy("ID")\
    .agg(max("TEST_DT")\
    .alias("LATEST_UNSAT"))
    
    df = df.alias("a").join(\
      df_max.alias("b")\
      ,df.ID == df_max.ID\
      ,"left")
    
    df = df.filter(col("TEST_DT") >= col("LATEST_UNSAT"))
    

    这种方法是有效的,但我愿意听取更好的想法。