P4实验笔记

Overview

P4需要完成以下任务:

  • Lock Manager:锁管理器,利用 2PL 实现并发控制。支持 REPEATABLE_READREAD_COMMITTEDREAD_UNCOMMITTED 三种隔离级别,支持 SHAREDEXCLUSIVEINTENTION_SHAREDINTENTION_EXCLUSIVESHARED_INTENTION_EXCLUSIVE 五种锁,支持 table 和 row 两种锁粒度,支持锁升级。Project 4 重点部分。
  • Deadlock Detection:死锁检测,运行在一个 background 线程,每间隔一定时间检测当前是否出现死锁,并挑选合适的事务将其 abort 以解开死锁。
  • Concurrent Query Execution:修改之前实现的 SeqScanInsertDelete 算子,加上适当的锁以实现并发的查询。

建议在阅读此文前,先看一下我的上篇文章[Bustub中的并发控制]。

Task-1 Lock Manager

大部分需要注意的内容都在 lock_manager.h 的 LOCK NOTE 和 UNLOCK NOTE 里了,并且我在[Bustub中的并发控制]中也简单介绍了这样做实现不同隔离机制的原理。

首先理一理 Lock Manager 的结构:

  • table_lock_map_:记录 table 和与其相关锁请求的映射关系。
  • row_lock_map_:记录 row 和与其相关锁请求的映射关系。

这两个 map 的值均为锁请求队列 LockRequestQueue

  • request_queue_:实际存放锁请求的队列。
  • cv_ & latch_:条件变量和锁,配合使用可以实现经典的等待资源的模型。
  • upgrading_:正在此资源上尝试锁升级的事务 id。

锁请求以 LockRequest 类表示:

  • txn_id_:发起此请求的事务 id。
  • lock_mode_:请求锁的类型。
  • oid_:在 table 粒度锁请求中,代表 table id。在 row 粒度锁请求中,表示 row 属于的 table 的 id。
  • rid_:仅在 row 粒度锁请求中有效。指 row 对应的 rid。
  • granted_:是否已经对此请求授予锁?

Lock Manager处理事务发送的锁请求,并且记录锁请求的状态。授予的锁和未授予的锁请求都存在锁请求队列LockRequestQueue中。

锁升级

Bustub中的锁升级是什么,为什么需要锁升级?

应用场景:假设事务T1读取 table_1 的所有 Tuple,事务T2先读取 table_1 的所有 Tuple,然后执行一些别的操作(因为二阶段锁的限制这里可能并不能立马释放读锁),再写 table_1 的一些Tuple。

如果事务T2直接获取到了table_1的IX锁,那么事务T1获取table_1的S锁就会发生阻塞。我们可以有更好的解决方案,比如事务T2先给table_1上S类型锁,要写的时候再上X类型锁。这里就需要锁升级了。T2会尝试把原来的S类型锁升级为X类型的锁,它对某个资源始终最多只持有一把锁。判断是否可以进行升级需要已授予的锁是否和锁升级的类型兼容,如果不兼容就得等待。

Lock

接下来,我们以LockTable(Transaction *txn, LockMode lock_mode, const table_oid_t &oid)为例,介绍一下Lock上锁的过程。

第一步:逻辑状态的检查

若 txn 处于 Abort/Commit 状态,抛逻辑异常,不应该有这种情况出现。

若 txn 处于 Shrinking 状态,则需要检查 txn 的隔离级别和当前锁请求类型,不符合要求的锁请求需要把事务状态设置成ABORT,并抛出TransactionAbortException的异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
REPEATABLE_READ:
The transaction is required to take all locks.
All locks are allowed in the GROWING state
No locks are allowed in the SHRINKING state

READ_COMMITTED:
The transaction is required to take all locks.
All locks are allowed in the GROWING state
Only IS, S locks are allowed in the SHRINKING state

READ_UNCOMMITTED:
The transaction is required to take only IX, X locks.
X, IX locks are allowed in the GROWING state.
S, IS, SIX locks are never allowed

第二步:获取相应资源的锁请求队列

如果是相应资源的第一个锁请求还需要创建新的锁请求队列。这里要注意加锁保护数据结构。

第三步:检查此锁请求是否为一次锁升级。

首先,记得对 queue 加锁。

granted 和 waiting 的锁请求均放在同一个队列里,我们需要遍历队列查看有没有与当前事务 id(我习惯叫做 tid)相同的请求。如果存在这样的请求,则代表当前事务在此前已经得到了在此资源上的一把锁,接下来可能需要锁升级。

