ftp更换网站,最新军事战争新闻消息,wordpress 分页插件,wordpress如何添加广告代码没有一颗星#xff0c; 会因为追求梦想而受伤#xff0c; 当你真心渴望某样东西时#xff0c; 整个宇宙都会来帮忙。 --- 保罗・戈埃罗 《牧羊少年奇幻之旅》--- 实现Reactor反应堆模型 1 前言2 框架搭建3 准备工作4 Reactor类的设计5 Connection连接接口6 回调方法 1 … 没有一颗星 会因为追求梦想而受伤 当你真心渴望某样东西时 整个宇宙都会来帮忙。 --- 保罗・戈埃罗 《牧羊少年奇幻之旅》--- 实现Reactor反应堆模型 1 前言2 框架搭建3 准备工作4 Reactor类的设计5 Connection连接接口6 回调方法 1 前言
到目前为止我学习了计算机网络了解了网络传输的过程理解网络协议栈的层与层之间的关系。实践了使用TCP进行的网络编程也了解了协议的编写实际了http协议下的通信过程。
最近学习了五种IO模型可以通过多路转接EPOLL提高读取效率。
那么现在是否可以将多路转接与网络结合编写一个高效处理网络请求的反应堆模型Reactor。今天我们搭建基础的结构。
2 框架搭建 我们想要搭建的是这样的结构
最底层是Reactor负责事件派发管理connection套接字连接。可以添加监听套接字与普通套接字其中都有对应的回调方法。可以通过套接字类型赋予连接对应的回调方法。通过多路转接IO获取就绪事件找到对应connection执行事件。Connection连接管理文件描述符的连接对象内部有这个文件描述符的输入输出缓冲区回调函数客户端信息就绪事件集。等待Reactor调用回调方法。Listener监听这是专门管理监听套接字的对象里面有对于监听套接字的方法可以获取新连接。作为监听套接字connection的回调方法HandlerConnection普通套接字 这是针对普通套接字的对象里面有对于普通套接字事件就绪的处理方法类。
最底层的就是这三层结构。下面我们来实现这三层结构。
3 准备工作
在实现三层结构之前我们先对多路转接IO进行封装让代码尽可能解耦
对于多路转接我们设计一个基类作为上层调用的统一接口。然后继承出子类Epoll poll select在子类中分别实现对应的方法。
这里只提供了Epoll的封装
构造函数构造时创建EPOLL模型获得EPOLLfd。AddEvent添加事件调用epoll_ctl_add方法即可。Wait获取底层就绪事件直接使用epoll_wait即可
#pragma once
#include iostream
#include stdlib.h
#include sys/epoll.h
#include Log.hpp
#include Comm.hppusing namespace log_ns;// 多路复用基类
class Mutliplex
{
public:Mutliplex(/* args */){}virtual bool AddEvent(int fd, uint32_t events) 0;virtual int Wait(struct epoll_event revs[], int num, int timeout) 0;~Mutliplex(){}
};// epoll poll select基类
class Epoller : public Mutliplex
{
private:static const int size 128;public:Epoller(){_epollfd ::epoll_create(size);if (_epollfd 0){LOG(ERROR, epoll create failed!\n);exit(EPOLL_CREATE);}}std::string EventToString(uint32_t revents){std::string ret;if (revents EPOLLIN)ret EPOLLIN;if (revents EPOLLOUT)ret | EPOLLOUT;return ret;}bool AddEvent(int fd, uint32_t events){struct epoll_event ev;ev.data.fd fd;ev.events events;int n ::epoll_ctl(_epollfd, EPOLL_CTL_ADD, fd, ev);if (n 0){LOG(ERROR, epoll_ctl add failed , errno:%d, errno);return -1;}LOG(INFO, epoll_ctl add fd:%d , events:%s\n, fd, EventToString(events).c_str());return true;}int Wait(struct epoll_event revs[], int num, int timeout){return ::epoll_wait(_epollfd, revs, num, timeout);}~Epoller(){}private:int _epollfd;
};4 Reactor类的设计
之前的TcpServer等服务端都要在内部封装_listensock。如果封装了监听套接字那么代码结构就定型了就必须要有对监听套接字的处理。而这里我们想将Reactor设计一个管理connection连接的类不需要针对监听套接字进行特殊处理
成员变量
通过fd映射Connection*对象的哈希表_conn判断是否启动bool isrunning构建一个Multipex对象 构造时建立epoll指针负责处理多路转接IO就绪事件组struct epoll_event revs[gnum]针对监听套接字的方法集在添加连接时可以将方法设置进入connection中针对普通套接字的方法集。
回调方法的类型为using handler_t std::functionvoid(Connection *conn);
#pragma once
#include string
#include iostream
#include memory
#include unordered_map#include Connection.hpp
#include Epoller.hppusing namespace log_ns;class Reactor
{
private:static const int gnum 128;public:Reactor() : _epoller(std::make_uniqueEpoller()), _isrunning(false){}void SetOnNormal(handler_t OnRecver, handler_t OnSender, handler_t OnExcepeter){_OnRecver OnRecver;_OnSender OnSender;_OnExcepeter OnExcepeter;}void SetOnConnect(handler_t OnConnect){_OnConnect OnConnect;}// 加入连接void AddConnection(int fd, uint32_t events, const InetAddr addr, int type){}void Dispatcher(){}~Reactor(){}private:// fd 映射连接表std::unordered_mapint, Connection * _conn;// 是否启动bool _isrunning;std::unique_ptrMutliplex _epoller;// 事件数组struct epoll_event revs[gnum];//_listen新连接到来handler_t _OnConnect;// 处理普通fd IOhandler_t _OnRecver;handler_t _OnSender;handler_t _OnExcepeter;
};
Addconnection接口 首先通过 fd events 与客户端信息和连接类型建立connection 进行设置对应的事件集 然后根据type判断类型设置connection的上层处理回调方法。注意这里要对 conn与Reactor进行关联 !后续connection的模块进行讲解设置addr方便打印日志可以知道是哪一个客户端然后通过fd 与 events 托管给epoll 进行添加事件 。最后将连接放入哈希表中。IsConnExists判断当前连接是否存在Dispatch()事件派发接口进行while循环获取底层哪些事件就绪 储存在成员变量struct epoll_event revs[gnum]根据返回值 n 对n个事件进行处理这里只处理 ERR HUP IN OUT 使用if语句ERR HUP直接设置为IN OUT后续统一处理IN事件就绪 事件派发 通过_conn[fd]找到对应连接 执行对应事件的回调函数注意保证连接存在 且 回调方法存在。
完整代码如下
#pragma once
#include string
#include iostream
#include memory
#include unordered_map#include Connection.hpp
#include Epoller.hppusing namespace log_ns;class Reactor
{
private:static const int gnum 128;public:Reactor() : _epoller(std::make_uniqueEpoller()), _isrunning(false){}void SetOnNormal(handler_t OnRecver, handler_t OnSender, handler_t OnExcepeter){_OnRecver OnRecver;_OnSender OnSender;_OnExcepeter OnExcepeter;}void SetOnConnect(handler_t OnConnect){_OnConnect OnConnect;}// 加入连接void AddConnection(int fd, uint32_t events, const InetAddr addr, int type){// 1. 通过 fd 构建一个 connection指针 set对应的事件集Connection *conn new Connection(fd);conn-SetReactor(this);conn-SetEvents(events);conn-SetConnectionType(type);conn-SetAddr(addr);// 2. TODO 设置对connection的上层处理 设置回调方法if (conn-Type() ListenConnection){conn-RegisterHandler(_OnConnect, nullptr, nullptr); // 设置方法}else{conn-RegisterHandler(_OnRecver, _OnSender, _OnExcepeter); // 设置方法}// 3. fd 与 events 托管给epoll 添加事件 出错直接 return;int n _epoller-AddEvent(fd, events);// 4. 托管给_connection_conn.insert(std::make_pair(fd, conn));// 添加连接成功}// 判断连接是否存在bool IsConnExist(int fd){return _conn.find(fd) ! _conn.end();}void LoopOnce(int timeout){// 获取底层事件int n _epoller-Wait(revs, gnum, -1);for (int i 0; i n; i){// 文件描述符int fd revs[i].data.fd;// 就绪事件uint32_t revents revs[i].events;// 处理IN OUT ERR HUPif (revents EPOLLERR)revents | (EPOLLIN | EPOLLOUT);if (revents EPOLLHUP)revents | (EPOLLIN | EPOLLOUT);if (revents EPOLLIN){// 调用回调方法if (IsConnExist(fd) _conn[fd]-_handler_recver)_conn[fd]-_handler_recver(_conn[fd]);}if (revents EPOLLOUT){// 调用回调方法if (IsConnExist(fd) _conn[fd]-_handler_sender)_conn[fd]-_handler_sender(_conn[fd]);}}}void Dispatcher(){_isrunning true;int timeout -1;while (true){LoopOnce(timeout);PrintDebug();//打印托管的fd列表}_isrunning false;}void PrintDebug(){std::string s 已建立的连接:;for (auto conn : _conn){s std::to_string(conn.first) ;}LOG(DEBUG, epoll 管理的fd列表: %s\n, s.c_str());}~Reactor(){}private:// fd 映射连接表std::unordered_mapint, Connection * _conn;// 是否启动bool _isrunning;std::unique_ptrMutliplex _epoller;// 事件数组struct epoll_event revs[gnum];//_listen新连接到来handler_t _OnConnect;// 处理普通fd IOhandler_t _OnRecver;handler_t _OnSender;handler_t _OnExcepeter;
};
5 Connection连接接口
成员变量 文件描述符fd需要关心的事件集 events输入缓冲区 输出缓冲区三种事件的回调方法设置一个Reactor* _R SetEvents接口通过传入events 初始化 eventsEvents接口返回事件集Sockfd返回对应fdRegisterHandler接口快速设置回调方法SetReactor(Reactor* R)接口 connection与Reactor进行绑定执行自己属于的Reactor
对于这个Reactor* _R 指针是监听套装字获取到连接时发挥作用。当监听套接字的事件就绪在回调方法中可以通过参数Connection取出内部的_R指针找到对应的Reactor进行AddConnection操作。
#pragma once#include iostream
#include string
#include functional#include InetAddr.hppclass Connection;
class Reactor;using handler_t std::functionvoid(Connection *conn);#define ListenConnection 0
#define NormalConnection 1class Connection
{
public:Connection(int fd) : _sockfd(fd){}void RegisterHandler(handler_t recver, handler_t sender, handler_t excepter){_handler_recver recver; // 处理读取_handler_sender sender; // 处理写入_handler_excepter excepter; // 处理异常}void SetEvents(uint32_t events){_events events;}void SetAddr(const InetAddr addr){_addr addr;}int Sockfd(){return _sockfd;}uint32_t Events(){return _events;}int Type(){return _type;}void SetReactor(Reactor *R){_R R;}void SetConnectionType(int type){_type type;}Reactor *GetReactor(){return _R;}InetAddr GerInetAddr(){return _addr;}void AppendInbuffer(const std::string in){_inbuffer in;}std::string Inbuffer(){return _inbuffer;}~Connection(){}private:int _sockfd; // 套接字fduint32_t _events; // 事件集std::string _inbuffer; // 输入缓冲区std::string _outbuffer; // 输出缓冲区Reactor *_R;int _type;InetAddr _addr;public:handler_t _handler_recver; // 处理读取handler_t _handler_sender; // 处理写入handler_t _handler_excepter; // 处理异常
};6 回调方法
这里需要两种回调方法类一种针对监听套接字一种针对普通套接字。 Listener Listener统一管理Tcp连接模块管理_listensock成员变量 std::unique_ptr _listensock Tcp套接字对象int _port;端口号 通过端口号进行构造TcpSocketListenSock接口返回_listensock的fd。Accepter(conn* int* code)方法获取连接并得到文件描述符 这里采用ET模式首先将listensockfd 读取设置为非阻塞读取然后进行while(true)进行非阻塞读取 根据Accepter返回的错误码通过code返回 通过错误码进行判断当读取到一个新的fd时,通过conn的Reactor指针调用AddConnection 加入新连接
#pragma once
#include memory
#include iostream
#include Socket.hpp
#include Connection.hppusing namespace log_ns;
using namespace socket_ns;// 处理listen套接字的读写
class Listener
{
public:Listener(uint16_t port) : _port(port), _listensock(std::make_uniqueTcpSocket(port)){_listensock-BuildListenSocket(_port);}int ListenSockfd(){return _listensock-GetSockfd();}void Accepter(Connection *conn){LOG(DEBUG, %d socket ready\n, conn-Sockfd());// 非阻塞式读取while (true){errno 0;int code 0;InetAddr addr;int sockfd _listensock-Accepter(addr, code);if (sockfd 0){LOG(INFO, 成功获取连接, 客户端%s sockfd:%d\n, addr.AddrStr().c_str(), sockfd);conn-GetReactor()-AddConnection(sockfd, EPOLLIN | EPOLLET, addr, NormalConnection);}else{if (code EWOULDBLOCK){// 读取完毕LOG(INFO, 底层数据全部读取完毕!\n);break;}// 信号中断else if (code EINTR){continue;}else{LOG(ERROR, 获取连接失败!\n);break;}}}}~Listener(){}private:uint16_t _port;std::unique_ptrSocket _listensock;
};HandlerConnection 处理普通连接读写问题这个的设计就比较简单了注意其只复杂数据的读取协议解析需要交给上层进行处理HandlerRecver(conn*)我们先实现读取的逻辑HandlerSender(conn*)后续实现HandlerExcepter(conn*)后续实现
#include sys/types.h
#include sys/socket.h
// 不应该让HandlerConnection处理报文
class HandlerConnection
{
private:const static int buffersize 512;public:HandlerConnection(handler_t process) : _process(process){}void Recver(Connection *conn){// LOG(DEBUG , client发送信息: %d\n , conn-Sockfd());// 进行正常读写 --- 非阻塞读取while (true){char buffer[buffersize];int n ::recv(conn-Sockfd(), buffer, sizeof(buffer) - 1, 0);if (n 0){// buffer是一个数据块 添加到conn的输入缓冲区中buffer[n] 0;conn-AppendInbuffer(buffer);// 数据交给上层处理}else if (n 0){// 连接断开LOG(INFO, 客户端[%s]退出, 服务器准备关闭fd: %d\n, conn-GerInetAddr().AddrStr().c_str(), conn-Sockfd());conn-_handler_excepter(conn); // 统一执行异常处理}else{// 本轮数据读完了if (errno EWOULDBLOCK){// 这是唯一出口break;}// 信号中断else if (errno EINTR){continue;}// 出现异常else{conn-_handler_excepter(conn);return;}}}// 读取完毕我们应该处理数据了// 加入协议std::cout Inbuffer 内容: conn-Inbuffer() std::endl;_process(conn);}void Sender(Connection *conn){}void Excepter(Connection *conn){}~HandlerConnection(){}private:handler_t _process;
};
至此Reactor反应堆模型的框架已经搭建好了下一篇文章我们将在这个的基础之上进行协议解析与数据处理并设计如何将数据发回。这里只是简单的实现读取数据的逻辑