LevelDB Internals (4): Get, Write and Recover

The previous posts covered how LevelDB organizes its data on disk and in memory. Now we can look at how its main interfaces are implemented.

1. Get
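First, a quick look at the DB metadata kept in memory. It is essentially a mapping from each level to the list of table files at that level. For each file it records:

- the file number (a numeric ID),
- the smallest and largest keys in the file,
- the file size, and a few other fields.

The Get path is straightforward: look for the key in order from the newest data to the oldest. The search order is Memtable -> Immutable Memtable (the memtable currently being dumped, if any) -> Level_0 tables -> Level_1 tables -> Level_2 tables -> ... -> Level_k tables. In the code below this corresponds to the mem->Get, imm->Get and current->Get calls: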

Status DBImpl::Get(const ReadOptions& options,
                   const Slice& key,
                   std::string* value) {
  Status s;
  MutexLock l(&mutex_);
  SequenceNumber snapshot;
  if (options.snapshot != NULL) {
    snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
  } else {
    snapshot = versions_->LastSequence();
  }

  MemTable* mem = mem_;
  MemTable* imm = imm_;
  Version* current = versions_->current();
  mem->Ref();
  if (imm != NULL) imm->Ref();
  current->Ref();

  bool have_stat_update = false;
  Version::GetStats stats;

  // Unlock while reading from files and memtables
  {
    mutex_.Unlock();
    // First look in the memtable, then in the immutable memtable (if any).
    LookupKey lkey(key, snapshot);
    if (mem->Get(lkey, value, &s)) {
      // Done
    } else if (imm != NULL && imm->Get(lkey, value, &s)) {
      // Done
    } else {
      s = current->Get(options, lkey, value, &stats);
      have_stat_update = true;
    }
    mutex_.Lock();
  }

  if (have_stat_update && current->UpdateStats(stats)) {
    MaybeScheduleCompaction();
  }
  mem->Unref();
  if (imm != NULL) imm->Unref();
  current->Unref();
  return s;
}

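If the key is not found in either memtable, current->Get walks the levels from level_0 to level_k. At each level it uses the key and the DB metadata to locate the sstable files that may contain the key. It opens each such file, loads its index and other metadata, and calls Table::InternalGet to do the lookup (Table::InternalGet is described in LevelDB Internals (2): sstable and recordio). Because the key ranges of level_0 files can overlap, several level_0 tables may need to be searched. At each of level_1 to level_k, at most one file has to be checked. As soon as the key is found at a lower-numbered (newer) level, the search returns immediately.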
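The following C++ sketch makes the level-by-level file selection concrete. It shows how candidate files could be chosen from the per-level file metadata.

It rests on these assumptions:

- FileMeta, LevelFiles and CandidateFiles are hypothetical names.
- Keys are plain std::string user keys compared bytewise.
- Level-0 files are stored oldest first.

Real LevelDB (Version::Get) probes each candidate in this order and stops at the first definitive answer. This sketch only lists the candidates.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical, simplified per-file metadata: file number plus the
// smallest and largest user key stored in that file.
struct FileMeta {
  uint64_t number;
  std::string smallest;
  std::string largest;
};

// files[level] lists the table files of that level. Assumption: level 0 is
// kept in creation order (oldest first) and its ranges may overlap; levels
// >= 1 are sorted by key range and never overlap.
using LevelFiles = std::vector<std::vector<FileMeta>>;

// Returns the numbers of the files that may contain `key`, in search order.
std::vector<uint64_t> CandidateFiles(const LevelFiles& files,
                                     const std::string& key) {
  std::vector<uint64_t> result;
  if (files.empty()) return result;
  // Level 0: every file whose range covers the key, newest file first.
  for (auto it = files[0].rbegin(); it != files[0].rend(); ++it) {
    if (it->smallest <= key && key <= it->largest) result.push_back(it->number);
  }
  // Levels >= 1: binary search for the first file whose largest key >= key;
  // it is the only file at that level that can contain the key.
  for (size_t level = 1; level < files.size(); ++level) {
    const std::vector<FileMeta>& lv = files[level];
    auto it = std::lower_bound(
        lv.begin(), lv.end(), key,
        [](const FileMeta& f, const std::string& k) { return f.largest < k; });
    if (it != lv.end() && it->smallest <= key) result.push_back(it->number);
  }
  return result;
}
```

For a key that falls in a gap between two files of some level, that level contributes no candidate. This is why a miss at levels >= 1 is cheap.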

Note also two details, both explained later: Get lets the caller specify a snapshot, and it calls MaybeScheduleCompaction after the lookup.
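The snapshot argument works because every entry carries a sequence number. The sketch below uses a toy memtable built on std::map. ToyMemTable, InternalKey and Lookup are hypothetical names, not LevelDB's classes.

- **Ordering:** entries are sorted by user key ascending and sequence number descending, the same order as LevelDB's internal keys. A lookup at snapshot S therefore lands on the newest entry with sequence <= S.
- **Deletion markers:** a deletion marker found this way is a definitive answer, meaning the key is deleted.
- **Absent keys:** a key that is simply absent means older data must still be searched.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>

enum class ValueType { kDeletion, kValue };

// Simplified internal key: (user_key, sequence). Ordered by user key
// ascending, then sequence descending, so newer entries of a key come first.
struct InternalKey {
  std::string user_key;
  uint64_t seq;
  bool operator<(const InternalKey& o) const {
    if (user_key != o.user_key) return user_key < o.user_key;
    return seq > o.seq;
  }
};

class ToyMemTable {
 public:
  enum class Lookup { kFound, kDeleted, kMissing };

  // Both Put and Delete are appends; Delete uses kDeletion with an empty value.
  void Add(uint64_t seq, ValueType type, const std::string& key,
           const std::string& value) {
    table_[InternalKey{key, seq}] = {type, value};
  }

  // Finds the newest entry for `key` whose sequence is <= snapshot.
  Lookup Get(const std::string& key, uint64_t snapshot,
             std::string* value) const {
    auto it = table_.lower_bound(InternalKey{key, snapshot});
    if (it == table_.end() || it->first.user_key != key) {
      return Lookup::kMissing;  // not here: older data must be searched
    }
    if (it->second.first == ValueType::kDeletion) {
      return Lookup::kDeleted;  // definitive: the key is deleted, stop
    }
    *value = it->second.second;
    return Lookup::kFound;
  }

 private:
  std::map<InternalKey, std::pair<ValueType, std::string>> table_;
};
```

For example, with writes at sequences 1 (k=v1), 2 (k=v2) and 3 (delete k), the results are:

- a read at snapshot 2 returns v2,
- a read at snapshot 3 reports the key as deleted,
- a read at snapshot 0 finds nothing.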

[Figure: the Get flow (leveldb_get)]

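Get can run concurrently with other operations, using two mechanisms:

- **Memtable:** Get synchronizes with Write through the Release_Store and Acquire_Load operations on atomic pointers.
- **Current Version and its table files:** reference counting prevents a finished compaction from deleting tables, and their in-memory data, that an ongoing lookup is still using.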
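The memtable synchronization relies on the publish pattern:

1. The single writer fully builds a node.
2. It links the node in with a release store.
3. Readers follow links with acquire loads, so any node they reach is already fully initialized.

The sketch below shows the pattern with std::atomic on a plain linked list. PublishList is a hypothetical name and not LevelDB's SkipList, which applies the same idea at every level of the skip list. The sketch assumes writers are already serialized externally, as Write's queue does.

```cpp
#include <atomic>
#include <cassert>
#include <string>
#include <utility>

struct Node {
  explicit Node(std::string k) : key(std::move(k)), next(nullptr) {}
  std::string key;
  std::atomic<Node*> next;
};

// Single-writer, multi-reader list; readers need no lock.
class PublishList {
 public:
  PublishList() : head_("") {}
  ~PublishList() {
    Node* n = head_.next.load(std::memory_order_relaxed);
    while (n != nullptr) {
      Node* nx = n->next.load(std::memory_order_relaxed);
      delete n;
      n = nx;
    }
  }

  // Writer only. The node is fully initialized before it is published.
  void PushFront(const std::string& key) {
    Node* n = new Node(key);
    // Relaxed is enough here: only the (single) writer modifies the list.
    n->next.store(head_.next.load(std::memory_order_relaxed),
                  std::memory_order_relaxed);
    // Release store: a reader that acquire-loads this pointer also sees
    // n->key and n->next as written above.
    head_.next.store(n, std::memory_order_release);
  }

  // Safe to call from reader threads concurrently with PushFront.
  bool Contains(const std::string& key) const {
    for (Node* n = head_.next.load(std::memory_order_acquire); n != nullptr;
         n = n->next.load(std::memory_order_acquire)) {
      if (n->key == key) return true;
    }
    return false;
  }

 private:
  Node head_;  // sentinel; its key is never compared
};
```

A reader thread may call Contains while the writer thread calls PushFront, without taking a lock. The reader sees either the old list or the list including the new node, never a half-built node.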

2. Put and Delete
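LevelDB handles insertion and deletion the same way: both go through the Write interface. Delete(key) is turned into the insertion of a deletion marker (an entry of type kTypeDeletion) with an empty value. The core logic of Write is simple: append to the log first, then insert into the memtable. Persisting the memtable to disk is left to the compaction thread. In the code below this corresponds to the log_->AddRecord and WriteBatchInternal::InsertInto calls: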

Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
  Writer w(&mutex_);
  w.batch = my_batch;
  w.sync = options.sync;
  w.done = false;

  MutexLock l(&mutex_);
  writers_.push_back(&w);
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }
  if (w.done) {
    return w.status;
  }

  // May temporarily unlock and wait.
  Status status = MakeRoomForWrite(my_batch == NULL);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && my_batch != NULL) {  // NULL batch is for compactions
    WriteBatch* updates = BuildBatchGroup(&last_writer);
    WriteBatchInternal::SetSequence(updates, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(updates);

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();
      status = log_->AddRecord(WriteBatchInternal::Contents(updates));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(updates, mem_);
      }
      mutex_.Lock();
      if (sync_error) {
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }
    if (updates == tmp_batch_) tmp_batch_->Clear();

    versions_->SetLastSequence(last_sequence);
  }

  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }

  // Notify new head of write queue
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }

  return status;
}

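A few points are worth noting:

1) Log appends and memtable inserts must not run concurrently. The writers_ queue at the top of the function therefore serializes Write requests in a thread-safe way, and the writer at the front of the queue does the work.

2) For efficiency, the front writer merges as many queued requests as possible into one batch and writes them together; this is the BuildBatchGroup call. Once the batch is written, it notifies each writer whose request was included, one by one; this is the while (true) loop at the end. It then wakes the new head of the queue.

3) When the current memtable is full, LevelDB switches to a new memtable and freezes the old one, which waits to be dumped. If the new memtable also fills up before the old one has been dumped, writes cannot proceed and must block. This important logic lives in MakeRoomForWrite, called near the top of the function.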
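The queue-plus-batching structure of Write can be sketched as a small group-commit class. This is a hypothetical simplification:

- GroupCommitLog is not a LevelDB class, and the "log" is an in-memory vector.
- A single condition variable with notify_all replaces LevelDB's per-writer condition variable and Signal.
- The lock is held during the append, whereas DBImpl::Write releases it around the log I/O.
- kMaxGroup stands in for the size limit that BuildBatchGroup applies.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <vector>

class GroupCommitLog {
 public:
  void Write(const std::string& record) {
    Writer w(record);
    std::unique_lock<std::mutex> lock(mu_);
    writers_.push_back(&w);
    // Wait until a leader has written our record, or we reach the front.
    cv_.wait(lock, [&] { return w.done || &w == writers_.front(); });
    if (w.done) return;  // a previous leader included us in its group

    // We are the leader: take up to kMaxGroup queued records as one group.
    std::vector<std::string> batch;
    Writer* last = nullptr;
    for (Writer* x : writers_) {
      if (batch.size() == kMaxGroup) break;
      batch.push_back(x->record);
      last = x;
    }
    // One append for the whole group (still under the lock in this sketch).
    log_.insert(log_.end(), batch.begin(), batch.end());
    ++groups_;

    // Pop every writer in the group and mark it done.
    while (true) {
      Writer* ready = writers_.front();
      writers_.pop_front();
      ready->done = true;
      if (ready == last) break;
    }
    cv_.notify_all();  // wakes finished followers and the new queue head
  }

  std::vector<std::string> Records() {
    std::lock_guard<std::mutex> l(mu_);
    return log_;
  }
  std::size_t Groups() {
    std::lock_guard<std::mutex> l(mu_);
    return groups_;
  }

 private:
  struct Writer {
    explicit Writer(const std::string& r) : record(r) {}
    std::string record;
    bool done = false;
  };
  static constexpr std::size_t kMaxGroup = 8;
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<Writer*> writers_;
  std::vector<std::string> log_;
  std::size_t groups_ = 0;
};
```

With one calling thread every Write forms its own group. With many threads, calls that queue up behind a busy leader are committed together, so Groups() grows more slowly than the number of records.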

3. Recover
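Consider the following failure cases:
1) The process dies after a Write completes. Every piece of data successfully committed to the memtable also has a complete copy in the log, so it can be restored to memory by reading the log.
2) The log write succeeds, but the process dies while inserting into the memtable. During recovery the record is simply replayed into the memtable.
3) The process dies halfway through writing a log record. During recovery, the recordio format of the log lets the incomplete record be detected and skipped.
4) The process dies while an immutable memtable is being dumped. Recovery first rebuilds the immutable memtable from its log and dumps it, then restores the memtable.
5) The process dies halfway through a compaction. The manifest (the incremental log of metadata changes) has not been updated yet, so the next recovery returns to the version before the compaction, and the compaction is simply restarted.
6) The process dies while writing the manifest record after a compaction has finished. As in 3), the incomplete record is ignored, so the compaction is effectively wasted and has to be redone. The leftover garbage files are deleted later at a suitable time.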
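Cases 3) and 6) depend on each log record being self-checking. The sketch below uses a deliberately simplified record format, [length:4][checksum:4][payload], with FNV-1a as the checksum; both are assumptions for illustration. LevelDB's real log format uses a CRC32C checksum, a 2-byte length and a record type, packed into 32KB blocks.

Replay accepts records until one is truncated or fails its checksum. Because this format has no block structure, the sketch stops there. LevelDB's reader, by contrast, can drop the rest of the damaged block and resume at the next one.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// FNV-1a, standing in for LevelDB's CRC32C in this sketch.
uint32_t Fnv1a(const std::string& s) {
  uint32_t h = 2166136261u;
  for (unsigned char c : s) {
    h ^= c;
    h *= 16777619u;
  }
  return h;
}

void PutU32(std::string* out, uint32_t v) {  // little-endian
  for (int i = 0; i < 4; ++i) out->push_back(static_cast<char>((v >> (8 * i)) & 0xff));
}

uint32_t GetU32(const std::string& in, std::size_t pos) {
  uint32_t v = 0;
  for (int i = 0; i < 4; ++i) {
    v |= static_cast<uint32_t>(static_cast<unsigned char>(in[pos + i])) << (8 * i);
  }
  return v;
}

// Appends one record: [length][checksum][payload].
void AppendRecord(std::string* log, const std::string& payload) {
  PutU32(log, static_cast<uint32_t>(payload.size()));
  PutU32(log, Fnv1a(payload));
  log->append(payload);
}

// Returns all complete, checksum-valid records up to the first bad one.
std::vector<std::string> ReplayLog(const std::string& log) {
  std::vector<std::string> records;
  std::size_t pos = 0;
  while (log.size() - pos >= 8) {
    uint32_t len = GetU32(log, pos);
    uint32_t sum = GetU32(log, pos + 4);
    if (log.size() - pos - 8 < len) break;  // torn record: crash mid-write
    std::string payload = log.substr(pos + 8, len);
    if (Fnv1a(payload) != sum) break;       // corrupted record
    records.push_back(payload);
    pos += 8 + len;
  }
  return records;
}
```

A record cut off by a crash, as in case 3), is simply not returned. The records before it replay normally.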

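LevelDB itself is only a library. Given 1)-3), if you wrap it in a service, it is enough to report success to the client as soon as the log write has succeeded. This keeps the client's view of the data consistent with what recovery will restore. If that guarantee must also survive an operating-system or machine crash, the write needs WriteOptions::sync set to true, so that the log file is synced before Write returns.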

LevelDB calls Recover every time the DB is opened. The code is:

Status DBImpl::Recover(VersionEdit* edit, bool *save_manifest) {
  mutex_.AssertHeld();

  // Ignore error from CreateDir since the creation of the DB is
  // committed only when the descriptor is created, and this directory
  // may already exist from a previous failed creation attempt.
  env_->CreateDir(dbname_);
  assert(db_lock_ == NULL);
  Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
  if (!s.ok()) {
    return s;
  }

  if (!env_->FileExists(CurrentFileName(dbname_))) {
    if (options_.create_if_missing) {
      s = NewDB();
      if (!s.ok()) {
        return s;
      }
    } else {
      return Status::InvalidArgument(
          dbname_, "does not exist (create_if_missing is false)");
    }
  } else {
    if (options_.error_if_exists) {
      return Status::InvalidArgument(
          dbname_, "exists (error_if_exists is true)");
    }
  }

  s = versions_->Recover(save_manifest);
  if (!s.ok()) {
    return s;
  }
  SequenceNumber max_sequence(0);

  // Recover from all newer log files than the ones named in the
  // descriptor (new log files may have been added by the previous
  // incarnation without registering them in the descriptor).
  //
  // Note that PrevLogNumber() is no longer used, but we pay
  // attention to it in case we are recovering a database
  // produced by an older version of leveldb.
  const uint64_t min_log = versions_->LogNumber();
  const uint64_t prev_log = versions_->PrevLogNumber();
  std::vector<std::string> filenames;
  s = env_->GetChildren(dbname_, &filenames);
  if (!s.ok()) {
    return s;
  }
  std::set<uint64_t> expected;
  versions_->AddLiveFiles(&expected);
  uint64_t number;
  FileType type;
  std::vector<uint64_t> logs;
  for (size_t i = 0; i < filenames.size(); i++) {
    if (ParseFileName(filenames[i], &number, &type)) {
      expected.erase(number);
      if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
        logs.push_back(number);
    }
  }
  if (!expected.empty()) {
    char buf[50];
    snprintf(buf, sizeof(buf), "%d missing files; e.g.",
             static_cast<int>(expected.size()));
    return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
  }

  // Recover in the order in which the logs were generated
  std::sort(logs.begin(), logs.end());
  for (size_t i = 0; i < logs.size(); i++) {
    s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
                       &max_sequence);
    if (!s.ok()) {
      return s;
    }

    // The previous incarnation may not have written any MANIFEST
    // records after allocating this log number.  So we manually
    // update the file number allocation counter in VersionSet.
    versions_->MarkFileNumberUsed(logs[i]);
  }

  if (versions_->LastSequence() < max_sequence) {
    versions_->SetLastSequence(max_sequence);
  }

  return Status::OK();
}

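Recover proceeds in four steps:

1. LockFile acquires a lock file in the LevelDB directory. This prevents multiple processes from opening the DB at the same time and corrupting the data (inter-process mutual exclusion).
2. versions_->Recover rebuilds the DB metadata in memory from the manifest (the metadata log). The result is the latest version: which table files belong to each level, and so on.
3. The code checks that every file the metadata refers to exists in the directory. If any file is missing, it returns a Corruption error.
4. It calls RecoverLogFile on every log file that needs replaying, in the order the logs were created. Normally there are at most two, corresponding to the immutable memtable and the memtable of the previous run.

RecoverLogFile reads the log records one by one and inserts them into a memtable. Whenever the memtable fills up, it is dumped to a new table and replay continues with a fresh memtable. As a result, a large amount of log data (for example, when the memtable size is configured large) can make Open take a long time.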
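The replay loop can be modeled as follows. This is a hypothetical sketch, not RecoverLogFile itself:

- Log files are given as in-memory record lists.
- A std::map plays the memtable.
- Its size is estimated as key plus value bytes. LevelDB compares the memtable's ApproximateMemoryUsage() against write_buffer_size.
- Each "dumped" memtable is just appended to a list of level-0 tables.
- Whatever is left in the memtable at the end of each log is dumped too. This follows the version of RecoverLogFile shown above (the one taking a last_log flag) when log reuse (Options::reuse_logs) is off, which is the default.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

struct LogRecord {
  uint64_t seq;
  std::string key;
  std::string value;
};
struct LogFile {
  uint64_t number;
  std::vector<LogRecord> records;
};
struct RecoveryResult {
  std::vector<std::map<std::string, std::string>> level0_tables;
  uint64_t max_sequence = 0;  // used to restore VersionSet's last sequence
};

RecoveryResult ReplayLogs(std::vector<LogFile> logs, std::size_t write_buffer_size) {
  RecoveryResult r;
  // Replay in the order the logs were generated (ascending file number).
  std::sort(logs.begin(), logs.end(),
            [](const LogFile& a, const LogFile& b) { return a.number < b.number; });
  for (const LogFile& log : logs) {
    std::map<std::string, std::string> mem;  // fresh memtable per log
    std::size_t mem_bytes = 0;
    for (const LogRecord& rec : log.records) {
      mem[rec.key] = rec.value;
      mem_bytes += rec.key.size() + rec.value.size();
      r.max_sequence = std::max(r.max_sequence, rec.seq);
      if (mem_bytes > write_buffer_size) {  // full: dump and start a new one
        r.level0_tables.push_back(std::move(mem));
        mem.clear();
        mem_bytes = 0;
      }
    }
    if (!mem.empty()) r.level0_tables.push_back(std::move(mem));  // leftovers
  }
  return r;
}
```

The smaller write_buffer_size is relative to the amount of log data, the more level-0 tables a single Open produces. Conversely, a large memtable means more records to replay before Open returns.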
