ES寫入內(nèi)核流程
1???? 概述
在ES中,寫入單個(gè)文檔的請(qǐng)求稱為Index請(qǐng)求,批量寫入的請(qǐng)求稱為Bulk請(qǐng)求。它們都使用相同的處理邏輯,被統(tǒng)一封裝為BulkRequest。以下從源碼角度分析ES的bulk請(qǐng)求寫入流程。
2???? 源碼分析
2.1????? Rest層請(qǐng)求轉(zhuǎn)化為Transport層請(qǐng)求
2.1.1??????? RestController.java
1.ES會(huì)提前將處理各種http請(qǐng)求(GET、PUT、POST、DELETE等)的handler注冊(cè)到一個(gè)handler列表中,RestController# registerHandler:
2.http請(qǐng)求發(fā)送到ES后,在RestController中進(jìn)行實(shí)際的分發(fā)過(guò)程。首先根據(jù)http請(qǐng)求找到對(duì)應(yīng)的handler,再調(diào)用handler的handleRequest方法處理請(qǐng)求:
3.handlerRequest調(diào)用prepareRequest:
2.1.1??????? RestBulkAction.java
1.對(duì)于bulk操作,其請(qǐng)求對(duì)應(yīng)的handler是RestBulkAction,該類會(huì)在其構(gòu)造函數(shù)中將其注冊(cè)到RestController,代碼如下:
2. RestBulkAction會(huì)將RestRequest解析并轉(zhuǎn)化為BulkRequest,然后再對(duì)BulkRequest做處理,這塊的邏輯在prepareRequest方法中,代碼如下:
3.上圖最后一行是NodeClient對(duì)bulk請(qǐng)求的處理
2.1.1??????? NodeClient.java
NodeClient在處理BulkRequest請(qǐng)求時(shí),會(huì)將請(qǐng)求的action轉(zhuǎn)化為對(duì)應(yīng)Transport層的action,然后再由Transport層的action來(lái)處理BulkRequest,action轉(zhuǎn)化的代碼如下:
TransportAction會(huì)調(diào)用一個(gè)請(qǐng)求過(guò)濾鏈來(lái)處理請(qǐng)求:
對(duì)于Bulk請(qǐng)求,TransportAction的具體實(shí)現(xiàn)類為TransportBulkAction,其doExecute方法繼續(xù)執(zhí)行寫入邏輯。至此轉(zhuǎn)化完成。
2.2? ? ??協(xié)調(diào)節(jié)點(diǎn)處理并轉(zhuǎn)發(fā)請(qǐng)求
2.2.1??????? TransportBulkAction.java
TransportBulkAction#doExecute先判斷bulk請(qǐng)求中的索引是否存在,不存在則調(diào)用自動(dòng)創(chuàng)建流程:
可見(jiàn),邏輯為先遍歷bulk中的索引,如果開(kāi)啟了自動(dòng)創(chuàng)建索引則放到autoCreateIndices集合中,最后通過(guò)createIndex方法創(chuàng)建。
創(chuàng)建完index后,index的各shard已在數(shù)據(jù)節(jié)點(diǎn)上創(chuàng)建完成,協(xié)調(diào)節(jié)點(diǎn)將轉(zhuǎn)發(fā)寫入請(qǐng)求到文檔對(duì)應(yīng)的primary shard。協(xié)調(diào)節(jié)點(diǎn)轉(zhuǎn)發(fā)的入口為TransportBulkAction #executeBulk方法:
執(zhí)行邏輯在BulkOperation的doRun方法中:
1)首先遍歷bulkRequest請(qǐng)求,然后根據(jù)請(qǐng)求的操作類型執(zhí)行相應(yīng)邏輯。對(duì)于index請(qǐng)求,會(huì)先根據(jù)IndexMetaData信息為每條寫入請(qǐng)求生成路由信息,如果用戶沒(méi)有指定doc id,則會(huì)在process方法中生成:
2)然后根據(jù)每個(gè)IndexRequest請(qǐng)求的路由信息(默認(rèn)為doc id)得到所要寫入的目標(biāo)shard id,再將DocWriteRequest封裝為BulkItemRequest并添加到請(qǐng)求列表:
3)然后將請(qǐng)求按shard分組封裝為BulkShardRequest并交由TransportShardBulkAction來(lái)處理:
4)執(zhí)行邏輯最終進(jìn)入TransportReplicationAction#doRun方法
2.2.2? ? ? ??TransportReplicationAction.java
TransportReplicationAction#doRun會(huì)通過(guò)ClusterState獲取到primary shard的路由信息,然后得到primary shard所在的node,如果node為當(dāng)前協(xié)調(diào)節(jié)點(diǎn)則直接將請(qǐng)求發(fā)往本地,否則發(fā)往遠(yuǎn)端:
2.3? ? ??主分片再副本分片節(jié)點(diǎn)執(zhí)行寫入
2.3.1??????? ReplicationOperation.java
ReplicationOperation#execute方法執(zhí)行主分片節(jié)點(diǎn)寫入:
primary.perform執(zhí)行主分片寫入,主分片寫入完成調(diào)用handlerPrimaryResult方法,發(fā)送寫副本分片的請(qǐng)求:
2.3.2? ? ? ??TransportShardBulkAction.java
著重看寫主分片的邏輯,在TransportShardBulkAction#shardOperationOnPrimary方法中:
再調(diào)用InternalEngine.Index將數(shù)據(jù)寫入lucene,再寫入translog:
寫lucene和translog的整體流程如下:
(1)數(shù)據(jù)寫入buffer緩沖和translog日志文件
(2)每隔一秒鐘,buffer中的數(shù)據(jù)被寫入新的segment file,并進(jìn)入os cache,此時(shí)segment被打開(kāi)并供search使用
(3)buffer被清空
(4)重復(fù)1~3,新的segment不斷添加,buffer不斷被清空,而translog中的數(shù)據(jù)不斷累加
(5)當(dāng)translog長(zhǎng)度達(dá)到一定程度的時(shí)候,commit操作發(fā)生:
(5-1)buffer中的所有數(shù)據(jù)寫入一個(gè)新的segment,并寫入os cache,打開(kāi)供使用
(5-2)buffer被清空
(5-3)一個(gè)commit ponit被寫入磁盤,標(biāo)明了所有的index segment
(5-4)filesystem cache中的所有index segment file緩存數(shù)據(jù),被fsync強(qiáng)行刷到磁盤上
(5-5)現(xiàn)有的translog被清空,創(chuàng)建一個(gè)新的translog
以上寫lucene和translog對(duì)應(yīng)ES中幾個(gè)關(guān)鍵概念:
(1)fresh: 內(nèi)存緩沖區(qū)被清空寫到段中,段被打開(kāi)可進(jìn)行搜索
(2)commit point: 記錄當(dāng)前所有可用的segement
(3)flush: 內(nèi)存緩沖區(qū)被清空寫到段中,一個(gè)提交點(diǎn)被寫入硬盤,文件系統(tǒng)緩存通過(guò)?fsync?被刷新,老的 translog 被刪除
(4)fsync: 即/_flush/sync命令,邏輯是flush translog并且將sync_id同步到各個(gè)分片,可以實(shí)現(xiàn)快速恢復(fù)
Elasticsearch NAT
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。