
Download (official repository):

https://github.com/jingtao8a/GRE

Software environment

Ubuntu 20.04.6 LTS

gcc 9.4.0

cmake 3.16.3

Dependencies

intel-mkl 2018.4.274

Installing the 2024 release also works.

Go to https://www.intel.com/content/www/us/en/developer/tools/oneapi/onemkl-download.html

Select Linux and the APT Package Manager, then run the installation commands shown on that page in order.

The default installation prefix is /opt/intel/oneapi.

Using the MKL library requires configuring environment variables. To make them take effect for all users (a system restart is required), edit /etc/profile:

vim /etc/profile
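What to append depends on the oneAPI layout; a minimal sketch, assuming the default /opt/intel/oneapi prefix (the exact setvars.sh path may differ between oneAPI versions):

# appended to /etc/profile: load the oneAPI environment (MKL included) for every login shell
source /opt/intel/oneapi/setvars.sh

After editing, reboot (or log in again) so the change applies to all users.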

intel-tbb 2020.3

The TBB library version must be 2020.3.

Download: https://github.com/oneapi-src/oneTBB/releases/tag/2020_U3

Build

# unpack
tar -zxvf oneTBB-2020_U3.tar.gz
cd oneTBB-2020_U3

# the build will use gcc-9, so create a gcc-9 variant of the compiler config
cp build/linux.gcc.inc build/linux.gcc-9.inc

# edit linux.gcc-9.inc: lines 15 and 16 originally read
CPLUS ?= g++
CONLY ?= gcc

# change them to
CPLUS ?= g++-9
CONLY ?= gcc-9

# then build inside the oneTBB-2020_U3/ directory
make compiler=gcc-9 stdver=c++17 tbb_build_prefix=my_tbb_build
# when the build finishes, the generated my_tbb_build_release/ directory appears under build/

Install

sudo mkdir /usr/local/tbb-2020_U3

sudo cp -r oneTBB-2020_U3/include /usr/local/tbb-2020_U3/include
# create a symlink for the newly installed TBB headers
sudo ln -s /usr/local/tbb-2020_U3/include/tbb /usr/local/include/tbb

sudo cp -r oneTBB-2020_U3/build/my_tbb_build_release /usr/local/tbb-2020_U3/lib
# create symlinks for the newly installed TBB libraries
sudo ln -s /usr/local/tbb-2020_U3/lib/libtbb.so.2 /usr/local/lib/libtbb.so
sudo ln -s /usr/local/tbb-2020_U3/lib/libtbb.so.2 /usr/local/lib/libtbb.so.2
sudo ln -s /usr/local/tbb-2020_U3/lib/libtbbmalloc.so.2 /usr/local/lib/libtbbmalloc.so
sudo ln -s /usr/local/tbb-2020_U3/lib/libtbbmalloc.so.2 /usr/local/lib/libtbbmalloc.so.2
sudo ln -s /usr/local/tbb-2020_U3/lib/libtbbmalloc_proxy.so.2 /usr/local/lib/libtbbmalloc_proxy.so
sudo ln -s /usr/local/tbb-2020_U3/lib/libtbbmalloc_proxy.so.2 /usr/local/lib/libtbbmalloc_proxy.so.2

Then add the library path to ~/.bashrc:

echo 'export LD_LIBRARY_PATH=/usr/local/tbb-2020_U3/lib:$LD_LIBRARY_PATH' >> ~/.bashrc
source ~/.bashrc

Reference: https://blog.csdn.net/qq_39779233/article/details/126284595

jemalloc

Download: https://github.com/jemalloc/jemalloc/releases

Build and install

# step 1
./autogen.sh

# step 2
make

# step 3
sudo make install

Reference: https://blog.csdn.net/SweeNeil/article/details/94648313

USAGE

Build

git submodule update --init # only for the first time
mkdir -p build
cd build
cmake -DCMAKE_BUILD_TYPE=Release .. && make

RUN

./build/microbench \
--keys_file=./data/dataset \
--keys_file_type={binary,text} \
--read=0.5 --insert=0.5 \
--operations_num=800000000 \
--table_size=-1 \
--init_table_ratio=0.5 \
--thread_num=24 \
--index=index_name

Here keys_file is the dataset path; keys_file_type selects binary or text input; table_size is the amount of data to use; init_table_ratio is the fraction of the data used for bulk_load; index names the index under test.

The remaining parameters are documented in the repository:
https://github.com/jingtao8a/GRE

To benchmark a new index structure

Step 1

In src/competitor/competitor.h, add the corresponding index_type branch to the get_index function.
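As an illustration only (the actual dispatch code lives in get_index; the name and return convention here are hypothetical), the new branch looks roughly like:

// inside get_index(index_type, ...): map the name passed via --index to the wrapper class
if (index_type == "new_index_name") {
  return new NewIndexInterface<KEY_TYPE, PAYLOAD_TYPE>();
}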

Step 2

Place the index's source code under src/competitor/new_index_name/src
and write the interface file src/competitor/new_index_name.h

Take LIPP's interface file as an example:

#include "./src/src/core/lipp.h"
#include "../indexInterface.h"

template<class KEY_TYPE, class PAYLOAD_TYPE>
class LIPPInterface : public indexInterface<KEY_TYPE, PAYLOAD_TYPE> {
 public:
  void init(Param *param = nullptr) {}

  void bulk_load(std::pair<KEY_TYPE, PAYLOAD_TYPE> *key_value, size_t num, Param *param = nullptr);

  bool get(KEY_TYPE key, PAYLOAD_TYPE &val, Param *param = nullptr);

  bool put(KEY_TYPE key, PAYLOAD_TYPE value, Param *param = nullptr);

  bool update(KEY_TYPE key, PAYLOAD_TYPE value, Param *param = nullptr);

  bool remove(KEY_TYPE key, Param *param = nullptr);

  size_t scan(KEY_TYPE key_low_bound, size_t key_num, std::pair<KEY_TYPE, PAYLOAD_TYPE> *result,
              Param *param = nullptr);

  long long memory_consumption() { return lipp.total_size(); }

 private:
  LIPP<KEY_TYPE, PAYLOAD_TYPE> lipp;
};

template<class KEY_TYPE, class PAYLOAD_TYPE>
void LIPPInterface<KEY_TYPE, PAYLOAD_TYPE>::bulk_load(std::pair<KEY_TYPE, PAYLOAD_TYPE> *key_value, size_t num,
                                                      Param *param) {
  lipp.bulk_load(key_value, static_cast<int>(num));
}

template<class KEY_TYPE, class PAYLOAD_TYPE>
bool LIPPInterface<KEY_TYPE, PAYLOAD_TYPE>::get(KEY_TYPE key, PAYLOAD_TYPE &val, Param *param) {
  bool exist;
  val = lipp.at(key, false, exist);
  return exist;
}

template<class KEY_TYPE, class PAYLOAD_TYPE>
bool LIPPInterface<KEY_TYPE, PAYLOAD_TYPE>::put(KEY_TYPE key, PAYLOAD_TYPE value, Param *param) {
  return lipp.insert(key, value);
}

template<class KEY_TYPE, class PAYLOAD_TYPE>
bool LIPPInterface<KEY_TYPE, PAYLOAD_TYPE>::update(KEY_TYPE key, PAYLOAD_TYPE value, Param *param) {
  return lipp.update(key, value);
}

template<class KEY_TYPE, class PAYLOAD_TYPE>
bool LIPPInterface<KEY_TYPE, PAYLOAD_TYPE>::remove(KEY_TYPE key, Param *param) {
  return lipp.remove(key);
}

template<class KEY_TYPE, class PAYLOAD_TYPE>
size_t LIPPInterface<KEY_TYPE, PAYLOAD_TYPE>::scan(KEY_TYPE key_low_bound, size_t key_num,
                                                   std::pair<KEY_TYPE, PAYLOAD_TYPE> *result,
                                                   Param *param) {
  if (!result) {
    result = new std::pair<KEY_TYPE, PAYLOAD_TYPE>[key_num];
  }
  return lipp.range_query_len(result, key_low_bound, key_num);
}

Concurrency Control Theory

Formal Definitions

Database: A fixed set of named data objects (e.g., A, B, C, …)
Transaction: A sequence of read and write operations (e.g., R(A), W(B), …)

The correctness criteria for transactions are ACID:

  • Atomicity: “all or nothing”
  • Consistency: “it looks correct to me”
  • Isolation: “as if alone”
  • Durability: “survive failures”

Conflicting Operations

Conflicts arise when multiple transactions execute concurrently:

  • Read-Write conflicts (R-W)
  • Write-Read conflicts (W-R)
  • Write-Write conflicts (W-W)

Problems caused by R-W conflicts:

  • Non-repeatable reads: a transaction reads the same data item several times and gets different values on different reads.
  • Phantom reads: a transaction queries the number of records matching some condition several times and gets different counts.

Problems caused by W-R conflicts:

  • Dirty reads: a transaction reads data modified by another transaction that has not yet committed.

Problems caused by W-W conflicts:

  • Lost updates


Although the default isolation level of MySQL's InnoDB engine is REPEATABLE READ, it only avoids phantom reads to a large extent; it does not eliminate them entirely (see the referenced article).


There are two mechanisms:

  • Plain SELECT statements (snapshot reads) avoid phantom reads through MVCC.
  • SELECT ... FOR UPDATE statements (current reads) avoid phantom reads through next-key locks (record locks + gap locks).

How does a Read View work in MVCC?

A Read View has four important fields.

Of the four fields, only m_ids is a set; creator_trx_id is a member of m_ids, and min_trx_id is the minimum value of the m_ids set.

Each record also carries two hidden columns:

  • trx_id: when a transaction modifies a clustered-index record, its transaction id is stored in the record's trx_id hidden column;
  • roll_pointer: every time a clustered-index record is modified, the old version is written to the undo log; this hidden column is a pointer to each old version, so the pre-modification record can be found through it.

After a Read View is created, when a transaction accesses a record, its own updates are always visible; beyond that there are several cases (a code sketch follows this list):

  • If the record's trx_id is less than the Read View's min_trx_id, this version of the record was produced by a transaction that committed before the Read View was created, so it is visible to the current transaction.
  • If the record's trx_id is greater than or equal to the Read View's max_trx_id, this version was produced by a transaction that started after the Read View was created, so it is not visible to the current transaction.
  • If the record's trx_id lies between the Read View's min_trx_id and max_trx_id, check whether trx_id appears in the m_ids list:
    • If it does, the transaction that produced this version is still active (has not committed), so the version is not visible to the current transaction.
    • If it does not, the transaction that produced this version has already committed, so the version is visible.
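These rules boil down to a small visibility predicate. A minimal sketch with hypothetical field and type names (InnoDB's real structures differ):

#include <cstdint>
#include <set>

// Hypothetical mirror of the Read View fields described above.
struct ReadView {
  uint64_t creator_trx_id;   // transaction that created the view
  uint64_t min_trx_id;       // smallest id in m_ids
  uint64_t max_trx_id;       // id that will be assigned to the next transaction
  std::set<uint64_t> m_ids;  // transactions active when the view was created
};

// Returns whether a record version tagged with record_trx_id is visible.
bool IsVisible(const ReadView &rv, uint64_t record_trx_id) {
  if (record_trx_id == rv.creator_trx_id) { return true; }  // own updates are always visible
  if (record_trx_id < rv.min_trx_id) { return true; }       // committed before the view was created
  if (record_trx_id >= rv.max_trx_id) { return false; }     // started after the view was created
  return rv.m_ids.count(record_trx_id) == 0;                // visible only if already committed
}

If a version is not visible, the engine follows roll_pointer into the undo log and retests each older version with the same predicate.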

Two Phase Locking (2PL)

2PL divides a transaction into two phases:

  • Growing phase: locks may only be acquired
  • Shrinking phase: locks may only be released

2PL by itself is enough to guarantee that a schedule is serializable, but it can cause cascading aborts.

To avoid them there is a stricter variant, Rigorous 2PL, under which data written by a transaction may not be read or overwritten by other transactions until that transaction finishes.

Deadlock Detection & Prevention

One problem 2PL cannot avoid is deadlock. There are two solutions:

Deadlock Detection (detect after the fact)

To detect deadlocks, the DBMS maintains a waits-for graph that tracks, for each transaction, the other transactions it is waiting on (to release locks). The system periodically checks the waits-for graph for cycles; when a cycle appears, it must decide how to break it.

Nodes in the waits-for graph are transactions, and an edge from Ti to Tj means Ti is waiting for Tj to release a lock.

When the DBMS detects a deadlock, it picks a “victim” transaction and rolls it back to break the cycle; whether the victim is retried or aborted is left to configuration or application logic. Two design decisions follow:

  1. How often to check for deadlocks
  2. How to choose a suitable “victim”

The more frequently detection runs, the less time deadlocked transactions spend waiting, but the more CPU it consumes. This is a classic trade-off, usually exposed as a tunable parameter.

Victim selection can weigh many metrics: the transaction's duration, its progress, the number of items it has locked, the number of cascading transactions involved, how many times it has been restarted, and so on. After picking a victim, the DBMS faces one more design decision: roll back completely, or only far enough to break the cycle.

Deadlock Prevention (prevent before the fact)

Prevention usually assigns priorities by transaction age: the older the timestamp, the higher the priority. There are two prevention policies:

  • Old Waits for Young: if the requesting txn has higher priority than the holding txn, it waits for the holder to release the lock; if lower, it aborts itself.
  • Young Waits for Old: if the requesting txn has higher priority than the holding txn, the holder aborts itself and releases the lock so the requester can take it; otherwise the requesting txn waits for the holding txn to release the lock.


The necessary conditions for deadlock are:

1. mutual exclusion  2. hold and wait  3. no preemption  4. circular wait

Task#1 Lock Manager

There are five lock modes: S, X, IS, IX, and SIX. Only three isolation levels are implemented here: READ_UNCOMMITTED, READ_COMMITTED, and REPEATABLE_READ.

LOCK NOTE

  1. GENERAL BEHAVIOUR:

    • Both LockTable() and LockRow() are blocking methods; they should wait till the lock is granted and then return.
    • If the transaction was aborted in the meantime, do not grant the lock and return false.
  2. MULTIPLE TRANSACTIONS:

    • LockManager should maintain a queue for each resource; locks should be granted to transactions in a FIFO manner.
    • If there are multiple compatible lock requests, all should be granted at the same time, as long as FIFO is honoured.
  3. SUPPORTED LOCK MODES:

    • Table locking should support all lock modes.
    • Row locking should not support Intention locks. Attempting this should set the TransactionState as ABORTED and throw a TransactionAbortException (ATTEMPTED_INTENTION_LOCK_ON_ROW)
  4. ISOLATION LEVEL:

    • REPEATABLE_READ:

      The transaction is required to take all locks.
      All locks are allowed in the GROWING state
      No locks are allowed in the SHRINKING state

    • READ_COMMITTED:

      The transaction is required to take all locks.
      All locks are allowed in the GROWING state
      Only IS, S locks are allowed in the SHRINKING state

    • READ_UNCOMMITTED:

      The transaction is required to take only IX, X locks.
      X, IX locks are allowed in the GROWING state.
      S, IS, SIX locks are never allowed

  5. MULTILEVEL LOCKING:

    • While locking rows, Lock() should ensure that the transaction has an appropriate lock on the table which the row belongs to. For instance, if an exclusive lock is attempted on a row, the transaction must hold either X, IX, or SIX on the table. If such a lock does not exist on the table, Lock() should set the TransactionState as ABORTED and throw a TransactionAbortException (TABLE_LOCK_NOT_PRESENT)

  6. LOCK UPGRADE:

    Calling Lock() on a resource that is already locked should have the following behaviour:

    • If requested lock mode is the same as that of the lock presently held, Lock() should return true since it already has the lock.
    • If requested lock mode is different, Lock() should upgrade the lock held by the transaction.

      While upgrading, only the following transitions should be allowed:
      1. IS -> [S, X, IX, SIX]
      2. S -> [X, SIX]
      3. IX -> [X, SIX]
      4. SIX -> [X]
    • Any other upgrade is considered incompatible, and such an attempt should set the TransactionState as ABORTED and throw a TransactionAbortException (INCOMPATIBLE_UPGRADE)
    • Furthermore, only one transaction should be allowed to upgrade its lock on a given resource. Multiple concurrent lock upgrades on the same resource should set the TransactionState as ABORTED and throw a TransactionAbortException (UPGRADE_CONFLICT).
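The allowed transitions can be encoded directly. A minimal sketch of CanLockUpgrade, the helper used by the LockTable/LockRow code below, written against BusTub's LockMode names (treat it as an illustrative version, not the reference implementation):

auto LockManager::CanLockUpgrade(LockMode curr_lock_mode, LockMode requested_lock_mode) -> bool {
  switch (curr_lock_mode) {
    case LockMode::INTENTION_SHARED:  // IS -> [S, X, IX, SIX]
      return requested_lock_mode == LockMode::SHARED || requested_lock_mode == LockMode::EXCLUSIVE ||
             requested_lock_mode == LockMode::INTENTION_EXCLUSIVE ||
             requested_lock_mode == LockMode::SHARED_INTENTION_EXCLUSIVE;
    case LockMode::SHARED:  // S -> [X, SIX]
      return requested_lock_mode == LockMode::EXCLUSIVE ||
             requested_lock_mode == LockMode::SHARED_INTENTION_EXCLUSIVE;
    case LockMode::INTENTION_EXCLUSIVE:  // IX -> [X, SIX]
      return requested_lock_mode == LockMode::EXCLUSIVE ||
             requested_lock_mode == LockMode::SHARED_INTENTION_EXCLUSIVE;
    case LockMode::SHARED_INTENTION_EXCLUSIVE:  // SIX -> [X]
      return requested_lock_mode == LockMode::EXCLUSIVE;
    default:
      return false;  // any other upgrade is incompatible
  }
}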

UNLOCK NOTE

  1. GENERAL BEHAVIOUR:
    • Both UnlockTable() and UnlockRow() should release the lock on the resource and return. Both should ensure that the transaction currently holds a lock on the resource it is attempting to unlock.

    If not, LockManager should set the TransactionState as ABORTED and throw a TransactionAbortException (ATTEMPTED_UNLOCK_BUT_NO_LOCK_HELD)

    • Additionally, unlocking a table should only be allowed if the transaction does not hold locks on any row on that table. If the transaction holds locks on rows of the table, Unlock should set the Transaction State as ABORTED and throw a TransactionAbortException (TABLE_UNLOCKED_BEFORE_UNLOCKING_ROWS).
    • Finally, unlocking a resource should also grant any new lock requests for the resource (if possible).
  2. TRANSACTION STATE UPDATE
    • REPEATABLE_READ:

      Unlocking S/X locks should set the transaction state to SHRINKING
    • READ_COMMITTED:

      Unlocking X locks should set the transaction state to SHRINKING.

      Unlocking S locks does not affect transaction state.
    • READ_UNCOMMITTED:

      Unlocking X locks should set the transaction state to SHRINKING.

      S locks are not permitted under READ_UNCOMMITTED.

      The behaviour upon unlocking an S lock under this isolation level is undefined.




  • LockTable(Transaction, LockMode, TableOID)
  • UnlockTable(Transaction, TableOID)
auto LockManager::LockTable(Transaction *txn, LockMode lock_mode, const table_oid_t &oid) -> bool {
  return LockTableDirectlyOrNot(txn, lock_mode, oid, true);
}

auto LockManager::LockTableDirectlyOrNot(Transaction *txn, LockMode lock_mode, const table_oid_t &oid, bool directly)
    -> bool {
  auto txn_state = txn->GetState();
  auto iso_level = txn->GetIsolationLevel();
  if (txn_state == TransactionState::COMMITTED || txn_state == TransactionState::ABORTED) {
    return false;  // the transaction has already committed or aborted
  }
  switch (iso_level) {
    case IsolationLevel::REPEATABLE_READ:
      // under REPEATABLE_READ, no lock may be taken in the SHRINKING phase
      if (txn_state == TransactionState::SHRINKING) {
        ABORT_FOR_REASON_DIRECTLY_OR_NOT(AbortReason::LOCK_ON_SHRINKING, directly);  // throw
      }
      break;
    case IsolationLevel::READ_COMMITTED:
      if (txn_state == TransactionState::SHRINKING) {
        // under READ_COMMITTED, only IS and S locks may be taken in the SHRINKING phase
        if (lock_mode != LockMode::INTENTION_SHARED && lock_mode != LockMode::SHARED) {
          ABORT_FOR_REASON_DIRECTLY_OR_NOT(AbortReason::LOCK_ON_SHRINKING, directly);
        }
      }
      break;
    case IsolationLevel::READ_UNCOMMITTED:
      // under READ_UNCOMMITTED, no lock may be taken in the SHRINKING phase
      if (txn_state == TransactionState::SHRINKING) {
        ABORT_FOR_REASON_DIRECTLY_OR_NOT(AbortReason::LOCK_ON_SHRINKING, directly);  // throw
      }
      // under READ_UNCOMMITTED, only IX and X locks may ever be taken
      if (lock_mode != LockMode::INTENTION_EXCLUSIVE && lock_mode != LockMode::EXCLUSIVE) {
        ABORT_FOR_REASON_DIRECTLY_OR_NOT(AbortReason::LOCK_SHARED_ON_READ_UNCOMMITTED, directly);  // throw
      }
      break;
    default:
      UNREACHABLE("wrong IsolationLevel");
  }

  table_lock_map_latch_.lock();
  if (table_lock_map_.count(oid) == 0) {
    table_lock_map_[oid] = std::make_shared<LockRequestQueue>();
  }
  auto lrq = table_lock_map_[oid];  // fetch this table's LockRequestQueue
  std::unique_lock<std::mutex> lock(lrq->latch_);
  table_lock_map_latch_.unlock();

  // check whether this request is a lock upgrade
  bool upgrade = false;
  for (auto iter = lrq->request_queue_.begin(); iter != lrq->request_queue_.end(); iter++) {  // traverse the queue
    auto lr = *iter;
    if (lr->txn_id_ == txn->GetTransactionId()) {  // the same transaction requests a lock on the same table
      if (lr->lock_mode_ == lock_mode) {  // same lock mode: return directly
        return true;
      }
      if (lrq->upgrading_ != INVALID_TXN_ID) {  // another transaction is already upgrading on this resource
        ABORT_FOR_REASON_DIRECTLY_OR_NOT(AbortReason::UPGRADE_CONFLICT, directly);  // throw
      }
      if (!CanLockUpgrade(lr->lock_mode_, lock_mode)) {  // this upgrade is not allowed
        ABORT_FOR_REASON_DIRECTLY_OR_NOT(AbortReason::INCOMPATIBLE_UPGRADE, directly);  // throw
      }
      // perform the lock upgrade
      lrq->upgrading_ = txn->GetTransactionId();
      lrq->request_queue_.erase(iter);                      // remove lr from the LockRequestQueue
      RemoveFromTxnTableLockSet(txn, lr->lock_mode_, oid);  // drop the lock the txn held on this table
      delete lr;                                            // avoid a memory leak
      lrq->request_queue_.push_back(new LockRequest(txn->GetTransactionId(), lock_mode, oid));  // enqueue the request
      upgrade = true;
      break;
    }
  }
  // not a lock upgrade
  if (!upgrade) {
    lrq->request_queue_.push_back(new LockRequest(txn->GetTransactionId(), lock_mode, oid));  // enqueue the request
  }

  while (!CanTxnTakeLock(txn, lock_mode, lrq)) {  // check whether the lock on this table can be granted
    lrq->cv_.wait(lock);  // block on the condition variable
    // deadlock detection may have aborted this transaction, or it was aborted manually
    if (txn->GetState() == TransactionState::ABORTED) {
      // remove this transaction's request for the resource
      for (auto iter = lrq->request_queue_.begin(); iter != lrq->request_queue_.end(); iter++) {
        auto lr = *iter;
        if (lr->txn_id_ == txn->GetTransactionId()) {
          lrq->request_queue_.erase(iter);
          delete lr;
          break;
        }
      }
      lrq->cv_.notify_all();
      return false;
    }
  }

  AddIntoTxnTableLockSet(txn, lock_mode, oid);  // the table lock is granted
  // LOG_DEBUG("txn:%d LockTable %d lock_mode:%d", txn->GetTransactionId(), oid, lock_mode);
  return true;
}

auto LockManager::CanTxnTakeLock(Transaction *txn, LockMode lock_mode,
                                 std::shared_ptr<LockRequestQueue> &lock_request_queue) -> bool {
  for (auto lr : lock_request_queue->request_queue_) {
    if (lr->granted_ && !AreLocksCompatible(lock_mode, lr->lock_mode_)) {  // conflicts with a granted lock
      return false;
    }
  }
  // a lock upgrade has the highest priority
  if (lock_request_queue->upgrading_ != INVALID_TXN_ID) {
    if (lock_request_queue->upgrading_ == txn->GetTransactionId()) {  // this transaction is the upgrader
      lock_request_queue->upgrading_ = INVALID_TXN_ID;
      for (auto lr : lock_request_queue->request_queue_) {
        if (!lr->granted_ && lr->txn_id_ == txn->GetTransactionId()) {
          lr->granted_ = true;
          break;
        }
      }
      return true;
    }
    return false;  // another transaction is upgrading, so this one must wait
  }
  // otherwise follow FIFO order
  for (auto lr : lock_request_queue->request_queue_) {
    if (lr->txn_id_ == txn->GetTransactionId()) {
      lr->granted_ = true;
      break;
    }
    if (!lr->granted_ && !AreLocksCompatible(lock_mode, lr->lock_mode_)) {  // lock conflict
      return false;
    }
  }
  return true;
}

auto LockManager::UnlockTable(Transaction *txn, const table_oid_t &oid) -> bool {
  if (!CheckAllRowsUnLock(txn, oid)) {  // every row lock under this table must be released first
    ABORT_FOR_REASON_DIRECTLY_OR_NOT(AbortReason::TABLE_UNLOCKED_BEFORE_UNLOCKING_ROWS, true);  // throw
  }

  table_lock_map_latch_.lock();
  if (table_lock_map_.count(oid) == 0) {
    table_lock_map_latch_.unlock();
    ABORT_FOR_REASON_DIRECTLY_OR_NOT(AbortReason::ATTEMPTED_UNLOCK_BUT_NO_LOCK_HELD, true);  // the table is not locked
  }
  auto lrq = table_lock_map_[oid];
  std::unique_lock<std::mutex> lock(lrq->latch_);
  table_lock_map_latch_.unlock();

  for (auto iter = lrq->request_queue_.begin(); iter != lrq->request_queue_.end(); ++iter) {
    auto lr = *iter;
    if (lr->granted_ && lr->txn_id_ == txn->GetTransactionId()) {  // found this txn's LockRequest on the table
      auto iso_level = txn->GetIsolationLevel();
      switch (iso_level) {
        case IsolationLevel::REPEATABLE_READ:
          if (lr->lock_mode_ == LockMode::SHARED || lr->lock_mode_ == LockMode::EXCLUSIVE) {
            txn->SetState(TransactionState::SHRINKING);
            // LOG_DEBUG("txn:%d set to SHRINKING", txn->GetTransactionId());
          }
          break;
        case IsolationLevel::READ_COMMITTED:
        case IsolationLevel::READ_UNCOMMITTED:
          if (lr->lock_mode_ == LockMode::EXCLUSIVE) {
            txn->SetState(TransactionState::SHRINKING);
          }
          break;
        default:
          UNREACHABLE("wrong IsolationLevel");
      }

      RemoveFromTxnTableLockSet(txn, lr->lock_mode_, oid);
      // LOG_DEBUG("txn:%d UnlockTable %d", txn->GetTransactionId(), oid);
      lrq->request_queue_.erase(iter);
      delete lr;
      lrq->cv_.notify_all();  // the resource is released: notify waiters
      return true;
    }
  }

  ABORT_FOR_REASON_DIRECTLY_OR_NOT(AbortReason::ATTEMPTED_UNLOCK_BUT_NO_LOCK_HELD, true);
}
  • LockRow(Transaction, LockMode, TableOID, RID)
  • UnlockRow(Transaction, TableOID, RID, force)
auto LockManager::LockRow(Transaction *txn, LockMode lock_mode, const table_oid_t &oid, const RID &rid) -> bool {
  // rows only support S and X locks
  if (lock_mode != LockMode::SHARED && lock_mode != LockMode::EXCLUSIVE) {
    ABORT_FOR_REASON_DIRECTLY_OR_NOT(AbortReason::ATTEMPTED_INTENTION_LOCK_ON_ROW, true);
  }

  auto txn_state = txn->GetState();
  auto iso_level = txn->GetIsolationLevel();
  if (txn_state == TransactionState::COMMITTED || txn_state == TransactionState::ABORTED) {
    return false;
  }
  switch (iso_level) {
    case IsolationLevel::REPEATABLE_READ:
      // under REPEATABLE_READ, no lock may be taken in the SHRINKING phase
      if (txn_state == TransactionState::SHRINKING) {
        ABORT_FOR_REASON_DIRECTLY_OR_NOT(AbortReason::LOCK_ON_SHRINKING, true);  // throw
      }
      break;
    case IsolationLevel::READ_COMMITTED:
      // under READ_COMMITTED, only SHARED row locks may be taken in the SHRINKING phase
      if (txn_state == TransactionState::SHRINKING) {
        if (lock_mode != LockMode::SHARED) {
          ABORT_FOR_REASON_DIRECTLY_OR_NOT(AbortReason::LOCK_ON_SHRINKING, true);  // throw
        }
      }
      break;
    case IsolationLevel::READ_UNCOMMITTED:
      // under READ_UNCOMMITTED, no lock may be taken in the SHRINKING phase
      if (txn_state == TransactionState::SHRINKING) {
        ABORT_FOR_REASON_DIRECTLY_OR_NOT(AbortReason::LOCK_ON_SHRINKING, true);  // throw
      }
      // under READ_UNCOMMITTED, rows may only take X locks
      if (lock_mode != LockMode::EXCLUSIVE) {
        ABORT_FOR_REASON_DIRECTLY_OR_NOT(AbortReason::LOCK_SHARED_ON_READ_UNCOMMITTED, true);  // throw
      }
      break;
    default:
      UNREACHABLE("wrong IsolationLevel");
  }

  if (!CheckAppropriateLockOnTable(txn, oid, lock_mode)) {  // the row's table must hold an appropriate lock
    ABORT_FOR_REASON_DIRECTLY_OR_NOT(AbortReason::TABLE_LOCK_NOT_PRESENT, true);
  }

  row_lock_map_latch_.lock();
  if (row_lock_map_.count(rid) == 0) {
    row_lock_map_[rid] = std::make_shared<LockRequestQueue>();
  }
  auto lrq = row_lock_map_[rid];
  std::unique_lock<std::mutex> lock(lrq->latch_);
  row_lock_map_latch_.unlock();

  // check whether this is a lock upgrade (S -> X)
  bool upgrade = false;
  for (auto iter = lrq->request_queue_.begin(); iter != lrq->request_queue_.end(); iter++) {
    auto lr = *iter;
    if (lr->txn_id_ == txn->GetTransactionId()) {
      if (lr->lock_mode_ == lock_mode) {  // duplicate lock
        return true;
      }
      if (lrq->upgrading_ != INVALID_TXN_ID) {  // throw UPGRADE_CONFLICT
        ABORT_FOR_REASON_DIRECTLY_OR_NOT(AbortReason::UPGRADE_CONFLICT, true);
      }
      if (!CanLockUpgrade(lr->lock_mode_, lock_mode)) {  // throw INCOMPATIBLE_UPGRADE
        ABORT_FOR_REASON_DIRECTLY_OR_NOT(AbortReason::INCOMPATIBLE_UPGRADE, true);
      }

      lrq->upgrading_ = txn->GetTransactionId();
      lrq->request_queue_.erase(iter);
      RemoveFromTxnRowLockSet(txn, lr->lock_mode_, oid, rid);
      delete lr;
      lrq->request_queue_.push_back(new LockRequest(txn->GetTransactionId(), lock_mode, oid, rid));
      upgrade = true;
      break;
    }
  }

  if (!upgrade) {
    lrq->request_queue_.push_back(new LockRequest(txn->GetTransactionId(), lock_mode, oid, rid));
  }

  while (!CanTxnTakeLock(txn, lock_mode, lrq)) {
    lrq->cv_.wait(lock);
    // LOG_DEBUG("txn:%d wake", txn->GetTransactionId());
    // deadlock detection may have aborted this transaction, or it was aborted manually
    if (txn->GetState() == TransactionState::ABORTED) {
      // LOG_DEBUG("txn:%d ABORT", txn->GetTransactionId());
      // remove this transaction's request for the resource
      for (auto iter = lrq->request_queue_.begin(); iter != lrq->request_queue_.end(); iter++) {
        auto lr = *iter;
        if (lr->txn_id_ == txn->GetTransactionId()) {
          lrq->request_queue_.erase(iter);
          delete lr;
          break;
        }
      }
      lrq->cv_.notify_all();
      return false;
    }
  }
  AddIntoTxnRowLockSet(txn, lock_mode, oid, rid);
  // LOG_DEBUG("txn:%d LockRow oid:%d rid:%d:%d lock_mode:%d", txn->GetTransactionId(), oid, rid.GetPageId(),
  // rid.GetSlotNum(), lock_mode);
  return true;
}

auto LockManager::UnlockRow(Transaction *txn, const table_oid_t &oid, const RID &rid, bool force) -> bool {
  row_lock_map_latch_.lock();
  if (row_lock_map_.count(rid) == 0) {
    row_lock_map_latch_.unlock();
    ABORT_FOR_REASON_DIRECTLY_OR_NOT(AbortReason::ATTEMPTED_UNLOCK_BUT_NO_LOCK_HELD, true);  // the row is not locked
  }
  auto lrq = row_lock_map_[rid];
  std::unique_lock<std::mutex> lock(lrq->latch_);
  row_lock_map_latch_.unlock();

  for (auto iter = lrq->request_queue_.begin(); iter != lrq->request_queue_.end(); ++iter) {
    auto lr = *iter;
    if (lr->granted_ && lr->txn_id_ == txn->GetTransactionId()) {
      if (!force) {
        auto iso_level = txn->GetIsolationLevel();
        switch (iso_level) {
          case IsolationLevel::REPEATABLE_READ:
            if (lr->lock_mode_ == LockMode::SHARED || lr->lock_mode_ == LockMode::EXCLUSIVE) {
              txn->SetState(TransactionState::SHRINKING);
              // LOG_DEBUG("txn:%d set to SHRINKING", txn->GetTransactionId());
            }
            break;
          case IsolationLevel::READ_COMMITTED:
          case IsolationLevel::READ_UNCOMMITTED:
            if (lr->lock_mode_ == LockMode::EXCLUSIVE) {
              txn->SetState(TransactionState::SHRINKING);
            }
            break;
          default:
            UNREACHABLE("wrong IsolationLevel");
        }
      }
      RemoveFromTxnRowLockSet(txn, lr->lock_mode_, oid, rid);
      // LOG_DEBUG("txn:%d UnlockRow oid:%d rid:%d:%d", txn->GetTransactionId(), oid, rid.GetPageId(),
      // rid.GetSlotNum());
      lrq->request_queue_.erase(iter);
      delete lr;
      lrq->cv_.notify_all();
      return true;
    }
  }
  ABORT_FOR_REASON_DIRECTLY_OR_NOT(AbortReason::ATTEMPTED_UNLOCK_BUT_NO_LOCK_HELD, true);
}

Task #2 - Deadlock Detection

The lock manager should run deadlock detection in a background thread, periodically building the waits-for graph and aborting transactions as needed to break deadlocks.

// data members added to LockManager for deadlock detection
class LockManager {
  std::atomic<bool> enable_cycle_detection_;
  std::thread *cycle_detection_thread_;
  /** Waits-for graph representation. */
  std::map<txn_id_t, std::set<txn_id_t>> waits_for_;
  // std::unordered_map<txn_id_t, int> node_value_;
  // std::vector<txn_id_t> route_;
  std::vector<txn_id_t> stk_;                      // DFS stack
  std::unordered_map<txn_id_t, bool> in_stk_;      // whether a txn is currently on the DFS stack
  std::unordered_map<txn_id_t, bool> has_search_;  // whether a txn has already been visited
};

Start the background thread that runs deadlock detection.

void LockManager::RunCycleDetection() {
  while (enable_cycle_detection_) {
    std::this_thread::sleep_for(cycle_detection_interval);
    {
      waits_for_.clear();
      BuildGraph();  // build the waits-for graph
      // PrintGraph();
      while (true) {
        stk_.clear();
        in_stk_.clear();
        has_search_.clear();
        txn_id_t abort_tid;
        if (HasCycle(&abort_tid)) {  // if there is a cycle, abort_tid receives the victim
          // LOG_DEBUG("abort_tid:%d", abort_tid);
          auto txn = txn_manager_->GetTransaction(abort_tid);
          txn->SetState(TransactionState::ABORTED);
          RemoveAllAboutAbortTxn(abort_tid);  // remove every in- and out-edge of abort_tid from the graph
          WakeAbortedTxn(abort_tid);          // notify_all on the cv_ of the resources the aborted txn is involved in
          // LOG_DEBUG("dead_detect notify all");
        } else {
          break;
        }
      }
    }
  }
}

// HasCycle reports whether the graph contains a cycle and returns the largest txn_id in that cycle
auto LockManager::HasCycle(txn_id_t *txn_id) -> bool {
  return std::any_of(waits_for_.begin(), waits_for_.end(),
                     [this, txn_id](const std::pair<txn_id_t, std::set<txn_id_t>> &p) {
                       auto k = p.first;
                       if (!this->has_search_[k] && DFS(k)) {
                         // the back of stk_ closes the cycle; scan from its first occurrence
                         auto iter = std::find(this->stk_.begin(), this->stk_.end() - 1, this->stk_.back());
                         *txn_id = -1;
                         while (iter != this->stk_.end()) {
                           if (*iter > *txn_id) {
                             *txn_id = *iter;
                           }
                           ++iter;
                         }
                         this->stk_.clear();
                         this->in_stk_.clear();
                         this->has_search_.clear();
                         return true;
                       }
                       this->stk_.clear();
                       this->in_stk_.clear();
                       return false;
                     });
}

// DFS checks for a cycle reachable from txn_id (the traversal start).
// Returns true if a cycle is found, false otherwise.
auto LockManager::DFS(txn_id_t txn_id) -> bool {
  has_search_[txn_id] = true;
  stk_.push_back(txn_id);
  in_stk_[txn_id] = true;
  for (auto id : waits_for_[txn_id]) {
    if (!has_search_[id]) {
      if (DFS(id)) {  // keep exploring the remaining neighbours if this subtree has no cycle
        return true;
      }
      continue;
    }
    if (in_stk_[id]) {
      stk_.push_back(id);
      return true;
    }
  }
  stk_.pop_back();
  in_stk_[txn_id] = false;
  return false;
}
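BuildGraph is not shown above; a minimal sketch of one way to write it, assuming that every waiting request on a resource waits on every granted request of that resource (field names follow the code above):

// Illustrative sketch: add one edge per (waiting txn, granted txn) pair on each resource.
void LockManager::BuildGraph() {
  auto add_edges = [this](const std::shared_ptr<LockRequestQueue> &lrq) {
    std::unique_lock<std::mutex> lock(lrq->latch_);
    for (auto waiting : lrq->request_queue_) {
      if (waiting->granted_) { continue; }
      for (auto holder : lrq->request_queue_) {
        if (holder->granted_) {
          waits_for_[waiting->txn_id_].insert(holder->txn_id_);  // waiting -> holder
        }
      }
    }
  };
  std::lock_guard<std::mutex> table_guard(table_lock_map_latch_);
  for (auto &[oid, lrq] : table_lock_map_) { add_edges(lrq); }
  std::lock_guard<std::mutex> row_guard(row_lock_map_latch_);
  for (auto &[rid, lrq] : row_lock_map_) { add_edges(lrq); }
}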

Task #3 - Concurrent Query Execution

TransactionManager's Abort function must restore the table and the corresponding indexes to their original state.

void TransactionManager::Commit(Transaction *txn) {
  // Release all the locks.
  ReleaseLocks(txn);

  txn->SetState(TransactionState::COMMITTED);
  std::unique_lock<std::shared_mutex> guard(txn_map_mutex_);
  txn_map_.erase(txn->GetTransactionId());
}

void TransactionManager::Abort(Transaction *txn) {
  // revert all the changes in the table write set
  while (!txn->GetWriteSet()->empty()) {
    auto twr = txn->GetWriteSet()->back();
    if (twr.wtype_ == WType::INSERT) {
      // undo an insert by marking the tuple deleted again
      auto tuple_meta = twr.table_heap_->GetTupleMeta(twr.rid_);
      tuple_meta.is_deleted_ = true;
      twr.table_heap_->UpdateTupleMeta(tuple_meta, twr.rid_);
    } else if (twr.wtype_ == WType::DELETE) {
      // undo a delete by clearing the deleted flag
      auto tuple_meta = twr.table_heap_->GetTupleMeta(twr.rid_);
      tuple_meta.is_deleted_ = false;
      twr.table_heap_->UpdateTupleMeta(tuple_meta, twr.rid_);
    }
    txn->GetWriteSet()->pop_back();
  }

  // revert all the changes in the index write set
  while (!txn->GetIndexWriteSet()->empty()) {
    auto iwr = txn->GetIndexWriteSet()->back();
    if (iwr.wtype_ == WType::INSERT) {
      iwr.catalog_->GetIndex(iwr.index_oid_)->index_->DeleteEntry(iwr.tuple_, iwr.rid_, txn);
    } else if (iwr.wtype_ == WType::DELETE) {
      iwr.catalog_->GetIndex(iwr.index_oid_)->index_->InsertEntry(iwr.tuple_, iwr.rid_, txn);
    }
    txn->GetIndexWriteSet()->pop_back();
  }

  ReleaseLocks(txn);

  txn->SetState(TransactionState::ABORTED);
  std::unique_lock<std::shared_mutex> guard(txn_map_mutex_);
  txn_map_.erase(txn->GetTransactionId());
}

SeqScanExecutor changes

void SeqScanExecutor::Init() {
  auto catalog = exec_ctx_->GetCatalog();
  table_info_ = catalog->GetTable(plan_->table_oid_);
  auto txn = exec_ctx_->GetTransaction();
  auto iso_level = txn->GetIsolationLevel();
  try {
    if (exec_ctx_->IsDelete()) {  // the current SQL statement is a DELETE or UPDATE
      auto res =
          exec_ctx_->GetLockManager()->LockTable(txn, LockManager::LockMode::INTENTION_EXCLUSIVE, plan_->table_oid_);
      if (!res) {
        throw ExecutionException("SeqScanExecutor LockTable Failed");
      }
    } else if (!txn->IsTableIntentionExclusiveLocked(plan_->table_oid_) &&  // avoid a reverse upgrade
               (iso_level == IsolationLevel::READ_COMMITTED || iso_level == IsolationLevel::REPEATABLE_READ)) {
      auto res =
          exec_ctx_->GetLockManager()->LockTable(txn, LockManager::LockMode::INTENTION_SHARED, plan_->table_oid_);
      if (!res) {
        throw ExecutionException("SeqScanExecutor LockTable Failed");
      }
    }
  } catch (TransactionAbortException &exception) {
    throw ExecutionException("SeqScanExecutor LockTable Failed");
  }
  iterator_ = std::make_unique<TableIterator>(table_info_->table_->MakeEagerIterator());
}

auto SeqScanExecutor::Next(Tuple *tuple, RID *rid) -> bool {
  std::pair<TupleMeta, Tuple> pair;
  auto txn = exec_ctx_->GetTransaction();
  auto iso_level = txn->GetIsolationLevel();
  while (!iterator_->IsEnd()) {
    pair = iterator_->GetTuple();
    try {
      if (exec_ctx_->IsDelete()) {
        auto res = exec_ctx_->GetLockManager()->LockRow(txn, LockManager::LockMode::EXCLUSIVE, plan_->table_oid_,
                                                        pair.second.GetRid());
        if (!res) {
          throw ExecutionException("SeqScanExecutor LockRow Failed");
        }
      } else if (!txn->IsRowExclusiveLocked(plan_->table_oid_, pair.second.GetRid()) &&  // avoid a reverse upgrade
                 (iso_level == IsolationLevel::READ_COMMITTED || iso_level == IsolationLevel::REPEATABLE_READ)) {
        auto res = exec_ctx_->GetLockManager()->LockRow(txn, LockManager::LockMode::SHARED, plan_->table_oid_,
                                                        pair.second.GetRid());
        if (!res) {
          throw ExecutionException("SeqScanExecutor LockRow Failed");
        }
      }
    } catch (TransactionAbortException &exception) {
      throw ExecutionException("SeqScanExecutor LockRow Failed");
    }

    if (iterator_->GetTuple().first.is_deleted_ ||
        (plan_->filter_predicate_ &&
         plan_->filter_predicate_->Evaluate(&pair.second, table_info_->schema_)
                 .CompareEquals(ValueFactory::GetBooleanValue(false)) == CmpBool::CmpTrue)) {
      // the tuple is deleted or filtered out: force-release its row lock
      if (exec_ctx_->IsDelete() ||
          (iso_level == IsolationLevel::READ_COMMITTED || iso_level == IsolationLevel::REPEATABLE_READ)) {
        try {
          auto res = exec_ctx_->GetLockManager()->UnlockRow(txn, plan_->table_oid_, pair.second.GetRid(), true);
          if (!res) {
            throw ExecutionException("SeqScanExecutor Force UnLockRow Failed");
          }
        } catch (TransactionAbortException &exception) {
          throw ExecutionException("SeqScanExecutor Force UnLockRow Failed");
        }
      }
      ++(*iterator_);
      continue;
    }

    // under READ_COMMITTED a plain read releases the row lock right away
    if (!exec_ctx_->IsDelete() && iso_level == IsolationLevel::READ_COMMITTED) {
      try {
        auto res = exec_ctx_->GetLockManager()->UnlockRow(txn, plan_->table_oid_, pair.second.GetRid());
        if (!res) {
          throw ExecutionException("SeqScanExecutor UnLockRow Failed");
        }
      } catch (TransactionAbortException &exception) {
        throw ExecutionException("SeqScanExecutor UnLockRow Failed");
      }
    }
    ++(*iterator_);
    *tuple = std::move(pair.second);
    *rid = tuple->GetRid();
    return true;
  }
  // the scan is done: under READ_COMMITTED the table lock can also be released
  if (!exec_ctx_->IsDelete() && iso_level == IsolationLevel::READ_COMMITTED) {
    try {
      auto res = exec_ctx_->GetLockManager()->UnlockTable(txn, plan_->table_oid_);
      if (!res) {
        throw ExecutionException("SeqScanExecutor UnLockTable Failed");
      }
    } catch (TransactionAbortException &exception) {
      throw ExecutionException("SeqScanExecutor UnLockTable Failed");
    }
  }
  return false;
}

InsertExecutor changes: take an IX lock on the table, and record the table and index insertions in the write sets.

void InsertExecutor::Init() {
  child_executor_->Init();
  auto cata_log = exec_ctx_->GetCatalog();
  table_info_ = cata_log->GetTable(plan_->table_oid_);
  index_infos_ = cata_log->GetTableIndexes(table_info_->name_);
  successful_ = false;
  auto txn = exec_ctx_->GetTransaction();
  try {
    auto res =
        exec_ctx_->GetLockManager()->LockTable(txn, LockManager::LockMode::INTENTION_EXCLUSIVE, plan_->table_oid_);
    if (!res) {
      throw ExecutionException("InsertExecutor LockTable Failed");
    }
  } catch (TransactionAbortException &exception) {
    throw ExecutionException("InsertExecutor LockTable Failed");
  }
}

auto InsertExecutor::Next(Tuple *tuple, RID *rid) -> bool {
  TupleMeta meta;
  if (successful_) {
    return false;
  }
  meta.insert_txn_id_ = INVALID_TXN_ID;
  meta.delete_txn_id_ = INVALID_TXN_ID;
  meta.is_deleted_ = false;
  auto count = 0;
  while (child_executor_->Next(tuple, rid)) {
    auto tuple_rid = table_info_->table_->InsertTuple(meta, *tuple, exec_ctx_->GetLockManager(),
                                                      exec_ctx_->GetTransaction(), table_info_->oid_);
    if (tuple_rid == std::nullopt) {
      continue;
    }
    // record the table insertion so Abort can undo it
    auto twr = TableWriteRecord(table_info_->oid_, tuple_rid.value(), table_info_->table_.get());
    twr.wtype_ = WType::INSERT;
    exec_ctx_->GetTransaction()->GetWriteSet()->push_back(twr);

    for (auto index_info : index_infos_) {
      auto key = tuple->KeyFromTuple(table_info_->schema_, index_info->key_schema_, index_info->index_->GetKeyAttrs());
      index_info->index_->InsertEntry(key, *tuple_rid, exec_ctx_->GetTransaction());
      // record the index insertion so Abort can undo it
      auto iwr = IndexWriteRecord(tuple_rid.value(), table_info_->oid_, WType::INSERT, key, index_info->index_oid_,
                                  exec_ctx_->GetCatalog());
      exec_ctx_->GetTransaction()->GetIndexWriteSet()->push_back(iwr);
    }
    ++count;
  }
  std::vector<Value> values;
  values.emplace_back(TypeId::INTEGER, count);
  *tuple = Tuple(values, &GetOutputSchema());
  successful_ = true;
  return true;
}

DeleteExecutor changes: the table and index deletions also need to be recorded in the write sets.

void DeleteExecutor::Init() {
  child_executor_->Init();
  successful_ = false;
  auto catalog = exec_ctx_->GetCatalog();
  table_info_ = catalog->GetTable(plan_->TableOid());
  index_infos_ = catalog->GetTableIndexes(table_info_->name_);
}

auto DeleteExecutor::Next(Tuple *tuple, RID *rid) -> bool {
  TupleMeta tuple_meta;
  if (successful_) {
    return false;
  }
  tuple_meta.delete_txn_id_ = INVALID_TXN_ID;
  tuple_meta.insert_txn_id_ = INVALID_TXN_ID;
  tuple_meta.is_deleted_ = true;
  auto count = 0;
  while (child_executor_->Next(tuple, rid)) {
    table_info_->table_->UpdateTupleMeta(tuple_meta, *rid);

    // record the table deletion so Abort can undo it
    auto twr = TableWriteRecord(table_info_->oid_, *rid, table_info_->table_.get());
    twr.wtype_ = WType::DELETE;
    exec_ctx_->GetTransaction()->GetWriteSet()->push_back(twr);

    for (auto index_info : index_infos_) {
      auto key = tuple->KeyFromTuple(table_info_->schema_, index_info->key_schema_, index_info->index_->GetKeyAttrs());
      index_info->index_->DeleteEntry(key, *rid, exec_ctx_->GetTransaction());

      // record the index deletion so Abort can undo it
      auto iwr = IndexWriteRecord(*rid, table_info_->oid_, WType::DELETE, key, index_info->index_oid_,
                                  exec_ctx_->GetCatalog());
      exec_ctx_->GetTransaction()->GetIndexWriteSet()->push_back(iwr);
    }
    count++;
  }
  std::vector<Value> values;
  values.emplace_back(TypeId::INTEGER, count);
  *tuple = Tuple(values, &GetOutputSchema());
  successful_ = true;
  return true;
}

Compatibility matrix of the five fine-grained lock modes (IS, IX, S, SIX, X):

      IS   IX   S    SIX  X
IS    YES  YES  YES  YES  NO
IX    YES  YES  NO   NO   NO
S     YES  NO   YES  NO   NO
SIX   YES  NO   NO   NO   NO
X     NO   NO   NO   NO   NO
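CanTxnTakeLock above relies on AreLocksCompatible to encode exactly this matrix; a minimal sketch (illustrative, not the reference implementation):

auto LockManager::AreLocksCompatible(LockMode l1, LockMode l2) -> bool {
  switch (l1) {
    case LockMode::INTENTION_SHARED:
      return l2 != LockMode::EXCLUSIVE;  // IS is compatible with everything except X
    case LockMode::INTENTION_EXCLUSIVE:
      return l2 == LockMode::INTENTION_SHARED || l2 == LockMode::INTENTION_EXCLUSIVE;
    case LockMode::SHARED:
      return l2 == LockMode::INTENTION_SHARED || l2 == LockMode::SHARED;
    case LockMode::SHARED_INTENTION_EXCLUSIVE:
      return l2 == LockMode::INTENTION_SHARED;  // SIX only coexists with IS
    case LockMode::EXCLUSIVE:
      return false;  // X is compatible with nothing
    default:
      return false;
  }
}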

Both the local tests and the online tests pass.

Diagnosing SSH passwordless-login failures

  1. On server B, the .ssh directory's permissions must be 700.
  2. On server B, the .ssh/authorized_keys file's permissions must be 600 or 644.
  3. On server B, the user's home directory must have permissions 700; e.g. for user aischang, the directory /home/aischang must be 700.

If they are not, the /var/log/secure log will show an error when the login from server A is attempted.

Reason:

For security, sshd imposes requirements on the permissions of the owner's directories and files. If the permissions are wrong, SSH passwordless login does not take effect.
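To set the expected permissions on server B in one go:

chmod 700 ~                       # home directory
chmod 700 ~/.ssh                  # .ssh directory
chmod 600 ~/.ssh/authorized_keys  # key file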

  1. On server B, SELinux should be disabled: set it with setenforce 0; check the state with getenforce, or see whether /etc/selinux/config is set to disabled.
  2. It may be a StrictModes problem: edit /etc/ssh/sshd_config (vi /etc/ssh/sshd_config) and change #StrictModes yes to StrictModes no.
  3. It may be a PubkeyAuthentication problem: edit /etc/ssh/sshd_config and set PubkeyAuthentication to yes.

If it still fails, run ssh -vvv <server B's IP> from server A and analyze the verbose output case by case.

Reference: https://juejin.cn/s/ssh%E5%85%8D%E5%AF%86%E7%99%BB%E5%BD%95%E5%A4%B1%E8%B4%A5%E5%8E%9F%E5%9B%A0

Query Processing Model

OLAP vs. OLTP databases

  • An OLAP database architecture prioritizes data reads over data writes; it can run complex queries over large amounts of data quickly.
  • An OLTP database architecture prioritizes data writes over data reads; it is optimized for write-intensive workloads.

Example:

Consider a large retail company with a huge database that tracks sales, inventory, customer data, and so on.

  • It uses an OLTP database to process transactions, update inventory levels, and manage customer accounts in real time.
  • It uses an OLAP database to analyze sales trends, inventory levels, customer demographics, and more.

A DBMS's processing model defines how the system executes a query plan. There are currently three main models:

  • Iterator Model
  • Materialization Model
  • Vectorized/Batch Model

Iterator Model

Every operator in the query plan implements a next function. On each call, the operator returns either one tuple or null, the latter meaning the data has been exhausted. Each operator runs a loop that calls next on its child operators to pull the next tuple to work on, so the whole query plan is chained together top-down. This is also known as the Volcano/Pipeline Model.

The iterator model is used in almost every DBMS, including SQLite, MySQL, PostgreSQL, and so on. Two further things to note (a sketch of the pattern follows this list):

  • Some operators wait until their children have returned all of their tuples before executing, such as Joins, Subqueries, and Order By.
  • Output control is easy in the iterator model: Limit, for example, just calls next only as often as needed.
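A minimal sketch of the pattern, with hypothetical Tuple/Operator types (not BusTub's actual interfaces):

#include <memory>
#include <optional>
#include <vector>

using Tuple = std::vector<int>;  // stand-in tuple type

struct Operator {
  virtual ~Operator() = default;
  // Returns the next tuple, or std::nullopt once the input is exhausted.
  virtual auto Next() -> std::optional<Tuple> = 0;
};

// A filter pulls one tuple at a time from its child: the Volcano pattern.
struct Filter : Operator {
  std::unique_ptr<Operator> child;
  bool (*pred)(const Tuple &);
  auto Next() -> std::optional<Tuple> override {
    while (auto t = child->Next()) {  // pull from the child
      if (pred(*t)) { return t; }     // emit only matching tuples
    }
    return std::nullopt;  // child exhausted
  }
};

A Limit operator on top of such a tree simply stops calling Next() once it has produced enough tuples.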

Materialization Model

Each operator processes all of its input and emits its entire result at once. The DBMS can push parameters down into operators to keep them from processing more data than necessary. This is a bottom-up approach.

The materialization model:

  • is better suited to OLTP workloads, which usually touch only a few tuples at a time, saving unnecessary execution and scheduling cost;
  • is less suited to OLAP queries that produce large intermediate results.

Vectorization Model

The vectorization model is a compromise between the iterator and materialization models:

  • Each operator implements a next function, but each call to next returns a batch of tuples rather than a single tuple.
  • The loop inside an operator also processes tuples batch by batch.
  • The batch size can be changed as needed (hardware, query properties).

The vectorization model is the ideal model for OLAP queries (a sketch follows this list):

  • It greatly reduces the number of invocations per operator.
  • It allows operators to use vectorized instructions (SIMD) to process batches of tuples.
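The same pull loop, batched; a tiny sketch reusing the hypothetical Tuple type from the iterator sketch above:

struct VecOperator {
  virtual ~VecOperator() = default;
  // Returns up to batch_size tuples; an empty batch means the input is exhausted.
  virtual auto NextBatch(size_t batch_size) -> std::vector<Tuple> = 0;
};

Inside an operator, the per-batch loop is where SIMD kernels can be applied to a whole run of values at once.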

BACKGROUND: QUERY PROCESSING

BusTub's query-processing layer is structured as Parser -> Binder -> Planner -> Optimizer -> Executors.

Notes:

  • BusTub supports only a small subset of SQL; the SQLLogicTest files under tests/sql show which statements it supports.
  • If you use CLion to run the BusTub shell, add the --disable-tty argument.
  • SQL statements end with ;
  • BusTub supports only the INT and VARCHAR(n) types; strings use single quotes.
  • BusTub uses the Iterator Processing Model.

Inspecting SQL query plans

BusTub supports EXPLAIN to print a SQL query plan. EXPLAIN shows the transformations performed in the query-processing layer: Parser -> Binder -> Planner -> Optimizer.

The parser parses the SQL statement into the binder's AST, from which a query plan is generated; the optimizer then optimizes the query plan, and the executor tree is built from the result.

Task#1 Access Method Executors

We do not need to care how a query plan is created, but it helps to understand its structure: it is a tree in which every plan node corresponds to a concrete operator. BusTub uses the iterator processing model, i.e. the top-to-bottom Volcano model, so executing a query plan means starting at the root and turning each plan node into its operator.

It is still worth reading some of the related code for the plan node types.

Table metadata

TableHeap represents a table on disk: a doubly-linked list of pages.

struct TableInfo {
  /** The table schema */
  Schema schema_;
  /** The table name */
  const std::string name_;
  /** An owning pointer to the table heap */
  std::unique_ptr<TableHeap> table_;
  /** The table OID */
  const table_oid_t oid_;
};

class TableHeap {
  page_id_t first_page_id_{INVALID_PAGE_ID};
  page_id_t last_page_id_{INVALID_PAGE_ID};
};

Index metadata

struct IndexInfo {
  /** The schema for the index key */
  Schema key_schema_;
  /** The name of the index */
  std::string name_;
  /** An owning pointer to the index */
  std::unique_ptr<Index> index_;
  /** The unique OID for the index */
  index_oid_t index_oid_;
  /** The name of the table on which the index is created */
  std::string table_name_;
  /** The size of the index key, in bytes */
  const size_t key_size_;
};

Catalog

Records all TableInfo and IndexInfo entries.

class Catalog {
  std::unordered_map<table_oid_t, std::unique_ptr<TableInfo>> tables_;
  /** Map table name -> table identifiers. */
  std::unordered_map<std::string, table_oid_t> table_names_;

  std::unordered_map<index_oid_t, std::unique_ptr<IndexInfo>> indexes_;

  /** Map table name -> index names -> index identifiers. */
  std::unordered_map<std::string, std::unordered_map<std::string, index_oid_t>> index_names_;
};

SeqScanExecutor implementation

// added data members
class SeqScanExecutor {
  const SeqScanPlanNode *plan_;              // the corresponding SeqScanPlanNode
  TableInfo *table_info_;                    // the table being scanned
  std::unique_ptr<TableIterator> iterator_;  // the TableIterator
};

void SeqScanExecutor::Init() {
  auto catalog = exec_ctx_->GetCatalog();
  table_info_ = catalog->GetTable(plan_->table_oid_);  // fetch the TableInfo
  iterator_ = std::make_unique<TableIterator>(table_info_->table_->MakeIterator());  // iterator over this table
}

auto SeqScanExecutor::Next(Tuple *tuple, RID *rid) -> bool {
  std::pair<TupleMeta, Tuple> pair;
  while (!iterator_->IsEnd()) {    // while the table is not exhausted
    pair = iterator_->GetTuple();  // fetch one TupleMeta-Tuple pair
    if (pair.first.is_deleted_) {  // the TupleMeta marks this tuple as deleted
      ++(*iterator_);              // skip it
      continue;
    }
    if (plan_->filter_predicate_) {  // the SeqScanPlanNode carries a filter_predicate expression
      auto res = plan_->filter_predicate_->Evaluate(&pair.second, table_info_->schema_);
      if (!(!res.IsNull() && res.GetAs<bool>())) {  // the tuple does not satisfy the predicate
        ++(*iterator_);                             // skip it
        continue;
      }
    }
    ++(*iterator_);  // advance the iterator
    *tuple = std::move(pair.second);
    *rid = tuple->GetRid();
    return true;
  }
  return false;
}

InsertExecutor implementation

// added data members
class InsertExecutor {
  const InsertPlanNode *plan_;            // the corresponding InsertPlanNode
  bool successful_;                       // whether the insert has finished
  TableInfo *table_info_;                 // the table to insert into
  std::vector<IndexInfo *> index_infos_;  // the table's indexes: when the table changes, they must change too
  std::unique_ptr<AbstractExecutor> child_executor_;  // the child executor
};

void InsertExecutor::Init() {
  child_executor_->Init();
  auto cata_log = exec_ctx_->GetCatalog();
  table_info_ = cata_log->GetTable(plan_->table_oid_);
  index_infos_ = cata_log->GetTableIndexes(table_info_->name_);
  successful_ = false;
}

auto InsertExecutor::Next(Tuple *tuple, RID *rid) -> bool {
  TupleMeta meta;
  if (successful_) {
    return false;
  }
  meta.insert_txn_id_ = INVALID_TXN_ID;
  meta.delete_txn_id_ = INVALID_TXN_ID;
  meta.is_deleted_ = false;
  auto count = 0;
  while (child_executor_->Next(tuple, rid)) {  // pull tuples from the child executor
    auto tuple_rid = table_info_->table_->InsertTuple(meta, *tuple, exec_ctx_->GetLockManager(),
                                                      exec_ctx_->GetTransaction(), table_info_->oid_);  // insert
    if (tuple_rid == std::nullopt) {
      continue;
    }
    // update every index on this table
    for (auto index_info : index_infos_) {
      auto key = tuple->KeyFromTuple(table_info_->schema_, index_info->key_schema_, index_info->index_->GetKeyAttrs());
      index_info->index_->InsertEntry(key, *tuple_rid, exec_ctx_->GetTransaction());
    }
    ++count;  // bump the insert counter
  }
  std::vector<Value> values;
  values.emplace_back(TypeId::INTEGER, count);
  *tuple = Tuple(values, &GetOutputSchema());  // the out-parameter tuple returns the number of inserted rows
  successful_ = true;
  return true;
}

UpdateExecutor implementation

The approach here is to delete the old tuple and insert a new one.

// added data members
class UpdateExecutor {
  const UpdatePlanNode *plan_;  // the corresponding UpdatePlanNode
  /** Metadata identifying the table that should be updated */
  const TableInfo *table_info_;  // the table to update
  /** The child executor to obtain value from */
  std::unique_ptr<AbstractExecutor> child_executor_;  // the child executor

  std::vector<IndexInfo *> index_infos_;  // all indexes on the table

  bool successful_;  // whether the update has finished
};

void UpdateExecutor::Init() {
  child_executor_->Init();
  successful_ = false;
  auto cata_log = exec_ctx_->GetCatalog();
  table_info_ = cata_log->GetTable(plan_->TableOid());
  index_infos_ = cata_log->GetTableIndexes(table_info_->name_);
}

auto UpdateExecutor::Next(Tuple *tuple, RID *rid) -> bool {
  TupleMeta tuple_meta;
  if (successful_) {
    return false;
  }
  tuple_meta.delete_txn_id_ = INVALID_TXN_ID;
  tuple_meta.insert_txn_id_ = INVALID_TXN_ID;
  auto count = 0;
  while (child_executor_->Next(tuple, rid)) {  // pull the tuples to update from the child executor
    // delete the old tuple
    tuple_meta.is_deleted_ = true;
    table_info_->table_->UpdateTupleMeta(tuple_meta, *rid);  // mark the tuple with this rid as deleted
    for (auto index_info : index_infos_) {  // remove the old tuple's key from every index on the table
      auto key = tuple->KeyFromTuple(table_info_->schema_, index_info->key_schema_, index_info->index_->GetKeyAttrs());
      index_info->index_->DeleteEntry(key, *rid, exec_ctx_->GetTransaction());
    }
    // compute the new tuple
    std::vector<Value> values;
    for (auto &expression : plan_->target_expressions_) {
      values.emplace_back(expression->Evaluate(tuple, child_executor_->GetOutputSchema()));
    }
    auto new_tuple = Tuple(values, &child_executor_->GetOutputSchema());
    // insert the new tuple
    tuple_meta.is_deleted_ = false;
    auto tuple_rid = table_info_->table_->InsertTuple(tuple_meta, new_tuple, exec_ctx_->GetLockManager(),
                                                      exec_ctx_->GetTransaction(), table_info_->oid_);
    if (tuple_rid == std::nullopt) {
      continue;
    }
    for (auto index_info : index_infos_) {
      auto key =
          new_tuple.KeyFromTuple(table_info_->schema_, index_info->key_schema_, index_info->index_->GetKeyAttrs());
      index_info->index_->InsertEntry(key, *tuple_rid, exec_ctx_->GetTransaction());  // insert the key into the index
    }
    ++count;
  }
  std::vector<Value> values;
  values.emplace_back(TypeId::INTEGER, count);
  *tuple = {values, &GetOutputSchema()};  // the out-parameter tuple returns the number of updated rows
  successful_ = true;
  return true;
}

DeleteExecutor implementation

class DeleteExecutor {
  const DeletePlanNode *plan_;  // the corresponding DeletePlanNode
  /** The child executor from which RIDs for deleted tuples are pulled */
  std::unique_ptr<AbstractExecutor> child_executor_;
  bool successful_;                       // whether the delete has finished
  TableInfo *table_info_;                 // the table to delete from
  std::vector<IndexInfo *> index_infos_;  // all indexes on the table
};

void DeleteExecutor::Init() {
  child_executor_->Init();
  successful_ = false;
  auto catalog = exec_ctx_->GetCatalog();
  table_info_ = catalog->GetTable(plan_->TableOid());
  index_infos_ = catalog->GetTableIndexes(table_info_->name_);
}

auto DeleteExecutor::Next(Tuple *tuple, RID *rid) -> bool {
  TupleMeta tuple_meta;
  if (successful_) {
    return false;
  }
  tuple_meta.delete_txn_id_ = INVALID_TXN_ID;
  tuple_meta.insert_txn_id_ = INVALID_TXN_ID;
  tuple_meta.is_deleted_ = true;
  auto count = 0;
  while (child_executor_->Next(tuple, rid)) {                // pull tuples from the child
    table_info_->table_->UpdateTupleMeta(tuple_meta, *rid);  // mark the matching tuple as deleted
    for (auto index_info : index_infos_) {  // remove the tuple's key from every index on the table
      auto key = tuple->KeyFromTuple(table_info_->schema_, index_info->key_schema_, index_info->index_->GetKeyAttrs());
      index_info->index_->DeleteEntry(key, *rid, exec_ctx_->GetTransaction());
    }
    count++;  // bump the delete counter
  }
  std::vector<Value> values;
  values.emplace_back(TypeId::INTEGER, count);
  *tuple = Tuple(values, &GetOutputSchema());  // the out-parameter tuple returns the number of deleted rows
  successful_ = true;
  return true;
}

IndexScanExecutor implementation

The ORDER BY in a query of the form SELECT * FROM <table> ORDER BY <indexed column> is converted into an IndexScan.

// added data members
class IndexScanExecutor {
  const IndexScanPlanNode *plan_;  // the corresponding IndexScanPlanNode
  IndexInfo *index_info_;          // IndexInfo
  TableInfo *table_info_;          // TableInfo
  BPlusTreeIndexForTwoIntegerColumn *index_;  // the index being scanned
  std::unique_ptr<BPlusTreeIndexIteratorForTwoIntegerColumn> index_iterator_;  // the index's IndexIterator
};

void IndexScanExecutor::Init() {
  auto catalog = exec_ctx_->GetCatalog();
  index_info_ = catalog->GetIndex(plan_->index_oid_);         // fetch the IndexInfo
  table_info_ = catalog->GetTable(index_info_->table_name_);  // fetch the TableInfo
  index_ = dynamic_cast<BPlusTreeIndexForTwoIntegerColumn *>(index_info_->index_.get());
  index_iterator_ = std::make_unique<BPlusTreeIndexIteratorForTwoIntegerColumn>(index_->GetBeginIterator());
}

auto IndexScanExecutor::Next(Tuple *tuple, RID *rid) -> bool {
  while (!index_iterator_->IsEnd()) {  // traverse the index
    auto map = *(*index_iterator_);
    *rid = map.second;  // take the rid
    if (!table_info_->table_->GetTupleMeta(*rid).is_deleted_) {  // not deleted
      index_iterator_->operator++();
      *tuple = table_info_->table_->GetTuple(*rid).second;  // return the tuple this rid points to
      return true;
    }
    index_iterator_->operator++();
  }
  return false;
}

At this point SQLLogicTests #1 to #6 pass.

Task#2 Aggregation & Join Executors

AggregationExecutor implementation

AggregationExecutor supports aggregate SQL queries; a DISTINCT (as in the fourth sample query) is equivalent to a GROUP BY.

AggregationExecutor does not need to handle HAVING clauses: the planner places a FilterPlanNode after the AggregationPlanNode.

Complete SimpleAggregationHashTable, where the hash-table key AggregateKey is exactly the GROUP BY columns.

class SimpleAggregationHashTable {
 private:
  /** The hash table is just a map from aggregate keys to aggregate values */
  std::unordered_map<AggregateKey, AggregateValue> ht_{};
  /** The aggregate expressions that we have */
  const std::vector<AbstractExpressionRef> &agg_exprs_;
  /** The types of aggregations that we have */
  const std::vector<AggregationType> &agg_types_;

 public:
  auto GenerateInitialAggregateValue() -> AggregateValue {
    std::vector<Value> values{};
    for (const auto &agg_type : agg_types_) {
      switch (agg_type) {
        case AggregationType::CountStarAggregate:
          // Count star starts at zero.
          values.emplace_back(ValueFactory::GetIntegerValue(0));
          break;
        case AggregationType::CountAggregate:
        case AggregationType::SumAggregate:
        case AggregationType::MinAggregate:
        case AggregationType::MaxAggregate:
          // Others start at null.
          values.emplace_back(ValueFactory::GetNullValueByType(TypeId::INTEGER));
          break;
      }
    }
    return {values};
  }

  void CombineAggregateValues(AggregateValue *result, const AggregateValue &input) {
    for (uint32_t i = 0; i < agg_exprs_.size(); i++) {
      switch (agg_types_[i]) {
        case AggregationType::CountStarAggregate:  // COUNT(*) counts nulls as well
          result->aggregates_[i] = {INTEGER, result->aggregates_[i].GetAs<int32_t>() + 1};
          break;
        case AggregationType::CountAggregate:  // COUNT(col) does not count nulls
          if (input.aggregates_[i].IsNull()) {
            break;
          }
          if (result->aggregates_[i].IsNull()) {
            result->aggregates_[i] = {INTEGER, 1};
          } else {
            result->aggregates_[i] = {INTEGER, result->aggregates_[i].GetAs<int32_t>() + 1};
          }
          break;
        case AggregationType::SumAggregate:
          if (input.aggregates_[i].IsNull()) {
            break;
          }
          if (result->aggregates_[i].IsNull()) {
            result->aggregates_[i] = input.aggregates_[i];
          } else {
            result->aggregates_[i] = result->aggregates_[i].Add((input.aggregates_[i]));
          }
          break;
        case AggregationType::MinAggregate:
          if (input.aggregates_[i].IsNull()) {
            break;
          }
          if (result->aggregates_[i].IsNull()) {
            result->aggregates_[i] = input.aggregates_[i];
          } else {
            result->aggregates_[i] = result->aggregates_[i].Min(input.aggregates_[i]);
          }
          break;
        case AggregationType::MaxAggregate:
          if (input.aggregates_[i].IsNull()) {
            break;
          }
          if (result->aggregates_[i].IsNull()) {
            result->aggregates_[i] = input.aggregates_[i];
          } else {
            result->aggregates_[i] = result->aggregates_[i].Max(input.aggregates_[i]);
          }
          break;
      }
    }
  }

  void InsertCombine(const AggregateKey &agg_key, const AggregateValue &agg_val) {
    if (ht_.count(agg_key) == 0) {
      ht_.insert({agg_key, GenerateInitialAggregateValue()});
    }
    CombineAggregateValues(&ht_[agg_key], agg_val);
  }

  void Insert(const AggregateKey &agg_key, const AggregateValue &agg_val) { ht_.insert({agg_key, agg_val}); }
};


class AggregationExecutor {
  // added data members
  const AggregationPlanNode *plan_;
  /** The child executor that produces tuples over which the aggregation is computed */
  std::unique_ptr<AbstractExecutor> child_;
  /** Simple aggregation hash table */
  SimpleAggregationHashTable aht_;
  /** Simple aggregation hash table iterator */
  std::unique_ptr<SimpleAggregationHashTable::Iterator> aht_iterator_;
};

void AggregationExecutor::Init() {
  child_->Init();
  Tuple tuple;
  RID rid;
  while (child_->Next(&tuple, &rid)) {  // consume every tuple from the child executor
    // build the AggregateKey and AggregateValue and insert them into the hash table
    AggregateKey key = MakeAggregateKey(&tuple);
    AggregateValue value = MakeAggregateValue(&tuple);
    aht_.InsertCombine(key, value);
  }
  if (aht_.Begin() == aht_.End() && plan_->GetGroupBys().empty()) {  // the hash table is empty and there is
    AggregateKey key;                                                // no GROUP BY: emit the initial values
    aht_.Insert(key, aht_.GenerateInitialAggregateValue());
  }
  aht_iterator_ = std::make_unique<SimpleAggregationHashTable::Iterator>(aht_.Begin());
}

auto AggregationExecutor::Next(Tuple *tuple, RID *rid) -> bool {
  if ((*aht_iterator_) == aht_.End()) {
    return false;
  }
  auto key = aht_iterator_->Key();
  auto value = aht_iterator_->Val();
  ++(*aht_iterator_);  // advance the iterator
  key.group_bys_.insert(key.group_bys_.end(), value.aggregates_.begin(), value.aggregates_.end());
  *tuple = {key.group_bys_, &GetOutputSchema()};  // key and value are concatenated and returned via tuple
  return true;
}

NestedLoopJoinExecutor implementation

NestedLoopJoinExecutor supports inner join and left join, using the simple nested loop join algorithm.

Is NestedLoopJoin a pipeline breaker?

BusTub executes operators with the Volcano (iterator processing) model, but some operators stay blocked until their child operators have delivered all of their tuples, e.g. Join, subqueries, and ordering; such operators are called pipeline breakers. In Task 2, Aggregation and HashJoin are both annotated as pipeline breakers, but NestedLoopJoin is not. Treating it as one fails the tests: Spring 2023 requires that each time the left child's Next() is called, the right child is Init()-ed again, so NestedLoopJoin is not a pipeline breaker here. This also means NestedLoopJoin's performance is quite poor.

class NestedLoopJoinExecutor {
  const NestedLoopJoinPlanNode *plan_;
  std::unique_ptr<AbstractExecutor> left_executor_;   // left child
  std::unique_ptr<AbstractExecutor> right_executor_;  // right child
  std::vector<Tuple> right_tuples_;
  int index_;
  Tuple left_tuple_;
  bool is_match_;  // whether the current left_tuple_ has found a match
};

void NestedLoopJoinExecutor::Init() {
  left_executor_->Init();
  right_executor_->Init();
  Tuple right_tuple;
  RID right_rid;
  // materialize all tuples of the right child into right_tuples_
  while (right_executor_->Next(&right_tuple, &right_rid)) {
    right_tuples_.emplace_back(right_tuple);
  }
  index_ = 0;
  is_match_ = false;
}

auto NestedLoopJoinExecutor::Next(Tuple *tuple, RID *rid) -> bool {
  RID left_rid;
  if (index_ != 0) {
    // the previous pass over the right side has not finished yet
    if (NestedLoop(tuple, rid)) {
      return true;
    }
  }
  // fetch one tuple from the left child into left_tuple_
  while (left_executor_->Next(&left_tuple_, &left_rid)) {
    right_executor_->Init();  // no real effect here; required to pass the tests
    if (NestedLoop(tuple, rid)) {
      return true;
    }
  }
  return false;
}

auto NestedLoopJoinExecutor::NestedLoop(Tuple *tuple, RID *rid) -> bool {
  while (index_ < static_cast<int>(right_tuples_.size())) {
    if (plan_->predicate_) {
      auto res = plan_->predicate_->EvaluateJoin(&left_tuple_, left_executor_->GetOutputSchema(),
                                                 &right_tuples_[index_], right_executor_->GetOutputSchema());
      if (!(!res.IsNull() && res.GetAs<bool>())) {  // the pair does not satisfy the predicate
        index_++;
        continue;  // filter it out
      }
    }
    // the pair matches
    MergeTuple(tuple);
    index_ = (index_ + 1) % right_tuples_.size();
    is_match_ = (index_ != 0);
    return true;
  }

  index_ = 0;
  if (!is_match_ && plan_->GetJoinType() == JoinType::LEFT) {
    // left join: pad the right-side columns with NULLs
    std::vector<Value> values;
    values.reserve(GetOutputSchema().GetColumnCount());
    for (auto i = 0; i < static_cast<int>(left_executor_->GetOutputSchema().GetColumnCount()); ++i) {
      values.emplace_back(left_tuple_.GetValue(&left_executor_->GetOutputSchema(), i));
    }
    for (auto i = 0; i < static_cast<int>(right_executor_->GetOutputSchema().GetColumnCount()); ++i) {
      values.emplace_back(ValueFactory::GetNullValueByType(right_executor_->GetOutputSchema().GetColumn(i).GetType()));
    }
    *tuple = {values, &GetOutputSchema()};
    return true;
  }

  is_match_ = false;
  return false;
}

void NestedLoopJoinExecutor::MergeTuple(Tuple *tuple) {
  // inner join: concatenate the left and right tuples
  std::vector<Value> values;
  values.reserve(GetOutputSchema().GetColumnCount());
  for (auto i = 0; i < static_cast<int>(left_executor_->GetOutputSchema().GetColumnCount()); ++i) {
    values.emplace_back(left_tuple_.GetValue(&left_executor_->GetOutputSchema(), i));
  }
  for (auto i = 0; i < static_cast<int>(right_executor_->GetOutputSchema().GetColumnCount()); ++i) {
    values.emplace_back(right_tuples_[index_].GetValue(&right_executor_->GetOutputSchema(), i));
  }
  *tuple = {values, &GetOutputSchema()};
}

At this point, SQLLogicTests #7 through #12 pass
img
img
img
img
img
img

HashJoinExecutor Implementation

HashJoinExecutor implements inner join and left join using the hash join algorithm
img
Like NestedLoopJoin, HashJoin has to handle both inner join and left join, and this affects the choice of build side: for a left join, whenever the right table has no matching row, a record whose right-table columns are padded with NULL must still be returned. Therefore, for a left join the hash table should be built over the right table.

Implementation approach:

First iterate over every tuple of the right table, collect its HashJoinKey, and insert it into the hash table (as HashJoinKey-tuple pairs)

Then iterate over every tuple of the left table, collect its HashJoinKey, and probe the hash table for a match; on a match, concatenate the two tuples

On a miss under a left join, concatenate the left tuple with NULLs

The concatenated results are stored in result_ and iterated via the index_ cursor; a sketch of the HashJoinKey type follows
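std::unordered_map<HashJoinKey, std::vector<Tuple>> requires HashJoinKey to be equality-comparable and hashable. The original does not show the key type; a minimal sketch of what it can look like, modeled on BusTub's AggregateKey pattern and its HashUtil helpers (the exact header may differ):

struct HashJoinKey {
std::vector<Value> column_values_;

// two keys match iff every join column compares equal
auto operator==(const HashJoinKey &other) const -> bool {
for (uint32_t i = 0; i < other.column_values_.size(); i++) {
if (column_values_[i].CompareEquals(other.column_values_[i]) != CmpBool::CmpTrue) {
return false;
}
}
return true;
}
};

namespace std {
// combine the hashes of all join columns so the key works as an unordered_map key
template <>
struct hash<bustub::HashJoinKey> {
auto operator()(const bustub::HashJoinKey &key) const -> std::size_t {
size_t curr_hash = 0;
for (const auto &val : key.column_values_) {
if (!val.IsNull()) {
curr_hash = bustub::HashUtil::CombineHashes(curr_hash, bustub::HashUtil::HashValue(&val));
}
}
return curr_hash;
}
};
}  // namespace std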

class HashJoinExecutor {
const HashJoinPlanNode *plan_;
std::unique_ptr<AbstractExecutor> left_child_;
std::unique_ptr<AbstractExecutor> right_child_;
std::unordered_map<HashJoinKey, std::vector<Tuple>> map_;
std::vector<Tuple> result_;
int index_;
};

void HashJoinExecutor::Init() {
left_child_->Init();
right_child_->Init();
Tuple right_tuple;
RID right_rid;
while (right_child_->Next(&right_tuple, &right_rid)) {
HashJoinKey key;
for (auto &expression : plan_->RightJoinKeyExpressions()) {
key.column_values_.emplace_back(expression->Evaluate(&right_tuple, right_child_->GetOutputSchema()));
}
// insert into the hash table
if (map_.count(key) != 0) {
map_[key].emplace_back(right_tuple);
} else {
map_[key] = {right_tuple};
}
}
// iterate over the left child to produce the join results
Tuple left_tuple;
RID left_rid;
while (left_child_->Next(&left_tuple, &left_rid)) {
HashJoinKey key;
for (auto &expression : plan_->LeftJoinKeyExpressions()) {
key.column_values_.emplace_back(expression->Evaluate(&left_tuple, left_child_->GetOutputSchema()));
}
if (map_.count(key) != 0) { // match found
for (auto &t : map_[key]) {
std::vector<Value> values;
values.reserve(GetOutputSchema().GetColumnCount());
for (auto i = 0; i < static_cast<int>(left_child_->GetOutputSchema().GetColumnCount()); ++i) {
values.emplace_back(left_tuple.GetValue(&left_child_->GetOutputSchema(), i));
}
for (auto i = 0; i < static_cast<int>(right_child_->GetOutputSchema().GetColumnCount()); ++i) {
values.emplace_back(t.GetValue(&right_child_->GetOutputSchema(), i));
}
result_.emplace_back(values, &GetOutputSchema());
}
} else if (plan_->GetJoinType() == JoinType::LEFT) { // no match, but this is a LEFT JOIN
std::vector<Value> values;
values.reserve(GetOutputSchema().GetColumnCount());
for (auto i = 0; i < static_cast<int>(left_child_->GetOutputSchema().GetColumnCount()); ++i) {
values.emplace_back(left_tuple.GetValue(&left_child_->GetOutputSchema(), i));
}
for (auto i = 0; i < static_cast<int>(right_child_->GetOutputSchema().GetColumnCount()); ++i) {
values.emplace_back(ValueFactory::GetNullValueByType(right_child_->GetOutputSchema().GetColumn(i).GetType()));
}
result_.emplace_back(values, &GetOutputSchema());
}
}

index_ = 0;
}

auto HashJoinExecutor::Next(Tuple *tuple, RID *rid) -> bool {
if (index_ >= static_cast<int>(result_.size())) {
return false;
}
*tuple = result_[index_];
index_++;
return true;
}

Optimizing NestedLoopJoin to HashJoin

Specifically, the hash join algorithm can be used when the join predicate is a conjunction of equality conditions between two columns. For this project, handling a single equality condition, plus two equality conditions connected by AND, earns full credit

Code:

auto Optimizer::OptimizeNLJAsHashJoin(const AbstractPlanNodeRef &plan) -> AbstractPlanNodeRef {
// TODO(student): implement NestedLoopJoin -> HashJoin optimizer rule
// Note for 2023 Spring: You should at least support join keys of the form:
// 1. <column expr> = <column expr>
// 2. <column expr> = <column expr> AND <column expr> = <column expr>
std::vector<AbstractPlanNodeRef> children;
for (auto &child : plan->GetChildren()) {//optimize the children recursively
children.emplace_back(OptimizeNLJAsHashJoin(child));
}
auto optimized_plan = plan->CloneWithChildren(std::move(children));

if (optimized_plan->GetType() == PlanType::NestedLoopJoin) {
const auto nlj_plan = dynamic_cast<const NestedLoopJoinPlanNode &>(*optimized_plan);
BUSTUB_ENSURE(nlj_plan.children_.size() == 2, "NLJ should have exactly 2 children.");
if (auto *expr = dynamic_cast<ComparisonExpression *>(nlj_plan.Predicate().get()); expr != nullptr) {
if (expr->comp_type_ == ComparisonType::Equal) { // the expression is of type ComparisonType::Equal
if (auto *left_expr = dynamic_cast<ColumnValueExpression *>(expr->children_[0].get()); left_expr != nullptr) {
if (auto *right_expr = dynamic_cast<ColumnValueExpression *>(expr->children_[1].get());
right_expr != nullptr) {
std::vector<AbstractExpressionRef> left_key_expressions;
std::vector<AbstractExpressionRef> right_key_expressions;
if (left_expr->GetTupleIdx() == 0 && right_expr->GetTupleIdx() == 1) {
left_key_expressions.emplace_back(
std::make_shared<ColumnValueExpression>(0, left_expr->GetColIdx(), left_expr->GetReturnType()));
right_key_expressions.emplace_back(
std::make_shared<ColumnValueExpression>(1, right_expr->GetColIdx(), right_expr->GetReturnType()));
} else if (left_expr->GetTupleIdx() == 1 && right_expr->GetTupleIdx() == 0) {
left_key_expressions.emplace_back(
std::make_shared<ColumnValueExpression>(0, right_expr->GetColIdx(), right_expr->GetReturnType()));
right_key_expressions.emplace_back(
std::make_shared<ColumnValueExpression>(1, left_expr->GetColIdx(), left_expr->GetReturnType()));
}
return std::make_shared<HashJoinPlanNode>(nlj_plan.output_schema_, nlj_plan.GetLeftPlan(),
nlj_plan.GetRightPlan(), std::move(left_key_expressions),
std::move(right_key_expressions), nlj_plan.GetJoinType());
}
}
}
}
if (auto *expr = dynamic_cast<LogicExpression *>(nlj_plan.Predicate().get()); expr != nullptr) {
if (expr->logic_type_ == LogicType::And) {//the expression is of type LogicType::And
BUSTUB_ASSERT(expr->GetChildren().size() == 2, "LogicExpression has two children");
if (auto *expr1 = dynamic_cast<ComparisonExpression *>(expr->children_[0].get()); expr1 != nullptr) {
if (auto *expr2 = dynamic_cast<ComparisonExpression *>(expr->children_[1].get()); expr2 != nullptr) {
if (expr1->comp_type_ == ComparisonType::Equal && expr2->comp_type_ == ComparisonType::Equal) { // both child expressions are ComparisonType::Equal
std::vector<AbstractExpressionRef> left_key_expressions;
std::vector<AbstractExpressionRef> right_key_expressions;
if (auto *left_expr = dynamic_cast<ColumnValueExpression *>(expr1->children_[0].get());
left_expr != nullptr) {
if (auto *right_expr = dynamic_cast<ColumnValueExpression *>(expr1->children_[1].get());
right_expr != nullptr) {
if (left_expr->GetTupleIdx() == 0 && right_expr->GetTupleIdx() == 1) {
left_key_expressions.emplace_back(
std::make_shared<ColumnValueExpression>(0, left_expr->GetColIdx(), left_expr->GetReturnType()));
right_key_expressions.emplace_back(std::make_shared<ColumnValueExpression>(
1, right_expr->GetColIdx(), right_expr->GetReturnType()));
} else if (left_expr->GetTupleIdx() == 1 && right_expr->GetTupleIdx() == 0) {
left_key_expressions.emplace_back(std::make_shared<ColumnValueExpression>(
0, right_expr->GetColIdx(), right_expr->GetReturnType()));
right_key_expressions.emplace_back(
std::make_shared<ColumnValueExpression>(1, left_expr->GetColIdx(), left_expr->GetReturnType()));
}
}
}
if (auto *left_expr = dynamic_cast<ColumnValueExpression *>(expr2->children_[0].get());
left_expr != nullptr) {
if (auto *right_expr = dynamic_cast<ColumnValueExpression *>(expr2->children_[1].get());
right_expr != nullptr) {
if (left_expr->GetTupleIdx() == 0 && right_expr->GetTupleIdx() == 1) {
left_key_expressions.emplace_back(
std::make_shared<ColumnValueExpression>(0, left_expr->GetColIdx(), left_expr->GetReturnType()));
right_key_expressions.emplace_back(std::make_shared<ColumnValueExpression>(
1, right_expr->GetColIdx(), right_expr->GetReturnType()));
} else if (left_expr->GetTupleIdx() == 1 && right_expr->GetTupleIdx() == 0) {
left_key_expressions.emplace_back(std::make_shared<ColumnValueExpression>(
0, right_expr->GetColIdx(), right_expr->GetReturnType()));
right_key_expressions.emplace_back(
std::make_shared<ColumnValueExpression>(1, left_expr->GetColIdx(), left_expr->GetReturnType()));
}
}
}
return std::make_shared<HashJoinPlanNode>(nlj_plan.output_schema_, nlj_plan.GetLeftPlan(),
nlj_plan.GetRightPlan(), std::move(left_key_expressions),
std::move(right_key_expressions), nlj_plan.GetJoinType());
}
}
}
}
}
}

return optimized_plan;
}

At this point, SQLLogicTests #14 through #15 pass
img
img
img

Task#3 Sort + Limit Executors and Top-N Optimization

SortExecutor implementation:

If the ORDER BY attributes of the query do not match the key of an index, BusTub generates a SortPlanNode for the query

If the query does not specify a sort direction (i.e. ASC, DESC), the sort mode defaults to ASC

class SortExecutor {
//added data members
const SortPlanNode *plan_;
std::unique_ptr<AbstractExecutor> child_executor_;
std::vector<Tuple> result_;
int index_;
};

void SortExecutor::Init() {
Tuple tuple;
RID rid;
child_executor_->Init();
while (child_executor_->Next(&tuple, &rid)) {//pull every tuple from the child executor into result_
result_.emplace_back(tuple);
}
std::sort(result_.begin(), result_.end(),
[this](const Tuple &left, const Tuple &right) -> bool { return this->TupleComparator(left, right); });//sort result_
index_ = 0;
}

auto SortExecutor::Next(Tuple *tuple, RID *rid) -> bool {
if (index_ >= static_cast<int>(result_.size())) {
return false;
}
*tuple = result_[index_];
index_++;
return true;
}

//custom comparison function
auto SortExecutor::TupleComparator(const Tuple &left, const Tuple &right) -> bool {
auto &order_by = plan_->GetOrderBy();
for (auto &p : order_by) {
auto order = p.first;
auto &exp = p.second;
auto lvalue = exp->Evaluate(&left, child_executor_->GetOutputSchema());
auto rvalue = exp->Evaluate(&right, child_executor_->GetOutputSchema());
if (order == OrderByType::DESC) {
if (lvalue.CompareGreaterThan(rvalue) == CmpBool::CmpTrue) {
return true;
}
if (lvalue.CompareLessThan(rvalue) == CmpBool::CmpTrue) {
return false;
}
} else {
if (lvalue.CompareLessThan(rvalue) == CmpBool::CmpTrue) {
return true;
}
if (lvalue.CompareGreaterThan(rvalue) == CmpBool::CmpTrue) {
return false;
}
}
}
UNREACHABLE("duplicate key is not allowed");
}

LimitExecutor implementation:

LimitExecutor caps the number of tuples output by its child executor. If the child produces fewer tuples than the limit specified in the plan node, the executor has no effect and emits every tuple it receives

class LimitExecutor {
//added data members
const LimitPlanNode *plan_;
/** The child executor from which tuples are obtained */
std::unique_ptr<AbstractExecutor> child_executor_;
size_t num_;
};

void LimitExecutor::Init() {
child_executor_->Init();
num_ = 0;
}

auto LimitExecutor::Next(Tuple *tuple, RID *rid) -> bool {
if (num_ >= plan_->GetLimit()) {
return false;
}
if (child_executor_->Next(tuple, rid)) {
num_++;
return true;
}
return false;
}

Top-N Optimization Rule

Maintain the top N tuples with a priority queue; a sketch of the heap entry type is shown below
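The executor code below relies on a HeapKeyType whose operator< ranks tuples by the plan's ORDER BY columns so that the entry that should be discarded first ends up on top of the max-heap. HeapKeyType is not shown in the original; a minimal sketch under that assumption:

struct HeapKeyType {
HeapKeyType(Tuple tuple, const std::vector<std::pair<OrderByType, AbstractExpressionRef>> &order_bys,
AbstractExecutor *child)
: tuple_(std::move(tuple)), order_bys_(&order_bys), child_(child) {}

// "less" means "ranks better", so std::priority_queue keeps the worst of the
// current top-N on top, and pop() discards it once the size exceeds N
auto operator<(const HeapKeyType &other) const -> bool {
for (const auto &[order, expr] : *order_bys_) {
auto lhs = expr->Evaluate(&tuple_, child_->GetOutputSchema());
auto rhs = expr->Evaluate(&other.tuple_, child_->GetOutputSchema());
if (lhs.CompareEquals(rhs) == CmpBool::CmpTrue) {
continue; // tie on this column, look at the next ORDER BY column
}
if (order == OrderByType::DESC) {
return lhs.CompareGreaterThan(rhs) == CmpBool::CmpTrue;
}
return lhs.CompareLessThan(rhs) == CmpBool::CmpTrue;
}
return false;
}

Tuple tuple_;
const std::vector<std::pair<OrderByType, AbstractExpressionRef>> *order_bys_;
AbstractExecutor *child_;
};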

class TopNExecutor {
//data members
const TopNPlanNode *plan_;
/** The child executor from which tuples are obtained */
std::unique_ptr<AbstractExecutor> child_executor_;
std::priority_queue<HeapKeyType> heap_;//the priority queue
std::deque<Tuple> result_;
};

void TopNExecutor::Init() {
child_executor_->Init();
Tuple tuple;
RID rid;
while (child_executor_->Next(&tuple, &rid)) {//iterate over every tuple from child_executor_
heap_.emplace(tuple, plan_->GetOrderBy(), child_executor_.get());//push into the priority queue
if (heap_.size() > plan_->GetN()) {//keep the queue size bounded by N
heap_.pop();
}
}

while (!heap_.empty()) {//move every tuple_ from the heap into result_
result_.emplace_front(heap_.top().tuple_);
heap_.pop();
}
}

auto TopNExecutor::Next(Tuple *tuple, RID *rid) -> bool {
if (GetNumInHeap() != 0) {
*tuple = result_.front();
result_.pop_front();
return true;
}
return false;
}

auto TopNExecutor::GetNumInHeap() -> size_t { return result_.size(); }

At this point, SQLLogicTests #16 through #19 pass
img
img
img
img

Passing the online tests:
img

B+ Tree

A B+ Tree is a self-balancing tree that stores data in sorted order and supports search, sequential access, insertion, and deletion in O(log n); the final cost of sequential access also depends on the total amount of data required
img
Taking an M-way B+ tree as an example, its properties are:

  • Each node stores at most M keys and has M+1 children
  • The B+ Tree is perfectly balanced: every leaf node has the same depth
  • Except for the root, every node must be at least half full, i.e. M/2 - 1 <= #keys <= M - 1
  • If an inner node contains k keys, it must have k + 1 children

B+ Tree Operations

Insert

  1. Find the corresponding leaf node L
  2. Insert the key/value pair into L in sorted order
  3. If L still has enough room, the operation is done; if not, split L into two nodes and add a new entry to the parent node; if the parent also runs out of room, split recursively, up to the root

With Max.degree = 5

Inserting 1 through 13 (B+Tree visualization site)
img

Delete

  1. Starting from the root, find the leaf node L that holds the target entry
  2. Delete the entry
  3. If L is still at least half full, the operation is done; otherwise first try to borrow entries from the siblings, and if that fails, merge L with the corresponding sibling
  4. If a merge happened, the corresponding entry in the parent node may need to be deleted recursively

CheckPoint#1

Task #1 B+ Tree Pages

The three data members of class BPlusTreePage

img
GetMinSize implementation:

auto BPlusTreePage::GetMinSize() const -> int {
if (IsLeafPage()) {// leaf node
return max_size_ / 2;
}
return (max_size_ + 1) / 2; //internal node
}
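As a quick sanity check of the formula with concrete numbers (my own worked example, expressed as compile-time asserts):

// leaf with max_size_ = 4 -> min size 2; internal with max_size_ = 4 -> min size 2
// internal with max_size_ = 5 -> min size 3
static_assert(4 / 2 == 2);
static_assert((4 + 1) / 2 == 2);
static_assert((5 + 1) / 2 == 3);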

class BPlusTreeInternalPage : public BPlusTreePage

An internal page stores m ordered keys and m + 1 child pointers (the page_ids of other B+ tree pages)

Key/page_id pairs are stored in one array; the first key is treated as invalid, and lookups start from the second key

INDEX_TEMPLATE_ARGUMENTS
void B_PLUS_TREE_INTERNAL_PAGE_TYPE::Init(int max_size, int size) {
SetPageType(IndexPageType::INTERNAL_PAGE);
SetMaxSize(max_size);
SetSize(size);//size defaults to 1
}

INDEX_TEMPLATE_ARGUMENTS
auto B_PLUS_TREE_INTERNAL_PAGE_TYPE::ValueIndex(const ValueType &value) const -> int {
for (int i = 0; i < GetSize(); ++i) {//linear scan over the valid entries
if (array_[i].second == value) {
return i;
}
}
return -1;
}

INDEX_TEMPLATE_ARGUMENTS
auto B_PLUS_TREE_INTERNAL_PAGE_TYPE::LookUp(const KeyType &key, const KeyComparator &comparator) const -> ValueType {
for (int i = 1; i < GetSize(); ++i) { // linear scan
if (comparator(key, array_[i].first) < 0) {//find the first array_[i].first greater than key
return array_[i - 1].second;
}
}
return array_[GetSize() - 1].second;
}

INDEX_TEMPLATE_ARGUMENTS
auto B_PLUS_TREE_INTERNAL_PAGE_TYPE::Insert(const KeyType &key, const ValueType &value, const KeyComparator &comparator)
-> bool {
if (GetSize() == GetMaxSize()) {//already full, return false
return false;
}
// upper_bound
int l = 1;
int r = GetSize();
while (l < r) {
int mid = (l + r) / 2;
if (comparator(array_[mid].first, key) > 0) {
r = mid;
} else {
l = mid + 1;
}
}
for (int i = GetSize() - 1; i >= l; --i) {//shift elements right
array_[i + 1] = array_[i];
}
array_[l] = {key, value};//store the inserted key-value
IncreaseSize(1);//size++
return true;
}

class BPlusTreeLeafPage : public BPlusTreePage

A leaf page stores m ordered keys and m corresponding values. Each value should be a 64-bit record_id that identifies where the actual tuple is stored (src/include/common/rid.h)

INDEX_TEMPLATE_ARGUMENTS
void B_PLUS_TREE_LEAF_PAGE_TYPE::Init(int max_size, int size, page_id_t next_page_id) {
SetPageType(IndexPageType::LEAF_PAGE);
SetMaxSize(max_size);
SetSize(size);//size defaults to 0
SetNextPageId(next_page_id);//next_page_id defaults to INVALID_PAGE_ID
}

INDEX_TEMPLATE_ARGUMENTS
auto B_PLUS_TREE_LEAF_PAGE_TYPE::KeyIndex(const KeyType &key, const KeyComparator &comparator, int &index) const
-> bool {
// lower_bound
int l = 0;
int r = GetSize();
// no early return needed here: on an empty page the loop below is skipped, index becomes 0, and false is returned
while (l < r) {
int mid = (l + r) / 2;
if (comparator(array_[mid].first, key) < 0) {
l = mid + 1;
} else {
r = mid;
}
}
index = l;
return static_cast<bool>(l != GetSize() && comparator(KeyAt(l), key) == 0);
}

INDEX_TEMPLATE_ARGUMENTS
auto B_PLUS_TREE_LEAF_PAGE_TYPE::Insert(const KeyType &key, const ValueType &value, const KeyComparator &comparator)
-> bool {
int pos = -1;
if (KeyIndex(key, comparator, pos)) { // duplicate key
return false;//duplicate keys are rejected
}

// move
for (int i = GetSize() - 1; i >= pos; --i) {//shift array_ elements right
array_[i + 1] = array_[i];
}
// insert
array_[pos] = {key, value};//insert the key-value
IncreaseSize(1);//size++
return true;
}

Class BPlusTreeHeaderPage

The header page stores the root page id, so that the root node has a parent just like every non-root node

GetRootPageId implementation:

INDEX_TEMPLATE_ARGUMENTS auto BPLUSTREE_TYPE::GetRootPageId() const -> page_id_t {
ReadPageGuard guard = bpm_->FetchPageRead(header_page_id_);
auto page = guard.As<BPlusTreeHeaderPage>();
return page->root_page_id_;
}

The Search operation

INDEX_TEMPLATE_ARGUMENTS
auto BPLUSTREE_TYPE::GetValue(const KeyType &key, std::vector<ValueType> *result, Transaction *txn) -> bool {
// Declaration of context instance.
Context ctx;
{
//first take a read latch on header_page to check whether the root page exists; if it does, take a read latch on the root page and push it into the Context
auto header_page_guard = bpm_->FetchPageRead(header_page_id_);
auto header_page = header_page_guard.As<BPlusTreeHeaderPage>();
if (header_page->root_page_id_ == INVALID_PAGE_ID) {
return false;
}
ctx.root_page_id_ = header_page->root_page_id_;
ctx.read_set_.push_back(bpm_->FetchPageRead(ctx.root_page_id_));
}
//descend to the corresponding leaf page
FindLeafPage(key, Operation::Search, ctx);
auto leaf_page = ctx.read_set_.back().As<LeafPage>();
int index = -1;
if (leaf_page->KeyIndex(key, comparator_, index)) {
result->push_back(leaf_page->ValueAt(index));//found
return true;
}
return false;//not found
}

INDEX_TEMPLATE_ARGUMENTS
auto BPLUSTREE_TYPE::FindLeafPage(const KeyType &key, Operation op, Context &ctx) {
if (op == Operation::Search) {
//latching strategy for Search
//from the root downwards, repeatedly:
// - acquire the read latch on the child
// - release the read latch on the parent
auto page = ctx.read_set_.back().As<BPlusTreePage>();
while (!page->IsLeafPage()) {
auto internal = ctx.read_set_.back().As<InternalPage>();
auto next_page_id = internal->LookUp(key, comparator_);
ctx.read_set_.push_back(bpm_->FetchPageRead(next_page_id));
ctx.read_set_.pop_front();
page = ctx.read_set_.back().As<BPlusTreePage>();
}
return;
}
if (op == Operation::Insert || op == Operation::Remove) {
//latching strategy for Insert and Remove
//from the root downwards, acquire write latches as needed; once the child's write latch is held,
//check whether the child is safe, and if so release all previously acquired write latches
auto page = ctx.write_set_.back().As<BPlusTreePage>();
while (!page->IsLeafPage()) {
auto internal = ctx.write_set_.back().As<InternalPage>();
auto next_page_id = internal->LookUp(key, comparator_);
ctx.write_set_.push_back(bpm_->FetchPageWrite(next_page_id));
if (IsSafePage(ctx.write_set_.back().As<BPlusTreePage>(), op, false)) {
while (ctx.write_set_.size() > 1) {
ctx.write_set_.pop_front();
}
}
page = ctx.write_set_.back().As<BPlusTreePage>();
}
return;
}
}

INDEX_TEMPLATE_ARGUMENTS
auto BPLUSTREE_TYPE::IsSafePage(const BPlusTreePage *tree_page, Operation op, bool isRootPage) -> bool {
if (op == Operation::Search) {//not used
return true;
}
if (op == Operation::Insert) {//insert
//unsafe if an overflow (split) could occur
if (tree_page->IsLeafPage()) {
//in a leaf node, size may be at most tree_page->GetMaxSize() - 1
return tree_page->GetSize() + 1 < tree_page->GetMaxSize();
}
//in an internal node, size may be at most tree_page->GetMaxSize()
return tree_page->GetSize() < tree_page->GetMaxSize();
}
if (op == Operation::Remove) {//remove
//unsafe if an underflow could occur
if (isRootPage) {//removing from the root page
if (tree_page->IsLeafPage()) {
//a leaf root must keep a size of at least 2
return tree_page->GetSize() > 1;
}
//an internal root must keep a size of at least 3
return tree_page->GetSize() > 2;
}
return tree_page->GetSize() > tree_page->GetMinSize();
}
return false;
}

The Insert operation

When inserting into a leaf node: if size == max_size before the insert, the node overflows and must split
When inserting into an internal node: if size == max_size before the insert, the node overflows and must split

INDEX_TEMPLATE_ARGUMENTS auto BPLUSTREE_TYPE::Insert(const KeyType &key, const ValueType &value, Transaction *txn)
-> bool {
// Declaration of context instance.
Context ctx;
ctx.header_page_ = bpm_->FetchPageWrite(header_page_id_);//first take the write latch on the header page
auto header_page = ctx.header_page_->AsMut<BPlusTreeHeaderPage>();
if (header_page->root_page_id_ == INVALID_PAGE_ID) { // root not exist,start a new tree
auto root_guard = bpm_->NewPageGuarded(&ctx.root_page_id_);//allocate the root page
header_page->root_page_id_ = ctx.root_page_id_;
auto leaf_page = root_guard.AsMut<LeafPage>();
leaf_page->Init(leaf_max_size_, 1);
leaf_page->GetArray()[0] = {key, value};//insert the key-value
ctx.Drop();
return true;
}
ctx.root_page_id_ = header_page->root_page_id_;
ctx.write_set_.push_back(bpm_->FetchPageWrite(ctx.root_page_id_));
if (IsSafePage(ctx.write_set_.back().As<BPlusTreePage>(), Operation::Insert, true)) {//if the root page is safe, release the write latch on the header page
ctx.header_page_ = std::nullopt; // unlock header_page
}
FindLeafPage(key, Operation::Insert, ctx);
auto &leaf_page_guard = ctx.write_set_.back();
auto leaf_page = leaf_page_guard.AsMut<LeafPage>();
if (!leaf_page->Insert(key, value, comparator_)) { // duplicate key, insert fails
ctx.Drop();
return false;
}
if (leaf_page->GetSize() < leaf_page->GetMaxSize()) { // no leaf overflow, no split needed
ctx.Drop();
return true;
}
// overflow: split the leaf node
auto new_page_id = 0;
auto new_leaf_page_guard = bpm_->NewPageGuarded(&new_page_id);
auto new_leaf_page = new_leaf_page_guard.AsMut<LeafPage>();
std::copy(leaf_page->GetArray() + leaf_page->GetMinSize(), leaf_page->GetArray() + leaf_page->GetSize(),
new_leaf_page->GetArray());
new_leaf_page->Init(leaf_max_size_, leaf_page->GetSize() - leaf_page->GetMinSize(), leaf_page->GetNextPageId());
leaf_page->SetNextPageId(new_leaf_page_guard.PageId());
leaf_page->SetSize(leaf_page->GetMinSize());
KeyType split_key = new_leaf_page->KeyAt(0);
// insert split_key into the parent node
InsertIntoParent(split_key, new_leaf_page_guard.PageId(), ctx, ctx.write_set_.size() - 2);
ctx.Drop();
return true;
}

INDEX_TEMPLATE_ARGUMENTS
void BPLUSTREE_TYPE::InsertIntoParent(const KeyType &key, page_id_t right_child_id, Context &ctx, int index) {
if (index < 0) { // the parent is the header page
//create a new root page and update root_page_id_ in the header page
auto new_root_page_id = 0;
auto new_root_page_guard = bpm_->NewPageGuarded(&new_root_page_id);
auto new_root_page = new_root_page_guard.AsMut<InternalPage>();
new_root_page->Init(internal_max_size_, 2);
new_root_page->GetArray()[0].second = ctx.write_set_[index + 1].PageId();
new_root_page->GetArray()[1] = {key, right_child_id};
auto header_page = ctx.header_page_->AsMut<BPlusTreeHeaderPage>();
header_page->root_page_id_ = new_root_page_id;
return;
}
auto parent_page = ctx.write_set_[index].AsMut<InternalPage>();
if (parent_page->Insert(key, right_child_id, comparator_)) { // the parent does not need to split
return;
}
// the parent needs to split
auto new_parent_page_id = 0;
auto new_parent_page_guard = bpm_->NewPageGuarded(&new_parent_page_id);
auto new_parent_page = new_parent_page_guard.AsMut<InternalPage>();
auto array = new std::pair<KeyType, page_id_t>[parent_page->GetMaxSize() + 1];
std::copy(parent_page->GetArray(), parent_page->GetArray() + parent_page->GetMaxSize(), array);
// upper_bound
int l = 1;
int r = parent_page->GetMaxSize();
while (l < r) {
int mid = (l + r) / 2;
if (comparator_(array[mid].first, key) > 0) {
r = mid;
} else {
l = mid + 1;
}
}
// shift right by one slot to make room
for (int i = parent_page->GetMaxSize() - 1; i >= l; --i) {
array[i + 1] = array[i];
}
array[l] = {key, right_child_id};
std::copy(array, array + parent_page->GetMinSize(), parent_page->GetArray());
std::copy(array + parent_page->GetMinSize(), array + parent_page->GetMaxSize() + 1, new_parent_page->GetArray());
new_parent_page->Init(internal_max_size_, parent_page->GetMaxSize() + 1 - parent_page->GetMinSize());
parent_page->SetSize(parent_page->GetMinSize());
delete[] array;
InsertIntoParent(new_parent_page->KeyAt(0), new_parent_page_id, ctx, index - 1);
}

CheckPoint#1 local tests

img
img

Online tests

img

CheckPoint#2

Task #2b B+ Tree Data Structure (Deletion)

Deletion:

If the leaf being deleted from is the root, a post-deletion size == 0 means underflow, and root_page_id_ in the header page must be set to INVALID_PAGE_ID

If the leaf being deleted from is not the root, a post-deletion size < min_size means underflow

1. If there is a right sibling

2. Check whether a merge is possible (merge_size < max_size); merge if so, otherwise go to 3

3. Borrow from the right sibling

4. If there is a left sibling

5. Check whether a merge is possible (merge_size < max_size); merge if so, otherwise go to 6

6. Borrow from the left sibling

After a merge, an entry must be deleted from the internal node, which is very similar to deleting an entry from a leaf

If the internal node being deleted from is the root, a post-deletion size == 1 means underflow, and root_page_id_ in the header page must be set to page->GetArray()[0].second

If the internal node being deleted from is not the root, a post-deletion size < min_size means underflow

1. If there is a right sibling

2. Check whether a merge is possible (merge_size <= max_size); merge if so, otherwise go to 3

3. Borrow from the right sibling

4. If there is a left sibling

5. Check whether a merge is possible (merge_size <= max_size); merge if so, otherwise go to 6

6. Borrow from the left sibling

INDEX_TEMPLATE_ARGUMENTS
void BPLUSTREE_TYPE::Remove(const KeyType &key, Transaction *txn) {
// Declaration of context instance.
Context ctx;
ctx.header_page_ = bpm_->FetchPageWrite(header_page_id_);
auto header_page = ctx.header_page_->AsMut<BPlusTreeHeaderPage>();
if (header_page->root_page_id_ == INVALID_PAGE_ID) { // root not exist
return;
}
ctx.root_page_id_ = header_page->root_page_id_;
ctx.write_set_.push_back(bpm_->FetchPageWrite(ctx.root_page_id_));
if (IsSafePage(ctx.write_set_.back().As<BPlusTreePage>(), Operation::Remove, true)) {
ctx.header_page_ = std::nullopt; // unlock header_page
}
FindLeafPage(key, Operation::Remove, ctx);
auto &leaf_page_guard = ctx.write_set_.back();
auto leaf_page = leaf_page_guard.AsMut<LeafPage>();
int pos = -1;
// the key does not exist
if (!leaf_page->KeyIndex(key, comparator_, pos)) {
ctx.Drop();
return;
}
// the key exists; remove it from the leaf
for (int i = pos + 1; i < leaf_page->GetSize(); ++i) {
leaf_page->GetArray()[i - 1] = leaf_page->GetArray()[i];
}
leaf_page->SetSize(leaf_page->GetSize() - 1); // update the leaf page's size

if (leaf_page->GetSize() >= leaf_page->GetMinSize()) { // no underflow, return directly
ctx.Drop();
return;
}
// underflow
if (ctx.IsRootPage(leaf_page_guard.PageId())) { // this leaf is the root
if (leaf_page->GetSize() == 0) { // its size dropped to 0
header_page->root_page_id_ = INVALID_PAGE_ID;
}
ctx.Drop();
return;
}

auto &parent_page_guard = ctx.write_set_[ctx.write_set_.size() - 2];
auto parent_page = parent_page_guard.AsMut<InternalPage>();
auto index = parent_page->ValueIndex(leaf_page_guard.PageId());
BUSTUB_ASSERT(index != -1, "index must not be -1");
// if there is a right sibling
if (index < parent_page->GetSize() - 1) {
page_id_t right_brother_page_id = parent_page->GetArray()[index + 1].second;
auto right_brother_page_guard = bpm_->FetchPageWrite(right_brother_page_id);
auto right_brother_page = right_brother_page_guard.AsMut<LeafPage>();

auto merge_size = right_brother_page->GetSize() + leaf_page->GetSize();
if (merge_size < leaf_page->GetMaxSize()) { // merging is possible
// merge
std::copy(right_brother_page->GetArray(), right_brother_page->GetArray() + right_brother_page->GetSize(),
leaf_page->GetArray() + leaf_page->GetSize());
leaf_page->SetSize(merge_size);
leaf_page->SetNextPageId(right_brother_page->GetNextPageId());
RemoveFromParent(index + 1, ctx, ctx.write_set_.size() - 2);
} else {
// borrow
leaf_page->GetArray()[leaf_page->GetSize()] = right_brother_page->GetArray()[0];
std::copy(right_brother_page->GetArray() + 1, right_brother_page->GetArray() + right_brother_page->GetSize(),
right_brother_page->GetArray());
leaf_page->IncreaseSize(1);
right_brother_page->SetSize(right_brother_page->GetSize() - 1);
parent_page->SetKeyAt(index + 1, right_brother_page->GetArray()[0].first);
}
} else {
// left sibling
BUSTUB_ASSERT(index - 1 >= 0, "left brother must exist");
page_id_t left_brother_page_id = parent_page->GetArray()[index - 1].second;
auto left_brother_page_guard = bpm_->FetchPageWrite(left_brother_page_id);
auto left_brother_page = left_brother_page_guard.AsMut<LeafPage>();

auto merge_size = left_brother_page->GetSize() + leaf_page->GetSize();
if (merge_size < left_brother_page->GetMaxSize()) { // merging is possible
// merge
std::copy(leaf_page->GetArray(), leaf_page->GetArray() + leaf_page->GetSize(),
left_brother_page->GetArray() + left_brother_page->GetSize());
left_brother_page->SetSize(merge_size);
left_brother_page->SetNextPageId(leaf_page->GetNextPageId());
RemoveFromParent(index, ctx, ctx.write_set_.size() - 2);
} else {
// borrow
for (int i = leaf_page->GetSize(); i >= 1; --i) {
leaf_page->GetArray()[i] = leaf_page->GetArray()[i - 1];
}
leaf_page->GetArray()[0] = left_brother_page->GetArray()[left_brother_page->GetSize() - 1];
leaf_page->IncreaseSize(1);
left_brother_page->SetSize(left_brother_page->GetSize() - 1);
parent_page->SetKeyAt(index, leaf_page->GetArray()[0].first);
}
}
ctx.Drop();
}

INDEX_TEMPLATE_ARGUMENTS
void BPLUSTREE_TYPE::RemoveFromParent(int valueIndex, Context &ctx, int index) {
auto &page_guard = ctx.write_set_[index];
auto page = page_guard.AsMut<InternalPage>();
for (int i = valueIndex + 1; i < page->GetSize(); ++i) { // delete the key/value entry
page->GetArray()[i - 1] = page->GetArray()[i];
}
page->SetSize(page->GetSize() - 1); // update the page's size

if (page->GetSize() >= page->GetMinSize()) { // no underflow
return;
}
// underflow
if (ctx.IsRootPage(page_guard.PageId())) { // this page is the root
if (page->GetSize() == 1) { // the root must be replaced
BUSTUB_ASSERT(ctx.header_page_ != std::nullopt, "ctx.header_page must exist");
auto header_page = ctx.header_page_->AsMut<BPlusTreeHeaderPage>();
header_page->root_page_id_ = page->GetArray()[0].second;
}
return;
}
BUSTUB_ASSERT(index - 1 >= 0, "parent_page_guard must exist");
auto &parent_page_guard = ctx.write_set_[index - 1];
auto parent_page = parent_page_guard.AsMut<InternalPage>();
auto pos = parent_page->ValueIndex(page_guard.PageId());
BUSTUB_ASSERT(pos != -1, "pos must not be -1");
// if there is a right sibling
if (pos < parent_page->GetSize() - 1) {
page_id_t right_brother_page_id = parent_page->GetArray()[pos + 1].second;
auto right_brother_page_guard = bpm_->FetchPageWrite(right_brother_page_id);
auto right_brother_page = right_brother_page_guard.AsMut<InternalPage>();

auto merge_size = right_brother_page->GetSize() + page->GetSize();
if (merge_size <= page->GetMaxSize()) { // merging is possible
// merge
std::copy(right_brother_page->GetArray(), right_brother_page->GetArray() + right_brother_page->GetSize(),
page->GetArray() + page->GetSize());
page->SetSize(merge_size);
RemoveFromParent(pos + 1, ctx, index - 1);
} else {
// borrow
page->GetArray()[page->GetSize()] = right_brother_page->GetArray()[0];
std::copy(right_brother_page->GetArray() + 1, right_brother_page->GetArray() + right_brother_page->GetSize(),
right_brother_page->GetArray());
page->IncreaseSize(1);
right_brother_page->SetSize(right_brother_page->GetSize() - 1);
parent_page->SetKeyAt(pos + 1, right_brother_page->GetArray()[0].first);
}
} else {
// left sibling
BUSTUB_ASSERT(pos - 1 >= 0, "left brother must exist");
page_id_t left_brother_page_id = parent_page->GetArray()[pos - 1].second;
auto left_brother_page_guard = bpm_->FetchPageWrite(left_brother_page_id);
auto left_brother_page = left_brother_page_guard.AsMut<InternalPage>();

auto merge_size = left_brother_page->GetSize() + page->GetSize();
if (merge_size <= left_brother_page->GetMaxSize()) { // merging is possible
// merge
std::copy(page->GetArray(), page->GetArray() + page->GetSize(),
left_brother_page->GetArray() + left_brother_page->GetSize());
left_brother_page->SetSize(merge_size);
RemoveFromParent(pos, ctx, index - 1);
} else {
// borrow
for (int i = page->GetSize(); i >= 1; --i) {
page->GetArray()[i] = page->GetArray()[i - 1];
}
page->GetArray()[0] = left_brother_page->GetArray()[left_brother_page->GetSize() - 1];
page->IncreaseSize(1);
left_brother_page->SetSize(left_brother_page->GetSize() - 1);
parent_page->SetKeyAt(pos, page->GetArray()[0].first);
}
}
}

Task #3 Index Iterator

Index Iterator code:

The index iterator only needs to support single-threaded use

INDEX_TEMPLATE_ARGUMENTS
auto INDEXITERATOR_TYPE::IsEnd() const -> bool { return read_guard_ == std::nullopt; }

INDEX_TEMPLATE_ARGUMENTS
auto INDEXITERATOR_TYPE::operator*() -> const MappingType & {
auto page = read_guard_->As<B_PLUS_TREE_LEAF_PAGE_TYPE>();
BUSTUB_ASSERT(page->GetSize() > index_, "index_ must be valid");
return page->GetArray()[index_];
}

INDEX_TEMPLATE_ARGUMENTS
auto INDEXITERATOR_TYPE::operator++() -> INDEXITERATOR_TYPE & {
if (IsEnd()) {
return *this;
}
auto leaf_page = read_guard_->As<B_PLUS_TREE_LEAF_PAGE_TYPE>();
if (index_ + 1 < leaf_page->GetSize()) {
index_++;
return *this;
}
if (leaf_page->GetNextPageId() != INVALID_PAGE_ID) {
read_guard_ = bpm_->FetchPageRead(leaf_page->GetNextPageId());
index_ = 0;
return *this;
}
read_guard_ = std::nullopt;
index_ = INVALID_PAGE_ID;
return *this;
}

auto operator==(const IndexIterator &itr) const -> bool {
if (IsEnd() && itr.IsEnd()) {
return true;
}
if (IsEnd() || itr.IsEnd()) {
return false;
}
return read_guard_->PageId() == itr.read_guard_->PageId() && index_ == itr.index_;
}

auto operator!=(const IndexIterator &itr) const -> bool { return !(*this == itr); }

Begin and End in BPlusTree

INDEX_TEMPLATE_ARGUMENTS
auto BPLUSTREE_TYPE::Begin() -> INDEXITERATOR_TYPE {
Context ctx;
auto header_page_guard = bpm_->FetchPageRead(header_page_id_);
auto header_page = header_page_guard.As<BPlusTreeHeaderPage>();
if (header_page->root_page_id_ == INVALID_PAGE_ID) {
return INDEXITERATOR_TYPE();
}
ctx.root_page_id_ = header_page->root_page_id_;
ctx.read_set_.push_back(bpm_->FetchPageRead(ctx.root_page_id_));
header_page_guard.Drop();

auto page = ctx.read_set_.back().As<BPlusTreePage>();
while (!page->IsLeafPage()) {
auto internal = ctx.read_set_.back().As<InternalPage>();
page_id_t id = internal->ValueAt(0);
ctx.read_set_.push_back(bpm_->FetchPageRead(id));
ctx.read_set_.pop_front();
page = ctx.read_set_.back().As<BPlusTreePage>();
}
return INDEXITERATOR_TYPE(bpm_, std::move(ctx.read_set_.back()));
}

INDEX_TEMPLATE_ARGUMENTS
auto BPLUSTREE_TYPE::Begin(const KeyType &key) -> INDEXITERATOR_TYPE {
Context ctx;
auto header_page_guard = bpm_->FetchPageRead(header_page_id_);
auto header_page = header_page_guard.As<BPlusTreeHeaderPage>();
if (header_page->root_page_id_ == INVALID_PAGE_ID) {
return INDEXITERATOR_TYPE();
}
ctx.root_page_id_ = header_page->root_page_id_;
ctx.read_set_.push_back(bpm_->FetchPageRead(ctx.root_page_id_));
header_page_guard.Drop();
FindLeafPage(key, Operation::Search, ctx);
int pos = -1;
if (!ctx.read_set_.back().As<LeafPage>()->KeyIndex(key, comparator_, pos)) {
throw Exception("key not exist");
}
return INDEXITERATOR_TYPE(bpm_, std::move(ctx.read_set_.back()), pos);
}

INDEX_TEMPLATE_ARGUMENTS
auto BPLUSTREE_TYPE::End() -> INDEXITERATOR_TYPE { return INDEXITERATOR_TYPE(); }

Task #4 Concurrency Control

Latch Crabbing/Coupling

The basic idea of latch crabbing is:

  1. Acquire the latch on the parent
  2. Acquire the latch on the child
  3. If the child is safe, release the parent's latch

"Safe" here means that the node will not split or merge during the update, i.e.:

  • on insert, the node is not full
  • on delete, the node is more than half full

Search
Starting from the root, repeatedly:

  • acquire the read latch on the child
  • release the read latch on the parent

img
img
img
img
img

Insert/Delete

Starting from the root, acquire write latches as needed; once the child's write latch is held, check whether it is safe, and if so release all previously acquired write latches.
The safety check is implemented in the IsSafePage function shown earlier

Better Latching Algorithm

  • Search: same as latch crabbing
  • Insert/Delete:
    • acquire and release latches along the search path the same way Search does, and take a write latch on the leaf node
    • if the leaf node is not safe (the change may ripple to other nodes), redo the operation with the latch crabbing strategy

This approach optimistically assumes the whole operation only changes the leaf node; if the assumption turns out wrong, it falls back to the original latch crabbing scheme.

CheckPoint#2 local tests

img
img
img
img

Bustub Tree printer

CheckPoint#2 online tests

img

The task of Project 1 is to implement a Buffer Pool Manager

At startup, the DBMS requests a region of memory from the OS, the buffer pool, and divides it into pages of equal size; to distinguish them from disk pages these are usually called frames. When the DBMS requests a disk page, it must first be copied into a frame of the buffer pool. When the buffer pool runs out of space, some replacement policy must evict an existing page.
img

question 1:
Why not use the OS's own disk management facilities? The OS provides calls such as mmap that let developers rely on it to move data between memory and disk automatically.

The DBMS has more and better knowledge than the OS for deciding what data to move and when, specifically:

  1. writing dirty pages to disk in the correct order
  2. prefetching data according to the situation
  3. customizing the cache replacement policy

The DBMS also maintains a page table that records where each page resides in memory, along with metadata such as whether it has been written (dirty flag) and whether it is referenced (pin/reference counter), as shown below:

img

When a page in the page table is referenced, its pin/reference count is recorded, meaning the page is in use and must not be evicted when space runs low; when the requested page is not in the page table, the DBMS acquires a latch (another name for a lock) to mark that entry as occupied, reads the page from disk into the buffer pool, and then releases the latch

img

Buffer Replacement Policies

LRU

Keep the timestamp of each page's most recent access and always evict the page with the oldest timestamp

Clock

Clock is an approximation of LRU: instead of a last-access timestamp, it keeps one reference bit per page

  • whenever a page is accessed, its reference bit is set to 1
  • whenever a page must be evicted, sweep the pages in order starting from the last position: if a page's reference bit is 1, reset it to 0; if the bit is 0, evict that page (a sketch follows)
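A minimal, self-contained sketch of the clock sweep described above (illustrative only, not BusTub's API):

#include <cstddef>
#include <vector>

class ClockReplacer {
public:
explicit ClockReplacer(std::size_t num_frames) : ref_(num_frames, false), hand_(0) {}

void RecordAccess(std::size_t frame) { ref_[frame] = true; }  // set the reference bit on access

// sweep from the current hand position: clear 1-bits, evict the first 0-bit frame
// (assumes at least one frame exists; a full sweep clears every bit, so it terminates)
auto Evict() -> std::size_t {
while (true) {
if (ref_[hand_]) {
ref_[hand_] = false;  // second chance
hand_ = (hand_ + 1) % ref_.size();
} else {
std::size_t victim = hand_;
hand_ = (hand_ + 1) % ref_.size();
return victim;
}
}
}

private:
std::vector<bool> ref_;  // one reference bit per page/frame
std::size_t hand_;       // the clock hand
};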

LRU-K

Keep the timestamps of each page's last K accesses and use them to estimate when it will be accessed next; in practice even K = 1 already works well.

Task#1 LRU-K Replacement Policy

Implement LRUKReplacer
Strategy:

The LRU-K algorithm evicts the frame whose backward k-distance is the largest among all frames in the replacer

The backward k-distance is the difference between the current timestamp and the timestamp of the k-th most recent access.

A frame with fewer than k recorded accesses is given +inf as its backward k-distance; when several frames are at +inf, the replacer evicts the one with the earliest overall timestamp

Code:

One LRUKNode corresponds to one frame

class LRUKNode {
public:
/** History of last seen K timestamps of this page. Least recent timestamp stored in front. */
std::list<size_t> history_;//the recorded access timestamps
frame_id_t fid_;
bool is_evictable_{false};
};
class LRUKReplacer {
std::unordered_map<frame_id_t, LRUKNode> node_store_;//frame-to-LRUKNode mapping
size_t current_timestamp_{0};//the current timestamp
//replacer_size_ >= curr_size_
size_t curr_size_{0};//the number of frames currently marked evictable
size_t replacer_size_;//replacer_size_ == num_frames
size_t k_;
std::mutex latch_;
};

The Evict function

Evicts one frame; returns true on success, false otherwise

auto LRUKReplacer::Evict(frame_id_t *frame_id) -> bool {
std::lock_guard<std::mutex> guard(latch_);
*frame_id = -1;
for (auto &p : node_store_) {
if (p.second.is_evictable_) {//use Judge to pick the frame with the largest backward k-distance
if (*frame_id == -1 || Judge(p.second, node_store_[*frame_id])) {
*frame_id = p.second.fid_;
}
}
}
if (*frame_id != -1) {
node_store_.erase(*frame_id);
--curr_size_;
return true;
}
return false;
}

The Judge helper is implemented as follows

//returns true if lhs's backward k-distance is greater than rhs's, false otherwise
auto Judge(const LRUKNode &lhs, const LRUKNode &rhs) const -> bool {
if (rhs.history_.size() == k_ && lhs.history_.size() < k_) {
return true;
}
if (rhs.history_.size() < k_ && lhs.history_.size() == k_) {
return false;
}
//compare the oldest timestamps: return true if lhs's is smaller, false otherwise
return lhs.history_.back() < rhs.history_.back();
}
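To make the backward k-distance ordering concrete, a short usage sketch against the interface above (assuming k = 2):

LRUKReplacer replacer(7, 2);  // 7 frames, k = 2
replacer.RecordAccess(1);     // timestamp 0
replacer.RecordAccess(2);     // timestamp 1
replacer.RecordAccess(1);     // timestamp 2: frame 1 now has k recorded accesses
replacer.SetEvictable(1, true);
replacer.SetEvictable(2, true);

frame_id_t victim = -1;
replacer.Evict(&victim);
// victim == 2: frame 2 has fewer than k accesses, so its backward k-distance
// is +inf and it is evicted before frame 1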

The RecordAccess function

  1. If the accessed frame_id is greater than or equal to replacer_size_, throw an exception
  2. Otherwise, append a timestamp to the frame's LRUKNode, keeping the history_ list no longer than k_
void LRUKReplacer::RecordAccess(frame_id_t frame_id, [[maybe_unused]] AccessType access_type) {
std::lock_guard<std::mutex> lock_guard(latch_);
if (frame_id >= static_cast<int>(replacer_size_)) {
throw Exception("frame_id is larger than or equal to replacer_size_");
}
if (node_store_.count(frame_id) == 0) {
node_store_[frame_id] = LRUKNode();
node_store_[frame_id].fid_ = frame_id;
}
auto &node = node_store_[frame_id];
node.history_.push_front(current_timestamp_++);
while (node.history_.size() > k_) {
node.history_.pop_back();
}
}

The SetEvictable function

Sets the frame's is_evictable flag to set_evictable; throws an exception if the frame is not in use
false -> true: curr_size_++
true -> false: curr_size_--

void LRUKReplacer::SetEvictable(frame_id_t frame_id, bool set_evictable) {
std::lock_guard<std::mutex> lock_guard(latch_);
if (node_store_.count(frame_id) <= 0) {
throw Exception("frame_id should be used");
}
if (!node_store_[frame_id].is_evictable_ && set_evictable) { // false -> true
curr_size_++;
} else if (node_store_[frame_id].is_evictable_ && !set_evictable) { // true -> false
curr_size_--;
}
node_store_[frame_id].is_evictable_ = set_evictable;
}

The Remove function

If the frame to remove does not exist, return directly
If the frame's is_evictable_ is not set to true, throw an exception
Remove the frame and decrement curr_size_

void LRUKReplacer::Remove(frame_id_t frame_id) {
std::lock_guard<std::mutex> lock_guard(latch_);
if (node_store_.count(frame_id) <= 0) {
return;
}
if (!node_store_[frame_id].is_evictable_) {
throw Exception("Remove a non-evictable frame");
}
node_store_.erase(frame_id);
--curr_size_;
}

Task 1 local tests:
img

Task#2 Buffer Pool Manager

With the LRU-K replacement policy done, the next step is the basic functionality of the buffer pool. For the DBMS, the buffer pool hides the details of moving data between memory and disk, including writing dirty pages back to disk.

Page

class Page {
char *data_;//4096 bytes
page_id_t page_id_;//physical page id
int pin_count_;//reference count on this Page object
bool is_dirty_;//dirty flag
ReaderWriterLatch rwlatch_;//reader-writer latch
};

BufferPoolManager

class BufferPoolManager {
/** Number of pages in the buffer pool. */
const size_t pool_size_;
/** The next page id to be allocated */
std::atomic<page_id_t> next_page_id_ = 0;

/** Array of buffer pool pages. */
Page *pages_;
/** Pointer to the disk manager. */
DiskManager *disk_manager_ __attribute__((__unused__));
/** Pointer to the log manager. Please ignore this for P1. */
LogManager *log_manager_ __attribute__((__unused__));
/** Page table for keeping track of buffer pool pages. */
std::unordered_map<page_id_t, frame_id_t> page_table_;
/** Replacer to find unpinned pages for replacement. */
std::unique_ptr<LRUKReplacer> replacer_;
/** List of free frames that don't have any pages on them. */
std::list<frame_id_t> free_list_;
/** This latch protects shared data structures. We recommend updating this comment to describe what it protects. */
std::mutex latch_;
};

When BufferPoolManager is initialized, it allocates pool_size_ Page objects, and the LRUKReplacer's num_frames is also set to pool_size_

Code:

NewPage implementation:

auto BufferPoolManager::NewPage(page_id_t *page_id) -> Page * {
frame_id_t free_frame_id = -1;
std::lock_guard<std::mutex> guard(latch_);
//obtain a free frame
if (!free_list_.empty()) {//a free frame exists
free_frame_id = free_list_.front();
free_list_.pop_front();
} else {//no free frame
if (!replacer_->Evict(&free_frame_id)) {//ask the LRUKReplacer to free one up
return nullptr;
}
if (pages_[free_frame_id].IsDirty()) {//if the evicted frame holds a dirty page, write it back first
disk_manager_->WritePage(pages_[free_frame_id].page_id_, pages_[free_frame_id].data_);
}
page_table_.erase(pages_[free_frame_id].page_id_);//remove the evicted page_id from page_table_
pages_[free_frame_id].ResetMemory();//reset the frame's memory
}
*page_id = AllocatePage();
pages_[free_frame_id].page_id_ = *page_id;
pages_[free_frame_id].pin_count_ = 1;
pages_[free_frame_id].is_dirty_ = false;
page_table_[*page_id] = free_frame_id;

replacer_->RecordAccess(free_frame_id);
replacer_->SetEvictable(free_frame_id, false); // no use
return pages_ + free_frame_id;
}

FetchPage implementation:

auto BufferPoolManager::FetchPage(page_id_t page_id, [[maybe_unused]] AccessType access_type) -> Page * {
BUSTUB_ASSERT(page_id != INVALID_PAGE_ID, "page_id is equal to INVALID_PAGE_ID");
std::lock_guard<std::mutex> guard(latch_);
if (page_table_.count(page_id) != 0) {//the page_id is already present in page_table_
pages_[page_table_[page_id]].pin_count_++;//bump the page's reference count
replacer_->RecordAccess(page_table_[page_id]);//record an access timestamp for the page's frame
replacer_->SetEvictable(page_table_[page_id], false);
return pages_ + page_table_[page_id];
}

frame_id_t free_frame_id = -1;
//obtain a free frame
if (!free_list_.empty()) {
free_frame_id = free_list_.front();
free_list_.pop_front();
} else {
if (!replacer_->Evict(&free_frame_id)) {//ask the LRUKReplacer to free one up
return nullptr;
}
if (pages_[free_frame_id].IsDirty()) {//if the evicted frame holds a dirty page, write it back first
disk_manager_->WritePage(pages_[free_frame_id].page_id_, pages_[free_frame_id].data_);
}
page_table_.erase(pages_[free_frame_id].page_id_);//remove the evicted page_id from page_table_
pages_[free_frame_id].ResetMemory();//reset the frame's memory
}

pages_[free_frame_id].page_id_ = page_id;
pages_[free_frame_id].pin_count_ = 1;
pages_[free_frame_id].is_dirty_ = false;
page_table_[page_id] = free_frame_id;
disk_manager_->ReadPage(page_id, pages_[free_frame_id].data_);//read the physical page for this page_id

replacer_->RecordAccess(free_frame_id);//record an access timestamp for this frame
replacer_->SetEvictable(free_frame_id, false); // no use
return pages_ + free_frame_id;
}

UnpinPage implementation:

Note that the is_dirty parameter must not wipe out an already-dirty state, which is handled here with the |= operator

auto BufferPoolManager::UnpinPage(page_id_t page_id, bool is_dirty, [[maybe_unused]] AccessType access_type) -> bool {
std::lock_guard<std::mutex> guard(latch_);
if (page_table_.count(page_id) <= 0) {
return false;
}
frame_id_t frame_id = page_table_[page_id];
if (pages_[frame_id].pin_count_ == 0) {
return false;
}

if (--pages_[frame_id].pin_count_ == 0) {//once the reference count drops to 0, mark the frame evictable
replacer_->SetEvictable(frame_id, true);
}
pages_[frame_id].is_dirty_ |= is_dirty;
return true;
}

FlushPage implementation:

Forcibly writes the content of the Page for page_id back to disk and clears the Page's dirty flag

auto BufferPoolManager::FlushPage(page_id_t page_id) -> bool {
std::lock_guard<std::mutex> guard(latch_);
if (page_table_.count(page_id) <= 0) {
return false;
}
frame_id_t frame_id = page_table_[page_id];
disk_manager_->WritePage(page_id, pages_[frame_id].data_);
pages_[frame_id].is_dirty_ = false;
return true;
}

FlushAllPages implementation:
Writes back every page currently in memory; a minimal sketch is shown below
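The original gives no code here; a minimal sketch built on the same members used above (the write-back is inlined instead of calling FlushPage, since latch_ is already held):

void BufferPoolManager::FlushAllPages() {
std::lock_guard<std::mutex> guard(latch_);
for (auto &[page_id, frame_id] : page_table_) {
//write each resident page back and clear its dirty flag
disk_manager_->WritePage(page_id, pages_[frame_id].data_);
pages_[frame_id].is_dirty_ = false;
}
}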

DeletePage implementation:

auto BufferPoolManager::DeletePage(page_id_t page_id) -> bool {
std::lock_guard<std::mutex> guard(latch_);
if (page_table_.count(page_id) <= 0) {
return true;
}
frame_id_t frame_id = page_table_[page_id];
if (pages_[frame_id].pin_count_ != 0) {
return false;
}
//a page may only be deleted when its reference count is 0
page_table_.erase(page_id);//remove the page_id from page_table_
replacer_->SetEvictable(frame_id, true);//let the replacer drop this frame
replacer_->Remove(frame_id);
free_list_.push_back(frame_id);//return the frame to the free list
//re-initialize the Page
pages_[frame_id].is_dirty_ = false;
pages_[frame_id].page_id_ = INVALID_PAGE_ID;
pages_[frame_id].ResetMemory();
DeallocatePage(page_id);
return true;
}

Task 2 local tests:
img

Task#3 Read/Write Page Guards

FetchPage and NewPage return pointers into pages_ that are already pinned, and UnpinPage must be called once a page is no longer needed. If you forget to call UnpinPage, that page can never be evicted. This is where PageGuard comes in

BasicPageGuard
Idea: BasicPageGuard calls UnpinPage in its destructor, and keeps an is_dirty_ member that is set to true by AsMut and GetDataMut

WritePageGuard and ReadPageGuard
Idea: similar to BasicPageGuard, except that before unpinning, the destructor additionally releases the page's write or read latch (a sketch of BasicPageGuard follows)
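A minimal sketch of the BasicPageGuard idea just described (member names mirror the description; not necessarily the project's exact code):

class BasicPageGuard {
public:
BasicPageGuard(BufferPoolManager *bpm, Page *page) : bpm_(bpm), page_(page) {}

// mutable access marks the guard dirty so the final unpin reports it
template <class T>
auto AsMut() -> T * {
is_dirty_ = true;
return reinterpret_cast<T *>(page_->GetData());
}

void Drop() {
if (page_ != nullptr) {
bpm_->UnpinPage(page_->GetPageId(), is_dirty_);  // hand the pin back exactly once
page_ = nullptr;
}
}

~BasicPageGuard() { Drop(); }  // RAII: unpin automatically on scope exit

private:
BufferPoolManager *bpm_{nullptr};
Page *page_{nullptr};
bool is_dirty_{false};
};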

FetchPageBasic, FetchPageRead, FetchPageWrite and NewPageGuarded are implemented as follows:

auto BufferPoolManager::FetchPageBasic(page_id_t page_id) -> BasicPageGuard { return {this, FetchPage(page_id)}; }

auto BufferPoolManager::FetchPageRead(page_id_t page_id) -> ReadPageGuard {
Page *page = FetchPage(page_id);
if (page != nullptr) {
page->RLatch();
}
return {this, page};
}

auto BufferPoolManager::FetchPageWrite(page_id_t page_id) -> WritePageGuard {
Page *page = FetchPage(page_id);
if (page != nullptr) {
page->WLatch();
}
return {this, page};
}

auto BufferPoolManager::NewPageGuarded(page_id_t *page_id) -> BasicPageGuard { return {this, NewPage(page_id)}; }

Task 3 local tests:
img

Screenshot of passing tests:
img

TASK 1 Copy-On-Write Trie

A COW trie never mutates existing nodes on insert or delete; instead it modifies a copy of the affected node, then creates modified copies of each of its ancestors in turn, and finally returns a new root node.
In addition, during deletion, any node on the backtracking path that holds no value and has no children must itself be removed


Inserting (“ad”, 2) creates a new Node2
img


Inserting (“b”, 3)
img


Inserting (“a”, “abc”), deleting (“ab”, 1)

Note that after a delete, every node that is no longer needed must be cleaned up

img

Get implementation

Walk the trie from the root node.
If the key does not exist, return nullptr;
if the key exists but its node holds no value, or the value's type does not match, return nullptr;
otherwise, return the value

// Get the value associated with the given key.
// 1. If the key is not in the trie, return nullptr.
// 2. If the key is in the trie but the type is mismatched, return nullptr.
// 3. Otherwise, return the value.
template <class T>
auto Trie::Get(std::string_view key) const -> const T * {
if (!root_) {
return nullptr;
}
std::shared_ptr<const TrieNode> ptr(root_);
for (char ch : key) {
if (ptr->children_.count(ch) == 0) {
return nullptr;
}
ptr = ptr->children_.at(ch);
}
if (!ptr->is_value_node_) {
return nullptr;
}
auto p = std::dynamic_pointer_cast<const TrieNodeWithValue<T>>(ptr);
if (!p) {
return nullptr;
}
return p->value_.get();
}
template <class T>
auto Trie::Put(std::string_view key, T value) const -> Trie {
// Note that `T` might be a non-copyable type. Always use `std::move` when creating `shared_ptr` on that value.

// You should walk through the trie and create new nodes if necessary. If the node corresponding to the key already
// exists, you should create a new `TrieNodeWithValue`.
std::shared_ptr<const TrieNode> new_root(nullptr);
std::map<char, std::shared_ptr<const TrieNode>> children;
if (key.length() == 0) {//an empty key means the value is put on the root node
if (root_) {
children = root_->children_;
}
new_root = std::make_shared<const TrieNodeWithValue<T>>(children, std::make_shared<T>(std::move(value)));//create a new root node
return Trie(new_root);
}

std::vector<std::unique_ptr<TrieNode>> stack;
if (root_) {
stack.push_back(root_->Clone());
} else {
stack.push_back(std::make_unique<TrieNode>());
}
auto ptr(root_);

for (int64_t i = 0; i < static_cast<int64_t>(key.length() - 1); ++i) {
std::unique_ptr<TrieNode> tmp_ptr(nullptr);
if (ptr && ptr->children_.count(key[i]) == 1) {
ptr = ptr->children_.at(key[i]);
tmp_ptr = ptr->Clone();
} else {
tmp_ptr = std::make_unique<TrieNode>();
ptr = nullptr;
}

stack.push_back(std::move(tmp_ptr));
}
auto value_ptr = std::make_shared<T>(std::move(value));
if (ptr && ptr->children_.count(key.back())) {
ptr = ptr->children_.at(key.back());
children = ptr->children_;
}
auto value_node = std::make_unique<TrieNodeWithValue<T>>(children, std::move(value_ptr));
stack.push_back(std::move(value_node));

for (int64_t i = key.length() - 1; i >= 0; i--) {
auto tmp_ptr = std::move(stack.back());
stack.pop_back();
stack.back()->children_[key[i]] = std::move(tmp_ptr);
}
new_root = std::move(stack.back());
return Trie(new_root);
}
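Remove is used by TrieStore below but its body is not shown. A minimal sketch following the same clone-along-the-path pattern as Put, including the cleanup of value-less, childless nodes described at the start of this task (assuming the same TrieNode/TrieNodeWithValue types):

auto Trie::Remove(std::string_view key) const -> Trie {
if (!root_) {
return *this;
}
// collect the path from the root down to the node holding the key
std::vector<std::shared_ptr<const TrieNode>> path;
auto ptr = root_;
path.push_back(ptr);
for (char ch : key) {
if (ptr->children_.count(ch) == 0) {
return *this;  // key not present, trie unchanged
}
ptr = ptr->children_.at(ch);
path.push_back(ptr);
}
if (!ptr->is_value_node_) {
return *this;
}
// replace the terminal node with a value-less TrieNode keeping its children
std::shared_ptr<const TrieNode> child = std::make_shared<TrieNode>(ptr->children_);
// rebuild the ancestors bottom-up, pruning nodes with no value and no children
for (int64_t i = static_cast<int64_t>(key.length()) - 1; i >= 0; --i) {
auto parent = path[i]->Clone();
if (child->children_.empty() && !child->is_value_node_) {
parent->children_.erase(key[i]);  // prune the dead branch
} else {
parent->children_[key[i]] = child;
}
child = std::shared_ptr<const TrieNode>(std::move(parent));
}
if (child->children_.empty() && !child->is_value_node_) {
return Trie(nullptr);  // the whole trie became empty
}
return Trie(child);
}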

TASK 2 Concurrent Key-Value Store

The concurrent key-value store must support multiple readers working alongside a single writer
That is, while a writer is building a new root, readers can keep reading from the old root
File: trie_store.cpp

Read operations

template <class T>
auto TrieStore::Get(std::string_view key) -> std::optional<ValueGuard<T>> {
// Pseudo-code:
// (1) Take the root lock, get the root, and release the root lock. Don't lookup the value in the
// trie while holding the root lock.
// (2) Lookup the value in the trie.
// (3) If the value is found, return a ValueGuard object that holds a reference to the value and the
// root. Otherwise, return std::nullopt.
Trie root;
{
std::lock_guard<std::mutex> guard(root_lock_);
root = root_;
}
const T *val = root.Get<T>(key);
if (!val) {
return std::nullopt;
}

return ValueGuard<T>(root, *val);
}

Write operations

template <class T>
void TrieStore::Put(std::string_view key, T value) {
// You will need to ensure there is only one writer at a time. Think of how you can achieve this.
// The logic should be somehow similar to `TrieStore::Get`.
std::lock_guard<std::mutex> guard(write_lock_);
Trie root;
{
std::lock_guard<std::mutex> guard1(root_lock_);
root = root_;
}

Trie new_root = root.Put<T>(key, std::move(value));

{
std::lock_guard<std::mutex> guard1(root_lock_);
root_ = new_root;
}
}

void TrieStore::Remove(std::string_view key) {
// You will need to ensure there is only one writer at a time. Think of how you can achieve this.
// The logic should be somehow similar to `TrieStore::Get`.
std::lock_guard<std::mutex> guard(write_lock_);
Trie root;
{
std::lock_guard<std::mutex> guard1(root_lock_);
root = root_;
}

Trie new_root = root.Remove(key);

{
std::lock_guard<std::mutex> guard1(root_lock_);
root_ = new_root;
}
}

TASK 3 Debugging

skipped

TASK 4 SQL String Functions

Implement the Upper and Lower methods
src/include/execution/string_expression.h

auto Compute(const std::string &val) const -> std::string {
// TODO(student): implement upper / lower.
std::string res;
res.resize(val.length());
switch (expr_type_) {
case StringExpressionType::Lower:
std::transform(val.begin(), val.end(), res.begin(), ::tolower);
break;
case StringExpressionType::Upper:
std::transform(val.begin(), val.end(), res.begin(), ::toupper);
break;
}
return res;
}

The results are as follows:

img

Screenshot of passing tests:

img

Application Scenario

A batch of data is needed for bulk loading to build the initial index; building from scratch (inserting one key at a time) is not supported, while read and write operations are supported afterwards.

Problem Statement

Looking up a key in the tree has two parts: finding the leaf node that contains the key, then a local search within that leaf node
Lookup performance depends on the depth of the leaf node and the accuracy of the linear regression model

Approach and Results

The paper proposes a two-phase bulk loading algorithm: first build a BU (bottom-up) Tree, whose node layout is decided by a greedy merging algorithm (taking both leaf-node depth and linear-regression accuracy into account), then build DILI according to the BU-Tree's node layout.
In the BU-Tree, an internal node's range is not split evenly among its children
Compared with LIPP, the key distribution within each DILI leaf node is closer to linear, so conflicts are less likely
img

Search without Optimization
img

Building BU-Tree
Difficulty: determining the value of nh and the nh - 1 breakpoints
img
img
img
img
img
img
img
img
img
img
img

SALI: A Scalable Adaptive Learned Index Framework based on Probability Models

Background

LI (learned index): read-only
Writable variants:
1. Buffer-based strategies (inserts go into a buffer that is merged once a threshold is reached):
XIndex, FINEdex
2. Model-based strategies (in-place inserts):
ALEX (on an insert conflict, when the mapped slot is already occupied, it shifts elements to reorganize the node),
LIPP (uses a chaining scheme that creates a new node for the conflicting slot, turning the last-mile problem into a subtree traversal)

Problem Statement

img
None of the index structures above scales well under high concurrency

With few concurrent threads, buffer-based strategies (XIndex, FINEdex) show worse base performance and worse scalability than model-based strategies (ALEX+, LIPP+); and as concurrency grows, the "last mile" search quickly saturates memory bandwidth and becomes the system bottleneck (ALEX+ must take a coarse-grained write lock for this operation, so as the thread count rises, more and more threads get blocked)

LIPP+ has no last-mile problem, but it must maintain statistics in every node, such as access counts and conflict counts (to trigger node retraining and prevent performance degradation). These per-node counters cause heavy contention between threads.

Challenges and Analysis

We need to design a scalable learned index that meets the following requirements
1. Efficient Concurrency:
to achieve good insert performance, the index must track statistics reflecting how the structure degrades over time under new inserts (this information is essential for triggering node retraining); but per-node counters cause heavy contention between threads, so a lightweight method is needed
2. Adaptive ability:
learned indexes perform suboptimally under skewed insert workloads compared with uniform ones, so a learned index needs adaptivity to stay robust in concurrent scenarios. Moreover, learned indexes lack optimization strategies for lookup operations, and under skewed workloads they have not yet fully exploited the opportunity to significantly reduce index space cost.

3. Low overheads of basic performance:
(1) Efficient lookup:
high lookup performance usually comes down to minimizing the prediction error of lookups
(2) Efficient insert:
adopting a model-based rather than a buffer-based strategy, and keeping gaps in each node, can significantly improve the insert performance of a learned index

Approach

The paper proposes SALI:
1. build on the LIPP+ structure (which uses fine-grained locks)
2. besides model retraining, define a set of node evolution strategies that let the learned index adapt to different workload skews (applying different evolution strategies to nodes with different read/write hotness)
3. replace the per-node statistics of existing learned indexes with lightweight probability models, removing the scalability bottleneck of maintaining statistics

img
img

img
img

Evolution strategies:
img
img
img

img
img
img

Probability models:
to guarantee performance, a learned index must monitor degradation statistics so it can start adjusting when necessary; in addition, the evolution strategies require extra statistics.
Core idea: use probability to simulate the accumulation of such information

Examples (a generic sketch of the sampling idea follows):
1. to model the cumulative number of keys inserted within a given period, design a probability model based on the insert rate and insert time
2. a geometric distribution can be used to model the accumulation of events such as insert conflicts
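The paper's exact formulas are shown in the figures; as a generic illustration of the idea of replacing an exact, contended per-node counter with sampling (my own sketch, not SALI's code), if each event is recorded only with probability p, then c sampled events estimate roughly c / p real events, and most operations skip the shared counter entirely:

#include <atomic>
#include <cstdint>
#include <random>

// probabilistic counter: record an event only with probability p,
// so most threads avoid the contended atomic increment
class SampledCounter {
public:
explicit SampledCounter(double p) : p_(p) {}

void RecordEvent() {
thread_local std::mt19937 gen(std::random_device{}());
std::bernoulli_distribution coin(p_);
if (coin(gen)) {
count_.fetch_add(1, std::memory_order_relaxed);
}
}

// estimate of the true event count
auto Estimate() const -> double { return count_.load(std::memory_order_relaxed) / p_; }

private:
double p_;
std::atomic<uint64_t> count_{0};
};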

Conditions that trigger insert evolution

Condition 1: evaluate the frequency of new key insertions into a node and its subtree
The node must have absorbed a sufficient number of newly inserted keys
img

n.current_num: the number of keys the node contains when the current insert operation finishes
n.build_num: the number of keys the node contained after the last evolution was executed
img

Condition 2: escalation of conflicts inside the node (to judge whether the node has deteriorated)
The node must have enough newly inserted keys
img
img

First check whether Pconflict fires; if it does, then check whether Pacc fires; if both conditions fire, execute the evolution strategy

Conditions that trigger lookup evolution
Phl
Two further conditions must also be considered
Condition 1:
over a long period, no lookup operation on the node has triggered an evolution

Condition 2:
the node is not accumulating data slowly

For condition 1: if a node's last evolution was triggered by a hot lookup, no insert has triggered an evolution on it since, i.e. the node has not deteriorated badly and the number of newly inserted keys is probably small, so Phl can be tuned down to a smaller value

img

For condition 2: introduce Pacc: if a large number of new keys have been inserted since the last evolution, a new round of evolution may be needed

Each thread maintains a skip_counter that is incremented on every lookup; after 10 lookups, a Bernoulli trial decides whether Phl fires. If Phl fires, check whether Pacc also fires; if both fire, execute the evolution strategy.