CMU 15-445:project #2 - Hash Index

据说这个project是在CMU 15445中难度最大的一个,但是我还是挺过来了.到了学期中期,各种实验,考试,小班任务也开始繁重起来,希望自己能高效地并发,继续做好这套项目.

1.实现基础

1) 索引基础概念

首先介绍一下索引相关的一些基础概念吧.

在此之前我接触过B+树方式的索引,我对于索引的理解,是一种基于存储文件所构建的数据结构,如果没有这种数据结果,当我们需要读写文件中的内容时,往往只能顺序查找,一旦数据量大了之后,带来的开销时非常惊人的.而引入索引后,在这里以B+ tree为例,B+ tree的索引文件中,会有类似于指针的域,指向其他的节点Page(有root,internal,leaf节点Page之分),尽管文件表面上看似是扁平的,但是通过这些“指针”的联系却可以组织成一种“树状”的结构,进而如果我们想获取某个leaf Page中的数据,就从一个全局已知的Root Page开始,借助其中的“指针”域,像下一个Page进行递归,最终定位到数据所在的leaf Page,这样对于文件Page的遍历次数相比顺序查找而言就大大减小了.不过索引虽然便于查询,但是在写的时候会由于更新索引而带来额外的开销.

所以我对此作出的简单总结一种依附于存储的数据所构建的,能减小查找开销的数据结构.

此外我查阅了《数据密集型应用系统设计》中关于Hash索引的一些概念,在此也简单的记录一下其中讲解的一些理论,虽然在本次project中用不到,但我觉得挺有意思的

《DDIA》书中关于Hash Index的叙述中,举了一个简单的例子,即Bitcask的默认存储索引的实现:将Hash Table放置于内存中,该Hash Table维护的是key和offset的映射,在该Table只关注key,而不关注具体的value,在磁盘的存储中相同key往往是连续存储的.对于新出现的key值往往需要更新Hash Table.像Bitcask这种比较适合key值数量比较少的情况,应该足够将其放入内存中.

此外《DDIA》关于Hash Index的叙述还设计到一些关于日志的东西,对于大部分存储在内存中的Hash Table而言,需要借助日志来保证其可靠性.但是对于Hash Table的更新,如果采用一般的追加式的日志方式,除此之外,为了限制日志占用磁盘的大小,所以需要一些压缩方式,因此书中关于日志的讨论也主要围绕着压缩与追加.

关于追加式日志的优越性:顺序写相比随机写,开销是比较小的.

此外关于Hash Index的局限性,书中提到一般比较适合于内存中保存(因此最好不要有太大的key数量),在磁盘上表现一般,因为随机IO太多了(其实在本次project中就是用了基于磁盘的可拓展Hash),此外在区间上查询的效率不高.

在这里我感觉日志对于保障内存中数据的可靠性具有重要的意义.

2) 可拓展哈希

首先简单地写一下可拓展Hash相关的基础知识.

在这里分享一个链接Extendible Hashing (Dynamic approach to DBMS).

可拓展Hash是一种动态的Hash,主要由两部分组成,一部分是一个directory和一个bucket,前者有一个,后者有多个,用来存储实际的数据,是一种一对多的关系.

基本名词的解释如下:

  • Directory,一般占据一个Page,所以其本身也就会有一个page_id,主要内容是Bucket(Page)的指针列表.其中Hash函数可以将一个key得出起所处的Bucket的索引(通常是在Directory的Bucket list中的一个索引).其中Bucket list的条目数量为2 ^ global depth .
  • Buckets.存储实际的k-v对,同一个Page对应的Buckets在directory中虽维护的指针数量为2 ^ global depth - 2 ^ local depth.
  • Global Depth,决定了Directory中的Buckets指针的数量以及用于Hash函数中(用于mask的长度).
  • Local Depth,类似于上面的,每个Bucket Page都会对应一个Local Depth,小于等于Global Depth.可以确定一个Bucket Page中的容量.

