芋道 Spring Boot Redis 入門(下)

  • 2019 年 11 月 20 日
  • 筆記

摘要: 原創出處 http://www.iocoder.cn/Spring-Boot/Redis/ 「芋道源碼」歡迎轉載,保留摘要,謝謝!

  • 4. 項目實踐
    • 4.1 Cache Object
    • 4.2 數據訪問層
    • 4.3 序列化
  • 5. 示例補充
    • 5.1 Pipeline
    • 5.2 Transaction
    • 5.3 Session
    • 5.4 Pub/Sub
    • 5.5 Script
  • 6. 嘗試 Redisson
    • 6.1 快速入門
    • 6.2 Redis 分散式鎖
    • 6.3 Redis 限流器
  • 666. 彩蛋

4. 項目實踐

本小節,我們來分享我們在生產中的一些實踐。關於這塊,希望大家可以一起討論,能夠讓我們的程式碼更加優雅乾淨。

4.1 Cache Object

在我們使用資料庫時,我們會創建 dataobject 包,存放 DO(Data Object)資料庫實體對象。

那麼同理,我們快取對象,怎麼進行對應呢?對於複雜的快取對象,我們創建了 cacheobject 包,和 dataobject 包同層。如:

service # 業務邏輯層  dao # 資料庫訪問層  dataobject # DO  cacheobject # 快取對象  

並且所有的 Cache Object 對象使用 CacheObject 結尾,例如說 UserCacheObject、ProductCacheObject 。

4.2 數據訪問層

在我們訪問資料庫時,我們會創建 dao 包,存放每個 DO 對應的 Dao 對應。那麼對於每一個 CacheObject 類,我們也會創建一個其對應的 Dao 類。例如說,UserCacheObject 對應 UserCacheObjectDao 類。示例程式碼如下:

@Repository  public class UserCacheDao {        private static final String KEY_PATTERN = "user:%d"; // user:用戶編號 <1>        @Resource(name = "redisTemplate")      @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")      private ValueOperations<String, String> operations; // <2>        private static String buildKey(Integer id) { // <3>          return String.format(KEY_PATTERN, id);      }        public UserCacheObject get(Integer id) {          String key = buildKey(id);          String value = operations.get(key);          return JSONUtil.parseObject(value, UserCacheObject.class);      }        public void set(Integer id, UserCacheObject object) {          String key = buildKey(id);          String value = JSONUtil.toJSONString(object);          operations.set(key, value);      }    }  
  • <1> 處,通過靜態變數,聲明 KEY 的前綴,並且使用冒號作為間隔
  • <3> 處,聲明 KEY_PATTERN 對應的 KEY 拼接方法,避免散落在每個方法中。
  • <2> 處,通過 @Resource 注入指定名字的 RedisTemplate 對應的 Operations 對象,這樣明確每個 KEY 的類型。
  • 剩餘的,就是每個方法封裝對應的操作。

可能會有胖友問,為什麼不支援將 RedisTemplate 直接 Service 業務層調用呢?如果這樣,我們業務程式碼里,就容易混雜著很多 Redis 訪問程式碼的細節,導致很髒亂。我們試著把 RedisTemplate 想像成 Spring JDBCTemplate ,我們一定會聲明對應的 Dao 類,訪問資料庫。所以,同理落。

那麼還有一個問題,UserCacheDao 放在哪個包下?目前的想法是,將 dao 包下拆成 mysqlredis 包。這樣,MySQL 相關的 Dao 放在 mysql 包下,Redis 相關的 Dao 放在 redis

4.3 序列化

在 「3. 序列化」 小節中,我們仔細翻看了每個序列化方式,暫時沒有一個能夠完美的契合我們的需求,所以我們直接使用最簡單的 StringRedisSerializer 作為序列化實現類。而真正的序列化,我們在各個 Dao 類里,自己手動來調用。

例如說,在 UserCacheDao 示例中,已經看到了這麼做了。這裡還有一個細化點,雖然我們是自己手動序列化,可以自己簡單封裝一個 JSONUtil 類,未來如果我們想換 JSON 庫,就比較方便了。其實,這個和 Spring Data Redis 所做的封裝是一個思路。

5. 示例補充

像 String、List、Set、ZSet、Geo、HyperLogLog 等等數據結構的操作,胖友自己去用用對應的 Operations 操作類的 API 方法,就非常容易懂了,我們更多的,補充 Pipeline、Transaction、Pub/Sub、Script 等等功能的示例。

5.1 Pipeline

如果胖友沒有了解過 Redis 的 Pipeline 機制,可以看看 《Redis 文檔 —— Pipeline》 文章,批量操作,提升性能必備神器。

在 RedisTemplate 類中,提供了 2 組四個方法,用於執行 Redis Pipeline 操作。程式碼如下:

// <1> 基於 Session 執行 Pipeline  @Override  public List<Object> executePipelined(SessionCallback<?> session) {      return executePipelined(session, valueSerializer);  }  @Override  public List<Object> executePipelined(SessionCallback<?> session, @Nullable RedisSerializer<?> resultSerializer) {      // ... 省略程式碼  }    // <2> 直接執行 Pipeline  @Override  public List<Object> executePipelined(RedisCallback<?> action) {      return executePipelined(action, valueSerializer);  }  @Override  public List<Object> executePipelined(RedisCallback<?> action, @Nullable RedisSerializer<?> resultSerializer) {      // ... 省略程式碼  }  
  • 兩組方法的差異,在於是否是 Session 中執行。那麼 Session 是什麼呢?賣個關子,在 「5.3 Session」 中來詳細解析。本小節,我們只講 Pipeline + RedisCallback 的組合的方法。
  • 每組方法里,差別在於是否傳入 RedisSerializer 參數。如果不傳,則使用 RedisTemplate 自己的序列化相關的屬性。

5.1.1 源碼解讀

在看具體的 #executePipelined(RedisCallback<?> action, ...) 方法的示例之前,我們先來看一波源碼,這樣我們才能更好的理解具體的使用方法。程式碼如下:

// RedisTemplate.java  @Override  public List<Object> executePipelined(RedisCallback<?> action, @Nullable RedisSerializer<?> resultSerializer) {      // <1> 執行 Redis 方法      return execute((RedisCallback<List<Object>>) connection -> {          // <2> 打開 pipeline          connection.openPipeline();          boolean pipelinedClosed = false; // 標記 pipeline 是否關閉          try {              // <3> 執行              Object result = action.doInRedis(connection);              // <4> 不要返回結果              if (result != null) {                  throw new InvalidDataAccessApiUsageException(                          "Callback cannot return a non-null value as it gets overwritten by the pipeline");              }              // <5> 提交 pipeline 執行              List<Object> closePipeline = connection.closePipeline();              pipelinedClosed = true;              // <6> 反序列化結果,並返回              return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer);          } finally {              if (!pipelinedClosed) {                  connection.closePipeline();              }          }      });  }  
  • <1> 處,調用 #execute(RedisCallback action) 方法,執行 Redis 方法。注意,此處傳入的 action 參數,不是我們傳入的 RedisCallback 參數。我們的會在該 action 中被執行。
  • <2> 處,調用 RedisConnection#openPipeline() 方法,自動打開 Pipeline 模式。這樣,我們就不需要手動去打開了。
  • <3> 處,調用我們傳入的實現的 RedisCallback#doInRedis(RedisConnection connection) 方法,執行在 Pipeline 中,想要執行的 Redis 操作。
  • <4> 處,不要返回結果。因為 RedisCallback 是統一定義的介面,所以可以返回一個結果。但是在 Pipeline 中,未提交執行時,顯然是沒有結果,返回也沒有意思。簡單來說,就是我們在實現 RedisCallback#doInRedis(RedisConnection connection) 方法時,返回 null 即可。
  • <5> 處,調用 RedisConnection#closePipeline() 方法,自動提交 Pipeline 執行,並返回執行結果。
  • <6> 處,反序列化結果,並返回 Pipeline 結果。

至此,Spring Data Redis 對 Pipeline 的封裝,我們已經做了一個簡單的了解,實際就是經典的「模板方法」設計模式化的應用。下面,在讓我們來看看 org.springframework.data.redis.core.RedisCallback<T> 介面,Redis 回調介面。程式碼如下:

// RedisCallback.java  public interface RedisCallback<T> {        /**       * Gets called by {@link RedisTemplate} with an active Redis connection. Does not need to care about activating or       * closing the connection or handling exceptions.       *       * @param connection active Redis connection       * @return a result object or {@code null} if none       * @throws DataAccessException       */      @Nullable      T doInRedis(RedisConnection connection) throws DataAccessException;  }  
  • 雖然介面名是以 Callback 結尾,但是通過 #doInRedis(RedisConnection connection) 方法可以很容易知道,實際可以理解是 Redis Action ,想要執行的 Redis 操作。
  • 有一點要注意,傳入的 connection 參數是 RedisConnection 對象,它提供的 'low level' 更底層的 Redis API 操作。例如說: // RedisStringCommands.java // RedisConnection 實現 RedisStringCommands 介面 byte[] get(byte[] key); Boolean set(byte[] key, byte[] value);
    • 傳入和返回的是二進位數組,實際就是 RedisTemplate 已經序列化的入參和會被反序列化的出參。

5.1.2 具體示例

示例程式碼對應測試類:PipelineTest 。

創建 PipelineTest 單元測試類,編寫程式碼如下:

// PipelineTest.java    @RunWith(SpringRunner.class)  @SpringBootTest  public class PipelineTest {        @Autowired      private StringRedisTemplate stringRedisTemplate;        @Test      public void test01() {          List<Object> results = stringRedisTemplate.executePipelined(new RedisCallback<Object>() {                @Override              public Object doInRedis(RedisConnection connection) throws DataAccessException {                  // set 寫入                  for (int i = 0; i < 3; i++) {                      connection.set(String.format("yunai:%d", i).getBytes(), "shuai".getBytes());                  }                    // get                  for (int i = 0; i < 3; i++) {                      connection.get(String.format("yunai:%d", i).getBytes());                  }                    // 返回 null 即可                  return null;              }          });            // 列印結果          System.out.println(results);      }  }  

執行 #test01() 方法,結果如下:

[true, true, true, shuai, shuai, shuai]  
  • 因為我們使用 StringRedisTemplate 自己的序列化相關屬性,所以 Redis GET 命令返回的二進位,被反序列化成了字元串。

5.2 Transaction

基情提示:實際項目實戰中,Redis Transaction 事務基本不用,至少問了一些胖友,包括自己,都沒有再用。所以呢,本小節可以選擇性看看。或者,就不看,哈哈哈哈。

在看 Redis Transaction 事務之前,我們先回想下 Spring 是如何管理資料庫 Transaction 的。在應用程式中處理一個請求時,如果我們的方法開啟Trasaction 功能,Spring 會把資料庫的 Connection 連接和當前執行緒進行綁定,從而實現 Connection 打開一個 Transaction 後,所有當前執行緒的資料庫操作都在該 Connection 上執行,達到所有操作在這個 Transaction 中,最終提交或回滾。

在 Spring Data Redis 中,實現 Redis Transaction 也是這個思路。通過 SessionCallback 操作 Redis 時,會從當前執行緒獲得 Redis Connection ,如果獲取不到,則會去「創建」一個 Redis Connection 並綁定到當前執行緒中。這樣,我們在該 Redis Connection 開啟 Redis Transaction 後,在該執行緒的所有操作,都可以在這個 Transaction 中,最後交由 Spring 事務管理器統一提供或回滾 Transaction 。

如果想要使用 Redis Transaction 功能,需要創建 RedisTemplate Bean 時,設置其 enableTransactionSupport 屬性為 true ,默認為 false 不開啟。示例如下:

@Configuration  public class RedisConfiguration {        @Bean      public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {          // 創建 RedisTemplate 對象          RedisTemplate<String, Object> template = new RedisTemplate<>();            // 【重要】設置開啟事務支援          template.setEnableTransactionSupport(true);            // 設置 RedisConnection 工廠。? 它就是實現多種 Java Redis 客戶端接入的秘密工廠。感興趣的胖友,可以自己去擼下。          template.setConnectionFactory(factory);            // 使用 String 序列化方式,序列化 KEY 。          template.setKeySerializer(RedisSerializer.string());            // 使用 JSON 序列化方式(庫是 Jackson ),序列化 VALUE 。          template.setValueSerializer(RedisSerializer.json());          return template;      }    }  

5.2.1 源碼解析

概念和原理層面的東西,一旦複雜,就會特別抽象,那麼還是老規矩,讓我們一起擼下源碼,讓原理具象化。很多時候,這就是為什麼我們要去擼源碼的意義。

我們先來看看,配置下 enableTransactionSupport 屬性,Redis 在執行命令,是如何獲得 Connection 連接的。程式碼如下:

// RedisTemplate.java    public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {        Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");      Assert.notNull(action, "Callback object must not be null");        RedisConnectionFactory factory = getRequiredConnectionFactory();      RedisConnection conn = null;      try {          // <1.1>          if (enableTransactionSupport) {              // only bind resources in case of potential transaction synchronization              conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);          } else {              // <1.2>              conn = RedisConnectionUtils.getConnection(factory);          }            // ... 省略中間,執行 Redis 命令的程式碼。      } finally {          // <2>          RedisConnectionUtils.releaseConnection(conn, factory);      }  }  
  • 考慮到盡量讓內容簡單一些,我們不會對每一行程式碼做特別的深究,主要是保證胖友對 Spring Data Redis 對 Transaction 的封裝,有個總體了解。
  • <1.2> 處,當我們開啟 enableTransactionSupport 事務時,調用 `RedisConnectionUtils#getConnection(factory)` 方法,獲得 Redis Connection 。如果獲取不到,則進行創建。
  • <1.1> 處,當我們開啟 enableTransactionSupport 事務時,調用 `RedisConnectionUtils#bindConnection(RedisConnectionFactory factory, boolean enableTranactionSupport)` 方法,在 RedisConnectionUtils#getConnection(factory) 的基礎上,如果是創建的 Redis Connection ,會綁定到當前啊執行緒中。因為 Transaction 是需要在 Connection 打開,然後後續的 Redis 的操作,都需要在其上。並且,還有一個非常重要的操作,打開 Redis Transaction ,會在該方法中,通過調用 `RedisConnectionUtils#potentiallyRegisterTransactionSynchronisation(RedisConnectionHolder connHolder, final RedisConnectionFactory factory)` 。
  • <2> 處,調用 `RedisConnectionUtils#releaseConnection(RedisConnection conn, RedisConnectionFactory factory)` 方法,釋放 Redis Connection 。當然,這是有一個前提,整個 Transaction 已經完成。如果未完成,實際 Redis Connection 不會釋放。

那麼,此時會有胖友有疑問,Redis Transaction 的提交和回滾在哪呢?答案在 RedisConnectionUtils 的內部類 RedisTransactionSynchronizer 中。程式碼如下:

// RedisConnectionUtils.java    private static class RedisTransactionSynchronizer extends TransactionSynchronizationAdapter {        private final RedisConnectionHolder connHolder;      private final RedisConnection connection;      private final RedisConnectionFactory factory;        @Override      public void afterCompletion(int status) {            try {              switch (status) {                  // 提交                  case TransactionSynchronization.STATUS_COMMITTED:                      connection.exec();                      break;                  // 回滾                  case TransactionSynchronization.STATUS_ROLLED_BACK:                  case TransactionSynchronization.STATUS_UNKNOWN:                  default:                      connection.discard();              }          } finally {              connHolder.setTransactionSyncronisationActive(false);              connection.close();              TransactionSynchronizationManager.unbindResource(factory);          }      }  }  
  • 根據事務結果的狀態,進行 Redis Transaction 提交或回滾。? 如果想進一步的深入,胖友就需要去了解 Spring Transaction 的源碼。

5.2.2 具體示例

示例程式碼對應測試類:TransactionTest 。

創建 TransactionTest 單元測試類,編寫程式碼如下:

// TransactionTest.java    @RunWith(SpringRunner.class)  @SpringBootTest  public class TransactionTest {        @Autowired      private StringRedisTemplate stringRedisTemplate;        @Test  //    @Transactional      public void test01() {          // 這裡是偷懶,沒在 RedisConfiguration 配置類中,設置 stringRedisTemplate 開啟事務。          stringRedisTemplate.setEnableTransactionSupport(true);            // 執行想要的操作          stringRedisTemplate.opsForValue().set("yunai:1", "shuai");          stringRedisTemplate.opsForValue().set("yudaoyuanma:1", "dai");      }  }  

目前這僅僅是一個示例。因為 Redis Transaction 實際創建事務的前提,是當前已經存在 Spring Transaction 。具體可以看看傳送門處的判斷的程式碼。? 略感神奇,不曉得為什麼是這樣的設定。

5.2.3 補充資料

如果覺得還是無法理解的胖友,可以在看看如下幾篇文章:

  • 《Spring Data Redis(Redis Transactions)》
  • 《Redis 之坑:spring-data-redis 中的 Redis 事務》
  • 《Spring Data Redis 事務專題》

5.2.4 閑話兩句

實際場景下,如果胖友有 Redis 事務的訴求,建議把事務的、和非事務的 RedisTemplate 拆成兩個連接池,相互獨立。主要原因有兩個:

  • 1)Spring Data Redis 的事務設計,是將其融入到 Spring 整個 Transaction 當中。一般來說,Spring Transaction 中,肯定會存在資料庫的 Transaction 。考慮到資料庫操作相比 Redis 來說,肯定是慢得多,那麼就會導致 Redis 的 Connection 一直被當前 Transaction 佔用著。
  • 2)How can i eliminate getting junk value through redis get command?

5.3 Session

首先,我們需要澄清下,Session 不是 Redis 的功能,而是 Spring Data Redis 封裝的一個功能。一次 Session ,代表通過同一個 Redis Connection 執行一系列的 Redis 操作。

在 「5.2.1 源碼解析」 中,我們可以發現,如果我們在一個 Redis Transaction 中的時候,所有 Redis 操作都使用通過同一個 Redis Connection ,因為我們會將獲得到的 Connection 綁定到當前執行緒中。

但是,如果我們不在一個 Redis Transaction 中的時候,我們每一次使用 Redis Operations 執行 Redis 操作的時候,每一次都會獲取一次 Redis Connection 的獲取。實際項目中,我們必然會使用 Redis Connection 連接池,那麼在獲取的時候,會存在一定的競爭,會有資源上的消耗。那麼,如果我們希望如果已知我們要執行一個系列的 Redis 操作,能不能使用同一個 Redis Connection ,避免重複獲取它呢?答案是有,那就是 Session 。

當我們要執行在同一個 Session 里的操作時,我們通過實現 org.springframework.data.redis.core.SessionCallback<T> 介面,其程式碼如下:

// SessionCallback.java    public interface SessionCallback<T> {        @Nullable      <K, V> T execute(RedisOperations<K, V> operations) throws DataAccessException;  }  
  • 相比 RedisCallback 來說,總比是比較相似的。但是比較友好的是,它的入參 operations 是 org.springframework.data.redis.core.RedisOperations 介面類型,而 RedisTemplate 的各種操作,實際就是在 RedisOperations 介面中定義,由 RedisTemplate 來實現。所以使用上也會更加便利。
  • 實際上,我們在實現 RedisCallback 介面,也能實現在同一個 Connection 執行一系列的 Redis 操作,因為 RedisCallback 的入參本身就是一個 Redis Connection 。

5.3.1 源碼解析

在生產中,Transaction 和 Pipeline 會經常一起時候用,從而提升性能。所以在 RedisTemplate#executePipelined(SessionCallback<?> session, ...) 方法中,提供了這種的功能。而在這個方法的實現上,本質和 RedisTemplate#executePipelined(RedisCallback<?> action, ...) 方法是基本一致的,差別在於這一行 ,替換成了調用 #executeSession(SessionCallback<?> session) 方法。所以,我們來直接來看被調用的這個方法的實現。程式碼如下:

// RedisTemplate.java    @Override  public <T> T execute(SessionCallback<T> session) {        Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");      Assert.notNull(session, "Callback object must not be null");        RedisConnectionFactory factory = getRequiredConnectionFactory();      // bind connection      // <1> 獲得並綁定 Connection 。      RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);      try {         // <2> 執行定義的一系列 Redis 操作          return session.execute(this);      } finally {          // <3> 釋放並解綁 Connection 。          RedisConnectionUtils.unbindConnection(factory);      }  }  
  • <1> 處,調用 RedisConnectionUtils#bindConnection(RedisConnectionFactory factory, boolean enableTranactionSupport) 方法,實際和我們開啟 enableTranactionSupport 事務時候,獲取 Connection 和處理的方式,是一模一樣的。也就是說:
    • 如果當前執行緒已經有一個綁定的 Connection 則直接使用(例如說,當前正在 Redis Transaction 事務中);
    • 如果當前執行緒未綁定一個 Connection ,則進行創建並綁定到當前執行緒。甚至,如果此時是配置開啟 `enableTranactionSupport` 事務的,那麼此處就會觸發 Redis Transaction 的開啟。
  • <2> 處,調用 SessionCallback#execute(RedisOperations operations) 方法,執行我們定義的一系列的 Redis 操作。看看此處傳入的參數是 this ,是不是彷彿更加明白點什麼了?
  • <3> 處,調用 `RedisConnectionUtils#unbindConnection(RedisConnectionFactory factory)` 方法,釋放並解綁 Connection 。當前,前提是當前不存在激活的 Redis Transaction ,不然不就提早釋放了嘛。

恩,現在胖友在回過頭,好好在想一想 Pipeline、Transaction、Session 之間的關係,以及組合排列。之後,我們在使用上,會更加得心應手。

5.3.2 具體示例

示例程式碼對應測試類:SessionTest 。

創建 SessionTest 單元測試類,編寫程式碼如下:

// SessionTest.java    @RunWith(SpringRunner.class)  @SpringBootTest  public class SessionTest {        @Autowired      private StringRedisTemplate stringRedisTemplate;        @Test      public void test01() {          String result = stringRedisTemplate.execute(new SessionCallback<String>() {                @Override              public String execute(RedisOperations operations) throws DataAccessException {                  for (int i = 0; i < 100; i++) {                      operations.opsForValue().set(String.format("yunai:%d", i), "shuai02");                  }                  return (String) operations.opsForValue().get(String.format("yunai:%d", 0));              }            });            System.out.println("result:" + result);      }    }  

執行 #test01() 方法,結果如下:

result:shuai02  
  • 卧槽,一直被 Redis 誇獎,已經超級不好意思了。

5.4 Pub/Sub

Redis 提供了 Pub/Sub 功能,實現簡單的訂閱功能,不了解的胖友,可以看看 「Redis 文檔 —— Pub/Sub」 。

5.4.1 源碼解析

暫時不提供,感興趣的胖友,可以自己看看最核心的 org.springframework.data.redis.listener.RedisMessageListenerContainer 類,Redis 消息監聽器容器,基於 Pub/Sub 的 SUBSCRIBE、PSUBSCRIBE 命令實現,我們只需要添加相應的 org.springframework.data.redis.connection.MessageListener 即可。不算複雜,1000 多行,只要調試下核心的功能即可。

5.4.2 具體示例

示例程式碼對應測試類:PubSubTest 。

Spring Data Redis 實現 Pub/Sub 的示例,主要分成兩部分:

  • 配置 RedisMessageListenerContainer Bean 對象,並添加我們自己實現的 MessageListener 對象,用於監聽處理相應的消息。
  • 使用 RedisTemplate 發布消息。

下面,我們通過四個步驟,來實現一個簡單的示例。

第一步,了解 Topic

org.springframework.data.redis.listener.Topic 介面,表示 Redis 消息的 Topic 。它有兩個子類實現:

  • ChannelTopic :對應 SUBSCRIBE 訂閱命令。
  • PatternTopic :對應 PSUBSCRIBE 訂閱命令。

第二步,實現 MessageListener 類

創建 TestChannelTopicMessageListener 類,編寫程式碼如下:

public class TestPatternTopicMessageListener implements MessageListener {        @Override      public void onMessage(Message message, byte[] pattern) {          System.out.println("收到 PatternTopic 消息:");          System.out.println("執行緒編號:" + Thread.currentThread().getName());          System.out.println("message:" + message);          System.out.println("pattern:" + new String(pattern));      }    }  
  • message 參數,可獲得到具體的消息內容,不過是二進位數組,需要我們自己序列化。具體可以看下 `org.springframework.data.redis.connection.DefaultMessage` 類。
  • pattern 參數,發布的 Topic 的內容。

有一點要注意,默認的 RedisMessageListenerContainer 情況下,MessageListener 是並發消費,在執行緒池中執行(具體見傳送門程式碼)。所以如果想相同 MessageListener 串列消費,可以在方法上加 synchronized 修飾,來實現同步。

第三步,創建 RedisMessageListenerContainer Bean

在 RedisConfiguration 中,配置 RedisMessageListenerContainer Bean 。程式碼如下:

// RedisConfiguration.java    @Bean  public RedisMessageListenerContainer listenerContainer(RedisConnectionFactory factory) {      // 創建 RedisMessageListenerContainer 對象      RedisMessageListenerContainer container = new RedisMessageListenerContainer();        // 設置 RedisConnection 工廠。? 它就是實現多種 Java Redis 客戶端接入的秘密工廠。感興趣的胖友,可以自己去擼下。      container.setConnectionFactory(factory);        // 添加監聽器      container.addMessageListener(new TestChannelTopicMessageListener(), new ChannelTopic("TEST"));  //        container.addMessageListener(new TestChannelTopicMessageListener(), new ChannelTopic("AOTEMAN"));  //        container.addMessageListener(new TestPatternTopicMessageListener(), new PatternTopic("TEST"));      return container;  }  

要注意,雖然 RedisConnectionFactory 可以多次調用 #addMessageListener(MessageListener listener, Topic topic) 方法,但是一定要都是相同的 Topic 類型。例如說,添加了 ChannelTopic 類型,就不能添加 PatternTopic 類型。為什麼呢?因為 RedisMessageListenerContainer 是基於一次 SUBSCRIBE 或 PSUBSCRIBE 命令,所以不支援不同類型的 Topic 。當然,如果是相同類型的 Topic ,多個 MessageListener 是支援的。

那麼,可能會有胖友會問,如果我添加了 "Test" 給 MessageListenerA"AOTEMAN" 給 MessageListenerB ,兩個 Topic 是怎麼分發(Dispatch)的呢?在 RedisMessageListenerContainer 中,有個 DispatchMessageListener 分發器,負責將不同的 Topic 分發到配置的 MessageListener 中。看到此處,有木有想到 Spring MVC 的 DispatcherServlet 分發不同的請求到對應的 @RequestMapping 方法。

第四步,使用 RedisTemplate 發布消息

創建 PubSubTest 測試類,編寫程式碼如下:

@RunWith(SpringRunner.class)  @SpringBootTest  public class PubSubTest {        public static final String TOPIC = "TEST";        @Autowired      private StringRedisTemplate stringRedisTemplate;        @Test      public void test01() throws InterruptedException {          for (int i = 0; i < 10; i++) {              stringRedisTemplate.convertAndSend(TOPIC, "yunai:" + i);              Thread.sleep(1000L);          }      }    }  
  • 通過 RedisTemplate#convertAndSend(String channel, Object message) 方法,PUBLISH 消息。

執行 #test01() 方法,運行結果如下:

收到 ChannelTopic 消息:  執行緒編號:listenerContainer-2  message:yunai:0  pattern:TEST  收到 ChannelTopic 消息:  執行緒編號:listenerContainer-3  message:yunai:1  pattern:TEST  收到 ChannelTopic 消息:  執行緒編號:listenerContainer-4  message:yunai:2  pattern:TEST  
  • 整整齊齊,發送和訂閱都成功了。注意,執行緒編號

5.4.3 閑話兩句

Redis 提供了 PUB/SUB 訂閱功能,實際我們在使用時,一定要注意,它提供的不是一個可靠的訂閱系統。例如說,有消息 PUBLISH 了,Redis Client 因為網路異常斷開,無法訂閱到這條消息。等到網路恢復後,Redis Client 重連上後,是無法獲得到該消息的。相比來說,成熟的消息隊列提供的訂閱功能,因為消息會進行持久化(Redis 是不持久化 Publish 的消息的),並且有客戶端的 ACK 機製做保障,所以即使網路斷開重連,消息一樣不會丟失。

Redis 5.0 版本後,正式發布 Stream 功能,相信是有可能可以替代掉 Redis Pub/Sub 功能,提供可靠的消息訂閱功能。

上述的場景,艿艿自己在使用 PUB/SUB 功能的時候,確實被這麼坑過。當時我們的管理後台的許可權,是快取在 Java 進程當中,通過 Redis Pub/Sub 實現快取的刷新。結果,當時某個 Java 節點網路出問題,恰好那個時候,有一條刷新許可權快取的消息 PUBLISH 出來,結果沒刷新到。結果呢,運營在訪問某個功能的時候,一會有許可權(因為其他 Java 節點快取刷新了),一會沒有許可權。

最近,艿艿又去找了幾個朋友請教了下,問問他們在生產環境下,是否使用 Redis Pub/Sub 功能,他們說使用 Kafka、或者 RocketMQ 的廣播消費功能,更加可靠有保障。

對了,我們有個管理系統裡面有 Websocket 需要實時推送管理員消息,因為不知道管理員當前連接的是哪個 Websocket 服務節點,所以我們是通過 Redis Pub/Sub 功能,廣播給所有 Websocket 節點,然後每個 Websocket 節點判斷當前管理員是否連接的是它,如果是,則進行 Websocket 推送。因為之前網路偶爾出故障,會存在消息丟失,所以近期我們替換成了 RocketMQ 的廣播消費,替代 Redis Pub/Sub 功能。

當然,不能說 Redis Pub/Sub 毫無使用的場景,以下艿艿來列舉幾個:

  • 1、在使用 Redis Sentinel 做高可用時,Jedis 通過 Redis Pub/Sub 功能,實現對 Redis 主節點的故障切換,刷新 Jedis 客戶端的主節點的快取。如果出現 Redis Connection 訂閱的異常斷開,會重新主動去 Redis Sentinel 的最新主節點資訊,從而解決 Redis Pub/Sub 可能因為網路問題,丟失消息。
  • 2、Redis Sentinel 節點之間的部分資訊同步,通過 Redis Pub/Sub 訂閱發布。
  • 3、在我們實現 Redis 分散式鎖時,如果獲取不到鎖,可以通過 Redis 的 Pub/Sub 訂閱鎖釋放消息,從而實現其它獲得不到鎖的執行緒,快速搶佔鎖。當然,Redis Client 釋放鎖時,需要 PUBLISH 一條釋放鎖的消息。在 Redisson 實現分散式鎖的源碼中,我們可以看到。
  • 4、Dubbo 使用 Redis 作為註冊中心時,使用 Redis Pub/Sub 實現註冊資訊的同步。

也就是說,如果想要有保障的使用 Redis Pub/Sub 功能,需要處理下發起訂閱的 Redis Connection 的異常,例如說網路異常。然後,重新主動去查詢最新的數據的狀態。?

5.5 Script

Redis 提供 Lua 腳本,滿足我們希望組合排列使用 Redis 的命令,保證串列執行的過程中,不存在並發的問題。同時,通過將多個命令組合在同一個 Lua 腳本中,一次請求,直接處理,也是一個提升性能的手段。不了解的胖友,可以看看 「Redis 文檔 —— Lua 腳本」 。

下面,我們來看看 Spring Data Redis 使用 Lua 腳本的示例。

示例程式碼對應測試類:ScriptTest 。

第一步,編寫 Lua 腳本

創建 resources/compareAndSet.lua 腳本,實現 CAS 功能。程式碼如下:

if redis.call('GET', KEYS[1]) ~= ARGV[1] then      return 0  end  redis.call('SET', KEYS[1], ARGV[2])  return 1  
  • 第 1 到 3 行:判斷 KEYS[1] 對應的 VALUE 是否為 ARGV[1] 值。如果不是(Lua 中不等於使用 ~=),則直接返回 0 表示失敗。
  • 第 4 到 5 行:設置 KEYS[1] 對應的 VALUE 為新值 ARGV[2] ,並返回 1 表示成功。

第二步,調用 Lua 腳本

創建 ScriptTest 測試類,編寫程式碼如下:

@RunWith(SpringRunner.class)  @SpringBootTest  public class ScriptTest {        @Autowired      private StringRedisTemplate stringRedisTemplate;        @Test      public void test01() throws IOException {          // <1.1> 讀取 /resources/lua/compareAndSet.lua 腳本 。注意,需要引入下 commons-io 依賴。          String  scriptContents = IOUtils.toString(getClass().getResourceAsStream("/lua/compareAndSet.lua"), "UTF-8");          // <1.2> 創建 RedisScript 對象          RedisScript<Long> script = new DefaultRedisScript<>(scriptContents, Long.class);          // <2> 執行 LUA 腳本          Long result = stringRedisTemplate.execute(script, Collections.singletonList("yunai:1"), "shuai02", "shuai");          System.out.println(result);      }  }  
  • <1.1> 行,讀取 /resources/lua/compareAndSet.lua 腳本。注意,需要引入下 commons-io 依賴。
  • <1.2> 行,創建 DefaultRedisScript 對象。第一個參數是腳本內容( scriptSource ),第二個是腳本執行返回值( resultType )。
  • <2> 處,調用 `RedisTemplate#execute(RedisScript script, List keys, Object… args)` 方法,發送 Redis 執行 LUA 腳本。

最後,我們列印下執行結果。胖友可以自己執行下試試。?

6. 嘗試 Redisson

可能胖友不是很了解 Redisson 這個庫,胖友可以跳轉 Redis 客戶端 Redisson ,看看對它的介紹。簡單來說,這是 Java 最強的 Redis 客戶端!除了提供了 Redis 客戶端的常見操作之外,還提供了 Redis 分散式鎖、BloomFilter 布隆過濾器等強大的功能。

在 redisson-examples 中,Redisson 官方提供了大量的示例。

6.1 快速入門

示例程式碼對應倉庫:spring-data-redis-with-redisson 。

6.1.1 引入依賴

pom.xml 中,引入相關依賴。

<parent>      <groupId>org.springframework.boot</groupId>      <artifactId>spring-boot-starter-parent</artifactId>      <version>2.1.3.RELEASE</version>      <relativePath/> <!-- lookup parent from repository -->  </parent>    <dependencies>        <!-- 實現對 Redisson 的自動化配置 --> <!-- X -->      <dependency>          <groupId>org.redisson</groupId>          <artifactId>redisson-spring-boot-starter</artifactId>          <version>3.11.3</version>      </dependency>        <!-- 方便等會寫單元測試 -->      <dependency>          <groupId>org.springframework.boot</groupId>          <artifactId>spring-boot-starter-test</artifactId>          <scope>test</scope>      </dependency>        <!-- 等會示例會使用 fastjson 作為 JSON 序列化的工具 -->      <dependency>          <groupId>com.alibaba</groupId>          <artifactId>fastjson</artifactId>          <version>1.2.61</version>      </dependency>        <!-- Spring Data Redis 默認使用 Jackson 作為 JSON 序列化的工具 -->      <dependency>          <groupId>com.fasterxml.jackson.core</groupId>          <artifactId>jackson-databind</artifactId>      </dependency>        <dependency>          <groupId>commons-io</groupId>          <artifactId>commons-io</artifactId>          <version>2.6</version>      </dependency>    </dependencies>  

和 「2.1 引入依賴」 的差異點是,我們需要引入 redisson-spring-boot-starter 依賴,實現 Redisson 的自動化配置。

6.1.2 配置文件

application.yml 中,添加 Redis 配置,如下:

spring:    # 對應 RedisProperties 類    redis:      host: 127.0.0.1      port: 6379  #    password: # Redis 伺服器密碼,默認為空。生產中,一定要設置 Redis 密碼!      database: 0 # Redis 資料庫號,默認為 0 。      timeout: 0 # Redis 連接超時時間,單位:毫秒。      # 對應 RedissonProperties 類      redisson:        config: classpath:redisson.yml # 具體的每個配置項,見 org.redisson.config.Config 類。  

和 「2.2 配置文件」 的差異點是:

1)去掉 Jedis 相關的配置項

2)增加 redisson.config 配置

在我們使用 Spring Boot 整合 Redisson 時候,通過該配置項,引入一個外部的 Redisson 相關的配置文件。例如說,示例中,我們引入了 classpath:redisson.yaml 配置文件。它可以使用 JSON 或 YAML 格式,進行配置。

而引入的 redisson.config 對應的配置文件,對應的類是 org.redisson.config.Config 類。因為示例中,我們使用的比較簡單,所以就沒有做任何 Redisson 相關的自定義配置。如下是 Redisson 的每個配置項的解釋:

FROM 《Spring Boot 2.x 整合 lettuce redis 和 redisson》 文章。

clusterServersConfig:    # 連接空閑超時 如果當前連接池裡的連接數量超過了最小空閑連接數,而同時有連接空閑時間超過了該數值,那麼這些連接將會自動被關閉,並從連接池裡去掉。時間單位是毫秒。    idleConnectionTimeout: 10000    pingTimeout: 1000    # 連接超時    connectTimeout: 10000    # 命令等待超時    timeout: 3000    # 命令失敗重試次數    retryAttempts: 3    # 命令重試發送時間間隔    retryInterval: 1500    # 重新連接時間間隔    reconnectionTimeout: 3000    # failedAttempts    failedAttempts: 3    # 密碼    password: null    # 單個連接最大訂閱數量    subscriptionsPerConnection: 5    # 客戶端名稱    clientName: null    #負載均衡演算法類的選擇  默認輪詢調度演算法RoundRobinLoadBalancer    loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}    slaveSubscriptionConnectionMinimumIdleSize: 1    slaveSubscriptionConnectionPoolSize: 50    # 從節點最小空閑連接數    slaveConnectionMinimumIdleSize: 32    # 從節點連接池大小    slaveConnectionPoolSize: 64    # 主節點最小空閑連接數    masterConnectionMinimumIdleSize: 32    # 主節點連接池大小    masterConnectionPoolSize: 64    # 只在從服務節點裡讀取    readMode: "SLAVE"    # 主節點資訊    nodeAddresses:    - "redis://192.168.56.128:7000"    - "redis://192.168.56.128:7001"    - "redis://192.168.56.128:7002"    #集群掃描間隔時間 單位毫秒    scanInterval: 1000  threads: 0  nettyThreads: 0  codec: !<org.redisson.codec.JsonJacksonCodec> {}  

注意 注意 注意

如果 redisson.config 對應的配置文件,如果沒有配置任何內容,需要在 application.yml 里注釋掉 redisson.config 。像這樣:

spring:    # 對應 RedisProperties 類    redis:      host: 127.0.0.1      port: 6379  #    password: # Redis 伺服器密碼,默認為空。生產中,一定要設置 Redis 密碼!      database: 0 # Redis 資料庫號,默認為 0 。      timeout: 0 # Redis 連接超時時間,單位:毫秒。      # 對應 RedissonProperties 類  #    redisson:  #      config: classpath:redisson.yml # 具體的每個配置項,見 org.redisson.config.Config 類。  

6.1.3 簡單測試

創建 Test01 測試類,我們來測試一下簡單的 SET 指令。程式碼如下:

@RunWith(SpringRunner.class)  @SpringBootTest  public class Test01 {        @Autowired      private StringRedisTemplate stringRedisTemplate;        @Test      public void testStringSetKey() {          stringRedisTemplate.opsForValue().set("yunai", "shuai");      }  }  

我們先來執行下 #testStringSetKey() 方法這個測試方法。執行完成後,我們在控制台查詢,看看是否真的執行成功了。

$ redis-cli get yunai  "shuai"  
  • 請大聲的告訴我,Redis 是怎麼誇獎 "yunai" 的,哈哈哈哈。

6.1.4 閑聊兩句

因為有 Spring Data Redis 的存在,我們其實已經能感受到,即使我們將 Jedis 替換成了 Redisson ,依然調用的是相同的 Spring Data Redis 提供的 API ,而無需感知到 Redisson 或是 Jedis 的存在。如果哪一天,Spring Boot 2.X 版本默認推薦的 Lettuce 真的成熟了,那麼我們也可以無感知的進行替換。

6.2 Redis 分散式鎖

示例程式碼對應測試類:LockTest 。

一說到分散式鎖,大家一般會想到的就是基於 Zookeeper 或是 Redis 實現分散式鎖。相對來說,在考慮性能為優先因素,不需要特別絕對可靠性的場景下,我們會優先考慮使用 Redis 實現的分散式鎖。

在 Redisson 中,提供了 8 種分散式鎖的實現,具體胖友可以看看 《Redisson 文檔 —— 分散式鎖和同步器》 。真特碼的強大!大多數開發者可能連 Redis 怎麼實現分散式鎖都沒完全搞清楚,Redisson 直接給了 8 種鎖,氣人,簡直了。

本小節,我們來編寫一個簡單使用 Redisson 提供的可重入鎖 RLock 的示例。

創建 LockTest 測試類,編寫程式碼如下:

@RunWith(SpringRunner.class)  @SpringBootTest  public class LockTest {        private static final String LOCK_KEY = "anylock";        @Autowired // <1>      private RedissonClient redissonClient;        @Test      public void test() throws InterruptedException {          // <2.1> 啟動一個執行緒 A ,去佔有鎖          new Thread(new Runnable() {              @Override              public void run() {                  // 加鎖以後 10 秒鐘自動解鎖                  // 無需調用 unlock 方法手動解鎖                  final RLock lock = redissonClient.getLock(LOCK_KEY);                  lock.lock(10, TimeUnit.SECONDS);              }          }).start();          // <2.2> 簡單 sleep 1 秒,保證執行緒 A 成功持有鎖          Thread.sleep(1000L);            // <3> 嘗試加鎖,最多等待 100 秒,上鎖以後 10 秒自動解鎖          System.out.println(String.format("準備開始獲得鎖時間:%s", new SimpleDateFormat("yyyy-MM-DD HH:mm:ss").format(new Date())));          final RLock lock = redissonClient.getLock(LOCK_KEY);          boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);          if (res) {              System.out.println(String.format("實際獲得鎖時間:%s", new SimpleDateFormat("yyyy-MM-DD HH:mm:ss").format(new Date())));          } else {              System.out.println("加鎖失敗");          }      }    }  
  • 整個測試用例,意圖是:1)啟動一個執行緒 A ,先去持有鎖 10 秒然後釋放;2)主執行緒,也去嘗試去持有鎖,因為執行緒 A 目前正在佔用著該鎖,所以需要等待執行緒 A 釋放到該鎖,才能持有成功。
  • <1> 處,注入 RedissonClient 對象。因為我們需要使用 Redisson 獨有的功能,所以需要使用到它。
  • <2.1> 處,啟動執行緒 A ,然後調用 RLock#lock(long leaseTime, TimeUnit unit) 方法,加鎖以後 10 秒鐘自動解鎖,無需調用 unlock 方法手動解鎖。
  • <2.2> 處,簡單 sleep 1 秒,保證執行緒 A 成功持有鎖。
  • <3> 處,主執行緒,調用 RLock#tryLock(long waitTime, long leaseTime, TimeUnit unit) 方法,嘗試加鎖,最多等待 100 秒,上鎖以後 10 秒自動解鎖。

