靈活運用分佈式鎖解決數據重複插入問題

一、業務背景

許多面向用戶的互聯網業務都會在系統後端維護一份用戶數據,快應用中心業務也同樣做了這件事。快應用中心允許用戶對快應用進行收藏,並在服務端記錄了用戶的收藏列表,通過用戶賬號標識OpenID來關聯收藏的快應用包名。

為了使用戶在快應用中心的收藏列表能夠與快應用Menubar的收藏狀態打通,我們同時也記錄了用戶賬號標識OpenID與客戶端本地標識local_identifier的綁定關係。因為快應用Manubar由快應用引擎持有,獨立於快應用中心外,無法通過賬號體系獲取到用戶賬號標識,只能獲取到客戶端本地標識local_identifier,所以我們只能通過二者的映射關係來保持狀態同步。

在具體實現上,我們是在用戶啟動快應用中心的時候觸發一次同步操作,由客戶端將OpenID和客戶端本地標識提交到服務端進行綁定。服務端的綁定邏輯是:判斷OpenID是否已經存在,如果不存在則插入數據庫,否則更新對應數據行的local_identifier字段(因為用戶可能先後在兩個不同的手機上登錄同一個vivo賬號)。在後續的業務流程中,我們就可以根據OpenID查詢對應的local_identifier,反之亦可。

但是代碼上線一段時間後,我們發現t_account數據表中居然存在許多重複的OpenID記錄。根據如上所述的綁定邏輯,這種情況理論上是不應該發生的。所幸這些重複數據並沒有對更新和查詢的場景造成影響,因為在查詢的SQL中我們加入了LIMIT 1的限制,因此針對一個OpenID的更新和查詢操作實際上都只作用於ID最小的那條記錄。

二、問題分析與定位

雖然冗餘數據沒有對實際業務造成影響,但是這種明顯的數據問題也肯定是不能容忍的。因此我們開始着手排查問題。

首先想到的就是從數據本身入手。先通過對t_account表數據進行粗略觀察,發現大約有3%的OpenID會存在重複的情況。也就是說重複插入的情況是偶現的,大多數請求的處理都是按照預期被正確處理了。我們對代碼重新進行了走讀,確認了代碼在實現上確實不存在什麼明顯的邏輯錯誤。

我們進一步對數據進行細緻觀察。我們挑選了幾個出現重複情況的OpenID,將相關的數據記錄查詢出來,發現這些OpenID重複的次數也不盡相同,有的只重複一次,有的則更多。但是,這時候我們發現了一個更有價值的信息——這些相同OpenID的數據行的創建時間都是完全相同的,而且自增ID是連續的。

於是,我們猜測問題的產生應該是由於並發請求造成的!我們模擬了客戶端對接口的並發調用,確實出現了重複插入數據的現象,進一步證實了這個猜測的合理性。但是,明明客戶端的邏輯是每個用戶在啟動的時候進行一次同步,為什麼會出現同一個OpenID並發請求呢?

事實上,代碼的實際運行並不如我們想像中的那麼理想,計算機的運行過程中往往存在一些不穩定的因素,比如網絡環境、服務器的負載情況。而這些不穩定因素就可能導致客戶端發送請求失敗,這裡的「失敗」可能並不意味着真正的失敗,而是可能整個請求時間過長,超過了客戶端設定的超時時間,從而被人為地判定為失敗,於是通過重試機制再次發送請求。那麼最終就可能導致同樣的請求被提交了多次,而且這些請求也許在中間某個環節被阻塞了(比如當服務器的處理線程負載過大,來不及處理請求,請求進入了緩衝隊列),當阻塞緩解後這幾個請求就可能在很短的時間內被並發處理了。

這其實是一個典型的並發衝突問題,可以把這個問題簡單抽象為:如何避免並發情況下寫入重複數據。事實上,有很多常見的業務場景都可能面臨這個問題,比如用戶註冊時不允許使用相同的用戶名。

一般來說,我們在處理這類問題時,最直觀的方式就是先進行一次查詢,當判斷數據庫中不存在當前數據時才允許插入。

顯然,這個流程從單個請求的角度來看是沒有問題的。但是當多個請求並發時,請求A和請求B都先發起一次查詢,並且都得到結果是不存在,於是兩者都又執行了數據插入,最終導致並發衝突。

