title: 一周一个中间件-ES搜索引擎
date: 2019-09-19 18:43:36
– 中间件
– 搜索引擎

## 前言

> 在众多搜索引擎中,solr,es是我所知道其他公司最为广泛使用的中间件。他可以解决你复杂的搜索需求。当你需要在大量数据的情况下搜索一下关键字,使用mysql的like查询是非常缓慢的,而es可以做到近实时的搜索。

## 背景
> 我们公司最近对我们的fungo的游戏,用户,文章提出了更加复杂的搜索要求,要求对指定的关键字进行相似度匹配。
> 例如 搜索 ‘fungo小助手’ 搜索的结果应该含有fungo小助手,还应有只含有fungo关键字的东西,还应有只含有小助手关键字的东西。并且根据所含元素的多少,进行优先排名。
> 面对这些需求,以我对mysql和现在java框架的了解,无法解决这个问题。使用mysql的like功能只能检索到‘fungo小助手’的关键字。所以百度一下主流的解决方案。发现大部分公司都会使用solr和es这些搜索框架,来作为中间件解决复杂搜索功能。

## 知识准备
– node 节点

> 就是一个es实例。

– cluster 集群
– 集群健康
green yellow red

– 主节点

– 数据节点
数据节点主要是存储索引数据的节点,主要对文档进行增删改查操作,聚合操作等。数据节点对cpu,内存,io要求较高, 在优化的时候需要监控数据节点的状态,当资源不够的时候,需要在集群中添加新的节点。

> 具有相同的cluster.name的node节点集合。

– index 索引

> 一个用来指向一个或者多个分片的逻辑命名空间。

– shard 分片

> 最小级别的工作单元,他只是保存了索引中的所有数据的一部分,最重要的分片就是一个Lucene实例。他本身就是一个完整的搜索引擎。我们的文档存储在分片中,并且在分片中被索引,我们的应用程序不会直接与他通信,而是直接与索引通信。

> 分片是集群中分发数据的关键,文档数据存储在分片中,分片分配到集群中的节点上,当你的集群扩容和缩容时,es会主动在你的节点迁移分片,以使集群保持平衡。

> 分片分为主分片和复制分片。你的索引的文档属于一个单独的主分片,主分片的数量决定你的索引最多存储多少数据。复制分片只是主分片的一个副本。做一个高可用,防止主分片出现故障,造成数据丢失,且对外提供读请求。索引建成后主分片数据就固定了,但是复制分片可以随时调整。

– 倒排索引

> Lucene的倒排索引实现比关系型数据更快的过滤。特别他对多条件的过滤支持非常好。一个字段由一个自己的倒排索引。18,20.这些叫做term,而[1,3]就是posting list.

> Posting list就是一个int的数组。存储所有符合某个term的文档id.
> term dictionary 和 term index
> 假如我们由很多term,就是由很多18.20….。如果我们查询某个term一定很慢。因为term没有排序。需要全部过滤一遍才能查到。这样我们可以使用二分查找方式。这个就是term dictionary.可以用logN次磁盘查到目标。但是磁盘读取仍然是非常昂贵。所以引进term index.他就像一本字典的大的章节表。比如:A开头的term ……… Xxx页;C开头的term ……… Xxx页;E开头的term ………Xxx页。实际上term index是一颗trie树.这个树不会包含所有的term,它包含是term的一些前缀,通过term index可以快速定位到term dictionary的offset,然后从这个位置往后顺序查找。再加上一些压缩技术(Lucene Finite State Transducers).term index 尺寸是term的尺寸的几十分之一,使得内存缓存整个term index变得可能。term index在内存中以FST的形式保存的,特点非常节省内存。term dictionary因为在磁盘上是以分block的方式保存的,一个block内部利用公共前缀压缩,比如都是Ab开头的单词就可以把Ab省去。这样term dictionary可以比b-tree更节约磁盘空间。
> 例子: 查询过滤条件 age=18 的过程就是先从term index找到18在term dictionary的大概位置,然后再从term dictionary里精确地找到18这个term,然后得到一个posting list或者一个指向posting list位置的指针。

> skip list数据结构。同时遍历各个属性的posting list.互相skip.使用bitset数据结构。



