RocksDB Compaction Source Code Analysis

  RocksDB's compaction process falls into three parts overall: prepare keys, process keys, and write keys.

  • Entry point: BackgroundCompaction() in db/db_impl_compaction_flush.cc

Prepare keys

Trigger conditions

  • RocksDB compactions all run in the background; the BGWorkCompaction thread does the scheduling. Compactions fall into two categories:

    • Manual compaction, via CompactFiles()
    • Automatic compaction, via BackgroundCompaction()
  • MaybeScheduleFlushOrCompaction

while (bg_compaction_scheduled_ < bg_job_limits.max_compactions &&
       unscheduled_compactions_ > 0) {
  CompactionArg* ca = new CompactionArg;
  ca->db = this;
  ca->prepicked_compaction = nullptr;
  bg_compaction_scheduled_++;   // number of compaction jobs currently scheduled
  unscheduled_compactions_--;   // number of pending compactions, i.e. the
                                // length of the pending-cfd queue
  // schedule the BGWorkCompaction thread
  env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
                 &DBImpl::UnscheduleCompactionCallback);
}

As shown above, the number of concurrently scheduled compactions is capped by bg_job_limits.max_compactions; a sketch of setting the option it is derived from follows below.
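
For reference, this limit comes from the user's DBOptions; a minimal sketch (the path /tmp/testdb and the value 8 are arbitrary examples) of raising compaction parallelism through the standard option:

#include <cassert>
#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // max_background_jobs bounds background flushes + compactions together;
  // bg_job_limits.max_compactions checked above is derived from it.
  options.max_background_jobs = 8;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/testdb", &db);
  assert(s.ok());
  delete db;
  return 0;
}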

  • The queue DBImpl::compaction_queue_
std::deque<ColumnFamilyData*> compaction_queue_;

This queue is updated in SchedulePendingCompaction, and unscheduled_compactions_ is bumped alongside it; only when that counter is set can the background compaction thread actually be scheduled.

void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
  if (!cfd->queued_for_compaction() && cfd->NeedsCompaction()) {
    AddToCompactionQueue(cfd);
    ++unscheduled_compactions_;
  }
}

The core function above is NeedsCompaction, which decides whether any SSTs need to be compacted.

bool LevelCompactionPicker::NeedsCompaction(
    const VersionStorageInfo* vstorage) const {
  if (!vstorage->ExpiredTtlFiles().empty()) {  // there are expired SSTs
    return true;
  }
  if (!vstorage->FilesMarkedForPeriodicCompaction().empty()) {
    return true;
  }
  if (!vstorage->BottommostFilesMarkedForCompaction().empty()) {
    return true;
  }
  if (!vstorage->FilesMarkedForCompaction().empty()) {
    return true;
  }
  // walk every level and use its score to decide whether it needs compaction
  for (int i = 0; i <= vstorage->MaxInputLevel(); i++) {
    if (vstorage->CompactionScore(i) >= 1) {
      return true;
    }
  }
  return false;
}

SST file selection

The following two members store the candidate levels and each level's score; the higher the score, the higher the compaction priority.

std::vector<double> compaction_score_;  // compaction score of each level
std::vector<int> compaction_level_;     // level that each score entry refers to

Both are updated in VersionStorageInfo::ComputeCompactionScore; a worked example of the resulting scores follows the function.

void VersionStorageInfo::ComputeCompactionScore(
    const ImmutableOptions& immutable_options,
    const MutableCFOptions& mutable_cf_options) {
  for (int level = 0; level <= MaxInputLevel(); level++) {
    double score;
    if (level == 0) {
      // We treat level-0 specially by bounding the number of files
      // instead of number of bytes for two reasons:
      //
      // (1) With larger write-buffer sizes, it is nice not to do too
      // many level-0 compactions.
      //
      // (2) The files in level-0 are merged on every read and
      // therefore we wish to avoid too many files when the individual
      // file size is small (perhaps because of a small write-buffer
      // setting, or very high compression ratios, or lots of
      // overwrites/deletions).
      int num_sorted_runs = 0;
      uint64_t total_size = 0;
      for (auto* f : files_[level]) {
        if (!f->being_compacted) {
          total_size += f->compensated_file_size;
          num_sorted_runs++;
        }
      }
      if (compaction_style_ == kCompactionStyleUniversal) {
        // For universal compaction, we use level0 score to indicate
        // compaction score for the whole DB. Adding other levels as if
        // they are L0 files.
        for (int i = 1; i < num_levels(); i++) {
          // Its possible that a subset of the files in a level may be in a
          // compaction, due to delete triggered compaction or trivial move.
          // In that case, the below check may not catch a level being
          // compacted as it only checks the first file. The worst that can
          // happen is a scheduled compaction thread will find nothing to do.
          if (!files_[i].empty() && !files_[i][0]->being_compacted) {
            num_sorted_runs++;
          }
        }
      }

      if (compaction_style_ == kCompactionStyleFIFO) {
        score = static_cast<double>(total_size) /
                mutable_cf_options.compaction_options_fifo.max_table_files_size;
        if (mutable_cf_options.compaction_options_fifo.allow_compaction ||
            mutable_cf_options.compaction_options_fifo.age_for_warm > 0) {
          // Warm tier move can happen at any time. It's too expensive to
          // check very file's timestamp now. For now, just trigger it
          // slightly more frequently than FIFO compaction so that this
          // happens first.
          score = std::max(
              static_cast<double>(num_sorted_runs) /
                  mutable_cf_options.level0_file_num_compaction_trigger,
              score);
        }
        if (mutable_cf_options.ttl > 0) {
          score = std::max(
              static_cast<double>(GetExpiredTtlFilesCount(
                  immutable_options, mutable_cf_options, files_[level])),
              score);
        }
      } else {
        score = static_cast<double>(num_sorted_runs) /
                mutable_cf_options.level0_file_num_compaction_trigger;
        if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
          // Level-based involves L0->L0 compactions that can lead to oversized
          // L0 files. Take into account size as well to avoid later giant
          // compactions to the base level.
          uint64_t l0_target_size = mutable_cf_options.max_bytes_for_level_base;
          if (immutable_options.level_compaction_dynamic_level_bytes &&
              level_multiplier_ != 0.0) {
            // Prevent L0 to Lbase fanout from growing larger than
            // `level_multiplier_`. This prevents us from getting stuck picking
            // L0 forever even when it is hurting write-amp. That could happen
            // in dynamic level compaction's write-burst mode where the base
            // level's target size can grow to be enormous.
            l0_target_size =
                std::max(l0_target_size,
                         static_cast<uint64_t>(level_max_bytes_[base_level_] /
                                               level_multiplier_));
          }
          score =
              std::max(score, static_cast<double>(total_size) / l0_target_size);
        }
      }
    } else {
      // Compute the ratio of current size to size limit.
      uint64_t level_bytes_no_compacting = 0;
      for (auto f : files_[level]) {
        if (!f->being_compacted) {
          level_bytes_no_compacting += f->compensated_file_size;
        }
      }
      score = static_cast<double>(level_bytes_no_compacting) /
              MaxBytesForLevel(level);
    }
    compaction_level_[level] = level;
    compaction_score_[level] = score;
  }

  // sort all the levels based on their score. Higher scores get listed
  // first. Use bubble sort because the number of entries are small.
  for (int i = 0; i < num_levels() - 2; i++) {
    for (int j = i + 1; j < num_levels() - 1; j++) {
      if (compaction_score_[i] < compaction_score_[j]) {
        double score = compaction_score_[i];
        int level = compaction_level_[i];
        compaction_score_[i] = compaction_score_[j];
        compaction_level_[i] = compaction_level_[j];
        compaction_score_[j] = score;
        compaction_level_[j] = level;
      }
    }
  }
  ComputeFilesMarkedForCompaction();
  ComputeBottommostFilesMarkedForCompaction();
  if (mutable_cf_options.ttl > 0) {
    ComputeExpiredTtlFiles(immutable_options, mutable_cf_options.ttl);
  }
  if (mutable_cf_options.periodic_compaction_seconds > 0) {
    ComputeFilesMarkedForPeriodicCompaction(
        immutable_options, mutable_cf_options.periodic_compaction_seconds);
  }
  EstimateCompactionBytesNeeded(mutable_cf_options);
}
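
Putting numbers on these formulas: a minimal standalone sketch (hypothetical file counts and sizes, not tied to any real database) of the two scores computed above, i.e. L0 scored by file count and L1+ scored by bytes versus target size.

#include <cstdint>
#include <cstdio>

int main() {
  // Hypothetical L0 state: 6 sorted runs, trigger is 4 files.
  int num_sorted_runs = 6;
  int level0_file_num_compaction_trigger = 4;
  double l0_score =
      static_cast<double>(num_sorted_runs) / level0_file_num_compaction_trigger;

  // Hypothetical L1 state: 320 MiB of non-compacting data, 256 MiB target.
  uint64_t level_bytes_no_compacting = 320ull << 20;
  uint64_t max_bytes_for_level = 256ull << 20;
  double l1_score =
      static_cast<double>(level_bytes_no_compacting) / max_bytes_for_level;

  // Both scores are >= 1, so both levels are compaction candidates;
  // after the sort by score, L0 (1.50) is picked before L1 (1.25).
  std::printf("L0 score = %.2f, L1 score = %.2f\n", l0_score, l1_score);
  return 0;
}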

Determining the size of each level

void VersionStorageInfo::CalculateBaseBytes(const ImmutableOptions& ioptions,
                                            const MutableCFOptions& options) {
  // Special logic to set number of sorted runs.
  // It is to match the previous behavior when all files are in L0.
  int num_l0_count = static_cast<int>(files_[0].size());
  if (compaction_style_ == kCompactionStyleUniversal) {
    // For universal compaction, we use level0 score to indicate
    // compaction score for the whole DB. Adding other levels as if
    // they are L0 files.
    for (int i = 1; i < num_levels(); i++) {
      if (!files_[i].empty()) {
        num_l0_count++;
      }
    }
  }
  set_l0_delay_trigger_count(num_l0_count);

  level_max_bytes_.resize(ioptions.num_levels);
  if (!ioptions.level_compaction_dynamic_level_bytes) {
    base_level_ = (ioptions.compaction_style == kCompactionStyleLevel) ? 1 : -1;

    // Calculate for static bytes base case
    for (int i = 0; i < ioptions.num_levels; ++i) {
      if (i == 0 && ioptions.compaction_style == kCompactionStyleUniversal) {
        level_max_bytes_[i] = options.max_bytes_for_level_base;
      } else if (i > 1) {
        level_max_bytes_[i] = MultiplyCheckOverflow(
            MultiplyCheckOverflow(level_max_bytes_[i - 1],
                                  options.max_bytes_for_level_multiplier),
            options.MaxBytesMultiplerAdditional(i - 1));
      } else {
        level_max_bytes_[i] = options.max_bytes_for_level_base;
      }
    }
  } else {
    uint64_t max_level_size = 0;

    int first_non_empty_level = -1;
    // Find size of non-L0 level of most data.
    // Cannot use the size of the last level because it can be empty or less
    // than previous levels after compaction.
    for (int i = 1; i < num_levels_; i++) {
      uint64_t total_size = 0;
      for (const auto& f : files_[i]) {
        total_size += f->fd.GetFileSize();
      }
      if (total_size > 0 && first_non_empty_level == -1) {
        first_non_empty_level = i;
      }
      if (total_size > max_level_size) {
        max_level_size = total_size;
      }
    }

    // Prefill every level's max bytes to disallow compaction from there.
    for (int i = 0; i < num_levels_; i++) {
      level_max_bytes_[i] = std::numeric_limits<uint64_t>::max();
    }

    if (max_level_size == 0) {
      // No data for L1 and up. L0 compacts to last level directly.
      // No compaction from L1+ needs to be scheduled.
      base_level_ = num_levels_ - 1;
    } else {
      uint64_t l0_size = 0;
      for (const auto& f : files_[0]) {
        l0_size += f->fd.GetFileSize();
      }

      uint64_t base_bytes_max =
          std::max(options.max_bytes_for_level_base, l0_size);
      uint64_t base_bytes_min = static_cast<uint64_t>(
          base_bytes_max / options.max_bytes_for_level_multiplier);

      // Try whether we can make last level's target size to be max_level_size
      uint64_t cur_level_size = max_level_size;
      // walk upwards from the second-to-last level to the first non-empty level
      for (int i = num_levels_ - 2; i >= first_non_empty_level; i--) {
        // Round up after dividing
        cur_level_size = static_cast<uint64_t>(
            cur_level_size / options.max_bytes_for_level_multiplier);
      }

      // Calculate base level and its size.
      uint64_t base_level_size;
      if (cur_level_size <= base_bytes_min) {
        // Case 1. If we make target size of last level to be max_level_size,
        // target size of the first non-empty level would be smaller than
        // base_bytes_min. We set it be base_bytes_min.
        base_level_size = base_bytes_min + 1U;
        base_level_ = first_non_empty_level;
        ROCKS_LOG_INFO(ioptions.logger,
                       "More existing levels in DB than needed. "
                       "max_bytes_for_level_multiplier may not be guaranteed.");
      } else {
        // Find base level (where L0 data is compacted to).
        base_level_ = first_non_empty_level;
        while (base_level_ > 1 && cur_level_size > base_bytes_max) {
          --base_level_;
          cur_level_size = static_cast<uint64_t>(
              cur_level_size / options.max_bytes_for_level_multiplier);
        }
        if (cur_level_size > base_bytes_max) {
          // Even L1 will be too large
          assert(base_level_ == 1);
          base_level_size = base_bytes_max;
        } else {
          base_level_size = cur_level_size;
        }
      }

      level_multiplier_ = options.max_bytes_for_level_multiplier;
      assert(base_level_size > 0);
      if (l0_size > base_level_size &&
          (l0_size > options.max_bytes_for_level_base ||
           static_cast<int>(files_[0].size() / 2) >=
               options.level0_file_num_compaction_trigger)) {
        // We adjust the base level according to actual L0 size, and adjust
        // the level multiplier accordingly, when:
        //   1. the L0 size is larger than level size base, or
        //   2. number of L0 files reaches twice the L0->L1 compaction trigger
        // We don't do this otherwise to keep the LSM-tree structure stable
        // unless the L0 compaction is backlogged.
        base_level_size = l0_size;
        if (base_level_ == num_levels_ - 1) {
          level_multiplier_ = 1.0;
        } else {
          level_multiplier_ = std::pow(
              static_cast<double>(max_level_size) /
                  static_cast<double>(base_level_size),
              1.0 / static_cast<double>(num_levels_ - base_level_ - 1));
        }
      }

      uint64_t level_size = base_level_size;
      for (int i = base_level_; i < num_levels_; i++) {
        if (i > base_level_) {
          level_size = MultiplyCheckOverflow(level_size, level_multiplier_);
        }
        // Don't set any level below base_bytes_max. Otherwise, the LSM can
        // assume an hourglass shape where L1+ sizes are smaller than L0. This
        // causes compaction scoring, which depends on level sizes, to favor
        // L1+ at the expense of L0, which may fill up and stall.
        level_max_bytes_[i] = std::max(level_size, base_bytes_max);
      }
    }
  }
}
  1. static: the target size of every level is fixed up front.

  2. dynamic: targets are computed dynamically from the current shape of the tree (see the worked sketch below).

  • The dynamic mode introduces the notion of a base level. Space efficiency is usually measured by space amplification; ignoring compression, space amplification = size_on_file_system / size_of_user_data.
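
As a concrete illustration of the dynamic mode, the following minimal sketch (hypothetical sizes, not RocksDB code; the real CalculateBaseBytes also handles first_non_empty_level, the L0-backlog adjustment, and overflow checks) reproduces the core target-size calculation: the largest level's actual size anchors the ladder, and each higher level's target is derived by dividing by the multiplier until it drops under max_bytes_for_level_base.

#include <algorithm>
#include <cstdint>
#include <cstdio>

int main() {
  // Hypothetical: 7 levels, multiplier 10, max_bytes_for_level_base = 256 MiB,
  // and the largest non-L0 level (L6) currently holds 100 GiB.
  const int num_levels = 7;
  const double multiplier = 10.0;
  const uint64_t base_bytes_max = 256ull << 20;
  uint64_t cur_level_size = 100ull << 30;  // size of the largest level

  // Walk upwards: each higher level's target is 1/multiplier of the next.
  int base_level = num_levels - 1;
  while (base_level > 1 && cur_level_size > base_bytes_max) {
    --base_level;
    cur_level_size = static_cast<uint64_t>(cur_level_size / multiplier);
  }
  // Here: L5 -> 10 GiB, L4 -> 1 GiB, L3 -> ~102 MiB (< 256 MiB), so L3 becomes
  // the base level that L0 compacts into.
  std::printf("base_level = L%d, base target = %llu MiB\n", base_level,
              static_cast<unsigned long long>(cur_level_size >> 20));

  // Fill per-level targets downwards from the base level, never below
  // base_bytes_max (avoiding the "hourglass" shape mentioned above).
  uint64_t level_size = cur_level_size;
  for (int i = base_level; i < num_levels; i++) {
    if (i > base_level) {
      level_size = static_cast<uint64_t>(level_size * multiplier);
    }
    std::printf("target(L%d) = %llu MiB\n", i,
                static_cast<unsigned long long>(
                    std::max(level_size, base_bytes_max) >> 20));
  }
  return 0;
}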

Picking the files that participate in a compaction

Compaction* LevelCompactionBuilder::PickCompaction() {
  // Pick up the first file to start compaction. It may have been extended
  // to a clean cut.
  SetupInitialFiles();
  if (start_level_inputs_.empty()) {
    return nullptr;
  }
  assert(start_level_ >= 0 && output_level_ >= 0);

  // If it is a L0 -> base level compaction, we need to set up other L0
  // files if needed.
  if (!SetupOtherL0FilesIfNeeded()) {
    return nullptr;
  }

  // Pick files in the output level and expand more files in the start level
  // if needed.
  if (!SetupOtherInputsIfNeeded()) {
    return nullptr;
  }

  // Form a compaction object containing the files we picked.
  Compaction* c = GetCompaction();

  TEST_SYNC_POINT_CALLBACK("LevelCompactionPicker::PickCompaction:Return", c);

  return c;
}

PickCompaction calls three main functions:

  • SetupInitialFiles initializes the set of files to compact
  • SetupOtherL0FilesIfNeeded pulls in additional L0 files if needed
  • SetupOtherInputsIfNeeded pulls in additional input files if needed

Let's start with SetupInitialFiles.

void LevelCompactionBuilder::SetupInitialFiles() {
  // Find the compactions by size on all levels.
  bool skipped_l0_to_base = false;
  for (int i = 0; i < compaction_picker_->NumberLevels() - 1; i++) {
    start_level_score_ = vstorage_->CompactionScore(i);
    start_level_ = vstorage_->CompactionScoreLevel(i);
    assert(i == 0 || start_level_score_ <= vstorage_->CompactionScore(i - 1));
    if (start_level_score_ >= 1) {
      if (skipped_l0_to_base && start_level_ == vstorage_->base_level()) {
        // If L0->base_level compaction is pending, don't schedule further
        // compaction from base level. Otherwise L0->base_level compaction
        // may starve.
        continue;
      }
      output_level_ =
          (start_level_ == 0) ? vstorage_->base_level() : start_level_ + 1;
      if (PickFileToCompact()) {
        // found the compaction!
        if (start_level_ == 0) {
          // L0 score = `num L0 files` / `level0_file_num_compaction_trigger`
          compaction_reason_ = CompactionReason::kLevelL0FilesNum;
        } else {
          // L1+ score = `Level files size` / `MaxBytesForLevel`
          compaction_reason_ = CompactionReason::kLevelMaxLevelSize;
        }
        break;
      } else {
        // didn't find the compaction, clear the inputs
        start_level_inputs_.clear();
        if (start_level_ == 0) {
          skipped_l0_to_base = true;
          // L0->base_level may be blocked due to ongoing L0->base_level
          // compactions. It may also be blocked by an ongoing compaction from
          // base_level downwards.
          //
          // In these cases, to reduce L0 file count and thus reduce likelihood
          // of write stalls, we can attempt compacting a span of files within
          // L0.
          if (PickIntraL0Compaction()) {
            output_level_ = 0;
            compaction_reason_ = CompactionReason::kLevelL0FilesNum;
            break;
          }
        }
      }
    } else {
      // Compaction scores are sorted in descending order, no further scores
      // will be >= 1.
      break;
    }
  }
  if (!start_level_inputs_.empty()) {
    return;
  }

  // if we didn't find a compaction, check if there are any files marked for
  // compaction
  parent_index_ = base_index_ = -1;

  compaction_picker_->PickFilesMarkedForCompaction(
      cf_name_, vstorage_, &start_level_, &output_level_,
      &start_level_inputs_);
  if (!start_level_inputs_.empty()) {
    compaction_reason_ = CompactionReason::kFilesMarkedForCompaction;
    return;
  }

  // Bottommost Files Compaction on deleting tombstones
  PickFileToCompact(vstorage_->BottommostFilesMarkedForCompaction(), false);
  if (!start_level_inputs_.empty()) {
    compaction_reason_ = CompactionReason::kBottommostFiles;
    return;
  }

  // TTL Compaction
  PickFileToCompact(vstorage_->ExpiredTtlFiles(), true);
  if (!start_level_inputs_.empty()) {
    compaction_reason_ = CompactionReason::kTtl;
    return;
  }

  // Periodic Compaction
  PickFileToCompact(vstorage_->FilesMarkedForPeriodicCompaction(), false);
  if (!start_level_inputs_.empty()) {
    compaction_reason_ = CompactionReason::kPeriodicCompaction;
    return;
  }
}
  • First, iterate over the levels in score order, reading each level's score from the previously computed compaction information; only levels with score >= 1 are considered for compaction.

  • PickFileToCompact then chooses the input and output files.

    bool LevelCompactionBuilder::PickFileToCompact() {
      // level 0 files are overlapping. So we cannot pick more
      // than one concurrent compactions at this level. This
      // could be made better by looking at key-ranges that are
      // being compacted at level 0.
      if (start_level_ == 0 &&
          !compaction_picker_->level0_compactions_in_progress()->empty()) {
        TEST_SYNC_POINT("LevelCompactionPicker::PickCompactionBySize:0");
        return false;
      }

      start_level_inputs_.clear();

      assert(start_level_ >= 0);

      // Pick the largest file in this level that is not already
      // being compacted
      const std::vector<int>& file_size =
          vstorage_->FilesByCompactionPri(start_level_);
      const std::vector<FileMetaData*>& level_files =
          vstorage_->LevelFiles(start_level_);

      unsigned int cmp_idx;
      for (cmp_idx = vstorage_->NextCompactionIndex(start_level_);
           cmp_idx < file_size.size(); cmp_idx++) {
        int index = file_size[cmp_idx];
        auto* f = level_files[index];

        // do not pick a file to compact if it is being compacted
        // from n-1 level.
        if (f->being_compacted) {
          continue;
        }

        start_level_inputs_.files.push_back(f);
        start_level_inputs_.level = start_level_;
        if (!compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
                                                        &start_level_inputs_) ||
            compaction_picker_->FilesRangeOverlapWithCompaction(
                {start_level_inputs_}, output_level_)) {
          // A locked (pending compaction) input-level file was pulled in due
          // to user-key overlap.
          start_level_inputs_.clear();
          continue;
        }

        // Now that input level is fully expanded, we check whether any output
        // files are locked due to pending compaction.
        //
        // Note we rely on ExpandInputsToCleanCut() to tell us whether any
        // output-level files are locked, not just the extra ones pulled in
        // for user-key overlap.
        InternalKey smallest, largest;
        compaction_picker_->GetRange(start_level_inputs_, &smallest, &largest);
        CompactionInputFiles output_level_inputs;
        output_level_inputs.level = output_level_;
        vstorage_->GetOverlappingInputs(output_level_, &smallest, &largest,
                                        &output_level_inputs.files);
        if (!output_level_inputs.empty() &&
            !compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
                                                        &output_level_inputs)) {
          start_level_inputs_.clear();
          continue;
        }
        base_index_ = index;
        break;
      }

      // store where to start the iteration in the next call to PickCompaction
      vstorage_->SetNextCompactionIndex(start_level_, cmp_idx);

      return start_level_inputs_.size() > 0;
    }
    • First take the largest file in the current level (start_level_) that is not already being compacted.

    • cmp_idx indexes into that file via the compaction-priority order.

    • ExpandInputsToCleanCut then widens the chosen file set until its key range forms a "clean cut" (see the sketch after this list).

    • FilesRangeOverlapWithCompaction checks whether any running compaction into output_level_ overlaps the chosen key range; if so, the pick is skipped (start_level_inputs is cleared and the loop continues).

    • Finally, the output_level files overlapping the chosen start-level files are collected, and ExpandInputsToCleanCut decides whether any of those output-level files are locked; if so, the pick is skipped as well (clear start_level_inputs and continue).
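
The "clean cut" requirement means the chosen file set must not split the entries of one user key between included and excluded files. A minimal standalone sketch of the idea (simplified to plain string ranges; the real ExpandInputsToCleanCut works on FileMetaData and internal keys):

#include <cstdio>
#include <string>
#include <vector>

struct File {
  std::string smallest;  // smallest user key in the file
  std::string largest;   // largest user key in the file
};

// Expand [begin, end) within a sorted, non-overlapping level so that no
// boundary user key is shared with an excluded neighbor file.
void ExpandToCleanCut(const std::vector<File>& level, size_t* begin,
                      size_t* end) {
  bool changed = true;
  while (changed) {
    changed = false;
    // The neighbor on the left ends with the user key we start with.
    while (*begin > 0 && level[*begin - 1].largest == level[*begin].smallest) {
      --*begin;
      changed = true;
    }
    // The neighbor on the right starts with the user key we end with.
    while (*end < level.size() &&
           level[*end].smallest == level[*end - 1].largest) {
      ++*end;
      changed = true;
    }
  }
}

int main() {
  // User key "b" spans files 0 and 1; picking only file 1 would split it.
  std::vector<File> level = {{"a", "b"}, {"b", "c"}, {"d", "e"}};
  size_t begin = 1, end = 2;
  ExpandToCleanCut(level, &begin, &end);
  std::printf("picked files [%zu, %zu)\n", begin, end);  // prints [0, 2)
  return 0;
}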

Continuing with PickCompaction: level-0 is special in RocksDB because it is the only level whose SST files may overlap each other, so it needs dedicated handling, done by SetupOtherL0FilesIfNeeded.

bool LevelCompactionBuilder::SetupOtherL0FilesIfNeeded() {
  if (start_level_ == 0 && output_level_ != 0) {
    return compaction_picker_->GetOverlappingL0Files(
        vstorage_, &start_level_inputs_, output_level_, &parent_index_);
  }
  return true;
}

If start_level_ == 0 and output_level_ != 0, GetOverlappingL0Files is called.

bool CompactionPicker::GetOverlappingL0Files(
    VersionStorageInfo* vstorage, CompactionInputFiles* start_level_inputs,
    int output_level, int* parent_index) {
  // Two level 0 compaction won't run at the same time, so don't need to worry
  // about files on level 0 being compacted.
  assert(level0_compactions_in_progress()->empty());
  InternalKey smallest, largest;
  GetRange(*start_level_inputs, &smallest, &largest);
  // Note that the next call will discard the file we placed in
  // c->inputs_[0] earlier and replace it with an overlapping set
  // which will include the picked file.
  start_level_inputs->files.clear();
  vstorage->GetOverlappingInputs(0, &smallest, &largest,
                                 &(start_level_inputs->files));

  // If we include more L0 files in the same compaction run it can
  // cause the 'smallest' and 'largest' key to get extended to a
  // larger range. So, re-invoke GetRange to get the new key range
  GetRange(*start_level_inputs, &smallest, &largest);
  if (IsRangeInCompaction(vstorage, &smallest, &largest, output_level,
                          parent_index)) {
    return false;
  }
  assert(!start_level_inputs->files.empty());

  return true;
}
  • Collect every level-0 file whose keys overlap the chosen range and add them to start_level_inputs.

Finally, SetupOtherInputsIfNeeded() is called.

bool LevelCompactionBuilder::SetupOtherInputsIfNeeded() {
  // Setup input files from output level. For output to L0, we only compact
  // spans of files that do not interact with any pending compactions, so don't
  // need to consider other levels.
  if (output_level_ != 0) {
    output_level_inputs_.level = output_level_;
    if (!compaction_picker_->SetupOtherInputs(
            cf_name_, mutable_cf_options_, vstorage_, &start_level_inputs_,
            &output_level_inputs_, &parent_index_, base_index_)) {
      return false;
    }

    compaction_inputs_.push_back(start_level_inputs_);
    if (!output_level_inputs_.empty()) {
      compaction_inputs_.push_back(output_level_inputs_);
    }

    // In some edge cases we could pick a compaction that will be compacting
    // a key range that overlap with another running compaction, and both
    // of them have the same output level. This could happen if
    // (1) we are running a non-exclusive manual compaction
    // (2) AddFile ingest a new file into the LSM tree
    // We need to disallow this from happening.
    if (compaction_picker_->FilesRangeOverlapWithCompaction(compaction_inputs_,
                                                            output_level_)) {
      // This compaction output could potentially conflict with the output
      // of a currently running compaction, we cannot run it.
      return false;
    }
    compaction_picker_->GetGrandparents(vstorage_, start_level_inputs_,
                                        output_level_inputs_, &grandparents_);
  } else {
    compaction_inputs_.push_back(start_level_inputs_);
  }
  return true;
}
  • SetupOtherInputs is called to pick the output-level files matching start_level_inputs (and possibly grow the input level).

    // Populates the set of inputs of all other levels that overlap with the
    // start level.
    // Now we assume all levels except start level and output level are empty.
    // Will also attempt to expand "start level" if that doesn't expand
    // "output level" or cause "level" to include a file for compaction that
    // has an overlapping user-key with another file.
    // REQUIRES: input_level and output_level are different
    // REQUIRES: inputs->empty() == false
    // Returns false if files on parent level are currently in compaction,
    // which means that we can't compact them
    bool CompactionPicker::SetupOtherInputs(
        const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
        VersionStorageInfo* vstorage, CompactionInputFiles* inputs,
        CompactionInputFiles* output_level_inputs, int* parent_index,
        int base_index) {
      assert(!inputs->empty());
      assert(output_level_inputs->empty());
      const int input_level = inputs->level;
      const int output_level = output_level_inputs->level;
      if (input_level == output_level) {
        // no possibility of conflict
        return true;
      }

      // For now, we only support merging two levels, start level and output
      // level. We need to assert other levels are empty.
      for (int l = input_level + 1; l < output_level; l++) {
        assert(vstorage->NumLevelFiles(l) == 0);
      }

      InternalKey smallest, largest;

      // Get the range one last time.
      GetRange(*inputs, &smallest, &largest);

      // Populate the set of next-level files (inputs_GetOutputLevelInputs())
      // to include in compaction
      vstorage->GetOverlappingInputs(output_level, &smallest, &largest,
                                     &output_level_inputs->files,
                                     *parent_index, parent_index);
      if (AreFilesInCompaction(output_level_inputs->files)) {
        return false;
      }
      if (!output_level_inputs->empty()) {
        if (!ExpandInputsToCleanCut(cf_name, vstorage, output_level_inputs)) {
          return false;
        }
      }

      // See if we can further grow the number of inputs in "level" without
      // changing the number of "level+1" files we pick up. We also choose NOT
      // to expand if this would cause "level" to include some entries for some
      // user key, while excluding other entries for the same user key. This
      // can happen when one user key spans multiple files.
      if (!output_level_inputs->empty()) {
        const uint64_t limit = mutable_cf_options.max_compaction_bytes;
        const uint64_t output_level_inputs_size =
            TotalCompensatedFileSize(output_level_inputs->files);
        const uint64_t inputs_size = TotalCompensatedFileSize(inputs->files);
        bool expand_inputs = false;

        CompactionInputFiles expanded_inputs;
        expanded_inputs.level = input_level;
        // Get closed interval of output level
        InternalKey all_start, all_limit;
        GetRange(*inputs, *output_level_inputs, &all_start, &all_limit);
        bool try_overlapping_inputs = true;
        vstorage->GetOverlappingInputs(input_level, &all_start, &all_limit,
                                       &expanded_inputs.files, base_index,
                                       nullptr);
        uint64_t expanded_inputs_size =
            TotalCompensatedFileSize(expanded_inputs.files);
        if (!ExpandInputsToCleanCut(cf_name, vstorage, &expanded_inputs)) {
          try_overlapping_inputs = false;
        }
        if (try_overlapping_inputs &&
            expanded_inputs.size() > inputs->size() &&
            output_level_inputs_size + expanded_inputs_size < limit &&
            !AreFilesInCompaction(expanded_inputs.files)) {
          InternalKey new_start, new_limit;
          GetRange(expanded_inputs, &new_start, &new_limit);
          CompactionInputFiles expanded_output_level_inputs;
          expanded_output_level_inputs.level = output_level;
          vstorage->GetOverlappingInputs(output_level, &new_start, &new_limit,
                                         &expanded_output_level_inputs.files,
                                         *parent_index, parent_index);
          assert(!expanded_output_level_inputs.empty());
          if (!AreFilesInCompaction(expanded_output_level_inputs.files) &&
              ExpandInputsToCleanCut(cf_name, vstorage,
                                     &expanded_output_level_inputs) &&
              expanded_output_level_inputs.size() ==
                  output_level_inputs->size()) {
            expand_inputs = true;
          }
        }
        if (!expand_inputs) {
          vstorage->GetCleanInputsWithinInterval(input_level, &all_start,
                                                 &all_limit,
                                                 &expanded_inputs.files,
                                                 base_index, nullptr);
          expanded_inputs_size =
              TotalCompensatedFileSize(expanded_inputs.files);
          if (expanded_inputs.size() > inputs->size() &&
              output_level_inputs_size + expanded_inputs_size < limit &&
              !AreFilesInCompaction(expanded_inputs.files)) {
            expand_inputs = true;
          }
        }
        if (expand_inputs) {
          ROCKS_LOG_INFO(ioptions_.logger,
                         "[%s] Expanding@%d %" ROCKSDB_PRIszt
                         "+%" ROCKSDB_PRIszt "(%" PRIu64 "+%" PRIu64
                         " bytes) to %" ROCKSDB_PRIszt "+%" ROCKSDB_PRIszt
                         " (%" PRIu64 "+%" PRIu64 " bytes)\n",
                         cf_name.c_str(), input_level, inputs->size(),
                         output_level_inputs->size(), inputs_size,
                         output_level_inputs_size, expanded_inputs.size(),
                         output_level_inputs->size(), expanded_inputs_size,
                         output_level_inputs_size);
          inputs->files = expanded_inputs.files;
        }
      }
      return true;
    }
  • Add start_level_inputs and output_level_inputs to compaction_inputs.

  • Run a few checks to rule out conflicts with concurrently running compactions.

Back in PickCompaction, a Compaction object is finally constructed and returned.

// Form a compaction object containing the files we picked.
Compaction* c = GetCompaction();
TEST_SYNC_POINT_CALLBACK("LevelCompactionPicker::PickCompaction:Return", c);
return c;

Compaction job: dispatching compaction work for the picked files

TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
c->column_family_data());
int output_level __attribute__((__unused__));
output_level = c->output_level();
TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial",
&output_level);
std::vector<SequenceNumber> snapshot_seqs;
SequenceNumber earliest_write_conflict_snapshot;
SnapshotChecker* snapshot_checker;
GetSnapshotContext(job_context, &snapshot_seqs,
&earliest_write_conflict_snapshot, &snapshot_checker);
assert(is_snapshot_supported_ || snapshots_.empty());
CompactionJob compaction_job(
job_context->job_id, c.get(), immutable_db_options_,
mutable_db_options_, file_options_for_compaction_, versions_.get(),
&shutting_down_, preserve_deletes_seqnum_.load(), log_buffer,
directories_.GetDbDir(),
GetDataDir(c->column_family_data(), c->output_path_id()),
GetDataDir(c->column_family_data(), 0), stats_, &mutex_,
&error_handler_, snapshot_seqs, earliest_write_conflict_snapshot,
snapshot_checker, table_cache_, &event_logger_,
c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats, thread_pri, io_tracer_,
is_manual ? &manual_compaction_paused_ : nullptr,
is_manual ? manual_compaction->canceled : nullptr, db_id_,
db_session_id_, c->column_family_data()->GetFullHistoryTsLow(),
&blob_callback_);
compaction_job.Prepare();

NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
compaction_job_stats, job_context->job_id);
mutex_.Unlock();
TEST_SYNC_POINT_CALLBACK(
"DBImpl::BackgroundCompaction:NonTrivial:BeforeRun", nullptr);
// Should handle erorr?
compaction_job.Run().PermitUncheckedError();
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
mutex_.Lock();

status = compaction_job.Install(*c->mutable_cf_options());
io_s = compaction_job.io_status();
if (status.ok()) {
InstallSuperVersionAndScheduleWork(c->column_family_data(),
&job_context->superversion_contexts[0],
*c->mutable_cf_options());
}
*made_progress = true;
TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
c->column_family_data());
  • Prepare

    void CompactionJob::Prepare() {
      AutoThreadOperationStageUpdater stage_updater(
          ThreadStatus::STAGE_COMPACTION_PREPARE);

      // Generate file_levels_ for compaction before making Iterator
      auto* c = compact_->compaction;
      assert(c->column_family_data() != nullptr);
      assert(c->column_family_data()->current()->storage_info()->NumLevelFiles(
                 compact_->compaction->level()) > 0);

      write_hint_ =
          c->column_family_data()->CalculateSSTWriteHint(c->output_level());
      bottommost_level_ = c->bottommost_level();

      if (c->ShouldFormSubcompactions()) {
        {
          StopWatch sw(db_options_.clock, stats_, SUBCOMPACTION_SETUP_TIME);
          GenSubcompactionBoundaries();
        }
        assert(sizes_.size() == boundaries_.size() + 1);

        for (size_t i = 0; i <= boundaries_.size(); i++) {
          Slice* start = i == 0 ? nullptr : &boundaries_[i - 1];
          Slice* end = i == boundaries_.size() ? nullptr : &boundaries_[i];
          compact_->sub_compact_states.emplace_back(c, start, end, sizes_[i],
                                                    static_cast<uint32_t>(i));
        }
        RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
                          compact_->sub_compact_states.size());
      } else {
        constexpr Slice* start = nullptr;
        constexpr Slice* end = nullptr;
        constexpr uint64_t size = 0;

        compact_->sub_compact_states.emplace_back(c, start, end, size,
                                                  /*sub_job_id*/ 0);
      }
    }
    • GenSubcompactionBoundaries is called to construct the subcompactions.

      void CompactionJob::GenSubcompactionBoundaries() {
        auto* c = compact_->compaction;
        auto* cfd = c->column_family_data();
        const Comparator* cfd_comparator = cfd->user_comparator();
        std::vector<Slice> bounds;
        int start_lvl = c->start_level();
        int out_lvl = c->output_level();

        // Add the starting and/or ending key of certain input files as a
        // potential boundary
        for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) {
          int lvl = c->level(lvl_idx);
          if (lvl >= start_lvl && lvl <= out_lvl) {
            const LevelFilesBrief* flevel = c->input_levels(lvl_idx);
            size_t num_files = flevel->num_files;

            if (num_files == 0) {
              continue;
            }

            if (lvl == 0) {
              // For level 0 add the starting and ending key of each file since
              // the files may have greatly differing key ranges (not
              // range-partitioned)
              for (size_t i = 0; i < num_files; i++) {
                bounds.emplace_back(flevel->files[i].smallest_key);
                bounds.emplace_back(flevel->files[i].largest_key);
              }
            } else {
              // For all other levels add the smallest/largest key in the level
              // to encompass the range covered by that level
              bounds.emplace_back(flevel->files[0].smallest_key);
              bounds.emplace_back(flevel->files[num_files - 1].largest_key);
              if (lvl == out_lvl) {
                // For the last level include the starting keys of all files
                // since the last level is the largest and probably has the
                // widest key range. Since it's range partitioned, the ending
                // key of one file and the starting key of the next are very
                // close (or identical).
                for (size_t i = 1; i < num_files; i++) {
                  bounds.emplace_back(flevel->files[i].smallest_key);
                }
              }
            }
          }
        }

        std::sort(bounds.begin(), bounds.end(),
                  [cfd_comparator](const Slice& a, const Slice& b) -> bool {
                    return cfd_comparator->Compare(ExtractUserKey(a),
                                                   ExtractUserKey(b)) < 0;
                  });
        // Remove duplicated entries from bounds
        bounds.erase(
            std::unique(bounds.begin(), bounds.end(),
                        [cfd_comparator](const Slice& a, const Slice& b) {
                          return cfd_comparator->Compare(ExtractUserKey(a),
                                                         ExtractUserKey(b)) == 0;
                        }),
            bounds.end());

        // Combine consecutive pairs of boundaries into ranges with an
        // approximate size of data covered by keys in that range
        uint64_t sum = 0;
        std::vector<RangeWithSize> ranges;
        // Get input version from CompactionState since it's already referenced
        // earlier in SetInputVersioCompaction::SetInputVersion and will not
        // change when db_mutex_ is released below
        auto* v = compact_->compaction->input_version();
        for (auto it = bounds.begin();;) {
          const Slice a = *it;
          ++it;

          if (it == bounds.end()) {
            break;
          }

          const Slice b = *it;

          // ApproximateSize could potentially create table reader iterator to
          // seek to the index block and may incur I/O cost in the process.
          // Unlock db mutex to reduce contention
          db_mutex_->Unlock();
          uint64_t size = versions_->ApproximateSize(
              SizeApproximationOptions(), v, a, b, start_lvl, out_lvl + 1,
              TableReaderCaller::kCompaction);
          db_mutex_->Lock();
          ranges.emplace_back(a, b, size);
          sum += size;
        }

        // Group the ranges into subcompactions
        const double min_file_fill_percent = 4.0 / 5;
        int base_level = v->storage_info()->base_level();
        uint64_t max_output_files = static_cast<uint64_t>(std::ceil(
            sum / min_file_fill_percent /
            MaxFileSizeForLevel(
                *(c->mutable_cf_options()), out_lvl,
                c->immutable_options()->compaction_style, base_level,
                c->immutable_options()->level_compaction_dynamic_level_bytes)));
        uint64_t subcompactions =
            std::min({static_cast<uint64_t>(ranges.size()),
                      static_cast<uint64_t>(c->max_subcompactions()),
                      max_output_files});

        if (subcompactions > 1) {
          double mean = sum * 1.0 / subcompactions;
          // Greedily add ranges to the subcompaction until the sum of the
          // ranges' sizes becomes >= the expected mean size of a subcompaction
          sum = 0;
          for (size_t i = 0; i + 1 < ranges.size(); i++) {
            sum += ranges[i].size;
            if (subcompactions == 1) {
              // If there's only one left to schedule then it goes to the end
              // so no need to put an end boundary
              continue;
            }
            if (sum >= mean) {
              boundaries_.emplace_back(ExtractUserKey(ranges[i].range.limit));
              sizes_.emplace_back(sum);
              subcompactions--;
              sum = 0;
            }
          }
          sizes_.emplace_back(sum + ranges.back().size);
        } else {
          // Only one range so its size is the total sum of sizes computed above
          sizes_.emplace_back(sum);
        }
      }
      • Iterate over all levels involved in the compaction and collect each level's boundary keys (smallest and largest keys) into the bounds array.
      • Sort bounds and remove duplicates.
      • Compute the ideal number of subcompactions and of output files.
      • Finally fill in boundaries_: based on the approximate sizes of the ranges, split them into groups of roughly the mean size; the resulting split points are what ends up in boundaries_ (see the sketch below).
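
The greedy grouping at the end can be reproduced in isolation; a minimal standalone sketch (made-up range sizes, mirroring just the final loop of GenSubcompactionBoundaries) of splitting ranges into subcompactions whose sizes approximate the mean:

#include <cstdint>
#include <cstdio>
#include <vector>

int main() {
  // Hypothetical per-range sizes (bytes between consecutive boundaries).
  std::vector<uint64_t> range_sizes = {40, 10, 30, 50, 20, 60};
  uint64_t total = 0;
  for (uint64_t s : range_sizes) total += s;  // 210

  uint64_t subcompactions = 3;
  double mean = static_cast<double>(total) / subcompactions;  // 70

  // Greedily accumulate ranges until the running sum reaches the mean,
  // then emit a boundary, as the final loop above does.
  uint64_t sum = 0;
  for (size_t i = 0; i + 1 < range_sizes.size(); i++) {
    sum += range_sizes[i];
    if (subcompactions == 1) continue;  // the rest goes into the last group
    if (sum >= mean) {
      std::printf("boundary after range %zu (group size %llu)\n", i,
                  static_cast<unsigned long long>(sum));
      subcompactions--;
      sum = 0;
    }
  }
  // Groups come out as 80, 70, and 60 bytes here.
  std::printf("last group size %llu\n",
              static_cast<unsigned long long>(sum + range_sizes.back()));
  return 0;
}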
  • Run

    Status CompactionJob::Run() {
      AutoThreadOperationStageUpdater stage_updater(
          ThreadStatus::STAGE_COMPACTION_RUN);
      TEST_SYNC_POINT("CompactionJob::Run():Start");
      log_buffer_->FlushBufferToLog();
      LogCompaction();

      const size_t num_threads = compact_->sub_compact_states.size();
      assert(num_threads > 0);
      const uint64_t start_micros = db_options_.clock->NowMicros();

      // Launch a thread for each of subcompactions 1...num_threads-1
      std::vector<port::Thread> thread_pool;
      thread_pool.reserve(num_threads - 1);
      for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
        thread_pool.emplace_back(&CompactionJob::ProcessKeyValueCompaction,
                                 this, &compact_->sub_compact_states[i]);
      }

      // Always schedule the first subcompaction (whether or not there are also
      // others) in the current thread to be efficient with resources
      ProcessKeyValueCompaction(&compact_->sub_compact_states[0]);

      // Wait for all other threads (if there are any) to finish execution
      for (auto& thread : thread_pool) {
        thread.join();
      }

      compaction_stats_.micros = db_options_.clock->NowMicros() - start_micros;
      compaction_stats_.cpu_micros = 0;
      for (size_t i = 0; i < compact_->sub_compact_states.size(); i++) {
        compaction_stats_.cpu_micros +=
            compact_->sub_compact_states[i].compaction_job_stats.cpu_micros;
      }

      RecordTimeToHistogram(stats_, COMPACTION_TIME, compaction_stats_.micros);
      RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
                            compaction_stats_.cpu_micros);

      TEST_SYNC_POINT("CompactionJob::Run:BeforeVerify");

      // Check if any thread encountered an error during execution
      Status status;
      IOStatus io_s;
      bool wrote_new_blob_files = false;

      for (const auto& state : compact_->sub_compact_states) {
        if (!state.status.ok()) {
          status = state.status;
          io_s = state.io_status;
          break;
        }

        if (!state.blob_file_additions.empty()) {
          wrote_new_blob_files = true;
        }
      }

      if (io_status_.ok()) {
        io_status_ = io_s;
      }
      if (status.ok()) {
        constexpr IODebugContext* dbg = nullptr;

        if (output_directory_) {
          io_s = output_directory_->Fsync(IOOptions(), dbg);
        }

        if (io_s.ok() && wrote_new_blob_files && blob_output_directory_ &&
            blob_output_directory_ != output_directory_) {
          io_s = blob_output_directory_->Fsync(IOOptions(), dbg);
        }
      }
      if (io_status_.ok()) {
        io_status_ = io_s;
      }
      if (status.ok()) {
        status = io_s;
      }
      if (status.ok()) {
        thread_pool.clear();
        std::vector<const CompactionJob::SubcompactionState::Output*>
            files_output;
        for (const auto& state : compact_->sub_compact_states) {
          for (const auto& output : state.outputs) {
            files_output.emplace_back(&output);
          }
        }
        ColumnFamilyData* cfd = compact_->compaction->column_family_data();
        auto prefix_extractor =
            compact_->compaction->mutable_cf_options()->prefix_extractor.get();
        std::atomic<size_t> next_file_idx(0);
        auto verify_table = [&](Status& output_status) {
          while (true) {
            size_t file_idx = next_file_idx.fetch_add(1);
            if (file_idx >= files_output.size()) {
              break;
            }
            // Verify that the table is usable
            // We set for_compaction to false and don't
            // OptimizeForCompactionTableRead here because this is a special
            // case after we finish the table building
            // No matter whether use_direct_io_for_flush_and_compaction is
            // true, we will regard this verification as user reads since the
            // goal is to cache it here for further user reads
            ReadOptions read_options;
            InternalIterator* iter = cfd->table_cache()->NewIterator(
                read_options, file_options_, cfd->internal_comparator(),
                files_output[file_idx]->meta, /*range_del_agg=*/nullptr,
                prefix_extractor,
                /*table_reader_ptr=*/nullptr,
                cfd->internal_stats()->GetFileReadHist(
                    compact_->compaction->output_level()),
                TableReaderCaller::kCompactionRefill, /*arena=*/nullptr,
                /*skip_filters=*/false, compact_->compaction->output_level(),
                MaxFileSizeForL0MetaPin(
                    *compact_->compaction->mutable_cf_options()),
                /*smallest_compaction_key=*/nullptr,
                /*largest_compaction_key=*/nullptr,
                /*allow_unprepared_value=*/false);
            auto s = iter->status();

            if (s.ok() && paranoid_file_checks_) {
              OutputValidator validator(cfd->internal_comparator(),
                                        /*_enable_order_check=*/true,
                                        /*_enable_hash=*/true);
              for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
                s = validator.Add(iter->key(), iter->value());
                if (!s.ok()) {
                  break;
                }
              }
              if (s.ok()) {
                s = iter->status();
              }
              if (s.ok() &&
                  !validator.CompareValidator(
                      files_output[file_idx]->validator)) {
                s = Status::Corruption("Paranoid checksums do not match");
              }
            }

            delete iter;

            if (!s.ok()) {
              output_status = s;
              break;
            }
          }
        };
        for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
          thread_pool.emplace_back(
              verify_table, std::ref(compact_->sub_compact_states[i].status));
        }
        verify_table(compact_->sub_compact_states[0].status);
        for (auto& thread : thread_pool) {
          thread.join();
        }
        for (const auto& state : compact_->sub_compact_states) {
          if (!state.status.ok()) {
            status = state.status;
            break;
          }
        }
      }

      TablePropertiesCollection tp;
      for (const auto& state : compact_->sub_compact_states) {
        for (const auto& output : state.outputs) {
          auto fn =
              TableFileName(state.compaction->immutable_options()->cf_paths,
                            output.meta.fd.GetNumber(),
                            output.meta.fd.GetPathId());
          tp[fn] = output.table_properties;
        }
      }
      compact_->compaction->SetOutputTableProperties(std::move(tp));

      // Finish up all book-keeping to unify the subcompaction results
      AggregateStatistics();
      UpdateCompactionStats();

      RecordCompactionIOStats();
      LogFlush(db_options_.info_log);
      TEST_SYNC_POINT("CompactionJob::Run():End");

      compact_->status = status;
      return status;
    }
    • Iterate over all sub_compacts, launch a thread for each one's compaction work, then wait for all threads to finish before returning (see the sketch below).
    • ProcessKeyValueCompaction takes each sub_compact_state and performs the actual compaction of the key-value data.
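
The scheduling pattern in Run() — spawn a thread for subcompactions 1..n-1 and run subcompaction 0 on the calling thread so it isn't left idle — is a common trick. A minimal standalone sketch of the same pattern using std::thread (RocksDB's port::Thread is essentially a wrapper around it):

#include <cstdio>
#include <thread>
#include <vector>

void ProcessSubcompaction(int id) {
  std::printf("subcompaction %d running\n", id);
}

int main() {
  const int num_subcompactions = 4;

  // Launch a thread for each of subcompactions 1..n-1.
  std::vector<std::thread> thread_pool;
  thread_pool.reserve(num_subcompactions - 1);
  for (int i = 1; i < num_subcompactions; i++) {
    thread_pool.emplace_back(ProcessSubcompaction, i);
  }

  // Subcompaction 0 runs on the current thread to be efficient with resources.
  ProcessSubcompaction(0);

  // Wait for the others to finish.
  for (auto& t : thread_pool) {
    t.join();
  }
  return 0;
}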

Process keys

Building an iterator over all input keys

ProcessKeyValueCompaction starts by building an InternalIterator over the key-value data described by the sub_compact filled in during the previous steps. This layer is responsible for ordering keys across all inputs and for merging internal keys.

std::unique_ptr<InternalIterator> raw_input(
    versions_->MakeInputIterator(read_options, sub_compact->compaction,
                                 &range_del_agg, file_options_for_read_));
InternalIterator* input = raw_input.get();
  • The iterator is constructed by MakeInputIterator, which, as before, treats level-0 differently from the other levels.

    InternalIterator* VersionSet::MakeInputIterator(
        const ReadOptions& read_options, const Compaction* c,
        RangeDelAggregator* range_del_agg,
        const FileOptions& file_options_compactions) {
      auto cfd = c->column_family_data();
      // Level-0 files have to be merged together. For other levels,
      // we will make a concatenating iterator per level.
      // TODO(opt): use concatenating iterator for level-0 if there is no
      // overlap
      const size_t space = (c->level() == 0 ? c->input_levels(0)->num_files +
                                                  c->num_input_levels() - 1
                                            : c->num_input_levels());
      InternalIterator** list = new InternalIterator*[space];
      size_t num = 0;
      for (size_t which = 0; which < c->num_input_levels(); which++) {
        if (c->input_levels(which)->num_files != 0) {
          if (c->level(which) == 0) {
            const LevelFilesBrief* flevel = c->input_levels(which);
            for (size_t i = 0; i < flevel->num_files; i++) {
              list[num++] = cfd->table_cache()->NewIterator(
                  read_options, file_options_compactions,
                  cfd->internal_comparator(), *flevel->files[i].file_metadata,
                  range_del_agg,
                  c->mutable_cf_options()->prefix_extractor.get(),
                  /*table_reader_ptr=*/nullptr,
                  /*file_read_hist=*/nullptr, TableReaderCaller::kCompaction,
                  /*arena=*/nullptr,
                  /*skip_filters=*/false,
                  /*level=*/static_cast<int>(c->level(which)),
                  MaxFileSizeForL0MetaPin(*c->mutable_cf_options()),
                  /*smallest_compaction_key=*/nullptr,
                  /*largest_compaction_key=*/nullptr,
                  /*allow_unprepared_value=*/false);
            }
          } else {
            // Create concatenating iterator for the files from this level
            list[num++] = new LevelIterator(
                cfd->table_cache(), read_options, file_options_compactions,
                cfd->internal_comparator(), c->input_levels(which),
                c->mutable_cf_options()->prefix_extractor.get(),
                /*should_sample=*/false,
                /*no per level latency histogram=*/nullptr,
                TableReaderCaller::kCompaction, /*skip_filters=*/false,
                /*level=*/static_cast<int>(c->level(which)), range_del_agg,
                c->boundaries(which));
          }
        }
      }
      assert(num <= space);
      InternalIterator* result =
          NewMergingIterator(&c->column_family_data()->internal_comparator(),
                             list, static_cast<int>(num));
      delete[] list;
      return result;
    }
    • First obtain the cfd that the current sub_compact belongs to.

    • For level-0, build one table_cache iterator per SST file and append it to list.

    • For every other level, create a single concatenating iterator (LevelIterator) for the whole level, i.e. an iterator that can scan in order from the level's first key through the last key of its last SST file.

    • All per-level iterators end up in the array list; NewMergingIterator then maintains a sorting heap underneath to merge keys across all levels.

      InternalIterator* NewMergingIterator(const InternalKeyComparator* cmp,
                                           InternalIterator** list, int n,
                                           Arena* arena,
                                           bool prefix_seek_mode) {
        assert(n >= 0);
        if (n == 0) {
          return NewEmptyInternalIterator<Slice>(arena);
        } else if (n == 1) {
          return list[0];
        } else {
          if (arena == nullptr) {
            return new MergingIterator(cmp, list, n, false, prefix_seek_mode);
          } else {
            auto mem = arena->AllocateAligned(sizeof(MergingIterator));
            return new (mem)
                MergingIterator(cmp, list, n, true, prefix_seek_mode);
          }
        }
      }
      • If list is empty, return an empty iterator.

      • If there is exactly one child, that iterator is already sorted on its own, so no heap-based iterator is needed (each level-0 SST is internally sorted and got its own list entry earlier; any non-level-0 level is sorted as a whole).

      • Otherwise build the heap-based MergingIterator (a standalone k-way merge sketch follows this subsection).

        MergingIterator(const InternalKeyComparator* comparator,
                        InternalIterator** children, int n, bool is_arena_mode,
                        bool prefix_seek_mode)
            : is_arena_mode_(is_arena_mode),
              comparator_(comparator),
              current_(nullptr),
              direction_(kForward),
              minHeap_(comparator_),
              prefix_seek_mode_(prefix_seek_mode),
              pinned_iters_mgr_(nullptr) {
          children_.resize(n);
          for (int i = 0; i < n; i++) {
            children_[i].Set(children[i]);
          }
          for (auto& child : children_) {
            AddToMinHeapOrCheckStatus(&child);
          }
          current_ = CurrentForward();
        }

        Every element of the list passed in (children in the constructor) is stored in a vector, and AddToMinHeapOrCheckStatus pushes each valid child onto the underlying heap. The heap order is defined by the user-supplied option.comparator; the default BytewiseComparator yields lexicographical order.

        void MergingIterator::AddToMinHeapOrCheckStatus(
            IteratorWrapper* child) {
          if (child->Valid()) {
            assert(child->status().ok());
            minHeap_.push(child);
          } else {
            considerStatus(child->status());
          }
        }
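
What MergingIterator does underneath is a classic k-way merge of sorted child iterators via a min-heap. A minimal standalone sketch of the same idea over plain sorted vectors (not the real iterator interface):

#include <cstdio>
#include <queue>
#include <string>
#include <vector>

int main() {
  // Each child is already sorted, like one level (or one L0 file).
  std::vector<std::vector<std::string>> children = {
      {"apple", "melon"}, {"banana", "pear"}, {"cherry", "fig"}};

  using Cursor = std::pair<size_t, size_t>;  // (child index, position)
  auto greater = [&](const Cursor& a, const Cursor& b) {
    return children[a.first][a.second] > children[b.first][b.second];
  };
  std::priority_queue<Cursor, std::vector<Cursor>, decltype(greater)> min_heap(
      greater);

  // SeekToFirst: push every non-empty child onto the heap.
  for (size_t i = 0; i < children.size(); i++) {
    if (!children[i].empty()) min_heap.push({i, 0});
  }

  // Next: pop the smallest key, advance that child, re-push if still valid.
  while (!min_heap.empty()) {
    auto [child, pos] = min_heap.top();
    min_heap.pop();
    std::printf("%s\n", children[child][pos].c_str());
    if (pos + 1 < children[child].size()) min_heap.push({child, pos + 1});
  }
  return 0;  // prints: apple banana cherry fig melon pear
}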

Processing entries via SeekToFirst and Next

Back in ProcessKeyValueCompaction, the InternalIterator built above is wrapped in a CompactionIterator, which carries the full compaction state; plain initialization suffices. Once constructed, the iterator is positioned at the very beginning, and Next() fetches the next key-value. Besides re-adjusting the underlying lexicographic heap on every step, it must also handle the different key types: kTypeValue, kTypeDeletion, kTypeSingleDeletion, kTypeRangeDeletion, kTypeMerge, and so on (a much-simplified sketch follows the snippet below).

c_iter->SeekToFirst();
......
while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
  // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
  // returns true.
  const Slice& key = c_iter->key();
  const Slice& value = c_iter->value();
  ......
  c_iter->Next();
  ...
}
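
To make the type handling concrete, here is a much-simplified standalone sketch of what CompactionIterator effectively does (hypothetical reduced types; the real logic also honors snapshots, merge operands, and range tombstones): keep only the newest version of each user key, and drop a deletion marker once it reaches the bottommost level.

#include <cstdint>
#include <cstdio>
#include <string>
#include <vector>

enum class ValueType { kTypeValue, kTypeDeletion };

struct Entry {
  std::string user_key;
  uint64_t seq;  // higher seq = newer
  ValueType type;
  std::string value;
};

int main() {
  // Input already sorted by (user_key asc, seq desc), which is how the
  // MergingIterator presents entries during compaction.
  std::vector<Entry> input = {
      {"a", 9, ValueType::kTypeValue, "v2"},
      {"a", 3, ValueType::kTypeValue, "v1"},   // older version: dropped
      {"b", 7, ValueType::kTypeDeletion, ""},  // tombstone
      {"b", 2, ValueType::kTypeValue, "old"},  // shadowed by the tombstone
  };
  const bool bottommost_level = true;  // no older data below us

  std::string prev_key;
  for (const Entry& e : input) {
    if (e.user_key == prev_key) continue;  // older version of the same key
    prev_key = e.user_key;
    if (e.type == ValueType::kTypeDeletion && bottommost_level) {
      continue;  // tombstone no longer needed: nothing below left to shadow
    }
    std::printf("emit %s -> %s\n", e.user_key.c_str(), e.value.c_str());
  }
  return 0;  // emits only: a -> v2
}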

Write keys

This step also lives in ProcessKeyValueCompaction: the surviving key-values are written out to SST files.

  • Check the key's ValueType; data entries (data-block or index-block material) are fed into the builder state machine.

  • The filter_builder and index_builder are created first. The index builder can be built in a partitioned, two-level format: the lower level holds multiple restart points indexing the individual data blocks, while the upper level indexes those lower-level index blocks. The top level can thus be kept in memory for binary search, saving memory and speeding up lookups. The data_block builder is written after these.

  • If the key's ValueType is a range deletion, it goes into range_del_block_builder instead.

  • The data_block builder is flushed first, through the writer bound to the output file.

  • The filter block, index block, compression-dictionary block, range-deletion block, and properties block are then appended to the metaindex builder in their respective formats and written to storage through the same writer.

  • Finally the footer is assembled from the metaindex handle and the index handle and written to storage (a sketch using the public SstFileWriter follows below).
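
The same block layout is exercised by RocksDB's public SstFileWriter, which internally drives the same table-builder path; a minimal sketch of producing an SST file through it (the path and keys are only examples):

#include <cassert>
#include "rocksdb/options.h"
#include "rocksdb/sst_file_writer.h"

int main() {
  rocksdb::Options options;
  rocksdb::SstFileWriter writer(rocksdb::EnvOptions(), options);

  // Open() binds the internal table builder to the output file's writer,
  // analogous to what OpenCompactionOutputFile does for compaction outputs.
  rocksdb::Status s = writer.Open("/tmp/example.sst");
  assert(s.ok());

  // Keys must be added in increasing order; each Put flows through the
  // data-block / index / filter builders described above.
  s = writer.Put("key1", "value1");
  assert(s.ok());
  s = writer.Put("key2", "value2");
  assert(s.ok());

  // Finish() flushes the remaining blocks (index, filter, metaindex,
  // properties) and writes the footer.
  s = writer.Finish();
  assert(s.ok());
  return 0;
}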

Binding builders to the output file's writer

A default block-based table SST file consists of many different blocks. Apart from the data block, every block is first accumulated in a temporary builder structure and then written to disk through the writer bound to the output file, forming the on-disk SST layout.

The logic here binds the builders to the output file's writer and creates the table builder.

// Open output file if necessary
if (sub_compact->builder == nullptr) {
  status = OpenCompactionOutputFile(sub_compact);
  if (!status.ok()) {
    break;
  }
}

Adding block data through the table builder's state machine

builder->Add is then called to feed entries into the corresponding builder structures. The work is driven by a three-state state machine, created together with the table builder, that decides which block builders run.

status = sub_compact->AddToBuilder(key, value);
if (!status.ok()) {
  break;
}
Status AddToBuilder(const Slice& key, const Slice& value) {
  auto curr = current_output();
  assert(builder != nullptr);
  assert(curr != nullptr);
  Status s = curr->validator.Add(key, value);
  if (!s.ok()) {
    return s;
  }
  builder->Add(key, value);
  return Status::OK();
}
void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
  Rep* r = rep_;
  assert(rep_->state != Rep::State::kClosed);
  if (!ok()) return;
  ValueType value_type = ExtractValueType(key);
  if (IsValueType(value_type)) {
#ifndef NDEBUG
    if (r->props.num_entries > r->props.num_range_deletions) {
      assert(r->internal_comparator.Compare(key, Slice(r->last_key)) > 0);
    }
#endif  // !NDEBUG

    auto should_flush = r->flush_block_policy->Update(key, value);
    if (should_flush) {
      assert(!r->data_block.empty());
      r->first_key_in_next_block = &key;
      Flush();
      if (r->state == Rep::State::kBuffered) {
        bool exceeds_buffer_limit =
            (r->buffer_limit != 0 && r->data_begin_offset > r->buffer_limit);
        bool is_cache_full = false;

        // Increase cache reservation for the last buffered data block
        // only if the block is not going to be unbuffered immediately
        // and there exists a cache reservation manager
        if (!exceeds_buffer_limit && r->cache_rev_mng != nullptr) {
          Status s = r->cache_rev_mng->UpdateCacheReservation<
              CacheEntryRole::kCompressionDictionaryBuildingBuffer>(
              r->data_begin_offset);
          is_cache_full = s.IsIncomplete();
        }

        if (exceeds_buffer_limit || is_cache_full) {
          EnterUnbuffered();
        }
      }

      // Add item to index block.
      // We do not emit the index entry for a block until we have seen the
      // first key for the next data block. This allows us to use shorter
      // keys in the index block. For example, consider a block boundary
      // between the keys "the quick brown fox" and "the who". We can use
      // "the r" as the key for the index block entry since it is >= all
      // entries in the first block and < all entries in subsequent
      // blocks.
      if (ok() && r->state == Rep::State::kUnbuffered) {
        if (r->IsParallelCompressionEnabled()) {
          r->pc_rep->curr_block_keys->Clear();
        } else {
          r->index_builder->AddIndexEntry(&r->last_key, &key,
                                          r->pending_handle);
        }
      }
    }

    // Note: PartitionedFilterBlockBuilder requires key being added to filter
    // builder after being added to index builder.
    if (r->state == Rep::State::kUnbuffered) {
      if (r->IsParallelCompressionEnabled()) {
        r->pc_rep->curr_block_keys->PushBack(key);
      } else {
        if (r->filter_builder != nullptr) {
          size_t ts_sz =
              r->internal_comparator.user_comparator()->timestamp_size();
          r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
        }
      }
    }

    r->last_key.assign(key.data(), key.size());
    r->data_block.Add(key, value);
    if (r->state == Rep::State::kBuffered) {
      // Buffered keys will be replayed from data_block_buffers during
      // `Finish()` once compression dictionary has been finalized.
    } else {
      if (!r->IsParallelCompressionEnabled()) {
        r->index_builder->OnKeyAdded(key);
      }
    }
    // TODO offset passed in is not accurate for parallel compression case
    NotifyCollectTableCollectorsOnAdd(key, value, r->get_offset(),
                                      r->table_properties_collectors,
                                      r->ioptions.logger);

  } else if (value_type == kTypeRangeDeletion) {
    r->range_del_block.Add(key, value);
    // TODO offset passed in is not accurate for parallel compression case
    NotifyCollectTableCollectorsOnAdd(key, value, r->get_offset(),
                                      r->table_properties_collectors,
                                      r->ioptions.logger);
  } else {
    assert(false);
  }

  r->props.num_entries++;
  r->props.raw_key_size += key.size();
  r->props.raw_value_size += value.size();
  if (value_type == kTypeDeletion || value_type == kTypeSingleDeletion) {
    r->props.num_deletions++;
  } else if (value_type == kTypeRangeDeletion) {
    r->props.num_deletions++;
    r->props.num_range_deletions++;
  } else if (value_type == kTypeMerge) {
    r->props.num_merge_operands++;
  }
}
  • kBuffered is the state machine's initial state. While in it, a growing amount of uncompressed data-block content sits buffered in memory. The state is left through EnterUnbuffered, which builds the compression (dictionary) block and then, in turn, the corresponding index block and filter block, before transitioning to the next state, kUnbuffered.
  • In kUnbuffered, the compression block has already been constructed from the previously buffered data. From here the builder either completes the writes of all blocks via Finish, or discards the pending output via Abandon.
  • kClosed means the table builder has already been finished or abandoned; all that remains is destructing the table builder.
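
The transitions can be condensed into a small sketch (illustrative only; the real transitions live inside BlockBasedTableBuilder and its Rep):

// Minimal sketch of the builder's state transitions.
enum class State { kBuffered, kUnbuffered, kClosed };

// kBuffered   --EnterUnbuffered()-->  kUnbuffered  (compression dictionary
//                                                   finalized, buffered
//                                                   blocks replayed)
// kUnbuffered --Finish()/Abandon()--> kClosed      (all blocks written, or
//                                                   the output dropped)
struct TableBuilderSketch {
  State state = State::kBuffered;  // initial state
  void EnterUnbuffered() { state = State::kUnbuffered; }
  void Finish() { state = State::kClosed; }
  void Abandon() { state = State::kClosed; }
};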

In the first state, the logic below runs: once the data block satisfies the flush policy, its contents are flushed directly into the data-block storage held by the current builder.

auto should_flush = r->flush_block_policy->Update(key, value);
if (should_flush) {
  assert(!r->data_block.empty());
  r->first_key_in_next_block = &key;
  Flush();
  if (r->state == Rep::State::kBuffered) {
    bool exceeds_buffer_limit =
        (r->buffer_limit != 0 && r->data_begin_offset > r->buffer_limit);
    bool is_cache_full = false;

    // Increase cache reservation for the last buffered data block
    // only if the block is not going to be unbuffered immediately
    // and there exists a cache reservation manager
    if (!exceeds_buffer_limit && r->cache_rev_mng != nullptr) {
      Status s = r->cache_rev_mng->UpdateCacheReservation<
          CacheEntryRole::kCompressionDictionaryBuildingBuffer>(
          r->data_begin_offset);
      is_cache_full = s.IsIncomplete();
    }

    if (exceeds_buffer_limit || is_cache_full) {
      EnterUnbuffered();
    }
  }

  // Add item to index block.
  // We do not emit the index entry for a block until we have seen the
  // first key for the next data block. This allows us to use shorter
  // keys in the index block. For example, consider a block boundary
  // between the keys "the quick brown fox" and "the who". We can use
  // "the r" as the key for the index block entry since it is >= all
  // entries in the first block and < all entries in subsequent
  // blocks.
  if (ok() && r->state == Rep::State::kUnbuffered) {
    if (r->IsParallelCompressionEnabled()) {
      r->pc_rep->curr_block_keys->Clear();
    } else {
      r->index_builder->AddIndexEntry(&r->last_key, &key,
                                      r->pending_handle);
    }
  }
}

The main logic of EnterUnbuffered is to build the compression (dictionary) block; it is only constructed when the compression option is enabled.

At the same time, the index block and filter block that index the data blocks are built from the data that earlier Flush calls accumulated. Building them here makes sense because a flush marks the completion of one full data block, and only a complete data block warrants emitting one index-block entry.
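
The index entry emitted at each block boundary does not need to be a full key; a shortened separator is enough, which is exactly the "the quick brown fox" / "the who" -> "the r" example in the source comment quoted above. Below is a simplified analog of Comparator::FindShortestSeparator illustrating the idea (ShortSeparator is a hypothetical name, not RocksDB's API):

#include <cassert>
#include <string>

// Produce a short key s with last_key_of_block <= s < first_key_of_next_block,
// so index entries stay small. Simplified sketch of the separator idea.
std::string ShortSeparator(const std::string& last, const std::string& next) {
  size_t i = 0;
  while (i < last.size() && i < next.size() && last[i] == next[i]) ++i;
  // Try to shorten: bump the first diverging byte if that keeps sep < next.
  if (i < last.size() && i < next.size() &&
      static_cast<unsigned char>(last[i]) + 1 <
          static_cast<unsigned char>(next[i])) {
    std::string sep = last.substr(0, i + 1);
    sep[i] = last[i] + 1;
    return sep;  // last <= sep < next
  }
  return last;  // fall back to the full last key
}

int main() {
  // "the quick brown fox" vs "the who" -> "the r", as in the source comment.
  assert(ShortSeparator("the quick brown fox", "the who") == "the r");
}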

Here the data_block_buffers array holds the data blocks that have not yet been compressed.

for (size_t i = 0; ok() && i < r->data_block_buffers.size(); ++i) {
  if (iter == nullptr) {
    iter = get_iterator_for_block(i);
    assert(iter != nullptr);
  }

  if (i + 1 < r->data_block_buffers.size()) {
    next_block_iter = get_iterator_for_block(i + 1);
  }

  auto& data_block = r->data_block_buffers[i];

  if (r->IsParallelCompressionEnabled()) {
    Slice first_key_in_next_block;
    const Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
    if (i + 1 < r->data_block_buffers.size()) {
      assert(next_block_iter != nullptr);
      first_key_in_next_block = next_block_iter->key();
    } else {
      first_key_in_next_block_ptr = r->first_key_in_next_block;
    }

    std::vector<std::string> keys;
    for (; iter->Valid(); iter->Next()) {
      keys.emplace_back(iter->key().ToString());
    }

    ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
        r->compression_type, first_key_in_next_block_ptr, &data_block, &keys);

    assert(block_rep != nullptr);
    r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(),
                                             r->get_offset());
    r->pc_rep->EmitBlock(block_rep);
  } else {
    for (; iter->Valid(); iter->Next()) {
      Slice key = iter->key();
      if (r->filter_builder != nullptr) {
        size_t ts_sz =
            r->internal_comparator.user_comparator()->timestamp_size();
        r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
      }
      r->index_builder->OnKeyAdded(key);
    }
    WriteBlock(Slice(data_block), &r->pending_handle, BlockType::kData);
    if (ok() && i + 1 < r->data_block_buffers.size()) {
      assert(next_block_iter != nullptr);
      Slice first_key_in_next_block = next_block_iter->key();

      Slice* first_key_in_next_block_ptr = &first_key_in_next_block;

      iter->SeekToLast();
      std::string last_key = iter->key().ToString();
      r->index_builder->AddIndexEntry(&last_key, first_key_in_next_block_ptr,
                                      r->pending_handle);
    }
  }
}

The index builder itself is created when the table builder's Rep is constructed; with kTwoLevelIndexSearch configured, a partitioned (two-level) index builder is used.

if (table_options.index_type ==
    BlockBasedTableOptions::kTwoLevelIndexSearch) {
  p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
      &internal_comparator, use_delta_encoding_for_index_values,
      table_options);
  index_builder.reset(p_index_builder_);
} else {
  index_builder.reset(IndexBuilder::CreateIndexBuilder(
      table_options.index_type, &internal_comparator,
      &this->internal_prefix_transform, use_delta_encoding_for_index_values,
      table_options));
}
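
Conceptually, a lookup through such a partitioned index is two binary searches: one over the top-level separators to pick an index partition, and one inside the partition to pick a data block. A hedged sketch of that shape (all types here are illustrative, not RocksDB's, and the key is assumed to fall inside the table's range):

#include <algorithm>
#include <string>
#include <utility>
#include <vector>

// Each separator is the last key covered by the block (or partition) it
// points at, so lower_bound finds the first candidate that may hold `key`.
struct Partition { std::vector<std::string> separators; };
struct TwoLevelIndex {
  std::vector<std::string> top_separators;  // one per index partition
  std::vector<Partition> partitions;

  // Returns (partition index, data-block index) that may contain `key`.
  std::pair<size_t, size_t> Find(const std::string& key) const {
    auto p = std::lower_bound(top_separators.begin(), top_separators.end(),
                              key) - top_separators.begin();  // level 1
    const auto& seps = partitions[p].separators;
    auto b = std::lower_bound(seps.begin(), seps.end(), key) - seps.begin();
    return {static_cast<size_t>(p), static_cast<size_t>(b)};   // level 2
  }
};

int main() {
  TwoLevelIndex idx;
  idx.top_separators = {"g", "z"};               // partition 0 covers <= "g"
  idx.partitions = {{{"c", "g"}}, {{"m", "z"}}};
  auto [p, b] = idx.Find("e");                   // -> partition 0, block 1
  (void)p; (void)b;
}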

Back in the while loop of ProcessKeyValueCompaction, the keys from the iterator are consumed one after another, each being added to its data block while the index block, filter block and compression block are filled in along the way.

Persisting the data through the meta_index_builder and the Footer

Next, FinishCompactionOutputFile consolidates the builder data accumulated so far, handles the remaining range-deletion blocks, and updates the current compaction's boundaries.
It is invoked once the block data accumulated in the builder reaches the target SST file size, max_output_file_size (or when, as the code below shows, a partitioner or ShouldStopBefore decides the next key belongs to a new file).

// Close output file if it is big enough. Two possibilities determine it's
// time to close it: (1) the current key should be this file's last key, (2)
// the next key should not be in this file.
//
// TODO(aekmekji): determine if file should be closed earlier than this
// during subcompactions (i.e. if output size, estimated by input size, is
// going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB
// and 0.6MB instead of 1MB and 0.2MB)
bool output_file_ended = false;
if (sub_compact->compaction->output_level() != 0 &&
    sub_compact->current_output_file_size >=
        sub_compact->compaction->max_output_file_size()) {
  // (1) this key terminates the file. For historical reasons, the iterator
  // status before advancing will be given to FinishCompactionOutputFile().
  output_file_ended = true;
}
TEST_SYNC_POINT_CALLBACK(
    "CompactionJob::Run():PausingManualCompaction:2",
    reinterpret_cast<void*>(
        const_cast<std::atomic<int>*>(manual_compaction_paused_)));
if (partitioner.get()) {
  last_key_for_partitioner.assign(c_iter->user_key().data_,
                                  c_iter->user_key().size_);
}
c_iter->Next();
if (c_iter->status().IsManualCompactionPaused()) {
  break;
}
if (!output_file_ended && c_iter->Valid()) {
  if (((partitioner.get() &&
        partitioner->ShouldPartition(PartitionerRequest(
            last_key_for_partitioner, c_iter->user_key(),
            sub_compact->current_output_file_size)) == kRequired) ||
       (sub_compact->compaction->output_level() != 0 &&
        sub_compact->ShouldStopBefore(
            c_iter->key(), sub_compact->current_output_file_size))) &&
      sub_compact->builder != nullptr) {
    // (2) this key belongs to the next file. For historical reasons, the
    // iterator status after advancing will be given to
    // FinishCompactionOutputFile().
    output_file_ended = true;
  }
}
if (output_file_ended) {
  const Slice* next_key = nullptr;
  if (c_iter->Valid()) {
    next_key = &c_iter->key();
  }
  CompactionIterationStats range_del_out_stats;
  status = FinishCompactionOutputFile(input->status(), sub_compact,
                                      &range_del_agg, &range_del_out_stats,
                                      next_key);
  RecordDroppedKeys(range_del_out_stats,
                    &sub_compact->compaction_job_stats);
}

Inside FinishCompactionOutputFile, the final persisting write of all data is done by the call s = sub_compact->builder->Finish().

Status BlockBasedTableBuilder::Finish() {
  Rep* r = rep_;
  assert(r->state != Rep::State::kClosed);
  bool empty_data_block = r->data_block.empty();
  r->first_key_in_next_block = nullptr;
  Flush();
  if (r->state == Rep::State::kBuffered) {
    EnterUnbuffered();
  }
  if (r->IsParallelCompressionEnabled()) {
    StopParallelCompression();
#ifndef NDEBUG
    for (const auto& br : r->pc_rep->block_rep_buf) {
      assert(br.status.ok());
    }
#endif  // !NDEBUG
  } else {
    // To make sure properties block is able to keep the accurate size of index
    // block, we will finish writing all index entries first.
    if (ok() && !empty_data_block) {
      r->index_builder->AddIndexEntry(
          &r->last_key, nullptr /* no next data block */, r->pending_handle);
    }
  }

  // Write meta blocks, metaindex block and footer in the following order.
  //    1. [meta block: filter]
  //    2. [meta block: index]
  //    3. [meta block: compression dictionary]
  //    4. [meta block: range deletion tombstone]
  //    5. [meta block: properties]
  //    6. [metaindex block]
  //    7. Footer
  BlockHandle metaindex_block_handle, index_block_handle;
  MetaIndexBuilder meta_index_builder;
  WriteFilterBlock(&meta_index_builder);
  WriteIndexBlock(&meta_index_builder, &index_block_handle);
  WriteCompressionDictBlock(&meta_index_builder);
  WriteRangeDelBlock(&meta_index_builder);
  WritePropertiesBlock(&meta_index_builder);
  if (ok()) {
    // flush the meta index block
    WriteRawBlock(meta_index_builder.Finish(), kNoCompression,
                  &metaindex_block_handle, BlockType::kMetaIndex);
  }
  if (ok()) {
    WriteFooter(metaindex_block_handle, index_block_handle);
  }
  r->state = Rep::State::kClosed;
  r->SetStatus(r->CopyIOStatus());
  Status ret_status = r->CopyStatus();
  assert(!ret_status.ok() || io_status().ok());
  return ret_status;
}

Compaction parameter settings

Parameter                              Description                                              Default
write_buffer_size                      Size limit of a single memtable                          64MB
level0_file_num_compaction_trigger     Number of Level-0 files that triggers compaction         4
target_file_size_base                  Target size of a single file per level                   64MB
target_file_size_multiplier            Per-level multiplier for the single-file target          1
max_bytes_for_level_base               Total size limit of all files in a level (Level 1)      256MB
max_bytes_for_level_multiplier         Per-level multiplier for the total-size limit            10
level_compaction_dynamic_level_bytes   Derive level targets bottom-up instead of top-down       false
num_levels                             Number of levels in the LSM tree                         7
  • The parameters target_file_size_base and target_file_size_multiplier bound the size of each individual file per level after compaction. target_file_size_base is the size of each file in Level 1; for Level N, the per-file size is target_file_size_base * target_file_size_multiplier ^ (N - 1). target_file_size_base defaults to 64MB and target_file_size_multiplier to 1.

  • The parameters max_bytes_for_level_base and max_bytes_for_level_multiplier bound the total size of all files in each level. max_bytes_for_level_base is the limit for all files in Level 1; for Level N, the limit is max_bytes_for_level_base * max_bytes_for_level_multiplier ^ (N - 1). The defaults are 256MB and 10 (see the sketch after these bullets).

  • The parameter level_compaction_dynamic_level_bytes switches the strategy so that level targets are derived bottom-up: Target_Size(Ln-1) = Target_Size(Ln) / max_bytes_for_level_multiplier. For example, if max_bytes_for_level_base is 1GB and num_levels is set to 6, and the actual size of the bottommost level is 276GB, then the target sizes of L1-L6 become 0, 0, 0.276GB, 2.76GB, 27.6GB and 276GB.
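
To make the multiplier arithmetic above concrete, the following sketch prints the per-level targets under the classic (non-dynamic) scheme, using the defaults from the table:

#include <cstdint>
#include <cstdio>

int main() {
  // Defaults from the table above (classic, non-dynamic leveling).
  const uint64_t MB = 1ULL << 20;
  uint64_t max_bytes_for_level_base = 256 * MB;  // total size target of L1
  double max_bytes_for_level_multiplier = 10;
  uint64_t target_file_size_base = 64 * MB;      // per-file target at L1
  int target_file_size_multiplier = 1;
  int num_levels = 7;

  uint64_t level_bytes = max_bytes_for_level_base;
  uint64_t file_bytes = target_file_size_base;
  for (int level = 1; level < num_levels; ++level) {
    std::printf("L%d: total target %10.1f MB, per-file target %6.1f MB\n",
                level, level_bytes / double(MB), file_bytes / double(MB));
    level_bytes = uint64_t(level_bytes * max_bytes_for_level_multiplier);
    file_bytes *= target_file_size_multiplier;
  }
  // L1: 256MB, L2: 2.56GB, L3: 25.6GB, ...; the per-file target stays at
  // 64MB because target_file_size_multiplier defaults to 1.
}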

  • MutableDBOptions

struct MutableDBOptions {
  static const char* kName() { return "MutableDBOptions"; }
  MutableDBOptions();
  explicit MutableDBOptions(const MutableDBOptions& options) = default;
  explicit MutableDBOptions(const DBOptions& options);

  void Dump(Logger* log) const;

  int max_background_jobs;
  int base_background_compactions;
  int max_background_compactions;
  uint32_t max_subcompactions;
  bool avoid_flush_during_shutdown;
  size_t writable_file_max_buffer_size;
  uint64_t delayed_write_rate;
  uint64_t max_total_wal_size;
  uint64_t delete_obsolete_files_period_micros;
  unsigned int stats_dump_period_sec;
  unsigned int stats_persist_period_sec;
  size_t stats_history_buffer_size;
  int max_open_files;
  uint64_t bytes_per_sync;
  uint64_t wal_bytes_per_sync;
  bool strict_bytes_per_sync;
  size_t compaction_readahead_size;
  int max_background_flushes;
};
  • mutable_cf_options_
explicit MutableCFOptions(const ColumnFamilyOptions& options)
    : write_buffer_size(options.write_buffer_size),
      max_write_buffer_number(options.max_write_buffer_number),
      arena_block_size(options.arena_block_size),
      memtable_prefix_bloom_size_ratio(
          options.memtable_prefix_bloom_size_ratio),
      memtable_whole_key_filtering(options.memtable_whole_key_filtering),
      memtable_huge_page_size(options.memtable_huge_page_size),
      max_successive_merges(options.max_successive_merges),
      inplace_update_num_locks(options.inplace_update_num_locks),
      prefix_extractor(options.prefix_extractor),
      disable_auto_compactions(options.disable_auto_compactions),
      soft_pending_compaction_bytes_limit(
          options.soft_pending_compaction_bytes_limit),
      hard_pending_compaction_bytes_limit(
          options.hard_pending_compaction_bytes_limit),
      level0_file_num_compaction_trigger(
          options.level0_file_num_compaction_trigger),
      level0_slowdown_writes_trigger(options.level0_slowdown_writes_trigger),
      level0_stop_writes_trigger(options.level0_stop_writes_trigger),
      max_compaction_bytes(options.max_compaction_bytes),
      target_file_size_base(options.target_file_size_base),
      target_file_size_multiplier(options.target_file_size_multiplier),
      max_bytes_for_level_base(options.max_bytes_for_level_base),
      max_bytes_for_level_multiplier(options.max_bytes_for_level_multiplier),
      ttl(options.ttl),
      periodic_compaction_seconds(options.periodic_compaction_seconds),
      max_bytes_for_level_multiplier_additional(
          options.max_bytes_for_level_multiplier_additional),
      compaction_options_fifo(options.compaction_options_fifo),
      compaction_options_universal(options.compaction_options_universal),
      enable_blob_files(options.enable_blob_files),
      min_blob_size(options.min_blob_size),
      blob_file_size(options.blob_file_size),
      blob_compression_type(options.blob_compression_type),
      enable_blob_garbage_collection(options.enable_blob_garbage_collection),
      blob_garbage_collection_age_cutoff(
          options.blob_garbage_collection_age_cutoff),
      max_sequential_skip_in_iterations(
          options.max_sequential_skip_in_iterations),
      check_flush_compaction_key_order(
          options.check_flush_compaction_key_order),
      paranoid_file_checks(options.paranoid_file_checks),
      report_bg_io_stats(options.report_bg_io_stats),
      compression(options.compression),
      bottommost_compression(options.bottommost_compression),
      compression_opts(options.compression_opts),
      bottommost_compression_opts(options.bottommost_compression_opts),
      bottommost_temperature(options.bottommost_temperature),
      sample_for_compression(
          options.sample_for_compression) {  // TODO: is 0 fine here?
  RefreshDerivedOptions(options.num_levels, options.compaction_style);
}
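
Fields in MutableDBOptions and MutableCFOptions can be changed on a live DB through the string-keyed SetDBOptions() / SetOptions() APIs. A minimal sketch (error handling elided, path illustrative):

#include <cassert>
#include "rocksdb/db.h"

int main() {
  rocksdb::DB* db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/testdb", &db);
  assert(s.ok());

  // Mutable DB-wide option (a MutableDBOptions field).
  s = db->SetDBOptions({{"max_background_compactions", "4"}});
  assert(s.ok());

  // Mutable column-family option (a MutableCFOptions field), default CF here.
  s = db->SetOptions({{"disable_auto_compactions", "true"}});
  assert(s.ok());

  delete db;
}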

Some Concepts

  • Slice is a simple structure containing a pointer into some external storage and a size.
  • parents && grandparents: parent = level + 1, grandparent = level + 2
  • column family (cfd)
  • compaction filter
  • compression
  • sst file manager (sfm)
  • background (bg)
