This article shows how to use the Elasticsearch Java API Client. The environment and software versions are: CentOS 7.6.1810, Java 1.8.0_321 (used by the client), and Elasticsearch 8.2.2.

1、Features of the Java API Client

  • Strongly typed requests and responses for all Elasticsearch APIs.
  • Blocking and asynchronous versions of all APIs.
  • Use of fluent builders and functional patterns to allow writing concise yet readable code when creating complex nested structures.
  • Seamless integration of application classes by using an object mapper such as Jackson or any JSON-B implementation.
  • Delegates protocol handling to an http client such as the Java Low Level REST Client that takes care of all transport-level concerns: HTTP connection pooling, retries, node discovery, and so on.
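
To make the fluent-builder bullet above concrete, here is a minimal sketch of the pattern in plain Java. It is not the client's source: `ToySearchRequest` and its `Builder` are hypothetical names invented for illustration. The only point is to show how a method can accept a lambda that configures a builder, which is the shape of calls such as `client.search(b -> b.index(...), ...)` used later in this article.

```java
import java.util.function.Function;

// Toy model of the "builder lambda" style. All names are hypothetical;
// none of these classes come from the Elasticsearch client.
final class ToySearchRequest {
    private final String index;
    private final int size;

    private ToySearchRequest(Builder b) {
        this.index = b.index;
        this.size = b.size;
    }

    String index() { return index; }
    int size() { return size; }

    // Accepts a function that configures a fresh builder, then builds the immutable request.
    static ToySearchRequest of(Function<Builder, Builder> fn) {
        return fn.apply(new Builder()).build();
    }

    static final class Builder {
        private String index;
        private int size = 10; // default used when the caller does not set a size

        Builder index(String value) { this.index = value; return this; }
        Builder size(int value) { this.size = value; return this; }

        ToySearchRequest build() {
            if (index == null) {
                throw new IllegalStateException("index is required");
            }
            return new ToySearchRequest(this);
        }
    }

    public static void main(String[] args) {
        ToySearchRequest request = ToySearchRequest.of(b -> b.index("poet-index").size(5));
        System.out.println(request.index() + " " + request.size());
    }
}
```

The caller never instantiates a builder or calls `build()`; nesting such lambdas is what keeps deeply nested requests readable.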

2、Add the dependency

<dependency>
    <groupId>co.elastic.clients</groupId>
    <artifactId>elasticsearch-java</artifactId>
    <version>8.2.2</version>
</dependency>

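3、Usage

The Elasticsearch Java API Client assembles request data through API calls, so you do not have to write JSON strings by hand. For a detailed description of the request data, see the earlier article in this series: Elasticsearch 入门实战(3)–REST API 使用

3.1、Connecting and closing

The Java API Client is built on top of the Java Low Level REST Client, so a Low Level REST Client has to be created first.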

private ElasticsearchTransport transport;
private ElasticsearchClient client;

@Before
public void before() {
    RestClient restClient = RestClient.builder(
            new HttpHost("10.49.196.10", 9200),
            new HttpHost("10.49.196.11", 9200),
            new HttpHost("10.49.196.12", 9200)).build();
    ObjectMapper objectMapper = new ObjectMapper();
    transport = new RestClientTransport(restClient, new JacksonJsonpMapper(objectMapper));
    client = new ElasticsearchClient(transport);
}

@After
public void after() throws IOException {
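    // client.shutdown() only returns the client for the node shutdown API; it does not release connections.
    // Closing the transport also closes the underlying RestClient.
    transport.close();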
}

3.2、Indices

3.2.1、Create an index

@Test
public void createIndex() throws IOException {
    CreateIndexResponse response = client.indices().create(builder -> builder
            .settings(indexSettingsBuilder -> indexSettingsBuilder.numberOfReplicas("1").numberOfShards("2"))
            .mappings(typeMappingBuilder -> typeMappingBuilder
                    .properties("age", propertyBuilder -> propertyBuilder.integer(integerNumberPropertyBuilder -> integerNumberPropertyBuilder))
                    .properties("name", propertyBuilder -> propertyBuilder.keyword(keywordPropertyBuilder -> keywordPropertyBuilder))
                    .properties("poems", propertyBuilder -> propertyBuilder.text(textPropertyBuilder -> textPropertyBuilder.analyzer("ik_max_word").searchAnalyzer("ik_max_word")))
                    .properties("about", propertyBuilder -> propertyBuilder.text(textPropertyBuilder -> textPropertyBuilder.analyzer("ik_max_word").searchAnalyzer("ik_max_word")))
                    .properties("success", propertyBuilder -> propertyBuilder.text(textPropertyBuilder -> textPropertyBuilder.analyzer("ik_max_word").searchAnalyzer("ik_max_word")))
            )
            .index(INDEX_NAME));
    logger.info("acknowledged={}", response.acknowledged());
}

3.2.2、Modify the _mapping

New fields can be added. For an existing field, only a limited set of mapping parameters, such as search_analyzer, can be changed.

@Test
public void modifyIndex() throws IOException {
    PutMappingResponse response = client.indices().putMapping(typeMappingBuilder -> typeMappingBuilder
            .index(INDEX_NAME)
            .properties("age", propertyBuilder -> propertyBuilder.integer(integerNumberPropertyBuilder -> integerNumberPropertyBuilder))
            .properties("name", propertyBuilder -> propertyBuilder.keyword(keywordPropertyBuilder -> keywordPropertyBuilder))
            .properties("poems", propertyBuilder -> propertyBuilder.text(textPropertyBuilder -> textPropertyBuilder.analyzer("ik_max_word").searchAnalyzer("ik_smart")))
    );
    logger.info("acknowledged={}", response.acknowledged());
}

3.2.3、Delete an index

@Test
public void deleteIndex() throws IOException {
    DeleteIndexResponse response = client.indices().delete(builder -> builder.index(INDEX_NAME));
    logger.info("acknowledged={}", response.acknowledged());
}

3.2.4、List indices

@Test
public void getIndex() throws IOException {
    // Passing * instead of _all works as well
    GetIndexResponse response = client.indices().get(builder -> builder.index("_all"));
    logger.info(response.result().toString());
}

3.2.5、Get index details

@Test
public void getIndexDetail() throws IOException {
    GetIndexResponse response = client.indices().get(builder -> builder.index(INDEX_NAME));
    logger.info(response.result().toString());
}

3.3、Documents

3.3.1、Create a document

@Test
public void createDoc() throws IOException {
    Map<String, Object> doc = new HashMap<>();
    doc.put("age", 30);
    doc.put("name", "李白");
    doc.put("poems", "静夜思");
    doc.put("about", "字太白");
    doc.put("success", "创造了古代浪漫主义文学高峰、歌行体和七绝达到后人难及的高度");

    CreateResponse response = client.create(builder -> builder.index(INDEX_NAME).id("1").document(doc));
    logger.info(response.toString());

    Poet poet = new Poet(31, "杜甫", "登高", "字子美", "唐代伟大的现实主义文学作家,唐诗思想艺术的集大成者");
    response = client.create(builder -> builder.index(INDEX_NAME).id("2").document(poet));
    logger.info(response.toString());
}

3.3.2、Delete a document

@Test
public void deleteDoc() throws IOException {
    DeleteResponse response = client.delete(builder -> builder.index(INDEX_NAME).id("1"));
    logger.info(response.toString());
}

3.3.3、Update a document

Updates a document; only the fields that are set are modified.

@Test
public void updateDoc() throws IOException {
    Map<String, Object> doc = new HashMap<>();
    doc.put("age", 33);
    doc.put("name", "李白2");

    UpdateResponse response = client.update(builder -> builder.index(INDEX_NAME).id("1").doc(doc), Map.class);
    logger.info(response.toString());

    Poet poet = new Poet();
    poet.setAge(40);
    poet.setName("杜甫2");
    response = client.update(builder -> builder.index(INDEX_NAME).id("2").doc(poet).docAsUpsert(true), Poet.class);
    logger.info(response.toString());
}

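3.3.4、Create or replace a document

Creates the document if it does not exist and replaces it otherwise. On replacement all fields are overwritten (equivalent to deleting the document and then creating it again).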

@Test
public void createOrUpdateDoc() throws IOException {
    Map<String, Object> doc = new HashMap<>();
    doc.put("age", 33);
    doc.put("name", "李白2");

    // Replaces the whole document: fields that are not set here are removed
    IndexResponse response = client.index(builder -> builder.index(INDEX_NAME).id("1").document(doc));
    logger.info(response.toString());

    Poet poet = new Poet();
    poet.setAge(40);
    poet.setName("杜甫2");
    response = client.index(builder -> builder.index(INDEX_NAME).id("2").document(poet));
    logger.info(response.toString());
}
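
The difference between update (3.3.3) and index (3.3.4) can be modelled with plain maps. The sketch below does not call Elasticsearch; `WriteSemanticsSketch` and its methods are hypothetical names, and the shallow `putAll` merge only mirrors the behaviour for flat documents like the ones in this article.

```java
import java.util.HashMap;
import java.util.Map;

// Models the two write semantics with plain maps; this is not client code.
final class WriteSemanticsSketch {

    // Partial update: only the supplied fields change, the others are kept.
    static Map<String, Object> partialUpdate(Map<String, Object> stored, Map<String, Object> partial) {
        Map<String, Object> result = new HashMap<>(stored);
        result.putAll(partial);
        return result;
    }

    // Index (create or replace): the supplied document becomes the whole document,
    // so the previously stored fields are ignored.
    static Map<String, Object> fullReplace(Map<String, Object> stored, Map<String, Object> document) {
        return new HashMap<>(document);
    }

    public static void main(String[] args) {
        Map<String, Object> stored = new HashMap<>();
        stored.put("age", 30);
        stored.put("name", "李白");
        stored.put("poems", "静夜思");

        Map<String, Object> change = new HashMap<>();
        change.put("age", 33);
        change.put("name", "李白2");

        System.out.println(partialUpdate(stored, change).containsKey("poems")); // poems is kept
        System.out.println(fullReplace(stored, change).containsKey("poems"));   // poems is gone
    }
}
```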

3.3.5、Bulk operations

@Test
public void bulk() throws IOException {
    List<BulkOperation> list = new ArrayList<>();

    // Bulk create
    for (int i = 0; i < 5; i++) {
        Map<String, Object> doc = new HashMap<>();
        doc.put("age", 30);
        doc.put("name", "李白" + i);
        doc.put("poems", "静夜思");
        doc.put("about", "字太白");
        doc.put("success", "创造了古代浪漫主义文学高峰、歌行体和七绝达到后人难及的高度");
        String id = 10 + i + "";
        list.add(new BulkOperation.Builder().create(builder -> builder.index(INDEX_NAME).id(id).document(doc)).build());
    }
    for (int i = 0; i < 5; i++) {
        Poet poet = new Poet(31, "杜甫" + i, "登高", "字子美", "唐代伟大的现实主义文学作家,唐诗思想艺术的集大成者");
        String id = 20 + i + "";
        list.add(new BulkOperation.Builder().create(builder -> builder.index(INDEX_NAME).id(id).document(poet)).build());
    }

    // Bulk delete
    list.add(new BulkOperation.Builder().delete(builder -> builder.index(INDEX_NAME).id("1")).build());
    list.add(new BulkOperation.Builder().delete(builder -> builder.index(INDEX_NAME).id("2")).build());

    BulkResponse response = client.bulk(builder -> builder.index(INDEX_NAME).operations(list));
    logger.info(response.toString());
}

3.4、Queries

3.4.1、Query all documents

@Test
public void getDocAll() throws IOException {
    SearchResponse<Map> response = client.search(builder -> builder.index(INDEX_NAME), Map.class);
    logger.info(response.toString());
}

3.4.2、Get a single document

@Test
public void getDoc() throws IOException {
    GetResponse<Map> response = client.get(builder -> builder.index(INDEX_NAME).id("1"), Map.class);
    if (response.found()) {
        logger.info(response.source().toString());
    }

    GetResponse<Poet> response2 = client.get(builder -> builder.index(INDEX_NAME).id("2"), Poet.class);
    if (response2.found()) {
        logger.info(response2.source().toString());
    }
}

3.4.3、term/terms queries

term/terms queries do not analyze (tokenize) the input.

@Test
public void searchTerm() throws IOException {
    SearchResponse<Map> response = client.search(searchRequestBuilder -> searchRequestBuilder
            .index(INDEX_NAME)
            .query(queryBuilder -> queryBuilder
                    .term(termQueryBuilder -> termQueryBuilder
                            .field("name").value("李白")))
            .sort(sortOptionsBuilder -> sortOptionsBuilder
                    .field(fieldSortBuilder -> fieldSortBuilder
                            .field("name").order(SortOrder.Asc)))
            .source(sourceConfigBuilder -> sourceConfigBuilder
                    .filter(sourceFilterBuilder -> sourceFilterBuilder
                            .includes("age", "name")))
            .from(0)
            .size(10)
            , Map.class);
    logger.info(response.toString());

    List<FieldValue> words = new ArrayList<>();
    words.add(new FieldValue.Builder().stringValue("李白").build());
    words.add(new FieldValue.Builder().stringValue("杜甫").build());
    SearchResponse<Poet> response2 = client.search(searchRequestBuilder -> searchRequestBuilder
                    .index(INDEX_NAME)
                    .query(queryBuilder -> queryBuilder
                            .terms(termsQueryBuilder -> termsQueryBuilder
                                    .field("name").terms(termsQueryFieldBuilder -> termsQueryFieldBuilder.value(words))))
                    .source(sourceConfigBuilder -> sourceConfigBuilder
                            .filter(sourceFilterBuilder -> sourceFilterBuilder
                                    .excludes("about")))
                    .from(0)
                    .size(10)
            , Poet.class);
    logger.info(response2.toString());
}

3.4.4、range query

@Test
public void searchRange() throws IOException {
    SearchResponse<Poet> response = client.search(searchRequestBuilder -> searchRequestBuilder
                    .index(INDEX_NAME)
                    .query(queryBuilder -> queryBuilder
                            .range(rangeQueryBuilder -> rangeQueryBuilder
                                    .field("age").gte(JsonData.of("20")).lt(JsonData.of("40"))))
            , Poet.class);
    logger.info(response.toString());
}
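
The bounds in the range query above are easy to misread: `gte` is inclusive and `lt` is exclusive, so the interval is [20, 40). As a plain-Java illustration (the class and method names are hypothetical and unrelated to the client), the equivalent predicate is:

```java
// Plain-Java counterpart of range(field = age, gte = 20, lt = 40).
final class RangeBoundsSketch {

    // gte(20) includes 20, lt(40) excludes 40: the interval is [20, 40).
    static boolean matches(int age) {
        return age >= 20 && age < 40;
    }

    public static void main(String[] args) {
        System.out.println(matches(20)); // lower bound is included
        System.out.println(matches(40)); // upper bound is excluded
    }
}
```

Use `lte` instead of `lt` if documents with age 40 should match as well.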

3.4.5、Full-text queries

3.4.5.1、match query

A match query analyzes the input first and then searches for the resulting tokens.

@Test
public void searchMatch() throws IOException {
    SearchResponse<Map> response = client.search(searchRequestBuilder -> searchRequestBuilder
                    .index(INDEX_NAME)
                    .query(queryBuilder -> queryBuilder
                            .match(matchQueryBuilder -> matchQueryBuilder
                                    .field("success").query("思想")))
            , Map.class);
    logger.info(response.toString());
}
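
Why term and match behave differently is easier to see on a toy inverted index. The sketch below is an assumption-laden model, not how Elasticsearch is implemented: `TermVsMatchSketch` is a hypothetical class, and its analyzer simply lowercases and splits on whitespace, whereas the index in this article uses ik_max_word.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Toy inverted index contrasting term (no analysis) with match (analysis first).
final class TermVsMatchSketch {
    // token -> ids of the documents that contain the token
    private final Map<String, Set<Integer>> postings = new HashMap<>();

    // Stand-in analyzer (assumption): lowercase, then split on whitespace.
    static List<String> analyze(String text) {
        String normalized = text.trim().toLowerCase(Locale.ROOT);
        if (normalized.isEmpty()) {
            return new ArrayList<>();
        }
        return Arrays.asList(normalized.split("\\s+"));
    }

    void add(int docId, String text) {
        for (String token : analyze(text)) {
            postings.computeIfAbsent(token, k -> new TreeSet<>()).add(docId);
        }
    }

    // term: the input is looked up as a single token, exactly as given.
    Set<Integer> term(String value) {
        return new TreeSet<>(postings.getOrDefault(value, Collections.<Integer>emptySet()));
    }

    // match: the input is analyzed first; a document matches if it contains any token (OR).
    Set<Integer> match(String query) {
        Set<Integer> hits = new TreeSet<>();
        for (String token : analyze(query)) {
            hits.addAll(postings.getOrDefault(token, Collections.<Integer>emptySet()));
        }
        return hits;
    }

    public static void main(String[] args) {
        TermVsMatchSketch index = new TermVsMatchSketch();
        index.add(1, "Quiet Night Thought");
        index.add(2, "Night Rain");
        System.out.println(index.term("Quiet Night"));  // no token equals the whole input
        System.out.println(index.match("Quiet Night")); // either token is enough
    }
}
```

This is also why term queries are normally used on keyword fields such as name, where the stored token is the whole value.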

3.4.5.2、multi_match query

Matches the query against several fields.

@Test
public void searchMultiMatch() throws IOException {
    SearchResponse<Poet> response = client.search(searchRequestBuilder -> searchRequestBuilder
                    .index(INDEX_NAME)
                    .query(queryBuilder -> queryBuilder
                            .multiMatch(multiMatchQueryBuilder -> multiMatchQueryBuilder
                                    .fields("about", "success").query("思想")))
            , Poet.class);
    logger.info(response.toString());
}

3.4.5.3、match_phrase query

Matches the query string as a whole phrase: the analyzed terms must appear together and in the same order.

@Test
public void searchMatchPhrase() throws IOException {
    SearchResponse<Poet> response = client.search(searchRequestBuilder -> searchRequestBuilder
                    .index(INDEX_NAME)
                    .query(queryBuilder -> queryBuilder
                            .matchPhrase(matchPhraseQueryBuilder -> matchPhraseQueryBuilder.field("success").query("文学作家")))
            , Poet.class);
    logger.info(response.toString());
}
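
As a rough illustration of phrase matching, the following sketch checks whether the query tokens occur consecutively and in order. It is a simplified model under stated assumptions: `PhraseMatchSketch` is a hypothetical class, the analyzer is a whitespace splitter rather than ik_max_word, and only the default case without slop is covered.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

// Toy comparison of match and match_phrase on token lists.
final class PhraseMatchSketch {

    // Stand-in analyzer (assumption): lowercase, then split on whitespace.
    static List<String> analyze(String text) {
        return Arrays.asList(text.trim().toLowerCase(Locale.ROOT).split("\\s+"));
    }

    // match: at least one query token occurs somewhere in the document.
    static boolean match(String doc, String query) {
        return !Collections.disjoint(analyze(doc), analyze(query));
    }

    // match_phrase without slop: the query tokens occur consecutively and in order.
    static boolean matchPhrase(String doc, String query) {
        return Collections.indexOfSubList(analyze(doc), analyze(query)) >= 0;
    }

    public static void main(String[] args) {
        String doc = "great realist writer of the Tang dynasty";
        System.out.println(matchPhrase(doc, "realist writer")); // adjacent and in order
        System.out.println(matchPhrase(doc, "writer realist")); // wrong order
        System.out.println(match(doc, "writer realist"));       // order does not matter for match
    }
}
```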

3.4.5.4、match_all query

Matches all documents.

@Test
public void searchMatchAll() throws IOException {
    SearchResponse<Poet> response = client.search(searchRequestBuilder -> searchRequestBuilder
                    .index(INDEX_NAME)
                    .query(queryBuilder -> queryBuilder
                            .matchAll(matchAllQueryBuilder -> matchAllQueryBuilder))
            , Poet.class);
    logger.info(response.toString());
}

3.4.5.5、query_string query

query_string can express several of the preceding queries through a single query syntax.

@Test
public void searchQueryString() throws IOException {
    // Similar to match
    SearchResponse<Poet> response = client.search(searchRequestBuilder -> searchRequestBuilder
                    .index(INDEX_NAME)
                    .query(queryBuilder -> queryBuilder
                            .queryString(queryStringQueryBuilder -> queryStringQueryBuilder
                                .defaultField("success").query("古典文学")))
            , Poet.class);
    logger.info(response.toString());

    // Similar to multi_match
    response = client.search(searchRequestBuilder -> searchRequestBuilder
                    .index(INDEX_NAME)
                    .query(queryBuilder -> queryBuilder
                            .queryString(queryStringQueryBuilder -> queryStringQueryBuilder
                                    .fields("about", "success").query("古典文学")))
            , Poet.class);
    logger.info(response.toString());

    // Similar to match_phrase
    response = client.search(searchRequestBuilder -> searchRequestBuilder
                    .index(INDEX_NAME)
                    .query(queryBuilder -> queryBuilder
                            .queryString(queryStringQueryBuilder -> queryStringQueryBuilder
                                    .defaultField("success").query("\"文学作家\"")))
            , Poet.class);
    logger.info(response.toString());

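    // Query with operators: the string is split around the operators and each part is analyzed on its own
    // Find documents that contain both "文学" and "伟大"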
    response = client.search(searchRequestBuilder -> searchRequestBuilder
                    .index(INDEX_NAME)
                    .query(queryBuilder -> queryBuilder
                            .queryString(queryStringQueryBuilder -> queryStringQueryBuilder
                                    .fields("success").query("文学 AND 伟大")))
            , Poet.class);
    logger.info(response.toString());

    // Equivalent to the previous query
    response = client.search(searchRequestBuilder -> searchRequestBuilder
                    .index(INDEX_NAME)
                    .query(queryBuilder -> queryBuilder
                            .queryString(queryStringQueryBuilder -> queryStringQueryBuilder
                                    .fields("success").query("文学 伟大").defaultOperator(Operator.And)))
            , Poet.class);
    logger.info(response.toString());

    // Find documents whose name or success field contains both "文学" and "伟大", or contains "高度"
    response = client.search(searchRequestBuilder -> searchRequestBuilder
                    .index(INDEX_NAME)
                    .query(queryBuilder -> queryBuilder
                            .queryString(queryStringQueryBuilder -> queryStringQueryBuilder
                                    .fields("name","success").query("(文学 AND 伟大) OR 高度")))
            , Poet.class);
    logger.info(response.toString());
}
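
3.4.5.6、simple_query_string query

Similar to query_string; the main differences are:

1、AND, OR, and NOT are not supported as operators and are treated as ordinary text; use + instead of AND, | instead of OR, and - instead of NOT
2、Invalid syntax is ignored instead of causing an error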

@Test
public void searchSimpleQueryString() throws IOException {
    // Find documents that contain both "文学" and "伟大"
    SearchResponse<Poet> response = client.search(searchRequestBuilder -> searchRequestBuilder
                    .index(INDEX_NAME)
                    .query(queryBuilder -> queryBuilder
                            .simpleQueryString(simpleQueryStringQueryBuilder -> simpleQueryStringQueryBuilder
                                    .fields("success").query("文学 + 伟大")))
            , Poet.class);
    logger.info(response.toString());
}

3.4.6、Fuzzy queries

@Test
public void searchFuzzy() throws IOException {
    // Fuzziness on a full-text query: the input is analyzed first, then fuzziness is applied to each term.
    SearchResponse<Poet> response = client.search(searchRequestBuilder -> searchRequestBuilder
                    .index(INDEX_NAME)
                    .query(queryBuilder -> queryBuilder
                            .match(matchQueryBuilder -> matchQueryBuilder
                                    .field("success").query("思考").fuzziness("1")))
            , Poet.class);
    logger.info(response.toString());

    // fuzzy query: the input is not analyzed; fuzziness is applied to it directly.
    SearchResponse<Poet> response2 = client.search(searchRequestBuilder -> searchRequestBuilder
                    .index(INDEX_NAME)
                    .query(queryBuilder -> queryBuilder
                            .fuzzy(fuzzyQueryBuilder ->  fuzzyQueryBuilder
                                    .field("success").fuzziness("1").value("理想")))
            , Poet.class);
    logger.info(response2.toString());
}
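
The value passed to `fuzziness` is a maximum edit distance. The sketch below computes the classic Levenshtein distance in plain Java to show why "思考" and "理想" are both within distance 1 of the indexed term "思想". `EditDistanceSketch` is a hypothetical helper, not part of the client. Elasticsearch by default also counts swapping two adjacent characters as a single edit, which this plain version does not.

```java
// Classic Levenshtein distance, used here only to illustrate what fuzziness("1") allows.
final class EditDistanceSketch {

    // Minimum number of single-character insertions, deletions,
    // or substitutions needed to turn a into b.
    static int levenshtein(String a, String b) {
        int[] prev = new int[b.length() + 1];
        int[] curr = new int[b.length() + 1];
        for (int j = 0; j <= b.length(); j++) {
            prev[j] = j; // turning the empty prefix of a into b[0..j) takes j insertions
        }
        for (int i = 1; i <= a.length(); i++) {
            curr[0] = i; // turning a[0..i) into the empty string takes i deletions
            for (int j = 1; j <= b.length(); j++) {
                int cost = a.charAt(i - 1) == b.charAt(j - 1) ? 0 : 1;
                curr[j] = Math.min(Math.min(curr[j - 1] + 1, prev[j] + 1), prev[j - 1] + cost);
            }
            int[] tmp = prev;
            prev = curr;
            curr = tmp;
        }
        return prev[b.length()];
    }

    public static void main(String[] args) {
        System.out.println(levenshtein("思考", "思想")); // one substitution
        System.out.println(levenshtein("理想", "思想")); // one substitution
    }
}
```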

3.4.7、Compound (bool) queries

@Test
public void searchBool() throws IOException {
    // Find documents whose success field contains "思想" and whose age is in [20, 40)
    SearchResponse<Poet> response = client.search(searchRequestBuilder -> searchRequestBuilder
            .index(INDEX_NAME)
            .query(queryBuilder -> queryBuilder
                    .bool(boolQueryBuilder -> boolQueryBuilder
                            .must(queryBuilder2 -> queryBuilder2
                                    .match(matchQueryBuilder -> matchQueryBuilder
                                            .field("success").query("思想"))
                            )
                            .must(queryBuilder2 -> queryBuilder2
                                    .range(rangeQueryBuilder -> rangeQueryBuilder
                                            .field("age").gte(JsonData.of("20")).lt(JsonData.of("40")))
                            )
                    )
            )
    , Poet.class);
    logger.info(response.toString());

    // Filter for documents whose success field contains "思想" and whose age is in [20, 40), without computing scores
    SearchResponse<Poet> response2 = client.search(searchRequestBuilder -> searchRequestBuilder
            .index(INDEX_NAME)
            .query(queryBuilder -> queryBuilder
                    .bool(boolQueryBuilder -> boolQueryBuilder
                            .filter(queryBuilder2 -> queryBuilder2
                                    .match(matchQueryBuilder -> matchQueryBuilder
                                            .field("success").query("思想"))
                            )
                            .filter(queryBuilder2 -> queryBuilder2
                                    .range(rangeQueryBuilder -> rangeQueryBuilder
                                            .field("age").gte(JsonData.of("20")).lt(JsonData.of("40")))
                            )
                    )
            )
    , Poet.class);
    logger.info(response2.toString());
}

3.4.8、Aggregations

@Test
public void searchAggs() throws IOException {
    // Sum
    SearchResponse<Poet> response = client.search(searchRequestBuilder -> searchRequestBuilder
                    .index(INDEX_NAME)
                    .aggregations("age_sum", aggregationBuilder -> aggregationBuilder
                            .sum(sumAggregationBuilder -> sumAggregationBuilder
                                    .field("age")))
            , Poet.class);
    logger.info(response.toString());

    // Similar to: select count(distinct age) from poet-index
    response = client.search(searchRequestBuilder -> searchRequestBuilder
                    .index(INDEX_NAME)
                    .aggregations("age_count", aggregationBuilder -> aggregationBuilder
                            .cardinality(cardinalityAggregationBuilder -> cardinalityAggregationBuilder.field("age")))
            , Poet.class);
    logger.info(response.toString());

    // Count, max, min, average, sum
    response = client.search(searchRequestBuilder -> searchRequestBuilder
                    .index(INDEX_NAME)
                    .aggregations("age_stats", aggregationBuilder -> aggregationBuilder
                            .stats(statsAggregationBuilder -> statsAggregationBuilder
                                    .field("age")))
            , Poet.class);
    logger.info(response.toString());

    //select name,count(*) from poet-index group by name
    response = client.search(searchRequestBuilder -> searchRequestBuilder
                    .index(INDEX_NAME)
                    .aggregations("name_terms", aggregationBuilder -> aggregationBuilder
                            .terms(termsAggregationBuilder -> termsAggregationBuilder
                                    .field("name")))
            , Poet.class);
    logger.info(response.toString());

    //select name,age,count(*) from poet-index group by name,age
    response = client.search(searchRequestBuilder -> searchRequestBuilder
                    .index(INDEX_NAME)
                    .aggregations("name_terms", aggregationBuilder -> aggregationBuilder
                            .terms(termsAggregationBuilder -> termsAggregationBuilder
                                    .field("name")
                            )
                            .aggregations("age_terms", aggregationBuilder2 -> aggregationBuilder2
                                    .terms(termsAggregationBuilder -> termsAggregationBuilder
                                            .field("age")
                                    ))
                    )
            , Poet.class);
    logger.info(response.toString());

    // Similar to: select avg(age) from poet-index where name='李白'
    response = client.search(searchRequestBuilder -> searchRequestBuilder
                    .index(INDEX_NAME)
                    .query(queryBuilder -> queryBuilder
                            .bool(boolQueryBuilder -> boolQueryBuilder
                                    .filter(queryBuilder2 -> queryBuilder2
                                            .term(termQueryBuilder -> termQueryBuilder
                                                    .field("name").value("李白")))))
                    .aggregations("ave_age", aggregationBuilder -> aggregationBuilder
                            .avg(averageAggregationBuilder -> averageAggregationBuilder.field("age")))
            , Poet.class);
    logger.info(response.toString());
}
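
For readers who think in Java streams rather than SQL, the stats, cardinality, and terms aggregations above have close in-memory counterparts. The sketch below is only an analogy on a small list: `AggregationSketch` and `Row` are hypothetical names, and the distinct count here is exact, whereas the cardinality aggregation in Elasticsearch returns an approximate count.

```java
import java.util.Arrays;
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// In-memory counterparts of three aggregations; not client code.
final class AggregationSketch {

    static final class Row {
        private final String name;
        private final int age;

        Row(String name, int age) {
            this.name = name;
            this.age = age;
        }

        String name() { return name; }
        int age() { return age; }
    }

    // Counterpart of "stats": count, min, max, average, and sum in one pass.
    static IntSummaryStatistics ageStats(List<Row> rows) {
        return rows.stream().mapToInt(Row::age).summaryStatistics();
    }

    // Counterpart of "cardinality": number of distinct ages.
    static long distinctAges(List<Row> rows) {
        return rows.stream().mapToInt(Row::age).distinct().count();
    }

    // Counterpart of "terms" on name: one bucket per name with its document count.
    static Map<String, Long> countByName(List<Row> rows) {
        return rows.stream()
                .collect(Collectors.groupingBy(Row::name, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(new Row("李白", 30), new Row("李白", 30), new Row("杜甫", 31));
        System.out.println(ageStats(rows));
        System.out.println(distinctAges(rows));
        System.out.println(countByName(rows));
    }
}
```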

3.4.9、Suggestions

@Test
public void searchSuggest() throws IOException {
    SearchResponse<Poet> response = client.search(searchRequestBuilder -> searchRequestBuilder
                    .index(INDEX_NAME)
                    .suggest(suggesterBuilder -> suggesterBuilder
                            .suggesters("success_suggest", fieldSuggesterBuilder -> fieldSuggesterBuilder
                                    .text("思考")
                                    .term(termSuggesterBuilder -> termSuggesterBuilder
                                            .field("success")
                                            .suggestMode(SuggestMode.Always)
                                            .minWordLength(2)
                                    )
                            )
                    )
            , Poet.class);
    logger.info(response.toString());
}

This test fails with the following error:

co.elastic.clients.json.JsonpMappingException: Error deserializing co.elastic.clients.elasticsearch.core.search.TermSuggest: co.elastic.clients.json.UnexpectedJsonEventException: Unexpected JSON event 'START_ARRAY' instead of '[START_OBJECT, KEY_NAME]' (JSON path: suggest['term#success_suggest'][0].options) (line no=1, column no=247, offset=-1)

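The exception is thrown while the client deserializes the response (JsonpMappingException), so the problem seems to lie in how the client parses the response rather than in the request itself. Does anyone know the cause?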

3.4.10、Highlighting

@Test
public void searchHighlight() throws IOException {
    SearchResponse<Poet> response = client.search(searchRequestBuilder -> searchRequestBuilder
                .index(INDEX_NAME)
                .query(queryBuilder -> queryBuilder
                        .match(matchQueryBuilder -> matchQueryBuilder
                                .field("success").query("思想")))
                .highlight(highlightBuilder -> highlightBuilder
                        .preTags("<span style='color:red'>")
                        .postTags("</span>")
                        .fields("success", highlightFieldBuilder -> highlightFieldBuilder))
        , Poet.class);
    logger.info(response.toString());
}

3.4.11、SQL query

@Test
public void searchSql() throws IOException {
    QueryResponse response = client.sql().query(builder -> builder
            .format("json").query("SELECT * FROM \"" + INDEX_NAME + "\" limit 3"));
    logger.info(response.toString());
}

This test fails with the following error:

co.elastic.clients.json.JsonpMappingException: Error deserializing co.elastic.clients.elasticsearch.sql.QueryResponse: java.lang.UnsupportedOperationException (JSON path: rows[0][0]) (line no=1, column no=184, offset=-1)

Here, too, the exception is thrown while the client deserializes the response. Does anyone know the cause?

3.5、Complete code


package com.abc.demo.es;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.*;
import co.elastic.clients.elasticsearch._types.aggregations.AverageAggregation;
import co.elastic.clients.elasticsearch._types.aggregations.CardinalityAggregation;
import co.elastic.clients.elasticsearch._types.aggregations.SumAggregation;
import co.elastic.clients.elasticsearch._types.query_dsl.*;
import co.elastic.clients.elasticsearch.cat.IndicesResponse;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.indices.*;
import co.elastic.clients.elasticsearch.sql.QueryResponse;
import co.elastic.clients.json.JsonData;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ElasticsearchJavaCase {
    private static final Logger logger = LoggerFactory.getLogger(ElasticsearchJavaCase.class.getName());

    private static final String INDEX_NAME = "poet-index";

    private ElasticsearchTransport transport;
    private ElasticsearchClient client;

    @Before
    public void before() {
        RestClient restClient = RestClient.builder(
                new HttpHost("10.49.196.10", 9200),
                new HttpHost("10.49.196.11", 9200),
                new HttpHost("10.49.196.12", 9200)).build();
        ObjectMapper objectMapper = new ObjectMapper();
        transport = new RestClientTransport(restClient, new JacksonJsonpMapper(objectMapper));
        client = new ElasticsearchClient(transport);
    }

    @After
    public void after() throws IOException {
        transport.close();
    }

    /**
     * Create an index
     */
    @Test
    public void createIndex() throws IOException {
        CreateIndexResponse response = client.indices().create(builder -> builder
                .settings(indexSettingsBuilder -> indexSettingsBuilder.numberOfReplicas("1").numberOfShards("2"))
                .mappings(typeMappingBuilder -> typeMappingBuilder
                        .properties("age", propertyBuilder -> propertyBuilder.integer(integerNumberPropertyBuilder -> integerNumberPropertyBuilder))
                        .properties("name", propertyBuilder -> propertyBuilder.keyword(keywordPropertyBuilder -> keywordPropertyBuilder))
                        .properties("poems", propertyBuilder -> propertyBuilder.text(textPropertyBuilder -> textPropertyBuilder.analyzer("ik_max_word").searchAnalyzer("ik_max_word")))
                        .properties("about", propertyBuilder -> propertyBuilder.text(textPropertyBuilder -> textPropertyBuilder.analyzer("ik_max_word").searchAnalyzer("ik_max_word")))
                        .properties("success", propertyBuilder -> propertyBuilder.text(textPropertyBuilder -> textPropertyBuilder.analyzer("ik_max_word").searchAnalyzer("ik_max_word")))
                )
                .index(INDEX_NAME));
        logger.info("acknowledged={}", response.acknowledged());
    }

    /**
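     * Modify the _mapping of an index.
     * New fields can be added; for an existing field only a limited set of parameters, such as search_analyzer, can be changed.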
     */
    @Test
    public void modifyIndex() throws IOException {
        PutMappingResponse response = client.indices().putMapping(typeMappingBuilder -> typeMappingBuilder
                .index(INDEX_NAME)
                .properties("age", propertyBuilder -> propertyBuilder.integer(integerNumberPropertyBuilder -> integerNumberPropertyBuilder))
                .properties("name", propertyBuilder -> propertyBuilder.keyword(keywordPropertyBuilder -> keywordPropertyBuilder))
                .properties("poems", propertyBuilder -> propertyBuilder.text(textPropertyBuilder -> textPropertyBuilder.analyzer("ik_max_word").searchAnalyzer("ik_smart")))
        );
        logger.info("acknowledged={}", response.acknowledged());
    }

    /**
     * Delete an index
     */
    @Test
    public void deleteIndex() throws IOException {
        DeleteIndexResponse response = client.indices().delete(builder -> builder.index(INDEX_NAME));
        logger.info("acknowledged={}", response.acknowledged());
    }

    /**
     * List indices
     */
    @Test
    public void getIndex() throws IOException {
        // Passing * instead of _all works as well
        GetIndexResponse response = client.indices().get(builder -> builder.index("_all"));
        logger.info(response.result().toString());
    }

    /**
     * Get index details
     */
    @Test
    public void getIndexDetail() throws IOException {
        GetIndexResponse response = client.indices().get(builder -> builder.index(INDEX_NAME));
        logger.info(response.result().toString());
    }

    /**
     * Create a document
     */
    @Test
    public void createDoc() throws IOException {
        Map<String, Object> doc = new HashMap<>();
        doc.put("age", 30);
        doc.put("name", "李白");
        doc.put("poems", "静夜思");
        doc.put("about", "字太白");
        doc.put("success", "创造了古代浪漫主义文学高峰、歌行体和七绝达到后人难及的高度");

        CreateResponse response = client.create(builder -> builder.index(INDEX_NAME).id("1").document(doc));
        logger.info(response.toString());

        Poet poet = new Poet(31, "杜甫", "登高", "字子美", "唐代伟大的现实主义文学作家,唐诗思想艺术的集大成者");
        response = client.create(builder -> builder.index(INDEX_NAME).id("2").document(poet));
        logger.info(response.toString());
    }

    /**
     * Delete a document
     */
    @Test
    public void deleteDoc() throws IOException {
        DeleteResponse response = client.delete(builder -> builder.index(INDEX_NAME).id("1"));
        logger.info(response.toString());
    }

    /**
     * Update a document; only the fields that are set are modified
     */
    @Test
    public void updateDoc() throws IOException {
        Map<String, Object> doc = new HashMap<>();
        doc.put("age", 33);
        doc.put("name", "李白2");

        UpdateResponse response = client.update(builder -> builder.index(INDEX_NAME).id("1").doc(doc), Map.class);
        logger.info(response.toString());

        Poet poet = new Poet();
        poet.setAge(40);
        poet.setName("杜甫2");
        response = client.update(builder -> builder.index(INDEX_NAME).id("2").doc(poet).docAsUpsert(true), Poet.class);
        logger.info(response.toString());
    }

    /**
     * Create or replace a document; on replacement all fields are overwritten (equivalent to deleting and then re-creating it)
     */
    @Test
    public void createOrUpdateDoc() throws IOException {
        Map<String, Object> doc = new HashMap<>();
        doc.put("age", 33);
        doc.put("name", "李白2");

        // Replaces the whole document: fields that are not set here are removed
        IndexResponse response = client.index(builder -> builder.index(INDEX_NAME).id("1").document(doc));
        logger.info(response.toString());

        Poet poet = new Poet();
        poet.setAge(40);
        poet.setName("杜甫2");
        response = client.index(builder -> builder.index(INDEX_NAME).id("2").document(poet));
        logger.info(response.toString());
    }


    /**
     * Bulk operations
     */
    @Test
    public void bulk() throws IOException {
        List<BulkOperation> list = new ArrayList<>();

        // Bulk create
        for (int i = 0; i < 5; i++) {
            Map<String, Object> doc = new HashMap<>();
            doc.put("age", 30);
            doc.put("name", "李白" + i);
            doc.put("poems", "静夜思");
            doc.put("about", "字太白");
            doc.put("success", "创造了古代浪漫主义文学高峰、歌行体和七绝达到后人难及的高度");
            String id = 10 + i + "";
            list.add(new BulkOperation.Builder().create(builder -> builder.index(INDEX_NAME).id(id).document(doc)).build());
        }
        for (int i = 0; i < 5; i++) {
            Poet poet = new Poet(31, "杜甫" + i, "登高", "字子美", "唐代伟大的现实主义文学作家,唐诗思想艺术的集大成者");
            String id = 20 + i + "";
            list.add(new BulkOperation.Builder().create(builder -> builder.index(INDEX_NAME).id(id).document(poet)).build());
        }

        // Bulk delete
        list.add(new BulkOperation.Builder().delete(builder -> builder.index(INDEX_NAME).id("1")).build());
        list.add(new BulkOperation.Builder().delete(builder -> builder.index(INDEX_NAME).id("2")).build());

        BulkResponse response = client.bulk(builder -> builder.index(INDEX_NAME).operations(list));
        logger.info(response.toString());
    }

    /**
     * Query all documents.
     */
    @Test
    public void getDocAll() throws IOException {
        SearchResponse<Map> response = client.search(builder -> builder.index(INDEX_NAME), Map.class);
        logger.info(response.toString());
    }

    /**
     * Get a single document by id.
     */
    @Test
    public void getDoc() throws IOException {
        GetResponse<Map> response = client.get(builder -> builder.index(INDEX_NAME).id("1"), Map.class);
        if (response.found()) {
            logger.info(response.source().toString());
        }

        GetResponse<Poet> response2 = client.get(builder -> builder.index(INDEX_NAME).id("2"), Poet.class);
        if (response2.found()) {
            logger.info(response2.source().toString());
        }
    }

    /**
     * term/terms query: the input is not analyzed (no tokenization); it is matched against the indexed terms as-is.
     */
    @Test
    public void searchTerm() throws IOException {
        SearchResponse<Map> response = client.search(searchRequestBuilder -> searchRequestBuilder
                .index(INDEX_NAME)
                .query(queryBuilder -> queryBuilder
                        .term(termQueryBuilder -> termQueryBuilder
                                .field("name").value("李白")))
                .sort(sortOptionsBuilder -> sortOptionsBuilder
                        .field(fieldSortBuilder -> fieldSortBuilder
                                .field("name").order(SortOrder.Asc)))
                .source(sourceConfigBuilder -> sourceConfigBuilder
                        .filter(sourceFilterBuilder -> sourceFilterBuilder
                                .includes("age", "name")))
                .from(0)
                .size(10)
                , Map.class);
        logger.info(response.toString());

        List<FieldValue> words = new ArrayList<>();
        words.add(new FieldValue.Builder().stringValue("李白").build());
        words.add(new FieldValue.Builder().stringValue("杜甫").build());
        SearchResponse<Poet> response2 = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(INDEX_NAME)
                        .query(queryBuilder -> queryBuilder
                                .terms(termsQueryBuilder -> termsQueryBuilder
                                        .field("name").terms(termsQueryFieldBuilder -> termsQueryFieldBuilder.value(words))))
                        .source(sourceConfigBuilder -> sourceConfigBuilder
                                .filter(sourceFilterBuilder -> sourceFilterBuilder
                                        .excludes("about")))
                        .from(0)
                        .size(10)
                , Poet.class);
        logger.info(response2.toString());
    }

    /**
     * range query: matches values within a range.
     */
    @Test
    public void searchRange() throws IOException {
        SearchResponse<Poet> response = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(INDEX_NAME)
                        .query(queryBuilder -> queryBuilder
                                .range(rangeQueryBuilder -> rangeQueryBuilder
                                        .field("age").gte(JsonData.of("20")).lt(JsonData.of("40"))))
                , Poet.class);
        logger.info(response.toString());
    }

    /**
     * match query: the input is analyzed (tokenized) first, then searched.
     */
    @Test
    public void searchMatch() throws IOException {
        SearchResponse<Map> response = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(INDEX_NAME)
                        .query(queryBuilder -> queryBuilder
                                .match(matchQueryBuilder -> matchQueryBuilder
                                        .field("success").query("思想")))
                , Map.class);
        logger.info(response.toString());
    }

    /**
     * multi_match query: runs a match query against several fields.
     */
    @Test
    public void searchMultiMatch() throws IOException {
        SearchResponse<Poet> response = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(INDEX_NAME)
                        .query(queryBuilder -> queryBuilder
                                .multiMatch(multiMatchQueryBuilder -> multiMatchQueryBuilder
                                        .fields("about", "success").query("思想")))
                , Poet.class);
        logger.info(response.toString());
    }

    /**
     * match_phrase query: matches the whole query string as a phrase.
     */
    @Test
    public void searchMatchPhrase() throws IOException {
        SearchResponse<Poet> response = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(INDEX_NAME)
                        .query(queryBuilder -> queryBuilder
                                .matchPhrase(matchPhraseQueryBuilder -> matchPhraseQueryBuilder.field("success").query("文学作家")))
                , Poet.class);
        logger.info(response.toString());
    }

    /**
     * match_all query: returns all documents.
     */
    @Test
    public void searchMatchAll() throws IOException {
        SearchResponse<Poet> response = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(INDEX_NAME)
                        .query(queryBuilder -> queryBuilder
                                .matchAll(matchAllQueryBuilder -> matchAllQueryBuilder))
                , Poet.class);
        logger.info(response.toString());
    }

    /**
     * query_string query.
     */
    @Test
    public void searchQueryString() throws IOException {
        // Similar to match
        SearchResponse<Poet> response = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(INDEX_NAME)
                        .query(queryBuilder -> queryBuilder
                                .queryString(queryStringQueryBuilder -> queryStringQueryBuilder
                                    .defaultField("success").query("古典文学")))
                , Poet.class);
        logger.info(response.toString());

        // Similar to multi_match
        response = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(INDEX_NAME)
                        .query(queryBuilder -> queryBuilder
                                .queryString(queryStringQueryBuilder -> queryStringQueryBuilder
                                        .fields("about", "success").query("古典文学")))
                , Poet.class);
        logger.info(response.toString());

        // Similar to match_phrase
        response = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(INDEX_NAME)
                        .query(queryBuilder -> queryBuilder
                                .queryString(queryStringQueryBuilder -> queryStringQueryBuilder
                                        .defaultField("success").query("\"文学作家\"")))
                , Poet.class);
        logger.info(response.toString());

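        // Query with operators: the query is split at the operators and each operand is handled as its own term
        // Find documents that contain both "文学" and "伟大"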
        response = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(INDEX_NAME)
                        .query(queryBuilder -> queryBuilder
                                .queryString(queryStringQueryBuilder -> queryStringQueryBuilder
                                        .fields("success").query("文学 AND 伟大")))
                , Poet.class);
        logger.info(response.toString());

        // Equivalent to the previous query
        response = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(INDEX_NAME)
                        .query(queryBuilder -> queryBuilder
                                .queryString(queryStringQueryBuilder -> queryStringQueryBuilder
                                        .fields("success").query("文学 伟大").defaultOperator(Operator.And)))
                , Poet.class);
        logger.info(response.toString());

        // Find documents whose name or success field contains both "文学" and "伟大", or contains "高度"
        response = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(INDEX_NAME)
                        .query(queryBuilder -> queryBuilder
                                .queryString(queryStringQueryBuilder -> queryStringQueryBuilder
                                        .fields("name","success").query("(文学 AND 伟大) OR 高度")))
                , Poet.class);
        logger.info(response.toString());
    }

    /**
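     * simple_query_string query, similar to query_string.
     * AND, OR and NOT are not supported as operators; they are treated as plain text.
     * Use + instead of AND, | instead of OR, and - instead of NOT.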
     */
    @Test
    public void searchSimpleQueryString() throws IOException {
        // Find documents that contain both "文学" and "伟大"
        SearchResponse<Poet> response = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(INDEX_NAME)
                        .query(queryBuilder -> queryBuilder
                                .simpleQueryString(simpleQueryStringQueryBuilder -> simpleQueryStringQueryBuilder
                                        .fields("success").query("文学 + 伟大")))
                , Poet.class);
        logger.info(response.toString());
    }

    /**
     * Fuzzy search.
     */
    @Test
    public void searchFuzzy() throws IOException {
        // Full-text query with a fuzziness parameter: the input is analyzed first, then fuzziness is applied to each term.
        SearchResponse<Poet> response = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(INDEX_NAME)
                        .query(queryBuilder -> queryBuilder
                                .match(matchQueryBuilder -> matchQueryBuilder
                                        .field("success").query("思考").fuzziness("1")))
                , Poet.class);
        logger.info(response.toString());

        // fuzzy query: the input is not analyzed; fuzziness is applied to the input as a single term.
        SearchResponse<Poet> response2 = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(INDEX_NAME)
                        .query(queryBuilder -> queryBuilder
                                .fuzzy(fuzzyQueryBuilder ->  fuzzyQueryBuilder
                                        .field("success").fuzziness("1").value("理想")))
                , Poet.class);
        logger.info(response2.toString());
    }

    /**
     * bool query: combines several queries.
     */
    @Test
    public void searchBool() throws IOException {
        // Find documents whose success field contains "思想" and whose age is in [20, 40) (gte 20, lt 40)
        SearchResponse<Poet> response = client.search(searchRequestBuilder -> searchRequestBuilder
                .index(INDEX_NAME)
                .query(queryBuilder -> queryBuilder
                        .bool(boolQueryBuilder -> boolQueryBuilder
                                .must(queryBuilder2 -> queryBuilder2
                                        .match(matchQueryBuilder -> matchQueryBuilder
                                                .field("success").query("思想"))
                                )
                                .must(queryBuilder2 -> queryBuilder2
                                        .range(rangeQueryBuilder -> rangeQueryBuilder
                                                .field("age").gte(JsonData.of("20")).lt(JsonData.of("40")))
                                )
                        )
                )
        , Poet.class);
        logger.info(response.toString());

        // Filter documents whose success field contains "思想" and whose age is in [20, 40) (gte 20, lt 40); no score is computed
        SearchResponse<Poet> response2 = client.search(searchRequestBuilder -> searchRequestBuilder
                .index(INDEX_NAME)
                .query(queryBuilder -> queryBuilder
                        .bool(boolQueryBuilder -> boolQueryBuilder
                                .filter(queryBuilder2 -> queryBuilder2
                                        .match(matchQueryBuilder -> matchQueryBuilder
                                                .field("success").query("思想"))
                                )
                                .filter(queryBuilder2 -> queryBuilder2
                                        .range(rangeQueryBuilder -> rangeQueryBuilder
                                                .field("age").gte(JsonData.of("20")).lt(JsonData.of("40")))
                                )
                        )
                )
        , Poet.class);
        logger.info(response2.toString());
    }

    /**
     * aggs: aggregation queries.
     */
    @Test
    public void searchAggs() throws IOException {
        // Sum
        SearchResponse<Poet> response = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(INDEX_NAME)
                        .aggregations("age_sum", aggregationBuilder -> aggregationBuilder
                                .sum(sumAggregationBuilder -> sumAggregationBuilder
                                        .field("age")))
                , Poet.class);
        logger.info(response.toString());

        // Similar to: select count(distinct age) from poet-index (the cardinality aggregation returns an approximate count)
        response = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(INDEX_NAME)
                        .aggregations("age_count", aggregationBuilder -> aggregationBuilder
                                .cardinality(cardinalityAggregationBuilder -> cardinalityAggregationBuilder.field("age")))
                , Poet.class);
        logger.info(response.toString());

        // Count, max, min, average and sum in one request
        response = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(INDEX_NAME)
                        .aggregations("age_stats", aggregationBuilder -> aggregationBuilder
                                .stats(statsAggregationBuilder -> statsAggregationBuilder
                                        .field("age")))
                , Poet.class);
        logger.info(response.toString());

        //select name,count(*) from poet-index group by name
        response = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(INDEX_NAME)
                        .aggregations("name_terms", aggregationBuilder -> aggregationBuilder
                                .terms(termsAggregationBuilder -> termsAggregationBuilder
                                        .field("name")))
                , Poet.class);
        logger.info(response.toString());

        //select name,age,count(*) from poet-index group by name,age
        response = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(INDEX_NAME)
                        .aggregations("name_terms", aggregationBuilder -> aggregationBuilder
                                .terms(termsAggregationBuilder -> termsAggregationBuilder
                                        .field("name")
                                )
                                .aggregations("age_terms", aggregationBuilder2 -> aggregationBuilder2
                                        .terms(termsAggregationBuilder -> termsAggregationBuilder
                                                .field("age")
                                        ))
                        )
                , Poet.class);
        logger.info(response.toString());

        // Similar to: select avg(age) from poet-index where name='李白'
        response = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(INDEX_NAME)
                        .query(queryBuilder -> queryBuilder
                                .bool(boolQueryBuilder -> boolQueryBuilder
                                        .filter(queryBuilder2 -> queryBuilder2
                                                .term(termQueryBuilder -> termQueryBuilder
                                                        .field("name").value("李白")))))
                        .aggregations("ave_age", aggregationBuilder -> aggregationBuilder
                                .avg(averageAggregationBuilder -> averageAggregationBuilder.field("age")))
                , Poet.class);
        logger.info(response.toString());
    }

    /**
     * suggest query (search suggestions). Note: this call fails with an error.
     */
    @Test
    public void searchSuggest() throws IOException {
        SearchResponse<Poet> response = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(INDEX_NAME)
                        .suggest(suggesterBuilder -> suggesterBuilder
                                .suggesters("success_suggest", fieldSuggesterBuilder -> fieldSuggesterBuilder
                                        .text("思考")
                                        .term(termSuggesterBuilder -> termSuggesterBuilder
                                                .field("success")
                                                .suggestMode(SuggestMode.Always)
                                                .minWordLength(2)
                                        )
                                )
                        )
                , Poet.class);
        logger.info(response.toString());
    }

    /**
     * Highlighting.
     */
    @Test
    public void searchHighlight() throws IOException {
        SearchResponse<Poet> response = client.search(searchRequestBuilder -> searchRequestBuilder
                    .index(INDEX_NAME)
                    .query(queryBuilder -> queryBuilder
                            .match(matchQueryBuilder -> matchQueryBuilder
                                    .field("success").query("思想")))
                    .highlight(highlightBuilder -> highlightBuilder
                            .preTags("<span style='color:red'>")
                            .postTags("</span>")
                            .fields("success", highlightFieldBuilder -> highlightFieldBuilder))
            , Poet.class);
        logger.info(response.toString());
    }

    /**
     * SQL query. Note: this call fails with an error.
     */
    @Test
    public void searchSql() throws IOException {
        QueryResponse response = client.sql().query(builder -> builder
                .format("json").query("SELECT * FROM \"" + INDEX_NAME + "\" limit 1"));
        logger.info(response.toString());
    }


    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    static class Poet {
        private Integer age;
        private String name;
        private String poems;
        private String about;
        /** Achievements */
        private String success;
    }
}

ElasticsearchJavaCase.java
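
The difference between updateDoc (client.update with doc) and createOrUpdateDoc (client.index) in the listing is easy to miss, so here is a minimal plain-Java sketch of the two semantics. It only models them with maps and does not call Elasticsearch; the class and method names (UpdateVsIndexSketch, partialUpdate, fullReplace) are made up for illustration, and the merge is shallow, whereas a real partial update also merges nested objects.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of the two write semantics; it does not talk to Elasticsearch. */
final class UpdateVsIndexSketch {

    /** Models update with a partial doc: fields in the partial document overwrite, all others are kept. */
    static Map<String, Object> partialUpdate(Map<String, Object> stored, Map<String, Object> partial) {
        Map<String, Object> merged = new HashMap<>(stored);
        merged.putAll(partial);
        return merged;
    }

    /** Models index with a document: the stored document is replaced as a whole. */
    static Map<String, Object> fullReplace(Map<String, Object> stored, Map<String, Object> doc) {
        return new HashMap<>(doc);
    }

    public static void main(String[] args) {
        Map<String, Object> stored = new HashMap<>();
        stored.put("age", 30);
        stored.put("name", "李白");
        stored.put("poems", "静夜思");

        Map<String, Object> change = new HashMap<>();
        change.put("age", 33);
        change.put("name", "李白2");

        // The partial update keeps "poems"; the full replace drops it.
        System.out.println(partialUpdate(stored, change).containsKey("poems")); // true
        System.out.println(fullReplace(stored, change).containsKey("poems"));   // false
    }
}
```

In practice this means that client.index with a document that only sets age and name leaves the stored document with just those two fields.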

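
The fuzziness("1") setting used in searchFuzzy allows a term to differ from the input by at most one edit. As a rough illustration, the sketch below computes the plain Levenshtein edit distance between two strings. It is an approximation of what Elasticsearch does: transpositions, which Elasticsearch counts as a single edit by default, are left out, and the class name EditDistanceSketch is made up. Whether a term such as "思想" actually exists in the index depends on the analyzer configured for the field, which is assumed here.

```java
/** Plain Levenshtein distance (insert, delete, substitute); no transposition handling. */
final class EditDistanceSketch {

    static int levenshtein(String a, String b) {
        // prev holds the distances for the previous row of the DP table, curr for the current row.
        int[] prev = new int[b.length() + 1];
        int[] curr = new int[b.length() + 1];
        for (int j = 0; j <= b.length(); j++) {
            prev[j] = j; // turning an empty prefix of a into b[0..j) takes j insertions
        }
        for (int i = 1; i <= a.length(); i++) {
            curr[0] = i; // turning a[0..i) into an empty string takes i deletions
            for (int j = 1; j <= b.length(); j++) {
                int cost = a.charAt(i - 1) == b.charAt(j - 1) ? 0 : 1;
                curr[j] = Math.min(Math.min(curr[j - 1] + 1, prev[j] + 1), prev[j - 1] + cost);
            }
            int[] tmp = prev;
            prev = curr;
            curr = tmp;
        }
        return prev[b.length()];
    }

    public static void main(String[] args) {
        System.out.println(levenshtein("思考", "思想")); // 1, within fuzziness 1
        System.out.println(levenshtein("理想", "思想")); // 1, within fuzziness 1
        System.out.println(levenshtein("文学", "思想")); // 2, outside fuzziness 1
    }
}
```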
 

For detailed usage of the Elasticsearch Java API Client, see the official documentation: https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/current/index.html
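
The searchTerm example pages its results with from(0).size(10). A small helper for turning a 1-based page number into the from offset might look like the sketch below. The names PagingSketch and fromOffset are hypothetical, and the limit of 10,000 assumes the index still uses the default index.max_result_window setting.

```java
/** Computes the "from" offset for from/size paging; names are illustrative only. */
final class PagingSketch {

    /** Assumed default of index.max_result_window; adjust if the index overrides it. */
    static final int MAX_RESULT_WINDOW = 10_000;

    /**
     * @param page 1-based page number
     * @param size number of hits per page
     * @return the value to pass to from(...)
     */
    static int fromOffset(int page, int size) {
        if (page < 1 || size < 1) {
            throw new IllegalArgumentException("page and size must be >= 1");
        }
        long from = (long) (page - 1) * size;
        // Elasticsearch rejects requests where from + size exceeds the result window.
        if (from + size > MAX_RESULT_WINDOW) {
            throw new IllegalArgumentException("from + size exceeds " + MAX_RESULT_WINDOW);
        }
        return (int) from;
    }

    public static void main(String[] args) {
        System.out.println(fromOffset(1, 10)); // 0
        System.out.println(fromOffset(3, 10)); // 20
    }
}
```

The result would be passed as from(PagingSketch.fromOffset(page, size)).size(size) on the search request builder. For paging deeper than the result window, from/size is not suitable.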