您当前的位置: 首页 >  Java

Springboot整合ES8(Java API Client)

发布时间:2022-09-04 11:08:41 ,浏览量:7

在 Elasticsearch7.15版本之后,Elasticsearch官方将它的高级客户端 RestHighLevelClient标记为弃用状态。同时推出了全新的 Java API客户端Elasticsearch Java API Client,该客户端也将在 Elasticsearch8.0及以后版本中成为官方推荐使用的客户端。

Elasticsearch Java API Client 支持除 Vector tile search API 和 Find structure API 之外的所有 Elasticsearch API。且支持所有API数据类型,并且不再有原始JsonValue属性。它是针对Elasticsearch8.0及之后版本的客户端,所以我们需要学习新的Elasticsearch Java API Client的使用方法。

前面使用的是 ES 7.x的版本,这次使用 ES 8.1.3版本。使用Docker搭建环境还是蛮简单的。

在这里插入图片描述

一、Springboot整合ES8

引入依赖:

 <dependency> <groupId>co.elastic.clients /**
	 * 多个IP逗号隔开
	 */ @Setter private String hosts; /**
	 * 同步方式
	 * 
	 * @return
	 */ @Bean public ElasticsearchClient elasticsearchClient() { HttpHost[] httpHosts = toHttpHost(); // Create the RestClient  RestClient restClient = RestClient.builder(httpHosts).build(); // Create the transport with a Jackson mapper RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper()); // create the API client return new ElasticsearchClient(transport); } /**
	 * 异步方式
	 * 
	 * @return
	 */ @Bean public ElasticsearchAsyncClient elasticsearchAsyncClient() { HttpHost[] httpHosts = toHttpHost(); RestClient restClient = RestClient.builder(httpHosts).build(); RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper()); return new ElasticsearchAsyncClient(transport); } /**
	 * 解析配置的字符串hosts,转为HttpHost对象数组
	 *
	 * @return
	 */ private HttpHost[] toHttpHost() { if (!StringUtils.hasLength(hosts)) { throw new RuntimeException("invalid elasticsearch configuration. elasticsearch.hosts不能为空!"); } // 多个IP逗号隔开 String[] hostArray = hosts.split(","); HttpHost[] httpHosts = new HttpHost[hostArray.length]; HttpHost httpHost; for (int i = 0; i < hostArray.length; i++) { String[] strings = hostArray[i].split(":"); httpHost = new HttpHost(strings[0], Integer.parseInt(strings[1]), "http"); httpHosts[i] = httpHost; } return httpHosts; } } 
2、配置文件

在application.yml配置文件中添加 ES的服务地址等信息。

## ES配置:@ConfigurationProperties(prefix = "elasticsearch") //配置的前缀 elasticsearch: # 多个IP逗号隔开 hosts: 192.168.xxx.xxx:9200 
3、单元测试

启动类没什么变化,和以前一样。我们直接写一个测试类来操作ES。

@RunWith(SpringRunner.class) @SpringBootTest public class ESClientConfigTest { @Autowired private ElasticsearchClient client; /**
	 * 创建索引
	 * 
	 * @throws IOException
	 */ @Test public void createIndex() throws IOException { CreateIndexResponse products = client.indices().create(c -> c.index("db_idx5")); System.out.println(products.acknowledged()); } /**
	 * 判断索引是否存在
	 * 
	 * @throws IOException
	 */ @Test public void createExi() throws IOException { BooleanResponse exists = client.indices().exists(e -> e.index("db_idx5")); System.out.println(exists.value()); } } 

测试ok,到此Springboot整合ES8就ok了。

二、索引操作 1、业务类

1)接口

