【數據庫內核】RocksDB:事務鎖設計與實現

本文主要介紹 RocksDB 鎖結構設計、加鎖解鎖過程,並與 InnoDB 鎖實現做一個簡單對比。

本文由作者授權發佈,未經許可,請勿轉載。

作者:王剛,網易杭研數據庫內核開發工程師

MyRocks 引擎目前是支持行鎖的,包括共享鎖和排它鎖,主要是在 RocksDB 層面實現的,與 InnoDB 引擎的鎖系統相比,簡單很多。

本文主要介紹 RocksDB 鎖結構設計、加鎖解鎖過程,並與 InnoDB 鎖實現做一個簡單對比。

事務鎖的實現類是:TransactionLockMgr ,它的主要數據成員包括:

private:
  PessimisticTransactionDB* txn_db_impl_;
  // 默認16個lock map 分片
  const size_t default_num_stripes_;
  // 每個column family 最大行鎖數
  const int64_t max_num_locks_;
  // lock map 互斥鎖
  InstrumentedMutex lock_map_mutex_;

  // Map of ColumnFamilyId to locked key info
  using LockMaps = std::unordered_map<uint32_t, std::shared_ptr<LockMap>>;
  LockMaps lock_maps_;
  std::unique_ptr<ThreadLocalPtr> lock_maps_cache_;
  // Must be held when modifying wait_txn_map_ and rev_wait_txn_map_.
  std::mutex wait_txn_map_mutex_;
  // Maps from waitee -> number of waiters.
  HashMap<TransactionID, int> rev_wait_txn_map_;
  // Maps from waiter -> waitee.
  HashMap<TransactionID, TrackedTrxInfo> wait_txn_map_;
  DeadlockInfoBuffer dlock_buffer_;
  // Used to allocate mutexes/condvars to use when locking keys
  std::shared_ptr<TransactionDBMutexFactory> mutex_factory_;

加鎖的入口函數是:TransactionLockMgr::TryLock

Status TransactionLockMgr::TryLock(PessimisticTransaction* txn, //加鎖的事務
                                   uint32_t column_family_id, //所屬的CF
                                   const std::string& key, //加鎖的健 Env* env,
                                   bool exclusive //是否排它鎖) {
  // Lookup lock map for this column family id
  std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id); //1. 根據 cf id 查找其 LockMap
  LockMap* lock_map = lock_map_ptr.get();
  if (lock_map == nullptr) {
    char msg[255];
    snprintf(msg, sizeof(msg), "Column family id not found: %" PRIu32,
             column_family_id);

    return Status::InvalidArgument(msg);
  }

  // Need to lock the mutex for the stripe that this key hashes to
  size_t stripe_num = lock_map->GetStripe(key);// 2. 根據key 的哈希獲取 stripe_num,默認16個stripe
  assert(lock_map->lock_map_stripes_.size() > stripe_num);
  LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);

  LockInfo lock_info(txn->GetID(), txn->GetExpirationTime(), exclusive);
  int64_t timeout = txn->GetLockTimeout();

  return AcquireWithTimeout(txn, lock_map, stripe, column_family_id, key, env,
                            timeout, lock_info); // 實際加鎖函數
} 

GetLockMap(column_family_id) 函數根據 cf id 查找其 LockMap , 其邏輯包括兩步:

  1. 在 thread 本地緩存 lock_maps_cache 中查找;
  2. 第1步沒有查找到,則去全局的 lock_map_ 中查找。

 

std::shared_ptr<LockMap> TransactionLockMgr::GetLockMap(
    uint32_t column_family_id) {
  // First check thread-local cache
  if (lock_maps_cache_->Get() == nullptr) {
    lock_maps_cache_->Reset(new LockMaps());
  }
  auto lock_maps_cache = static_cast<LockMaps*>(lock_maps_cache_->Get());
  auto lock_map_iter = lock_maps_cache->find(column_family_id);
  if (lock_map_iter != lock_maps_cache->end()) {
    // Found lock map for this column family.
    return lock_map_iter->second;
  }
  // Not found in local cache, grab mutex and check shared LockMaps
  InstrumentedMutexLock l(&lock_map_mutex_);
  lock_map_iter = lock_maps_.find(column_family_id);
  if (lock_map_iter == lock_maps_.end()) {
    return std::shared_ptr<LockMap>(nullptr);
  } else {
    // Found lock map.  Store in thread-local cache and return.
    std::shared_ptr<LockMap>& lock_map = lock_map_iter->second;
    lock_maps_cache->insert({column_family_id, lock_map});
    return lock_map;
  }
}

lock_maps_ 是全局鎖結構:

 // Map of ColumnFamilyId to locked key info
  using LockMaps = std::unordered_map<uint32_t, std::shared_ptr<LockMap>>;
  LockMaps lock_maps_;

LockMap 是每個 CF 的鎖結構:

struct LockMap {

  // Number of sepearate LockMapStripes to create, each with their own Mutex
  const size_t num_stripes_;

  // Count of keys that are currently locked in this column family.
  // (Only maintained if TransactionLockMgr::max_num_locks_ is positive.)
  std::atomic<int64_t> lock_cnt{0};

  std::vector<LockMapStripe*> lock_map_stripes_;

};

為了減少加鎖時mutex 的爭用,LockMap 內部又進行了分片,num_stripes_ = 16(默認值),
LockMapStripe 是每個分片的鎖結構:

struct LockMapStripe {

