IO多路复用与socket

前言

简单来讲I/O多路复用就是用一个进程来监听多个文件描述符(fd),我们将监听的fd通过系统调用注册到内核中,如果有一个或多个fd可读或可写,内核会通知应用程序来对这些fd做读写操作,select、poll、epoll都是用于处理此类问题的系统API,只不过注册和调用的方式略有不同。

例如telnet命令的操作,telnet命令从shell读入数据然后写到socket fd上,同时也需要从socket fd上读数据写到shell上。telnet server需要从socket读出命令并发送给shell,再将命令执行结果返回给telnet客户端。此时对于telnet命令来说,需要接收用户输入和sockfd的输入,也需要输出给用户和socket fd,这两种输入和输出是无序的,不能单纯的阻塞某一个读操作,如何处理这种场景?

  1. 将两个read fd设置为非阻塞,然后轮询两个read fd,如果第一个收到数据,则处理,之后再看第二个read fd是否有数据需要读取,如此往复。
  2. 使用多进程或者多线程,将用户输入和输出到sockfd作为一条通道。将sockfd输入和输出给用户作为一条通道。

这样父进程读入用户数据后会发送给socketfd到telenetd,子进程读入telnetd数据后发送给用。当用户终止父进程时,需要发送信号给子进程。当子进I/O结束终止时,父进程也需要接收子进程的结束信号。使用多线程同样需要一些复杂的线程间同步操作。

  1. 异步I/O的方式,对两个read fd使用不同的信号,使用不同的处理函数处理。

以上三种方法在读写连接少的时候没什么问题,当一个server进程需要维护成千上万条通信连接时就会出问题。第1种会无端浪费cpu,第2种就算使用线程\进程池来避免上下文切换的开销,当连接数量过多的时候,会占用大量的内存,第3种使用异步I/O显然信号类型肯定是不够用的。所以为了应对此类问题,有了I/O多路复用的技术。

  1. 使用select、poll、epoll,将两个read fd注册到内核,I/O多路复用会阻塞直到有read请求过来,然后返回通知应用,应用针对不同的描述符进行不同的操作。这样可以做到在一个进程中监听并处理多个描述符,再搭配线程池使用,则可以尽量的减少cpu和内存的使用,自然可以维护更多的连接。
select

先看一下select的创建函数

#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval timeout); // 监听描述符数目
// readfds、writefds、exceptfds表示可读、可写、异常事件对应的fd
// timeout表示select阻塞多长时间后返回,NULL为一直阻塞、0为立即返回、或指定超时时间 /

返回值:<br/>
0表示超时时间内没有就绪的fds<br/>
成功时返回就绪fds总数(读、写、异常)<br/>
失败返回-1并设置errno,如果select等待期间被信号中断则立即返回-1并设置errno为EINTR<br/>

*/

fd_setsys/selct.hfd_set‘.  */值为,系统默认单个进程打开最大fd数量

select通过以下四个宏来对fd_set置位:

void FD_CLR(int fd, fd_set *set); // 清除fd_set中的fd位
int FD_ISSET(int fd, fd_set *set); // 确认fd是否在fd_set中开启,非0值为开启,0为关闭
void FD_SET(int fd, fd_set *set); // 开启fd在fd_set中的位
void FD_ZERO(fd_set *set); // 清除fd_set的所有位

demo

我们可以使用select的read_fds和exception_fds来接收普通数据和带外数据

#include &lt;arpa/inet.h&gt;
#include &lt;netinet/in.h&gt;
#include &lt;sys/select.h&gt;
#include &lt;sys/socket.h&gt;
#include &lt;unistd.h&gt; #include &lt;cassert&gt;
#include &lt;cstring&gt;
#include &lt;iostream&gt;
#define BUFFERSIZE 1024
using namespace std; int main(int argc, char *argv[]) {
if (argc &lt; 3) {

cout &lt;&lt; &#34;usage: &#34; &lt;&lt; argv[0] &lt;&lt; &#34; ip port&#34; &lt;&lt; endl;<br/>
return 1;<br/>

}
// 设置TCP socket server
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(atoi(argv[2]));
const char *ip = argv[1];
inet_pton(AF_INET, ip, &server_addr.sin_addr); int listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd &lt; 0) {

cout &lt;&lt; &#34;error in create socket&#34; &lt;&lt; endl;<br/>
return 1;<br/>

}
int ret =

  bind(listenfd, (struct sockaddr *)&amp;server_addr, sizeof(server_addr));<br/>

