Dubbo 服務(wù)暴露 源碼學(xué)習(xí)(下)(四)

筆記簡述
上文Dubbo 服務(wù)暴露 源碼學(xué)習(xí)(上)(三)已經(jīng)大致介紹了服務(wù)暴露的大致流程菌湃,不過還差了最后的invoke和export操作潜支,本學(xué)習(xí)筆記就來繼續(xù)介紹服務(wù)暴露的實現(xiàn)蒸矛。
更多內(nèi)容可看[目錄]Dubbo 源碼學(xué)習(xí)

目錄

Dubbo 服務(wù)暴露 源碼學(xué)習(xí)(下)(四)
1娄徊、protocol & proxyFactory
2舷嗡、獲取Invoke
2.1、JavassistProxyFactory獲取Invoke
2.2嵌莉、JdkProxyFactory獲取Invoke
2.3进萄、Invoke是什么
3、Invoke暴露為export
3.1锐峭、獲取真實的Protocol類
3.2中鼠、注冊協(xié)議 暴露
3.2.1、獲取遠程控制中心地址 getRegistry
3.2.2沿癞、注冊到注冊中心
3.2.3援雇、暴露服務(wù)之注冊
3.2.4、網(wǎng)絡(luò)端口開啟
3.2.5椎扬、開啟心跳檢測

首先服務(wù)暴露的函數(shù)是如下的一行代碼惫搏,接下來也主要是介紹這行代碼的真正的操作操作細節(jié)。

Exporter<?> exporter = protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local));

1蚕涤、protocol & proxyFactory

根據(jù)我們之前對dubbo spi的學(xué)習(xí)和了解筐赔,到這里已經(jīng)知道了protocol和proxyFactory指的具體是什么了。

protocol 類

package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;

public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
    public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws java.lang.Class {
        if (arg1 == null) 
            throw new IllegalArgumentException("url == null");

        com.alibaba.dubbo.common.URL url = arg1;
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
        if(extName == null) 
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");

        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);

        return extension.refer(arg0, arg1);
    }

    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
        if (arg0 == null) 
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");

        if (arg0.getUrl() == null) 
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
        //根據(jù)URL配置信息獲取Protocol協(xié)議揖铜,默認是dubbo
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
        if(extName == null) 
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
            //根據(jù)協(xié)議名茴丰,獲取Protocol實體類
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);

        return extension.export(arg0);
    }

    public void destroy() {
        throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public int getDefaultPort() {
        throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }
}

proxyFactory 類

package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory {
    public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws java.lang.Object {
        if (arg2 == null) 
            throw new IllegalArgumentException("url == null");

        com.alibaba.dubbo.common.URL url = arg2;
        String extName = url.getParameter("proxy", "javassist");
        if(extName == null) 
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");

        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        // 獲取ProxyFactory類的名稱為extName的實現(xiàn)類

        return extension.getInvoker(arg0, arg1, arg2);
        // arg0 是具體被調(diào)用的實現(xiàn)類
        // arg1是實現(xiàn)類的接口
        // arg2 是對外暴露的URL信息
        // 只是調(diào)用鏈路已經(jīng)轉(zhuǎn)交給具體的extension了
    }

    public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
        if (arg0 == null) 
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");

       if (arg0.getUrl() == null) 
        throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();

        String extName = url.getParameter("proxy", "javassist");
        // 獲取URL中的proxy參數(shù)信息,沒有則設(shè)置為javassist
        if(extName == null) 
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");

        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);

        return extension.getProxy(arg0);
    }
}

可以很明顯的看出來默認的protocol是使用的dubbo協(xié)議天吓,對應(yīng)的實例是包裝DubboProtocol后的實例贿肩,ProxyFactory使用的是javassist,對應(yīng)的實例是JavassistProxyFactory

暴露操作包含了兩個部分龄寞,一個Invoke,另一個是export

2汰规、獲取Invoke

2.1、JavassistProxyFactory獲取Invoke

proxyFactory.getInvoker(ref, (Class) interfaceClass, local)方法獲得具體的Invoke物邑。
此時的ref是暴露的具體實現(xiàn)類溜哮,interfaceClass是對應(yīng)的接口信息,local就是URL信息,具體內(nèi)容是

