ceph 写流程(1)

| 分类 ceph-internal  | 标签 ceph 

前言

前面花了两篇博客的篇幅介绍了读流程。写流程和读流程相比,有大量的流程走向是公用的,我将这些公共的流程总结如下:

对于读流程而言,相对比较简单,这也是我们先介绍读流程的原因。先设置一个小目标,达成之后,对总体的大目标也有好处,相当于将大任务分解成了几个小任务。

从 execute_ctx 开始,读写流程开始严重的分叉,下图是读流程的流程图

写流程的三个侧面

写流程之所以比读流程复杂,原因在于读流程只需要去Primary OSD读取响应的Object即可,而写流程牵扯到多个OSD。下图来自Ceph的官方文档:

写流程之所以比读流程复杂源于多个方面,

  • 牵扯多个OSD的写入,如何确保多副本之间一致性 (PGLog)
  • 对于单个OSD的写入,如何确保最终的一致性 (Journal and FileStore)
  • 多个副本所在的OSD,如果状态不是active + clean

多种因素造成了写流程的异常复杂。本人功力有限,所以先从主干流程介绍起。这个写流程,打算至少从三个侧重点分别介绍:

  • 第一个篇侧重在Primary OSD和Secondary OSD的交互流程,即Primary 如何将写相关的任务发送给Secondary OSD,Secondary OSD又如何发送自己的完成进度给Primary OSD, Primary OSD收到相关的进度,又采取了什么样的行动,以及Primary如何给Client发送响应

  • 第二篇文章侧重于数据部分,即各个OSD 收到写入的请求之后,从filestore层面做了哪些的事情,在不同的完成阶段,会做哪些事情

  • 第三篇文章会侧重于PGLog,为了确保各个副本的一致性,出了写入数据,事务操作也会纪录PGLog,一旦某个出现异常,可以根据PGLog的信息,确定哪个OSD的数据是最可靠的,从发起数据的同步。

因为写入的流程异常的复杂,因此,介绍A侧面的时候,尽量不涉及B和C侧面,否则所有细节纠缠在一起,就会将设计思想淹没在无数的细节之中,这并不利于我们理解写入流程。

准备事务(prepare_transaction)

ceph的读和写显著的不同在于读基本上只需要从Primary OSD中读取(offset,length)指定部分的内容即可,不牵扯到多个OSD之间的交互,而且读并没有对存储作出改变。 而写则不然,首先,ceph支持多副本,也支持纠删码(本系列暂不考虑纠删码),写入本身就牵扯到多个OSD之间的互动。其次,正常情况自不必说,但是多个副本之间的写入可能会在某个副本出现问题,副本之间需要能够确定哪个副本的数据已经正确写入,哪个副本的数据还未写入完毕,这就加剧了ceph写入的复杂程度。

本文只介绍Primary 和Secondary之间的消息交互,作为整个写入过程的整体框架。

对于写入而言,execute_ctx函数中第一个比较重要的函数是 prepare_transaction。这个函数顾名思义,就是用来准备transaction事务的。但是我们前面一起研读过read 的代码,prepare_transaction有点言不由衷,事实上,prepare_transaction有点挂羊头卖狗肉,该函数中的do_osd_ops函数直接将完成了读操作的核心步骤,并非做什么准备工作。

但是对于写入而言,该函数不再挂羊头卖狗肉,正而八经地做准备工作。我们开始学习该函数了哪些事情.

和之前一样,execute_ctx调用了prepare_tranasction,而prepare_transaction调用了do_osd_ops。我们来看do_osd_ops做了哪些事情:

瞿天善绘制了一张图,详细介绍了对于write流程,do_osd_ops做的事情:

其核心代码在此:


