leveldb之Version VersionEdit and VersionSet

| 分类 leveldb  | 标签 leveldb 

前言

上一篇介绍了MemTable Dump成SSTable的过程。包括Compaction的整体流程。但是Compaction并未介绍完毕。此处不得不横插一杠子,先介绍Version和VersionEdit以及VersionSet。原因是Compaction和Version相关的部分交织在一起,不搞明白Version很难理解Compaction。其实这句话反过来说也是对的,如果不理解Compaction也很难理解VersionSet。

VersionEdit

Version相关的数据结构有3个,Version VersionEdit and VersionSet。其中VersionEdit顾名思义,是编辑或修改Version,它记录的是两个Version之间的差异。简单的说

    Version0 + VersionEdit = Version1

对于VersionEdit,有如下成员变量:

 private:
  friend class VersionSet;

  typedef std::set< std::pair<int, uint64_t> > DeletedFileSet;

  std::string comparator_;
  uint64_t log_number_;
  uint64_t prev_log_number_;
  uint64_t next_file_number_;
  SequenceNumber last_sequence_;
  bool has_comparator_;
  bool has_log_number_;
  bool has_prev_log_number_;
  bool has_next_file_number_;
  bool has_last_sequence_;

  std::vector< std::pair<int, InternalKey> > compact_pointers_;
  DeletedFileSet deleted_files_;
  std::vector< std::pair<int, FileMetaData> > new_files_;

LevelDB中的文件主要的文件有 数据文件即sstable 文件, log文件,manifest文件,current文件(指向当前的manifest),LOG文件和lock文件,而VersionEdit中,最重要的3个成员变量是:

  std::vector< std::pair<int, InternalKey> > compact_pointers_;
  DeletedFileSet deleted_files_;
  std::vector< std::pair<int, FileMetaData> > new_files_;

第一个compact_pointers按下不表,第二个和第三个顾名思义,就是相对于上一个Version,新的Version新增了那些文件以及删除了那些文件。 注意,从一个版本到到另一个版本的过渡,是由Compaction引起的。

Compaction分成两种:Minor Compaction和Major Compaction。

Minor Compaction是说,用户输入的key-value足够多了,需要讲memtable转成immutable memtable,然后讲immutable memtable dump成 sstable,而这种情况下,sstable文件多了一个,而能属于level0,也能属于level1或者level2。这种情况下,我们成为version发生了变化,需要升级版本。这种情况比较简单,基本上是新增一个文件。

Major Compaction 要复杂一些,它牵扯到两个Level的文件。它会计算出重叠部分的文件,然后归并排序,merge成新的sstable文件,一旦新的文件merge完毕,老的文件也就没啥用了。因此对于这种Compaction除了new_files_还有deleted_files_(当然还有compaction_pointers_)。

为了更深入地理解version以及后面的Compaction,我们需要介绍下FileMetaData这个数据结构,这个数据结构非常的重要,正是因为存在这个数据结构,才能很方面的选择which文件需要Compaction。

struct FileMetaData {
  int refs;
  int allowed_seeks;          // Seeks allowed until compaction
  uint64_t number;
  uint64_t file_size;         // File size in bytes
  InternalKey smallest;       // Smallest internal key served by table
  InternalKey largest;        // Largest internal key served by table

  FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) { }
};

注意sstable文件的名字是 number.sst 如11423.sst这种格式,只要一个number就可以表示该SSTable文件的名字,除此外还存放着该SSTable文件的长度。因为SSTable文件里面的键值是有序的,因此,最大的key和最小的key就足矣描述key的范围。

Version and VersionSet and MVCC

对于同一笔记录,如果读和写同一时间发生,reader可能读到不一致的数据或者是修改了一半的数据。对于这种情况,有三种常见的解决方法:

悲观锁   最简单的处理方式,就是加锁保护,写的时候不许读,读的时候不许写。效率低。

