上一篇提到了 Redis 采用 epoll 模型来提升链接处理能力。本文,我们从源代码的角度,简单理解 Redis 是如何使用 epoll 以及 epoll 的实现原理。浅入浅出~
通过本文了解如下三件事儿,就算是达到了本文目的:
1、epoll 是 Linux 提供的系统实现,核心方法只有三个
2、epoll 效率高,是因为基于红黑树、双向链表、事件回调机制
3、redis 的 IO 多路复用,Linux 上用 epoll 进行了实现
epoll 是 Linux 内核提供的一种多路复用器,照例问问 Linux 的男人:
EPOLL(7) Linux Programmer's Manual EPOLL(7)
NAME
epoll - I/O event notification facility
SYNOPSIS
#include <sys/epoll.h>
DESCRIPTION
The epoll API performs a similar task to poll(2): monitoring multiple file
descriptors to see if I/O is possible on any of them. The epoll API can
be used either as an edge-triggered or a level-triggered interface and
scales well to large numbers of watched file descriptors. The following
system calls are provided to create and manage an epoll instance:
* epoll_create(2) creates an epoll instance and returns a file descriptor
referring to that instance. (The more recent epoll_create1(2) extends
the functionality of epoll_create(2).)
* Interest in particular file descriptors is then registered via
epoll_ctl(2). The set of file descriptors currently registered on an
epoll instance is sometimes called an epoll set.
* epoll_wait(2) waits for I/O events, blocking the calling thread if no
events are currently available.
复制代码
核心方法
man 告诉我们 epoll 的定义在 sys/epoll.h 中,查看核心函数有 3 个:(在线代码https://elixir.bootlin.com/linux/v4.19.76/source/fs/eventpoll.c)
epoll_create
epoll_create(int size)
核心功能:
创建一个 epoll 文件描述符
创建 eventpoll,其中包含红黑树 cache 和双向链表
参数 size 并不是限制了 epoll 所能监听的文件描述符最大个数,只是对内核初始分配内部数据结构的一个建议。在 Linux 2.6.8 后,size 参数被忽略,但是必须传一个比 0 大的数。
调用 epoll_create 后,会占用一个 fd 值。在 Linux 下可以查看/proc/$$/fd/ 文件描述符。使用完,需要调用 close 关闭。
eventpoll 代码片段:
struct eventpoll {
/*
* This mutex is used to ensure that files are not removed
* while epoll is using them. This is held during the event
* collection loop, the file cleanup path, the epoll file exit
* code and the ctl operations.
*/
struct mutex mtx;
/* Wait queue used by sys_epoll_wait() */
wait_queue_head_t wq;
/* Wait queue used by file->poll() */
wait_queue_head_t poll_wait;
/* List of ready file descriptors */
struct list_head rdllist;//就绪列表,采用双向链表
/* RB tree root used to store monitored fd structs */
struct rb_root_cached rbr;//红黑树,保存存活的fd
/*
* This is a single linked list that chains all the "struct epitem" that
* happened while transferring ready events to userspace w/out
* holding ->wq.lock.
*/
struct epitem *ovflist;
/* wakeup_source used when ep_scan_ready_list is running */
struct wakeup_source *ws;
/* The user that created the eventpoll descriptor */
struct user_struct *user;
struct file *file;
/* used to optimize loop detection check */
int visited;
struct list_head visited_list_link;
#ifdef CONFIG_NET_RX_BUSY_POLL
/* used to track busy poll napi_id */
unsigned int napi_id;
#endif
};
复制代码
epollctl
int epollctl(int epfd, int op, int fd, struct epollevent *event);
核心功能:
对指定描述符 fd 执行 op 的绑定操作
把 fd 写入红黑树,同时在内核注册回调函数
op 操作类型,用三个宏 EPOLL_CTL_ADD,EPOLL_CTL_DEL,EPOLL_CTL_MOD,来分别表示增删改对 fd 的监听。
epollwait
int epollwait(int epfd, struct epollevent *events, int maxevents, int timeout);
核心功能:
获取 epfd 上的 io 事件
参数 events 是就绪事件,用来得到想要获得的事件集合。maxevents 表示的 events 有多大,maxevents 的值必须大于 0,参数 timeout 是超时时间。epollwait 会阻塞,直到一个文件描述符触发了事件,或者被一个信号处理函数打断,或者 timeout 超时。返回值是需要处理的 fd 数量。
工作机制
执行流程
优点
epoll 创建的红黑树保存所有 fd,没有大小限制,且增删查的复杂度 O(logN)
基于 callback,利用系统内核触发感兴趣的事件
就绪列表为双线链表时间复杂度 O(1)
应用获取到的 fd 都是真实发生 IO 的 fd,与 select 和 poll 需要不断轮询判断是否可用相比,能避免无用的内存拷贝
结合 Redis 代码
源码太多,我们只看和本文相关的模块
事件处理模块 ae.c/ae_epoll.c
网路链接库 anet.c
和networking.c
服务器端 server.c
创建事件管理器
server.c 的 L2702 initServer()
是 redis server 的启动入口,
首先创建aeEventLoop
对象,在 L2743 调用 aeCreateEventLoop()
,初始化未就绪文件事件表、就绪文件事件表。events 指针指向未就绪文件事件表、fired 指针指向就绪文件事件表。
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL;
eventLoop->flags = 0;
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}
复制代码
在ae_epoll.c
L39 调用aeApiCreate
函数,首先创建了aeApiState
对象,初始化了 epoll 就绪事件表;然后调用epoll_create
创建了 epoll 实例,最后将该aeApiState
赋值给 apidata 属性
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
eventLoop->apidata = state;
return 0;
}
复制代码
绑定事件
aeFileEvent
是文件事件结构,对于每一个具体的事件,都有读处理函数和写处理函数。Redis 调用aeCreateFileEvent
函数针对不同的套接字的读写事件,注册对应的文件事件。
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc;//读
aeFileProc *wfileProc;//写
void *clientData;
} aeFileEvent;
复制代码
server.c L2848 aeCreateFileEvent
创建文件事件,执行 ae_epoll.c L73 aeApiAddEvent
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
复制代码
aeApiAddEvent
调用系统 epoll_ctl
,注册事件
处理事件
server.c 倒数第三行,调用aeMain
方法
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
复制代码
aeProcessEvents
方法中针对事件和文件事件处理,在 ae.c L433 调用 aeApiPoll
,方法具体实现在 ae_poll.c L108:
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
复制代码
调用epoll_wait
阻塞等待 epoll 的事件就绪,超时时间就是之前根据最快达到时间事件计算而来的超时时间;然后将就绪的 epoll 事件转换到 fired 就绪事件。aeApiPoll
就是上文所说的 I/O 多路复用程序。
结论
参考:
redis6.0.1 源码
https://elixir.bootlin.com/linux/v4.19.76/source/fs/eventpoll.c
man 手册
NIO 看破也说破(一)—— Linux/IO 基础
关注我
如果您在微信阅读,请您点击链接 关注我 ,如果您在 PC 上阅读请扫码关注我,欢迎与我交流随时指出错误
评论 (1 条评论)