ElasticSearch高版本API的使用姿势

ElasticSearch之前我没有深入去学过,在上家公司也是简单用了一下,本来是想用来做千万级ip库数据缓存的,后面查询耗时就弃用了,也就没有深入去学习。之前看过一些视频,也只是说说怎么去使用而已。

现在项目中用了7.1.1版本的ElasticSearchAPI用的是elasticsearch-rest-high-level-client7.1.1,为了能完成任务,我也只是去看了怎么去用。因为刚入职新公司,我的首要任务还是去熟悉业务,有时间再深入去学习elasticsearch

今天分享下,如何在Java项目中使用elasticsearch-rest-high-level-client

直接http接口调用

实际上使用kibana或者elasticsearch-rest-high-level-client最终也是发送http请求。不同于redis这类服务,需要去了解它的通信协议,再通过Socket编程去实现通信,因此都是直接使用别人封装好的API。而ES提供了RESTFUL接口,就不需要我们去了解协议,因此,最简单的方式就是直接构造请求body发送http请求访问ES

String esUrl = String.format("%s/%s/_search",elasticsearchConfig.getClusterNodes(),INDEX);  // 发送http请求  String responseStr = HttpUtil.requestPostWithJson(esUrl,searchBuilder.toString());  // 对响应结果responseStr进行解析

这里还是会用到ESAPI,使用SearchBuilder构造请求体。当然也可以不使用,自己实现。

SearchSourceBuilder searchBuilder = new SearchSourceBuilder();  BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery()                  .must(....);  searchBuilder.query(boolQueryBuilder);

但是构造请求body也是很繁琐的事情,因此一般会选择使用封装的API

基于API封装便用方式

添加elasticsearch-rest-high-level-client依赖。添加依赖时需要排除elasticsearchelasticsearch-rest-client包的依赖,因为默认是依赖低版本的,这里有个坑。排除之后再自己添加对应版本的elasticsearchelasticsearch-rest-client包的依赖就行了。(项目中用的是maven,我还是喜欢用gradle)。

<!-- es -->  <dependency>      <groupId>org.elasticsearch.client</groupId>      <artifactId>elasticsearch-rest-high-level-client</artifactId>      <version>${elasticsearch.version}</version>      <exclusions>          <!-- 默认引入的低版本 所以要排除重新依赖 -->          <exclusion>              <groupId>org.elasticsearch</groupId>              <artifactId>elasticsearch</artifactId>          </exclusion>              <exclusion>              <groupId>org.elasticsearch.client</groupId>              <artifactId>elasticsearch-rest-client</artifactId>          </exclusion>      </exclusions>  </dependency>  <!-- 重新依赖 -->  <dependency>      <groupId>org.elasticsearch.client</groupId>      <artifactId>elasticsearch-rest-client</artifactId>      <version>${elasticsearch.version}</version>  </dependency>  <dependency>      <groupId>org.elasticsearch</groupId>      <artifactId>elasticsearch</artifactId>      <version>${elasticsearch.version}</version>  </dependency>

接着在application.yml配置文件中添加es的配置,名称自己取。

# 要求ES 7.x版本  es:    host: 127.0.0.1    port: 9400    scheme: http

读取配置信息

@Component  @ConfigurationProperties(prefix = "es")  public class ElasticSearchPropertys {      private String host;      private int port;      private String scheme;  }

然后根据配置创建一个RestHighLevelClient注入到spring容器。

@Configuration  public class ElasticSearchConfig {        @Resource      private ElasticSearchPropertys elasticSearchPropertys;        @Bean      public RestHighLevelClient restHighLevelClient() {          RestClientBuilder restClientBuilder = RestClient.builder(                  new HttpHost(elasticSearchPropertys.getHost(),                          elasticSearchPropertys.getPort(), elasticSearchPropertys.getScheme()));          return new RestHighLevelClient(restClientBuilder);      }    }

如果是集群,自己改下就好了,RestClientbuilder方法是支持传多个HttpHost的。

然后就可以愉快的使用RestHighLevelClient提供的API实现CURD操作了。为了便于使用,可以基于RestHighLevelClient再封装一层。

/**   * 封装ES通用API   *   * @author wujiuye   * @date 2020/03/04   */  @Component  @ConditionalOnBean(RestHighLevelClient.class)  public class ElasticSearchService {        @Resource      RestHighLevelClient restHighLevelClient;        /**       * 判断某个index是否存在       *       * @param index index名       */      public boolean existIndex(String index) throws Exception {      }        /**       * 创建索引(仅测试使用)       *       * @param index    索引名称       * @param mappings 索引描述       * @param shards   分片数       * @param replicas 副本数       */      public void createIndex(String index, EsIndexMappings mappings, int shards, int replicas) throws Exception {      }        /**       * 插入或更新单条记录       *       * @param index  index       * @param entity 对象       */      public void insertOrUpdate(String index, EsEntity entity) throws Exception {      }        /**       * 批量插入数据       *       * @param index index       * @param list  带插入列表       */      public void insertBatch(String index, List<EsEntity> list) throws Exception {      }        /**       * 搜索       *       * @param index   index       * @param builder 查询参数       * @param c       结果对象类型       */      public <T> List<T> search(String index, SearchSourceBuilder builder, Class<T> c) throws Exception {      }        .......    }

在开发过程中,我们需要本地测试,或者连接测试环境的ES进行测试。为了方便,我将在创建索引的动作写在代码中,当判断环境为dev环境时,删除索引重建。因此,我也封装了创建索引的逻辑。

首先是定义一个注解,用于注释在实体类的字段上,用于创建索引时构造mapping。如果需要更多信息,添加到EsField注解,并完善解析逻辑就可以了。

/**   * ES索引字段映射,用于代码创建索引 (仅测试使用)   *   * @author wujiuye   * @date 2020/03/04   */  @Target({ElementType.FIELD})  @Retention(RetentionPolicy.RUNTIME)  @Documented  public @interface EsField {        /**       * 字段类型       *       * @return       */      String type() default "text";    }

例如:

public class Product {        /**       * 商品id       */      @EsField(type = "integer")      private Integer leId;      /**       * 品牌id       */      @EsField(type = "integer")      private Integer brandId;      /**       * 品牌名称       */      private String brandName;      /**       * 日期       */      private String date;  }

实现注解EsField的解析,生成EsIndexMappings对象。

/**   * 用于代码创建索引(仅测试使用)   *   * @author wujiuye   * @date 2020/03/04   */  public class EsIndexMappings {        private boolean dynamic = false;      private Map<String, Map<String, Object>> properties;        /**       * 生成索引字段映射信息       *       * @param dynamic       * @param type       * @return       */      public static EsIndexMappings byType(boolean dynamic, Class<?> type) {          EsIndexMappings esIndexMappings = new EsIndexMappings();          esIndexMappings.setDynamic(dynamic);          esIndexMappings.setProperties(new HashMap<>());          Field[] fields = type.getDeclaredFields();          for (Field field : fields) {              Map<String, Object> value = new HashMap<>();              EsField esField = field.getAnnotation(EsField.class);              if (esField == null) {                  value.put("type", "text");                  value.put("index", true);              } else {                  value.put("type", esField.type());                  value.put("index", esField.index());              }              esIndexMappings.getProperties().put(field.getName(), value);          }          return esIndexMappings;      }    }

创建索引方法的实现

/**       * 创建索引(仅测试使用)       *       * @param index    索引名称       * @param mappings 索引描述       * @param shards   分片数       * @param replicas 副本数       */      public void createIndex(String index, EsIndexMappings mappings, int shards, int replicas) throws Exception {          if (this.existIndex(index)) {              return;          }          CreateIndexRequest request = new CreateIndexRequest(index);          request.settings(Settings.builder()                  // 分片数                  .put("index.number_of_shards", shards)                  // 副本数                  .put("index.number_of_replicas", replicas));          // 指定mappings          request.mapping(JSON.toJSONString(mappings), XContentType.JSON);          CreateIndexResponse res = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);          if (!res.isAcknowledged()) {              throw new RuntimeException("所以创建失败!");          }      }

使用例子:

elasticService.createIndex(INDEX, EsIndexMappings.byType(false, Product.class), 1, 1);

在插入对象时,我们可能会有指定文档id的需求,因此,为了封装更通用的插入和批量插入方法,需要抽象一个中间对象EsEntity

public class EsEntity {      /**       * 索引的_id,不指定则使用es自动生成的       */      private String id;        /**       * 不转中间对象,直接转为json字符串,避免批量插入浪费内存资源       */      private String jsonData;  }

提供将任意对象转为EsEntity的静态方法,支持指定id和不指定id,当不指定id时,ES会自动生成。

/**       * 将任意类型对象转为EsEntity       * 不指定_id       *       * @param obj 一个文档(记录)       * @param <T>       * @return       */      public static <T> EsEntity objToElasticEntity(T obj) {          return objToElasticEntity(null, obj);      }        /**       * 将任意类型对象转为EsEntity       *       * @param id  null:不指定_id,非null:指定_id       * @param obj 一个文档(记录)       * @param <T>       * @return       */      public static <T> EsEntity objToElasticEntity(Integer id, T obj) {          EsEntity elasticEntity = new EsEntity();          String data = JSON.toJSONString(obj);          elasticEntity.setId(id == null ? null : String.valueOf(id));          elasticEntity.setData(data);          return elasticEntity;      }

插入和批量插入的实现:

 /**       * 插入或更新单条记录       *       * @param index  index       * @param entity 对象       */      public void insertOrUpdate(String index, EsEntity entity) throws Exception {          IndexRequest request = new IndexRequest(index);          request.id(entity.getId());          request.source(entity.getData(), XContentType.JSON);          IndexResponse response = restHighLevelClient.index(request, RequestOptions.DEFAULT);          if (response.status() != RestStatus.OK) {              throw new RuntimeException(response.toString());          }      }        /**       * 批量插入数据       *       * @param index index       * @param list  带插入列表       */      public void insertBatch(String index, List<EsEntity> list) throws Exception {          BulkRequest request = new BulkRequest();          list.forEach(item -> request.add(new IndexRequest(index)                  .id(item.getId())                  .source(item.getData(), XContentType.JSON)));          BulkResponse response = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);          if (response.hasFailures()) {              throw new RuntimeException(response.buildFailureMessage());          }      }

批量插入的使用例子:

List<Product> products = new ArrayList<>();  elasticService.insertBatch(INDEX,      products.stream().map(EsEntity::objToElasticEntity)      .collect(Collectors.toList()));

总结

个人更倾向于基于API封装的方式,简单通用。但是要注意,批量插入数据时,不要产生太多的中间对象,造成内存空间浪费。比如从数据库查询出来的结果转成中间对象,又转成Map对象再插入ES

当然,如果团队成员不认可这种方式,觉得不好,我也会选择跟着现有项目中的使用方式。