RabbitMQ之消息模式简单易懂,超详细分享~~~
前言
上一篇对RabbitMQ的流程和相关的理论进行初步的概述,如果小伙伴之前对消息队列不是很了解,那么在看理论时会有些困惑,这里以消息模式为切入点,结合理论细节和代码实践的方式一起来学习。
正文
常用的模式有Simple、Work、Fanout、Direct、Topic、Headers,可以通过设置交换机类型和配置参数来实现各个模式;接下来就分别进行实操演示吧。
以下演示都是通过管理员的账号进行。其实每种模式其实很大一部分操作都是一样的,所以公共部分不会重复截图说明,不过会针对不同的配置进行说明。
1. 简单模式(Simple)
简单模式顾名思义就是简单,不用配置太多的东西,如下图所示:
上图解析:
-
P:表示生产者,负责推送消息;
-
C:表示消费者,负责接收消息;
-
中间红色部分:代表的是队列(Queue);
小伙伴可能会奇怪,这里没有交换机吗?
其实是有的,上一篇说流程的时候,消息肯定是要通过交换机转发到队列中的,这里没有指定,那是因为用到了默认的交换机,具体看以下演示。
1.1 Web管理界面进行演示
对于Web界面演示来说,只需要将消息能生产、投递、消费即可,我们不用去弄一个生产者和消费者,生产者和消费者都是业务处理逻辑用的,所以通常都是根据业务需求就行实现的;话不多说开始演示吧。
根据上图所示,我们只需要创建一个队列即可,然后就可以进行消息模拟发送和消费了。
此时并没有指定交换机绑定,点击队列名看详情中的Bindings,有一个默认的交换机已经和队列进行绑定:
队列详情页面的说明,在上篇文章中就已经标注了,这里就不再赘述。
有了绑定关系之后,就可以在默认的交换机页面开始模拟转发消息;首先进入Exchanges管理页面,点击默认交换机(AMQP default)进入详情开始发布消息:
消息发送成功之后就会在队列界面看到消息情况:
队列里面有了消息之后,就可以模拟消费者进行消息消费,点击队列名进入详情,可在详情也模拟消费:
如上所示,简单模式整个消费流程就通过Web页面模拟完了。但在消费消息时,提供了Ack Mode模式(消息确认模式)选择来进行消费,可选择的模式如下:
- Nack message requeue true:获取消息,但是不会向Server做ack应答确认(即不告诉服务器消息被消费了),消息重新入队。即队列中的消息不会被删除掉;
- Automatic Ack:获取消息,向Server做应答确认(即会告诉服务器消息被消费了),消息不重新入队,将会从队列中删除;
- Reject requeue true:拒绝获取消息(即拒绝处理消息),消息重新入队;
- Reject requeue false:拒绝获取消息(即拒绝处理消息),消息不重新入队,将会被删除;
到这关于简单模式下的界面演示就结束了,其中描述的细节内容是共用的,在其他模式下的操作也类似,后续不做重复说明。
1.2 代码进行演示
这里就用控制台的方式,一步一步的实现。这里需要引入Nuget包:RabbitMQ.Client。生产者的整体代码如下:
接下来就一步一步来调试,看看消息是怎么一步一步发出去的;
-
创建连接
刚开始没有任何连接,如下:
代码继续下一步,连接就有了:
此时就可以理解为网络连接上了,但通道还没有创建出来,如下:
-
根据连接创建通道
通道根据连接进行创建,目的是为了提高传输效率,共用一个连接,不然频繁的创建和销毁连接会占资源,影响性能。
-
定义队列
有连接和通道之后理论就可以直接发消息了,但直接通信会相互依赖比较强,达不到解耦合的效果。所以需要定义一个队列将消息存放到里面,客户端想用了自己来消费就行,另外队列还可以达到一定的削峰作用。
创建队列的时候需要传几个参数,分别意思如下:
参数1:queue, 队列的名称;
参数2:durable, 队列是否持久化;如果为true,服务器重启之后队列还在,不会被清除;否则就被清掉。
参数3:exclusive ,是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭;
参数4:autoDelete, 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息;
参数5:arguments, 用来设置队列附加参数,如设置队列的有效期、队列的消息生命周期、消息的最大长度等;
-
发送消息
经过以上步骤,就可以发送消息了,如上没有定义交换机,那就是绑定了默认交换机。
发送时的几个参数意思如下:
参数1:exchange,交换机,这里没有指定交换机。
参数2:routingKey,路由key,即指定队列,简单模式下,路由key默认就是队列名
参数3:basicProperties, 配置其他相关属性
参数4:body ,需要发送的消息内容
以上的生产者完成了,现在再来一个消费者演示一下消息消费,消费者整体代码如下:
效果如下:
在整个过程中,还是会先建立连接,创建通道,指定队列; 不同的是增加对接收数据的处理。
是不是用起来比较简单方便,主要是在复杂项目中,消息队列的作用真的很大。来,接着往下说说其他模式。
2. 工作模式(Work)
工作模式是考虑到多个消费者情况下,消息如何被消费的,主要有两种方案,轮询分发和公平分发;
- 轮询分发:消费者依次轮着消费消息,直到消息消费完为止,按均分配。
- 公平分发:根据消费者能力进行分发,即处理快的消费就多,处理慢的就消费就少,能者多劳。
上图解析:
- P:表示生产者,负责推送消息;
- C1、C2:表示多个消费者,都可以从同一个队列接收消息;
- 中间红色部分:代表的是队列(Queue);
这两种方式需要多个消费者,在界面不太好模拟,所以就直接上代码演示了。
2.1 轮询分发
在简单模式基础上稍微改动一下代码即可。在生产者中发多条消息出来,然后启用多个消费者就可以进行模拟如下:
生产者代码整体如下:
消费者代码整体如下:
两个消费者的代码都是一样,没有变动; 两个消费者的都是在消费同一个队列的消息,可以先启动两个消费者,然后在启动生产者,效果如下:
由上可见,默认情况下其实采用的是轮询方式。
2.2 公平分发
在实际业务中,有些业务处理比较耗时,有些处理耗时不长,如上案例,假如奇数消息需要处理比较耗时,那么对应的消费者就压力比较大。这种情况可以通过公平分发的方式进行业务处理,处理快点的就多处理点。
如何才能知道消费者处理业务完成呢?消费者处理完成之后主动上报是最好不过的,所以只需要在消费者端将自动确认机制改为手动确认即可,即:业务处理完成之后,手动上报确认状态。
生产者的代码不需要变动,只需要稍微改改消费者代码即可,这里模拟两个不同处理能力的消费者:
- 消费者1处理一条消息业务需要500毫秒;
- 消费者2处理一条消息业务需要2秒;
消费者1整体代码如下:
消费者2整体代码如下,主要是模拟时间不一样:
先启动两个消费者,再启动生产者,看看消费情况,如下:
以上演示就是公平分发模式的演示,其中有两个关键的步骤:
- 设置每次消费的消息条数,可以根据实际业务情况配置,这里设置的是每次取一条。通过 channel.BasicQos进行设置。
- 将消息确认模式改为自动模式,这样就可以根据实际业务处理情况反馈确认信息,服务器就会将消息处理掉,即删除消息。所以一般在实际业务场景大都会推荐使用手动确认的方式,这样避免业务未处理导致消息就被服务器给清掉的情况。
3. 发布订阅模式(Fanout)
Fanout模式是一种发布订阅模式,是一种广播机制,不需要指定路由Key。这种模式的交换机就会将消息广播到绑定的所有队列上去,只要有消费者订阅对应的队列,就会收到消息。如下图:
上图解析:
- P:表示生产者,负责推送消息;
- X:表示交换机,图中表示一个交换机绑定了多个队列;
- C1、C2:表示多个消费者,都可以从同一个队列接收消息;
- 中间红色部分:代表的是队列(Queue);
在这种模式下,消息会一次性被多个消费者消费。
3.1 Web管理界面进行演示
-
先创建一个Fanout模式的交换机;
-
创建两个队列(根据实际需要创建多个),并将队列绑定到上一步创建的交换机上;
这里演示就创建两个队列,分别是FanoutQ1和FanoutQ2
在队列详情页或交换机详情页都可以进行交换机和队列的绑定,这里分别在FanoutQ1和FanoutQ2队列详情页进行绑定,如下:
同样的方式绑定FanoutQ2, 绑定完成之后,可以在交换机详情页看到对应的绑定队列:
-
通过交换机上投递消息,看效果;
此时通过FanoutExchange交换机投递消息,绑定到此交换机上的队列都能收到:
查看队列消息情况,可以看到两个队列都接收到消息了。
3.2 代码进行演示
其实代码和操作Web一样,只是用代码实现生产者和消费者而已。
-
生产者的代码
因为Fanout交换机不用关注RoutingKey,所以在发布消息时,第二个参数不需要传递RoutingKey。
-
消费者1的代码
消费者比较关注的是交换机需要和生产者指定的是同一个,队列和交换机有绑定。
-
消费者2的代码,其实和消费者1基本一样,只是定义的队列不一样
-
先启动消费者,然后启动生产者,看效果
这里是控制台程序,为了显示方便就先启动消费者,后期的生产者,实际应用场景先启动谁都行。
4. 路由模式(Direct)
Direct模式是在Fanout基础增加RoutingKey条件, 即交换机不会将消息现全部投递到所有队列,而是只投递到对应RoutingKey下的队列。如图:
上图解析:
- P:表示生产者,负责推送消息;
- X:表示交换机,指定类型为direct,图中表示一个交换机绑定了多个队列;
- 中间箭头上的error、info、warning代表具体的RoutingKey;
- C1、C2:表示多个消费者,都可以从同一个队列接收消息;
- 中间红色部分:代表的是队列(Queue);
Direct模式其实在实际应用场景中用的比较多的,默认的Exhanges也是Direct模式, 很多关于消息队列的框架,默认也是采用这种模式,主要原因是根据RoutingKey精确的处理对应的业务,不会由于考虑不周到,导致消息处理有不确定性,性能相对也不错。
4.1 Web管理界面进行演示
-
先创建一个Direct模式的交换机;
-
创建两个队列,然后将队列绑定到上一步创建的交换机上,并指定对应的RoutingKey;
创建队列完成之后,还需要绑定到交换机上,上一种模式演示的是从队列详情中维护绑定关系,这次从交换机详情中进行演示,如下:
绑定第1个队列,指定RoutingKey是order,模拟处理订单的:
绑定第2个队列,指定RoutingKey是msg,模拟处理消息的:
有了绑定关系,就可以测试验证了。
注:这里的RoutingKey可以根据实际情况随意指定的。
-
向交换机上投递消息,看效果;
模拟发布一个RoutingKey为order的消息:
再发布一个消息,指定RoutingKey为msg,如下:
两个消息都发布成功,而是指定对应的RoutingKey进行投递,所以现在绑定到此交换机上两个队列中分别有一条数据,查看队列的消息概况:
可以进入队列详情,通过GetMessage获取到具体的消息内容,这里就不截图了,上面已经演示过。
4.2 代码进行演示
在Fanout的代码基础上稍微改动即可,主要改动点就是改变交换机的类型,并在队列和交换机绑定时设置对应的RoutingKey,发布消息的时候指定交换机和RoutingKey。
-
生产者代码
-
消费者1代码,主要是绑定队列时指定的RoutingKey为order
-
消费者2代码,主要是绑定队列时指定的RoutingKey为msg
-
运行起来看效果:
如上演示效果,和Web演示一样,只有精确匹配到RoutingKey才能消费到对应的消息数据。
5. 主题模式(Topic)
Topic模式是在Direct模式基础增加模糊匹配RoutingKey,Direct精确匹配RoutingKey,Topic可以通*或#进行模糊匹配,从而把消息投递到对应的队列中,如图:
上图解析:
- P:表示生产者,负责推送消息;
- X:表示交换机,指定类型为topic,图中表示一个交换机绑定了多个队列;
- 中间箭头上的文字代表模糊匹配的RoutingKey;其中*表示匹配RoutingKey中的一个词,#号表示匹配RoutingKey的零个或多个词,匹配符需要与点号(.)搭配使用。 *.orange.test.# 示例中orange算一个词,test算一个词,即通过点号(.)分开的就称为一个词。
- C1、C2:表示多个消费者,都可以从同一个队列接收消息;
- 中间红色部分:代表的是队列(Queue);
5.1 Web管理界面进行演示
-
先创建一个Topic模式的交换机;
-
创建两个队列,然后将队列绑定到上一步创建的交换机上,并指定对应的RoutingKey;
将队列绑定到交换机上,这里还是在交换机详情中进行演示,如下:
同样的步骤分别对TopicQ1和TopicQ2进行规则绑定,如下:
现在的TopicExchange的绑定关系如下:
有了关系之后就可以进行验证效果了。
-
向交换机上投递消息,会根据RoutingKey的模糊匹配规则将消息投递到对应的队列中,看效果;
先指定order.create.test发布消息,看看会匹配哪些队列:
TopicQ2接收到消息,匹配到路由规则order.#
再指定RoutingKey 为order.update 发布一个消息,如下:
TopicQ1和TopicQ2都收到消息了,匹配到路由规则order.#和order.*,如下:
再指定RoutingKey 为order发布一个消息,就会匹配到order.#,这里就不截图了。
以上测试说明:在Topic类型交换机和队列绑定关系时,可以指定RoutingKey的匹配规则,星号、#号、点号搭配使用,其中*表示匹配RoutingKey中的一个词,#号表示匹配RoutingKey的零个或多个词。
更多情况,小伙伴们自己动手试试。
5.2 代码进行演示
在Direct的代码基础上稍微改动即可,主要改动点就是改变交换机的类型,并在队列和交换机绑定时设置对应的RoutingKey,这里的RoutingKey是一个规则,是星号、#号、点号和每个词的组合,发布消息的时候指定交换机和RoutingKey,RoutingKey会去匹配绑定的规则。
-
生产者代码
-
消费者1代码,指定路由匹配规则为order.#
-
消费者2代码,指定路由匹配规则为order.*
-
演示效果,将生产者和消费者都启动
如上图,和Web演示一样,#号匹配0个和多个词,*号只能匹配一个词。 符号可以与词任意组合,小伙伴可以根据业务情况自行发挥。
6. 参数模式(Headers)
Headers模式不是通过RoutingKey进行匹配投递消息,而是匹配请求头中所带的键值进行消息投递,所以创建队列是需要设置绑定的头部信息,有两种模式:全部匹配和部分匹配。
- 全部匹配:x-match=all,表示所有的键值都匹配了才行。
- 部分匹配:x-match=any,表示只要其中有键值对匹配就行。
5.1 Web管理界面进行演示
-
先创建一个Headers模式的交换机;
-
创建两个队列,然后将队列绑定到上一步创建的交换机上,可以指定Headers的参数;
将队列绑定到交换机上,这里还是在交换机详情中进行演示,如下:
这里不使用RoutingKey的方式,而是通过设置参数的形式进行绑定,后续投递消息的时候就匹配参数,如果能匹配上,就将消息投递到对应的队列。
绑定HeaderQ1队列:
同样的方式绑定HeaderQ2队列,只是只添加了一个键值对,order:111,最后HeaderExchange交换机的绑定关系如下:
关系绑定好之后就可以进行测试效果了。
-
向交换机上投递消息,会根据检查Headers参数的条件是否符合,若符合将消息投递到对应的队列中,看效果;
设置两个参数进行发布,如下:
可以看到两个队列都匹配到了,因为order和msg键值对匹配到HeaderQ1,order的键值对匹配到HeaderQ2,如果只设置一个order简直对呢:
此时只有HeaderQ2才能精确匹配,HeaderQ1没有全部匹配,所以对应队列没有收到消息,如下:
由此可见,在界面上没有指定x-match绑定的话,默认是all,就是要全部匹配才投递消息到对应队列。
这里继续新增一个HeaderQ3的队列,创建方式和上面不一样,只是在绑定交换机的时候增加x-match 为 any,如下:
绑定成功之后,现在关系如下,其中order的键值对是在每个绑定中都有,如下:
测试发消息之前,把之前消息都清空了,也就是队列中的消息都是空的,这次我们再指定order为111的参数进行发布消息,看看有哪些队列收到消息呢:
消息发出后,之后HeaderQ2和HeaderQ3收到消息,HeaderQ2只有一个order参数,精确匹配上了,HeaderQ3有多个参数,但设置了x-match为any,所以只要匹配其中一个即可。 HeaderQ1多个参数需要全部匹配才行,所以没有接收到消息:
5.2 代码进行演示
Headers模式是根据参数进行匹配,不是通过RoutingKey,所以只需要在绑定队列时设置好参数,在发送消息的时候也设置好参数,这样就会根据匹配原则去匹配参数,如果匹配上,消息就投递到对应的队列,供消费者进行消费。这里为了演示比较清晰一点,使用一个生产者,三个消费者的形式进行演示。
-
生产者
代码中关键的部分是指定交换机的类型为Headers,然后模拟了两类参数的形式投递消息,方便用于测试参数匹配模式的测试。
-
消费者1,绑定参数为order:111和msg:222
-
消费者2,和消费者1代表基本一样,只是绑定参数不一样;
-
消费者3,代表基本和消费者1一样,只是在参数指定的时候增加了x-match来指定匹配模式,这里指定为any,也就是只要其中有部分匹配上也可以消费到消息。
-
演示效果,启动生产者和消费者:
如上图,消费者3指定了匹配模式为部分匹配,所以可以接收到一个参数的消息,而消费者1需要精确匹配,所以不能接收到。
关于常用的消息模式就聊到这吧,小伙伴们可以根据自己的业务场景进行使用,相关演示代码的地址如下:
码云://gitee.com/CodeZoe/dot-net-core-study-demo/tree/main/RabbitMQDemo
总结
RabbitMQ提供了多种模式应对各种业务,但匹配条件越是模糊或者参数化,那性能相对比较弱。再回顾一些常用的消息队列组件,是不是很多都是默认使用Direct模式,精确匹配的RoutingKey可以针对具体不同业务处理,匹配性能也相对比较高; 当然其他模式小伙伴也可以针对业务情况进行使用。
后续的文章将继续分享RabbitMQ消息确认机制、死信队列、磁盘监控等相关知识点的应用,关注“Code综艺圈”,和我一起学习吧。