您当前的位置: 首页 >  Java

Charge8

暂无认证

  • 3浏览

    0关注

    447博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Springboot整合ES8(Java API Client)

Charge8 发布时间:2022-09-04 11:08:41 ,浏览量:3

在 Elasticsearch7.15版本之后,Elasticsearch官方将它的高级客户端 RestHighLevelClient标记为弃用状态。同时推出了全新的 Java API客户端 Elasticsearch Java API Client,该客户端也将在 Elasticsearch8.0及以后版本中成为官方推荐使用的客户端。

Elasticsearch Java API Client 支持除 Vector tile search API 和 Find structure API 之外的所有 Elasticsearch API。且支持所有API数据类型,并且不再有原始JsonValue属性。它是针对Elasticsearch8.0及之后版本的客户端,所以我们需要学习新的Elasticsearch Java API Client的使用方法。

前面使用的是 ES 7.x的版本,这次使用 ES 8.1.3版本。使用Docker搭建环境还是蛮简单的。

在这里插入图片描述

一、Springboot整合ES8

引入依赖:

	
    
      co.elastic.clients
      elasticsearch-java
      8.1.3
    

    
      com.fasterxml.jackson.core
      jackson-databind
      2.13.3
    

注意:

  • ES8 API使用大量的Builder模式,所以我们在使用API时,也尽量使用它。
1、ES配置类

Java连接 Elasticsearch8的核心步骤:

  • 先创建 RestClient
  • 再基于 RestClient创建 ElasticsearchTransport
  • 最后基于 ElasticsearchTransport创建 ElasticsearchClient

不论是直连,还是带安全检查的连接,都是这个不变的核心步骤。根据业务可能会会加一点参数配置等。

// 配置的前缀
@ConfigurationProperties(prefix = "elasticsearch") 
@Configuration
public class ESClientConfig {

	/**
	 * 多个IP逗号隔开
	 */
	@Setter
	private String hosts;

	/**
	 * 同步方式
	 * 
	 * @return
	 */
	@Bean
	public ElasticsearchClient elasticsearchClient() {
		HttpHost[] httpHosts = toHttpHost();
		// Create the RestClient 
		RestClient restClient = RestClient.builder(httpHosts).build();
		// Create the transport with a Jackson mapper
		RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
		// create the API client
		return new ElasticsearchClient(transport);
	}

	/**
	 * 异步方式
	 * 
	 * @return
	 */
	@Bean
	public ElasticsearchAsyncClient elasticsearchAsyncClient() {
		HttpHost[] httpHosts = toHttpHost();
		RestClient restClient = RestClient.builder(httpHosts).build();
		RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
		return new ElasticsearchAsyncClient(transport);
	}

	/**
	 * 解析配置的字符串hosts,转为HttpHost对象数组
	 *
	 * @return
	 */
	private HttpHost[] toHttpHost() {
		if (!StringUtils.hasLength(hosts)) {
			throw new RuntimeException("invalid elasticsearch configuration. elasticsearch.hosts不能为空!");
		}

		// 多个IP逗号隔开
		String[] hostArray = hosts.split(",");
		HttpHost[] httpHosts = new HttpHost[hostArray.length];
		HttpHost httpHost;
		for (int i = 0; i  c.index("db_idx5"));
		System.out.println(products.acknowledged());

	}

	/**
	 * 判断索引是否存在
	 * 
	 * @throws IOException
	 */
	@Test
	public void createExi() throws IOException {
		BooleanResponse exists = client.indices().exists(e -> e.index("db_idx5"));
		System.out.println(exists.value());

	}

}

测试ok,到此Springboot整合ES8就ok了。

二、索引操作 1、业务类

1)接口

public interface IndexService {

	/**
	 * 新建索引,指定索引名称
	 * 
	 * @param name
	 * @throws IOException
	 */
	void createIndex(String name) throws IOException;

	/**
	 * 创建索引,指定索引名称和setting和mapping
	 *
	 * @param name
	 *            - 索引名称
	 * @param settingFn
	 *            - 索引参数
	 * @param mappingFn
	 *            - 索引结构
	 * @throws IOException
	 */
	void createIndex(String name, Function settingFn,
			Function mappingFn) throws IOException;

	/**
	 * 删除索引
	 *
	 * @param name
	 * @throws IOException
	 */
	void deleteIndex(String name) throws IOException;

	/**
	 * 修改索引字段信息 
	 * 字段可以新增,已有的字段只能修改字段的 search_analyzer 属性。
	 * 
	 * @param name
	 *            - 索引名称
	 * @param propertyMap
	 *            - 索引字段,每个字段都有自己的property
	 * @throws IOException
	 */
	void updateIndexProperty(String name, HashMap propertyMap) throws IOException;

