Thoughts on brpc's network implementation

I've been analyzing brpc's network implementation on and off recently, since I promised to write about it earlier. The framework we use at work has a single network thread handling all send/receive logic for every socket, and it does nothing else besides forwarding. After a stall showed up in production, I began to suspect that when there is too much network traffic, or one socket has too much pending data to write, other sockets get starved, so clients fail to receive their responses or broadcast messages in time — for example, game monsters freezing in place for several seconds. Client requests are small enough that the server is never stuck reading, but continuously writing to one socket can delay the read/write events of every other socket, and that is what prompted this deeper look.

The open-source frameworks I've analyzed target different workloads, so their designs and failure modes differ. redis is single-threaded with networking and business logic in one thread, yet performs very well. skynet, analyzed earlier, is multi-threaded underneath but has a single network thread that takes over read/write event handling for every socket connection. phxrpc is multi-threaded: an accept thread polls the load of its units and hands each new connection to the lightest one, where a unit is several worker threads plus one network (event-loop) thread. The load is balanced at accept time, but each unit still has essentially one network thread, so if one unit is busy while others sit idle, pending sockets in the busy unit are delayed by whichever sockets that unit is currently reading or writing — the same problem again.

Then came brpc, with a new approach. Quoting the docs:
"brpc uses one or more EventDispatchers (EDISP for short) to wait for events on any fd. Unlike the common 'IO thread', EDISP is not responsible for reading. The problem with IO threads is that one thread can only read one fd at a time; when several busy fds gather in one IO thread, some reads are delayed. Multi-tenancy, complex demultiplexing algorithms, and features like Streaming RPC aggravate the problem. Under high load, the occasional slow read drags down every fd in that IO thread, with a sizable impact on availability."

What follows is an overall walk through the Socket-related implementation. It is fairly involved, so some details are skipped and only the key parts are analyzed in simplified form.

When the server starts it calls StartInternal, which creates the listen socket used to accept connections.

 967         if (_am == NULL) {
 968             _am = BuildAcceptor();
 969             //check...
 973         }
 974         //more code...
 981         // Pass ownership of `sockfd' to `_am'
 982         if (_am->StartAccept(sockfd, _options.idle_timeout_sec,
 983                              _default_ssl_ctx) != 0) {
 985             return -1;
 986         }

As the earlier post on brpc's message processing flow showed, BuildAcceptor initializes the handlers for each protocol.

 35 // Accept connections from a specific port and then
 36 // process messages from which it reads
 37 class Acceptor : public InputMessenger {
 38 public:
 39     typedef butil::FlatMap<SocketId, ConnectStatistics> SocketMap;
 40 
 41     enum Status {
 42         UNINITIALIZED = 0,
 43         READY = 1,
 44         RUNNING = 2,
 45         STOPPING = 3,
 46     };
 47 
 48 public:
 83 private:
 84     // Accept connections.
 85     static void OnNewConnectionsUntilEAGAIN(Socket* m);
 86     static void OnNewConnections(Socket* m);
 87 
 96     bthread_keytable_pool_t* _keytable_pool; // owned by Server
 99     bthread_t _close_idle_tid;
107 
108     // The map containing all the accepted sockets
109     SocketMap _socket_map;
112 };

Acceptor inherits from InputMessenger and manages all accepted connections. Next, StartAccept:

 50 int Acceptor::StartAccept(int listened_fd, int idle_timeout_sec,
 51                           const std::shared_ptr<SocketSSLContext>& ssl_ctx) {
 52     //more code...
 69     if (idle_timeout_sec > 0) {
 70         if (bthread_start_background(&_close_idle_tid, NULL,
 71                                      CloseIdleConnections, this) != 0) {
 73             return -1;
 74         }
 75     }
 79     // Creation of _acception_id is inside lock so that OnNewConnections
 80     // (which may run immediately) should see sane fields set below.
 81     SocketOptions options;
 82     options.fd = listened_fd;
 83     options.user = this;
 84     options.on_edge_triggered_events = OnNewConnections;
 85     if (Socket::Create(options, &_acception_id) != 0) {
 86         // Close-idle-socket thread will be stopped inside destructor
 88         return -1;
 89     }
 90 
 91     _listened_fd = listened_fd;
 92     _status = RUNNING;
 93     return 0;
 94 }

StartAccept runs a background bthread, CloseIdleConnections, which wakes periodically and closes any connection that has not sent or received data for some time — straightforward functionality. `void (*on_edge_triggered_events)(Socket*)` is the callback invoked when an event arrives; for the listen socket it is OnNewConnections, analyzed later. Next, creating the Socket object:

 585 // SocketId = 32-bit version + 32-bit slot.
 586 //   version: from version part of _versioned_nref, must be an EVEN number.
 587 //   slot: designated by ResourcePool.
 588 int Socket::Create(const SocketOptions& options, SocketId* id) {
 589     butil::ResourceId<Socket> slot;
 590     Socket* const m = butil::get_resource(&slot, Forbidden());
 591     if (m == NULL) {
 593         return -1;
 594     }
 601     m->_on_edge_triggered_events = options.on_edge_triggered_events;
 609     m->_this_id = MakeSocketId(
 610             VersionOfVRef(m->_versioned_ref.fetch_add(
 611                     1, butil::memory_order_release)), slot);
 612     m->_preferred_index = -1;
 652     CHECK(NULL == m->_write_head.load(butil::memory_order_relaxed));
 653     // Must be last one! Internal fields of this Socket may be access
 654     // just after calling ResetFileDescriptor.
 655     if (m->ResetFileDescriptor(options.fd) != 0) {
 656         //error...
 661     }
 662     *id = m->_this_id;
 663     return 0;
 664 }

