Python 進階 — 協程
目錄
文章目錄
目錄
協程
協程的應用場景
搶占式調度的缺點
用戶態協同調度的優勢
協程的運行原理
Python 中的協程
參考文檔
協程
協程(co-routine,又稱微線程)是一種多方協同的工作方式。當前執行者在某個時刻主動讓出(yield)控制流,并記住自身當前的狀態,以便在控制流返回時能從上次讓出的位置恢復(resume)執行。
簡而言之,協程的核心思想就在于執行者對控制流的 “主動讓出” 和 “恢復”。相對于,線程此類的 “搶占式調度” 而言,協程是一種 “協作式調度” 方式。
協程的應用場景
搶占式調度的缺點
在 I/O 密集型場景中,搶占式調度的解決方案是 “異步 + 回調” 機制。
其存在的問題是,在某些場景中會使得整個程序的可讀性非常差。以圖片下載為例,圖片服務中臺提供了異步接口,發起者請求之后立即返回,圖片服務此時給了發起者一個唯一標識 ID,等圖片服務完成下載后把結果放到一個消息隊列,此時需要發起者不斷消費這個 MQ 才能拿到下載是否完成的結果。
可見,整體的邏輯被拆分為了好幾個部分,各個子部分都會存在狀態的遷移,日后必然是 BUG 的高發地。
用戶態協同調度的優勢
而隨著網絡技術的發展和高并發要求,協程所能夠提供的用戶態協同調度機制的優勢,在網絡操作、文件操作、數據庫操作、消息隊列操作等重 I/O 操作場景中逐漸被挖掘。
協程將 I/O 的處理權從內核態的操作系統交還給用戶態的程序自身。用戶態程序在執行 I/O 時,主動的通過 yield(讓出)CPU 的執行權給其他協程,多個協程之間處于平等、對稱、合作的關系。
協程的運行原理
當程序運行時,操作系統會為每個程序分配一塊同等大小的虛擬內存空間,并將程序的代碼和所有靜態數據加載到其中。然后,創建和初始化 Stack 存儲,用于儲存程序的局部變量,函數參數和返回地址;創建和初始化 Heap 內存;創建和初始化 I/O 相關的任務。當前期準備工作完成后,操作系統將 CPU 的控制權移交給新創建的進程,進程開始運行。
一個進程可以有一個或多個線程,同一進程中的多個線程將共享該進程中的全部系統資源,如:虛擬地址空間,文件描述符和信號處理等等。但同一進程中的多個線程有各自的調用棧和線程本地存儲。
協程是一種比線程更加輕量級的存在,協程不是被操作系統內核所管理,而完全是由用戶態程序所控制。協程與線程以及進程的關系如下圖所示。可見,協程自身無法利用多核,需要配合進程來使用才可以在多核平臺上發揮作用。
協程之間的切換不需要涉及任何 System Call(系統調用)或任何阻塞調用。
協程只在一個線程中執行,切換由用戶態控制,而線程的阻塞狀態是由操作系統內核來完成的,因此協程相比線程節省線程創建和切換的開銷。
協程中不存在同時寫變量的沖突,因此,也就不需要用來守衛關鍵區塊的同步性原語,比如:互斥鎖、信號量等,并且不需要來自操作系統的支持。
協程通過 “掛起點” 來主動 yield(讓出)CPU,并保存自身的狀態,等候恢復。例如:首先在 funcA 函數中執行,運行一段時間后調用協程,協程開始執行,直到第一個掛起點,此后就像普通函數一樣返回 funcA 函數。 funcA 函數執行一些代碼后再次調用該協程,注意,協程這時就和普通函數不一樣了。協程并不是從第一條指令開始執行而是從上一次的掛起點開始執行,執行一段時間后遇到第二個掛起點,這時協程再次像普通函數一樣返回 funcA 函數,funcA 函數執行一段時間后整個程序結束。
可見,協程之所可以能夠 “主動讓出” 和 “被恢復”,是解析器在函數運行時堆棧中保存了其運行的 Context(上下文)。
Python 中的協程
Python 對協程的支持經歷了多個版本:
Python2.x 對協程的支持比較有限,通過 yield 關鍵字支持的生成器實現了一部分協程的功能但不完全。
第三方庫 gevent 對協程有更好的支持。
Python3.4 中提供了 asyncio 模塊。
Python3.5 中引入了 async/await 關鍵字。
Python3.6 中 asyncio 模塊更加完善和穩定。
Python3.7 中內置了 async/await 關鍵字。
async/await 的示例程序:
import asyncio from pathlib import Path import logging from urllib.request import urlopen, Request import os from time import time import aiohttp logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) CODEFLEX_IMAGES_URLS = ['https://codeflex.co/wp-content/uploads/2021/01/pandas-dataframe-python-1024x512.png', 'https://codeflex.co/wp-content/uploads/2021/02/github-actions-deployment-to-eks-with-kustomize-1024x536.jpg', 'https://codeflex.co/wp-content/uploads/2021/02/boto3-s3-multipart-upload-1024x536.jpg', 'https://codeflex.co/wp-content/uploads/2018/02/kafka-cluster-architecture.jpg', 'https://codeflex.co/wp-content/uploads/2016/09/redis-cluster-topology.png'] async def download_image_async(session, dir, img_url): download_path = dir / os.path.basename(img_url) async with session.get(img_url) as response: with download_path.open('wb') as f: while True: # 在 async 函數中使用 await 關鍵字表示等待 task 執行完成,也就是等待 yeild 讓出控制權。 # 同時,asyncio 使用事件循環 event_loop 來實現整個過程。 chunk = await response.content.read(512) if not chunk: break f.write(chunk) logger.info('Downloaded: ' + img_url) # 使用 async 關鍵字聲明一個異步/協程函數。 # 調用該函數時,并不會立即運行,而是返回一個協程對象,后續在 event_loop 中執行。 async def main(): images_dir = Path("codeflex_images") Path("codeflex_images").mkdir(parents=False, exist_ok=True) async with aiohttp.ClientSession() as session: tasks = [(download_image_async(session, images_dir, img_url)) for img_url in CODEFLEX_IMAGES_URLS] await asyncio.gather(*tasks, return_exceptions=True) if __name__ == '__main__': start = time() # event_loop 事件循環充當管理者的角色,將控制權在幾個協程函數之間切換。 event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main()) finally: event_loop.close() logger.info('Download time: %s seconds', time() - start)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
參考文檔
https://mp.weixin.qq.com/s/LItTjy2uN6iJvN2MqNPQ7Q
https://mp.weixin.qq.com/s/Yj7O3mucbFHxx3p5UnlvnA
Python 任務調度
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。