写点什么

epoll 源码分析以及在 Redis 中的实现

发布于: 2021 年 03 月 18 日

1.概述


这篇文章分析一下 linux 中 epoll 的实现原理,主要为了增强自己对网络调用的理解。业界使用 epoll 的框架比较多,随便就能列出来很多,比如 jdk 的 nio 在 linux 下的实现,以及 netty、redis 等涉及到长链接网络请求的地方,我们都可以直接使用 epoll。文末会从 redis 源码简单看看如何使用 epoll 做 IO 多路复用实现高并发。


2.具体实现


参考官方文档描述:


The central concept of the epoll API is the epoll instance, an inkernel data structure which, from a user-space perspective, can be considered as a container for two lists


所以 其实就是 epoll 就是内核的一个数据结构。从用户空间的角度,其实就是两个链表。所以基本上就是维护两个链表就可以了。 理解完这段话,我们也就能理解 Epoll 提供了三个方法了:


  • create 是初始化这个内核的数据结构。返回一个 fd。众所周知,unix 万物皆文件。所以,这里创建返回的是一个文件 fd。每次操作我们只需要传入 fd,内核便能拿到 epoll 对应的数据结构。

  • epoll_ctl 就是对其中一个链表的操作。这个链表存放用户感兴趣的 io 事件。当然在注册事件的时候,会有一些其他的操作。后面详细解释

  • epoll_wait,则是返回就绪事件(感兴趣的事件)。然后让应用层去处理。


int epoll_create(int size);int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
struct epoll_event { __uint32_t events; epoll_data_t data;};
复制代码


epoll_create 方法


