I'll give a similar example, though not identical to yours.
If you want to apply concat to two lag columns, do it in two steps:
1) apply the lag function
2) then apply concat
You can't apply concat to both lag columns in a single step.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val customers = spark.sparkContext.parallelize(List(
  ("Alice", "click", "item_8", 50),
  ("Alice", "view", "item_2", 55),
  ("Alice", "share", "item_11", 100),
  ("Bob", "view", "item_11", 25),
  ("Bob", "share", "ietm_2", 50), // "ietm_2" typo kept as-is to match the output below
  ("Bob", "view", "item_8", 65))).toDF("name", "event", "item", "time")
customers.show
val wSpec3 = Window.partitionBy("name").orderBy("time")
customers.withColumn(
  "prev_event", lag(col("event"), 1).over(wSpec3)
).withColumn(
  "prev_item", lag(col("item"), 1).over(wSpec3)
).withColumn(
  "prev_time", lag(col("time"), 1).over(wSpec3)
).withColumn("newcolumn", concat('prev_event, 'prev_item)).show
Result:
+-----+-----+-------+----+
| name|event| item|time|
+-----+-----+-------+----+
|Alice|click| item_8| 50|
|Alice| view| item_2| 55|
|Alice|share|item_11| 100|
| Bob| view|item_11| 25|
| Bob|share| ietm_2| 50|
| Bob| view| item_8| 65|
+-----+-----+-------+----+
+-----+-----+-------+----+----------+---------+---------+-----------+
| name|event| item|time|prev_event|prev_item|prev_time| newcolumn|
+-----+-----+-------+----+----------+---------+---------+-----------+
| Bob| view|item_11| 25| null| null| null| null|
| Bob|share| ietm_2| 50| view| item_11| 25|viewitem_11|
| Bob| view| item_8| 65| share| ietm_2| 50|shareietm_2|
|Alice|click| item_8| 50| null| null| null| null|
|Alice| view| item_2| 55| click| item_8| 50|clickitem_8|
|Alice|share|item_11| 100| view| item_2| 55| viewitem_2|
+-----+-----+-------+----+----------+---------+---------+-----------+
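One thing to note in the output: concat returns null whenever any of its inputs is null, which is why newcolumn is null on the first row of each partition. If you'd rather get a non-null value there, concat_ws with a separator skips null inputs. Below is a minimal sketch of that variation; the LagConcatSketch object name, the helper withPrevConcat, and the local SparkSession are my own additions to make it self-contained, not part of the answer above:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object LagConcatSketch {
  // Same two-step approach as above: first add the lag columns,
  // then combine them. concat_ws("-", ...) skips nulls, so the first
  // row of each partition gets an empty string instead of null.
  def withPrevConcat(df: DataFrame): DataFrame = {
    val w = Window.partitionBy("name").orderBy("time")
    df.withColumn("prev_event", lag(col("event"), 1).over(w))
      .withColumn("prev_item", lag(col("item"), 1).over(w))
      .withColumn("newcolumn", concat_ws("-", col("prev_event"), col("prev_item")))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("lag-concat").getOrCreate()
    import spark.implicits._
    val customers = Seq(
      ("Alice", "click", "item_8", 50),
      ("Alice", "view", "item_2", 55)
    ).toDF("name", "event", "item", "time")
    withPrevConcat(customers).show()
    spark.stop()
  }
}
```

Whether the empty string or null is preferable for the first row depends on what you do with newcolumn downstream; you can always wrap the concat_ws in a when(...) if you need to keep the null.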