CMU 15-445:project #4 - Concurrency Control

1.相关基础知识

相关基础知识见另一篇博客并发控制小叙

2.代码实现

这一部分需要实现的API有这么几个:

bool LockShared(Transaction *txn, const RID &rid);
bool LockExclusive(Transaction *txn, const RID &rid);
bool LockUpgrade(Transaction *txn, const RID &rid);
bool Unlock(Transaction *txn, const RID &rid);

下面简要说明一下其实现的思路:

LockShared:

(1)先判断事务隔离级别和两阶段锁状态是否合法,否则设置为ABORT后返回.

(2)构造读请求,并追加到队列.

(3)如果该事务已经有了读锁或者写锁,那么处理对应的读请求后,返回.如下所示:

if (txn->IsSharedLocked(rid) || txn->IsExclusiveLocked(rid)) {
  lock_table_[rid].upgrading_ = self_txn_id;
  request_it->granted_ = true;
  lock_table_[rid].cv_.notify_all();
  return true;
}
  1. 之后结合条件变量进行等待,当符合授予条件时,退出等待.
bool can_grant = CanGrant(txn, rid, LockOpType::SHARED_OP, request_it);
while (!can_grant) {
  lock_table_[rid].cv_.wait(guard);
  if (txn->GetState() == TransactionState::ABORTED) {
    lock_table_[rid].cv_.notify_all();
    throw TransactionAbortException(txn->GetTransactionId(), AbortReason::DEADLOCK);
  }
  can_grant = CanGrant(txn, rid, LockOpType::SHARED_OP, request_it);
}

在这里使用了条件变量进行等待,当检测到该事务被ABORTED时,对应的是后面死锁预防所做的处理,也就是被终止和抢占年轻事务.

  1. 最后授予成功,进行如下处理:
txn->GetSharedLockSet()->emplace(rid);
lock_table_[rid].upgrading_ = self_txn_id;
request_it->granted_ = true;
lock_table_[rid].cv_.notify_all();

return true;

LockExclusive

这一部分的实现,基本上和LockShared的实现是比着葫芦画瓢的.最主要的区别在于事务隔离级别上的处理.其中LockShared增加了IsolationLevel::READ_UNCOMMITTED的判断,如果出现此事务隔离状态,LockShared就会将其视为错误,不去处理它,该事务被设置为ABORT,并且返回一个异常.然而对于LockExclusive并不涉及类似的处理.

LockUpgrade

  1. 仍然是对于两阶段锁状态的判断.

  2. 如果先前该事务已经有排他锁了,那么就可以直接返回了.

  3. 遍历锁请求表队列,找出已经授予的共享锁,如果一个都找不到,就返回错误.

  4. 将找到的所有该事务已经授予的共享锁从队列中移除.

  5. 构造一个排他锁请求,加入队列,然后陷入等待,知道符合授予条件.

  6. 最后的处理如下:

txn->GetSharedLockSet()->erase(rid);
txn->GetExclusiveLockSet()->emplace(rid);
lock_table_[rid].upgrading_ = txn->GetTransactionId();
find_it->lock_mode_ = LockMode::EXCLUSIVE;
lock_table_[rid].cv_.notify_all();

UnLock

  1. 移除所有该事务在该RID中的请求.

  2. 根据锁隔离级别设置Shrinking状态.

  3. 更新LockSet

txn->GetSharedLockSet()->erase(rid);
txn->GetExclusiveLockSet()->erase(rid);
  1. 通过条件变量唤醒正在等待的事务.

其中我踩过一个坑,我最初以为对于ABROT或者COMMIT状态的事务调用UnLock,应该将返回异常.其实不然,通过阅读测试代码,可以发现对于一个被ABORT的事务,除了将其修改的数据及其索引回滚之外,还需要将其所有锁释放,因此Abort函数还是会调用UnLock的,如果不允许ABORT状态成功调用UnLock就会造成事务无法正常回滚(还有遗留的锁).

下面几个问题打算单独列出来说一下.

