ES搜索引擎-一篇文章就夠了

前言

在眾多搜索引擎中,solr,es是我所知道其他公司最為廣泛使用的中間件。他可以解決你複雜的搜索需求。當你需要在大量數據的情況下搜索一下關鍵字,使用mysql的like查詢是非常緩慢的,而es可以做到近實時的搜索。

背景

我們公司最近對我們的fungo的遊戲,用戶,文章提出了更加複雜的搜索要求,要求對指定的關鍵字進行相似度匹配。
例如 搜索 『fungo小助手』 搜索的結果應該含有fungo小助手,還應有隻含有fungo關鍵字的東西,還應有隻含有小助手關鍵字的東西。並且根據所含元素的多少,進行優先排名。
面對這些需求,以我對mysql和現在java框架的了解,無法解決這個問題。使用mysql的like功能只能檢索到『fungo小助手』的關鍵字。所以百度一下主流的解決方案。發現大部分公司都會使用solr和es這些搜索框架,來作為中間件解決複雜搜索功能。

知識準備

  • node 節點

就是一個es實例。

  • cluster 集群
    • 集群健康
      green yellow red

    • 主節點
      主資格節點的主要職責是和集群操作相關的內容,如創建或刪除索引,跟蹤哪些節點是群集的一部分,並決定哪些分片分配給相關的節點。穩定的主節點對集群的健康是非常重要的,默認情況下任何一個集群中的節點都有可能被選為主節點,索引數據和搜索查詢等操作會佔用大量的cpu,內存,io資源,為了確保一個集群的穩定,分離主節點和數據節點是一個比較好的選擇。

    • 數據節點
      數據節點主要是存儲索引數據的節點,主要對文檔進行增刪改查操作,聚合操作等。數據節點對cpu,內存,io要求較高, 在優化的時候需要監控數據節點的狀態,當資源不夠的時候,需要在集群中添加新的節點。

具有相同的cluster.name的node節點集合。

  • index 索引

一個用來指向一個或者多個分片的邏輯命名空間。

  • shard 分片

最小級別的工作單元,他只是保存了索引中的所有數據的一部分,最重要的分片就是一個Lucene實例。他本身就是一個完整的搜索引擎。我們的文檔存儲在分片中,並且在分片中被索引,我們的應用程序不會直接與他通信,而是直接與索引通信。

分片是集群中分發數據的關鍵,文檔數據存儲在分片中,分片分配到集群中的節點上,當你的集群擴容和縮容時,es會主動在你的節點遷移分片,以使集群保持平衡。

分片分為主分片和複製分片。你的索引的文檔屬於一個單獨的主分片,主分片的數量決定你的索引最多存儲多少數據。複製分片只是主分片的一個副本。做一個高可用,防止主分片出現故障,造成數據丟失,且對外提供讀請求。索引建成後主分片數據就固定了,但是複製分片可以隨時調整。

  • 倒排索引

Lucene的倒排索引實現比關係型數據更快的過濾。特別他對多條件的過濾支持非常好。一個字段由一個自己的倒排索引。18,20.這些叫做term,而[1,3]就是posting list.

Posting list就是一個int的數組。存儲所有符合某個term的文檔id.
term dictionary 和 term index
假如我們由很多term,就是由很多18.20….。如果我們查詢某個term一定很慢。因為term沒有排序。需要全部過濾一遍才能查到。這樣我們可以使用二分查找方式。這個就是term dictionary.可以用logN次磁盤查到目標。但是磁盤讀取仍然是非常昂貴。所以引進term index.他就像一本字典的大的章節表。比如:A開頭的term ……… Xxx頁;C開頭的term ……… Xxx頁;E開頭的term ………Xxx頁。實際上term index是一顆trie樹.這個樹不會包含所有的term,它包含是term的一些前綴,通過term index可以快速定位到term dictionary的offset,然後從這個位置往後順序查找。再加上一些壓縮技術(Lucene Finite State Transducers).term index 尺寸是term的尺寸的幾十分之一,使得內存緩存整個term index變得可能。term index在內存中以FST的形式保存的,特點非常節省內存。term dictionary因為在磁盤上是以分block的方式保存的,一個block內部利用公共前綴壓縮,比如都是Ab開頭的單詞就可以把Ab省去。這樣term dictionary可以比b-tree更節約磁盤空間。
例子: 查詢過濾條件 age=18 的過程就是先從term index找到18在term dictionary的大概位置,然後再從term dictionary里精確地找到18這個term,然後得到一個posting list或者一個指向posting list位置的指針。

聯合索引查詢
skip list數據結構。同時遍歷各個屬性的posting list.互相skip.使用bitset數據結構。

如圖所示

Image text

  • es

es面向文檔,意味着他可以存儲整個對象和文檔(document).並且索引每個文檔內容使其可以被搜索到。你可以對文檔進行索引,搜索,排序,過濾。這就是es可以執行複雜的全文搜索的原因。

參考文獻

Elasticsearch權威指南(中文版).pdf 提取碼:c4th

阮一峰的網絡日誌 全文搜索引擎 Elasticsearch 入門教程

Elasticsearch linux安裝包 elasticsearch-7.2.0-linux-x86_64.tar.gz 提取碼:ya9s

Elasticsearch 中文分詞器 elasticsearch-analysis-ik-master 提取碼:o2ih

Elasticsearch 拼音分詞器 elasticsearch-analysis-pinyin-master 提取碼:qsd1

Logstash中文文檔Logstash簡介

Logstash linux安裝包 logstash-7.2.0.tar.gz 提取碼:se1s

阿里雲Elasticsearch產品文檔介紹 阿里雲Elasticsearch

Es為什麼比MYSQL快 博客鏈接

如果無法下載請聯繫我 [email protected] ,或者評論下留下郵箱百度雲賬號,我會私發給您。

安裝es