  // Mutex must be held before modifying keys map
  std::shared_ptr<TransactionDBMutex> stripe_mutex;

  // Condition Variable per stripe for waiting on a lock
  std::shared_ptr<TransactionDBCondVar> stripe_cv;

  // Locked keys mapped to the info about the transactions that locked them.
  // TODO(agiardullo): Explore performance of other data structures.
  std::unordered_map<std::string, LockInfo> keys;
};

LockMapStripe 內部還是一個 unordered_map, 還包括 stripe_mutex、stripe_cv 。這樣設計避免了一把大鎖的尷尬,減小鎖的粒度常用的方法,LockInfo 包含事務id:

struct LockInfo {
  bool exclusive;
  autovector<TransactionID> txn_ids;
  // Transaction locks are not valid after this time in us
  uint64_t expiration_time;
};

LockMaps 、LockMap、LockMapStripe 、LockInfo就是RocksDB 事務鎖用到的數據結構了,可以看到並不複雜,代碼實現起來簡單,代價當然也有,後文在介紹再介紹。

AcquireWithTimeout 函數內部先獲取 stripe mutex ,獲取到了在進入AcquireLocked 函數:

if (timeout < 0) {
    // If timeout is negative, we wait indefinitely to acquire the lock
    result = stripe->stripe_mutex->Lock();
  } else {
    result = stripe->stripe_mutex->TryLockFor(timeout);
  }

獲取了 stripe_mutex之後,準備獲取鎖:

// Acquire lock if we are able to
  uint64_t expire_time_hint = 0;
  autovector<TransactionID> wait_ids;
  result = AcquireLocked(lock_map, stripe, key, env, lock_info,
                         &expire_time_hint, &wait_ids);

AcquireLocked 函數實現獲取鎖邏輯,它的實現邏輯是:

  1. 在 stripe 的 map 中查找該key 是否已經被鎖住。
  2. 如果key沒有被鎖住,判斷是否超過了max_num_locks_,沒超過則在 stripe的map 中插入{key, txn_lock_info},超過了max_num_locks_,加鎖失敗,返回狀態信息。
  3. 如果key已經被鎖住了,要判斷加在key上的鎖是排它鎖還是共享鎖,如果是共享鎖,那事務的加鎖請求可以滿足;
  4. 如果是排它鎖,如果是同一事務,加鎖請求可以滿足,如果不是同一事務,如果鎖沒有超時,則加鎖請求失敗,否則搶佔過來。
  5. 以上是加鎖過程,解鎖過程類似,也是需要根據cf_id 和 key 計算出落到哪個stripe上,然後就是從map中把數據清理掉,同時還要喚醒該stripe上的等待線程,這個喚醒的粒度有點大。
void TransactionLockMgr::UnLock(PessimisticTransaction* txn,
                                uint32_t column_family_id,
                                const std::string& key, Env* env) {
  std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);//通過cf_id 獲取Lock_Map
  LockMap* lock_map = lock_map_ptr.get();
  if (lock_map == nullptr) {
    // Column Family must have been dropped.
    return;
  }
  // Lock the mutex for the stripe that this key hashes to
  size_t stripe_num = lock_map->GetStripe(key);//根據key 計算落到哪個stripe上
  assert(lock_map->lock_map_stripes_.size() > stripe_num);
  LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);
  stripe->stripe_mutex->Lock();
  UnLockKey(txn, key, stripe, lock_map, env); //從鎖的map中清理掉該key
  stripe->stripe_mutex->UnLock();
  // Signal waiting threads to retry locking
  stripe->stripe_cv->NotifyAll();
}

ocksdb lock整體結構如下:

如果一個事務需要鎖住大量記錄,rocksdb 鎖的實現方式可能要比innodb 消耗更多的內存,innodb 的鎖結構如下圖所示:

由於鎖信息是常駐內存,我們簡單分析下RocksDB鎖佔用的內存。每個鎖實際上是unordered_map中的一個元素,則鎖佔用的內存為key_length+8+8+1,假設key為bigint,佔8個位元組,則100w行記錄,需要消耗大約22M內存。但是由於內存與key_length正相關,導致RocksDB的內存消耗不可控。我們可以簡單算算RocksDB作為MySQL存儲引擎時,key_length的範圍。對於單列索引,最大值為2048個位元組,具體可以參考max_supported_key_part_length實現;對於複合索引,索引最大長度為3072個位元組,具體可以參考max_supported_key_length實現。

假設最壞的情況,key_length=3072,則100w行記錄,需要消耗3G內存,如果是鎖1億行記錄,則需要消耗300G內存,這種情況下內存會有撐爆的風險。因此RocksDB提供參數配置rocksdb_max_row_locks,確保內存可控,默認rocksdb_max_row_locks設置為1048576,對於大部分key為bigint場景,極端情況下,也需要消耗22G內存。而在這方面,InnoDB則比較友好,hash表的key是(space_id, page_no),所以無論key有多大,key部分的內存消耗都是恆定的。InnoDB在一個事務需要鎖大量記錄場景下是有優化的,多個記錄可以公用一把鎖,這樣也間接可以減少內存。

總結

RocksDB 事務鎖的實現整體來說不複雜,只支持行鎖,還不支持gap lock ,鎖占的資源也比較大,可通過rocksdb_max_row_locks 限制事務施加行鎖的數量。

利益相關:

網易輕舟微服務,提供分佈式事務框架GTXS,支持跨服務事務、跨數據源事務、混合事務、事務狀態監控、異常事務處理等能力,應用案例 工商銀行