写点什么

异步 I/O -- posix aio 从入门到放弃的吐血实践

用户头像
1412
关注
发布于: 2021 年 01 月 11 日

原文写于 2020.03.09。一句话总结:如果是在 MacOS 下,请你放弃 posix aio。本篇都是本人的吐血实践。 




越写发现自己懂的越少,上两篇写队列,其实只是一些比较粗浅(当然也是最常见)的实现,正好想起以前看过的一篇架构文章《异步 I/O 与 lock-free》,正好是最近我在积累的范围,有些文章和别人的研究成果还需要多看看。 


这篇需要先从另一个切入点异步 I/O 开始,是因为这周的工作与 aio 相关。自己写 blog 是为了实践和积累知识体系,所以文章目标更多肯定不是介绍,而是展示一下我试过之后的一些性能、以及这些东西的联系(有幸工作内容有这样的机会去接触)。所以前几篇的联系可以后续找一篇打通一下,毕竟大家需要消息队列也是为了满足异步而服务的。 


本周 demo 把几种方式的代码都放一起了,不太美观仅供参考: 


https://github.com/holmes1412/demo/blob/master/aio_demo.c


一、异步 I/O 的好几种方式


最开始接触到异步 I/O 是在 nginx,作为高并发读取静态网页,读文件部分脚趾头想想都知道是异步,当时没在意具体是怎么做的,最近我们框架也在改 mac 下能使用的 aio,所以重新看了下。 


自己没什么资格介绍知识点,单纯罗列一下几种我们开发时会用到的模型: 


linux 上的两套: 


posix aio :今天重点介绍对象

native aio :性能更好,但是要求打开文件必须是 O_DIRECT


以及 


  • windows 的 iocp


参考 nginx,poxis_aio 和 native_aio 都分别实现了对应的模块,应该还有 iocp 中的使用,不过 ngx_iocp_module.h 没在框架里,应该属于用户需要再单独下载编进去吧。 


二、纵向介绍 posix aio 的流程


https://www.freebsd.org/cgi/man.cgi?query=aio&sektion=4&apropos=0&manpath=FreeBSD+12.1-RELEASE+and+Ports


因为这周在学怎么用 kqueue,所以今天的主角是 posix aio。这部分先简单介绍使用流程。但是这不是我这周的心血所在,所以大家可以选择性跳过这 part。 


1、构造一个 aiocb,memset 做初始化,设置好 aiocb 的几个值,包括要读哪个 fd、读到哪个 buffer 中、buffer 大小与起始位置:


    char buf[BUFFER_MAX];    struct aiocb my;
memset(&my, 0, sizeof(struct aiocb)); int fd = open("pipe1", O_RDONLY);
my.aio_fildes = fd; my.aio_buf = buf; my.aio_nbytes = BUFFER_MAX; my.aio_offset = 0; // start offset
复制代码


2、把这个 aiocb 挂载到对应设备的队列上等待,如果失败,errno 会告诉你为什么失败。


    ret = aio_read(&my);    if (ret != 0)        fprintf(stderr, "aio_read() failed. errno = %d\n", errno);
复制代码


3、查询是否读完,比如同步等每秒看看,可以用这个接口:


    while (aio_error(&my) == EINPROGRESS)        sleep(1);
复制代码


4、取出上下文:


    fprintf(stderr, "%s\n", (char*)(my.aio_buf));
复制代码


三、横向介绍 posix aio 的几种通知用法


这部分是我这周工作的重点,几种异步用法,想让消息异步回来,主要有三种方式:


  • SIGEV_SIGNO


注册信号和信号处理函数:这样就要考虑整个进程的信号处理体系都可能需要改动


  • SIGEV_THREAD


注册线程属性与线程处理函数,等 io 回来了系统会根据这个线程属性创建一个线程去调用注册的回调函数


  • SIGEV_KEVENT


借用 kqueue,让 kqueue 帮你接管 IO 完成的消息


  • 另外:SIGEV_THREAD_ID


貌似还有一种 SIGEV_THREAD_ID,大家都不用,这里我也没试了。


