Python進階必備:進程模塊multiprocessing
1. 再談線程和進程
在《Python進階必備:線程模塊threading》一文中,我們以經營物業管理公司為例,形象地介紹了線程和進程的概念。有了線程技術,我們就可以在一個進程中創建多個線程,讓它們在“同一時刻”分別去做不同的工作了。這些線程共享同一塊內存,線程之間可以共享對象、資源,如果有沖突或需要協同,還可以隨時溝通以解決沖突或保持同步。
不過,多線程技術不是萬金油,它有一個致命的缺點:在一個進程內,不管你創建了多少線程,它們總是被限定在一顆CPU內,或者多核CPU的一個核內。這意味著,多線程編程無法充分發揮多核計算資源的優勢。這也是使用多線程做任務并行處理時,線程數量超過一定數值后線程越多速度反倒越慢的原因。
多進程技術正好彌補了多線程編程的不足,我們可以在每一顆CPU上,或者多核CPU的每一個核上啟動一個進程,如果有必要,還可以在每個進程內再創建適量的線程,最大限度地使用計算資源解決問題。因為不在同一塊內存區域內,和線程相比,進程間的資源共享、通訊、同步等,都要麻煩的多,收到的限制也更多。
multiprocessing 是 Python 內置的標準進程模塊,可運行于 Unix 和 Windows 平臺臺上。依賴于該模塊,程序員得以充分利用機器上的多核資源。為便于使用,multiprocessing 模塊提供了和 threading 線程模塊相似 API。針對進程特點,multiprocessing 模塊還引入了在 threading 模塊中沒有的API,比如進程池(Pool)、共享內存(Array 和 Value)等。
2. 創建、啟動和管理進程
Process 類是 multiprocessing 模塊的子進程類,用于創建、啟動和管理子進程。Process 和線程模塊 treading.Thread 的 API 幾乎完全相同。Process 類用來描述一個進程對象。創建子進程的時候,只需要傳入進程函數和函數的參數即可完成 Process 實例化。
2.1 Process 原型
p = multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
target 是進程函數名,進程函數的函數通過 args 和 kwargs 傳入。
2.2 Process的屬性和方法
2.3 應用示例
下面這段代碼,主進程啟動了兩個子進程,然后等待用戶的鍵盤輸入以結束程序。主進程結束后,子進程也隨之結束。
# -*- coding: utf-8 -*- import os, time import multiprocessing as mp def sub_process(name, delay): """進程函數""" while True: time.sleep(delay) print('我是子進程%s,進程id為%s'%(name, os.getpid())) if __name__ == '__main__': print('主進程(%s)開始,按任意鍵結束本程序'%os.getpid()) p_a = mp.Process(target=sub_process, args=('A', 1)) p_a.daemon = True # 設置子進程為守護進程 p_a.start() p_b = mp.Process(target=sub_process, args=('B', 2)) p_b.daemon = True # 如果子進程不是守護進程,主進程結束后子進程可能成為僵尸進程 p_b.start() input() # 利用input函數阻塞主進程。這是常用的調試手段之一。
如果上面代碼中兩個子進程的 daemon 設置為 False,則主進程結束后,兩個子進程不會隨之結束,從而成為僵尸進程。如下圖所示,在任務管理中查看當前進程,可以看到主進程以及兩個子進程使用的3個Python解釋器(如果你還有其他的 Python 程序,比如 IDLE 等,在運行的話,會看到有更多的 Python 解釋器在運行)。我們可以在任務管理中手工關閉僵尸進程。當然,我們也可以在主進程結束前,使用 is_live() 判斷進程是否還在運行,使用 terminate() 強制關閉運行中的進程。
3. 進程間通訊
不同的進程,雖然分屬于不同的內存區塊,但 multiprocessing 模塊仍然提供了一些支持進程間通訊的技術,這些技術可分為交換數據和共享狀態兩類。
3.1 交換數據
3.1.1 隊列
隊列是進程間交換數據最常用的方式之一,尤其適合生產者——消費者模式。multiprocessing 模塊提供了一個和 queue.Queue 近乎一摸一樣的 Queue 類,它的 put () 和 get() 兩個方法均默認為阻塞式,這意味著一旦隊列為空,則 get() 會被阻塞;一旦隊列滿,則 put() 會被阻塞。如果使用參數 block=False 設置讀寫 put () 和 get() 為非阻塞,則讀空或寫滿時會拋出異常,因此讀寫隊列之前需要使用 enmpy() 或 full() 判斷。Queue 類實例化時可以指定隊列長度。
下面的代碼,演示了典型的生產者——消費者模式:進程A負責往地上扔錢,進程B負責從地上撿錢。
# -*- coding: utf-8 -*- import os, time, random import multiprocessing as mp def sub_process_A(q): """A進程函數:生成數據""" while True: time.sleep(5*random.random()) # 在0-5秒之間隨機延時 q.put(random.randint(10,100)) # 隨機生成[10,100]之間的整數 def sub_process_B(q): """B進程函數:使用數據""" words = ['哈哈,', '天哪!', '賣狗的!', '咦,天上掉餡餅了?'] while True: print('%s撿到了%d塊錢!'%(words[random.randint(0,3)], q.get())) if __name__ == '__main__': print('主進程(%s)開始,按任意鍵結束本程序'%os.getpid()) q = mp.Queue(10) p_a = mp.Process(target=sub_process_A, args=(q,)) p_a.daemon = True p_a.start() p_b = mp.Process(target=sub_process_B, args=(q,)) p_b.daemon = True p_b.start() input()
3.1.2 管道
管道是除隊列之外的另一種進程間通訊的主要方式。multiprocessing 模塊提供了 Pipe 類用于管道通訊,默認是雙工的,管道的兩端都可以 send() 和 recv()。需要說明的是,recv() 是阻塞式的,并且沒有隊列那樣的 block 參數可以設置是否阻塞。
下面的代碼,演示了兩個進程猜數字的游戲:進程A在心中默想了一個 [0, 127] 之間的整數,讓進程B來猜。如果B猜對了,游戲結束;如果B猜的數字大于或者小于目標,則A會告訴B大了或者小了,讓B繼續。
# -*- coding: utf-8 -*- import time, random import multiprocessing as mp def sub_process_A(p_end): """A進程函數""" aim = random.randint(0, 127) p_end.send('我在閉區間[0,127]之間想好了一個數字,你猜是幾?') print('A: 我在閉區間[0,127]之間想好了一個數字,你猜是幾?') while True: guess = p_end.recv() time.sleep(0.5 + 0.5*random.random()) # 假裝思考一會兒 if guess == aim: p_end.send('恭喜你,猜中了!') print('A: 恭喜你,猜中了!') break elif guess < aim: p_end.send('猜小了') print('A: 不對,猜小了') else: p_end.send('猜大了') print('A: 不對,猜大了') def sub_process_B(p_end): """B進程函數""" result = p_end.recv() n_min, n_max = 0, 127 while True: time.sleep(0.5 + 2*random.random()) # 假裝思考一會兒 guess = n_min + (n_max-n_min)//2 p_end.send(guess) print('B:我猜是%d'%guess) result = p_end.recv() if result == '恭喜你,猜中了!': print('B:哈哈,被我猜中!') break elif result == '猜小了': n_min, n_max = guess+1, n_max else: n_min, n_max = n_min, guess if __name__ == '__main__': p_end_a, p_end_b = mp.Pipe() # 創建管道,返回管道的兩個端,均可收發信息 p_a = mp.Process(target=sub_process_A, args=(p_end_a,)) p_a.daemon = True p_a.start() p_b = mp.Process(target=sub_process_B, args=(p_end_b,)) p_b.daemon = True p_b.start() p_a.join() p_b.join()
3.2 共享狀態
3.2.1 共享內存
通過共享內存實現狀態共享非常簡單,但在多進程編程中,這并不是首選的方法,應當盡量避免使用。multiprocessing 模塊提供了 Value 和 Array 兩個共享內存對象,一個用于單值共享,一個用于數組共享。實例化 Value 和 Array 時,‘d’ 表示雙精度浮點數, ‘i’ 表示有符號整數。這些共享對象將是進程和線程安全的。
下面的例子演示了兩個進程如何共享單值內存和數組內存,順便實現了進程間同步(請注意:進程間同步有更專業的方法)。這個例子里面隱式地涉及到了 ctypes 模塊——這是一個用于在Python和C/C++架設溝通橋梁的模塊。
# -*- coding: utf-8 -*- import os, time import multiprocessing as mp def sub_process_A(flag, data): """A進程函數""" while True: if flag.value == 0: time.sleep(1) for i in range(len(data)): data[i] = i * 3.14 flag.value = 1 print([item for item in data]) def sub_process_B(flag, data): """B進程函數""" while True: if flag.value == 1: time.sleep(1) for i in range(len(data)): data[i] = i * 2.72 flag.value = 0 print([item for item in data]) if __name__ == '__main__': print('主進程(%s)開始,按任意鍵結束本程序'%os.getpid()) flag = mp.Value('i', 0) # flag類型是ctypes.c_long,不是普通的int data = mp.Array('d', range(5)) p_a = mp.Process(target=sub_process_A, args=(flag, data)) p_a.daemon = True p_a.start() p_b = mp.Process(target=sub_process_B, args=(flag, data)) p_b.daemon = True p_b.start() input()
3.2.2 服務進程管理器
使用共享內存時,Value 和 Array 只提供了簡單的數據結構,服務進程管理器 Manager 則可以支持 list / dict / Lock / RLock / Condition / Event / Queue / Value / Array 等類型。服務進程的管理器比共享內存對象更靈活,比使用共享內存更慢。下面的代碼演示了使用服務進程管理器的使用方法。
# -*- coding: utf-8 -*- import os, time import multiprocessing as mp def sub_process_A(m_dict, m_list): """A進程函數""" while True: time.sleep(1) for index, value in enumerate(m_list): m_dict.update({str(index):value}) print(m_dict) def sub_process_B(m_dict, m_list): """B進程函數""" while True: time.sleep(1) for index, value in enumerate(m_list): if str(index) in m_dict: m_list[index] += m_dict[str(index)] else: m_list[index] = 2 * value print(m_list) if __name__ == '__main__': print('主進程(%s)開始,按任意鍵結束本程序'%os.getpid()) m = mp.Manager() m_dict = m.dict() m_list = m.list(range(5)) p_a = mp.Process(target=sub_process_A, args=(m_dict, m_list)) p_a.daemon = True p_a.start() p_b = mp.Process(target=sub_process_B, args=(m_dict, m_list)) p_b.daemon = True p_b.start() input()
4. 進程間同步
在《Python進階必備:線程模塊threading》一文中,我用4個例子介紹了4種線程間同步的方法。multiprocessing 模塊也提供了與線程間同步一一對應的進程間同步技術。為閱讀方便,我借用線程同步的4個例子,用進程代碼逐一實現。
4.1 線程鎖 Lock
前幾天,我想在一個幾百人的微信群里統計喜歡吃蘋果的人數。有人說,大家從1開始報數吧,并敲了起始數字1,立馬有人敲了數字2,3。但是統計很快就進行不下去了,因為大家發現,有好幾個人敲4,有更多的人敲5。
這就是典型的資源競爭沖突:統計用的計數器就是唯一的資源,很多人(子線程)都想取得寫計數器的資格。怎么辦呢?Lock(互斥鎖)就是一個很好的解決方案。Lock只能有一個線程獲取,獲取該鎖的線程才能執行,否則阻塞;執行完任務后,必須釋放鎖。
請看演示代碼:
# -*- coding: utf-8 -*- import time import multiprocessing as mp lock = mp.Lock() # 創建互斥鎖 counter = mp.Value('i', 0) # 使用共享內存做計數器 def hello(lock, counter): """進程函數""" if lock.acquire(): # 請求互斥鎖,如果被占用,則阻塞,直至獲取到鎖 time.sleep(0.2) # 假裝思考、敲鍵盤需要0.2秒鐘 counter.value += 1 print('我是第%d個'%counter.value) lock.release() # 千萬不要忘記釋放互斥鎖,否則后果很嚴重 def demo(): p_list= list() for i in range(30): # 假設群里有30人,都喜歡吃蘋果 p_list.append(mp.Process(target=hello, args=(lock, counter))) p_list[-1].start() for t in p_list: t.join() print('統計完畢,共有%d人'%counter.value) if __name__ == '__main__': demo()
除了互斥鎖,線程鎖還有另一種形式,叫做遞歸鎖(RLock),又稱可重入鎖。已經獲得遞歸鎖的線程可以繼續多次獲得該鎖,而不會被阻塞,釋放的次數必須和獲取的次數相同才會真正釋放該鎖。欲了解詳情,同學們可以自行檢索資料。
4.2 信號量 Semaphore
上面的例子中,統計用的計數器是唯一的資源,因此使用了只能被一個線程獲取的互斥鎖。假如共享的資源有多個,多線程競爭時一般使用信號量(Semaphore)同步。信號量有一個初始值,表示當前可用的資源數,多線程執行過程中會通過 acquire() 和 release() 操作,動態的加減信號量。比如,有30個工人都需要電錘,但是電錘總共只有5把。使用信號量(Semaphore)解決競爭的代碼如下:
# -*- coding: utf-8 -*- import time import multiprocessing as mp S = mp.Semaphore(5) # 有5把電錘可供使用 def us_hammer(id, S): """進程函數""" S.acquire() # P操作,阻塞式請求電錘, time.sleep(0.2) print('%d號剛剛用完電錘'%id) S.release() # V操作,釋放資源(信號量加1) def demo(): p_list = list() for i in range(30): # 有30名工人要求使用電錘 p_list.append(mp.Process(target=us_hammer, args=(i, S))) p_list[-1].start() for t in p_list: t.join() print('所有進程工作結束') if __name__ == '__main__': demo()
4.3 事件Event
想象我們每天早上上班的場景:為了不遲到,總得提前幾分鐘(我一般都會提前30分鐘)到辦公室,打卡之后,一看表,還不到工作時間,大家就看看新聞、聊聊天啥的;工作時間一到,立馬開工。如果有人遲到了呢,自然就不能看新聞聊天了,得立即投入工作中。
這個場景中,每個人代表一個線程,工作時間到,表示事件(Event)發生。事件發生前,線程會調用 wait() 方法阻塞自己(對應看新聞聊天),一旦事件發生,會喚醒所有調用 wait() 而進入阻塞狀態的線程。
# -*- coding: utf-8 -*- import time import multiprocessing as mp E = mp.Event() # 創建事件 def work(id, E): """進程函數""" print('<%d號員工>上班打卡'%id) if E.is_set(): # 已經到點了 print('<%d號員工>遲到了'%id) else: # 還不到點 print('<%d號員工>瀏覽新聞中...'%id) E.wait() # 等上班鈴聲 print('<%d號員工>開始工作了...'%id) time.sleep(10) # 工作10秒后下班 print('<%d號員工>下班了'%id) def demo(): E.clear() # 設置為“未到上班時間” threads = list() for i in range(3): # 3人提前來到公司打卡 threads.append(mp.Process(target=work, args=(i, E))) threads[-1].start() time.sleep(5) # 5秒鐘后上班時間到 E.set() time.sleep(5) # 5秒鐘后,大佬(9號)到 threads.append(mp.Process(target=work, args=(9, E))) threads[-1].start() for t in threads: t.join() print('都下班了,關燈關門走人') if __name__ == '__main__': demo()
4.4 條件 Condition
兩位小朋友,Hider 和 Seeker,打算玩一個捉迷藏的游戲,規則是這樣的:Seeker 先找個眼罩把眼蒙住,喊一聲“我已經蒙上眼了”;聽到消息后,Hider 就找地方藏起來,藏好以后,也要喊一聲“我藏好了,你來找我吧”;Seeker 聽到后,也要回應一聲“我來了”,捉迷藏正式開始。各自隨機等了一段時間后,兩位小朋友都憋住了跑了出來。誰先跑出來,就算誰輸。
# -*- coding: utf-8 -*- import time import multiprocessing as mp import random cond = mp.Condition() # 創建條件對象 draw = mp.Array('i', [0,0]) # [Seeker小朋友認輸, Hider小朋友認輸] def seeker(cond, draw): """Seeker小朋友的進程函數""" global draw_Seeker, draw_Hidwer time.sleep(1) # 確保Hider小朋友已經進入消息等待狀態 cond.acquire() # 阻塞時請求資源 time.sleep(random.random()) # 假裝蒙眼需要花費時間 print('Seeker: 我已經蒙上眼了') cond.notify() # 把消息通知到Hider小朋友 cond.wait() # 釋放資源并等待Hider小朋友已經藏好的消息 print('Seeker: 我來了') # 收到Hider小朋友已經藏好的消息后 cond.notify() # 把消息通知到Hider小朋友 cond.release() # 不要再聽消息了,徹底釋放資源 time.sleep(random.randint(3,10)) # Seeker小朋友的耐心只有3-10秒鐘 if draw[1]: print('Seeker: 哈哈,我找到你了,我贏了') else: draw[0] = True print('Seeker: 算了,我找不到你,我認輸啦') def hider(cond, draw): """Hider小朋友的進程函數""" global draw_Seeker, draw_Hidwer cond.acquire() # 阻塞時請求資源 cond.wait() # 如果先于Seeker小朋友請求到資源,則立刻釋放并等待 time.sleep(random.random()) # 假裝找地方躲藏需要花費時間 print('Hider: 我藏好了,你來找我吧') cond.notify() # 把消息通知到Seeker小朋友 cond.wait() # 釋放資源并等待Seeker小朋友開始找人的消息 cond.release() # 不要再聽消息了,徹底釋放資源 time.sleep(random.randint(3,10)) # Hider小朋友的耐心只有3-10秒鐘 if draw[0]: print('Hider: 哈哈,你沒找到我,我贏了') else: draw[1] = True print('Hider: 算了,這里太悶了,我認輸,自己出來吧') def demo(): p_seeker = mp.Process(target=seeker, args=(cond, draw)) p_hider = mp.Process(target=hider, args=(cond, draw)) p_seeker.start() p_hider.start() p_seeker.join() p_hider.join() if __name__ == '__main__': demo()
5. 進程池
和線程一樣,處理并行任務時,處理效率和進程數量并不總是成正比——當進程數量超過一定限度后,完成任務所需時間反倒會延長。進程池提供了一個保持合理進程數量的方案,但合理進程數量則需要根據硬件狀況及運行狀況來確定。
multiprocessing.Pool(n) 用于創建n個進程的進程池供用戶調用。如果進程池任務不滿,則新的進程請求會被立即執行;如果進程池任務已滿,則新的請求將等待至有可用進程才被執行。向進程池提交任務有兩種方式:
apply_async(func[, args[, kwds[, callback]]])
非阻塞式提交。即使進程池已滿,也會接受新的任務,不會阻塞主進程。新任務將處于等待狀態。
apply(func[, args[, kwds]])
阻塞式提交。若進程池已滿,則主進程阻塞,直至有空閑進成可用
5.1 典型應用
下面的代碼,演示了進程池的典型用法。讀者可自行嘗試阻塞式提交和非阻塞式提交兩種方法的差異。
# -*- coding: utf-8 -*- import time import multiprocessing as mp def power(x, a=2): """進程函數:冪函數""" time.sleep(1) print('%d的%d次方等于%d'%(x, a, pow(x, a))) def demo(): mpp = mp.Pool(processes=4) for item in [2,3,4,5,6,7,8,9]: mpp.apply_async(power, (item, )) # 非阻塞提交新任務 #mpp.apply(power, (item, )) # 阻塞提交新任務 mpp.close() # 關閉進程池,意味著不再接受新的任務 print('主進程走到這里,正在等待子進程結束') mpp.join() # 等待所有子進程結束 print('程序結束') if __name__ == '__main__': demo()
5.2 Map & Reduce
MapReduce 是一種用于大規模數據集的并行運算編程模型,分為 Map(映射)和 Reduce(歸約)兩個步驟。Py2時代,map() 和 reduce() 都是標準函數。不知為何,Py3 把 reduce() 藏到了標準模塊 functools 中,只保留了 map() 在標準函數庫中。
進程池對象 Pool 自帶 map() 方法,遺憾的是沒有提供 reduce() 方法。沒關系,我們可以借用 Python 標準庫 functools 模塊中的 reduce(),實現完整的 Map & Reduce 的數據處理。下面以計算整數列表各元素的平方和為例,演示了 Map 和 Reduce 的用法。
# -*- coding: utf-8 -*- import time from functools import reduce import multiprocessing as mp def power(x, a=2): """進程函數:冪函數""" time.sleep(0.5) return pow(x, a) if __name__ == '__main__': #mp.freeze_support() # 如果亂彈窗口,請放開注釋 with mp.Pool(processes=8) as mpp: print(reduce(lambda result,x:result+x, mpp.map(power, range(100)), 0))
Python 任務調度
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。