Redis的批量操作是什麼?怎麼實現的延時隊列?以及訂閱模式、LRU。
前言
這次的內容是我自己為了總結Redis知識而擴充的,上一篇其實已經總結了幾點知識了,但是Redis的強大,以及適用範圍之廣可不是單單一篇博文就能總結清的。所以這次準備繼續總結,因為第一個問題,Redis的批量操作,是我在面試過程中被真實問到的,當時沒答上來,也是因為確實沒了解過Redis的批量操作。
當時的問題,我還記得比較清晰:Redis執行批量操作的功能是什麼?使用場景就是搞促銷活動時,會做預緩存,會往緩存里放大批數據,如果直接放的話那麼會很慢,怎麼能提高效率呢?
Redis的批量操作-管道(pipeline)
首先Redis的管道(pipeline)並不是Redis服務端提供的功能,而是Redis客戶端為了減少網絡交互而提供的一種功能。
正常的一次Redis網絡交互如下:
pipeline主要就是將多個請求合併,進行一次提交給Redis服務器,Redis服務器將所有請求處理完成之後,再一次性返回給客戶端。
下面我們分析一下pipeline的原理
pipeline的一個交互過程是這樣的:
- 客戶端進程調用
write
命令將消息寫入到操作系統內核為套接字分配的發送緩衝區send buffer。 - 客戶端操作系統通過網絡路由,將send buffer中的數據發送給服務器操作系統為套接字分配的接收緩衝區 receive buffer。
- 服務端進程調用
read
命令從receive buffer中取出數據進行處理,然後調用write
命令將相應信息寫入到服務端的send buffer中。 - 服務端操作系統通過網絡路由,將send buffer中的數據發送給客戶端操作系統的receive buffer。
- 客戶端進程調用read命令將數據從receive buffer中取出進行業務處理。
在使用pipeline時需要注意:
- pipeline執行的操作,和mget,mset,hmget這樣的操作不同,pipeline的操作是不具備原子性的。
- 還有在集群模式下因為數據是被分散在不同的slot裏面的,因此在進行批量操作的時候,不能保證操作的數據都在同一台服務器的slot上,所以集群模式下是禁止執行像mget、mset、pipeline等批量操作的,如果非要使用批量操作,需要自己維護key與slot的關係。
- pipeline也不能保證批量操作中有命令執行失敗了而中斷,也不能讓下一個指令依賴上一個指令,如果非要這樣的複雜邏輯,建議使用lua腳本來完成操作。
Redis實現消息隊列和延時隊列
消息隊列
Redis的實現消息隊列可以用list來實現,通過lpush與rpop或者rpush與lpop結合來實現消息隊列。
但是若是list為空後,無論是lpop還是rpop都會持續的獲取list中的數據,若list一直為空,持續的拉取數據,一是會增加客戶端的cpu利用率,二是也增高了Redis的QPS,解決方案是使用blpop或brpop來代替lpop或rpop。
其實blpop和brpop的作用是bloking pop,就是阻塞拉取數據,當消息隊列中為空時就會停止拉取,有數據後立即恢復拉取。
但是當沒有數據的時候,阻塞拉取,就會一直阻塞在那裡,時間久了就成了空閑連接,那麼Redis服務器一般會將時間閑置過久的連接直接斷掉,以減少連接資源。所以還要檢測阻塞拉取拋出的異常然後進行重試。
另外一點,就是Redis實現的消息隊列,沒有ACK機制,所以想要實現消息的可靠性,還要自己實現當消息處理失敗後,能繼續拋回隊列。
延時隊列
用Redis實現延時隊列,其實就是使用zset來實現,將消息序列化成一個字符串(可以是json格式),作為為value
,消息的到期處理時間做為score
,然後用多線程去輪詢zset來獲取到期消息進行處理。
多線程輪詢處理,保證了可用性,但是要做冪等或鎖處理,保證不要重複處理消息。
主要的實現代碼如下。
/**
* 放入延時隊列
* @param queueMsg
*/
private void delay(QueueMsg queueMsg){
String msg = JSON.toJSONString(queueMsg);
jedis.zadd(queueKey,System.currentTimeMillis()+5000,msg);
}
/**
* 處理隊列中從消息
*/
private void lpop(){
while (!Thread.interrupted()){
// 從隊列中取出,權重為0到當前時間的數據,並且數量只取一個
Set<String> strings = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
// 如果消息為空,就歇會兒再取。
if(strings.isEmpty()){
try {
//休息一會兒
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
continue;
}
String next = strings.iterator().next();
// 如果搶到了消息
if(jedis.zrem(queueKey,next)>0){
// 反序列化後獲取到消息
QueueMsg queueMsg = JSON.parseObject(next, QueueMsg.class);
// 進行消息處理
handleMsg(queueMsg);
}
}
}
訂閱模式
Redis的主題訂閱模式,其實並不想過多總結,因為由於它本身的一些缺點,導致它的應用場景比較窄。
前面總結的用Redis的list實現的消息隊列,雖然可以使用,但是並不支持消息多播的場景,即一個生產者,將消息放入到多個隊列中,然後多個消費者進行消費。
這種消息多播的場景常用來做分佈式系統中的解耦。用哦publish
進行生產者發送消息,消費者使用subscribe
進行獲取消息。
例如:我向jimoerChannel發送了一條消息 b-tree
127.0.0.1:6379> publish jimoerChannel b-tree
(integer) 1
訂閱這個渠道的消費者立馬收到了一條b-tree的消息。
127.0.0.1:6379> subscribe jimoerChannel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "jimoerChannel"
3) (integer) 1
1) "message"
2) "jimoerChannel"
3) "b-tree"
我前面也說到了,Redis的pub/sub訂閱模式,其實最大的缺點就是,消息不能持久化,這樣就導致,若是消費者掛了或是沒有消費者,那麼消息就會被直接丟棄。因為這個原因,所以導致他的使用場景比較少。
IO模型
Redis的過期策略
Redis的過期策略是適用於所有數據結構的。數據一到過期時間就自動刪除,Redis會將設置了過期時間的key 放置在一個字典表裡。
定期刪除
Redis會定期遍歷字典表裏面數據來刪除過期的Key。
Redis默認的定期刪除策略是每秒進行10次過期掃描,即每100ms掃描一次。並不是掃描全部設置了過期時間的key,而是隨機掃描20個key,刪除掉已經過期的key,如果過期的比率超過25%,那麼就繼續進行掃描。
惰性刪除
因為定期刪除是隨機抽取一些key來進行過期刪除,所以如果key並沒有被定期掃描到,那麼過期的key就不會被刪除。所以Redis還提供了惰性刪除的策略,就是當去查詢某些key的時候,若是key已經過期了,那麼就會刪除key,然後返回null。
另外一點當在集群條件下,主從同步情況中,主節點中的key過期後,會在aof中生成一條刪除指令,然後同步到從節點,這樣的從節點在接收到aof的刪除指令後,刪除掉從節點的key,因為主從同步的時候是異步的所以,短暫的會出現主節點已經沒有數據了,但是從節點還存在。
但是若是定期刪除也沒有掃描到key,而且好長時間也沒去去使用key,那麼這部分過期的key就會一直佔用的內存。
所以Redis又提供了內存淘汰機制。
內存淘汰機制
當Redis的內存出現不足時,就會持續的和磁盤進行交互,這樣就會導致Redis卡頓,效率降低等情況。這在線上是不允許發生的,所以Redis提供了配置參數 maxmemory
來限制內存超出期望大小。
當內存使用情況超過maxmemory的值時,Redis提供了以下幾種策略,來讓使用者通過配置決定該如何騰出內存空間來繼續提供服務。
noeviction
不會繼續提供寫請求(del請求可以),讀請求可以,寫請求會報錯,這樣保證的數據不會丟失,但是業務不可用,這是默認的策略。volatile-lru
會將設置了過期時間的key中,淘汰掉最近最少使用的key。沒有設置過期時間的key不會被淘汰,保證了需要持久化的數據不丟。volatile-ttl
嘗試將設置了過期時間的key中,剩餘生命周期越短,越容易被淘汰。volatile-random
嘗試將從設置了過期時間的key中,隨機選擇一些key進行淘汰。allkeys-lru
從所有key中,淘汰掉最近最少使用的key。allkeys-random
從所有key中,隨機淘汰一部分key。
那麼具體設置成哪種淘汰策略呢?
這就是要看在使用Redis時的具體場景了,如果只是用Redis做緩存的話,那麼可以配置allkeys-lru或allkey-random,客戶端在寫緩存的時候並不用攜帶着過期時間。若是還想要用持久化的功能,那麼就應該使用volatile-開頭的策略,這樣可以保證每月設置過期時間的key不會被淘汰。
內存淘汰策略的配置如下:
# 最大使用內存
maxmemory 5m
# 內存淘汰策略 The default is:noeviction
maxmemory-policy allkeys-lru
LRU算法
LRU算法的實現,其實可以靠一個鏈表。鏈表按照使用情況來進行排序,當空間不足時,會剔除掉尾部的數據。當某個元素被訪問時它會被移動到鏈表頭。
在真實的面試中,若是讓寫出LRU算法,我認為可以使用Java中的LikedHashMap來實現,因為LikedHashMap已經實現了基本的LRU功能,我只需要封裝一下就改造成了自己的了。
/**
* @author Jimoer
* @description
*/
public class MyLRUCache<K,V> {
// lru容量
private int lruCapacity;
// 數據容器(內存)
private Map<K,V> dataMap;
public MyLRUCache(int capacity){
this.lruCapacity = capacity;
// 設置LinkedHashMap的初始容量為LRU的最大容量,
// 擴容因子為默認的0.75,第三個參數是否將數據按照訪問順序排序。
dataMap = new LinkedHashMap<K, V>(capacity, 0.75f, true){
@Override
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
// 當數據量大於lruCapacity時,移除掉最老使用的數據。
return super.size()>lruCapacity;
}
};
}
public V get(K k){
return dataMap.get(k);
}
public void put(K key, V value){
dataMap.put(key,value);
}
public int getLruCapacity() {
return lruCapacity;
}
public Map<K, V> getDataMap() {
return dataMap;
}
}
測試代碼:
@Test
public void lruTest(){
// 內存容量為3,即存儲3條數據後,再放入數據,就會將最老使用的數據刪除
MyLRUCache myLRUCache = new MyLRUCache(3);
myLRUCache.put("1k","張三");
myLRUCache.put("2k","李四");
myLRUCache.put("3k","王五");
// 容量已滿
System.out.println("myLRUCache:"+JSON.toJSONString(myLRUCache.getDataMap()));
// 繼續放入數據,該刪除第一條數據為第四條數據騰出空間了
myLRUCache.put("4k","趙六");
// 打印出結果
System.out.println("myLRUCache:"+JSON.toJSONString(myLRUCache.getDataMap()));
}
運行結果:
myLRUCache:{"1k":"張三","2k":"李四","3k":"王五"}
myLRUCache:{"2k":"李四","3k":"王五","4k":"趙六"}
總結
好了,Redis的相關知識,就總結到這裡了,算上前面兩篇博文(Redis基礎數據結構總結、你說一下Redis為什麼快吧,怎麼實現高可用,還有持久化怎麼做的),這是Redis的第三篇了,這一篇博文也是新年的第一篇,元旦假期在家花了兩天時間,自己學習自己總結。元旦假期結束後,我要繼續面試了,後面我會繼續將我面試中遇到的各種問題,總結出來,一是增加自己的知識面,二也將知識進行的傳播。
畢竟獨樂樂不眾樂樂😏。