從一簡單Java單例示例談談并發(下)

      網友投稿 782 2022-05-30

      locks

      鎖是用來控制多個線程訪問共享資源的形式,Java SE 5之后,J.U.C中新增了locks來實現鎖功能,它提供了與synchronized關鍵字類似的同步功能。只是在使用時需要顯示的獲取和釋放鎖。雖然它缺少了隱式獲取和釋放鎖的便捷性,但是卻擁有了鎖獲取和釋放的可操作性、可中斷的獲取鎖及超時獲取鎖等多種synchronized關鍵字不具備的同步特性。

      locks在這我們只介紹下核心的AQS(AbstractQueuedSynchronizer,隊列同步器),AQS是用來構建鎖或者其他同步組件的基礎框架,它使用一個用volatile修飾的int成員變量表示同步狀態。通過內置的FIFO隊列來完成資源獲取線程的排隊工作。同步器的主要使用方式是繼承,子類通過繼承同步器并實現它的抽象方法來管理同步狀態,在抽象方法的實現過程免不了要對同步狀態進行更改,這時候就會使用到AQS提供的3個方法:getState()、setState()和compareAndSetState()來進行操作,這是因為它們能夠保證狀態的改變是原子性的。為什么這么設計呢?因為鎖是面向使用者的,它定義了使用者與鎖交互的接口,隱藏了實現細節,而AQS面向的是鎖的實現者,它簡化了鎖的實現方式,屏蔽了同步狀態管理、線程的排隊、等待與喚醒等底層操作。鎖和AQS很好的隔離了使用者和實現者鎖關注的領域。

      現在我們就自定義一個獨占鎖來詳細解釋下AQS的實現機制

      public?class?Mutex?implements?Lock?{? private?static?class?Sync?extends?AbstractQueuedSynchronizer?{? private?static?final?long?serialVersionUID?=?-4387327721959839431L;? protected?boolean?isHeldExclusively()?{? return?getState()?==?1;? }? public?boolean?tryAcquire(int?acquires)?{? assert?acquires?==?1;? //?Otherwise?unused? if?(compareAndSetState(0,?1))?{? setExclusiveOwnerThread(Thread.currentThread());? return?true;? }? return?false;? }? protected?boolean?tryRelease(int?releases)?{? assert?releases?==?1;? //?Otherwise?unused? if?(getState()?==?0)?throw?new?IllegalMonitorStateException();? setExclusiveOwnerThread(null);? setState(0);? return?true;? }? Condition?newCondition()?{? return?new?ConditionObject();? }? }? private?final?Sync?sync?=?new?Sync();? public?void?lock()?{? sync.acquire(1);? }? public?boolean?tryLock()?{? return?sync.tryAcquire(1);? }? public?void?unlock()?{? sync.release(1);? }? public?Condition?newCondition()?{? return?sync.newCondition();? }? public?boolean?isLocked()?{? return?sync.isHeldExclusively();? }? public?boolean?hasQueuedThreads()?{? return?sync.hasQueuedThreads();? }? public?void?lockInterruptibly()?throws?InterruptedException?{? sync.acquireInterruptibly(1);? }? public?boolean?tryLock(long?timeout,?TimeUnit?unit)?throws?InterruptedException?{? return?sync.tryAcquireNanos(1,?unit.toNanos(timeout));? }? }

      實現自定義組件的時候,我們可以看到,AQS可重寫的方法是tryAcquire()——獨占式獲取同步狀態、tryRelease()——獨占式釋放同步狀態、tryAcquireShared()——共享式獲取同步狀態、tryReleaseShared ()——共享式釋放同步狀態、isHeldExclusively()——是否被當前線程所獨占。這個示例中,獨占鎖Mutex是一個自定義同步組件,它在同一時刻只允許一個線程占有鎖。Mutex中定義了一個靜態內部類,該內部類繼承了同步器并實現了獨占式獲取和釋放同步狀態。在tryAcquire()中,如果經過CAS設置成功(同步狀態設置為1),則表示獲取了同步狀態,而在tryRelease()中,只是將同步狀態重置為0。接著我們對比一下重入鎖(ReentrantLock)的源碼實現

      public?class?ReentrantLock?implements?Lock,?java.io.Serializable?{? private?static?final?long?serialVersionUID?=?7373984872572414699L;? /**?Synchronizer?providing?all?implementation?mechanics?*/? private?final?Sync?sync;? /**?*?Base?of?synchronization?control?for?this?lock.?Subclassed? *?into?fair?and?nonfair?versions?below.?Uses?AQS?state?to? *?represent?the?number?of?holds?on?the?lock.?*/? abstract?static?class?Sync?extends?AbstractQueuedSynchronizer?{? private?static?final?long?serialVersionUID?=?-5179523762034025860L;? /**?*?Performs?{@link?Lock#lock}.?The?main?reason?for?subclassing? *?is?to?allow?fast?path?for?nonfair?version.?*/? abstract?void?lock();? /**?*?Performs?non-fair?tryLock.?tryAcquire?is? *?implemented?in?subclasses,?but?both?need?nonfair? *?try?for?trylock?method.?*/? final?boolean?nonfairTryAcquire(int?acquires)?{? final?Thread?current?=?Thread.currentThread();? int?c?=?getState();? if?(c?==?0)?{? if?(compareAndSetState(0,?acquires))?{? setExclusiveOwnerThread(current);?return?true;? }? }?else?if?(current?==?getExclusiveOwnerThread())?{ int?nextc?=?c?+?acquires;? if?(nextc?

      重入鎖分公平鎖和不公平鎖,默認使用的是不公平鎖,在這我們看到實現重入鎖大體上跟我們剛才自定義的獨占鎖差不多,但是有什么區別呢?我們看看重入鎖nonfairTryAcquire()方法實現:首先獲取同步狀態(默認是0),如果是0的話,CAS設置同步狀態,非0的話則判斷當前線程是否已占有鎖,如果是的話,則偏向更新同步狀態。從這里我們不難推斷出重入鎖的概念,同一個線程可以多次獲得同一把鎖,在釋放的時候也必須釋放相同次數的鎖。通過對比相信大家對自定義一個鎖有了一個初步的概念,也許你存在疑問我們重寫的這幾個方法在AQS哪地方用呢?現在我們來繼續往下跟蹤,我們深入跟蹤下剛才自定義獨占鎖lock()方法里面acquire()的實現

      這個方法在AQS類里面,看到里面的tryAcquire(arg)大家也就明白了,tryAcquire(arg)方法獲取同步狀態,后面acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法就是說的節點構造、加入同步隊列及在同步隊列中自旋等待的AQS沒暴露給我們的相關操作。大體的流程就是首先調用自定義同步器實現的tryAcquire()方法,該方法保證線程安全的獲取同步狀態,如果獲取同步狀態失敗,則構造同步節點(獨占式Node.EXCLUSIVE,同一時刻只能有一個線程成功獲取同步狀態)并通過addWaiter()方法將該節點加入到同步隊列的尾部,最后調用acquireQueued()方法,使得該節點以“死循環”的方式獲取同步狀態。如果獲取不到則阻塞節點中的線程,而被阻塞線程的喚醒主要靠前驅節點的出隊或阻塞線程被中斷來實現。也許你還是不明白剛才所說的,那么我們繼續跟蹤下addWaiter()方法的實現

      上面的代碼通過使用compareAndSetTail()方法來確保節點能夠被線程安全添加。在enq()方法中,同步器通過“死循環”來確保節點的正確添加,在”死循環“中只有通過CAS將節點設置成為尾節點之后,當前線程才能夠從該方法返回,否則,當前線程不斷地嘗試重試設置。

      在節點進入同步隊列之后,發生了什么呢?現在我們繼續跟蹤下acquireQueued()方法

      從上面的代碼我們不難看出,節點進入同步隊列之后,就進入了一個自旋的過程,每個節點(或者說每個線程)都在自省的觀察,當條件滿足時(自己的前驅節點是頭節點就進行CAS設置同步狀態)就獲得同步狀態,然后就可以從自旋的過程中退出,否則依舊在這個自旋的過程中。

      collections

      從前面的思維導圖我們可以看到并發容器包括鏈表、隊列、HashMap等.它們都是線程安全的。

      ConcurrentHashMap : 一個高效的線程安全的HashMap。

      CopyOnWriteArrayList : 在讀多寫少的場景中,性能非常好,遠遠高于vector。

      ConcurrentLinkedQueue : 高效并發隊列,使用鏈表實現,可以看成線程安全的LinkedList。

      BlockingQueue : 一個接口,JDK內部通過鏈表,數組等方式實現了這個接口,表示阻塞隊列,非常適合用作數據共享 。

      ConcurrentSkipListMap : 跳表的實現,這是一個Map,使用跳表數據結構進行快速查找 。

      另外Collections工具類可以幫助我們將任意集合包裝成線程安全的集合。在這里重點說下ConcurrentHashMap和BlockingQueue這兩個并發容器。

      我們都知道HashMap線程不安全的,而我們可以通過Collections.synchronizedMap(new HashMap<>())來包裝一個線程安全的HashMap或者使用線程安全的HashTable,但是它們的效率都不是很好,這時候我們就有了ConcurrentHashMap。為什么ConcurrentHashMap高效且線程安全呢?其實它使用了鎖分段技術來提高了并發的訪問率。假如容器里有多把鎖,每一把鎖用于鎖容器的一部分數據,那么當多線程訪問容器里不同數據段的數據時,線程間就不會存在鎖競爭,從而可以有效地提高并發訪問效率,這就是鎖分段技術。首先將數據分成一段段的存儲,然后給每段數據配一把鎖,當一個線程占用鎖訪問其中一個段數據的時候,其他段的數據也能被其他線程訪問。而既然數據被分成了多個段,線程如何定位要訪問的段的數據呢?這里其實是通過散列算法來定位的。

      現在來談談阻塞隊列,阻塞隊列其實跟后面要談的線程池息息相關的,JDK7提供了7個阻塞隊列,分別是

      ArrayBlockingQueue :一個由數組結構組成的有界阻塞隊列。

      LinkedBlockingQueue :一個由鏈表結構組成的有界阻塞隊列。

      PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列。

      DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。

      SynchronousQueue:一個不存儲元素的阻塞隊列。

      LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。

      LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。

      如果隊列是空的,消費者會一直等待,當生產者添加元素時候,消費者是如何知道當前隊列有元素的呢?如果讓你來設計阻塞隊列你會如何設計,讓生產者和消費者能夠高效率的進行通訊呢?讓我們先來看看JDK是如何實現的。

      使用通知模式實現。所謂通知模式,就是當生產者往滿的隊列里添加元素時會阻塞住生產者,當消費者消費了一個隊列中的元素后,會通知生產者當前隊列可用。通過查看JDK源碼發現ArrayBlockingQueue使用了Condition來實現,代碼如下:

      當我們往隊列里插入一個元素時,如果隊列不可用,阻塞生產者主要通過LockSupport.park(this)來實現

      繼續進入源碼,發現調用setBlocker先保存下將要阻塞的線程,然后調用unsafe.park阻塞當前線程。

      unsafe.park是個native方法,代碼如下:

      park這個方法會阻塞當前線程,只有以下四種情況中的一種發生時,該方法才會返回。

      與park對應的unpark執行或已經執行時。注意:已經執行是指unpark先執行,然后再執行的park。

      線程被中斷時。

      如果參數中的time不是零,等待了指定的毫秒數時。

      發生異常現象時。這些異常事先無法確定。

      我們繼續看一下JVM是如何實現park方法的,park在不同的操作系統使用不同的方式實現,在linux下是使用的是系統方法pthread_cond_wait實現。實現代碼在JVM源碼路徑src/os/linux/vm/os_linux.cpp里的 os::PlatformEvent::park方法,代碼如下:

      pthread_cond_wait是一個多線程的條件變量函數,cond是condition的縮寫,字面意思可以理解為線程在等待一個條件發生,這個條件是一個全局變量。這個方法接收兩個參數,一個共享變量_cond,一個互斥量_mutex。而unpark方法在linux下是使用pthread_cond_signal實現的。park 在windows下則是使用WaitForSingleObject實現的。

      當隊列滿時,生產者往阻塞隊列里插入一個元素,生產者線程會進入WAITING (parking)狀態。

      executor

      Executor框架提供了各種類型的線程池,不同的線程池應用了前面介紹的不同的堵塞隊列

      Executor框架最核心的類是ThreadPoolExecutor,它是線程池的實現類。 對于核心的幾個線程池,無論是newFixedThreadPool()、newSingleThreadExecutor()還是newCacheThreadPool()方法,雖然看起來創建的線程具有完全不同的功能特點,但其內部均使用了ThreadPoolExecutor實現

      newFixedThreadPool()方法的實現,它返回一個corePoolSize和maximumPoolSize一樣的,并使用了LinkedBlockingQueue任務隊列(無界隊列)的線程池。當任務提交非常頻繁時,該隊列可能迅速膨脹,從而系統資源耗盡。

      newSingleThreadExecutor()返回單線程線程池,是newFixedThreadPool()方法的退化,只是簡單的將線程池數量設置為1。

      newCachedThreadPool()方法返回corePoolSize為0而maximumPoolSize無窮大的線程池,這意味著沒有任務的時候線程池內沒有現場,而當任務提交時,該線程池使用空閑線程執行任務,若無空閑則將任務加入SynchronousQueue隊列,而SynchronousQueue隊列是直接提交隊列,它總是破事線程池增加新的線程來執行任務。當任務執行完后由于corePoolSize為0,因此空閑線程在指定時間內(60s)被回收。對于newCachedThreadPool(),如果有大量任務提交,而任務又不那么快執行時,那么系統變回開啟等量的線程處理,這樣做法可能會很快耗盡系統的資源,因為它會增加無窮大數量的線程。

      由以上線程池的實現可以看到,它們都只是ThreadPoolExecutor類的封裝。我們看下ThreadPoolExecutor最重要的構造函數:

      從一個簡單的Java單例示例談談并發(下)

      ThreadPoolExecutor的任務調度邏輯如下

      從上圖我們可以看出,當提交一個新任務到線程池時,線程池的處理流程如下:

      首先線程池判斷基本線程池是否已滿,如果沒滿,創建一個工作線程來執行任務。滿了,則進入下個流程。

      其次線程池判斷工作隊列是否已滿,如果沒滿,則將新提交的任務存儲在工作隊列里。滿了,則進入下個流程。

      最后線程池判斷整個線程池是否已滿,如果沒滿,則創建一個新的工作線程來執行任務,滿了,則交給飽和策略來處理這個任務。

      下面我們來看看ThreadPoolExecutor核心調度代碼

      從上面的源碼我們可以知道execute的執行步驟:

      如果當前運行的線程少于corePoolSize,則創建新線程來執行任務(注意,執行這一步驟需要獲取全局鎖)。

      如果運行的線程等于或多于corePoolSize,則將任務加入到BlockingQueue。

      如果無法將任務假如BlockingQueue(隊列已滿),則創建新的線程來處理任務(注意,執行這一步驟需要獲取全局鎖)。

      如果創建新線程將使當前運行的線程超出maximumPoolSize,任務將被拒絕,并調用RejectedExecutionHandler.rejectedExecution()方法。

      ThreadPoolExecutor采取上述步驟的總體設計思路,是為了在執行execute()方法時,盡可能的避免獲取全局鎖(那將會是一個嚴重的 可伸縮瓶頸)。在ThreadPoolExecutor完成預熱之后(當前運行的線程數大于等于corePoolSize),幾乎所有的execute()方法調用都是執行步驟2,而步驟2不需要獲取全局鎖。

      參考閱讀

      本文部分內容參考自《Java并發編程的藝術》、《深入理解Java虛擬機(第2版)》、《實戰Java高并發程序設計》、《深入Java內存模型》、《Java并發編程實踐》,感興趣的可自行查閱。

      本文轉載自異步社區。

      軟件開發 Web應用防火墻 WAF

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:LabVIEW控制Arduino LED燈閃爍(基礎篇—2)
      下一篇:linux之strace命令跟蹤進程的系統調用
      相關文章
      亚洲精品在线观看视频| 最新亚洲人成网站在线观看| 亚洲日本在线观看视频| 国产AV无码专区亚洲AV麻豆丫| 亚洲av无码专区在线| 亚洲福利视频网址| 亚洲午夜电影在线观看| 亚洲一级视频在线观看| 亚洲一级视频在线观看| 国产亚洲精品VA片在线播放| 亚洲中文字幕无码爆乳app| 国产亚洲中文日本不卡二区| 亚洲色大网站WWW永久网站| 亚洲无人区码一二三码区别图片| 一本色道久久88亚洲精品综合| 成人区精品一区二区不卡亚洲| 亚洲午夜理论片在线观看| 亚洲国产一区二区三区在线观看| 亚洲第一街区偷拍街拍| 色偷偷噜噜噜亚洲男人| 亚洲电影日韩精品| 久久久久一级精品亚洲国产成人综合AV区 | 亚洲国产午夜中文字幕精品黄网站| 国产亚洲精品欧洲在线观看| 亚洲AⅤ无码一区二区三区在线| 亚洲AV网站在线观看| 精品亚洲一区二区三区在线播放| 337p日本欧洲亚洲大胆裸体艺术| 亚洲色婷婷六月亚洲婷婷6月 | 国产精品亚洲а∨无码播放| 国产AV无码专区亚洲AV毛网站 | 亚洲熟女乱色一区二区三区| 亚洲av无码一区二区三区天堂 | 亚洲国产精品无码久久青草| 中文字幕亚洲激情| 国产v亚洲v天堂无码网站| 亚洲一区精品中文字幕| 亚洲午夜电影在线观看| 亚洲AV无码一区二区三区电影 | 亚洲中文字幕第一页在线| 久久夜色精品国产亚洲AV动态图|