registry://127.0.0.1:2182/com.alibaba.dubbo.registry.RegistryService?application=dubbo-demo&client=zkclient&dubbo=2.5.3&export=dubbo%3A%2F%2F172.16.109.110%3A20880%2Fcom.jwfy.dubbo.product.ProductService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo%26default.loadbalance%3Drandom%26dubbo%3D2.5.3%26interface%3Dcom.jwfy.dubbo.product.ProductService%26methods%3Dprint%2CgetStr%26owner%3Djwfy%26pid%3D13859%26side%3Dprovider%26timestamp%3D1525772505371%26token%3Dfdfdf&group=dubbo-demo&owner=jwfy&pid=13859&registry=zookeeper&timestamp=1525772500246

需要對外暴露的服務(wù)就是包含在URL信息中的ProductService信息

在本demo中拂封,getInvoke操作獲取到JavassistProxyFactory對象后執(zhí)行他的getInvoke操作

JavassistProxyFactory 類

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // TODO Wrapper類不能正確處理帶$的類名
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    // 這里生產(chǎn)的wrapper也是動態(tài)生成的
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName, 
                                  Class<?>[] parameterTypes, 
                                  Object[] arguments) throws Throwable {
                                  // 這個proxy也就是具體的實現(xiàn)類茬射,對應(yīng)上面的ref
                                  // methodName是方法名
                                  // 剩下的是參數(shù)類型以及名稱
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

生產(chǎn)的invoke對象其實是個AbstractProxyInvoker,只不過在調(diào)用他的doInvoke操作時冒签,最后會執(zhí)行拼接生成的wrapper對象的invokeMethod方法上在抛。

getWrapper

在獲取wrapper操作,也同樣是動態(tài)拼接字符串生成的萧恕,重點看其中的invokeMethod方法

public Object invokeMethod(Object o, String n, Class[] p, Object[] v) 
                     throws java.lang.reflect.InvocationTargetException{ 
com.jwfy.dubbo.product.ProductServiceImpl w; 
// 看這個就已經(jīng)非常明顯的說明實現(xiàn)類的具體對象
try{ 
   w = ((com.jwfy.dubbo.product.ProductServiceImpl)$1); 
   // 格式化刚梭,強轉(zhuǎn)類型
} catch (Throwable e) { 
  throw new IllegalArgumentException(e); 
} 

try{ 
  // 對函數(shù)名稱和參數(shù)進行匹配校驗操作
  if( "print".equals( $2 )  &&  $3.length == 0 ) {  
     w.print(); 
     // 函數(shù)本身是返回void類型肠阱,則直接返回
     return null; 
   } 
   if( "getStr".equals( $2 )  &&  $3.length == 0 ) {  
      // 調(diào)用執(zhí)行結(jié)果后強轉(zhuǎn)格式返回
      return ($w)w.getStr(); 
    } 
 } catch(Throwable e) {      
    throw new java.lang.reflect.InvocationTargetException(e);  
 } 

// 每個接口的方法都會遍歷一遍,如果啥都沒匹配到朴读,就提示沒有方法異常
throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \""+$2+"\" in class com.jwfy.dubbo.product.ProductServiceImpl."); 
}

現(xiàn)在應(yīng)該就很清楚了屹徘,在執(zhí)行invokeMethod的時候,背后其實就是調(diào)用了實現(xiàn)類的對應(yīng)方法衅金,只是這個wrapper本身是動態(tài)生成的

2.2噪伊、JdkProxyFactory獲取Invoke

上面說了在動態(tài)生成的代理工廠中默認實現(xiàn)的是JavassistProxyFactory,但是也可以使用java本身的協(xié)議氮唯,也就是JdkProxyFactory

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName, 
                                  Class<?>[] parameterTypes, 
                                  Object[] arguments) throws Throwable {
            Method method = proxy.getClass().getMethod(methodName, parameterTypes);
            return method.invoke(proxy, arguments);
        }
    };
}

完全就是通過java的反射去調(diào)用執(zhí)行

2.3鉴吹、Invoke是什么

其實剛剛開始看源碼的時候并不是非常的理解Invoke到底是個什么,現(xiàn)在可以說Invoke是實現(xiàn)類的包裝類豆励,并包含了URL等信息,后續(xù)可以通過invoke方法去調(diào)用具體服務(wù)方瞒渠。

image

3良蒸、Invoke暴露為export

3.1、獲取真實的Protocol類