乐观锁   它假设多用户并发的事物在处理时不会彼此互相影响,各食物能够在不产生锁的的情况下处理各自影响的那部分数据。
        在提交数据更新之前,每个事务会先检查在该事务读取数据后,有没有其他事务又修改了该数据。
        如果其他事务有更新的话,正在提交的事务会进行回滚;这样做不会有锁竞争更不会产生死锁,
        但如果数据竞争的概率较高,效率也会受影响 。

MVCC    MVCC是一个数据库常用的概念。Multiversion concurrency control多版本并发控制。每一个执行操作的用户,
        看到的都是数据库特定时刻的的快照(snapshot), writer的任何未完成的修改都不会被其他的用户所看到;
        当对数据进行更新的时候并是不直接覆盖,而是先进行标记, 然后在其他地方添加新的数据,从而形成一个新版本, 
        此时再来读取的reader看到的就是最新的版本了。所以这种处理策略是维护了多个版本的数据的,但只有一个是最新的。

sstable级别的MVCC就是利用Version实现的。

  • 只有一个current version,持有最新的sstable集合。
  • VersionEdit 代表一次更新,新增了哪些sstable file,以及删除了哪些sstable file

那么我们来看Version类的定义:

class Version {
 public:

  void AddIterators(const ReadOptions&, std::vector<Iterator*>* iters);


  struct GetStats {
    FileMetaData* seek_file;
    int seek_file_level;
  };
  Status Get(const ReadOptions&, const LookupKey& key, std::string* val,
             GetStats* stats);


  bool UpdateStats(const GetStats& stats);

  // Record a sample of bytes read at the specified internal key.
  // Samples are taken approximately once every config::kReadBytesPeriod
  // bytes.  Returns true if a new compaction may need to be triggered.
  // REQUIRES: lock is held
  bool RecordReadSample(Slice key);

  // Reference count management (so Versions do not disappear out from
  // under live iterators)
  void Ref();
  void Unref();

  void GetOverlappingInputs(
      int level,
      const InternalKey* begin,         // NULL means before all keys
      const InternalKey* end,           // NULL means after all keys
      std::vector<FileMetaData*>* inputs);

  // Returns true iff some file in the specified level overlaps
  // some part of [*smallest_user_key,*largest_user_key].
  // smallest_user_key==NULL represents a key smaller than all keys in the DB.
  // largest_user_key==NULL represents a key largest than all keys in the DB.
  bool OverlapInLevel(int level,
                      const Slice* smallest_user_key,
                      const Slice* largest_user_key);


  /*该函数用来选择  需要将从MemTable dump出的sstable file放入第几层*/
  int PickLevelForMemTableOutput(const Slice& smallest_user_key,
                                 const Slice& largest_user_key);

  /*判断某层level的文件个数*/
  int NumFiles(int level) const { return files_[level].size(); }

  // Return a human readable string that describes this version's contents.
  std::string DebugString() const;

 private:
  friend class Compaction;
  friend class VersionSet;

  class LevelFileNumIterator;
  Iterator* NewConcatenatingIterator(const ReadOptions&, int level) const;

  // Call func(arg, level, f) for every file that overlaps user_key in
  // order from newest to oldest.  If an invocation of func returns
  // false, makes no more calls.
  //
  // REQUIRES: user portion of internal_key == user_key.
  void ForEachOverlapping(Slice user_key, Slice internal_key,
                          void* arg,
                          bool (*func)(void*, int, FileMetaData*));

  /*所有的version都属于一个集合即Version Set*/
  VersionSet* vset_;  
  
  /*VersionSet是Version组成的双链表,因此Version需要记录前一个Version和后一个Version*/          
  Version* next_;              
  Version* prev_;               
  int refs_;                   

  /*该version下的所有level的所有sstable文件,每个文件由FileMetaData表示*/
  std::vector<FileMetaData*> files_[config::kNumLevels];