这几种用法配置在哪里呢?可以看看 sys/aio.h


struct aiocb {    int              aio_fildes;             /* File descriptor */    off_t            aio_offset;             /* File offset */    volatile void   *aio_buf;                /* Location of buffer */    size_t           aio_nbytes;             /* Length of transfer */    int              aio_reqprio;            /* Request priority offset */    struct           sigevent aio_sigevent;  /* Signal number and value */    int              aio_lio_opcode;         /* Operation to be performed */};
复制代码


以上我们刚才用到的 aiocb,在这边有个 sigevent,看一下 FreeBSD 的 man,不同的通知机制有不同的配法:



1、信号通知


// 1. 设置信号与捕捉函数    struct sigaction sa;     sigemptyset(&sa.sa_mask);    sa.sa_flags = SA_SIGINFO;    sa.sa_sigaction = signal_handler;    sigaction(IO_SIGNAL, &sa, NULL);
// 2. 填sigevent指定为信号方式 my.aio_sigevent.sigev_notify = SIGEV_SIGNAL; my.aio_sigevent.sigev_signo = IO_SIGNAL; my.aio_sigevent.sigev_value.sival_ptr = &;
复制代码


2、线程回调


// 1. 填sigevent指定为线程通知方式,pthread_attr_t可以用默认    my.aio_sigevent.sigev_notify = SIGEV_THREAD;    my.aio_sigevent.sigev_notify_function = thread_handler;    my.aio_sigevent.sigev_value.sival_ptr = &my;
// 2. 上述的aio_read挂到设备队列上 ret = aio_read(&my); if (ret < 0) { fprintf(stderr, "aio_read() errno=%d\n", errno); return; }
// 3. 进入等待,io读完系统会按照刚才传入的pthread_attr_t创建一个线程调用你的回调 ret = aio_suspend(aio_list, 1, NULL); if (ret < 0) { fprintf(stderr, "aio_suspend() errno=%d\n", errno); return; }
复制代码


3. 借助 kqueue 做事件通知


以下是 nginx 的写法,我照着写,但是!并!没!有!跑!过!等下告诉你们为什么。


先介绍一下,使用 kqueue 的话,kevent 回来之后,filter 是 EVFILT_AIO,我们就可以知道这是个 aio 事件了。kqueue 的 EVFILT_AIO 是只能水平触发的。