case CEPH_OSD_OP_WRITE:
      ++ctx->num_write;      

     { 
          ....
		  if (seq && (seq > op.extent.truncate_seq) &&
		            (op.extent.offset + op.extent.length > oi.size)) {
			  // old write, arrived after trimtrunc
			  op.extent.length = (op.extent.offset > oi.size ? 0 : oi.size - op.extent.offset);
			  dout(10) << " old truncate_seq " << op.extent.truncate_seq << " < current " << seq
				   << ", adjusting write length to " << op.extent.length << dendl;
			  bufferlist t;
			  t.substr_of(osd_op.indata, 0, op.extent.length);
			  osd_op.indata.swap(t);
		  }
			if (op.extent.truncate_seq > seq) {
			  // write arrives before trimtrunc
			  if (obs.exists && !oi.is_whiteout()) {
			    dout(10) << " truncate_seq " << op.extent.truncate_seq << " > current " << seq
				     << ", truncating to " << op.extent.truncate_size << dendl;
			    t->truncate(soid, op.extent.truncate_size);
			    oi.truncate_seq = op.extent.truncate_seq;
			    oi.truncate_size = op.extent.truncate_size;
			    if (op.extent.truncate_size != oi.size) {
			      ctx->delta_stats.num_bytes -= oi.size;
			      ctx->delta_stats.num_bytes += op.extent.truncate_size;
			      oi.size = op.extent.truncate_size;
			    }
			  } else {
			    dout(10) << " truncate_seq " << op.extent.truncate_seq << " > current " << seq
				     << ", but object is new" << dendl;
			    oi.truncate_seq = op.extent.truncate_seq;
			    oi.truncate_size = op.extent.truncate_size;
			  }
			}
			result = check_offset_and_length(op.extent.offset, op.extent.length, cct->_conf->osd_max_object_size);
			if (result < 0)
			  break;
			if (pool.info.require_rollback()) {
			  t->append(soid, op.extent.offset, op.extent.length, osd_op.indata, op.flags);
			} else {
			  t->write(soid, op.extent.offset, op.extent.length, osd_op.indata, op.flags);
			}
		
			maybe_create_new_object(ctx);
			if (op.extent.offset == 0 && op.extent.length >= oi.size)
			  obs.oi.set_data_digest(osd_op.indata.crc32c(-1));
			else if (op.extent.offset == oi.size && obs.oi.is_data_digest())
			  obs.oi.set_data_digest(osd_op.indata.crc32c(obs.oi.data_digest));
			else
			  obs.oi.clear_data_digest();
			write_update_size_and_usage(ctx->delta_stats, oi, ctx->modified_ranges,
						    op.extent.offset, op.extent.length, true);

  }
  break;

对于多副本的写入,该函数的眼在:

     t->write(soid, op.extent.offset, op.extent.length, osd_op.indata, op.flags);  

是的,do_osd_ops的核心功能就是瞿天善那张非常详细图的中心位置的子图,产生出ObjectStore::Transaction

我们一起阅读os/ObjectStore.h中的写操作:

   /**
     * Write data to an offset within an object. If the object is too
     * small, it is expanded as needed.  It is possible to specify an
     * offset beyond the current end of an object and it will be
     * expanded as needed. Simple implementations of ObjectStore will
     * just zero the data between the old end of the object and the
     * newly provided data. More sophisticated implementations of
     * ObjectStore will omit the untouched data and store it as a
     * "hole" in the file.
     */
    void write(const coll_t& cid, const ghobject_t& oid, uint64_t off, uint64_t len,
	       const bufferlist& write_data, uint32_t flags = 0) {
      uint32_t orig_len = data_bl.length();
      Op* _op = _get_next_op();
      _op->op = OP_WRITE;
      _op->cid = _get_coll_id(cid);
      _op->oid = _get_object_id(oid);
      _op->off = off;
      _op->len = len;
      ::encode(write_data, data_bl);

      assert(len == write_data.length());
      data.fadvise_flags = data.fadvise_flags | flags;
      if (write_data.length() > data.largest_data_len) {
        	data.largest_data_len = write_data.length();
	        data.largest_data_off = off;
	        data.largest_data_off_in_data_bl = orig_len + sizeof(__u32);  // we are about to
      }
      data.ops++;
    }

这一部分是很容易理解的,就是说把操作码设置为OP_WRITE,记录好要写入的object和coll,将offset 和length设置正确,同时将要写入的data纪录下来,后续ObjectStore部分(更具体地说是filestore),就可以根据上述信息,完成写入底层对象存储的动作。

