写点什么

读懂才会用 : 瞅瞅 Redis 的 epoll 模型

发布于: 2020 年 05 月 13 日
读懂才会用 : 瞅瞅Redis的epoll模型

上一篇提到了 Redis 采用 epoll 模型来提升链接处理能力。本文,我们从源代码的角度,简单理解 Redis 是如何使用 epoll 以及 epoll 的实现原理。浅入浅出~


通过本文了解如下三件事儿,就算是达到了本文目的:

1、epoll 是 Linux 提供的系统实现,核心方法只有三个

2、epoll 效率高,是因为基于红黑树、双向链表、事件回调机制

3、redis 的 IO 多路复用,Linux 上用 epoll 进行了实现


epoll 是 Linux 内核提供的一种多路复用器,照例问问 Linux 的男人:

EPOLL(7)                    Linux Programmer's Manual                    EPOLL(7)
NAME epoll - I/O event notification facility
SYNOPSIS #include <sys/epoll.h>
DESCRIPTION The epoll API performs a similar task to poll(2): monitoring multiple file descriptors to see if I/O is possible on any of them. The epoll API can be used either as an edge-triggered or a level-triggered interface and scales well to large numbers of watched file descriptors. The following system calls are provided to create and manage an epoll instance:
* epoll_create(2) creates an epoll instance and returns a file descriptor referring to that instance. (The more recent epoll_create1(2) extends the functionality of epoll_create(2).)
* Interest in particular file descriptors is then registered via epoll_ctl(2). The set of file descriptors currently registered on an epoll instance is sometimes called an epoll set. * epoll_wait(2) waits for I/O events, blocking the calling thread if no events are currently available.
复制代码

核心方法