protocol.export(invoke),其中的invoke就是上面生成的抽象invoke類,可是在單步調(diào)試的時候卻發(fā)現(xiàn)并沒有直接進入到我們設(shè)想的RegistryProtocol類中

這個需要追蹤到Dubbo SPI中的cachedWrapperClasses數(shù)據(jù)處理中

image

上述代碼已經(jīng)很清楚了伍玖,獲取wrapper嫩痰,首先不應(yīng)該被Adaptive注解(未貼出),其次一定得存在包含了參數(shù)為type的構(gòu)造函數(shù)私沮,而如下文件則是protocol的spi文件始赎,可以知道只有filter和listener符合操作

image
image

這樣就非常清楚了,在獲取getExtension中仔燕,我們應(yīng)該是獲取到了RegistryProtocol對象,但是后續(xù)的cachedWrapperClasses操作又加上了包裝操作魔招,先后加入了ProtocolFilterWrapper晰搀、ProtocolListenerWrapper對象,使得在后續(xù)protocol.export操作不是進入到RegistryProtocol中办斑,而是首先進入到ProtocolFilterWrapper

ProtocolFilterWrapper 類

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        // 如果invoke的協(xié)議是registry類型
       return protocol.export(invoker);
    }
    return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}

然后來到了ProtocolListenerWrapper類

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        // 如果invoke的協(xié)議是registry類型
        return protocol.export(invoker);
    }
    return new ListenerExporterWrapper<T>(protocol.export(invoker), 
            Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                    .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
}

3.2外恕、注冊協(xié)議 暴露

不經(jīng)過任何處理,通過兩個wrapper的轉(zhuǎn)發(fā)乡翅,直接來到RegistryProtocol的export操作

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    // 本地暴露服務(wù)
    final Registry registry = getRegistry(originInvoker);
    // 獲取遠程注冊中心
    final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
    // 注冊到注冊中心的URL信息鳞疲,其中包含了一個接口的信息以及協(xié)議等信息
    registry.register(registedProviderUrl);
    // 注冊操作,如果查看zk記錄蠕蚜,可以發(fā)現(xiàn)注冊成功的操作日志
    
    // 訂閱override數(shù)據(jù)
    // FIXME 提供者訂閱時尚洽,會影響同一JVM即暴露服務(wù),又引用同一服務(wù)的的場景靶累,因為subscribed以服務(wù)名為緩存的key腺毫,導(dǎo)致訂閱信息覆蓋癣疟。
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    //保證每次export都返回一個新的exporter實例
    return new Exporter<T>() {
        public Invoker<T> getInvoker() {
            return exporter.getInvoker();
        }
        public void unexport() {
            try {
                exporter.unexport();
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
            try {
                registry.unregister(registedProviderUrl);
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
            try {
                overrideListeners.remove(overrideSubscribeUrl);
                registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    };
}

3.2.1、獲取遠程控制中心地址 getRegistry

private Registry getRegistry(final Invoker<?> originInvoker){
    URL registryUrl = originInvoker.getUrl();
    // 獲取的invoke的URL信息
    // registry://127.0.0.1:2182/com.alibaba.dubbo.registry.RegistryService?
    //application=dubbo-demo&client=zkclient&dubbo=2.5.3&export=dubbo
    //%3A%2F%2F192.168.10.123%3A20880%2Fcom.jwfy.dubbo.product.ProductService
    //%3Fanyhost%3Dtrue%26application%3Ddubbo-demo%26default.loadbalance%3Drandom
    //%26dubbo%3D2.5.3%26interface%3Dcom.jwfy.dubbo.product.ProductService%26methods
    // %3Dprint%2CgetStr%26owner%3Djwfy%26pid%3D12663%26side%3Dprovider
    // %26timestamp%3D1525733684167%26token%3Dfdfdf&
    // group=dubbo-demo&owner=jwfy&pid=12663&registry=zookeeper&timestamp=1525733671009
    if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
       // 如果協(xié)議是register
        String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
        // 默認協(xié)議是dubbo
        registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
        // 替換協(xié)議為dubbo潮酒,并移除參數(shù)中的注冊數(shù)據(jù)
        // 現(xiàn)在的URL信息是
        // zookeeper://127.0.0.1:2182/com.alibaba.dubbo.registry.RegistryService?
        // application=dubbo-demo&client=zkclient&dubbo=2.5.3&export=dubbo%3A%2F%2F192.168.10.123%3A20880
        //%2Fcom.jwfy.dubbo.product.ProductService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo%26default.loadbalance%3Drandom%26dubbo%3D2.5.3
        //%26interface%3Dcom.jwfy.dubbo.product.ProductService%26
        //methods%3Dprint%2CgetStr%26owner%3Djwfy%26pid%3D12663%26
        // side%3Dprovider%26timestamp%3D1525733684167%26
        //token%3Dfdfdf&group=dubbo-demo&owner=jwfy&pid=12663&timestamp=1525733671009
    }
    return registryFactory.getRegistry(registryUrl);
}

registryFactory 同樣是在RegistryProtocol實例完后注入的動態(tài)對象

import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class RegistryFactory$Adpative implements com.alibaba.dubbo.registry.RegistryFactory {
    public com.alibaba.dubbo.registry.Registry getRegistry(com.alibaba.dubbo.common.URL arg0) {
    
        if (arg0 == null) throw new IllegalArgumentException("url == null");

        com.alibaba.dubbo.common.URL url = arg0;
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );

        if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) name from url(" + url.toString() + ") use keys([protocol])");

        com.alibaba.dubbo.registry.RegistryFactory extension = (com.alibaba.dubbo.registry.RegistryFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.registry.RegistryFactory.class).getExtension(extName);

        return extension.getRegistry(arg0);
    }
}

