代码之家  ›  专栏  ›  技术社区  ›  Abhishek Choudhary

如何更新数组列?

  •  4
  • Abhishek Choudhary  · 技术社区  · 6 年前

    我知道我们可以替换dataframe列中的值,并使用以下方法返回更新值的新dataframe:

    dataframe.withColumn("col1",when(col("col1").equalTo("this"),"that").otherwise(col("make")))
    

    但这将在需要时更改整个列的值。

    现在我有一个稍微复杂的数据帧:

    |        colleagues|   name|
    
    |[guy1, guy2, guy3]|Thisguy|
    |[guy4, guy5, guy6]|Thatguy|
    |[guy7, guy8, guy9]|Someguy|
    

    这里有一个“同事”列,其中包含数组。我想替换任何数组中的一个特定元素,例如,我想在新的数据帧中替换第一行中的'guy2',而不是'guy10' 我怎样才能做到这一点?请帮忙。

    1 回复  |  直到 6 年前
        1
  •  1
  •   Community CDub    4 年前

    介绍

    在提供最终解决方案之前,有几个问题需要回答(例如,图中元素的顺序) colleagues 数组),但我不想拖太长时间。让我们看看解决此类问题的常用方法。

    解决方案

    自从 同事 column是一个数组列(在查询行时非常有效) explode (或 posexplode )是的。对于每个数组元素的行,您可以进行必要的更改,最终 collect_list 返回数组列。

    分解(e:列):列 为给定数组或映射列中的每个元素创建新行。

    posexplode(e:列):列 为每个元素在给定数组或映射列中的位置创建新行。

    让我们使用下面的 names 数据集:

    val names = Seq((Array("guy1", "guy2", "guy3"), "Thisguy")).toDF("colleagues", "name")
    scala> names.show
    +------------------+-------+
    |        colleagues|   name|
    +------------------+-------+
    |[guy1, guy2, guy3]|Thisguy|
    +------------------+-------+
    scala> names.printSchema
    root
     |-- colleagues: array (nullable = true)
     |    |-- element: string (containsNull = true)
     |-- name: string (nullable = true)
    

    让我们 爆炸 ,做些改变,最后 收集清单 .

    val elements = names.withColumn("elements", explode($"colleagues"))
    scala> elements.show
    +------------------+-------+--------+
    |        colleagues|   name|elements|
    +------------------+-------+--------+
    |[guy1, guy2, guy3]|Thisguy|    guy1|
    |[guy1, guy2, guy3]|Thisguy|    guy2|
    |[guy1, guy2, guy3]|Thisguy|    guy3|
    +------------------+-------+--------+
    

    这就是Spark SQL可以轻松处理的问题。让我们使用 regexp_replace (什么?regexp?!现在你有两个问题:))。

    val replaced = elements.withColumn("replaced", regexp_replace($"elements", "guy2", "guy10"))
    scala> replaced.show
    +------------------+-------+--------+--------+
    |        colleagues|   name|elements|replaced|
    +------------------+-------+--------+--------+
    |[guy1, guy2, guy3]|Thisguy|    guy1|    guy1|
    |[guy1, guy2, guy3]|Thisguy|    guy2|   guy10|
    |[guy1, guy2, guy3]|Thisguy|    guy3|    guy3|
    +------------------+-------+--------+--------+
    

    最后,让我们按初始数组列分组并使用 收集清单 分组功能。

    val solution = replaced
      .groupBy($"colleagues" as "before")
      .agg(
        collect_list("replaced") as "after",
        first("name") as "name")
    scala> solution.show
    +------------------+-------------------+-------+
    |            before|              after|   name|
    +------------------+-------------------+-------+
    |[guy1, guy2, guy3]|[guy1, guy10, guy3]|Thisguy|
    +------------------+-------------------+-------+
    

    替代解决方案

    用户定义函数(UDF)

    或者,您也可以编写一个自定义的用户定义函数,但它不会像上面的解决方案那样从众多优化中受益,因此我不推荐它(并且只会在请求时显示)。

    自定义逻辑运算符

    最好的方法是编写一个自定义逻辑运算符(a) LogicalPlan )这将完成所有这些并参与优化,但避免交换(由 groupBy ).然而,这将是一个相当先进的Spark开发,我还没有完成。