代码之家  ›  专栏  ›  技术社区  ›  ZK Zhao

Pyspark:按列加权平均

  •  0
  • ZK Zhao  · 技术社区  · 6 年前

    例如,我有一个这样的数据集

    test = spark.createDataFrame([
        (0, 1, 5, "2018-06-03", "Region A"),
        (1, 1, 2, "2018-06-04", "Region B"),
        (2, 2, 1, "2018-06-03", "Region B"),
        (3, 3, 1, "2018-06-01", "Region A"),
        (3, 1, 3, "2018-06-05", "Region A"),
    ])\
      .toDF("orderid", "customerid", "price", "transactiondate", "location")
    test.show()
    

    我可以通过

    overall_stat = test.groupBy("customerid").agg(count("orderid"))\
      .withColumnRenamed("count(orderid)", "overall_count")
    temp_result = test.groupBy("customerid").pivot("location").agg(count("orderid")).na.fill(0).join(overall_stat, ["customerid"])
    
    for field in temp_result.schema.fields:
        if str(field.name) not in ['customerid', "overall_count", "overall_amount"]:
            name = str(field.name)
            temp_result = temp_result.withColumn(name, col(name)/col("overall_count"))
    temp_result.show()
    

    数据是这样的

    enter image description here

    现在,我想计算加权平均值 overall_count ,我该怎么做?

    结果应该是 (0.66*3+1*1)/4 对于区域A,以及 (0.33*3+1*1)/4 对于区域B


    我的想法:

    当然可以通过将数据转换成python/panda然后进行一些计算来实现,但是在什么情况下应该使用Pyspark呢?

    我可以得到一些像

    temp_result.agg(sum(col("Region A") * col("overall_count")), sum(col("Region B")*col("overall_count"))).show()
    

    但感觉不对,特别是如果有很多 region 数一数。

    0 回复  |  直到 6 年前
        1
  •  0
  •   shadow_dev    5 年前

    通过将上述步骤分为多个阶段,可以获得加权平均值。

    请考虑以下几点:

    Dataframe Name: sales_table
    [ total_sales, count_of_orders, location]
    [     50     ,       9        ,    A    ]
    [     80     ,       4        ,    A    ]
    [     90     ,       7        ,    A    ]
    

    计算上述(70)的分组加权平均分为两步:

    1. 倍增 sales 通过 importance
    2. 聚合 sales_x_count 产品
    3. 划分 销售额 按原件的总和

    如果我们在PySpark代码中将上述内容分成几个阶段,您可以得到以下结果:

    new_sales = sales_table \
        .withColumn("sales_x_count", col("total_sales") * col("count_orders")) \
        .groupBy("Location") \
        .agg(sf.sum("total_sales").alias("sum_total_sales"), \
             sf.sum("sales_x_count").alias("sum_sales_x_count")) \
        .withColumn("count_weighted_average", col("sum_sales_x_count") / col("sum_total_sales"))
    

    所以。。。这里不需要特别的自定义项(很可能会让你慢下来)。