写点什么

Redis6.0 多线程源码分析

用户头像
代码诗人
关注
发布于: 2020 年 05 月 20 日
Redis6.0 多线程源码分析

1、概述

redis6.0 新增了 多线程机制。一直存在两个疑问:


1)、 为什么要加入多线程

2)、多线程机制,为什么不设计成跟 memcache 一样


2、解析


首先第一个问题,redis 区别于 memcache 的一点是,redis 是单线程。但是性能也很好,主要得益于他的 IO 多路复用,还有单线程省了多线程切换上下文的开销。线程上下文切换,可能会造成 CPU CACHE MISS, CPU 的一级缓存和三级缓存的 速度不是一个量级的。这次 redis6.0 开启了 IO 多线程的机制,主要是考虑,redis 的读写网络的 read/write 系统调用,在 redis 执行期间占用了大量的 CPU 时间。如果把网络 IO 做成多线程会提升很大性能。另一方面,如果是比较耗时的操作(比如 lrange,del 大 key),避免阻塞主线程太多时间,开线线程异步处理 不失为一个好的选择。

第二个问题,redis 选择多线程模型的时候,为什么不选择跟 memcache 一样的?redis 的作者 Antirez 曾经说过:

Redis 支持多线程有 2 种可行的方式:

第一种就是像“Memcached”那样,一个 Redis 实例开启多个线程,从而提升 GET/SET 等简单命令中每秒可以执行的操作。这涉及到 I/O、命令解析等多线程处理,因此,我们将其称之为“I/O threading”。

另一种就是允许在不同的线程中执行较耗时较慢的命令,以确保其它客户端不被阻塞,我们将这种线程模型称为“Slow commands threading”。

经过深思熟虑,Redis 不会采用“I/O threading”,Redis 在运行时主要受制于网络和内存,所以提升 Redis 性能主要是通过在多个 Redis 实例,特别是 Redis 集群。

接下来我们主要会考虑改进两个方面

  • Redis 集群的多个实例通过编排能够合理地使用本地实例的磁盘,避免同时重写 AOF。

  • 提供一个 Redis 集群代理,便于用户在没有较好的集群协议客户端时抽象出一个集群。

补充说明一下,Redis 和 Memcached 一样是一个内存系统,但不同于 Memcached。

多线程是复杂的,必须考虑使用简单的数据模型,执行 LPUSH 的线程需要服务其他执行 LPOP 的线程。

我真正期望的实际是“slow operations threading”,在 Redis 6 或 Redis 7 中,将提供“key-level locking”,使得线程可以完全获得对键的控制以处理缓慢的操作

简单的说就是,redis 目前性能还是受限于网络和内存。所以采用多线程来处理网络 IO 读写和解析,执行命令依然是交给主线程来做。多线程的并发控制又是复杂的(比如事务,LPUSH/LPOP 的并发控制),为了保持 redis 简单的设计原则,采用 IO 多线程是比较合适的。

3、 源码剖析

3.1、关键变量

io_threads_active:是否开始 IO 多线程

io_threads_op :操作类型 读写 IO_THREADS_OP_WRITE or IO_THREADS_OP_READ

io_threads_list[IO_THREADS_MAX_NUM] :每个线程的 等待客户端列表

io_threads_pending:每个线程的 等待客户端个数

3.1、初始化线程,注册回调

void initThreadedIO(void) {    io_threads_active = 0; /* We start with threads not active. */
/* Don't spawn any thread if the user selected a single thread: * we'll handle I/O directly from the main thread. */ if (server.io_threads_num == 1) return;
if (server.io_threads_num > IO_THREADS_MAX_NUM) { serverLog(LL_WARNING,"Fatal: too many I/O threads configured. " "The maximum number is %d.", IO_THREADS_MAX_NUM); exit(1); }
/* Spawn and initialize the I/O threads. */ for (int i = 0; i < server.io_threads_num; i++) { /* Things we do for all the threads including the main thread. */ io_threads_list[i] = listCreate(); if (i == 0) continue; /* Thread 0 is the main thread. */
/* Things we do only for the additional threads. */ pthread_t tid; pthread_mutex_init(&io_threads_mutex[i],NULL); io_threads_pending[i] = 0; pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */ if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) { serverLog(LL_WARNING,"Fatal: Can't initialize IO thread."); exit(1); } io_threads[i] = tid; }}
复制代码


第 25 行,调用 pthread_create 库函数创建线程,并且注册线程回调函数 IOThreadMain。线程 TID 绑定线程 ID io_threads[i] = tid

