
PySpark: How to create a nested JSON from a Spark DataFrame?

  •  0
  • Shankar Panda  ·  Tech Community  ·  7 years ago

    I am trying to create a nested JSON from a Spark DataFrame containing data in the structure below. The code below only produces a flat JSON of keys and values. Could you please help?

    df.coalesce(1).write.format('json').save(data_output_file + "createjson.json", mode='overwrite')
    

    Update 1: Following @MaxU's answer, I converted the Spark DataFrame to pandas and used groupby. It puts the last two fields into a nested array. How can I first put the category and its count into a nested array, and then put the subcategory and its count inside that array?

    Sample text data:

    Vendor_Name,count,Categories,Category_Count,Subcategory,Subcategory_Count
    Vendor1,10,Category 1,4,Sub Category 1,1
    Vendor1,10,Category 1,4,Sub Category 2,2
    Vendor1,10,Category 1,4,Sub Category 3,3
    Vendor1,10,Category 1,4,Sub Category 4,4
    
    j = (data_pd.groupby(['Vendor_Name','count','Categories','Category_Count'], as_index=False)
                 .apply(lambda x: x[['Subcategory','Subcategory_Count']].to_dict('records'))
                 .reset_index()
                 .rename(columns={0:'subcategories'})
                 .to_json(orient='records'))
    


    [{
            "vendor_name": "Vendor 1",
            "count": 10,
            "categories": [{
                "name": "Category 1",
                "count": 4,
                "subCategories": [{
                        "name": "Sub Category 1",
                        "count": 1
                    },
                    {
                        "name": "Sub Category 2",
                        "count": 2
                    },
                    {
                        "name": "Sub Category 3",
                        "count": 3
                    },
                    {
                        "name": "Sub Category 4",
                        "count": 4
                    }
                ]
            }]
    }]
    2 Answers  |  up to 7 years ago
        1
  •  2
  •   Maarten Fabré    7 years ago

    The easiest way to do this in python/pandas is, I think, a series of nested generators using groupby:

    def split_df(df):
        for (vendor, count), df_vendor in df.groupby(["Vendor_Name", "count"]):
            yield {
                "vendor_name": vendor,
                "count": count,
                "categories": list(split_category(df_vendor))
            }
    
    def split_category(df_vendor):
        for (category, count), df_category in df_vendor.groupby(
            ["Categories", "Category_Count"]
        ):
            yield {
                "name": category,
                "count": count,
                "subCategories": list(split_subcategory(df_category)),
            }
    
    def split_subcategory(df_category):
        for row in df_category.itertuples():
            yield {"name": row.Subcategory, "count": row.Subcategory_Count}
    
    list(split_df(df))
    
    [
        {
            "vendor_name": "Vendor1",
            "count": 10,
            "categories": [
                {
                    "name": "Category 1",
                    "count": 4,
                    "subCategories": [
                        {"name": "Sub Category 1", "count": 1},
                        {"name": "Sub Category 2", "count": 2},
                        {"name": "Sub Category 3", "count": 3},
                        {"name": "Sub Category 4", "count": 4},
                    ],
                }
            ],
        }
    ]
    

    To export this to json, you will need a way to serialize np.int64, which the standard json module cannot handle directly.
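For instance, a minimal sketch of such an export: `json.dumps` raises a `TypeError` on NumPy integers, so a `default` hook can convert them to plain Python ints (the helper name `np_default` is my own, not from the answer):

```python
import json
import numpy as np

def np_default(obj):
    # json.dumps calls this hook for any object it cannot serialize;
    # convert NumPy integer types to plain Python int.
    if isinstance(obj, np.integer):
        return int(obj)
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")

result = [{"vendor_name": "Vendor1", "count": np.int64(10)}]
print(json.dumps(result, default=np_default))
```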

        2
  •  6
  •   Steven    7 years ago

    First, build the subcategories as a struct column:

    from pyspark.sql import functions as F
    df = df.withColumn(
      "subCategories",
      F.struct(
        F.col("Subcategory").alias("name"),
        F.col("Subcategory_Count").alias("count")
      )
    )
    

    Then groupBy and use F.collect_list to create the array.