python操作MongoDB

      網友投稿 702 2025-04-01

      文章目錄

      前言

      MongoDB客戶端類

      連接管理

      集合管理

      索引管理

      增刪改查

      文件操作

      聚合操作

      前言

      曾經有一段時間,大約是2014年和2015年,在所有的項目里面,但凡需要數據庫的場合,我們無一例外地選擇了MongoDB。在此之前,我們更多的是使用Oracle數據庫。盡管Oracle已經做得非常棒了,但面對來自全球各地空間天氣觀測臺站和世界各國衛星數據的時候,仍然顯得捉襟見肘,疲于應付。不是Oracle不夠優秀,是數據太過繁雜,僅僅依賴二維關系模型已經無法高效地處理數據了。

      MongoDB 是由C++語言編寫的,是一個基于分布式文件存儲的開源數據庫系統。MongoDB號稱是最像關系型數據庫的非關系型數據庫,在一些簡單的項目里面,我們也用它來存儲一些結構化的數據??偨Y使用感受,我覺得MongoDB 就像一個個性鮮明的優秀青年,有能力,也有脾氣,優點和缺點一樣突出。要用好MongoDB,首先要弄清楚你是否真的需要NoSQL數據庫。

      MongoDB的優點:

      無模式(太過隨意,有時反倒是缺點)

      支持對象存儲

      支持Map/reduce和聚合操作

      擴展方便

      可靠性高

      MongoDB的缺點不多,但很要命,這就是被很多人詬病的“內存貪婪”:它會占用操作系統幾乎所有的空閑內存,讓其他進程活得不舒適,而我們一直對該機制缺乏了解,也沒有相應的應對手段。

      MongoDB客戶端類

      pymongo是Python訪問MongoDB的模塊,使用該模塊,我們定義了一個操作MongoDB的類PyMongoClient,包含了連接管理、集合管理、索引管理、增刪改查、文件操作、聚合操作等方法。

      # --------------------------------------------------------------------------------------------------- # PyMongoClient(host='localhost', port='27017', db='test', user=None, passwd=None, loop=5, rate=8) # --------------------------------------------------------------------------------------------------- # PyMongoClient.SetDatabase(db, user, passwd) 設置(修改)當前數據庫 # PyMongoClient.CloseConnection() 關閉連接 # PyMongoClient.Logout() 注銷用戶 # PyMongoClient.GetStatus() 獲取MogoDB服務器的狀態 # PyMongoClient.IsMongos() 判斷是否是MongoS # PyMongoClient.GetDateTime() 獲取MongoDB服務器的當前時間(需要權限支持) # --------------------------------------------------------------------------------------------------- # PyMongoClient.GetCollections() 獲取當前數據庫的全部集合 # PyMongoClient.CreateCollection(collection) 創建集合 # PyMongoClient.DropCollection(collection) 刪除集合 # --------------------------------------------------------------------------------------------------- # PyMongoClient.IndexInformation(collection) 獲取集合的索引信息 # PyMongoClient.EnsureIndex(collection, key_or_list) 檢查索引是否存在,若不存在,則創建索引 # PyMongoClient.CreateIndex(collection, key_or_list) 創建索引 # PyMongoClient.DropIndex(collection, key=None) 刪除索引,key=None時刪除全部索引(_id除外) # --------------------------------------------------------------------------------------------------- # PyMongoClient.InsertDoc(collection, data) data為字典時,單條插入,data為列表時,批量插入 # PyMongoClient.RemoveDoc(collection, docFilter=None) 刪除文檔,docFilter=None時刪除集合的全部文檔 # PyMongoClient.UpdateDoc(collection, docFilter, data, modifier=None) 更新文檔,支持使用$inc/$set/$unset等修改器 # PyMongoClient.UpsertDoc(collection, docFilter, data) 如果文檔不存在,則插入文檔;如果文檔存在,則更新文檔 # PyMongoClient.GetDoc(collection, docFilter=None, colFilter=None) 返回單個文檔 # PyMongoClient.CountDoc(collection, docFilter=None) 返回集合或查詢的文檔總數 # PyMongoClient.GetCursor(collection, docFilter=None, colFilter=None) 返回多個文檔的游標 # PyMongoClient.CountCursor(cursor) 返回游標的文檔總數 # PyMongoClient.SortCursor(cursor, col_or_list, director='ASC') 游標排序,默認升序,取值ASC/DESC # PyMongoClient.SubCursor(cursor, limit, skip=0) 游標截取 # --------------------------------------------------------------------------------------------------- # PyMongoClient.Aggregate(collection, pipleline) 聚合 # PyMongoClient.RunCommand(collection, cmdObj) 運行數據庫命令 # --------------------------------------------------------------------------------------------------- # PyMongoClient.Str2ObjectId(id_str) id字符串轉id對象 # PyMongoClient.ObjectId2Str(id_obj) id對象轉id字符串 # PyMongoClient.GetBinaryFromFile(sourceFile) 讀文件,返回二進制內容 # PyMongoClient.SaveBinaryToFile(binary, targetFile) 將二進制內容保存為文件 # --------------------------------------------------------------------------------------------------- # PyMongoClient.PutFile(localFilePath, dbFileName=None) 將文件保存到GridFS并返回FileId # PyMongoClient.GetFile(fileId, localFilePath) 將文件從GridFS取出,并保存到文件中 # PyMongoClient.GetFilesCursor(docFilter=None, colFilter=None) 取得文件信息游標 # PyMongoClient.DeleteFile(fileId) 刪除GridFS中的文件

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      26

      27

      28

      29

      30

      31

      32

      33

      34

      35

      36

      37

      38

      39

      40

      41

      42

      連接管理

      class PyMongoClient(): def __init__(self, host='localhost', port=27017, db='test', user=None, passwd=None, loop=5, rate=8): self.loop = loop # 數據庫失去連接后,嘗試執行數據庫操作的次數 self.rate = float(rate) # 數據庫失去連接后,嘗試執行數據庫操作的時間間隔,首次嘗試的間隔是rate的倒數,以后間隔時間增倍 try: self.conn = pymongo.MongoClient(host, int(port)) self.SetDatabase(db, user, passwd) except Exception, errMsg: raise Exception(errMsg) # --------------------------------------------------------------------------------------------------- def SetDatabase(self, db, user, passwd): # 設置(修改)當前數據庫 self.db = self.conn[db] if user and passwd: if not self.db.authenticate(user, passwd): raise Exception(u'數據庫權限驗證失??!') def CloseConnection(self): # 關閉數據庫連接 self.conn.close() def Logout(self): # 注銷用戶 self.db.logout() def GetStatus(self): # 獲取MogoDB服務器的狀態 return self.db.last_status() def IsMongos(self): # 判斷是否是MongoS return self.conn.is_mongos def GetDateTime(self): # 獲取MongoDB服務器的當前時間(需要權限支持,若無權限,則返回本地時間) for i in range(self.loop): try: return self.db.eval("return new Date();") except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) except Exception, e: return datetime.datetime.now() raise Exception(u'重連數據庫失??!')

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      26

      27

      python操作MongoDB

      28

      29

      30

      31

      32

      33

      34

      35

      36

      37

      38

      39

      40

      41

      42

      43

      44

      45

      集合管理

      class PyMongoClient(): def GetCollections(self): # 獲取當前數據庫的全部集合 for i in range(self.loop): try: return self.db.collection_names() except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失??!') def CreateCollection(self, collection): # 在當前數據庫內創建新的集合 for i in range(self.loop): try: self.db.create_collection(collection) return except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) except pymongo.errors.CollectionInvalid: return raise Exception(u'重連數據庫失敗!') def DropCollection(self, collection): # 刪除當前數據庫內名為collection的集合 for i in range(self.loop): try: self.db.drop_collection(collection) return except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失??!')

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      26

      27

      28

      29

      30

      31

      索引管理

      class PyMongoClient(): def IndexInformation(self, collection): # 獲取索引信息 for i in range(self.loop): try: return self.db[collection].index_information().keys() except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失??!') def EnsureIndex(self, collection, key_or_list): # 檢查索引是否存在,若不存在,則創建索引,若存在,返回None # list參數形如:[('start_time', pymongo.ASCENDING), ('end_time', pymongo.ASCENDING)] for i in range(self.loop): try: self.db[collection].ensure_index(key_or_list) return except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失??!') def CreateIndex(self, collection, key_or_list): # 創建索引(推薦使用EnsureIndex) for i in range(self.loop): try: self.db[collection].create_index(key_or_list) return except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失?。?) def DropIndex(self, collection, key=None): # 刪除索引,key=None時刪除全部索引(_id除外) for i in range(self.loop): try: if key: self.db[collection].drop_index(key) else: self.db[collection].drop_indexes() return except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失敗!')

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      26

      27

      28

      29

      30

      31

      32

      33

      34

      35

      36

      37

      38

      39

      40

      41

      42

      43

      增刪改查

      class PyMongoClient(): def InsertDoc(self, collection, data): # data為字典時,單條插入,data為列表時,批量插入。批量上限20W # 單條插入時返回單個id對象,批量插入時,返回id對象列表 for i in range(self.loop): try: return self.db[collection].insert(data, manipulate=True, save=False, check_keys=True) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失??!') def RemoveDoc(self, collection, docFilter=None): # 刪除文檔,docFilter=None時刪除集合collection的全部文檔 for i in range(self.loop): try: return self.db[collection].remove(docFilter) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失敗!') def UpdateDoc(self, collection, docFilter, data, modifier=None): # 更新文檔,docFilter為更新對象的查找條件,data為更新數據,可以使用$inc/$set/$unset等修改器 for i in range(self.loop): try: if modifier: return self.db[collection].update(docFilter, {modifier:data}, multi=True) else: return self.db[collection].update(docFilter, data, multi=True) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失??!') def UpsertDoc(self, collection, docFilter, data): # 如果文檔不存在,則插入文檔;如果文檔存在,則更新文檔 for i in range(self.loop): try: return self.db[collection].update(docFilter, data, True) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失??!') def GetDoc(self, collection, docFilter=None, colFilter=None, sort=None): # 返回單個文檔 for i in range(self.loop): try: if colFilter: return self.db[collection].find_one(docFilter, colFilter, sort=sort) else: return self.db[collection].find_one(docFilter, sort=sort) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失?。?) def CountDoc(self, collection, docFilter=None): # 返回集合或查詢的文檔總數 for i in range(self.loop): try: return self.db[collection].find(docFilter).count() except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失??!') def GetCursor(self, collection, docFilter=None, colFilter=None): # 返回多個文檔的游標 for i in range(self.loop): try: if colFilter: return self.db[collection].find(docFilter, colFilter).batch_size(100) else: return self.db[collection].find(docFilter).batch_size(100) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失??!') def CountCursor(self, cursor): # 返回游標的文檔總數 for i in range(self.loop): try: return cursor.count() except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失敗!') def SortCursor(self, cursor, col_or_list, director='ASC'): # 游標排序,默認ASCENDING(升序),取值ASC/DESC # col_or_list,列名或者是由(列名,方向)組成的列表 if isinstance(col_or_list, list): args = [] for col in col_or_list: if col[1] == 'ASC': args.append((col[0],pymongo.ASCENDING)) else: args.append((col[0],pymongo.DESCENDING)) for i in range(self.loop): try: return cursor.sort(args) # cursor.sort([("UserName",pymongo.ASCENDING),("Email",pymongo.DESCENDING)]) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失?。?) else: if director == 'ASC': director = pymongo.ASCENDING else: director = pymongo.DESCENDING for i in range(self.loop): try: return cursor.sort(col_or_list, director) # director取值:pymongo.ASCENDING(升序)、pymongo.DESCENDING(降序) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失敗!') def SubCursor(self, cursor, limit, skip=0): # 截取游標 for i in range(self.loop): try: if skip: return cursor.skip(skip).limit(limit) else: return cursor.limit(limit) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失敗!')

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      26

      27

      28

      29

      30

      31

      32

      33

      34

      35

      36

      37

      38

      39

      40

      41

      42

      43

      44

      45

      46

      47

      48

      49

      50

      51

      52

      53

      54

      55

      56

      57

      58

      59

      60

      61

      62

      63

      64

      65

      66

      67

      68

      69

      70

      71

      72

      73

      74

      75

      76

      77

      78

      79

      80

      81

      82

      83

      84

      85

      86

      87

      88

      89

      90

      91

      92

      93

      94

      95

      96

      97

      98

      99

      100

      101

      102

      103

      104

      105

      106

      107

      108

      109

      110

      111

      112

      113

      114

      115

      116

      117

      118

      119

      120

      121

      122

      文件操作

      class PyMongoClient(): def GetBinaryFromFile(self, sourceFile): # 讀文件,返回二進制內容 # 適用于在文檔中直接保存小于16M的小文件,若文件較大時,應使用GridFS try: fp = open(sourceFile,'rb') return bson.Binary(fp.read()) except: return False finally: fp.close() def SaveBinaryToFile(self, binary, targetFile): # 將二進制內容保存為文件 try: fp = open(targetFile,'wb') fp.write(binary) return True except: return False finally: fp.close() def Str2ObjectId(self, id_str): return bson.ObjectId(id_str) def ObjectId2Str(self, id_obj): return str(id_obj) def PutFile(self, localFilePath, dbFileName=None): ''' 向GridFS中上傳文件,并返回文件ID @localFilePath 本地文件路徑 @dbFileName 保存到GridFS中的文件名,如果為None則使用本地路徑中的文件名 ''' fs = gridfs.GridFS(self.db) fp = open(localFilePath, 'rb') if dbFileName == None: dbFileName = os.path.split(localFilePath)[1] id = fs.put(fp,filename=dbFileName, chunkSize=4*1024*1024) fp.close() return id def GetFile(self, fileId, localFilePath=None): ''' 根據文件ID從GridFS中下載文件 @fileId 文件ID @localFilePath 要保存的本地文件路徑 ''' if isinstance(fileId, str): fileId = self.Str2ObjectId(fileId) fs = gridfs.GridFS(self.db) if localFilePath: fp = open(localFilePath, 'wb') try: fp.write(fs.get(fileId).read()) return True except: return False finally: fp.close() else: try: return fs.get(fileId).read() except: return False def GetFilesCursor(self, docFilter=None, colFilter=None): ''' 取得GridFS中文件的游標 可以進行過濾或檢索的字段名有 _id 文件ID filename 文件名 length 文件大小 md5 md5校驗碼 chunkSize 文件塊大小 uploadDate 更新時間 ''' return self.GetCursor('fs.files', docFilter=docFilter, colFilter=colFilter) def DeleteFile(self, fileId): ''' 根據文件ID從GridFS中刪除文件 @fileId 文件ID ''' fs = gridfs.GridFS(self.db) fs.delete(fileId)

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      26

      27

      28

      29

      30

      31

      32

      33

      34

      35

      36

      37

      38

      39

      40

      41

      42

      43

      44

      45

      46

      47

      48

      49

      50

      51

      52

      53

      54

      55

      56

      57

      58

      59

      60

      61

      62

      63

      64

      65

      66

      67

      68

      69

      70

      71

      72

      73

      74

      75

      76

      77

      78

      79

      80

      81

      82

      83

      84

      85

      86

      87

      88

      89

      90

      91

      聚合操作

      class PyMongoClient(): def Aggregate(self, collection, pipleline): # 聚合 # pipleline是一個由篩選、投射、分組、排序、限制、跳過等一系列構件組成管道隊列 for i in range(self.loop): try: return self.db[collection].aggregate(pipleline) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失??!') def RunCommand(self, collection, cmdObj): # 運行數據庫命令 # if cmdObj is a string, turns it into {cmdObj:1} for i in range(self.loop): try: return self.db[collection].runCommand(cmdObj) except pymongo.errors.AutoReconnect: time.sleep(pow(2,i)/self.rate) raise Exception(u'重連數據庫失敗!')

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      MongoDB Python 數據庫

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:vi和vim編輯器使用教程
      下一篇:WPS表格怎么實現分類求和(wps分類求和)
      相關文章
      亚洲AV乱码一区二区三区林ゆな| 亚洲国产成人久久综合野外| 日本系列1页亚洲系列| 亚洲av福利无码无一区二区| 亚洲色中文字幕无码AV| 亚洲视频在线一区二区| 亚洲国产成人久久综合一区77| 国产精品亚洲а∨天堂2021| 激情小说亚洲图片| 激情小说亚洲图片| 国产精品亚洲片在线花蝴蝶| 国产尤物在线视精品在亚洲| www.91亚洲| 久久国产成人亚洲精品影院 | 亚洲中文字幕伊人久久无码| 亚洲性日韩精品一区二区三区| 亚洲AV中文无码乱人伦在线视色| 亚洲黄片毛片在线观看| 亚洲乱码中文字幕综合234| 亚洲av片一区二区三区| 亚洲精品无码永久在线观看 | 久久亚洲av无码精品浪潮| 久久久久亚洲AV成人网| 久久国产成人亚洲精品影院| 亚洲精品一二三区| 亚洲AV日韩综合一区尤物 | 精品亚洲456在线播放| 亚洲人成色77777在线观看| 亚洲AV无码专区亚洲AV桃| 国产成人亚洲精品电影| 亚洲成年看片在线观看| 最新国产AV无码专区亚洲| 亚洲A丁香五香天堂网| 亚洲日产无码中文字幕| 亚洲高清在线观看| 亚洲人妖女同在线播放| 亚洲精品无码久久久久牙蜜区| 青青青国产色视频在线观看国产亚洲欧洲国产综合| 日本亚洲中午字幕乱码| 国产亚洲精品看片在线观看 | 亚洲伊人久久大香线蕉AV|