2022 年Python3 爬蟲教程 - 協(xié)程的基本原理

      網(wǎng)友投稿 890 2025-03-31

      我們知道爬蟲是 IO 密集型任務(wù),比如如果我們使用 requests 庫來爬取某個站點的話,發(fā)出一個請求之后,程序必須要等待網(wǎng)站返回響應(yīng)之后才能接著運行,而在等待響應(yīng)的過程中,整個爬蟲程序是一直在等待的,實際上沒有做任何事情。對于這種情況,我們有沒有優(yōu)化方案呢?

      當(dāng)然有,下面我們就來了解一下異步爬蟲的基本概念和實現(xiàn)。

      要實現(xiàn)異步機制的爬蟲,那自然和協(xié)程脫不了關(guān)系。

      1. 案例引入

      在介紹協(xié)程之前,我們先來看一個案例網(wǎng)站,鏈接地址為:https://httpbin.org/delay/5,如果我們訪問這個鏈接,需要等待五秒之后才能得到結(jié)果,這是因為服務(wù)器強制等待了 5 秒的時間才返回響應(yīng)。

      平時我們?yōu)g覽網(wǎng)頁的時候,絕大部分網(wǎng)頁響應(yīng)速度還是很快的,如果我們寫爬蟲來爬取的話,發(fā)出 Request 到收到 Response 的時間不會很長,因此我們需要等待的時間并不多。

      然而像上面這個網(wǎng)站,一次 Request 就需要 5 秒才能得到 Response,如果我們用 requests 寫爬蟲來爬取的話,那每次 requests 都要等待 5 秒才能拿到結(jié)果了。

      我們來測試下,下面我們來用 requests 寫一個遍歷程序,直接遍歷 100 次試試看,實現(xiàn)代碼如下:

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      import requests

      import logging

      import time

      logging.basicConfig(level=logging.INFO,

      format='%(asctime)s - %(levelname)s: %(message)s')

      TOTAL_NUMBER = 100

      URL = 'https://httpbin.org/delay/5'

      start_time = time.time()

      for _ in range(1, TOTAL_NUMBER + 1):

      logging.info('scraping %s', URL)

      response = requests.get(URL)

      end_time = time.time()

      logging.info('total time %s seconds', end_time - start_time)

      這里我們直接用循環(huán)的方式構(gòu)造了 100 個 Request,使用的是 requests 單線程,在爬取之前和爬取之后記錄了時間,最后輸出爬取了 100 個頁面消耗的時間。

      運行結(jié)果如下:

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      2020-08-03 01:01:36,781 - INFO: scraping https://httpbin.org/delay/5

      2020-08-03 01:01:43,410 - INFO: scraping https://httpbin.org/delay/5

      2020-08-03 01:01:50,029 - INFO: scraping https://httpbin.org/delay/5

      2020-08-03 01:01:56,702 - INFO: scraping https://httpbin.org/delay/5

      2020-08-03 01:02:03,345 - INFO: scraping https://httpbin.org/delay/5

      2020-08-03 01:02:09,958 - INFO: scraping https://httpbin.org/delay/5

      2020-08-03 01:02:16,500 - INFO: scraping https://httpbin.org/delay/5

      2020-08-03 01:02:23,143 - INFO: scraping https://httpbin.org/delay/5

      ...

      2020-08-03 01:12:19,867 - INFO: scraping https://httpbin.org/delay/5

      2020-08-03 01:12:26,479 - INFO: scraping https://httpbin.org/delay/5

      2020-08-03 01:12:33,083 - INFO: scraping https://httpbin.org/delay/5

      2020-08-03 01:12:39,758 - INFO: total time 662.9764430522919 seconds

      由于每個頁面至少要等待 5 秒才能加載出來,因此 100 個頁面至少要花費 500 秒的時間,加上網(wǎng)站本身負載的問題,總的爬取時間最終為 663 秒,大約 11 分鐘。

      這在實際情況下是很常見的,有些網(wǎng)站本身加載速度就比較慢,稍慢的可能 1~3 秒,更慢的說不定 10 秒以上。如果我們用 requests 單線程這么爬取的話,總的耗時是非常多的。此時如果我們開了多線程或多進程來爬取的話,其爬取速度確實會成倍提升,那是否有更好的解決方案呢?

      本節(jié)就來了解一下使用協(xié)程來加速的方法,此種方法對于 IO 密集型任務(wù)非常有效。如將其應(yīng)用到網(wǎng)絡(luò)爬蟲中,爬取效率甚至可以成百倍地提升。

      2. 基礎(chǔ)知識

      在了解協(xié)程之前,我們首先了解一些基礎(chǔ)概念,如阻塞和非阻塞、同步和異步、多進程和協(xié)程。

      阻塞

      阻塞狀態(tài)指程序未得到所需計算資源時被掛起的狀態(tài)。程序在等待某個操作完成期間,自身無法繼續(xù)干別的事情,則稱該程序在該操作上是阻塞的。

      常見的阻塞形式有:網(wǎng)絡(luò) I/O 阻塞、磁盤 I/O 阻塞、用戶輸入阻塞等。阻塞是無處不在的,包括 CPU 切換上下文時,所有的進程都無法真正干事情,它們也會被阻塞。如果是多核 CPU,則正在執(zhí)行上下文切換操作的核不可被利用。

      非阻塞

      程序在等待某操作的過程中,自身不被阻塞,可以繼續(xù)運行干別的事情,則稱該程序在該操作上是非阻塞的。

      非阻塞并不是在任何程序級別、任何情況下都存在的。僅當(dāng)程序封裝的級別可以囊括獨立的子程序單元時,它才可能存在非阻塞狀態(tài)。

      非阻塞的存在是因為阻塞存在,正因為某個操作阻塞導(dǎo)致的耗時與效率低下,我們才要把它變成非阻塞的。

      同步

      不同程序單元為了完成某個任務(wù),在執(zhí)行過程中需靠某種通信方式以協(xié)調(diào)一致,此時這些程序單元是同步執(zhí)行的。

      例如在購物系統(tǒng)中更新商品庫存時,需要用 “行鎖” 作為通信信號,讓不同的更新請求強制排隊順序執(zhí)行,那更新庫存的操作是同步的。

      簡言之,同步意味著有序。

      異步

      為了完成某個任務(wù),有時不同程序單元之間無須通信協(xié)調(diào)也能完成任務(wù),此時不相關(guān)的程序單元之間可以是異步的。

      例如,爬取下載網(wǎng)頁。調(diào)度程序調(diào)用下載程序后,即可調(diào)度其他任務(wù),而無須與該下載任務(wù)保持通信以協(xié)調(diào)行為。不同網(wǎng)頁的下載、保存等操作都是無關(guān)的,也無須相互通知協(xié)調(diào)。這些異步操作的完成時刻并不確定。

      簡言之,異步意味著無序。

      多進程

      多進程就是利用 CPU 的多核優(yōu)勢,在同一時間并行執(zhí)行多個任務(wù),可以大大提高執(zhí)行效率。

      協(xié)程

      協(xié)程,英文叫作 coroutine,又稱微線程、纖程,它是一種用戶態(tài)的輕量級線程。

      協(xié)程擁有自己的寄存器上下文和棧。協(xié)程調(diào)度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復(fù)先前保存的寄存器上下文和棧。因此,協(xié)程能保留上一次調(diào)用時的狀態(tài),即所有局部狀態(tài)的一個特定組合,每次過程重入時,就相當(dāng)于進入上一次調(diào)用的狀態(tài)。

      協(xié)程本質(zhì)上是個單進程,它相對于多進程來說,無須線程上下文切換的開銷,無須原子操作鎖定及同步的開銷,編程模型也非常簡單。

      我們可以使用協(xié)程來實現(xiàn)異步操作,比如在網(wǎng)絡(luò)爬蟲場景下,我們發(fā)出一個請求之后,需要等待一定時間才能得到響應(yīng),但其實在這個等待過程中,程序可以干許多其他事情,等到響應(yīng)得到之后才切換回來繼續(xù)處理,這樣可以充分利用 CPU 和其他資源,這就是協(xié)程的優(yōu)勢。

      3. 協(xié)程的用法

      接下來,讓我們來了解一下協(xié)程的實現(xiàn)。從 Python 3.4 開始,Python 中加入了協(xié)程的概念,但這個版本的協(xié)程還是以生成器對象為基礎(chǔ),Python 3.5 則增加了 async/await,使得協(xié)程的實現(xiàn)更加方便。

      Python 中使用協(xié)程最常用的庫莫過于 asyncio,所以本節(jié)會以 asyncio 為基礎(chǔ)來介紹協(xié)程的用法。

      首先,我們需要了解下面幾個概念:

      event_loop:事件循環(huán),相當(dāng)于一個無限循環(huán),我們可以把一些函數(shù)注冊到這個事件循環(huán)上,當(dāng)滿足條件發(fā)生的時候,就會調(diào)用對應(yīng)的處理方法。

      coroutine:中文翻譯叫協(xié)程,在 Python 中常指代協(xié)程對象類型,我們可以將協(xié)程對象注冊到時間循環(huán)中,它會被事件循環(huán)調(diào)用。我們可以使用 async 關(guān)鍵字來定義一個方法,這個方法在調(diào)用時不會立即被執(zhí)行,而是返回一個協(xié)程對象。

      task:任務(wù),它是對協(xié)程對象的進一步封裝,包含了任務(wù)的各個狀態(tài)。

      future:代表將來執(zhí)行或沒有執(zhí)行的任務(wù)的結(jié)果,實際上和 task 沒有本質(zhì)區(qū)別。

      另外,我們還需要了解 async/await 關(guān)鍵字,它是從 Python 3.5 才出現(xiàn)的,專門用于定義協(xié)程。其中,async 定義一個協(xié)程,await 用來掛起阻塞方法的執(zhí)行。

      4. 準備工作

      在本節(jié)開始之前,請確保安裝的 Python 版本為 3.5 及以上,如果版本是 3.4 及以下,則下方的案例是不能運行的。

      具體的安裝方法可以參考:https://setup.scrape.center/python。

      安裝好合適的 Python 版本之后我們就可以開始本節(jié)的學(xué)習(xí)了。

      5. 定義協(xié)程

      首先,我們來定義一個協(xié)程,體驗一下它和普通進程在實現(xiàn)上的不同之處,代碼如下:

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      import asyncio

      async def execute(x):

      print('Number:', x)

      coroutine = execute(1)

      print('Coroutine:', coroutine)

      print('After calling execute')

      loop = asyncio.get_event_loop()

      loop.run_until_complete(coroutine)

      print('After calling loop')

      運行結(jié)果如下:

      1

      2

      3

      4

      Coroutine:

      After calling execute

      Number: 1

      After calling loop

      首先,我們引入了 asyncio 這個包,這樣我們才可以使用 async 和 await,然后使用 async 定義了一個 execute 方法,該方法接收一個數(shù)字參數(shù),執(zhí)行之后會打印這個數(shù)字。

      隨后我們直接調(diào)用了這個方法,然而這個方法并沒有執(zhí)行,而是返回了一個 coroutine 協(xié)程對象。隨后我們使用 get_event_loop 方法創(chuàng)建了一個事件循環(huán) loop,并調(diào)用了 loop 對象的 run_until_complete 方法將協(xié)程注冊到事件循環(huán) loop 中,然后啟動。最后,我們才看到 execute 方法打印了輸出結(jié)果。

      可見,async 定義的方法就會變成一個無法直接執(zhí)行的 coroutine 對象,必須將其注冊到事件循環(huán)中才可以執(zhí)行。

      前面我們還提到了 task,它是對 coroutine 對象的進一步封裝,比 coroutine 對象多了運行狀態(tài),比如 running、finished 等,我們可以用這些狀態(tài)來獲取協(xié)程對象的執(zhí)行情況。

      在上面的例子中,當(dāng)我們將 coroutine 對象傳遞給 run_until_complete 方法的時候,實際上它進行了一個操作,就是將 coroutine 封裝成了 task 對象。我們也可以顯式地進行聲明,如下所示:

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      import asyncio

      async def execute(x):

      print('Number:', x)

      return x

      coroutine = execute(1)

      print('Coroutine:', coroutine)

      print('After calling execute')

      loop = asyncio.get_event_loop()

      task = loop.create_task(coroutine)

      print('Task:', task)

      loop.run_until_complete(task)

      print('Task:', task)

      print('After calling loop')

      運行結(jié)果如下:

      1

      2

      3

      4

      5

      6

      Coroutine:

      After calling execute

      Task: >

      Number: 1

      Task: result=1>

      After calling loop

      這里我們定義了 loop 對象之后,接著調(diào)用了它的 create_task 方法將 coroutine 對象轉(zhuǎn)化為 task 對象,隨后我們打印輸出一下,發(fā)現(xiàn)它是 pending 狀態(tài)。接著,我們將 task 對象添加到事件循環(huán)中執(zhí)行,隨后打印輸出 task 對象,發(fā)現(xiàn)它的狀態(tài)變成了 finished,同時還可以看到其 result 變成了 1,也就是我們定義的 execute 方法的返回結(jié)果。

      另外,定義 task 對象還有一種方式,就是直接通過 asyncio 的 ensure_future 方法,返回結(jié)果也是 task 對象,這樣的話我們就可以不借助 loop 來定義。即使我們還沒有聲明 loop,也可以提前定義好 task 對象,寫法如下:

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      import asyncio

      async def execute(x):

      print('Number:', x)

      return x

      coroutine = execute(1)

      print('Coroutine:', coroutine)

      print('After calling execute')

      task = asyncio.ensure_future(coroutine)

      print('Task:', task)

      loop = asyncio.get_event_loop()

      loop.run_until_complete(task)

      print('Task:', task)

      print('After calling loop')

      運行結(jié)果如下:

      1

      2

      3

      4

      5

      6

      Coroutine:

      After calling execute

      Task: >

      Number: 1

      Task: result=1>

      After calling loop

      可以發(fā)現(xiàn),其運行效果都是一樣的。

      6. 綁定回調(diào)

      另外,我們也可以為某個 task 綁定一個回調(diào)方法。比如,我們來看下面的例子:

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      import asyncio

      import requests

      async def request():

      url = 'https://www.baidu.com'

      status = requests.get(url)

      return status

      def callback(task):

      print('Status:', task.result())

      coroutine = request()

      task = asyncio.ensure_future(coroutine)

      task.add_done_callback(callback)

      print('Task:', task)

      loop = asyncio.get_event_loop()

      loop.run_until_complete(task)

      print('Task:', task)

      這里我們定義了一個 request 方法,請求了百度,獲取其狀態(tài)碼,但是這個方法里面我們沒有任何 print 語句。隨后我們定義了一個 callback 方法,這個方法接收一個參數(shù),是 task 對象,然后調(diào)用 print 方法打印了 task 對象的結(jié)果。這樣我們就定義好了一個 coroutine 對象和一個回調(diào)方法。我們現(xiàn)在希望的效果是,當(dāng) coroutine 對象執(zhí)行完畢之后,就去執(zhí)行聲明的 callback 方法。

      那么它們兩者怎樣關(guān)聯(lián)起來呢?很簡單,只需要調(diào)用 add_done_callback 方法即可。我們將 callback 方法傳遞給封裝好的 task 對象,這樣當(dāng) task 執(zhí)行完畢之后,就可以調(diào)用 callback 方法了。同時 task 對象還會作為參數(shù)傳遞給 callback 方法,調(diào)用 task 對象的 result 方法就可以獲取返回結(jié)果了。

      運行結(jié)果如下:

      1

      2

      3

      Task: cb=[callback() at demo.py:11]>

      Status:

      Task: result=>

      實際上不用回調(diào)方法,直接在 task 運行完畢之后,也可以直接調(diào)用 result 方法獲取結(jié)果,如下所示:

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      import asyncio

      import requests

      async def request():

      url = 'https://www.baidu.com'

      【2022 年】Python3 爬蟲教程 - 協(xié)程的基本原理

      status = requests.get(url)

      return status

      coroutine = request()

      task = asyncio.ensure_future(coroutine)

      print('Task:', task)

      loop = asyncio.get_event_loop()

      loop.run_until_complete(task)

      print('Task:', task)

      print('Task Result:', task.result())

      運行結(jié)果是一樣的:

      1

      2

      3

      Task: >

      Task: result=>

      Task Result:

      7. 多任務(wù)協(xié)程

      上面的例子我們只執(zhí)行了一次請求,如果想執(zhí)行多次請求,應(yīng)該怎么辦呢?我們可以定義一個 task 列表,然后使用 asyncio 的 wait 方法即可執(zhí)行。看下面的例子:

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      import asyncio

      import requests

      async def request():

      url = 'https://www.baidu.com'

      status = requests.get(url)

      return status

      tasks = [asyncio.ensure_future(request()) for _ in range(5)]

      print('Tasks:', tasks)

      loop = asyncio.get_event_loop()

      loop.run_until_complete(asyncio.wait(tasks))

      for task in tasks:

      print('Task Result:', task.result())

      這里我們使用一個 for 循環(huán)創(chuàng)建了 5 個 task,組成了一個列表,然后把這個列表首先傳遞給了 asyncio 的 wait 方法,再將其注冊到時間循環(huán)中,就可以發(fā)起 5 個任務(wù)了。最后,我們再將任務(wù)的運行結(jié)果輸出出來,具體如下:

      1

      2

      3

      4

      5

      6

      Tasks: [>, >, >, >, >]

      Task Result:

      Task Result:

      Task Result:

      Task Result:

      Task Result:

      可以看到,5 個任務(wù)被順次執(zhí)行了,并得到了運行結(jié)果。

      8. 協(xié)程實現(xiàn)

      前面說了這么一通,又是 async,又是 coroutine,又是 task,又是 callback,但似乎并沒有看出協(xié)程的優(yōu)勢?反而寫法上更加奇怪和麻煩了。別急,上面的案例只是為后面的使用作鋪墊。接下來,我們正式來看下協(xié)程在解決 IO 密集型任務(wù)上有怎樣的優(yōu)勢。

      在上面的代碼中,我們用一個網(wǎng)絡(luò)請求作為示例,這就是一個耗時等待操作,因為我們請求網(wǎng)頁之后需要等待頁面響應(yīng)并返回結(jié)果。耗時等待操作一般都是 IO 操作,比如文件讀取、網(wǎng)絡(luò)請求等。協(xié)程對于處理這種操作是有很大優(yōu)勢的,當(dāng)遇到需要等待的情況時,程序可以暫時掛起,轉(zhuǎn)而去執(zhí)行其他操作,從而避免一直等待一個程序而耗費過多的時間,充分利用資源。

      為了表現(xiàn)出協(xié)程的優(yōu)勢,我們還是以本節(jié)開頭介紹的網(wǎng)站 https://httpbin.org/delay/5 為例,因為該網(wǎng)站響應(yīng)比較慢,所以我們可以通過爬取時間來直觀感受到爬取速度的提升。

      為了讓大家更好地理解協(xié)程的正確使用方法,這里我們先來看看大家使用協(xié)程時常犯的錯誤,后面再給出正確的例子來對比一下。

      首先,我們還是拿之前的 requests 庫來進行網(wǎng)頁請求,接下來再重新使用上面的方法請求一遍:

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      import asyncio

      import requests

      import time

      start = time.time()

      async def request():

      url = 'https://httpbin.org/delay/5'

      print('Waiting for', url)

      response = requests.get(url)

      print('Get response from', url, 'response', response)

      tasks = [asyncio.ensure_future(request()) for _ in range(10)]

      loop = asyncio.get_event_loop()

      loop.run_until_complete(asyncio.wait(tasks))

      end = time.time()

      print('Cost time:', end - start)

      這里我們還是創(chuàng)建了 10 個 task,然后將 task 列表傳給 wait 方法并注冊到時間循環(huán)中執(zhí)行。

      運行結(jié)果如下:

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      Waiting for https://httpbin.org/delay/5

      Get response from https://httpbin.org/delay/5 response

      Waiting for https://httpbin.org/delay/5

      ...

      Get response from https://httpbin.org/delay/5 response

      Waiting for https://httpbin.org/delay/5

      Get response from https://httpbin.org/delay/5 response

      Waiting for https://httpbin.org/delay/5

      Get response from https://httpbin.org/delay/5 response

      Cost time: 66.64284420013428

      可以發(fā)現(xiàn),這和正常的請求并沒有什么區(qū)別,依然還是順次執(zhí)行的,耗時 66 秒,平均一個請求耗時 6.6 秒,說好的異步處理呢?

      其實,要實現(xiàn)異步處理,我們得先要有掛起的操作,當(dāng)一個任務(wù)需要等待 IO 結(jié)果的時候,可以掛起當(dāng)前任務(wù),轉(zhuǎn)而去執(zhí)行其他任務(wù),這樣我們才能充分利用好資源。上面的方法都是一本正經(jīng)地串行走下來,連個掛起都沒有,怎么可能實現(xiàn)異步?想太多了。

      要實現(xiàn)異步,接下來我們再了解一下 await 的用法,它可以將耗時等待的操作掛起,讓出控制權(quán)。當(dāng)協(xié)程執(zhí)行的時候遇到 await,時間循環(huán)就會將本協(xié)程掛起,轉(zhuǎn)而去執(zhí)行別的協(xié)程,直到其他協(xié)程掛起或執(zhí)行完畢。

      所以,我們可能會將代碼中的 request 方法改成如下的樣子:

      1

      2

      3

      4

      5

      async def request():

      url = 'https://httpbin.org/delay/5'

      print('Waiting for', url)

      response = await requests.get(url)

      print('Get response from', url, 'response', response)

      僅僅是在 requests 前面加了一個關(guān)鍵字 await,然而此時執(zhí)行代碼,會得到如下報錯:

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      Waiting for https://httpbin.org/delay/5

      Waiting for https://httpbin.org/delay/5

      Waiting for https://httpbin.org/delay/5

      Waiting for https://httpbin.org/delay/5

      ...

      Task exception was never retrieved

      future: exception=TypeError("object Response can't be used in 'await' expression")>

      Traceback (most recent call last):

      File "demo.py", line 11, in request

      response = await requests.get(url)

      TypeError: object Response can't be used in 'await' expression

      這次它遇到 await 方法確實掛起了,也等待了,但是最后卻報了這個錯誤。這個錯誤的意思是 requests 返回的 Response 對象不能和 await 一起使用,為什么呢?因為根據(jù)官方文檔說明,await 后面的對象必須是如下格式之一(具體可以參見 https://www.python.org/dev/peps/pep-0492/#await-expression):

      一個原生 coroutine 對象;

      一個由 types.coroutine 修飾的生成器,這個生成器可以返回 coroutine 對象;

      一個包含 __await__ 方法的對象返回的一個迭代器。

      reqeusts 返回的 Response 對象不符合上面任一條件,因此就會報上面的錯誤了。

      有的讀者可能已經(jīng)發(fā)現(xiàn)了,既然 await 后面可以跟一個 coroutine 對象,那么我用 async 把請求的方法改成 coroutine 對象不就可以了嗎?所以就改寫成如下的樣子:

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      import asyncio

      import requests

      import time

      start = time.time()

      async def get(url):

      return requests.get(url)

      async def request():

      url = 'https://httpbin.org/delay/5'

      print('Waiting for', url)

      response = await get(url)

      print('Get response from', url, 'response', response)

      tasks = [asyncio.ensure_future(request()) for _ in range(10)]

      loop = asyncio.get_event_loop()

      loop.run_until_complete(asyncio.wait(tasks))

      end = time.time()

      print('Cost time:', end - start)

      這里我們將請求頁面的方法獨立出來,并用 async 修飾,這樣就得到了一個 coroutine 對象。運行一下看看:

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      Waiting for https://httpbin.org/delay/5

      Get response fromhttps://httpbin.org/delay/5 response

      Waiting for https://httpbin.org/delay/5

      Get response from https://httpbin.org/delay/5 response

      Waiting for https://httpbin.org/delay/5

      ...

      Get response from https://httpbin.org/delay/5 response

      Waiting for https://httpbin.org/delay/5

      Get response from https://httpbin.org/delay/5 response

      Waiting for https://httpbin.org/delay/5

      Get response from https://httpbin.org/delay/5 response

      Cost time: 65.394437756259273

      還是不行,它還不是異步執(zhí)行的,也就是說我們僅僅將涉及 IO 操作的代碼封裝到 async 修飾的方法里面是不可行的。我們必須要使用支持異步操作的請求方式才可以實現(xiàn)真正的異步,所以這里就需要 aiohttp 派上用場了。

      9. 使用 aiohttp

      aiohttp 是一個支持異步請求的庫,配合使用它和 asyncio,我們可以非常方便地實現(xiàn)異步請求操作。我們使用 pip3 安裝即可:

      1

      pip3 install aiohttp

      具體的安裝方法可以參考:https://setup.scrape.center/aiohttp。

      aiohttp 的官方文檔鏈接為 https://aiohttp.readthedocs.io/,它分為兩部分,一部分是 Client,一部分是 Server,詳細的內(nèi)容可以參考官方文檔。

      下面我們將 aiohttp 用上來,將代碼改成如下樣子:

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      import asyncio

      import aiohttp

      import time

      start = time.time()

      async def get(url):

      session = aiohttp.ClientSession()

      response = await session.get(url)

      await response.text()

      await session.close()

      return response

      async def request():

      url = 'https://httpbin.org/delay/5'

      print('Waiting for', url)

      response = await get(url)

      print('Get response from', url, 'response', response)

      tasks = [asyncio.ensure_future(request()) for _ in range(10)]

      loop = asyncio.get_event_loop()

      loop.run_until_complete(asyncio.wait(tasks))

      end = time.time()

      print('Cost time:', end - start)

      這里我們將請求庫由 requests 改成了 aiohttp,通過 aiohttp 的 ClientSession 類的 get 方法進行請求,結(jié)果如下:

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      Waiting for https://httpbin.org/delay/5

      Waiting for https://httpbin.org/delay/5

      Waiting for https://httpbin.org/delay/5

      Waiting for https://httpbin.org/delay/5

      ...

      Get response from https://httpbin.org/delay/5 response

      ...

      Get response from https://httpbin.org/delay/5 response

      Cost time: 6.033240079879761

      成功了!我們發(fā)現(xiàn)這次請求的耗時由 51 秒直接變成了 6 秒,耗費時間減少了非常多。

      在代碼里面,我們使用了 await,后面跟了 get 方法。在執(zhí)行這 10 個協(xié)程的時候,如果遇到了 await,就會將當(dāng)前協(xié)程掛起,轉(zhuǎn)而去執(zhí)行其他協(xié)程,直到其他協(xié)程也掛起或執(zhí)行完畢,再執(zhí)行下一個協(xié)程。

      開始運行時,時間循環(huán)會運行第一個 task。針對第一個 task 來說,當(dāng)執(zhí)行到第一個 await 跟著的 get 方法時,它被掛起,但這個 get 方法第一步的執(zhí)行是非阻塞的,掛起之后立馬被喚醒,所以立即又進入執(zhí)行,創(chuàng)建了 ClientSession 對象,接著遇到了第二個 await,調(diào)用了 session.get 請求方法,然后就被掛起了。由于請求需要耗時很久,所以一直沒有被喚醒,好在第一個 task 被掛起了,那么接下來該怎么辦呢?事件循環(huán)會尋找當(dāng)前未被掛起的協(xié)程繼續(xù)執(zhí)行,于是就轉(zhuǎn)而執(zhí)行第二個 task 了,也是一樣的流程操作,直到執(zhí)行了第十個 task 的 session.get 方法之后,全部的 task 都被掛起了。所有 task 都已經(jīng)處于掛起狀態(tài),那咋辦?只好等待了。5 秒之后,幾個請求幾乎同時都有了響應(yīng),然后幾個 task 也被喚醒接著執(zhí)行,輸出請求結(jié)果,最后總耗時 6 秒!

      怎么樣?這就是異步操作的便捷之處,當(dāng)遇到阻塞式操作時,任務(wù)被掛起,程序接著去執(zhí)行其他任務(wù),而不是傻傻地等著,這樣可以充分利用 CPU 時間,而不必把時間浪費在等待 IO 上。

      有人會說,既然這樣的話,在上面的例子中,在發(fā)出網(wǎng)絡(luò)請求后,既然接下來的 5 秒都是在等待的,在 5 秒之內(nèi),CPU 可以處理的 task 數(shù)量遠不止這些,那么豈不是我們放 10 個、20 個、50 個、100 個、1000 個 task 一起執(zhí)行,最后得到所有結(jié)果的耗時不都是差不多的嗎?因為這幾個任務(wù)被掛起后都是一起等待的。

      理論來說,確實是這樣的,不過有個前提,那就是服務(wù)器在同一時刻接受無限次請求都能保證正常返回結(jié)果,也就是服務(wù)器無限抗壓。另外,還要忽略 IO 傳輸時延,確實可以做到無限 task 一起執(zhí)行且在預(yù)想時間內(nèi)得到結(jié)果。但由于不同服務(wù)器處理的實現(xiàn)機制不同,可能某些服務(wù)器并不能承受這么高的并發(fā),因此響應(yīng)速度也會減慢。

      這里我們以百度為例,測試一下并發(fā)數(shù)量為 1、3、5、10…500 的情況下的耗時情況,代碼如下:

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      26

      27

      28

      import asyncio

      import aiohttp

      import time

      def test(number):

      start = time.time()

      async def get(url):

      session = aiohttp.ClientSession()

      response = await session.get(url)

      await response.text()

      await session.close()

      return response

      async def request():

      url = 'https://www.baidu.com/'

      await get(url)

      tasks = [asyncio.ensure_future(request()) for _ in range(number)]

      loop = asyncio.get_event_loop()

      loop.run_until_complete(asyncio.wait(tasks))

      end = time.time()

      print('Number:', number, 'Cost time:', end - start)

      for number in [1, 3, 5, 10, 15, 30, 50, 75, 100, 200, 500]:

      test(number)

      運行結(jié)果如下:

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      Number: 1 Cost time: 0.05885505676269531

      Number: 3 Cost time: 0.05773782730102539

      Number: 5 Cost time: 0.05768704414367676

      Number: 10 Cost time: 0.15174412727355957

      Number: 15 Cost time: 0.09603095054626465

      Number: 30 Cost time: 0.17843103408813477

      Number: 50 Cost time: 0.3741800785064697

      Number: 75 Cost time: 0.2894289493560791

      Number: 100 Cost time: 0.6185381412506104

      Number: 200 Cost time: 1.0894129276275635

      Number: 500 Cost time: 1.8213098049163818

      可以看到,即使我們增加了并發(fā)數(shù)量,但在服務(wù)器能承受高并發(fā)的前提下,其爬取速度幾乎不太受影響。

      綜上所述,使用了異步請求之后,我們幾乎可以在相同的時間內(nèi)實現(xiàn)成百上千倍次的網(wǎng)絡(luò)請求,把這個運用在爬蟲中,速度提升可謂是非常可觀了。

      10. 總結(jié)

      以上便是 Python 中協(xié)程的基本原理和用法,在后面一節(jié)中我們會詳細介紹 aiohttp 的用法和爬取實戰(zhàn),實現(xiàn)快速高并發(fā)的爬取。

      本節(jié)代碼:https://github.com/Python3WebSpider/AsyncTest

      Python 任務(wù)調(diào)度

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。

      上一篇:excel如何使用dget函數(shù)
      下一篇:excel怎樣使用ACCRINT函數(shù)
      相關(guān)文章
      中文字幕一精品亚洲无线一区| 亚洲av无码国产综合专区| 久久亚洲国产成人影院| 亚洲综合久久久久久中文字幕| 亚洲av网址在线观看| 亚洲AV无码专区国产乱码4SE| 亚洲精品无码av人在线观看| 中文字幕第13亚洲另类| 中文字幕不卡亚洲| 亚洲人JIZZ日本人| 国产成人A人亚洲精品无码| 亚洲gv白嫩小受在线观看| 久久久久亚洲AV成人无码网站| 久久精品国产精品亚洲色婷婷| 无码欧精品亚洲日韩一区| 亚洲视频精品在线| 亚洲国产精品xo在线观看| 激情综合亚洲色婷婷五月| 亚洲砖码砖专无区2023| 亚洲AV综合永久无码精品天堂| 国产精品亚洲AV三区| 亚洲人成人无码网www国产| 中文字幕亚洲电影| 亚洲成AV人片一区二区| 久久久久亚洲AV无码麻豆| 亚洲成人高清在线观看| 国产精品亚洲专区在线观看| 亚洲日韩av无码中文| 亚洲精品无码久久久久APP| 国产亚洲情侣久久精品| 亚洲综合激情另类专区| 亚洲国产精品久久久天堂| 日本久久久久亚洲中字幕| 亚洲国产中文在线二区三区免| 亚洲欧洲日韩极速播放| 337p日本欧洲亚洲大胆人人 | 亚洲国产成人久久一区WWW| 精品国产日韩亚洲一区| 久久亚洲AV午夜福利精品一区 | 亚洲精品一级无码中文字幕| 亚洲欧洲无码AV电影在线观看|