代码之家  ›  专栏  ›  技术社区  ›  John

如何在带有循环的Spark中创建When表达式

  •  1
  • John  · 技术社区  · 7 年前

    嗨,我需要一个基于多个 什么时候 声明。

    df.withColumn("new colume",when(col("col1").isin("list of item1"),"item1")
                               .when(col("col1").isin("list of item2"),"item2")
                               ....)
    

    我能创造一个 什么时候 使用循环的条件?我的计划是投入和产出 什么时候 在地图上

    Map("item1" -> "list of item1","item2" -> "list of item2")
    
    1 回复  |  直到 7 年前
        1
  •  1
  •   Alper t. Turker    7 年前

    只是使用 when / otherwise 具有 foldLeft :

    import org.apache.spark.sql.functions._
    
    
    val conditions = Map(
      "item1" -> Seq("foo", "bar"), "item2" -> Seq("foobar")
    )
    
    conditions.foldLeft(lit(null)){
      case (acc, (v, ls)) => when(col("col1").isin(ls: _*), v).otherwise(acc)  
    }
    

    它将创建一个嵌套 CASE WHEN 形式表达:

    CASE WHEN (col1 IN (foobar)) THEN item2 
         ELSE CASE WHEN (col1 IN (foo, bar)) THEN item1 
              ELSE NULL 
         END 
    END
    

    你可以替换 lit(null) 如果没有匹配的值,则使用另一个值作为基本结果。

    您还可以生成

    CASE 
      WHEN (col1 IN (foo, bar)) THEN item1 
      WHEN (col1 IN (foobar)) THEN item2 
    END
    

    使用如下递归函数:

    import org.apache.spark.sql.Column
    
    def mergeConditions(conditions: Map[String, Seq[String]], c: Column) = {
      def mergeConditionsR(conditions: Seq[(String, Seq[String])], acc: Column): Column = conditions match {
        case (v, ls) :: t => mergeConditionsR(t, acc.when(c.isin(ls: _*), v))
        case Nil          => acc
      }
    
      conditions.toList match {
        case (v, ls) :: t => mergeConditionsR(t, when(c.isin(ls: _*), v))
        case Nil          => lit(null)
      }
    }
    
    mergeConditions(conditions, col("col1"))
    

    但这不会有什么不同。

    有了简单,当然可以跳过 什么时候 完全:

    import org.apache.spark.sql.functions.typedLit
    
    val conditionsCol = typedLit(conditions.flatMap {
      case (k, vs) => vs.map { v => (v, k) } 
    }.toMap)
    
    df.withColumn("value", conditionsCol($"col1"))
    

    或者通过转换 conditions 到A DataFrame 和加入。

    conditions.toSeq.toDF("value", "col1")
      .withColumn("col1", explode($"col1"))
      .join(df, Seq("col1"), "rightouter")