epoll(2) 源码分析

文本内核代码取自 5.0.18 版本,和上一篇文章中的版本不同是因为另一个电脑出了问题,但是总体差异不大。

引子留下的问题

上一篇文章中留下了几个问题,本文将针对这几个问题进行分析:

  1. epoll(2) 得到就绪事件的复杂度为何是 \(O(1)\)
  2. epoll(2) 和普通的文件相比的区别在哪里,比如和 eventfd(2) 比较
  3. epoll(2) 相对 poll(2)/select(2) 多提供了 EPOLLET 的触发模式,现象在上面可以看到区别,实现是如何做到的。
  4. epoll(2) 相互关注时,有就绪事件到来会产生相互唤醒的问题,为何会出现这样的问题
  5. 对于问题 4,内核是如何解决这种相互唤醒的问题。

解答在文末.

关键的数据结构

在第一次阅读代码时,优先掌握该功能的核心数据结构有利于对于全局的把控。

struct eventpoll

struct eventpoll 对应一个 epoll 实例的结构,包含所有的文件事件,作为 epoll 的接口使用。

/*
* This structure is stored inside the "private_data" member of the file
* structure and represents the main data structure for the eventpoll
* interface.
*/
struct eventpoll {
spinlock_t lock; // 保护整个数据结构
struct mutex mtx; // 保护正在操作的文件 wait_queue_head_t wq; // sys_epoll_wait() 使用的等待队列
wait_queue_head_t poll_wait; // epoll 作为被监视文件时 file->poll() 使用的等待队列,使用较少
// poll_wait 队列作用和 eventfd 文件中的 wqh 队列相同 struct list_head rdllist; // 就绪的文件链表,连接 epitem 上的 rdllink
struct epitem *ovflist; // 也是用来串联就绪的事件,作为 rdlist 的备胎使用 struct rb_root_cached rbr; // 所有关注的文件事件的红黑树,在内核空间维护 /* wakeup_source used when ep_scan_ready_list is running */
struct wakeup_source *ws; // 不分析该功能,只知道为唤醒源就行
struct user_struct *user; // epoll 创建操作所属的用户
struct file *file; // epoll 关联的文件结构 /* used to optimize loop detection check */
int visited;
struct list_head visited_list_link; #ifdef CONFIG_NET_RX_BUSY_POLL
/* used to track busy poll napi_id */
unsigned int napi_id;
#endif
};

struct epitem

struct epitem 每个文件描述符添加到 eventpoll 接口将产生一个 epitem项 被链接到 eventpoll 中的红黑树上。

/*
* Each file descriptor added to the eventpoll interface will
* have an entry of this type linked to the "rbr" RB tree.
* Avoid increasing the size of this struct, there can be many thousands
* of these on a server and we do not want this to take another cache line.
*/
struct epitem {
union {
/* RB tree node links this structure to the eventpoll RB tree */
struct rb_node rbn;
/* Used to free the struct epitem */
struct rcu_head rcu;
}; struct list_head rdllink; // 用于连接到 eventpoll->rdllist 的链表,和 rdllist 一起使用
struct epitem *next; // 连接到 eventpoll->ovflist 的指针,和 ovflist 一起使用
struct epoll_filefd ffd; // 文件 file 结构 + fd,作为红黑树的节点 int nwait; // 附加在 poll 操作上活跃的等待队列的数量
/* List containing poll wait queues */
struct list_head pwqlist; // 注释是包含轮询等待队列的链表,但是实际上个人更倾向为这个链表只是为了连接 eppoll_entry 结构。
// 和上面那个 nwait 一样,这两个变量的添加操作只会发生一次,就是调用 ep_insert() 的时候,但是 epitem 在一个 epoll 实例中只会调用一次。 struct eventpoll *ep; // 当前 epitem 的所有者
struct list_head fllink; // 连接文件结构的链表 /* wakeup_source used when EPOLLWAKEUP is set */
struct wakeup_source __rcu *ws; // 唤醒源,不考虑 /* The structure that describe the interested events and the source fd */
struct epoll_event event; // 用户传入的 event 结构
};

struct eppoll_entry

struct eppoll_entry 为文件的等待队列项回调和epoll相关联的结构. 类似为poll(2) 中的 poll_table_entry

/* Wait structure used by the poll hooks */
struct eppoll_entry {
struct list_head llink; // 连接至 epitem 中的 pwqlist 链表中
struct epitem *base; // epitem 所属者 wait_queue_entry_t wait; // 等待队列项
wait_queue_head_t *whead; // 等待队列头,关注文件的等待队列,如 eventfd->pwh
};

epoll(2) 相关的系统调用

整个 fs/eventpoll.c 的代码量较多(2000+), 所以这里节选部分主要的代码进行分析, 一些对于参数的合法性的校验就不放出来了.

epoll 的实现做了两种区分: 关注的文件是否为 epoll 类型, 我们先对非epoll文件进行分析, 这个部分代码比较直观易懂, 对epoll文件的处理考虑了多种情况, 留作之后分析.

epoll_create(2)

创建一个新的文件描述符, 对应一个 epoll 实例.

  1. 为 eventpoll 结构分配内存并且初始化
  2. 获取一个新的文件并且与 eventpoll 结构相关联.
