
Get the first row based on a condition

  • user2896120  ·  5 years ago

    I want to get the first row where the indicator column is 0. For example, my DataFrame looks like this:

    network   volume  indicator  Hour
    YYY       20      1          10
    YYY       30      0          9
    YYY       40      0          8
    YYY       80      1          7
    
    TTT       50      0          10
    TTT       40      1          8
    TTT       10      0          4
    TTT       10      1          2
    

    The result should look like this:

    network   volume  indicator  Hour
    YYY       20      1          10
    YYY       30      0          9
    YYY       80      1          7
    
    TTT       50      0          10
    TTT       40      1          8
    TTT       10      1          2
    

  •   partha_devArch    5 years ago

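    Here is the code you need, with inline comments to help you follow it. (The output has been updated for the latest dataset, which has multiple 1s in the indicator column.)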

    sourceData.show()
    
    +-------+------+---------+----+
    |network|volume|indicator|Hour|
    +-------+------+---------+----+
    |    YYY|    20|        1|  10|
    |    YYY|    30|        0|   9|
    |    YYY|    40|        0|   8|
    |    YYY|    80|        1|   7|
    |    TTT|    50|        0|  10|
    |    TTT|    40|        1|   8|
    |    TTT|    10|        0|   4|
    |    TTT|    10|        1|   2|
    +-------+------+---------+----+
    
    
    sourceData.printSchema()
    
    root
      |-- network: string (nullable = true)
      |-- volume: integer (nullable = true)
      |-- indicator: integer (nullable = true)
      |-- Hour: integer (nullable = true)
    

    Required transformation code:

    // Needed for the $"col" syntax (already imported in spark-shell)
    import spark.implicits._
    
    // Split the dataset into two parts: rows with indicator 1 and rows with indicator 0
    val indicator1Df = sourceData.filter("indicator == 1")
    val indicator0Df = sourceData.filter("indicator == 0")
    
    // For each network, keep only the first indicator = 0 row (the one with the highest Hour)
    indicator0Df.createOrReplaceTempView("indicator0")
    val firstIndicator0df = spark.sql("""
      select network, volume, indicator, Hour
      from (select i0.network, i0.volume, i0.indicator, i0.Hour,
                   ROW_NUMBER() over (partition by i0.network order by i0.Hour desc) as rnk
            from indicator0 i0) i
      where rnk = 1""")
    
    // Merge both DataFrames back together and sort them to get the required output
    val finalDf = indicator1Df.union(firstIndicator0df).orderBy($"network".desc, $"Hour".desc)
    
    finalDf.show()
    

    Final output:

    +-------+------+---------+----+
    |network|volume|indicator|Hour|
    +-------+------+---------+----+
    |    YYY|    20|        1|  10|
    |    YYY|    30|        0|   9|
    |    YYY|    80|        1|   7|
    |    TTT|    50|        0|  10|
    |    TTT|    40|        1|   8|
    |    TTT|    10|        1|   2|
    +-------+------+---------+----+
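
    For comparison, the same result can probably be reached without the temp view and the union, by using a window function through the DataFrame API. The following is only a sketch under a few assumptions: the local SparkSession settings, the app name, and the helper column name rn are made up for illustration, and the sample rows are copied from the question. Window, row_number and col are standard Spark SQL APIs.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Assumption: a local session for experimenting; in spark-shell this returns the existing one
val spark = SparkSession.builder().master("local[*]").appName("first-zero-row").getOrCreate()
import spark.implicits._

// Sample data copied from the question
val sourceData = Seq(
  ("YYY", 20, 1, 10), ("YYY", 30, 0, 9), ("YYY", 40, 0, 8), ("YYY", 80, 1, 7),
  ("TTT", 50, 0, 10), ("TTT", 40, 1, 8), ("TTT", 10, 0, 4), ("TTT", 10, 1, 2)
).toDF("network", "volume", "indicator", "Hour")

// Number the rows inside each (network, indicator) group, highest Hour first
val byNetworkAndIndicator = Window
  .partitionBy("network", "indicator")
  .orderBy(col("Hour").desc)

val result = sourceData
  .withColumn("rn", row_number().over(byNetworkAndIndicator)) // "rn" is a hypothetical helper column
  // Keep every indicator = 1 row, but only the first indicator = 0 row per network
  .filter(col("indicator") === 1 || (col("indicator") === 0 && col("rn") === 1))
  .drop("rn")
  .orderBy(col("network").desc, col("Hour").desc)

result.show()
```

    On the sample data this should give the same six rows as the final output above. Partitioning by both network and indicator means the row numbering of the 0 rows is not affected by the 1 rows, so no split and re-union is needed.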