springboot集成redis實現消息發布訂閱模式-跨多伺服器

  • 2019 年 10 月 3 日
  • 筆記

 

環境:SpringBoot + jdk1.8 

 

基礎配置參考
https://blog.csdn.net/llll234/article/details/80966952

 

查看了基礎配置那麼會遇到一下幾個問題:

1.實際應用中可能會訂閱多個通道,而一下這種寫法不太通用
container.addMessageListener(listenerAdapter(new RedisPmpSub()),new PatternTopic(“pmp”));

2.使用過程中使用new RedisPmpSub()配置消息接收對象會有問題。
如果RedisPmpSub既是消息接收類,也是消息處理類。那麼如果此時需要注入Bean,會成功嗎?

3.考慮後期的擴展性是否能盡量不改變原有程式碼的基礎上,進行擴展

 

額外的配置文件

<dependency>      <groupId>org.projectlombok</groupId>      <artifactId>lombok</artifactId>      <optional>true</optional>  </dependency>    <dependency>      <groupId>commons-lang</groupId>      <artifactId>commons-lang</artifactId>      <version>RELEASE</version>  </dependency>    <dependency>      <groupId>com.google.code.gson</groupId>      <artifactId>gson</artifactId>  </dependency>

由於GsonUtil依賴的是某個SDK,GsonUtil.toJson(this, BasePubMessage.class)可替換為
new Gson().toJson(this, BasePubMessage.class);
lombok需要下載插件

 

發布者

 

枚舉定義

考慮到可維護性,採用枚舉的方式定義管道RedisChannelEnums

 1 public enum RedisChannelEnums {   2   3     /**redis頻道code定義 需要與發布者一致*/   4     LIVE_INFO_CHANGE("LIVE_INFO_CHANGE","直播資訊改變"),   5   6     ;   7     /** 枚舉定義+描述 */   8     private String code;   9     private String description;  10  11     RedisChannelEnums(String code, String description) {  12         this.code = code;  13         this.description = description;  14     }  15  16  17     /** 根據code獲取對應的枚舉對象 */  18     public static RedisChannelEnums getEnum(String code) {  19         RedisChannelEnums[] values = RedisChannelEnums.values();  20         if (null != code && values.length > 0) {  21             for (RedisChannelEnums value : values) {  22                 if (value.code == code) {  23                     return value;  24                 }  25             }  26         }  27         return null;  28     }  29  30     /** 該code在枚舉列表code屬性是否存在 */  31     public static boolean containsCode(String code) {  32         RedisChannelEnums anEnum = getEnum(code);  33         return anEnum != null;  34     }  35  36     /** 判斷code與枚舉中的code是否相同 */  37     public static boolean equals(String code, RedisChannelEnums calendarSourceEnum) {  38         return calendarSourceEnum.code == code;  39     }  40  41  42     public String getCode() {  43         return code;  44     }  45  46     public String getDescription() {  47         return description;  48     }  49  50  51 }

 

消息模板

為了兼容不同的業務場景,需要定義消息模板對象BasePubMessage
其中ToString方法的作用是將對象轉成Json字元

 1 @Data   2 public abstract class BasePubMessage {   3   4     /**發布訂閱頻道名稱*/   5     protected String channel;   6   7     protected String extra;   8   9     @Override  10     public String toString() {  11         return GsonUtil.toJson(this, BasePubMessage.class);  12     }  13  14 }

 

消息對象LiveChangeMessage
其中ToString方法的作用是將對象轉成Json字元

 1 @Data   2 public class LiveChangeMessage extends BasePubMessage {   3   4   5     /**直播Ids*/   6     private String liveIds;   7   8     @Override   9     public String toString() {  10         return GsonUtil.toJson(this, LiveChangeMessage.class);  11     }  12  13 }

 

發布者服務

public interface RedisPub {          /**       * 集成redis實現消息發布訂閱模式-雙通道       * @param redisChannelEnums 枚舉定義       * @param basePubMessage 消息       */      void sendMessage(RedisChannelEnums redisChannelEnums, BasePubMessage basePubMessage);    }

 

 1 @Service   2 public class RedisPubImpl implements RedisPub {   3   4     @Resource   5     private StringRedisTemplate stringRedisTemplate;   6   7     @Override   8     public void sendMessage(RedisChannelEnums redisChannelEnums, BasePubMessage basePubMessage) {   9  10         if(redisChannelEnums ==null || basePubMessage ==null){  11             return;  12         }  13  14         basePubMessage.setChannel(redisChannelEnums.getCode());  15         stringRedisTemplate.convertAndSend(redisChannelEnums.getCode(), basePubMessage.toString());  16         System.out.println("發布成功!");  17     }  18 }

 