// 1. 定义好我的回调函数指针typedef void (*aio_handler_pt)(void *ctx);
// 2. 封装我的上下文struct kqueue_ctx{ struct aiocb* my; aio_handler_pt handler;};
// 3. 传入刚才创建好的aiocbvoid test_kqueue(struct aiocb* my){ // 4. 创建一个kqueue int kqfd = kqueue(); if (kqfd == -1) { fprintf(stderr, "open kqueue() failed. error = %d\n", errno); return; }
// 5. 封装我的上下文 struct kqueue_ctx ctx; ctx.my = my; ctx.handler = kqueue_aio_handler;
// 6. 设置sigevent为使用kqueue的方式 my->aio_sigevent.sigev_notify_kqueue = kqfd; my->aio_sigevent.sigev_notify = SIGEV_KEVENT; my->aio_sigevent.sigev_value.sigval_ptr = &ctx;
// 7. 挂到设备的aio队列上 int ret = aio_read(my); if (ret < 0) { fprintf(stderr,"aio_read() ret = %d errno = %d\n", ret, errno); return; }}
// 8. 假设我们在另一个线程里wait。nginx的做法是挂完直接先调用一下回调函数看看读完没有,省得立刻读完了还切一次线程。void wait_handler(){ struct kqueue_ctx *back_ctx;
struct kevent out[KEVENT_MAX]; while (1) { // 9. 日常等kqueue fprintf(stderr, "start to wait.\n"); ret = kevent(kqfd, NULL, KEVENT_MAX, out, (int)KEVENT_MAX, NULL);
if (ret < 0) return;
if (ret == 0) sleep(1);
for (int i = 0; i < ret; i++) { // 10. kqueue事件回来之后,通过filter判断 switch (out[i].filter) { case EVFILT_AIO: fprintf(stderr, "aio back\n"); break; default: fprintf(stderr, "not possible here\n"); break; } // 11. 拿出上下文,如果是aio那么这个handler就是我们刚才配的回调函数 back_ctx = (struct kqueue_ctx *) out[i].udata; back_ctx->handler(back_ctx); } }}
// 12. 康康我们的回调函数static void kqueue_aio_handler(void *ctx){ int ret; struct kqueue_ctx *back_ctx = (struct kqueue_ctx *)ctx; struct aiocb *my = (struct aiocb *)back_ctx->my;
fprintf(stderr, "kqueue_aio_handler()\n");
if (aio_error(my) == EINPROGRESS) { // 13. 表示没有准备好 fprintf(stderr, "aio_error = %d errno = %d\n", aio_error(my), errno); return; }
// 14. 检查数据正确返回,这里的错误处理都应该根据实际需要自行修改后续逻辑 ret = aio_return(my); if (ret < 0) { fprintf(stderr, "aio_return = %d errno = %d\n", ret, errno); return; }
// 15. 取出数据 fprintf(stderr, "%s\n", (char *)(my->aio_buf));}
复制代码


四、MacOS 下几种异步方式的吐血经验


以上代码写完之后,信号方式和线程方式在 linux 下都是可以正常跑的,然而当我很开心地在 MacOS 下编译却发现全军覆没,以下是吐血经验总结: 


  1. MacOS 的信号方式不能传上下文,即我可以通过 pipe 来检测成功捕捉信号,但是上下文获取不到,errno 为 22,Invalid arguement;

  2. 线程方式直接在 aio_read(&my);的时候就跪了,看了下错误码是 35 ENOSYS, 仔细查了下,老大也翻遍了 google 的文档,最终在一个 pdf 确认确实是系统没实现;

  3. kqueue 方式 nginx 虽然有,但是 MacOS 下直接编译错误。我们刚才看到 FreeBSD 文档中 sigevent 的介绍,结构体中支持 sigev_notify_kqueue,但是看了下苹果机器上的 usr/include/sys/signal.h,发现是长这样的:



而我的编译错误,正是告诉我 sigevent 中没有 sigev_notify_kqueue 这个值。也就是说 FreeBSD 新点的版本或许支持 kqueue 响应 aio 事件,但是 MacOS 并没有。 


这真的是一个比一个惨。但是至少我们可以用 aio_suspend( ),测试了一下,只要我们管理线程池,aio_suspend( )之后就可以不耗费 cpu 等着了,gdb 可以看到一旦需要进行 aio,内部会创建一个用户态线程,进行 io 操作,读完了回到我 suspend 的地方。这是目前最实际的 posix_aio 的回调方式。这个线程会立刻创建立刻销毁,所以如果可以的话,自己实现线程池去做模拟 aio 其实应该更快。 


五、小结


总结下调研这些系统底层 api 的方法,基本属于看 man->找源码解析(如果有的话)-> 看头文件-> 自己尝试不同的场景(基本接口使用、调用的现象是否符合预期、性能等是否满足使用的需求)-> 最后就是思考怎么融合到当前的模块里,必须是符合逻辑的顺畅的设计才是最好对。 


这周感慨于自己写 demo 太慢了,有很多尝试必须是知道有这么回事才能去查到位,或者想到怎么去试。老大的说法是:“这个以前书上看过,从头看到尾。”(谢爷真是个容易进语录的人= =)嗯,更不用说百科全书科爷了,虽然自己跟前辈们没法比,但是也选了本闲书,最近 100 天从头看到尾吧。

发布于: 2021 年 01 月 11 日阅读数: 23
用户头像

1412

关注

鶸鶸的架构师 2018.08.07 加入

专注于异步调度框架开发和分布式存储技术,开源框架Workflow和srpc的作者之一。

评论

发布
暂无评论
异步I/O -- posix aio 从入门到放弃的吐血实践