	/**
	 * 查询索引列表
	 * 
	 * @return
	 * @throws IOException
	 */
	GetIndexResponse getIndexList() throws IOException;

	/**
	 * 查询索引详情
	 *
	 * @param name
	 * @return
	 * @throws IOException
	 */
	GetIndexResponse getIndexDetail(String name) throws IOException;

	/**
	 * 检查指定名称的索引是否存在
	 * 
	 * @param name
	 * @return - true:存在
	 * @throws IOException
	 */
	boolean indexExists(String name) throws IOException;

}

2)实现类

@Service
@Slf4j
public class IndexServiceImpl implements IndexService {

    @Autowired
    private ElasticsearchClient elasticsearchClient;

    @Override
    public void createIndex(String name) throws IOException {
        //ApplicationContext applicationContext;
        CreateIndexResponse response = elasticsearchClient.indices().create(c -> c.index(name));
        log.info("createIndex方法,acknowledged={}", response.acknowledged());
    }

    @Override
    public void createIndex(String name,
                       Function settingFn,
                       Function mappingFn) throws IOException {
        CreateIndexResponse response = elasticsearchClient
                .indices()
                .create(c -> c
                        .index(name)
                        .settings(settingFn)
                        .mappings(mappingFn)
                );
        log.info("createIndex方法,acknowledged={}", response.acknowledged());
    }

    @Override
    public void deleteIndex(String name) throws IOException {
        DeleteIndexResponse response = elasticsearchClient.indices().delete(c -> c.index(name));
        log.info("deleteIndex方法,acknowledged={}", response.acknowledged());
    }

    @Override
    public void updateIndexProperty(String name, HashMap propertyMap) throws IOException {
        PutMappingResponse response = elasticsearchClient.indices()
                .putMapping(typeMappingBuilder ->
                                typeMappingBuilder
                                .index(name)
                                .properties(propertyMap)
        );
        log.info("updateIndexMapping方法,acknowledged={}", response.acknowledged());
    }

    @Override
    public GetIndexResponse getIndexList() throws IOException {
        //使用 * 或者 _all都可以
        GetIndexResponse response = elasticsearchClient.indices().get(builder -> builder.index("_all"));
        log.info("getIndexList方法,response.result()={}", response.result().toString());
        return response;
    }

    @Override
    public GetIndexResponse getIndexDetail(String name) throws IOException {
        GetIndexResponse response = elasticsearchClient.indices().get(builder -> builder.index(name));
        log.info("getIndexDetail方法,response.result()={}", response.result().toString());
        return response;
    }

    @Override
    public boolean indexExists(String name) throws IOException {
        return elasticsearchClient.indices().exists(b -> b.index(name)).value();
    }
2、单元测试
    @Autowired
    private IndexService indexService;

    @Test
    public void testCreateIndex() throws Exception {
        String indexName = "db_api_idx1";
        indexService.createIndex(indexName);

        //Assertions.assertTrue(indexService.indexExists(indexName));
        //indexService.createIndex(indexName);
        //Assertions.assertFalse(indexService.indexExists(indexName));
    }

    @Test
    public void testCreateIndex2() throws Exception {
        // 索引名
        String indexName = "db_api_idx2";

        // 构建setting
        Function settingFn = sBuilder -> sBuilder
                .index(iBuilder -> iBuilder
                        // 三个分片
                        .numberOfShards("3")
                        // 一个副本
                        .numberOfReplicas("1")
                );



        // 索引字段,每个字段都有自己的property
        Property keywordProperty = Property.of(pBuilder -> pBuilder.keyword(keywordPropertyBuilder -> keywordPropertyBuilder.ignoreAbove(256)));
        Property integerProperty = Property.of(pBuilder -> pBuilder.integer(integerNumberPropertyBuilder -> integerNumberPropertyBuilder));
        Property textProperty = Property.of(pBuilder -> pBuilder.text(tBuilder -> tBuilder));

        // 构建mapping
        Function mappingFn = mBuilder -> mBuilder
                .properties("name", keywordProperty)
                .properties("age", integerProperty)
                .properties("description", textProperty);

        // 创建索引,并指定setting和mapping
        indexService.createIndex(indexName, settingFn, mappingFn);
    }

    @Test
    public void testIndexExists() throws Exception {
        String indexName = "db_api_idx1";
        System.out.println(indexService.indexExists(indexName));
    }

    @Test
    public void testUpdateIndexProperty() throws Exception {
        String indexName = "db_api_idx2";

        // 索引字段,每个字段都有自己的property
        Property keywordProperty = Property.of(pBuilder -> pBuilder.keyword(keywordPropertyBuilder -> keywordPropertyBuilder.ignoreAbove(1024)));
        Property integerProperty = Property.of(pBuilder -> pBuilder.integer(integerNumberPropertyBuilder -> integerNumberPropertyBuilder));
        Property textProperty = Property.of(pBuilder -> pBuilder.text(tBuilder -> tBuilder));

        HashMap propertyMap = new HashMap();
        propertyMap.put("name", keywordProperty);
        propertyMap.put("description", textProperty);
        propertyMap.put("address", textProperty);

        // 构建mapping
        indexService.updateIndexProperty(indexName, propertyMap);
    }

