RocksDB Compaction Source Code Analysis
RocksDB's compaction process can be divided into three parts: prepare keys, process keys, and write keys.
- Entry point: BackgroundCompaction() in db/db_impl_compaction_flush.cc
Prepare keys
Trigger conditions
- RocksDB compactions run in the background; scheduling goes through the BGWorkCompaction thread callback. Compactions come in two kinds:
  - Manual compaction, via CompactFiles()
  - Auto compaction, via BackgroundCompaction()
- MaybeScheduleFlushOrCompaction

```cpp
while (bg_compaction_scheduled_ < bg_job_limits.max_compactions &&
```

As the condition shows, the maximum number of concurrently scheduled compactions is capped by bg_job_limits.max_compactions.
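As a rough illustration of that gate, here is a hypothetical sketch (not the real function, which also coordinates flushes and bottom-priority work):

```cpp
// Hypothetical sketch of the scheduling gate: keep scheduling background
// compactions while we are under the per-DB limit and work is queued.
void MaybeScheduleCompactionSketch(int& bg_compaction_scheduled,
                                   int& unscheduled_compactions,
                                   int max_compactions) {
  while (bg_compaction_scheduled < max_compactions &&
         unscheduled_compactions > 0) {
    ++bg_compaction_scheduled;
    --unscheduled_compactions;
    // the real code hands BGWorkCompaction to the env's thread pool here
  }
}
```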
- Queue: DBImpl::compaction_queue_

```cpp
std::deque<ColumnFamilyData*> compaction_queue_;
```

This queue is updated in SchedulePendingCompaction, and the unscheduled_compactions_ counter is updated together with it; only when that counter is set can the background compaction thread actually be scheduled.

```cpp
void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
```

The core function above is NeedsCompaction, which decides whether any SST files need to be compacted.
```cpp
bool LevelCompactionPicker::NeedsCompaction(
```
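To make the decision concrete, here is a minimal sketch of what NeedsCompaction boils down to (illustrative only; the names below are stand-ins, and the real function also considers expired TTL files and more): any precomputed score reaching 1 means some level should be compacted.

```cpp
#include <vector>

// Stand-in for the precomputed per-level state in VersionStorageInfo.
struct LevelScoresSketch {
  std::vector<double> compaction_score_;  // one score per level
  bool files_marked_for_compaction_ = false;
};

// Sketch: compaction is needed as soon as any score reaches 1.0
// or files were explicitly marked for compaction.
bool NeedsCompactionSketch(const LevelScoresSketch& v) {
  if (v.files_marked_for_compaction_) {
    return true;
  }
  for (double score : v.compaction_score_) {
    if (score >= 1.0) {
      return true;
    }
  }
  return false;
}
```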
Selecting SST files
The two variables below store the levels and each level's score; the higher the score, the higher the compaction priority.

```cpp
std::vector<double> compaction_score_;  // score of each level
std::vector<int> compaction_level_;     // level that each score belongs to
```

Both variables are updated in VersionStorageInfo::ComputeCompactionScore.

```cpp
void VersionStorageInfo::ComputeCompactionScore(
```
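Roughly speaking (a simplified sketch; the real function also uses compensated file sizes and handles intra-L0 compaction), level-0's score is its file count relative to level0_file_num_compaction_trigger, while for the other levels it is the ratio of the level's total bytes to the level's target size:

```cpp
#include <cstdint>
#include <vector>

// Illustrative per-level inputs; the field names are stand-ins.
struct LevelInfoSketch {
  int num_files = 0;         // used for level 0
  uint64_t total_bytes = 0;  // used for levels >= 1
  uint64_t max_bytes = 0;    // target size for levels >= 1
};

// Sketch of the scoring rule: a score >= 1.0 means "needs compaction".
std::vector<double> ComputeScoresSketch(
    const std::vector<LevelInfoSketch>& levels,
    int level0_file_num_compaction_trigger) {
  std::vector<double> scores(levels.size(), 0.0);
  for (size_t l = 0; l < levels.size(); ++l) {
    if (l == 0) {
      scores[l] = static_cast<double>(levels[l].num_files) /
                  level0_file_num_compaction_trigger;
    } else if (levels[l].max_bytes > 0) {
      scores[l] =
          static_cast<double>(levels[l].total_bytes) / levels[l].max_bytes;
    }
  }
  return scores;
}
```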
Determining the size of each level for compaction

```cpp
void VersionStorageInfo::CalculateBaseBytes(const ImmutableOptions& ioptions,
```
- static: every level has a fixed target size.
- dynamic: level target sizes are computed dynamically (see the sketch after this list).
  - This introduces the notion of a base level. Space efficiency is usually measured by space amplification; ignoring compression, space amplification = size_on_file_system / size_of_user_data.
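The dynamic mode can be sketched as follows (a simplified model of CalculateBaseBytes, assuming the bottommost level's actual size drives everything): walk upward from the bottom, dividing by max_bytes_for_level_multiplier; the first level whose target falls below max_bytes_for_level_base becomes the base level, and every level above it is kept empty.

```cpp
#include <cstdint>
#include <vector>

// Sketch of dynamic level sizing (level_compaction_dynamic_level_bytes).
// targets[l] is the target byte size of level l; 0 means "keep empty".
// Level 0 is file-count driven, so targets[0] stays 0.
std::vector<uint64_t> DynamicLevelTargetsSketch(
    uint64_t bottommost_actual_bytes, uint64_t max_bytes_for_level_base,
    double multiplier, int num_levels) {
  std::vector<uint64_t> targets(num_levels, 0);
  int level = num_levels - 1;
  uint64_t target = bottommost_actual_bytes;
  targets[level] = target;
  // Walk upward, dividing by the multiplier; the first level whose target
  // falls below max_bytes_for_level_base becomes the "base level", and
  // everything above it is kept empty.
  while (level > 1 && target > max_bytes_for_level_base) {
    target = static_cast<uint64_t>(target / multiplier);
    --level;
    targets[level] = target;
  }
  return targets;
}
// E.g. base = 1GB, multiplier = 10, bottom level holding 276GB yields
// ... 0, 0.276GB, 2.76GB, 27.6GB, 276GB, matching the example discussed
// in the parameter section later in this post.
```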
Picking the files that participate in compaction

```cpp
Compaction* LevelCompactionBuilder::PickCompaction() {
```

PickCompaction calls three main functions:
- SetupInitialFiles: initializes the files to be compacted.
- SetupOtherL0FilesIfNeeded: sets up additional L0 files if needed.
- SetupOtherInputsIfNeeded: sets up additional inputs if needed.
Let's start with SetupInitialFiles.

```cpp
void LevelCompactionBuilder::SetupInitialFiles() {
```
- First iterate over all levels; from the previously computed compaction info, get each level's score. Only levels with score >= 1 go on to be considered for compaction.
- PickFileToCompact selects the input and output files:
```cpp
bool LevelCompactionBuilder::PickFileToCompact() {
  // level 0 files are overlapping. So we cannot pick more
  // than one concurrent compactions at this level. This
  // could be made better by looking at key-ranges that are
  // being compacted at level 0.
  if (start_level_ == 0 &&
      !compaction_picker_->level0_compactions_in_progress()->empty()) {
    TEST_SYNC_POINT("LevelCompactionPicker::PickCompactionBySize:0");
    return false;
  }

  start_level_inputs_.clear();

  assert(start_level_ >= 0);

  // Pick the largest file in this level that is not already
  // being compacted
  const std::vector<int>& file_size =
      vstorage_->FilesByCompactionPri(start_level_);
  const std::vector<FileMetaData*>& level_files =
      vstorage_->LevelFiles(start_level_);

  unsigned int cmp_idx;
  for (cmp_idx = vstorage_->NextCompactionIndex(start_level_);
       cmp_idx < file_size.size(); cmp_idx++) {
    int index = file_size[cmp_idx];
    auto* f = level_files[index];

    // do not pick a file to compact if it is being compacted
    // from n-1 level.
    if (f->being_compacted) {
      continue;
    }

    start_level_inputs_.files.push_back(f);
    start_level_inputs_.level = start_level_;
    if (!compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
                                                    &start_level_inputs_) ||
        compaction_picker_->FilesRangeOverlapWithCompaction(
            {start_level_inputs_}, output_level_)) {
      // A locked (pending compaction) input-level file was pulled in due to
      // user-key overlap.
      start_level_inputs_.clear();
      continue;
    }

    // Now that input level is fully expanded, we check whether any output
    // files are locked due to pending compaction.
    //
    // Note we rely on ExpandInputsToCleanCut() to tell us whether any output-
    // level files are locked, not just the extra ones pulled in for user-key
    // overlap.
    InternalKey smallest, largest;
    compaction_picker_->GetRange(start_level_inputs_, &smallest, &largest);
    CompactionInputFiles output_level_inputs;
    output_level_inputs.level = output_level_;
    vstorage_->GetOverlappingInputs(output_level_, &smallest, &largest,
                                    &output_level_inputs.files);
    if (!output_level_inputs.empty() &&
        !compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
                                                    &output_level_inputs)) {
      start_level_inputs_.clear();
      continue;
    }
    base_index_ = index;
    break;
  }

  // store where to start the iteration in the next call to PickCompaction
  vstorage_->SetNextCompactionIndex(start_level_, cmp_idx);

  return start_level_inputs_.size() > 0;
}
```
- First, get the largest file in the current level (start_level_) that is not already being compacted.
- Use cmp_idx to index into the corresponding file.
- Expand the chosen file's key range via ExpandInputsToCleanCut; the resulting input set must form a "clean cut" (see the sketch after this list).
- Use FilesRangeOverlapWithCompaction to check whether any output-level files currently being compacted overlap in key range with the files already chosen; if so, skip (clear start_level_inputs_ and continue).
- Finally, pick the output-level files whose keys overlap the chosen start-level files, and use ExpandInputsToCleanCut to check whether any output-level file is locked; if so, skip (clear start_level_inputs_ and continue).
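The "clean cut" requirement exists because a single user key can span several files (as entries with different sequence numbers), and a compaction boundary must never split the entries of one user key. A hedged sketch of the idea behind ExpandInputsToCleanCut, using simplified stand-in types:

```cpp
#include <string>
#include <vector>

// Minimal stand-in for file metadata: the user-key range of an SST file.
struct FileRange {
  std::string smallest_user_key;
  std::string largest_user_key;
};

// Sketch of the "clean cut" idea: given files sorted by key within a level,
// keep growing the picked window [first, last] while a neighboring file
// shares a boundary user key with it.
std::vector<FileRange> ExpandToCleanCutSketch(
    const std::vector<FileRange>& level_files, size_t first, size_t last) {
  while (first > 0 &&
         level_files[first - 1].largest_user_key ==
             level_files[first].smallest_user_key) {
    --first;  // previous file ends with the same user key we start with
  }
  while (last + 1 < level_files.size() &&
         level_files[last + 1].smallest_user_key ==
             level_files[last].largest_user_key) {
    ++last;  // next file starts with the same user key we end with
  }
  return {level_files.begin() + first, level_files.begin() + last + 1};
}
```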
- Back in PickCompaction: level-0 is special in RocksDB, since only level-0 SST files can overlap one another, so it needs dedicated handling; that is the job of SetupOtherL0FilesIfNeeded.

```cpp
bool LevelCompactionBuilder::SetupOtherL0FilesIfNeeded() {
```

If start_level_ == 0 and output_level_ != 0, GetOverlappingL0Files is called.

```cpp
bool CompactionPicker::GetOverlappingL0Files(
```

- All level-0 files with overlapping keys are collected and added to start_level_inputs.
Finally, SetupOtherInputsIfNeeded() is called.

```cpp
bool LevelCompactionBuilder::SetupOtherInputsIfNeeded() {
```
- SetupOtherInputs expands the output files corresponding to start_level_inputs:
```cpp
// Populates the set of inputs of all other levels that overlap with the
// start level.
// Now we assume all levels except start level and output level are empty.
// Will also attempt to expand "start level" if that doesn't expand
// "output level" or cause "level" to include a file for compaction that has an
// overlapping user-key with another file.
// REQUIRES: input_level and output_level are different
// REQUIRES: inputs->empty() == false
// Returns false if files on parent level are currently in compaction, which
// means that we can't compact them
bool CompactionPicker::SetupOtherInputs(
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
    VersionStorageInfo* vstorage, CompactionInputFiles* inputs,
    CompactionInputFiles* output_level_inputs, int* parent_index,
    int base_index) {
  assert(!inputs->empty());
  assert(output_level_inputs->empty());
  const int input_level = inputs->level;
  const int output_level = output_level_inputs->level;
  if (input_level == output_level) {
    // no possibility of conflict
    return true;
  }

  // For now, we only support merging two levels, start level and output level.
  // We need to assert other levels are empty.
  for (int l = input_level + 1; l < output_level; l++) {
    assert(vstorage->NumLevelFiles(l) == 0);
  }

  InternalKey smallest, largest;

  // Get the range one last time.
  GetRange(*inputs, &smallest, &largest);

  // Populate the set of next-level files (inputs_GetOutputLevelInputs()) to
  // include in compaction
  vstorage->GetOverlappingInputs(output_level, &smallest, &largest,
                                 &output_level_inputs->files, *parent_index,
                                 parent_index);
  if (AreFilesInCompaction(output_level_inputs->files)) {
    return false;
  }
  if (!output_level_inputs->empty()) {
    if (!ExpandInputsToCleanCut(cf_name, vstorage, output_level_inputs)) {
      return false;
    }
  }

  // See if we can further grow the number of inputs in "level" without
  // changing the number of "level+1" files we pick up. We also choose NOT
  // to expand if this would cause "level" to include some entries for some
  // user key, while excluding other entries for the same user key. This
  // can happen when one user key spans multiple files.
  if (!output_level_inputs->empty()) {
    const uint64_t limit = mutable_cf_options.max_compaction_bytes;
    const uint64_t output_level_inputs_size =
        TotalCompensatedFileSize(output_level_inputs->files);
    const uint64_t inputs_size = TotalCompensatedFileSize(inputs->files);
    bool expand_inputs = false;

    CompactionInputFiles expanded_inputs;
    expanded_inputs.level = input_level;
    // Get closed interval of output level
    InternalKey all_start, all_limit;
    GetRange(*inputs, *output_level_inputs, &all_start, &all_limit);
    bool try_overlapping_inputs = true;
    vstorage->GetOverlappingInputs(input_level, &all_start, &all_limit,
                                   &expanded_inputs.files, base_index, nullptr);
    uint64_t expanded_inputs_size =
        TotalCompensatedFileSize(expanded_inputs.files);
    if (!ExpandInputsToCleanCut(cf_name, vstorage, &expanded_inputs)) {
      try_overlapping_inputs = false;
    }
    if (try_overlapping_inputs && expanded_inputs.size() > inputs->size() &&
        output_level_inputs_size + expanded_inputs_size < limit &&
        !AreFilesInCompaction(expanded_inputs.files)) {
      InternalKey new_start, new_limit;
      GetRange(expanded_inputs, &new_start, &new_limit);
      CompactionInputFiles expanded_output_level_inputs;
      expanded_output_level_inputs.level = output_level;
      vstorage->GetOverlappingInputs(output_level, &new_start, &new_limit,
                                     &expanded_output_level_inputs.files,
                                     *parent_index, parent_index);
      assert(!expanded_output_level_inputs.empty());
      if (!AreFilesInCompaction(expanded_output_level_inputs.files) &&
          ExpandInputsToCleanCut(cf_name, vstorage,
                                 &expanded_output_level_inputs) &&
          expanded_output_level_inputs.size() == output_level_inputs->size()) {
        expand_inputs = true;
      }
    }
    if (!expand_inputs) {
      vstorage->GetCleanInputsWithinInterval(input_level, &all_start,
                                             &all_limit, &expanded_inputs.files,
                                             base_index, nullptr);
      expanded_inputs_size = TotalCompensatedFileSize(expanded_inputs.files);
      if (expanded_inputs.size() > inputs->size() &&
          output_level_inputs_size + expanded_inputs_size < limit &&
          !AreFilesInCompaction(expanded_inputs.files)) {
        expand_inputs = true;
      }
    }
    if (expand_inputs) {
      ROCKS_LOG_INFO(ioptions_.logger,
                     "[%s] Expanding@%d %" ROCKSDB_PRIszt "+%" ROCKSDB_PRIszt
                     "(%" PRIu64 "+%" PRIu64 " bytes) to %" ROCKSDB_PRIszt
                     "+%" ROCKSDB_PRIszt " (%" PRIu64 "+%" PRIu64 " bytes)\n",
                     cf_name.c_str(), input_level, inputs->size(),
                     output_level_inputs->size(), inputs_size,
                     output_level_inputs_size, expanded_inputs.size(),
                     output_level_inputs->size(), expanded_inputs_size,
                     output_level_inputs_size);
      inputs->files = expanded_inputs.files;
    }
  }
  return true;
}
```
- Add start_level_inputs and output_level_inputs to compaction_inputs.
- Perform a few extra checks to guard against possible conflicts.
Back in PickCompaction, a Compaction object is finally constructed from the picked files and returned.

```cpp
// Form a compaction object containing the files we picked.
```
Compaction job: assigning compaction threads to the picked data

```cpp
TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
```

- Prepare:
```cpp
void CompactionJob::Prepare() {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_PREPARE);

  // Generate file_levels_ for compaction before making Iterator
  auto* c = compact_->compaction;
  assert(c->column_family_data() != nullptr);
  assert(c->column_family_data()->current()->storage_info()->NumLevelFiles(
             compact_->compaction->level()) > 0);

  write_hint_ =
      c->column_family_data()->CalculateSSTWriteHint(c->output_level());
  bottommost_level_ = c->bottommost_level();

  if (c->ShouldFormSubcompactions()) {
    {
      StopWatch sw(db_options_.clock, stats_, SUBCOMPACTION_SETUP_TIME);
      GenSubcompactionBoundaries();
    }
    assert(sizes_.size() == boundaries_.size() + 1);

    for (size_t i = 0; i <= boundaries_.size(); i++) {
      Slice* start = i == 0 ? nullptr : &boundaries_[i - 1];
      Slice* end = i == boundaries_.size() ? nullptr : &boundaries_[i];
      compact_->sub_compact_states.emplace_back(c, start, end, sizes_[i],
                                                static_cast<uint32_t>(i));
    }
    RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
                      compact_->sub_compact_states.size());
  } else {
    constexpr Slice* start = nullptr;
    constexpr Slice* end = nullptr;
    constexpr uint64_t size = 0;

    compact_->sub_compact_states.emplace_back(c, start, end, size,
                                              /*sub_job_id*/ 0);
  }
}
```
- GenSubcompactionBoundaries constructs the subcompaction boundaries:
```cpp
void CompactionJob::GenSubcompactionBoundaries() {
  auto* c = compact_->compaction;
  auto* cfd = c->column_family_data();
  const Comparator* cfd_comparator = cfd->user_comparator();
  std::vector<Slice> bounds;
  int start_lvl = c->start_level();
  int out_lvl = c->output_level();

  // Add the starting and/or ending key of certain input files as a potential
  // boundary
  for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) {
    int lvl = c->level(lvl_idx);
    if (lvl >= start_lvl && lvl <= out_lvl) {
      const LevelFilesBrief* flevel = c->input_levels(lvl_idx);
      size_t num_files = flevel->num_files;

      if (num_files == 0) {
        continue;
      }

      if (lvl == 0) {
        // For level 0 add the starting and ending key of each file since the
        // files may have greatly differing key ranges (not range-partitioned)
        for (size_t i = 0; i < num_files; i++) {
          bounds.emplace_back(flevel->files[i].smallest_key);
          bounds.emplace_back(flevel->files[i].largest_key);
        }
      } else {
        // For all other levels add the smallest/largest key in the level to
        // encompass the range covered by that level
        bounds.emplace_back(flevel->files[0].smallest_key);
        bounds.emplace_back(flevel->files[num_files - 1].largest_key);
        if (lvl == out_lvl) {
          // For the last level include the starting keys of all files since
          // the last level is the largest and probably has the widest key
          // range. Since it's range partitioned, the ending key of one file
          // and the starting key of the next are very close (or identical).
          for (size_t i = 1; i < num_files; i++) {
            bounds.emplace_back(flevel->files[i].smallest_key);
          }
        }
      }
    }
  }

  std::sort(bounds.begin(), bounds.end(),
            [cfd_comparator](const Slice& a, const Slice& b) -> bool {
              return cfd_comparator->Compare(ExtractUserKey(a),
                                             ExtractUserKey(b)) < 0;
            });
  // Remove duplicated entries from bounds
  bounds.erase(
      std::unique(bounds.begin(), bounds.end(),
                  [cfd_comparator](const Slice& a, const Slice& b) -> bool {
                    return cfd_comparator->Compare(ExtractUserKey(a),
                                                   ExtractUserKey(b)) == 0;
                  }),
      bounds.end());

  // Combine consecutive pairs of boundaries into ranges with an approximate
  // size of data covered by keys in that range
  uint64_t sum = 0;
  std::vector<RangeWithSize> ranges;
  // Get input version from CompactionState since it's already referenced
  // earlier in SetInputVersioCompaction::SetInputVersion and will not change
  // when db_mutex_ is released below
  auto* v = compact_->compaction->input_version();
  for (auto it = bounds.begin();;) {
    const Slice a = *it;
    ++it;

    if (it == bounds.end()) {
      break;
    }

    const Slice b = *it;

    // ApproximateSize could potentially create table reader iterator to seek
    // to the index block and may incur I/O cost in the process. Unlock db
    // mutex to reduce contention
    db_mutex_->Unlock();
    uint64_t size = versions_->ApproximateSize(SizeApproximationOptions(), v, a,
                                               b, start_lvl, out_lvl + 1,
                                               TableReaderCaller::kCompaction);
    db_mutex_->Lock();
    ranges.emplace_back(a, b, size);
    sum += size;
  }

  // Group the ranges into subcompactions
  const double min_file_fill_percent = 4.0 / 5;
  int base_level = v->storage_info()->base_level();
  uint64_t max_output_files = static_cast<uint64_t>(std::ceil(
      sum / min_file_fill_percent /
      MaxFileSizeForLevel(
          *(c->mutable_cf_options()), out_lvl,
          c->immutable_options()->compaction_style, base_level,
          c->immutable_options()->level_compaction_dynamic_level_bytes)));
  uint64_t subcompactions =
      std::min({static_cast<uint64_t>(ranges.size()),
                static_cast<uint64_t>(c->max_subcompactions()),
                max_output_files});

  if (subcompactions > 1) {
    double mean = sum * 1.0 / subcompactions;
    // Greedily add ranges to the subcompaction until the sum of the ranges'
    // sizes becomes >= the expected mean size of a subcompaction
    sum = 0;
    for (size_t i = 0; i + 1 < ranges.size(); i++) {
      sum += ranges[i].size;
      if (subcompactions == 1) {
        // If there's only one left to schedule then it goes to the end so no
        // need to put an end boundary
        continue;
      }
      if (sum >= mean) {
        boundaries_.emplace_back(ExtractUserKey(ranges[i].range.limit));
        sizes_.emplace_back(sum);
        subcompactions--;
        sum = 0;
      }
    }
    sizes_.emplace_back(sum + ranges.back().size);
  } else {
    // Only one range so its size is the total sum of sizes computed above
    sizes_.emplace_back(sum);
  }
}
```

- Iterate over all levels that need compaction and add each level's boundaries (smallest and largest keys) to the bounds array.
- Sort the collected bounds and remove duplicates.
- Compute the ideal number of subcompactions and the number of output files.
- Finally, update boundaries_: based on file sizes and the mean size per subcompaction, all ranges are split into groups, and the results are stored in boundaries_ (see the sketch after this list).
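Stripped of RocksDB's types, the grouping at the end is a simple greedy split. A sketch, assuming the ranges and the desired subcompaction count (>= 1) are already known: accumulate range sizes until the running sum reaches the mean size per subcompaction, then emit a boundary.

```cpp
#include <cstdint>
#include <string>
#include <vector>

struct RangeSizeSketch {
  std::string limit;  // user key that closes this range
  uint64_t size;      // approximate bytes covered by the range
};

// Sketch of the greedy split in GenSubcompactionBoundaries.
std::vector<std::string> SplitIntoSubcompactionsSketch(
    const std::vector<RangeSizeSketch>& ranges, uint64_t subcompactions) {
  std::vector<std::string> boundaries;
  uint64_t total = 0;
  for (const auto& r : ranges) total += r.size;
  const double mean = static_cast<double>(total) / subcompactions;
  uint64_t sum = 0;
  // Greedy: close a subcompaction once its accumulated size reaches the mean.
  for (size_t i = 0; i + 1 < ranges.size(); ++i) {
    sum += ranges[i].size;
    if (boundaries.size() + 1 < subcompactions && sum >= mean) {
      boundaries.push_back(ranges[i].limit);
      sum = 0;
    }
  }
  return boundaries;  // n boundaries delimit n + 1 subcompactions
}
```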
- Run:
```cpp
Status CompactionJob::Run() {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_RUN);
  TEST_SYNC_POINT("CompactionJob::Run():Start");
  log_buffer_->FlushBufferToLog();
  LogCompaction();

  const size_t num_threads = compact_->sub_compact_states.size();
  assert(num_threads > 0);
  const uint64_t start_micros = db_options_.clock->NowMicros();

  // Launch a thread for each of subcompactions 1...num_threads-1
  std::vector<port::Thread> thread_pool;
  thread_pool.reserve(num_threads - 1);
  for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
    thread_pool.emplace_back(&CompactionJob::ProcessKeyValueCompaction, this,
                             &compact_->sub_compact_states[i]);
  }

  // Always schedule the first subcompaction (whether or not there are also
  // others) in the current thread to be efficient with resources
  ProcessKeyValueCompaction(&compact_->sub_compact_states[0]);

  // Wait for all other threads (if there are any) to finish execution
  for (auto& thread : thread_pool) {
    thread.join();
  }

  compaction_stats_.micros = db_options_.clock->NowMicros() - start_micros;
  compaction_stats_.cpu_micros = 0;
  for (size_t i = 0; i < compact_->sub_compact_states.size(); i++) {
    compaction_stats_.cpu_micros +=
        compact_->sub_compact_states[i].compaction_job_stats.cpu_micros;
  }

  RecordTimeToHistogram(stats_, COMPACTION_TIME, compaction_stats_.micros);
  RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
                        compaction_stats_.cpu_micros);

  TEST_SYNC_POINT("CompactionJob::Run:BeforeVerify");

  // Check if any thread encountered an error during execution
  Status status;
  IOStatus io_s;
  bool wrote_new_blob_files = false;

  for (const auto& state : compact_->sub_compact_states) {
    if (!state.status.ok()) {
      status = state.status;
      io_s = state.io_status;
      break;
    }

    if (!state.blob_file_additions.empty()) {
      wrote_new_blob_files = true;
    }
  }

  if (io_status_.ok()) {
    io_status_ = io_s;
  }
  if (status.ok()) {
    constexpr IODebugContext* dbg = nullptr;

    if (output_directory_) {
      io_s = output_directory_->Fsync(IOOptions(), dbg);
    }

    if (io_s.ok() && wrote_new_blob_files && blob_output_directory_ &&
        blob_output_directory_ != output_directory_) {
      io_s = blob_output_directory_->Fsync(IOOptions(), dbg);
    }
  }
  if (io_status_.ok()) {
    io_status_ = io_s;
  }
  if (status.ok()) {
    status = io_s;
  }
  if (status.ok()) {
    thread_pool.clear();
    std::vector<const CompactionJob::SubcompactionState::Output*> files_output;
    for (const auto& state : compact_->sub_compact_states) {
      for (const auto& output : state.outputs) {
        files_output.emplace_back(&output);
      }
    }
    ColumnFamilyData* cfd = compact_->compaction->column_family_data();
    auto prefix_extractor =
        compact_->compaction->mutable_cf_options()->prefix_extractor.get();
    std::atomic<size_t> next_file_idx(0);
    auto verify_table = [&](Status& output_status) {
      while (true) {
        size_t file_idx = next_file_idx.fetch_add(1);
        if (file_idx >= files_output.size()) {
          break;
        }
        // Verify that the table is usable
        // We set for_compaction to false and don't
        // OptimizeForCompactionTableRead here because this is a special case
        // after we finish the table building
        // No matter whether use_direct_io_for_flush_and_compaction is true,
        // we will regard this verification as user reads since the goal is
        // to cache it here for further user reads
        ReadOptions read_options;
        InternalIterator* iter = cfd->table_cache()->NewIterator(
            read_options, file_options_, cfd->internal_comparator(),
            files_output[file_idx]->meta, /*range_del_agg=*/nullptr,
            prefix_extractor,
            /*table_reader_ptr=*/nullptr,
            cfd->internal_stats()->GetFileReadHist(
                compact_->compaction->output_level()),
            TableReaderCaller::kCompactionRefill, /*arena=*/nullptr,
            /*skip_filters=*/false, compact_->compaction->output_level(),
            MaxFileSizeForL0MetaPin(
                *compact_->compaction->mutable_cf_options()),
            /*smallest_compaction_key=*/nullptr,
            /*largest_compaction_key=*/nullptr,
            /*allow_unprepared_value=*/false);
        auto s = iter->status();

        if (s.ok() && paranoid_file_checks_) {
          OutputValidator validator(cfd->internal_comparator(),
                                    /*_enable_order_check=*/true,
                                    /*_enable_hash=*/true);
          for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
            s = validator.Add(iter->key(), iter->value());
            if (!s.ok()) {
              break;
            }
          }
          if (s.ok()) {
            s = iter->status();
          }
          if (s.ok() &&
              !validator.CompareValidator(files_output[file_idx]->validator)) {
            s = Status::Corruption("Paranoid checksums do not match");
          }
        }

        delete iter;

        if (!s.ok()) {
          output_status = s;
          break;
        }
      }
    };

    for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
      thread_pool.emplace_back(
          verify_table, std::ref(compact_->sub_compact_states[i].status));
    }
    verify_table(compact_->sub_compact_states[0].status);
    for (auto& thread : thread_pool) {
      thread.join();
    }
    for (const auto& state : compact_->sub_compact_states) {
      if (!state.status.ok()) {
        status = state.status;
        break;
      }
    }
  }

  TablePropertiesCollection tp;
  for (const auto& state : compact_->sub_compact_states) {
    for (const auto& output : state.outputs) {
      auto fn =
          TableFileName(state.compaction->immutable_options()->cf_paths,
                        output.meta.fd.GetNumber(), output.meta.fd.GetPathId());
      tp[fn] = output.table_properties;
    }
  }
  compact_->compaction->SetOutputTableProperties(std::move(tp));

  // Finish up all book-keeping to unify the subcompaction results
  AggregateStatistics();
  UpdateCompactionStats();

  RecordCompactionIOStats();
  LogFlush(db_options_.info_log);
  TEST_SYNC_POINT("CompactionJob::Run():End");

  compact_->status = status;
  return status;
}
```

- Iterate over all sub_compacts and launch a thread for each to do its compaction work; wait for all threads to finish, then return.
- ProcessKeyValueCompaction takes the sub_compact_states obtained above and performs the actual compaction of the real key-value data.
Process keys
Building an iterator over all keys
We first enter ProcessKeyValueCompaction; using the sub_compact data filled in during the previous steps, the corresponding key-value data is fetched and an InternalIterator is built. This part mainly handles the ordering of keys and the merging of internal keys.

```cpp
std::unique_ptr<InternalIterator> raw_input(
```
- The iterator is built by MakeInputIterator; its construction logic again distinguishes level-0 from the other levels:
```cpp
InternalIterator* VersionSet::MakeInputIterator(
    const ReadOptions& read_options, const Compaction* c,
    RangeDelAggregator* range_del_agg,
    const FileOptions& file_options_compactions) {
  auto cfd = c->column_family_data();
  // Level-0 files have to be merged together. For other levels,
  // we will make a concatenating iterator per level.
  // TODO(opt): use concatenating iterator for level-0 if there is no overlap
  const size_t space = (c->level() == 0 ? c->input_levels(0)->num_files +
                                              c->num_input_levels() - 1
                                        : c->num_input_levels());
  InternalIterator** list = new InternalIterator*[space];
  size_t num = 0;
  for (size_t which = 0; which < c->num_input_levels(); which++) {
    if (c->input_levels(which)->num_files != 0) {
      if (c->level(which) == 0) {
        const LevelFilesBrief* flevel = c->input_levels(which);
        for (size_t i = 0; i < flevel->num_files; i++) {
          list[num++] = cfd->table_cache()->NewIterator(
              read_options, file_options_compactions,
              cfd->internal_comparator(), *flevel->files[i].file_metadata,
              range_del_agg, c->mutable_cf_options()->prefix_extractor.get(),
              /*table_reader_ptr=*/nullptr,
              /*file_read_hist=*/nullptr, TableReaderCaller::kCompaction,
              /*arena=*/nullptr,
              /*skip_filters=*/false,
              /*level=*/static_cast<int>(c->level(which)),
              MaxFileSizeForL0MetaPin(*c->mutable_cf_options()),
              /*smallest_compaction_key=*/nullptr,
              /*largest_compaction_key=*/nullptr,
              /*allow_unprepared_value=*/false);
        }
      } else {
        // Create concatenating iterator for the files from this level
        list[num++] = new LevelIterator(
            cfd->table_cache(), read_options, file_options_compactions,
            cfd->internal_comparator(), c->input_levels(which),
            c->mutable_cf_options()->prefix_extractor.get(),
            /*should_sample=*/false,
            /*no per level latency histogram=*/nullptr,
            TableReaderCaller::kCompaction, /*skip_filters=*/false,
            /*level=*/static_cast<int>(c->level(which)), range_del_agg,
            c->boundaries(which));
      }
    }
  }
  assert(num <= space);
  InternalIterator* result =
      NewMergingIterator(&c->column_family_data()->internal_comparator(), list,
                         static_cast<int>(num));
  delete[] list;
  return result;
}
```
- First get the cfd that the current sub_compact belongs to.
- For level-0, build a table_cache iterator for each SST file and add it to the list.
- For every other (non-level-0) level, create a single concatenating iterator for the whole level and add it to the list; starting from its first position, this iterator can sequentially visit keys up to the last key of the last SST file in that level.
- All the per-level iterators end up in a single iterator array list; NewMergingIterator then maintains an underlying sorted heap that merges key-values across all levels:
迭代器维护一个底层的排序堆结构,完成所有层之间的key-value的排序。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
```cpp
InternalIterator* NewMergingIterator(const InternalKeyComparator* cmp,
                                     InternalIterator** list, int n,
                                     Arena* arena, bool prefix_seek_mode) {
  assert(n >= 0);
  if (n == 0) {
    return NewEmptyInternalIterator<Slice>(arena);
  } else if (n == 1) {
    return list[0];
  } else {
    if (arena == nullptr) {
      return new MergingIterator(cmp, list, n, false, prefix_seek_mode);
    } else {
      auto mem = arena->AllocateAligned(sizeof(MergingIterator));
      return new (mem) MergingIterator(cmp, list, n, true, prefix_seek_mode);
    }
  }
}
```
- If the list is empty, an empty iterator is returned.
- If there is exactly one iterator, it is already ordered by itself, so there is no need for a heap-sorting iterator (each level-0 SST is internally sorted, which is why one list element was created per level-0 SST earlier; every non-level-0 level is sorted as a whole).
- If there are multiple iterators, the heap-sorting iterator is created via MergingIterator:
```cpp
MergingIterator(const InternalKeyComparator* comparator,
                InternalIterator** children, int n, bool is_arena_mode,
                bool prefix_seek_mode)
    : is_arena_mode_(is_arena_mode),
      comparator_(comparator),
      current_(nullptr),
      direction_(kForward),
      minHeap_(comparator_),
      prefix_seek_mode_(prefix_seek_mode),
      pinned_iters_mgr_(nullptr) {
  children_.resize(n);
  for (int i = 0; i < n; i++) {
    children_[i].Set(children[i]);
  }
  for (auto& child : children_) {
    AddToMinHeapOrCheckStatus(&child);
  }
  current_ = CurrentForward();
}
```

All elements of the passed-in list (children in the function) are added to a vector; then every child is pushed through AddToMinHeapOrCheckStatus to build the underlying heap. The order of elements in the heap is determined by the user-supplied options.comparator; the default is BytewiseComparator, which implements lexicographical (dictionary) order.
```cpp
void MergingIterator::AddToMinHeapOrCheckStatus(IteratorWrapper* child) {
  if (child->Valid()) {
    assert(child->status().ok());
    minHeap_.push(child);
  } else {
    considerStatus(child->status());
  }
}
```
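The heap-based merge itself is a classic k-way merge. Here is a self-contained sketch using only the standard library (std::string keys stand in for internal keys, and a std::priority_queue plays the role of minHeap_):

```cpp
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Each "child" is a sorted run, like one level's concatenating iterator.
// The heap always exposes the child whose current key is smallest.
std::vector<std::string> KWayMergeSketch(
    std::vector<std::vector<std::string>> children) {
  using Cursor = std::pair<size_t, size_t>;  // (child index, position)
  auto greater = [&children](const Cursor& a, const Cursor& b) {
    return children[a.first][a.second] > children[b.first][b.second];
  };
  std::priority_queue<Cursor, std::vector<Cursor>, decltype(greater)> heap(
      greater);
  // AddToMinHeapOrCheckStatus analogue: push every non-empty child.
  for (size_t i = 0; i < children.size(); ++i) {
    if (!children[i].empty()) heap.push({i, 0});
  }
  std::vector<std::string> merged;
  while (!heap.empty()) {
    Cursor cur = heap.top();
    heap.pop();
    merged.push_back(children[cur.first][cur.second]);  // smallest key overall
    if (cur.second + 1 < children[cur.first].size()) {
      heap.push({cur.first, cur.second + 1});  // advance that child iterator
    }
  }
  return merged;
}
```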
Processing elements via SeekToFirst and Next
Back in ProcessKeyValueCompaction: using the InternalIterator built above, a CompactionIterator holding all the necessary state is constructed (plain initialization is enough). After construction, the CompactionIterator's internal cursor is positioned at the very start of the whole iterator, and Next is used to fetch each following key-value. On every step, besides re-adjusting the lexicographic order in the underlying heap, the iterator must also handle keys of the various types, working through kTypeValue, kTypeDeletion, kTypeSingleDeletion, kTypeRangeDeletion, kTypeMerge, and so on.

```cpp
c_iter->SeekToFirst();
```
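A heavily simplified sketch of the per-key decisions (illustrative only: the real CompactionIterator also tracks snapshots, merge operands, range tombstones, and compaction filters; the struct below is a hypothetical stand-in):

```cpp
#include <string>

// Simplified subset of the internal-key value types.
enum ValueTypeSketch {
  kTypeValue,
  kTypeDeletion,
  kTypeSingleDeletion,
  kTypeMerge,
};

// Hypothetical stand-in for one entry as seen by the compaction iterator.
struct InternalEntrySketch {
  std::string user_key;
  ValueTypeSketch type;
  bool is_newest_for_user_key;  // first occurrence in descending seqno order
  bool bottommost_level;        // nothing older can exist below this level
};

// Sketch: should this entry be emitted to the output SST?
bool ShouldOutputSketch(const InternalEntrySketch& e) {
  if (!e.is_newest_for_user_key) {
    return false;  // shadowed by a newer entry for the same user key
  }
  if ((e.type == kTypeDeletion || e.type == kTypeSingleDeletion) &&
      e.bottommost_level) {
    return false;  // tombstone can be dropped: nothing older left to hide
  }
  // Values are kept; merges would first be combined with older operands.
  return true;
}
```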
Write keys
This step also takes place in ProcessKeyValueCompaction: the key-values are written out into SST files.
- Check the key's ValueType; if it is a data-block or index-block type, feed it into the builder state machine.
- filter_builder and index_builder are created first. The index builder is created in a partitioned, two-level format (the first level has multiple restart points indexing the actual data blocks; the second level indexes the first-level index blocks), which makes it convenient to load into memory for binary search, saving memory and speeding up lookups. The data_block_builder is written after those.
- If the key's ValueType is range_deletion, it is added to the range_delete_block_builder instead.
- The data_block builder writes to the underlying file first, through the writer bound to the output file.
- filter_block / index_builder / compress_builder / range_del_builder / properties_builder are then added, in their corresponding formats, to the meta_data_builder and written to underlying storage through the output file's writer.
- Finally, the footer is assembled from meta_data_handle and index_handle and written to underlying storage.
Binding the builder to the output file's writer
The default block-based table SST file contains many different kinds of blocks. Apart from the data blocks, every block is first written into a temporary builder data structure; the builder then writes through the writer bound to its output file, forming the on-disk SST file structure.
The logic here binds the builders to the output file's writer and sets up the table builder.

```cpp
// Open output file if necessary
```
Adding block data through the table builder's state machine
builder->Add is then called to build the corresponding builder structures. The adding process is driven by a three-state state machine that creates the builders for the different blocks; the state machine itself is set up when the table builder is constructed.

```cpp
status = sub_compact->AddToBuilder(key, value);
```

```cpp
Status AddToBuilder(const Slice& key, const Slice& value) {
```

```cpp
void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
```
- kBuffered is the initial state of the state machine. In this state, a fair amount of uncompressed data blocks is buffered in memory. While in this state, EnterUnbuffered builds the compression block and, based on it, the corresponding index block and filter block; finally the machine moves to the next state, kUnbuffered.
- In kUnbuffered, the compression block has already been roughly built from the previously buffered data; in this state, either Finish completes the writing of all blocks, or Abandon discards the current write.
- kClosed means the table builder has already finished or been abandoned; the table builder is destructed next. (A minimal sketch of this state machine follows this list.)
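A minimal sketch of the three-state machine (names mirror the description above; the method bodies are placeholders, not the real builder logic, and the transition condition in Add is an assumption):

```cpp
// Sketch of the three-state machine described above; bodies are placeholders.
enum class BuilderStateSketch {
  kBuffered,    // uncompressed data blocks are being buffered in memory
  kUnbuffered,  // EnterUnbuffered() ran; blocks stream out as they fill
  kClosed,      // Finish() or Abandon() was called
};

struct TableBuilderSketch {
  BuilderStateSketch state = BuilderStateSketch::kBuffered;

  void Add(/* key, value */) {
    if (state == BuilderStateSketch::kBuffered /* && buffer limit hit */) {
      EnterUnbuffered();
    }
    // ... append to the current data block; flush it when it fills up ...
  }
  void EnterUnbuffered() {
    // build the compression block from the buffered data, then emit the
    // buffered data blocks plus their index/filter entries
    state = BuilderStateSketch::kUnbuffered;
  }
  void Finish() { state = BuilderStateSketch::kClosed; }   // persist blocks
  void Abandon() { state = BuilderStateSketch::kClosed; }  // drop the write
};
```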
In the first state, the logic below is entered. If a data block satisfies the flush conditions, its data is flushed directly into the data-block storage structure of the current builder.

```cpp
auto should_flush = r->flush_block_policy->Update(key, value);
```
The main job of EnterUnbuffered is to build the compression block, which happens only if the compression option is enabled.
At the same time, the index block and filter block that index the data blocks are built from the data previously flushed into data blocks. Building them here makes sense because a flush indicates a complete data block has been written, and only a complete data block warrants one index-block entry.
The data_block_and_keys_buffers array holds the not-yet-compressed data-block data.

```cpp
for (size_t i = 0; ok() && i < r->data_block_buffers.size(); ++i) {
```
The index block is created in EnterUnbuffered.

```cpp
if (table_options.index_type ==
```

Back in the while loop of ProcessKeyValueCompaction, the iterator's keys are traversed one by one, each being added to the appropriate data block while the index block, filter block, and compression block are filled in along the way.
Persisting the data via the built meta_index_builder and Footer
Next, FinishCompactionOutputFile consolidates the builder data added so far, handles the delete-range blocks, and updates the current compaction's boundaries.
This function is triggered once the block data accumulated in the builder reaches the target SST file size, max_output_file_size.

```cpp
// Close output file if it is big enough. Two possibilities determine it's
```

Inside FinishCompactionOutputFile, s = sub_compact->builder->Finish() is eventually called to persist all the data.

```cpp
Status BlockBasedTableBuilder::Finish() {
```
Compaction parameter settings

| Parameter | Description | Default |
|---|---|---|
| write_buffer_size | Caps the size of a memtable | 64MB |
| level0_file_num_compaction_trigger | Caps the number of files in level 0 | 4 |
| target_file_size_base | Target size of a single file in a level | 64MB |
| target_file_size_multiplier | Multiplier for the per-file target size across levels | 1 |
| max_bytes_for_level_base | Target total size of all files in a level | 256MB |
| max_bytes_for_level_multiplier | Multiplier for the per-level total size across levels | 10 |
| level_compaction_dynamic_level_bytes | Whether compaction targets are applied from the bottom level upward instead | false |
| num_levels | Number of levels in the LSM tree | 7 |
- target_file_size_base and target_file_size_multiplier bound the size of a single file in each level after compaction. target_file_size_base is the size of each file in Level-1; Level N's can be computed as target_file_size_base * target_file_size_multiplier ^ (N-1). target_file_size_base defaults to 64MB and target_file_size_multiplier to 1.
- max_bytes_for_level_base and max_bytes_for_level_multiplier bound the total size of all files in each level. max_bytes_for_level_base is the limit for all files in Level-1; Level N's limit can be computed as max_bytes_for_level_base * (max_bytes_for_level_multiplier ^ (N-1)). max_bytes_for_level_base defaults to 256MB and max_bytes_for_level_multiplier to 10.
- level_compaction_dynamic_level_bytes switches the compaction targets to be applied from the bottom level upward, bounded by Target_Size(Ln-1) = Target_Size(Ln) / max_bytes_for_level_multiplier. For example, if max_bytes_for_level_base is 1GB and num_levels is set to 6, and the actual size of the bottommost level is 276GB, then the target sizes of L1-L6 are 0, 0, 0.276GB, 2.76GB, 27.6GB, and 276GB.
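For reference, all of these knobs live on rocksdb::Options and are set before opening the database. A minimal usage example (the path and the values are just illustrations, not recommendations):

```cpp
#include <rocksdb/db.h>
#include <rocksdb/options.h>

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.write_buffer_size = 64 << 20;            // 64MB memtable
  options.level0_file_num_compaction_trigger = 4;  // L0 file-count trigger
  options.target_file_size_base = 64 << 20;        // per-file target for L1
  options.target_file_size_multiplier = 1;
  options.max_bytes_for_level_base = 256 << 20;    // total target for L1
  options.max_bytes_for_level_multiplier = 10;
  options.level_compaction_dynamic_level_bytes = true;
  options.num_levels = 7;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/testdb", &db);
  if (s.ok()) delete db;
  return s.ok() ? 0 : 1;
}
```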
- MutableDBOptions

```cpp
struct MutableDBOptions {
```

- mutable_cf_options_

```cpp
explicit MutableCFOptions(const ColumnFamilyOptions& options)
```
Some Concepts
- Slice is a simple structure containing a pointer into some external storage and a size.
- parents && grandparents: parent = level+1, grandparent = level+2
- column family (cfd)
- compaction filter
- compression
- sst file manager (sfm)
- background (bg)
Reference
- RocksDB Compaction源码分析: https://tong1heng.github.io/2021/09/24/Embedded/rocksdb_compaction/