Redis 發佈訂閱,小功能大用處,真沒那麼廢材!

jae-park-7GX5aICb5i4-unsplash

今天小黑哥來跟大家介紹一下 Redis 發佈/訂閱功能。

也許有的小夥伴對這個功能比較陌生,不太清楚這個功能是幹什麼的,沒關係小黑哥先來舉個例子。

假設我們有這麼一個業務場景,在網站下單支付以後,需要通知庫存服務進行發貨處理。

上面業務實現不難,我們只要讓庫存服務提供給相關的給口,下單支付之後只要調用庫存服務即可。

後面如果又有新的業務,比如說積分服務,他需要獲取下單支付的結果,然後增加用戶的積分。

這個實現也不難,讓積分服務同樣提供一個接口,下單支付之後只要調用庫存服務即可。

如果就兩個業務需要獲取下單支付的結果,那也還好,程序改造也快。可是隨着業務不斷的發展,越來越多的新業務說是要下單支付的結果。

這時我們會發現上面這樣的系統架構存在很多問題:

第一,下單支付業務與其他業務重度耦合,每當有個新業務需要支付結果,就需要改動下單支付的業務。

第二,如果調用業務過多,會導致下單支付接口響應時間變長。另外,如果有任一下游接口響應變慢,就會同步導致下單支付接口響應也變長。

第三,如果任一下游接口失敗,可能導致數據不一致的情況。比如說下圖,先調用 A,成功之後再調用 B,最後再調用 C。

如果在調用 B 接口的發生異常,此時可能就導致下單支付接口返回失敗,但是此時 A 接口其實已經調用成功,這就代表它內部已經處理下單支付成功的結果。

這樣就會導致 A,B,C 三個下游接口,A 獲取成功獲取支付結果,但是 B,C 沒有拿到,導致三者系統數據不一致的情況。

其實我們仔細想一下,對於下單支付業務來講,它其實不需要關心下游調用結果,只要有某種機制通知能通知到他們就可以了。

講到這裡,這就需要引入今天需要介紹發佈訂閱機制。

Redis 發佈與訂閱

Redis 提供了基於「發佈/訂閱」模式的消息機制,在這種模式下,消息發佈者與訂閱者不需要進行直接通信。

如上圖所示,消息發佈者只需要想指定的頻道發佈消息,訂閱該頻道的每個客戶端都可以接受到到這個消息。

使用 Redis 發佈訂閱這種機制,對於上面業務,下單支付業務只需要向支付結果這個頻道發送消息,其他下游業務訂閱支付結果這個頻道,就能收相應消息,然後做出業務處理即可。

這樣就可以解耦系統上下游之間調用關係。

接下來我們來看下,我們來看下如何使用 Redis 發佈訂閱功能。

Redis 中提供了一組命令,可以用於發佈消息,訂閱頻道,取消訂閱以及按照模式訂閱。

首先我們來看下如何發佈一條消息,其實很簡單只要使用 publish 指令:

publish channel message

上圖中,我們使用 publish 指令向 pay_result 這個頻道發送了一條消息。我們可以看到 redis 向我們返回 0 ,這其實代表當前訂閱者個數,由於此時沒有訂閱,所以返回結果為 0 。

接下來我們使用 subscribe 訂閱一個或多個頻道

subscribe channel [channel ...]

如上圖所示,我們訂閱 pay_result 這個頻道,當有其他客戶端往這個頻道發送消息,

當前訂閱者就會收到消息。

我們子在使用訂閱命令,需要主要幾點:

第一,客戶端執行訂閱指令之後,就會進入訂閱狀態,之後就只能接收 subscribepsubscribeunsubscribepunsubscribe 這四個命令。

第二,新訂閱的客戶端,是無法收到這個頻道之前的消息,這是因為 Redis 並不會對發佈的消息持久化的。

相比於很多專業 MQ,比如 kafka、rocketmq 來說, redis 發佈訂閱功能就顯得有點簡陋了。不過 redis 發佈訂閱功能勝在簡單,如果當前場景可以容忍這些缺點,還是可以選擇使用的。

除了上面的功能以外的,Redis 還支持模式匹配的訂閱方式。簡單來說,客戶端可以訂閱一個帶 * 號的模式,如果某些頻道的名字與這個模式匹配,那麼當其他客戶端發送給消息給這些頻道時,訂閱這個模式的客戶端也將會到收到消息。

使用 Redis 訂閱模式,我們需要使用一個新的指令 psubscribe

我們執行下面這個指令:

psubscribe pay.*

那麼一旦有其他客戶端往 pay 開頭的頻道,比如 pay_resultpay_xxx,我們都可以收到消息。

如果需要取消訂閱模式,我們需要使用相應punsubscribe 指令,比如取消上面訂閱的模式:

