我正在创建一个组件,它将读取4个队列,每个队列将由最多5个线程组成。问题是,只有一个具有5个线程的队列正在初始化。
以下是我正在做的事情的代码:
# Set up RabbitMQ connection and channel TODO parameter config
#self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.channel = self.connection.channel()
# Declare the queues
for queue_name in self.queue_names:
self.channel.queue_declare(queue=queue_name)
def consume(self):
# Create a thread pool executor for consuming messages
with ThreadPoolExecutor(max_workers=self.num_threads) as executor:
# Submit a task for each queue and thread
for queue_name in self.queue_names:
for i in range(self.num_threads):
executor.submit(self._consume_thread, queue_name)
def _consume_thread(self, queue_name):
print(f'Conenction Established: {queue_name}')
# Set up a new connection and channel for each thread
#connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# Declare the queue
channel.queue_declare(queue=queue_name)
channel.basic_qos(prefetch_count= 100)
# Start consuming messages
channel.basic_consume(queue=queue_name, on_message_callback=self._on_message)
channel.start_consuming()
def _on_message(self, channel, method, properties, body):
try:
# Process the message here
#print(f"Received message from {method.routing_key}: {body}")
message = json.loads(body)
self.correlator.correlate(message)
# Acknowledge the message
channel.basic_ack(delivery_tag=method.delivery_tag)
except:
pass
def close(self):
# Close the RabbitMQ connection
self.connection.close()
def main():
consumer = RabbitMQConsumer(queue_names=['C1PERFQ', 'C1DESCQ', 'C1EVENTQ', 'C1RELQ'], num_threads=5)
print("Starting")
consumer.consume()
print("Consuming")
if __name__ == "__main__":
# Initialize the logger once as the application starts up
with open('./logging.yaml', 'rt') as f:
config = yaml.safe_load(f.read())
# Create logs folder if it does not exists
if not os.path.exists('logs'):
os.makedirs('logs')
logging.config.dictConfig(config)
logger = logging.getLogger(__name__)
logger.info('Now Logging %s ', __name__)
main()
我预计它会初始化20个连接,因为它有4个队列和5个线程。但只有列表中的第一个正在初始化。