  // Next file to compact based on seek stats.
  FileMetaData* file_to_compact_;
  int file_to_compact_level_;


  /********************************************************************
   *Compaction需要用compaction_score_来判断是否需要发起major compaction
   * 这部分逻辑与某level所有SSTable file的大小有关系
   ********************************************************************/
  double compaction_score_;
  int compaction_level_;

  explicit Version(VersionSet* vset)
      : vset_(vset), next_(this), prev_(this), refs_(0),
        file_to_compact_(NULL),
        file_to_compact_level_(-1),
        compaction_score_(-1),
        compaction_level_(-1) {
  }

  ~Version();

  // No copying allowed
  Version(const Version&);
  void operator=(const Version&);
};

LevelDB将所有的Version置于一个双向链表之中,即位于一个集合之中。这样所有的Version组成一个名为VersionSet的结构。

LevelDB会触发Compaction,会对一些文件进行清理操作,让数据更加有序,清理后的数据放到新的版本里面,而老的数据作为原始的素材,最终是要清理掉的,但是如果有读事务位于旧的文件,那么暂时就不能删除。因此利用引用计数,只要一个Verison还活着,就不允许删除该Verison管理的所有文件。当一个Version生命周期结束,它管理的所有文件的引用计数减1.

Version::~Version() {
  assert(refs_ == 0);

  /* 从VersionSet中注销 */
  prev_->next_ = next_;
  next_->prev_ = prev_;

  /*本Version下所有的文件,引用计数减1*/
  for (int level = 0; level < config::kNumLevels; level++) {
    for (size_t i = 0; i < files_[level].size(); i++) {
      FileMetaData* f = files_[level][i];
      assert(f->refs > 0);
      f->refs--;
      if (f->refs <= 0) {
        delete f;
      }
    }
  }
}

从上图我们看到了,从Version 升级到另一个Version中间靠的是VersionEdit。VersionEdit告诉我们哪些文件可以删除了,哪些文件是新增的。这个过程是LogAndApply。为了方便实现

Version(N) + VersionEdit(N) = Version(N+1)

引入了Build数据结构,这个数据结构是一个helper类,帮忙实现Version的跃升。我们下面重点分析LogAndApply。

LogAndApply

调用LogAndApply的时机有4个,其中第一个是打开DB的时候,其余3个都与Compaction有关系。

  • Open DB 的时候,有些记录在上一次操作中,可能有一些记录只在log中,并未写入sstable,因此需要replay, 有点类似journal文件系统断电之后的replay操作。

  • Immutable MemTable dump成SStable之后,调用LogAndApply
  • 如果是非manual,同时仅仅是sstable文件简单地在不同level之间移动,并不牵扯两个不同level的sstable之间归并排序,就直接调用LogAndApply
  • Major Compaction,不同level的文件存在交叉,需要归并排序,生成新的不交叉重叠的sstable文件,同时可能将老的文件废弃。

如下图所示:

对于Compaction而言,它的作用是 : 或者是通过MemTable dump,生成新的sstable文件(一般是level0 或者是level1或者是level2),在一定的时机下,整理已有的sstable,通过归并排序,将某些文件推向更高的level,让数据集合变得更加有序。

