elasticsearch入門系列">elasticsearch入門系列
760
2022-05-29
0 異步的優勢
太多的線程會造成頻繁的cpu上下文切換,你可以想象一下,假設你的小公司只有8臺電腦,你雇8個程序員一直不停的工作顯然是效率最高的。考慮到程序員要休息不可能連軸轉,雇傭24個人,每天三班倒,效率也還行。
但是,你要雇傭10000個人,他們還是只能用這8臺電腦,大部分時間不都浪費在換人、交接工作上啦。
異步編程是通過分工的方式,是為了減少了cpu因線程等待的可能,讓CPU一直處于工作狀態。換句話說,如果我們能想辦法減少CPU空閑時間,我們的計算機就可以支持更多的線程。
其實線程是一個抽象概念,我們從物理層面理解,就是單位時間內把每毫核分配處理不同的任務,從而提高單位時間內CPU的利用率。
線程就是為了能自動分配CPU時間片而生的。
異步實現里面還是要用線程池限制一下線程數吧,否則沒有達到減少線程的效果。
異步模式設計的程序可以顯著減少線程等待,從而在高吞吐量的場景中,極大提升系統的整體性能,顯著降低時延。
因此,像MQ這種需要超高吞吐量和超低時延的中間件系統,在其核心流程中,會大量采用異步設計。
1 案例引入
一個轉賬的微服務Transfer( accountFrom, accountTo, amount),該服務有三參數
轉出賬戶
轉入賬戶
轉賬金額
要從賬戶A中轉賬100到賬戶B:
先從A的賬戶中減去100元
再給B的賬戶加上100元,轉賬完成。
對應時序圖:
調用另外一個微服務Add(account, amount),給賬戶account增加金額amount,當amount為負值時,就是扣減相應金額。
為簡化,省略了錯誤處理和事務相關代碼
2 同步的性能瓶頸
同步實現,對應的偽代碼:
Transfer(accountFrom, accountTo, amount) { // 先從accountFrom的賬戶中減去相應的錢數 Add(accountFrom, -1 * amount) // 再把減去的錢數加到accountTo的賬戶中 Add(accountTo, amount) return OK }
1
2
3
4
5
6
7
先從accountFrom的賬戶中減去相應的錢數,再把減去的錢數加到accountTo的賬戶中,同步實現簡單直接。
性能如何?
假設微服務Add的平均響應時延是50ms,微服務Transfer的平均響應時延大約等于執行2次Add的時延,即100ms。隨調用Transfer服務的請求越來越多,會出現什么情況呢?
該實現,每處理一個請求耗時100ms,這100ms過程要獨占個線程。
可得:每個線程每秒最多可處理10個請求。假設服務器同時打開的線程數量上限10,000,可以計算出這臺服務器每秒鐘可以處理的請求上限是: 10,000 (個線程)* 10(次請求每秒) = 100,000 次每秒。
若請求速度超過這個值,那么請求就不能被馬上處理,只能阻塞或者排隊,這時候Transfer服務的響應時延由100ms延長到了:排隊的等待時延 + 處理時延(100ms)。即大量請求時,我們的微服務的平均響應時延變長了。
這是不是已達服務器極限?遠沒有!
若監測服務器指標,會發現無論是CPU、內存,還是網卡流量或者是磁盤的IO都空閑的很,那我們Transfer服務中的那10,000個線程在作甚?
沒錯!絕大部分線程都在等待Add服務返回結果。
即采用同步,整個服務器的所有線程大部分時間都沒在工作,而在等待!
若能減少或避免這種無意義等待,即可大幅提升服務吞吐能力,提升性能。
異步實現方案
異步實現同樣的業務。
TransferAsync(accountFrom, accountTo, amount, OnComplete()) { // 異步從accountFrom的賬戶中減去相應的錢數,然后調用OnDebit方法。 AddAsync(accountFrom, -1 * amount, OnDebit(accountTo, amount, OnAllDone(OnComplete()))) } // 扣減賬戶accountFrom完成后調用 OnDebit(accountTo, amount, OnAllDone(OnComplete())) { // 再異步把減去的錢數加到accountTo的賬戶中,然后執行OnAllDone方法 AddAsync(accountTo, amount, OnAllDone(OnComplete())) } // 轉入賬戶accountTo完成后調用 OnAllDone(OnComplete()) { OnComplete() }
1
2
3
4
5
6
7
8
9
10
11
12
13
TransferAsync服務比Transfer多個參數,且該參數傳入的是一個回調方法OnComplete()(雖然Java并不支持將方法作為方法參數傳遞,但像JavaScript等很多語言都具有這樣特性,Java可傳個回調類的實例來變相實現)。
TransferAsync()方法語義
請幫我執行轉賬操作,當轉賬完成后,請調用OnComplete()方法。
調用TransferAsync的線程不必等待轉賬完成即可立即返回,待轉賬結束,TransferService自然會調用OnComplete()方法來執行轉賬后續工作。
異步實現相對同步稍復雜。先定義倆回調方法:
OnDebit():扣減賬戶accountFrom完成后調用的回調方法
OnAllDone():轉入賬戶accountTo完成后調用的回調方法。
異步實現語義相當于:
異步從accountFrom的賬戶中減去相應錢數,然后調用OnDebit
在OnDebit方法中,異步把減去的錢數加到accountTo的賬戶中,然后執行OnAllDone
在OnAllDone中調用OnComplete
時序圖
異步后,整個流程時序和同步完全一樣,只是線程模型由同步調用改為異步和回調。
異步實現的性能
由于流程時序和同步一樣,在少量請求場景下,平均響應時延一樣100ms。在高請求數量場景下,異步不再需線程等待執行結果,只需個位數量的線程,即可實現同步場景大量線程一樣的吞吐量。
由于沒線程的數量的限制,總體吞吐量上限會大大超過同步實現,且在服務器CPU、網絡帶寬資源達到極限前,響應時延不會隨請求數量增加而顯著升高,幾乎可一直保持約100ms的平均響應時延。
異步框架: CompletableFuture
開發時可使用異步框架和響應式框架,解決一些異步編程問題。
Java中比較常用的異步框架有Java8內置的CompletableFuture和ReactiveX的RxJava。
CompletableFuture簡單實用易理解
RxJava功能更強大
Java 8中新增的異步編程類:CompletableFuture,幾乎囊獲了開發異步程序的大部分功能,使用CompletableFuture很容易編寫優雅且易維護的異步代碼。
接下來用CompletableFuture實現的轉賬服務。
CompletableFuture定義2個微服務的接口:
/** * 賬戶服務 */ public interface AccountService { /** * 變更賬戶金額 * @param account 賬戶ID * @param amount 增加的金額,負值為減少 */ CompletableFuture
1
2
3
4
5
6
7
8
9
10
11
/** * 轉賬服務 */ public interface TransferService { /** * 異步轉賬服務 * @param fromAccount 轉出賬戶 * @param toAccount 轉入賬戶 * @param amount 轉賬金額,單位分 */ CompletableFuture
1
2
3
4
5
6
7
8
9
10
11
12
接口中定義的方法的返回類型都是泛型CompletableFeture,泛型類型就是真正方法需要返回數據的類型,這倆服務無需返回數據,所以用Void。
實現轉賬服務:
/** * 轉賬服務的實現 */ public class TransferServiceImpl implements TransferService { @Inject private AccountService accountService; // 使用依賴注入獲取賬戶服務的實例 @Override public CompletableFuture
1
2
3
4
5
6
7
8
9
10
11
12
13
14
先定義一個AccountService實例,這個實例從外部注入進來,至于怎么注入不是我們關心的問題,就假設這個實例是可用的就好了。
看transfer()方法,先調用一次賬戶服務accountService.add()方法從fromAccount扣減相應金額,因add()方法返回的就是個CompletableFeture對象,可用CompletableFeture的thenCompose()方法將下一次調用accountService.add()串聯起來,實現異步依次調用兩次賬戶服務完整轉賬。
客戶端使用CompletableFuture也靈活,既可同步調用,也可異步。
public class Client { @Inject private TransferService transferService; // 使用依賴注入獲取轉賬服務的實例 private final static int A = 1000; private final static int B = 1001; public void syncInvoke() throws ExecutionException, InterruptedException { // 同步調用 transferService.transfer(A, B, 100).get(); System.out.println("轉賬完成!"); } public void asyncInvoke() { // 異步調用 transferService.transfer(A, B, 100) .thenRun(() -> System.out.println("轉賬完成!")); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
調用異步方法獲得返回值CompletableFuture對象后
既可調CompletableFuture#get,像調用同步方法樣等待調用的方法執行結束并獲得返回值
亦可如異步回調,調用CompletableFuture那些then開頭方法,為CompletableFuture定義異步方法結束之后的后續操作
比如上例,調用thenRun()方法,參數就是將轉賬完成打印在控臺上這個操作,這樣就可以實現在轉賬完成后,在控制臺打印“轉賬完成!”了。
總結
異步的思想就是,當要執行一項比較耗時的操作時,不去等待操作結束,而是給這個操作一個命令:“當操作完成后,接下來去執行什么。”
使用異步編程模型,雖并不能加快程序本身速度,但可減少或者避免線程等待,只用很少的線程即可得到超高吞吐。
同時我們也需注意異步模型問題:相比同步,異步實現的復雜度大很多,代碼可讀性和可維護性都顯著下降。
雖然使用一些異步編程框架會在一定程度上簡化異步開發,但是并不能解決異步模型高復雜度的問題。
異步性能雖好,勿濫用,只有類似MQ這種業務邏輯簡單且需超高吞吐量場景,或須長時等待資源,才考慮使用異步模型。
如果系統的業務邏輯比較復雜,在性能足夠滿足業務需求的情況下,采用符合人類自然的思路且易于開發和維護的同步模型是更加明智的選擇。
面試場景題快問快答
實現轉賬服務時,未考慮處理失敗情況。異步實現中,若調用賬戶服務失敗,如何將錯誤報告給客戶端?在兩次調用賬戶服務的Add方法時,如果某一次調用失敗了,該如何處理才能保證賬戶數據是平的?
調用賬戶失敗,可以在異步callBack里執行通知客戶端的邏輯
若是第一次失敗,后面那步就不用執行了,所以轉賬失敗;如果是第一次成功但是第二次失敗,首先考慮重試,如果轉賬服務是冪等的,可以考慮一定次數的重試,如果不能重試,可以考慮采用補償機制,undo第一次的轉賬操作。
在異步實現中,回調方法OnComplete()是在什么線程中運行的?我們是否能控制回調方法的執行線程數
CompletableFuture默認是在ForkjoinPool commonpool里執行的,也可以指定一個Executor線程池執行,借鑒guava的ListenableFuture的時間,回調可以指定線程池執行,這樣就能控制這個線程池的線程數目了。
在異步實現中,回調方法 OnComplete()在執行OnAllDone()回調方法的那個線程,可通過一個異步線程池控制回調方法的線程數,如Spring中的async就是通過結合線程池來實現異步。
任務調度 微服務
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。