分佈式鎖的三種實現方式

一、基本概念

1、引入
        傳統的鎖都是有JDK官方提供的鎖的解決方案,也就是說這些鎖只能在一個JVM進程內有效,我們把這種鎖叫做單體應用鎖。但是,在互聯網高速發展的今天,單體應用鎖能夠滿足我們的需求嗎?
新的閱讀體驗://www.zhouhong.icu/post/143

本篇文章所有代碼://github.com/Tom-shushu/Distributed-system-learning-notes/

2、互聯網系統架構的演進
        在互聯網系統發展之初,系統比較簡單,消耗資源小,用戶訪問量也比較少,我們只部署一個Tomcat應用就可以滿足需求。系統架構圖如下:
        一個Tomcat可以看作是一個JVM進程,當大量的請求並發到達系統時,所有的請求都落在這唯一的一個Tomcat上,如果某些請求方法是需要加鎖的,比如:秒殺扣減庫存,是可以滿足需求的,這和我們前面章節所講的內容是一樣的。但是隨着訪問量的增加,導致一個Tomcat難以支撐,這時我們就要集群部署Tomcat,使用多個Tomcat共同支撐整個系統
        上圖中,我們部署了兩個Tomcat,共同支撐系統。當一個請求到達系統時,首先會經過Nginx,Nginx主要是做負載轉發的,它會根據自己配置的負載均衡策略將請求轉發到其中的一個Tomcat中。當大量的請求並發訪問時,兩個Tomcat共同承擔所有的訪問量,這時,我們同樣在秒殺扣減庫存的場景中,使用單體應用鎖,還能夠滿足要求嗎?
3、單體應用鎖的局限性
        如上圖所示,在整個系統架構中,存在兩個Tomcat,每個Tomcat是一個JVM。在進行秒殺業務的時候,由於大家都在搶購秒殺商品,大量的請求同時到達系統,通過Nginx分發到兩個Tomcat上。我們通過一個極端的案例場景,可以更好地理解單體應用鎖的局限性。假如,秒殺商品的數量只有1個,這時,這些大量的請求當中,只有一個請求可以成功的搶到這個商品,這就需要在扣減庫存的方法上加鎖,扣減庫存的動作只能一個一個去執行,而不能同時去執行,如果同時執行,這1個商品可能同時被多個人搶到,從而產生超賣現象。加鎖之後,扣減庫存的動作一個一個去執行,凡是將庫存扣減為負數的,都拋出異常,提示該用戶沒有搶到商品。通過加鎖看似解決了秒殺的問題,但是事實上真的是這樣嗎?
        我們看到系統中存在兩個Tomcat,我們加的鎖是JDK提供的鎖,這種鎖只能在一個JVM下起作用,也就是             在一個Tomcat內是沒有問題的。當存在兩個或兩個以上的Tomcat時,大量的並發請求分散到不同的Tomcat上,在每一個Tomcat中都可以防止並發的產生,但是在多個Tomcat之間,每個Tomcat中獲得鎖的這個請求,又產生了並發,從而產生超賣現象。這也就是單體應用鎖的局限性,它只能在一個JVM內加鎖,而不能從這個應用層面去加鎖。
那麼這個問題如何解決呢?這就需要使用分佈式鎖了,在整個應用層面去加鎖。什麼是分佈式鎖呢?我們怎麼去使用分佈式鎖呢?
4、什麼是分佈式鎖
        在說分佈式鎖之前,我們看一看單體應用鎖的特點,單體應用鎖是在一個JVM進程內有效,無法跨JVM、跨進程。那麼分佈式鎖的定義就出來了,分佈式鎖就是可以跨越多個JVM、跨越多個進程的鎖,這種鎖就叫做分佈式鎖。
5、分佈式鎖的設計思路
        在上圖中,由於Tomcat是由Java啟動的,所以每個Tomcat可以看成一個JVM,JVM內部的鎖是無法跨越多個進程的。所以,我們要實現分佈式鎖,我們只能在這些JVM之外去尋找,通過其他的組件來實現分佈式鎖。系統的架構如圖所示:

        兩個Tomcat通過第三方的組件實現跨JVM、跨進程的分佈式鎖。這就是分佈式鎖的解決思路,找到所有JVM可以共同訪問的第三方組件,通過第三方組件實現分佈式鎖。
