CMU15445 (Fall 2019) 之 Project#1 – Buffer Pool 詳解

前言

這個實驗有兩個任務:時鐘替換算法和緩衝池管理器,分別對應 ClockReplacerBufferPoolManager 類,BufferPoolManager 會用 ClockReplacer 挑選被換出的頁,並通過 DiskManager 將換出的頁寫到數據庫文件中。下面介紹這兩個類的實現過程。

代碼實現

如果直接克隆 Bustub 倉庫,得到的是 fall 2021 的實驗代碼,對於 fall 2019,可以將 commit 切換至 5972018: Fix typo in type.cpp(#66)。但是這樣引入一個坑,就是需要將 build_support/gtest_CMakeLists.txt.in 的內容改為:

cmake_minimum_required(VERSION 3.8)

project(googletest-download NONE)

include(ExternalProject)
ExternalProject_Add(googletest
        GIT_REPOSITORY [email protected]:google/googletest.git
        GIT_TAG main
        SOURCE_DIR "${CMAKE_BINARY_DIR}/googletest-src"
        BINARY_DIR "${CMAKE_BINARY_DIR}/googletest-build"
        CONFIGURE_COMMAND ""
        BUILD_COMMAND ""
        INSTALL_COMMAND ""
        TEST_COMMAND ""
        )

這裡主要修改了 GIT_TAGmain,因為 googletest 倉庫似乎將 master 分支重命名為 main 了。

ClockReplacer 類

項目主頁對該類的實現方式做出了一點介紹:

The size of the ClockReplacer is the same as buffer pool since it contains placeholders for all of the frames in the BufferPoolManager. However, not all the frames are considered as in the ClockReplacer. The ClockReplacer is initialized to have no frame in it. Then, only the newly unpinned ones will be considered in the ClockReplacer. Adding a frame to or removing a frame from a replacer is implemented by changing a reference bit of a frame. The clock hand initially points to the placeholder of frame 0. For each frame, you need to track two things: 1. Is this frame currently in the ClockReplacer? 2. Has this frame recently been unpinned (ref flag)?

In some scenarios, the two are the same. For example, when you unpin a page, both of the above are true. However, the frame stays in the ClockReplacer until it is pinned or victimized, but its ref flag is modified by the clock hand.

簡單翻譯一下,就是 ClockReplacer 類內部維護了一個 frame 的集合,集合大小和緩衝池的大小一致。由於緩衝池中的某些 frame 正在被別的線程訪問,這些 framepin count (等於訪問該幀的線程數量)會大於 0,此時這些 frame 不允許被換出,換個角度來說,就是這些 frame 不在 ClockReplacer 維護的集合中。對於可以被換出的 frame,它必須滿足兩個條件:

  • pin count 為 0,即該幀在 ClockReplacer 中。一旦某個幀的 pin count 大於零,就要被移出 ClockReplacer(調用 ClockReplacer::Pin
  • reference bitfalse,即該幀最近沒被訪問過。對於 pin count 剛變成 0 而被加入 ClockReplacer 的幀而言,由於它剛被訪問過,所以其 reference bittrue(調用 ClockReplacer::Unpin

至於時鐘替換算法的過程,其實就是按順序從 frame 集合中挑選出一個滿足上述換出條件的過程。為了維護時鐘指針的位置並保證線程安全,需要添加一個時鐘指針成員 clock_hand_ 和一個讀寫鎖 mutex_,幀集合 frames_ 的每個元素代表該幀是否在 ClockReplacer 中及其 reference bit

/**
 * ClockReplacer implements the clock replacement policy, which approximates the Least Recently Used policy.
 */
class ClockReplacer : public Replacer {
 public:
  /**
   * Create a new ClockReplacer.
   * @param num_pages the maximum number of pages the ClockReplacer will be required to store
   */
  explicit ClockReplacer(size_t num_pages);

  /**
   * Destroys the ClockReplacer.
   */
  ~ClockReplacer() override;

  bool Victim(frame_id_t *frame_id) override;

  void Pin(frame_id_t frame_id) override;

  void Unpin(frame_id_t frame_id) override;

  size_t Size() override;

 private:
  frame_id_t clock_hand_ = 0;
  std::vector<std::tuple<bool, bool>> frames_;
  std::shared_mutex mutex_;
};

各個方法的定義如下,裏面使用了 std::lock_guard 以保證代碼是異常安全的:

ClockReplacer::ClockReplacer(size_t num_pages) {
  for (size_t i = 0; i < num_pages; ++i) {
    frames_.push_back(std::make_tuple(false, false));
  }
}

ClockReplacer::~ClockReplacer() = default;

bool ClockReplacer::Victim(frame_id_t *frame_id) {
  if (Size() == 0) {
    return false;
  }

  std::lock_guard<std::shared_mutex> lock(mutex_);
  while (true) {
    auto &[contains, ref] = frames_[clock_hand_];
    if (contains) {
      if (ref) {
        ref = false;
      } else {
        *frame_id = clock_hand_;
        contains = false;
        return true;
      }
    }
    clock_hand_ = (clock_hand_ + 1) % frames_.size();
  }
}

void ClockReplacer::Pin(frame_id_t frame_id) {
  assert(static_cast<size_t>(frame_id) < frames_.size());
  std::lock_guard<std::shared_mutex> lock(mutex_);
  auto &[contains, ref] = frames_[frame_id];
  contains = false;
  ref = false;
}

void ClockReplacer::Unpin(frame_id_t frame_id) {
  assert(static_cast<size_t>(frame_id) < frames_.size());
  std::lock_guard<std::shared_mutex> lock(mutex_);
  auto &[contains, ref] = frames_[frame_id];
  contains = true;
  ref = true;
}

size_t ClockReplacer::Size() {
  std::shared_lock<std::shared_mutex> lock(mutex_);
  size_t size = 0;
  for (auto &[contains, ref] : frames_) {
    size += contains;
  }
  return size;
}

在終端輸入命令:

mkdir build
cd build
cmake ..
make clock_replacer_test
./test/clock_replacer_test

測試結果如下:

時鐘替換算法測試結果

BufferPoolManager 類

這裡將互斥鎖換成了讀寫鎖,用於保護 page_table_pages_free_list_,同時引入了一個輔助函數 GetVictimFrameId()

class BufferPoolManager {
 // 省略部分代碼

 protected:

  /**
   * select a victim frame from the free list or replacer.
   * @return the frame id, INVALID_PAGE_ID if the victim could not be found
   */
  frame_id_t GetVictimFrameId();

  /** This latch protects shared data structures. We recommend updating this comment to describe what it protects. */
  std::shared_mutex latch_;
};

BufferPoolManager 類要求我們實現五個函數:

  • FetchPageImpl(page_id)
  • NewPageImpl(page_id)
  • UnpinPageImpl(page_id, is_dirty)
  • FlushPageImpl(page_id)
  • DeletePageImpl(page_id)
  • FlushAllPagesImpl()

下面會一個個實現上述函數。

FetchPageImpl(page_id)

該函數實現了緩衝池的主要功能:向上層提供指定的 page。緩衝池管理器首先在 page_table_ 中查找 page_id 鍵是否存在:

  • 如果存在就根據 page_id 對應的 frame_id 從緩衝池 pages_ 取出 page
  • 如果不存在就通過 GetVictimFrameId() 函數選擇被換出的幀,該函數首先從 free_list_ 中查找緩衝池的空位,如果沒找到空位就得靠上一節實現的 ClockReplacer 選出被換出的冤大頭

具體代碼如下:

Page *BufferPoolManager::FetchPageImpl(page_id_t page_id) {
  // 1. Search the page table for the requested page (P).
  std::lock_guard<std::shared_mutex> lock(latch_);
  Page *page;

  // 1.1  If P exists, pin it and return it immediately.
  auto it = page_table_.find(page_id);
  if (it != page_table_.end()) {
    page = &pages_[it->second];
    if (page->pin_count_++ == 0) {
      replacer_->Pin(it->second);
    }
    return page;
  }

  // 1.2  If P does not exist, find a replacement page (R) from either the free list or the replacer.
  //      Note that pages are always found from the free list first.
  frame_id_t frame_id = GetVictimFrameId();
  if (frame_id == INVALID_PAGE_ID) {
    return nullptr;
  }

  // 2. If R is dirty, write it back to the disk.
  page = &pages_[frame_id];
  if (page->IsDirty()) {
    disk_manager_->WritePage(page->page_id_, page->data_);
  }

  // 3. Delete R from the page table and insert P.
  page_table_.erase(page->GetPageId());
  page_table_[page_id] = frame_id;

  // 4. Update P's metadata, read in the page content from disk, and then return a pointer to P.
  disk_manager_->ReadPage(page_id, page->data_);
  page->update(page_id, 1, false);
  replacer_->Pin(frame_id);
  return page;
}

frame_id_t BufferPoolManager::GetVictimFrameId() {
  frame_id_t frame_id;

  if (!free_list_.empty()) {
    frame_id = free_list_.front();
    free_list_.pop_front();
  } else {
    if (!replacer_->Victim(&frame_id)) {
      return INVALID_PAGE_ID;
    }
  }

  return frame_id;
}

上述代碼中還用了一個 Page::update 輔助函數,用於更新 page 的元數據:

/**
* update the meta data of page
* @param page_id the page id
* @param pin_count the pin count
* @param is_dirty is page dirty
* @param reset_memory whether to reset the memory of page
*/
void update(page_id_t page_id, int pin_count, bool is_dirty, bool reset_memory = false) {
  page_id_ = page_id;
  pin_count_ = pin_count;
  is_dirty_ = is_dirty;
  if (reset_memory) {
    ResetMemory();
  }
}

NewPageImpl(page_id)

該函數在緩衝池中插入一個新頁,如果緩衝池中的所有頁面都正在被線程訪問,插入失敗,否則靠 GetVictimFrameId() 計算插入位置:

Page *BufferPoolManager::NewPageImpl(page_id_t *page_id) {
  // 0. Make sure you call DiskManager::AllocatePage!
  std::lock_guard<std::shared_mutex> lock(latch_);

  // 1. If all the pages in the buffer pool are pinned, return nullptr.
  if (free_list_.empty() && replacer_->Size() == 0) {
    *page_id = INVALID_PAGE_ID;
    return nullptr;
  }

  // 2. Pick a victim page P from either the free list or the replacer. Always pick from the free list first.
  frame_id_t frame_id = GetVictimFrameId();
  if (frame_id == INVALID_PAGE_ID) {
    *page_id = INVALID_PAGE_ID;
    return nullptr;
  }

  // 3. Update P's metadata, zero out memory and add P to the page table.
  Page *page = &pages_[frame_id];
  if (page->IsDirty()) {
    disk_manager_->WritePage(page->page_id_, page->data_);
  }

  *page_id = disk_manager_->AllocatePage();
  page_table_.erase(page->GetPageId());
  page_table_[*page_id] = frame_id;
  // 需要把 dirty bit 設置為 false 才能通過 IsDirty 測試用例
  page->update(*page_id, 1, true, true);

  // 4. Set the page ID output parameter. Return a pointer to P.
  return page;
}

DeletePageImpl(page_id)

該函數從緩衝池和數據庫文件中刪除一個 page,並將其 page_id 設置為 INVALID_PAGE_ID

bool BufferPoolManager::DeletePageImpl(page_id_t page_id) {
  // 0.   Make sure you call DiskManager::DeallocatePage!
  std::lock_guard<std::shared_mutex> lock(latch_);

  // 1. search the page table for the requested page (P).
  // If P does not exist, return true.
  auto it = page_table_.find(page_id);
  if (it == page_table_.end()) {
    return true;
  }

  // 2. If P exists, but has a non-zero pin-count, return false. Someone is using the page.
  Page &page = pages_[it->second];
  if (page.pin_count_ > 0) {
    return false;
  }

  // 3. Otherwise, P can be deleted. Remove P from the page table, reset its metadata and return it to the free list.
  disk_manager_->DeallocatePage(page_id);
  page_table_.erase(page_id);
  page.update(INVALID_PAGE_ID, 0, false, true);
  free_list_.push_back(it->second);
  return true;
}

UnpinPageImpl(page_id, is_dirty)

該函數用以減少對某個頁的引用數 pin count,當 pin_count 為 0 時需要將其添加到 ClockReplacer 中:

bool BufferPoolManager::UnpinPageImpl(page_id_t page_id, bool is_dirty) {
  std::lock_guard<std::shared_mutex> lock(latch_);
  auto it = page_table_.find(page_id);
  if (it == page_table_.end()) {
    return false;
  }

  Page &page = pages_[it->second];
  if (page.pin_count_ <= 0) {
    return false;
  }

  // add page to replacer when the pin count is 0
  page.is_dirty_ |= is_dirty;
  if (--page.pin_count_ == 0) {
    replacer_->Unpin(it->second);
  }

  return true;
}

FlushPageImpl(page_id)

如果緩衝池的 page 被修改過,需要將其寫入磁盤以保持同步:

bool BufferPoolManager::FlushPageImpl(page_id_t page_id) {
  // Make sure you call DiskManager::WritePage!
  std::shared_lock<std::shared_mutex> lock(latch_);
  auto it = page_table_.find(page_id);
  if (it == page_table_.end()) {
    return false;
  }

  // write page to disk if it's dirty
  Page &page = pages_[it->second];
  if (page.IsDirty()) {
    disk_manager_->WritePage(page_id, pages_[it->second].data_);
    page.is_dirty_ = false;
  }

  return true;
}

FlushAllPagesImpl()

該函數將緩衝池中的所有 page 寫入磁盤:

void BufferPoolManager::FlushAllPagesImpl() {
  // You can do it!
  std::lock_guard<std::shared_mutex> lock(latch_);
  for (size_t i = 0; i < pool_size_; ++i) {
    Page &page = pages_[i];
    if (page.page_id_ != INVALID_PAGE_ID && page.IsDirty()) {
      disk_manager_->WritePage(i, page.data_);
      page.is_dirty_ = false;
    }
  }
}

測試

在終端輸入指令:

cd build
make buffer_pool_manager_test
./test/buffer_pool_manager_test

測試結果如下:

緩衝池管理器測試結果

總結

該實驗考察了學生對並發和 STL 的掌握程度,由於注釋中列出了實現步驟(最搞的是 You can do it! 注釋),所以代碼寫起來也比較順暢,以上~~