AQS源碼學習
AQS源碼解讀
AQS源碼解讀
什么是AQS
AQS能干嘛
源碼解讀
重要類屬性
重要結構:AQS的CLH隊列
Node類源碼
從ReentrantLock解讀AQS
ReentrantLock源碼
relock的 lock()方法
從銀行流程理解AQS流程(非公平鎖)
初始狀態: state = 0
啟動線程A,默認按照非公平鎖執行。調用relock的lock()方法
A搶占到鎖,線程A開始辦業務。
線程B開始運行,調用lock方法
線程B加入等待隊列
線程C開始獲取鎖
線程A釋放鎖
線程B獲取鎖成功
公平鎖回溯獲取鎖流程
視頻參考:AQS源碼解析
什么是AQS
AQS全稱為 AbstractQueuedSynchronizer,抽象隊列同步器。
是用來構建鎖或者其他同步器組件的重量級基礎框架以及整個JUC體系的基石。
基本原理: 通過內置的FIFO隊列 完成資源獲取線程的排隊工作,并且通過一個int類型變量表示持有鎖的狀態。
CLH: 等待隊列是“CLH”(Craig、Landin 和 Hagersten)鎖定隊列的變體
AQS能干嘛
AQS是一個隊列
保存獲取、搶占資源失敗的線程數據
有一定的阻塞等待喚醒機制保證鎖分配
它將請求共享資源的線程封裝成隊列的結點(Node),通過CAS、自旋以及LockSupportpark)的方式,維護state變量的狀態,使并發達到同步的控制效果。
源碼解讀
重要類屬性
private volatile int state; // 同步狀態
AQS使用一個volatile的int類型成員變量來表示同步狀態,通過內置的FIFO隊列去完成資源獲取的排隊工作將每條要去搶占資源的線程封裝成一個Node
節點來實現鎖的分配,通過CAS完成對state值的修改。
state成員變量相當于銀行辦理業務的受理窗口狀態。
零就是沒人,自由狀態可以辦理
大于等于1,有人占用窗口,等著去
重要結構:AQS的CLH隊列
CLH隊列是一個雙向隊列
類比銀行候客區的等待顧客
官方解釋:
等待隊列是“CLH”(Craig、Landin和Hagersten)鎖隊列的變體。CLH鎖通常用于旋轉鎖。相反,我們使用它們來阻止同步器,但是使用相同的基本策略,即在其節點的前一個線程中保存一些關于該線程的控制信息。每個節點中的“status”字段跟蹤線程是否應該阻塞。當一個節點的前一個節點釋放時,它會發出信號。否則,隊列的每個節點都充當一個特定的通知樣式監視器,其中包含一個等待線程。狀態字段并不控制線程是否被授予鎖等。如果線程是隊列中的第一個線程,它可能會嘗試獲取。但是,第一并不能保證成功,它只會給人爭取的權利。因此,當前發布的內容線程可能需要重新等待。
要排隊進入CLH鎖,您可以將其作為新的尾部進行原子拼接。要出列,只需設置head字段。
有阻塞就需要排隊,排隊就需要隊列。
AQS: state變量 + CLH變種的雙端隊列
Node類源碼
AQS內部類Node源碼
static final class Node { /** Marker to indicate a node is waiting in shared mode */ // 表示線程以共享的模式等待鎖 static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ // 表示線程正在以獨占的方式等待鎖 static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled */ // 線程被取消時的狀態 static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ // 后續線程需要喚醒 static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ // 等待condition 喚醒 static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ // 共享式同步狀態獲取將會無條件的傳播下去(不懂) static final int PROPAGATE = -3; // 當前節點在隊列中的狀態 // 初始為0 狀態為上面的幾種 // 對于條件節點,該字段被初始化為 CONDITION。 它使用 CAS 修改(或在可能的情況下,無條件 volatile 寫入)。 volatile int waitStatus; // 前驅節點 volatile Node prev; // 后繼節點 volatile Node next; // 處于該節點的線程 volatile Thread thread; // 指向下一個處于CONDITION狀態的線程 Node nextWaiter; /** * Returns true if node is waiting in shared mode. */ final boolean isShared() { return nextWaiter == SHARED; } /** * 返回前驅節點,沒有拋出NPE */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
AQS隊列的基本結構
從ReentrantLock解讀AQS
下面是relock(簡寫)的基本結構
public class ReentrantLock implements Lock, java.io.Serializable { private static final long serialVersionUID = 7373984872572414699L; // 內部屬性 sync private final Sync sync; // Sync 是一個抽象類,繼承了 AQS abstract static class Sync extends AbstractQueuedSynchronizer { // ... } // 非公平鎖的實現,繼承了 Sync static final class NonfairSync extends Sync { // ... } // 公平鎖的實現,繼承了 Sync static final class FairSync extends Sync { // ... } public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } }
ReentrantLock默認構造器返回一個 非公平鎖,可以支持傳入一個 布爾類型的值,指定返回一個 非公平鎖 或者 公平鎖。
public void lock() { sync.lock(); }
lock方法調用 sync的lock方法,這里面又分為 FairSync 和 NonFairSync
FairSync 和NonFairSync都重寫了 AQS的lock方法
FairSync
// lock調用了 Sync 父類的 acquire方法, acquire方法調用了 重寫后的 tryAcquire方法 final void lock() { acquire(1); } protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
NonFairSync
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
公平鎖與非公平鎖的區別在于 FairSync在獲取同步狀態時多了一個限制條件:!hasQueuedPredecessors()
hasQueuedPredecessors 判斷了是否需要排隊
NonFairSync的占鎖流程
嘗試加鎖。
加鎖失敗,線程入隊列。
線程入隊列后,進入阻塞狀態。
從銀行流程理解AQS流程(非公平鎖)
模擬三個線程 A、 B、 C來銀行辦理業務,現在只有一個窗口辦理,所以需要排隊。
relock .lock () // NonfairSync 方法 final void lock() { if (compareAndSetState(0, 1)) // 嘗試是否可以直接占鎖成功,通過unsafe類的CAS功能 setExclusiveOwnerThread(Thread.currentThread()); // 設置獨占線程,開始是 A else acquire(1); // 占鎖失敗執行 acquire方法, 如線程B } // AQS 方法 public final void acquire(int arg) { // if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } // NonfairSync方法 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } // Sync的方法 final boolean nonfairTryAcquire(int acquires) { // 嘗試再次占用 final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
假設線程B、C開始之前,線程A又執行了一次relock的lock方法,會發生什么呢?(可重入鎖)
final boolean nonfairTryAcquire(int acquires) { // 嘗試再次占用 final Thread current = Thread.currentThread(); // 當前線程是 線程A int c = getState(); // 獲取 state = 1 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 走這里 int nextc = c + acquires; // nextc = 1 + 1 = 2 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); // state 變為 2,則這個地方就需要 unlock兩次,才能將state變為0 return true; } return false; }
由于此時線程A還在占用,所以compareAndSetState方法失敗,線程B執行acquire方法
final boolean nonfairTryAcquire(int acquires) { // 嘗試再次占用 final Thread current = Thread.currentThread(); // 這里是線程B int c = getState(); // state == 1 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 獨占線程 線程A 當前線程 線程B A!=B int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; // 直接返回false }
線程B嘗試獲取鎖失敗,
if (!tryAcquire(arg) && // 線程B獲取失敗,執行acquireQueued方法 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
addWaiter(Node.EXCLUSIVE) > acquireQueued // 創建一個節點,并且將其快速添加到隊尾 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; // 隊尾節點 if (pred != null) { // 將新節點作為隊尾節點 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 快速添加到隊尾失敗,則執行 enq方法,隊列為空時。 enq(node); return node; } private Node enq(final Node node) { // 死循環 for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) // 如果隊尾為空,插入一個空的節點,也就是傀儡節點 tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { // 真正插入我們需要的節點,也就是包含線程B引用的節點 t.next = node; return t; } } } } final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // 返回前一個節點,對于線程B來說,p就是傀儡節點 // p == head 滿足條件, 由于A還在占用,所以tryAcquire返回一個false // 走下面的if //第二次循環,假設線程A繼續正在工作,下面的if語塊還是不執行 if (p == head && tryAcquire(arg)) { setHead(node);// 將附帶線程B的節點變成新的傀儡節點,去掉 前驅指針 p.next = null; // help GC //置空原傀儡指針與新的傀儡節點之間的后驅指針,方便GC回收 failed = false; return interrupted; } // shouldParkAfterFailedAcquire if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 通過parkAndCheckInterrupt()使用了 LockSupport阻塞了線程B // 下次線程被喚醒從這個地方開始執行。 interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // p指向傀儡節點,p.waitStatus = 0 int ws = pred.waitStatus; if (ws == Node.SIGNAL) // -1 return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 第一次走這里,將p的waitStatus設為 -1 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
雙向列表中,第一個節點為虛節點,也叫哨兵節點,其實不存儲任何信息,只是占位
由于線程A還在使用,線程C與線程B一樣,同樣會被阻塞。
線程A調用完成后,需要調用lock.unlock()方法
// relock public void unlock() { sync.release(1); } // AQS的方法 public final boolean release(int arg) { if (tryRelease(arg)) { // Node h = head; // 返回傀儡節點 if (h != null && h.waitStatus != 0) unparkSuccessor(h); // 設置傀儡節點狀態為 0 ,并且 喚醒傀儡節點的下一個節點。 return true; } return false; } // Relock Sync的方法 protected final boolean tryRelease(int releases) { // releases = 1 int c = getState() - releases; // c = 1 - 1 = 0 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); // c=0 設置獨占線程為null } setState(c); return free; } private void unparkSuccessor(Node node) { // p表示傀儡節點 int ws = node.waitStatus; if (ws < 0) // waitStatus為-1 滿足,改為0 compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 喚醒傀儡節點的下一個節點, 帶有線程B的節點。 LockSupport.unpark(s.thread); }
private void setHead(Node node) { head = node; node.thread = null; // 傀儡節點的線程去掉 node.prev = null; }
公平鎖回溯獲取鎖流程
更多細節參考:(11條消息) AQS深入理解 hasQueuedPredecessors源碼分析 JDK8_anlian523的博客-CSDN博客
FairSync執行lock()方法
final void lock() { acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 這個地方多了一個 hasQueuedPredecessors() 通過這個方法來判斷有沒有線程排在了當前線程前面 // 只有這個方法返回false的時候,才會通過CAS占用鎖 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } // 公平鎖獨有 public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; // 1. head 和 tail都為null,說明隊列為空,可以占用鎖,false // 2. head 和 tail都指向同一個Node,也返回false 說明只有 一個 dummy node(傀儡節點) return h != t && // 3. (s = h.next == null) 為false 分多鐘情況,參見前面的參考連接 // 4. (s.thread != Thread.currentThread()) 為false 分多鐘情況,參見前面的參考連接 ((s = h.next) == null || s.thread != Thread.currentThread()); }
提問:為什么先讀取 tail的值,再讀取 head的值?
答案: 去看參考連接吧,主要是為了 避免 (s = h.next) == null 報空指針異常
Java 任務調度
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。