Mysql高手系列 – 第26篇:聊聊如何使用mysql實現分佈式鎖

  • 2019 年 10 月 28 日
  • 筆記

Mysql系列的目標是:通過這個系列從入門到全面掌握一個高級開發所需要的全部技能。

歡迎大家加我微信itsoku一起交流java、算法、數據庫相關技術。

這是Mysql系列第26篇。

本篇我們使用mysql實現一個分佈式鎖。

分佈式鎖的功能

  1. 分佈式鎖使用者位於不同的機器中,鎖獲取成功之後,才可以對共享資源進行操作
  2. 鎖具有重入的功能:即一個使用者可以多次獲取某個鎖
  3. 獲取鎖有超時的功能:即在指定的時間內去嘗試獲取鎖,超過了超時時間,如果還未獲取成功,則返回獲取失敗
  4. 能夠自動容錯,比如:A機器獲取鎖lock1之後,在釋放鎖lock1之前,A機器掛了,導致鎖lock1未釋放,結果會lock1一直被A機器佔有着,遇到這種情況時,分佈式鎖要能夠自動解決,可以這麼做:持有鎖的時候可以加個持有超時時間,超過了這個時間還未釋放的,其他機器將有機會獲取鎖

預備技能:樂觀鎖

通常我們修改表中一條數據過程如下:

t1:select獲取記錄R1  t2:對R1進行編輯  t3:update R1

我們來看一下上面的過程存在的問題:

如果A、B兩個線程同時執行到t1,他們倆看到的R1的數據一樣,然後都對R1進行編輯,然後去執行t3,最終2個線程都會更新成功,後面一個線程會把前面一個線程update的結果給覆蓋掉,這就是並發修改數據存在的問題。

我們可以在表中新增一個版本號,每次更新數據時候將版本號作為條件,並且每次更新時候版本號+1,過程優化一下,如下:

t1:打開事務start transaction  t2:select獲取記錄R1,聲明變量v=R1.version  t3:對R1進行編輯  t4:執行更新操作      update R1 set version = version + 1 where user_id=#user_id# and version = #v#;  t5:t4中的update會返回影響的行數,我們將其記錄在count中,然後根據count來判斷提交還是回滾      if(count==1){          //提交事務          commit;      }else{          //回滾事務          rollback;      }

上面重點在於步驟t4,當多個線程同時執行到t1,他們看到的R1是一樣的,但是當他們執行到t4的時候,數據庫會對update的這行記錄加鎖,確保並發情況下排隊執行,所以只有第一個的update會返回1,其他的update結果會返回0,然後後面會判斷count是否為1,進而對事務進行提交或者回滾。可以通過count的值知道修改數據是否成功了。

上面這種方式就樂觀鎖。我們可以通過樂觀鎖的方式確保數據並發修改過程中的正確性。

使用mysql實現分佈式鎖

建表

我們創建一個分佈式鎖表,如下

DROP DATABASE IF EXISTS javacode2018;  CREATE DATABASE javacode2018;  USE javacode2018;  DROP TABLE IF EXISTS t_lock;  create table t_lock(    lock_key varchar(32) PRIMARY KEY NOT NULL COMMENT '鎖唯一標誌',    request_id varchar(64) NOT NULL DEFAULT '' COMMENT '用來標識請求對象的',    lock_count INT NOT NULL DEFAULT 0 COMMENT '當前上鎖次數',    timeout BIGINT NOT NULL DEFAULT 0 COMMENT '鎖超時時間',    version INT NOT NULL DEFAULT 0 COMMENT '版本號,每次更新+1'  )COMMENT '鎖信息表';

分佈式鎖工具類:

package com.itsoku.sql;    import lombok.Builder;  import lombok.Getter;  import lombok.Setter;  import lombok.extern.slf4j.Slf4j;  import org.junit.Test;    import java.sql.*;  import java.util.Objects;  import java.util.UUID;  import java.util.concurrent.TimeUnit;      /**   * 工作10年的前阿里P7分享Java、算法、數據庫方面的技術乾貨!堅信用技術改變命運,讓家人過上更體面的生活!   * 喜歡的請關注公眾號:路人甲Java   */  @Slf4j  public class LockUtils {        //將requestid保存在該變量中      static ThreadLocal<String> requestIdTL = new ThreadLocal<>();        /**       * 獲取當前線程requestid       *       * @return       */      public static String getRequestId() {          String requestId = requestIdTL.get();          if (requestId == null || "".equals(requestId)) {              requestId = UUID.randomUUID().toString();              requestIdTL.set(requestId);          }          log.info("requestId:{}", requestId);          return requestId;      }        /**       * 獲取鎖       *       * @param lock_key        鎖key       * @param locktimeout(毫秒) 持有鎖的有效時間,防止死鎖       * @param gettimeout(毫秒)  獲取鎖的超時時間,這個時間內獲取不到將重試       * @return       */      public static boolean lock(String lock_key, long locktimeout, int gettimeout) throws Exception {          log.info("start");          boolean lockResult = false;          String request_id = getRequestId();          long starttime = System.currentTimeMillis();          while (true) {              LockModel lockModel = LockUtils.get(lock_key);              if (Objects.isNull(lockModel)) {                  //插入一條記錄,重新嘗試獲取鎖                  LockUtils.insert(LockModel.builder().lock_key(lock_key).request_id("").lock_count(0).timeout(0L).version(0).build());              } else {                  String reqid = lockModel.getRequest_id();                  //如果reqid為空字符,表示鎖未被佔用                  if ("".equals(reqid)) {                      lockModel.setRequest_id(request_id);                      lockModel.setLock_count(1);                      lockModel.setTimeout(System.currentTimeMillis() + locktimeout);                      if (LockUtils.update(lockModel) == 1) {                          lockResult = true;                          break;                      }                  } else if (request_id.equals(reqid)) {                      //如果request_id和表中request_id一樣表示鎖被當前線程持有者,此時需要加重入鎖                      lockModel.setTimeout(System.currentTimeMillis() + locktimeout);                      lockModel.setLock_count(lockModel.getLock_count() + 1);                      if (LockUtils.update(lockModel) == 1) {                          lockResult = true;                          break;                      }                  } else {                      //鎖不是自己的,並且已經超時了,則重置鎖,繼續重試                      if (lockModel.getTimeout() < System.currentTimeMillis()) {                          LockUtils.resetLock(lockModel);                      } else {                          //如果未超時,休眠100毫秒,繼續重試                          if (starttime + gettimeout > System.currentTimeMillis()) {                              TimeUnit.MILLISECONDS.sleep(100);                          } else {                              break;                          }                      }                  }              }          }          log.info("end");          return lockResult;      }        /**       * 釋放鎖       *       * @param lock_key       * @throws Exception       */      public static void unlock(String lock_key) throws Exception {          //獲取當前線程requestId          String requestId = getRequestId();          LockModel lockModel = LockUtils.get(lock_key);          //當前線程requestId和庫中request_id一致 && lock_count>0,表示可以釋放鎖          if (Objects.nonNull(lockModel) && requestId.equals(lockModel.getRequest_id()) && lockModel.getLock_count() > 0) {              if (lockModel.getLock_count() == 1) {                  //重置鎖                  resetLock(lockModel);              } else {                  lockModel.setLock_count(lockModel.getLock_count() - 1);                  LockUtils.update(lockModel);              }          }      }        /**       * 重置鎖       *       * @param lockModel       * @return       * @throws Exception       */      public static int resetLock(LockModel lockModel) throws Exception {          lockModel.setRequest_id("");          lockModel.setLock_count(0);          lockModel.setTimeout(0L);          return LockUtils.update(lockModel);      }        /**       * 更新lockModel信息,內部採用樂觀鎖來更新       *       * @param lockModel       * @return       * @throws Exception       */      public static int update(LockModel lockModel) throws Exception {          return exec(conn -> {              String sql = "UPDATE t_lock SET request_id = ?,lock_count = ?,timeout = ?,version = version + 1 WHERE lock_key = ? AND  version = ?";              PreparedStatement ps = conn.prepareStatement(sql);              int colIndex = 1;              ps.setString(colIndex++, lockModel.getRequest_id());              ps.setInt(colIndex++, lockModel.getLock_count());              ps.setLong(colIndex++, lockModel.getTimeout());              ps.setString(colIndex++, lockModel.getLock_key());              ps.setInt(colIndex++, lockModel.getVersion());              return ps.executeUpdate();          });      }        public static LockModel get(String lock_key) throws Exception {          return exec(conn -> {              String sql = "select * from t_lock t WHERE t.lock_key=?";              PreparedStatement ps = conn.prepareStatement(sql);              int colIndex = 1;              ps.setString(colIndex++, lock_key);              ResultSet rs = ps.executeQuery();              if (rs.next()) {                  return LockModel.builder().                          lock_key(lock_key).                          request_id(rs.getString("request_id")).                          lock_count(rs.getInt("lock_count")).                          timeout(rs.getLong("timeout")).                          version(rs.getInt("version")).build();              }              return null;          });      }        public static int insert(LockModel lockModel) throws Exception {          return exec(conn -> {              String sql = "insert into t_lock (lock_key, request_id, lock_count, timeout, version) VALUES (?,?,?,?,?)";              PreparedStatement ps = conn.prepareStatement(sql);              int colIndex = 1;              ps.setString(colIndex++, lockModel.getLock_key());              ps.setString(colIndex++, lockModel.getRequest_id());              ps.setInt(colIndex++, lockModel.getLock_count());              ps.setLong(colIndex++, lockModel.getTimeout());              ps.setInt(colIndex++, lockModel.getVersion());              return ps.executeUpdate();          });      }        public static <T> T exec(SqlExec<T> sqlExec) throws Exception {          Connection conn = getConn();          try {              return sqlExec.exec(conn);          } finally {              closeConn(conn);          }      }        @FunctionalInterface      public interface SqlExec<T> {          T exec(Connection conn) throws Exception;      }        @Getter      @Setter      @Builder      public static class LockModel {          private String lock_key;          private String request_id;          private Integer lock_count;          private Long timeout;          private Integer version;      }        private static final String url = "jdbc:mysql://localhost:3306/javacode2018?useSSL=false";        //數據庫地址      private static final String username = "root";        //數據庫用戶名      private static final String password = "root123";        //數據庫密碼      private static final String driver = "com.mysql.jdbc.Driver";        //mysql驅動        /**       * 連接數據庫       *       * @return       */      public static Connection getConn() {          Connection conn = null;          try {              Class.forName(driver);  //加載數據庫驅動              try {                  conn = DriverManager.getConnection(url, username, password);  //連接數據庫              } catch (SQLException e) {                  e.printStackTrace();              }          } catch (ClassNotFoundException e) {              e.printStackTrace();          }          return conn;      }        /**       * 關閉數據庫鏈接       *       * @return       */      public static void closeConn(Connection conn) {          if (conn != null) {              try {                  conn.close();  //關閉數據庫鏈接              } catch (SQLException e) {                  e.printStackTrace();              }          }      }  }

上面代碼中實現了文章開頭列的分佈式鎖的所有功能,大家可以認真研究下獲取鎖的方法:lock,釋放鎖的方法:unlock

測試用例

package com.itsoku.sql;    import lombok.extern.slf4j.Slf4j;  import org.junit.Test;    import static com.itsoku.sql.LockUtils.lock;  import static com.itsoku.sql.LockUtils.unlock;    /**   * 工作10年的前阿里P7分享Java、算法、數據庫方面的技術乾貨!堅信用技術改變命運,讓家人過上更體面的生活!   * 喜歡的請關注公眾號:路人甲Java   */  @Slf4j  public class LockUtilsTest {        //測試重複獲取和重複釋放      @Test      public void test1() throws Exception {          String lock_key = "key1";          for (int i = 0; i < 10; i++) {              lock(lock_key, 10000L, 1000);          }          for (int i = 0; i < 9; i++) {              unlock(lock_key);          }      }        //獲取之後不釋放,超時之後被thread1獲取      @Test      public void test2() throws Exception {          String lock_key = "key2";          lock(lock_key, 5000L, 1000);          Thread thread1 = new Thread(() -> {              try {                  try {                      lock(lock_key, 5000L, 7000);                  } finally {                      unlock(lock_key);                  }              } catch (Exception e) {                  e.printStackTrace();              }          });          thread1.setName("thread1");          thread1.start();          thread1.join();      }  }

test1方法測試了重入鎖的效果。

test2測試了主線程獲取鎖之後一直未釋放,持有鎖超時之後被thread1獲取到了。

留給大家一個問題

上面分佈式鎖還需要考慮一個問題:比如A機會獲取了key1的鎖,並設置持有鎖的超時時間為10秒,但是獲取鎖之後,執行了一段業務操作,業務操作耗時超過10秒了,此時機器B去獲取鎖時可以獲取成功的,此時會導致A、B兩個機器都獲取鎖成功了,都在執行業務操作,這種情況應該怎麼處理?大家可以思考一下然後留言,我們一起討論一下。

更多優質文章

  1. java高並發系列全集(34篇)
  2. mysql高手系列(20多篇,高手必備)
  3. 聊聊db和緩存一致性常見的實現方式

mysql系列大概有20多篇,喜歡的請關注一下,歡迎大家加我微信itsoku或者留言交流mysql相關技術!