CTDB 之 发现其他节点

| 分类 CTDB  | 标签 CTDB 

前言

  • CTDB cluster中存在多个节点时,各个节点如何发现对方,和对方建立连接,
  • 当某个节点关闭CTDB 服务的时候,正常节点如何探测到该节点CTDB服务已经退出
  • 当某个节点crash,CTDB猝死,来不及说goodbye,正常节点的CTDB如何探测到该节点已经死亡
  • 当死去的CTDB节点重启服务,正常节点如何重建连接,时效性如何,本文解决这些问题。

谁是我们的伙伴

毛主席说,谁是我们的朋友,谁是我们的敌人,这是首先要解决的问题。同样道理,CTDB中,集群里面有几个节点,这是首先要解决的问题。

对于CTDB集群而言,这个配置文件位于:

root@node1:~# cat /etc/ctdb/nodes
10.10.10.3
10.10.10.2
10.10.10.1

ctdbd daemon 在启动的早期,就会调用下面函数来获得所有的CTDB node的IP。

        /* tell ctdb what nodes are available */
        ctdb_load_nodes_file(ctdb);

下面我们看下该函数的实现:

void ctdb_load_nodes_file(struct ctdb_context *ctdb)
{
    int ret;
    ret = ctdb_set_nlist(ctdb, options.nlist);
    if (ret == -1) {
        DEBUG(DEBUG_ALERT,("ctdb_set_nlist failed - %s\n", ctdb_errstr(ctdb)));
        exit(1);
    }   
}

int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist)
{
    char **lines;                                                                                                                                      
    int nlines;
    int i, j, num_present;

    talloc_free(ctdb->nodes);
    ctdb->nodes     = NULL;
    ctdb->num_nodes = 0;

    lines = file_lines_load(nlist, &nlines, ctdb);
    if (lines == NULL) {
        ctdb_set_error(ctdb, "Failed to load nlist '%s'\n", nlist);
        return -1; 
    }   
    while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
        nlines--;
    }   

    num_present = 0;
    for (i=0; i < nlines; i++) {
        char *node;

        node = lines[i];
        /* strip leading spaces */
        while((*node == ' ') || (*node == '\t')) {
            node++;
        }   
        if (*node == '#') {
            /*一般来讲删除一个节点,最好使用#注释掉,
             * 好处时,大部分其他的节点对应的PNN号不变*/
            if (ctdb_add_deleted_node(ctdb) != 0) {
                talloc_free(lines);
                return -1;
            }
            continue;
        }
        /*空行忽略*/
        if (strcmp(node, "") == 0) {
            continue;
        }
        /*正常的节点,是CTDB cluster的成员*/
        if (ctdb_add_node(ctdb, node) != 0) {
            talloc_free(lines);
            return -1;
        }
        /*删除的节点就不计入num_present*/
        num_present++;
    }

    /* initialize the vnn mapping table now that we have the nodes list,
       skipping any deleted nodes
    */
    ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
    CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);

    ctdb->vnn_map->generation = INVALID_GENERATION;
    ctdb->vnn_map->size = num_present;
    ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);                                                               
    CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
    
    for(i=0, j=0; i < ctdb->vnn_map->size; i++) {
        if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
            continue;
        }
        ctdb->vnn_map->map[j] = i;
        j++;
    }
    
    talloc_free(lines);
    return 0;                                                                                                                                          
}

上述的代码其实可以分成三部分,

  • 用# 注释掉的node,调用 ctdb_add_deleted_node函数
static int ctdb_add_deleted_node(struct ctdb_context *ctdb)
{
    struct ctdb_node *node, **nodep;                                                                 
    nodep = talloc_realloc(ctdb, ctdb->nodes, struct ctdb_node *, ctdb->num_nodes+1);
    CTDB_NO_MEMORY(ctdb, nodep);

    ctdb->nodes = nodep;
    nodep = &ctdb->nodes[ctdb->num_nodes];
    (*nodep) = talloc_zero(ctdb->nodes, struct ctdb_node);
    CTDB_NO_MEMORY(ctdb, *nodep);
    node = *nodep;
    
    /*该节点是删除掉的节点,因此,用0.0.0.0这种没有意义的IP指代它*/
    if (ctdb_parse_address(ctdb, node, "0.0.0.0", &node->address) != 0) {
        DEBUG(DEBUG_ERR,("Failed to setup deleted node %d\n", ctdb->num_nodes));
        return -1;
    }
    node->ctdb = ctdb;
    /*node的name也设置成0.0.0.0:0这种无意义的值*/
    node->name = talloc_strdup(node, "0.0.0.0:0");

    /* this assumes that the nodes are kept in sorted order, and no gaps */
    node->pnn = ctdb->num_nodes;
    /* 设置上NODE_FLAGS_DELETED标志位,表示该node已被删除 */
    node->flags = NODE_FLAGS_DELETED|NODE_FLAGS_DISCONNECTED;
    
    ctdb->num_nodes++;
    node->dead_count = 0;
    return 0;
}

注意上面的delete node 的处理方式和下面正常node的处理方式的比较

  • 正常node, 调用ctdb_add_node
