我在Cassandra(3.11.2)中有数据,这也是我的df:
卡桑德拉数据:
id | some_data
-- | ---------
1 | [{s1:"str11", s2:"str12"},{s1:"str13", s2:"str14"}]
2 | [{s1:"str21", s2:"str22"},{s1:"str23", s2:"str24"}]
3 | [{s1:"str31", s2:"str32"},{s1:"str33", s2:"str44"}]
df详细信息:
df.printSchema()
//| |-- id: integer (nullable = true)
//| |-- some_data: array (nullable = true)
//| | |-- element: struct (containsNull = true)
//| | | |-- s1: string (nullable = true)
//| | | |-- s2: string (nullable = true)
这里,Cassandra模式定义为:
id:字符串
某些\u数据:列出冻结的测试\u udt创建为-->
创建类型测试。测试\u udt(
s1文本,
s2文本
);
我正在使用spark cassandra连接器2.0从cassandra中提取数据,以便在spark 2.2.1上进行处理。
所需输出
输出为df的分解形式
id | some_data | s1 | s2
-- | ---------------------------------------------------| ----- | ----
1 | [{s1:"str11", s2:"str12"},{s1:"str13", s2:"str14"}]| str11 | str12
1 | [{s1:"str11", s2:"str12"},{s1:"str13", s2:"str14"}]| str13 | str14
2 | [{s1:"str21", s2:"str22"},{s1:"str23", s2:"str24"}]| str21 | str22
2 | [{s1:"str21", s2:"str22"},{s1:"str23", s2:"str24"}]| str23 | str24
3 | [{s1:"str31", s2:"str32"},{s1:"str33", s2:"str44"}]| str31 | str32
3 | [{s1:"str31", s2:"str32"},{s1:"str33", s2:"str44"}]| str33 | str34
我过去的做法
我使用了spark cassandra连接器1.6和spark 1.6,我对上述问题有一个简洁的解决方案:
import org.apache.spark.sql.functions._
case class my_data(s1 : String, s2 : String)
val flatData = df.explode(df("some_data")){
case Row(x : Seq[Row]) =>
x.map(x =>
my_data(
x.apply(0).asInstanceOf[String],
x.apply(1).asInstanceOf[String]
))
}
flatData.show()
在我们升级到2之后。x、 我在使用时出错
explode
作用spark文件说
爆炸
已弃用。
flatMap
建议作为
爆炸
。
问题:
-
如何在Scala中分解数据帧以获得与以前相同的结果?
-
如何使用
flatmap
?