
Random boost::interprocess_exception::library_error with boost::interprocess::message_queue

  •  Dark Sorrow  ·  1 year ago

    I created a template class that uses a message queue for IPC.
    I run my program in an infinite while loop (referred to as the main loop).
    I collect data from various subsystems (sensors) over Ethernet and use message queues to pass the received data to the appropriate process (there are multiple different processes acting as data sinks, each with its own message queue).
    I ran the program without any activity. It is the only program running, and I reboot the OS before every run.
    The program simply spins in the while loop with all flags set to false; in other words, it runs an empty loop.
    I randomly get boost::interprocess_exception::library_error. Since there is no activity, I would not expect any errors.

    I removed the Ethernet-related code, but the same error still occurs.

    The error occurs at this statement:

    if (primaryNode == true)
    {
        this->mSecondaryToPrimaryMessageQueue->receive(
            &receiveData,
            sizeof(receiveData),
            receiveLength,
            priority
        );
    }
    else
    {
        this->mPrimaryToSecondaryMessageQueue->receive(
            &receiveData,
            sizeof(receiveData),
            receiveLength,
            priority
        );
    }
    

    I have tried setting primaryNode to both true and false; I get the same error either way.
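
    (For reference, catching boost::interprocess::interprocess_exception directly, instead of std::exception, would expose the underlying error and native error codes. A minimal sketch of such a handler, not part of the code below:)

    #include <boost/interprocess/exceptions.hpp>
    #include <iostream>

    //Sketch: report the error details carried by a Boost.Interprocess exception
    void report(const boost::interprocess::interprocess_exception& ex)
    {
        std::cout << "interprocess_exception: " << ex.what()
                  << " error_code="   << static_cast<int>(ex.get_error_code())
                  << " native_error=" << ex.get_native_error() << std::endl;
    }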

    Code:

    ipc.hpp

    #pragma once
    
    #include <thread>
    #include <string>
    #include <atomic>
    #include <memory>
    #include <cstdint>  //std::uint8_t, std::uint32_t, std::uint64_t
    #include <cstring>  //std::memset
    #include <variant>
    #include <optional>
    #include <iostream>
    #include <functional>
    #include <boost/lockfree/spsc_queue.hpp>
    #include <boost/interprocess/ipc/message_queue.hpp>
    
    /// @brief Template class wrapping a pair of Boost.Interprocess message queues for bidirectional IPC
    /// @tparam T1 Specifies the data-type that has to be sent
    /// @tparam T2 Specifies the data-type that will be received
    /// @tparam primaryNode Denotes if the RTP is the primary node, i.e. the owner/creator of the message queues
    template<typename T1, typename T2, bool primaryNode>
    class Ipc
    {
    private:
        static const std::uint8_t MAX_MESSAGE_DEPTH = 5; //Specifies the number of messages the message queue will hold
        using callback_t = std::function<void(void)>;
        callback_t mCallback;
        std::unique_ptr<boost::interprocess::message_queue> mPrimaryToSecondaryMessageQueue;
        std::unique_ptr<boost::interprocess::message_queue> mSecondaryToPrimaryMessageQueue;
    
        std::string         mPrimaryToSecondaryMessageQueueName;
        std::string         mSecondaryToPrimaryMessageQueueName;
    
        std::thread         mReceiveThread;
        std::atomic_bool    mExitReceiveThread{ false };
        boost::lockfree::spsc_queue<T2, boost::lockfree::capacity<MAX_MESSAGE_DEPTH>> mReceiveDataQueue;
    
        void listen(void);
    public:
        Ipc() {}
        bool open(const std::string& queueName);
        bool close(void);
        bool send(const T1& data, std::uint32_t priority = 10);
        std::optional<T2> receive(void);
        bool register_callback(callback_t callback_implementation);
        bool isDataAvailableInReceiveDataQueue(void) const;
    };
    
    
    template<typename T1, typename T2, bool primaryNode>
    inline void Ipc<T1, T2, primaryNode>::listen(void)
    {
        T2                          receiveData;//Buffer to store received data
        std::uint64_t               receiveLength;
        std::uint32_t               priority;
        while(this->mExitReceiveThread.load() == false)
        {
            try
            {
                std::memset(&receiveData, 0, sizeof(receiveData)); //Initialize buffer to 0
                receiveLength = 0; //Initialize read length to 0
                priority = 0; //Initialize priority to 0
                if (primaryNode == true)
                {
                    this->mSecondaryToPrimaryMessageQueue->receive(
                        &receiveData,
                        sizeof(receiveData),
                        receiveLength,
                        priority
                    );
                }
                else
                {
                    this->mPrimaryToSecondaryMessageQueue->receive(
                        &receiveData,
                        sizeof(receiveData),
                        receiveLength,
                        priority
                    );
                }
                this->mReceiveDataQueue.push(receiveData);
                this->mCallback();
            }
            catch (const std::exception& ex)
            {
                std::cout << "Inside Listen Exception\n";
                std::cout << ex.what() << std::endl;
            }
        }
    }
    
    template<typename T1, typename T2, bool primaryNode>
    inline bool Ipc<T1, T2, primaryNode>::open(const std::string& queueName)
    {
        try
        {
            if(primaryNode == true)
            {
                this->mPrimaryToSecondaryMessageQueueName = queueName + std::string("_out");
                this->mSecondaryToPrimaryMessageQueueName = queueName + std::string("_in");
            }
            else
            {
                this->mPrimaryToSecondaryMessageQueueName = queueName + std::string("_out");
                this->mSecondaryToPrimaryMessageQueueName = queueName + std::string("_in");
            }
            //Open or create the message queue used to send data from the primary node to the secondary node
            this->mPrimaryToSecondaryMessageQueue = std::make_unique<boost::interprocess::message_queue>(
                boost::interprocess::open_or_create,
                this->mPrimaryToSecondaryMessageQueueName.c_str(),
                MAX_MESSAGE_DEPTH,
                sizeof(T1)
            );
    
            //Open or create the message queue used to send data from the secondary node to the primary node
            this->mSecondaryToPrimaryMessageQueue = std::make_unique<boost::interprocess::message_queue>(
                boost::interprocess::open_or_create,
                this->mSecondaryToPrimaryMessageQueueName.c_str(),
                MAX_MESSAGE_DEPTH,
                sizeof(T2)
            );
    
            //Start listener thread
            this->mReceiveThread = std::thread(&Ipc::listen, this);
            return true;
        }
        catch (const std::exception& ex)
        {
            std::cout << ex.what() << std::endl;
            return false;
        }
    }
    
    template<typename T1, typename T2, bool primaryNode>
    inline bool Ipc<T1, T2, primaryNode>::close(void)
    {
        try
        {
            this->mExitReceiveThread.store(true); //Marked to close thread
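            //NOTE: receive() blocks indefinitely, so the listener thread may never observe this flag;
            //mReceiveThread is also never joined or detached, and std::thread's destructor calls std::terminate on a joinable thread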
            boost::interprocess::message_queue::remove(this->mPrimaryToSecondaryMessageQueueName.c_str());//Delete Primary to Secondary Message Queue
            boost::interprocess::message_queue::remove(this->mSecondaryToPrimaryMessageQueueName.c_str());//Delete Secondary to Primary Message Queue
            return true;
        }
        catch (const std::exception& ex)
        {
            std::cout << ex.what() << std::endl;
            return false;
        }
    }
    
    template<typename T1, typename T2, bool primaryNode>
    inline bool Ipc<T1, T2, primaryNode>::send(const T1& data, std::uint32_t priority)
    {
        try
        {
            if (primaryNode == true) //Send message on Primary to Secondary Queue
            {
                this->mPrimaryToSecondaryMessageQueue->send(&data, sizeof(data), priority);
            }
            else //Send message on Secondary to Primary Queue
            {
                this->mSecondaryToPrimaryMessageQueue->send(&data, sizeof(data), priority);
            }
            return true;
        }
        catch (const std::exception& ex)
        {
            std::cout << ex.what() << std::endl;
            return false;
        }
    }
    
    template<typename T1, typename T2, bool primaryNode>
    inline std::optional<T2> Ipc<T1, T2, primaryNode>::receive(void)
    {
        std::optional<T2> data{ std::nullopt };
        if (this->mReceiveDataQueue.read_available() > 0) //If data is available, pop the first element
        {
            data = this->mReceiveDataQueue.front();
            this->mReceiveDataQueue.pop();
        }
        else
        {
            //data = std::nullopt; //Not needed 
        }
        return data;
    }
    
    template<typename T1, typename T2, bool primaryNode>
    inline bool Ipc<T1, T2, primaryNode>::register_callback(callback_t callbackImplementation)
    {
        try
        {
            this->mCallback = callbackImplementation;
            return true;
        }
        catch (const std::exception& ex)
        {
            std::cerr << ex.what() << '\n';
        }
        return false;
    }
    
    template<typename T1, typename T2, bool primaryNode>
    inline bool Ipc<T1, T2, primaryNode>::isDataAvailableInReceiveDataQueue(void) const
    {
        if (this->mReceiveDataQueue.read_available() > 0) //If data is available
        {
            return true;
        }
        else
        {
            return false;
        }
    }
    

    main.cpp

    #include <ipc.hpp>
    #include <iostream>
    
    //P1 stands for Process 1
    //P2 stands for Process 2
    struct P1ToP2
    {
        float a;
        int b;
    };
    
    struct P2ToP1
    {
        int a;
        int b;
    };
    
    Ipc<P1ToP2, P2ToP1, false> ipc1; //Global IPC object
    
    void message_queue_data_received(void)
    {
        if (ipc1.isDataAvailableInReceiveDataQueue() == true)
        {
            auto s = ipc1.receive();
            if (s.has_value() == true)
            {
                std::cout << "a : " << s->a << "\tb : " << s->b << std::endl;
            }
    
        }
    }
    
    int main(int argc, char *argv[])
    {
        bool dataReceivedOnEthernet = false;
        
        ipc1.register_callback(message_queue_data_received);
        ipc1.open("ipc1");
        
        while(true)
        {
            if(dataReceivedOnEthernet == true) //Flag set by other thread
            {
                P1ToP2 p;
                p.a = 10.23; //Some Data received over ethernet
                p.b = 10; //Some Data received over ethernet
                ipc1.send(p); //Send data over IPC
            }
            //Other Code
        }
    }
    

    Error

    boost::interprocess_exception::library_error
    
    1 Answer

  •   sehe  ·  1 year ago

    Why do these processes use different types for the messages, while silently assuming they have the same size (and are trivial, standard-layout, etc.)? Are you mixing up the various types and queues? It looks like it.
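
    For instance, a check along the following lines would catch such a mismatch at startup instead of failing at some later receive. This is just a sketch (get_max_msg_size() is a member of boost::interprocess::message_queue; the helper name is made up):

    #include <boost/interprocess/ipc/message_queue.hpp>
    #include <stdexcept>
    #include <type_traits>

    //Sketch: fail fast if the message type T and the (possibly pre-existing) queue disagree
    template <typename T>
    void check_queue_matches(boost::interprocess::message_queue& q) {
        static_assert(std::is_trivially_copyable_v<T>, "raw-byte IPC requires trivially copyable types");
        static_assert(std::is_standard_layout_v<T>,    "raw-byte IPC requires standard-layout types");
        if (q.get_max_msg_size() != sizeof(T)) //open_or_create may reuse an existing queue with different parameters
            throw std::runtime_error("message_queue max_msg_size does not match sizeof(T)");
    }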

    It would help if you named things well. Also, remove the duplication.

    I would separate the queues by message type. Name them by role:

    // Requests are sent by client to server.
    // Responses are sent by server to client.
    struct Request { int a, b; };
    struct Response { double a; int b; };
    using ClientIpc = Ipc<Request, Response, true>;
    using ServerIpc = Ipc<Request, Response, false>;
    

    Then, remove that duplication by defining a channel type:

    using Prio = uint32_t;
    template <typename T> struct Channel {
        Channel(std::string name);
        ~Channel();
    
        std::tuple<T, Prio> receive(std::stop_token token);
        bool send(T const& msg, Prio prio, std::stop_token token);
    
      private:
        using Queue = boost::interprocess::message_queue;
        std::string name_;
        Queue       queue_ = open();
    
        Queue open() {
            if constexpr (IsClient) {
                return {boost::interprocess::open_only, name_.c_str()};
            } else {
                return {boost::interprocess::open_or_create, name_.c_str(), MAX_DEPTH, sizeof(T)};
            }
        }
    };
    

    Now we can simply say:

    Channel<Request>  reqQ;
    Channel<Response> resQ;
    

    With construction like:

     Ipc(std::string const& queueName) try
        : reqQ(queueName + "_req")
        , resQ(queueName + "_res")
        , mThread(bind(&Ipc::listen, this, std::placeholders::_1)) {
    } catch (std::exception const& ex) {
        std::cerr << "Ipc: " << ex.what() << std::endl;
        throw;
    }
    

    The listener enqueues received messages. Its type depends on whether we are the client or the server:

    using Incoming = std::conditional_t<IsClient, Response, Request>;
    boost::lockfree::spsc_queue<Incoming, boost::lockfree::capacity<MAX_DEPTH>> mInbox;
    

    Note how I opted for the safer jthread, using a stop token to coordinate thread exit:

    std::jthread      mThread;
    std::stop_token   mToken = mThread.get_stop_token();
    
    void listen(std::stop_token token) {
        while (!token.stop_requested()) {
            try {
                if constexpr (IsClient)
                    mInbox.push(get<0>(resQ.receive(token)));
                else
                    mInbox.push(get<0>(reqQ.receive(token)));
    
                if (mCallback)
                    mCallback();
            } catch (std::exception const& ex) {
                std::cerr << "Listen: " << ex.what() << std::endl;
            }
        }
    }
    

    The external operations end up looking much simpler, e.g.:

    void wait() { mThread.join(); }
    
    void close() {
        mThread.request_stop();
        if (std::this_thread::get_id() != mThread.get_id())
            wait();
    }
    
    bool send(Request  const& msg, Prio prio = 10) { return reqQ.send(msg, prio, mToken); }
    bool send(Response const& msg, Prio prio = 10) { return resQ.send(msg, prio, mToken); }
    
    std::optional<Incoming> consume() {
        if (Incoming val; mInbox.consume_one([&val](Incoming res) { val = std::move(res); }))
            return val;
        return {};
    }
    
    void register_callback(callback_t cb) { mCallback = cb; }
    bool haveMessage() const { return mInbox.read_available(); }
    

    Example Client/Server

    Let's define a server that responds to the above Requests by sending back the square root of a, and b/2:

    void server() {
        ServerIpc ipc(IPC_NAME);
    
        auto handler = [&ipc] {
            assert(ipc.haveMessage());
            if (std::optional<Request> req = ipc.consume()) {
                auto [a, b] = *req;
                std::cout << "server received request a:" << a << " b:" << b << std::endl;
    
                if (a == -42 && b == -42) {
                    std::cout << " -> server handling close request" << std::endl;
                    ipc.close();
                } else {
                    // send response
                    ipc.send(Response{sqrt(a), b / 2});
                }
            }
        };
    
        ipc.register_callback(handler);
        ipc.wait();
    }
    

    That's all. Note that we added a mechanism to tell the server that the client wants it to exit (since the server owns the resources). The client might look like this:

    void client() {
        ClientIpc ipc(IPC_NAME);
    
        auto handler = [&ipc] {
            assert(ipc.haveMessage());
            if (std::optional<Response> res = ipc.consume()) {
                auto [a, b] = *res;
                std::cout << "client received response a:" << a << " b:" << b << std::endl;
            }
        };
    
        ipc.register_callback(handler);
    
        for (int i = 0; i < 100; ++i) {
            if (rand() % 30 == 0)            // Flag set by other thread
                ipc.send(Request{i, 2 * i}); // Send request
    
            std::this_thread::sleep_for(10ms);
        }
    
        std::cout << "Client sending close command" << std::endl;
        ipc.send(Request{-42, -42});
    
        std::cout << "Closing" << std::endl;
        ipc.close();
    }
    

    All it does is send a few requests over 100 short iterations (10 ms apart) and log the responses. Then it tells the server to exit, and closes. Only the server removes the queues.

    A simple way to switch between client and server:

    int main(int argc, char** argv) {
        if (std::set<std::string_view>(argv + 1, argv + argc).contains("server"))
            server();
        else
            client();
    
        std::cout << "Bye" << std::endl;
    }
    

    Live Demo

    Live¹ On Coliru

    • File test.h

       #pragma once
      
       #include <boost/interprocess/ipc/message_queue.hpp>
       #include <boost/lockfree/spsc_queue.hpp>
       #include <cassert>
       #include <functional>
       #include <iostream>
       #include <optional>
       #include <thread>
       #include <type_traits>
       using namespace std::chrono_literals;
      
       template <typename Request, typename Response, bool IsClient> class Ipc {
         private:
           static constexpr uint8_t MAX_DEPTH = 5;
      
           using callback_t = std::function<void()>;
           callback_t mCallback;
      
           using Prio = uint32_t;
           template <typename T> struct Channel {
               Channel(std::string name) : name_(std::move(name)) { //
                   assert(queue_.get_max_msg_size() == sizeof(T));
               }
      
               ~Channel() {
                   if (!IsClient) {
                       std::cerr << "Server cleaning up " << name_ << std::endl;
                       Queue::remove(name_.c_str());
                   }
               }
      
               std::tuple<T, Prio> receive(std::stop_token token) {
                   size_t len  = 0;
                   Prio   prio = 0;
                   T      msg{};
      
                   while (!token.stop_requested()) {
                       auto deadline = std::chrono::steady_clock::now() + 50ms;
                       if (queue_.timed_receive(&msg, sizeof(msg), len, prio, deadline)) {
                           assert(len == sizeof(T));
                           return {std::move(msg), prio};
                       }
                   }
      
                   throw std::runtime_error("stop requested");
               }
      
               bool send(T const& msg, Prio prio, std::stop_token token) {
                   while (!token.stop_requested()) {
                       auto deadline = std::chrono::steady_clock::now() + 50ms;
                       if (queue_.timed_send(&msg, sizeof(msg), prio, deadline))
                           return true;
                   }
                   return false;
               }
      
             private:
               using Queue = boost::interprocess::message_queue;
               std::string name_;
               Queue       queue_ = open();
      
               Queue open() {
                   if constexpr (IsClient) {
                       return {boost::interprocess::open_only, name_.c_str()};
                   } else {
                       return {boost::interprocess::open_or_create, name_.c_str(), MAX_DEPTH, sizeof(T)};
                   }
               }
           };
      
           Channel<Request>  reqQ;
           Channel<Response> resQ;
           std::jthread      mThread;
           std::stop_token   mToken = mThread.get_stop_token();
      
           using Incoming = std::conditional_t<IsClient, Response, Request>;
           boost::lockfree::spsc_queue<Incoming, boost::lockfree::capacity<MAX_DEPTH>> mInbox;
      
           void listen(std::stop_token token) {
               while (!token.stop_requested()) {
                   try {
                       if constexpr (IsClient)
                           mInbox.push(get<0>(resQ.receive(token)));
                       else
                           mInbox.push(get<0>(reqQ.receive(token)));
      
                       if (mCallback)
                           mCallback();
                   } catch (std::exception const& ex) {
                       std::cerr << "Listen: " << ex.what() << std::endl;
                   }
               }
           }
      
         public:
           Ipc(std::string const& queueName) try
               : reqQ(queueName + "_req")
               , resQ(queueName + "_res")
               , mThread(bind(&Ipc::listen, this, std::placeholders::_1)) {
           } catch (std::exception const& ex) {
               std::cerr << "Ipc: " << ex.what() << std::endl;
               throw;
           }
      
           void wait() { mThread.join(); }
      
           void close() {
               mThread.request_stop();
               if (std::this_thread::get_id() != mThread.get_id())
                   wait();
           }
      
           bool send(Request  const& msg, Prio prio = 10) { return reqQ.send(msg, prio, mToken); }
           bool send(Response const& msg, Prio prio = 10) { return resQ.send(msg, prio, mToken); }
      
           std::optional<Incoming> consume() {
               if (Incoming val; mInbox.consume_one([&val](Incoming res) { val = std::move(res); }))
                   return val;
               return {};
           }
      
           void register_callback(callback_t cb) { mCallback = cb; }
           bool haveMessage() const { return mInbox.read_available(); }
       };
      
    • File test.cpp

       #include "test.h"
       #include <cmath>
       #include <set>
      
       // Requests are sent by client to server.
       // Responses are sent by server to client.
       struct Request { int a, b; };
       struct Response { double a; int b; };
       using ClientIpc = Ipc<Request, Response, true>;
       using ServerIpc = Ipc<Request, Response, false>;
      
       static std::string IPC_NAME = "so_demo_ipc";
       void server() {
           ServerIpc ipc(IPC_NAME);
      
           auto handler = [&ipc] {
               assert(ipc.haveMessage());
               if (std::optional<Request> req = ipc.consume()) {
                   auto [a, b] = *req;
                   std::cout << "server received request a:" << a << " b:" << b << std::endl;
      
                   if (a == -42 && b == -42) {
                       std::cout << " -> server handling close request" << std::endl;
                       ipc.close();
                   } else {
                       // send response
                       ipc.send(Response{sqrt(a), b / 2});
                   }
               }
           };
      
           ipc.register_callback(handler);
           ipc.wait();
       }
      
       void client() {
           ClientIpc ipc(IPC_NAME);
      
           auto handler = [&ipc] {
               assert(ipc.haveMessage());
               if (std::optional<Response> res = ipc.consume()) {
                   auto [a, b] = *res;
                   std::cout << "client received response a:" << a << " b:" << b << std::endl;
               }
           };
      
           ipc.register_callback(handler);
      
           for (int i = 0; i < 100; ++i) {
               if (rand() % 30 == 0)            // Flag set by other thread
                   ipc.send(Request{i, 2 * i}); // Send request
      
               std::this_thread::sleep_for(10ms);
           }
      
           std::cout << "Client sending close command" << std::endl;
           ipc.send(Request{-42, -42});
      
           std::cout << "Closing" << std::endl;
           ipc.close();
       }
      
       int main(int argc, char** argv) {
           if (std::set<std::string_view>(argv + 1, argv + argc).contains("server"))
               server();
           else
               client();
      
           std::cout << "Bye" << std::endl;
       }
      

    Live demo run locally:

    [screenshot: server and client running locally]

    ¹ Sadly, online compilers don't allow shared memory access.