代码之家  ›  专栏  ›  技术社区  ›  stack0114106

spark udf返回相同元素为结构数组抛出错误

  •  0
  • stack0114106  · 技术社区  · 7 年前

    val df = spark.read.format("csv").load("trans.txt").toDF("id", "dt", "amt")
    val df2 = df.groupBy("id").agg(collect_list(struct('dt,'amt)).as("trans_vec"))
    df2.show(false)
    df2.printSchema()
    
    def gen_rows(x:Seq[(String,String)]):Seq[(String,String)]={
      x
    }
    val udf_gen_rows = udf( gen_rows(_:Seq[(String,String)]):Seq[(String,String)] )
    
    df2.withColumn("row_number",udf_gen_rows('trans_vec)).show(false)
    

    它抛出下面的错误

    Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(trans_vec)' due to data type mismatch: argument 1 requires array<struct<_1:string,_2:string>> type, however, '`trans_vec`' is of array<struct<dt:string,amt:string>> type.;;
    

    如何解决这个问题?。

    1 回复  |  直到 7 年前
        1
  •  -1
  •   stack0114106    7 年前

    如果以下代码由替换而来,则它可以工作

    val df2 = df.groupBy("id").agg(collect_list(struct('dt,'amt)).as("trans_vec"))
    

    val df2 = df.groupBy("id").agg(collect_list(struct('dt.as("_1"),'amt.as("_2")).as("trans_vec"))
    

    看起来UDF只分配了_1,_2。。对于传递给它的结构数组,依此类推。 稍后,您可以使用select()命令重命名它们

    推荐文章