python操作MongoDB
文章目錄
前言
MongoDB客戶端類
連接管理
集合管理
索引管理
增刪改查
文件操作
聚合操作
前言
曾經有一段時間,大約是2014年和2015年,在所有的項目里面,但凡需要數據庫的場合,我們無一例外地選擇了MongoDB。在此之前,我們更多的是使用Oracle數據庫。盡管Oracle已經做得非常棒了,但面對來自全球各地空間天氣觀測臺站和世界各國衛星數據的時候,仍然顯得捉襟見肘,疲于應付。不是Oracle不夠優秀,是數據太過繁雜,僅僅依賴二維關系模型已經無法高效地處理數據了。
MongoDB 是由C++語言編寫的,是一個基于分布式文件存儲的開源數據庫系統。MongoDB號稱是最像關系型數據庫的非關系型數據庫,在一些簡單的項目里面,我們也用它來存儲一些結構化的數據??偨Y使用感受,我覺得MongoDB 就像一個個性鮮明的優秀青年,有能力,也有脾氣,優點和缺點一樣突出。要用好MongoDB,首先要弄清楚你是否真的需要NoSQL數據庫。
MongoDB的優點:
無模式(太過隨意,有時反倒是缺點)
支持對象存儲
支持Map/reduce和聚合操作
擴展方便
可靠性高
MongoDB的缺點不多,但很要命,這就是被很多人詬病的“內存貪婪”:它會占用操作系統幾乎所有的空閑內存,讓其他進程活得不舒適,而我們一直對該機制缺乏了解,也沒有相應的應對手段。
MongoDB客戶端類
pymongo是Python訪問MongoDB的模塊,使用該模塊,我們定義了一個操作MongoDB的類PyMongoClient,包含了連接管理、集合管理、索引管理、增刪改查、文件操作、聚合操作等方法。
# --------------------------------------------------------------------------------------------------- # PyMongoClient(host='localhost', port='27017', db='test', user=None, passwd=None, loop=5, rate=8) # --------------------------------------------------------------------------------------------------- # PyMongoClient.SetDatabase(db, user, passwd) 設置(修改)當前數據庫 # PyMongoClient.CloseConnection() 關閉連接 # PyMongoClient.Logout() 注銷用戶 # PyMongoClient.GetStatus() 獲取MogoDB服務器的狀態 # PyMongoClient.IsMongos() 判斷是否是MongoS # PyMongoClient.GetDateTime() 獲取MongoDB服務器的當前時間(需要權限支持) # --------------------------------------------------------------------------------------------------- # PyMongoClient.GetCollections() 獲取當前數據庫的全部集合 # PyMongoClient.CreateCollection(collection) 創建集合 # PyMongoClient.DropCollection(collection) 刪除集合 # --------------------------------------------------------------------------------------------------- # PyMongoClient.IndexInformation(collection) 獲取集合的索引信息 # PyMongoClient.EnsureIndex(collection, key_or_list) 檢查索引是否存在,若不存在,則創建索引 # PyMongoClient.CreateIndex(collection, key_or_list) 創建索引 # PyMongoClient.DropIndex(collection, key=None) 刪除索引,key=None時刪除全部索引(_id除外) # --------------------------------------------------------------------------------------------------- # PyMongoClient.InsertDoc(collection, data) data為字典時,單條插入,data為列表時,批量插入 # PyMongoClient.RemoveDoc(collection, docFilter=None) 刪除文檔,docFilter=None時刪除集合的全部文檔 # PyMongoClient.UpdateDoc(collection, docFilter, data, modifier=None) 更新文檔,支持使用$inc/$set/$unset等修改器 # PyMongoClient.UpsertDoc(collection, docFilter, data) 如果文檔不存在,則插入文檔;如果文檔存在,則更新文檔 # PyMongoClient.GetDoc(collection, docFilter=None, colFilter=None) 返回單個文檔 # PyMongoClient.CountDoc(collection, docFilter=None) 返回集合或查詢的文檔總數 # PyMongoClient.GetCursor(collection, docFilter=None, colFilter=None) 返回多個文檔的游標 # PyMongoClient.CountCursor(cursor) 返回游標的文檔總數 # PyMongoClient.SortCursor(cursor, col_or_list, director='ASC') 游標排序,默認升序,取值ASC/DESC # PyMongoClient.SubCursor(cursor, limit, skip=0) 游標截取 # --------------------------------------------------------------------------------------------------- # PyMongoClient.Aggregate(collection, pipleline) 聚合 # PyMongoClient.RunCommand(collection, cmdObj) 運行數據庫命令 # --------------------------------------------------------------------------------------------------- # PyMongoClient.Str2ObjectId(id_str) id字符串轉id對象 # PyMongoClient.ObjectId2Str(id_obj) id對象轉id字符串 # PyMongoClient.GetBinaryFromFile(sourceFile) 讀文件,返回二進制內容 # PyMongoClient.SaveBinaryToFile(binary, targetFile) 將二進制內容保存為文件 # --------------------------------------------------------------------------------------------------- # PyMongoClient.PutFile(localFilePath, dbFileName=None) 將文件保存到GridFS并返回FileId # PyMongoClient.GetFile(fileId, localFilePath) 將文件從GridFS取出,并保存到文件中 # PyMongoClient.GetFilesCursor(docFilter=None, colFilter=None) 取得文件信息游標 # PyMongoClient.DeleteFile(fileId) 刪除GridFS中的文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
連接管理
class PyMongoClient(): def __init__(self, host='localhost', port=27017, db='test', user=None, passwd=None, loop=5, rate=8): self.loop = loop # 數據庫失去連接后,嘗試執行數據庫操作的次數 self.rate = float(rate) # 數據庫失去連接后,嘗試執行數據庫操作的時間間隔,首次嘗試的間隔是rate的倒數,以后間隔時間增倍 try: self.conn = pymongo.MongoClient(host, int(port)) self.SetDatabase(db, user, passwd) except Exception, errMsg: raise Exception(errMsg) # --------------------------------------------------------------------------------------------------- def SetDatabase(self, db, user, passwd): # 設置(修改)當前數據庫 self.db = self.conn[db] if user and passwd: if not self.db.authenticate(user, passwd): raise Exception(u'數據庫權限驗證失??!') def CloseConnection(self): # 關閉數據庫連接 self.conn.close() def Logout(self): # 注銷用戶 self.db.logout() def GetStatus(self): # 獲取MogoDB服務器的狀態 return self.db.last_status() def IsMongos(self): # 判斷是否是MongoS return self.conn.is_mongos def GetDateTime(self): # 獲取MongoDB服務器的當前時間(需要權限支持,若無權限,則返回本地時間) for i in range(self.loop): try: return self.db.eval("return new Date();") except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) except Exception, e: return datetime.datetime.now() raise Exception(u'重連數據庫失??!')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
集合管理
class PyMongoClient(): def GetCollections(self): # 獲取當前數據庫的全部集合 for i in range(self.loop): try: return self.db.collection_names() except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失??!') def CreateCollection(self, collection): # 在當前數據庫內創建新的集合 for i in range(self.loop): try: self.db.create_collection(collection) return except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) except pymongo.errors.CollectionInvalid: return raise Exception(u'重連數據庫失敗!') def DropCollection(self, collection): # 刪除當前數據庫內名為collection的集合 for i in range(self.loop): try: self.db.drop_collection(collection) return except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失??!')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
索引管理
class PyMongoClient(): def IndexInformation(self, collection): # 獲取索引信息 for i in range(self.loop): try: return self.db[collection].index_information().keys() except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失??!') def EnsureIndex(self, collection, key_or_list): # 檢查索引是否存在,若不存在,則創建索引,若存在,返回None # list參數形如:[('start_time', pymongo.ASCENDING), ('end_time', pymongo.ASCENDING)] for i in range(self.loop): try: self.db[collection].ensure_index(key_or_list) return except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失??!') def CreateIndex(self, collection, key_or_list): # 創建索引(推薦使用EnsureIndex) for i in range(self.loop): try: self.db[collection].create_index(key_or_list) return except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失?。?) def DropIndex(self, collection, key=None): # 刪除索引,key=None時刪除全部索引(_id除外) for i in range(self.loop): try: if key: self.db[collection].drop_index(key) else: self.db[collection].drop_indexes() return except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失敗!')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
增刪改查
class PyMongoClient(): def InsertDoc(self, collection, data): # data為字典時,單條插入,data為列表時,批量插入。批量上限20W # 單條插入時返回單個id對象,批量插入時,返回id對象列表 for i in range(self.loop): try: return self.db[collection].insert(data, manipulate=True, save=False, check_keys=True) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失??!') def RemoveDoc(self, collection, docFilter=None): # 刪除文檔,docFilter=None時刪除集合collection的全部文檔 for i in range(self.loop): try: return self.db[collection].remove(docFilter) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失敗!') def UpdateDoc(self, collection, docFilter, data, modifier=None): # 更新文檔,docFilter為更新對象的查找條件,data為更新數據,可以使用$inc/$set/$unset等修改器 for i in range(self.loop): try: if modifier: return self.db[collection].update(docFilter, {modifier:data}, multi=True) else: return self.db[collection].update(docFilter, data, multi=True) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失??!') def UpsertDoc(self, collection, docFilter, data): # 如果文檔不存在,則插入文檔;如果文檔存在,則更新文檔 for i in range(self.loop): try: return self.db[collection].update(docFilter, data, True) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失??!') def GetDoc(self, collection, docFilter=None, colFilter=None, sort=None): # 返回單個文檔 for i in range(self.loop): try: if colFilter: return self.db[collection].find_one(docFilter, colFilter, sort=sort) else: return self.db[collection].find_one(docFilter, sort=sort) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失?。?) def CountDoc(self, collection, docFilter=None): # 返回集合或查詢的文檔總數 for i in range(self.loop): try: return self.db[collection].find(docFilter).count() except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失??!') def GetCursor(self, collection, docFilter=None, colFilter=None): # 返回多個文檔的游標 for i in range(self.loop): try: if colFilter: return self.db[collection].find(docFilter, colFilter).batch_size(100) else: return self.db[collection].find(docFilter).batch_size(100) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失??!') def CountCursor(self, cursor): # 返回游標的文檔總數 for i in range(self.loop): try: return cursor.count() except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失敗!') def SortCursor(self, cursor, col_or_list, director='ASC'): # 游標排序,默認ASCENDING(升序),取值ASC/DESC # col_or_list,列名或者是由(列名,方向)組成的列表 if isinstance(col_or_list, list): args = [] for col in col_or_list: if col[1] == 'ASC': args.append((col[0],pymongo.ASCENDING)) else: args.append((col[0],pymongo.DESCENDING)) for i in range(self.loop): try: return cursor.sort(args) # cursor.sort([("UserName",pymongo.ASCENDING),("Email",pymongo.DESCENDING)]) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失?。?) else: if director == 'ASC': director = pymongo.ASCENDING else: director = pymongo.DESCENDING for i in range(self.loop): try: return cursor.sort(col_or_list, director) # director取值:pymongo.ASCENDING(升序)、pymongo.DESCENDING(降序) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失敗!') def SubCursor(self, cursor, limit, skip=0): # 截取游標 for i in range(self.loop): try: if skip: return cursor.skip(skip).limit(limit) else: return cursor.limit(limit) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失敗!')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
文件操作
class PyMongoClient(): def GetBinaryFromFile(self, sourceFile): # 讀文件,返回二進制內容 # 適用于在文檔中直接保存小于16M的小文件,若文件較大時,應使用GridFS try: fp = open(sourceFile,'rb') return bson.Binary(fp.read()) except: return False finally: fp.close() def SaveBinaryToFile(self, binary, targetFile): # 將二進制內容保存為文件 try: fp = open(targetFile,'wb') fp.write(binary) return True except: return False finally: fp.close() def Str2ObjectId(self, id_str): return bson.ObjectId(id_str) def ObjectId2Str(self, id_obj): return str(id_obj) def PutFile(self, localFilePath, dbFileName=None): ''' 向GridFS中上傳文件,并返回文件ID @localFilePath 本地文件路徑 @dbFileName 保存到GridFS中的文件名,如果為None則使用本地路徑中的文件名 ''' fs = gridfs.GridFS(self.db) fp = open(localFilePath, 'rb') if dbFileName == None: dbFileName = os.path.split(localFilePath)[1] id = fs.put(fp,filename=dbFileName, chunkSize=4*1024*1024) fp.close() return id def GetFile(self, fileId, localFilePath=None): ''' 根據文件ID從GridFS中下載文件 @fileId 文件ID @localFilePath 要保存的本地文件路徑 ''' if isinstance(fileId, str): fileId = self.Str2ObjectId(fileId) fs = gridfs.GridFS(self.db) if localFilePath: fp = open(localFilePath, 'wb') try: fp.write(fs.get(fileId).read()) return True except: return False finally: fp.close() else: try: return fs.get(fileId).read() except: return False def GetFilesCursor(self, docFilter=None, colFilter=None): ''' 取得GridFS中文件的游標 可以進行過濾或檢索的字段名有 _id 文件ID filename 文件名 length 文件大小 md5 md5校驗碼 chunkSize 文件塊大小 uploadDate 更新時間 ''' return self.GetCursor('fs.files', docFilter=docFilter, colFilter=colFilter) def DeleteFile(self, fileId): ''' 根據文件ID從GridFS中刪除文件 @fileId 文件ID ''' fs = gridfs.GridFS(self.db) fs.delete(fileId)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
聚合操作
class PyMongoClient(): def Aggregate(self, collection, pipleline): # 聚合 # pipleline是一個由篩選、投射、分組、排序、限制、跳過等一系列構件組成管道隊列 for i in range(self.loop): try: return self.db[collection].aggregate(pipleline) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失??!') def RunCommand(self, collection, cmdObj): # 運行數據庫命令 # if cmdObj is a string, turns it into {cmdObj:1} for i in range(self.loop): try: return self.db[collection].runCommand(cmdObj) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失敗!')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
MongoDB Python 數據庫
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。