assert(ret != -1);
ret = listen(listenfd, 6);
assert(ret != -1); // 接收客户端连接
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
int connfd =

  accept(listenfd, (struct sockaddr *)&amp;client_addr, &amp;client_addr_len);<br/>

if (connfd &lt; 0) {

close(listenfd);<br/>
cout &lt;&lt; &#34;accept connect error&#34; &lt;&lt; endl;<br/>
return 1;<br/>

}
// 初始化要用到的select fd集
fd_set readfds;
fd_set exceptionfds;
FD_ZERO(&readfds);
FD_ZERO(&exceptionfds);
char buffer[BUFFERSIZE];
while (true) {

// 如果是普通数据则触发readfds, 如果是oob数据触发exceptionfds<br/>
FD_SET(connfd, &amp;readfds);<br/>
FD_SET(connfd, &amp;exceptionfds);<br/>
// 注册select, 不关心写fds设置为NULL,timeout NULL为阻塞<br/>
ret = select(connfd + 1, &amp;readfds, NULL, &amp;exceptionfds, NULL);<br/>
if (ret &lt; 0) {<br/>
  cout &lt;&lt; &#34;select error&#34; &lt;&lt; endl;<br/>
  break;<br/>
}<br/>
memset(buffer, &#39;\0&#39;, BUFFERSIZE);<br/>
if (FD_ISSET(connfd, &amp;readfds)) {<br/>
  // 接收普通数据<br/>
  int number = recv(connfd, buffer, BUFFERSIZE - 1, 0);<br/>
  if (number &lt; 0) {<br/>
    cout &lt;&lt; &#34;recv normal data error&#34; &lt;&lt; endl;<br/>
    break;<br/>
  } else if (number == 0) {<br/>
    cout &lt;&lt; &#34;connection closed&#34; &lt;&lt; endl;<br/>
    break;<br/>
  }<br/>
  cout &lt;&lt; &#34;recv normal data &#34; &lt;&lt; number &lt;&lt; &#34; bytes: &#34; &lt;&lt; buffer &lt;&lt; endl;<br/>
}<br/>
memset(buffer, &#39;\0&#39;, BUFFERSIZE);<br/>
if (FD_ISSET(connfd, &amp;exceptionfds)) {<br/>
  // 接收带外数据<br/>
  int number = recv(connfd, buffer, BUFFERSIZE - 1, MSG_OOB);<br/>
  if (number &lt; 0) {<br/>
    cout &lt;&lt; &#34;recv oob data error&#34; &lt;&lt; endl;<br/>
    break;<br/>
  } else if (number == 0) {<br/>
    cout &lt;&lt; &#34;connection closed&#34; &lt;&lt; endl;<br/>
    break;<br/>
  }<br/>
  cout &lt;&lt; &#34;recv oob data &#34; &lt;&lt; number &lt;&lt; &#34; bytes: &#34; &lt;&lt; buffer &lt;&lt; endl;<br/>
}<br/>

}
close(listenfd);
close(connfd);
return 0;
}

客户端截取部分发送内容

const char *oob_data = “abc”;
const char normal_data = “123”;
send(sockfd, normal_data, strlen(normal_data), 0);
send(sockfd, oob_data, strlen(oob_data), MSG_OOB);
send(sockfd, normal_data, strlen(normal_data), 0);
send(sockfd, normal_data, strlen(normal_data), 0);
send(sockfd, normal_data, strlen(normal_data), 0);

运行结果如下,成功的接收到带外数据并处理:

socket与I/O事件触发

socket fd可读事件

getsockoptsetsockopt

socket fd可写事件

  1. 内核发送缓冲区空间大于等于SO_SNDLOWAT可无阻塞写,send返回大于0
  2. 如果该socket fd已经关闭,再执行写会触发SIGPIPE信号
  3. connect连接成功或超时失败
  4. socket上有未处理的错误,通过getsockopt读取和清除错误

socket fd异常事件

  1. socket上接收到带外数据
poll

poll较select做出了改进,select使用bitmap来监视fds,而poll使用pollfd结构的数组来监视fds,突破了fds数量的限制,通过结构体将fd与events绑定,可以监视更多类型的事件

struct pollfd {
int fd; /
file descriptor /
short events; /
requested events 注册的事件/
short revents; /
returned events 实际发生的事件*/
};

常用事件类型

  • POLLIN:数据可读
  • POLLOUT:数据可写
  • POLLRDHUP:TCP连接被对端关闭,或者对端关闭了写操作
  • POLLERR:poll发生错误
  • POLLHUP:管道写端关闭,读端fd收到POLLHUP事件
  • POLLINVAL:fd没有打开

poll的创建函数

int poll(struct pollfd fds, nfds_t nfds, int timeout)
// fds 是pollfd结构类型的数组
// nfds 指定fds的大小
// timeout 超时时间,-1阻塞,0立即返回 /

返回值:<br/>
0表示超时时间内没有就绪的fds<br/>
成功时返回就绪fds总数(读、写、异常)<br/>
失败返回-1并设置errno,如果select等待期间被信号中断则立即返回-1并设置errno为EINTR<br/>

*/

demo

监听两个文件的写入,输出到标准输出

#include &lt;fcntl.h&gt;
#include &lt;poll.h&gt;
#include &lt;unistd.h&gt; #include &lt;cstdio&gt;
#include &lt;cstring&gt;
#include &lt;iostream&gt;
#define BUFFERSIZE 1024
using namespace std;
// 存放pollfd结构数组
pollfd fds[2]; void setnonblocking(int fd) {
int old_fd_option = fcntl(fd, F_GETFL);
int new_fd_option = O_NONBLOCK | old_fd_option;
fcntl(fd, F_SETFL, new_fd_option);
} int main(int argc, char *argv[]) {
if (argc &lt; 2) {

cout &lt;&lt; &#34;usage: &#34; &lt;&lt; argv[0] &lt;&lt; &#34;filename1 filename2&#34; &lt;&lt; endl;<br/>
return 1;<br/>

}
// 打开创建好的文件
int fd1 = open(argv[1], O_RDONLY);
int fd2 = open(argv[2], O_RDONLY);
// 设置pollfd结构
fds[0].fd = fd1;
fds[0].events = POLLIN | POLLERR;
fds[0].revents = 0; fds[1].fd = fd2;
fds[1].events = POLLIN | POLLERR;
fds[1].revents = 0;
// 设置fd为非阻塞,方便看读取的效果,否则会阻塞在read调用上
setnonblocking(fd1);
setnonblocking(fd2); char buffer[BUFFERSIZE];
int number = 0;
while (true) {

// 创建poll<br/>
int ret = poll(fds, 2, -1);<br/>
if (ret &lt; 0) {<br/>
  cout &lt;&lt; &#34;poll error&#34; &lt;&lt; endl;<br/>
  break;<br/>
}<br/>
for (int i = 0; i &lt; 2; ++i) {<br/>
  pollfd fd = fds[i];<br/>
  if (fd.revents &amp; POLLERR) {<br/>
    cout &lt;&lt; &#34;poll error fd: &#34; &lt;&lt; fd.fd &lt;&lt; endl;<br/>
    continue;<br/>
    // 如果fd可读<br/>
  } else if (fd.revents &amp; POLLIN) {<br/>
    // 每次poll事件清空缓冲区<br/>
    bzero(buffer, BUFFERSIZE);<br/>
    while ((number = read(fd.fd, buffer, BUFFERSIZE)) &gt; 0) {<br/>
      cout &lt;&lt; &#34;read &#34; &lt;&lt; number &lt;&lt; &#34; bytes from file &#34; &lt;&lt; argv[i + 1]<br/>
           &lt;&lt; &#34; content: &#34; &lt;&lt; buffer &lt;&lt; endl;<br/>
    }<br/>
  }<br/>
}<br/>

}
close(fd1);
return 0;
}

  1. 新建文件1.txt和2.txt
  2. 运行server,另起终端随机在1.txt和2.txt上使用echo追加写入内容

server端输出

epoll
epoll_ctl
  • 一方面无需像使用select/poll每次调用都将整个fd集传递给它们。
  • 另一方面在使用的时候应用遍历的都是事件就绪的fd。

创建epoll:

int epoll_create(int size);
// size:提示内核事件表的大小,不是硬限制
// 返回一个fd,所有其他的函数都操作该fd

操作事件:

int epoll_ctl(int epfd, int op, int fd, struct epoll_event event);
// epfd:epoll_create返回的fd
/
op:

EPOLL_CTL_ADD 添加fd到epfd,事件集合为event<br/>
EPOLL_CTL_MOD 修改epfd中的fd事件,事件集合为event<br/>
EPOLL_CTL_DEL 从epfd中删除fd,忽略event参数,一般设为NULL<br/>

*/
// 返回值:成功返回0,失败返回-1设置errno

获取就绪的事件集

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
// epfd:epoll_create返回的fd
// events:就绪的事件数组,应用遍历它
// maxevents:指定最大监听的事件数目
// timeout:超时时间,-1阻塞,0立即返回
// 返回值:成功返回就绪fd的数目,失败返回-1设置errno

LT和ET模式

epoll支持两个模式LT(Level Trigger)和ET(Edge Trigger)

epoll_wait

EPOLLONESHOT事件

epoll_ctl

demo

server的主线程与客户端建立TCP连接,建立好连接后将连接fd注册到epoll,如果该链接有请求数据就启动新的线程来处理。使用telnet作为客户端对比不使用EPOLLONESHOT和使用EPOLLONESHOT后server的行为

#include &lt;arpa/inet.h&gt;
#include &lt;fcntl.h&gt;
#include &lt;netinet/in.h&gt;
#include &lt;pthread.h&gt;
#include &lt;sys/epoll.h&gt;
#include &lt;sys/socket.h&gt;
#include &lt;unistd.h&gt; #include &lt;cassert&gt;
#include &lt;cstring&gt;
#include &lt;iostream&gt; using namespace std;
#define MAX_EVENT_NUMBER 1024
#define BUFFERSIZE 1024 static int epollfd = 0; void setnonblocking(int fd) {
int old_fd_option = fcntl(fd, F_GETFL);
int new_fd_option = old_fd_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_fd_option);
} void register_epoll(int epollfd, int fd, bool newfd = false,

                bool oneshot = false) {<br/>

epoll_event events;
events.data.fd = fd;
events.events = EPOLLIN | EPOLLET; // 读事件、ET工作模式
if (oneshot) {

events.events |= EPOLLONESHOT;  // 使用EPOLLONESHOT<br/>

}
if (newfd) {

epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &amp;events);<br/>

} else {

epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &amp;events);<br/>

}
setnonblocking(fd);
} void *handle_connect(void *arg) {
pid_t tid = gettid();
int connfd = *((int *)arg);
cout &lt;&lt; “use thread ” &lt;&lt; tid &lt;&lt; “ to handle connect ” &lt;&lt; connfd &lt;&lt; endl;
char buffer[BUFFERSIZE];
memset(buffer, ’\0‘, BUFFERSIZE);
while (true) {

int bytes = recv(connfd, buffer, BUFFERSIZE - 1, 0);<br/>
if (bytes == 0) {<br/>
  cout &lt;&lt; &#34;the other peer close connection&#34; &lt;&lt; endl;<br/>
  close(connfd);<br/>
  break;<br/>
} else if (bytes &lt; 0) {<br/>
  if (errno == EAGAIN) {<br/>
    cout &lt;&lt; connfd &lt;&lt; &#34; Temporarily unavailable, read later&#34; &lt;&lt; endl;<br/>
    register_epoll(epollfd, connfd, false,<br/>
                   true);  // 重置该连接fd的EPOLLONESHOT<br/>
    break;<br/>
  } else {<br/>
    cout &lt;&lt; &#34;read &#34; &lt;&lt; connfd &lt;&lt; &#34; failure&#34; &lt;&lt; endl;<br/>
    close(connfd);<br/>
  }<br/>
} else {<br/>
  cout &lt;&lt; &#34;thread &#34; &lt;&lt; tid &lt;&lt; &#34; recve &#34; &lt;&lt; bytes<br/>
       &lt;&lt; &#34; bytes from connection &#34; &lt;&lt; connfd &lt;&lt; &#34;, content: &#34; &lt;&lt; buffer<br/>
       &lt;&lt; endl;<br/>
  sleep(10);<br/>
}<br/>

}
cout &lt;&lt; “thread ” &lt;&lt; tid &lt;&lt; “ end handle connect ” &lt;&lt; connfd &lt;&lt; endl;
} int main(int argc, char *argv[]) {
if (argc &lt; 3) {

cout &lt;&lt; &#34;usage: &#34; &lt;&lt; argv[0] &lt;&lt; &#34; ip port&#34; &lt;&lt; endl;<br/>
return 1;<br/>

}
// 创建server端socket
const char *ip = argv[1];
struct sockaddr_in serv_addr;
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(atoi(argv[2]));
inet_pton(AF_INET, ip, &serv_addr.sin_addr); int listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd &lt; 0) {

cout &lt;&lt; &#34;create socket error&#34; &lt;&lt; endl;<br/>
return 1;<br/>

}
int ret = bind(listenfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));
assert(ret != -1);
ret = listen(listenfd, 5);
assert(ret != -1);
// epoll_event数组,用来接收返回的就绪fd
epoll_event events[MAX_EVENT_NUMBER];
// 创建epoll
epollfd = epoll_create(5);
if (epollfd &lt; 0) {

cout &lt;&lt; &#34;create epoll error&#34; &lt;&lt; endl;<br/>
close(listenfd);<br/>
return 1;<br/>

}
// listenfd 无需使用EPOLLONESHOT
register_epoll(epollfd, listenfd, true, false);
while (true) {

// 等待事件触发<br/>
int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);<br/>
for (int i = 0; i &lt; number; i++) {<br/>
  int sockfd = events[i].data.fd;<br/>
  if ((sockfd == listenfd) &amp;&amp; (events[i].events &amp; EPOLLIN)) {<br/>
    // 接收客户端连接<br/>
    struct sockaddr cli_addr;<br/>
    socklen_t cli_addr_len = sizeof(cli_addr);<br/>
    int connfd =<br/>
        accept(sockfd, (struct sockaddr *)&amp;cli_addr, &amp;cli_addr_len);<br/>
    if (connfd &lt; 0) {<br/>
      cout &lt;&lt; &#34;accept connect failure&#34; &lt;&lt; endl;<br/>
      continue;<br/>
    }<br/>
    // 新的连接使用EPOLLONESHOT属性<br/>
    register_epoll(epollfd, connfd, true, true);

// 新的连接不使用EPOLLONESHOT属性

    // register_epoll(epollfd, connfd, true, false);<br/>
  } else if (events[i].events &amp; EPOLLIN) {<br/>
    // 已建立的连接有数据请求<br/>
    pthread_t thread;<br/>
    // 创建线程处理连接数据,传入sockfd参数以便重置EPOLLONESHOT<br/>
    pthread_create(&amp;thread, NULL, handle_connect, &amp;sockfd);<br/>
  } else {<br/>
    cout &lt;&lt; &#34;other errors&#34; &lt;&lt; endl;<br/>
  }<br/>
}<br/>

}
close(listenfd);
close(epollfd); return 0;
}