上述内容仅仅是一个部分,之前也提过,除了data,还有PGLog,这部分内容是为了纪录各个副本之间的写入情况,预防异常发生。prepare_transaction函数的最后,会调用finish_ctx函数,finish_ctx函数里就会调用ctx->log.push_back就会构造pg_log_entry_t插入到vector log里。

PGLog后续会有专门文章介绍,我们按下不表,包括ReplicatedBackend::submit_transaction里调用parent->log_operation将PGLog序列化到transaction里,这些内容我们都不会在本文介绍。

消息流动

有这么一张图,可以粗略地介绍Primary OSD和Replica OSD之间的消息流动,本文的重点也是介绍这个:

从上图可以看出,client只会和Primary OSD之间有消息交互,至于其它副本的数据写入,要靠Primary OSD发送请求通知它们,当然当Replica OSD写完之后,需要发送消息告知Primary OSD, Primary OSD汇总各个OSD的写入情况,在合适的时机给Client发送响应信息。

注意,无论是Primary OSD还是 Replica OSD,写入都分成2个阶段,第一阶段是写入Journal,写入Journal成功,第一阶段的任务就算完成了,当Replica OSD如果完成了第一阶段的任务,写入了Journal,就会给Primary OSD发送第一个消息,表示第一阶段的任务完成了。如果Primary同时收到了所有的OSD的消息,确认所有OSD上的第一阶段任务完成,就会给Client回复第一个消息,表示写入已经完成。

注意,第一阶段是写入Journal,因此,Journal如果是SSD这种比较快速的设备,会极大地改善写入请求的处理速度。

我们从上图中也可以看出每一个Replica OSD给Primary OSD发送了2个消息,其中第二个消息,对应的是第二阶段任务的完成。当Journal中的数据向OSD的data partition写入成功后,第二阶段任务就算完成了,Replica OSD就会给Primary OSD发送第二个消息。当Primary OSD搜集齐了所有OSD都完成了的消息之后,就会确认整体第二阶段的任务完成,就会给Client消息,通知数据可读。

很粗略,基本是简单地介绍了消息流动图,但是,我们是研究ceph internal的,这么粗略是不能原谅的。很多细节都隐藏在这些笼统的描述中。比如,Primary在什么时机将写入的消息发送给Replica OSD,消息类型是什么;又比如,完成第一阶段和第二阶段过程之后,Replica OSD分别给Primary OSD发送了什么消息,而Primary OSD又是如何处理这些消息的;再比如Primary OSD 如何判断 所有的OSD是否都完成了第一阶段的任务?

注意,上面笼统的描述中,介绍了当Primary OSD发现所有的OSD都完成了第一阶段任务,则发送消息给client,告知client写入完成,第二阶段的任务亦然,这表明,Primary OSD必须有数据结构能够纪录下各个OSD的完成情况,这个数据结构是什么呢?

这个数据结构就是in_progress_ops !

在issue_repop函数中,会调用

  Context *on_all_commit = new C_OSD_RepopCommit(this, repop);
  Context *on_all_applied = new C_OSD_RepopApplied(this, repop);
  Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(
    ctx->obc,
    ctx->clone_obc,
    unlock_snapset_obc ? ctx->snapset_obc : ObjectContextRef());
  pgbackend->submit_transaction(
    soid,
    ctx->at_version,
    std::move(ctx->op_t),
    pg_trim_to,
    min_last_complete_ondisk,
    ctx->log,
    ctx->updated_hset_history,
    onapplied_sync,
    on_all_applied,
    on_all_commit,
    repop->rep_tid,
    ctx->reqid,
    ctx->op);

其中submit_transaction函数中,会将与该PG操作对应的OSD都纪录在册:

void ReplicatedBackend::submit_transaction(
  const hobject_t &soid,
  const eversion_t &at_version,
  PGTransactionUPtr &&_t,
  const eversion_t &trim_to,
  const eversion_t &trim_rollback_to,
  const vector<pg_log_entry_t> &log_entries,
  boost::optional<pg_hit_set_history_t> &hset_history,
  Context *on_local_applied_sync,
  Context *on_all_acked,
  Context *on_all_commit,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  OpRequestRef orig_op)
{

  ...
  InProgressOp &op = in_progress_ops.insert(
    make_pair(
      tid,
      InProgressOp(
	     tid, on_all_commit, on_all_acked,
	     orig_op, at_version)
      )
    ).first->second;

  op.waiting_for_applied.insert(
    parent->get_actingbackfill_shards().begin(),
    parent->get_actingbackfill_shards().end());
  op.waiting_for_commit.insert(
    parent->get_actingbackfill_shards().begin(),
    parent->get_actingbackfill_shards().end());
    
    ...
}

