
Django / Celery / asyncio - daemonic processes are not allowed to have children

  •  AlexW  ·  7 years ago

    I can see that similar questions have been asked before, but those were using multiprocessing rather than an executor, so I'm not sure how to apply them here.

    A GitHub issue also says this was fixed in 4.1: https://github.com/celery/celery/issues/1709

    I am using:

    celery==4.1.1
    django-celery==3.2.1
    django-celery-beat==1.0.1
    django-celery-results==1.0.1
    

    My script is as follows; I have trimmed it down to just the relevant code.

    @asyncio.coroutine
    def snmp_get(ip, oid, snmp_user, snmp_auth, snmp_priv):
        results=[]
        snmpEngine = SnmpEngine()
        errorIndication, errorStatus, errorIndex, varBinds = yield from getCmd(
                                ...
                            )
        ...
            for varBind in varBinds:
                results.append(' = '.join([x.prettyPrint() for x in varBind]))
        snmpEngine.transportDispatcher.closeDispatcher()
        return results
    
    def create_link_data_record(link_data):
        obj = LinkData.objects.create(
            ...
        )
        return 'data polled for {} record {} created'.format(link_data.hostname, obj.id)
    
    
    async def retrieve_data(link, loop):
        from  concurrent.futures import ProcessPoolExecutor
        executor = ProcessPoolExecutor(2)
    
        poll_interval = 60
        results = []
        # credentials:
        ...
        print('polling data for {} on {}'.format(hostname,link_mgmt_ip))
    
        # create link data obj
        link_data = LinkDataObj()
        ...
    
        # first poll for speeds
    download_speed_data_poll1 = await snmp_get(link_mgmt_ip, down_speed_oid % link_index, snmp_user, snmp_auth, snmp_priv)
    
        # check we were able to poll
        if 'timeout' in str(get_snmp_value(download_speed_data_poll1)).lower():
            return 'timeout trying to poll {} - {}'.format(hostname ,link_mgmt_ip)
        upload_speed_data_poll1 = await snmp_get(link_mgmt_ip, up_speed_oid % link_index, snmp_user, snmp_auth, snmp_priv) 
    
        # wait for poll interval
        await asyncio.sleep(poll_interval)
    
        # second poll for speeds
        download_speed_data_poll2 = await snmp_get(link_mgmt_ip, down_speed_oid % link_index, snmp_user, snmp_auth, snmp_priv)
        upload_speed_data_poll2 = await snmp_get(link_mgmt_ip, up_speed_oid % link_index, snmp_user, snmp_auth, snmp_priv)    
    
        # create deltas for speed
        down_delta = int(get_snmp_value(download_speed_data_poll2)) - int(get_snmp_value(download_speed_data_poll1))
        up_delta = int(get_snmp_value(upload_speed_data_poll2)) - int(get_snmp_value(upload_speed_data_poll1)) 
        ...
        results.append(await loop.run_in_executor(executor, create_link_data_record, link_data))
        return results
    
    
    def get_link_data():  
        link_data = LinkTargets.objects.all() 
        # create loop
    loop = asyncio.get_event_loop()
    if loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # create tasks 
        tasks = [asyncio.ensure_future(retrieve_data(link, loop)) for link in link_data]
        if tasks:
            start = time.time()  
            done, pending = loop.run_until_complete(asyncio.wait(tasks))
            loop.close()  
    
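The failure can be reproduced without Celery or Django at all: a Celery prefork worker runs tasks inside daemonic child processes, and a daemonic process may not start children of its own, which is exactly what `ProcessPoolExecutor` tries to do. A minimal standalone sketch (names like `daemon_worker` are illustrative, not from the post above):

```python
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

def child_task():
    return 42

def daemon_worker(q):
    # This mimics a Celery prefork worker: we are inside a daemon process,
    # so the executor's attempt to start a grandchild process raises.
    try:
        with ProcessPoolExecutor(max_workers=1) as ex:
            q.put(ex.submit(child_task).result())
    except AssertionError as e:
        q.put(str(e))

def run_demo():
    q = mp.Queue()
    p = mp.Process(target=daemon_worker, args=(q,), daemon=True)
    p.start()
    p.join()
    return q.get()

if __name__ == "__main__":
    print(run_demo())  # -> daemonic processes are not allowed to have children
```

The same `AssertionError` surfaces here as in the Celery traceback below, because both paths end in `multiprocessing.Process.start()`.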

    The error below is raised from the run_in_executor call in the code above:

    [2018-05-24 14:13:00,840: ERROR/ForkPoolWorker-3] Task exception was never retrieved
        future: <Task finished coro=<retrieve_data() done, defined at /itapp/itapp/monitoring/jobs/link_monitoring.py:130> exception=AssertionError('daemonic processes are not allowed to have children',)>
        Traceback (most recent call last):
          File "/itapp/itapp/monitoring/jobs/link_monitoring.py", line 209, in retrieve_data
            link_data.last_change = await loop.run_in_executor(executor, timestamp, (link_data.link_target_id, link_data.service_status))
          File "/usr/local/lib/python3.6/asyncio/base_events.py", line 639, in run_in_executor
            return futures.wrap_future(executor.submit(func, *args), loop=self)
          File "/usr/local/lib/python3.6/concurrent/futures/process.py", line 466, in submit
            self._start_queue_management_thread()
          File "/usr/local/lib/python3.6/concurrent/futures/process.py", line 427, in _start_queue_management_thread
            self._adjust_process_count()
          File "/usr/local/lib/python3.6/concurrent/futures/process.py", line 446, in _adjust_process_count
            p.start()
          File "/usr/local/lib/python3.6/multiprocessing/process.py", line 103, in start
            'daemonic processes are not allowed to have children'
        AssertionError: daemonic processes are not allowed to have children
    
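One workaround often mentioned for this error (not part of the original question) is to start the worker with the solo pool, so tasks run in the worker's main, non-daemonic process, which is allowed to spawn children. A sketch, assuming the project module is `itapp` as the traceback paths suggest:

```shell
# run tasks in the worker's main process instead of daemonic forks;
# trades per-worker concurrency for the ability to spawn child processes
celery -A itapp worker --pool=solo --loglevel=info
```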
    1 Answer  |  7 years ago

  •   Tarun Lalwani  ·  7 years ago

    Try Celery 5-devel:

    pip install git+https://github.com/celery/celery@5.0-devel
    

    As per the following issue:

    https://github.com/celery/celery/issues/3884

    Celery 5.0 will support asyncio; it is not supported in current releases.

    There is also a related thread here:

    How to combine Celery with asyncio?
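Besides waiting for a future Celery release, the question's code can avoid the error on Celery 4.x by not spawning child processes at all: the record creation is a blocking I/O call, so a `ThreadPoolExecutor` can stand in for the `ProcessPoolExecutor`. A minimal sketch under that assumption (`create_record` and `poll_all` are illustrative stand-ins for the poster's helpers):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def create_record(hostname):
    # stand-in for the blocking ORM write in create_link_data_record
    return 'record created for {}'.format(hostname)

async def retrieve_data(loop, executor, hostname):
    # Threads are not child processes, so run_in_executor with a
    # ThreadPoolExecutor does not trip the daemonic-process check.
    return await loop.run_in_executor(executor, create_record, hostname)

def poll_all(hostnames):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    executor = ThreadPoolExecutor(max_workers=2)
    try:
        tasks = [retrieve_data(loop, executor, h) for h in hostnames]
        return loop.run_until_complete(asyncio.gather(*tasks))
    finally:
        loop.close()

if __name__ == "__main__":
    print(poll_all(['sw1', 'sw2']))
```

This only helps when the offloaded work is I/O-bound; CPU-bound work in threads would still contend for the GIL, which is presumably why the original code reached for a process pool.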