Redis學習三(進階功能).

  • 2019 年 10 月 21 日
  • 筆記

一、排序

redis 支援對 list,set 和 zset 元素的排序,排序的時間複雜度是 O(N+M*log(M))。(N 是集合大小,M 為返回元素的數量)

sort key [BY pattern] [LIMIT offset count] [GET pattern [GET pattern ...]] [ASC|DESC] [ALPHA] [STORE destination]
  • [BY pattern]:sort 命令默認使用集合元素進行排序,可以通過 「BY pattern」 使用外部 key 的數據作為權重排序。
  • [LIMIT offset count]:排序之後返回元素的數量可以通過 LIMIT 修飾符進行限制,修飾符接受 offset (要跳過的元素數量,即起始位置)和 count (返回的元素數量)兩個參數。
  • [GET pattern [GET pattern …]]:get 可以根據排序的結果來取出相應的鍵值,「get #」 表示返回自身元素,「get pattern」 可以返回外部 key 的數據 。
  • [ASC|DESC] [ALPHA]:選擇按照順序、逆序或者字元串排序,set 集合(本身沒有索引值)排序操作必須指定 ALPHA。
  • [STORE destination]:默認情況下, sort 操作只是簡單地返回排序結果,並不進行任何保存操作。通過給 store 選項指定一個 key 參數,可以將排序結果保存到給定的鍵上。

假設現在有用戶數據如下:

127.0.0.1:6379> sadd uid 1 2 3 4  127.0.0.1:6379> mset user_name_1 admin user_level_1 9999  127.0.0.1:6379> mset user_name_2 jack user_level_2 10  127.0.0.1:6379> mset user_name_3 peter user_level_3 25  127.0.0.1:6379> mset user_name_4 mary user_level_4 70

首先,直接利用集合內的元素做排序操作:

127.0.0.1:6379> sort uid  1) "1"  2) "2"  3) "3"  4) "4"

接著,我們來試試 [BY pattern] 和 [GET pattern [GET pattern …]] 操作:

127.0.0.1:6379> sort uid by user_name_* get # get user_name_* get user_level_* alpha   1) "1"   2) "admin"   3) "9999"   4) "2"   5) "jack"   6) "10"   7) "4"   8) "mary"   9) "70"  10) "3"  11) "peter"  12) "25"

這個語句有點晦澀,試著這麼理解 「by user_name_* 」, user_name_* 是一個佔用符,它先取出 uid 中的值,然後再用這個值拼接成外部鍵,而真正進行排序的正是這些外部鍵值;「get # get user_name_* get user_level_* 」 的含義也可以這麼理解,get # 表示返回自己元素,[get pattern] 表示返回外部鍵值。

二、事務

redis 的事務機制主要是由下面的幾個指令來完成:

  • multi:標記一個事務塊的開始
  • exec:執行所有事務塊中的命令
  • discard:取消事務,放棄執行事務塊中的所有指令
  • watch key [key…]:監視一個或多個 key,如果在事務執行之前這個(或這些key)被其他命令所改動,這個改動也被稱為 CAS 錯誤,那麼事務將被打斷
  • unwatch:取消 watch 命令對所有 key 的監視

當 redis 接受到 multi 指令時,這個連接會進入一個事務上下文,該連接後續的命令並不是立即執行,而是先放到一個隊列中;當從連接受到 exec 命令後,redis 會順序的執行隊列中的所有命令。並將所有命令的運行結果打包到一起返回給 client。然後此連接就結束事務上下文。

redis 將是否有 watch 命令分為普通類型事務和 CAS(Check And Set)類型事務,無 watch 命令的為普通類型事務,有 watch 命令的為 CAS類型事務。

事務失敗的原因可以分為靜態錯誤(如不存在的命令)和運行時錯誤(如 CAS 錯誤、對 string 用 lpop 操作等)。靜態錯誤會在提交 exec 時返回錯誤資訊,使事務不能執行;而除 CAS 以外的運行時錯誤不會阻止事務繼續執行。因此,Redis 的事務機制並不具有原子性。

127.0.0.1:6379> multi  OK  127.0.0.1:6379> lpush list java  QUEUED  127.0.0.1:6379> scard list  QUEUED  127.0.0.1:6379> exec  1) (integer) 1  2) (error) WRONGTYPE Operation against a key holding the wrong kind of value  127.0.0.1:6379> lrange list 0 -1  1) "java"

