上一篇文章中說到要繼續(xù)討論關(guān)于“Registry”注冊(cè)器的實(shí)現(xiàn)罗珍。然而我反悔了。注冊(cè)器的實(shí)現(xiàn)涉及到了客戶端程序玩讳,而客戶端是屬于consumer的部分榕茧。因此我決定將這個(gè)部分稍微放一放。這篇文章開始介紹“注冊(cè)中心”的實(shí)現(xiàn)诽偷。
public static void main(String[] args) {
RegistryServer registryServer = RegistryServer.Default.createRegistryServer(20001, 1);
MonitorServer monitor = new MonitorServer(19998);
try {
monitor.setRegistryMonitor(registryServer);
monitor.start();
registryServer.startRegistryServer();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
啟動(dòng)一個(gè)注冊(cè)中心server很簡(jiǎn)單坤学。注冊(cè)中心中有一個(gè)監(jiān)控“MonitorServer”,這個(gè)不僅僅在注冊(cè)中心中有报慕,在provider中也有深浮。
public interface RegistryServer extends RegistryMonitor {
void startRegistryServer();
}
public interface RegistryMonitor {
/**
* Returns the address list of publisher.
*/
List<String> listPublisherHosts();
/**
* Returns the address list of subscriber.
*/
List<String> listSubscriberAddresses();
/**
* Returns to the service of all the specified service provider's address.
*/
List<String> listAddressesByService(String group, String serviceProviderName, String version);
/**
* Finds the address(host, port) of the corresponding node and returns all
* the service names it provides.
*/
List<String> listServicesByAddress(String host, int port);
}
注冊(cè)中心抽象接口中只提供了一個(gè)操作--啟動(dòng)注冊(cè)中心。其超級(jí)繼承自Monitor眠冈,而這個(gè)Monitor提供的操作也很簡(jiǎn)單飞苇。默認(rèn)的實(shí)現(xiàn)是由DefaultRegistryServer
來完成的菌瘫。這個(gè)實(shí)例的創(chuàng)建并不是顯示的,而是使用反射方式去實(shí)例化布卡。在某些情況下不想使用默認(rèn)的注冊(cè)中心而是使用別的如zk雨让,如果jupiter-registry-default
依賴包沒有顯示的添加進(jìn)來,編譯會(huì)直接報(bào)錯(cuò)的忿等。這種思路在很多框架中都有體現(xiàn)栖忠。
注冊(cè)中心的作用除了保存provider的一些信息外,還有一個(gè)重要的職責(zé)--監(jiān)聽注冊(cè)信息的變化贸街。當(dāng)一個(gè)provider斷線了庵寞,那么注冊(cè)中心就得及時(shí)將這個(gè)provide發(fā)布的信息給清理掉(下線)并且告訴consumer:你訂閱的服務(wù)不可用了。如果重連后薛匪,provider會(huì)重新發(fā)布服務(wù)信息捐川,注冊(cè)中心又得將這些變化告訴consumer:你訂閱的服務(wù)又能使用了。對(duì)于consumer來說逸尖,當(dāng)consumer斷線了注冊(cè)中心將consumer訂閱的服務(wù)給清理掉综看,連接重建后又會(huì)自動(dòng)訂閱蚓庭。
和provider一樣蒸痹,注冊(cè)中心也是一個(gè)server头遭。因此需要關(guān)心的核心邏輯其實(shí)就是handler的實(shí)現(xiàn)。
boot.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(
new IdleStateChecker(timer, JConstants.READER_IDLE_TIME_SECONDS, 0, 0),
idleStateTrigger,
new MessageDecoder(),
encoder,
ackEncoder,
handler);
}
});
這里還多了一個(gè)ackEncoder
編碼器逞频。
protected void encode(ChannelHandlerContext ctx, Acknowledge ack, ByteBuf out) throws Exception {
out.writeShort(JProtocolHeader.MAGIC)
.writeByte(JProtocolHeader.ACK)
.writeByte(0)
.writeLong(ack.sequence())
.writeInt(0);
}
從具體的實(shí)現(xiàn)邏輯中可以看出纯衍,這個(gè)編碼器實(shí)際上僅僅只做了一件事:構(gòu)造一個(gè)ack的消息包。
解碼器的邏輯和之前provider的類似苗胀,只不過接收的消息格式類型不同襟诸。
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
switch (state()) {
case MAGIC:
checkMagic(in.readShort()); // MAGIC
checkpoint(State.SIGN);
case SIGN:
header.sign(in.readByte()); // 消息標(biāo)志位
checkpoint(State.STATUS);
case STATUS:
in.readByte(); // no-op
checkpoint(State.ID);
case ID:
header.id(in.readLong()); // 消息id
checkpoint(State.BODY_SIZE);
case BODY_SIZE:
header.bodySize(in.readInt()); // 消息體長(zhǎng)度
checkpoint(State.BODY);
case BODY:
byte s_code = header.serializerCode();
switch (header.messageCode()) {
case JProtocolHeader.HEARTBEAT:
break;
case JProtocolHeader.PUBLISH_SERVICE:
case JProtocolHeader.PUBLISH_CANCEL_SERVICE:
case JProtocolHeader.SUBSCRIBE_SERVICE:
case JProtocolHeader.OFFLINE_NOTICE: {
byte[] bytes = new byte[header.bodySize()];
in.readBytes(bytes);
Serializer serializer = SerializerFactory.getSerializer(s_code);
Message msg = serializer.readObject(bytes, Message.class);
msg.messageCode(header.messageCode());
out.add(msg);
break;
}
case JProtocolHeader.ACK:
out.add(new Acknowledge(header.id()));
break;
default:
throw IoSignals.ILLEGAL_SIGN;
}
checkpoint(State.MAGIC);
}
}
實(shí)際上有三種消息類型:心跳、發(fā)布/訂閱和ack消息基协。對(duì)應(yīng)的處理邏輯也不相同歌亲,心跳消息不處理,發(fā)布/訂閱消息單獨(dú)去構(gòu)造消息實(shí)體澜驮,ack消息也如此陷揪,只不過內(nèi)容非常簡(jiǎn)單。
將消息解碼成特定對(duì)象之后就得對(duì)這些對(duì)象進(jìn)行處理杂穷。也就是MessageHandler
要干的活兒悍缠。
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel ch = ctx.channel();
if (msg instanceof Message) {
Message obj = (Message) msg;
switch (obj.messageCode()) {
case JProtocolHeader.PUBLISH_SERVICE:
case JProtocolHeader.PUBLISH_CANCEL_SERVICE:
RegisterMeta meta = (RegisterMeta) obj.data();
if (Strings.isNullOrEmpty(meta.getHost())) {
SocketAddress address = ch.remoteAddress();
if (address instanceof InetSocketAddress) {
meta.setHost(((InetSocketAddress) address).getAddress().getHostAddress());
} else {
logger.warn("Could not get remote host: {}, info: {}", ch, meta);
return;
}
}
if (obj.messageCode() == JProtocolHeader.PUBLISH_SERVICE) {
handlePublish(meta, ch);
} else if (obj.messageCode() == JProtocolHeader.PUBLISH_CANCEL_SERVICE) {
handlePublishCancel(meta, ch);
}
ch.writeAndFlush(new Acknowledge(obj.sequence())) // 回復(fù)ACK
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
break;
case JProtocolHeader.SUBSCRIBE_SERVICE:
handleSubscribe((RegisterMeta.ServiceMeta) obj.data(), ch);
ch.writeAndFlush(new Acknowledge(obj.sequence())) // 回復(fù)ACK
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
break;
case JProtocolHeader.OFFLINE_NOTICE:
handleOfflineNotice((RegisterMeta.Address) obj.data());
break;
}
} else if (msg instanceof Acknowledge) {
handleAcknowledge((Acknowledge) msg, ch);
} else {
logger.warn("Unexpected message type received: {}, channel: {}.", msg.getClass(), ch);
ReferenceCountUtil.release(msg);
}
}
這里處理的也只會(huì)有兩種類型--ack消息包和Message消息包,也就是之前解碼器的產(chǎn)物耐量。
對(duì)于發(fā)布/取消發(fā)布服務(wù)操作來說飞蚓,消息的發(fā)送者一定是provider,而消息的內(nèi)容之前也說到廊蜒,一定是RegisterMeta
類型的趴拧。不信可以看看代碼:
public void publish(ServiceWrapper serviceWrapper) {
ServiceMetadata metadata = serviceWrapper.getMetadata();
RegisterMeta meta = new RegisterMeta();
meta.setPort(acceptor.boundPort());
meta.setGroup(metadata.getGroup());
meta.setServiceProviderName(metadata.getServiceProviderName());
meta.setVersion(metadata.getVersion());
meta.setWeight(serviceWrapper.getWeight());
meta.setConnCount(JConstants.SUGGESTED_CONNECTION_COUNT);
registryService.register(meta);
}
因此這里使用強(qiáng)制類型轉(zhuǎn)換是沒有問題的溅漾。可以看到著榴,provider發(fā)布注冊(cè)消息的時(shí)候并沒有將address給set進(jìn)去添履,因此這里是通過channel來獲取provider的host。沒有一點(diǎn)毛病脑又。我猜是因?yàn)閜rovider獲取自己的host比較麻煩暮胧,所以干脆直接讓注冊(cè)中心來處理這件事情。
接下來就是區(qū)分是發(fā)布還是取消發(fā)布挂谍。沒有直接在case中判斷可能是因?yàn)槿绻褂媚欠N方式上面的代碼得重寫一遍叔壤,也很累人瞎饲。干脆直接用if來判斷得了口叙。處理完了之后還得發(fā)送一個(gè)ack的消息到provider/consumer。這種邏輯也是能夠理解的嗅战。作為注冊(cè)中心肯定是有責(zé)任告訴provider/consumer:你們的請(qǐng)求都完成了妄田。不然provider/consumer怎么能保證自己發(fā)布/訂閱的消息是不是成功的被注冊(cè)中心給處理了呢?作為注冊(cè)中心驮捍,還是得讓人信任的疟呐。
看看發(fā)布服務(wù)的具體邏輯。
private void handlePublish(RegisterMeta meta, Channel channel) {
logger.info("Publish {} on channel{}.", meta, channel);
attachPublishEventOnChannel(meta, channel);
final RegisterMeta.ServiceMeta serviceMeta = meta.getServiceMeta();
ConfigWithVersion<ConcurrentMap<RegisterMeta.Address, RegisterMeta>> config =
registerInfoContext.getRegisterMeta(serviceMeta);
synchronized (registerInfoContext.publishLock(config)) {
// putIfAbsent和config.newVersion()需要是原子操作, 所以這里加鎖
if (config.getConfig().putIfAbsent(meta.getAddress(), meta) == null) {
registerInfoContext.getServiceMeta(meta.getAddress()).add(serviceMeta);
final Message msg = new Message(serializerType.value());
msg.messageCode(JProtocolHeader.PUBLISH_SERVICE);
msg.version(config.newVersion()); // 版本號(hào)+1
msg.data(Pair.of(serviceMeta, meta));
subscriberChannels.writeAndFlush(msg, new ChannelMatcher() {
@Override
public boolean matches(Channel channel) {
boolean doSend = isChannelSubscribeOnServiceMeta(serviceMeta, channel);
if (doSend) {
MessageNonAck msgNonAck = new MessageNonAck(serviceMeta, msg, channel);
// 收到ack后會(huì)移除當(dāng)前key(參見handleAcknowledge), 否則超時(shí)超時(shí)重發(fā)
messagesNonAck.put(msgNonAck.id, msgNonAck);
}
return doSend;
}
});
}
}
}
這句代碼attachPublishEventOnChannel(meta, channel);
的方法名直譯過來意思就是在channel上綁定發(fā)布服務(wù)的信息东且。具體實(shí)現(xiàn)如下:
private static final AttributeKey<ConcurrentSet<RegisterMeta>> S_PUBLISH_KEY =
AttributeKey.valueOf("server.published");
private static boolean attachPublishEventOnChannel(RegisterMeta meta, Channel channel) {
Attribute<ConcurrentSet<RegisterMeta>> attr = channel.attr(S_PUBLISH_KEY);
ConcurrentSet<RegisterMeta> registerMetaSet = attr.get();
if (registerMetaSet == null) {
ConcurrentSet<RegisterMeta> newRegisterMetaSet = new ConcurrentSet<>();
registerMetaSet = attr.setIfAbsent(newRegisterMetaSet);
if (registerMetaSet == null) {
registerMetaSet = newRegisterMetaSet;
}
}
return registerMetaSet.add(meta);
}
這樣就很容易理解了启具,意思就是一個(gè)channel上是能夠保存信息的,怎么保存呢珊泳?通過S_PUBLISH_KEY
鲁冯,就將這個(gè)玩意作為一個(gè)記號(hào),刻在channel上色查,這個(gè)記號(hào)是能夠指定類型的薯演,這里指定的是ConcurrentSet
類型,當(dāng)然也能指定別的如String秧了、Integer啥的跨扮。然后就是一段騷操作--將這個(gè)類型給實(shí)例化出來,并且將meta消息給添加進(jìn)去验毡。第一次在這個(gè)channel去獲取S_PUBLISH_KEY
標(biāo)記的內(nèi)容衡创,肯定是不存在的,那么就得將其中的內(nèi)容給實(shí)例化出來晶通,單線程環(huán)境什么都好說璃氢,簡(jiǎn)單粗暴去賦值即可,但是在并發(fā)情況下鬼知道啥時(shí)候某個(gè)線程就悄咪咪的把這個(gè)標(biāo)記好的set給實(shí)例化了录择,而這時(shí)候你卻不知道拔莱,再去實(shí)例化賦值一次碗降,這就產(chǎn)生了并發(fā)問題。這里的操作是使用setIfAbsent塘秦,如果這個(gè)標(biāo)記中已經(jīng)有了讼渊,那就直接返回這個(gè)存在的值,沒有就將其set進(jìn)去尊剔,并返回null爪幻。
“打標(biāo)記”這種概念很抽象,很納悶须误,為什么一個(gè)channel能夠被打上標(biāo)記挨稿。其實(shí)也容易理解,無非就是netty將這個(gè)channel進(jìn)行單獨(dú)的“處理”京痢。從本質(zhì)上來說在服務(wù)端的內(nèi)存中分配了一塊區(qū)域奶甘,與channel相關(guān)聯(lián)。這個(gè)channel也比較抽象祭椰,不容易理解臭家,其實(shí)就是一個(gè)“連接”,再具體點(diǎn)就是一個(gè)客戶端方淤。具體的實(shí)現(xiàn)細(xì)節(jié)得去查閱netty的源碼钉赁。
接下來使用一個(gè)config
變量來保存address和RegisterMeta的映射.這個(gè)config是通過serviceMeta
來獲取的,也就是通過服務(wù)名來獲取映射--指定的服務(wù)有哪些節(jié)點(diǎn)注冊(cè)携茂。這里同時(shí)也得維護(hù)節(jié)點(diǎn)-服務(wù)的映射關(guān)系你踩。最后構(gòu)造消息報(bào)文,這個(gè)報(bào)文發(fā)給所有的訂閱者讳苦。
僅僅發(fā)送出消息還不算完事
private final ConcurrentMap<String, MessageNonAck> messagesNonAck = Maps.newConcurrentMap();
這里還維護(hù)一個(gè)map變量带膜,保存著“未回應(yīng)”的消息。這么做的目的也是為了保證可靠医吊。也就是說钱慢,當(dāng)provider發(fā)布一條注冊(cè)信息給注冊(cè)中心了,注冊(cè)中心得回應(yīng)provider:我收到了你的請(qǐng)求了卿堂。同時(shí)也得處理這個(gè)注冊(cè)消息--告訴訂閱者束莫,但是不能一直等著訂閱者回復(fù)確認(rèn)收到的消息。于是在本地保存一個(gè)“未回應(yīng)”消息草描,當(dāng)收到訂閱者的響應(yīng)報(bào)文時(shí)再將這些“未回應(yīng)”報(bào)文給移除掉览绿。當(dāng)然也得考慮永遠(yuǎn)收不到或者很長(zhǎng)時(shí)間都收不到的情況,內(nèi)部有一個(gè)守護(hù)線程穗慕,定時(shí)去清理超時(shí)的消息饿敲。不然的話這個(gè)map肯定會(huì)爆炸,oom肯定是不允許發(fā)生的逛绵。
至于取消發(fā)布的邏輯基本上和發(fā)布一樣怀各,只不過是反著來的倔韭,一個(gè)是添加,一個(gè)是移除而已瓢对。不啰嗦了寿酌。
服務(wù)訂閱和服務(wù)下線留到下篇接著講。