epoll反应堆模型demo实现

在高并发TCP请求中,为了实现资源的节省,效率的提升,Epoll逐渐替代了之前的select和poll,它在用户层上规避了忙轮询这种效率不高的监听方式,epoll的时间复杂度为O(1), 也就意味着,epoll在高并发场景,随着文件描述符的增长,有良好的可扩展性。

  • select 和 poll 监听文件描述符list,进行一个线性的查找 O(n)
  • epoll: 使用了内核文件级别的回调机制O(1)

关键函数有三个:

  • epoll_create(int size): 创建一个epoll实例,文件描述符

    返回一个对象,即红黑树的树根,在代码实现中多为全局变量文件描述符。size参数是对连接的预估,对内核只有参考价值,实际超过也无妨

  • epoll_ctl(int epfd, int op, int fd, struct epoll_event *event):

    该系统调用用于添加、修改或删除文件引用的列表描述符epfd。它要求执行操作op对于目标文件描述符fd。op的操作有添加,删除,更改。事件参数描述链接到文件的对象描述符fd。数据类型为下方核心结构epoll_event。

  • epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout):

    等待epoll事件从epoll实例中发生, 并返回事件以及对应文件描述符.

    当timeout大于0时为规定时限,等于0为非阻塞,小于0为阻塞,maxevents为events可以接收的最大元素个数,即最大返回事件个数,events是一个传出数组,它用来接收返回事件。

核心数据结构

点击查看代码
typedef union epoll_data
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t; struct epoll_event
{
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};

原理

当某一进程调用epoll_create方法时,Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方式密切相关,如下所示:

struct eventpoll {

  ...

  /红黑树的根节点,这棵树中存储着所有添加到epoll中的事件,

  也就是这个epoll监控的事件
/

  struct rb_root rbr;

  /双向链表rdllist保存着将要通过epoll_wait返回给用户的、满足条件的事件/

  struct list_head rdllist;

  ...

};

我们在调用epoll_create时,内核除了帮我们在epoll文件系统里建了个file结点,在内核cache里建了个红黑树用于存储以后epoll_ctl传来的socket外,还会再建立一个rdllist双向链表,用于存储准备就绪的事件,当epoll_wait调用时,仅仅观察这个rdllist双向链表里有没有数据即可。有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。所以,epoll_wait非常高效。

所有添加到epoll中的事件都会与设备(如网卡)驱动程序建立回调关系,也就是说相应事件的发生时会调用这里的回调方法。这个回调方法在内核中叫做ep_poll_callback,它会把这样的事件放到上面的rdllist双向链表中。

在epoll中对于每一个事件都会建立一个epitem结构体,如下所示:

struct epitem {

  ...

  //红黑树节点

  struct rb_node rbn;

  //双向链表节点

  struct list_head rdllink;

  //事件句柄等信息

  struct epoll_filefd ffd;

  //指向其所属的eventepoll对象

  struct eventpoll *ep;

  //期待的事件类型

  struct epoll_event event;

  ...

}; // 这里包含每一个事件对应着的信息。

当调用epoll_wait检查是否有发生事件的连接时,只是检查eventpoll对象中的rdllist双向链表是否有epitem元素而已,如果rdllist链表不为空,则这里的事件复制到用户态内存(使用共享内存提高效率)中,同时将事件数量返回给用户。因此epoll_waitx效率非常高。epoll_ctl在向epoll对象中添加、修改、删除事件时,从rbr红黑树中查找事件也非常快,也就是说epoll是非常高效的。

触发模式

epoll有EPOLLLT和EPOLLET两种触发模式,LT是默认的模式,ET是“高速”模式。

LT(水平触发)模式下,只要这个文件描述符还有数据可读,每次 epoll_wait都会返回它的事件,提醒用户程序去操作;

ET(边缘触发)模式下,在它检测到有 I/O 事件时,通过 epoll_wait 调用会得到有事件通知的文件描述符,对于每一个被通知的文件描述符,如可读,则必须将该文件描述符一直读到空,让 errno 返回 EAGAIN 为止,否则下次的 epoll_wait 不会返回余下的数据,会丢掉事件。如果ET模式不是非阻塞的,那这个一直读或一直写势必会在最后一次阻塞。

还有一个特点是,epoll使用“事件”的就绪通知方式,通过epoll_ctl注册fd,一旦该fd就绪,内核就会采用类似callback的回调机制来激活该fd,epoll_wait便可以收到通知。

【总结】:

