手撕環形隊列系列二:無鎖實現高并發

      網友投稿 986 2022-05-30

      本文是手撕環形隊列系列的第二篇,之前的文章鏈接如下:

      《手撕環形隊列》

      前面文章介紹的是一個比較基本的環形隊列,能夠在多線程中使用,但有一個前提:

      任意時刻,生產者和消費者最多都只能有一個。

      也就是說,如果有多個生產者要并發向隊列中寫入,需要在外部進行加鎖或其它方式的并發控制,保證任意時刻最多只有一個生產者真正向環形隊列進行寫入。同樣的,多個消費者要從隊列中讀取進行消費,也需要在外部進行加鎖或其它方式的并發控制,保證任意時刻最多只有一個消費者從環形隊列進行讀取。

      本文的內容,就是介紹如何能夠支持多線程場景下,多生產者并發寫入、多消費者并發讀取,完全由環形隊列內部來解決,無需外部做任何額外的控制。并且,使用無鎖的技術來實現,從而避免加鎖解鎖這種重操作對性能的影響。

      無鎖數據結構中,主要的技術實現手段是使用cpu的原子指令。介紹原子指令之前,先介紹一下沒有原子指令的情況下會有什么問題。

      通常我們在程序源碼中寫的語句,編譯為二進制后,代碼中的一行文本語句會變成二進制的多條匯編指令,因此這一行文本語句cpu執行時就不是原子的。多行文本語句,就更不是原子的了。多線程并發執行這些文本語句時,對應的多行匯編語句會在多個cpu 核上同時執行,無法保證他們之間的執行先后順序關系。在多線程同時讀寫一個共享數據時,會發生各種誤判,導致錯誤的結果。

      以環形隊列為例,來說明這個問題:

      環形隊列為初始狀態,隊列為空。兩個生產者線程都要向隊列進行寫入,都調用 ring_queue_push()方法。這個方法的函數實現中,producer1 線程讀取tail 為0,producer2 線程也讀取到tail為0。然后producer1 向位置0寫入數據,然后把tail 增1,tail變為1。

      producer2 也向位置0寫入數據,然后把tail 增1. tai增加1的過程:

      tail = tail + 1;

      由于producer2 初始讀取的tail 值為0,這個cpu core 可能意識不到tail 已經被別的線程修改了,因此還認為tail是0,因此最終

      tail = 0 + 1 = 1;

      最終的結果,producer2 把producer1的數據給覆蓋了(數據丟了),但兩個ring_queue_push()函數調用都返回成功了。這是一個嚴重的Bug!

      實際多線程環境中,各個cpu 之間的代碼執行時序都是不同的,因此沒有任何防護的情況下,對同樣的內存位置寫入、對同一個變量的并發讀和并發寫,都會產生嚴重的Bug。

      為了解決這些問題,原子指令閃亮登場了!

      用這些指令,對數據的操作在多cpu的情況下也是原子性的。所謂原子性,就是作為執行的最小單位,不能再分割。cpu core 要么執行了這個指令,要么還沒執行這個指令。不會出現在一個cpu core 執行這個指令一半的時候,另外一個cpu core開始執行這個指令的情況。

      通過正確使用cpu的原子指令,能夠有效解決多線程并發中的各種問題。

      在解決多線程并發問題,常規的方法是用mutex、semaphore、condvar等,這些可以理解為粗粒度鎖,使用簡單,適用范圍廣,但性能較差。

      cpu的原子指令,是cpu指令級的細粒度鎖,性能非常高,但設計起來復雜。

      各種操作系統、開發語言中都提供了對cpu原子指令的包裝函數,因此不需要我們手寫匯編指令。

      以gcc為例,gcc提供了 一系列builtin 的原子函數,比如今天我們要用的:

      bool __sync_bool_compare_and_swap(type *ptr, type oldval, type newval);

      這個函數,會將 ptr指向內存中的值,與oldval 比較,如果相等,則把 ptr指向內存的值修改為 newval. 整個比較和修改的全過程,要以原子方式完成。如果比較相等,并且修改成功,則返回true。其它情況都返回false。

      這個函數,也叫 cas,取的是 compare and swap 三個單詞的首字母縮寫。

      我們用原子指令,來增強一下環形隊列,實現多生產多消費者并發讀寫。思路如下:

      對于寫入,每個producer 必須先獲得寫鎖。成功獲得寫鎖之后,寫入數據,將tail移動到下一個位置,最后釋放寫鎖。

      對于讀取,每個consumer 必須先獲得讀鎖。成功獲得讀鎖之后,讀取數據,將head移動到下一個位置,最后釋放讀鎖。

      整個思路,與傳統通過mutex控制對共享數據的讀寫是完全一樣的,只是技術實現上我們用原子指令來實現,這種實現方式叫無鎖數據結構。

      手撕環形隊列系列二:無鎖實現高并發

      另外,需要說明的是:

      對于head和tail這樣的變量,由于多個線程會并發讀寫,因此我們需要用 volatile 來修飾它們,不讓cpu core 緩存它們,避免讀到舊數據。

      無鎖環形隊列,支持多生產者多消費者并發讀寫,用C語言實現的源碼如下:

      // ring_queue.h #ifndef RING_QUEUE_H #define RING_QUEUE_H typedef struct ring_queue_t { char* pbuf; int item_size; int capacity; volatile int write_flag; volatile int read_flag; volatile int head; volatile int tail; volatile int same_cycle; } ring_queue_t; int ring_queue_init(ring_queue_t* pqueue, int item_size, int capacity); void ring_queue_destroy(ring_queue_t* pqueue); int ring_queue_push(ring_queue_t* pqueue, void* pitem); int ring_queue_pop(ring_queue_t* pqueue, void* pitem); int ring_queue_is_empty(ring_queue_t* pqueue); int ring_queue_is_full(ring_queue_t* pqueue); #endif

      // ring_queue.c #include "ring_queue.h" #include #include #define CAS(ptr, old, new) __sync_bool_compare_and_swap(ptr, old, new) int ring_queue_init(ring_queue_t* pqueue, int item_size, int capacity) { memset(pqueue, 0, sizeof(*pqueue)); pqueue->pbuf = (char*)malloc(item_size * capacity); if (!pqueue->pbuf) { return -1; } pqueue->item_size = item_size; pqueue->capacity = capacity; pqueue->same_cycle = 1; return 0; } void ring_queue_destroy(ring_queue_t* pqueue) { free(pqueue->pbuf); memset(pqueue, 0, sizeof(*pqueue)); } int ring_queue_push(ring_queue_t* pqueue, void* pitem) { // try to set write flag while (1) { if (ring_queue_is_full(pqueue)) { return -1; } if (CAS(&pqueue->write_flag, 0, 1)) { // set write flag successfully break; } } // push data memcpy(pqueue->pbuf + pqueue->tail * pqueue->item_size, pitem, pqueue->item_size); pqueue->tail = (pqueue->tail + 1) % pqueue->capacity; if (0 == pqueue->tail) { // a new cycle pqueue->same_cycle = 0; // tail is not the same cycle with head } // reset write flag CAS(&pqueue->write_flag, 1, 0); return 0; } int ring_queue_pop(ring_queue_t* pqueue, void* pitem) { // try to set read flag while (1) { if (ring_queue_is_empty(pqueue)) { return -1; } if (CAS(&pqueue->read_flag, 0, 1)) { // set read flag successfully break; } } // read data memcpy(pitem, pqueue->pbuf + pqueue->head * pqueue->item_size, pqueue->item_size); pqueue->head = (pqueue->head + 1) % pqueue->capacity; if (0 == pqueue->head) { pqueue->same_cycle = 1; // head is now the same cycle with tail } // reset read flag CAS(&pqueue->read_flag, 1, 0); return 0; } int ring_queue_is_empty(ring_queue_t* pqueue) { return (pqueue->head == pqueue->tail) && pqueue->same_cycle; } int ring_queue_is_full(ring_queue_t* pqueue) { return (pqueue->head == pqueue->tail) && !pqueue->same_cycle; }

      我的微信號是 實力程序員,歡迎大家轉發至朋友圈,分享給更多的朋友。

      任務調度 多線程

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:萬字長文帶你詳解九大數據存儲引擎結構(上)
      下一篇:如何消除代碼屎山中的一大坨參數列表?
      相關文章
      亚洲中文字幕久久精品无码APP | 亚洲中文字幕在线观看| 亚洲风情亚Aⅴ在线发布| 国产精品亚洲综合五月天| 亚洲色大成网站www永久| 亚洲第一福利视频| 亚洲成人激情在线| 久久亚洲AV无码精品色午夜麻| 综合久久久久久中文字幕亚洲国产国产综合一区首 | 精品亚洲av无码一区二区柚蜜| 国内精品久久久久影院亚洲| 亚洲jjzzjjzz在线播放| 亚洲国产电影在线观看| 亚洲人成网站日本片| 国产L精品国产亚洲区久久| 亚洲Av无码国产情品久久| 午夜亚洲av永久无码精品| 免费在线观看亚洲| 亚洲第一页日韩专区| 人人狠狠综合久久亚洲高清| 亚洲aⅴ无码专区在线观看春色| 色偷偷尼玛图亚洲综合| 另类图片亚洲校园小说区| 亚洲男人的天堂在线va拉文| 浮力影院亚洲国产第一页| 亚洲乱色熟女一区二区三区丝袜 | 五月婷婷亚洲综合| 亚洲国产精品无码久久久久久曰| 亚洲国产一区二区三区| 国产亚洲自拍一区| 亚洲AV无码乱码国产麻豆| 精品亚洲国产成AV人片传媒| 亚洲福利电影在线观看| 亚洲娇小性xxxx色| 亚洲GV天堂GV无码男同| 亚洲va中文字幕无码| 亚洲中文久久精品无码ww16| 久久精品亚洲视频| 亚洲日韩乱码中文无码蜜桃臀| 亚洲综合丁香婷婷六月香| 亚洲av永久无码精品网址|