其中Global Depth,Local Depth以及Bucket Page数量的关系如下:

* Verify the following invariants:
* (1) All LD <= GD.
* (2) Each bucket has precisely 2^(GD - LD) pointers pointing to it.
* (3) The LD is the same at each index with the same bucket_page_id  

至于一些具体的操作过程,参考文章中的例子吧,我就不多写了.

其实在实现的过程中还有其他的细节,在后面再细说.

下面先简单地描述一下工作流:

  • kv pair,一般是将其key转化成二进制的形式.
  • 然后根据Global Depth将二进制串得出一个对应的hash值,对应的是Bucket的idx.
  • 对于插入的操作来说,收看需要检查改Bucket是否是overflow的.如果插入后不会超出该Bucket Page对应的容量(由Local Depth得来),直接将数据插入该Bucket Page中就可以了,如果有overflow的现象,那么则会更加复杂一点.可以进一步地细分为两种情况:
    1. 如果当前的Local Depth已经和Global Depth相等了,那么则需要将Directory进行扩容,简而言之是扩张一倍(Global Depth + 1,该Bucket对应的Local Depth也要增长),其中新扩张出的Bucket Page所指向的Page是根据已有的一半的Bucket Page得来的.(后面细说).
    2. 如果Local Depth不等于Global Depth,那么只是增加该Bucket Dpeth的Local depth,对于对应的Split Page的Local Depth与其相等.
  • 将原先的Bucket Page和Split Page中的k-v进行重新分配.

最后简单说一下其优势与局限性.

优势:

  • 数据检索成本很低,计算一个Hash函数,然后在一个Bucket内部计算即可,比如说一个Local Depth最多为9,那么最多有512个元素,在单个Bucket内部进行遍历的成本较低.
  • 能够一边增长数据一边扩容,这个过程中不会有数据的丢失.
  • 尽管在扩容的过程中会涉及到很多kv pair的重新分配,但是与此同时还是英语Hash函数的对应变化,尽管发生了Rehash,但是在后来的Hash函数中仍然可以正确地检索出新的位置.

局限性:

  • 如果插入的数据的Hash值集中于某一段范围内(比如每次Hash函数都会检索到同一个Bucket),将会带来比较频繁地扩容和rehash,成本因此比较高.
  • 每个Bucket Page的容量还是固定的.

其实我认为这种索引结构最巧妙的一点在于,当对一些数据进行ReHash的时候,其Hash函数的计算结果仍然可以很巧妙的适应.我觉得一般情况下对于Hash Index的设计中,比较困难的地方在于数据在桶中的迁移如何与Hash函数映射变化联系起来.

2.实现过程

在本次lab中的代码主要分为两个部分,一部分是关于Page的实现(task1),另一部分在于关于Hash的实现(也就是task2,3),后半部分的实现非常复杂.

1) Task 1

基本上结合上面的基础概念就能做个大概.

其中在src/storage/page/hash_table_header_page.cpp中我们所需要实现的一些函数以各种Set和Get函数为主,即使对可拓展Hash这种数据结构一无所知也能写个大概.不过还是有一些需要注意的地方.

首先是关于IncrGlobalDepth的实现,这一部分需要伴随着扩容的实现,也就是将当前目录中的桶的数量扩大一倍.或者不在IncreGlobalDepth写这一步可以(就像我额外增加了一个Expand函数),但是一定要保证需要增加全局深度的时候也作出相应的扩张工作!相比之下,DecrGlobalDepth直接将GlobalDepth递减就可以,因为GlobalDepth一旦降下来之后,有效范围只是2 ^ GlobalDepth的桶,剩下的不用管,不在有效的访问范围之内.

此外还有关于CanShrink,其实现如下:

bool HashTableDirectoryPage::CanShrink() {
  if (global_depth_ == 0) {
    return false;
  }
  auto curr_size = Size();
  for (size_t i = 0; i < curr_size; i++) {
    if (local_depths_[i] == global_depth_) {
      return false;
    }
  }
  return true;
}