static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr)
{
    struct ctdb_node *node, **nodep;                                                                  
    nodep = talloc_realloc(ctdb, ctdb->nodes, struct ctdb_node *, ctdb->num_nodes+1);
    CTDB_NO_MEMORY(ctdb, nodep);

    ctdb->nodes = nodep;
    nodep = &ctdb->nodes[ctdb->num_nodes];
    (*nodep) = talloc_zero(ctdb->nodes, struct ctdb_node);
    CTDB_NO_MEMORY(ctdb, *nodep);
    node = *nodep;

    if (ctdb_parse_address(ctdb, node, nstr, &node->address) != 0) {
        return -1;
    }
    node->ctdb = ctdb;
    node->name = talloc_asprintf(node, "%s:%u", 
                     node->address.address, 
                     node->address.port);
    /* this assumes that the nodes are kept in sorted order, and no gaps */
    node->pnn = ctdb->num_nodes;

    /* nodes start out disconnected and unhealthy */
    node->flags = (NODE_FLAGS_DISCONNECTED | NODE_FLAGS_UNHEALTHY);

    if (ctdb->address.address &&
        ctdb_same_address(&ctdb->address, &node->address)) {
        /* for automatic binding to interfaces, see tcp_connect.c */
        ctdb->pnn = node->pnn;
        node->flags &= ~NODE_FLAGS_DISCONNECTED;

        /* do we start out in DISABLED mode? */
        if (ctdb->start_as_disabled != 0) {
            DEBUG(DEBUG_INFO, ("This node is configured to start in DISABLED state\n"));      
            node->flags |= NODE_FLAGS_DISABLED;
        }
        /* do we start out in STOPPED mode? */
        if (ctdb->start_as_stopped != 0) {
            DEBUG(DEBUG_INFO, ("This node is configured to start in STOPPED state\n"));
            node->flags |= NODE_FLAGS_STOPPED;
        }
    }

    ctdb->num_nodes++;
    node->dead_count = 0;

    return 0;
}
                          

我们比较一下添加删除节点和添加正常节点的代码,发现无论是已删除节点还是正常节点,都会记录再ctdb->nodes数组中,每个节点都有一个PNN号,这个号码的值即对应节点在 /etc/ctdb/nodes中的行号。

但是对于正常节点,我们会解析得到其IP和port, IP:port作为该节点的名字node->name 。 另外一个不同点在于,已经删除的节点,其标志位flags中 NODE_FLAGS_DELETED置位,而正常节点并无该标志位。

  • 创建vnn_map

已经删除的节点并不会计入vnnmap,因此下面会有continue跳过那些已经被删除的节点。

    ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
    CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);

    ctdb->vnn_map->generation = INVALID_GENERATION;
    /*注意,已经删除的节点是不会计入其中的,size为num_present*/
    ctdb->vnn_map->size = num_present;
    ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
    CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);

    for(i=0, j=0; i < ctdb->vnn_map->size; i++) {
        if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
            continue;
        }
        ctdb->vnn_map->map[j] = i;
        j++;
    }    

注意,我们删除一个节点的时候,使用注释有另一层好处,即热更新,退出的节点自然魂归大地,但是只需要加入一个#号,然后调用ctdb reloadnodes 去通知其他正常节点重新加载node配置。这种方式下,各个node的PNN号并无变化。

做好准备,等待其他节点来连

定义好了谁是我们的伙伴之后,后面就可以做准备工作等待别人来连了。那么准备工作有哪些,什么时机做的呢?

一图胜千言,请看下面的图:

因为大部分情况下,我们transport 为tcp,因此,网络层的实作为tcp,因此initialize调用的是:

static const struct ctdb_methods ctdb_tcp_methods = {
        .initialise   = ctdb_tcp_initialise,
        .start        = ctdb_tcp_start,
        .queue_pkt    = ctdb_tcp_queue_pkt,
        .add_node     = ctdb_tcp_add_node,
        .connect_node = ctdb_tcp_connect_node,
        .allocate_pkt = ctdb_tcp_allocate_pkt,
        .shutdown     = ctdb_tcp_shutdown,
        .restart      = ctdb_tcp_restart,
};

调用的是ctdb_tcp_initialise:

static int ctdb_tcp_initialise(struct ctdb_context *ctdb)
{
    int i;

    /* listen on our own address */
    if (ctdb_tcp_listen(ctdb) != 0) {
        DEBUG(DEBUG_CRIT, (__location__ " Failed to start listening on the CTDB socket\n"));
        exit(1);
    }   

    for (i=0; i < ctdb->num_nodes; i++) {                                                                                                              
        if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
            continue;   
        }       
        if (ctdb_tcp_add_node(ctdb->nodes[i]) != 0) { 
            DEBUG(DEBUG_CRIT, ("methods->add_node failed at %d\n", i));
            return -1;  
        }       
    }   
        
    return 0;
}

注意上面的代码分成 两个部分

  • ctdb_tcp_listen 其实可以顾名思义了,就是说准备好网络,等待其他节点主动来连。
  • ctdb_tcp_add_node 此处只是一个准备工作,其实这一部分是为了连其他节点做准备的。

很有意思的是地方时第一步 是被动的,张好网,等别人来连我,第二步是主动的,我要主动连其他节点,此后面的详细分析中可以看出来。

ctdb_tcp_listen

int ctdb_tcp_listen(struct ctdb_context *ctdb)
{
        struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private_data,
                                                struct ctdb_tcp);
        ctdb_sock_addr sock;
        int sock_size;
        int one = 1;
        struct tevent_fd *fde;

        /* we can either auto-bind to the first available address, or we can
           use a specified address */
        if (!ctdb->address.address) {
                return ctdb_tcp_listen_automatic(ctdb);
        }
        ...
}

注意,ctdb刚开始启动的时候,虽然有了所有节点的信息 (通过ctdb->nodes数组),但是其实并没有设置自己的address,即ctdb->address.adress 是为空,这时候,走的是ctdb_tcp_listen_automatic。

