來吧,自己動手擼一個分佈式ID生成器組件

在經過了眾多輪的面試之後,小林終於進入到了一家互聯網公司的基礎架構組,小林目前在公司有使用到架構組研究到分佈式id生成器,前一陣子大概看了下其內部的實現,發現還是存在一些架構設計不合理之處。但是又由於適用於當前的業務場景,所以並沒有做過多的優化,這裡記錄一些相關的技術筆記。

研發背景

在分佈式服務中,各種複雜的業務場景需要有一個用於做唯一標識的id,例如訂單業務,支付流水,聊天通信等業務場景。尤其是在分庫分表場景中,分佈式id生成器的使用頻率更高。因此分佈式id組件的設計應該要能支持以下幾個特性:

1.全局唯一特性

這個點比較好理解,這裡就不做過多的解釋。

2.組件遞增特性

可以是每個id都具有遞增的特性也可以是支持區間段內具備遞增的特性。

3.安全性

有些重要的id如果無意中暴露在了外網環境中,如果沒有做過安全防範其實是一件非常危險的事情。例如說訂單的id如果只是更具日期加訂單數目的格式生成,例如說:2020111100001表示2020年11月11日的第一筆訂單,那麼如果競對獲取到了

2020111100999這個id,再根據訂單的生成時間則大概可以推斷出該公司某日生成的訂單數目的大致量級。

4.高qps

分佈式id生成組件在使用過程中主要是qps偏高,因此在設計起初應該要能支持較高的qps查詢,同時對於網絡的延遲特性也需要儘可能降低。

5.高可用

由於分佈式id生成器是一個需要支持多個服務調用方共同使用的公共服務,一旦出現崩潰後果不堪設想,可能會導致大面積的業務線崩塌,所以在高可用方面需要考慮得尤其重要。

業界常見的分佈式id生成方案比對

uuid

java程序中實現uuid的代碼:

String result = UUID.randomUUID().toString();
System.out.println(result);

生成的格式如下所示:

b0b2197d-bc8c-4fab-ad73-2b54e11b0869

uuid的格式其實是會被 – 符號劃分為五個模塊,主要是分為了8-4-4-4-12個字符拼接而成的一段字符串。但是這種字符串的格式對於互聯網公司中主推的MySQL數據庫並不友好。

尤其是當使用生成的id作為索引的時候,uuid長度過長,大數據量的時候會導致b+樹的葉子結點裂變頻率加大,而且在進行索引比對的時候需要進行逐個字符比對,性能損耗也較高,應該拋棄該方案。小林查詢了一些網上的資料發現uuid的主要組成由以下幾個部位:

  • 當前日期和時間
  • 隨機數字
  • 機器的MAC地址(能夠保證全球範圍內機器的唯一特性)

雪花算法

SnowFlake是Twitter公司採用的一種算法,目的是在分佈式系統中產生全局唯一且趨勢遞增的ID。

來吧,自己動手擼一個分佈式ID生成器組件

稍微解釋一些雪花算法的含義:

第一位通常是0,沒有特殊使用含義,因為1通常表示為補碼。

中間的41位是用於存儲時間,41位的長度足夠容納69年左右的時長。

10bit用於標示機器自身的id,從而表示不通機器自身id的不同。

最後12位bit用於表示某一毫秒內的序列號,12位(bit)可以表示的最大正整數是4096-1=4095,所以也就是說一毫秒內可以同時生成4095個id。

時間戳位置和序列號位置還不能隨意調整,應為要保證逐漸遞增的特性。

好處

能夠保證遞增的特性,id具有明確的含義,易懂。

不足點

但是對於機器自身的系統時間有所依賴,一旦機器的系統時間發生了變化,在高並發環境下就有可能會有重複id生成的風險。

有些業務場景希望在id中加入特殊的業務規則名稱前綴

例如短訊的id:

sms_108678123

獎券的id:

coupon_12908123

需要基於這種算法進行改造,實現支持id注入「基因」的這一特性。

mongodb的主鍵id設計思路

其實在mongodb裏面也有使用到往主鍵id中注入一些「基因」要素點的這類思路:

mongodb裏面沒有自增的id。

來吧,自己動手擼一個分佈式ID生成器組件

_id是唯一標識的key,value通常我們會設置為objectid對象。

objectid裏面包含了時間戳,宿主機的ip,進程號碼,自增號

來吧,自己動手擼一個分佈式ID生成器組件

