The Paxos Algorithm in ceph-mon (2)

| Category: ceph-internal | Tags: ceph-internal

Preface

The previous article walked through the normal flow of getting a proposal accepted. The flow itself is complete, but a few nagging questions remain.

What on earth is accepted_pn?

In the monitor leader's begin function:

 t->put(get_name(), last_committed+1, new_value);

  // note which pn this pending value is for.
  t->put(get_name(), "pending_v", last_committed + 1);
  t->put(get_name(), "pending_pn", accepted_pn);

In the peon's handle_begin function:

  t->put(get_name(), v, begin->values[v]);

  // note which pn this pending value is for.
  t->put(get_name(), "pending_v", v);
  t->put(get_name(), "pending_pn", accepted_pn);

Covering how the proposal is encoded was worthwhile, because the commit phase has to decode that bufferlist and apply the transaction; that part is easy to follow. But what about the last two lines? What are pending_v and pending_pn actually for? They never come up again afterwards, and it is not obvious what setting them accomplishes.

This piece of logic is in fact there for recovery. In the normal case it is never needed, but when a failure occurs, the Paxos recovery logic relies on exactly this information.

Basic Concepts

  • PN (Proposal Number)

Once elected, a leader runs Phase 1 once to establish a PN; for as long as it remains leader, every Phase 2 round shares that single PN. A huge number of Phase 1 rounds are thereby skipped, which is also why this flavor of Paxos keeps network overhead low.

A newly chosen leader executes phase 1 for infinitely many instances of the consensus algorithm
                                                                                  -- Paxos Made Simple
  • Version

A version can be thought of as the Instance ID in Paxos. Every application-level proposal is encoded into a binary byte stream and used as the value, while the version (or Instance ID) is the key associated with that value.
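
As a tiny standalone illustration (my own sketch, not Ceph source; std::string stands in for Ceph's bufferlist), the Paxos log can be pictured as a map from version/Instance ID to the encoded value:

#include <cstdint>
#include <map>
#include <string>

using version_t = uint64_t;
using bufferlist = std::string;  // stand-in for Ceph's bufferlist

int main() {
  // one entry per proposal: version (Instance ID) -> encoded transaction bytes
  std::map<version_t, bufferlist> paxos_log;
  paxos_log[1737442] = "<encoded transaction>";
  paxos_log[1737443] = "<encoded transaction>";  // the next value, possibly still pending
}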

The data that must be persisted is:

Name               Meaning                                                                        Notes
last_pn            The PN generated the last time this node was elected leader                    Used by get_new_proposal_number(); after the next election, numbering continues from it
accepted_pn        The PN this node has accepted, possibly proposed by another leader             A peon uses this value to reject smaller PNs
first_committed    The first committed version this node still holds                              Versions (log entries) older than this are gone from this node
last_committed     The last committed version recorded on this node                               Anything newer is uncommitted; there can be at most one such version
uncommitted_v      The uncommitted version on this node, if any; it can only be last_committed+1  Ceph allows at most one uncommitted version
uncommitted_pn     The PN associated with the uncommitted version                                 Recorded in the same transaction as uncommitted_v and uncommitted_value
uncommitted_value  The content of the uncommitted version                                         Recorded in the same transaction as uncommitted_v and uncommitted_pn

Note that the three values whose names start with "uncommitted" may not exist at all; after a clean shutdown, for example, everything has been committed.
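
Pulling the table together, here is a minimal standalone sketch of the persisted state (field names mirror the table above; this is not the actual Ceph data structure):

#include <cstdint>
#include <optional>
#include <string>

using version_t = uint64_t;

// Sketch of the per-monitor Paxos state that survives a restart.
struct PaxosPersistentState {
  version_t last_pn = 0;          // last PN this node generated as leader
  version_t accepted_pn = 0;      // highest PN this node has accepted so far
  version_t first_committed = 0;  // oldest committed version still stored
  version_t last_committed = 0;   // newest committed version

  // The "uncommitted" triple may be absent entirely (e.g. after a clean shutdown).
  std::optional<version_t> uncommitted_v;        // if present, always last_committed + 1
  std::optional<version_t> uncommitted_pn;       // PN under which that value was accepted
  std::optional<std::string> uncommitted_value;  // the encoded transaction bytes
};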

With these basic concepts covered, we have to start thinking about failures. Chronologically this article really ought to have come first: the cluster's monitors must first reach a consistent state before they can methodically run through the steps described in the previous article.

From the standpoint of how one learns the material, though, the previous article covered the Paxos main path, which runs countless times every day, while bringing the ceph mons back to a consistent state is the exception path, taken only when a failure occurs. That is why we chose to present the normal flow first, and only now the failures and the recovery back to consistency.

Note that after the leader election succeeds, collect is called. The name looks a little odd, but it is apt: all sorts of failures may have happened, and now that a new leader has been chosen, it collects everyone's state and brings all members into agreement.

Without a clear idea of which failures can actually occur, reading collect, handle_collect and handle_last like a running log makes it hard to see why the code is written this way, or why these few steps are enough for the cluster to reach agreement.

So below we start from the failures: what kinds can occur, and how the cluster recovers from each.

Recovery

Once the mon leader has been elected, it enters STATE_RECOVERING and calls collect to gather the peons' information, so that everyone can exchange what they have and converge on a consistent state.

void Paxos::leader_init()
{
  cancel_events();
  new_value.clear();

  finish_contexts(g_ceph_context, proposals, -EAGAIN);

  logger->inc(l_paxos_start_leader);

  if (mon->get_quorum().size() == 1) {
    state = STATE_ACTIVE;
    return;
  }

  /* enter the recovering state */
  state = STATE_RECOVERING;
  lease_expire = utime_t();
  dout(10) << "leader_init -- starting paxos recovery" << dendl;
  
  /* call collect() and enter Phase 1 */
  collect(0);
}

Note that a new PN (Proposal Number) is generated inside collect. This number has to be globally unique and monotonically increasing. With this many nodes in the cluster, and with the mon leader liable to change, how are those two properties guaranteed?

version_t Paxos::get_new_proposal_number(version_t gt)
{
  if (last_pn < gt) 
    last_pn = gt;
  
  // update. make it unique among all monitors.
  /* the core of the algorithm is the next four lines */
  last_pn /= 100;
  last_pn++;
  last_pn *= 100;
  last_pn += (version_t)mon->rank;

  // write
  MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
  t->put(get_name(), "last_pn", last_pn);

  dout(30) << __func__ << " transaction dump:\n";
  JSONFormatter f(true);
  t->dump(&f);
  f.flush(*_dout);
  *_dout << dendl;

  logger->inc(l_paxos_new_pn);
  utime_t start = ceph_clock_now(NULL);

  get_store()->apply_transaction(t);

  utime_t end = ceph_clock_now(NULL);
  logger->tinc(l_paxos_new_pn_latency, end - start);

  dout(10) << "get_new_proposal_number = " << last_pn << dendl;
  return last_pn;
}

Take the previous value, add 100, then add this mon's rank. For example, suppose the ranks are 0, 1 and 2 and the initial PN is 100. Every time an election is triggered, if monitor 0 is present it always wins, so the next PN is 100 + 100 + 0 = 200. If the current PN is 200 and another election happens, but this time monitor 0 is absent (it has failed), then monitor 1 wins and the new PN is 200 + 100 + 1 = 301. If monitor 0 then comes back up and wins again, the new PN is (301/100 + 1) * 100 + 0 = 400.
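
Here is a minimal standalone sketch of that arithmetic, replaying the example above (my own code, not the Ceph function; the rank values are the ones assumed in the example):

#include <algorithm>
#include <cstdint>
#include <iostream>

using version_t = uint64_t;

// Same arithmetic as get_new_proposal_number(): strip the rank suffix,
// bump the "round" part by one, then append this monitor's rank.
version_t next_pn(version_t last_pn, version_t gt, int rank) {
  last_pn = std::max(last_pn, gt);
  last_pn /= 100;
  last_pn++;
  last_pn *= 100;
  last_pn += rank;
  return last_pn;
}

int main() {
  version_t pn = 100;
  pn = next_pn(pn, 0, 0);  // monitor 0 wins again        -> 200
  std::cout << pn << "\n";
  pn = next_pn(pn, 0, 1);  // monitor 0 is down, 1 wins   -> 301
  std::cout << pn << "\n";
  pn = next_pn(pn, 0, 0);  // monitor 0 is back and wins  -> 400
  std::cout << pn << "\n";
}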

Note that this value is refreshed only once, in collect right after the leader election finishes. Once agreement is reached there may be many more proposals afterwards, but the PN does not change.

Step  Leader           Peon                 Description
1     collect() =>                          The leader sends the PN (plus other accompanying state) to every peon in the quorum, asking each one to report its own information back.
2                      <= handle_collect()  The peon accepts or rejects the PN, and along the way may share data it has already committed.
3     handle_last()                         Only if every peon in the quorum accepts the leader's PN does this succeed. Based on the peons' replies and its own state, this function either re-proposes the uncommitted value or shares the data some member is missing, so that all members end up consistent.
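
For orientation, the following is a standalone sketch of the fields these two messages carry (my own summary, not the real MMonPaxos class; the field names follow the code excerpts later in this article):

#include <cstdint>
#include <map>
#include <string>

using version_t = uint64_t;

struct CollectMsg {            // leader -> peon (OP_COLLECT), step 1
  version_t pn;                // the freshly generated PN
  version_t first_committed;   // leader's oldest stored version
  version_t last_committed;    // leader's newest committed version
};

struct LastMsg {               // peon -> leader (OP_LAST), step 2
  version_t pn;                // the PN the peon now holds after the collect
  version_t first_committed;
  version_t last_committed;
  std::map<version_t, std::string> values;  // committed values being shared, plus an
                                            // accepted-but-uncommitted value if there is one
  version_t uncommitted_pn = 0;             // PN under which that value was accepted, if any
};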

The discussion below splits into two cases, depending on whether the mon leader or a peon goes down.

Peon down

If a peon goes down, the leader will notice.

First, there is the lease mechanism:

void Paxos::lease_ack_timeout()
{
  dout(1) << "lease_ack_timeout -- calling new election" << dendl;
  assert(mon->is_leader());
  assert(is_active());
  logger->inc(l_paxos_lease_ack_timeout);
  lease_ack_timeout_event = 0;
  mon->bootstrap();
}

Second, if OP_BEGIN has already been sent and the downed peon cannot reply with OP_ACCEPT, the following is triggered:

void Paxos::accept_timeout()
{
  dout(1) << "accept timeout, calling fresh election" << dendl;
  accept_timeout_event = 0;
  assert(mon->is_leader());
  assert(is_updating() || is_updating_previous() || is_writing() ||
	 is_writing_previous());
  logger->inc(l_paxos_accept_timeout);
  mon->bootstrap();
}

Either way, bootstrap leads to a new election. After the election the original leader is still the leader, and at that point it calls the collect function.

We split the discussion into two phases: the peon going down, and the peon coming back up.

Peon Down

Note that a new PN is generated inside the collect function:

  accepted_pn = get_new_proposal_number(MAX(accepted_pn, oldpn));
  accepted_pn_from = last_committed;

A peon going down means the leader has been healthy the whole time, and the leader node does not change across the re-election. It follows that no peon's data can be newer than the leader's:

  • last_committed(leader) >= last_committed(peon)
  • accepted_pn(leader) > accepted_pn(peon)

The second inequality holds because the leader regenerates a new PN in collect, so the leader's accepted_pn is larger than every peon's accepted_pn.

Timeout events run in the timer thread, and the timer thread takes the monitor lock before doing its work. From that we can deduce the points at which the leader's Paxos flow may be interrupted:

  1. The leader is in the active state and has not started any proposal.
  2. The leader is in the updating state: begin has already executed and it is waiting for accepts; the leader holds uncommitted data and may already have received some of the accept messages.
  3. The leader is in the writing state: all accept messages have been received, commit_start has begun executing, and the transaction is queued for execution.
  4. The leader is in the writing state and the write has already finished: the transaction has taken effect, but the callback (commit_finish) has not run yet (it cannot run until it acquires the monitor lock).

Cases 3 and 4 can arise because the leader commits asynchronously:

  get_store()->queue_transaction(t, new C_Committed(this));
  
struct C_Committed : public Context {
  Paxos *paxos;
  explicit C_Committed(Paxos *p) : paxos(p) {}
  void finish(int r) {
    assert(r >= 0);
    Mutex::Locker l(paxos->mon->lock);
    paxos->commit_finish();
  }
};

Once commit_finish starts executing, it holds the monitor lock (paxos->mon->lock). The leader cannot be interrupted while in the refresh state: once commit_finish begins, it carries the refresh all the way through and returns to active before the timer thread can ever acquire the lock and run.

Case 1 needs no handling: there is no proposal in flight, so there is nothing to do. In case 2 there is uncommitted data, and the leader will start a new propose for it. How is that achieved?

Note that the comments below consider only case 2 of the peon-down scenario, i.e. the leader has already issued begin and is waiting for OP_ACCEPT messages, possibly having received some of them already.

void Paxos::collect(version_t oldpn)
{
  // we're recoverying, it seems!
  state = STATE_RECOVERING;
  assert(mon->is_leader());

  /* uncommitted_v, uncommitted_pn and uncommitted_value form a triple;
   * collect will also gather the peons' data, so they are reset here */
  uncommitted_v = 0;
  uncommitted_pn = 0;
  uncommitted_value.clear();
  peer_first_committed.clear();
  peer_last_committed.clear();

  /* Note: in case 2 the leader itself also holds uncommitted data, so this block recovers the
   * not-yet-committed proposal:
   * the previous round's PN goes into uncommitted_pn,
   * the previous round's Instance ID goes into uncommitted_v,
   * and the previous round's value goes into uncommitted_value */
  if (get_store()->exists(get_name(), last_committed+1)) {
    version_t v = get_store()->get(get_name(), "pending_v");
    version_t pn = get_store()->get(get_name(), "pending_pn");
    if (v && pn && v == last_committed + 1) {
      uncommitted_pn = pn;
    } else {
      dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << accepted_pn
	       << " and crossing our fingers" << dendl;
      uncommitted_pn = accepted_pn;
    }
    uncommitted_v = last_committed+1;

    get_store()->get(get_name(), last_committed+1, uncommitted_value);
    assert(uncommitted_value.length());
    dout(10) << "learned uncommitted " << (last_committed+1)
	     << " pn " << uncommitted_pn
	     << " (" << uncommitted_value.length() << " bytes) from myself" 
	     << dendl;

    logger->inc(l_paxos_collect_uncommitted);
  }

  /* regenerate a new PN; it is guaranteed to be larger than any PN seen so far */
  accepted_pn = get_new_proposal_number(MAX(accepted_pn, oldpn));
  accepted_pn_from = last_committed;
  num_last = 1;
  dout(10) << "collect with pn " << accepted_pn << dendl;

  // send collect
  for (set<int>::const_iterator p = mon->get_quorum().begin();
       p != mon->get_quorum().end();
       ++p) {
    if (*p == mon->rank) continue;
    
    /* send OP_COLLECT to the other nodes, gathering their state to bring the cluster back to consistency */
    
    MMonPaxos *collect = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COLLECT,
				       ceph_clock_now(g_ceph_context));
    collect->last_committed = last_committed;
    collect->first_committed = first_committed;
    collect->pn = accepted_pn;
    mon->messenger->send_message(collect, mon->monmap->get_inst(*p));
  }

  // set timeout event
  collect_timeout_event = new C_MonContext(mon, [this](int r) {
	if (r == -ECANCELED)
	  return;
	collect_timeout();
    });
  mon->timer.add_event_after(g_conf->mon_accept_timeout_factor *
			     g_conf->mon_lease,
			     collect_timeout_event);
}


Note that in this scenario, every other peon's accepted_pn is necessarily smaller than the newly generated PN carried in the OP_COLLECT message. Let us see how the peons react:

void Paxos::handle_collect(MonOpRequestRef op)
{
  op->mark_paxos_event("handle_collect");

  MMonPaxos *collect = static_cast<MMonPaxos*>(op->get_req());
  dout(10) << "handle_collect " << *collect << dendl;

  assert(mon->is_peon()); // mon epoch filter should catch strays

  // we're recoverying, it seems!
  state = STATE_RECOVERING;

  /* this cannot happen in the scenario we are restricting ourselves to */
  if (collect->first_committed > last_committed+1) {
    dout(2) << __func__
            << " leader's lowest version is too high for our last committed"
            << " (theirs: " << collect->first_committed
            << "; ours: " << last_committed << ") -- bootstrap!" << dendl;
    op->mark_paxos_event("need to bootstrap");
    mon->bootstrap();
    return;
  }

  /* reply with an OP_LAST message, carrying our own last_committed and first_committed */
  MMonPaxos *last = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LAST,
				  ceph_clock_now(g_ceph_context));
  last->last_committed = last_committed;
  last->first_committed = first_committed;
  
  version_t previous_pn = accepted_pn;

  /* note: collect->pn was freshly generated by the leader after the election, so it is necessarily larger than this peon's accepted_pn */
  if (collect->pn > accepted_pn) {
    // ok, accept it
    accepted_pn = collect->pn;
    accepted_pn_from = collect->pn_from;
    dout(10) << "accepting pn " << accepted_pn << " from " 
	     << accepted_pn_from << dendl;
  
    MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
    t->put(get_name(), "accepted_pn", accepted_pn);

    dout(30) << __func__ << " transaction dump:\n";
    JSONFormatter f(true);
    t->dump(&f);
    f.flush(*_dout);
    *_dout << dendl;

    logger->inc(l_paxos_collect);
    logger->inc(l_paxos_collect_keys, t->get_keys());
    logger->inc(l_paxos_collect_bytes, t->get_bytes());
    utime_t start = ceph_clock_now(NULL);

    get_store()->apply_transaction(t);

    utime_t end = ceph_clock_now(NULL);
    logger->tinc(l_paxos_collect_latency, end - start);
  } else {
    // don't accept!
    dout(10) << "NOT accepting pn " << collect->pn << " from " << collect->pn_from
	     << ", we already accepted " << accepted_pn
	     << " from " << accepted_pn_from << dendl;
  }
  last->pn = accepted_pn;
  last->pn_from = accepted_pn_from;

  // share whatever committed values we have
  if (collect->last_committed < last_committed)
    share_state(last, collect->first_committed, collect->last_committed);

  // do we have an accepted but uncommitted value?
  //  (it'll be at last_committed+1)
  bufferlist bl;
  
  /* note: if this peon has already replied with OP_ACCEPT, this branch is taken */
  if (collect->last_committed <= last_committed &&
      get_store()->exists(get_name(), last_committed+1)) {
    get_store()->get(get_name(), last_committed+1, bl);
    assert(bl.length() > 0);
    dout(10) << " sharing our accepted but uncommitted value for " 
	     << last_committed+1 << " (" << bl.length() << " bytes)" << dendl;
    last->values[last_committed+1] = bl;

    version_t v = get_store()->get(get_name(), "pending_v");
    version_t pn = get_store()->get(get_name(), "pending_pn");
    if (v && pn && v == last_committed + 1) {
      last->uncommitted_pn = pn;
    } else {
      // previously we didn't record which pn a value was accepted
      // under!  use the pn value we just had...  :(
      dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << previous_pn
	       << " and crossing our fingers" << dendl;
      last->uncommitted_pn = previous_pn;
    }

    logger->inc(l_paxos_collect_uncommitted);
  }

  // send reply
  collect->get_connection()->send_message(last);
}

Take a cluster of 196, 197 and 198 as an example; 196 is, as expected, the monitor leader. In this setup we shut down the mon on 197, and we see the following:


Node 196:
-------
2017-10-04 21:15:26.559490 7f36cefe9700 10 mon.oquew@0(leader).paxos(paxos updating c 1736921..1737442) begin for 1737443 25958 bytes
2017-10-04 21:15:26.559516 7f36cefe9700 30 mon.oquew@0(leader).paxos(paxos updating c 1736921..1737442) begin transaction dump:
{ "ops": [
        { "op_num": 0,
          "type": "PUT",
          "prefix": "paxos",
          "key": "1737443",
          "length": 25958},
        { "op_num": 1,
          "type": "PUT",
          "prefix": "paxos",
          "key": "pending_v",
          "length": 8},
        { "op_num": 2,
          "type": "PUT",
          "prefix": "paxos",
          "key": "pending_pn",
          "length": 8}],
  "num_keys": 3,
  "num_bytes": 26015}
bl dump:
bl dump:
{ "ops": [
        { "op_num": 0,
          "type": "PUT",
          "prefix": "logm",
          "key": "full_432632",
          "length": 15884},
        { "op_num": 1,
          "type": "PUT",
          "prefix": "logm",
          "key": "full_latest",
          "length": 8},
        { "op_num": 2,
          "type": "PUT",
          "prefix": "logm",
          "key": "432633",
          "length": 9882},
        { "op_num": 3,
          "type": "PUT",
          "prefix": "logm",
          "key": "last_committed",
          "length": 8}],
  "num_keys": 4,
  "num_bytes": 25840}
2017-10-04 21:15:26.580022 7f36cefe9700 10 mon.oquew@0(leader).paxos(paxos updating c 1736921..1737442)  sending begin to mon.1
2017-10-04 21:15:26.580110 7f36cefe9700 10 mon.oquew@0(leader).paxos(paxos updating c 1736921..1737442)  sending begin to mon.2




2017-10-04 21:15:26.594622 7f36ce7e8700 10 mon.oquew@0(leader).paxos(paxos updating c 1736921..1737442) handle_accept paxos(accept lc 1737442 fc 0 pn 1100 opn 0) v3
2017-10-04 21:15:26.594631 7f36ce7e8700 10 mon.oquew@0(leader).paxos(paxos updating c 1736921..1737442)  now 0,2 have accepted


2017-10-04 21:15:40.996887 7f36cefe9700 10 mon.oquew@0(electing) e3 win_election epoch 26 quorum 0,2 features 211106232532991
2017-10-04 21:15:40.996955 7f36cefe9700 10 mon.oquew@0(leader).paxos(paxos recovering c 1736921..1737442) leader_init -- starting paxos recovery
2017-10-04 21:15:40.997144 7f36cefe9700 10 mon.oquew@0(leader).paxos(paxos recovering c 1736921..1737442) learned uncommitted 1737443 pn 1100 (25958 bytes) from myself
2017-10-04 21:15:40.997172 7f36cefe9700 30 mon.oquew@0(leader).paxos(paxos recovering c 1736921..1737442) get_new_proposal_number transaction dump:
{ "ops": [
        { "op_num": 0,
          "type": "PUT",
          "prefix": "paxos",
          "key": "last_pn",
          "length": 8}],
  "num_keys": 1,
  "num_bytes": 20}
2017-10-04 21:15:41.000424 7f36cefe9700 10 mon.oquew@0(leader).paxos(paxos recovering c 1736921..1737442) get_new_proposal_number = 1200
2017-10-04 21:15:41.000456 7f36cefe9700 10 mon.oquew@0(leader).paxos(paxos recovering c 1736921..1737442) collect with pn 1200






Node 198
---------

2017-10-04 21:15:41.042089 7f7c043e3700 10 mon.yvmjl@2(peon).paxos(paxos recovering c 1736921..1737442) handle_collect paxos(collect lc 1737442 fc 1736921 pn 1200 opn 0) v3
2017-10-04 21:15:41.042094 7f7c043e3700 10 mon.yvmjl@2(peon).paxos(paxos recovering c 1736921..1737442) accepting pn 1200 from 0
2017-10-04 21:15:41.042101 7f7c043e3700 30 mon.yvmjl@2(peon).paxos(paxos recovering c 1736921..1737442) handle_collect transaction dump:
{ "ops": [
        { "op_num": 0,
          "type": "PUT",
          "prefix": "paxos",
          "key": "accepted_pn",
          "length": 8}],
  "num_keys": 1,
  "num_bytes": 24}
2017-10-04 21:15:41.046361 7f7c043e3700 10 mon.yvmjl@2(peon).paxos(paxos recovering c 1736921..1737442)  sharing our accepted but uncommitted value for 1737443 (25958 bytes)


Note that proposal 1737443 has been issued and two OP_ACCEPTs have arrived, from ranks 0 and 2: rank 0 is the monitor leader itself, and rank 2 is the OP_ACCEPT sent by 198. Rank 1 is the mon on 197, which is down, so its OP_ACCEPT never comes. When 196 is re-elected leader, it sends OP_COLLECT to 198. 198 accepts the new PN 1200 (it was 1100 before), but in its OP_LAST message it tells the monitor leader that it once received proposal 1737443, which it has accepted but not yet committed.

So what happens when the monitor leader receives this message?

  if (last->pn > accepted_pn) {
    // no, try again.
    dout(10) << " they had a higher pn than us, picking a new one." << dendl;

    // cancel timeout event
    mon->timer.cancel_event(collect_timeout_event);
    collect_timeout_event = 0;

    collect(last->pn);
  } else if (last->pn == accepted_pn) {
  
    /* in the scenario we constructed, this branch is taken */
    // yes, they accepted our pn.  great.
    num_last++;
    dout(10) << " they accepted our pn, we now have " 
	     << num_last << " peons" << dendl;

    
    /* record the uncommitted triple we received */
    if (last->uncommitted_pn) {
      if (last->uncommitted_pn >= uncommitted_pn &&
	       last->last_committed >= last_committed &&
	       last->last_committed + 1 >= uncommitted_v) {
	         uncommitted_v = last->last_committed+1;
	         uncommitted_pn = last->uncommitted_pn;
	         uncommitted_value = last->values[uncommitted_v];
	         dout(10) << "we learned an uncommitted value for " << uncommitted_v
	                  << " pn " << uncommitted_pn
	                  << " " << uncommitted_value.length() << " bytes"
	                  << dendl;
      } else {
        dout(10) << "ignoring uncommitted value for " << (last->last_committed+1)
                 << " pn " << last->uncommitted_pn
                 << " " << last->values[last->last_committed+1].length() << " bytes"
                 << dendl;
      }
    }
    
    /* once replies have been gathered from every peon in the quorum */
    if (num_last == mon->get_quorum().size()) {
      // cancel timeout event
      mon->timer.cancel_event(collect_timeout_event);
      collect_timeout_event = 0;
      peer_first_committed.clear();
      peer_last_committed.clear();

      // almost...

      /* if the uncommitted version equals last_committed+1 */
      if (uncommitted_v == last_committed+1 &&
          uncommitted_value.length()) {
          dout(10) << "that's everyone.  begin on old learned value" << dendl;
          
          /* note the next two lines: for case 2, the leader re-runs begin on the unfinished proposal, i.e. proposes it again to make sure it completes,
           * but in state STATE_UPDATING_PREVIOUS, i.e. finishing the previous round */
          state = STATE_UPDATING_PREVIOUS;
          begin(uncommitted_value);
      } else {
      // active!
      dout(10) << "that's everyone.  active!" << dendl;
      extend_lease();
      
      need_refresh = false;
      if (do_refresh()) {
        finish_round();
      }
     }
   }
 } else {
    // no, this is an old message, discard
    dout(10) << "old pn, ignoring" << dendl;
  }

Note that regardless of whether some peon had already replied with OP_ACCEPT, the unfinished proposal is issued again through the begin function:

  • If not a single OP_ACCEPT was received, the monitor leader has recorded the uncommitted triple itself and does not need to learn the proposal from any peon.
  • If some OP_ACCEPT was received, the corresponding peon will naturally report the uncommitted triple to the monitor leader in its OP_LAST message.

Either way, the monitor leader ends up executing begin inside handle_last, finishing the previous round's unfinished proposal.

2017-10-04 21:15:41.038753 7f36ce7e8700 10 mon.oquew@0(leader).paxos(paxos recovering c 1736921..1737442) handle_last paxos(last lc 1737442 fc 1736921 pn 1200 opn 1100) v3
2017-10-04 21:15:41.038759 7f36ce7e8700 10 mon.oquew@0(leader).paxos(paxos recovering c 1736921..1737442) store_state nothing to commit
2017-10-04 21:15:41.038824 7f36ce7e8700 10 mon.oquew@0(leader).paxos(paxos recovering c 1736921..1737442)  they accepted our pn, we now have 2 peons
2017-10-04 21:15:41.038835 7f36ce7e8700 10 mon.oquew@0(leader).paxos(paxos recovering c 1736921..1737442) we learned an uncommitted value for 1737443 pn 1100 25958 bytes
2017-10-04 21:15:41.038843 7f36ce7e8700 10 mon.oquew@0(leader).paxos(paxos recovering c 1736921..1737442) that's everyone.  begin on old learned value
2017-10-04 21:15:41.038848 7f36ce7e8700 10 mon.oquew@0(leader).paxos(paxos updating-previous c 1736921..1737442) begin for 1737443 25958 bytes
2017-10-04 21:15:41.038868 7f36ce7e8700 30 mon.oquew@0(leader).paxos(paxos updating-previous c 1736921..1737442) begin transaction dump:
{ "ops": [
        { "op_num": 0,
          "type": "PUT",
          "prefix": "paxos",
          "key": "1737443",
          "length": 25958},
        { "op_num": 1,
          "type": "PUT",
          "prefix": "paxos",
          "key": "pending_v",
          "length": 8},
        { "op_num": 2,
          "type": "PUT",
          "prefix": "paxos",
          "key": "pending_pn",
          "length": 8}],
  "num_keys": 3,
  "num_bytes": 26015}
bl dump:
{ "ops": [
        { "op_num": 0,
          "type": "PUT",
          "prefix": "logm",
          "key": "full_432632",
          "length": 15884},
        { "op_num": 1,
          "type": "PUT",
          "prefix": "logm",
          "key": "full_latest",
          "length": 8},
        { "op_num": 2,
          "type": "PUT",
          "prefix": "logm",
          "key": "432633",
          "length": 9882},
        { "op_num": 3,
          "type": "PUT",
          "prefix": "logm",
          "key": "last_committed",
          "length": 8}],
  "num_keys": 4,
  "num_bytes": 25840}

2017-10-04 21:15:41.057345 7f36ce7e8700 10 mon.oquew@0(leader).paxos(paxos updating-previous c 1736921..1737442)  sending begin to mon.2

It took quite a few pages, but that completes case 2 of the peon-down scenario. Now consider cases 3 and 4:

3. The leader is in the writing state: all accept messages have been received, commit_start has begun executing, and the transaction is queued for execution.
4. The leader is in the writing state and the write has already finished: the transaction has taken effect, but the callback (commit_finish) has not run yet (it cannot run until it acquires the monitor lock).

Note that in cases 3 and 4, the new election waits until the data already in the writing state has finished committing:

void Monitor::wait_for_paxos_write()
{
  if (paxos->is_writing() || paxos->is_writing_previous()) {
    dout(10) << __func__ << " flushing pending write" << dendl;
    lock.Unlock();
    store->flush();
    lock.Lock();
    dout(10) << __func__ << " flushed pending write" << dendl;
  }
}

void Monitor::bootstrap()
{
  dout(10) << "bootstrap" << dendl;
  wait_for_paxos_write();
  ...
  
}

void Monitor::start_election()
{
  dout(10) << "start_election" << dendl;
  wait_for_paxos_write();
  ...
}

In cases 3 and 4, Paxos is in the writing or writing_previous state. In that case store->flush() is run, ensuring that the data already in the writing state finishes committing, and only then does the election start.

As for the peons, whether or not they managed to commit, the leader has already completed the commit. In the handle_last stage:

 for (map<int,version_t>::iterator p = peer_last_committed.begin();
       p != peer_last_committed.end();
       ++p) {
    if (p->second + 1 < first_committed && first_committed > 1) {
      dout(5) << __func__
	      << " peon " << p->first
	      << " last_committed (" << p->second
	      << ") is too low for our first_committed (" << first_committed
	      << ") -- bootstrap!" << dendl;
      op->mark_paxos_event("need to bootstrap");
      mon->bootstrap();
      return;
    }
    
    /* for cases 3 and 4, the mon leader can share the parts a peon is missing, so the peon can commit them */
    if (p->second < last_committed) {
      // share committed values
      dout(10) << " sending commit to mon." << p->first << dendl;
      MMonPaxos *commit = new MMonPaxos(mon->get_epoch(),
					MMonPaxos::OP_COMMIT,
					ceph_clock_now(g_ceph_context));
      share_state(commit, peer_first_committed[p->first], p->second);
      mon->messenger->send_message(commit, mon->monmap->get_inst(p->first));
    }
  }

Let us look at the logs for cases 3 and 4. Here 197 is the peon that is down, and 198 is a healthy peon that did not get to commit in time. The leader notices that 198 is missing commit 1743405, packs the missing part into a message via share_state, and sends it to 198, i.e. mon.2:



2017-10-04 22:05:44.680463 7f36ce7e8700 10 mon.oquew@0(leader).paxos(paxos recovering c 1742694..1743405) handle_last paxos(last lc 1743404 fc 1742694 pn 1300 opn 0) v3
2017-10-04 22:05:44.680481 7f36ce7e8700 10 mon.oquew@0(leader).paxos(paxos recovering c 1742694..1743405) store_state nothing to commit

/* the leader shares the missing commit with mon.2 (198) */
2017-10-04 22:05:44.680556 7f36ce7e8700 10 mon.oquew@0(leader).paxos(paxos recovering c 1742694..1743405)  sending commit to mon.2
2017-10-04 22:05:44.680568 7f36ce7e8700 10 mon.oquew@0(leader).paxos(paxos recovering c 1742694..1743405) share_state peer has fc 1742694 lc 1743404
2017-10-04 22:05:44.680639 7f36ce7e8700 10 mon.oquew@0(leader).paxos(paxos recovering c 1742694..1743405)  sharing 1743405 (133 bytes)
2017-10-04 22:05:44.680730 7f36ce7e8700 10 mon.oquew@0(leader).paxos(paxos recovering c 1742694..1743405)  they accepted our pn, we now have 2 peons
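
For reference, share_state works roughly as follows. This is a simplified sketch inferred from how it is called above, not the verbatim Ceph source: it copies every committed version in the range the peer is missing out of the store and into the outgoing message, so the peon can store and commit them on receipt.

// Simplified sketch of share_state (assumes the get_store()/get_name()
// helpers seen in the excerpts above; details may differ from the real code).
void Paxos::share_state(MMonPaxos *m, version_t peer_first_committed,
                        version_t peer_last_committed)
{
  assert(peer_last_committed < last_committed);

  // pack every version the peer lacks: (peer_last_committed, last_committed]
  for (version_t v = peer_last_committed + 1; v <= last_committed; ++v) {
    if (get_store()->exists(get_name(), v))
      get_store()->get(get_name(), v, m->values[v]);
  }

  m->last_committed = last_committed;
}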

Peon up

The four cases above describe what can happen after a peon goes down. What happens when the downed peon comes back up?

Because the peon was down for a long time, a lot of its information is stale, so on startup it goes through a sync. That synchronization is not done via collect -> handle_collect -> handle_last; instead, when the peon starts it calls sync_start to kick off data synchronization and enters the STATE_SYNCHRONIZING state. We will not expand on that here.

Once the data sync finishes, sync_finish is called, which bootstraps again and so triggers another election; the original leader wins as before.

Leader Down

The leader may die in any Paxos function, at any point. In the new election, the peon with the smallest rank becomes the new leader. As before, we look at how the cluster returns to consistency in two parts: the leader going down, and the leader coming back up.

Down

The peons call a new election after the lease times out. A peon may be interrupted in either the active or the updating state, and different peons need not be in the same state; some may be active and some updating:

  • The leader goes down while in the active state: no special handling is needed.
  • The leader goes down while in the updating state: if no peon has accepted, nothing special is needed; if some peon has accepted, the new leader either has accepted the value itself or learns it from another peon, and will re-propose it.
  • The leader goes down while in the writing state: every peon has already accepted, so the new leader re-proposes the accepted value (the downed leader may or may not have completed its own write).
  • The leader goes down while in the refresh state: the downed leader has already completed the write. If some peon has already received the commit message, the new leader learns about the new commit during the collect phase; if no peon received it, the value is re-proposed.

In case 2, if some peons have already accepted, each such peon sends the uncommitted triple to the new leader in handle_collect; alternatively, the new leader itself may have accepted, in which case it recovers the uncommitted triple from its own store. Either way, it then calls begin to re-propose it.

    /* record the uncommitted triple we received */
    if (last->uncommitted_pn) {
      if (last->uncommitted_pn >= uncommitted_pn &&
	       last->last_committed >= last_committed &&
	       last->last_committed + 1 >= uncommitted_v) {
	         uncommitted_v = last->last_committed+1;
	         uncommitted_pn = last->uncommitted_pn;
	         uncommitted_value = last->values[uncommitted_v];
	         dout(10) << "we learned an uncommitted value for " << uncommitted_v
	                  << " pn " << uncommitted_pn
	                  << " " << uncommitted_value.length() << " bytes"
	                  << dendl;
      } else {
        dout(10) << "ignoring uncommitted value for " << (last->last_committed+1)
                 << " pn " << last->uncommitted_pn
                 << " " << last->values[last->last_committed+1].length() << " bytes"
                 << dendl;
      }
    }
    
    /* once replies have been gathered from every peon in the quorum */
    if (num_last == mon->get_quorum().size()) {
      // cancel timeout event
      mon->timer.cancel_event(collect_timeout_event);
      collect_timeout_event = 0;
      peer_first_committed.clear();
      peer_last_committed.clear();

      // almost...

      /* if the uncommitted version equals last_committed+1 */
      if (uncommitted_v == last_committed+1 &&
          uncommitted_value.length()) {
          dout(10) << "that's everyone.  begin on old learned value" << dendl;
          
          /* note the next two lines: for case 2, the leader re-runs begin on the unfinished proposal, i.e. proposes it again to make sure it completes,
           * but in state STATE_UPDATING_PREVIOUS, i.e. finishing the previous round */
          state = STATE_UPDATING_PREVIOUS;
          begin(uncommitted_value);
      }
      
      ....
      

For case 3, exactly as in case 2, the value is re-proposed via the following code:

      if (uncommitted_v == last_committed+1 &&
          uncommitted_value.length()) {
          dout(10) << "that's everyone.  begin on old learned value" << dendl;
          state = STATE_UPDATING_PREVIOUS;
          begin(uncommitted_value);
      }
      

Case 4 is slightly more complicated, because it is not certain whether any peon managed to commit. If no peon committed, it works like cases 2 and 3: the value is re-proposed. If a commit did happen somewhere, the new leader learns about it from that peon during collect, and at the same time shares the missing parts with the other peons via share_state.

Up

After the old leader comes back up, it may already do one sync during the probing phase, which brings part of its data up to date. It is then elected leader again, and the collect phase synchronizes the few versions that still differ; at the same time, if a peon has uncommitted data, that data is also sent to the leader, and the new leader re-proposes it.

The only subtle point is the uncommitted data that existed when the leader went down. As the cases above show, if any peon had accepted it, it will have been re-proposed. When the old leader comes back up, its stale pending data is discarded on the basis of pending_v, whose version is now too low. And if the old leader had already committed it, the peons must have committed it as well, so no inconsistency can arise.
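
The check that discards the stale pending data is essentially the pending_v condition we already saw in collect() and handle_collect(). As a standalone sketch of just that condition (my own code, not the Ceph source):

#include <cstdint>

using version_t = uint64_t;

// A stashed pending value only counts as a live uncommitted value if it sits
// exactly at last_committed + 1; anything older has been superseded and is ignored.
bool pending_is_live(version_t pending_v, version_t pending_pn,
                     version_t last_committed) {
  return pending_v != 0 && pending_pn != 0 && pending_v == last_committed + 1;
}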

Since the previous scenario was already analyzed in detail at the code level, we will not walk through the leader-down case in full.

Closing

Note that this article leans heavily on the first reference; I essentially followed its map. I have no intention of plagiarizing that article. It is just that its author writes from a great height and many points are stated very tersely, so beginners may not grasp what is meant. This article expands on some of that material and pairs it with code and log output to help beginners understand it better.

Reference 2 is also an excellent article, but without analyzing the failures that can occur, Phase 1 is easily understood only at the surface level, and the code ends up being read like a running log.

References

  1. Ceph Monitor Paxos
  2. Ceph的Paxos源码注释 - Phase 1 (annotated Ceph Paxos source, Phase 1)
