elasticsearch入門系列">elasticsearch入門系列
655
2022-05-29
CountDownLatch工作原理分析
一、大致介紹
1、在前面章節(jié)了解了CAS、AQS后,想必大家已經(jīng)對(duì)這塊知識(shí)有了深刻的了解了; 2、而JDK中有一個(gè)關(guān)于計(jì)數(shù)同步器的工具類,它也是基于AQS實(shí)現(xiàn)的; 3、那么本章節(jié)就和大家分享分析一下JDK1.8的CountDownLatch的工作原理;
二、簡(jiǎn)單認(rèn)識(shí)CountDownLatch
2.1 何為CountDownLatch?
1、CountDownLatch從英文字面上理解,count計(jì)數(shù)做down的減法動(dòng)作,而Latch又是門閂的意思; 2、CountDownLatch是一種同步幫助,允許一個(gè)或多個(gè)線程等待,直到在其他線程中執(zhí)行的一組操作完成。; 3、CountDownLatch內(nèi)部沒(méi)有所謂的公平鎖\非公平鎖的靜態(tài)內(nèi)部類,只有一個(gè)Sync靜態(tài)內(nèi)部類,CountDownLatch內(nèi)部基本上也是通過(guò)sync.xxx之類的這種調(diào)用方式的; 4、CountDownLatch內(nèi)部維護(hù)了一個(gè)虛擬的資源池,如果許可數(shù)不為為0一直線程阻塞等待,直到許可數(shù)為0時(shí)才釋放繼續(xù)往下執(zhí)行;
2.2 CountDownLatch的state關(guān)鍵詞
1、其實(shí)CountDownLatch的實(shí)現(xiàn)也恰恰很好利用了其父類AQS的state變量值; 2、初始化一個(gè)數(shù)量值作為計(jì)數(shù)器的默認(rèn)值,假設(shè)為N,那么當(dāng)任何線程調(diào)用一次countDown則計(jì)數(shù)值減1,直到許可為0時(shí)才釋放等待; 3、CountDownLatch,簡(jiǎn)單大致意思為:A組線程等待另外B組線程,B組線程執(zhí)行完了,A組線程才可以執(zhí)行;
2.3 常用重要的方法
1、public CountDownLatch(int count) // 創(chuàng)建一個(gè)給定許計(jì)數(shù)值的計(jì)數(shù)同步器對(duì)象 2、public void await() // 入隊(duì)等待,直到計(jì)數(shù)器值為0則釋放等待 3、public void countDown() // 釋放許可,計(jì)數(shù)器值減1,若計(jì)數(shù)器值為0則觸發(fā)釋放無(wú)用結(jié)點(diǎn) 4、public long getCount() // 獲取目前最新的共享資源計(jì)數(shù)器值
2.4 設(shè)計(jì)與實(shí)現(xiàn)偽代碼
1、獲取共享鎖: public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } await{ 如果檢測(cè)中斷狀態(tài)發(fā)現(xiàn)被中斷過(guò)的話,那么則拋出InterruptedException異常 如果嘗試獲取共享鎖失敗的話( 嘗試獲取共享鎖的各種方式由AQS的子類實(shí)現(xiàn) ), 那么就新增共享鎖結(jié)點(diǎn)通過(guò)自旋操作加入到隊(duì)列中,然后通過(guò)調(diào)用LockSupport.park進(jìn)入阻塞等待,直到計(jì)數(shù)器值為零才釋放等待 } 2、釋放共享鎖: public void countDown() { sync.releaseShared(1); } release{ 如果嘗試釋放共享鎖失敗的話( 嘗試釋放共享鎖的各種方式由AQS的子類實(shí)現(xiàn) ), 那么通過(guò)自旋操作完成阻塞線程的喚起操作 }
2.5、CountDownLatch生活細(xì)節(jié)化理解
比如百米賽跑,我就以賽跑為例生活化闡述該CountDownLatch原理: 1、場(chǎng)景:百米賽跑十人參賽,終點(diǎn)處有一個(gè)裁判計(jì)數(shù); 2、開(kāi)跑一聲起跑信號(hào),十個(gè)人爭(zhēng)先恐后的向終點(diǎn)跑去,真的是振奮多秒,令人振奮; 3、當(dāng)一個(gè)人到達(dá)終點(diǎn),這個(gè)人就完成了他的賽跑事情了,就沒(méi)事一邊玩去了,那么裁判則減去一個(gè)人; 4、隨著人員陸陸續(xù)續(xù)的都跑到了終點(diǎn),最后裁判計(jì)數(shù)顯示還有0個(gè)人未到達(dá),意思就是人員都達(dá)到了; 5、然后裁判就拿著登記的成績(jī)屁顛屁顛去輸入電腦登記了; 8、到此打止,這一系列的動(dòng)作認(rèn)為是A組線程等待另外其他組線程的操作,直到計(jì)數(shù)器為零,那么A則再干其他事情;
三、源碼分析CountDownLatch
3.1、CountDownLatch構(gòu)造器
1、構(gòu)造器源碼: /** * Constructs a {@code CountDownLatch} initialized with the given count. * * @param count the number of times {@link #countDown} must be invoked * before threads can pass through {@link #await} * @throws IllegalArgumentException if {@code count} is negative */ public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } 2、創(chuàng)建一個(gè)給定許計(jì)數(shù)值的計(jì)數(shù)同步器對(duì)象,計(jì)數(shù)器值必須大于零,count值最后賦值給了state這個(gè)共享資源值;
3.2、Sync同步器
1、AQS --> Sync 2、CountDownLatch內(nèi)的同步器都是通過(guò)Sync抽象接口來(lái)操作調(diào)用關(guān)系的,細(xì)看會(huì)發(fā)現(xiàn)基本上都是通過(guò)sync.xxx之類的這種調(diào)用方式的;
3.3、await()
1、源碼: /** * Causes the current thread to wait until the latch has counted down to * zero, unless the thread is {@linkplain Thread#interrupt interrupted}. * * // 導(dǎo)致當(dāng)前線程等待,直到計(jì)數(shù)器值減為零則釋放等待,或者由于線程被中斷也可導(dǎo)致釋放等待; * *
If the current count is zero then this method returns immediately. * *
If the current count is greater than zero then the current * thread becomes disabled for thread scheduling purposes and lies * dormant until one of two things happen: *
If the current thread: *
3.4、acquireSharedInterruptibly(int)
1、源碼: /** * Acquires in shared mode, aborting if interrupted. Implemented * by first checking interrupt status, then invoking at least once * {@link #tryAcquireShared}, returning on success. Otherwise the * thread is queued, possibly repeatedly blocking and unblocking, * invoking {@link #tryAcquireShared} until success or the thread * is interrupted. * @param arg the acquire argument. * This value is conveyed to {@link #tryAcquireShared} but is * otherwise uninterpreted and can represent anything * you like. * @throws InterruptedException if the current thread is interrupted */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) // 調(diào)用之前先檢測(cè)該線程中斷標(biāo)志位,檢測(cè)該線程在之前是否被中斷過(guò) throw new InterruptedException(); // 若被中斷過(guò)的話,則拋出中斷異常 if (tryAcquireShared(arg) < 0) // 嘗試獲取共享資源鎖,小于0則獲取失敗,此方法由AQS的具體子類實(shí)現(xiàn) doAcquireSharedInterruptibly(arg); // 將嘗試獲取鎖資源的線程進(jìn)行入隊(duì)操作 } 2、由于是實(shí)現(xiàn)同步計(jì)數(shù)器功能,所以tryAcquireShared首次調(diào)用必定小于0,則就順利了進(jìn)入了doAcquireSharedInterruptibly線程等待; 至于首次調(diào)用為什么會(huì)小于0,請(qǐng)看子類的實(shí)現(xiàn),子類的實(shí)現(xiàn)判斷為 "(getState() == 0) ? 1 : -1" ;
3.5、tryAcquireShared(int)
1、源碼: protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; // 計(jì)數(shù)器值與零比較判斷,小于零則獲取鎖失敗,大于零則獲取鎖成功 } 2、嘗試獲取共享鎖資源,但是在計(jì)數(shù)器CountDownLatch這個(gè)功能中,小于零則需要入隊(duì),進(jìn)入阻塞隊(duì)列進(jìn)行等待;大于零則喚醒等待隊(duì)列,釋放await方法的阻塞等待;
3.6、doAcquireSharedInterruptibly(int)
1、源碼: /** * Acquires in shared interruptible mode. * @param arg the acquire argument */ private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 按照給定的mode模式創(chuàng)建新的結(jié)點(diǎn),模式有兩種:Node.EXCLUSIVE獨(dú)占模式、Node.SHARED共享模式; final Node node = addWaiter(Node.SHARED); // 創(chuàng)建共享模式的結(jié)點(diǎn) boolean failed = true; try { for (;;) { // 自旋的死循環(huán)操作方式 final Node p = node.predecessor(); // 獲取結(jié)點(diǎn)的前驅(qū)結(jié)點(diǎn) if (p == head) { // 若前驅(qū)結(jié)點(diǎn)為head的話,那么說(shuō)明當(dāng)前結(jié)點(diǎn)自然不用說(shuō)了,僅次于老大之后的便是老二了咯 int r = tryAcquireShared(arg); // 而且老二也希望嘗試去獲取一下鎖,萬(wàn)一頭結(jié)點(diǎn)恰巧剛剛釋放呢?希望還是要有的,萬(wàn)一實(shí)現(xiàn)了呢。。。 if (r >= 0) { // 若r>=0,說(shuō)明已經(jīng)成功的獲取到了共享鎖資源 setHeadAndPropagate(node, r); // 把當(dāng)前node結(jié)點(diǎn)設(shè)置為頭結(jié)點(diǎn),并且調(diào)用doReleaseShared釋放一下無(wú)用的結(jié)點(diǎn) p.next = null; // help GC failed = false; return; } // 但是在await方法首次被調(diào)用會(huì)流轉(zhuǎn)到此,這個(gè)時(shí)候獲取鎖資源會(huì)失敗,即r<0,所以會(huì)進(jìn)入是否需要休眠的判斷 // 但是第一次進(jìn)入休眠方法,因?yàn)楸粍?chuàng)建的結(jié)點(diǎn)waitStatus=0,所以會(huì)被修改一次為SIGNAL狀態(tài),再次循環(huán)一次 // 而第二次循環(huán)進(jìn)入shouldParkAfterFailedAcquire方法時(shí),返回true就是需要休眠,則順利調(diào)用park方式阻塞等待 } if (shouldParkAfterFailedAcquire(p, node) && // 根據(jù)前驅(qū)結(jié)點(diǎn)看看是否需要休息一會(huì)兒 parkAndCheckInterrupt()) // 阻塞操作,正常情況下,獲取不到共享鎖,代碼就在該方法停止了,直到被喚醒 // 被喚醒后,發(fā)現(xiàn)parkAndCheckInterrupt()里面檢測(cè)了被中斷了的話,則補(bǔ)上中斷異常,因此拋了個(gè)異常 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } 2、doAcquireSharedInterruptibly在實(shí)現(xiàn)計(jì)數(shù)器原理的時(shí)候,主要的干的事情就是等待再等待,等到計(jì)數(shù)器值為零時(shí)才蘇醒;
3.7、countDown()
1、源碼: /** * Decrements the count of the latch, releasing all waiting threads if * the count reaches zero. * *
If the current count is greater than zero then it is decremented. * If the new count is zero then all waiting threads are re-enabled for * thread scheduling purposes. * *
If the current count equals zero then nothing happens. */ public void countDown() { sync.releaseShared(1); // 釋放一個(gè)許可資源 } 2、釋放許可資源,也就是計(jì)數(shù)器值不斷的做減1操作,當(dāng)計(jì)數(shù)器值為零的時(shí)候,該方法將會(huì)釋放所有正在等待的線程隊(duì)列; 至于為什么還會(huì)釋放所有,請(qǐng)看后續(xù)的releaseShared(int arg)講解;
3.8、releaseShared(int)
1、源碼: /** * Releases in shared mode. Implemented by unblocking one or more * threads if {@link #tryReleaseShared} returns true. * * @param arg the release argument. This value is conveyed to * {@link #tryReleaseShared} but is otherwise uninterpreted * and can represent anything you like. * @return the value returned from {@link #tryReleaseShared} */ public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { // 嘗試釋放共享鎖資源,此方法由AQS的具體子類實(shí)現(xiàn) doReleaseShared(); // 自旋操作,喚醒后繼結(jié)點(diǎn) return true; // 返回true表明所有線程已釋放 } return false; // 返回false表明目前還沒(méi)釋放完全,只要計(jì)數(shù)器值不為零的話,那么都會(huì)返回false } 2、releaseShared方法首先就判斷了tryReleaseShared(arg)的返回值,但是計(jì)數(shù)器值只要不為零,都會(huì)返回false,因此releaseShared該方法就立馬返回false了; 3、所以當(dāng)計(jì)數(shù)器值慢慢減至零時(shí),則立馬返回true,那么也就立馬會(huì)調(diào)用doReleaseShared釋放所有等待的線程隊(duì)列;
3.9、tryReleaseShared(int)
1、源碼: // CountDownLatch 的靜態(tài)內(nèi)部類 Sync 類的 tryReleaseShared 方法 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { // 自旋的死循環(huán)操作方式 int c = getState(); // 獲取最新的計(jì)數(shù)器值 if (c == 0) // 若計(jì)數(shù)器值為零,說(shuō)明已經(jīng)通過(guò)CAS操作減至零了,所以在并發(fā)中讀取到零時(shí)并不需要做什么操作,因此返回false return false; int nextc = c-1; // 計(jì)數(shù)器值減1操作 if (compareAndSetState(c, nextc)) // 通過(guò)CAS比較,順利情況下設(shè)置成功返回true return nextc == 0; // 當(dāng)通過(guò)計(jì)算操作得到的nextc為零時(shí)通過(guò)CAS修改成功,那么表明所有事情都已經(jīng)做完,需要釋放所有等待的線程隊(duì)列 // 若CAS失敗,想都不用想肯定是由于并發(fā)操作,導(dǎo)致CAS失敗,那么唯一可做的就是下一次循環(huán)查看是否已經(jīng)被其他線程處理了 } } 2、CountDownLatch的靜態(tài)內(nèi)部類實(shí)現(xiàn)父類AQS的方法,用來(lái)處理如何釋放鎖,籠統(tǒng)的講,若返回負(fù)數(shù)則需要進(jìn)入阻塞隊(duì)列,否則需要釋放所有等待隊(duì)列;
3.10、doReleaseShared()
1、源碼: /** * Release action for shared mode -- signals successor and ensures * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) */ private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { // 自旋的死循環(huán)操作方式 Node h = head; // 每次都是取出隊(duì)列的頭結(jié)點(diǎn) if (h != null && h != tail) { // 若頭結(jié)點(diǎn)不為空且也不是隊(duì)尾結(jié)點(diǎn) int ws = h.waitStatus; // 那么則獲取頭結(jié)點(diǎn)的waitStatus狀態(tài)值 if (ws == Node.SIGNAL) { // 若頭結(jié)點(diǎn)是SIGNAL狀態(tài)則意味著頭結(jié)點(diǎn)的后繼結(jié)點(diǎn)需要被喚醒了 // 通過(guò)CAS嘗試設(shè)置頭結(jié)點(diǎn)的狀態(tài)為空狀態(tài),失敗的話,則繼續(xù)循環(huán),因?yàn)椴l(fā)有可能其它地方也在進(jìn)行釋放操作 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); // 喚醒頭結(jié)點(diǎn)的后繼結(jié)點(diǎn) } // 如頭結(jié)點(diǎn)為空狀態(tài),則把其改為PROPAGATE狀態(tài),失敗的則可能是因?yàn)椴l(fā)而被改動(dòng)過(guò),則再次循環(huán)處理 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 若頭結(jié)點(diǎn)沒(méi)有發(fā)生什么變化,則說(shuō)明上述設(shè)置已經(jīng)完成,大功告成,功成身退 // 若發(fā)生了變化,可能是操作過(guò)程中頭結(jié)點(diǎn)有了新增或者啥的,那么則必須進(jìn)行重試,以保證喚醒動(dòng)作可以延續(xù)傳遞 if (h == head) // loop if head changed break; } } 2、主要目的是釋放線程中所有等待的隊(duì)列,當(dāng)計(jì)數(shù)器值為零時(shí),此方法馬上會(huì)被調(diào)用,通過(guò)自旋方式輪詢干掉所有等待的隊(duì)列;
四、總結(jié)
1、有了分析AQS的基礎(chǔ)后,再來(lái)分析CountDownLatch便快了很多; 2、在這里我簡(jiǎn)要總結(jié)一下CountDownLatch的流程的一些特性: ? 管理一個(gè)大于零的計(jì)數(shù)器值; ? 每countDown一次則state就減1一次,直到許可證數(shù)量等于0則釋放隊(duì)列中所有的等待線程; ? 也可以通過(guò)countDown/await組合一起使用,來(lái)實(shí)現(xiàn)CyclicBarrier的功能; ``` CountDownLatch用法 CountDownLatch類只提供了一個(gè)構(gòu)造器: public CountDownLatch(int count) { }; //參數(shù)count為計(jì)數(shù)值 然后下面這3個(gè)方法是CountDownLatch類中最重要的方法: public void await() throws InterruptedException { }; //調(diào)用await()方法的線程會(huì)被掛起,它會(huì)等待直到count值為0才繼續(xù)執(zhí)行 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { }; //和await()類似,只不過(guò)等待一定的時(shí)間后count值還沒(méi)變?yōu)?的話就會(huì)繼續(xù)執(zhí)行 public void countDown() { }; //將count值減1 CountDownLatch, 一個(gè)同步輔助類,在完成一組正在其他線程中執(zhí)行的操作之前,它允許一個(gè)或多個(gè)線程一直等待。 下面舉個(gè)例子說(shuō)明: package main.java.CountDownLatch; import java.util.concurrent.CountDownLatch; /** * PROJECT_NAME:downLoad * Author:lucaifang * Date:2016/3/18 */ public class countDownlatchTest { public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(5); for(int i=0;i<5;i++){ new Thread(new readNum(i,countDownLatch)).start(); } countDownLatch.await(); System.out.println("線程執(zhí)行結(jié)束。。。。"); } static class readNum implements Runnable{ private int id; private CountDownLatch latch; public readNum(int id,CountDownLatch latch){ this.id = id; this.latch = latch; } @Override public void run() { synchronized (this){ System.out.println("id:"+id); latch.countDown(); System.out.println("線程組任務(wù)"+id+"結(jié)束,其他任務(wù)繼續(xù)"); } } } } 輸出結(jié)果: id:1 線程組任務(wù)1結(jié)束,其他任務(wù)繼續(xù) id:0 線程組任務(wù)0結(jié)束,其他任務(wù)繼續(xù) id:2 線程組任務(wù)2結(jié)束,其他任務(wù)繼續(xù) id:3 線程組任務(wù)3結(jié)束,其他任務(wù)繼續(xù) id:4 線程組任務(wù)4結(jié)束,其他任務(wù)繼續(xù) 線程執(zhí)行結(jié)束。。。。 線程在countDown()之后,會(huì)繼續(xù)執(zhí)行自己的任務(wù)
Java 任務(wù)調(diào)度
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。