public interface IndexService { /**
	 * 新建索引,指定索引名称
	 * 
	 * @param name
	 * @throws IOException
	 */ void createIndex(String name) throws IOException; /**
	 * 创建索引,指定索引名称和setting和mapping
	 *
	 * @param name
	 *            - 索引名称
	 * @param settingFn
	 *            - 索引参数
	 * @param mappingFn
	 *            - 索引结构
	 * @throws IOException
	 */ void createIndex(String name, Function<IndexSettings.Builder, ObjectBuilder<IndexSettings>> settingFn, Function<TypeMapping.Builder, ObjectBuilder<TypeMapping>> mappingFn) throws IOException; /**
	 * 删除索引
	 *
	 * @param name
	 * @throws IOException
	 */ void deleteIndex(String name) throws IOException; /**
	 * 修改索引字段信息 
 * 字段可以新增,已有的字段只能修改字段的 search_analyzer 属性。
	 * 
	 * @param name
	 *            - 索引名称
	 * @param propertyMap
	 *            - 索引字段,每个字段都有自己的property
	 * @throws IOException
	 */ void updateIndexProperty(String name, HashMap<String, Property> propertyMap) throws IOException; /**
	 * 查询索引列表
	 * 
	 * @return
	 * @throws IOException
	 */ GetIndexResponse getIndexList() throws IOException; /**
	 * 查询索引详情
	 *
	 * @param name
	 * @return
	 * @throws IOException
	 */ GetIndexResponse getIndexDetail(String name) throws IOException; /**
	 * 检查指定名称的索引是否存在
	 * 
	 * @param name
	 * @return - true:存在
	 * @throws IOException
	 */ boolean indexExists(String name) throws IOException; } 

2)实现类

