Python 爬蟲進階五之多線程的用法
前言
我們之前寫的爬蟲都是單個線程的?這怎么夠?一旦一個地方卡到不動了,那不就永遠等待下去了?為此我們可以使用多線程或者多進程來處理。 首先聲明一點! 多線程和多進程是不一樣的!一個是 thread 庫,一個是 multiprocessing 庫。而多線程 thread 在 Python 里面被稱作雞肋的存在!而沒錯!本節介紹的是就是這個庫 thread。 不建議你用這個,不過還是介紹下了,如果想看可以看看下面,不想浪費時間直接看 multiprocessing 多進程
雞肋點
名言:
“Python 下多線程是雞肋,推薦使用多進程!”
那當然有同學會問了,為啥?
背景
1、GIL 是什么? GIL 的全稱是 Global Interpreter Lock (全局解釋器鎖),來源是 python 設計之初的考慮,為了數據安全所做的決定。 2、每個 CPU 在同一時間只能執行一個線程(在單核 CPU 下的多線程其實都只是并發,不是并行,并發和并行從宏觀上來講都是同時處理多路請求的概念。但并發和并行又有區別,并行是指兩個或者多個事件在同一時刻發生;而并發是指兩個或多個事件在同一時間間隔內發生。) 在 Python 多線程下,每個線程的執行方式:
獲取 GIL
執行代碼直到 sleep 或者是 python 虛擬機將其掛起。
釋放 GIL
可見,某個線程想要執行,必須先拿到 GIL,我們可以把 GIL 看作是 “通行證”,并且在一個 python 進程中,GIL 只有一個。拿不到通行證的線程,就不允許進入 CPU 執行。 在 Python2.x 里,GIL 的釋放邏輯是當前線程遇見 IO 操作或者 ticks 計數達到 100(ticks 可以看作是 Python 自身的一個計數器,專門做用于 GIL,每次釋放后歸零,這個計數可以通過 sys.setcheckinterval 來調整),進行釋放。 而每次釋放 GIL 鎖,線程進行鎖競爭、切換線程,會消耗資源。并且由于 GIL 鎖存在,python 里一個進程永遠只能同時執行一個線程 (拿到 GIL 的線程才能執行),這就是為什么在多核 CPU 上,python 的多線程效率并不高。
那么是不是 python 的多線程就完全沒用了呢?
在這里我們進行分類討論: 1、CPU 密集型代碼 (各種循環處理、計數等等),在這種情況下,由于計算工作多,ticks 計數很快就會達到閾值,然后觸發 GIL 的釋放與再競爭(多個線程來回切換當然是需要消耗資源的),所以 python 下的多線程對 CPU 密集型代碼并不友好。 2、IO 密集型代碼 (文件處理、網絡爬蟲等),多線程能夠有效提升效率 (單線程下有 IO 操作會進行 IO 等待,造成不必要的時間浪費,而開啟多線程能在線程 A 等待時,自動切換到線程 B,可以不浪費 CPU 的資源,從而能提升程序執行效率)。所以 python 的多線程對 IO 密集型代碼比較友好。 而在 python3.x 中,GIL 不使用 ticks 計數,改為使用計時器(執行時間達到閾值后,當前線程釋放 GIL),這樣對 CPU 密集型程序更加友好,但依然沒有解決 GIL 導致的同一時間只能執行一個線程的問題,所以效率依然不盡如人意。
多核性能
多核多線程比單核多線程更差,原因是單核下多線程,每次釋放 GIL,喚醒的那個線程都能獲取到 GIL 鎖,所以能夠無縫執行,但多核下,CPU0 釋放 GIL 后,其他 CPU 上的線程都會進行競爭,但 GIL 可能會馬上又被 CPU0 拿到,導致其他幾個 CPU 上被喚醒后的線程會醒著等待到切換時間后又進入待調度狀態,這樣會造成線程顛簸 (thrashing),導致效率更低
多進程為什么不會這樣?
每個進程有各自獨立的 GIL,互不干擾,這樣就可以真正意義上的并行執行,所以在 python 中,多進程的執行效率優于多線程 (僅僅針對多核 CPU 而言)。 所以在這里說結論:多核下,想做并行提升效率,比較通用的方法是使用多進程,能夠有效提高執行效率。 所以,如果不想浪費時間,可以直接看多進程。
直接利用函數創建多線程
Python 中使用線程有兩種方式:函數或者用類來包裝線程對象。
函數式:調用 thread 模塊中的 start_new_thread () 函數來產生新線程。語法如下:
1
thread.start_new_thread(function, args[, kwargs])
參數說明:
function - 線程函數。
args - 傳遞給線程函數的參數,他必須是個 tuple 類型。
kwargs - 可選參數。
先用一個實例感受一下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# -*- coding: UTF-8 -*-
import thread
import time
# 為線程定義一個函數
def print_time(threadName, delay):
count = 0
while count < 5:
time.sleep(delay)
count += 1
print "%s: %s" % (threadName, time.ctime(time.time()))
# 創建兩個線程
try:
thread.start_new_thread(print_time, ("Thread-1", 2,))
thread.start_new_thread(print_time, ("Thread-2", 4,))
except:
print "Error: unable to start thread"
while 1:
pass
print "Main Finished"
運行結果如下:
1
2
3
4
5
6
7
8
9
10
Thread-1: Thu Nov 3 16:43:01 2016
Thread-2: Thu Nov 3 16:43:03 2016
Thread-1: Thu Nov 3 16:43:03 2016
Thread-1: Thu Nov 3 16:43:05 2016
Thread-2: Thu Nov 3 16:43:07 2016
Thread-1: Thu Nov 3 16:43:07 2016
Thread-1: Thu Nov 3 16:43:09 2016
Thread-2: Thu Nov 3 16:43:11 2016
Thread-2: Thu Nov 3 16:43:15 2016
Thread-2: Thu Nov 3 16:43:19 2016
可以發現,兩個線程都在執行,睡眠 2 秒和 4 秒后打印輸出一段話。 注意到,在主線程寫了
1
2
while 1:
pass
這是讓主線程一直在等待 如果去掉上面兩行,那就直接輸出
1
Main Finished
程序執行結束。
使用 Threading 模塊創建線程
使用 Threading 模塊創建線程,直接從 threading.Thread 繼承,然后重寫 init 方法和 run 方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import threading
import time
import thread
exitFlag = 0
class myThread (threading.Thread): #繼承父類threading.Thread
def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
def run(self): #把要執行的代碼寫到run函數里面 線程在創建后會直接運行run函數
print "Starting " + self.name
print_time(self.name, self.counter, 5)
print "Exiting " + self.name
def print_time(threadName, delay, counter):
while counter:
if exitFlag:
thread.exit()
time.sleep(delay)
print "%s: %s" % (threadName, time.ctime(time.time()))
counter -= 1
# 創建新線程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
# 開啟線程
thread1.start()
thread2.start()
print "Exiting Main Thread"
運行結果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Starting Thread-1Starting Thread-2
Exiting Main Thread
Thread-1: Thu Nov 3 18:42:19 2016
Thread-2: Thu Nov 3 18:42:20 2016
Thread-1: Thu Nov 3 18:42:20 2016
Thread-1: Thu Nov 3 18:42:21 2016
Thread-2: Thu Nov 3 18:42:22 2016
Thread-1: Thu Nov 3 18:42:22 2016
Thread-1: Thu Nov 3 18:42:23 2016
Exiting Thread-1
Thread-2: Thu Nov 3 18:42:24 2016
Thread-2: Thu Nov 3 18:42:26 2016
Thread-2: Thu Nov 3 18:42:28 2016
Exiting Thread-2
有沒有發現什么奇怪的地方?打印的輸出格式好奇怪。比如第一行之后應該是一個回車的,結果第二個進程就打印出來了。 那是因為什么?因為這幾個線程沒有設置同步。
線程同步
如果多個線程共同對某個數據修改,則可能出現不可預料的結果,為了保證數據的正確性,需要對多個線程進行同步。 使用 Thread 對象的 Lock 和 Rlock 可以實現簡單的線程同步,這兩個對象都有 acquire 方法和 release 方法,對于那些需要每次只允許一個線程操作的數據,可以將其操作放到 acquire 和 release 方法之間。如下: 多線程的優勢在于可以同時運行多個任務(至少感覺起來是這樣)。但是當線程需要共享數據時,可能存在數據不同步的問題。 考慮這樣一種情況:一個列表里所有元素都是 0,線程”set” 從后向前把所有元素改成 1,而線程”print” 負責從前往后讀取列表并打印。 那么,可能線程”set” 開始改的時候,線程”print” 便來打印列表了,輸出就成了一半 0 一半 1,這就是數據的不同步。為了避免這種情況,引入了鎖的概念。 鎖有兩種狀態 —— 鎖定和未鎖定。每當一個線程比如”set” 要訪問共享數據時,必須先獲得鎖定;如果已經有別的線程比如”print” 獲得鎖定了,那么就讓線程”set” 暫停,也就是同步阻塞;等到線程”print” 訪問完畢,釋放鎖以后,再讓線程”set” 繼續。 經過這樣的處理,打印列表時要么全部輸出 0,要么全部輸出 1,不會再出現一半 0 一半 1 的尷尬場面。 看下面的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# -*- coding: UTF-8 -*-
import threading
import time
class myThread (threading.Thread):
def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
def run(self):
print "Starting " + self.name
# 獲得鎖,成功獲得鎖定后返回True
# 可選的timeout參數不填時將一直阻塞直到獲得鎖定
# 否則超時后將返回False
threadLock.acquire()
print_time(self.name, self.counter, 3)
# 釋放鎖
threadLock.release()
def print_time(threadName, delay, counter):
while counter:
time.sleep(delay)
print "%s: %s" % (threadName, time.ctime(time.time()))
counter -= 1
threadLock = threading.Lock()
threads = []
# 創建新線程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
# 開啟新線程
thread1.start()
thread2.start()
# 添加線程到線程列表
threads.append(thread1)
threads.append(thread2)
# 等待所有線程完成
for t in threads:
t.join()
print "Exiting Main Thread"
在上面的代碼中運用了線程鎖還有 join 等待。 運行結果如下:
1
2
3
4
5
6
7
8
9
Starting Thread-1
Starting Thread-2
Thread-1: Thu Nov 3 18:56:49 2016
Thread-1: Thu Nov 3 18:56:50 2016
Thread-1: Thu Nov 3 18:56:51 2016
Thread-2: Thu Nov 3 18:56:53 2016
Thread-2: Thu Nov 3 18:56:55 2016
Thread-2: Thu Nov 3 18:56:57 2016
Exiting Main Thread
這樣一來,你可以發現就不會出現剛才的輸出混亂的結果了。
線程優先級隊列
Python 的 Queue 模塊中提供了同步的、線程安全的隊列類,包括 FIFO(先入先出) 隊列 Queue,LIFO(后入先出)隊列 LifoQueue,和優先級隊列 PriorityQueue。這些隊列都實現了鎖原語,能夠在多線程中直接使用。可以使用隊列來實現線程間的同步。
Queue 模塊中的常用方法:
Queue.qsize () 返回隊列的大小
Queue.empty () 如果隊列為空,返回 True, 反之 False
Queue.full () 如果隊列滿了,返回 True, 反之 False
Queue.full 與 maxsize 大小對應
Queue.get ([block [, timeout]]) 獲取隊列,timeout 等待時間
Queue.get_nowait () 相當 Queue.get (False)
Queue.put (item) 寫入隊列,timeout 等待時間
Queue.put_nowait (item) 相當 Queue.put (item, False)
Queue.task_done () 在完成一項工作之后,Queue.task_done () 函數向任務已經完成的隊列發送一個信號
Queue.join () 實際上意味著等到隊列為空,再執行別的操作
用一個實例感受一下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# -*- coding: UTF-8 -*-
import Queue
import threading
import time
exitFlag = 0
class myThread (threading.Thread):
def __init__(self, threadID, name, q):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.q = q
def run(self):
print "Starting " + self.name
process_data(self.name, self.q)
print "Exiting " + self.name
def process_data(threadName, q):
while not exitFlag:
queueLock.acquire()
if not workQueue.empty():
data = q.get()
queueLock.release()
print "%s processing %s" % (threadName, data)
else:
queueLock.release()
time.sleep(1)
threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = Queue.Queue(10)
threads = []
threadID = 1
# 創建新線程
for tName in threadList:
thread = myThread(threadID, tName, workQueue)
thread.start()
threads.append(thread)
threadID += 1
# 填充隊列
queueLock.acquire()
for word in nameList:
workQueue.put(word)
queueLock.release()
# 等待隊列清空
while not workQueue.empty():
pass
# 通知線程是時候退出
exitFlag = 1
# 等待所有線程完成
for t in threads:
t.join()
print "Exiting Main Thread"
運行結果:
1
2
3
4
5
6
7
8
9
10
11
12
Starting Thread-1
Starting Thread-2
Starting Thread-3
Thread-3 processing One
Thread-1 processing Two
Thread-2 processing Three
Thread-3 processing Four
Thread-2 processing Five
Exiting Thread-2
Exiting Thread-3
Exiting Thread-1
Exiting Main Thread
上面的例子用了 FIFO 隊列。當然你也可以換成其他類型的隊列。
參考文章
http://bbs.51cto.com/thread-1349105-1.html
http://www.runoob.com/python/python-multithreading.html
Python 任務調度 多線程
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。