punsubscribe pay.*

Redis 客戶端發佈訂閱使用方式

基於 Jedis 開發發佈/訂閱

聊完 Redis 發佈訂閱指令,我們來看下 Java Redis 客戶端如何使用發佈訂閱。

下面的例子主要基於 Jedis,maven 版本為:

<dependency>
 <groupId>redis.clients</groupId>
 <artifactId>jedis</artifactId>
 <version>3.1.0</version>
</dependency>

其他 Redis 客戶端大同小異。

jedis 發佈代碼比較簡單,只需要調用 Jedis 類的 publish 方法。

// 生產環境千萬不要這麼使用哦,推薦使用 JedisPool 線程池的方式 
Jedis jedis = new Jedis("localhost", 6379);
jedis.auth("xxxxx");
jedis.publish("pay_result", "hello world");

訂閱的代碼就相對複雜了,我們需要繼承 JedisPubSub 實現裏面的相關方法,一旦有其他客戶端往訂閱的頻道上發送消息,將會調用 JedisPubSub 相應的方法。

private static class MyListener extends JedisPubSub {
    @Override
    public void onMessage(String channel, String message) {
        System.out.println("收到訂閱頻道:" + channel + " 消息:" + message);

    }

    @Override
    public void onPMessage(String pattern, String channel, String message) {
        System.out.println("收到具體訂閱頻道:" + channel + "訂閱模式:" + pattern + " 消息:" + message);
    }

}

其次我們需要調用 Jedis 類的 subscribe 方法:

Jedis jedis = new Jedis("localhost", 6379);
jedis.auth("xxx");
jedis.subscribe(new MyListener(), "pay_result");

當有其他客戶端往 pay_result頻道發送消息時,訂閱將會收到消息。

不過需要注意的是,jedis#subscribe 是一個阻塞方法,調用之後將會阻塞主線程的,所以如果需要在正式項目使用需要使用異步線程運行,這裡就不演示具體的代碼了。

基於 Spring-Data-Redis 開發發佈訂閱

原生 jedis 發佈訂閱操作,相對來說還是有點複雜。現在我們很多應用已經基於 SpringBoot 開發,使用 spring-boot-starter-data-redis ,可以簡化發佈訂閱開發。

首先我們需要引入相應的 startter 依賴:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>lettuce-core</artifactId>
            <groupId>io.lettuce</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>

這裡我們使用 Jedis 當做底層連接客戶端,所以需要排除 lettuce,然後引入 Jedis 依賴。

然後我們需要創建一個消息接收類,裏面需要有方法消費消息:

@Slf4j
public class Receiver {
    private AtomicInteger counter = new AtomicInteger();

    public void receiveMessage(String message) {
        log.info("Received <" + message + ">");
        counter.incrementAndGet();
    }

    public int getCount() {
        return counter.get();
    }
}

接着我們只需要注入 Spring- Redis 相關 Bean,比如:

  • StringRedisTemplate,用來操作 Redis 命令
  • MessageListenerAdapter ,消息監聽器,可以在這個類注入我們上面創建消息接受類 Receiver
  • RedisConnectionFactory, 創建 Redis 底層連接
@Configuration
public class MessageConfiguration {

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // 訂閱指定頻道使用 ChannelTopic
        // 訂閱模式使用 PatternTopic
        container.addMessageListener(listenerAdapter, new ChannelTopic("pay_result"));

        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        // 注入 Receiver,指定類中的接受方法
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

    @Bean
    Receiver receiver() {
        return new Receiver();
    }

    @Bean
    StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }

}

最後我們使用 StringRedisTemplate#convertAndSend 發送消息,同時 Receiver 將會收到一條消息。

@SpringBootApplication
public class MessagingRedisApplication {
    public static void main(String[] args) throws InterruptedException {

        ApplicationContext ctx = SpringApplication.run(MessagingRedisApplication.class, args);

        StringRedisTemplate template = ctx.getBean(StringRedisTemplate.class);
        Receiver receiver = ctx.getBean(Receiver.class);

        while (receiver.getCount() == 0) {
            template.convertAndSend("pay_result", "Hello from Redis!");
            Thread.sleep(500L);
        }

        System.exit(0);
    }
}

Redis 發佈訂閱實際應用

Redis Sentinel 節點發現

Redis Sentinel 是 Redis 一套高可用方案,可以在主節點故障的時候,自動將從節點提升為主節點,從而轉移故障。

今天這裡我們不詳細解釋 Redis Sentinel 詳細原理,主要來看下 Redis Sentinel 如何使用發佈訂閱機制。

Redis Sentinel 節點主要使用發佈訂閱機制,實現新節點的發現,以及交換主節點的之間的狀態。

如下所示,每一個 Sentinel 節點將會定時向 _sentinel_:hello 頻道發送消息,並且每個 Sentinel 都會訂閱這個節點。

