Kafka,RocketMQ,RabbitMQ部署與使用體驗

前言

近期在研究各種消息隊列方案,為了有一個直觀的使用體驗,我把Kafka,RocketMQ,RabbitMQ各自部署了一遍,並使用了最基本的生產與消費消息功能。在部署過程中也遇到一些問題,特此記錄。本文只適用於沒有使用過消息隊列,還停留在安裝部署階段的新手用戶,要了解一個軟體,最好的開始方法是開始使用他,這樣才會有一個直觀的印象。本篇文章的作用也在於此,至於需要了解更深入的架構與細節,則需要查詢其他的文檔資料,這也不是本文的目的。我這裡使用的作業系統是Centos 6.x,硬體配置一般即可。

Kafka的部署與使用

Kafka的部署我是參考官網的步驟開始的,請直接參考其Quickstart章節。

Step1:下載安裝包並解壓

# wget //mirror.bit.edu.cn/apache/kafka/2.4.1/kafka_2.12-2.4.1.tgz
# tar -xzf kafka_2.12-2.4.1.tgz
# cd kafka_2.12-2.4.1

問題:下載有可能報下面的錯誤(沒有報錯則忽略):
To connect to mirrors.tuna.tsinghua.edu.cn insecurely, use 『–no-check-certificate』.
只需要添加報錯提示的參數即可:
wget –no-check-certificate //mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.1/kafka_2.12-2.4.1.tgz

Step2:啟動伺服器

在上述第一步執行完成之後,當前在目錄kafka_2.12-2.4.1下,運行下面的命令啟動zookeeper服務,Kafka依賴zookeeper服務,所以我們首先要啟動zookeeper服務,由於zookeeper服務需要另外安裝,Kafka的安裝包中提供了一個簡單的單節點的zookeeper實例,以方便快速運行Kafka服務,如果是在正式環境中使用則需要另外單獨安裝zookeeper服務(一般是分散式集群而非單節點)給Kafka使用。

# bin/zookeeper-server-start.sh config/zookeeper.properties

我在運行此命令時候報下面的錯誤:
Unrecognized VM option ‘+UseGCLogFileRotation’
Could not create the Java virtual machine.

從提示上看是因為java虛擬機不支援參數UseGCLogFileRotation,我經過查資料說這個參數可以去掉,bin/zookeeper-server-start.sh 腳本中調用了 bin/kafka-run-class.sh ,真正起作用的程式碼在這個腳本中 kafka-run-class.sh,打開這個腳本找到上述參數所在的地方,去掉 -XX:+UseGCLogFileRotation , 如下圖所示:
UseGCLogFileRotation報錯

繼續運行上面的腳本,繼續報錯:
Unrecognized VM option ‘NumberOfGCLogFiles=10’
Could not create the Java virtual machine.

像剛才一樣,繼續去掉 -XX:NumberOfGCLogFiles=10 ,如下圖:
NumberOfGCLogFiles參數報錯

繼續運行,得到報錯:
Unrecognized VM option ‘GCLogFileSize=100M’
Could not create the Java virtual machine.

我們繼續去掉腳本中的 -XX:GCLogFileSize=100M 如下圖:
GCLogFileSize報錯

繼續運行,得到下面的報錯:
UnsupportedClassVersionError錯誤

從提示資訊可以看出應該是java位元組碼的兼容性問題,java.lang.UnsupportedClassVersionError,表示當前運行的zookeeper的位元組碼是高版本的java編譯器編譯的,而現在的運行時環境是低版本,所以不兼容。運行下面的命令查看java版本:
# java -version

java version “1.6.0_24”
OpenJDK Runtime Environment (IcedTea6 1.11.8) (rhel-1.56.1.11.8.el6_3-x86_64)
OpenJDK 64-Bit Server VM (build 20.0-b12, mixed mode)

是jdk1.6,這個版本太低了,我們需要安裝jdk1.8或者以上的版本,怎麼安裝呢,這裡我首先搜索一下系統的安裝源裡面有沒有,如果有我們直接從作業系統提供的源進行安裝,如果沒有則需要通過源碼等其他辦法進行安裝了,運行下面命令查看:

# yum search openjdk

結果圖如下:
yum-search-openjdk

