首先事件循环的起点就是监听端口获取连接,我们可以在ngx_event_core_module模块的ngx_event_process_init函数中看到如下的代码

    /* for each listening socket */
/*为每个监听套接字从connection数组中分配一个连接,即一个slot*/
ls = cycle->listening.elts; //监听套接字是在master进程那里继承过来的,已经初始化好了
for (i = ; i < cycle->listening.nelts; i++) {
//为当前监听套接字的文件描述符分配一个connection,函数返回值c是当前监听套接字关联的connection
c = ngx_get_connection(ls[i].fd, cycle->log); if (c == NULL) {
return NGX_ERROR;
} c->log = &ls[i].log; c->listening = &ls[i]; //当前连接的监听端口
ls[i].connection = c; //当前监听端口的connection rev = c->read; //rev指向当前connection的读事件 rev->log = c->log;
rev->accept = ; //表示当前的读事件是监听端口的accept事件,可以用于epoll区分是一般的读事件还是监听对口的accept事件 #if (NGX_HAVE_DEFERRED_ACCEPT)
rev->deferred_accept = ls[i].deferred_accept;
#endif
if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)) {
if (ls[i].previous) {
/*
* delete the old accept events that were bound to
* the old cycle read events array
*/ old = ls[i].previous->connection; if (ngx_del_event(old->read, NGX_READ_EVENT, NGX_CLOSE_EVENT)
== NGX_ERROR)
{
return NGX_ERROR;
} old->fd = (ngx_socket_t) -;
}
} /*注册监听套接口读事件的回调函数ngx_event_accept*/
rev->handler = ngx_event_accept;
//说白了就是从监听套接字来获取连接的socket

这部分代码在worker进程中,为每一个listening分配一个connection与之对应,并将该connection的读事件的处理函数设置为ngx_event_accept函数,也就是说用这个函数来处理listening的accept,好了接下来我们从这个函数看起(该函数定义在Ngx_event_accept.c):

    lc = ev->data;  //获取该事件对应的connection
ls = lc->listening;
ev->ready = ;   //因为是用的epoll的触发机制,所以这里要不断的循环,直到数据全部读取完了才行
do {
socklen = NGX_SOCKADDRLEN;   //调用accept函数来获取连接的socket
s = accept(lc->fd, (struct sockaddr *) sa, &socklen);

上部分的代码,首先从event变量中获取该事件对应的connection,接着就可以调用accept函数了,从监听的socket描述符中获取连接。

        /*accept到一个新的连接后,就重新计算ngx_accept_disabled的值
ngx_accept_disabled已经提及过了,它主要用来做负载均衡之用。 这里,我们能够看到它的求值方式是“总连接数的八分之一,减去
剩余的连接数”。总连接数是指每个进程设定的最大连接数,这个数字
可以在配置文件中指定。由此处的计算方式,可以看出:每个进程accept
到总连接数的7/8后,ngx_accept_disabled就大于0了,连接也就
超载了。
*/ ngx_accept_disabled = ngx_cycle->connection_n /
- ngx_cycle->free_connection_n;
//为刚刚连接的socket分配connection
c = ngx_get_connection(s, ev->log);

上述代码用于在获取连接之后,计算ngx_accept_disabled的值,它用来进行worker进程间的负载均衡,避免一个worker进程持有太多的connection,具体的以后会讲。然后就是为连接进来的socket描述符分配connection,接下来的代码就是初始化这个刚刚分配的connection,例如为其分配内存池,将socket描述符设置为非阻塞等等。

      //将当前新生成的连接加入
if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == ) {
if (ngx_add_conn(c) == NGX_ERROR) {
ngx_close_accepted_connection(c);
return;
}
} log->data = NULL;
log->handler = NULL; /*这里的listen handler很重要,它将完成新连接的最后初始化工作
同时将accept到的新连接放入epoll中;挂在这个handler上的函数
就是ngx_http_init_connection(位于src/http/ngx_http_request.c中);
这个函数放在分析http模块的时候再看吧。
*/
ls->handler(c);

