写点什么

从 linux 源码看 epoll

发布于: 2020 年 09 月 14 日
从linux源码看epoll

从 linux 源码看 epoll


前言


在 linux 的高性能网络编程中,绕不开的就是 epoll。和 select、poll 等系统调用相比,epoll 在需要监视大量文件描述符并且其中只有少数活跃的时候,表现出无可比拟的优势。epoll 能让内核记住所关注的描述符,并在对应的描述符事件就绪的时候,在 epoll 的就绪链表中添加这些就绪元素,并唤醒对应的 epoll 等待进程。

本文就是笔者在探究 epoll 源码过程中,对 kernel 将就绪描述符添加到 epoll 并唤醒对应进程的一次源码分析(基于 linux-2.6.32 内核版本)。由于篇幅所限,笔者聚焦于 tcp 协议下 socket 可读事件的源码分析。


简单的 epoll 例子


下面的例子,是从笔者本人用 c 语言写的 dbproxy 中的一段代码。由于细节过多,所以做了一些删减。


int init_reactor(int listen_fd,int worker_count){    ......    // 创建多个epoll fd,以充分利用多核    for(i=0;i<worker_count;i++){        reactor->worker_fd = epoll_create(EPOLL_MAX_EVENTS);    }    /* epoll add listen_fd and accept */    // 将accept后的事件加入到对应的epoll fd中    int client_fd = accept(listen_fd,(struct sockaddr *)&client_addr,&client_len)));    // 将连接描述符注册到对应的worker里面    epoll_ctl(reactor->client_fd,EPOLL_CTL_ADD,epifd,&event);}// reactor的worker线程static void* rw_thread_func(void* arg){    ......
for(;;){ // epoll_wait等待事件触发 int retval = epoll_wait(epfd,events,EPOLL_MAX_EVENTS,500); if(retval > 0){ for(j=0; j < retval; j++){ // 处理读事件 if(event & EPOLLIN){ handle_ready_read_connection(conn); continue; } /* 处理其它事件 */ } } } ......}
复制代码


上述代码事实上就是实现了一个 reactor 模式中的 accept 与 read/write 处理线程,如下图所示:



epoll_create


Unix 的万物皆文件的思想在 epoll 里面也有体现,epoll_create 调用返回一个文件描述符,此描述符挂载在 anon_inode_fs(匿名 inode 文件系统)的根目录下面。让我们看下具体的 epoll_create 系统调用源码:


SYSCALL_DEFINE1(epoll_create, int, size){    if (size <= 0)        return -EINVAL;
return sys_epoll_create1(0);}
复制代码


由上述源码可见,epoll_create 的参数是基本没有意义的,kernel 简单的判断是否为 0,然后就直接就调用了 sys_epoll_create1。由于 linux 的系统调用是通过(SYSCALL_DEFINE1,SYSCALL_DEFINE2……SYSCALL_DEFINE6)定义的,那么 sys_epoll_create1 对应的源码即是 SYSCALL_DEFINE(epoll_create1)。

(注:受限于寄存器数量的限制,(80x86 下的)kernel 限制系统调用最多有 6 个参数。据 ulk3 所述,这是由于 32 位 80x86 寄存器的限制)

接下来,我们就看下 epoll_create1 的源码:


SYSCALL_DEFINE1(epoll_create1, int, flags){    // kzalloc(sizeof(*ep), GFP_KERNEL),用的是内核空间    error = ep_alloc(&ep);    // 获取尚未被使用的文件描述符,即描述符数组的槽位    fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));    // 在匿名inode文件系统中分配一个inode,并得到其file结构体    // 且file->f_op = &eventpoll_fops    // 且file->private_data = ep;    file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,                 O_RDWR | (flags & O_CLOEXEC));    // 将file填入到对应的文件描述符数组的槽里面    fd_install(fd,file);                 ep->file = file;    return fd;}
复制代码


最后 epoll_create 生成的文件描述符如下图所示:



struct eventpoll


所有的 epoll 系统调用都是围绕 eventpoll 结构体做操作,现简要描述下其中的成员:


/* * 此结构体存储在file->private_data中 */struct eventpoll {    // 自旋锁,在kernel内部用自旋锁加锁,就可以同时多线(进)程对此结构体进行操作    // 主要是保护ready_list    spinlock_t lock;    // 这个互斥锁是为了保证在eventloop使用对应的文件描述符的时候,文件描述符不会被移除掉    struct mutex mtx;    // epoll_wait使用的等待队列,和进程唤醒有关    wait_queue_head_t wq;    // file->poll使用的等待队列,和进程唤醒有关    wait_queue_head_t poll_wait;    // 就绪的描述符队列    struct list_head rdllist;    // 通过红黑树来组织当前epoll关注的文件描述符    struct rb_root rbr;    // 在向用户空间传输就绪事件的时候,将同时发生事件的文件描述符链入到这个链表里面    struct epitem *ovflist;    // 对应的user    struct user_struct *user;    // 对应的文件描述符    struct file *file;    // 下面两个是用于环路检测的优化    int visited;    struct list_head visited_list_link;};
复制代码


本文讲述的是 kernel 是如何将就绪事件传递给 epoll 并唤醒对应进程上,因此在这里主要聚焦于(wait_queue_head_t wq)等成员。


epoll_ctl(add)


我们看下 epoll_ctl(EPOLL_CTL_ADD)是如何将对应的文件描述符插入到 eventpoll 中的。

借助于 spin_lock(自旋锁)和 mutex(互斥锁),epoll_ctl 调用可以在多个 KSE(内核调度实体,即进程/线程)中并发执行。


SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,        struct epoll_event __user *, event){    /* 校验epfd是否是epoll的描述符 */    // 此处的互斥锁是为了防止并发调用epoll_ctl,即保护内部数据结构    // 不会被并发的添加修改删除破坏    mutex_lock_nested(&ep->mtx, 0);    switch (op) {        case EPOLL_CTL_ADD:            ...            // 插入到红黑树中            error = ep_insert(ep, &epds, tfile, fd);            ...            break;        ......    }    mutex_unlock(&ep->mtx);    }
复制代码


上述过程如下图所示:



ep_insert


在 ep_insert 中初始化了 epitem,然后初始化了本文关注的焦点,即事件就绪时候的回调函数,代码如下所示:


static int ep_insert(struct eventpoll *ep, struct epoll_event *event,             struct file *tfile, int fd){    /* 初始化epitem */    // &epq.pt->qproc = ep_ptable_queue_proc    init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);    // 在这里将回调函数注入    revents = tfile->f_op->poll(tfile, &epq.pt);    // 如果当前有事件已经就绪,那么一开始就会被加入到ready list    // 例如可写事件    // 另外,在tcp内部ack之后调用tcp_check_space,最终调用sock_def_write_space来唤醒对应的epoll_wait下的进程    if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {        list_add_tail(&epi->rdllink, &ep->rdllist);        // wake_up ep对应在epoll_wait下的进程        if (waitqueue_active(&ep->wq)){            wake_up_locked(&ep->wq);        }        ......    }        // 将epitem插入红黑树    ep_rbtree_insert(ep, epi);    ......}
复制代码


tfile->f_op->poll 的实现


向 kernel 更底层注册回调函数的是 tfile->f_op->poll(tfile, &epq.pt)这一句,我们来看一下对于对应的 socket 文件描述符,其 fd=>file->f_op->poll 的初始化过程:


    // 将accept后的事件加入到对应的epoll fd中    int client_fd = accept(listen_fd,(struct sockaddr *)&client_addr,&client_len)));    // 将连接描述符注册到对应的worker里面    epoll_ctl(reactor->client_fd,EPOLL_CTL_ADD,epifd,&event);
复制代码


回顾一下上述 user space 代码,fd 即 client_fd 是由 tcp 的 listen_fd 通过 accept 调用而来,那么我们看下 accept 调用链的关键路径:


accept      |->accept4            |->sock_attach_fd(newsock, newfile, flags & O_NONBLOCK);                  |->init_file(file,...,&socket_file_ops);                        |->file->f_op = fop;                              /* file->f_op = &socket_file_ops */            |->fd_install(newfd, newfile); // 安装fd
复制代码


那么,由 accept 获得的 client_fd 的结构如下图所示:



(注:由于是 tcp socket,所以这边 sock->ops=inet_stream_ops,这个初始化的过程在我的另一篇博客<<从 linux 源码看 socket 的阻塞和非阻塞>>中,博客地址如下:

https://my.oschina.net/alchemystar/blog/1791017)

既然知道了 tfile->f_op->poll 的实现,我们就可以看下此 poll 是如何将安装回调函数的。


回调函数的安装


kernel 的调用路径如下:


sock_poll /*tfile->f_op->poll(tfile, &epq.pt)*/;    |->sock->ops->poll        |->tcp_poll            /* 这边重要的是拿到了sk_sleep用于KSE(进程/线程)的唤醒 */            |->sock_poll_wait(file, sk->sk_sleep, wait);                |->poll_wait                    |->p->qproc(filp, wait_address, p);                    /* p为&epq.pt,而且&epq.pt->qproc= ep_ptable_queue_proc*/                        |-> ep_ptable_queue_proc(filp,wait_address,p);
复制代码


绕了一大圈之后,我们的回调函数的安装其实就是调用了 eventpoll.c 中的 ep_ptable_queue_proc,而且向其中传递了 sk->sk_sleep 作为其 waitqueue 的 head,其源码如下所示:


static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,                 poll_table *pt){    // 取出当前client_fd对应的epitem    struct epitem *epi = ep_item_from_epqueue(pt);    // &pwq->wait->func=ep_poll_callback,用于回调唤醒    // 注意,这边不是init_waitqueue_entry,即没有将当前KSE(current,当前进程/线程)写入到    // wait_queue当中,因为不一定是从当前安装的KSE唤醒,而应该是唤醒epoll\_wait的KSE    init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);    // 这边的whead是sk->sk_sleep,将当前的waitqueue链入到socket对应的sleep列表    add_wait_queue(whead, &pwq->wait);    }
复制代码


这样 client_fd 的结构进一步完善,如下图所示:



ep_poll_callback 函数是唤醒对应 epoll_wait 的地方,我们将在后面一起讲述。


epoll_wait


epoll_wait 主要是调用了 ep_poll:


SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,        int, maxevents, int, timeout){    /* 检查epfd是否是epoll\_create创建的fd */    // 调用ep_poll    error = ep_poll(ep, events, maxevents, timeout);    ...}
复制代码


紧接着,我们看下 ep_poll 函数:


static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,           int maxevents, long timeout){    ......retry:    // 获取spinlock    spin_lock_irqsave(&ep->lock, flags);    // 将当前task_struct写入到waitqueue中以便唤醒    // wq_entry->func = default_wake_function;    init_waitqueue_entry(&wait, current);    // WQ_FLAG_EXCLUSIVE,排他性唤醒,配合SO_REUSEPORT从而解决accept惊群问题    wait.flags |= WQ_FLAG_EXCLUSIVE;    // 链入到ep的waitqueue中    __add_wait_queue(&ep->wq, &wait);    for (;;) {        // 设置当前进程状态为可打断        set_current_state(TASK_INTERRUPTIBLE);        // 检查当前线程是否有信号要处理,有则返回-EINTR        if (signal_pending(current)) {            res = -EINTR;            break;        }        spin_unlock_irqrestore(&ep->lock, flags);        // schedule调度,让出CPU        jtimeout = schedule_timeout(jtimeout);        spin_lock_irqsave(&ep->lock, flags);    }    // 到这里,表明超时或者有事件触发等动作导致进程重新调度    __remove_wait_queue(&ep->wq, &wait);    // 设置进程状态为running    set_current_state(TASK_RUNNING);    ......    // 检查是否有可用事件    eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;    ......    // 向用户空间拷贝就绪事件    ep_send_events(ep, events, maxevents)}
复制代码


上述逻辑如下图所示:



ep_send_events


ep_send_events 函数主要就是调用了 ep_scan_ready_list,顾名思义 ep_scan_ready_list 就是扫描就绪列表:


static int ep_scan_ready_list(struct eventpoll *ep,                  int (*sproc)(struct eventpoll *,                       struct list_head *, void *),                  void *priv,                  int depth){    ...    // 将epfd的rdllist链入到txlist    list_splice_init(&ep->rdllist, &txlist);    ...    /* sproc = ep_send_events_proc */    error = (*sproc)(ep, &txlist, priv);    ...    // 处理ovflist,即在上面sproc过程中又到来的事件    ...}
复制代码


其主要调用了 ep_send_events_proc:


static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,                   void *priv){    for (eventcnt = 0, uevent = esed->events;         !list_empty(head) && eventcnt < esed->maxevents;) {       // 遍历ready list         epi = list_first_entry(head, struct epitem, rdllink);        list_del_init(&epi->rdllink);        // readylist只是表明当前epi有事件,具体的事件信息还是得调用对应file的poll        // 这边的poll即是tcp_poll,根据tcp本身的信息设置掩码(mask)等信息 & 上兴趣事件掩码,则可以得知当前事件是否是epoll_wait感兴趣的事件        revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) &            epi->event.events;        if(revents){            /* 将event放入到用户空间 */            /* 处理ONESHOT逻辑 */            // 如果不是边缘触发,则将当前的epi重新加回到可用列表中,这样就可以下一次继续触发poll,如果下一次poll的revents不为0,那么用户空间依旧能感知 */            else if (!(epi->event.events & EPOLLET)){                list_add_tail(&epi->rdllink, &ep->rdllist);            }            /* 如果是边缘触发,那么就不加回可用列表,因此只能等到下一个可用事件触发的时候才会将对应的epi放到可用列表里面*/            eventcnt++        }        /* 如poll出来的revents事件epoll_wait不感兴趣(或者本来就没有事件),那么也不会加回到可用列表 */        ......    }    return eventcnt;}
复制代码


上述代码逻辑如下所示:



事件到来添加到 epoll 就绪队列(rdllist)的过程


经过上述章节的详述之后,我们终于可以阐述,tcp 在数据到来时是怎么加入到 epoll 的就绪队列的了。


可读事件到来


首先我们看下 tcp 数据包从网卡驱动到 kernel 内部 tcp 协议处理调用链:


step1:


网络分组到来的内核路径,网卡发起中断后调用 netif_rx 将事件挂入 CPU 的等待队列,并唤起软中断(soft_irq),再通过 linux 的软中断机制调用 net_rx_action,如下图所示:



注:上图来自 PLKA(<<深入 Linux 内核架构>>)


step2:


紧接着跟踪 next_rx_action


next_rx_action    |-process_backlog        ......            |->packet_type->func 在这里我们考虑ip_rcv                    |->ipprot->handler 在这里ipprot重载为tcp_protocol                        (handler 即为tcp_v4_rcv)
复制代码


我们再看下对应的 tcp_v4_rcv


tcp_v4_rcv      |->tcp_v4_do_rcv            |->tcp_rcv_state_process                  |->tcp_data_queue                        |-> sk->sk_data_ready(sock_def_readable)                              |->wake_up_interruptible_sync_poll(sk->sleep,...)                                    |->__wake_up                                          |->__wake_up_common                                                |->curr->func                                                /* 这里已经被ep_insert添加为ep_poll_callback,而且设定了排它标识WQ_FLAG_EXCLUSIVE*/                                                      |->ep_poll_callback
复制代码


这样,我们就看下最终唤醒 epoll_wait 的 ep_poll_callback 函数:


static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key){    // 获取wait对应的epitem        struct epitem *epi = ep_item_from_wait(wait);    // epitem对应的eventpoll结构体    struct eventpoll *ep = epi->ep;    // 获取自旋锁,保护ready_list等结构    spin_lock_irqsave(&ep->lock, flags);    // 如果当前epi没有被链入ep的ready list,则链入    // 这样,就把当前的可用事件加入到epoll的可用列表了    if (!ep_is_linked(&epi->rdllink))        list_add_tail(&epi->rdllink, &ep->rdllist);    // 如果有epoll_wait在等待的话,则唤醒这个epoll_wait进程    // 对应的&ep->wq是在epoll_wait调用的时候通过init_waitqueue_entry(&wait, current)而生成的    // 其中的current即是对应调用epoll_wait的进程信息task_struct    if (waitqueue_active(&ep->wq))        wake_up_locked(&ep->wq);}
复制代码


上述过程如下图所示:



最后 wake_up_locked 调用__wake_up_common,然后调用了在 init_waitqueue_entry 注册的 default_wake_function,调用路径为:


wake_up_locked    |->__wake_up_common        |->default_wake_function            |->try_wake_up (wake up a thread)                |->activate_task                    |->enqueue_task    running
复制代码


将 epoll_wait 进程推入可运行队列,等待内核重新调度进程,然后 epoll_wait 对应的这个进程重新运行后,就从 schedule 恢复,继续下面的 ep_send_events(向用户空间拷贝事件并返回)。

wake_up 过程如下图所示:



可写事件到来


可写事件的运行过程和可读事件大同小异:

首先,在 epoll_ctl_add 的时候预先会调用一次对应文件描述符的 poll,如果返回事件里有可写掩码的时候直接调用 wake_up_locked 以唤醒对应的 epoll_wait 进程。

然后,在 tcp 在底层驱动有数据到来的时候可能携带了 ack 从而可以释放部分已经被对端接收的数据,于是触发可写事件,这一部分的调用链为:


tcp_input.ctcp_v4_rcv    |-tcp_v4_do_rcv        |-tcp_rcv_state_process            |-tcp_data_snd_check                |->tcp_check_space                    |->tcp_new_space                        |->sk->sk_write_space                        /* tcp下即是sk_stream_write_space*/
复制代码


最后在此函数里面 sk_stream_write_space 唤醒对应的 epoll_wait 进程


void sk_stream_write_space(struct sock *sk){    // 即有1/3可写空间的时候才触发可写事件    if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk) && sock) {        clear_bit(SOCK_NOSPACE, &sock->flags);
if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) wake_up_interruptible_poll(sk->sk_sleep, POLLOUT | POLLWRNORM | POLLWRBAND) ...... }}
复制代码


关闭描述符(close fd)


值得注意的是,我们在 close 对应的文件描述符的时候,会自动调用 eventpoll_release 将对应的 file 从其关联的 epoll_fd 中删除,kernel 关键路径如下:


close fd      |->filp_close            |->fput                  |->__fput                        |->eventpoll_release                              |->ep_remove
复制代码


所以我们在关闭对应的文件描述符后,并不需要通过 epoll_ctl_del 来删掉对应 epoll 中相应的描述符。


总结


epoll 作为 linux 下非常优秀的事件触发机制得到了广泛的运用。其源码还是比较复杂的,本文只是阐述了 epoll 读写事件的触发机制,探究 linux kernel 源码的过程非常快乐^_^。


公众号


关注笔者公众号,获取更多干货文章:




发布于: 2020 年 09 月 14 日阅读数: 93
用户头像

公众号: <<解Bug之路>> 2019.02.11 加入

无论多么艰苦的时刻,都不要忘记。辉煌的未来,在你的眼中闪耀!

评论

发布
暂无评论
从linux源码看epoll