
Large JSON causes memory problems when exporting

  • SaiKiran  · 7 years ago

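    Question:

    I have an API that fetches data from multiple Elasticsearch indices and combines it into a single JSON record, which is returned when the API is called. The results fetched from the API are usually huge.

    The script below collects every response in a list named results. When a day's data is small, I have no problems. But when the data fetched for a day is large, the entire list sits in RAM and slows the system down.

    The main reason I build this list is to export it into a Mongo instance that lives in another network, which I cannot reach directly from my network.

    Code snippet: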

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    from __future__ import division, print_function, absolute_import
    import argparse
    import sys
    import logging
    import MySQLdb
    import requests
    import json
    import time
    
    
    
    _logger = logging.getLogger(__name__)
    
    
    def get_samples(date,end):
        """
        Get sample hashes from the database
    
        :param date: date of sample arrival
        :return list_of_hashes
        """
        try:
            results = []
            cur_time = time.time()
            with open('config.json','r') as c:
                config = json.load(c)
            _logger.info('Entering into database {}'.format(date))
            connection = MySQLdb.connect(config['malware_mysql'],"root","root","meta")
            cursor = connection.cursor()
            cursor.execute("SELECT MD5 from some where `Last_Seen` BETWEEN '{} 00:00:00' AND '{} 23:59:59'".format(date,end))
            hashes = cursor.fetchall()
            for hash in hashes:
                _logger.info('Hash {}'.format(hash[0]))
                try:
                    response = requests.get('http://{}:{}/some/{}'.format(config['a'],config['b'],hash[0]))
                    _logger.info('Result from API {}'.format(response))
                    if response.status_code == 200:
                        results.append(json.loads(response.text))
                    else:
                        _logger.error('Error in Querying API {} for hash {}'.format(response.status_code,hash))
                except Exception as e:
                    _logger.error('Error in querying database {} - {}'.format(hash,e))
            connection.close()
            with open('{}_{}.json'.format(date,end),'w') as f:
                f.write(json.dumps(results))
        except KeyboardInterrupt:
            print('Bye')
        except Exception as e:
            _logger.error('Error in querying database final {}'.format(e))
        return '{} completed'.format(date)
    
    
    def parse_args(args):
        """
        Parse command line parameters
    
        :param args: command line parameters as list of strings
        :return: command line parameters as :obj:`argparse.Namespace`
        """
        parser = argparse.ArgumentParser(
            description="Enter date to Export")
        parser.add_argument(
            dest="date",
            help="Date of Sample Arrival in format 2018-08-16",
            )
        parser.add_argument(
            dest="end",
            help="Date of Sample Arrival end",
            )
        return parser.parse_args(args)
    
    
    def main(args):
        args = parse_args(args)
        print("{} Samples are queried -- {}".format(args.date, get_samples(args.date,args.end)))
        _logger.info("Script ends here")
    
    
    def run():
        logging.basicConfig(level=logging.INFO, stream=sys.stdout)
        main(sys.argv[1:])
    
    
    if __name__ == "__main__":
        run()
    

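    What am I looking for?

    An alternative approach that keeps the entire list from sitting in RAM and slowing the system down. Any other suggestions that make the solution more efficient are also welcome.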

    1 answer

  •   Bruno A. ruddra    7 years ago

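    As far as I understand, you cannot connect directly to your Mongo DB, right? Could you run a MongoDB instance locally? That way you could use the Mongo Python library to save each result as you get it, and then export the data with mongoexport.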
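    A minimal sketch of that idea, assuming a MongoDB instance on localhost and the pymongo package. The database name export, the collection name samples, and both helper names are placeholders chosen for illustration; none of them come from the question.

```python
def open_collection(uri="mongodb://localhost:27017", db="export", coll="samples"):
    """Return a pymongo collection (URI and names are placeholders)."""
    from pymongo import MongoClient  # imported lazily; requires pymongo
    return MongoClient(uri)[db][coll]


def save_results(collection, documents):
    """Insert documents one at a time so they never accumulate in RAM.

    `documents` can be any iterable, for example a generator that yields
    one parsed API response at a time. Returns the number stored.
    """
    count = 0
    for doc in documents:
        collection.insert_one(doc)
        count += 1
    return count
```

    With the data in a local collection, a command along the lines of `mongoexport --db export --collection samples --out samples.json` should produce a file that can be carried over to the other network.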

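    Now, back to your question, here are a few suggestions:

    • Call connection.close() as soon as you have the information you need, that is, right after hashes = cursor.fetchall()
    • Replace json.loads(response.text) with response.json()
    • Instead of appending to results, write each result to the output file as soon as you receive it

    Putting it all together, and dropping the KeyboardInterrupt handling, only the get_samples function needs to change: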
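    The last suggestion can be sketched in isolation. The helper name write_json_array is hypothetical, and the example targets Python 3; it takes any iterable of records and writes them as a JSON array without building the full list first.

```python
import io
import json


def write_json_array(records, out_file):
    """Write `records` to `out_file` as a JSON array, one record at a time."""
    out_file.write('[')
    count = 0
    for record in records:
        # A comma goes before every record except the first, so the
        # array never ends with a trailing comma (which is invalid JSON).
        out_file.write(',\n' if count else '\n')
        out_file.write(json.dumps(record))
        count += 1
    out_file.write('\n]\n')
    return count


# Usage with an in-memory file; a real file object works the same way.
buf = io.StringIO()
write_json_array(({'md5': str(n)} for n in range(3)), buf)
```

    Because the records come from a generator, only one of them is held in memory at any moment.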

    def get_samples(date, end):
        with open('{}_{}.json'.format(date, end), 'w') as out_file:
            out_file.write('[\n')
            with open('config.json','r') as c:
                config = json.load(c)
            _logger.info('Entering into database {}'.format(date))
            connection = MySQLdb.connect(config['malware_mysql'],"root","root","meta")
            cursor = connection.cursor()
            cursor.execute(
                "SELECT MD5 from some where `Last_Seen` BETWEEN '{} 00:00:00' AND '{} 23:59:59'".format(date, end)
            )
            hashes = cursor.fetchall()
            # The hashes are all we need from MySQL, so close the connection early.
            connection.close()
            first = True
            for hash in hashes:
                _logger.info('Hash {}'.format(hash[0]))
                try:
                    response = requests.get('http://{}:{}/some/{}'.format(config['a'],config['b'],hash[0]))
                    _logger.info('Result from API {}'.format(response))
                    if response.status_code == 200:
                        # response.json() returns a Python object, so serialize it
                        # again before writing. The separator goes before every
                        # record except the first to avoid a trailing comma.
                        record = json.dumps(response.json())
                        if not first:
                            out_file.write(',\n')
                        out_file.write(record)
                        first = False
                    else:
                        _logger.error('Error in Querying API {} for hash {}'.format(response.status_code,hash))
                except Exception as e:
                    _logger.error('Error in querying database {} - {}'.format(hash,e))
            out_file.write('\n]\n')
        # main() prints this value, so keep returning it.
        return '{} completed'.format(date)
    

    I have not tried this code, so there may be a syntax error somewhere. Hopefully it gets you close enough.

    The requests library also has a streaming mode, which may help further.
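    A rough sketch of what streaming could look like, assuming the goal is to copy one large response body straight to disk. The helper names copy_body and download are made up for this example; stream=True and iter_content are part of the requests API.

```python
def copy_body(response, out_file, chunk_size=8192):
    """Copy a streamed response body to a binary file, chunk by chunk."""
    total = 0
    for chunk in response.iter_content(chunk_size=chunk_size):
        out_file.write(chunk)
        total += len(chunk)
    return total


def download(url, path):
    """Stream `url` to `path` without loading the whole body into memory."""
    import requests  # imported lazily; requires the requests package
    # stream=True defers downloading the body until it is iterated over.
    with requests.get(url, stream=True) as response:
        response.raise_for_status()
        with open(path, 'wb') as out_file:
            return copy_body(response, out_file)
```

    This writes the raw bytes without parsing them, so it only pays off when a single response is large. With many small responses, writing each one out as it arrives is likely the bigger win.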