为什么这些进程对消息使用不同的类型，却默默地假设它们的大小相同（并且都满足可平凡复制、标准布局等要求）？你是否混淆了各种类型和队列？看起来是这样。
如果你能很好地命名事物,这会有所帮助。此外,还要删除重复。
我会按消息类型分隔队列。按角色命名:
// Requests are sent by client to server.
// Responses are sent by server to client.
struct Request { int a, b; };
struct Response { double a; int b; };
using ClientIpc = Ipc<Request, Response, true>;
using ServerIpc = Ipc<Request, Response, false>;
然后,通过定义通道类型:
using Prio = uint32_t;
// One unidirectional shared-memory message queue carrying messages of exactly
// type T. (Excerpt: this lives inside Ipc, so IsClient and MAX_DEPTH refer to
// the enclosing template's parameters/constants.)
template <typename T> struct Channel {
Channel(std::string name);
~Channel();
// Blocks until a message arrives or stop is requested.
std::tuple<T, Prio> receive(std::stop_token token);
// Retries while the queue is full; returns false if stopped before enqueue.
bool send(T const& msg, Prio prio, std::stop_token token);
private:
using Queue = boost::interprocess::message_queue;
std::string name_;
Queue queue_ = open();
// The server creates the queue; the client only opens an existing one.
Queue open() {
if constexpr (IsClient) {
return {boost::interprocess::open_only, name_.c_str()};
} else {
return {boost::interprocess::open_or_create, name_.c_str(), MAX_DEPTH, sizeof(T)};
}
}
};
现在我们可以简单地说:
Channel<Request> reqQ;
Channel<Response> resQ;
构造方式如下：
// Constructor: opens/creates both queues ("<name>_req"/"<name>_res") and
// starts the listener thread. Function-try-block: logs and rethrows any
// failure from member initialization (e.g. client started before server).
Ipc(std::string const& queueName) try
: reqQ(queueName + "_req")
, resQ(queueName + "_res")
, mThread(bind(&Ipc::listen, this, std::placeholders::_1)) {
} catch (std::exception const& ex) {
std::cerr << "Ipc: " << ex.what() << std::endl;
throw;
}
侦听器将收到的消息排队。类型取决于客户端/服务器模式:
using Incoming = std::conditional_t<IsClient, Response, Request>;
boost::lockfree::spsc_queue<Incoming, boost::lockfree::capacity<MAX_DEPTH>> mInbox;
注意我是如何选择安全的 jthread，并使用停止标记（stop token）来协调线程退出的：
std::jthread mThread;
std::stop_token mToken = mThread.get_stop_token();
// Listener loop: drain our incoming queue into the inbox and invoke the
// registered callback once per message. receive() throws when stop is
// requested; the catch logs it and the loop condition then ends the thread.
void listen(std::stop_token token) {
while (!token.stop_requested()) {
try {
if constexpr (IsClient)
mInbox.push(get<0>(resQ.receive(token)));
else
mInbox.push(get<0>(reqQ.receive(token)));
if (mCallback)
mCallback();
} catch (std::exception const& ex) {
std::cerr << "Listen: " << ex.what() << std::endl;
}
}
}
外部操作看起来要简单得多,比如:
// Block until the listener thread finishes.
void wait() { mThread.join(); }
// Ask the listener to stop; join unless called from the listener thread
// itself (e.g. from inside the callback).
void close() {
mThread.request_stop();
if (std::this_thread::get_id() != mThread.get_id())
wait();
}
bool send(Request const& msg, Prio prio = 10) { return reqQ.send(msg, prio, mToken); }
bool send(Response const& msg, Prio prio = 10) { return resQ.send(msg, prio, mToken); }
// Pop one received message, if any, from the inbox.
std::optional<Incoming> consume() {
if (Incoming val; mInbox.consume_one([&val](Incoming res) { val = std::move(res); }))
return val;
return {};
}
void register_callback(callback_t cb) { mCallback = cb; }
bool haveMessage() const { return mInbox.read_available(); }
示例客户端/服务器
让我们定义一个服务器，它对上面的 Requests 作出响应：发回 a 的平方根和 b/2：
void server() {
ServerIpc ipc(IPC_NAME);
auto handler = [&ipc] {
assert(ipc.haveMessage());
if (std::optional<Request> req = ipc.consume()) {
auto [a, b] = *req;
std::cout << "server received request a:" << a << " b:" << b << std::endl;
if (a == -42 && b == -42) {
std::cout << " -> server handling close request" << std::endl;
ipc.close();
} else {
// send response
ipc.send(Response{sqrt(a), b / 2});
}
}
};
ipc.register_callback(handler);
ipc.wait();
}
这就是全部。请注意,我们添加了一种机制来告诉服务器客户端希望它退出(因为服务器拥有资源)。客户可能看起来像这样:
void client() {
ClientIpc ipc(IPC_NAME);
auto handler = [&ipc] {
assert(ipc.haveMessage());
if (std::optional<Response> res = ipc.consume()) {
auto [a, b] = *res;
std::cout << "client received response a:" << a << " b:" << b << std::endl;
}
};
ipc.register_callback(handler);
for (int i = 0; i < 100; ++i) {
if (rand() % 30 == 0) // Flag set by other thread
ipc.send(Request{i, 2 * i}); // Send request
std::this_thread::sleep_for(10ms);
}
std::cout << "Client sending close command" << std::endl;
ipc.send(Request{-42, -42});
std::cout << "Closing" << std::endl;
ipc.close();
}
它所做的只是在大约 1 秒内（100 次 10ms 的循环）随机发送一些请求并记录响应。然后它告诉服务器退出并关闭。只有服务器会删除队列。
切换客户端/服务器的简单方法:
int main(int argc, char** argv) {
if (std::set<std::string_view>(argv + 1, argv + argc).contains("server"))
server();
else
client();
std::cout << "Bye" << std::endl;
}
现场演示
Live
¹ On Coliru
-
文件
test.h
#pragma once
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/lockfree/spsc_queue.hpp>

