一、协程的由来
从 IO 同步和异步的优缺点分析如下:
IO 同步优点就是 sockfd 管理方便,操作逻辑清晰;缺点是程序依赖 epoll_wait 的循环响应速度,程序性能差。
IO 异步优点就是子模块好规划,程序性能高;缺点就是逻辑理解有点难度,还会出现多个线程共用一个 sockfd,此时需要避免在 IO 操作时,出现 sockfd 出现关闭或其它异常。
有没有一种方式,同步的方式实现了异步的性能呢?那就是下文所说的协程。
二、协程切换的核心
协程切换核心就是 yield(让出)与 resume(恢复)来实现协程上下文切换,实现有以下 3 种方法。
(1)longjmp 和 setjmp
(2)ucontext
(3)汇编实现跳转
本文使用第三种汇编实现,yied = switch(a,b),resume = switch(b,a),根据不同的处理器的汇编指令实现 switch 的操作,比如 x64_86 如下。
_asm__(
" .text \n"
" .p2align 4,,15 \n"
".globl _switch \n"
".globl __switch \n"
"_switch: \n"
"__switch: \n"
" movq %rsp, 0(%rsi) # 从rsp存到rsi寄存器 \n"
" movq %rbp, 8(%rsi) # 移动8个字节,一个指针是8个字节 \n"
" movq (%rsp), %rax # save insn_pointer \n"
" movq %rax, 16(%rsi) \n"
" movq %rbx, 24(%rsi) # save rbx,r12-r15 \n"
" movq %r12, 32(%rsi) \n"
" movq %r13, 40(%rsi) \n"
" movq %r14, 48(%rsi) \n"
" movq %r15, 56(%rsi) \n"
" movq 56(%rdi), %r15 \n"
" movq 48(%rdi), %r14 \n"
" movq 40(%rdi), %r13 # restore rbx,r12-r15 \n"
" movq 32(%rdi), %r12 \n"
" movq 24(%rdi), %rbx \n"
" movq 8(%rdi), %rbp # restore frame_pointer \n"
" movq 0(%rdi), %rsp # restore stack_pointer \n"
" movq 16(%rdi), %rax # restore insn_pointer \n"
" movq %rax, (%rsp) \n"
" ret \n"
);
//64位系统,一个指针是8个字节
复制代码
x86 _64 的寄存器有 16 个 64 位寄存器,分别是 :%rax, %rbx,%rcx, %esi, %edi, %rbp, %rsp, %r8, %r9,%r10, %r11, %r12, %r13, %r14, %r15 ,%rax。
(1)%rax 作为函数返回值使用的
(2)%rsp 栈指针寄存器 指向栈顶
(3)%rdi, %rsi, %rdx, %rcx, %r8, %r9 用作函数参数 依次对应第 1 参数 第 2 参数。。。
(4)%rbx, %rbp, %r12, %r13, %r14, %r15 用作数据存储
协程上下文切换,就是将 CPU 的寄存器暂时保存,再将即将运行的协程的上下文寄存器分别 mov 到 cpu 对应的寄存器上。
三、回调协程的子过程
协程的上下文结构体
typedef struct _nty_cpu_ctx {
void *esp; //栈指针指向-->stack
void *ebp;
void *eip;//指向回调函数入口
void *edi;//参数
void *esi;
void *ebx;
void *r1;
void *r2;
void *r3;
void *r4;
void *r5;
} nty_cpu_ctx;
复制代码
cpu 中有一个非常重要的寄存器 eip,用来存储 cpu 运行的下一条指令地址,可以把回调函数的地址存储到 eip。
四、协程定义
一个协程核心结构体如下
typedef struct _nty_coroutine {
nty_cpu_ctx ctx;
proc_coroutine func;
void *arg;
size_t stack_size;
nty_coroutine_status status;
nty_schedule *sched;
uint64_t birth;
uint64_t id;
void *stack;
RB_ENTRY(_nty_coroutine) sleep_node;
RB_ENTRY(_nty_coroutine) wait_node;
TAILQ_ENTRY(_nty_coroutine) ready_next;
TAILQ_ENTRY(_nty_coroutine) defer_next;
} nty_coroutine;
复制代码
(1)context,上下文,切换用的
(2)stack,每个协程的栈,协程内部用来做函数压栈
(3)size,协程栈的大小
(4)func,协程入口函数
(5)arg,入口函数的参数
(6)wait(等待集),等待 IO 就绪,等待集合采用红黑树存储
(7)sleep(睡眠树),采用红黑树存储<key,value>,按睡眠时间进行排序,key 为睡眠时长,value 为协程节点
(8)ready(就绪集合),采用队列 ready_queue 存储
(9)status 状态
协程有 3 种状态:就绪、睡眠、等待;新创建的协程,创建完成后,加入就绪集合,等待调度器的调度;协程在运行完成后,进行 IO 操作,此时 IO 并未准备好,进入等待状态集合;IO 准备就绪,协程开始运行,后续进行 sleep 操作,此时进入到睡眠状态集合。
推荐一个纯 C 语言|实现协程框架,底层原理与性能分析视频:
C++架构师免费学习地址:C/C++Linux服务器开发高级架构师/C++后台开发架构师
协程的实现与原理剖析丨掌握协程的运用丨协程案例分析丨实例讲解(上)
协程的实现与原理剖析丨掌握协程的运用丨协程案例分析丨实例讲解(下)
以下文档资料获取添加:Q群:720209036 点击加入~ 群文件共享
五、调度器实现
调度器主要实现协程的切换,当 IO 准备就绪时,切换到该 IO 对应的协程,调度器的结构体如下。
typedef struct _nty_coroutine_queue nty_coroutine_queue;
typedef struct _nty_coroutine_rbtree_sleep nty_coroutine_rbtree_sleep;
typedef struct _nty_coroutine_rbtree_wait nty_coroutine_rbtree_wait;
typedef struct _nty_schedule {
uint64_t birth; nty_cpu_ctx ctx;
struct _nty_coroutine *curr_thread;
int page_size;
int poller_fd;
int eventfd;
struct epoll_event eventlist[NTY_CO_MAX_EVENTS];
int nevents;
int num_new_events;
nty_coroutine_queue ready;
nty_coroutine_rbtree_sleep sleeping;
nty_coroutine_rbtree_wait waiting;
} nty_schedule;
复制代码
调度器从 3 部分来得到就绪 IO 的协程:就绪集合、睡眠集合、等待集合,代码如下。
void nty_schedule_run(void) {
nty_schedule *sched = nty_coroutine_get_sched();
if (sched == NULL) return ;
while (!nty_schedule_isdone(sched)) {
// 1. expired --> sleep rbtree 睡眠等待时间
nty_coroutine *expired = NULL;
while ((expired = nty_schedule_expired(sched)) != NULL) {
nty_coroutine_resume(expired);//那些时间到期了,恢复协程的运行
}
// 2. ready queue 就绪队列
nty_coroutine *last_co_ready = TAILQ_LAST(&sched->ready, _nty_coroutine_queue);
while (!TAILQ_EMPTY(&sched->ready)) {
//从就绪队列拿出第一个节点
nty_coroutine *co = TAILQ_FIRST(&sched->ready);
TAILQ_REMOVE(&co->sched->ready, co, ready_next);
if (co->status & BIT(NTY_COROUTINE_STATUS_FDEOF)) {
nty_coroutine_free(co);
break;
}
nty_coroutine_resume(co);//恢复协程的运行
if (co == last_co_ready) break;
}
// 3. wait rbtree IO等待 其他协程让出后,回到调度器这里
//调度器处理IO等待
nty_schedule_epoll(sched);//调用epoll_wait,监听就绪IO,sched->num_new_events就是IO事件数量
while (sched->num_new_events) {
int idx = --sched->num_new_events;
struct epoll_event *ev = sched->eventlist+idx;
int fd = ev->data.fd;
int is_eof = ev->events & EPOLLHUP;
if (is_eof) errno = ECONNRESET;
nty_coroutine *co = nty_schedule_search_wait(fd);//通过fd,从红黑树中获取对应的coroutine
if (co != NULL) {
if (is_eof) {
co->status |= BIT(NTY_COROUTINE_STATUS_FDEOF);
}
nty_coroutine_resume(co);//恢复,返回到协程
的运行
}
is_eof = 0;
}
}
nty_schedule_free(sched);
return ;
}
复制代码
fd 如何知道就绪?
创建协程时,把 fd 添加到 epoll 进行管理,然后 yied 让出给调度器,由调度器 resume 到 IO 就绪的协程。其实调度器通过 epoll_wait()监听 IO 是否就绪,得到就绪的 fd,通过 fd 从红黑树中获取对应的协程,再通过 resume()回到该就绪 fd 对应的协程,该协程继续执行 accept/recv/send 等阻塞 API。
六、协程的接口
协程接口分为两部分
(1)协程本身的 API
创建协程:int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg); 运行调度器:void nty_schedule_run(void);
(2)posix 的 API
对于需要等待 IO 就绪的的网络 IO 的操作函数需重新封装使用,而不是直接使用系统提供的,对于不需等待就绪的,可以不进行封装,需要的有:accept,connect,send,write,sendto,recv,read,recvfrom;不需要:socket,close,fcntl,setsocketopt,getsocketopt,listen。
七、多核问题
(1)多进程
每一个进程亲和一个 cpu,通过 sched_setaffinity 函数设置亲和力
(2)多线程
需要对调度器进行加锁
sleep_rbtree 中取出超时的节点,进行加锁 mutex
ready_queue 中取出就绪的节点,进行 springlock
wait_rbtree 中获取就绪 IO 可以使用 mutex
(3)x86 指令,未实现。
八、hook 钩子
这里为啥要介绍 hook 呢?因为使用协程时要把 posix 的 API 重新进行封装,所以可以使用 hook 劫持 posix 的 API 封装成自己的函数,hook 钩子函数可以劫持两类函数:
(1)系统的函数,使用 dlsym();
(2)第三方的库函数,使用 dlopen();
初始化 hook 后,就把系统的函数劫获,运行时执行的是自己定义的函数,而不是系统的函数(类似重定向),特别注意的是,其他的应用程序调用系统函数,不会执行当前应用程序对应定义 hook 函数,因为当前的应用程序调用系统函数时,只执行对应定义的函数,这个只限于当前应用程序。
例子 1:协程+mysql,不去修改 mysql-dev,使用 hook 来重新定义 connect、read、recv、send、write 等函数;
例子 2:hook 来劫持 malloc 和 free 检查内存泄露;
例子 3:nginx 运行在 dpdk 也是使用 hook 的方法;
评论