/*
* Open an eventpoll file descriptor.
*/
static int do_epoll_create(int flags)
{
int error, fd;
struct eventpoll *ep = NULL;
struct file *file; error = ep_alloc(&ep); // 分配内存并初始化, 代码较直观, 不做分析 fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
if (fd < 0) {
error = fd;
goto out_free_ep;
}
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC)); ep->file = file;
fd_install(fd, file);
}

epoll_ctl(2)

epoll_ctl 为epoll的控制函数, 根据函数的 @op 入参分发需要进行的操作.

函数的功能主体比较清晰, 也分为两部分:

  1. 对监视文件为epoll经行循环检测
  2. 根据操作类型分发具体执行的函数
/*
* The following function implements the controller interface for
* the eventpoll file that enables the insertion/removal/change of
* file descriptors inside the interest set.
*/
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
// 加锁部分为对监视的文件是epoll时候进行的循环检测, 这部分后面分析, 这里只看非 epoll 文件的处理
mutex_lock_nested(&ep->mtx, 0);
if (op == EPOLL_CTL_ADD) {
if (!list_empty(&f.file->f_ep_links) ||
is_file_epoll(tf.file)) {
full_check = 1;
mutex_unlock(&ep->mtx);
mutex_lock(&epmutex);
if (is_file_epoll(tf.file)) {
error = -ELOOP;
if (ep_loop_check(ep, tf.file) != 0) {
clear_tfile_check_list();
goto error_tgt_fput;
}
} else
list_add(&tf.file->f_tfile_llink,
&tfile_check_list);
mutex_lock_nested(&ep->mtx, 0);
if (is_file_epoll(tf.file)) {
tep = tf.file->private_data;
mutex_lock_nested(&tep->mtx, 1);
}
}
} /*
* Try to lookup the file inside our RB tree, Since we grabbed "mtx"
* above, we can be sure to be able to use the item looked up by
* ep_find() till we release the mutex.
*/
epi = ep_find(ep, tf.file, fd); // 从红黑树中寻找操作的文件 error = -EINVAL;
switch (op) {
case EPOLL_CTL_ADD:
if (!epi) { // 不存在就插入到eventpoll中
epds.events |= EPOLLERR | EPOLLHUP;
error = ep_insert(ep, &epds, tf.file, fd, full_check);
} else
error = -EEXIST;
if (full_check)
clear_tfile_check_list();
break;
case EPOLL_CTL_DEL:
if (epi)
error = ep_remove(ep, epi);
else
error = -ENOENT;
break;
case EPOLL_CTL_MOD:
if (epi) {
if (!(epi->event.events & EPOLLEXCLUSIVE)) {
epds.events |= EPOLLERR | EPOLLHUP;
error = ep_modify(ep, epi, &epds);
}
} else
error = -ENOENT;
break;
}
if (tep != NULL)
mutex_unlock(&tep->mtx);
mutex_unlock(&ep->mtx); return error;
}

ep_insert()

分配一个 epitem 的内存并初始化, 再将该 epitem 添加到 eventpoll 中的红黑树上.

初始化过程也包含了几个部分:

  1. 对 epitem 结构进行初始化, 设置各成员变量的值.
  2. 调用目标文件的file->f_op->poll() 函数设置等待队列项回调函数, 这个是实现 epoll_wait(2) 复杂度为 \(O(1)\) 最重要的一步, 关注的文件产生就绪事件就会调用该回调函数 ep_ptable_queue_proc
  3. 返回就绪事件掩码, 将当前 epitem 添加到 eventpoll->rdllist 中, 唤醒 epoll_wait(2) 线程
/*
* Must be called with "mtx" held.
*/
static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
struct file *tfile, int fd, int full_check)
{
int error, pwake = 0;
__poll_t revents;
struct epitem *epi; if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
return -ENOMEM; /* Item initialization follow here ... */ /* Initialize the poll table using the queue callback */
epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc); /*
* Attach the item to the poll hooks and get current event bits.
* We can safely use the file* here because its usage count has
* been increased by the caller of this function. Note that after
* this operation completes, the poll callback can start hitting
* the new item.
*/
revents = ep_item_poll(epi, &epq.pt, 1); /* Add the current item to the list of active epoll hook for this file */
spin_lock(&tfile->f_lock);
list_add_tail_rcu(&epi->fllink, &tfile->f_ep_links); // 将当前 epitem 添加到监视文件的 f_ep_links 链表上.
spin_unlock(&tfile->f_lock); /*
* Add the current item to the RB tree. All RB tree operations are
* protected by "mtx", and ep_insert() is called with "mtx" held.
*/
ep_rbtree_insert(ep, epi); // 将当前 epitem 添加到eventpoll的红黑树中 /* If the file is already "ready" we drop it inside the ready list */
if (revents && !ep_is_linked(epi)) { // 产生就绪事件并且当前 epitem 未添加进 eventpoll 中(这个有点儿明显)
list_add_tail(&epi->rdllink, &ep->rdllist); // 添加至 ep->rdllist, 留意这两个链表是一起出现的 /* Notify waiting tasks that events are available */
if (waitqueue_active(&ep->wq)) // wq 队列是 epoll_wait(2) 使用的, 这里唤醒调用 epoll_wait(2) 进入阻塞状态的线程.
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait)) // 这里不直接唤醒是加锁的原因, poll_wait 队列属于被监视文件使用, 不应该在epoll实例中唤醒
pwake++;
} spin_unlock_irq(&ep->wq.lock); /* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait); }

ep_remove()

作用和 ep_insert() 相反, 释放内存, 删除与其它资源相关联的连接, 在互斥量 eventpoll->mtx 加锁下进行.

/*
* Removes a "struct epitem" from the eventpoll RB tree and deallocates
* all the associated resources. Must be called with "mtx" held.
*/
static int ep_remove(struct eventpoll *ep, struct epitem *epi)
{
struct file *file = epi->ffd.file; lockdep_assert_irqs_enabled(); /*
* Removes poll wait queue hooks.
*/
ep_unregister_pollwait(ep, epi); // 删除 epitem->pwqlist 关联的等待项链表 /* Remove the current item from the list of epoll hooks */
spin_lock(&file->f_lock);
list_del_rcu(&epi->fllink); // 从监视文件中的 file->f_ep_links 链表中删除当前 epitem
spin_unlock(&file->f_lock); rb_erase_cached(&epi->rbn, &ep->rbr); // 从 eventpoll 中的红黑树中删除当前 epitem 节点 spin_lock_irq(&ep->wq.lock);
if (ep_is_linked(epi))
list_del_init(&epi->rdllink); // 从 eventpoll 中的就绪队列 rdllist 中删除当前 epitem 节点
spin_unlock_irq(&ep->wq.lock); /*
* At this point it is safe to free the eventpoll item. Use the union
* field epi->rcu, since we are trying to minimize the size of
* 'struct epitem'. The 'rbn' field is no longer in use. Protected by
* ep->mtx. The rcu read side, reverse_path_check_proc(), does not make
* use of the rbn field.
*/
call_rcu(&epi->rcu, epi_rcu_free); // 释放当前 epitem 的内存 atomic_long_dec(&ep->user->epoll_watches); // eventpoll 所属用户监视的 epitem数量减一 return 0;
}

