代码之家  ›  专栏  ›  技术社区  ›  sujit

Spark数据框架:派生列的连接方法

  •  0
  • sujit  · 技术社区  · 7 年前

    如下面的代码所示,提供一个数据集( df ,我的要求是能够添加派生列( DerivedCol )此列的值对于 idcol 一组行,由对另一列值应用谓词派生( filter 这里),然后b)聚合函数( max 在这里使用)在匹配组上。

    val df = Seq(("id1","k1","7"),("id2","k1","5"),("id1","k3","2"),("id3","k1","4"),("id2","k5","1"),("id4","k5","1"))
      .toDF("idcol","keycol","valcol")
    
    val aggDf = df.filter($"keycol" === "k1")
      .select($"idcol",$"valcol")
      .groupBy($"idcol")
      .agg(max($"valcol".cast(IntegerType)).cast(StringType).as("DerivedCol"))
      .withColumnRenamed("idcol", "newidcol")
    
    df.join(aggDf, df("idcol") === aggDf("newidcol"), "left_outer")
      .drop(aggDf("newidcol"))
    

    我正在使用 left outer join 为此。我的数据集非常庞大(数百万行)。我有以下问题:

    1. 是否有其他方法来实现这一点?
    2. 我应该使用什么分区逻辑来减少混乱?

    的基数 伊多克 柱子很高。Spark版本为2.1.1。

    1 回复  |  直到 7 年前
        1
  •  1
  •   user10346849    7 年前

    是否有其他方法来实现这一点?

    有-窗口功能。

    import org.apache.spark.sql.functions.max
    import org.apache.spark.sql.expressions.Window
    
    df.withColumn(
       "derivedcol",  
       max($"valcol".cast(IntegerType)).over(Window.partitionBy($"idcol")
    )
    

    取决于:

    • 基数-高基数是好的。
    • 群的大小分布-没有大的正偏斜的小群是好的。

    这可能表现得比聚合后再进行联接更好或更差。

    我应该使用什么分区逻辑来减少混乱?

    大概没有。至少有两个原因:

    • 如果您有大量的小组,窗口函数就可以了,不需要额外的分区。
    • 如果您有少量较大的组,则应广播数据,并且所需的唯一无序处理是聚合。
    • 如果有大量的大组-您可以考虑按ID进行预分区,但是根据因素的数量,您可以同时进行松散和增益分区,并且平均来说没有额外的无序(分区)更好。
    推荐文章