亞寵展、全球寵物產業風向標——亞洲寵物展覽會深度解析
998
2022-05-29
目錄
理解異步編程
構建同步 Web 服務器
對編程的不同思考
Programming Parents: Not as Easy as It Looks!
Thought Experiment #1: The Synchronous Parent
Thought Experiment #2: The Polling Parent
Thought Experiment #3: The Threading Parent
在實踐中使用 Python 異步功能
同步編程
簡單的協作并發
具有阻塞調用的協作并發
具有非阻塞調用的協作并發
同步(阻塞)HTTP 調用
異步(非阻塞)HTTP 調用
結論
你聽說過 Python 中的異步編程嗎?您是否想了解更多有關 Python 異步功能以及如何在工作中使用它們的信息?也許您甚至嘗試過編寫線程程序并遇到一些問題。如果您想了解如何使用 Python 異步功能,那么您來對地方了。
在本文中,您將了解:
什么是同步程序是
什么的異步程序是
為什么要編寫異步程序
如何使用 Python 異步功能
理解異步編程
甲同步程序被執行一次一個步驟。即使有條件分支、循環和函數調用,您仍然可以從一次執行一個步驟的角度考慮代碼。每一步完成后,程序就會進入下一個步驟。
以下是兩個以這種方式工作的程序示例:
批處理程序通常創建為同步程序。您獲得一些輸入,對其進行處理并創建一些輸出。步驟一個接一個,直到程序達到所需的輸出。程序只需要注意步驟和順序。
命令行程序是在終端中運行的小型快速進程。這些腳本用于創建某些內容、將一件事轉換為另一件事、生成報告或列出一些數據。這可以表示為一系列程序步驟,這些步驟按順序執行,直到程序完成。
一個異步程序的行為不同。它仍然一次執行一個步驟。不同之處在于,系統可能不會等待執行步驟完成后再繼續執行下一個步驟。
這意味著即使上一步尚未完成并且仍在其他地方運行,程序仍將繼續執行未來的執行步驟。這也意味著程序知道在上一步完成運行時該做什么。
為什么要以這種方式編寫程序?本文的其余部分將幫助您回答這個問題,并為您提供優雅地解決有趣的異步問題所需的工具。
構建同步 Web 服務器
Web 服務器的基本工作單元或多或少與批處理相同。服務器將獲取一些輸入、處理它并創建輸出。編寫為同步程序,這將創建一個可工作的 Web 服務器。
它也將是一個絕對糟糕的網絡服務器。
為什么?在這種情況下,一個工作單元(輸入、處理、輸出)并不是唯一的目的。真正的目的是盡快處理數百甚至數千個工作單元。這可能會持續很長時間,甚至可能同時到達多個工作單元。
同步網絡服務器可以做得更好嗎?當然,您可以優化執行步驟,以便盡快處理所有進入的工作。不幸的是,這種方法存在局限性。結果可能是 Web 服務器響應速度不夠快,無法處理足夠的工作,甚至在工作堆積時超時。
注意:如果您嘗試優化上述方法,您可能會看到其他限制。其中包括網絡速度、文件 IO 速度、數據庫查詢速度以及其他連接服務的速度,僅舉幾例。這些都有一個共同點,都是IO函數。所有這些項目都比 CPU 的處理速度慢幾個數量級。
在同步程序中,如果一個執行步驟啟動了一個數據庫查詢,那么 CPU 基本上是空閑的,直到數據庫查詢返回。對于面向批處理的程序,大多數時候這不是優先事項。處理該 IO 操作的結果是目標。通常,這可能比 IO 操作本身花費更長的時間。任何優化工作都將集中在處理工作上,而不是 IO。
異步編程技術允許您的程序通過釋放 CPU 來做其他工作來利用相對較慢的 IO 進程。
對編程的不同思考
當您開始嘗試理解異步編程時,您可能會看到很多關于阻塞或編寫非阻塞代碼重要性的討論。(就我個人而言,我很難從我詢問的人和我閱讀的文檔中很好地掌握這些概念。)
什么是非阻塞代碼?就此而言,什么是阻塞代碼?這些問題的答案會幫助您編寫更好的 Web 服務器嗎?如果是這樣,你怎么能做到?讓我們一探究竟吧!
編寫異步程序要求您對編程有不同的看法。雖然這種新的思維方式可能很難讓你理解,但它也是一個有趣的練習。那是因為現實世界幾乎完全是異步的,您與之交互的方式也是如此。
想象一下:你是一位試圖同時做幾件事的父母。你必須平衡支票簿,洗衣服,并留意孩子們。不知何故,您甚至可以不假思索地同時完成所有這些事情!讓我們分解一下:
平衡支票簿是一項同步任務。一步一個腳印,直到完成。你自己做所有的工作。
但是,您可以脫離支票簿去洗衣服。您卸下干衣機,將衣服從洗衣機移到干衣機,然后在洗衣機中開始另一次裝載。
使用洗衣機和烘干機是一項同步任務,但大部分工作發生在洗衣機和烘干機啟動之后。一旦你讓他們開始,你就可以走開并回到支票簿任務。此時,洗衣機和烘干機的任務已經變得異步了。洗衣機和烘干機將獨立運行,直到蜂鳴器響起(通知您任務需要注意)。
照看孩子是另一項異步任務。一旦他們設置好并開始演奏,他們就可以在大多數情況下獨立進行。當有人需要關注時,這種情況就會發生變化,比如有人餓了或受傷了。當您的一個孩子驚慌失措時,您會做出反應。孩子們是一項具有高優先級的長期任務。觀看它們會取代您可能正在做的任何其他任務,例如支票簿或洗衣。
這些示例有助于說明阻塞和非阻塞代碼的概念。讓我們從編程的角度考慮這個問題。在這個例子中,你就像 CPU。當你搬動衣服時,你(CPU)很忙,無法做其他工作,比如平衡支票簿。但這沒關系,因為任務相對較快。
另一方面,啟動洗衣機和烘干機不會妨礙您執行其他任務。這是一個異步函數,因為您不必等待它完成。一旦開始,你就可以回到其他事情上。這稱為上下文切換:您正在做的事情的上下文發生了變化,當洗衣任務完成時,機器的蜂鳴器將在未來某個時間通知您。
作為一個人,這就是你一直工作的方式。您自然而然地同時處理多種事情,通常不假思索。作為開發人員,訣竅是如何將這種行為轉換為執行相同操作的代碼。
Programming Parents: Not as Easy as It Looks!
如果您在上面的示例中認出了自己(或您的父母),那就太好了!您在理解異步編程方面取得了長足的進步。同樣,您可以很容易地在競爭任務之間切換上下文,選擇一些任務并恢復其他任務。現在您將嘗試將此行為編程為虛擬父母!
思想實驗#1:The Synchronous Parent
您將如何創建一個父程序以完全同步的方式完成上述任務?由于看孩子是一項高優先級的任務,也許您的程序會做到這一點。父母一邊看著孩子,一邊等待可能需要他們注意的事情發生。但是,在這種情況下,沒有其他事情(例如支票簿或洗衣店)可以完成。
現在,您可以按照自己的意愿重新排列任務的優先級,但在任何給定時間都只會發生其中一項。這是同步、循序漸進的方法的結果。就像上面描述的同步 Web 服務器一樣,這可以工作,但它可能不是最好的生活方式。在孩子們睡著之前,父母將無法完成任何其他任務。所有其他任務都會在之后進行,一直持續到深夜。(幾周后,許多真正的父母可能會跳出窗外!)
思想實驗#2:The Polling Parent
如果您使用polling,那么您可以更改內容以便完成多個任務。在這種方法中,父級會定期脫離當前任務并檢查是否還有其他任務需要注意。
讓我們將輪詢間隔設為十五分鐘。現在,您的父母每十五分鐘檢查一次洗衣機、烘干機或孩子是否需要任何關注。如果沒有,那么父母可以回去處理支票簿。但是,如果這些任務中的任何一個確實需要注意,那么父母會在回到支票簿之前處理它。這個循環一直持續到輪詢循環的下一次超時。
這種方法也很有效,因為多項任務受到關注。但是,有幾個問題:
家長可能會花很多時間檢查不需要注意的東西:洗衣機和烘干機還沒有完成,孩子們不需要任何注意,除非發生了意外。
父級可能會錯過需要注意的已完成任務:例如,如果洗衣機在輪詢間隔開始時完成了它的循環,那么它在長達 15 分鐘內都不會得到任何關注!更重要的是,照看孩子被認為是最優先的任務。當事情可能發生嚴重錯誤時,他們無法忍受十五分鐘的無人注意。
您可以通過縮短輪詢間隔來解決這些問題,但現在您的父級(CPU)將花費更多時間在任務之間進行上下文切換。這是您開始達到收益遞減點的時候。(再一次,像這樣生活了幾個星期,嗯……見之前關于窗戶和跳躍的評論。)
思想實驗#3:The Threading Parent
“要是我能克隆自己就好了……” 如果你是父母,那你可能也有類似的想法!由于您正在對虛擬父母進行編程,因此您基本上可以通過使用線程來做到這一點。這是一種允許一個程序的多個部分同時運行的機制。獨立運行的每一段代碼稱為一個線程,所有線程共享相同的內存空間。
如果您將每個任務視為一個程序的一部分,那么您可以將它們分開并作為線程運行。換句話說,您可以“克隆”父對象,為每項任務創建一個實例:看孩子、監視洗衣機、監視烘干機和平衡支票簿。所有這些“克隆”都是獨立運行的。
這聽起來是一個不錯的解決方案,但這里也存在一些問題。一是您必須明確告訴每個父實例在您的程序中做什么。這可能會導致一些問題,因為所有實例共享程序空間中的所有內容。
例如,假設家長 A 正在監控烘干機。家長 A 看到衣服已經干了,便控制了烘干機并開始卸下衣服。同時,B家長看到洗衣機洗好了,便控制了洗衣機,開始脫衣服。但是,家長 B 還需要控制烘干機,以便他們可以將濕衣服放入里面。這不可能發生,因為家長 A 目前控制著烘干機。
不一會,A家長就卸完衣服了。現在他們想要控制洗衣機并開始將衣服放入空的烘干機中。這也不可能發生,因為家長 B 目前控制著洗衣機!
這兩個父母現在僵持不下。兩者都控制自己的資源并希望控制其他資源。他們將永遠等待另一個父實例釋放控制權。作為程序員,您必須編寫代碼來解決這種情況。
注意:線程程序允許您創建多個共享相同內存空間的并行執行路徑。這既是優點也是缺點。線程之間共享的任何內存都受制于一個或多個線程試圖同時使用相同的共享內存。這可能會導致數據損壞、以無效狀態讀取的數據以及通常只是雜亂無章的數據。
在線程編程中,上下文切換發生在系統控制下,而不是程序員。系統控制何時切換上下文以及何時讓線程訪問共享數據,從而更改內存使用方式的上下文。所有這些類型的問題都可以在線程代碼中管理,但是很難正確處理,并且在錯誤時很難調試。
這是線程可能引起的另一個問題。假設一個孩子受傷了,需要緊急治療。家長C被分配了看管孩子的任務,所以他們馬上帶著孩子。在緊急護理中,家長 C 需要開一張相當大的支票來支付看病的費用。
與此同時,家長 D 正在家里處理支票簿。他們不知道有這么大的支票在寫,所以當家庭支票賬戶突然透支時,他們非常驚訝!
請記住,這兩個父實例在同一個程序中工作。家庭支票帳戶是共享資源,因此您必須想辦法讓看孩子的父母通知支票簿余額的父母。否則,您需要提供某種鎖定機制,以便支票簿資源一次只能由一個父母使用,并進行更新。
在實踐中使用 Python 異步功能
現在,您將采用上述思想實驗中概述的一些方法,并將它們轉換為可運行的 Python 程序。
您可能還想設置一個Python 虛擬環境來運行代碼,這樣您就不會干擾您的系統 Python。
同步編程
第一個示例顯示了一種讓任務從隊列中檢索工作并處理該工作的有點人為的方法。Python 中的隊列是一個很好的FIFO(先進先出)數據結構。它提供了將事物放入隊列并按照插入順序再次取出它們的方法。
在這種情況下,工作是從隊列中獲取一個數字,并讓循環計數達到該數字。它在循環開始時打印到控制臺,并再次輸出總數。該程序演示了多個同步任務處理隊列中工作的一種方法。
example_1.py下面完整列出了存儲庫中命名的程序:
1import queue 2 3def task(name, work_queue): 4 if work_queue.empty(): 5 print(f"Task {name} nothing to do") 6 else: 7 while not work_queue.empty(): 8 count = work_queue.get() 9 total = 0 10 print(f"Task {name} running") 11 for x in range(count): 12 total += 1 13 print(f"Task {name} total: {total}") 14 15def main(): 16 """ 17 This is the main entry point for the program 18 """ 19 # Create the queue of work 20 work_queue = queue.Queue() 21 22 # Put some work in the queue 23 for work in [15, 10, 5, 2]: 24 work_queue.put(work) 25 26 # Create some synchronous tasks 27 tasks = [(task, "One", work_queue), (task, "Two", work_queue)] 28 29 # Run the tasks 30 for t, n, q in tasks: 31 t(n, q) 32 33if __name__ == "__main__": 34 main()
讓我們來看看每一行的作用:
第 1 行導入queue模塊。這是程序存儲任務要完成的工作的地方。
第 3 到 13 行定義了task().?此功能將工作拉出work_queue并處理工作,直到沒有更多工作要做。
第 15 行定義main()運行程序任務。
第 20 行創建了work_queue.?所有任務都使用此共享資源來檢索工作。
第 23 到 24行將工作放入work_queue.?在這種情況下,它只是要處理的任務的隨機值計數。
第 27 行創建了一個任務元組列表,其中包含這些任務將被傳遞的參數值。
第 30 到 31 行迭代任務元組列表,調用每個元組并傳遞先前定義的參數值。
第 34 行調用main()運行程序。
這個程序中的任務只是一個接受字符串和隊列作為參數的函數。執行時,它會查找隊列中的任何內容以進行處理。如果有工作要做,那么它將值從隊列中拉出,開始一個for循環以計算到該值,并在最后輸出總數。它繼續從隊列中取出工作,直到沒有任何剩余并退出。
當這個程序運行時,它會產生你在下面看到的輸出:
Task One running Task One total: 15 Task One running Task One total: 10 Task One running Task One total: 5 Task One running Task One total: 2 Task Two nothing to do
這表明它可以Task One完成所有工作。該while循環是Task One內命中task()消耗所有的隊列和處理它的工作。當該循環退出時,Task Two有機會運行。但是,它發現隊列是空的,所以Task Two打印了一條語句,說它沒有任何事情可做,然后退出。沒有什么代碼同時允許Task One和Task Two切換背景和工作在一起。
簡單的協作并發
該程序的下一個版本允許這兩個任務一起工作。添加yield語句意味著循環將在指定點產生控制權,同時仍保持其上下文。這樣,yield 任務可以稍后重新啟動。
該yield語句變成task()了一個生成器。生成器函數就像 Python 中的任何其他函數一樣被調用,但是當yield執行語句時,控制權會返回給函數的調用者。這本質上是一個上下文切換,因為控制從生成器函數轉移到調用者。
有趣的是,可以通過調用生成器將控制權交還給生成器函數next()。這是一個返回到生成器函數的上下文切換,它使用在仍然完整的之前定義的所有函數變量來執行yield。
該while環路main()利用了這一點時,它調用next(t)。此語句會在任務之前產生的位置重新啟動任務。所有這一切意味著您在上下文切換發生時處于控制之中:當yield語句在task().
這是協作多任務處理的一種形式。該程序正在放棄對其當前上下文的控制,以便其他內容可以運行。在這種情況下,它允許while循環main()運行task()作為生成器函數的兩個實例。每個實例消耗來自同一個隊列的工作。這有點聰明,但要獲得與第一個程序相同的結果也需要做很多工作。該程序example_2.py演示了這種簡單的并發性,如下所示:
1import queue 2 3def task(name, queue): 4 while not queue.empty(): 5 count = queue.get() 6 total = 0 7 print(f"Task {name} running") 8 for x in range(count): 9 total += 1 10 yield 11 print(f"Task {name} total: {total}") 12 13def main(): 14 """ 15 This is the main entry point for the program 16 """ 17 # Create the queue of work 18 work_queue = queue.Queue() 19 20 # Put some work in the queue 21 for work in [15, 10, 5, 2]: 22 work_queue.put(work) 23 24 # Create some tasks 25 tasks = [task("One", work_queue), task("Two", work_queue)] 26 27 # Run the tasks 28 done = False 29 while not done: 30 for t in tasks: 31 try: 32 next(t) 33 except StopIteration: 34 tasks.remove(t) 35 if len(tasks) == 0: 36 done = True 37 38if __name__ == "__main__": 39 main()
這是上面代碼中發生的事情:
第 3 行到第 11 行定義task()同前,但第yield10 行的添加將函數變成了生成器。這是進行上下文切換并將控制權交還給 中的while循環的地方main()。
第 25 行創建了任務列表,但與您在前面的示例代碼中看到的方式略有不同。在這種情況下,每個任務都使用其在tasks列表變量中輸入的參數進行調用。這是task()第一次運行生成器函數所必需的。
第 31 到 36 行是對while循環的修改main(),允許task()協同運行。這是控制返回到每個實例的地方,task()當它讓步時,允許循環繼續并運行另一個任務。
第 32行將控制權交還給task(),并在yield調用點之后繼續執行。
第 36 行設置done變量。while當所有任務都完成并從 中刪除時,循環結束tasks。
這是運行此程序時產生的輸出:
Task One running Task Two running Task Two total: 10 Task Two running Task One total: 15 Task One running Task Two total: 5 Task One total: 2
您可以看到Task One和Task Two都在運行并消耗隊列中的工作。這就是預期的目的,因為兩個任務都在處理工作,每個任務負責隊列中的兩個項目。這很有趣,但同樣,要實現這些結果需要做很多工作。
這里的技巧是使用yield語句,它變成task()一個生成器并執行上下文切換。程序使用這個上下文切換來控制 中的while循環main(),允許一個任務的兩個實例協同運行。
請注意如何Task Two首先輸出其總數。這可能會讓您認為任務是異步運行的。但是,這仍然是一個同步程序。它的結構使這兩個任務可以來回交換上下文。首先Task Two輸出它的總數的原因是它只計數到 10,而Task One計數到 15。Task Two只是首先到達它的總數,所以它可以在 之前將其輸出打印到控制臺Task One。
注意:此后的所有示例代碼都使用一個名為codetiming的模塊來計時并輸出代碼段執行所需的時間。有一個偉大的文章,這里就RealPython是進入有關codetiming模塊以及如何使用它的深度。
該模塊是 Python Package Index 的一部分,由Real Python團隊成員Geir Arne Hjelle構建。Geir Arne 對我審閱和建議本文的內容有很大幫助。如果您正在編寫需要包含計時功能的代碼,Geir Arne 的 codetiming 模塊非常值得一看。
要使代碼定時模塊可用于后面的示例,您需要安裝它。這可以pip使用以下命令完成:pip install codetiming,或使用以下命令:pip install -r requirements.txt。該requirements.txt文件是示例代碼存儲庫的一部分。
具有阻塞調用的協作并發
程序的下一個版本與上一個版本相同,只是time.sleep(delay)在任務循環的主體中添加了 a?。這會根據從工作隊列中檢索到的值向任務循環的每次迭代添加延遲。延遲模擬任務中發生的阻塞調用的效果。
阻塞調用是在一段時間內阻止 CPU 執行任何其他操作的代碼。在上面的思想實驗中,如果父母在支票簿完成之前無法擺脫平衡,那將是一個阻塞調用。
time.sleep(delay)?在這個例子中做同樣的事情,因為 CPU 不能做任何其他事情,只能等待延遲到期。
1import time 2import queue 3from codetiming import Timer 4 5def task(name, queue): 6 timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}") 7 while not queue.empty(): 8 delay = queue.get() 9 print(f"Task {name} running") 10 timer.start() 11 time.sleep(delay) 12 timer.stop() 13 yield 14 15def main(): 16 """ 17 This is the main entry point for the program 18 """ 19 # Create the queue of work 20 work_queue = queue.Queue() 21 22 # Put some work in the queue 23 for work in [15, 10, 5, 2]: 24 work_queue.put(work) 25 26 tasks = [task("One", work_queue), task("Two", work_queue)] 27 28 # Run the tasks 29 done = False 30 with Timer(text="\nTotal elapsed time: {:.1f}"): 31 while not done: 32 for t in tasks: 33 try: 34 next(t) 35 except StopIteration: 36 tasks.remove(t) 37 if len(tasks) == 0: 38 done = True 39 40if __name__ == "__main__": 41 main()
下面是上面代碼的不同之處:
第 1 行導入time模塊以允許程序訪問time.sleep().
第 3 行Timer從codetiming模塊中導入代碼。
第 6 行創建了Timer用于測量任務循環每次迭代所用時間的實例。
第 10 行啟動timer實例
第 11 行更改task()為包含time.sleep(delay)模擬 IO 延遲的 a。這取代了for在 中進行計數的循環example_1.py。
第 12 行停止timer實例并輸出自timer.start()調用以來經過的時間。
第 30 行創建了一個Timer上下文管理器,它將輸出整個 while 循環執行所花費的時間。
運行此程序時,您將看到以下輸出:
Task One running Task One elapsed time: 15.0 Task Two running Task Two elapsed time: 10.0 Task One running Task One elapsed time: 5.0 Task Two running Task Two elapsed time: 2.0 Total elapsed time: 32.0
和以前一樣,Task One和Task Two都在運行,消耗隊列中的工作并處理它。然而,即使增加了延遲,您也可以看到協作并發并沒有給您帶來任何好處。延遲會停止整個程序的處理,CPU只是等待IO延遲結束。
這正是 Python 異步文檔中阻塞代碼的含義。您會注意到運行整個程序所花費的時間只是所有延遲的累積時間。以這種方式運行任務并不是一種勝利。
具有非阻塞調用的協作并發
該程序的下一個版本已進行了相當多的修改。它使用Python 3 中提供的asyncio/await來使用 Python 異步功能。
的time和queue模塊已被替換的asyncio包。這使您的程序可以訪問異步友好(非阻塞)睡眠和隊列功能。通過在第 4 行task()添加async前綴將其定義為異步。這向 Python 表明該函數將是異步的。
另一個重大變化是刪除time.sleep(delay)andyield語句,并將它們替換為await asyncio.sleep(delay).?這會創建一個非阻塞延遲,它將執行上下文切換回調用者main()。
while里面的循環main()不再存在。取而代之的是task_array對 的調用await asyncio.gather(...)。這說明了asyncio兩件事:
創建兩個任務task()并開始運行它們。
等待這兩個完成后再繼續。
程序的最后一行asyncio.run(main())運行main()。這會創建所謂的事件循環)。正是這個循環將運行main(),它將依次運行 的兩個實例task()。
事件循環是 Python 異步系統的核心。它運行所有代碼,包括main().?當任務代碼正在執行時,CPU 正忙于工作。當到達await關鍵字時,發生上下文切換,并且控制權返回到事件循環。事件循環查看等待事件(在本例中為asyncio.sleep(delay)超時)的所有任務,并將控制權傳遞給事件準備就緒的任務。
await asyncio.sleep(delay)就 CPU 而言是非阻塞的。CPU 不是等待延遲超時,而是在事件循環任務隊列上注冊睡眠事件,并通過將控制權交給事件循環來執行上下文切換。事件循環不斷尋找已完成的事件,并將控制權交還給等待該事件的任務。通過這種方式,CPU 可以在有工作的情況下保持忙碌,而事件循環會監視將來會發生的事件。
注意:異步程序在單個執行線程中運行。從一段代碼到另一段會影響數據的上下文切換完全在您的控制之下。這意味著您可以在進行上下文切換之前將所有共享內存數據訪問原子化并完成。這簡化了線程代碼中固有的共享內存問題。
該example_4.py代碼是下面列出:
1import asyncio 2from codetiming import Timer 3 4async def task(name, work_queue): 5 timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}") 6 while not work_queue.empty(): 7 delay = await work_queue.get() 8 print(f"Task {name} running") 9 timer.start() 10 await asyncio.sleep(delay) 11 timer.stop() 12 13async def main(): 14 """ 15 This is the main entry point for the program 16 """ 17 # Create the queue of work 18 work_queue = asyncio.Queue() 19 20 # Put some work in the queue 21 for work in [15, 10, 5, 2]: 22 await work_queue.put(work) 23 24 # Run the tasks 25 with Timer(text="\nTotal elapsed time: {:.1f}"): 26 await asyncio.gather( 27 asyncio.create_task(task("One", work_queue)), 28 asyncio.create_task(task("Two", work_queue)), 29 ) 30 31if __name__ == "__main__": 32 asyncio.run(main())
以下是該程序與 之間的不同之處example_3.py:
第 1 行導入asyncio以訪問 Python 異步功能。這將取代time導入。
第 2 行Timer從codetiming模塊導入代碼。
第 4 行顯示了async在task()定義前添加的關鍵字。這通知task可以異步運行的程序。
第 5 行創建了Timer用于測量任務循環每次迭代所用時間的實例。
第 9 行啟動timer實例
第 10 行替換time.sleep(delay)為 non-blocking?asyncio.sleep(delay),這也將控制(或切換上下文)返回到主事件循環。
第 11 行停止timer實例并輸出自timer.start()調用以來經過的時間。
第 18 行創建了非阻塞異步work_queue.
第 21 到 22 行work_queue使用await關鍵字以異步方式將工作放入。
第 25 行創建了一個Timer上下文管理器,它將輸出整個 while 循環執行所花費的時間。
第 26 到 29 行創建了兩個任務并將它們聚集在一起,因此程序將等待兩個任務完成。
第 32 行啟動異步運行的程序。它還啟動內部事件循環。
當你看這個程序的輸出,通知如何都Task One與Task Two在同一時間啟動,然后等待在模擬IO電話:
Task One running Task Two running Task Two total elapsed time: 10.0 Task Two running Task One total elapsed time: 15.0 Task One running Task Two total elapsed time: 5.0 Task One total elapsed time: 2.0 Total elapsed time: 17.0
這表明它await asyncio.sleep(delay)是非阻塞的,并且正在完成其他工作。
在程序結束時,您會注意到總example_3.py運行時間實際上是運行時間的一半。這就是使用 Python 異步功能的程序的優勢!每個任務都能夠同時運行await asyncio.sleep(delay)。程序的總執行時間現在少于其各部分的總和。你已經脫離了同步模型!
同步(阻塞)HTTP 調用
該程序的下一個版本既是一種進步,也是一種退步。該程序通過向 URL 列表發出 HTTP 請求并獲取頁面內容,對真實 IO 進行了一些實際工作。但是,它是以阻塞(同步)方式進行的。
程序已修改為導入精彩requests模塊以進行實際的 HTTP 請求。此外,隊列現在包含一個 URL 列表,而不是數字。此外,task()不再增加計數器。相反,requests獲取從隊列中檢索到的 URL 的內容,并打印執行此操作所需的時間。
該example_5.py代碼是下面列出:
1import queue 2import requests 3from codetiming import Timer 4 5def task(name, work_queue): 6 timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}") 7 with requests.Session() as session: 8 while not work_queue.empty(): 9 url = work_queue.get() 10 print(f"Task {name} getting URL: {url}") 11 timer.start() 12 session.get(url) 13 timer.stop() 14 yield 15 16def main(): 17 """ 18 This is the main entry point for the program 19 """ 20 # Create the queue of work 21 work_queue = queue.Queue() 22 23 # Put some work in the queue 24 for url in [ 25 "http://google.com", 26 "http://yahoo.com", 27 "http://linkedin.com", 28 "http://apple.com", 29 "http://microsoft.com", 30 "http://facebook.com", 31 "http://twitter.com", 32 ]: 33 work_queue.put(url) 34 35 tasks = [task("One", work_queue), task("Two", work_queue)] 36 37 # Run the tasks 38 done = False 39 with Timer(text="\nTotal elapsed time: {:.1f}"): 40 while not done: 41 for t in tasks: 42 try: 43 next(t) 44 except StopIteration: 45 tasks.remove(t) 46 if len(tasks) == 0: 47 done = True 48 49if __name__ == "__main__": 50 main()
下面是這個程序中發生的事情:
第 2 行導入requests,它提供了一種進行 HTTP 調用的便捷方式。
第 3 行Timer從codetiming模塊中導入代碼。
第 6 行創建了Timer用于測量任務循環每次迭代所用時間的實例。
第 11 行啟動timer實例
第 12 行引入了一個延遲,類似于example_3.py。但是,這次它調用session.get(url),它返回從 檢索到的 URL 的內容work_queue。
第 13 行停止timer實例并輸出自timer.start()調用以來經過的時間。
第 23 到 32行將 URL 列表放入work_queue.
第 39 行創建了一個Timer上下文管理器,它將輸出整個 while 循環執行所花費的時間。
運行此程序時,您將看到以下輸出:
Task One getting URL: http://google.com Task One total elapsed time: 0.3 Task Two getting URL: http://yahoo.com Task Two total elapsed time: 0.8 Task One getting URL: http://linkedin.com Task One total elapsed time: 0.4 Task Two getting URL: http://apple.com Task Two total elapsed time: 0.3 Task One getting URL: http://microsoft.com Task One total elapsed time: 0.5 Task Two getting URL: http://facebook.com Task Two total elapsed time: 0.5 Task One getting URL: http://twitter.com Task One total elapsed time: 0.4 Total elapsed time: 3.2
就像在程序的早期版本中一樣,yield變成task()了一個生成器。它還執行上下文切換,讓其他任務實例運行。
每個任務從工作隊列中獲取一個 URL,檢索頁面的內容,并報告獲取該內容所花費的時間。
和以前一樣,yield允許您的兩個任務協同運行。但是,由于該程序是同步運行的,因此每次session.get()調用都會阻塞 CPU,直到檢索到頁面為止。請注意最后運行整個程序所花費的總時間。這對于下一個示例將是有意義的。
異步(非阻塞)HTTP 調用
此版本的程序修改了前一個版本以使用 Python 異步功能。它還導入aiohttp模塊,該模塊是一個使用asyncio.
yield由于進行 HTTPGET調用的代碼不再阻塞,這里的任務已被修改以移除調用。它還執行上下文切換回事件循環。
該example_6.py程序如下:
1import asyncio 2import aiohttp 3from codetiming import Timer 4 5async def task(name, work_queue): 6 timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}") 7 async with aiohttp.ClientSession() as session: 8 while not work_queue.empty(): 9 url = await work_queue.get() 10 print(f"Task {name} getting URL: {url}") 11 timer.start() 12 async with session.get(url) as response: 13 await response.text() 14 timer.stop() 15 16async def main(): 17 """ 18 This is the main entry point for the program 19 """ 20 # Create the queue of work 21 work_queue = asyncio.Queue() 22 23 # Put some work in the queue 24 for url in [ 25 "http://google.com", 26 "http://yahoo.com", 27 "http://linkedin.com", 28 "http://apple.com", 29 "http://microsoft.com", 30 "http://facebook.com", 31 "http://twitter.com", 32 ]: 33 await work_queue.put(url) 34 35 # Run the tasks 36 with Timer(text="\nTotal elapsed time: {:.1f}"): 37 await asyncio.gather( 38 asyncio.create_task(task("One", work_queue)), 39 asyncio.create_task(task("Two", work_queue)), 40 ) 41 42if __name__ == "__main__": 43 asyncio.run(main())
下面是這個程序中發生的事情:
第 2 行導入aiohttp庫,它提供了一種異步方式來進行 HTTP 調用。
第 3 行Timer從codetiming模塊中導入代碼。
第 5 行標記task()為異步函數。
第 6 行創建了Timer用于測量任務循環每次迭代所用時間的實例。
第 7 行創建了一個aiohttp會話上下文管理器。
第 8 行創建了一個aiohttp響應上下文管理器。它還GET對從work_queue.
第 11 行啟動timer實例
第 12 行使用會話異步獲取從 URL 檢索的文本。
第 13 行停止timer實例并輸出自timer.start()調用以來經過的時間。
第 39 行創建了一個Timer上下文管理器,它將輸出整個 while 循環執行所花費的時間。
運行此程序時,您將看到以下輸出:
Task One getting URL: http://google.com Task Two getting URL: http://yahoo.com Task One total elapsed time: 0.3 Task One getting URL: http://linkedin.com Task One total elapsed time: 0.3 Task One getting URL: http://apple.com Task One total elapsed time: 0.3 Task One getting URL: http://microsoft.com Task Two total elapsed time: 0.9 Task Two getting URL: http://facebook.com Task Two total elapsed time: 0.4 Task Two getting URL: http://twitter.com Task One total elapsed time: 0.5 Task Two total elapsed time: 0.3 Total elapsed time: 1.7
查看總經過時間,以及獲取每個 URL 內容的時間。您會看到持續時間大約是所有 HTTPGET調用的累計時間的一半。這是因為 HTTPGET調用是異步運行的。換句話說,通過允許 CPU 一次發出多個請求,您可以有效地更好地利用 CPU。
由于 CPU 速度如此之快,本示例可能會創建與 URL 一樣多的任務。在這種情況下,程序的運行時間將是單個最慢的 URL 檢索的運行時間。
結論
本文為您提供了開始使異步編程技術成為您的技能的一部分所需的工具。使用 Python 異步功能可讓您以編程方式控制何時發生上下文切換。這意味著您在線程編程中可能會看到的許多更棘手的問題更容易處理。
異步編程是一種強大的工具,但并不是對每種程序都有用。例如,如果您正在編寫一個將 pi 計算到小數點后百萬位的程序,那么異步代碼將無濟于事。那種程序受 CPU 限制,沒有太多 IO。但是,如果您正在嘗試實現執行 IO(如文件或網絡訪問)的服務器或程序,那么使用 Python 異步功能可能會產生巨大的差異。
總結一下,你已經學會了:
什么同步程序是
如何異步程序是不同的,而且功能強大且易于管理
為什么你可能想要編寫異步程序
如何使用 Python 內置的異步功能
Python 任務調度
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。