結合美團技術篇詳述Java線程池實現原理(Java線程池實現原理及其在美團業務中的實踐)
一、寫在前面
1.1 線程池是什么
線程池(Thread Pool) 是一種池化思想管理線程的工具,經常出現在多線程服務器中,如MySQL。 線程過多會帶來額外的開銷,其中包括創建銷毀線程的開銷,操作系統調度線程的開銷等等,同時也降低了計算機的整體性能。線程池維護多個線程,等待監督管理者分配可并發執行的任務。這種做法,一方面避免了處理任務是創建銷毀線程開銷代價,另一方面避免了線程數量膨脹導致的過分調度問題,保證了對操作系統內核的充分利用。 本文描述的線程池是JDK提供的ThreadPoolExecutor類
使用線程池帶來的好處
降低資源消耗:通過赤化技術重復利用已創建的線程,降低想成創建和 銷毀造成的消耗
提高響應速度:任務到達時,無需等待線程創建即可立即執行
提高線程的可管理性:線程是稀缺資源,如果無限制創建,不僅會消耗系統資源,還會因為線程的不合理分配導致資源調度失衡,降低系統的穩定性。使用線程池可以進行統一的分配、調優和監控
提供更多更強大的功能:線程池具備可拓展性,允許開發人員向其中增加風多的功能。比如延時定時線程池ScheduledThreadPoolExecutor,就允許任務延期執行或定期執行
1.2 線程池解決的問題是什么
線程池解決的問題就是資源管理的問題。在并發環境下,系統不能夠確定在任意時刻有多少任務需要執行,有多少資源需要投入。
在這種不確定性下將會帶來以下若干的問題
頻繁申請/銷毀資源和調度資源,將帶來額外的開銷,可能是非常巨大的
對資源無限申請缺少抑制手段,容易引發系統資源耗盡問題的風險
系統無法合理管理內部的資源分布,會減低系統的穩定性
為了解決資源分配的問題,線程池采用“池化”(Pooling)思想。池化,顧名思義,是為了做大化收益并最小化風險,而將資源統一在一起管理的一種思想。
在計算機領域池化技術表現為:統一管理IT資源,包括服務器資源、存儲、網絡資源等。通過共享資源,使用戶在第投入中獲益。
除去線程池其他比較典型的幾種使用策略包括
內存池(Memory Pooling):預先申請內存,提升申請內存的速度,減少內存碎片
連接池(Connection Pooling):預先申請數據庫連接,提升申請連接的速度,降低系統開銷
實例池(Object Pooling):循環使用對象,減資源在初始化和釋放時昂貴的損耗
二、線程池和核心設計與實現
2.1 總體設計
Java中線程池核心實現類是ThreadPoolExecutor,本章基于JDK1.8的源碼來分析Java線程池的核心設計與實現。首先看一下ThreadPoolExecutor的UML圖,了解ThreadPoolExecutor的繼承關系
ThreadPoolExecutor實現的頂層接口是Executor,頂層接口Executor提供了一種思想:將任務提交和任務執行進行解耦。用戶無需關注如何創建線程,如何調度線程來執行任務,用戶只需提供Runnable對象,將任務的運行邏輯提交到執行器Executor中,由Executor框架完成線程的調配和任務的執行部分。
ExecutorService
擴充執行任務的能力,補充可以為一個或者一批異步任務生成Future的方法
提供了管理線程池的方法,比如停止線程池的運行
AbstractExecutorService
串聯任務流程,保證下層的實現只需要關注一個執行任務的方法
ThreadPoolExecutor
維護自身的生命周期
管理線程和任務,使兩者良好的結合從而執行并行任務
ThreadPoolExecutor是如何運行,如何同時維護線程和執行任務的呢?其運行機制如下圖所示
ThreadPoolExecutor運行流程
線程池在內部實際上構造了一個生產者消費者模型,將線程和任務兩者解耦,并不直接關聯,從而良好的管理緩沖任務,復用線程。線程池的運行主要分成兩部分::任務管理、線程管理。任務管理充當生產者角色,當任務提交后,線程池會判斷該任務后續流轉
任務申請線程執行該任務
緩沖到隊列中等待線程執行
拒絕該任務
線程管理部分是消費者,它們被統一維護在線程池內,根據任務請求進行線程的分配,當線程執行完任務后會繼續獲取新的任務執行,最終獲取不到任務的時候,線程會被回收。
接下來按照如下三個方面講解線程池的運行機制:
線程池如何維護自身狀態
線程池如何管理任務
線程池如何管理線程
2.2 生命周期管理
線程池運行的狀態,并不是用戶顯式設置的,而是伴隨著線程池的運行,由內部來維 護。線程池內部使用一個變量維護兩個值:運行狀態 (runState) 和線程數量 (workerCount)。在具體實現中,線程池將運行狀態 (runState)、線程數量 (workerCount)
兩個關鍵參數的維護放在了一起,如下代碼所示:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl 這個 AtomicInteger 類型,是對線程池的運行狀態和線程池中有效線程的數量 進行控制的一個字段,它同時包含兩部分的信息:線程池的運行狀態 (runState) 和 線程池內有效線程的數量 (workerCount),高 3 位保存 runState,低 29 位保存 workerCount,兩個變量之間互不干擾。用一個變量去存儲兩個值,可避免在做相關 決策時,出現不一致的情況,不必為了維護兩者的一致,而占用鎖資源。通過閱讀線 程池源代碼也可以發現,經常出現要同時判斷線程池運行狀態和線程數量的情況。線程池也提供了若干方法去供用戶獲得線程池當前的運行狀態、線程個數。這里都使用的是位運算的方式,相比于基本運算,速度也會快很多。
關于內部封裝的獲取生命周期狀態、獲取線程池線程數量的計算方法如以下代碼 所示:
// Packing and unpacking ctl // 計算當前運行狀態 private static int runStateOf(int c) { return c & ~CAPACITY; } // 計算當前線程數據 private static int workerCountOf(int c) { return c & CAPACITY; } // 通過狀態和線程數生成ctl private static int ctlOf(int rs, int wc) { return rs | wc; }
ThreadPoolExecutor 的運行狀態有 5 種,分別為:
// runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
運行狀態
狀態描述
RUNNING
能接受新提交的任務,并且也能處理阻塞隊列中的任務
SHUTDOWN
狀態關閉,不在接受新提交的任務,但是能繼續處理阻塞隊列已保存的讓任務
STOP
不接受新任務,也不處理隊列中的任務,會中斷正在處理任務的線程
TIDYING
所有讓任務都已終止,workerCount(有效處理讓任務線程)狀態為0
TERMINATED
在terminated()方法執行結束后進入該狀態
其生命周期轉換如下圖所示
線程池生命周期
2.3 任務調度機制
2.3.1 任務調度
任務調度是線程池的主要入口,當用戶提交了一個任務,接下來這個任務將如何執行 都是由這個階段決定的。了解這部分就相當于了解了線程池的核心運行機制。 首先,所有任務的調度都是由 execute 方法完成的,這部分完成的工作是:檢查現在線程池的運行狀態、運行線程數、運行策略,決定接下來執行的流程,是直接申請線程執行,或是緩沖到隊列中執行,亦或是直接拒絕該任務。其執行過程如下:
首先檢測線程池運行狀態,如果不是 RUNNING,則直接拒絕,線程池要保證在 RUNNING 的狀態下執行任務
如果 workerCount < corePoolSize,則創建并啟動一個線程來執行新提交的任務
如果 workerCount >= corePoolSize,且線程池內的阻塞隊列未滿,則將任務添加到該阻塞隊列中。
如 果 workerCount >= corePoolSize && workerCount < maximumPoolSize,且線程池內的阻塞隊列已滿,則創建并啟動一個線程來執行新提交的任務。
如果 workerCount >= maximumPoolSize,并且線程池內的阻塞隊列已滿 , 則根據拒絕策略來處理該任務 , 默認的處理方式是直接拋異常。
其執行流程如下
任務調度流程圖
2.3.2 任務緩沖
任務緩沖模塊是線程池能夠管理任務的核心部分。線程池的本質是對任務和線程的管 理,而做到這一點最關鍵的思想就是將任務和線程兩者解耦,不讓兩者直接關聯,才 可以做后續的分配工作。線程池中是以生產者消費者模式,通過一個阻塞隊列來實現 的。阻塞隊列緩存任務,工作線程從阻塞隊列中獲取任務。
阻塞隊列 (BlockingQueue) 是一個支持兩個附加操作的隊列。這兩個附加的操作是: 在隊列為空時,獲取元素的線程會等待隊列變為非空。當隊列滿時,存儲元素的線程 會等待隊列可用。阻塞隊列常用于生產者和消費者的場景,生產者是往隊列里添加元 素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器里拿元素。
阻塞隊列
使用不同的隊列可以實現不一樣的任務存取策略。在這里,我們可以再介紹下阻塞隊列的成員:
名稱
描述
ArrayBlockingQueue
一個用數組實現的有界阻塞隊列,此隊列按照先進先出(FIFO)的原則對元素進行排序。支持公平鎖和非公平鎖
LinkedBlockingDeque
一個由鏈表結構組成的有界隊列,此隊列按照先進先出(FIFO)的原則對元素進行排序。此隊列的默認長度為Integer.MAX_VALUE,所以默認創建此隊列有容量危險
PriorityBlockingQueue
一個支持線程優先級排序的無界隊列,默認自然進行排序,也可以自定義實現compareTo()方法指定排序故障,不能保證同優先級元素的順序。
DelayQueue
一個實現PriorityBlockingQueue實現延遲獲取的無界隊列,在創建元素時,可以指定多久才能從隊列中獲取當前元素。只有延遲期滿后才能從隊列中獲取元素。
SynchronousQueue
一個不存儲元素的阻塞隊列,每個put操作必須等待take操作,否則不能添加元素。支持公平鎖和非公平鎖。SynchronousQueue的一個使用場景是在線程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,這個線程池根據需要(新任務來)創建新的線程,如果有空閑的線程就使用空閑線程,線程空閑60秒會被回收。
return new ThreadPoolExecutor(
0,
Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
LinkedTransferQueue
一個由鏈表結構組成的無界阻塞隊列,相當于其他隊列,LinkedTransferQueue多了transfer和tryTransfer方法
LinkedBlockingQueue
一個由鏈表結構組成的雙向阻塞隊列,隊列的頭部和尾部都可以插入和刪除元素,多線程并發時,可以將鎖的競爭最多降到一半
2.3.3 任務申請
由上文的任務分配部分可知,任務的執行有兩種可能:一種是任務直接由新創建的線 程執行。另一種是線程從任務隊列中獲取任務然后執行,執行完任務的空閑線程會再 次去從隊列中申請任務再去執行。第一種情況僅出現在線程初始創建的時候,第二種 是線程獲取任務絕大多數的情況。
線程需要從任務緩存模塊中不斷地取任務執行,幫助線程從阻塞隊列中獲取任務,實現線程管理模塊和任務管理模塊之間的通信。這部分策略由 getTask 方法實現,其 執行流程如下圖所示:
線程獲取任務的流程
getTask 這部分進行了多次判斷,為的是控制線程的數量,使其符合線程池的狀 態。如果線程池現在不應該持有那么多線程,則會返回 null 值。工作線程 Worker 會不斷接收新任務去執行,而當工作線程 Worker 接收不到任務的時候,就會開始 被回收。
源碼分析
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // 判斷線程池是否已停止運行 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 判斷線程現階段是否夠多 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } // 限時任務獲取和阻塞獲取 try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
2.3.4 任務拒絕
任務拒絕模塊是線程池的保護部分,線程池有一個最大的容量,當線程池的任務緩存 隊列已滿,并且線程池中的線程數目達到 maximumPoolSize 時,就需要拒絕掉該任務,采取任務拒絕策略,保護線程池。
拒絕策略是一個接口,其設計如下:
public interface RejectedExecutionHandler { /** * Method that may be invoked by a {@link ThreadPoolExecutor} when * {@link ThreadPoolExecutor#execute execute} cannot accept a * task. This may occur when no more threads or queue slots are * available because their bounds would be exceeded, or upon * shutdown of the Executor. * *
In the absence of other alternatives, the method may throw * an unchecked {@link RejectedExecutionException}, which will be * propagated to the caller of {@code execute}. * * @param r the runnable task requested to be executed * @param executor the executor attempting to execute this task * @throws RejectedExecutionException if there is no remedy */ void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
用戶可以通過實現這個接口去定制拒絕策略,也可以選擇 JDK 提供的四種已有拒絕策略,其特點如下
名稱
描述
ThreadPoolExecutor.AbortPolicy
丟棄任務并拋出RejectedExecutionException異常。這是線程池默認的拒絕策略,在任務不能在提交的時候,拋出異常,及時反饋程序運行狀態。如果是比較關鍵的業務,推薦使用該策略,這樣子在系統不能承載更大并發的時候,能過及時的通過異常發現。
ThreadPoolExecutor.DiscardPolicy
丟棄任務,但是不拋出異常。使用該策略,可能會使我們無法發現系統的異常狀態。建議一些無關緊要的業務采用此策略。
ThreadPoolExecutor.DiscardOldestPolicy
丟棄隊列最前面的任務,然后重新提交比拒接的任務。是否要采用此種策略,需要根據實際業務是否允許丟棄老任務來認真衡量
ThreadPoolExecutor.CallerRunsPolicy
由調用線程(提交任務的線程)來處理任務。這種情況是需要讓所有的任務都執行完畢,那么就適合大量計算的任務類型去執行,多線程僅僅是增加大吞吐量的手段,最終必須要讓每個任務都執行
/** * A handler for rejected tasks that runs the rejected task * directly in the calling thread of the {@code execute} method, * unless the executor has been shut down, in which case the task * is discarded. */ public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { } /** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } } /** * A handler for rejected tasks that throws a * {@code RejectedExecutionException}. */ public static class AbortPolicy implements RejectedExecutionHandler { /** * Creates an {@code AbortPolicy}. */ public AbortPolicy() { } /** * Always throws RejectedExecutionException. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } } /** * A handler for rejected tasks that silently discards the * rejected task. */ public static class DiscardPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardPolicy}. */ public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } /** * A handler for rejected tasks that discards the oldest unhandled * request and then retries {@code execute}, unless the executor * is shut down, in which case the task is discarded. */ public static class DiscardOldestPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardOldestPolicy} for the given executor. */ public DiscardOldestPolicy() { } /** * Obtains and ignores the next task that the executor * would otherwise execute, if one is immediately available, * and then retries execution of task r, unless the executor * is shut down, in which case task r is instead discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
2.4 ?Worker線程管理
2.4.1 Worker線程
線程池為了掌握線程的狀態并維護線程的生命周期,設計了線程池內的工作線程Worker。
Java Worker源碼部分
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** Thread this worker is running in. Null if factory fails. */ // worker持有的線程 final Thread thread; /** Initial task to run. Possibly null. */ // 初始化的任務,可以為null Runnable firstTask; ... }
Worker 這個工作線程,實現了 Runnable 接口,并持有一個線程 thread,一個初始化的任務 firstTask。thread 是在調用構造方法時通過 ThreadFactory 來創建的線程,可以用來執行任務;firstTask 用它來保存傳入的第一個任務,這個任務可以有也可以為 null。如果這個值是非空的,那么線程就會在啟動初期立即執行這個任務,也就對應核心線程創建時的情況;如果這個值是 null,那么就需要創建一個線程去執行任務列表(workQueue)中的任務,也就是非核心線程的創建
/** * The queue used for holding tasks and handing off to worker * threads. We do not require that workQueue.poll() returning * null necessarily means that workQueue.isEmpty(), so rely * solely on isEmpty to see if the queue is empty (which we must * do for example when deciding whether to transition from * SHUTDOWN to TIDYING). This accommodates special-purpose * queues such as DelayQueues for which poll() is allowed to * return null even if it may later return non-null when delays * expire. */ # workerQueue 源碼定義 private final BlockingQueue
worker執行任務
線程池需要管理線程的生命周期,需要在線程長時間不運行的時候進行回收。線程池 使用一張 Hash 表去持有線程的引用,這樣可以通過添加引用、移除引用這樣的操作 來控制線程的生命周期。這個時候重要的就是如何判斷線程是否在運行。
/** * Set containing all worker threads in pool. Accessed only when * holding mainLock. */ private final HashSet
Worker 是通過繼承 AQS,使用 AQS 來實現獨占鎖這個功能。沒有使用可重入鎖ReentrantLock,而是使用 AQS,為的就是實現不可重入的特性去反應線程現在的執行狀態。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
lock方法一旦獲取了獨占鎖,表示當前線程正在執行任務中
如果正在執行任務,則不應該中斷線程
如果該線程現在不是獨占鎖狀態,也就是空閑狀態,說明它沒有正在處理任務,這時可以對該線程進行中斷
線程池在執行shutdown方法或tryTeriminate方法是或調用interruptIdleWorkers方法來中斷空閑線程,interruptIdleWorkers方法會使用tryLock方法來判斷線程池中的線程是否是空閑狀態,如果是空閑狀態則可以安全回收
shutdown方法源碼
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); // 執行interruptIdleWorkers方法 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
tryTerminate方法源碼
final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { // Eligible to terminate // 執行interruptIdleWorkers interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
interruptIdleWorkers方法源碼
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
在線程回收過程中就使用到了這種特性,回收過程如下圖所示:
線程池回收過程
2.4.2 worker線程增加
增加線程是通過線程池中的 addWorker 方法,該方法的功能就是增加一個線程, 該方法不考慮線程池是在哪個階段增加的該線程,這個分配線程的策略是在上個步 驟完成的,該步驟僅僅完成增加線程,并使它運行,最后返回是否成功這個結果。 addWorker 方法有兩個參數:firstTask、core。firstTask 參數用于指定新增的線程執行的第一個任務,該參數可以為空;core 參數為 true 表示在新增線程時會判斷當前活動線程數是否少于 corePoolSize,false 表示新增線程前需要判斷當前活動線程數是否少于 maximumPoolSize,其執行流程如下圖所示:
申請線程執行流程圖
源碼分析
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 判斷線程是否已經停止 // 判斷線程是否正在停止 如果是則判斷線程是否用于執行剩余任務firstTask // workQueue是否為空 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 獲取線程數量 int wc = workerCountOf(c); // 判斷線程是否超過容量 // 判斷線程是否超過對應核心數 上面講了core 傳true/false區別 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 嘗試登記線程 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 加鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); // 判斷線程池狀態是否改變 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 增加線程 workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { // 釋放鎖 mainLock.unlock(); } // 增加成功啟動線程 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
2.4.3 worker線程回收
線程池中線程的銷毀依賴 JVM 自動的回收,線程池做的工作是根據當前線程池的狀態維護一定數量的線程引用,防止這部分線程被 JVM 回收,當線程池決定哪些線 程需要回收時,只需要將其引用消除即可。Worker 被創建出來后,就會不斷地進行輪詢,然后獲取任務去執行,核心線程可以無限等待獲取任務,非核心線程要限時獲取任務。當 Worker 無法獲取到任務,也就是獲取的任務為空時,循環會結束,Worker 會主動消除自身在線程池內的引用。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { // 執行任務 } finally { // 獲取不到任務,主動回收自己 processWorkerExit(w, completedAbruptly); } } private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; // 回收 workers.remove(w); } finally { mainLock.unlock(); } ... }
事實上,在這個方法中,將線程引用移出線程池就已經結束了線程銷毀的部分。但由于引起線程銷毀的可能性有很多,線程池還要判斷是什么引發了這次銷毀,是否要改變線程池的現階段狀態,是否要根據新狀態,重新分配線程。
線程銷毀流程
2.4.4 worker線程執行任務
在 Worker 類中的 run 方法調用了 runWorker 方法來執行任務,runWorker 方法的執行過程如下:
while循環不斷獲取getTask()方法獲取任務
getTask()方法從阻塞隊列獲取任務
如果線程池正在停止,那么保證當前線程是中斷狀態,否則要保證當前線程不是中斷狀態
執行任務
如果getTask結果為null則調出循環,執行processWorkerExit(),銷毀線程
執行任務流程
2.4.5 worker如何保證核心線程不被回收
源碼分析
我們通常都是通過執行execute(Runnable command)方法來向線程池提交一個不需要返回結果的任務的如果你需要返回結果那么就是
第一步:execute方法分析
public void execute(Runnable command) { // 提交任務為null 拋出異常 if (command == null) throw new NullPointerException(); // 獲取線程池狀態\線程池線程數據 int c = ctl.get(); // 小于核心線程數 addWorker() if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 大于核心線程數,當前線程池是運行狀態,向阻塞隊列中添加任務 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 隊列添加失敗 拒絕策略處理 else if (!addWorker(command, false)) reject(command); }
第二步:addWorker()方法分析
private boolean addWorker(Runnable firstTask, boolean core) { retry: // 死循環 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 如果當前線程狀態是SHUTDOWN STOP TIDYING TERMINATED 并且SHUTDOWN狀態時任務隊列為空 返回false // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // 死循環 for (;;) { int wc = workerCountOf(c); // core參數 true corePoolSize核心線程數 false maximumPoolSize最大線程數 // CAPACITY integer最大值 (1 << COUNT_BITS) - 1; if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 如果增加任務成功,退出該循環執行下面代碼,否則繼續 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 重點代碼 后續分析 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 內置鎖 加鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); // 判斷線程池狀態,防止使用過程中線程池被關閉 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 向正在被執行的任務隊列workers中添加worker // 注意區分 // HashSet
第三步:查看worker中的run()
/** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); }
第四步:查看runWorker()
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); // 獲取worker對象中的任務 可以為null Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 死循環 // 判斷任務是否為空,如果為空則getTask()獲取任務 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 任務執行前調用 beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 任務執行后調用 afterExecute(task, thrown); } } finally { // 重點代碼,執行完任務將task設置為null 則會從getTask()重新獲取 task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 回收worker processWorkerExit(w, completedAbruptly); } }
我們可以看到beforeExecute(Thread t, Runnable r)方法和afterExecute(Runnable r, Throwable t)會在任務的執行前后執行,我們可以通過繼承線程池的方式來重寫這兩個方法,這樣就能夠對任務的執行進行監控啦。
第五步:查看getTask()
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? // 死循環 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 判斷線程池狀態 // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } // 統計worker int wc = workerCountOf(c); // 如果設置了allowCoreThreadTimeOut(true) 或者當前運行的統計worker數大于設置的核心線程數,那么timed =true // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } // 核心代碼 try { // 看完這里就明白了 // 阻塞隊列獲取 // workQueue.poll() 規定時間獲取任務 // workQueue.take() 會一直等待,知道阻塞隊列中任務不為空 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); // 獲取任務返回 if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
參考資料
JDK 1.8 源碼
美團2020后臺技術篇
深入理解 Java 線程池:ThreadPoolExecutor
Java 任務調度
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。