之前的文章介紹了一個(gè)RPC請(qǐng)求有哪些經(jīng)歷富纸。這篇文章將介紹一直遺漏的provider發(fā)布服務(wù)的相關(guān)細(xì)節(jié)葵袭。
以前都是以JNettyTcpAcceptor
作為程序的入口來(lái)一步一步抽絲剝繭探索細(xì)節(jié)◆岢澹現(xiàn)在就得尋根問(wèn)底從DefaultServer
來(lái)繼續(xù)探索簸州。
程序包中有個(gè)example缠借,給了最基礎(chǔ)的用法拆火,一看就知道的那種跳夭。
public static void main(String[] args) {
JServer server = new DefaultServer().withAcceptor(new JNettyTcpAcceptor(18090));
MonitorServer monitor = new MonitorServer();
try {
monitor.start();
ServiceWrapper provider = server.serviceRegistry()
.provider(new GenericServiceTestImpl())
.flowController(new FlowController<JRequest>() {
private AtomicLong count = new AtomicLong();
@Override
public ControlResult flowControl(JRequest request) {
if (count.getAndIncrement() > 100) {
return new ControlResult(false, "fuck out!!!");
}
return ControlResult.ALLOWED;
}
})
.register();
// server.withGlobalFlowController(); // 全局限流器
server.connectToRegistryServer("127.0.0.1:20001");
server.publish(provider);
server.start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
很明顯JNettyTcpAcceptor
僅僅是一個(gè)小弟,正真的老大其實(shí)是JServer
.他才是有至高無(wú)上的權(quán)利们镜。這也是一個(gè)接口類(lèi)型币叹,這么做目的也是很明顯,就是為了方便拓展--其他的實(shí)現(xiàn)只需要實(shí)現(xiàn)這個(gè)接口就完事了模狭。從demo中可以看到幾個(gè)非常明顯的操作如connectToRegistryServer
颈抚、publish
、start
以及serviceRegistry
嚼鹉。接下來(lái)就對(duì)這些動(dòng)作逐一解讀贩汉。
首先通過(guò)serviceRegistry()
獲取“本地”注冊(cè)工具。什么意思呢锚赤?也就是說(shuō)通過(guò)這個(gè)方法獲取的玩意兒實(shí)際上是用來(lái)在本機(jī)上將所要發(fā)布的服務(wù)給“注冊(cè)”上匹舞。所謂的注冊(cè)其實(shí)簡(jiǎn)單理解為用一個(gè)map給保存起來(lái)。而這個(gè)注冊(cè)工具也提供幾個(gè)動(dòng)作线脚。
這幾個(gè)動(dòng)作的命名也是非常語(yǔ)義化--provider()
方法就是將一個(gè)服務(wù)的實(shí)現(xiàn)(就是普通的Javabean)給hold状突:
public ServiceRegistry provider(Object serviceProvider, ProviderInterceptor... interceptors) {
this.serviceProvider = serviceProvider;
this.interceptors = interceptors;
return this;
}
這里將其保存為成員變量,放到后面會(huì)有用到的浑侥,現(xiàn)在先別慌姊舵,繼續(xù)往下面看≡⒙洌看完了啥都會(huì)知道的蠢莺。
接下來(lái)就是flowController()
流控器,也是非常直白的命名零如。這個(gè)玩意具體是做什么的呢,其實(shí)也很簡(jiǎn)單锄弱。在一些場(chǎng)景下考蕾,會(huì)出現(xiàn)下游業(yè)務(wù)大量請(qǐng)求上游的接口,翻譯成人話就是consumer會(huì)發(fā)起很多請(qǐng)求到provider会宪,這時(shí)候provider會(huì)覺(jué)得難受肖卧,壓力很大,可能被搞崩潰掸鹅。因?yàn)橐坏┍罎⒘巳慷纪甑叭剩虼藀rovider就想個(gè)法子:你們consumer可以來(lái)請(qǐng)求拦赠,隨便你們?cè)趺锤悖坏┌盐腋銦┝宋揖筒淮罾砟懔丝选_@樣就不會(huì)出現(xiàn)崩潰的情形了荷鼠。當(dāng)然這個(gè)解釋是很簡(jiǎn)陋的,實(shí)際上要比這個(gè)復(fù)雜的多榔幸。流量控制是一個(gè)比較大的話題允乐,絕非一句兩句能說(shuō)的清道的明的。現(xiàn)有的成熟的限流中間件也有很多削咆,如近期阿里開(kāi)源的Sentinel等牍疏。這個(gè)demo中給的限流策略也很簡(jiǎn)單:如果你調(diào)用這服務(wù)超過(guò)100次我就把你踢掉,具體做法是拋出異常拨齐。
接下來(lái)就是register()
方法了鳞陨。顧名思義就是注冊(cè)。這個(gè)注冊(cè)就是本地的注冊(cè)瞻惋,也就是之前說(shuō)的將這個(gè)服務(wù)在本地用一個(gè)map保存下來(lái)厦滤。這個(gè)邏輯稍微有點(diǎn)復(fù)雜。先不急熟史,繼續(xù)往后看馁害。
通過(guò)register()
會(huì)返回一個(gè)ServiceWrapper
實(shí)例,Jupiter根據(jù)這個(gè)對(duì)象將注冊(cè)信息上傳給注冊(cè)中心蹂匹。這里的注冊(cè)才是正真意義上的注冊(cè)碘菜,其實(shí)邏輯也非常簡(jiǎn)單,無(wú)非就是將這個(gè)實(shí)例相關(guān)的信息如類(lèi)名限寞,版本信息和組名等一些元數(shù)據(jù)放到注冊(cè)中心上去忍啸,這里的注冊(cè)中心簡(jiǎn)單理解為zk就好了。
connectToRegistryServer()
方法用來(lái)連接到注冊(cè)中心履植,注冊(cè)中心本質(zhì)上就是一個(gè)server计雌,這里的provider實(shí)際上是一個(gè)客戶端。然后通過(guò)publish()
方法將ServiceWrapper
中的相關(guān)信息通過(guò)tcp網(wǎng)絡(luò)傳送給注冊(cè)中心玫霎,注冊(cè)中心將其保存下來(lái)凿滤。最后start()
用來(lái)啟動(dòng)provider的server,用來(lái)接受consumer的請(qǐng)求庶近。
下面來(lái)看看這個(gè)register()
是怎么實(shí)現(xiàn)的翁脆。
public ServiceWrapper register() {
checkNotNull(serviceProvider, "serviceProvider");
Class<?> providerClass = serviceProvider.getClass();
ServiceProviderImpl implAnnotation = null;
ServiceProvider ifAnnotation = null;
for (Class<?> cls = providerClass; cls != Object.class; cls = cls.getSuperclass()) {
if (implAnnotation == null) {
implAnnotation = cls.getAnnotation(ServiceProviderImpl.class);
}
Class<?>[] interfaces = cls.getInterfaces();
if (interfaces != null) {
for (Class<?> i : interfaces) {
ifAnnotation = i.getAnnotation(ServiceProvider.class);
if (ifAnnotation == null) {
continue;
}
checkArgument(
interfaceClass == null,
i.getName() + " has a @ServiceProvider annotation, can't set [interfaceClass] again"
);
interfaceClass = i;
break;
}
}
if (implAnnotation != null && ifAnnotation != null) {
break;
}
}
if (ifAnnotation != null) {
checkArgument(
group == null,
interfaceClass.getName() + " has a @ServiceProvider annotation, can't set [group] again"
);
checkArgument(
providerName == null,
interfaceClass.getName() + " has a @ServiceProvider annotation, can't set [providerName] again"
);
group = ifAnnotation.group();
String name = ifAnnotation.name();
providerName = Strings.isNotBlank(name) ? name : interfaceClass.getName();
}
if (implAnnotation != null) {
checkArgument(
version == null,
providerClass.getName() + " has a @ServiceProviderImpl annotation, can't set [version] again"
);
version = implAnnotation.version();
}
checkNotNull(interfaceClass, "interfaceClass");
checkArgument(Strings.isNotBlank(group), "group");
checkArgument(Strings.isNotBlank(providerName), "providerName");
checkArgument(Strings.isNotBlank(version), "version");
// method's extensions
//
// key: method name
// value: pair.first: 方法參數(shù)類(lèi)型(用于根據(jù)JLS規(guī)則實(shí)現(xiàn)方法調(diào)用的靜態(tài)分派)
// pair.second: 方法顯式聲明拋出的異常類(lèi)型
Map<String, List<Pair<Class<?>[], Class<?>[]>>> extensions = Maps.newHashMap();
for (Method method : interfaceClass.getMethods()) {
String methodName = method.getName();
List<Pair<Class<?>[], Class<?>[]>> list = extensions.get(methodName);
if (list == null) {
list = Lists.newArrayList();
extensions.put(methodName, list);
}
list.add(Pair.of(method.getParameterTypes(), method.getExceptionTypes()));
}
return registerService(
group,
providerName,
version,
serviceProvider,
interceptors,
extensions,
weight,
executor,
flowController
);
}
首先第一個(gè)for循環(huán)目的是通過(guò)給定的Javabean找到其接口,如果沒(méi)有就去找父類(lèi)鼻种,直到最終為Object為止反番。在暴露一個(gè)服務(wù)的時(shí)候可以在其實(shí)現(xiàn)類(lèi)及接口上加注解,因此這里需要做一下篩選,只會(huì)去找有注解的接口以及實(shí)現(xiàn)類(lèi)罢缸。通過(guò)注解去找接口是一種方式篙贸,其實(shí)也能手動(dòng)指定接口,原則是不能既手動(dòng)指定接口又給接口指定注解枫疆。注解中有提供相關(guān)信息如group爵川、name等。最后將這些參數(shù)全部丟給registerService
處理养铸。
ServiceWrapper registerService(
String group,
String providerName,
String version,
Object serviceProvider,
ProviderInterceptor[] interceptors,
Map<String, List<Pair<Class<?>[], Class<?>[]>>> extensions,
int weight,
Executor executor,
FlowController<JRequest> flowController) {
ProviderInterceptor[] allInterceptors = null;
List<ProviderInterceptor> tempList = Lists.newArrayList();
if (globalInterceptors != null) {
Collections.addAll(tempList, globalInterceptors);
}
if (interceptors != null) {
Collections.addAll(tempList, interceptors);
}
if (!tempList.isEmpty()) {
allInterceptors = tempList.toArray(new ProviderInterceptor[tempList.size()]);
}
ServiceWrapper wrapper =
new ServiceWrapper(group, providerName, version, serviceProvider, allInterceptors, extensions);
wrapper.setWeight(weight);
wrapper.setExecutor(executor);
wrapper.setFlowController(flowController);
providerContainer.registerService(wrapper.getMetadata().directoryString(), wrapper);
return wrapper;
}
這里有幾個(gè)參數(shù)沒(méi)有提及到雁芙,如interceptors,weight钞螟,executor等兔甘。這些實(shí)際上和核心邏輯關(guān)系不大,只是可選項(xiàng)而已鳞滨。不必太多關(guān)注洞焙。這里的邏輯也十分簡(jiǎn)單,主旨就是將這些參數(shù)封裝到一個(gè)ServiceWrapper
對(duì)象中拯啦,最后將這個(gè)對(duì)象保存在本地(放一個(gè)map中)澡匪。這里的key的生成邏輯是這樣的:組-服務(wù)名-版本號(hào)。這種結(jié)構(gòu)就像目錄一樣褒链,一級(jí)又一級(jí)唁情。
public String directoryString() {
if (directoryCache != null) {
return directoryCache;
}
StringBuilder buf = StringBuilderHelper.get();
buf.append(getGroup())
.append('-')
.append(getServiceProviderName())
.append('-')
.append(getVersion());
directoryCache = buf.toString();
return directoryCache;
}
本地注冊(cè)邏輯也非常簡(jiǎn)單:
private final ConcurrentMap<String, ServiceWrapper> serviceProviders = Maps.newConcurrentMap();
public void registerService(String uniqueKey, ServiceWrapper serviceWrapper) {
serviceProviders.put(uniqueKey, serviceWrapper);
logger.info("ServiceProvider [{}, {}] is registered.", uniqueKey, serviceWrapper);
}
整個(gè)“本地注冊(cè)”的邏輯實(shí)際上也講完了(除了那些不影響邏輯的可選參數(shù))。接下來(lái)就是連接到注冊(cè)中心甫匹。
public void connectToRegistryServer(String connectString) {
registryService.connectToRegistryServer(connectString);
}
實(shí)際上也是委派給registryService
完成的甸鸟,這個(gè)老大很懶,很多事情不自己做兵迅。而這個(gè)registryService
是這樣創(chuàng)建的:
public DefaultServer() {
this(RegistryService.RegistryType.DEFAULT);
}
public DefaultServer(RegistryService.RegistryType registryType) {
registryType = registryType == null ? RegistryService.RegistryType.DEFAULT : registryType;
registryService = JServiceLoader.load(RegistryService.class).find(registryType.getValue());
}
這個(gè)操作比較高級(jí)抢韭,通過(guò)SPI去加載具體實(shí)現(xiàn)。具體怎么整的先不去看了恍箭,反正一時(shí)半會(huì)也看不懂刻恭。Jupiter中實(shí)現(xiàn)有兩種:DefaultRegistryService
和ZookeeperRegistryService
,默認(rèn)的是DefaultRegistryService
扯夭。實(shí)際上默認(rèn)的是作者自己寫(xiě)的一個(gè)實(shí)現(xiàn)鳍贾,屬于玩票級(jí)別的。雖然是業(yè)余的交洗,但是也是值得學(xué)習(xí)的贾漏。這個(gè)DefaultRegistryService
的核心實(shí)際上就是內(nèi)部維護(hù)一個(gè)tcp客戶端,然后去連接注冊(cè)中心藕筋。注冊(cè)中心也有兩個(gè)實(shí)現(xiàn):自己實(shí)現(xiàn)的和基于zk的。這里的細(xì)節(jié)以后會(huì)說(shuō)到。
連接完事了之后隐圾,接下來(lái)的動(dòng)作就是發(fā)布publish
:
public void publish(ServiceWrapper serviceWrapper) {
ServiceMetadata metadata = serviceWrapper.getMetadata();
RegisterMeta meta = new RegisterMeta();
meta.setPort(acceptor.boundPort());
meta.setGroup(metadata.getGroup());
meta.setServiceProviderName(metadata.getServiceProviderName());
meta.setVersion(metadata.getVersion());
meta.setWeight(serviceWrapper.getWeight());
meta.setConnCount(JConstants.SUGGESTED_CONNECTION_COUNT);
registryService.register(meta);
}
這個(gè)發(fā)布的實(shí)際操作是將這些信息傳給注冊(cè)中心伍掀,準(zhǔn)確來(lái)講應(yīng)該叫做“遠(yuǎn)程注冊(cè)”。其中的邏輯無(wú)非就是將服務(wù)中的信息封裝到RegisterMeta
對(duì)象中暇藏,然后將這個(gè)對(duì)象丟給注冊(cè)中心蜜笤。后續(xù)的處理就留個(gè)懸念,后續(xù)的文章繼續(xù)講盐碱。
至此把兔,一個(gè)provider的服務(wù)發(fā)布流程就差不多整理完了。這么回頭以看瓮顽,其實(shí)代碼組織邏輯也不是那么復(fù)雜县好。接下來(lái)的內(nèi)容該是RegistryService
的核心實(shí)現(xiàn)了。