代码之家  ›  专栏  ›  技术社区  ›  VIjay

Apache Beam-GroupbyKeyOnly方法导致OutofMemory异常

  •  0
  • VIjay  · 技术社区  · 7 年前

    我们使用的是在Spark runner上执行的Apache Beam。我们的情况如下。这两个用例都会导致OutofMemory错误。

    1) 使用Apache Beam连接2个大表-一个表大小为120GB,另一个表大小为60GB。在GroupCombineFunctions中内部调用groupByKeyOnly()时,这会导致OutofMemory错误。Java语言

    2) GroupByKey—我们根据如下键对数据集进行分组。 P收集(>)&燃气轮机;costBasisRecords=主数据结果。应用(GroupByKey.create());

    此GroupbyKey操作还会导致OutOfmemory错误。

    你能给我们一些建议,以便我们能取得成果吗。

    从网上我们看到 还原键 方法-请您指导我们如何为Spark runners实现该功能。

    错误消息:

    java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.lang.reflect.Array.newInstance(Array.java:75)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1897)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
    at org.apache.spark.serializer.DeserializationStream.readValue(Serializer.scala:171)
    at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:201)
    at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:198)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:152)
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:45)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:89)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    
    2 回复  |  直到 7 年前
        1
  •  0
  •   Ben Sidhom    7 年前

    如果可能的话,我绝对建议使用 Combine.perKey 正如卢卡斯所言。

    如果无法做到这一点,或者仍然遇到OOM,请尝试通过增加分区数来减少分区大小。您可以通过手动设置 spark.default.parallelism 配置这是 explicitly used 确定的分区方案 groupByKeyOnly 洗牌。

    看起来通过手动构造的 SparkContextOptions . 有一个 test case 这说明了如何做到这一点。请注意,这需要您的管道程序直接链接到Spark。例如:

    SparkConf conf = new SparkConf().set("spark.default.parallelism", parallelism);
    JavaSparkContext jsc = new JavaSparkContext(conf);
    SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
    options.setUsesProvidedSparkContext(true);
    options.setProvidedSparkContext(jsc);
    Pipeline p = Pipeline.create(options);
    // ...
    

    注意:Spark有其自身的限制,即给定键的所有分组值必须适合处理该键的机器上的内存。如果这不适用于您的数据集(即,您有非常强的密钥倾斜),那么您将 需要 按键组合而不是按键分组。

        2
  •  0
  •   Lukasz Cwik    7 年前

    reduceByKey in Spark类似于 Combine.perKey 在Apache Beam中,请参见 Programming Guide 例如。

    请注意 还原键 结合珀基 只有在每个键减少的情况下才有效,否则您将遇到相同的内存不足问题。例如,将每个键的所有整数组合到一个列表中不会减少内存使用量,但将每个键的整数求和会减少内存使用量。

    推荐文章