亞寵展、全球?qū)櫸锂a(chǎn)業(yè)風(fēng)向標(biāo)——亞洲寵物展覽會(huì)深度解析
915
2022-05-29
這兩天頻繁遇到MongoDB插入數(shù)據(jù)的問(wèn)題,這里記錄下。
問(wèn)題描述:我有多個(gè)線程在抓數(shù)據(jù),每天數(shù)據(jù)里有含有多個(gè)文檔(Document),使用Pymongo的插入方法,逐條插入。形如下
def save_to_mongo(data):
for i in data:
db.insert_one(i)
在接收到數(shù)據(jù)后直接調(diào)用該方法即可。但是運(yùn)維那邊反饋,數(shù)據(jù)庫(kù)壓力比較大,讓我修改。仔細(xì)想了想,可以使用insert_many方法。
插入可迭代的文檔
>>> db.test.count_documents({})
0
>>> result = db.test.insert_many([{'x': i} for i in range(2)])
>>> result.inserted_ids
[ObjectId('54f113fffba522406c9cc20e'), ObjectId('54f113fffba522406c9cc20f')]
>>> db.test.count_documents({})
2
有幾個(gè)參數(shù)需要了解
documents: 可迭代文檔
ordered :(可選)如果“True”(默認(rèn))文檔將按順序插入服務(wù)器,按提供的順序。 如果發(fā)生錯(cuò)誤,則中止所有剩余插入。 如果為“False”,文檔將以任意順序插入服務(wù)器,可能并行,并且將嘗試所有文檔插入。
bypass_document_validation: (可選)如果為“True”,則允許寫(xiě)入選擇退出文檔級(jí)別驗(yàn)證。 默認(rèn)為“False”。
session (optional): a ClientSession.
好了最簡(jiǎn)單的方法就是把所有需要保存的數(shù)據(jù)暫時(shí)存放在列表中,最后再插入。建議加上ordered=False參數(shù),可以防止數(shù)據(jù)保存異常。
def?save_mongo(): ????while?True: ????????while?len(tmp)?>?100: ????????????try: ????????????????c?=?db[collection_name] ????????????????c.insert_many(tmp,?ordered=False) ????????????????tmp.clear() ????????????except?pymongo.errors.BulkWriteError: ????????????????tmp.clear() ????????????except?Exception?as?e: ????????????????logging.error('mongodb_save?insert_many:?{},?{}'.format(e,?tmp)) ????????time.sleep(3) tmp?=?[] for?i?in?data: ????tmp.append(i) t_save?=?threading.Thread(target=save_mongo) t_save.setDaemon(True) t_save.start()
新開(kāi)一個(gè)線程去不停的檢查,如果列表數(shù)據(jù)大于100,則批量插入,或者等待3秒。
這里捕獲pymongo.errors.BulkWriteError異常,如果在insert_many時(shí)發(fā)生錯(cuò)誤,會(huì)產(chǎn)生該異常。在我這里通常是插入重復(fù)數(shù)據(jù)引起的。
還有一種情況,是在多線程情況下。多個(gè)線程共享一個(gè)列表對(duì)象,肯定是需要加鎖的,如果使用Lock來(lái)管理數(shù)據(jù)插入問(wèn)題,需要去給列表加鎖。之前還沒(méi)用過(guò)鎖,去看看教程。
import threading
class SharedCounter:
'''
A counter object that can be shared by multiple threads.
'''
def __init__(self, initial_value = 0):
self._value = initial_value
self._value_lock = threading.Lock()
def incr(self,delta=1):
'''
Increment the counter with locking
'''
self._value_lock.acquire()
self._value += delta
self._value_lock.release()
def decr(self,delta=1):
'''
Decrement the counter with locking
'''
self._value_lock.acquire()
self._value -= delta
self._value_lock.release()
覺(jué)得太麻煩,可以將保存數(shù)據(jù)等方法封裝成一個(gè)類對(duì)象,實(shí)例化一個(gè)列表,在每個(gè)線程中實(shí)例化一個(gè)類對(duì)象即可,這樣多個(gè)線程中是不會(huì)共享列表數(shù)據(jù)的。
當(dāng)然也可以使用另外一種數(shù)據(jù)結(jié)構(gòu):Queue隊(duì)列。Queue是線程安全的,自帶鎖,使用的時(shí)候,不用對(duì)隊(duì)列加鎖操作。可以將數(shù)據(jù)暫時(shí)存入queue,然后用列表取出來(lái),數(shù)量大于100則插入,并清空列表。
數(shù)據(jù)庫(kù)
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。