    @Test
    public void testGetIndexList() throws Exception {
        indexService.getIndexList();
    }

    @Test
    public void testGetIndexDetail() throws Exception {
        String indexName = "db_api_idx2";
        indexService.getIndexDetail(indexName);
    }

    @Test
    public void testDeleteIndex() throws Exception {
        String indexName = "db_api_idx1";
        indexService.deleteIndex(indexName);
    }
三、文档操作 1、业务类

注意:

  • 接口中的 Object类,可以定义成我们的业务实体类。
  • 如果写的通用点,通过泛型,我们可以自定义一个基类,所有业务实体类继承它。

1)接口

public interface DocumentDemoService {

    /**
     * 新增一个文档
     * @param idxName 索引名
     * @param idxId 索引id
     * @param document 文档对象
     * @return
     */
    IndexResponse createByFluentDSL(String idxName, String idxId, Object document) throws Exception;

    /**
     * 新增一个文档
     * @param idxName 索引名
     * @param idxId 索引id
     * @param document 文档对象
     * @return
     */
    IndexResponse createByBuilderPattern(String idxName, String idxId, Object document) throws Exception;

    /**
     * 用JSON字符串创建文档
     * @param idxName 索引名
     * @param idxId 索引id
     * @param jsonContent
     * @return
     */
    IndexResponse createByJson(String idxName, String idxId, String jsonContent) throws Exception;


    /**
     * 异步新增文档
     * @param idxName 索引名
     * @param idxId 索引id
     * @param document
     * @param action
     */
    void createAsync(String idxName, String idxId, Object document, BiConsumer action);

    /**
     * 批量增加文档
     * @param idxName 索引名
     * @param documents 要增加的对象集合
     * @return 批量操作的结果
     * @throws Exception
     */
    BulkResponse bulkCreate(String idxName, List documents) throws Exception;


    /**
     * 根据文档id查找文档
     * @param idxName 索引名
     * @param docId 文档id
     * @return Object类型的查找结果
     * @throws Exception
     */
    Object getById(String idxName, String docId) throws IOException;

    /**
     * 根据文档id查找文档,返回类型是ObjectNode
     * @param idxName 索引名
     * @param docId 文档id
     * @return ObjectNode类型的查找结果
     */
    ObjectNode getObjectNodeById(String idxName, String docId) throws IOException;

    /**
     * 根据文档id删除文档
     * @param idxName 索引名
     * @param docId 文档id
     * @return Object类型的查找结果
     * @throws Exception
     */
    Boolean deleteById(String idxName, String docId) throws IOException;

    /**
     * 批量删除文档
     * @param idxName 索引名
     * @param docIds 要删除的文档id集合
     * @return
     * @throws Exception
     */
    BulkResponse bulkDeleteByIds(String idxName, List docIds) throws Exception;

}

2)实现类

@Slf4j
@Service
public class DocumentDemoServiceImpl implements DocumentDemoService {

    @Autowired
    private ElasticsearchClient elasticsearchClient;

    @Autowired
    private ElasticsearchAsyncClient elasticsearchAsyncClient;

    @Override
    public IndexResponse createByFluentDSL(String idxName, String idxId, Object document) throws Exception {
        IndexResponse response = elasticsearchClient.index(idx -> idx
                .index(idxName)
                .id(idxId)
                .document(document));
        return response;
    }

    @Override
    public IndexResponse createByBuilderPattern(String idxName, String idxId, Object document) throws Exception {
        IndexRequest.Builder indexReqBuilder = new IndexRequest.Builder();

        indexReqBuilder.index(idxName);
        indexReqBuilder.id(idxId);
        indexReqBuilder.document(document);
        return elasticsearchClient.index(indexReqBuilder.build());
    }

    @Override
    public IndexResponse createByJson(String idxName, String idxId, String jsonContent) throws Exception {
        return elasticsearchClient.index(i -> i
                .index(idxName)
                .id(idxId)
                .withJson(new StringReader(jsonContent))
        );
    }

    @Override
    public void createAsync(String idxName, String idxId, Object document, BiConsumer action) {
        elasticsearchAsyncClient.index(idx -> idx
                .index(idxName)
                .id(idxId)
                .document(document)
        ).whenComplete(action);
    }