通過參考文獻下載對應的es插件.通過xshell軟件和Xftp插件傳遞到指定得linux服務器。
tar解壓文件.進入bin文件內,因為es無法使用root權限去啟動,所以創建一個用戶和用戶組。去啟動es。
如要修改配置需要進去config文件內,修改主配置elasticsearch.yml

# ======================== Elasticsearch Configuration =========================

# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
cluster.name: my-application
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
node.name: node-1
#
# Add custom attributes to the node:
#
#node.attr.rack: r1
#
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
#path.data: /path/to/data
#
# Path to log files:
#
#path.logs: /path/to/logs
#
# ----------------------------------- Memory -----------------------------------
#
# Lock the memory on startup:
#
#bootstrap.memory_lock: true
#
# Elasticsearch performs poorly when the system is swapping the memory.
#
# ---------------------------------- Network -----------------------------------
#
# Set the bind address to a specific IP (IPv4 or IPv6):
#
#network.host: 192.168.0.1
network.host: 0.0.0.0
# Set a custom port for HTTP:
#
http.port: 9200
#
# For more information, consult the network module documentation.
#
# --------------------------------- Discovery ----------------------------------
#
# Pass an initial list of hosts to perform discovery when this node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
#discovery.seed_hosts: ["host1", "host2"]
#
# Bootstrap the cluster using an initial set of master-eligible nodes:
#
#cluster.initial_master_nodes: ["node-1", "node-2"]
cluster.initial_master_nodes: ["node-1"]

http.cors.enabled: true
http.cors.allow-origin: "*"
# For more information, consult the discovery and cluster formation module documentation.
#
# ---------------------------------- Gateway -----------------------------------
#
# Block initial recovery after a full cluster restart until N nodes are started:
#
#gateway.recover_after_nodes: 3
#
# For more information, consult the gateway module documentation.
#
# ---------------------------------- Various -----------------------------------
#
# Require explicit names when deleting indices:
#
#action.destructive_requires_name: true

上面是默認的配置,不修改配置的默認使用9200端口。
切換到非root用戶,使用vim console.out 建立文件。
使用nohup ./elasticsearch > console.out & 啟動es,並將輸出指向console.out文件。

如圖

Image text

es的可視化

我們一般使用elasticsearch-head這個插件來連接es服務器來可視化es的索引和數據。
git地址 //github.com/mobz/elasticsearch-head
在linux中 git clone //github.com/mobz/elasticsearch-head

cd elasticsearch-head
npm install
npm run start

open //ip:9100/
如圖

Image text

es客戶端操作

啟動一個spring boot工程。
pom.xml添加一下es依賴

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>7.2.0</version>
        </dependency>

在配置文件中添加es信息

##es節點信息
es.cluster-nodes.ip= xx.xx.xx.xx
##es節點端口
es.cluster-nodes.port= 9200
##es索引名稱
es.cluster-nodes.index= uat-cloudcmmpost
##es索引類型
es.cluster-node.type= CmmPost

這裡省略了配置文件映射到java文件的過程,大家自行處理。

具體es使用方法

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.mapper.EntityWrapper;
import com.baomidou.mybatisplus.mapper.Wrapper;
import com.baomidou.mybatisplus.plugins.Page;
import com.fungo.community.config.NacosFungoCircleConfig;
import com.fungo.community.controller.PostController;
import com.fungo.community.dao.service.CmmPostDaoService;
import com.fungo.community.entity.CmmPost;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.util.EntityUtils;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.ScoreSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;


/**
 * <p>ES搜索引擎</p>
 * @Author: dl.zhang
 * @Date: 2019/7/24
 */
@Repository
public class ESDAOServiceImpl {

    private static final Logger LOGGER = LoggerFactory.getLogger(ESDAOServiceImpl.class);

    private RestHighLevelClient client;

    @Autowired
    private NacosFungoCircleConfig nacosFungoCircleConfig;
    @Autowired
    private CmmPostDaoService postService;

