Java8 - 一文搞定Fork/Join 框架

      網(wǎng)友投稿 701 2025-04-01

      文章目錄


      概述

      CPU密集型 vs IO密集型

      計算密集型任務

      IO密集型

      簡單示例

      Fork/Join常用的類

      RecursiveTask 實現(xiàn) 并行計算

      RecursiveAction

      Fork/Join執(zhí)行流程

      最佳實踐

      概述

      分支/合并框架的目的是以遞歸方式將可以并行的任務拆分成更小的任務,然后將每個子任務的結(jié)果合并起來生成整體結(jié)果。

      它是 ExecutorService 接口的一個實現(xiàn),它把子任務分配給線程池(稱為 ForkJoinPool )中的工作線程。

      CPU密集型 vs IO密集型

      通常來講,任務可以劃分為計算密集型和IO密集型

      計算密集型任務

      特點是要進行大量的計算,消耗CPU資源,比如計算圓周率、對視頻進行高清解碼等等,全靠CPU的運算能力。

      這種計算密集型任務雖然也可以用多任務完成,但是任務越多,花在任務切換的時間就越多,CPU執(zhí)行任務的效率就越低,所以,要最高效地利用CPU,計算密集型任務同時進行的數(shù)量應當?shù)扔贑PU的核心數(shù)。

      計算密集型任務由于主要消耗CPU資源,因此,代碼運行效率至關(guān)重要。Python這樣的腳本語言運行效率很低,完全不適合計算密集型任務。對于計算密集型任務,最好用C語言編寫。

      IO密集型

      涉及到網(wǎng)絡、磁盤IO的任務都是IO密集型任務,這類任務的特點是CPU消耗很少,任務的大部分時間都在等待IO操作完成(因為IO的速度遠遠低于CPU和內(nèi)存的速度)。

      對于IO密集型任務,任務越多,CPU效率越高,但也有一個限度。常見的大部分任務都是IO密集型任務,比如Web應用。

      IO密集型任務執(zhí)行期間,99%的時間都花在IO上,花在CPU上的時間很少 。

      簡單示例

      來看個最簡單的求和

      public class ForkJoinTest { private static int[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; public static void main(String[] args) { System.out.println("result=> " + calc()); System.out.println("result=> " + calcByStream()); } private static int calc() { int result = 0; for (int i = 0; i < data.length; i++) { result += data[i]; } return result; } private static Long calcByStream() { return LongStream.rangeClosed(0,10).reduce(0, Long::sum); } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      Fork/Join常用的類

      ForkJoinTask:我們要使用 ForkJoin 框架,必須首先創(chuàng)建一個 ForkJoin 任務。它提供在任務中執(zhí)行 fork() 和 join() 操作的機制 。

      通常情況下我們不需要直接繼承 ForkJoinTask 類,而只需要繼承它的子類,F(xiàn)ork/Join 框架提供了以下兩個子類:

      RecursiveAction:用于沒有返回結(jié)果的任務。 比如寫數(shù)據(jù)到磁盤,然后就退出了。 一個RecursiveAction可以把自己的工作分割成更小的幾塊, 這樣它們可以由獨立的線程或者CPU執(zhí)行。 我們可以通過繼承來實現(xiàn)一個RecursiveAction

      RecursiveTask :用于有返回結(jié)果的任務。 可以將自己的工作分割為若干更小任務,并將這些子任務的執(zhí)行合并到一個集體結(jié)果。 可以有幾個水平的分割和合并

      CountedCompleter: 在任務完成執(zhí)行后會觸發(fā)執(zhí)行一個自定義的鉤子函數(shù)

      ForkJoinPool :ForkJoinTask 需要通過 ForkJoinPool 來執(zhí)行,任務分割出的子任務會添加到當前工作線程所維護的雙端隊列中,進入隊列的頭部。當一個工作線程的隊列里暫時沒有任務時,它會隨機從其他工作線程的隊列的尾部獲取一個任務。

      RecursiveTask 實現(xiàn) 并行計算

      要把任務提交到這個池,必須創(chuàng)建 RecursiveTask 的一個子類,其中 R 是并行化任務(以及所有子任務)產(chǎn)生的結(jié)果類型,或者如果任務不返回結(jié)果,則是 RecursiveAction 類型 。

      要定義 RecursiveTask, 只需實現(xiàn)它唯一的抽象方法compute

      protected abstract R compute();

      1

      這個方法同時定義了將任務拆分成子任務的邏輯,以及無法再拆分或不方便再拆分時,生成單個子任務結(jié)果的邏輯。

      if (任務足夠小或不可分) { 順序計算該任務 } else { 將任務分成兩個子任務 遞歸調(diào)用本方法,拆分每個子任務,等待所有子任務完成 合并每個子任務的結(jié)果 }

      1

      2

      3

      4

      5

      6

      7

      一般來說并沒有確?的標準決定一個任務是否應該再拆分,但有幾種試探方法可以幫助你

      事實上,這只不過是著名的分治算法的并行版本而已。

      現(xiàn)在編寫一個方法來并行對前n個自然數(shù)求和就很簡單了。你只需把想要的數(shù)字數(shù)組傳給ForkJoinSumCalculator 的構(gòu)造函數(shù):

      public static long forkJoinSum(long n) { long[] numbers = LongStream.rangeClosed(1, n).toArray(); ForkJoinTask task = new ForkJoinSumCalculator(numbers); return new ForkJoinPool().invoke(task); }

      1

      2

      3

      4

      5

      這里用了一個 LongStream 來生成包含前n個自然數(shù)的數(shù)組,然后創(chuàng)建一個 ForkJoinTask( RecursiveTask 的父類),并把數(shù)組傳遞給 ForkJoinSumCalculator 的公共構(gòu)造函數(shù)。

      最后,創(chuàng)建了一個新的 ForkJoinPool ,并把任務傳給它的調(diào)用方法 。在ForkJoinPool 中執(zhí)行時,最后一個方法返回的值就是 ForkJoinSumCalculator 類定義的任務結(jié)果。

      在實際應用時,使用多個 ForkJoinPool 是沒有什么意義的。正是出于這個原因,一般來說把它實例化一次,然后把實例保存在靜態(tài)字段中,使之成為單例,這樣就可以在軟件中任何部分方便地重用了。這里創(chuàng)建時用了其默認的無參數(shù)構(gòu)造函數(shù),這意味著想讓線程池使用JVM能夠使用的所有處理器。更確切地說,該構(gòu)造函數(shù)將使用 Runtime.availableProcessors 的返回值來決定線程池使用的線程數(shù)。請注意 availableProcessors 方法雖然看起來是處理器,但它實際上返回的是可用內(nèi)核的數(shù)量,包括超線程生成的虛擬內(nèi)核。

      當把 ForkJoinSumCalculator 任務傳給 ForkJoinPool 時,這個任務就由池中的一個線程執(zhí)行,這個線程會調(diào)用任務的 compute 方法。

      該方法會檢查任務是否小到足以順序執(zhí)行,如果不夠小則會把要求和的數(shù)組分成兩半,分給兩個新的 ForkJoinSumCalculator ,而它們也由ForkJoinPool 安排執(zhí)行。

      因此,這一過程可以遞歸重復,把原任務分為更小的任務,直到滿足不方便或不可能再進一步拆分的條件(本例中是求和的項目數(shù)小于等于10 000)。

      這時會順序計算每個任務的結(jié)果,然后由分支過程創(chuàng)建的(隱含的)任務二叉樹遍歷回到它的根。接下來會合并每個子任務的部分結(jié)果,從而得到總?cè)蝿盏慕Y(jié)果。

      package com.artisan.java8; import java.util.concurrent.RecursiveTask; public class AccumulatorRecursiveTask extends RecursiveTask { private final int start; private final int end; private final int[] data; private final int LIMIT = 3; public AccumulatorRecursiveTask(int start, int end, int[] data) { this.start = start; this.end = end; this.data = data; } @Override protected Integer compute() { if ((end - start) <= LIMIT) { int result = 0; for (int i = start; i < end; i++) { result += data[i]; } return result; } int mid = (start + end) / 2; AccumulatorRecursiveTask left = new AccumulatorRecursiveTask(start, mid, data); AccumulatorRecursiveTask right = new AccumulatorRecursiveTask(mid, end, data); left.fork(); Integer rightResult = right.compute(); Integer leftResult = left.join(); return rightResult + leftResult; } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      Java8 - 一文搞定Fork/Join 框架

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      26

      27

      28

      29

      30

      31

      32

      33

      34

      35

      36

      37

      38

      39

      40

      41

      42

      43

      44

      RecursiveAction

      package com.artisan.java8; import java.util.concurrent.RecursiveAction; import java.util.concurrent.atomic.AtomicInteger; public class AccumulatorRecursiveAction extends RecursiveAction { private final int start; private final int end; private final int[] data; private final int LIMIT = 3; public AccumulatorRecursiveAction(int start, int end, int[] data) { this.start = start; this.end = end; this.data = data; } @Override protected void compute() { if ((end - start) <= LIMIT) { for (int i = start; i < end; i++) { AccumulatorHelper.accumulate(data[i]); } } else { int mid = (start + end) / 2; AccumulatorRecursiveAction left = new AccumulatorRecursiveAction(start, mid, data); AccumulatorRecursiveAction right = new AccumulatorRecursiveAction(mid, end, data); left.fork(); right.fork(); left.join(); right.join(); } } static class AccumulatorHelper { private static final AtomicInteger result = new AtomicInteger(0); static void accumulate(int value) { result.getAndAdd(value); } public static int getResult() { return result.get(); } static void rest() { result.set(0); } } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      26

      27

      28

      29

      30

      31

      32

      33

      34

      35

      36

      37

      38

      39

      40

      41

      42

      43

      44

      45

      46

      47

      48

      49

      50

      51

      52

      53

      54

      55

      56

      57

      Fork/Join執(zhí)行流程

      最佳實踐

      雖然分支/合并框架還算簡單易用,不幸的是它也很容易被誤用

      對一個任務調(diào)用 join 方法會阻塞調(diào)用方,直到該任務做出結(jié)果。因此,有必要在兩個子任務的計算都開始之后再調(diào)用它。否則,你得到的版本會比原始的順序算法更慢更復雜,因為每個子任務都必須等待另一個子任務完成才能啟動。

      不應該在 RecursiveTask 內(nèi)部使用 ForkJoinPool 的 invoke 方法。相反,你應該始終直接調(diào)用 compute 或 fork 方法,只有順序代碼才應該用 invoke 來啟動并行計算。

      Java 任務調(diào)度

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔相應法律責任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔相應法律責任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。

      上一篇:供應鏈管理如何與制造業(yè)生產(chǎn)率相關(guān)
      下一篇:wps表格怎樣拆分和合并單元格(wps表格合并后怎么拆分)
      相關(guān)文章
      亚洲精品在线不卡| 国产精品亚洲视频| 国产偷窥女洗浴在线观看亚洲| 亚洲中文字幕久久精品蜜桃| 亚洲一区二区免费视频| 久久精品国产亚洲AV无码偷窥| 久久久综合亚洲色一区二区三区| 久久亚洲国产精品一区二区| 亚洲国产婷婷六月丁香| 亚洲精品白浆高清久久久久久| 国产亚洲精久久久久久无码| 亚洲精品无码国产| 亚洲成AV人片一区二区| 亚洲VA中文字幕无码一二三区| 亚洲va无码va在线va天堂| 日本红怡院亚洲红怡院最新| 亚洲国产精品成人精品无码区| 亚洲av无码精品网站| 亚洲综合自拍成人| 亚洲伊人精品综合在合线| 国产亚洲国产bv网站在线| 久久亚洲精品专区蓝色区| 亚洲色偷偷综合亚洲AV伊人蜜桃| 亚洲国产aⅴ成人精品无吗| 国产成人亚洲综合无| 亚洲精品岛国片在线观看| 久久久久亚洲精品天堂久久久久久 | 色偷偷亚洲第一综合| 亚洲AⅤ视频一区二区三区| 亚洲国产精品无码久久九九| 中文字幕亚洲第一| 亚洲av中文无码乱人伦在线r▽| 亚洲午夜精品久久久久久人妖| 亚洲成人福利在线观看| 亚洲无人区码一二三码区别图片| 亚洲av午夜国产精品无码中文字| 亚洲国产精品自在拍在线播放 | 亚洲综合伊人久久综合| 久久亚洲免费视频| 亚洲一线产区二线产区精华| 亚洲国产区男人本色|