三、探索可行的方案

既然問題定位到了,接下來就要開始尋求解決方案了。面對這種情況,我們通常有兩種選擇,一種是讓數據庫來解決,另一種是由應用程序來解決。

3.1 數據庫層面處理——唯一索引

當使用MySQL數據庫及InnoDB存儲引擎時,我們可以利用唯一索引來保障同一個列的值具有唯一性。顯然,在t_account這張表中,我們最開始是沒有為open_id列創建唯一索引的。如果我們想要此時加上唯一索引的話,可以利用下列的ALTER TABLE語句。

ALTER TABLE t_account ADD UNIQUE uk_open_id( open_id );

一旦為open_id列加上唯一索引後,當上述並發情況發生時,請求A和請求B中必然有一者會優先完成數據的插入操作,而另一者則會得到類似錯誤。因此,最終保證t_account表中只有一條openid=xxx的記錄存在。

Error Code: 1062. Duplicate entry 'xxx' for key 'uk_open_id'

3.2 應用程序層面處理——分佈式鎖

另一種解決的思路是我們不依賴底層的數據庫來為我們提供唯一性的保障,而是靠應用程序自身的代碼邏輯來避免並發衝突。應用層的保障其實是一種更具通用性的方案,畢竟我們不能假設所有系統使用的數據持久化組件都具備數據唯一性檢測的能力。

那具體怎麼做呢?簡單來說,就是化並行為串行。之所以我們會遇到重複插入數據的問題,是因為「檢測數據是否已經存在」和「插入數據」兩個動作被分割開來。由於這兩個步驟不具備原子性,才導致兩個不同的請求可以同時通過第一步的檢測。如果我們能夠把這兩個動作合併為一個原子操作,就可以避免數據衝突了。這時候我們就需要通過加鎖,來實現這個代碼塊的原子性。

對於Java語言,大家最熟悉的鎖機制就是synchronized關鍵字了。

public synchronized void submit(String openId, String localIdentifier){
    Account account = accountDao.find(openId);
    if (account == null) {
        // insert
    }
    else {
        // update
    }
}

但是,事情可沒這麼簡單。要知道,我們的程序可不是只部署在一台服務器上,而是部署了多個節點。也就是說這裡的並發不僅僅是線程間的並發,而是進程間的並發。因此,我們無法通過java語言層面的鎖機制來解決這個同步問題,我們這裡需要的應該是分佈式鎖。

3.3 兩種解決方案的權衡

基於以上的分析,看上去兩種方案都是可行的,但最終我們選擇了分佈式鎖的方案。為什麼明明第一種方案只需要簡單地加個索引,我們卻不採用呢?

因為現有的線上數據已然在open_id列上存在重複數據,如果此時直接去加唯一索引是無法成功的。為了加上唯一索引,我們必須首先將已有的重複數據先進行清理。但是問題又來了,線上的程序一直持續運行着,重複數據可能會源源不斷地產生。那我們能不能找一個用戶請求不活躍的時間段去進行清理,並在新的重複數據插入之前完成唯一索引的建立?答案當然是肯定的,只不過這種方案需要運維、DBA、開發多方協同處理,而且由於業務特性,最合適的處理時間段應該是凌晨這種夜深人靜的時候。即便是採取這麼苛刻的修復措施,也不能百分之百完全保證數據清理完成到索引建立之間不會有新的重複數據插入。因此,基於唯一索引的修復方案乍看之下非常合適,但是具體操作起來還是略為麻煩。

事實上,建立唯一索引最合適的契機應該是在系統最初的設計階段,這樣就能有效避免重複數據的問題。然而木已成舟,在當前這個情景下,我們還是選擇了可操作性更強的分佈式鎖方案。因為選擇這個方案的話,我們可以先上線加入了分佈式鎖修復的新代碼,阻斷新的重複數據插入,然後再對原有的重複數據執行清理操作,這樣一來只需要修改代碼並一次上線即可。當然,待問題徹底解決之後,我們可以重新再考慮為數據表加上唯一索引。

那麼接下來,我們就來看看基於分佈式鎖的方案如何實現。首先我們先來回顧一下分佈式鎖的相關知識。

四、分佈式鎖概述