上面这部分代码是比较重要的,它首先调用ngx_add_conn函数将刚刚的连接加入到epoll当中去,我们可以看看ngx_add_conn的定义,在Ngx_event.h当中:

#define ngx_add_conn         ngx_event_actions.add_conn

其实这里看过前面的文章就会知道ngx_add_conn说白了就是调用实际事件模块的add_conn函数,如果实际使用的是epoll模块的话那么将会调用epoll模块的ngx_epoll_add_connection函数,接下来还有一句代码:

        ls->handler(c);   

这里就是用listening的handler对刚刚分配的connection进行处理,这里就会涉及到http部分的东西了,以后再说吧。

好了到这里ngx_event_accept函数说的就差不多了。接下来可以正式进入Nginx的事件循环了。我们先看事件循环的入口吧,在worker进程的执行函数ngx_worker_process_cycle中,有如此一句代码在每次循环中都会用到

//处理时间和定时,说白了这个函数不断的处理发生的事件
ngx_process_events_and_timers(cycle);

嗯,ngx_process_events_and_timers函数就是事件循环的入口函数,其定义在Ngx_event.c当中,接下来我们来分析该函数:

    /*ngx_use_accept_mutex变量代表是否使用accept互斥体
默认是使用,accept_mutex off;指令关闭。
accept mutex的作用就是避免惊群,同时实现负载均衡。
*/
if (ngx_use_accept_mutex) {
if (ngx_accept_disabled > ) {
ngx_accept_disabled--;
} else {
/* ngx_accept_disabled小于0,连接数没超载*/ /*尝试锁accept mutex,只有成功获取锁的进程,才会将listen
套接字放入epoll中。因此,这就保证了只有一个进程拥有
监听套接口,故所有进程阻塞在epoll_wait时,不会出现惊群现象。
*/
//这里的ngx_trylock_accept_mutex函数中,如果顺利的获取了锁,那么它会将监听端口注册到当前worker进程的epoll当中
if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
return;
}
/*获取锁的进程,将添加一个NGX_POST_EVENTS标志,
此标志的作用是将所有产生的事件放入一个队列中,
等释放锁后,再慢慢来处理事件。因为,处理事件可能
会很耗时,如果不先释放锁再处理的话,该进程就长
时间霸占了锁,导致其他进程无法获取锁,这样accept
的效率就低了。
*/
if (ngx_accept_mutex_held) {
flags |= NGX_POST_EVENTS; //获取了锁,那么就设置标志位
} else {
/*没有获得锁的进程,当然不需要NGX_POST_EVENTS标志了。
但需要设置最长延迟多久,再次去争抢锁。
*/
if (timer == NGX_TIMER_INFINITE
|| timer > ngx_accept_mutex_delay)
{
timer = ngx_accept_mutex_delay;
}
}
}
}

首先是判断是否使用了ngx_use_accept_mutex信号量,该信号量用于避免惊群的发生。只有当当前worker进程获取了该信号量之后才会将listening真正加入到自己的epoll当中,相应accept事件。这里也看到了上面提到的ngx_accept_disabled便用的作用,他也用于判断是否将listening加入到当前worker进程的epoll当中,这样可以做到负载均衡,避免一个worker进程持有了太多的connection。

	/*epoll开始wait事件了,ngx_process_events的具体实现是对应到
epoll模块中的ngx_epoll_process_events函数。单独分析epoll
模块的时候,再具体看看。
*/
(void) ngx_process_events(cycle, timer, flags);

这句代码直接调用的是实际事件模块的process_events函数,来处理事件,还是来看其定义吧:

#define ngx_process_events   ngx_event_actions.process_events

嗯,一看就明白了,如果使用的是epoll模块的话,那么将会调用其的ngx_epoll_process_events函数。待会再细讲它吧。

       if (ngx_posted_accept_events) {
/*ngx_posted_accept_events是一个事件队列
暂存epoll从监听套接口wait到的accept事件。
前文提到的NGX_POST_EVENTS标志被使用后,就会将
所有的accept事件暂存到这个队列。 这里完成对队列中的accept事件的处理,实际就是调用
ngx_event_accept函数来获取一个新的连接,然后放入
epoll中。
*/
ngx_event_process_posted(cycle, &ngx_posted_accept_events);
} /*所有accept事件处理完成,如果拥有锁的话,就赶紧释放了。
其他进程还等着抢了。
*/
if (ngx_accept_mutex_held) {
ngx_shmtx_unlock(&ngx_accept_mutex);
}