Only the operations on the important data members are shown above; the rest matter too but are not the focus here. In socket.h the Socket class declaration runs over 640 lines including comments, so it is not reproduced; relevant pieces are quoted as each operation comes up. Next, ResetFileDescriptor:

 522 int Socket::ResetFileDescriptor(int fd) {
 523     // Reset message sizes when fd is changed.
 524     _last_msg_size = 0;
 525     _avg_msg_size = 0;
 526     // MUST store `_fd' before adding itself into epoll device to avoid
 527     // race conditions with the callback function inside epoll
 528     _fd.store(fd, butil::memory_order_release);
 543     // Make the fd non-blocking.
 544     if (butil::make_non_blocking(fd) != 0) {
 546         return -1;
 547     }
 550     butil::make_no_delay(fd);
 574     if (_on_edge_triggered_events) {
 575         if (GetGlobalEventDispatcher(fd).AddConsumer(id(), fd) != 0) {
 576             //error...
 580         }
 581     }
 582     return 0;
 583 }

ResetFileDescriptor makes the fd non-blocking, enables TCP_NODELAY, sets the fd's SO_SNDBUF and SO_RCVBUF sizes, and finally adds it to epoll to wait for events. EventDispatcher wraps the event dispatching; it is analyzed next.

The number of dispatchers is configurable, so multiple EventDispatcher instances can exist:

352 static void StopAndJoinGlobalDispatchers() {
353     for (int i = 0; i < FLAGS_event_dispatcher_num; ++i) {
354         g_edisp[i].Stop();
355         g_edisp[i].Join();
356     }    
357 }            
358 void InitializeGlobalDispatchers() {
359     g_edisp = new EventDispatcher[FLAGS_event_dispatcher_num];
360     for (int i = 0; i < FLAGS_event_dispatcher_num; ++i) {
361         const bthread_attr_t attr = FLAGS_usercode_in_pthread ?
362             BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL;
363         CHECK_EQ(0, g_edisp[i].Start(&attr));
364     }
365     // This atexit is will be run before g_task_control.stop() because above
366     // Start() initializes g_task_control by creating bthread (to run epoll/kqueue).
367     CHECK_EQ(0, atexit(StopAndJoinGlobalDispatchers));
368 }
369 
370 EventDispatcher& GetGlobalEventDispatcher(int fd) {
371     pthread_once(&g_edisp_once, InitializeGlobalDispatchers);
372     if (FLAGS_event_dispatcher_num == 1) {
373         return g_edisp[0];
374     }    
375     int index = butil::fmix32(fd) % FLAGS_event_dispatcher_num;
376     return g_edisp[index];
377 }

Above is the initialization and the per-fd lookup of a dispatcher (a hash of the fd modulo the dispatcher count, not a random pick). EventDispatcher monitors events in edge-triggered (ET) mode:

 89 int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {
115     int rc = bthread_start_background(
116         &_tid, &_consumer_thread_attr, RunThis, this);
117     if (rc) {
119         return -1;
120     }
121     return 0;
122 }

273 void* EventDispatcher::RunThis(void* arg) {
274     ((EventDispatcher*)arg)->Run();
275     return NULL;
276 }
278 void EventDispatcher::Run() {
279     while (!_stop) {
282         epoll_event e[32];
290         const int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1);
296         if (_stop) {
297             // epoll_ctl/epoll_wait should have some sort of memory fencing
298             // guaranteeing that we(after epoll_wait) see _stop set before
299             // epoll_ctl.
300             break;
301         }
302         if (n < 0) {
303             if (EINTR == errno) {
304                 // We've checked _stop, no wake-up will be missed.
305                 continue;
306             }
312             break;
313         }
314         for (int i = 0; i < n; ++i) {
316             if (e[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP)) {
321                 // We don't care about the return value.
322                 Socket::StartInputEvent(e[i].data.u64, e[i].events,
323                                         _consumer_thread_attr);
324             }
332         }
333         for (int i = 0; i < n; ++i) {
335             if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) {
336                 // We don't care about the return value.
337                 Socket::HandleEpollOut(e[i].data.u64);
338             }
345         }
346     }
347 }

The loop handles in and out events separately, dispatching to StartInputEvent and HandleEpollOut respectively. Adding an fd for events:

225 int EventDispatcher::AddConsumer(SocketId socket_id, int fd) {
226     if (_epfd < 0) {
227         errno = EINVAL;  
228         return -1;
229     }        
231     epoll_event evt;
232     evt.events = EPOLLIN | EPOLLET;
233     evt.data.u64 = socket_id;
237     return epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt);
245 }

