|
= (env_->NowMicros() - imm_start); } Slice key = input->key(); //如果当前于grandparent层产生overlap的size超过阈值,立即结束当前写入的table的构造,写入磁盘 if (compact->compaction->ShouldStopBefore(key) && compact->builder != NULL) { status = FinishCompactionOutputFile(compact, input); if (!status.ok()) { break; } } // Handle key/value, add to state, etc. //key舍弃标志位 bool drop = false; //key解析错误,放弃 if (!ParseInternalKey(key, &ikey)) { // Do not hide error keys current_user_key.clear(); has_current_user_key = false; last_sequence_for_key = kMaxSequenceNumber; } else { //key与前面的key重复,丢弃 if (!has_current_user_key || user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) != 0) { // First occurrence of this user key current_user_key.assign(ikey.user_key.data(), ikey.user_key.size()); has_current_user_key = true; last_sequence_for_key = kMaxSequenceNumber; } //key是删除类型,丢弃 if (last_sequence_for_key <= compact->smallest_snapshot) { // Hidden by an newer entry for same user key drop = true; // (A) } else if (ikey.type == kTypeDeletion && ikey.sequence <= compact->smallest_snapshot && compact->compaction->IsBaseLevelForKey(ikey.user_key)) { // For this user key: // (1) there is no data in higher levels // (2) data in lower levels will have larger sequence numbers // (3) data in layers that are being compacted here and have // smaller sequence numbers will be dropped in the next // few iterations of this loop (by rule (A) above). // Therefore this deletion marker is obsolete and can be dropped. drop = true; } last_sequence_for_key = ikey.sequence; } #if 0 Log(options_.info_log, " Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, " "%d smallest_snapshot: %d", ikey.user_key.ToString().c_str(), (int)ikey.sequence, ikey.type, kTypeva lue, drop, compact->compaction->IsBaseLevelForKey(ikey.user_key), (int)last_sequence_for_key, (int)compact->smallest_snapshot); #endif if (!drop) { //如果output sstable未生成,构造新的tablebuilder // Open output file if necessary if (compact->builder == NULL) { status = OpenCompactionOutputFile(compact); if (!status.ok()) { break; } } //第一次写入的key作为output的smallest key if (compact->builder->NumEntries() == 0) { compact->current_output()->smallest.DecodeFrom(key); } //新的key写入时,更新largest key,并add进table compact->current_output()->largest.DecodeFrom(key); compact->builder->Add(key, input->value()); // Close output file if it is big enough //当前sstable太大了就结束table构造 if (compact->builder->FileSize() >= compact->compaction->MaxOutputFileSize()) { status = FinishCompactionOutputFile(compact, input); if (!status.ok()) { break; } } } //下一个key input->Next(); } if (status.ok() && shutting_down_.Acquire_Load()) { status = Status::IOError("Deleting DB during compaction"); } if (status.ok() && compact->builder != NULL) { status = FinishCompactionOutputFile(compact, input); } if (status.ok()) { status = input->status(); } delete input; input = NULL; //将此次compact的信息加入dbimpl::status_ CompactionStats stats; stats.micros = env_->NowMicros() - start_micros - imm_micros; for (int which = 0; which < 2; which++) { for (int i = 0; i < compact->compaction->num_input_files(which); i++) { stats.bytes_read += compact->compaction->input(which, i)->file_size; } } for (size_t i = 0; i < compact->outputs.size(); i++) { stats.bytes_written += compact->outputs[i].file_size; } mutex_.Lock(); stats_[compact->compaction->level() + 1].Add(stats); if (status.ok()) { status = InstallCompactionResults(compact); } if (!status.ok()) { RecordBackground |