#include <cassert>
#include <chrono>
#include <cstdint>
#include <functional>
#include <iostream>
#include <optional>
#include <stop_token>
#include <string>
#include <thread>
#include <tuple>
#include <type_traits>

using namespace std::chrono_literals;
// Bidirectional IPC endpoint built on two unidirectional Boost.Interprocess
// message queues: Requests travel client -> server, Responses server -> client.
// IsClient selects which queue this endpoint listens on, and who owns the
// shared-memory resources (the server creates and removes the queues).
template <typename Request, typename Response, bool IsClient> class Ipc {
  private:
    static constexpr uint8_t MAX_DEPTH = 5; // max queued messages per queue/inbox
    using callback_t = std::function<void()>;
    callback_t mCallback; // invoked on the listener thread after each receive
    using Prio = uint32_t;

    // One unidirectional shared-memory message queue carrying messages of
    // exactly type T (shipped as raw bytes, so T must be trivially copyable).
    template <typename T> struct Channel {
        Channel(std::string name) : name_(std::move(name)) { //
            // Both sides must agree on the message size.
            assert(queue_.get_max_msg_size() == sizeof(T));
        }
        ~Channel() {
            if (!IsClient) { // the server owns the shared-memory resource
                std::cerr << "Server cleaning up " << name_ << std::endl;
                Queue::remove(name_.c_str());
            }
        }
        // Blocks (polling in 50ms slices) until a message arrives; throws when
        // stop is requested so the caller's loop can unwind.
        std::tuple<T, Prio> receive(std::stop_token token) {
            size_t len = 0;
            Prio prio = 0;
            T msg{};
            while (!token.stop_requested()) {
                auto deadline = std::chrono::steady_clock::now() + 50ms;
                if (queue_.timed_receive(&msg, sizeof(msg), len, prio, deadline)) {
                    assert(len == sizeof(T));
                    return {std::move(msg), prio};
                }
            }
            throw std::runtime_error("stop requested");
        }
        // Retries (in 50ms slices) while the queue is full; returns false if
        // stop was requested before the message could be enqueued.
        bool send(T const& msg, Prio prio, std::stop_token token) {
            while (!token.stop_requested()) {
                auto deadline = std::chrono::steady_clock::now() + 50ms;
                if (queue_.timed_send(&msg, sizeof(msg), prio, deadline))
                    return true;
            }
            return false;
        }

      private:
        using Queue = boost::interprocess::message_queue;
        std::string name_;
        Queue queue_ = open();
        // The server creates the queue; the client only opens an existing one.
        Queue open() {
            if constexpr (IsClient) {
                return {boost::interprocess::open_only, name_.c_str()};
            } else {
                return {boost::interprocess::open_or_create, name_.c_str(), MAX_DEPTH, sizeof(T)};
            }
        }
    };

    Channel<Request> reqQ;
    Channel<Response> resQ;

    // The message type this endpoint receives.
    using Incoming = std::conditional_t<IsClient, Response, Request>;
    boost::lockfree::spsc_queue<Incoming, boost::lockfree::capacity<MAX_DEPTH>> mInbox;

    // IMPORTANT: mThread must be declared AFTER mInbox and mCallback. Members
    // are constructed in declaration order, and the listener thread starts in
    // the constructor and touches mInbox/mCallback — declaring the thread
    // first would let it run against not-yet-constructed members (UB).
    // mToken must likewise follow mThread.
    std::jthread mThread;
    std::stop_token mToken = mThread.get_stop_token();

    // Listener loop: drain our incoming queue into mInbox and notify the
    // registered callback once per message.
    void listen(std::stop_token token) {
        while (!token.stop_requested()) {
            try {
                bool pushed = false;
                if constexpr (IsClient)
                    pushed = mInbox.push(get<0>(resQ.receive(token)));
                else
                    pushed = mInbox.push(get<0>(reqQ.receive(token)));
                if (!pushed) // inbox full: report the drop instead of silently losing it
                    std::cerr << "Listen: inbox full, message dropped" << std::endl;
                if (mCallback)
                    mCallback();
            } catch (std::exception const& ex) {
                // receive() throws once stop is requested; the loop condition
                // then terminates the thread cleanly.
                std::cerr << "Listen: " << ex.what() << std::endl;
            }
        }
    }

  public:
    // Opens/creates the "<name>_req"/"<name>_res" queues and starts the
    // listener thread. Function-try-block: logs and rethrows any failure from
    // member initialization (e.g. client started before the server).
    Ipc(std::string const& queueName) try
        : reqQ(queueName + "_req")
        , resQ(queueName + "_res")
        , mThread([this](std::stop_token token) { listen(token); }) {
    } catch (std::exception const& ex) {
        std::cerr << "Ipc: " << ex.what() << std::endl;
        throw;
    }

    // Block until the listener thread exits. Safe to call after close():
    // joining an already-joined thread would throw, so guard on joinable().
    void wait() {
        if (mThread.joinable())
            mThread.join();
    }

    // Ask the listener to stop; join unless called from the listener thread
    // itself (e.g. from inside the callback), where joining would deadlock.
    void close() {
        mThread.request_stop();
        if (std::this_thread::get_id() != mThread.get_id())
            wait();
    }

    bool send(Request const& msg, Prio prio = 10) { return reqQ.send(msg, prio, mToken); }
    bool send(Response const& msg, Prio prio = 10) { return resQ.send(msg, prio, mToken); }

    // Pop one received message, if any, from the inbox.
    std::optional<Incoming> consume() {
        if (Incoming val; mInbox.consume_one([&val](Incoming res) { val = std::move(res); }))
            return val;
        return {};
    }

    // NOTE(review): register the callback before messages can arrive — it is
    // read by the listener thread without synchronization.
    void register_callback(callback_t cb) { mCallback = cb; }
    bool haveMessage() const { return mInbox.read_available(); }
};
-
文件
test.cpp
#include "test.h"