static int ctdb_tcp_listen_automatic(struct ctdb_context *ctdb)
{
    struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private_data,
                        struct ctdb_tcp);       
        ctdb_sock_addr sock;
    int lock_fd, i;
    const char *lock_path = "/tmp/.ctdb_socket_lock";
    struct flock lock;
    int one = 1;
    int sock_size;
    struct tevent_fd *fde;

    /* in order to ensure that we don't get two nodes with the
       same adddress, we must make the bind() and listen() calls
       atomic. The SO_REUSEADDR setsockopt only prevents double
       binds if the first socket is in LISTEN state  */
    lock_fd = open(lock_path, O_RDWR|O_CREAT, 0666);
    if (lock_fd == -1) {
        DEBUG(DEBUG_CRIT,("Unable to open %s\n", lock_path));
        return -1;
    }

    lock.l_type = F_WRLCK;
    lock.l_whence = SEEK_SET;
    lock.l_start = 0;
    lock.l_len = 1;
    lock.l_pid = 0;  
    
    /*排它锁,防止竞争*/
    if (fcntl(lock_fd, F_SETLKW, &lock) != 0) {
        DEBUG(DEBUG_CRIT,("Unable to lock %s\n", lock_path));
        close(lock_fd);
        return -1;
        

上面这一段没啥好说的,加一个锁,防止竞争。

    /*此处是一个for循环,也就是说,ctdb->nodes中记录了所有的node IP,for循环会挨个尝试,
     * 它会尝试bind这些IP,对于如果这个IP不属于这个节点,那么bind就会失败,没关系尝试下一个
     * 知道bind成功之后,break*/
    for (i=0; i < ctdb->num_nodes; i++) {
        if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
            continue;
        }

        /* if node_ip is specified we will only try to bind to that
           ip.
        */
        if (ctdb->node_ip != NULL) {
            if (strcmp(ctdb->node_ip, ctdb->nodes[i]->address.address)) {
                continue;
            }
        }

        ZERO_STRUCT(sock);
        if (ctdb_tcp_get_address(ctdb,
                ctdb->nodes[i]->address.address, 
                &sock) != 0) {
            continue;
        }
    
        switch (sock.sa.sa_family) {
        case AF_INET:
            sock.ip.sin_port = htons(ctdb->nodes[i]->address.port);
            sock_size = sizeof(sock.ip);
            break;
        case AF_INET6:
            sock.ip6.sin6_port = htons(ctdb->nodes[i]->address.port);
            sock_size = sizeof(sock.ip6);
            break;
        default:
            DEBUG(DEBUG_ERR, (__location__ " unknown family %u\n",
                sock.sa.sa_family));        
            continue;
        }
#ifdef HAVE_SOCK_SIN_LEN
        sock.ip.sin_len = sock_size;
#endif

        ctcp->listen_fd = socket(sock.sa.sa_family, SOCK_STREAM, IPPROTO_TCP);
        if (ctcp->listen_fd == -1) {
            ctdb_set_error(ctdb, "socket failed\n");
            continue;
        }

        set_close_on_exec(ctcp->listen_fd);
        setsockopt(ctcp->listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&one,sizeof(one));

        if (bind(ctcp->listen_fd, (struct sockaddr * )&sock, sock_size) == 0) {
            break;
        }
        
        if (errno == EADDRNOTAVAIL) {
            DEBUG(DEBUG_DEBUG,(__location__ " Failed to bind() to socket. %s(%d)\n",
                    strerror(errno), errno));
        } else {
            DEBUG(DEBUG_ERR,(__location__ " Failed to bind() to socket. %s(%d)\n",
                    strerror(errno), errno));
         }
     }

注意,如果打开DEBUG开关 (ctdb setdebug DEBUG), 在启动过程中会开到如下log

2017/05/22 11:19:09.172119 [461730]: tcp/tcp_connect.c:362 Failed to bind() to socket. Cannot assign requested address(99)
2017/05/22 11:19:09.172155 [461730]: ctdb chose network address 10.11.12.2:4379 pnn 1

这是因为bind采用了神农尝百草的思路,ctdb->nodes中的IP挨个尝试bind,如果bind不成功,就试试下一个IP。当IP试对了,就break。

然后我们看这个函数的后续部分:

    if (i == ctdb->num_nodes) {
        DEBUG(DEBUG_CRIT,("Unable to bind to any of the node addresses - giving up\n"));
        goto failed;
    }
    
    /*注意,直到此时,ctdb->address.address 才不是NULL,
     *回想ctdb_tcp_listen函数你开头,一上来就判断这个值是不是NULL*/
    ctdb->address.address = talloc_strdup(ctdb, ctdb->nodes[i]->address.address);
    ctdb->address.port    = ctdb->nodes[i]->address.port;
    /*ctdb->name 一般是这样的 10.11.12.2:4379*/
    ctdb->name = talloc_asprintf(ctdb, "%s:%u",  
                     ctdb->address.address, 
                     ctdb->address.port);
    ctdb->pnn = ctdb->nodes[i]->pnn;
    ctdb->nodes[i]->flags &= ~NODE_FLAGS_DISCONNECTED;
    DEBUG(DEBUG_INFO,("ctdb chose network address %s:%u pnn %u\n", 
         ctdb->address.address, 
         ctdb->address.port, 
         ctdb->pnn));
    /* do we start out in DISABLED mode? */
    if (ctdb->start_as_disabled != 0) {
        DEBUG(DEBUG_INFO, ("This node is configured to start in DISABLED state\n"));
        ctdb->nodes[i]->flags |= NODE_FLAGS_DISABLED;
    }
    /* do we start out in STOPPED mode? */
    if (ctdb->start_as_stopped != 0) {
        DEBUG(DEBUG_INFO, ("This node is configured to start in STOPPED state\n"));
        ctdb->nodes[i]->flags |= NODE_FLAGS_STOPPED;
    }
    
    /*listen很重要,表示我开始开大门等待生意上门了。*/
    if (listen(ctcp->listen_fd, 10) == -1) {
        goto failed;
    }
    /*CTDB使用epoll,同时了事件源,无论你是定时任务timer,还是文件描述符,还是信号,
     *统统用epoll来管理,这里面有注册收到事件后的函数,
     *即非常重要的ctdb_listen_event函数*/
    fde = event_add_fd(ctdb->ev, ctcp, ctcp->listen_fd, EVENT_FD_READ,
               ctdb_listen_event, ctdb);
    tevent_fd_set_auto_close(fde);
    close(lock_fd);
    return 0;
    