可以看到,即使命令錯誤,事務依然沒有被回滾。因此,redis 的事務機制過於原始,不建議使用。

需要注意的是,如果我們使用 AOF 的方式持久化,可能存在事務被部分寫入的情況(事務執行過程中 redis 掛掉等)從而導致 redis 啟動失敗退出,可以使用 redis-check-aof 工具進行修復。

三、流水線(pipeline)

在事務中 redis 提供了隊列,可以批量執行任務,這樣性能就比較高,但使用 multi…exec 事務命令是有系統開銷的,因為它會檢測對應的鎖和序列化命令。有時我們希望在沒有任何附加條件的情況下使用隊列批量執行一系列命令,這時可以使用 redis的流水線(pipeline)技術。

看看如何在 spring-data-redis 中使用 pipeline 功能,需要注意的是 RedisTemplate 的序列化需要使用 StringRedisSerializer,不能使用 JdkSerializationRedisSerializer,否則會導致序列化失敗。

    @Test      public void pipeline() {          // 1.executePipelined 重寫 入參 RedisCallback 的doInRedis方法          List<Object> resultList = redisTemplate.executePipelined(new RedisCallback<String>() {              @Override              public String doInRedis(RedisConnection redisConnection) throws DataAccessException {                  // 2.redisConnection 給本次管道內添加 要一次性執行的多條命令                  // 2.1 一個set操作                  redisConnection.set("hello".getBytes(), "world".getBytes());                  // 2.2一個批量mset操作                  Map<byte[], byte[]> tuple = new HashMap();                  tuple.put("m_hello_1".getBytes(), "m_world_1".getBytes());                  tuple.put("m_hello_2".getBytes(), "m_world_2".getBytes());                  tuple.put("m_hello_3".getBytes(), "m_world_3".getBytes());                  redisConnection.mSet(tuple);                  // 2.3一個get操作                  redisConnection.get("m_hello_1".getBytes());                  // 3 這裡一定要返回null,最終pipeline的執行結果,才會返回給最外層                  return null;              }          });          // 4. 最後對redis pipeline管道操作返回結果進行判斷和業務補償          for (Object str : resultList) {              System.out.println(str);          }      }

四、發布訂閱

繼 RxJava、Reactor、CompletableFuture 以及 Spring 的事件驅動模型後,我們又要接觸一種觀察者模式啦!redis 作為一個pub/sub server,在訂閱者和發布者之間起到了消息路由的功能。訂閱者可以通過 subscribe 和 psubscribe 命令向 redis server 訂閱自己感興趣的channel ;發布者通過 publish 命令向 redis server 的 channel 發送消息,訂閱該 channel 的全部 client 都會收到此消息。一個 client 可以訂閱多個 channel,也可以向多個 channel 發送消息。

訂閱 channel:

127.0.0.1:6379> subscribe channel  Reading messages... (press Ctrl-C to quit)  1) "subscribe"  2) "channel"  3) (integer) 1  1) "message"  2) "channel"  3) "Hello,World"

發布 channel 消息:

127.0.0.1:6379> publish channel Hello,World  (integer) 1

接下來我們來看看在 spring-data-redis 中如何實現發布訂閱功能。首先我們需要一個消息監聽器,只要讓它實現 MessageListener 介面即可:

public class ChannelListener implements MessageListener {        @Override      public void onMessage(Message message, byte[] pattern) {          System.out.println("channel is:" + new String(message.getChannel()));          System.out.println("channel content:" + new String(message.getBody()));      }  }

那麼,下面怎麼做呢?當然是把消息監聽器和 channel 綁定在一起,讓消息監聽器知道處理哪個 channel 的消息:

    /**       * redis 消息監聽器容器, 綁定消息監聽器和 channel       *       * @param connectionFactory       * @return       */      @Bean      public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {          RedisMessageListenerContainer container = new RedisMessageListenerContainer();          container.setConnectionFactory(connectionFactory);          //訂閱了一個叫 channel 的通道          container.addMessageListener(channelAdapter(), new PatternTopic("channel"));          //這個 container 可以添加多個 messageListener          return container;      }        /**       * 消息監聽器適配器       */      @Bean      public MessageListenerAdapter channelAdapter() {          return new MessageListenerAdapter(new ChannelListener());      }

接下來,讓我們試著往這個 channel 發布一個消息吧!

    @Test      public void publish() {          redisTemplate.convertAndSend("channel", "Hello,World");      }