執行 #test() 測試用例,結果如下:

準備開始獲得鎖時間:2019-10-274 00:44:08  實際獲得鎖時間:2019-10-274 00:44:17  
  • 9 秒後(因為我們 sleep 了 1 秒),主執行緒成功獲得到 Redis 分散式鎖,符合預期。

6.3 Redis 限流器

在開始本節之前,先推薦看一篇乾貨 《你應該如何正確健壯後端服務?》 。

限流,無論在系統層面,還是在業務層面,使用都非常廣泛。例如說:

  • 【業務】為了避免惡意的灌水機或者用戶,限制每分鐘至允許回復 10 個帖子。
  • 【系統】為了避免服務系統被大規模調用,超過極限,限制每個調用方只允許每秒調用 100 次。

限流演算法,常用的分成四種:

每一種的概念,推薦看看 《計數器、滑動窗口、漏桶、令牌演算法比較和偽程式碼實現》 文章。

  • 計數器 比較簡單,每固定單位一個計數器即可實現。
  • 滑動窗口 Redisson 提供的是基於滑動窗口 RateLimiter 的實現。相比計數器的實現,它的起點不是固定的,而是以開始計數的那個時刻開始為一個窗口。 所以,我們可以把計數器理解成一個滑動窗口的特例,以固定單位為一個窗口。
  • 令牌桶演算法 《Eureka 源碼解析 —— 基於令牌桶演算法的 RateLimiter》 ,單機並發場景下的 RateLimiter 實現。 《Spring-Cloud-Gateway 源碼解析 —— 過濾器 (4.10) 之 RequestRateLimiterGatewayFilterFactory 請求限流》 ,基於 Redis 實現的令牌桶演算法的 RateLimiter 實現。
  • 漏桶演算法 > 漏桶演算法,一直沒搞明白和令牌桶演算法的區別。現在的理解是: > > * 令牌桶演算法,桶里裝的是令牌。每次能拿取到令牌,就可以進行訪問。並且,令牌會按照速率不斷恢復放到令牌桶中直到桶滿。 > * 漏桶演算法,桶里裝的是請求。當桶滿了,請求就進不來。例如說,Hystrix 使用執行緒池或者 Semaphore 訊號量,只有在請求未滿的時候,才可以進行執行。