简而言之就是检查,当前有效范围内的桶有没有等于global_depth_的,如果没有就是可以缩容的.

此外我还追加了一些有用的函数,尤其是一些涉及到位运算技巧的函数我认为很有帮助,在此记录一下.

static uint32_t GetUpbitmask(uint32_t n) {
  if (n == 0) {
    return 0;
  }
  uint32_t mask = 0x1;
  return mask << (n - 1);
}
static uint32_t Upbit(uint32_t str, uint32_t n) { return str ^ GetUpbitmask(n); }

这一套函数用于将一个给用的str,将其第n位比特串翻转.这个Upbit用于函数GetSplitImageIndex.(其实这个函数我最初没有看到,因此也就没有实现,我又实现了一个GetBrother,与之功能一样,也就是将一个给定的Bucket Index求出其所需要Split时对应的兄弟Bucket的Index,只要将其最高位比特翻转即可).

此外还有获取指定长度的掩码:

uint32_t HashTableDirectoryPage::GetMaskByLen(uint32_t len) const { return ((1 << len) - 1); }
uint32_t HashTableDirectoryPage::GetGlobalDepth() { return global_depth_; }
uint32_t HashTableDirectoryPage::GetGlobalDepthMask() { return GetMaskByLen(global_depth_); }

下面关于src/storage/page/hash_table_bucket_page.cpp,首先说明一下基本的数据结构:

//  For more on BUCKET_ARRAY_SIZE see storage/page/hash_table_page_defs.h
 char occupied_[(BUCKET_ARRAY_SIZE - 1) / 8 + 1];
 // 0 if tombstone/brand new (never occupied), 1 otherwise.
 char readable_[(BUCKET_ARRAY_SIZE - 1) / 8 + 1];
 MappingType array_[0];

首先是两个类似于位图的数据结构,关于occupied_有一说一我感觉在project 2中没怎么用到.其中readable_的作用是很显然的,表示当前array_对应的数据有没有被占用到.其中也有很多API是类似于Get,Set这种的,这里只简单地举几个例子的实现:

template <typename KeyType, typename ValueType, typename KeyComparator>
bool HASH_TABLE_BUCKET_TYPE::IsEmpty() {
  uint8_t mask = 255;
  for (size_t i = 0; i < sizeof(readable_) / sizeof(readable_[0]); i++) {
    if ((readable_[i] & mask) > 0) {
      return false;
    }
  }
  return true;
}

遍历每一个readable,对此进行检查,如果检查到有mask与之与起来之后大于0,那么就意味着不是空的.

其中一些与SetReadable有关的操作如下实现:

template <typename KeyType, typename ValueType, typename KeyComparator>
bool HASH_TABLE_BUCKET_TYPE::IsReadable(uint32_t bucket_idx) const {
  uint32_t bitidx;
  auto idx = BucketIdx2BitIdx(bucket_idx, &bitidx);
  return (readable_[idx] & bitidx) != 0;
}
template <typename KeyType, typename ValueType, typename KeyComparator>
void HASH_TABLE_BUCKET_TYPE::SetReadable(uint32_t bucket_idx) {
  uint32_t bitidx;
  auto idx = BucketIdx2BitIdx(bucket_idx, &bitidx);
  readable_[idx] |= bitidx;
}

2) Task 2,3

这里主要说明一下,我关于Insert,Remove,GetValue这几个函数的实现吧,由于官方建议不得张贴代码,所以我就少贴代码了.

首先是Insert,其算法过程如下:

  • 首先获取目录所在页,然后获取对应的Bucket Page.(这时有关于Page的获取都是以Buffer Pool为接口的,如果前面的实现不正确,那这里就会出问题).
  • 获取Bucket当前的Size,判断是否已经满了.
    • 如果未满,直接调用Bucket Page的Insert即可.
    • 进行SplitInsert,分裂完之后,再调用HASH_TABLE_TYPE::Insert.