man 告诉我们 epoll 的定义在 sys/epoll.h 中,查看核心函数有 3 个:(在线代码https://elixir.bootlin.com/linux/v4.19.76/source/fs/eventpoll.c

epoll_create

epoll_create(int size)

核心功能:

  1. 创建一个 epoll 文件描述符

  2. 创建 eventpoll,其中包含红黑树 cache 和双向链表


参数 size 并不是限制了 epoll 所能监听的文件描述符最大个数,只是对内核初始分配内部数据结构的一个建议。在 Linux 2.6.8 后,size 参数被忽略,但是必须传一个比 0 大的数。

调用 epoll_create 后,会占用一个 fd 值。在 Linux 下可以查看/proc/$$/fd/ 文件描述符。使用完,需要调用 close 关闭。


eventpoll 代码片段:

struct eventpoll {	/*	 * This mutex is used to ensure that files are not removed	 * while epoll is using them. This is held during the event	 * collection loop, the file cleanup path, the epoll file exit	 * code and the ctl operations.	 */	struct mutex mtx;
/* Wait queue used by sys_epoll_wait() */ wait_queue_head_t wq;
/* Wait queue used by file->poll() */ wait_queue_head_t poll_wait;
/* List of ready file descriptors */ struct list_head rdllist;//就绪列表,采用双向链表
/* RB tree root used to store monitored fd structs */ struct rb_root_cached rbr;//红黑树,保存存活的fd
/* * This is a single linked list that chains all the "struct epitem" that * happened while transferring ready events to userspace w/out * holding ->wq.lock. */ struct epitem *ovflist;
/* wakeup_source used when ep_scan_ready_list is running */ struct wakeup_source *ws;
/* The user that created the eventpoll descriptor */ struct user_struct *user;
struct file *file;
/* used to optimize loop detection check */ int visited; struct list_head visited_list_link;
#ifdef CONFIG_NET_RX_BUSY_POLL /* used to track busy poll napi_id */ unsigned int napi_id;#endif};

复制代码

epollctl

int epollctl(int epfd, int op, int fd, struct epollevent *event);

核心功能:

  1. 对指定描述符 fd 执行 op 的绑定操作

  2. 把 fd 写入红黑树,同时在内核注册回调函数


op 操作类型,用三个宏 EPOLL_CTL_ADD,EPOLL_CTL_DEL,EPOLL_CTL_MOD,来分别表示增删改对 fd 的监听。

epollwait

int epollwait(int epfd, struct epollevent *events, int maxevents, int timeout);

核心功能:

  1. 获取 epfd 上的 io 事件


参数 events 是就绪事件,用来得到想要获得的事件集合。maxevents 表示的 events 有多大,maxevents 的值必须大于 0,参数 timeout 是超时时间。epollwait 会阻塞,直到一个文件描述符触发了事件,或者被一个信号处理函数打断,或者 timeout 超时。返回值是需要处理的 fd 数量。

工作机制


  • 建立高速缓存(红黑树)和待读取列表(双向链表)

  • 对要监控的 fd(一切都是 fd,参考NIO 看破也说破(一)—— Linux/IO 基础),进行事件绑定。事件发生,通过 callback 放入待读取列表

  • 阻塞获取待读取列表


执行流程



优点


  1. epoll 创建的红黑树保存所有 fd,没有大小限制,且增删查的复杂度 O(logN)

  2. 基于 callback,利用系统内核触发感兴趣的事件

  3. 就绪列表为双线链表时间复杂度 O(1)

  4. 应用获取到的 fd 都是真实发生 IO 的 fd,与 select 和 poll 需要不断轮询判断是否可用相比,能避免无用的内存拷贝


结合 Redis 代码

源码太多,我们只看和本文相关的模块

事件处理模块 ae.c/ae_epoll.c

网路链接库 anet.cnetworking.c

服务器端 server.c



创建事件管理器

server.c 的 L2702 initServer() 是 redis server 的启动入口,

首先创建aeEventLoop对象,在 L2743 调用 aeCreateEventLoop(),初始化未就绪文件事件表、就绪文件事件表。events 指针指向未就绪文件事件表、fired 指针指向就绪文件事件表。

aeEventLoop *aeCreateEventLoop(int setsize) {    aeEventLoop *eventLoop;    int i;
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err; eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err; eventLoop->setsize = setsize; eventLoop->lastTime = time(NULL); eventLoop->timeEventHead = NULL; eventLoop->timeEventNextId = 0; eventLoop->stop = 0; eventLoop->maxfd = -1; eventLoop->beforesleep = NULL; eventLoop->aftersleep = NULL; eventLoop->flags = 0; if (aeApiCreate(eventLoop) == -1) goto err; /* Events with mask == AE_NONE are not set. So let's initialize the * vector with it. */ for (i = 0; i < setsize; i++) eventLoop->events[i].mask = AE_NONE; return eventLoop;
err: if (eventLoop) { zfree(eventLoop->events); zfree(eventLoop->fired); zfree(eventLoop); } return NULL;}
复制代码


ae_epoll.c L39 调用aeApiCreate 函数,首先创建了aeApiState对象,初始化了 epoll 就绪事件表;然后调用epoll_create创建了 epoll 实例,最后将该aeApiState赋值给 apidata 属性


static int aeApiCreate(aeEventLoop *eventLoop) {    aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1; state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize); if (!state->events) { zfree(state); return -1; } state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */ if (state->epfd == -1) { zfree(state->events); zfree(state); return -1; } eventLoop->apidata = state; return 0;}
复制代码


绑定事件

aeFileEvent是文件事件结构,对于每一个具体的事件,都有读处理函数和写处理函数。Redis 调用aeCreateFileEvent函数针对不同的套接字的读写事件,注册对应的文件事件。

/* File event structure */typedef struct aeFileEvent {    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */    aeFileProc *rfileProc;//读    aeFileProc *wfileProc;//写    void *clientData;} aeFileEvent;
复制代码


server.c L2848 aeCreateFileEvent 创建文件事件,执行 ae_epoll.c L73 aeApiAddEvent

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {    aeApiState *state = eventLoop->apidata;    struct epoll_event ee = {0}; /* avoid valgrind warning */    /* If the fd was already monitored for some event, we need a MOD     * operation. Otherwise we need an ADD operation. */    int op = eventLoop->events[fd].mask == AE_NONE ?            EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0; mask |= eventLoop->events[fd].mask; /* Merge old events */ if (mask & AE_READABLE) ee.events |= EPOLLIN; if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; ee.data.fd = fd; if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; return 0;}
复制代码

aeApiAddEvent 调用系统 epoll_ctl,注册事件


处理事件


server.c 倒数第三行,调用aeMain方法

void aeMain(aeEventLoop *eventLoop) {    eventLoop->stop = 0;    while (!eventLoop->stop) {        if (eventLoop->beforesleep != NULL)            eventLoop->beforesleep(eventLoop);        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);    }}
复制代码

aeProcessEvents方法中针对事件和文件事件处理,在 ae.c L433 调用 aeApiPoll,方法具体实现在 ae_poll.c L108:

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {    aeApiState *state = eventLoop->apidata;    int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); if (retval > 0) { int j;
numevents = retval; for (j = 0; j < numevents; j++) { int mask = 0; struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE; if (e->events & EPOLLOUT) mask |= AE_WRITABLE; if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE; if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE; eventLoop->fired[j].fd = e->data.fd; eventLoop->fired[j].mask = mask; } } return numevents;}
复制代码

调用epoll_wait阻塞等待 epoll 的事件就绪,超时时间就是之前根据最快达到时间事件计算而来的超时时间;然后将就绪的 epoll 事件转换到 fired 就绪事件。aeApiPoll就是上文所说的 I/O 多路复用程序。


结论

  • epoll_create 创建就绪列表

  • epoll_ctl 绑定事件,事件发生时 fd 到就绪列表

  • epoll_wait 读取就绪列表


参考:

redis6.0.1 源码

https://elixir.bootlin.com/linux/v4.19.76/source/fs/eventpoll.c

man 手册

NIO 看破也说破(一)—— Linux/IO 基础


关注我

如果您在微信阅读,请您点击链接 关注我 ,如果您在 PC 上阅读请扫码关注我,欢迎与我交流随时指出错误



发布于: 2020 年 05 月 13 日阅读数: 482
用户头像

欢迎关注公众号“小眼睛聊技术” 2018.11.12 加入

互联网老兵,关注产品、技术、管理

评论 (1 条评论)

发布
暂无评论
读懂才会用 : 瞅瞅Redis的epoll模型