代码之家  ›  专栏  ›  技术社区  ›  moumout

AMQPCPP消费者和发布者不创建交换和队列

  •  0
  • moumout  · 技术社区  · 2 年前

    消息未消费:当您运行my_publish发布消息,然后运行my_consume消费消息时,消息不会按预期消费。 为解决问题所采取的步骤&故障排除步骤:

    • 消息路由:像RabbitMQ这样的消息代理的主要目的是将消息从发布者路由到消费者。如果消息没有到达消费者,可能是由于配置错误的交换或绑定。我已经验证了它们,但我会提供以下代码,以防我遗漏了什么。
    • 权限:用户具有管理员权限。
    • 网络和防火墙:我已经确认您的RabbitMQ服务器可以访问。
    • 我已经检查了RabbitMQ服务器的状态,安装并配置了ufw以允许端口5672上的传入连接,并验证了您可以成功ping RabbitMQ服务器。
    • 主机名解析:验证连接字符串中的主机名或IP地址是否正确,以及主机名解析是否有效。
    • RabbitMQ配置:仔细检查RabbitMQ服务器的配置,包括交换机、队列和权限。
    • RabbitMQ管理接口:访问RabbitMQ管理接口以检查服务器的状态、连接、交换、队列和绑定。不管我有多个aatemp,都没有创建/添加任何交换或队列。
    • 附加日志记录:我在发布消息后添加了一个print语句,这意味着my_publish的执行非常顺利。
    • RabbitMQ服务器版本兼容性:确保您的RabbitMQ服务器版本与C++程序中使用的amqpcpp库版本兼容。 '节点状态rabbit@thimonnier-desktop。。。 运行时

    操作系统PID:56992 操作系统:Linux 正常运行时间(秒):15076 RabbitMQ版本:3.8.2 节点名称:rabbit@thimonnier-desktop Erlang配置:Erlang/OTP 22[erts-10.6.4][source][64 bit][smp:12:12][ds:12:12:10][async线程:192] Erlang进程:已使用444个,限制为1048576个 计划程序运行队列:1 群集检测信号超时(net_ticktime):60' AMQP-CPP 4 https://github.com/CopernicaMarketingSoftware/AMQP-CPP

    -->为了学习和创建这些代码,我遵循了本回购的说明 https://github.com/yuanhui360/CPP-Programming-on-Linux/tree/main/YH-109 ,代码相似,但这是AMQPCPP 4库的新版本。 希望我已经很好地解释了我所做的事情,你们中的一些人可以帮助我。 以下分别是my_publish和my_consume的代码:

    /**
     *  Method that is called by the AMQP library when the login attempt
     *  succeeded. After this method has been called, the connection is ready
     *  to use.
     *  @param  connection      The connection that can now be used
     */
    virtual void onReady(AMQP::Connection *connection)
    {
        // @todo
        //  add your own implementation, for example by creating a channel
        //  instance, and start publishing or consuming
    }
    
    /**
     *  Method that is called by the AMQP library when a fatal error occurs
     *  on the connection, for example because data received from RabbitMQ
     *  could not be recognized.
     *  @param  connection      The connection on which the error occurred
     *  @param  message         A human readable error message
     */
    virtual void onError(AMQP::Connection *connection, const char *message)
    {
        // @todo
        //  add your own implementation, for example by reporting the error
        //  to the user of your program, log the error, and destruct the
        //  connection object because it is no longer in a usable state
    }
    
    /**
     *  Method that is called when the connection was closed. This is the
     *  counter part of a call to Connection::close() and it confirms that the
     *  AMQP connection was correctly closed.
     *
     *  @param  connection      The connection that was closed and that is now unusable
     */
    virtual void onClosed(AMQP::Connection *connection) 
    {
        // @todo
        //  add your own implementation, for example by closing down the
        //  underlying TCP connection too
    }
    

    };

    int main(int argc,char**argv){

    #include <amqpcpp.h>
    
    //  amqp://[$USERNAME[:$PASSWORD]\@]$HOST[:$PORT]/[$VHOST]
    //  AMQP amqp("test:[email protected]:5672/my_vhost");     // all connect string
    
    using namespace std;
    
    // You'll need to extend the ConnectionHandler class and make your own, like this
    class MyConnectionHandler : public AMQP::ConnectionHandler
    {
        /**
         *  Method that is called by the AMQP library every time it has data
         *  available that should be sent to RabbitMQ.
         *  @param  connection  pointer to the main connection object
         *  @param  data        memory buffer with the data that should be sent to RabbitMQ
         *  @param  size        size of the buffer
         */
        virtual void onData(AMQP::Connection *connection, const char *data, size_t size)
        {
            // @todo
            //  Add your own implementation, for example by doing a call to the
            //  send() system call. But be aware that the send() call may not
            //  send all data at once, so you also need to take care of buffering
            //  the bytes that could not immediately be sent, and try to send
            //  them again when the socket becomes writable again
            // AMQP::Channel channel(connection);
            // // Define the queue you want to consume from
            // const std::string queueName = "tcpp_image_message";
            // // Declare the queue
            // channel.declareQueue(queueName);
            // // Set up the message callback
            // channel.consume(queueName).onReceived([&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
            //     // Handle the received message
            //     std::string data = message.body();
            //     std::cout << "Received message: " << data << std::endl;
            //     // Acknowledge the message to remove it from the queue
            //     channel.ack(deliveryTag);
            // });
        }
    
        /**
         *  Method that is called by the AMQP library when the login attempt
         *  succeeded. After this method has been called, the connection is ready
         *  to use.
         *  @param  connection      The connection that can now be used
         */
        virtual void onReady(AMQP::Connection *connection)
        {
            // @todo
            //  add your own implementation, for example by creating a channel
            //  instance, and start publishing or consuming
        }
    
        /**
         *  Method that is called by the AMQP library when a fatal error occurs
         *  on the connection, for example because data received from RabbitMQ
         *  could not be recognized.
         *  @param  connection      The connection on which the error occurred
         *  @param  message         A human readable error message
         */
        virtual void onError(AMQP::Connection *connection, const char *message)
        {
            // @todo
            //  add your own implementation, for example by reporting the error
            //  to the user of your program, log the error, and destruct the
            //  connection object because it is no longer in a usable state
        }
    
        /**
         *  Method that is called when the connection was closed. This is the
         *  counter part of a call to Connection::close() and it confirms that the
         *  AMQP connection was correctly closed.
         *
         *  @param  connection      The connection that was closed and that is now unusable
         */
        virtual void onClosed(AMQP::Connection *connection) 
        {
            // @todo
            //  add your own implementation, for example by closing down the
            //  underlying TCP connection too
        }
    
    
    };
    
    int main (int argc, char** argv) {
        std::string msg_body;
    
        if (argc != 3)
        {
            std::cout << "Usage: my_publish <host:port> <exchange> <queue>" << std::endl;
            return 1;
        }
        
        
        char const* ex_name = argv[1];
        char const* queue_name = argv[2];
        const AMQP::Login login;
        
        try {
        
            MyConnectionHandler myHandler;
            AMQP::Connection connection(&myHandler, AMQP::Login("test", "test"), "/my_vhost");
            AMQP::Channel channel(&connection);
            channel.declareExchange(ex_name, AMQP::fanout);
            // create a custom callback
            auto callback = [](const std::string &name, int msgcount, int consumercount) {
        
                // @todo add your own implementation
        
            };
            
            channel.declareQueue(queue_name).onSuccess(std::move(callback));
            channel.bindQueue(ex_name, queue_name, "");
            // start a transaction
            channel.startTransaction();
            // Create a lambda function to publish a message
            auto publishMessage = [&channel, ex_name](const std::string& message) {  // Capture ex_name
            AMQP::Envelope env(message.c_str(), message.length());
            channel.publish(ex_name,"key", env);
            };
            // commit the transactions, and set up callbacks that are called when
            // the transaction was successful or not
            channel.commitTransaction()
                .onSuccess([]() {
                    // all messages were successfully published
                })
                .onError([](const char *message) {
                    // none of the messages were published
                    // now we have to do it all over again
                });
        
            // Enter a loop to continuously wait for user input and publish messages
            while (true) {
                std::string userMessage;
                std::cout << "Enter a message to publish (or type 'exit' to quit): ";
                std::getline(std::cin, userMessage);
        
                if (userMessage == "exit") {
                    break; // Exit the loop if the user enters 'exit'
                }
        
                // Publish the user's message
                publishMessage(userMessage);
                std::cout << "Execution Troubleshooting:";
            }
        
        } catch (const std::exception& e) {
            std::cerr << "Error: " << e.what() << std::endl;
            // Handle the exception appropriately
        }
        return 0;
    }
    
    #include <amqpcpp.h>
    
    //  amqp://[$USERNAME[:$PASSWORD]\@]$HOST[:$PORT]/[$VHOST]
    //  AMQP amqp("test:[email protected]:5672/my_vhost");     // all connect string
    
    using namespace std;
    
    // You'll need to extend the ConnectionHandler class and make your own, like this
    class MyConnectionHandler : public AMQP::ConnectionHandler
    {
        /**
         *  Method that is called by the AMQP library every time it has data
         *  available that should be sent to RabbitMQ.
         *  @param  connection  pointer to the main connection object
         *  @param  data        memory buffer with the data that should be sent to RabbitMQ
         *  @param  size        size of the buffer
         */
        virtual void onData(AMQP::Connection *connection, const char *data, size_t size)
        {
            // @todo
            //  Add your own implementation, for example by doing a call to the
            //  send() system call. But be aware that the send() call may not
            //  send all data at once, so you also need to take care of buffering
            //  the bytes that could not immediately be sent, and try to send
            //  them again when the socket becomes writable again
            // AMQP::Channel channel(connection);
            // // Define the queue you want to consume from
            // const std::string queueName = "tcpp_image_message";
            // // Declare the queue
            // channel.declareQueue(queueName);
            // // Set up the message callback
            // channel.consume(queueName).onReceived([&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
            //     // Handle the received message
            //     std::string data = message.body();
            //     std::cout << "Received message: " << data << std::endl;
            //     // Acknowledge the message to remove it from the queue
            //     channel.ack(deliveryTag);
            // });
        }
    /**
     *  Method that is called by the AMQP library when the login attempt
     *  succeeded. After this method has been called, the connection is ready
     *  to use.
     *  @param  connection      The connection that can now be used
     */
    virtual void onReady(AMQP::Connection *connection)
    {
        // @todo
        //  add your own implementation, for example by creating a channel
        //  instance, and start publishing or consuming
    }
    
    /**
     *  Method that is called by the AMQP library when a fatal error occurs
     *  on the connection, for example because data received from RabbitMQ
     *  could not be recognized.
     *  @param  connection      The connection on which the error occurred
     *  @param  message         A human readable error message
     */
    virtual void onError(AMQP::Connection *connection, const char *message)
    {
        // @todo
        //  add your own implementation, for example by reporting the error
        //  to the user of your program, log the error, and destruct the
        //  connection object because it is no longer in a usable state
    }
    
    /**
     *  Method that is called when the connection was closed. This is the
     *  counter part of a call to Connection::close() and it confirms that the
     *  AMQP connection was correctly closed.
     *
     *  @param  connection      The connection that was closed and that is now unusable
     */
    virtual void onClosed(AMQP::Connection *connection) 
    {
        // @todo
        //  add your own implementation, for example by closing down the
        //  underlying TCP connection too
    }
    
    int main (int argc, char** argv) {
        
        std::string msg_body;
        
        if (argc != 2)
        {
            std::cout << "Usage: my_consume <host:port> <queue>" << std::endl;
            return 1;
        }
        
        
        char const* queue_name = argv[1];
        
        try {
        
            MyConnectionHandler myHandler;
            AMQP::Connection connection(&myHandler, AMQP::Login("test", "test"), "/my_vhost");
            AMQP::Channel channel(&connection);
            channel.declareQueue(queue_name);
            // callback function that is called when the consume operation starts
            auto startCb = [](const std::string &consumertag) {
        
                std::cout << "consume operation started" << std::endl;
            };
        
            // callback function that is called when the consume operation failed
            auto errorCb = [](const char *message) {
        
                std::cout << "consume operation failed" << std::endl;
            };
        
            // callback operation when a message was received
            auto messageCb = [&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
        
                std::cout << "message received" << std::endl;
        
                // acknowledge the message
                channel.ack(deliveryTag);
            };
        
            // callback that is called when the consumer is cancelled by RabbitMQ (this only happens in
            // rare situations, for example when someone removes the queue that you are consuming from)
            auto cancelledCb = [](const std::string &consumertag) {
        
                std::cout << "consume operation cancelled by the RabbitMQ server" << std::endl;
            };
        
            // start consuming from the queue, and install the callbacks
            channel.consume(queue_name)
                .onReceived(messageCb)
                .onSuccess(startCb)
                .onCancelled(cancelledCb)
                .onError(errorCb);
        
        
        
            // Other application logic, error handling, and cleanup
        
        } catch (const std::exception &e) {
            std::cerr << "Error: " << e.what() << std::endl;
            // Handle the exception appropriately
        }
        
        return 0;
    }
    

    终端:

    $make clean rm-f*.o my_consume my_get my_publish

    $make my_publish my_consume

    g++-c-墙-g-std=c++17-I/usr/include my_publish.cpp-o my_publish。o

    g++-lssh-pthread-Wl,--无需-ldl-L/usr/lib-L/usr/local/lib/aarch64 linux gnu-lrabbitmq-lamqpcpp my_publish.o my_publish g

    ++-c-Wall-g-std=c++17-I/usr/include my_consume.cpp-o my_consume。o

    g++-lssh-pthread-Wl,--无需-ldl-L/usr/lib-L/usr/local/lib/aarch64 linux gnu-lrabbitmq-lamqpcpp my_consume.o-o my_consume

    $/my_publish my_exchange my_queue 输入要发布的消息(或键入“退出”退出):嗨,RabbitMQ消息,我的名字是moumout 执行疑难解答:输入要发布的消息(或键入“exit”退出):退出

    $/my_consume my_queue###应为“收到消息”,未打印任何内容

    $

    0 回复  |  直到 2 年前
    推荐文章