上面嗶嗶了非常多的字,只看本文的話,就那一句話:「Redisson 提供的是基於滑動窗口 RateLimiter 的實現。」。

6.3.1 具體示例

示例程式碼對應測試類:PubSubTest 。

創建 RateLimiterTest 測試類,編寫程式碼如下:

@RunWith(SpringRunner.class)  @SpringBootTest  public class RateLimiterTest {        @Autowired      private RedissonClient redissonClient;        @Test      public void test() throws InterruptedException {          // 創建 RRateLimiter 對象          RRateLimiter rateLimiter = redissonClient.getRateLimiter("myRateLimiter");          // 初始化:最大流速 = 每 1 秒鐘產生 2 個令牌          rateLimiter.trySetRate(RateType.OVERALL, 2, 1, RateIntervalUnit.SECONDS);  //        rateLimiter.trySetRate(RateType.PER_CLIENT, 50, 1, RateIntervalUnit.MINUTES);            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");          for (int i = 0; i < 5; i++) {              System.out.println(String.format("%s:獲得鎖結果(%s)", simpleDateFormat.format(new Date()),                      rateLimiter.tryAcquire()));              Thread.sleep(250L);          }      }    }  

執行 #test() 測試用例,結果如下:

2019-10-02 22:46:40:獲得鎖結果(true)  2019-10-02 22:46:40:獲得鎖結果(true)  2019-10-02 22:46:41:獲得鎖結果(false)  2019-10-02 22:46:41:獲得鎖結果(false)  2019-10-02 22:46:41:獲得鎖結果(true)  
  • 第 1、2 次,成功獲取鎖。
  • 第 3、4 次,因為每 1 秒產生 2 個令牌,所以被限流了。
  • 第 5 次,已經過了 1 秒,所以獲得令牌成功。

6.3.2 閑聊兩句

有一點要糾正一下。Redisson 提供的限流器不是嚴格且完整的滑動窗口的限流器實現。舉個例子,我們創建了一個每分鐘允許 3 次操作的限流器。整個執行過程如下:

00:00:00 獲得鎖,剩餘令牌 2 。  00:00:20 獲得鎖,剩餘令牌 1 。  00:00:40 獲得鎖,剩餘令牌 0 。  
  • 那麼,00:01:00 時,鎖的數量會恢復,按照 Redisson 的限流器來說。
  • 如果是嚴格且完整的滑動窗口的限流器,此時在 00:01:00 剩餘可獲得的令牌數為 1 ,也就是說,起始點應該變成 00:00:20 。

如果基於 Redis 嚴格且完整的滑動窗口的限流器,可以通過基於 Redis Zset 實現。

666. 彩蛋

寫了老長一篇,都不曉得有木有會看。斷斷續續寫了小一周,不曉得有木有胖友會看完,甚至看到彩蛋環節,哈哈哈哈。

在高並發場景下,系統會大量依賴快取和消息隊列,實現所需要的高性能。而快取,絕大部分的選擇,基本都是 Redis ,這點毋庸置疑。所以,我們是非常有必要深入去學習下 Redis ,友情推薦下付磊大佬的 《Redis 開發與運維》 。

因為寫的還是略有些聰明,所以有錯誤或者表達不清晰的地方,歡迎胖友指出。國慶快樂,繼續學習!