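Messages not consumed: when I run my_publish to publish a message and then run my_consume to consume it, the message is not consumed as expected. Steps taken to resolve the problem and troubleshooting steps: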
OS PID: 56992
OS: Linux
Uptime (seconds): 15076
RabbitMQ version: 3.8.2
Node name: rabbit@thimonnier-desktop
Erlang configuration: Erlang/OTP 22 [erts-10.6.4] [source] [64-bit] [smp:12:12] [ds:12:12:10] [async-threads:192]
Erlang processes: 444 used, 1048576 limit
Scheduler run queue: 1
Cluster heartbeat timeout (net_ticktime): 60
AMQP-CPP 4: https://github.com/CopernicaMarketingSoftware/AMQP-CPP
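To learn how to write this code, I followed the instructions in this repo: https://github.com/yuanhui360/CPP-Programming-on-Linux/tree/main/YH-109 . My code is similar, but it targets the newer version 4 of the AMQP-CPP library. I hope I have explained what I did clearly enough and that some of you can help me. Below is the code for my_publish and my_consume, respectively: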
#include <amqpcpp.h>
#include <iostream>
#include <string>
// amqp://[$USERNAME[:$PASSWORD]\@]$HOST[:$PORT]/[$VHOST]
// AMQP amqp("test:[email protected]:5672/my_vhost"); // all connect string
using namespace std;
// You'll need to extend the ConnectionHandler class and make your own, like this
class MyConnectionHandler : public AMQP::ConnectionHandler
{
/**
* Method that is called by the AMQP library every time it has data
* available that should be sent to RabbitMQ.
* @param connection pointer to the main connection object
* @param data memory buffer with the data that should be sent to RabbitMQ
* @param size size of the buffer
*/
virtual void onData(AMQP::Connection *connection, const char *data, size_t size)
{
// @todo
// Add your own implementation, for example by doing a call to the
// send() system call. But be aware that the send() call may not
// send all data at once, so you also need to take care of buffering
// the bytes that could not immediately be sent, and try to send
// them again when the socket becomes writable again
// AMQP::Channel channel(connection);
// // Define the queue you want to consume from
// const std::string queueName = "tcpp_image_message";
// // Declare the queue
// channel.declareQueue(queueName);
// // Set up the message callback
// channel.consume(queueName).onReceived([&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
// // Handle the received message
// std::string data = message.body();
// std::cout << "Received message: " << data << std::endl;
// // Acknowledge the message to remove it from the queue
// channel.ack(deliveryTag);
// });
}
/**
* Method that is called by the AMQP library when the login attempt
* succeeded. After this method has been called, the connection is ready
* to use.
* @param connection The connection that can now be used
*/
virtual void onReady(AMQP::Connection *connection)
{
// @todo
// add your own implementation, for example by creating a channel
// instance, and start publishing or consuming
}
/**
* Method that is called by the AMQP library when a fatal error occurs
* on the connection, for example because data received from RabbitMQ
* could not be recognized.
* @param connection The connection on which the error occurred
* @param message A human readable error message
*/
virtual void onError(AMQP::Connection *connection, const char *message)
{
// @todo
// add your own implementation, for example by reporting the error
// to the user of your program, log the error, and destruct the
// connection object because it is no longer in a usable state
}
/**
* Method that is called when the connection was closed. This is the
* counter part of a call to Connection::close() and it confirms that the
* AMQP connection was correctly closed.
*
* @param connection The connection that was closed and that is now unusable
*/
virtual void onClosed(AMQP::Connection *connection)
{
// @todo
// add your own implementation, for example by closing down the
// underlying TCP connection too
}
};
int main (int argc, char** argv) {
std::string msg_body;
if (argc != 3)
{
std::cout << "Usage: my_publish <exchange> <queue>" << std::endl;
return 1;
}
char const* ex_name = argv[1];
char const* queue_name = argv[2];
const AMQP::Login login;
try {
MyConnectionHandler myHandler;
AMQP::Connection connection(&myHandler, AMQP::Login("test", "test"), "/my_vhost");
AMQP::Channel channel(&connection);
channel.declareExchange(ex_name, AMQP::fanout);
// create a custom callback
auto callback = [](const std::string &name, int msgcount, int consumercount) {
// @todo add your own implementation
};
channel.declareQueue(queue_name).onSuccess(std::move(callback));
channel.bindQueue(ex_name, queue_name, "");
// start a transaction
channel.startTransaction();
// Create a lambda function to publish a message
auto publishMessage = [&channel, ex_name](const std::string& message) { // Capture ex_name
AMQP::Envelope env(message.c_str(), message.length());
channel.publish(ex_name,"key", env);
};
// commit the transactions, and set up callbacks that are called when
// the transaction was successful or not
channel.commitTransaction()
.onSuccess([]() {
// all messages were successfully published
})
.onError([](const char *message) {
// none of the messages were published
// now we have to do it all over again
});
// Enter a loop to continuously wait for user input and publish messages
while (true) {
std::string userMessage;
std::cout << "Enter a message to publish (or type 'exit' to quit): ";
std::getline(std::cin, userMessage);
if (userMessage == "exit") {
break; // Exit the loop if the user enters 'exit'
}
// Publish the user's message
publishMessage(userMessage);
std::cout << "Execution Troubleshooting:";
}
} catch (const std::exception& e) {
std::cerr << "Error: " << e.what() << std::endl;
// Handle the exception appropriately
}
return 0;
}
#include <amqpcpp.h>
#include <iostream>
#include <string>
// amqp://[$USERNAME[:$PASSWORD]\@]$HOST[:$PORT]/[$VHOST]
// AMQP amqp("test:[email protected]:5672/my_vhost"); // all connect string
using namespace std;
// You'll need to extend the ConnectionHandler class and make your own, like this
class MyConnectionHandler : public AMQP::ConnectionHandler
{
/**
* Method that is called by the AMQP library every time it has data
* available that should be sent to RabbitMQ.
* @param connection pointer to the main connection object
* @param data memory buffer with the data that should be sent to RabbitMQ
* @param size size of the buffer
*/
virtual void onData(AMQP::Connection *connection, const char *data, size_t size)
{
// @todo
// Add your own implementation, for example by doing a call to the
// send() system call. But be aware that the send() call may not
// send all data at once, so you also need to take care of buffering
// the bytes that could not immediately be sent, and try to send
// them again when the socket becomes writable again
// AMQP::Channel channel(connection);
// // Define the queue you want to consume from
// const std::string queueName = "tcpp_image_message";
// // Declare the queue
// channel.declareQueue(queueName);
// // Set up the message callback
// channel.consume(queueName).onReceived([&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
// // Handle the received message
// std::string data = message.body();
// std::cout << "Received message: " << data << std::endl;
// // Acknowledge the message to remove it from the queue
// channel.ack(deliveryTag);
// });
}
/**
* Method that is called by the AMQP library when the login attempt
* succeeded. After this method has been called, the connection is ready
* to use.
* @param connection The connection that can now be used
*/
virtual void onReady(AMQP::Connection *connection)
{
// @todo
// add your own implementation, for example by creating a channel
// instance, and start publishing or consuming
}
/**
* Method that is called by the AMQP library when a fatal error occurs
* on the connection, for example because data received from RabbitMQ
* could not be recognized.
* @param connection The connection on which the error occurred
* @param message A human readable error message
*/
virtual void onError(AMQP::Connection *connection, const char *message)
{
// @todo
// add your own implementation, for example by reporting the error
// to the user of your program, log the error, and destruct the
// connection object because it is no longer in a usable state
}
/**
* Method that is called when the connection was closed. This is the
* counter part of a call to Connection::close() and it confirms that the
* AMQP connection was correctly closed.
*
* @param connection The connection that was closed and that is now unusable
*/
virtual void onClosed(AMQP::Connection *connection)
{
// @todo
// add your own implementation, for example by closing down the
// underlying TCP connection too
}
};
int main (int argc, char** argv) {
std::string msg_body;
if (argc != 2)
{
std::cout << "Usage: my_consume <queue>" << std::endl;
return 1;
}
char const* queue_name = argv[1];
try {
MyConnectionHandler myHandler;
AMQP::Connection connection(&myHandler, AMQP::Login("test", "test"), "/my_vhost");
AMQP::Channel channel(&connection);
channel.declareQueue(queue_name);
// callback function that is called when the consume operation starts
auto startCb = [](const std::string &consumertag) {
std::cout << "consume operation started" << std::endl;
};
// callback function that is called when the consume operation failed
auto errorCb = [](const char *message) {
std::cout << "consume operation failed" << std::endl;
};
// callback operation when a message was received
auto messageCb = [&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
std::cout << "message received" << std::endl;
// acknowledge the message
channel.ack(deliveryTag);
};
// callback that is called when the consumer is cancelled by RabbitMQ (this only happens in
// rare situations, for example when someone removes the queue that you are consuming from)
auto cancelledCb = [](const std::string &consumertag) {
std::cout << "consume operation cancelled by the RabbitMQ server" << std::endl;
};
// start consuming from the queue, and install the callbacks
channel.consume(queue_name)
.onReceived(messageCb)
.onSuccess(startCb)
.onCancelled(cancelledCb)
.onError(errorCb);
// Other application logic, error handling, and cleanup
} catch (const std::exception &e) {
std::cerr << "Error: " << e.what() << std::endl;
// Handle the exception appropriately
}
return 0;
}
Terminal:
$ make clean
rm -f *.o my_consume my_get my_publish
$ make my_publish my_consume
g++ -c -Wall -g -std=c++17 -I/usr/include my_publish.cpp -o my_publish.o
g++ -lssh -pthread -Wl,--no-as-needed -ldl -L/usr/lib -L/usr/local/lib/aarch64-linux-gnu -lrabbitmq -lamqpcpp my_publish.o -o my_publish
g++ -c -Wall -g -std=c++17 -I/usr/include my_consume.cpp -o my_consume.o
g++ -lssh -pthread -Wl,--no-as-needed -ldl -L/usr/lib -L/usr/local/lib/aarch64-linux-gnu -lrabbitmq -lamqpcpp my_consume.o -o my_consume
$ ./my_publish my_exchange my_queue
Enter a message to publish (or type 'exit' to quit): Hi, RabbitMQ message, my name is moumout
Execution Troubleshooting:Enter a message to publish (or type 'exit' to quit): exit
$ ./my_consume my_queue   ### expected "message received", but nothing is printed
$
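The comments inside onData() in both listings point out that send() may not write all the data at once, so the unsent bytes have to be buffered and retried once the socket is writable again. Below is a minimal sketch of just that buffering step, assuming a Linux system and an already connected stream socket descriptor. OutputBuffer is a hypothetical helper name that is not part of AMQP-CPP, and the code deliberately leaves out everything else a handler needs (opening the socket, reading replies, running an event loop).

```cpp
#include <cerrno>
#include <cstddef>
#include <string>
#include <sys/socket.h>
#include <sys/types.h>

// Hypothetical helper (not part of AMQP-CPP): queues outgoing bytes and
// writes as many of them as the socket will currently accept.
class OutputBuffer
{
public:
    // fd is assumed to be an already connected stream socket
    explicit OutputBuffer(int fd) : fd_(fd) {}

    // Queue new bytes, then try to send them. An onData() implementation
    // could pass its `data` and `size` arguments straight to this method.
    bool write(const char *data, std::size_t size)
    {
        pending_.append(data, size);
        return flush();
    }

    // Send as much queued data as possible without blocking.
    // Returns false only on a real socket error.
    bool flush()
    {
        while (!pending_.empty())
        {
            // MSG_DONTWAIT and MSG_NOSIGNAL are Linux-specific flags
            ssize_t sent = ::send(fd_, pending_.data(), pending_.size(),
                                  MSG_DONTWAIT | MSG_NOSIGNAL);

            // part (or all) of the data was accepted: drop it from the queue
            if (sent > 0) { pending_.erase(0, static_cast<std::size_t>(sent)); continue; }

            // interrupted by a signal: simply try again
            if (sent < 0 && errno == EINTR) continue;

            // socket buffer is full: keep the rest for a later flush()
            if (sent < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) return true;

            // any other result is treated as a failure
            return false;
        }
        return true;
    }

    // number of bytes still waiting to be sent
    std::size_t pending() const { return pending_.size(); }

private:
    int fd_;
    std::string pending_;
};
```

With something like this in place, onData() could forward its data and size arguments to write(), and the program would call flush() again whenever its event loop (select, poll, libev, ...) reports the socket as writable.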
