代码之家  ›  专栏  ›  技术社区  ›  Ivan Bilan

Pandas to PySpark:将一列元组列表转换为每个元组项的单独列

  •  4
  • Ivan Bilan  · 技术社区  · 7 年前

    我需要转换一个数据帧,其中一列由一个元组列表组成,每个元组中的每个项都必须是一个单独的列。

    以下是熊猫的一个例子和解决方案:

    import pandas as pd
    
    df_dict = {
        'a': {
            "1": "stuff", "2": "stuff2"
        }, 
    
        "d": {
            "1": [(1, 2), (3, 4)], "2": [(1, 2), (3, 4)]
        }
    }
    
    df = pd.DataFrame.from_dict(df_dict)
    print(df)  # intial structure
    
               a    d
        1   stuff   [(1, 2), (3, 4)]
        2   stuff2  [(1, 2), (3, 4)]
    
    # first transformation, let's separate each list item into a new row
    row_breakdown = df.set_index(["a"])["d"].apply(pd.Series).stack()
    print(row_breakdown)
    
                a        
        stuff   0    (1, 2)
                1    (3, 4)
        stuff2  0    (1, 2)
                1    (3, 4)
        dtype: object
    
    row_breakdown = row_breakdown.reset_index().drop(columns=["level_1"])
    print(row_breakdown)
    
        a   0
        0   stuff   (1, 2)
        1   stuff   (3, 4)
        2   stuff2  (1, 2)
        3   stuff2  (3, 4)
    
    # second transformation, let's get each tuple item into a separate column
    row_breakdown.columns = ["a", "d"]
    row_breakdown = row_breakdown["d"].apply(pd.Series)
    row_breakdown.columns = ["value_1", "value_2"]
    print(row_breakdown)
    
            value_1 value_2
        0   1   2
        1   3   4
        2   1   2
        3   3   4
    

    这就是熊猫的解决方案。我需要能够做同样的,但使用PySpark(2.3)。我已经开始着手了,但马上就卡住了:

    from pyspark.context import SparkContext, SparkConf
    from pyspark.sql.session import SparkSession
    
    conf = SparkConf().setAppName("appName").setMaster("local")
    sc = SparkContext(conf=conf)
    
    spark = SparkSession(sc)
    
    df_dict = {
        'a': {
            "1": "stuff", "2": "stuff2"
        }, 
    
        "d": {
            "1": [(1, 2), (3, 4)], "2": [(1, 2), (3, 4)]
        }
    }
    
    df = pd.DataFrame(df_dict)
    ddf = spark.createDataFrame(df)
    
    row_breakdown = ddf.set_index(["a"])["d"].apply(pd.Series).stack()
    
        AttributeError: 'DataFrame' object has no attribute 'set_index'
    

    显然,Spark不支持索引。谢谢你的指点。

    2 回复  |  直到 7 年前
        1
  •  2
  •   martinarroyo    7 年前

    from pyspark.context import SparkContext, SparkConf
    from pyspark.sql.session import SparkSession
    from pyspark.sql import functions as F
    import pandas as pd
    
    conf = SparkConf().setAppName("appName").setMaster("local")
    sc = SparkContext(conf=conf)
    
    spark = SparkSession(sc)
    
    df_dict = {
        'a': {
            "1": "stuff", "2": "stuff2"
        }, 
    
        "d": {
            "1": [(1, 2), (3, 4)], "2": [(1, 2), (3, 4)]
        }
    }
    
    df = pd.DataFrame(df_dict)
    ddf = spark.createDataFrame(df)
    
    
    exploded = ddf.withColumn('d', F.explode("d"))
    exploded.show()
    

    结果:

    +------+------+
    |     a|     d|
    +------+------+
    | stuff|[1, 2]|
    | stuff|[3, 4]|
    |stuff2|[1, 2]|
    |stuff2|[3, 4]|
    +------+------+
    

    我觉得使用SQL更合适:

    exploded.createOrReplaceTempView("exploded")
    spark.sql("SELECT a, d._1 as value_1, d._2 as value_2 FROM exploded").show()
    

    重要提示:这是使用 _1 _2 访问器是因为spark将元组解析为一个结构,并为其提供默认键。如果在实际实现中,数据帧包含 array<int> [0]

    最终结果是:

    +------+-------+-------+
    |     a|value_1|value_2|
    +------+-------+-------+
    | stuff|      1|      2|
    | stuff|      3|      4|
    |stuff2|      1|      2|
    |stuff2|      3|      4|
    +------+-------+-------+
    
        2
  •  1
  •   pault Tanjin    7 年前

    更新

    ddf.printSchema()
    #root
    # |-- a: string (nullable = true)
    # |-- d: array (nullable = true)
    # |    |-- element: struct (containsNull = true)
    # |    |    |-- _1: long (nullable = true)
    # |    |    |-- _2: long (nullable = true)
    

    你必须使用 pyspark.sql.functions.explode 将数组分解为列,但之后可以使用 * 用于将结构转换为列的选择器:

    from pyspark.sql.functions import explode
    
    row_breakdown = ddf.select("a", explode("d").alias("d")).select("a", "d.*")
    row_breakdown.show()
    #+------+---+---+
    #|     a| _1| _2|
    #+------+---+---+
    #| stuff|  1|  2|
    #| stuff|  3|  4|
    #|stuff2|  1|  2|
    #|stuff2|  3|  4|
    #+------+---+---+
    

    要重命名这些列,可以使用 str.replace :

    from pyspark.sql.functions import col
    
    row_breakdown = row_breakdown.select(
        *[col(c).alias(c.replace("_", "value")) for c in row_breakdown.columns]
    )
    row_breakdown.show()
    #+------+------+------+
    #|     a|value1|value2|
    #+------+------+------+
    #| stuff|     1|     2|
    #| stuff|     3|     4|
    #|stuff2|     1|     2|
    #|stuff2|     3|     4|
    #+------+------+------+
    

    原始答案

    如果你是从字典开始,你不需要使用 pandas 为了这个。

    transform your dictionary into the appropriate format

    a 一点都不重要。

    mentioned in my comment ,您可以使用以下代码实现所描述的输出:

    df_dict = {
        'a': {
            "1": "stuff", "2": "stuff2"
        }, 
    
        "d": {
            "1": [(1, 2), (3, 4)], "2": [(1, 2), (3, 4)]
        }
    }
    
    from itertools import chain
    row_breakdown = spark.createDataFrame(
        chain.from_iterable(df_dict["d"].values()), ["value1", "value2"]
    )
    row_breakdown.show()
    #+------+------+
    #|value1|value2|
    #+------+------+
    #|     1|     2|
    #|     3|     4|
    #|     1|     2|
    #|     3|     4|
    #+------+------+
    

    如果您想要一个类似于索引的列,只需使用 enumerate

    data = (
        (i,) + v for i, v in enumerate(
            chain.from_iterable(
                v for k, v in sorted(df_dict["d"].items(), key=lambda (key, val): key)
            )
        )
    )
    columns = ["index", "value1", "value2"]
    row_breakdown = spark.createDataFrame(data, columns)
    row_breakdown.show()
    #+-----+------+------+
    #|index|value1|value2|
    #+-----+------+------+
    #|    0|     1|     2|
    #|    1|     3|     4|
    #|    2|     1|     2|
    #|    3|     3|     4|
    #+-----+------+------+
    

    如您所见,我们可以将生成器表达式传递给 spark.createDataFrame ,这个解决方案不需要我们提前知道元组的长度。