Java自制簡易線程池(不依賴concurrent包)
很久之前人們為了繼續享用并行化帶來的好處而不想使用進程,于是創造出了比進程更輕量級的線程。以linux為例,創建一個進程需要申請新的自己的內存空間,從父進程拷貝一些數據,所以開銷是比較大的,線程(或稱輕量級進程)可以和父進程共享內存空間,讓創建線程的開銷遠小于創建進程,于是就有了現在多線程的繁榮。
但是即便創建線程的開銷很小,但頻繁創建刪除也是很浪費性能的,于是人們又想到了線程池,線程池里的線程只會被創建一次,用完也不會銷毀,而是在線程池里等待被重復利用。這種尤其適用于多而小的任務。舉個極端點的例子,如果一個小任務的執行消耗不及創建和銷毀一個線程的消耗,那么不使用線程池時一大半的性能消耗都會是線程創建和銷毀。 最開始學java的時候,一直不理解線程池,尤其是理解不了線程是如何被復用的,以及線程池和我創建的Thread/Runnable對象有什么關系,今天我們就來寫一個建議的線程池來理解這一切。(不依賴java concurrent包)
首先糾正很多人的一個誤解,我們new一個Thread/Runnable對象的時候,并不是創建出一個線程,而是創建了一個需要被線程執行的任務,當我們調用Thread.start()方法的時候,jvm才會幫我們創建一個線程。線程池只是幫你執行這些任務而已,你submit的時候只是把這個任務放到某個存儲里,等待線程池里空閑的線程來執行,而不是創建線程。知道了這點,所以我們首先得有個東西來存儲任務,還要支持多線程下的存取,最好還支持阻塞以避免無任務時的線程空轉。
除了存儲外,我們還需要一些線程來消費這些任務,看到這你可能就很明白的知道了這其實是個生產者消費者模型,Java有好多種生產者消費者的實現,可以參考我之前的博客Java生產者消費者的幾種實現方式。如果實現線程池,我們可以選擇使用BlockingQueue來實現。雖然java concurrent包里已經實現了好多BlockingQueue,但為了讓大家理解BlockingQueue做了啥,我這里用LinkedListQueue簡單封裝了一個簡易BlockingQueue,代碼如下。
package me.xindoo.concurrent; import java.util.LinkedList; import java.util.Queue; public class LinkedBlockingQueue
我也只是簡單在LinkedListQueue的基礎上對其加了synchronized,以保證它在多線程環境下的正常運轉。其次我在隊列為空時通過wait()方法加了線程阻塞,以防止空隊列時線程空轉。既然加了阻塞也得加喚醒,每次在往隊列里添加任務的時候,就會調用notify()來喚醒一個等待中的線程。
存儲搞定了,我們接下來需要實現的就是消費者。消費者就是線程池里的線程,因為任務隊列里的任務都是實現了Runnable接口,所以我們消費任務時都可以直接調用其run()方法來執行。當一個任務執行完成后在從隊列里去取,知道整個線程池被shutdown。
package me.xindoo.concurrent; public class ThreadPool { private int coreSize; private boolean stop = false; private LinkedBlockingQueue
上面就是一個線程池的實現,是不是很簡單,在構造函數里初始化固定數目的線程,每個線程做的只是從隊列里取任務,執行……一直循環。
沒錯,一個簡易的線程池就通過上面幾十行的代碼實現了,已經可以拿去用了,甚至用在生產環境都沒啥問題(后果自負,哈哈)。當然這不是一個類似于concurrent包中功能完善、各種參數可自定義的線程池,但確確實實它實現了一個線程池的基本功能——線程的復用。 接下來寫個建議的測試代碼,如果線程池生產者消費者模型中的消費者,那這個測試代碼就是生產者,代碼如下。
package me.xindoo.concurrent; public class Test { private static class Task implements Runnable { @Override public void run() { try { Thread.sleep(5000L); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { ThreadPool pool = new ThreadPool(5); for (int i = 0; i < 30; i++) { pool.submit(new Task()); } System.out.println("add task finished"); try { Thread.sleep(10000L); } catch (InterruptedException e) { e.printStackTrace(); } pool.shutdown(); } }
執行結果如下
thread0 start run, starttime 1566724202 thread2 start run, starttime 1566724202 thread1 start run, starttime 1566724202 thread3 start run, starttime 1566724202 thread4 start run, starttime 1566724202 add task finished thread2 finished, endtime 1566724207 thread2 start run, starttime 1566724207 thread1 finished, endtime 1566724207 thread4 finished, endtime 1566724207 thread3 finished, endtime 1566724207 thread0 finished, endtime 1566724207 thread3 start run, starttime 1566724207 thread4 start run, starttime 1566724207 thread1 start run, starttime 1566724207 thread0 start run, starttime 1566724207 thread3 finished, endtime 1566724212 thread0 finished, endtime 1566724212 thread1 finished, endtime 1566724212 thread4 finished, endtime 1566724212 thread2 finished, endtime 1566724212
測試代碼也非常簡單,創建一個5個線程,然后提交30個任務,從輸出也可以看到的確是5個線程分批次執行完了30個任務。備注:雖然我測試代碼里的任務非常簡單,其實復雜的任務也是可以的。
實時上如上文中好幾次提到,java.util.concurrent包里已經幫大家實現了一個很健壯、功能強大的線程池,大家不必再去造輪子了,使用不同的BlockingQueue就可以實現不同功能的線程池。舉個栗子,比如使用DelayedWorkQueue就可以實現可以定期執行的線程池了。 甚至Executors為大家封裝了更為簡易的線程池創建接口,但是《Alibaba Java開發手冊》強制不允許使用 Executors 去創建線程池,而是通過 ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。
FixedThreadPool 和 SingleThreadPool: 允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。
CachedThreadPool 和 ScheduledThreadPool: 允許的創建線程數量為 Integer.MAX_VALUE,可能會創建大量的線程,從而導致 OOM。
最后說點題外話,之前我們一個服務啟動的時候觸發了一個jdk未修復的bug https://bugs.java.com/bugdatabase/view_bug.do?bug_id=7092821,導致線程池里所有的任務都被阻塞,但其他工作線程一直在往里提交任務,因為我們直接使用了Executors.FixedThreadPool 所以最后內存爆了… 后來我們的就結局方案就是直接使用ThreadPoolExecutor,限制了BlockingQueue的大小。
Java 任務調度
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。