MongoDB執行兩階段提交

      網友投稿 890 2025-03-31

      簡介

      本篇文檔提供了一個通過使用兩階段提交將數據寫入多個文檔的方法來處理多文檔更新或“多文檔事務”。此外,你可以擴展實現類似數據回滾的功能。

      背景

      在MongoDB中,操作單個文檔(document)是原子性的;但是,涉及到多個文檔的操作,也就是常說的“多文檔事務”,是非原子性的。由于document可以設計的非常復雜,包含多個“內嵌的”文檔,因此單個文檔的原子性為很多實際場景提供了必要的支持。

      盡管單文檔原子操作很強大,但在很多場景下依然需要多文檔事務。當執行一個由幾個順序操作組成的事務時,可能會出現某些問題,例如:

      原子性:如果某個操作失敗了,在同一個事務中前面的操作回滾到最初的狀態(即,要么全做,要么全部做)。

      一致性:如果發生了嚴重故障將事務中斷(比如:網絡、硬件故障),數據庫必須能夠恢復到一致的狀態。

      在需要多文檔事務的場景中,你可以實現兩階段提交來完成場景需求。兩階段提交可以保證數據的一致性,如果發生錯誤,可以恢復到事務開始之前的狀態。在事務執行過程中,無論發生什么情況都可以還原到數據和狀態的準備階段。

      注:

      因為在MongoDB中只有單文檔操作是原子性的,兩階段提交只能提供類似事務的語義。在兩階段提交或回滾過程中,應用程序可以返回的任意步驟點的中間數據。

      模式

      概述

      考慮這樣一個場景,你想從賬戶A轉賬給賬戶B。在關系型數據庫系統中,你可以在一個多語句事務中先減少賬戶A的資金然后增加賬戶B的資金。在MongoDB中,你可以模擬實現一個兩階段提交的得到同樣的結果。

      本節中的示例使用下面兩個集合:

      集合accounts保存賬戶信息。

      集合transactions保存轉賬事務信息。

      初始化源賬戶和目標賬戶

      將賬戶A和賬戶B的信息寫入到集合accounts。

      db.accounts.insert( [ { _id: "A", balance: 1000, pendingTransactions: [] }, { _id: "B", balance: 1000, pendingTransactions: [] } ] )

      1

      2

      3

      4

      5

      6

      上面的語句返回一個包含了本次操作的狀態信息的BulkWriteResult() 對象。如果成功寫入,BulkWriteResult()對象中的nInserted的值為2。

      初始化轉帳數據

      將每筆轉賬信息寫入到transactions表,轉賬數據包含以下字段:

      source和destination字段,指向accounts集合中的_id

      value字段,表示轉賬金額,影響源賬戶和目標賬戶的余額

      state字段,表示轉賬操作當前狀態,state字段可選值范圍為initial, pending, applied, done, canceling和 canceled

      lastModified字段,表示最后更新時間

      將賬戶A向賬戶B轉賬100的操作信息初始化到transactions集合,事務狀態(state字段)為"initial", lastModified字段值設為當前時間:

      db.transactions.insert( { _id: 1, source: "A", destination: "B", value: 100, state: "initial", lastModified: new Date() } )

      1

      2

      3

      上面的語句返回一個包含了本次操作的狀態信息的WriteResult() 對象。如果成功寫入,WriteResult()對象中的nInserted的值為2。

      使用兩階段提交進行轉賬

      1. 檢索交易開始

      在transactions集合中查詢一條state字段值為initial的數據。當前transactions集合中只有一條數據,也就是在初始化轉賬數據中添加的數據。如果集合中有另外的數據,查詢將會返回所有state字段為initial的數據,除非你附加一些別的查詢條件。

      var t = db.transactions.findOne( { state: "initial" } )

      1

      在mongo shell中定義變量t來保存返回的內容。上邊的語句會得到如下輸出(lastModified字段應該是你插入數據的時間):

      { "_id" : 1, "source" : "A", "destination" : "B", "value" : 100, "state" : "initial", "lastModified" : ISODate("2014-07-11T20:39:26.345Z") }

      1

      2. 將transaction數據的state字段設為pending

      將transaction數據的state字段從initial改為pending,并用$currentDate操作將lastModified字段設為當前時間。

      db.transactions.update( { _id: t._id, state: "initial" }, { $set: { state: "pending" }, $currentDate: { lastModified: true } } )

      1

      2

      3

      4

      5

      6

      7

      這個操作返回一個WriteResult()對象,包含本次更新操作的狀態信息,如果更新成功,nMatched和nModified值為1。

      在修改塊中,state: "initial"條件確保沒有其它線程更新過本條數據。如果nMatched和nModified值為0,回到第一步重新獲取一條數據然后執行。

      3. 賬戶間轉賬

      如果賬戶不包含該事務t的信息,用update()方法更新帳戶信息。在更新條件中,帶有pendingTransactions: { $ne: t._id }是為了避免重復同一次轉賬。

      同時更新balance字段和pendingTransactions字段來實現轉賬。

      在事務中操作account集合,同事修改balance字段和pendingTransactions字段。

      更新源賬戶信息,為balance字段減去transaction數據的value值,并將transaction的_id寫入到pendingTransactions字段的數組中。

      db.accounts.update( { _id: t.source, pendingTransactions: { $ne: t._id } }, { $inc: { balance: -t.value }, $push: { pendingTransactions: t._id } } )

      1

      2

      3

      4

      操作成功后,方法會返回WriteResult()對象,nMatched和nModified值為1。

      更新目標賬戶信息,為balance字段加上transaction數據的value值,并將transaction的_id寫入到pendingTransactions字段的數組中。

      db.accounts.update( { _id: t.destination, pendingTransactions: { $ne: t._id } }, { $inc: { balance: t.value }, $push: { pendingTransactions: t._id } } )

      1

      2

      3

      4

      操作成功后,方法會返回WriteResult()對象,nMatched和nModified值為1。

      4. 將transaction數據的state設為applied

      用下面的update()操作將transaction數據的state值設為applied,并更新lastModified字段值為當前時間:

      db.transactions.update( { _id: t._id, state: "pending" }, { $set: { state: "applied" }, $currentDate: { lastModified: true } } )

      1

      2

      3

      4

      5

      6

      7

      操作成功后,方法會返回WriteResult()對象,nMatched和nModified值為1。

      5. 修改兩個賬戶的事務待定列表

      將transaction數據的_id值從兩個賬戶的pendingTransactions字段中移除。

      修改源賬戶。

      db.accounts.update( { _id: t.source, pendingTransactions: t._id }, { $pull: { pendingTransactions: t._id } } )

      1

      2

      3

      4

      操作成功后,方法會返回WriteResult()對象,nMatched和nModified值為1。

      修改目標賬戶。

      db.accounts.update( { _id: t.destination, pendingTransactions: t._id }, { $pull: { pendingTransactions: t._id } } )

      1

      2

      3

      4

      操作成功后,方法會返回WriteResult()對象,nMatched和nModified值為1。

      6. 更新transaction數據的state為done.

      將transaction數據的state設為done,更新lastModified為當前時間,這也標志著本次事務的結束:

      db.transactions.update( { _id: t._id, state: "applied" }, { $set: { state: "done" }, $currentDate: { lastModified: true } } )

      1

      2

      3

      4

      5

      6

      在 MongoDB 中執行兩階段提交

      7

      操作成功后,方法會返回WriteResult()對象,nMatched和nModified值為1。

      從失敗場景恢復

      其實事務最重要的部分不是上面示例中比較順利的場景,而是當事務未成功完成時有沒有可能從各種各樣失敗情況中恢復。本節會概括各種可能出現的失敗場景,并教你一些步驟,如何從這些事件中恢復。

      恢復操作

      兩階段提交模式運行應用程勛運行一系列的操作來恢復事務到一致狀態。在應用啟動時或一個定時任務來運行恢復操作,可以用來捕捉未完成的事務。

      在一致性問題上對于時間的需求取決于應用間隔多長時間來恢復每個事務。

      接下來的恢復過程根據lastModified字段做為指標來決定pending狀態的事務是否需要進行恢復;再具體點,如果pending或applied 狀態的事務已經有30分鐘沒有更新過,恢復程序會認為這些事務需要進行恢復。你可以用不同的條件來決定事務是否需要恢復。

      pending狀態的事務

      要恢復發生在上文舉例的“將transaction數據的state字段設為pending” 步驟之后,但發生在“將transaction數據的state設為applied”步驟之前的錯誤,先從transactions集合中獲取一條pending狀態的數據:

      var dateThreshold = new Date(); dateThreshold.setMinutes(dateThreshold.getMinutes() - 30); var t = db.transactions.findOne( { state: "pending", lastModified: { $lt: dateThreshold } } );

      1

      2

      3

      然后從上文的 “賬戶間轉賬”步驟開始繼續執行。

      applied狀態的事務

      To recover from failures that occur after step “Update transaction state to applied.” but before “Update transaction state to done.” step, retrieve from the transactions collection an applied transaction for recovery:

      要恢復發生在上文舉例的“將transaction數據的state設為applied”步驟之后,但發生在“更新transaction數據的state為done”步驟之前的錯誤,先從transactions集合中獲取一條applied狀態的數據:

      var dateThreshold = new Date(); dateThreshold.setMinutes(dateThreshold.getMinutes() - 30); var t = db.transactions.findOne( { state: "applied", lastModified: { $lt: dateThreshold } } );

      1

      2

      3

      然后從上文的“修改兩個賬戶的事務待定列表”步驟開始繼續執行。

      回滾操作

      在一些情況下,你可能需要“回滾”或取消事務;舉例來說,比如應用程序需要去“取消”事務,或者事務中的某個賬戶不存在,或者在事務運行階段賬戶不存在了。

      applied狀態的事務

      在“將transaction數據的state設為applied”步驟之后,你最好不要回滾事務了。而應該完成這個事務,然后啟動一個補償事務,將上個事務的源賬戶和目標賬戶調換一下,再做一次轉賬。

      pending狀態的事務

      在“將transaction數據的state字段設為pending”步驟之后,在“將transaction數據的state設為applied”步驟之前,你可以根據下面的流程來回滾事務:

      將transaction的state從pending設為canceling。

      db.transactions.update( { _id: t._id, state: "pending" }, { $set: { state: "canceling" }, $currentDate: { lastModified: true } } )

      1

      2

      3

      4

      5

      6

      7

      操作成功后,方法會返回WriteResult()對象,nMatched和nModified值為1。

      在兩個賬戶上撤銷事務,如果事務已經是applied狀態,執行反向操作。在update條件的中加上pendingTransactions:t._id來篩選狀態是applied的數據。

      更新目標賬戶信息,在balance字段上減去transaction數據的value值,并從pendingTransactions數組中移除transaction數據的_id。

      db.accounts.update( { _id: t.destination, pendingTransactions: t._id }, { $inc: { balance: -t.value }, $pull: { pendingTransactions: t._id } } )

      1

      2

      3

      4

      5

      6

      7

      操作成功后,方法會返回WriteResult()對象,nMatched和nModified值為1。如果轉賬事務沒有發生在該賬戶上,那么上面的更新操作匹配不到數據,nMatched和nModified值會是0。

      更新源賬戶信息,在balance字段上加上transaction數據的value值,并從pendingTransactions數組中移除transaction數據的_id。

      db.accounts.update( { _id: t.source, pendingTransactions: t._id }, { $inc: { balance: t.value}, $pull: { pendingTransactions: t._id } } )

      1

      2

      3

      4

      5

      6

      7

      操作成功后,方法會返回WriteResult()對象,nMatched和nModified值為1。如果轉賬事務沒有發生在該賬戶上,那么上面的更新操作匹配不到數據,nMatched和nModified值會是0。

      將transaction數據的state從canceling設為cancelled來完成回滾。

      db.transactions.update( { _id: t._id, state: "canceling" }, { $set: { state: "cancelled" }, $currentDate: { lastModified: true } } )

      1

      2

      3

      4

      5

      6

      7

      操作成功后,方法會返回WriteResult()對象,nMatched和nModified值為1。

      多應用

      從某種程度上說,事務的存在是為了便于多個應用并發創建和運行某些操作,而不會引發數據不一致和沖突。在我們的程序中,更新或獲取transaction集合的數據時,更新條件中都會包含state字段條件,這能防止多應用沖突的請求transaction數據。

      例如,App1和App2同時獲取了某條相同的state為initial的transaction數據。在App2開工前,App1執行了完整的事務,當App2試圖執行步驟“將transaction數據的state字段設為pending”時,由于更新條件中包含有state: "initial"語句,更新操作匹配不到數據,nMatched和nModified的值會是0。這會讓App2返回到第一步去獲取另一條transaction數據重新開始事務流程。

      當多個應用運行時,最關鍵的是在任意時刻只能有唯一一個應用能操作一條給定的transaction數據。同樣的,除了在更新條件中包含預期的事務狀態之外,你還可以為transaction數據創建一個標記來鑒別正在操作該transaction數據的應用。用findAndModify()方法來修改并返回transaction數據:

      t = db.transactions.findAndModify( { query: { state: "initial", application: { $exists: false } }, update: { $set: { state: "pending", application: "App1" }, $currentDate: { lastModified: true } }, new: true } )

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      修改事務操作,可以確保只有匹配上application字段標識的應用才能操作相應的transaction數據。

      如果App1在事務執行過程中失敗了,你可以用恢復程序進行恢復,但是在恢復之前,應用程序必須確定它們“擁有”相應的transaction數據。例如要找到并繼續執行一個pending狀態的事務,使用類似下面的查詢:

      var dateThreshold = new Date(); dateThreshold.setMinutes(dateThreshold.getMinutes() - 30); db.transactions.find( { application: "App1", state: "pending", lastModified: { $lt: dateThreshold } } )

      1

      2

      3

      4

      5

      6

      7

      8

      9

      在生產環境下使用兩階段提交

      本文中的事務例子有意的設計簡單寫。比如,假設總能夠對賬戶做回滾操作,并且賬戶余額是負數。

      生產環境中的情況可能會更復雜一些。例如,真實場景下賬戶需要的信息還包括當前余額、待轉出、待轉入。

      對于所有的事務來說,在你部署的時候需要保證使用合適等級的write concern。

      AI MongoDB

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:甘特圖怎么體現計劃和實際(甘特圖怎么看計劃和實際活動)
      下一篇:git命令
      相關文章
      亚洲精品91在线| 久久久亚洲精品视频| 亚洲综合一区二区| 久久精品国产亚洲av成人| 亚洲午夜久久久影院伊人| 国产亚洲老熟女视频| 日韩亚洲精品福利| 最新亚洲人成无码网www电影| 亚洲精品无码专区在线播放| 亚洲色大网站WWW永久网站| 精品亚洲国产成人| 国产人成亚洲第一网站在线播放| 亚洲一区二区三区高清不卡| 亚洲人xxx日本人18| 亚洲中文字幕无码爆乳| 亚洲乱亚洲乱妇24p| 亚洲AV永久无码精品一福利| mm1313亚洲国产精品无码试看| 国产亚洲女在线线精品| 亚洲国产精品无码久久久久久曰| 亚洲欧洲日产国码高潮αv| 亚洲人成影院在线观看| 亚洲香蕉网久久综合影视| 国产亚洲色婷婷久久99精品| 亚洲avav天堂av在线不卡| 久久夜色精品国产噜噜噜亚洲AV| 亚洲精品在线播放| 亚洲人成www在线播放| 亚洲av无码专区在线观看亚| yy6080久久亚洲精品| 国产亚洲精品无码专区| 亚洲日韩中文字幕在线播放| 久久噜噜噜久久亚洲va久| 91亚洲国产成人精品下载| 亚洲一区中文字幕| 亚洲丶国产丶欧美一区二区三区| 亚洲av日韩片在线观看| 青青草原亚洲视频| 亚洲av成人无码久久精品 | 亚洲mv国产精品mv日本mv| 亚洲色偷偷色噜噜狠狠99网|