AQS源碼學習

      網友投稿 816 2025-04-02

      AQS源碼解讀

      AQS源碼解讀

      什么是AQS

      AQS能干嘛

      源碼解讀

      重要類屬性

      重要結構:AQS的CLH隊列

      Node類源碼

      從ReentrantLock解讀AQS

      ReentrantLock源碼

      relock的 lock()方法

      從銀行流程理解AQS流程(非公平鎖)

      初始狀態: state = 0

      啟動線程A,默認按照非公平鎖執行。調用relock的lock()方法

      A搶占到鎖,線程A開始辦業務。

      線程B開始運行,調用lock方法

      線程B加入等待隊列

      線程C開始獲取鎖

      線程A釋放鎖

      線程B獲取鎖成功

      公平鎖回溯獲取鎖流程

      視頻參考:AQS源碼解析

      什么是AQS

      AQS全稱為 AbstractQueuedSynchronizer,抽象隊列同步器。

      是用來構建鎖或者其他同步器組件的重量級基礎框架以及整個JUC體系的基石。

      基本原理: 通過內置的FIFO隊列 完成資源獲取線程的排隊工作,并且通過一個int類型變量表示持有鎖的狀態。

      CLH: 等待隊列是“CLH”(Craig、Landin 和 Hagersten)鎖定隊列的變體

      AQS能干嘛

      AQS是一個隊列

      保存獲取、搶占資源失敗的線程數據

      有一定的阻塞等待喚醒機制保證鎖分配

      它將請求共享資源的線程封裝成隊列的結點(Node),通過CAS、自旋以及LockSupportpark)的方式,維護state變量的狀態,使并發達到同步的控制效果。

      源碼解讀

      重要類屬性

      private volatile int state; // 同步狀態

      AQS使用一個volatile的int類型成員變量來表示同步狀態,通過內置的FIFO隊列去完成資源獲取的排隊工作將每條要去搶占資源的線程封裝成一個Node

      節點來實現鎖的分配,通過CAS完成對state值的修改。

      state成員變量相當于銀行辦理業務的受理窗口狀態。

      零就是沒人,自由狀態可以辦理

      大于等于1,有人占用窗口,等著去

      重要結構:AQS的CLH隊列

      CLH隊列是一個雙向隊列

      類比銀行候客區的等待顧客

      官方解釋:

      等待隊列是“CLH”(Craig、Landin和Hagersten)鎖隊列的變體。CLH鎖通常用于旋轉鎖。相反,我們使用它們來阻止同步器,但是使用相同的基本策略,即在其節點的前一個線程中保存一些關于該線程的控制信息。每個節點中的“status”字段跟蹤線程是否應該阻塞。當一個節點的前一個節點釋放時,它會發出信號。否則,隊列的每個節點都充當一個特定的通知樣式監視器,其中包含一個等待線程。狀態字段并不控制線程是否被授予鎖等。如果線程是隊列中的第一個線程,它可能會嘗試獲取。但是,第一并不能保證成功,它只會給人爭取的權利。因此,當前發布的內容線程可能需要重新等待。

      要排隊進入CLH鎖,您可以將其作為新的尾部進行原子拼接。要出列,只需設置head字段。

      有阻塞就需要排隊,排隊就需要隊列。

      AQS: state變量 + CLH變種的雙端隊列

      Node類源碼

      AQS內部類Node源碼

      static final class Node { /** Marker to indicate a node is waiting in shared mode */ // 表示線程以共享的模式等待鎖 static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ // 表示線程正在以獨占的方式等待鎖 static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled */ // 線程被取消時的狀態 static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ // 后續線程需要喚醒 static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ // 等待condition 喚醒 static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ // 共享式同步狀態獲取將會無條件的傳播下去(不懂) static final int PROPAGATE = -3; // 當前節點在隊列中的狀態 // 初始為0 狀態為上面的幾種 // 對于條件節點,該字段被初始化為 CONDITION。 它使用 CAS 修改(或在可能的情況下,無條件 volatile 寫入)。 volatile int waitStatus; // 前驅節點 volatile Node prev; // 后繼節點 volatile Node next; // 處于該節點的線程 volatile Thread thread; // 指向下一個處于CONDITION狀態的線程 Node nextWaiter; /** * Returns true if node is waiting in shared mode. */ final boolean isShared() { return nextWaiter == SHARED; } /** * 返回前驅節點,沒有拋出NPE */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }

      AQS隊列的基本結構

      從ReentrantLock解讀AQS

      下面是relock(簡寫)的基本結構

      public class ReentrantLock implements Lock, java.io.Serializable { private static final long serialVersionUID = 7373984872572414699L; // 內部屬性 sync private final Sync sync; // Sync 是一個抽象類,繼承了 AQS abstract static class Sync extends AbstractQueuedSynchronizer { // ... } // 非公平鎖的實現,繼承了 Sync static final class NonfairSync extends Sync { // ... } // 公平鎖的實現,繼承了 Sync static final class FairSync extends Sync { // ... } public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } }

      ReentrantLock默認構造器返回一個 非公平鎖,可以支持傳入一個 布爾類型的值,指定返回一個 非公平鎖 或者 公平鎖。

      public void lock() { sync.lock(); }

      lock方法調用 sync的lock方法,這里面又分為 FairSync 和 NonFairSync

      FairSync 和NonFairSync都重寫了 AQS的lock方法

      FairSync

      // lock調用了 Sync 父類的 acquire方法, acquire方法調用了 重寫后的 tryAcquire方法 final void lock() { acquire(1); } protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }

      NonFairSync

      final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }

      公平鎖與非公平鎖的區別在于 FairSync在獲取同步狀態時多了一個限制條件:!hasQueuedPredecessors()

      hasQueuedPredecessors 判斷了是否需要排隊

      NonFairSync的占鎖流程

      AQS源碼學習

      嘗試加鎖。

      加鎖失敗,線程入隊列。

      線程入隊列后,進入阻塞狀態。

      從銀行流程理解AQS流程(非公平鎖)

      模擬三個線程 A、 B、 C來銀行辦理業務,現在只有一個窗口辦理,所以需要排隊。

      relock .lock () // NonfairSync 方法 final void lock() { if (compareAndSetState(0, 1)) // 嘗試是否可以直接占鎖成功,通過unsafe類的CAS功能 setExclusiveOwnerThread(Thread.currentThread()); // 設置獨占線程,開始是 A else acquire(1); // 占鎖失敗執行 acquire方法, 如線程B } // AQS 方法 public final void acquire(int arg) { // if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } // NonfairSync方法 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } // Sync的方法 final boolean nonfairTryAcquire(int acquires) { // 嘗試再次占用 final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }

      假設線程B、C開始之前,線程A又執行了一次relock的lock方法,會發生什么呢?(可重入鎖)

      final boolean nonfairTryAcquire(int acquires) { // 嘗試再次占用 final Thread current = Thread.currentThread(); // 當前線程是 線程A int c = getState(); // 獲取 state = 1 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 走這里 int nextc = c + acquires; // nextc = 1 + 1 = 2 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); // state 變為 2,則這個地方就需要 unlock兩次,才能將state變為0 return true; } return false; }

      由于此時線程A還在占用,所以compareAndSetState方法失敗,線程B執行acquire方法

      final boolean nonfairTryAcquire(int acquires) { // 嘗試再次占用 final Thread current = Thread.currentThread(); // 這里是線程B int c = getState(); // state == 1 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 獨占線程 線程A 當前線程 線程B A!=B int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; // 直接返回false }

      線程B嘗試獲取鎖失敗,

      if (!tryAcquire(arg) && // 線程B獲取失敗,執行acquireQueued方法 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

      addWaiter(Node.EXCLUSIVE) > acquireQueued // 創建一個節點,并且將其快速添加到隊尾 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; // 隊尾節點 if (pred != null) { // 將新節點作為隊尾節點 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 快速添加到隊尾失敗,則執行 enq方法,隊列為空時。 enq(node); return node; } private Node enq(final Node node) { // 死循環 for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) // 如果隊尾為空,插入一個空的節點,也就是傀儡節點 tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { // 真正插入我們需要的節點,也就是包含線程B引用的節點 t.next = node; return t; } } } } final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // 返回前一個節點,對于線程B來說,p就是傀儡節點 // p == head 滿足條件, 由于A還在占用,所以tryAcquire返回一個false // 走下面的if //第二次循環,假設線程A繼續正在工作,下面的if語塊還是不執行 if (p == head && tryAcquire(arg)) { setHead(node);// 將附帶線程B的節點變成新的傀儡節點,去掉 前驅指針 p.next = null; // help GC //置空原傀儡指針與新的傀儡節點之間的后驅指針,方便GC回收 failed = false; return interrupted; } // shouldParkAfterFailedAcquire if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 通過parkAndCheckInterrupt()使用了 LockSupport阻塞了線程B // 下次線程被喚醒從這個地方開始執行。 interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // p指向傀儡節點,p.waitStatus = 0 int ws = pred.waitStatus; if (ws == Node.SIGNAL) // -1 return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 第一次走這里,將p的waitStatus設為 -1 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }

      雙向列表中,第一個節點為虛節點,也叫哨兵節點,其實不存儲任何信息,只是占位

      由于線程A還在使用,線程C與線程B一樣,同樣會被阻塞。

      線程A調用完成后,需要調用lock.unlock()方法

      // relock public void unlock() { sync.release(1); } // AQS的方法 public final boolean release(int arg) { if (tryRelease(arg)) { // Node h = head; // 返回傀儡節點 if (h != null && h.waitStatus != 0) unparkSuccessor(h); // 設置傀儡節點狀態為 0 ,并且 喚醒傀儡節點的下一個節點。 return true; } return false; } // Relock Sync的方法 protected final boolean tryRelease(int releases) { // releases = 1 int c = getState() - releases; // c = 1 - 1 = 0 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); // c=0 設置獨占線程為null } setState(c); return free; } private void unparkSuccessor(Node node) { // p表示傀儡節點 int ws = node.waitStatus; if (ws < 0) // waitStatus為-1 滿足,改為0 compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 喚醒傀儡節點的下一個節點, 帶有線程B的節點。 LockSupport.unpark(s.thread); }

      private void setHead(Node node) { head = node; node.thread = null; // 傀儡節點的線程去掉 node.prev = null; }

      公平鎖回溯獲取鎖流程

      更多細節參考:(11條消息) AQS深入理解 hasQueuedPredecessors源碼分析 JDK8_anlian523的博客-CSDN博客

      FairSync執行lock()方法

      final void lock() { acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 這個地方多了一個 hasQueuedPredecessors() 通過這個方法來判斷有沒有線程排在了當前線程前面 // 只有這個方法返回false的時候,才會通過CAS占用鎖 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } // 公平鎖獨有 public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; // 1. head 和 tail都為null,說明隊列為空,可以占用鎖,false // 2. head 和 tail都指向同一個Node,也返回false 說明只有 一個 dummy node(傀儡節點) return h != t && // 3. (s = h.next == null) 為false 分多鐘情況,參見前面的參考連接 // 4. (s.thread != Thread.currentThread()) 為false 分多鐘情況,參見前面的參考連接 ((s = h.next) == null || s.thread != Thread.currentThread()); }

      提問:為什么先讀取 tail的值,再讀取 head的值?

      答案: 去看參考連接吧,主要是為了 避免 (s = h.next) == null 報空指針異常

      Java 任務調度

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:如何為word2007創建自定義表格樣式(word文檔怎么新建表格樣式)
      下一篇:Excel2016怎么給頁眉頁腳添加文件路徑
      相關文章
      亚洲成AV人片在WWW| 风间由美在线亚洲一区| 久久久久亚洲?V成人无码| 亚洲AV无码成H人在线观看| 日韩欧美亚洲中文乱码| 亚洲AV无码AV吞精久久| 亚洲精品V天堂中文字幕| 亚洲av日韩精品久久久久久a| 亚洲午夜精品一区二区麻豆| 亚洲人成网站色在线观看| 国产亚洲精品美女久久久 | 亚洲伊人成无码综合网| 亚洲人成色77777在线观看大| 亚洲va中文字幕无码| 亚洲福利精品一区二区三区| 亚洲精品视频久久久| 亚洲日韩中文在线精品第一| 在线观看亚洲成人| 亚洲成a人片77777kkkk| 亚洲国产精品久久久久婷婷软件| 亚洲国产综合专区电影在线| 亚洲色图黄色小说| 亚洲国产av一区二区三区丶| 亚洲一区二区三区免费在线观看| 中文字幕 亚洲 有码 在线 | 亚洲自偷自偷在线制服| 国产成人精品日本亚洲| 亚洲天天在线日亚洲洲精| 亚洲成aⅴ人片在线观| 亚洲一区无码中文字幕乱码| 久久综合久久综合亚洲| 亚洲国产精华液2020| mm1313亚洲精品无码又大又粗| 亚洲国产小视频精品久久久三级 | 亚洲免费视频一区二区三区| 亚洲熟女少妇一区二区| 亚洲成a人片在线观看中文动漫| 久久久婷婷五月亚洲97号色| 国产精品亚洲精品青青青| 亚洲AV日韩综合一区| 国产亚洲成人久久|