
Read a zip file from S3 without downloading the whole file

  •  6
  • vy32  · 7 years ago

    We have zip files that are 5-10 GB in size. A typical zip file has 5-10 internal files, each 1-5 GB uncompressed.

    I have a nice set of Python tools for reading these files. Basically, I can open a filename and, if it is a zip file, the tools search inside the zip file and open the compressed file. It's all fairly transparent.

    I want to store these files as compressed zip files on Amazon S3. I can fetch ranges of S3 files, so it should be possible to fetch the zip central directory (it's at the end of the file, so I can just read the last 64 KiB), find the component I want, download that, and stream it directly to the calling process.
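
    For reference, a minimal sketch of such a ranged GET (assuming boto3, with hypothetical bucket and key names):

    import boto3

    s3 = boto3.client("s3")
    bucket, key = "my-bucket", "path/to/archive.zip"   # hypothetical names

    # find the object's size, then fetch only its last 64 KiB with a ranged GET
    size = s3.head_object(Bucket=bucket, Key=key)["ContentLength"]
    start = max(0, size - 64 * 1024)
    tail = s3.get_object(Bucket=bucket, Key=key,
                         Range="bytes={}-{}".format(start, size - 1))["Body"].read()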

    So my question is: how do I do this through the standard Python zipfile API? It isn't documented how to replace the filesystem transport with an arbitrary object that supports POSIX semantics. Is this possible without rewriting the module?

    2 Answers  |  last active 6 years ago
        1
  •  2
  •   Janaka Bandara    7 years ago

    Here's an approach that does not need to fetch the entire file (the full version is available here).

    It does require boto (or boto3), though (unless you can mimic the ranged GETs via the AWS CLI, which I guess is quite possible as well).

    import sys
    import zlib
    import zipfile
    import io
    
    import boto
    from boto.s3.connection import OrdinaryCallingFormat
    
    
    # range-fetches an S3 key
    def fetch(key, start, length):
        end = start + length - 1
        return key.get_contents_as_string(headers={"Range": "bytes=%d-%d" % (start, end)})
    
    
    # parses 2 or 4 little-endian bytes into their corresponding integer value
    def parse_int(data):
        val = data[0] + (data[1] << 8)
        if len(data) > 3:
            val += (data[2] << 16) + (data[3] << 24)
        return val
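    # (equivalently, with `import struct`: struct.unpack("<H", data)[0] parses
    # 2 little-endian bytes and struct.unpack("<I", data)[0] parses 4)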
    
    
    """
    bucket: name of the bucket
    key:    path to zipfile inside bucket
    entry:  pathname of zip entry to be retrieved (path/to/subdir/file.name)    
    """
    
    # OrdinaryCallingFormat prevents certificate errors on bucket names with dots
    # https://stackoverflow.com/questions/51604689/read-zip-files-from-amazon-s3-using-boto3-and-python#51605244
    _bucket = boto.connect_s3(calling_format=OrdinaryCallingFormat()).get_bucket(bucket)
    _key = _bucket.get_key(key)
    
    # fetch the last 22 bytes (end-of-central-directory record; assuming the comment field is empty)
    size = _key.size
    eocd = fetch(_key, size - 22, 22)
    
    # start offset and size of the central directory
    cd_start = parse_int(eocd[16:20])
    cd_size = parse_int(eocd[12:16])
    
    # fetch central directory, append EOCD, and open as zipfile!
    cd = fetch(_key, cd_start, cd_size)
    zf = zipfile.ZipFile(io.BytesIO(cd + eocd))
    
    
    for zi in zf.filelist:
        if zi.filename == entry:
            # local file header starting at file name length + file content
            # (so we can reliably skip file name and extra fields)
    
            # in our "mock" zipfile, `header_offset`s are negative (probably because the leading content is missing)
            # so we have to add to it the CD start offset (`cd_start`) to get the actual offset
    
            file_head = fetch(_key, cd_start + zi.header_offset + 26, 4)
            name_len = parse_int(file_head[0:2])
            extra_len = parse_int(file_head[2:4])
    
            content = fetch(_key, cd_start + zi.header_offset + 30 + name_len + extra_len, zi.compress_size)
    
            # now `content` has the file entry you were looking for!
            # you should probably decompress it in context before passing it to some other program
    
            if zi.compress_type == zipfile.ZIP_DEFLATED:
                print(zlib.decompressobj(-15).decompress(content))
            else:
                print(content)
            break
    

    In your case you may need to write the fetched content to a local file (due to the large size), unless memory usage is not a concern.
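
    For example, here is a minimal sketch of that idea (reusing fetch(), _key, cd_start, zi, name_len and extra_len from the code above, and assuming a DEFLATE-compressed entry) that streams the entry to a local file in 1 MiB ranged GETs:

    # stream a large DEFLATE entry to disk chunk by chunk instead of
    # holding the whole compressed body in memory at once
    CHUNK = 1024 * 1024
    data_start = cd_start + zi.header_offset + 30 + name_len + extra_len
    decomp = zlib.decompressobj(-15)    # raw DEFLATE, no zlib header
    with open(zi.filename.split("/")[-1], "wb") as out:
        for off in range(0, zi.compress_size, CHUNK):
            part = fetch(_key, data_start + off, min(CHUNK, zi.compress_size - off))
            out.write(decomp.decompress(part))
        out.write(decomp.flush())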

        2
  •  2
  •   parsley72    6 years ago

    So here's code that allows you to open a file on Amazon S3 as if it were a normal file. Notice that I use the aws command rather than the boto3 Python module. (I don't have access to boto3.) You can open the file and seek on it. The file is cached locally. If you open the file with the Python zipfile API and it's a zipfile, then you can read the individual parts. You cannot write, however, because S3 doesn't support partial writes.

    Separately, I implement s3open(), which can open a file for reading or writing, but which doesn't implement the seek interface that ZipFile requires.

    from urllib.parse import urlparse
    from subprocess import Popen, PIPE
    import json
    import tempfile
    
    # Tools for reading and writing files on Amazon S3 without boto or boto3
    # http://boto.cloudhackers.com/en/latest/s3_tut.html
    # but it is easier to use the aws cli, since it's configured to work.
    
    def s3open(path, mode="r", encoding=None):
        """
        Open an s3 file for reading or writing. Can handle any size, but cannot seek.
        We could use boto.
        http://boto.cloudhackers.com/en/latest/s3_tut.html
        but it is easier to use the aws cli, since it is present and more likely to work.
        """
        if "b" in mode:
            assert encoding is None
        else:
            if encoding is None:
                encoding = "utf-8"
        assert 'a' not in mode
        assert '+' not in mode
    
        if "r" in mode:
            p = Popen(['aws','s3','cp',path,'-'],stdout=PIPE,encoding=encoding)
            return p.stdout
    
        elif "w" in mode:
            p = Popen(['aws','s3','cp','-',path],stdin=PIPE,encoding=encoding)
            return p.stdin
        else:
            raise RuntimeError("invalid mode:{}".format(mode))
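
    # hypothetical usage: the returned pipe is line-iterable, but not seekable
    #   for line in s3open("s3://my-bucket/logs/app.log"):
    #       process(line)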
    
    
    
    
    CACHE_SIZE=4096                 # big enough for front and back caches
    MAX_READ=65536*16
    debug=False
    class S3File:
        """Open an S3 file that can be seeked. This is done by caching to the local file system."""
        def __init__(self,name,mode='rb'):
            self.name   = name
            self.url    = urlparse(name)
            if self.url.scheme != 's3':
                raise RuntimeError("url scheme is {}; expecting s3".format(self.url.scheme))
            self.bucket = self.url.netloc
            self.key    = self.url.path[1:]
            self.fpos   = 0
            self.tf     = tempfile.NamedTemporaryFile()
            cmd = ['aws','s3api','list-objects','--bucket',self.bucket,'--prefix',self.key,'--output','json']
            data = json.loads(Popen(cmd,encoding='utf8',stdout=PIPE).communicate()[0])
            file_info = data['Contents'][0]
            self.length = file_info['Size']
            self.ETag   = file_info['ETag']
    
            # Load the caches
    
        self.frontcache = self._readrange(0,CACHE_SIZE) # cache the first CACHE_SIZE bytes of the file
            if self.length > CACHE_SIZE:
                self.backcache_start = self.length-CACHE_SIZE
                if debug: print("backcache starts at {}".format(self.backcache_start))
                self.backcache  = self._readrange(self.backcache_start,CACHE_SIZE)
            else:
                self.backcache  = None
    
        def _readrange(self,start,length):
            # This is gross; we copy everything to the named temporary file, rather than a pipe
            # because the pipes weren't showing up in /dev/fd/?
            # We probably want to cache also... That's coming
            cmd = ['aws','s3api','get-object','--bucket',self.bucket,'--key',self.key,'--output','json',
                   '--range','bytes={}-{}'.format(start,start+length-1),self.tf.name]
            if debug:print(cmd)
            data = json.loads(Popen(cmd,encoding='utf8',stdout=PIPE).communicate()[0])
            if debug:print(data)
            self.tf.seek(0)         # go to the beginning of the data just read
            return self.tf.read(length) # and read that much
    
        def __repr__(self):
            return "FakeFile<name:{} url:{}>".format(self.name,self.url)
    
        def read(self,length=-1):
            # If length==-1, figure out the max we can read to the end of the file
            if length==-1:
                length = min(MAX_READ, self.length - self.fpos)  # bytes remaining to end of file
    
            if debug:
                print("read: fpos={}  length={}".format(self.fpos,length))
            # Can we satisfy from the front cache?
            if self.fpos < CACHE_SIZE and self.fpos+length < CACHE_SIZE:
                if debug:print("front cache")
                buf = self.frontcache[self.fpos:self.fpos+length]
                self.fpos += len(buf)
                if debug:print("return 1: buf=",buf)
                return buf
    
            # Can we satisfy from the back cache?
            if self.backcache and (self.length - CACHE_SIZE < self.fpos):
                if debug:print("back cache")
                buf = self.backcache[self.fpos - self.backcache_start:self.fpos - self.backcache_start + length]
                self.fpos += len(buf)
                if debug:print("return 2: buf=",buf)
                return buf
    
            buf = self._readrange(self.fpos, length)
            self.fpos += len(buf)
            if debug:print("return 3: buf=",buf)
            return buf
    
        def seek(self,offset,whence=0):
            if debug:print("seek({},{})".format(offset,whence))
            if whence==0:
                self.fpos = offset
            elif whence==1:
                self.fpos += offset
            elif whence==2:
                self.fpos = self.length + offset
            else:
                raise RuntimeError("whence={}".format(whence))
            if debug:print("   ={}  (self.length={})".format(self.fpos,self.length))
    
        def tell(self):
            return self.fpos
    
        def write(self, data):
            raise RuntimeError("Write not supported")
    
        def flush(self):
            raise RuntimeError("Flush not supported")
    
        def close(self):
            return
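
    For instance, a hypothetical usage sketch (assuming an s3://my-bucket/path/archive.zip object exists) that hands an S3File to the standard zipfile API:

    import zipfile

    # ranges are fetched on demand (plus the small front/back caches),
    # so listing the archive never downloads the whole object
    f = S3File("s3://my-bucket/path/archive.zip")    # hypothetical URL
    with zipfile.ZipFile(f) as zf:
        print(zf.namelist())              # listing needs only the central directory
        with zf.open(zf.namelist()[0]) as member:
            head = member.read(4096)      # read the first 4 KiB of the first entry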