我有一个流式数据帧,它可以查看以下内容:
+--------------------+--------------------+
| owner| fruits|
+--------------------+--------------------+
|Brian | apple|
Brian | pear |
Brian | date|
Brian | avocado|
Bob | avocado|
Bob | apple|
........
+--------------------+--------------------+
我做了一个Groupby,Agg Collect_List来清理东西。
val myFarmDF = farmDF.withWatermark("timeStamp", "1 seconds").groupBy("fruits").agg(collect_list(col("fruits")) as "fruitsA")
输出是每个所有者的一行和每个水果的一个数组。
现在,我想将这个清理过的数组加入到原始的流式数据帧中,除去水果列,只保留水果列
val joinedDF = farmDF.join(myFarmDF, "owner").drop("fruits")
这似乎在我的头脑中起作用,但火花似乎不同意。
我得到一个
Failure when resolving conflicting references in Join:
'Join Inner
...
+- AnalysisBarrier
+- Aggregate [name#17], [name#17, collect_list(fruits#61, 0, 0) AS fruitA#142]
当我把所有东西变成静态数据帧时,它工作得很好。在流上下文中,这是不可能的吗?