CMU 15-445:project #4 - Concurrency Control
1.相关基础知识
相关基础知识见另一篇博客并发控制小叙
2.代码实现
这一部分需要实现的API有这么几个:
bool LockShared(Transaction *txn, const RID &rid);
bool LockExclusive(Transaction *txn, const RID &rid);
bool LockUpgrade(Transaction *txn, const RID &rid);
bool Unlock(Transaction *txn, const RID &rid);
下面简要说明一下其实现的思路:
LockShared:
(1)先判断事务隔离级别和两阶段锁状态是否合法,否则设置为ABORT后返回.
(2)构造读请求,并追加到队列.
(3)如果该事务已经有了读锁或者写锁,那么处理对应的读请求后,返回.如下所示:
if (txn->IsSharedLocked(rid) || txn->IsExclusiveLocked(rid)) {
lock_table_[rid].upgrading_ = self_txn_id;
request_it->granted_ = true;
lock_table_[rid].cv_.notify_all();
return true;
}
- 之后结合条件变量进行等待,当符合授予条件时,退出等待.
bool can_grant = CanGrant(txn, rid, LockOpType::SHARED_OP, request_it);
while (!can_grant) {
lock_table_[rid].cv_.wait(guard);
if (txn->GetState() == TransactionState::ABORTED) {
lock_table_[rid].cv_.notify_all();
throw TransactionAbortException(txn->GetTransactionId(), AbortReason::DEADLOCK);
}
can_grant = CanGrant(txn, rid, LockOpType::SHARED_OP, request_it);
}
在这里使用了条件变量进行等待,当检测到该事务被ABORTED时,对应的是后面死锁预防所做的处理,也就是被终止和抢占年轻事务.
- 最后授予成功,进行如下处理:
txn->GetSharedLockSet()->emplace(rid);
lock_table_[rid].upgrading_ = self_txn_id;
request_it->granted_ = true;
lock_table_[rid].cv_.notify_all();
return true;
LockExclusive
这一部分的实现,基本上和LockShared的实现是比着葫芦画瓢的.最主要的区别在于事务隔离级别上的处理.其中LockShared增加了IsolationLevel::READ_UNCOMMITTED
的判断,如果出现此事务隔离状态,LockShared就会将其视为错误,不去处理它,该事务被设置为ABORT,并且返回一个异常.然而对于LockExclusive并不涉及类似的处理.
LockUpgrade
仍然是对于两阶段锁状态的判断.
如果先前该事务已经有排他锁了,那么就可以直接返回了.
遍历锁请求表队列,找出已经授予的共享锁,如果一个都找不到,就返回错误.
将找到的所有该事务已经授予的共享锁从队列中移除.
构造一个排他锁请求,加入队列,然后陷入等待,知道符合授予条件.
最后的处理如下:
txn->GetSharedLockSet()->erase(rid);
txn->GetExclusiveLockSet()->emplace(rid);
lock_table_[rid].upgrading_ = txn->GetTransactionId();
find_it->lock_mode_ = LockMode::EXCLUSIVE;
lock_table_[rid].cv_.notify_all();
UnLock
移除所有该事务在该RID中的请求.
根据锁隔离级别设置Shrinking状态.
更新LockSet
txn->GetSharedLockSet()->erase(rid);
txn->GetExclusiveLockSet()->erase(rid);
- 通过条件变量唤醒正在等待的事务.
其中我踩过一个坑,我最初以为对于ABROT或者COMMIT状态的事务调用UnLock,应该将返回异常.其实不然,通过阅读测试代码,可以发现对于一个被ABORT的事务,除了将其修改的数据及其索引回滚之外,还需要将其所有锁释放,因此Abort函数还是会调用UnLock的,如果不允许ABORT状态成功调用UnLock就会造成事务无法正常回滚(还有遗留的锁).
下面几个问题打算单独列出来说一下.
判断锁授予的条件
这一部分主要体现在我对于CanGrant
这一个函数的实现,首先需要遵循的一点是,锁管理器不会优先将锁授予给晚到达的请求.因此在一个锁被授予之前应该保证之前的锁请求都已经被授予,这通过遍历锁请求队列可以实现,在我的实现中,每当有一个新的锁请求到来时,我都会将该请求追加到队列的front中,因此通过迭代器++,可以访问到更早的请求.
for (auto reit = reqit; reit != lock_table_[rid].request_queue_.end(); ++reit) {
if (!reit->granted_) { // 如果其中有更早的请求就判读为不可授予.
can_grant = false;
break;
}
bool is_conflict = IsConflictLock(txn, *reit, mode); // 锁冲突
can_grant &= is_conflict;
if (!can_grant) {
break;
}
}
关于IsConfilictLock
,就是简单地根据Shared还是Exclusive,如下所示:
txn_id_t self_txn_id = txn->GetTransactionId();
if (locktype == LockOpType::SHARED_OP) {
return !(request.txn_id_ != self_txn_id && request.lock_mode_ == LockMode::EXCLUSIVE &&
request.txn_id_ != INVALID_TXN_ID);
}
if (locktype == LockOpType::EXCLUSIVE_OP) {
return (request.txn_id_ == self_txn_id || request.txn_id_ == INVALID_TXN_ID);
}
return false;
此外,关于这部分还会涉及到死锁的判断与处理.在后面的部分会说.
死锁的判断与处理
这一部分的实现体现在CanGrant这一个函数中:
bool have_abort = false;
for (auto reit = start_it; reit != lock_table_[rid].request_queue_.end();) {
txn_id_t txn_id = reit->txn_id_;
if (txn_id > self_txn_id && !IsConflictLock(txn, *reit, mode)) {
Transaction *tmp_txn = TransactionManager::GetTransaction(txn_id);
tmp_txn->SetState(TransactionState::ABORTED);
have_abort = true;
reit = lock_table_[rid].request_queue_.erase(reit);
} else {
++reit;
}
}
if (have_abort) {
lock_table_[rid].cv_.notify_all();
return true;
}
根据比较txn_id_t
来判断事务的新老,并且检查是否冲突.如果冲突,无论年轻事务有没有被授予锁,都对其进行ABORT处理,除此之外还需要将其对应的request从请求队列中移除.如果这个过程中有事务因为死锁预防被终止,还会调用条件变量的Notify唤醒正在等待的事务(线程).
事务隔离级别与两阶段锁的实现
这一部分需要结合LockManager中的API以及执行器中对于这些API的调用方式来进行说明.总的来说在需要我们实现的三种隔离级别中,从“读未提交”,“读已提交”,“可重复读”事务隔离级别是依次增强的.
首先看IsolationLevel::READ_UNCOMMITTED
相关的实现.这种隔离级别给人的感觉是,其关于读数据的操作是非常自由的,某个事物如果想要对某个数据进行读,不需要考虑是否有其他事务正在写.因此,具体在实现上就是:当需要读取数据时,不需要LockShared.因此在LockShared的实现中,如果检测到来自一个“读未提交”隔离级别的事务,那么将其设置为ABORT,抛出异常即可.如下所示:
if (txn->GetIsolationLevel() == IsolationLevel::READ_UNCOMMITTED) {
txn->SetState(TransactionState::ABORTED);
throw TransactionAbortException(txn->GetTransactionId(), AbortReason::LOCKSHARED_ON_READ_UNCOMMITTED);
}
与此相呼应的是有关于ScanExecutor的实现,也就是说在该执行器中,不会对此隔离级别的事务LockShared.如下:
if (txn->GetIsolationLevel() != IsolationLevel::READ_UNCOMMITTED) {
exec_ctx_->GetLockManager()->LockShared(txn, tup.GetRid());
}
在“读未提交中”,关于有关于写锁在执行器中的实现,是正常地上锁和解锁的.
然后是关于"读已提交"的实现.在执行器中,会执行正常的上锁和解锁方式.相比“读未提交”最大的区别就在于关于读取数据的操作需要尽心"LockShared",这使得读取数据不再自由,而是需要和其他事务的“Exclusive”进行竞争,要么先于其他事务的“LockExclusive”,要么等到其他事务将锁释放之后(这种情况下,就意味着脏读问题可以避免了).
// scan_execuotr
if (txn->GetIsolationLevel() != IsolationLevel::READ_UNCOMMITTED) {
exec_ctx_->GetLockManager()->LockShared(txn, tup.GetRid());
}
......
if (txn->GetIsolationLevel() == IsolationLevel::READ_COMMITTED) {
exec_ctx_->GetLockManager()->Unlock(txn, tup.GetRid());
}
// update_executor
if (transaction->IsSharedLocked(tmp_rid)) {
exec_ctx_->GetLockManager()->LockUpgrade(transaction, tmp_rid);
} else {
exec_ctx_->GetLockManager()->LockExclusive(transaction, tmp_rid);
}
//......
if (transaction->GetIsolationLevel() != IsolationLevel::REPEATABLE_READ) {
exec_ctx_->GetLockManager()->Unlock(transaction, tmp_tup.GetRid());
}
至此,我们结合可以发现,“读已提交”没有遵循所谓的两段锁协议,因此一个事务中上锁和解锁的过程是交织在一起的,而非集中与两个阶段.这因此也就容易导致不可重复读问题.因此“可重复读”为了将两个行为分为两个阶段,其实现做法是,不再执行器中释放锁,而是将锁的释放动作集中到Abort
或者Commit
进行处理.
因此在执行器中相关的代码,对于“可重复读是不释放锁的”.比如说:
// update_executor
if (transaction->GetIsolationLevel() != IsolationLevel::REPEATABLE_READ) {
exec_ctx_->GetLockManager()->Unlock(transaction, tmp_tup.GetRid());
}
在其Unlock
中也体现了两段锁协议:
if (txn->GetIsolationLevel() == IsolationLevel::REPEATABLE_READ) {
txn->SetState(TransactionState::SHRINKING);
}
因此做个总结:
隔离级别 | 读锁 | 写锁 | 两段锁协议 | 问题 |
---|---|---|---|---|
读未提交 | 不上读锁 | 执行器中正常上锁和解锁 | 不遵循 | 脏读,不可重复读,幻读 |
读已提交 | 执行器中正常上锁和解锁 | 执行器中正常上锁和解锁 | 不遵循 | 不可重复读,幻读 |
可重复读 | 执行器中正常上锁,不解锁;在Abort或者Commit集中解锁 | 执行器中正常上锁,不解锁;在Abort或者Commit集中解锁 | 遵循 | 幻读 |
执行器的其他细节
执行器关于Update,Delete等操作是需要对索引结构进行更新的,这里添加的代码用于生成一个写索引记录,为后续的回滚服务.在我的实现中,遵循先写记录,在写索引的方式.其实现如下:
for (auto &index : indexes) {
IndexWriteRecord new_record(tmp_rid, info_->oid_, WType::INSERT, tmp_next, info_->oid_,
exec_ctx_->GetCatalog());
txn->AppendTableWriteRecord(new_record);
index->index_->InsertEntry(
tmp_next.KeyFromTuple(info_->schema_, index->key_schema_, index->index_->GetKeyAttrs()), tmp_rid,
AbstractExecutor::exec_ctx_->GetTransaction());
}
在原本更新索引的地方插入写记录的代码即可.
3. 小结
终于完工了CMU 15-445打算抽空再把源代码和测试代码啃一遍,距离寒假还有接近两个月,剩余的各种课程任务重了起来,打算用剩余的时间入门编译原理吧,或者在补一补6.824中分布式的那些论文,还有以前博客中没有填完的坑.
感觉还是学到了许多,深化了数据库系统中不少概念的理解,感谢CMU!