RabbitMQ入门指南

消息队列(Message Queue,以下简称MQ)常用于异步系统的数据传递。若不用MQ,我们只能[在应用层]使用轮询或接口回调等方式处理,这在效率或耦合度上是难以让人满意的。当然我们也可以在系统间保持一个长连接,基于底层socket机制进行数据的实时收发,如果再将这部分功能独立成一个中间件,供项目中所有系统使用,就是我们今天所指的MQ。


对比&选择

以下以当前较为流行社区活跃度较高的两个MQ——RabbitMQKafka做一比较,顺带提一提redis

简单的小型系统可以使用redis,redis简单易用,本身就提供了队列结构,也支持发布订阅模式。不过说到底redis是一个缓存数据库,主要职责并不是消息队列,缺少消息可达(防丢失)、可靠性(分布式、集群)、异步、事务处理等特性,需要应用层额外处理。

RabbitMQ:erlang开发,单机吞吐高,但是它只支持集群模式,不支持分布式,可靠性依靠的是集群中分属两个不同节点的master queuemirror queue同步数据,在master queue所在节点挂掉之后,系统把mirror queue提升为master queue,负责处理客户端队列操作请求。注意,mirror queue只做镜像,设计目的不是为了承担客户端读写压力,读写都走的master queue,这就有了单点性能瓶颈。RabbitMQ支持消费端pull和push模式。

Kafka:Scala开发,支持分布式,因此如果是相同队列,集群吞吐量肯定是大于RabbitMQ的。Kafka只支持pull模式。pull有个缺点是,如果broker没有可供消费的消息,将导致consumer不断轮询,直到新消息到达。为了避免这点,Kafka有个参数可以让consumer阻塞知道新消息到达(当然也可以阻塞直到消息的数量达到某个特定的量这样就可以批量获取),如此个人认为在消息传输不是很频繁的场景下反而比push更好,即减少了轮询次数,又不需要永远占着一个连接,实时性也基本上能得到保障。RabbitMQ pull模式并不支持此机制。

其实对于吞吐量而言,除非我们预期有百万级并发,否则两者差别不大。另外对于上述RabbitMQ每个队列的单点瓶颈,我们可以将一个队列按一定逻辑拆分为多个队列,在业务端将消息分流,也能提高吞吐量。相比Kafka,RabbitMQ提供了较为完备的消息路由、消息到期删除/延迟/预定、消息容错机制,这些功能可不是短期内靠堆硬件能完成的,对此有要求的话,那么优选RabbitMQ没错了。由此我们也知道了为什么Kafka常用于日志系统,一是日志相对业务来说写操作异常频繁,可能一次请求会产生数十条日志,需要较高的吞吐量,且关联日志一般都是跨系统跨业务的,无法进行细粒度拆分,限制了RabbitMQ提升吞吐量的空间;另外日志记录对一致性实时性等要求不高,不需要什么策略,稍有丢失也无关大雅,无法体现RabbitMQ的优势。

综上所述,业务层建议使用RabbitMQ。


RabbitMQ概念及注意点

RabbitMQ主要概念:

  1. Connection:在RabbitMQ中指的是AMQP 0-9-1 connection,它与底层的TCP链接是一一对应的。
  2. Channel:信道,用于消息的传递。
  3. Queue:队列,消息通过交换机被投递到这里。
  4. Exchange:交换机,用于消息路由,通过routeKey决定将消息投递到哪个队列,有四种模式(fanout、direct、topic、header)。
  5. routeKey:路由键。
  6. DeadLetter:死信机制。

关于它们的介绍网上资料很多,这里就不赘述了,我们把注意点放到具体细节上。以下部分摘自RabbitMQ最佳实践,建议先了解了上述RabbitMQ主要概念再看。

