Preface
The previous article followed a read request from its arrival all the way into the message queue, and covered the relationship between the message queue that serves read requests and the thread pool behind it. This article focuses on what happens next: taking the read request off the queue and processing it.
Flow diagram
The figure above shows the path from the moment the message is taken off the operation queue until FileStore reads the file contents from disk. An arrow does not mean "next step in the pipeline"; an arrow from A to B means that A calls B. In text form, a read roughly follows OSD::dequeue_op -> ReplicatedPG::do_request -> ReplicatedPG::do_op -> ReplicatedPG::execute_ctx -> ReplicatedPG::prepare_transaction -> ReplicatedPG::do_osd_ops -> FileStore::read.
Many of these functions are complex and do far more than handle reads. ReplicatedPG::execute_ctx, for instance, handles not only reads but also writes and other requests, so its control flow is intricate. We start from the read path to become familiar with the call chain and to lay the groundwork for the more complex write path and the even more complex error-handling paths that come later.
The relevant debug log
With debug logging turned on, we write a 4 MB file named file_01 and then read it from another storage node:
root@BEAN-2:/var/share/ezfs/shareroot/NAS# cephfs file_01 map
WARNING: This tool is deprecated. Use the layout.* xattrs to query and modify layouts.
FILE OFFSET    OBJECT                  OFFSET    LENGTH     OSD
0              100000003eb.00000000    0         4194304    2
root@BEAN-2:/var/share/ezfs/shareroot/NAS# ceph osd map data 100000003eb.00000000
osdmap e49 pool 'data' (2) object '100000003eb.00000000' -> pg 2.afe74fa0 (2.3a0) -> up ([2,0], p2) acting ([2,0], p2)
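As an aside, the pg id in this output can be reproduced by hand: 2.afe74fa0 is the raw hash of the object name within pool 2, and 2.3a0 is that hash folded onto the pool's pg count with ceph_stable_mod. The small sketch below assumes the data pool's pg_num is 1024 (a power of two, which makes the fold a simple mask; the actual pg_num is not shown in the output above):
#include <cstdint>
#include <cstdio>

// ceph_stable_mod, as in the Ceph source: for a power-of-two bucket count b
// it degenerates into x & (b-1).
static uint32_t ceph_stable_mod(uint32_t x, uint32_t b, uint32_t bmask)
{
  if ((x & bmask) < b)
    return x & bmask;
  else
    return x & (bmask >> 1);
}

int main()
{
  uint32_t raw_hash = 0xafe74fa0;  // from "pg 2.afe74fa0" in the osd map output
  uint32_t pg_num   = 1024;        // assumption: pg_num of the data pool
  printf("2.%x\n", ceph_stable_mod(raw_hash, pg_num, pg_num - 1));  // prints 2.3a0
  return 0;
}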
We read the file with the dd command:
root@BEAN-0:/var/share/ezfs/shareroot/NAS# dd if=file_01 of=/dev/null bs=1M
4+0 records in
4+0 records out
4194304 bytes (4.2 MB) copied, 0.0335059 s, 125 MB/s
Clearly osd.2 is the Primary OSD, so the read will be served from osd.2. The debug log on osd.2 is shown below (the log is from the giant release of Ceph):
2016-09-19 18:02:42.338838 7f9aae9fd700 15 osd.2 49 enqueue_op 0x7f9aa796ae00 prio 127 cost 0 latency 0.000105 osd_op(client.19941.1:20 100000003eb.00000000 [read 2097152~2097152 [1@-1]] 2.afe74fa0 read e49) v4
2016-09-19 18:02:42.338879 7f9ac17f8700 10 osd.2 49 dequeue_op 0x7f9aa796ae00 prio 127 cost 0 latency 0.000146 osd_op(client.19941.1:20 100000003eb.00000000 [read 2097152~2097152 [1@-1]] 2.afe74fa0 read e49) v4 pg pg[2.3a0( v 49'1 (0'0,49'1] local-les=47 n=1 ec=10 les/c 47/47 46/46/42) [2,0] r=0 lpr=46 crt=0'0 lcod 0'0 mlcod 0'0 active+clean]
2016-09-19 18:02:42.338899 7f9ac17f8700 20 osd.2 pg_epoch: 49 pg[2.3a0( v 49'1 (0'0,49'1] local-les=47 n=1 ec=10 les/c 47/47 46/46/42) [2,0] r=0 lpr=46 crt=0'0 lcod 0'0 mlcod 0'0 active+clean] op_has_sufficient_caps pool=2 (data ) owner=0 need_read_cap=1 need_write_cap=0 need_class_read_cap=0 need_class_write_cap=0 -> yes
2016-09-19 18:02:42.338912 7f9ac17f8700 10 osd.2 pg_epoch: 49 pg[2.3a0( v 49'1 (0'0,49'1] local-les=47 n=1 ec=10 les/c 47/47 46/46/42) [2,0] r=0 lpr=46 crt=0'0 lcod 0'0 mlcod 0'0 active+clean] handle_message: 0x7f9aa796ae00
2016-09-19 18:02:42.338920 7f9ac17f8700 10 osd.2 pg_epoch: 49 pg[2.3a0( v 49'1 (0'0,49'1] local-les=47 n=1 ec=10 les/c 47/47 46/46/42) [2,0] r=0 lpr=46 crt=0'0 lcod 0'0 mlcod 0'0 active+clean] do_op osd_op(client.19941.1:20 100000003eb.00000000 [read 2097152~2097152 [1@-1]] 2.afe74fa0 read e49) v4 may_read -> read-ordered flags read
2016-09-19 18:02:42.338943 7f9ac17f8700 15 filestore(/data/osd.2) getattr 2.3a0_head/afe74fa0/100000003eb.00000000/head//2 '_'
2016-09-19 18:02:42.338970 7f9ac17f8700 10 filestore(/data/osd.2) getattr 2.3a0_head/afe74fa0/100000003eb.00000000/head//2 '_' = 247
2016-09-19 18:02:42.338983 7f9ac17f8700 15 filestore(/data/osd.2) getattr 2.3a0_head/afe74fa0/100000003eb.00000000/head//2 'snapset'
2016-09-19 18:02:42.338991 7f9ac17f8700 10 filestore(/data/osd.2) getattr 2.3a0_head/afe74fa0/100000003eb.00000000/head//2 'snapset' = 31
2016-09-19 18:02:42.338998 7f9ac17f8700 10 osd.2 pg_epoch: 49 pg[2.3a0( v 49'1 (0'0,49'1] local-les=47 n=1 ec=10 les/c 47/47 46/46/42) [2,0] r=0 lpr=46 crt=0'0 lcod 0'0 mlcod 0'0 active+clean] populate_obc_watchers afe74fa0/100000003eb.00000000/head//2
2016-09-19 18:02:42.339006 7f9ac17f8700 20 osd.2 pg_epoch: 49 pg[2.3a0( v 49'1 (0'0,49'1] local-les=47 n=1 ec=10 les/c 47/47 46/46/42) [2,0] r=0 lpr=46 crt=0'0 lcod 0'0 mlcod 0'0 active+clean] ReplicatedPG::check_blacklisted_obc_watchers for obc afe74fa0/100000003eb.00000000/head//2
2016-09-19 18:02:42.339013 7f9ac17f8700 10 osd.2 pg_epoch: 49 pg[2.3a0( v 49'1 (0'0,49'1] local-les=47 n=1 ec=10 les/c 47/47 46/46/42) [2,0] r=0 lpr=46 crt=0'0 lcod 0'0 mlcod 0'0 active+clean] get_object_context: creating obc from disk: 0x7f9ad3533500
2016-09-19 18:02:42.339021 7f9ac17f8700 10 osd.2 pg_epoch: 49 pg[2.3a0( v 49'1 (0'0,49'1] local-les=47 n=1 ec=10 les/c 47/47 46/46/42) [2,0] r=0 lpr=46 crt=0'0 lcod 0'0 mlcod 0'0 active+clean] get_object_context: 0x7f9ad3533500 afe74fa0/100000003eb.00000000/head//2 rwstate(none n=0 w=0) oi: afe74fa0/100000003eb.00000000/head//2(49'1 client.31094.1:9 wrlock_by=unknown.0.0:0 dirty s 4194304 uv1) ssc: 0x7f9ab23e4e00 snapset: 1=[]:[]+head
2016-09-19 18:02:42.339033 7f9ac17f8700 10 osd.2 pg_epoch: 49 pg[2.3a0( v 49'1 (0'0,49'1] local-les=47 n=1 ec=10 les/c 47/47 46/46/42) [2,0] r=0 lpr=46 crt=0'0 lcod 0'0 mlcod 0'0 active+clean] find_object_context afe74fa0/100000003eb.00000000/head//2 @head oi=afe74fa0/100000003eb.00000000/head//2(49'1 client.31094.1:9 wrlock_by=unknown.0.0:0 dirty s 4194304 uv1)
2016-09-19 18:02:42.339052 7f9ac17f8700 10 osd.2 pg_epoch: 49 pg[2.3a0( v 49'1 (0'0,49'1] local-les=47 n=1 ec=10 les/c 47/47 46/46/42) [2,0] r=0 lpr=46 crt=0'0 lcod 0'0 mlcod 0'0 active+clean] execute_ctx 0x7f9ad34cd000
2016-09-19 18:02:42.339061 7f9ac17f8700 10 osd.2 pg_epoch: 49 pg[2.3a0( v 49'1 (0'0,49'1] local-les=47 n=1 ec=10 les/c 47/47 46/46/42) [2,0] r=0 lpr=46 crt=0'0 lcod 0'0 mlcod 0'0 active+clean] do_op afe74fa0/100000003eb.00000000/head//2 [read 2097152~2097152 [1@-1]] ov 49'1
2016-09-19 18:02:42.339069 7f9ac17f8700 10 osd.2 pg_epoch: 49 pg[2.3a0( v 49'1 (0'0,49'1] local-les=47 n=1 ec=10 les/c 47/47 46/46/42) [2,0] r=0 lpr=46 crt=0'0 lcod 0'0 mlcod 0'0 active+clean] taking ondisk_read_lock
2016-09-19 18:02:42.339077 7f9ac17f8700 10 osd.2 pg_epoch: 49 pg[2.3a0( v 49'1 (0'0,49'1] local-les=47 n=1 ec=10 les/c 47/47 46/46/42) [2,0] r=0 lpr=46 crt=0'0 lcod 0'0 mlcod 0'0 active+clean] do_osd_op afe74fa0/100000003eb.00000000/head//2 [read 2097152~2097152 [1@-1]]
2016-09-19 18:02:42.339084 7f9ac17f8700 10 osd.2 pg_epoch: 49 pg[2.3a0( v 49'1 (0'0,49'1] local-les=47 n=1 ec=10 les/c 47/47 46/46/42) [2,0] r=0 lpr=46 crt=0'0 lcod 0'0 mlcod 0'0 active+clean] do_osd_op read 2097152~2097152 [1@-1]
2016-09-19 18:02:42.339092 7f9ac17f8700 15 filestore(/data/osd.2) read 2.3a0_head/afe74fa0/100000003eb.00000000/head//2 2097152~2097152
2016-09-19 18:02:42.339590 7f9ac17f8700 10 filestore(/data/osd.2) FileStore::read 2.3a0_head/afe74fa0/100000003eb.00000000/head//2 2097152~2097152/2097152
2016-09-19 18:02:42.339597 7f9ac17f8700 10 osd.2 pg_epoch: 49 pg[2.3a0( v 49'1 (0'0,49'1] local-les=47 n=1 ec=10 les/c 47/47 46/46/42) [2,0] r=0 lpr=46 crt=0'0 lcod 0'0 mlcod 0'0 active+clean] read got 2097152 / 2097152 bytes from obj afe74fa0/100000003eb.00000000/head//2
2016-09-19 18:02:42.339611 7f9ac17f8700 15 osd.2 pg_epoch: 49 pg[2.3a0( v 49'1 (0'0,49'1] local-les=47 n=1 ec=10 les/c 47/47 46/46/42) [2,0] r=0 lpr=46 crt=0'0 lcod 0'0 mlcod 0'0 active+clean] do_osd_op_effects on session 0x7f9aa7811500
2016-09-19 18:02:42.339619 7f9ac17f8700 10 osd.2 pg_epoch: 49 pg[2.3a0( v 49'1 (0'0,49'1] local-les=47 n=1 ec=10 les/c 47/47 46/46/42) [2,0] r=0 lpr=46 crt=0'0 lcod 0'0 mlcod 0'0 active+clean] dropping ondisk_read_lock
2016-09-19 18:02:42.339629 7f9ac17f8700 15 osd.2 pg_epoch: 49 pg[2.3a0( v 49'1 (0'0,49'1] local-les=47 n=1 ec=10 les/c 47/47 46/46/42) [2,0] r=0 lpr=46 crt=0'0 lcod 0'0 mlcod 0'0 active+clean] log_op_stats osd_op(client.19941.1:20 100000003eb.00000000 [read 2097152~2097152] 2.afe74fa0 read e49) v4 inb 0 outb 2097152 rlat 0.000000 lat 0.000895
2016-09-19 18:02:42.339659 7f9ac17f8700 15 osd.2 pg_epoch: 49 pg[2.3a0( v 49'1 (0'0,49'1] local-les=47 n=1 ec=10 les/c 47/47 46/46/42) [2,0] r=0 lpr=46 crt=0'0 lcod 0'0 mlcod 0'0 active+clean] publish_stats_to_osd 49:35
2016-09-19 18:02:42.339671 7f9ac17f8700 15 osd.2 pg_epoch: 49 pg[2.3a0( v 49'1 (0'0,49'1] local-les=47 n=1 ec=10 les/c 47/47 46/46/42) [2,0] r=0 lpr=46 crt=0'0 lcod 0'0 mlcod 0'0 active+clean] requeue_ops
2016-09-19 18:02:42.339688 7f9ac17f8700 10 osd.2 49 dequeue_op 0x7f9aa796ae00 finish
With the debug output above we can trace the entire read path.
Code analysis
dequeue_op function
/*
* NOTE: dequeue called in worker thread, with pg lock
*/
void OSD::dequeue_op(
  PGRef pg, OpRequestRef op,
  ThreadPool::TPHandle &handle)
{
  utime_t now = ceph_clock_now(cct);
  op->set_dequeued_time(now);
  utime_t latency = now - op->get_req()->get_recv_stamp();
  dout(10) << "dequeue_op " << op << " prio " << op->get_req()->get_priority()
           << " cost " << op->get_req()->get_cost()
           << " latency " << latency
           << " " << *(op->get_req())
           << " pg " << *pg << dendl;

  // share our map with sender, if they're old
  if (op->send_map_update) {
    Message *m = op->get_req();
    Session *session = static_cast<Session *>(m->get_connection()->get_priv());
    epoch_t last_sent_epoch;
    if (session) {
      session->sent_epoch_lock.lock();
      last_sent_epoch = session->last_sent_epoch;
      session->sent_epoch_lock.unlock();
    }
    service.share_map(
        m->get_source(),
        m->get_connection().get(),
        op->sent_epoch,
        osdmap,
        session ? &last_sent_epoch : NULL);
    if (session) {
      session->sent_epoch_lock.lock();
      if (session->last_sent_epoch < last_sent_epoch) {
        session->last_sent_epoch = last_sent_epoch;
      }
      session->sent_epoch_lock.unlock();
      session->put();
    }
  }

  if (pg->deleting)
    return;

  op->mark_reached_pg();

  pg->do_request(op, handle);

  // finish
  dout(10) << "dequeue_op " << op << " finish" << dendl;
}
Here op->mark_reached_pg records that processing of this op has reached the reached_pg stage.
  void mark_reached_pg() {
    mark_flag_point(flag_reached_pg, "reached_pg");
  }
We can use dump_ops_in_flight to see which stage an in-flight op has reached (note that this example is unrelated to our read; it is only shown to illustrate the reached_pg flag point):
root@Storage2:~# ceph daemon osd.7 dump_ops_in_flight
{ "num_ops": 1,
"ops": [
{ "description": "osd_op(client.2130451838.0:899198714 rbd_data.2686620486def23.0000000000011595 [sparse-read 4034048~16384] 13.2f7f3fd e34077)",
"rmw_flags": 2,
"received_at": "2016-08-03 10:06:13.399398",
"age": "235.772246",
"duration": "0.000113",
"flag_point": "reached pg",
"client_info": { "client": "client.2130451838",
"tid": 899198714},
"events": [
{ "time": "2016-08-03 10:06:13.399452",
"event": "waiting_for_osdmap"},
{ "time": "2016-08-03 10:06:13.399511",
"event": "reached_pg"}]}]}
do_request function
void ReplicatedPG::do_request(
  OpRequestRef& op,
  ThreadPool::TPHandle &handle)
{
  assert(!op_must_wait_for_map(get_osdmap()->get_epoch(), op));
  if (can_discard_request(op)) {
    return;
  }
  if (flushes_in_progress > 0) {
    dout(20) << flushes_in_progress
             << " flushes_in_progress pending "
             << "waiting for active on " << op << dendl;
    waiting_for_peered.push_back(op);
    op->mark_delayed("waiting for peered");
    return;
  }

  if (!is_peered()) {
    // Delay unless PGBackend says it's ok
    if (pgbackend->can_handle_while_inactive(op)) {
      bool handled = pgbackend->handle_message(op);
      assert(handled);
      return;
    } else {
      waiting_for_peered.push_back(op);
      op->mark_delayed("waiting for peered");
      return;
    }
  }

  assert(is_peered() && flushes_in_progress == 0);
  if (pgbackend->handle_message(op))
    return;

  switch (op->get_req()->get_type()) {
  case CEPH_MSG_OSD_OP:
    if (!is_active()) {
      dout(20) << " peered, not active, waiting for active on " << op << dendl;
      waiting_for_active.push_back(op);
      op->mark_delayed("waiting for active");
      return;
    }
    if (is_replay()) {
      dout(20) << " replay, waiting for active on " << op << dendl;
      waiting_for_active.push_back(op);
      op->mark_delayed("waiting for replay end");
      return;
    }
    // verify client features
    if ((pool.info.has_tiers() || pool.info.is_tier()) &&
        !op->has_feature(CEPH_FEATURE_OSD_CACHEPOOL)) {
      osd->reply_op_error(op, -EOPNOTSUPP);
      return;
    }
    do_op(op); // do it now
    break;
  ...
  }
}
This function is the central dispatcher for message handling: depending on the type of the received message, it calls a different handler. For a read request, the handler is do_op.
do_op function
This function is very important, very long, and very complex. Read or write, CephFS or RBD, snapshots or not, healthy cluster or not: all of these concerns converge here, which makes the function hard to understand.
This article does not attempt to walk through every line; doing so would make it unbearably long, and frankly my understanding does not reach that level anyway. We will stick to the read path in the normal, healthy case.
void ReplicatedPG::do_op(OpRequestRef& op)
{
  MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
  assert(m->get_type() == CEPH_MSG_OSD_OP);

  /* decode the message */
  m->finish_decode();
  m->clear_payload();

  if (m->has_flag(CEPH_OSD_FLAG_PARALLELEXEC)) {
    // not implemented.
    osd->reply_op_error(op, -EINVAL);
    return;
  }

  if (op->rmw_flags == 0) {
    /* tag the op with rmw flags based on its type; for our read, only the read flag is set */
    int r = osd->osd->init_op_flags(op);
    if (r) {
      osd->reply_op_error(op, r);
      return;
    }
  }
For an ordinary read like ours, only the read flag is set:
void OpRequest::set_read() { set_rmw_flags(CEPH_OSD_RMW_FLAG_READ); }
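For reference, here is a condensed and hypothetical rendering of what init_op_flags does (the real function handles many more cases, such as class calls, pg ops, and cache operations, and this sketch assumes Ceph's own headers and types): it walks the individual OSDOps carried by the MOSDOp and sets the rmw flags according to each opcode, so a plain read ends up with only CEPH_OSD_RMW_FLAG_READ.
// Condensed sketch only, not the real OSD::init_op_flags implementation.
int init_op_flags_sketch(OpRequestRef &op)
{
  MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
  for (auto iter = m->ops.begin(); iter != m->ops.end(); ++iter) {
    if (ceph_osd_op_mode_modify(iter->op.op))
      op->set_write();   // e.g. CEPH_OSD_OP_WRITE
    if (ceph_osd_op_mode_read(iter->op.op))
      op->set_read();    // e.g. CEPH_OSD_OP_READ -> CEPH_OSD_RMW_FLAG_READ
    // the real init_op_flags also handles class calls, pg ops, cache ops, ...
  }
  return 0;
}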
Normally a read is served only by the Primary OSD. If the operation is a read and the CEPH_OSD_FLAG_BALANCE_READS or CEPH_OSD_FLAG_LOCALIZE_READS flag is set, however, either the Primary OSD or a Replica OSD may serve it. If this OSD is neither Primary nor Replica, the request was clearly sent to the wrong place. The relevant code is:
  if ((m->get_flags() & (CEPH_OSD_FLAG_BALANCE_READS |
                         CEPH_OSD_FLAG_LOCALIZE_READS)) &&
      op->may_read() &&
      !(op->may_write() || op->may_cache())) {
    // balanced reads; any replica will do
    if (!(is_primary() || is_replica())) {
      osd->handle_misdirected_op(this, op);
      return;
    }
  } else {
    // normal case; must be primary
    if (!is_primary()) {
      osd->handle_misdirected_op(this, op);
      return;
    }
  }
Next, do_op checks whether the op contains PG operations (includes_pg_op). pg_op_must_wait decides whether the op has to wait; if so it is put on the waiting_for_all_missing queue, otherwise do_pg_op handles the PG-level operation.
  if (op->includes_pg_op()) {
    if (pg_op_must_wait(m)) {
      wait_for_all_missing(op);
      return;
    }
    return do_pg_op(op);
  }
Next, op_has_sufficient_caps checks whether the client has sufficient capabilities:
  if (!op_has_sufficient_caps(op)) {
    osd->reply_op_error(op, -EPERM);
    return;
  }
Next, the head object is built from the request and validated:
  hobject_t head(m->get_oid(), m->get_object_locator().key,
                 CEPH_NOSNAP, m->get_pg().ps(),
                 info.pgid.pool(), m->get_object_locator().nspace);

  // object name too long?
  if (m->get_oid().name.size() > g_conf->osd_max_object_name_len) {
    dout(4) << "do_op name is longer than "
            << g_conf->osd_max_object_name_len
            << " bytes" << dendl;
    osd->reply_op_error(op, -ENAMETOOLONG);
    return;
  }
  if (m->get_object_locator().key.size() > g_conf->osd_max_object_name_len) {
    dout(4) << "do_op locator is longer than "
            << g_conf->osd_max_object_name_len
            << " bytes" << dendl;
    osd->reply_op_error(op, -ENAMETOOLONG);
    return;
  }
  if (m->get_object_locator().nspace.size() >
      g_conf->osd_max_object_namespace_len) {
    dout(4) << "do_op namespace is longer than "
            << g_conf->osd_max_object_namespace_len
            << " bytes" << dendl;
    osd->reply_op_error(op, -ENAMETOOLONG);
    return;
  }

  if (int r = osd->store->validate_hobject_key(head)) {
    dout(4) << "do_op object " << head << " invalid for backing store: "
            << r << dendl;
    osd->reply_op_error(op, r);
    return;
  }

  // blacklisted?
  if (get_osdmap()->is_blacklisted(m->get_source_addr())) {
    dout(10) << "do_op " << m->get_source_addr() << " is blacklisted" << dendl;
    osd->reply_op_error(op, -EBLACKLISTED);
    return;
  }
Ceph limits the length of an object's name: exceeding the limit returns ENAMETOOLONG, and a series of similar checks follows.
The client is also checked against the blacklist; a blacklisted client is refused service with EBLACKLISTED.
The next check is whether the cluster is full. It only applies to write requests and is irrelevant to reads:
  // order this op as a write?
  bool write_ordered =
    op->may_write() ||
    op->may_cache() ||
    m->has_flag(CEPH_OSD_FLAG_RWORDERED);

  // discard due to cluster full transition? (we discard any op that
  // originates before the cluster or pool is marked full; the client
  // will resend after the full flag is removed or if they expect the
  // op to succeed despite being full). The except is FULL_FORCE ops,
  // which there is no reason to discard because they bypass all full
  // checks anyway.
  // If this op isn't write or read-ordered, we skip
  // FIXME: we exclude mds writes for now.
  if (write_ordered && !( m->get_source().is_mds() || m->has_flag(CEPH_OSD_FLAG_FULL_FORCE)) &&
      info.history.last_epoch_marked_full > m->get_map_epoch()) {
    dout(10) << __func__ << " discarding op sent before full " << m << " "
             << *m << dendl;
    return;
  }
  if (!(m->get_source().is_mds()) && osd->check_failsafe_full() && write_ordered) {
    dout(10) << __func__ << " fail-safe full check failed, dropping request"
             << dendl;
    return;
  }
After these routine checks, the following debug-level message announces what do_op is going to do:
dout(10) << "do_op " << *m
<< (op->may_write() ? " may_write" : "")
<< (op->may_read() ? " may_read" : "")
<< (op->may_cache() ? " may_cache" : "")
<< " -> " << (write_ordered ? "write-ordered" : "read-ordered")
<< " flags " << ceph_osd_flag_string(m->get_flags())
<< dendl;
For our example, that produces the following output:
2016-09-19 18:02:42.338920 7f9ac17f8700 10 osd.2 pg_epoch: 49 pg[2.3a0( v 49'1 (0'0,49'1] local-les=47
n=1 ec=10 les/c 47/47 46/46/42) [2,0] r=0 lpr=46 crt=0'0 lcod 0'0 mlcod 0'0 active+clean] do_op osd_op
(client.19941.1:20 100000003eb.00000000 [read 2097152~2097152 [1@-1]] 2.afe74fa0 read e49) v4 may_read
-> read-ordered flags read
Check whether the object is a missing object:
  // missing object?
  if (is_unreadable_object(head)) {
    wait_for_unreadable_object(head, op);
    return;
  }
Then the snapdir object is checked for missing status and the like. Since we are studying the simplest case and ignoring snapshots, this can be skipped:
  // missing snapdir?
  hobject_t snapdir = head.get_snapdir();
  if (is_unreadable_object(snapdir)) {
    wait_for_unreadable_object(snapdir, op);
    return;
  }
Then the oid is built; this is the object actually being operated on:
  hobject_t missing_oid;
  hobject_t oid(m->get_oid(),
                m->get_object_locator().key,
                m->get_snapid(),
                m->get_pg().ps(),
                m->get_object_locator().get_pool(),
                m->get_object_locator().nspace);
Next, find_object_context is called to obtain the ObjectContext. This step is crucial; it is one of the functions inside do_op that does real work.
Note that ObjectContext is the object's context, playing a role somewhat like struct file in the kernel. The same object may be the target of both reads and writes, and those operations must exclude each other, or inconsistencies could result: while a write is in progress, a read of the same object must block first, and vice versa.
ObjectContext is therefore indispensable; every read or write must have one. Ceph keeps ObjectContexts in memory, managed with an LRU algorithm. If the context is not found in memory, it has to be rebuilt from the object's extended attributes on disk.
The two attributes are:
#define OI_ATTR "_"
#define SS_ATTR "snapset"
Looking at the object on disk, we can see that it indeed carries these two extended attributes:
root@BEAN-2:/data/osd.2/current/2.3a0_head# ll
total 4244
drwxr-xr-x 2 root root 4096 Sep 19 16:09 ./
drwxr-xr-x 4646 root root 135168 Sep 19 14:58 ../
-rw-r--r-- 1 root root 4194304 Sep 19 16:09 100000003eb.00000000__head_AFE74FA0__2
-rw-r--r-- 1 root root 0 Sep 19 14:58 __head_000003A0__2
root@BEAN-2:/data/osd.2/current/2.3a0_head# getfattr -d 100000003eb.00000000__head_AFE74FA0__2
# file: 100000003eb.00000000__head_AFE74FA0__2
user.ceph._=0sDgjxAAAABAM1AAAAAAAAABQAAAAxMDAwMDAwMDNlYi4wMDAwMDAwMP7/////////oE/nrwAAAAAAAgAAAAAAAAAGAxwAAAACAAAAAAAAAP////8AAAAAAAAAAP//////////AAAAAAEAAAAAAAAAMQAAAAAAAAAAAAAAAAAAAAICFQAAAAh2eQAAAAAAAAkAAAAAAAAAAQAAAAAAQAAAAAAAPZ3fVxtaICECAhUAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAAAAAAAQAAABCnd9XELeZIQ==
user.ceph.snapset=0sAgIZAAAAAQAAAAAAAAABAAAAAAAAAAAAAAAAAAAAAA==
user.cephos.spill_out=0sMAA=
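The sizes reported by the FileStore getattr lines in the debug log above (247 bytes for '_', 31 bytes for 'snapset') can also be confirmed directly on the backing file with the standard xattr API; a small, hypothetical check program run on the osd.2 node:
#include <sys/xattr.h>
#include <cstdio>

int main()
{
  // path of the backing file inside the PG directory on osd.2
  const char *path = "100000003eb.00000000__head_AFE74FA0__2";
  // passing a size of 0 just asks for the attribute's current length
  ssize_t oi = getxattr(path, "user.ceph._", NULL, 0);
  ssize_t ss = getxattr(path, "user.ceph.snapset", NULL, 0);
  printf("user.ceph._ = %zd bytes, user.ceph.snapset = %zd bytes\n", oi, ss);
  return 0;
}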
find_object_context obtains the oi (an object_info_t) from the user.ceph._ attribute and the ssc from the snapset attribute; the precise meaning of these structures is skipped here.
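To make the role of ObjectContext more concrete, here is a toy sketch of the two ideas described above: a per-object context that serializes readers and writers, cached in memory and rebuilt on a miss. This is not Ceph's SharedLRU/ObjectContext code, which is far more elaborate; the names below are invented.
#include <map>
#include <memory>
#include <shared_mutex>
#include <string>

// Toy sketch only: a per-object context with a reader/writer lock,
// looked up in an in-memory cache and (conceptually) rebuilt from the
// on-disk "_" and "snapset" xattrs when it is not cached.
struct ToyObjectContext {
  std::string oid;
  std::shared_mutex rw;  // stands in for Ceph's rwstate / ondisk locks
  // object_info_t (oi) and the SnapSetContext (ssc) would live here in Ceph
};

class ToyObcCache {
  std::map<std::string, std::shared_ptr<ToyObjectContext>> cache;  // Ceph uses a SharedLRU
public:
  std::shared_ptr<ToyObjectContext> get(const std::string &oid) {
    auto it = cache.find(oid);
    if (it != cache.end())
      return it->second;  // hit: reuse the cached context
    auto obc = std::make_shared<ToyObjectContext>();
    obc->oid = oid;       // miss: Ceph would decode the "_" and "snapset" xattrs here
    cache[oid] = obc;
    return obc;
  }
};

int main()
{
  ToyObcCache cache;
  auto obc = cache.get("100000003eb.00000000");
  std::shared_lock<std::shared_mutex> read_lock(obc->rw);  // analogous to ondisk_read_lock
  // ... serve the read while writers are excluded ...
  return 0;
}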
Once we have the obc, another round of checks follows, which we also skip; we go straight to creating the OpContext and calling execute_ctx:
  op->mark_started();

  ctx->src_obc.swap(src_obc);

  execute_ctx(ctx);
  utime_t prepare_latency = ceph_clock_now(cct);
  prepare_latency -= op->get_dequeued_time();
  osd->logger->tinc(l_osd_op_prepare_lat, prepare_latency);
  if (op->may_read() && op->may_write()) {
    osd->logger->tinc(l_osd_op_rw_prepare_lat, prepare_latency);
  } else if (op->may_read()) {
    osd->logger->tinc(l_osd_op_r_prepare_lat, prepare_latency);
  } else if (op->may_write() || op->may_cache()) {
    osd->logger->tinc(l_osd_op_w_prepare_lat, prepare_latency);
  }
execute_ctx is an extremely important function, the backbone of the normal processing path. Everything before it was checking and gathering material; with execute_ctx we enter deep water. Like do_op, it is responsible for a great deal and is therefore fairly involved; as before, we only analyze the read-related flow.
execute_ctx function
For the read path, things are much simpler than for writes:
  if (op->may_read()) {
    dout(10) << " taking ondisk_read_lock" << dendl;
    obc->ondisk_read_lock();
  }
  for (map<hobject_t,ObjectContextRef, hobject_t::BitwiseComparator>::iterator p = src_obc.begin(); p != src_obc.end(); ++p) {
    dout(10) << " taking ondisk_read_lock for src " << p->first << dendl;
    p->second->ondisk_read_lock();
  }

  {
#ifdef WITH_LTTNG
    osd_reqid_t reqid = ctx->op->get_reqid();
#endif
    tracepoint(osd, prepare_tx_enter, reqid.name._type,
        reqid.name._num, reqid.tid, reqid.inc);
  }

  int result = prepare_transaction(ctx);

  {
#ifdef WITH_LTTNG
    osd_reqid_t reqid = ctx->op->get_reqid();
#endif
    tracepoint(osd, prepare_tx_exit, reqid.name._type,
        reqid.name._num, reqid.tid, reqid.inc);
  }

  if (op->may_read()) {
    dout(10) << " dropping ondisk_read_lock" << dendl;
    obc->ondisk_read_unlock();
  }
  for (map<hobject_t,ObjectContextRef, hobject_t::BitwiseComparator>::iterator p = src_obc.begin(); p != src_obc.end(); ++p) {
    dout(10) << " dropping ondisk_read_lock for src " << p->first << dendl;
    p->second->ondisk_read_unlock();
  }

  if (result == -EINPROGRESS) {
    // come back later.
    return;
  }

  if (result == -EAGAIN) {
    // clean up after the ctx
    close_op_ctx(ctx);
    return;
  }

  bool successful_write = !ctx->op_t->empty() && op->may_write() && result >= 0;
  // prepare the reply
  ctx->reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0,
                               successful_write);

  if (successful_write) {
    // write. normalize the result code.
    dout(20) << " zeroing write result code " << result << dendl;
    result = 0;
  }
  ctx->reply->set_result(result);

  // read or error?
  if ((ctx->op_t->empty() || result < 0) && !ctx->update_log_only) {
    // finish side-effects
    if (result >= 0)
      do_osd_op_effects(ctx, m->get_connection());

    if (ctx->pending_async_reads.empty()) {
      complete_read_ctx(result, ctx);
    } else {
      in_progress_async_reads.push_back(make_pair(op, ctx));
      ctx->start_async_reads(this);
    }

    return;
  }
It does four main things: take the lock, prepare_transaction, release the lock, and reply.
Taking the lock means taking the read lock on the object's ObjectContext. prepare_transaction, unsurprisingly, carries the core task.
Note that this function is by no means as simple as described here. For the later write path in particular the control flow is very complex, and issue_repop and eval_repop play important roles there; the read path simply does not touch that code. Do not underestimate how complex this function is.
prepare_transaction function
This function delegates the main work to do_osd_ops:
int ReplicatedPG::prepare_transaction(OpContext *ctx)
{
  assert(!ctx->ops.empty());

  const hobject_t& soid = ctx->obs->oi.soid;

  // valid snap context?
  if (!ctx->snapc.is_valid()) {
    dout(10) << " invalid snapc " << ctx->snapc << dendl;
    return -EINVAL;
  }

  // prepare the actual mutation
  int result = do_osd_ops(ctx, ctx->ops);
  if (result < 0) {
    if (ctx->op->may_write() &&
        get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN)) {
      // need to save the error code in the pg log, to detect dup ops,
      // but do nothing else
      ctx->update_log_only = true;
    }
    return result;
  }

  // read-op? done?
  if (ctx->op_t->empty() && !ctx->modify) {
    unstable_stats.add(ctx->delta_stats);
    return result;
  }