代码之家  ›  专栏  ›  技术社区  ›  ankit12

消费者:与代理的连接丢失。尝试重新建立Rabbit MQ和Celery的连接

  •  0
  • ankit12  · 技术社区  · 3 年前

    我创建了一个小的Flask应用程序,我想在RabbitMQ的帮助下在Celery中执行一项任务。

    我已经在Windows 10系统中安装并创建了该用户,但每当我试图通过运行以下命令从Flask应用程序连接到RabbitMQ时,我都会收到下面发布的错误。

    celery -A  __init__.celery  worker -P solo  --loglevel INFO
    

    我的__init.py:

    from Util import make_celery
    from flask import Flask
    app.config['CELERY_RESULT_BACKEND'] ='rpc://' #{"broker_url":"", "result_backend":""}
    app.config['CELERY_BROKER_URL']  ='amqp://myuser:mypassword@localhost:5672/myvhost'
    celery = make_celery(app)
    app_ctx = app.app_context()
    app_ctx.push()
    

    网址:

    from celery import Celery
    
    def make_celery(app):
        celery = Celery(
            app.import_name,
            backend=app.config['CELERY_RESULT_BACKEND'],
            broker=app.config['CELERY_BROKER_URL']
        )
        celery.conf.update(app.config)
    
        class ContextTask(celery.Task):
            def __call__(self, *args, **kwargs):
                with app.app_context():
                    return self.run(*args, **kwargs)
    
        celery.Task = ContextTask
        return celery
    
    

    错误日志:

    [2023-05-20 17:41:00,805: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
    Traceback (most recent call last):
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\connection.py", line 514, in channel
        return self.channels[channel_id]
    KeyError: None
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\worker\consumer\consumer.py", line 332, in start
        blueprint.start(self)
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\bootsteps.py", line 116, in start
        step.start(parent)
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\worker\consumer\mingle.py", line 37, in start
        self.sync(c)
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\worker\consumer\mingle.py", line 41, in sync
        replies = self.send_hello(c)
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\worker\consumer\mingle.py", line 54, in send_hello
        replies = inspect.hello(c.hostname, our_revoked._data) or {}
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\app\control.py", line 389, in hello
        return self._request('hello', from_node=from_node, revoked=revoked)
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\app\control.py", line 106, in _request
        return self._prepare(self.app.control.broadcast(
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\app\control.py", line 741, in broadcast
        return self.mailbox(conn)._broadcast(
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\kombu\pidbox.py", line 328, in _broadcast
        chan = channel or self.connection.default_channel
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\kombu\connection.py", line 898, in default_channel
        self._default_channel = self.channel()
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\kombu\connection.py", line 281, in channel
        chan = self.transport.create_channel(self.connection)
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\kombu\transport\pyamqp.py", line 166, in create_channel
        return connection.channel()
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\connection.py", line 517, in channel
        channel.open()
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\channel.py", line 448, in open
        return self.send_method(
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\abstract_channel.py", line 79, in send_method
        return self.wait(wait, returns_tuple=returns_tuple)
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\abstract_channel.py", line 99, in wait
        self.connection.drain_events(timeout=timeout)
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\connection.py", line 525, in drain_events
        while not self.blocking_read(timeout):
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\connection.py", line 530, in blocking_read
        frame = self.transport.read_frame()
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\transport.py", line 294, in read_frame
        frame_header = read(7, True)
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\transport.py", line 627, in _read
        s = recv(n - len(rbuf))
    ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
    [2023-05-20 17:41:00,808: WARNING/MainProcess] C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\worker\consumer\consumer.py:367: CPendingDeprecationWarning:
    
    In Celery 5.1 we introduced an optional breaking change which
    on connection loss cancels all currently executed tasks with late acknowledgement enabled.
    These tasks cannot be acknowledged as the connection is gone, and the tasks are automatically redelivered back to the queue.
    You can enable this behavior using the worker_cancel_long_running_tasks_on_connection_loss setting.
    In Celery 5.1 it is set to False by default. The setting will be set to True by default in Celery 6.0.
    
      warnings.warn(CANCEL_TASKS_BY_DEFAULT, CPendingDeprecationWarning)
    
    [2023-05-20 17:41:00,869: INFO/MainProcess] Connected to amqp://myuser:**@127.0.0.1:5672/myvhost
    [2023-05-20 17:41:00,931: INFO/MainProcess] mingle: searching for neighbors
    [2023-05-20 17:41:00,932: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
    Traceback (most recent call last):
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\connection.py", line 514, in channel
        return self.channels[channel_id]
    KeyError: None
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\worker\consumer\consumer.py", line 332, in start
        blueprint.start(self)
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\bootsteps.py", line 116, in start
        step.start(parent)
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\worker\consumer\mingle.py", line 37, in start
        self.sync(c)
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\worker\consumer\mingle.py", line 41, in sync
        replies = self.send_hello(c)
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\worker\consumer\mingle.py", line 54, in send_hello
        replies = inspect.hello(c.hostname, our_revoked._data) or {}
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\app\control.py", line 389, in hello
        return self._request('hello', from_node=from_node, revoked=revoked)
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\app\control.py", line 106, in _request
        return self._prepare(self.app.control.broadcast(
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\app\control.py", line 741, in broadcast
        return self.mailbox(conn)._broadcast(
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\kombu\pidbox.py", line 328, in _broadcast
        chan = channel or self.connection.default_channel
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\kombu\connection.py", line 898, in default_channel
        self._default_channel = self.channel()
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\kombu\connection.py", line 281, in channel
        chan = self.transport.create_channel(self.connection)
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\kombu\transport\pyamqp.py", line 166, in create_channel
        return connection.channel()
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\connection.py", line 517, in channel
        channel.open()
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\channel.py", line 448, in open
        return self.send_method(
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\abstract_channel.py", line 79, in send_method
        return self.wait(wait, returns_tuple=returns_tuple)
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\abstract_channel.py", line 99, in wait
        self.connection.drain_events(timeout=timeout)
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\connection.py", line 525, in drain_events
        while not self.blocking_read(timeout):
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\connection.py", line 530, in blocking_read
        frame = self.transport.read_frame()
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\transport.py", line 294, in read_frame
        frame_header = read(7, True)
      File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\transport.py", line 627, in _read
        s = recv(n - len(rbuf))
    ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
    
    0 回复  |  直到 3 年前