在RabbitMQ中,消息确认分为发送方确认和消费方确认两个确认环节。

  • 发送端:
    ConfirmListener:消息是否到达exchange的回调,需要实现两个方法——handleAckhandleNack。通常来讲,发送端只需要保证消息能够发送到exchange即可,而无需关注消息是否被正确地投递到了某个queue,这个是RabbitMQ和消息的接收方需要考虑的事情。基于此,如果RabbitMQ找不到任何需要投递的queue,那么依然会ack给发送方,此时发送方可以认为消息已经正确投递,而不用关心消息没有queue接收的问题。此时可以为exchange设置alternate-exchange,即表示rabbitmq将把无法投递到任何queue的消息发送到alternate-exchange指定的exchange中,此时该指定的exchange就是一个死信交换机(DLX,所以DLX与普通交换机并无不同,只不过路由的是一些无法处理的消息而已)。
    ReturnListener:事实上,对于exchange存在但是却找不到任何接收queue时,如果发送时设置了mandatory=true,那么在消息被ack前将return给发送端,此时发送端可以创建一个ReturnListener用于接收返回的消息。
    需要注意的是,在发送消息时如果exchange不存在,消息会被直接丢弃,并且不会ack或者nack操作。
  • 消费端:消息默认是直接ack的,即消息到达消费方立即ack,而不管消费方业务处理是否成功。大部分情况我们需要业务处理完毕才认为此消息被正确消费了,为此可以开启手动确认模式,即有消费方自行决定何时应该ack,通过设置autoAck=false开启手动确认模式。
    requeue:消费端nack或reject时设置,告知rq是否将消息重新投递。
    默认情况下,queue中被抛弃的消息将被直接丢掉,但是可以通过设置queue的x-dead-letter-exchange参数,将被抛弃的消息发送到x-dead-letter-exchange中指定的exchange中,这样的exchange成为DLX。

Lazy Queue:一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。惰性队列会将接收到的消息直接存入文件系统中,而不管是持久化的或者是非持久化的。注意如果惰性队列中存储的是非持久化的消息,内存的使用率会一直很稳定,但是重启之后消息一样会丢失。

实测在topic模式下,例如test#是没用的,无法匹配test1,需要配置为test.#,也许这是RabbitMQ所要求的规范吧。

在RabbitMQ中,使用一个还是多个exchange,似乎网上并没有关于这方面的广泛讨论(可见性能上两种方案并无显著差别),so,我们就从不增加复杂度出发,保持一个exchange对应多个queue的简单模式,或按业务划分。

创建链接时可以提供一个ConnectionName,如newConnection(ConnectionName),然而ConnectionName似乎只是给人看的(比如在管理后台),并不要求唯一性。

接下来我们聊下Connection和Channel。


Connection和Channel

为什么要将这两个东西单独拎出来讲呢。因为RabbitMQ并没有为我们提供一个开箱即用的链接复用组件。众所周知,Connection这东西,创建和销毁是一笔不小的开支,然而以官方提供的Java Client SDK为例,ConnectionFactory.newConnection()每次都会new一个新的Connection,SDK并没有内置连接池,这块工作就需要另外处理。
NIO里也有这两个概念(不了解NIO的可参看博主以前写的也谈Reactor模式),而它们都有链接复用的意思,也许RabbitMQ就是参考了NIO呢。

而Channel是干嘛的?数据传输嘛,Connection就做了,有了Connection为什么还要Channel?其实Channel是为了在另外一个层面复用Connection———解决多线程数据并发传输的问题。直接操作Connection进行数据传输,当有多个线程同时操作时,很容易出现数据帧错乱的情况。一个Connection可以create多个Channel。消息的收发、路由等操作都是和某个Channel绑定而非Connection,每个消息都由一个Channel ID标识。然而,窃以为这种细节完全可以对使用者隐藏,暴露出来反而会增加复杂度。关于Channel的使用方式和注意事项,官方文档给出了一些描述,具体编码时需要考量。

As a rule of thumb, Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads.
官方建议每个线程使用一个Channel,不同线程最好不要共享Channel,否则并发时容易产生数据帧交错(同多个线程直接共用一个Connection一样),这种情况下exchange会直接关闭下层Connection。
Channels consume resources,所以在一个进程中同时存在成百上千个打开状态的Channel不是一个好注意。但是用完即关也不是一个好主意, A classic anti-pattern to be avoided is opening a channel for each published message. Channels are supposed to be reasonably long-lived and opening a new one is a network round-trip which makes this pattern extremely inefficient. 开辟一个新的channel需要一个网络的往返,这种模式是很低效的。

Consuming in one thread and publishing in another thread on a shared channel can be safe.

一个Connection上的多个Channel的调度是由一个java.util.concurrent.ExecutorService负责的。我们可以使用ConnectionFactory#setSharedExecutor设置自定义调度器。

在消费者端,消息Ack需要由接收消息(received the delivery)的线程完成,否则可能会产生Channel级别的异常,并且Channel会被关闭。

总得来说,Channel也需要复用,但是数量可以比Connection多一两个数量级。我们可以设计一个简单的连接池方案PooledConnectionFactory,它是一个Connection容器,保持若干long-lived Connection提供给外部使用,而每个Connection又有自己的PooledChannelFactory,其中维持着一些long-lived Channel。Spring-boot提供了一个AMQP组件Spring AMQP,已经帮我们实现了类似的方案,并且还隐藏了Channel这个东东,但Spring的原罪——代码碎片化,注解满天飞——又增加了组件本身使用的复杂度,且无法掌控细节。当然它还提供了其它一些可有可无的特性。其实,我们只需要一个简单的连接池而已,so,让我们自己实现吧。