SYSCALL_DEFINE1(epoll_create1, int, flags){    int error, fd;    struct eventpoll *ep = NULL;    struct file *file;    // 创建内部数据结构eventpoll     error = ep_alloc(&ep);    //查询未使用的fd    fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));    file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,                 O_RDWR | (flags & O_CLOEXEC));
ep->file = file; fd_install(fd, file); //建立fd和file的关联关系 return fd;out_free_fd: put_unused_fd(fd);out_free_ep: ep_free(ep); return error;}
复制代码


简单的说一下这个方法。首先这个方法返回一个文件描述符。我们使用这个文件描述符就可以找到对应的结构体。也就是一块内存区域。epoll 的所有数据都会保存在这里。 这块内存区域用 eventpoll 结构体表示。 所以这个方法逻辑如下:


1.创建 eventpoll 结构体。初始化结构体相应的数据


2.查询一个未使用的 fd,然后为 epoll 创建一个文件。将 file 的->private_data 指向 ep。创建文件的过程就不多说


3.将 ep->file 指向 file。其实就是绑定一下。


4.将 fd 和 file 关联。这样我们就能通过 fd 找到对应的 file。并找到 ep 对应的结构体(内存区域) 这里再说一下 file 的 private_data 其实在设备驱动程序中非常重要,它可以指向一块自定义的数据结构。这也就是保证了一个设备驱动程序可以适配多个设备的原因。因为不同的设备可能存在不同的属性。epoll 这里用 private_data 指向自己的数据结构是完全没有问题的。


eventpoll 结构体内容如下。后面遇到详细说。


struct eventpoll {    spinlock_t lock;    struct mutex mtx;    wait_queue_head_t wq; //sys_epoll_wait()使用的等待队列    wait_queue_head_t poll_wait; //file->poll()使用的等待队列    struct list_head rdllist; //所有准备就绪的文件描述符列表    struct rb_root rbr; //用于储存已监控fd的红黑树根节点        struct epitem *ovflist; //用于监听文件的结构。如果rdllist被锁定,临时事件会被连接到这里    struct wakeup_source *ws; // 当ep_scan_ready_list运行时使用wakeup_source    struct user_struct *user; //创建eventpoll描述符的用户    struct file *file;    int visited;           //用于优化循环检测检查    struct list_head visited_list_link;};
复制代码


epoll_ctl 方法


该方法主要就是对监听事件进行增删改。


SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,        struct epoll_event __user *, event){    int error;    int full_check = 0;    struct fd f, tf;    struct eventpoll *ep;        struct epitem *epi;         struct epoll_event epds;     struct eventpoll *tep = NULL;    error = -EFAULT;    //如果不是删除操作,将用户空间的epoll_event 拷贝到内核    if (ep_op_has_event(op) &&        copy_from_user(&epds, event, sizeof(struct epoll_event)))    f = fdget(epfd); //epfd对应的文件    tf = fdget(fd); //fd对应的文件.    ...    ep = f.file->private_data; // 取出epoll_create过程创建的ep    ...    epi = ep_find(ep, tf.file, fd); //ep红黑树中查看该fd    switch (op) {    case EPOLL_CTL_ADD:        if (!epi) {            epds.events |= POLLERR | POLLHUP;            error = ep_insert(ep, &epds, tf.file, fd, full_check);         }        if (full_check)            clear_tfile_check_list();        break;    case EPOLL_CTL_DEL:        if (epi)            error = ep_remove(ep, epi);         break;    case EPOLL_CTL_MOD:        if (epi) {            epds.events |= POLLERR | POLLHUP;            error = ep_modify(ep, epi, &epds);         }        break;    }    mutex_unlock(&ep->mtx);    fdput(tf);    fdput(f);    ...    return error;}
复制代码


分享更多关于 C/C++ Linux 后端开发网络底层原理知识学习提升 点击 学习资料 获取,完善技术栈,内容知识点包括 Linux,Nginx,ZeroMQ,MySQL,Redis,线程池,MongoDB,ZK,Linux 内核,CDN,P2P,epoll,Docker,TCP/IP,协程,DPDK 等等。



上面省略了一些判断的代码。主要核心就是根据不同的事件类型执行不同的函数。


epoll 调用 ep_find 从红黑树拿到对应的 epi。如果已经存在,则不需要进行 add。如果不存在,也不可进行 remove 和 modify 操作。整个过程会加锁。因为是红黑树,所以查找以及插入性能都是 logn 级别。所以对于高并发场景,也是能实现快速注册监听的。下面我们分别看一下这三个操作的逻辑。


ep_insert 操作


顾名思义,就是加入监听事件。


static int ep_insert(struct eventpoll *ep, struct epoll_event *event,             struct file *tfile, int fd, int full_check){    int error, revents, pwake = 0;    unsigned long flags;    long user_watches;    struct epitem *epi;    struct ep_pqueue epq; //[小节2.4.5]

user_watches = atomic_long_read(&ep->user->epoll_watches); if (unlikely(user_watches >= max_user_watches)) return -ENOSPC; if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL))) return -ENOMEM;
//构造并填充epi结构体 INIT_LIST_HEAD(&epi->rdllink); INIT_LIST_HEAD(&epi->fllink); INIT_LIST_HEAD(&epi->pwqlist); epi->ep = ep; ep_set_ffd(&epi->ffd, tfile, fd); // 将tfile和fd都赋值给ffd epi->event = *event; epi->nwait = 0; epi->next = EP_UNACTIVE_PTR; if (epi->event.events & EPOLLWAKEUP) { error = ep_create_wakeup_source(epi); } else { RCU_INIT_POINTER(epi->ws, NULL); } epq.epi = epi; //设置轮询回调函数 init_poll_funcptr(&epq.pt, ep_ptable_queue_proc); //执行poll方法 revents = ep_item_poll(epi, &epq.pt); spin_lock(&tfile->f_lock); list_add_tail_rcu(&epi->fllink, &tfile->f_ep_links); spin_unlock(&tfile->f_lock); ep_rbtree_insert(ep, epi); //将将当前epi添加到RB树 spin_lock_irqsave(&ep->lock, flags); //事件就绪 并且 epi的就绪队列有数据 if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) { list_add_tail(&epi->rdllink, &ep->rdllist); ep_pm_stay_awake(epi);
//唤醒正在等待文件就绪,即调用epoll_wait的进程 if (waitqueue_active(&ep->wq)) wake_up_locked(&ep->wq); if (waitqueue_active(&ep->poll_wait)) pwake++; } spin_unlock_irqrestore(&ep->lock, flags); atomic_long_inc(&ep->user->epoll_watches);

