RabbitMQ入門指南

消息隊列(Message Queue,以下簡稱MQ)常用於異步系統的數據傳遞。若不用MQ,我們只能[在應用層]使用輪詢或接口回調等方式處理,這在效率或耦合度上是難以讓人滿意的。當然我們也可以在系統間保持一個長連接,基於底層socket機制進行數據的實時收發,如果再將這部分功能獨立成一個中間件,供項目中所有系統使用,就是我們今天所指的MQ。


對比&選擇

以下以當前較為流行社區活躍度較高的兩個MQ——RabbitMQKafka做一比較,順帶提一提redis

簡單的小型系統可以使用redis,redis簡單易用,本身就提供了隊列結構,也支持發佈訂閱模式。不過說到底redis是一個緩存數據庫,主要職責並不是消息隊列,缺少消息可達(防丟失)、可靠性(分佈式、集群)、異步、事務處理等特性,需要應用層額外處理。

RabbitMQ:erlang開發,單機吞吐高,但是它只支持集群模式,不支持分佈式,可靠性依靠的是集群中分屬兩個不同節點的master queuemirror queue同步數據,在master queue所在節點掛掉之後,系統把mirror queue提升為master queue,負責處理客戶端隊列操作請求。注意,mirror queue只做鏡像,設計目的不是為了承擔客戶端讀寫壓力,讀寫都走的master queue,這就有了單點性能瓶頸。RabbitMQ支持消費端pull和push模式。

Kafka:Scala開發,支持分佈式,因此如果是相同隊列,集群吞吐量肯定是大於RabbitMQ的。Kafka只支持pull模式。pull有個缺點是,如果broker沒有可供消費的消息,將導致consumer不斷輪詢,直到新消息到達。為了避免這點,Kafka有個參數可以讓consumer阻塞知道新消息到達(當然也可以阻塞直到消息的數量達到某個特定的量這樣就可以批量獲取),如此個人認為在消息傳輸不是很頻繁的場景下反而比push更好,即減少了輪詢次數,又不需要永遠佔著一個連接,實時性也基本上能得到保障。RabbitMQ pull模式並不支持此機制。

其實對於吞吐量而言,除非我們預期有百萬級並發,否則兩者差別不大。另外對於上述RabbitMQ每個隊列的單點瓶頸,我們可以將一個隊列按一定邏輯拆分為多個隊列,在業務端將消息分流,也能提高吞吐量。相比Kafka,RabbitMQ提供了較為完備的消息路由、消息到期刪除/延遲/預定、消息容錯機制,這些功能可不是短期內靠堆硬件能完成的,對此有要求的話,那麼優選RabbitMQ沒錯了。由此我們也知道了為什麼Kafka常用於日誌系統,一是日誌相對業務來說寫操作異常頻繁,可能一次請求會產生數十條日誌,需要較高的吞吐量,且關聯日誌一般都是跨系統跨業務的,無法進行細粒度拆分,限制了RabbitMQ提升吞吐量的空間;另外日誌記錄對一致性實時性等要求不高,不需要什麼策略,稍有丟失也無關大雅,無法體現RabbitMQ的優勢。

綜上所述,業務層建議使用RabbitMQ。


RabbitMQ概念及注意點

RabbitMQ主要概念:

  1. Connection:在RabbitMQ中指的是AMQP 0-9-1 connection,它與底層的TCP鏈接是一一對應的。
  2. Channel:信道,用於消息的傳遞。
  3. Queue:隊列,消息通過交換機被投遞到這裡。
  4. Exchange:交換機,用於消息路由,通過routeKey決定將消息投遞到哪個隊列,有四種模式(fanout、direct、topic、header)。
  5. routeKey:路由鍵。
  6. DeadLetter:死信機制。

關於它們的介紹網上資料很多,這裡就不贅述了,我們把注意點放到具體細節上。以下部分摘自RabbitMQ最佳實踐,建議先了解了上述RabbitMQ主要概念再看。

在RabbitMQ中,消息確認分為發送方確認和消費方確認兩個確認環節。

  • 發送端:
    ConfirmListener:消息是否到達exchange的回調,需要實現兩個方法——handleAckhandleNack。通常來講,發送端只需要保證消息能夠發送到exchange即可,而無需關注消息是否被正確地投遞到了某個queue,這個是RabbitMQ和消息的接收方需要考慮的事情。基於此,如果RabbitMQ找不到任何需要投遞的queue,那麼依然會ack給發送方,此時發送方可以認為消息已經正確投遞,而不用關心消息沒有queue接收的問題。此時可以為exchange設置alternate-exchange,即表示rabbitmq將把無法投遞到任何queue的消息發送到alternate-exchange指定的exchange中,此時該指定的exchange就是一個死信交換機(DLX,所以DLX與普通交換機並無不同,只不過路由的是一些無法處理的消息而已)。
    ReturnListener:事實上,對於exchange存在但是卻找不到任何接收queue時,如果發送時設置了mandatory=true,那麼在消息被ack前將return給發送端,此時發送端可以創建一個ReturnListener用於接收返回的消息。
    需要注意的是,在發送消息時如果exchange不存在,消息會被直接丟棄,並且不會ack或者nack操作。
  • 消費端:消息默認是直接ack的,即消息到達消費方立即ack,而不管消費方業務處理是否成功。大部分情況我們需要業務處理完畢才認為此消息被正確消費了,為此可以開啟手動確認模式,即有消費方自行決定何時應該ack,通過設置autoAck=false開啟手動確認模式。
    requeue:消費端nack或reject時設置,告知rq是否將消息重新投遞。
    默認情況下,queue中被拋棄的消息將被直接丟掉,但是可以通過設置queue的x-dead-letter-exchange參數,將被拋棄的消息發送到x-dead-letter-exchange中指定的exchange中,這樣的exchange成為DLX。

Lazy Queue:一個重要的設計目標是能夠支持更長的隊列,即支持更多的消息存儲。惰性隊列會將接收到的消息直接存入文件系統中,而不管是持久化的或者是非持久化的。注意如果惰性隊列中存儲的是非持久化的消息,內存的使用率會一直很穩定,但是重啟之後消息一樣會丟失。

實測在topic模式下,例如test#是沒用的,無法匹配test1,需要配置為test.#,也許這是RabbitMQ所要求的規範吧。

在RabbitMQ中,使用一個還是多個exchange,似乎網上並沒有關於這方面的廣泛討論(可見性能上兩種方案並無顯著差別),so,我們就從不增加複雜度出發,保持一個exchange對應多個queue的簡單模式,或按業務劃分。

創建鏈接時可以提供一個ConnectionName,如newConnection(ConnectionName),然而ConnectionName似乎只是給人看的(比如在管理後台),並不要求唯一性。

接下來我們聊下Connection和Channel。


Connection和Channel

為什麼要將這兩個東西單獨拎出來講呢。因為RabbitMQ並沒有為我們提供一個開箱即用的鏈接復用組件。眾所周知,Connection這東西,創建和銷毀是一筆不小的開支,然而以官方提供的Java Client SDK為例,ConnectionFactory.newConnection()每次都會new一個新的Connection,SDK並沒有內置連接池,這塊工作就需要另外處理。
NIO里也有這兩個概念(不了解NIO的可參看博主以前寫的也談Reactor模式),而它們都有鏈接復用的意思,也許RabbitMQ就是參考了NIO呢。

而Channel是幹嘛的?數據傳輸嘛,Connection就做了,有了Connection為什麼還要Channel?其實Channel是為了在另外一個層面復用Connection———解決多線程數據並發傳輸的問題。直接操作Connection進行數據傳輸,當有多個線程同時操作時,很容易出現數據幀錯亂的情況。一個Connection可以create多個Channel。消息的收發、路由等操作都是和某個Channel綁定而非Connection,每個消息都由一個Channel ID標識。然而,竊以為這種細節完全可以對使用者隱藏,暴露出來反而會增加複雜度。關於Channel的使用方式和注意事項,官方文檔給出了一些描述,具體編碼時需要考量。

As a rule of thumb, Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads.
官方建議每個線程使用一個Channel,不同線程最好不要共享Channel,否則並發時容易產生數據幀交錯(同多個線程直接共用一個Connection一樣),這種情況下exchange會直接關閉下層Connection。
Channels consume resources,所以在一個進程中同時存在成百上千個打開狀態的Channel不是一個好注意。但是用完即關也不是一個好主意, A classic anti-pattern to be avoided is opening a channel for each published message. Channels are supposed to be reasonably long-lived and opening a new one is a network round-trip which makes this pattern extremely inefficient. 開闢一個新的channel需要一個網絡的往返,這種模式是很低效的。

Consuming in one thread and publishing in another thread on a shared channel can be safe.

一個Connection上的多個Channel的調度是由一個java.util.concurrent.ExecutorService負責的。我們可以使用ConnectionFactory#setSharedExecutor設置自定義調度器。

在消費者端,消息Ack需要由接收消息(received the delivery)的線程完成,否則可能會產生Channel級別的異常,並且Channel會被關閉。

總得來說,Channel也需要復用,但是數量可以比Connection多一兩個數量級。我們可以設計一個簡單的連接池方案PooledConnectionFactory,它是一個Connection容器,保持若干long-lived Connection提供給外部使用,而每個Connection又有自己的PooledChannelFactory,其中維持着一些long-lived Channel。Spring-boot提供了一個AMQP組件Spring AMQP,已經幫我們實現了類似的方案,並且還隱藏了Channel這個東東,但Spring的原罪——代碼碎片化,註解滿天飛——又增加了組件本身使用的複雜度,且無法掌控細節。當然它還提供了其它一些可有可無的特性。其實,我們只需要一個簡單的連接池而已,so,讓我們自己實現吧。


簡單連接池實現&使用

直接上代碼。先定義一個鏈接配置類:

@Component
data class ConnectionConfig(
    @Value("\${rabbitmq.userName:guest}")
    var userName: String, 
    @Value("\${rabbitmq.password:guest}")
    var password: String,
    @Value("\${rabbitmq.host:localhost}")
    var host: String,
    @Value("\${rabbitmq.port:5672}")
    var port: Int,
    @Value("\${rabbitmq.virtualHost:/}")
    var virtualHost: String
)

配置項可在配置文件中配置。

工廠類,內部有個BlockingQueue存放PooledConnection實例,PooledConnection封裝了RabbitMQ的Connection,至於為啥要封裝一層稍後說。

@Component
class PooledConnectionFactory(@Autowired private val connectionConfig: ConnectionConfig,
                              @Value("\${rabbitmq.maxConnectionCount:5}")
                              private val maxConnectionCount: Int) {
    private val _logger: Logger by lazy {
        LoggerFactory.getLogger(PooledConnectionFactory::class.java)
    }

    private val _connQueue = ArrayBlockingQueue<PooledConnection>(maxConnectionCount)

    //已創建了幾個connection
    private val _connCreatedCount = AtomicInteger()

    private val _factory by lazy {
        buildConnectionFactory()
    }

    private fun buildConnectionFactory(): ConnectionFactory {
        val factory = ConnectionFactory()
        with(connectionConfig) {
            factory.username = userName
            factory.password = password
            factory.virtualHost = virtualHost
            factory.host = host
            factory.port = port
        }
        return factory
    }

    @Throws(IOException::class, TimeoutException::class)
    fun newConnection(): PooledConnection {
        var conn = _connQueue.poll()
        if (conn == null) {
            if (_connCreatedCount.getAndIncrement() < maxConnectionCount) {
                try {
                    conn = PooledConnection(_factory.newConnection(), _connQueue)
                } catch (e: Exception) {
                    _connCreatedCount.decrementAndGet()
                    _logger.error("創建RabbitMQ連接出錯", e)
                    throw e
                }
            } else {
                _connCreatedCount.decrementAndGet()
                conn = _connQueue.take()
            }
        }
        return conn
    }
}

注意newConnection方法使用了AtomicInteger保證線程安全。

再來看PooledConnection,它實現了Closeable接口。而我是用kotlin寫的代碼,對於Closeable接口,kotlin提供了一個擴展函數use(),use函數會在代碼塊執行後自動關閉調用者(無論中間是否出現異常),類似於C#的using()操作,等會我們就會看到如何使用。

class PooledConnection(private val connection: Connection, private val container: BlockingQueue<PooledConnection>) : Closeable {
    private val _logger: Logger by lazy {
        LoggerFactory.getLogger(PooledConnection::class.java)
    }

    override fun close() {
        val offered = container.offer(this)
        if (!offered) {
            val message = "RabbitMQ連接池已滿,無法釋放當前連接"
            _logger.error(message)
            throw IOException(message)
        }
    }

    fun get() = connection
}

注意close()函數不是真的close,而是將Connection放回連接池。如果用的是RabbitMQ.Connection的話,就直接關閉了。
get()函數將RabbitMQ.Connection暴露出來供生產者和消費者使用。

That’s all! 關於連接池的代碼就這麼簡單,Channel池也可照貓畫虎,以此類推:)

使用的話,以生產端為例:

    /**
     * 發送消息
     *
     * @param data 需要發送的數據
     * @param exchange the name of the exchange sent to
     * @param routeKey 路由鍵,用於exchange投遞消息到隊列
     */
    @Throws(IOException::class)
    fun send(data: Any, exchange: String, routeKey: String) = factory.newConnection().use{
        val conn = it.get()
        val channel = conn.createChannel()
        try {
            val properties = AMQP.BasicProperties.Builder()
                .contentType("application/json")
                .deliveryMode(2) //消息持久化,防處理之前丟失。默認1。
                .build()
            it.basicPublish(exchange, routeKey, properties, JSON.toJSONString(data).toByteArray())
        }catch (e: Exception) {
            logger.error(e.message)
            throw e
        } finally {
            channel.close()
        }            
    }

so easy! 注意use()的用法。

Channel池可以類似方式實現。


參考資料

RabbitMQ和Kafka到底怎麼選?
Kafka與RabbitMQ區別
RabbitMQ發佈訂閱實戰-實現延時重試隊列