springboot 配置elasticsearch Published on Sep 15, 2023 in 随笔 with 0 comment ```java package com.jinw.cms.config; import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.json.jackson.JacksonJsonpMapper; import co.elastic.clients.transport.ElasticsearchTransport; import co.elastic.clients.transport.rest_client.RestClientTransport; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.http.Header; import org.apache.http.HttpHeaders; import org.apache.http.HttpHost; import org.apache.http.HttpResponseInterceptor; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.entity.ContentType; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.message.BasicHeader; import org.apache.http.ssl.SSLContextBuilder; import org.apache.http.ssl.SSLContexts; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; import org.springframework.util.StringUtils; import javax.net.ssl.SSLContext; import java.io.IOException; import java.io.InputStream; import java.security.KeyManagementException; import java.security.KeyStore; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.cert.Certificate; import java.security.cert.CertificateException; import java.security.cert.CertificateFactory; import java.util.Arrays; @ConfigurationProperties(prefix = "spring.elasticsearch") //配置的前缀 @Configuration @Slf4j public class ElasticsearchConfig { @Setter private String uris; @Setter private String username; @Setter private String password; /** * 解析配置的字符串,转为HttpHost对象数组 * * @return */ private HttpHost[] toHttpHost() { if (!StringUtils.hasLength(uris)) { throw new RuntimeException("invalid elasticsearch configuration"); } String[] hostArray = uris.split(","); HttpHost[] httpHosts = new HttpHost[hostArray.length]; HttpHost httpHost; for (int i = 0; i < hostArray.length; i++) { String[] strings = hostArray[i].split(":"); httpHost = new HttpHost(strings[0], Integer.parseInt(strings[1]), "http"); httpHosts[i] = httpHost; } return httpHosts; } @Bean("clientByPasswd") public ElasticsearchClient clientByPasswd() throws Exception { ElasticsearchTransport transport = getElasticsearchTransport(username, password, toHttpHost()); return new ElasticsearchClient(transport); } private static SSLContext buildSSLContext() { ClassPathResource resource = new ClassPathResource("es01.crt"); SSLContext sslContext = null; try { CertificateFactory factory = CertificateFactory.getInstance("X.509"); Certificate trustedCa; try (InputStream is = resource.getInputStream()) { trustedCa = factory.generateCertificate(is); } KeyStore trustStore = KeyStore.getInstance("pkcs12"); trustStore.load(null, null); trustStore.setCertificateEntry("ca", trustedCa); SSLContextBuilder sslContextBuilder = SSLContexts.custom() .loadTrustMaterial(trustStore, null); sslContext = sslContextBuilder.build(); } catch (CertificateException | IOException | KeyStoreException | NoSuchAlgorithmException | KeyManagementException e) { log.error("ES连接认证失败", e); } return sslContext; } private static ElasticsearchTransport getElasticsearchTransport(String username, String passwd, HttpHost... hosts) { // 账号密码的配置 final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, passwd)); // 自签证书的设置,并且还包含了账号密码 HttpClientConfigCallback callback = httpAsyncClientBuilder -> httpAsyncClientBuilder // .setSSLContext(buildSSLContext()) // 证书式认证方式 .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE) .setDefaultCredentialsProvider(credentialsProvider) // Todo 这里是关键 ######################解决 X-Elastic-Product 问题开始 .setDefaultHeaders( Arrays.asList( new BasicHeader( HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString()))) .addInterceptorLast( (HttpResponseInterceptor) (response, context) -> response.addHeader("X-Elastic-Product", "Elasticsearch")); // X-Elastic-Product end // 用builder创建RestClient对象 RestClient client = RestClient .builder(hosts) .setHttpClientConfigCallback(callback) .build(); return new RestClientTransport(client, new JacksonJsonpMapper()); } private static ElasticsearchTransport getElasticsearchTransport(String apiKey, HttpHost... hosts) { // 将ApiKey放入header中 Header[] headers = new Header[]{new BasicHeader("Authorization", "ApiKey " + apiKey)}; // es自签证书的设置 HttpClientConfigCallback callback = httpAsyncClientBuilder -> httpAsyncClientBuilder .setSSLContext(buildSSLContext()) .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE); // 用builder创建RestClient对象 RestClient client = RestClient .builder(hosts) .setHttpClientConfigCallback(callback) .setDefaultHeaders(headers) .build(); return new RestClientTransport(client, new JacksonJsonpMapper()); } } ``` ```yaml spring: elasticsearch: uris: 127.0.0.1:9200 username: elastic password: 123456 connection-timeout: 10000 socket-timeout: 30s ``` ```java package com.jinw.cms.service.impl; import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.elasticsearch._types.ElasticsearchException; import co.elastic.clients.elasticsearch._types.mapping.Property; import co.elastic.clients.elasticsearch.core.GetResponse; import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; import co.elastic.clients.elasticsearch.indices.CreateIndexResponse; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.jinw.cms.constants.*; import com.jinw.cms.content.IContent; import com.jinw.cms.content.type.IContentType; import com.jinw.cms.entity.CmsArticle; import com.jinw.cms.entity.CmsCategory; import com.jinw.cms.entity.CmsSite; import com.jinw.cms.entity.ESContent; import com.jinw.cms.mapper.CmsCategoryMapper; import com.jinw.cms.service.ICmsArticleService; import com.jinw.cms.service.ICmsSiteService; import com.jinw.utils.cms.*; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Service; import java.io.IOException; import java.time.ZoneOffset; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @Slf4j @RequiredArgsConstructor @Service public class ContentIndexService implements CommandLineRunner { private final ICmsSiteService siteService; private final CmsCategoryMapper cmsCategoryMapper; private final ICmsArticleService contentService; private final ElasticsearchClient esClient; private void createIndex() throws IOException { // 创建索引 Map properties = new HashMap<>(); properties.put("catalogAncestors", Property.of(fn -> fn.keyword(b -> b .ignoreAbove(500) // 指定字符串字段的最大长度。超过该长度的字符串将被截断或忽略。 ))); properties.put("contentType", Property.of(fn -> fn.keyword(b -> b .ignoreAbove(20) ))); properties.put("logo", Property.of(fn -> fn.keyword(b -> b .ignoreAbove(256) ))); properties.put("title", Property.of(fn -> fn.text(b -> b .store(true) // 是否存储在索引中 .analyzer(SearchConsts.IKAnalyzeType_Smart) ))); properties.put("fullText", Property.of(fn -> fn.text(b -> b .analyzer(SearchConsts.IKAnalyzeType_Smart) ))); CreateIndexResponse response = esClient.indices().create(fn -> fn .index(ESContent.INDEX_NAME) .mappings(mb -> mb.properties(properties))); Assert.isTrue(response.acknowledged(), () -> new RuntimeException("Create Index[cms_article] failed.")); } public void recreateIndex(CmsSite site) throws IOException { boolean exists = esClient.indices().exists(fn -> fn.index(ESContent.INDEX_NAME)).value(); if (!exists) { this.createIndex(); } else { // 删除站点索引文档数据 long total = this.contentService.getContentMapper().selectCount(Wrappers.lambdaQuery(CmsArticle.class).eq(CmsArticle::getSiteId, site.getId())); long pageSize = 1000; for (int i = 0; i * pageSize < total; i++) { List contentIds = this.contentService.getContentMapper().selectPage( new Page<>(i, pageSize, false), Wrappers.lambdaQuery(CmsArticle.class).eq(CmsArticle::getSiteId, site.getId()) ).getRecords().stream().map(CmsArticle::getId).collect(Collectors.toList()); deleteContentDoc(contentIds); } } } /** * 创建/更新内容索引Document */ public void createContentDoc(IContent> content) { // 判断栏目/站点配置是否生成索引 String enableIndex = EnableIndexProperty.getValue(content.getCatalog().getConfigProps(), content.getSite().getConfigProps()); if (YesOrNo.isNo(enableIndex)) { return; } try { esClient.update(fn -> fn .index(ESContent.INDEX_NAME) .id(content.getContentEntity().getId().toString()) .doc(newESContentDoc(content)) .docAsUpsert(true), ESContent.class); } catch (ElasticsearchException | IOException e) { // AsyncTaskManager.addErrMessage(e.getMessage()); e.printStackTrace(); } } private void batchContentDoc(CmsSite site, CmsCategory catalog, List contents) { if (contents.isEmpty()) { return; } List bulkOperationList = new ArrayList<>(contents.size()); for (CmsArticle xContent : contents) { // 判断栏目/站点配置是否生成索引 String enableIndex = EnableIndexProperty.getValue(catalog.getConfigProps(), site.getConfigProps()); if (YesOrNo.isYes(enableIndex)) { IContentType contentType = ContentCoreUtils.getContentType(xContent.getContentType()); IContent> icontent = contentType.loadContent(xContent); BulkOperation bulkOperation = BulkOperation.of(b -> b.update(up -> up.index(ESContent.INDEX_NAME) .id(xContent.getId().toString()) .action(action -> action.docAsUpsert(true).doc(newESContentDoc(icontent)))) // b.create(co -> co.index(ESContent.INDEX_NAME) // .id(xContent.getContentId().toString()).document(newESContent(icontent))) ); bulkOperationList.add(bulkOperation); } } if (bulkOperationList.isEmpty()) { return; } // 批量新增索引 try { esClient.bulk(bulk -> bulk.operations(bulkOperationList)); } catch (ElasticsearchException | IOException e) { // AsyncTaskManager.addErrMessage(e.getMessage()); e.printStackTrace(); } } /** * 删除内容索引 */ public void deleteContentDoc(List contentIds) throws ElasticsearchException, IOException { List bulkOperationList = contentIds.stream().map(contentId -> BulkOperation .of(b -> b.delete(dq -> dq.index(ESContent.INDEX_NAME).id(contentId.toString())))).collect(Collectors.toList()); this.esClient.bulk(bulk -> bulk.operations(bulkOperationList)); } public void rebuildCatalog(CmsCategory catalog, boolean includeChild) { CmsSite site = this.siteService.getSite(catalog.getSiteId()); String enableIndex = EnableIndexProperty.getValue(catalog.getConfigProps(), site.getConfigProps()); if (YesOrNo.isYes(enableIndex)) { LambdaQueryWrapper q = new LambdaQueryWrapper() .ne(CmsArticle::getCopyType, ContentCopyType.Mapping) .eq(CmsArticle::getStatus, ContentStatus.PUBLISHED) .eq(!includeChild, CmsArticle::getCategoryId, catalog.getId()) .likeRight(includeChild, CmsArticle::getCategoryAncestors, catalog.getAncestors()); long total = this.contentService.getContentMapper().selectCount(q); long pageSize = 200; for (int i = 0; i * pageSize < total; i++) { Page page = contentService.getContentMapper().selectPage(new Page<>(i, pageSize, false), q); batchContentDoc(site, catalog, page.getRecords()); } } } /** * 重建指定站点所有内容索引 */ public void rebuildAll(CmsSite site) throws IOException { // 先重建索引 recreateIndex(site); List catalogs = cmsCategoryMapper.selectList(Wrappers.lambdaQuery()); for (CmsCategory category : catalogs) { LambdaQueryWrapper q = new LambdaQueryWrapper() .eq(CmsArticle::getSiteId, site.getId()) .ne(CmsArticle::getCopyType, ContentCopyType.Mapping) .eq(CmsArticle::getStatus, ContentStatus.PUBLISHED) .eq(CmsArticle::getCategoryId, category.getId()); long total = contentService.getContentMapper().selectCount(q); int pageSize = 200; int count = 1; for (int i = 0; (long) i * pageSize < total; i++) { log.debug((int) (count++ * 100 / total) + "正在重建栏目【" + category.getName() + "】内容索引"); Page page = contentService.getContentMapper().selectPage(new Page<>(i, pageSize, false), q); batchContentDoc(site, category, page.getRecords()); // AsyncTaskManager.checkInterrupt(); // 允许中断 } } log.debug("100% 重建全站索引完成"); } /** * 获取指定内容索引详情 * * @param contentId 内容ID * @return 索引Document详情 */ public ESContent getContentDocDetail(String contentId) throws ElasticsearchException, IOException { GetResponse res = this.esClient.get(qb -> qb.index(ESContent.INDEX_NAME).id(contentId.toString()), ESContent.class); return res.source(); } private Map newESContentDoc(IContent> content) { Map data = new HashMap<>(); data.put("contentId", content.getContentEntity().getId()); data.put("contentType", content.getContentEntity().getContentType()); data.put("siteId", content.getSiteId()); data.put("catalogId", content.getCatalogId()); data.put("catalogAncestors", content.getContentEntity().getCategoryAncestors()); data.put("author", content.getContentEntity().getAuthor()); data.put("editor", content.getContentEntity().getEditor()); data.put("keywords", StringUtils.join(content.getContentEntity().getKeywords())); data.put("tags", StringUtils.join(content.getContentEntity().getTags())); data.put("createTime", content.getContentEntity().getCreateTime()); data.put("logo", content.getContentEntity().getLogo()); data.put("status", content.getContentEntity().getStatus()); data.put("publishDate", content.getContentEntity().getPublishDate().toEpochSecond(ZoneOffset.UTC)); data.put("link", InternalUrlUtils.getInternalUrl(InternalDataType_Content.ID, content.getContentEntity().getId())); data.put("title", content.getContentEntity().getTitle()); data.put("summary", content.getContentEntity().getSummary()); data.put("fullText", content.getFullText()); // 扩展模型数据 // this.extendModelService.getModelData(content.getContentEntity()).forEach(fd -> { // data.put(fd.getFieldName(), fd.getValue()); // }); return data; } public boolean isElasticSearchAvailable() { try { return esClient.ping().value(); } catch (Exception e) { e.printStackTrace(); return false; } } @Override public void run(String... args) throws Exception { if (isElasticSearchAvailable()) { boolean exists = esClient.indices().exists(fn -> fn.index(ESContent.INDEX_NAME)).value(); if (!exists) { this.createIndex(); // 创建内容索引库 } } else { log.warn("ES service not available!"); } } } ``` ```xml 8.6.2 ``` ```xml co.elastic.clients elasticsearch-java ${elasticsearch-java} jakarta.json-api jakarta.json elasticsearch-rest-client org.elasticsearch.client org.elasticsearch.client elasticsearch-rest-client ${elasticsearch-java} ``` 本文由 admin 创作,采用 知识共享署名4.0 国际许可协议进行许可。本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名。