需要注意的是,这个请求的 granted_ 一定为 true。因为假如事务此前的请求还没有被通过,事务会被阻塞在 LockManager 中,不可能再去尝试获取另一把锁。

现在我们找到了此前已经获取的锁,开始尝试锁升级。首先,判断此前授予锁类型是否与当前请求锁类型相同。若相同,则代表是一次重复的请求,直接返回。否则进行下一步检查。

接下来,判断当前资源上是否有另一个事务正在尝试升级(queue->upgrading_ == INVALID_TXN_ID)。若有,则终止当前事务,抛出 UPGRADE_CONFLICT 异常。因为不允许多个事务在同一资源上同时尝试锁升级。

  • 为什么一个资源不允许多个事务同时进行锁升级呢?

如果不同事务可以同时对一个资源进行锁升级,那么对于某个资源R,假如事务T1和T2都尝试(阻塞)把IS类型的锁升级为S类型(因为假设事务T3获取了SIX类型的锁所以导致阻塞这两个锁升级请求),R的LockQueue中删除了T1和T2原来的锁请求,都插入了新升级的锁请求(等待授予,grant_为false),因为锁升级具有最高的优先级,所以需要把请求插入到第一个grant不是true的请求的前面,但这也却导致了一个问题,如果T1先插入,T2后插入,T2的锁升级请求在T1锁升级请求的前面了,也就是说反而T2比T1具有更大的优先级了,这不符合逻辑,所以这里为了降低实现的复杂度,采用了这种粗暴的方法。但其实可以新建一个专门的锁升级队列就可以解决这个问题从而实现统一资源的不同事务的同步锁升级了。

第四步:将锁请求插入到队列中

不管是锁升级请求还是平凡的锁请求,都需要把请求添加到队列中。这里不同的是锁升级请求和平凡锁请求插入的位置不同。

第五步,尝试获取锁。

这是最后一步,也是最核心的一步,体现了 Lock Manager 的执行模型。首先,需要清楚条件变量的使用场景。

这里需要使用条件变量,先给出条件变量经典的使用形式:

wait() 函数接受一个 std::unique_lock<std::mutex> 对象作为参数,该参数通常是通过调用 std::unique_lockstd::mutex 对象进行上锁而获得的。在调用 wait() 函数之前,必须先获得互斥锁,以确保在等待期间其他线程无法修改条件变量所关联的数据。

1
2
3
4
std::unique_lock<std::mutex> lock(latch);
while (!resource) {
cv.wait(lock);
}

条件变量与互斥锁配合使用。首先需要持有锁,并查看是否能够获取资源。这个锁与资源绑定,是用来保护资源的锁。若暂时无法获取资源,则调用条件变量的 wait 函数。调用 wait 函数后,latch 将自动释放,并且当前线程被挂起,以节省资源。这就是阻塞的过程。此外,允许有多个线程在 wait 同一个 latch。

GrantLock() 中,Lock Manager 会判断是否可以满足当前锁请求。若可以满足,则返回 true,事务成功获取锁,并退出循环。若不能满足,则返回 false,事务暂时无法获取锁,在 wait 处阻塞,等待资源状态变化时被唤醒并再次判断是否能够获取锁。资源状态变化指的是什么?其他事务释放了锁。

1
2
3
4
5
6
7
8
9
10
while (!GrantLock(upgraded_lock_request, lock_request_queue)) {
lock_request_queue->cv_.wait(lock);

if (txn->GetState() == TransactionState::ABORTED) {
lock_request_queue->upgrading_ = INVALID_TXN_ID;
lock_request_queue->request_queue_.remove(upgraded_lock_request);
lock_request_queue->cv_.notify_all();
return false;
}
}

接下来是 GrantLock() 函数。在此函数中,我们需要判断当前锁请求是否能被满足。

  1. 判断兼容性。遍历请求队列,查看当前锁请求是否与所有的已经 granted 的请求兼容。需要注意的是,在我的实现中 granted 请求不一定都在队列头部,因此需要完全遍历整条队列。锁兼容矩阵可以在 Lecture slides 中查看。若全部兼容,则通过检查。否则直接返回 false。当前请求无法被满足。
  2. 判断优先级。锁请求会以严格的 FIFO 顺序依次满足。只有当前请求为请求队列中优先级最高的请求时,才允许授予锁。优先级可以这样判断:
  3. 如果队列中存在锁升级请求,若锁升级请求正为当前请求,则优先级最高。否则代表其他事务正在尝试锁升级,优先级高于当前请求。
  4. 若队列中不存在锁升级请求,则遍历队列。如果,当前请求是第一个 waiting 状态的请求,则代表优先级最高。如果当前请求前面还存在其他 waiting 请求,则要判断当前请求是否前面的 waiting 请求兼容。若兼容,则仍可以视为优先级最高。若存在不兼容的请求,则优先级不为最高。

