代码之家  ›  专栏  ›  技术社区  ›  LEJ

Pyspark-从每列中选择不同的值

  •  0
  • LEJ  · 技术社区  · 6 年前

    我试图在数据帧的每一列中找到所有不同的值,并在一个表中显示。

    示例数据:

    |-----------|-----------|-----------|
    |   COL_1   |   COL_2   |   COL_3   | 
    |-----------|-----------|-----------|
    |     A     |     C     |     D     |
    |     A     |     C     |     D     |
    |     A     |     C     |     E     |
    |     B     |     C     |     E     |
    |     B     |     C     |     F     |
    |     B     |     C     |     F     |
    |-----------|-----------|-----------|
    

    输出示例:

    |-----------|-----------|-----------|
    |   COL_1   |   COL_2   |   COL_3   | 
    |-----------|-----------|-----------|
    |     A     |     C     |     D     |
    |     B     |           |     E     |
    |           |           |     F     |
    |-----------|-----------|-----------|
    

    这有可能吗?我可以在不同的桌子上做,但是在一张桌子上做会更好。

    1 回复  |  直到 6 年前
        1
  •  7
  •   pault Tanjin    6 年前

    这里最简单的就是使用 pyspark.sql.functions.collect_set

    import pyspark.sql.functions as f
    df.select(*[f.collect_set(c).alias(c) for c in df.columns]).show()
    #+------+-----+---------+
    #| COL_1|COL_2|    COL_3|
    #+------+-----+---------+
    #|[B, A]|  [C]|[F, E, D]|
    #+------+-----+---------+
    

    显然,这会将数据作为一行返回。

    相反,如果你想得到你在问题中所写的结果(每一列每一个唯一值对应一行),这是可行的,但需要相当多的pyspark技巧(而且任何解决方案都可能效率要低得多)。

    选项1:分解并合并

    你可以用 pyspark.sql.functions.posexplode functools.reduce :

    from functools import reduce 
    
    unique_row = df.select(*[f.collect_set(c).alias(c) for c in df.columns])
    
    final_df = reduce(
        lambda a, b: a.join(b, how="outer", on="pos"),
        (unique_row.select(f.posexplode(c).alias("pos", c)) for c in unique_row.columns)
    ).drop("pos")
    
    final_df.show()
    #+-----+-----+-----+
    #|COL_1|COL_2|COL_3|
    #+-----+-----+-----+
    #|    A| null|    E|
    #| null| null|    D|
    #|    B|    C|    F|
    #+-----+-----+-----+
    

    选项2:按位置选择

    首先计算最大数组的大小并将其存储在新列中 max_length . 如果索引中存在值,则从每个数组中选择元素。

    我们再次使用 pyspark.sql.functions函数.爆炸 但这次只需要创建一个列来表示要提取的每个数组中的索引。

    最后我们使用 this trick

    final_df= df.select(*[f.collect_set(c).alias(c) for c in df.columns])\
        .withColumn("max_length", f.greatest(*[f.size(c) for c in df.columns]))\
        .select("*", f.expr("posexplode(split(repeat(',', max_length-1), ','))"))\
        .select(
            *[
                f.expr(
                    "case when size({c}) > pos then {c}[pos] else null end AS {c}".format(c=c))
                for c in df.columns
            ]
        )
    
    final_df.show()
    #+-----+-----+-----+
    #|COL_1|COL_2|COL_3|
    #+-----+-----+-----+
    #|    B|    C|    F|
    #|    A| null|    E|
    #| null| null|    D|
    #+-----+-----+-----+