【并發技術13】條件阻塞Condition的應用
Condition 將 Object -方法(wait、notify 和 notifyAll)分解成截然不同的對象,以便通過將這些對象與任意 Lock 實現組合使用,為每個對象提供多個等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和語句的使用, Condition 替代了 Object 監視器方法的使用。
Condition 將 Object -方法(wait、notify 和 notifyAll)分解成截然不同的對象,以便通過將這些對象與任意 Lock 實現組合使用,為每個對象提供多個等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和語句的使用, Condition 替代了 Object 監視器方法的使用。
1. Condition的基本使用
由于 Condition 可以用來替代 wait、notify 等方法,所以可以對比著之前寫過的傳統線程同步通信技術的代碼來看,先來回顧一下原來的問題:
有兩個線程,子線程先執行10次,然后主線程執行5次,然后再切換到子線程執行10,再主線程執行5次……如此往返執行50次。
之前用 wait 和notify 來實現的,現在用 Condition 來改寫一下,代碼如下:
public?class?ConditionCommunication?{????public?static?void?main(String[]?args)?{????????Business?bussiness?=?new?Business();????????new?Thread(new?Runnable()?{//?開啟一個子線程????????????????????@Override????????????????????public?void?run()?{????????????????????????for?(int?i?=?1;?i?<=?50;?i++)?{????????????????????????????bussiness.sub(i);????????????????????????}????????????????????}????????????????}).start();????????//?main方法主線程????????for?(int?i?=?1;?i?<=?50;?i++)?{????????????bussiness.main(i);????????}????}???}class?Business?{????Lock?lock?=?new?ReentrantLock();????Condition?condition?=?lock.newCondition();?//Condition是在具體的lock之上的????private?boolean?bShouldSub?=?true;????public?void?sub(int?i)?{????????lock.lock();????????try?{????????????while?(!bShouldSub)?{????????????????try?{????????????????????condition.await();?//用condition來調用await方法????????????????}?catch?(Exception?e)?{????????????????????//?TODO?Auto-generated?catch?block????????????????????e.printStackTrace();????????????????}????????????}????????????for?(int?j?=?1;?j?<=?10;?j++)?{????????????????System.out.println("sub?thread?sequence?of?"?+?j????????????????????????+?",?loop?of?"?+?i);????????????}????????????bShouldSub?=?false;????????????condition.signal();?//用condition來發出喚醒信號,喚醒某一個????????}?finally?{????????????lock.unlock();????????}????}????public?void?main(int?i)?{????????lock.lock();????????try?{????????????while?(bShouldSub)?{????????????????try?{????????????????????condition.await();?//用condition來調用await方法????????????????}?catch?(Exception?e)?{????????????????????//?TODO?Auto-generated?catch?block????????????????????e.printStackTrace();????????????????}????????????}????????????for?(int?j?=?1;?j?<=?10;?j++)?{????????????????System.out.println("main?thread?sequence?of?"?+?j????????????????????????+?",?loop?of?"?+?i);????????????}????????????bShouldSub?=?true;????????????condition.signal();?//用condition來發出喚醒信號么,喚醒某一個????????}?finally?{????????????lock.unlock();????????}????}}
從代碼來看,Condition 的使用是和 Lock 一起的,沒有 Lock 就沒法使用 Condition,因為 Condition 是通過 Lock 來 new 出來的,這種用法很簡單,只要掌握了 synchronized 和 wait、notify 的使用,完全可以掌握 Lock 和 Condition 的使用。
2. Condition的拔高
上面使用 Lock 和 Condition 來代替 synchronized 和 Object 監視器方法實現了兩個線程之間的通信,現在再來寫個稍微高級點的應用:模擬緩沖區的阻塞隊列。
什么叫緩沖區呢?舉個例子,現在有很多人要發消息,我是中轉站,我要幫別人把消息發出去,那么現在我就需要做兩件事,一件事是接受用戶發過來的消息,并按照順序放到緩沖區,另一件事是從緩沖區按順序取出用戶發過來的消息,并發送出去。
現在把這個實際的問題抽象一下:緩沖區即一個數組,我們可以向數組種寫入數據,也可以從數組種把數據取走,我要做的兩件事就是開啟兩個線程,一個存數據,一個取數據。但是問題來了,如果緩沖區滿了,說明接收的消息太多了,即發送過來的消息太快了,我另一個線程還來不及發完,導致現在的緩沖區沒地方放了,那么此時就得阻塞存數據這個線程,讓其等待;相反,如果我轉發的太快,現在緩沖區所有內容都被我發完了,還沒有用戶發新的消息來,那么此時就得阻塞取數據這個線程。
好了,分析完了這個緩沖區的阻塞隊列,下面就用 Condition 技術來實現一下。
class?Buffer?{????final?Lock?lock?=?new?ReentrantLock();?//定義一個鎖????final?Condition?notFull?=?lock.newCondition();?//定義阻塞隊列滿了的Condition????final?Condition?notEmpty?=?lock.newCondition();//定義阻塞隊列空了的Condition????final?Object[]?items?=?new?Object[10];?//為了下面模擬,設置阻塞隊列的大小為10,不要設太大????int?putptr,?takeptr,?count;?//數組下標,用來標定位置的????//往隊列中存數據????public?void?put(Object?x)?throws?InterruptedException?{????????lock.lock();?//上鎖????????try?{????????????while?(count?==?items.length)?{????????????????System.out.println(Thread.currentThread().getName()?+?"?被阻塞了,暫時無法存數據!");????????????????notFull.await();????//如果隊列滿了,那么阻塞存數據這個線程,等待被喚醒????????????}????????????//如果沒滿,按順序往數組中存????????????items[putptr]?=?x;????????????if?(++putptr?==?items.length)?//這是到達數組末端的判斷,如果到了,再回到始端????????????????putptr?=?0;????????????++count;????//消息數量????????????System.out.println(Thread.currentThread().getName()?+?"?存好了值:?"?+?x);????????????notEmpty.signal();?//好了,現在隊列中有數據了,喚醒隊列空的那個線程,可以取數據啦????????}?finally?{????????????lock.unlock();?//放鎖????????}????}????//從隊列中取數據????public?Object?take()?throws?InterruptedException?{????????lock.lock();?//上鎖????????try?{????????????while?(count?==?0)?{????????????????System.out.println(Thread.currentThread().getName()?+?"?被阻塞了,暫時無法取數據!");????????????????notEmpty.await();??//如果隊列是空,那么阻塞取數據這個線程,等待被喚醒????????????}????????????//如果沒空,按順序從數組中取????????????Object?x?=?items[takeptr];????????????if?(++takeptr?==?items.length)?//判斷是否到達末端,如果到了,再回到始端????????????????takeptr?=?0;????????????--count;?//消息數量????????????System.out.println(Thread.currentThread().getName()?+?"?取出了值:?"?+?x);????????????notFull.signal();?//好了,現在隊列中有位置了,喚醒隊列滿的那個線程,可以存數據啦????????????return?x;????????}?finally?{????????????lock.unlock();?//放鎖????????}????}}
這個程序很經典,我是從官方 JDK 文檔中拿出來的,然后加了注釋。程序中定義了來兩個 Condition,分別針對兩個線程,等待和喚醒分別用不同的 Condition 來執行,思路很清晰,程序也很健壯。
可以考慮一個問題,為啥要用兩個 Condition 呢?之所以這么設計肯定是有原因的:如果用一個 Condition,現在假設隊列滿了,但是有 2 個線程 A 和 B 同時存數據,那么都進入了睡眠,好,現在另一個線程取走一個了,然后喚醒了其中一個線程 A,那么 A 可以存了,存完后, A 又喚醒一個線程,如果 B 被喚醒了,那就出問題了,因為此時隊列是滿的,B 不能存的,B存的話就會覆蓋原來還沒被取走的只,這就是一個 Condition 帶來的問題。
來測試一下上面的阻塞隊列的效果。
public?class?BoundedBuffer?{????public?static?void?main(String[]?args)?{????????????????Buffer?buffer?=?new?Buffer();???????????????for(int?i?=?0;?i?5;?i?++)?{?//開啟5個線程往緩沖區存數據????????????new?Thread(new?Runnable()?{?????????????????????????????@Override????????????????public?void?run()?{????????????????????try?{????????????????????????buffer.put(new?Random().nextInt(1000));?//隨機存數據????????????????????}?catch?(InterruptedException?e)?{????????????????????????e.printStackTrace();????????????????????}????????????????}????????????}).start();????????}????????for(int?i?=?0;?i?10;?i?++)?{?//開啟10個線程從緩沖區中取數據????????????new?Thread(new?Runnable()?{?????????????????????????????@Override????????????????public?void?run()?{????????????????????try?{????????????????????????buffer.take();?//從緩沖區取數據????????????????????}?catch?(InterruptedException?e)?{????????????????????????e.printStackTrace();????????????????????}????????????????}????????????}).start();????????}????}}
我故意只開啟 5 個線程存數據,10 個線程取數據,就是想讓它出現取數據被阻塞的情況發生,看運行的結果。
Thread-5 被阻塞了,暫時無法取數據!
Thread-10 被阻塞了,暫時無法取數據!
Thread-1 存好了值: 755
Thread-0 存好了值: 206
Thread-2 存好了值: 741
Thread-3 存好了值: 381
Thread-14 取出了值: 755
Thread-4 存好了值: 783
Thread-6 取出了值: 206
Thread-7 取出了值: 741
Thread-8 取出了值: 381
Thread-9 取出了值: 783
Thread-5 被阻塞了,暫時無法取數據!
Thread-11 被阻塞了,暫時無法取數據!
Thread-12 被阻塞了,暫時無法取數據!
Thread-10 被阻塞了,暫時無法取數據!
Thread-13 被阻塞了,暫時無法取數據!
從結果中可以看出,線程 5 和10 搶先執行,發現隊列中沒有,于是就被阻塞了,睡在那了,知道隊列中有新的值存入才可以取,但是他們兩運氣不好,存的數據又被其他線程前線取走了……可以多運行幾次。如果想要看到存數據被阻塞,可以將取數據的線程設置少一點,這里我就不設了。
還是原來那個題目,現在讓三個線程來執行,看一下題目:
有三個線程,子線程1先執行10次,然后子線程2執行10次,然后主線程執行5次,然后再切換到子線程1執行10次,子線程2執行10次,主線程執行5次……如此往返執行50次。
如果不用 Condition,還真不好弄,但是用 Condition 來做的話,就非常方便了,原理很簡單,定義三個 Condition,子線程 1 執行完喚醒子線程 2,子線程 2 執行完喚醒主線程,主線程執行完喚醒子線程1。喚醒機制和上面那個緩沖區道理差不多,下面附上代碼。
public?class?ThreeConditionCommunication?{????public?static?void?main(String[]?args)?{????????Business?bussiness?=?new?Business();????????new?Thread(new?Runnable()?{//?開啟一個子線程????????????????????@Override????????????????????public?void?run()?{????????????????????????for?(int?i?=?1;?i?<=?50;?i++)?{????????????????????????????bussiness.sub1(i);????????????????????????}????????????????????}????????????????}).start();????????new?Thread(new?Runnable()?{//?開啟另一個子線程????????????@Override????????????public?void?run()?{????????????????for?(int?i?=?1;?i?<=?50;?i++)?{????????????????????bussiness.sub2(i);????????????????}????????????}????????}).start();????????//?main方法主線程????????for?(int?i?=?1;?i?<=?50;?i++)?{????????????bussiness.main(i);????????}????}????static?class?Business?{????????Lock?lock?=?new?ReentrantLock();????????Condition?condition1?=?lock.newCondition();?//Condition是在具體的lock之上的????????Condition?condition2?=?lock.newCondition();????????Condition?conditionMain?=?lock.newCondition();????????private?int?bShouldSub?=?0;????????public?void?sub1(int?i)?{????????????lock.lock();????????????try?{????????????????while?(bShouldSub?!=?0)?{????????????????????try?{????????????????????????condition1.await();?//用condition來調用await方法????????????????????}?catch?(Exception?e)?{????????????????????????//?TODO?Auto-generated?catch?block????????????????????????e.printStackTrace();????????????????????}????????????????}????????????????for?(int?j?=?1;?j?<=?10;?j++)?{????????????????????System.out.println("sub1?thread?sequence?of?"?+?j????????????????????????????+?",?loop?of?"?+?i);????????????????}????????????????bShouldSub?=?1;????????????????condition2.signal();?//讓線程2執行????????????}?finally?{????????????????lock.unlock();????????????}????????}????????public?void?sub2(int?i)?{????????????lock.lock();????????????try?{????????????????while?(bShouldSub?!=?1)?{????????????????????try?{????????????????????????condition2.await();?//用condition來調用await方法????????????????????}?catch?(Exception?e)?{????????????????????????//?TODO?Auto-generated?catch?block????????????????????????e.printStackTrace();????????????????????}????????????????}????????????????for?(int?j?=?1;?j?<=?10;?j++)?{????????????????????System.out.println("sub2?thread?sequence?of?"?+?j????????????????????????????+?",?loop?of?"?+?i);????????????????}????????????????bShouldSub?=?2;????????????????conditionMain.signal();?//讓主線程執行????????????}?finally?{????????????????lock.unlock();????????????}????????}????????public?void?main(int?i)?{????????????lock.lock();????????????try?{????????????????while?(bShouldSub?!=?2)?{????????????????????try?{????????????????????????conditionMain.await();?//用condition來調用await方法????????????????????}?catch?(Exception?e)?{????????????????????????//?TODO?Auto-generated?catch?block????????????????????????e.printStackTrace();????????????????????}????????????????}????????????????for?(int?j?=?1;?j?<=?5;?j++)?{????????????????????System.out.println("main?thread?sequence?of?"?+?j????????????????????????????+?",?loop?of?"?+?i);????????????????}????????????????bShouldSub?=?0;????????????????condition1.signal();?//讓線程1執行????????????}?finally?{????????????????lock.unlock();????????????}????????}????}}
關于線程中 Condition 技術就分享這么多吧。
來源:微信公眾號
數據結構 任務調度
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。