代码之家  ›  专栏  ›  技术社区  ›  Ziri

使用boost::asio::async_write进行多次调用

  •  1
  • Ziri  · 技术社区  · 2 年前

    我正在尝试提升asio,测试代码基于 asio-wrapper

    是否有任何方法可以在不中继的情况下防止数据集中 std::this_thread::sleep_for 在这种情况下?

    这是测试代码:

    #include<iostream>
    #include <vector>
    #include<thread>
    #include<chrono>
    #include "asio_wrapper.hpp"
        
        
    class WritableHandlerImplServer : public cppeng::tcp::WritableHandler {
        public:
            void HandleCallback(std::vector<uint8_t> data,
                    std::shared_ptr<cppeng::tcp::Writable> writable) override {
                    // Convert void pointer to string
                    std::string received(data.begin(), data.end());
                    std::cout << "Server Rx: " << received << std::endl;
    
                }
                void NotifyClosed(std::shared_ptr<cppeng::tcp::Writable> ptr) override {
                    std::cout << "Server: Connection closed!" << std::endl;
                }
            };
    
    
    class WritableHandlerImplClient : public cppeng::tcp::WritableHandler {
        public:
            void HandleCallback(std::vector<uint8_t> data,
                std::shared_ptr<cppeng::tcp::Writable> writable) override {
                // Convert void pointer to string
                std::string received(data.begin(), data.end());
                std::cout << "Client Rx: " << received << std::endl;
                
                rx_flag_ = true;
            }
            void NotifyClosed(std::shared_ptr<cppeng::tcp::Writable> ptr) override {
                std::cout << "Client: Connection closed!" << std::endl;
            }
            bool rx_flag_{ false };
    };
    
    
    int main()
    {
        /*this is a test code*/
        try {
    
            WritableHandlerImplServer writable_handler_server;
            WritableHandlerImplClient writable_handler_client;
    
            cppeng::tcp::Server server(3000, writable_handler_server);
            server.Start();
            cppeng::tcp::Client client("127.0.0.1", 3000, writable_handler_client);
            client.Start(1000);
    
            
            client.Write("abcd");
            std::this_thread::sleep_for(std::chrono::milliseconds(5));
            client.Write("efgh");
            std::this_thread::sleep_for(std::chrono::milliseconds(5));
            client.Write("ijkl");
            std::this_thread::sleep_for(std::chrono::milliseconds(5));
            client.Write("mnop");
            std::this_thread::sleep_for(std::chrono::milliseconds(5));
            client.Write("qrst");
            std::this_thread::sleep_for(std::chrono::milliseconds(5));
            client.Write("uvwx");
            std::this_thread::sleep_for(std::chrono::milliseconds(5));
            client.Write("yz");
                
            
            server.Stop();
            client.Stop();
    
        }
        catch (std::exception& e) {
            std::cout << "exception :" << e.what() << "\n";
        }
    
        return EXIT_SUCCESS;
    }
    

    enter image description here

    写入/读取功能:

    void Connection::DoRead() {
    
        auto self(shared_from_this());
        boost::asio::async_read(socket_,
            boost::asio::buffer(read_buffer_.data(), read_buffer_.size()),
            boost::asio::transfer_at_least(1),
                        [this, self](std::error_code ec, std::size_t length) {
                            if (!ec) {
                            std::vector<uint8_t> rx_data(read_buffer_.begin(), read_buffer_.begin() + length);
                            writable_handler_.HandleCallback(std::move(rx_data), shared_from_this());
                            DoRead();
                            } else {
                            Close();
                            }
                        });
    }
    
    void Connection::DoWrite() {
        auto self(shared_from_this());
        boost::asio::async_write(socket_,
                        boost::asio::buffer(write_queue_.front().data(),
                                        write_queue_.front().size()),
                        [this, self](std::error_code ec, std::size_t /*length*/) {
                            if (!ec) {
                            write_queue_.pop();
                            if (!write_queue_.empty()) {
                                DoWrite();
                            } else {
                                write_busy_ = false;
                            }
                            } else {
                            Close();
                            }
                        });
    }
    

    更新:

    源代码作者实现包装器写入函数的方式是,如果boost::asio写入函数正忙于写入,则第二条消息将丢失!

        void Connection::Write(const std::string &data) {
          // Add the data to the queue
          write_queue_.emplace(data.begin(), data.end());
          // Start the write process only if we are not already busy writing
          if (!write_busy_) {
            write_busy_ = true;
            DoWrite();
          }
          else {
             
          }
        }
    
    1 回复  |  直到 2 年前
        1
  •  1
  •   sehe    2 年前

    假设“数据集中”的意思是数据包不代表“消息”:这是正确的。TCP是一种流协议。你得到了一个逻辑流。流在电线上传输的方式无关紧要。

    您需要使用一个应用程序协议来了解如何解析流。通常会有信息的框架。常用的选择是分隔符(例如NUL字节)或以长度为前缀的消息。

    在您的情况下,消息看起来可能是4个八位字节,这意味着您可以 transfer_exactly(4) 。如果不是,我建议使用NUL分隔符(当然,选择一个不会干扰实际消息内容的分隔符)。

    看起来asio_wrapper无法实现这一点。只有当框架不重要,或者必须进行额外的缓冲和解析,这会使处理程序变得复杂时,它才会对您有所帮助。

    我建议不要使用包装:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <deque>
    #include <iostream>
    
    namespace asio = boost::asio;
    using asio::ip::tcp;
    using boost::system::error_code;
    using namespace std::chrono_literals;
    
    using Message = std::string;
    using Handler = std::function<void(Message const& msg)>; // or perhaps an interface
    
    struct Connection : std::enable_shared_from_this<Connection> {
        Connection(tcp::socket s, Handler cb) : s_(std::move(s)), cb_(std::move(cb)) {}
    
        void Start() { read_loop(); }
    
        void Write(Message msg) {
            post(s_.get_executor(),
                 [this, self = shared_from_this(), msg = std::move(msg)]() mutable { do_write(std::move(msg)); });
        }
    
        void Stop() {
            post(s_.get_executor(), [this, self = shared_from_this()]() { s_.cancel(); });
        }
    
      private:
        tcp::socket s_;
        Handler     cb_;
    
        Message             incoming_;
        std::deque<Message> outbox_;
    
        void do_write(Message msg) {
            if (!msg.ends_with('\n'))
                msg += '\n';
            outbox_.push_back(std::move(msg));
            if (outbox_.size() == 1)
                write_loop();
        }
    
        void read_loop() {
            asio::async_read_until( //
                s_, asio::dynamic_buffer(incoming_), "\n",
                [this, self = shared_from_this()](error_code ec, size_t n) {
                    if (!ec) {
                        if (cb_ && n)
                            cb_(incoming_.substr(0, n - 1)); // exclude '\n'
                        incoming_.erase(0, n);
                        read_loop();
                    }
                });
        }
    
        void write_loop() {
            if (outbox_.empty())
                return;
            asio::async_write( //
                s_, asio::buffer(outbox_.front()), [this, self = shared_from_this()](error_code ec, size_t) {
                    if (!ec) {
                        outbox_.pop_front();
                        write_loop();
                    }
                });
        }
    };
    
    struct Server {
        Server(uint16_t port, Handler cb) : acc_(io_, {{}, port}), callback_(std::move(cb)) {}
    
        void Start() {
            acc_.listen();
            accept_loop();
        }
    
        void Stop() {
            io_.stop();
            // acc_.cancel();
            //  TODO perhaps keep a list of sessions and stop them?
            // io_.join();
        }
    
      private:
        asio::thread_pool io_{1};
        tcp::acceptor     acc_;
        Handler           callback_;
    
        void accept_loop() {
            acc_.async_accept(make_strand(acc_.get_executor()), [this](error_code ec, tcp::socket s) {
                if (!ec) {
                    std::make_shared<Connection>(std::move(s), callback_)->Start();
                    accept_loop();
                }
            });
        }
    };
    
    struct Client {
        Client(std::string host, uint16_t port, Handler cb)
            : spec_(host, std::to_string(port))
            , callback_(std::move(cb)) {}
    
        void Start() {
            assert(!conn_);
            tcp::socket socket_(io_);
            asio::connect(socket_, tcp::resolver(io_).resolve(spec_));
            conn_ = std::make_shared<Connection>(std::move(socket_), callback_);
            conn_->Start();
        }
    
        void Write(Message msg) const {
            assert(conn_);
            conn_->Write(std::move(msg));
        }
    
        void Stop() {
            if (conn_)
                conn_->Stop();
        }
    
      private:
        asio::thread_pool           io_{1};
        tcp::resolver::query        spec_;
        Handler                     callback_;
        std::shared_ptr<Connection> conn_;
    };
    
    int main() {
        try {
            Server server(3000, [](std::string const& msg) { std::cout << "Server Rx: " << msg << std::endl; });
            server.Start();
    
            Client client("127.0.0.1", 3000,
                          [](std::string const& msg) { std::cout << "Client Rx: " << msg << std::endl; });
            client.Start();
    
            client.Write("abcd");
            client.Write("efgh");
            client.Write("ijkl");
            client.Write("mnop");
            client.Write("qrst");
            client.Write("uvwx");
            client.Write("yz");
    
            client.Stop();
    
            std::this_thread::sleep_for(5ms);
            server.Stop(); // don't stop the server before last message arrived
        } catch (std::exception const& e) {
            std::cout << "exception :" << e.what() << "\n";
        }
    }
    

    印刷

    Server Rx: abcd
    Server Rx: efgh
    Server Rx: ijkl
    Server Rx: mnop
    Server Rx: qrst
    Server Rx: uvwx
    Server Rx: yz