首先服務提供方需要定義接口,
EchoService.java
package com.xx.pigeon.demo;
public interface EchoService {
public String echo(String name);
}
同時叫确,服務提供方同時需要實現(xiàn)該接口,然后服務提供方就可以注冊服務芍锦,傳統(tǒng)的spring注冊方法如下:
<bean class="com.dianping.dpsf.spring.ServiceRegistry"
init-method="init">
<property name="services">
<map>
<entry key="http://service.xx.com/demoService/echoService_1.0.0"
value-ref="echoServiceImpl" />
</map>
</property>
</bean>
OK竹勉,然后我們啟動spring上下文的時候,spring會幫助我們初始化這個bean然后調用init方法娄琉,這個方法調用了
ServiceFactory.addServices(providerConfigList);
ServiceFactory這個類是個工廠類次乓,它會幫我們初始化服務。剩下就是RPC框架幫我們做的一些事情了孽水。其核心的方法是:
ServicePublisher.addService(providerConfig);
ServerConfig serverConfig = ProviderBootStrap.startup(providerConfig);
ServicePublisher.publishService(providerConfig, false);
其中ServicePublisher.addService(providerConfig) 這個方法主要處理了serviceName,methodName,parameters票腰,同時對服務進行緩存。
ServerConfig serverConfig = ProviderBootStrap.startup(providerConfig)則是啟動了“容器”女气。pigeon目前支持兩層杏慰,http和tcp。http方面炼鞠,pigeon會啟動內置的jetty缘滥,提供了一些服務控制的方法,比如publish簇搅、unpublish完域、online、offline等瘩将,同時支持用http來進行RPC吟税。tcp方面,pigeon底層依賴netty進行姿现,所謂啟動容器肠仪,這里是啟動對特定端口的監(jiān)聽。另外备典,這個方法還會對請求處理器進行注冊异旧,這個后面會聊到。
ServicePublisher.publishService(providerConfig, false);該方法為核心方法提佣,主要?用于服務發(fā)布吮蛹。
if (existingService) {
boolean autoPublishEnable = ConfigManagerLoader.getConfigManager().getBooleanValue(
Constants.KEY_AUTOPUBLISH_ENABLE, true);
if (autoPublishEnable || forcePublish) {
List<Server> servers = ProviderBootStrap.getServers(providerConfig);
int registerCount = 0;
for (Server server : servers) {
publishService(url, server.getRegistryUrl(url), server.getPort(), providerConfig.getServerConfig()
.getGroup(), providerConfig.isSupported());//注冊服務到zk
registerCount++;
}
if (registerCount > 0) {
boolean isHeartbeatEnable = configManager.getBooleanValue(Constants.KEY_HEARTBEAT_ENABLE,
DEFAULT_HEARTBEAT_ENABLE);
if (isHeartbeatEnable) {
HeartBeatListener.registerHeartBeat(providerConfig);//注冊心跳上報
}
boolean isNotify = configManager
.getBooleanValue(Constants.KEY_NOTIFY_ENABLE, DEFAULT_NOTIFY_ENABLE);
if (isNotify && serviceChangeListener != null) {
serviceChangeListener.notifyServicePublished(providerConfig);//向pigeon的管理中心上報發(fā)布情況
}
boolean autoRegisterEnable = ConfigManagerLoader.getConfigManager().getBooleanValue(
Constants.KEY_AUTOREGISTER_ENABLE, true);
if (autoRegisterEnable) {
ServiceOnlineTask.start();//上線服務
} else {
logger.info("auto register is disabled");
}
providerConfig.setPublished(true);
}
} else {
logger.info("auto publish is disabled");
}
}
首先會調用方法publishService(url, server.getRegistryUrl(url), server.getPort(), providerConfig.getServerConfig()
.getGroup(), providerConfig.isSupported())荤崇,該方法會先計算weight(權重),一般會初始化為0潮针,然后再進行服務注冊术荤,
RegistryManager.getInstance().registerService(registryUrl, group, serverAddress, weight);//注冊服務地址和服務名
RegistryManager.getInstance().registerSupportNewProtocol(serverAddress, registryUrl, support);//注冊服務協(xié)議
if (weight >= 0) {
if (!serverWeightCache.containsKey(serverAddress)) {
RegistryManager.getInstance().setServerApp(serverAddress, configManager.getAppName());//注冊應用名,一個實例注冊一次
RegistryManager.getInstance().setServerVersion(serverAddress, VersionUtils.VERSION);//注冊服務版本每篷,一個實例注冊一次
}
serverWeightCache.put(serverAddress, weight);
}
這幾個registry其實大同小異瓣戚,都是將特定的值寫入zk,以注冊服務為例焦读,
void registerPersistentNode(String serviceName, String group, String serviceAddress, int weight)
throws RegistryException {
String weightPath = Utils.getWeightPath(serviceAddress);
String servicePath = Utils.getServicePath(serviceName, group);
try {
if (client.exists(servicePath, false)) {
Stat stat = new Stat();
String addressValue = client.get(servicePath, stat);
String[] addressArray = addressValue.split(",");
List<String> addressList = new ArrayList<String>();
for (String addr : addressArray) {
addr = addr.trim();
if (addr.length() > 0 && !addressList.contains(addr)) {
addressList.add(addr.trim());
}
}
if (!addressList.contains(serviceAddress)) {
addressList.add(serviceAddress);
Collections.sort(addressList);
client.set(servicePath, StringUtils.join(addressList.iterator(), ","), stat.getVersion());
}
} else {
client.create(servicePath, serviceAddress);
}
if (weight >= 0) {
client.set(weightPath, "" + weight);
}
if (logger.isInfoEnabled()) {
logger.info("registered service to persistent node: " + servicePath);
}
} catch (Throwable e) {
if(e instanceof BadVersionException || e instanceof NodeExistsException) {
try {
Thread.sleep(500);
} catch (InterruptedException ie) {
//ignore
}
registerPersistentNode(serviceName, group, serviceAddress, weight);
} else {
logger.error("failed to register service to " + servicePath, e);
throw new RegistryException(e);
}
}
}
該方法就是將服務地址和服務名寫入zk子库。其中這里有一個Group的概念,其實就是將同一個服務不同的實例可以分到不同的swimlane里面矗晃。
當然RegistryManager可以有不同的實現(xiàn)仑嗅,你也可以選擇將服務信息寫到別的地方,比如db喧兄。默認實現(xiàn)是寫入zk无畔,這也是大型分布式系統(tǒng)比較常用的方法。
然后是注冊心跳上報吠冤,這個后面會具體談到浑彰。接著,向pigeon的管理中心上報發(fā)布情況拯辙,這個其實在整個服務中郭变,不是必須的。
最后就是服務上線涯保,從前面的代碼我們可以知道服務的權重一般會被初始化為0诉濒,而客戶端在調用服務的時候會利用權重來負載均衡,也就是說權重是0的服務基本不會被調用到夕春,那么就可以理解為通過修改權重來進行上線或者下線服務未荒。
public void run() {
logger.info("Service online task start");
try {
Thread.sleep(delay);
if (!isStop) {
ServiceFactory.online();
}
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
}
啟動是利用另外一個線程進行的,可以設置一個時延及志。
public static void online() throws RegistryException {
logger.info("online");
ServicePublisher.setServerWeight(Constants.WEIGHT_DEFAULT);
/*ServiceProviderFactory.notifyServiceOnline();*/
}
修改服務權重片排,用來上線服務。
至此速侈,就完成了全部的服務注冊和發(fā)布的過程率寡。