For the listen socket, a new connection arriving raises a readable event, which calls StartInputEvent:

1924 int Socket::StartInputEvent(SocketId id, uint32_t events,
1925                             const bthread_attr_t& thread_attr) {
1926     SocketUniquePtr s;
1927     if (Address(id, &s) < 0) {
1928         return -1;
1929     }  
1930     if (NULL == s->_on_edge_triggered_events) {
1931         // Callback can be NULL when receiving error epoll events
1932         // (Added into epoll by `WaitConnected')
1933         return 0;
1934     }      
1935     if (s->fd() < 0) {
1941         return -1;
1942     }
1947     // Passing e[i].events causes complex visibility issues and
1948     // requires stronger memory fences, since reading the fd returns
1949     // error as well, we don't pass the events.
1950     if (s->_nevent.fetch_add(1, butil::memory_order_acq_rel) == 0) {
1956         bthread_t tid;
1957         // transfer ownership as well, don't use s anymore!
1958         Socket* const p = s.release();
1959 
1960         bthread_attr_t attr = thread_attr;
1961         attr.keytable_pool = p->_keytable_pool;
1962         if (bthread_start_urgent(&tid, &attr, ProcessEvent, p) != 0) {
1964             ProcessEvent(p);
1965         }
1966     }
1967     return 0;
1968 }

The logic above resolves the Socket object from socket_id; if that fails the socket has been removed, which is allowed. It then checks for a callback and returns if there is none. `_nevent` tracks whether some bthread is already processing read events on this socket; the benefit, quoting the docs:
"EDISP uses edge-triggered mode. When an event arrives, EDISP adds 1 to an atomic variable, and starts a bthread to process the data on the fd only if the value before the increment was 0. Behind the scenes, EDISP gives up its pthread to the newly created bthread, which therefore gets better cache locality and can read the fd's data as soon as possible, while the bthread EDISP was running in is stolen by another pthread and continues to run — this is bthread's work-stealing scheduling."

As analyzed before, bthread_start_urgent starts a bthread immediately and runs ProcessEvent:

1017 void* Socket::ProcessEvent(void* arg) {
1018     // the enclosed Socket is valid and free to access inside this function.
1019     SocketUniquePtr s(static_cast<Socket*>(arg));
1020     s->_on_edge_triggered_events(s.get());
1021     return NULL;
1022 }

which invokes the OnNewConnections callback:

317 void Acceptor::OnNewConnections(Socket* acception) {
318     int progress = Socket::PROGRESS_INIT;
319     do {
320         OnNewConnectionsUntilEAGAIN(acception);
321         if (acception->Failed()) {
322             return;
323         }
324     } while (acception->MoreReadEvents(&progress));
325 } 
243 void Acceptor::OnNewConnectionsUntilEAGAIN(Socket* acception) {
244     while (1) {
245         struct sockaddr in_addr;
246         socklen_t in_len = sizeof(in_addr);
247         butil::fd_guard in_fd(accept(acception->fd(), &in_addr, &in_len));
248         if (in_fd < 0) {
249             // no EINTR because listened fd is non-blocking.
250             if (errno == EAGAIN) {
251                 return;
252             }
253             //error...
260             continue;
261         }
270         SocketId socket_id;
271         SocketOptions options;
272         options.keytable_pool = am->_keytable_pool;
273         options.fd = in_fd;
274         options.remote_side = butil::EndPoint(*(sockaddr_in*)&in_addr);
275         options.user = acception->user();
276         options.on_edge_triggered_events = InputMessenger::OnNewMessages;
277         options.initial_ssl_ctx = am->_ssl_ctx;
278         if (Socket::Create(options, &socket_id) != 0) {
280             continue;
281         }
282         in_fd.release(); // transfer ownership to socket_id
284         // There's a funny race condition here. After Socket::Create, messages
285         // from the socket are already handled and a RPC is possibly done
286         // before the socket is added into _socket_map below. This is found in
287         // ChannelTest.skip_parallel in test/brpc_channel_unittest.cpp (running
288         // on machines with few cores) where the _messenger.ConnectionCount()
289         // may surprisingly be 0 even if the RPC is already done.
290                         
291         SocketUniquePtr sock;
292         if (Socket::AddressFailedAsWell(socket_id, &sock) >= 0) {
293             bool is_running = true;
294             {       
295                 BAIDU_SCOPED_LOCK(am->_map_mutex);
296                 is_running = (am->status() == RUNNING);
297                 // Always add this socket into `_socket_map' whether it
298                 // has been `SetFailed' or not, whether `Acceptor' is
299                 // running or not. Otherwise, `Acceptor::BeforeRecycle'
300                 // may be called (inside Socket::OnRecycle) after `Acceptor'
301                 // has been destroyed
302                 am->_socket_map.insert(socket_id, ConnectStatistics());
303             }
304             if (!is_running) {
305                 //error...
310                 return;
311             }
312         } // else: The socket has already been destroyed, Don't add its id
313           // into _socket_map
314     }
315 }