ET模式(边缘触发)只有数据到来才触发,不管缓存区中是否还有数据,缓冲区剩余未读尽的数据不会导致epoll_wait返回;

LT 模式(水平触发,默认)只要有数据都会触发,缓冲区剩余未读尽的数据会导致epoll_wait返回。

反应堆模型流程

【epoll反应堆模型的流程】:

epoll_create(); // 创建监听红黑树

epoll_ctl(); // 向书上添加监听fd

epoll_wait(); // 监听

有客户端连接上来--->lfd调用acceptconn()--->将cfd挂载到红黑树上监听其读事件--->

epoll_wait()返回cfd--->cfd回调recvdata()--->将cfd摘下来监听写事件--->

epoll_wait()返回cfd--->cfd回调senddata()--->将cfd摘下来监听读事件--->...--->

demo

点击查看代码
/*
epoll基于非阻塞I/O事件驱动
*/
#include <stdio.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <time.h> #define MAX_EVENTS 1024
#define BUFLEN 4096
#define SERV_PORT 8080 void recvdata(int fd ,int events , void*arg);
void senddata(int fd ,int events , void*arg); struct myevent_s
{
int fd; //要监听的文件描述符
int events; //对应的监听事件
void *arg; //泛型函数
void (*call_back)(int fd, int events,void *arg); //回调函数
int status; //是否在监听:1 在红黑树上(监听) 0 不在
char buf[BUFLEN]; //缓冲区
int len; //缓冲区的大小
long last_active; //记录加入红黑树的时间
}; int g_efd; //全局变量 记录epoll_create返回的文件描述符
struct myevent_s g_events[MAX_EVENTS+1]; //自定义结构体类型数组 //初始化结构体myevent_s成员变量
void eventset(struct myevent_s *ev , int fd, void (*call_back)(int, int ,void*),void *arg)
{
ev->fd=fd;
ev->events=0;
ev->call_back=call_back;
ev->status=0;
ev->arg = arg;
if(ev->len <= 0)
{
memset(ev->buf, 0, sizeof(ev->buf));
ev->len = 0;
}
ev->last_active= (time)NULL;
return;
} /*向监听红黑树添加一个文件描述符*/
void event_add(int efd , int events , struct myevent_s *ev){
struct epoll_event epv = {0,{0}};
int op;
epv.events = ev->events = events;
epv.data.ptr = ev; if(ev->status == 0 ){
op = EPOLL_CTL_ADD; //将其加入到红黑树gfd中
ev->status = 1;
}
if(epoll_ctl(efd,op,ev->fd,&epv)<0){
printf("epv add failed\n");
}
else
printf("epv add succeed\n");
} void eventdel(int efd ,struct myevent_s *ev){
struct epoll_event epv = {0,{0}};
if(ev->status != 1){
return;
}
epv.data.ptr = NULL;
ev->status = 0;
epoll_ctl(efd,EPOLL_CTL_DEL,ev->fd, &epv);
return ;
} void acceptconn(int lfd , int events ,void *arg){
struct sockaddr_in cin ;
socklen_t len = sizeof(cin);
int cfd , i;
if((cfd=accept(lfd,(struct sockaddr*)&cin,&len))== -1)
{
printf("accept error\n");
return;
}
else
{
printf("connect success\n");
}
do
{
for (i =0;i<MAX_EVENTS;i++)
if(g_events[i].status == 0)
break;
if(i == MAX_EVENTS){
printf("connect limit\n");
break;
}
int flag = 0;
if((flag= fcntl(cfd,F_SETFL,O_NONBLOCK))< 0 ){
printf("nonblock failed\n");
break;
} eventset(&g_events[i],cfd,recvdata,&g_events[i]);
event_add(g_efd,EPOLLIN,&g_events[i]);
} while (0);
return;
}
//接收data
void recvdata(int fd,int events, void *arg){
struct myevent_s *ev = (struct myevent_s *)arg;
int len;
len = recv(fd , ev->buf,sizeof(ev->buf),0); //读文件描述符,数据存到myevent_s中
eventdel(g_efd, ev); //将节点从红黑树中摘下
if(len>0){
ev->len = len;
ev->buf[len] = '\0'; //手动添加结束标志
eventset(ev,fd,senddata,ev);
event_add(g_efd,EPOLLOUT,ev); //将fd添加到g_efd 中监听写事件 }
else if(len == 0){
close(ev->fd);
printf("recv null and close");
}
else{
close(ev->fd);
printf("recv error");
}
} void senddata(int fd ,int events , void *arg){
struct myevent_s *ev = (struct myevent_s *)arg;
int len;
len = send(fd,ev->buf,ev->len,0); //直接写回到客户端。
eventdel(g_efd,ev); //从红黑树中删除
if(len>0){
printf("send success");
eventset(ev,fd,recvdata,ev);
event_add(g_efd,EPOLLIN,ev);
}
else{
close(fd);
printf("send error");
}
} void initlistensocket(int efd , short port){
struct sockaddr_in sin; int lfd = socket(AF_INET,SOCK_STREAM,0);
fcntl(lfd,F_SETFL, O_NONBLOCK); //把socket设置为非阻塞 memset(&sin,0,sizeof(sin));
sin.sin_family=AF_INET;
sin.sin_addr.s_addr =INADDR_ANY;
sin.sin_port = htons(SERV_PORT); bind(lfd, (struct sockaddr *)&sin,sizeof(sin));
listen(lfd , 128);
eventset(&g_events[MAX_EVENTS],lfd,acceptconn,&g_events[MAX_EVENTS]); event_add(g_efd,EPOLLIN,&g_events[MAX_EVENTS]); } int main(){
int port = SERV_PORT;
g_efd = epoll_create(MAX_EVENTS+1); //创建红黑树返回给全局文件描述符
if(g_efd<=0){
printf("create epoll error");
}
initlistensocket(g_efd,port); //初始化监听socket struct epoll_event events[MAX_EVENTS+1]; //保存已经满足就绪事件的文件描述符数组 为epoll_wait做准备
printf("server running:port[%d]\n", port); int checkpos = 0 ,i;
while (1)
{ //超时验证
/*long now = time(NULL);
for(i = 0 ;i<100;i++){
if(checkpos == MAX_EVENTS)
checkpos = 0;
if(g_events[checkpos].status != 1 ){
continue; // 不在红黑树上
} long duration = now - g_events[checkpos].last_active;
if(duration >= 60){
close(g_events[checkpos].fd);
printf("timeout");
eventdel(g_efd,&g_events[checkpos]);
}
}*/ //监听红黑树g_efd,将满足的事件添加到g_events中。
int nfd = epoll_wait(g_efd,events,MAX_EVENTS+1,1000);
if(nfd<0){
printf("epoll_wait error");
exit(-1);
} for(i = 0 ;i < nfd ;i++){
struct myevent_s *ev = (struct myevent_s *) events[i].data.ptr;
if((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)){ //读就绪事件
ev->call_back(ev->fd,events[i].events,ev->arg);
}
if((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)){ //写就绪事件
ev->call_back(ev->fd,events[i].events,ev->arg);
}
} } }

使用方法:
gcc server.c -o server
./server
重新打开新的终端:
nc 127.1 8080

最新文章

  1. Spark 生态系统组件
  2. uva 11137 Ingenuous Cubrency
  3. PHP连接SQLServer
  4. Android Studio 导入so
  5. R语言实战
  6. tcpdump命令
  7. hdu 4267 树形DP
  8. java封装和多态
  9. (转)WCF中调用WebService出错,大家帮忙看看,回答就有分
  10. HDU 4998 Rotate
  11. C语言 &gt; 数组和指针
  12. WebApi生成在线API文档--Swagger
  13. DVWA 黑客攻防实战(十五) 绕过内容安全策略 Content Security Policy (CSP) Bypass
  14. Appium移动自动化测试入门及简单实例(python)
  15. Github介绍
  16. nyoj 1274信道安全 第九届河南省赛(SPFA)
  17. 如何修改被hosts.deny禁止访问的IP
  18. 抱SQL SERVER大腿之我爱用视图(对大数据量的管理)
  19. Python标准库笔记(9) — functools模块
  20. MyBatis高级及其SSM框架搭建

热门文章

  1. spring5无法在控制台打印日志的原因
  2. java list 类型删除其中的某些元素的正确方法
  3. 解决maven每次更新都编程java1.5
  4. vue js格式化数字为金额格式
  5. kafka时间轮的原理(一)
  6. 【Java】包装类
  7. 【刷题-LeetCode】154 Find Minimum in Rotated Sorted Array II
  8. kubernetes之手动部署k8s 1.14.1高可用集群
  9. Python与Javascript相互调用超详细讲解(四)使用PyNode进行Python与Node.js相互调用项(cai)目(keng)实(jing)践(yan)
  10. python小兵之时间模块