介绍
在提供最终解决方案之前,有几个问题需要回答(例如,图中元素的顺序)
colleagues
数组),但我不想拖太长时间。让我们看看解决此类问题的常用方法。
解决方案
自从
同事
column是一个数组列(在查询行时非常有效)
explode
(或
posexplode
)是的。对于每个数组元素的行,您可以进行必要的更改,最终
collect_list
返回数组列。
分解(e:列):列
为给定数组或映射列中的每个元素创建新行。
posexplode(e:列):列
为每个元素在给定数组或映射列中的位置创建新行。
让我们使用下面的
names
数据集:
val names = Seq((Array("guy1", "guy2", "guy3"), "Thisguy")).toDF("colleagues", "name")
scala> names.show
+------------------+-------+
| colleagues| name|
+------------------+-------+
|[guy1, guy2, guy3]|Thisguy|
+------------------+-------+
scala> names.printSchema
root
|-- colleagues: array (nullable = true)
| |-- element: string (containsNull = true)
|-- name: string (nullable = true)
让我们
爆炸
,做些改变,最后
收集清单
.
val elements = names.withColumn("elements", explode($"colleagues"))
scala> elements.show
+------------------+-------+--------+
| colleagues| name|elements|
+------------------+-------+--------+
|[guy1, guy2, guy3]|Thisguy| guy1|
|[guy1, guy2, guy3]|Thisguy| guy2|
|[guy1, guy2, guy3]|Thisguy| guy3|
+------------------+-------+--------+
这就是Spark SQL可以轻松处理的问题。让我们使用
regexp_replace
(什么?regexp?!现在你有两个问题:))。
val replaced = elements.withColumn("replaced", regexp_replace($"elements", "guy2", "guy10"))
scala> replaced.show
+------------------+-------+--------+--------+
| colleagues| name|elements|replaced|
+------------------+-------+--------+--------+
|[guy1, guy2, guy3]|Thisguy| guy1| guy1|
|[guy1, guy2, guy3]|Thisguy| guy2| guy10|
|[guy1, guy2, guy3]|Thisguy| guy3| guy3|
+------------------+-------+--------+--------+
最后,让我们按初始数组列分组并使用
收集清单
分组功能。
val solution = replaced
.groupBy($"colleagues" as "before")
.agg(
collect_list("replaced") as "after",
first("name") as "name")
scala> solution.show
+------------------+-------------------+-------+
| before| after| name|
+------------------+-------------------+-------+
|[guy1, guy2, guy3]|[guy1, guy10, guy3]|Thisguy|
+------------------+-------------------+-------+
替代解决方案
用户定义函数(UDF)
或者,您也可以编写一个自定义的用户定义函数,但它不会像上面的解决方案那样从众多优化中受益,因此我不推荐它(并且只会在请求时显示)。
自定义逻辑运算符
最好的方法是编写一个自定义逻辑运算符(a)
LogicalPlan
)这将完成所有这些并参与优化,但避免交换(由
groupBy
).然而,这将是一个相当先进的Spark开发,我还没有完成。