代码之家  ›  专栏  ›  技术社区  ›  wttttt

Spark scala数据框udf返回行

  •  9
  • wttttt  · 技术社区  · 7 年前

    假设我有一个数据框,其中包含一列(称为colA),该列是行的序列。我想在colA的每条记录中添加一个新字段。(新字段与以前的记录关联,因此我必须编写一个udf。) 我应该如何编写这个udf?

    我尝试编写一个udf,它将colA作为输入,并输出Seq[行],其中每条记录都包含新字段。但问题是udf无法返回Seq[行]/例外是“org类型的架构”。阿帕奇。火花sql。不支持行“”。 我该怎么办?

    我写的udf: val convert = udf[Seq[Row], Seq[Row]](blablabla...) java是个例外。lang.UnsupportedOperationException:类型org的架构。阿帕奇。火花sql。不支持行

    2 回复  |  直到 7 年前
        1
  •  21
  •   Raphael Roth    7 年前

    自spark 2.0以来,您可以创建返回 Row / Seq[Row] ,但必须为返回类型提供架构,例如,如果使用一个double数组:

    val schema = ArrayType(DoubleType)
    
    val myUDF = udf((s: Seq[Row]) => {
      s // just pass data without modification
    }, schema)
    

    但我真的无法想象这在哪里有用,我宁愿从UDF返回元组或case类(或其Seq)。

    编辑:如果您的行包含22个以上的字段(元组/case类的字段限制),则该选项可能很有用

        2
  •  0
  •   Iraj Hedayati    3 年前

    这是一个老问题,我只是想根据Spark的新版本进行更新。

    自从Spark 3.0.0以来,@Raphael Roth提到的方法就被弃用了。因此,您可能会得到 AnalysisException 。原因是使用此方法的输入闭包没有类型检查,并且当涉及到 null 价值观

    如果你真的知道自己在做什么,你需要 spark.sql.legacy.allowUntypedScalaUDF 配置到 true

    另一个解决方案是使用 case class 而不是架构。例如

    case class Foo(field1: String, field2: String)
    
    val convertFunction: Seq[Row] => Seq[Foo] = input => {
        input.map {
            x => // do something with x and convert to Foo
        }
    }
    
    val myUdf = udf(convertFunction)