@Service @Slf4j public class IndexServiceImpl implements IndexService { @Autowired private ElasticsearchClient elasticsearchClient; @Override public void createIndex(String name) throws IOException { //ApplicationContext applicationContext; CreateIndexResponse response = elasticsearchClient.indices().create(c -> c.index(name)); log.info("createIndex方法,acknowledged={}", response.acknowledged()); } @Override public void createIndex(String name, Function<IndexSettings.Builder, ObjectBuilder<IndexSettings>> settingFn, Function<TypeMapping.Builder, ObjectBuilder<TypeMapping>> mappingFn) throws IOException { CreateIndexResponse response = elasticsearchClient .indices() .create(c -> c .index(name) .settings(settingFn) .mappings(mappingFn) ); log.info("createIndex方法,acknowledged={}", response.acknowledged()); } @Override public void deleteIndex(String name) throws IOException { DeleteIndexResponse response = elasticsearchClient.indices().delete(c -> c.index(name)); log.info("deleteIndex方法,acknowledged={}", response.acknowledged()); } @Override public void updateIndexProperty(String name, HashMap<String, Property> propertyMap) throws IOException { PutMappingResponse response = elasticsearchClient.indices() .putMapping(typeMappingBuilder -> typeMappingBuilder .index(name) .properties(propertyMap) ); log.info("updateIndexMapping方法,acknowledged={}", response.acknowledged()); } @Override public GetIndexResponse getIndexList() throws IOException { //使用 * 或者 _all都可以 GetIndexResponse response = elasticsearchClient.indices().get(builder -> builder.index("_all")); log.info("getIndexList方法,response.result()={}", response.result().toString()); return response; } @Override public GetIndexResponse getIndexDetail(String name) throws IOException { GetIndexResponse response = elasticsearchClient.indices().get(builder -> builder.index(name)); log.info("getIndexDetail方法,response.result()={}", response.result().toString()); return response; } @Override public boolean indexExists(String name) throws IOException { return elasticsearchClient.indices().exists(b -> b.index(name)).value(); } 
2、单元测试
@Autowired private IndexService indexService; @Test public void testCreateIndex() throws Exception { String indexName = "db_api_idx1"; indexService.createIndex(indexName); //Assertions.assertTrue(indexService.indexExists(indexName)); //indexService.createIndex(indexName); //Assertions.assertFalse(indexService.indexExists(indexName)); } @Test public void testCreateIndex2() throws Exception { // 索引名 String indexName = "db_api_idx2"; // 构建setting Function<IndexSettings.Builder, ObjectBuilder<IndexSettings>> settingFn = sBuilder -> sBuilder .index(iBuilder -> iBuilder // 三个分片 .numberOfShards("3") // 一个副本 .numberOfReplicas("1") ); // 索引字段,每个字段都有自己的property Property keywordProperty = Property.of(pBuilder -> pBuilder.keyword(keywordPropertyBuilder -> keywordPropertyBuilder.ignoreAbove(256))); Property integerProperty = Property.of(pBuilder -> pBuilder.integer(integerNumberPropertyBuilder -> integerNumberPropertyBuilder)); Property textProperty = Property.of(pBuilder -> pBuilder.text(tBuilder -> tBuilder)); // 构建mapping Function<TypeMapping.Builder, ObjectBuilder<TypeMapping>> mappingFn = mBuilder -> mBuilder .properties("name", keywordProperty) .properties("age", integerProperty) .properties("description", textProperty); // 创建索引,并指定setting和mapping indexService.createIndex(indexName, settingFn, mappingFn); } @Test public void testIndexExists() throws Exception { String indexName = "db_api_idx1"; System.out.println(indexService.indexExists(indexName)); } @Test public void testUpdateIndexProperty() throws Exception { String indexName = "db_api_idx2"; // 索引字段,每个字段都有自己的property Property keywordProperty = Property.of(pBuilder -> pBuilder.keyword(keywordPropertyBuilder -> keywordPropertyBuilder.ignoreAbove(1024))); Property integerProperty = Property.of(pBuilder -> pBuilder.integer(integerNumberPropertyBuilder -> integerNumberPropertyBuilder)); Property textProperty = Property.of(pBuilder -> pBuilder.text(tBuilder -> tBuilder)); HashMap<String, Property> propertyMap = new HashMap<>(); propertyMap.put("name", keywordProperty); propertyMap.put("description", textProperty); propertyMap.put("address", textProperty); // 构建mapping indexService.updateIndexProperty(indexName, propertyMap); } @Test public void testGetIndexList() throws Exception { indexService.getIndexList(); } @Test public void testGetIndexDetail() throws Exception { String indexName = "db_api_idx2"; indexService.getIndexDetail(indexName); } @Test public void testDeleteIndex() throws Exception { String indexName = "db_api_idx1"; indexService.deleteIndex(indexName); } 
三、文档操作 1、业务类

注意:

  • 接口中的 Object类,可以定义成我们的业务实体类。
  • 如果写的通用点,通过泛型,我们可以自定义一个基类,所有业务实体类继承它。

1)接口

public interface DocumentDemoService { /**
     * 新增一个文档
     * @param idxName 索引名
     * @param idxId 索引id
     * @param document 文档对象
     * @return
     */ IndexResponse createByFluentDSL(String idxName, String idxId, Object document) throws Exception; /**
     * 新增一个文档
     * @param idxName 索引名
     * @param idxId 索引id
     * @param document 文档对象
     * @return
     */ IndexResponse createByBuilderPattern(String idxName, String idxId, Object document) throws Exception; /**
     * 用JSON字符串创建文档
     * @param idxName 索引名
     * @param idxId 索引id
     * @param jsonContent
     * @return
     */ IndexResponse createByJson(String idxName, String idxId, String jsonContent) throws Exception; /**
     * 异步新增文档
     * @param idxName 索引名
     * @param idxId 索引id
     * @param document
     * @param action
     */ void createAsync(String idxName, String idxId, Object document, BiConsumer<IndexResponse, Throwable> action); /**
     * 批量增加文档
     * @param idxName 索引名
     * @param documents 要增加的对象集合
     * @return 批量操作的结果
     * @throws Exception
     */ BulkResponse bulkCreate(String idxName, List<Object> documents) throws Exception; /**
     * 根据文档id查找文档
     * @param idxName 索引名
     * @param docId 文档id
     * @return Object类型的查找结果
     * @throws Exception
     */ Object getById(String idxName, String docId) throws IOException; /**
     * 根据文档id查找文档,返回类型是ObjectNode
     * @param idxName 索引名
     * @param docId 文档id
     * @return ObjectNode类型的查找结果
     */ ObjectNode getObjectNodeById(String idxName, String docId) throws IOException; /**
     * 根据文档id删除文档
     * @param idxName 索引名
     * @param docId 文档id
     * @return Object类型的查找结果
     * @throws Exception
     */ Boolean deleteById(String idxName, String docId) throws IOException; /**
     * 批量删除文档
     * @param idxName 索引名
     * @param docIds 要删除的文档id集合
     * @return
     * @throws Exception
     */ BulkResponse bulkDeleteByIds(String idxName, List<String> docIds) throws Exception; } 

2)实现类

@Slf4j @Service public class DocumentDemoServiceImpl implements DocumentDemoService { @Autowired private ElasticsearchClient elasticsearchClient; @Autowired private ElasticsearchAsyncClient elasticsearchAsyncClient; @Override public IndexResponse createByFluentDSL(String idxName, String idxId, Object document) throws Exception { IndexResponse response = elasticsearchClient.index(idx -> idx .index(idxName) .id(idxId) .document(document)); return response; } @Override public IndexResponse createByBuilderPattern(String idxName, String idxId, Object document) throws Exception { IndexRequest.Builder<Object> indexReqBuilder = new IndexRequest.Builder<>(); indexReqBuilder.index(idxName); indexReqBuilder.id(idxId); indexReqBuilder.document(document); return elasticsearchClient.index(indexReqBuilder.build()); } @Override public IndexResponse createByJson(String idxName, String idxId, String jsonContent) throws Exception { return elasticsearchClient.index(i -> i .index(idxName) .id(idxId) .withJson(new StringReader(jsonContent)) ); } @Override public void createAsync(String idxName, String idxId, Object document, BiConsumer<IndexResponse, Throwable> action) { elasticsearchAsyncClient.index(idx -> idx .index(idxName) .id(idxId) .document(document) ).whenComplete(action); } @Override public BulkResponse bulkCreate(String idxName, List<Object> documents) throws Exception { BulkRequest.Builder br = new BulkRequest.Builder(); // TODO 可以将 Object定义为一个文档基类。比如 ESDocument类 // 将每一个product对象都放入builder中 //documents.stream() //        .forEach(esDocument -> br //                .operations(op -> op //                        .index(idx -> idx //                                .index(idxName) //                                .id(esDocument.getId()) //                                .document(esDocument)))); return elasticsearchClient.bulk(br.build()); } @Override public Object getById(String idxName, String docId) throws IOException { GetResponse<Object> response = elasticsearchClient.get(g -> g .index(idxName) .id(docId), Object.class); return response.found() ? response.source() : null; } @Override public ObjectNode getObjectNodeById(String idxName, String docId) throws IOException { GetResponse<ObjectNode> response = elasticsearchClient.get(g -> g .index(idxName) .id(docId), ObjectNode.class); return response.found() ? response.source() : null; } @Override public Boolean deleteById(String idxName, String docId) throws IOException { DeleteResponse delete = elasticsearchClient.delete(d -> d .index(idxName) .id(docId)); return delete.forcedRefresh(); } @Override public BulkResponse bulkDeleteByIds(String idxName, List<String> docIds) throws Exception { BulkRequest.Builder br = new BulkRequest.Builder(); // 将每一个对象都放入builder中 docIds.stream().forEach(id -> br .operations(op -> op .delete(d -> d .index(idxName) .id(id)))); return elasticsearchClient.bulk(br.build()); } } 
2、单元测试
private final static String INDEX_NAME = "db_api_idx_uservo"; @Autowired private DocumentDemoService documentDemoService; @Test public void testCreateByFluentDSL() throws Exception { // 构建文档数据 UserVO userVO = new UserVO(); userVO.setId(1L); userVO.setUserName("赵云2"); userVO.setAge(11); userVO.setCreateTime(new Date()); userVO.setUpdateTime(new Date()); userVO.setEmail("ss.com"); userVO.setVersion(1); userVO.setHeight(12D); // 新增一个文档 IndexResponse response = documentDemoService.createByFluentDSL(INDEX_NAME, userVO.getId().toString(), userVO); System.out.println("response.forcedRefresh() -> " + response.forcedRefresh()); System.out.println("response.toString() -> " + response.toString()); } @Test public void testCreateByBuilderPattern() throws Exception { // 构建文档数据 UserVO userVO = new UserVO(); userVO.setId(2L); userVO.setUserName("赵云2"); userVO.setAge(12); userVO.setCreateTime(new Date()); userVO.setUpdateTime(new Date()); userVO.setEmail("ss.com"); userVO.setVersion(1); userVO.setHeight(12D); // 新增一个文档 IndexResponse response = documentDemoService.createByBuilderPattern(INDEX_NAME, userVO.getId().toString(), userVO); System.out.println("response.toString() -> " + response.toString()); } @Test public void testCreateByJSON() throws Exception { // 构建文档数据 UserVO userVO = new UserVO(); userVO.setId(3L); userVO.setUserName("赵云3"); userVO.setAge(13); userVO.setCreateTime(new Date()); userVO.setUpdateTime(new Date()); userVO.setEmail("ss.com"); userVO.setVersion(1); userVO.setHeight(12D); // 新增一个文档 IndexResponse response = documentDemoService.createByJson(INDEX_NAME, userVO.getId().toString(), JSON.toJSONString(userVO)); System.out.println("response.toString() -> " + response.toString()); } @Test public void testCreateAsync() throws Exception { // 构建文档数据 UserVO userVO = new UserVO(); userVO.setId(4L); userVO.setUserName("赵云4"); userVO.setAge(14); userVO.setCreateTime(new Date()); userVO.setUpdateTime(new Date()); userVO.setEmail("ss.com"); userVO.setVersion(1); userVO.setHeight(12D); documentDemoService.createAsync(INDEX_NAME, userVO.getId().toString(), userVO, new BiConsumer<>() { @Override public void accept(IndexResponse indexResponse, Throwable throwable) { // throwable必须为空 Assertions.assertNull(throwable); // 验证结果 System.out.println("response.toString() -> " + indexResponse.toString()); } }); } @Test public void testBulkCreate() throws Exception { int start = 5; int end = 10; // 构造文档集合 List<Object> list = new ArrayList<>(); for (int i = 5; i <= 7; i++) { UserVO userVO = new UserVO(); userVO.setId(Long.valueOf(i)); userVO.setUserName("赵云batch" + i ); userVO.setHeight(1.88D); userVO.setAge(10 + i); userVO.setCreateTime(new Date()); list.add(userVO); } // 批量新增 BulkResponse response = documentDemoService.bulkCreate(INDEX_NAME, list); List<BulkResponseItem> items = response.items(); for (BulkResponseItem item : items) { System.out.println("BulkResponseItem.toString() -> " + item.toString()); } } @Test public void testGetById() throws Exception { Long id = 1L; Object object = documentDemoService.getById(INDEX_NAME, id.toString()); System.out.println("object ->" + object); // 无法直接强转,会报错 //UserVO userVO = (UserVO) object; //System.out.println("userVO ->" + object); } @Test public void testGetObjectNode() throws Exception { Long id = 1L; ObjectNode objectNode = documentDemoService.getObjectNodeById(INDEX_NAME, id.toString()); Assertions.assertNotNull(objectNode); System.out.println("id ->" + objectNode.get("id").asLong()); System.out.println("userName ->" + objectNode.get("userName").asText()); } 

– 求知若饥,虚心若愚。

关注
打赏
1688896170
查看更多评论

暂无认证

  • 7浏览

    0关注

    115984博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文
立即登录/注册

微信扫码登录

0.1094s