    @PostConstruct
    public void init() {
        client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost(nacosFungoCircleConfig.getEsHttpIp(),   nacosFungoCircleConfig.getEsHttpPort(), "http")
                        //        new HttpHost("localhost", 9201, "http")
                ));
    }

    @PreDestroy
    public void destroy(){
        try {
            client.close();
        } catch (IOException e) {

        }
    }

    public List<CmmPost> addESPosts() {
//        Wrapper<CmmPost> wrapperCmmPost = new EntityWrapper<>();
//        List<CmmPost> posts =  postService.selectList(wrapperCmmPost);
        CmmPost param = new CmmPost();
        param.setId("b1f1f35d4b4242a0b794e17ed0d1d64a");
        CmmPost cmmPost = postService.selectById(param);
        try {
            // 創建索引
            IndexRequest request = new IndexRequest(nacosFungoCircleConfig.getIndex());
            // 準備文檔數據
            String jsonStr = JSON.toJSONString(cmmPost);
            // 轉成 MAP
            Map<String, Object> jsonMap = JSON.parseObject(jsonStr, Map.class);
//            jsonMap.put("createdAt", new Date());
            //Document source provided as a Map which gets automatically converted to JSON format
            request.source(jsonMap);

            client.indexAsync(request, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() {
                @Override
                public void onResponse(IndexResponse indexResponse) {
                }
                @Override
                public void onFailure(Exception e) {

                }
            });
        }catch (Exception e){
            LOGGER.error("獲取es數據異常,索引id="+nacosFungoCircleConfig.getIndex(),e);
        }
        return null;
    }

    public Page<CmmPost> getAllPosts(String keyword, int page, int limit ) {
        Page<CmmPost> postPage = new Page<>();
        try {

            // 1、創建search請求
            SearchRequest searchRequest = new SearchRequest(nacosFungoCircleConfig.getIndex());
            searchRequest.types(nacosFungoCircleConfig.getSearchIndexType());
            // 2、用SearchSourceBuilder來構造查詢請求體 ,請仔細查看它的方法,構造各種查詢的方法都在這。
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            if(keyword != null && !"".equals(keyword)){
//                BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
                //普通模糊匹配
//                boolQueryBuilder.must(QueryBuilders.wildcardQuery("title",keyword));
//                sourceBuilder.query(boolQueryBuilder);
                MatchQueryBuilder matchQueryBuilder1 = QueryBuilders.matchQuery("state",1);
                MatchQueryBuilder matchQueryBuilder2 = QueryBuilders.matchQuery("title",keyword);
                MatchQueryBuilder matchQueryBuilder3 = QueryBuilders.matchQuery("content",keyword);
                BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
                BoolQueryBuilder childBoolQueryBuilder = new BoolQueryBuilder()
                        .should(matchQueryBuilder2)
                        .should(matchQueryBuilder3);
                boolQueryBuilder.must(childBoolQueryBuilder);
                boolQueryBuilder.must(matchQueryBuilder1);
                sourceBuilder.query(boolQueryBuilder);
            }
//            sourceBuilder.query( QueryBuilders.termQuery("title", keyword));
            // 結果開始處
            sourceBuilder.from((page-1)*limit);//            sourceBuilder.from(0);
            // 查詢結果終止處
            sourceBuilder.size(page*limit);// sourceBuilder.size(10);
            sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));

            //指定排序
            sourceBuilder.sort(new ScoreSortBuilder().order( SortOrder.DESC));
            sourceBuilder.sort(new FieldSortBuilder("watch_num").order(SortOrder.DESC));

            //將請求體加入到請求中
            searchRequest.source(sourceBuilder);


            //3、發送請求
            SearchResponse searchResponse = client.search(searchRequest,RequestOptions.DEFAULT);



            //4、處理響應
            //搜索結果狀態信息
            RestStatus status = searchResponse.status();
            TimeValue took = searchResponse.getTook();
            Boolean terminatedEarly = searchResponse.isTerminatedEarly();
            boolean timedOut = searchResponse.isTimedOut();

            //分片搜索情況
            int totalShards = searchResponse.getTotalShards();
            int successfulShards = searchResponse.getSuccessfulShards();
            int failedShards = searchResponse.getFailedShards();
            for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
                // failures should be handled here
            }

            //處理搜索命中文檔結果
            SearchHits hits = searchResponse.getHits();

            TotalHits totalHits = hits.getTotalHits();
            float maxScore = hits.getMaxScore();

            SearchHit[] searchHits = hits.getHits();

            List<CmmPost> list = new ArrayList<>();
            for (SearchHit hit : searchHits) {
                // do something with the SearchHit

                String index = hit.getIndex();
                String type = hit.getType();
                String id = hit.getId();
                float score = hit.getScore();

                //取_source字段值
                String sourceAsString = hit.getSourceAsString(); //取成json串
                JSONObject jsonObj = (JSONObject) JSON.parse(sourceAsString);
                CmmPost cmmPost= JSONObject.toJavaObject(jsonObj,CmmPost.class);
//                CmmPost cmmPost = (CmmPost) JSON.parse( sourceAsString );
                list.add(cmmPost);
//                Map<String, Object> sourceAsMap = hit.getSourceAsMap(); // 取成map對象
                //從map中取字段值
                /*
                String documentTitle = (String) sourceAsMap.get("title");
                List<Object> users = (List<Object>) sourceAsMap.get("user");
                Map<String, Object> innerObject = (Map<String, Object>) sourceAsMap.get("innerObject");
                */
//                LOGGER.info("index:" + index + "  type:" + type + "  id:" + id);
//                LOGGER.info(sourceAsString);

                //取高亮結果
                /*Map<String, HighlightField> highlightFields = hit.getHighlightFields();
                HighlightField highlight = highlightFields.get("title");
                Text[] fragments = highlight.fragments();
                String fragmentString = fragments[0].string();*/
            }
            postPage.setRecords(list);
            postPage.setTotal(Long.valueOf(totalHits.value).intValue());



//            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
//            SearchRequest rq = new SearchRequest();
//            //索引
//            rq.indices(index);
//            //各種組合條件
//            rq.source(sourceBuilder);
//
//            //請求
//            System.out.println(rq.source().toString());
//            SearchResponse rp = client.search(rq);

        }catch (Exception e){
            LOGGER.error("獲取es數據異常,索引id="+nacosFungoCircleConfig.getIndex(),e);
        }
       finally {
           try {
               client.close();
           } catch (IOException e) {
               e.printStackTrace();
           }
       }
        return postPage;
    }
}

這就可以使用es搜索引擎來解決複雜的搜索需求。

因為阿里雲上的es服務只有5.5,和6.3和6.7的版本,所有這裡又給你整理出一個阿里雲版本的RestHighLevelClient代碼。
es框架使用


import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.*;
import org.springframework.beans.factory.annotation.Autowired;


/**
 * <p></p>
 *
 * @Author: dl.zhang
 * @Date: 2019/10/17
 */
public class AliESRestClient {

    private static final RequestOptions COMMON_OPTIONS;

    private static RestHighLevelClient highClient;

    static {
        RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
        // 默認緩衝限制為100MB,此處修改為30MB。
        builder.setHttpAsyncResponseConsumerFactory(
                new HttpAsyncResponseConsumerFactory
                        .HeapBufferedResponseConsumerFactory(30 * 1024 * 1024));
        COMMON_OPTIONS = builder.build();
    }

    public static void initClinet(){
        NacosFungoCircleConfig nacosFungoCircleConfig = new NacosFungoCircleConfig();
        // 阿里雲ES集群需要basic auth驗證。
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        //訪問用戶名和密碼為您創建阿里雲Elasticsearch實例時設置的用戶名和密碼,也是Kibana控制台的登錄用戶名和密碼。
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(nacosFungoCircleConfig.getEsUser(), nacosFungoCircleConfig.getEsPassword()));