简单连接池实现&使用

直接上代码。先定义一个链接配置类:

@Component
data class ConnectionConfig(
    @Value("\${rabbitmq.userName:guest}")
    var userName: String, 
    @Value("\${rabbitmq.password:guest}")
    var password: String,
    @Value("\${rabbitmq.host:localhost}")
    var host: String,
    @Value("\${rabbitmq.port:5672}")
    var port: Int,
    @Value("\${rabbitmq.virtualHost:/}")
    var virtualHost: String
)

配置项可在配置文件中配置。

工厂类,内部有个BlockingQueue存放PooledConnection实例,PooledConnection封装了RabbitMQ的Connection,至于为啥要封装一层稍后说。

@Component
class PooledConnectionFactory(@Autowired private val connectionConfig: ConnectionConfig,
                              @Value("\${rabbitmq.maxConnectionCount:5}")
                              private val maxConnectionCount: Int) {
    private val _logger: Logger by lazy {
        LoggerFactory.getLogger(PooledConnectionFactory::class.java)
    }

    private val _connQueue = ArrayBlockingQueue<PooledConnection>(maxConnectionCount)

    //已创建了几个connection
    private val _connCreatedCount = AtomicInteger()

    private val _factory by lazy {
        buildConnectionFactory()
    }

    private fun buildConnectionFactory(): ConnectionFactory {
        val factory = ConnectionFactory()
        with(connectionConfig) {
            factory.username = userName
            factory.password = password
            factory.virtualHost = virtualHost
            factory.host = host
            factory.port = port
        }
        return factory
    }

    @Throws(IOException::class, TimeoutException::class)
    fun newConnection(): PooledConnection {
        var conn = _connQueue.poll()
        if (conn == null) {
            if (_connCreatedCount.getAndIncrement() < maxConnectionCount) {
                try {
                    conn = PooledConnection(_factory.newConnection(), _connQueue)
                } catch (e: Exception) {
                    _connCreatedCount.decrementAndGet()
                    _logger.error("创建RabbitMQ连接出错", e)
                    throw e
                }
            } else {
                _connCreatedCount.decrementAndGet()
                conn = _connQueue.take()
            }
        }
        return conn
    }
}

注意newConnection方法使用了AtomicInteger保证线程安全。

再来看PooledConnection,它实现了Closeable接口。而我是用kotlin写的代码,对于Closeable接口,kotlin提供了一个扩展函数use(),use函数会在代码块执行后自动关闭调用者(无论中间是否出现异常),类似于C#的using()操作,等会我们就会看到如何使用。

class PooledConnection(private val connection: Connection, private val container: BlockingQueue<PooledConnection>) : Closeable {
    private val _logger: Logger by lazy {
        LoggerFactory.getLogger(PooledConnection::class.java)
    }

    override fun close() {
        val offered = container.offer(this)
        if (!offered) {
            val message = "RabbitMQ连接池已满,无法释放当前连接"
            _logger.error(message)
            throw IOException(message)
        }
    }

    fun get() = connection
}

注意close()函数不是真的close,而是将Connection放回连接池。如果用的是RabbitMQ.Connection的话,就直接关闭了。
get()函数将RabbitMQ.Connection暴露出来供生产者和消费者使用。

That’s all! 关于连接池的代码就这么简单,Channel池也可照猫画虎,以此类推:)

使用的话,以生产端为例:

    /**
     * 发送消息
     *
     * @param data 需要发送的数据
     * @param exchange the name of the exchange sent to
     * @param routeKey 路由键,用于exchange投递消息到队列
     */
    @Throws(IOException::class)
    fun send(data: Any, exchange: String, routeKey: String) = factory.newConnection().use{
        val conn = it.get()
        val channel = conn.createChannel()
        try {
            val properties = AMQP.BasicProperties.Builder()
                .contentType("application/json")
                .deliveryMode(2) //消息持久化,防处理之前丢失。默认1。
                .build()
            it.basicPublish(exchange, routeKey, properties, JSON.toJSONString(data).toByteArray())
        }catch (e: Exception) {
            logger.error(e.message)
            throw e
        } finally {
            channel.close()
        }            
    }

so easy! 注意use()的用法。

Channel池可以类似方式实现。


参考资料

RabbitMQ和Kafka到底怎么选?
Kafka与RabbitMQ区别
RabbitMQ发布订阅实战-实现延时重试队列