隊列同步器AQS
隊列同步器AQS
同步器的設(shè)計是基于模板方法模式的,
重寫同步器指定的方法時,需要使用同步器提供的如下3個方法來訪問或修改同步狀態(tài)。
getState():獲取當(dāng)前同步狀態(tài)。
setState(int newState):設(shè)置當(dāng)前同步狀態(tài)。
compareAndSetState(int expect,int update):使用CAS設(shè)置當(dāng)前狀態(tài),該方法能夠保證狀態(tài)設(shè)置的原子性。
同步器依賴內(nèi)部的同步隊列(一個FIFO雙向隊列)來完成同步狀態(tài)的管理,當(dāng)前線程獲取同步狀態(tài)失敗時,同步器會將當(dāng)前線程以及等待狀態(tài)等信息構(gòu)造成為一個節(jié)點(Node)并將其加入同步隊列,同時會阻塞當(dāng)前線程,當(dāng)同步狀態(tài)釋放時,會把首節(jié)點中的線程喚醒,使其再次嘗試獲取同步狀態(tài)。
獨占式同步狀態(tài)獲取與釋放
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
獨占式同步鎖獲取流程:
通過調(diào)用同步器的release(int arg)方法可以釋放同步狀態(tài),該方法在釋放了同步狀態(tài)之后,會喚醒其后繼節(jié)點(進(jìn)而使后繼節(jié)點重新嘗試獲取同步狀態(tài))。
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
unparkSuccessor(Node node)方法使用LockSupport來喚醒處于等待狀態(tài)的線程。
在獲取同步狀態(tài)時,同步器維護(hù)一個同步隊列,獲取狀態(tài)失敗的線程都會被加入到隊列中并在隊列中進(jìn)行自旋;移出隊列(或停止自旋)的條件是前驅(qū)節(jié)點為頭節(jié)點且成功獲取了同步狀態(tài)。在釋放同步狀態(tài)時,同步器調(diào)用tryRelease(int arg)方法釋放同步狀態(tài),然后喚醒頭節(jié)點的后繼節(jié)點。
共享式同步狀態(tài)獲取與釋放
共享式獲取與獨占式獲取最主要的區(qū)別在于同一時刻能否有多個線程同時獲取到同步狀態(tài)。以文件的讀寫為例,如果一個程序在對文件進(jìn)行讀操作,那么這一時刻對于該文件的寫操作均被阻塞,而讀操作能夠同時進(jìn)行。寫操作要求對資源的獨占式訪問,而讀操作可以是共享式訪問,兩種不同的訪問模式在同一時刻對文件或資源的訪問情況。
通過調(diào)用同步器的acquireShared(int arg)方法可以共享式地獲取同步狀態(tài)
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
在acquireShared(int arg)方法中,同步器調(diào)用tryAcquireShared(int arg)方法嘗試獲取同步狀態(tài),tryAcquireShared(int arg)方法返回值為int類型,當(dāng)返回值大于等于0時,表示能夠獲取到同步狀態(tài)。
因此,在共享式獲取的自旋過程中,成功獲取到同步狀態(tài)并退出自旋的條件就是tryAcquireShared(int arg)方法返回值大于等于0。可以看到,在doAcquireShared(int arg)方法的自旋過程中,如果當(dāng)前節(jié)點的前驅(qū)為頭節(jié)點時,嘗試獲取同步狀態(tài),如果返回值大于等于0,表示該次獲取同步狀態(tài)成功并從自旋過程中退出。
通過調(diào)用releaseShared(int arg)方法可以釋放同步狀態(tài),
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
該方法在釋放同步狀態(tài)之后,將會喚醒后續(xù)處于等待狀態(tài)的節(jié)點。對于能夠支持多個線程同時訪問的并發(fā)組件(比如Semaphore),它和獨占式主要區(qū)別在于tryReleaseShared(int arg)方法必須確保同步狀態(tài)(或者資源數(shù))線程安全釋放,一般是通過循環(huán)和CAS來保證的,因為釋放同步狀態(tài)的操作會同時來自多個線程。
獨占式超時獲取同步狀態(tài)
通過調(diào)用同步器的doAcquireNanos(int arg,long nanosTimeout)方法可以超時獲取同步狀
態(tài),即在指定的時間段內(nèi)獲取同步狀態(tài),如果獲取到同步狀態(tài)則返回true,否則,返回false。
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
該方法在自旋過程中,當(dāng)節(jié)點的前驅(qū)節(jié)點為頭節(jié)點時嘗試獲取同步狀態(tài),如果獲取成功則從該方法返回,這個過程和獨占式同步獲取的過程類似,但是在同步狀態(tài)獲取失敗的處理上有所不同。如果當(dāng)前線程獲取同步狀態(tài)失敗,則判斷是否超時(nanosTimeout小于等于0表示已經(jīng)超時),如果沒有超時,重新計算超時間隔nanosTimeout,然后使當(dāng)前線程等待nanosTimeout納秒(當(dāng)已到設(shè)置的超時時間,該線程會從LockSupport.parkNanos(Objectblocker,long nanos)方法返回)。
acquire(int args)在未獲取到同步狀態(tài)時,將會使當(dāng)前線程一直處于等待狀態(tài)
doAcquireNanos(int arg,long nanosTimeout)會使當(dāng)前線程等待nanosTimeout納秒,如果當(dāng)前線程在nanosTimeout納秒內(nèi)沒有獲取到同步狀態(tài),將會從等待邏輯中自動返回。
獨占式超時獲取同步狀態(tài)的流程:
自定義同步組件——TwinsLock
該工具在同一時刻,只允許至多兩個線程同時訪問,超過兩個線程的訪問將被阻塞,我們將這個同步工具命名為TwinsLock。
共享式訪問,重寫tryAcquireShared(int args)方法和tryReleaseShared(int args)方法
定義資源數(shù)。TwinsLock在同一時刻允許至多兩個線程的同時訪問,表明同步資源數(shù)為2,這樣可以設(shè)置初始狀態(tài)status為2,當(dāng)一個線程進(jìn)行獲取,status減1,該線程釋放,則status加1,狀態(tài)的合法范圍為0、1和2,其中0表示當(dāng)前已經(jīng)有兩個線程獲取了同步資源,此時再有其他線程對同步狀態(tài)進(jìn)行獲取,該線程只能被阻塞。在同步狀態(tài)變更時,需要使用compareAndSet(int expect,int update)方法做原子性保障。
package com.example.xppdemo.chapter4; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; public class TwinsLock implements Lock { private final Sync sync = new Sync(2); private static final class Sync extends AbstractQueuedSynchronizer { Sync(int count) { if (count <= 0) { throw new IllegalArgumentException("count must large than zero."); } setState(count); } public int tryAcquireShared(int reduceCount) { for (; ; ) { int current = getState(); int newCount = current - reduceCount; if (newCount < 0 || compareAndSetState(current, newCount)) { return newCount; } } } public boolean tryReleaseShared(int returnCount) { for (; ; ) { int current = getState(); int newCount = current + returnCount; if (compareAndSetState(current, newCount)) { return true; } } } } public void lock() { sync.acquireShared(1); } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock() { return false; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } public void unlock() { sync.releaseShared(1); } @Override public Condition newCondition() { return null; } }
package com.example.xppdemo.chapter4; import java.util.concurrent.locks.Lock; public class TwinsLockTest { public static void main(String[] args) { test(); } public static void test() { final Lock lock = new TwinsLock(); class Worker extends Thread { public void run() { while (true) { lock.lock(); try { SleepUtils.second(1); System.out.println(Thread.currentThread().getName()); SleepUtils.second(1); } finally { lock.unlock(); } } } } // 啟動10個線程 for (int i = 0; i < 10; i++) { Worker w = new Worker(); w.setDaemon(true); w.start(); } // 每隔1秒換行 for (int i = 0; i < 10; i++) { SleepUtils.second(1); System.out.println(); } } }
執(zhí)行結(jié)果:
Thread-3 Thread-2 Thread-2 Thread-3 Thread-3 Thread-2 Thread-3 Thread-2 Thread-3 Thread-2
該線程在執(zhí)行過程中獲取鎖,當(dāng)獲取鎖之后使當(dāng)前線程睡眠1秒(并不釋放鎖),隨后打印當(dāng)前線程名稱,最后再次睡眠1秒并釋放鎖
任務(wù)調(diào)度
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。