訂閱者

 

註解配置

RedisConfig作為訂閱者的配置類,主要作用是:Redis消息監聽器容器、配置消息接收處理類
同時新加入的功能解決了我們上面提出的幾個問題

 1 @Service   2 @Configuration   3 @EnableCaching   4 public class RedisConfig {   5   6   7     /**   8      * 存放策略實例   9      * classInstanceMap : key-beanName value-對應的策略實現  10      */  11     private ConcurrentHashMap<String, BaseSub> classInstanceMap = new ConcurrentHashMap<>(20);  12  13     /**  14      * 注入所有實現了Strategy介面的Bean  15      *  16      * @param strategyMap  17      *         策略集合  18      */  19     @Autowired  20     public RedisConfig(Map<String, BaseSub> strategyMap) {  21         this.classInstanceMap.clear();  22         strategyMap.forEach((k, v) ->  23                 this.classInstanceMap.put(k.toLowerCase(), v)  24         );  25     }  26  27  28     /**  29      * Redis消息監聽器容器  30      *  31      * @param connectionFactory  32      *  33      * @return  34      */  35     @Bean  36     RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {  37  38         RedisMessageListenerContainer container = new RedisMessageListenerContainer();  39         container.setConnectionFactory(connectionFactory);  40  41         RedisChannelEnums[] redisChannelEnums = RedisChannelEnums.values();  42         if (redisChannelEnums.length > 0) {  43             for (RedisChannelEnums redisChannelEnum : redisChannelEnums) {  44                 if (redisChannelEnum == null || StringUtils.isEmpty(redisChannelEnum.getCode()) || redisChannelEnum.getClassName()==null) {  45                     continue;  46                 }  47                 //訂閱了一個叫pmp和channel 的通道,多通道  48                 //一個訂閱者接收一個頻道資訊,新增訂閱者需要新增RedisChannelEnums定義+BaseSub的子類  49  50                 String toLowerCase = redisChannelEnum.getClassName().getSimpleName().toLowerCase();  51                 BaseSub baseSub = classInstanceMap.get(toLowerCase);  52                 container.addMessageListener(listenerAdapter(baseSub), new PatternTopic(redisChannelEnum.getCode()));  53             }  54         }  55         return container;  56     }  57  58     /**  59      * 配置消息接收處理類  60      *  61      * @param baseSub  62      *         自定義消息接收類  63      *  64      * @return MessageListenerAdapter  65      */  66     @Bean()  67     @Scope("prototype")  68     MessageListenerAdapter listenerAdapter(BaseSub baseSub) {  69         //這個地方 是給messageListenerAdapter 傳入一個消息接受的處理器,利用反射的方法調用“receiveMessage”  70         //也有好幾個重載方法,這邊默認調用處理器的方法 叫handleMessage 可以自己到源碼裡面看  71         //注意2個通道調用的方法都要為receiveMessage  72         return new MessageListenerAdapter(baseSub, "receiveMessage");  73     }  74  75 }

  

@Autowired  public RedisConfig(Map<String, BaseSub> strategyMap) 方法的作用是將所有的配置消息接收處理類注入進來,那麼消息接收處理類裡面的註解對象也會注入進來。  解決了我們提出的第二個問題    而String toLowerCase = redisChannelEnum.getClassName().getSimpleName().toLowerCase();  BaseSub baseSub = classInstanceMap.get(toLowerCase);  container.addMessageListener(listenerAdapter(baseSub), new PatternTopic(redisChannelEnum.getCode()));  是根據不同的管道對應不同的訂閱者,也就是一個訂閱者對應一個管道。方便根據不同的業務場景進行處理。  使用這種方式主需要配置redisChannelEnum枚舉即可,解決了我們提出的第一個問題。  這樣一來,訂閱者就變得比較通用了

  

枚舉

RedisChannelEnums作用:定義不同管道對應的訂閱者,後期增加一個管道類型只需要增加一個枚舉即可

 1 public enum RedisChannelEnums {   2   3     /**redis頻道名稱定義 需要與發布者一致*/   4     LIVE_INFO_CHANGE("LIVE_INFO_CHANGE", LiveChangeSub.class, "直播資訊改變"),   5   6     ;   7     /** 枚舉定義+描述 */   8     private String code;   9     private Class<? extends BaseSub> className;  10     private String description;  11  12     RedisChannelEnums(String code, Class<? extends BaseSub> className, String description) {  13         this.code = code;  14         this.className=className;  15         this.description = description;  16     }  17  18  19     /** 根據code獲取對應的枚舉對象 */  20     public static RedisChannelEnums getEnum(String code) {  21         RedisChannelEnums[] values = RedisChannelEnums.values();  22         if (null != code && values.length > 0) {  23             for (RedisChannelEnums value : values) {  24                 if (value.code == code) {  25                     return value;  26                 }  27             }  28         }  29         return null;  30     }  31  32     /** 該code在枚舉列表code屬性是否存在 */  33     public static boolean containsCode(String code) {  34         RedisChannelEnums anEnum = getEnum(code);  35         return anEnum != null;  36     }  37  38     /** 判斷code與枚舉中的code是否相同 */  39     public static boolean equals(String code, RedisChannelEnums calendarSourceEnum) {  40         return calendarSourceEnum.code == code;  41     }  42  43  44     public String getCode() {  45         return code;  46     }  47  48     public String getDescription() {  49         return description;  50     }  51  52     public Class<? extends BaseSub> getClassName() {  53         return className;  54     }  55 }

 

消息模板

BaseSubMessage定義通用的欄位,與json字元的通用轉換

 1 @Data   2 abstract class BaseSubMessage {   3   4     /** 發布訂閱頻道名稱 */   5     private String channel;   6   7     private String extra;   8   9     private String json;  10  11     BaseSubMessage(String json) {  12         if(StringUtils.isEmpty(json)){  13             return;  14         }  15  16         this.json = json;  17         Map map = new Gson().fromJson(this.json, Map.class);  18         BeanHelper.populate(this, map);  19     }  20  21 }

 

LiveChangeMessage定義當前業務場景的欄位

 1 @Data   2 @ToString(callSuper = true)   3 public class LiveChangeMessage extends BaseSubMessage {   4   5     /** 直播Ids */   6     private String liveIds;   7   8     public LiveChangeMessage(String json) {   9         super(json);  10     }  11  12 }

 

 

訂閱者服務

BaseSub定義接收消息的通用方法

1 public interface BaseSub {  2  3     /**  4      * 接收消息  5      * @param jsonMessage  json字元  6      */  7     void receiveMessage(String jsonMessage);  8 }

 

LiveChangeSub具體消息接收對象

 1 @Component   2 public class LiveChangeSub implements BaseSub {   3   4     /**只是定義的註解測試,可以換成自己的*/   5     @Autowired   6     private CategoryMapper categoryMapper;   7   8     @Override   9     public void receiveMessage(String jsonMessage) {  10  11         System.out.println("項目aries-server.....................");  12         //注意通道調用的方法名要和RedisConfig2的listenerAdapter的MessageListenerAdapter參數2相同  13         System.out.println("這是 LiveChangeSub" + "-----" + jsonMessage);  14  15         LiveChangeMessage liveChangeMessage = new LiveChangeMessage(jsonMessage);  16         System.out.println(liveChangeMessage);  17  18         Category category = categoryMapper.get(1L);  19         System.out.println("category:" + category);  20  21  22     }  23 }

 

 

總結

發布者配置場景:獨立的伺服器,獨立的項目,A redis快取伺服器  訂閱者配置場景:不同於發布者的獨立的伺服器,獨立的項目,A redis快取伺服器  使用場景:一個發布者、一個或者多個訂閱者。發布者負責發布消息,訂閱者負責接收消息。一旦發布者消息發布出來,那麼  訂閱者可以通過管道進行監聽。同時可以根據不同的管道設置不同的消息接收者或者叫消息處理者。    優點:容易配置,好管理  缺點:由於基於redis去做,不同的redis服務就不適用了。需要考慮消息丟失,持久化的問題。

 

Exit mobile version