Java生產(chǎn)者消費(fèi)者問題的演變

      網(wǎng)友投稿 956 2022-05-29

      想要了解更多關(guān)于Java生產(chǎn)者消費(fèi)者問題的演變嗎?那就看看這篇文章吧,我們分別用舊方法和新方法來(lái)處理這個(gè)問題。

      生產(chǎn)者消費(fèi)者問題是一個(gè)典型的多進(jìn)程同步問題。

      對(duì)于大多數(shù)人來(lái)說(shuō),這個(gè)問題可能是我們?cè)趯W(xué)校,執(zhí)行第一次并行算法所遇到的第一個(gè)同步問題。

      雖然它很簡(jiǎn)單,但一直是并行計(jì)算中的最大挑戰(zhàn) - 多個(gè)進(jìn)程共享一個(gè)資源。

      問題陳述

      生產(chǎn)者和消費(fèi)者兩個(gè)程序,共享一個(gè)大小有限的公共緩沖區(qū)。

      假設(shè)一個(gè)生產(chǎn)者“生產(chǎn)”一份數(shù)據(jù)并將其存儲(chǔ)在緩沖區(qū)中,而一個(gè)消費(fèi)者“消費(fèi)”這份數(shù)據(jù),并將這份數(shù)據(jù)從緩沖區(qū)中刪除。

      再假設(shè)現(xiàn)在這兩個(gè)程序在并發(fā)地運(yùn)行,我們需要確保當(dāng)緩沖區(qū)的數(shù)據(jù)已滿時(shí),生產(chǎn)者不會(huì)放置新數(shù)據(jù)進(jìn)來(lái),也要確保當(dāng)緩沖區(qū)的數(shù)據(jù)為空時(shí),消費(fèi)者不會(huì)試圖刪除數(shù)據(jù)緩沖區(qū)的數(shù)據(jù)。

      解決方案

      為了解決上述的并發(fā)問題,生產(chǎn)者和消費(fèi)者將不得不相互通信。

      如果緩沖區(qū)已滿,生產(chǎn)者將處于睡眠狀態(tài),直到有通知信息喚醒。

      在消費(fèi)者將一些數(shù)據(jù)從緩沖區(qū)刪除后,消費(fèi)者將通知生產(chǎn)者,隨后生產(chǎn)者將重新開始填充數(shù)據(jù)到緩沖區(qū)中。

      如果緩沖區(qū)內(nèi)容為空的化,那么情況是一樣的,只不過,消費(fèi)者會(huì)先等待生產(chǎn)者的通知。

      但如果這種溝通做得不恰當(dāng),在進(jìn)程彼此等待的位置可能導(dǎo)致程序死鎖。

      經(jīng)典的方法

      首先來(lái)看一個(gè)典型的Java方案來(lái)解決這個(gè)問題。

      package?ProducerConsumer;import?java.util.LinkedList;import?java.util.Queue;public?class?ClassicProducerConsumerExample?{ ????public?static?void?main(String[]?args)?throws?InterruptedException?{ ????????Buffer?buffer?=?new?Buffer(2); ????????Thread?producerThread?=?new?Thread(new?Runnable()?{ ????????????@Override ????????????public?void?run()?{ ????????????????try?{ ????????????????????buffer.produce(); ????????????????}?catch?(InterruptedException?e)?{ ????????????????????e.printStackTrace(); ????????????????} ????????????} ????????}); ????????Thread?consumerThread?=?new?Thread(new?Runnable()?{ ????????????@Override ????????????public?void?run()?{ ????????????????try?{ ????????????????????buffer.consume(); ????????????????}?catch?(InterruptedException?e)?{ ????????????????????e.printStackTrace(); ????????????????} ????????????} ????????}); ????????producerThread.start(); ????????consumerThread.start(); ????????producerThread.join(); ????????consumerThread.join(); ????} ????static?class?Buffer?{ ????????private?Queue?list; ????????private?int?size; ????????public?Buffer(int?size)?{ ????????????this.list?=?new?LinkedList<>(); ????????????this.size?=?size; ????????} ????????public?void?produce()?throws?InterruptedException?{ ????????????int?value?=?0; ????????????while?(true)?{ ????????????????synchronized?(this)?{ ????????????????????while?(list.size()?>=?size)?{ ????????????????????????//?wait?for?the?consumer ????????????????????????wait(); ????????????????????} ????????????????????list.add(value); ????????????????????System.out.println("Produced?"?+?value); ????????????????????value++; ????????????????????//?notify?the?consumer ????????????????????notify(); ????????????????????Thread.sleep(1000); ????????????????} ????????????} ????????} ????????public?void?consume()?throws?InterruptedException?{ ????????????while?(true)?{ ????????????????synchronized?(this)?{ ????????????????????while?(list.size()?==?0)?{ ????????????????????????//?wait?for?the?producer ????????????????????????wait(); ????????????????????} ????????????????????int?value?=?list.poll(); ????????????????????System.out.println("Consume?"?+?value); ????????????????????//?notify?the?producer ????????????????????notify(); ????????????????????Thread.sleep(1000); ????????????????} ????????????} ????????} ????}}

      這里我們有生產(chǎn)者和消費(fèi)者兩個(gè)線程,它們共享一個(gè)公共緩沖區(qū)。生產(chǎn)者線程開始產(chǎn)生新的元素并將它們存儲(chǔ)在緩沖區(qū)。如果緩沖區(qū)已滿,那么生產(chǎn)者線程進(jìn)入睡眠狀態(tài),直到有通知喚醒。否則,生產(chǎn)者線程將會(huì)在緩沖區(qū)創(chuàng)建一個(gè)新元素然后通知消費(fèi)者。就像我之前說(shuō)的,這個(gè)過程也適用于消費(fèi)者。如果緩沖區(qū)為空,那么消費(fèi)者將等待生產(chǎn)者的通知。否則,消費(fèi)者將從緩沖區(qū)刪除一個(gè)元素并通知生產(chǎn)者。

      正如你所看到的,在之前的例子中,生產(chǎn)者和消費(fèi)者的工作都是管理緩沖區(qū)的對(duì)象。這些線程僅僅調(diào)用了buffer.produce()和buffer.consume()兩個(gè)方法就搞定了一切。

      對(duì)于緩沖區(qū)是否應(yīng)該負(fù)責(zé)創(chuàng)建或者刪除元素,一直都是一個(gè)有爭(zhēng)議的話題,但在我看來(lái),緩沖區(qū)不應(yīng)該做這種事情。當(dāng)然,這取決于你想要達(dá)到的目的,但在這種情況下,緩沖區(qū)應(yīng)該只是負(fù)責(zé)以線程安全的形式存儲(chǔ)合并元素,而不是生產(chǎn)新的元素。

      所以,讓我們把生產(chǎn)和消費(fèi)的邏輯從緩沖對(duì)象中進(jìn)行解耦。

      package?ProducerConsumer;import?java.util.LinkedList;import?java.util.Queue;public?class?ProducerConsumerExample2?{ ????public?static?void?main(String[]?args)?throws?InterruptedException?{ ????????Buffer?buffer?=?new?Buffer(2); ????????Thread?producerThread?=?new?Thread(()?->?{ ????????????try?{ ????????????????int?value?=?0; ????????????????while?(true)?{ ????????????????????buffer.add(value); ????????????????????System.out.println("Produced?"?+?value); ????????????????????value?++; ????????????????????Thread.sleep(1000); ????????????????} ????????????}?catch?(InterruptedException?e)?{ ????????????????e.printStackTrace(); ????????????} ????????}); ????????Thread?consumerThread?=?new?Thread(()?->?{ ????????????try?{ ????????????????while?(true)?{ ????????????????????int?value?=?buffer.poll(); ????????????????????System.out.println("Consume?"?+?value); ????????????????????Thread.sleep(1000); ????????????????} ????????????}?catch?(InterruptedException?e)?{ ????????????????e.printStackTrace(); ????????????} ????????}); ????????producerThread.start(); ????????consumerThread.start(); ????????producerThread.join(); ????????consumerThread.join(); ????} ????static?class?Buffer?{ ????????private?Queue?list; ????????private?int?size; ????????public?Buffer(int?size)?{ ????????????this.list?=?new?LinkedList<>(); ????????????this.size?=?size; ????????} ????????public?void?add(int?value)?throws?InterruptedException?{ ????????????synchronized?(this)?{ ????????????????while?(list.size()?>=?size)?{ ????????????????????wait(); ????????????????} ????????????????list.add(value); ????????????????notify(); ????????????} ????????} ????????public?int?poll()?throws?InterruptedException?{ ????????????synchronized?(this)?{ ????????????????while?(list.size()?==?0)?{ ????????????????????wait(); ????????????????} ????????????????int?value?=?list.poll(); ????????????????notify(); ????????????????return?value; ????????????} ????????} ????}}

      這樣好多了,至少現(xiàn)在緩沖區(qū)僅僅負(fù)責(zé)以線程安全的形式來(lái)存儲(chǔ)和刪除元素。

      隊(duì)列阻塞(BlockingQueue)

      不過,我們還可以進(jìn)一步改善。

      在前面的例子中,我們已經(jīng)創(chuàng)建了一個(gè)緩沖區(qū),每當(dāng)存儲(chǔ)一個(gè)元素之前,緩沖區(qū)將等待是否有可用的一個(gè)槽以防止沒有足夠的存儲(chǔ)空間,并且,在合并之前,緩沖區(qū)也會(huì)等待一個(gè)新的元素出現(xiàn),以確保存儲(chǔ)和刪除的操作是線程安全的。

      但是,Java本身的庫(kù)已經(jīng)整合了這些操作。它被稱之為BlockingQueue,在這里可以查看它的詳細(xì)文檔。

      BlockingQueue是一個(gè)以線程安全的形式存入和取出實(shí)例的隊(duì)列。而這就是我們所需要的。

      所以,如果我們?cè)谑纠惺褂肂lockingQueue,我們就不需要再去實(shí)現(xiàn)等待和通知的機(jī)制。

      接下來(lái),我們來(lái)看看具體的代碼。

      package?ProducerConsumer;import?java.util.concurrent.BlockingQueue;import?java.util.concurrent.LinkedBlockingDeque;public?class?ProducerConsumerWithBlockingQueue?{ ????public?static?void?main(String[]?args)?throws?InterruptedException?{ ????????BlockingQueue?blockingQueue?=?new?LinkedBlockingDeque<>(2); ????????Thread?producerThread?=?new?Thread(()?->?{ ????????????try?{ ????????????????int?value?=?0; ????????????????while?(true)?{ ????????????????????blockingQueue.put(value); ????????????????????System.out.println("Produced?"?+?value); ????????????????????value++; ????????????????????Thread.sleep(1000); ????????????????} ????????????}?catch?(InterruptedException?e)?{ ????????????????e.printStackTrace(); ????????????} ????????}); ????????Thread?consumerThread?=?new?Thread(()?->?{ ????????????try?{ ????????????????while?(true)?{ ????????????????????int?value?=?blockingQueue.take(); ????????????????????System.out.println("Consume?"?+?value); ????????????????????Thread.sleep(1000); ????????????????} ????????????}?catch?(InterruptedException?e)?{ ????????????????e.printStackTrace(); ????????????} ????????}); ????????producerThread.start(); ????????consumerThread.start(); ????????producerThread.join(); ????????consumerThread.join(); ????}}

      雖然runnables看起來(lái)跟之前一樣,他們按照之前的方式生產(chǎn)和消費(fèi)元素。

      唯一的區(qū)別在于,這里我們使用blockingQueue代替緩沖區(qū)對(duì)象。

      關(guān)于Blocking Queue的更多細(xì)節(jié)

      這兒有很多種類型的BlockingQueue:

      ×××隊(duì)列

      有界隊(duì)列

      一個(gè)×××隊(duì)列幾乎可以無(wú)限地增加元素,任何添加操作將不會(huì)被阻止。

      你可以以這種方式去創(chuàng)建一個(gè)×××隊(duì)列:

      BlockingQueue?blockingQueue?=?new?LinkedBlockingDeque<>();

      在這種情況下,由于添加操作不會(huì)被阻塞,生產(chǎn)者添加新元素時(shí)可以不用等待。每次當(dāng)生產(chǎn)者想要添加一個(gè)新元素時(shí),會(huì)有一個(gè)隊(duì)列先存儲(chǔ)它。但是,這里面也存在一個(gè)異常需要捕獲。如果消費(fèi)者刪除元素的速度比生產(chǎn)者添加新的元素要慢,那么內(nèi)存將被填滿,我們將可能得到一個(gè)OutOfMemory異常。

      與之相反的則是有界隊(duì)列,存在一個(gè)固定大小。你可以這樣去創(chuàng)建它:

      BlockingQueue?blockingQueue?=?new?LinkedBlockingDeque<>(10);

      兩者最主要的區(qū)別在于,使用有界隊(duì)列的情況下,如果隊(duì)列內(nèi)存已滿,而生產(chǎn)者仍然試圖往里面塞元素,那么隊(duì)列將會(huì)被阻塞(具體阻塞方式取決于添加元素的方法)直到有足夠的空間騰出來(lái)。

      往blocking queue里面添加元素一共有以下四種方式:

      add() - 如果插入成功返回true,否則拋出IllegalStateException

      put() - 往隊(duì)列中插入元素,并在有必要的情況下等待一個(gè)可用的槽(slot)

      offer() - 如果插入元素成功返回true,否則返回false

      offer(E e, long timeout, TimeUnit unit) – 在隊(duì)列沒有滿的情況下,或者為了一個(gè)可用的slot而等待指定的時(shí)間后,往隊(duì)列中插入一個(gè)元素。

      所以,如果你使用put()方法插入元素,而隊(duì)列內(nèi)存已滿的情況下,我們的生產(chǎn)者就必須等待,直到有可用的slot出現(xiàn)。

      以上就是我們上一個(gè)案例的全部,這跟ProducerConsumerExample2的工作原理是一樣的。

      使用線程池

      還有什么地方我們可以優(yōu)化的?那首先來(lái)分析一下我們干了什么,我們實(shí)例化了兩個(gè)線程,一個(gè)被叫做生產(chǎn)者,專門往隊(duì)列里面塞元素,另一個(gè)被叫做消費(fèi)者,負(fù)責(zé)從隊(duì)列里面刪元素。

      然而,好的軟件技術(shù)表明,手動(dòng)地去創(chuàng)建和銷毀線程是不好的做法。首先創(chuàng)建線程是一項(xiàng)昂貴的任務(wù),每創(chuàng)建一個(gè)線程,意味著要經(jīng)歷一遍下面的步驟:

      首先要分配內(nèi)存給一個(gè)線程堆棧

      操作系統(tǒng)要?jiǎng)?chuàng)建一個(gè)原生線程對(duì)應(yīng)于Java的線程

      跟這個(gè)線程相關(guān)的描述符被添加到JVM內(nèi)部的數(shù)據(jù)結(jié)構(gòu)中

      首先別誤會(huì)我,我們的案例中用了幾個(gè)線程是沒有問題的,而那也是并發(fā)工作的方式之一。這里的問題是,我們是手動(dòng)地去創(chuàng)建線程,這可以說(shuō)是一次糟糕的實(shí)踐。如果我們手動(dòng)地創(chuàng)建線程,除了創(chuàng)建過程中的消耗外,還有另一個(gè)問題,就是我們無(wú)法控制同時(shí)有多少個(gè)線程在運(yùn)行。舉個(gè)例子,如果同時(shí)有一百萬(wàn)次請(qǐng)求線上服務(wù),那么每一次請(qǐng)求都會(huì)相應(yīng)的創(chuàng)建一個(gè)線程,那么同時(shí)會(huì)有一百萬(wàn)個(gè)線程在后臺(tái)運(yùn)行,這將會(huì)導(dǎo)致[thread starvation](https://en.wikipedia.org/wiki/Starvation_(computer_science))

      所以,我們需要一種全局管理線程的方式,這就用到了線程池。

      線程池將基于我們選擇的策略來(lái)處理線程的生命周期。它擁有有限數(shù)量的空閑線程,并在需要解決任務(wù)時(shí)啟用它們。通過這種方式,我們不需要為每一個(gè)新的請(qǐng)求創(chuàng)建一個(gè)新線程,因此,我們可以避免出現(xiàn)線程饑餓的問題。

      Java線程池的實(shí)現(xiàn)包括:

      一個(gè)任務(wù)隊(duì)列

      一個(gè)工作線程的集合

      一個(gè)線程工廠

      管理線程池狀態(tài)的元數(shù)據(jù)

      為了同時(shí)運(yùn)行一些任務(wù),你必須把他們先放到任務(wù)隊(duì)列里。然后,當(dāng)一個(gè)線程可用的時(shí)候,它將接收一個(gè)任務(wù)并運(yùn)行它。可用的線程越多,并行執(zhí)行的任務(wù)就越多。

      除了管理線程生命周期,使用線程池還有另一個(gè)好處,當(dāng)你計(jì)劃如何分割任務(wù),以便同時(shí)執(zhí)行時(shí),你能想到更多種方式。并行性的單位不再是線程了,而是任務(wù)。你設(shè)計(jì)一些任務(wù)來(lái)并發(fā)執(zhí)行,而不是讓一些線程通過共享公共的內(nèi)存塊來(lái)并發(fā)運(yùn)行。按照功能需求來(lái)思考的方式可以幫助我們避免一些常見的多線程問題,如死鎖或數(shù)據(jù)競(jìng)爭(zhēng)等。沒有什么可以阻止我們?cè)俅紊钊脒@些問題,但是,由于使用了功能范式,我們沒辦法命令式地同步并行計(jì)算(鎖)。這比直接使用線程和共享內(nèi)存所能碰到的幾率要少的多。在我們的例子中,共享一個(gè)阻塞隊(duì)列不是想要的情況,但我就是想強(qiáng)調(diào)這個(gè)優(yōu)勢(shì)。

      在這里和這里你可以找到更多有關(guān)線程池的內(nèi)容。

      說(shuō)了那么多,接下來(lái)我們看看在案例中如何使用線程池。

      Java中生產(chǎn)者與消費(fèi)者問題的演變

      package?ProducerConsumer;import?java.util.concurrent.BlockingQueue;import?java.util.concurrent.ExecutorService;import?java.util.concurrent.Executors;import?java.util.concurrent.LinkedBlockingDeque;public?class?ProducerConsumerExecutorService?{ ????public?static?void?main(String[]?args)?{ ????????BlockingQueue?blockingQueue?=?new?LinkedBlockingDeque<>(2); ????????ExecutorService?executor?=?Executors.newFixedThreadPool(2); ????????Runnable?producerTask?=?()?->?{ ????????????try?{ ????????????????int?value?=?0; ????????????????while?(true)?{ ????????????????????blockingQueue.put(value); ????????????????????System.out.println("Produced?"?+?value); ????????????????????value++; ????????????????????Thread.sleep(1000); ????????????????} ????????????}?catch?(InterruptedException?e)?{ ????????????????e.printStackTrace(); ????????????} ????????}; ????????Runnable?consumerTask?=?()?->?{ ????????????try?{ ????????????????while?(true)?{ ????????????????????int?value?=?blockingQueue.take(); ????????????????????System.out.println("Consume?"?+?value); ????????????????????Thread.sleep(1000); ????????????????} ????????????}?catch?(InterruptedException?e)?{ ????????????????e.printStackTrace(); ????????????} ????????}; ????????executor.execute(producerTask); ????????executor.execute(consumerTask); ????????executor.shutdown(); ????}}

      這里的區(qū)別在于,我們不在手動(dòng)創(chuàng)建或運(yùn)行消費(fèi)者和生產(chǎn)者線程。我們建立一個(gè)線程池,它將收到兩個(gè)任務(wù),生產(chǎn)者和消費(fèi)者的任務(wù)。生產(chǎn)者和消費(fèi)者的任務(wù),實(shí)際上跟之前例子里面使用的runnable是相同的。現(xiàn)在,執(zhí)行程序(線程池實(shí)現(xiàn))將接收任務(wù),并安排它的工作線程去執(zhí)行他們。

      在我們簡(jiǎn)單的案例下,一切都跟之前一樣運(yùn)行。就像之前的例子,我們?nèi)匀挥袃蓚€(gè)線程,他們?nèi)匀灰酝瑯拥姆绞缴a(chǎn)和消費(fèi)元素。雖然我們并沒有讓性能得到提升,但是代碼看起來(lái)干凈多了。我們不再手動(dòng)創(chuàng)建線程,而只是具體說(shuō)明我們想要什么:我們想要并發(fā)執(zhí)行某些任務(wù)。

      所以,當(dāng)你使用一個(gè)線程池時(shí)。你不需要考慮線程是并發(fā)執(zhí)行的單位,相反的,你把一些任務(wù)看作并發(fā)執(zhí)行的就好。以上就是你需要知道的,剩下的由執(zhí)行程序去處理。執(zhí)行程序會(huì)收到一些任務(wù),然后,它會(huì)分配工作線程去處理它們。

      總結(jié)

      首先,我們看到了一個(gè)“傳統(tǒng)”的消費(fèi)者-生產(chǎn)者問題的解決方案。我們盡量避免了重復(fù)造沒有必要的車輪,恰恰相反,我們重用了已經(jīng)測(cè)試過的解決方案,因此,我們不是寫一個(gè)通知等待系統(tǒng),而是嘗試使用Java已經(jīng)提供的blocking queue,因?yàn)镴ava為我們提供了一個(gè)非常有效的線程池來(lái)管理線程生命周期,讓我們可以擺脫手動(dòng)創(chuàng)建線程。通過這些改進(jìn),消費(fèi)者-生產(chǎn)者問題的解決方案看起來(lái)更可靠和更好理解。

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。

      上一篇:一個(gè)網(wǎng)工的十年奮斗史 - 工作篇
      下一篇:【精選單品】算力共享——以更經(jīng)濟(jì)的方式實(shí)現(xiàn)全通道目標(biāo)識(shí)別,抓拍攝像機(jī)秒變識(shí)別攝像機(jī)
      相關(guān)文章
      国产专区一va亚洲v天堂| 亚洲av无码片区一区二区三区| 国产婷婷综合丁香亚洲欧洲| 亚洲va中文字幕无码久久| 国内精品久久久久久久亚洲| 亚洲Av无码乱码在线播放| 亚洲av色香蕉一区二区三区| 亚洲色成人WWW永久在线观看| 精品丝袜国产自在线拍亚洲| 波多野结衣亚洲一级| 亚洲白色白色在线播放| 亚洲欧洲国产综合| 亚洲婷婷天堂在线综合| 亚洲国产亚洲片在线观看播放| 亚洲精品国产手机| 亚洲精品国产啊女成拍色拍| 亚洲六月丁香六月婷婷色伊人| 亚洲国产精品成人综合色在线婷婷 | 亚洲毛片αv无线播放一区| 久久亚洲国产精品五月天婷| 亚洲午夜福利717| 久久亚洲国产午夜精品理论片| 亚洲av综合色区| 亚洲视频中文字幕| 亚洲国产成人在线视频 | 亚洲av无码成人精品区| 亚洲情a成黄在线观看| 国产成人麻豆亚洲综合无码精品| 在线日韩日本国产亚洲| 日韩亚洲人成在线综合日本| 久久综合图区亚洲综合图区| 久久水蜜桃亚洲av无码精品麻豆| 亚洲成av人片不卡无码| 久久亚洲中文字幕精品有坂深雪| 亚洲美女激情视频| 中文字幕乱码亚洲精品一区| 蜜臀亚洲AV无码精品国产午夜.| 午夜亚洲国产成人不卡在线| 中文字幕亚洲专区| 亚洲国产精品VA在线观看麻豆| 久久亚洲国产成人精品性色 |