写点什么

Linux 高并发服务器 select/poll 实现

用户头像
赖猫
关注
发布于: 2021 年 03 月 12 日

1.select 实现


select 函数可以让程序同时监视多个文件描述符,当一个或多个文件描述符对设定的某种事件(例如读、写事件)“准备就绪”就可以进入下一步的操作。在高并发服务器中,为了避免多进程或者多线程大的开销,采用 Linux 内核来管理多客户端的连接请求,极大的减轻了服务器的压力。


select() 函数原型,参数注释也都在其中


int select(int nfds, fd_set *readfds, fd_set *writefds,             fd_set *exceptfds, struct timeval *timeout);/*int nfds;  整型,监听fd集合数组的下标,传入最大值 maxi+1,指向最大的fd。fd_set *readfds;  指向fd_set结构体,读事件的文件描述符的一个集合。当读事件变化, select就会返回一个大于0的值,表示有文件可读,判断FD_ISSET(int fd, fd_set *set)。fd_set *writefds;fd_set *exceptfds;同上。表示写事件,异常事件。struct timeval *timeout;时间,一般三种方式传入;第一,若将NULL以形参传入,就是将select置于阻塞状态,一定等到监视文件描述符集合中某个文件描述符发生变化为止。第二,若将时间值设具体值,函数经过timeout时间阻塞状态,监测是否有事件发生,文件无变化返回0,有变化返回一个正值。第三,0;非阻塞等待,直接测试所有指定的描述符并立即返回。*/
/*fd_set 结构体操作函数*/ void FD_CLR(int fd, fd_set *set);//将一个给定的文件描述符从集合中删除 int FD_ISSET(int fd, fd_set *set);//检查集合中指定的文件描述符是否有读写事件 void FD_SET(int fd, fd_set *set);//将一个给定的文件描述符加入集合之中 void FD_ZERO(fd_set *set);//清空集合
复制代码


对于 select 函数实现,关键之处有:


1.文件描述符 fd 数组 client[1024] (select 最大监听数 1024) 集合的创建;


2.动态管理监听的 fd 集合 client[],包括添加和释放;


3.监听读写事件的发生,与进一步的操作。


