Docker原理解讀
1196
2025-03-31
1. 前言
前些日子寫過幾篇關(guān)于線程和進(jìn)程的文章,概要介紹了Python內(nèi)置的線程模塊(threading)和進(jìn)程模塊(multiprocessing)的使用方法,側(cè)重點(diǎn)是線程間同步和進(jìn)程間同步。隨后,陸續(xù)收到了不少讀者的私信,咨詢進(jìn)程、線程和協(xié)程的使用方法,進(jìn)程、線程和協(xié)程分別適用于何種應(yīng)用場景,以及混合使用進(jìn)程、線程和協(xié)程的技巧。歸納起來,核心的問題大致有以下幾個(gè):
使用線程是為了并行還是加速?
為什么我使用多線程之后,處理速度并沒有預(yù)期的快,甚至更慢了?
我應(yīng)該選擇多進(jìn)程處理還是多線程處理?
協(xié)程和線程有什么不同?
什么情況下使用協(xié)程?
在進(jìn)程、線程和協(xié)程的使用上,初學(xué)者之所以感到困惑,最主要的原因是對任務(wù)的理解不到位。任務(wù)是由一個(gè)進(jìn)程、或者線程、或者協(xié)程獨(dú)立完成的、相對獨(dú)立的一系列工作組合。通常,我們會(huì)把任務(wù)寫成一個(gè)函數(shù)。任務(wù)有3種類型:
計(jì)算密集型任務(wù):任務(wù)包含大量計(jì)算,CPU占用率高
IO密集型任務(wù):任務(wù)包含頻繁的、持續(xù)的網(wǎng)絡(luò)IO和磁盤IO
混合型任務(wù):既有計(jì)算也有IO
也有觀點(diǎn)認(rèn)為還有一種數(shù)據(jù)密集型任務(wù),但我認(rèn)為數(shù)據(jù)密集型任務(wù)一般出現(xiàn)在分布式系統(tǒng)或異構(gòu)系統(tǒng)上,必定伴隨著計(jì)算密集和IO密集,因此,仍然可以歸類到混合型任務(wù)。
下面,我們就以幾個(gè)實(shí)例來講解演示進(jìn)程、線程和協(xié)程的適用場景、使用方法,以及如何優(yōu)化我們的代碼。
2. 線程
2.1 線程的最大意義在于多任務(wù)并行
通常,代碼是單線程順序執(zhí)行的,這個(gè)線程就是主線程。僅有主線程的話,在同一時(shí)刻就只能做一件事情;如果有多件事情要做,那也只能做完一件再去做另一件。這有點(diǎn)類似于過去的說書藝人,情節(jié)人物復(fù)雜時(shí),只能“花開兩朵,各表一枝”。下面這個(gè)題目,就是一個(gè)需要同時(shí)做兩件事情的例子。
請寫一段代碼,提示用戶從鍵盤輸入任意字符,然后等待用戶輸入。如果用戶在10秒鐘完成輸入(按回車鍵),則顯示輸入內(nèi)容并結(jié)束程序;否則,不再等待用戶輸入,而是直接提示超時(shí)并結(jié)束程序。
我們知道,input()函數(shù)用于從鍵盤接收輸入,time.sleep()函數(shù)可以令程序停止運(yùn)行指定的時(shí)長。不過,在等待鍵盤輸入的時(shí)候,sleep()函數(shù)就無法計(jì)時(shí),而在休眠的時(shí)候,input()函數(shù)就無法接收鍵盤輸入。不借助于線程,我們無法同時(shí)做這兩件事情。如果使用線程技術(shù)的話,我們可以在主線程中接收鍵盤輸入,在子線程中啟動(dòng)sleep()函數(shù),一旦休眠結(jié)束,子線程就殺掉主線程,結(jié)束程序。
import os, time import threading def monitor(): time.sleep(10) print('\n超時(shí)退出!') os._exit(0) m = threading.Thread(target=monitor) m.setDaemon(True) m.start() s = input('請輸入>>>') print('接收到鍵盤輸入:%s'%s) print('程序正常結(jié)束。')
2.2 使用線程處理IO密集型任務(wù)
假如從100個(gè)網(wǎng)站抓取數(shù)據(jù),使用單線程的話,就需要逐一請求這100個(gè)站點(diǎn)并處理應(yīng)答結(jié)果,所花費(fèi)時(shí)間就是每個(gè)站點(diǎn)花費(fèi)時(shí)間的總和。如果使用多個(gè)線程來實(shí)現(xiàn)的話,結(jié)果會(huì)怎樣呢?
import time import requests import threading urls = ['https://www.baidu.com', 'https://cn.bing.com'] def get_html(n): for i in range(n): url = urls[i%2] resp = requests.get(url) #print(resp.ok, url) t0 = time.time() get_html(100) # 請求100次 t1 = time.time() print('1個(gè)線程請求100次,耗時(shí)%0.3f秒鐘'%(t1-t0)) for n_thread in (2,5,10,20,50): t0 = time.time() ths = list() for i in range(n_thread): ths.append(threading.Thread(target=get_html, args=(100//n_thread,))) ths[-1].setDaemon(True) ths[-1].start() for i in range(n_thread): ths[i].join() t1 = time.time() print('%d個(gè)線程請求100次,耗時(shí)%0.3f秒鐘'%(n_thread, t1-t0))
上面的代碼用百度和必應(yīng)兩個(gè)網(wǎng)站來模擬100個(gè)站點(diǎn),運(yùn)行結(jié)果如下所示。單線程處理大約需要30秒鐘。分別使用2、5、10個(gè)線程來處理的話,所耗時(shí)間與線程數(shù)量基本上保持反比關(guān)系。當(dāng)線程數(shù)量繼續(xù)增加20個(gè)時(shí),速度不再有顯著提升。若將線程數(shù)量增至50個(gè),時(shí)間消耗反倒略有增加。
1個(gè)線程請求100次,耗時(shí)30.089秒鐘
2個(gè)線程請求100次,耗時(shí)15.087秒鐘
5個(gè)線程請求100次,耗時(shí)7.803秒鐘
10個(gè)線程請求100次,耗時(shí)4.112秒鐘
20個(gè)線程請求100次,耗時(shí)3.160秒鐘
50個(gè)線程請求100次,耗時(shí)3.564秒鐘
這個(gè)結(jié)果表明,對于IO密集型(本例僅測試網(wǎng)絡(luò)IO,沒有磁盤IO)的任務(wù),適量的線程可以在一定程度上提高處理速度。隨著線程數(shù)量的增加,速度的提升不再明顯。
2.3 使用線程處理計(jì)算密集型任務(wù)
對于曝光不足或明暗變化劇烈的照片可以通過算法來修正。下圖左是一張落日圖,因?yàn)樘柟饩€較強(qiáng)導(dǎo)致暗區(qū)細(xì)節(jié)無法辨識,通過低端增強(qiáng)算法可以還原為下圖右的樣子。
低端增強(qiáng)算法(也有人叫做伽馬矯正)其實(shí)很簡單:對于
[
0
,
255
]
[0,255]
[0,255]區(qū)間內(nèi)的灰度值
v
0
v_0
v0 ,指定矯正系數(shù)
γ
\gamma
γ,使用下面的公式,即可得到矯正后的灰度值
v
1
v_1
v1 ,其中
γ
\gamma
γ一般選擇2或者3,上圖右就是
γ
\gamma
γ為3的效果。
v
1
=
255
×
(
v
0
255
)
1
γ
v_1 = 255\times(\frac{v_0}{255})^{\frac{1}{\gamma}}
v1 =255×(255v0 )γ1
下面的代碼,對于一張分辨率為4088x2752的照片實(shí)施低端增強(qiáng)算法,這是一項(xiàng)計(jì)算密集型的任務(wù)。代碼中分別使用了廣播和矢量計(jì)算、單線程逐像素計(jì)算、多線程逐像素計(jì)算等三種方法,以驗(yàn)證多線程對于計(jì)算密集型任務(wù)是否有提速效果。
import time import cv2 import numpy as np import threading def gamma_adjust_np(im, gamma, out_file): """伽馬增強(qiáng)函數(shù):使用廣播和矢量化計(jì)算""" out = (np.power(im.astype(np.float32)/255, 1/gamma)*255).astype(np.uint8) cv2.imwrite(out_file, out) def gamma_adjust_py(im, gamma, out_file): """伽馬增強(qiáng)函數(shù):使用循環(huán)逐像素計(jì)算""" rows, cols = im.shape out = im.astype(np.float32) for i in range(rows): for j in range(cols): out[i,j] = pow(out[i,j]/255, 1/3)*255 cv2.imwrite(out_file, out.astype(np.uint8)) im = cv2.imread('river.jpg', cv2.IMREAD_GRAYSCALE) rows, cols = im.shape print('照片分辨率為%dx%d'%(cols, rows)) t0 = time.time() gamma_adjust_np(im, 3, 'river_3.jpg') t1 = time.time() print('借助NumPy廣播特性,耗時(shí)%0.3f秒鐘'%(t1-t0)) t0 = time.time() im_3 = gamma_adjust_py(im, 3, 'river_3_cycle.jpg') t1 = time.time() print('單線程逐像素處理,耗時(shí)%0.3f秒鐘'%(t1-t0)) t0 = time.time() th_1 = threading.Thread(target=gamma_adjust_py, args=(im[:rows//2], 3, 'river_3_1.jpg')) th_1.setDaemon(True) th_1.start() th_2 = threading.Thread(target=gamma_adjust_py, args=(im[rows//2:], 3, 'river_3_2.jpg')) th_2.setDaemon(True) th_2.start() th_1.join() th_2.join() t1 = time.time() print('啟用兩個(gè)線程逐像素處理,耗時(shí)%0.3f秒鐘'%(t1-t0))
運(yùn)行結(jié)果如下:
照片分辨率為4088x2752
借助NumPy廣播特性,耗時(shí)0.381秒鐘
單線程逐像素處理,耗時(shí)34.228秒鐘
啟用兩個(gè)線程逐像素處理,耗時(shí)36.087秒鐘
結(jié)果顯示,對一張千萬級像素的照片做低端增強(qiáng),借助于NumPy的廣播和矢量化計(jì)算,耗時(shí)0.38秒鐘;單線程逐像素處理的話,耗時(shí)相當(dāng)于NumPy的100倍;啟用多線程的話,速度不僅沒有加快,反倒是比單線程更慢。這說明,對于計(jì)算密集型的任務(wù)來說,多線程并不能提高處理速度,相反,因?yàn)橐獎(jiǎng)?chuàng)建和管理線程,處理速度會(huì)更慢一些。
2.4 線程池
盡管多線程可以并行處理多個(gè)任務(wù),但開啟線程不僅花費(fèi)時(shí)間,也需要占用系統(tǒng)資源。因此,線程數(shù)量不是越多越快,而是要保持在合理的水平上。線程池可以讓我們用固定數(shù)量的線程完成比線程數(shù)量多得多的任務(wù)。下面的代碼演示了使用 Python 的標(biāo)準(zhǔn)模塊創(chuàng)建線程池,計(jì)算多個(gè)數(shù)值的平方。
>>> from concurrent.futures import ThreadPoolExecutor >>> def pow2(x): return x*x >>> with ThreadPoolExecutor(max_workers=4) as pool: # 4個(gè)線程的線程池 result = pool.map(pow2, range(10)) # 使用4個(gè)線程分別計(jì)算0~9的平方 >>> list(result) # result是一個(gè)生成器,轉(zhuǎn)成列表才可以直觀地看到計(jì)算結(jié)果 [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
如果每個(gè)線程的任務(wù)各不相同,使用不同的線程函數(shù),任務(wù)結(jié)束后的結(jié)果處理也不一樣,同樣可以使用這個(gè)線程池。下面的代碼對多個(gè)數(shù)值中的奇數(shù)做平方運(yùn)算,偶數(shù)做立方運(yùn)算,線程任務(wù)結(jié)束后,打印各自的計(jì)算結(jié)果。
>>> from concurrent.futures import ThreadPoolExecutor >>> def pow2(x): return x*x >>> def pow3(x): return x*x*x >>> def save_result(task): # 保存線程計(jì)算結(jié)果 global result result.append(task.result()) >>> result = list() >>> with ThreadPoolExecutor(max_workers=3) as pool: for i in range(10): if i%2: # 奇數(shù)做平方運(yùn)算 task = pool.submit(pow2, i) else: # 偶數(shù)做立方運(yùn)算 task = pool.submit(pow3, i) task.add_done_callback(save_result) # 為每個(gè)線程指定結(jié)束后的回調(diào)函數(shù) >>> result [0, 1, 8, 9, 64, 25, 216, 49, 512, 81]
3. 進(jìn)程
3.1 使用進(jìn)程處理計(jì)算密集型任務(wù)
和線程相比,進(jìn)程的最大優(yōu)勢是可以充分例用計(jì)算資源——這一點(diǎn)不難理解,因?yàn)椴煌倪M(jìn)程可以運(yùn)行在不同CPU的不同的核上。假如一臺計(jì)算機(jī)的CPU共有16核,則可以啟動(dòng)16個(gè)或更多個(gè)進(jìn)程來并行處理任務(wù)。對于上面的例子,我們改用進(jìn)程來處理,效果會(huì)怎樣呢?
import time import cv2 import numpy as np import multiprocessing as mp def gamma_adjust_py(im, gamma, out_file): """伽馬增強(qiáng)函數(shù):使用循環(huán)逐像素計(jì)算""" rows, cols = im.shape out = im.astype(np.float32) for i in range(rows): for j in range(cols): out[i,j] = pow(out[i,j]/255, 1/3)*255 cv2.imwrite(out_file, out.astype(np.uint8)) if __name__ == '__main__': mp.freeze_support() im_fn = 'river.jpg' im = cv2.imread(im_fn, cv2.IMREAD_GRAYSCALE) rows, cols = im.shape print('照片分辨率為%dx%d'%(cols, rows)) t0 = time.time() pro_1 = mp.Process(target=gamma_adjust_py, args=(im[:rows//2], 3, 'river_3_1.jpg')) pro_1.daemon = True pro_1.start() pro_2 = mp.Process(target=gamma_adjust_py, args=(im[rows//2:], 3, 'river_3_2.jpg')) pro_2.daemon = True pro_2.start() pro_1.join() pro_2.join() t1 = time.time() print('啟用兩個(gè)進(jìn)程逐像素處理,耗時(shí)%0.3f秒鐘'%(t1-t0))
運(yùn)行結(jié)果如下:
照片分辨率為4088x2752
啟用兩個(gè)進(jìn)程逐像素處理,耗時(shí)17.786秒鐘
使用單個(gè)線程或兩個(gè)線程的時(shí)候,耗時(shí)大約30+秒,改用兩個(gè)進(jìn)程后,耗時(shí)17.786秒,差不多快了一倍。如果使用4個(gè)進(jìn)程(前提是運(yùn)行代碼的計(jì)算機(jī)至少有4個(gè)CPU核)的話,速度還能提高一倍,有興趣的朋友可以試一下。這個(gè)測試表明,對于計(jì)算密集型的任務(wù),使用多進(jìn)程并行處理是有效的提速手段。通常,進(jìn)程數(shù)量選擇CPU核數(shù)的整倍數(shù)。
3.2 進(jìn)程間通信示例
多進(jìn)程并行彌補(bǔ)了多線程技術(shù)的不足,我們可以在每一顆 CPU 上,或多核 CPU 的每一個(gè)核上啟動(dòng)一個(gè)進(jìn)程。如果有必要,還可以在每個(gè)進(jìn)程內(nèi)再創(chuàng)建適量的線程,最大限度地使用計(jì)算資源來解決問題。不過,進(jìn)程技術(shù)也有很大的局限性,因?yàn)檫M(jìn)程不在同一塊內(nèi)存區(qū)域內(nèi),所以和線程相比,進(jìn)程間的資源共享、通信、同步等都要麻煩得多,受到的限制也更多。
我們知道,線程間通信可以使用隊(duì)列、互斥鎖、信號量、事件和條件等多種同步方式,同樣的,這些手段也可以應(yīng)用在進(jìn)程間。此外,multiprocessing 模塊還提供了管道和共享內(nèi)存等進(jìn)程間通信的手段。下面僅演示一個(gè)進(jìn)程間使用隊(duì)列通信,更多的通信方式請參考由人民郵電出版社出版的拙著《Python高手修煉之道》。
這段代碼演示了典型的生產(chǎn)者—消費(fèi)者模式。進(jìn)程 A 負(fù)責(zé)隨機(jī)地往地上“撒錢”(寫隊(duì)列),進(jìn)程 B 負(fù)責(zé)從地上“撿錢”(讀隊(duì)列)。
import os, time, random import multiprocessing as mp def sub_process_A(q): """A進(jìn)程函數(shù):生成數(shù)據(jù)""" while True: time.sleep(5*random.random()) # 在0-5秒之間隨機(jī)延時(shí) q.put(random.randint(10,100)) # 隨機(jī)生成[10,100]之間的整數(shù) def sub_process_B(q): """B進(jìn)程函數(shù):使用數(shù)據(jù)""" words = ['哈哈,', '天哪!', 'My God!', '咦,天上掉餡餅了?'] while True: print('%s撿到了%d塊錢!'%(words[random.randint(0,3)], q.get())) if __name__ == '__main__': print('主進(jìn)程(%s)開始,按回車鍵結(jié)束本程序'%os.getpid()) q = mp.Queue(10) p_a = mp.Process(target=sub_process_A, args=(q,)) p_a.daemon = True p_a.start() p_b = mp.Process(target=sub_process_B, args=(q,)) p_b.daemon = True p_b.start() input()
3.3 進(jìn)程池
使用多進(jìn)程并行處理任務(wù)時(shí),處理效率和進(jìn)程數(shù)量并不總是成正比。當(dāng)進(jìn)程數(shù)量超過一定限度后,完成任務(wù)所需時(shí)間反而會(huì)延長。進(jìn)程池提供了一個(gè)保持合理進(jìn)程數(shù)量的方案,但合理進(jìn)程數(shù)量需要根據(jù)硬件狀況及運(yùn)行狀況來確定,通常設(shè)置為 CPU 的核數(shù)。
multiprocessing.Pool(n) 可創(chuàng)建 n 個(gè)進(jìn)程的進(jìn)程池供用戶調(diào)用。如果進(jìn)程池任務(wù)不滿,則新的進(jìn)程請求會(huì)被立即執(zhí)行;如果進(jìn)程池任務(wù)已滿,則新的請求將等待至有可用進(jìn)程時(shí)才被執(zhí)行。向進(jìn)程池提交任務(wù)有以下兩種方式。
apply_async(func[, args[, kwds[, callback]]]) :非阻塞式提交。即使進(jìn)程池已滿,也會(huì)接受新的任務(wù),不會(huì)阻塞主進(jìn)程。新任務(wù)將處于等待狀態(tài)。
apply(func[, args[, kwds]]) :阻塞式提交。若進(jìn)程池已滿,則主進(jìn)程阻塞,直至有空閑進(jìn)程可以使用。
下面的代碼演示了進(jìn)程池的典型用法。讀者可自行嘗試阻塞式提交和非阻塞式提交兩種方法的差異。
import time import multiprocessing as mp def power(x, a=2): """進(jìn)程函數(shù):冪函數(shù)""" time.sleep(1) print('%d的%d次方等于%d'%(x, a, pow(x, a))) def demo(): mpp = mp.Pool(processes=4) for item in [2,3,4,5,6,7,8,9]: mpp.apply_async(power, (item, )) # 非阻塞提交新任務(wù) #mpp.apply(power, (item, )) # 阻塞提交新任務(wù) mpp.close() # 關(guān)閉進(jìn)程池,意味著不再接受新的任務(wù) print('主進(jìn)程走到這里,正在等待子進(jìn)程結(jié)束') mpp.join() # 等待所有子進(jìn)程結(jié)束 print('程序結(jié)束') if __name__ == '__main__': demo()
4. 協(xié)程
4.1 協(xié)程和線程的區(qū)別
如前文所述,線程常用于多任務(wù)并行。對于可以切分的IO密集型任務(wù),將切分的每一小塊任務(wù)分配給一個(gè)線程,可以顯著提高處理速度。而協(xié)程,無論有多少個(gè),都被限定在一個(gè)線程內(nèi)執(zhí)行,因此,協(xié)程又被稱為微線程。
從宏觀上看,線程任務(wù)和協(xié)程任務(wù)都是并行的。從微觀上看,線程任務(wù)是分時(shí)切片輪流執(zhí)行的,這種切換是系統(tǒng)自動(dòng)完成的,無需程序員干預(yù);而協(xié)程則是根據(jù)任務(wù)特點(diǎn),在任務(wù)阻塞時(shí)將控制權(quán)交給其他協(xié)程,這個(gè)權(quán)力交接的時(shí)機(jī)和位置,由程序員指定。由此可以看出,參與協(xié)程管理的每一個(gè)任務(wù),必須存在阻塞的可能,且阻塞條件會(huì)被其它任務(wù)破壞,從而得以在阻塞解除后繼續(xù)執(zhí)行。
盡管協(xié)程難以駕馭,但是由于是在一個(gè)線程內(nèi)運(yùn)行,免除了線程或進(jìn)程的切換開銷,因而協(xié)程的運(yùn)行效率高,在特定場合下仍然被廣泛使用。
4.2 協(xié)程演進(jìn)史
Py2時(shí)代,Python并不支持協(xié)程,僅可通過yield實(shí)現(xiàn)部分的協(xié)程功能。另外,還可以通過gevent等第三方庫實(shí)現(xiàn)協(xié)程。gevent最好玩的,莫過于monkey_patch(猴子補(bǔ)?。?jīng)有一段時(shí)間,我特別喜歡使用它。
從Py3.4開始,Python內(nèi)置asyncio標(biāo)準(zhǔn)庫,正式原生支持協(xié)程。asyncio的異步操作,需要在協(xié)程中通過yield from完成,協(xié)程函數(shù)則需要使用@asyncio.coroutine裝飾器。
不理解生成器的同學(xué),很難駕馭yield這個(gè)反人類思維的東西,為了更貼近人類思維,Py3.5引入了新的語法async和await,可以讓協(xié)程的代碼稍微易懂一點(diǎn)點(diǎn)。如果此前沒有接觸過協(xié)程,我建議你只學(xué)習(xí)async和await的用法就足夠了,不需要去了解早期的yield和后來的yield from。本質(zhì)上,async就是@asyncio.coroutine,await就是yield from,換個(gè)馬甲,看起來就順眼多了。
4.3 協(xié)程應(yīng)用示例
作為基礎(chǔ)知識,在介紹協(xié)程應(yīng)用示例前,先來介紹一下隊(duì)列。在進(jìn)程、線程、協(xié)程模塊中,都有隊(duì)列(Queue)對象。隊(duì)列作為進(jìn)程、線程、協(xié)程間最常用的通信方式,有一個(gè)不容忽視的特性:阻塞式讀和寫。當(dāng)隊(duì)列為空時(shí),讀會(huì)被阻塞,直到讀出數(shù)據(jù);當(dāng)隊(duì)列滿時(shí),寫會(huì)被阻塞,直到隊(duì)列空出位置后寫入成功。因?yàn)殛?duì)列具有阻塞式讀寫的特點(diǎn),正好可以在協(xié)程中利用阻塞切換其他協(xié)程任務(wù)。
我們來構(gòu)思一個(gè)這樣的應(yīng)用:某個(gè)富豪(rich)手拿一沓鈔票,隨機(jī)取出幾張,撒在地上(如果地上已經(jīng)有鈔票的話,就等有人撿走了再撒);另有名為A、B、C的三個(gè)幸運(yùn)兒(lucky),緊盯著撒錢的富豪,只要富豪把錢撒到地上,他們立刻就去撿起來。
如果用協(xié)程實(shí)現(xiàn)上述功能的話,我們可以用長度為1的協(xié)程隊(duì)列來存放富豪每一次拋撒的錢。一旦隊(duì)列中有錢(隊(duì)列滿),富豪就不能繼續(xù)拋撒了,拋撒被阻塞,協(xié)程控制權(quán)轉(zhuǎn)移。三個(gè)幸運(yùn)兒中的某一個(gè)獲得控制權(quán),就去讀隊(duì)列(撿錢),如果隊(duì)列中沒有錢(隊(duì)列空),撿錢被阻塞,協(xié)程控制權(quán)轉(zhuǎn)移。依靠隊(duì)列的阻塞和解除阻塞,一個(gè)富豪和三個(gè)幸運(yùn)兒可以順利地分配完富豪手中的鈔票。為了讓這個(gè)過程可以慢到適合觀察,可以在富豪拋錢之前,再增加一個(gè)隨機(jī)延時(shí)。當(dāng)然,這個(gè)延時(shí)不能使用time模塊的sleep()函數(shù),而是使用協(xié)程模塊asyncio的sleep()函數(shù)。下面是完整的撒錢-撿錢代碼。
import asyncio, random async def rich(q, total): """任性的富豪,隨機(jī)撒錢""" while total > 0: money = random.randint(10,100) total -= money await q.put(money) # 隨機(jī)生成[10,100]之間的整數(shù) print('富豪瀟灑地拋了%d塊錢'%money) await asyncio.sleep(3*random.random()) # 在0-3秒之間隨機(jī)延時(shí) async def lucky(q, name): """隨時(shí)可以撿到錢的幸運(yùn)兒""" while True: money = await q.get() q.task_done() print('%s撿到了%d塊錢!'%(name, money)) async def run(): q = asyncio.Queue(1) producers = [asyncio.create_task(rich(q, 300))] consumers = [asyncio.create_task(lucky(q, name)) for name in 'ABC'] await asyncio.gather(*producers,) await q.join() for c in consumers: c.cancel() if __name__ == '__main__': asyncio.run(run())
運(yùn)行結(jié)果如下:
富豪瀟灑地拋了42塊錢
A撿到了42塊錢!
富豪瀟灑地拋了97塊錢
A撿到了97塊錢!
富豪瀟灑地拋了100塊錢
B撿到了100塊錢!
富豪瀟灑地拋了35塊錢
C撿到了35塊錢!
富豪瀟灑地拋了17塊錢
A撿到了17塊錢!
富豪拋完了手中的錢,轉(zhuǎn)身離去
Python 任務(wù)調(diào)度
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。