![Image text](//output-mingbo.oss-cn-beijing.aliyuncs.com/imgs/elas_0202.png)

– es

> es面向文档,意味着他可以存储整个对象和文档(document).并且索引每个文档内容使其可以被搜索到。你可以对文档进行索引,搜索,排序,过滤。这就是es可以执行复杂的全文搜索的原因。

## 参考文献
Elasticsearch权威指南(中文版).pdf [提取码:c4th](//pan.baidu.com/s/1bq5fAqju8ZElULt4jBHSKw)

阮一峰的网络日志 [全文搜索引擎 Elasticsearch 入门教程](//www.ruanyifeng.com/blog/2017/08/elasticsearch.html)

Elasticsearch linux安装包 elasticsearch-7.2.0-linux-x86_64.tar.gz [提取码:ya9s](//pan.baidu.com/s/1HPEMzaBcmdaiYxnYtY2FPw)

Elasticsearch 中文分词器 elasticsearch-analysis-ik-master [提取码:o2ih](//pan.baidu.com/s/1kx79pepNSvT9EP3AQZvMtw)

Elasticsearch 拼音分词器 elasticsearch-analysis-pinyin-master [提取码:qsd1](//pan.baidu.com/s/1_V7ZiaxQBwRlMUgtTotXUw)


Logstash linux安装包 logstash-7.2.0.tar.gz [提取码:se1s](//pan.baidu.com/s/1eblvZ9n_t4RnwGQ2AedziQ)

阿里云Elasticsearch产品文档介绍 [阿里云Elasticsearch](//help.aliyun.com/product/57736.html?spm=a2c4g.11186623.6.540.2ad8731ct6WbSY)

Es为什么比MYSQL快 [博客链接](//blog.csdn.net/u011635492/article/details/81023158)
<!– more –>

## 安装es


# ======================== Elasticsearch Configuration =========================

# ———————————- Cluster ———————————–
# Use a descriptive name for your cluster:
cluster.name: my-application
# ———————————— Node ————————————
# Use a descriptive name for the node:
node.name: node-1
# Add custom attributes to the node:
#node.attr.rack: r1
# ———————————– Paths ————————————
# Path to directory where to store the data (separate multiple locations by comma):
#path.data: /path/to/data
# Path to log files:
#path.logs: /path/to/logs
# ———————————– Memory ———————————–
# Lock the memory on startup:
#bootstrap.memory_lock: true
# Elasticsearch performs poorly when the system is swapping the memory.
# ———————————- Network ———————————–
# Set the bind address to a specific IP (IPv4 or IPv6):
# Set a custom port for HTTP:
http.port: 9200
# For more information, consult the network module documentation.
# ——————————— Discovery ———————————-
# Pass an initial list of hosts to perform discovery when this node is started:
# The default list of hosts is [“”, “[::1]”]
#discovery.seed_hosts: [“host1”, “host2”]
# Bootstrap the cluster using an initial set of master-eligible nodes:
#cluster.initial_master_nodes: [“node-1”, “node-2”]
cluster.initial_master_nodes: [“node-1”]

http.cors.enabled: true
http.cors.allow-origin: “*”
# For more information, consult the discovery and cluster formation module documentation.
# ———————————- Gateway ———————————–
# Block initial recovery after a full cluster restart until N nodes are started:
#gateway.recover_after_nodes: 3
# For more information, consult the gateway module documentation.
# ———————————- Various ———————————–
# Require explicit names when deleting indices:
#action.destructive_requires_name: true

切换到非root用户,使用vim console.out 建立文件。
使用nohup ./elasticsearch > console.out & 启动es,并将输出指向console.out文件。


![Image text](//output-mingbo.oss-cn-beijing.aliyuncs.com/imgs/15690363408763.png)

## es的可视化

git地址 //github.com/mobz/elasticsearch-head
在linux中 git clone //github.com/mobz/elasticsearch-head

cd elasticsearch-head
npm install
npm run start

open //ip:9100/

![Image text](//output-mingbo.oss-cn-beijing.aliyuncs.com/imgs/156904197022.png)

## es客户端操作

启动一个spring boot工程。
es.cluster-nodes.ip= xx.xx.xx.xx
es.cluster-nodes.port= 9200
es.cluster-nodes.index= uat-cloudcmmpost
es.cluster-node.type= CmmPost


import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.mapper.EntityWrapper;
import com.baomidou.mybatisplus.mapper.Wrapper;
import com.baomidou.mybatisplus.plugins.Page;
import com.fungo.community.config.NacosFungoCircleConfig;
import com.fungo.community.controller.PostController;
import com.fungo.community.dao.service.CmmPostDaoService;
import com.fungo.community.entity.CmmPost;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.util.EntityUtils;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.ScoreSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

* <p>ES搜索引擎</p>
* @Author: dl.zhang
* @Date: 2019/7/24
public class ESDAOServiceImpl {

private static final Logger LOGGER = LoggerFactory.getLogger(ESDAOServiceImpl.class);

private RestHighLevelClient client;

private NacosFungoCircleConfig nacosFungoCircleConfig;
private CmmPostDaoService postService;

public void init() {
client = new RestHighLevelClient(
new HttpHost(nacosFungoCircleConfig.getEsHttpIp(), nacosFungoCircleConfig.getEsHttpPort(), “http”)
// new HttpHost(“localhost”, 9201, “http”)

public void destroy(){
try {
} catch (IOException e) {


public List<CmmPost> addESPosts() {
// Wrapper<CmmPost> wrapperCmmPost = new EntityWrapper<>();
// List<CmmPost> posts = postService.selectList(wrapperCmmPost);
CmmPost param = new CmmPost();
CmmPost cmmPost = postService.selectById(param);
try {
// 创建索引
IndexRequest request = new IndexRequest(nacosFungoCircleConfig.getIndex());
// 准备文档数据
String jsonStr = JSON.toJSONString(cmmPost);
// 转成 MAP
Map<String, Object> jsonMap = JSON.parseObject(jsonStr, Map.class);
// jsonMap.put(“createdAt”, new Date());
//Document source provided as a Map which gets automatically converted to JSON format

client.indexAsync(request, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() {
public void onResponse(IndexResponse indexResponse) {
public void onFailure(Exception e) {

}catch (Exception e){
return null;

public Page<CmmPost> getAllPosts(String keyword, int page, int limit ) {
Page<CmmPost> postPage = new Page<>();
try {

// 1、创建search请求
SearchRequest searchRequest = new SearchRequest(nacosFungoCircleConfig.getIndex());
// 2、用SearchSourceBuilder来构造查询请求体 ,请仔细查看它的方法,构造各种查询的方法都在这。
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
if(keyword != null && !””.equals(keyword)){
// BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
// boolQueryBuilder.must(QueryBuilders.wildcardQuery(“title”,keyword));
// sourceBuilder.query(boolQueryBuilder);
MatchQueryBuilder matchQueryBuilder1 = QueryBuilders.matchQuery(“state”,1);
MatchQueryBuilder matchQueryBuilder2 = QueryBuilders.matchQuery(“title”,keyword);
MatchQueryBuilder matchQueryBuilder3 = QueryBuilders.matchQuery(“content”,keyword);
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
BoolQueryBuilder childBoolQueryBuilder = new BoolQueryBuilder()
// sourceBuilder.query( QueryBuilders.termQuery(“title”, keyword));
// 结果开始处
sourceBuilder.from((page-1)*limit);// sourceBuilder.from(0);
// 查询结果终止处
sourceBuilder.size(page*limit);// sourceBuilder.size(10);
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));

sourceBuilder.sort(new ScoreSortBuilder().order( SortOrder.DESC));
sourceBuilder.sort(new FieldSortBuilder(“watch_num”).order(SortOrder.DESC));


SearchResponse searchResponse = client.search(searchRequest,RequestOptions.DEFAULT);


RestStatus status = searchResponse.status();
TimeValue took = searchResponse.getTook();
Boolean terminatedEarly = searchResponse.isTerminatedEarly();
boolean timedOut = searchResponse.isTimedOut();

int totalShards = searchResponse.getTotalShards();
int successfulShards = searchResponse.getSuccessfulShards();
int failedShards = searchResponse.getFailedShards();
for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
// failures should be handled here

SearchHits hits = searchResponse.getHits();

TotalHits totalHits = hits.getTotalHits();
float maxScore = hits.getMaxScore();

SearchHit[] searchHits = hits.getHits();

List<CmmPost> list = new ArrayList<>();
for (SearchHit hit : searchHits) {
// do something with the SearchHit

String index = hit.getIndex();
String type = hit.getType();
String id = hit.getId();
float score = hit.getScore();

String sourceAsString = hit.getSourceAsString(); //取成json串
JSONObject jsonObj = (JSONObject) JSON.parse(sourceAsString);
CmmPost cmmPost= JSONObject.toJavaObject(jsonObj,CmmPost.class);
// CmmPost cmmPost = (CmmPost) JSON.parse( sourceAsString );
// Map<String, Object> sourceAsMap = hit.getSourceAsMap(); // 取成map对象
String documentTitle = (String) sourceAsMap.get(“title”);
List<Object> users = (List<Object>) sourceAsMap.get(“user”);
Map<String, Object> innerObject = (Map<String, Object>) sourceAsMap.get(“innerObject”);
// LOGGER.info(“index:” + index + ” type:” + type + ” id:” + id);
// LOGGER.info(sourceAsString);

/*Map<String, HighlightField> highlightFields = hit.getHighlightFields();
HighlightField highlight = highlightFields.get(“title”);
Text[] fragments = highlight.fragments();
String fragmentString = fragments[0].string();*/


// SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
// SearchRequest rq = new SearchRequest();
// //索引
// rq.indices(index);
// //各种组合条件
// rq.source(sourceBuilder);
// //请求
// System.out.println(rq.source().toString());
// SearchResponse rp = client.search(rq);

}catch (Exception e){
finally {
try {
} catch (IOException e) {
return postPage;


import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.*;
import org.springframework.beans.factory.annotation.Autowired;

* <p></p>
* @Author: dl.zhang
* @Date: 2019/10/17
public class AliESRestClient {

private static final RequestOptions COMMON_OPTIONS;

private static RestHighLevelClient highClient;

static {
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
// 默认缓冲限制为100MB,此处修改为30MB。
new HttpAsyncResponseConsumerFactory
.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024));
COMMON_OPTIONS = builder.build();

public static void initClinet(){
NacosFungoCircleConfig nacosFungoCircleConfig = new NacosFungoCircleConfig();
// 阿里云ES集群需要basic auth验证。
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(nacosFungoCircleConfig.getEsUser(), nacosFungoCircleConfig.getEsPassword()));

// 通过builder创建rest client,配置http client的HttpClientConfigCallback。
// 单击所创建的Elasticsearch实例ID,在基本信息页面获取公网地址,即为ES集群地址。
RestClientBuilder builder = RestClient.builder(new HttpHost(nacosFungoCircleConfig.getEsHttpIp(), 9200))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
// RestHighLevelClient实例通过REST low-level client builder进行构造。
highClient = new RestHighLevelClient(builder);
// return highClient;
public static RestHighLevelClient getAliEsHighClient(){
if(highClient != null){
return highClient;
}else {
return highClient;
public static RequestOptions getCommonOptions(){

public Page<Game> searchGame(){
try {
RestHighLevelClient highClient = AliESRestClient.getAliEsHighClient();
RequestOptions COMMON_OPTIONS = AliESRestClient.getCommonOptions();
// 创建request。
Map<String, Object> jsonMap = new HashMap<>();
// field_01、field_02为字段名,value_01、value_02为对应的值。
jsonMap.put(“{field_01}”, “{value_01}”);
jsonMap.put(“{field_02}”, “{value_02}”);
IndexRequest indexRequest = new IndexRequest(“{index_name}”, “{type_name}”, “{doc_id}”).source(jsonMap);

// 同步执行,并使用自定义RequestOptions(COMMON_OPTIONS)。
IndexResponse indexResponse = highClient.index(indexRequest, COMMON_OPTIONS);

long version = indexResponse.getVersion();

System.out.println(“Index document successfully! ” + version);
DeleteRequest request = new DeleteRequest(“{index_name}”, “{type_name}”, “{doc_id}”);
DeleteResponse deleteResponse = highClient.delete(request, COMMON_OPTIONS);

System.out.println(“Delete document successfully! \n” + deleteResponse.toString() + “\n” + deleteResponse.status());

}catch (IOException e){
LOGGER.error( “haode”,e );
return null;

## 分布式集群

> 当杀死主节点,必须再次选举出主节点,来让集群的功能可用。当我们删除主节点,主机点上的主分片也就丢失了,所在的索引也就不能的正常工作。这时候就需要将其他的节点的副本提升为主分片。这样就可以使用es集群了。


1. 客户端给 Node 1 发送get请求。

2. 节点使用文档的 _id 确定文档属于分片 0 。分片 0 对应的复制分片在三个节点上都有。此时,它转发请求到 Node 2 。

3. Node 2 返回endangered给 Node 1 然后返回给客户端


### 搭建

> 因为只有二台机器,所以只有在这二台机器上演示es三节点集群。
> 214 9200 9300
> 168 9200 9300
> 168 9201 9301


# ======================== Elasticsearch Configuration =========================
# NOTE: Elasticsearch comes with reasonable defaults for most settings.
# Before you set out to tweak and tune the configuration, make sure you
# understand what are you trying to accomplish and the consequences.
# The primary way of configuring a node is via this file. This template lists
# the most important settings you may want to configure for a production cluster.
# Please consult the documentation for further information on configuration options:
# //www.elastic.co/guide/en/elasticsearch/reference/index.html
# ———————————- Cluster ———————————–
# Use a descriptive name for your cluster:
cluster.name: my-application
# ———————————— Node ————————————
# Use a descriptive name for the node:
node.name: node-1
node.master: true
# Add custom attributes to the node:
#node.attr.rack: r1
# ———————————– Paths ————————————
# Path to directory where to store the data (separate multiple locations by comma):
#path.data: /path/to/data
# Path to log files:
#path.logs: /path/to/logs
# ———————————– Memory ———————————–
# Lock the memory on startup:
#bootstrap.memory_lock: true
# Make sure that the heap size is set to about half the memory available
# on the system and that the owner of the process is allowed to use this
# limit.
# Elasticsearch performs poorly when the system is swapping the memory.
# ———————————- Network ———————————–
# Set the bind address to a specific IP (IPv4 or IPv6):
# Set a custom port for HTTP:
http.port: 9200
http.cors.enabled: true
http.cors.allow-origin: “*”
#transport.tcp.port: 9300
# For more information, consult the network module documentation.
# ——————————— Discovery ———————————-
# Pass an initial list of hosts to perform discovery when this node is started:
# The default list of hosts is [“”, “[::1]”]
discovery.seed_hosts: [“”, “::1”]
# Bootstrap the cluster using an initial set of master-eligible nodes:
#cluster.initial_master_nodes: [“node-1”, “node-2”]
cluster.initial_master_nodes: [“node-1”]

# For more information, consult the discovery and cluster formation module documentation.
# ———————————- Gateway ———————————–
# Block initial recovery after a full cluster restart until N nodes are started:
#gateway.recover_after_nodes: 3
# For more information, consult the gateway module documentation.
# ———————————- Various ———————————–
# Require explicit names when deleting indices:
#action.destructive_requires_name: true

修改从节点 168机器9200 elasticsearch.yml
# ======================== Elasticsearch Configuration =========================
# NOTE: Elasticsearch comes with reasonable defaults for most settings.
# Before you set out to tweak and tune the configuration, make sure you
# understand what are you trying to accomplish and the consequences.
# The primary way of configuring a node is via this file. This template lists
# the most important settings you may want to configure for a production cluster.
# Please consult the documentation for further information on configuration options:
# //www.elastic.co/guide/en/elasticsearch/reference/index.html
# ———————————- Cluster ———————————–
# Use a descriptive name for your cluster:
cluster.name: my-application
# ———————————— Node ————————————
# Use a descriptive name for the node:
node.name: node-2
node.master: false
# Add custom attributes to the node:
#node.attr.rack: r1
# ———————————– Paths ————————————
# Path to directory where to store the data (separate multiple locations by comma):
#path.data: /path/to/data
# Path to log files:
#path.logs: /path/to/logs
# ———————————– Memory ———————————–
# Lock the memory on startup:
#bootstrap.memory_lock: true
# Make sure that the heap size is set to about half the memory available
# on the system and that the owner of the process is allowed to use this
# limit.
# Elasticsearch performs poorly when the system is swapping the memory.
# ———————————- Network ———————————–
# Set the bind address to a specific IP (IPv4 or IPv6):
# Set a custom port for HTTP:
http.port: 9200
http.cors.enabled: true
http.cors.allow-origin: “*”
# For more information, consult the network module documentation.
# ——————————— Discovery ———————————-
# Pass an initial list of hosts to perform discovery when this node is started:
# The default list of hosts is [“”, “[::1]”]
#discovery.seed_hosts: [“host1”, “host2”]
discovery.seed_hosts: [“”]
# Bootstrap the cluster using an initial set of master-eligible nodes:
#cluster.initial_master_nodes: [“node-1”, “node-2”]
# For more information, consult the discovery and cluster formation module documentation.
# ———————————- Gateway ———————————–
# Block initial recovery after a full cluster restart until N nodes are started:
#gateway.recover_after_nodes: 3
# For more information, consult the gateway module documentation.
# ———————————- Various ———————————–
# Require explicit names when deleting indices:
#action.destructive_requires_name: true

修改从节点 168机器9201 elasticsearch.yml
# ======================== Elasticsearch Configuration =========================
# NOTE: Elasticsearch comes with reasonable defaults for most settings.
# Before you set out to tweak and tune the configuration, make sure you
# understand what are you trying to accomplish and the consequences.
# The primary way of configuring a node is via this file. This template lists
# the most important settings you may want to configure for a production cluster.
# Please consult the documentation for further information on configuration options:
# //www.elastic.co/guide/en/elasticsearch/reference/index.html
# ———————————- Cluster ———————————–
# Use a descriptive name for your cluster:
cluster.name: my-application
# ———————————— Node ————————————
# Use a descriptive name for the node:
node.name: node-3
node.master: false
# Add custom attributes to the node:
#node.attr.rack: r1
# ———————————– Paths ————————————
# Path to directory where to store the data (separate multiple locations by comma):
#path.data: /path/to/data
# Path to log files:
#path.logs: /path/to/logs
# ———————————– Memory ———————————–
# Lock the memory on startup:
#bootstrap.memory_lock: true
# Make sure that the heap size is set to about half the memory available
# on the system and that the owner of the process is allowed to use this
# limit.
# Elasticsearch performs poorly when the system is swapping the memory.
# ———————————- Network ———————————–
# Set the bind address to a specific IP (IPv4 or IPv6):

# Set a custom port for HTTP:
http.port: 9201
transport.tcp.port: 9301
http.cors.enabled: true
http.cors.allow-origin: “*”

# For more information, consult the network module documentation.
# ——————————— Discovery ———————————-
# Pass an initial list of hosts to perform discovery when this node is started:
# The default list of hosts is [“”, “[::1]”]
#discovery.seed_hosts: [“host1”, “host2”]
discovery.seed_hosts: [“”]
# Bootstrap the cluster using an initial set of master-eligible nodes:
#cluster.initial_master_nodes: [“node-1”, “node-2”]
# For more information, consult the discovery and cluster formation module documentation.
# ———————————- Gateway ———————————–
# Block initial recovery after a full cluster restart until N nodes are started:
#gateway.recover_after_nodes: 3
# For more information, consult the gateway module documentation.
# ———————————- Various ———————————–
# Require explicit names when deleting indices:
#action.destructive_requires_name: true


cluster.name: ES-Cluster

node.name: ES-master-

node.master: true

node.data: false

path.data: /data/ES-Cluster/master/ES-master-,/data/ES-Cluster/master/ES-master-

path.logs: /data/ES-Cluster/master/ES-master-

bootstrap.memory_lock: true


network.tcp.no_delay: true

network.tcp.keep_alive: true

network.tcp.reuse_address: true

network.tcp.send_buffer_size: 128mb

network.tcp.receive_buffer_size: 128mb

transport.tcp.port: 9301

transport.tcp.compress: true

http.max_content_length: 200mb

http.cors.enabled: true

http.cors.allow-origin: “*”

http.port: 9201

discovery.zen.ping.unicast.hosts: [“”, “”,“”] #在Elasticsearch7.0版本已被移除,配置错误

discovery.zen.minimum_master_nodes: 2 #在Elasticsearch7.0版本已被移除,配置无效

discovery.zen.fd.ping_timeout: 120s #在Elasticsearch7.0版本已被移除,配置无效

discovery.zen.fd.ping_retries: 6 #在Elasticsearch7.0版本已被移除,配置无效

discovery.zen.fd.ping_interval: 15s #在Elasticsearch7.0版本已被移除,配置无效

discovery.seed_hosts: [“”, “”,“”]

cluster.initial_master_nodes: [“”, “”,“”]

cluster.fault_detection.leader_check.interval: 15s

discovery.cluster_formation_warning_timeout: 30s
#Elasticsearch7新增参数,启动后30秒内,如果集群未形成,那么将会记录一条警告信息,警告信息未master not fount开始,默认为10秒

cluster.join.timeout: 30s

cluster.publish.timeout: 90s

cluster.routing.allocation.cluster_concurrent_rebalance: 32

cluster.routing.allocation.node_concurrent_recoveries: 32

cluster.routing.allocation.node_initial_primaries_recoveries: 32
## JVM configuration

## IMPORTANT: JVM heap size
## You should always set the min and max JVM heap
## size to the same value. For example, to set
## the heap to 4 GB, set:
## -Xms4g
## -Xmx4g
## See //www.elastic.co/guide/en/elasticsearch/reference/current/heap-size.html
## for more information

# Xms represents the initial size of total heap space
# Xmx represents the maximum size of total heap space
# 最重要的修改

## Expert settings
## All settings below this section are considered
## expert settings. Don’t tamper with them unless
## you understand what you are doing

## GC configuration

## G1GC Configuration
# NOTE: G1GC is only supported on JDK version 10 or later.
# To use G1GC uncomment the lines below.
# 10-:-XX:-UseConcMarkSweepGC
# 10-:-XX:-UseCMSInitiatingOccupancyOnly
# 10-:-XX:+UseG1GC
# 10-:-XX:InitiatingHeapOccupancyPercent=75

## DNS cache policy
# cache ttl in seconds for positive DNS lookups noting that this overrides the
# JDK security property networkaddress.cache.ttl; set to -1 to cache forever
# cache ttl in seconds for negative DNS lookups noting that this overrides the
# JDK security property networkaddress.cache.negative ttl; set to -1 to cache
# forever

## optimizations

# pre-touch memory pages used by the JVM during initialization

## basic

# explicitly set the stack size

# set to headless, just in case

# ensure UTF-8 encoding by default (e.g. filenames)

# use our provided JNA always versus the system one

# turn off a JDK optimization that throws away stack traces for common
# exceptions because stack traces are important for debugging

# flags to configure Netty

# log4j 2


## heap dumps

# generate a heap dump when an allocation from the Java heap fails
# heap dumps are created in the working directory of the JVM

# specify an alternative path for heap dumps; ensure the directory exists and
# has sufficient space

# specify an alternative path for JVM fatal error logs

## JDK 8 GC logging


# JDK 9+ GC logging
# due to internationalization enhancements in JDK 9 Elasticsearch need to set the provider to COMPAT otherwise
# time/date parsing will break in an incompatible way for some date patterns and locals


> 切换到root用户
sysctl -w vm.max_map_count=262144
sysctl -a|grep vm.max_map_count
vm.max_map_count = 262144

建立用户 useradd es
建立用户密码 passwd es
chmod 777 config/*
chmod 777 logs/* 很多问题是因为es用户没有权限导致的。尽可能多的赋予es用户。

> nohup ./elasticsearch > console.out &

如图 集群节点建立完成

![Image text](//output-mingbo.oss-cn-beijing.aliyuncs.com/imgs/es-cluster15710408203718.png)


## 索引数据

> es是一个分布式的文档(document)存储引擎,可以实时存储并检索出数据结构–序列化的JSON文档,通常,我们认为对象(object)和文档(documnet)是等价相同的。

检索文档 GET /megacorp/employee/1
“_index” : “megacorp”,
“_type” : “employee”,
“_id” : “1”,
“_version” : 1,
“found” : true,
“_source” : {
“first_name” : “John”,
“last_name” : “Smith”,
“age” : 25,
“about” : “I love to go rock climbing”,
“interests”: [ “sports”, “music” ]

|名称 | 备注|
| —–: | :—-: |
|_index|索引名称 类似mysql数据库的表 索引这个名字必须是全部小写,不能以下划线开头,不能包含逗号。|
|_type|索引对象类型 类似每个对象都属于一个类class 可以是大写或小写,不能包含下划线或逗号|
|_id|索引主键 它与 _index 和 _type 组合时,就可以在ELasticsearch中唯一标识一个文档|

– 使用自己的ID
– ES可以自增ID



> shard = hash(routing) % number_of_primary_shards



1. 客户端给 Node 1 发送新建、索引或删除请求。

2. 节点使用文档的 _id 确定文档属于分片 0 。它转发请求到 Node 3 ,分片 0 位于这个节点上。

3. Node 3 在主分片上执行请求,如果成功,它转发请求到相应的位于 Node 1 和 Node 2的复制节点上。当所有的复制节点报告成功, Node 3 报告成功到请求的节点,请求的节点再报告给客户端。

你可以设置replication 为 async,这样es在主分片执行成功后就会相应客户端。但是es依旧会转发请求给复制节点。

> int( (primary + number_of_replicas) / 2 ) + 1

consistency允许值 one,all,默认的quorum或过半分片.

## 搜索


1. 在类似于 gender 或者 age 这样的字段上使用结构化查询, join_date 这样的字段上使用排序,就像SQL的结构化查询一
2. 全文检索,可以使用所有字段来匹配关键字,然后按照关联性(relevance)排序返回结果。
3. 或者结合以上两条。

| —–: | :—-:|
|领域特定语言查询(Query DSL)|Elasticsearch使用的灵活的、强大的查询语言|

“hits”: {
“total”: 14,
“hits”: [
“_index”: “us”,
“_type”: “tweet”,
“_id”: “7”,
“_score”: 1,
“_source”: {
“date”: “2014-09-17”,
“name”: “John Smith”,
“tweet”: “The Query DSL is really powerful and flexible”,
“user_id”: 2
“max_score”: 1
“took”: 4,
“_shards”: {
“failed”: 0,
“successful”: 10,
“total”: 10
“timed_out”: false

响应中最重要的部分是 hits ,它包含了 total 字段来表示匹配到的文档总数, hits 数组还包含了匹配到的前10条数据。
hits 数组中的每个结果都包含 _index 、 _type 和文档的 _id 字段,被加入到 _source 字段中这意味着在搜索结果中我们将
每个节点都有一个 _score 字段,这是相关性得分(relevance score),它衡量了文档与查询的匹配程度。默认的,返回的结
果中关联性最大的文档排在首位;这意味着,它是按照 _score 降序排列的。这种情况下,我们没有指定任何查询,所以所有
文档的相关性是一样的,因此所有结果的 _score 都是取得一个中间值 1
max_score 指的是所有文档匹配查询中 _score 的最大值。




_shards 节点告诉我们参与查询的分片数( total 字段),有多少是成功的( successful字段),有多少的是失败的( failed 字段)。



## 数据同步


# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
jdbc {
type => “uat”
jdbc_connection_string => “jdbc:mysql://xx.xx.xx.xx:3306/fungo_games_uat”
jdbc_user => “xxx”
jdbc_password => “xxxx”
jdbc_driver_library => “/opt/ELK/logstash-7.2.0/mysqltool/mysql-connector-java-8.0.15.jar”
jdbc_driver_class => “com.mysql.cj.jdbc.Driver”
jdbc_paging_enabled => “true”
jdbc_page_size => “5000”
use_column_value => true
tracking_column => “updated_at”
tracking_column_type => “timestamp”
lowercase_column_names => false
record_last_run => true
last_run_metadata_path => “/opt/ELK/logstash-7.2.0/config/station_parameter.txt”
clean_run => false
statement => “SELECT * FROM t_cmm_post where updated_at >= :sql_last_value order by updated_at desc “
#定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => “*/1 * * * *”

filter {

output {
if [type] == “uat” {
elasticsearch {
hosts => “xx.xx.xx.xx:9200”
index => “uat-cloudcmmpost”
document_type => “CmmPost”
document_id => “%{post_id}”

需要将mysql的连接包下载指定位置,jdbc_driver_library 来指定位置,
./logstash-plugin install logstash-input-jdbc
./logstash-plugin install logstash-output-elasticsearch
直接 在bin文件内 ./logstash 启动 如果报错 可能需要删除data里面的 .lock 文件

## 备注

### 1 阿里云的解决思路

阿里云上面也是采用单个主机上搭建3个es节点的集群。性能分别是1核2G的配置.因为无法登录到logstash服务的机器上,所有 我们无法直接通过链接MYSQL服务的形式,就不能像我们上面展示那样通过sql语句去同步数据,这时候阿里云上有一个DTS服务,可以解决数据传输和同步。

### 2 附属内容
elasticsearch-head 无法连接elasticsearch的原因和解决

### 3 ES比mysql快

> 为什么ES比mysql快
> mysql只有term dictionary这一层,是以b-tree排序方式存储在磁盘上。检索一个term需要 若干次random access磁盘操作。但是es在此基础上添加term index来加速检索。term index以树的形式缓存内存中,减少磁盘读取次数。