生成完毕新文件后,需要产生新的version,这就是LogAndApply的做的事情。

Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
  if (edit->has_log_number_) {
    assert(edit->log_number_ >= log_number_);
    assert(edit->log_number_ < next_file_number_);
  } else {
    edit->SetLogNumber(log_number_);
  }

  if (!edit->has_prev_log_number_) {
    edit->SetPrevLogNumber(prev_log_number_);
  }

  edit->SetNextFile(next_file_number_);
  edit->SetLastSequence(last_sequence_);

  Version* v = new Version(this);
  {
    Builder builder(this, current_);
    builder.Apply(edit);
    builder.SaveTo(v);
  }
  Finalize(v);

  // Initialize new descriptor log file if necessary by creating
  // a temporary file that contains a snapshot of the current version.
  std::string new_manifest_file;
  Status s;
  if (descriptor_log_ == NULL) {
    // No reason to unlock *mu here since we only hit this path in the
    // first call to LogAndApply (when opening the database).
    assert(descriptor_file_ == NULL);
    new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
    edit->SetNextFile(next_file_number_);
    s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
    if (s.ok()) {
      descriptor_log_ = new log::Writer(descriptor_file_);
      s = WriteSnapshot(descriptor_log_);
    }
  }

  // Unlock during expensive MANIFEST log write
  {
    mu->Unlock();

    // Write new record to MANIFEST log
    if (s.ok()) {
      std::string record;
      edit->EncodeTo(&record);
      s = descriptor_log_->AddRecord(record);
      if (s.ok()) {
        s = descriptor_file_->Sync();
      }
      if (!s.ok()) {
        Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str());
      }
    }

    // If we just created a new descriptor file, install it by writing a
    // new CURRENT file that points to it.
    if (s.ok() && !new_manifest_file.empty()) {
      s = SetCurrentFile(env_, dbname_, manifest_file_number_);
    }

    mu->Lock();
  }

  // Install the new version
  if (s.ok()) {
    AppendVersion(v);
    log_number_ = edit->log_number_;
    prev_log_number_ = edit->prev_log_number_;
  } else {
    delete v;
    if (!new_manifest_file.empty()) {
      delete descriptor_log_;
      delete descriptor_file_;
      descriptor_log_ = NULL;
      descriptor_file_ = NULL;
      env_->DeleteFile(new_manifest_file);
    }
  }

  return s;
}

构造Builder

首先是以当前的Version为基础,创建出来一个Build类,这个类是一个help类:

    Builder builder(this, current_);

调用的构造函数如下:

  Builder(VersionSet* vset, Version* base)
      : vset_(vset),
        base_(base) {
    base_->Ref();
    BySmallestKey cmp;
    cmp.internal_comparator = &vset_->icmp_;
    for (int level = 0; level < config::kNumLevels; level++) {
      levels_[level].added_files = new FileSet(cmp);
    }
  }

对于Builder这个类,基本的成员有:

  typedef std::set<FileMetaData*, BySmallestKey> FileSet;
  struct LevelState {
    std::set<uint64_t> deleted_files;
    FileSet* added_files;
  };

  VersionSet* vset_;
  Version* base_;
  LevelState levels_[config::kNumLevels];

初始化的时候,base_这个指针指向current_ 而,vset_指针指向vset,即当前VersionSet。至于各个level的文件,初始化成空的集合。

apply

    builder.Apply(edit);

这一步是将版本与版本的变化部分VersionEdit 记录在Builder

  // Apply all of the edits in *edit to the current state.
  /*此处不好理解,后面讲到Compaction的部分再来分析*/
  void Apply(VersionEdit* edit) {
    // Update compaction pointers
    for (size_t i = 0; i < edit->compact_pointers_.size(); i++) {
      const int level = edit->compact_pointers_[i].first;
      vset_->compact_pointer_[level] =
          edit->compact_pointers_[i].second.Encode().ToString();
    }

    /*将删除部分的文件,包括文件所属的层级,记录在Builder的数据结构中
     *删除的文件,只需要记录sstable文件的数字就可以了*/
    const VersionEdit::DeletedFileSet& del = edit->deleted_files_;
    for (VersionEdit::DeletedFileSet::const_iterator iter = del.begin();
         iter != del.end();
         ++iter) {
      const int level = iter->first;
      const uint64_t number = iter->second;
      levels_[level].deleted_files.insert(number);
    }

    /*处理新增文件部分,要将VersionEdit中的增加文件部分的整个FileMetaData都记录下来*/
    for (size_t i = 0; i < edit->new_files_.size(); i++) {
      const int level = edit->new_files_[i].first;
      FileMetaData* f = new FileMetaData(edit->new_files_[i].second);
      f->refs = 1;

      // We arrange to automatically compact this file after
      // a certain number of seeks.  Let's assume:
      //   (1) One seek costs 10ms
      //   (2) Writing or reading 1MB costs 10ms (100MB/s)
      //   (3) A compaction of 1MB does 25MB of IO:
      //         1MB read from this level
      //         10-12MB read from next level (boundaries may be misaligned)
      //         10-12MB written to next level
      // This implies that 25 seeks cost the same as the compaction
      // of 1MB of data.  I.e., one seek costs approximately the
      // same as the compaction of 40KB of data.  We are a little
      // conservative and allow approximately one seek for every 16KB
      // of data before triggering a compaction.
      f->allowed_seeks = (f->file_size / 16384);
      if (f->allowed_seeks < 100) f->allowed_seeks = 100;

      levels_[level].deleted_files.erase(f->number);
      levels_[level].added_files->insert(f);
    }
  }

