假设“数据集中”的意思是数据包不代表“消息”:这是正确的。TCP是一种流协议。你得到了一个逻辑流。流在电线上传输的方式无关紧要。
您需要使用一个应用程序协议来了解如何解析流。通常会有信息的框架。常用的选择是分隔符(例如NUL字节)或以长度为前缀的消息。
在您的情况下,消息看起来可能是4个八位字节,这意味着您可以
transfer_exactly(4)
。如果不是,我建议使用NUL分隔符(当然,选择一个不会干扰实际消息内容的分隔符)。
看起来asio_wrapper无法实现这一点。只有当框架不重要,或者必须进行额外的缓冲和解析,这会使处理程序变得复杂时,它才会对您有所帮助。
我建议不要使用包装:
Live On Coliru
#include <boost/asio.hpp>
#include <deque>
#include <iostream>
namespace asio = boost::asio;
using asio::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using Message = std::string;
using Handler = std::function<void(Message const& msg)>; // or perhaps an interface
struct Connection : std::enable_shared_from_this<Connection> {
Connection(tcp::socket s, Handler cb) : s_(std::move(s)), cb_(std::move(cb)) {}
void Start() { read_loop(); }
void Write(Message msg) {
post(s_.get_executor(),
[this, self = shared_from_this(), msg = std::move(msg)]() mutable { do_write(std::move(msg)); });
}
void Stop() {
post(s_.get_executor(), [this, self = shared_from_this()]() { s_.cancel(); });
}
private:
tcp::socket s_;
Handler cb_;
Message incoming_;
std::deque<Message> outbox_;
void do_write(Message msg) {
if (!msg.ends_with('\n'))
msg += '\n';
outbox_.push_back(std::move(msg));
if (outbox_.size() == 1)
write_loop();
}
void read_loop() {
asio::async_read_until( //
s_, asio::dynamic_buffer(incoming_), "\n",
[this, self = shared_from_this()](error_code ec, size_t n) {
if (!ec) {
if (cb_ && n)
cb_(incoming_.substr(0, n - 1)); // exclude '\n'
incoming_.erase(0, n);
read_loop();
}
});
}
void write_loop() {
if (outbox_.empty())
return;
asio::async_write( //
s_, asio::buffer(outbox_.front()), [this, self = shared_from_this()](error_code ec, size_t) {
if (!ec) {
outbox_.pop_front();
write_loop();
}
});
}
};
struct Server {
Server(uint16_t port, Handler cb) : acc_(io_, {{}, port}), callback_(std::move(cb)) {}
void Start() {
acc_.listen();
accept_loop();
}
void Stop() {
io_.stop();
// acc_.cancel();
// TODO perhaps keep a list of sessions and stop them?
// io_.join();
}
private:
asio::thread_pool io_{1};
tcp::acceptor acc_;
Handler callback_;
void accept_loop() {
acc_.async_accept(make_strand(acc_.get_executor()), [this](error_code ec, tcp::socket s) {
if (!ec) {
std::make_shared<Connection>(std::move(s), callback_)->Start();
accept_loop();
}
});
}
};
struct Client {
Client(std::string host, uint16_t port, Handler cb)
: spec_(host, std::to_string(port))
, callback_(std::move(cb)) {}
void Start() {
assert(!conn_);
tcp::socket socket_(io_);
asio::connect(socket_, tcp::resolver(io_).resolve(spec_));
conn_ = std::make_shared<Connection>(std::move(socket_), callback_);
conn_->Start();
}
void Write(Message msg) const {
assert(conn_);
conn_->Write(std::move(msg));
}
void Stop() {
if (conn_)
conn_->Stop();
}
private:
asio::thread_pool io_{1};
tcp::resolver::query spec_;
Handler callback_;
std::shared_ptr<Connection> conn_;
};
int main() {
try {
Server server(3000, [](std::string const& msg) { std::cout << "Server Rx: " << msg << std::endl; });
server.Start();
Client client("127.0.0.1", 3000,
[](std::string const& msg) { std::cout << "Client Rx: " << msg << std::endl; });
client.Start();
client.Write("abcd");
client.Write("efgh");
client.Write("ijkl");
client.Write("mnop");
client.Write("qrst");
client.Write("uvwx");
client.Write("yz");
client.Stop();
std::this_thread::sleep_for(5ms);
server.Stop(); // don't stop the server before last message arrived
} catch (std::exception const& e) {
std::cout << "exception :" << e.what() << "\n";
}
}
印刷
Server Rx: abcd
Server Rx: efgh
Server Rx: ijkl
Server Rx: mnop
Server Rx: qrst
Server Rx: uvwx
Server Rx: yz