最近需要对Rocksdb的内部工作方式进行一些了解,于是看了看Rocksdb的代码,写一写文档。
写入
Rocksdb的写入全都要通过WriteBatch来进行。key/value需要先写入到一个WriteBatch然后再进行写入流程。 WriteBatch内部包含了一个std::string,其中保存了需要写入的内容。
// WriteBatch格式
WriteBatch::rep_ :=
sequence: fixed64
count: fixed32
data: record[count]
record :=
kTypeValue varstring varstring
kTypeDeletion varstring
kTypeSingleDeletion varstring
kTypeRangeDeletion varstring varstring
kTypeMerge varstring varstring
kTypeColumnFamilyValue varint32 varstring varstring
kTypeColumnFamilyDeletion varint32 varstring
kTypeColumnFamilySingleDeletion varint32 varstring
kTypeColumnFamilyRangeDeletion varint32 varstring varstring
kTypeColumnFamilyMerge varint32 varstring varstring
kTypeBeginPrepareXID varstring
kTypeEndPrepareXID
kTypeCommitXID varstring
kTypeRollbackXID varstring
kTypeBeginPersistedPrepareXID varstring
kTypeNoop
varstring :=
len: varint32
data: uint8[len]
在某个WriteBatch进入写入流程后,会创建一个Writer。 首先进入写WAL流程,首先Writer会加入一个BatchGroup。BatchGroup中维护着一个用原子变量维护的无锁队列,Writer加入BatchGroup时会将自己尝试加入到这个无锁队列中,队列的第一个Writer取得leader资格,其他Writer取得follower资格。leader所在的线程会继续进行写流程,而follower所在线程则会进入等待,直到writer的状态变为完成。Follower进入等待后有三个阶段,第一个阶段先进行200次的循环,第二阶段进行持续100us的循环等待(调用yield), 第三阶段进入阻塞状态等待条件变量唤醒。
// 第一阶段调用的AsmVolatilePause
static inline void AsmVolatilePause() {
#if defined(__i386__) || defined(__x86_64__)
asm volatile("pause");
......
} // Xeon上循环200次大约7us
/*
Improves the performance of spin-wait loops. When executing a “spin-wait loop,” a Pentium 4 or Intel Xeon processor suffers a severe performance penalty when exiting the loop because it detects a possible memory order violation. The PAUSE instruction provides a hint to the processor that the code sequence is a spin-wait loop. The processor uses this hint to avoid the memory order violation in most situations, which greatly improves processor performance. For this reason, it is recommended that a PAUSE instruction be placed in all spin-wait loops.
An additional function of the PAUSE instruction is to reduce the power consumed by a Pentium 4 processor while executing a spin loop. The Pentium 4 processor can execute a spin-wait loop extremely quickly, causing the processor to consume a lot of power while it waits for the resource it is spinning on to become available. Inserting a pause instruction in a spin-wait loop greatly reduces the processor’s power consumption.
This instruction was introduced in the Pentium 4 processors, but is backward compatible with all IA-32 processors. In earlier IA-32 processors, the PAUSE instruction operates like a NOP instruction. The Pentium 4 and Intel Xeon processors implement the PAUSE instruction as a pre-defined delay. The delay is finite and can be zero for some processors. This instruction does not change the architectural state of the processor (that is, it performs essentially a delaying no-op operation).
*/
Leader继续写入流程,按顺序给每个WriteBatch分发一个SequenceNumber。然后将这个BatchGroup中的所有WriteBatch合并为一个WriteBatch。最后将合并生成的WriteBatch的rep_写入到WAL文件中。
// WAL文件的格式
+-----+-------------+--+----+----------+------+-- ... ----+
File | r0 | r1 |P | r2 | r3 | r4 | |
+-----+-------------+--+----+----------+------+-- ... ----+
<--- kBlockSize ------>|<-- kBlockSize ------>|
rn = WAL records
P = Padding
WAL record :=
CRC 4B
Size 2B
Type 1B
Log Number 4B
Payload <kBlockSize
Type :=
kRecyclableFullType
kRecyclableFirstType
kRecyclableMiddleType
kRecyclableLastType
WAL文件每次落盘的能携带的rep_长度最大是kBlockSize = 32768,超过这个长度的话WAL就要多次写入。全部WAL内容写入完毕后调用一次sync确保落盘。 WAL写入完成之后Leader修改BatchGroup中所有Writer的状态,将所有follower的状态设置为完成并唤醒。
唤醒之后原先被阻塞的各个线程重新活跃,进入写memtable的流程。 写memtable的流程和写WAL流程相似,同样也是要进入一个BatchGroup,并分出leader和follower。Leader会负责所有writer的写入工作。 Memtable的默认实现是跳跃表。Leader会遍历Batch中的所有记录,然后按照类型依次写入到memtable。 写入memtable会首先申请一块内存空间,然后将memtable record写入其中,再将这块空间加入到跳跃表。最后修改memtable的Bloom Filter。
// memtable record格式
memtable record:=
key_size
key(fixed32)
value_size
value(fixed32)
最后leader唤醒各个follower,写入流程完成。
读取
读取操作是基于版本的。 Rocksdb中有SuperVersion和Version两个概念。SuperVersion是由某一时刻的memtable,immutable和所有文件组成的。Version仅仅包括某一时刻包含的文件。 Rocksdb的sst文件归Version管理。Version本身有引用计数,当某一个Version仍在使用时,该Version会一直存在。sst文件同样也有引用计数,当某个Version析构时,其包含的所有sst文件的引用计数都会减1。
struct SuperVersion{
MemTable* mem; // 当前正在使用的memtable
MemTableListVersion* imm; // 写满后待刷盘的memtable
Version* current; // 文件列表
...
}
class Version{
VersionStorageInfo storage_info_; // 其中保存了该版本包含的文件列表
int refs_; // 引用计数
...
}
读取的时候,rocksdb会先获取一个当前的SuperVersion和SequenceNumber。在SuperVersion中通过SequenceNumber查找记录。先查找mem,再查找imm,最后查找current。 对于mem和imm,它们都有一个bloom filter,可以快速做一个粗略的判断。 对于current,查找开始时会建立一个FilePicker的类来负责对于文件的挑选。
class FilePicker{
unsigned int curr_level_; // 当前查找到的层级
LevelFilesBrief* curr_file_level_; // 当前层级的文件列表
unsigned int curr_index_in_curr_level_; // 当前查找的文件在当前层级的文件序号
Slice user_key_; // 需要查找的key
...
}
对于current的查找会不停的调用FilePicker::GetNextFile()来获取下一个可能含有key的文件。 对于文件数量大于3个的level,FilePicker做检索时会先检查各sst文件的Key range。
// sst file
<beginning_of_file>
[data block 1]
[data block 2]
...
[data block N]
[meta block 1: filter block]
[meta block 2: stats block]
[meta block 3: compression dictionary block]
[meta block 4: range deletion block]
...
[meta block K: future extended block]
[metaindex block]
[index block]
[Footer] (fixed size; starts at file_size - sizeof(Footer))
<end_of_file>
// filter block
[filter 0]
[filter 1]
[filter 2]
...
[filter N-1]
[offset of filter 0] : 4 bytes
[offset of filter 1] : 4 bytes
[offset of filter 2] : 4 bytes
...
[offset of filter N-1] : 4 bytes
[offset of beginning of offset array] : 4 bytes
lg(base) : 1 byte
然后加载被选出的sst文件。sst的文件中先保存了各个data_block,默认大小是4K,其中保存了key/value;接下来则是各个meta_data,其中需要关注的是filter_block。filter_block中依次保存了各个data_block的的bloom filter。 加载sst文件之后,在被选出的文件中遍历Block对应的BloomFilter,以确定data_block中是否包含Key。 找到Key后即可返回。
Compaction
Compaction都由MaybeScheduleFlushOrCompaction()来启动. 这里主要介绍Compaction。 对于L0->L1之外的Compaction,由于同一层文件不包含重复的Key range,所以只要稍作分隔,即可实现多线程的Compaction。而对于L0->L1的Compaction,由于L0文件有可能包含重复的Key range,所以多线程Compaction存在困难。
L0->L1的Compaction
Rocksdb中使用SubCompaction来实现了L0->L1的Compaction。
class CompactionJob{
std::vector<Slice> boundaries_;
...
}
CompactionJob类中有一个boundaries_的字段,借助这个字段实现了Compaction任务的分割。 首先,在boundaries_中记录所有参与文件的smallest key和largest key,然后对这个boundaries_进行排序,并去除重复值。
L0:
+-----+ +-----+ +-----+ +-----+
| f00 | | f01 | | f02 | | f03 |
+-----+ +-----+ +-----+ +-----+
a1 a2 a3 a4 a5 a6 a7 a8
L1:
b1 b2 b3 b4 b5 b6 b7
+-----+ +-----+ +-----+ +-----+ +-----+ +-----+
| f10 | | f11 | | f12 | | f13 | | f14 | | f15 |
+-----+ +-----+ +-----+ +-----+ +-----+ +-----+
boundaries_:
b1 a1 a2 b2 a3 b3 b4 a4 a5 b5 a6 b6 a7 a8 b7
然后按照boundaries_中的结果划分SubCompaction区间。
class RangeWithSize
(b1, a1) (a1, a2) (a2, b2) ....
最终按照配置中max_subcompaction的数量将诸多RangeWithSize进行合并。再交给不同的线程启动各自的SubCompation。每个线程遍历所有的参与文件从中获取Key/Value生成memtable,最后每个SubCompaction各自根据自己的memtable生成一个的sst文件。等到所有线程的SubCompaction完成,生成一个新的Version,然后在VersionSet记录下此时的最新版本。同时修改manifest文件记录数据库最新版本包含的文件。
普通的Compaction
对于普通的Compaction而言,实现方式相当于上面那种情况的特例。L0->L1可能会有多个L0文件一起合入L1,但是对于其他层而言一次Compaction只有一个上层文件合入下层。
Snapshot和Checkpoint
Snapshot和Checkpoint都是类似于快照的概念。 通过Snapshot可以记录下某一时刻rocksdb的状态。
class SnapshotImpl{
SequenceNumber number_; // 获取snapshot时的SequenceNumber
SnapshotImpl* prev_; // 所有的snapshot组成了一个双向的循环链表
SnapshotImpl* next_;
...
}
创建快照的过程其实就是记录下了当时的SequenceNumber,然后把新的SnapshotImpl加入到链表中。
SnapshotImpl* New(SnapshotImpl* s, SequenceNumber seq, uint64_t unix_time,
bool is_write_conflict_boundary) {
s->number_ = seq;
s->next_ = &list_;
s->prev_ = list_.prev_;
s->prev_->next_ = s;
s->next_->prev_ = s;
...
return s;
}
Version中会保存当前Version产生的所有snapshot中SequenceNumber最老的值oldest_snapshot_seqnum_。 这个值在Compaction的时候会使用。Compaction时两个Key相同的记录会进行合并,如果当前存在Snapshot的话,那么只有SequenceNumber小于oldest_snapshot_seqnum_的记录才会参与合并。SequenceNumber大于oldest_snapshot_seqnum_的记录都不会参与合并,均会被直接记录到新的sst文件中,以防止在Snapshot查询记录记录时,记录已经被Compaction删除了。
// 对于一个相同的key而言,存在k1, k2, k3, k4四个版本。
// 同时也存在两个快照s1和s2,分别是在k2和k3之后生成的。
s1 s2
k1 k2 | k3 | k4
// 于是Version中会保存s1的SequenceNumber到oldest_snapshot_seqnum_中。
// 当Compaction时,如果发现文件中有k1和k2两种记录,那么就会按照正常情况,在新的sst中只保存k2
// 如果发现文件中有k2和k3两种记录,那么由于k3的SequenceNumber大于oldest_snapshot_seqnum_,故在新的sst文件中k2和k3均会被保存。
在释放Snapshot时,除了从链表中清除SnapshotImpl,同时还会进行检查并标记,然后主动发起Compaction。中间层的多余记录可以在后续Compaction中逐渐清除,所以不做特殊处理。
void DBImpl::ReleaseSnapshot(const Snapshot* s) {
const SnapshotImpl* casted_s = reinterpret_cast<const SnapshotImpl*>(s);
{
...
snapshots_.Delete(casted_s);
uint64_t oldest_snapshot;
if (snapshots_.empty()) {
oldest_snapshot = last_seq_same_as_publish_seq_
? versions_->LastSequence()
: versions_->LastPublishedSequence();
} else {
oldest_snapshot = snapshots_.oldest()->number_;
}
for (auto* cfd : *versions_->GetColumnFamilySet()) {
cfd->current()->storage_info()
->UpdateOldestSnapshot(oldest_snapshot);
if (!cfd->current()->storage_info()
->BottommostFilesMarkedForCompaction().empty()) {
SchedulePendingCompaction(cfd);
MaybeScheduleFlushOrCompaction();
}
}
}
delete casted_s;
}
可以看出Snapshot是一个保存在内存中的状态。
CheckPoint的实现方式比较像传统意义上的快照,通常是用来备份。
调用CheckpointImpl::CreateCheckpoint()会在同一个盘上建立一个rocksdb所有文件的副本。sst文件通过硬链接创建,manifest等其他文件则直接复制。
// CreateCheckpoint的主要部分
...
db_->DisableFileDeletions(); // 停止删除sst文件
s = CreateCustomCheckpoint(
db_options,
[&](const std::string& src_dirname, const std::string& fname,
FileType) {
ROCKS_LOG_INFO(db_options.info_log, "Hard Linking %s", fname.c_str());
return db_->GetEnv()->LinkFile(src_dirname + fname,
full_private_path + fname);
} /* 创建硬链接的Callback */,
[&](const std::string& src_dirname, const std::string& fname,
uint64_t size_limit_bytes, FileType) {
ROCKS_LOG_INFO(db_options.info_log, "Copying %s", fname.c_str());
return CopyFile(db_->GetEnv(), src_dirname + fname,
full_private_path + fname, size_limit_bytes,
db_options.use_fsync);
} /* 拷贝文件的Callback */,
[&](const std::string& fname, const std::string& contents, FileType) {
ROCKS_LOG_INFO(db_options.info_log, "Creating %s", fname.c_str());
return CreateFile(db_->GetEnv(), full_private_path + fname, contents);
} /* 创建文件的Callback */,
&sequence_number, log_size_for_flush);
db_->EnableFileDeletions(false); // 回复删除sst文件
...
生成的CheckPoint不能被当前的Rocksdb进程访问。相当于另外产生了一组Rocksdb的数据文件,和当前毫无关联。