[toc]

Preface

Redis 6 has already reached RC3; for an overview of the new features see https://www.cnblogs.com/Jackeyzhe/p/12094070.html
This article focuses on threaded I/O. The source code analyzed here is redis 6.0-rc2.

As we know, Redis executes commands in a single thread, which keeps operations atomic without extra synchronization. At the same time, Redis workloads are heavily I/O-bound, so moving only the network I/O (reading requests and writing replies) onto multiple threads raises QPS without adding complexity to the command implementations.

Main flow

With threaded I/O enabled, the overall read flow is:

  • The main thread keeps accepting and handling AE events; when a read event fires it puts the client into a global pending-read queue.
  • After collecting the read events, the main thread distributes these connections to the I/O threads round-robin, then busy-waits for them.
  • The I/O threads read and parse the request data.
  • The main thread then executes all the commands itself (still single-threaded) and clears the pending-read queue.

Redis's threaded I/O handles either reads or writes at any given moment; the threads never process reads and writes at the same time. A simplified sketch of this fan-out / fan-in pattern is shown below.
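Before diving into the source, here is a minimal, self-contained sketch of the fan-out / busy-wait / fan-in pattern described above. This is not Redis code: names such as worker_pending, worker_main and NUM_CLIENTS are invented for illustration, and the real I/O work is replaced by a printf. It should compile with cc -std=c11 -pthread.

```c
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

#define NUM_THREADS 4   /* thread 0 plays the role of the Redis main thread */
#define NUM_CLIENTS 10

static atomic_int worker_pending[NUM_THREADS]; /* clients assigned per thread */
static atomic_int stop;                        /* tells the workers to exit   */

/* Worker: spin until the main thread publishes work, "handle" it, then
 * drop the pending counter back to zero -- the same shape as IOThreadMain. */
static void *worker_main(void *arg) {
    long id = (long)arg;
    while (!atomic_load(&stop)) {
        if (atomic_load(&worker_pending[id]) == 0) continue;   /* busy wait */
        printf("worker %ld handling %d clients\n",
               id, atomic_load(&worker_pending[id]));
        atomic_store(&worker_pending[id], 0);                  /* done */
    }
    return NULL;
}

int main(void) {
    pthread_t tids[NUM_THREADS];
    for (long i = 1; i < NUM_THREADS; i++)
        pthread_create(&tids[i], NULL, worker_main, (void *)i);

    /* Fan out: assign clients round-robin; thread 0 keeps its own share. */
    int share[NUM_THREADS] = {0};
    for (int c = 0; c < NUM_CLIENTS; c++) share[c % NUM_THREADS]++;
    for (int t = 1; t < NUM_THREADS; t++)
        atomic_store(&worker_pending[t], share[t]);

    printf("main thread handling %d clients itself\n", share[0]);

    /* Fan in: busy-wait until every worker has finished its slice. */
    for (;;) {
        int pending = 0;
        for (int t = 1; t < NUM_THREADS; t++)
            pending += atomic_load(&worker_pending[t]);
        if (pending == 0) break;
    }
    printf("all I/O done, main thread now executes the commands\n");

    atomic_store(&stop, 1);
    for (int t = 1; t < NUM_THREADS; t++) pthread_join(tids[t], NULL);
    return 0;
}
```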

Detailed flow

Initialization

Execution

Source code analysis

Initialization

void InitServerLast() {
    bioInit();
    //Threaded I/O is initialized in the final stage of server startup
    initThreadedIO();
    set_jemalloc_bg_thread(server.jemalloc_bg_thread);
    server.initial_memory_usage = zmalloc_used_memory();
}
void initThreadedIO(void) {
    io_threads_active = 0; /* We start with threads not active. */

    serverLog(LL_WARNING,"IO threads number is %d.", server.io_threads_num);
    /* Don't spawn any thread if the user selected a single thread:
     * we'll handle I/O directly from the main thread. */
    if (server.io_threads_num == 1) return;

    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }

    //Spawn the I/O threads; i is the thread id
    for (int i = 0; i < server.io_threads_num; i++) {
        /* Things we do for all the threads including the main thread. */
        //Every thread, including the main one, gets its own client list
        io_threads_list[i] = listCreate();
        if (i == 0) continue; /* Thread 0 is the main thread. */

        /* Things we do only for the additional threads. */
        pthread_t tid;    //thread handle
        pthread_mutex_init(&io_threads_mutex[i],NULL);    //init the per-thread mutex
        io_threads_pending[i] = 0;    //number of clients waiting on this thread
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */

        //Create the thread, passing the thread id as its argument
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        //Remember the thread handle under its id
        io_threads[i] = tid;
    }
}
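For reference, the globals used above are declared near the top of networking.c. The following is a condensed copy, paraphrased from the 6.0 source; the exact declarations may differ slightly between RC versions:

```c
#define IO_THREADS_MAX_NUM 128

pthread_t io_threads[IO_THREADS_MAX_NUM];              /* thread handles, slot 0 unused (main thread) */
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];  /* used to park/unpark each I/O thread */
_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM]; /* clients assigned to each thread */
int io_threads_active;    /* are the I/O threads currently unparked? */
int io_threads_op;        /* IO_THREADS_OP_READ or IO_THREADS_OP_WRITE */
list *io_threads_list[IO_THREADS_MAX_NUM];             /* per-thread client lists */
```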
void *IOThreadMain(void *myid) {
    //The thread id passed in at creation time
    long id = (unsigned long)myid;

    while(1) {
        //Spin for a while waiting for clients to be assigned
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }

        /* Give the main thread a chance to stop this thread. */
        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        //At this point there must be clients assigned to us
        serverAssert(io_threads_pending[id] != 0);

        if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));

        /* Process: note that the main thread will never touch our list
         * before we drop the pending count to 0. */
        listIter li;
        listNode *ln;
        //Point the iterator at the head of this thread's client list
        listRewind(io_threads_list[id],&li);
        //Handle every assigned client
        while((ln = listNext(&li))) {
            //Fetch the client stored in this node
            client *c = listNodeValue(ln);

            if (io_threads_op == IO_THREADS_OP_WRITE) {//global operation is WRITE
                //Write reply data to the client
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {//global operation is READ
                //Read request data from the client
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        //Clear the client list and reset the pending counter to 0
        listEmpty(io_threads_list[id]);
        io_threads_pending[id] = 0;

        if (tio_debug) printf("[%ld] Done\n", id);
    }
}
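The pthread_mutex_lock/unlock pair above is how the main thread parks an idle I/O thread: while the threads are "inactive" the main thread holds every per-thread mutex, so a worker that sees no pending clients blocks on the lock instead of spinning forever. The start/stop helpers that manipulate these mutexes look roughly like this (paraphrased from the rc2 networking.c; check the source for the exact code):

```c
void startThreadedIO(void) {
    serverAssert(io_threads_active == 0);
    /* Release every per-thread mutex so the workers can run. */
    for (int j = 1; j < server.io_threads_num; j++)
        pthread_mutex_unlock(&io_threads_mutex[j]);
    io_threads_active = 1;
}

void stopThreadedIO(void) {
    /* Drain any reads that were already handed to the threads. */
    handleClientsWithPendingReadsUsingThreads();
    serverAssert(io_threads_active == 1);
    /* Re-acquire every mutex: the next time a worker sees pending == 0
     * it blocks on its mutex until startThreadedIO() is called again. */
    for (int j = 1; j < server.io_threads_num; j++)
        pthread_mutex_lock(&io_threads_mutex[j]);
    io_threads_active = 0;
}

/* Fall back to the plain single-threaded path when fewer than
 * io-threads * 2 clients are waiting to be served. */
int stopThreadedIOIfNeeded(void) {
    int pending = listLength(server.clients_pending_write);
    if (server.io_threads_num == 1) return 1;
    if (pending < (server.io_threads_num * 2)) {
        if (io_threads_active) stopThreadedIO();
        return 1;
    }
    return 0;
}
```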

Execution

Write path

int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */

    /* If we have just a few clients to serve, don't use I/O threads, but the
     * boring synchronous code. */
    //Fall back to the synchronous path when pending clients < 2 * io_threads_num
    if (stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }

    /* Start threads if needed. */
    //Unpark the I/O threads if they are not active yet
    if (!io_threads_active) startThreadedIO();

    if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);

    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        //Clear the CLIENT_PENDING_WRITE flag: this client is being handled now
        c->flags &= ~CLIENT_PENDING_WRITE;
        //Pick a target thread round-robin
        int target_id = item_id % server.io_threads_num;
        //Append the client to that thread's pending list
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    //Set the global operation to WRITE
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        //Publish the per-thread pending counts; this is what wakes the workers
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    /* Also use the main thread to process a slice of clients. */
    //The main thread handles its own slice (list 0) as well
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);

    //Busy-wait until every I/O thread has finished its slice
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O WRITE All threads finshed\n");

    /* Run the list of clients again to install the write handler where
     * needed. */
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        /* Install the write handler if there are pending writes in some
         * of the clients. */
        if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);
    //Return the number of clients processed
    return processed;
}

Read path

int handleClientsWithPendingReadsUsingThreads(void) {
    //Do nothing unless the I/O threads are active and io-threads-do-reads is enabled
    if (!io_threads_active || !server.io_threads_do_reads) return 0;
    //Number of clients with pending reads
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;

    if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);

    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        //Pick a target thread round-robin
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    //Set the global operation to READ
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    /* Also use the main thread to process a slice of clients. */
    //The main thread handles its own slice (list 0) as well
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);

    //Busy-wait until every I/O thread has finished its slice
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O READ All threads finshed\n");

    /* Run the list of clients again to process the new buffers. */
    listRewind(server.clients_pending_read,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        if (c->flags & CLIENT_PENDING_COMMAND) {
            c->flags &= ~ CLIENT_PENDING_COMMAND;
            processCommandAndResetClient(c);
        }
        processInputBufferAndReplicate(c);
    }
    listEmpty(server.clients_pending_read);
    //Return the number of clients processed
    return processed;
}
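How do clients end up in server.clients_pending_read in the first place? When threaded reads are enabled, readQueryFromClient() defers the client instead of reading from the socket immediately. The helper looks roughly like this, paraphrased from networking.c (the exact flag checks may differ between versions):

```c
/* Called at the top of readQueryFromClient(). Returns 1 if the read was
 * deferred to the I/O threads, 0 if it should be done synchronously. */
int postponeClientRead(client *c) {
    if (io_threads_active && server.io_threads_do_reads &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
    {
        c->flags |= CLIENT_PENDING_READ;
        listAddNodeTail(server.clients_pending_read,c);
        return 1;
    } else {
        return 0;
    }
}
```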

Configuration

To enable threaded I/O, set the following two options in redis.conf and choose the number of threads:

# Number of I/O threads
io-threads 4

# Let the I/O threads handle reads (and request parsing) as well
io-threads-do-reads yes
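For what it's worth, the comments shipped with redis.conf suggest enabling threaded I/O only on machines with 4 or more cores and leaving at least one core spare (e.g. io-threads 2 or 3 on a 4-core box). The number of I/O threads cannot be changed at runtime with CONFIG SET, and io-threads-do-reads defaults to no because threading reads usually helps less than threading writes.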

Debugging

The config above uses 4 I/O threads, so 8 concurrent clients are used for debugging: that just reaches the 2 * io-threads threshold below which stopThreadedIOIfNeeded() would fall back to the single-threaded path. ./redis-benchmark --threads 4 -t get -n 1000 -c 8

Performance test

Environment

Environment: a virtual machine on an AMD Ryzen 3 2200G (4 cores / 4 threads), 4 GB RAM, Ubuntu 18.04.

The benchmark only covers simple GET/SET; production workloads are more complex, but the numbers are still a useful reference.

Single-threaded benchmark


root@lb:/home/soft/redis-6.0/src# ./redis-benchmark -t get,set -n 1000000 -r 1000000000 -c 256 -d 128

====== SET ======
1000000 requests completed in 18.75 seconds
256 parallel clients
128 bytes payload
keep alive: 1
host configuration "save": 900 1 300 10 60 10000
host configuration "appendonly": no
multi-thread: no

0.00% <= 0.1 milliseconds
0.00% <= 0.2 milliseconds
0.00% <= 0.3 milliseconds
0.01% <= 0.4 milliseconds
0.03% <= 0.5 milliseconds
0.09% <= 0.6 milliseconds
0.21% <= 0.7 milliseconds
0.41% <= 0.8 milliseconds
0.66% <= 0.9 milliseconds
0.96% <= 1.0 milliseconds
1.22% <= 1.1 milliseconds
1.46% <= 1.2 milliseconds
1.91% <= 1.3 milliseconds
2.51% <= 1.4 milliseconds
3.18% <= 1.5 milliseconds
3.97% <= 1.6 milliseconds
5.01% <= 1.7 milliseconds
6.40% <= 1.8 milliseconds
8.11% <= 1.9 milliseconds
10.15% <= 2 milliseconds
45.83% <= 3 milliseconds
70.66% <= 4 milliseconds
80.44% <= 5 milliseconds
85.91% <= 6 milliseconds
90.17% <= 7 milliseconds
93.79% <= 8 milliseconds
95.60% <= 9 milliseconds
96.87% <= 10 milliseconds
97.81% <= 11 milliseconds
98.44% <= 12 milliseconds
98.89% <= 13 milliseconds
99.23% <= 14 milliseconds
99.38% <= 15 milliseconds
99.47% <= 16 milliseconds
99.54% <= 17 milliseconds
99.60% <= 18 milliseconds
99.64% <= 19 milliseconds
99.69% <= 20 milliseconds
99.77% <= 21 milliseconds
99.83% <= 22 milliseconds
99.84% <= 23 milliseconds
99.86% <= 24 milliseconds
99.88% <= 25 milliseconds
99.92% <= 26 milliseconds
99.96% <= 27 milliseconds
99.96% <= 28 milliseconds
99.97% <= 29 milliseconds
99.97% <= 31 milliseconds
99.97% <= 33 milliseconds
99.97% <= 34 milliseconds
99.97% <= 36 milliseconds
99.98% <= 37 milliseconds
99.98% <= 42 milliseconds
99.99% <= 43 milliseconds
99.99% <= 44 milliseconds
100.00% <= 44 milliseconds

53330.49 requests per second

====== GET ======
1000000 requests completed in 17.63 seconds
256 parallel clients
128 bytes payload
keep alive: 1
host configuration "save": 900 1 300 10 60 10000
host configuration "appendonly": no
multi-thread: no

0.09% <= 1 milliseconds
11.84% <= 2 milliseconds
62.83% <= 3 milliseconds
80.03% <= 4 milliseconds
86.48% <= 5 milliseconds
89.92% <= 6 milliseconds
93.20% <= 7 milliseconds
95.65% <= 8 milliseconds
96.89% <= 9 milliseconds
97.61% <= 10 milliseconds
98.16% <= 11 milliseconds
98.56% <= 12 milliseconds
98.82% <= 13 milliseconds
98.98% <= 14 milliseconds
99.15% <= 15 milliseconds
99.27% <= 16 milliseconds
99.39% <= 17 milliseconds
99.54% <= 18 milliseconds
99.59% <= 19 milliseconds
99.68% <= 20 milliseconds
99.74% <= 21 milliseconds
99.77% <= 22 milliseconds
99.79% <= 23 milliseconds
99.79% <= 24 milliseconds
99.80% <= 25 milliseconds
99.81% <= 26 milliseconds
99.83% <= 27 milliseconds
99.83% <= 28 milliseconds
99.84% <= 29 milliseconds
99.84% <= 30 milliseconds
99.87% <= 31 milliseconds
99.88% <= 32 milliseconds
99.88% <= 33 milliseconds
99.89% <= 34 milliseconds
99.92% <= 35 milliseconds
99.94% <= 36 milliseconds
99.95% <= 37 milliseconds
99.95% <= 38 milliseconds
99.95% <= 39 milliseconds
99.96% <= 40 milliseconds
99.96% <= 45 milliseconds
99.97% <= 47 milliseconds
99.99% <= 48 milliseconds
99.99% <= 83 milliseconds
100.00% <= 84 milliseconds
100.00% <= 84 milliseconds

56708.63 requests per second

Multi-threaded benchmark


root@lb:/home/soft/redis-6.0/src# ./redis-benchmark -t get,set -n 1000000 -r 1000000000 -c 256 -d 128 --threads 4

====== SET ======
1000000 requests completed in 10.55 seconds
256 parallel clients
128 bytes payload
keep alive: 1
host configuration "save": 900 1 300 10 60 10000
host configuration "appendonly": no
multi-thread: yes
threads: 4

0.00% <= 0.4 milliseconds
0.00% <= 0.5 milliseconds
0.00% <= 0.6 milliseconds
0.01% <= 0.7 milliseconds
0.01% <= 0.8 milliseconds
0.03% <= 0.9 milliseconds
0.06% <= 1.0 milliseconds
0.11% <= 1.1 milliseconds
0.21% <= 1.2 milliseconds
0.39% <= 1.3 milliseconds
0.83% <= 1.4 milliseconds
1.74% <= 1.5 milliseconds
3.55% <= 1.6 milliseconds
7.38% <= 1.7 milliseconds
14.38% <= 1.8 milliseconds
23.57% <= 1.9 milliseconds
32.88% <= 2 milliseconds
86.45% <= 3 milliseconds
92.56% <= 4 milliseconds
95.12% <= 5 milliseconds
96.48% <= 6 milliseconds
97.35% <= 7 milliseconds
98.15% <= 8 milliseconds
98.72% <= 9 milliseconds
98.97% <= 10 milliseconds
99.17% <= 11 milliseconds
99.35% <= 12 milliseconds
99.50% <= 13 milliseconds
99.58% <= 14 milliseconds
99.63% <= 15 milliseconds
99.65% <= 16 milliseconds
99.70% <= 17 milliseconds
99.82% <= 18 milliseconds
99.84% <= 19 milliseconds
99.87% <= 20 milliseconds
99.88% <= 21 milliseconds
99.89% <= 22 milliseconds
99.90% <= 23 milliseconds
99.91% <= 24 milliseconds
99.91% <= 25 milliseconds
99.92% <= 26 milliseconds
99.92% <= 27 milliseconds
99.92% <= 28 milliseconds
99.93% <= 29 milliseconds
99.94% <= 30 milliseconds
99.95% <= 31 milliseconds
99.95% <= 32 milliseconds
99.96% <= 33 milliseconds
99.97% <= 34 milliseconds
100.00% <= 35 milliseconds
100.00% <= 35 milliseconds

94768.77 requests per second

====== GET ======
1000000 requests completed in 9.26 seconds
256 parallel clients
128 bytes payload
keep alive: 1
host configuration "save": 900 1 300 10 60 10000
host configuration "appendonly": no
multi-thread: yes
threads: 4

0.15% <= 1 milliseconds
64.43% <= 2 milliseconds
91.21% <= 3 milliseconds
94.42% <= 4 milliseconds
96.18% <= 5 milliseconds
97.39% <= 6 milliseconds
97.92% <= 7 milliseconds
98.53% <= 8 milliseconds
98.77% <= 9 milliseconds
98.98% <= 10 milliseconds
99.22% <= 11 milliseconds
99.42% <= 12 milliseconds
99.57% <= 13 milliseconds
99.69% <= 14 milliseconds
99.76% <= 15 milliseconds
99.84% <= 16 milliseconds
99.86% <= 17 milliseconds
99.88% <= 18 milliseconds
99.88% <= 19 milliseconds
99.89% <= 20 milliseconds
99.89% <= 21 milliseconds
99.90% <= 22 milliseconds
99.90% <= 23 milliseconds
99.90% <= 24 milliseconds
99.90% <= 25 milliseconds
99.90% <= 26 milliseconds
99.91% <= 29 milliseconds
99.92% <= 30 milliseconds
99.95% <= 31 milliseconds
99.96% <= 32 milliseconds
99.96% <= 34 milliseconds
99.96% <= 35 milliseconds
99.97% <= 37 milliseconds
99.97% <= 43 milliseconds
99.97% <= 44 milliseconds
99.97% <= 45 milliseconds
99.97% <= 46 milliseconds
99.97% <= 47 milliseconds
99.97% <= 50 milliseconds
99.97% <= 51 milliseconds
99.99% <= 67 milliseconds
99.99% <= 68 milliseconds
99.99% <= 78 milliseconds
99.99% <= 79 milliseconds
99.99% <= 94 milliseconds
100.00% <= 95 milliseconds
100.00% <= 95 milliseconds

107979.70 requests per second
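Putting the two runs side by side:

  • SET: 53,330 → 94,769 requests per second (roughly +78%)
  • GET: 56,709 → 107,980 requests per second (roughly +90%)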

Charts

SET and GET throughput comparison charts (images from the original post, not reproduced here).