        // 通過builder創建rest client,配置http client的HttpClientConfigCallback。
        // 單擊所創建的Elasticsearch實例ID,在基本信息頁面獲取公網地址,即為ES集群地址。
        RestClientBuilder builder = RestClient.builder(new HttpHost(nacosFungoCircleConfig.getEsHttpIp(), 9200))
                .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }
                });
        // RestHighLevelClient實例通過REST low-level client builder進行構造。
         highClient = new RestHighLevelClient(builder);
//        return highClient;
    }
    public static RestHighLevelClient getAliEsHighClient(){
        if(highClient != null){
            return highClient;
        }else {
            initClinet();
            return highClient;
        }
    }
    public static RequestOptions getCommonOptions(){
        return COMMON_OPTIONS;
    }
}

使用範例

public Page<Game> searchGame(){
        try {
            RestHighLevelClient highClient = AliESRestClient.getAliEsHighClient();
            RequestOptions COMMON_OPTIONS = AliESRestClient.getCommonOptions();
            // 創建request。
            Map<String, Object> jsonMap = new HashMap<>();
            // field_01、field_02為字段名,value_01、value_02為對應的值。
            jsonMap.put("{field_01}", "{value_01}");
            jsonMap.put("{field_02}", "{value_02}");
            //index_name為索引名稱;type_name為類型名稱;doc_id為文檔的id。
            IndexRequest indexRequest = new IndexRequest("{index_name}", "{type_name}", "{doc_id}").source(jsonMap);

            // 同步執行,並使用自定義RequestOptions(COMMON_OPTIONS)。
            IndexResponse indexResponse = highClient.index(indexRequest, COMMON_OPTIONS);

            long version = indexResponse.getVersion();

            System.out.println("Index document successfully! " + version);
            //index_name為索引名稱;type_name為類型名稱;doc_id為文檔的id。與以上創建索引的名稱和id相同。
            DeleteRequest request = new DeleteRequest("{index_name}", "{type_name}", "{doc_id}");
            DeleteResponse deleteResponse = highClient.delete(request, COMMON_OPTIONS);

            System.out.println("Delete document successfully! \n" + deleteResponse.toString() + "\n" + deleteResponse.status());

            highClient.close();
        }catch (IOException e){
            LOGGER.error( "haode",e );
        }
        return null;
    }

分佈式集群

當殺死主節點,必須再次選舉出主節點,來讓集群的功能可用。當我們刪除主節點,主機點上的主分片也就丟失了,所在的索引也就不能的正常工作。這時候就需要將其他的節點的副本提升為主分片。這樣就可以使用es集群了。

所有的請求都會到主節點。這個節點我們稱為請求節點。然後分發到各個子節點,返回到請求節點,再返回給客戶端。
es集群查詢請求

  1. 客戶端給 Node 1 發送get請求。

  2. 節點使用文檔的 _id 確定文檔屬於分片 0 。分片 0 對應的複製分片在三個節點上都有。此時,它轉發請求到 Node 2 。

  3. Node 2 返回endangered給 Node 1 然後返回給客戶端

對於讀請求,為了平衡負載,請求節點會為每個請求選擇不同的分片——它會循環所有分片副本。

搭建

因為只有二台機器,所以只有在這二台機器上演示es三節點集群。
214 9200 9300
168 9200 9300
168 9201 9301

第一修改主節點配置elasticsearch.yml

# ======================== Elasticsearch Configuration =========================
#
# NOTE: Elasticsearch comes with reasonable defaults for most settings.
#       Before you set out to tweak and tune the configuration, make sure you
#       understand what are you trying to accomplish and the consequences.
#
# The primary way of configuring a node is via this file. This template lists
# the most important settings you may want to configure for a production cluster.
#
# Please consult the documentation for further information on configuration options:
# //www.elastic.co/guide/en/elasticsearch/reference/index.html
#
# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
cluster.name: my-application
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
node.name: node-1
node.master: true
# Add custom attributes to the node:
#
#node.attr.rack: r1
#
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
#path.data: /path/to/data
#
# Path to log files:
#
#path.logs: /path/to/logs
#
# ----------------------------------- Memory -----------------------------------
#
# Lock the memory on startup:
#
#bootstrap.memory_lock: true
#
# Make sure that the heap size is set to about half the memory available
# on the system and that the owner of the process is allowed to use this
# limit.
#
# Elasticsearch performs poorly when the system is swapping the memory.
#
# ---------------------------------- Network -----------------------------------
#
# Set the bind address to a specific IP (IPv4 or IPv6):
#
#network.host: 192.168.0.1
network.host: 0.0.0.0
# Set a custom port for HTTP:
#
http.port: 9200
http.cors.enabled: true
http.cors.allow-origin: "*"
#transport.tcp.port: 9300
# For more information, consult the network module documentation.
#
# --------------------------------- Discovery ----------------------------------
#
# Pass an initial list of hosts to perform discovery when this node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
discovery.seed_hosts: ["127.0.0.1", "::1"]
#
# Bootstrap the cluster using an initial set of master-eligible nodes:
#
#cluster.initial_master_nodes: ["node-1", "node-2"]
cluster.initial_master_nodes: ["node-1"]

# For more information, consult the discovery and cluster formation module documentation.
#
# ---------------------------------- Gateway -----------------------------------
#
# Block initial recovery after a full cluster restart until N nodes are started:
#
#gateway.recover_after_nodes: 3
#
# For more information, consult the gateway module documentation.
#
# ---------------------------------- Various -----------------------------------
#
# Require explicit names when deleting indices:
#
#action.destructive_requires_name: true

修改從節點 168機器9200 elasticsearch.yml

