小白爬蟲第四彈之爬蟲快跑(多進程 + 多線程)

      網友投稿 998 2022-05-30

      PS:使用多線程時好像在目錄切換的問題上存在問題,可以給線程加個鎖試試 Hello 大家好!我又來了。 你是不是發現下載圖片速度特別慢、難以忍受?。τ谶@種問題 一般解決辦法就是多進程了!一個進程速度慢!我就用十個進程,相當于十個人一起干。速度就會快很多啦!(為什么不說多線程?懂點 Python 的小伙伴都知道、GIL 的存在 導致 Python 的多線程有點坑?。。┙裉炀徒檀蠹襾碜鲆粋€多進程的爬蟲(其實吧、可以用來做一個超簡化版的分布式爬蟲) 其實吧!還有一種加速的方法叫做 “異步”!不過這玩意兒我沒怎么整明白就不出來誤人子弟了!(因為爬蟲大部分時間都是在等待 response 中!‘異步’則能讓程序在等待 response 的時間去做的其他事情。) 學過 Python 基礎的同學都知道、在多進程中,進程之間是不能相互通信的,這就有一個很坑爹的問題的出現了!多個進程怎么知道那那些需要爬取、哪些已經被爬取了! 這就涉及到一個東西!這玩意兒叫做隊列!!隊列?。£犃校。∑鋵嵃烧碚f應該給大家用隊列來完成這個教程的, 比如 Tornado 的 queue 模塊。(如果需要更為穩定健壯的隊列,則請考慮使用 Celery 這一類的專用消息傳遞工具) 不過為了簡化技術種類?。。ú挪粫嬖V你們是我懶,嫌麻煩呢?。┻@次我們繼續使用 MongoDB。 好了!先來理一下思路: 每個進程需要知道那些 URL 爬取過了、哪些 URL 需要爬??!我們來給每個 URL 設置兩種狀態: outstanding: 等待爬取的 URL complete: 爬取完成的 URL 誒!等等我們好像忘了啥? 失敗的 URL 的怎么辦?。课覀冊谠黾右环N狀態: processing: 正在進行的 URL。 嗯!當一個所有初始的 URL 狀態都為 outstanding;當開始爬取的時候狀態改為:processing;爬取完成狀態改為:complete;失敗的 URL 重置狀態為:outstanding。為了能夠處理 URL 進程被終止的情況、我們設置一個計時參數,當超過這個值時;我們則將狀態重置為 outstanding。 下面開整 Go Go Go! 首先我們需要一個模塊:datetime (這個模塊比內置 time 模塊要好使一點) 不會裝??不是吧! pip install datetime 還有上一篇博文我們已經使用過的 pymongo 下面是隊列的代碼:

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      26

      27

      28

      29

      30

      31

      32

      33

      34

      35

      36

      37

      38

      39

      40

      41

      42

      43

      44

      45

      46

      47

      48

      49

      50

      51

      52

      53

      54

      55

      56

      57

      58

      59

      60

      61

      62

      63

      64

      65

      66

      67

      68

      69

      70

      71

      72

      73

      74

      75

      76

      77

      78

      79

      80

      81

      82

      83

      84

      85

      86

      87

      88

      from datetime import datetime, timedelta

      from pymongo import MongoClient, errors

      class MogoQueue():

      OUTSTANDING = 1 ##初始狀態

      PROCESSING = 2 ##正在下載狀態

      COMPLETE = 3 ##下載完成狀態

      def __init__(self, db, collection, timeout=300):##初始mongodb連接

      self.client = MongoClient()

      self.Client = self.client[db]

      self.db = self.Client[collection]

      self.timeout = timeout

      def __bool__(self):

      """

      這個函數,我的理解是如果下面的表達為真,則整個類為真

      至于有什么用,后面我會注明的(如果我的理解有誤,請指點出來謝謝,我也是Python新手)

      $ne的意思是不匹配

      """

      record = self.db.find_one(

      {'status': {'$ne': self.COMPLETE}}

      )

      return True if record else False

      def push(self, url, title): ##這個函數用來添加新的URL進隊列

      try:

      self.db.insert({'_id': url, 'status': self.OUTSTANDING, '主題': title})

      print(url, '插入隊列成功')

      except errors.DuplicateKeyError as e: ##報錯則代表已經存在于隊列之中了

      print(url, '已經存在于隊列中了')

      pass

      def push_imgurl(self, title, url):

      try:

      self.db.insert({'_id': title, 'statue': self.OUTSTANDING, 'url': url})

      print('圖片地址插入成功')

      except errors.DuplicateKeyError as e:

      print('地址已經存在了')

      pass

      def pop(self):

      """

      這個函數會查詢隊列中的所有狀態為OUTSTANDING的值,

      更改狀態,(query后面是查詢)(update后面是更新)

      并返回_id(就是我們的URL),MongDB好使吧,^_^

      如果沒有OUTSTANDING的值則調用repair()函數重置所有超時的狀態為OUTSTANDING,

      $set是設置的意思,和MySQL的set語法一個意思

      """

      record = self.db.find_and_modify(

      query={'status': self.OUTSTANDING},

      update={'$set': {'status': self.PROCESSING, 'timestamp': datetime.now()}}

      )

      if record:

      return record['_id']

      else:

      self.repair()

      raise KeyError

      def pop_title(self, url):

      record = self.db.find_one({'_id': url})

      return record['主題']

      def peek(self):

      """這個函數是取出狀態為 OUTSTANDING的文檔并返回_id(URL)"""

      record = self.db.find_one({'status': self.OUTSTANDING})

      if record:

      return record['_id']

      def complete(self, url):

      """這個函數是更新已完成的URL完成"""

      self.db.update({'_id': url}, {'$set': {'status': self.COMPLETE}})

      def repair(self):

      """這個函數是重置狀態$lt是比較"""

      record = self.db.find_and_modify(

      query={

      'timestamp': {'$lt': datetime.now() - timedelta(seconds=self.timeout)},

      'status': {'$ne': self.COMPLETE}

      },

      update={'$set': {'status': self.OUTSTANDING}}

      )

      if record:

      print('重置URL狀態', record['_id'])

      def clear(self):

      """這個函數只有第一次才調用、后續不要調用、因為這是刪庫啊!"""

      self.db.drop()

      好了,隊列我們做好了,下面是獲取所有頁面的代碼:

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      from Download import request

      from mongodb_queue import MogoQueue

      from bs4 import BeautifulSoup

      小白爬蟲第四彈之爬蟲快跑(多進程 + 多線程)

      spider_queue = MogoQueue('meinvxiezhenji', 'crawl_queue')

      def start(url):

      response = request.get(url, 3)

      Soup = BeautifulSoup(response.text, 'lxml')

      all_a = Soup.find('div', class_='all').find_all('a')

      for a in all_a:

      title = a.get_text()

      url = a['href']

      spider_queue.push(url, title)

      """上面這個調用就是把URL寫入MongoDB的隊列了"""

      if __name__ == "__main__":

      start('http://www.mzitu.com/all')

      """這一段兒就不解釋了哦!超級簡單的"""

      下面就是多進程 + 多線程的下載代碼了:

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      26

      27

      28

      29

      30

      31

      32

      33

      34

      35

      36

      37

      38

      39

      40

      41

      42

      43

      44

      45

      46

      47

      48

      49

      50

      51

      52

      53

      54

      55

      56

      57

      58

      59

      60

      61

      62

      63

      64

      65

      66

      67

      68

      69

      70

      71

      72

      73

      74

      75

      76

      77

      78

      79

      80

      81

      82

      83

      84

      85

      import os

      import time

      import threading

      import multiprocessing

      from mongodb_queue import MogoQueue

      from Download import request

      from bs4 import BeautifulSoup

      SLEEP_TIME = 1

      def mzitu_crawler(max_threads=10):

      crawl_queue = MogoQueue('meinvxiezhenji', 'crawl_queue') ##這個是我們獲取URL的隊列

      ##img_queue = MogoQueue('meinvxiezhenji', 'img_queue')

      def pageurl_crawler():

      while True:

      try:

      url = crawl_queue.pop()

      print(url)

      except KeyError:

      print('隊列沒有數據')

      break

      else:

      img_urls = []

      req = request.get(url, 3).text

      title = crawl_queue.pop_title(url)

      mkdir(title)

      os.chdir('D:\mzitu\\' + title)

      max_span = BeautifulSoup(req, 'lxml').find('div', class_='pagenavi').find_all('span')[-2].get_text()

      for page in range(1, int(max_span) + 1):

      page_url = url + '/' + str(page)

      img_url = BeautifulSoup(request.get(page_url, 3).text, 'lxml').find('div', class_='main-image').find('img')['src']

      img_urls.append(img_url)

      save(img_url)

      crawl_queue.complete(url) ##設置為完成狀態

      ##img_queue.push_imgurl(title, img_urls)

      ##print('插入數據庫成功')

      def save(img_url):

      name = img_url[-9:-4]

      print(u'開始保存:', img_url)

      img = request.get(img_url, 3)

      f = open(name + '.jpg', 'ab')

      f.write(img.content)

      f.close()

      def mkdir(path):

      path = path.strip()

      isExists = os.path.exists(os.path.join("D:\mzitu", path))

      if not isExists:

      print(u'建了一個名字叫做', path, u'的文件夾!')

      os.makedirs(os.path.join("D:\mzitu", path))

      return True

      else:

      print(u'名字叫做', path, u'的文件夾已經存在了!')

      return False

      threads = []

      while threads or crawl_queue:

      """

      這兒crawl_queue用上了,就是我們__bool__函數的作用,為真則代表我們MongoDB隊列里面還有數據

      threads 或者 crawl_queue為真都代表我們還沒下載完成,程序就會繼續執行

      """

      for thread in threads:

      if not thread.is_alive(): ##is_alive是判斷是否為空,不是空則在隊列中刪掉

      threads.remove(thread)

      while len(threads) < max_threads or crawl_queue.peek(): ##線程池中的線程少于max_threads 或者 crawl_qeue時

      thread = threading.Thread(target=pageurl_crawler) ##創建線程

      thread.setDaemon(True) ##設置守護線程

      thread.start() ##啟動線程

      threads.append(thread) ##添加進線程隊列

      time.sleep(SLEEP_TIME)

      def process_crawler():

      process = []

      num_cpus = multiprocessing.cpu_count()

      print('將會啟動進程數為:', num_cpus)

      for i in range(num_cpus):

      p = multiprocessing.Process(target=mzitu_crawler) ##創建進程

      p.start() ##啟動進程

      process.append(p) ##添加進進程隊列

      for p in process:

      p.join() ##等待進程隊列里面的進程結束

      if __name__ == "__main__":

      process_crawler()

      好啦!一個多進程多線的爬蟲就完成了,(其實你可以設置一下 MongoDB,然后調整一下連接配置,在多臺機器上跑哦!!嗯,就是超級簡化版的分布式爬蟲了,雖然很是簡陋。) 本來還想下載圖片那一塊兒加上異步(畢竟下載圖片是I\O等待最久的時間了,),可惜異步我也沒怎么整明白,就不拿出來貽笑大方了。 另外,各位小哥兒可以參考上面代碼,單獨處理圖片地址試試(就是多個進程直接下載圖片)? 我測試了一下八分鐘下載 100 套圖 PS:請務必使用 第二篇博文中的下載模塊,或者自己寫一個自動更換代理的下載模塊!??!不然寸步難行,分分鐘被服務器 BAN 掉! 小白教程就到此結束了,后面我教大家玩玩 Scrapy;目標 頂點小說網, 爬完全站的小說。 再后面帶大家玩玩 抓新浪 湯不熱、模擬登錄 之類的。或許維護一個公共代理 IP 池之類的。 這個所有代碼我放在這個位置了:https://github.com/thsheep/mzitu/

      https 任務調度 多線程 數據庫

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:quartz在集群環境下的最終解決方案
      下一篇:c++調用python返回字典
      相關文章
      亚洲av日韩av天堂影片精品| 国产精品亚洲一区二区三区在线| 亚洲国产精品成人久久| 亚洲真人无码永久在线| 亚洲伊人久久综合影院| 国产亚洲综合视频| 亚洲伦乱亚洲h视频| 亚洲一区二区三区在线视频| 亚洲乱亚洲乱少妇无码| 综合亚洲伊人午夜网| 亚洲色成人WWW永久网站| 亚洲精品无码午夜福利中文字幕| 亚洲中文字幕在线观看| 好看的电影网站亚洲一区| 久久精品国产精品亚洲精品 | 国产成人精品久久亚洲高清不卡| 亚洲av永久无码精品秋霞电影秋| 亚洲av无码偷拍在线观看| 国产精品亚洲专区在线播放| 亚洲成AV人网址| 亚洲无码精品浪潮| 亚洲精品亚洲人成在线观看| 亚洲成AV人片在线观看| 久久亚洲AV成人无码国产| 亚洲国产成人手机在线电影bd | 中文亚洲AV片不卡在线观看| 亚洲一区二区三区在线观看精品中文| 亚洲精品无码久久久久sm| 久久久久亚洲Av片无码v | 国产亚洲精品美女久久久久久下载| 亚洲 国产 图片| 在线A亚洲老鸭窝天堂| 亚洲AV色香蕉一区二区| 亚洲国产精品午夜电影| 亚洲精品无码永久在线观看男男| 亚洲av区一区二区三| 亚洲精品制服丝袜四区| 亚洲欧洲视频在线观看| 亚洲综合一区二区三区四区五区| 美国毛片亚洲社区在线观看| 国产亚洲精品成人AA片新蒲金|