ep_modify()

调整关注文件的事件.

/*
* Modify the interest event mask by dropping an event if the new mask
* has a match in the current file status. Must be called with "mtx" held.
*/
static int ep_modify(struct eventpoll *ep, struct epitem *epi,
const struct epoll_event *event)
{
int pwake = 0;
poll_table pt; lockdep_assert_irqs_enabled(); // 设置 file->f_op->poll 的回调函数为NULL, 因为在insert中已经设置了文件等待队列项的回调函数
init_poll_funcptr(&pt, NULL); /*
* Get current event bits. We can safely use the file* here because
* its usage count has been increased by the caller of this function.
* If the item is "hot" and it is not registered inside the ready
* list, push it inside.
*/
if (ep_item_poll(epi, &pt, 1)) { // 调用f_op->poll() 获取文件的就绪事件
spin_lock_irq(&ep->wq.lock);
if (!ep_is_linked(epi)) { // 未添加至 eventpoll 接口的就绪队列中
list_add_tail(&epi->rdllink, &ep->rdllist); // 添加
ep_pm_stay_awake(epi); // 电源管理的函数, 不看 /* Notify waiting tasks that events are available */
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq); // 唤醒调用 epoll_wait(2) 的线程
if (waitqueue_active(&ep->poll_wait)) // 分析同 ep_insert()
pwake++;
}
spin_unlock_irq(&ep->wq.lock);
} /* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait); return 0;
}

epoll_wait(2)

等待就绪的事件。

ep_events_available 为检查是否存在就绪事件,其实就是检查 rdllist 和 ovflist 是否有被修改过,复杂度为 \(O(1)\).

static inline int ep_events_available(struct eventpoll *ep)
{
return !list_empty_careful(&ep->rdllist) ||
READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR;
}

epoll_wait(2) 的主要逻辑由 ep_poll() 实现,核心逻辑分为两部分

  1. 检查就绪事件是否存在,存在执行 2,不存在根据超时时间进入阻塞状态和直接返回。
  2. 将就绪事件复制到用户空间,若是复制失败,在条件(见代码分析)满足的情况下执行 1,成功则返回。
/*
* Implement the event wait interface for the eventpoll file. It is the kernel
* part of the user space epoll_wait(2).
*/
static int do_epoll_wait(int epfd, struct epoll_event __user *events,
int maxevents, int timeout)
{
/*
* At this point it is safe to assume that the "private_data" contains
* our own data structure.
*/
ep = f.file->private_data; /* Time to fish for events ... */
error = ep_poll(ep, events, maxevents, timeout);
} /*
* ep_poll - Retrieves ready events, and delivers them to the caller supplied
* event buffer.
*/
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
int res = 0, eavail, timed_out = 0;
u64 slack = 0;
bool waiter = false;
wait_queue_entry_t wait;
ktime_t expires, *to = NULL; lockdep_assert_irqs_enabled(); // 时间处理,略过 fetch_events: // 这为一整个获取就绪事件逻辑的开端 if (!ep_events_available(ep)) // 无就绪事件
ep_busy_loop(ep, timed_out); // 中断缓解技术对 中断频繁的设置 eavail = ep_events_available(ep); // 有就绪事件
if (eavail)
goto send_events; // 直接goto发送数据 /*
* We don't have any available event to return to the caller. We need
* to sleep here, and we will be woken by ep_poll_callback() when events
* become available.
*/
if (!waiter) { // 无数据,需要等待
waiter = true; // 设置等待标识
init_waitqueue_entry(&wait, current); // 初始化等待队列项 spin_lock_irq(&ep->wq.lock);
__add_wait_queue_exclusive(&ep->wq, &wait); // 投入到 ep->wq 的等待队列中
spin_unlock_irq(&ep->wq.lock);
} for (;;) { // 进入无限循环
/*
* We don't want to sleep if the ep_poll_callback() sends us
* a wakeup in between. That's why we set the task state
* to TASK_INTERRUPTIBLE before doing the checks.
*/
set_current_state(TASK_INTERRUPTIBLE); // 设置可中断运行状态
/*
* Always short-circuit for fatal signals to allow
* threads to make a timely exit without the chance of
* finding more events available and fetching
* repeatedly.
*/
if (fatal_signal_pending(current)) { // 先判断致命错误信号
res = -EINTR;
break;
} eavail = ep_events_available(ep); // 再判断是否有就绪事件的产生,有的话推出循环
if (eavail)
break;
if (signal_pending(current)) { // 非致命错误信号产生,中断去处理该中断
res = -EINTR;
break;
} // 超时调度
if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS)) {
timed_out = 1;
break;
}
} __set_current_state(TASK_RUNNING); send_events: // 将就绪事件复制到用户空间逻辑开端
// 1.没有错误产生 2.有就绪事件 3.事件复制到用户空间失败 4.未超时
// 满足以上4个条件的情况下重新进行获取就绪事件逻辑
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && !timed_out)
goto fetch_events; // 在等待标志设置的情况下,需要把已添加等待队列节点删除。
if (waiter) {
spin_lock_irq(&ep->wq.lock);
__remove_wait_queue(&ep->wq, &wait);
spin_unlock_irq(&ep->wq.lock);
} return res;
}

