LevelDB剖析(6):Iterator 和 Snapshot

一、Iterators Done Right
LevelDB中不少场景都存在遍历记录的需求,如:
1) Get:在Block内“遍历”查找某个key
2) Compaction:遍历所有输入table文件,并合并输出
3) DB提供的遍历所有数据的外部接口

某些流程可能比较复杂,比如考虑如何实现3),整体上要对内存、磁盘上的所有数据做遍历+归并去重,memtable和table的遍历机制显然不同,并且level0与其他level又不相同:level0内文件有交集,需要同时参与归并,而level_1及以上则按照key的顺序一个个文件遍历即可;每个文件内部则是根据index按顺序分别去遍历每个block等等. 可以看到这里面需要分别遍历不同的结构,其中某些结构的结果需要做合并处理,某些结构需要根据一定的规则拼接在一起. LevelDB通过使用Iterator抽象模式,组合表达出了各种语义,使得相关代码组织非常紧凑清晰,通用性也高.

我们先看所有Iterator的基类接口:

class Iterator {
 public:
  Iterator();
  virtual ~Iterator();

  virtual bool Valid() const = 0;
  virtual void SeekToFirst() = 0;
  virtual void SeekToLast() = 0;
  virtual void Seek(const Slice& target) = 0;
  virtual void Next() = 0;
  virtual void Prev() = 0;
  virtual Slice key() const = 0;
  virtual Slice value() const = 0;
  virtual Status status() const = 0;
  ...
}   

其中key()和value()分别返回当前位置指向的记录的键值部分. 可以想象,所有结构都可以通过Iterator来提供在其上的遍历和查找等操作,比如Memtable::Iterator,Table::Iterator,Block::Iterator等,尽管底层实现各不相同(如Memtable在其SkipList上遍历,Block在其内存块里遍历),但都有相同的抽象接口,其好处就是,可以方便地通过各iterator间的组合来实现功能更强大的个性化的iterator,如LevelDB::Iterator.

首先LevelDB注意到了上述两类对遍历输出流整合的需求:结果合并(Merge)以及结果拼接(Concatenate). 本质上拼接的场景也可以用合并来代替,如在level_1层遍历可以同时打开所有table做归并,但由于table间是严格有序的,完全可以按顺序一个个文件输出,即文件在需要的时候再打开,这种lazy的方式更加节省资源(smaller footprint)也更灵活(真正需要时才使用). 因此,LevelDB首先实现了这两种通用Iterator:

class MergingIterator : public Iterator {
 public:
  MergingIterator(const Comparator* comparator, Iterator** children, int n)
      : comparator_(comparator),
        children_(new IteratorWrapper[n]),
        n_(n),
        current_(NULL),
        direction_(kForward) {
    for (int i = 0; i < n; i++) {
      children_[i].Set(children[i]);
    }
  }

  virtual ~MergingIterator() {
    delete[] children_;
  }

  virtual void Next();
  virtual void Prev();
  ...
}

typedef Iterator* (*BlockFunction)(void*, const ReadOptions&, const Slice&);

class TwoLevelIterator: public Iterator {
 public:
  TwoLevelIterator(
    Iterator* index_iter,
    BlockFunction block_function,
    void* arg,
    const ReadOptions& options);

  virtual ~TwoLevelIterator();

  virtual void Next();
  virtual void Prev();
  ...
}

先来看MergingIterator,构造时需要传待合并的n个child Iterator,那么其Next/Prev等操作可想而知,实现时要先找到所有child Iterator中当前位置最小(或最大)的key,并调用其上的Next/Prev,等价于多路归并. 再来看TwoLevelIterator怎么实现拼接多个Iterator,这里为了灵活和通用,每个待拼接的Iterator是以Generator的方式动态产生的,即代码中的BlockFunction,而index_iter是用于生成Generator参数的一个meta Iterator,BlockFunction根据index_iter的value动态产生每个Iterator,整体上是一个两层结构,故叫做TweLevelIterator.

接下来看看LevelDB::NewIterator接口的实现,该用于Iterator遍历当前DB所有用户的key-value记录:

Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
                                      SequenceNumber* latest_snapshot,
                                      uint32_t* seed) {
  IterState* cleanup = new IterState;
  mutex_.Lock();
  *latest_snapshot = versions_->LastSequence();

  // Collect together all needed child iterators
  std::vector<Iterator*> list;
  list.push_back(mem_->NewIterator());
  mem_->Ref();
  if (imm_ != NULL) {
    list.push_back(imm_->NewIterator());
    imm_->Ref();
  }
  versions_->current()->AddIterators(options, &list);
  Iterator* internal_iter =
      NewMergingIterator(&internal_comparator_, &list[0], list.size());
  versions_->current()->Ref();

  cleanup->mu = &mutex_;
  cleanup->mem = mem_;
  cleanup->imm = imm_;
  cleanup->version = versions_->current();
  internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL);

  *seed = ++seed_;
  mutex_.Unlock();
  return internal_iter;
}

