代码之家  ›  专栏  ›  技术社区  ›  Alex Q

从S3通配符加载文件时出现火花错误

  •  3
  • Alex Q  · 技术社区  · 10 年前

    我正在使用pyspark shell,并尝试使用spark的文件通配符功能从S3读取数据,但我得到了以下错误:

    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 1.2.0
          /_/
    
    Using Python version 2.7.6 (default, Jul 24 2015 16:07:07)
    SparkContext available as sc.
    >>> sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", 'AWS_ACCESS_KEY_ID')
    >>> sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", 'AWS_SECRET_ACCESS_KEY')
    >>> sc.textFile("s3n://myBucket/path/files-*", use_unicode=False).count()
    16/01/07 18:03:02 INFO MemoryStore: ensureFreeSpace(37645) called with curMem=83944, maxMem=278019440
    16/01/07 18:03:02 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 36.8 KB, free 265.0 MB)
    16/01/07 18:03:02 INFO MemoryStore: ensureFreeSpace(5524) called with curMem=121589, maxMem=278019440
    16/01/07 18:03:02 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 5.4 KB, free 265.0 MB)
    16/01/07 18:03:02 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on salve1:48235 (size: 5.4 KB, free: 265.1 MB)
    16/01/07 18:03:02 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
    16/01/07 18:03:02 INFO SparkContext: Created broadcast 2 from textFile at NativeMethodAccessorImpl.java:-2
    16/01/07 18:03:03 WARN RestS3Service: Response '/path' - Unexpected response code 404, expected 200
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/spark-1.2.0-bin-1.0.4/python/pyspark/rdd.py", line 819, in count
        return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
      File "/spark-1.2.0-bin-1.0.4/python/pyspark/rdd.py", line 810, in sum
        return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
      File "/spark-1.2.0-bin-1.0.4/python/pyspark/rdd.py", line 715, in reduce
        vals = self.mapPartitions(func).collect()
      File "/spark-1.2.0-bin-1.0.4/python/pyspark/rdd.py", line 676, in collect
        bytesInJava = self._jrdd.collect().iterator()
      File "/spark-1.2.0-bin-1.0.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
      File "/spark-1.2.0-bin-1.0.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling o65.collect.
    : org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException: Failed to sanitize XML document destined for handler class org.jets3t.service.impl.rest.XmlResponsesSaxParser$ListBucketHandler
            at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:197)
            at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:166)
            at sun.reflect.GeneratedMethodAccessor40.invoke(Unknown Source)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:497)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
            at org.apache.hadoop.fs.s3native.$Proxy7.list(Unknown Source)
            at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:375)
            at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:842)
            at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:902)
            at org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1032)
            at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:987)
            at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:177)
            at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
            at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201)
            at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
            at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
            at scala.Option.getOrElse(Option.scala:120)
            at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
            at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
            at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
            at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
            at scala.Option.getOrElse(Option.scala:120)
            at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
            at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:57)
            at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
            at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
            at scala.Option.getOrElse(Option.scala:120)
            at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:1352)
            at org.apache.spark.rdd.RDD.collect(RDD.scala:780)
            at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:309)
            at org.apache.spark.api.java.JavaRDD.collect(JavaRDD.scala:32)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:497)
            at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
            at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
            at py4j.Gateway.invoke(Gateway.java:259)
            at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
            at py4j.commands.CallCommand.execute(CallCommand.java:79)
            at py4j.GatewayConnection.run(GatewayConnection.java:207)
            at java.lang.Thread.run(Thread.java:745)
    Caused by: org.jets3t.service.S3ServiceException: Failed to sanitize XML document destined for handler class org.jets3t.service.impl.rest.XmlResponsesSaxParser$ListBucketHandler
            at org.jets3t.service.impl.rest.XmlResponsesSaxParser.sanitizeXmlDocument(XmlResponsesSaxParser.java:179)
            at org.jets3t.service.impl.rest.XmlResponsesSaxParser.parseListBucketObjectsResponse(XmlResponsesSaxParser.java:198)
            at org.jets3t.service.impl.rest.httpclient.RestS3Service.listObjectsInternal(RestS3Service.java:1090)
            at org.jets3t.service.impl.rest.httpclient.RestS3Service.listObjectsChunkedImpl(RestS3Service.java:1056)
            at org.jets3t.service.S3Service.listObjectsChunked(S3Service.java:1328)
            at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:181)
            ... 44 more
    Caused by: java.lang.OutOfMemoryError: Java heap space
            at java.util.Arrays.copyOf(Arrays.java:3332)
            at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
            at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
            at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:569)
            at java.lang.StringBuffer.append(StringBuffer.java:369)
            at org.jets3t.service.impl.rest.XmlResponsesSaxParser.sanitizeXmlDocument(XmlResponsesSaxParser.java:160)
            at org.jets3t.service.impl.rest.XmlResponsesSaxParser.parseListBucketObjectsResponse(XmlResponsesSaxParser.java:198)
            at org.jets3t.service.impl.rest.httpclient.RestS3Service.listObjectsInternal(RestS3Service.java:1090)
            at org.jets3t.service.impl.rest.httpclient.RestS3Service.listObjectsChunkedImpl(RestS3Service.java:1056)
            at org.jets3t.service.S3Service.listObjectsChunked(S3Service.java:1328)
            at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:181)
            at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:166)
            at sun.reflect.GeneratedMethodAccessor40.invoke(Unknown Source)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:497)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
            at org.apache.hadoop.fs.s3native.$Proxy7.list(Unknown Source)
            at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:375)
            at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:842)
            at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:902)
            at org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1032)
            at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:987)
            at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:177)
            at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
            at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201)
            at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
            at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
            at scala.Option.getOrElse(Option.scala:120)
            at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
            at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
            at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    

    当我尝试加载单个文件(不使用通配符)时,代码会起作用。由于我需要读取大约100k个文件,我想知道将所有文件加载到RDD中的最佳方法是什么。


    使现代化

    在我看来,问题是我使用的密钥前缀在包含我所有文件的s3“目录”中有超过300k个文件。我的文件以日期为后缀。

    s3://myBucket/path/files-2016-01-01-02-00
    s3://myBucket/path/files-2016-01-01-02-01
    s3://myBucket/path/files-2016-01-01-03-00
    s3://myBucket/path/files-2016-01-01-03-01
    

    我试图使用通配符只选择一些日期为 s3n://myBucket/path/files-2016-01-01-03-* 当我打开调试日志时,我看到spark正在s3“目录”中列出所有文件( s3://myBucket/path/ )而不仅仅是具有我指定的密钥前缀的文件( s3://myBucket/path/files-2016-01-01-03- ). 因此,即使我只尝试读取2个文件,所有300k文件都被列出,这可能是导致内存不足的原因。

    3 回复  |  直到 10 年前
        1
  •  2
  •   Alex Q    10 年前

    我已经直接从S3中列出了我的文件,然后制作了一个包含确切文件名的RDD,到目前为止它对我有效。

    raw_file_list = subprocess.Popen("env AWS_ACCESS_KEY_ID="myId" AWS_SECRET_ACCESS_KEY="myKey" aws s3 ls s3://myBucket/path/files-2016-01-01-02", shell=True, stdout=subprocess.PIPE).stdout.read().strip().split('\n')
    s3_file_list = sc.parallelize(raw_file_list).map(lambda line: "s3n://myBucket/path/%s" % line.split()[3]).collect()
    rdd = sc.textFile(','.join(s3_file_list), use_unicode=False)
    
        2
  •  1
  •   Durga Viswanath Gadiraju    10 年前

    它正在抛出内存不足的问题。因此,首先尝试将模式限制在较少的文件中,看看它是否解决了问题。

        3
  •  0
  •   jbrown    10 年前

    Spark在一起加载大量小文件时遇到了一个愚蠢的问题,因为它会为每个文件广播一些数据。这 可以 在几天前发布的1.6.0中进行了修复。目前,我希望您的代码正在加载每个文件,并在引擎盖下将RDD联合在一起。

    我使用的解决方案是将所有文件移动到S3上的一个目录中,然后将其作为一个glob传递,例如: s3n://myBucket/path/input-files/* 这样,就spark而言,您只加载了一个路径,它不会为该路径中的每个文件创建广播变量。