ep_send_events 将就绪的事件复制至用户空间,ep_send_events_proc 为实际的执行函数,ep_scan_ready_list 为辅助函数,这个函数放在后面具体说明,这里只看 ep_send_events_proc 的实现。

static int ep_send_events(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents)
{
struct ep_send_events_data esed; esed.maxevents = maxevents;
esed.events = events; ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
return esed.res;
} static __poll_t ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
void *priv)
{
struct ep_send_events_data *esed = priv;
__poll_t revents;
struct epitem *epi, *tmp;
struct epoll_event __user *uevent = esed->events;
struct wakeup_source *ws;
poll_table pt; init_poll_funcptr(&pt, NULL); // 初始化poll_table, 但是并不设置 file->f_op->poll 的回调函数
esed->res = 0; /*
* We can loop without lock because we are passed a task private list.
* Items cannot vanish during the loop because ep_scan_ready_list() is
* holding "mtx" during this call.
*/
lockdep_assert_held(&ep->mtx); // head 实际上为 rdllist,遍历就绪文件链表
list_for_each_entry_safe(epi, tmp, head, rdllink) {
if (esed->res >= esed->maxevents) // 超过用户的提供的缓冲区大小,maxevents 为 epoll_wait(2) 的第3个参数
break; // __pm_stay_awake(ep->ws);
// 为电源保持唤醒状态的处理,略过这部分逻辑 list_del_init(&epi->rdllink); // 从就绪文件链表中删除当前事件 /*
* If the event mask intersect the caller-requested one,
* deliver the event to userspace. Again, ep_scan_ready_list()
* is holding ep->mtx, so no operations coming from userspace
* can change the item.
*/
revents = ep_item_poll(epi, &pt, 1); // 调用 file->f_op->poll() 获取就绪事件的掩码
if (!revents) // 无关注的就绪事件,抬走下一个就绪文件
continue; // 复制就绪事件至用户空间
if (__put_user(revents, &uevent->events) ||
__put_user(epi->event.data, &uevent->data)) {
list_add(&epi->rdllink, head); // 复制失败,将当前就绪文件重新链接至就绪文件链表中
ep_pm_stay_awake(epi);
if (!esed->res) // 如果一个事件都没有复制,就产生致命错误,毕竟连个毛都没有捞着有点气
esed->res = -EFAULT;
return 0;
}
esed->res++; // 成功复制的数量
uevent++; // 用户空间的缓冲区增长一下
if (epi->event.events & EPOLLONESHOT) // 用户设置了 EPOLLONESHOT的情况下
epi->event.events &= EP_PRIVATE_BITS; // 重新设置关注的事件,见 ep_poll_callback 分析
else if (!(epi->event.events & EPOLLET)) {
// 未设置边缘触发模式,则将当前就绪文件添加回就绪文件链表中
// 这里就区分了边缘触发和水平触发,水平触发必须每次epoll_wait(2)调用都检查就绪文件的事件
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
}
} return 0;
}

相关就绪事件逻辑

ep_scan_ready_list 用来扫描就绪文件链表上的就绪文件并处理;ep_poll_callback 用来向就绪文件链表中添加就绪文件。

ep_scan_ready_list

就绪文件是否存在判断了两个变量,rdllist 上面分析过了,还有一个备胎的就绪文件链表 ovflist,备胎 ovflist 是在 rdllist 被占用的情况下使用的。

ep_scan_ready_list 对两个就绪文件列表进行扫描找出就绪的文件,rdllist 是单独放在回调函数中进行处理的,ovflist 是直接在 ep_scan_ready_list 中处理的。

  1. 在对 rdllist 链表处理的回调函数执行前,改变 ovflist 的状态,使得在回调函数执行期间ovflist可以串联起就绪的文件。
  2. 执行回调函数。
  3. 遍历ovflist,将链表上的就绪文件对应的 epitem 插入到 rdllist 上,并且唤醒调用 epoll_wait(2) 线程。
