目的
梳理 fdb 有哪些 ProcessClass,是怎么啟動這些 ProcessClass 的,以及主要的 ProcessClass 的作用。
思路
在 fdb 的實現中,在 WorkerInterface.h 文件中有一個 startRole 的函數,在 fdb 啟動某個 ProcessClass 的時候都會調用該函數來進行注冊。所以我們可以通過該函數來幫助我們梳理 fdb 的實現。
// Declaration of startRole, the call a worker makes when it begins serving a role
// (for example: startRole( recruited.id(), interf.id(), "MasterServer" )).
//   roleId      - id of the role instance being started
//   workerId    - id of the worker interface that hosts the role
//   as          - role name string, e.g. "MasterServer"
//   details     - optional string key/value pairs; defaults to an empty map
//   origination - defaults to "Recruited"
//                 NOTE(review): presumably records how the role came to be started — confirm against the definition
void startRole(UID roleId, UID workerId, std::string as, std::map<std::string, std::string> details = std::map<std::string, std::string>(), std::string origination = "Recruited");
角色列表
通過直接搜索 startRole,可以得到完整的 ProcessClass 列表:
- LogRouter
- Resolver
- MasterProxyServer
- StorageServer
- SharedTLog
- MasterServer
- Worker
- TLog
- Tester
- ClusterController
在 ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface, LocalityData localities, Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo, ProcessClass initialClass, std::string folder, int64_t memoryLimit, std::string metricsConnFile, std::string metricsPrefix )
函數中,先根據 getDiskStores
來啟動 StorageServer/TLog,并注冊一個 Worker 的角色。之后通過事件循環監聽 interf 來決定啟動其它角色。例如其中的 MasterServer:
// Excerpt of the workerServer event loop: the branch that handles a request to host the master role.
loop choose {
// Fires when the next RecruitMasterRequest arrives on interf.master.
when( RecruitMasterRequest req = waitNext(interf.master.getFuture()) ) {
// Build the interface for the new role instance and copy `locality` into it.
MasterInterface recruited;
recruited.locality = locality;
recruited.initEndpoints();
// Register the new role instance under this worker's id with the name "MasterServer".
startRole( recruited.id(), interf.id(), "MasterServer" );
// NOTE(review): DUMPTOKEN is defined elsewhere; presumably it logs each endpoint's token for debugging — confirm.
DUMPTOKEN( recruited.waitFailure );
DUMPTOKEN( recruited.getRateInfo );
DUMPTOKEN( recruited.tlogRejoin );
DUMPTOKEN( recruited.changeCoordinators );
DUMPTOKEN( recruited.getCommitVersion );
//printf("Recruited as masterServer\n");
// Start the master actor with the coordinators taken from connFile and the lifetime carried by the request.
Future<Void> masterProcess = masterServer( recruited, dbInfo, ServerCoordinators( connFile ), req.lifetime );
// Track the actor's future in errorForwarders.
// NOTE(review): forwardError and zombie are not shown here; presumably they route the actor's failure into `errors`
// and keep `recruited` alive until the actor finishes — confirm against their definitions.
errorForwarders.add( zombie(recruited, forwardError( errors, "MasterServer", recruited.id(), masterProcess )) );
// Reply to the requester with the newly created interface.
req.reply.send(recruited);
}
}
流程上:
- ClusterConnectionFile 里面有 coordinator 的地址
- fdbd 方法嘗試從 coordinator 中選擇一個選舉為 clusterController,同時
- 會調用 workerServer 方法,這兒 workerServer 會和上一步的 clusterController 方法共用一個 ClusterControllerFullInterface 變量(之后 clusterController 會通過其來發送控制消息?)。
- clusterController 會啟動 Master,見方法 clusterWatchDatabase
角色個數
fdb 期望自動決策“啟動哪些角色以及啟動多少”,默認的配置在 fdbclient/Knobs.cpp 下,如下:
// Configuration
// Default values registered for the DEFAULT_AUTO_* knobs: 3 proxies, 1 resolver, 3 logs.
// NOTE(review): presumably these apply when the matching role count is left for the cluster to choose —
// confirm against the code that reads these knobs.
init( DEFAULT_AUTO_PROXIES, 3 );
init( DEFAULT_AUTO_RESOLVERS, 1 );
init( DEFAULT_AUTO_LOGS, 3 );
ClusterControllerData 類中的一些方法,例如 getStorageWorker,根據請求的條件分配出合適的 worker。在分配機制中,有一個 Fitness 的概念,用來選擇最合適的 worker 負責相應的角色。findWorkersForConfiguration 函數負責根據配置規劃整個集群。
// Picks up to `amount` workers located in datacenter `dcId` to serve `role`.
//
// A worker from id_worker is a candidate when it passes workerAvailable(), is not
// excluded by `conf`, and its locality dcId equals `dcId`. When `minWorker` is
// present, a candidate must also be a different worker from minWorker and either
// compare strictly less than minWorker's fitness, or have equal fitness with an
// id_used count no greater than minWorker's `used`.
//
// Candidates are grouped by (fitness, current id_used count). Groups are visited in
// ascending key order and each group is shuffled before use, so the choice inside a
// group is random. Every selected worker has its id_used counter incremented.
//
// Returns an empty vector when amount <= 0, and fewer than `amount` entries when
// there are not enough candidates.
vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForRoleInDatacenter(Optional<Standalone<StringRef>> const& dcId, ProcessClass::ClusterRole role, int amount, DatabaseConfiguration const& conf, std::map< Optional<Standalone<StringRef>>, int>& id_used, Optional<WorkerFitnessInfo> minWorker = Optional<WorkerFitnessInfo>(), bool checkStable = false ) {
	vector<std::pair<WorkerInterface, ProcessClass>> results;
	if (amount <= 0)
		return results;

	std::map<std::pair<ProcessClass::Fitness,int>, vector<std::pair<WorkerInterface, ProcessClass>>> fitness_workers;

	for( auto& entry : id_worker ) {
		auto& info = entry.second;
		auto fitness = info.processClass.machineClassFitness( role );

		// Basic eligibility: available, not excluded, and in the requested datacenter.
		const bool eligible = workerAvailable(info, checkStable)
			&& !conf.isExcludedServer(info.interf.address())
			&& info.interf.locality.dcId() == dcId;
		if( !eligible )
			continue;

		// Optional bar set by minWorker. The id_used lookup stays behind the same
		// short-circuit as before, so it only default-inserts when it is actually reached.
		if( minWorker.present() ) {
			auto const& threshold = minWorker.get();
			const bool clearsThreshold = info.interf.id() != threshold.worker.first.id()
				&& ( fitness < threshold.fitness
					|| ( fitness == threshold.fitness && id_used[entry.first] <= threshold.used ) );
			if( !clearsThreshold )
				continue;
		}

		fitness_workers[ std::make_pair(fitness, id_used[entry.first]) ].push_back( std::make_pair(info.interf, info.processClass) );
	}

	for( auto& bucket : fitness_workers ) {
		auto& candidates = bucket.second;
		g_random->randomShuffle(candidates);
		for( auto& picked : candidates ) {
			results.push_back(picked);
			id_used[picked.first.locality.processId()]++;
			if( results.size() == amount )
				return results;
		}
	}

	return results;
}
// Maps (requested cluster role, this process's configured _class) to a Fitness value.
//
// The outer switch selects the role; the inner switch looks up this process's class.
// Class labels that yield the same Fitness for a role are stacked on one return.
// Any role not listed returns NeverAssign.
ProcessClass::Fitness ProcessClass::machineClassFitness( ClusterRole role ) const {
	switch( role ) {
	case ProcessClass::Storage:
		switch( _class ) {
		case ProcessClass::StorageClass:
			return ProcessClass::BestFit;
		case ProcessClass::UnsetClass:
			return ProcessClass::UnsetFit;
		case ProcessClass::TransactionClass:
		case ProcessClass::LogClass:
			return ProcessClass::WorstFit;
		case ProcessClass::TesterClass:
		default:
			return ProcessClass::NeverAssign;
		}

	case ProcessClass::TLog:
		switch( _class ) {
		case ProcessClass::LogClass:
			return ProcessClass::BestFit;
		case ProcessClass::TransactionClass:
			return ProcessClass::GoodFit;
		case ProcessClass::UnsetClass:
			return ProcessClass::UnsetFit;
		case ProcessClass::StorageClass:
			return ProcessClass::WorstFit;
		case ProcessClass::TesterClass:
		default:
			return ProcessClass::NeverAssign;
		}

	case ProcessClass::Proxy:
		switch( _class ) {
		case ProcessClass::ProxyClass:
			return ProcessClass::BestFit;
		case ProcessClass::StatelessClass:
			return ProcessClass::GoodFit;
		case ProcessClass::ResolutionClass:
		case ProcessClass::TransactionClass:
			return ProcessClass::BestOtherFit;
		case ProcessClass::UnsetClass:
			return ProcessClass::UnsetFit;
		case ProcessClass::TesterClass:
			return ProcessClass::NeverAssign;
		default:
			return ProcessClass::WorstFit;
		}

	case ProcessClass::Master:
		switch( _class ) {
		case ProcessClass::MasterClass:
			return ProcessClass::BestFit;
		case ProcessClass::StatelessClass:
			return ProcessClass::GoodFit;
		case ProcessClass::ResolutionClass:
		case ProcessClass::TransactionClass:
			return ProcessClass::BestOtherFit;
		case ProcessClass::UnsetClass:
			return ProcessClass::UnsetFit;
		case ProcessClass::TesterClass:
			return ProcessClass::NeverAssign;
		default:
			return ProcessClass::WorstFit;
		}

	case ProcessClass::Resolver:
		switch( _class ) {
		case ProcessClass::ResolutionClass:
			return ProcessClass::BestFit;
		case ProcessClass::StatelessClass:
			return ProcessClass::GoodFit;
		case ProcessClass::TransactionClass:
			return ProcessClass::BestOtherFit;
		case ProcessClass::UnsetClass:
			return ProcessClass::UnsetFit;
		case ProcessClass::TesterClass:
			return ProcessClass::NeverAssign;
		default:
			return ProcessClass::WorstFit;
		}

	case ProcessClass::LogRouter:
		switch( _class ) {
		case ProcessClass::LogRouterClass:
			return ProcessClass::BestFit;
		case ProcessClass::StatelessClass:
			return ProcessClass::GoodFit;
		case ProcessClass::ResolutionClass:
		case ProcessClass::TransactionClass:
			return ProcessClass::BestOtherFit;
		case ProcessClass::UnsetClass:
			return ProcessClass::UnsetFit;
		case ProcessClass::TesterClass:
			return ProcessClass::NeverAssign;
		default:
			return ProcessClass::WorstFit;
		}

	case ProcessClass::ClusterController:
		switch( _class ) {
		case ProcessClass::ClusterControllerClass:
			return ProcessClass::BestFit;
		case ProcessClass::StatelessClass:
			return ProcessClass::GoodFit;
		case ProcessClass::MasterClass:
		case ProcessClass::ResolutionClass:
		case ProcessClass::ProxyClass:
			return ProcessClass::BestOtherFit;
		case ProcessClass::UnsetClass:
			return ProcessClass::UnsetFit;
		case ProcessClass::TesterClass:
			return ProcessClass::NeverAssign;
		default:
			return ProcessClass::WorstFit;
		}

	default:
		return ProcessClass::NeverAssign;
	}
}
// Chooses a worker to host a new storage server for `req`.
//
// A worker is eligible when it passes workerAvailable(), its zoneId is not in
// req.excludeMachines, its dcId is in req.includeDCs (or includeDCs is empty), and
// its address is not excluded by req.excludeAddresses.
//
// Pass 1: return the first eligible worker whose Storage fitness is <= UnsetFit.
// Pass 2 (only when req.criticalRecruitment is set): return the eligible worker with
//         the smallest Storage fitness that is strictly less than NeverAssign; on
//         ties the first one encountered wins.
//
// Throws no_more_servers() when neither pass finds a worker.
std::pair<WorkerInterface, ProcessClass> getStorageWorker( RecruitStorageRequest const& req ) {
	std::set<Optional<Standalone<StringRef>>> excludedMachines( req.excludeMachines.begin(), req.excludeMachines.end() );
	std::set<Optional<Standalone<StringRef>>> includeDCs( req.includeDCs.begin(), req.includeDCs.end() );
	std::set<AddressExclusion> excludedAddresses( req.excludeAddresses.begin(), req.excludeAddresses.end() );

	for( auto& entry : id_worker ) {
		auto& info = entry.second;
		if( !workerAvailable( info, false ) )
			continue;
		if( excludedMachines.count(info.interf.locality.zoneId()) )
			continue;
		if( !includeDCs.empty() && !includeDCs.count(info.interf.locality.dcId()) )
			continue;
		if( addressExcluded(excludedAddresses, info.interf.address()) )
			continue;
		if( info.processClass.machineClassFitness( ProcessClass::Storage ) <= ProcessClass::UnsetFit )
			return std::make_pair(info.interf, info.processClass);
	}

	if( req.criticalRecruitment ) {
		ProcessClass::Fitness bestFit = ProcessClass::NeverAssign;
		Optional<std::pair<WorkerInterface, ProcessClass>> bestInfo;

		for( auto& entry : id_worker ) {
			auto& info = entry.second;
			ProcessClass::Fitness fit = info.processClass.machineClassFitness( ProcessClass::Storage );
			if( !workerAvailable( info, false ) )
				continue;
			if( excludedMachines.count(info.interf.locality.zoneId()) )
				continue;
			if( !includeDCs.empty() && !includeDCs.count(info.interf.locality.dcId()) )
				continue;
			if( addressExcluded(excludedAddresses, info.interf.address()) )
				continue;
			// Strict comparison: only a better fitness replaces the current best.
			if( fit < bestFit ) {
				bestFit = fit;
				bestInfo = std::make_pair(info.interf, info.processClass);
			}
		}

		if( bestInfo.present() )
			return bestInfo.get();
	}

	throw no_more_servers();
}
WorkerInterface
WorkerInterface 包含ClientWorkerInterface