failed:
    close(lock_fd);
    close(ctcp->listen_fd);
    ctcp->listen_fd = -1;
    return -1;                                                                                                                                         
}

此时,socket也建了,bind也成功了,listen也开始了,就是打开大门做生意了,但是生意不知道何时才上门,所以用了epoll,监控其他node是否发connect,如果其他node发了connect,就会调用ctdb_listen_event处理之。

下面看下ctdb_listen_event函数你实现:

static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde,
                  uint16_t flags, void *private_data)
{
    struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);                                                                    
    struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private_data, struct ctdb_tcp);
    ctdb_sock_addr addr;
    socklen_t len;
    int fd, nodeid;
    /*ctdb_incoming 这个名字非常有意思,很明确表明,这是其他节点主动连我们,是incoming
     *后面我们主动连其他节点是outqueue,名字也很有意思*/
    struct ctdb_incoming *in;
    int one = 1;
    const char *incoming_node;

    memset(&addr, 0, sizeof(addr));
    len = sizeof(addr);
    /*来者都是客,我们accept*/
    fd = accept(ctcp->listen_fd, (struct sockaddr *)&addr, &len);
    if (fd == -1) return;

    /*从addr中获取IP,然后从ctdb->nodes中找,是不是我们的伙伴,如果不是,拒绝之*/
    incoming_node = ctdb_addr_to_str(&addr);
    nodeid = ctdb_ip_to_nodeid(ctdb, incoming_node);
    if (nodeid == -1) {
        DEBUG(DEBUG_ERR, ("Refused connection from unknown node %s\n", incoming_node));
        close(fd);
        return;
    }

    in = talloc_zero(ctcp, struct ctdb_incoming);
    in->fd = fd;
    in->ctdb = ctdb;

    set_nonblocking(in->fd);
    set_close_on_exec(in->fd);

    DEBUG(DEBUG_DEBUG, (__location__ " Created SOCKET FD:%d to incoming ctdb connection - %s\n", fd, incoming_node));
    setsockopt(in->fd,SOL_SOCKET,SO_KEEPALIVE,(char *)&one,sizeof(one));
    in->queue = ctdb_queue_setup(ctdb, in, in->fd, CTDB_TCP_ALIGNMENT, 
                     ctdb_tcp_read_cb, in, "ctdbd-%s", incoming_node);
}

注意,连接进来的node,需要先辨识身份,获得其IP,然后根据ctdb->nodes数组,判断是不是我们的伙伴,如果不是,拒绝之。

相关的log如下:

2017/05/22 11:54:08.002867 [995627]: tcp/tcp_connect.c:261 Created SOCKET FD:24 to incoming ctdb connection - 10.11.12.3

(注意,源码中并未没有打印对端的IP,但是我修改了代码,添加了IP信息,读者不要困扰)

如果是的话,分配ctdb_incoming, 然后设置fd,设置ctdb。最重要的是queue。 连接建立之后,用于通信,因此会调用ctdb_tcp_read_cb,此处我们就不展开了。

注意,这一大片是做好准备,等其他节点来连, 下面介绍如何主动连接其他节点。相当于介绍完阴面,介绍阳面。

连接其他伙伴

对于CTDB,我们会发现,一下问题,如果某个节点执行service ctdb stop,其他节点几乎立刻能感知到:

2017/05/22 14:30:11.180802 [995627]: 10.11.12.2:4379: node 10.11.12.3:4379 is dead: 1 connected
2017/05/22 14:30:11.180869 [995627]: Tearing down connection to dead node :0

而死掉的节点,重启ctdb服务之后,本节点几乎立刻就能建立和该节点的连接。这是为何。

我们先来介绍介绍正常情况下,如何主动出击,连接其他节点,然后介绍两种异常情况下(CTDB正常退出和异常死亡),当对端节点回来,本地节点如何快速发现并建立连接。

正常主动出击,建立连接

下面先看流程图:

我们的起始点就是ctdb_tcp_start函数:

static int ctdb_tcp_start(struct ctdb_context *ctdb)
{
    int i;
    for (i=0; i < ctdb->num_nodes; i++) {
        if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
            continue;
        }   
        ctdb_tcp_connect_node(ctdb->nodes[i]);                                      
    }   

    return 0;
}

可以看出来,对于任何节点,只要还没有被删除,就要调用ctdb_tcp_connect_node,这里面就有一个问题了,自己会连自己么?

不会,这个是做在ctdb_tcp_connect_node里面。

static int ctdb_tcp_connect_node(struct ctdb_node *node)
{
    struct ctdb_context *ctdb = node->ctdb;
    struct ctdb_tcp_node *tnode = talloc_get_type(
        node->private_data, struct ctdb_tcp_node);

    /* startup connection to the other server - will happen on
       next event loop */
   /*注意,自己不会连自己,是要连CTDB cluster内的其他节点*/
    if (!ctdb_same_address(&ctdb->address, &node->address)) {
        tnode->connect_te = event_add_timed(ctdb->ev, tnode, 
                            timeval_zero(), 
                            ctdb_tcp_node_connect, node);
    }

    return 0;
}

设置了定时任务,在下一个loop中会即时触发,调用ctdb_tcp_node_connect。

这个ctdb_tcp_node_connect非常重要,无论是初次连接,还是其他节点的CTDB死而复活,都靠这个函数去建立连接。