判断锁授予的条件

这一部分主要体现在我对于CanGrant这一个函数的实现,首先需要遵循的一点是,锁管理器不会优先将锁授予给晚到达的请求.因此在一个锁被授予之前应该保证之前的锁请求都已经被授予,这通过遍历锁请求队列可以实现,在我的实现中,每当有一个新的锁请求到来时,我都会将该请求追加到队列的front中,因此通过迭代器++,可以访问到更早的请求.

for (auto reit = reqit; reit != lock_table_[rid].request_queue_.end(); ++reit) {
  if (!reit->granted_) { // 如果其中有更早的请求就判读为不可授予.
    can_grant = false;
    break;
  }
  bool is_conflict = IsConflictLock(txn, *reit, mode); // 锁冲突
  can_grant &= is_conflict;
  if (!can_grant) {
    break;
  }
}

关于IsConfilictLock,就是简单地根据Shared还是Exclusive,如下所示:

txn_id_t self_txn_id = txn->GetTransactionId();
if (locktype == LockOpType::SHARED_OP) {
  return !(request.txn_id_ != self_txn_id && request.lock_mode_ == LockMode::EXCLUSIVE &&
           request.txn_id_ != INVALID_TXN_ID);
}
if (locktype == LockOpType::EXCLUSIVE_OP) {
  return (request.txn_id_ == self_txn_id || request.txn_id_ == INVALID_TXN_ID);
}
return false;

此外,关于这部分还会涉及到死锁的判断与处理.在后面的部分会说.

死锁的判断与处理

这一部分的实现体现在CanGrant这一个函数中:

bool have_abort = false;
for (auto reit = start_it; reit != lock_table_[rid].request_queue_.end();) {
  txn_id_t txn_id = reit->txn_id_;
  if (txn_id > self_txn_id && !IsConflictLock(txn, *reit, mode)) {
    Transaction *tmp_txn = TransactionManager::GetTransaction(txn_id);
    tmp_txn->SetState(TransactionState::ABORTED);
    have_abort = true;
    reit = lock_table_[rid].request_queue_.erase(reit);
  } else {
    ++reit;
  }
}
if (have_abort) {
  lock_table_[rid].cv_.notify_all();
  return true;
}

根据比较txn_id_t来判断事务的新老,并且检查是否冲突.如果冲突,无论年轻事务有没有被授予锁,都对其进行ABORT处理,除此之外还需要将其对应的request从请求队列中移除.如果这个过程中有事务因为死锁预防被终止,还会调用条件变量的Notify唤醒正在等待的事务(线程).

事务隔离级别与两阶段锁的实现

这一部分需要结合LockManager中的API以及执行器中对于这些API的调用方式来进行说明.总的来说在需要我们实现的三种隔离级别中,从“读未提交”,“读已提交”,“可重复读”事务隔离级别是依次增强的.

首先看IsolationLevel::READ_UNCOMMITTED相关的实现.这种隔离级别给人的感觉是,其关于读数据的操作是非常自由的,某个事物如果想要对某个数据进行读,不需要考虑是否有其他事务正在写.因此,具体在实现上就是:当需要读取数据时,不需要LockShared.因此在LockShared的实现中,如果检测到来自一个“读未提交”隔离级别的事务,那么将其设置为ABORT,抛出异常即可.如下所示:

if (txn->GetIsolationLevel() == IsolationLevel::READ_UNCOMMITTED) {
  txn->SetState(TransactionState::ABORTED);
  throw TransactionAbortException(txn->GetTransactionId(), AbortReason::LOCKSHARED_ON_READ_UNCOMMITTED);
}

与此相呼应的是有关于ScanExecutor的实现,也就是说在该执行器中,不会对此隔离级别的事务LockShared.如下:

if (txn->GetIsolationLevel() != IsolationLevel::READ_UNCOMMITTED) {
   exec_ctx_->GetLockManager()->LockShared(txn, tup.GetRid());
}

