[toc]
前言
redis6已经发布了RC3版本,关于新特性可以查看https://www.cnblogs.com/Jackeyzhe/p/12094070.html
这篇文章主要探讨下IO多线程,redis版本为redis6.0-rc2
我们知道,Redis 的命令执行是单线程的,所以不需要加锁等额外手段就可以保证命令的原子性.同时 Redis 的应用场景决定了网络 IO 的开销占比很重,所以只对 IO 部分引入多线程,既可以提升 Redis 的 QPS,又不会增加命令实现的复杂性.
主流程
加入多线程 IO 之后,整体的读流程如下:
- 主线程负责接收处理AE事件,收到读事件后将其放入全局队列里.
- 主线程处理完读事件之后,以轮询(Round Robin)的方式将这些连接分配给各个 IO 线程(主线程自己也会处理其中一份),然后主线程忙等待所有 IO 线程处理完成
- IO 线程将请求数据读取并解析完成
- 主线程执行所有命令并清空整个请求等待读处理队列(单线程执行)
Redis 的多线程 IO 在同一时刻只会处理读或者只会处理写(由全局变量 io_threads_op 决定),不会出现多个线程同时分别处理读和写的情况.

详细流程解读
初始化

执行

源码分析
初始化
/* Last stage of server initialization.
 *
 * Calls bioInit(), sets up threaded I/O, forwards the configured
 * jemalloc_bg_thread value to set_jemalloc_bg_thread(), and finally stores
 * the current zmalloc_used_memory() reading in server.initial_memory_usage. */
void InitServerLast() {
    bioInit();

    /* Threaded I/O is initialized here, at the very end of server startup. */
    initThreadedIO();

    set_jemalloc_bg_thread(server.jemalloc_bg_thread);

    /* Record the memory in use once all of the above has run. */
    server.initial_memory_usage = zmalloc_used_memory();
}
/* Initialize the threaded I/O state and spawn the additional I/O threads.
 *
 * Index 0 of the per-thread arrays belongs to the main thread, which only
 * gets a client list. Every other index additionally gets a mutex, a pending
 * counter reset to 0 and a pthread running IOThreadMain(). Each worker mutex
 * is locked before its thread is created.
 *
 * Returns early without creating anything when io_threads_num is 1, and
 * terminates the process with exit(1) when more than IO_THREADS_MAX_NUM
 * threads are configured or when pthread_create() fails. */
void initThreadedIO(void) {
    io_threads_active = 0; /* Threads start out inactive. */
    serverLog(LL_WARNING,"IO number is %d.", server.io_threads_num);

    /* A single configured thread means all I/O is handled directly by the
     * main thread, so there is nothing to spawn. */
    if (server.io_threads_num == 1) return;

    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }

    /* Walk over every thread slot; `slot` doubles as the thread number. */
    for (int slot = 0; slot < server.io_threads_num; slot++) {
        /* Every thread, the main one included, owns a client list. */
        io_threads_list[slot] = listCreate();

        /* Everything below applies to the additional threads only
         * (slot 0 is the main thread). */
        if (slot != 0) {
            pthread_t worker;

            pthread_mutex_init(&io_threads_mutex[slot],NULL);
            io_threads_pending[slot] = 0; /* No clients assigned yet. */

            /* Hold the mutex so that the new thread will be stopped. */
            pthread_mutex_lock(&io_threads_mutex[slot]);

            /* The thread number is handed over through the void* argument. */
            if (pthread_create(&worker,NULL,IOThreadMain,(void*)(long)slot) != 0) {
                serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
                exit(1);
            }

            /* Remember which pthread serves this thread number. */
            io_threads[slot] = worker;
        }
    }
}
/* Entry point of every additional I/O thread.
 *
 * myid carries the thread number (cast from an integer by the creator).
 * The thread loops forever: it waits until io_threads_pending[id] becomes
 * non-zero, then runs either writeToClient() or readQueryFromClient() on
 * every client in io_threads_list[id], depending on the global
 * io_threads_op, and finally empties the list and resets the pending
 * counter to 0. The function never returns. */
