
Scala Spark: convert a struct type column to JSON data

Asked by Masterbuilder · 7 years ago

    I am trying to aggregate a couple of fields in a dataset and convert them into a JSON array format. I am manually adding the ":" separator with the concat_ws and lit functions, but I am sure there must be a better way to do this. Below is the code I have tried so far; I am on Spark version 2.0.1, so I have had no luck with the to_json function.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.functions.collect_list
    import org.apache.spark.sql.functions.concat_ws
    import org.apache.spark.sql.functions.lit
    import org.apache.spark.sql.functions.struct
    import org.apache.spark.sql.functions.udf
    
    object Zipper {
      val warehouseLocation = "file:///${system:user.dir}//spark-warehouse"
      val spark = SparkSession
        .builder()
        .appName("jsonconvert")
        .config("spark.master", "local")
        .config("spark.sql.warehouse.dir", warehouseLocation)
        .getOrCreate()
      import spark.implicits._
      def main(args: Array[String]) = {
        val df = Seq(
          ("john", "tomato", 1.99),
          ("john", "carrot", 0.45),
          ("bill", "apple", 0.99),
          ("john", "banana", 1.29),
          ("bill", "taco", 2.59)
        ).toDF("name", "food", "price")
        df.show(false)

        df.groupBy($"name")
          .agg(collect_list(struct(
            concat_ws(":", lit("food"), $"food"),
            concat_ws(":", lit("price"), $"price")
          )).as("foods"))
          .show(false)
      }
    }
    
    
    
    +----+------------------------------------------------------------------------------+
    |name|foods                                                                         |
    +----+------------------------------------------------------------------------------+
    |john|[[food:tomato,price:1.99], [food:carrot,price:0.45], [food:banana,price:1.29]]|
    |bill|[[food:apple,price:0.99], [food:taco,price:2.59]]                             |
    +----+------------------------------------------------------------------------------+
    

    Expected output

    +----+------------------------------------------------------------------------------------------------+
    |name|foods                                                                                           |
    +----+------------------------------------------------------------------------------------------------+
    |john|[{"food":"tomato","price":1.99}, {"food":"carrot","price":0.45}, {"food":"banana","price":1.29}]|
    |bill|[{"food":"apple","price":0.99}, {"food":"taco","price":2.59}]                                   |
    +----+------------------------------------------------------------------------------------------------+
    
    2 Answers  |  up to 7 years ago
Answer 1 · Leo C · 7 years ago

    For Spark versions prior to 2.1, try aggregating (food, price) by name, applying toJSON to the DataFrame, and then extracting the JSON objects, as follows:

    import org.apache.spark.sql.functions._
    
    df.groupBy($"name").agg(collect_list(struct($"food", $"price")).as("food_price")).
      toJSON.  // serialize each row to a JSON string in a column named `value`
      select(
        get_json_object($"value", "$.name").as("name"),
        get_json_object($"value", "$.food_price").as("foods")
      ).
      show(false)
    // +----+----------------------------------------------------------------------------------------------+
    // |name|foods                                                                                         |
    // +----+----------------------------------------------------------------------------------------------+
    // |john|[{"food":"tomato","price":1.99},{"food":"carrot","price":0.45},{"food":"banana","price":1.29}]|
    // |bill|[{"food":"apple","price":0.99},{"food":"taco","price":2.59}]                                  |
    // +----+----------------------------------------------------------------------------------------------+
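
    For Spark 2.3 and later, where to_json accepts an array-of-structs column, the aggregation can be serialized directly, skipping the toJSON round trip. A minimal sketch, assuming the same df as in the question:

    import org.apache.spark.sql.functions.{col, collect_list, struct, to_json}

    // Spark 2.3+: to_json can serialize an array<struct> column directly
    df.groupBy(col("name"))
      .agg(to_json(collect_list(struct(col("food"), col("price")))).as("foods"))
      .show(false)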
    
Answer 2 · vaquar khan · 7 years ago
    import org.apache.spark.sql._
    import org.apache.spark.sql.functions._

    val df = Seq(
      ("john", "tomato", 1.99),
      ("john", "carrot", 0.45),
      ("bill", "apple", 0.99),
      ("john", "banana", 1.29),
      ("bill", "taco", 2.59)
    ).toDF("name", "food", "price")

    // Collect one (food, price) struct per row into an array per name
    val vkDF2 = df.groupBy("name").agg(collect_list(struct(col("food"), col("price"))).alias("vaquarkhan_json"))

    vkDF2.show()
    
    Results:
    
    +----+--------------------+
    |name|     vaquarkhan_json|
    +----+--------------------+
    |john|[[tomato,1.99], [...|
    |bill|[[apple,0.99], [t...|
    +----+--------------------+
    
    REPL output (note the schema of vkDF2):

    import org.apache.spark.sql._
    import org.apache.spark.sql.functions._
    df: org.apache.spark.sql.DataFrame = [name: string, food: string ... 1 more field]
    vkDF2: org.apache.spark.sql.DataFrame = [name: string, vaquarkhan_json: array<struct<food:string,price:double>>]
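
    vkDF2 still holds an array<struct> column rather than JSON text. On Spark 2.0.x, one way to render it as JSON, in line with the toJSON approach in the answer above, is Dataset.toJSON. A minimal sketch:

    // Dataset.toJSON (available since Spark 2.0) renders each row,
    // including the array<struct> column, as a JSON string
    vkDF2.toJSON.show(false)
    // e.g. {"name":"john","vaquarkhan_json":[{"food":"tomato","price":1.99},...]}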