ElasticSearch高版本API的使用姿势
- 2020 年 3 月 9 日
- 笔记
ElasticSearch
之前我没有深入去学过,在上家公司也是简单用了一下,本来是想用来做千万级ip
库数据缓存的,后面查询耗时就弃用了,也就没有深入去学习。之前看过一些视频,也只是说说怎么去使用而已。
现在项目中用了7.1.1
版本的ElasticSearch
,API
用的是elasticsearch-rest-high-level-client7.1.1
,为了能完成任务,我也只是去看了怎么去用。因为刚入职新公司,我的首要任务还是去熟悉业务,有时间再深入去学习elasticsearch
。
今天分享下,如何在Java
项目中使用elasticsearch-rest-high-level-client
。
直接http接口调用

实际上使用kibana
或者elasticsearch-rest-high-level-client
最终也是发送http
请求。不同于redis
这类服务,需要去了解它的通信协议,再通过Socket
编程去实现通信,因此都是直接使用别人封装好的API
。而ES
提供了RESTFUL
接口,就不需要我们去了解协议,因此,最简单的方式就是直接构造请求body
发送http
请求访问ES
。
String esUrl = String.format("%s/%s/_search",elasticsearchConfig.getClusterNodes(),INDEX); // 发送http请求 String responseStr = HttpUtil.requestPostWithJson(esUrl,searchBuilder.toString()); // 对响应结果responseStr进行解析
这里还是会用到ES
的API
,使用SearchBuilder
构造请求体。当然也可以不使用,自己实现。
SearchSourceBuilder searchBuilder = new SearchSourceBuilder(); BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery() .must(....); searchBuilder.query(boolQueryBuilder);
但是构造请求body
也是很繁琐的事情,因此一般会选择使用封装的API
。
基于API封装便用方式
添加elasticsearch-rest-high-level-client
依赖。添加依赖时需要排除elasticsearch
、elasticsearch-rest-client
包的依赖,因为默认是依赖低版本的,这里有个坑。排除之后再自己添加对应版本的elasticsearch
、elasticsearch-rest-client
包的依赖就行了。(项目中用的是maven
,我还是喜欢用gradle
)。
<!-- es --> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>${elasticsearch.version}</version> <exclusions> <!-- 默认引入的低版本 所以要排除重新依赖 --> <exclusion> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> </exclusion> <exclusion> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-client</artifactId> </exclusion> </exclusions> </dependency> <!-- 重新依赖 --> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-client</artifactId> <version>${elasticsearch.version}</version> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>${elasticsearch.version}</version> </dependency>
接着在application.yml
配置文件中添加es
的配置,名称自己取。
# 要求ES 7.x版本 es: host: 127.0.0.1 port: 9400 scheme: http
读取配置信息
@Component @ConfigurationProperties(prefix = "es") public class ElasticSearchPropertys { private String host; private int port; private String scheme; }
然后根据配置创建一个RestHighLevelClient
注入到spring
容器。
@Configuration public class ElasticSearchConfig { @Resource private ElasticSearchPropertys elasticSearchPropertys; @Bean public RestHighLevelClient restHighLevelClient() { RestClientBuilder restClientBuilder = RestClient.builder( new HttpHost(elasticSearchPropertys.getHost(), elasticSearchPropertys.getPort(), elasticSearchPropertys.getScheme())); return new RestHighLevelClient(restClientBuilder); } }
如果是集群,自己改下就好了,RestClient
的builder
方法是支持传多个HttpHost
的。
然后就可以愉快的使用RestHighLevelClient
提供的API
实现CURD
操作了。为了便于使用,可以基于RestHighLevelClient
再封装一层。
/** * 封装ES通用API * * @author wujiuye * @date 2020/03/04 */ @Component @ConditionalOnBean(RestHighLevelClient.class) public class ElasticSearchService { @Resource RestHighLevelClient restHighLevelClient; /** * 判断某个index是否存在 * * @param index index名 */ public boolean existIndex(String index) throws Exception { } /** * 创建索引(仅测试使用) * * @param index 索引名称 * @param mappings 索引描述 * @param shards 分片数 * @param replicas 副本数 */ public void createIndex(String index, EsIndexMappings mappings, int shards, int replicas) throws Exception { } /** * 插入或更新单条记录 * * @param index index * @param entity 对象 */ public void insertOrUpdate(String index, EsEntity entity) throws Exception { } /** * 批量插入数据 * * @param index index * @param list 带插入列表 */ public void insertBatch(String index, List<EsEntity> list) throws Exception { } /** * 搜索 * * @param index index * @param builder 查询参数 * @param c 结果对象类型 */ public <T> List<T> search(String index, SearchSourceBuilder builder, Class<T> c) throws Exception { } ....... }
在开发过程中,我们需要本地测试,或者连接测试环境的ES
进行测试。为了方便,我将在创建索引的动作写在代码中,当判断环境为dev
环境时,删除索引重建。因此,我也封装了创建索引的逻辑。
首先是定义一个注解,用于注释在实体类的字段上,用于创建索引时构造mapping
。如果需要更多信息,添加到EsField
注解,并完善解析逻辑就可以了。
/** * ES索引字段映射,用于代码创建索引 (仅测试使用) * * @author wujiuye * @date 2020/03/04 */ @Target({ElementType.FIELD}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface EsField { /** * 字段类型 * * @return */ String type() default "text"; }
例如:
public class Product { /** * 商品id */ @EsField(type = "integer") private Integer leId; /** * 品牌id */ @EsField(type = "integer") private Integer brandId; /** * 品牌名称 */ private String brandName; /** * 日期 */ private String date; }
实现注解EsField
的解析,生成EsIndexMappings
对象。
/** * 用于代码创建索引(仅测试使用) * * @author wujiuye * @date 2020/03/04 */ public class EsIndexMappings { private boolean dynamic = false; private Map<String, Map<String, Object>> properties; /** * 生成索引字段映射信息 * * @param dynamic * @param type * @return */ public static EsIndexMappings byType(boolean dynamic, Class<?> type) { EsIndexMappings esIndexMappings = new EsIndexMappings(); esIndexMappings.setDynamic(dynamic); esIndexMappings.setProperties(new HashMap<>()); Field[] fields = type.getDeclaredFields(); for (Field field : fields) { Map<String, Object> value = new HashMap<>(); EsField esField = field.getAnnotation(EsField.class); if (esField == null) { value.put("type", "text"); value.put("index", true); } else { value.put("type", esField.type()); value.put("index", esField.index()); } esIndexMappings.getProperties().put(field.getName(), value); } return esIndexMappings; } }
创建索引方法的实现
/** * 创建索引(仅测试使用) * * @param index 索引名称 * @param mappings 索引描述 * @param shards 分片数 * @param replicas 副本数 */ public void createIndex(String index, EsIndexMappings mappings, int shards, int replicas) throws Exception { if (this.existIndex(index)) { return; } CreateIndexRequest request = new CreateIndexRequest(index); request.settings(Settings.builder() // 分片数 .put("index.number_of_shards", shards) // 副本数 .put("index.number_of_replicas", replicas)); // 指定mappings request.mapping(JSON.toJSONString(mappings), XContentType.JSON); CreateIndexResponse res = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT); if (!res.isAcknowledged()) { throw new RuntimeException("所以创建失败!"); } }
使用例子:
elasticService.createIndex(INDEX, EsIndexMappings.byType(false, Product.class), 1, 1);
在插入对象时,我们可能会有指定文档id
的需求,因此,为了封装更通用的插入和批量插入方法,需要抽象一个中间对象EsEntity
。
public class EsEntity { /** * 索引的_id,不指定则使用es自动生成的 */ private String id; /** * 不转中间对象,直接转为json字符串,避免批量插入浪费内存资源 */ private String jsonData; }
提供将任意对象转为EsEntity
的静态方法,支持指定id
和不指定id
,当不指定id
时,ES
会自动生成。
/** * 将任意类型对象转为EsEntity * 不指定_id * * @param obj 一个文档(记录) * @param <T> * @return */ public static <T> EsEntity objToElasticEntity(T obj) { return objToElasticEntity(null, obj); } /** * 将任意类型对象转为EsEntity * * @param id null:不指定_id,非null:指定_id * @param obj 一个文档(记录) * @param <T> * @return */ public static <T> EsEntity objToElasticEntity(Integer id, T obj) { EsEntity elasticEntity = new EsEntity(); String data = JSON.toJSONString(obj); elasticEntity.setId(id == null ? null : String.valueOf(id)); elasticEntity.setData(data); return elasticEntity; }
插入和批量插入的实现:
/** * 插入或更新单条记录 * * @param index index * @param entity 对象 */ public void insertOrUpdate(String index, EsEntity entity) throws Exception { IndexRequest request = new IndexRequest(index); request.id(entity.getId()); request.source(entity.getData(), XContentType.JSON); IndexResponse response = restHighLevelClient.index(request, RequestOptions.DEFAULT); if (response.status() != RestStatus.OK) { throw new RuntimeException(response.toString()); } } /** * 批量插入数据 * * @param index index * @param list 带插入列表 */ public void insertBatch(String index, List<EsEntity> list) throws Exception { BulkRequest request = new BulkRequest(); list.forEach(item -> request.add(new IndexRequest(index) .id(item.getId()) .source(item.getData(), XContentType.JSON))); BulkResponse response = restHighLevelClient.bulk(request, RequestOptions.DEFAULT); if (response.hasFailures()) { throw new RuntimeException(response.buildFailureMessage()); } }
批量插入的使用例子:
List<Product> products = new ArrayList<>(); elasticService.insertBatch(INDEX, products.stream().map(EsEntity::objToElasticEntity) .collect(Collectors.toList()));
总结
个人更倾向于基于API
封装的方式,简单通用。但是要注意,批量插入数据时,不要产生太多的中间对象,造成内存空间浪费。比如从数据库查询出来的结果转成中间对象,又转成Map
对象再插入ES
。
当然,如果团队成员不认可这种方式,觉得不好,我也会选择跟着现有项目中的使用方式。