第10行和第13行分别获取memtable和immutable memtable的Iterator并加入列表,第16行versions_->current()->AddIterators将在所有level上遍历的Iterator加入列表,最后NewMergingIterator基于列表中各Iterator构造MergingIterator返回给上层. 下面再看下AddIterators的实现:

void Version::AddIterators(const ReadOptions& options,
                           std::vector<Iterator*>* iters) {
  // Merge all level zero files together since they may overlap
  for (size_t i = 0; i < files_[0].size(); i++) {
    iters->push_back(
        vset_->table_cache_->NewIterator(
            options, files_[0][i]->number, files_[0][i]->file_size));
  }

  // For levels > 0, we can use a concatenating iterator that sequentially
  // walks through the non-overlapping files in the level, opening them
  // lazily.
  for (int level = 1; level < config::kNumLevels; level++) {
    if (!files_[level].empty()) {
      iters->push_back(NewConcatenatingIterator(options, level));
    }
  }
}

Iterator* Version::NewConcatenatingIterator(const ReadOptions& options,
                                            int level) const {
  return NewTwoLevelIterator(
      new LevelFileNumIterator(vset_->icmp_, &files_[level]),
      &GetFileIterator, vset_->table_cache_, options);
}

首先对于level_0,分别获取每个table文件的Iterator并加入到结果列表,而对于level_1及以上,由于table间是有序不相交的,因此分别对每个level构造1个ConcatenatingIterator:LevelFileNumIterator作为meta Iterator用于遍历level内文件编号,GetFileIterator根据文件编号生成该table的Iterator.

最后来看下Table::Iterator的实现:

Iterator* Table::NewIterator(const ReadOptions& options) const {
  return NewTwoLevelIterator(
      rep_->index_block->NewIterator(rep_->options.comparator),
      &Table::BlockReader, const_cast<Table*>(this), options);
}

可以看到也是一个TwoLevelIterator的组合,第一层index_block的Iterator用于在index上遍历block的位置,第二层则是根据block的位置Load进内存并最终构造Block::Iterator(Table::BLockReader函数).

综上,通过对Iterator抽象接口的简单组合,使得原本复杂的遍历过程在编码上变得通用(复用)和简单了,是一种典型可扩展编码方式,个人认为其中也有函数式编程思想的影子.

leveldb_iter

二、Snapshot
LevelDB支持在某个特定历史快照上的读操作,如:

  leveldb::ReadOptions options;
  options.snapshot = db->GetSnapshot();

  //... apply some updates to db ...

  leveldb::Iterator* iter = db->NewIterator(options);

  //... read using iter to view the state when the snapshot was created ...

  delete iter;
  db->ReleaseSnapshot(options.snapshot);

鉴于LevelDB的插入方式和内部Key的表示,实现Snapshot是比较简单直接的. 具体来说,LevelDB内部只有插入操作,即在同一UserKey上删除或更新请求被转化成为了一个新的InternalKey,其格式为[UserKey+SeqNumber+Tag],其中SeqNumber是全局递增的,可以视为每次外部修改DB后产生的DB版本号,Tag用于标记是更新还是删除;因此同一个UserKey上SeqNumber越大则内容越新,所谓Snapshot即允许用户读取特定SeqNumber下的记录. LevelDB里无论是memtable还是sstable其记录都按上述InternalKey排序:首先按UserKey比较,如果UserKey相同则SeqNumber越大InternalKey越小(位置靠前). 那么指定Snapshot的查询就很简单了:

leveldb_snapshot

如图,假如'Bob'这个key曾先后发生过插入、删除和再次插入,在DB中对应SeqNum版本号分别为100064、100087、100093的3条更新记录,默认情况下查询'Bob'会取到最新的100093记录,如果用户指定快照号100090查询,则应当返回<=100090的SeqNum最大的那条记录,即100087,而根据Tag这是一条删除记录,那么应当返回'记录不存在'. 对100090号快照来说,最新100093是一条未来产生的记录,不可见,而100064是一条过时的无效记录(masked out by delete). DB提供的GetSnapshot接口用于获取当前DB的最新快照号, 随后用户可以在任何时候安全访问此快照下的数据,直到调用ReleaseSnapshot.

历史快照带来的一个主要问题是:数据膨胀,可能存在大量不用的冗余历史数据. 因此需要一种清理机制来对DB瘦身. 但如果要保证任何时候用户都能拿到所有版本的数据则无法进行处理,因此LevelDB在设计上做了合理的折衷,即假如用户不表达对某个历史快照的关注意向,则默认其是可回收的, 而一旦用户关注了该快照版本(GetSnapshot)或正在读取快照数据,则保证不会被回收,而一旦用户调用ReleaseSnapshot则会又被标记为可回收. 比较自然地,旧数据回收可以在Compaction过程中进行,主要丢弃以下两类记录:a) 版本号旧于当前用户关注的最老Snapshot的历史记录;b) 没有mask掉任何历史值的Delete记录.

发表评论

电子邮件地址不会被公开。