# ======================== Elasticsearch Configuration =========================
#
# NOTE: Elasticsearch comes with reasonable defaults for most settings.
#       Before you set out to tweak and tune the configuration, make sure you
#       understand what are you trying to accomplish and the consequences.
#
# The primary way of configuring a node is via this file. This template lists
# the most important settings you may want to configure for a production cluster.
#
# Please consult the documentation for further information on configuration options:
# //www.elastic.co/guide/en/elasticsearch/reference/index.html
#
# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
cluster.name: my-application
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
node.name: node-2
node.master: false
# Add custom attributes to the node:
#
#node.attr.rack: r1
#
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
#path.data: /path/to/data
#
# Path to log files:
#
#path.logs: /path/to/logs
#
# ----------------------------------- Memory -----------------------------------
#
# Lock the memory on startup:
#
#bootstrap.memory_lock: true
#
# Make sure that the heap size is set to about half the memory available
# on the system and that the owner of the process is allowed to use this
# limit.
#
# Elasticsearch performs poorly when the system is swapping the memory.
#
# ---------------------------------- Network -----------------------------------
#
# Set the bind address to a specific IP (IPv4 or IPv6):
#
#network.host: 192.168.0.1
network.host: 0.0.0.0
# Set a custom port for HTTP:
#
http.port: 9200
http.cors.enabled: true
http.cors.allow-origin: "*"
# For more information, consult the network module documentation.
#
# --------------------------------- Discovery ----------------------------------
#
# Pass an initial list of hosts to perform discovery when this node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
#discovery.seed_hosts: ["host1", "host2"]
discovery.seed_hosts: ["39.105.18.214:9300"]
# Bootstrap the cluster using an initial set of master-eligible nodes:
#
#cluster.initial_master_nodes: ["node-1", "node-2"]
#
# For more information, consult the discovery and cluster formation module documentation.
#
# ---------------------------------- Gateway -----------------------------------
#
# Block initial recovery after a full cluster restart until N nodes are started:
#
#gateway.recover_after_nodes: 3
#
# For more information, consult the gateway module documentation.
#
# ---------------------------------- Various -----------------------------------
#
# Require explicit names when deleting indices:
#
#action.destructive_requires_name: true

修改從節點 168機器9201 elasticsearch.yml

# ======================== Elasticsearch Configuration =========================
#
# NOTE: Elasticsearch comes with reasonable defaults for most settings.
#       Before you set out to tweak and tune the configuration, make sure you
#       understand what are you trying to accomplish and the consequences.
#
# The primary way of configuring a node is via this file. This template lists
# the most important settings you may want to configure for a production cluster.
#
# Please consult the documentation for further information on configuration options:
# //www.elastic.co/guide/en/elasticsearch/reference/index.html
#
# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
cluster.name: my-application
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
node.name: node-3
node.master: false
# Add custom attributes to the node:
#
#node.attr.rack: r1
#
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
#path.data: /path/to/data
#
# Path to log files:
#
#path.logs: /path/to/logs
#
# ----------------------------------- Memory -----------------------------------
#
# Lock the memory on startup:
#
#bootstrap.memory_lock: true
#
# Make sure that the heap size is set to about half the memory available
# on the system and that the owner of the process is allowed to use this
# limit.
#
# Elasticsearch performs poorly when the system is swapping the memory.
#
# ---------------------------------- Network -----------------------------------
#
# Set the bind address to a specific IP (IPv4 or IPv6):
#
#network.host: 192.168.0.1
network.host: 0.0.0.0

# Set a custom port for HTTP:
#
http.port: 9201
transport.tcp.port: 9301
http.cors.enabled: true
http.cors.allow-origin: "*"

# For more information, consult the network module documentation.
#
# --------------------------------- Discovery ----------------------------------
#
# Pass an initial list of hosts to perform discovery when this node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
#discovery.seed_hosts: ["host1", "host2"]
discovery.seed_hosts: ["39.105.18.214:9300"]
# Bootstrap the cluster using an initial set of master-eligible nodes:
#
#cluster.initial_master_nodes: ["node-1", "node-2"]
#
# For more information, consult the discovery and cluster formation module documentation.
#
# ---------------------------------- Gateway -----------------------------------
#
# Block initial recovery after a full cluster restart until N nodes are started:
#
#gateway.recover_after_nodes: 3
#
# For more information, consult the gateway module documentation.
#
# ---------------------------------- Various -----------------------------------
#
# Require explicit names when deleting indices:
#
#action.destructive_requires_name: true

配置內容解析

cluster.name: ES-Cluster
#ES集群名稱,同一個集群內的所有節點集群名稱必須保持一致

node.name: ES-master-10.150.55.94
#ES集群內的節點名稱,同一個集群內的節點名稱要具備唯一性

node.master: true
#允許節點是否可以成為一個master節點,ES是默認集群中的第一台機器成為master,如果這台機器停止就會重新選舉

node.data: false
#允許該節點存儲索引數據(默認開啟)
#關於Elasticsearch節點的角色功能詳解,請看://www.dockerc.com/elasticsearch-master-or-data/

path.data: /data/ES-Cluster/master/ES-master-10.150.55.94/data1,/data/ES-Cluster/master/ES-master-10.150.55.94/data2
#ES是搜索引擎,會創建文檔,建立索引,此路徑是索引的存放目錄,如果我們的日誌數據較為龐大,那麼索引所佔用的磁盤空間也是不可小覷的
#這個路徑建議是專門的存儲系統,如果不是存儲系統,最好也要有冗餘能力的磁盤,此目錄還要對elasticsearch的運行用戶有寫入權限
#path可以指定多個存儲位置,分散存儲,有助於性能提升,以至於怎麼分散存儲請看詳解//www.dockerc.com/elk-theory-elasticsearch/

path.logs: /data/ES-Cluster/master/ES-master-10.150.55.94/logs
#elasticsearch專門的日誌存儲位置,生產環境中建議elasticsearch配置文件與elasticsearch日誌分開存儲

