代码之家  ›  专栏  ›  技术社区  ›  Bin Lin

google cloud dataproc上的pyspark作业失败

  •  0
  • Bin Lin  · 技术社区  · 7 年前

    我创建了一个包含1个主节点和10个节点的dataproc集群。所有处理器和内存配置相同:32个vCPU,120 GB内存。当我提交一份处理大量数据和计算的工作时。作业失败。

    从日志记录来看,我不确定是什么导致了失败。但我看到了来自tjob:job-c46fc848-6的与内存相关的错误消息: 因超过记忆极限而被纱线杀死的容器。使用24.1 GB的24 GB物理内存。考虑增强spark.yarn.executor.memoryoverhead。

    所以我尝试了一些我从其他帖子中找到的解决方案。例如,当从“作业”控制台提交作业时,我试图增加“属性”部分中的spark.executor.memoryoverhead和spark.driver.maxResultSize。作业find-duplicate-job-c46fc848-7仍然失败。

    我也看到了一些警告信息,但并不确定这是什么意思: 2004年6月18日17:13:25警告org.apache.spark.storage.blockmanagermasterendpoint:RDD_43_155没有可用的副本!

    我将尝试创建一个更高级别的集群,看看它是否有效。但我怀疑它是否能解决这个问题,因为拥有1个主节点和10个节点、32个vcpu、120 GB内存的集群已经非常强大了。

    希望能得到高级用户和专家的帮助。提前谢谢!

    1 回复  |  直到 7 年前
        1
  •  1
  •   Bin Lin    7 年前

    失败的根本原因与自交叉连接导致的内存有关。即使我不断地增加CPU的功率和内存,它仍然失败。所以这个问题的解决方案是下面的组合。

    1. 使用repartition()函数在联接之后、下一个转换之前重新分区。这将解决数据倾斜问题。例如:df_joined=df_joined.repartition(分区)
    2. 广播右边的桌子。
    3. 把它分成10个迭代。在每次迭代中,我只处理左表的1/10和右表的完整数据。

    参见示例代码:

    groups = 10 <br/>
    for x in range(0, groups): 
      df_joined = df1.join(broadcast(df2), (df1.authors == df2.authors)).where((col("df1.content_id") % groups == x)) 
    

    结合以上3种方法,我可以在1.5小时内完成任务,只使用1个主节点和4个工作节点(每个vm 8个cpu和30gb)。