6、目前存在的分佈式的方案
分佈式鎖都是通過第三方組件來實現的,目前比較流行的分佈式鎖的解決方案有:
  • 數據庫,通過數據庫可以實現分佈式鎖,但是在高並發的情況下對數據庫壓力較大,所以很少使用。
  • Redis,藉助Redis也可以實現分佈式鎖,而且Redis的Java客戶端種類很多,使用的方法也不盡相同。
  • Zookeeper,Zookeeper也可以實現分佈式鎖,同樣Zookeeper也存在多個Java客戶端,使用方法也不相同。

二、電商平台中針對超賣的解決思路

① 單體架構下針對超賣的解決方案

1、超賣現象一
什麼是超賣:某件商品庫存數量10件,結果賣出了15件。

 

 

A和B同時下單,同時讀到庫存,同時減庫存,同時更新數據庫,這是庫存減1,結果卻下單了兩件。
解決辦法:
扣減庫存不在程序中進行,同時通過數據庫解決;向數據庫傳遞庫存增量,扣減1個庫存,增量為-1;在數據庫update語句計算庫存,通過update行鎖解決並發問題
2、超賣現象二
使用上述通過數據庫行鎖解決,會出現下面的第二種現象:數據庫的庫存會減為負數。

 

解決方案一
更新庫存成功後,再次檢索商品庫存,如果商品為負數,拋出異常。
解決方案二
添加鎖、將數據庫庫存校驗和數據庫庫存更新綁定到一起加上鎖,每次只能有一個得到這個鎖,從而避免庫存被減為負數(-1)的情況。

3、具體代碼實現

  • 創建數據庫表
CREATE DATABASE /*!32312 IF NOT EXISTS*/`distribute` /*!40100 DEFAULT CHARACTER SET utf8mb4 */;
USE `distribute`;
DROP TABLE IF EXISTS `order`;
CREATE TABLE `order` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `order_status` int(1) NOT NULL,
  `receiver_name` varchar(255) NOT NULL,
  `receiver_mobile` varchar(11) NOT NULL,
  `order_amount` decimal(11,0) NOT NULL,
  `create_time` time NOT NULL,
  `create_user` varchar(255) NOT NULL,
  `update_time` time NOT NULL,
  `update_user` varchar(255) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=16 DEFAULT CHARSET=utf8mb4;
DROP TABLE IF EXISTS `order_item`;
CREATE TABLE `order_item` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `order_id` int(11) NOT NULL,
  `product_id` int(11) NOT NULL,
  `purchase_price` decimal(11,0) NOT NULL,
  `purchase_num` int(3) NOT NULL,
  `create_time` time NOT NULL,
  `create_user` varchar(255) NOT NULL,
  `update_time` time NOT NULL,
  `update_user` varchar(255) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=12 DEFAULT CHARSET=utf8mb4;
DROP TABLE IF EXISTS `product`;
CREATE TABLE `product` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `product_name` varchar(255) NOT NULL COMMENT '商品名稱',
  `price` decimal(11,0) NOT NULL COMMENT '價格',
  `count` int(5) NOT NULL COMMENT '庫存',
  `product_desc` varchar(255) NOT NULL COMMENT '描述',
  `create_time` time NOT NULL COMMENT '創建時間',
  `create_user` varchar(255) NOT NULL COMMENT '創建人',
  `update_time` time NOT NULL COMMENT '更新時間',
  `update_user` varchar(255) NOT NULL COMMENT '更新人',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=100101 DEFAULT CHARSET=utf8mb4;
insert  into `product`(`id`,`product_name`,`price`,`count`,`product_desc`,`create_time`,`create_user`,`update_time`,`update_user`) values (100100,'測試商品','1',1,'測試商品','18:06:00','周紅','19:19:21','xxx');
/**後續分佈式鎖需要用到**/
DROP TABLE IF EXISTS `distribute_lock`;
CREATE TABLE `distribute_lock` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `business_code` varchar(255) NOT NULL COMMENT '根據業務代碼區分,不同業務使用不同鎖',
  `business_name` varchar(255) NOT NULL COMMENT '注釋,標記編碼用途',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;