Here a new connection is accepted and associated with the OnNewMessages callback. The long comment and the AddressFailedAsWell code are quoted because they are instructive: as the comment says, once Socket::Create registers the fd with epoll, a readable event may fire immediately, messages may be handled on a bthread on another thread, and the whole RPC may finish — and the socket be released — before the socket is inserted into _socket_map below. So the object behind socket_id here may already be invalid. AddressFailedAsWell is fairly involved and is analyzed later. The OnNewMessages callback was covered in the earlier post on brpc's message processing flow and is not repeated here.

Because the accept logic is a while(1) loop, EAGAIN means there are no more connections to accept for now, so it returns. Next:

227 inline bool Socket::MoreReadEvents(int* progress) {
228     // Fail to CAS means that new events arrived.
229     return !_nevent.compare_exchange_strong(
230         *progress, 0, butil::memory_order_release,
231             butil::memory_order_acquire);
232 }

This re-check is needed because StartInputEvent may fire while OnNewConnections is running: if _nevent still equals *progress (initially 1), the CAS resets _nevent to 0 and the loop exits; otherwise the new value is stored into *progress and accepting continues.

When a writable (or error/hangup) event occurs on a socket:

1258 int Socket::HandleEpollOut(SocketId id) {
1259     SocketUniquePtr s;
1260     // Since Sockets might have been `SetFailed' before they were
1261     // added into epoll, these sockets miss the signal inside
1262     // `SetFailed' and therefore must be signalled here using
1263     // `AddressFailedAsWell' to prevent waiting forever
1264     if (Socket::AddressFailedAsWell(id, &s) < 0) {
1265         // Ignore recycled sockets
1266         return -1;
1267     } 
1268    
1269     EpollOutRequest* req = dynamic_cast<EpollOutRequest*>(s->user());
1270     if (req != NULL) {
1271         return s->HandleEpollOutRequest(0, req);
1272     }
1273     
1274     // Currently `WaitEpollOut' needs `_epollout_butex'
1275     // TODO(jiangrujie): Remove this in the future 
1276     s->_epollout_butex->fetch_add(1, butil::memory_order_relaxed);
1277     bthread::butex_wake_except(s->_epollout_butex, 0);
1278     return 0;
1279 }

1295 int Socket::HandleEpollOutRequest(int error_code, EpollOutRequest* req) {
1296     // Only one thread can `SetFailed' this `Socket' successfully
1297     // Also after this `req' will be destroyed when its reference
1298     // hits zero
1299     if (SetFailed() != 0) {
1300         return -1;
1301     }
1302     // We've got the right to call user callback
1303     // The timer will be removed inside destructor of EpollOutRequest
1304     GetGlobalEventDispatcher(req->fd).RemoveEpollOut(id(), req->fd, false);
1305     return req->on_epollout_event(req->fd, error_code, req->data);
1306 }

These lines need little line-by-line analysis and are tied together later; the interesting part is what happens when data actually needs to be written.

Accepted connections are registered only for EPOLLIN | EPOLLET, with no EPOLLOUT; the latter is added later via AddEpollOut.

For example, when the server writes an RPC response, the baidu-std protocol code does:

253         Socket::WriteOptions wopt;
254         wopt.ignore_eovercrowded = true;
255         if (sock->Write(&res_buf, &wopt) != 0) {
256             const int errcode = errno;
258             cntl->SetFailed(errcode, "Fail to write into %s",
259                             sock->description().c_str());
260             return;
261         }
1432 int Socket::Write(butil::IOBuf* data, const WriteOptions* options_in) {
1433     //more code...
1456     WriteRequest* req = butil::get_object<WriteRequest>();
1457     if (!req) {
1458         return SetError(opt.id_wait, ENOMEM);
1459     }   
1460     
1461     req->data.swap(*data);
1462     // Set `req->next' to UNCONNECTED so that the KeepWrite thread will
1463     // wait until it points to a valid WriteRequest or NULL.
1464     req->next = WriteRequest::UNCONNECTED;
1465     req->id_wait = opt.id_wait;
1466     req->set_pipelined_count_and_user_message(
1467         opt.pipelined_count, DUMMY_USER_MESSAGE, opt.with_auth);
1468     return StartWrite(req, opt);
1469 } 
 303 struct BAIDU_CACHELINE_ALIGNMENT Socket::WriteRequest {
 304     static WriteRequest* const UNCONNECTED;
 305     
 306     butil::IOBuf data;
 307     WriteRequest* next; 
 308     bthread_id_t id_wait;
 309     Socket* socket;
 310 };

So a write starts by allocating a WriteRequest, swapping in the data to be written, and calling set_pipelined_count_and_user_message (its purpose is analyzed later); then StartWrite:

1506 int Socket::StartWrite(WriteRequest* req, const WriteOptions& opt) {
1507     // Release fence makes sure the thread getting request sees *req
1508     WriteRequest* const prev_head =
1509         _write_head.exchange(req, butil::memory_order_release);
1510     if (prev_head != NULL) {
1511         // Someone is writing to the fd. The KeepWrite thread may spin
1512         // until req->next to be non-UNCONNECTED. This process is not
1513         // lock-free, but the duration is so short(1~2 instructions,
1514         // depending on compiler) that the spin rarely occurs in practice
1515         // (I've not seen any spin in highly contended tests).
1516         req->next = prev_head;
1517         return 0;
1518     }

_write_head is atomically exchanged to point at req. If another bthread holds the write right, prev_head is hung off req->next and the call returns (so the list is in reverse arrival order); two threads must never write the same socket concurrently, or the byte streams would interleave:

1520     int saved_errno = 0;
1521     bthread_t th;
1522     SocketUniquePtr ptr_for_keep_write;
1523     ssize_t nw = 0;
1524 
1525     // We've got the right to write.
1526     req->next = NULL;
1527 
1528     // Connect to remote_side() if not.
1529     int ret = ConnectIfNot(opt.abstime, req);
1530     //ret >=0
1540     // NOTE: Setup() MUST be called after Connect which may call app_connect,
1541     // which is assumed to run before any SocketMessage.AppendAndDestroySelf()
1542     // in some protocols(namely RTMP).
1543     req->Setup(this);
1544 
1545     if (ssl_state() != SSL_OFF) {
1546         // Writing into SSL may block the current bthread, always write
1547         // in the background.
1548         goto KEEPWRITE_IN_BACKGROUND;
1549     }
1551     // Write once in the calling thread. If the write is not complete,
1552     // continue it in KeepWrite thread.
1553     if (_conn) {
1554         //
1556     } else {
1557         nw = req->data.cut_into_file_descriptor(fd());
1558     }
1559     if (nw < 0) {
1560         //error...
1569     } else {
1570         AddOutputBytes(nw);
1571     }
1572     if (IsWriteComplete(req, true, NULL)) {
1573         ReturnSuccessfulWriteRequest(req);
1574         return 0;
1575     }
1577 KEEPWRITE_IN_BACKGROUND:
1578     ReAddress(&ptr_for_keep_write);
1579     req->socket = ptr_for_keep_write.release();
1580     if (bthread_start_background(&th, &BTHREAD_ATTR_NORMAL,
1581                                  KeepWrite, req) != 0) {
1583         KeepWrite(req);
1584     }
1585     return 0;
1586 
1587 FAIL_TO_WRITE:
1588     // `SetFailed' before `ReturnFailedWriteRequest' (which will calls
1589     // `on_reset' callback inside the id object) so that we immediately
1590     // know this socket has failed inside the `on_reset' callback
1591     ReleaseAllFailedWriteRequests(req);
1592     errno = saved_errno;
1593     return -1;
1594 }

Having won the right to write, it calls ConnectIfNot — assume the connection is valid for now (the invalid case is analyzed later) and ignore SSL. It writes once with req->data.cut_into_file_descriptor(fd()) and accounts the bytes written via AddOutputBytes. Then it checks whether the write is complete: even when this req is fully written, other bthreads may have failed to win the write right in the meantime and hung their reqs off the head (the `req->next = prev_head` path above), so those must be checked for. Assuming everything is written:

 471 void Socket::ReturnSuccessfulWriteRequest(Socket::WriteRequest* p) {
 472     DCHECK(p->data.empty());
 473     AddOutputMessages(1);
 474     const bthread_id_t id_wait = p->id_wait;
 475     butil::return_object(p);
 476     if (id_wait != INVALID_BTHREAD_ID) {
 477         NotifyOnFailed(id_wait);
 478     }
 479 }

If the write is not complete, here is IsWriteComplete:

1024 // Check if there're new requests appended.
1025 // If yes, point old_head to to reversed new requests and return false;
1026 // If no:
1027 //    old_head is fully written, set _write_head to NULL and return true;
1028 //    old_head is not written yet, keep _write_head unchanged and return false;
1029 // `old_head' is last new_head got from this function or (in another word)
1030 // tail of current writing list.
1031 // `singular_node' is true iff `old_head' is the only node in its list.
1032 bool Socket::IsWriteComplete(Socket::WriteRequest* old_head,
1033                              bool singular_node,
1034                              Socket::WriteRequest** new_tail) {
1035     CHECK(NULL == old_head->next);
1036     // Try to set _write_head to NULL to mark that the write is done.
1037     WriteRequest* new_head = old_head;
1038     WriteRequest* desired = NULL;
1039     bool return_when_no_more = true;
1040     if (!old_head->data.empty() || !singular_node) {
1041         desired = old_head;
1042         // Write is obviously not complete if old_head is not fully written.
1043         return_when_no_more = false;
1044     }
1045     if (_write_head.compare_exchange_strong(
1046             new_head, desired, butil::memory_order_acquire)) {
1047         // No one added new requests.
1048         if (new_tail) {
1049             *new_tail = old_head;
1050         }
1051         return return_when_no_more;
1052     }

If old_head->data.empty() is false, the write is certainly incomplete. The CAS then detects whether new write requests arrived: if none, it returns false and writing continues in the background; otherwise the pending list may need to be reversed:

1057     // Someone added new requests.
1058     // Reverse the list until old_head.
1059     WriteRequest* tail = NULL;
1060     WriteRequest* p = new_head;
1061     do {
1062         while (p->next == WriteRequest::UNCONNECTED) {
1063             // TODO(gejun): elaborate this
1064             sched_yield();
1065         }
1066         WriteRequest* const saved_next = p->next;
1067         p->next = tail;
1068         tail = p;
1069         p = saved_next;
1070         CHECK(p != NULL);
1071     } while (p != old_head);
1072 
1073     // Link old list with new list.
1074     old_head->next = tail;
1075     // Call Setup() from oldest to newest, notice that the calling sequence
1076     // matters for protocols using pipelined_count, this is why we don't
1077     // calling Setup in above loop which is from newest to oldest.
1078     for (WriteRequest* q = tail; q; q = q->next) {
1079         q->Setup(this);
1080     }
1081     if (new_tail) {
1082         *new_tail = new_head;
1083     }
1084     return false;

The logic is simple head-insertion into a singly linked list: a->b->c->old_head becomes old_head->c->b->a. About the spin:

1062         while (p->next == WriteRequest::UNCONNECTED) {
1063             // TODO(gejun): elaborate this
1064             sched_yield();
1065         }

"It can be held up by a node whose next is still UNCONNECTED (this requires the writing thread to be scheduled out by the OS right after the atomic exchange and before setting the next pointer, a window of only an instruction or two), but in practice this spin rarely occurs."

Then a bthread is started to keep writing in the background, KeepWrite:

1598 void* Socket::KeepWrite(void* void_arg) {
1599     g_vars->nkeepwrite << 1;
1600     WriteRequest* req = static_cast<WriteRequest*>(void_arg);
1601     SocketUniquePtr s(req->socket);
1602 
1603     // When error occurs, spin until there's no more requests instead of
1604     // returning directly otherwise _write_head is permantly non-NULL which
1605     // makes later Write() abnormal.
1606     WriteRequest* cur_tail = NULL;
1607     do {
1608         // req was written, skip it.
1609         if (req->next != NULL && req->data.empty()) {
1610             WriteRequest* const saved_req = req;
1611             req = req->next;
1612             s->ReturnSuccessfulWriteRequest(saved_req);
1613         }
1614         const ssize_t nw = s->DoWrite(req);
1615         if (nw < 0) { 
1616             //error...
1623         } else {
1624             s->AddOutputBytes(nw);
1625         }

The loop skips reqs that are already fully written, returning each via ReturnSuccessfulWriteRequest, then calls DoWrite:

1674 ssize_t Socket::DoWrite(WriteRequest* req) {
1675     // Group butil::IOBuf in the list into a batch array.
1676     butil::IOBuf* data_list[DATA_LIST_MAX];
1677     size_t ndata = 0;
1678     for (WriteRequest* p = req; p != NULL && ndata < DATA_LIST_MAX;
1679          p = p->next) {
1680         data_list[ndata++] = &p->data;
1681     }           
1682                 
1683     if (ssl_state() == SSL_OFF) {
1684         // Write IOBuf in the batch array into the fd. 
1685         if (_conn) {
1686             return _conn->CutMessageIntoFileDescriptor(fd(), data_list, ndata);
1687         } else {
1688             ssize_t nw = butil::IOBuf::cut_multiple_into_file_descriptor(
1689                 fd(), data_list, ndata);
1690             return nw;
1691         }
1692     }
1729 }

This writes in batches of at most DATA_LIST_MAX blocks, using writev underneath. Back in KeepWrite:

1626         // Release WriteRequest until non-empty data or last request.
1627         while (req->next != NULL && req->data.empty()) {
1628             WriteRequest* const saved_req = req;
1629             req = req->next;
1630             s->ReturnSuccessfulWriteRequest(saved_req);
1631         }
1632         // TODO(gejun): wait for epollout when we actually have written
1633         // all the data. This weird heuristic reduces 30us delay...
1634         // Update(12/22/2015): seem not working. better switch to correct code.
1635         // Update(1/8/2016, r31823): Still working.
1636         // Update(8/15/2017): Not working, performance downgraded.
1637         //if (nw <= 0 || req->data.empty()/*note*/) {
1638         if (nw <= 0) {
1639             g_vars->nwaitepollout << 1;
1640             bool pollin = (s->_on_edge_triggered_events != NULL);
1641             // NOTE: Waiting epollout within timeout is a must to force
1642             // KeepWrite to check and setup pending WriteRequests periodically,
1643             // which may turn on _overcrowded to stop pending requests from
1644             // growing infinitely.
1645             const timespec duetime =
1646                 butil::milliseconds_from_now(WAIT_EPOLLOUT_TIMEOUT_MS);
1647             const int rc = s->WaitEpollOut(s->fd(), pollin, &duetime);
1648             if (rc < 0 && errno != ETIMEDOUT) {
1649                 //error...
1653                 break;
1654             }
1655         }
1656         if (NULL == cur_tail) {
1657             for (cur_tail = req; cur_tail->next != NULL;
1658                  cur_tail = cur_tail->next);
1659         }
1660         // Return when there's no more WriteRequests and req is completely
1661         // written.
1662         if (s->IsWriteComplete(cur_tail, (req == cur_tail), &cur_tail)) {
1663             CHECK_EQ(cur_tail, req);
1664             s->ReturnSuccessfulWriteRequest(req);
1665             return NULL;
1666         }
1667     } while (1);
1668 
1669     // Error occurred, release all requests until no new requests.
1670     s->ReleaseAllFailedWriteRequests(req);
1671     return NULL;
1672 }

Because writes are batched, fully written blocks are removed from the list via ReturnSuccessfulWriteRequest; if the fd is temporarily unwritable, WaitEpollOut waits with a bounded timeout:

1087 int Socket::WaitEpollOut(int fd, bool pollin, const timespec* abstime) {
1088     if (!ValidFileDescriptor(fd)) {
1089         return 0;
1090     }           
1091     // Do not need to check addressable since it will be called by
1092     // health checker which called `SetFailed' before
1093     const int expected_val = _epollout_butex->load(butil::memory_order_relaxed);
1094     EventDispatcher& edisp = GetGlobalEventDispatcher(fd);
1095     if (edisp.AddEpollOut(id(), fd, pollin) != 0) {
1096         return -1;
1097     }            
1098         
1099     int rc = bthread::butex_wait(_epollout_butex, expected_val, abstime);
1100     const int saved_errno = errno;
1101     if (rc < 0 && errno == EWOULDBLOCK) {
1102         // Could be writable or spurious wakeup
1103         rc = 0;
1104     }       
1105     // Ignore return value since `fd' might have been removed
1106     // by `RemoveConsumer' in `SetFailed'
1107     butil::ignore_result(edisp.RemoveEpollOut(id(), fd, pollin));
1108     errno = saved_errno;
1109     // Could be writable or spurious wakeup (by former epollout)
1110     return rc;
1111 }

151 int EventDispatcher::AddEpollOut(SocketId socket_id, int fd, bool pollin) {
152     if (_epfd < 0) {
153         errno = EINVAL;
154         return -1;
155     }
158     epoll_event evt;
159     evt.data.u64 = socket_id;
160     evt.events = EPOLLOUT | EPOLLET;
164     if (pollin) {
165         evt.events |= EPOLLIN;
166         if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt) < 0) {
167             // This fd has been removed from epoll via `RemoveConsumer',
168             // in which case errno will be ENOENT
169             return -1;
170         }
171     } else {
172         if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0) {
173             return -1;
174         }
175     }
192     return 0;
193 }