/**
* ep_scan_ready_list - Scans the ready list in a way that makes possible for
* the scan code, to call f_op->poll(). Also allows for
* O(NumReady) performance.
* Returns: The same integer error code returned by the @sproc callback.
*/
static __poll_t ep_scan_ready_list(struct eventpoll *ep,
__poll_t (*sproc)(struct eventpoll *,
struct list_head *, void *),
void *priv, int depth, bool ep_locked)
{
__poll_t res;
int pwake = 0;
unsigned long flags;
struct epitem *epi, *nepi;
LIST_HEAD(txlist); // 初始化一个链表,将 eventpoll->rdllist 转移到该链表上 /*
* Steal the ready list, and re-init the original one to the
* empty list. Also, set ep->ovflist to NULL so that events
* happening while looping w/out locks, are not lost. We cannot
* have the poll callback to queue directly on ep->rdllist,
* because we want the "sproc" callback to be able to do it
* in a lockless way.
*/
spin_lock_irqsave(&ep->lock, flags);
list_splice_init(&ep->rdllist, &txlist); // txlist = rdllist; init(rdllist)
// 这里做一个状态的转换,在文件的等待列队项回调函数(ep_poll_callback)执行时会判断ovflist是否发生改变,
// 而NULL则为本函数后面的遍历操作使用
ep->ovflist = NULL;
spin_unlock_irqrestore(&ep->lock, flags); /*
* Now call the callback function.
*/
res = (*sproc)(ep, &txlist, priv); spin_lock_irqsave(&ep->lock, flags);
/*
* During the time we spent inside the "sproc" callback, some
* other events might have been queued by the poll callback.
* We re-insert them inside the main ready-list here.
*/
for (nepi = ep->ovflist; (epi = nepi) != NULL;
nepi = epi->next, epi->next = EP_UNACTIVE_PTR) { // 每次访问的 epi->next 回到最初状态
/*
* We need to check if the item is already in the list.
* During the "sproc" callback execution time, items are
* queued into ->ovflist but the "txlist" might already
* contain them, and the list_splice() below takes care of them.
*/
// 保留这段代码的注释
if (!ep_is_linked(&epi->rdllink)) { // 未被包含进 txlist(rdllist)
list_add_tail(&epi->rdllink, &ep->rdllist); // 添加至正统就绪文件链表 rdllist 中
ep_pm_stay_awake(epi);
}
}
/*
* We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after
* releasing the lock, events will be queued in the normal way inside
* ep->rdllist.
*/
ep->ovflist = EP_UNACTIVE_PTR; // 恢复 ovflist的状态, 备胎的寿命到此结束,投下一次胎(下次调用 ep_scan_ready_list) /*
* Quickly re-inject items left on "txlist".
*/
list_splice(&txlist, &ep->rdllist); // 把对txlist 处理的结果,加入到rdllist,
__pm_relax(ep->ws); if (!list_empty(&ep->rdllist)) { // 如果还有就绪事件,则唤醒epoll_wait(2)线程
/*
* Wake up (if active) both the eventpoll wait list and
* the ->poll() wait list (delayed after we release the lock).
*/
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irqrestore(&ep->lock, flags); if (!ep_locked)
mutex_unlock(&ep->mtx); /* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait); return res;
}

ep_poll_callback

以上为对就绪链表有数据的处理,ep_poll_callback 为向就绪链表添加数据的回调函数。

ep_poll_callback() 在 ep_insert() 中被设置为目标文件的等待队列项回调函数,当文件的状态发生改变时,会调用 ep_poll_callback() 把目标文件对应的 epitem 添加至就绪链表中。

ewake 为独占唤醒标志,ep_poll_callback 在 __wake_up_comm 中被调用

/*
* This is the callback that is passed to the wait queue wakeup
* mechanism. It is called by the stored file descriptors when they
* have events to report.
*/
static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
{
int pwake = 0;
unsigned long flags;
struct epitem *epi = ep_item_from_wait(wait);
struct eventpoll *ep = epi->ep;
__poll_t pollflags = key_to_poll(key);
int ewake = 0; spin_lock_irqsave(&ep->lock, flags); if (!(epi->event.events & ~EP_PRIVATE_BITS)) // 对EPOLLONESHOT标志的处理
goto out_unlock; // ep->ovflist 的状态修改后作为备胎使用
if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
if (epi->next == EP_UNACTIVE_PTR) { // 未连接到 ovflist 中
epi->next = ep->ovflist; // 头插法插入链表中
ep->ovflist = epi;
}
goto out_unlock; // 不能进入正统就绪链表 rdllist的处理逻辑中,其实这里可用不用goto的
} /* If this file is already in the ready list we exit soon */
if (!ep_is_linked(&epi->rdllink)) { // 当前epitem未连接至就绪列表中时,添加至就绪文件链表 rdllist 中
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake_rcu(epi);
} // EPOLLEXCLUSIVE 标志的处理,决定返回的参数
// pollflags 为目标文件的就绪的事件掩码,可阅读 eventfd_read 的源码
// 1.关注的事件中出现了读写就绪的事件 2.就绪的事件不为读写事件,满足其中之一则独占唤醒的标志置 1
if (waitqueue_active(&ep->wq)) {
if ((epi->event.events & EPOLLEXCLUSIVE) &&
!(pollflags & POLLFREE)) {
switch (pollflags & EPOLLINOUT_BITS) {
case EPOLLIN:
if (epi->event.events & EPOLLIN)
ewake = 1;
break;
case EPOLLOUT:
if (epi->event.events & EPOLLOUT)
ewake = 1;
break;
case 0:
ewake = 1;
break;
}
}
wake_up_locked(&ep->wq);
}
if (waitqueue_active(&ep->poll_wait))
pwake++; out_unlock:
spin_unlock_irqrestore(&ep->lock, flags); /* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait); // 分析同上 if (!(epi->event.events & EPOLLEXCLUSIVE)) // 未设置独占唤醒标志
ewake = 1; return ewake;
}

文件状态发生改变调用文件的等待队列项上的回调函数 func,在epoll中也就是 ep_poll_callback,ret 为回调函数的独占唤醒标志。

当等待项设置了WQ_FLAG_EXCLUSIVE标志,只要回调函数返回了独占唤醒标志,在独占唤醒计数消耗完时,退出循环。

如果当前文件被多个 epoll 监视,那么这个文件的等待队列上就有多个 ep_poll_callback 回调函数,现在设置了等待项的标志 wait_entry->flags |= WQ_FLAG_EXCLUSIVE 后,只会执行 nr_exclusive 个回调函数,而 nr_exclusive 在epoll中为 1,所以就只会把就绪的文件添加到一个 epoll 中,这样避免了惊群效应.

代码位置 kernel/sched/wait.c, __wake_up_comm()

list_for_each_entry_safe_from(curr, next, &wq_head->head, entry) {
unsigned flags = curr->flags;
ret = curr->func(curr, mode, wake_flags, key);
if (ret < 0)
break;
if (ret && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
break;
}

epoll 间的相互影响及处理

产生这个最根本的原因就是 epoll作为一个文件既可以监视其他文件,也可以被其他epoll监视。这样就产生了一个监视的有向图

ep_eventpoll_poll

ep_eventpoll_poll 文件的poll操作,也就是file->f_op->poll(). 调用该函数可以获取就绪文件的事件掩码,但是 epoll 文件只提供读就绪事件,并且读就绪事件是由非epoll文件的就绪事件决定的。也就是说当一个epoll文件被 select(2)/poll(2)/epoll(2) 监听时,必须该epoll已经监听了其他的非epoll文件(如eventfd), 在调用 该epoll file->f_op->poll() 时才可能返回可读的就绪事件。

static __poll_t ep_eventpoll_poll(struct file *file, poll_table *wait)
{
struct eventpoll *ep = file->private_data;
int depth = 0; /* Insert inside our poll wait queue */
poll_wait(file, &ep->poll_wait, wait); /*
* Proceed to find out if wanted events are really available inside
* the ready list.
*/
return ep_scan_ready_list(ep, ep_read_events_proc,
&depth, depth, false);
}

对单独的 epitem 执行 poll 操作,获取就绪的文件事件掩码。

  1. 如果是非 epoll 文件,则执行 file->f_op->poll 操作。
  2. 如果是 epoll 文件,则扫描该 epoll 中就绪文件链表上的 epitem 是否就绪,这里产生了一个递归。

1 是递归的基准情况,而 ep_scan_ready_list 负责为向前推进


static __poll_t ep_item_poll(const struct epitem *epi, poll_table *pt,
int depth)
{
struct eventpoll *ep;
bool locked; pt->_key = epi->event.events;
if (!is_file_epoll(epi->ffd.file))
return epi->ffd.file->f_op->poll(epi->ffd.file, pt) &
epi->event.events; ep = epi->ffd.file->private_data;
poll_wait(epi->ffd.file, &ep->poll_wait, pt);
locked = pt && (pt->_qproc == ep_ptable_queue_proc); return ep_scan_ready_list(epi->ffd.file->private_data,
ep_read_events_proc, &depth, depth,
locked) & epi->event.events;
} // 实际执行读取事件的函数
static __poll_t ep_read_events_proc(struct eventpoll *ep, struct list_head *head,
void *priv)
{
struct epitem *epi, *tmp;
poll_table pt;
int depth = *(int *)priv; init_poll_funcptr(&pt, NULL);
depth++; // 遍历该 epoll 中的 rdllist(参数为head)
list_for_each_entry_safe(epi, tmp, head, rdllink) {
if (ep_item_poll(epi, &pt, depth)) { // 获取该 epitem 的就绪事件,若就绪,则返回为可读的就绪事件掩码
return EPOLLIN | EPOLLRDNORM;
} else { // 未就绪抬走下一个 epitem
/*
* Item has been dropped into the ready list by the poll
* callback, but it's not actually ready, as far as
* caller requested events goes. We can remove it here.
*/
__pm_relax(ep_wakeup_source(epi));
list_del_init(&epi->rdllink); // 无就绪事件,将当前epitem 从所有者 eventpoll 的就绪链表中删除
}
} return 0; // 当前这个 epitem 没有就绪的事件产生。
}

深度递归调用及死循环的检测

现在有三个文件

  1. eventfd efd
  2. epoll epfd1
  3. epoll epfd2

操作如下:

  • epoll_ctl(epfd1, EPOLL_CTL_ADD, efd, IN | OUT);
  • epoll_ctl(epfd2, EPOLL_CTL_ADD, epfd1, IN | OUT);

现在这三者的关系如下:

  • epfd1 \(\in\) struct_ctx(efd).wqh
  • epitem(efd) \(\in\) eventpoll(epfd1) , epfd2 \(\in\) eventpoll(epfd1).poll_wait

现在efd就绪,产生了 IN | OUT 事件,这个时候调用 ep_poll_callback(epfd1) 将 epitem(efd) 添加到 eventpoll(epfd1).rdllist 上,唤醒 epoll_wait(2) 和 eventpoll(epfd1).poll_wait 上的等待项, 这里再调用 ep_poll_callback(epfd2) 将 epitem(epfd1) 添加至 eventpoll(epfd2).rdllist 上,唤醒 epoll_wait(2).

关键点到了,如果现在epfd1 也监视 epfd2

  • 操作 epoll_ctl(epfd1, EPOLL_CTL_ADD, epfd2, IN | OUT).
  • 那么 ep_poll_callback(epfd1) \(\in\) eventpoll(epfd2).poll_wait.

在 ep_poll_callback(epfd2) 执行时,又会唤醒 eventpoll(epfd2).poll_wait 上的等待项,也就是 ep_poll_callback(epfd1). 所以就有可能出现死循环递归。

ep_call_nested 函数用来检测嵌套调用,就是针对 epitem 为 epoll 文件的处理。

我们可以将 epoll 比作一个树里面的一个节点,eventfd 这种文件只能作为叶节点使用,而 epoll 可以不是叶节点。现在我们对这棵树(如果出现相互监视,就变成了图)进行遍历,用 visited 作为已访问标志,检测到其结构中的epitem的文件类型只要是epoll 文件就继续向前推进(访问其子epitem),每次向前推进的时候进行检测,判断是否出现死循环或者递归深度超出范围。

和上面的 ep_scan_ready_list 处理逻辑有一点相近,就是遍历这些 epoll 文件形成图

static int ep_loop_check_proc(void *priv, void *cookie, int call_nests)
{
int error = 0;
struct file *file = priv;
struct eventpoll *ep = file->private_data;
struct eventpoll *ep_tovisit;
struct rb_node *rbp;
struct epitem *epi; mutex_lock_nested(&ep->mtx, call_nests + 1);
ep->visited = 1; // 优化处理,已访问标志
list_add(&ep->visited_list_link, &visited_list);
for (rbp = rb_first_cached(&ep->rbr); rbp; rbp = rb_next(rbp)) {
epi = rb_entry(rbp, struct epitem, rbn);
if (unlikely(is_file_epoll(epi->ffd.file))) { // epoll 文件
ep_tovisit = epi->ffd.file->private_data;
if (ep_tovisit->visited)
continue;
// 继续向前推进,递归检测
error = ep_call_nested(&poll_loop_ncalls, EP_MAX_NESTS,
ep_loop_check_proc, epi->ffd.file,
ep_tovisit, current);
if (error != 0)
break;
} else {
// 该item未添加至文件检测链表中(唤醒风暴检测使用),是的 epoll 虽然叫文件,可是这里并不是一等公民。
if (list_empty(&epi->ffd.file->f_tfile_llink))
list_add(&epi->ffd.file->f_tfile_llink,
&tfile_check_list);
}
}
mutex_unlock(&ep->mtx); return error;
} static int ep_call_nested(struct nested_calls *ncalls, int max_nests,
int (*nproc)(void *, void *, int), void *priv,
void *cookie, void *ctx)
{
int error, call_nests = 0;
unsigned long flags;
struct list_head *lsthead = &ncalls->tasks_call_list;
struct nested_call_node *tncur;
struct nested_call_node tnode; spin_lock_irqsave(&ncalls->lock, flags); list_for_each_entry(tncur, lsthead, llink) {
// call_nests 为嵌套的调用深度,cookie 为 eventpoll 结构,ctx 为当前的任务 struct_task,不懂为何呀用当前任务做限定。
if (tncur->ctx == ctx &&
(tncur->cookie == cookie || ++call_nests > max_nests)) {
error = -1;
goto out_unlock;
}
} /* Add the current task and cookie to the list */
tnode.ctx = ctx;
tnode.cookie = cookie;
list_add(&tnode.llink, lsthead); // 满足条件就添加到静态链表中 spin_unlock_irqrestore(&ncalls->lock, flags); /* Call the nested function */
error = (*nproc)(priv, cookie, call_nests); // 继续调用向前推进 /* Remove the current task from the list */
spin_lock_irqsave(&ncalls->lock, flags);
list_del(&tnode.llink);
out_unlock:
spin_unlock_irqrestore(&ncalls->lock, flags); return error;
}

唤醒风暴的处理

在进行插入的时候调用该检测操作,作为预防使用;而 EPOLLEXCLUSIVE 产生就绪事件后的处理,两者作用不能混淆。

唤醒风暴的检查为设定一个限制,epoll 允许唤醒的最大深度为 5,一个文件最多唤醒 path_limits[深度] 的epoll描述符。牵扯到递归的深度,自然是少不了 ep_call_nested 这个检测函数了。

先看 reverse_path_check 的返回,只有两种情况:

  1. -1 超出最该深度允许唤醒的epoll描述符
  2. 0 在正常范围内
#define PATH_ARR_SIZE 5
static const int path_limits[PATH_ARR_SIZE] = { 1000, 500, 100, 50, 10 };
static int path_count[PATH_ARR_SIZE]; static int path_count_inc(int nests)
{
/* Allow an arbitrary number of depth 1 paths */
if (nests == 0)
return 0; if (++path_count[nests] > path_limits[nests])
return -1;
return 0;
} static void path_count_init(void)
{
int i; for (i = 0; i < PATH_ARR_SIZE; i++)
path_count[i] = 0;
} static int reverse_path_check_proc(void *priv, void *cookie, int call_nests)
{
int error = 0;
struct file *file = priv;
struct file *child_file;
struct epitem *epi; /* CTL_DEL can remove links here, but that can't increase our count */
rcu_read_lock();
// 遍历该文件上的 epoll 节点 epitem
list_for_each_entry_rcu(epi, &file->f_ep_links, fllink) {
child_file = epi->ep->file; // 该 epitem 所属 epoll 实例
if (is_file_epoll(child_file)) { // 文件应该必为 epoll 的
if (list_empty(&child_file->f_ep_links)) { // epoll 未被监视
if (path_count_inc(call_nests)) { // 判断是否满足调用深度的条件
error = -1;
break; // 不满足直接返回
}
} else { // 被监视,那就继续调用,往前推进
error = ep_call_nested(&poll_loop_ncalls,
EP_MAX_NESTS,
reverse_path_check_proc,
child_file, child_file,
current);
}
if (error != 0)
break;
} else {
printk(KERN_ERR "reverse_path_check_proc: "
"file is not an ep!\n");
}
}
rcu_read_unlock();
return error;
} static int reverse_path_check(void)
{
int error = 0;
struct file *current_file; /* let's call this for all tfiles */
// 遍历监视的文件
list_for_each_entry(current_file, &tfile_check_list, f_tfile_llink) {
path_count_init(); // 初始化调用深度的次数为0
// 检验可能发生递归的调用
error = ep_call_nested(&poll_loop_ncalls, EP_MAX_NESTS,
reverse_path_check_proc, current_file,
current_file, current);
if (error)
break;
}
return error;
}

引子的解答

  1. epoll(2) 得到就绪事件的复杂度为何是 \(O(1)\)。

    - epoll_wait(2) 只扫描就绪文件队列,不用对所有的文件进行判断,见 epoll_wait(2) 的分析。
  2. epoll(2) 和普通的文件相比的区别在哪里,比如和 eventfd(2) 比较

    - 少了 read(2)/write(2) 等文件操作

    - epoll 作为被监视文件只有可读就绪事件,eventfd 拥有读写就绪事件。

    - eventfd 的就绪事件来自文件自身的状态(计数)变化,而epoll的就绪来自监视文件的状态的变化
  3. epoll(2) 相对 poll(2)/select(2) 多提供了 EPOLLET 的触发模式,实现是如何做到的。

    - 区别在于每次调用 epoll_wait(2)在复制就绪事件至用户空间后,水平触发模式会将该文件添加回就绪链表。
  4. epoll(2) 相互关注时,有就绪事件到来会产生相互唤醒的问题,为何会出现这样的问题

    - 见 epoll 间的相互影响及处理
  5. 对于问题 4,内核是如何解决这种相互唤醒的问题。

    - 同 4 解答

新的问题

循环检测的时候为何需要限定单个线程(任务)间的 epoll 不同,这个猜测可能和唤醒的机制有关,作为一个问题留下。

参考

  1. epoll: add EPOLLEXCLUSIVE flagEPOLLEXCLUSIVE 标志的提交代码。
  2. linux 内核poll/select/epoll实现剖析,对epoll很好的分析,代码稍微有点旧了,不过还是非常值得一看。
  3. eventfd 源码分析,上一篇对eventfd的分析。

最新文章

  1. Android tween 动画 XML 梳理
  2. jmeter+ant+jenkins+mac环境搭建
  3. 404. Sum of Left Leaves
  4. Spring MVC3返回JSON数据中文乱码问题解决(转)
  5. mvc项目架构搭建之UI层的搭建
  6. yum工具介绍
  7. Java通过代理server上网
  8. Java基础知识强化之集合框架笔记28:ArrayList集合练习之去除ArrayList集合中的重复字符串元素(升级)
  9. __file__ __name__ __doc__ argv详解
  10. standing
  11. python爬虫从入门到放弃(七)之 PyQuery库的使用
  12. jprofiler配置
  13. mysql uodate 报错 You can&#39;t specify target table &#39;**&#39; for update in FROM clause
  14. ThreadLocal, HandlerThread, IntentService
  15. POJ 1182 食物链 【带权并查集】
  16. 学习笔记:python3,代码。小例子习作(2017)
  17. Python 管理 MySQL
  18. STL学习笔记--特殊容器
  19. EA类图与代码同步
  20. EOS 的网站及资料doc

热门文章

  1. PHPSTORM 2019 激活
  2. .net 更新access数据库 影响的行数为0
  3. 自定义TabLayout的Indicator
  4. SpringCloudEureka入门
  5. Flask基础(10)--&gt;http的无状态协议解决办法一(客户端cookie)
  6. 彻底理解CORS跨域原理
  7. Java 学习笔记之 线程脏读
  8. 快学Scala 第六课 (类getter和setter)
  9. dubbo配置负载均衡、集群环境
  10. cocos2d-x Windows 环境搭建