教面試官ReentrantLock源碼
737
2025-04-01
我們知道,在java中提供了兩類鎖的實現,一種是在jvm層級上實現的synchrinized隱式鎖,另一類是jdk在代碼層級實現的,juc包下的Lock顯示鎖,而提到Lock就不得不提一下它的核心隊列同步器(AQS)了,它的全稱是AbstractQueuedSynchronizer,是用來構建鎖或者其他一些同步組件的基礎,除了ReentrantLock、ReentrantReadWriteLock外,它還在CountDownLatch、Semaphore以及ThreadPoolExecutor中被使用,通過理解隊列同步器的工作原理,對我們了解和使用這些工具類會有很大的幫助。
1、AQS 基礎
為了便于理解AQS的概念,首先摘錄部分AbstractQueuedSynchronizer的注釋進行簡要翻譯:
它提供了一個框架,對于依賴先進先出等待隊列的阻塞鎖和同步器(例如信號量和事件),可以用它來實現。這個類的設計,對于大多數依賴于單個原子值來表示狀態(state)的同步器,可以提供有力的基礎。子類需要重寫被protected修飾的方法,例如更改狀態(state),定義在獲取或釋放對象時這些狀態表示的含義。基于這些,類中的其他方法實現了隊列和阻塞機制。在子類中可以維護其他的狀態字段,但是只有使用getState,setState,compareAndSetState方法原子更新的狀態值變量,才與同步有關。
子類被推薦定義為非public的內部類,用來實現封閉類的屬性同步。同步器本身沒有實現任何同步接口,它僅僅定義了一些方法,供具體的鎖和同步組件中的public方法調用。
隊列同步器支持獨占模式和共享模式,當一個線程在獨占模式下獲取時,其他線程不能獲取成功,在共享模式下多線程的獲取可能成功。在不同模式下,等待的線程使用的是相同的先進先出隊列。通常,實現子類只支持其中的一種模式,但是在ReadWriteLock中兩者都可以發揮作用。只支持一種模式的子類在實現時不需要重寫另一種模式中的方法。
閱讀這些注釋,可以知道AbstractQueuedSynchronizer是一個抽象類,它基于內部先進先出(FIFO)的雙向隊列、以及內置的一些protected方法來實現同步器,完成同步狀態的管理,并且我們可以通過子類繼承AQS抽象類的方式,在共享模式或獨占模式下,實現自定義的同步組件。
通過上面的描述,可以看出AQS中的兩大核心是同步狀態和雙向的同步隊列,來看一下源碼中是如何對它們進行定義的:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { static final class Node { volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; //... } private transient volatile Node head; private transient volatile Node tail; private volatile int state; //... }
下面針對這兩個核心內容分別進行研究。
AQS內部靜態類Node用于表示同步隊列中的節點,變量表示的意義如下:
prev:當前節點的前驅節點,如果當前節點是同步隊列的頭節點,那么prev為null
next:當前節點的后繼節點,如果當前節點是同步隊列的尾節點,那么next為null
thread:獲取同步狀態的線程
waitStatus:等待狀態,取值可為以下情況
CANCELLED(1):表示當前節點對應的線程被取消,當線程等待超時或被中斷時會被修改為此狀態
SIGNAL(-1):當前節點的后繼節點的線程被阻塞,當前線程在釋放同步狀態或取消時,需要喚醒后繼節點的線程
CONDITION(-2):節點處于等待隊列中,節點線程等待在Condition上,當其他線程調用Condition的signal方法后,會將節點從等待隊列移到同步隊列
PROPAGATE(-3):表示下一次共享式同步狀態獲取能夠被執行,即同步狀態的獲取可以向后繼節點的后繼進行無條件的傳播
0:初始值,表示當前節點等待獲取同步狀態
每個節點的prev和next指針在加入隊列的時候進行賦值,通過這些指針就形成了一個雙向列表,另外AQS還保存了同步隊列的頭節點head和尾節點tail,通過這樣的結構,就能夠通過頭節點或尾節點,找到隊列中的任何一個節點。使用圖來表示同步隊列的結構如下:
另外可以看到,在源碼中為了保證可見性,同步器中的head、tail、state,以及節點中的prev,next屬性都加了關鍵字volatile修飾。
AQS的另一核心同步狀態,在代碼中是使用int類型的變量state來表示的,通過原子操作修改同步狀態的值,來實現對同步組件的狀態進行修改。在子類中,主要通過AQS提供的下面3個方法對同步狀態的訪問和轉換進行操作:
getState():獲取當前的同步狀態
setState(int newState): 設置新的同步狀態
compareAndSetState(int expect,int update): 調用Unsafe類的compareAndSwapInt方法,使用CAS操作更新同步狀態,保證了狀態修改的原子性
線程會試圖修改state的值,如果修改成功那么表示線程得到或釋放了同步狀態,如果失敗就會將當前線程封裝成一個Node節點,然后將其加入到同步隊列中,并阻塞當前線程。
AQS的設計使用了模板方法的設計模式,模板方法一般在父類中封裝不變的部分(如算法骨架),把擴展的可變部分交給子類進行擴展,子類的執行結果會影響父類的結果,是一種反向的控制結構。AQS中應用了這種設計模式,將一部分方法交給子類進行重寫,而自定義的同步組件在調用同步器提供的模板方法(父類中的方法)時,又會調用子類重寫的方法。
以AQS類中常用于獲取鎖的acquire方法為例,它的代碼如下:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
acquire方法被final修飾,不可以在子類中重寫,因為它是對外提供的模板方法,有相對具體和固定的執行邏輯。在acquire方法中調用了tryAcquire方法:
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
可以看到帶有protected修飾的tryAcquire方法是一個空殼方法,并沒有定義實際獲取同步狀態的邏輯,這就需要我們在繼承AQS的子類中對齊進行重寫,從而達到擴展目的。在重寫過程中,就會用到上面提到的獲取和修改同步狀態的3個方法getState、setState和compareAndSetState。
以ReentrantLock中的方法調用為例,當調用ReentrantLock中的lock方法時,會調用繼承了AQS的內部類Sync的父類中的acquire方法,acquire方法再調用子類Sync的tryAcquire方法并返回boolean類型結果。
除了tryAcquire方法外,子類中還提供了其他可以重寫的方法,列出如下:
tryAcquire:獨占式獲取同步狀態
tryRelease:獨占式釋放同步狀態
tryAcquireShared:共享式獲取同步狀態
tryReleaseShared:共享式釋放同步狀態
isHeldExclusively:當前線程是否獨占式的占用同步狀態
而我們在實現自定義的同步組件時,可以直接調用AQS提供的下面這些模板方法:
acquire:獨占式獲取同步狀態,如果線程獲取同步狀態成功那么方法返回,否則線程阻塞,進入同步隊列中
acquireInterruptibly:在acquire基礎上,添加了響應中斷功能
tryAcquireNanos:在acquireInterruptibly基礎上,添加了超時限制,超時會返回false
acquireShared:共享式獲取同步狀態,如果線程獲取同步狀態成功那么方法返回,否則線程進入同步隊列中阻塞。與acquire不同,該方法允許多個線程同時獲取鎖
acquireSharedInterruptibly:在acquireShared基礎上,可響應中斷
tryAcquireSharedNanos:在acquireSharedInterruptibly基礎上,添加了超時限制
release:獨占式釋放同步狀態,將喚醒同步隊列中第一個節點的線程
releaseShared:共享式釋放同步狀態
getQueuedThreads:獲取等待在同步隊列上的線程集合
從模板方法中可以看出,大多方法都是獨占模式和共享模式對稱出現的,除去查詢等待線程方法外,可以將他們分為兩類:獨占式獲取或釋放同步狀態、共享式獲取或釋放同步狀態,并且它們的核心都是acquire與release方法,其他方法只是在它們實現的基礎上做了部分的邏輯改動,增加了中斷和超時功能的支持。下面對主要的4個方法進行分析。
2、源碼分析
分析上面acquire方法中源碼的執行流程:
1.首先調用tryAcquire嘗試獲取同步狀態,如果獲取成功,那么直接返回
2.如果獲取同步狀態失敗,調用addWaiter方法生成新Node節點并加入同步隊列:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
方法中使用當前線程和等待狀態構造了一個新的Node節點,在同步隊列的隊尾節點不為空的情況下(說明同步隊列非空),調用compareAndSetTail方法以CAS的方式把新節點設置為同步隊列的隊尾節點。如果隊尾節點為空或添加新節點失敗,則調用enq方法:
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
在同步隊列為空的情況下,會先創建一個新的空節點作為頭節點,然后通過CAS的方式將當前線程創建的Node設為尾節點。在for循環中,只有通過CAS將節點插入到隊尾后才會返回,否則就會重復循環,通過這樣的方式,能夠將并發添加節點的操作變為串行添加,保證了線程的安全性。這一過程可以使用下圖表示:
3.添加新節點完成后,調用acquireQueued方法,嘗試以自旋的方式獲取同步狀態:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
當新添加的Node的前驅節點是同步隊列的頭節點并且嘗試獲取同步狀態成功時,線程將Node設為頭節點并從自旋中退出,否則調用shouldParkAfterFailedAcquire方法判斷是否需要掛起:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
在該方法中,傳入的第一個Node類型的參數是當前節點的前驅節點,對其等待狀態進行判斷:
如果為SIGNAL狀態,那么前驅節點釋放同步狀態或取消時都會通知后繼節點,因此可以將當前線程阻塞,返回true
如果大于0,那么為CANCEL狀態,表示前驅節點被取消,那么一直向前回溯,找到一個不為CANCEL狀態的節點,并將當前節點的前驅指向它
如果不是上面的兩種情況,那么將前驅節點的等待狀態設為SIGNAL。這里的目的是在每個節點進入阻塞狀態前將前驅節點的等待狀態設為SIGNAL,否則節點將無法被喚醒
在后兩種情況下,都會返回false,然后在acquireQueued方法中進行循環,直到進入shouldParkAfterFailedAcquire方法時為第一種情況,阻塞線程
當返回為true時,調用parkAndCheckInterrupt方法:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
在方法內部調用了LockSupport的park方法,阻塞當前線程,并返回當前線程是否被中斷的狀態。
在上面的代碼中,各節點通過自旋的方式檢測自己的前驅節點是否頭節點的過程,可用下圖表示:
4.當滿足條件,返回acquire方法后,調用selfInterrupt方法。方法內部使用interrupt方法,喚醒被阻塞的線程,繼續向下執行:
static void selfInterrupt() { Thread.currentThread().interrupt(); }
最后,使用流程圖的方式總結acquire方法獨占式獲取鎖的整體流程:
與acquire方法對應,release方法負責獨占式釋放同步狀態,流程也相對簡單。在ReentrantLock中,unlock方法就是直接調用的AQS的release方法。先來直接看一下它的源碼:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
1.方法中首先調用子類重寫的tryRelease方法,嘗試釋放當前線程持有的同步狀態,如果成功則向下執行,失敗返回false
2.如果同步隊列的頭節點不為空且等待狀態不為初始狀態,那么將調用unparkSuccessor方法喚醒它的后繼節點:
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
方法主要實現的功能有:
如果頭節點的等待狀態小于0,使用CAS將它置為0
如果后續節點為空、或它的等待狀態為CANCEL被取消,那么從隊尾開始,向前尋找最靠近隊列頭部的一個等待狀態小于0 的節點
找到符合條件的節點后,調用LockSupport工具類的unpark方法,喚醒后繼節點中對應的線程
同步隊列新頭節點的設置過程如下圖所示:
在上面的過程中,采用的是從后向前遍歷尋找未取消節點的方式,這是因為AQS的同步隊列是一個弱一致性的雙向列表,在下面的情況中,存在next指針為null的情況:
在enq方法插入新節點時,可能存在舊尾節點的next指針還未指向新節點的情況
在shouldParkAfterFailedAcquire方法中,當移除CANCEL狀態的節點時,也存在next指針還未指向后續節點的情況
在了解了獨占式獲取同步狀態后,再來看一下共享式獲取同步狀態。在共享模式下,允許多個線程同時獲取到同步狀態,來看一下它的源碼:
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
首先調用子類重寫的tryAcquireShared方法,返回值為int類型,如果值大于等于0表示獲取同步狀態成功,那么直接返回。如果小于0表示獲取失敗,執行下面的doAcquireShared方法,將線程放入等待隊列使用自旋嘗試獲取,直到獲取同步狀態成功:
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
對上面的代碼進行簡要的解釋:
1.調用addWaiter方法,封裝新節點,并以共享模式(Node.SHARED)將節點放入同步隊列中隊尾
2.在for循環中,獲取當前節點的前驅節點,如果前驅節點是同步隊列的頭節點,那么就以共享模式去嘗試獲取同步狀態,判斷tryAcquireShared的返回值,如果返回值大于等于0,表示獲取同步狀態成功,修改新的頭節點,并將信息傳播給同步隊列中的后繼節點,然后檢查中斷標志位,如果線程被阻塞,那么進行喚醒
3.如果前驅節點不是頭節點、或獲取同步狀態失敗時,調用shouldParkAfterFailedAcquire判斷是否需要阻塞,如果需要則調用parkAndCheckInterrupt,在前驅節點的等待狀態為SIGNAL時,將節點對應的線程阻塞
可以看到,共享式的獲取同步狀態的調用過程和acquire方法非常相似,但不同的是在獲取同步狀態成功后,會調用setHeadAndPropagate方法進行共享式同步狀態的傳播:
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
因為共享式同步狀態是允許多個線程共享的,所以在一個線程獲取到同步狀態后,需要在第一時間通知后繼節點的線程可以嘗試獲取同步資源,這樣就可以避免其他線程阻塞時間過長。在方法中,把當前節點設置為頭節點后,需要根據情況判斷后繼節點是否需要釋放:
propagate>0:表示還擁有剩余的同步資源,從doAcquireShared方法中執行到這時,取值是大于等于0的,在等于0的情況下,會繼續下面的判斷
h == null:原頭節點為空,一般情況下不滿足,有可能發生在原頭節點被gc回收的情況,此條不滿足情況則向下繼續判斷
h.waitStatus < 0:原頭節點的等待狀態可能取值為0或-3
當某個線程釋放同步資源或者前一個節點共享式獲取同步狀態時(會執行下面的doReleaseShared方法),會將自己的waitStatus從-1改變為0
這時可能后繼節點還沒有來的及將自己更新為頭節點,如果有其他的線程在這個時候再調用doReleaseShared方法,那么取到的還是原頭節點,會把它的waitStatus從0改變為-3,在這個過程中,說明其他線程調用doReleaseShared釋放了同步資源
(h = head) == null:新頭節點為空,一般情況下不滿足,會向下繼續判斷
h.waitStatus < 0:新頭節點的等待狀態可能取值為0或-3或-1
如果后繼節點剛加入隊列,還沒有運行到shouldParkAfterFailedAcquire方法,修改其前驅節點的等待狀態時,此時可能為0
如果節點被喚醒成為了新的頭節點,并且此時后繼節點才剛被加入同步隊列,又有其他線程釋放鎖調用了doReleaseShared,會把頭節點的狀態從0改為-3
隊列中的節點已經調用了shouldParkAfterFailedAcquire,會把waitStatus 從0或-3 改為-1
如果滿足上面的任何一種狀態,并且它的后繼節點是SHARED狀態的,則執行doReleaseShared方法釋放后繼節點:
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
doReleaseShared方法不僅在這里的共享狀態傳播的情況下被調用,還會在后面介紹的共享式釋放同步狀態中被調用。在方法中,當頭節點不為空且不等于尾節點(意味著沒有后繼節點需要等待喚醒)時:
先將頭節點從SIGNAL狀態更新為0,然后調用unparkSuccessor方法喚醒頭節點的后繼節點
將頭節點的狀態從0更新為PROPAGATE,表明狀態需要向后繼節點傳播
如果頭節點在更新狀態的時候沒有發生改變,則退出循環
通過上面的流程,就實現了從頭節點嘗試向后喚醒節點,實現了共享狀態的向后傳播。
最后,再來看一下對應的共享式釋放同步狀態方法:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
releaseShared方法會釋放指定量的資源,如果調用子類重寫的tryReleaseShared方法返回值為true,表示釋放成功,那么還是執行上面介紹過的doReleaseShared方法喚醒同步隊列中的等待線程。
3、自定義同步組件
在前面的介紹中說過,在使用AQS時,需要定義一個子類繼承AbstractQueuedSynchronizer抽象類,并實現它的抽象方法來管理同步狀態。接下來我們就來手寫一個獨占式的鎖,按照文檔中的推薦,我們將子類定義為自定義同步工具類的靜態內部類:
public class MyLock { private static class AqsHelper extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire(int arg) { int state = getState(); if (state == 0) { if (compareAndSetState(0, arg)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } } else if (getExclusiveOwnerThread() == Thread.currentThread()) { setState(getState() + arg); return true; } return false; } @Override protected boolean tryRelease(int arg) { int state = getState() - arg; if (state == 0) { setExclusiveOwnerThread(null); setState(state); return true; } setState(state); return false; } @Override protected boolean isHeldExclusively() { return getState() == 1; } } private final AqsHelper aqsHelper = new AqsHelper(); public void lock() { aqsHelper.acquire(1); } public boolean tryLock() { return aqsHelper.tryAcquire(1); } public void unlock() { aqsHelper.release(1); } public boolean isLocked() { return aqsHelper.isHeldExclusively(); } }
在AQS的子類中,首先重寫了tryAcquire方法,在方法中利用CAS來修改state的狀態值,并在修改成功時設置當前線程獨占資源。并且通過比較嘗試獲取鎖的線程與持有鎖的線程是否相同的方式,來實現了鎖的可重入性。在重寫的tryRelease方法中,進行資源的釋放,如果存在重入的情況,會一直到所有重入鎖釋放完才會真正的釋放鎖,并放棄占有狀態。
可以注意到在自定義的鎖工具類中,我們定義了lock和tryLock兩個方法,分別調用了acquire和tryAcquire方法,它們的區別是lock會等待鎖資源,直到成功時才會返回,而tryLock嘗試獲取鎖時,會立即返回成功或失敗的狀態。
接下來,我們通過下面的測試代碼,驗證自定義的鎖的有效性:
public class Test { private MyLock lock=new MyLock(); private int i=0; public void sayHi(){ try { lock.lock(); System.out.println("i am "+i++); }finally { lock.unlock(); } } public static void main(String[] args) { Test test=new Test(); Thread[] th=new Thread[20]; for (int i = 0; i < 20; i++) { new Thread(()->{ test.sayHi(); }).start(); } } }
運行上面的測試代碼,結果如下,可以看見通過加鎖保證了對變量i的同步訪問控制:
接下來通過下面的例子測試鎖的可重入性:
public class Test2 { private MyLock lock=new MyLock(); public void function1(){ lock.lock(); System.out.println("execute function1"); function2(); lock.unlock(); } public void function2(){ lock.lock(); System.out.println("execute function2"); lock.unlock(); } public static void main(String[] args) { Test2 test2=new Test2(); new Thread(()->{ test2.function1(); }).start(); } }
執行上面的代碼,可以看到在function1未釋放鎖的情況下,function2對鎖進行了重入并執行了后續的代碼:
總結
通過上面的學習,我們了解了AQS的兩大核心同步隊列和同步狀態,并對AQS對資源的管理以及隊列狀態的變化有了一定的研究。其實歸根結底,AQS只是提供給我們來開發同步組件的一個底層框架,在它的層面上,并不關心子類在繼承它時要實現什么功能,AQS只是提供了一套維護同步狀態的功能,至于要完成什么樣的一個工具類,這完全是由我們自己去定義的。
最后
如果覺得對您有所幫助,小伙伴們可以點個贊啊,非常感謝~
公眾號『碼農參上』,一個熱愛分享的公眾號,有趣、深入、直接,與你聊聊技術。歡迎來加Hydra好友 (- DrHydra9),圍觀朋友圈,做個之交啊。
Java 高性能計算
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。