Java的面向?qū)ο缶幊?/a>">Java的面向?qū)ο缶幊?/a>
701
2025-04-01
文章目錄
概述
CPU密集型 vs IO密集型
計算密集型任務
IO密集型
簡單示例
Fork/Join常用的類
RecursiveTask 實現(xiàn) 并行計算
RecursiveAction
Fork/Join執(zhí)行流程
最佳實踐
概述
分支/合并框架的目的是以遞歸方式將可以并行的任務拆分成更小的任務,然后將每個子任務的結(jié)果合并起來生成整體結(jié)果。
它是 ExecutorService 接口的一個實現(xiàn),它把子任務分配給線程池(稱為 ForkJoinPool )中的工作線程。
CPU密集型 vs IO密集型
通常來講,任務可以劃分為計算密集型和IO密集型
計算密集型任務
特點是要進行大量的計算,消耗CPU資源,比如計算圓周率、對視頻進行高清解碼等等,全靠CPU的運算能力。
這種計算密集型任務雖然也可以用多任務完成,但是任務越多,花在任務切換的時間就越多,CPU執(zhí)行任務的效率就越低,所以,要最高效地利用CPU,計算密集型任務同時進行的數(shù)量應當?shù)扔贑PU的核心數(shù)。
計算密集型任務由于主要消耗CPU資源,因此,代碼運行效率至關(guān)重要。Python這樣的腳本語言運行效率很低,完全不適合計算密集型任務。對于計算密集型任務,最好用C語言編寫。
IO密集型
涉及到網(wǎng)絡、磁盤IO的任務都是IO密集型任務,這類任務的特點是CPU消耗很少,任務的大部分時間都在等待IO操作完成(因為IO的速度遠遠低于CPU和內(nèi)存的速度)。
對于IO密集型任務,任務越多,CPU效率越高,但也有一個限度。常見的大部分任務都是IO密集型任務,比如Web應用。
IO密集型任務執(zhí)行期間,99%的時間都花在IO上,花在CPU上的時間很少 。
簡單示例
來看個最簡單的求和
public class ForkJoinTest { private static int[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; public static void main(String[] args) { System.out.println("result=> " + calc()); System.out.println("result=> " + calcByStream()); } private static int calc() { int result = 0; for (int i = 0; i < data.length; i++) { result += data[i]; } return result; } private static Long calcByStream() { return LongStream.rangeClosed(0,10).reduce(0, Long::sum); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Fork/Join常用的類
ForkJoinTask:我們要使用 ForkJoin 框架,必須首先創(chuàng)建一個 ForkJoin 任務。它提供在任務中執(zhí)行 fork() 和 join() 操作的機制 。
通常情況下我們不需要直接繼承 ForkJoinTask 類,而只需要繼承它的子類,F(xiàn)ork/Join 框架提供了以下兩個子類:
RecursiveAction:用于沒有返回結(jié)果的任務。 比如寫數(shù)據(jù)到磁盤,然后就退出了。 一個RecursiveAction可以把自己的工作分割成更小的幾塊, 這樣它們可以由獨立的線程或者CPU執(zhí)行。 我們可以通過繼承來實現(xiàn)一個RecursiveAction
RecursiveTask :用于有返回結(jié)果的任務。 可以將自己的工作分割為若干更小任務,并將這些子任務的執(zhí)行合并到一個集體結(jié)果。 可以有幾個水平的分割和合并
CountedCompleter: 在任務完成執(zhí)行后會觸發(fā)執(zhí)行一個自定義的鉤子函數(shù)
ForkJoinPool :ForkJoinTask 需要通過 ForkJoinPool 來執(zhí)行,任務分割出的子任務會添加到當前工作線程所維護的雙端隊列中,進入隊列的頭部。當一個工作線程的隊列里暫時沒有任務時,它會隨機從其他工作線程的隊列的尾部獲取一個任務。
RecursiveTask 實現(xiàn) 并行計算
要把任務提交到這個池,必須創(chuàng)建 RecursiveTask
要定義 RecursiveTask, 只需實現(xiàn)它唯一的抽象方法compute
protected abstract R compute();
1
這個方法同時定義了將任務拆分成子任務的邏輯,以及無法再拆分或不方便再拆分時,生成單個子任務結(jié)果的邏輯。
if (任務足夠小或不可分) { 順序計算該任務 } else { 將任務分成兩個子任務 遞歸調(diào)用本方法,拆分每個子任務,等待所有子任務完成 合并每個子任務的結(jié)果 }
1
2
3
4
5
6
7
一般來說并沒有確?的標準決定一個任務是否應該再拆分,但有幾種試探方法可以幫助你
事實上,這只不過是著名的分治算法的并行版本而已。
現(xiàn)在編寫一個方法來并行對前n個自然數(shù)求和就很簡單了。你只需把想要的數(shù)字數(shù)組傳給ForkJoinSumCalculator 的構(gòu)造函數(shù):
public static long forkJoinSum(long n) { long[] numbers = LongStream.rangeClosed(1, n).toArray(); ForkJoinTask
1
2
3
4
5
這里用了一個 LongStream 來生成包含前n個自然數(shù)的數(shù)組,然后創(chuàng)建一個 ForkJoinTask( RecursiveTask 的父類),并把數(shù)組傳遞給 ForkJoinSumCalculator 的公共構(gòu)造函數(shù)。
最后,創(chuàng)建了一個新的 ForkJoinPool ,并把任務傳給它的調(diào)用方法 。在ForkJoinPool 中執(zhí)行時,最后一個方法返回的值就是 ForkJoinSumCalculator 類定義的任務結(jié)果。
在實際應用時,使用多個 ForkJoinPool 是沒有什么意義的。正是出于這個原因,一般來說把它實例化一次,然后把實例保存在靜態(tài)字段中,使之成為單例,這樣就可以在軟件中任何部分方便地重用了。這里創(chuàng)建時用了其默認的無參數(shù)構(gòu)造函數(shù),這意味著想讓線程池使用JVM能夠使用的所有處理器。更確切地說,該構(gòu)造函數(shù)將使用 Runtime.availableProcessors 的返回值來決定線程池使用的線程數(shù)。請注意 availableProcessors 方法雖然看起來是處理器,但它實際上返回的是可用內(nèi)核的數(shù)量,包括超線程生成的虛擬內(nèi)核。
當把 ForkJoinSumCalculator 任務傳給 ForkJoinPool 時,這個任務就由池中的一個線程執(zhí)行,這個線程會調(diào)用任務的 compute 方法。
該方法會檢查任務是否小到足以順序執(zhí)行,如果不夠小則會把要求和的數(shù)組分成兩半,分給兩個新的 ForkJoinSumCalculator ,而它們也由ForkJoinPool 安排執(zhí)行。
因此,這一過程可以遞歸重復,把原任務分為更小的任務,直到滿足不方便或不可能再進一步拆分的條件(本例中是求和的項目數(shù)小于等于10 000)。
這時會順序計算每個任務的結(jié)果,然后由分支過程創(chuàng)建的(隱含的)任務二叉樹遍歷回到它的根。接下來會合并每個子任務的部分結(jié)果,從而得到總?cè)蝿盏慕Y(jié)果。
package com.artisan.java8; import java.util.concurrent.RecursiveTask; public class AccumulatorRecursiveTask extends RecursiveTask
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
RecursiveAction
package com.artisan.java8; import java.util.concurrent.RecursiveAction; import java.util.concurrent.atomic.AtomicInteger; public class AccumulatorRecursiveAction extends RecursiveAction { private final int start; private final int end; private final int[] data; private final int LIMIT = 3; public AccumulatorRecursiveAction(int start, int end, int[] data) { this.start = start; this.end = end; this.data = data; } @Override protected void compute() { if ((end - start) <= LIMIT) { for (int i = start; i < end; i++) { AccumulatorHelper.accumulate(data[i]); } } else { int mid = (start + end) / 2; AccumulatorRecursiveAction left = new AccumulatorRecursiveAction(start, mid, data); AccumulatorRecursiveAction right = new AccumulatorRecursiveAction(mid, end, data); left.fork(); right.fork(); left.join(); right.join(); } } static class AccumulatorHelper { private static final AtomicInteger result = new AtomicInteger(0); static void accumulate(int value) { result.getAndAdd(value); } public static int getResult() { return result.get(); } static void rest() { result.set(0); } } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
Fork/Join執(zhí)行流程
最佳實踐
雖然分支/合并框架還算簡單易用,不幸的是它也很容易被誤用
對一個任務調(diào)用 join 方法會阻塞調(diào)用方,直到該任務做出結(jié)果。因此,有必要在兩個子任務的計算都開始之后再調(diào)用它。否則,你得到的版本會比原始的順序算法更慢更復雜,因為每個子任務都必須等待另一個子任務完成才能啟動。
不應該在 RecursiveTask 內(nèi)部使用 ForkJoinPool 的 invoke 方法。相反,你應該始終直接調(diào)用 compute 或 fork 方法,只有順序代碼才應該用 invoke 來啟動并行計算。
Java 任務調(diào)度
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔相應法律責任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔相應法律責任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。