6、Sping Boot消息

1.消息概述

  可通過消息服務中間件來提升系統異步通信、擴展解耦能力

  消息服務中兩個重要概念:消息代理(message broker)和目的地(destination)當消息發送者發送消息以後,將由消息代理接管,消息代理保證消息傳遞到指定目的地。

消息隊列主要有兩種形式的目的地:

  隊列(queue):點對點消息通信(point-to-point)

  主題(topic):發佈(publish)/訂閱(subscribe)消息通信

註:通過ActiveMQ的學習即可知道以上的概念

 

  在未引入消息中間件的情況下,響應時間並不能降到最低;在引入消息中間件時,響應時間由150ms降低為55ms;

 

  在秒殺系統中,我們可以引入消息隊列進行流量削峰。如,5件商品,100人搶購,如果搶購完了,則後面搶購的消息全部拒絕。

(1).點對點式(隊列)

  消息發送者發送消息,消息代理將其放入一個隊列中,消息接收者從隊列中獲取消息內容,消息讀取後被移出隊列

  消息只有唯一的發送者和接受者,但並不是說只能有一個接收者

(2).發佈訂閱式(主題)

  發送者(發佈者)發送消息到主題,多個接收者(訂閱者)監聽(訂閱)這個主題,那麼就會在消息到達時同時收到消息.類比微信公眾號

(3).JMS(Java Message Service)

  JAVA消息服務(Java Message Service),基於JVM消息代理的規範。ActiveMQ、HornetMQ是JMS實現

(4).AMQP(Advanced Message Queuing Protocol)

  高級消息隊列協議(Advanced Message Queuing Protocol),也是一個消息代理的規範,兼容JMS.RabbitMQ是AMQP的實現;

(5).JMS和AMQP區別

 

(6).Spring支持

  spring-jms提供了對JMS的支持;

  spring-rabbit提供了對AMQP的支持;

  需要ConnectionFactory的實現來連接消息代理;

  提供JmsTemplate、RabbitTemplate來發送消息;

  @JmsListener(JMS)、@RabbitListener(AMQP)註解在方法上監聽消息代理發佈的消息;

  @EnableJms、@EnableRabbit開啟支持;

(7).Spring Boot自動配置

  JmsAutoConfiguration

  RabbitAutoConfiguration

2.RabbitMQ簡介

  RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue Protocol)的開源實現。

(1).核心概念

  Message:消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其他消息的優先權)、delivery-mode(指出該消息可能需要持久性存儲)等。

  Publisher:消息的生產者,也是一個向交換器發佈消息的客戶端應用程序。

  Exchange:交換器,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。Exchange有4種類型:direct(默認),fanout, topic, 和headers,不同類型的Exchange轉發消息的策略有所區別.

  Queue:消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列裏面,等待消費者連接到這個隊列將其取走。

  Binding:綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表。Exchange 和Queue的綁定可以是多對多的關係。

  Connection:網絡連接,比如一個TCP連接。

  Channel:信道,多路復用連接中的一條獨立的雙向數據流通道。信道是建立在真實的TCP連接內的虛擬連接,AMQP 命令都是通過信道發出去的,不管是發佈消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對於操作系統來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接。

  Consumer:消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。

  Virtual Host虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的 vhost 是/。

  Broker:表示消息隊列服務器實體。

 

(2).RabbitMQ運行機制

[1].AMQP(Advanced Message Queuing Protocol)消息路由

  AMQP 中消息的路由過程和 Java 開發者熟悉的 JMS 存在一些差別,AMQP 中增加了 Exchange 和 Binding 的角色。生產者把消息發佈到 Exchange 上,消息最終到達隊列並被消費者接收,而 Binding 決定交換器的消息應該發送到那個隊列。

 

[2].Direct Exchange類型

  Exchange分發消息時根據類型的不同分發策略有區別,目前共四種類型:direct、fanout、topic、headers;headers 匹配 AMQP 消息的 header 而不是路由鍵, headers 交換器和 direct 交換器完全一致,但性能差很多,目前幾乎用不到了,所以直接看另外三種類型:

 

