ELK 設置定時清理腳本清理索引
717
2022-05-30
微信原文:elasticsearch 6.3.2的啟動流程
博客原文:Elasticsearch 6.3.2的啟動流程
前言
本文探究Elasticsearch 6.3.2的啟動流程
環境準備
使用工具:IDEA,XMind
關于ES調試環境的搭建,可以參考前面的文章 《教你編譯調試Elasticsearch 6.3.2源碼》
然后通過設置斷點,從 org.elasticsearch.bootstrap.ElasticSearch 的入口函數開始,一步一步調試
IDEA 2018.2 調試按鈕
上圖為使用 IDEA 2018.2 進行調試的一個截圖,左上角84行出紅點為一個斷點,1、2、3編號的3個按鈕是較為常用的按鈕,作用如下:
按鈕1:step over,執行到下一行,遇到方法不進入方法內部
微信原文:Elasticsearch 6.3.2的啟動流程
博客原文:Elasticsearch 6.3.2的啟動流程
前言
本文探究Elasticsearch 6.3.2的啟動流程
使用工具:IDEA,XMind
關于ES調試環境的搭建,可以參考前面的文章 《教你編譯調試Elasticsearch 6.3.2源碼》
然后通過設置斷點,從 org.elasticsearch.bootstrap.ElasticSearch 的入口函數開始,一步一步調試
上圖為使用 IDEA 2018.2 進行調試的一個截圖,左上角84行出紅點為一個斷點,1、2、3編號的3個按鈕是較為常用的按鈕,作用如下:
按鈕1:step over,執行到下一行,遇到方法不進入方法內部
按鈕2:step into,執行到下一句代碼,遇到方法則進入方法內部
按鈕3:Run to cursor,執行到下一個斷點處,后面沒有斷點則執行到結束
Elasticsearch 解析 Command,加載配置
Bootstrap 初始化,資源檢查
Node 創建節點
Bootstrap 啟動節點和保活線程
Elasticsearch 解析 Command,加載配置
首先可以看一下入口方法 Elasticsearch.main:
public?static?void?main(final?String[]?args)?throws?Exception?{
System.setSecurityManager(new?SecurityManager()?{
@Override
public?void?checkPermission(Permission?perm)?{
//?grant?all?permissions?so?that?we?can?later?set?the?security?manager?to?the?one?that?we?want
}
});
LogConfigurator.registerErrorListener();
final?Elasticsearch?elasticsearch?=?new?Elasticsearch();
int?status?=?main(args,?elasticsearch,?Terminal.DEFAULT);
if?(status?!=?ExitCodes.OK)?{
exit(status);
}
}
1.1, 創建 SecurityManager 安全管理器
關于 SecurityManager:
安全管理器在Java語言中的作用就是檢查操作是否有權限執行,通過則順序進行,否則拋出一個異常
網上一篇文章:Java安全——安全管理器、訪問控制器和類裝載器
1.2, LogConfigurator.registerErrorListener() 注冊偵聽器
1.3, 創建Elasticsearch對象
Elasticsearch 入口類的繼承關系如下:
可以看到Elasticsearch繼承了EnvironmentAwareCommand,Command,這幾個類的功能簡要介紹如下:
Elasticsearch: This class starts elasticsearch.
EnvironmentAwareCommand: A cli command which requires an org.elasticsearch.env.Environment to use current paths and settings
Command: An action to execute within a cli.
可以看出Elasticsearch的一個重要作用是解析命令參數
執行帶 -h 參數的Elasticsearch啟動命令
可以發現這幾個參數與 Cammand 類 和 Elasticsearch 的幾個私有變量是對應的
Elasticsearch的構造函數如下:
Elasticsearch()?{
super("starts?elasticsearch",?()?->?{});?//?we?configure?logging?later?so?we?override?the?base?class?from?configuring?logging
versionOption?=?parser.acceptsAll(Arrays.asList("V",?"version"),?"Prints?elasticsearch?version?information?and?exits");
daemonizeOption?=?parser.acceptsAll(Arrays.asList("d",?"daemonize"),?"Starts?Elasticsearch?in?the?background")
.availableUnless(versionOption);
pidfileOption?=?parser.acceptsAll(Arrays.asList("p",?"pidfile"),?"Creates?a?pid?file?in?the?specified?path?on?start")
.availableUnless(versionOption).withRequiredArg().withValuesConvertedBy(new?PathConverter());
quietOption?=?parser.acceptsAll(Arrays.asList("q",?"quiet"),?"Turns?off?standard?output/error?streams?logging?in?console")
.availableUnless(versionOption).availableUnless(daemonizeOption);
}
1.4, 接著進入 Command.main 方法
該方法給當前Runtime類添加一個hook線程,該線程作用是:當Runtime異常關閉時打印異常信息
1.5, Command.mainWithoutErrorHandling 方法,根據命令行參數,打印或者設置參數,然后執行命令,有異常則拋出所有異常
1.6, EnvironmentAwareCommand.execute,確保 es.path.data, es.path.home, es.path.logs 等參數已設置,否則從 System.properties 中讀取
putSystemPropertyIfSettingIsMissing(settings,?"path.data",?"es.path.data");
putSystemPropertyIfSettingIsMissing(settings,?"path.home",?"es.path.home");
putSystemPropertyIfSettingIsMissing(settings,?"path.logs",?"es.path.logs");
execute(terminal,?options,?createEnv(terminal,?settings));
1.7, EnvironmentAwareCommand.createEnv,讀取config下的配置文件elasticsearch.yml內容,收集plugins,bin,lib,modules等目錄下的文件信息
createEnv最后返回一個 Environment 對象,執行結果如下
1.8, Elasticsearch.execute ,讀取daemonize, pidFile,quiet 的值,并 確保配置的臨時目錄(temp)是有效目錄
進入Bootstrap初始化階段
Bootstrap.init(!daemonize,?pidFile,?quiet,?initialEnv);
Bootstrap初始化階段
2.1, 進入 Bootstrap.init, This method is invoked by Elasticsearch#main(String[]) to startup elasticsearch.
INSTANCE = new Bootstrap();, 創建一個Bootstrap對象作為類對象,該類構造函數會創建一個用戶線程,添加到Runtime Hook中,進行 countDown 操作
private?final?CountDownLatch?keepAliveLatch?=?new?CountDownLatch(1);
/**?creates?a?new?instance?*/
Bootstrap()?{
keepAliveThread?=?new?Thread(new?Runnable()?{
@Override
public?void?run()?{
try?{
keepAliveLatch.await();
}?catch?(InterruptedException?e)?{
}
}
},?"elasticsearch[keepAlive/"?+?Version.CURRENT?+?"]");
keepAliveThread.setDaemon(false);
//?keep?this?thread?alive?(non?daemon?thread)?until?we?shutdown
Runtime.getRuntime().addShutdownHook(new?Thread()?{
@Override
public?void?run()?{
keepAliveLatch.countDown();
}
});
}
CountDownLatch是一個同步工具類,它允許一個或多個線程一直等待,直到其他線程執行完后再執行。例如,應用程序的主線程希望在負責啟動框架服務的線程已經啟動所有框架服務之后執行。
CountDownLatch是通過一個計數器來實現的,計數器的初始化值為線程的數量。每當一個線程完成了自己的任務后,計數器的值就相應得減1。當計數器到達0時,表示所有的線程都已完成任務,然后在閉鎖上等待的線程就可以恢復執行任務。
更多介紹請看文章:并發工具類 CountDownLatch
2.2, 加載 keystore 安全配置,keystore文件不存在則創建,保存;存在則解密,更新keystore
2.3, 根據已有的配置信息,創建一個Environment對象
2.4, LogConfigurator log4j日志配置
2.5, 檢查pid文件是否存在,不存在則創建
關于 pid 文件:
(1) pid文件的內容:pid文件為文本文件,內容只有一行,記錄了該進程的ID,用cat命令可以看到。
(2) pid文件的作用:防止進程啟動多個副本。只有獲得pid文件(固定路徑固定文件名)寫入權限(F_WRLCK)的進程才能正常啟動并把自身的PID寫入該文件中,其它同一個程序的多余進程則自動退出。
2.6, 檢查Lucene版本與實際的Lucene Jar文件的版本是否一致,不一致則拋異常
2.7, 設置未捕獲異常的處理 Thread.setDefaultUncaughtExceptionHandler
在Thread ApI中提供了UncaughtExceptionHandle,它能檢測出某個由于未捕獲的異常而終結的情況
朱小廝 JAVA多線程之UncaughtExceptionHandler——處理非正常的線程中止
3.1,spawner.spawnNativeControllers(environment);
遍歷每個模塊,生成本機控制類(native Controller):讀取modules文件夾下所有的文件夾中的模塊信息,保存為一個 PluginInfo ?對象,為合適的模塊生成控制類,通過 Files.isRegularFile(spawnPath) 來判斷
嘗試為給定模塊生成控制器(native Controller)守護程序。 ? ?生成的進程將通過其stdin,stdout和stderr流保持與此JVM的連接,但對此包之外的代碼不能使用對這些流的引用。
3.2, initializeNatives(Path tmpFile, boolean mlockAll, boolean systemCallFilter, boolean ctrlHandler)初始化本地資源
檢查用戶是否為root用戶,是則拋異常;
嘗試啟用 系統調用過濾器 system call filter;
如果設置了則進行 mlockall
Windows關閉事件-
init lucene random seed.
這個過程中使用到了 Natives 類:
Natives類是一個包裝類,用于檢查調用本機方法所需的類是否在啟動時可用。如果它們不可用,則此類將避免調用加載這些類的代碼
3.3, 添加一個Hook: Runtime.getRuntime().addShutdownHook,當ES退出時用于關閉必要的IO流,日志器上下文和配置器等
3.4, 使用 JarHell 檢查重復的 jar 文件
3.5, 初始化 SecurityManager
//?install?SM?after?natives,?shutdown?hooks,?etc.
Security.configure(environment,?BootstrapSettings.SECURITY_FILTER_BAD_DEFAULTS_SETTING.get(settings));
創建 node 節點
node?=?new?Node(environment)?{
@Override
protected?void?validateNodeBeforeAcceptingRequests(
final?BootstrapContext?context,
final?BoundTransportAddress?boundTransportAddress,?List
BootstrapChecks.check(context,?boundTransportAddress,?checks);
}
};
4.1, 這里直接貼一下代碼(前半部分)
這里進行的主要操作有:
生命周期Lifecycle設置為 初始化狀態 INITIALIZED
創建一個 NodeEnvironment 對象保存節點環境信息,如各種數據文件的路徑
讀取JVM信息
創建 PluginsService 對象,創建過程中會讀取并加載所有的模塊和插件
創建一個最終的 Environment 對象
創建線程池 ThreadPool 后面各類對象基本都是通過線程來提供服務,這個線程池可以管理各類線程
創建 節點客戶端 NodeClient
這里重點介紹 PluginsService 和 ThreadPool 這兩個類
ThreadPool 線程池
fixed(固定):fixed線程池擁有固定數量的線程來處理請求,在沒有空閑線程時請求將被掛在隊列中。queue_size參數可以控制在沒有空閑線程時,能排隊掛起的請求數
fixed_auto_queue_size:此類型為實驗性的,將被更改或刪除,不關注
scaling(彈性):scaling線程池擁有的線程數量是動態的,這個數字介于core和max參數的配置之間變化。keep_alive參數用來控制線程在線程池中空閑的最長時間
direct:此類線程是一種不支持關閉的線程,就意味著一旦使用,則會一直存活下去.
generic:用于通用的請求(例如:后臺節點發現),線程池類型為 scaling。
index:用于index/delete請求,線程池類型為 fixed, 大小的為處理器數量,隊列大小為200,最大線程數為 1 + 處理器數量。
search:用于count/search/suggest請求。線程池類型為 fixed, 大小的為 int((處理器數量 3) / 2) +1,隊列大小為1000。*
get:用于get請求。線程池類型為 fixed,大小的為處理器數量,隊列大小為1000。
analyze:用于analyze請求。線程池類型為 fixed,大小的1,隊列大小為16
write:用于單個文檔的 index/delete/update 請求以及 bulk 請求,線程池類型為 fixed,大小的為處理器數量,隊列大小為200,最大線程數為 1 + 處理器數量。
snapshot:用于snaphost/restore請求。線程池類型為 scaling,線程保持存活時間為5分鐘,最大線程數為min(5, (處理器數量)/2)。
warmer:用于segment warm-up請求。線程池類型為 scaling,線程保持存活時間為5分鐘,最大線程數為min(5, (處理器數量)/2)。
refresh:用于refresh請求。線程池類型為 scaling,線程空閑保持存活時間為5分鐘,最大線程數為min(10, (處理器數量)/2)。
listener:主要用于Java客戶端線程-被設置為true時執行動作。線程池類型為 scaling,最大線程數為min(10, (處理器數量)/2)。
ThreadPool 類中除了以上線程隊列,還可以看到有 CachedTimeThread(緩存系統時間)、ExecutorService(在當前線程上執行提交的任務)、ThreadContext(線程上下文)、ScheduledThreadPoolExecutor(Java任務調度)等
回到 Node 節點的創建
4.2, 創建各種服務類對象 ResourceWatcherService、NetworkService、ClusterService、IngestService、ClusterInfoService、UsageService、MonitorService、CircuitBreakerService、MetaStateService、IndicesService、MetaDataIndexUpgradeService、TemplateUpgradeService、TransportService、ResponseCollectorService、SearchTransportService、NodeService、SearchService、PersistentTasksClusterService
這些服務類是的功能可以根據名稱做一個大概的判斷,具體還需要看文檔和源碼,限于篇幅,在此不做探究
4.3, ModulesBuilder類加入各種模塊 ScriptModule、AnalysisModule、SettingsModule、pluginModule、ClusterModule、IndicesModule、SearchModule、GatewayModule、RepositoriesModule、ActionModule、NetworkModule、DiscoveryModule
4.4, guice 綁定依賴以及依賴注入
關于 guice 可以參考之前的文章:
Google Guice 快速入門
Elasticsearch 中的 Guice
elasticsearch里面的組件基本都進行進行了模塊化管理,elasticsearch對guice進行了封裝,通過ModulesBuilder類構建es的模塊(一般包括的模塊在 4.3 中列舉了)
//?依賴綁定
modules.add(b?->?{
b.bind(Node.class).toInstance(this);
b.bind(NodeService.class).toInstance(nodeService);
b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
b.bind(PluginsService.class).toInstance(pluginsService);
b.bind(Client.class).toInstance(client);
b.bind(NodeClient.class).toInstance(client);
b.bind(Environment.class).toInstance(this.environment);
b.bind(ThreadPool.class).toInstance(threadPool);
b.bind(NodeEnvironment.class).toInstance(nodeEnvironment);
b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService);
b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
b.bind(BigArrays.class).toInstance(bigArrays);
b.bind(ScriptService.class).toInstance(scriptModule.getScriptService());
b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
b.bind(IngestService.class).toInstance(ingestService);
b.bind(UsageService.class).toInstance(usageService);
b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
b.bind(MetaDataUpgrader.class).toInstance(metaDataUpgrader);
b.bind(MetaStateService.class).toInstance(metaStateService);
b.bind(IndicesService.class).toInstance(indicesService);
b.bind(SearchService.class).toInstance(searchService);
b.bind(SearchTransportService.class).toInstance(searchTransportService);
b.bind(SearchPhaseController.class).toInstance(new?SearchPhaseController(settings,
searchService::createReduceContext));
b.bind(Transport.class).toInstance(transport);
b.bind(TransportService.class).toInstance(transportService);
b.bind(NetworkService.class).toInstance(networkService);
b.bind(UpdateHelper.class).toInstance(new?UpdateHelper(settings,?scriptModule.getScriptService()));
b.bind(MetaDataIndexUpgradeService.class).toInstance(metaDataIndexUpgradeService);
b.bind(ClusterInfoService.class).toInstance(clusterInfoService);
b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);
b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());
{
RecoverySettings?recoverySettings?=?new?RecoverySettings(settings,?settingsModule.getClusterSettings());
processRecoverySettings(settingsModule.getClusterSettings(),?recoverySettings);
b.bind(PeerRecoverySourceService.class).toInstance(new?PeerRecoverySourceService(settings,?transportService,
indicesService,?recoverySettings));
b.bind(PeerRecoveryTargetService.class).toInstance(new?PeerRecoveryTargetService(settings,?threadPool,
transportService,?recoverySettings,?clusterService));
}
httpBind.accept(b);
pluginComponents.stream().forEach(p?->?b.bind((Class)?p.getClass()).toInstance(p));
b.bind(PersistentTasksService.class).toInstance(persistentTasksService);
b.bind(PersistentTasksClusterService.class).toInstance(persistentTasksClusterService);
b.bind(PersistentTasksExecutorRegistry.class).toInstance(registry);
}
);
injector?=?modules.createInjector();
Bootstrap 啟動
5.1, 通過 injector 獲取各個類的對象,調用 start() 方法啟動(實際進入各個類的中 doStart 方法): LifecycleComponent、IndicesService、IndicesClusterStateService、SnapshotsService、SnapshotShardsService、RoutingService、SearchService、MonitorService、NodeConnectionsService、ResourceWatcherService、GatewayService、Discovery、TransportService
這里簡要介紹一下各個服務類的職能:
IndicesService:索引管理
IndicesClusterStateService:跨集群同步
SnapshotsService:負責創建快照
SnapshotShardsService:此服務在數據和主節點上運行,并控制這些節點上當前快照的分片。 它負責啟動和停止分片級別快照
RoutingService:偵聽集群狀態,當它收到ClusterChangedEvent(集群改變事件)將驗證集群狀態,路由表可能會更新
SearchService:搜索服務
MonitorService:監控
NodeConnectionsService:此組件負責在節點添加到群集狀態后連接到節點,并在刪除它們時斷開連接。 此外,它會定期檢查所有連接是否仍處于打開狀態,并在需要時還原它們。 請注意,如果節點斷開/不響應ping,則此組件不負責從群集中刪除節點。 這是由NodesFaultDetection完成的。 主故障檢測由鏈接MasterFaultDetection完成。
ResourceWatcherService:通用資源觀察器服務
GatewayService:網關
如果該節點是主節點或數據節點,還需要進行相關的職能操作
5.2, 集群發現與監控等,啟動 HttpServerTransport, 綁定服務端口
5.3, 啟動保活線程 keepAliveThread.start 進行心跳檢測
小結
過程很漫長,后面很多類的功能未了解,之后補上
有理解錯誤的地方請大家多多指教
后記
歡迎評論、轉發、分享,您的支持是我最大的動力
更多內容可訪問我的個人博客:http://laijianfeng.org
關注【小旋鋒】微信公眾號,及時接收博文推送
Elasticsearch
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。