對應(yīng)實現(xiàn)的對象是ZookeeperRegistryFactory睛挚,調(diào)用其getRegistry方法,來到了AbstractRegistryFactory類

public Registry getRegistry(URL url) {
    url = url.setPath(RegistryService.class.getName())
            .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
            .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
    String key = url.toServiceString();
    // 獲取的key是zookeeper://127.0.0.1:2182/dubbo-demo/com.alibaba.dubbo.registry.RegistryService
    // 鎖定注冊中心獲取過程急黎,保證注冊中心單一實例
    LOCK.lock();
    try {
        Registry registry = REGISTRIES.get(key);
        // 從緩存獲取注冊中心扎狱,REGISTRIES是一個線程安全的map
        if (registry != null) {
            return registry;
        }
        registry = createRegistry(url);
        // 創(chuàng)建一個注冊中心ZookeeperRegistry
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        }
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        // 釋放鎖
        LOCK.unlock();
    }
}

AbstractRegistry 類

public AbstractRegistry(URL url) {
    setUrl(url);
    // 啟動文件保存定時器
    syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
    String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getHost() + ".cache");
    // 文件名為/Users/XXX/.dubbo/dubbo-registry-127.0.0.1.cache
    File file = null;
    if (ConfigUtils.isNotEmpty(filename)) {
        file = new File(filename);
        if(! file.exists() && file.getParentFile() != null && ! file.getParentFile().exists()){
            if(! file.getParentFile().mkdirs()){
                throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
            }
        }
    }
    this.file = file;
    loadProperties();
    // 從.cache 文件中獲取已經(jīng)存儲的zk信息
    notify(url.getBackupUrls());
    // 通知訂閱
}
image
image

來到訂閱notify方法

protected void notify(List<URL> urls) {
    if(urls == null || urls.isEmpty()) return;
    
    for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
        // 獲取訂閱者
        URL url = entry.getKey();
        
        if(! UrlUtils.isMatch(url, urls.get(0))) {
            continue;
        }
        
        Set<NotifyListener> listeners = entry.getValue();
        if (listeners != null) {
            for (NotifyListener listener : listeners) {
                try {
                    notify(url, listener, filterEmpty(url, urls));
                    // 真正的通知觸發(fā)
                } catch (Throwable t) {
                    logger.error("Failed to notify registry event, urls: " +  urls + ", cause: " + t.getMessage(), t);
                }
            }
        }
    }
}
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    if (url == null) {
        throw new IllegalArgumentException("notify url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("notify listener == null");
    }
    if ((urls == null || urls.size() == 0) 
            && ! Constants.ANY_VALUE.equals(url.getServiceInterface())) {
        logger.warn("Ignore empty notify urls for subscribe url " + url);
        return;
    }
    if (logger.isInfoEnabled()) {
        logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
    }
    Map<String, List<URL>> result = new HashMap<String, List<URL>>();
    for (URL u : urls) {
        if (UrlUtils.isMatch(url, u)) {
            // 處理的URL信息u和url是匹配的
            String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            List<URL> categoryList = result.get(category);
            if (categoryList == null) {
                categoryList = new ArrayList<URL>();
                result.put(category, categoryList);
            }
            categoryList.add(u);
        }
    }
    // 一個類目下存儲的多個URL信息
    if (result.size() == 0) {
        return;
    }
    Map<String, List<URL>> categoryNotified = notified.get(url);
    if (categoryNotified == null) {
        notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
        categoryNotified = notified.get(url);
    }
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        saveProperties(url);
        // 存儲到文件中,對于上面的讀取文件
        listener.notify(categoryList);
        // 由監(jiān)聽器掌握處理
    }
}