if (pwake) ep_poll_safewake(&ep->poll_wait); //唤醒等待eventpoll文件就绪的进程 return 0;...}
复制代码


1.首先会初始化一个 epi,对于目标文件的监听都是需要通过 epi 去维护。一个文件的监听对应一个 epi。并且保存在 ep 的红黑树中。


struct epitem {    union {        struct rb_node rbn; //RB树节点将此结构链接到eventpoll RB树        struct rcu_head rcu; //用于释放结构体epitem    };

struct list_head rdllink; //时间的就绪队列,主要就是链接到eventpoll的rdllist struct epitem *next; //配合eventpoll中的ovflist一起使用来保持单向链的条目 struct epoll_filefd ffd; //该结构 监听的文件描述符信息,每一个socket fd都会对应一个epitem 。就是通过这个结构关联 int nwait; //附加到poll轮询中的活跃等待队列数
struct list_head pwqlist; //用于保存被监听文件的等待队列 struct eventpoll *ep; //epi所属的ep struct list_head fllink; //主要是为了实现一个文件被多个epoll监听。将该结构链接到文件的f_ep_link。 struct wakeup_source __rcu *ws; //设置EPOLLWAKEUP时使用的wakeup_source struct epoll_event event; //监控的事件和文件描述符};
复制代码


2.初始化结束后,会将文件的 fd 以及对应文件指针绑定到 epi 的 ffd 中。主要作用就是将 fd 和改 epi 绑定起来。


struct epoll_filefd {    struct file *file;    int fd;} __packed;
复制代码


3.为 epq 的 pt(其实就是一个 poll_table)注册对应的函数 ep_ptable_queue_proc。


struct ep_pqueue {    poll_table pt;    struct epitem *epi;};
typedef struct poll_table_struct { poll_queue_proc _qproc; unsigned long _key;} poll_table;
复制代码


这里 epq 是一个结构体,里面绑定了 epi 以及 poll_table。poll_table 主要注册了 ep_ptable_queue_proc 函数。_key 用于记录事件。 所以 epq 就保存了 epi,以及对应的 ep_ptable_queue_proc。后续执行回调函数的时候,我们可以通过 poll_table 的地址拿到对应的 epq,最终拿到对应的 epi,这也就是定义这个结构的目的。


4.调用 ep_item_poll 方法。这个方法我简单说一下。他会调用文件系统的 poll 方法。


static inline unsigned int ep_item_poll(struct epitem *epi, poll_table *pt){    pt->_key = epi->event.events;、    return epi->ffd.file->f_op->poll(epi->ffd.file, pt) & epi->event.events;}
复制代码


不同的驱动程序,都会有自己的 poll 方法,如果是 TCP 套接字,这个 poll 方法就是 tcp_poll。在 TCP 中,会周期性的调用这个方法,调用频率取决于协议栈中断频率的设置。一旦有事件到达后,对应的 tcp_poll 方法被调用,tcp_poll 方法会回调用 sock_poll_wait(),该方法会调用这里注册的 ep_ptable_queue_proc 方法。epoll 其实就是通过此机制实现将自己的回调函数加入到文件的 waitqueue 中的。这也是 ep_ptable_queue_proc 的目的。


5.调用 ep_item_poll 后会返回 revents。也就是该 fd 触发的事件。如果有我们感兴趣的事件,会将其插入到 ep 的 rdllist 中。如果有进程正在等待文件的就绪状态,也就是调用 epoll_wait 睡眠的进程。那么会唤醒等待进程。


if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {        list_add_tail(&epi->rdllink, &ep->rdllist);        ep_pm_stay_awake(epi);
//唤醒正在等待文件就绪,即调用epoll_wait的进程 if (waitqueue_active(&ep->wq)) wake_up_locked(&ep->wq); if (waitqueue_active(&ep->poll_wait)) pwake++; }
复制代码


ep_ptable_queue_proc 方法


整个过程其实就是通过文件的 poll 方法绑定 ep_ptable_queue_proc 函数 。当该文件描述符对应的文件有事件到达后,回调用这个函数


Ps:其中 file 就是对应文件的结构体。当然 多个 fd 可以指向通过 file 结构。多个 file 可以同时指向同一个 innode 节点。在 linux 中,一个文件的内容是通过 innode 去定义描述的。file 只是我们对文件操作的时候创建出来的。这个概念大家需要清楚。


static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,                 poll_table *pt){    struct epitem *epi = ep_item_from_epqueue(pt);    struct eppoll_entry *pwq;  

if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) { //初始化回调方法 init_waitqueue_func_entry(&pwq->wait, ep_poll_callback); pwq->whead = whead; pwq->base = epi; //将ep_poll_callback放入等待队列whead add_wait_queue(whead, &pwq->wait); //将llink 放入epi->pwqlist的尾部 list_add_tail(&pwq->llink, &epi->pwqlist); epi->nwait++; } else { epi->nwait = -1; //标记错误发生 }}
static inline voidinit_waitqueue_func_entry(wait_queue_t *q, wait_queue_func_t func){ q->flags = 0; q->private = NULL; q->func = func;}
复制代码


ep_ptable_queue_proc 有三个参数。 file 就是监听文件的指针,whead 为该 fd 对应的设备等待队列。pt 就是我们当时调用文件的 poll 传入的东西。


在 ep_ptable_queue_proc,引入了 eppoll_entry。也就是 pwq。pwq 主要完成 epi 和 epi 事件发生时 callback 函数的关联。


从上面代码可以看出。首先根据 pt 拿到对应的 epi。然后通过 pwq 将三者关联。


最后通过 add_wait_queue 方法,将 eppoll_entry 挂在到 fd 的设备等待队列上。也就是注册 epoll 的回调函数。


所以这个方法的主要目标就是将 eppoll_entry 挂在到 fd 的设备等待队列上。当设备有硬件数据到达时,硬件中断处理函数会唤醒该队列上等待的进程时,会调用唤醒函数 ep_poll_callback。


分享更多关于 C/C++ Linux 后端开发网络底层原理知识学习提升 点击 学习资料 获取,完善技术栈,内容知识点包括 Linux,Nginx,ZeroMQ,MySQL,Redis,线程池,MongoDB,ZK,Linux 内核,CDN,P2P,epoll,Docker,TCP/IP,协程,DPDK 等等。



ep_poll_callback 方法


这个函数主要功能就是,当被监听文件的事件就绪,将文件对应的 epi 加入到就绪队列。当应用层调用 epoll_wait()的时候,内核会将就绪队列的事件拷贝到用户空间。报告给应用。


static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key){    int pwake = 0;    unsigned long flags;    struct epitem *epi = ep_item_from_wait(wait);    struct eventpoll *ep = epi->ep;    spin_lock_irqsave(&ep->lock, flags);    if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {        if (epi->next == EP_UNACTIVE_PTR) {            epi->next = ep->ovflist;            ep->ovflist = epi;            if (epi->ws) {                __pm_stay_awake(ep->ws);            }        }        goto out_unlock;    }    //如果此文件已在就绪列表中,很快就会退出    if (!ep_is_linked(&epi->rdllink)) {        //将epi就绪事件 插入到ep就绪队列        list_add_tail(&epi->rdllink, &ep->rdllist);        ep_pm_stay_awake_rcu(epi);    }    // 如果活跃,唤醒eventpoll等待队列和 ->poll()等待队列    if (waitqueue_active(&ep->wq))        wake_up_locked(&ep->wq);  //当队列不为空,则唤醒进程    if (waitqueue_active(&ep->poll_wait))        pwake++;out_unlock:    spin_unlock_irqrestore(&ep->lock, flags);    if (pwake)        ep_poll_safewake(&ep->poll_wait);

if ((unsigned long)key & POLLFREE) { list_del_init(&wait->task_list); //删除相应的wait smp_store_release(&ep_pwq_from_wait(wait)->whead, NULL); } return 1;}
//判断等待队列是否为空static inline int waitqueue_active(wait_queue_head_t *q){ return !list_empty(&q->task_list);}
复制代码


3.epoll 实现总结


透过现象看本质,其实 epoll 的灵魂就是 ep_item_poll 和 ep_poll_callback。


epoll 依赖虚拟文件系统的 ep_item_poll。将 ep_poll_callback 注册到对应文件的 waitqueue 中。当对应文件有数据到来。注册的函数就会被调用。epoll 的回调会将对应文件的 epi 加入到就绪队列。


当用户调用 epoll_wait(),epoll 会加锁,将队列数据传递到用户空间,这个时间到的事件会被挂到 ovflist 中。


4.Redis 使用 Epoll


具体实现在 ae_epoll.c 中


typedef struct aeApiState {
// epoll_event 实例描述符 int epfd; // 事件槽 struct epoll_event *events;
} aeApiState;
复制代码


aeApiCreate 方法


redis 在初始化 server 的时候会调用 aeCreateEventLoop 方法。aeCreateEventLoop 回调用 aeApiCreate 去创建 epoll 实例。


static int aeApiCreate(aeEventLoop *eventLoop) {    aeApiState *state = zmalloc(sizeof(aeApiState));

if (!state) return -1; state->events = zmalloc(sizeof(struct kevent)*eventLoop->setsize); if (!state->events) { zfree(state); return -1; } state->kqfd = kqueue(); if (state->kqfd == -1) { zfree(state->events); zfree(state); return -1; } eventLoop->apidata = state; return 0; }
复制代码


aeApiAddEvent 方法


这个方法是关联事件到 epoll,所以会调用 epoll 的 ctl 方法


static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {    aeApiState *state = eventLoop->apidata;    struct epoll_event ee;
/* If the fd was already monitored for some event, we need a MOD * operation. Otherwise we need an ADD operation. * * 如果 fd 没有关联任何事件,那么这是一个 ADD 操作。 * 如果已经关联了某个/某些事件,那么这是一个 MOD 操作。 */ int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
// 注册事件到 epoll ee.events = 0; mask |= eventLoop->events[fd].mask; /* Merge old events */ if (mask & AE_READABLE) ee.events |= EPOLLIN; if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; ee.data.u64 = 0; /* avoid valgrind warning */ ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; return 0;}
复制代码


这个方法在 redis 服务创建一个新的客户端的时候会调用。会注册这个客户端的读事件。


当 redis 需要给客户端写数据的时候会调用 prepareClientToWrite 方法。这个方法主要是注册对应 fd 的写事件。


如果注册失败,redis 就不会将数据写入缓冲。


如果对应套件字可写,那么 redis 的事件循环就会将缓冲区新数据写入 socket。


aeMain 方法


Redis 事件处理器的主循环。


void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
// 如果有需要在事件处理前执行的函数,那么运行它 if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop);
// 开始处理事件 aeProcessEvents(eventLoop, AE_ALL_EVENTS); }}
复制代码


这个方法最终会调用 epoll_wait()获取对应事件并执行。


用户头像

Linux服务器开发qun720209036,欢迎来交流 2020.11.26 加入

专注C/C++ Linux后台服务器开发。

评论

发布
暂无评论
epoll源码分析以及在Redis中的实现