I have a series of schema columns of type Map[(Int, Int), Row] and Map[(Int, Int), String], i.e. maps keyed by an int tuple whose values hold some data (a string or a struct). I'm trying to take these columns, extract the data, and merge it by key into a single column via a UDF, like so:
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

case class PersonConcept(field1: Int, field2: Int, field3: String,
                         field4: String, field5: String)

private def mergeMaps: UserDefinedFunction = {
  val f = (keyedData: Map[Row, Row], keyedNames: Map[Row, String],
           keyedOccupations: Map[Row, String]) => {
    // Convert the struct keys (Rows of two ints) back into (Int, Int) tuples
    val intKeyedData = keyedData.map {
      case (row, data) => (row.getInt(0), row.getInt(1)) -> data
    }
    val intKeyedNames = keyedNames.map {
      case (row, data) => (row.getInt(0), row.getInt(1)) -> data
    }
    val intKeyedOccupations = keyedOccupations.map {
      case (row, data) => (row.getInt(0), row.getInt(1)) -> data
    }
    // Merge the three maps key-wise into a single map of PersonConcept
    intKeyedData.map {
      case (k, v) =>
        val value = PersonConcept(
          v.getInt(0),
          v.getInt(1),
          v.getString(2),
          intKeyedOccupations(k), // <--- Grabbing occupation
          intKeyedNames(k))       // <--- Grabbing name
        (k, value)
    }
  }
  udf(f)
}
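
For reference, this is roughly how I apply the UDF (the column names here are illustrative, not my real ones):

import org.apache.spark.sql.functions.col

// "keyedData", "keyedNames", "keyedOccupations" are hypothetical column names
val merged = df.withColumn(
  "merged",
  mergeMaps(col("keyedData"), col("keyedNames"), col("keyedOccupations")))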
I know for a fact that each map contains exactly the same set of keys. However, I'm running into a NoSuchElementException:
Caused by: java.util.NoSuchElementException: key not found: (24,75)
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:59)
The UDF works fine when the dataset consists of only one row or one partition, which leads me to believe this has something to do with cross-partition variable scope or closures (http://spark.apache.org/docs/latest/rdd-programming-guide.html#understanding-closures-). I don't understand what is causing this, or whether there is a viable way to accomplish this with this data model.
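
For completeness, a toy construction of inputs with this shape (purely illustrative data; assumes a SparkSession in scope with spark.implicits._ imported) would look something like:

import spark.implicits._

// Hypothetical stand-in for the real struct values
case class Payload(a: Int, b: Int, c: String)

val df = Seq(
  (Map((24, 75) -> Payload(24, 75, "x")),
   Map((24, 75) -> "some name"),
   Map((24, 75) -> "some occupation")),
  (Map((11, 12) -> Payload(11, 12, "y")),
   Map((11, 12) -> "another name"),
   Map((11, 12) -> "another occupation"))
).toDF("keyedData", "keyedNames", "keyedOccupations")

Since Scala tuples are encoded as structs, the map keys arrive inside the UDF as Rows, which is why the lambda takes Map[Row, ...] and converts the keys back to (Int, Int) tuples before the intKeyedOccupations(k) / intKeyedNames(k) lookups, which, judging by the stack trace, is where the exception is thrown.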