Python 進程操作之進程間通信--隊列

      網友投稿 713 2022-05-30

      進程間通信——隊列(multiprocess.Queue)

      1、進程間通信

      IPC(Inter-Process Communication)

      2、隊列

      創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。

      Queue([maxsize])? 創建共享的進程隊列。 參數?:maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。 底層隊列使用管道和鎖定實現。

      方法介紹:

      Python 進程操作之進程間通信--隊列

      Queue([maxsize])? 創建共享的進程隊列。maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。 另外,還需要運行支持線程以便隊列中的數據傳輸到底層管道中。? Queue的實例q具有以下方法: q.get(?[?block?[?,timeout?]?]?)? 返回q中的一個項目。如果q為空,此方法將阻塞,直到隊列中有項目可用為止。block用于控制阻塞行為,默認為True.?如果設置為False, 將引發Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有項目變為可用,將引發Queue.Empty異常。 q.get_nowait(?)? 同q.get(False)方法。 q.put(item?[,?block?[,timeout?]?]?)? 將item放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,默認為True。如果設置為False, 將引發Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時后將引發Queue.Full異常。 q.qsize()? 返回隊列中目前項目的正確數量。此函數的結果并不可靠,因為在返回結果和在稍后程序中使用結果之間,隊列中可能添加或刪除了項目。 在某些系統上,此方法可能引發NotImplementedError異常。 q.empty()? 如果調用此方法時?q為空,返回True。如果其他進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。 q.full()? 如果q已滿,返回為True.?由于線程的存在,結果也可能是不可靠的(參考q.empty()方法)。。

      其他方法(了解):

      q.close()? 關閉隊列,防止隊列中加入更多數據。調用此方法時,后臺線程將繼續寫入那些已入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集, 將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,如果某個使用者正被阻塞在get()操作上, 關閉生產者中的隊列不會導致get()方法返回錯誤。 q.cancel_join_thread()? 不會再進程退出時自動連接后臺線程。這可以防止join_thread()方法阻塞。 q.join_thread()? 連接隊列的后臺線程。此方法用于在調用q.close()方法后,等待所有隊列項被消耗。默認情況下,此方法由不是q的原始創建者的所有進程調用。 調用q.cancel_join_thread()方法可以禁止這種行為。

      單看隊列用法:

      ''' multiprocessing模塊支持進程間通信的兩種主要形式:管道和隊列 都是基于消息傳遞實現的,但是隊列接口 ''' from?multiprocessing?import?Queue q=Queue(3) #put?,get?,put_nowait,get_nowait,full,empty q.put(3) q.put(3) q.put(3) #?q.put(3)???#?如果隊列已經滿了,程序就會停在這里,等待數據被別人取走,再將數據放入隊列。 ???????????#?如果隊列中的數據一直不被取走,程序就會永遠停在這里。 try: ????q.put_nowait(3)?#?可以使用put_nowait,如果隊列滿了不會阻塞,但是會因為隊列滿了而報錯。 except:?#?因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,但是會丟掉這個消息。 ????print('隊列已經滿了') #?因此,我們再放入數據之前,可以先看一下隊列的狀態,如果已經滿了,就不繼續put了。 print(q.full())?#滿了 print(q.get()) print(q.get()) print(q.get()) #?print(q.get())?#?同put方法一樣,如果隊列已經空了,那么繼續取就會出現阻塞。 try: ????q.get_nowait(3)?#?可以使用get_nowait,如果隊列滿了不會阻塞,但是會因為沒取到值而報錯。 except:?#?因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。 ????print('隊列已經空了') print(q.empty())?#空了

      上面這個例子還沒有加入進程通信,只是先來看看隊列為我們提供的方法,以及這些方法的使用和現象。

      子進程發送數據給父進程:

      import?time from?multiprocessing?import?Process,?Queue def?f(q): ????q.put([time.asctime(),?'from?Eva',?'hello'])??#調用主函數中p進程傳遞過來的進程參數?put函數為向隊列中添加一條數據。 if?__name__?==?'__main__': ????q?=?Queue()?#創建一個Queue對象 ????p?=?Process(target=f,?args=(q,))?#創建一個進程 ????p.start() ????print(q.get()) ????p.join()

      上面是一個queue的簡單應用,使用隊列q對象調用get函數來取得隊列中最先進入的數據。 接下來看一個稍微復雜一些的例子:

      批量生產數據放入隊列再批量獲取結果:

      import?os import?time import?multiprocessing #?向queue中輸入數據的函數 def?inputQ(queue): ????info?=?str(os.getpid())?+?'(put):'?+?str(time.asctime()) ????queue.put(info) #?向queue中輸出數據的函數 def?outputQ(queue): ????info?=?queue.get() ????print?('%s%s\033[32m%s\033[0m'%(str(os.getpid()),?'(get):',info)) #?Main if?__name__?==?'__main__': ????multiprocessing.freeze_support() ????record1?=?[]???#?store?input?processes ????record2?=?[]???#?store?output?processes ????queue?=?multiprocessing.Queue(3) ????#?輸入進程 ????for?i?in?range(10): ????????process?=?multiprocessing.Process(target=inputQ,args=(queue,)) ????????process.start() ????????record1.append(process) ????#?輸出進程 ????for?i?in?range(10): ????????process?=?multiprocessing.Process(target=outputQ,args=(queue,)) ????????process.start() ????????record2.append(process) ????for?p?in?record1: ????????p.join() ????for?p?in?record2: ????????p.join()

      在并發編程中使用生產者和消費者模式能夠解決絕大多數并發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。

      在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大于生產者,那么消費者就必須等待生產者。為了解決這個問題于是引入了生產者和消費者模式。

      生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區,平衡了生產者和消費者的處理能力。

      基于隊列實現生產者消費者模型:

      from?multiprocessing?import?Process,Queue import?time,random,os def?consumer(q): ????while?True: ????????res=q.get() ????????time.sleep(random.randint(1,3)) ????????print('\033[45m%s?吃?%s\033[0m'?%(os.getpid(),res)) def?producer(q): ????for?i?in?range(10): ????????time.sleep(random.randint(1,3)) ????????res='包子%s'?%i ????????q.put(res) ????????print('\033[44m%s?生產了?%s\033[0m'?%(os.getpid(),res)) if?__name__?==?'__main__': ????q=Queue() ????#生產者們:即廚師們 ????p1=Process(target=producer,args=(q,)) ????#消費者們:即吃貨們 ????c1=Process(target=consumer,args=(q,)) ????#開始 ????p1.start() ????c1.start() ????print('主')

      此時的問題是主進程永遠不會結束,原因是:生產者p在生產完后就結束了,但是消費者c在取空了q之后,則一直處于死循環中且卡在q.get()這一步。

      解決方式無非是讓生產者在生產完畢后,往隊列中再發一個結束信號,這樣消費者在接收到結束信號后就可以break出死循環。

      改良版--生產者消費者模型:

      from?multiprocessing?import?Process,Queue import?time,random,os def?consumer(q): ????while?True: ????????res=q.get() ????????if?res?is?None:break?#收到結束信號則結束 ????????time.sleep(random.randint(1,3)) ????????print('\033[45m%s?吃?%s\033[0m'?%(os.getpid(),res)) def?producer(q): ????for?i?in?range(10): ????????time.sleep(random.randint(1,3)) ????????res='包子%s'?%i ????????q.put(res) ????????print('\033[44m%s?生產了?%s\033[0m'?%(os.getpid(),res)) ????q.put(None)?#發送結束信號 if?__name__?==?'__main__': ????q=Queue() ????#生產者們:即廚師們 ????p1=Process(target=producer,args=(q,)) ????#消費者們:即吃貨們 ????c1=Process(target=consumer,args=(q,)) ????#開始 ????p1.start() ????c1.start() ????print('主')

      注意:結束信號None,不一定要由生產者發,主進程里同樣可以發,但主進程需要等生產者結束后才應該發送該信號

      主進程在生產者生產完畢后發送介紹信號None:

      from?multiprocessing?import?Process,Queue import?time,random,os def?consumer(q): ????while?True: ????????res=q.get() ????????if?res?is?None:break?#收到結束信號則結束 ????????time.sleep(random.randint(1,3)) ????????print('\033[45m%s?吃?%s\033[0m'?%(os.getpid(),res)) def?producer(q): ????for?i?in?range(2): ????????time.sleep(random.randint(1,3)) ????????res='包子%s'?%i ????????q.put(res) ????????print('\033[44m%s?生產了?%s\033[0m'?%(os.getpid(),res)) if?__name__?==?'__main__': ????q=Queue() ????#生產者們:即廚師們 ????p1=Process(target=producer,args=(q,)) ????#消費者們:即吃貨們 ????c1=Process(target=consumer,args=(q,)) ????#開始 ????p1.start() ????c1.start() ????p1.join() ????q.put(None)?#發送結束信號 ????print('主')

      但上述解決方式,在有多個生產者和多個消費者時,我們則需要用一個很low的方式去解決

      多個消費者的例子:有幾個消費者就需要發送幾次介紹信號

      from?multiprocessing?import?Process,Queue import?time,random,os def?consumer(q): ????while?True: ????????res=q.get() ????????if?res?is?None:break?#收到結束信號則結束 ????????time.sleep(random.randint(1,3)) ????????print('\033[45m%s?吃?%s\033[0m'?%(os.getpid(),res)) def?producer(name,q): ????for?i?in?range(2): ????????time.sleep(random.randint(1,3)) ????????res='%s%s'?%(name,i) ????????q.put(res) ????????print('\033[44m%s?生產了?%s\033[0m'?%(os.getpid(),res)) if?__name__?==?'__main__': ????q=Queue() ????#生產者們:即廚師們 ????p1=Process(target=producer,args=('包子',q)) ????p2=Process(target=producer,args=('骨頭',q)) ????p3=Process(target=producer,args=('泔水',q)) ????#消費者們:即吃貨們 ????c1=Process(target=consumer,args=(q,)) ????c2=Process(target=consumer,args=(q,)) ????#開始 ????p1.start() ????p2.start() ????p3.start() ????c1.start() ????p1.join()?#必須保證生產者全部生產完畢,才應該發送結束信號 ????p2.join() ????p3.join() ????q.put(None)?#有幾個消費者就應該發送幾次結束信號None ????q.put(None)?#發送結束信號 ????print('主')

      JoinableQueue([maxsize])

      創建可連接的共享進程隊列。這就像是一個Queue對象,但隊列允許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

      方法介紹:

      JoinableQueue的實例p除了與Queue對象相同的方法之外,還具有以下方法: q.task_done()? 使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。如果調用此方法的次數大于從隊列中刪除的項目數量,將引發ValueError異常。 q.join()? 生產者將使用此方法進行阻塞,直到隊列中所有項目均被處理。阻塞將持續到為隊列中的每個項目均調用q.task_done()方法為止。? 下面的例子說明如何建立永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,并等待它們被處理。

      JoinableQueue隊列實現消費之生產者模型:

      from?multiprocessing?import?Process,JoinableQueue import?time,random,os def?consumer(q): ????while?True: ????????res=q.get() ????????time.sleep(random.randint(1,3)) ????????print('\033[45m%s?吃?%s\033[0m'?%(os.getpid(),res)) ????????q.task_done()?#向q.join()發送一次信號,證明一個數據已經被取走了 def?producer(name,q): ????for?i?in?range(10): ????????time.sleep(random.randint(1,3)) ????????res='%s%s'?%(name,i) ????????q.put(res) ????????print('\033[44m%s?生產了?%s\033[0m'?%(os.getpid(),res)) ????q.join()?#生產完畢,使用此方法進行阻塞,直到隊列中所有項目均被處理。 if?__name__?==?'__main__': ????q=JoinableQueue() ????#生產者們:即廚師們 ????p1=Process(target=producer,args=('包子',q)) ????p2=Process(target=producer,args=('骨頭',q)) ????p3=Process(target=producer,args=('泔水',q)) ????#消費者們:即吃貨們 ????c1=Process(target=consumer,args=(q,)) ????c2=Process(target=consumer,args=(q,)) ????c1.daemon=True ????c2.daemon=True ????#開始 ????p_l=[p1,p2,p3,c1,c2] ????for?p?in?p_l: ????????p.start() ????p1.join() ????p2.join() ????p3.join() ????print('主')? ????#主進程等--->p1,p2,p3等---->c1,c2 ????#p1,p2,p3結束了,證明c1,c2肯定全都收完了p1,p2,p3發到隊列的數據 ????#因而c1,c2也沒有存在的價值了,不需要繼續阻塞在進程中影響主進程了。應該隨著主進程的結束而結束,所以設置成守護進程就可以了。

      軟件開發 人工智能 云計算 機器學習

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:這個夏天資本市場很冷,但RPA公司依舊可以融資
      下一篇:真正理解Git的重寫歷史是啥
      相關文章
      在线播放亚洲第一字幕| 亚洲中文无码a∨在线观看| 亚洲香蕉久久一区二区| 亚洲电影在线播放| 亚洲AV人人澡人人爽人人夜夜| 亚洲福利中文字幕在线网址| 亚洲高清免费视频| 久久精品国产精品亚洲人人| 久久久久亚洲av成人无码电影 | 亚洲精品偷拍视频免费观看| 国产精品亚洲二区在线| 亚洲精华国产精华精华液网站| 亚洲日本VA中文字幕久久道具| 亚洲国产精品成人AV在线| 亚洲免费网站观看视频| 亚洲av无码专区在线观看亚| 国产精品亚洲一区二区无码| 久久精品国产亚洲av品善| 亚洲AV无码一区二区三区网址 | 午夜亚洲WWW湿好爽| 亚洲欧美日韩国产精品一区| 亚洲av永久无码| 亚洲欧洲精品成人久久奇米网| 中文字幕日韩亚洲| 国产精品亚洲精品日韩已满| 亚洲精品乱码久久久久66| 亚洲小说区图片区另类春色| 亚洲AV永久精品爱情岛论坛| 亚洲一区二区在线免费观看| 亚洲日韩乱码中文无码蜜桃| 亚洲天堂免费在线| 蜜桃传媒一区二区亚洲AV| 亚洲成av人片在线观看天堂无码| 亚洲色偷拍区另类无码专区| 久久亚洲精品视频| 亚洲欧洲日本天天堂在线观看| 亚洲天堂2017无码中文| 国产精品亚洲精品日韩电影| 亚洲日韩精品一区二区三区| 91精品国产亚洲爽啪在线影院| 97亚洲熟妇自偷自拍另类图片|