源碼分析 RocketMQ DLedger(多副本) 之日志追加流程

      網友投稿 835 2022-05-29

      上一篇我們詳細分析了源碼分析 RocketMQ DLedger 多副本之 Leader 選主,本文將詳細分析日志復制的實現。

      本節目錄

      1、日志復制基本流程

      1.1 如何判斷 Push 隊列是否已滿

      1.2 Leader 節點存儲數據

      1.3 主節點等待從節點復制 ACK

      1.3.1 updatePeerWaterMark 方法

      1.3.2 wakeUpDispatchers 詳解

      2、日志存儲實現詳情

      2.1 MmapFileList 的 preAppend 詳解

      2.2 MmapFileList 的 append 詳解

      有了前篇文章的基礎,本文將直接從 Leader 處理客戶端請求入口開始,其入口為:DLedgerServer 的 handleAppend 方法開始講起。

      1、日志復制基本流程

      在正式分析 RocketMQ DLedger 多副本復制之前,我們首先來了解客戶端發送日志的請求協議字段,其類圖如下所示:

      我們先一一介紹各個字段的含義:

      String group

      該集群所屬組名。

      String remoteId

      請求目的節點ID。

      String localId

      節點ID。

      int code

      請求響應字段,表示返回響應碼。

      String leaderId = null

      集群中的Leader Id。

      long term

      集群當前的選舉輪次。

      byte[] body

      待發送的數據。

      日志的請求處理處理入口為 DLedgerServer 的 handleAppend 方法。

      DLedgerServer#handleAppend

      PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId()); reConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup()); PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);

      1

      2

      3

      Step1:首先驗證請求的合理性:

      如果請求的節點ID不是當前處理節點,則拋出異常。

      如果請求的集群不是當前節點所在的集群,則拋出異常。

      如果當前節點不是主節點,則拋出異常。

      DLedgerServer#handleAppend

      long currTerm = memberState.currTerm(); if (dLedgerEntryPusher.isPendingFull(currTerm)) { // @1 AppendEntryResponse appendEntryResponse = new AppendEntryResponse(); appendEntryResponse.setGroup(memberState.getGroup()); appendEntryResponse.setCode(DLedgerResponseCode.LEADER_PENDING_FULL.getCode()); appendEntryResponse.setTerm(currTerm); appendEntryResponse.setLeaderId(memberState.getSelfId()); return AppendFuture.newCompletedFuture(-1, appendEntryResponse); } else { // @2 DLedgerEntry dLedgerEntry = new DLedgerEntry(); dLedgerEntry.setBody(request.getBody()); DLedgerEntry resEntry = dLedgerStore.appendAsLeader(dLedgerEntry); return dLedgerEntryPusher.waitAck(resEntry); }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      Step2:如果預處理隊列已經滿了,則拒絕客戶端請求,返回 LEADER_PENDING_FULL 錯誤碼;如果未滿,將請求封裝成 DledgerEntry,則調用 dLedgerStore 方法追加日志,并且通過使用 dLedgerEntryPusher 的 waitAck 方法同步等待副本節點的復制響應,并最終將結果返回給調用方法。

      代碼@1:如果 dLedgerEntryPusher 的 push 隊列已滿,則返回追加一次,其錯誤碼為 LEADER_PENDING_FULL。

      代碼@2:追加消息到 Leader 服務器,并向從節點廣播,在指定時間內如果未收到從節點的確認,則認為追加失敗。

      接下來就按照上述三個要點進行展開:

      判斷 Push 隊列是否已滿

      Leader 節點存儲消息

      主節點等待從節點復制 ACK

      1.1 如何判斷 Push 隊列是否已滿

      DLedgerEntryPusher#isPendingFull

      public boolean isPendingFull(long currTerm) { checkTermForPendingMap(currTerm, "isPendingFull"); // @1 return pendingAppendResponsesByTerm.get(currTerm).size() > dLedgerConfig.getMaxPendingRequestsNum(); // @2 }

      1

      2

      3

      4

      主要分兩個步驟:

      代碼@1:檢查當前投票輪次是否在 PendingMap 中,如果不在,則初始化,其結構為:Map< Long/* 投票輪次*/, ConcurrentMap>>。

      代碼@2:檢測當前等待從節點返回結果的個數是否超過其最大請求數量,可通過maxPendingRequests

      Num 配置,該值默認為:10000。

      上述邏輯比較簡單,但疑問隨著而來,ConcurrentMap> 中的數據是從何而來的呢?我們不妨接著往下看。

      1.2 Leader 節點存儲數據

      Leader 節點的數據存儲主要由 DLedgerStore 的 appendAsLeader 方法實現。DLedger 分別實現了基于內存、基于文件的存儲實現,本文重點關注基于文件的存儲實現,其實現類為:DLedgerMmapFileStore。

      下面重點來分析一下數據存儲流程,其入口為DLedgerMmapFileStore 的 appendAsLeader 方法。

      DLedgerMmapFileStore#appendAsLeader

      PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER); PreConditions.check(!isDiskFull, DLedgerResponseCode.DISK_FULL);

      1

      2

      Step1:首先判斷是否可以追加數據,其判斷依據主要是如下兩點:

      當前節點的狀態是否是 Leader,如果不是,則拋出異常。

      當前磁盤是否已滿,其判斷依據是 DLedger 的根目錄或數據文件目錄的使用率超過了允許使用的最大值,默認值為85%。

      ByteBuffer dataBuffer = localEntryBuffer.get(); ByteBuffer indexBuffer = localIndexBuffer.get();

      1

      2

      Step2:從本地線程變量獲取一個數據與索引 buffer。其中用于存儲數據的 ByteBuffer,其容量固定為 4M ,索引的 ByteBuffer 為兩個索引條目的長度,固定為64個字節。

      DLedgerEntryCoder.encode(entry, dataBuffer); public static void encode(DLedgerEntry entry, ByteBuffer byteBuffer) { byteBuffer.clear(); int size = entry.computSizeInBytes(); //always put magic on the first position byteBuffer.putInt(entry.getMagic()); byteBuffer.putInt(size); byteBuffer.putLong(entry.getIndex()); byteBuffer.putLong(entry.getTerm()); byteBuffer.putLong(entry.getPos()); byteBuffer.putInt(entry.getChannel()); byteBuffer.putInt(entry.getChainCrc()); byteBuffer.putInt(entry.getBodyCrc()); byteBuffer.putInt(entry.getBody().length); byteBuffer.put(entry.getBody()); byteBuffer.flip(); }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      Step3:將 DLedgerEntry,即將數據寫入到 ByteBuffer中,從這里看出,每一次寫入會調用 ByteBuffer 的 clear 方法,將數據清空,從這里可以看出,每一次數據追加,只能存儲4M的數據。

      DLedgerMmapFileStore#appendAsLeader

      源碼分析 RocketMQ DLedger(多副本) 之日志追加流程

      synchronized (memberState) { PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER, null); // ... 省略代碼 }

      1

      2

      3

      4

      Step4:鎖定狀態機,并再一次檢測節點的狀態是否是 Leader 節點。

      DLedgerMmapFileStore#appendAsLeader

      long nextIndex = ledgerEndIndex + 1; entry.setIndex(nextIndex); entry.setTerm(memberState.currTerm()); entry.setMagic(CURRENT_MAGIC); DLedgerEntryCoder.setIndexTerm(dataBuffer, nextIndex, memberState.currTerm(), CURRENT_MAGIC);

      1

      2

      3

      4

      5

      Step5:為當前日志條目設置序號,即 entryIndex 與 entryTerm (投票輪次)。并將魔數、entryIndex、entryTerm 等寫入到 bytebuffer 中。

      DLedgerMmapFileStore#appendAsLeader

      long prePos = dataFileList.preAppend(dataBuffer.remaining()); entry.setPos(prePos); PreConditions.check(prePos != -1, DLedgerResponseCode.DISK_ERROR, null); DLedgerEntryCoder.setPos(dataBuffer, prePos);

      1

      2

      3

      4

      Step6:計算新的消息的起始偏移量,關于 dataFileList 的 preAppend 后續詳細介紹其實現,然后將該偏移量寫入日志的 bytebuffer 中。

      DLedgerMmapFileStore#appendAsLeader

      for (AppendHook writeHook : appendHooks) { writeHook.doHook(entry, dataBuffer.slice(), DLedgerEntry.BODY_OFFSET); }

      1

      2

      3

      Step7:執行鉤子函數。

      DLedgerMmapFileStore#appendAsLeader

      long dataPos = dataFileList.append(dataBuffer.array(), 0, dataBuffer.remaining()); PreConditions.check(dataPos != -1, DLedgerResponseCode.DISK_ERROR, null); PreConditions.check(dataPos == prePos, DLedgerResponseCode.DISK_ERROR, null);

      1

      2

      3

      Step8:將數據追加到 pagecache 中。該方法稍后詳細介紹。

      DLedgerMmapFileStore#appendAsLeader

      DLedgerEntryCoder.encodeIndex(dataPos, entrySize, CURRENT_MAGIC, nextIndex, memberState.currTerm(), indexBuffer); long indexPos = indexFileList.append(indexBuffer.array(), 0, indexBuffer.remaining(), false); PreConditions.check(indexPos == entry.getIndex() * INDEX_UNIT_SIZE, DLedgerResponseCode.DISK_ERROR, null);

      1

      2

      3

      Step9:構建條目索引并將索引數據追加到 pagecache。

      DLedgerMmapFileStore#appendAsLeader

      ledgerEndIndex++; ledgerEndTerm = memberState.currTerm(); if (ledgerBeginIndex == -1) { ledgerBeginIndex = ledgerEndIndex; } updateLedgerEndIndexAndTerm();

      1

      2

      3

      4

      5

      6

      Step10:ledgerEndeIndex 加一(下一個條目)的序號。并設置 leader 節點的狀態機的 ledgerEndIndex 與 ledgerEndTerm。

      Leader 節點數據追加就介紹到這里,稍后會重點介紹與存儲相關方法的實現細節。

      1.3 主節點等待從節點復制 ACK

      其實現入口為 dLedgerEntryPusher 的 waitAck 方法。

      DLedgerEntryPusher#waitAck

      public CompletableFuture waitAck(DLedgerEntry entry) { updatePeerWaterMark(entry.getTerm(), memberState.getSelfId(), entry.getIndex()); // @1 if (memberState.getPeerMap().size() == 1) { // @2 AppendEntryResponse response = new AppendEntryResponse(); response.setGroup(memberState.getGroup()); response.setLeaderId(memberState.getSelfId()); response.setIndex(entry.getIndex()); response.setTerm(entry.getTerm()); response.setPos(entry.getPos()); return AppendFuture.newCompletedFuture(entry.getPos(), response); } else { checkTermForPendingMap(entry.getTerm(), "waitAck"); AppendFuture future = new AppendFuture<>(dLedgerConfig.getMaxWaitAckTimeMs()); // @3 future.setPos(entry.getPos()); CompletableFuture old = pendingAppendResponsesByTerm.get(entry.getTerm()).put(entry.getIndex(), future); // @4 if (old != null) { logger.warn("[MONITOR] get old wait at index={}", entry.getIndex()); } wakeUpDispatchers(); // @5 return future; } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      代碼@1:更新當前節點的 push 水位線。

      代碼@2:如果集群的節點個數為1,無需轉發,直接返回成功結果。

      代碼@3:構建 append 響應 Future 并設置超時時間,默認值為:2500 ms,可以通過 maxWaitAckTimeMs 配置改變其默認值。

      代碼@4:將構建的 Future 放入等待結果集合中。

      代碼@5:喚醒 Entry 轉發線程,即將主節點中的數據 push 到各個從節點。

      接下來分別對上述幾個關鍵點進行解讀。

      DLedgerEntryPusher#updatePeerWaterMark

      private void updatePeerWaterMark(long term, String peerId, long index) { // 代碼@1 synchronized (peerWaterMarksByTerm) { checkTermForWaterMark(term, "updatePeerWaterMark"); // 代碼@2 if (peerWaterMarksByTerm.get(term).get(peerId) < index) { // 代碼@3 peerWaterMarksByTerm.get(term).put(peerId, index); } } }

      1

      2

      3

      4

      5

      6

      7

      8

      代碼@1:先來簡單介紹該方法的兩個參數:

      long term

      當前的投票輪次。

      String peerId

      當前節點的ID。

      long index

      當前追加數據的序號。

      代碼@2:初始化 peerWaterMarksByTerm 數據結構,其結果為 < Long /** term */, Map< String /** peerId */, Long /** entry index*/>。

      代碼@3:如果 peerWaterMarksByTerm 存儲的 index 小于當前數據的 index,則更新。

      DLedgerEntryPusher#updatePeerWaterMark

      public void wakeUpDispatchers() { for (EntryDispatcher dispatcher : dispatcherMap.values()) { dispatcher.wakeup(); } }

      1

      2

      3

      4

      5

      該方法主要就是遍歷轉發器并喚醒。本方法的核心關鍵就是 EntryDispatcher,在詳細介紹它之前我們先來看一下該集合的初始化。

      DLedgerEntryPusher 構造方法

      for (String peer : memberState.getPeerMap().keySet()) { if (!peer.equals(memberState.getSelfId())) { dispatcherMap.put(peer, new EntryDispatcher(peer, logger)); } }

      1

      2

      3

      4

      5

      原來在構建 DLedgerEntryPusher 時會為每一個從節點創建一個 EntryDispatcher 對象。

      顯然,日志的復制由 DLedgerEntryPusher 來實現。由于篇幅的原因,該部分內容將在下篇文章中繼續。

      上面在講解 Leader 追加日志時并沒有詳細分析存儲相關的實現,為了知識體系的完備,接下來我們來分析一下其核心實現。

      2、日志存儲實現詳情

      本節主要對 MmapFileList 的 preAppend 與 append 方法進行詳細講解。

      存儲部分的設計請查閱筆者的博客:源碼分析 RocketMQ DLedger 多副本存儲實現,MmapFileList 對標 RocketMQ 的MappedFileQueue。

      2.1 MmapFileList 的 preAppend 詳解

      該方法最終會調用兩個參數的 preAppend方法,故我們直接來看兩個參數的 preAppend 方法。

      MmapFileList#preAppend

      public long preAppend(int len, boolean useBlank) { // @1 MmapFile mappedFile = getLastMappedFile(); // @2 start if (null == mappedFile || mappedFile.isFull()) { mappedFile = getLastMappedFile(0); } if (null == mappedFile) { logger.error("Create mapped file for {}", storePath); return -1; } // @2 end int blank = useBlank ? MIN_BLANK_LEN : 0; if (len + blank > mappedFile.getFileSize() - mappedFile.getWrotePosition()) { // @3 if (blank < MIN_BLANK_LEN) { logger.error("Blank {} should ge {}", blank, MIN_BLANK_LEN); return -1; } else { ByteBuffer byteBuffer = ByteBuffer.allocate(mappedFile.getFileSize() - mappedFile.getWrotePosition()); // @4 byteBuffer.putInt(BLANK_MAGIC_CODE); // @5 byteBuffer.putInt(mappedFile.getFileSize() - mappedFile.getWrotePosition()); // @6 if (mappedFile.appendMessage(byteBuffer.array())) { // @7 //need to set the wrote position mappedFile.setWrotePosition(mappedFile.getFileSize()); } else { logger.error("Append blank error for {}", storePath); return -1; } mappedFile = getLastMappedFile(0); if (null == mappedFile) { logger.error("Create mapped file for {}", storePath); return -1; } } } return mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();// @8 }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      26

      27

      28

      29

      30

      31

      32

      33

      34

      代碼@1:首先介紹其參數的含義:

      int len 需要申請的長度。

      boolean useBlank 是否需要填充,默認為true。

      代碼@2:獲取最后一個文件,即獲取當前正在寫的文件。

      代碼@3:如果需要申請的資源超過了當前文件可寫字節時,需要處理的邏輯。代碼@4-@7都是其處理邏輯。

      代碼@4:申請一個當前文件剩余字節的大小的bytebuffer。

      代碼@5:先寫入魔數。

      代碼@6:寫入字節長度,等于當前文件剩余的總大小。

      代碼@7:寫入空字節,代碼@4-@7的用意就是寫一條空Entry,填入魔數與 size,方便解析。

      代碼@8:如果當前文件足以容納待寫入的日志,則直接返回其物理偏移量。

      經過上述代碼解讀,我們很容易得出該方法的作用,就是返回待寫入日志的起始物理偏移量。

      2.2 MmapFileList 的 append 詳解

      最終會調用4個參數的 append 方法,其代碼如下:

      MmapFileList#append

      public long append(byte[] data, int pos, int len, boolean useBlank) { // @1 if (preAppend(len, useBlank) == -1) { return -1; } MmapFile mappedFile = getLastMappedFile(); // @2 long currPosition = mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(); // @3 if (!mappedFile.appendMessage(data, pos, len)) { // @4 logger.error("Append error for {}", storePath); return -1; } return currPosition; }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      代碼@1:首先介紹一下各個參數:

      byte[] data

      待寫入的數據,即待追加的日志。

      int pos

      從 data 字節數組哪個位置開始讀取。

      int len

      待寫入的字節數量。

      boolean useBlank

      是否使用填充,默認為 true。

      代碼@2:獲取最后一個文件,即當前可寫的文件。

      代碼@3:獲取當前寫入指針。

      代碼@4:追加消息。

      最后我們再來看一下 appendMessage,具體的消息追加實現邏輯。

      DefaultMmapFile#appendMessage

      public boolean appendMessage(final byte[] data, final int offset, final int length) { int currentPos = this.wrotePosition.get(); if ((currentPos + length) <= this.fileSize) { ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); // @1 byteBuffer.position(currentPos); byteBuffer.put(data, offset, length); this.wrotePosition.addAndGet(length); return true; } return false; }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      該方法我主要是想突出一下寫入的方式是 mappedByteBuffer,是通過 FileChannel 的 map 方法創建,即我們常說的 PageCache,即消息追加首先是寫入到 pageCache 中。

      本文詳細介紹了 Leader 節點處理客戶端消息追加請求的前面兩個步驟,即 判斷 Push 隊列是否已滿 與 Leader 節點存儲消息??紤]到篇幅的問題,各個節點的數據同步將在下一篇文章中詳細介紹。

      在進入下一篇的文章學習之前,我們不妨思考一下如下問題:

      如果主節點追加成功(寫入到 PageCache),但同步到從節點過程失敗或此時主節點宕機,集群中的數據如何保證一致性?

      親愛的讀者朋友們,都讀到這里了,麻煩幫忙個點個贊,謝謝。

      推薦閱讀:源碼分析RocketMQ DLedger 多副本系列連載中。

      1、RocketMQ 多副本前置篇:初探raft協議

      2、源碼分析 RocketMQ DLedger 多副本之 Leader 選主

      3、源碼分析 RocketMQ DLedger 多副本存儲實現

      見文如面,我是威哥,熱衷于成體系剖析JAVA主流中間件,關注公眾號『中間件興趣圈』,回復專欄可獲取成體系專欄導航,回復資料可以獲取筆者的學習思維導圖。

      TCP/IP

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:近期問題查詢記錄
      下一篇:springboot 2.6.2集成elasticsearch 7.16
      相關文章
      国产亚洲精品线观看动态图| 亚洲精品成人区在线观看| 亚洲精品无码久久久久sm| 在线观看亚洲网站| 亚洲国产精品日韩av不卡在线 | 久久久久亚洲精品无码网址 | 久久久久亚洲AV成人片| 亚洲精品综合一二三区在线| 国产亚洲成人久久| 中文字幕亚洲日韩无线码| 亚洲精品无码99在线观看| 亚洲麻豆精品国偷自产在线91| heyzo亚洲精品日韩| 亚洲av无码专区在线观看素人| 怡红院亚洲红怡院在线观看| 国产天堂亚洲精品| 亚洲精品国产精品乱码不卡| 亚洲伊人久久成综合人影院| 亚洲中文字幕丝袜制服一区| 国产亚洲色视频在线| 亚洲中文字幕在线第六区| 亚洲精品无码午夜福利中文字幕| 国产亚洲一区二区三区在线观看| 亚洲精品少妇30p| 亚洲AV无码成人精品区天堂| 亚洲AV日韩AV永久无码绿巨人| 亚洲国产精品自在线一区二区| 亚洲邪恶天堂影院在线观看| 亚洲精品国产情侣av在线| 亚洲一区二区三区在线观看蜜桃| 四虎必出精品亚洲高清| 亚洲中文字幕无码av永久| 久久久久久亚洲av无码蜜芽| 亚洲精品成人区在线观看| 亚洲欧洲∨国产一区二区三区 | 亚洲国产精品99久久久久久| 亚洲黄片手机免费观看| 久久亚洲精品视频| 亚洲高清免费在线观看| 亚洲精品123区在线观看| 欧美亚洲精品一区二区|