并發編程實戰-創建和執行任務的最佳實踐
創建和執行任務
若無法通過并行流實現并發,則必須創建并運行自己的任務。運行任務的理想Java 8方法就是CompletableFuture。
Java并發的歷史始于非常原始和有問題的機制,并且充滿各種嘗試的優化。本文將展示一個規范形式,表示創建和運行任務的最簡單,最好的方法。
Java初期通過直接創建自己的Thread對象來使用線程,甚至子類化來創建特定“任務線程”對象。手動調用構造函數并自己啟動線程。創建所有這些線程的開銷變得非常重要,現在不鼓勵。Java 5中,添加了類來為你處理線程池。可以將任務創建為單獨的類型,然后將其交給ExecutorService運行,而不是為每種不同類型的任務創建新的Thread子類型。ExecutorService為你管理線程,并在運行任務后重新循環線程而不是丟棄線程。
創建任務
這只是個包含run()方法的Runnable類。它沒有包含實際運行任務的機制。使用Nap類中的“sleep”:
第二個構造函數在超時的時候,會顯示一條消息。TimeUnit.MILLISECONDS.sleep():獲取“當前線程”并在參數中將其置于休眠狀態,這意味著該線程被掛起。這并不意味著底層處理器停止。os將其切換到其他任務,例如在你的計算機上運行另一個窗口。OS任務管理器定期檢查**sleep()**是否超時。當它執行時,線程被“喚醒”并給予更多處理時間。
sleep()拋已檢查的InterruptedException:通過突然中斷它們來終止任務。由于它往往會產生不穩定狀態,所以不鼓勵用來終止。但我們必須在需要或仍發生終止的情況下捕獲該異常。
執行任務
結果:
All tasks submitted main awaiting termination main awaiting termination NapTask[0] pool-1-thread-1 main awaiting termination NapTask[1] pool-1-thread-1 main awaiting termination NapTask[2] pool-1-thread-1 main awaiting termination NapTask[3] pool-1-thread-1 main awaiting termination NapTask[4] pool-1-thread-1 main awaiting termination NapTask[5] pool-1-thread-1 main awaiting termination NapTask[6] pool-1-thread-1 main awaiting termination NapTask[7] pool-1-thread-1 main awaiting termination NapTask[8] pool-1-thread-1 main awaiting termination NapTask[9] pool-1-thread-1
創建十個NapTasks并將它們提交給ExecutorService,它們開始自己運行。然而,期間main()繼續運行。當運行至exec.shutdown();時,main告訴ExecutorService完成已提交的任務,但不再接受新任務。此時,這些任務仍在運行,必須等到它們在退出main()之前完成。這是通過檢查exec.isTerminated()來實現:在所有任務完成后為true。
main()中線程的名稱是main,且只有一個其他線程pool-1-thread-1。此外,交錯輸出顯示兩個線程確實在同時運行。
若僅調用exec.shutdown(),程序將完成所有任務,若嘗試提交新任務將拋RejectedExecutionException。
exec.shutdown()的替代方法exec.shutdownNow():除了不接受新任務,還會嘗試通過中斷任務來停止任何當前正在運行的任務。同樣,中斷是錯誤的,容易出錯,不鼓勵!
使用更多線程
使用線程的重點幾乎總是更快地完成任務,那為何要限制自己使用SingleThreadExecutor?Executors還給了我們更多選項,如CachedThreadPool:
運行該程序時,你會發現它完成得更快。這是有道理的,而不是使用相同線程來順序運行每個任務,每個任務都有自己的線程,所以它們并行運行。似乎沒有缺點,很難看出為什么有人會使用SingleThreadExecutor。
要理解這個問題,需要一個更復雜任務:
用CachedThreadPool試一下:
輸出結果:
0 pool-1-thread-1 195 3 pool-1-thread-4 400 2 pool-1-thread-3 300 1 pool-1-thread-2 200 5 pool-1-thread-6 600 6 pool-1-thread-7 700 4 pool-1-thread-5 500 7 pool-1-thread-3 800 8 pool-1-thread-5 900 9 pool-1-thread-7 1000
輸出不是期望的,并且從一次運行到下一次運行會有所不同。問題是所有的任務都試圖寫入val的單個實例,并且他們正在踩著彼此的腳趾。這樣的類就不是線程安全的。
看SingleThreadExecutor表現怎樣:
輸出結果:
0 pool-1-thread-1 100 1 pool-1-thread-1 200 2 pool-1-thread-1 300 3 pool-1-thread-1 400 4 pool-1-thread-1 500 5 pool-1-thread-1 600 6 pool-1-thread-1 700 7 pool-1-thread-1 800 8 pool-1-thread-1 900 9 pool-1-thread-1 1000
每次都得到一致結果,雖然InterferingTask缺乏線程安全性。這是SingleThreadExecutor的主要好處 - 因為它一次運行一個任務,這些任務不會相互干擾,等于強加了線程安全性。這種現象稱為線程限制,因為在單線程上運行任務限制了它們的影響。【線程限制】限制了加速,但能節省很多困難的調試和重寫。
產生結果
因為InterferingTask是Runnable,無返回值,因此只能使用副作用產生結果 - 操縱緩沖值而不是返回結果。副作用是并發編程中的主要問題之一,因為我們看到了CachedThreadPool2.java。InterferingTask中的val被稱為可變共享狀態,這就是問題:多個任務同時修改同一個變量會產生競爭。結果取決于首先在終點線上執行哪個任務,并修改變量(以及其他可能性的各種變化)。
避免競爭條件的最好方法是避免可變的共享狀態,可稱為自私的孩子原則:什么都不分享。
使用InterferingTask,最好刪除副作用并返回任務結果。為此,我們創建Callable而非Runnable:
call()完全獨立于所有其他CountingTasks生成其結果,這意味著沒有可變的共享狀態。
輸出結果:
0 pool-1-thread-1 100 2 pool-1-thread-3 100 1 pool-1-thread-2 100 3 pool-1-thread-4 100 4 pool-1-thread-5 100 5 pool-1-thread-6 100 6 pool-1-thread-7 100 7 pool-1-thread-5 100 8 pool-1-thread-7 100 9 pool-1-thread-6 100 sum = 1000
所有任務完成后,invokeAll()才會返回一個Future列表,每個任務一個Future。Future是Java 5中引入的機制,允許提交任務而無需等待它完成。
結果:
99 pool-1-thread-1 100 100
但這意味著,在CachedThreadPool3.java中,Future似乎是多余的,因為**invokeAll()**在所有任務完成前都不會返回。但此處的Future并非用于延遲結果,而是捕獲任何可能的異常。
在CachedThreadPool3.java.get()拋異常,因此extractResult()在Stream中執行此提取。因為調用get()時,Future會阻塞,所以它只能解決【等待任務完成】的問題。最終,Futures被認為是一種無效解決方案,現在不鼓勵,支持Java 8的CompletableFuture,將在后面探討。當然,你仍會在遺留庫中遇到Futures。
可使用并行Stream,更簡單優雅解決該問題:
輸出結果:
4 ForkJoinPool.commonPool-worker-15 100 1 ForkJoinPool.commonPool-worker-11 100 5 ForkJoinPool.commonPool-worker-1 100 2 ForkJoinPool.commonPool-worker-9 100 0 ForkJoinPool.commonPool-worker-6 100 3 ForkJoinPool.commonPool-worker-8 100 9 ForkJoinPool.commonPool-worker-13 100 6 main 100 8 ForkJoinPool.commonPool-worker-2 100 7 ForkJoinPool.commonPool-worker-4 100 1000
這更容易理解,需要做的就是將**parallel()**插入到其他順序操作中,然后一切都在同時運行。
Lambda和方法引用作為任務
使用lambdas和方法引用,你不僅限于使用Runnables和Callables。因為Java 8通過匹配簽名來支持lambda和方法引用(即支持結構一致性),所以我們可以將不是Runnables或Callables的參數傳遞給ExecutorService:
輸出結果:
Lambda1 NotRunnable Lambda2 NotCallable
這里,前兩個submit()調用可以改為調用execute()。所有submit()調用都返回Futures,你可以在后兩次調用的情況下提取結果。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。