[3].Fanout Exchange類型 

[4].Topic Exchange類型

 

  如,usa.news和usa.weather會匹配到usa.#上;而usa.weather和europe.weather就會匹配到#.weather;類似於模糊匹配

3.RabbitMQ整合

(1).搭建docker

[1].安裝docker

//www.cnblogs.com/HOsystem/p/13789551.html

[2].加速docker

//www.cnblogs.com/HOsystem/p/13789551.html

(2).搭建rabbitmq

  docker上pull rabbitmq

[root@hosystem ~]# docker pull rabbitmq

Using default tag: latest

latest: Pulling from library/rabbitmq

171857c49d0f: Pull complete

419640447d26: Pull complete

61e52f862619: Pull complete

856781f94405: Pull complete

125d5ee3d600: Pull complete

42de77c4d197: Pull complete

4d65f87814dd: Pull complete

f6c0bf06039f: Pull complete

01671add1b7b: Pull complete

088ff84cf8cb: Pull complete

Digest: sha256:3da3bcd2167a1fc9bdbbc40ec0ae2b195df5df05e3c10c64569c969cb3d86435

Status: Downloaded newer image for rabbitmq:latest

docker.io/library/rabbitmq:latest

[root@hosystem ~]# docker images

REPOSITORY          TAG                 IMAGE ID            CREATED             SIZE

redis               latest              62f1d3402b78        4 days ago          104MB

rabbitmq            latest              ea2bf0a30abf        4 weeks ago         156MB

hello-world         latest              bf756fb1ae65        10 months ago       13.3kB

  通過docker啟動rabbitmq

[root@hosystem ~]# docker images

REPOSITORY          TAG                 IMAGE ID            CREATED             SIZE

rabbitmq            latest              ea2bf0a30abf        4 weeks ago         156MB

[root@hosystem ~]# docker run -d -p 5672:5672 -p 15672:15672 –name myrabbitmq ea2bf0a30abf

e687835a6ea784d55717dc402d5d447d62e486e78f6c770ec703dfdec3d64f16

[root@hosystem ~]#

  -d:表示後台啟動

  -p:進行端口映射

  –name:重新名,修改成我們想要的名字 

  訪問rabbitmq,因為我的ip為192.168.188.198所以只要在瀏覽器上輸入192.168.188.198:15672即可;賬號:guest 密碼:guest 

 

註:rabbitMQ啟動後用web訪問顯示服務器拒絕訪問,用以下方法解決

#添加防火牆規則

[root@hosystem ~]# firewall-cmd –permanent –zone=public –add-port=15672/tcp

success

[root@hosystem ~]# firewall-cmd –reload

success

#//blog.csdn.net/tl1242616458/article/details/105586984

[root@hosystem ~]# docker exec -it myrabbitmq /bin/bash

[root@e687835a6ea7:/]# rabbitmq-plugins enable rabbitmq_management

(3).rabbitmq web操作

 

[1].添加exchange

  按照上圖依次添加,exchange.direct、exchange.fanout、exchange.topic這三個exchange.效果如下

 

[2].添加queues

 

[3].綁定關係

  點擊需要的exchange,進去後在bingdings里填寫與之綁定的queues。

①.direct bindings

 

②.fanout bindings

 

③.topic bindings

 

[4].發送消息

①exchange.direct

 

②exchange.fanout

 

③.exchange.topic

 

  我們發送key為hello.news的消息,因為我們topic有#.news,所以只要有#.news都可以接收

 

[5].獲取消息

①.hos queues

 

②.hosystem.news queues

 

 

(4).IDEA整合RabbitMQ

[1].創建工程

①.引入rabbit

<dependency>

   <groupId>org.springframework.amqp</groupId>

   <artifactId>spring-rabbit-test</artifactId>

   <scope>test</scope>