void *IOThreadMain(void *myid) {
// The thread number passed in by the creator.
long id = (unsigned long)myid;
while(1) {
// Spin for a bounded number of iterations waiting for clients to be
// assigned to this thread.
for (int j = 0; j < 1000000; j++) {
if (io_threads_pending[id] != 0) break;
}
/* Give the main thread a chance to stop this thread. */
// Still nothing to do after spinning: acquire and immediately release
// our own mutex (this blocks for as long as another thread holds it),
// then start waiting again.
if (io_threads_pending[id] == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
// From here on there must be work assigned to this thread.
serverAssert(io_threads_pending[id] != 0);
if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));
/* Process: note that the main thread will never touch our list
* before we drop the pending count to 0. */
listIter li;
listNode *ln;
// Position the iterator at the start of this thread's client list.
listRewind(io_threads_list[id],&li);
// Handle every assigned client.
while((ln = listNext(&li))) {
// The client stored in the current list node.
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {// the global operation is "write"
// Write pending data to the client.
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {// the global operation is "read"
// Read data from the client's connection.
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
// Empty the client list first, and only then reset the pending counter
// to 0 (see the note above: the main thread leaves the list alone until
// the counter is 0).
listEmpty(io_threads_list[id]);
io_threads_pending[id] = 0;
if (tio_debug) printf("[%ld] Done\n", id);
}
}
执行
写操作
/* Flush the replies of all clients in server.clients_pending_write, spreading
 * the work across the I/O threads.
 *
 * Returns 0 when there are no pending clients, the result of
 * handleClientsWithPendingWrites() when stopThreadedIOIfNeeded() reports that
 * threaded I/O should not be used, and otherwise the number of clients that
 * were pending when the function was entered.
 *
 * The return type is spelled out explicitly: every path returns an integer,
 * and implicit int is not valid C since C99. */
int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */

    /* If we have just a few clients to serve, don't use I/O threads, but the
     * boring synchronous code. (According to the accompanying article the
     * threshold is "fewer than 2 * io_threads_num clients"; the helper itself
     * is not shown here.) */
    if (stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }

    /* Start threads if needed. */
    if (!io_threads_active) startThreadedIO();

    if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);

    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        /* Clear the pending-write flag of the client. */
        c->flags &= ~CLIENT_PENDING_WRITE;
        /* Round-robin assignment: the n-th client goes to thread
         * n % io_threads_num (thread 0 is the main thread). */
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        /* Publishing a non-zero count is what lets thread j start working
         * on its list. */
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);

    /* Busy-wait until every I/O thread has dropped its pending count to 0. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O WRITE All threads finshed\n");

    /* Run the list of clients again to install the write handler where
     * needed. */
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        /* Install the write handler if there are pending writes in some
         * of the clients. If installing it fails, the client is scheduled
         * to be freed asynchronously. */
        if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);

    /* Report how many clients were handled. */
    return processed;
}
读操作
/* Read and parse the queries of all clients in server.clients_pending_read
 * using the I/O threads, then process the resulting input on the main thread.
 *
 * Returns 0 when the I/O threads are not active, when io_threads_do_reads is
 * off, or when no client is waiting; otherwise returns the number of clients
 * that were waiting when the function was entered. */