在對這幾種方案進行了調研之後,於是小林便開始萌生了開發一款專用於自己公司平台的id生成器。

自研主要設計思路

MySQL配置id生成規則,拉取到本地緩存中形成一段本地id,從而降低對於db的訪問。

支持集群配置id生成器,能夠支持高qps訪問和較好的擴容性。

來吧,自己動手擼一個分佈式ID生成器組件

配置表如下方所示:

來吧,自己動手擼一個分佈式ID生成器組件

建表sql:

CREATE TABLE `t_id_builder_config` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `des` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '描述',
  `init_num` bigint(13) DEFAULT NULL COMMENT 'id起步值',
  `current_threshold` bigint(16) DEFAULT NULL COMMENT '當前id所在階段的閾值',
  `step` int(8) DEFAULT NULL COMMENT 'id遞增區間',
  `biz_code` varchar(60) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '業務前綴碼,如果沒有則返回時不攜帶',
  `version` int(11) NOT NULL DEFAULT '0' COMMENT '樂觀鎖版本號',
  `is_seq` smallint(2) NOT NULL DEFAULT '0' COMMENT 'id連續遞增,0 是  1 不是',
  `create_time` datetime DEFAULT CURRENT_TIMESTAMP,
  `update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

幾個核心設計點:

當同時有多個請求訪問mysql獲取id配置的時候該如何防止並發問題?

這裡我採用了for update的方式加行鎖進行讀取,同時對於行信息進行更新的時候加入了version版本號信息字段防止更新重複的情況。

假設說更新失敗,也會有cas的方式進行重試,重試超過一定次數之後直接中斷。

為何不引入redis作為分佈式鎖來防止並發修改數據庫操作?

不希望將該組件變得過於繁雜,減少系統對於第三方的依賴性

假設本地id還沒使用完,結果當前服務器宕機了,該如何預防?

每次服務啟動都需要更新表的配置,拉去最新的一批id集合到本地,這樣就不會出現和之前id衝突的問題了。

本地id集合中如何判斷id是否已經使用過?

如果是連續遞增型的id,這一步可以忽略,因為本地id每次獲取的時候都會比上一個id要大。但是如果是拉取了一段區間的id到本地之後進行隨機返回就需要加入bitset作為過濾器了。對於已經使用過的id,則對應bit置為1。如果隨機返回的區間id多次都已經被佔用了,則超過一定頻率之後需要重新拉取id到本地。

來吧,自己動手擼一個分佈式ID生成器組件

不通機器的狀態表示碼該如何設置?

可以通過啟動腳本中配置相關參數:

-DidBuilder.index=1001

進行配置,然後通過System.getProperty(“idBuilder.index”)的方式來獲取.

核心代碼思路:

接口設計:

package com.qiyu.tech.id.builder.service;
/**
 * @Author idea
 * @Date created in 11:16 下午 2020/12/17
 */
public interface IdBuilderService {
    /**
     * 根據本地步長度來生成唯一id(區間性遞增)
     *
     * @return
     */
    Long unionId(int code);
    /**
     * 對於unionId的算法進行優化(連續性遞增)
     *
     * @param code
     * @return
     */
    Long unionSeqId(int code);
    /**
     * 生成包含業務前綴的自增id(區間性遞增)
     *
     * @param code
     * @return
     */
    String unionIdStr(int code);
    /**
     * 生成包含業務前綴的自增id(連續性遞增)
     *
     * @param code
     * @return
     */
    String unionSeqIdStr(int code);
}

 

具體實現:

package com.qiyu.tech.id.builder.service.impl;
import com.qiyu.tech.id.builder.bean.IdBuilderPO;
import com.qiyu.tech.id.builder.bean.LocalSeqId;
import com.qiyu.tech.id.builder.dao.IdBuilderMapper;
import com.qiyu.tech.id.builder.service.IdBuilderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.management.RuntimeErrorException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import static com.qiyu.tech.id.builder.constants.IdTypeConstants.*;
/**
 * @Author idea
 * @Date created in 11:18 下午 2020/12/17
 */
@Service
@Slf4j
public class IdBuilderServiceImpl implements IdBuilderService, InitializingBean {
    private static ConcurrentHashMap<Integer, BitSet> bitSetMap = new ConcurrentHashMap<>();
    private static Map<Integer, IdBuilderPO> idBuilderNotSeqMap;
    private static Map<Integer, IdBuilderPO> idBuilderSeqMap;
    private static Map<Integer, LocalSeqId> localSeqMap;
    private static Map<Integer, Boolean> newBuilderMap;
    private final static Object monitor = new Object();
    @Resource
    private IdBuilderMapper idBuilderMapper;
  
    private int idBuilderIndex;
    @Override
    public Long unionId(int code) {
        //考慮到鎖升級問題,在高並發場景下使用synchronized要比cas更佳
        synchronized (monitor) {
            IdBuilderPO idBuilderPO = idBuilderNotSeqMap.get(code);
            if (idBuilderPO == null) {
                return null;
            }
            boolean isNew = newBuilderMap.get(code);
            if (isNew) {
                //預防出現id生成器網絡中斷問題
                IdBuilderPO newIdBuilderPO = this.refreshIdBuilderConfig(idBuilderPO);
                if (newIdBuilderPO == null) {
                    log.error("[unionId] refreshIdBuilderConfig出現異常");
                    return null;
                }
                idBuilderPO.setCurrentThreshold(newIdBuilderPO.getCurrentThreshold());
                newBuilderMap.put(code, false);
            }
            long initNum = idBuilderPO.getCurrentThreshold();
            int step = idBuilderPO.getStep();
            int randomIndex = RandomUtils.nextInt((int) initNum, (int) initNum + step);
            BitSet bitSet = bitSetMap.get(code);
            if (bitSet == null) {
                bitSet = new BitSet();
                bitSetMap.put(code, bitSet);
            }
            Long id;
            int countTime = 0;
            while (true) {
                boolean indexExist = bitSet.get(randomIndex);
                countTime++;
                if (!indexExist) {
                    bitSet.set(randomIndex);
                    id = Long.valueOf(randomIndex);
                    break;
                }
                //如果重試次數大於了空間的0.75則需要重新獲取新的id區間 測試之後得出 循環一千萬次隨機函數,16gb內存條件下,大約耗時在124ms左右
                if (countTime >= step * 0.75) {
                    //擴容需要修改表配置
                    IdBuilderPO newIdBuilderPO = this.updateIdBuilderConfig(idBuilderPO);
                    if (newIdBuilderPO == null) {
                        log.error("重試超過100次沒有更新自增id配置成功");
                        return null;
                    }
                    initNum = newIdBuilderPO.getCurrentThreshold();
                    step = newIdBuilderPO.getStep();
                    idBuilderPO.setCurrentThreshold(initNum);
                    bitSet.clear();
                    log.info("[unionId] 擴容IdBuilder,new idBuilderPO is {}",idBuilderPO);
                }
                randomIndex = RandomUtils.nextInt((int) initNum, (int) initNum + step);
            }
            return id;
        }
    }
    @Override
    public Long unionSeqId(int code) {
        synchronized (monitor) {
            LocalSeqId localSeqId = localSeqMap.get(code);
            IdBuilderPO idBuilderPO = idBuilderSeqMap.get(code);
            if (idBuilderPO == null || localSeqId == null) {
                log.error("[unionSeqId] code 參數有誤,code is {}", code);
                return null;
            }
            boolean isNew = newBuilderMap.get(code);
            long result = localSeqId.getCurrentId();
            localSeqId.setCurrentId(result + 1);
            if (isNew) {
                //預防出現id生成器網絡中斷問題
                IdBuilderPO updateIdBuilderPO = this.refreshIdBuilderConfig(idBuilderPO);
                if (updateIdBuilderPO == null) {
                    log.error("[unionSeqId] refreshIdBuilderConfig出現異常");
                    return null;
                }
                newBuilderMap.put(code, false);
                localSeqId.setCurrentId(updateIdBuilderPO.getCurrentThreshold());
                localSeqId.setNextUpdateId(updateIdBuilderPO.getCurrentThreshold() + updateIdBuilderPO.getStep());
            }
            //需要更新本地步長
            if (localSeqId.getCurrentId() >= localSeqId.getNextUpdateId()) {
                IdBuilderPO newIdBuilderPO = this.updateIdBuilderConfig(idBuilderPO);
                if (newIdBuilderPO == null) {
                    log.error("[unionSeqId] updateIdBuilderConfig出現異常");
                    return null;
                }
                idBuilderPO.setCurrentThreshold(newIdBuilderPO.getCurrentThreshold());
                localSeqId.setCurrentId(newIdBuilderPO.getCurrentThreshold());
                localSeqId.setNextUpdateId(newIdBuilderPO.getCurrentThreshold() + newIdBuilderPO.getStep());
                log.info("[unionSeqId] 擴容IdBuilder,new localSeqId is {}",localSeqId);
            }
            return result;
        }
    }
    /**
     * 刷新id生成器的配置
     *
     * @param idBuilderPO
     */
    private IdBuilderPO refreshIdBuilderConfig(IdBuilderPO idBuilderPO) {
        IdBuilderPO updateResult = this.updateIdBuilderConfig(idBuilderPO);
        if (updateResult == null) {
            log.error("更新數據庫配置出現異常,idBuilderPO is {}", idBuilderPO);
            throw new RuntimeErrorException(new Error("更新數據庫配置出現異常,idBuilderPO is " + idBuilderPO.toString()));
        }
        return updateResult;
    }
    /**
     * 考慮分佈式環境下 多個請求同時更新同一行數據的情況
     *
     * @param idBuilderPO
     * @return
     */
    private IdBuilderPO updateIdBuilderConfig(IdBuilderPO idBuilderPO) {
        int updateResult = -1;
        //假設重試過程中出現網絡異常,那麼使用cas的時候必須要考慮退出情況 極限情況下更新100次
        for (int i = 0; i < 100; i++) {
            IdBuilderPO newIdBuilderPO = idBuilderMapper.selectOneForUpdate(idBuilderPO.getId());
            updateResult = idBuilderMapper.updateCurrentThreshold(newIdBuilderPO.getCurrentThreshold() + newIdBuilderPO.getStep(), newIdBuilderPO.getId(), newIdBuilderPO.getVersion());
            if (updateResult > 0) {
                return newIdBuilderPO;
            }
        }
        return null;
    }
    @Override
    public String unionIdStr(int code) {
        long id = this.unionId(code);
        IdBuilderPO idBuilderPO = idBuilderNotSeqMap.get(code);
        return idBuilderPO.getBizCode() + idBuilderIndex + id;
    }
    @Override
    public String unionSeqIdStr(int code) {
        long id = this.unionSeqId(code);
        IdBuilderPO idBuilderPO = idBuilderSeqMap.get(code);
        return idBuilderPO.getBizCode() + idBuilderIndex + id;
    }
    @Override
    public void afterPropertiesSet() {
        List<IdBuilderPO> idBuilderPOS = idBuilderMapper.selectAll();
        idBuilderNotSeqMap = new ConcurrentHashMap<>(idBuilderPOS.size());
        newBuilderMap = new ConcurrentHashMap<>(idBuilderPOS.size());
        idBuilderSeqMap = new ConcurrentHashMap<>(idBuilderPOS.size());
        localSeqMap = new ConcurrentHashMap<>(0);
        //每次重啟到時候,都需要將之前的上一個區間的id全部拋棄,使用新的步長區間
        for (IdBuilderPO idBuilderPO : idBuilderPOS) {
            if (idBuilderPO.getIsSeq() == NEED_SEQ) {
                idBuilderSeqMap.put(idBuilderPO.getId(), idBuilderPO);
                LocalSeqId localSeqId = new LocalSeqId();
                localSeqId.setNextUpdateId(idBuilderPO.getCurrentThreshold() + idBuilderPO.getStep());
                localSeqId.setCurrentId(idBuilderPO.getCurrentThreshold());
                localSeqMap.put(idBuilderPO.getId(), localSeqId);
            } else {
                idBuilderNotSeqMap.put(idBuilderPO.getId(), idBuilderPO);
            }
            newBuilderMap.put(idBuilderPO.getId(), true);
        }
        this.idBuilderIndex= Integer.parseInt(System.getProperty("idBuilder.index"));
    }
}

 

數據庫層面設計:

package com.qiyu.tech.id.builder.dao;
import com.baomidou.mybatisplus.mapper.BaseMapper;
import com.qiyu.tech.id.builder.bean.IdBuilderPO;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import java.util.List;
/**
 * @Author idea
 * @Date created in 10:17 上午 2020/12/17
 */
@Mapper
public interface IdBuilderMapper extends BaseMapper<IdBuilderPO> {
    @Select("select * from t_id_builder_config")
    List<IdBuilderPO> selectAll();
    @Select("select * from t_id_builder_config where id=#{id} limit 1 for update")
    IdBuilderPO selectOneForUpdate(@Param("id") int id);
    @Update("UPDATE t_id_builder_config set current_threshold=#{currentThreshold},version=version+1 where id=#{id} and version=#{version}")
    Integer updateCurrentThreshold(@Param("currentThreshold") long currentThreshold,@Param("id") int id,@Param("version") int version);
}

  

這裏面我只貼出了部分核心代碼,http和rpc訪問部分其實大同小異,可以更具自己的需要進行額外定製。

下邊我貼出關於controller部分的代碼:

package com.qiyu.tech.id.builder.controller;
import com.qiyu.tech.id.builder.service.IdBuilderService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
 * @Author idea
 * @Date created in 4:27 下午 2020/12/17
 */
@RestController
@RequestMapping(value = "id-builder")
public class IdBuilderController {
    @Resource
    private IdBuilderService idBuilderService;
  
    @GetMapping("increase-id")
    public Long increaseId(int code){
        long result = idBuilderService.unionId(code);
        System.out.println(result);
        return result;
    }
    @GetMapping("increase-seq-id")
    public Long increaseSeqId(int code){
        long result = idBuilderService.unionSeqId(code);
        System.out.println(result);
        return result;
    }
    @GetMapping("increase-seq-id-str")
    public String unionSeqIdStr(int code){
        String result = idBuilderService.unionSeqIdStr(code);
        System.out.println(result);
        return result;
    }
    @GetMapping("increase-id-str")
    public String unionIdStr(int code){
        String result = idBuilderService.unionIdStr(code);
        System.out.println(result);
        return result;
    }
}

  

application.yml配置文件

mybatis-plus:
  configuration:
    map-underscore-to-camel-case: true
server:
  port: 8082
  tomcat:
    max-threads: 500
    max-connections: 5000

  

注意需要結合實際機器配置nginx的並發線程數目和tomcat的並發訪問參數。

啟動類:

ps:這裏面的db訪問配置是採用了自己封裝的一個db工具,其實本質和SpringBoot直接配置jdbc是一樣的,可以忽略

package com.qiyu.tech.id.builder;
import com.qiyu.datasource.annotation.AppDataSource;
import com.qiyu.datasource.enums.DatasourceConfigEnum;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * @Author idea
 * @Date created in 11:16 下午 2020/12/17
 */
@SpringBootApplication(scanBasePackages = "com.qiyu.*")
@AppDataSource(datasourceType = {DatasourceConfigEnum.PROD_DB},defaultType = DatasourceConfigEnum.PROD_DB)
public class IdBuilderApplication {
    public static void main(String[] args) {
        SpringApplication.run(IdBuilderApplication.class,args);
        System.out.println("========== IdBuilderApplication started! =========");
    }
}

  

測試環節:

通過將服務打包部署在機器上邊,同時運行多個服務,通過nginx配置負載均衡,請求到不通的機器上邊。

下邊是我自己進行壓測的一些相關配置參數:

來吧,自己動手擼一個分佈式ID生成器組件

壓測啟動後,後台控制台會打印相關係列參數:

來吧,自己動手擼一個分佈式ID生成器組件

當我們需要擴增機器的時候,新加的機器不會對原有發號令機器的id產生影響,可以支持較好的擴容。

每次拉取的本地id段應該設計在多次較好?

這裡我們先將本地id段簡稱為segment。

按照一些過往經驗的參考,通常是希望id發號器能夠經量減少對於MySQL的訪問次數,同時也需要結合實際部門的運維能力進行把控。

假設說我們MySQL是採用了1主2從的方式搭建,當某一從節點掛了,切換新的從節點時候需要消耗大約1分鐘時長,那麼我們的segment至少需要設計為高峰期QPS * 60 * 1 * 4 ,期間考需要額外考慮一些其他因素,例如網絡新的節點切換之後帶來的一些網絡抖動問題等等,這能夠保證即使MySQL出現了故障,本地的segment也可以暫時支撐一段時間。

設計待完善點

該系統的設計不足點在於,當本地id即將用光的時候需要進行數據庫查詢,因此這個關鍵點會拖慢系統的響應時長,所以這裡可以採用異步更新配置拉取id的思路進行完善。也就是說當本地id列表剩餘只有15%可以使用的時候,便可以進行開啟一個異步線程去拉取id列表了。

來吧,自己動手擼一個分佈式ID生成器組件