python并發編程之多線程
并發與并行
cup是順序執行的,操作系統輪流讓各個任務交替執行,每個任務在時間片內執行,時間片結束切換到下一個任務,這是并發。
真正的并行執行多任務只能在?多核CPU?上實現,但是,由于任務數量遠遠多于?CPU?的核心數量,所以,操作系統也會自動把很多任務輪流調度到每個核心上執行。
并發性:是指兩個或多個事件在同一時間間隔內發生。
并行性:是指兩個或多個事件在同一時刻發生。
舉個例子:我們有一個車間(cpu),可以生產各種東西(任務),比如汽車,飛機,自行車。制造汽車時:車間每生產一個零件(計算操作)后,需要等它冷卻,定型,組裝好后才能生產下一個零件,在這個冷卻,定型,組裝的過程(IO操作)中車間是不工作的(程序霸占著cpu時間,效率低)。為了讓我們車間效率高,我們讓它在冷卻,定型,組裝的時間中生產其他的零件,等組裝好后再繼續生產汽車零件。某個確定的時間時做一個事情,但在一個時間段(時間片)內做多個事情,這就是并發。多個車間同時工作就是并行。
【拓展:時間片】
每個進程被分配一個時間段,稱作它的時間片,即該進程允許運行的時間。如果在時間片結束時進程還在運行,則CPU將被剝奪并分配給另一個進程。如果進程在時間片結束前阻塞或結束,則CPU當即進行切換。調度程序所要做的就是維護一張就緒進程列表,當進程用完它的時間片后,它被移到隊列的末尾。
從一個進程切換到另一個進程是需要一定時間的--保存和裝入寄存器值及內存映像,更新各種表格和隊列等。
時間片設得太短會導致過多的進程切換,降低了CPU效率;而設得太長又可能引起對短的交互請求的響應變差。將時間片設為100毫秒通常是一個比較合理的折中。
進程和線程
進程
進程(Process):是計算機中的程序關于某數據集合上的一次運行活動,是系統進行資源分配和調度的基 本單位,是操作系統結構的基礎。
程序是源代碼編譯后的文件,而這些文件存放在磁盤上。當程序被操作系統加載到 內存中,就是進程,進程中存放著指令和數據(資源)。一個程序的執行實例就是一個進程。它也是線 程的容器。
線程
線程 thread :線程也叫輕量級進程,是操作系統能夠進行運算調度的最小單位,它被包涵在進程之中,是進程中的實際運作單位。線程自己不擁有系統資源,只擁有一點兒在運行中必不可少的資源,但它可與同屬一個進程的其他線程共享進程所擁有的全部資源。一條線程指的是進程中一個單一順序的控制流,一個線程可以創建和撤銷另一個線程,同一個進程中的多個線程之間可以并發執行,每條線程并行執行不同的任務。
每個獨立的線程有一個程序運行的入口、順序執行序列和程序的出口。但是線程不能夠獨立執行,必須依存在應用程序中,由應用程序提供多個線程執行控制。
每個線程都有他自己的一組CPU寄存器,稱為線程的上下文,該上下文反映了線程上次運行該線程的CPU寄存器的狀態。
指令指針和堆棧指針寄存器是線程上下文中兩個最重要的寄存器,線程總是在進程得到上下文中運行的,這些地址都用于標志擁有線程的進程地址空間中的內存。
線程可以被搶占(中斷)。
在其他線程正在運行時,線程可以暫時擱置(也稱為睡眠) -- 這就是線程的退讓。
線程可以分為:
內核線程:由操作系統內核創建和撤銷。
用戶線程:不需要內核支持而在用戶程序中實現的線程。
線程的狀態
狀態
含義
就緒(Ready)
線程能夠運行,但在等待被調度。可能線程剛剛創建啟動,或剛剛從阻塞中恢?復,或者被其他線程搶占
運行(Running)
線程正在運行
阻塞(Blocked)
線程等待外部事件發生而無法運行,如I/O操作
終止 (Terminated)
線程完成,或退出,或被取消
進程和程序的關系
Linux進程有父進程、子進程,Windows的進程是平等關系。
在實現了線程的操作系統中,線程是操作系統能夠進行運算調度的最小單位。它被包含在進程之中,是
進程中的實際運作單位。
一個標準的線程由線程ID,當前指令指針(PC)、寄存器集合和堆、棧組成。
在許多系統中,創建一個線程比創建一個進程快10-100倍。
進程、線程的理解
現代操作系統提出進程的概念,每一個進程都認為自己獨占所有的計算機硬件資源。
進程就是獨立的王國,進程間不可以隨便的共享數據。
線程就是省份,同一個進程內的線程可以共享進程的資源,每一個線程擁有自己獨立的堆棧。
Python中的進程和線程
運行程序會啟動一個解釋器進程,線程共享一個解釋器進程。
進程靠線程執行代碼,至少有一個主線程,其它線程是工作線程。
主線程是第一個啟動的線程。
父線程:如果線程A中啟動了一個線程B,??A就是B的父線程。
子線程:??B就是A的子線程。
多線程
為什么要使用多線程?
線程在程序中是獨立的、并發的執行流。與分隔的進程相比,進程中線程之間的隔離程度要小,它們共享內存、文件句柄
和其他進程應有的狀態。
因為線程的劃分尺度小于進程,使得多線程程序的并發性高。進程在執行過程之中擁有獨立的內存單元,而多個線程共享
內存,從而極大的提升了程序的運行效率。
線程比進程具有更高的性能,這是由于同一個進程中的線程都有共性,多個線程共享一個進程的虛擬空間。線程的共享環境
包括進程代碼段、進程的共有數據等,利用這些共享的數據,線程之間很容易實現通信。
操作系統在創建進程時,必須為改進程分配獨立的內存空間,并分配大量的相關資源,但創建線程則簡單得多。因此,使用多線程
來實現并發比使用多進程的性能高得要多。
總結起來,使用多線程編程具有如下幾個優點:
進程之間不能共享內存,但線程之間共享內存非常容易。
操作系統在創建進程時,需要為該進程重新分配系統資源,但創建線程的代價則小得多。因此使用多線程來實現多任務并發執行比使用多進程的效率高
python的多線程
python語言內置了多線程功能支持,而不是單純地作為底層操作系統的調度方式,從而簡化了python的多線程編程。
在 CPython 中,由于存在?全局解釋器鎖,同一時刻只有一個線程可以執行 Python 代碼,如果你想讓你的應用更好地利用多核心計算機的計算資源,推薦你使用 multiprocessing?或?concurrent.futures.ProcessPoolExecutor。 但是,如果你想要同時運行多個 I/O 密集型任務,則多線程仍然是一個合適的模型。
python實現多線程
_thread
threading(推薦使用)
thread 模塊已被廢棄。用戶可以使用 threading 模塊代替。所以,在 Python3 中不能再使用"thread" 模塊。為了兼容性,Python3 將 thread 重命名為 "_thread"。
這里只講threading。
threading模塊
簡單示例:
import threading # 最簡單的線程程序 def worker(): print("I'm working") print('Fineshed') t = threading.Thread(target=worker, name='worker') # 線程對象 t.start() # 啟動
threading.Thread參數解析
target ? ?線程調用的對象,就是目標函數
name ? ?為線程起個名字
args ? ?為目標函數傳遞實參,元組
kwargs ? ?為目標函數關鍵字傳參,字典
通過threading.Thread創建一個線程對象,??target是目標函數,可以使用name為線程指定名稱。?但是線程沒有啟動,需要調用start方法。
線程之所以執行函數,是因為線程中就是要執行代碼的,而最簡單的代碼封裝就是函數,所以還是函數?調用。
函數執行完,線程也就退出了。
線程退出
Python沒有提供線程退出的方法,線程在下面情況時退出
線程函數內語句執行完畢
線程函數中拋出未處理的異常
# 程序拋異常線程退出 import threading import time def worker(): for i in range(10): time.sleep(0.5) if i > 5: #break # 終止循環 #return # 函數返回 raise RuntimeError # 拋異常 print('I am working') print('finished') t = threading.Thread(target=worker, name='worker') t.start() print('=' * 30)
這個模塊定義了以下函數:
threading.main_thread()
返回主 Thread 對象。一般情況下,主線程是Python解釋器開始時創建的線程。
threading.current_thread():
返回當前對應調用者的控制線程的?Thread?對象。如果調用者的控制線程不是利用?threading 創建,會返回一個功能受限的虛擬線程對象。
threading.enumerate():
返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。
threading.active_count():
返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
threading.get_ident():
返回當前線程的 “線程標識符”。它是一個非零的整數。它的值沒有直接含義,主要是用作 magic cookie,比如作為含有線程相關數據的字典的索引。線程標識符可能會在線程退出,新線程創建時被復用。
threading.excepthook(args, /)
處理由 Thread.run() 引發的未捕獲異常。
...
import threading import time def showtreadinfo(): # 打印當前線程對象,主線程對象,活躍線程數 print('current thread = {}\nmain thread = {}\nactive count = {}' .format( threading.current_thread(), threading.main_thread(), threading.active_count())) # 打印正在運行的線程的list,線程標識符, print('threading.enumerate = {}\nthreading.get_ident = {}'.format( threading.enumerate(),threading.get_ident())) def worker(): showtreadinfo() for i in range(2): time.sleep(1) print('i am working') print('finished') t = threading.Thread(target=worker, name='worker') # 線程對象 showtreadinfo() time.sleep(1) t.start() # 啟動 print('===end===') # current thread = <_MainThread(MainThread, started 17200)> # main thread = <_MainThread(MainThread, started 17200)> # active count = 1 # threading.enumerate = [<_MainThread(MainThread, started 17200)>] # threading.get_ident = 17200 # current thread =
線程對象
除了使用方法外,線程模塊同樣提供了Thread類來處理線程
threading.Thread()
創建一個daemon=True的守護線程:
import time import threading def foo(): time.sleep(1) for i in range(2): print(i) # 主線程是non-daemon線程 t = threading.Thread(target=foo, daemon=True) # 主線程執行完直接退出,不會等待t結束 t.start() print('Main Thread Exits') # Main Thread Exits
主線程是non-daemon線程,即daemon?=?False。
線程具有一個daemon屬性,可以手動設置為True或False,也可以不設置,則取默認值None?如果不設置daemon,就取當前線程的daemon來設置它
主線程是non-daemon線程,即daemon?=?False
從主線程創建的所有線程的不設置daemon屬性,則默認都是daemon = False,也就是non-?daemon線程
Python程序在沒有活著的non-daemon線程運行時,程序退出,也就是除主線程之外剩下的只能?都是daemon線程,主線程才能退出,否則主線程就只能等待
Thread類提供了以下屬性和方法:
name ? ?只是一個名字,只是個標識,名稱可以重名。 ?getName()、 ?setName()獲取、設置這 個名詞
ident ? ?線程ID,它是非0整數。線程啟動后才會有ID,否則為None。線程退出,此ID依舊可 以訪問。此ID可以重復使用
native_id ? ?此線程的線程 ID (TID),由 OS (內核) 分配。 這是一個非負整數,或者如果線程還未啟動則為?None。
daemon ?一個表示這個線程是(True)否(False)守護線程的布爾值。一定要在調用 start() 前設置好,不然會拋出 RuntimeError 。初始值繼承于創建線程;主線程不是守護線程,因此主線程創建的所有線程默認都是 daemon = False。當沒有存活的非守護線程時,整個Python程序才會退出。
start():啟動線程活動。start方法才能啟動操作系統線程,并運行run方法。??run方法內部調用了目標函數。
run():?用以表示線程活動的方法。
join([time]):?等待至線程中止。這阻塞調用線程直至線程的join() 方法被調用中止-正常退出或者拋出未處理的異常-或者是可選的超時發生。
is_alive():?返回線程是否活動的。
getName():?返回線程名。
setName():?設置線程名。
isDaemon()? ??返回線程是否是守護線程
setDaemon()?設置daemon值
創建多個線程:
import threading import time import sys def worker(f=sys.stdout): t = threading.current_thread() for i in range(2): time.sleep(1) print('i am working' , t.name, t.ident, file=f) print('finished', file=f) t1 = threading.Thread(target=worker, name='worker1') t2 = threading.Thread(target=worker, name='worker2', args=(sys.stderr,)) t1.start() t2.start() # i am working worker1 18412 # finished # i am working worker2 1312 # finished # i am working worker2 1312 # finished # i am working worker1 18412 # finished
可以看到worker1和work2交替執行。
當使用start方法啟動線程后,進程內有多個活動的線程并行的工作,就是多線程。
一個進程中至少有一個線程,并作為程序的入口,這個線程就是主線程。
一個進程至少有一個主線程。其他線程稱為工作線程。
線程安全
多線程執行一段代碼,不會產生不確定的結果,那這段代碼就是線程安全的。
多線程在運行過程中,由于共享同一進程中的數據,多線程并發使用同一個數據,那么數據就有可能被?相互修改,從而導致某些時刻無法確定這個數據的值,最終隨著多線程運行,運行結果不可預期,這就?是線程不安全。
import threading import time import logging FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s" logging.basicConfig(format=FORMAT, level=logging.INFO) class A: def __init__(self): self.x = 0 # 全局對象 global_data = A() def worker(): global_data .x = 0 for i in range(100): time.sleep(0.0001) global_data .x += 1 logging.info(global_data.x) for i in range(10): threading.Thread(target=worker, name='t-{}'.format(i)).start() # 每次執行global_data .x的值都是不確定的
threading.local類
python提供?threading.local?類,將這個類實例化得到一個全局對象,但是不同的線程使用這個對象存儲的數據其他線程看不見。
import threading import time import logging FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s" logging.basicConfig(format=FORMAT, level=logging.INFO) # 全局對象 global_data = threading.local() def worker(): global_data .x = 0 for i in range(100): time.sleep(0.0001) global_data .x += 1 logging.info(global_data.x) for i in range(10): threading.Thread(target=worker, name='t-{}'.format(i)).start()
結果顯示和使用局部變量的效果一樣。
# 當前線程創建的threading.local()的屬性被其他線程訪問時會報錯 import logging import threading import time FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s" logging.basicConfig(format=FORMAT, level=logging.INFO) # 全局對象 X = 'abc' global_data = threading.local() global_data .x = 100 print(global_data , global_data .x) print('~' * 30) time.sleep(2) def worker(): logging.info(X) logging.info(global_data) logging.info(global_data.x) worker() # 普通函數調用 print('=' * 30) time.sleep(2) threading.Thread(target=worker, name='worker').start() # 啟動一個線程 # AttributeError: '_thread._local' object has no attribute 'x'
threading.local類構建了一個大字典,存放所有線程相關的字典,定義如下:
{ id(Thread) -> (ref(Thread), thread-local dict) }
每一線程實例的id為key,元組為value。
value中2部分為,線程對象引用,每個線程自己的字典。
本質
運行時,??threading.local實例處在不同的線程中,就從大字典中找到當前線程相關鍵值對中的字?典,覆蓋threading.local實例的__dict__?。
這樣就可以在不同的線程中,安全地使用線程獨有的數據,做到了線程間數據隔離,如同本地變量?一樣安全。
線程同步
線程同步,線程間協同,通過某種技術,讓一個線程訪問某些數據時,其他線程不能訪問這些數據,直到該線程完成對數據的操作。
Event
實現事件對象的類。事件對象管理一個內部標識,調用 set() 方法可將其設置為true。調用 clear() 方法可將其設置為 false 。調用 wait() 方法將進入阻塞直到標識為true。這個標識初始時為 false 。
set()
將內部標識設置為 true 。所有正在等待這個事件的線程將被喚醒。當標識為 true 時,調用 wait() 方法的線程不會被被阻塞。
clear()
is_set()
當且僅當內部標識為 true 時返回 True 。
wait(timeout=None)
# 老板監視工人生產10個杯子 from threading import Event, Thread import logging import time FORMAT = '%(asctime)s %(threadName)s %(thread)s %(message)s' logging.basicConfig(format=FORMAT, level=logging.INFO) def boss(event:Event): logging.info("I'm boss, waiting for U") event.wait() # 阻塞等待 logging.info('Good Job.') def worker(event:Event, count=10): logging.info('I am working for U') cups = [] while not event.wait(0.1): logging.info('make 1 cup') time.sleep(0.1) cups.append(1) if len(cups) >= count: event.set() logging.info('I finished my job. cups={}' .format(cups)) event = Event() b = Thread(target=boss, name='boss', args=(event,)) w = Thread(target=worker, name='worker', args=(event,)) b.start() w.start()
生產完成even事件為True,老板監視進程阻塞結束,打印good job
總結
需要使用同一個Event對象的標記flag。
誰wait就是等到flag變為True,或等到超時返回False。
不限制等待者的個數,通知所有等待者。
鎖對象
原始鎖處于 "鎖定" 或者 "非鎖定" 兩種狀態之一。它被創建時為非鎖定狀態。它有兩個基本方法, acquire() 和 release() 。當狀態為非鎖定時, acquire() 將狀態改為 鎖定 并立即返回。當狀態是鎖定時, acquire() 將阻塞至其他線程調用 release() 將其改為非鎖定狀態,然后 acquire() 調用重置其為鎖定狀態并返回。 release() 只在鎖定狀態下調用; 它將狀態改為非鎖定并立即返回。如果嘗試釋放一個非鎖定的鎖,則會引發 RuntimeError ?異常。
鎖同樣支持 上下文管理協議。
當多個線程在 acquire() 等待狀態轉變為未鎖定被阻塞,然后 release() 重置狀態為未鎖定時,只有一個線程能繼續執行;至于哪個等待線程繼續執行沒有定義,并且會根據實現而不同。
所有方法的執行都是原子性的。
threading.Lock
Lock類實現原始鎖對象的類,是mutex互斥鎖。一旦一個線程獲得一個鎖,會阻塞隨后嘗試獲得鎖的線程,直到它被釋放;任何線程都可以釋放它;一旦一個線程獲得鎖,其它試圖獲取鎖的線程將被阻塞,只到擁有鎖的線程釋放鎖
凡是存在共享資源爭搶的地方都可以使用鎖,從而保證只有一個使用者可以完全使用這個資源。
acquire(blocking=True,timeout=-1)
可以阻塞或非阻塞地獲得鎖。默認阻塞,阻塞可以設置超時時間。非阻塞時,??timeout禁止 設置。
成功獲取鎖,返回True,否則返回False
release()
釋放鎖。可以從任何線程調用釋放。已上鎖的鎖,會被重置為unlocked,未上鎖的鎖上調用,拋RuntimeError異常。如果其他線程正在等待這個鎖解鎖而被阻塞,只允許其中一個允許。
如果獲得了鎖則返回真值。
示例:訂單要求生產1000個杯子,組織10個工人生產
import threading from threading import Thread, Lock import time import logging FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s" logging.basicConfig(format=FORMAT, level=logging.INFO) cups = [] lock = Lock() # 鎖 def worker(count=100): logging.info("I'm working") while True: # 生產杯子期間加鎖(改變cups列表) lock.acquire() # 獲取鎖 if len(cups) >= count: lock.release() break time.sleep(0.0001) # 為了看出線程切換效果,模擬杯子制作時間 cups.append(1) lock.release() logging.info('I finished my job. cups = {}' .format(len(cups))) for i in range(1, 11): t = Thread(target=worker, name="w{}".format(i), args=(1000,)) t.start()
上下文支持
鎖是典型必須釋放的,??Python提供了上下文支持。查看Lock類的上下文方法,??__enter__方法返回bool 表示是否獲得鎖,??__exit__方法中釋放鎖。
由此上例可以修改為
import threading from threading import Thread, Lock import time import logging FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s" logging.basicConfig(format=FORMAT, level=logging.INFO) cups = [] lock = Lock() # 鎖 def worker(count=100): logging.info("I'm working") while True: with lock: # 獲取鎖,離開with釋放鎖 if len(cups) >= count: logging.info('leaving') break time.sleep(0.0001) # 為了看出線程切換效果,模擬杯子制作時間 cups.append(1) logging.info(lock.locked()) logging.info('I finished my job. cups = {}' .format(len(cups))) for i in range(1, 11): t = Thread(target=worker, name="w{}".format(i), args=(1000,)) t.start()
threading.RLock
重入鎖是一個可以被同一個線程多次獲取的同步基元組件。在內部,它在基元鎖的鎖定/非鎖定狀態上附加了 "所屬線程" 和 "遞歸等級" 的概念。在鎖定狀態下,某些線程擁有鎖 ; 在非鎖定狀態下, 沒有線程擁有它。
若要鎖定鎖,線程調用其 acquire() 方法;一旦線程擁有了鎖,方法將返回。若要解鎖,線程調用 release() 方法。 acquire()/release() 對可以嵌套;只有最終 release() (最外面一對的 release() ) 將鎖解開,才能讓其他線程繼續處理 acquire() 阻塞。
遞歸鎖也支持 上下文管理協議。
若要鎖定鎖,線程調用其 acquire() 方法;一旦線程擁有了鎖,方法將返回。若要解鎖,線程調用 release() 方法。 acquire()/release() 對可以嵌套;只有最終 release() (最外面一對的 release() ) 將鎖解開,才能讓其他線程繼續處理 acquire() 阻塞。
遞歸鎖也支持 上下文管理協議。
acquire(blocking=True,
timeout=-1)
默認阻塞,阻塞可以設置超時時間。非阻塞時,??timeout禁止 設置。
當無參數調用時: 如果這個線程已經擁有鎖,遞歸級別增加一,并立即返回。否則,如果其他線程擁有該鎖,則阻塞至該鎖解鎖。一旦鎖被解鎖(不屬于任何線程),則搶奪所有權,設置遞歸等級為一,并返回。如果多個線程被阻塞,等待鎖被解鎖,一次只有一個線程能搶到鎖的所有權。在這種情況下,沒有返回值。
release()
釋放鎖,自減遞歸等級。如果減到零,則將鎖重置為非鎖定狀態(不被任何線程擁有),并且,如果其他線程正被阻塞著等待鎖被解鎖,則僅允許其中一個線程繼續。如果自減后,遞歸等級仍然不是零,則鎖保持鎖定,仍由調用線程擁有。
只有當前線程擁有鎖才能調用這個方法。如果鎖被釋放后調用這個方法,會引起?RuntimeError?異常。
沒有返回值。
# 遞歸鎖:RLcok類的用法和Lock類一模一樣,但它支持嵌套,在多個鎖沒有釋放的時候一般會使用RLock類 import threading import time def func(lock): global gl_num lock.acquire() gl_num += 1 time.sleep(1) print(gl_num) lock.release() if __name__ == '__main__': gl_num = 0 lock = threading.RLock() for i in range(10): t = threading.Thread(target=func,args=(lock,)) t.start()
GIL VS Lock
Python已經有一個GIL來保證同一時間只能有一個線程來執行了,為什么這里還需要lock?
首先我們需要達成共識:鎖的目的是為了保護共享的數據,同一時間只能有一個線程來修改共享的數據
然后,我們可以得出結論:保護不同的數據就應該加不同的鎖。
最后,問題就很明朗了,GIL 與Lock是兩把鎖,保護的數據不一樣,前者是解釋器級別的(當然保護的就是解釋器級別的數據,比如垃圾回收的數據),后者是保護用戶自己開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock
鎖的應用場景
鎖適用于訪問和修改同一個共享資源的時候,即讀寫同一個資源的時候。
如果全部都是讀取同一個共享資源需要鎖嗎?
不需要。因為這時可以認為共享資源是不可變的,每一次讀取它都是一樣的值,所以不用加鎖 使用鎖的注意事項:
少用鎖,必要時用鎖。使用了鎖,多線程訪問被鎖的資源時,就成了串行,要么排隊執行,要么爭 搶執行
舉例,高速公路上車并行跑,可是到了省界只開放了一個收費口,過了這個口,車輛依然可以
在多車道上一起跑。過收費口的時候,如果排隊一輛輛過,加不加鎖一樣效率相當,但是一旦 出現爭搶,就必須加鎖一輛輛過。注意,不管加不加鎖,只要是一輛輛過,效率就下降了。
加鎖時間越短越好,不需要就立即釋放鎖
一定要避免死鎖
Semaphore信號量
信號量通常用于保護數量有限的資源,例如數據庫服務器。在資源數量固定的任何情況下,都應該使用有界信號量。在生成任何工作線程前,應該在主線程中初始化信號量。互斥鎖同時只允許一個線程更改數據,而Semaphore是同時允許一定數量的線程更改數據
maxconnections = 5 # ... pool_sema = BoundedSemaphore(value=maxconnections)
工作線程生成后,當需要連接服務器時,這些線程將調用信號量的 acquire 和 release 方法:
with pool_sema: conn = connectdb() try: # ... use connection ... finally: conn.close()
舉例:
from threading import Thread,Semaphore import threading import time # def func(): # if sm.acquire(): # print (threading.currentThread().getName() + ' get semaphore') # time.sleep(2) # sm.release() def func(): sm.acquire() print('%s get sm' %threading.current_thread().getName()) time.sleep(3) sm.release() if __name__ == '__main__': sm=Semaphore(5) for i in range(23): t=Thread(target=func) t.start()
threading.Condition(lock=None)
實現條件變量對象的類。一個條件變量對象允許一個或多個線程在被其它線程所通知之前進行等待。如果給出了非 None 的 lock 參數,則它必須為 Lock 或者 RLock 對象,并且它將被用作底層鎖。否則,將會創建新的 RLock 對象,并將其用作底層鎖。
acquire(*args)
請求底層鎖。此方法調用底層鎖的相應方法,返回值是底層鎖相應方法的返回值。
release()
釋放底層鎖。此方法調用底層鎖的相應方法。沒有返回值。
wait(timeout=None)
等待直到被通知或發生超時。這個方法釋放底層鎖,然后阻塞,直到在另外一個線程中調用同一個條件變量的?notify()?或?notify_all()?喚醒它,或者直到可選的超時發生。一旦被喚醒或者超時,它重新獲得鎖并返回。
wait_for(predicate, timeout=None)
等待,直到條件計算為真。 predicate 應該是一個可調用對象而且它的返回值可被解釋為一個布爾值。可以提供 timeout 參數給出最大等待時間。這個實用方法會重復地調用 wait() 直到滿足判斷式或者發生超時。返回值是判斷式最后一個返回值,而且如果方法發生超時會返回 False 。
notify(n=1)
默認喚醒一個等待這個條件的線程。如果調用線程在沒有獲得鎖的情況下調用這個方法,會引發 RuntimeError 異常。
這個方法喚醒最多 n 個正在等待這個條件變量的線程;如果沒有線程在等待,這是一個空操作。
當前實現中,如果至少有 n 個線程正在等待,準確喚醒 n 個線程。但是依賴這個行為并不安全。未來,優化的實現有時會喚醒超過 n 個線程。
注意:被喚醒的線程并沒有真正恢復到它調用的 wait() ,直到它可以重新獲得鎖。 因為 notify() 不釋放鎖,其調用者才應該這樣做。
notify_all()
喚醒所有正在等待這個條件的線程。這個方法行為與 notify() 相似,但并不只喚醒單一線程,而是喚醒所有等待線程。如果調用線程在調用這個方法時沒有獲得鎖,會引發 RuntimeError 異常。
import threading def run(n): con.acquire() con.wait() print("run the thread: %s" %n) con.release() if __name__ == '__main__': con = threading.Condition() for i in range(10): t = threading.Thread(target=run, args=(i,)) t.start() while True: inp = input('>>>') if inp == 'q': break con.acquire() con.notify(int(inp)) con.release() def condition_func(): ret = False inp = input('>>>') if inp == '1': ret = True return ret def run(n): con.acquire() con.wait_for(condition_func) print("run the thread: %s" %n) con.release() if __name__ == '__main__': con = threading.Condition() for i in range(10): t = threading.Thread(target=run, args=(i,)) t.start()
python多線程總結
python針對不同類型的代碼執行效率也是不同的
1、CPU密集型代碼(各種循環處理、計算等),在這種情況下,由于計算工作多,ticks技術很快就會達到閥值,然后出發GIL的釋放與再競爭(多個線程來回切換當然是需要消耗資源的),所以python下的多線程對CPU密集型代碼并不友好。
2、IO密集型代碼(文件處理、網絡爬蟲等設計文件讀寫操作),多線程能夠有效提升效率(單線程下有IO操作會進行IO等待,造成不必要的時間浪費,而開啟多線程能在線程A等待時,自動切換到線程B,可以不浪費CPU的資源,從而能提升程序的執行效率)。所以python的多線程對IO密集型代碼比較友好。
主要要看任務的類型,我們把任務分為I/O密集型和計算密集型,而多線程在切換中又分為I/O切換和時間切換。
如果任務屬于是I/O密集型,若不采用多線程,我們在進行I/O操作時,勢必要等待前面一個I/O任務完成后面的I/O任務才能進行,在這個等待的過程中,CPU處于等待狀態,這時如果采用多線程的話,剛好可以切換到進行另一個I/O任務。這樣就剛好可以充分利用CPU避免CPU處于閑置狀態,提高效率。但是
如果多線程任務都是計算型,CPU會一直在進行工作,直到一定的時間后采取多線程時間切換的方式進行切換線程,此時CPU一直處于工作狀態,
此種情況下并不能提高性能,相反在切換多線程任務時,可能還會造成時間和資源的浪費,導致效能下降。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。