Spark on Elasticsearch Performance Optimization


1 - Background

Elasticsearch performs poorly when running statistics, aggregations, and similar operations over large volumes of data, mainly for two reasons:

ES loads data into memory in batches and then computes on it; the default maximum for scroll.size is 10000, and exceeding that value raises an error, so the relevant configuration must be changed first (see the sketch after this list);

ES computes in JVM heap memory, and the official recommendation is to keep the heap of a single ES instance below 32 GB (not equal to it); otherwise resources are wasted and performance suffers, mainly because of the JVM's compressed-pointer (compressed oops) mechanism.
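A minimal sketch of the configuration change mentioned above, under the assumption that the 10000 cap on a single scroll batch comes from the index-level index.max_result_window setting (the error message ES returns for oversized scroll batches points at this setting). The host, port, index name, and new limit below are illustrative placeholders; the low-level RestClient is the same client used in section 6.

import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class RaiseMaxResultWindow {
    public static void main(String[] args) throws Exception {
        // Assumption: a plain-HTTP cluster at 10.10.10.11:24100 and an index named "people".
        try (RestClient restClient = RestClient.builder(
                new HttpHost("10.10.10.11", 24100, "http")).build()) {
            Request request = new Request("PUT", "/people/_settings");
            // Raise the per-request window so scroll batches larger than 10000 are accepted.
            request.setEntity(new StringEntity(
                "{\"index.max_result_window\": 50000}", ContentType.APPLICATION_JSON));
            Response response = restClient.performRequest(request);
            System.out.println(response.getStatusLine());
        }
    }
}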

Given this, Spark can be brought in to speed up statistics, aggregation, and sorting over large data sets.

2 - Development Approach

2.1 Add the Dependency

Elasticsearch officially provides the elasticsearch-hadoop connector, which can be used by adding a Maven dependency:

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>7.6.0</version>
</dependency>

See the official documentation: Elasticsearch for Apache Hadoop 7.6 » Apache Spark.

2.2 Code Development

Development steps:

Create a SparkContext;

Load the data from ES, group and count it, then sort;

Save the sorted result to HDFS.

Sample code:

public static void main(String[] args) {
    LOG.info("***** Start to run the Spark on ES test.");
    try {
        // Create a configuration class SparkConf,
        // meanwhile set the secure configuration that the Elasticsearch cluster needs,
        // finally create a SparkContext.
        SparkConf conf = new SparkConf()
            .setAppName("SparkOnEs")
            .set("es.nodes", esServerHost)
            // when the port is specified in es.nodes, es.port is not necessary
            // .set("es.port", "24100")
            .set("es.nodes.discovery", "true")
            .set("es.index.auto.create", "true")
            .set("es.internal.spark.sql.pushdown", "true")
            // fields to return, comma-separated; this option performs better than es.read.field.include
            .set("es.read.source.filter", "name,age")
            // batch size for scroll queries; the maximum defaults to 10000
            .set("es.scroll.size", "10000")
            // each partition handles at most 1,000,000 documents
            .set("es.input.max.docs.per.partition", "1000000");

        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Group data from ES
        groupBySpark(jsc);

        jsc.stop();
    } catch (IOException e) {
        LOG.error("***** There are exceptions in main.", e);
    }
}

/**
 * 1. query all data from ES by JavaEsSpark,
 * 2. group by the specified field and sort the result,
 * 3. save the result to HDFS or local files
 *
 * @param jsc the JavaSparkContext that has already been initialized
 */
public static void groupBySpark(JavaSparkContext jsc) {
    long begin = System.currentTimeMillis();
    JavaPairRDD<String, Map<String, Object>> pairRDD = JavaEsSpark.esRDD(jsc, esIndex);

    // group by the "age" field
    final String field = "age";
    JavaPairRDD<String, Long> resultRdd = pairRDD.mapPartitionsToPair(
        new PairFlatMapFunction<Iterator<Tuple2<String, Map<String, Object>>>, String, Long>() {
            @Override
            public Iterator<Tuple2<String, Long>> call(
                Iterator<Tuple2<String, Map<String, Object>>> iterator) throws Exception {
                List<Tuple2<String, Long>> list = new ArrayList<>(10000);
                iterator.forEachRemaining(
                    row -> list.add(new Tuple2<>(row._2.get(field).toString(), 1L)));
                return list.iterator();
            }
        })
        .reduceByKey((v1, v2) -> (v1 + v2))
        .mapToPair(row -> new Tuple2<>(row._2, row._1))
        // sort the per-age counts in descending order
        .sortByKey(false)
        .mapToPair(row -> new Tuple2<>(row._2, row._1));

    long end = System.currentTimeMillis();
    long spentTime = end - begin;
    LOG.info("***** GroupBy data from ES successful, spent time: {} ms", spentTime);

    resultRdd.saveAsTextFile("/user/spark-on-es/group-result/");
    LOG.info("***** Save all result to HDFS successful.");
}
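Because only the name and age fields are needed for the grouping, the amount of data read from ES can be reduced further by passing a query to esRDD so the filtering happens on the ES side. A minimal sketch, reusing jsc, esIndex, and the imports of the sample above; the age range in the query body is purely illustrative:

// A hedged variant of the load step in groupBySpark: push a range filter down to ES
// so that only matching documents are scrolled out of the cluster.
public static JavaPairRDD<String, Map<String, Object>> loadWithPushdown(JavaSparkContext jsc, String esIndex) {
    // Ordinary Elasticsearch query-DSL body; adjust the field and bounds to the actual data.
    String query = "{\"query\": {\"range\": {\"age\": {\"gte\": 18, \"lt\": 30}}}}";
    return JavaEsSpark.esRDD(jsc, esIndex, query);
}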

3 - Running the Job

3.1 Package the Project

Download the Spark client from FI 8.0.0, obtain and unpack the sample code, then open it in IDEA: File -> Open, select the project's pom.xml file -> OK, and the project is imported.

(1) Adjust the relevant configuration in the project so that it matches the cluster under test;

(2) Build the project with IDEA's bundled Maven tooling to generate target\SparkOnEs-1.0.jar;

(3) Upload the generated jar to the server where the Spark client is installed; /opt/spark-on-es/ is used as an example here;

(4) Upload the esParams.properties, user.keytab, and krb5.conf files to /opt/spark-on-es/;

(5) Upload the jars the project depends on to /opt/spark-on-es/libs/.

Note: running the sample code requires at least the following jars; obtain them from the Elasticsearch client, the Maven central repository, or similar sources.

3.2 Submit the Spark Job in client Mode

Run the following commands:

cd /opt/spark-on-es/

# The following is a single command; /opt/spark-on-es/libs/ is the path of the external dependency jars:
spark-submit --class com.huawei.bigdata.spark.examples.SparkOnEs \
--master yarn --deploy-mode client \
--jars $(files=(/opt/spark-on-es/libs/*.jar); IFS=,; echo "${files[*]}") \
./SparkOnEs-1.0.jar

3.3 Submit the Spark Job in cluster Mode

Run the following commands:

cd /opt/spark-on-es/

# The following is a single command; the --files option ships the configuration files:
spark-submit --class com.huawei.bigdata.spark.examples.SparkOnEs \
--master yarn --deploy-mode cluster \
--jars $(files=(/opt/spark-on-es/libs/*.jar); IFS=,; echo "${files[*]}") \
--files ./user.keytab,./krb5.conf,./esParams.properties \
--driver-memory 6g \
--executor-cores 5 \
--num-executors 150 \
--executor-memory 5g \
./SparkOnEs-1.0.jar

3.4 Check the Results

(1) Query the data in Elasticsearch:

# On a security-mode cluster: after kinit authentication, list the indices in Elasticsearch:
curl -XGET --tlsv1.2 --negotiate -k -u : 'https://10.10.10.11:24100/_cat/indices?v'

# On a normal-mode cluster: query over http (not https):
curl -XGET 'http://10.10.10.11:24100/_cat/indices?v'

# Run a range query against the people index:
curl -XPOST 'http://10.10.10.11:24100/people/_search?pretty' -H 'Content-Type:application/json' -d '
{
    "query": {
        "range": {
            "createdTime": {"gte": "2010-01-01T00:00:00Z", "lt": "2015-12-31T23:59:59Z"}
        }
    }
}'

Note: for more Elasticsearch query commands, see [Operation Guide] - [Elasticsearch] - [Using curl commands on Linux].

(2) Check the grouped files in HDFS:

The sample code saves the grouping result to HDFS. After installing the client and authenticating with kinit, inspect it with the following commands:

# List all files and their sizes:
[root@10.10.10.11 spark-on-es]# hdfs dfs -du -s -h /user/spark-on-es/result/*
0         0        /user/spark-on-es/result/_SUCCESS
709       2.1 K    /user/spark-on-es/result/part-00000
3.5 K     10.4 K   /user/spark-on-es/result/part-00001
2.1 K     6.4 K    /user/spark-on-es/result/part-00002
3.1 K     9.4 K    /user/spark-on-es/result/part-00003
......

# Count the number of records in the result set:
[root@10.10.10.11 spark-on-es]# hdfs dfs -cat /user/spark-on-es/result/* | wc -l
2000000

# View the content of one file:
[root@10.10.10.11 spark-on-es]# hdfs dfs -cat /user/spark-on-es/result/part-00000
(573267,99)
(1929095,98)

4 - Performance Optimization

The performance bottleneck of this sample is the step where Spark reads the full data set from Elasticsearch, which takes the longest. Optimization ideas:

1) Increase the number of index shards: when the elasticsearch-spark connector reads data from Elasticsearch, the default task parallelism equals the number of shards in the index, so more shards means higher parallelism;

The number of shards per index should not grow too large, however; in that case the es.input.max.docs.per.partition parameter can be used to plan how many partitions Spark uses to read the Elasticsearch data, which also raises parallelism (see the sample code above and the sketch at the end of this section).

2) Increase scroll.size: the elasticsearch-spark connector reads data with the Scroll API; the default batch size is 50 and can be raised to 10000. Keeping it at or below 50000 is recommended, otherwise OOM errors are likely;

3) Use Yarn resources sensibly: following the guidance in "Product Documentation - Operation Guide - Yarn - Performance Tuning - Node Configuration Tuning", size the Yarn resources appropriately; when running the job in cluster mode, adjust the spark-submit parameters accordingly:

--driver-memory 6g     ## Driver memory
--executor-cores 5     ## CPU cores available to each Executor
--num-executors 150    ## number of Executors; num-executors * executor-cores must not exceed Yarn's VCores
--executor-memory 5g   ## memory per Executor; num-executors * executor-memory must not exceed Yarn's maximum memory
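A rough sketch of the partition-planning idea from point 1): with es.input.max.docs.per.partition set, the connector splits large shards into several input partitions, and the resulting parallelism can be checked directly on the RDD. This reuses esServerHost, esIndex, LOG, and the imports of the earlier sample; the numbers are illustrative, not tuned values.

// Cap each input partition at 1,000,000 documents so that a few large shards
// still fan out into many Spark tasks.
SparkConf conf = new SparkConf()
    .setAppName("SparkOnEsPartitionCheck")
    .set("es.nodes", esServerHost)
    .set("es.input.max.docs.per.partition", "1000000");

JavaSparkContext jsc = new JavaSparkContext(conf);
JavaPairRDD<String, Map<String, Object>> rdd = JavaEsSpark.esRDD(jsc, esIndex);

// Roughly: partitions ≈ max(number of shards, total docs / max docs per partition).
LOG.info("***** Number of input partitions: {}", rdd.getNumPartitions());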

5 - References

Elasticsearch for Apache Hadoop 7.6 » Configuration

6 - Additional Code Reference

Logic for querying a large batch of data with the Scroll API:

/**
 * Query data from ES by ES rest client
 */
private static void queryDataByRestHighLevelClient(JavaSparkContext jsc) {
    LOG.info("=====> Query data from ES by Rest High Level Client beginning...");
    LOG.info("=====> Query string: {}", esQueryJsonString);

    long begin = System.currentTimeMillis();
    List<Map<String, Object>> resultMap = new ArrayList<>(1024 * 100);

    try {
        // query data by the scroll api to avoid OOM
        SearchRequest searchRequest = new SearchRequest(index);
        final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
        searchRequest.scroll(scroll);

        // set the batch size; note: if the size is too large, it may cause OOM
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(2000);
        searchSourceBuilder.query(QueryBuilders.rangeQuery(esQueryField).gte(esQueryRangeBegin).lt(esQueryRangeEnd));

        String[] includeFields = new String[] {"id", "name", "birthday"};
        String[] excludeFields = new String[] {"age"};
        searchSourceBuilder.fetchSource(includeFields, excludeFields);
        searchRequest.source(searchSourceBuilder);

        SearchResponse searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        String scrollId = searchResponse.getScrollId();
        SearchHit[] searchHits = searchResponse.getHits().getHits();

        while (searchHits != null && searchHits.length > 0) {
            for (SearchHit hit : searchHits) {
                Map<String, Object> source = hit.getSourceAsMap();
                resultMap.add(source);
            }

            // continue the scroll search
            SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
            scrollRequest.scroll(scroll);
            searchResponse = highLevelClient.scroll(scrollRequest, RequestOptions.DEFAULT);
            scrollId = searchResponse.getScrollId();
            searchHits = searchResponse.getHits().getHits();
        }
    } catch (Exception e) {
        LOG.error("***** Query data failed, exception occurred.", e);
    }

    JavaRDD<Map<String, Object>> rdd = jsc.parallelize(resultMap);

    long end = System.currentTimeMillis();
    long spentTime = end - begin;
    LOG.info("=====> Query data from ES by Rest High Level Client, rdd's count: {}, spent time: {} ms",
        rdd.count(), spentTime);
}

Logic for creating a large amount of test data with bulk requests:

/**
 * Put data by a bulk request
 *
 * @param restClient the Client of Elasticsearch
 */
private static void putDataByBulk(RestClient restClient) {
    LOG.info("***** Bulk put data beginning...");

    // total number of documents to index
    long totalRecordNum = 100000;
    // number of documents per bulk request
    long oneCommit = 500;
    long circleNumber = totalRecordNum / oneCommit;

    StringEntity entity;
    Gson gson = new Gson();
    Map<String, Object> esMap = new HashMap<>();
    String str = "{ \"index\" : { \"_index\" : \"" + index + "\", \"_type\" : \"" + type + "\"} }";

    for (int i = 0; i < circleNumber; i++) {
        StringBuilder builder = new StringBuilder();
        for (int j = 1; j <= oneCommit; j++) {
            esMap.clear();
            esMap.put("id", (i * oneCommit + j) + "");
            esMap.put("name", getName());
            esMap.put("age", ThreadLocalRandom.current().nextInt(1, 30));
            esMap.put("birthday", getBirthday());

            String strJson = gson.toJson(esMap);
            builder.append(str).append("\n");
            builder.append(strJson).append("\n");
        }

        entity = new StringEntity(builder.toString(), ContentType.APPLICATION_JSON);
        entity.setContentEncoding("UTF-8");

        Response response;
        try {
            Request request = new Request("PUT", "/_bulk");
            request.addParameter("pretty", "true");
            request.setEntity(entity);
            response = restClient.performRequest(request);
            if (HttpStatus.SC_OK == response.getStatusLine().getStatusCode()) {
                LOG.info("***** Already input documents: " + oneCommit * i);
            } else {
                LOG.error("***** Bulk failed.");
            }
            LOG.info("Bulk response entity is : " + EntityUtils.toString(response.getEntity()));
        } catch (Exception e) {
            LOG.error("Bulk failed, exception occurred.", e);
        }
    }
}

// more realistic test data
private static String[] firstName = {"Jenny", "James", "Linda", "Judy", "Karen", "Kelly",
    "Margaret", "Rose", "Nora", "Wendy"};
private static String[] lastName = {"Abel", "Abraham", "Kent", "Brown", "White", "Cotton",
    "Hawk", "George", "Henry", "David"};

private static String getName() {
    int index = ThreadLocalRandom.current().nextInt(0, 10);
    return firstName[index] + " " + lastName[index];
}

private static String getBirthday() {
    ThreadLocalRandom random = ThreadLocalRandom.current();
    int year = random.nextInt(1990, 2021);
    int month = random.nextInt(1, 13);
    int day = random.nextInt(1, 29);
    int hour = random.nextInt(0, 24);
    int minute = random.nextInt(0, 60);
    int second = random.nextInt(0, 60);

    LocalDateTime time = LocalDateTime.of(year, Month.of(month), day, hour, minute, second);
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    return formatter.format(time);
}