#include <sys/socket.h>#include<arpa/inet.h>#include<ctype.h>#include<unistd.h>#include<stdio.h>#include<sys/types.h>#include<strings.h>#include<sys/wait.h>#include<pthread.h>#include<fcntl.h>#include"wrap.h"#define MAXSIAE 10240#define SERV_IP "127.0.0.1"#define SERV_PORT 6666//"192.168.32.30"
int main(void){ int i,j,n,maxi;//maxi 定义为指向client[] 最后一个元素的下标 int nready,client[FD_SETSIZE];//自定义的存放监听文件描述符的数组,默认1024 将描述符单独存放,避免以后遍历 char buf[BUFSIZ],str[INET_ADDRSTRLEN];//INET_ADDRSTRLEN 16,存放IP char clie_IP[BUFSIZ]; int listenfd,connectfd,socketfd,maxfd; socklen_t clie_addr_len,serv_addr_len; /*集合*/fd_set rset,allset;//rset 读事件文件描述符的集合 allset用来暂时存储监听事件listenfd、 struct sockaddr_in serv_addr , clie_addr; listenfd = Socket(AF_INET,SOCK_STREAM,0); bzero(&serv_addr,sizeof(serv_addr));//地址结构初始化 serv_addr.sin_family = AF_INET;//ipv4 serv_addr.sin_port = htons(SERV_PORT); serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); //inet_pton(AF_INET,"192.168.32.30",&serv_addr.sin_addr.s_addr); serv_addr_len = sizeof(serv_addr); Bind(listenfd, (struct sockaddr*)&serv_addr, serv_addr_len); Listen(listenfd,128); printf("wait for client connect::\n"); maxfd = listenfd;//初始的第一个lfd是最大的,赋给maxfd /*listenfd的值是文件描述符的序号,初始为3号描述符*/ maxi = -1; for(i = 0; i < FD_SETSIZE;i++) { client[i] = -1;//将存放监听文件描述符的数组全置为-1,方便以后放入监听值 } FD_ZERO(&allset);//清空暂存rset事件缓存区 FD_SET(listenfd,&allset);//构造文件描述符类型的集合,并将listefd添加进去 while(1) { rset = allset;//每次循环时都重新设置select监控合集 nready = select(maxfd+1,&rset,NULL,NULL,NULL);//neady 是 select 函数返回的监听的描述符的个数 //rset集合中暂时存储了需要监听的文件描述符 /*参数1:最大描述符+1 参数2:读事件的集合 参数3:写事件的集合 参数4:异常事件 参数5:时间,NULL表示永久等待 */ if(nready < 0) { perr_exit("select error"); } if(FD_ISSET(listenfd,&rset))//是否有请的客户端请求,有的话将新生成的文件描述符添加到client数组 { clie_addr_len = sizeof(clie_addr); connectfd = Accept(listenfd,(struct sockaddr*)&clie_addr,&clie_addr_len); printf("recevied from %s at PORT %d ", inet_ntop(AF_INET,&clie_addr.sin_addr.s_addr,str,sizeof(str)), ntohs(clie_addr.sin_port));//打印客户端的信息 printf("connectfd = %d\n",connectfd);
for(i = 0;i < FD_SETSIZE;i++) { if(client[i] < 0)//将客户端的文件描述符存放进client[]数组,从头开始存放 { client[i] = connectfd; break; } } if(i == FD_SETSIZE) { fputs("too many clients\n",stderr); exit(1); } FD_SET(connectfd,&allset);//添加connectfd描述符 if(connectfd > maxfd) { maxfd = connectfd ;//select 参数1需要:最大描述符+1 } if(i > maxi) { maxi = i; } if(--nready == 0)// continue; } /*在已经监听过的client数组中的fd 进行读写操作*/ for(i = 0;i <= maxi;i++)// { if((socketfd = client[i]) < 0) { continue; } if(FD_ISSET(socketfd,&rset))//判断socketfd是否在rset集合中 { if((n = Read(socketfd,buf,sizeof(buf))) == 0) { Close(socketfd); FD_CLR(socketfd,&allset);//解除allset对文件描述符的监控 client[i] = -1; }else if(n > 0)//n 实际读到的字节数 { for(j = 0;j < n;j++) { buf[j] = toupper(buf[j]); } sleep(10); Write(socketfd,buf,n); Write(STDOUT_FILENO,buf,n); if(-- nready == 0) break; } } } } Close(listenfd); return 0;}
复制代码


2.poll 实现


poll 函数和 select 函数一脉相承, 但是它克服了 select 监听上限 1024…

poll() 函数原型


  int poll(struct pollfd *fds, nfds_t nfds, int timeout);  /*nfds_t nfds; 文件描述符集合,传入参数nfds+1,与select此参数一样int timeout;时间。同select. 第一,若将NULL以形参传入,就是将select置于阻塞状态,一定等到监视文件描述符集合中某个文件描述符发生变化为止。第二,若将时间值设具体值,函数经过timeout时间阻塞状态,监测是否有事件发生,文件无变化返回0,有变化返回一个正值。第三,0;非阻塞等待,直接测试所有指定的描述符并立即返回。*/// 重点了解结构体类型如下 struct pollfd{  int fd; // 需要监听的文件描述符   short events; //需要监听的事件   short revents; // 已发生的事件 }events 事件的表达式用来描述事件的类型,主要有以下几种:POLLIN:文件中有数据可读,下面实例中使用到了这个标志POLLPRI::文件中有紧急数据可读POLLOUT:可以向文件写入数据POLLERR:文件中出现错误,只限于输出POLLHUP:与文件的连接被断开,只限于输出POLLNVAL:文件描述符是不合法的,即它并没有指向一个成功打开的文件
复制代码


poll 实现原理和 select 一样,只是将 select 中对各种事件的响应封装起来,形成一个结构体,并对相应的事件类型进行封装管理,从而看起来比 select 的参数要简单明了一些,实质实一样的。


#include <sys/socket.h>#include<arpa/inet.h>#include<ctype.h>#include<unistd.h>#include<stdio.h>#include<sys/types.h>#include<strings.h>#include<sys/wait.h>#include<pthread.h>#include<fcntl.h>#include<errno.h>#include<poll.h>#include<time.h>#include<sys/stropts.h>#include"wrap.h"

#define MAXSIAE 10240#define SERV_IP "127.0.0.1"#define SERV_PORT 6666//"192.168.32.30"#define OPEN_MAX 1024 int main(void){ int i,j,n,maxi; int nready; char buf[BUFSIZ],str[INET_ADDRSTRLEN];INET_ADDRSTRLEN 16,存放IP int listenfd,connectfd,socketfd; socklen_t clie_addr_len,serv_addr_len; struct pollfd client[OPEN_MAX];//定义 poll 数组 struct sockaddr_in serv_addr , clie_addr; listenfd = Socket(AF_INET,SOCK_STREAM,0); int opt = 1 ; setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));//端口复用函数,,, bzero(&serv_addr,sizeof(serv_addr));//地址结构初始化 serv_addr.sin_family = AF_INET;//ipv4 serv_addr.sin_port = htons(SERV_PORT); serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); //inet_pton(AF_INET,"192.168.32.30",&serv_addr.sin_addr.s_addr); serv_addr_len = sizeof(serv_addr); Bind(listenfd, (struct sockaddr*)&serv_addr, serv_addr_len); Listen(listenfd,128); printf("wait for client connect::\n"); client[0].fd = listenfd;//需要监听的第一个文件描述符 存入client[0]; client[0].events = POLLIN;//listenfd 读事件监听 POLLIN 读事件 POLLOUT 写事件 POLLERR 异常事件 for(i = 1; i < OPEN_MAX;i++) { client[i].fd = -1;//将存放监听文件描述符的数组全置为-1,方便以后放入监听值 } maxi = 0; while(1) { nready = poll(client,maxi+1,0);//阻塞监听 是否有客户端连接请求,返回请求的fd个数 /*参数1:struct poll结构体数组 参数2:最大监听文件描述符+1 参数3:时间 */ if(client[0].revents & POLLIN)//return 读事件 { printf("guoqi4\n"); clie_addr_len = sizeof(clie_addr); connectfd = Accept(listenfd,(struct sockaddr*)&clie_addr,&clie_addr_len); printf("guoqi5\n"); printf("recevied from %s at PORT %d ", inet_ntop(AF_INET,&clie_addr.sin_addr.s_addr,str,sizeof(str)), ntohs(clie_addr.sin_port));//打印客户端的信息 printf("connectfd = %d\n",connectfd); for(i = 0;i < OPEN_MAX;i++) { if(client[i].fd < 0)//将客户端的文件描述符存放进client[]数组,从头开始存放 { client[i].fd = connectfd; break; } } if(i == OPEN_MAX) { perr_exit("too many clients\n"); } client[i].events = POLLIN; if(i > maxi) { maxi = i; } if(--nready <= 0) continue; } /*在已经监听过的client数组中的fd 进行读写操作*/ for(i = 0;i <= maxi;i++)// { if((socketfd = client[i].fd) < 0) { continue; } if(client[i].revents&POLLIN)//POLLIN 读事件 POLLOUT 写事件 POLLERR 异常事件 { if((n = Read(socketfd,buf,sizeof(buf))) < 0) { if(errno == ECONNRESET)//收到RST标志 { printf("client[%d] aborted connection\n",i); Close(socketfd); client[i].fd = -1;//将监听数组该目标置为-1 }else{ perr_exit("read error"); } } else if(n == 0)//n 实际读到的字节数 { printf("client[%d] closed connection\n",i); Close(socketfd); client[i].fd = -1;//将监听数组该目标置为-1 } else { for(j = 0;j < n;j++) { buf[j] = toupper(buf[j]); } sleep(2); Write(socketfd,buf,n); Write(STDOUT_FILENO,buf,n); } if(-- nready <= 0) break; } } } Close(listenfd); return 0; }
复制代码


总结:select,poll 函数实现高并发服务器,都免不了一个问题,那就是在装载监听文件描述符的集合 client[]中,每当需要对已经连接的 fds 进行再次监听读写请求的时候,内核都会通过遍历这个数组来实现对请求的定位,这就会造成开销过大,效率不高。如果能实现定点查找到需要进行读写操作的 fds,那么久会对整个服务器的效率得到质的提升,所以 epoll 横空出世,且听下回分解。


原文链接:Linux 高并发服务器 select/poll实现


另外 Linux、C/C++技术交流群:【960994558】整理了一些个人觉得比较好的学习书籍、大厂面试题、和热门技术教学视频资料共享在里面(包括 C/C++,Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK 等等.),有需要的可以自行添加哦!~



以上有不足的地方欢迎指出讨论,觉得不错的朋友希望能得到您的转发支持,同时可以持续关注我,每天分享干货内容


用户头像

赖猫

关注

还未添加个人签名 2020.11.28 加入

纸上得来终觉浅,绝知此事要躬行

评论

发布
暂无评论
Linux 高并发服务器 select/poll实现