代码之家  ›  专栏  ›  技术社区  ›  Shankar Panda

如何在我的pyspark代码中修复这个reducebykey转换问题?

  •  0
  • Shankar Panda  · 技术社区  · 6 年前

    我对如何正确获得这个值有点困惑。以下是我的样本数据:

    col_name,Category,SegmentID,total_cnt,PercentDistribution
    city,ANTIOCH,1,1,15
    city,ARROYO GRANDE,1,1,15
    state,CA,1,3,15
    state,NZ,1,4,15
    

    enter image description here

    我正在尝试获取输出数据帧,如下所示:

    enter image description here

    我可以一直到今天。这里需要你的帮助。

        from pyspark.sql.types import StructType,StructField,StringType,IntegerType
        import json
    
        join_df=spark.read.csv("/tmp/testreduce.csv",inferSchema=True, header=True)
        jsonSchema = StructType([StructField("Name", StringType())
                               , StructField("Value", IntegerType())
                               , StructField("CatColName", StringType())
                               , StructField("CatColVal", StringType())
                            ])
        def reduceKeys(row1, row2):
                row1[0].update(row2[0])
                return row1
    
        res_df=join_df.rdd.map(lambda row: ("Segment " + str(row[2]), ({row[1]: row[3]},row[0],row[4])))\
    .reduceByKey(lambda x, y: reduceKeys(x, y))\
    .map(lambda row: (row[0], row[1][2],row[1][1], json.dumps(row[1][0]))).toDF(jsonSchema)
    

    我当前的代码输出:

    未根据段id和CatColName正确分组数据。

    enter image description here

    1 回复  |  直到 6 年前
        1
  •  1
  •   gaw    6 年前

    问题是reduceByKey接受生成的字符串 Segment 1 考虑到这一点,城市和州是平等的。如果您添加 col_name 开始时,它按预期工作,但结果中会收到不同的名称。这可以用正则表达式更改

    res_df=test_df.rdd.map(lambda row: ("Segment " + str(row[2]) +" " + str(row[0]), ({row[1]: row[3]},row[0],row[4])))\
    .reduceByKey(lambda x, y: reduceKeys(x, y))\
    .map(lambda row: (row[0], row[1][2],row[1][1], json.dumps(row[1][0]))).toDF(jsonSchema).withColumn("name",regexp_extract(col("name"),"(\w+\s\d+)",1))
    
    res_df.show(truncate=False)
    

    输出:

    +---------+-----+----------+----------------------------------+
    |name     |Value|CatColName|CatColVal                         |
    +---------+-----+----------+----------------------------------+
    |Segment 1|15   |city      |{"ANTIOCH": 1, "ARROYO GRANDE": 1}|
    |Segment 1|15   |state     |{"CA": 3, "NZ": 4}                |
    +---------+-----+----------+----------------------------------+