代码之家  ›  专栏  ›  技术社区  ›  Rahul P

Pyspark Dataframe-不带Numpy或其他库的中值

  •  0
  • Rahul P  · 技术社区  · 7 年前

    我在pyspark工作了一段时间,我被卡住了。我在试着找出这一列的中位数 数字 各自的窗口。我需要在不使用其他库(如numpy等)的情况下执行此操作。

    到目前为止(如下所示),我已经按照列将数据集分组到windows中 身份证件 . 这是由列描述的 行数 显示每个窗口的样子。此数据帧示例中有三个窗口。

    这就是我想要的:

    我希望每一行也包含列窗口的中间值 身份证件 没有考虑到自己的立场。我需要的中间带的位置在下面的函数中 中位数

    示例:对于 行数 =5,我需要找到上面1到4行的中值(即不包括 行数 5条)。因此,中位数(根据我的要求)是列的平均值 身份证件 在同一个窗口中 行数 =1和 行数 =2即

    Date        id      numbers row_number  med_loc
    2017-03-02  group 1   98        1       [1]
    2017-04-01  group 1   50        2       [1]
    2018-03-02  group 1   5         3       [1, 2]
    2016-03-01  group 2   49        1       [1]
    2016-12-22  group 2   81        2       [1]
    2017-12-31  group 2   91        3       [1, 2]
    2018-08-08  group 2   19        4       [2]
    2018-09-25  group 2   52        5       [1, 2]
    2017-01-01  group 3   75        1       [1]
    2018-12-12  group 3   17        2       [1]
    

    我用来获取最后一列med_loc的代码如下

    def median_loc(sz):
        if sz == 1 or sz == 0:
            kth = [1]
            return kth
        elif sz % 2 == 0 and sz > 1:
            szh = sz // 2
            kth = [szh - 1, szh] if szh != 1 else [1, 2]
            return kth
        elif sz % 2 != 0 and sz > 1:
            kth = [(sz + 1) // 2]
            return kth
    
    
    sqlContext.udf.register("median_location", median_loc)
    
    median_loc = F.udf(median_loc)
    
    df = df.withColumn("med_loc", median_loc(df.row_number)-1)
    

    注:为了便于理解,我把它们做成了一个列表。它只是显示中值在相应窗口中的位置。只是为了让读者更容易理解堆栈溢出

    我想要的输出如下:

    Date        id      numbers row_number  med_loc     median
    2017-03-02  group 1   98        1       [1]           98
    2017-04-01  group 1   50        2       [1]           98
    2018-03-02  group 1   5         3       [1, 2]        74
    2016-03-01  group 2   49        1       [1]           49
    2016-12-22  group 2   81        2       [1]           49
    2017-12-31  group 2   91        3       [1, 2]        65
    2018-08-08  group 2   19        4       [2]           81
    2018-09-25  group 2   52        5       [1, 2]        65
    2017-01-01  group 3   75        1       [1]           75
    2018-12-12  group 3   17        2       [1]           75
    

    基本上,到目前为止得到中值的方法是这样的:

    1. 如果med_loc是一个数字(即如果列表只有一个数字,如[1]或[3]等),则中值=df.numbers,其中 df.row U number=df.med U位置

    2. 如果med_loc是两位数(即如果列表有两位数,如[1,2]或[2,3]等),则中值=平均值(df.numbers),其中 df.med_loc中的df.row_编号

    我再强调也不为过 使用其他库(如numpy等)获取输出。我看到了其他一些使用np.median的解决方案,但是它们是有效的,这不是我现在的要求。

    我很抱歉,如果这个解释是如此吹毛求疵,如果我把它复杂化了。我看了好几天了,好像想不出来。我也试过使用percent_rank函数,但是我无法计算出来,因为不是所有的窗口都包含0.5%的值。

    任何帮助都将不胜感激。

    1 回复  |  直到 7 年前
        1
  •  1
  •   pault Tanjin    7 年前

    假设您从以下数据帧开始, df :

    +----------+-------+-------+
    |      Date|     id|numbers|
    +----------+-------+-------+
    |2017-03-02|group 1|     98|
    |2017-04-01|group 1|     50|
    |2018-03-02|group 1|      5|
    |2016-03-01|group 2|     49|
    |2016-12-22|group 2|     81|
    |2017-12-31|group 2|     91|
    |2018-08-08|group 2|     19|
    |2018-09-25|group 2|     52|
    |2017-01-01|group 3|     75|
    |2018-12-12|group 3|     17|
    +----------+-------+-------+
    

    订购数据帧

    首先添加 row_number 正如您在示例中所做的,并将输出分配给一个新的数据帧 df2 :

    import pyspark.sql.functions as f
    from pyspark.sql import Window
    
    df2 = df.select(
        "*", f.row_number().over(Window.partitionBy("id").orderBy("Date")).alias("row_number")
    )
    df2.show()
    #+----------+-------+-------+----------+
    #|      Date|     id|numbers|row_number|
    #+----------+-------+-------+----------+
    #|2017-03-02|group 1|     98|         1|
    #|2017-04-01|group 1|     50|         2|
    #|2018-03-02|group 1|      5|         3|
    #|2016-03-01|group 2|     49|         1|
    #|2016-12-22|group 2|     81|         2|
    #|2017-12-31|group 2|     91|         3|
    #|2018-08-08|group 2|     19|         4|
    #|2018-09-25|group 2|     52|         5|
    #|2017-01-01|group 3|     75|         1|
    #|2018-12-12|group 3|     17|         2|
    #+----------+-------+-------+----------+
    

    收集中值值

    现在你可以加入了 df2型 对自己说 id 列的条件是 row number 1 或者它大于右边的 行数 . 然后按左边数据框的 ("id", "Date", "row_number") 然后收集 numbers 从右边的数据框进入列表。

    在这种情况下 行数 等于1,我们只想保留此收集列表的第一个元素。否则保留所有的数字,但要对它们进行排序,因为我们需要对它们进行排序以计算中值。

    调用此中间数据帧 df3 :

    df3 = df2.alias("l").join(df2.alias("r"), on="id", how="left")\
        .where("l.row_number = 1 OR (r.row_number < l.row_number)")\
        .groupBy("l.id", "l.Date", "l.row_number")\
        .agg(f.collect_list("r.numbers").alias("numbers"))\
        .select(
            "id",
            "Date",
            "row_number",
            f.when(
                f.col("row_number") == 1,
                f.array([f.col("numbers").getItem(0)])
            ).otherwise(f.sort_array("numbers")).alias("numbers")
        )
    df3.show()
    #+-------+----------+----------+----------------+
    #|     id|      Date|row_number|         numbers|
    #+-------+----------+----------+----------------+
    #|group 1|2017-03-02|         1|            [98]|
    #|group 1|2017-04-01|         2|            [98]|
    #|group 1|2018-03-02|         3|        [50, 98]|
    #|group 2|2016-03-01|         1|            [49]|
    #|group 2|2016-12-22|         2|            [49]|
    #|group 2|2017-12-31|         3|        [49, 81]|
    #|group 2|2018-08-08|         4|    [49, 81, 91]|
    #|group 2|2018-09-25|         5|[19, 49, 81, 91]|
    #|group 3|2017-01-01|         1|            [75]|
    #|group 3|2018-12-12|         2|            [75]|
    #+-------+----------+----------+----------------+
    

    请注意 数字 df3型 有一个适当值的列表,我们希望找到其中值。

    计算中值

    因为您的Spark版本大于2.1,所以可以使用 pyspark.sql.functions.posexplode() 从这个值列表中计算中值。对于较低版本的spark,您需要使用 udf .

    首先在 df3型 :

    • isEven :一个布尔值,指示 数字 数组有偶数个元素
    • middle :数组中间的索引,它是长度为/2的地板

    创建这些列后,使用 posexplode() ,它将返回两个新列: pos col . 然后我们过滤出结果数据帧,只保留计算中值所需的位置。

    保持位置的逻辑如下:

    • 如果 伊塞文 False ,我们只保留中间位置
    • 如果 伊塞文 True ,我们保持中间位置和中间位置-1。

    最后按 身份证件 Date 平均剩余的 数字 .

    df3.select(
        "*",
        f.when(
            (f.size("numbers") % 2) == 0,
            f.lit(True)
        ).otherwise(f.lit(False)).alias("isEven"),
        f.floor(f.size("numbers")/2).alias("middle")
    ).select(
            "id", 
            "Date",
            "middle",
            f.posexplode("numbers")
    ).where(
        "(isEven=False AND middle=pos) OR (isEven=True AND pos BETWEEN middle-1 AND middle)"
    ).groupby("id", "Date").agg(f.avg("col").alias("median")).show()
    #+-------+----------+------+
    #|     id|      Date|median|
    #+-------+----------+------+
    #|group 1|2017-03-02|  98.0|
    #|group 1|2017-04-01|  98.0|
    #|group 1|2018-03-02|  74.0|
    #|group 2|2016-03-01|  49.0|
    #|group 2|2016-12-22|  49.0|
    #|group 2|2017-12-31|  65.0|
    #|group 2|2018-08-08|  81.0|
    #|group 2|2018-09-25|  65.0|
    #|group 3|2017-01-01|  75.0|
    #|group 3|2018-12-12|  75.0|
    #+-------+----------+------+
    
    推荐文章