分散式消息隊列RocketMQ(一)安裝與啟動

分散式消息隊列RocketMQ

一、RocketMQ簡介

RocketMQ(火箭MQ) 出自於阿里,後開源給apache成為apache的頂級開源項目之一,頂住了淘寶10年的 雙11壓力 是電商產品的不二選擇 (略微有點誇張)

1、MQ概述

Message Queue,是一種提供消息隊列服務的中間件,也成為消息中間件,是一套提供了消息生產、存儲、消費全過程API的軟體系統

2、MQ用途

(1)、限流削峰

系統A每秒只能處理50請求 一般來講如過收到請求大於處理請求,則多餘請求會捨去。如果加入MQ 多出來的請求就會存儲在MQ中,每秒向系統A發送50請求

image

(2)、非同步解耦

用戶請求上游系統模組,上游再去調用下游,在下游做出回應之前,一直處於等待狀態,這也是同步調用。

加入MQ後則不用等待一條下單鏈結束後才返回結果給用戶,讓庫存系統和郵件系統可以同時執行。

可以看出原本需要 50+50+50 = 150ms的下單鏈 現在只需要 50+50 = 100ms就可以解決

image

(3)、數據收集

分散式系統會產生海量級數據流,如:業務日誌、監控數據、用戶行為(用戶點了哪裡,看了什麼。。)等。針對這些數據流進行實時或批量採集匯總,然後對這些數據流進行大數據分析,這是當前互聯網平台必備的計數。通過MQ完成此類數據收集是最好的選擇

3、常見的MQ

image

(1)、ActiveMQ

使用java開發的一款MQ產品。早些年公司項目都在使用,現在社區活躍度低。現在的項目中已經很少使用了

(2)、RabbitMQ

使用Erlang開發的一款MQ產品。吞吐量較kafka和RocketMQ較低,由於不是使用java語言開發。因此對其訂製化開發較難

(3)、kafaka

使用Erlang/java開發的一款MQ產品。其最大的特點就是吞吐率高,常用於大數據領域實時計算、日誌採集等場景。沒有遵循任何的MQ協議,而是使用自研協議

(4)、RocketMQ

使用java開發的一款MQ產品。是阿里基於kafka開發而來。經過數年阿里雙11考研,性能與穩定性非常高,沒有遵循任何常見MQ協議(與基於kafka開發有關係),而是使用自研協議

4、常見MQ協議

因主要學習的 kafaka/RocketMQ 並不遵循常見的MQ協議 因此先放一放。

二、RocketMQ安裝

1、基本概念

(1)、消息(Message)

消息是指消息系統所傳輸資訊的無力再提,生產和消費數據的最小單位,每條消息必須屬於一個主題

(2)、主題(Topic)

Topic(主題)可以看做消息的規類,它是消息的第一級類型。比如一個電商系統可以分為:交易消息、物流消息等,一條消息必須有一個 Topic 。

Topic 與生產者和消費者的關係非常鬆散,一個 Topic 可以有0個、1個、多個生產者向其發送消息,一個生產者也可以同時向不同的 Topic 發送消息。
一個 Topic 也可以被 0個、1個、多個消費者訂閱。

image

(3)、標籤(Tag)

Tag(標籤)可以看作子主題,它是消息的第二級類型,用於為用戶提供額外的靈活性。使用標籤,同一業務模組不同目的的消息就可以用相同 Topic 而不同的 Tag 來標識。比如交易消息又可以分為:交易創建消息、交易完成消息等,一條消息可以沒有 Tag

(4)、隊列(Queue)

存儲消息的物理實體。一個 Topic可以包含多個Queue,每個Queue中存放的就是該Topic的消息。一個Topic的Queue也被稱為一個Topic中消息的分區

一個 Topic 中 一個 Queue 的消息只能被一個 消費者組 中的消費者消費

image

分片(Sharding)不同於分區。在RocketMQ中,分片指的是存放相應 Topic 的 Broker(主機)。每個分片中會創建出相應數量的分區,即 Queue,每個Queue的大小都是相同的

image

(5)、消息標識(MessageID/Key)

RocketMQ中每個消息擁有MessageID,且可以攜帶具有業務標識的Key,以便對消息的查詢。MessageID有兩個:在生產者 send() 消息時會自動生成一個MessageID(msgID),當消息到達Broker後,Broker也會自動生成一個MessageID(offsetMsgID)。msgID、offsetMsgID與key都成為消息標識

msgID:由producer生成,規則為:

producerIP + 進程pid + MessageClientIdSetter類的ClassLoder的hashcode + 當前時間 + AutomicInteger自增計數器

offsetMsgID:由Broker端生成,規則為:

brokerIP + 物理分區的offset

key:由用戶指定的業務相關的唯一標識

2、系統架構

image

(1)、Producer

消息生產者,負責產生消息,一般由業務系統負責產生消息。

  • Producer由用戶進行分散式部署,消息由Producer通過多種負載均衡模式發送到Broker集群,發送低延時,支援快速失敗。
  • RocketMQ 提供了三種方式發送消息:同步、非同步和單向
  • 同步發送:同步發送指消息發送方發出數據後會在收到接收方發迴響應之後才發下一個數據包。一般用於重要通知消息,例如重要通知郵件、營銷簡訊。
  • 非同步發送:非同步發送指發送方發出數據後,不等接收方發迴響應,接著發送下個數據包,一般用於可能鏈路耗時較長而對響應時間敏感的業務場景,例如用戶影片上傳後通知啟動轉碼服務。
  • 單向發送:單向發送是指只負責發送消息而不等待伺服器回應且沒有回調函數觸發,適用於某些耗時非常短但對可靠性要求並不高的場景,例如日誌收集。

(2)、Consumer

消息消費者,負責消費消息,一般是後台系統負責非同步消費。消費時會均分。

  • Consumer也由用戶部署,支援PUSH和PULL兩種消費模式,支援集群消費廣播消息,提供實時的消息訂閱機制
  • Pull:拉取型消費者(Pull Consumer)主動從消息伺服器拉取資訊,只要批量拉取到消息,用戶應用就會啟動消費過程,所以 Pull 稱為主動消費型。
  • Push:推送型消費者(Push Consumer)封裝了消息的拉取、消費進度和其他的內部維護工作,將消息到達時執行的回調介面留給用戶應用程式來實現。所以 Push 稱為被動消費類型,但從實現上看還是從消息伺服器中拉取消息,不同於 Pull 的是 Push 首先要註冊消費監聽器,當監聽器處觸發後才開始消費消息。
  • 消費者組中Consumer的數量應該小於等於訂閱Topic的Queue數量。如果超出Queue數量,則多出的Consumer將不能消費消息。反之,一個Topic類型的消息可以被多個消費者組消費

image

(3)、Name Server

①、功能介紹

NameServer是一個Broker與Topic路由的註冊中心,支援Broker動態註冊與發現。

②、路由註冊

NameServer是無狀態的。在Broker節點啟動時,輪詢NameServer列表,與每個NameServer節點建立長連接,發起註冊請求。在NameServer內部維護著一個Broker列表,用來動態存儲Broker的資訊。以此保證節點中的數據同步。(zk等與之相反,通知一個讓其內部自己同步。麻煩,註冊是簡單了,如果擴容NameServer需要修改Broker指定新加的NameServer)

Broker節點會每30s發送一次心跳,將最新的的資訊以心跳包的方式上報NameServer。心跳包含BrokerID、Broker地址(Ip+Port)、Broker名稱、Broker所屬集群名稱……,NameServer接受心跳包後,會更新心跳時間戳,記錄Broker最新存活時間

③、路由剔除

如果NameServer沒有收到Broker的心跳,NameServer可能會將其從Broker中剔除

NameServer中有一個定時任務,每隔10s會掃描Broker表,查看每一個Broker最新心跳時間距離當前是否超過120s,如果超過,則會判定Broker失效,然後將其從Broker列表中剔除。(RocketMQ沒有自我保護機制)

若要停掉Broker工作,需將Broker讀寫許可權禁用,Client等向Broker發送請求會受到NO_PERMISSION響應,然後Client會進行對其他Broker重試。

④、路由發現

RocketMQ採用的是Pull模型。當Topic路由資訊發生變化時,NameServer不會主動推送給客戶端,而是客戶端定時拉去主題最新的路由。默認科幻段每30s拉取一次最新的路由。

1、push模型:實時性好,是一個發布訂閱模型,需要維護一個長連接。耗費資源。

2、pull模型:實時性較差

3、Long Polling模型:長輪詢模型。整合pull和push,維護一個長連接指定時間再去釋放長連接

⑤、客戶端NameServer選擇策略

客戶端首先生成一個隨機數,在與NameServer節點數量取模,此時得到就是所要連接的節點索引,然後就會進行連接。如果連接失敗,則會採用round-robin,逐個嘗試去連接其他節點。首先採用隨即策略,失敗後採用輪詢