使用EPOLLONESHOT事件:

  1. telnet1发送c1 h1, 发送c1 h2
  2. telnet2发送c2 h1
  3. telnet3发送c1 h3

server使用线程102108逐个处理 connect5的请求,对于connect6使用线程102109单独处理

不使用EPOLLONESHOT事件:

修改代码

注释掉
// register_epoll(epollfd, connfd, false,
// true); // 重置该连接fd的EPOLLONESHOT 不给connfd使用EPOLLONESHOT // 新的连接使用EPOLLONESHOT属性
// register_epoll(epollfd, connfd, true, true); // 新的连接不使用EPOLLONESHOT属性
register_epoll(epollfd, connfd, true, false);

编译运行

  1. telnet1发送c1 h1, 发送c1 h2
  2. telnet2发送c2 h1
  3. telnet3发送c1 h3

线程102137处理connfd 5,sleep的期间内,connfd5有新的请求到来,可以看到新起了线程来处理connfd5的新消息

对比总结

select

FD_ISSET__FD_SETSIZE 1024

poll

/proc/sys/fs/file-max/

epoll

epoll_createepoll_ctlepoll_wait/proc/sys/fs/file-max/

学习自:

《Linux高性能服务器编程》

《UNIX环境高级编程》

《UNIX系统编程》