#include <cassert>
#include <cmath>
#include <cstdlib>
#include <optional>
#include <set>
#include <string_view>
// Requests are sent by client to server.
// Responses are sent by server to client.
// Both are shipped through the message queues as raw bytes of sizeof(T),
// so they must stay trivially copyable.
struct Request { int a, b; };
struct Response { double a; int b; };
// The client opens existing queues; the server creates (and removes) them.
using ClientIpc = Ipc<Request, Response, true>;
using ServerIpc = Ipc<Request, Response, false>;
// Shared-memory queue name prefix ("_req"/"_res" are appended per channel).
static std::string IPC_NAME = "so_demo_ipc";
// Demo server: answers each Request{a, b} with Response{sqrt(a), b / 2}.
// The sentinel Request{-42, -42} tells the server to shut down (the server
// owns and removes the shared-memory queues on destruction).
void server() {
    ServerIpc ipc(IPC_NAME);
    ipc.register_callback([&ipc] {
        assert(ipc.haveMessage());
        std::optional<Request> req = ipc.consume();
        if (!req)
            return;
        auto [a, b] = *req;
        std::cout << "server received request a:" << a << " b:" << b << std::endl;
        if (a == -42 && b == -42) {
            std::cout << " -> server handling close request" << std::endl;
            ipc.close();
            return;
        }
        // send response
        ipc.send(Response{sqrt(a), b / 2});
    });
    // Block until the close request stops the listener.
    ipc.wait();
}
// Demo client: for ~1 second (100 ticks of 10ms) randomly fires requests
// (roughly 1 in 30 ticks), logging each response as it arrives; finally asks
// the server to shut down via the {-42, -42} sentinel and closes.
void client() {
    ClientIpc ipc(IPC_NAME);
    ipc.register_callback([&ipc] {
        assert(ipc.haveMessage());
        if (std::optional<Response> res = ipc.consume()) {
            auto [a, b] = *res;
            std::cout << "client received response a:" << a << " b:" << b << std::endl;
        }
    });
    for (int tick = 0; tick < 100; ++tick) {
        bool const fire = (rand() % 30 == 0); // ~1-in-30 chance per tick
        if (fire)
            ipc.send(Request{tick, 2 * tick});
        std::this_thread::sleep_for(10ms);
    }
    std::cout << "Client sending close command" << std::endl;
    ipc.send(Request{-42, -42}); // sentinel: server shuts down on this
    std::cout << "Closing" << std::endl;
    ipc.close();
}
int main(int argc, char** argv) {
if (std::set<std::string_view>(argv + 1, argv + argc).contains("server"))
server();
else
client();
std::cout << "Bye" << std::endl;
}
在本地进行现场演示:
遗憾的是,在线编译器不允许共享内存访问