Java的面向?qū)ο缶幊?/a>">Java的面向?qū)ο缶幊?/a>
806
2022-05-29
線程池學(xué)習(xí)
以下所有內(nèi)容以及源碼分析都是基于JDK1.8的,請(qǐng)知悉。
我寫博客就真的比較沒(méi)有順序了,這可能跟我的學(xué)習(xí)方式有關(guān),我自己也覺(jué)得這樣挺不好的,但是沒(méi)辦法說(shuō)服自己去改變,所以也只能這樣想到什么學(xué)什么了。
池化技術(shù)真的是一門在我看來(lái)非常牛逼的技術(shù),因?yàn)樗龅搅嗽谟邢拶Y源內(nèi)實(shí)現(xiàn)了資源利用的最大化,這讓我想到了一門課程,那就是運(yùn)籌學(xué),當(dāng)時(shí)在上運(yùn)籌學(xué)的時(shí)候就經(jīng)常做這種類似的問(wèn)題。
言歸正傳吧,我接下來(lái)會(huì)進(jìn)行一次線程池方面知識(shí)點(diǎn)的學(xué)習(xí),也會(huì)記錄下來(lái)分享給大家。
線程池的內(nèi)容當(dāng)中有涉及到AQS同步器的知識(shí)點(diǎn),如果對(duì)AQS同步器知識(shí)點(diǎn)感覺(jué)有點(diǎn)薄弱。
線程池的優(yōu)勢(shì)
既然說(shuō)到線程池了,而且大多數(shù)的大牛也都會(huì)建議我們使用池化技術(shù)來(lái)管理一些資源,那線程池肯定也是有它的好處的,要不然怎么會(huì)那么出名并且讓大家使用呢?
我們就來(lái)看看它究竟有什么優(yōu)勢(shì)?
資源可控性:使用線程池可以避免創(chuàng)建大量線程而導(dǎo)致內(nèi)存的消耗
提高響應(yīng)速度:線程池地創(chuàng)建實(shí)際上是很消耗時(shí)間和性能的,由線程池創(chuàng)建好有任務(wù)就運(yùn)行,提升響應(yīng)速度。
便于管理:池化技術(shù)最突出的一個(gè)特點(diǎn)就是可以幫助我們對(duì)池子里的資源進(jìn)行管理。由線程池統(tǒng)一分配和管理。
線程池的創(chuàng)建
我們要用線程池來(lái)統(tǒng)一分配和管理我們的線程,那首先我們要?jiǎng)?chuàng)建一個(gè)線程池出來(lái),還是有很多大牛已經(jīng)幫我們寫好了很多方面的代碼的,Executors的工廠方法就給我們提供了創(chuàng)建多種不同線程池的方法。因?yàn)檫@個(gè)類只是一個(gè)創(chuàng)建對(duì)象的工廠,并沒(méi)有涉及到很多的具體實(shí)現(xiàn),所以我不會(huì)過(guò)于詳細(xì)地去說(shuō)明。
老規(guī)矩,還是直接上代碼吧。
corePoolSize(核心線程池大小):當(dāng)提交一個(gè)任務(wù)到線程池時(shí),線程池會(huì)創(chuàng)建一個(gè)線程來(lái)執(zhí)行任務(wù),即使其他空閑的基本線程能夠執(zhí)行新任務(wù)也會(huì)創(chuàng)建線程,當(dāng)任務(wù)數(shù)大于核心線程數(shù)的時(shí)候就不會(huì)再創(chuàng)建。在這里要注意一點(diǎn),線程池剛創(chuàng)建的時(shí)候,其中并沒(méi)有創(chuàng)建任何線程,而是等任務(wù)來(lái)才去創(chuàng)建線程,除非調(diào)用了 prestartAllCoreThreads() 或者 prestartCoreThread() 方法 ,這樣才會(huì)預(yù)先創(chuàng)建好 corePoolSize 個(gè)線程或者一個(gè)線程。
maximumPoolSize(線程池最大線程數(shù)):線程池允許創(chuàng)建的最大線程數(shù),如果隊(duì)列滿了,并且已創(chuàng)建的線程數(shù)小于最大線程數(shù),則線程池會(huì)再創(chuàng)建新的線程執(zhí)行任務(wù)。值得注意的是,如果使用了×××隊(duì)列,此參數(shù)就沒(méi)有意義了。
keepAliveTime(線程活動(dòng)保持時(shí)間):此參數(shù)默認(rèn)在線程數(shù)大于 corePoolSize 的情況下才會(huì)起作用, 當(dāng)線程的空閑時(shí)間達(dá)到 keepAliveTime 的時(shí)候就會(huì)終止,直至線程數(shù)目小于 corePoolSize 。不過(guò)如果調(diào)用了 allowCoreThreadTimeOut 方法,則當(dāng)線程數(shù)目小于 corePoolSize 的時(shí)候也會(huì)起作用.
unit(keelAliveTime的時(shí)間單位):keelAliveTime的時(shí)間單位,一共有7種,在這里就不列舉了。
workQueue(阻塞隊(duì)列):阻塞隊(duì)列,用來(lái)存儲(chǔ)等待執(zhí)行的任務(wù),這個(gè)參數(shù)也是非常重要的,在這里簡(jiǎn)單介紹一下幾個(gè)阻塞隊(duì)列。
ArrayBlockingQueue:這是一個(gè)基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列,此隊(duì)列按照FIFO的原則對(duì)元素進(jìn)行排序。
LinkedBlockingQueue:一個(gè)基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,此隊(duì)列按照FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。靜態(tài)工廠方法 Executors.newFixedThreadPool()就是使用了這個(gè)隊(duì)列。
SynchronousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。每個(gè)插入操作必須等到另一個(gè)線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài)。吞吐量通常要高于LinkedBlockingQueue,靜態(tài)工廠方法 Executors.newCachedThreadPool() 就使用了這個(gè)隊(duì)列。
PriorityBlockingQueue:一個(gè)具有優(yōu)先級(jí)的無(wú)阻塞隊(duì)列。
handler(飽和策略);當(dāng)線程池和隊(duì)列都滿了,說(shuō)明線程池已經(jīng)處于飽和狀態(tài)了,那么必須采取一種策略來(lái)處理還在提交過(guò)來(lái)的新任務(wù)。這個(gè)飽和策略默認(rèn)情況下是 AbortPolicy ,表示無(wú)法處理新任務(wù)時(shí)拋出異常。共有四種飽和策略提供,當(dāng)然我們也可以選擇自己實(shí)現(xiàn)飽和策略。
AbortPolicy:直接丟棄并且拋出 RejectedExecutionException 異常
CallerRunsPolicy:只用調(diào)用者所在線程來(lái)運(yùn)行任務(wù)。
DiscardOldestPolicy:丟棄隊(duì)列里最近的一個(gè)任務(wù),并執(zhí)行當(dāng)前任務(wù)。
DiscardPolicy:丟棄任務(wù)并且不拋出異常。
線程池的執(zhí)行流程就用參考資料里的圖介紹一下了,具體我們還是通過(guò)代碼去講解。
在上面我們簡(jiǎn)單的講解了一下 Executors 這個(gè)工廠類里的工廠方法,并且講述了一下創(chuàng)建線程池的一些參數(shù)以及它們的作用,當(dāng)然上面的講解并不是很深入,因?yàn)橄胍脑捠切枰掷m(xù)地花時(shí)間去看去理解的,而博主自己也還是沒(méi)有完全弄懂,不過(guò)博主的學(xué)習(xí)方法是先學(xué)了個(gè)大概,再回頭來(lái)看看之前的知識(shí)點(diǎn),可能會(huì)更加好理解,所以我們接著往下面講吧。
ThreadPoolExecutor源碼分析
在上面我們就發(fā)現(xiàn)了, Executors 的工廠方法主要就返回了 ThreadPoolExecutor 對(duì)象,至于另一個(gè)在這里暫時(shí)不講,也就是說(shuō),要學(xué)習(xí)線程池,其實(shí)關(guān)鍵的還是得學(xué)會(huì)分析 ThreadPoolExecutor 這個(gè)對(duì)象里面的源碼,我們接下來(lái)就會(huì)對(duì) ThreadPoolExecutor 里的關(guān)鍵代碼進(jìn)行分析。
AtomicInteger ctl
ctl 是主要的控制狀態(tài),是一個(gè)復(fù)合類型的變量,其中包括了兩個(gè)概念。
workerCount:表示有效的線程數(shù)目
runState:線程池里線程的運(yùn)行狀態(tài)
我們來(lái)分析一下跟 ctl 有關(guān)的一些源代碼吧,直接上代碼
CAPACITY
在這里我們講一下這個(gè)線程池最大數(shù)量的計(jì)算吧,因?yàn)檫@里涉及到源碼以及位移之類的操作,我感覺(jué)大多數(shù)人都還是不太會(huì)這個(gè),因?yàn)槲乙婚_始看的時(shí)候也是不太會(huì)的。
從代碼我們可以看出,是需要 1往左移29位 ,然后再減去1,那個(gè) 1往左移29位 是怎么計(jì)算的呢?
2.runState
正數(shù)的原碼、反碼、補(bǔ)碼都是一樣的
在計(jì)算機(jī)底層,是用補(bǔ)碼來(lái)表示的
RUNNING
可以接受新任務(wù)并且處理已經(jīng)在阻塞隊(duì)列的任務(wù)
高3位全部是1的話,就是RUNNING狀態(tài)
SHUTDOWN
不接受新任務(wù),但是處理已經(jīng)在阻塞隊(duì)列的任務(wù)
高3位全是0,就是SHUTDOWN狀態(tài)
STOP
不接受新任務(wù),也不處理阻塞隊(duì)列里的任務(wù),并且會(huì)中斷正在處理的任務(wù)
所以高3位是001,就是STOP狀態(tài)
TIDYING
所有任務(wù)都被中止,workerCount是0,線程狀態(tài)轉(zhuǎn)化為TIDYING并且調(diào)用terminated()鉤子方法
所以高3位是010,就是TIDYING狀態(tài)
TERMINATED
terminated()鉤子方法已經(jīng)完成
所以高3位是110,就是TERMINATED狀態(tài)
3.部分方法介紹
runStateOf(int c)
實(shí)時(shí)獲取runState的方法
workerCountOf(int c)
獲取線程池的當(dāng)前有效線程數(shù)目
private?static?int?workerCountOf(int?c)?{?return?c?&?CAPACITY;?}
CAPACITY的32位2進(jìn)制是
000?11111?11111111?11111111?11111111
用入?yún)跟CAPACITY進(jìn)行按位與操作
1、低29位都是1,所以保留c的低29位,也就是有效線程數(shù)
2、高3位都是0,所以c的高3位也是0
這樣獲取出來(lái)的便是workerCount的值
ctlOf(int rs, int wc)
原子整型變量ctl的初始化方法
//結(jié)合這幾句代碼來(lái)看
private?static?final?int?RUNNING?=?-1?<
private?final?AtomicInteger?ctl?=?new?AtomicInteger(ctlOf(RUNNING,?0));
private?static?int?ctlOf(int?rs,?int?wc)?{?return?rs?|?wc;?}
RUNNING是
111?00000?00000000?00000000?00000000
ctlOf是將rs和wc進(jìn)行按位或的操作
初始化的時(shí)候是將RUNNING和0進(jìn)行按位或
0的32位2進(jìn)制是
00000000?00000000?00000000?00000000
所以初始化的ctl是
111?00000?00000000?00000000?00000000
核心方法源碼分析
execute(Runnable command)方法
public?void?execute(Runnable?command)?{
//需要執(zhí)行的任務(wù)command為空,拋出空指針異常
if?(command?==?null)?//?1
throw?new?NullPointerException();
/*
*執(zhí)行的流程實(shí)際上分為三步
*1、如果運(yùn)行的線程小于corePoolSize,以用戶給定的Runable對(duì)象新開一個(gè)線程去執(zhí)行
*?并且執(zhí)行addWorker方法會(huì)以原子性操作去檢查runState和workerCount,以防止當(dāng)返回false的
*?時(shí)候添加了不應(yīng)該添加的線程
*2、?如果任務(wù)能夠成功添加到隊(duì)列當(dāng)中,我們?nèi)孕枰獙?duì)添加的線程進(jìn)行雙重檢查,有可能添加的線程在前
*?一次檢查時(shí)已經(jīng)死亡,又或者在進(jìn)入該方法的時(shí)候線程池關(guān)閉了。所以我們需要復(fù)查狀態(tài),并有有必
*?要的話需要在停止時(shí)回滾入列操作,或者在沒(méi)有線程的時(shí)候新開一個(gè)線程
*3、如果任務(wù)無(wú)法入列,那我們需要嘗試新增一個(gè)線程,如果新建線程失敗了,我們就知道線程可能關(guān)閉了
*?或者飽和了,就需要拒絕這個(gè)任務(wù)
*
*/
//獲取線程池的控制狀態(tài)
int?c?=?ctl.get();?//?2
//通過(guò)workCountOf方法算workerCount值,小于corePoolSize
if?(workerCountOf(c)?
//添加任務(wù)到worker集合當(dāng)中
if?(addWorker(command,?true))
return;?//成功返回
//失敗的話再次獲取線程池的控制狀態(tài)
c?=?ctl.get();
}
/*
*判斷線程池是否正處于RUNNING狀態(tài)
*是的話添加Runnable對(duì)象到workQueue隊(duì)列當(dāng)中
*/
if?(isRunning(c)?&&?workQueue.offer(command))?{?//?3
//再次獲取線程池的狀態(tài)
int?recheck?=?ctl.get();
//再次檢查狀態(tài)
//線程池不處于RUNNING狀態(tài),將任務(wù)從workQueue隊(duì)列中移除
if?(!?isRunning(recheck)?&&?remove(command))
//拒絕任務(wù)
reject(command);
//workerCount等于0
else?if?(workerCountOf(recheck)?==?0)?//?4
//添加worker
addWorker(null,?false);
}
//加入阻塞隊(duì)列失敗,則嘗試以線程池最大線程數(shù)新開線程去執(zhí)行該任務(wù)
else?if?(!addWorker(command,?false))?//?5
//執(zhí)行失敗則拒絕任務(wù)
reject(command);
}
我們來(lái)說(shuō)一下上面這個(gè)代碼的流程:
1、首先判斷任務(wù)是否為空,空則拋出空指針異常
2、不為空則獲取線程池控制狀態(tài),判斷小于corePoolSize,添加到worker集合當(dāng)中執(zhí)行,
如成功,則返回
失敗的話再接著獲取線程池控制狀態(tài),因?yàn)橹挥袪顟B(tài)變了才會(huì)失敗,所以重新獲取
3、判斷線程池是否處于運(yùn)行狀態(tài),是的話則添加command到阻塞隊(duì)列,加入時(shí)也會(huì)再次獲取狀態(tài)并且檢測(cè)
狀態(tài)是否不處于運(yùn)行狀態(tài),不處于的話則將command從阻塞隊(duì)列移除,并且拒絕任務(wù)
4、如果線程池里沒(méi)有了線程,則創(chuàng)建新的線程去執(zhí)行獲取阻塞隊(duì)列的任務(wù)執(zhí)行
5、如果以上都沒(méi)執(zhí)行成功,則需要開啟最大線程池里的線程來(lái)執(zhí)行任務(wù),失敗的話就丟棄
有時(shí)候再多的文字也不如一個(gè)流程圖來(lái)的明白,所以還是畫了個(gè)execute的流程圖給大家方便理解。
2.addWorker(Runnable firstTask, boolean core)
private?boolean?addWorker(Runnable?firstTask,?boolean?core)?{
//外部循環(huán)標(biāo)記
retry:
//外層死循環(huán)
for?(;;)?{
//獲取線程池控制狀態(tài)
int?c?=?ctl.get();
//獲取runState
int?rs?=?runStateOf(c);
//?Check?if?queue?empty?only?if?necessary.
/**
*1.如果線程池runState至少已經(jīng)是SHUTDOWN
*2.?有一個(gè)是false則addWorker失敗,看false的情況
*?-?runState==SHUTDOWN,即狀態(tài)已經(jīng)大于SHUTDOWN了
*?-?firstTask為null,即傳進(jìn)來(lái)的任務(wù)為空,結(jié)合上面就是runState是SHUTDOWN,但是
*?firstTask不為空,代表線程池已經(jīng)關(guān)閉了還在傳任務(wù)進(jìn)來(lái)
*?-?隊(duì)列為空,既然任務(wù)已經(jīng)為空,隊(duì)列為空,就不需要往線程池添加任務(wù)了
*/
if?(rs?>=?SHUTDOWN?&&?//runState大于等于SHUTDOWN,初始位RUNNING
!?(rs?==?SHUTDOWN?&&?//runState等于SHUTDOWN
firstTask?==?null?&&?//firstTask為null
!?workQueue.isEmpty()))?//workQueue隊(duì)列不為空
return?false;
//內(nèi)層死循環(huán)
for?(;;)?{
//獲取線程池的workerCount數(shù)量
int?wc?=?workerCountOf(c);
//如果workerCount超出最大值或者大于corePoolSize/maximumPoolSize
//返回false
if?(wc?>=?CAPACITY?||
wc?>=?(core???corePoolSize?:?maximumPoolSize))
return?false;
//通過(guò)CAS操作,使workerCount數(shù)量+1,成功則跳出循環(huán),回到retry標(biāo)記
if?(compareAndIncrementWorkerCount(c))
break?retry;
//CAS操作失敗,再次獲取線程池的控制狀態(tài)
c?=?ctl.get();?//?Re-read?ctl
//如果當(dāng)前runState不等于剛開始獲取的runState,則跳出內(nèi)層循環(huán),繼續(xù)外層循環(huán)
if?(runStateOf(c)?!=?rs)
continue?retry;
//?else?CAS?failed?due?to?workerCount?change;?retry?inner?loop
//CAS由于更改workerCount而失敗,繼續(xù)內(nèi)層循環(huán)
}
}
//通過(guò)以上循環(huán),能執(zhí)行到這是workerCount成功+1了
//worker開始標(biāo)記
boolean?workerStarted?=?false;
//worker添加標(biāo)記
boolean?workerAdded?=?false;
//初始化worker為null
Worker?w?=?null;
try?{
//初始化一個(gè)當(dāng)前Runnable對(duì)象的worker對(duì)象
w?=?new?Worker(firstTask);
//獲取該worker對(duì)應(yīng)的線程
final?Thread?t?=?w.thread;
//如果線程不為null
if?(t?!=?null)?{
//初始線程池的鎖
final?ReentrantLock?mainLock?=?this.mainLock;
//獲取鎖
mainLock.lock();
try?{
//?Recheck?while?holding?lock.
//?Back?out?on?ThreadFactory?failure?or?if
//?shut?down?before?lock?acquired.
//獲取鎖后再次檢查,獲取線程池runState
int?rs?=?runStateOf(ctl.get());
//當(dāng)runState小于SHUTDOWN或者runState等于SHUTDOWN并且firstTask為null
if?(rs?
(rs?==?SHUTDOWN?&&?firstTask?==?null))?{
//線程已存活
if?(t.isAlive())?//?precheck?that?t?is?startable
//線程未啟動(dòng)就存活,拋出IllegalThreadStateException異常
throw?new?IllegalThreadStateException();
//將worker對(duì)象添加到workers集合當(dāng)中
workers.add(w);
//獲取workers集合的大小
int?s?=?workers.size();
//如果大小超過(guò)largestPoolSize
if?(s?>?largestPoolSize)
//重新設(shè)置largestPoolSize
largestPoolSize?=?s;
//標(biāo)記worker已經(jīng)被添加
workerAdded?=?true;
}
}?finally?{
//釋放鎖
mainLock.unlock();
}
//如果worker添加成功
if?(workerAdded)?{
//啟動(dòng)線程
t.start();
//標(biāo)記worker已經(jīng)啟動(dòng)
workerStarted?=?true;
}
}
}?finally?{
//如果worker沒(méi)有啟動(dòng)成功
if?(!?workerStarted)
//workerCount-1的操作
addWorkerFailed(w);
}
//返回worker是否啟動(dòng)的標(biāo)記
return?workerStarted;
}
我們也簡(jiǎn)單說(shuō)一下這個(gè)代碼的流程吧,還真的是挺難的,博主寫的時(shí)候都停了好多次,想砸鍵盤的說(shuō):
1、獲取線程池的控制狀態(tài),進(jìn)行判斷,不符合則返回false,符合則下一步
2、死循環(huán),判斷workerCount是否大于上限,或者大于corePoolSize/maximumPoolSize,沒(méi)有的話則對(duì)workerCount+1操作,
3、如果不符合上述判斷或+1操作失敗,再次獲取線程池的控制狀態(tài),獲取runState與剛開始獲取的runState相比,不一致則跳出內(nèi)層循環(huán)繼續(xù)外層循環(huán),否則繼續(xù)內(nèi)層循環(huán)
4、+1操作成功后,使用重入鎖ReentrantLock來(lái)保證往workers當(dāng)中添加worker實(shí)例,添加成功就啟動(dòng)該實(shí)例。
接下來(lái)看看流程圖來(lái)理解一下上面代碼的一個(gè)執(zhí)行流程
3.addWorkerFailed(Worker w)
addWorker方法添加worker失敗,并且沒(méi)有成功啟動(dòng)任務(wù)的時(shí)候,就會(huì)調(diào)用此方法,將任務(wù)從workers中移除,并且workerCount做-1操作。
private?void?addWorkerFailed(Worker?w)?{
//重入鎖
final?ReentrantLock?mainLock?=?this.mainLock;
//獲取鎖
mainLock.lock();
try?{
//如果worker不為null
if?(w?!=?null)
//workers移除worker
workers.remove(w);
//通過(guò)CAS操作,workerCount-1
decrementWorkerCount();
tryTerminate();
}?finally?{
//釋放鎖
mainLock.unlock();
}
}
4.tryTerminate()
當(dāng)對(duì)線程池執(zhí)行了非正常成功邏輯的操作時(shí),都會(huì)需要執(zhí)行tryTerminate嘗試終止線程池
final?void?tryTerminate()?{
//死循環(huán)
for?(;;)?{
//獲取線程池控制狀態(tài)
int?c?=?ctl.get();
/*
*線程池處于RUNNING狀態(tài)
*線程池狀態(tài)最小大于TIDYING
*線程池==SHUTDOWN并且workQUeue不為空
*直接return,不能終止
*/
if?(isRunning(c)?||
runStateAtLeast(c,?TIDYING)?||
(runStateOf(c)?==?SHUTDOWN?&&?!?workQueue.isEmpty()))
return;
//如果workerCount不為0
if?(workerCountOf(c)?!=?0)?{?//?Eligible?to?terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
//獲取線程池的鎖
final?ReentrantLock?mainLock?=?this.mainLock;
//獲取鎖
mainLock.lock();
try?{
//通過(guò)CAS操作,設(shè)置線程池狀態(tài)為TIDYING
if?(ctl.compareAndSet(c,?ctlOf(TIDYING,?0)))?{
try?{
terminated();
}?finally?{
//設(shè)置線程池的狀態(tài)為TERMINATED
ctl.set(ctlOf(TERMINATED,?0));
//發(fā)送釋放信號(hào)給在termination條件上等待的線程
termination.signalAll();
}
return;
}
}?finally?{
//釋放鎖
mainLock.unlock();
}
//?else?retry?on?failed?CAS
}
}
5.runWorker(Worker w)
該方法的作用就是去執(zhí)行任務(wù)
final?void?runWorker(Worker?w)?{
//獲取當(dāng)前線程
Thread?wt?=?Thread.currentThread();
//獲取worker里的任務(wù)
Runnable?task?=?w.firstTask;
//將worker實(shí)例的任務(wù)賦值為null
w.firstTask?=?null;
/*
*unlock方法會(huì)調(diào)用AQS的release方法
*release方法會(huì)調(diào)用具體實(shí)現(xiàn)類也就是Worker的tryRelease方法
*也就是將AQS狀態(tài)置為0,允許中斷
*/
w.unlock();?//?allow?interrupts
//是否突然完成
boolean?completedAbruptly?=?true;
try?{
//worker實(shí)例的task不為空,或者通過(guò)getTask獲取的不為空
while?(task?!=?null?||?(task?=?getTask())?!=?null)?{
//獲取鎖
w.lock();
//?If?pool?is?stopping,?ensure?thread?is?interrupted;
//?if?not,?ensure?thread?is?not?interrupted.?This
//?requires?a?recheck?in?second?case?to?deal?with
//?shutdownNow?race?while?clearing?interrupt
/*
*獲取線程池的控制狀態(tài),至少要大于STOP狀態(tài)
*如果狀態(tài)不對(duì),檢查當(dāng)前線程是否中斷并清除中斷狀態(tài),并且再次檢查線程池狀態(tài)是否大于STOP
*如果上述滿足,檢查該對(duì)象是否處于中斷狀態(tài),不清除中斷標(biāo)記
*/
if?((runStateAtLeast(ctl.get(),?STOP)?||
(Thread.interrupted()?&&
runStateAtLeast(ctl.get(),?STOP)))?&&
!wt.isInterrupted())
//中斷改對(duì)象
wt.interrupt();
try?{
//執(zhí)行前的方法,由子類具體實(shí)現(xiàn)
beforeExecute(wt,?task);
Throwable?thrown?=?null;
try?{
//執(zhí)行任務(wù)
task.run();
}?catch?(RuntimeException?x)?{
thrown?=?x;?throw?x;
}?catch?(Error?x)?{
thrown?=?x;?throw?x;
}?catch?(Throwable?x)?{
thrown?=?x;?throw?new?Error(x);
}?finally?{
//執(zhí)行完后調(diào)用的方法,也是由子類具體實(shí)現(xiàn)
afterExecute(task,?thrown);
}
}?finally?{//執(zhí)行完后
//task設(shè)置為null
task?=?null;
//已完成任務(wù)數(shù)+1
w.completedTasks++;
//釋放鎖
w.unlock();
}
}
completedAbruptly?=?false;
}?finally?{
//處理并退出當(dāng)前worker
processWorkerExit(w,?completedAbruptly);
}
}
接下來(lái)我們用文字來(lái)說(shuō)明一下執(zhí)行任務(wù)這個(gè)方法的具體邏輯和流程。
首先在方法一進(jìn)來(lái),就執(zhí)行了w.unlock(),這是為了將AQS的狀態(tài)改為0,因?yàn)橹挥術(shù)etState() >= 0的時(shí)候,線程才可以被中斷;
判斷firstTask是否為空,為空則通過(guò)getTask()獲取任務(wù),不為空接著往下執(zhí)行
判斷是否符合中斷狀態(tài),符合的話設(shè)置中斷標(biāo)記
執(zhí)行beforeExecute(),task.run(),afterExecute()方法
任何一個(gè)出異常都會(huì)導(dǎo)致任務(wù)執(zhí)行的終止;進(jìn)入processWorkerExit來(lái)退出任務(wù)
正常執(zhí)行的話會(huì)接著回到步驟2
附上一副簡(jiǎn)單的流程圖:
6.getTask()
在上面的runWorker方法當(dāng)中我們可以看出,當(dāng)firstTask為空的時(shí)候,會(huì)通過(guò)該方法來(lái)接著獲取任務(wù)去執(zhí)行,那我們就看看獲取任務(wù)這個(gè)方法到底是怎么樣的?
private?Runnable?getTask()?{
//標(biāo)志是否獲取任務(wù)超時(shí)
boolean?timedOut?=?false;?//?Did?the?last?poll()?time?out?
//死循環(huán)
for?(;;)?{
//獲取線程池的控制狀態(tài)
int?c?=?ctl.get();
//獲取線程池的runState
int?rs?=?runStateOf(c);
//?Check?if?queue?empty?only?if?necessary.
/*
*判斷線程池的狀態(tài),出現(xiàn)以下兩種情況
*1、runState大于等于SHUTDOWN狀態(tài)
*2、runState大于等于STOP或者阻塞隊(duì)列為空
*將會(huì)通過(guò)CAS操作,進(jìn)行workerCount-1并返回null
*/
if?(rs?>=?SHUTDOWN?&&?(rs?>=?STOP?||?workQueue.isEmpty()))?{
decrementWorkerCount();
return?null;
}
//獲取線程池的workerCount
int?wc?=?workerCountOf(c);
//?Are?workers?subject?to?culling?
/*
*allowCoreThreadTimeOut:是否允許core?Thread超時(shí),默認(rèn)false
*workerCount是否大于核心核心線程池
*/
boolean?timed?=?allowCoreThreadTimeOut?||?wc?>?corePoolSize;
/*
*1、wc大于maximumPoolSize或者已超時(shí)
*2、隊(duì)列不為空時(shí)保證至少有一個(gè)任務(wù)
*/
if?((wc?>?maximumPoolSize?||?(timed?&&?timedOut))
&&?(wc?>?1?||?workQueue.isEmpty()))?{
/*
*通過(guò)CAS操作,workerCount-1
*能進(jìn)行-1操作,證明wc大于maximumPoolSize或者已經(jīng)超時(shí)
*/
if?(compareAndDecrementWorkerCount(c))
//-1操作成功,返回null
return?null;
//-1操作失敗,繼續(xù)循環(huán)
continue;
}
try?{
/*
*wc大于核心線程池
*執(zhí)行poll方法
*小于核心線程池
*執(zhí)行take方法
*/
Runnable?r?=?timed??
workQueue.poll(keepAliveTime,?TimeUnit.NANOSECONDS)?:
workQueue.take();
//判斷任務(wù)不為空返回任務(wù)
if?(r?!=?null)
return?r;
//獲取一段時(shí)間沒(méi)有獲取到,獲取超時(shí)
timedOut?=?true;
}?catch?(InterruptedException?retry)?{
timedOut?=?false;
}
}
}
還是文字解說(shuō)一下上面的代碼邏輯和流程:
獲取線程池控制狀態(tài)和runState,判斷線程池是否已經(jīng)關(guān)閉或者正在關(guān)閉,是的話則workerCount-1操作返回null
獲取workerCount判斷是否大于核心線程池
判斷workerCount是否大于最大線程池?cái)?shù)目或者已經(jīng)超時(shí),是的話workerCount-1,-1成功則返回null,不成功則回到步驟1重新繼續(xù)
判斷workerCount是否大于核心線程池,大于則用poll方法從隊(duì)列獲取任務(wù),否則用take方法從隊(duì)列獲取任務(wù)
判斷任務(wù)是否為空,不為空則返回獲取的任務(wù),否則回到步驟1重新繼續(xù)
接下來(lái)依然有一副流程圖:
7.processWorkerExit
明顯的,在執(zhí)行任務(wù)當(dāng)中,會(huì)去獲取任務(wù)進(jìn)行執(zhí)行,那既然是執(zhí)行任務(wù),肯定就會(huì)有執(zhí)行完或者出現(xiàn)異常中斷執(zhí)行的時(shí)候,那這時(shí)候肯定也會(huì)有相對(duì)應(yīng)的操作,至于具體操作是怎么樣的,我們還是直接去看源碼最實(shí)際。
private?void?processWorkerExit(Worker?w,?boolean?completedAbruptly)?{
/*
*completedAbruptly:在runWorker出現(xiàn),代表是否突然完成的意思
*也就是在執(zhí)行任務(wù)過(guò)程當(dāng)中出現(xiàn)異常,就會(huì)突然完成,傳true
*
*如果是突然完成,需要通過(guò)CAS操作,workerCount-1
*不是突然完成,則不需要-1,因?yàn)間etTask方法當(dāng)中已經(jīng)-1
*
*下面的代碼注釋貌似與代碼意思相反了
*/
if?(completedAbruptly)?//?If?abrupt,?then?workerCount?wasn't?adjusted
decrementWorkerCount();
//生成重入鎖
final?ReentrantLock?mainLock?=?this.mainLock;
//獲取鎖
mainLock.lock();
try?{
//線程池統(tǒng)計(jì)的完成任務(wù)數(shù)completedTaskCount加上worker當(dāng)中完成的任務(wù)數(shù)
completedTaskCount?+=?w.completedTasks;
//從HashSet
workers.remove(w);
}?finally?{
//釋放鎖
mainLock.unlock();
}
//因?yàn)樯鲜霾僮魇轻尫湃蝿?wù)或線程,所以會(huì)判斷線程池狀態(tài),嘗試終止線程池
tryTerminate();
//獲取線程池的控制狀態(tài)
int?c?=?ctl.get();
//判斷runState是否小魚STOP,即是RUNNING或者SHUTDOWN
//如果是RUNNING或者SHUTDOWN,代表沒(méi)有成功終止線程池
if?(runStateLessThan(c,?STOP))?{
/*
*是否突然完成
*如若不是,代表已經(jīng)沒(méi)有任務(wù)可獲取完成,因?yàn)間etTask當(dāng)中是while循環(huán)
*/
if?(!completedAbruptly)?{
/*
*allowCoreThreadTimeOut:是否允許core?thread超時(shí),默認(rèn)false
*min-默認(rèn)是corePoolSize
*/
int?min?=?allowCoreThreadTimeOut???0?:?corePoolSize;
//允許core?thread超時(shí)并且隊(duì)列不為空
//min為0,即允許core?thread超時(shí),這樣就不需要維護(hù)核心核心線程池了
//如果workQueue不為空,則至少保持一個(gè)線程存活
if?(min?==?0?&&?!?workQueue.isEmpty())
min?=?1;
//如果workerCount大于min,則表示滿足所需,可以直接返回
if?(workerCountOf(c)?>=?min)
return;?//?replacement?not?needed
}
//如果是突然完成,添加一個(gè)空任務(wù)的worker線程--這里我也不太理解
addWorker(null,?false);
}
}
首先判斷線程是否突然終止,如果是突然終止,通過(guò)CAS,workerCount-1
統(tǒng)計(jì)線程池完成任務(wù)數(shù),并將worker從workers當(dāng)中移除
判斷線程池狀態(tài),嘗試終止線程池
線程池沒(méi)有成功終止
判斷是否突然完成任務(wù),不是則進(jìn)行下一步,是則進(jìn)行第三步
如允許核心線程超時(shí),隊(duì)列不為空,則至少保證一個(gè)線程存活
添加一個(gè)空任務(wù)的worker線程
Worker內(nèi)部類
我們?cè)谏厦嬉呀?jīng)算是挺詳細(xì)地講了線程池執(zhí)行任務(wù) execute 的執(zhí)行流程和一些細(xì)節(jié),在上面頻繁地出現(xiàn)了一個(gè)字眼,那就是worker實(shí)例,那么這個(gè)worker究竟是什么呢?里面都包含了一些什么信息,以及worker這個(gè)任務(wù)究竟是怎么執(zhí)行的呢?
我們就在這個(gè)部分來(lái)介紹一下吧,還是直接上源碼:
我們可以看到Worker內(nèi)部類繼承AQS同步器并且實(shí)現(xiàn)了Runnable接口,所以Worker很明顯就是一個(gè)可執(zhí)行任務(wù)并且又可以控制中斷、起到鎖效果的類。
private?final?class?Worker
extends?AbstractQueuedSynchronizer
implements?Runnable
{
/**
*?This?class?will?never?be?serialized,?but?we?provide?a
*?serialVersionUID?to?suppress?a?javac?warning.
*/
private?static?final?long?serialVersionUID?=?6138294804551838833L;
/**?工作線程,如果工廠失敗則為空.?*/
final?Thread?thread;
/**?初始化任務(wù),有可能為空?*/
Runnable?firstTask;
/**?已完成的任務(wù)計(jì)數(shù)?*/
volatile?long?completedTasks;
/**
*?創(chuàng)建并初始化第一個(gè)任務(wù),使用線程工廠來(lái)創(chuàng)建線程
*?初始化有3步
*1、設(shè)置AQS的同步狀態(tài)為-1,表示該對(duì)象需要被喚醒
*2、初始化第一個(gè)任務(wù)
*3、調(diào)用ThreadFactory來(lái)使自身創(chuàng)建一個(gè)線程,并賦值給worker的成員變量thread
*/
Worker(Runnable?firstTask)?{
setState(-1);?//?inhibit?interrupts?until?runWorker
this.firstTask?=?firstTask;
this.thread?=?getThreadFactory().newThread(this);
}
//重寫Runnable的run方法
/**?Delegates?main?run?loop?to?outer?runWorker?*/
public?void?run()?{
//調(diào)用ThreadPoolExecutor的runWorker方法
runWorker(this);
}
//?Lock?methods
//
//?The?value?0?represents?the?unlocked?state.
//?The?value?1?represents?the?locked?state.
//代表是否獨(dú)占鎖,0-非獨(dú)占?1-獨(dú)占
protected?boolean?isHeldExclusively()?{
return?getState()?!=?0;
}
//重寫AQS的tryAcquire方法嘗試獲取鎖
protected?boolean?tryAcquire(int?unused)?{
//嘗試將AQS的同步狀態(tài)從0改為1
if?(compareAndSetState(0,?1))?{
//如果改變成,則將當(dāng)前獨(dú)占模式的線程設(shè)置為當(dāng)前線程并返回true
setExclusiveOwnerThread(Thread.currentThread());
return?true;
}
//否則返回false
return?false;
}
//重寫AQS的tryRelease嘗試釋放鎖
protected?boolean?tryRelease(int?unused)?{
//設(shè)置當(dāng)前獨(dú)占模式的線程為null
setExclusiveOwnerThread(null);
//設(shè)置AQS同步狀態(tài)為0
setState(0);
//返回true
return?true;
}
//獲取鎖
public?void?lock()?{?acquire(1);?}
//嘗試獲取鎖
public?boolean?tryLock()?{?return?tryAcquire(1);?}
//釋放鎖
public?void?unlock()?{?release(1);?}
//是否被獨(dú)占
public?boolean?isLocked()?{?return?isHeldExclusively();?}
void?interruptIfStarted()?{
Thread?t;
if?(getState()?>=?0?&&?(t?=?thread)?!=?null?&&?!t.isInterrupted())?{
try?{
t.interrupt();
}?catch?(SecurityException?ignore)?{
}
}
}
}
小結(jié)
寫這個(gè)線程池就真的是不容易了,歷時(shí)兩個(gè)星期,中途有很多的地方不懂,而且《Java并發(fā)編程的藝術(shù)》的這本書當(dāng)中對(duì)線程池的介紹其實(shí)并不算多,所以自己看起來(lái)也挺痛苦的,還經(jīng)常會(huì)看了這個(gè)方法就不知道為什么要調(diào)用這個(gè)以及調(diào)用這個(gè)方法是出何用意。而且在這學(xué)習(xí)的過(guò)程當(dāng)中,有在懷疑自己的學(xué)習(xí)方法對(duì)不對(duì),因?yàn)橐灿腥烁艺f(shuō)不需要一句句去看去分析源碼,只需要知道流程就可以了,但是后來(lái)還是想想按照自己的學(xué)習(xí)路線走,多讀源碼總是有好處的,在這里我也給程序猿一些建議,有自己的學(xué)習(xí)方法的時(shí)候,按照自己的方式堅(jiān)定走下去。
開發(fā)者 編程語(yǔ)言
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。