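Preface
The networking-layer code is genuinely long, so I had to split it across several posts.
In the previous post, Accepter, Connection, Pipe and PipeConnection all made their appearance. The most important part is still missing: how messages are actually sent, how they are processed, and how replies are made. This post covers how a session is established between the client side and the server side.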
Pipe's connect and accept
On the client side:
conn = messenger->get_connection(dest_server);
If a new Pipe has to be created, connect_rank is called, as shown below:
Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr,
int type,
PipeConnection *con,
Message *first)
{
assert(lock.is_locked());
assert(addr != my_inst.addr);
ldout(cct,10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl;
// Create a new Pipe and set its state to STATE_CONNECTING; this is important
Pipe *pipe = new Pipe(this, Pipe::STATE_CONNECTING,
static_cast<PipeConnection*>(con));
pipe->pipe_lock.Lock();
pipe->set_peer_type(type);
pipe->set_peer_addr(addr);
pipe->policy = get_policy(type);
/* Start the Pipe's writer thread */
pipe->start_writer();
if (first)
pipe->_send(first);
pipe->pipe_lock.Unlock();
pipe->register_pipe();
pipes.insert(pipe);
return pipe;
}
void Pipe::start_writer()
{
assert(pipe_lock.is_locked());
assert(!writer_running);
writer_running = true;
writer_thread.create("ms_pipe_write", msgr->cct->_conf->ms_rwthread_stack_bytes);
}
The client side starts the Pipe's writer thread, whose main function is Pipe::writer. At this point the Pipe is in the Pipe::STATE_CONNECTING state:
void Pipe::writer()
{
pipe_lock.Lock();
while (state != STATE_CLOSED) {// && state != STATE_WAIT) {
ldout(msgr->cct,10) << "writer: state = " << get_state_name()
<< " policy.server=" << policy.server << dendl;
// standby?
if (is_queued() && state == STATE_STANDBY && !policy.server)
state = STATE_CONNECTING;
// connect?
if (state == STATE_CONNECTING) {
assert(!policy.server);
connect();
continue;
}
...
}
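The state checks at the top of the writer loop can be modeled in isolation. The sketch below is a toy, not Ceph code: PipeState, WriterAction and writer_step are hypothetical names, and it captures only the two rules shown above (a client-side Pipe in STANDBY with queued messages goes back to CONNECTING; a CONNECTING Pipe runs connect).

```cpp
#include <cassert>

// Hypothetical subset of the Pipe states mentioned in the text.
enum class PipeState { Accepting, Connecting, Open, Standby, Closed };

// What the toy writer loop decides to do in one iteration.
enum class WriterAction { Connect, Other, Exit };

// Mirrors the first checks of Pipe::writer: a non-server Pipe in STANDBY
// with queued messages is switched to CONNECTING, and a CONNECTING Pipe
// runs connect(). All remaining states are out of scope for this toy.
WriterAction writer_step(PipeState& state, bool queued, bool policy_server) {
  if (state == PipeState::Closed)
    return WriterAction::Exit;
  if (queued && state == PipeState::Standby && !policy_server)
    state = PipeState::Connecting;
  if (state == PipeState::Connecting) {
    assert(!policy_server);  // only the client side initiates connections
    return WriterAction::Connect;
  }
  return WriterAction::Other;
}
```

A standby client Pipe with pending output therefore reconnects on its own, while a Pipe with nothing queued stays in STANDBY.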
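When the Pipe is in the Pipe::STATE_CONNECTING state, writer calls Pipe::connect, which establishes the connection to the server, i.e. the actual communication channel. Let's look at this function: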
int Pipe::connect()
{
bool got_bad_auth = false;
ldout(msgr->cct,10) << "connect " << connect_seq << dendl;
assert(pipe_lock.is_locked());
__u32 cseq = connect_seq;
__u32 gseq = msgr->get_global_seq();
// stop reader thread
join_reader();
pipe_lock.Unlock();
char tag = -1;
int rc = -1;
struct msghdr msg;
struct iovec msgvec[2];
int msglen;
char banner[strlen(CEPH_BANNER) + 1]; // extra byte makes coverity happy
entity_addr_t paddr;
entity_addr_t peer_addr_for_me, socket_addr;
AuthAuthorizer *authorizer = NULL;
bufferlist addrbl, myaddrbl;
const md_config_t *conf = msgr->cct->_conf;
// close old socket. this is safe because we stopped the reader thread above.
if (sd >= 0)
::close(sd);
// create socket?
sd = ::socket(peer_addr.get_family(), SOCK_STREAM, 0);
if (sd < 0) {
rc = -errno;
lderr(msgr->cct) << "connect couldn't created socket " << cpp_strerror(rc) << dendl;
goto fail;
}
recv_reset();
set_socket_options();
/* Initiate the connection. On the server side, the Accepter's worker thread is blocked in the accept system call, waiting for a client to connect */
ldout(msgr->cct,10) << "connecting to " << peer_addr << dendl;
rc = ::connect(sd, peer_addr.get_sockaddr(), peer_addr.get_sockaddr_len());
if (rc < 0) {
rc = -errno;
ldout(msgr->cct,2) << "connect error " << peer_addr
<< ", " << cpp_strerror(rc) << dendl;
goto fail;
}
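Inside Pipe::connect, the client actually invokes the connect system call and tries to reach the server's listening address. At the other end, the server's Accepter thread is blocked in the accept system call, waiting for a client to call connect. As soon as accept returns on the server, the Accepter thread calls add_accept_pipe to create a new Pipe that takes full responsibility for communicating with that client. Once that Pipe exists, the Accepter thread does not linger: it moves straight on to the next loop iteration and calls accept again to wait for new connections, as shown below: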
void *Accepter::entry()
{
....
sockaddr_storage ss;
socklen_t slen = sizeof(ss);
int sd = ::accept(listen_sd, (sockaddr*)&ss, &slen);
if (sd >= 0) {
int r = set_close_on_exec(sd);
if (r) {
ldout(msgr->cct,0) << "accepter set_close_on_exec() failed "
<< cpp_strerror(r) << dendl;
}
errors = 0;
ldout(msgr->cct,10) << "accepted incoming on sd " << sd << dendl;
msgr->add_accept_pipe(sd);
} else {
ldout(msgr->cct,0) << "accepter no incoming connection? sd = " << sd
<< " errno " << errno << " " << cpp_strerror(errno) << dendl;
if (++errors > 4)
break;
}
....
}
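The division of labor described above (one thread that only accepts, plus a dedicated Pipe thread per connection) can be sketched with std::thread. This is a hypothetical toy: accept_loop and handle_connection stand in for Accepter::entry and the Pipe reader thread, and plain integers stand in for socket descriptors so that no real sockets are involved.

```cpp
#include <algorithm>
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

// Shared record of which (fake) connections were served.
struct Served {
  std::mutex mu;
  std::vector<int> ids;
};

// Stand-in for the Pipe thread that takes over one connection.
void handle_connection(int fake_sd, Served* served) {
  std::lock_guard<std::mutex> guard(served->mu);
  served->ids.push_back(fake_sd);
}

// Stand-in for Accepter::entry: "accept" a connection, hand it to a new
// thread, and immediately loop back to accept the next one.
std::vector<int> accept_loop(int n_connections) {
  Served served;
  std::vector<std::thread> workers;
  for (int sd = 0; sd < n_connections; ++sd)
    workers.emplace_back(handle_connection, sd, &served);  // hand off, don't wait
  for (auto& t : workers)
    t.join();  // the toy only waits at shutdown
  std::sort(served.ids.begin(), served.ids.end());
  return served.ids;
}
```

The accepting loop never blocks on a handler, which is exactly why add_accept_pipe only creates the Pipe and starts its reader before returning.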
SimpleMessenger::add_accept_pipe likewise creates a Pipe object and also starts its reader_thread:
Pipe *SimpleMessenger::add_accept_pipe(int sd)
{
lock.Lock();
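/* Create a Pipe that takes over communication with the client, so the Accepter thread can return right away and call accept again to wait for new connections */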
Pipe *p = new Pipe(this, Pipe::STATE_ACCEPTING, NULL);
p->sd = sd;
p->pipe_lock.Lock();
/* Start the Pipe's reader thread. Since the new Pipe's initial state is ACCEPTING, the reader thread will call Pipe::accept to establish a session with the client */
p->start_reader();
p->pipe_lock.Unlock();
pipes.insert(p);
accepting_pipes.insert(p);
lock.Unlock();
return p;
}
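Note that the newly created Pipe is in Pipe::STATE_ACCEPTING. The main function of the Pipe's reader thread is Pipe::reader; if the Pipe's state is STATE_ACCEPTING, it calls accept to talk to the client and set up the session, as shown below: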
void Pipe::reader()
{
pipe_lock.Lock();
if (state == STATE_ACCEPTING) {
accept();
assert(pipe_lock.is_locked());
}
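When the Pipe is in STATE_ACCEPTING, the reader thread runs accept. During connection setup, Pipe::connect and Pipe::accept form a pair: they negotiate and exchange messages with each other to establish the connection.
So what information do the client and the server exchange? Reading Pipe::connect and Pipe::accept, it is not hard to arrive at the figure below (taken from 麦子迈's article "解析Ceph: 网络层的处理", i.e. "Analyzing Ceph: network-layer processing"):
The client-side code is as follows: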
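The point that the initial state decides which thread drives the handshake can be shown with a small model. ToyPipe and its members are hypothetical; in this toy a Pipe created like connect_rank (CONNECTING) starts only its writer, which calls connect(), and a Pipe created like add_accept_pipe (ACCEPTING) starts only its reader, which calls accept().

```cpp
#include <cassert>
#include <string>
#include <thread>

enum class State { Connecting, Accepting };

struct ToyPipe {
  State state;
  std::string log;

  void connect() { log = "writer thread ran connect()"; }
  void accept() { log = "reader thread ran accept()"; }

  // Simplified Pipe::writer / Pipe::reader entry checks.
  void writer() { if (state == State::Connecting) connect(); }
  void reader() { if (state == State::Accepting) accept(); }

  // Client Pipes start the writer, accepted Pipes start the reader.
  void start() {
    std::thread t(state == State::Connecting ? &ToyPipe::writer : &ToyPipe::reader, this);
    t.join();  // joined immediately to keep the toy deterministic
  }
};
```

For example, ToyPipe{State::Accepting, ""}.start() leaves "reader thread ran accept()" in log.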
// verify banner
// FIXME: this should be non-blocking, or in some other way verify the banner as we get it.
/* Receive the CEPH_BANNER sent by the server in Pipe::accept */
rc = tcp_read((char*)&banner, strlen(CEPH_BANNER));
if (rc < 0) {
ldout(msgr->cct,2) << "connect couldn't read banner, " << cpp_strerror(rc) << dendl;
goto fail;
}
if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
ldout(msgr->cct,0) << "connect protocol error (bad banner) on peer " << peer_addr << dendl;
goto fail;
}
/* Send our own banner back to the server */
memset(&msg, 0, sizeof(msg));
msgvec[0].iov_base = banner;
msgvec[0].iov_len = strlen(CEPH_BANNER);
msg.msg_iov = msgvec;
msg.msg_iovlen = 1;
msglen = msgvec[0].iov_len;
rc = do_sendmsg(&msg, msglen);
if (rc < 0) {
ldout(msgr->cct,2) << "connect couldn't write my banner, " << cpp_strerror(rc) << dendl;
goto fail;
}
...
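I won't paste the server-side code.
The figure above shows how a session is established. The BANNER works like a secret password exchanged at the start of a connection, much like the classic challenge and response "天王盖地虎, 宝塔镇河妖" ("the Heavenly King covers the Earth Tiger / the Pagoda subdues the River Demon"). The BANNER is a constant: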
/*
* tcp connection banner. include a protocol version. and adjust
* whenever the wire protocol changes. try to keep this string length
* constant.
*/
#define CEPH_BANNER "ceph v027"
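The banner step can be reproduced over a local socket pair. This is a sketch under stated assumptions: socketpair(AF_UNIX) replaces TCP, and write_all, read_all, expect_banner and banner_handshake are hypothetical helpers (read_all plays the role of tcp_read). As in the client code above, the server speaks first, the client checks the bytes with memcmp, and then the client sends its own banner back.

```cpp
#include <cassert>
#include <cstring>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#define CEPH_BANNER "ceph v027"  // value quoted in the text above

// Write exactly len bytes, looping over short writes.
static bool write_all(int fd, const char* buf, std::size_t len) {
  while (len > 0) {
    ssize_t n = ::write(fd, buf, len);
    if (n <= 0) return false;
    buf += n;
    len -= static_cast<std::size_t>(n);
  }
  return true;
}

// Read exactly len bytes.
static bool read_all(int fd, char* buf, std::size_t len) {
  while (len > 0) {
    ssize_t n = ::read(fd, buf, len);
    if (n <= 0) return false;  // error or peer closed the socket
    buf += n;
    len -= static_cast<std::size_t>(n);
  }
  return true;
}

// Read one banner and compare it byte for byte.
static bool expect_banner(int fd) {
  const std::size_t len = std::strlen(CEPH_BANNER);
  char banner[sizeof(CEPH_BANNER)] = {0};  // one spare byte, like the original
  return read_all(fd, banner, len) && std::memcmp(banner, CEPH_BANNER, len) == 0;
}

// server_banner is what the "server" sends. It must be as long as
// CEPH_BANNER; a shorter write would leave the client read blocked.
bool banner_handshake(const char* server_banner) {
  const std::size_t len = std::strlen(CEPH_BANNER);
  if (std::strlen(server_banner) != len) return false;
  int fds[2];
  if (::socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) return false;
  const int server = fds[0], client = fds[1];
  bool ok = write_all(server, server_banner, len)   // server speaks first
            && expect_banner(client)                // client verifies it
            && write_all(client, CEPH_BANNER, len)  // client answers
            && expect_banner(server);               // server verifies the reply
  ::close(server);
  ::close(client);
  return ok;
}
```

A peer announcing "ceph v026" fails the memcmp and the handshake stops, which is why the banner has to change whenever the wire protocol does.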
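Once the BANNER check has passed, each side announces its own address to the other. There is nothing special about this logic, so I won't go into it.
The most important logic is the exchange of the connection message. The second half of both Pipe::connect and Pipe::accept is a long and hard-to-follow stretch of code. Its purpose is for the server to validate the connection information and make sure there is only one connection per peer address.
The client first sends a ceph_msg_connect struct: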
/*
* connection negotiation
*/
struct ceph_msg_connect {
__le64 features; /* supported feature bits */
__le32 host_type; /* CEPH_ENTITY_TYPE_* */
__le32 global_seq; /* count connections initiated by this host */
__le32 connect_seq; /* count connections initiated in this session */
__le32 protocol_version;
__le32 authorizer_protocol;
__le32 authorizer_len;
__u8 flags; /* CEPH_MSG_CONNECT_* */
} __attribute__ ((packed));
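Because the struct is packed, its wire size is simply the sum of its fields: 8 + 6 × 4 + 1 = 33 bytes. The re-declaration below is a sketch: ceph_msg_connect_sketch is a hypothetical name, it uses <cstdint> types instead of Ceph's __le64/__le32, it relies on the GCC/Clang packed attribute just like the original, and it assumes a little-endian host so that the in-memory layout matches the wire layout.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Same field order as ceph_msg_connect above.
struct ceph_msg_connect_sketch {
  uint64_t features;             // supported feature bits
  uint32_t host_type;            // CEPH_ENTITY_TYPE_*
  uint32_t global_seq;           // connections initiated by this host
  uint32_t connect_seq;          // connections initiated in this session
  uint32_t protocol_version;
  uint32_t authorizer_protocol;
  uint32_t authorizer_len;
  uint8_t flags;                 // CEPH_MSG_CONNECT_*
} __attribute__((packed));

// No padding: 8 + 6 * 4 + 1 bytes.
static_assert(sizeof(ceph_msg_connect_sketch) == 33, "packed size");
```

Without packed, the compiler would normally pad the trailing flags byte up to the struct's alignment, and the receiver would read a different number of bytes than the sender wrote.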
/* Client-side code that sends the connection negotiation */
while (1) {
delete authorizer;
authorizer = msgr->get_authorizer(peer_type, false);
bufferlist authorizer_reply;
ceph_msg_connect connect;
connect.features = policy.features_supported;
connect.host_type = msgr->get_myinst().name.type();
connect.global_seq = gseq;
connect.connect_seq = cseq;
connect.protocol_version = msgr->get_proto_version(peer_type, true);
connect.authorizer_protocol = authorizer ? authorizer->protocol : 0;
connect.authorizer_len = authorizer ? authorizer->bl.length() : 0;
if (authorizer)
ldout(msgr->cct,10) << "connect.authorizer_len=" << connect.authorizer_len
<< " protocol=" << connect.authorizer_protocol << dendl;
connect.flags = 0;
if (policy.lossy)
connect.flags |= CEPH_MSG_CONNECT_LOSSY; // this is fyi, actually, server decides!
memset(&msg, 0, sizeof(msg));
msgvec[0].iov_base = (char*)&connect;
msgvec[0].iov_len = sizeof(connect);
msg.msg_iov = msgvec;
msg.msg_iovlen = 1;
msglen = msgvec[0].iov_len;
if (authorizer) {
msgvec[1].iov_base = authorizer->bl.c_str();
msgvec[1].iov_len = authorizer->bl.length();
msg.msg_iovlen++;
msglen += msgvec[1].iov_len;
}
ldout(msgr->cct,10) << "connect sending gseq=" << gseq << " cseq=" << cseq
<< " proto=" << connect.protocol_version << dendl;
rc = do_sendmsg(&msg, msglen);
if (rc < 0) {
ldout(msgr->cct,2) << "connect couldn't write gseq, cseq, " << cpp_strerror(rc) << dendl;
goto fail;
}
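The code above sends the fixed-size connect struct and the optional authorizer in one call by filling one or two iovecs. The sketch below shows the same gather pattern with plain sendmsg over a socketpair; Header, send_connect, read_all and round_trip are hypothetical names, and Header is a two-field stand-in for ceph_msg_connect. It also assumes the whole message goes out in one sendmsg call, whereas Ceph's do_sendmsg is the place that deals with partial sends.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

// Hypothetical fixed-size header standing in for ceph_msg_connect.
struct Header {
  uint32_t connect_seq;
  uint32_t payload_len;  // plays the role of authorizer_len
} __attribute__((packed));

// Header plus optional payload in one sendmsg() call.
ssize_t send_connect(int fd, const Header& h, const std::string& payload) {
  struct msghdr msg;
  struct iovec vec[2];
  std::memset(&msg, 0, sizeof(msg));
  vec[0].iov_base = const_cast<Header*>(&h);
  vec[0].iov_len = sizeof(h);
  msg.msg_iov = vec;
  msg.msg_iovlen = 1;
  if (!payload.empty()) {  // attach the second iovec only when needed
    vec[1].iov_base = const_cast<char*>(payload.data());
    vec[1].iov_len = payload.size();
    msg.msg_iovlen++;
  }
  return ::sendmsg(fd, &msg, 0);
}

static bool read_all(int fd, char* buf, std::size_t len) {
  while (len > 0) {
    ssize_t n = ::read(fd, buf, len);
    if (n <= 0) return false;
    buf += n;
    len -= static_cast<std::size_t>(n);
  }
  return true;
}

// Send on one end, then read header and payload back on the other.
bool round_trip(uint32_t cseq, const std::string& payload, std::string* got) {
  int fds[2];
  if (::socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) return false;
  Header out{cseq, static_cast<uint32_t>(payload.size())};
  bool ok = send_connect(fds[0], out, payload) ==
            static_cast<ssize_t>(sizeof(out) + payload.size());
  Header in{};
  ok = ok && read_all(fds[1], reinterpret_cast<char*>(&in), sizeof(in));
  ok = ok && in.connect_seq == cseq;
  if (ok) {
    got->assign(in.payload_len, '\0');
    ok = in.payload_len == 0 || read_all(fds[1], &(*got)[0], in.payload_len);
  }
  ::close(fds[0]);
  ::close(fds[1]);
  return ok;
}
```

The receiver learns how many trailing bytes to read from the length field in the header, just as the server learns the authorizer size from connect.authorizer_len.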
The client sends its own global_seq and connect_seq to the server. The server first checks whether a Pipe responsible for communicating with this client already exists:
existing = msgr->_lookup_pipe(peer_addr);
If there is no existing Pipe, things are simple. There are two cases, depending on whether the connect.connect_seq sent by the client is 0:
else if (connect.connect_seq > 0) {
// we reset, and they are opening a new session
/* The server has reset; tell the client to reset the session with the CEPH_MSGR_TAG_RESETSESSION tag */
ldout(msgr->cct,0) << "accept we reset (peer sent cseq " << connect.connect_seq << "), sending RESETSESSION" << dendl;
msgr->lock.Unlock();
reply.tag = CEPH_MSGR_TAG_RESETSESSION;
goto reply;
} else {
/* Create a new session */
ldout(msgr->cct,10) << "accept new session" << dendl;
existing = NULL;
goto open;
}
For a new connection, connect.connect_seq is normally 0. In that case the server only has to switch the state to STATE_OPEN and send a reply to the client, as shown below:
open:
// open
assert(pipe_lock.is_locked());
connect_seq = connect.connect_seq + 1;
peer_global_seq = connect.global_seq;
assert(state == STATE_ACCEPTING);
/* Set the Pipe's state to STATE_OPEN */
state = STATE_OPEN;
ldout(msgr->cct,10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl;
// send READY reply
reply.tag = (reply_tag ? reply_tag : CEPH_MSGR_TAG_READY);
reply.features = policy.features_supported;
reply.global_seq = msgr->get_global_seq();
reply.connect_seq = connect_seq;
reply.flags = 0;
reply.authorizer_len = authorizer_reply.length();
if (policy.lossy)
reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY;
connection_state->set_features((uint64_t)reply.features & (uint64_t)connect.features);
ldout(msgr->cct,10) << "accept features " << connection_state->get_features() << dendl;
session_security.reset(
get_auth_session_handler(msgr->cct,
connect.authorizer_protocol,
session_key,
connection_state->get_features()));
// notify
msgr->dispatch_queue.queue_accept(connection_state.get());
msgr->ms_deliver_handle_fast_accept(connection_state.get());
// ok!
if (msgr->dispatch_queue.stop)
goto shutting_down;
removed = msgr->accepting_pipes.erase(this);
assert(removed == 1);
register_pipe();
msgr->lock.Unlock();
pipe_lock.Unlock();
/* Reply to the client */
r = tcp_write((char*)&reply, sizeof(reply));
if (r < 0) {
goto fail_registered;
}
if (reply.authorizer_len) {
r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
if (r < 0) {
goto fail_registered;
}
}
if (reply_tag == CEPH_MSGR_TAG_SEQ) {
if (tcp_write((char*)&existing_seq, sizeof(existing_seq)) < 0) {
ldout(msgr->cct,2) << "accept write error on in_seq" << dendl;
goto fail_registered;
}
if (tcp_read((char*)&newly_acked_seq, sizeof(newly_acked_seq)) < 0) {
ldout(msgr->cct,2) << "accept read error on newly_acked_seq" << dendl;
goto fail_registered;
}
}
pipe_lock.Lock();
discard_requeued_up_to(newly_acked_seq);
if (state != STATE_CLOSED) {
ldout(msgr->cct,10) << "accept starting writer, state " << get_state_name() << dendl;
/* Start the Pipe's writer thread */
start_writer();
}
ldout(msgr->cct,20) << "accept done" << dendl;
maybe_start_delay_thread();
return 0; // success.
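The server's decision when no existing Pipe is found boils down to a few lines. This is a toy model, not Ceph code: Tag, AcceptResult and accept_without_existing are hypothetical names, and only the default READY reply is modeled (the reply_tag override such as CEPH_MSGR_TAG_SEQ is left out).

```cpp
#include <cassert>
#include <cstdint>

// Tags named after the ones in the text; the values here are arbitrary.
enum class Tag { Ready, ResetSession };

struct AcceptResult {
  Tag tag;
  uint32_t connect_seq;  // server-side connect_seq after the decision
  bool opened;           // did the Pipe move to STATE_OPEN?
};

// No existing Pipe for this peer address:
//  - peer sent connect_seq > 0: we must have reset -> RESETSESSION
//  - peer sent connect_seq == 0: new session -> open with connect_seq + 1, READY
AcceptResult accept_without_existing(uint32_t peer_connect_seq) {
  if (peer_connect_seq > 0)
    return {Tag::ResetSession, 0, false};
  return {Tag::Ready, peer_connect_seq + 1, true};
}
```

So a fresh client that sends connect_seq 0 ends up with a session whose connect_seq is 1 on the server.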
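If, however, the client sends a nonzero connect_seq but the server cannot find a Pipe responsible for the client's address, the server must have reset. It tells the client so with the CEPH_MSGR_TAG_RESETSESSION tag. On receiving this tag, the client calls was_session_reset, sets cseq to 0, and resends the connect message.
/* What the client does when it receives CEPH_MSGR_TAG_RESETSESSION */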
if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) {
ldout(msgr->cct,0) << "connect got RESETSESSION" << dendl;
was_session_reset();
cseq = 0;
pipe_lock.Unlock();
continue;
}
Now let's look at was_session_reset:
void Pipe::was_session_reset()
{
assert(pipe_lock.is_locked());
ldout(msgr->cct,10) << "was_session_reset" << dendl;
in_q->discard_queue(conn_id);
if (delay_thread)
delay_thread->discard();
discard_out_queue();
msgr->dispatch_queue.queue_remote_reset(connection_state.get());
if (randomize_out_seq()) {
lsubdout(msgr->cct,ms,15) << "was_session_reset(): Could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
}
in_seq = 0;
connect_seq = 0;
}
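The reset-and-retry round trip can be simulated end to end. Everything here is hypothetical: ForgetfulServer models a server that has lost all state for this client, ClientPipe keeps only the fields that was_session_reset clears (the out-queue, in_seq, connect_seq), and the randomized out_seq, the delay thread and the dispatch-queue notification are left out.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <string>

enum class Tag { Ready, ResetSession };

// Toy server with no Pipe for this client (e.g. it restarted).
struct ForgetfulServer {
  Tag handle_connect(uint32_t peer_cseq, uint32_t* reply_cseq) {
    if (peer_cseq > 0) return Tag::ResetSession;  // "we reset"
    *reply_cseq = peer_cseq + 1;                  // new session
    return Tag::Ready;
  }
};

// Toy client-side Pipe state.
struct ClientPipe {
  uint32_t connect_seq = 3;  // pretend an earlier session existed
  uint64_t in_seq = 42;
  std::deque<std::string> out_q{"stale message"};
  int attempts = 0;

  void was_session_reset() {
    out_q.clear();  // like discard_out_queue()
    in_seq = 0;
    connect_seq = 0;
  }

  // Simplified negotiation loop of Pipe::connect.
  void connect(ForgetfulServer& server) {
    uint32_t cseq = connect_seq;
    while (true) {
      ++attempts;
      uint32_t reply_cseq = 0;
      Tag tag = server.handle_connect(cseq, &reply_cseq);
      if (tag == Tag::ResetSession) {
        was_session_reset();
        cseq = 0;
        continue;  // resend the connect message
      }
      connect_seq = reply_cseq;  // toy: take the server's value (cseq + 1)
      return;
    }
  }
};
```

The first attempt is rejected with RESETSESSION, the client drops its stale state, and the second attempt with cseq 0 opens a new session.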
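So far we have covered the case where the server cannot find a Pipe for the client's address. If the server does find a Pipe facing the client, it must compare the global_seq and connect_seq sent by the client with its own values to work out what happened, and act accordingly.
For example, if the client sends connect_seq = 0 while the server's connect_seq is nonzero, the client's session has been reset, and the correct behavior for the server is to discard the old session and replace it with the new Pipe.
I won't go through the other possibilities here.
At this point the session between the client and the server is established, and from here on the two sides simply exchange messages as needed. What follows is closely tied to Dispatcher and DispatchQueue, so it is left for the next post.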
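The one existing-Pipe case described above can be written as a predicate. This sketch is deliberately partial: ExistingAction and on_existing_pipe are hypothetical names, and every other combination (including the global_seq comparisons the real Pipe::accept performs) is lumped into NotCoveredHere rather than guessed at.

```cpp
#include <cassert>
#include <cstdint>

enum class ExistingAction { ReplaceOldSession, NotCoveredHere };

// Peer restarted its session (sends connect_seq == 0) while our existing
// Pipe still carries connect_seq > 0: drop the old session, use the new Pipe.
ExistingAction on_existing_pipe(uint32_t peer_cseq, uint32_t existing_cseq) {
  if (peer_cseq == 0 && existing_cseq > 0)
    return ExistingAction::ReplaceOldSession;
  return ExistingAction::NotCoveredHere;  // other cases are not modeled
}
```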