FailbackRegistry 類

public FailbackRegistry(URL url) {
    super(url);
    int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
    // 設(shè)置重試的時間勃教,默認為5s委乌,當(dāng)鏈接失敗就會觸發(fā)這個重連操作
    this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
        public void run() {
            // 檢測并連接注冊中心
            try {
                retry();
            } catch (Throwable t) { // 防御性容錯
                logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
            }
        }
    }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}

鏈接失敗時,進行重試的操作就是在這里進行的荣回,retry就是獲取當(dāng)前registry中的failedRegistered等信息遭贸,如果failedRegistered中有URL信息存在,意味著之前存在鏈接失敗的情況心软,現(xiàn)在執(zhí)行retry進行重連操作

ZookeeperRegistry 類

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    // dubbo 分組概念 group
    if (! group.startsWith(Constants.PATH_SEPARATOR)) {
        // 如果組名稱不是/開頭壕吹,則添加該數(shù)據(jù)
        group = Constants.PATH_SEPARATOR + group;
    }
    this.root = group;
    zkClient = zookeeperTransporter.connect(url);
    // 調(diào)用zk的操作方式連接注冊中心,并持有該client
    zkClient.addStateListener(new StateListener() {
        public void stateChanged(int state) {
            if (state == RECONNECTED) {
                try {
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
}

通過上述操作得到的注冊中心對象實例删铃,并且其URL為zookeeper://127.0.0.1:2182/com.alibaba.dubbo.registry.RegistryService?application=dubbo-demo&client=zkclient&dubbo=2.5.3&group=dubbo-demo&interface=com.alibaba.dubbo.registry.RegistryService&owner=jwfy&pid=12663&timestamp=1525733671009

3.2.2耳贬、注冊到注冊中心

final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
// 先獲取invoke對象的注冊URL
registry.register(registedProviderUrl);
// 注冊到注冊中心
// 此時觀察zk的日志會發(fā)現(xiàn)注冊操作

從invoke對錯獲取的注冊URL是dubbo://192.168.10.123:20880/com.jwfy.dubbo.product.ProductService?anyhost=true&application=dubbo-demo&default.loadbalance=random&dubbo=2.5.3&interface=com.jwfy.dubbo.product.ProductService&methods=print,getStr&owner=jwfy&pid=12663&side=provider&timestamp=1525733684167&token=fdfdf

其包含了當(dāng)前bean的基本信息,把這些信息提交給注冊中心猎唁,服務(wù)使用方就可以獲取到這些數(shù)據(jù)咒劲,然后反轉(zhuǎn)生成invoke去調(diào)用執(zhí)行

FailbackRegistry 類

public void register(URL url) {
    super.register(url);
    failedRegistered.remove(url);
    failedUnregistered.remove(url);
    try {
        // 向服務(wù)器端發(fā)送注冊請求
        // 真正調(diào)用ZK包的接口注冊到zk注冊中心
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;

        // 如果開啟了啟動時檢測,則直接拋出異常
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && ! Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if(skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        // 將失敗的注冊請求記錄到失敗列表诫隅,定時重試
        // 上面的rety方法就有使用這個failedRegistered容器內(nèi)的數(shù)據(jù)
        failedRegistered.add(url);
    }
}

以下就是zk的輸出日志腐魂,可以很清晰的看到確實創(chuàng)建了節(jié)點信息

2018-05-08 22:52:52,808 - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2182:NIOServerCnxn$Factory@251] - Accepted socket connection from /127.0.0.1:55708
2018-05-08 22:52:52,926 - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2182:NIOServerCnxn@770] - Client attempting to renew session 0x163267acb7f000b at /127.0.0.1:55708
2018-05-08 22:52:52,941 - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2182:NIOServerCnxn@1573] - Invalid session 0x163267acb7f000b for client /127.0.0.1:55708, probably expired
2018-05-08 22:52:52,949 - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2182:NIOServerCnxn@1435] - Closed socket connection for client /127.0.0.1:55708 which had sessionid 0x163267acb7f000b
2018-05-08 22:52:53,437 - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2182:NIOServerCnxn$Factory@251] - Accepted socket connection from /127.0.0.1:55709
2018-05-08 22:52:53,444 - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2182:NIOServerCnxn@777] - Client attempting to establish new session at /127.0.0.1:55709
2018-05-08 22:52:53,454 - INFO  [SyncThread:0:NIOServerCnxn@1580] - Established session 0x163267acb7f000c with negotiated timeout 30000 for client /127.0.0.1:55709
2018-05-08 22:52:56,957 - INFO  [ProcessThread:-1:PrepRequestProcessor@419] - Got user-level KeeperException when processing sessionid:0x163267acb7f000c type:create cxid:0x1 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/dubbo-demo Error:KeeperErrorCode = NodeExists for /dubbo-demo
2018-05-08 22:52:57,018 - INFO  [ProcessThread:-1:PrepRequestProcessor@419] - Got user-level KeeperException when processing sessionid:0x163267acb7f000c type:create cxid:0x2 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/dubbo-demo/com.jwfy.dubbo.product.ProductService Error:KeeperErrorCode = NodeExists for /dubbo-demo/com.jwfy.dubbo.product.ProductService
2018-05-08 22:52:57,029 - INFO  [ProcessThread:-1:PrepRequestProcessor@419] - Got user-level KeeperException when processing sessionid:0x163267acb7f000c type:create cxid:0x3 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/dubbo-demo/com.jwfy.dubbo.product.ProductService/providers Error:KeeperErrorCode = NodeExists for /dubbo-demo/com.jwfy.dubbo.product.ProductService/providers

