Redis 多線程網絡模型全面揭秘

      網友投稿 761 2025-03-31

      導語?一文帶你完全吃透 Redis 整個核心網絡模型的原理和源碼。

      目錄

      導言

      Redis 有多快?

      Redis 為什么快?

      Redis 為何選擇單線程?

      避免過多的上下文切換開銷

      避免同步機制的開銷

      簡單可維護

      Redis 真的是單線程?

      單線程事件循環

      多線程異步任務

      Redis 多線程網絡模型

      設計思路

      源碼剖析

      性能提升

      模型缺陷

      總結

      博客原文

      導言

      在目前的技術選型中,Redis 儼然已經成為了系統高性能緩存方案的事實標準,因此現在 Redis 也成為了后端開發的基本技能樹之一,Redis 的底層原理也順理成章地成為了必須學習的知識。

      Redis 從本質上來講是一個網絡服務器,而對于一個網絡服務器來說,網絡模型是它的精華,搞懂了一個網絡服務器的網絡模型,你也就搞懂了它的本質。

      本文通過層層遞進的方式,介紹了 Redis 網絡模型的版本變更歷程,剖析了其從單線程進化到多線程的工作原理,此外,還一并分析并解答了 Redis 的網絡模型的很多抉擇背后的思考,幫助讀者能更深刻地理解 Redis 網絡模型的設計。

      Redis 有多快?

      根據官方的 benchmark,通常來說,在一臺普通硬件配置的 Linux 機器上跑單個 Redis 實例,處理簡單命令(時間復雜度 O(N) 或者 O(log(N))),QPS 可以達到 8w+,而如果使用 pipeline 批處理功能,則 QPS 至高能達到 100w。

      僅從性能層面進行評判,Redis 完全可以被稱之為高性能緩存方案。

      Redis 為什么快?

      Redis 的高性能得益于以下幾個基礎:

      C 語言實現,雖然 C 對 Redis 的性能有助力,但語言并不是最核心因素。

      純內存 I/O,相較于其他基于磁盤的 DB,Redis 的純內存操作有著天然的性能優勢。

      I/O 多路復用,基于 epoll/select/kqueue 等 I/O 多路復用技術,實現高吞吐的網絡 I/O。

      單線程模型,單線程無法利用多核,但是從另一個層面來說則避免了多線程頻繁上下文切換,以及同步機制如鎖帶來的開銷。

      Redis 為何選擇單線程?

      Redis 的核心網絡模型選擇用單線程來實現,這在一開始就引起了很多人的不解,Redis 官方的對于此的回答是:

      It’s not very frequent that CPU becomes your bottleneck with Redis, as usually Redis is either memory or network bound. For instance, using pipelining Redis running on an average Linux system can deliver even 1 million requests per second, so if your application mainly uses O(N) or O(log(N)) commands, it is hardly going to use too much CPU.

      核心意思就是,對于一個 DB 來說,CPU 通常不會是瓶頸,因為大多數請求不會是 CPU 密集型的,而是 I/O 密集型。具體到 Redis 的話,如果不考慮 RDB/AOF 等持久化方案,Redis 是完全的純內存操作,執行速度是非常快的,因此這部分操作通常不會是性能瓶頸,Redis 真正的性能瓶頸在于網絡 I/O,也就是客戶端和服務端之間的網絡傳輸延遲,因此 Redis 選擇了單線程的 I/O 多路復用來實現它的核心網絡模型。

      上面是比較籠統的官方答案,實際上更加具體的選擇單線程的原因可以歸納如下:

      避免過多的上下文切換開銷

      多線程調度過程中必然需要在 CPU 之間切換線程上下文 context,而上下文的切換又涉及程序計數器、堆棧指針和程序狀態字等一系列的寄存器置換、程序堆棧重置甚至是 CPU 高速緩存、TLB 快表的汰換,如果是進程內的多線程切換還好一些,因為單一進程內多線程共享進程地址空間,因此線程上下文比之進程上下文要小得多,如果是跨進程調度,則需要切換掉整個進程地址空間。

      如果是單線程則可以規避進程內頻繁的線程切換開銷,因為程序始終運行在進程中單個線程內,沒有多線程切換的場景。

      避免同步機制的開銷

      如果 Redis 選擇多線程模型,又因為 Redis 是一個數據庫,那么勢必涉及到底層數據同步的問題,則必然會引入某些同步機制,比如鎖,而我們知道 Redis 不僅僅提供了簡單的 key-value 數據結構,還有 list、set 和 hash 等等其他豐富的數據結構,而不同的數據結構對同步訪問的加鎖粒度又不盡相同,可能會導致在操作數據過程中帶來很多加鎖解鎖的開銷,增加程序復雜度的同時還會降低性能。

      簡單可維護

      事實上,多線程編程也不是那么盡善盡美,首先多線程的引入會使得程序不再保持代碼邏輯上的串行性,代碼執行的順序將變成不可預測的,稍不注意就會導致程序出現各種并發編程的問題;其次,多線程模式也使得程序調試更加復雜和麻煩。網絡上有一幅很有意思的圖片,生動形象地描述了并發編程面臨的窘境。

      你期望的多線程編程?VS?實際上的多線程編程:

      前面我們提到引入多線程必須的同步機制,如果 Redis 使用多線程模式,那么所有的底層數據結構都必須實現成線程安全的,這無疑又使得 Redis 的實現變得更加復雜。

      總而言之,Redis 選擇單線程可以說是多方博弈之后的一種權衡:在保證足夠的性能表現之下,使用單線程保持代碼的簡單和可維護性。

      Redis 真的是單線程?

      在討論這個問題之前,我們要先明確『單線程』這個概念的邊界:它的覆蓋范圍是核心網絡模型,抑或是整個 Redis?如果是前者,那么答案是肯定的,在 Redis 的 v6.0 版本正式引入多線程之前,其網絡模型一直是單線程模式的;如果是后者,那么答案則是否定的,Redis 早在 v4.0 就已經引入了多線程。

      因此,當我們討論 Redis 的多線程之時,有必要對 Redis 的版本劃出兩個重要的節點:

      Redis v4.0(引入多線程處理異步任務)

      Redis v6.0(正式在網絡模型中實現 I/O 多線程)

      單線程事件循環

      我們首先來剖析一下 Redis 的核心網絡模型,從 Redis 的 v1.0 到 v6.0 版本之前,Redis 的核心網絡模型一直是一個典型的單 Reactor 模型:利用 epoll/select/kqueue 等多路復用技術,在單線程的事件循環中不斷去處理事件(客戶端請求),最后回寫響應數據到客戶端:

      這里有幾個核心的概念需要學習:

      client:客戶端對象,Redis 是典型的 CS 架構(Client <—> Server),客戶端通過?socket?與服務端建立網絡通道然后發送請求命令,服務端執行請求的命令并回復。Redis?使用結構體?client?存儲客戶端的所有相關信息,包括但不限于封裝的套接字連接 -- *conn,當前選擇的數據庫指針 -- *db,讀入緩沖區 -- querybuf,寫出緩沖區 -- buf,寫出數據鏈表 -- reply等。

      aeApiPoll:I/O 多路復用 API,是基于 epoll_wait/select/kevent 等系統調用的封裝,監聽等待讀寫事件觸發,然后處理,它是事件循環(Event Loop)中的核心函數,是事件驅動得以運行的基礎。

      acceptTcpHandler:連接應答處理器,底層使用系統調用?accept?接受來自客戶端的新連接,并為新連接注冊綁定命令讀取處理器,以備后續處理新的客戶端 TCP 連接;除了這個處理器,還有對應的?acceptUnixHandler?負責處理 Unix Domain Socket 以及?acceptTLSHandler?負責處理 TLS 加密連接。

      readQueryFromClient:命令讀取處理器,解析并執行客戶端的請求命令。

      beforeSleep:事件循環中進入 aeApiPoll 等待事件到來之前會執行的函數,其中包含一些日常的任務,比如把?client->buf?或者?client->reply?(后面會解釋為什么這里需要兩個緩沖區)中的響應寫回到客戶端,持久化 AOF 緩沖區的數據到磁盤等,相對應的還有一個 afterSleep 函數,在 aeApiPoll 之后執行。

      sendReplyToClient:命令回復處理器,當一次事件循環之后寫出緩沖區中還有數據殘留,則這個處理器會被注冊綁定到相應的連接上,等連接觸發寫就緒事件時,它會將寫出緩沖區剩余的數據回寫到客戶端。

      Redis 內部實現了一個高性能的事件庫 — AE,基于 epoll/select/kqueue/evport 四種事件驅動技術,實現 Linux/MacOS/FreeBSD/Solaris 多平臺的高性能事件循環模型。Redis 的核心網絡模型正式構筑在 AE 之上,包括 I/O 多路復用、各類處理器的注冊綁定,都是基于此才得以運行。

      至此,我們可以描繪出客戶端向 Redis 發起請求命令的工作原理

      Redis 服務器啟動,開啟主線程事件循環(Event Loop),注冊?acceptTcpHandler?連接應答處理器到用戶配置的監聽端口對應的文件描述符,等待新連接到來;

      客戶端和服務端建立網絡連接;

      acceptTcpHandler?被調用,主線程使用 AE 的 API 將?readQueryFromClient?命令讀取處理器綁定到新連接對應的文件描述符上,并初始化一個?client?綁定這個客戶端連接;

      客戶端發送請求命令,觸發讀就緒事件,主線程調用?readQueryFromClient?通過 socket 讀取客戶端發送過來的命令存入?client->querybuf?讀入緩沖區;

      接著調用?processInputBuffer,在其中使用?processInlineBuffer?或者?processMultibulkBuffer?根據 Redis 協議解析命令,最后調用?processCommand?執行命令;

      根據請求命令的類型(SET, GET, DEL, EXEC 等),分配相應的命令執行器去執行,最后調用?addReply?函數族的一系列函數將響應數據寫入到對應?client?的寫出緩沖區:client->buf?或者?client->reply?,client->buf?是首選的寫出緩沖區,固定大小 16KB,一般來說可以緩沖足夠多的響應數據,但是如果客戶端在時間窗口內需要響應的數據非常大,那么則會自動切換到?client->reply?鏈表上去,使用鏈表理論上能夠保存無限大的數據(受限于機器的物理內存),最后把?client?添加進一個 LIFO 隊列?clients_pending_write;

      在事件循環(Event Loop)中,主線程執行?beforeSleep?-->?handleClientsWithPendingWrites,遍歷?clients_pending_write?隊列,調用?writeToClient?把?client?的寫出緩沖區里的數據回寫到客戶端,如果寫出緩沖區還有數據遺留,則注冊?sendReplyToClient?命令回復處理器到該連接的寫就緒事件,等待客戶端可寫時在事件循環中再繼續回寫殘余的響應數據。

      對于那些想利用多核優勢提升性能的用戶來說,Redis 官方給出的解決方案也非常簡單粗暴:在同一個機器上多跑幾個 Redis 實例。事實上,為了保證高可用,線上業務一般不太可能會是單機模式,更加常見的是利用 Redis 分布式集群多節點和數據分片負載均衡來提升性能和保證高可用。

      多線程異步任務

      以上便是 Redis 的核心網絡模型,這個單線程網絡模型一直到 Redis v6.0 才改造成多線程模式,但這并不意味著整個 Redis 一直都只是單線程。

      Redis 在 v4.0 版本的時候就已經引入了的多線程來做一些異步操作,此舉主要針對的是那些非常耗時的命令,通過將這些命令的執行進行異步化,避免阻塞單線程的事件循環。

      我們知道 Redis 的?DEL?命令是用來刪除掉一個或多個 key 儲存的值,它是一個阻塞的命令,大多數情況下你要刪除的 key 里存的值不會特別多,最多也就幾十上百個對象,所以可以很快執行完,但是如果你要刪的是一個超大的鍵值對,里面有幾百萬個對象,那么這條命令可能會阻塞至少好幾秒,又因為事件循環是單線程的,所以會阻塞后面的其他事件,導致吞吐量下降。

      于是,在 Redis v4.0 之后增加了一些的非阻塞命令如?UNLINK、FLUSHALL ASYNC、FLUSHDB ASYNC。

      UNLINK?命令其實就是?DEL?的異步版本,它不會同步刪除數據,而只是把 key 從 keyspace 中暫時移除掉,然后將任務添加到一個異步隊列,最后由后臺線程去刪除,不過這里需要考慮一種情況是如果用?UNLINK?去刪除一個很小的 key,用異步的方式去做反而開銷更大,所以它會先計算一個開銷的閥值,只有當這個值大于 64 才會使用異步的方式去刪除 key,對于基本的數據類型如 List、Set、Hash 這些,閥值就是其中存儲的對象數量。

      Redis 多線程網絡模型

      前面提到 Redis 最初選擇單線程網絡模型的理由是:CPU 通常不會成為性能瓶頸,瓶頸往往是內存和網絡,因此單線程足夠了。那么為什么現在 Redis 又要引入多線程呢?很簡單,就是 Redis 的網絡 I/O 瓶頸已經越來越明顯了。

      隨著互聯網的飛速發展,互聯網業務系統所要處理的線上流量越來越大,Redis 的單線程模式會導致系統消耗很多 CPU 時間在網絡 I/O 上從而降低吞吐量,要提升 Redis 的性能有兩個方向:

      優化網絡 I/O 模塊

      提高機器內存讀寫的速度

      后者依賴于硬件的發展,暫時無解。所以只能從前者下手,網絡 I/O 的優化又可以分為兩個方向:

      零拷貝技術或者 DPDK 技術

      利用多核優勢

      零拷貝技術有其局限性,無法完全適配 Redis 這一類復雜的網絡 I/O 場景,更多網絡 I/O 對 CPU 時間的消耗和 Linux 零拷貝技術,可以閱讀我的另一篇文章:Linux I/O 原理和 Zero-copy 技術全面揭秘。而 DPDK 技術通過旁路網卡 I/O 繞過內核協議棧的方式又太過于復雜以及需要內核甚至是硬件的支持。

      因此,利用多核優勢成為了優化網絡 I/O 性價比最高的方案。

      6.0 版本之后,Redis 正式在核心網絡模型中引入了多線程,也就是所謂的?I/O threading,至此 Redis 真正擁有了多線程模型。前一小節,我們了解了 Redis 在 6.0 版本之前的單線程事件循環模型,實際上就是一個非常經典的 Reactor 模型:

      目前 Linux 平臺上主流的高性能網絡庫/框架中,大都采用 Reactor 模式,比如 netty、libevent、libuv、POE(Perl)、Twisted(Python)等。

      Reactor 模式本質上指的是使用?I/O 多路復用(I/O multiplexing) + 非阻塞 I/O(non-blocking I/O)?的模式。

      更多關于 Reactor 模式的細節可以參考我之前的文章:Go netpoller 原生網絡模型之源碼全面揭秘,Reactor 網絡模型那一小節,這里不再贅述。

      Redis 的核心網絡模型在 6.0 版本之前,一直是單 Reactor 模式:所有事件的處理都在單個線程內完成,雖然在 4.0 版本中引入了多線程,但是那個更像是針對特定場景(刪除超大 key 值等)而打的補丁,并不能被視作核心網絡模型的多線程。

      通常來說,單 Reactor 模式,引入多線程之后會進化為 Multi-Reactors 模式,基本工作模式如下:

      區別于單 Reactor 模式,這種模式不再是單線程的事件循環,而是有多個線程(Sub Reactors)各自維護一個獨立的事件循環,由 Main Reactor 負責接收新連接并分發給 Sub Reactors 去獨立處理,最后 Sub Reactors 回寫響應給客戶端。

      Multiple Reactors 模式通常也可以等同于 Master-Workers 模式,比如 Nginx 和 Memcached 等就是采用這種多線程模型,雖然不同的項目實現細節略有區別,但總體來說模式是一致的。

      設計思路

      Redis 雖然也實現了多線程,但是卻不是標準的 Multi-Reactors/Master-Workers 模式,這其中的緣由我們后面會分析,現在我們先看一下 Redis 多線程網絡模型的總體設計:

      Redis 服務器啟動,開啟主線程事件循環(Event Loop),注冊?acceptTcpHandler?連接應答處理器到用戶配置的監聽端口對應的文件描述符,等待新連接到來;

      客戶端和服務端建立網絡連接;

      Redis 多線程網絡模型全面揭秘

      acceptTcpHandler?被調用,主線程使用 AE 的 API 將?readQueryFromClient?命令讀取處理器綁定到新連接對應的文件描述符上,并初始化一個?client?綁定這個客戶端連接;

      客戶端發送請求命令,觸發讀就緒事件,服務端主線程不會通過 socket 去讀取客戶端的請求命令,而是先將?client?放入一個 LIFO 隊列?clients_pending_read;

      在事件循環(Event Loop)中,主線程執行?beforeSleep?-->handleClientsWithPendingReadsUsingThreads,利用 Round-Robin 輪詢負載均衡策略,把?clients_pending_read隊列中的連接均勻地分配給 I/O 線程各自的本地 FIFO 任務隊列?io_threads_list[id]?和主線程自己,I/O 線程通過 socket 讀取客戶端的請求命令,存入?client->querybuf?并解析第一個命令,但不執行命令,主線程忙輪詢,等待所有 I/O 線程完成讀取任務;

      主線程和所有 I/O 線程都完成了讀取任務,主線程結束忙輪詢,遍歷?clients_pending_read?隊列,執行所有客戶端連接的請求命令,先調用?processCommandAndResetClient?執行第一條已經解析好的命令,然后調用?processInputBuffer?解析并執行客戶端連接的所有命令,在其中使用?processInlineBuffer?或者?processMultibulkBuffer?根據 Redis 協議解析命令,最后調用?processCommand?執行命令;

      根據請求命令的類型(SET, GET, DEL, EXEC 等),分配相應的命令執行器去執行,最后調用?addReply?函數族的一系列函數將響應數據寫入到對應?client?的寫出緩沖區:client->buf?或者?client->reply?,client->buf?是首選的寫出緩沖區,固定大小 16KB,一般來說可以緩沖足夠多的響應數據,但是如果客戶端在時間窗口內需要響應的數據非常大,那么則會自動切換到?client->reply?鏈表上去,使用鏈表理論上能夠保存無限大的數據(受限于機器的物理內存),最后把?client?添加進一個 LIFO 隊列?clients_pending_write;

      在事件循環(Event Loop)中,主線程執行?beforeSleep?-->?handleClientsWithPendingWritesUsingThreads,利用 Round-Robin 輪詢負載均衡策略,把?clients_pending_write?隊列中的連接均勻地分配給 I/O 線程各自的本地 FIFO 任務隊列?io_threads_list[id]?和主線程自己,I/O 線程通過調用?writeToClient?把?client?的寫出緩沖區里的數據回寫到客戶端,主線程忙輪詢,等待所有 I/O 線程完成寫出任務;

      主線程和所有 I/O 線程都完成了寫出任務, 主線程結束忙輪詢,遍歷?clients_pending_write?隊列,如果?client?的寫出緩沖區還有數據遺留,則注冊?sendReplyToClient?到該連接的寫就緒事件,等待客戶端可寫時在事件循環中再繼續回寫殘余的響應數據。

      這里大部分邏輯和之前的單線程模型是一致的,變動的地方僅僅是把讀取客戶端請求命令和回寫響應數據的邏輯異步化了,交給 I/O 線程去完成,這里需要特別注意的一點是:I/O 線程僅僅是讀取和解析客戶端命令而不會真正去執行命令,客戶端命令的執行最終還是要在主線程上完成。

      源碼剖析

      以下所有代碼基于目前最新的?Redis v6.0.10?版本。

      多線程初始化

      1void initThreadedIO(void) {

      2 server.io_threads_active = 0; /* We start with threads not active. */

      3

      4 // 如果用戶只配置了一個 I/O 線程,則不會創建新線程(效率低),直接在主線程里處理 I/O。

      5 if (server.io_threads_num == 1) return;

      6

      7 if (server.io_threads_num > IO_THREADS_MAX_NUM) {

      8 serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "

      9 "The maximum number is %d.", IO_THREADS_MAX_NUM);

      10 exit(1);

      11 }

      12

      13 // 根據用戶配置的 I/O 線程數,啟動線程。

      14 for (int i = 0; i < server.io_threads_num; i++) {

      15 // 初始化 I/O 線程的本地任務隊列。

      16 io_threads_list[i] = listCreate();

      17 if (i == 0) continue; // 線程 0 是主線程。

      18

      19 // 初始化 I/O 線程并啟動。

      20 pthread_t tid;

      21 // 每個 I/O 線程會分配一個本地鎖,用來休眠和喚醒線程。

      22 pthread_mutex_init(&io_threads_mutex[i],NULL);

      23 // 每個 I/O 線程分配一個原子計數器,用來記錄當前遺留的任務數量。

      24 io_threads_pending[i] = 0;

      25 // 主線程在啟動 I/O 線程的時候會默認先鎖住它,直到有 I/O 任務才喚醒它。

      26 pthread_mutex_lock(&io_threads_mutex[i]);

      27 // 啟動線程,進入 I/O 線程的主邏輯函數 IOThreadMain。

      28 if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {

      29 serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");

      30 exit(1);

      31 }

      32 io_threads[i] = tid;

      33 }

      34}

      initThreadedIO?會在 Redis 服務器啟動時的初始化工作的末尾被調用,初始化 I/O 多線程并啟動。

      Redis 的多線程模式默認是關閉的,需要用戶在?redis.conf?配置文件中開啟:

      1io-threads 4

      2io-threads-do-reads yes

      讀取請求

      當客戶端發送請求命令之后,會觸發 Redis 主線程的事件循環,命令處理器?readQueryFromClient?被回調,在以前的單線程模型下,這個方法會直接讀取解析客戶端命令并執行,但是多線程模式下,則會把?client?加入到?clients_pending_read?任務隊列中去,后面主線程再分配到 I/O 線程去讀取客戶端請求命令:

      1void readQueryFromClient(connection *conn) {

      2 client *c = connGetPrivateData(conn);

      3 int nread, readlen;

      4 size_t qblen;

      5

      6 // 檢查是否開啟了多線程,如果是則把 client 加入異步隊列之后返回。

      7 if (postponeClientRead(c)) return;

      8

      9 // 省略代碼,下面的代碼邏輯和單線程版本幾乎是一樣的。

      10 ...

      11}

      12

      13int postponeClientRead(client *c) {

      14 // 當多線程 I/O 模式開啟、主線程沒有在處理阻塞任務時,將 client 加入異步隊列。

      15 if (server.io_threads_active &&

      16 server.io_threads_do_reads &&

      17 !ProcessingEventsWhileBlocked &&

      18 !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))

      19 {

      20 // 給 client 打上 CLIENT_PENDING_READ 標識,表示該 client 需要被多線程處理,

      21 // 后續在 I/O 線程中會在讀取和解析完客戶端命令之后判斷該標識并放棄執行命令,讓主線程去執行。

      22 c->flags |= CLIENT_PENDING_READ;

      23 listAddNodeHead(server.clients_pending_read,c);

      24 return 1;

      25 } else {

      26 return 0;

      27 }

      28}

      接著主線程會在事件循環的?beforeSleep()?方法中,調用?handleClientsWithPendingReadsUsingThreads:

      1int handleClientsWithPendingReadsUsingThreads(void) {

      2 if (!server.io_threads_active || !server.io_threads_do_reads) return 0;

      3 int processed = listLength(server.clients_pending_read);

      4 if (processed == 0) return 0;

      5

      6 if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);

      7

      8 // 遍歷待讀取的 client 隊列 clients_pending_read,

      9 // 通過 RR 輪詢均勻地分配給 I/O 線程和主線程自己(編號 0)。

      10 listIter li;

      11 listNode *ln;

      12 listRewind(server.clients_pending_read,&li);

      13 int item_id = 0;

      14 while((ln = listNext(&li))) {

      15 client *c = listNodeValue(ln);

      16 int target_id = item_id % server.io_threads_num;

      17 listAddNodeTail(io_threads_list[target_id],c);

      18 item_id++;

      19 }

      20

      21 // 設置當前 I/O 操作為讀取操作,給每個 I/O 線程的計數器設置分配的任務數量,

      22 // 讓 I/O 線程可以開始工作:只讀取和解析命令,不執行。

      23 io_threads_op = IO_THREADS_OP_READ;

      24 for (int j = 1; j < server.io_threads_num; j++) {

      25 int count = listLength(io_threads_list[j]);

      26 io_threads_pending[j] = count;

      27 }

      28

      29 // 主線程自己也會去執行讀取客戶端請求命令的任務,以達到最大限度利用 CPU。

      30 listRewind(io_threads_list[0],&li);

      31 while((ln = listNext(&li))) {

      32 client *c = listNodeValue(ln);

      33 readQueryFromClient(c->conn);

      34 }

      35 listEmpty(io_threads_list[0]);

      36

      37 // 忙輪詢,累加所有 I/O 線程的原子任務計數器,直到所有計數器的遺留任務數量都是 0,

      38 // 表示所有任務都已經執行完成,結束輪詢。

      39 while(1) {

      40 unsigned long pending = 0;

      41 for (int j = 1; j < server.io_threads_num; j++)

      42 pending += io_threads_pending[j];

      43 if (pending == 0) break;

      44 }

      45 if (tio_debug) printf("I/O READ All threads finshed\n");

      46

      47 // 遍歷待讀取的 client 隊列,清除 CLIENT_PENDING_READ 和 CLIENT_PENDING_COMMAND 標記,

      48 // 然后解析并執行所有 client 的命令。

      49 while(listLength(server.clients_pending_read)) {

      50 ln = listFirst(server.clients_pending_read);

      51 client *c = listNodeValue(ln);

      52 c->flags &= ~CLIENT_PENDING_READ;

      53 listDelNode(server.clients_pending_read,ln);

      54

      55 if (c->flags & CLIENT_PENDING_COMMAND) {

      56 c->flags &= ~CLIENT_PENDING_COMMAND;

      57 // client 的第一條命令已經被解析好了,直接嘗試執行。

      58 if (processCommandAndResetClient(c) == C_ERR) {

      59 /* If the client is no longer valid, we avoid

      60 * processing the client later. So we just go

      61 * to the next. */

      62 continue;

      63 }

      64 }

      65 processInputBuffer(c); // 繼續解析并執行 client 命令。

      66

      67 // 命令執行完成之后,如果 client 中有響應數據需要回寫到客戶端,則將 client 加入到待寫出隊列 clients_pending_write

      68 if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))

      69 clientInstallWriteHandler(c);

      70 }

      71

      72 /* Update processed count on server */

      73 server.stat_io_reads_processed += processed;

      74

      75 return processed;

      76}

      這里的核心工作是:

      遍歷待讀取的?client?隊列?clients_pending_read,通過 RR 策略把所有任務分配給 I/O 線程和主線程去讀取和解析客戶端命令。

      忙輪詢等待所有 I/O 線程完成任務。

      最后再遍歷?clients_pending_read,執行所有?client?的命令。

      寫回響應

      完成命令的讀取、解析以及執行之后,客戶端命令的響應數據已經存入?client->buf?或者?client->reply?中了,接下來就需要把響應數據回寫到客戶端了,還是在?beforeSleep?中, 主線程調用?handleClientsWithPendingWritesUsingThreads:

      1int handleClientsWithPendingWritesUsingThreads(void) {

      2 int processed = listLength(server.clients_pending_write);

      3 if (processed == 0) return 0; /* Return ASAP if there are no clients. */

      4

      5 // 如果用戶設置的 I/O 線程數等于 1 或者當前 clients_pending_write 隊列中待寫出的 client

      6 // 數量不足 I/O 線程數的兩倍,則不用多線程的邏輯,讓所有 I/O 線程進入休眠,

      7 // 直接在主線程把所有 client 的相應數據回寫到客戶端。

      8 if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {

      9 return handleClientsWithPendingWrites();

      10 }

      11

      12 // 喚醒正在休眠的 I/O 線程(如果有的話)。

      13 if (!server.io_threads_active) startThreadedIO();

      14

      15 if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);

      16

      17 // 遍歷待寫出的 client 隊列 clients_pending_write,

      18 // 通過 RR 輪詢均勻地分配給 I/O 線程和主線程自己(編號 0)。

      19 listIter li;

      20 listNode *ln;

      21 listRewind(server.clients_pending_write,&li);

      22 int item_id = 0;

      23 while((ln = listNext(&li))) {

      24 client *c = listNodeValue(ln);

      25 c->flags &= ~CLIENT_PENDING_WRITE;

      26

      27 /* Remove clients from the list of pending writes since

      28 * they are going to be closed ASAP. */

      29 if (c->flags & CLIENT_CLOSE_ASAP) {

      30 listDelNode(server.clients_pending_write, ln);

      31 continue;

      32 }

      33

      34 int target_id = item_id % server.io_threads_num;

      35 listAddNodeTail(io_threads_list[target_id],c);

      36 item_id++;

      37 }

      38

      39 // 設置當前 I/O 操作為寫出操作,給每個 I/O 線程的計數器設置分配的任務數量,

      40 // 讓 I/O 線程可以開始工作,把寫出緩沖區(client->buf 或 c->reply)中的響應數據回寫到客戶端。

      41 io_threads_op = IO_THREADS_OP_WRITE;

      42 for (int j = 1; j < server.io_threads_num; j++) {

      43 int count = listLength(io_threads_list[j]);

      44 io_threads_pending[j] = count;

      45 }

      46

      47 // 主線程自己也會去執行讀取客戶端請求命令的任務,以達到最大限度利用 CPU。

      48 listRewind(io_threads_list[0],&li);

      49 while((ln = listNext(&li))) {

      50 client *c = listNodeValue(ln);

      51 writeToClient(c,0);

      52 }

      53 listEmpty(io_threads_list[0]);

      54

      55 // 忙輪詢,累加所有 I/O 線程的原子任務計數器,直到所有計數器的遺留任務數量都是 0。

      56 // 表示所有任務都已經執行完成,結束輪詢。

      57 while(1) {

      58 unsigned long pending = 0;

      59 for (int j = 1; j < server.io_threads_num; j++)

      60 pending += io_threads_pending[j];

      61 if (pending == 0) break;

      62 }

      63 if (tio_debug) printf("I/O WRITE All threads finshed\n");

      64

      65 // 最后再遍歷一次 clients_pending_write 隊列,檢查是否還有 client 的寫出緩沖區中有殘留數據,

      66 // 如果有,那就為 client 注冊一個命令回復器 sendReplyToClient,等待客戶端寫就緒再繼續把數據回寫。

      67 listRewind(server.clients_pending_write,&li);

      68 while((ln = listNext(&li))) {

      69 client *c = listNodeValue(ln);

      70

      71 // 檢查 client 的寫出緩沖區是否還有遺留數據。

      72 if (clientHasPendingReplies(c) &&

      73 connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)

      74 {

      75 freeClientAsync(c);

      76 }

      77 }

      78 listEmpty(server.clients_pending_write);

      79

      80 /* Update processed count on server */

      81 server.stat_io_writes_processed += processed;

      82

      83 return processed;

      84}

      這里的核心工作是:

      檢查當前任務負載,如果當前的任務數量不足以用多線程模式處理的話,則休眠 I/O 線程并且直接同步將響應數據回寫到客戶端。

      喚醒正在休眠的 I/O 線程(如果有的話)。

      遍歷待寫出的?client?隊列?clients_pending_write,通過 RR 策略把所有任務分配給 I/O 線程和主線程去將響應數據寫回到客戶端。

      忙輪詢等待所有 I/O 線程完成任務。

      最后再遍歷?clients_pending_write,為那些還殘留有響應數據的?client?注冊命令回復處理器?sendReplyToClient,等待客戶端可寫之后在事件循環中繼續回寫殘余的響應數據。

      I/O 線程主邏輯

      1void *IOThreadMain(void *myid) {

      2 /* The ID is the thread number (from 0 to server.iothreads_num-1), and is

      3 * used by the thread to just manipulate a single sub-array of clients. */

      4 long id = (unsigned long)myid;

      5 char thdname[16];

      6

      7 snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);

      8 redis_set_thread_title(thdname);

      9 // 設置 I/O 線程的 CPU 親和性,盡可能將 I/O 線程(以及主線程,不在這里設置)綁定到用戶配置的

      10 // CPU 列表上。

      11 redisSetCpuAffinity(server.server_cpulist);

      12 makeThreadKillable();

      13

      14 while(1) {

      15 // 忙輪詢,100w 次循環,等待主線程分配 I/O 任務。

      16 for (int j = 0; j < 1000000; j++) {

      17 if (io_threads_pending[id] != 0) break;

      18 }

      19

      20 // 如果 100w 次忙輪詢之后如果還是沒有任務分配給它,則通過嘗試加鎖進入休眠,

      21 // 等待主線程分配任務之后調用 startThreadedIO 解鎖,喚醒 I/O 線程去執行。

      22 if (io_threads_pending[id] == 0) {

      23 pthread_mutex_lock(&io_threads_mutex[id]);

      24 pthread_mutex_unlock(&io_threads_mutex[id]);

      25 continue;

      26 }

      27

      28 serverAssert(io_threads_pending[id] != 0);

      29

      30 if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));

      31

      32

      33 // 注意:主線程分配任務給 I/O 線程之時,

      34 // 會把任務加入每個線程的本地任務隊列 io_threads_list[id],

      35 // 但是當 I/O 線程開始執行任務之后,主線程就不會再去訪問這些任務隊列,避免數據競爭。

      36 listIter li;

      37 listNode *ln;

      38 listRewind(io_threads_list[id],&li);

      39 while((ln = listNext(&li))) {

      40 client *c = listNodeValue(ln);

      41 // 如果當前是寫出操作,則把 client 的寫出緩沖區中的數據回寫到客戶端。

      42 if (io_threads_op == IO_THREADS_OP_WRITE) {

      43 writeToClient(c,0);

      44 // 如果當前是讀取操作,則socket 讀取客戶端的請求命令并解析第一條命令。

      45 } else if (io_threads_op == IO_THREADS_OP_READ) {

      46 readQueryFromClient(c->conn);

      47 } else {

      48 serverPanic("io_threads_op value is unknown");

      49 }

      50 }

      51 listEmpty(io_threads_list[id]);

      52 // 所有任務執行完之后把自己的計數器置 0,主線程通過累加所有 I/O 線程的計數器

      53 // 判斷是否所有 I/O 線程都已經完成工作。

      54 io_threads_pending[id] = 0;

      55

      56 if (tio_debug) printf("[%ld] Done\n", id);

      57 }

      58}

      I/O 線程啟動之后,會先進入忙輪詢,判斷原子計數器中的任務數量,如果是非 0 則表示主線程已經給它分配了任務,開始執行任務,否則就一直忙輪詢一百萬次等待,忙輪詢結束之后再查看計數器,如果還是 0,則嘗試加本地鎖,因為主線程在啟動 I/O 線程之時就已經提前鎖住了所有 I/O 線程的本地鎖,因此 I/O 線程會進行休眠,等待主線程喚醒。

      主線程會在每次事件循環中嘗試調用?startThreadedIO?喚醒 I/O 線程去執行任務,如果接收到客戶端請求命令,則 I/O 線程會被喚醒開始工作,根據主線程設置的?io_threads_op?標識去執行命令讀取和解析或者回寫響應數據的任務,I/O 線程在收到主線程通知之后,會遍歷自己的本地任務隊列?io_threads_list[id],取出一個個?client?執行任務:

      如果當前是寫出操作,則調用?writeToClient,通過 socket 把?client->buf?或者?client->reply?里的響應數據回寫到客戶端。

      如果當前是讀取操作,則調用?readQueryFromClient,通過 socket 讀取客戶端命令,存入?client->querybuf,然后調用?processInputBuffer?去解析命令,這里最終只會解析到第一條命令,然后就結束,不會去執行命令。

      在全部任務執行完之后把自己的原子計數器置 0,以告知主線程自己已經完成了工作。

      1void processInputBuffer(client *c) {

      2// 省略代碼

      3...

      4

      5 while(c->qb_pos < sdslen(c->querybuf)) {

      6 /* Return if clients are paused. */

      7 if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;

      8

      9 /* Immediately abort if the client is in the middle of something. */

      10 if (c->flags & CLIENT_BLOCKED) break;

      11

      12 /* Don't process more buffers from clients that have already pending

      13 * commands to execute in c->argv. */

      14 if (c->flags & CLIENT_PENDING_COMMAND) break;

      15 /* Multibulk processing could see a <= 0 length. */

      16 if (c->argc == 0) {

      17 resetClient(c);

      18 } else {

      19 // 判斷 client 是否具有 CLIENT_PENDING_READ 標識,如果是處于多線程 I/O 的模式下,

      20 // 那么此前已經在 readQueryFromClient -> postponeClientRead 中為 client 打上該標識,

      21 // 則立刻跳出循環結束,此時第一條命令已經解析完成,但是不執行命令。

      22 if (c->flags & CLIENT_PENDING_READ) {

      23 c->flags |= CLIENT_PENDING_COMMAND;

      24 break;

      25 }

      26

      27 // 執行客戶端命令

      28 if (processCommandAndResetClient(c) == C_ERR) {

      29 /* If the client is no longer valid, we avoid exiting this

      30 * loop and trimming the client buffer later. So we return

      31 * ASAP in that case. */

      32 return;

      33 }

      34 }

      35 }

      36

      37...

      38}

      這里需要額外關注 I/O 線程初次啟動時會設置當前線程的 CPU 親和性,也就是綁定當前線程到用戶配置的 CPU 上,在啟動 Redis 服務器主線程的時候同樣會設置 CPU 親和性,Redis 的核心網絡模型引入多線程之后,加上之前的多線程異步任務、多進程(BGSAVE、AOF、BIO、Sentinel 腳本任務等),Redis 現如今的系統并發度已經很大了,而 Redis 本身又是一個對吞吐量和延遲極度敏感的系統,所以用戶需要 Redis 對 CPU 資源有更細粒度的控制,這里主要考慮的是兩方面:CPU 高速緩存和 NUMA 架構。

      首先是 CPU 高速緩存(這里討論的是 L1 Cache 和 L2 Cache 都集成在 CPU 中的硬件架構),這里想象一種場景:Redis 主進程正在 CPU-1 上運行,給客戶端提供數據服務,此時 Redis 啟動了子進程進行數據持久化(BGSAVE 或者 AOF),系統調度之后子進程搶占了主進程的 CPU-1,主進程被調度到 CPU-2 上去運行,導致之前 CPU-1 的高速緩存里的相關指令和數據被汰換掉,CPU-2 需要重新加載指令和數據到自己的本地高速緩存里,浪費 CPU 資源,降低性能。

      因此,Redis 通過設置 CPU 親和性,可以將主進程/線程和子進程/線程綁定到不同的核隔離開來,使之互不干擾,能有效地提升系統性能。

      其次是基于 NUMA 架構的考慮,在 NUMA 體系下,內存控制器芯片被集成到處理器內部,形成 CPU 本地內存,訪問本地內存只需通過內存通道而無需經過系統總線,訪問時延大大降低,而多個處理器之間通過 QPI 數據鏈路互聯,跨 NUMA 節點的內存訪問開銷遠大于本地內存的訪問:

      因此,Redis 通過設置 CPU 親和性,讓主進程/線程盡可能在固定的 NUMA 節點上的 CPU 上運行,更多地使用本地內存而不需要跨節點訪問數據,同樣也能大大地提升性能。

      關于 NUMA 相關知識請讀者自行查閱,篇幅所限這里就不再展開,以后有時間我再單獨寫一篇文章介紹。

      最后還有一點,閱讀過源碼的讀者可能會有疑問,Redis 的多線程模式下,似乎并沒有對數據進行鎖保護,事實上 Redis 的多線程模型是全程無鎖(Lock-free)的,這是通過原子操作+交錯訪問來實現的,主線程和 I/O 線程之間共享的變量有三個:io_threads_pending?計數器、io_threads_op?I/O 標識符和?io_threads_list?線程本地任務隊列。

      io_threads_pending?是原子變量,不需要加鎖保護,io_threads_op?和?io_threads_list?這兩個變量則是通過控制主線程和 I/O 線程交錯訪問來規避共享數據競爭問題:I/O 線程啟動之后會通過忙輪詢和鎖休眠等待主線程的信號,在這之前它不會去訪問自己的本地任務隊列?io_threads_list[id],而主線程會在分配完所有任務到各個 I/O 線程的本地隊列之后才去喚醒 I/O 線程開始工作,并且主線程之后在 I/O 線程運行期間只會訪問自己的本地任務隊列?io_threads_list[0]?而不會再去訪問 I/O 線程的本地隊列,這也就保證了主線程永遠會在 I/O 線程之前訪問?io_threads_list?并且之后不再訪問,保證了交錯訪問。io_threads_op?同理,主線程會在喚醒 I/O 線程之前先設置好?io_threads_op?的值,并且在 I/O 線程運行期間不會再去訪問這個變量。

      性能提升

      Redis 將核心網絡模型改造成多線程模式追求的當然是最終性能上的提升,所以最終還是要以 benchmark 數據見真章:

      測試數據表明,Redis 在使用多線程模式之后性能大幅提升,達到了一倍。更詳細的性能壓測數據可以參閱這篇文章:Benchmarking the experimental Redis Multi-Threaded I/O。

      以下是美圖技術團隊實測的新舊 Redis 版本性能對比圖,僅供參考

      模型缺陷

      首先第一個就是我前面提到過的,Redis 的多線程網絡模型實際上并不是一個標準的 Multi-Reactors/Master-Workers 模型,和其他主流的開源網絡服務器的模式有所區別,最大的不同就是在標準的 Multi-Reactors/Master-Workers 模式下,Sub Reactors/Workers 會完成?網絡讀 -> 數據解析 -> 命令執行 -> 網絡寫?整套流程,Main Reactor/Master 只負責分派任務,而在 Redis 的多線程方案中,I/O 線程任務僅僅是通過 socket 讀取客戶端請求命令并解析,卻沒有真正去執行命令,所有客戶端命令最后還需要回到主線程去執行,因此對多核的利用率并不算高,而且每次主線程都必須在分配完任務之后忙輪詢等待所有 I/O 線程完成任務之后才能繼續執行其他邏輯。

      Redis 之所以如此設計它的多線程網絡模型,我認為主要的原因是為了保持兼容性,因為以前 Redis 是單線程的,所有的客戶端命令都是在單線程的事件循環里執行的,也因此 Redis 里所有的數據結構都是非線程安全的,現在引入多線程,如果按照標準的 Multi-Reactors/Master-Workers 模式來實現,則所有內置的數據結構都必須重構成線程安全的,這個工作量無疑是巨大且麻煩的。

      所以,在我看來,Redis 目前的多線程方案更像是一個折中的選擇:既保持了原系統的兼容性,又能利用多核提升 I/O 性能。

      其次,目前 Redis 的多線程模型中,主線程和 I/O 線程的通信過于簡單粗暴:忙輪詢和鎖,因為通過自旋忙輪詢進行等待,導致 Redis 在啟動的時候以及運行期間偶爾會有短暫的 CPU 空轉引起的高占用率,而且這個通信機制的最終實現看起來非常不直觀和不簡潔,希望后面 Redis 能對目前的方案加以改進。

      總結

      讓我們來回顧一下 Redis 多線程網絡模型的設計方案:

      使用 I/O 線程實現網絡 I/O 多線程化,I/O 線程只負責網絡 I/O 和命令解析,不執行客戶端命令。

      利用原子操作+交錯訪問實現無鎖的多線程模型。

      通過設置 CPU 親和性,隔離主進程和其他子進程,讓多線程網絡模型能發揮最大的性能。

      通讀本文之后,相信讀者們應該能夠了解到一個優秀的網絡系統的實現所涉及到的計算機領域的各種技術:設計模式、網絡 I/O、并發編程、操作系統底層,甚至是計算機硬件。另外還需要對項目迭代和重構的謹慎,對技術方案的深入思考,絕不僅僅是寫好代碼這一個難點。

      博客原文

      Redis 多線程網絡模型全面揭秘

      Redis 任務調度 網絡

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:家具制造業生產管理流程(家具行業生產流程)
      下一篇:如何對單個單元格內,多個數據求和(怎么對多個單元格求和)
      相關文章
      无码国产亚洲日韩国精品视频一区二区三区| 亚洲美女激情视频| 亚洲国产精品久久久久秋霞影院| 亚洲av无码一区二区三区不卡| 一本久久a久久精品亚洲| 亚洲av无码成人精品区在线播放 | 亚洲精品9999久久久久无码| 91在线亚洲综合在线| 亚洲综合小说另类图片动图 | 亚洲人成日本在线观看| 亚洲综合亚洲国产尤物| 亚洲精品国产福利片| 亚洲欧洲日产专区| 亚洲成综合人影院在院播放| 亚洲成综合人影院在院播放| www.亚洲日本| 亚洲日韩精品无码专区加勒比| 亚洲中文字幕久久久一区| 亚洲日韩精品无码专区加勒比☆| 亚洲AV综合色区无码一二三区 | 蜜桃传媒一区二区亚洲AV| 日韩国产欧美亚洲v片 | www亚洲精品少妇裸乳一区二区| 亚洲av无码乱码在线观看野外| 亚洲欧洲日本在线| 国产aⅴ无码专区亚洲av麻豆 | 亚洲乱码无码永久不卡在线 | 久久亚洲一区二区| 在线免费观看亚洲| 亚洲一区二区三区播放在线| 国产精品亚洲综合久久| 亚洲av无码成人影院一区| 国产精品亚洲专区在线播放 | 国产亚洲人成网站在线观看| 国产AV无码专区亚洲Av| 亚洲一区二区三区日本久久九| 亚洲成人网在线播放| 亚洲熟妇成人精品一区| 日韩精品亚洲专区在线观看| 国产成人综合亚洲亚洲国产第一页 | 亚洲乱码一二三四区乱码|