注意,InProgressOP维护有两个集合,waiting_for_commit这个集合存放的是尚未完成第一阶段任务(即写入Journal)的所有OSD,waiting_for_applied这个集合存放的是尚未完成第二阶段任务(即从Journal写入OSD的data partition或Disk)的所有OSD。

  struct InProgressOp {
    ceph_tid_t tid;
    set<pg_shard_t> waiting_for_commit;  /*尚未完成第一阶段任务的OSD的集合*/
    set<pg_shard_t> waiting_for_applied; /*尚未完成第二阶段任务的OSD的集合*/
    Context *on_commit;
    Context *on_applied;
    OpRequestRef op;
    eversion_t v;
    InProgressOp(
      ceph_tid_t tid, Context *on_commit, Context *on_applied,
      OpRequestRef op, eversion_t v)
      : tid(tid), on_commit(on_commit), on_applied(on_applied),
	op(op), v(v) {}
    bool done() const {
      return waiting_for_commit.empty() &&
	waiting_for_applied.empty();
    }
  };

很明显,当Replica OSD或者Primary OSD完成第一阶段或者第二阶段任务的时候,都必然会通知到Primary OSD,更新这两个集合中的元素: 如何更新? 这就牵扯到了很重要的回调机制。

写操作有三个贯穿始终的回调,这个回调函数会层层传递,当何时的时机到达的时候,就会执行相关的回调函数。回调在ceph中非常普遍,这个东西就有点像诸葛亮给赵云的三个锦囊,到了合适的时间点,就立刻启动回调函数。

我们介绍回调函数之前,我们先把Primary OSD 和Replica OSD之间的消息交互捋顺。

在ReplicatedBackend::submit_transaction中,准备好了InProgressOP之后,紧接着就会通过调用issue_op函数,在该函数里,会向Replica OSD发送消息:


  issue_op(
    soid,
    at_version,
    tid,
    reqid,
    trim_to,
    trim_rollback_to,
    t->get_temp_added().empty() ? hobject_t() : *(t->get_temp_added().begin()),
    t->get_temp_cleared().empty() ?
      hobject_t() : *(t->get_temp_cleared().begin()),
    log_entries,
    hset_history,
    &op,
    op_t);
void ReplicatedBackend::issue_op(
  const hobject_t &soid,
  const eversion_t &at_version,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  eversion_t pg_trim_to,
  eversion_t pg_trim_rollback_to,
  hobject_t new_temp_oid,
  hobject_t discard_temp_oid,
  const vector<pg_log_entry_t> &log_entries,
  boost::optional<pg_hit_set_history_t> &hset_hist,
  InProgressOp *op,
  ObjectStore::Transaction &op_t)
{

  if (parent->get_actingbackfill_shards().size() > 1) {
    ostringstream ss;
    set<pg_shard_t> replicas = parent->get_actingbackfill_shards();
    replicas.erase(parent->whoami_shard());
    ss << "waiting for subops from " << replicas;
    if (op->op)
      op->op->mark_sub_op_sent(ss.str());
  }
  for (set<pg_shard_t>::const_iterator i =
	 parent->get_actingbackfill_shards().begin();
       i != parent->get_actingbackfill_shards().end();
       ++i) {
    if (*i == parent->whoami_shard()) continue;
    pg_shard_t peer = *i;
    const pg_info_t &pinfo = parent->get_shard_info().find(peer)->second;

    Message *wr;
    wr = generate_subop(
      soid,
      at_version,
      tid,
      reqid,
      pg_trim_to,
      pg_trim_rollback_to,
      new_temp_oid,
      discard_temp_oid,
      log_entries,
      hset_hist,
      op_t,
      peer,
      pinfo);

    get_parent()->send_message_osd_cluster(
      peer.osd, wr, get_osdmap()->get_epoch());
  }
}

我们看到了,在循环体中,会遍历所有的Replica OSD,向对应的OSD发送消息,而消息体的组装,是在generate_subop函数中,我们进入该函数,看下发送的到底是哪种类型的消息:

Message * ReplicatedBackend::generate_subop(
  const hobject_t &soid,
  const eversion_t &at_version,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  eversion_t pg_trim_to,
  eversion_t pg_trim_rollback_to,
  hobject_t new_temp_oid,
  hobject_t discard_temp_oid,
  const vector<pg_log_entry_t> &log_entries,
  boost::optional<pg_hit_set_history_t> &hset_hist,
  ObjectStore::Transaction &op_t,
  pg_shard_t peer,
  const pg_info_t &pinfo)
{
  int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
  // forward the write/update/whatever
  MOSDRepOp *wr = new MOSDRepOp(
    reqid, parent->whoami_shard(),
    spg_t(get_info().pgid.pgid, peer.shard),
    soid, acks_wanted,
    get_osdmap()->get_epoch(),
    tid, at_version);

  // ship resulting transaction, log entries, and pg_stats
  if (!parent->should_send_op(peer, soid)) {
    dout(10) << "issue_repop shipping empty opt to osd." << peer
	     <<", object " << soid
	     << " beyond MAX(last_backfill_started "
	     << ", pinfo.last_backfill "
	     << pinfo.last_backfill << ")" << dendl;
    ObjectStore::Transaction t;
    ::encode(t, wr->get_data());
  } else {
    ::encode(op_t, wr->get_data());
    wr->get_header().data_off = op_t.get_data_alignment();
  }

  ::encode(log_entries, wr->logbl);

  if (pinfo.is_incomplete())
    wr->pg_stats = pinfo.stats;  // reflects backfill progress
  else
    wr->pg_stats = get_info().stats;

  wr->pg_trim_to = pg_trim_to;
  wr->pg_trim_rollback_to = pg_trim_rollback_to;

  wr->new_temp_oid = new_temp_oid;
  wr->discard_temp_oid = discard_temp_oid;
  wr->updated_hit_set_history = hset_hist;
  return wr;
}

注意发送的消息MOSDRepOp,其中Message的type字段为 MSG_OSD_REPOP。

  MOSDRepOp(osd_reqid_t r, pg_shard_t from,
	    spg_t p, const hobject_t& po, int aw,
	    epoch_t mape, ceph_tid_t rtid, eversion_t v)
    : Message(MSG_OSD_REPOP, HEAD_VERSION, COMPAT_VERSION),
      map_epoch(mape),
      reqid(r),
      pgid(p),
      final_decode_needed(false),
      from(from),
      poid(po),
      acks_wanted(aw),
      version(v) {
    set_tid(rtid);
  }
值得注意的是acks_wanted字段为CEPH_OSD_FLAG_ACK CEPH_OSD_FLAG_ONDISK,这就意味着消息接收方的Replica OSD需要给发送方的Primary OSD回复2个消息。先不扯这么远,先看Replica OSD 收到消息之后,如何处理。

Primary发送了这种消息之后,Replica OSD会和本文的前两张图一样,进入到队列,然后从osd.op_wq中取出消息进行处理。当走到do_request函数之后,并没有机会执行do_op,或者do_sub_op之类的函数,而是被handle_message函数拦截了:

  if (pgbackend->handle_message(op))
    return;
bool ReplicatedBackend::handle_message(
  OpRequestRef op
  )
{
  dout(10) << __func__ << ": " << op << dendl;
  switch (op->get_req()->get_type()) {
  ...

  case MSG_OSD_REPOP: {
    sub_op_modify(op);
    return true;
  }
}

下面我们看下,Replica OSD收到信息之后,在sub_op_modify函数执行了什么操作:

// sub op modify
void ReplicatedBackend::sub_op_modify(OpRequestRef op)
{
  MOSDRepOp *m = static_cast<MOSDRepOp *>(op->get_req());
  m->finish_decode();
  int msg_type = m->get_type();
  assert(MSG_OSD_REPOP == msg_type);

  const hobject_t& soid = m->poid;

  dout(10) << "sub_op_modify trans"
           << " " << soid
           << " v " << m->version
	   << (m->logbl.length() ? " (transaction)" : " (parallel exec")
	   << " " << m->logbl.length()
	   << dendl;

  // sanity checks
  assert(m->map_epoch >= get_info().history.same_interval_since);

  // we better not be missing this.
  assert(!parent->get_log().get_missing().is_missing(soid));

  int ackerosd = m->get_source().num();

  op->mark_started();

  RepModifyRef rm(std::make_shared<RepModify>());
  rm->op = op;
  rm->ackerosd = ackerosd;
  rm->last_complete = get_info().last_complete;
  rm->epoch_started = get_osdmap()->get_epoch();

  assert(m->logbl.length());
  // shipped transaction and log entries
  vector<pg_log_entry_t> log;

  bufferlist::iterator p = m->get_data().begin();
  ::decode(rm->opt, p);

  if (m->new_temp_oid != hobject_t()) {
    dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl;
    add_temp_obj(m->new_temp_oid);
  }
  if (m->discard_temp_oid != hobject_t()) {
    dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl;
    if (rm->opt.empty()) {
      dout(10) << __func__ << ": removing object " << m->discard_temp_oid
	       << " since we won't get the transaction" << dendl;
      rm->localt.remove(coll, ghobject_t(m->discard_temp_oid));
    }
    clear_temp_obj(m->discard_temp_oid);
  }

  p = m->logbl.begin();
  ::decode(log, p);
  rm->opt.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);

  bool update_snaps = false;
  if (!rm->opt.empty()) {
    // If the opt is non-empty, we infer we are before
    // last_backfill (according to the primary, not our
    // not-quite-accurate value), and should update the
    // collections now.  Otherwise, we do it later on push.
    update_snaps = true;
  }
  parent->update_stats(m->pg_stats);
  parent->log_operation(
    log,
    m->updated_hit_set_history,
    m->pg_trim_to,
    m->pg_trim_rollback_to,
    update_snaps,
    rm->localt);

  rm->opt.register_on_commit(
    parent->bless_context(
      new C_OSD_RepModifyCommit(this, rm)));
  rm->localt.register_on_applied(
    parent->bless_context(
      new C_OSD_RepModifyApply(this, rm)));
  vector<ObjectStore::Transaction> tls;
  tls.reserve(2);
  tls.push_back(std::move(rm->localt));
  tls.push_back(std::move(rm->opt));
  parent->queue_transactions(tls, op);
  // op is cleaned up by oncommit/onapply when both are executed
}

这段代码非常有意思,如果有心的同志,可以注意到Primary OSD 和这一段代码对应的部分:即

void ReplicatedBackend::submit_transaction(
  const hobject_t &soid,
  const eversion_t &at_version,
  PGTransactionUPtr &&_t,
  const eversion_t &trim_to,
  const eversion_t &trim_rollback_to,
  const vector<pg_log_entry_t> &log_entries,
  boost::optional<pg_hit_set_history_t> &hset_history,
  Context *on_local_applied_sync,
  Context *on_all_acked,
  Context *on_all_commit,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  OpRequestRef orig_op)
{

  ...
  
  if (!(t->get_temp_added().empty())) {
    add_temp_objs(t->get_temp_added());
  }
  clear_temp_objs(t->get_temp_cleared());

  parent->log_operation(
    log_entries,
    hset_history,
    trim_to,
    trim_rollback_to,
    true,
    op_t);
  
  op_t.register_on_applied_sync(on_local_applied_sync);
  op_t.register_on_applied(
    parent->bless_context(
      new C_OSD_OnOpApplied(this, &op)));
  op_t.register_on_commit(
    parent->bless_context(
      new C_OSD_OnOpCommit(this, &op)));

  vector<ObjectStore::Transaction> tls;
  tls.push_back(std::move(op_t));

  parent->queue_transactions(tls, op.op);
}

代码非常的像对不对,原因是很简单的,即Primary OSD 和Replica OSD 本身要执行的操作,原本是一样的,只不过存在Primary OSD肩负着和Client通信的责任,而Replica 并没有这种责任,但是Replica需要及时向Primary OSD汇报进度。

如何上报进度?

注意Primary执行的ReplicatedBackend::submit_transaction函数和Replica OSD执行的ReplicatedBackend::sub_op_modify 函数都有一段很有意思的回调注册部分:


Primary OSD在submit_transaction函数中:
-----------------------
  op_t.register_on_applied(
    parent->bless_context(
      new C_OSD_OnOpApplied(this, &op)));
  op_t.register_on_commit(
    parent->bless_context(
      new C_OSD_OnOpCommit(this, &op)));
      
Replica OSD 在 sub_op_modify函数中:
-------------------------
  rm->opt.register_on_commit(
    parent->bless_context(
      new C_OSD_RepModifyCommit(this, rm)));
  rm->localt.register_on_applied(
    parent->bless_context(
      new C_OSD_RepModifyApply(this, rm)));

ceph的代码中,存在大量的回调函数,回调机制是一个非常重要的机制。事先注册,待到何时的时间点,触发执行回调函数,就如同诸葛亮给赵云的三个锦囊。对于写入来讲,每个OSD无论是Primary还是Replica,都有两个关键的milestone

  • commit
  • apply

对于第一个milestone表示已经写入到了osd的journal,此时表示已经完成commit,第二个milestone曾为applied,表示已经写入OSD的data partition。完成任何一个milestone,都要执行相关的回调。

我们本文不会介绍写入Journal和写入disk 部分的代码流程,因为这是下一篇博客的使命,我们只介绍,当Replica OSD完成了每一个milestone,会做哪些事情:


struct ReplicatedBackend::C_OSD_RepModifyApply : public Context {
  ReplicatedBackend *pg;
  RepModifyRef rm;
  C_OSD_RepModifyApply(ReplicatedBackend *pg, RepModifyRef r)
    : pg(pg), rm(r) {}
  void finish(int r) {
    pg->sub_op_modify_applied(rm);
  }
};

struct ReplicatedBackend::C_OSD_RepModifyCommit : public Context {
  ReplicatedBackend *pg;
  RepModifyRef rm;
  C_OSD_RepModifyCommit(ReplicatedBackend *pg, RepModifyRef r)
    : pg(pg), rm(r) {}
  void finish(int r) {
    pg->sub_op_modify_commit(rm);
  }
};


这两个回调,其实很明显,就是给Primary OSD发送消息,通知Primary OSD 对应的milestone已经完成。

void ReplicatedBackend::sub_op_modify_commit(RepModifyRef rm)
{
  rm->op->mark_commit_sent();
  rm->committed = true;

  // send commit.
  dout(10) << "sub_op_modify_commit on op " << *rm->op->get_req()
	   << ", sending commit to osd." << rm->ackerosd
	   << dendl;

  assert(get_osdmap()->is_up(rm->ackerosd));
  get_parent()->update_last_complete_ondisk(rm->last_complete);

  Message *m = rm->op->get_req();
  Message *commit = NULL;
  if (m->get_type() == MSG_OSD_SUBOP) {
     ...
  } else if (m->get_type() == MSG_OSD_REPOP) {
  
    //给Primary OSD回消息
    MOSDRepOpReply *reply = new MOSDRepOpReply(
      static_cast<MOSDRepOp*>(m),
      get_parent()->whoami_shard(),
      0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
    reply->set_last_complete_ondisk(rm->last_complete);
    commit = reply;
  }
  else {
    assert(0);
  }

  commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
  get_parent()->send_message_osd_cluster(
    rm->ackerosd, commit, get_osdmap()->get_epoch());

  log_subop_stats(get_parent()->get_logger(), rm->op, l_osd_sop_w);
}


void ReplicatedBackend::sub_op_modify_applied(RepModifyRef rm)
{
  rm->op->mark_event("sub_op_applied");
  rm->applied = true;

  dout(10) << "sub_op_modify_applied on " << rm << " op "
	   << *rm->op->get_req() << dendl;
  Message *m = rm->op->get_req();

  Message *ack = NULL;
  eversion_t version;

  if (m->get_type() == MSG_OSD_SUBOP) {
    ...
  } else if (m->get_type() == MSG_OSD_REPOP) {
    MOSDRepOp *req = static_cast<MOSDRepOp*>(m);
    version = req->version;
    if (!rm->committed)
      ack = new MOSDRepOpReply(
	            static_cast<MOSDRepOp*>(m), parent->whoami_shard(),
	            0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
  } else {
    assert(0);
  }

  // send ack to acker only if we haven't sent a commit already
  if (ack) {
    ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
    get_parent()->send_message_osd_cluster(
      rm->ackerosd, ack, get_osdmap()->get_epoch());
  }

  parent->op_applied(version);
}

Replica OSD在每个MileStone给Primary OSD发送的消息类型都是一样的,都是MSG_OSD_REPOPREPLY

  MOSDRepOpReply(
    MOSDRepOp *req, pg_shard_t from, int result_, epoch_t e, int at) :
    Message(MSG_OSD_REPOPREPLY, HEAD_VERSION, COMPAT_VERSION),
    map_epoch(e),
    reqid(req->reqid),
    from(from),
    pgid(req->pgid.pgid, req->from.shard),
    ack_type(at),
    result(result_),
    final_decode_needed(false) {
    set_tid(req->get_tid());
  }

区别在于ack_type不同,一种是CEPH_OSD_FLAG_ONDISK,另一种是CEPH_OSD_FLAG_ACK。

Replica OSD 向Primary OSD发送了 MSG_OSD_REPOPREPLY 消息,处理流程和本文的前两张图一样,不同之处是do_request函数中的hanle_message会处理这种类型的消息:


bool ReplicatedBackend::handle_message(
  OpRequestRef op
  )
{
  case MSG_OSD_REPOPREPLY: {
    sub_op_modify_reply(op);
    return true;
  }
  
}
void ReplicatedBackend::sub_op_modify_reply(OpRequestRef op)
{
  MOSDRepOpReply *r = static_cast<MOSDRepOpReply *>(op->get_req());
  r->finish_decode();
  assert(r->get_header().type == MSG_OSD_REPOPREPLY);

  op->mark_started();

  // must be replication.
  ceph_tid_t rep_tid = r->get_tid();
  pg_shard_t from = r->from;

  if (in_progress_ops.count(rep_tid)) {
    map<ceph_tid_t, InProgressOp>::iterator iter =
      in_progress_ops.find(rep_tid);
    InProgressOp &ip_op = iter->second;
    MOSDOp *m = NULL;
    if (ip_op.op)
      m = static_cast<MOSDOp *>(ip_op.op->get_req());

    if (m)
      dout(7) << __func__ << ": tid " << ip_op.tid << " op " //<< *m
	      << " ack_type " << (int)r->ack_type
	      << " from " << from
	      << dendl;
    else
      dout(7) << __func__ << ": tid " << ip_op.tid << " (no op) "
	      << " ack_type " << (int)r->ack_type
	      << " from " << from
	      << dendl;

    // oh, good.

    if (r->ack_type & CEPH_OSD_FLAG_ONDISK) {
      assert(ip_op.waiting_for_commit.count(from));
      ip_op.waiting_for_commit.erase(from);
      if (ip_op.op) {
        ostringstream ss;
        ss << "sub_op_commit_rec from " << from;
        ip_op.op->mark_event(ss.str());
      }
    } else {
      assert(ip_op.waiting_for_applied.count(from));
      if (ip_op.op) {
        ostringstream ss;
        ss << "sub_op_applied_rec from " << from;
        ip_op.op->mark_event(ss.str());
      }
    }
    ip_op.waiting_for_applied.erase(from);

    parent->update_peer_last_complete_ondisk(
      from,
      r->get_last_complete_ondisk());

    if (ip_op.waiting_for_applied.empty() &&
        ip_op.on_applied) {
      ip_op.on_applied->complete(0);
      ip_op.on_applied = 0;
    }
    if (ip_op.waiting_for_commit.empty() &&
        ip_op.on_commit) {
      ip_op.on_commit->complete(0);
      ip_op.on_commit= 0;
    }
    if (ip_op.done()) {
      assert(!ip_op.on_commit && !ip_op.on_applied);
      in_progress_ops.erase(iter);
    }
  }
}

关于这个交互,后面介绍完FileStore部分之后,会再次梳理这部分逻辑,可以比较Primary OSD和 Replica OSD在处理上区别。


上一篇     下一篇