Java自制簡易線程池(不依賴concurrent包)

      網友投稿 614 2022-05-30

      很久之前人們為了繼續享用并行化帶來的好處而不想使用進程,于是創造出了比進程更輕量級的線程。以linux為例,創建一個進程需要申請新的自己的內存空間,從父進程拷貝一些數據,所以開銷是比較大的,線程(或稱輕量級進程)可以和父進程共享內存空間,讓創建線程的開銷遠小于創建進程,于是就有了現在多線程的繁榮。

      但是即便創建線程的開銷很小,但頻繁創建刪除也是很浪費性能的,于是人們又想到了線程池,線程池里的線程只會被創建一次,用完也不會銷毀,而是在線程池里等待被重復利用。這種尤其適用于多而小的任務。舉個極端點的例子,如果一個小任務的執行消耗不及創建和銷毀一個線程的消耗,那么不使用線程池時一大半的性能消耗都會是線程創建和銷毀。 最開始學java的時候,一直不理解線程池,尤其是理解不了線程是如何被復用的,以及線程池和我創建的Thread/Runnable對象有什么關系,今天我們就來寫一個建議的線程池來理解這一切。(不依賴java concurrent包)

      首先糾正很多人的一個誤解,我們new一個Thread/Runnable對象的時候,并不是創建出一個線程,而是創建了一個需要被線程執行的任務,當我們調用Thread.start()方法的時候,jvm才會幫我們創建一個線程。線程池只是幫你執行這些任務而已,你submit的時候只是把這個任務放到某個存儲里,等待線程池里空閑的線程來執行,而不是創建線程。知道了這點,所以我們首先得有個東西來存儲任務,還要支持多線程下的存取,最好還支持阻塞以避免無任務時的線程空轉。

      除了存儲外,我們還需要一些線程來消費這些任務,看到這你可能就很明白的知道了這其實是個生產者消費者模型,Java有好多種生產者消費者的實現,可以參考我之前的博客Java生產者消費者的幾種實現方式。如果實現線程池,我們可以選擇使用BlockingQueue來實現。雖然java concurrent包里已經實現了好多BlockingQueue,但為了讓大家理解BlockingQueue做了啥,我這里用LinkedListQueue簡單封裝了一個簡易BlockingQueue,代碼如下。

      package me.xindoo.concurrent; import java.util.LinkedList; import java.util.Queue; public class LinkedBlockingQueue { private Object lock; private Queue queue; public LinkedBlockingQueue() { queue = new LinkedList<>(); lock = new Object(); } public boolean add(E e) { synchronized (lock) { queue.add(e); lock.notify(); return true; } } public E take() { synchronized (lock) { while (queue.isEmpty()) { try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } return queue.poll(); } } public synchronized int size() { return queue.size(); } }

      我也只是簡單在LinkedListQueue的基礎上對其加了synchronized,以保證它在多線程環境下的正常運轉。其次我在隊列為空時通過wait()方法加了線程阻塞,以防止空隊列時線程空轉。既然加了阻塞也得加喚醒,每次在往隊列里添加任務的時候,就會調用notify()來喚醒一個等待中的線程。

      存儲搞定了,我們接下來需要實現的就是消費者。消費者就是線程池里的線程,因為任務隊列里的任務都是實現了Runnable接口,所以我們消費任務時都可以直接調用其run()方法來執行。當一個任務執行完成后在從隊列里去取,知道整個線程池被shutdown。

      package me.xindoo.concurrent; public class ThreadPool { private int coreSize; private boolean stop = false; private LinkedBlockingQueue tasks = new LinkedBlockingQueue<>(); private Thread[] threads; public ThreadPool(int coreSize) { this.coreSize = coreSize; threads = new Thread[coreSize]; for (int i = 0; i < coreSize; i++) { threads[i] = new Thread(new Worker("thread"+i)); threads[i].start(); } } public boolean submit(Runnable command) { return tasks.add(command); } public void shutdown() { stop = true; } private class Worker implements Runnable { public String name; public Worker(String name) { this.name = name; } @Override public void run() { while (!stop) { Runnable command = tasks.take(); System.out.println(name + " start run, starttime " + System.currentTimeMillis()/1000); //方便觀察線程的執行情況 command.run(); System.out.println(name + " finished, endtime " + System.currentTimeMillis()/1000); } } } }

      上面就是一個線程池的實現,是不是很簡單,在構造函數里初始化固定數目的線程,每個線程做的只是從隊列里取任務,執行……一直循環。

      沒錯,一個簡易的線程池就通過上面幾十行的代碼實現了,已經可以拿去用了,甚至用在生產環境都沒啥問題(后果自負,哈哈)。當然這不是一個類似于concurrent包中功能完善、各種參數可自定義的線程池,但確確實實它實現了一個線程池的基本功能——線程的復用。 接下來寫個建議的測試代碼,如果線程池生產者消費者模型中的消費者,那這個測試代碼就是生產者,代碼如下。

      package me.xindoo.concurrent; public class Test { private static class Task implements Runnable { @Override public void run() { try { Thread.sleep(5000L); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { ThreadPool pool = new ThreadPool(5); for (int i = 0; i < 30; i++) { pool.submit(new Task()); } System.out.println("add task finished"); try { Thread.sleep(10000L); } catch (InterruptedException e) { e.printStackTrace(); } pool.shutdown(); } }

      執行結果如下

      Java自制簡易線程池(不依賴concurrent包)

      thread0 start run, starttime 1566724202 thread2 start run, starttime 1566724202 thread1 start run, starttime 1566724202 thread3 start run, starttime 1566724202 thread4 start run, starttime 1566724202 add task finished thread2 finished, endtime 1566724207 thread2 start run, starttime 1566724207 thread1 finished, endtime 1566724207 thread4 finished, endtime 1566724207 thread3 finished, endtime 1566724207 thread0 finished, endtime 1566724207 thread3 start run, starttime 1566724207 thread4 start run, starttime 1566724207 thread1 start run, starttime 1566724207 thread0 start run, starttime 1566724207 thread3 finished, endtime 1566724212 thread0 finished, endtime 1566724212 thread1 finished, endtime 1566724212 thread4 finished, endtime 1566724212 thread2 finished, endtime 1566724212

      測試代碼也非常簡單,創建一個5個線程,然后提交30個任務,從輸出也可以看到的確是5個線程分批次執行完了30個任務。備注:雖然我測試代碼里的任務非常簡單,其實復雜的任務也是可以的。

      實時上如上文中好幾次提到,java.util.concurrent包里已經幫大家實現了一個很健壯、功能強大的線程池,大家不必再去造輪子了,使用不同的BlockingQueue就可以實現不同功能的線程池。舉個栗子,比如使用DelayedWorkQueue就可以實現可以定期執行的線程池了。 甚至Executors為大家封裝了更為簡易的線程池創建接口,但是《Alibaba Java開發手冊》強制不允許使用 Executors 去創建線程池,而是通過 ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。

      FixedThreadPool 和 SingleThreadPool: 允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。

      CachedThreadPool 和 ScheduledThreadPool: 允許的創建線程數量為 Integer.MAX_VALUE,可能會創建大量的線程,從而導致 OOM。

      最后說點題外話,之前我們一個服務啟動的時候觸發了一個jdk未修復的bug https://bugs.java.com/bugdatabase/view_bug.do?bug_id=7092821,導致線程池里所有的任務都被阻塞,但其他工作線程一直在往里提交任務,因為我們直接使用了Executors.FixedThreadPool 所以最后內存爆了… 后來我們的就結局方案就是直接使用ThreadPoolExecutor,限制了BlockingQueue的大小。

      Java 任務調度

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:10分鐘了解JSON Web令牌(JWT)
      下一篇:移動應用隱私合規檢測簡介及目標檢測技術的應用
      相關文章
      亚洲一区免费观看| 亚洲女女女同性video| 久久久无码精品亚洲日韩蜜桃 | 国产成人精品日本亚洲专区| 亚洲欧美成人综合久久久| 天天爽亚洲中文字幕| 久久精品国产亚洲AV蜜臀色欲| 在线观看亚洲一区二区| 亚洲一区免费观看| 18亚洲男同志videos网站| 亚洲日本精品一区二区| 亚洲制服中文字幕第一区| 亚洲av网址在线观看| 亚洲色图在线播放| 亚洲一区二区中文| 亚洲国产片在线观看| 亚洲香蕉在线观看| 亚洲色最新高清av网站| 国产AV旡码专区亚洲AV苍井空| 亚洲色无码专区一区| 18禁亚洲深夜福利人口| 亚洲伊人成无码综合网| 久久久久亚洲精品男人的天堂| 亚洲中文字幕无码一区二区三区| 在线日韩日本国产亚洲| 亚洲日韩一页精品发布| 久久精品亚洲日本佐佐木明希| 亚洲AV日韩精品久久久久久| 久久精品亚洲精品国产色婷 | 一本色道久久88—综合亚洲精品| 日韩亚洲产在线观看| 亚洲成aⅴ人片久青草影院按摩| 久久人午夜亚洲精品无码区| 国产精品亚洲色婷婷99久久精品| 亚洲国产电影av在线网址| 国产亚洲精品不卡在线| 亚洲AV无码一区二区二三区入口| 亚洲精品无码Av人在线观看国产| 亚洲精品国产第一综合99久久| 国产精品亚洲а∨无码播放麻豆| 亚洲毛片网址在线观看中文字幕|