ELK+kafka docker快速搭建+.NetCore中使用

ELK開源實時日誌分析平台。ELK是Elasticsearch,Logstash,Kibana 的縮寫。

Elasticsearch:是個開源分佈式搜索引擎,簡稱ES
Logstash:是一個完全開源的工具,可以對日誌進行收集,過濾,存儲到ES
Kibana: 也是一個開源和免費的工具,這裡主要用作ES的可視化界面工具,用於查看日誌。

環境:centos7.9

一、搭建ES

先要調高jvm線程數限制,修改sysctl.conf

vim /etc/sysctl.conf

修改max_map_count調大,如果沒有這個設置,則新增一行

vm.max_map_count=262144

改完保存後, 執行下面命令讓sysctl.conf文件生效

sysctl –p

拉取es鏡像

#拉取鏡像,指定版本號
 docker pull elasticsearch:7.13.2

新建elasticsearch.yml配置文件並上傳到主機目錄用於配置文件掛載,方便後面修改,這裡上傳到/home/es目錄。

http.host: 0.0.0.0
#跨域
http.cors.enabled:  true 
http.cors.allow-origin:  "*"

啟動es

docker run -d -p 9200:9200 -p 9300:9300 --name es -e ES_JAVA_OPTS="-Xms128m -Xmx256m" -v /home/es/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml elasticsearch:7.13.2
#9200是對外端口,9300是es內部通信端口
#-e ES_JAVA_OPTS="-Xms128m -Xmx256m" 限制內存,最小128,最大256
#-v 把配置文件掛載到es的docker里的配置文件

在瀏覽器打開,ip:9200看到es信息,就成功了,啟動可能需要一點時間,要等一會。

 

 

 二、搭建Kibana

啟動kibana,這裡沒有先拉取鏡像,docker沒有鏡像會到公共庫嘗試拉取,下面的一樣。

#如果沒有鏡像會自動到公共庫拉取再啟動,-e 環境配置指向es的地址
docker run -p 5601:5601 -d --name kibana -e ELASTICSEARCH_URL=//172.16.2.84:9200 -e ELASTICSEARCH_HOSTS=//172.16.2.84:9200 kibana:7.13.2  

在瀏覽器輸入ip:5601,能看到kibana的信息,說明成功了

 

 

 

 

 

 三、搭建kafka

1)kafka前置需要先安裝zookeeper

#-v /etc/localtime:/etc/localtime把本機的時間掛載進docker,讓docker同步主機的時間
docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime zookeeper

2)安裝kafka

docker run  -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=172.16.2.84:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.2.84:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka
#KAFKA_BROKER_ID:kafka節點Id,集群時要指定
#KAFKA_ZOOKEEPER_CONNECT:配置zookeeper管理kafka的路徑,內網ip
#KAFKA_ADVERTISED_LISTENERS:如果是外網的,則外網ip,把kafka的地址端口註冊給zookeeper,將告訴 zookeeper 自己的地址為 XXXX,當消費者向 zookeeper 詢問 kafka 的地址時,將會返回該地址
#KAFKA_LISTENERS: 配置kafka的監聽端口

kafka界面查看可以用 kafka tool 地址://www.kafkatool.com/download.html

 

 

 

四、搭建Logstash

新建文件logstash.yml,logstash配置信息

http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "//172.16.2.84:9200" ]
#host多節點用逗號隔開

新建文件logstashkafka.conf,讀取kafka信息和寫到es的信息

input {
     kafka {
      topics => "logkafka" #訂閱kafka的topics
      bootstrap_servers => "172.16.2.84:9092"  # 從kafka的leader主機上提取緩存
      codec => "json"  # 在提取kafka主機的日誌時,需要寫成json格式
            }
}
output {
  elasticsearch {
    hosts => ["172.16.2.84:9200"]
    index => "logkafka%{+yyyy.MM.dd}" #採集到es的索引名稱
    #user => "elastic"
    #password => "changeme"
  }
}

把這兩個文件放到主機目錄上,這裡放到/home/logstash

啟動logstash容器

docker run --rm -it --privileged=true -p 9600:9600 -d --name logstash -v /home/logstash/logstashkafka.conf:/usr/share/logstash/pipeline/logstash.conf -v /home/logstash/logstash.yml:/usr/share/logstash/config/logstash.yml logstash:7.13.2
#-v 把logstashkafka.conf和logstash.yml掛載到logstash容器