到這里服務(wù)注冊到注冊中心就已經(jīng)完成了,同時還伴隨著從文件加載注冊信息和保存注冊信息逐纬,可自行通過zKcli命令去

3.2.3蛔屹、暴露服務(wù)之注冊

private <T> ExporterChangeableWrapper<T>  doLocalExport(final Invoker<T> originInvoker){
    // invoke的URL信息是registry://127.0.0.1:2182/com.alibaba.dubbo.registry.RegistryService? 
    // application=dubbo-demo&client=zkclient&dubbo=2.5.3&export=dubbo%3A%2F%2F172.16.109.110%3A20880%2F
    // com.jwfy.dubbo.product.ProductService%3Fanyhost%3Dtrue%26application%3D
    // dubbo-demo%26default.loadbalance%3Drandom%26dubbo%3D2.5.3%26
    // interface%3Dcom.jwfy.dubbo.product.ProductService%26methods%3Dprint%2CgetStr%26
    // owner%3Djwfy%26pid%3D13375%26side%3Dprovider%26timestamp%3D1525749656129%26
    // token%3Dfdfdf&group=dubbo-demo&owner=jwfy&pid=13375&registry=zookeeper&
    // timestamp=1525749652060
    String key = getCacheKey(originInvoker);
    // key就是dubbo://172.16.109.110:20880/com.jwfy.dubbo.product.ProductService?  
    // anyhost=true&application=dubbo-demo&default.loadbalance=random&dubbo=2.5.3&interface=com.jwfy.dubbo.product.ProductService
    // &methods=print,getStr&owner=jwfy&pid=13375&side=provider&timestamp=1525749656129&token=fdfdf
    ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
    if (exporter == null) {
        synchronized (bounds) {
            exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
                // 又出現(xiàn)了double-check操作
                final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                // 包裝類
                exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker);
                
                bounds.put(key, exporter);
            }
        }
    }
    return (ExporterChangeableWrapper<T>) exporter;
}

在上面的protocol.export操作中,protocol也不是DubboProtocol本身豁生,而是包裝了ProtocolFilterWrapper兔毒、ProtocolListenerWrapper,協(xié)議不是register甸箱,各種處理之后進入到DubboProtocol的export進行暴露操作育叁。

3.2.4、網(wǎng)絡(luò)端口開啟