(4)、Broker

①、功能介紹

消息中轉角色,負責存儲消息,轉發消息。

  • Broker是具體提供業務的伺服器,單個Broker節點與所有的NameServer節點保持長連接及心跳,並會定時將Topic資訊註冊到NameServer,順帶一提底層的通訊和連接都是基於Netty實現的。
  • Broker負責消息存儲,以Topic為緯度支援輕量級的隊列,單機可以支撐上萬隊列規模,支援消息推拉模型。
  • 官網上有數據顯示:具有上億級消息堆積能力,同時可嚴格保證消息的有序性

②、模組構成

image

Remoting Module:整個Broker的屍體,負責處理來自Clients端的請求。而這個Broker實體則由以下模組構成。

Client Manager:客戶端管理器。負責接受、解析客戶端(Producer/Consumer)請求,管理客戶端。例如:維護Consumer的Topuc訂閱資訊

Store Service:存儲服務。提供方便簡單的API介面,處理消息存儲到物理硬碟消息查詢 功能

HA Service:高可用服務,提供Master Broker 和 Slave Broker之間的數據同步功能

Index Service:索引服務。根據特定的Message key,投遞到Broker的消息進行索引服務,同時也提供根據Message Key對消息進行快速查詢的功能

③、集群部署

主備集群,Master掛掉後會啟動Slave。Master與Slave的對應關係是通過指定相同的BrokerName、不同的BrokerId來確定。BrokerId為0表示Master,非0表示Slave。每個Broker與NameServer集群中的所有節點建立長連接,定時註冊Topic資訊到所有的NameServer。

image

(5)、工作流程

image

1、啟動NameServer,啟動後開始監聽埠,等待Broker,Producer,Consumer連接

2、啟動Broker時,Broker會與所有的NameServer建立並保持長連接,然後每隔30s向NameServer發送心跳包

3、收發消息前,可先創Topic,創建時需要指定該Topic要存儲在那些Broker上,在次高壓部分華北Topic時也會將Topic與Broker的關係寫到NameServer中。 此步驟時可選的,也可以在發送消息時自動創建Topic

4、Producer發送消息,啟動時先跟NameServer集群中其中一台建立長連接,並從NameServer中獲取路由資訊,即當前發送的Topic的Queue與Broker地址(Ip+Port)的映射關係。然後根據演算法策略從隊列選擇一個Queue,與隊列所在的Broker建立長連接從而向Broker發消息。在獲取到路由資訊後,Producer會首先將路由資訊快取到本地,再每30s從NamesServer更新一次路由資訊。

5、Consumer與Producer類似,跟其中一台NameServer建立長連接,獲取其所訂閱Topic的路由資訊,然後根據演算法策略從路由資訊中獲取其所要消費的Queue,然後直接跟Broker建立長連接,開始消費其中的消息。Consumer再獲取到路由資訊後,同樣也會每30s從NameServer更新一次路由資訊。不同於Producer的是,Consumer還會向Broker發送心跳,以確保Broker的存活狀態。

Topic創建模式

Topic手動創建模式有兩種

集群模式:該模式下創建的Topic在該集群中,所有Broker中的Queue數量相同。

Broker模式:該模式下創建的Topic在該集群中,每個Broker中的Queue數量可以不同。

Topic自動創建模式有一種

默認採用的是Broker模式:會為每個Broker默認創建4個Queue(配置文件中取的4)

image

讀寫隊列

image

讀寫隊列可以不同,例:可寫隊列設置為16,可讀隊列寫為14,那麼生產者可以把消息卸載0-15的隊列中,而消費者只能消費0-14中隊列的消息。

為何如此設計?

因為這麼搞可以方便縮容,例:可寫可讀隊列都為16,現在只保存8個隊列,可以先改可寫隊列為8,待消費者消費完可讀隊列中8-15的消息後在調整可讀隊列為8,這樣一來整個縮容過程沒有任何消息丟失。

三、單機的安裝與啟動

1、安裝RocketMQ

在官網下載安裝RocketMQ和JDK並解壓

註:RocketMQ是zip 需要安裝unzip才能解壓。

官網

image

2、修改配置

註:RocketMQ適配於J8,J8以上需要修改配置文件

例如:RocketMQ4.9.2(2021-1)

修改RocketMQ的bin目錄下的 runserver.sh、runbroker.sh、tools.sh

(1)、runserver.sh

刪除:

  • -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy
  • JAVA_OPT=”${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m”
  • JAVA_OPT=”${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib”(刪除帶有Djava哪一行就行不同版本不一樣)

更改

  • CLASSPATH為:${BASE_DIR}/lib/rocketmq-broker-4.5.1.jar:${BASE_DIR}/lib/*😒{BASE_DIR}/conf:${CLASSPATH} rocketmq-broker-版本號.jar

4.9.2改好的

#!/bin/sh

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     //www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#===========================================================================================
# Java Environment Setting
#===========================================================================================
error_exit ()
{
    echo "ERROR: $1 !!"
    exit 1
}

[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"

export JAVA_HOME
export JAVA="$JAVA_HOME/bin/java"
export BASE_DIR=$(dirname $0)/..
export CLASSPATH=${BASE_DIR}/lib/rocketmq-broker-4.9.2.jar:${BASE_DIR}/lib/*:${BASE_DIR}/conf:${CLASSPATH}

#===========================================================================================
# JVM Configuration
#===========================================================================================
# The RAMDisk initializing size in MB on Darwin OS for gc-log
DIR_SIZE_IN_MB=600

choose_gc_log_directory()
{
    case "`uname`" in
        Darwin)
            if [ ! -d "/Volumes/RAMDisk" ]; then
                # create ram disk on Darwin systems as gc-log directory
                DEV=`hdiutil attach -nomount ram://$((2 * 1024 * DIR_SIZE_IN_MB))` > /dev/null
                diskutil eraseVolume HFS+ RAMDisk ${DEV} > /dev/null
                echo "Create RAMDisk /Volumes/RAMDisk for gc logging on Darwin OS."
            fi
            GC_LOG_DIR="/Volumes/RAMDisk"
        ;;
        *)
            # check if /dev/shm exists on other systems
            if [ -d "/dev/shm" ]; then
                GC_LOG_DIR="/dev/shm"
            else
                GC_LOG_DIR=${BASE_DIR}
            fi
        ;;
    esac
}

choose_gc_options()
{
    # Example of JAVA_MAJOR_VERSION value : '1', '9', '10', '11', ...
    # '1' means releases befor Java 9
    JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | sed -r -n 's/.* version "([0-9]*).*$/\1/p')
    if [ -z "$JAVA_MAJOR_VERSION" ] || [ "$JAVA_MAJOR_VERSION" -lt "9" ] ; then
      JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
      JAVA_OPT="${JAVA_OPT} -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8"
      JAVA_OPT="${JAVA_OPT} -verbose:gc -Xlog:gc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDateStamps"
      JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
    else
      JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
      JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
      JAVA_OPT="${JAVA_OPT} -Xlog:gc*:file=${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log:time,tags:filecount=5,filesize=30M"
    fi
}

choose_gc_log_directory
choose_gc_options
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"

$JAVA ${JAVA_OPT} $@

(2)、runbroker.sh

刪除

  • -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy
  • JAVA_OPT=”${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m”
  • JAVA_OPT=”${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib”(刪除帶有Djava哪一行就行不同版本不一樣)

修改

  • CLASSPATH為:${BASE_DIR}/lib/rocketmq-broker-4.5.1.jar:${BASE_DIR}/lib/*😒{BASE_DIR}/conf:${CLASSPATH} rocketmq-broker-版本號.jar
  • -Xloggc:改成-Xlog:gc

4.9.2改好的

#!/bin/sh

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     //www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#===========================================================================================
# Java Environment Setting
#===========================================================================================
error_exit ()
{
    echo "ERROR: $1 !!"
    exit 1
}

[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"

export JAVA_HOME
export JAVA="$JAVA_HOME/bin/java"
export BASE_DIR=$(dirname $0)/..
export CLASSPATH=${BASE_DIR}/lib/rocketmq-broker-4.9.2.jar:${BASE_DIR}/lib/*:${BASE_DIR}/conf:${CLASSPATH}

#===========================================================================================
# JVM Configuration
#===========================================================================================
# The RAMDisk initializing size in MB on Darwin OS for gc-log
DIR_SIZE_IN_MB=600

choose_gc_log_directory()
{
    case "`uname`" in
        Darwin)
            if [ ! -d "/Volumes/RAMDisk" ]; then
                # create ram disk on Darwin systems as gc-log directory
                DEV=`hdiutil attach -nomount ram://$((2 * 1024 * DIR_SIZE_IN_MB))` > /dev/null
                diskutil eraseVolume HFS+ RAMDisk ${DEV} > /dev/null
                echo "Create RAMDisk /Volumes/RAMDisk for gc logging on Darwin OS."
            fi
            GC_LOG_DIR="/Volumes/RAMDisk"
        ;;
        *)
            # check if /dev/shm exists on other systems
            if [ -d "/dev/shm" ]; then
                GC_LOG_DIR="/dev/shm"
            else
                GC_LOG_DIR=${BASE_DIR}
            fi
        ;;
    esac
}

choose_gc_log_directory

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_broker_gc_%p_%t.log"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"

numactl --interleave=all pwd > /dev/null 2>&1
if [ $? -eq 0 ]
then
	if [ -z "$RMQ_NUMA_NODE" ] ; then
		numactl --interleave=all $JAVA ${JAVA_OPT} $@
	else
		numactl --cpunodebind=$RMQ_NUMA_NODE --membind=$RMQ_NUMA_NODE $JAVA ${JAVA_OPT} $@
	fi
else
	$JAVA ${JAVA_OPT} $@
fi

(3)、tool.sh

刪除

  • JAVA_OPT=”${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext”(刪除帶有Djava哪一行就行不同版本不一樣)

修改

  • 更改 CLASSPATH的值為export CLASSPATH=${BASE_DIR}/lib/*😒{BASE_DIR}/conf:.😒{CLASSPATH}

4.9.2改好的

#!/bin/sh

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     //www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#===========================================================================================
# Java Environment Setting
#===========================================================================================
error_exit ()
{
    echo "ERROR: $1 !!"
    exit 1
}

[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"

export JAVA_HOME
export JAVA="$JAVA_HOME/bin/java"
export BASE_DIR=$(dirname $0)/..
export CLASSPATH=export CLASSPATH=${BASE_DIR}/lib/*:${BASE_DIR}/conf:.:${CLASSPATH}

#===========================================================================================
# JVM Configuration
#===========================================================================================
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"

$JAVA ${JAVA_OPT} "$@"

3、啟動RocketMQ

image

方便複製

 > nohup sh bin/mqnamesrv &
 > tail -f ~/logs/rocketmqlogs/namesrv.log
 The Name Server boot success...

 > nohup sh bin/mqbroker -n localhost:9876 &
 > tail -f ~/logs/rocketmqlogs/broker.log
 The broker[%s, 172.30.30.233:10911] boot success...

4、關閉RocketMQ

方便複製

> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK

> sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK

四、控制台的安裝與啟動

1、下載

可視化下載地址(GitHub)

下載下來是個SpringBoot工程

image

2、修改配置

修改application的 rocketmq.config.namesrvAddr= 你伺服器的地址:broker啟動的埠號

3、添加依賴

在解壓目錄rocketmq-console的pom.xml中添加如下JAXB依賴。

JAXB,Java Architechture for Xml Binding,用於XML綁定的Java技術,是一個業界標準,是一
項可以根據XML Schema生成Java類的技術。

<dependency>
    <groupId>javax.xml.bind</groupId>
    <artifactId>jaxb-api</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>com.sun.xml.bind</groupId>
    <artifactId>jaxb-impl</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>com.sun.xml.bind</groupId>
    <artifactId>jaxb-core</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>javax.activation</groupId>
    <artifactId>activation</artifactId>
    <version>1.1.1</version>
</dependency>

4、打包

確保已經安裝過Maven

進入該項目目錄執行打開cmd執行以下命令:mvn clean package -Dmaven.test.skip=true

5、運行

java -jar 運行該jar包即可,然後就可在瀏覽器訪問到該項目

image

當然 這個項目最好是掛在伺服器上

五、集群搭建理論

image

1、數據複製與刷盤策略

image

(1)、複製策略

複製策略是Broker的Master與Slave間的數據同步方式。分為同步複製與非同步複製

同步複製:消息寫入master後,master會等待slave同步數據成功後才向producer返回成功ACK

非同步複製:消息寫入master後,master立即向producer返回成功ACK,無需等待slave同步數據成功

非同步複製策略會降低系統的寫入延遲,RT變小,提高了系統的吞吐量

(2)、刷盤策略

刷盤策略指的是broker中消息的落盤方式,即消息發送到broker記憶體後消息持久化到磁碟的方式。分為同步刷盤與非同步刷盤:

同步刷盤:當消息持久化到broker的磁碟後才算是消息寫入成功。

非同步刷盤:當消息寫入到broker的記憶體後即表示消息寫入成功,無需等待消息持久化到磁碟。

  • 非同步刷盤策略會降低系統的寫入延遲,RT變小,提高了系統的吞吐量

  • 消息寫入到Broker的記憶體,一般是寫入到了PageCache

  • 對於非同步 刷盤策略,消息會寫入到PageCache後立即返回成功ACK。但並不會立即做落盤操作,而是當PageCache到達一定量時會自動進行落盤。

2、Broker集群模式

根據Broker集群中各個節點間關係的不同,Broker集群可以分為以下幾類:

(1)、- 單Master

只有一個broker(其本質上就不能稱為集群)。這種方式也只能是在測試時使用,生產環境下不能使用,因為存在單點問題。

(2)、多Master

broker集群僅由多個master構成,不存在Slave。同一Topic的各個Queue會平均分布在各個master節點上。

  • 優點:配置簡單,單個Master宕機或重啟維護對應用無影響,在 磁碟配置為RAID10 時,即使機器宕機不可恢復情況下,由於RAID10磁碟非常可靠,消息也不會丟(非同步刷盤丟失少量消息,同步刷盤一條不丟),性能最高;

  • 缺點:單台機器宕機期間,這台機器上未被消費的消息在機器恢復之前不可訂閱(不可消費),消息實時性會受到影響以上優點的前提是,這些Master都配置了RAID磁碟陣列。如果沒有配置,一旦出現某Master宕機,則會發生大量消息丟失的情況。

以上優點的前提是,這些Master都配置了RAID磁碟陣列。如果沒有配置,一旦出現某Master宕機,則會發生大量消息丟失的情況。

(3)、多Master多Slave模式-非同步複製

  • broker集群由多個master構成,每個master又配置了多個slave(在配置了RAID磁碟陣列的情況下,一個master一般配置一個slave即可)。master與slave的關係是主備關係,即master負責處理消息的讀寫請求,而slave僅負責消息的備份與master宕機後的角色切換。

  • 非同步複製即前面所講的複製策略中的非同步複製策略,即消息寫入master成功後,master立即向producer返回成功ACK,無需等待slave同步數據成功。

  • 該模式的最大特點之一是,當master宕機後slave能夠 自動切換 為master。不過由於slave從master的同步具有短暫的延遲(毫秒級),所以當master宕機後,這種非同步複製方式可能會存在少量消息的丟失問題。

Slave從Master同步的延遲越短,其可能丟失的消息就越少,對於Master的RAID磁碟陣列;若使用的也是非同步複製策略,同樣也存在延遲問題,同樣也可能會丟失消息。但RAID陣列的秘訣是微秒級的(因為是由硬碟支援的,但是成本過高),所以其丟失的數據量會更少。

(4)、多Master多Slave模式-同步雙寫

  • 該模式是多Master多Slave模式的同步複製實現。所謂同步雙寫,指的是消息寫入master成功後,master會等待slave同步數據成功後才向producer返回成功ACK,即master與slave都要寫入成功後才會返回成功ACK,也即雙寫。

  • 該模式與非同步複製模式相比,優點是消息的安全性更高,不存在消息丟失的情況。但單個消息的RT(響應時間)略高,從而導致性能要略低(大約低10%)。

  • 該模式存在一個大的問題:對於目前的版本,Master宕機後,Slave不會自動切換到Master(致命問題)。

(5)、最佳實踐

  • 一般會為Master配置RAID10磁碟陣列,然後再為其配置一個Slave。即利用了RAID10磁碟陣列的高
    效、安全性,又解決了可能會影響訂閱的問題。——-多M多S+RAID10陣列

    • RAID磁碟陣列的效率要高於Master-Slave集群。因為RAID是硬體支援的。也正因為如此,所以RAID陣列
      的搭建成本較高。

    • 多Master+RAID陣列,與多Master多Slave集群的區別是什麼?

      • 多Master+RAID陣列,其僅僅可以保證數據不丟失,即不影響消息寫入,但其可能會影響到
        消息的訂閱。但其執行效率要遠高於多Master多Slave集群

      • 多Master多Slave集群,其不僅可以保證數據不丟失,也不會影響消息寫入。其運行效率要低
        於多Master+RAID陣列

六、磁碟陣列RAID

1、RAID歷史

1988 年美國加州大學伯克利分校的 D. A. Patterson 教授等首次在論文 「A Case of Redundant Array ofInexpensive Disks」 中提出了 RAID 概念 ,即廉價冗餘磁碟陣列( Redundant Array of InexpensiveDisks )。

由於當時大容量磁碟比較昂貴, RAID 的基本思想是將多個容量較小、相對廉價的磁碟進行有機組合,從而以較低的成本獲得與昂貴大容量磁碟相當的容量、性能、可靠性。隨著磁碟成本和價格的不斷降低, 「廉價」 已經毫無意義。因此, RAID 諮詢委員會( RAID Advisory Board, RAB )決定用「 獨立 」 替代 「 廉價 」 ,於時 RAID 變成了獨立磁碟冗餘陣列( Redundant Array of IndependentDisks )。但這僅僅是名稱的變化,實質內容沒有改變。

2、RAID等級

RAID 這種設計思想很快被業界接納, RAID 技術作為高性能、高可靠的存儲技術,得到了非常廣泛的應用。 RAID 主要利用鏡像、數據條帶和數據校驗三種技術來獲取高性能、可靠性、容錯能力和擴展性,根據對這三種技術的使用策略和組合架構,可以把 RAID 分為不同的等級,以滿足不同數據應用的需求。

D. A. Patterson 等的論文中定義了 RAID0 ~ RAID6 原始 RAID 等級。隨後存儲廠商又不斷推出 RAID7、 RAID10、RAID01 、 RAID50 、 RAID53 、 RAID100 等 RAID 等級,但這些並無統一的標準。目前業界與學術界公認的標準是 RAID0 ~ RAID6 ,而在實際應用領域中使用最多的 RAID 等級是 RAID0 、RAID1 、 RAID3 、 RAID5 、 RAID6 和 RAID10。

RAID 每一個等級代表一種實現方法和技術,等級之間並無高低之分。在實際應用中,應當根據用戶的數據應用特點,綜合考慮可用性、性能和成本來選擇合適的 RAID 等級,以及具體的實現方式。

3、關鍵技術

(1)、鏡像技術

  • 鏡像技術是一種冗餘技術,為磁碟提供數據備份功能,防止磁碟發生故障而造成數據丟失。對於 RAID而言,採用鏡像技術最典型地的用法就是,同時在磁碟陣列中產生兩個完全相同的數據副本,並且分布在兩個不同的磁碟上。鏡像提供了完全的數據冗餘能力,當一個數據副本失效不可用時,外部系統仍可正常訪問另一副本,不會對應用系統運行和性能產生影響。而且,鏡像不需要額外的計算和校驗,故障修復非常快,直接複製即可。鏡像技術可以從多個副本進行並發讀取數據,提供更高的讀 I/O 性能,但不能並行寫數據,寫多個副本通常會導致一定的 I/O 性能下降。

鏡像技術提供了非常高的數據安全性,其代價也是非常昂貴的,需要至少雙倍的存儲空間。高成本限制了鏡像的廣泛應用,主要應用於至關重要的數據保護,這種場合下的數據丟失可能會造成非常巨大的損失。(硬碟不值錢沒多大成本)

(2)、數據條帶技術

  • 將一個數據拆開,並行寫給多個磁碟中;數據條帶化技術是一種自動將 I/O操作負載均衡到多個物理磁碟上的技術。更具體地說就是,將一塊連續的數據分成很多小部分並把它們分別存儲到不同磁碟上。這就能使多個進程可以並發訪問數據的多個不同部分,從而獲得最大程度上的 I/O 並行能力,極大地提升性能。

(3)、數據校驗技術

  • 數據校驗技術是指, RAID 要在寫入數據的同時進行校驗計算,並將得到的校驗數據存儲在 RAID 成員磁碟中。校驗數據可以集中保存在某個磁碟或分散存儲在多個不同磁碟中。當其中一部分數據出錯時,就可以對剩餘數據和校驗數據進行反校驗計算重建丟失的數據。(硬碟比CPU便宜,所以這個不咋地)

數據校驗技術相對於鏡像技術的優勢在於節省大量開銷,但由於每次數據讀寫都要進行大量的校驗運算,對電腦的運算速度要求很高,且必須使用硬體 RAID 控制器。在數據重建恢復方面,檢驗技術比鏡像技術複雜得多且慢得多。(並不實用)

4、RAID分類

從實現角度看, RAID 主要分為軟 RAID、硬 RAID 以及混合 RAID 三種。對於項目而言,都是用 硬RAID ,成本毛毛雨~,其他兩個了解就好

(1)、軟 RAID

  • 所有功能均有作業系統和 CPU 來完成,沒有獨立的 RAID 控制處理晶片和 I/O 處理晶片,效率自然最低。軟體完成

(2)、硬 RAID

  • 配備了專門的 RAID 控制處理晶片和 I/O 處理晶片以及陣列緩衝,不佔用 CPU 資源。效率很高,但成本也很高。硬體完成

(3)、混合 RAID

  • 具備 RAID 控制處理晶片,但沒有專門的I/O 處理晶片,需要 CPU 和驅動程式來完成。性能和成本在軟RAID 和硬 RAID 之間。軟硬完成

5、常見RAID等級詳解

(1)、JBOD

image

  • JBOD ,Just a Bunch of Disks,磁碟簇。表示一個沒有控制軟體提供協調控制的磁碟集合,這是 RAID區別與 JBOD 的主要因素。 JBOD 將多個物理磁碟串聯起來,提供一個巨大的邏輯磁碟·。

  • JBOD 的數據存放機制是由第一塊磁碟開始按順序往後存儲,當前磁碟存儲空間用完後,再依次往後面的磁碟存儲數據。 JBOD 存儲性能完全等同於單塊磁碟,而且也不提供數據安全保護。

多塊磁碟組成的邏輯磁碟;

其只是簡單提供一種擴展存儲空間的機制,JBOD可用存儲容量等於所有成員磁碟的存儲空間之和

JBOD 常指磁碟櫃,而不論其是否提供 RAID 功能。不過,JBOD並非官方術語,官方稱為Spanning。

(2)、RAID0

image

  • RAID0 是 一種簡單的、無數據校驗的數據條帶化技術。實際上不是一種真正的 RAID ,因為它並不提供任何形式的冗餘策略 。 RAID0 將所在磁碟條帶化後組成大容量的存儲空間,將數據分散存儲在所有磁碟中,以獨立訪問方式實現多塊磁碟的並讀訪問。

  • 理論上講,一個由 n 塊磁碟組成的 RAID0 ,它的讀寫性能是單個磁碟性能的 n 倍,但由於匯流排頻寬等多種因素的限制,實際的性能提升低於理論值。由於可以並發執行 I/O 操作,匯流排頻寬得到充分利用。再加上不需要進行數據校驗,RAID0 的性能在所有 RAID 等級中是最高的。

  • RAID0 具有低成本、高讀寫性能、 100% 的高存儲空間利用率等優點,但是它不提供數據冗餘保護,一旦數據損壞,將無法恢復。

應用場景:

對數據的順序讀寫要求不高,對數據的安全性和可靠性要求不高,但對系統性能要求很高的場景。

和JBOD的區別:

1、存儲容量:都是成員磁碟容量總和

2、磁碟利用率,都是100%,即都沒有做任何的數據冗餘備份

3、JBOD:數據是順序存放的,一個磁碟存滿後才會開始存放到下一個磁碟

4、RAID:各個磁碟中的數據寫入是並行的,是通過數據條帶技術寫入的。其讀寫性能是JBOD的n倍

(3)、RAID1

image

  • RAID1 就是一種鏡像技術,它將數據完全一致地分別寫到工作磁碟和鏡像磁碟,它的磁碟空間利用率為 50% 。 RAID1 在數據寫入時,響應時間會有所影響,但是讀數據的時候沒有影響。 RAID1 提供了最佳的數據保護,一旦工作磁碟發生故障,系統將自動切換到鏡像磁碟,不會影響使用。

  • RAID1是為了增強數據安全性使兩塊磁碟數據呈現完全鏡像,從而達到安全性好、技術簡單、管理方便。 RAID1 擁有完全容錯的能力,但實現成本高。

應用場景:

對順序讀寫性能要求較高,或對數據安全性要求較高的場景。

(4)、RAID10

image

  • RAID10是一個RAID1與RAID0的組合體,所以它繼承了RAID0的快速和RAID1的安全

  • 簡單來說就是,先做條帶,再做鏡像。先把進來的數據先分散到不同的磁碟,再將磁碟中的數據做鏡像。

(5)、RAID01

image

  • RAID01是一個RAID0與RAID1的組合體,所以它繼承了RAID0的快速和RAID1的安全

  • 簡單來說就是,先做鏡像,再做條帶。先把進來的數據先做鏡像,再將鏡像數據寫入到與之前數據不同的磁碟,即再做條帶。

對比RAID10容錯率會高,生產環境一般選擇RAID10