代码之家  ›  专栏  ›  技术社区  ›  Stefan Falk

转换列并更新数据帧

  •  2
  • Stefan Falk  · 技术社区  · 7 年前

    所以,我在下面做的是我写一个专栏 A 来自 DataFrame 因为我想应用一个转换(这里我只是 json.loads 一个JSON字符串),并用转换后的列替换旧列。在转换之后,我将两个结果数据帧连接起来。

    df = df_data.drop('A').join(
        df_data[['ID', 'A']].rdd\
            .map(lambda x: (x.ID, json.loads(x.A)) 
                 if x.A is not None else (x.ID, None))\
            .toDF()\
            .withColumnRenamed('_1', 'ID')\
            .withColumnRenamed('_2', 'A'),
        ['ID']
    )
    

    我不喜欢这件事,当然是因为我不得不 withColumnRenamed 操作。

    对于熊猫,我会这样做:

    pdf = pd.DataFrame([json.dumps([0]*np.random.randint(5,10)) for i in range(10)], columns=['A'])
    pdf.A = pdf.A.map(lambda x: json.loads(x))
    pdf
    

    但以下内容在pyspark中不起作用:

    df.A = df[['A']].rdd.map(lambda x: json.loads(x.A))
    

    那么,有没有比我在第一次截取代码时所做的更简单的方法呢?

    2 回复  |  直到 7 年前
        1
  •  3
  •   pault Tanjin    7 年前

    我认为您不需要删除列并进行连接。以下代码应 * 与您发布的内容等效:

    cols = df_data.columns
    df = df_data.rdd\
        .map(
            lambda row: tuple(
                [row[c] if c != 'A' else (json.loads(row[c]) if row[c] is not None else None) 
                 for c in cols]
            )
        )\
        .toDF(cols)
    

    * 我还没有实际测试过这段代码,但我认为这应该可以。

    但要回答您的一般问题,可以使用 withColumn()

    df = df_data.withColumn("A", my_transformation_function("A").alias("A"))
    

    哪里 my_transformation_function() 可以是 udf 或a pyspark sql function

        2
  •  2
  •   Stefan Falk    7 年前

    据我所知,你是想达到这样的目的吗?

    import pyspark.sql.functions as F
    import json
    
    json_convert = F.udf(lambda x: json.loads(x) if x is not None else None)
    
    cols = df_data.columns
    df = df_data.select([json_convert(F.col('A')).alias('A')] + \
                        [col for col in cols if col != 'A'])