该部分代码判断是否从listening监听中获取了accept事件,如果有的话,那么就要赶紧处理它,因为说明现在worker进程已经占有了ngx_accept_mutex信号量,处理完accept事件后就要赶紧释放掉该信号量,好让别的worker进程可以获取该锁,然后从listening中获取连接。

    /*处理普通事件(连接上获得的读写事件)队列上的所有事件,
因为每个事件都有自己的handler方法,该怎么处理事件就
依赖于事件的具体handler了。
*/
if (ngx_posted_events) {
if (ngx_threaded) {
ngx_wakeup_worker_thread(cycle); } else {
ngx_event_process_posted(cycle, &ngx_posted_events);
}
}

这部分就用于处理普通的事件了。这样ngx_process_events_and_timers函数中处理事件的部分就讲完了,但是该函数其实还有用于处理定时的部分,这个以后讲Nginx的定时函数处理的时候再说吧。

好,接下来分析感刚刚提到的epoll模块的ngx_epoll_process_events函数。

//这里是epoll的wait,将得到的事件存到event_list里面,最大的事件量是nevents
/*一开始就是等待事件,最长等待时间为timer;nginx为事件
专门用红黑树维护了一个计时器。后续对这个timer单独分析。
*/
events = epoll_wait(ep, event_list, (int) nevents, timer); //这个超时事件是从红黑树里面获取的,当前最近的超时,这样可以保证epoll的wait能够在合适的时间内返回,保证定义的超时事件可以执行

首先就是调用epoll_wait函数从epoll中获取发生的事件,然后就可以遍历这些事件了:

//循环遍历所有产生的事件
for (i = ; i < events; i++) {
c = event_list[i].data.ptr; //获取该事件实际对应的connection //instance 说白了就是个整形的变量
instance = (uintptr_t) c & ;
c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~); rev = c->read; if (c->fd == - || rev->instance != instance) {
/*
* the stale event from a file descriptor
* that was just closed in this iteration
*/
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, ,
"epoll: stale event %p", c);
continue;
}
  //获取发生的事件的类型
revents = event_list[i].events; ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, ,
"epoll: fd:%d ev:%04XD d:%p",
c->fd, revents, event_list[i].data.ptr);
//如果发生了错误事件
if (revents & (EPOLLERR|EPOLLHUP)) {
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, ,
"epoll_wait() error on fd:%d ev:%04XD",
c->fd, revents);
} #if 0
if (revents & ~(EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP)) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ,
"strange epoll_wait() events fd:%d ev:%04XD",
c->fd, revents);
}
#endif if ((revents & (EPOLLERR|EPOLLHUP))
&& (revents & (EPOLLIN|EPOLLOUT)) == )
{
/*
* if the error events were returned without EPOLLIN or EPOLLOUT,
* then add these flags to handle the events at least in one
* active handler
*/ revents |= EPOLLIN|EPOLLOUT;
}
/*该事件是一个读事件,并该连接上注册的读事件是active的*/
if ((revents & EPOLLIN) && rev->active) { if ((flags & NGX_POST_THREAD_EVENTS) && !rev->accept) {
rev->posted_ready = ; } else {
rev->ready = ;
} if (flags & NGX_POST_EVENTS) {
//如果设置了NGX_POST_EVENTS,表示当前worker进程已经获取了锁,那么将获取的事件入队,因为可能是监听端口的accept事件,这里如果是监听端口的accept事件的话,那么该event的accept域会置为1 ,这个是在事件模块的worker进程初始化中会设置的
//这里持有了锁就应该将产生的事件放入队列中,是为了能够在锁释放了以后再处理这些事件,这样可以让别的worker进程能够尽快的获取锁
queue = (ngx_event_t **) (rev->accept ?
&ngx_posted_accept_events : &ngx_posted_events); ngx_locked_post_event(rev, queue); } else {
rev->handler(rev);
}
} wev = c->write;
//如果是写事件,而且相应connection的写事件是激活的
if ((revents & EPOLLOUT) && wev->active) { if (c->fd == - || wev->instance != instance) { /*
* the stale event from a file descriptor
* that was just closed in this iteration
*/ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, ,
"epoll: stale event %p", c);
continue;
} if (flags & NGX_POST_THREAD_EVENTS) {
wev->posted_ready = ; } else {
wev->ready = ;
} if (flags & NGX_POST_EVENTS) {
ngx_locked_post_event(wev, &ngx_posted_events); } else {
wev->handler(wev);
}
}
}