bootstrap.memory_lock: true
#在ES運行起來後鎖定ES所能使用的堆內存大小,鎖定內存大小一般為可用內存的一半左右;鎖定內存後就不會使用交換分區
#如果不打開此項,當系統物理內存空間不足,ES將使用交換分區,ES如果使用交換分區,那麼ES的性能將會變得很差

network.host: 10.150.55.94
#es綁定地址,支持IPv4及IPv6,默認綁定127.0.0.1;es的HTTP端口和集群通信端口就會監聽在此地址上

network.tcp.no_delay: true
#是否啟用tcp無延遲,true為啟用tcp不延遲,默認為false啟用tcp延遲

network.tcp.keep_alive: true
#是否啟用TCP保持活動狀態,默認為true

network.tcp.reuse_address: true
#是否應該重複使用地址。默認true,在Windows機器上默認為false

network.tcp.send_buffer_size: 128mb
#tcp發送緩衝區大小,默認不設置

network.tcp.receive_buffer_size: 128mb
#tcp接收緩衝區大小,默認不設置

transport.tcp.port: 9301
#設置集群節點通信的TCP端口,默認就是9300

transport.tcp.compress: true
#設置是否壓縮TCP傳輸時的數據,默認為false

http.max_content_length: 200mb
#設置http請求內容的最大容量,默認是100mb

http.cors.enabled: true
#是否開啟跨域訪問

http.cors.allow-origin: 「*」
#開啟跨域訪問後的地址限制,*表示無限制

http.port: 9201
#定義ES對外調用的http端口,默認是9200

discovery.zen.ping.unicast.hosts: [「10.150.55.94:9301」, 「10.150.55.95:9301」,「10.150.30.246:9301」] #在Elasticsearch7.0版本已被移除,配置錯誤
#寫入候選主節點的設備地址,來開啟服務時就可以被選為主節點
#默認主機列表只有127.0.0.1和IPV6的本機迴環地址
#上面是書寫格式,discover意思為發現,zen是判定集群成員的協議,unicast是單播的意思,ES5.0版本之後只支持單播的方式來進行集群間的通信,hosts為主機
#總結下來就是:使用zen協議通過單播方式去發現集群成員主機,在此建議將所有成員的節點名稱都寫進來,這樣就不用僅靠集群名稱cluster.name來判別集群關係了

discovery.zen.minimum_master_nodes: 2 #在Elasticsearch7.0版本已被移除,配置無效
#為了避免腦裂,集群的最少節點數量為,集群的總節點數量除以2加一

discovery.zen.fd.ping_timeout: 120s #在Elasticsearch7.0版本已被移除,配置無效
#探測超時時間,默認是3秒,我們這裡填120秒是為了防止網絡不好的時候ES集群發生腦裂現象

discovery.zen.fd.ping_retries: 6 #在Elasticsearch7.0版本已被移除,配置無效
#探測次數,如果每次探測90秒,連續探測超過六次,則認為節點該節點已脫離集群,默認為3次

discovery.zen.fd.ping_interval: 15s #在Elasticsearch7.0版本已被移除,配置無效
#節點每隔15秒向master發送一次心跳,證明自己和master還存活,默認為1秒太頻繁,

discovery.seed_hosts: [「10.150.55.94:9301」, 「10.150.55.95:9301」,「10.150.30.246:9301」]
#Elasticsearch7新增參數,寫入候選主節點的設備地址,來開啟服務時就可以被選為主節點,由discovery.zen.ping.unicast.hosts:參數改變而來

cluster.initial_master_nodes: [「10.150.55.94:9301」, 「10.150.55.95:9301」,「10.150.30.246:9301」]
#Elasticsearch7新增參數,寫入候選主節點的設備地址,來開啟服務時就可以被選為主節點

cluster.fault_detection.leader_check.interval: 15s
#Elasticsearch7新增參數,設置每個節點在選中的主節點的檢查之間等待的時間。默認為1秒

discovery.cluster_formation_warning_timeout: 30s
#Elasticsearch7新增參數,啟動後30秒內,如果集群未形成,那麼將會記錄一條警告信息,警告信息未master not fount開始,默認為10秒

cluster.join.timeout: 30s
#Elasticsearch7新增參數,節點發送請求加入集群後,在認為請求失敗後,再次發送請求的等待時間,默認為60秒

cluster.publish.timeout: 90s
#Elasticsearch7新增參數,設置主節點等待每個集群狀態完全更新後發佈到所有節點的時間,默認為30秒

cluster.routing.allocation.cluster_concurrent_rebalance: 32
#集群內同時啟動的數據任務個數,默認是2個

cluster.routing.allocation.node_concurrent_recoveries: 32
#添加或刪除節點及負載均衡時並發恢復的線程個數,默認4個

cluster.routing.allocation.node_initial_primaries_recoveries: 32
#初始化數據恢復時,並發恢複線程的個數,默認4個
————————————————
版權聲明:本文為CSDN博主「運維工程師 Linke」的原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接://blog.csdn.net/qq_31547771/article/details/100665922

所有節點統一修改一下jvm.options

## JVM configuration

################################################################
## IMPORTANT: JVM heap size
################################################################
##
## You should always set the min and max JVM heap
## size to the same value. For example, to set
## the heap to 4 GB, set:
##
## -Xms4g
## -Xmx4g
##
## See //www.elastic.co/guide/en/elasticsearch/reference/current/heap-size.html
## for more information
##
################################################################

# Xms represents the initial size of total heap space
# Xmx represents the maximum size of total heap space
# 最重要的修改 
-Xms2g
-Xmx2g

################################################################
## Expert settings
################################################################
##
## All settings below this section are considered
## expert settings. Don't tamper with them unless
## you understand what you are doing
##
################################################################

## GC configuration
-XX:+UseConcMarkSweepGC
-XX:CMSInitiatingOccupancyFraction=75
-XX:+UseCMSInitiatingOccupancyOnly