我們安裝開發環境,也就是下面這個,區別就是開發環境中有javac等編譯工具,而運行時環境中沒有,運行時環境只包含運行java程式所需要的工具,因為後面我們需要使用javac編譯,所以我們安裝開發環境,使用下面命令安裝完成即可:

# yum install java-1.8.0-openjdk-devel.x86_64

成功之後我們再次運行 bin/zookeeper-server-start.sh config/zookeeper.properties 以啟動zookeeper服務,沒有任何錯誤,最後停留在如下介面:
run zookeeper

啟動Kafka服務:bin/kafka-server-start.sh config/server.properties 沒有任何報錯,中間會列印出Kafka的配置,輸出比較多,我只截取了一部分,其中 INFO KafkaConfig values 後面就是kafka的配置:
run kafka

Step3:創建topic

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic testtopic

我們可以列出kafka伺服器上的topic列表:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
testtopic

Step4:發送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testtopic

>hello message
>
運行命令之後 > 提示符等待輸入消息,我們輸入字元串 hello message 並回車

Step5:啟動消費者

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testtopic --from-beginning
hello message

此時在發送消息側不斷輸入消息並回車,消費者會不斷列印收到的消息,至此Kafka的簡單收發消息運行完畢

RocketMQ的部署與使用

我們依然根據官網文檔中的 Quick Start節的提示開始操作,官網建議使用64位Linux/Unix/Mac作業系統,我們使用的是64位的Centos 6.x,建議使用JDK1.8+,我們上面部署Kafka的時候已經安裝了jdk1.8,需要maven3.2.x,我的系統上沒有,所以首先需要安裝maven:

安裝maven:

1. 下載maven安裝包,我這裡使用的是更高一點的版本:
# wget //mirrors.cnnic.cn/apache/maven/maven-3/3.5.4/binaries/apache-maven-3.5.4-bin.tar.gz

2. 解壓maven安裝包:
# tar -zxvf apache-maven-3.5.4-bin.tar.gz

3. 配置maven到profile文件中以便啟動的時候自動設置環境變數 vim /etc/profile 添加下面兩行:
export MAVEN_HOME=/data/rocket_mq/apache-maven-3.5.4
export PATH=$MAVEN_HOME/bin:$PATH

注意這裡你應該換成你自己的對應的目錄

4. 使環境變數生效:
# source /etc/profile

5. 檢驗是否正常:
# mvn -version

Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)
Maven home: /data/rocket_mq/apache-maven-3.5.4
Java version: 1.8.0_212, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.212.b04-0.el6_10.x86_64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: “linux”, version: “4.1.0-32.el6.ucloud.x86_64”, arch: “amd64”, family: “unix”

下載RocketMQ的源碼並編譯:

# wget //archive.apache.org/dist/rocketmq/4.7.0/rocketmq-all-4.7.0-source-release.zip
# unzip rocketmq-all-4.7.0-source-release.zip
# cd rocketmq-all-4.7.0/
# mvn -Prelease-all -DskipTests clean install -U
# cd distribution/target/rocketmq-4.7.0/rocketmq-4.7.0

編譯成功之後的截圖如下:
compile rocketmq

運行nameserver服務:

# sh bin/mqnamesrv
ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !!

首先我們在編譯RocketMQ的時候我們已經將目錄定位到 cd distribution/target/rocketmq-4.7.0/rocketmq-4.7.0  在此目錄下運行nameserver服務報錯,根據提示我們需要設置 JAVA_HOME 環境變數,如下(修改為你自己的地址):

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.212.b04-0.el6_10.x86_64/
export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar 
export PATH=$PATH:$JAVA_HOME/bin

之後再次運行:

# sh bin/mqnamesrv 

OpenJDK 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
OpenJDK 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON

可以看到Name Server運行成功,這裡提一下,Name Server是為RocketMQ提供名字服務的模組,其角色與zookeeper在Kafka中的作用類似

啟動Broker服務:

在啟動Broker這裡有一點要注意的是,我們這裡運行命令所在的目錄跟上面運行Name Server的目錄一樣,都是在 distribution/target/rocketmq-4.7.0/rocketmq-4.7.0 這個目錄下運行,並且上面需要導入的 JAVA_HOME 等環境變數這裡都需要,除此之外,運行Broker我們還需要修改一下配置文件,在當前目錄下(distribution/target/rocketmq-4.7.0/rocketmq-4.7.0)  修改文件 vim conf/broker.conf 在文件後面添加如下一行配置:

brokerIP1 = 127.0.0.1

設置Broker的ip,如果不設置的話,當機器存在多網卡的時候,例如一個內外ip和一個外網ip的情況,Broker可能會配置第一個ip地址,導致我們後面的生產者啟動的時候寫入消息失敗報告下面的錯誤:
RemotingTooMuchRequestException

為了測試方便,我們直接設置BrokerIP1為本機迴環ip地址127.0.0.1,然後通過配置文件啟動Broker:

# bin/mqbroker -n localhost:9876 -c conf/broker.conf
The broker[broker-a, 127.0.0.1:10911] boot success. serializeType=JSON and name server is localhost:9876

啟動生產者發送消息:

Broker啟動完成之後,我們接下來啟動生產者發送消息(同樣要定位到 distribution/target/rocketmq-4.7.0/rocketmq-4.7.0 並導入 JAVA_HOME 等環境變數):

# export NAMESRV_ADDR=localhost:9876
# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

03:08:05.769 [main] DEBUG i.n.u.i.l.InternalLoggerFactory – Using SLF4J as the default logging framework
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
SendResult [sendStatus=SEND_OK, msgId=20020AB3FD9D000000000000000000017E931540E19D439EE28E0000, offsetMsgId=7F00000100002A9F00000000000638E4, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=500]
SendResult [sendStatus=SEND_OK, msgId=20020AB3FD9D000000000000000000017E931540E19D439EE2B00001, offsetMsgId=7F00000100002A9F00000000000639AE, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=500]
……

03:08:07.668 [NettyClientSelector_1] INFO RocketmqRemoting – closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
03:08:07.669 [NettyClientSelector_1] INFO RocketmqRemoting – closeChannel: close the connection to remote address[127.0.0.1:10911] result: true
可以看到中間我省略了很多日誌,這些日誌表示生產者向伺服器寫入了大量的消息,由於輸出太多,我只列出2條

啟動消費者消費消息:

接著我們啟動消費者消費消息(同樣要定位到 distribution/target/rocketmq-4.7.0/rocketmq-4.7.0 並導入 JAVA_HOME 等環境變數):

# export NAMESRV_ADDR=localhost:9876
# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

03:11:37.229 [main] DEBUG i.n.u.i.l.InternalLoggerFactory – Using SLF4J as the default logging framework
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=202, queueOffset=500, sysFlag=0, bornTimestamp=1586804886200, bornHost=/127.0.0.1:42523, storeTimestamp=1586804886203, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F0000000000063B42, commitLogOffset=408386, bodyCRC=1032136437, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=’TopicTest’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=750, CONSUME_START_TIME=1586805097731, UNIQ_KEY=20020AB3FD9D000000000000000000017E931540E19D439EE2B80003, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 51], transactionId=’null’}]]
ConsumeMessageThread_3 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=202, queueOffset=500, sysFlag=0, bornTimestamp=1586804886195, bornHost=/127.0.0.1:42523, storeTimestamp=1586804886196, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F0000000000063A78, commitLogOffset=408184, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=’TopicTest’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=750, CONSUME_START_TIME=1586805097731, UNIQ_KEY=20020AB3FD9D000000000000000000017E931540E19D439EE2B30002, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50], transactionId=’null’}]]

同樣是大量消費消息的日誌,這裡也只列出前面2條輸出,至此,RocketMQ的部署與簡單消息收發使用完畢

RabbitMQ的部署與使用

RabbitMQ的部署我們同樣參照官網給出的步驟,找到官網的 Get Started 點開按鈕 「Download + Installation」 ,RabbitMQ有多種運行方式,可以基於docker鏡像運行,我們本次不使用docker這種方式部署。我們使用常規的部署方法,RabbitMQ使用Erlang語言編寫,運行需要Erlang的運行時環境,所以我們首先需要安裝Erlang環境,RabbitMQ團隊為我們提供了一個Erlang安裝包,其僅包含運行RabbitMQ所需要的全部組件,為了方便我們就使用此Erlang安裝包,安裝起來很容易:

安裝Erlang環境:

# wget //github.com/rabbitmq/erlang-rpm/releases/download/v22.3.1/erlang-22.3.1-1.el6.x86_64.rpm
# yum install erlang-22.3.1-1.el6.x86_64.rpm

沒有報錯,我這裡是下載的Centos版本的二進位安裝包,其他系統可以自行去RabbitMQ官網下載,Erlang安裝好之後,我們就可以安裝RabbitMQ服務了:

安裝RabbitMq:

# wget //github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
# rpm –import rabbitmq-release-signing-key.asc 
# wget //wangqiguoy.cn-bj.ufileos.com/rabbitmq-server-3.8.3-1.el6.noarch.rpm
# yum install rabbitmq-server-3.8.3-1.el6.noarch.rpm

我安裝的時候沒有任何報錯,直接安裝完成,比Kafka和RocketMQ要順利多了,RabbitMQ提供了一個web介面可以對其進行管理,可以在上面看到當前的隊列,通道資訊,消息堆積情況等等,通過下面的命令開啟插件:

開啟web管理插件:

# rabbitmq-plugins enable rabbitmq_management

Enabling plugins on node rabbit@10-179-253-157:
rabbitmq_management
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@10-179-253-157…
The following plugins have been enabled:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch

started 3 plugins.

開啟成功之後,可以通過一個http的地址訪問web介面管理RabbitMQ,地址形式是://ip:15672/ 將ip地址換成自己的伺服器ip地址即可,登陸介面如下:
rabbit manager

修改RabbitMQ的配置文件以登陸其web介面:

# vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.3/ebin/rabbit.app
{loopback_users, [<<“guest”>>]},
改成
{loopback_users, [guest]}

修改好之後,重啟服務生效,可以通過上面的web介面登錄,用戶名和密碼都是guest,登陸進去之後的介面如下圖:
rabbit manager

功能還是很豐富的,可以查看connection、channel、exchange、queue等資訊,還可以管理用戶許可權,除了通過web介面管理RabbitMQ之外,Rabbit還提供一個命令行工具rabbitmqctl來管理RabbitMQ,該工具的具體使用可以查看其官網文檔,另外對RabbitMQ常用的啟動停止操作可以參考如下命令:

service rabbitmq-server start
service rabbitmq-server stop
service rabbitmq-server restart
service rabbitmq-server status

發送消息與消費消息:

RabbitMQ沒有提供命令行的測試方式,不過RabbitMQ提供了很多客戶端的demo可供使用,這裡我選擇的是使用nodejs,首先要準備node的環境如下:
# yum install nodejs
# yum install npm
# npm install amqplib
最後一步安裝 amqplib 即是nodejs中要使用的與RabbitMQ相關的客戶端庫,我們的生產者 send.js 程式碼如下:

//send.js
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(error0, connection) {
    if (error0) {
        throw error0;
    }
    connection.createChannel(function(error1, channel) {
        if (error1) {
            throw error1;
        }
        var queue = 'hello';
        var msg = '{"name":"Hello World!"}';
        channel.assertQueue(queue, {
            durable: false
        });
        channel.sendToQueue(queue, new Buffer(msg));
        console.log(" [x] Sent %s", msg);
    });
    setTimeout(function() {
        connection.close();
        process.exit(0);
    }, 500);
});

消費者 receive.js 的程式碼如下:

//receive.js
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(error0, connection) {
    if (error0) {
        throw error0;
    }
    connection.createChannel(function(error1, channel) {
        if (error1) {
            throw error1;
        }
        var queue = 'hello';
        channel.assertQueue(queue, {
            durable: false
        });
        console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
        channel.consume(queue, function(msg) {
            console.log(" [x] Received %s", msg.content.toString());
        }, {
            noAck: true
        });
    });
});

分別運行 send.js 與 receive.js 得到如下輸出:
# node send.js
[x] Sent {“name”:”Hello World!”}

# node receive.js
[*] Waiting for messages in hello. To exit press CTRL+C
[x] Received {“name”:”Hello World!”}

至此RabbitMQ的部署以及簡單消息收發使用完成