Integrating Kafka with NGINX Lua

  • October 3, 2019
  • Notes


Step 1: Enter the OpenResty directory

```
[root@node03 openresty]# cd /export/servers/openresty/
[root@node03 openresty]# ll
total 356
drwxr-xr-x  2 root root   4096 Jul 26 11:33 bin
drwxrwxr-x 44 1000 1000   4096 Jul 26 11:31 build
drwxrwxr-x 43 1000 1000   4096 Nov 13  2017 bundle
-rwxrwxr-x  1 1000 1000  45908 Nov 13  2017 configure
-rw-rw-r--  1 1000 1000  22924 Nov 13  2017 COPYRIGHT
drwxr-xr-x  6 root root   4096 Jul 26 11:33 luajit
drwxr-xr-x  6 root root   4096 Aug  1 08:14 lualib
-rw-r--r--  1 root root   5413 Jul 26 11:32 Makefile
drwxr-xr-x 11 root root   4096 Jul 26 11:35 nginx
drwxrwxr-x  2 1000 1000   4096 Nov 13  2017 patches
drwxr-xr-x 44 root root   4096 Jul 26 11:33 pod
-rw-rw-r--  1 1000 1000   3689 Nov 13  2017 README.markdown
-rw-rw-r--  1 1000 1000   8690 Nov 13  2017 README-win32.txt
-rw-r--r--  1 root root 218352 Jul 26 11:33 resty.index
drwxr-xr-x  5 root root   4096 Jul 26 11:33 site
drwxr-xr-x  2 root root   4096 Aug  1 10:54 testlua
drwxrwxr-x  2 1000 1000   4096 Nov 13  2017 util
[root@node03 openresty]#
```

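Note: from here on we focus on two directories, lualib and nginx.

1. lualib: holds the integration packages (Lua libraries) that OpenResty needs

2. nginx: the nginx server directory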

Next, let's look inside the lualib directory:

```
[root@node03 openresty]# cd lualib/
[root@node03 lualib]# ll
total 116
-rwxr-xr-x 1 root root 101809 Jul 26 11:33 cjson.so
drwxr-xr-x 3 root root   4096 Jul 26 11:33 ngx
drwxr-xr-x 2 root root   4096 Jul 26 11:33 rds
drwxr-xr-x 2 root root   4096 Jul 26 11:33 redis
drwxr-xr-x 9 root root   4096 Aug  1 10:34 resty
```

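Here we see the bundled redis and ngx packages. This means Lua code running in nginx can use the ngx API and Redis directly, without importing any dependency packages.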

Now let's see what the resty directory contains:

```
[root@node03 lualib]# cd resty/
[root@node03 resty]# ll
total 152
-rw-r--r-- 1 root root  6409 Jul 26 11:33 aes.lua
drwxr-xr-x 2 root root  4096 Jul 26 11:33 core
-rw-r--r-- 1 root root   596 Jul 26 11:33 core.lua
drwxr-xr-x 2 root root  4096 Jul 26 11:33 dns
drwxr-xr-x 2 root root  4096 Aug  1 10:42 kafka   # we imported this one ourselves
drwxr-xr-x 2 root root  4096 Jul 26 11:33 limit
-rw-r--r-- 1 root root  4616 Jul 26 11:33 lock.lua
drwxr-xr-x 2 root root  4096 Jul 26 11:33 lrucache
-rw-r--r-- 1 root root  4620 Jul 26 11:33 lrucache.lua
-rw-r--r-- 1 root root  1211 Jul 26 11:33 md5.lua
-rw-r--r-- 1 root root 14544 Jul 26 11:33 memcached.lua
-rw-r--r-- 1 root root 21577 Jul 26 11:33 mysql.lua
-rw-r--r-- 1 root root   616 Jul 26 11:33 random.lua
-rw-r--r-- 1 root root  9227 Jul 26 11:33 redis.lua
-rw-r--r-- 1 root root  1192 Jul 26 11:33 sha1.lua
-rw-r--r-- 1 root root  1045 Jul 26 11:33 sha224.lua
-rw-r--r-- 1 root root  1221 Jul 26 11:33 sha256.lua
-rw-r--r-- 1 root root  1045 Jul 26 11:33 sha384.lua
-rw-r--r-- 1 root root  1359 Jul 26 11:33 sha512.lua
-rw-r--r-- 1 root root   236 Jul 26 11:33 sha.lua
-rw-r--r-- 1 root root   698 Jul 26 11:33 string.lua
-rw-r--r-- 1 root root  5178 Jul 26 11:33 upload.lua
drwxr-xr-x 2 root root  4096 Jul 26 11:33 upstream
drwxr-xr-x 2 root root  406 Jul 26 11:33 websocket
```

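Here we see the familiar mysql.lua and redis.lua. Ignore the other files for now.

Note: a stock OpenResty install has no kafka package, because OpenResty does not bundle a Kafka client. I imported the Kafka integration package beforehand.

Let's see which files the kafka package contains: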

```
[root@node03 resty]# cd kafka
[root@node03 kafka]# ll
total 48
-rw-r--r-- 1 root root  1369 Aug  1 10:42 broker.lua
-rw-r--r-- 1 root root  5537 Aug  1 10:42 client.lua
-rw-r--r-- 1 root root   710 Aug  1 10:42 errors.lua
-rw-r--r-- 1 root root 10718 Aug  1 10:42 producer.lua
-rw-r--r-- 1 root root  4072 Aug  1 10:42 request.lua
-rw-r--r-- 1 root root  2118 Aug  1 10:42 response.lua
-rw-r--r-- 1 root root  1494 Aug  1 10:42 ringbuffer.lua
-rw-r--r-- 1 root root  4845 Aug  1 10:42 sendbuffer.lua
```
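Why does the kafka folder have to sit under lualib/resty? kafkalua.lua loads the producer with require "resty.kafka.producer". Lua turns the dots in a module name into directory separators and then substitutes the result for each "?" in the templates of package.path.

The sketch below mimics that lookup in Java. The class name is hypothetical. The package.path value in main is an assumption that OpenResty's lualib directory is on the search path; you can print the real value on your install with ngx.say(package.path).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mimicking how Lua's require turns a module name into
// candidate file paths: dots become '/', then each '?' in a package.path
// template (templates are separated by ';') is replaced by that relative name.
class LuaModulePath {

    static List<String> candidates(String module, String packagePath) {
        String relative = module.replace('.', '/');
        List<String> result = new ArrayList<>();
        for (String template : packagePath.split(";")) {
            if (!template.isEmpty()) {
                result.add(template.replace("?", relative));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed search path: OpenResty's lualib directory (not read from a real install)
        String path = "/export/servers/openresty/lualib/?.lua;/export/servers/openresty/lualib/?/init.lua";
        for (String candidate : candidates("resty.kafka.producer", path)) {
            System.out.println(candidate);
        }
    }
}
```

The first candidate is /export/servers/openresty/lualib/resty/kafka/producer.lua. That is exactly where producer.lua sits in the listing above, so the folder name and nesting must match the module name.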

Download the Kafka integration package here:

Link: https://pan.baidu.com/s/1pFLhz3E_txb3ZWIRWxfQYg
Extraction code: 0umg

Step 2: Create the Kafka test Lua file

1. Go back to the openresty directory

```
[root@node03 kafka]# cd /export/servers/openresty/
```

2. Create the test directory

```
[root@node03 openresty]# mkdir -p testlua
```

Choose the directory name and location yourself, but remember the full path: nginx.conf refers to it later.

3. Enter the new directory and create the kafkalua.lua script

Create the file with vim kafkalua.lua or touch kafkalua.lua:

```
[root@node03 openresty]# cd testlua/
[root@node03 testlua]# ll
total 8
-rw-r--r-- 1 root root 3288 Aug  1 10:54 kafkalua.lua
```
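The directory and file steps above (mkdir -p, then vim or touch) can also be scripted. Below is a minimal Java sketch using java.nio.file. The class name PrepareScriptDir is hypothetical, and the default base directory is assumed from this post.

Files.createDirectories behaves like mkdir -p: it creates missing parent directories and does not fail when the directory already exists.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical setup helper, the Java counterpart of
//   mkdir -p testlua && touch testlua/kafkalua.lua
class PrepareScriptDir {

    static Path prepare(Path baseDir) throws IOException {
        Path dir = baseDir.resolve("testlua");
        // Like mkdir -p: creates missing parents, no error if the directory exists
        Files.createDirectories(dir);
        Path script = dir.resolve("kafkalua.lua");
        try {
            // Creates an empty file; unlike touch, an existing file's timestamp is not updated
            Files.createFile(script);
        } catch (FileAlreadyExistsException ignored) {
            // Keep the existing kafkalua.lua untouched
        }
        return script;
    }

    public static void main(String[] args) throws IOException {
        Path base = Paths.get(args.length > 0 ? args[0] : "/export/servers/openresty");
        System.out.println("script file: " + prepare(base));
    }
}
```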

kafkalua.lua:

```lua
-- Test output; optional
ngx.say('hello kafka file configuration successful!!!!!!')

-- Collection threshold: when active connections exceed it, no data is collected
local DEFAULT_THRESHOLD = 100000
-- Number of Kafka partitions; must match the topic (created below with --partitions 3)
local PARTITION_NUM = 3
-- Kafka topic name
local TOPIC = 'B2CDATA_COLLECTION1'
-- Key of the round-robin counter in the shared dict
local POLLING_KEY = "POLLING_KEY"

-- Custom partitioner: the key is the round-robin value computed below, used as the partition id.
-- "% num" (num = the topic's real partition count) keeps the id within range.
local function partitioner(key, num, correlation_id)
    return tonumber(key) % num
end

-- Kafka broker list (the IPs must match each broker's host.name setting)
local BROKER_LIST = {
    { host = "192.168.52.100", port = 9092 },
    { host = "192.168.52.110", port = 9092 },
    { host = "192.168.52.120", port = 9092 },
}

-- Producer parameters
local CONNECT_PARAMS = {
    producer_type = "async",
    socket_timeout = 30000,
    flush_time = 10000,
    request_timeout = 20000,
    partitioner = partitioner,
}

-- Shared-memory counter for round-robin (requires lua_shared_dict shared_data in nginx.conf)
local shared_data = ngx.shared.shared_data
local pollingVal = shared_data:get(POLLING_KEY)
if not pollingVal then
    pollingVal = 1
    shared_data:set(POLLING_KEY, pollingVal)
end
-- Counter modulo PARTITION_NUM spreads messages evenly across partitions
local partitions = '' .. (tonumber(pollingVal) % PARTITION_NUM)
shared_data:incr(POLLING_KEY, 1)

-- Overload protection: skip collection when active connections exceed the threshold
-- ($connections_active is provided by the stub_status module)
local isGone = true
if (tonumber(ngx.var.connections_active) or 0) > DEFAULT_THRESHOLD then
    isGone = false
end

-- Data collection
if isGone then
    -- Each nginx variable falls back to "" when it is not set
    local time_local = ngx.var.time_local or ""
    local request = ngx.var.request or ""
    local request_method = ngx.var.request_method or ""
    local content_type = ngx.var.content_type or ""
    ngx.req.read_body() -- read the body so that $request_body is populated
    local request_body = ngx.var.request_body or ""
    local http_referer = ngx.var.http_referer or ""
    local remote_addr = ngx.var.remote_addr or ""
    local http_user_agent = ngx.var.http_user_agent or ""
    local time_iso8601 = ngx.var.time_iso8601 or ""
    local server_addr = ngx.var.server_addr or ""
    local http_cookie = ngx.var.http_cookie or ""

    -- Join the fields into one message with the "#CS#" separator
    local message = time_local .. "#CS#" .. request .. "#CS#" .. request_method .. "#CS#" ..
        content_type .. "#CS#" .. request_body .. "#CS#" .. http_referer .. "#CS#" ..
        remote_addr .. "#CS#" .. http_user_agent .. "#CS#" .. time_iso8601 .. "#CS#" ..
        server_addr .. "#CS#" .. http_cookie

    -- Load the Kafka producer, create it and send the message
    local producer = require "resty.kafka.producer"
    local bp = producer:new(BROKER_LIST, CONNECT_PARAMS)
    local ok, err = bp:send(TOPIC, partitions, message)
    -- Log send errors
    if not ok then
        ngx.log(ngx.ERR, "kafka send err:", err)
        return
    end
end
```
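kafkalua.lua joins eleven nginx variables, in a fixed order, with the #CS# separator to form one Kafka message, so a downstream consumer has to split it back into fields. Below is a minimal Java sketch of that consumer-side step. It assumes the field order used in the script. The class name and field labels are hypothetical (the labels are simply the nginx variable names), and the sample values are made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical consumer-side parser for messages built by kafkalua.lua:
// 11 nginx variables joined with "#CS#", in the order the script concatenates them.
class CollectedMessage {

    static final String SEPARATOR = "#CS#";
    static final String[] FIELDS = {
        "time_local", "request", "request_method", "content_type", "request_body",
        "http_referer", "remote_addr", "http_user_agent", "time_iso8601",
        "server_addr", "http_cookie"
    };

    static Map<String, String> parse(String message) {
        // limit -1 keeps trailing empty fields, e.g. a request without a cookie
        String[] parts = message.split(Pattern.quote(SEPARATOR), -1);
        if (parts.length != FIELDS.length) {
            throw new IllegalArgumentException(
                    "expected " + FIELDS.length + " fields, got " + parts.length);
        }
        Map<String, String> record = new LinkedHashMap<>();
        for (int i = 0; i < FIELDS.length; i++) {
            record.put(FIELDS[i], parts[i]);
        }
        return record;
    }

    public static void main(String[] args) {
        // Made-up sample message with an empty cookie field
        String sample = String.join(SEPARATOR,
                "03/Oct/2019:12:24:00 +0800", "POST /kafkalua/B2C40/dist/main/css/flight.css HTTP/1.1",
                "POST", "application/x-www-form-urlencoded; charset=UTF-8", "json=%7B%7D",
                "http://b2c.csair.com/", "192.168.52.1", "Mozilla/5.0", "2019-10-03T12:24:00+08:00",
                "192.168.52.120", "");
        Map<String, String> record = parse(sample);
        System.out.println(record.get("request_method")); // POST
        System.out.println(record.get("remote_addr"));    // 192.168.52.1
    }
}
```

A field such as the request body or cookie could itself contain #CS#. In that case the split yields more than eleven parts, and the sketch rejects the message rather than guessing where the fields begin.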

Step 3: Modify the nginx configuration file nginx.conf

1. Enter the nginx/conf directory

```
[root@node03 openresty]# cd /export/servers/openresty/nginx/conf/
[root@node03 conf]# ll
total 76
-rw-r--r-- 1 root root 1077 Jul 26 11:33 fastcgi.conf
-rw-r--r-- 1 root root 1077 Jul 26 11:33 fastcgi.conf.default
-rw-r--r-- 1 root root 1007 Jul 26 11:33 fastcgi_params
-rw-r--r-- 1 root root 1007 Jul 26 11:33 fastcgi_params.default
-rw-r--r-- 1 root root 2837 Jul 26 11:33 koi-utf
-rw-r--r-- 1 root root 2223 Jul 26 11:33 koi-win
-rw-r--r-- 1 root root 5170 Jul 26 11:33 mime.types
-rw-r--r-- 1 root root 5170 Jul 26 11:33 mime.types.default
-rw-r--r-- 1 root root 3191 Aug  1 10:52 nginx.conf
-rw-r--r-- 1 root root 2656 Jul 26 11:33 nginx.conf.default
-rw-r--r-- 1 root root  636 Jul 26 11:33 scgi_params
-rw-r--r-- 1 root root  636 Jul 26 11:33 scgi_params.default
-rw-r--r-- 1 root root  664 Jul 26 11:33 uwsgi_params
-rw-r--r-- 1 root root  664 Jul 26 11:33 uwsgi_params.default
-rw-r--r-- 1 root root 3610 Jul 26 11:33 win-utf
```

2. Edit nginx.conf

```
[root@node03 conf]# vim nginx.conf
```

```nginx
# 1. Find the first server block
# 2. Add the two directives below above it (inside the http block)
# 3. Add the Kafka location inside the server block

#------------------ added code ------------------
# 10 MB shared dictionary, shared by all nginx worker processes (holds the round-robin counter)
lua_shared_dict shared_data 10m;
# Local DNS resolver, used when Lua cosockets connect to host names
resolver 127.0.0.1;
#------------------ added code ------------------

server {
    listen       80;
    server_name  localhost;

    #charset koi8-r;

    #access_log  logs/host.access.log  main;

    location / {
        root   html;
        index  index.html index.htm;
    }

    #------------------ added code ------------------
    location /kafkalua {   # kafkalua is the URI path (the "project name")
        # stub_status module; it also provides $connections_active, read by the Lua script
        stub_status on;
        # Content type of the response written by ngx.say
        default_type text/html;
        # Path of the kafkalua.lua script created earlier (the path you had to remember)
        content_by_lua_file /export/servers/openresty/testlua/kafkalua.lua;
    }
    #------------------ added code ------------------
}
```

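Note: in location /kafkalua {…}, kafkalua is the URI path (the "project name"). Any name works, but remember it: the URLs you request must start with this path.

We now have two locations, location / {…} and location /kafkalua {…}. How do they differ? Read on and it will become clear.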
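Which location handles a given request? For plain prefix locations like these two, nginx picks the longest prefix that the request URI starts with. The Java sketch below models only that rule; exact (=) and regex (~) locations, which nginx also evaluates, are deliberately left out. The class name is hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Simplified model of nginx prefix-location selection: the longest prefix that the
// request URI starts with wins. Exact (=) and regex (~) locations are not modelled.
class LocationMatcher {

    // Returns the longest matching prefix, or null when nothing matches
    static String match(String uri, List<String> prefixes) {
        String best = null;
        for (String prefix : prefixes) {
            if (uri.startsWith(prefix) && (best == null || prefix.length() > best.length())) {
                best = prefix;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<String> locations = Arrays.asList("/", "/kafkalua");
        System.out.println(match("/", locations));                                        // /
        System.out.println(match("/index.html", locations));                              // /
        System.out.println(match("/kafkalua", locations));                                // /kafkalua
        System.out.println(match("/kafkalua/B2C40/dist/main/css/flight.css", locations)); // /kafkalua
    }
}
```

The match is a plain string prefix, so a URI such as /kafkalua2 would also be routed to location /kafkalua. The crawler in Step 5 relies on this: all its URLs begin with /kafkalua/.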

Step 4: Start nginx

1. Enter nginx/sbin

```
[root@node03 sbin]# cd /export/servers/openresty/nginx/sbin/
[root@node03 sbin]# ll
total 16356
-rwxr-xr-x 1 root root 16745834 Jul 26 11:33 nginx
```

2. Test whether the configuration file is valid (if nginx is not on your PATH, run ./nginx -t from this directory)

```
[root@node03 sbin]# nginx -t
nginx: the configuration file /export/servers/openresty/nginx/conf/nginx.conf syntax is ok
nginx: configuration file /export/servers/openresty/nginx/conf/nginx.conf test is successful
```

The configuration test passed.

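3. Start nginx

```
[root@node03 sbin]# nginx
```

No output usually means nginx started successfully.

4. Check whether nginx is running

```
[root@node03 sbin]# ps -ef | grep nginx
root       3730      1  0 09:24 ?        00:00:00 nginx: master process nginx
nobody     3731   3730  0 09:24 ?        00:00:20 nginx: worker process is shutting down
nobody     5766   3730  0 12:17 ?        00:00:00 nginx: worker process
root       5824   3708  0 12:24 pts/1    00:00:00 grep nginx
```

The master process and a worker process are running, so nginx started successfully. The worker marked "is shutting down" is left over from an earlier reload.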

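5. Access nginx from a browser

Enter in the browser: node03/kafkalua

Note: if node03 is not in your hosts file, enter the IP address of the OpenResty machine instead, for example 192.168.52.120/kafkalua.

Then enter node03/ or 192.168.52.120/.

Also try node03:80/kafkalua and node03:80/.

In node03:80/kafkalua, node03 is the server's hostname (you can also write the server's IP directly), and 80 is the port set by listen 80;. Because 80 is the HTTP default, it can be omitted. If you change it to listen 8088;, the browser must use node03:8088/kafkalua, and the 8088 can no longer be omitted. kafkalua is the location path.

This also answers the earlier question. location / serves static files from the html directory (the nginx welcome page). location /kafkalua runs kafkalua.lua, which prints the test message and sends the request data to Kafka.

Here is the relevant part of nginx.conf again: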

```nginx
server {
    listen       80;
    server_name  localhost;

    #charset koi8-r;

    #access_log  logs/host.access.log  main;

    location / {
        root   html;
        index  index.html index.htm;
    }

    #------------------ added code ------------------
    location /kafkalua {   # kafkalua is the URI path (the "project name")
        # stub_status module; it also provides $connections_active, read by the Lua script
        stub_status on;
        # Content type of the response written by ngx.say
        default_type text/html;
        # Path of the kafkalua.lua script created earlier (the path you had to remember)
        content_by_lua_file /export/servers/openresty/testlua/kafkalua.lua;
    }
}
```
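Why do node03/kafkalua and node03:80/kafkalua reach the same server? The browser adds http:// to the address. When the URL has no port, it connects to the scheme's default port, which is 80 for HTTP.

Here is a small Java illustration using java.net.URI; the class name is hypothetical. URI.getPort() returns -1 when no port is written, and the sketch then falls back to the scheme default.

```java
import java.net.URI;

// Hypothetical helper: the port a client actually connects to for a given URL
class EffectivePort {

    static int port(String url) {
        URI uri = URI.create(url);
        if (uri.getPort() != -1) {
            return uri.getPort(); // explicit port, e.g. :8088 for listen 8088;
        }
        // No port in the URL: use the scheme's default port
        return "https".equalsIgnoreCase(uri.getScheme()) ? 443 : 80;
    }

    public static void main(String[] args) {
        System.out.println(port("http://node03/kafkalua"));      // 80
        System.out.println(port("http://node03:80/kafkalua"));   // 80
        System.out.println(port("http://node03:8088/kafkalua")); // 8088
    }
}
```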

Step 5: Create a test crawler program

1. Create a Maven project and add the dependencies

```xml
<dependencies>
    <dependency>
        <groupId>org.jsoup</groupId>
        <artifactId>jsoup</artifactId>
        <version>1.11.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.4</version>
    </dependency>
</dependencies>
```

2. The fake crawler program

```java
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.Locale;

import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;

public class SpiderGoAirCN {
    // Every URL starts with /kafkalua, so nginx routes all requests to kafkalua.lua
    private static final String basePath = "http://node03/kafkalua";

    private static final String REFERER_PAGE =
            "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html";
    private static final String FIXED_REFERER =
            REFERER_PAGE + "?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0";
    private static final String USER_AGENT =
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36";
    // Request body (form field "json"); the inner quotes must be escaped in Java
    private static final String QUERY_JSON =
            "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", "
            + "\"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", "
            + "\"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}";

    // One client reused for all requests (a new, never-closed client per request leaks connections)
    private static final CloseableHttpClient httpClient = HttpClients.createDefault();

    public static void main(String[] args) throws Exception {
        try {
            for (int i = 0; i < 50000; i++) {
                spiderQueryao(); // query request
                spiderHtml();    // html
                spiderJs();      // js
                spiderCss();     // css
                spiderPng();     // png
                spiderJpg();     // jpg
                Thread.sleep(100);
            }
        } finally {
            httpClient.close();
        }
    }

    // Target pattern: ^.*/B2C40/query/jaxb/direct/query.ao.*$
    public static void spiderQueryao() throws Exception {
        post("/B2C40/query/jaxb/direct/query.ao",
                REFERER_PAGE + "?t=S&c1=CAN&c2=WUH&d1=" + getGoTime() + "&at=1&ct=0&it=0",
                "192.168.56.80", "243.45.78.132", buildCookie(getGoTime()));
    }

    // Target pattern: ^.*html.*$
    public static void spiderHtml() throws Exception {
        post("/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=CTU&d1=2018-01-17&at=1&ct=0&it=0",
                FIXED_REFERER, "192.168.56.1", "192.168.56.80", buildCookie("2018-01-17"));
    }

    public static void spiderJs() throws Exception {
        post("/B2C40/dist/main/modules/common/requireConfig.js",
                FIXED_REFERER, "192.168.56.1", "192.168.56.80", buildCookie("2018-01-17"));
    }

    public static void spiderCss() throws Exception {
        post("/B2C40/dist/main/css/flight.css",
                REFERER_PAGE, "192.168.56.1", "192.168.56.80", buildCookie("2018-01-17"));
    }

    public static void spiderPng() throws Exception {
        post("/B2C40/dist/main/images/common.png",
                FIXED_REFERER, "192.168.56.1", "192.168.56.80", buildCookie("2018-01-17"));
    }

    public static void spiderJpg() throws Exception {
        post("/B2C40/dist/main/images/loadingimg.jpg",
                FIXED_REFERER, "192.168.56.1", "192.168.56.80", buildCookie("2018-01-17"));
    }

    // Sends one POST with the fake browser headers and the JSON form body
    private static void post(String path, String referer, String remoteAddr,
                             String serverAddr, String cookie) throws Exception {
        HttpPost httpPost = new HttpPost(basePath + path);
        // Header names use hyphens: names containing spaces are invalid HTTP and newer nginx rejects them
        httpPost.setHeader("Time-Local", getLocalDateTime());
        httpPost.setHeader("Request", "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
        httpPost.setHeader("Request-Method", "POST");
        httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8");
        httpPost.setHeader("Referer", referer);
        httpPost.setHeader("Remote-Address", remoteAddr);
        httpPost.setHeader("User-Agent", USER_AGENT);
        httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
        httpPost.setHeader("Server-Address", serverAddr);
        httpPost.setHeader("Cookie", cookie);

        List<NameValuePair> parameters = new ArrayList<>();
        parameters.add(new BasicNameValuePair("json", QUERY_JSON));
        httpPost.setEntity(new UrlEncodedFormEntity(parameters));

        // try-with-resources closes the response; consuming the entity lets the connection be reused
        try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
            System.out.println(response.getStatusLine().getStatusCode());
            EntityUtils.consume(response.getEntity());
        }
    }

    // Cookie captured from the real site; the flight date appears in it twice
    private static String buildCookie(String date) {
        return "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; "
                + "aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; "
                + "userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; "
                + "temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D"
                + date
                + "%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; "
                + "WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; "
                + "language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)"
                + "%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(" + date + ")";
    }

    public static String getLocalDateTime() {
        DateFormat df = new SimpleDateFormat("dd/MMM/yyyy'T'HH:mm:ss +08:00", Locale.ENGLISH);
        return df.format(new Date());
    }

    public static String getISO8601Timestamp() {
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss+08:00");
        return df.format(new Date());
    }

    // Today's date, used as the departure date
    public static String getGoTime() {
        return new SimpleDateFormat("yyyy-MM-dd").format(new Date());
    }

    // Tomorrow's date (return date); not used by the requests above
    public static String getBackTime() {
        Calendar calendar = new GregorianCalendar();
        calendar.setTime(new Date());
        calendar.add(Calendar.DATE, 1); // +1 day; use -1 for the previous day
        return new SimpleDateFormat("yyyy-MM-dd").format(calendar.getTime());
    }
}
```

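Step 6: Start Kafka

1. Create the topic (this is the ZooKeeper-based syntax; Kafka 2.2+ also accepts --bootstrap-server here, and Kafka 3.0 removed --zookeeper)

```
[root@node01 bin]# kafka-topics.sh --zookeeper node01:2181 --partitions 3 --replication-factor 3 --create --topic B2CDATA_COLLECTION1
```

2. Start a Kafka console consumer

```
[root@node01 bin]# kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic B2CDATA_COLLECTION1
```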
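Before starting the crawler, check that kafkalua.lua and the topic agree. The topic was created with --partitions 3, so only partition ids 0, 1 and 2 exist. Suppose the script computes the partition id as counter % PARTITION_NUM and passes it through unchanged. Then any PARTITION_NUM larger than 3 makes some messages target partitions that do not exist.

The hypothetical Java sketch below counts what fraction of counter values would be affected.

```java
// Hypothetical check: for ids computed as counter % partitionNum, what fraction
// point to partitions a topic with topicPartitions partitions does not have?
class PartitionRangeCheck {

    static double invalidFraction(int partitionNum, int topicPartitions, int samples) {
        int invalid = 0;
        // The counter starts at 1, like POLLING_KEY in kafkalua.lua
        for (int counter = 1; counter <= samples; counter++) {
            int id = counter % partitionNum;
            if (id >= topicPartitions) {
                invalid++; // ids 0..topicPartitions-1 are the only valid ones
            }
        }
        return (double) invalid / samples;
    }

    public static void main(String[] args) {
        System.out.println(invalidFraction(6, 3, 600)); // 0.5
        System.out.println(invalidFraction(3, 3, 600)); // 0.0
    }
}
```

With PARTITION_NUM = 6 against a 3-partition topic, half of the ids (3, 4 and 5) are out of range. Keeping PARTITION_NUM equal to the topic's partition count, or reducing the id modulo the real count inside the partitioner, avoids this.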

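Step 7: Start the crawler and observe the results

1. Start the crawler program (SpiderGoAirCN)

2. Watch the consumer window: each request handled by kafkalua.lua appears there as one line of fields separated by #CS#.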
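If the consumer window stays quiet for a few seconds after the crawler starts, that is expected. CONNECT_PARAMS sets producer_type = "async" and flush_time = 10000, so the producer buffers messages and sends them in batches.

The Java sketch below is a generic size-or-time batching buffer. It assumes an async producer flushes either when a batch fills up or when flush_time milliseconds have passed; the library's exact timer scheduling may differ. The class name and the batch size of 200 in main are hypothetical. Time is passed in explicitly so the behaviour is easy to follow.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical size-or-time batching buffer: messages wait in memory until
// batchNum are queued or flushTimeMs has elapsed since the last flush.
class BatchBuffer {

    private final int batchNum;
    private final long flushTimeMs;
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> sent = new ArrayList<>();
    private long lastFlushMs;

    BatchBuffer(int batchNum, long flushTimeMs, long startMs) {
        this.batchNum = batchNum;
        this.flushTimeMs = flushTimeMs;
        this.lastFlushMs = startMs;
    }

    // Queue a message; a full batch is sent immediately
    void add(String message, long nowMs) {
        pending.add(message);
        if (pending.size() >= batchNum) {
            flush(nowMs);
        }
    }

    // Periodic check (the role of the flush timer): send what is pending once flushTimeMs has elapsed
    void tick(long nowMs) {
        if (!pending.isEmpty() && nowMs - lastFlushMs >= flushTimeMs) {
            flush(nowMs);
        }
    }

    private void flush(long nowMs) {
        sent.add(new ArrayList<>(pending)); // stands in for the network send to Kafka
        pending.clear();
        lastFlushMs = nowMs;
    }

    int sentBatchCount() { return sent.size(); }

    int pendingCount() { return pending.size(); }

    public static void main(String[] args) {
        // flush time 10000 ms as in CONNECT_PARAMS; batch size 200 is an assumed value
        BatchBuffer buffer = new BatchBuffer(200, 10_000, 0);
        buffer.add("one request", 100);
        buffer.tick(5_000);
        System.out.println(buffer.sentBatchCount()); // 0: still buffered
        buffer.tick(10_000);
        System.out.println(buffer.sentBatchCount()); // 1: flushed because the time limit passed
    }
}
```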

Step 8: Observe with kafka-manager

1. Start kafka-manager

```
[root@node01 conf]# cd /export/servers/kafka-manager-1.3.3.23/bin/
[root@node01 bin]# ll
total 36
-rwxr-xr-x 1 root root 13747 May  1 06:27 kafka-manager
-rw-r--r-- 1 root root  9975 May  1 06:27 kafka-manager.bat
-rwxr-xr-x 1 root root  1383 May  1 06:27 log-config
-rw-r--r-- 1 root root   105 May  1 06:27 log-config.bat
[root@node01 bin]#
# start
[root@node01 bin]# ./kafka-manager
```

(Screenshot: the kafka-manager window after startup.)

2. Access it from a browser

Enter node01:9000 in the browser.

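Using kafka-manager is not covered here; just check how the B2CDATA_COLLECTION1 topic is being consumed:

The topic has three partitions. If each partition has received roughly the same number of messages, the setup works.

If the counts differ noticeably, the kafkalua.lua script probably has no partitioning strategy configured (check that CONNECT_PARAMS includes partitioner). The default partitioning then causes data skew, so we need to configure our own partitioner.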
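To judge "roughly the same" with a number, compare the per-partition message counts, for example as read from kafka-manager. The hypothetical Java sketch below computes a max/min skew ratio. It contrasts two strategies:

- a round-robin counter, using the same arithmetic as the polling counter in kafkalua.lua;
- a key-hash partitioner fed a constant key.

CRC32 is chosen only for illustration. Treat it as an assumption, not as the exact default partitioner of the Kafka Lua client.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.CRC32;

// Hypothetical comparison of two ways to spread messages over a topic's partitions
class PartitionSkew {

    // Round-robin: the counter starts at 1 and each message goes to counter % partitions
    static long[] roundRobinCounts(int partitions, int messages) {
        long[] counts = new long[partitions];
        for (long counter = 1; counter <= messages; counter++) {
            counts[(int) (counter % partitions)]++;
        }
        return counts;
    }

    // Key hashing (CRC32 for illustration): the same key always maps to the same partition
    static long[] hashedKeyCounts(String key, int partitions, int messages) {
        long[] counts = new long[partitions];
        for (int i = 0; i < messages; i++) {
            CRC32 crc = new CRC32();
            crc.update(key.getBytes(StandardCharsets.UTF_8));
            counts[(int) (crc.getValue() % partitions)]++;
        }
        return counts;
    }

    // Max/min ratio: 1.0 is perfectly even, Infinity means some partition received nothing
    static double skew(long[] counts) {
        long max = Long.MIN_VALUE;
        long min = Long.MAX_VALUE;
        for (long c : counts) {
            max = Math.max(max, c);
            min = Math.min(min, c);
        }
        return min == 0 ? Double.POSITIVE_INFINITY : (double) max / min;
    }

    public static void main(String[] args) {
        long[] rr = roundRobinCounts(3, 3000);
        System.out.println(Arrays.toString(rr) + " skew=" + skew(rr)); // [1000, 1000, 1000] skew=1.0
        long[] hashed = hashedKeyCounts("same-key", 3, 3000);
        // All 3000 messages land in one partition, so the skew is Infinity
        System.out.println(Arrays.toString(hashed) + " skew=" + skew(hashed));
    }
}
```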

Done!