insert  into `distribute_lock`(`id`,`business_code`,`business_name`) values (1,'demo','test');
  • 訂單創建,庫存減1等主要邏輯代碼(這裡使用 ReentrantLock 當然,也可以使用其他鎖 )
// 注意:這邊不能使用註解的方式回滾,不然會在事務提交前下一個線程會進來
//    @Transactional(rollbackFor = Exception.class)
    public Integer createOrder() throws Exception{
        Product product = null;
        lock.lock();
        try {
            // 開啟事務
            TransactionStatus transaction1 = platformTransactionManager.getTransaction(transactionDefinition);
            // 查詢到所要購買的商品
            product = productMapper.selectByPrimaryKey(purchaseProductId);
            if (product==null){
                platformTransactionManager.rollback(transaction1);
                throw new Exception("購買商品:"+purchaseProductId+"不存在");
            }
            // 獲取商品當前庫存
            Integer currentCount = product.getCount();
            System.out.println(Thread.currentThread().getName()+"庫存數:"+currentCount);
            // 校驗庫存 (購買商品數量大於庫存數量,拋出異常)
            if (purchaseProductNum > currentCount){
                platformTransactionManager.rollback(transaction1);
                throw new Exception("商品"+purchaseProductId+"僅剩"+currentCount+"件,無法購買");
            }
            productMapper.updateProductCount(purchaseProductNum,"xxx",new Date(),product.getId());
            platformTransactionManager.commit(transaction1);
        }finally {
            lock.unlock();
        }
        // 創建訂單
        TransactionStatus transaction = platformTransactionManager.getTransaction(transactionDefinition);
        Order order = new Order();
        order.setOrderAmount(product.getPrice().multiply(new BigDecimal(purchaseProductNum)));
        order.setOrderStatus(1);//待處理
        order.setReceiverName("xxx");
        order.setReceiverMobile("15287653421");
        order.setCreateTime(new Date());
        order.setCreateUser("不不不不");
        order.setUpdateTime(new Date());
        order.setUpdateUser("哈哈哈哈");
        orderMapper.insertSelective(order);
        // 創建訂單明細
        OrderItem orderItem = new OrderItem();
        orderItem.setOrderId(order.getId());
        orderItem.setProductId(product.getId());
        orderItem.setPurchasePrice(product.getPrice());
        orderItem.setPurchaseNum(purchaseProductNum);
        orderItem.setCreateUser("不不不");
        orderItem.setCreateTime(new Date());
        orderItem.setUpdateTime(new Date());
        orderItem.setUpdateUser("哈哈哈哈");
        orderItemMapper.insertSelective(orderItem);
        // 事務提交
        platformTransactionManager.commit(transaction);
        return order.getId();
    }
  • 測試(使用五個線程同時並發的下單)
@Test
public void concurrentOrder() throws InterruptedException {
    Thread.sleep(60000);
    CountDownLatch cdl = new CountDownLatch(5);
    CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
    // 創建5個線程執行下訂單操作
    ExecutorService es = Executors.newFixedThreadPool(5);
    for (int i =0;i<5;i++){
        es.execute(()->{
            try {
                // 等5個線程同時達到 await()時再執行創建訂單服務,這時候5個線程會堆積到同一時間執行
                cyclicBarrier.await();
                Integer orderId = orderService.createOrder();
                System.out.println("訂單id:"+orderId);
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                // 每個線程執行完成之後會減一
                cdl.countDown();
            }
        });
    }
    cdl.await();
    es.shutdown();
}

② 分佈式架構下分佈式鎖的實現

一、基於數據庫實現分佈式鎖

多個進程、多個線程訪問共同組件—數據庫
通過select…for update 訪問同一條數據、for update 鎖定數據
  • 在mapper.xml 裏面加入如下自定義的SQL