</dependency>

②.application.yml

#rabbitmq配置信息

spring.rabbitmq.host=192.168.188.198

spring.rabbitmq.username=guest

spring.rabbitmq.password=guest

#spring.rabbitmq.port=

#spring.rabbitmq.virtual-host=

③.測試RabbitMQ

  AmqpAdmin:管理組件

  RabbitTemplate:消息發送處理組件

[2].查看RabbitAutoConfiguration

 

[3].查看CachingConnectionFactory

 

[4].查看RabbitProperties

  RabbitProperties是封裝RabbitMQ相關配置的類

 

[5].查看RabbitTemplate

  RabbitTemplate是用於RabbitMQ發送和接收消息

 

[6].查看AmqpAdmin

  AmqpAdmin是RabbitMQ系統管理功能組件

 

[7].application.properties

  配置rabbitmq參數

#rabbitmq配置信息

spring.rabbitmq.host=192.168.188.198

spring.rabbitmq.username=guest

spring.rabbitmq.password=guest

#spring.rabbitmq.port=

#spring.rabbitmq.virtual-host=

[8].發送消息

  發送消息到rabbitmq,默認使用java-serialized序列化

@SpringBootTest

class Springboot02AmqpApplicationTests {

 

@Autowired

RabbitTemplate rabbitTemplate;

 

/**

* 單播(點對點)

*/

@Test

void contextLoads() {

//message需要自己定義;定義消息體內容和消息頭(org.springframework.amqp.core.Message())

// rabbitTemplate.send(exchange,routekey,message);

 

//object默認當成消息體,只需要傳入要發送的對象,自動序列化發送給rabbitmq

// rabbitTemplate.convertAndSend(exchange,routekey,message);

Map<String,Object> map = new HashMap<>();

map.put(“msg”,”test1″);

map.put(“data”, Arrays.asList(“helloworld”,123,true));

//對象被默認序列化(java-serialized-object)後發送

// rabbitTemplate.convertAndSend(“exchange.direct”,”hos.news”,map);

 

}

 

[9].接收消息

/**

 *  接收rabbitmsq消息

 *  將數據轉為json發送出去(private MessageConverter messageConverter = new SimpleMessageConverter();)

 */

@Test

public void receive(){

    Object o = rabbitTemplate.receiveAndConvert(“hos.news”);

    System.out.println(o.getClass());

    System.out.println(o);

    Object o1 = rabbitTemplate.receiveAndConvert(“hos.news”);

    System.out.println(o1.getClass());

    System.out.println(o1);

 

}

 

[10].數據轉json

  將數據轉為json發送出去MessageConverter messageConverter = new SimpleMessageConverter();(org.springframework.amqp.support.converter.MessageConverter)

①.MyAMQPConfig.java

package com.hosystem.amqp.config;

 

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;

import org.springframework.amqp.support.converter.MessageConverter;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

 

@Configuration

public class MyAMQPConfig {

 

    @Bean

    public MessageConverter messageConverter(){

        return new Jackson2JsonMessageConverter();

    }

}

②.Book.java

package com.hosystem.amqp.bean;

 

public class Book {

 

    private String bookName;

    private String author;

 

    public Book() {

    }

 

    public Book(String bookName, String author) {

 

        this.bookName = bookName;

        this.author = author;

    }

 

    public String getBookName() {

        return bookName;

    }

 

    public String getAuthor() {

        return author;

    }

 

    public void setBookName(String bookName) {

        this.bookName = bookName;

    }

 

    public void setAuthor(String author) {

        this.author = author;

    }

}

③.自定義對象

