
Running Pika threads to create multiple connections to RabbitMQ

  • Jeremiah Valerio  · Tech Community  · 3 years ago

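    I am building a component that reads from 4 queues, with up to 5 consumer threads per queue. The problem is that only one queue, with its 5 threads, is being initialized.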

    Here is the code for what I am doing:

            # Set up the RabbitMQ connection and channel (TODO: make the parameters configurable)
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
            self.channel = self.connection.channel()
    
            # Declare the queues
            for queue_name in self.queue_names:
                self.channel.queue_declare(queue=queue_name)
    
        def consume(self):
            # Create a thread pool executor for consuming messages
            with ThreadPoolExecutor(max_workers=self.num_threads) as executor:
                # Submit a task for each queue and thread
                for queue_name in self.queue_names:
                    for i in range(self.num_threads):
                        executor.submit(self._consume_thread, queue_name)
    
    
        def _consume_thread(self, queue_name):
            print(f'Connection established: {queue_name}')
            # Set up a new connection and channel for each thread
            connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
            channel = connection.channel()
    
            # Declare the queue
            channel.queue_declare(queue=queue_name)
            channel.basic_qos(prefetch_count=100)
            # Start consuming messages
            channel.basic_consume(queue=queue_name, on_message_callback=self._on_message)
            channel.start_consuming()
    
        def _on_message(self, channel, method, properties, body):
            try:
                # Process the message here
                #print(f"Received message from {method.routing_key}: {body}")
                message = json.loads(body)
                self.correlator.correlate(message)
                # Acknowledge the message
                channel.basic_ack(delivery_tag=method.delivery_tag)
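            except Exception:
                # Log the failure instead of silently discarding it; the message stays unacknowledged
                logging.exception('Failed to process message from %s', method.routing_key)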
                
    
        def close(self):
            # Close the RabbitMQ connection
            self.connection.close()
    
    def main():
        consumer = RabbitMQConsumer(queue_names=['C1PERFQ', 'C1DESCQ', 'C1EVENTQ', 'C1RELQ'], num_threads=5)
        print("Starting")
        consumer.consume()
        print("Consuming")
    
    if __name__ == "__main__":
        # Initialize the logger once as the application starts up
        with open('./logging.yaml', 'rt') as f:
            config = yaml.safe_load(f.read())
        # Create the logs folder if it does not exist
        os.makedirs('logs', exist_ok=True)
        logging.config.dictConfig(config)
        logger = logging.getLogger(__name__)
        logger.info('Now Logging %s ', __name__)
        main()
    
    

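    I expected it to initialize 20 connections, since there are 4 queues with 5 threads each. But only the first queue in the list is being initialized.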
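
A likely explanation, assuming the posted snippet matches what actually runs: `ThreadPoolExecutor(max_workers=self.num_threads)` creates 5 worker threads in total, not 5 per queue. Each `_consume_thread` call blocks in `channel.start_consuming()` and never returns, so the first 5 submitted tasks (all for the first queue) occupy every worker and the other 15 stay queued. The sketch below reproduces that behaviour without RabbitMQ. `started_tasks` and `fake_consume` are hypothetical names introduced for illustration, and the blocking `Event.wait()` is only a stand-in for a consumer loop.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def started_tasks(queue_names, num_threads, max_workers, settle_seconds=0.5):
    """Return the (queue_name, index) tasks that began running before any finished."""
    started = []
    lock = threading.Lock()
    release = threading.Event()

    def fake_consume(queue_name, index):
        with lock:
            started.append((queue_name, index))
        # Block until released, the way a consumer blocks while it is consuming
        release.wait()

    executor = ThreadPoolExecutor(max_workers=max_workers)
    for queue_name in queue_names:
        for index in range(num_threads):
            executor.submit(fake_consume, queue_name, index)

    time.sleep(settle_seconds)      # give every available worker time to pick up a task
    with lock:
        snapshot = list(started)    # tasks running while all fake consumers are still blocked
    release.set()                   # let the fake consumers return so the pool can shut down
    executor.shutdown(wait=True)
    return snapshot


if __name__ == "__main__":
    queues = ['C1PERFQ', 'C1DESCQ', 'C1EVENTQ', 'C1RELQ']
    # Pool sized like the question: num_threads workers in total
    print(len(started_tasks(queues, 5, max_workers=5)))
    # Pool sized for every consumer: one worker per (queue, thread) pair
    print(len(started_tasks(queues, 5, max_workers=len(queues) * 5)))
```

With the pool sized as in the question, only the tasks for `C1PERFQ` start. Sizing it as `len(self.queue_names) * self.num_threads` lets all 20 start. Note also that the `with ThreadPoolExecutor(...)` block waits for every task to finish on exit, so `consume()` would not be expected to return while the consumers are running.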

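
One way to get one consumer per (queue, thread) pair is to start a dedicated `threading.Thread` for each, with each thread opening its own connection, since Pika connections are not meant to be shared across threads. This is a minimal sketch under assumptions rather than a drop-in replacement: `default_connect`, `consume_forever`, and `start_consumers` are hypothetical names, the `connect` parameter exists only so the connection can be swapped out, and `host='localhost'` is taken from the commented-out line in the question.

```python
import threading


def default_connect():
    # Imported lazily so this module can be loaded without pika installed
    import pika
    return pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))


def consume_forever(queue_name, on_message, connect=default_connect):
    """Open a dedicated connection in the calling thread and consume from one queue."""
    connection = connect()
    try:
        channel = connection.channel()
        channel.queue_declare(queue=queue_name)
        channel.basic_qos(prefetch_count=100)
        channel.basic_consume(queue=queue_name, on_message_callback=on_message)
        channel.start_consuming()   # blocks until consuming stops or the connection fails
    finally:
        if connection.is_open:
            connection.close()


def start_consumers(queue_names, num_threads, on_message, connect=default_connect):
    """Start num_threads consumer threads for every queue and return the threads."""
    threads = []
    for queue_name in queue_names:
        for index in range(num_threads):
            thread = threading.Thread(
                target=consume_forever,
                args=(queue_name, on_message, connect),
                name=f'{queue_name}-{index}',
                daemon=True,        # assumption: consumers should not keep the process alive
            )
            thread.start()
            threads.append(thread)
    return threads
```

Because the threads are daemons in this sketch, the caller would need to keep the main thread alive, for example by joining the returned threads. The `on_message` callback has the same signature as `_on_message` in the question.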