代码之家  ›  专栏  ›  技术社区  ›  VNourdin

从UDF内的Spark SQL行提取嵌套数组

  •  0
  • VNourdin  · 技术社区  · 7 年前

    我正在处理数据帧,需要提取数据。 我有很多嵌套的级别,所以我用分解和选择来做第一个级别,但是我使用UDF来做嵌套的级别。

    我有个自助餐 $"Root.Obj" ,这是一个数组,我希望它返回一个数组[myobj]。
    我的输出类:

    case class MyObj(fieldA: Boolean, fieldB: String, fieldC: Array[MyNested])
    case class MyNested(field1: Long, field2: String)
    

    简而言之,这是输入模式:

     |-- Obj: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- FieldA: boolean (nullable = true)
     |    |    |-- FieldB: string (nullable = true)
     |    |    |-- FieldC: array (nullable = true)
     |    |    |    |-- element: struct (containsNull = true)
     |    |    |    |    |-- Field1: long (nullable = true)
     |    |    |    |    |-- Field2: string (nullable = true)
     |    |    |-- FieldD: boolean (nullable = true)
    

    我的自定义项:

    def extractObjs: UserDefinedFunction = udf {
      (objs: Seq[Row]) ⇒
        objs.map {
          obj ⇒
            MyObj(
              obj.getAs[Boolean]("FieldA"),
              obj.getAs[String]("FieldB"),
              extractNested(obj.get???("FieldC"))
            )
        }
    }
    
    def extractNested(nesteds: ???): Array[MyNested] = {
      ???
    }
    

    这是更复杂的IRL,因为我需要从其他地方检索值,并且有更多的嵌套数组。 此外,obj和fieldc的输入结构比这里复杂得多,我不能(或者不想)为它们创建case类。因为我需要在多个地方执行此操作,所以假设我不知道fieldc元素的“结构”。

    我的问题是提取“fieldc”数组。我想要一个seq[row]但是我不能实现,getstruct只给我一行,getseq[row]在后面抛出错误,因为 scala.collection.mutable.WrappedArray$ofRef cannot be cast to org.apache.spark.sql.Row .

    1 回复  |  直到 7 年前
        1
  •  0
  •   Raphael Roth    7 年前

    结构映射到 Row 在UDF中,因此可以通过 Seq[Row] 例如:

    def extractObjs: UserDefinedFunction = udf {
      (objs: Seq[Row]) ⇒
        objs.map {
          obj ⇒
            MyObj(
              obj.getAs[Boolean]("FieldA"),
              obj.getAs[String]("FieldB"),
              extractNested(obj.getAs[Seq[Row]]("FieldC"))
            )
        }
    }
    
    def extractNested(nesteds: Seq[Row]): Array[MyNested] = {
      nesteds.map(r => MyNested(r.getAs[Long]("Field1"),r.getAs[String]("Field2"))).toArray
    }