
Python: How to make a for loop asynchronous

  • Ninja Warrior 11  ·  7 years ago

    Is it possible to iterate over a generator in Python using asyncio? I wrote a simple function called hash_generator() that returns a unique hash. I decided to benchmark the loop, and it takes roughly 8 seconds to iterate over and print 100,000 hashes. Can I run it asynchronously to cut that time down? I read the asyncio documentation but got confused. I want to explore async, and I would like to start with this problem.

    import hashlib
    import string
    import random
    import time
    
    
    def hash_generator():
        """Return a unique hash"""
        prefix = int(time.time())
        suffix = (random.choice(string.ascii_letters) for i in range(10))
        key = ".".join([str(prefix), str("".join(suffix))])
        value = hashlib.blake2b(key.encode(), digest_size=6).hexdigest()
        return value.upper()
    
    
    """Iterating the hashes and printing the time it loaded"""
    hashes = (hash_generator() for i in range(100000))
    time_before = time.time()
    [print(i) for i in hashes]
    time_after = time.time()
    difference = time_after - time_before
    print('Loaded in {0:.2f}sec'.format(difference))
    # 40503CBA2DAE
    # ...
    # A511068F4945
    # Loaded in 8.81sec
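
    For illustration, below is roughly the kind of asyncio version I have in mind. It is only a sketch: it reuses hash_generator() from above and pushes every call onto the default thread-pool executor with run_in_executor, and I am not sure this is even the right approach, since the work is CPU-bound rather than I/O-bound.

    import asyncio


    async def main():
        loop = asyncio.get_event_loop()
        # Offload each blocking hash_generator() call to the default
        # ThreadPoolExecutor and wait for all of them to complete.
        jobs = [loop.run_in_executor(None, hash_generator) for _ in range(100000)]
        for value in await asyncio.gather(*jobs):
            print(value)


    if __name__ == "__main__":
        asyncio.get_event_loop().run_until_complete(main())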
    

    Edit 1

    The random.choice() call turned out to be the main reason the program takes so long. I rewrote the function below, using the current time and a random string from os.urandom (low collision risk) as the value. I tried multithreading, but instead of making the task run faster it was actually slower. Any suggestions for refactoring the code below are welcome.

    import hashlib
    import time
    import os
    import timeit
    
    
    def hash_generator():
        """Return a unique hash"""
        prefix = str(time.time())
        suffix = str(os.urandom(10))
        key = "".join([prefix, suffix])
        value = hashlib.blake2b(key.encode(), digest_size=6).hexdigest()
        return value.upper()
    
    
    """Iterating the hashes and printing the time it loaded"""
    print(timeit.timeit(hash_generator, number=100000), "sec")
    # 0.497149389999322 sec
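
    For reference, the multithreaded attempt I mentioned looked roughly like the sketch below (an illustration using concurrent.futures.ThreadPoolExecutor and the hash_generator() above, not the exact code I ran). Because the hashing is CPU-bound work on tiny inputs, the threads are largely serialized by the GIL.

    from concurrent import futures
    import timeit


    def threaded_hashes(count=100000, workers=4):
        """Run hash_generator() (defined above) on a thread pool."""
        with futures.ThreadPoolExecutor(max_workers=workers) as executor:
            jobs = [executor.submit(hash_generator) for _ in range(count)]
            return [job.result() for job in jobs]


    print(timeit.timeit(lambda: threaded_hashes(), number=1), "sec")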
    

    Edit 2

    With the help of Jack Taylor and the Stack Overflow community, I was able to beat the sequential version by using multiprocessing once the iteration count goes beyond about 1,000,000. I benchmarked the code below.

    import hashlib
    import time
    import os
    import timeit
    import multiprocessing
    
    
    def hash_generator(_=None):
        """Return a unique hash"""
        prefix = str(time.time())
        suffix = str(os.urandom(10))
        key = "".join([prefix, suffix])
        value = hashlib.blake2b(key.encode(), digest_size=6).hexdigest()
        return value.upper()
    
    
    # Allows for the safe importing of the main module
    if __name__ == "__main__":
        start_time = time.time()
        number_processes = 4
        iteration = 10000000
        pool = multiprocessing.Pool(number_processes)
        results = pool.map(hash_generator, range(iteration))
        pool.close()
        pool.join()
        end_time = time.time()
        pool_runtime = end_time - start_time
        print('(Pool) Loaded in: {0:.5f} sec'.format(pool_runtime))
    
        ordinary_runtime = timeit.timeit(hash_generator, number=iteration)
        print('(Ordinary) Loaded in: {0:.5f} sec'.format(ordinary_runtime))
    

    iteration     (Pool) loaded in    (Ordinary) loaded in
    10            1.20685 sec         0.00023 sec
    1,000         0.72233 sec         0.01767 sec
    1,000         0.99571 sec         0.01208 sec
    10,000        1.07876 sec         0.12652 sec
    100,000       1.57068 sec         1.23418 sec
    1,000,000     4.28724 sec         11.56332 sec
    10,000,000    27.26819 sec        132.68170 sec
    
    1 Answer  |  7 years ago
  •   Jack Taylor  ·  7 years ago

    It looks like you may actually be better off with the sequential version. The conventional wisdom is that, in Python, I/O-bound jobs (file reads/writes, networking) can be sped up with an event loop or multiple threads, while CPU-bound jobs (like computing hashes) can be sped up with multiple processes.

    However, I took your version, rewrote it with concurrent.futures and a process pool, and instead of speeding it up it made it about 10 times slower.

    Here is the code:

    from concurrent import futures
    import hashlib
    import string
    import random
    import time
    
    def hash_generator():
        """Return a unique hash"""
        prefix = int(time.time())
        suffix = (random.choice(string.ascii_letters) for i in range(10))
        key = ".".join([str(prefix), str("".join(suffix))])
        value = hashlib.blake2b(key.encode(), digest_size=6).hexdigest()
        return value.upper()
    
    def main(workers = None):
        """Iterating the hashes and printing the time it loaded"""
        time_before = time.time()
        with futures.ProcessPoolExecutor(workers) as executor:
            worker_count = executor._max_workers
            jobs = (executor.submit(hash_generator) for i in range(100000))
            for future in futures.as_completed(jobs):
                print(future.result())
        time_after = time.time()
        difference = time_after - time_before
        print('Loaded in {0:.2f}sec with {1} workers'.format(difference, worker_count))
    
    if __name__ == '__main__':
        main()
    
    # 2BD6056CC0B4
    # ...
    # D0A6707225EB
    # Loaded in 50.74sec with 4 workers
    

    With multiple processes there is overhead from starting and stopping the worker processes and from inter-process communication, which is probably why the multiprocessing version here ends up slower than the sequential one even though it uses all of the CPU cores.
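
    If you stay with multiprocessing, one way to shrink that overhead is to hand each worker a large batch of work instead of one item per task, so far fewer messages cross the process boundary. Below is a rough sketch; the hash_batch helper is hypothetical, it reuses hash_generator from above, and the best batch size will depend on your machine.

    import multiprocessing


    def hash_batch(count):
        """Produce `count` hashes in a single worker call."""
        return [hash_generator() for _ in range(count)]


    if __name__ == "__main__":
        processes = 4
        total = 1000000
        with multiprocessing.Pool(processes) as pool:
            # Only `processes` tasks are sent, so start-up cost and
            # inter-process traffic are amortized over many hashes.
            batches = pool.map(hash_batch, [total // processes] * processes)
        hashes = [h for batch in batches for h in batch]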

    You could also try splitting the work across multiple machines with a cluster, and/or rewriting the algorithm in a lower-level language (Go looks like a good choice to me). Whether that would be worth your time, I can't say.