对于文件也记录的allow_seeks这个字段,原因是触发Compaction,可能是因为某一个层级的file个数太多,总长度超过了指定的上限,也可能是因为某sstable seek的次数过多,此处并不展开讲解。

SaveTo

    builder.SaveTo(v);

这一部分是根据Builder中的base_指向的当前版本current_, 以及在Apply部分记录的删除文件集合,新增文件集合和CompactionPointer集合,计算出一个新的Verison,保存在v中。


  // Save the current state in *v.
  void SaveTo(Version* v) {
    BySmallestKey cmp;
    cmp.internal_comparator = &vset_->icmp_;
    for (int level = 0; level < config::kNumLevels; level++) {
      // Merge the set of added files with the set of pre-existing files.
      // Drop any deleted files.  Store the result in *v.
      const std::vector<FileMetaData*>& base_files = base_->files_[level];
      std::vector<FileMetaData*>::const_iterator base_iter = base_files.begin();
      std::vector<FileMetaData*>::const_iterator base_end = base_files.end();
      const FileSet* added = levels_[level].added_files;
      v->files_[level].reserve(base_files.size() + added->size());
      for (FileSet::const_iterator added_iter = added->begin();
           added_iter != added->end();
           ++added_iter) {
        // Add all smaller files listed in base_
        for (std::vector<FileMetaData*>::const_iterator bpos
                 = std::upper_bound(base_iter, base_end, *added_iter, cmp);
             base_iter != bpos;
             ++base_iter) {
          MaybeAddFile(v, level, *base_iter);
        }

        MaybeAddFile(v, level, *added_iter);
      }

      // Add remaining base files
      for (; base_iter != base_end; ++base_iter) {
        MaybeAddFile(v, level, *base_iter);
      }

#ifndef NDEBUG
      // Make sure there is no overlap in levels > 0
      if (level > 0) {
        for (uint32_t i = 1; i < v->files_[level].size(); i++) {
          const InternalKey& prev_end = v->files_[level][i-1]->largest;
          const InternalKey& this_begin = v->files_[level][i]->smallest;
          if (vset_->icmp_.Compare(prev_end, this_begin) >= 0) {
            fprintf(stderr, "overlapping ranges in same level %s vs. %s\n",
                    prev_end.DebugString().c_str(),
                    this_begin.DebugString().c_str());
            abort();
          }
        }
      }
#endif
    }

注意一开始新版本v里面各个level的文件集合都是空的,同时除了level0以外,其它的level中的文件是有序的,必须是有序的。因此上面的代码就比较容易理解了。

它是一个三层的循环:最外层是level。无论是当前的Version current_,还是正在生成中的Version v,文件都是分层,包括help类Builder中的delete集合和新增集合,都是分层表示的。因此最外层循环是level就很自然。各个层级处理的逻辑都是一样。

就是base_这个Version(其实就是当前Version current_) 中的文件和 Builder中的added_files文件进行比较,按照顺序进入新Version v的对应level中。

其中很有意思的是MayAddFile。

  void MaybeAddFile(Version* v, int level, FileMetaData* f) {
    if (levels_[level].deleted_files.count(f->number) > 0) {
      /*如果文件在删除列表之内,就没必要加入到新的Version v的对应层级的文件集合*/
    } else {
      std::vector<FileMetaData*>* files = &v->files_[level];
      if (level > 0 && !files->empty()) {
        /*除level0外的任何level (1~6),Version v内的对应层级的文件列表必须是有序的,不能交叉
         *所以此处有判断 level >0,这是因为并不care level0 是否交叉,事实上,它几乎总是交叉的* /
        assert(vset_->icmp_.Compare((*files)[files->size()-1]->largest,
                                    f->smallest) < 0);
      }
      f->refs++;
      files->push_back(f);
    }
  }
};

VersionSet::Finalize

这个部分是用来帮忙选择下一次Compaction应该从which level 开始。计算部分比较简单,基本就是看该level的文件数目或者所有文件的size 之和是否超过上限:

void VersionSet::Finalize(Version* v) {
  // Precomputed best level for next compaction
  int best_level = -1;
  double best_score = -1;

  for (int level = 0; level < config::kNumLevels-1; level++) {
    double score;
    if (level == 0) {
      // We treat level-0 specially by bounding the number of files
      // instead of number of bytes for two reasons:
      //
      // (1) With larger write-buffer sizes, it is nice not to do too
      // many level-0 compactions.
      //
      // (2) The files in level-0 are merged on every read and
      // therefore we wish to avoid too many files when the individual
      // file size is small (perhaps because of a small write-buffer
      // setting, or very high compression ratios, or lots of
      // overwrites/deletions).
      score = v->files_[level].size() /
          static_cast<double>(config::kL0_CompactionTrigger);
    } else {
      // Compute the ratio of current size to size limit.
      const uint64_t level_bytes = TotalFileSize(v->files_[level]);
      score =
          static_cast<double>(level_bytes) / MaxBytesForLevel(options_, level);
    }

    if (score > best_score) {
      best_level = level;
      best_score = score;
    }
  }

  v->compaction_level_ = best_level;
  v->compaction_score_ = best_score;
}

对于level0 而言,如果文件数目超过了config::kL0_CompactionTrigger, 就标记需要Compaction, 而该参数的值为:

// Level-0 compaction is started when we hit this many files.
static const int kL0_CompactionTrigger = 4;

对于其他 层级而言,是按照每一level的总大小来判定的。按照预想,每一个层级的文件的总大小是有上限的:

level 1                10M
level 2               100M 
level 3              1000M
level 4             10000M
level 5            100000M

该层级的得分是该层级的所有文件的size总和 除以上限,然后各个层级比较自己的score,选出最急迫需要Compaction的level。 当然了Level 6就不用算了,它已经是最高的层级了。

Compaction的时候,调用Pick Compaction函数来选择compaction的层级,那时候会先按照Finalize算出来的level进行Compaction。当然了,如果从文件大小和文件个数的角度看,没有任何level需要Compaction,就按照seek次数来决定


Compaction* VersionSet::PickCompaction() {
  Compaction* c;
  int level;

  /*注意注释,优先按照size来选择compaction的层级,选择的依据即按照Finalize函数计算的score*/
  // We prefer compactions triggered by too much data in a level over
  // the compactions triggered by seeks.
  const bool size_compaction = (current_->compaction_score_ >= 1);
  const bool seek_compaction = (current_->file_to_compact_ != NULL);
  if (size_compaction) {
  
  }
}

LogAndApply的MANIFEST部分

注意,VerisonEdit会写入到磁盘的MANIFEST部分,这一部分不在本文讲解,后面会有单独的MANIFEST的详细介绍。


上一篇     下一篇