代码之家  ›  专栏  ›  技术社区  ›  pault Tanjin

使用列值作为Spark数据帧函数的参数

  •  4
  • pault Tanjin  · 技术社区  · 7 年前

    考虑以下数据帧:

    #+------+---+
    #|letter|rpt|
    #+------+---+
    #|     X|  3|
    #|     Y|  1|
    #|     Z|  2|
    #+------+---+
    

    可以使用以下代码创建:

    df = spark.createDataFrame([("X", 3),("Y", 1),("Z", 2)], ["letter", "rpt"])
    

    假设我想每行重复列中指定的次数 rpt 就像这样 question .

    一种方法是复制我的 solution 用以下方法回答这个问题 pyspark-sql 查询:

    query = """
    SELECT *
    FROM
      (SELECT DISTINCT *,
                       posexplode(split(repeat(",", rpt), ",")) AS (index, col)
       FROM df) AS a
    WHERE index > 0
    """
    query = query.replace("\n", " ")  # replace newlines with spaces, avoid EOF error
    spark.sql(query).drop("col").sort('letter', 'index').show()
    #+------+---+-----+
    #|letter|rpt|index|
    #+------+---+-----+
    #|     X|  3|    1|
    #|     X|  3|    2|
    #|     X|  3|    3|
    #|     Y|  1|    1|
    #|     Z|  2|    1|
    #|     Z|  2|    2|
    #+------+---+-----+
    

    这会产生正确的答案。但是,我无法使用数据框架API函数复制此行为。

    我尝试过:

    import pyspark.sql.functions as f
    df.select(
        f.posexplode(f.split(f.repeat(",", f.col("rpt")), ",")).alias("index", "col")
    ).show()
    

    但这会导致:

    TypeError: 'Column' object is not callable

    为什么我能够将列作为输入传递给 repeat 在查询中,但不是从API?有没有一种方法可以使用spark dataframe函数复制这种行为?

    1 回复  |  直到 6 年前
        1
  •  5
  •   pault Tanjin    6 年前

    一种选择是使用 pyspark.sql.functions.expr ,它允许您使用列值作为激发SQL函数的输入。

    基于@user8371915 comment 我发现以下方法有效:

    from pyspark.sql.functions import expr
    
    df.select(
        '*',
        expr('posexplode(split(repeat(",", rpt), ","))').alias("index", "col")
    ).where('index > 0').drop("col").sort('letter', 'index').show()
    #+------+---+-----+
    #|letter|rpt|index|
    #+------+---+-----+
    #|     X|  3|    1|
    #|     X|  3|    2|
    #|     X|  3|    3|
    #|     Y|  1|    1|
    #|     Z|  2|    1|
    #|     Z|  2|    2|
    #+------+---+-----+