int handleClientsWithPendingReadsUsingThreads(void) {
    /* Threaded reads need both active I/O threads and the
     * io_threads_do_reads option. */
    if (!(io_threads_active && server.io_threads_do_reads)) return 0;

    /* Number of clients with a pending read. */
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;

    if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);

    listIter it;
    listNode *node;

    /* Deal the clients out to the per-thread lists in round-robin order. */
    listRewind(server.clients_pending_read,&it);
    for (int seq = 0; (node = listNext(&it)) != NULL; seq++) {
        client *c = listNodeValue(node);
        int owner = seq % server.io_threads_num;
        listAddNodeTail(io_threads_list[owner],c);
    }

    /* Select the read operation globally, then publish each worker's
     * pending count; the workers wait for that count to become non-zero. */
    io_threads_op = IO_THREADS_OP_READ;
    for (int t = 1; t < server.io_threads_num; t++) {
        int assigned = listLength(io_threads_list[t]);
        io_threads_pending[t] = assigned;
    }

    /* The main thread serves list 0 itself. */
    listRewind(io_threads_list[0],&it);
    for (node = listNext(&it); node != NULL; node = listNext(&it)) {
        client *c = listNodeValue(node);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);

    /* Busy-wait until the pending counts of all workers add up to 0. */
    for (;;) {
        unsigned long outstanding = 0;
        for (int t = 1; t < server.io_threads_num; t++)
            outstanding += io_threads_pending[t];
        if (outstanding == 0) break;
    }
    if (tio_debug) printf("I/O READ All threads finshed\n");

    /* Visit every client once more, on the main thread, to process the
     * buffers that were just filled. */
    listRewind(server.clients_pending_read,&it);
    for (node = listNext(&it); node != NULL; node = listNext(&it)) {
        client *c = listNodeValue(node);

        c->flags &= ~CLIENT_PENDING_READ;

        /* A client flagged with a pending command has that command
         * processed here, after the flag is cleared. */
        if (c->flags & CLIENT_PENDING_COMMAND) {
            c->flags &= ~ CLIENT_PENDING_COMMAND;
            processCommandAndResetClient(c);
        }
        processInputBufferAndReplicate(c);
    }
    listEmpty(server.clients_pending_read);

    /* Report how many clients were handled. */
    return processed;
}
配置文件
开启多线程 IO 需要在 redis.conf 里配置下面两个选项,并设置所需的线程数量
# IO线程的数量
io-threads 4
# 开启IO线程读
io-threads-do-reads yes
调试
配置文件中配置的是 4 个线程,所以使用 8 个客户端进行调试,刚好达到触发多线程 IO 的最小值(待处理客户端数不小于 2 × 线程数): ./redis-benchmark --threads 4 -t get -n 1000 -c 8
性能测试
环境说明
系统环境:虚拟机 CPU: 锐龙 R3 2200G 4核4线程 内存: 4G 系统:ubuntu 18.04
压测数据仅是简单的压测了get/set,生产环境更复杂,不过还是具有参考意义。
单线程压测数据
root@lb:/home/soft/redis-6.0/src\# ./redis-benchmark -t get,set -n 1000000 -r 1000000000 -c 256 -d 128
====== SET ======
1000000 requests completed in 18.75 seconds
256 parallel clients
128 bytes payload
keep alive: 1
host configuration "save": 900 1 300 10 60 10000
host configuration "appendonly": no
multi-thread: no
0.00% <= 0.1 milliseconds
0.00% <= 0.2 milliseconds
0.00% <= 0.3 milliseconds
0.01% <= 0.4 milliseconds
0.03% <= 0.5 milliseconds
0.09% <= 0.6 milliseconds
0.21% <= 0.7 milliseconds
0.41% <= 0.8 milliseconds
0.66% <= 0.9 milliseconds
0.96% <= 1.0 milliseconds
1.22% <= 1.1 milliseconds
1.46% <= 1.2 milliseconds
1.91% <= 1.3 milliseconds
2.51% <= 1.4 milliseconds
3.18% <= 1.5 milliseconds
3.97% <= 1.6 milliseconds
5.01% <= 1.7 milliseconds
6.40% <= 1.8 milliseconds
8.11% <= 1.9 milliseconds
10.15% <= 2 milliseconds
45.83% <= 3 milliseconds
70.66% <= 4 milliseconds
80.44% <= 5 milliseconds
85.91% <= 6 milliseconds
90.17% <= 7 milliseconds
93.79% <= 8 milliseconds
95.60% <= 9 milliseconds
96.87% <= 10 milliseconds
97.81% <= 11 milliseconds
98.44% <= 12 milliseconds
98.89% <= 13 milliseconds
99.23% <= 14 milliseconds
99.38% <= 15 milliseconds
99.47% <= 16 milliseconds
99.54% <= 17 milliseconds
99.60% <= 18 milliseconds
99.64% <= 19 milliseconds
99.69% <= 20 milliseconds
99.77% <= 21 milliseconds
99.83% <= 22 milliseconds
99.84% <= 23 milliseconds
99.86% <= 24 milliseconds
99.88% <= 25 milliseconds
99.92% <= 26 milliseconds
99.96% <= 27 milliseconds
99.96% <= 28 milliseconds
99.97% <= 29 milliseconds
99.97% <= 31 milliseconds
99.97% <= 33 milliseconds
99.97% <= 34 milliseconds
99.97% <= 36 milliseconds
99.98% <= 37 milliseconds
99.98% <= 42 milliseconds
99.99% <= 43 milliseconds
99.99% <= 44 milliseconds
100.00% <= 44 milliseconds
53330.49 requests per second
====== GET ======
1000000 requests completed in 17.63 seconds
256 parallel clients
128 bytes payload
keep alive: 1
host configuration "save": 900 1 300 10 60 10000
host configuration "appendonly": no
multi-thread: no
0.09% <= 1 milliseconds
11.84% <= 2 milliseconds
62.83% <= 3 milliseconds
80.03% <= 4 milliseconds
86.48% <= 5 milliseconds
89.92% <= 6 milliseconds
93.20% <= 7 milliseconds
95.65% <= 8 milliseconds
96.89% <= 9 milliseconds
97.61% <= 10 milliseconds
98.16% <= 11 milliseconds
98.56% <= 12 milliseconds
98.82% <= 13 milliseconds
98.98% <= 14 milliseconds
99.15% <= 15 milliseconds
99.27% <= 16 milliseconds
99.39% <= 17 milliseconds
99.54% <= 18 milliseconds
99.59% <= 19 milliseconds
99.68% <= 20 milliseconds
99.74% <= 21 milliseconds
99.77% <= 22 milliseconds
99.79% <= 23 milliseconds
99.79% <= 24 milliseconds
99.80% <= 25 milliseconds
99.81% <= 26 milliseconds
99.83% <= 27 milliseconds
99.83% <= 28 milliseconds
99.84% <= 29 milliseconds
99.84% <= 30 milliseconds
99.87% <= 31 milliseconds
99.88% <= 32 milliseconds
99.88% <= 33 milliseconds
99.89% <= 34 milliseconds
99.92% <= 35 milliseconds
99.94% <= 36 milliseconds
99.95% <= 37 milliseconds
99.95% <= 38 milliseconds
99.95% <= 39 milliseconds
99.96% <= 40 milliseconds
99.96% <= 45 milliseconds
99.97% <= 47 milliseconds
99.99% <= 48 milliseconds
99.99% <= 83 milliseconds
100.00% <= 84 milliseconds
100.00% <= 84 milliseconds
56708.63 requests per second
多线程压测数据
root@lb:/home/soft/redis-6.0/src\# ./redis-benchmark -t get,set -n 1000000 -r 1000000000 -c 256 -d 128 --threads 4
====== SET ======
1000000 requests completed in 10.55 seconds
256 parallel clients
128 bytes payload
keep alive: 1
host configuration "save": 900 1 300 10 60 10000
host configuration "appendonly": no
multi-thread: yes
threads: 4
0.00% <= 0.4 milliseconds
0.00% <= 0.5 milliseconds
0.00% <= 0.6 milliseconds
0.01% <= 0.7 milliseconds
0.01% <= 0.8 milliseconds
0.03% <= 0.9 milliseconds
0.06% <= 1.0 milliseconds
0.11% <= 1.1 milliseconds
0.21% <= 1.2 milliseconds
0.39% <= 1.3 milliseconds
0.83% <= 1.4 milliseconds
1.74% <= 1.5 milliseconds
3.55% <= 1.6 milliseconds
7.38% <= 1.7 milliseconds
14.38% <= 1.8 milliseconds
23.57% <= 1.9 milliseconds
32.88% <= 2 milliseconds
86.45% <= 3 milliseconds
92.56% <= 4 milliseconds
95.12% <= 5 milliseconds
96.48% <= 6 milliseconds
97.35% <= 7 milliseconds
98.15% <= 8 milliseconds
98.72% <= 9 milliseconds
98.97% <= 10 milliseconds
99.17% <= 11 milliseconds
99.35% <= 12 milliseconds
99.50% <= 13 milliseconds
99.58% <= 14 milliseconds
99.63% <= 15 milliseconds
99.65% <= 16 milliseconds
99.70% <= 17 milliseconds
99.82% <= 18 milliseconds
99.84% <= 19 milliseconds
99.87% <= 20 milliseconds
99.88% <= 21 milliseconds
99.89% <= 22 milliseconds
99.90% <= 23 milliseconds
99.91% <= 24 milliseconds
99.91% <= 25 milliseconds
99.92% <= 26 milliseconds
99.92% <= 27 milliseconds
99.92% <= 28 milliseconds
99.93% <= 29 milliseconds
99.94% <= 30 milliseconds
99.95% <= 31 milliseconds
99.95% <= 32 milliseconds
99.96% <= 33 milliseconds
99.97% <= 34 milliseconds
100.00% <= 35 milliseconds
100.00% <= 35 milliseconds
94768.77 requests per second
====== GET ======
1000000 requests completed in 9.26 seconds
256 parallel clients
128 bytes payload
keep alive: 1
host configuration "save": 900 1 300 10 60 10000
host configuration "appendonly": no
multi-thread: yes
threads: 4
0.15% <= 1 milliseconds
64.43% <= 2 milliseconds
91.21% <= 3 milliseconds
94.42% <= 4 milliseconds
96.18% <= 5 milliseconds
97.39% <= 6 milliseconds
97.92% <= 7 milliseconds
98.53% <= 8 milliseconds
98.77% <= 9 milliseconds
98.98% <= 10 milliseconds
99.22% <= 11 milliseconds
99.42% <= 12 milliseconds
99.57% <= 13 milliseconds
99.69% <= 14 milliseconds
99.76% <= 15 milliseconds
99.84% <= 16 milliseconds
99.86% <= 17 milliseconds
99.88% <= 18 milliseconds
99.88% <= 19 milliseconds
99.89% <= 20 milliseconds
99.89% <= 21 milliseconds
99.90% <= 22 milliseconds
99.90% <= 23 milliseconds
99.90% <= 24 milliseconds
99.90% <= 25 milliseconds
99.90% <= 26 milliseconds
99.91% <= 29 milliseconds
99.92% <= 30 milliseconds
99.95% <= 31 milliseconds
99.96% <= 32 milliseconds
99.96% <= 34 milliseconds
99.96% <= 35 milliseconds
99.97% <= 37 milliseconds
99.97% <= 43 milliseconds
99.97% <= 44 milliseconds
99.97% <= 45 milliseconds
99.97% <= 46 milliseconds
99.97% <= 47 milliseconds
99.97% <= 50 milliseconds
99.97% <= 51 milliseconds
99.99% <= 67 milliseconds
99.99% <= 68 milliseconds
99.99% <= 78 milliseconds
99.99% <= 79 milliseconds
99.99% <= 94 milliseconds
100.00% <= 95 milliseconds
100.00% <= 95 milliseconds
107979.70 requests per second
图表展示
SET

GET
