java架構之路-(Redis專題)簡單聊聊redis分散式鎖
- 2019 年 10 月 28 日
- 筆記
這次我們來簡單說說分散式鎖,我記得過去我也過一篇JMM的記憶體一致性演算法,就是說拿到鎖的可以繼續操作,沒拿到的自旋等待。
思路與場景
我們在Zookeeper中提到過分散式鎖,這裡我們先用redis實現一個簡單的分散式鎖,這裡是我們一個簡單的售賣減庫存的小實例,剩餘庫存假設存在資料庫內。
@GetMapping(value = "/getLock") public String getLock() { int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); System.out.println("售賣成功,剩餘" + realStock + ""); return "success"; }else{ System.out.println("剩餘庫存不足"); return "fail"; } }
這樣簡單的實現了一個售賣的過程,現在看來確實沒什麼問題的,但是如果是一個並發下的場景就可能會出現超賣的情況了,我們來改造一下程式碼。
@GetMapping(value = "/getLock") public String getLock() { synchronized (this) { int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); System.out.println("售賣成功,剩餘" + realStock + ""); return "success"; } else { System.out.println("剩餘庫存不足"); return "fail"; } } }
貌似這回就可以了,可以抗住高並發了,但是新的問題又來了,我們如果是分散式的場景下,synchronized關鍵字是不起作用的啊。也就是說還是會出現超賣的情況的啊,我們再來改造一下
@GetMapping(value = "/getLock") public String getLock() { String lockKey = "lock"; Boolean bool = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "xiaocai");//相當於我們的setnx命令 if(!bool){ return "error"; } int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); System.out.println("售賣成功,剩餘" + realStock + ""); stringRedisTemplate.delete(lockKey); return "success"; } else { System.out.println("剩餘庫存不足"); stringRedisTemplate.delete(lockKey); return "fail"; } }
這次我們看來基本可以了,使用我們的setnx命令來做一次唯一的限制,萬一報錯了呢?解鎖怎麼辦?再來改造一下。
@GetMapping(value = "/getLock") public String getLock() { String lockKey = "lock"; Boolean bool = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "xiaocai", 10, TimeUnit.SECONDS);//相當於我們的setnx命令 try { if (!bool) { return "error"; } int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); System.out.println("售賣成功,剩餘" + realStock + ""); return "success"; } else { System.out.println("剩餘庫存不足"); return "fail"; } } finally { if (bool) { stringRedisTemplate.delete(lockKey); } } }
這次貌似真的可以了,可以加鎖,最後在finally解鎖,如果解鎖還是不成功,我們還設置了我們的超時時間,貌似完美了,我們再來提出一個場景。
就是什麼意思呢?我們的執行緒來爭搶鎖,拿到鎖的執行緒開始執行,但是我們並不知道何時執行完成,我們只是設定了10秒自動釋放掉鎖,如果說我們的執行緒10秒還沒有結束,其它執行緒會拿到鎖資源,開始執行程式碼,但是過了一段時間(藍色執行緒還未執行完成),這時我們的綠色執行緒執行完畢了,開始釋放鎖資源,他釋放的其實已經不是他自己的鎖了,他自己的鎖超時了,自動釋放了,實則綠色執行緒釋放的藍色的資源,這也就造成了釋放其它的鎖,其它的執行緒又會重複的拿到鎖,重複執行該操作。明顯有點亂了,這不合理,我們來改善一下。
@GetMapping(value = "/getLock") public String getLock() { String lockKey = "lock"; String lockValue = UUID.randomUUID().toString(); Boolean bool = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 10, TimeUnit.SECONDS);//相當於我們的setnx命令 try { if (!bool) { return "error"; } int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); System.out.println("售賣成功,剩餘" + realStock + ""); return "success"; } else { System.out.println("剩餘庫存不足"); return "fail"; } } finally { if (lockValue.equals(stringRedisTemplate.opsForValue().get(lockKey))) { stringRedisTemplate.delete(lockKey); } } }
這次再來看一下流程,我們設置一個UUID,設置為鎖的值,也就是說,每次上鎖的UUID都是不一致的,我們的執行緒A的鎖這次只能由我們的執行緒A來釋放掉,不會造成釋放其它鎖的問題了,還是上次的圖,我們回過頭來看一下,10秒?真的合理嗎?萬一10秒還沒有執行完成呢?有的人還會問,那設置100秒?萬一執行到delete操作的時候,服務宕機了呢?是不是還要等待100秒才可以釋放鎖。別說那只是萬一,我們的程式碼希望達到我們能力範圍之內的最嚴謹。這次來說一下我們本節的其中一個重點,Lua腳本,後面會去說,我們來先用我們這次博文的Redisson吧
Redisson
剛才我們提到了我們鎖的時間設置,多長才是合理的,100秒?可能宕機,造成等待100秒自動釋放,1秒?執行緒可能執行不完,我們可不可以這樣來做呢?我們設置一個30秒,或者說設置10秒,然後我們給予一個固定時間來檢查我們的主執行緒是否執行完成,執行完成再釋放我們的鎖,思路有了,但是程式碼實現起來並不簡單,別著急,我們已經有了現成的包供我們使用的,就是我們的Redisson,首先我們來引入我們的依賴,修改一下pom文件。
<!-- https://mvnrepository.com/artifact/org.redisson/redisson --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.11.4</version> </dependency>
然後通過@Bean的方式注入容器,三種方式我都寫在上面了。
@Bean public Redisson redisson(){ Config config = new Config(); //主從(單機) config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0); //哨兵 // config.useSentinelServers().setMasterName("mymaster"); // config.useSentinelServers().addSentinelAddress("redis://192.168.1.1:26379"); // config.useSentinelServers().addSentinelAddress("redis://192.168.1.2:26379"); // config.useSentinelServers().addSentinelAddress("redis://192.168.1..3:26379"); // config.useSentinelServers().setDatabase(0); // //集群 // config.useClusterServers() // .addNodeAddress("redis://192.168.0.1:8001") // .addNodeAddress("redis://192.168.0.2:8002") // .addNodeAddress("redis://192.168.0.3:8003") // .addNodeAddress("redis://192.168.0.4:8004") // .addNodeAddress("redis://192.168.0.5:8005") // .addNodeAddress("redis://192.168.0.6:8006"); // config.useSentinelServers().setPassword("xiaocai");//密碼設置 return (Redisson) Redisson.create(config); }
如果我們的是springboot也可以通過配置來實現的。
application.properties
## 因為springboot-data-redis 是用到了jedis,所已這裡得配置 spring.redis.database=10 spring.redis.pool.max-idle=8 spring.redis.pool.min-idle=0 spring.redis.pool.max-active=8 spring.redis.pool.max-wait=-1 ## jedis 哨兵配置 spring.redis.sentinel.master=mymaster spring.redis.sentinel.nodes=192.168.1.241:26379,192.168.1.241:36379,192.168.1.241:46379 spring.redis.password=admin ## 關鍵地方 redisson spring.redis.redisson.config=classpath:redisson.json
redisson.json
## redisson.json 文件 { "sentinelServersConfig":{ "sentinelAddresses": ["redis://192.168.1.241:26379","redis://192.168.1.241:36379","redis://192.168.1.241:46379"], "masterName": "mymaster", "database": 0, "password":"admin" } }
這樣我們就建立了我們的Redisson的連接了,我們來看一下如何使用吧。
package com.redisclient.cluster; import org.redisson.Redisson; import org.redisson.api.RLock; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class RedisCluster { @Autowired private StringRedisTemplate stringRedisTemplate; @Autowired private Redisson redisson; @GetMapping(value = "/getLock") public String getLock() { String lockKey = "lock"; RLock redissonLock = redisson.getLock(lockKey); try { redissonLock.lock(); int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); System.out.println("售賣成功,剩餘" + realStock + ""); return "success"; } else { System.out.println("剩餘庫存不足"); return "fail"; } } finally { redissonLock.unlock(); } } }
使用也是超級簡單的,Redisson還有重入鎖功能等等,有興趣的可以去Redisson查看,地址:https://redisson.org/ 國外的地址打開可能會慢一些。Redis的分散式鎖使用就差不多說到這裡了,我們來回到我們剛才說到的Lua腳本這裡。
Lua腳本和管道
Lua腳本
lua腳本就是一個事務控制的過程,我們可以在lua腳本中寫一些列的命令,一次性的塞入到我們的redis客戶端,保證了原子性,要麼都成功,要麼都失敗。好處在於減少與reidis的多次連接,可以替代redis的事務操作以及保證我們的原子性。
String luaString = "";//Lua腳本 jedis.eval(luaString, Arrays.asList("keysList"),Arrays.asList("valueList"));
腳本我就不寫了(我也不熟悉),我來解釋一下eval的三個參數,第一個是我們的寫好的腳本,然後我們的腳本可能傳參數的,也就是我們KEYS[1]或者是ARGV[4],意思就是我們的KEYS[1]就是我們的ArrayList(“keysList”)中的第一項,ARGV[4]就是我們的ArrayList(“valueList”)的第四項。
管道
管道和我們的和我們的Lua腳本差不多,不一樣就是管道不會保證我們的事務,也就是說我們現在塞給管道10條命令 ,我們執行到第三條時報錯了,後面的依然會執行,前面執行過的兩條還是生效的。雖然可以減少我們的網路開銷,也別一次塞太多命令進去,畢竟redis的是單執行緒的,不建議使用管道來操作redis,想深入了解的可以參照https://www.runoob.com/redis/redis-pipelining.html
redis的分散式鎖差不多就說這麼多了,關鍵是實現思路,使用Redisson倒是很簡單的,還有我們的Lua腳本和管道,Lua腳本可以保證事務,管道一次性可以執行多條命令,減少網路開銷,但不建議使用,下次我們來說下,大廠用redis的一些使用注意事項和優化吧。
最進弄了一個公眾號,小菜技術,歡迎大家的加入