Over the past while I have been analyzing brpc's network implementation on and off, as promised earlier. The framework we use at work handles all receive/send logic in a single network thread, which only forwards the data onward and does nothing else. After a stall in production I began to suspect this spot: when network traffic piles up, or one socket has a lot of data to write, can the other sockets be starved? That would explain clients not getting their responses or broadcast messages, e.g. several monsters standing frozen for a few seconds. Client requests are small, so the server will not be stuck reading forever, but continuously writing one socket can delay the read/write events of every other socket handled by the same thread, and that is what prompted this closer look.
The open-source frameworks I have analyzed target different workloads, so their implementations and the problems they hit differ. redis is single-threaded, with networking and business logic in the same thread, and still performs very well. skynet, analyzed earlier, is multi-threaded underneath but has a single network thread that owns the read/write events of every socket connection. phxrpc is multi-threaded: an accept thread polls the load of the worker units and hands each new connection to the lightest one; a unit is several worker threads plus one network thread running an event loop. The load is spread evenly, but each unit still has only one network thread, so even when some units are idle, sockets queued in a busy unit can be held up by whatever socket that unit is currently reading or writing.
Then comes brpc, which takes a different approach. Quoting the original documentation:
"brpc uses one or more EventDispatchers (EDISP for short) to wait for events on any fd. Unlike the usual 'IO thread', EDISP is not responsible for reading. The problem with IO threads is that one thread can only read one fd at a time; when several busy fds gather in one IO thread, some reads are delayed. Multi-tenancy, complex traffic-splitting algorithms, Streaming RPC and similar features aggravate the problem. Under high load, an occasional slow read drags down the reads of every fd in that IO thread, which hurts availability considerably."
What follows is an overall walk through the socket-related implementation. Socket is fairly involved, so some details are skipped and only the key parts are analyzed in simplified form.
When the server starts it calls StartInternal, which creates the listen socket used to accept connections:
967 if (_am == NULL) {
968 _am = BuildAcceptor();
969 //check...
973 }
974 //more code...
981 // Pass ownership of `sockfd' to `_am'
982 if (_am->StartAccept(sockfd, _options.idle_timeout_sec,
983 _default_ssl_ctx) != 0) {
985 return -1;
986 }
As analyzed in the earlier post [brpc之消息處理流程] (brpc's message processing flow), BuildAcceptor initializes the handlers of each protocol.
35 // Accept connections from a specific port and then
36 // process messages from which it reads
37 class Acceptor : public InputMessenger {
38 public:
39 typedef butil::FlatMap<SocketId, ConnectStatistics> SocketMap;
40
41 enum Status {
42 UNINITIALIZED = 0,
43 READY = 1,
44 RUNNING = 2,
45 STOPPING = 3,
46 };
47
48 public:
83 private:
84 // Accept connections.
85 static void OnNewConnectionsUntilEAGAIN(Socket* m);
86 static void OnNewConnections(Socket* m);
87
96 bthread_keytable_pool_t* _keytable_pool; // owned by Server
99 bthread_t _close_idle_tid;
107
108 // The map containing all the accepted sockets
109 SocketMap _socket_map;
112 };
Acceptor inherits from InputMessenger and manages all accepted connections. Next, StartAccept:
50 int Acceptor::StartAccept(int listened_fd, int idle_timeout_sec,
51 const std::shared_ptr<SocketSSLContext>& ssl_ctx) {
52 //more code...
69 if (idle_timeout_sec > 0) {
70 if (bthread_start_background(&_close_idle_tid, NULL,
71 CloseIdleConnections, this) != 0) {
73 return -1;
74 }
75 }
79 // Creation of _acception_id is inside lock so that OnNewConnections
80 // (which may run immediately) should see sane fields set below.
81 SocketOptions options;
82 options.fd = listened_fd;
83 options.user = this;
84 options.on_edge_triggered_events = OnNewConnections;
85 if (Socket::Create(options, &_acception_id) != 0) {
86 // Close-idle-socket thread will be stopped inside destructor
88 return -1;
89 }
90
91 _listened_fd = listened_fd;
92 _status = RUNNING;
93 return 0;
94 }
This starts a background bthread running CloseIdleConnections, which wakes up periodically and closes any connection that has not sent or received data for a while; the logic is simple. The member void (*on_edge_triggered_events)(Socket*) is the callback invoked when an event arrives on the fd; for the listen socket it is OnNewConnections, analyzed further below. Next, creating the Socket object:
585 // SocketId = 32-bit version + 32-bit slot.
586 // version: from version part of _versioned_nref, must be an EVEN number.
587 // slot: designated by ResourcePool.
588 int Socket::Create(const SocketOptions& options, SocketId* id) {
589 butil::ResourceId<Socket> slot;
590 Socket* const m = butil::get_resource(&slot, Forbidden());
591 if (m == NULL) {
593 return -1;
594 }
601 m->_on_edge_triggered_events = options.on_edge_triggered_events;
609 m->_this_id = MakeSocketId(
610 VersionOfVRef(m->_versioned_ref.fetch_add(
611 1, butil::memory_order_release)), slot);
612 m->_preferred_index = -1;
652 CHECK(NULL == m->_write_head.load(butil::memory_order_relaxed));
653 // Must be last one! Internal fields of this Socket may be access
654 // just after calling ResetFileDescriptor.
655 if (m->ResetFileDescriptor(options.fd) != 0) {
656 //error...
661 }
662 *id = m->_this_id;
663 return 0;
664 }
The lines above show only the operations on the important data members; the rest matters too but is not the focus here. In socket.h the Socket class declaration plus comments runs to more than 640 lines, so it is not pasted here; the relevant pieces are shown as they come up. Next, ResetFileDescriptor:
522 int Socket::ResetFileDescriptor(int fd) {
523 // Reset message sizes when fd is changed.
524 _last_msg_size = 0;
525 _avg_msg_size = 0;
526 // MUST store `_fd' before adding itself into epoll device to avoid
527 // race conditions with the callback function inside epoll
528 _fd.store(fd, butil::memory_order_release);
543 // Make the fd non-blocking.
544 if (butil::make_non_blocking(fd) != 0) {
546 return -1;
547 }
550 butil::make_no_delay(fd);
574 if (_on_edge_triggered_events) {
575 if (GetGlobalEventDispatcher(fd).AddConsumer(id(), fd) != 0) {
576 //error...
580 }
581 }
582 return 0;
583 }
This makes the fd non-blocking, sets TCP_NODELAY, sets the fd's SO_SNDBUF and SO_RCVBUF sizes, and finally adds the fd to epoll to wait for events. EventDispatcher is the wrapper around event dispatching; its implementation is analyzed next.
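For reference, make_non_blocking and make_no_delay boil down to a pair of standard fcntl/setsockopt calls; a minimal sketch of the idea (not butil's exact code, and without its error logging):
#include <fcntl.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>

int make_non_blocking(int fd) {
    const int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);   // reads/writes now return EAGAIN instead of blocking
}

int make_no_delay(int fd) {
    int on = 1;                                      // disable Nagle so small writes are flushed immediately
    return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on));
}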
The number of dispatchers is configurable, so there can be multiple EventDispatcher instances:
352 static void StopAndJoinGlobalDispatchers() {
353 for (int i = 0; i < FLAGS_event_dispatcher_num; ++i) {
354 g_edisp[i].Stop();
355 g_edisp[i].Join();
356 }
357 }
358 void InitializeGlobalDispatchers() {
359 g_edisp = new EventDispatcher[FLAGS_event_dispatcher_num];
360 for (int i = 0; i < FLAGS_event_dispatcher_num; ++i) {
361 const bthread_attr_t attr = FLAGS_usercode_in_pthread ?
362 BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL;
363 CHECK_EQ(0, g_edisp[i].Start(&attr));
364 }
365 // This atexit is will be run before g_task_control.stop() because above
366 // Start() initializes g_task_control by creating bthread (to run epoll/kqueue).
367 CHECK_EQ(0, atexit(StopAndJoinGlobalDispatchers));
368 }
369
370 EventDispatcher& GetGlobalEventDispatcher(int fd) {
371 pthread_once(&g_edisp_once, InitializeGlobalDispatchers);
372 if (FLAGS_event_dispatcher_num == 1) {
373 return g_edisp[0];
374 }
375 int index = butil::fmix32(fd) % FLAGS_event_dispatcher_num;
376 return g_edisp[index];
377 }
The above covers initialization and how GetGlobalEventDispatcher picks an instance for a given fd (by hashing the fd, not randomly). EventDispatcher watches for events in edge-triggered (ET) mode:
89 int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {
115 int rc = bthread_start_background(
116 &_tid, &_consumer_thread_attr, RunThis, this);
117 if (rc) {
119 return -1;
120 }
121 return 0;
122 }
273 void* EventDispatcher::RunThis(void* arg) {
274 ((EventDispatcher*)arg)->Run();
275 return NULL;
276 }
278 void EventDispatcher::Run() {
279 while (!_stop) {
282 epoll_event e[32];
290 const int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1);
296 if (_stop) {
297 // epoll_ctl/epoll_wait should have some sort of memory fencing
298 // guaranteeing that we(after epoll_wait) see _stop set before
299 // epoll_ctl.
300 break;
301 }
302 if (n < 0) {
303 if (EINTR == errno) {
304 // We've checked _stop, no wake-up will be missed.
305 continue;
306 }
312 break;
313 }
314 for (int i = 0; i < n; ++i) {
316 if (e[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP)) {
321 // We don't care about the return value.
322 Socket::StartInputEvent(e[i].data.u64, e[i].events,
323 _consumer_thread_attr);
324 }
332 }
333 for (int i = 0; i < n; ++i) {
335 if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) {
336 // We don't care about the return value.
337 Socket::HandleEpollOut(e[i].data.u64);
338 }
345 }
346 }
347 }
Run handles input and output events separately, dispatching EPOLLIN/EPOLLERR/EPOLLHUP to Socket::StartInputEvent and EPOLLOUT/EPOLLERR/EPOLLHUP to Socket::HandleEpollOut. Adding an fd to the dispatcher:
225 int EventDispatcher::AddConsumer(SocketId socket_id, int fd) {
226 if (_epfd < 0) {
227 errno = EINVAL;
228 return -1;
229 }
231 epoll_event evt;
232 evt.events = EPOLLIN | EPOLLET;
233 evt.data.u64 = socket_id;
237 return epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt);
245 }
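Note the EPOLLET flag: with edge triggering the consumer must drain the fd until EAGAIN on every wakeup, otherwise data left in the kernel buffer never produces another event. A minimal sketch of that drain pattern (handle_data is a hypothetical consumer, not a brpc function):
#include <cerrno>
#include <unistd.h>

static void handle_data(const char* /*buf*/, ssize_t /*n*/) {}  // hypothetical consumer, no-op here

void drain_fd(int fd) {
    char buf[4096];
    while (true) {
        const ssize_t n = read(fd, buf, sizeof(buf));
        if (n > 0) { handle_data(buf, n); continue; }
        if (n == 0) break;                                      // peer closed the connection
        if (errno == EAGAIN || errno == EWOULDBLOCK) break;     // fully drained, wait for the next edge
        if (errno == EINTR) continue;                           // interrupted, retry
        break;                                                  // real error
    }
}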
For the listen socket, a new connection shows up as a readable event, so StartInputEvent gets called:
1924 int Socket::StartInputEvent(SocketId id, uint32_t events,
1925 const bthread_attr_t& thread_attr) {
1926 SocketUniquePtr s;
1927 if (Address(id, &s) < 0) {
1928 return -1;
1929 }
1930 if (NULL == s->_on_edge_triggered_events) {
1931 // Callback can be NULL when receiving error epoll events
1932 // (Added into epoll by `WaitConnected')
1933 return 0;
1934 }
1935 if (s->fd() < 0) {
1941 return -1;
1942 }
1947 // Passing e[i].events causes complex visibility issues and
1948 // requires stronger memory fences, since reading the fd returns
1949 // error as well, we don't pass the events.
1950 if (s->_nevent.fetch_add(1, butil::memory_order_acq_rel) == 0) {
1956 bthread_t tid;
1957 // transfer ownership as well, don't use s anymore!
1958 Socket* const p = s.release();
1959
1960 bthread_attr_t attr = thread_attr;
1961 attr.keytable_pool = p->_keytable_pool;
1962 if (bthread_start_urgent(&tid, &attr, ProcessEvent, p) != 0) {
1964 ProcessEvent(p);
1965 }
1966 }
1967 return 0;
1968 }
The logic above looks up the Socket object from socket_id; if it is gone the socket has been destroyed, which is allowed. It then returns if no event callback is set. The atomic _nevent controls whether some bthread is already processing read events on this socket. The benefit, quoting the documentation:
"EDISP uses edge-triggered mode. When an event arrives, EDISP adds 1 to an atomic variable and starts a bthread to process the data on that fd only if the value before the increment was 0. Behind the scenes, EDISP yields its pthread to the newly created bthread so the latter gets better cache locality and can read the fd's data as soon as possible, while the bthread EDISP was running in is stolen by another pthread and keeps running; this is bthread's work-stealing scheduling."
As analyzed before, bthread_start_urgent starts a bthread immediately, which runs ProcessEvent:
1017 void* Socket::ProcessEvent(void* arg) {
1018 // the enclosed Socket is valid and free to access inside this function.
1019 SocketUniquePtr s(static_cast<Socket*>(arg));
1020 s->_on_edge_triggered_events(s.get());
1021 return NULL;
1022 }
which simply invokes the registered callback, here OnNewConnections:
317 void Acceptor::OnNewConnections(Socket* acception) {
318 int progress = Socket::PROGRESS_INIT;
319 do {
320 OnNewConnectionsUntilEAGAIN(acception);
321 if (acception->Failed()) {
322 return;
323 }
324 } while (acception->MoreReadEvents(&progress));
325 }
243 void Acceptor::OnNewConnectionsUntilEAGAIN(Socket* acception) {
244 while (1) {
245 struct sockaddr in_addr;
246 socklen_t in_len = sizeof(in_addr);
247 butil::fd_guard in_fd(accept(acception->fd(), &in_addr, &in_len));
248 if (in_fd < 0) {
249 // no EINTR because listened fd is non-blocking.
250 if (errno == EAGAIN) {
251 return;
252 }
253 //error...
260 continue;
261 }
270 SocketId socket_id;
271 SocketOptions options;
272 options.keytable_pool = am->_keytable_pool;
273 options.fd = in_fd;
274 options.remote_side = butil::EndPoint(*(sockaddr_in*)&in_addr);
275 options.user = acception->user();
276 options.on_edge_triggered_events = InputMessenger::OnNewMessages;
277 options.initial_ssl_ctx = am->_ssl_ctx;
278 if (Socket::Create(options, &socket_id) != 0) {
280 continue;
281 }
282 in_fd.release(); // transfer ownership to socket_id
284 // There's a funny race condition here. After Socket::Create, messages
285 // from the socket are already handled and a RPC is possibly done
286 // before the socket is added into _socket_map below. This is found in
287 // ChannelTest.skip_parallel in test/brpc_channel_unittest.cpp (running
288 // on machines with few cores) where the _messenger.ConnectionCount()
289 // may surprisingly be 0 even if the RPC is already done.
290
291 SocketUniquePtr sock;
292 if (Socket::AddressFailedAsWell(socket_id, &sock) >= 0) {
293 bool is_running = true;
294 {
295 BAIDU_SCOPED_LOCK(am->_map_mutex);
296 is_running = (am->status() == RUNNING);
297 // Always add this socket into `_socket_map' whether it
298 // has been `SetFailed' or not, whether `Acceptor' is
299 // running or not. Otherwise, `Acceptor::BeforeRecycle'
300 // may be called (inside Socket::OnRecycle) after `Acceptor'
301 // has been destroyed
302 am->_socket_map.insert(socket_id, ConnectStatistics());
303 }
304 if (!is_running) {
305 //error...
310 return;
311 }
312 } // else: The socket has already been destroyed, Don't add its id
313 // into _socket_map
314 }
315 }
Here a new connection is accepted and associated with the OnNewMessages callback. The comment and the AddressFailedAsWell-related code are kept because they are worth thinking about: as the comment explains, once Socket::Create has registered the fd in epoll, a readable event may fire immediately, execution may switch to a bthread on another worker, and the whole RPC may finish and release the socket before it is even inserted into _socket_map below, so the object behind socket_id may already be invalid here. AddressFailedAsWell is fairly involved and is analyzed later. The OnNewMessages callback was already covered in the earlier post [brpc之消息處理流程] and is not repeated.
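What makes it safe to keep using a plain socket_id after the object may have died is the version+slot encoding mentioned in the comment above Socket::Create: the id carries a version, and Address() only succeeds when that version still matches the object sitting in the slot. A rough sketch of the idea (simplified; the real bit layout, reference counting and failure states in Socket are more involved):
#include <cstdint>

typedef uint64_t SocketId;

inline SocketId MakeId(uint32_t version, uint32_t slot) {
    return (static_cast<uint64_t>(version) << 32) | slot;   // 32-bit version + 32-bit slot
}
inline uint32_t VersionOfId(SocketId id) { return static_cast<uint32_t>(id >> 32); }
inline uint32_t SlotOfId(SocketId id)    { return static_cast<uint32_t>(id); }

// Recycling a slot bumps the version stored in the object, so Address() on a
// stale id sees a mismatched version and fails instead of resurrecting a
// Socket that now belongs to a different connection.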
Since the accept logic is a while(1) loop, EAGAIN means there are no more connections to accept for now, so the function returns. Next:
227 inline bool Socket::MoreReadEvents(int* progress) {
228 // Fail to CAS means that new events arrived.
229 return !_nevent.compare_exchange_strong(
230 *progress, 0, butil::memory_order_release,
231 butil::memory_order_acquire);
232 }
This extra check is needed because another StartInputEvent may have fired while OnNewConnections was running: if _nevent still equals *progress (initially 1), it is CASed back to 0 and we are done; otherwise the new value is stored into *progress and the accept loop runs again. For example, if _nevent was 1 on entry but a fresh event bumped it to 2, the CAS fails, *progress becomes 2, and the do/while in OnNewConnections iterates once more.
When a writable (or error) event occurs on a socket:
1258 int Socket::HandleEpollOut(SocketId id) {
1259 SocketUniquePtr s;
1260 // Since Sockets might have been `SetFailed' before they were
1261 // added into epoll, these sockets miss the signal inside
1262 // `SetFailed' and therefore must be signalled here using
1263 // `AddressFailedAsWell' to prevent waiting forever
1264 if (Socket::AddressFailedAsWell(id, &s) < 0) {
1265 // Ignore recycled sockets
1266 return -1;
1267 }
1268
1269 EpollOutRequest* req = dynamic_cast<EpollOutRequest*>(s->user());
1270 if (req != NULL) {
1271 return s->HandleEpollOutRequest(0, req);
1272 }
1273
1274 // Currently `WaitEpollOut' needs `_epollout_butex'
1275 // TODO(jiangrujie): Remove this in the future
1276 s->_epollout_butex->fetch_add(1, butil::memory_order_relaxed);
1277 bthread::butex_wake_except(s->_epollout_butex, 0);
1278 return 0;
1279 }
1295 int Socket::HandleEpollOutRequest(int error_code, EpollOutRequest* req) {
1296 // Only one thread can `SetFailed' this `Socket' successfully
1297 // Also after this `req' will be destroyed when its reference
1298 // hits zero
1299 if (SetFailed() != 0) {
1300 return -1;
1301 }
1302 // We've got the right to call user callback
1303 // The timer will be removed inside destructor of EpollOutRequest
1304 GetGlobalEventDispatcher(req->fd).RemoveEpollOut(id(), req->fd, false);
1305 return req->on_epollout_event(req->fd, error_code, req->data);
1306 }
These few lines do not need much analysis on their own and are tied together later; let us look in detail at what happens when data has to be written.
Note that accepted connections are registered with EPOLLIN | EPOLLET only; EPOLLOUT is not set at accept time and is only added later through AddEpollOut.
For example, when the server writes back an RPC response, the baidu protocol handler does:
253 Socket::WriteOptions wopt;
254 wopt.ignore_eovercrowded = true;
255 if (sock->Write(&res_buf, &wopt) != 0) {
256 const int errcode = errno;
258 cntl->SetFailed(errcode, "Fail to write into %s",
259 sock->description().c_str());
260 return;
261 }
1432 int Socket::Write(butil::IOBuf* data, const WriteOptions* options_in) {
1433 //more code...
1456 WriteRequest* req = butil::get_object<WriteRequest>();
1457 if (!req) {
1458 return SetError(opt.id_wait, ENOMEM);
1459 }
1460
1461 req->data.swap(*data);
1462 // Set `req->next' to UNCONNECTED so that the KeepWrite thread will
1463 // wait until it points to a valid WriteRequest or NULL.
1464 req->next = WriteRequest::UNCONNECTED;
1465 req->id_wait = opt.id_wait;
1466 req->set_pipelined_count_and_user_message(
1467 opt.pipelined_count, DUMMY_USER_MESSAGE, opt.with_auth);
1468 return StartWrite(req, opt);
1469 }
303 struct BAIDU_CACHELINE_ALIGNMENT Socket::WriteRequest {
304 static WriteRequest* const UNCONNECTED;
305
306 butil::IOBuf data;
307 WriteRequest* next;
308 bthread_id_t id_wait;
309 Socket* socket;
54 };
So at the start of a write a WriteRequest is allocated, the outgoing data is swapped into it, and set_pipelined_count_and_user_message is called (its exact purpose is analyzed later). Then StartWrite:
1506 int Socket::StartWrite(WriteRequest* req, const WriteOptions& opt) {
1507 // Release fence makes sure the thread getting request sees *req
1508 WriteRequest* const prev_head =
1509 _write_head.exchange(req, butil::memory_order_release);
1510 if (prev_head != NULL) {
1511 // Someone is writing to the fd. The KeepWrite thread may spin
1512 // until req->next to be non-UNCONNECTED. This process is not
1513 // lock-free, but the duration is so short(1~2 instructions,
1514 // depending on compiler) that the spin rarely occurs in practice
1515 // (I've not seen any spin in highly contended tests).
1516 req->next = prev_head;
1517 return 0;
1518 }
This exchanges _write_head to point at req. If prev_head is not NULL, some other bthread is already writing this fd, so prev_head is hung onto req->next (building the list in reverse order) and the function returns; two threads must never write the same socket at the same time, otherwise the data would interleave:
1520 int saved_errno = 0;
1521 bthread_t th;
1522 SocketUniquePtr ptr_for_keep_write;
1523 ssize_t nw = 0;
1524
1525 // We've got the right to write.
1526 req->next = NULL;
1527
1528 // Connect to remote_side() if not.
1529 int ret = ConnectIfNot(opt.abstime, req);
1530 //ret >=0
1540 // NOTE: Setup() MUST be called after Connect which may call app_connect,
1541 // which is assumed to run before any SocketMessage.AppendAndDestroySelf()
1542 // in some protocols(namely RTMP).
1543 req->Setup(this);
1544
1545 if (ssl_state() != SSL_OFF) {
1546 // Writing into SSL may block the current bthread, always write
1547 // in the background.
1548 goto KEEPWRITE_IN_BACKGROUND;
1549 }
1551 // Write once in the calling thread. If the write is not complete,
1552 // continue it in KeepWrite thread.
1553 if (_conn) {
1554 //
1556 } else {
1557 nw = req->data.cut_into_file_descriptor(fd());
1558 }
1559 if (nw < 0) {
1560 //error...
1569 } else {
1570 AddOutputBytes(nw);
1571 }
1572 if (IsWriteComplete(req, true, NULL)) {
1573 ReturnSuccessfulWriteRequest(req);
1574 return 0;
1575 }
1577 KEEPWRITE_IN_BACKGROUND:
1578 ReAddress(&ptr_for_keep_write);
1579 req->socket = ptr_for_keep_write.release();
1580 if (bthread_start_background(&th, &BTHREAD_ATTR_NORMAL,
1581 KeepWrite, req) != 0) {
1583 KeepWrite(req);
1584 }
1585 return 0;
1586
1587 FAIL_TO_WRITE:
1588 // `SetFailed' before `ReturnFailedWriteRequest' (which will calls
1589 // `on_reset' callback inside the id object) so that we immediately
1590 // know this socket has failed inside the `on_reset' callback
1591 ReleaseAllFailedWriteRequests(req);
1592 errno = saved_errno;
1593 return -1;
1594 }
Having won the right to write, ConnectIfNot checks whether the connection exists; assume it does for now (the other case is analyzed later) and ignore the SSL branch. The data is then written with req->data.cut_into_file_descriptor(fd()) and the bytes actually written are accounted via AddOutputBytes. Next we check whether the write is complete: since we hold the write right, even when our own req is fully written there may be requests from other bthreads that could not write themselves and instead hung their req on the head (the req->next = prev_head path above). If everything is written:
471 void Socket::ReturnSuccessfulWriteRequest(Socket::WriteRequest* p) {
472 DCHECK(p->data.empty());
473 AddOutputMessages(1);
474 const bthread_id_t id_wait = p->id_wait;
475 butil::return_object(p);
476 if (id_wait != INVALID_BTHREAD_ID) {
477 NotifyOnFailed(id_wait);
478 }
479 }
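The core of StartWrite is a wait-free multi-producer hand-off: every writer atomically exchanges its request into _write_head, and only the one that saw NULL becomes the actual writer; everyone else has already queued its request by the time exchange returns. A stripped-down sketch of the pattern (generic types, not brpc's):
#include <atomic>

struct Req { Req* next; /* payload omitted */ };

static std::atomic<Req*> write_head{nullptr};

// Returns true if the caller became the writer and must flush the queue.
bool push_or_claim(Req* req) {
    Req* prev = write_head.exchange(req, std::memory_order_release);
    if (prev != nullptr) {       // someone is already writing: just link in and leave
        req->next = prev;        // the list grows newest-first and is reversed later
        return false;
    }
    req->next = nullptr;         // we own the fd now: write here or hand off to KeepWrite
    return true;
}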
If the write is not complete, let us look at how IsWriteComplete works:
1024 // Check if there're new requests appended.
1025 // If yes, point old_head to to reversed new requests and return false;
1026 // If no:
1027 // old_head is fully written, set _write_head to NULL and return true;
1028 // old_head is not written yet, keep _write_head unchanged and return false;
1029 // `old_head' is last new_head got from this function or (in another word)
1030 // tail of current writing list.
1031 // `singular_node' is true iff `old_head' is the only node in its list.
1032 bool Socket::IsWriteComplete(Socket::WriteRequest* old_head,
1033 bool singular_node,
1034 Socket::WriteRequest** new_tail) {
1035 CHECK(NULL == old_head->next);
1036 // Try to set _write_head to NULL to mark that the write is done.
1037 WriteRequest* new_head = old_head;
1038 WriteRequest* desired = NULL;
1039 bool return_when_no_more = true;
1040 if (!old_head->data.empty() || !singular_node) {
1041 desired = old_head;
1042 // Write is obviously not complete if old_head is not fully written.
1043 return_when_no_more = false;
1044 }
1045 if (_write_head.compare_exchange_strong(
1046 new_head, desired, butil::memory_order_acquire)) {
1047 // No one added new requests.
1048 if (new_tail) {
1049 *new_tail = old_head;
1050 }
1051 return return_when_no_more;
1052 }
If old_head->data.empty() is false the write is certainly not complete. We then check whether other write requests have arrived in the meantime; if not, return false and keep writing in the background, otherwise the newly pushed requests may need to be reversed:
1057 // Someone added new requests.
1058 // Reverse the list until old_head.
1059 WriteRequest* tail = NULL;
1060 WriteRequest* p = new_head;
1061 do {
1062 while (p->next == WriteRequest::UNCONNECTED) {
1063 // TODO(gejun): elaborate this
1064 sched_yield();
1065 }
1066 WriteRequest* const saved_next = p->next;
1067 p->next = tail;
1068 tail = p;
1069 p = saved_next;
1070 CHECK(p != NULL);
1071 } while (p != old_head);
1072
1073 // Link old list with new list.
1074 old_head->next = tail;
1075 // Call Setup() from oldest to newest, notice that the calling sequence
1076 // matters for protocols using pipelined_count, this is why we don't
1077 // calling Setup in above loop which is from newest to oldest.
1078 for (WriteRequest* q = tail; q; q = q->next) {
1079 q->Setup(this);
1080 }
1081 if (new_tail) {
1082 *new_tail = new_head;
1083 }
1084 return false;
The logic is straightforward: the requests were pushed with head insertion, so the list a->b->c->old_head becomes old_head->c->b->a after the reversal. About this part:
1062 while (p->next == WriteRequest::UNCONNECTED) {
1063 // TODO(gejun): elaborate this
1064 sched_yield();
1065 }
"It can be blocked by a node whose next is still UNCONNECTED (which requires the writing thread to be scheduled out by the OS right after the atomic exchange and before setting the next pointer, a window of just one instruction), but in practice this rarely happens."
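The reversal itself is ordinary head insertion on a singly linked list, plus the spin above for a node whose next is still UNCONNECTED; a compact sketch of just the reversal (same Req shape as the earlier sketch, spin omitted):
struct Req { Req* next; };

// Turn new_head -> ... -> old_head (newest first) into
// old_head -> ... -> new_head (oldest first).
void splice_reversed(Req* new_head, Req* old_head) {
    Req* tail = nullptr;
    Req* p = new_head;
    do {
        Req* const saved_next = p->next;   // in brpc this is where the UNCONNECTED spin sits
        p->next = tail;
        tail = p;
        p = saved_next;
    } while (p != old_head);
    old_head->next = tail;                 // the oldest new request comes right after old_head
}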
Then a bthread is started to continue the writing in KeepWrite:
1598 void* Socket::KeepWrite(void* void_arg) {
1599 g_vars->nkeepwrite << 1;
1600 WriteRequest* req = static_cast<WriteRequest*>(void_arg);
1601 SocketUniquePtr s(req->socket);
1602
1603 // When error occurs, spin until there's no more requests instead of
1604 // returning directly otherwise _write_head is permantly non-NULL which
1605 // makes later Write() abnormal.
1606 WriteRequest* cur_tail = NULL;
1607 do {
1608 // req was written, skip it.
1609 if (req->next != NULL && req->data.empty()) {
1610 WriteRequest* const saved_req = req;
1611 req = req->next;
1612 s->ReturnSuccessfulWriteRequest(saved_req);
1613 }
1614 const ssize_t nw = s->DoWrite(req);
1615 if (nw < 0) {
1616 //error...
1623 } else {
1624 s->AddOutputBytes(nw);
1625 }
The loop above first skips req if it has already been fully written (returning it via ReturnSuccessfulWriteRequest), then calls DoWrite:
1674 ssize_t Socket::DoWrite(WriteRequest* req) {
1675 // Group butil::IOBuf in the list into a batch array.
1676 butil::IOBuf* data_list[DATA_LIST_MAX];
1677 size_t ndata = 0;
1678 for (WriteRequest* p = req; p != NULL && ndata < DATA_LIST_MAX;
1679 p = p->next) {
1680 data_list[ndata++] = &p->data;
1681 }
1682
1683 if (ssl_state() == SSL_OFF) {
1684 // Write IOBuf in the batch array into the fd.
1685 if (_conn) {
1686 return _conn->CutMessageIntoFileDescriptor(fd(), data_list, ndata);
1687 } else {
1688 ssize_t nw = butil::IOBuf::cut_multiple_into_file_descriptor(
1689 fd(), data_list, ndata);
1690 return nw;
1691 }
1692 }
1729 }
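Underneath, this flushes the whole batch with a single vectored write; a minimal sketch of the same idea with plain buffers (standard writev, without IOBuf's cut semantics or block-size handling):
#include <sys/types.h>
#include <sys/uio.h>

// Write up to `ndata` buffers in one syscall; returns bytes written or -1.
ssize_t write_batch(int fd, char* const bufs[], const size_t lens[], int ndata) {
    struct iovec vec[16];                 // 16 stands in for DATA_LIST_MAX here
    if (ndata > 16) ndata = 16;
    for (int i = 0; i < ndata; ++i) {
        vec[i].iov_base = bufs[i];
        vec[i].iov_len  = lens[i];
    }
    return writev(fd, vec, ndata);        // the kernel writes the buffers back to back
}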
In brpc, DoWrite groups at most DATA_LIST_MAX blocks per call this way. Back in KeepWrite:
1626 // Release WriteRequest until non-empty data or last request.
1627 while (req->next != NULL && req->data.empty()) {
1628 WriteRequest* const saved_req = req;
1629 req = req->next;
1630 s->ReturnSuccessfulWriteRequest(saved_req);
1631 }
1632 // TODO(gejun): wait for epollout when we actually have written
1633 // all the data. This weird heuristic reduces 30us delay...
1634 // Update(12/22/2015): seem not working. better switch to correct code.
1635 // Update(1/8/2016, r31823): Still working.
1636 // Update(8/15/2017): Not working, performance downgraded.
1637 //if (nw <= 0 || req->data.empty()/*note*/) {
1638 if (nw <= 0) {
1639 g_vars->nwaitepollout << 1;
1640 bool pollin = (s->_on_edge_triggered_events != NULL);
1641 // NOTE: Waiting epollout within timeout is a must to force
1642 // KeepWrite to check and setup pending WriteRequests periodically,
1643 // which may turn on _overcrowded to stop pending requests from
1644 // growing infinitely.
1645 const timespec duetime =
1646 butil::milliseconds_from_now(WAIT_EPOLLOUT_TIMEOUT_MS);
1647 const int rc = s->WaitEpollOut(s->fd(), pollin, &duetime);
1648 if (rc < 0 && errno != ETIMEDOUT) {
1649 //error...
1653 break;
1654 }
1655 }
1656 if (NULL == cur_tail) {
1657 for (cur_tail = req; cur_tail->next != NULL;
1658 cur_tail = cur_tail->next);
1659 }
1660 // Return when there's no more WriteRequests and req is completely
1661 // written.
1662 if (s->IsWriteComplete(cur_tail, (req == cur_tail), &cur_tail)) {
1663 CHECK_EQ(cur_tail, req);
1664 s->ReturnSuccessfulWriteRequest(req);
1665 return NULL;
1666 }
1667 } while (1);
1668
1669 // Error occurred, release all requests until no new requests.
1670 s->ReleaseAllFailedWriteRequests(req);
1671 return NULL;
1672 }
Since writes are batched, every fully written request is removed from the list and returned via ReturnSuccessfulWriteRequest; then, if no progress could be made (nw <= 0), WaitEpollOut waits for writability with a timeout:
1087 int Socket::WaitEpollOut(int fd, bool pollin, const timespec* abstime) {
1088 if (!ValidFileDescriptor(fd)) {
1089 return 0;
1090 }
1091 // Do not need to check addressable since it will be called by
1092 // health checker which called `SetFailed' before
1093 const int expected_val = _epollout_butex->load(butil::memory_order_relaxed);
1094 EventDispatcher& edisp = GetGlobalEventDispatcher(fd);
1095 if (edisp.AddEpollOut(id(), fd, pollin) != 0) {
1096 return -1;
1097 }
1098
1099 int rc = bthread::butex_wait(_epollout_butex, expected_val, abstime);
1100 const int saved_errno = errno;
1101 if (rc < 0 && errno == EWOULDBLOCK) {
1102 // Could be writable or spurious wakeup
1103 rc = 0;
1104 }
1105 // Ignore return value since `fd' might have been removed
1106 // by `RemoveConsumer' in `SetFailed'
1107 butil::ignore_result(edisp.RemoveEpollOut(id(), fd, pollin));
1108 errno = saved_errno;
1109 // Could be writable or spurious wakeup (by former epollout)
1110 return rc;
1111 }
151 int EventDispatcher::AddEpollOut(SocketId socket_id, int fd, bool pollin) {
152 if (_epfd < 0) {
153 errno = EINVAL;
154 return -1;
155 }
158 epoll_event evt;
159 evt.data.u64 = socket_id;
160 evt.events = EPOLLOUT | EPOLLET;
164 if (pollin) {
165 evt.events |= EPOLLIN;
166 if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt) < 0) {
167 // This fd has been removed from epoll via `RemoveConsumer',
168 // in which case errno will be ENOENT
169 return -1;
170 }
171 } else {
172 if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0) {
173 return -1;
174 }
175 }
192 return 0;
193 }
This is a fairly simple implementation: record the current value of _epollout_butex, register interest in the writable event with AddEpollOut, butex_wait on it, then RemoveEpollOut once it returns and go back to the KeepWrite logic above.
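One detail worth calling out: expected_val is loaded from _epollout_butex before AddEpollOut, the classic guard against a lost wakeup: snapshot the word, register interest, then sleep only if the word is still unchanged. The same shape, with C++20 std::atomic wait/notify standing in for the butex (add_epollout/remove_epollout are placeholders, not brpc's API):
#include <atomic>

static std::atomic<int> epollout_ver{0};

static int  add_epollout(int /*fd*/)    { return 0; }   // placeholder for AddEpollOut
static void remove_epollout(int /*fd*/) {}              // placeholder for RemoveEpollOut

// The event side would run: epollout_ver.fetch_add(1); epollout_ver.notify_all();
int wait_writable(int fd) {
    const int expected = epollout_ver.load(std::memory_order_relaxed);  // 1. snapshot first
    if (add_epollout(fd) != 0) return -1;                               // 2. then register interest
    epollout_ver.wait(expected);   // 3. returns immediately if the event already fired in between
    remove_epollout(fd);
    return 0;
}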
Back to ConnectIfNot: if there is no connection yet, Connect is called with KeepWriteIfConnected as the completion callback:
1236 int Socket::ConnectIfNot(const timespec* abstime, WriteRequest* req) {
1237 if (_fd.load(butil::memory_order_consume) >= 0) {
1238 return 0;
1239 }
1240
1241 // Have to hold a reference for `req'
1242 SocketUniquePtr s;
1243 ReAddress(&s);
1244 req->socket = s.get();
1245 if (_conn) {
1246 //
1249 } else {
1250 if (Connect(abstime, KeepWriteIfConnected, req) < 0) {
1251 return -1;
1252 }
1253 }
1254 s.release();
1255 return 1;
1256 }
1113 int Socket::Connect(const timespec* abstime,
1114 int (*on_connect)(int, int, void*), void* data) {
1134 const int rc = ::connect(
1135 sockfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
1136 if (rc != 0 && errno != EINPROGRESS) {
1138 return -1;
1139 }
1140 if (on_connect) {
1141 EpollOutRequest* req = new(std::nothrow) EpollOutRequest;
1142 if (req == NULL) {
1144 return -1;
1145 }
1146 req->fd = sockfd;
1147 req->timer_id = 0;
1148 req->on_epollout_event = on_connect;
1149 req->data = data;
1150 // A temporary Socket to hold `EpollOutRequest', which will
1151 // be added into epoll device soon
1152 SocketId connect_id;
1153 SocketOptions options;
1154 options.user = req;
1155 if (Socket::Create(options, &connect_id) != 0) {
1157 delete req;
1158 return -1;
1159 }
1160 // From now on, ownership of `req' has been transferred to
1161 // `connect_id'. We hold an additional reference here to
1162 // ensure `req' to be valid in this scope
1163 SocketUniquePtr s;
1164 CHECK_EQ(0, Socket::Address(connect_id, &s));
1165
1166 // Add `sockfd' into epoll so that `HandleEpollOutRequest' will
1167 // be called with `req' when epoll event reaches
1168 if (GetGlobalEventDispatcher(sockfd).
1169 AddEpollOut(connect_id, sockfd, false) != 0) {
1170 //
1174 return -1;
1175 }
1177 // Register a timer for EpollOutRequest. Note that the timeout
1178 // callback has no race with the one above as both of them try
1179 // to `SetFailed' `connect_id' while only one of them can succeed
1180 // It also work when `HandleEpollOutRequest' has already been
1181 // called before adding the timer since it will be removed
1182 // inside destructor of `EpollOutRequest' after leaving this scope
1183 if (abstime) {
1184 int rc = bthread_timer_add(&req->timer_id, *abstime,
1185 HandleEpollOutTimeout,
1186 (void*)connect_id);
1187 //
1192 }
1193
1194 } else {
1195 //
1202 }
1203 return sockfd.release();
1204 }
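The Connect above follows the standard asynchronous-connect idiom: a non-blocking connect() that returns EINPROGRESS, EPOLLOUT armed on the fd, and the result checked once the fd turns writable (the role CheckConnected plays later). A standalone sketch of those steps with plain epoll (no Socket/EpollOutRequest wrapping; getsockopt(SO_ERROR) used as the standard success check):
#include <cerrno>
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

// Start a non-blocking connect and arm EPOLLOUT; returns the fd or -1.
int async_connect(int epfd, const sockaddr_in& addr) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) return -1;
    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
    if (connect(fd, (const sockaddr*)&addr, sizeof(addr)) != 0 && errno != EINPROGRESS) {
        close(fd);                                  // immediate failure
        return -1;
    }
    epoll_event evt = {};
    evt.events = EPOLLOUT | EPOLLET;                // writable == connect finished (or failed)
    evt.data.fd = fd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &evt);
    return fd;
}

// When EPOLLOUT fires, check whether the connect actually succeeded.
int check_connected(int fd) {
    int err = 0;
    socklen_t len = sizeof(err);
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) != 0) return -1;
    return err == 0 ? 0 : -1;                       // err holds the pending connect error, if any
}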
Connect roughly creates a socket and connects to the given address. If on_connect is provided, it news an EpollOutRequest and hangs it on user, creates a temporary Socket to hold it, registers EPOLLOUT via AddEpollOut, and adds a timer that fires HandleEpollOutTimeout on timeout:
1281 void Socket::HandleEpollOutTimeout(void* arg) {
1282 SocketId id = (SocketId)arg;
1283 SocketUniquePtr s;
1284 if (Socket::Address(id, &s) != 0) {
1285 return;
1286 }
1287 EpollOutRequest* req = dynamic_cast<EpollOutRequest*>(s->user());
1288 if (req == NULL) {
1290 return;
1291 }
1292 s->HandleEpollOutRequest(ETIMEDOUT, req);
1293 }
1294
1295 int Socket::HandleEpollOutRequest(int error_code, EpollOutRequest* req) {
1296 // Only one thread can `SetFailed' this `Socket' successfully
1297 // Also after this `req' will be destroyed when its reference
1298 // hits zero
1299 if (SetFailed() != 0) {
1300 return -1;
1301 }
1302 // We've got the right to call user callback
1303 // The timer will be removed inside destructor of EpollOutRequest
1304 GetGlobalEventDispatcher(req->fd).RemoveEpollOut(id(), req->fd, false);
1305 return req->on_epollout_event(req->fd, error_code, req->data);
1306 }
When the connection succeeds, a writable event fires and the chain HandleEpollOut -> HandleEpollOutRequest -> KeepWriteIfConnected runs:
1351 int Socket::KeepWriteIfConnected(int fd, int err, void* data) {
1352 WriteRequest* req = static_cast<WriteRequest*>(data);
1353 Socket* s = req->socket;
1354 if (err == 0 && s->ssl_state() == SSL_CONNECTING) {
1355 //more code..
1367 }
1368 CheckConnectedAndKeepWrite(fd, err, data);
1369 return 0;
1370 }
1372 void Socket::CheckConnectedAndKeepWrite(int fd, int err, void* data) {
1373 butil::fd_guard sockfd(fd);
1374 WriteRequest* req = static_cast<WriteRequest*>(data);
1375 Socket* s = req->socket;
1376 CHECK_GE(sockfd, 0);
1377 if (err == 0 && s->CheckConnected(sockfd) == 0
1378 && s->ResetFileDescriptor(sockfd) == 0) {
1379 if (s->_app_connect) {
1380 //
1381 } else {
1382 // Successfully created a connection
1383 AfterAppConnected(0, req);
1384 }
1385 // Release this socket for KeepWrite
1386 sockfd.release();
1387 } else {
1388 //
1392 }
1393 }
1308 void Socket::AfterAppConnected(int err, void* data) {
1309 WriteRequest* req = static_cast<WriteRequest*>(data);
1310 if (err == 0) {
1316 // requests are not setup yet. check the comment on Setup() in Write()
1317 req->Setup(s);
1318 bthread_t th;
1319 if (bthread_start_background(
1320 &th, &BTHREAD_ATTR_NORMAL, KeepWrite, req) != 0) {
1322 KeepWrite(req);
1323 }
1324 } else {
1325 //more code...
1342 }
1343 }
which eventually reaches KeepWrite as well. Back to Socket::HandleEpollOut at the top: for sockets that have no EpollOutRequest attached, it simply wakes up whoever is waiting on _epollout_butex for the epollout event, i.e. butex_wake_except; this wakeup happens when a writable event arrives and also when the socket is being closed (see the earlier analyses).
The design intent, from the comments in the source:
229 // Write `msg' into this Socket and clear it. The `msg' should be an
230 // intact request or response. To prevent messages from interleaving
231 // with other messages, the internal file descriptor is written by one
232 // thread at any time. Namely when only one thread tries to write, the
233 // message is written once directly in the calling thread. If the message
234 // is not completely written, a KeepWrite thread is created to continue
235 // the writing. When other threads want to write simultaneously (thread
236 // contention), they append WriteRequests to the KeepWrite thread in a
237 // wait-free manner rather than writing to the file descriptor directly.
238 // KeepWrite will not quit until all WriteRequests are complete.
239 // Key properties:
240 // - all threads have similar opportunities to write, no one is starved.
241 // - Write once when uncontended(most cases).
242 // - Wait-free when contended.
Socket is complex and many details were skipped or not considered thoroughly here; the design deserves deeper thought later, and IOBuf is the next thing to analyze.