在“读未提交中”,关于有关于写锁在执行器中的实现,是正常地上锁和解锁的.

然后是关于"读已提交"的实现.在执行器中,会执行正常的上锁和解锁方式.相比“读未提交”最大的区别就在于关于读取数据的操作需要尽心"LockShared",这使得读取数据不再自由,而是需要和其他事务的“Exclusive”进行竞争,要么先于其他事务的“LockExclusive”,要么等到其他事务将锁释放之后(这种情况下,就意味着脏读问题可以避免了).

 // scan_execuotr
if (txn->GetIsolationLevel() != IsolationLevel::READ_UNCOMMITTED) {
    exec_ctx_->GetLockManager()->LockShared(txn, tup.GetRid());
 }
......
if (txn->GetIsolationLevel() == IsolationLevel::READ_COMMITTED) {
    exec_ctx_->GetLockManager()->Unlock(txn, tup.GetRid());
 }
// update_executor
if (transaction->IsSharedLocked(tmp_rid)) {
	 exec_ctx_->GetLockManager()->LockUpgrade(transaction, tmp_rid);
 } else {
    exec_ctx_->GetLockManager()->LockExclusive(transaction, tmp_rid);
 }
//......
if (transaction->GetIsolationLevel() != IsolationLevel::REPEATABLE_READ) {
    exec_ctx_->GetLockManager()->Unlock(transaction, tmp_tup.GetRid());
 }

至此,我们结合可以发现,“读已提交”没有遵循所谓的两段锁协议,因此一个事务中上锁和解锁的过程是交织在一起的,而非集中与两个阶段.这因此也就容易导致不可重复读问题.因此“可重复读”为了将两个行为分为两个阶段,其实现做法是,不再执行器中释放锁,而是将锁的释放动作集中到Abort或者Commit进行处理.

因此在执行器中相关的代码,对于“可重复读是不释放锁的”.比如说:

// update_executor	
if (transaction->GetIsolationLevel() != IsolationLevel::REPEATABLE_READ) {
   exec_ctx_->GetLockManager()->Unlock(transaction, tmp_tup.GetRid());
}

在其Unlock中也体现了两段锁协议:

if (txn->GetIsolationLevel() == IsolationLevel::REPEATABLE_READ) {
  txn->SetState(TransactionState::SHRINKING);
}

因此做个总结:

隔离级别 读锁 写锁 两段锁协议 问题
读未提交 不上读锁 执行器中正常上锁和解锁 不遵循 脏读,不可重复读,幻读
读已提交 执行器中正常上锁和解锁 执行器中正常上锁和解锁 不遵循 不可重复读,幻读
可重复读 执行器中正常上锁,不解锁;在Abort或者Commit集中解锁 执行器中正常上锁,不解锁;在Abort或者Commit集中解锁 遵循 幻读

执行器的其他细节

执行器关于Update,Delete等操作是需要对索引结构进行更新的,这里添加的代码用于生成一个写索引记录,为后续的回滚服务.在我的实现中,遵循先写记录,在写索引的方式.其实现如下:

for (auto &index : indexes) {
  IndexWriteRecord new_record(tmp_rid, info_->oid_, WType::INSERT, tmp_next, info_->oid_,
                              exec_ctx_->GetCatalog());
  txn->AppendTableWriteRecord(new_record);
  index->index_->InsertEntry(
      tmp_next.KeyFromTuple(info_->schema_, index->key_schema_, index->index_->GetKeyAttrs()), tmp_rid,
      AbstractExecutor::exec_ctx_->GetTransaction());
}

在原本更新索引的地方插入写记录的代码即可.

3. 小结

终于完工了CMU 15-445打算抽空再把源代码和测试代码啃一遍,距离寒假还有接近两个月,剩余的各种课程任务重了起来,打算用剩余的时间入门编译原理吧,或者在补一补6.824中分布式的那些论文,还有以前博客中没有填完的坑.

image-20221103114401235

感觉还是学到了许多,深化了数据库系统中不少概念的理解,感谢CMU!