The implementation above is straightforward: register interest in the out event, butex_wait on it, and on return RemoveEpollOut and go back to the KeepWrite loop.

Back to ConnectIfNot: if there is no connection yet, it calls Connect, associating the KeepWriteIfConnected callback:

1236 int Socket::ConnectIfNot(const timespec* abstime, WriteRequest* req) {
1237     if (_fd.load(butil::memory_order_consume) >= 0) {
1238        return 0;
1239     }
1240     
1241     // Have to hold a reference for `req'
1242     SocketUniquePtr s;
1243     ReAddress(&s);
1244     req->socket = s.get();
1245     if (_conn) {
1246         //
1249     } else {
1250         if (Connect(abstime, KeepWriteIfConnected, req) < 0) {
1251             return -1;
1252         }
1253     }
1254     s.release();
1255     return 1;
1256 }

1113 int Socket::Connect(const timespec* abstime,
1114                     int (*on_connect)(int, int, void*), void* data) {
1134     const int rc = ::connect(
1135         sockfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
1136     if (rc != 0 && errno != EINPROGRESS) {
1138         return -1;
1139     }
1140     if (on_connect) {
1141         EpollOutRequest* req = new(std::nothrow) EpollOutRequest;
1142         if (req == NULL) {
1144             return -1;
1145         }
1146         req->fd = sockfd;
1147         req->timer_id = 0;
1148         req->on_epollout_event = on_connect;
1149         req->data = data;
1150         // A temporary Socket to hold `EpollOutRequest', which will
1151         // be added into epoll device soon
1152         SocketId connect_id;
1153         SocketOptions options;
1154         options.user = req;
1155         if (Socket::Create(options, &connect_id) != 0) {
1157             delete req;
1158             return -1;
1159         }
1160         // From now on, ownership of `req' has been transferred to
1161         // `connect_id'. We hold an additional reference here to
1162         // ensure `req' to be valid in this scope
1163         SocketUniquePtr s;
1164         CHECK_EQ(0, Socket::Address(connect_id, &s));
1165 
1166         // Add `sockfd' into epoll so that `HandleEpollOutRequest' will
1167         // be called with `req' when epoll event reaches
1168         if (GetGlobalEventDispatcher(sockfd).
1169             AddEpollOut(connect_id, sockfd, false) != 0) {
1170             //
1174             return -1;
1175         }
1177         // Register a timer for EpollOutRequest. Note that the timeout
1178         // callback has no race with the one above as both of them try
1179         // to `SetFailed' `connect_id' while only one of them can succeed
1180         // It also work when `HandleEpollOutRequest' has already been
1181         // called before adding the timer since it will be removed
1182         // inside destructor of `EpollOutRequest' after leaving this scope
1183         if (abstime) {
1184             int rc = bthread_timer_add(&req->timer_id, *abstime,
1185                                        HandleEpollOutTimeout,
1186                                        (void*)connect_id);
1187             //
1192         }
1193 
1194     } else {
1195         //
1202     }
1203     return sockfd.release();
1204 }

The `Connect` implementation above roughly does the following: create a socket and connect it to the target address; if `on_connect` is given, new an `EpollOutRequest`, attach it as the `user` field of a temporary Socket object, register the write event via `AddEpollOut`, and register a timer whose timeout callback is `HandleEpollOutTimeout`:

1281 void Socket::HandleEpollOutTimeout(void* arg) {
1282     SocketId id = (SocketId)arg;       
1283     SocketUniquePtr s;                 
1284     if (Socket::Address(id, &s) != 0) {
1285         return; 
1286     }           
1287     EpollOutRequest* req = dynamic_cast<EpollOutRequest*>(s->user());
1288     if (req == NULL) {
1290         return;
1291     } 
1292     s->HandleEpollOutRequest(ETIMEDOUT, req);
1293 }           
1294             
1295 int Socket::HandleEpollOutRequest(int error_code, EpollOutRequest* req) {
1296     // Only one thread can `SetFailed' this `Socket' successfully
1297     // Also after this `req' will be destroyed when its reference
1298     // hits zero
1299     if (SetFailed() != 0) {
1300         return -1;
1301     }
1302     // We've got the right to call user callback
1303     // The timer will be removed inside destructor of EpollOutRequest
1304     GetGlobalEventDispatcher(req->fd).RemoveEpollOut(id(), req->fd, false);
1305     return req->on_epollout_event(req->fd, error_code, req->data);
1306 }

When the connection succeeds, i.e. a writable event fires, the path is HandleEpollOut -> HandleEpollOutRequest -> KeepWriteIfConnected:

1351 int Socket::KeepWriteIfConnected(int fd, int err, void* data) {
1352     WriteRequest* req = static_cast<WriteRequest*>(data);
1353     Socket* s = req->socket;
1354     if (err == 0 && s->ssl_state() == SSL_CONNECTING) {
1355         //more code..
1367     }   
1368     CheckConnectedAndKeepWrite(fd, err, data);
1369     return 0;
1370 }

1372 void Socket::CheckConnectedAndKeepWrite(int fd, int err, void* data) {
1373     butil::fd_guard sockfd(fd);
1374     WriteRequest* req = static_cast<WriteRequest*>(data);
1375     Socket* s = req->socket;
1376     CHECK_GE(sockfd, 0);
1377     if (err == 0 && s->CheckConnected(sockfd) == 0
1378         && s->ResetFileDescriptor(sockfd) == 0) {
1379         if (s->_app_connect) {
1380             //
1381         } else {
1382             // Successfully created a connection
1383             AfterAppConnected(0, req);
1384         }
1385         // Release this socket for KeepWrite
1386         sockfd.release();
1387     } else {
1388         //
1392     }   
1393 } 

1308 void Socket::AfterAppConnected(int err, void* data) {
1309     WriteRequest* req = static_cast<WriteRequest*>(data);
1310     if (err == 0) {
1316         // requests are not setup yet. check the comment on Setup() in Write()
1317         req->Setup(s);
1318         bthread_t th;
1319         if (bthread_start_background(
1320                 &th, &BTHREAD_ATTR_NORMAL, KeepWrite, req) != 0) {
1322             KeepWrite(req);
1323         }
1324     } else {
1325         //more code...
1342     }
1343 }

Eventually this still ends up in KeepWrite. Back in `Socket::HandleEpollOut` at the start of the chain: some sockets have no `EpollOutRequest` attached, and in that case it simply wakes up whoever is waiting on `_epollout_butex` for an epollout event, via `butex_wake_except`. This wakeup happens both when a writable event arrives and when the socket is closed (see the earlier analyses for details).

The design rationale, quoting the comment in the source:

229     // Write `msg' into this Socket and clear it. The `msg' should be an
230     // intact request or response. To prevent messages from interleaving
231     // with other messages, the internal file descriptor is written by one
232     // thread at any time. Namely when only one thread tries to write, the
233     // message is written once directly in the calling thread. If the message
234     // is not completely written, a KeepWrite thread is created to continue
235     // the writing. When other threads want to write simultaneously (thread
236     // contention), they append WriteRequests to the KeepWrite thread in a 
237     // wait-free manner rather than writing to the file descriptor directly.
238     // KeepWrite will not quit until all WriteRequests are complete.
239     // Key properties:
240     // - all threads have similar opportunities to write, no one is starved.
241     // - Write once when uncontended(most cases).
242     // - Wait-free when contended.

Socket is fairly complex, so many details were skipped or not considered thoroughly here. Later I plan to think more deeply about this design, and then analyze IOBuf.
