AQS源碼探究_09 Semaphore源碼分析

      網友投稿 797 2025-04-02

      文章參考:小劉老師講源碼


      1、簡介

      AQS源碼探究_09 Semaphore源碼分析

      Semaphore,信號量,它保存了一系列的許可(permits),每次調用acquire()都將消耗一個許可,每次調用release()都將歸還一個許可。

      Semaphore通常用于限制同一時間對共享資源的訪問次數上,也就是常說的限流。

      Semaphore信號量,獲取通行證流程圖:

      2、入門案例

      案例1:Pool.java

      /** * date: 2021/5/10 * @author csp */ public class Pool { /** * 可同時訪問資源的最大線程數 */ private static final int MAX_AVAILABLE = 100; /** * 信號量 表示:可獲取的對象通行證 */ private final Semaphore available = new Semaphore(MAX_AVAILABLE, true); /** * 共享資源,可以想象成 items 數組內存儲的都是Connection對象 模擬是連接池 */ protected Object[] items = new Object[MAX_AVAILABLE]; /** * 共享資源占用情況,與items數組一一對應,比如: * items[0]對象被外部線程占用,那么 used[0] == true,否則used[0] == false */ protected boolean[] used = new boolean[MAX_AVAILABLE]; /** * 獲取一個空閑對象 * 如果當前池中無空閑對象,則等待..直到有空閑對象為止 */ public Object getItem() throws InterruptedException { // 每次調用acquire()都將消耗一個許可(permits) available.acquire(); return getNextAvailableItem(); } /** * 歸還對象到池中 */ public void putItem(Object x) { if (markAsUnused(x)) available.release(); } /** * 獲取池內一個空閑對象,獲取成功則返回Object,失敗返回Null * 成功后將對應的 used[i] = true */ private synchronized Object getNextAvailableItem() { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (!used[i]) { used[i] = true; return items[i]; } } return null; } /** * 歸還對象到池中,歸還成功返回true * 歸還失敗: * 1.池中不存在該對象引用,返回false * 2.池中存在該對象引用,但該對象目前狀態為空閑狀態,也返回false */ private synchronized boolean markAsUnused(Object item) { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (item == items[i]) { if (used[i]) { used[i] = false; return true; } else return false; } } return false; } }

      案例2:SemaphoreTest02.java

      /** * date: 2020/5/10 * @author csp */ public class SemaphoreTest02 { public static void main(String[] args) throws InterruptedException { // 聲明信號量,初始的許可(permits)為2 // 公平模式:fair為true final Semaphore semaphore = new Semaphore(2, true); Thread tA = new Thread(() ->{ try { // 每次調用acquire()都將消耗一個許可(permits) semaphore.acquire(); System.out.println("線程A獲取通行證成功"); TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { }finally { // 每次調用release()都將歸還一個許可(permits) semaphore.release(); } }); tA.start(); // 確保線程A已經執行 TimeUnit.MILLISECONDS.sleep(200); Thread tB = new Thread(() ->{ try { // 調用acquire(2)都將消耗2個許可(permits) semaphore.acquire(2); System.out.println("線程B獲取通行證成功"); } catch (InterruptedException e) { }finally { // 調用release(2)都將歸還2個許可(permits) semaphore.release(2); } }); tB.start(); // 確保線程B已經執行 TimeUnit.MILLISECONDS.sleep(200); Thread tC = new Thread(() ->{ try { // 每次調用acquire()都將消耗一個許可(permits) semaphore.acquire(); System.out.println("線程C獲取通行證成功"); } catch (InterruptedException e) { }finally { // 每次調用release()都將歸還一個許可(permits) semaphore.release(); } }); tC.start(); } }

      執行結果:

      線程A獲取通行證成功 線程B獲取通行證成功 線程C獲取通行證成功

      3、源碼分析

      內部類Sync

      通過Sync的幾個實現方法,我們獲取到以下幾點信息:

      許可是在構造方法時傳入的;

      許可存放在狀態變量state中;

      嘗試獲取一個許可的時候,則state的值減1;

      當state的值為0的時候,則無法再獲取許可;

      釋放一個許可的時候,則state的值加1;

      許可的個數可以動態改變;

      abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; // 構造方法,傳入許可次數,放入state中 Sync(int permits) { setState(permits); } // 獲取許可次數 final int getPermits() { return getState(); } // 非公平模式嘗試獲取許可 final int nonfairTryAcquireShared(int acquires) { for (;;) { // 先看看還有幾個許可 int available = getState(); // 減去這次需要獲取的許可還剩下幾個許可 int remaining = available - acquires; // 如果剩余許可小于0了則直接返回 // 如果剩余許可不小于0,則嘗試原子更新state的值,成功了返回剩余許可 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } // 釋放許可 protected final boolean tryReleaseShared(int releases) { for (;;) { // 先看看還有幾個許可 int current = getState(); // 加上這次釋放的許可 int next = current + releases; // 檢測溢出 if (next < current) // overflow throw new Error("Maximum permit count exceeded"); // 如果原子更新state的值成功,就說明釋放許可成功,則返回true if (compareAndSetState(current, next)) return true; } } // 減少許可 final void reducePermits(int reductions) { for (;;) { // 先看看還有幾個許可 int current = getState(); // 減去將要減少的許可 int next = current - reductions; // 檢測溢出 if (next > current) // underflow throw new Error("Permit count underflow"); // 原子更新state的值,成功了返回true if (compareAndSetState(current, next)) return; } } // 銷毀許可 final int drainPermits() { for (;;) { // 先看看還有幾個許可 int current = getState(); // 如果為0,直接返回 // 如果不為0,把state原子更新為0 if (current == 0 || compareAndSetState(current, 0)) return current; } } }

      內部類NonfairSync

      非公平模式下,直接調用父類的nonfairTryAcquireShared()嘗試獲取許可。

      static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; // 構造方法,調用父類的構造方法 NonfairSync(int permits) { super(permits); } // 嘗試獲取許可,調用父類的nonfairTryAcquireShared()方法 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } }

      內部類FairSync

      公平模式下,先檢測前面是否有排隊的,如果有排隊的則獲取許可失敗,進入隊列排隊,否則嘗試原子更新state的值。

      **注意:**為了閱讀方便,該內部類中將一些AQS中的方法粘貼過來了,在方法頭注釋加有標注!

      static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } /** * 該方法位于AQS中: * 嘗試獲取通行證,獲取成功返回 >= 0的值; * 獲取失敗 返回 < 0 值 */ protected int tryAcquireShared(int acquires) { for (;;) { // 判斷當前 AQS 阻塞隊列內 是否有等待者線程,如果有直接返回-1,表示當前aquire操作的線程需要進入到隊列等待.. if (hasQueuedPredecessors()) return -1; // 執行到這里,有哪幾種情況? // 1.調用aquire時 AQS阻塞隊列內沒有其它等待者 // 2.當前節點 在阻塞隊列內是headNext節點 // 獲取state ,state這里表示 通行證 int available = getState(); // remaining 表示當前線程 獲取通行證完成之后,semaphore還剩余數量 int remaining = available - acquires; // 條件一:remaining < 0 成立,說明線程獲取通行證失敗.. // 條件二:前置條件,remaning >= 0, CAS更新state 成功,說明線程獲取通行證成功,CAS失敗,則自旋。 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } /** * 該方法位于AQS中: */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 條件成立:說明當前調用acquire方法的線程 已經是 中斷狀態了,直接拋出異常.. if (Thread.interrupted()) throw new InterruptedException(); // 對應業務層面 執行任務的線程已經將latch打破了。然后其他再調用latch.await的線程,就不會在這里阻塞了 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } /** * 該方法位于AQS中: */ private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 將調用Semaphore.aquire方法的線程 包裝成node加入到 AQS的阻塞隊列當中。 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { // 獲取當前線程節點的前驅節點 final Node p = node.predecessor(); // 條件成立,說明當前線程對應的節點 為 head.next節點 if (p == head) { // head.next節點就有權利獲取 共享鎖了.. int r = tryAcquireShared(arg); // 站在Semaphore角度:r 表示還剩余的通行證數量 if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // shouldParkAfterFailedAcquire 會給當前線程找一個好爸爸,最終給爸爸節點設置狀態為 signal(-1),返回true // parkAndCheckInterrupt 掛起當前節點對應的線程... if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } /** * 該方法位于AQS中: * 設置當前節點為 head節點,并且向后傳播!(依次喚醒!) */ private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below // 將當前節點設置為 新的 head節點。 setHead(node); // 調用setHeadAndPropagete 時 propagate == 1 一定成立 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { // 獲取當前節點的后繼節點.. Node s = node.next; // 條件一:s == null 什么時候成立呢? 當前node節點已經是 tail了,條件一會成立。 doReleaseShared() 里面會處理這種情況.. // 條件二:前置條件,s != null , 要求s節點的模式必須是 共享模式。 latch.await() -> addWaiter(Node.SHARED) if (s == null || s.isShared()) // 基本上所有情況都會執行到 doReleasseShared() 方法。 doReleaseShared(); } } //AQS.releaseShared 該方法位于AQS中: public final boolean releaseShared(int arg) { // 條件成立:表示當前線程釋放資源成功,釋放資源成功后,去喚醒獲取資源失敗的線程.. if (tryReleaseShared(arg)) { // 喚醒獲取資源失敗的線程... doReleaseShared(); return true; } return false; } /** * 喚醒獲取資源失敗的線程 * * CountDownLatch版本 * 都有哪幾種路徑會調用到doReleaseShared方法呢? * 1.latch.countDown() -> AQS.state == 0 -> doReleaseShared() 喚醒當前阻塞隊列內的 head.next 對應的線程。 * 2.被喚醒的線程 -> doAcquireSharedInterruptibly parkAndCheckInterrupt() 喚醒 -> setHeadAndPropagate() -> doReleaseShared() * * Semaphore版本 * 都有哪幾種路徑會調用到doReleaseShared方法呢? * */ //AQS.doReleaseShared 該方法位于AQS中: private void doReleaseShared() { for (;;) { // 獲取當前AQS 內的 頭結點 Node h = head; // 條件一:h != null 成立,說明阻塞隊列不為空.. // 不成立:h == null 什么時候會是這樣呢? // latch創建出來后,沒有任何線程調用過 await() 方法之前,有線程調用latch.countDown()操作 且觸發了 喚醒阻塞節點的邏輯.. // 條件二:h != tail 成立,說明當前阻塞隊列內,除了head節點以外 還有其他節點。 // h == tail -> head 和 tail 指向的是同一個node對象。 什么時候會有這種情況呢? // 1. 正常喚醒情況下,依次獲取到 共享鎖,當前線程執行到這里時 (這個線程就是 tail 節點。) // 2. 第一個調用await()方法的線程 與 調用countDown()且觸發喚醒阻塞節點的線程 出現并發了.. // 因為await()線程是第一個調用 latch.await()的線程,此時隊列內什么也沒有,它需要補充創建一個Head節點,然后再次自旋時入隊 // 在await()線程入隊完成之前,假設當前隊列內 只有 剛剛補充創建的空元素 head 。 // 同期,外部有一個調用countDown()的線程,將state 值從1,修改為0了,那么這個線程需要做 喚醒 阻塞隊列內元素的邏輯.. // 注意:調用await()的線程 因為完全入隊完成之后,再次回到上層方法 doAcquireSharedInterruptibly 會進入到自旋中, // 獲取當前元素的前驅,判斷自己是head.next, 所以接下來該線程又會將自己設置為 head,然后該線程就從await()方法返回了... if (h != null && h != tail) { // 執行到if里面,說明當前head 一定有 后繼節點! int ws = h.waitStatus; // 當前head狀態 為 signal 說明 后繼節點并沒有被喚醒過呢... if (ws == Node.SIGNAL) { // 喚醒后繼節點前 將head節點的狀態改為 0 // 這里為什么,使用CAS呢? 回頭說... // 當doReleaseShared方法 存在多個線程 喚醒 head.next 邏輯時, // CAS 可能會失敗... // 案例: // t3 線程在if(h == head) 返回false時,t3 會繼續自旋. 參與到 喚醒下一個head.next的邏輯.. // t3 此時執行到 CAS WaitStatus(h,Node.SIGNAL, 0) 成功.. t4 在t3修改成功之前,也進入到 if (ws == Node.SIGNAL) 里面了, // 但是t4 修改 CAS WaitStatus(h,Node.SIGNAL, 0) 會失敗,因為 t3 改過了... if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 喚醒后繼節點 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 條件成立: // 1.說明剛剛喚醒的 后繼節點,還沒執行到 setHeadAndPropagate方法里面的 設置當前喚醒節點為head的邏輯。 // 這個時候,當前線程 直接跳出去...結束了.. // 此時用不用擔心,喚醒邏輯 在這里斷掉呢?、 // 不需要擔心,因為被喚醒的線程 早晚會執行到doReleaseShared方法。 // 2.h == null latch創建出來后,沒有任何線程調用過 await() 方法之前, // 有線程調用latch.countDown()操作 且觸發了 喚醒阻塞節點的邏輯.. // 3.h == tail -> head 和 tail 指向的是同一個node對象 // 條件不成立: // 被喚醒的節點 非常積極,直接將自己設置為了新的head,此時 喚醒它的節點(前驅),執行h == head 條件會不成立.. // 此時 head節點的前驅,不會跳出 doReleaseShared 方法,會繼續喚醒 新head 節點的后繼... if (h == head) // loop if head changed break; } } }

      構造方法

      創建Semaphore時需要傳入許可次數。Semaphore默認也是非公平模式,但是你可以調用第二個構造方法聲明其為公平模式。

      // 構造方法,創建時要傳入許可次數,默認使用非公平模式 public Semaphore(int permits) { sync = new NonfairSync(permits); } // 構造方法,需要傳入許可次數,及是否公平模式 public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }

      acquire()方法

      獲取一個許可,默認使用的是可中斷方式,如果嘗試獲取許可失敗,會進入AQS的隊列中排隊。

      public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } // 獲取一個許可,非中斷方式,如果嘗試獲取許可失敗,會進入AQS的隊列中排隊。 public void acquireUninterruptibly() { sync.acquireShared(1); }

      acquire(int permits)方法

      一次獲取多個許可,可中斷方式。

      public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } // 一次獲取多個許可,非中斷方式。 public void acquireUninterruptibly(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.acquireShared(permits); }

      tryAcquire()方法

      嘗試獲取一個許可,使用Sync的非公平模式嘗試獲取許可方法,不論是否獲取到許可都返回,只嘗試一次,不會進入隊列排隊。

      public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; } // 嘗試獲取一個許可,先嘗試一次獲取許可,如果失敗則會等待timeout時間,這段時間內都沒有獲取到許可,則返回false,否則返回true; public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }

      release()方法

      釋放一個許可,釋放一個許可時state的值會加1,并且會喚醒下一個等待獲取許可的線程。

      public void release() { sync.releaseShared(1); }

      release(int permits)方法

      一次釋放多個許可,state的值會相應增加permits的數量。

      public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); }

      4、小結

      Semaphore,也叫信號量,通常用于控制同一時刻對共享資源的訪問上,也就是限流場景;

      Semaphore的內部實現是基于AQS的共享鎖來實現的;

      Semaphore初始化的時候需要指定許可的次數,許可的次數是存儲在state中;

      獲取一個許可時,則state值減1;

      釋放一個許可時,則state值加1;

      await 任務調度

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:如何選擇食品制造ERP軟件
      下一篇:excel矩陣數據在工作表中繪制線條?
      相關文章
      亚洲国产成人AV在线播放| 亚洲AV成人无码久久精品老人| 亚洲成人在线网站| 国产亚洲日韩在线三区| 夜色阁亚洲一区二区三区| 亚洲丁香婷婷综合久久| 亚洲精品无码久久久久YW| 亚洲av永久无码天堂网| 亚洲精品国产精品| 亚洲AV无码精品国产成人| 亚洲精品理论电影在线观看| 亚洲精品V天堂中文字幕| 亚洲一区二区无码偷拍| 亚洲熟妇无码八V在线播放| 亚洲熟妇AV一区二区三区宅男| 亚洲人成影院77777| 456亚洲人成在线播放网站| 亚洲综合中文字幕无线码| 亚洲精品动漫免费二区| 亚洲av成人无码网站…| 日韩精品电影一区亚洲| 亚洲精品无码久久不卡| 国产亚洲精品成人AA片新蒲金 | 亚洲综合国产一区二区三区| 国产午夜亚洲精品午夜鲁丝片| 色噜噜AV亚洲色一区二区| 中文字幕精品亚洲无线码二区| 亚洲综合精品香蕉久久网| 亚洲精品乱码久久久久久按摩 | 亚洲另类自拍丝袜第五页| 亚洲精品国产suv一区88| 人人狠狠综合久久亚洲高清| 亚洲偷自拍拍综合网| 国产美女亚洲精品久久久综合| 国产亚洲一区二区三区在线观看| 亚洲AV无码日韩AV无码导航 | 亚洲高清日韩精品第一区| 亚洲人成人77777网站不卡| 亚洲中文字幕久久精品无码VA| 午夜亚洲国产理论片二级港台二级| 婷婷亚洲综合五月天小说在线 |