DubboProtocol 類

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();
    // URL是dubbo://172.16.109.110:20880/com.jwfy.dubbo.product.ProductService?anyhost=true&
    //application=dubbo-demo&default.loadbalance=random&dubbo=2.5.3&
    //interface=com.jwfy.dubbo.product.ProductService&methods=print,getStr&owner=jwfy
   // &pid=13375&side=provider&timestamp=1525749656129&token=fdfdf
    
    // export service.
    String key = serviceKey(url);
    // key是com.jwfy.dubbo.product.ProductService:20880
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    // 組成成為一個新的包裝類DubboExporter
    exporterMap.put(key, exporter);
    
    //export an stub service for dispaching event
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice){
        String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){
            if (logger.isWarnEnabled()){
                logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }
        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }

    openServer(url);
    // 來了芍殖,最關(guān)鍵的時候
    
    return exporter;
}
private ExchangeServer createServer(URL url) {
    //默認開啟server關(guān)閉時發(fā)送readonly事件
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    //默認開啟heartbeat
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
    // RPC服務(wù)的名稱豪嗽,默認是netty

    if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);

    url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
    ExchangeServer server;
    try {
        server = Exchangers.bind(url, requestHandler);
        // 綁定操作
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
    str = url.getParameter(Constants.CLIENT_KEY);
    if (str != null && str.length() > 0) {
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }
    return server;
}

Exchangers 類

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    return getExchanger(url).bind(url, handler);
    // getExchanger 是通過SPI返回一個HeaderExchanger對象
}

HeaderExchanger 類

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    // 包裝一個HeaderExchangeHandler對象
    // 再包裝一個DecodeHandler對象
    // Transporters.bind綁定操做
    // 最后包裝成HeaderExchangeServer返回
    return new HeaderExchangeServer(
        Transporters.bind(url, 
            new DecodeHandler(
                new HeaderExchangeHandler(handler)
            )
        )
    );
}

其中Transporters.bind會先獲取當(dāng)前可用的其中Transporters,默認也是NettyTransporter對象,調(diào)用其bind方法
new NettyServer(url, listener)昵骤,來到了AbstractServer類

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
    localAddress = getUrl().toInetSocketAddress();
    // 獲取本地套接字地址树碱,也就是IP端口
    String host = url.getParameter(Constants.ANYHOST_KEY, false) 
                    || NetUtils.isInvalidLocalHost(getUrl().getHost()) 
                    ? NetUtils.ANYHOST : getUrl().getHost();
    bindAddress = new InetSocketAddress(host, getUrl().getPort());
    // 綁定地址,如果是本地項目变秦,host為"0.0.0.0"
    this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
    this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
    try {
        doOpen();
        // 連接打開成榜,當(dāng)前默認是使用的NettyServer里的方法
        // 關(guān)于Netty的操作細節(jié)目前也不是很理解,后續(xù)補充
        if (logger.isInfoEnabled()) {
            logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
        }
    } catch (Throwable t) {
        throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() 
                                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
    }
    if (handler instanceof WrappedChannelHandler ){
        executor = ((WrappedChannelHandler)handler).getExecutor();
    }
}

其最終返回一個NettyService,服務(wù)端已經(jīng)開啟了蹦玫,客戶端可以連接了赎婚,繼續(xù)跳入到HeaderExchangeServer中

3.2.5、開啟心跳檢測

HeaderExchangeServer 類

public HeaderExchangeServer(Server server) {
    // 此處傳遞的service就是上面生成的NettyService
    if (server == null) {
        throw new IllegalArgumentException("server == null");
    }
    this.server = server;
    // 開始心跳檢測
    this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
    // 默認是0(可是沒找到在哪設(shè)置為了6000樱溉,也就是6s)
    this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
    // 定時任務(wù)間隔期 6*3 = 18s
    // 如果沒有設(shè)置心跳檢測的間隔期挣输,則設(shè)置為心跳延遲時間的3倍
    if (heartbeatTimeout < heartbeat * 2) {
        // 設(shè)置的時間不符合要求
        throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
    }
    startHeatbeatTimer();
    // 開啟心跳檢測
}

心跳檢測最后真正執(zhí)行的任務(wù)是如下代碼

HeartBeatTask 類

