代码之家  ›  专栏  ›  技术社区  ›  Evgenii

GroupBy数据帧占用的时间太长

  •  0
  • Evgenii  · 技术社区  · 6 年前

    我有大量的数据。5000个ORC文件,每个大约300 MB。以及4本字典(每个都有几个KB)。

    在将其加载到数据帧(大约1-2小时)并加入之后,我尝试对其进行分组和聚合。

    tableDS
      .join(broadcast(enq1), $"url".contains($"q1"), "left_outer")
      .join(broadcast(enq2), $"url".contains($"q2"), "left_outer")
      .join(broadcast(enq3), $"url".contains($"q3"), "left_outer")
      .join(broadcast(model), $"url".contains($"model"), "left_outer")
      .groupBy($"url", $"ctn", $"timestamp")
      .agg(
    collect_set(when($"q2".isNotNull && $"q3".isNotNull && $"model".isNotNull, $"model").otherwise(lit(null))).alias("model"),
    collect_set(when($"q1".isNotNull && $"q2".isNotNull && $"model".isNotNull, $"model").otherwise(lit(null))).alias("model2")
      )
    

    需要15-16个小时。

    我的问题是。

    1)GroupBy for Dataframes的工作方式是否与GroupByKey for RDD(执行所有数据的Shaffle)相同?如果是,是否将移动到数据集方法GroupByKey.ReduceGroupes,甚至RDD ReduceByKey,会提高性能?

    2)还是资源问题?执行分组的任务有200个,增加这些任务的数量会有帮助吗?我该怎么做?

    这就是我运行它的方式

    spark-submit \
    --class Main \
    --master yarn \
    --deploy-mode client \
    --num-executors 200 \
    --executor-cores 20 \
    --driver-memory 8G \
    --executor-memory 16G \
    --files hive-site.xml#hive-site.xml \
    --conf spark.task.maxFailures=10 \
    --conf spark.executor.memory=16G \
    --conf spark.app.name=spark-job \
    --conf spark.yarn.executor.memoryOverhead=4096 \
    --conf spark.yarn.driver.memoryOverhead=2048 \
    --conf spark.shuffle.service.enabled=true \
    --conf spark.shuffle.consolidateFiles=true \
    --conf spark.broadcast.compress=true \
    --conf spark.shuffle.compress=true \
    --conf spark.shuffle.spill.compress=true \
    
    1 回复  |  直到 6 年前
        1
  •  0
  •   user9961256    6 年前

    GroupBy for DataFrames的工作方式是否与GroupByKey for RDD相同?

    collect_set 它和 groupByKey .如果重复的数目很大,它的行为可能会更好,否则您可以期望类似的性能配置文件 RDD.groupByKey 不考虑无序执行的差异。

    将移动到数据集方法GroupByKey.ReduceGroupes

    不,不会的。 reduceByGroups 不允许使用可变缓冲区,因此即使在最好的情况下,您也没有太大的改进空间。

    或者甚至RDD通过键来降低性能?

    可能不会。你将无法提高 分组依据键 具有 reduceByKey (如果你相信 this )

    还是资源问题?执行分组的任务有200个,增加这些任务的数量会有帮助吗?我该怎么做?

    可能,但我们只能猜测。调谐 spark.sql.shuffle.partitions 自己测量。