這樣一旦有節點往這個頻道發送消息,其他節點就可以立刻收到消息。

這樣一旦有的新節點加入,它往這個頻道發送消息,其他節點收到之後,判斷本地列表並沒有這個節點,於是就可以當做新的節點加入本地節點列表。

除此之外,每次往這個頻道發送消息內容可以包含節點的狀態信息,這樣可以作為後面 Sentinel 領導者選舉的依據。

以上都是對於 Redis 服務端來講,對於客戶端來講,我們也可以用到發佈訂閱機制。

Redis Sentinel 進行主節點故障轉移,這個過程各個階段會通過發佈訂閱對外提供。

對於我們客戶端來講,比較關心切換之後的主節點,這樣我們及時切換主節點的連接(舊節點此時已故障,不能再接受操作指令),

客戶端可以訂閱 +switch-master頻道,一旦 Redis Sentinel 結束了對主節點的故障轉移就會發佈主節點的的消息。

redission 分佈式鎖

redission 開源框架提供一些便捷操作 Redis 的方法,其中比較出名的 redission 基於 Redis 的實現分佈式鎖。

今天我們來看下 Redis 的實現分佈式鎖中如何使用 Redis 發佈訂閱機制,提高加鎖的性能。

PS:redission 分佈式鎖實現原理,可以參考之前寫過的文章:

  1. 可重入分佈式鎖的實現方式
  2. Redis 分佈式鎖,看似簡單,其實真不簡單

首先我們來看下 redission 加鎖的方法:

Redisson redisson = ....
RLock redissonLock = redisson.getLock("xxxx");
redissonLock.lock();

RLock 繼承自 Java 標準的 Lock 接口,調用 lock 方法,如果當前鎖已被其他客戶端獲取,那麼當前加鎖的線程將會被阻塞,直到其他客戶端釋放這把鎖。

這裡其實有個問題,當前阻塞的線程如何感知分佈式鎖已被釋放呢?

這裡其實有兩種實現方法:

第一鍾,定時查詢分佈時鎖的狀態,一旦查到鎖已被釋放(Redis 中不存在這個鍵值),那麼就去加鎖。

實現偽碼如下:

while (true) {
  boolean result=lock();
  if (!result) {
    Thread.sleep(N);
  }
}

這種方式實現起來起來簡單,不過缺點也比較多。

如果定時任務時間過短,將會導致查詢次數過多,其實這些都是無效查詢。

如果定時任務休眠時間過長,那又會導致加鎖時間過長,導致加鎖性能不好。

那麼第二種實現方案,就是採用服務通知的機制,當分佈式鎖被釋放之後,客戶端可以收到鎖釋放的消息,然後第一時間再去加鎖。

這個服務通知的機制我們可以使用 Redis 發佈訂閱模式。

當線程加鎖失敗之後,線程將會訂閱 redisson_lock__channel_xxx(xx 代表鎖的名稱) 頻道,使用異步線程監聽消息,然後利用 Java 中 Semaphore 使當前線程進入阻塞。

一旦其他客戶端進行解鎖,redission 就會往這個redisson_lock__channel_xxx 發送解鎖消息。

等異步線程收到消息,將會調用 Semaphore 釋放信號量,從而讓當前被阻塞的線程喚醒去加鎖。

ps:這裡只是簡單描述了 redission 加鎖部分原理,出於篇幅,這裡就不再消息解析源碼。

感興趣的小夥伴可以自己看下 redission 加鎖的源碼。

通過發佈訂閱機制,被阻塞的線程可以及時被喚醒,減少無效的空轉的查詢,有效的提高的加鎖的效率。

ps: 這種方式,性能確實提高,但是實現起來的複雜度也很高,這部分源碼有點東西,快看暈了。

總結

今天我們主要介紹 Redis 發佈訂閱功能,主要對應的 Redis 命令為:

  • subscribe channel [channel …] 訂閱一個或多個頻道
  • unsubscribe channel 退訂指定頻道
  • publish channel message 發送消息
  • psubscribe pattern 訂閱指定模式
  • punsubscribe pattern 退訂指定模式

我們可以利用 Redis 發佈訂閱功能,實現的簡單 MQ 功能,實現上下游的解耦。

不過需要注意了,由於 Redis 發佈的消息不會被持久化,這就會導致新訂閱的客戶端將不會收到歷史消息。

所以,如果當前的業務場景不能容忍這些缺點,那還是用專業 MQ 吧。

最後介紹了兩個使用 Redis 發佈訂閱功能使用場景供大家參考。

歡迎關注我的公眾號:程序通事,獲得日常乾貨推送。如果您對我的專題內容感興趣,也可以關注我的博客:studyidea.cn

Tags: