Preface
The previous article walked through the normal flow of a proposal being accepted. Even with that flow covered, a few nagging questions remain.
What on earth is accepted_pn?
In the monitor leader's begin function:
t->put(get_name(), last_committed+1, new_value);
// note which pn this pending value is for.
t->put(get_name(), "pending_v", last_committed + 1);
t->put(get_name(), "pending_pn", accepted_pn);
In the Peon's handle_begin function:
t->put(get_name(), v, begin->values[v]);
// note which pn this pending value is for.
t->put(get_name(), "pending_v", v);
t->put(get_name(), "pending_pn", accepted_pn);
Encoding the proposal makes sense: the commit phase has to decode this bufferlist and apply the transaction, which is easy enough to understand. But what are the last two lines for? What exactly are pending_v and pending_pn? They never seem to come up again later, so it is not obvious what setting them accomplishes.
This logic is actually there for recovery. In the normal case it is never used, but when a failure occurs, the Paxos recovery logic needs exactly this information.
Basic Concepts
- PN (Proposal Number)
After a Leader is elected, it runs Phase 1 once to establish a PN; for as long as it remains Leader, all Phase 2 rounds share that single PN, so a large number of Phase 1 rounds are skipped. This is also why this form of Paxos can reduce network overhead.
A newly chosen leader executes phase 1 for infinitely many instances of the consensus algorithm
-- "Paxos Made Simple"
- Version
A version can be understood as the Instance ID in Paxos. Each application-level proposal can be encoded into a binary byte stream that serves as the value, while the version (i.e. the Instance ID) is the key associated with that value.
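As a rough mental model (a sketch with a hypothetical in-memory stand-in, not the actual Ceph types), each Paxos instance is just a key/value pair in the monitor's store: the version is the key and the encoded transaction is the value.
#include <cstdint>
#include <map>
#include <utility>
#include <vector>
using version_t = uint64_t;
using buffer_t  = std::vector<uint8_t>;   // stand-in for Ceph's bufferlist
// Hypothetical picture of the "paxos" prefix in the MonitorDBStore:
// the version (Paxos instance ID) maps to the encoded proposal.
struct PaxosLogSketch {
  std::map<version_t, buffer_t> values;   // e.g. 1737443 -> encoded txn
  version_t first_committed = 0;          // oldest version still kept
  version_t last_committed  = 0;          // newest committed version
  void commit(version_t v, buffer_t encoded_txn) {
    values[v] = std::move(encoded_txn);
    last_committed = v;
  }
};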
The state that must be persisted includes:
Name | Meaning | Notes |
---|---|---|
last_pn | The PN generated the last time this node was elected leader | Used by get_new_proposal_number(); the next election continues from it |
accepted_pn | The PN this node has accepted, possibly proposed by another leader | A peon uses this value to reject smaller PNs |
first_committed | The first committed version still recorded on this node | Earlier versions (log entries) are no longer kept on this node |
last_committed | The most recent committed version recorded on this node | There may be at most one later, uncommitted version |
uncommitted_v | The uncommitted version recorded on this node; if present, it can only be last_committed+1 | Ceph allows only one uncommitted version |
uncommitted_pn | The PN corresponding to the uncommitted version | Recorded in the same transaction as uncommitted_v and uncommitted_value |
uncommitted_value | The content of the uncommitted version | Recorded in the same transaction as uncommitted_v and uncommitted_pn |
Note that the three values prefixed with "uncommitted" may not exist at all, for example after a clean shutdown in which everything was committed.
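As an illustration of how this persisted state comes back at startup or recovery, the sketch below uses the same get_store()->get()/exists() calls that appear later in collect(); treat it as a rough outline rather than a verbatim copy of the Ceph code.
// Sketch: reading the persisted Paxos state back from the MonitorDBStore.
version_t first_committed = get_store()->get(get_name(), "first_committed");
version_t last_committed  = get_store()->get(get_name(), "last_committed");
version_t accepted_pn     = get_store()->get(get_name(), "accepted_pn");
version_t last_pn         = get_store()->get(get_name(), "last_pn");
// The "uncommitted" triple may be absent; it is reconstructed from the value
// stored at last_committed+1 plus the pending_v/pending_pn markers written
// in begin()/handle_begin().
if (get_store()->exists(get_name(), last_committed + 1)) {
  version_t pending_v  = get_store()->get(get_name(), "pending_v");
  version_t pending_pn = get_store()->get(get_name(), "pending_pn");
  // if pending_v == last_committed + 1, the stored value is the
  // uncommitted proposal and pending_pn is the PN it was accepted under
}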
With these basics in place, we need to start thinking about failures. In fact, chronologically this article should have come first: the cluster's monitors must first reach a consistent state before they can methodically carry out the steps described in the previous article.
From a learning standpoint, however, the previous article covers the main path of Paxos, which runs countless times every day, whereas bringing the Ceph monitors back to a consistent state is the exception path, taken only when something goes wrong. That is why we chose to present the normal flow first, and only then the failures and how the cluster recovers from them to a consistent state.
Note that once the Leader election succeeds, collect is called. The name may look odd at first, but it is apt: all sorts of failures may have happened, and now that a new leader has been elected, it collects each member's information and then brings every member to a consistent state.
Without a clear picture of which failures can occur, reading collect, handle_collect and handle_last like a running log makes it hard to see why the code is written the way it is, or why the cluster reaches consistency after just these few steps.
So below we start from the failures: which kinds can arise, and how the cluster recovers from each.
Recovery
Once the mon leader has been elected, it enters STATE_RECOVERING and calls the collect function to gather the peons' information, so that leader and peons can exchange what each is missing and reach agreement.
void Paxos::leader_init()
{
cancel_events();
new_value.clear();
finish_contexts(g_ceph_context, proposals, -EAGAIN);
logger->inc(l_paxos_start_leader);
if (mon->get_quorum().size() == 1) {
state = STATE_ACTIVE;
return;
}
/* enter the recovering state */
state = STATE_RECOVERING;
lease_expire = utime_t();
dout(10) << "leader_init -- starting paxos recovery" << dendl;
/* call collect to enter phase 1 */
collect(0);
}
Note that collect generates a new PN (Proposal Number). This number must be globally unique and monotonically increasing. With many nodes in the cluster, and the mon leader itself subject to change, how are these two properties guaranteed?
version_t Paxos::get_new_proposal_number(version_t gt)
{
if (last_pn < gt)
last_pn = gt;
// update. make it unique among all monitors.
/* the core of the algorithm is the next four lines */
last_pn /= 100;
last_pn++;
last_pn *= 100;
last_pn += (version_t)mon->rank;
// write
MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
t->put(get_name(), "last_pn", last_pn);
dout(30) << __func__ << " transaction dump:\n";
JSONFormatter f(true);
t->dump(&f);
f.flush(*_dout);
*_dout << dendl;
logger->inc(l_paxos_new_pn);
utime_t start = ceph_clock_now(NULL);
get_store()->apply_transaction(t);
utime_t end = ceph_clock_now(NULL);
logger->tinc(l_paxos_new_pn_latency, end - start);
dout(10) << "get_new_proposal_number = " << last_pn << dendl;
return last_pn;
}
Take the previous value, round it down to a multiple of 100, add 100, and then add this mon's rank. For example, with ranks 0, 1, 2 and an initial PN of 100: whenever an election is triggered, monitor 0 always wins if it is present, so the next PN = (100/100 + 1)*100 + 0 = 200. If the current PN is 200 and another monitor election happens, but this time monitor 0 is absent (it has failed), monitor 1 wins and the new PN is (200/100 + 1)*100 + 1 = 301. If monitor 0 then comes back up and wins again, the new PN is (301/100 + 1)*100 + 0 = 400.
Note that this value is updated only once, in collect, right after the leader election completes. Once agreement is reached, many proposals may follow, but the PN does not change.
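To make the arithmetic concrete, here is a small self-contained sketch (a hypothetical helper, not Ceph code) that reproduces the 100 → 200 → 301 → 400 sequence above:
#include <algorithm>
#include <cassert>
#include <cstdint>
using version_t = uint64_t;
// Same arithmetic as Paxos::get_new_proposal_number(), minus the persistence:
// round down to a multiple of 100, bump by 100, then stamp in the rank.
version_t next_pn(version_t last_pn, version_t gt, int rank) {
  last_pn = std::max(last_pn, gt);
  last_pn /= 100;
  last_pn++;
  last_pn *= 100;
  last_pn += static_cast<version_t>(rank);
  return last_pn;
}
int main() {
  assert(next_pn(100, 0, 0) == 200);  // monitor 0 wins again
  assert(next_pn(200, 0, 1) == 301);  // monitor 0 is down, monitor 1 wins
  assert(next_pn(301, 0, 0) == 400);  // monitor 0 comes back and wins
  return 0;
}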
Step | Leader | Peon | Description |
---|---|---|---|
1 | collect() => | | The Leader sends its PN and some accompanying information to every peon in the quorum, asking each peon to report its own state |
2 | | <= handle_collect() | Each Peon accepts or rejects the PN, and along the way may share data it has already committed |
3 | handle_last() | | Only if every peon in the quorum accepts the leader's PN does this round succeed. Based on the peons' information and its own, this function either re-proposes the uncommitted value or shares whatever a member is missing, so that all members end up consistent. |
The discussion below is split into two cases, depending on whether the mon leader or a Peon goes down.
Peon down
If a Peon goes down, the Leader will detect it.
First, there is the lease mechanism:
void Paxos::lease_ack_timeout()
{
dout(1) << "lease_ack_timeout -- calling new election" << dendl;
assert(mon->is_leader());
assert(is_active());
logger->inc(l_paxos_lease_ack_timeout);
lease_ack_timeout_event = 0;
mon->bootstrap();
}
Second, if OP_BEGIN has been sent and a peon, being down, cannot reply with OP_ACCEPT, the following is triggered:
void Paxos::accept_timeout()
{
dout(1) << "accept timeout, calling fresh election" << dendl;
accept_timeout_event = 0;
assert(mon->is_leader());
assert(is_updating() || is_updating_previous() || is_writing() ||
is_writing_previous());
logger->inc(l_paxos_accept_timeout);
mon->bootstrap();
}
In either case, bootstrap forces a new election. After the election the original Leader is still the Leader, and at that point the collect function is called.
We discuss this in two stages: Peon Down and Peon Up.
Peon Down
Note that collect generates a new PN:
accepted_pn = get_new_proposal_number(MAX(accepted_pn, oldpn));
accepted_pn_from = last_committed;
A Peon going down means the Leader has remained intact all along, and after re-election the Leader node does not change. It follows that no Peon's data can be newer than the Leader's:
- last_committed(leader) >= last_committed(peon)
- accepted_pn(leader) > accepted_pn(peon)
The second holds because the Leader generates a new PN in collect, so the leader's accepted_pn is greater than every Peon's accepted_pn.
Timeout events run in the timer thread, and the timer thread takes the monitor lock while doing its work. From this we can infer that the leader's Paxos flow can only be interrupted at the following points:
- The Leader is in the active state and has not started any proposal
- The leader is in the updating state: begin has already executed and it is waiting for accepts; the leader holds uncommitted data and may already have received some accept messages
- The leader is in the writing state: all accept messages have been received, commit_start has begun executing, and the transaction has been queued for execution
- The leader is in the writing state but the write has already finished: the transaction has taken effect, only the callback (commit_finish) has not run yet (because the callback must first acquire the monitor lock)
Cases 3 and 4 can occur because the Leader commits asynchronously:
get_store()->queue_transaction(t, new C_Committed(this));
struct C_Committed : public Context {
Paxos *paxos;
explicit C_Committed(Paxos *p) : paxos(p) {}
void finish(int r) {
assert(r >= 0);
Mutex::Locker l(paxos->mon->lock);
paxos->commit_finish();
}
};
Once commit_finish starts executing, it holds the monitor lock (paxos->mon->lock). The leader cannot be interrupted in the refresh state, because once commit_finish starts running it carries the refresh state through to completion and returns to active before the timer thread gets a chance to acquire the lock and run.
Case 1 needs no handling: no proposal is in flight, so there is nothing to do. In case 2, uncommitted data exists and the Leader restarts a propose round. How is that done?
Note that the comments below consider only case 2 of the Peon-down scenario, i.e. the Leader has already issued begin, is waiting for OP_ACCEPT messages, and may have received some of them.
void Paxos::collect(version_t oldpn)
{
// we're recoverying, it seems!
state = STATE_RECOVERING;
assert(mon->is_leader());
/* uncommitted_v, uncommitted_pn and uncommitted_value form a triple.
 * collect will also gather data from the other Peons, so they are reset here. */
uncommitted_v = 0;
uncommitted_pn = 0;
uncommitted_value.clear();
peer_first_committed.clear();
peer_last_committed.clear();
/* Note: in case 2 the Leader itself also holds uncommitted data, so this block recovers the not-yet-committed proposal:
 * the previous round's PN goes into uncommitted_pn,
 * the previous proposal's Instance ID goes into uncommitted_v,
 * and the previous proposal's value goes into uncommitted_value. */
if (get_store()->exists(get_name(), last_committed+1)) {
version_t v = get_store()->get(get_name(), "pending_v");
version_t pn = get_store()->get(get_name(), "pending_pn");
if (v && pn && v == last_committed + 1) {
uncommitted_pn = pn;
} else {
dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << accepted_pn
<< " and crossing our fingers" << dendl;
uncommitted_pn = accepted_pn;
}
uncommitted_v = last_committed+1;
get_store()->get(get_name(), last_committed+1, uncommitted_value);
assert(uncommitted_value.length());
dout(10) << "learned uncommitted " << (last_committed+1)
<< " pn " << uncommitted_pn
<< " (" << uncommitted_value.length() << " bytes) from myself"
<< dendl;
logger->inc(l_paxos_collect_uncommitted);
}
/* generate a new PN, guaranteed to be larger than any PN accepted so far */
accepted_pn = get_new_proposal_number(MAX(accepted_pn, oldpn));
accepted_pn_from = last_committed;
num_last = 1;
dout(10) << "collect with pn " << accepted_pn << dendl;
// send collect
for (set<int>::const_iterator p = mon->get_quorum().begin();
p != mon->get_quorum().end();
++p) {
if (*p == mon->rank) continue;
/* send OP_COLLECT to the other nodes to gather their information and bring the cluster back to a consistent state */
MMonPaxos *collect = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COLLECT,
ceph_clock_now(g_ceph_context));
collect->last_committed = last_committed;
collect->first_committed = first_committed;
collect->pn = accepted_pn;
mon->messenger->send_message(collect, mon->monmap->get_inst(*p));
}
// set timeout event
collect_timeout_event = new C_MonContext(mon, [this](int r) {
if (r == -ECANCELED)
return;
collect_timeout();
});
mon->timer.add_event_after(g_conf->mon_accept_timeout_factor *
g_conf->mon_lease,
collect_timeout_event);
}
Note that in this situation every other Peon's accepted_pn is necessarily smaller than the newly generated PN, i.e. the PN carried in the OP_COLLECT message. Let us look at how the Peons react:
void Paxos::handle_collect(MonOpRequestRef op)
{
op->mark_paxos_event("handle_collect");
MMonPaxos *collect = static_cast<MMonPaxos*>(op->get_req());
dout(10) << "handle_collect " << *collect << dendl;
assert(mon->is_peon()); // mon epoch filter should catch strays
// we're recoverying, it seems!
state = STATE_RECOVERING;
/* this cannot happen in the scenario we are restricting ourselves to */
if (collect->first_committed > last_committed+1) {
dout(2) << __func__
<< " leader's lowest version is too high for our last committed"
<< " (theirs: " << collect->first_committed
<< "; ours: " << last_committed << ") -- bootstrap!" << dendl;
op->mark_paxos_event("need to bootstrap");
mon->bootstrap();
return;
}
/* reply with an OP_LAST message carrying our own last_committed and first_committed */
MMonPaxos *last = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LAST,
ceph_clock_now(g_ceph_context));
last->last_committed = last_committed;
last->first_committed = first_committed;
version_t previous_pn = accepted_pn;
/* note: collect->pn was freshly generated by the (unchanged) leader after the election, so it must be larger than the peon's accepted_pn */
if (collect->pn > accepted_pn) {
// ok, accept it
accepted_pn = collect->pn;
accepted_pn_from = collect->pn_from;
dout(10) << "accepting pn " << accepted_pn << " from "
<< accepted_pn_from << dendl;
MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
t->put(get_name(), "accepted_pn", accepted_pn);
dout(30) << __func__ << " transaction dump:\n";
JSONFormatter f(true);
t->dump(&f);
f.flush(*_dout);
*_dout << dendl;
logger->inc(l_paxos_collect);
logger->inc(l_paxos_collect_keys, t->get_keys());
logger->inc(l_paxos_collect_bytes, t->get_bytes());
utime_t start = ceph_clock_now(NULL);
get_store()->apply_transaction(t);
utime_t end = ceph_clock_now(NULL);
logger->tinc(l_paxos_collect_latency, end - start);
} else {
// don't accept!
dout(10) << "NOT accepting pn " << collect->pn << " from " << collect->pn_from
<< ", we already accepted " << accepted_pn
<< " from " << accepted_pn_from << dendl;
}
last->pn = accepted_pn;
last->pn_from = accepted_pn_from;
// share whatever committed values we have
if (collect->last_committed < last_committed)
share_state(last, collect->first_committed, collect->last_committed);
// do we have an accepted but uncommitted value?
// (it'll be at last_committed+1)
bufferlist bl;
/* note: if this Peon has already replied with OP_ACCEPT, this branch is taken */
if (collect->last_committed <= last_committed &&
get_store()->exists(get_name(), last_committed+1)) {
get_store()->get(get_name(), last_committed+1, bl);
assert(bl.length() > 0);
dout(10) << " sharing our accepted but uncommitted value for "
<< last_committed+1 << " (" << bl.length() << " bytes)" << dendl;
last->values[last_committed+1] = bl;
version_t v = get_store()->get(get_name(), "pending_v");
version_t pn = get_store()->get(get_name(), "pending_pn");
if (v && pn && v == last_committed + 1) {
last->uncommitted_pn = pn;
} else {
// previously we didn't record which pn a value was accepted
// under! use the pn value we just had... :(
dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << previous_pn
<< " and crossing our fingers" << dendl;
last->uncommitted_pn = previous_pn;
}
logger->inc(l_paxos_collect_uncommitted);
}
// send reply
collect->get_connection()->send_message(last);
}
Take a cluster of nodes 196, 197 and 198 as an example; 196 is, of course, the monitor leader. In this scenario we shut down the mon on 197, and we see:
Node 196:
-------
2017-10-04 21:15:26.559490 7f36cefe9700 10 mon.oquew@0(leader).paxos(paxos updating c 1736921..1737442) begin for 1737443 25958 bytes
2017-10-04 21:15:26.559516 7f36cefe9700 30 mon.oquew@0(leader).paxos(paxos updating c 1736921..1737442) begin transaction dump:
{ "ops": [
{ "op_num": 0,
"type": "PUT",
"prefix": "paxos",
"key": "1737443",
"length": 25958},
{ "op_num": 1,
"type": "PUT",
"prefix": "paxos",
"key": "pending_v",
"length": 8},
{ "op_num": 2,
"type": "PUT",
"prefix": "paxos",
"key": "pending_pn",
"length": 8}],
"num_keys": 3,
"num_bytes": 26015}
bl dump:
{ "ops": [
{ "op_num": 0,
"type": "PUT",
"prefix": "logm",
"key": "full_432632",
"length": 15884},
{ "op_num": 1,
"type": "PUT",
"prefix": "logm",
"key": "full_latest",
"length": 8},
{ "op_num": 2,
"type": "PUT",
"prefix": "logm",
"key": "432633",
"length": 9882},
{ "op_num": 3,
"type": "PUT",
"prefix": "logm",
"key": "last_committed",
"length": 8}],
"num_keys": 4,
"num_bytes": 25840}
2017-10-04 21:15:26.580022 7f36cefe9700 10 mon.oquew@0(leader).paxos(paxos updating c 1736921..1737442) sending begin to mon.1
2017-10-04 21:15:26.580110 7f36cefe9700 10 mon.oquew@0(leader).paxos(paxos updating c 1736921..1737442) sending begin to mon.2
2017-10-04 21:15:26.594622 7f36ce7e8700 10 mon.oquew@0(leader).paxos(paxos updating c 1736921..1737442) handle_accept paxos(accept lc 1737442 fc 0 pn 1100 opn 0) v3
2017-10-04 21:15:26.594631 7f36ce7e8700 10 mon.oquew@0(leader).paxos(paxos updating c 1736921..1737442) now 0,2 have accepted
2017-10-04 21:15:40.996887 7f36cefe9700 10 mon.oquew@0(electing) e3 win_election epoch 26 quorum 0,2 features 211106232532991
2017-10-04 21:15:40.996955 7f36cefe9700 10 mon.oquew@0(leader).paxos(paxos recovering c 1736921..1737442) leader_init -- starting paxos recovery
2017-10-04 21:15:40.997144 7f36cefe9700 10 mon.oquew@0(leader).paxos(paxos recovering c 1736921..1737442) learned uncommitted 1737443 pn 1100 (25958 bytes) from myself
2017-10-04 21:15:40.997172 7f36cefe9700 30 mon.oquew@0(leader).paxos(paxos recovering c 1736921..1737442) get_new_proposal_number transaction dump:
{ "ops": [
{ "op_num": 0,
"type": "PUT",
"prefix": "paxos",
"key": "last_pn",
"length": 8}],
"num_keys": 1,
"num_bytes": 20}
2017-10-04 21:15:41.000424 7f36cefe9700 10 mon.oquew@0(leader).paxos(paxos recovering c 1736921..1737442) get_new_proposal_number = 1200
2017-10-04 21:15:41.000456 7f36cefe9700 10 mon.oquew@0(leader).paxos(paxos recovering c 1736921..1737442) collect with pn 1200
Node 198:
---------
2017-10-04 21:15:41.042089 7f7c043e3700 10 mon.yvmjl@2(peon).paxos(paxos recovering c 1736921..1737442) handle_collect paxos(collect lc 1737442 fc 1736921 pn 1200 opn 0) v3
2017-10-04 21:15:41.042094 7f7c043e3700 10 mon.yvmjl@2(peon).paxos(paxos recovering c 1736921..1737442) accepting pn 1200 from 0
2017-10-04 21:15:41.042101 7f7c043e3700 30 mon.yvmjl@2(peon).paxos(paxos recovering c 1736921..1737442) handle_collect transaction dump:
{ "ops": [
{ "op_num": 0,
"type": "PUT",
"prefix": "paxos",
"key": "accepted_pn",
"length": 8}],
"num_keys": 1,
"num_bytes": 24}
2017-10-04 21:15:41.046361 7f7c043e3700 10 mon.yvmjl@2(peon).paxos(paxos recovering c 1736921..1737442) sharing our accepted but uncommitted value for 1737443 (25958 bytes)
Note that proposal 1737443 had already been initiated and two OP_ACCEPTs had been received, from 0 and 2: 0 is the monitor leader itself, and 2 is the OP_ACCEPT sent by 198. Rank 1 corresponds to the monitor on 197, which is down, so its OP_ACCEPT never arrives. After 196 is re-elected Leader, it sends OP_COLLECT to 198; 198 accepts the new PN 1200 (the previous one was 1100), but in its OP_LAST message it tells the monitor leader that it once received proposal 1737443, which it has accepted but not yet committed.
So what happens when the monitor leader receives this message?
if (last->pn > accepted_pn) {
// no, try again.
dout(10) << " they had a higher pn than us, picking a new one." << dendl;
// cancel timeout event
mon->timer.cancel_event(collect_timeout_event);
collect_timeout_event = 0;
collect(last->pn);
} else if (last->pn == accepted_pn) {
/* in the scenario we constructed, this branch is taken */
// yes, they accepted our pn. great.
num_last++;
dout(10) << " they accepted our pn, we now have "
<< num_last << " peons" << dendl;
/* record the uncommitted triple we received */
if (last->uncommitted_pn) {
if (last->uncommitted_pn >= uncommitted_pn &&
last->last_committed >= last_committed &&
last->last_committed + 1 >= uncommitted_v) {
uncommitted_v = last->last_committed+1;
uncommitted_pn = last->uncommitted_pn;
uncommitted_value = last->values[uncommitted_v];
dout(10) << "we learned an uncommitted value for " << uncommitted_v
<< " pn " << uncommitted_pn
<< " " << uncommitted_value.length() << " bytes"
<< dendl;
} else {
dout(10) << "ignoring uncommitted value for " << (last->last_committed+1)
<< " pn " << last->uncommitted_pn
<< " " << last->values[last->last_committed+1].length() << " bytes"
<< dendl;
}
}
/* once replies from all Peons in the quorum have been collected */
if (num_last == mon->get_quorum().size()) {
// cancel timeout event
mon->timer.cancel_event(collect_timeout_event);
collect_timeout_event = 0;
peer_first_committed.clear();
peer_last_committed.clear();
// almost...
/* if the uncommitted version equals last_committed+1 */
if (uncommitted_v == last_committed+1 &&
uncommitted_value.length()) {
dout(10) << "that's everyone. begin on old learned value" << dendl;
/* note the next two lines: in our case 2, the leader runs begin again on the unfinished proposal, i.e. re-proposes it to make sure it completes,
 * but in state STATE_UPDATING_PREVIOUS, meaning it is finishing the previous round */
state = STATE_UPDATING_PREVIOUS;
begin(uncommitted_value);
} else {
// active!
dout(10) << "that's everyone. active!" << dendl;
extend_lease();
need_refresh = false;
if (do_refresh()) {
finish_round();
}
}
}
} else {
// no, this is an old message, discard
dout(10) << "old pn, ignoring" << dendl;
}
Note that regardless of whether any Peon has already replied with OP_ACCEPT, this unfinished proposal is re-initiated through begin:
- If not a single OP_ACCEPT was received, the Monitor Leader has already recorded the uncommitted triple itself and does not need to learn the proposal from any Peon
- If some OP_ACCEPT was received, that Peon will report the uncommitted triple to the monitor leader in its OP_LAST message anyway
Either way, the monitor leader ends up calling begin from handle_last to finish the previous round's unfinished proposal.
2017-10-04 21:15:41.038753 7f36ce7e8700 10 mon.oquew@0(leader).paxos(paxos recovering c 1736921..1737442) handle_last paxos(last lc 1737442 fc 1736921 pn 1200 opn 1100) v3
2017-10-04 21:15:41.038759 7f36ce7e8700 10 mon.oquew@0(leader).paxos(paxos recovering c 1736921..1737442) store_state nothing to commit
2017-10-04 21:15:41.038824 7f36ce7e8700 10 mon.oquew@0(leader).paxos(paxos recovering c 1736921..1737442) they accepted our pn, we now have 2 peons
2017-10-04 21:15:41.038835 7f36ce7e8700 10 mon.oquew@0(leader).paxos(paxos recovering c 1736921..1737442) we learned an uncommitted value for 1737443 pn 1100 25958 bytes
2017-10-04 21:15:41.038843 7f36ce7e8700 10 mon.oquew@0(leader).paxos(paxos recovering c 1736921..1737442) that's everyone. begin on old learned value
2017-10-04 21:15:41.038848 7f36ce7e8700 10 mon.oquew@0(leader).paxos(paxos updating-previous c 1736921..1737442) begin for 1737443 25958 bytes
2017-10-04 21:15:41.038868 7f36ce7e8700 30 mon.oquew@0(leader).paxos(paxos updating-previous c 1736921..1737442) begin transaction dump:
{ "ops": [
{ "op_num": 0,
"type": "PUT",
"prefix": "paxos",
"key": "1737443",
"length": 25958},
{ "op_num": 1,
"type": "PUT",
"prefix": "paxos",
"key": "pending_v",
"length": 8},
{ "op_num": 2,
"type": "PUT",
"prefix": "paxos",
"key": "pending_pn",
"length": 8}],
"num_keys": 3,
"num_bytes": 26015}
bl dump:
{ "ops": [
{ "op_num": 0,
"type": "PUT",
"prefix": "logm",
"key": "full_432632",
"length": 15884},
{ "op_num": 1,
"type": "PUT",
"prefix": "logm",
"key": "full_latest",
"length": 8},
{ "op_num": 2,
"type": "PUT",
"prefix": "logm",
"key": "432633",
"length": 9882},
{ "op_num": 3,
"type": "PUT",
"prefix": "logm",
"key": "last_committed",
"length": 8}],
"num_keys": 4,
"num_bytes": 25840}
2017-10-04 21:15:41.057345 7f36ce7e8700 10 mon.oquew@0(leader).paxos(paxos updating-previous c 1736921..1737442) sending begin to mon.2
It has taken quite some space, but that finally covers case 2 of the Peon-down scenario. Next we consider cases 3 and 4:
3. The leader is in the writing state: all accept messages have been received, commit_start has begun executing, and the transaction has been queued for execution
4. The leader is in the writing state but the write has already finished: the transaction has taken effect, only the callback (commit_finish) has not run yet (because the callback must first acquire the monitor lock)
Note that in cases 3 and 4, the node waits for the data already in the writing state to finish committing before it re-elects:
void Monitor::wait_for_paxos_write()
{
if (paxos->is_writing() || paxos->is_writing_previous()) {
dout(10) << __func__ << " flushing pending write" << dendl;
lock.Unlock();
store->flush();
lock.Lock();
dout(10) << __func__ << " flushed pending write" << dendl;
}
}
void Monitor::bootstrap()
{
dout(10) << "bootstrap" << dendl;
wait_for_paxos_write();
...
}
void Monitor::start_election()
{
dout(10) << "start_election" << dendl;
wait_for_paxos_write();
...
}
In cases 3 and 4, Paxos is in the writing or writing_previous state, so store->flush is executed: before the election starts, the data already being written is guaranteed to finish committing, and only then does the election begin.
As for the Peons, whether or not they have committed, the Leader has already completed the commit. In the handle_last stage:
for (map<int,version_t>::iterator p = peer_last_committed.begin();
p != peer_last_committed.end();
++p) {
if (p->second + 1 < first_committed && first_committed > 1) {
dout(5) << __func__
<< " peon " << p->first
<< " last_committed (" << p->second
<< ") is too low for our first_committed (" << first_committed
<< ") -- bootstrap!" << dendl;
op->mark_paxos_event("need to bootstrap");
mon->bootstrap();
return;
}
/* in cases 3 and 4, the mon leader can share the versions a peon is missing so that the peon commits them */
if (p->second < last_committed) {
// share committed values
dout(10) << " sending commit to mon." << p->first << dendl;
MMonPaxos *commit = new MMonPaxos(mon->get_epoch(),
MMonPaxos::OP_COMMIT,
ceph_clock_now(g_ceph_context));
share_state(commit, peer_first_committed[p->first], p->second);
mon->messenger->send_message(commit, mon->monmap->get_inst(p->first));
}
}
Let us look at the log for cases 3 and 4. Here 197 is the Peon that went down and 198 is a healthy Peon that did not get to commit in time. The Leader notices that 198 is missing commit 1743405, and uses share_state to pack the missing part into the message and send it to 198, i.e. mon.2.
2017-10-04 22:05:44.680463 7f36ce7e8700 10 mon.oquew@0(leader).paxos(paxos recovering c 1742694..1743405) handle_last paxos(last lc 1743404 fc 1742694 pn 1300 opn 0) v3
2017-10-04 22:05:44.680481 7f36ce7e8700 10 mon.oquew@0(leader).paxos(paxos recovering c 1742694..1743405) store_state nothing to commit
/*197*/
2017-10-04 22:05:44.680556 7f36ce7e8700 10 mon.oquew@0(leader).paxos(paxos recovering c 1742694..1743405) sending commit to mon.2
2017-10-04 22:05:44.680568 7f36ce7e8700 10 mon.oquew@0(leader).paxos(paxos recovering c 1742694..1743405) share_state peer has fc 1742694 lc 1743404
2017-10-04 22:05:44.680639 7f36ce7e8700 10 mon.oquew@0(leader).paxos(paxos recovering c 1742694..1743405) sharing 1743405 (133 bytes)
2017-10-04 22:05:44.680730 7f36ce7e8700 10 mon.oquew@0(leader).paxos(paxos recovering c 1742694..1743405) they accepted our pn, we now have 2 peons
Peon up
The four cases above describe the four possibilities after a Peon goes down. What happens when the downed Peon comes back up?
Because the Peon has been down for a long time, much of its information is stale, so on startup it goes through a sync process. This synchronization is not done through collect -> handle_collect -> handle_last; instead, when the Peon starts it calls sync_start to initiate data synchronization and enters STATE_SYNCHRONIZING. We will not expand on that here.
After the data sync completes, sync_finish is called, which bootstraps again and triggers another election; naturally, the original leader wins again.
Leader Down
The Leader can die anywhere, inside any Paxos function. In the new election, the Peon with the smallest rank becomes the new Leader. As before, we consider how the cluster returns to consistency after the two events, Leader down and Leader up.
Down
A peon calls for a new election after its lease times out. A peon may be interrupted in the active or updating state, and different peons may be in different states, some active and some updating:
- The leader goes down in the active state: no special handling is needed
- The leader goes down in the updating state: if no peon has accepted, no special handling is needed; if some peon has accepted, the new leader has either accepted the value itself or will learn it from another peon, and will re-propose it
- The leader goes down in the writing state: all peons have accepted, so the new leader will re-propose the accepted value (the downed leader may or may not have completed its write by then)
- The leader goes down in the refresh state: the downed leader has already written successfully; if some peon has received the commit message, the new leader learns the new commit during the collect stage, and if no peon received it, the value is re-proposed
In case 2, if some peons have already accepted, then in handle_collect those peons send their uncommitted triples to the new Leader; or the new Leader itself had accepted and can obtain the uncommitted triple from its own store. Either way, it then calls begin to re-propose.
/* record the uncommitted triple we received */
if (last->uncommitted_pn) {
if (last->uncommitted_pn >= uncommitted_pn &&
last->last_committed >= last_committed &&
last->last_committed + 1 >= uncommitted_v) {
uncommitted_v = last->last_committed+1;
uncommitted_pn = last->uncommitted_pn;
uncommitted_value = last->values[uncommitted_v];
dout(10) << "we learned an uncommitted value for " << uncommitted_v
<< " pn " << uncommitted_pn
<< " " << uncommitted_value.length() << " bytes"
<< dendl;
} else {
dout(10) << "ignoring uncommitted value for " << (last->last_committed+1)
<< " pn " << last->uncommitted_pn
<< " " << last->values[last->last_committed+1].length() << " bytes"
<< dendl;
}
}
/* once replies from all Peons in the quorum have been collected */
if (num_last == mon->get_quorum().size()) {
// cancel timeout event
mon->timer.cancel_event(collect_timeout_event);
collect_timeout_event = 0;
peer_first_committed.clear();
peer_last_committed.clear();
// almost...
/* if the uncommitted version equals last_committed+1 */
if (uncommitted_v == last_committed+1 &&
uncommitted_value.length()) {
dout(10) << "that's everyone. begin on old learned value" << dendl;
/* note the next two lines: in our case 2, the leader runs begin again on the unfinished proposal, i.e. re-proposes it to make sure it completes,
 * but in state STATE_UPDATING_PREVIOUS, meaning it is finishing the previous round */
state = STATE_UPDATING_PREVIOUS;
begin(uncommitted_value);
}
....
In case 3, just as in case 2, the value is re-proposed via the following code:
if (uncommitted_v == last_committed+1 &&
uncommitted_value.length()) {
dout(10) << "that's everyone. begin on old learned value" << dendl;
state = STATE_UPDATING_PREVIOUS;
begin(uncommitted_value);
}
Case 4 is slightly more involved, because it is uncertain whether any peon has performed the commit. If no peon has committed, the value is re-proposed just as in cases 2 and 3; but if it was committed somewhere, the new leader learns that commit from some peon via collect, and shares the parts the other peons are missing with them via share_state.
Up
After the old leader comes back up, it may already perform a sync during the probing stage, so part of its data is synchronized then. Once it is elected leader again, the collect stage synchronizes the few versions that differ; at the same time, if a peon has uncommitted data, that is also handed to the leader, and the new leader re-proposes it.
The only subtle point is the uncommitted data that existed when the leader went down. From the cases above, if some peon had accepted it, the data will have been re-proposed. After the old leader comes back up, its on-disk pending data is discarded because, judged by pending_v, its version is now too low. And if the old leader had already committed it, the peons are certain to commit it as well, so no inconsistency can arise.
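Concretely, the pending_v/pending_pn check shown earlier in collect() only treats the on-disk pending value as a live uncommitted proposal when pending_v still equals last_committed + 1. A minimal sketch of that guard (a hypothetical helper with the same logic, not a function in Ceph):
#include <cstdint>
using version_t = uint64_t;
// Sketch of the staleness guard applied to the on-disk pending value,
// mirroring the pending_v/pending_pn check in Paxos::collect(): once the
// rejoining node has synced past that version, last_committed has moved on,
// pending_v no longer equals last_committed + 1, and the stale pending
// value is ignored rather than treated as an uncommitted proposal.
bool pending_value_is_live(version_t pending_v, version_t pending_pn,
                           version_t last_committed) {
  return pending_v != 0 && pending_pn != 0 &&
         pending_v == last_committed + 1;
}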
Since the previous scenario already walked through the code in detail, we will not go through the Leader-down case exhaustively.
Epilogue
Note that this article draws heavily on the first reference; I have essentially followed its map. I have no intention of plagiarizing that author's work; it is simply that the original is written from a great height and is terse in places, so beginners may not grasp its meaning. This article expands on it, matching certain points with the code and log output, to help beginners understand better.
Reference 2 is also an excellent article, but without analyzing the failures that can occur, Phase 1 tends to be understood only at the surface level, and the code gets read like a running log.