以系统提供的函数poll
为核心实现多路转接,即高级网络IO
认识 poll 函数
1 2 3
| #include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
|
特殊的结构体pollfd
1 2 3 4 5
| struct pollfd { int fd; short events; short revents; };
|
功能
poll
也类似于select
函数,能够同时监听多个文件描述符的就绪事件
参数解释
fds
是pollfd
结构体数组的指针/地址,提供待监听文件描述符的列表。其中结构体中的位图提供了每个文件描述符的各种状态的可能性
nfds
表示传入数组的长度
timeout
表示poll
函数的超时时间,单位是毫秒(ms)
其中events
和revents
都是位图,可以有如下取值
| 事件 | 描述 | 是否可作为输入 | 是否可作为输出 |
| POLLIN
| 数据(包括普通数据和优先级数据)可读 | 是 | 是 |
| POLLRDNORM
| 普通数据可读 | 是 | 是 |
| POLLRDBAND
| 优先级带数据可读 | 是 | 是 |
| POLLPRI
| 高优先级数据可读,比如TCP带外数据 | 是 | 是 |
| POLLOUT
| 数据(包括普通数据和优先数据)可写 | 是 | 是 |
| POLLWRNORM
| 普通数据可写 | 是 | 是 |
| POLLWRBAND
| 优先级带数据可写 | 是 | 是 |
| POLLRDHUP
| TCP连接被对方关闭,或者对方关闭了写操作,它由GNU引入 | 是 | 是 |
| POLLERR
| 错误 | 否 | 是 |
| POLLHUP
| 挂起。比如管道的写端被关闭后,读端描述符上将收到POLLHUP事件 | 否 | 是 |
| POLLNVAL
| 文件描述符没有打开 | 否 | 是 |
返回值
<0
,表示出错
=0
,表示等待超时
>0
,表示就绪的文件描述符数量
poll就绪条件
与select相同
读就绪
- socket内核中, 接收缓冲区中的字节数, 大于等于低水位标记SO_RCVLOWAT. 此时可以无阻塞的读该文件描述符, 并且返回值大于0;
- socket TCP通信中, 对端关闭连接, 此时对该socket读, 则返回0;
- 监听的socket上有新的连接请求;
- socket上有未处理的错误;
写就绪
- socket内核中, 发送缓冲区中的可用字节数(发送缓冲区的空闲位置大小), 大于等于低水位标记
SO_SNDLOWAT
, 此时可以无阻塞的写, 并且返回值大于0;
- socket的写操作被关闭(close或者shutdown). 对一个写操作被关闭的socket进行写操作, 会触发SIGPIPE信号;
- socket使用
非阻塞connect连接
成功或失败之后;
- socket上有未读取的错误;
poll 的优点
不同于select
使用三个位图来表示三个fdset
的方式,poll
所使用的pollfd
结构体将功能集成到了一个pollfd
对象中,在poll
服务器中每个fd
的封装性和独立性更高,更接近面向对象编程的思想,这导致了如下优点:
poll
所需的传参比select
更简单更方便
poll
使用的是数组而不是位图,没有最大数量限制(当然数量太大会导致性能下降)
poll 的缺点
难以高效处理大量的文件描述符
后续处理效率低
: poll
函数返回后,用户仍然需要遍历轮询pollfd
数组来获取就绪的描述符
大量的拷贝消耗
: 每次调用poll
都需要把大量的pollfd
结构从用户态拷贝到内核中
线性下降的效率
: 同时连接的大量客户端在一时刻可能只有很少的处于就绪状态, 因此随着监视的描述符数量的增长, 其效
率也会线性下降
代码实践
实际上poll服务器
和select服务器
差别不是很大,仅仅只是核心函数
和要维护的数组
发生了变化
文件结构
Socket.h
基于先前封装过的Socket
类,但修改了日志输出方式
main.cpp
,启动服务的源文件
selectServer.hpp
,服务器的实现文件
makefile
项目构建文件。
框架函数设计
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| class PollServer { public: PollServer(const uint16_t port) :_port(port) {}
~PollServer(){close(_listen_sock.Fd());};
void init(); void dispatcher(); void acceptor(); void recver(struct pollfd& fd,int pos); void start();
private: Sock _listen_sock; std::vector<struct pollfd> _fd_array; uint16_t _port; };
|
可以看到我们这次使用了vector容器
管理待监听的文件描述符,因为在poll服务器
中文件描述符数组的动态性更强,使用vector
更加适用于动态维护
makefile 实现
1 2 3 4 5 6
| pollserver:main.cpp g++ -o $@ $^ -std=c++11
.PHONY:clean clean: rm -f pollserver
|
完整代码实现
PollServer.hpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
| #pragma once
#include <sys/fcntl.h> #include <unistd.h> #include <iostream> #include <string> #include <poll.h> #include <string.h> #include <vector> #include "Socket.hpp"
static const uint16_t defaultport = 25565; static const uint16_t fd_num_max = 8; static struct pollfd default_fd = {-1,0,0};
class PollServer { public: PollServer(const uint16_t port) :_port(port) {}
~PollServer(){close(_listen_sock.Fd());};
void init() { _listen_sock.Socket(); _listen_sock.Bind(_port); _listen_sock.Listen(); } void dispatcher() { std::vector<int> del_list; int size = _fd_array.size(); for(int i = 0;i<size;++i) { struct pollfd fd = _fd_array[i]; if(fd.fd == default_fd.fd) { continue; } if(fd.fd == _listen_sock.Fd() == fd.revents == POLLIN) { acceptor(); } else if (fd.revents == POLLIN) { printf("recver\n"); recver(fd,i,del_list); } } for(int i = del_list.size()-1;i>=0;--i) { printf("已删除连接fd : %d \n",_fd_array[i].fd); _fd_array.erase(_fd_array.begin() + del_list[i]); } } void acceptor() { std::string client_ip; uint16_t client_port; int sock = _listen_sock.Accept(&client_ip,&client_port); if(sock < 0) return; printf("[Info]accept success,[%s,%d]\n",client_ip.c_str(),client_port); _fd_array[0].revents = 0; if(_fd_array.size() < fd_num_max) { _fd_array.push_back({sock,POLLIN,0}); } else { printf("[Warning] 连接数超载,已丢弃本次连接 fd :%d",sock); close(sock); } } void recver(struct pollfd& fd,int pos,std::vector<int>& del_list) { char buffer[1024] = {0}; int n = read(fd.fd,buffer,1024-1); if(n>0) { fd.revents = 0; std::cout<<"poll get msgs#" << buffer << std::endl; } else if(n == 0) { printf("[INFO] client quit,close fd : %d",fd.fd); close(fd.fd); del_list.push_back(pos); } else { printf("[Warning] read error, fd: %d",fd.fd); close(fd.fd); del_list.push_back(pos); } } void start() { int listenfd = _listen_sock.Fd(); _fd_array.push_back({listenfd,POLLIN,0}); int cnt = 0; for(;;) { for(auto& fd : _fd_array) { printf("%d ",fd.fd); }printf("\n"); int ret = poll(_fd_array.data(),_fd_array.size(),1000); if(ret < 0) { perror("poll"); continue; } else if (ret == 0) { printf("[INFO] poll time out %d\n",cnt++); continue; } else { dispatcher(); continue; } } }
private: Sock _listen_sock; std::vector<struct pollfd> _fd_array; uint16_t _port; };
|
main.cpp
1 2 3 4 5 6 7 8 9
| #include "PollServer.hpp"
int main() { PollServer svr(8818); svr.init(); svr.start(); return 0; }
|