3.2、回调函数

void *IOThreadMain(void *myid) {    /* The ID is the thread number (from 0 to server.iothreads_num-1), and is     * used by the thread to just manipulate a single sub-array of clients. */    long id = (unsigned long)myid;    char thdname[16];
snprintf(thdname, sizeof(thdname), "io_thd_%ld", id); redis_set_thread_title(thdname);
while(1) { /* Wait for start */ for (int j = 0; j < 1000000; j++) { if (io_threads_pending[id] != 0) break; }
/* Give the main thread a chance to stop this thread. */ if (io_threads_pending[id] == 0) { pthread_mutex_lock(&io_threads_mutex[id]); pthread_mutex_unlock(&io_threads_mutex[id]); continue; }
serverAssert(io_threads_pending[id] != 0);
if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));
/* Process: note that the main thread will never touch our list * before we drop the pending count to 0. */ listIter li; listNode *ln; listRewind(io_threads_list[id],&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); if (io_threads_op == IO_THREADS_OP_WRITE) { writeToClient(c,0); } else if (io_threads_op == IO_THREADS_OP_READ) { readQueryFromClient(c->conn); } else { serverPanic("io_threads_op value is unknown"); } } listEmpty(io_threads_list[id]); io_threads_pending[id] = 0;
if (tio_debug) printf("[%ld] Done\n", id); }}
复制代码


第 12 行其实是一个循环等待的实现,这里不用 sleep,是为了避免设置 sleep 的时间不合适造成性能的损失, 但是循环等待 也会 占用 CPU,也是一个开销。


第 17 行其实是允许主线程来关闭其他线程的操作,只要将某个线程 i 的 io_threads_pending[id] 设置为 0


第 32 行是将当前线程等待队列里所有的请求 client,依次取出处理, 读操作通过 readQueryFromClient 处理, 写操作通过 writeToClient 处理。

3.3、分配待处理任务

int handleClientsWithPendingWritesUsingThreads(void) {    int processed = listLength(server.clients_pending_write);    if (processed == 0) return 0; /* Return ASAP if there are no clients. */
/* If I/O threads are disabled or we have few clients to serve, don't * use I/O threads, but thejboring synchronous code. */ if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) { return handleClientsWithPendingWrites(); }
/* Start threads if needed. */ if (!io_threads_active) startThreadedIO();
if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);
/* Distribute the clients across N different lists. */ listIter li; listNode *ln; listRewind(server.clients_pending_write,&li); int item_id = 0; while((ln = listNext(&li))) { client *c = listNodeValue(ln); c->flags &= ~CLIENT_PENDING_WRITE; int target_id = item_id % server.io_threads_num; listAddNodeTail(io_threads_list[target_id],c); item_id++; }
/* Give the start condition to the waiting threads, by setting the * start condition atomic var. */ io_threads_op = IO_THREADS_OP_WRITE; for (int j = 1; j < server.io_threads_num; j++) { int count = listLength(io_threads_list[j]); io_threads_pending[j] = count; }
/* Also use the main thread to process a slice of clients. */ listRewind(io_threads_list[0],&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); writeToClient(c,0); } listEmpty(io_threads_list[0]);
/* Wait for all the other threads to end their work. */ while(1) { unsigned long pending = 0; for (int j = 1; j < server.io_threads_num; j++) pending += io_threads_pending[j]; if (pending == 0) break; } if (tio_debug) printf("I/O WRITE All threads finshed\n");
/* Run the list of clients again to install the write handler where * needed. */ listRewind(server.clients_pending_write,&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln);
/* Install the write handler if there are pending writes in some * of the clients. */ if (clientHasPendingReplies(c) && connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR) { freeClientAsync(c); } } listEmpty(server.clients_pending_write); return processed;}
复制代码


上面函数的主要功能是将 IO 请求,分配给不同的 IO 线程去处理。


第 24 行可以看出,分配策略是 RR,int target_id = item_id % server.io_threads_num; 是用 id 做的 hash 取模。


第 25 行是选出 IO 线程后,将请求 client,加入到他的队列尾部。


第 49 行是更新标识,记录各个线程队列的长度,来通知回调函数执行。


第 50 行,是等所有线程都执行完成后,再回归主线程。


第 56 行,是主线程执行接线来的命令操作。

发布于: 2020 年 05 月 20 日阅读数: 323
用户头像

代码诗人

关注

文艺程序员 2019.08.30 加入

优雅代码诗

评论

发布
暂无评论
Redis6.0 多线程源码分析