代码之家  ›  专栏  ›  技术社区  ›  Sai Kiriti Badam

Java语言io。NotSerializableException:组织。阿帕奇。火花sql。列使用UDF映射有条件地创建新列时

  •  2
  • Sai Kiriti Badam  · 技术社区  · 7 年前

    我有一个带有startTime的设备ID数据和一些特征向量,需要根据 hour weekday_hour 。样本数据如下:

    +-----+-------------------+--------------------+
    |hh_id|          startTime|                hash|
    +-----+-------------------+--------------------+
    |dev01|2016-10-10 00:01:04|(1048576,[121964,...|
    |dev02|2016-10-10 00:17:45|(1048576,[121964,...|
    |dev01|2016-10-10 00:18:01|(1048576,[121964,...|
    |dev10|2016-10-10 00:19:48|(1048576,[121964,...|
    |dev05|2016-10-10 00:20:00|(1048576,[121964,...|
    |dev08|2016-10-10 00:45:13|(1048576,[121964,...|
    |dev05|2016-10-10 00:56:25|(1048576,[121964,...|
    

    这些特性基本上是由自定义函数合并而成的SparseVectors。当我尝试创建 钥匙 按以下方式列出:

    val columnMap = Map("hour" -> hour($"startTime"), "weekday_hour" -> getWeekdayHourUDF($"startTime"))
    val grouping = "hour"
    val newDF = oldDF.withColumn("dt_key", columnMap(grouping))
    

    我得到一个 java.io.NotSerializableException 。完整的堆栈跟踪如下所示:

    Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
    Serialization stack:
        - object not serializable (class: org.apache.spark.sql.Column, value: hour(startTime))
        - field (class: scala.collection.immutable.Map$Map3, name: value1, type: class java.lang.Object)
        - object (class scala.collection.immutable.Map$Map3, Map(hour -> hour(startTime), weekday_hour -> UDF(startTime), none -> 0))
        - field (class: linef03f4aaf3a1c4f109fce271f7b5b1e30104.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, name: groupingColumnMap, type: interface scala.collection.immutable.Map)
        - object (class linef03f4aaf3a1c4f109fce271f7b5b1e30104.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, linef03f4aaf3a1c4f109fce271f7b5b1e30104.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw@4f1f9a63)
        - field (class: linef03f4aaf3a1c4f109fce271f7b5b1e30104.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, name: $iw, type: class linef03f4aaf3a1c4f109fce271f7b5b1e30104.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw)
        - object (class linef03f4aaf3a1c4f109fce271f7b5b1e30104.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, linef03f4aaf3a1c4f109fce271f7b5b1e30104.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw@207d6d1e)
    

    但是,当我尝试在不显式创建列的情况下执行相同的逻辑时,使用if-else,我不会遇到任何此类错误。

    val newDF = if(groupingKey == "hour") {
      oldDF.withColumn("dt_key", hour($"startTime")
    } else {
      oldDF.withColumn("dt_key", getWeekdayHourUDF($"startTime")
    }
    

    使用Map方式将非常方便,因为可能有更多类型的密钥提取方法。请帮我弄清楚为什么会出现这个问题。

    2 回复  |  直到 6 年前
        1
  •  1
  •   nefo_x    5 年前

    内置函数时

    您可以通过使用 when 内置函数为

    val groupingKey = //"hour" or "weekday_hour"
    import org.apache.spark.sql.functions._
    df.withColumn("dt_key", 
         when(lit(groupingKey) === "hour", hour($"startTime"))
         .when(lit(groupingKey) === "weekday_hour", getWeekdayHourUDF($"startTime"))
         .otherwise(lit(0)))).show(false)
    

    udf函数

    或者,您可以创建 udf 作用 创建地图列的步骤

    import org.apache.spark.sql.functions._
    def mapUdf = udf((hour: Int, weekdayhour: Int, groupingKey: String) => 
          if(groupByKey.equalsIgnoreCase("hour")) hour 
          else if(groupByKey.equalsIgnoreCase("weekday_hour")) weekdayhour 
          else 0)
    

    并将其用作

    val newDF = oldDF.withColumn("dt_key",
                      mapUdf(hour($"startTime"), 
                             getWeekdayHourUDF($"startTime"),
                             lit(groupingKey)))
    

    我希望答案有帮助

        2
  •  1
  •   Abel Borges    3 年前

    可能有点晚了,但我在Spark 2.4.6上,无法重现这个问题。我猜是代码调用 columnMap 对于多个键。如果您提供一个易于复制的示例,包括数据(一行数据集就足够了),这会有所帮助。然而,正如堆栈跟踪所述 Column 阶级确实不是 Serializable ,我将根据我目前的理解进行阐述。

    太长,读不下去了绕过这一点的一个简单方法是 val s进入 def s


    我相信已经很清楚为什么用 when 案例或UDF有效。

    第一次尝试 :这样做可能不起作用的原因是: 类是不可序列化的(考虑到它在Spark API中的预期角色,我认为这是一种有意识的设计选择),并且(b)表达式中没有任何内容

    oldDF.withColumn("dt_key", columnMap(grouping))
    

    这告诉Spark什么是实际的混凝土 对于的第二个参数 withColumn ,这意味着混凝土 Map[String, Column] 当引发类似这样的异常时,需要通过网络将对象发送给执行者。

    第二次尝试 :第二次尝试之所以有效,是因为与此相关的决策相同 groupingKey 定义 DataFrame 可能完全发生在驾驶员身上。


    使用 数据帧 API作为查询生成器,或保存执行计划的东西,而不是数据本身。一旦你对它采取行动( write ,则, show ,则, count Spark生成将任务发送给执行者的代码。此时,实现 数据帧 / Dataset 必须已经在查询计划中正确编码,或者需要序列化,以便可以通过网络发送。

    def公司 通常解决此类问题,因为

    def columnMap: Map[String, Column] = Map("a" -> hour($"startTime"), "weekday_hour" -> UDF($"startTime"))
    

    不是混凝土 Map 对象本身,但 创建 新的 映射[字符串,列] 每次它被调用时,都要对每个执行者说,碰巧执行了一项与此相关的任务 地图

    This this 似乎是这方面的好资源。我承认我明白为什么要用 Function 喜欢

    val columnMap = () => Map("a" -> hour($"startTime"), "b" -> UDF($"startTime"))
    

    然后 columnMap()("a") 可以,因为反编译的字节码显示 scala.Function s被定义为 可序列化 ,但我不明白为什么 def公司 因为他们看起来不是这样的。无论如何,我希望这能有所帮助。