代码之家  ›  专栏  ›  技术社区  ›  zlatan

在控制器中使用rabbitmq消息

  •  0
  • zlatan  · 技术社区  · 6 年前

    我目前正在学习服务、api和amqp消息传递系统,准确地说是rabbitmq,我正在关注这个 tutorial 关于rabbitmq消息传递。我把一切都做好了,但我想在我的项目中改变一些东西。我想从路由和控制器调用发布者和使用者脚本,而不是在终端中键入它们( php src/publisher.php php src/consumer.php )

    首先,我创建了两个路由:

     Route::get('/send-message', 'ServiceAController@index');
     Route::get('/receive-message', 'ServiceBController@index');
    

    第一个路由(send message)用于将http请求作为消息发送到rabbitmq,这是通过postman post请求完成的,在这里我插入了所需的参数。此路线的控制器工作正常,如下所示:

    public function index(Request $request){
        //Returning status 200 and sending message if amount is in range
        if( (-100000000  <= $request->amount )  &&  ($request->amount <= 100000000 )){
            //Sending message to RabbitMQ
            $amount = $request->amount;
            $currency = $request->currency;
    
            //Saving request data to variable to publish it
            $messageContent = json_encode([
                'amount' => $amount * 100,
                'currency' => $currency,
            ]);
    
            //Sending broker message
            $host = 'secret';
            $port = 5672;
            $user = 'secret';
            $pass = 'secret';
            $vhost = 'secret';
            $exchange = 'balance';
            $queue = 'local_balance';
    
            $connection = new AMQPStreamConnection($host, $port, $user, $pass, $vhost);
            $channel = $connection->channel();
            /*
                The following code is the same both in the consumer and the producer.
                In this way we are sure we always have a queue to consume from and an
                    exchange where to publish messages.
            */
            /*
                name: $queue
                passive: false
                durable: true // the queue will survive server restarts
                exclusive: false // the queue can be accessed in other channels
                auto_delete: false //the queue won't be deleted once the channel is closed.
            */
            $channel->queue_declare($queue, false, true, false, false);
            /*
                name: $exchange
                type: direct
                passive: false
                durable: true // the exchange will survive server restarts
                auto_delete: false //the exchange won't be deleted once the channel is closed.
            */
            $channel->exchange_declare($exchange, 'direct', false, true, false);
            $channel->queue_bind($queue, $exchange);
            $messageBody = $messageContent;
            $message = new AMQPMessage($messageBody, ['content_type' => 'application/json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
            $channel->basic_publish($message, $exchange);
            $channel->close();
            $connection->close();
    
            //Returning json response of HTTP payload
            $response = json_encode([
                'amount' => +number_format($amount, 2, '.', ''),
                'currency' => $currency,
            ]);
            return $response;
        }else{
            //Returning status 400 if amount is not in acceptable range
            abort(400, 'Amount is not in acceptable range'); //Returning code 400 if condition isn't met
        }
    }
    

    但是我的 问题 从我将消费者代码放入ServiceBController开始,与前一个相同。我的ServiceBController如下所示:

    public function index(){
    
        $host = 'secret';
        $port = 5672;
        $user = 'secret';
        $pass = 'secret';
        $vhost = 'secret';
        $exchange = 'balance';
        $queue = 'local_balance';
    
        $connection = new AMQPStreamConnection($host, $port, $user, $pass, $vhost);
        $channel = $connection->channel();
        /*
            The following code is the same both in the consumer and the producer.
            In this way we are sure we always have a queue to consume from and an
                exchange where to publish messages.
        */
        /*
            name: $queue
            passive: false
            durable: true // the queue will survive server restarts
            exclusive: false // the queue can be accessed in other channels
            auto_delete: false //the queue won't be deleted once the channel is closed.
        */
        $channel->queue_declare($queue, false, true, false, false);
        /*
            name: $exchange
            type: direct
            passive: false
            durable: true // the exchange will survive server restarts
            auto_delete: false //the exchange won't be deleted once the channel is closed.
        */
        $channel->exchange_declare($exchange, 'direct', false, true, false);
        $channel->queue_bind($queue, $exchange);
        /**
         * @param AMQPMessage $message
         */
        function process_message(AMQPMessage $message){
            $messageBody = json_decode($message->body);
            $amount = $messageBody->amount;
            $currency = $messageBody->currency;
    
            file_put_contents('C:/xampp/htdocs/nsoft/data' . '.json', $message->body);
    
            $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
        }
        /*
            queue: Queue from where to get the messages
            consumer_tag: Consumer identifier
            no_local: Don't receive messages published by this consumer.
            no_ack: Tells the server if the consumer will acknowledge the messages.
            exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
            nowait:
            callback: A PHP Callback
        */
        $consumerTag = 'local.consumer';
        $channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message');
        /**
         * @param \PhpAmqpLib\Channel\AMQPChannel $channel
         * @param \PhpAmqpLib\Connection\AbstractConnection $connection
         */
        function shutdown($channel, $connection){
            $channel->close();
            $connection->close();
        }
    
        register_shutdown_function('shutdown', $channel, $connection);
    
        while (count($channel->callbacks)) {
            $channel->wait();
        }
    }
    

    在postman中使用get请求调用它之后,我得到以下错误:

    symfony\component\debug\exception\fatalerrorexception:在第227行的文件C:\xampp\htdocs\nsoft\vendor\php amqplib\php amqplib\phpamqplib\wire\io\streamio.php中,超过了30秒的最长执行时间。

    我已经被这个错误困扰了好几天了,似乎找不到解决办法,所以我需要一些人的帮助。我做错什么了?作为参考,当我将同一个使用者脚本作为单独的文件放置时,它也可以工作( src/consumer.php )当我通过我的终端呼叫时。如有任何帮助,我们将不胜感激。

    2 回复  |  直到 6 年前
        1
  •  0
  •   Muhammad Saqlain    6 年前

    更改参数的值 max_execution_time 在里面 php.ini

    max_execution_time = 360   ;Execution time in seconds
    
        2
  •  0
  •   Filip Koblański    6 年前

    我可以建议您使用rabbitmq队列驱动程序:

    https://github.com/vyuldashev/laravel-queue-rabbitmq

    使用rabbitmq服务器,您可以通过laravel的本机队列系统管理所有消息。在配置之后,您只需要运行: php artisan queue:work 在控制台里。

    有几种方法可以使用处理队列即时工作。例如,您可以使用supervisor:

    http://supervisord.org/