ELK 設置定時清理腳本清理索引
1139
2025-04-01
1 - 背景說明
elasticsearch 在對大批量數據進行統計、聚合等操作時,性能差,主要原因有:
ES 是通過?批量加載數據到內存中,然后進行計算的,其 scroll.size 的默認最大值為 10000,超過此值就會報錯 —— 需要修改配置文件;
ES 使用 JVM 堆內存進行計算,但官方建議單個 ES 實例的堆內存要低于 32 GB(不能等于),否則將有資源的浪費、性能的損耗 —— 主要與 JVM 的指針壓縮算法有關。
基于此,在大批量數據下的統計、聚合、排序等場景,可借助 Spark 提升運算性能。
2 - 開發方法
2.1 引入依賴
ES 官方提供了一款工具:elasticsearch-hadoop,通過 Maven 引入即可使用。
可參考官方文檔:Elasticsearch for Apache Hadoop 7.6 ? Apache Spark
2.2 代碼開發
開發步驟:
創建 SparkContext;
從 ES 中加載數據,分組統計,然后排序;
將排序后的結果保存到 HDFS 中。
示例代碼如下:
public?static?void?main(String[]?args)?{ ????LOG.info("*****?Start?to?run?the?Spark?on?ES?test."); ????try?{ ????????//?Create?a?configuration?class?SparkConf, ????????//?meanwhile?set?the?Secure?configuration?that?the?Elasticsearch?Cluster?needed, ????????//?finally?create?a?SparkContext. ????????SparkConf?conf?=?new?SparkConf() ????????????.setAppName("SparkOnEs") ????????????.set("es.nodes",?esServerHost) ????????????//?when?you?specified?in?es.nodes,?then?es.port?is?not?necessary ????????????//?.set("es.port",?"24100") ????????????.set("es.nodes.discovery",?"true") ????????????.set("es.index.auto.create",?"true") ????????????.set("es.internal.spark.sql.pushdown",?"true") ????????????//?要返回的字段,多個時以“,”分隔,此配置的性能要高于?es.read.field.include ????????????.set("es.read.source.filter",?"name,age") ????????????//?滾動查詢時,批大小最大值默認為10000 ????????????.set("es.scroll.size",?"10000") ????????????//?每個分區最多處理100萬條數據 ????????????.set("es.input.max.docs.per.partition",?"1000000"); ????????JavaSparkContext?jsc?=?new?JavaSparkContext(conf); ????????//?Group?data?from?ES ????????groupBySpark(jsc);? ????????jsc.stop(); ????}?catch?(IOException?e)?{ ????????LOG.error("*****?There?are?exceptions?in?main.",?e); ????} } /** ?*?1.?query?all?data?from?ES?by?JavaEsSpark, ?*?2.?group?by?specified?field?and?sort?the?result, ?*?3.?save?the?result?to?HDFS?or?local?files ?* ?*?@param?jsc?the?Java?Spark?Context?which?has?already?initialization ?*/ public?static?void?groupBySpark(JavaSparkContext?jsc)?{ ????long?begin?=?System.currentTimeMillis(); ????JavaPairRDD
3 - 運行任務
3.1 打包項目
在 FI 8.0.0 中下載 Spark 客戶端,獲取樣例代碼后解壓,用 IDEA 打開:File -> Open,選中項目的 pom.xml 文件 -> OK,即可完成項目的導入。
(1)修改項目中的相關配置,與要測試集群中的信息一致;
(2) 通過 IDEA 自帶的 Maven 工具,打包項目,生成?target\SparkOnES-1.0.jar;
(3)將打包生成的 jar 包上傳到 Spark 客戶端所在的服務器下,這里以?/opt/spark-on-es/?為例;
(4)將?esParams.properties、user.keytab、krb5.conf?三個文件上傳到?/opt/spark-on-es/?下;
(5)將項目所需的 jar 包上傳到?/opt/spark-on-es/libs/?下。
說明:樣例代碼運行至少需要如下 jar 包,請從 Elasticsearch 的客戶端、Maven 中心倉等處獲取相關包。
3.2 client 模式提交 Spark 任務
運行命令如下:
cd?/opt/spark-on-es/ #?下述命令為一條命令,其中?/opt/spark-on-es/libs/?是外部依賴的jar包路徑: spark-submit?--class?com.huawei.bigdata.spark.examples.SparkOnEs?\ --master?yarn?--deploy-mode?client?\ --jars?$(files=(/opt/spark-on-es/libs/*.jar);?IFS=,;?echo?"${files[*]}")?\ ./SparkOnEs-1.0.jar
3.3 cluster 模式提交 Spark 任務
運行命令如下:
cd?/opt/spark-on-es/ #?下述命令為一條命令,其中?--files?參數指定配置文件: spark-submit?--class?com.huawei.bigdata.spark.examples.SparkOnEs?\ --master?yarn?--deploy-mode?cluster?\ --jars?$(files=(/opt/spark-on-es/libs/*.jar);?IFS=,;?echo?"${files[*]}")?\ --files?./user.keytab,./krb5.conf,./esParams.properties?\ --driver-memory?6g?\ --executor-cores?5?\ --num-executors?150?\ --executor-memory?5g?\ ./SparkOnEs-1.0.jar
3.4 查看運行結果
(1) 查詢 Elasticsearch 中的數據:
#?安全模式集群下:kinit認證后,查看Elasticsearch中的index: curl?-XGET?--tlsv1.2?--negotiate?-k?-u?:?'https://10.10.10.11:24100/_cat/indices?v' #?普通模式集群下:使用http(而非https)查詢即可: curl?-XGET?'http://10.10.10.11:24100/_cat/indices?v' #?通過下述命令對people索引中的數據進行范圍查詢: curl?-XPOST?'http://10.10.10.11:24100/people/_search?pretty'?-H?'Content-Type:application/json'?-d?' { ????"query":?{ ????????"range":?{ ????????????"createdTime":?{"gte":?"2010-01-01T00:00:00Z",?"lt":?"2015-12-31T23:59:59Z"} ????????} ????} }'
注:Elasticsearch相關查詢命令,請參考【業務操作指南】-【Elasticsearch】-【Linux下curl命令的使用】。
(2) 查詢 HDFS 中的分組文件:
樣例代碼中將分組結果保存到 HDFS 中,安裝客戶端、kinit 認證后,可通過下述命令進行統計:
#?查看所有的文件,及其大小: [root@10.10.10.11?spark-on-es]#?hdfs?dfs?-du?-s?-h?/user/spark-on-es/result/* 0?????????0????????/user/spark-on-es/result/_SUCCESS 709???????2.1?K????/user/spark-on-es/result/part-00000 3.5?K?????10.4?K???/user/spark-on-es/result/part-00001 2.1?K?????6.4?K????/user/spark-on-es/result/part-00002 3.1?K?????9.4?K????/user/spark-on-es/result/part-00003 ...... #?統計結果集的個數: [root@10.10.10.11?spark-on-es]#?hdfs?dfs?-cat?/user/spark-on-es/result/*?|?wc?-l 2000000 #?查看某個文件中的內容: [root@10.10.10.11?spark-on-es]#?hdfs?dfs?-cat?/user/spark-on-es/result/part-00000 (573267,99) (1929095,98)
4 - 性能優化方法
此樣例代碼的性能瓶頸:Spark 讀取 Elasticsearch 中全量數據的過程,耗時最久,優化思路有:
1)增加索引的分片個數:elasticsearch-spark 工具讀取 Elasticsearch 中的數據時,任務的并行度默認是索引的分片個數,因此分片個數越多,并行度越高;
Elasticsearch 中索引的分片個數不宜太大,此時可通過?es.input.max.docs.per.partition?參數規劃 Spark 讀取 Elasticsearch 中數據的 Partition 個數,也可提升并行度。(詳見示例代碼)
2)增大 scroll.size 的值:elasticsearch-spark 工具通過 Scroll 滾動讀取數據,其大小默認是50,可以提高至10000,建議不高于50000,否則容易產生 OOM;
3)合理使用 Yarn 資源:參考?《產品文檔-業務操作指南-Yarn-性能調優-節點配置調優》?中的說明,合理設置 Yarn 的資源,以 cluster 模式運行任務時,應適當修改 spark-submit 的參數:
--driver-memory?6g?????##?Driver的內存大小 --executor-cores?5?????##?每個Executor可用的?CPU?核數 --num-executors?150????##?Executor的個數,num-executors?*?executor-cores?不能超過?Yarn的VCores數 --executor-memory?5g???##?每個Executor的內存大小,num-executors?*?executor-memory?不能超過Yarn的最大內存
5 - 參考資料
Elasticsearch for Apache Hadoop 7.6 ? Configuration
6 - 其他代碼參考
通過 Scroll 滾動查詢大批量數據的邏輯: /** * Query data from ES by ES rest client */ private static void queryDataByRestHighLevelClient(JavaSparkContext jsc) { LOG.info("=====> Query data from ES by Rest High Level Client beginning..."); LOG.info("=====> Query string: {}", esQueryJsonString); long begin = System.currentTimeMillis(); List
MapReduce服務 spark Elasticsearch
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。