Java并發(fā)線程池ThreadPoolExecutor源碼分析學(xué)習(xí)

      網(wǎng)友投稿 806 2022-05-29

      線程池學(xué)習(xí)

      以下所有內(nèi)容以及源碼分析都是基于JDK1.8的,請(qǐng)知悉。

      我寫博客就真的比較沒(méi)有順序了,這可能跟我的學(xué)習(xí)方式有關(guān),我自己也覺(jué)得這樣挺不好的,但是沒(méi)辦法說(shuō)服自己去改變,所以也只能這樣想到什么學(xué)什么了。

      池化技術(shù)真的是一門在我看來(lái)非常牛逼的技術(shù),因?yàn)樗龅搅嗽谟邢拶Y源內(nèi)實(shí)現(xiàn)了資源利用的最大化,這讓我想到了一門課程,那就是運(yùn)籌學(xué),當(dāng)時(shí)在上運(yùn)籌學(xué)的時(shí)候就經(jīng)常做這種類似的問(wèn)題。

      言歸正傳吧,我接下來(lái)會(huì)進(jìn)行一次線程池方面知識(shí)點(diǎn)的學(xué)習(xí),也會(huì)記錄下來(lái)分享給大家。

      線程池的內(nèi)容當(dāng)中有涉及到AQS同步器的知識(shí)點(diǎn),如果對(duì)AQS同步器知識(shí)點(diǎn)感覺(jué)有點(diǎn)薄弱。

      線程池的優(yōu)勢(shì)

      既然說(shuō)到線程池了,而且大多數(shù)的大牛也都會(huì)建議我們使用池化技術(shù)來(lái)管理一些資源,那線程池肯定也是有它的好處的,要不然怎么會(huì)那么出名并且讓大家使用呢?

      我們就來(lái)看看它究竟有什么優(yōu)勢(shì)?

      資源可控性:使用線程池可以避免創(chuàng)建大量線程而導(dǎo)致內(nèi)存的消耗

      提高響應(yīng)速度:線程池地創(chuàng)建實(shí)際上是很消耗時(shí)間和性能的,由線程池創(chuàng)建好有任務(wù)就運(yùn)行,提升響應(yīng)速度。

      便于管理:池化技術(shù)最突出的一個(gè)特點(diǎn)就是可以幫助我們對(duì)池子里的資源進(jìn)行管理。由線程池統(tǒng)一分配和管理。

      線程池的創(chuàng)建

      我們要用線程池來(lái)統(tǒng)一分配和管理我們的線程,那首先我們要?jiǎng)?chuàng)建一個(gè)線程池出來(lái),還是有很多大牛已經(jīng)幫我們寫好了很多方面的代碼的,Executors的工廠方法就給我們提供了創(chuàng)建多種不同線程池的方法。因?yàn)檫@個(gè)類只是一個(gè)創(chuàng)建對(duì)象的工廠,并沒(méi)有涉及到很多的具體實(shí)現(xiàn),所以我不會(huì)過(guò)于詳細(xì)地去說(shuō)明。

      老規(guī)矩,還是直接上代碼吧。

      corePoolSize(核心線程池大小):當(dāng)提交一個(gè)任務(wù)到線程池時(shí),線程池會(huì)創(chuàng)建一個(gè)線程來(lái)執(zhí)行任務(wù),即使其他空閑的基本線程能夠執(zhí)行新任務(wù)也會(huì)創(chuàng)建線程,當(dāng)任務(wù)數(shù)大于核心線程數(shù)的時(shí)候就不會(huì)再創(chuàng)建。在這里要注意一點(diǎn),線程池剛創(chuàng)建的時(shí)候,其中并沒(méi)有創(chuàng)建任何線程,而是等任務(wù)來(lái)才去創(chuàng)建線程,除非調(diào)用了 prestartAllCoreThreads() 或者 prestartCoreThread() 方法 ,這樣才會(huì)預(yù)先創(chuàng)建好 corePoolSize 個(gè)線程或者一個(gè)線程。

      maximumPoolSize(線程池最大線程數(shù)):線程池允許創(chuàng)建的最大線程數(shù),如果隊(duì)列滿了,并且已創(chuàng)建的線程數(shù)小于最大線程數(shù),則線程池會(huì)再創(chuàng)建新的線程執(zhí)行任務(wù)。值得注意的是,如果使用了×××隊(duì)列,此參數(shù)就沒(méi)有意義了。

      keepAliveTime(線程活動(dòng)保持時(shí)間):此參數(shù)默認(rèn)在線程數(shù)大于 corePoolSize 的情況下才會(huì)起作用, 當(dāng)線程的空閑時(shí)間達(dá)到 keepAliveTime 的時(shí)候就會(huì)終止,直至線程數(shù)目小于 corePoolSize 。不過(guò)如果調(diào)用了 allowCoreThreadTimeOut 方法,則當(dāng)線程數(shù)目小于 corePoolSize 的時(shí)候也會(huì)起作用.

      unit(keelAliveTime的時(shí)間單位):keelAliveTime的時(shí)間單位,一共有7種,在這里就不列舉了。

      workQueue(阻塞隊(duì)列):阻塞隊(duì)列,用來(lái)存儲(chǔ)等待執(zhí)行的任務(wù),這個(gè)參數(shù)也是非常重要的,在這里簡(jiǎn)單介紹一下幾個(gè)阻塞隊(duì)列。

      ArrayBlockingQueue:這是一個(gè)基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列,此隊(duì)列按照FIFO的原則對(duì)元素進(jìn)行排序。

      LinkedBlockingQueue:一個(gè)基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,此隊(duì)列按照FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。靜態(tài)工廠方法 Executors.newFixedThreadPool()就是使用了這個(gè)隊(duì)列。

      SynchronousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。每個(gè)插入操作必須等到另一個(gè)線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài)。吞吐量通常要高于LinkedBlockingQueue,靜態(tài)工廠方法 Executors.newCachedThreadPool() 就使用了這個(gè)隊(duì)列。

      PriorityBlockingQueue:一個(gè)具有優(yōu)先級(jí)的無(wú)阻塞隊(duì)列。

      handler(飽和策略);當(dāng)線程池和隊(duì)列都滿了,說(shuō)明線程池已經(jīng)處于飽和狀態(tài)了,那么必須采取一種策略來(lái)處理還在提交過(guò)來(lái)的新任務(wù)。這個(gè)飽和策略默認(rèn)情況下是 AbortPolicy ,表示無(wú)法處理新任務(wù)時(shí)拋出異常。共有四種飽和策略提供,當(dāng)然我們也可以選擇自己實(shí)現(xiàn)飽和策略。

      AbortPolicy:直接丟棄并且拋出 RejectedExecutionException 異常

      CallerRunsPolicy:只用調(diào)用者所在線程來(lái)運(yùn)行任務(wù)。

      DiscardOldestPolicy:丟棄隊(duì)列里最近的一個(gè)任務(wù),并執(zhí)行當(dāng)前任務(wù)。

      DiscardPolicy:丟棄任務(wù)并且不拋出異常。

      線程池的執(zhí)行流程就用參考資料里的圖介紹一下了,具體我們還是通過(guò)代碼去講解。

      在上面我們簡(jiǎn)單的講解了一下 Executors 這個(gè)工廠類里的工廠方法,并且講述了一下創(chuàng)建線程池的一些參數(shù)以及它們的作用,當(dāng)然上面的講解并不是很深入,因?yàn)橄胍脑捠切枰掷m(xù)地花時(shí)間去看去理解的,而博主自己也還是沒(méi)有完全弄懂,不過(guò)博主的學(xué)習(xí)方法是先學(xué)了個(gè)大概,再回頭來(lái)看看之前的知識(shí)點(diǎn),可能會(huì)更加好理解,所以我們接著往下面講吧。

      ThreadPoolExecutor源碼分析

      在上面我們就發(fā)現(xiàn)了, Executors 的工廠方法主要就返回了 ThreadPoolExecutor 對(duì)象,至于另一個(gè)在這里暫時(shí)不講,也就是說(shuō),要學(xué)習(xí)線程池,其實(shí)關(guān)鍵的還是得學(xué)會(huì)分析 ThreadPoolExecutor 這個(gè)對(duì)象里面的源碼,我們接下來(lái)就會(huì)對(duì) ThreadPoolExecutor 里的關(guān)鍵代碼進(jìn)行分析。

      AtomicInteger ctl

      ctl 是主要的控制狀態(tài),是一個(gè)復(fù)合類型的變量,其中包括了兩個(gè)概念。

      workerCount:表示有效的線程數(shù)目

      runState:線程池里線程的運(yùn)行狀態(tài)

      我們來(lái)分析一下跟 ctl 有關(guān)的一些源代碼吧,直接上代碼

      CAPACITY

      在這里我們講一下這個(gè)線程池最大數(shù)量的計(jì)算吧,因?yàn)檫@里涉及到源碼以及位移之類的操作,我感覺(jué)大多數(shù)人都還是不太會(huì)這個(gè),因?yàn)槲乙婚_始看的時(shí)候也是不太會(huì)的。

      從代碼我們可以看出,是需要 1往左移29位 ,然后再減去1,那個(gè) 1往左移29位 是怎么計(jì)算的呢?

      2.runState

      正數(shù)的原碼、反碼、補(bǔ)碼都是一樣的

      在計(jì)算機(jī)底層,是用補(bǔ)碼來(lái)表示的

      RUNNING

      可以接受新任務(wù)并且處理已經(jīng)在阻塞隊(duì)列的任務(wù)

      高3位全部是1的話,就是RUNNING狀態(tài)

      SHUTDOWN

      不接受新任務(wù),但是處理已經(jīng)在阻塞隊(duì)列的任務(wù)

      高3位全是0,就是SHUTDOWN狀態(tài)

      STOP

      不接受新任務(wù),也不處理阻塞隊(duì)列里的任務(wù),并且會(huì)中斷正在處理的任務(wù)

      所以高3位是001,就是STOP狀態(tài)

      TIDYING

      所有任務(wù)都被中止,workerCount是0,線程狀態(tài)轉(zhuǎn)化為TIDYING并且調(diào)用terminated()鉤子方法

      所以高3位是010,就是TIDYING狀態(tài)

      TERMINATED

      terminated()鉤子方法已經(jīng)完成

      所以高3位是110,就是TERMINATED狀態(tài)

      3.部分方法介紹

      runStateOf(int c)

      實(shí)時(shí)獲取runState的方法

      workerCountOf(int c)

      獲取線程池的當(dāng)前有效線程數(shù)目

      private?static?int?workerCountOf(int?c)?{?return?c?&?CAPACITY;?}

      CAPACITY的32位2進(jìn)制是

      000?11111?11111111?11111111?11111111

      用入?yún)跟CAPACITY進(jìn)行按位與操作

      1、低29位都是1,所以保留c的低29位,也就是有效線程數(shù)

      2、高3位都是0,所以c的高3位也是0

      這樣獲取出來(lái)的便是workerCount的值

      ctlOf(int rs, int wc)

      原子整型變量ctl的初始化方法

      //結(jié)合這幾句代碼來(lái)看

      private?static?final?int?RUNNING?=?-1?<

      private?final?AtomicInteger?ctl?=?new?AtomicInteger(ctlOf(RUNNING,?0));

      private?static?int?ctlOf(int?rs,?int?wc)?{?return?rs?|?wc;?}

      RUNNING是

      111?00000?00000000?00000000?00000000

      ctlOf是將rs和wc進(jìn)行按位或的操作

      初始化的時(shí)候是將RUNNING和0進(jìn)行按位或

      0的32位2進(jìn)制是

      00000000?00000000?00000000?00000000

      所以初始化的ctl是

      111?00000?00000000?00000000?00000000

      核心方法源碼分析

      execute(Runnable command)方法

      public?void?execute(Runnable?command)?{

      //需要執(zhí)行的任務(wù)command為空,拋出空指針異常

      if?(command?==?null)?//?1

      throw?new?NullPointerException();

      /*

      *執(zhí)行的流程實(shí)際上分為三步

      *1、如果運(yùn)行的線程小于corePoolSize,以用戶給定的Runable對(duì)象新開一個(gè)線程去執(zhí)行

      *?并且執(zhí)行addWorker方法會(huì)以原子性操作去檢查runState和workerCount,以防止當(dāng)返回false的

      *?時(shí)候添加了不應(yīng)該添加的線程

      *2、?如果任務(wù)能夠成功添加到隊(duì)列當(dāng)中,我們?nèi)孕枰獙?duì)添加的線程進(jìn)行雙重檢查,有可能添加的線程在前

      *?一次檢查時(shí)已經(jīng)死亡,又或者在進(jìn)入該方法的時(shí)候線程池關(guān)閉了。所以我們需要復(fù)查狀態(tài),并有有必

      *?要的話需要在停止時(shí)回滾入列操作,或者在沒(méi)有線程的時(shí)候新開一個(gè)線程

      *3、如果任務(wù)無(wú)法入列,那我們需要嘗試新增一個(gè)線程,如果新建線程失敗了,我們就知道線程可能關(guān)閉了

      *?或者飽和了,就需要拒絕這個(gè)任務(wù)

      *

      */

      //獲取線程池的控制狀態(tài)

      int?c?=?ctl.get();?//?2

      //通過(guò)workCountOf方法算workerCount值,小于corePoolSize

      if?(workerCountOf(c)?

      //添加任務(wù)到worker集合當(dāng)中

      if?(addWorker(command,?true))

      return;?//成功返回

      //失敗的話再次獲取線程池的控制狀態(tài)

      c?=?ctl.get();

      }

      /*

      *判斷線程池是否正處于RUNNING狀態(tài)

      *是的話添加Runnable對(duì)象到workQueue隊(duì)列當(dāng)中

      */

      if?(isRunning(c)?&&?workQueue.offer(command))?{?//?3

      //再次獲取線程池的狀態(tài)

      int?recheck?=?ctl.get();

      //再次檢查狀態(tài)

      //線程池不處于RUNNING狀態(tài),將任務(wù)從workQueue隊(duì)列中移除

      if?(!?isRunning(recheck)?&&?remove(command))

      //拒絕任務(wù)

      reject(command);

      //workerCount等于0

      else?if?(workerCountOf(recheck)?==?0)?//?4

      //添加worker

      addWorker(null,?false);

      }

      //加入阻塞隊(duì)列失敗,則嘗試以線程池最大線程數(shù)新開線程去執(zhí)行該任務(wù)

      else?if?(!addWorker(command,?false))?//?5

      //執(zhí)行失敗則拒絕任務(wù)

      reject(command);

      }

      我們來(lái)說(shuō)一下上面這個(gè)代碼的流程:

      1、首先判斷任務(wù)是否為空,空則拋出空指針異常

      2、不為空則獲取線程池控制狀態(tài),判斷小于corePoolSize,添加到worker集合當(dāng)中執(zhí)行,

      如成功,則返回

      失敗的話再接著獲取線程池控制狀態(tài),因?yàn)橹挥袪顟B(tài)變了才會(huì)失敗,所以重新獲取

      3、判斷線程池是否處于運(yùn)行狀態(tài),是的話則添加command到阻塞隊(duì)列,加入時(shí)也會(huì)再次獲取狀態(tài)并且檢測(cè)

      狀態(tài)是否不處于運(yùn)行狀態(tài),不處于的話則將command從阻塞隊(duì)列移除,并且拒絕任務(wù)

      4、如果線程池里沒(méi)有了線程,則創(chuàng)建新的線程去執(zhí)行獲取阻塞隊(duì)列的任務(wù)執(zhí)行

      5、如果以上都沒(méi)執(zhí)行成功,則需要開啟最大線程池里的線程來(lái)執(zhí)行任務(wù),失敗的話就丟棄

      有時(shí)候再多的文字也不如一個(gè)流程圖來(lái)的明白,所以還是畫了個(gè)execute的流程圖給大家方便理解。

      2.addWorker(Runnable firstTask, boolean core)

      private?boolean?addWorker(Runnable?firstTask,?boolean?core)?{

      //外部循環(huán)標(biāo)記

      retry:

      //外層死循環(huán)

      for?(;;)?{

      //獲取線程池控制狀態(tài)

      int?c?=?ctl.get();

      //獲取runState

      int?rs?=?runStateOf(c);

      //?Check?if?queue?empty?only?if?necessary.

      /**

      *1.如果線程池runState至少已經(jīng)是SHUTDOWN

      *2.?有一個(gè)是false則addWorker失敗,看false的情況

      *?-?runState==SHUTDOWN,即狀態(tài)已經(jīng)大于SHUTDOWN了

      *?-?firstTask為null,即傳進(jìn)來(lái)的任務(wù)為空,結(jié)合上面就是runState是SHUTDOWN,但是

      *?firstTask不為空,代表線程池已經(jīng)關(guān)閉了還在傳任務(wù)進(jìn)來(lái)

      *?-?隊(duì)列為空,既然任務(wù)已經(jīng)為空,隊(duì)列為空,就不需要往線程池添加任務(wù)了

      */

      if?(rs?>=?SHUTDOWN?&&?//runState大于等于SHUTDOWN,初始位RUNNING

      !?(rs?==?SHUTDOWN?&&?//runState等于SHUTDOWN

      firstTask?==?null?&&?//firstTask為null

      !?workQueue.isEmpty()))?//workQueue隊(duì)列不為空

      return?false;

      //內(nèi)層死循環(huán)

      for?(;;)?{

      //獲取線程池的workerCount數(shù)量

      int?wc?=?workerCountOf(c);

      //如果workerCount超出最大值或者大于corePoolSize/maximumPoolSize

      //返回false

      if?(wc?>=?CAPACITY?||

      wc?>=?(core???corePoolSize?:?maximumPoolSize))

      return?false;

      //通過(guò)CAS操作,使workerCount數(shù)量+1,成功則跳出循環(huán),回到retry標(biāo)記

      if?(compareAndIncrementWorkerCount(c))

      break?retry;

      //CAS操作失敗,再次獲取線程池的控制狀態(tài)

      c?=?ctl.get();?//?Re-read?ctl

      //如果當(dāng)前runState不等于剛開始獲取的runState,則跳出內(nèi)層循環(huán),繼續(xù)外層循環(huán)

      if?(runStateOf(c)?!=?rs)

      continue?retry;

      //?else?CAS?failed?due?to?workerCount?change;?retry?inner?loop

      //CAS由于更改workerCount而失敗,繼續(xù)內(nèi)層循環(huán)

      }

      }

      //通過(guò)以上循環(huán),能執(zhí)行到這是workerCount成功+1了

      //worker開始標(biāo)記

      boolean?workerStarted?=?false;

      //worker添加標(biāo)記

      boolean?workerAdded?=?false;

      //初始化worker為null

      Worker?w?=?null;

      try?{

      //初始化一個(gè)當(dāng)前Runnable對(duì)象的worker對(duì)象

      w?=?new?Worker(firstTask);

      //獲取該worker對(duì)應(yīng)的線程

      final?Thread?t?=?w.thread;

      //如果線程不為null

      if?(t?!=?null)?{

      //初始線程池的鎖

      final?ReentrantLock?mainLock?=?this.mainLock;

      //獲取鎖

      mainLock.lock();

      try?{

      //?Recheck?while?holding?lock.

      //?Back?out?on?ThreadFactory?failure?or?if

      //?shut?down?before?lock?acquired.

      //獲取鎖后再次檢查,獲取線程池runState

      int?rs?=?runStateOf(ctl.get());

      //當(dāng)runState小于SHUTDOWN或者runState等于SHUTDOWN并且firstTask為null

      if?(rs?

      (rs?==?SHUTDOWN?&&?firstTask?==?null))?{

      //線程已存活

      if?(t.isAlive())?//?precheck?that?t?is?startable

      //線程未啟動(dòng)就存活,拋出IllegalThreadStateException異常

      throw?new?IllegalThreadStateException();

      //將worker對(duì)象添加到workers集合當(dāng)中

      workers.add(w);

      //獲取workers集合的大小

      int?s?=?workers.size();

      //如果大小超過(guò)largestPoolSize

      if?(s?>?largestPoolSize)

      //重新設(shè)置largestPoolSize

      largestPoolSize?=?s;

      //標(biāo)記worker已經(jīng)被添加

      workerAdded?=?true;

      }

      }?finally?{

      //釋放鎖

      mainLock.unlock();

      }

      //如果worker添加成功

      if?(workerAdded)?{

      //啟動(dòng)線程

      t.start();

      //標(biāo)記worker已經(jīng)啟動(dòng)

      workerStarted?=?true;

      }

      }

      }?finally?{

      //如果worker沒(méi)有啟動(dòng)成功

      if?(!?workerStarted)

      //workerCount-1的操作

      addWorkerFailed(w);

      }

      //返回worker是否啟動(dòng)的標(biāo)記

      return?workerStarted;

      }

      我們也簡(jiǎn)單說(shuō)一下這個(gè)代碼的流程吧,還真的是挺難的,博主寫的時(shí)候都停了好多次,想砸鍵盤的說(shuō):

      1、獲取線程池的控制狀態(tài),進(jìn)行判斷,不符合則返回false,符合則下一步

      2、死循環(huán),判斷workerCount是否大于上限,或者大于corePoolSize/maximumPoolSize,沒(méi)有的話則對(duì)workerCount+1操作,

      3、如果不符合上述判斷或+1操作失敗,再次獲取線程池的控制狀態(tài),獲取runState與剛開始獲取的runState相比,不一致則跳出內(nèi)層循環(huán)繼續(xù)外層循環(huán),否則繼續(xù)內(nèi)層循環(huán)

      4、+1操作成功后,使用重入鎖ReentrantLock來(lái)保證往workers當(dāng)中添加worker實(shí)例,添加成功就啟動(dòng)該實(shí)例。

      接下來(lái)看看流程圖來(lái)理解一下上面代碼的一個(gè)執(zhí)行流程

      3.addWorkerFailed(Worker w)

      addWorker方法添加worker失敗,并且沒(méi)有成功啟動(dòng)任務(wù)的時(shí)候,就會(huì)調(diào)用此方法,將任務(wù)從workers中移除,并且workerCount做-1操作。

      private?void?addWorkerFailed(Worker?w)?{

      //重入鎖

      final?ReentrantLock?mainLock?=?this.mainLock;

      //獲取鎖

      mainLock.lock();

      try?{

      //如果worker不為null

      if?(w?!=?null)

      //workers移除worker

      workers.remove(w);

      //通過(guò)CAS操作,workerCount-1

      decrementWorkerCount();

      tryTerminate();

      }?finally?{

      //釋放鎖

      mainLock.unlock();

      }

      }

      4.tryTerminate()

      當(dāng)對(duì)線程池執(zhí)行了非正常成功邏輯的操作時(shí),都會(huì)需要執(zhí)行tryTerminate嘗試終止線程池

      final?void?tryTerminate()?{

      //死循環(huán)

      for?(;;)?{

      //獲取線程池控制狀態(tài)

      int?c?=?ctl.get();

      /*

      *線程池處于RUNNING狀態(tài)

      *線程池狀態(tài)最小大于TIDYING

      *線程池==SHUTDOWN并且workQUeue不為空

      *直接return,不能終止

      */

      if?(isRunning(c)?||

      runStateAtLeast(c,?TIDYING)?||

      (runStateOf(c)?==?SHUTDOWN?&&?!?workQueue.isEmpty()))

      return;

      //如果workerCount不為0

      if?(workerCountOf(c)?!=?0)?{?//?Eligible?to?terminate

      interruptIdleWorkers(ONLY_ONE);

      return;

      }

      //獲取線程池的鎖

      final?ReentrantLock?mainLock?=?this.mainLock;

      //獲取鎖

      mainLock.lock();

      try?{

      //通過(guò)CAS操作,設(shè)置線程池狀態(tài)為TIDYING

      if?(ctl.compareAndSet(c,?ctlOf(TIDYING,?0)))?{

      try?{

      terminated();

      }?finally?{

      //設(shè)置線程池的狀態(tài)為TERMINATED

      ctl.set(ctlOf(TERMINATED,?0));

      //發(fā)送釋放信號(hào)給在termination條件上等待的線程

      termination.signalAll();

      }

      return;

      }

      }?finally?{

      //釋放鎖

      mainLock.unlock();

      }

      //?else?retry?on?failed?CAS

      }

      }

      5.runWorker(Worker w)

      該方法的作用就是去執(zhí)行任務(wù)

      final?void?runWorker(Worker?w)?{

      //獲取當(dāng)前線程

      Thread?wt?=?Thread.currentThread();

      //獲取worker里的任務(wù)

      Runnable?task?=?w.firstTask;

      //將worker實(shí)例的任務(wù)賦值為null

      w.firstTask?=?null;

      /*

      *unlock方法會(huì)調(diào)用AQS的release方法

      *release方法會(huì)調(diào)用具體實(shí)現(xiàn)類也就是Worker的tryRelease方法

      *也就是將AQS狀態(tài)置為0,允許中斷

      */

      w.unlock();?//?allow?interrupts

      //是否突然完成

      boolean?completedAbruptly?=?true;

      try?{

      //worker實(shí)例的task不為空,或者通過(guò)getTask獲取的不為空

      while?(task?!=?null?||?(task?=?getTask())?!=?null)?{

      //獲取鎖

      w.lock();

      //?If?pool?is?stopping,?ensure?thread?is?interrupted;

      //?if?not,?ensure?thread?is?not?interrupted.?This

      //?requires?a?recheck?in?second?case?to?deal?with

      //?shutdownNow?race?while?clearing?interrupt

      /*

      *獲取線程池的控制狀態(tài),至少要大于STOP狀態(tài)

      *如果狀態(tài)不對(duì),檢查當(dāng)前線程是否中斷并清除中斷狀態(tài),并且再次檢查線程池狀態(tài)是否大于STOP

      *如果上述滿足,檢查該對(duì)象是否處于中斷狀態(tài),不清除中斷標(biāo)記

      */

      if?((runStateAtLeast(ctl.get(),?STOP)?||

      (Thread.interrupted()?&&

      runStateAtLeast(ctl.get(),?STOP)))?&&

      !wt.isInterrupted())

      //中斷改對(duì)象

      wt.interrupt();

      try?{

      //執(zhí)行前的方法,由子類具體實(shí)現(xiàn)

      beforeExecute(wt,?task);

      Throwable?thrown?=?null;

      try?{

      //執(zhí)行任務(wù)

      task.run();

      }?catch?(RuntimeException?x)?{

      thrown?=?x;?throw?x;

      }?catch?(Error?x)?{

      thrown?=?x;?throw?x;

      }?catch?(Throwable?x)?{

      thrown?=?x;?throw?new?Error(x);

      }?finally?{

      //執(zhí)行完后調(diào)用的方法,也是由子類具體實(shí)現(xiàn)

      afterExecute(task,?thrown);

      }

      }?finally?{//執(zhí)行完后

      //task設(shè)置為null

      task?=?null;

      //已完成任務(wù)數(shù)+1

      w.completedTasks++;

      //釋放鎖

      w.unlock();

      }

      }

      completedAbruptly?=?false;

      }?finally?{

      Java并發(fā)之線程池ThreadPoolExecutor源碼分析學(xué)習(xí)

      //處理并退出當(dāng)前worker

      processWorkerExit(w,?completedAbruptly);

      }

      }

      接下來(lái)我們用文字來(lái)說(shuō)明一下執(zhí)行任務(wù)這個(gè)方法的具體邏輯和流程。

      首先在方法一進(jìn)來(lái),就執(zhí)行了w.unlock(),這是為了將AQS的狀態(tài)改為0,因?yàn)橹挥術(shù)etState() >= 0的時(shí)候,線程才可以被中斷;

      判斷firstTask是否為空,為空則通過(guò)getTask()獲取任務(wù),不為空接著往下執(zhí)行

      判斷是否符合中斷狀態(tài),符合的話設(shè)置中斷標(biāo)記

      執(zhí)行beforeExecute(),task.run(),afterExecute()方法

      任何一個(gè)出異常都會(huì)導(dǎo)致任務(wù)執(zhí)行的終止;進(jìn)入processWorkerExit來(lái)退出任務(wù)

      正常執(zhí)行的話會(huì)接著回到步驟2

      附上一副簡(jiǎn)單的流程圖:

      6.getTask()

      在上面的runWorker方法當(dāng)中我們可以看出,當(dāng)firstTask為空的時(shí)候,會(huì)通過(guò)該方法來(lái)接著獲取任務(wù)去執(zhí)行,那我們就看看獲取任務(wù)這個(gè)方法到底是怎么樣的?

      private?Runnable?getTask()?{

      //標(biāo)志是否獲取任務(wù)超時(shí)

      boolean?timedOut?=?false;?//?Did?the?last?poll()?time?out?

      //死循環(huán)

      for?(;;)?{

      //獲取線程池的控制狀態(tài)

      int?c?=?ctl.get();

      //獲取線程池的runState

      int?rs?=?runStateOf(c);

      //?Check?if?queue?empty?only?if?necessary.

      /*

      *判斷線程池的狀態(tài),出現(xiàn)以下兩種情況

      *1、runState大于等于SHUTDOWN狀態(tài)

      *2、runState大于等于STOP或者阻塞隊(duì)列為空

      *將會(huì)通過(guò)CAS操作,進(jìn)行workerCount-1并返回null

      */

      if?(rs?>=?SHUTDOWN?&&?(rs?>=?STOP?||?workQueue.isEmpty()))?{

      decrementWorkerCount();

      return?null;

      }

      //獲取線程池的workerCount

      int?wc?=?workerCountOf(c);

      //?Are?workers?subject?to?culling?

      /*

      *allowCoreThreadTimeOut:是否允許core?Thread超時(shí),默認(rèn)false

      *workerCount是否大于核心核心線程池

      */

      boolean?timed?=?allowCoreThreadTimeOut?||?wc?>?corePoolSize;

      /*

      *1、wc大于maximumPoolSize或者已超時(shí)

      *2、隊(duì)列不為空時(shí)保證至少有一個(gè)任務(wù)

      */

      if?((wc?>?maximumPoolSize?||?(timed?&&?timedOut))

      &&?(wc?>?1?||?workQueue.isEmpty()))?{

      /*

      *通過(guò)CAS操作,workerCount-1

      *能進(jìn)行-1操作,證明wc大于maximumPoolSize或者已經(jīng)超時(shí)

      */

      if?(compareAndDecrementWorkerCount(c))

      //-1操作成功,返回null

      return?null;

      //-1操作失敗,繼續(xù)循環(huán)

      continue;

      }

      try?{

      /*

      *wc大于核心線程池

      *執(zhí)行poll方法

      *小于核心線程池

      *執(zhí)行take方法

      */

      Runnable?r?=?timed??

      workQueue.poll(keepAliveTime,?TimeUnit.NANOSECONDS)?:

      workQueue.take();

      //判斷任務(wù)不為空返回任務(wù)

      if?(r?!=?null)

      return?r;

      //獲取一段時(shí)間沒(méi)有獲取到,獲取超時(shí)

      timedOut?=?true;

      }?catch?(InterruptedException?retry)?{

      timedOut?=?false;

      }

      }

      }

      還是文字解說(shuō)一下上面的代碼邏輯和流程:

      獲取線程池控制狀態(tài)和runState,判斷線程池是否已經(jīng)關(guān)閉或者正在關(guān)閉,是的話則workerCount-1操作返回null

      獲取workerCount判斷是否大于核心線程池

      判斷workerCount是否大于最大線程池?cái)?shù)目或者已經(jīng)超時(shí),是的話workerCount-1,-1成功則返回null,不成功則回到步驟1重新繼續(xù)

      判斷workerCount是否大于核心線程池,大于則用poll方法從隊(duì)列獲取任務(wù),否則用take方法從隊(duì)列獲取任務(wù)

      判斷任務(wù)是否為空,不為空則返回獲取的任務(wù),否則回到步驟1重新繼續(xù)

      接下來(lái)依然有一副流程圖:

      7.processWorkerExit

      明顯的,在執(zhí)行任務(wù)當(dāng)中,會(huì)去獲取任務(wù)進(jìn)行執(zhí)行,那既然是執(zhí)行任務(wù),肯定就會(huì)有執(zhí)行完或者出現(xiàn)異常中斷執(zhí)行的時(shí)候,那這時(shí)候肯定也會(huì)有相對(duì)應(yīng)的操作,至于具體操作是怎么樣的,我們還是直接去看源碼最實(shí)際。

      private?void?processWorkerExit(Worker?w,?boolean?completedAbruptly)?{

      /*

      *completedAbruptly:在runWorker出現(xiàn),代表是否突然完成的意思

      *也就是在執(zhí)行任務(wù)過(guò)程當(dāng)中出現(xiàn)異常,就會(huì)突然完成,傳true

      *

      *如果是突然完成,需要通過(guò)CAS操作,workerCount-1

      *不是突然完成,則不需要-1,因?yàn)間etTask方法當(dāng)中已經(jīng)-1

      *

      *下面的代碼注釋貌似與代碼意思相反了

      */

      if?(completedAbruptly)?//?If?abrupt,?then?workerCount?wasn't?adjusted

      decrementWorkerCount();

      //生成重入鎖

      final?ReentrantLock?mainLock?=?this.mainLock;

      //獲取鎖

      mainLock.lock();

      try?{

      //線程池統(tǒng)計(jì)的完成任務(wù)數(shù)completedTaskCount加上worker當(dāng)中完成的任務(wù)數(shù)

      completedTaskCount?+=?w.completedTasks;

      //從HashSet中移除

      workers.remove(w);

      }?finally?{

      //釋放鎖

      mainLock.unlock();

      }

      //因?yàn)樯鲜霾僮魇轻尫湃蝿?wù)或線程,所以會(huì)判斷線程池狀態(tài),嘗試終止線程池

      tryTerminate();

      //獲取線程池的控制狀態(tài)

      int?c?=?ctl.get();

      //判斷runState是否小魚STOP,即是RUNNING或者SHUTDOWN

      //如果是RUNNING或者SHUTDOWN,代表沒(méi)有成功終止線程池

      if?(runStateLessThan(c,?STOP))?{

      /*

      *是否突然完成

      *如若不是,代表已經(jīng)沒(méi)有任務(wù)可獲取完成,因?yàn)間etTask當(dāng)中是while循環(huán)

      */

      if?(!completedAbruptly)?{

      /*

      *allowCoreThreadTimeOut:是否允許core?thread超時(shí),默認(rèn)false

      *min-默認(rèn)是corePoolSize

      */

      int?min?=?allowCoreThreadTimeOut???0?:?corePoolSize;

      //允許core?thread超時(shí)并且隊(duì)列不為空

      //min為0,即允許core?thread超時(shí),這樣就不需要維護(hù)核心核心線程池了

      //如果workQueue不為空,則至少保持一個(gè)線程存活

      if?(min?==?0?&&?!?workQueue.isEmpty())

      min?=?1;

      //如果workerCount大于min,則表示滿足所需,可以直接返回

      if?(workerCountOf(c)?>=?min)

      return;?//?replacement?not?needed

      }

      //如果是突然完成,添加一個(gè)空任務(wù)的worker線程--這里我也不太理解

      addWorker(null,?false);

      }

      }

      首先判斷線程是否突然終止,如果是突然終止,通過(guò)CAS,workerCount-1

      統(tǒng)計(jì)線程池完成任務(wù)數(shù),并將worker從workers當(dāng)中移除

      判斷線程池狀態(tài),嘗試終止線程池

      線程池沒(méi)有成功終止

      判斷是否突然完成任務(wù),不是則進(jìn)行下一步,是則進(jìn)行第三步

      如允許核心線程超時(shí),隊(duì)列不為空,則至少保證一個(gè)線程存活

      添加一個(gè)空任務(wù)的worker線程

      Worker內(nèi)部類

      我們?cè)谏厦嬉呀?jīng)算是挺詳細(xì)地講了線程池執(zhí)行任務(wù) execute 的執(zhí)行流程和一些細(xì)節(jié),在上面頻繁地出現(xiàn)了一個(gè)字眼,那就是worker實(shí)例,那么這個(gè)worker究竟是什么呢?里面都包含了一些什么信息,以及worker這個(gè)任務(wù)究竟是怎么執(zhí)行的呢?

      我們就在這個(gè)部分來(lái)介紹一下吧,還是直接上源碼:

      我們可以看到Worker內(nèi)部類繼承AQS同步器并且實(shí)現(xiàn)了Runnable接口,所以Worker很明顯就是一個(gè)可執(zhí)行任務(wù)并且又可以控制中斷、起到鎖效果的類。

      private?final?class?Worker

      extends?AbstractQueuedSynchronizer

      implements?Runnable

      {

      /**

      *?This?class?will?never?be?serialized,?but?we?provide?a

      *?serialVersionUID?to?suppress?a?javac?warning.

      */

      private?static?final?long?serialVersionUID?=?6138294804551838833L;

      /**?工作線程,如果工廠失敗則為空.?*/

      final?Thread?thread;

      /**?初始化任務(wù),有可能為空?*/

      Runnable?firstTask;

      /**?已完成的任務(wù)計(jì)數(shù)?*/

      volatile?long?completedTasks;

      /**

      *?創(chuàng)建并初始化第一個(gè)任務(wù),使用線程工廠來(lái)創(chuàng)建線程

      *?初始化有3步

      *1、設(shè)置AQS的同步狀態(tài)為-1,表示該對(duì)象需要被喚醒

      *2、初始化第一個(gè)任務(wù)

      *3、調(diào)用ThreadFactory來(lái)使自身創(chuàng)建一個(gè)線程,并賦值給worker的成員變量thread

      */

      Worker(Runnable?firstTask)?{

      setState(-1);?//?inhibit?interrupts?until?runWorker

      this.firstTask?=?firstTask;

      this.thread?=?getThreadFactory().newThread(this);

      }

      //重寫Runnable的run方法

      /**?Delegates?main?run?loop?to?outer?runWorker?*/

      public?void?run()?{

      //調(diào)用ThreadPoolExecutor的runWorker方法

      runWorker(this);

      }

      //?Lock?methods

      //

      //?The?value?0?represents?the?unlocked?state.

      //?The?value?1?represents?the?locked?state.

      //代表是否獨(dú)占鎖,0-非獨(dú)占?1-獨(dú)占

      protected?boolean?isHeldExclusively()?{

      return?getState()?!=?0;

      }

      //重寫AQS的tryAcquire方法嘗試獲取鎖

      protected?boolean?tryAcquire(int?unused)?{

      //嘗試將AQS的同步狀態(tài)從0改為1

      if?(compareAndSetState(0,?1))?{

      //如果改變成,則將當(dāng)前獨(dú)占模式的線程設(shè)置為當(dāng)前線程并返回true

      setExclusiveOwnerThread(Thread.currentThread());

      return?true;

      }

      //否則返回false

      return?false;

      }

      //重寫AQS的tryRelease嘗試釋放鎖

      protected?boolean?tryRelease(int?unused)?{

      //設(shè)置當(dāng)前獨(dú)占模式的線程為null

      setExclusiveOwnerThread(null);

      //設(shè)置AQS同步狀態(tài)為0

      setState(0);

      //返回true

      return?true;

      }

      //獲取鎖

      public?void?lock()?{?acquire(1);?}

      //嘗試獲取鎖

      public?boolean?tryLock()?{?return?tryAcquire(1);?}

      //釋放鎖

      public?void?unlock()?{?release(1);?}

      //是否被獨(dú)占

      public?boolean?isLocked()?{?return?isHeldExclusively();?}

      void?interruptIfStarted()?{

      Thread?t;

      if?(getState()?>=?0?&&?(t?=?thread)?!=?null?&&?!t.isInterrupted())?{

      try?{

      t.interrupt();

      }?catch?(SecurityException?ignore)?{

      }

      }

      }

      }

      小結(jié)

      寫這個(gè)線程池就真的是不容易了,歷時(shí)兩個(gè)星期,中途有很多的地方不懂,而且《Java并發(fā)編程的藝術(shù)》的這本書當(dāng)中對(duì)線程池的介紹其實(shí)并不算多,所以自己看起來(lái)也挺痛苦的,還經(jīng)常會(huì)看了這個(gè)方法就不知道為什么要調(diào)用這個(gè)以及調(diào)用這個(gè)方法是出何用意。而且在這學(xué)習(xí)的過(guò)程當(dāng)中,有在懷疑自己的學(xué)習(xí)方法對(duì)不對(duì),因?yàn)橐灿腥烁艺f(shuō)不需要一句句去看去分析源碼,只需要知道流程就可以了,但是后來(lái)還是想想按照自己的學(xué)習(xí)路線走,多讀源碼總是有好處的,在這里我也給程序猿一些建議,有自己的學(xué)習(xí)方法的時(shí)候,按照自己的方式堅(jiān)定走下去。

      開發(fā)者 編程語(yǔ)言

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。

      上一篇:后端開發(fā)實(shí)踐系列——Spring Boot項(xiàng)目模板
      下一篇:公式樹開源庫(kù)分析
      相關(guān)文章
      亚洲一区二区三区国产精华液| 亚洲精品国产情侣av在线| 久久精品视频亚洲| 亚洲视频在线免费| 亚洲高清免费视频| 亚洲国产精品成人久久蜜臀 | 亚洲毛片免费观看| 国产日韩成人亚洲丁香婷婷| 亚洲av无码av制服另类专区| 亚洲Av熟妇高潮30p| 亚洲专区先锋影音| 亚洲精品熟女国产| 亚洲欧洲日产韩国在线| 亚洲国产美女福利直播秀一区二区| 亚洲短视频在线观看| 亚洲人成片在线观看| 国产成人精品日本亚洲专一区| 亚洲国产精品免费观看| 国产精品亚洲综合久久| 亚洲精品国产首次亮相| WWW亚洲色大成网络.COM | 另类图片亚洲校园小说区| 亚洲av午夜成人片精品电影| 亚洲人成影院在线观看| 久久夜色精品国产亚洲av | 亚洲综合国产成人丁香五月激情| 亚洲日本一线产区和二线| 亚洲爆乳无码精品AAA片蜜桃| 国产精品无码亚洲一区二区三区| 无码专区一va亚洲v专区在线 | 色噜噜AV亚洲色一区二区| 亚洲国产精品SSS在线观看AV| 亚洲日本va午夜中文字幕一区| 亚洲高清视频在线播放| 亚洲一区二区影视| 亚洲AV日韩AV一区二区三曲| 亚洲国产一区国产亚洲| 久久久亚洲AV波多野结衣 | 亚洲香蕉久久一区二区| 性色av极品无码专区亚洲| 亚洲天堂在线视频|