    @Override
    public BulkResponse bulkCreate(String idxName, List documents) throws Exception {
        BulkRequest.Builder br = new BulkRequest.Builder();

        // TODO 可以将 Object定义为一个文档基类。比如 ESDocument类

        // 将每一个product对象都放入builder中
        //documents.stream()
        //        .forEach(esDocument -> br
        //                .operations(op -> op
        //                        .index(idx -> idx
        //                                .index(idxName)
        //                                .id(esDocument.getId())
        //                                .document(esDocument))));

        return elasticsearchClient.bulk(br.build());
    }

    @Override
    public Object getById(String idxName, String docId) throws IOException {
        GetResponse response = elasticsearchClient.get(g -> g
                        .index(idxName)
                        .id(docId),
                Object.class);
        return response.found() ? response.source() : null;
    }

    @Override
    public ObjectNode getObjectNodeById(String idxName, String docId) throws IOException {
        GetResponse response = elasticsearchClient.get(g -> g
                        .index(idxName)
                        .id(docId),
                ObjectNode.class);

        return response.found() ? response.source() : null;
    }

    @Override
    public Boolean deleteById(String idxName, String docId) throws IOException {
        DeleteResponse delete = elasticsearchClient.delete(d -> d
                .index(idxName)
                .id(docId));
        return delete.forcedRefresh();
    }

    @Override
    public BulkResponse bulkDeleteByIds(String idxName, List docIds) throws Exception {
        BulkRequest.Builder br = new BulkRequest.Builder();

        // 将每一个对象都放入builder中
        docIds.stream().forEach(id -> br
                        .operations(op -> op
                                .delete(d -> d
                                        .index(idxName)
                                        .id(id))));

        return elasticsearchClient.bulk(br.build());
    }
}
2、单元测试
    private final static String INDEX_NAME = "db_api_idx_uservo";

    @Autowired
    private DocumentDemoService documentDemoService;


    @Test
    public void testCreateByFluentDSL() throws Exception {
        // 构建文档数据
        UserVO userVO = new UserVO();
        userVO.setId(1L);
        userVO.setUserName("赵云2");
        userVO.setAge(11);
        userVO.setCreateTime(new Date());
        userVO.setUpdateTime(new Date());
        userVO.setEmail("ss.com");
        userVO.setVersion(1);
        userVO.setHeight(12D);

        // 新增一个文档
        IndexResponse response = documentDemoService.createByFluentDSL(INDEX_NAME, userVO.getId().toString(), userVO);

        System.out.println("response.forcedRefresh() -> " + response.forcedRefresh());
        System.out.println("response.toString() -> " + response.toString());
    }

    @Test
    public void testCreateByBuilderPattern() throws Exception {
        // 构建文档数据
        UserVO userVO = new UserVO();
        userVO.setId(2L);
        userVO.setUserName("赵云2");
        userVO.setAge(12);
        userVO.setCreateTime(new Date());
        userVO.setUpdateTime(new Date());
        userVO.setEmail("ss.com");
        userVO.setVersion(1);
        userVO.setHeight(12D);

        // 新增一个文档
        IndexResponse response = documentDemoService.createByBuilderPattern(INDEX_NAME, userVO.getId().toString(), userVO);

        System.out.println("response.toString() -> " + response.toString());
    }

    @Test
    public void testCreateByJSON() throws Exception {
        // 构建文档数据
        UserVO userVO = new UserVO();
        userVO.setId(3L);
        userVO.setUserName("赵云3");
        userVO.setAge(13);
        userVO.setCreateTime(new Date());
        userVO.setUpdateTime(new Date());
        userVO.setEmail("ss.com");
        userVO.setVersion(1);
        userVO.setHeight(12D);

        // 新增一个文档
        IndexResponse response = documentDemoService.createByJson(INDEX_NAME, userVO.getId().toString(), JSON.toJSONString(userVO));

        System.out.println("response.toString() -> " + response.toString());
    }

    @Test
    public void testCreateAsync() throws Exception {
        // 构建文档数据
        UserVO userVO = new UserVO();
        userVO.setId(4L);
        userVO.setUserName("赵云4");
        userVO.setAge(14);
        userVO.setCreateTime(new Date());
        userVO.setUpdateTime(new Date());
        userVO.setEmail("ss.com");
        userVO.setVersion(1);
        userVO.setHeight(12D);

        documentDemoService.createAsync(INDEX_NAME, userVO.getId().toString(), userVO, new BiConsumer() {
            @Override
            public void accept(IndexResponse indexResponse, Throwable throwable) {
                // throwable必须为空
                Assertions.assertNull(throwable);
                // 验证结果
                System.out.println("response.toString() -> " + indexResponse.toString());
            }
        });
    }

    @Test
    public void testBulkCreate() throws Exception {
        int start = 5;
        int end = 10;

        // 构造文档集合
        List list = new ArrayList();
        for (int i = 5; i             
关注
打赏
1664721914
查看更多评论
0.0488s