## G1GC Configuration
# NOTE: G1GC is only supported on JDK version 10 or later.
# To use G1GC uncomment the lines below.
# 10-:-XX:-UseConcMarkSweepGC
# 10-:-XX:-UseCMSInitiatingOccupancyOnly
# 10-:-XX:+UseG1GC
# 10-:-XX:InitiatingHeapOccupancyPercent=75

## DNS cache policy
# cache ttl in seconds for positive DNS lookups noting that this overrides the
# JDK security property networkaddress.cache.ttl; set to -1 to cache forever
-Des.networkaddress.cache.ttl=60
# cache ttl in seconds for negative DNS lookups noting that this overrides the
# JDK security property networkaddress.cache.negative ttl; set to -1 to cache
# forever
-Des.networkaddress.cache.negative.ttl=10

## optimizations

# pre-touch memory pages used by the JVM during initialization
-XX:+AlwaysPreTouch

## basic

# explicitly set the stack size
-Xss1m

# set to headless, just in case
-Djava.awt.headless=true

# ensure UTF-8 encoding by default (e.g. filenames)
-Dfile.encoding=UTF-8

# use our provided JNA always versus the system one
-Djna.nosys=true

# turn off a JDK optimization that throws away stack traces for common
# exceptions because stack traces are important for debugging
-XX:-OmitStackTraceInFastThrow

# flags to configure Netty
-Dio.netty.noUnsafe=true
-Dio.netty.noKeySetOptimization=true
-Dio.netty.recycler.maxCapacityPerThread=0

# log4j 2
-Dlog4j.shutdownHookEnabled=false
-Dlog4j2.disable.jmx=true

-Djava.io.tmpdir=${ES_TMPDIR}

## heap dumps

# generate a heap dump when an allocation from the Java heap fails
# heap dumps are created in the working directory of the JVM
-XX:+HeapDumpOnOutOfMemoryError

# specify an alternative path for heap dumps; ensure the directory exists and
# has sufficient space
-XX:HeapDumpPath=data

# specify an alternative path for JVM fatal error logs
-XX:ErrorFile=logs/hs_err_pid%p.log

## JDK 8 GC logging

8:-XX:+PrintGCDetails
8:-XX:+PrintGCDateStamps
8:-XX:+PrintTenuringDistribution
8:-XX:+PrintGCApplicationStoppedTime
8:-Xloggc:logs/gc.log
8:-XX:+UseGCLogFileRotation
8:-XX:NumberOfGCLogFiles=32
8:-XX:GCLogFileSize=64m

# JDK 9+ GC logging
9-:-Xlog:gc*,gc+age=trace,safepoint:file=logs/gc.log:utctime,pid,tags:filecount=32,filesize=64m
# due to internationalization enhancements in JDK 9 Elasticsearch need to set the provider to COMPAT otherwise
# time/date parsing will break in an incompatible way for some date patterns and locals
9-:-Djava.locale.providers=COMPAT

統一修改linux系統的es的內存權限。
elasticsearch用戶擁有的內存權限太小,至少需要262144;

切換到root用戶
執行命令:
sysctl -w vm.max_map_count=262144
查看結果:
sysctl -a|grep vm.max_map_count
顯示:
vm.max_map_count = 262144

創建非root用戶。

建立用戶   useradd es 
建立用戶密碼  passwd es

