代码之家  ›  专栏  ›  技术社区  ›  Martin Senne

SparkSQL:如何处理用户定义函数中的空值?

  •  33
  • Martin Senne  · 技术社区  · 10 年前

    给定表1中有一列“x”,类型为String。 我想创建表2,其中列“y”是“x”中给定的日期字符串的整数表示。

    本质的 就是要保持 null “y”列中的值。

    表1(数据帧df1):

    +----------+
    |         x|
    +----------+
    |2015-09-12|
    |2015-09-13|
    |      null|
    |      null|
    +----------+
    root
     |-- x: string (nullable = true)
    

    表2(数据帧df2):

    +----------+--------+                                                                  
    |         x|       y|
    +----------+--------+
    |      null|    null|
    |      null|    null|
    |2015-09-12|20150912|
    |2015-09-13|20150913|
    +----------+--------+
    root
     |-- x: string (nullable = true)
     |-- y: integer (nullable = true)
    

    而用于将列“x”的值转换为列“y”的值的用户定义函数(udf)是:

    val extractDateAsInt = udf[Int, String] (
      (d:String) => d.substring(0, 10)
          .filterNot( "-".toSet)
          .toInt )
    

    并且有效,处理空值是不可能的。

    尽管如此,我还是可以做一些

    val extractDateAsIntWithNull = udf[Int, String] (
      (d:String) => 
        if (d != null) d.substring(0, 10).filterNot( "-".toSet).toInt 
        else 1 )
    

    我没有办法“生产” 无效的 通过udfs的值(当然 Int s不能是 无效的 ).

    我当前创建df2(表2)的解决方案如下:

    // holds data of table 1  
    val df1 = ... 
    
    // filter entries from df1, that are not null
    val dfNotNulls = df1.filter(df1("x")
      .isNotNull)
      .withColumn("y", extractDateAsInt(df1("x")))
      .withColumnRenamed("x", "right_x")
    
    // create df2 via a left join on df1 and dfNotNull having 
    val df2 = df1.join( dfNotNulls, df1("x") === dfNotNulls("right_x"), "leftouter" ).drop("right_x")
    

    问题 :

    • 当前的解决方案似乎很麻烦(而且可能效率不高)。有更好的方法吗?
    • @Spark开发人员:是否有一种类型 NullableInt 计划的/可用的,这样以下udf是可能的(参见代码摘录)?

    代码摘录

    val extractDateAsNullableInt = udf[NullableInt, String] (
      (d:String) => 
        if (d != null) d.substring(0, 10).filterNot( "-".toSet).toInt 
        else null )
    
    3 回复  |  直到 8 年前
        1
  •  58
  •   Community CDub    8 年前

    这里是 Option 使用方便:

    val extractDateAsOptionInt = udf((d: String) => d match {
      case null => None
      case s => Some(s.substring(0, 10).filterNot("-".toSet).toInt)
    })
    

    或使其在一般情况下稍微更安全:

    import scala.util.Try
    
    val extractDateAsOptionInt = udf((d: String) => Try(
      d.substring(0, 10).filterNot("-".toSet).toInt
    ).toOption)
    

    所有的功劳都归于 Dmitriy Selivanov 谁指出这个解决方案是(缺失的?)编辑 here .

    替代方案是处理 null UDF之外:

    import org.apache.spark.sql.functions.{lit, when}
    import org.apache.spark.sql.types.IntegerType
    
    val extractDateAsInt = udf(
       (d: String) => d.substring(0, 10).filterNot("-".toSet).toInt
    )
    
    df.withColumn("y",
      when($"x".isNull, lit(null))
        .otherwise(extractDateAsInt($"x"))
        .cast(IntegerType)
    )
    
        2
  •  15
  •   tristanbuckner    9 年前

    Scala实际上有一个很好的工厂函数Option(),它可以使这个函数更加简洁:

    val extractDateAsOptionInt = udf((d: String) => 
      Option(d).map(_.substring(0, 10).filterNot("-".toSet).toInt))
    

    在内部,Option对象的apply方法只是为您执行空检查:

    def apply[A](x: A): Option[A] = if (x == null) None else Some(x)
    
        3
  •  11
  •   Martin Senne    10 年前

    补充代码

    使用 美好的 @zero323的答案,我创建了以下代码,以使用用户定义的函数来处理所述的空值。希望,这对别人有帮助!

    /**
     * Set of methods to construct [[org.apache.spark.sql.UserDefinedFunction]]s that
     * handle `null` values.
     */
    object NullableFunctions {
    
      import org.apache.spark.sql.functions._
      import scala.reflect.runtime.universe.{TypeTag}
      import org.apache.spark.sql.UserDefinedFunction
    
      /**
       * Given a function A1 => RT, create a [[org.apache.spark.sql.UserDefinedFunction]] such that
       *   * if fnc input is null, None is returned. This will create a null value in the output Spark column.
       *   * if A1 is non null, Some( f(input) will be returned, thus creating f(input) as value in the output column.
       * @param f function from A1 => RT
       * @tparam RT return type
       * @tparam A1 input parameter type
       * @return a [[org.apache.spark.sql.UserDefinedFunction]] with the behaviour describe above
       */
      def nullableUdf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction = {
        udf[Option[RT],A1]( (i: A1) => i match {
          case null => None
          case s => Some(f(i))
        })
      }
    
      /**
       * Given a function A1, A2 => RT, create a [[org.apache.spark.sql.UserDefinedFunction]] such that
       *   * if on of the function input parameters is null, None is returned.
       *     This will create a null value in the output Spark column.
       *   * if both input parameters are non null, Some( f(input) will be returned, thus creating f(input1, input2)
       *     as value in the output column.
       * @param f function from A1 => RT
       * @tparam RT return type
       * @tparam A1 input parameter type
       * @tparam A2 input parameter type
       * @return a [[org.apache.spark.sql.UserDefinedFunction]] with the behaviour describe above
       */
      def nullableUdf[RT: TypeTag, A1: TypeTag, A2: TypeTag](f: Function2[A1, A2, RT]): UserDefinedFunction = {
        udf[Option[RT], A1, A2]( (i1: A1, i2: A2) =>  (i1, i2) match {
          case (null, _) => None
          case (_, null) => None
          case (s1, s2) => Some((f(s1,s2)))
        } )
      }
    }