Linux-生產(chǎn)者消費(fèi)者模型
@TOC
生產(chǎn)者消費(fèi)者模型概念
生產(chǎn)者消費(fèi)者模型又稱有限緩沖問題,是一個(gè)多進(jìn)程同步問題的經(jīng)典案例。該問題描述了共享固定大小緩沖區(qū)的兩個(gè)進(jìn)程——即所謂的“生產(chǎn)者”和“消費(fèi)者”——在實(shí)際運(yùn)行時(shí)會發(fā)生的問題。生產(chǎn)者的主要作用是生成一定量的數(shù)據(jù)放到緩沖區(qū)中,然后重復(fù)此過程。與此同時(shí),消費(fèi)者也在緩沖區(qū)消耗這些數(shù)據(jù)。
如下圖:
生產(chǎn)者消費(fèi)者模型特點(diǎn)
生產(chǎn)者消費(fèi)者是多線程同步于互斥的經(jīng)典場景,其特點(diǎn)如下:
3種關(guān)系:生產(chǎn)者VS生產(chǎn)者(互斥關(guān)系)、生產(chǎn)者VS消費(fèi)者(同步關(guān)系)、消費(fèi)者VS消費(fèi)者(互斥關(guān)系)
2種角色:生產(chǎn)者和消費(fèi)者
1個(gè)交易場所:通常指內(nèi)存中的一段緩沖區(qū)
我們用代碼實(shí)現(xiàn)的話就要維護(hù)上面的3種關(guān)系即可。
生產(chǎn)者和生產(chǎn)者、消費(fèi)者和消費(fèi)者為什么要互斥?
當(dāng)1個(gè)生產(chǎn)者開始往緩沖區(qū)里寫入數(shù)據(jù)時(shí)當(dāng)然不希望有其他的生產(chǎn)者進(jìn)入緩沖區(qū),所以要在緩沖區(qū)用互斥鎖保護(hù)起來。同樣,1個(gè)消費(fèi)者讀取數(shù)據(jù)也要避免其他的消費(fèi)者干擾,所以在1個(gè)消費(fèi)者讀取緩沖區(qū)數(shù)據(jù)時(shí)用互斥鎖保護(hù)起來。
生產(chǎn)者和消費(fèi)者同步關(guān)系?
當(dāng)生產(chǎn)者把緩沖區(qū)數(shù)據(jù)加滿后,再生產(chǎn)數(shù)據(jù)就會失敗。消費(fèi)者把緩沖區(qū)數(shù)據(jù)消費(fèi)完再消費(fèi)也會失敗。
所以讓生產(chǎn)者在緩沖區(qū)滿時(shí)休眠,等到下次消費(fèi)者消耗緩沖區(qū)中的數(shù)據(jù)的時(shí)候,生產(chǎn)者才能被喚醒,開始往緩沖區(qū)添加數(shù)據(jù)。同樣,也可以讓消費(fèi)者在緩沖區(qū)空時(shí)進(jìn)入休眠,等到生產(chǎn)者往緩沖區(qū)添加數(shù)據(jù)之后,再喚醒消費(fèi)者。
生產(chǎn)者消費(fèi)者模型優(yōu)點(diǎn)
支持并發(fā)
由于生產(chǎn)者與消費(fèi)者是兩個(gè)獨(dú)立的并發(fā)體,他們之間是用緩沖區(qū)作為橋梁連接,生產(chǎn)者只需要往緩沖區(qū)里丟數(shù)據(jù),就可以繼續(xù)生產(chǎn)下一個(gè)數(shù)據(jù),而消費(fèi)者只需要從緩沖區(qū)了拿數(shù)據(jù)即可,這樣就不會因?yàn)楸舜说奶幚硭俣榷l(fā)生阻塞。
解耦
支持忙時(shí)不均
如果制造數(shù)據(jù)的速度時(shí)快時(shí)慢,緩沖區(qū)的好處就體現(xiàn)出來了。當(dāng)數(shù)據(jù)制造快的時(shí)候,消費(fèi)者來不及處理,未處理的數(shù)據(jù)可以暫時(shí)存在緩沖區(qū)中。 等生產(chǎn)者的制造速度慢下來,消費(fèi)者再慢慢處理掉。
基于BlockingQueue的生產(chǎn)者消費(fèi)者模型
基于阻塞隊(duì)列的生產(chǎn)者消費(fèi)者模型
在多線程編程中阻塞隊(duì)列(Blocking Queue)是一種常用于實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者模型的數(shù)據(jù)結(jié)構(gòu)。
和普通隊(duì)列的區(qū)別:
當(dāng)隊(duì)列為空時(shí),從隊(duì)列獲取元素的操作將會被阻塞,直到隊(duì)列中被放入了元素
當(dāng)隊(duì)列滿時(shí),往隊(duì)列里存放元素的操作也會被阻塞,直到有元素被從隊(duì)列中取出(以上的操作都是基于不同的線程來說的,線程在對阻塞隊(duì)列進(jìn)程操作時(shí)會被阻塞)
模擬實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型
我們先模擬1個(gè)單生產(chǎn)者和單消費(fèi)者
這個(gè)BlockQueue我們就用C++STL中的Queue即可。
整體的邏輯:
現(xiàn)在實(shí)現(xiàn)單生產(chǎn)者和單消費(fèi)者,那我們維護(hù)生產(chǎn)者和消費(fèi)者的同步關(guān)系即可
設(shè)置BlockQueue的存儲數(shù)據(jù)上限,不能讓生產(chǎn)者一直生產(chǎn)下去,具體的實(shí)現(xiàn)根據(jù)業(yè)務(wù)的需求
BlockQueue是臨界資源,所以需要1把互斥鎖進(jìn)行保護(hù)
當(dāng)生產(chǎn)者向阻塞隊(duì)列中放入數(shù)據(jù)時(shí),若此時(shí)阻塞隊(duì)列滿了,那生產(chǎn)者此時(shí)要先通知消費(fèi)者開始消費(fèi)數(shù)據(jù),自己掛起
當(dāng)消費(fèi)者取阻塞隊(duì)列中的數(shù)據(jù)時(shí),若此時(shí)阻塞隊(duì)列為空了,那消費(fèi)者先通知生產(chǎn)者開始生產(chǎn)數(shù)據(jù),自己掛起
因此我們還需要2個(gè)條件變量,1個(gè)當(dāng)阻塞隊(duì)列滿時(shí),生產(chǎn)者需要在此條件變量下等待。1個(gè)條件變量當(dāng)阻塞隊(duì)列為空時(shí),消費(fèi)者在此條件變量下等待
無論是生產(chǎn)者還是消費(fèi)者都要進(jìn)入臨界區(qū)判斷條件是否滿足,若條件不滿足就要被掛起,此時(shí)它們中的1個(gè)是拿著鎖在等待的,為了避免死鎖的問題,在調(diào)用pthread_cond_wait時(shí)要傳入互斥鎖,當(dāng)被掛起時(shí)釋放手中的鎖資源
主函數(shù)創(chuàng)建2個(gè)線程,1個(gè)線程不斷生產(chǎn)數(shù)據(jù),另一個(gè)線程不斷的讀取數(shù)據(jù)即可
代碼如下:
BlockQueue:
1 #include
mian:
1 #include"BlockQueue.hpp" 2 3 4 void* Consumer(void* arg) 5 { 6 BlockQueue* bq = (BlockQueue*)arg; 7 while(true) 8 { 9 int data; 10 bq->Pop(data); 11 cout<<"Consumer data done:"<Push(data); 21 cout<<"Product data done:"<get_init(); 28 pthread_t c,p; 29 pthread_create(&c,nullptr,Consumer,(void*)bq); 30 pthread_create(&c,nullptr,Product,(void*)bq); 31 32 33 pthread_join(c,nullptr); 34 pthread_join(p,nullptr); 35 delete bq; 36 return 0; }
先讓生產(chǎn)者慢點(diǎn)生產(chǎn)。因?yàn)槲沂亲岅?duì)列滿了,生產(chǎn)者喚醒消費(fèi)者消費(fèi),生產(chǎn)者等待。這種情況應(yīng)該是生產(chǎn)者先執(zhí)行,生產(chǎn)者滿了通知消費(fèi)者,消費(fèi)者是一瞬間就消費(fèi)完了。
跟我們預(yù)期的結(jié)果一樣,生產(chǎn)者生產(chǎn)滿了后消費(fèi)者一瞬間消費(fèi)完。
讓消費(fèi)者慢,生產(chǎn)者快,一瞬間隊(duì)列里數(shù)據(jù)滿了,生產(chǎn)者通知消費(fèi)者消費(fèi)數(shù)據(jù),生產(chǎn)者等待
生產(chǎn)者一下就生產(chǎn)完了,消費(fèi)者是慢慢的在消費(fèi)。
計(jì)算任務(wù)的生產(chǎn)者和消費(fèi)者模型
上面打印數(shù)字是為了好理解現(xiàn)象,我們再封裝1個(gè)Task的任務(wù)類,該類中包含1個(gè)run函數(shù),該函數(shù)處理計(jì)算的任務(wù)。
6 class Task 7 { 8 public: 9 int x; 10 int y; 11 public: 12 Task(int _x=0,int _y=0) 13 :x(_x),y(_y) 14 {} 15 int run() 16 { 17 return x + y; 18 } 19 20 };
生產(chǎn)者放入阻塞隊(duì)列里的就是Task對象,消費(fèi)者拿出隊(duì)列中的任務(wù),調(diào)用tun函數(shù)處理任務(wù)
5 void* Consumer(void* arg) 6 { 7 BlockQueue* bq = (BlockQueue*)arg; 8 // pthread_mutex_lock(&c_mutex); 9 while(true) 10 { 11 sleep(1); 12 Task t; 13 bq->Pop(t); 14 cout<<" Consumer :"< 這次可以讓阻塞隊(duì)列中的任務(wù)大于隊(duì)列容量的一半就喚醒消費(fèi)者自己掛起等待,讓消費(fèi)者消費(fèi)到隊(duì)列容量的一半喚醒生產(chǎn)者自己掛起等待。并讓生產(chǎn)者快,消費(fèi)者慢 74 void Push(Task &t) 75 { 76 LockQueue(); 77 while(IsFull()) 78 { 79 //隊(duì)列滿了,生產(chǎn)者掛起,通知消費(fèi)者 80 //WakeUpConsumer(); 81 ProductWait(); 82 } 83 q.push(t); 84 if(q.size() >= _cap / 2) 85 { 86 87 WakeUpConsumer(); 88 89 } 90 UnLockQueue(); 91 } 92 void Pop(Task &t) 93 { 94 LockQueue(); 95 while(IsEmpty()) 96 { 97 //隊(duì)列為空,消費(fèi)者掛起,通知生產(chǎn)者 98 //WakeUpProduct(); 99 ConsumerWait(); 100 } 101 t = q.front(); 102 q.pop(); 103 if(q.size() <= _cap / 2) 104 { 105 106 WakeUpProduct(); 107 } 108 UnLockQueue(); 109 } 我們可以看到生產(chǎn)者瞬間生產(chǎn)滿了,消費(fèi)者開始消費(fèi)了3個(gè)后,生產(chǎn)者又開始生產(chǎn)了。 多生產(chǎn)者和多消費(fèi)者 多生產(chǎn)者多消費(fèi)者在單生產(chǎn)者單消費(fèi)者的基礎(chǔ)上再加2把互斥鎖,讓生產(chǎn)者自己在組內(nèi)競爭,競爭贏的先去生產(chǎn),消費(fèi)者也是同樣的道理,先讓消費(fèi)者自己組內(nèi)競爭,贏的先去處理任務(wù)。BlockQueue和單生產(chǎn)者的一樣,我是直接定義了全局的2把鎖。當(dāng)然也可以封裝在BlockQueue。 1 #include"BlockQueue.hpp" 2 pthread_mutex_t c_mutex; 3 pthread_mutex_t p_mutex; 4 5 void* Consumer(void* arg) 6 { 7 BlockQueue* bq = (BlockQueue*)arg; 8 //pthread_mutex_lock(&c_mutex); 9 while(true) 10 { 11 pthread_mutex_lock(&c_mutex); 12 Task t; 13 bq->Pop(t); 14 cout<<" Consumer [:"< 讓生產(chǎn)者和消費(fèi)者一起工作: 有1個(gè)消費(fèi)者的競爭力比較強(qiáng),生產(chǎn)者剛生產(chǎn)1個(gè)就被消費(fèi)了,其余的都是3個(gè)生產(chǎn)者生產(chǎn),2個(gè)消費(fèi)者處理任務(wù)。這就是多生產(chǎn)者和多消費(fèi)者。 linux 任務(wù)調(diào)度
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。