学生管理系统网站,开福区城乡建设局网站,湖南长信建设集团网站,题库网站建设文章目录 1.IO1.1基本介绍1.2基础io的低效性1.3如何提高IO效率1.4五种IO模型1.5非阻塞模式的设置 2.IO多路转接之Select2.1函数的基本了解2.2fd_set理解2.3完整例子代码#xff08;会在代码中进行讲解#xff09;2.4优缺点 3.多路转接之poll3.1poll函数的介绍3.2poll服务器3.… 文章目录 1.IO1.1基本介绍1.2基础io的低效性1.3如何提高IO效率1.4五种IO模型1.5非阻塞模式的设置 2.IO多路转接之Select2.1函数的基本了解2.2fd_set理解2.3完整例子代码会在代码中进行讲解2.4优缺点 3.多路转接之poll3.1poll函数的介绍3.2poll服务器3.3优缺点 4.epoll4.1初始epoll4.2epoll相关的系统调用4.3epoll的工作原理4.4epoll的工作方式4.5LT模式重要代码4.6ET模式重要代码 1.IO
1.1基本介绍
IO实际上就是 input output 在冯诺依曼体系中就是与外设交互的意思而我们的网络通信本质上也是一种IO。
1.2基础io的低效性
为什么基础io会低效呢我们以读取为例来介绍。
当我们底层调用read函数的时候如果缓冲区没有数据 我们就会将pcb放入等待队列进行阻塞。
当我们底层调用read函数的时候如果缓冲区有数据我们就会读取。
这么来看我们io的本质其实就是数据拷贝等待。
实际上回顾我们之前的文章不光是在网络中在本地主机进行IO的时候也是进行这两个阶段。
当我们的程序需要读取磁盘中的内容时磁盘需要先将内容加载到内存里面。
而在加载还未完成时我们的程序在做什么呢 阻塞 或者说 等。
这也就是为什么我们使用scanf和cin等输入函数的时候 命令行会阻塞住。
在进行IO的时间里大部分时间都在等待。
1.3如何提高IO效率
顺着我们上面说的实际就是降低这个等待时间的比重。
1.4五种IO模型
经历了这么长时间的发展之后计算机的前辈们已经总结出来了五种IO模型让我们一起来学习一下吧。
如果光将一些概念大家应该很难理解这里我们借用钓鱼的例子来为大家大致分析一下(把钓起鱼的一瞬间抽象成拷贝等鱼儿上钩的时间想象成阻塞时间)。 例子一张三去钓鱼的时候不喜欢被打扰甩钩之后就一直盯着鱼漂等什么时候余漂有反应了就立刻拉钩。 例子二李四去钓鱼的时候专心不了甩钩之后就喜欢刷刷手机每刷一会儿手机就看一眼鱼漂如果有反应了就拉钩如果没反应就继续刷手机。 例子三王五去钓鱼的时候喜欢在鱼漂上挂个铃铛 之后就去刷手机玩了。 如果铃铛响了那么王五就去拉钩如果没响就一直玩手。 例子四赵六去钓鱼的时候喜欢多备几根鱼竿所有鱼竿下水之后赵六就在旁边巡视哪一根鱼竿的鱼漂动了就去拉哪根鱼竿。 例子五田七去钓鱼的时候带着一个小跟班每次只需要布置任务让小跟班钓多少鱼就好自己处理自己的事情去了。 上面的五个例子分别代表了五个IO模型分别是 故事一 阻塞 故事二 非阻塞轮询 故事三 信号驱动 故事四 多路复用多路转接 故事五 异步IO 理论上来讲这些例子当中例子四就是我们的多路复用多路转接最为高效当然这里提到异步IO我们就多提一嘴其实有关异步IO和同步IO的概念一直都有争论有兴趣的同学可以去了解一下这里就不做过多讲解。
1.5非阻塞模式的设置
如果想让IO进行非阻塞的话 打开文件的时候就可以进行非阻塞设置比如说 open socket。
但是如果我们使用每个函数的时候都记住它们的非阻塞标志未免也有点太麻烦了
所以说我们这里使用 fcntl 函数来统一设置
#include fcntl.hint fcntl(int fd, int cmd, ... /* arg */);fd是要进行操作的文件描述符
cmd是控制命令
arg是与命令相关的参数如果设置失败会返回-1 并且错误码会被设置 成功返回大于等于0参数二的不同功能
复制一个现有的描述符cmdF_DUPFD.
获得/设置文件描述符标记(cmdF_GETFD或F_SETFD).
获得/设置文件状态标记(cmdF_GETFL或F_SETFL).
获得/设置异步I/O所有权(cmdF_GETOWN或F_SETOWN)
获得/设置记录锁(cmdF_GETLK,F_SETLK或F_SETLKW)使用案例
#include stdio.h
#include fcntl.h
#include unistd.h
#include errno.hint main() {// 获取当前标准输出的状态标志int flags fcntl(STDOUT_FILENO, F_GETFL);if (flags -1) {perror(fcntl);return 1;}// 设置标准输出为非阻塞模式if (fcntl(STDOUT_FILENO, F_SETFL, flags | O_NONBLOCK) -1) {perror(fcntl);return 1;}// 尝试从标准输出中读取数据但不会阻塞char buffer[1024];ssize_t bytesRead read(STDOUT_FILENO, buffer, sizeof(buffer));if (bytesRead -1) {if (errno EAGAIN || errno EWOULDBLOCK) {printf(No data available in non-blocking mode.\n);} else {perror(read);}} else {// 读取到数据buffer[bytesRead] \0;printf(Read %zd bytes: %s, bytesRead, buffer);}return 0;
}我们首先使用fcntl函数获取标准输出的当前状态标志然后使用fcntl再次设置标准输出为非阻塞模式通过将O_NONBLOCK标志添加到原来的标志中。接下来我们尝试从标准输出中读取数据但由于标准输出已设置为非阻塞模式如果没有数据可用read将立即返回并且errno会被设置为EAGAIN或EWOULDBLOCK表示没有数据可读。当我们使用了非阻塞IO的时候,每次读取如果遇到了 EWOULDBLOCK或EAGAIN我们就可以让我们的进程去做一会儿其他事情。2.IO多路转接之Select
接下来我们会带大家了解select这个函数其实质上就是在io等这一步上做了如下操作
帮用户一次等待多个sock如果有sock就绪了 select就要通知用户 这些sock就绪了 让用户调用read/recv函数来进行读取
2.1函数的基本了解
#include sys/select.h int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);1.nfds:需要等待的文件当中文件描述符最大的1因为文件描述符是递归增长的所以给定一个最大值就能确定范围。这样子的话我们岂不是要等待从 0 ~ nfds-1所有文件描述符了关于这个问题看了下面的参数介绍就能明白了.
2.readfds 是一个指向读取文件描述符集合的指针。writefds 是一个指向写文件描述符集合的指针。exceptfds 是一个指向异常文件描述符集合的指针。
这几个其实都是输出输出型参数
我们要传入的是一个fd_set类型的指针这些参数在输入的时候分别表示我们是否关心读就绪
我们是否关心写就绪
我们是否关心有异常在输出的时候分别表示哪些读就绪了
哪些写就绪了
出现哪些异常了3.timeval *timeouttimeval实际上是一个结构体 在Linux系统中 它的定义如下:
struct timeval {time_t tv_sec; // 当前时间的秒suseconds_t tv_usec; // 当前时间的微秒
};
我们让select进行等待的时候 有三种模式可以供我们选择阻塞式
非阻塞式
阻塞一段时间 之后返回对于这个参数来说如果我输入nullptr 那么它就是阻塞式的
如果我们输入结构体 {0 0} 那么它就是非阻塞式的
如果我们输入结构体{5 0}那么它就会等待五秒钟之后返回但是如果说五秒内有文件描述符就绪了的话这个参数就会显示出输出性。比如说我们要求等五秒而实际上2秒就有文件描述符就绪了那么它就会返回{3 0}。返回值类型int表示的是就绪的文件描述符的个数只要让我们等待的文件描述符中 有一个就绪了 它就会返回2.2fd_set理解
fd_set 叫做文件描述符集它本质上是一个位图 。 系统提供了四个函数来让我们进行文件描述符集操作它们的作用如下:
清除某个文件描述符判断某个文件描述符是否被设置设置文件描述符清空文件描述符
我们举个具体的使用场景
fd_set *readfds
当它作为一个输入参数时
它是用户通知内核的一种方式在比特位中 比特位的下标表示文件描述符比特位下标对应的内容是否为1表示我对于该文件的读是否关心比如 0101 就是我对于2号和0号文件描述符的读关心
当它作为一个输出参数时
它是内核通知用户的一种方式在比特位中 比特位的下标表示文件描述符比特位下标对应的内容是否为1表示该文件描述符的读是否就绪比如说 0100 就是用户让系统关心的0号和2号文件描述符中 2号文件描述符就绪了
至于fd_set *writefds fd_set *exceptfds通知的内容分别变成了 是否关心写是否关心异常
2.3完整例子代码会在代码中进行讲解 main.cc #include selectServer.hpp
#include memoryint main()
{// 1. fd_set是一个固定大小位图直接决定了select能同时关心的fd的个数是有上限的// std::cout sizeof(fd_set) * 8 std::endl;std::unique_ptrSelectServer svr(new SelectServer());svr-Start();return 0;
} Log.hpp #pragma once#include iostream
#include cstdio
#include cstdarg
#include ctime
#include string// 日志是有日志级别的
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4const char *gLevelMap[] {DEBUG,NORMAL,WARNING,ERROR,FATAL
};#define LOGFILE ./selectServer.log// 完整的日志功能至少: 日志等级 时间 支持用户自定义(日志内容, 文件行文件名)
void logMessage(int level, const char *format, ...)
{// va_list ap;// va_start(ap, format);// while()// int x va_arg(ap, int);// va_end(ap); //apnullptrchar stdBuffer[1024]; //标准部分time_t timestamp time(nullptr);// struct tm *localtime localtime(timestamp);snprintf(stdBuffer, sizeof stdBuffer, [%s] [%ld] , gLevelMap[level], timestamp);char logBuffer[1024]; //自定义部分va_list args;va_start(args, format);// vprintf(format, args);vsnprintf(logBuffer, sizeof logBuffer, format, args);va_end(args);// FILE *fp fopen(LOGFILE, a);printf(%s%s\n, stdBuffer, logBuffer);// fprintf(fp, %s%s\n, stdBuffer, logBuffer);// fclose(fp);
}Sock.hpp #pragma once#include iostream
#include string
#include cstring
#include cerrno
#include cassert
#include unistd.h
#include memory
#include sys/types.h
#include sys/socket.h
#include arpa/inet.h
#include netinet/in.h
#include ctype.hclass Sock
{
private:// listen的第二个参数意义底层全连接队列的长度 listen的第二个参数1const static int gbacklog 10;
public:Sock() {}static int Socket(){int listensock socket(AF_INET, SOCK_STREAM, 0);if (listensock 0){exit(2);}int opt 1;setsockopt(listensock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, opt, sizeof(opt));return listensock;}static void Bind(int sock, uint16_t port, std::string ip 0.0.0.0){struct sockaddr_in local;memset(local, 0, sizeof local);local.sin_family AF_INET;local.sin_port htons(port);inet_pton(AF_INET, ip.c_str(), local.sin_addr);if (bind(sock, (struct sockaddr *)local, sizeof(local)) 0){exit(3);}}static void Listen(int sock){if (listen(sock, gbacklog) 0){exit(4);}}// 一般经验// const std::string : 输入型参数// std::string *: 输出型参数// std::string : 输入输出型参数static int Accept(int listensock, std::string *ip, uint16_t *port){struct sockaddr_in src;socklen_t len sizeof(src);int servicesock accept(listensock, (struct sockaddr *)src, len);if (servicesock 0){return -1;}if(port) *port ntohs(src.sin_port);if(ip) *ip inet_ntoa(src.sin_addr);return servicesock;}static bool Connect(int sock, const std::string server_ip, const uint16_t server_port){struct sockaddr_in server;memset(server, 0, sizeof(server));server.sin_family AF_INET;server.sin_port htons(server_port);server.sin_addr.s_addr inet_addr(server_ip.c_str());if(connect(sock, (struct sockaddr*)server, sizeof(server)) 0) return true;else return false;}~Sock() {}
};selectServer.hpp #ifndef __SELECT_SVR_H__
#define __SELECT_SVR_H__#include iostream
#include string
#include vector
#include sys/select.h
#include sys/time.h
#include Log.hpp
#include Sock.hpp#define BITS 8
#define NUM (sizeof(fd_set)*BITS)
#define FD_NONE -1using namespace std;
// select 我们只完成读取写入和异常不做处理 -- epoll(写完整)
class SelectServer
{
public:SelectServer(const uint16_t port 8080) : _port(port){_listensock Sock::Socket();Sock::Bind(_listensock, _port);Sock::Listen(_listensock);logMessage(DEBUG,%s,create base socket success);for(int i 0; i NUM; i) _fd_array[i] FD_NONE;// 规定 : _fd_array[0] _listensock;_fd_array[0] _listensock;}void Start(){while (true){// struct timeval timeout {0, 0};// 如何看待listensock? 获取新连接我们把它依旧看做成为IOinput事件如果没有连接到来呢阻塞// int sock Sock::Accept(listensock, ...); //不能直接调用accept了// 将listensock添加到读文件描述符集中// FD_SET(_listensock, rfds); // int n select(_listensock 1, rfds, nullptr, nullptr, timeout);// 1. nfds: 随着我们获取的sock越来越多随着我们添加到select的sock越来越多注定了nfds每一次都可能要变化,我们需要对它动态计算// 2. rfds/writefds/exceptfds都是输入输出型参数输入输出不一定以一样的所以注定了我们每一次都要对rfds进行重新添加// 3. timeout: 都是输入输出型参数每一次都要进行重置前提是你要的话// 1,2 注定了我们必须自己将合法的文件描述符需要单独全部保存起来 用来支持1. 更新最大fd 2.更新位图结构DebugPrint();fd_set rfds;FD_ZERO(rfds);int maxfd _listensock;for(int i 0; i NUM; i){if(_fd_array[i] FD_NONE) continue;FD_SET(_fd_array[i], rfds);if(maxfd _fd_array[i]) maxfd _fd_array[i];}// rfds未来一定会有两类socklistensock普通sock// 我们select中就绪的fd会越来越多int n select(maxfd 1, rfds, nullptr, nullptr, nullptr);switch (n){case 0:// printf(hello select ...\n);logMessage(DEBUG, %s, time out...);break;case -1:logMessage(WARNING, select error: %d : %s, errno, strerror(errno));break;default:// 成功的logMessage(DEBUG, get a new link event...); // 为什么会一直打印连接到来呢连接已经建立完成就绪了但是你没有取走select要一直通知你HandlerEvent(rfds);break;}}}~SelectServer(){if (_listensock 0)close(_listensock);}
private:void HandlerEvent(const fd_set rfds) // fd_set 是一个集合里面可能会存在多个sock{for(int i 0; i NUM; i){// 1. 去掉不合法的fdif(_fd_array[i] FD_NONE) continue;// 2. 合法的就一定就绪了?不一定if(FD_ISSET(_fd_array[i], rfds)){//指定的fd读事件就绪// 读事件就绪连接时间到来acceptif(_fd_array[i] _listensock) Accepter();else Recver(i);}}}void Accepter(){string clientip;uint16_t clientport 0;// listensock上面的读事件就绪了表示可以读取了// 获取新连接了int sock Sock::Accept(_listensock, clientip, clientport); // 这里在进行accept会不会阻塞不会if(sock 0){logMessage(WARNING, accept error);return;}logMessage(DEBUG, get a new line success : [%s:%d] : %d, clientip.c_str(), clientport, sock);// read / recv? 不能为什么不能我们不清楚该sock上面数据什么时候到来 recv、read就有可能先被阻塞IO 等数据拷贝// 谁可能最清楚呢select// 得到新连接的时候此时我们应该考虑的是将新的sock托管给select让select帮我们进行检测sock上是否有新的数据// 有了数据select读事件就绪select就会通知我我们在进行读取此时我们就不会被阻塞了// 要将sock添加 给 select 其实我们只要将fd放入到数组中即可int pos 1;for(; pos NUM; pos){if(_fd_array[pos] FD_NONE) break;}if(pos NUM){logMessage(WARNING, %s:%d, select server already fullclose: %d, sock);close(sock);}else{_fd_array[pos] sock;}}void Recver(int pos){// 读事件就绪INPUT事件到来、recvreadlogMessage(DEBUG, message in, get IO event: %d, _fd_array[pos]);// 暂时先不做封装, 此时select已经帮我们进行了事件检测fd上的数据一定是就绪的即 本次 不会被阻塞// 这样读取有bug吗有的你怎么保证以读到了一个完整包文呢char buffer[1024];int n recv(_fd_array[pos], buffer, sizeof(buffer)-1, 0);if(n 0){buffer[n] 0;logMessage(DEBUG, client[%d]# %s, _fd_array[pos], buffer);}else if(n 0){logMessage(DEBUG, client[%d] quit, me too..., _fd_array[pos]);// 1. 我们也要关闭不需要的fdclose(_fd_array[pos]);// 2. 不要让select帮我关心当前的fd了_fd_array[pos] FD_NONE;}else{logMessage(WARNING, %d sock recv error, %d : %s, _fd_array[pos], errno, strerror(errno));// 1. 我们也要关闭不需要的fdclose(_fd_array[pos]);// 2. 不要让select帮我关心当前的fd了_fd_array[pos] FD_NONE;}}void DebugPrint(){cout _fd_array[]: ;for(int i 0; i NUM; i){if(_fd_array[i] FD_NONE) continue;cout _fd_array[i] ;}cout endl;}
private:uint16_t _port;int _listensock;int _fd_array[NUM];// int _fd_write[NUM];// std::vectorint arr;
};#endif2.4优缺点
优点
效率高 IO等的时间少 尤其是在有大量连接 并且只有少量活跃的情况下单进程 占用资源少
缺点
为了维护第三方数组 select服务器充满大量的遍历操作每一次都要对select参数进行重新设定能够同时管理的fd的个数是有上限的由于参数是输入输出的 所以避免不了大量用户和内核之间的拷贝编码比较复杂
3.多路转接之poll
poll是系统提供的一个多路转接接口它的作用和select函数基本一致。
3.1poll函数的介绍
原形
int poll(struct pollfd *fds, nfds_t nfds, int timeout); struct pollfd *fds 里面包含着文件描述符表,我们需要监视的文件描述符合集和就绪的文件描述符合集 fd 特定的文件描述符值events 用户告诉内核 哪些事件需要关心revents 内核告诉用户 哪些事件就绪了
也是用了位图结构来存储数据
一个文件描述符实际上就是对应一个struct pollfd所以说理论上只要有多少个数组我们的poll就能检测多少的文件描述符。
以下是events和revents的取值 我们需要特别注意的有三个分别是
POLLIN 可读POLLOUT 可写POLLERR 错误 nfds_t nfds fds数组的长度 timeout 超时时间
单位是毫秒 比如说我们设置为1000 就是等待1秒如果设置为0 就表示非阻塞模式如果设置为-1 就表示阻塞模式
3.2poll服务器
我们将上面写的select的服务器修改一下
私有成员变化如下
private:
int _port;
int _listensock;
struct pollfd *_rfds;
func_t _func; 对比于我们select的第三方数组来说,我们这里多了一个数组指针和数组大小.
在初始化的时候 我们首先new出一个 struct pollfd 数组出来 ,并且遍历初始化一下. _rfds[i].fd defaultfd; _rfds[i].events 0;_rfds[i].revents 0;对于数据如何判断就绪,我们可以使用按位与来判断:
_rfds[i].revents POLLIN具体代码如下
#pragma once#include iostream
#include string
#include functional
#include sock.hppnamespace select_ns
{static const int defaultport 8081;static const int fdnum sizeof(fd_set) * 8;static const int defaultfd -1;using func_t std::functionstd::string (const std::string);class SelectServer{public:SelectServer(func_t f, int port defaultport) : func(f), _port(port), _listensock(-1), fdarray(nullptr){}void initServer(){_listensock Sock::Socket();Sock::Bind(_listensock, _port);Sock::Listen(_listensock);fdarray new int[fdnum];for (int i 0; i fdnum; i)fdarray[i] defaultfd;fdarray[0] _listensock; // 不变了}void Print(){std::cout fd list: ;for (int i 0; i fdnum; i){if (fdarray[i] ! defaultfd)std::cout fdarray[i] ;}std::cout std::endl;}void Accepter(int listensock){logMessage(DEBUG, Accepter in);// 走到这里accept 函数会不会阻塞1 0// select 告诉我 listensock读事件就绪了std::string clientip;uint16_t clientport 0;int sock Sock::Accept(listensock, clientip, clientport); // accept 等 获取if (sock 0)return;logMessage(NORMAL, accept success [%s:%d], clientip.c_str(), clientport);// sock我们能直接recv/read 吗不能整个代码只有select有资格检测事件是否就绪// 将新的sock 托管给select// 将新的sock托管给select的本质其实就是将sock添加到fdarray数组中即可int i 0;for (; i fdnum; i){if (fdarray[i] ! defaultfd)continue;elsebreak;}if (i fdnum){logMessage(WARNING, server if full, please wait);close(sock);}else{fdarray[i] sock;}Print();logMessage(DEBUG, Accepter out);}void Recver(int sock, int pos){logMessage(DEBUG, in Recver);// 1. 读取request// 这样读取是有问题的char buffer[1024];ssize_t s recv(sock, buffer, sizeof(buffer) - 1, 0); // 这里在进行读取的时候会不会被阻塞1 0if (s 0){buffer[s] 0;logMessage(NORMAL, client# %s, buffer);}else if (s 0){close(sock);fdarray[pos] defaultfd;logMessage(NORMAL, client quit);return;}else{close(sock);fdarray[pos] defaultfd;logMessage(ERROR, client quit: %s, strerror(errno));return;}// 2. 处理requeststd::string response func(buffer);// 3. 返回response// write bugwrite(sock, response.c_str(), response.size());logMessage(DEBUG, out Recver);}// 1. handler event rfds 中不仅仅是有一个fd是就绪的可能存在多个// 2. 我们的select目前只处理了read事件void HandlerReadEvent(fd_set rfds){for (int i 0; i fdnum; i){// 过滤掉非法的fdif (fdarray[i] defaultfd)continue;// 正常的fd// 正常的fd不一定就绪了// 目前一定是listensock只有这一个if (FD_ISSET(fdarray[i], rfds) fdarray[i] _listensock)Accepter(_listensock);else if(FD_ISSET(fdarray[i], rfds))Recver(fdarray[i], i);else{}}}void start(){for (;;){fd_set rfds;// fd_set wfds;FD_ZERO(rfds);int maxfd fdarray[0];for (int i 0; i fdnum; i){if (fdarray[i] defaultfd)continue;FD_SET(fdarray[i], rfds); // 合法 fd 全部添加到读文件描述符集中if (maxfd fdarray[i])maxfd fdarray[i]; // 更新所有fd中最大的fd}logMessage(NORMAL, max fd is: %d, maxfd);// struct timeval timeout {1, 0};// int n select(_listensock 1, rfds, nullptr, nullptr, timeout); // ??// 一般而言要是用select需要程序员自己维护一个保存所有合法fd的数组int n select(maxfd 1, rfds, nullptr, nullptr, nullptr); // ??switch (n){case 0:logMessage(NORMAL, timeout...);break;case -1:logMessage(WARNING, select error, code: %d, err string: %s, errno, strerror(errno));break;default:// 说明有事件就绪了,目前只有一个监听事件就绪了logMessage(NORMAL, have event ready!);HandlerReadEvent(rfds);// HandlerWriteEvent(wfds);break;}// std::string clientip;// uint16_t clientport 0;// int sock Sock::Accept(_listensock, clientip, clientport); // accept 等 获取// if(sock0) continue;// // 开始进行服务器的处理逻辑}}~SelectServer(){if (_listensock 0)close(_listensock);if (fdarray)delete[] fdarray;}private:int _port;int _listensock;int *fdarray;func_t func;};
}3.3优缺点
优点
效率高适合有大量连接 少量活跃输入输出分离接口使用方便poll参数级别 没有可管理的fd上限
缺点
poll依旧需要不少的遍历poll需要内核到用户的拷贝poll的代码虽然比select容易 但是也很复杂
4.epoll
4.1初始epoll
epoll是为了处理大量句柄而做出改进的poll句柄可以是一个整数、指针、引用或其他数据结构它用于唯一标识和访问特定资源或对象。
它在2.5.44内核中被引入到Linux
也是目前来说最常用的一种多路转接IO方式
4.2epoll相关的系统调用
epoll_createepoll_ctlepoll_wait
int epoll_create(int size); //创建一个epoll模型
参数说明目前来说 epoll_create的参数是被废弃的 我们设置为256或者512就行 这样设计的原因是为了向前兼容
返回值说明返回一个epoll模型 实际上就是一个文件描述符int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);//对创建出来的epoll模型进行操控
参数说明1.int epfd 标识一个我们的IO模型
2.int op (operator) 表示我们想要做出什么样的操作
3.int fd 表示我们需要添加的文件描述符
4.epoll_event *event 表示我们需要关心哪些事件
返回值说明函数成功调用返回0 失败返回-1 同时错误码将被设置int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
//监视我们关心的关键描述符
参数说明1.epfd是 epoll 实例的文件描述符它代表你要监听的一组文件描述符的集合。
2.events是一个指向 struct epoll_event 数组的指针用于存储就绪事件的信息。
3.maxevents指定 events 数组的最大大小即最多可以存储多少就绪事件的信息。
4.timeout指定等待事件的超时时间单位为毫秒。传递负值会使 epoll_wait 成为阻塞调用直到有事件发生传递零会使它成为非阻塞调用立即返回传递正值会在指定时间内等待事件。返回值epoll_wait 返回就绪事件的数量如果发生错误则返回 -1。如果超时时间到期而没有事件发生它将返回 0。poll_wait 返回的事件信息存储在 events 数组中。每个 struct epoll_event 结构包含以下信息events一个位掩码指示事件类型如可读、可写、错误等。
data一个联合可以存储用户定义的数据通常是文件描述符或其他标识符。4.3epoll的工作原理
我们之前的学习的多路转接函数 无论是select还有poll 它们都需要我们做下面的操作
让我们维护一个第三方的数组都需要遍历整个数组都需要经历用户到内核 内核到用户的事件通知
而我们的epoll则不同。
当然在我们讲解epoll的具体工作原理时我们需要先了解一些前置知识
操作系统是如何知道硬件里面有数据了呢
下图很好的解释了这一问题 正式讲解
当我们创建一个epoll模型之后操作系统底层会帮助我们维护一颗红黑树 红黑树的节点里面维护着很多元素 其中最重要的是两个
文件描述符事件
这颗红黑树解决了用户通知内核的问题。
用户通知内核自己要关心哪些文件描述符的哪些事件之后操作系统就会生成一个节点然后插入到这颗红黑树当中
而这颗红黑树就是对应我们select和poll当中的数组。(现在由操作系统维护了)
当内核通知用户的则是通过了消息队列通知
在内核维护的红黑树旁边有一个消息队列(也交就绪队列)每当有fd的事件就绪的时候就会在该队列上添加一个元素(也是由操作系统维护)。
操作系统在调用驱动的时候构建就绪队列节点
在生成红黑树节点的时候在驱动中每个节点都会生成一个自己的回调函数。
于是在经历了硬件中断到读取数据的过程后操作系统会调用驱动中的回调函数来获取该节点的数据 并且根据这些数据fd和events构建就绪节点最后将构建好的节点插入到队列中。
知道了这些后不妨再来看看我们上面提到的函数
epoll_create 创建epoll模型 包括红黑树 就绪队列 回调函数等(这个描述符所对应的文件里面有指针可以找到红黑树和就绪队列)epoll_ctl 对于红黑树的节点进行注册epoll_wait 获取就绪队列中的内容
4.4epoll的工作方式
epoll有2种工作方式-水平触发(LT)和边缘触发(ET) 你正在吃鸡, 眼看进入了决赛圈, 你妈饭做好了, 喊你吃饭的时候有两种方式: 如果你妈喊你一次, 你没动, 那么你妈会继续喊你第二次, 第三次…(亲妈, 水平触发)如果你妈喊你一次, 你没动, 你妈就不管你了(后妈, 边缘触发) 举个例子来帮大家彻底了解这两种模式 我们已经把一个tcp socket添加到epoll描述符 这个时候socket的另一端被写入了2KB的数据 调用epoll_wait并且它会返回. 说明它已经准备好读取操作 然后调用read, 只读取了1KB的数据 继续调用epoll_wait…
水平触发Level Triggere 工作模式 epoll默认状态下就是LT工作模式. 当epoll检测到socket上事件就绪的时候, 可以不立刻进行处理. 或者只处理一部分. 如上面的例子, 由于只读了1K数据, 缓冲区中还剩1K数据, 在第二次调用 epoll_wait 时, epoll_wait 仍然会立刻返回并通知socket读事件就绪. 直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回. 支持阻塞读写和非阻塞读写 边缘触发Edge Triggered工作模式 如果我们在第1步将socket添加到epoll描述符的时候使用了EPOLLET标志, epoll进入ET工作模式. 当epoll检测到socket上事件就绪时, 必须立刻处理. 如上面的例子, 虽然只读了1K的数据, 缓冲区还剩1K的数据, 在第二次调用 epoll_wait 的时候, epoll_wait 不会再返回了. 也就是说, ET模式下, 文件描述符上的事件就绪后, 只有一次处理机会. ET的性能比LT性能更高( epoll_wait 返回的次数少了很多). Nginx默认采用ET模式使用epoll. 只支持非阻塞的读写 select和poll其实也是工作在LT模式下. epoll既可以支持LT, 也可以支持ET.
4.5LT模式重要代码
tcp_epoll_server.hpp
#pragma once
#include vector
#include functional
#include sys/epoll.h
#include tcp_socket.hpp
typedef std::functionvoid(const std::string, std::string* resp) Handler;
class Epoll {
public:Epoll() {epoll_fd_ epoll_create(10);}~Epoll() {close(epoll_fd_);}bool Add(const TcpSocket sock) const {int fd sock.GetFd();printf([Epoll Add] fd %d\n, fd);epoll_event ev;ev.data.fd fd;ev.events EPOLLIN;int ret epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, ev);if (ret 0) {perror(epoll_ctl ADD);return false;}return true;}bool Del(const TcpSocket sock) const {int fd sock.GetFd();printf([Epoll Del] fd %d\n, fd);int ret epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, NULL);if (ret 0) {perror(epoll_ctl DEL);return false;}return true;}bool Wait(std::vectorTcpSocket* output) const {output-clear();epoll_event events[1000];int nfds epoll_wait(epoll_fd_, events, sizeof(events) / sizeof(events[0]), -1);if (nfds 0) {perror(epoll_wait);return false;}// [注意!] 此处必须是循环到 nfds, 不能多循环for (int i 0; i nfds; i) {TcpSocket sock(events[i].data.fd);output-push_back(sock);}return true;}
private:int epoll_fd_;
};
class TcpEpollServer {
public:TcpEpollServer(const std::string ip, uint16_t port) : ip_(ip), port_(port) {}bool Start(Handler handler) {// 1. 创建 socketTcpSocket listen_sock;CHECK_RET(listen_sock.Socket());// 2. 绑定CHECK_RET(listen_sock.Bind(ip_, port_));// 3. 监听CHECK_RET(listen_sock.Listen(5));// 4. 创建 Epoll 对象, 并将 listen_sock 加入进去Epoll epoll;epoll.Add(listen_sock);// 5. 进入事件循环for (;;) {// 6. 进行 epoll_waitstd::vectorTcpSocket output;if (!epoll.Wait(output)) {continue;}// 7. 根据就绪的文件描述符的种类决定如何处理for (size_t i 0; i output.size(); i) {if (output[i].GetFd() listen_sock.GetFd()) {// 如果是 listen_sock, 就调用 acceptTcpSocket new_sock;listen_sock.Accept(new_sock);epoll.Add(new_sock);}else {// 如果是 new_sock, 就进行一次读写std::string req, resp;bool ret output[i].Recv(req);if (!ret) {// [注意!!] 需要把不用的 socket 关闭// 先后顺序别搞反. 不过在 epoll 删除的时候其实就已经关闭 socket 了epoll.Del(output[i]);output[i].Close();continue;}handler(req, resp);output[i].Send(resp);} // end for} // end for (;;)}return true;}private:std::string ip_;uint16_t port_;
};4.6ET模式重要代码
基于 LT 版本稍加修改即可
修改 tcp_socket.hpp, 新增非阻塞读和非阻塞写接口对于 accept 返回的 new_sock 加上 EPOLLET 这样的选项
注意: 此代码暂时未考虑 listen_sock ET 的情况. 如果将 listen_sock 设为 ET, 则需要非阻塞轮询的方式 accept. 否则会导致同一时刻大量的客户端同时连接的时候, 只能 accept 一次的问题.
tcp_socket.hpp
// 以下代码添加在 TcpSocket 类中
// 非阻塞 IO 接口
bool SetNoBlock() {int fl fcntl(fd_, F_GETFL);if (fl 0) {perror(fcntl F_GETFL);return false;}int ret fcntl(fd_, F_SETFL, fl | O_NONBLOCK);if (ret 0) {perror(fcntl F_SETFL);return false;}return true;
}
bool RecvNoBlock(std::string* buf) const {// 对于非阻塞 IO 读数据, 如果 TCP 接受缓冲区为空, 就会返回错误// 错误码为 EAGAIN 或者 EWOULDBLOCK, 这种情况也是意料之中, 需要重试// 如果当前读到的数据长度小于尝试读的缓冲区的长度, 就退出循环// 这种写法其实不算特别严谨(没有考虑粘包问题)buf-clear();char tmp[1024 * 10] { 0 };for (;;) {ssize_t read_size recv(fd_, tmp, sizeof(tmp) - 1, 0);if (read_size 0) {if (errno EWOULDBLOCK || errno EAGAIN) {continue;}perror(recv);比特就业课tcp_epoll_server.hppreturn false;}if (read_size 0) {// 对端关闭, 返回 falsereturn false;}tmp[read_size] \0;*buf tmp;if (read_size (ssize_t)sizeof(tmp) - 1) {break;}}return true;
}
bool SendNoBlock(const std::string buf) const {// 对于非阻塞 IO 的写入, 如果 TCP 的发送缓冲区已经满了, 就会出现出错的情况// 此时的错误号是 EAGAIN 或者 EWOULDBLOCK. 这种情况下不应放弃治疗// 而要进行重试ssize_t cur_pos 0; // 记录当前写到的位置ssize_t left_size buf.size();for (;;) {ssize_t write_size send(fd_, buf.data() cur_pos, left_size, 0);if (write_size 0) {if (errno EAGAIN || errno EWOULDBLOCK) {// 重试写入continue;}return false;}cur_pos write_size;left_size - write_size;// 这个条件说明写完需要的数据了if (left_size 0) {break;}}return true;
}tcp_epoll_server.hpp
#pragma once
#include vector
#include functional
#include sys/epoll.h
#include tcp_socket.hpp
typedef std::functionvoid(const std::string, std::string* resp) Handler;
class Epoll {
public:Epoll() {epoll_fd_ epoll_create(10);}~Epoll() {close(epoll_fd_);}bool Add(const TcpSocket sock, bool epoll_et false) const {int fd sock.GetFd();printf([Epoll Add] fd %d\n, fd);epoll_event ev;ev.data.fd fd;if (epoll_et) {ev.events EPOLLIN | EPOLLET;}else {ev.events EPOLLIN;}int ret epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, ev);if (ret 0) {perror(epoll_ctl ADD);return false;}return true;}bool Del(const TcpSocket sock) const {int fd sock.GetFd();printf([Epoll Del] fd %d\n, fd);int ret epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, NULL);if (ret 0) {perror(epoll_ctl DEL);return false;}return true;}bool Wait(std::vectorTcpSocket* output) const {output-clear();epoll_event events[1000];int nfds epoll_wait(epoll_fd_, events, sizeof(events) / sizeof(events[0]), -1);if (nfds 0) {perror(epoll_wait);return false;}// [注意!] 此处必须是循环到 nfds, 不能多循环for (int i 0; i nfds; i) {TcpSocket sock(events[i].data.fd);output-push_back(sock);}return true;}
private:int epoll_fd_;
};
class TcpEpollServer {
public:TcpEpollServer(const std::string ip, uint16_t port) : ip_(ip), port_(port) {}bool Start(Handler handler) {// 1. 创建 socketTcpSocket listen_sock;CHECK_RET(listen_sock.Socket());// 2. 绑定CHECK_RET(listen_sock.Bind(ip_, port_));// 3. 监听CHECK_RET(listen_sock.Listen(5));// 4. 创建 Epoll 对象, 并将 listen_sock 加入进去Epoll epoll;epoll.Add(listen_sock);// 5. 进入事件循环for (;;) {// 6. 进行 epoll_waitstd::vectorTcpSocket output;if (!epoll.Wait(output)) {continue;}// 7. 根据就绪的文件描述符的种类决定如何处理for (size_t i 0; i output.size(); i) {if (output[i].GetFd() listen_sock.GetFd()) {// 如果是 listen_sock, 就调用 acceptTcpSocket new_sock;listen_sock.Accept(new_sock);epoll.Add(new_sock, true);}else {// 如果是 new_sock, 就进行一次读写std::string req, resp;bool ret output[i].RecvNoBlock(req);if (!ret) {// [注意!!] 需要把不用的 socket 关闭// 先后顺序别搞反. 不过在 epoll 删除的时候其实就已经关闭 socket 了epoll.Del(output[i]);output[i].Close();continue;}handler(req, resp);output[i].SendNoBlock(resp);printf([client %d] req: %s, resp: %s\n, output[i].GetFd(),req.c_str(), resp.c_str());} // end for} // end for (;;)}return true;}private:std::string ip_;uint16_t port_;
};onst std::string ip, uint16_t port) : ip_(ip), port_(port) { } bool Start(Handler handler) { // 1. 创建 socket TcpSocket listen_sock; CHECK_RET(listen_sock.Socket()); // 2. 绑定 CHECK_RET(listen_sock.Bind(ip_, port_)); // 3. 监听 CHECK_RET(listen_sock.Listen(5)); // 4. 创建 Epoll 对象, 并将 listen_sock 加入进去 Epoll epoll; epoll.Add(listen_sock); // 5. 进入事件循环 for (; { // 6. 进行 epoll_wait std::vector output; if (!epoll.Wait(output)) { continue; } // 7. 根据就绪的文件描述符的种类决定如何处理 for (size_t i 0; i output.size(); i) { if (output[i].GetFd() listen_sock.GetFd()) { // 如果是 listen_sock, 就调用 accept TcpSocket new_sock; listen_sock.Accept(new_sock); epoll.Add(new_sock, true); } else { // 如果是 new_sock, 就进行一次读写 std::string req, resp; bool ret output[i].RecvNoBlock(req); if (!ret) { // [注意!!] 需要把不用的 socket 关闭 // 先后顺序别搞反. 不过在 epoll 删除的时候其实就已经关闭 socket 了 epoll.Del(output[i]); output[i].Close(); continue; } handler(req, resp); output[i].SendNoBlock(resp); printf(“[client %d] req: %s, resp: %s\n”, output[i].GetFd(), req.c_str(), resp.c_str()); } // end for } // end for (; } return true; }
private: std::string ip_; uint16_t port_; };