GrantLock函数首先遍历队列中所有的lock_request(进入的时候已经获取了锁,wait被唤醒需要重新获取锁才可以继续执行),如果这个锁请求已被授予,则需要判断当前的锁请求与这个已授予的锁请求是否是兼容的,如果不兼容直接返回false。如果遍历到第一个没有被授予的锁请求,跟进先进先出的原则,如果这个锁请求和参数lock_request是同一个的话,那么这个锁请求的优先级就是最高的,直接返回true,如果第一个没有被授予的锁请求不是lock_request的话,那么证明当前的锁请求不是优先级最高的,那么需要返回false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
auto LockManager::GrantLock(std::shared_ptr<LockRequest> &lock_request,
std::shared_ptr<LockRequestQueue> &lock_request_queue) -> bool {
for (auto &lrq : lock_request_queue->request_queue_) {
// 首先判断兼容性
if (lrq->granted_ && !IsLockModeCompatible(lrq->lock_mode_, lock_request->lock_mode_)) {
return false;
}
// 先进先出的原则,前面存在没有授予锁的请求,直接return false
if (!lrq->granted_) {
if (lock_request.get() != lrq.get()) {
return false;
}
}
return true;
}
return false;
}

UnLock

仍以 table lock 为例。Unlock 的流程比 Lock 要简单不少。

首先,由于是 table lock,在释放时需要先检查其下的所有 row lock 是否已经释放。

接下来是 table lock 和 row lock 的公共步骤:

第一步,获取对应的 lock request queue。

第二步,遍历请求队列,找到 unlock 对应的 granted 请求。

若不存在对应的请求,抛 ATTEMPTED_UNLOCK_BUT_NO_LOCK_HELD 异常。

找到对应的请求后,根据事务的隔离级别和锁类型修改其状态。

当隔离级别为 REPEATABLE_READ 时,S/X 锁释放会使事务进入 Shrinking 状态。当为 READ_COMMITTED 时,只有 X 锁释放使事务进入 Shrinking 状态。当为 READ_UNCOMMITTED 时,X 锁释放使事务 Shrinking,S 锁不会出现。

之后,在请求队列中 remove unlock 对应的请求,并将请求 delete。

在锁成功释放后,调用 cv_.notify_all() 唤醒所有阻塞在此 table 上的事务,检查能够获取锁。

Task-2 Deadlock Detection

在前面讲过,二阶段锁协议并不保证不会出现死锁,那么我们需要进行死锁检测,如果发生了死锁,则需要终止某一个事务。

在阻塞过程中有可能会出现多个事务的循环等待,而循环等待会造成死锁。在 Bustub 中我们采用一个 Background Deadlock Detection 线程来定时检查当前是否出现死锁。

我们用 wait for 图来表示事务之间的等待关系。wait for 是一个有向图,t1->t2 即代表 t1 事务正在等待 t2 事务释放资源。当 wait for 图中存在环时,即代表出现死锁,需要挑选事务终止以打破死锁。

我们并不需要时刻维护 wait for 图,而是在死锁检测线程被唤醒时,根据当前请求队列构建 wait for 图,再通过 wait for 图判断是否存在死锁。当判断完成后,将丢弃当前 wait for 图。下次线程被唤醒时再重新构建。

最常见的有向图环检测算法包括 DFS 和拓扑排序。在这里我们选用 DFS 来进行环检测。构建 wait for 图时要保证搜索的确定性。始终从 tid 较小的节点开始搜索,在选择邻居时,也要优先搜索 tid 较小的邻居。

构建 wait for 图的过程是,遍历 table_lock_maprow_lock_map 中所有的请求队列,对于每一个请求队列,用一个二重循环将所有满足等待关系的一对 tid 加入 wait for 图的边集。满足等待关系是指,对于两个事务 a 和 b,a 是 waiting 请求,b 是 granted 请求,则生成 a->b 一条边。

构建好 wait for 图后,就可以判断有向图中是否出现了环,如果出现了环,就需要把其中最年轻的事务(txn_id最小)状态设置成ABORT。

Task-3 Concurrent Query Execution

最后就是为Seqscan、Insert和Delete加锁实现不同的隔离级别了。

Seqscan

如果隔离级别是 READ_UNCOMMITTED 则无需加锁。加锁失败则抛出 ExecutionException 异常。