<select id="selectDistributeLock" resultType="com.example.distributelock.model.DistributeLock">
  select * from distribute_lock
  where business_code = #{businessCode,jdbcType=VARCHAR}
  for update
</select>
  • 主要的邏輯實現
@RequestMapping("singleLock")
/**
 * 沒有添加 Transactional 註解前,查詢分佈式鎖和sleep二者不是原子操作,在獲取到分佈式鎖後自動提交事務,
 * 故不會阻止第二個請求獲取鎖。添加了註解後,在sleep結束前,事務一直未提交,故會等待sleep結束後再行提交事務,
 * 此時第二個請求才能從數據庫中獲取分佈式鎖
 */
@Transactional(rollbackFor = Exception.class)
public String singleLock() throws Exception {
    log.info("我進入了方法!");
    DistributeLock distributeLock = distributeLockMapper.selectDistributeLock("demo");
    if (distributeLock==null) throw new Exception("分佈式鎖找不到");
    log.info("我進入了鎖!");
    try {
        Thread.sleep(20000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "我已經執行完成!";
}
  • 另一個項目和這個相同,只需要更改端口號即可
優點:
  • 簡單方便,易於理解,易於操作
缺點:
  • 並發量大,對數據庫壓力較大
建議:
  • 作為鎖的數據庫與業務數據庫分開

二、基於Redis的SetNX實現分佈式鎖

① 獲取鎖的Redis命令
  • Set resource_name my_random_value NX PX 30000
  • resource_name:資源名稱,可根據不同的業務區分不同的鎖
  • my_random_value:隨機值,每個線程的隨機值都不相同,用於釋放鎖時的校驗
  • NX: key不存在是設置成功,key存在則設置不成功
  • PX:自動失效時間,出現異常情況,鎖可以過期失效
② 實現原理
  • 利用NX的原子性,多個線程並發時,只有一個線程可以設置成功
  • 設置成功即獲得鎖,可以執行後續的業務處理
  • 如果出現異常,過了鎖的有效期,鎖自動釋放。
  • 釋放鎖採用了Redis的delete命令
  • 釋放鎖時校驗值錢設置的隨機數,相同才能釋放
  • 釋放鎖的LUA腳本
if redis.call("get",KEYS[1])==argv[1] then 
    return redis.call("del",KEYS[1])
else
    return 0
end
③ 為什麼要添加LUA腳本校驗:
沒有校驗可能導致鎖的混亂,如上圖所示:A可能釋放掉了B的鎖,會出現問題。
④ Redis分佈式鎖關鍵代碼封裝
package com.example.distributelock.lock;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.core.types.Expiration;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
@Slf4j
public class RedisLock implements AutoCloseable {
    private RedisTemplate redisTemplate;
    private String key;
    private String value;
    // 過期時間 單位:秒
    private int expireTime;
    public RedisLock(RedisTemplate redisTemplate,String key,int expireTime){
        this.redisTemplate = redisTemplate;
        this.key = key;
        this.expireTime=expireTime;
        this.value = UUID.randomUUID().toString();
    }
    /**
     * 獲取分佈式鎖
     * @return
     */
    public boolean getLock(){
        RedisCallback<Boolean> redisCallback = connection -> {
            //設置NX
            RedisStringCommands.SetOption setOption = RedisStringCommands.SetOption.ifAbsent();
            //設置過期時間
            Expiration expiration = Expiration.seconds(expireTime);
            //序列化key
            byte[] redisKey = redisTemplate.getKeySerializer().serialize(key);
            //序列化value
            byte[] redisValue = redisTemplate.getValueSerializer().serialize(value);
            //執行setnx操作
            Boolean result = connection.set(redisKey, redisValue, expiration, setOption);
            return result;
        };
        //獲取分佈式鎖
        Boolean lock = (Boolean)redisTemplate.execute(redisCallback);
        return lock;
    }
    // 釋放鎖
    public boolean unLock() {
        String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" +
                "    return redis.call(\"del\",KEYS[1])\n" +
                "else\n" +
                "    return 0\n" +
                "end";
        RedisScript<Boolean> redisScript = RedisScript.of(script,Boolean.class);
        List<String> keys = Arrays.asList(key);

        Boolean result = (Boolean)redisTemplate.execute(redisScript, keys, value);
        log.info("釋放鎖的結果:"+result);
        return result;
    }
    @Override
    public void close() throws Exception {
        unLock();
    }
}
⑤ 測試
@RequestMapping("redisLock")
public String redisLock(){
    log.info("我進入了方法!");
    try (RedisLock redisLock = new RedisLock(redisTemplate,"redisKey",30)){
        if (redisLock.getLock()) {
            log.info("我進入了鎖!!");
            Thread.sleep(15000);
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }
    log.info("方法執行完成");
    return "方法執行完成";
}
⑥ 在項目中使用ESJOB定時任務沒問題,但是如果項目中使用了SpringTask來定時,那麼在集群中可能會出現任務重複執行的情況。
解決辦法:使用分佈式鎖在定時到達後,在執行任務之前,哪個節點獲取到了鎖,哪個節點就來執行這個任務。
⑦ 代碼實現
public class SchedulerService {
    @Autowired
    private RedisTemplate redisTemplate;
    @Scheduled(cron = "0/5 * * * * ?")
    public void sendSms(){
        try(RedisLock redisLock = new RedisLock(redisTemplate,"autoSms",30)) {
            if (redisLock.getLock()){
                log.info("每五秒執行這個程序!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

三、基於Zookeeper實現分佈式鎖

zookeeper的觀察器
  1. 可設置觀察器的3個方法:getData();getChildren();exists();
  2. 節點數據發生變化,發送給客戶端;
  3. 觀察器只能監控一次,再監控需重新設置;
實現原理

  1. 利用zookeeper的瞬時有序節點的特性;
  2. 多線程並發創建瞬時節點時,得到有序的序列;
  3. 序列號最小的線程獲得鎖;
  4. 其他線程監聽自己序號的前一個序號;
  5. 前一個線程執行完成,刪除自己序號節點;
  6. 下一個序號的線程得到通知,繼續執行;
  7. 以此類推,創建節點時,已經確定了線程的執行順序;
代碼實現:
package com.example.distributelock.lock;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
@Slf4j
public class ZkLock implements Watcher,AutoCloseable {
    private ZooKeeper zooKeeper;
    private String businessName;
    private String znode;
    public ZkLock(String connectString,String businessName) throws IOException {
        this.zooKeeper = new ZooKeeper(connectString,30000,this);
        this.businessName = businessName;
    }
    public boolean getLock() throws KeeperException, InterruptedException {
        Stat existsNode = zooKeeper.exists("/" + businessName, false);
        if (existsNode == null){
            zooKeeper.create("/" + businessName,businessName.getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT);
        }
        znode = zooKeeper.create("/" + businessName + "/" + businessName + "_", businessName.getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);
        znode = znode.substring(znode.lastIndexOf("/")+1);
        List<String> childrenNodes = zooKeeper.getChildren("/" + businessName, false);
        Collections.sort(childrenNodes);
        String firstNode = childrenNodes.get(0);
        if (!firstNode.equals(znode)){
            String lastNode = firstNode;
            for (String node:childrenNodes){
                if (!znode.equals(node)){
                    lastNode = node;
                }else {
                    zooKeeper.exists("/"+businessName+"/"+lastNode,true);
                    break;
                }
            }
            synchronized (this){
                wait();
            }
        }
        return true;
    }
    @Override
    public void process(WatchedEvent watchedEvent) {
        if (watchedEvent.getType() == Event.EventType.NodeDeleted){
            synchronized (this){
                notify();
            }
        }
    }
    @Override
    public void close() throws Exception {
        zooKeeper.delete("/"+businessName+"/"+znode,-1);
        zooKeeper.close();
        log.info("我釋放了鎖");
    }
}
測試:
@RequestMapping("zkLock")
    public String zkLock(){
        log.info("我進入了方法!");
        try (ZkLock zkLock = new ZkLock("localhost:2181","order")){
            if (zkLock.getLock()) {
                log.info("我進入了鎖!!");
                Thread.sleep(15000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
        log.info("方法執行完成");
        return "方法執行完成";
    }