代码之家  ›  专栏  ›  技术社区  ›  underwood

在Spark 2中使用flatmap分解Cassandra UDT。x(Scala)

  •  0
  • underwood  · 技术社区  · 7 年前

    我在Cassandra(3.11.2)中有数据,这也是我的df:

    卡桑德拉数据:

    id | some_data  
    -- | ---------  
    1  | [{s1:"str11", s2:"str12"},{s1:"str13", s2:"str14"}]
    2  | [{s1:"str21", s2:"str22"},{s1:"str23", s2:"str24"}]
    3  | [{s1:"str31", s2:"str32"},{s1:"str33", s2:"str44"}]
    

    df详细信息:

     df.printSchema() 
        //|  |-- id: integer (nullable = true)
        //|  |-- some_data: array (nullable = true)
        //|  |    |-- element: struct (containsNull = true)
        //|  |    |    |-- s1: string (nullable = true)
        //|  |    |    |-- s2: string (nullable = true)
    

    这里,Cassandra模式定义为:

    id:字符串
    某些\u数据:列出冻结的测试\u udt创建为--> 创建类型测试。测试\u udt( s1文本, s2文本 );

    我正在使用spark cassandra连接器2.0从cassandra中提取数据,以便在spark 2.2.1上进行处理。

    所需输出

    输出为df的分解形式

    id | some_data                                          | s1    | s2  
    -- | ---------------------------------------------------| ----- | ---- 
    1  | [{s1:"str11", s2:"str12"},{s1:"str13", s2:"str14"}]| str11 | str12
    1  | [{s1:"str11", s2:"str12"},{s1:"str13", s2:"str14"}]| str13 | str14 
    2  | [{s1:"str21", s2:"str22"},{s1:"str23", s2:"str24"}]| str21 | str22
    2  | [{s1:"str21", s2:"str22"},{s1:"str23", s2:"str24"}]| str23 | str24
    3  | [{s1:"str31", s2:"str32"},{s1:"str33", s2:"str44"}]| str31 | str32
    3  | [{s1:"str31", s2:"str32"},{s1:"str33", s2:"str44"}]| str33 | str34
    

    我过去的做法

    我使用了spark cassandra连接器1.6和spark 1.6,我对上述问题有一个简洁的解决方案:

    import org.apache.spark.sql.functions._    
    case class my_data(s1 : String, s2 : String)
    
    val flatData = df.explode(df("some_data")){
                case Row(x : Seq[Row]) =>
                    x.map(x =>
                        my_data(
                            x.apply(0).asInstanceOf[String], 
                            x.apply(1).asInstanceOf[String]
                        ))
                      }
    flatData.show()
    

    在我们升级到2之后。x、 我在使用时出错 explode 作用spark文件说 爆炸 已弃用。 flatMap 建议作为 爆炸

    问题:

    1. 如何在Scala中分解数据帧以获得与以前相同的结果?
    2. 如何使用 flatmap ?
    1 回复  |  直到 7 年前
        1
  •  1
  •   philantrovert    7 年前

    您可以使用 explode function 也建议将其作为 爆炸 方法 getItem 用于从 struct 我叫它名字。

    df.withColumn("exploded" , explode($"some_data"))
      .withColumn("s1" , $"exploded".getItem("s1"))
      .withColumn("s2" , $"exploded".getItem("s2"))
      .drop("exploded")
      .show(false)
    
    //+---+------------------------------+-----+-----+
    //|id |some_data                     |s1   |s2   |
    //+---+------------------------------+-----+-----+
    //|1  |[[str11,str12], [str13,str14]]|str11|str12|
    //|1  |[[str11,str12], [str13,str14]]|str13|str14|
    //|2  |[[str21,str22], [str23,str24]]|str21|str22|
    //|2  |[[str21,str22], [str23,str24]]|str23|str24|
    //|3  |[[str31,str32], [str33,str44]]|str31|str32|
    //|3  |[[str31,str32], [str33,str44]]|str33|str44|
    //+---+------------------------------+-----+-----+