读懂才会用 : 瞅瞅 Redis 的 epoll 模型

用户头像
小眼睛聊技术
关注
发布于: 2020 年 05 月 13 日
读懂才会用 : 瞅瞅Redis的epoll模型

上一篇提到了Redis采用epoll模型来提升链接处理能力。本文,我们从源代码的角度,简单理解Redis是如何使用epoll以及epoll的实现原理。浅入浅出~



通过本文了解如下三件事儿,就算是达到了本文目的:

1、epoll是Linux提供的系统实现,核心方法只有三个

2、epoll效率高,是因为基于红黑树、双向链表、事件回调机制

3、redis的IO多路复用,Linux上用epoll进行了实现



epoll是Linux内核提供的一种多路复用器,照例问问Linux的男人:

EPOLL(7) Linux Programmer's Manual EPOLL(7)
NAME
epoll - I/O event notification facility
SYNOPSIS
#include <sys/epoll.h>
DESCRIPTION
The epoll API performs a similar task to poll(2): monitoring multiple file
descriptors to see if I/O is possible on any of them. The epoll API can
be used either as an edge-triggered or a level-triggered interface and
scales well to large numbers of watched file descriptors. The following
system calls are provided to create and manage an epoll instance:
* epoll_create(2) creates an epoll instance and returns a file descriptor
referring to that instance. (The more recent epoll_create1(2) extends
the functionality of epoll_create(2).)
* Interest in particular file descriptors is then registered via
epoll_ctl(2). The set of file descriptors currently registered on an
epoll instance is sometimes called an epoll set.
* epoll_wait(2) waits for I/O events, blocking the calling thread if no
events are currently available.

核心方法



man告诉我们epoll的定义在sys/epoll.h中,查看核心函数有3个:(在线代码https://elixir.bootlin.com/linux/v4.19.76/source/fs/eventpoll.c

epoll_create

epoll_create(int size)

核心功能:

  1. 创建一个epoll文件描述符

  2. 创建eventpoll,其中包含红黑树cache和双向链表



参数size并不是限制了epoll所能监听的文件描述符最大个数,只是对内核初始分配内部数据结构的一个建议。在Linux 2.6.8后,size 参数被忽略,但是必须传一个比 0 大的数。

调用epoll_create后,会占用一个fd值。在Linux下可以查看/proc/$$/fd/ 文件描述符。使用完,需要调用close关闭。



eventpoll代码片段:

struct eventpoll {
/*
* This mutex is used to ensure that files are not removed
* while epoll is using them. This is held during the event
* collection loop, the file cleanup path, the epoll file exit
* code and the ctl operations.
*/
struct mutex mtx;
/* Wait queue used by sys_epoll_wait() */
wait_queue_head_t wq;
/* Wait queue used by file->poll() */
wait_queue_head_t poll_wait;
/* List of ready file descriptors */
struct list_head rdllist;//就绪列表,采用双向链表
/* RB tree root used to store monitored fd structs */
struct rb_root_cached rbr;//红黑树,保存存活的fd
/*
* This is a single linked list that chains all the "struct epitem" that
* happened while transferring ready events to userspace w/out
* holding ->wq.lock.
*/
struct epitem *ovflist;
/* wakeup_source used when ep_scan_ready_list is running */
struct wakeup_source *ws;
/* The user that created the eventpoll descriptor */
struct user_struct *user;
struct file *file;
/* used to optimize loop detection check */
int visited;
struct list_head visited_list_link;
#ifdef CONFIG_NET_RX_BUSY_POLL
/* used to track busy poll napi_id */
unsigned int napi_id;
#endif
};

epollctl

int epollctl(int epfd, int op, int fd, struct epollevent *event);

核心功能:

  1. 对指定描述符fd执行op的绑定操作

  2. 把fd写入红黑树,同时在内核注册回调函数



op操作类型,用三个宏EPOLL_CTL_ADD,EPOLL_CTL_DEL,EPOLL_CTL_MOD,来分别表示增删改对fd的监听。

epollwait

int epollwait(int epfd, struct epollevent *events, int maxevents, int timeout);

核心功能:

  1. 获取epfd上的io事件



参数events是就绪事件,用来得到想要获得的事件集合。maxevents表示的events有多大,maxevents的值必须大于0,参数timeout是超时时间。epollwait会阻塞,直到一个文件描述符触发了事件,或者被一个信号处理函数打断,或者timeout超时。返回值是需要处理的fd数量。

工作机制



  • 建立高速缓存(红黑树)和待读取列表(双向链表)

  • 对要监控的fd(一切都是fd,参考NIO 看破也说破(一)—— Linux/IO 基础),进行事件绑定。事件发生,通过callback放入待读取列表

  • 阻塞获取待读取列表



执行流程





优点



  1. epoll创建的红黑树保存所有fd,没有大小限制,且增删查的复杂度O(logN)

  2. 基于callback,利用系统内核触发感兴趣的事件

  3. 就绪列表为双线链表时间复杂度O(1)

  4. 应用获取到的fd都是真实发生IO的fd,与select 和 poll 需要不断轮询判断是否可用相比,能避免无用的内存拷贝



结合Redis代码

源码太多,我们只看和本文相关的模块

事件处理模块 ae.c/ae_epoll.c

网路链接库 anet.cnetworking.c

服务器端 server.c





创建事件管理器

server.c 的 L2702 initServer() 是redis server 的启动入口,

首先创建aeEventLoop对象,在L2743调用 aeCreateEventLoop(),初始化未就绪文件事件表、就绪文件事件表。events指针指向未就绪文件事件表、fired指针指向就绪文件事件表。

aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL;
eventLoop->flags = 0;
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}