上面的代码其实注释就已经说的很明白了,循环遍历所有的事件,判断该事件的类型,并获取该事件实际所属的connection,如果发生的是读事件,那么取出该connection的read事件,然后用read的handler来处理,如果是写事件,那么就获取该connection的write,然后用write的handler来处理。但是这里需要注意的是,

  if (flags & NGX_POST_EVENTS) {
//如果设置了NGX_POST_EVENTS,表示当前worker进程已经获取了锁,那么将获取的事件入队,因为可能是监听端口的accept事件,这里如果是监听端口的accept事件的话,那么该event的accept域会置为1 ,这个是在事件模块的worker进程初始化中会设置的
//这里持有了锁就应该将产生的事件放入队列中,是为了能够在锁释放了以后再处理这些事件,这样可以让别的worker进程能够尽快的获取锁
queue = (ngx_event_t **) (rev->accept ?
&ngx_posted_accept_events : &ngx_posted_events); ngx_locked_post_event(rev, queue); } else {
rev->handler(rev);
}

该段代码判断是否持有了信号量,前面已经说过了,如果持有的话,就要将这些事件放入到队列中,稍后在处理,这里是为了尽快能够释放信号量,并且还要判断该事件的类型,区分是accept事件还是普通的读事件,用于将它们放入不同的队列,嗯,event的accept域这个在以前已经说过了,就是为了这个判断的。

好了,ngx_epoll_process_events函数也已经基本讲完了,那么事件循环也就差不多了。

转自:http://www.xuebuyuan.com/2041521.html

最新文章

  1. js继承方式
  2. poj2236(并查集)
  3. 【.NET】单例模式标准写法
  4. 《JavaScript+DOM编程艺术》的摘要(一)---基本知识点
  5. TreeSize Free 查看文件夹大小 v2.3.3 汉化版
  6. adb not responding. if you&#39;d like to
  7. apache软件包下载地址
  8. Jfinal-Plugin源码解读
  9. 剑指Offer-字符流中第一个不重复的字符
  10. 修改 Docker 的 daemon.json后启动失败
  11. 【干货】Jquery.Datables与Bootstrap3的组合使用
  12. 第十五次oo作业
  13. 解决Jupyter notebook[import tensorflow as tf]报错
  14. SNF框架及机器人2018年1-9月份升级内容
  15. RPM二进制包软件安装
  16. 自动化部署之jenkins及简介
  17. SQL server约束
  18. PHP的CURL的POST/GET访问
  19. PHP获取http头信息
  20. 在Ubuntu系统下设置永久性Swap交换空间(转帖)

热门文章

  1. hdu 1083 Courses (最大匹配)
  2. centos 更换用户密码
  3. Ubuntu改坏sudoers后无法使用sudo的解决办法
  4. IDEA导入MySQL的jdbc驱动,并操作数据库
  5. 【论文阅读】Clustering Convolutional Kernels to Compress Deep Neural Networks
  6. 【Luogu P5490】扫描线
  7. 各种优化方法总结比较(sgd/momentum/Nesterov/adagrad/adadelta)
  8. mybatis源码学习(三)-一级缓存二级缓存
  9. 渗透测试初学者的靶场实战 1--墨者学院SQL注入—布尔盲注
  10. Chapter 01—Introduction to R