Code version: stable/v1.7.1
In the previous article we analyzed the first step of Envoy startup: the initialization of six modules, the most important of which is server initialization. This article continues the walkthrough, covering the startup flow after the server instance has been initialized, and how new connections are established.
1. Entry point
The entry point is still int main in main.cc:
/**
* Basic Site-Specific main()
*
* This should be used to do setup tasks specific to a particular site's
* deployment such as initializing signal handling. It calls main_common
* after setting up command line options.
*/
int main(int argc, char** argv) {
std::unique_ptr<Envoy::MainCommon> main_common;
// Initialize the server's main context under a try/catch loop and simply return EXIT_FAILURE
// as needed. Whatever code in the initialization path that fails is expected to log an error
// message so the user can diagnose.
try {
main_common = std::make_unique<Envoy::MainCommon>(argc, argv);
} catch (const Envoy::NoServingException& e) {
return EXIT_SUCCESS;
} catch (const Envoy::MalformedArgvException& e) {
return EXIT_FAILURE;
} catch (const Envoy::EnvoyException& e) {
return EXIT_FAILURE;
}
// Run the server listener loop outside try/catch blocks, so that unexpected exceptions
// show up as core dumps for easier diagnosis.
return main_common->run() ? EXIT_SUCCESS : EXIT_FAILURE;
}
Here main_common is an instance of Envoy::MainCommon; in its class definition, run is declared as follows:
class MainCommon {
public:
MainCommon(int argc, const char* const* argv);
bool run() { return base_.run(); }
static std::string hotRestartVersion(uint64_t max_num_stats, uint64_t max_stat_name_len,
bool hot_restart_enabled);
private:
#ifdef ENVOY_HANDLE_SIGNALS
Envoy::SignalAction handle_sigs;
Envoy::TerminateHandler log_on_terminate;
#endif
Envoy::OptionsImpl options_;
MainCommonBase base_;
};
base_ is an instance of MainCommonBase; the concrete implementation of run is found in main_common.cc:
bool MainCommonBase::run() {
switch (options_.mode()) {
case Server::Mode::Serve:
server_->run();
return true;
case Server::Mode::Validate: {
auto local_address = Network::Utility::getLocalAddress(options_.localAddressIpVersion());
return Server::validateConfig(options_, local_address, component_factory_);
}
case Server::Mode::InitOnly:
PERF_DUMP();
return true;
}
NOT_REACHED;
}
對(duì)應(yīng)的炼蛤,這里執(zhí)行了server->run()方法妖爷,代碼如下:
void InstanceImpl::run() {
RunHelper helper(*dispatcher_, clusterManager(), restarter_, access_log_manager_, init_manager_,
[this]() -> void { startWorkers(); });
// Run the main dispatch loop waiting to exit.
ENVOY_LOG(info, "starting main dispatch loop");
auto watchdog = guard_dog_->createWatchDog(Thread::Thread::currentThreadId());
watchdog->startWatchdog(*dispatcher_);
dispatcher_->run(Event::Dispatcher::RunType::Block);
ENVOY_LOG(info, "main dispatch loop exited");
guard_dog_->stopWatching(watchdog);
watchdog.reset();
terminate();
}
This is the core startup logic of run. The figure below outlines Envoy's main flow from startup to connection establishment, which breaks down into three parts: starting the workers, loading the Listeners, and accepting new connections.
2. Starting the workers
RunHelper helper(*dispatcher_, clusterManager(), restarter_, access_log_manager_, init_manager_,
[this]() -> void { startWorkers(); });
In server.cc, the server uses RunHelper to invoke startWorkers(), whose code is:
void InstanceImpl::startWorkers() {
listener_manager_->startWorkers(*guard_dog_);
// At this point we are ready to take traffic and all listening ports are up. Notify our parent
// if applicable that they can stop listening and drain.
restarter_.drainParentListeners();
drain_manager_->startParentShutdownSequence();
}
Look at the first line, listener_manager_->startWorkers(*guard_dog_):
void ListenerManagerImpl::startWorkers(GuardDog& guard_dog) {
ENVOY_LOG(info, "all dependencies initialized. starting workers");
ASSERT(!workers_started_);
workers_started_ = true;
for (const auto& worker : workers_) {
ASSERT(warming_listeners_.empty());
for (const auto& listener : active_listeners_) {
addListenerToWorker(*worker, *listener);
}
worker->start(guard_dog);
}
}
Setting aside the assertions and logging, the core is two nested for loops. The outer loop iterates over the workers_ created during initialization (recall the workers_ initialization covered in http://www.reibang.com/p/204d1631239d ), taking each worker in turn. The inner loop takes every listener in the active state and adds it to the worker via addListenerToWorker(*worker, *listener); finally, each worker is started.
So what does addListenerToWorker do?
void ListenerManagerImpl::addListenerToWorker(Worker& worker, ListenerImpl& listener) {
worker.addListener(listener, [this, &listener](bool success) -> void {
// The add listener completion runs on the worker thread. Post back to the main thread to
// avoid locking.
server_.dispatcher().post([this, success, &listener]() -> void {
// It is theoretically possible for a listener to get added on 1 worker but not the others.
// The below check with onListenerCreateFailure() is there to ensure we execute the
// removal/logging/stats at most once on failure. Note also that that drain/removal can race
// with addition. It's guaranteed that workers process remove after add so this should be
// fine.
if (!success && !listener.onListenerCreateFailure()) {
// TODO(mattklein123): In addition to a critical log and a stat, we should consider adding
// a startup option here to cause the server to exit. I think we
// probably want this at Lyft but I will do it in a follow up.
ENVOY_LOG(critical, "listener '{}' failed to listen on address '{}' on worker",
listener.name(), listener.socket().localAddress()->asString());
stats_.listener_create_failure_.inc();
removeListener(listener.name());
}
if (success) {
stats_.listener_create_success_.inc();
}
});
});
}
The worker's addListener method takes two arguments: the listener itself, and a callback invoked once the add has completed. This is clear from the comment in its header file:
/**
* Add a listener to the worker.
* @param listener supplies the listener to add.
* @param completion supplies the completion to call when the listener has been added (or not) on
* the worker.
*/
virtual void addListener(Network::ListenerConfig& listener,
AddListenerCompletion completion) PURE;
Back in the worker, addListener is implemented as follows. It calls the dispatcher's post method with a callback; that callback captures addListener's own arguments, the listener and the externally supplied completion, and uses them to add the listener to the worker:
void WorkerImpl::addListener(Network::ListenerConfig& listener, AddListenerCompletion completion) {
// All listener additions happen via post. However, we must deal with the case where the listener
// can not be created on the worker. There is a race condition where 2 processes can successfully
// bind to an address, but then fail to listen() with EADDRINUSE. During initial startup, we want
// to surface this.
dispatcher_->post([this, &listener, completion]() -> void {
try {
handler_->addListener(listener);
hooks_.onWorkerListenerAdded();
completion(true);
} catch (const Network::CreateListenerException& e) {
completion(false);
}
});
}
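The post-then-complete pattern above can be sketched with a minimal stand-alone dispatcher. This is a simplified, hypothetical SimpleDispatcher, not Envoy's Event::Dispatcher API: callers enqueue work with post(), and nothing runs until the event loop drains the queue, which is why the completion callback carries a success flag back out.

```cpp
#include <cassert>
#include <functional>
#include <queue>

// Hypothetical single-threaded stand-in for a dispatcher: post() defers a
// callback onto a queue that the event loop drains later, mirroring how
// WorkerImpl::addListener defers handler_->addListener(listener).
class SimpleDispatcher {
public:
  void post(std::function<void()> cb) { queue_.push(std::move(cb)); }

  // Stands in for one turn of the dispatch loop.
  void drain() {
    while (!queue_.empty()) {
      auto cb = std::move(queue_.front());
      queue_.pop();
      cb();
    }
  }

private:
  std::queue<std::function<void()>> queue_;
};

// Mirrors the shape of WorkerImpl::addListener: post the add, then invoke
// the completion with a success flag from inside the posted callback.
inline bool demoAddListener(SimpleDispatcher& dispatcher) {
  bool completion_ok = false;
  auto completion = [&](bool success) { completion_ok = success; };
  dispatcher.post([&]() {
    // In Envoy this is handler_->addListener(listener); a thrown
    // CreateListenerException would instead call completion(false).
    completion(true);
  });
  dispatcher.drain(); // nothing happens until the loop runs the posted task
  return completion_ok;
}
```

In the real code the queue belongs to the worker's own thread and post() is the thread-safe hand-off point, which is why the completion must later post back to the main thread to avoid locking.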
In the body of the dispatcher callback above, handler_->addListener(listener) is the core of adding a listener:
void ConnectionHandlerImpl::addListener(Network::ListenerConfig& config) {
ActiveListenerPtr l(new ActiveListener(*this, config));
listeners_.emplace_back(config.socket().localAddress(), std::move(l));
}
handler_->addListener(listener) is implemented by ConnectionHandlerImpl as shown above. The config argument is the active listener from the inner loop described earlier; it is passed to the ActiveListener constructor to build an ActiveListener, held by a pointer. The ActiveListener constructor looks like this:
ConnectionHandlerImpl::ActiveListener::ActiveListener(ConnectionHandlerImpl& parent,
Network::ListenerConfig& config)
: ActiveListener(
parent,
parent.dispatcher_.createListener(config.socket(), *this, config.bindToPort(),
config.handOffRestoredDestinationConnections()),
config) {}
The createListener call is implemented in DispatcherImpl (to stay oriented: we are analyzing the addListener flow, and a listener must be created before it can be added, so we are in connection_handler_impl.cc, where the ActiveListener constructor asks the dispatcher to create the listener via createListener). The implementation is a single line: return Network::ListenerPtr{new Network::ListenerImpl(*this, socket, cb, bind_to_port, hand_off_restored_destination_connections)}; — it constructs a ListenerImpl and wraps it in a Network::ListenerPtr. The created Listener is returned into ConnectionHandlerImpl::addListener. Finally, listeners_ is a list, std::list<std::pair<Network::Address::InstanceConstSharedPtr, ActiveListenerPtr>> listeners_;, defined in connection_handler_impl.h.
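The listeners_ bookkeeping can be sketched as below. This is a simplified stand-in: the address is a plain string instead of Network::Address::InstanceConstSharedPtr, ActiveListener is a stub, and findByAddress is a hypothetical helper illustrating the kind of address-keyed scan the connection handler performs when routing redirected connections to the right listener.

```cpp
#include <cassert>
#include <list>
#include <memory>
#include <string>
#include <utility>

// Stub standing in for ConnectionHandlerImpl::ActiveListener.
struct ActiveListener {
  explicit ActiveListener(std::string name) : name_(std::move(name)) {}
  std::string name_;
};
using ActiveListenerPtr = std::unique_ptr<ActiveListener>;

class ConnectionHandlerSketch {
public:
  // Mirrors listeners_.emplace_back(config.socket().localAddress(), std::move(l)).
  void addListener(const std::string& local_address, ActiveListenerPtr l) {
    listeners_.emplace_back(local_address, std::move(l));
  }

  // Hypothetical lookup: linear scan keyed by the bound local address.
  ActiveListener* findByAddress(const std::string& address) {
    for (auto& entry : listeners_) {
      if (entry.first == address) {
        return entry.second.get();
      }
    }
    return nullptr;
  }

private:
  // Same shape as the member declared in connection_handler_impl.h.
  std::list<std::pair<std::string, ActiveListenerPtr>> listeners_;
};
```

A std::list (rather than a map) is enough here because the number of listeners is small and stable, and list nodes keep the (address, listener) pairs valid across insertions and removals.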
3. Loading Listeners and accepting connections
The previous section already covered much of the Listener loading flow (addListener); this section focuses on connection establishment after loading. Recall the Envoy startup-and-accept diagram from earlier.
In sections 1 and 2 we walked the flow from server_->run() down to createListener. Now let's dig into that final line from section 2 and see how the Listener object is implemented, since this is where listening for connections and the accept callbacks actually happen.
As noted, the implementation is one line: return Network::ListenerPtr{new Network::ListenerImpl(*this, socket, cb, bind_to_port, hand_off_restored_destination_connections)}; — a ListenerImpl is constructed and held by a Network::ListenerPtr. Here is the ListenerImpl constructor:
ListenerImpl::ListenerImpl(Event::DispatcherImpl& dispatcher, Socket& socket, ListenerCallbacks& cb,
bool bind_to_port, bool hand_off_restored_destination_connections)
: local_address_(nullptr), cb_(cb),
hand_off_restored_destination_connections_(hand_off_restored_destination_connections),
listener_(nullptr) {
const auto ip = socket.localAddress()->ip();
// Only use the listen socket's local address for new connections if it is not the all hosts
// address (e.g., 0.0.0.0 for IPv4).
if (!(ip && ip->isAnyAddress())) {
local_address_ = socket.localAddress();
}
if (bind_to_port) {
listener_.reset(
evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.fd()));
if (!listener_) {
throw CreateListenerException(
fmt::format("cannot listen on socket: {}", socket.localAddress()->asString()));
}
if (!Network::Socket::applyOptions(socket.options(), socket, Socket::SocketState::Listening)) {
throw CreateListenerException(fmt::format(
"cannot set post-listen socket option on socket: {}", socket.localAddress()->asString()));
}
evconnlistener_set_error_cb(listener_.get(), errorCallback);
}
}
In listener_.reset(evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.fd()));, the evconnlistener_new call registers for new connections on the listening socket's fd, with listenCallback as the callback. listenCallback is shown below; it ends by invoking listener->cb_.onAccept():
void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr* remote_addr,
int remote_addr_len, void* arg) {
ListenerImpl* listener = static_cast<ListenerImpl*>(arg);
// Get the local address from the new socket if the listener is listening on IP ANY
// (e.g., 0.0.0.0 for IPv4) (local_address_ is nullptr in this case).
const Address::InstanceConstSharedPtr& local_address =
listener->local_address_ ? listener->local_address_ : listener->getLocalAddress(fd);
// The accept() call that filled in remote_addr doesn't fill in more than the sa_family field
// for Unix domain sockets; apparently there isn't a mechanism in the kernel to get the
// sockaddr_un associated with the client socket when starting from the server socket.
// We work around this by using our own name for the socket in this case.
// Pass the 'v6only' parameter as true if the local_address is an IPv6 address. This has no effect
// if the socket is a v4 socket, but for v6 sockets this will create an IPv4 remote address if an
// IPv4 local_address was created from an IPv6 mapped IPv4 address.
const Address::InstanceConstSharedPtr& remote_address =
(remote_addr->sa_family == AF_UNIX)
? Address::peerAddressFromFd(fd)
: Address::addressFromSockAddr(*reinterpret_cast<const sockaddr_storage*>(remote_addr),
remote_addr_len,
local_address->ip()->version() == Address::IpVersion::v6);
listener->cb_.onAccept(std::make_unique<AcceptedSocketImpl>(fd, local_address, remote_address),
listener->hand_off_restored_destination_connections_);
}
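The address recovery that listenCallback performs for an ANY-address listener can be demonstrated with plain POSIX sockets. This is a minimal IPv4-only sketch, not Envoy code: when the listener is bound to 0.0.0.0 (local_address_ is nullptr), the local address of the accepted socket must be asked of the kernel via getsockname(), which is what getLocalAddress(fd) does underneath.

```cpp
#include <arpa/inet.h>
#include <cassert>
#include <netinet/in.h>
#include <string>
#include <sys/socket.h>
#include <unistd.h>

// Recover the accepted socket's local "ip:port" with getsockname(),
// as the listen callback must do when the listener is bound to ANY.
inline std::string localAddressOf(int fd) {
  sockaddr_in addr{};
  socklen_t len = sizeof(addr);
  if (getsockname(fd, reinterpret_cast<sockaddr*>(&addr), &len) != 0) {
    return "";
  }
  char buf[INET_ADDRSTRLEN];
  inet_ntop(AF_INET, &addr.sin_addr, buf, sizeof(buf));
  return std::string(buf) + ":" + std::to_string(ntohs(addr.sin_port));
}

// End-to-end demo: listen on 127.0.0.1 with a kernel-chosen port, connect
// to ourselves, accept, and recover the accepted fd's local address.
inline std::string demoAcceptLocalAddress() {
  int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
  sockaddr_in bind_addr{};
  bind_addr.sin_family = AF_INET;
  bind_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  bind_addr.sin_port = 0; // let the kernel pick a free port
  bind(listen_fd, reinterpret_cast<sockaddr*>(&bind_addr), sizeof(bind_addr));
  listen(listen_fd, 1);
  // Discover the chosen port, then connect to it.
  socklen_t len = sizeof(bind_addr);
  getsockname(listen_fd, reinterpret_cast<sockaddr*>(&bind_addr), &len);
  int client_fd = socket(AF_INET, SOCK_STREAM, 0);
  connect(client_fd, reinterpret_cast<sockaddr*>(&bind_addr), sizeof(bind_addr));
  int accepted_fd = accept(listen_fd, nullptr, nullptr);
  std::string local = localAddressOf(accepted_fd);
  close(accepted_fd);
  close(client_fd);
  close(listen_fd);
  return local;
}
```

The same kernel query (getpeername instead of getsockname) is behind the Unix-domain workaround in the comment above, where accept() does not fill in a usable sockaddr_un for the peer.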
cb_'s onAccept is ConnectionHandlerImpl::ActiveListener::onAccept, implemented as follows:
void ConnectionHandlerImpl::ActiveListener::onAccept(
Network::ConnectionSocketPtr&& socket, bool hand_off_restored_destination_connections) {
Network::Address::InstanceConstSharedPtr local_address = socket->localAddress();
auto active_socket = std::make_unique<ActiveSocket>(*this, std::move(socket),
hand_off_restored_destination_connections);
// Create and run the filters
config_.filterChainFactory().createListenerFilterChain(*active_socket);
active_socket->continueFilterChain(true);
// Move active_socket to the sockets_ list if filter iteration needs to continue later.
// Otherwise we let active_socket be destructed when it goes out of scope.
if (active_socket->iter_ != active_socket->accept_filters_.end()) {
active_socket->moveIntoListBack(std::move(active_socket), sockets_);
}
}
In the code above, config_.filterChainFactory().createListenerFilterChain(*active_socket); creates the chain of listener filters, which is then run via continueFilterChain().
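The continueFilterChain() iteration can be sketched as below. These are hypothetical types, not Envoy's exact API: each filter either lets iteration continue or stops it (for example, an inspector waiting for more bytes on the socket), the socket is then parked, and a later continueFilterChain(true) re-runs the parked filter, which can now return Continue.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical listener-filter result, mirroring continue/stop semantics.
enum class FilterStatus { Continue, StopIteration };
using ListenerFilter = std::function<FilterStatus()>;

class ActiveSocketSketch {
public:
  void addFilter(ListenerFilter f) { accept_filters_.push_back(std::move(f)); }

  // Returns true once every filter has run, i.e. the point where Envoy
  // would proceed to listener_.newConnection(std::move(socket_)).
  bool continueFilterChain(bool success) {
    if (!success) {
      return false; // a filter reported failure; the socket is dropped
    }
    while (iter_ < accept_filters_.size()) {
      if (accept_filters_[iter_]() == FilterStatus::StopIteration) {
        // Parked: in Envoy the socket moves into sockets_ and waits for
        // the filter to call continueFilterChain(true) again.
        return false;
      }
      ++iter_;
    }
    return true;
  }

private:
  std::vector<ListenerFilter> accept_filters_;
  size_t iter_ = 0; // stands in for the iter_ member checked in onAccept
};
```

This also explains the tail of onAccept above: the socket is moved into sockets_ only when iter_ has not reached the end, i.e. when some filter stopped iteration and will resume it later.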
Before continuing to how Envoy accepts connections, let's briefly recap.
As mentioned above, in listener_.reset(evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.fd()));, evconnlistener_new registers for new connections on the socket's fd, dispatched through the listenCallback callback.
也就是說定血,當(dāng)envoy以sidecar形式運(yùn)行在pod中去代理client的流量時(shí)赔癌,通過監(jiān)聽對(duì)應(yīng)端口,在client通過該端口發(fā)起請(qǐng)求時(shí)澜沟,觸發(fā)envoy的對(duì)應(yīng)回調(diào)灾票,執(zhí)行以下函數(shù)調(diào)用鏈listenCallback->cb_.onAccept->createListenerFilterChain+continueFilterChain,
- createListenerFilterChain creates the FilterChain, the extension point for listener filters
- continueFilterChain runs the filters
Accepting a connection has two main parts:
- createServerConnection, reached via listener_.newConnection(std::move(socket_)) inside continueFilterChain
createServerConnection establishes the connection between the requesting client and the Envoy server; this is also where buffer high/low watermarks implement flow control. createServerConnection returns a pointer to a connection object whose constructor lives in connection_impl.cc; the constructor creates a FileEvent, as follows:
// We never ask for both early close and read at the same time. If we are reading, we want to
// consume all available data.
file_event_ = dispatcher_.createFileEvent(
fd(), [this](uint32_t events) -> void { onFileEvent(events); }, Event::FileTriggerType::Edge,
Event::FileReadyType::Read | Event::FileReadyType::Write);
Inside createFileEvent, a new FileEventImpl is constructed; assignEvents() assigns the events, and event_add() registers them with libevent.
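The high/low watermark flow control mentioned above can be sketched as follows. This is a hypothetical simplified buffer, not Envoy's Buffer::WatermarkBuffer API: crossing the high watermark fires a callback that would pause reads (e.g. disable the Read file event just created), and draining back below the low watermark fires the resume callback.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Hypothetical watermark buffer: tracks byte count and fires edge-triggered
// callbacks when crossing the high watermark upward or the low watermark
// downward, mirroring the flow-control idea described above.
class WatermarkBufferSketch {
public:
  WatermarkBufferSketch(size_t low, size_t high,
                        std::function<void()> above_high,
                        std::function<void()> below_low)
      : low_(low), high_(high),
        above_high_(std::move(above_high)), below_low_(std::move(below_low)) {}

  void add(const std::string& data) {
    size_ += data.size();
    if (!above_high_watermark_ && size_ > high_) {
      above_high_watermark_ = true;
      above_high_(); // e.g. stop reading from the socket
    }
  }

  void drain(size_t n) {
    size_ -= (n < size_ ? n : size_);
    if (above_high_watermark_ && size_ <= low_) {
      above_high_watermark_ = false;
      below_low_(); // e.g. resume reading from the socket
    }
  }

  size_t size() const { return size_; }

private:
  size_t low_;
  size_t high_;
  size_t size_ = 0;
  bool above_high_watermark_ = false;
  std::function<void()> above_high_;
  std::function<void()> below_low_;
};
```

Keeping the low watermark below the high one gives hysteresis: the connection does not flap between paused and resumed on every byte drained.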
- createNetworkFilterChain, also reached via listener_.newConnection(std::move(socket_)) inside continueFilterChain
After createServerConnection, createNetworkFilterChain builds the network filter chain:
const bool empty_filter_chain = !config_.filterChainFactory().createNetworkFilterChain(
*new_connection, filter_chain->networkFilterFactories());
The FilterFactory callbacks run buildFilterChain(), which ends in filter_manager_.initializeReadFilters(), initializing the read filters. If the network filter chain turns out to be empty (empty_filter_chain is true), the connection is closed:
if (empty_filter_chain) {
ENVOY_CONN_LOG_TO_LOGGER(parent_.logger_, debug, "closing connection: no filters",
*new_connection);
new_connection->close(Network::ConnectionCloseType::NoFlush);
return;
}
- After these two steps, a count is bumped: onNewConnection increments the ActiveConnection count by 1.
4. Recap
OK爽哎,至此,我們已經(jīng)走完了本文剛開始器一,Envoy啟動(dòng)和對(duì)client發(fā)起請(qǐng)求時(shí)新連接建立(監(jiān)聽實(shí)現(xiàn))的整個(gè)梳理如下圖课锌。現(xiàn)在,來回顧之前最早main函數(shù)為出發(fā)點(diǎn)中祈秕,server->run()做了什么渺贤。還記得run的實(shí)現(xiàn)么雏胃?
void InstanceImpl::run() {
RunHelper helper(*dispatcher_, clusterManager(), restarter_, access_log_manager_, init_manager_,
[this]() -> void { startWorkers(); });
// Run the main dispatch loop waiting to exit.
ENVOY_LOG(info, "starting main dispatch loop");
auto watchdog = guard_dog_->createWatchDog(Thread::Thread::currentThreadId());
watchdog->startWatchdog(*dispatcher_);
dispatcher_->run(Event::Dispatcher::RunType::Block);
ENVOY_LOG(info, "main dispatch loop exited");
guard_dog_->stopWatching(watchdog);
watchdog.reset();
terminate();
}
- RunHelper starts the workers, adds the listeners, and wires up the callbacks and filter chains
- createWatchDog starts the watchdog, guarding against deadlock
- dispatcher_->run(Event::Dispatcher::RunType::Block); runs the dispatcher, which calls libevent's event_base_loop to wait for events and enter the handling flow when new events arrive. We will analyze that part in the next article.