  使用自定義對象發送給rabbitmq

@SpringBootTest

class Springboot02AmqpApplicationTests {

 

@Autowired

RabbitTemplate rabbitTemplate;

 

/**

* 單播(點對點)

*/

@Test

void contextLoads() {

//message需要自己定義;定義消息體內容和消息頭(org.springframework.amqp.core.Message())

// rabbitTemplate.send(exchange,routekey,message);

 

//object默認當成消息體,只需要傳入要發送的對象,自動序列化發送給rabbitmq

// rabbitTemplate.convertAndSend(exchange,routekey,message);

Map<String,Object> map = new HashMap<>();

map.put(“msg”,”test1″);

map.put(“data”, Arrays.asList(“helloworld”,123,true));

//發送自定義對象

        rabbitTemplate.convertAndSend(“exchange.direct”,”hos.news”,new Book(“Linux”,”linux”));

 

}

}

④.接收json數據

@SpringBootTest

class Springboot02AmqpApplicationTests {

 

   @Autowired

   RabbitTemplate rabbitTemplate;

 

 

    /**

     *  接收rabbitmsq消息

     *  將數據轉為json發送出去(private MessageConverter messageConverter = new SimpleMessageConverter();)

     */

    @Test

    public void receive(){

        Object o = rabbitTemplate.receiveAndConvert(“hos.news”);

        System.out.println(o.getClass());

        System.out.println(o);

    }

 

[11].廣播發送消息

package com.hosystem.amqp;

 

@SpringBootTest

class Springboot02AmqpApplicationTests {

 

   @Autowired

   RabbitTemplate rabbitTemplate;

 

    /**

     *  廣播

     */

    @Test

    public void sendMsg(){

        rabbitTemplate.convertAndSend(“exchange.fanout”,“”,new Book(“python書籍,“python作者));

    }

}

[12].@EnableRabbit + @RabbitListener

  @EnableRabbit + @RabbitListener 監聽消息隊列內容

①.BookService.java

package com.hosystem.amqp.service;

 

import com.hosystem.amqp.bean.Book;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Service;

 

@Service

public class BookService {

 

    @RabbitListener(queues = “hos.news”)

    public void receive(Book book){

        System.out.println(收到消息:”+book);

    }

 

    @RabbitListener(queues = “hos”)

    public void receive02(Message message){

        System.out.println(message.getBody());

        System.out.println(message.getMessageProperties());

    }

}

②.Springboot02AmqpApplication.java

@EnableRabbit //開啟基於註解的rabbitmq模式

@SpringBootApplication

public class Springboot02AmqpApplication {

 

   public static void main(String[] args) {

      SpringApplication.run(Springboot02AmqpApplication.class, args);

   }

 

}

[13].AmqpAdmin

  AmqpAdmin(org.springframework.amqp.core.AmqpAdmin):RabbitMQ系統管理功能組件。

  AmqpAdmin:創建和刪除Queue、exchange、binding

@SpringBootTest

class Springboot02AmqpApplicationTests {

 

   @Autowired

   RabbitTemplate rabbitTemplate;

 

   //        @Bean

    //        @ConditionalOnSingleCandidate(ConnectionFactory.class)

    //        @ConditionalOnProperty(

    //            prefix = “spring.rabbitmq”,

    //            name = {“dynamic”},

    //            matchIfMissing = true

    //        )

    //        @ConditionalOnMissingBean

    //        public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {

    //            return new RabbitAdmin(connectionFactory);

    //        }

   @Autowired

   AmqpAdmin amqpAdmin;

 

   //創建exchange queues binding

   @Test

   public void createExchange(){

       //org.springframework.amqp.core.Exchange

        //org.springframework.amqp.core.DirectExchange

        //創建exchange

//        amqpAdmin.declareExchange(new DirectExchange(“amqpadmin.exchange”));

//        System.out.println(“創建成功“);

 

        //創建queues

        //org.springframework.amqp.core.AmqpAdmin

//        amqpAdmin.declareQueue(new Queue(“amqpadmin.queue”,true));

 

        //org.springframework.amqp.core.Binding

        //創建binding

        amqpAdmin.declareBinding(new Binding(“amqpadmin.queue”,Binding.DestinationType.QUEUE,“amqpadmin.exchange”,“amqp.haha”,null));

    }

}