Just use `when` / `otherwise` with `foldLeft`:
import org.apache.spark.sql.functions._

val conditions = Map(
  "item1" -> Seq("foo", "bar"), "item2" -> Seq("foobar")
)

// Start from NULL and wrap each condition around the accumulated expression
conditions.foldLeft(lit(null)) {
  case (acc, (v, ls)) => when(col("col1").isin(ls: _*), v).otherwise(acc)
}
It will create a nested `CASE WHEN` expression of the form:
CASE WHEN (col1 IN (foobar)) THEN item2
ELSE CASE WHEN (col1 IN (foo, bar)) THEN item1
     ELSE NULL
END
END
You can replace `lit(null)` with another value to be used as the base result when there is no match.
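For reference, a minimal sketch of applying the folded expression (the input `df` and its `col1` column are assumptions for illustration, and an active `SparkSession` named `spark` is assumed to be in scope); the output should look along these lines:

// assumes an active SparkSession in scope as `spark`
import spark.implicits._

// Hypothetical input frame for illustration
val df = Seq("foo", "bar", "foobar", "other").toDF("col1")

df.withColumn("value", conditions.foldLeft(lit(null)) {
  case (acc, (v, ls)) => when(col("col1").isin(ls: _*), v).otherwise(acc)
}).show()
// +------+-----+
// |  col1|value|
// +------+-----+
// |   foo|item1|
// |   bar|item1|
// |foobar|item2|
// | other| null|
// +------+-----+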
You can also generate a flat

CASE
WHEN (col1 IN (foo, bar)) THEN item1
WHEN (col1 IN (foobar)) THEN item2
END

using a recursive function like this:
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit, when}

def mergeConditions(conditions: Map[String, Seq[String]], c: Column): Column = {
  // Chain the remaining conditions onto an already started CASE WHEN column
  def mergeConditionsR(conditions: Seq[(String, Seq[String])], acc: Column): Column = conditions match {
    case (v, ls) :: t => mergeConditionsR(t, acc.when(c.isin(ls: _*), v))
    case Nil => acc
  }

  conditions.toList match {
    // Open the CASE WHEN with the first condition, then recurse over the rest
    case (v, ls) :: t => mergeConditionsR(t, when(c.isin(ls: _*), v))
    // No conditions at all - fall back to NULL
    case Nil => lit(null)
  }
}

mergeConditions(conditions, col("col1"))
but it won't make any real difference.
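If you want to compare what the two approaches actually build, printing a `Column` renders the underlying expression in roughly the SQL form shown above; a quick check using the values defined earlier:

// toString on a Column pretty-prints the underlying Catalyst expression
println(conditions.foldLeft(lit(null)) {
  case (acc, (v, ls)) => when(col("col1").isin(ls: _*), v).otherwise(acc)
})
println(mergeConditions(conditions, col("col1")))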
With simple conditions like these you can of course skip `when` entirely:
import org.apache.spark.sql.functions.typedLit

// Invert the map so each value points at its key: foo -> item1, bar -> item1, ...
val conditionsCol = typedLit(conditions.flatMap {
  case (k, vs) => vs.map { v => (v, k) }
}.toMap)

df.withColumn("value", conditionsCol($"col1"))
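Applied to the hypothetical `df` sketched above, a key that is absent from the map literal simply yields NULL, matching the `lit(null)` base result of the `foldLeft` version:

df.withColumn("value", conditionsCol($"col1")).show()
// +------+-----+
// |  col1|value|
// +------+-----+
// |   foo|item1|
// |   bar|item1|
// |foobar|item2|
// | other| null|
// +------+-----+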
or by converting `conditions` to a `DataFrame` and joining:
conditions.toSeq.toDF("value", "col1")
  .withColumn("col1", explode($"col1"))
  .join(df, Seq("col1"), "rightouter")
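To see what the intermediate lookup table looks like (row order may differ), a quick sketch:

// The exploded lookup table: one row per (value, col1) pair
conditions.toSeq.toDF("value", "col1")
  .withColumn("col1", explode($"col1"))
  .show()
// +-----+------+
// |value|  col1|
// +-----+------+
// |item1|   foo|
// |item1|   bar|
// |item2|foobar|
// +-----+------+

The "rightouter" join keeps every row of `df`, leaving `value` NULL where `col1` has no match, just like the `lit(null)` default above.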