public void run() {
    try {
        long now = System.currentTimeMillis();
        // 具體所有IO的channel是通過HeaderExchangeServer.this.getChannels()獲取到的
        for ( Channel channel : channelProvider.getChannels() ) {
            if (channel.isClosed()) {
                // 確保可用的channel
                continue;
            }
            try {
                Long lastRead = ( Long ) channel.getAttribute(
                        HeaderExchangeHandler.KEY_READ_TIMESTAMP );
                Long lastWrite = ( Long ) channel.getAttribute(
                        HeaderExchangeHandler.KEY_WRITE_TIMESTAMP );
                if ( ( lastRead != null && now - lastRead > heartbeat )
                        || ( lastWrite != null && now - lastWrite > heartbeat ) ) {
                    Request req = new Request();
                    req.setVersion( "2.0.0" );
                    req.setTwoWay( true );
                    req.setEvent( Request.HEARTBEAT_EVENT );
                    channel.send( req );
                    // 發(fā)生心跳檢測包 req是內(nèi)容
                    if ( logger.isDebugEnabled() ) {
                        logger.debug( "Send heartbeat to remote channel " + channel.getRemoteAddress()
                                              + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms" );
                    }
                }
                if ( lastRead != null && now - lastRead > heartbeatTimeout ) {
                    // channel因為超時可能存在關(guān)閉的情況
                    logger.warn( "Close channel " + channel
                                         + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms" );
                    if (channel instanceof Client) {
                        try {
                            ((Client)channel).reconnect();
                        }catch (Exception e) {
                            //do nothing
                        }
                    } else {
                        channel.close();
                    }
                }
            } catch ( Throwable t ) {
                logger.warn( "Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t );
            }
        }
    } catch ( Throwable t ) {
        logger.warn( "Unhandled exception when heartbeat, cause: " + t.getMessage(), t );
    }
}

到此整個的服務(wù)暴露就全部結(jié)束了

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末福贞,一起剝皮案震驚了整個濱河市撩嚼,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌挖帘,老刑警劉巖完丽,帶你破解...
    沈念sama閱讀 212,454評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異拇舀,居然都是意外死亡逻族,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,553評論 3 385
  • 文/潘曉璐 我一進店門骄崩,熙熙樓的掌柜王于貴愁眉苦臉地迎上來聘鳞,“玉大人,你說我怎么就攤上這事要拂】倭В” “怎么了?”我有些...
    開封第一講書人閱讀 157,921評論 0 348
  • 文/不壞的土叔 我叫張陵宇弛,是天一觀的道長鸡典。 經(jīng)常有香客問我,道長枪芒,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,648評論 1 284
  • 正文 為了忘掉前任谁尸,我火速辦了婚禮舅踪,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘良蛮。我一直安慰自己抽碌,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,770評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著货徙,像睡著了一般左权。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上痴颊,一...
    開封第一講書人閱讀 49,950評論 1 291
  • 那天赏迟,我揣著相機與錄音,去河邊找鬼蠢棱。 笑死锌杀,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的泻仙。 我是一名探鬼主播糕再,決...
    沈念sama閱讀 39,090評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼玉转!你這毒婦竟也來了突想?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,817評論 0 268
  • 序言:老撾萬榮一對情侶失蹤究抓,失蹤者是張志新(化名)和其女友劉穎猾担,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體漩蟆,經(jīng)...
    沈念sama閱讀 44,275評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡垒探,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,592評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了怠李。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片圾叼。...
    茶點故事閱讀 38,724評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖捺癞,靈堂內(nèi)的尸體忽然破棺而出夷蚊,到底是詐尸還是另有隱情,我是刑警寧澤髓介,帶...
    沈念sama閱讀 34,409評論 4 333
  • 正文 年R本政府宣布惕鼓,位于F島的核電站,受9級特大地震影響唐础,放射性物質(zhì)發(fā)生泄漏箱歧。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 40,052評論 3 316
  • 文/蒙蒙 一一膨、第九天 我趴在偏房一處隱蔽的房頂上張望呀邢。 院中可真熱鬧,春花似錦豹绪、人聲如沸价淌。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,815評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蝉衣。三九已至括尸,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間病毡,已是汗流浹背濒翻。 一陣腳步聲響...
    開封第一講書人閱讀 32,043評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留剪验,地道東北人肴焊。 一個月前我還...
    沈念sama閱讀 46,503評論 2 361
  • 正文 我出身青樓,卻偏偏與公主長得像功戚,于是被迫代替她去往敵國和親娶眷。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,627評論 2 350

推薦閱讀更多精彩內(nèi)容