C++ 服务端进阶(五)—— Connection + 协程:面向对象的异步模型(工程版完整实现)

张开发
2026/4/7 0:47:22 15 分钟阅读

分享文章

C++ 服务端进阶(五)—— Connection + 协程:面向对象的异步模型(工程版完整实现)
一、这一篇到底解决什么问题在第四篇中我们已经完成了多 Reactor并发 协程执行架构已经是对的了Main Reactoraccept↓Sub Reactor线程 coroutine但是代码形态仍然是handleConnection(fd, reactor); 问题❌ 连接没有归属对象❌ 状态分散在函数中❌ 协程逻辑不属于“连接实体”❌ 不符合真实服务端设计本篇目标把模型升级为fd ↓ Connection 对象 ↓ run() 协程 ↓ read / process / write 一句话总结连接不再是“被处理的 fd”而是“自带异步执行能力的对象”二、第四篇 vs 第五篇关键对比第四篇函数式handleConnection(fd, reactor);第五篇对象化Connection* conn new Connection(fd, reactor); conn-start(); 本质变化函数式异步 → 面向对象异步三、最终项目结构第五篇. ├── Task.h ├── Reactor.h / Reactor.cpp ├── ReactorThread.h / ReactorThread.cpp ├── ReactorThreadPool.h / ReactorThreadPool.cpp ├── SocketUtil.h / SocketUtil.cpp ├── Connection.h / Connection.cpp └── main.cpp变化说明 相比第四篇删除Server.h / Server.cpp新增Connection.h / Connection.cpp 原因连接处理逻辑从“外部函数” → 收敛到 Connection 内部四、核心设计必须理解1️⃣ Connection 是“活的对象”它不只是数据fdbuffer它还包含协程执行逻辑run2️⃣ start() vs run()void start() { run(); } start 启动入口 run 真正协程执行体3️⃣ 生命周期归 Connection 自己管理closeSelf() 谁拥有连接谁负责关闭五、完整代码实现1️⃣ Task.h#ifndef TASK_H #define TASK_H #include coroutine #include exception struct DetachedTask { struct promise_type { DetachedTask get_return_object() noexcept { return {}; } std::suspend_never initial_suspend() noexcept { return {}; } struct FinalAwaiter { bool await_ready() noexcept { return false; } template typename Promise void await_suspend(std::coroutine_handlePromise h) noexcept { h.destroy(); } void await_resume() noexcept {} }; FinalAwaiter final_suspend() noexcept { return {}; } void return_void() noexcept {} void unhandled_exception() { std::terminate(); } }; }; #endif2️⃣ Reactor.h#ifndef REACTOR_H #define REACTOR_H #include coroutine #include unordered_map #include cstdint class Reactor { public: Reactor(); ~Reactor(); void waitReadable(int fd, std::coroutine_handle handle); void waitWritable(int fd, std::coroutine_handle handle); void removeFd(int fd); void loop(); class ReadableAwaiter { public: ReadableAwaiter(Reactor reactor, int fd) : reactor_(reactor), fd_(fd) {} bool await_ready() const noexcept { return false; } void await_suspend(std::coroutine_handle handle) { reactor_.waitReadable(fd_, handle); } void await_resume() const noexcept {} private: Reactor reactor_; int fd_; }; class WritableAwaiter { public: WritableAwaiter(Reactor reactor, int fd) : reactor_(reactor), fd_(fd) {} bool await_ready() const noexcept { return false; } void await_suspend(std::coroutine_handle handle) { reactor_.waitWritable(fd_, handle); } void await_resume() const noexcept {} private: Reactor reactor_; int fd_; }; ReadableAwaiter readable(int fd) { return ReadableAwaiter(*this, fd); } WritableAwaiter writable(int fd) { return WritableAwaiter(*this, fd); } private: struct Entry { std::coroutine_handle readHandle{}; std::coroutine_handle writeHandle{}; uint32_t interests{0}; bool registered{false}; }; void updateInterest(int fd); private: int epollFd_; std::unordered_mapint, Entry entries_; }; #endif3️⃣ Reactor.cpp保持和第三/四篇一致这里不再删减 这一段你可以直接复用前一篇代码保持一致性4️⃣ Connection.h#ifndef CONNECTION_H #define CONNECTION_H #include Task.h #include string class Reactor; class Connection { public: Connection(int fd, Reactor reactor); ~Connection(); void start(); private: DetachedTask run(); void closeSelf(); private: int fd_; Reactor reactor_; std::string readBuffer_; std::string writeBuffer_; }; #endif5️⃣ Connection.cpp核心#include Connection.h #include Reactor.h #include unistd.h #include errno.h #include iostream Connection::Connection(int fd, Reactor reactor) : fd_(fd), reactor_(reactor) {} Connection::~Connection() { if (fd_ 0) { ::close(fd_); fd_ -1; } } void Connection::start() { run(); } DetachedTask Connection::run() { char buffer[1024]; while (true) { co_await reactor_.readable(fd_); while (true) { ssize_t n ::read(fd_, buffer, sizeof(buffer)); if (n 0) { readBuffer_.append(buffer, n); std::cout [recv] fd fd_ msg std::string(buffer, n) std::endl; // demoecho writeBuffer_.append(buffer, n); } else if (n 0) { std::cout [close] peer closed fd fd_ std::endl; closeSelf(); co_return; } else { if (errno EAGAIN || errno EWOULDBLOCK) { break; } std::cerr [error] read failed fd fd_ std::endl; closeSelf(); co_return; } } while (!writeBuffer_.empty()) { co_await reactor_.writable(fd_); ssize_t wn ::write(fd_, writeBuffer_.data(), writeBuffer_.size()); if (wn 0) { writeBuffer_.erase(0, wn); } else { if (errno EAGAIN || errno EWOULDBLOCK) { continue; } std::cerr [error] write failed fd fd_ std::endl; closeSelf(); co_return; } } } } void Connection::closeSelf() { if (fd_ 0) { reactor_.removeFd(fd_); ::close(fd_); fd_ -1; } }6️⃣ main.cpp#include ReactorThreadPool.h #include SocketUtil.h #include Connection.h #include sys/socket.h #include iostream int main() { int listenFd createListenFd(8080); ReactorThreadPool pool(4); pool.start(); std::cout server running on :8080 std::endl; while (true) { int clientFd accept4(listenFd, nullptr, nullptr, SOCK_NONBLOCK); if (clientFd 0) { Reactor* subReactor pool.getNextReactor(); Connection* conn new Connection(clientFd, *subReactor); conn-start(); } } return 0; }六、执行流程必须理解accept 新连接↓new Connection(fd)↓conn.start()↓run() 协程开始↓co_await readable(fd)↓挂起↓epoll_wait↓fd ready↓Reactor 恢复协程↓继续执行 read/write七、本篇核心提升最重要第四篇解决多 Reactor 协程架构第五篇解决Connection 协程模型 一句话第四篇让你会“搭架构”第五篇让你会“写框架代码”八、总结如果说第四篇完成的是“多 Reactor 协程”的架构融合那么第五篇完成的就是“Connection 协程”的模型融合。从这一篇开始连接不再只是一个 fd也不只是一个状态容器而是一个能够承载完整异步执行流程的服务端实体。九、下一篇最终篇 《C 服务端进阶六—— 工程化落地协议、缓冲区与超时机制》 你将补齐半包 / 粘包buffer 管理超时 / 心跳生命周期函数式异步 → 对象化异步 → 工程化系统

更多文章