將es文件夾內的config文件和logs文件夾的文件權限賦予es.
chmod 777 config/*
chmod 777 logs/* 很多問題是因為es用戶沒有權限導致的。儘可能多的賦予es用戶。

全部以es用戶啟動

nohup ./elasticsearch > console.out &

如圖 集群節點建立完成

Image text

如果head顯示無法連接,但是es確正確啟動(1)

索引數據

es是一個分佈式的文檔(document)存儲引擎,可以實時存儲並檢索出數據結構–序列化的JSON文檔,通常,我們認為對象(object)和文檔(documnet)是等價相同的。

檢索文檔 GET /megacorp/employee/1
{
"_index" : "megacorp",
"_type" : "employee",
"_id" : "1",
"_version" : 1,
"found" : true,
"_source" : {
	"first_name" : "John",
	"last_name" : "Smith",
	"age" : 25,
	"about" : "I love to go rock climbing",
	"interests": [ "sports", "music" ]
	}
}
名稱 備註
_index 索引名稱 類似mysql數據庫的表 索引這個名字必須是全部小寫,不能以下劃線開頭,不能包含逗號。
_type 索引對象類型 類似每個對象都屬於一個類class 可以是大寫或小寫,不能包含下劃線或逗號
_id 索引主鍵 它與 _index 和 _type 組合時,就可以在ELasticsearch中唯一標識一個文檔
_version 索引數據版本

ID
– 使用自己的ID
– ES可以自增ID

_version
每次修改數據都會version都會自增1.使用樂觀鎖來做數據安全。

數據如何存儲到某個主分片

shard = hash(routing) % number_of_primary_shards

routing是任意字符串,默認_id。通過hash生成一個數字。除以主分片數量取余,這個餘數就是存儲的分片。這就是為什麼創建索引時主分片數量就確定了,一旦值改變之前的路由值就無效了,文檔就找不到了。

所有的新建,索引,刪除請求都是些操作,必須再主分片中操作成功才可以複製到複製分片。

  1. 客戶端給 Node 1 發送新建、索引或刪除請求。

  2. 節點使用文檔的 _id 確定文檔屬於分片 0 。它轉發請求到 Node 3 ,分片 0 位於這個節點上。

  3. Node 3 在主分片上執行請求,如果成功,它轉發請求到相應的位於 Node 1 和 Node 2的複製節點上。當所有的複製節點報告成功, Node 3 報告成功到請求的節點,請求的節點再報告給客戶端。

這表明必須主分片和複製分片全部成功,客戶端猜得到相應。
replication默認為sync。
你可以設置replication 為 async,這樣es在主分片執行成功後就會相應客戶端。但是es依舊會轉發請求給複製節點。

默認主分片在嘗試寫入時需要規定數量quorum或過半的分片(可以是主節點或複製節點)可用。這是防止數據被寫入到錯的網絡分區。

int( (primary + number_of_replicas) / 2 ) + 1

consistency允許值 one,all,默認的quorum或過半分片.
number_of_replicas是在索引中的的設置,用來定義複製分片的數量.但如果你只有2個節點,那你的活動分片不夠規定數量,也就不能索引或刪除任何文檔。

搜索

es搜索可以做

  1. 在類似於 gender 或者 age 這樣的字段上使用結構化查詢, join_date 這樣的字段上使用排序,就像SQL的結構化查詢一
    樣。
  2. 全文檢索,可以使用所有字段來匹配關鍵字,然後按照關聯性(relevance)排序返回結果。
  3. 或者結合以上兩條。
概念 解釋
映射(Mapping) 數據在每個字段中的解釋說明
分析(Analysis) 全文是如何處理的可以被搜索的
領域特定語言查詢(Query DSL) Elasticsearch使用的靈活的、強大的查詢語言

搜索相應體

{
  "hits": {
    "total": 14,
    "hits": [
      {
        "_index": "us",
        "_type": "tweet",
        "_id": "7",
        "_score": 1,
        "_source": {
          "date": "2014-09-17",
          "name": "John Smith",
          "tweet": "The Query DSL is really powerful and flexible",
          "user_id": 2
        }
      }
    ],
    "max_score": 1
  },
  "took": 4,
  "_shards": {
    "failed": 0,
    "successful": 10,
    "total": 10
  },
  "timed_out": false
}

hits

響應中最重要的部分是 hits ,它包含了 total 字段來表示匹配到的文檔總數, hits 數組還包含了匹配到的前10條數據。
hits 數組中的每個結果都包含 _index 、 _type 和文檔的 _id 字段,被加入到 _source 字段中這意味着在搜索結果中我們將
可以直接使用全部文檔。這不像其他搜索引擎只返迴文檔ID,需要你單獨去獲取文檔。
每個節點都有一個 _score 字段,這是相關性得分(relevance score),它衡量了文檔與查詢的匹配程度。默認的,返回的結
果中關聯性最大的文檔排在首位;這意味着,它是按照 _score 降序排列的。這種情況下,我們沒有指定任何查詢,所以所有
文檔的相關性是一樣的,因此所有結果的 _score 都是取得一個中間值 1
max_score 指的是所有文檔匹配查詢中 _score 的最大值。

took

請求時間

shards

_shards 節點告訴我們參與查詢的分片數( total 字段),有多少是成功的( successful字段),有多少的是失敗的( failed 字段)。

timeout

是否超時

數據同步

無論是使用什麼中間件,都是根據具體的情形和公司的情況來選擇不同的方案。
在我們公司的情況下,我們使用es只是作為負責複雜搜索的方案,並不作為增刪改的持久化方案,依舊採用通過mysql數據庫同步數據到es。本身可以通過binlog日誌同步,但是我們是使用阿里雲的RDS,數據同步需要額外收費,所以我們選擇的方案,採用另外一個中件件logstash來做數據同步。

在上面參考文獻中下載logstash文件,安裝在lunux服務器中,修改logstash.conf

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
	jdbc {
        #需要同步的數據庫
		type => "uat"
        jdbc_connection_string => "jdbc:mysql://xx.xx.xx.xx:3306/fungo_games_uat"
        jdbc_user => "xxx"
        jdbc_password => "xxxx"
        #本地jar包
        jdbc_driver_library => "/opt/ELK/logstash-7.2.0/mysqltool/mysql-connector-java-8.0.15.jar"
        jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "5000"
        #獲取到記錄的SQL查詢語句
	use_column_value => true
        tracking_column => "updated_at"
        tracking_column_type => "timestamp"
        lowercase_column_names => false
        record_last_run => true
        last_run_metadata_path => "/opt/ELK/logstash-7.2.0/config/station_parameter.txt"
        clean_run => false
        statement => "SELECT * FROM t_cmm_post where updated_at >= :sql_last_value order by  updated_at  desc "
        #定時字段 各字段含義(由左至右)分、時、天、月、年,全部為*默認含義為每分鐘都更新
        schedule => "*/1 * * * *"
    }
}

filter { 
}

output {
	if [type] == "uat" {
		elasticsearch {
			hosts => "xx.xx.xx.xx:9200"
			index => "uat-cloudcmmpost"
			document_type => "CmmPost"
			document_id => "%{post_id}"
		}
	}	
}

注意
需要將mysql的連接包下載指定位置,jdbc_driver_library 來指定位置,
安裝logstash相關插件
安裝jdbc的插件
./logstash-plugin install logstash-input-jdbc
./logstash-plugin install logstash-output-elasticsearch
啟動數據同步
直接 在bin文件內 ./logstash 啟動 如果報錯 可能需要刪除data裏面的 .lock 文件

備註

1 阿里雲的解決思路

阿里雲上面也是採用單個主機上搭建3個es節點的集群。性能分別是1核2G的配置.因為無法登錄到logstash服務的機器上,所有 我們無法直接通過鏈接MYSQL服務的形式,就不能像我們上面展示那樣通過sql語句去同步數據,這時候阿里雲上有一個DTS服務,可以解決數據傳輸和同步。

2 附屬內容

elasticsearch-head 無法連接elasticsearch的原因和解決
//blog.csdn.net/fst438060684/article/details/80936201

3 ES比mysql快

為什麼ES比mysql快
mysql只有term dictionary這一層,是以b-tree排序方式存儲在磁盤上。檢索一個term需要 若干次random access磁盤操作。但是es在此基礎上添加term index來加速檢索。term index以樹的形式緩存內存中,減少磁盤讀取次數。