4.1 分佈式鎖需要具備哪些特性?

  • 在分佈式系統環境下,同一時間只有一台機器的一個線程可以獲取到鎖;

  • 高可用的獲取鎖與釋放鎖;

  • 高性能的獲取鎖與釋放鎖;

  • 具備可重入特性;

  • 具備鎖失效機制,防止死鎖;

  • 具備阻塞/非阻塞鎖特性。

4.2 分佈式鎖有哪些實現方式?

分佈式鎖實現主要有如下三種:

  • 基於數據庫實現分佈式鎖;

  • 基於Zookeeper實現分佈式鎖;

  • 基於Redis實現分佈式鎖;

4.2.1 基於數據庫的實現方式

基於數據庫的實現方式就是直接創建一張鎖表,通過操作表數據來實現加鎖、解鎖。以MySQL數據庫為例,我們可以創建這樣一張表,並且對method_name進行加上唯一索引的約束:

CREATE TABLE `myLock` (
 `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
 `method_name` varchar(100) NOT NULL DEFAULT '' COMMENT '鎖定的方法名',
 `value` varchar(1024) NOT NULL DEFAULT '鎖信息',
 PRIMARY KEY (`id`),
 UNIQUE KEY `uidx_method_name` (`method_name `) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='鎖定中的方法';

然後,我們就可以通過插入數據和刪除數據的方式來實現加鎖和解鎖:

#加鎖
insert into myLock(method_name, value) values ('m1', '1');
 
#解鎖
delete from myLock where method_name ='m1';

基於數據庫實現的方式雖然簡單,但是存在一些明顯的問題:

  • 沒有鎖失效時間,如果解鎖失敗,就會導致鎖記錄永遠留在數據庫中,造成死鎖。

  • 該鎖不可重入,因為它不認識請求方是不是當前佔用鎖的線程。

  • 當前數據庫是單點,一旦宕機,鎖機制就會完全崩壞。

4.2.2 基於Zookeeper的實現方式

ZooKeeper是一個為分佈式應用提供一致性服務的開源組件,它內部是一個分層的文件系統目錄樹結構,規定同一個目錄下的節點名稱都是唯一的。

ZooKeeper的節點(Znode)有4種類型:

  • 持久化節點(會話斷開後節點還存在)

  • 持久化順序節點

  • 臨時節點(會話斷開後節點就刪除了)

  • 臨時順序節點

當一個新的Znode被創建為一個順序節點時,ZooKeeper通過將10位的序列號附加到原始名稱來設置Znode的路徑。例如,如果將具有路徑/mynode的Znode創建為順序節點,則ZooKeeper會將路徑更改為/mynode0000000001,並將下一個序列號設置為0000000002,這個序列號由父節點維護。如果兩個順序節點是同時創建的,那麼ZooKeeper不會對每個Znode使用相同的數字。

基於ZooKeeper的特性,可以按照如下方式來實現分佈式鎖:

  • 創建一個目錄mylock;

  • 線程A想獲取鎖就在mylock目錄下創建臨時順序節點;

  • 獲取mylock目錄下所有的子節點,然後獲取比自己小的兄弟節點,如果不存在,則說明當前線程順序號最小,獲得鎖;

  • 線程B獲取所有節點,判斷自己不是最小節點,設置監聽比自己次小的節點;

  • 線程A處理完,刪除自己的節點,線程B監聽到變更事件,判斷自己是不是最小的節點,如果是則獲得鎖。

由於創建的是臨時節點,當持有鎖的線程意外宕機時,鎖依然可以得到釋放,因此可以避免死鎖的問題。另外,我們也可以通過節點排隊監聽機制實現阻塞特性,也可以通過在Znode中攜帶線程標識來實現可重入鎖。同時,由於ZooKeeper集群的高可用特性,分佈式鎖的可用性也能夠得到保障。不過,因為需要頻繁的創建和刪除節點,Zookeeper方式在性能上不如Redis方式。

4.2.3 基於Redis的實現方式

Redis是一個開源的鍵值對(Key-Value)存儲數據庫,其基於內存實現,性能非常高,常常被用作緩存。

基於Redis實現分佈式鎖的核心原理是:嘗試對特定key進行set操作,如果設置成功(key之前不存在)了,則相當於獲取到鎖,同時對該key設置一個過期時間,避免線程在釋放鎖之前退出造成死鎖。線程執行完同步任務後主動釋放鎖則通過delete命令來完成。

這裡需要特別注意的一點是如何加鎖並設置過期時間。有的人會使用setnx + expire這兩個命令來實現,但這是有問題的。假設當前線程執行setnx獲得了鎖,但是在執行expire之前宕機了,就會造成鎖無法被釋放。當然,我們可以將兩個命令合併在一段lua腳本里,實現兩條命令的原子提交。

其實,我們簡單利用set命令可以直接在一條命令中實現setnx和設置過期時間,從而完成加鎖操作:

SET key value [EX seconds] [PX milliseconds] NX

解鎖操作只需要:

DEL key

五、基於Redis分佈式鎖的解決方案

在本案例中,我們採用了基於Redis實現分佈式鎖的方式。

5.1 分佈式鎖的Java實現

由於項目採用了Jedis框架,而且線上Redis部署為集群模式,因此我們基於redis.clients.jedis.JedisCluster封裝了一個RedisLock類,提供加鎖與解鎖接口。

public class RedisLock {
 
    private static final String LOCK_SUCCESS = "OK";
    private static final String LOCK_VALUE = "lock";
    private static final int EXPIRE_SECONDS = 3;
 
    @Autowired
    protected JedisCluster jedisCluster;
 
    public boolean lock(String openId) {
        String redisKey = this.formatRedisKey(openId);
        String ok = jedisCluster.set(redisKey, LOCK_VALUE, "NX", "EX", EXPIRE_SECONDS);
        return LOCK_SUCCESS.equals(ok);
    }
 
    public void unlock(String openId) {
        String redisKey = this.formatRedisKey(openId);
        jedisCluster.del(redisKey);
    }
 
    private String formatRedisKey(String openId){
        return "keyPrefix:" + openId;
    }
}

在具體實現上,我們設置了3秒鐘的過期時間,因為被加鎖的任務是簡單的數據庫查詢和插入,而且服務器與數據庫部署在同個機房,正常情況下3秒鐘已經完全能夠足夠滿足代碼的執行。

事實上,以上的實現是一個簡陋版本的Redis分佈式鎖,我們在實現中並沒有考慮線程的可重入性,也沒有考慮鎖被其他進程誤釋放的問題,但是它在這個業務場景下已經能夠滿足我們的需求了。假設推廣到更為通用的業務場景,我們可以考慮在value中加入當前進程的特定標識,並在上鎖和釋放鎖的階段做相對應的匹配檢測,就可以得到一個更為安全可靠的Redis分佈式鎖的實現了。

當然,像Redission之類的框架也提供了相當完備的Redis分佈式鎖的封裝實現,在一些要求相對嚴苛的業務場景下,我建議直接使用這類框架。由於本文側重於介紹排查及解決問題的思路,因此沒有對Redisson分佈式的具體實現原理做更多介紹,感興趣的小夥伴可以在網上找到非常豐富的資料。

5.2 改進後的代碼邏輯

現在,我們可以利用封裝好的RedisLock來改進原來的代碼了。

public class AccountService {
 
    @Autowired
    private RedisLock redisLock;
 
    public void submit(String openId, String localIdentifier) {
        if (!redisLock.lock(openId)) {
            // 如果相同openId並發情況下,線程沒有搶到鎖,則直接丟棄請求
            return;
        }
 
        // 獲取到鎖,開始執行用戶數據同步邏輯
        try {
            Account account = accountDao.find(openId);
            if (account == null) {
                // insert
            } else {
                // update
            }
        } finally {
            // 釋放鎖
            redisLock.unlock(openId);
        }
    }
}

5.3 數據清理

最後再簡單說一下收尾工作。由於重複數據的數據量較大,不太可能手工去慢慢處理。於是我們編寫了一個定時任務類,每隔一分鐘執行一次清理操作,每次清理1000個重複的OpenID,避免短時間內大量查詢和刪除操作對數據庫性能造成影響。當確認重複數據已經完全清理完畢後就停掉定時任務的調度,並在下一次版本迭代中將此代碼移除。

六、總結

在日常開發過程中難免會各種各樣的問題,我們要學會順藤摸瓜逐步分析,找到問題的根因;然後在自己的認知範圍內盡量去尋找可行的解決方案,並且仔細權衡各種方案的利弊,才能最終高效地解決問題。