ctdbd進(jìn)程通信流程
ctdb_start_daemon
--> ux_socket_bind(ctdb) // 準(zhǔn)備好unix域通信的listen fd
--> ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0)
--> bind(ctdb->daemon.sd, (struct sockaddr )&addr, sizeof(addr))
--> listen(ctdb->daemon.sd, 100)
--> tdb_reopen_all(false)
--> ctdb_tcp_init(ctdb); // 初始化tcp通信的相關(guān)方法:ctdb_tcp_methods變量中
--> ctdb->methods->initialise(ctdb)即ctdb_tcp_initialise*
--> ctdb_tcp_listen(ctdb)
--> struct ctdb_tcp ctcp = talloc_get_type(ctdb->private_data,struct ctdb_tcp);
--> ctcp->listen_fd = socket(sock.sa.sa_family, SOCK_STREAM, IPPROTO_TCP);
--> bind(ctcp->listen_fd, (struct sockaddr * )&sock, sock_size) != 0);
--> listen(ctcp->listen_fd, 10)
--> fde = event_add_fd(ctdb->ev, ctcp, ctcp->listen_fd, EVENT_FD_READ,ctdb_listen_event, ctdb); // **等待其他ctdbd進(jìn)程tcp通信建鏈**
--> 當(dāng)ctcp->listen_fd變成可讀:ctdb_listen_event
--> fd = accept(ctcp->listen_fd, (struct sockaddr )&addr, &len);
--> struct ctdb_incoming in; in = talloc_zero(ctcp, struct ctdb_incoming); in->fd = fd; in->ctdb = ctdb;
--> in->queue = ctdb_queue_setup(ctdb, in, in->fd, CTDB_TCP_ALIGNMENT, ctdb_tcp_read_cb, in, "ctdbd-%s", incoming_node);
--> ctdb_queue_set_fd(queue, fd)
--> queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ, queue_io_handler, queue); // 等待對端發(fā)tcp消息過來
--> ctdb_tcp_add_node(ctdb->nodes[i]) // for循環(huán) 初始化主動建鏈需要的相關(guān)結(jié)構(gòu):ctdb->nodes[i]有子成員tnode保存在private_data字段员咽;還有一個子成員queue保存在tnode的out_queue
--> struct ctdb_tcp_node tnode; tnode = talloc_zero(node, struct ctdb_tcp_node);
--> node->private_data = tnode; // 主動建鏈的鏈路信息保存在ctdb->nodes[i].private_data中 // tnode的fd是connectfd和queue的fd相同
--> tnode->out_queue = ctdb_queue_setup(node->ctdb, node, tnode->fd, CTDB_TCP_ALIGNMENT, ctdb_tcp_tnode_cb, node, "to-node-%s", node->name); // 處理tcp鏈路關(guān)閉的包
--> queue = talloc_zero(mem_ctx, struct ctdb_queue); // mem_ctx是第二個參數(shù)即node
--> queue->ctdb = ctdb; queue->fd = fd; queue->private_data = private_data; queue->callback = callback; // private_data是node
--> ctdb_queue_set_fd(queue, fd) // fd是tnode->fd
--> queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ,queue_io_handler, queue); // (1)監(jiān)聽讀事件:被動tcp鏈接的數(shù)據(jù)包
--> ctdb_set_public_addresses(ctdb, true);
--> ctdb_attach_databases(ctdb)
--> fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, EVENT_FD_READ, ctdb_accept_client, ctdb); // **等待本節(jié)點recover和cmd工具進(jìn)程unix域通信建鏈**
--> 當(dāng)ctdb->daemon.sd變成可讀 ctdb_accept_client
--> client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, ctdb_daemon_read_cb, client, "client-%u", client->pid);
--> queue->fd = fd;
--> queue->callback = callback;
--> ctdb_queue_set_fd(queue, fd)
--> queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ, queue_io_handler, queue); // (2)監(jiān)聽讀事件:recover和cmd工具發(fā)來的unix域數(shù)據(jù)包
--> ctdb->methods->start(ctdb)即ctdb_tcp_start
--> ctdb_tcp_connect_node(ctdb->nodes[i]); // for循環(huán)**主動向其他節(jié)點的ctdbd進(jìn)程建鏈**
--> tnode->connect_te = event_add_timed(ctdb->ev, tnode, timeval_zero(), ctdb_tcp_node_connect, node);
--> ctdb_tcp_node_connect
--> tnode->fd = socket(sock_out.sa.sa_family, SOCK_STREAM, IPPROTO_TCP);
--> bind(tnode->fd, (struct sockaddr )&sock_in, sockin_size)
--> connect(tnode->fd, (struct sockaddr )&sock_out, sockout_size) // tnode->fd是實際讀寫的fd即connectfd
--> tnode->connect_fde = event_add_fd(node->ctdb->ev, tnode, tnode->fd, EVENT_FD_WRITE|EVENT_FD_READ, ctdb_node_connect_write, node); 等待發(fā)送消息包 (3)監(jiān)聽主動建鏈的寫事件
當(dāng)tnode->fd變成可寫即有tcp消息即將發(fā)走:ctdb_node_connect_write
--> ctdb_queue_set_fd(tnode->out_queue, tnode->fd);
--> queue->fd = fd; 等待回的tcp應(yīng)答消息
--> queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ,queue_io_handler**, queue); (4)監(jiān)聽主動建鏈的讀事件:收到的數(shù)據(jù)包
--> 當(dāng)收到對端tcp消息queue->fd變成可讀 queue_io_handler
--> queue_io_read(queue)
--> nread = sys_read(queue->fd,queue->buffer.data + queue->buffer.length,num_ready);
--> queue_process(queue);
--> queue->callback(data, pkt_size, queue->private_data);即 ctdb_tcp_read_cb:見前面ctdb_queue_setup
--> in->ctdb->upcalls->recv_pkt(in->ctdb, data, cnt);即ctdb_recv_pkt:main函數(shù)中設(shè)置的
--> **ctdb_input_pkt**(ctdb, hdr);
// **ctdbd進(jìn)程收到的tcp消息和recover的消息最終都是它來處理Q刻凇!H!!励堡!**
--> tnode->connect_te = event_add_timed(ctdb->ev, tnode, timeval_current_ofs(1, 0), ctdb_tcp_node_connect, node); 保證對端節(jié)點down再up可以快速重連!1ぬ汀应结!
--> event_loop_wait(ctdb->ev);
阻塞在這里