代码之家  ›  专栏  ›  技术社区  ›  smaug

如何连接数据帧中前一行的列?

  •  0
  • smaug  · 技术社区  · 4 年前

    我有一个这样的数据帧:

    case class CC(id: String, p2: Double, p3: Double, time: Int)
    val df = List(
      CC("a", 1.1d, 2.2d, 1),
      CC("b", 3.3d, 4.4d, 2),
      CC("c", 5.5d, 6.6d, 3)).toDF
    
    +---+---+---+----+
    | id| p2| p3|time|
    +---+---+---+----+
    |  a|1.1|2.2|   1|
    |  b|3.3|4.4|   2|
    |  c|5.5|6.6|   3|
    +---+---+---+----+
    

    我想连接 p2 p3 前一行和列中的位置 p5 并连接 p2 p3 当前行和列中的位置 p6 。要获取:

        +---+---+---+----+---------+---------+
        | id| p2| p3|time| p5      | p6      |
        +---+---+---+----+---------+---------+
        |  a|1.1|2.2|   1|         |1.1: 2.2 |
        |  b|3.3|4.4|   2|1.1: 2.2 |3.3: 4.4 |
        |  c|5.5|6.6|   3|3.3: 4.4 |5.5: 6.6 |
        +---+---+---+----+---------+---------+
    

    对于当前行,即。 p6 我可以很容易地使用

    .withColumn("p6", concat(col("p2"), col("p3")))
    

    对于前一行,我考虑过使用窗口函数 lag 如下图所示,但它不起作用。

    val wf = Window.partitionBy("id").orderBy("time")
    df.withColumn("p5", concat(lag(col("p2"), 1) + lag("p3", 1)).over(w))
    

    但我得到了那个表达式的错误 concat... 窗口函数中不支持。一些 StackOverflow answers 谈到使用用户定义的聚合函数,但我找不到一个可以遵循的简单示例。

    对这个问题的任何解释都非常感谢。如果你知道,请建议其他方法来解决这个问题。谢谢!

    0 回复  |  直到 4 年前
        1
  •  3
  •   Ram Ghadiyaram    4 年前

    我将举一个类似的例子,但与你的例子不同。。 如果你想在2个滞后列上应用concat,你可以按照以下两个步骤进行。。。 1) 应用滞后函数 2) 然后是海螺。

    您不能同时对两个滞后柱应用concat。。。

       import org.apache.spark.sql.expressions.Window
      import org.apache.spark.sql.functions._
    
      var customers = spark.sparkContext.parallelize(List(("Alice", "click","item_8", 50),
        ("Alice", "view","item_2", 55),
        ("Alice", "share","item_11", 100),
        ("Bob", "view","item_11", 25),
        ("Bob", "share","ietm_2", 50),
        ("Bob", "view", "item_8",65))).toDF("name", "event", "item", "time")
      customers.show
    
      val wSpec3 = Window.partitionBy("name").orderBy("time")
      customers.withColumn(
        "prev_event", lag(col("event"),1).over(wSpec3)
      ).withColumn(
        "prev_item", lag(col("item"),1).over(wSpec3)
      ).withColumn(
        "prev_time", lag(col("time"),1).over(wSpec3)
      ).withColumn("newcolumn", concat( 'prev_event, 'prev_item)).show
    
    

    结果:

    +-----+-----+-------+----+
    | name|event|   item|time|
    +-----+-----+-------+----+
    |Alice|click| item_8|  50|
    |Alice| view| item_2|  55|
    |Alice|share|item_11| 100|
    |  Bob| view|item_11|  25|
    |  Bob|share| ietm_2|  50|
    |  Bob| view| item_8|  65|
    +-----+-----+-------+----+
    
    +-----+-----+-------+----+----------+---------+---------+-----------+
    | name|event|   item|time|prev_event|prev_item|prev_time|  newcolumn|
    +-----+-----+-------+----+----------+---------+---------+-----------+
    |  Bob| view|item_11|  25|      null|     null|     null|       null|
    |  Bob|share| ietm_2|  50|      view|  item_11|       25|viewitem_11|
    |  Bob| view| item_8|  65|     share|   ietm_2|       50|shareietm_2|
    |Alice|click| item_8|  50|      null|     null|     null|       null|
    |Alice| view| item_2|  55|     click|   item_8|       50|clickitem_8|
    |Alice|share|item_11| 100|      view|   item_2|       55| viewitem_2|
    +-----+-----+-------+----+----------+---------+---------+-----------+