READ_COMMITTED 下,在 Next() 函数中,若表中已经没有数据,则提前释放之前持有的锁。在 REPEATABLE_READ 下,在 Commit/Abort 时统一释放,无需手动释放。

那应该加什么锁呢,直观来说应该对Table加S类型的锁就行了。但如果深入了解Bustub的代码,发现直接对Table加S类型的锁是错误的,而应该对表先加IS锁,然后再在下面的tuple加S锁。

  • Q1:为什么对于Seqscan,需要对表加IS锁再对tuple加S锁,为什么不能够直接对表加S锁呢?

主要是后续可以进行一个merge filter scan的优化,把filter放进SeqScan里,这种情况下就是表IS+符合条件的行S。

另外,如果直接表S的话,执行一个DELETE … WHERE …,同一个query里先在下层SeqScan加了S锁,又尝试在Delete里加IX锁,但是S不能升级为IX,会导致这条query执行不了,升级不兼容。所以还是SeqScan也用表IS+行S比较容易实现。

  • Q2:在 READ_COMMITTED 下,在 Next() 函数中,若表中已经没有数据,则提前释放之前持有的锁。在 REPEATABLE_READ 下,在 Commit/Abort 时统一释放,无需手动释放,为什么?

在我的上篇文章[Bustub中的并发控制]中讲过,读未提交的Seqscan读取是不需要进行加锁的。

在读已提交中,为什么要在Next函数中如果没有数据,释放所有的锁呢。在前面的加锁规则中我们也说到过,读已提交事务解S类型的锁并不改变二阶段封锁协议中的状态,所以这里提前释放S锁,后面就有可能再次获取这个tuple的S锁(读已提交在SHRINKING阶段仍可以加S锁),这也就是为什么读已提交会出现不可重复读的问题。

而可重复读的事务把所有锁的释放推到了事务提交或终止后,在读取某条tuple时,只能lock-S(tuple-1) ... read(tuple1) ... read(tuple1)...unlock(tuple-1),所以就不会出现前后读取不一致的问题。

Insert & Delete

Init() 函数中,为表加上 IX 锁,再为行加 X 锁。

LeaderBoard Task

这里需要实现几个小优化:

Predicate pushdown to SeqScan

这就是谓词下推到Seqscan的算子中,我们在上面已经提到了,这需要先对整张表上IS锁,再对行上S锁实现读取。

Update算子

在了解完TablePage的结构和MarkDelete以及ApplyDelete函数后,我误认为Bustub貌似不能够实现Update操作,原因是Delete采用了比较取巧的行为将size的第32位置1,但Update后tuple原来的大小可是彻底消失了,事务Abort之后又该怎么恢复呢。可是还是证明我想错了,Bustub能够实现Update操作,主要原因就是Transaction维护了一个写的集合。这样做Update,Insert、Delete操作的时候都需要把这些写过的tuple记录到write set里面,方便事务Abort后进行恢复。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
inline auto GetWriteSet() -> std::shared_ptr<std::deque<TableWriteRecord>> { return table_write_set_; }

class TableWriteRecord {
public:
TableWriteRecord(RID rid, WType wtype, const Tuple &tuple, TableHeap *table)
: rid_(rid), wtype_(wtype), tuple_(tuple), table_(table) {}

RID rid_;
WType wtype_;
/** The tuple is only used for the update operation. */
Tuple tuple_;
/** The table heap specifies which table this write record is for. */
TableHeap *table_;
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void TransactionManager::Abort(Transaction *txn) {
txn->SetState(TransactionState::ABORTED);
// Rollback before releasing the lock.
auto table_write_set = txn->GetWriteSet();
while (!table_write_set->empty()) {
auto &item = table_write_set->back();
auto *table = item.table_;
if (item.wtype_ == WType::DELETE) {
table->RollbackDelete(item.rid_, txn);
} else if (item.wtype_ == WType::INSERT) {
// Note that this also releases the lock when holding the page latch.
table->ApplyDelete(item.rid_, txn);
} else if (item.wtype_ == WType::UPDATE) {
// 再调用UpdateTuple函数更新回去,妙妙妙!
table->UpdateTuple(item.tuple_, item.rid_, txn);
}
table_write_set->pop_back();
}
table_write_set->clear();

........
// 接下来可能还需要恢复索引呢

Update算子的实现就很清楚了,首先为整张表上一把IX类型的锁,然后再对要更新的tuple加上X锁,但不立马释放,等到事务结束的时候统一释放(所以每一个事务都有表锁集合和行锁集合)。


P4实验笔记
http://example.com/2024/03/05/数据库/CMU_15445/P4实验笔记/
作者
LiuZhaocheng
发布于
2024年3月5日
许可协议