我创建了一个小的Flask应用程序,我想在RabbitMQ的帮助下在Celery中执行一项任务。
我已经在Windows 10系统中安装并创建了该用户,但每当我试图通过运行以下命令从Flask应用程序连接到RabbitMQ时,我都会收到下面发布的错误。
celery -A __init__.celery worker -P solo --loglevel INFO
我的__init.py:
from Util import make_celery
from flask import Flask
app.config['CELERY_RESULT_BACKEND'] ='rpc://' #{"broker_url":"", "result_backend":""}
app.config['CELERY_BROKER_URL'] ='amqp://myuser:mypassword@localhost:5672/myvhost'
celery = make_celery(app)
app_ctx = app.app_context()
app_ctx.push()
网址:
from celery import Celery
def make_celery(app):
celery = Celery(
app.import_name,
backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL']
)
celery.conf.update(app.config)
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery
错误日志:
[2023-05-20 17:41:00,805: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\connection.py", line 514, in channel
return self.channels[channel_id]
KeyError: None
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\worker\consumer\consumer.py", line 332, in start
blueprint.start(self)
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\bootsteps.py", line 116, in start
step.start(parent)
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\worker\consumer\mingle.py", line 37, in start
self.sync(c)
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\worker\consumer\mingle.py", line 41, in sync
replies = self.send_hello(c)
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\worker\consumer\mingle.py", line 54, in send_hello
replies = inspect.hello(c.hostname, our_revoked._data) or {}
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\app\control.py", line 389, in hello
return self._request('hello', from_node=from_node, revoked=revoked)
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\app\control.py", line 106, in _request
return self._prepare(self.app.control.broadcast(
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\app\control.py", line 741, in broadcast
return self.mailbox(conn)._broadcast(
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\kombu\pidbox.py", line 328, in _broadcast
chan = channel or self.connection.default_channel
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\kombu\connection.py", line 898, in default_channel
self._default_channel = self.channel()
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\kombu\connection.py", line 281, in channel
chan = self.transport.create_channel(self.connection)
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\kombu\transport\pyamqp.py", line 166, in create_channel
return connection.channel()
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\connection.py", line 517, in channel
channel.open()
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\channel.py", line 448, in open
return self.send_method(
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\abstract_channel.py", line 79, in send_method
return self.wait(wait, returns_tuple=returns_tuple)
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\abstract_channel.py", line 99, in wait
self.connection.drain_events(timeout=timeout)
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\connection.py", line 525, in drain_events
while not self.blocking_read(timeout):
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\connection.py", line 530, in blocking_read
frame = self.transport.read_frame()
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\transport.py", line 294, in read_frame
frame_header = read(7, True)
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\transport.py", line 627, in _read
s = recv(n - len(rbuf))
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
[2023-05-20 17:41:00,808: WARNING/MainProcess] C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\worker\consumer\consumer.py:367: CPendingDeprecationWarning:
In Celery 5.1 we introduced an optional breaking change which
on connection loss cancels all currently executed tasks with late acknowledgement enabled.
These tasks cannot be acknowledged as the connection is gone, and the tasks are automatically redelivered back to the queue.
You can enable this behavior using the worker_cancel_long_running_tasks_on_connection_loss setting.
In Celery 5.1 it is set to False by default. The setting will be set to True by default in Celery 6.0.
warnings.warn(CANCEL_TASKS_BY_DEFAULT, CPendingDeprecationWarning)
[2023-05-20 17:41:00,869: INFO/MainProcess] Connected to amqp://myuser:**@127.0.0.1:5672/myvhost
[2023-05-20 17:41:00,931: INFO/MainProcess] mingle: searching for neighbors
[2023-05-20 17:41:00,932: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\connection.py", line 514, in channel
return self.channels[channel_id]
KeyError: None
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\worker\consumer\consumer.py", line 332, in start
blueprint.start(self)
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\bootsteps.py", line 116, in start
step.start(parent)
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\worker\consumer\mingle.py", line 37, in start
self.sync(c)
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\worker\consumer\mingle.py", line 41, in sync
replies = self.send_hello(c)
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\worker\consumer\mingle.py", line 54, in send_hello
replies = inspect.hello(c.hostname, our_revoked._data) or {}
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\app\control.py", line 389, in hello
return self._request('hello', from_node=from_node, revoked=revoked)
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\app\control.py", line 106, in _request
return self._prepare(self.app.control.broadcast(
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\celery\app\control.py", line 741, in broadcast
return self.mailbox(conn)._broadcast(
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\kombu\pidbox.py", line 328, in _broadcast
chan = channel or self.connection.default_channel
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\kombu\connection.py", line 898, in default_channel
self._default_channel = self.channel()
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\kombu\connection.py", line 281, in channel
chan = self.transport.create_channel(self.connection)
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\kombu\transport\pyamqp.py", line 166, in create_channel
return connection.channel()
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\connection.py", line 517, in channel
channel.open()
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\channel.py", line 448, in open
return self.send_method(
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\abstract_channel.py", line 79, in send_method
return self.wait(wait, returns_tuple=returns_tuple)
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\abstract_channel.py", line 99, in wait
self.connection.drain_events(timeout=timeout)
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\connection.py", line 525, in drain_events
while not self.blocking_read(timeout):
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\connection.py", line 530, in blocking_read
frame = self.transport.read_frame()
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\transport.py", line 294, in read_frame
frame_header = read(7, True)
File "C:\Hoonuit.Source\PersonalCode\MahaveerTrading\venv\lib\site-packages\amqp\transport.py", line 627, in _read
s = recv(n - len(rbuf))
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host