springboot 2.6.2集成elasticsearch 7.16

      網友投稿 1137 2022-05-29

      前面說到elasticsearch 7.16集群安裝,本文介紹通過springboot 2.6.2集成es的java api對其進行操作。

      首先看一下pom文件

      pom.xml

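      <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-parent</artifactId>
          <version>2.6.2</version>
        </parent>
        <groupId>com.example</groupId>
        <artifactId>springboot-elasticsearch</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>springboot-elasticsearch</name>
        <description>Demo project for Spring Boot</description>
        <properties>
          <java.version>1.8</java.version>
          <hutool.version>5.7.8</hutool.version>
        </properties>
        <dependencies>
          <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
          </dependency>
          <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>RELEASE</version>
          </dependency>
          <dependency>
            <groupId>co.elastic.clients</groupId>
            <artifactId>elasticsearch-java</artifactId>
            <version>7.16.0</version>
          </dependency>
          <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.12.3</version>
          </dependency>
          <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.76</version>
          </dependency>
          <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>${hutool.version}</version>
          </dependency>
          <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
          </dependency>
          <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
          </dependency>
        </dependencies>
        <build>
          <plugins>
            <plugin>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
          </plugins>
        </build>
      </project>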

      Elasticsearch升級到7.16之後,已經廢棄了High Level REST Client,官方改為推薦使用構建在Low Level REST Client之上的全新Java API Client(elasticsearch-java),所以某些接口發生了變化,下面列出使用新Java API Client的一些基本操作:

      從application.properties文件讀取Elasticsearch配置信息

      springboot 2.6.2集成elasticsearch 7.16

      server.port=8899
      spring.application.name=qa-search
      elasticsearch.hosts=10.0.2.9:9200,10.0.2.78:9200,10.0.2.211:9200
      elasticsearch.username=elastic
      elasticsearch.password=elastic
      elasticsearch.connection.timeout=10000
      elasticsearch.socket.timeout=10000
      elasticsearch.connection.request.timeout=10000

      配置類

      ElasticSearchConfig.java

      package com.zh.ch.springboot.elasticsearch.config; import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.json.jackson.JacksonJsonpMapper; import co.elastic.clients.transport.ElasticsearchTransport; import co.elastic.clients.transport.rest_client.RestClientTransport; import com.zh.ch.springboot.elasticsearch.service.ElasticsearchServiceImpl; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.elasticsearch.client.Node; import org.elasticsearch.client.RestClient; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @Configuration public class ElasticSearchConfig { private final Log logger = LogFactory.getLog(ElasticsearchServiceImpl.class); @Value("${elasticsearch.hosts}") public String elasticsearchHost; @Value("${elasticsearch.username}") public String elasticsearchUsername; @Value("${elasticsearch.password}") public String elasticsearchPassword; @Value("${elasticsearch.connection.timeout}") public int elasticsearchConnectionTimeout; @Value("${elasticsearch.socket.timeout}") public int elasticsearchSocketTimeout; @Value("${elasticsearch.connection.request.timeout}") public int getElasticsearchConnectionRequestTimeout; @Bean public ElasticsearchClient elasticsearchClient() { RestClient restClient = RestClient.builder(getESHttpHosts()).setRequestConfigCallback(requestConfigBuilder -> { //設(shè)置連接超時時間 requestConfigBuilder.setConnectTimeout(elasticsearchConnectionTimeout); 
requestConfigBuilder.setSocketTimeout(elasticsearchSocketTimeout); requestConfigBuilder.setConnectionRequestTimeout(getElasticsearchConnectionRequestTimeout); return requestConfigBuilder; }).setFailureListener(new RestClient.FailureListener() { //某節(jié)點失敗,這里可以做一些告警 @Override public void onFailure(Node node) { logger.error(node); } }).setHttpClientConfigCallback(httpClientBuilder -> { httpClientBuilder.disableAuthCaching(); //設(shè)置賬密 return getHttpAsyncClientBuilder(httpClientBuilder); }).build(); ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper()); return new ElasticsearchClient(transport); } /** * ElasticSearch 連接地址 * 多個逗號分隔 * 示例:127.0.0.1:9201,127.0.0.1:9202,127.0.0.1:9203 */ private HttpHost[] getESHttpHosts() { String[] hosts = elasticsearchHost.split(","); HttpHost[] httpHosts = new HttpHost[hosts.length]; for (int i = 0; i < httpHosts.length; i++) { String host = hosts[i]; host = host.replaceAll("http://", "").replaceAll("https://", ""); Assert.isTrue(host.contains(":"), String.format("your host %s format error , Please refer to [ 127.0.0.1:9200 ] ", host)); httpHosts[i] = new HttpHost(host.split(":")[0], Integer.parseInt(host.split(":")[1]), "http"); } return httpHosts; } private HttpAsyncClientBuilder getHttpAsyncClientBuilder(HttpAsyncClientBuilder httpClientBuilder) { if (StringUtils.isEmpty(elasticsearchUsername) || StringUtils.isEmpty(elasticsearchPassword)) { return httpClientBuilder; } //賬密設(shè)置 CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); //es賬號密碼(一般使用,用戶elastic) credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(elasticsearchUsername, elasticsearchPassword)); httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); return httpClientBuilder; } }

      接口類

      ElasticsearchService.java

      package com.zh.ch.springboot.elasticsearch.service;

      import co.elastic.clients.elasticsearch._types.mapping.Property;
      import co.elastic.clients.elasticsearch.core.IndexResponse;
      import co.elastic.clients.elasticsearch.core.search.HitsMetadata;

      import java.util.List;
      import java.util.Map;

      /**
       * Basic index and document operations against Elasticsearch.
       */
      public interface ElasticsearchService {

          /**
           * Checks whether an index exists.
           *
           * @param index index name
           * @return {@code true} if the index exists
           */
          boolean existsIndex(String index);

          /**
           * Creates an index.
           *
           * @param index       index name
           * @param aliasename  alias to attach to the new index
           * @param numOfShards number of primary shards
           * @param properties  field mappings, keyed by field name
           * @return {@code true} if the creation was acknowledged
           */
          boolean createIndex(String index, String aliasename, int numOfShards, Map<String, Property> properties);

          /**
           * Deletes indices.
           *
           * @param indexList names of the indices to delete
           * @return {@code true} if the deletion was acknowledged
           */
          boolean deleteIndex(List<String> indexList);

          /**
           * Checks whether a document exists.
           *
           * @param index index name
           * @param id    document id
           * @param clazz document type
           * @param <T>   document type
           * @return {@code true} if the document was found
           */
          <T> boolean existsDocument(String index, String id, Class<T> clazz);

          /**
           * Saves a document under the given id; an existing document with that id is updated.
           *
           * @param index index name
           * @param id    document id
           * @param qa    document to store
           * @param <T>   document type
           * @return the index response
           */
          <T> IndexResponse saveOrUpdateDocument(String index, String id, T qa);

          /**
           * Saves a document without specifying an id.
           *
           * @param index index name
           * @param qa    document to store
           * @param <T>   document type
           * @return the index response
           */
          <T> IndexResponse saveOrUpdateDocument(String index, T qa);

          /**
           * Fetches a document by id.
           *
           * @param index index name
           * @param id    document id
           * @param clazz document type
           * @param <T>   document type
           * @return the document
           */
          <T> T getById(String index, String id, Class<T> clazz);

          /**
           * Fetches several documents by id.
           *
           * @param index  index name
           * @param idList document ids
           * @param clazz  document type
           * @param <T>    document type
           * @return the documents
           */
          <T> List<T> getByIdList(String index, List<String> idList, Class<T> clazz);

          /**
           * Runs a paged search over an index.
           *
           * @param index    index name
           * @param pageNo   page position; {@code 0} selects the first page
           * @param pageSize maximum number of hits to return
           * @param clazz    document type
           * @param <T>      document type
           * @return the hits metadata of the search response
           */
          <T> HitsMetadata<T> searchByPages(String index, Integer pageNo, Integer pageSize, Class<T> clazz);

          /**
           * Deletes a document by id.
           *
           * @param index index name
           * @param id    document id
           * @return {@code true} if the document was deleted
           */
          boolean deleteById(String index, String id);
      }

      實現類

      ElasticsearchServiceImpl.java

      package com.zh.ch.springboot.elasticsearch.service; import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.elasticsearch._types.mapping.Property; import co.elastic.clients.elasticsearch._types.mapping.TypeMapping; import co.elastic.clients.elasticsearch._types.query_dsl.FieldAndFormat; import co.elastic.clients.elasticsearch._types.query_dsl.Query; import co.elastic.clients.elasticsearch._types.query_dsl.QueryStringQuery; import co.elastic.clients.elasticsearch.core.*; import co.elastic.clients.elasticsearch.core.search.Highlight; import co.elastic.clients.elasticsearch.core.search.HighlightField; import co.elastic.clients.elasticsearch.core.search.HitsMetadata; import co.elastic.clients.elasticsearch.indices.*; import co.elastic.clients.elasticsearch.indices.ExistsRequest; import co.elastic.clients.transport.endpoints.BooleanResponse; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.*; @Component public class ElasticsearchServiceImpl implements ElasticsearchService { private final Log logger = LogFactory.getLog(ElasticsearchServiceImpl.class); private ElasticsearchClient client; @Autowired public void setClient(ElasticsearchClient client) { this.client = client; } public boolean existsIndex(String index) { try { ExistsRequest existsRequest = new ExistsRequest.Builder().index(index).build(); BooleanResponse response = client.indices().exists(existsRequest); return response.value(); } catch (IOException e) { logger.error("There is an error while getting index", e); } return false; } @Override public boolean createIndex(String indexName, String aliasesName, int numOfShards, Map properties) { try { TypeMapping typeMapping = new TypeMapping.Builder().properties(properties).build(); IndexSettings indexSettings = new 
IndexSettings.Builder().numberOfShards(String.valueOf(numOfShards)).build(); CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder() .index(indexName) .aliases(aliasesName, new Alias.Builder().isWriteIndex(true).build()) .mappings(typeMapping) .settings(indexSettings) .build(); CreateIndexResponse response = client.indices().create(createIndexRequest); return response.acknowledged(); } catch (IOException e) { logger.error("There is an error while creating index", e); } return false; } @Override public boolean deleteIndex(List indexList) { try { DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest.Builder().index(indexList).build(); DeleteIndexResponse response = client.indices().delete(deleteIndexRequest); return response.acknowledged(); } catch (IOException e) { logger.error("There is an error while deleting index", e); } return false; } @Override public boolean existsDocument(String index, String id, Class clazz) { try { GetRequest getRequest = new GetRequest.Builder().index(index).id(id).build(); GetResponse getResponse = client.get(getRequest, clazz); return getResponse.found(); } catch (IOException e) { logger.error("There is an error while judging if the document exists", e); } return false; } @Override public IndexResponse saveOrUpdateDocument(String index, String id, T t) { try { IndexRequest indexRequest = new IndexRequest.Builder().index(index).id(id).document(t).build(); return client.index(indexRequest); } catch (IOException e) { logger.error("There is an error while saving the document", e); } return null; } @Override public IndexResponse saveOrUpdateDocument(String index, T t) { try { IndexRequest indexRequest = new IndexRequest.Builder().index(index).document(t).build(); return client.index(indexRequest); } catch (IOException e) { logger.error("There is an error while saving the document", e); } return null; } @Override public T getById(String index, String id, Class clazz) { try { GetRequest getRequest = new 
GetRequest.Builder().index(index).id(id).build(); GetResponse getResponse = client.get(getRequest, clazz); return getResponse.source(); } catch (IOException e) { logger.error("There is an error while getting the document", e); } return null; } @Override public List getByIdList(String index, List idList, Class clazz) { try { List tList = new ArrayList<>(idList.size()); for (String id : idList) { tList.add(client.get(new GetRequest.Builder().index(index).id(id).build(), clazz).source()); } return tList; } catch (IOException e) { logger.error("There is an error while getting the document list", e); } return null; } @Override public HitsMetadata searchByPages(String index, Integer pageNo, Integer pageSize, Class clazz) { try { SearchRequest searchRequest = new SearchRequest.Builder().index(Collections.singletonList(index)).from(pageNo).size(pageSize).build(); SearchResponse searchResponse = client.search(searchRequest, clazz); return searchResponse.hits(); } catch (IOException e) { logger.error("There is an error while searching by pages", e); } return null; } public boolean deleteById(String index, String id) { try { DeleteRequest deleteRequest = new DeleteRequest.Builder().index(index).id(id).build(); DeleteResponse deleteResponse = client.delete(deleteRequest); return "deleted".equals(deleteResponse.result().jsonValue()); } catch (IOException e) { logger.error("There is an error while deleting id document", e); } return false; } }

      測試類

      package com.zh.ch.springboot.elasticsearch.service; import co.elastic.clients.elasticsearch._types.mapping.DateProperty; import co.elastic.clients.elasticsearch._types.mapping.Property; import co.elastic.clients.elasticsearch.core.IndexResponse; import co.elastic.clients.elasticsearch.core.search.Hit; import co.elastic.clients.elasticsearch.core.search.HitsMetadata; import com.alibaba.fastjson.JSON; import com.zh.ch.springboot.elasticsearch.SpringbootElasticsearchApplication; import com.zh.ch.springboot.elasticsearch.bean.QA; import org.junit.jupiter.api.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.text.SimpleDateFormat; import java.util.*; @RunWith(SpringRunner.class) @SpringBootTest(classes = SpringbootElasticsearchApplication.class) class ElasticsearchServiceImplTest { private ElasticsearchServiceImpl elasticsearchService; @Autowired public void setElasticsearchService(ElasticsearchServiceImpl elasticsearchService) { this.elasticsearchService = elasticsearchService; } @Test void existsIndex() { String index = "es_index_test_1"; boolean existsIndexFlag = elasticsearchService.existsIndex(index); System.out.printf("%s 是否存在 %b%n", index, existsIndexFlag); } @Test void createIndex() { String index = "es_index_test_1"; String indexAliasesName = "es_index_test_1_aliases"; Map map = new HashMap<>(); map.put("id", new Property(new DateProperty.Builder().index(true).store(true).build())); boolean createIndexFlag = elasticsearchService.createIndex(index, indexAliasesName, 12, map); System.out.printf("創(chuàng)建索引, index:%s , createIndexFlag:%b%n", index, createIndexFlag); } @Test void deleteIndex() { List indexList = new ArrayList<>(); indexList.add("es_index_test_1"); boolean deleteIndexFlag = elasticsearchService.deleteIndex(indexList); System.out.printf("刪除 %s 索引是否成功 %b", 
indexList, deleteIndexFlag); } @Test void existsDocument() { String index = "bigdata"; String id = "1"; boolean existsDocumentFlag = elasticsearchService.existsDocument(index, id, QA.class); System.out.printf("文檔 index為 %s, id為 %s 是否存在于es中: %b",index, id, existsDocumentFlag); } @Test void saveOrUpdateDocument() { QA qa = new QA(); qa.setType_name("flink"); qa.setTitle("# Checkpoint 做恢復(fù)的過程中出現(xiàn)Savepoint failed with error \"Checkpoint expired before completing\"的問題"); qa.setContent("該問題字面意思看是由于flink在做cp落地hdfs的時候,出現(xiàn)超時失敗的問題\n" + "\n" + "\t/** The default timeout of a checkpoint attempt: 10 minutes. */\n" + "\tpublic static final long DEFAULT_TIMEOUT = 10 * 60 * 1000;\n" + "可以看到是超時失敗的問題(默認超時10min失敗)。\n" + "\n" + "## 1.原因分析與排查:\n" + "第一種情況:自身設(shè)置了超時時間(自身做持久化的內(nèi)存也不大的情況)\n" + "\n" + "http://例如:僅僅間隔6sec就做持久化\n" + "env.getCheckpointConfig.setCheckpointTimeout( 6 * 1000) //6sec內(nèi)完成checkpoint\n" + "![](https://search.lrting.top/images/20190313190334179.png)" + "如上圖所示:查看Flink-web-ui的DashBoard中看到checkpoint欄目下的history中各個失敗的checkpoint快照,然后查看失敗時候,各個算子中使用時間,總有一些大部分完成的算子,但是另外一部分算子做checkpoint時候出現(xiàn)失敗的情況。此時要做的是查看這部分算子的計算處理速度慢的原因。\n" + "\n" + "參考這個:[](http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Savepoint-failed-with-error-quot-Checkpoint-expired-before-completing-quot-td24177.html)\n" + "![](https://search.lrting.top/images/2019031211302593.png" + "## 2.因此,解決辦法在于:\n" + "1.檢查是否數(shù)據(jù)傾斜;(比如:數(shù)據(jù)傾斜導(dǎo)致的個別算子計算能力差異巨大)\n" + "\n" + "2.開啟并發(fā)增長個別處理慢的算子的處理能力;\n" + "\n" + "3.檢查代碼中是否存在計算速度特別慢的操作(如讀寫磁盤、數(shù)據(jù)庫、網(wǎng)絡(luò)傳輸、創(chuàng)建大對象等耗時操作)\n" + "\n" + "部分檢查點成功問題(剛開始成功,過了幾個檢查點之后持久化失敗的問題,參考https://blog.csdn.net/fct2001140269/article/details/88715808)\n"); SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); qa.setDt(f.format(new Date())); qa.setUser_id(1); IndexResponse indexResponse = elasticsearchService.saveOrUpdateDocument("bigdata", qa); System.out.printf("插入書籍是否成功 %s", indexResponse.result()); } @Test 
void getById() { String index = "bigdata"; QA qaList = elasticsearchService.getById(index, "1", QA.class); System.out.println(JSON.toJSONString(qaList)); } @Test void getByIdList() { String index = "bigdata"; List idList = new ArrayList<>(); idList.add("1"); idList.add("2"); List qaList = elasticsearchService.getByIdList(index, idList, QA.class); for (QA qa : qaList) { System.out.println(JSON.toJSONString(qa)); } } @Test void searchByPages() { String index = "bigdata"; Integer pageNo = 0; Integer pageSize = 10; HitsMetadata qaList = elasticsearchService.searchByPages(index, pageNo, pageSize, QA.class); System.out.println(qaList.hits().size()); } @Test void searchByQuery() { String queryString = "大數(shù)據(jù)"; HitsMetadata qaList = elasticsearchService.searchByQuery(queryString, QA.class); for (Hit hit : qaList.hits()) { System.out.println(hit.highlight()); } } @Test void deleteById() { String index = "bigdata"; String id = "ee00B34BwyhfTnq-1xYe"; boolean deleteByIdFlag = elasticsearchService.deleteById(index, id); System.out.println(deleteByIdFlag); } }

      完整代碼示例(https://git.lrting.top/xiaozhch5/springboot-elasticsearch.git):

      API Elasticsearch Spring Boot

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:源碼分析 RocketMQ DLedger(多副本) 之日志追加流程
      下一篇:圍觀小白做實驗-HCIE云服務實驗第5章-云數據庫架構設計實驗
      相關文章
      亚洲日本中文字幕一区二区三区| 亚洲色少妇熟女11p| 亚洲性天天干天天摸| 亚洲乱码一二三四区乱码| 成人伊人亚洲人综合网站222| 91嫩草私人成人亚洲影院| 亚洲第一视频在线观看免费| 亚洲AV无码精品蜜桃| 亚洲av永久无码精品秋霞电影影院| 亚洲综合av一区二区三区| 亚洲一区二区三区首页| 亚洲av女电影网| 中文字幕人成人乱码亚洲电影| 亚洲日韩亚洲另类激情文学| 在线观看亚洲AV每日更新无码| 亚洲中文字幕AV每天更新| 亚洲依依成人亚洲社区| 亚洲成熟丰满熟妇高潮XXXXX| 亚洲激情黄色小说| 亚洲色精品88色婷婷七月丁香| 国产精品亚洲av色欲三区| 亚洲成年人电影在线观看| 亚洲成人在线免费观看| 亚洲mv国产精品mv日本mv| 亚洲中文字幕无码亚洲成A人片| 亚洲美国产亚洲AV| 一区国严二区亚洲三区| 亚洲综合精品网站| 好看的电影网站亚洲一区| 亚洲综合色视频在线观看| 亚洲色WWW成人永久网址| 久久久无码精品亚洲日韩蜜桃| 亚洲理论精品午夜电影| 亚洲日本乱码卡2卡3卡新区| 亚洲男人天堂影院| 亚洲视频一区在线| 国产精品亚洲自在线播放页码| 亚洲精品无播放器在线播放| 亚洲国产一区视频| 亚洲国产精品嫩草影院久久| 伊人久久亚洲综合|