ae_epoll.c L39 调用aeApiCreate 函数,首先创建了aeApiState对象,初始化了epoll就绪事件表;然后调用epoll_create创建了epoll实例,最后将该aeApiState赋值给apidata属性



static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
eventLoop->apidata = state;
return 0;
}



绑定事件

aeFileEvent是文件事件结构,对于每一个具体的事件,都有读处理函数和写处理函数。Redis 调用aeCreateFileEvent函数针对不同的套接字的读写事件,注册对应的文件事件。

/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc;//读
aeFileProc *wfileProc;//写
void *clientData;
} aeFileEvent;



server.c L2848 aeCreateFileEvent 创建文件事件,执行 ae_epoll.c L73 aeApiAddEvent

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}

aeApiAddEvent 调用系统 epoll_ctl,注册事件



处理事件



server.c倒数第三行,调用aeMain方法

void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}

aeProcessEvents方法中针对事件和文件事件处理,在ae.c L433 调用 aeApiPoll,方法具体实现在ae_poll.c L108:

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}

调用epoll_wait阻塞等待epoll的事件就绪,超时时间就是之前根据最快达到时间事件计算而来的超时时间;然后将就绪的epoll事件转换到fired就绪事件。aeApiPoll就是上文所说的I/O多路复用程序。



结论

  • epoll_create 创建就绪列表

  • epoll_ctl绑定事件,事件发生时fd到就绪列表

  • epoll_wait读取就绪列表



参考:

redis6.0.1源码

https://elixir.bootlin.com/linux/v4.19.76/source/fs/eventpoll.c

man手册

NIO 看破也说破(一)—— Linux/IO 基础



关注我

如果您在微信阅读,请您点击链接 关注我 ,如果您在 PC 上阅读请扫码关注我,欢迎与我交流随时指出错误





发布于: 2020 年 05 月 13 日 阅读数: 333
用户头像

小眼睛聊技术

关注

欢迎关注公众号“小眼睛聊技术” 2018.11.12 加入

互联网老兵,关注产品、技术、管理

评论 (1 条评论)

发布
暂无评论
读懂才会用 : 瞅瞅Redis的epoll模型