小白爬蟲第四彈之爬蟲快跑(多進程 + 多線程)
PS:使用多線程時好像在目錄切換的問題上存在問題,可以給線程加個鎖試試 Hello 大家好!我又來了。 你是不是發現下載圖片速度特別慢、難以忍受?。τ谶@種問題 一般解決辦法就是多進程了!一個進程速度慢!我就用十個進程,相當于十個人一起干。速度就會快很多啦!(為什么不說多線程?懂點 Python 的小伙伴都知道、GIL 的存在 導致 Python 的多線程有點坑?。。┙裉炀徒檀蠹襾碜鲆粋€多進程的爬蟲(其實吧、可以用來做一個超簡化版的分布式爬蟲) 其實吧!還有一種加速的方法叫做 “異步”!不過這玩意兒我沒怎么整明白就不出來誤人子弟了!(因為爬蟲大部分時間都是在等待 response 中!‘異步’則能讓程序在等待 response 的時間去做的其他事情。) 學過 Python 基礎的同學都知道、在多進程中,進程之間是不能相互通信的,這就有一個很坑爹的問題的出現了!多個進程怎么知道那那些需要爬取、哪些已經被爬取了! 這就涉及到一個東西!這玩意兒叫做隊列!!隊列?。£犃校。∑鋵嵃烧碚f應該給大家用隊列來完成這個教程的, 比如 Tornado 的 queue 模塊。(如果需要更為穩定健壯的隊列,則請考慮使用 Celery 這一類的專用消息傳遞工具) 不過為了簡化技術種類?。。ú挪粫嬖V你們是我懶,嫌麻煩呢?。┻@次我們繼續使用 MongoDB。 好了!先來理一下思路: 每個進程需要知道那些 URL 爬取過了、哪些 URL 需要爬??!我們來給每個 URL 設置兩種狀態: outstanding: 等待爬取的 URL complete: 爬取完成的 URL 誒!等等我們好像忘了啥? 失敗的 URL 的怎么辦?。课覀冊谠黾右环N狀態: processing: 正在進行的 URL。 嗯!當一個所有初始的 URL 狀態都為 outstanding;當開始爬取的時候狀態改為:processing;爬取完成狀態改為:complete;失敗的 URL 重置狀態為:outstanding。為了能夠處理 URL 進程被終止的情況、我們設置一個計時參數,當超過這個值時;我們則將狀態重置為 outstanding。 下面開整 Go Go Go! 首先我們需要一個模塊:datetime (這個模塊比內置 time 模塊要好使一點) 不會裝??不是吧! pip install datetime 還有上一篇博文我們已經使用過的 pymongo 下面是隊列的代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
from datetime import datetime, timedelta
from pymongo import MongoClient, errors
class MogoQueue():
OUTSTANDING = 1 ##初始狀態
PROCESSING = 2 ##正在下載狀態
COMPLETE = 3 ##下載完成狀態
def __init__(self, db, collection, timeout=300):##初始mongodb連接
self.client = MongoClient()
self.Client = self.client[db]
self.db = self.Client[collection]
self.timeout = timeout
def __bool__(self):
"""
這個函數,我的理解是如果下面的表達為真,則整個類為真
至于有什么用,后面我會注明的(如果我的理解有誤,請指點出來謝謝,我也是Python新手)
$ne的意思是不匹配
"""
record = self.db.find_one(
{'status': {'$ne': self.COMPLETE}}
)
return True if record else False
def push(self, url, title): ##這個函數用來添加新的URL進隊列
try:
self.db.insert({'_id': url, 'status': self.OUTSTANDING, '主題': title})
print(url, '插入隊列成功')
except errors.DuplicateKeyError as e: ##報錯則代表已經存在于隊列之中了
print(url, '已經存在于隊列中了')
pass
def push_imgurl(self, title, url):
try:
self.db.insert({'_id': title, 'statue': self.OUTSTANDING, 'url': url})
print('圖片地址插入成功')
except errors.DuplicateKeyError as e:
print('地址已經存在了')
pass
def pop(self):
"""
這個函數會查詢隊列中的所有狀態為OUTSTANDING的值,
更改狀態,(query后面是查詢)(update后面是更新)
并返回_id(就是我們的URL),MongDB好使吧,^_^
如果沒有OUTSTANDING的值則調用repair()函數重置所有超時的狀態為OUTSTANDING,
$set是設置的意思,和MySQL的set語法一個意思
"""
record = self.db.find_and_modify(
query={'status': self.OUTSTANDING},
update={'$set': {'status': self.PROCESSING, 'timestamp': datetime.now()}}
)
if record:
return record['_id']
else:
self.repair()
raise KeyError
def pop_title(self, url):
record = self.db.find_one({'_id': url})
return record['主題']
def peek(self):
"""這個函數是取出狀態為 OUTSTANDING的文檔并返回_id(URL)"""
record = self.db.find_one({'status': self.OUTSTANDING})
if record:
return record['_id']
def complete(self, url):
"""這個函數是更新已完成的URL完成"""
self.db.update({'_id': url}, {'$set': {'status': self.COMPLETE}})
def repair(self):
"""這個函數是重置狀態$lt是比較"""
record = self.db.find_and_modify(
query={
'timestamp': {'$lt': datetime.now() - timedelta(seconds=self.timeout)},
'status': {'$ne': self.COMPLETE}
},
update={'$set': {'status': self.OUTSTANDING}}
)
if record:
print('重置URL狀態', record['_id'])
def clear(self):
"""這個函數只有第一次才調用、后續不要調用、因為這是刪庫啊!"""
self.db.drop()
好了,隊列我們做好了,下面是獲取所有頁面的代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from Download import request
from mongodb_queue import MogoQueue
from bs4 import BeautifulSoup
spider_queue = MogoQueue('meinvxiezhenji', 'crawl_queue')
def start(url):
response = request.get(url, 3)
Soup = BeautifulSoup(response.text, 'lxml')
all_a = Soup.find('div', class_='all').find_all('a')
for a in all_a:
title = a.get_text()
url = a['href']
spider_queue.push(url, title)
"""上面這個調用就是把URL寫入MongoDB的隊列了"""
if __name__ == "__main__":
start('http://www.mzitu.com/all')
"""這一段兒就不解釋了哦!超級簡單的"""
下面就是多進程 + 多線程的下載代碼了:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import os
import time
import threading
import multiprocessing
from mongodb_queue import MogoQueue
from Download import request
from bs4 import BeautifulSoup
SLEEP_TIME = 1
def mzitu_crawler(max_threads=10):
crawl_queue = MogoQueue('meinvxiezhenji', 'crawl_queue') ##這個是我們獲取URL的隊列
##img_queue = MogoQueue('meinvxiezhenji', 'img_queue')
def pageurl_crawler():
while True:
try:
url = crawl_queue.pop()
print(url)
except KeyError:
print('隊列沒有數據')
break
else:
img_urls = []
req = request.get(url, 3).text
title = crawl_queue.pop_title(url)
mkdir(title)
os.chdir('D:\mzitu\\' + title)
max_span = BeautifulSoup(req, 'lxml').find('div', class_='pagenavi').find_all('span')[-2].get_text()
for page in range(1, int(max_span) + 1):
page_url = url + '/' + str(page)
img_url = BeautifulSoup(request.get(page_url, 3).text, 'lxml').find('div', class_='main-image').find('img')['src']
img_urls.append(img_url)
save(img_url)
crawl_queue.complete(url) ##設置為完成狀態
##img_queue.push_imgurl(title, img_urls)
##print('插入數據庫成功')
def save(img_url):
name = img_url[-9:-4]
print(u'開始保存:', img_url)
img = request.get(img_url, 3)
f = open(name + '.jpg', 'ab')
f.write(img.content)
f.close()
def mkdir(path):
path = path.strip()
isExists = os.path.exists(os.path.join("D:\mzitu", path))
if not isExists:
print(u'建了一個名字叫做', path, u'的文件夾!')
os.makedirs(os.path.join("D:\mzitu", path))
return True
else:
print(u'名字叫做', path, u'的文件夾已經存在了!')
return False
threads = []
while threads or crawl_queue:
"""
這兒crawl_queue用上了,就是我們__bool__函數的作用,為真則代表我們MongoDB隊列里面還有數據
threads 或者 crawl_queue為真都代表我們還沒下載完成,程序就會繼續執行
"""
for thread in threads:
if not thread.is_alive(): ##is_alive是判斷是否為空,不是空則在隊列中刪掉
threads.remove(thread)
while len(threads) < max_threads or crawl_queue.peek(): ##線程池中的線程少于max_threads 或者 crawl_qeue時
thread = threading.Thread(target=pageurl_crawler) ##創建線程
thread.setDaemon(True) ##設置守護線程
thread.start() ##啟動線程
threads.append(thread) ##添加進線程隊列
time.sleep(SLEEP_TIME)
def process_crawler():
process = []
num_cpus = multiprocessing.cpu_count()
print('將會啟動進程數為:', num_cpus)
for i in range(num_cpus):
p = multiprocessing.Process(target=mzitu_crawler) ##創建進程
p.start() ##啟動進程
process.append(p) ##添加進進程隊列
for p in process:
p.join() ##等待進程隊列里面的進程結束
if __name__ == "__main__":
process_crawler()
好啦!一個多進程多線的爬蟲就完成了,(其實你可以設置一下 MongoDB,然后調整一下連接配置,在多臺機器上跑哦!!嗯,就是超級簡化版的分布式爬蟲了,雖然很是簡陋。) 本來還想下載圖片那一塊兒加上異步(畢竟下載圖片是I\O等待最久的時間了,),可惜異步我也沒怎么整明白,就不拿出來貽笑大方了。 另外,各位小哥兒可以參考上面代碼,單獨處理圖片地址試試(就是多個進程直接下載圖片)? 我測試了一下八分鐘下載 100 套圖 PS:請務必使用 第二篇博文中的下載模塊,或者自己寫一個自動更換代理的下載模塊!??!不然寸步難行,分分鐘被服務器 BAN 掉! 小白教程就到此結束了,后面我教大家玩玩 Scrapy;目標 頂點小說網, 爬完全站的小說。 再后面帶大家玩玩 抓新浪 湯不熱、模擬登錄 之類的。或許維護一個公共代理 IP 池之類的。 這個所有代碼我放在這個位置了:https://github.com/thsheep/mzitu/
https 任務調度 多線程 數據庫
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。