同步与异步模式简介
同步
- 同步模式下,程序会等待某个操作完成后再继续执行后续代码
- 典型场景:阻塞式 socket 通信,read/write 操作会阻塞线程直到数据收发完成
- 优点:编程简单,逻辑清晰,易于理解和调试
- 缺点:效率低,无法充分利用多核资源,容易造成线程阻塞,难以支撑高并发
// Minimal synchronous echo loop: each read_some blocks the calling thread
// until data arrives, then the chunk just read is written straight back.
// NOTE(review): `socket_ptr`, `MAX_LENGTH` and `error` are assumed to be
// declared by the surrounding example — confirm against the full listing.
void session(socket_ptr sock){
for (;;) {
char data[MAX_LENGTH];
size_t length = sock->read_some(boost::asio::buffer(data, MAX_LENGTH), error);
// ...process the data...
boost::asio::write(*sock, boost::asio::buffer(data, length));
}
}
异步
- 异步模式下,操作会立即返回,实际的读写操作在后台进行,完成后通过回调函数通知主程序
- 典型场景:事件驱动的高性能服务器,利用回调机制处理 I/O
- 优点:高并发、资源利用率高、适合大规模网络服务
- 缺点:编程复杂,回调嵌套多,生命周期管理难度大
// Asynchronous counterpart: async_read_some returns immediately; the bound
// handler (handle_read) fires later, when data has actually arrived.
void Session::Start() {
_socket.async_read_some(boost::asio::buffer(_data, max_length),
std::bind(&Session::handle_read, this, ...));
}
Boost.Asio 网络库使用
核心组件
io_context:所有异步操作的核心,事件循环。socket:封装 TCP/UDP 套接字。acceptor:服务器端用于监听和接受连接。buffer:数据缓冲区。error_code:错误处理。
同步模式下的典型流程
- 创建 io_context
- 创建 acceptor 并绑定端口
- 循环接受连接,创建 socket
- 阻塞读写数据
异步模式下的典型流程
- 创建 io_context
- 创建 acceptor,异步接受连接
- 每个连接创建 Session 对象,异步读写
- 通过回调函数处理数据和连接生命周期
开发注意点:
- 异步模式下常用 std::shared_ptr 和 enable_shared_from_this 管理 Session 对象生命周期,防止回调执行时对象被提前销毁。
- TCP 是流协议,消息边界不固定,需自定义协议头解决粘包问题;一般用 TLV 协议,或者应用层 HTTP 来拆包。下面的例子直接用 2 字节表示后面数据的大小。
一个asio异步echo服务器示例
AsyncServer.h
#pragma once
#include <cstdint>
#include <cstring>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <boost/asio.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
using boost::asio::ip::tcp;
#define HEAD_SIZE 2
#define MAX_SENDQUEUE_SIZE 1000
/*
异步读写的echo服务器示例
用伪闭包延长生命周期
防止 callback 执行时对象被销毁
技术用的是智能指针
*/
class Server;
class Session;
// Owning buffer for one framed message: a 2-byte big-endian length header
// followed by the payload (send side), or a bare payload buffer (recv side).
// _data always has one extra byte so the payload can be NUL-terminated for
// printing.
class MsgNode
{
    friend class Session;

public:
    // Send-side node: frames `msg` (max_len bytes) with a HEAD_SIZE-byte
    // length prefix.
    MsgNode(char *msg, int max_len) : _cur_len(0), _total_len(max_len + HEAD_SIZE)
    {
        _data = new char[_total_len + 1];
        // Write the payload length in network byte order (big-endian),
        // byte by byte: portable, and avoids boost::asio::detail, which is
        // an internal, unstable namespace.
        _data[0] = static_cast<char>((static_cast<std::uint16_t>(max_len) >> 8) & 0xFF);
        _data[1] = static_cast<char>(static_cast<std::uint16_t>(max_len) & 0xFF);
        memcpy(_data + HEAD_SIZE, msg, max_len);
        _data[_total_len] = '\0';
    }
    // Recv-side node: zeroed buffer of max_len bytes (plus NUL slot).
    MsgNode(int max_len) : _cur_len(0), _total_len(max_len)
    {
        _data = new char[_total_len + 1];
        memset(_data, 0, _total_len + 1);
    }
    ~MsgNode()
    {
        delete[] _data;
    }
    // Non-copyable: _data is a raw owning pointer, so the default copy would
    // lead to a double delete. Nodes are only ever held via shared_ptr.
    MsgNode(const MsgNode &) = delete;
    MsgNode &operator=(const MsgNode &) = delete;
    // Zeroes the buffer and resets the fill cursor so the node can be reused.
    void Clear()
    {
        memset(_data, 0, _total_len);
        _cur_len = 0;
    }

private:
    int _cur_len;   // bytes of _data filled so far
    int _total_len; // full frame length (header + payload on the send side)
    char *_data;    // heap buffer of _total_len + 1 bytes
};
// One connected client. Inherits enable_shared_from_this so every pending
// async callback can hold a shared_ptr to the session (the "pseudo closure"
// technique), keeping the object alive until the last handler has run.
class Session : public std::enable_shared_from_this<Session>
{
private:
tcp::socket _socket;
enum
{
max_length = 1024 * 2 // size of the raw receive buffer
};
char _data[max_length];  // scratch buffer filled by async_read_some
Server *_server;         // owning server; used to deregister on error
std::string _uuid;       // random id, key into Server::_sessions
std::queue<std::shared_ptr<MsgNode>> _send_que; // pending outbound messages
std::mutex _send_lock;   // guards _send_que
// Body of the message currently being assembled.
std::shared_ptr<MsgNode> _recv_msg_node;
// 2-byte length header currently being assembled.
std::shared_ptr<MsgNode> _recv_head_node;
bool head_parse;         // true once the current message's header is complete
public:
// Creates the socket on the given io_context and assigns a random uuid.
Session(boost::asio::io_context &ioc, Server *server)
: _socket(ioc), _server(server), _recv_head_node(std::make_shared<MsgNode>(HEAD_SIZE)), head_parse(false)
{
boost::uuids::uuid a_uuid = boost::uuids::random_generator()();
_uuid = boost::uuids::to_string(a_uuid);
}
// Raw socket accessor, used by the acceptor before the session starts.
tcp::socket &Socket()
{
return _socket;
}
std::string &GetUuid();
// Arms the first asynchronous read.
void Start();
// Drains the send queue and closes the socket.
void Close();
// Completion handlers; the shared_ptr parameter keeps *this alive.
void handle_read(const boost::system::error_code &error, size_t bytes_transfered, std::shared_ptr<Session> _self_shared);
void handle_write(const boost::system::error_code &error, std::shared_ptr<Session> _self_shared);
// Queues msg (max_length bytes) for sending, framed with a 2-byte header.
void Send(char *msg, int max_length);
};
// Accepts TCP connections on the configured port and owns all live sessions.
class Server
{
public:
Server(boost::asio::io_context &ioc, short port);
// Removes a session from the map, releasing the server's reference to it.
void ClearSession(std::string _uuid);
private:
// Creates a fresh Session and waits asynchronously for the next client.
void start_accept();
// Completion handler for async_accept.
void handle_accept(std::shared_ptr<Session>, const boost::system::error_code &error);
boost::asio::io_context &_ioc;
tcp::acceptor _acceptor;
// uuid -> session; holding the shared_ptr keeps each session alive.
std::map<std::string, std::shared_ptr<Session>> _sessions;
};
AsyncServer.cpp
#include <iostream>
#include "AsyncServer.h"
// =================================== session func below ======================================
// Queues a message for sending. If no write is currently in flight, starts
// the async_write chain; otherwise handle_write will pick the node up when
// the current write completes. Thread-safe via _send_lock.
void Session::Send(char *msg, int max_length)
{
    std::lock_guard<std::mutex> lock(_send_lock);
    // Drop the message when the queue is saturated so a slow peer cannot
    // make us buffer unboundedly. (>= caps the queue at exactly
    // MAX_SENDQUEUE_SIZE; the previous > let it grow one past the limit.)
    if (_send_que.size() >= MAX_SENDQUEUE_SIZE)
    {
        std::cout << "session: " << _uuid << " send que fulled, size is: " << MAX_SENDQUEUE_SIZE << std::endl;
        return;
    }
    // A non-empty queue means an async_write chain is already in flight.
    bool pending = !_send_que.empty();
    _send_que.push(std::make_shared<MsgNode>(msg, max_length));
    if (pending)
    {
        return;
    }
    auto &msgnode = _send_que.front();
    boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),
                             std::bind(&Session::handle_write, this, std::placeholders::_1, shared_from_this()));
}
// Returns the session's uuid — the key used in Server::_sessions.
std::string &Session::GetUuid()
{
return _uuid;
}
void Session::Start()
{
memset(_data, 0, max_length);
_socket.async_read_some(boost::asio::buffer(_data, max_length),
std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, shared_from_this()));
}
void Session::Close()
{
// 清理发送队列并安全关闭 socket,然后通知 Server 清除会话
{
std::lock_guard<std::mutex> lock(_send_lock);
while (!_send_que.empty())
_send_que.pop();
}
boost::system::error_code ec;
if (_socket.is_open())
{
// 尝试有序关闭连接,忽略可能的错误
_socket.shutdown(tcp::socket::shutdown_both, ec);
_socket.close(ec);
}
}
// Core read handler for the hand-rolled length-prefix framing. Because TCP
// is a byte stream, a single async_read_some delivery may contain part of a
// header, part of a body, exactly one message, or several messages glued
// together — so the loop slices _data until every received byte is
// consumed, then re-arms the next read.
// NOTE(review): head_parse is updated after the next read is armed; this is
// only safe while io_context runs on a single thread — confirm before
// introducing an I/O thread pool.
void Session::handle_read(const boost::system::error_code &error, size_t bytes_transfered, std::shared_ptr<Session> _self_shared)
{
    if (!error)
    {
        // Number of bytes of _data consumed so far in this callback.
        int copy_len = 0;
        while (bytes_transfered > 0)
        {
            if (!head_parse)
            {
                // Received data does not yet complete the header.
                if (bytes_transfered + _recv_head_node->_cur_len < HEAD_SIZE)
                {
                    memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, bytes_transfered);
                    // Track how much of the header has been stored.
                    _recv_head_node->_cur_len += bytes_transfered;
                    // Reset the session's receive buffer.
                    memset(_data, 0, max_length);
                    // Keep listening for the rest of the header.
                    _socket.async_read_some(boost::asio::buffer(_data, max_length),
                                            std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));
                    return;
                }
                // Received data covers at least the rest of the header.
                // Bytes still missing from the header:
                int head_remain = HEAD_SIZE - _recv_head_node->_cur_len;
                memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, head_remain);
                // Advance past the consumed header bytes.
                copy_len += head_remain;
                bytes_transfered -= head_remain;
                // Decode the body length from the completed header
                // (network byte order -> host byte order).
                std::uint16_t data_len_net = 0;
                memcpy(&data_len_net, _recv_head_node->_data, HEAD_SIZE);
                int data_len = boost::asio::detail::socket_ops::network_to_host_short(data_len_net);
                std::cout << "data_len is " << data_len << std::endl;
                // Drop the session on an illegal length (corrupt or hostile peer).
                if (data_len <= 0 || data_len > max_length)
                {
                    std::cout << "invalid data length is " << data_len << std::endl;
                    _server->ClearSession(_uuid);
                    return;
                }
                _recv_msg_node = std::make_shared<MsgNode>(data_len);
                // Case 1: fewer bytes than the header promised — stash the
                // partial body and wait for the rest.
                if (bytes_transfered < data_len)
                {
                    memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transfered);
                    _recv_msg_node->_cur_len += bytes_transfered;
                    memset(_data, 0, max_length);
                    _socket.async_read_some(boost::asio::buffer(_data, max_length),
                                            std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));
                    // Header fully parsed; the next callback fills the body.
                    head_parse = true;
                    return;
                }
                // Case 2: the whole message (header + body) is available; any
                // surplus bytes belong to the next message (sticky packets).
                memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, data_len);
                _recv_msg_node->_cur_len += data_len;
                copy_len += data_len;
                bytes_transfered -= data_len;
                _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
                std::cout << "receive data is " << _recv_msg_node->_data << std::endl;
                // Echo the message back (test hook).
                Send(_recv_msg_node->_data, _recv_msg_node->_total_len);
                // Reset framing state and keep processing leftover bytes.
                head_parse = false;
                _recv_head_node->Clear();
                if (bytes_transfered <= 0)
                {
                    memset(_data, 0, max_length);
                    _socket.async_read_some(boost::asio::buffer(_data, max_length),
                                            std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));
                    return;
                }
                continue;
            }
            // Header already parsed: continue filling the pending body.
            // Bytes the current message still needs:
            int remain_msg = _recv_msg_node->_total_len - _recv_msg_node->_cur_len;
            // Case 1.1: still not enough data to finish the body.
            if (bytes_transfered < remain_msg)
            {
                memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transfered);
                _recv_msg_node->_cur_len += bytes_transfered;
                memset(_data, 0, max_length);
                _socket.async_read_some(boost::asio::buffer(_data, max_length),
                                        std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));
                return;
            }
            // Case 1.2: the body completes here; surplus bytes (if any) are
            // the start of the next message (sticky packets again).
            memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);
            _recv_msg_node->_cur_len += remain_msg;
            bytes_transfered -= remain_msg;
            copy_len += remain_msg;
            _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
            std::cout << "receive data is " << _recv_msg_node->_data << std::endl;
            // Echo the message back (test hook).
            Send(_recv_msg_node->_data, _recv_msg_node->_total_len);
            // Reset framing state and keep processing leftover bytes.
            head_parse = false;
            _recv_head_node->Clear();
            if (bytes_transfered <= 0)
            {
                ::memset(_data, 0, max_length);
                _socket.async_read_some(boost::asio::buffer(_data, max_length),
                                        std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));
                return;
            }
            continue;
        }
    }
    else
    {
        // EOF means the peer closed normally; only log real failures.
        if (error != boost::asio::error::eof)
        {
            std::cout << "handle read failed, error is " << error.what() << std::endl;
        }
        // Close();
        _server->ClearSession(_uuid);
    }
}
// Completion handler for async_write. Pops the message that just finished
// and, if more are queued, immediately starts writing the next one so the
// queue drains as a chain of writes. On error the session is dropped.
void Session::handle_write(const boost::system::error_code &error, std::shared_ptr<Session> _self_shared)
{
    if (!error)
    {
        std::lock_guard<std::mutex> lock(_send_lock);
        // Close() may already have drained the queue; popping an empty
        // std::queue is undefined behaviour, so guard the pop.
        if (!_send_que.empty())
        {
            _send_que.pop();
        }
        if (!_send_que.empty())
        {
            auto &msgnode = _send_que.front();
            boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),
                                     std::bind(&Session::handle_write, this, std::placeholders::_1, _self_shared));
        }
    }
    else
    {
        std::cout << "handle write failed, error is " << error.what() << std::endl;
        _server->ClearSession(_uuid);
    }
}
// =================================== session func above ======================================
// =================================== server func below ======================================
// Binds the acceptor to 0.0.0.0:port (IPv4) and immediately begins
// accepting connections.
Server::Server(boost::asio::io_context &ioc, short port) : _ioc(ioc),
_acceptor(ioc, tcp::endpoint(tcp::v4(), port))
{
std::cout << "Server Start on port :" << port << std::endl;
start_accept();
}
// Drops the session from the map. This releases the Server's reference;
// the Session itself is destroyed once the last outstanding callback's
// shared_ptr goes away.
void Server::ClearSession(std::string uuid)
{
    _sessions.erase(uuid);
}
void Server::start_accept()
{
std::shared_ptr<Session> new_session = std::make_shared<Session>(_ioc, this);
_acceptor.async_accept(new_session->Socket(),
std::bind(&Server::handle_accept, this, new_session, std::placeholders::_1));
}
// Completion handler for async_accept: on success the session is started
// and registered in the map (which keeps it alive); on failure the
// shared_ptr simply expires. Either way the acceptor is re-armed.
void Server::handle_accept(std::shared_ptr<Session> new_session, const boost::system::error_code &error)
{
    if (!error)
    {
        new_session->Start();
        _sessions.emplace(new_session->GetUuid(), new_session);
    }
    start_accept();
}
// =================================== server func above ======================================
int main()
{
try
{
boost::asio::io_context ioc;
Server s(ioc, 8080);
ioc.run();
}
catch (const std::exception &e)
{
std::cerr << e.what() << '\n';
}
}
这里代码写的有点耦合了,Session类应该另起一个源文件,清楚一点。
此外,这里的async_read_some函数调用时不限制长度,造成了切包时的复杂处理,可以用async_read函数来指定长度,较为方便直观,这个api封装了async_read_some,免去手动划分长度。
handle_read_head
// Improved variant: boost::asio::async_read delivers exactly HEAD_SIZE
// bytes, so this handler only validates the header and chains a
// fixed-length body read — no manual splitting of a partially filled
// buffer as in handle_read.
void Session::handle_read_head(const boost::system::error_code &error, size_t bytes_transfered, std::shared_ptr<Session> _self_shared)
{
    if (!error)
    {
        if (bytes_transfered != HEAD_SIZE)
        {
            std::cout << "invalid header size is " << bytes_transfered << std::endl;
            _server->ClearSession(_uuid);
            return;
        }
        // Decode the 2-byte body length from network byte order.
        std::uint16_t data_len_net = 0;
        memcpy(&data_len_net, _recv_head_node->_data, HEAD_SIZE);
        int data_len = boost::asio::detail::socket_ops::network_to_host_short(data_len_net);
        std::cout << "data_len is " << data_len << std::endl;
        // Reject lengths the fixed receive buffer could not hold.
        if (data_len <= 0 || data_len > max_length)
        {
            std::cout << "invalid data length is " << data_len << std::endl;
            _server->ClearSession(_uuid);
            return;
        }
        _recv_msg_node = std::make_shared<MsgNode>(data_len);
        // Read exactly data_len body bytes, then continue in handle_read_body.
        boost::asio::async_read(_socket, boost::asio::buffer(_recv_msg_node->_data, data_len),
                                std::bind(&Session::handle_read_body, this, std::placeholders::_1, std::placeholders::_2, _self_shared));
    }
    else
    {
        // EOF means the peer closed normally; only log real failures.
        if (error != boost::asio::error::eof)
        {
            std::cout << "handle read failed, error is " << error.what() << std::endl;
        }
        // Close();
        _server->ClearSession(_uuid);
    }
}
handle_read_body
// Completion handler for the fixed-length body read: terminate, log and
// echo the message, then chain the next header read.
void Session::handle_read_body(const boost::system::error_code &error, size_t bytes_transfered, std::shared_ptr<Session> _self_shared)
{
    if (!error)
    {
        // async_read completes only with the full requested length, so
        // anything else is treated as a protocol error.
        if (bytes_transfered != static_cast<size_t>(_recv_msg_node->_total_len))
        {
            std::cout << "invalid body size is " << bytes_transfered << std::endl;
            _server->ClearSession(_uuid);
            return;
        }
        _recv_msg_node->_cur_len = static_cast<int>(bytes_transfered);
        _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
        std::cout << "receive data is " << _recv_msg_node->_data << std::endl;
        // Echo the message back (test hook).
        Send(_recv_msg_node->_data, _recv_msg_node->_total_len);
        // Reset the header node, then read the next 2-byte header.
        _recv_head_node->Clear();
        boost::asio::async_read(_socket, boost::asio::buffer(_recv_head_node->_data, HEAD_SIZE),
                                std::bind(&Session::handle_read_head, this, std::placeholders::_1, std::placeholders::_2, _self_shared));
    }
    else
    {
        // EOF means the peer closed normally; only log real failures.
        if (error != boost::asio::error::eof)
        {
            std::cout << "handle read body failed, error is " << error.what() << std::endl;
        }
        _server->ClearSession(_uuid);
    }
}
改进:增加 io_context 线程池(AsioIOServicePool)多线程处理消息请求,并且把逻辑层(LogicSystem)和网络接收层解耦
graph TD
    A[服务端启动入口]
    B[主 io_context]
    D[Server acceptor 监听端口]
    E[AsioIOServicePool 线程池]
    F[GetIOService 轮询分配]
    G[Session socket]
    H[LogicSystem 业务队列]
    I[JsonClient 客户端]
    A --> B
    A --> E
    E --> D
    I --> D
    D --> F
    F --> G
    G --> H
    H --> G