好了,到這裡所有需要的容器都建好了運行docker ps -a看下所有容器

 

 

 

五、.Net Core寫日誌到kafka

.Net Core只要把日誌寫到kafka,然後logstash訂閱kafka把日誌寫到es即可,這裡是.Net Core5.0演示。

NuGet包安裝Confluent.Kafka

 class Program
    {
        static async Task Main(string[] args)
        {
            Console.WriteLine("Hello World!");
            //kafka節點
            string brokerList = "172.16.2.84:9092";
            string topicName = "logkafka";
            while (true)
            {
                Console.WriteLine($"{Environment.NewLine}請輸入發送的內容,發送topics名:{topicName}");
                string content = Console.ReadLine();
                await ConfluentKafka.PublishAsync(brokerList, topicName, content);
            }
        }
    }

 

 public class ConfluentKafka
    {
        public static async Task PublishAsync(string brokerList, string topicName, string content)
        {
            //生產者配置
            var config = new ProducerConfig
            {
                BootstrapServers = brokerList, //kafka節點
                Acks = Acks.None //ack機制,0不等服務器確認,1主節點確認返回ack,-1全部節點同步完返回ack
            };
            using (var producer = new ProducerBuilder<string, string>(config)
            .Build())
            {
                try
                {
                    //key要給值,根據key做負載均衡,不然如果多節點,key不給值會全部寫有一個分區
                    var deliveryReport = await producer.
                        ProduceAsync(
                        topicName, new Message<string, string> { Key = (new Random().Next(1, 10)).ToString(), Value = content });
                    Console.WriteLine($"向kafka發送了數據: {deliveryReport.TopicPartitionOffset}");
                }
                catch (ProduceException<string, string> e)
                {
                    Console.WriteLine($"向kafka發送數據失敗:{e.Message}");
                }
            }
        }
    }

運行程序,向kafka寫數據

 

 

 查看kafka隊列,已經有數據

 

 

 六、ELK日誌查看

打開Kibana界面,ip:5601,打開左邊的面板

 

 

 點擊索引匹配

 

 

 

輸入匹配符,匹配到es日誌索引,下一步

 

 

 

 

 

 

索引匹配創建完畢,返回到面板

 

 

 

 

 

 

七、其他方式寫日誌

上面介紹的是kafka方式寫日誌,也是ELK最佳方式,能承載大量的日誌傳輸,這裡列一下其他常用的方式,主要是修改logstash採集配置就可以了。

1.讀目錄文件方式,根據給的目錄,文件類型去讀取寫到es,logstash配置

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
 file {
 path => "D:/Log/Application/*log.log" #讀取目錄下log.log結尾的文件
   start_position => beginning
    }
 file {
 path => "D:/Log/Application2/*log.log" #讀取目錄下log.log結尾的文件
   start_position => beginning
    }
}
output {
  elasticsearch {
    hosts => ["172.16.2.84:9200"]
    index => "filelog"
    #user => "elastic"
    #password => "changeme"
  }
}

2.Tcp方式,通過tcp方式寫日誌,在log4net和nlog中直接可配tcp請求logstash,配置

input {
 tcp{
 port => 8001
 type => "TcpLog"
 }
}
output {
  elasticsearch {
    hosts => ["172.16.2.84:9200"]
    index => "tcplog"
    #user => "elastic"
    #password => "changeme"
  }
}

3.reids,把日誌推送到一個list類型的Key,logstash會訂閱這個key讀取消息寫到es

input {
    redis {
        codec => plain
        host => "127.0.0.1"
        port => 6379
        data_type => list
        key => "listlog"
        db => 0
    }
}
output {
  elasticsearch {
    hosts => ["172.16.2.84:9200"]
    index => "redislog"
    #user => "elastic"
    #password => "changeme"
  }
}

4.RabbitMQ,把日誌寫到一個隊列,讓logstash訂閱隊列獲取日誌寫到es

input {
    rabbitmq {
        host => "127.0.0.1"    #RabbitMQ-IP地址
        vhost => "test"      #虛擬主機
        port => 5672             #端口號
        user => "admin"            #用戶名
        password => "123456"        #密碼
        queue => "LogQueue"      #隊列
        durable => false         #持久化跟隊列配置一致
        codec => "plain"         #格式
    }
}

output {
    elasticsearch {
        hosts => ["172.16.2.84:9200"]
        index => "rabbitmqlog"
    }
}