Linux-生產(chǎn)者消費(fèi)者模型

      網(wǎng)友投稿 961 2022-05-30

      @TOC

      生產(chǎn)者消費(fèi)者模型概念

      生產(chǎn)者消費(fèi)者模型又稱有限緩沖問題,是一個(gè)多進(jìn)程同步問題的經(jīng)典案例。該問題描述了共享固定大小緩沖區(qū)的兩個(gè)進(jìn)程——即所謂的“生產(chǎn)者”和“消費(fèi)者”——在實(shí)際運(yùn)行時(shí)會發(fā)生的問題。生產(chǎn)者的主要作用是生成一定量的數(shù)據(jù)放到緩沖區(qū)中,然后重復(fù)此過程。與此同時(shí),消費(fèi)者也在緩沖區(qū)消耗這些數(shù)據(jù)。

      如下圖:

      生產(chǎn)者消費(fèi)者模型特點(diǎn)

      生產(chǎn)者消費(fèi)者是多線程同步于互斥的經(jīng)典場景,其特點(diǎn)如下:

      3種關(guān)系:生產(chǎn)者VS生產(chǎn)者(互斥關(guān)系)、生產(chǎn)者VS消費(fèi)者(同步關(guān)系)、消費(fèi)者VS消費(fèi)者(互斥關(guān)系)

      2種角色:生產(chǎn)者和消費(fèi)者

      1個(gè)交易場所:通常指內(nèi)存中的一段緩沖區(qū)

      我們用代碼實(shí)現(xiàn)的話就要維護(hù)上面的3種關(guān)系即可。

      生產(chǎn)者和生產(chǎn)者、消費(fèi)者和消費(fèi)者為什么要互斥?

      當(dāng)1個(gè)生產(chǎn)者開始往緩沖區(qū)里寫入數(shù)據(jù)時(shí)當(dāng)然不希望有其他的生產(chǎn)者進(jìn)入緩沖區(qū),所以要在緩沖區(qū)用互斥鎖保護(hù)起來。同樣,1個(gè)消費(fèi)者讀取數(shù)據(jù)也要避免其他的消費(fèi)者干擾,所以在1個(gè)消費(fèi)者讀取緩沖區(qū)數(shù)據(jù)時(shí)用互斥鎖保護(hù)起來。

      生產(chǎn)者和消費(fèi)者同步關(guān)系?

      當(dāng)生產(chǎn)者把緩沖區(qū)數(shù)據(jù)加滿后,再生產(chǎn)數(shù)據(jù)就會失敗。消費(fèi)者把緩沖區(qū)數(shù)據(jù)消費(fèi)完再消費(fèi)也會失敗。

      所以讓生產(chǎn)者在緩沖區(qū)滿時(shí)休眠,等到下次消費(fèi)者消耗緩沖區(qū)中的數(shù)據(jù)的時(shí)候,生產(chǎn)者才能被喚醒,開始往緩沖區(qū)添加數(shù)據(jù)。同樣,也可以讓消費(fèi)者在緩沖區(qū)空時(shí)進(jìn)入休眠,等到生產(chǎn)者往緩沖區(qū)添加數(shù)據(jù)之后,再喚醒消費(fèi)者。

      生產(chǎn)者消費(fèi)者模型優(yōu)點(diǎn)

      支持并發(fā)

      由于生產(chǎn)者與消費(fèi)者是兩個(gè)獨(dú)立的并發(fā)體,他們之間是用緩沖區(qū)作為橋梁連接,生產(chǎn)者只需要往緩沖區(qū)里丟數(shù)據(jù),就可以繼續(xù)生產(chǎn)下一個(gè)數(shù)據(jù),而消費(fèi)者只需要從緩沖區(qū)了拿數(shù)據(jù)即可,這樣就不會因?yàn)楸舜说奶幚硭俣榷l(fā)生阻塞。

      解耦

      支持忙時(shí)不均

      如果制造數(shù)據(jù)的速度時(shí)快時(shí)慢,緩沖區(qū)的好處就體現(xiàn)出來了。當(dāng)數(shù)據(jù)制造快的時(shí)候,消費(fèi)者來不及處理,未處理的數(shù)據(jù)可以暫時(shí)存在緩沖區(qū)中。 等生產(chǎn)者的制造速度慢下來,消費(fèi)者再慢慢處理掉。

      基于BlockingQueue的生產(chǎn)者消費(fèi)者模型

      基于阻塞隊(duì)列的生產(chǎn)者消費(fèi)者模型

      在多線程編程中阻塞隊(duì)列(Blocking Queue)是一種常用于實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者模型的數(shù)據(jù)結(jié)構(gòu)。

      和普通隊(duì)列的區(qū)別:

      當(dāng)隊(duì)列為空時(shí),從隊(duì)列獲取元素的操作將會被阻塞,直到隊(duì)列中被放入了元素

      當(dāng)隊(duì)列滿時(shí),往隊(duì)列里存放元素的操作也會被阻塞,直到有元素被從隊(duì)列中取出(以上的操作都是基于不同的線程來說的,線程在對阻塞隊(duì)列進(jìn)程操作時(shí)會被阻塞)

      模擬實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型

      我們先模擬1個(gè)單生產(chǎn)者和單消費(fèi)者

      這個(gè)BlockQueue我們就用C++STL中的Queue即可。

      整體的邏輯:

      現(xiàn)在實(shí)現(xiàn)單生產(chǎn)者和單消費(fèi)者,那我們維護(hù)生產(chǎn)者和消費(fèi)者的同步關(guān)系即可

      設(shè)置BlockQueue的存儲數(shù)據(jù)上限,不能讓生產(chǎn)者一直生產(chǎn)下去,具體的實(shí)現(xiàn)根據(jù)業(yè)務(wù)的需求

      BlockQueue是臨界資源,所以需要1把互斥鎖進(jìn)行保護(hù)

      當(dāng)生產(chǎn)者向阻塞隊(duì)列中放入數(shù)據(jù)時(shí),若此時(shí)阻塞隊(duì)列滿了,那生產(chǎn)者此時(shí)要先通知消費(fèi)者開始消費(fèi)數(shù)據(jù),自己掛起

      當(dāng)消費(fèi)者取阻塞隊(duì)列中的數(shù)據(jù)時(shí),若此時(shí)阻塞隊(duì)列為空了,那消費(fèi)者先通知生產(chǎn)者開始生產(chǎn)數(shù)據(jù),自己掛起

      因此我們還需要2個(gè)條件變量,1個(gè)當(dāng)阻塞隊(duì)列滿時(shí),生產(chǎn)者需要在此條件變量下等待。1個(gè)條件變量當(dāng)阻塞隊(duì)列為空時(shí),消費(fèi)者在此條件變量下等待

      無論是生產(chǎn)者還是消費(fèi)者都要進(jìn)入臨界區(qū)判斷條件是否滿足,若條件不滿足就要被掛起,此時(shí)它們中的1個(gè)是拿著鎖在等待的,為了避免死鎖的問題,在調(diào)用pthread_cond_wait時(shí)要傳入互斥鎖,當(dāng)被掛起時(shí)釋放手中的鎖資源

      主函數(shù)創(chuàng)建2個(gè)線程,1個(gè)線程不斷生產(chǎn)數(shù)據(jù),另一個(gè)線程不斷的讀取數(shù)據(jù)即可

      代碼如下:

      BlockQueue:

      1 #include 2 #include 3 #include 4 using namespace std; 5 6 7 class BlockQueue 8 { 9 private: 10 pthread_mutex_t mutex; 11 pthread_cond_t c_cond; // 消費(fèi)者在次條件變量下等 12 pthread_cond_t p_cond; // 生產(chǎn)者在此條件變量下等 13 size_t cap; 14 queue q; 15 public: 16 BlockQueue(int _cap = 5):cap(_cap) 17 {} 18 void get_init() 19 { 20 pthread_mutex_init(&mutex,nullptr); 21 pthread_cond_init(&c_cond,nullptr); 22 pthread_cond_init(&p_cond,nullptr); 23 } 24 void LockQueue() 25 { 26 pthread_mutex_lock(&mutex); 27 } 28 void UnLockQueue() 29 { 30 pthread_mutex_unlock(&mutex); 31 } 32 void ProductWait() 33 { 34 pthread_cond_wait(&p_cond,&mutex); 35 } 36 void ConsumerWait() 36 void ConsumerWait() 37 { 38 pthread_cond_wait(&c_cond,&mutex); 39 } 40 void WakeUpConsumer() 41 { 42 pthread_cond_signal(&c_cond); 43 } 44 void WakeUpProduct() 45 { 46 pthread_cond_signal(&p_cond); 47 } 48 bool IsFull() 49 { 50 return cap == q.size(); 51 } 52 bool IsEmpty() 53 { 54 return q.empty(); 55 } 56 void Push(int &in) 57 { 58 LockQueue(); 59 while(IsFull()) 60 { 61 //隊(duì)列滿了,先喚醒消費(fèi)者,生產(chǎn)者掛起 62 WakeUpConsumer(); 63 ProductWait(); 64 } 65 q.push(in); 66 UnLockQueue(); 67 } 68 void Pop(int &out) 69 { 70 LockQueue(); 71 while(IsEmpty()) 72 { 73 //隊(duì)列為空,先喚醒生產(chǎn)者,消費(fèi)者掛起 74 WakeUpProduct(); 75 ConsumerWait(); 76 } 77 out = q.front(); 78 q.pop(); 79 UnLockQueue(); 80 } 81 82 ~BlockQueue() 83 { 84 pthread_mutex_destroy(&mutex); 85 pthread_cond_destroy(&c_cond); 86 pthread_cond_destroy(&p_cond); 87 } 88 };

      mian:

      1 #include"BlockQueue.hpp" 2 3 4 void* Consumer(void* arg) 5 { 6 BlockQueue* bq = (BlockQueue*)arg; 7 while(true) 8 { 9 int data; 10 bq->Pop(data); 11 cout<<"Consumer data done:"<Push(data); 21 cout<<"Product data done:"<get_init(); 28 pthread_t c,p; 29 pthread_create(&c,nullptr,Consumer,(void*)bq); 30 pthread_create(&c,nullptr,Product,(void*)bq); 31 32 33 pthread_join(c,nullptr); 34 pthread_join(p,nullptr); 35 delete bq; 36 return 0; }

      先讓生產(chǎn)者慢點(diǎn)生產(chǎn)。因?yàn)槲沂亲岅?duì)列滿了,生產(chǎn)者喚醒消費(fèi)者消費(fèi),生產(chǎn)者等待。這種情況應(yīng)該是生產(chǎn)者先執(zhí)行,生產(chǎn)者滿了通知消費(fèi)者,消費(fèi)者是一瞬間就消費(fèi)完了。

      跟我們預(yù)期的結(jié)果一樣,生產(chǎn)者生產(chǎn)滿了后消費(fèi)者一瞬間消費(fèi)完。

      讓消費(fèi)者慢,生產(chǎn)者快,一瞬間隊(duì)列里數(shù)據(jù)滿了,生產(chǎn)者通知消費(fèi)者消費(fèi)數(shù)據(jù),生產(chǎn)者等待

      生產(chǎn)者一下就生產(chǎn)完了,消費(fèi)者是慢慢的在消費(fèi)。

      計(jì)算任務(wù)的生產(chǎn)者和消費(fèi)者模型

      上面打印數(shù)字是為了好理解現(xiàn)象,我們再封裝1個(gè)Task的任務(wù)類,該類中包含1個(gè)run函數(shù),該函數(shù)處理計(jì)算的任務(wù)。

      6 class Task 7 { 8 public: 9 int x; 10 int y; 11 public: 12 Task(int _x=0,int _y=0) 13 :x(_x),y(_y) 14 {} 15 int run() 16 { 17 return x + y; 18 } 19 20 };

      生產(chǎn)者放入阻塞隊(duì)列里的就是Task對象,消費(fèi)者拿出隊(duì)列中的任務(wù),調(diào)用tun函數(shù)處理任務(wù)

      Linux-生產(chǎn)者消費(fèi)者模型

      5 void* Consumer(void* arg) 6 { 7 BlockQueue* bq = (BlockQueue*)arg; 8 // pthread_mutex_lock(&c_mutex); 9 while(true) 10 { 11 sleep(1); 12 Task t; 13 bq->Pop(t); 14 cout<<" Consumer :"<

      這次可以讓阻塞隊(duì)列中的任務(wù)大于隊(duì)列容量的一半就喚醒消費(fèi)者自己掛起等待,讓消費(fèi)者消費(fèi)到隊(duì)列容量的一半喚醒生產(chǎn)者自己掛起等待。并讓生產(chǎn)者快,消費(fèi)者慢

      74 void Push(Task &t) 75 { 76 LockQueue(); 77 while(IsFull()) 78 { 79 //隊(duì)列滿了,生產(chǎn)者掛起,通知消費(fèi)者 80 //WakeUpConsumer(); 81 ProductWait(); 82 } 83 q.push(t); 84 if(q.size() >= _cap / 2) 85 { 86 87 WakeUpConsumer(); 88 89 } 90 UnLockQueue(); 91 } 92 void Pop(Task &t) 93 { 94 LockQueue(); 95 while(IsEmpty()) 96 { 97 //隊(duì)列為空,消費(fèi)者掛起,通知生產(chǎn)者 98 //WakeUpProduct(); 99 ConsumerWait(); 100 } 101 t = q.front(); 102 q.pop(); 103 if(q.size() <= _cap / 2) 104 { 105 106 WakeUpProduct(); 107 } 108 UnLockQueue(); 109 }

      我們可以看到生產(chǎn)者瞬間生產(chǎn)滿了,消費(fèi)者開始消費(fèi)了3個(gè)后,生產(chǎn)者又開始生產(chǎn)了。

      多生產(chǎn)者和多消費(fèi)者

      多生產(chǎn)者多消費(fèi)者在單生產(chǎn)者單消費(fèi)者的基礎(chǔ)上再加2把互斥鎖,讓生產(chǎn)者自己在組內(nèi)競爭,競爭贏的先去生產(chǎn),消費(fèi)者也是同樣的道理,先讓消費(fèi)者自己組內(nèi)競爭,贏的先去處理任務(wù)。BlockQueue和單生產(chǎn)者的一樣,我是直接定義了全局的2把鎖。當(dāng)然也可以封裝在BlockQueue。

      1 #include"BlockQueue.hpp" 2 pthread_mutex_t c_mutex; 3 pthread_mutex_t p_mutex; 4 5 void* Consumer(void* arg) 6 { 7 BlockQueue* bq = (BlockQueue*)arg; 8 //pthread_mutex_lock(&c_mutex); 9 while(true) 10 { 11 pthread_mutex_lock(&c_mutex); 12 Task t; 13 bq->Pop(t); 14 cout<<" Consumer [:"<Push(t); 31 cout<<" Product [:"<

      讓生產(chǎn)者和消費(fèi)者一起工作:

      有1個(gè)消費(fèi)者的競爭力比較強(qiáng),生產(chǎn)者剛生產(chǎn)1個(gè)就被消費(fèi)了,其余的都是3個(gè)生產(chǎn)者生產(chǎn),2個(gè)消費(fèi)者處理任務(wù)。這就是多生產(chǎn)者和多消費(fèi)者。

      linux 任務(wù)調(diào)度

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。

      上一篇:三分鐘了解數(shù)據(jù)中臺——什么是數(shù)據(jù)中臺?
      下一篇:excel表格標(biāo)題鎖定的方法步驟圖
      相關(guān)文章
      亚洲好看的理论片电影| 亚洲国产成人精品91久久久| 亚洲欧洲无码AV电影在线观看| 亚洲国产日韩成人综合天堂| 日韩成人精品日本亚洲| 亚洲午夜无码久久| 亚洲精品无码你懂的| 亚洲国产精品嫩草影院| 亚洲国产成人久久精品软件| 亚洲精品无码专区在线| 亚洲av日韩av永久无码电影| 亚洲AV色欲色欲WWW| 国产午夜亚洲精品不卡电影| 国产亚洲人成在线影院| 亚洲国产精品日韩| 国产专区一va亚洲v天堂| 亚洲午夜久久久久久久久电影网| 亚洲人成网77777色在线播放| 国产亚洲综合久久系列| 亚洲av无码av制服另类专区| 亚洲综合久久综合激情久久 | 久久亚洲精品成人av无码网站| 亚洲国产一区二区a毛片| 亚洲视频日韩视频| 亚洲伊人久久大香线蕉在观| 中日韩亚洲人成无码网站| 亚洲s码欧洲m码吹潮| 亚洲AⅤ无码一区二区三区在线| 亚洲人成色77777在线观看大| 中文字幕第一页亚洲| 亚洲AV午夜福利精品一区二区| 亚洲国产精品线在线观看| 亚洲欧洲日产国码在线观看| 亚洲 日韩 色 图网站| 亚洲av乱码中文一区二区三区| 亚洲国产精品毛片av不卡在线| 亚洲国产另类久久久精品小说| 精品亚洲成a人片在线观看| 亚洲宅男精品一区在线观看| 亚洲av乱码一区二区三区按摩| 久久亚洲AV无码西西人体|