Openresty+Lua+Kafka實現日誌實時採集

簡介

  在很多數據採集場景下,Flume作為一個高性能採集日誌的工具,相信大家都知道它。許多人想起Flume這個組件能聯想到的大多數都是Flume跟Kafka相結合進行日誌的採集,這種方案有很多他的優點,比如高性能、高吞吐、數據可靠性等。但是我們如果要求對日誌進行實時的採集,這顯然不是一個好的解決方案。原因如下:

  就目前來說,Flume能支援實時監控一個目錄的數據文件,一旦對某個目錄的文件採集完成,就會打上completed的標誌,若之後再有數據進入這個文件中,Flume則不會檢測到。

  所以,我們更多的是使用這種方案進行定時採集,只要有一個新的數據目錄產生,我們就採集這個目錄下的數據文件。

  那麼接下來本篇文章將為大家介紹基於Openresty+Lua+Kafka對日誌進行實時的採集。

需求

  很多時候,我們需要對用戶的埋點數據進行一個實時的採集,然後用這些數據對用戶的行為做一些實時的分析。所以,第一步當然是先解決怎樣對數據進行實時的採集。

  這裡我們用到的方案是Openresty+Lua+Kafka。

原理介紹

  那麼什麼是Openresty呢?這裡引用官方的一段話: 

  OpenResty是一個基於Nginx與Lua的高性能Web平台,其內部集成了大量精良的Lua庫、第三方模組以及大多數的依賴項。用於方便地搭建能夠處理超高並發、擴展性極高的動態 Web 應用、Web 服務和動態網關。
  OpenResty通過匯聚各種設計精良的Nginx模組,從而將Nginx有效地變成一個強大的通用Web應用平台。這樣,Web開發人員和系統工程師可以使用Lu 腳本語言調動Nginx支援的各種C以及Lua模組,快速構造出足以勝任10K乃至1000 以上單機並發連接的高性能Web應用系統。
  OpenResty的目標是讓你的Web服務直接跑在Nginx服務內部,充分利用Nginx的非阻塞 I/O 模型,不僅僅對 HTTP 客戶端請求,甚至於對遠程後端諸如MySQL、PostgreSQL、Memcached 以及 Redis 等都進行一致的高性能響應。

  簡單來說,就是將客戶端的請求(本文指的是用戶的行為日誌)通過Nginx把用戶的數據投遞到我們指定的地方(Kafka),而為了實現這個需求,我們用到了Lua腳本,因為Openresty封裝了各種Lua模組,其中有一個模組就是對Kafka模組進行了分裝,我們只需要寫一個簡單的腳本就可以將用戶的數據通過Nginx轉發到Kafka中,以便後續對數據進行消費。

  這裡給出一張架構圖,方便大家理解:

 

  

  在這裡簡單總結一下使用Openresty+Lua+Kafka的優點:

    1.支援多種業務數據,不同的業務數據,只需要配置不同的Lua腳本,就可以將不同的業務數據發送到Kafka不同的topic中。

    2.對用戶觸發的埋點數據進行實時的採集

    3.高可靠的集群,Openresty由於是基於Nginx,其集群擁有非常高的性能和穩定性。

    4.高並發,相比tomcat、apache等web伺服器,Nginx的並發量遠遠高於其他兩種。正常情況下處理上萬的並發量都不是什麼難事。

  那麼接下來我們就動手實操一下。

Openresty的安裝

本實例採用的單機部署形式,當單機部署成功了之後,集群的搭建跟單機一樣,只是在不同的機器上執行相同的步驟而已。

註:本實驗基於centos7.0作業系統

1.下載Openresty依賴:

yum install readline-devel pcre-devel openssl-devel gcc  

2.編譯安裝Openresty:

#1.安裝openresty:  mkdir /opt/software  mkdir /opt/module  cd /opt/software/ # 安裝文件所在目錄  wget https://openresty.org/download/openresty-1.9.7.4.tar.gz  tar -xzf openresty-1.9.7.4.tar.gz -C /opt/module/  cd /opt/module/openresty-1.9.7.4  #2.配置:  # 指定目錄為/opt/openresty,默認在/usr/local。  ./configure --prefix=/opt/openresty               --with-luajit               --without-http_redis2_module               --with-http_iconv_module  make  make install  

3.安裝lua-resty-kafka

因為我們需要將數據通過nginx+lua腳本轉發到Kafka中,編寫lua腳本時需要用到lua模組中的一些關於Kafka的依賴。

#下載lua-resty-kafka:  cd /opt/software/  wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip  unzip master.zip -d /opt/module/    #拷貝kafka相關依賴腳本到openresty  cp -rf /opt/module/lua-resty-kafka-master/lib/resty/kafka/ /opt/openresty/lualib/resty/

 註:由於kafka大家都比較熟知,這裡就不介紹它的安裝了。

Openresty安裝完成之後目錄結構如下:

drwxr-xr-x  2 root root 4096 Mar 24 14:26 bin  drwxr-xr-x  6 root root 4096 Mar 24 14:26 luajit  drwxr-xr-x  7 root root 4096 Mar 24 14:29 lualib  drwxr-xr-x 12 root root 4096 Mar 24 14:40 nginx

4.配置文件

編輯/opt/openresty/nginx/conf/nginx.conf

user  nginx;  #Linux的用戶  worker_processes  auto;  worker_rlimit_nofile 100000;    #error_log  logs/error.log;  #error_log  logs/error.log  notice;  #error_log  logs/error.log  info;    #pid        logs/nginx.pid;    events {      worker_connections  102400;      multi_accept on;      use epoll;  }      http {      include       mime.types;      default_type  application/octet-stream;        log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '                        '$status $body_bytes_sent "$http_referer" '                        '"$http_user_agent" "$http_x_forwarded_for"';        access_log  /var/log/nginx/access.log  main;        resolver 8.8.8.8;      #resolver 127.0.0.1 valid=3600s;        sendfile        on;        keepalive_timeout  65;        underscores_in_headers on;        gzip  on;        include /opt/openresty/nginx/conf/conf.d/common.conf; #common.conf這個文件名字可自定義    }

 編輯 /opt/openresty/nginx/conf/conf.d/common.conf

##api  lua_package_path "/opt/openresty/lualib/resty/kafka/?.lua;;";  lua_package_cpath "/opt/openresty/lualib/?.so;;";    lua_shared_dict ngx_cache 128m;  # cache  lua_shared_dict cache_lock 100k; # lock for cache    server {      listen       8887; #監聽埠      server_name  192.168.3.215; #埋點日誌的ip地址或域名,多個域名之間用空格分開      root         html; #root指令用於指定虛擬主機的網頁根目錄,這個目錄可以是相對路徑,也可以是絕對路徑。      lua_need_request_body on; #打開獲取消息體的開關,以便能獲取到消息體        access_log /var/log/nginx/message.access.log  main;      error_log  /var/log/nginx/message.error.log  notice;        location = /lzp/message {          lua_code_cache on;          charset utf-8;          default_type 'application/json';          content_by_lua_file "/opt/openresty/nginx/lua/testMessage_kafka.lua";#引用的lua腳本      }  }

 編輯 /opt/openresty/nginx/lua/testMessage_kafka.lua

#創建目錄mkdir /opt/openresty/nginx/lua/  vim /opt/openresty/nginx/lua/testMessage_kafka.lua
#編輯記憶體如下:

-- require需要resty.kafka.producer的lua腳本,沒有會報錯  local producer = require("resty.kafka.producer")    -- kafka的集群資訊,單機也是可以的  local broker_list = {      {host = "192.168.3.215", port = 9092},  }    -- 定義最終kafka接受到的數據是怎樣的json格式  local log_json = {}  --增加read_body之後即可獲取到消息體,默認情況下可能會是nil  log_json["body"] = ngx.req.read_body()  log_json["body_data"] = ngx.req.get_body_data()    -- 定義kafka同步生產者,也可設置為非同步 async  -- -- 注意!!!當設置為非同步時,在測試環境需要修改batch_num,默認是200條,若大不到200條kafka端接受不到消息  -- -- encode()將log_json日誌轉換為字元串  -- -- 發送日誌消息,send配套之第一個參數topic:  -- -- 發送日誌消息,send配套之第二個參數key,用於kafka路由控制:  -- -- key為nill(空)時,一段時間向同一partition寫入數據  -- -- 指定key,按照key的hash寫入到對應的partition    -- -- batch_num修改為1方便測試  local bp = producer:new(broker_list, { producer_type = "async",batch_num = 1 })  -- local bp = producer:new(broker_list)    local cjson = require("cjson.safe")  local sendMsg = cjson.encode(log_json)  local ok, err = bp:send("testMessage",nil, sendMsg)  if not ok then     ngx.log(ngx.ERR, 'kafka send err:', err)  elseif ok then     ngx.say("the message send successful")  else     ngx.say("未知錯誤")  end  

5.啟動服務運行:

useradd nginx #創建用戶  passwd nginx #設置密碼    #設置openresty的所有者nginx  chown -R nginx:nginx /opt/openresty/    #啟動服務  cd /opt/openresty/nginx/sbin  ./nginx -c /opt/openresty/nginx/conf/nginx.conf    查看服務:  ps -aux | grep nginx  nginx     2351  0.0  0.1 231052 46444 ?        S    Mar30   0:33 nginx: worker process  nginx     2352  0.0  0.1 233396 48540 ?        S    Mar30   0:35 nginx: worker process  nginx     2353  0.0  0.1 233396 48536 ?        S    Mar30   0:33 nginx: worker process  nginx     2354  0.0  0.1 232224 47464 ?        S    Mar30   0:34 nginx: worker process  nginx     2355  0.0  0.1 231052 46404 ?        S    Mar30   0:33 nginx: worker process  nginx     2356  0.0  0.1 232224 47460 ?        S    Mar30   0:34 nginx: worker process  nginx     2357  0.0  0.1 231052 46404 ?        S    Mar30   0:34 nginx: worker process  nginx     2358  0.0  0.1 232224 47484 ?        S    Mar30   0:34 nginx: worker process  root      7009  0.0  0.0 185492  2516 ?        Ss   Mar24   0:00 nginx: master process ./nginx -c /opt/openresty/nginx/conf/nginx.conf      查看埠:  netstat -anput | grep 8887  tcp        0      0 0.0.0.0:8887            0.0.0.0:*               LISTEN      2351/nginx: worke  

看到以上進程,證明服務已正常運行

6.使用postman,發送post請求進行簡單的測試,查看kafka是否能否接受到數據

 

 

 7.kafka消費數據:

kafka-console-consumer --bootstrap-server 192.168.3.215:9092 --topic testMessage --from-beginning

若消費到數據,則證明配置成功,若未調通可查看/var/log/nginx/message.access.log和/var/log/nginx/message.error.log相關錯誤日誌進行調整

總結

  使用Openresty+Lua+Kafka就可以將用戶的埋點數據實時採集到kafka集群中,並且Openresty是基於Nginx的,而Nginx能處理上萬的並發量,所以即使用戶的數據在短時間內激增,這套架構也能輕鬆的應對,不會導致集群崩潰。另一方面,若數據過多導致集群的超負荷,我們也可以隨時加多一台機器,非常方便。

  另外一個小小的拓展:若業務數據非常多,需要發送到不同的topic中,我們也不用編寫多個腳本,而是可以聯繫後端在json格式裡面加一個欄位,這個欄位的值就是topic的名稱。我們只需要編寫一個通用腳本,解析Json數據將topic名稱拿出來就可以了。