void ctdb_tcp_node_connect(struct event_context *ev, struct timed_event *te,
               struct timeval t, void *private_data)
{                                                                   
    struct ctdb_node *node = talloc_get_type(private_data,
                         struct ctdb_node);
    struct ctdb_tcp_node *tnode = talloc_get_type(node->private_data, 
                              struct ctdb_tcp_node);
    struct ctdb_context *ctdb = node->ctdb;
    ctdb_sock_addr sock_in;
    int sockin_size;
    int sockout_size;
    ctdb_sock_addr sock_out;
    
    /*断掉连接*/
    ctdb_tcp_stop_connection(node);
    ZERO_STRUCT(sock_out);
#ifdef HAVE_SOCK_SIN_LEN
    sock_out.ip.sin_len = sizeof(sock_out);
#endif
    if (ctdb_tcp_get_address(ctdb, node->address.address, &sock_out) != 0) {
        return;
    }
    switch (sock_out.sa.sa_family) {
    case AF_INET:
        sock_out.ip.sin_port = htons(node->address.port);
        break;
    case AF_INET6:
        sock_out.ip6.sin6_port = htons(node->address.port);
        break;
    default:
        DEBUG(DEBUG_ERR, (__location__ " unknown family %u\n",
            sock_out.sa.sa_family));
        return;
    }
    /*创建socket*/
    tnode->fd = socket(sock_out.sa.sa_family, SOCK_STREAM, IPPROTO_TCP);
    set_nonblocking(tnode->fd);
    set_close_on_exec(tnode->fd);

    DEBUG(DEBUG_DEBUG, (__location__ " Created TCP SOCKET FD:%d\n", tnode->fd));

    /* Bind our side of the socketpair to the same address we use to listen
     * on incoming CTDB traffic.
     * We must specify this address to make sure that the address we expose to
     * the remote side is actually routable in case CTDB traffic will run on
     * a dedicated non-routeable network.
     */
    ZERO_STRUCT(sock_in);
    if (ctdb_tcp_get_address(ctdb, ctdb->address.address, &sock_in) != 0) {
        DEBUG(DEBUG_ERR, (__location__ " Failed to find our address. Failing bind.\n"));
        close(tnode->fd);
        return;
    }

    /* AIX libs check to see if the socket address and length
       arguments are consistent with each other on calls like                                                                                          
       connect().   Can not get by with just sizeof(sock_in),
       need sizeof(sock_in.ip).
    */
    switch (sock_in.sa.sa_family) {
    case AF_INET:
        sockin_size = sizeof(sock_in.ip);
        sockout_size = sizeof(sock_out.ip);
        break;
    case AF_INET6: 
        sockin_size = sizeof(sock_in.ip6);
        sockout_size = sizeof(sock_out.ip6);
        break;
    default:
        DEBUG(DEBUG_ERR, (__location__ " unknown family %u\n",
            sock_in.sa.sa_family));
        close(tnode->fd);
        return;
    }
#ifdef HAVE_SOCK_SIN_LEN
    sock_in.ip.sin_len = sockin_size;
    sock_out.ip.sin_len = sockout_size;
#endif
    /*bind 到自己的地址*/
    bind(tnode->fd, (struct sockaddr *)&sock_in, sockin_size);
    /*主动出击,连接对端*/
    if (connect(tnode->fd, (struct sockaddr *)&sock_out, sockout_size) != 0 &&
        errno != EINPROGRESS) {
        /*如果对端压根就没准备好,比如CTDB进程不在,或者还没有完成listen,就无法建立连接
         *此时合理的做法是,稍后再战,设置定时任务,1秒钟后重连*/
        ctdb_tcp_stop_connection(node);
        tnode->connect_te = event_add_timed(ctdb->ev, tnode,
                            timeval_current_ofs(1, 0),
                            ctdb_tcp_node_connect, node);
        return;
    }
    
    /* non-blocking connect - wait for write event */
    tnode->connect_fde = event_add_fd(node->ctdb->ev, tnode, tnode->fd,
                      EVENT_FD_WRITE|EVENT_FD_READ, 
                      ctdb_node_connect_write, node);
    /* don't give it long to connect - retry in one second. This ensures
       that we find a node is up quickly (tcp normally backs off a syn reply
       delay by quite a lot) */
    tnode->connect_te = event_add_timed(ctdb->ev, tnode, timeval_current_ofs(1, 0), ctdb_tcp_node_connect, node);
}

注意,仅仅本函数就有多次设置定时任务,稍后再展的场景,如果连接顺利建立的话:

static void ctdb_node_connect_write(struct event_context *ev, struct fd_event *fde,uint16_t flags, void *private_data)
{
    struct ctdb_node *node = talloc_get_type(private_data,
                         struct ctdb_node);
    struct ctdb_tcp_node *tnode = talloc_get_type(node->private_data,
                              struct ctdb_tcp_node);
    struct ctdb_context *ctdb = node->ctdb;
    int error = 0;
    socklen_t len = sizeof(error);
    int one = 1;

    talloc_free(tnode->connect_te);
    tnode->connect_te = NULL;

    if (getsockopt(tnode->fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0 ||
        error != 0) {
        ctdb_tcp_stop_connection(node);
        tnode->connect_te = event_add_timed(ctdb->ev, tnode, 
                            timeval_current_ofs(1, 0),
                            ctdb_tcp_node_connect, node);
        return;
    }

    talloc_free(tnode->connect_fde);
    tnode->connect_fde = NULL;    
    setsockopt(tnode->fd,IPPROTO_TCP,TCP_NODELAY,(char *)&one,sizeof(one));
    setsockopt(tnode->fd,SOL_SOCKET,SO_KEEPALIVE,(char *)&one,sizeof(one));
    
    ctdb_queue_set_fd(tnode->out_queue, tnode->fd);
    /* the queue subsystem now owns this fd */
    tnode->fd = -1;                                                                                                                                    
}

注意函数中的talloc_free,如果连接已经成功建立的话, 这些对应的定时任务就会砍掉,正常运行中,不会不断地建立连接,正相反,这些连接是稳定的。

当远端的CTDB服务停止

如果CTDB集群的另一个节点的CTDB 服务停掉了,那么本地CTDB要多久才能探测到远端节点已经死掉了。

答案是立刻马上。

2017/05/22 11:22:01.146844 [501881]: 10.11.12.2:4379: node 10.11.12.3:4379 is dead: 1 connected
2017/05/22 11:22:01.146894 [501881]: Tearing down connection to dead node :0

为什么立刻就可以?

在主动出击,连接其他节点的时候:

static int ctdb_tcp_add_node(struct ctdb_node *node)
{
    struct ctdb_tcp_node *tnode;
    tnode = talloc_zero(node, struct ctdb_tcp_node);
    CTDB_NO_MEMORY(node->ctdb, tnode);

    tnode->fd = -1; 
    node->private_data = tnode;
    talloc_set_destructor(tnode, tnode_destructor);

    tnode->out_queue = ctdb_queue_setup(node->ctdb, node, tnode->fd, CTDB_TCP_ALIGNMENT,
                        ctdb_tcp_tnode_cb, node, "to-node-%s", node->name);        
    return 0;
}

这里面有一个重要的回调函数:

/*
  called when a complete packet has come in - should not happen on this socket
  unless the other side closes the connection with RST or FIN
 */
void ctdb_tcp_tnode_cb(uint8_t *data, size_t cnt, void *private_data)
{
    struct ctdb_node *node = talloc_get_type(private_data, struct ctdb_node);
    struct ctdb_tcp_node *tnode = talloc_get_type(
        node->private_data, struct ctdb_tcp_node);

    if (data == NULL) {
        node->ctdb->upcalls->node_dead(node);
    }   
                                                                   
    ctdb_tcp_stop_connection(node);
    tnode->connect_te = event_add_timed(node->ctdb->ev, tnode,
                        timeval_current_ofs(3, 0), 
                        ctdb_tcp_node_connect, node);
}

void ctdb_node_dead(struct ctdb_node *node)
{
    if (node->flags & NODE_FLAGS_DISCONNECTED) {
       /*此处就是我们看到的DEBUG LOG*/
        DEBUG(DEBUG_INFO,("%s: node %s is already marked disconnected: %u connected\n", 
             node->ctdb->name, node->name, 
             node->ctdb->num_connected));
        return; 
    }   
    
    node->ctdb->num_connected--;
    node->flags |= NODE_FLAGS_DISCONNECTED | NODE_FLAGS_UNHEALTHY;
    node->rx_cnt = 0;
    node->dead_count = 0;

    DEBUG(DEBUG_NOTICE,("%s: node %s is dead: %u connected\n", 
         node->ctdb->name, node->name, node->ctdb->num_connected));
    ctdb_daemon_cancel_controls(node->ctdb, node);

    if (node->ctdb->methods == NULL) {
        DEBUG(DEBUG_ERR,(__location__ " Can not restart transport while shutting down daemon.\n"));
        return; 
    }   
                                                                                                                                                    
    node->ctdb->methods->restart(node);
}

注释说的比较清除了,当RST或者FIN到来的时候,会调用ctdb_tcp_tnode_cb函数,这时候会执行ctdb_tcp_stop_connection和注册定时时间三秒后会尝试调用ctdb_tcp_node_connect重连。

当远端CTDB 异常崩溃

注意,上面的例子是CTDB正常退出,远端的CTDB来不及说goodbye,比如说该节点异常断电,或者操作系统crash,这种情况下,本地节点多久才能发现远端节点退出了,

取决于keepalive 如下两个参数:

root@BEAN-3:~# ctdb listvars |grep -i keepalive
KeepaliveInterval       = 5
KeepaliveLimit          = 5

注意,每5秒会向其他node发送依次心跳信息,或者叫做keepalive信息,如果过去5秒内没有收到node X任何消息,那么dead_count ++,如果连续dead_count == KeepaliveLimit,即5次,仍然没有收到任何消息,那么判定死亡。

注意5*5 = 25秒内收到任何一个消息,不一定心跳信息,也可能是普通消息,那么dead_count = 0 ,从头计数。

void ctdb_start_keepalive(struct ctdb_context *ctdb)
{
        struct timed_event *te;

        ctdb->keepalive_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY_FATAL(ctdb, ctdb->keepalive_ctx);

        te = event_add_timed(ctdb->ev, ctdb->keepalive_ctx,
                             timeval_current_ofs(ctdb->tunable.keepalive_interval, 0), 
                             ctdb_check_for_dead_nodes, ctdb);                   
        CTDB_NO_MEMORY_FATAL(ctdb, te);

        DEBUG(DEBUG_NOTICE,("Keepalive monitoring has been started\n"));
}

下面看ctdb_check_for_dead_nodes函数:

static void ctdb_check_for_dead_nodes(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
{
    struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);    
    int i;

    /* send a keepalive to all other nodes, unless */
    for (i=0;i<ctdb->num_nodes;i++) {
        struct ctdb_node *node = ctdb->nodes[i];
        
        if (node->flags & NODE_FLAGS_DELETED) {
            continue;
        }
        
        if (node->pnn == ctdb->pnn) {
            continue;
        }
        
        if (node->flags & NODE_FLAGS_DISCONNECTED) {
            /* it might have come alive again */
            if (node->rx_cnt != 0) {
                ctdb_node_connected(node);
            }
            continue;
        }
        
        /*注意node->rx_cnt, 这个*/
        if (node->rx_cnt == 0) {
            node->dead_count++;
        } else {
            node->dead_count = 0;
        }

        node->rx_cnt = 0;
        /*dead_count大于等于5,表示在5*5 25秒内没有收到任何消息
         *这种情况下,判定该节点死亡*/
        if (node->dead_count >= ctdb->tunable.keepalive_limit) {
            DEBUG(DEBUG_NOTICE,("dead count reached for node %u\n", node->pnn));
            /*注意调用了ctdb_node_dead,这个在上一小节已经介绍过了*/
            ctdb_node_dead(node);
            ctdb_send_keepalive(ctdb, node->pnn);
            /* maybe tell the transport layer to kill the
               sockets as well?
            */
            continue;
        }
        
        DEBUG(DEBUG_DEBUG,("sending keepalive to %u\n", node->pnn));
        ctdb_send_keepalive(ctdb, node->pnn);

        node->tx_cnt = 0;
    }
    
    /*预约下一次发送心跳和检查对端存活*/
    event_add_timed(ctdb->ev, ctdb->keepalive_ctx,
            timeval_current_ofs(c
            tdb->tunable.keepalive_interval, 0), 
            ctdb_check_for_dead_nodes, ctdb);
}                                       

注意这个node->rx_cnt统计,收到任何消息都会++ :

static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
{
    struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;

    CTDB_INCREMENT_STAT(ctdb, node_packets_recv);

    /* up the counter for this source node, so we know its alive */
    if (ctdb_validate_pnn(ctdb, hdr->srcnode)) {
        /* as a special case, redirected calls don't increment the rx_cnt */
        if (hdr->operation != CTDB_REQ_CALL ||
            ((struct ctdb_req_call *)hdr)->hopcount == 0) {
            ctdb->nodes[hdr->srcnode]->rx_cnt++; 
        }
    }   
        
    ctdb_input_pkt(ctdb, hdr);
}

每一轮keepalive之后,这个值会清零。如果连续5轮次的检查,每一次rx_cnt都是0,这就表明,我们已经很久没有收到该对端节点发来的消息了,包括keepalive消息。

下面是模拟异常断电时候场景:

2017:05:15 16:37:38.367  FINISH ONE FRAME
2017:05:15 16:37:38.425  FINISH ONE FRAME

断电时间为16:37:38.367,我们看剩余两个节点检测到该节点死亡的时间:

node-1
---------------
2017/05/15 16:38:04.140307 [11898]: dead count reached for node 0
2017/05/15 16:38:04.140347 [11898]: 10.10.10.1:4379: node 10.10.10.3:4379 is dead: 1 connected
2017/05/15 16:38:04.140379 [11898]: Tearing down connection to dead node :0

node-2
------------
2017/05/15 16:38:05.180325 [ 9492]: dead count reached for node 0
2017/05/15 16:38:05.180364 [ 9492]: 10.10.10.2:4379: node 10.10.10.3:4379 is dead: 1 connected
2017/05/15 16:38:05.180397 [ 9492]: Tearing down connection to dead node :0

这种情况下,就会延时25后发现对端死掉,然后采取行动,所谓采取行动,就是不停地尝试重连。

如果检测到异常死亡,会调用ctdb_node_dead,这个函数中会调用tcp的restart方法,如下所示,就是注册定时器,反复尝试重连。

void ctdb_node_dead(struct ctdb_node *node)
{
    if (node->flags & NODE_FLAGS_DISCONNECTED) {
        DEBUG(DEBUG_INFO,("%s: node %s is already marked disconnected: %u connected\n", 
             node->ctdb->name, node->name, 
             node->ctdb->num_connected));
        return;
    }
    node->ctdb->num_connected--;
    node->flags |= NODE_FLAGS_DISCONNECTED | NODE_FLAGS_UNHEALTHY;
    node->rx_cnt = 0;
    node->dead_count = 0;

    DEBUG(DEBUG_NOTICE,("%s: node %s is dead: %u connected\n", 
         node->ctdb->name, node->name, node->ctdb->num_connected));
    ctdb_daemon_cancel_controls(node->ctdb, node);

    if (node->ctdb->methods == NULL) {
        DEBUG(DEBUG_ERR,(__location__ " Can not restart transport while shutting down daemon.\n"));
        return;
    }

    node->ctdb->methods->restart(node);
}

static void ctdb_tcp_restart(struct ctdb_node *node)
{
    struct ctdb_tcp_node *tnode = talloc_get_type(
        node->private_data, struct ctdb_tcp_node);

    DEBUG(DEBUG_NOTICE,("Tearing down connection to dead node :%d\n", node->pnn));

    ctdb_tcp_stop_connection(node);

    tnode->connect_te = event_add_timed(node->ctdb->ev, tnode, timeval_zero(), 
                        ctdb_tcp_node_connect, node);
}

我们从如下log中也可以看到,如果对端死亡,本地端会不断地发起重连:

2017/05/22 11:53:53.477582 [995627]: Added timed event "ctdb_tcp_node_connect": 0x116db50
2017/05/22 11:53:53.477604 [995627]: Destroying timer event 0x116db50 "ctdb_tcp_node_connect"
2017/05/22 11:53:53.477624 [995627]: Added timed event "ctdb_tcp_node_connect": 0x1161990
2017/05/22 11:53:56.478328 [995627]: Added timed event "ctdb_tcp_node_connect": 0x1150ad0
2017/05/22 11:53:56.478353 [995627]: Ending timer event 0x1161990 "ctdb_tcp_node_connect"
2017/05/22 11:53:56.478490 [995627]: Destroying timer event 0x1150ad0 "ctdb_tcp_node_connect"
2017/05/22 11:53:56.478534 [995627]: Added timed event "ctdb_tcp_node_connect": 0x1166a10
2017/05/22 11:53:57.479341 [995627]: Added timed event "ctdb_tcp_node_connect": 0x11567a0
2017/05/22 11:53:57.479365 [995627]: Ending timer event 0x1166a10 "ctdb_tcp_node_connect"
2017/05/22 11:53:57.480302 [995627]: Destroying timer event 0x11567a0 "ctdb_tcp_node_connect"
2017/05/22 11:53:57.480354 [995627]: Added timed event "ctdb_tcp_node_connect": 0x113eb70
2017/05/22 11:53:58.480948 [995627]: Added timed event "ctdb_tcp_node_connect": 0x115e960
2017/05/22 11:53:58.480963 [995627]: Ending timer event 0x113eb70 "ctdb_tcp_node_connect"
2017/05/22 11:53:58.481123 [995627]: Destroying timer event 0x115e960 "ctdb_tcp_node_connect"
2017/05/22 11:53:58.481160 [995627]: Added timed event "ctdb_tcp_node_connect": 0x115e960
2017/05/22 11:53:59.490461 [995627]: Added timed event "ctdb_tcp_node_connect": 0x113eda0
2017/05/22 11:53:59.490484 [995627]: Ending timer event 0x115e960 "ctdb_tcp_node_connect"
2017/05/22 11:53:59.498245 [995627]: Destroying timer event 0x113eda0 "ctdb_tcp_node_connect"
2017/05/22 11:53:59.527112 [995627]: Added timed event "ctdb_tcp_node_connect": 0x1145430
2017/05/22 11:54:00.528222 [995627]: Added timed event "ctdb_tcp_node_connect": 0x115ce40
2017/05/22 11:54:00.528236 [995627]: Ending timer event 0x1145430 "ctdb_tcp_node_connect"
2017/05/22 11:54:00.528404 [995627]: Destroying timer event 0x115ce40 "ctdb_tcp_node_connect"
2017/05/22 11:54:00.528436 [995627]: Added timed event "ctdb_tcp_node_connect": 0x115cd90
2017/05/22 11:54:01.529274 [995627]: Added timed event "ctdb_tcp_node_connect": 0x1156030
2017/05/22 11:54:01.529289 [995627]: Ending timer event 0x115cd90 "ctdb_tcp_node_connect"
2017/05/22 11:54:01.529484 [995627]: Destroying timer event 0x1156030 "ctdb_tcp_node_connect"
2017/05/22 11:54:01.529514 [995627]: Added timed event "ctdb_tcp_node_connect": 0x1156030
2017/05/22 11:54:02.530539 [995627]: Added timed event "ctdb_tcp_node_connect": 0x115aa70
2017/05/22 11:54:02.530554 [995627]: Ending timer event 0x1156030 "ctdb_tcp_node_connect"
2017/05/22 11:54:02.536729 [995627]: Destroying timer event 0x115aa70 "ctdb_tcp_node_connect"
2017/05/22 11:54:02.536784 [995627]: Added timed event "ctdb_tcp_node_connect": 0x1151190
2017/05/22 11:54:03.537043 [995627]: Added timed event "ctdb_tcp_node_connect": 0x1145430
2017/05/22 11:54:03.537058 [995627]: Ending timer event 0x1151190 "ctdb_tcp_node_connect"
2017/05/22 11:54:03.542502 [995627]: Destroying timer event 0x1145430 "ctdb_tcp_node_connect"
2017/05/22 11:54:03.542541 [995627]: Added timed event "ctdb_tcp_node_connect": 0x113eb70
2017/05/22 11:54:04.543121 [995627]: Added timed event "ctdb_tcp_node_connect": 0x1162860
2017/05/22 11:54:04.543137 [995627]: Ending timer event 0x113eb70 "ctdb_tcp_node_connect"
2017/05/22 11:54:04.544105 [995627]: Destroying timer event 0x1162860 "ctdb_tcp_node_connect"
2017/05/22 11:54:04.544142 [995627]: Added timed event "ctdb_tcp_node_connect": 0x115aa70
2017/05/22 11:54:05.544290 [995627]: Added timed event "ctdb_tcp_node_connect": 0x116dbb0
2017/05/22 11:54:05.544306 [995627]: Ending timer event 0x115aa70 "ctdb_tcp_node_connect"
2017/05/22 11:54:05.544581 [995627]: Destroying timer event 0x116dbb0 "ctdb_tcp_node_connect"
2017/05/22 11:54:05.544613 [995627]: Added timed event "ctdb_tcp_node_connect": 0x1156730
2017/05/22 11:54:06.544825 [995627]: Added timed event "ctdb_tcp_node_connect": 0x1145af0
2017/05/22 11:54:06.544840 [995627]: Ending timer event 0x1156730 "ctdb_tcp_node_connect"
2017/05/22 11:54:06.545130 [995627]: Destroying timer event 0x1145af0 "ctdb_tcp_node_connect"
2017/05/22 11:54:06.545163 [995627]: Added timed event "ctdb_tcp_node_connect": 0x1144cf0
2017/05/22 11:54:07.546060 [995627]: Added timed event "ctdb_tcp_node_connect": 0x114f070
2017/05/22 11:54:07.546086 [995627]: Ending timer event 0x1144cf0 "ctdb_tcp_node_connect"
2017/05/22 11:54:07.562138 [995627]: Destroying timer event 0x114f070 "ctdb_tcp_node_connect"
2017/05/22 11:54:07.562233 [995627]: Added timed event "ctdb_tcp_node_connect": 0x115b2d0
2017/05/22 11:54:08.562968 [995627]: Added timed event "ctdb_tcp_node_connect": 0x1156030
2017/05/22 11:54:08.562990 [995627]: Ending timer event 0x115b2d0 "ctdb_tcp_node_connect"


上一篇     下一篇