其中在SplitInsert中有更多的细节,其算法过程如下:

  • 仍然是先获取目录Page和Bucket Page.
  • 检查Bucket Page是否还是满的,如果不是就调用HASH_TABLE_TYPE::Insert.
  • 检查Local Depth,不可以大于等于9,否则就超出了一个Page的范围.
  • 根据Local Depth和Global Depth的关系判断是否需要增加Global Depth并进行扩容.
  • 调用Buffer Pool中的New Page,New出一个新的Page作为Split Page.
  • 下面关于设置高度和Bucket Page的地方是比较容易出错的:
    • 获取兄弟Bucket,也就是Local Depth +1之后的最高位(第Local Depth对应的位)与之相反的Bucket.将其高度和指向的Page进行重置.与此同时还要注意对于同样指向Bucket Page的,也要增加对应的Local Depth,因此维护高度的统一.

这些所谓的兄弟节点需要注意的是,不一定只有一个,如果是对于Local Depth等于Global Depth的情况,确实可以肯定只有一个,否则就不一定只有一个,所以我写了一个for循环进行处理,其逻辑如下:

for (size_t i = 0; i < dir_page->Size(); i++) {
  if (i == page_idx || page_id != dir_page->GetBucketPageId(i)) {
    continue; //page_idx也就是之前获取的Bucket Page,在前面已经做出了Local Depth的增长.
  }
  dir_page->IncrLocalDepth(i); //与之Page Id相同的
  auto old_page_idx = old_mask & page_idx;
  auto new_page_idx = new_mask & page_idx;
  if ((old_mask & i) == old_page_idx && (new_mask & i) != new_page_idx) {
    dir_page->SetBucketPageId(i, new_page_id); //设置成新的Bucket Page,也就是前文对应的Local Depth所对应的最高位与之相反.比如说Bucket的Index是11111,Local Depth是4,那么10111,00111都是可以的,因此不止一个.
  }
}

其中old_mask和new_mask分别是将Local Depth增加前后的mask.比如说当Local Depth从2增长到3,那么old_mask就是11,后者是111.

  • 进行Rehash,也就是Bucket Page和New Bucket Page之间的重新分配.
  • 释放锁和Unpin相关的Page,然后再调用`HASH_TABLE_TYPE::Insert.`进行插入.

关于Remove的实现如下:

  • 获取目录页,然后获取对应的Bucket Page.
  • 对该Bucket Page调用其Remove方法.
  • 调用后检查是否位Empty,如果不是,就什么也不做,否则将相关的Page和锁释放后调用Merge.

关于Merge的细节如下:

  • 获取目录Page之后再获取Brother Page以及Bucket Page.
  • 做一些条件检查:
    • Depth为0的情况
    • 兄弟Page与Bucket Page Local Depth是否相等.
    • Bucket Page是否为empty.
  • 将Bucket Page 和Brother Page的高度-1,并且将Bucket Page设置成Brother Page所指向的Page,并将旧的Bucket Page所指向的Page进行delete.
  • 采用for循环对相关的Bucket进行调整,代码如下:
for (size_t i = 0; i < dir_page->Size(); i++) {
  page_id_t tmp_page_id = dir_page->GetBucketPageId(i);
  if (tmp_page_id != page_id && tmp_page_id != bro_page_id) {
    continue; 
  }// 要么等于旧的Page,要么和brother Page指向的一样.
  dir_page->SetBucketPageId(i, bro_page_id);
  dir_page->SetLocalDepth(i, local_depth);
}
  • 最后判断是否需要Shrink,注意要用while,而不是If.

至于GetValue有关的判断,比较简单,如下所示:

template <typename KeyType, typename ValueType, typename KeyComparator>
bool HASH_TABLE_TYPE::GetValue(Transaction *transaction, const KeyType &key, std::vector<ValueType> *result) {
  table_latch_.RLock();
  HashTableDirectoryPage *dir_page = FetchDirectoryPage();

  auto page_idx = KeyToDirectoryIndex(key, dir_page);
  auto page_id = dir_page->GetBucketPageId(page_idx);

  HASH_TABLE_BUCKET_TYPE *bucket_page = FetchBucketPage(page_id);
  Page *page = reinterpret_cast<Page *>(bucket_page);
  page->RLatch();
  bool res = bucket_page->GetValue(key, comparator_, result);
  page->RUnlatch();
  buffer_pool_manager_->UnpinPage(page_id, false);
  buffer_pool_manager_->UnpinPage(directory_page_id_, false);
  table_latch_.RUnlock();
  return res;
}

并发控制中锁的使用

这其中涉及到的锁的使用,主要有两类,一类是表级锁,可以理解为对于整个Hash Table的目录的锁,如果涉及到读目录Page的操作,因此也就需要对其上读锁,如果涉及到对于目录Page的修改,就需要上写锁.另一类是页级锁,一般用于对于Bucket Page以及Brother Page等的使用.

此外除了上锁以外,还有Fetch(或者New)时候一定要有Unpin与之对应,否则replacer就会卡住,最终导致死锁.Unpin中的脏位也要额外注意.

可以看GetValue的实现.上锁与解锁,Fetch与Unpin都是严格对应的.

一些由于并发需要额外考虑的条件判断

这里的问题主要与Insert中的SplitInsert和Remove中的Merge有关,由于我在这里的实现采用了一种类似于两段锁的方式,简而言之,比如说我在Insert函数调用并且需要在SplitInsert时,再调用SplitInsert调用时,我将原本在Insert相关的锁进行了释放,然后在SplitInsert中还会有锁的重新获取,就想这样:

// SpilitInsert调用之前
page->WUnlatch();
buffer_pool_manager_->UnpinPage(directory_page_id_, false);
buffer_pool_manager_->UnpinPage(page_id, false);
table_latch_.RUnlock();
result = SplitInsert(transaction, key, value);
// Merge调用之前
page_bucket->WUnlatch();
buffer_pool_manager_->UnpinPage(directory_page_id_, false);
buffer_pool_manager_->UnpinPage(page_id, true);
table_latch_.RUnlock();
Merge(transaction, key, value);

所以在这种实现方式中可以说,涉及到对于一些Bucket Page,Directory Page的操作并不是原子性,也就是说在进行入Merge或者SplitInsert之前由于线程的切换,极有可能中间穿插了来自其他线程的操作.这些操作有可能导致原本满足的条件不再满足.因此对于这些条件还要在MergeSpiltInsert做额外的判断.

比如说SplitInsert中关于“Bucket是否仍未满的”的判断,这主要针对在SpiltInsert调用前,锁释放后的短暂瞬间可能会穿插其他线程的Remove进而导致该Bucket不再是满的,因此也就不需要在做Split了.如下:

uint32_t bucket_size = 1 << dir_page->GetLocalDepth(page_idx);
if (bucket_page->NumReadable() < bucket_size) {
  page_bucket->WUnlatch();
  buffer_pool_manager_->UnpinPage(directory_page_id_, false);
  buffer_pool_manager_->UnpinPage(page_id, false);
  table_latch_.WUnlock();
  return Insert(transaction, key, value);
}

同理在Merge中,也有Bucket Page是否仍未empty的判断,如下:

if (!bucket_page->IsEmpty()) {
  page_bucket->RUnlatch();
  buffer_pool_manager_->UnpinPage(page_id, false);
  buffer_pool_manager_->UnpinPage(directory_page_id_, false);
  table_latch_.WUnlock();
  return;
}

除了这几个API之外,还有构造函数也是需要注意的,构造函数需要为目录New出一个Page,也需要为第一个Bucket进行NewPage.后面也别忘了Unpin就可以了.

3.小结

最后,满分通过,但是时间效率比较感人,不好意思说了.

image-20221020235246143

接着往下冲,争取11月中旬之前做完所有关于CMU 15445的工作!