Spring Cloud Ribbon設(shè)計(jì)原理

Ribbon 是netflix 公司開(kāi)源的基于客戶(hù)端的負(fù)載均衡組件,是Spring Cloud大家庭中非常重要的一個(gè)模塊找都;Ribbon應(yīng)該也是整個(gè)大家庭中相對(duì)而言比較復(fù)雜的模塊,直接影響到服務(wù)調(diào)度的質(zhì)量和性能廊敌。全面掌握Ribbon可以幫助我們了解在分布式微服務(wù)集群工作模式下飒炎,服務(wù)調(diào)度應(yīng)該考慮到的每個(gè)環(huán)節(jié)谁不。
本文將詳細(xì)地剖析Ribbon的設(shè)計(jì)原理,幫助大家對(duì)Spring Cloud 有一個(gè)更好的認(rèn)知震嫉。

一. Spring集成下的Ribbon工作結(jié)構(gòu)

先貼一張總覽圖森瘪,說(shuō)明一下Spring如何集成Ribbon的,如下所示:

image.png

Spring Cloud集成模式下的Ribbon有以下幾個(gè)特征:

  1. Ribbon 服務(wù)配置方式
    每一個(gè)服務(wù)配置都有一個(gè)Spring ApplicationContext上下文票堵,用于加載各自服務(wù)的實(shí)例扼睬。
    比如,當(dāng)前Spring Cloud 系統(tǒng)內(nèi)悴势,有如下幾個(gè)服務(wù):
服務(wù)名稱(chēng) 角色 依賴(lài)服務(wù)
order 訂單模塊 user
user 用戶(hù)模塊 無(wú)
mobile-bff 移動(dòng)端BFF order,user

mobile-bff服務(wù)在實(shí)際使用中窗宇,會(huì)用到orderuser模塊,那么在mobile-bff服務(wù)的Spring上下文中特纤,會(huì)為orderuser 分別創(chuàng)建一個(gè)子ApplicationContext,用于加載各自服務(wù)模塊的配置军俊。也就是說(shuō),各個(gè)客戶(hù)端的配置相互獨(dú)立捧存,彼此不收影響

  1. 和Feign的集成模式
    在使用Feign作為客戶(hù)端時(shí)粪躬,最終請(qǐng)求會(huì)轉(zhuǎn)發(fā)成 http://<服務(wù)名稱(chēng)>/<relative-path-to-service>的格式,通過(guò)LoadBalancerFeignClient昔穴, 提取出服務(wù)標(biāo)識(shí)<服務(wù)名稱(chēng)>镰官,然后根據(jù)服務(wù)名稱(chēng)在上下文中查找對(duì)應(yīng)服務(wù)的負(fù)載均衡器FeignLoadBalancer負(fù)載均衡器負(fù)責(zé)根據(jù)既有的服務(wù)實(shí)例的統(tǒng)計(jì)信息吗货,挑選出最合適的服務(wù)實(shí)例

二泳唠、Spring Cloud模式下和Feign的集成實(shí)現(xiàn)方式

和Feign結(jié)合的場(chǎng)景下,F(xiàn)eign的調(diào)用會(huì)被包裝成調(diào)用請(qǐng)求LoadBalancerCommand宙搬,然后底層通過(guò)Rxjava基于事件的編碼風(fēng)格笨腥,發(fā)送請(qǐng)求孙援;Spring Cloud框架通過(guò) Feigin 請(qǐng)求的URL,提取出服務(wù)名稱(chēng)扇雕,然后在上下文中找到對(duì)應(yīng)服務(wù)的的負(fù)載均衡器實(shí)現(xiàn)FeignLoadBalancer,然后通過(guò)負(fù)載均衡器中挑選一個(gè)合適的Server實(shí)例,然后將調(diào)用請(qǐng)求轉(zhuǎn)發(fā)到該Server實(shí)例上拓售,完成調(diào)用,在此過(guò)程中镶奉,記錄對(duì)應(yīng)Server實(shí)例的調(diào)用統(tǒng)計(jì)信息础淤。

/**
     * Create an {@link Observable} that once subscribed execute network call asynchronously with a server chosen by load balancer.
     * If there are any errors that are indicated as retriable by the {@link RetryHandler}, they will be consumed internally by the
     * function and will not be observed by the {@link Observer} subscribed to the returned {@link Observable}. If number of retries has
     * exceeds the maximal allowed, a final error will be emitted by the returned {@link Observable}. Otherwise, the first successful
     * result during execution and retries will be emitted.
     */
    public Observable<T> submit(final ServerOperation<T> operation) {
        final ExecutionInfoContext context = new ExecutionInfoContext();
        
        if (listenerInvoker != null) {
            try {
                listenerInvoker.onExecutionStart();
            } catch (AbortExecutionException e) {
                return Observable.error(e);
            }
        }
        
        // 同一Server最大嘗試次數(shù)
        final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
        //下一Server最大嘗試次數(shù)
        final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

        // Use the load balancer
        // 使用負(fù)載均衡器,挑選出合適的Server哨苛,然后執(zhí)行Server請(qǐng)求鸽凶,將請(qǐng)求的數(shù)據(jù)和行為整合到ServerStats中
        Observable<T> o = 
                (server == null ? selectServer() : Observable.just(server))
                .concatMap(new Func1<Server, Observable<T>>() {
                    @Override
                    // Called for each server being selected
                    public Observable<T> call(Server server) {
                        // 獲取Server的統(tǒng)計(jì)值
                        context.setServer(server);
                        final ServerStats stats = loadBalancerContext.getServerStats(server);
                        
                        // Called for each attempt and retry 服務(wù)調(diào)用
                        Observable<T> o = Observable
                                .just(server)
                                .concatMap(new Func1<Server, Observable<T>>() {
                                    @Override
                                    public Observable<T> call(final Server server) {
                                        context.incAttemptCount();//重試計(jì)數(shù)
                                        loadBalancerContext.noteOpenConnection(stats);//鏈接統(tǒng)計(jì)
                                        
                                        if (listenerInvoker != null) {
                                            try {
                                                listenerInvoker.onStartWithServer(context.toExecutionInfo());
                                            } catch (AbortExecutionException e) {
                                                return Observable.error(e);
                                            }
                                        }
                                        //執(zhí)行監(jiān)控器,記錄執(zhí)行時(shí)間
                                        final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
                                        //找到合適的server后建峭,開(kāi)始執(zhí)行請(qǐng)求
                                        //底層調(diào)用有結(jié)果后玻侥,做消息處理
                                        return operation.call(server).doOnEach(new Observer<T>() {
                                            private T entity;
                                            @Override
                                            public void onCompleted() {
                                                recordStats(tracer, stats, entity, null);
                                                // 記錄統(tǒng)計(jì)信息
                                            }

                                            @Override
                                            public void onError(Throwable e) {
                                                recordStats(tracer, stats, null, e);//記錄異常信息
                                                logger.debug("Got error {} when executed on server {}", e, server);
                                                if (listenerInvoker != null) {
                                                    listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
                                                }
                                            }

                                            @Override
                                            public void onNext(T entity) {
                                                this.entity = entity;//返回結(jié)果值
                                                if (listenerInvoker != null) {
                                                    listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
                                                }
                                            }                            
                                            
                                            private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
                                                tracer.stop();//結(jié)束計(jì)時(shí)
                                                //標(biāo)記請(qǐng)求結(jié)束,更新統(tǒng)計(jì)信息
                                                loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
                                            }
                                        });
                                    }
                                });
                        //如果失敗亿蒸,根據(jù)重試策略觸發(fā)重試邏輯
                        // 使用observable 做重試邏輯凑兰,根據(jù)predicate 做邏輯判斷,這里做
                        if (maxRetrysSame > 0) 
                            o = o.retry(retryPolicy(maxRetrysSame, true));
                        return o;
                    }
                });
         // next請(qǐng)求處理边锁,基于重試器操作   
        if (maxRetrysNext > 0 && server == null) 
            o = o.retry(retryPolicy(maxRetrysNext, false));
        
        return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
            @Override
            public Observable<T> call(Throwable e) {
                if (context.getAttemptCount() > 0) {
                    if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
                        e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                                "Number of retries on next server exceeded max " + maxRetrysNext
                                + " retries, while making a call for: " + context.getServer(), e);
                    }
                    else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
                        e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                                "Number of retries exceeded max " + maxRetrysSame
                                + " retries, while making a call for: " + context.getServer(), e);
                    }
                }
                if (listenerInvoker != null) {
                    listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
                }
                return Observable.error(e);
            }
        });
    }

從一組ServerList 列表中挑選合適的Server

    /**
     * Compute the final URI from a partial URI in the request. The following steps are performed:
     * <ul>
     * <li>  如果host尚未指定姑食,則從負(fù)載均衡器中選定 host/port
     * <li>  如果host 尚未指定并且尚未找到負(fù)載均衡器,則嘗試從 虛擬地址中確定host/port
     * <li> 如果指定了HOST,并且URI的授權(quán)部分通過(guò)虛擬地址設(shè)置茅坛,并且存在負(fù)載均衡器音半,則通過(guò)負(fù)載就均衡器中確定host/port(指定的HOST將會(huì)被忽略)
     * <li> 如果host已指定,但是尚未指定負(fù)載均衡器和虛擬地址配置贡蓖,則使用真實(shí)地址作為host
     * <li> if host is missing but none of the above applies, throws ClientException
     * </ul>
     *
     * @param original Original URI passed from caller
     */
    public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
        String host = null;
        int port = -1;
        if (original != null) {
            host = original.getHost();
        }
        if (original != null) {
            Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original);        
            port = schemeAndPort.second();
        }

        // Various Supported Cases
        // The loadbalancer to use and the instances it has is based on how it was registered
        // In each of these cases, the client might come in using Full Url or Partial URL
        ILoadBalancer lb = getLoadBalancer();
        if (host == null) {
            // 提供部分URI曹鸠,缺少HOST情況下
            // well we have to just get the right instances from lb - or we fall back
            if (lb != null){
                Server svc = lb.chooseServer(loadBalancerKey);// 使用負(fù)載均衡器選擇Server
                if (svc == null){
                    throw new ClientException(ClientException.ErrorType.GENERAL,
                            "Load balancer does not have available server for client: "
                                    + clientName);
                }
                //通過(guò)負(fù)載均衡器選擇的結(jié)果中選擇host
                host = svc.getHost();
                if (host == null){
                    throw new ClientException(ClientException.ErrorType.GENERAL,
                            "Invalid Server for :" + svc);
                }
                logger.debug("{} using LB returned Server: {} for request {}", new Object[]{clientName, svc, original});
                return svc;
            } else {
                // No Full URL - and we dont have a LoadBalancer registered to
                // obtain a server
                // if we have a vipAddress that came with the registration, we
                // can use that else we
                // bail out
                // 通過(guò)虛擬地址配置解析出host配置返回
                if (vipAddresses != null && vipAddresses.contains(",")) {
                    throw new ClientException(
                            ClientException.ErrorType.GENERAL,
                            "Method is invoked for client " + clientName + " with partial URI of ("
                            + original
                            + ") with no load balancer configured."
                            + " Also, there are multiple vipAddresses and hence no vip address can be chosen"
                            + " to complete this partial uri");
                } else if (vipAddresses != null) {
                    try {
                        Pair<String,Integer> hostAndPort = deriveHostAndPortFromVipAddress(vipAddresses);
                        host = hostAndPort.first();
                        port = hostAndPort.second();
                    } catch (URISyntaxException e) {
                        throw new ClientException(
                                ClientException.ErrorType.GENERAL,
                                "Method is invoked for client " + clientName + " with partial URI of ("
                                + original
                                + ") with no load balancer configured. "
                                + " Also, the configured/registered vipAddress is unparseable (to determine host and port)");
                    }
                } else {
                    throw new ClientException(
                            ClientException.ErrorType.GENERAL,
                            this.clientName
                            + " has no LoadBalancer registered and passed in a partial URL request (with no host:port)."
                            + " Also has no vipAddress registered");
                }
            }
        } else {
            // Full URL Case URL中指定了全地址,可能是虛擬地址或者是hostAndPort
            // This could either be a vipAddress or a hostAndPort or a real DNS
            // if vipAddress or hostAndPort, we just have to consult the loadbalancer
            // but if it does not return a server, we should just proceed anyways
            // and assume its a DNS
            // For restClients registered using a vipAddress AND executing a request
            // by passing in the full URL (including host and port), we should only
            // consult lb IFF the URL passed is registered as vipAddress in Discovery
            boolean shouldInterpretAsVip = false;

            if (lb != null) {
                shouldInterpretAsVip = isVipRecognized(original.getAuthority());
            }
            if (shouldInterpretAsVip) {
                Server svc = lb.chooseServer(loadBalancerKey);
                if (svc != null){
                    host = svc.getHost();
                    if (host == null){
                        throw new ClientException(ClientException.ErrorType.GENERAL,
                                "Invalid Server for :" + svc);
                    }
                    logger.debug("using LB returned Server: {} for request: {}", svc, original);
                    return svc;
                } else {
                    // just fall back as real DNS
                    logger.debug("{}:{} assumed to be a valid VIP address or exists in the DNS", host, port);
                }
            } else {
                // consult LB to obtain vipAddress backed instance given full URL
                //Full URL execute request - where url!=vipAddress
                logger.debug("Using full URL passed in by caller (not using load balancer): {}", original);
            }
        }
        // end of creating final URL
        if (host == null){
            throw new ClientException(ClientException.ErrorType.GENERAL,"Request contains no HOST to talk to");
        }
        // just verify that at this point we have a full URL

        return new Server(host, port);
    }

三. LoadBalancer--負(fù)載均衡器的核心

LoadBalancer 的職能主要有三個(gè):

  1. 維護(hù)Sever列表的數(shù)量(新增斥铺、更新彻桃、刪除等)
  2. 維護(hù)Server列表的狀態(tài)(狀態(tài)更新)
  3. 當(dāng)請(qǐng)求Server實(shí)例時(shí),能否返回最合適的Server實(shí)例

本章節(jié)將通過(guò)詳細(xì)闡述著這三個(gè)方面仅父。

3.1 負(fù)載均衡器的內(nèi)部基本實(shí)現(xiàn)原理

先熟悉一下負(fù)載均衡器LoadBalancer的實(shí)現(xiàn)原理圖:


image.png
組成部分 職能 參考章節(jié)
Server Server 作為服務(wù)實(shí)例的表示叛薯,會(huì)記錄服務(wù)實(shí)例的相關(guān)信息,如:服務(wù)地址笙纤,所屬zone耗溜,服務(wù)名稱(chēng),實(shí)例ID等
ServerList 維護(hù)著一組Server實(shí)例列表,在應(yīng)用運(yùn)行的過(guò)程中省容,Ribbon通過(guò)ServerList中的服務(wù)實(shí)例供負(fù)載均衡器選擇抖拴。ServerList維護(hù)列表可能在運(yùn)行的過(guò)程中動(dòng)態(tài)改變 3.2
ServerStats 作為對(duì)應(yīng)Server 的運(yùn)行情況統(tǒng)計(jì),一般是服務(wù)調(diào)用過(guò)程中的Server平均響應(yīng)時(shí)間,累計(jì)請(qǐng)求失敗計(jì)數(shù)阿宅,熔斷時(shí)間控制等候衍。一個(gè)ServerStats實(shí)例唯一對(duì)應(yīng)一個(gè)Server實(shí)例
LoadBalancerStats 作為 ServerStats實(shí)例列表的容器,統(tǒng)一維護(hù)
ServerListUpdater 負(fù)載均衡器通過(guò)ServerListUpdater來(lái)更新ServerList,比如實(shí)現(xiàn)一個(gè)定時(shí)任務(wù)洒放,每隔一段時(shí)間獲取最新的Server實(shí)例列表 3.2
Pinger 服務(wù)狀態(tài)檢驗(yàn)器蛉鹿,負(fù)責(zé)維護(hù)ServerList列表中的服務(wù)狀態(tài)注意:Pinger僅僅負(fù)責(zé)Server的狀態(tài),沒(méi)有能力決定是否刪除
PingerStrategy 定義以何種方式還檢驗(yàn)服務(wù)是否有效往湿,比如是按照順序的方式還是并行的方式
IPing Ping妖异,檢驗(yàn)服務(wù)是否可用的方法,常見(jiàn)的是通過(guò)HTTP领追,或者TCP/IP的方式看服務(wù)有無(wú)認(rèn)為正常的請(qǐng)求
image.png

3.2 如何維護(hù)Server列表他膳?(新增、更新绒窑、刪除)

單從服務(wù)列表的維護(hù)角度上棕孙,Ribbon的結(jié)構(gòu)如下所示:


image.png

Server列表的維護(hù)從實(shí)現(xiàn)方法上分為兩類(lèi):

  1. 基于配置的服務(wù)列表
    這種方式一般是通過(guò)配置文件,靜態(tài)地配置服務(wù)器列表些膨,這種方式相對(duì)而言比較簡(jiǎn)單蟀俊,但并不是意味著在機(jī)器運(yùn)行的時(shí)候就一直不變。netflix 在做Spring cloud 套件時(shí)傀蓉,使用了分布式配置框架netflix archaius 欧漱,archaius 框架有一個(gè)特點(diǎn)是會(huì)動(dòng)態(tài)的監(jiān)控配置文件的變化职抡,將變化刷新到各個(gè)應(yīng)用上葬燎。也就是說(shuō),當(dāng)我們?cè)诓魂P(guān)閉服務(wù)的情況下缚甩,如果修改了基于配置的服務(wù)列表時(shí), 服務(wù)列表可以直接刷新
  2. 結(jié)合服務(wù)發(fā)現(xiàn)組件(如Eureka)的服務(wù)注冊(cè)信息動(dòng)態(tài)維護(hù)服務(wù)列表
    基于Spring Cloud框架下谱净,服務(wù)注冊(cè)和發(fā)現(xiàn)是一個(gè)分布式服務(wù)集群必不可少的一個(gè)組件,它負(fù)責(zé)維護(hù)不同的服務(wù)實(shí)例(注冊(cè)擅威、續(xù)約壕探、取消注冊(cè)),本文將介紹和Eureka集成模式下郊丛,如果借助Eureka的服務(wù)注冊(cè)信息動(dòng)態(tài)刷新ribbon 的服務(wù)列表

Ribbon 通過(guò)配置項(xiàng):<service-name>.ribbon.NIWSServerListClassName 來(lái)決定使用哪種實(shí)現(xiàn)方式李请。對(duì)應(yīng)地:

策略 ServerList實(shí)現(xiàn)
基于配置 com.netflix.loadbalancer.ConfigurationBasedServerList
基于服務(wù)發(fā)現(xiàn) com.netflix.loadbalancer.DiscoveryEnabledNIWSServerList

Server列表可能在運(yùn)行的時(shí)候動(dòng)態(tài)的更新,而具體的更新方式由<service-name>.ribbon.ServerListUpdaterClassName,當(dāng)前有如下兩種實(shí)現(xiàn)方式:

更新策略 ServerListUpdater實(shí)現(xiàn)
基于定時(shí)任務(wù)的拉取服務(wù)列表 com.netflix.loadbalancer.PollingServerListUpdater
基于Eureka服務(wù)事件通知的方式更新 com.netflix.loadbalancer.EurekaNotificationServerListUpdater
  • 基于定時(shí)任務(wù)拉取服務(wù)列表方式
    這種方式的實(shí)現(xiàn)為:com.netflix.loadbalancer.PollingServerListUpdater,其內(nèi)部維護(hù)了一個(gè)周期性的定時(shí)任務(wù)厉熟,拉取最新的服務(wù)列表导盅,然后將最新的服務(wù)列表更新到ServerList之中,其核心的實(shí)現(xiàn)邏輯如下:
public class PollingServerListUpdater implements ServerListUpdater {
    private static final Logger logger = LoggerFactory.getLogger(PollingServerListUpdater.class);

    private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;
    private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;
    // 更新器線程池定義以及鉤子設(shè)置
    private static class LazyHolder {
        private final static String CORE_THREAD = "DynamicServerListLoadBalancer.ThreadPoolSize";
        private final static DynamicIntProperty poolSizeProp = new DynamicIntProperty(CORE_THREAD, 2);
        private static Thread _shutdownThread;

        static ScheduledThreadPoolExecutor _serverListRefreshExecutor = null;

        static {
            int coreSize = poolSizeProp.get();
            ThreadFactory factory = (new ThreadFactoryBuilder())
                    .setNameFormat("PollingServerListUpdater-%d")
                    .setDaemon(true)
                    .build();
            _serverListRefreshExecutor = new ScheduledThreadPoolExecutor(coreSize, factory);
            poolSizeProp.addCallback(new Runnable() {
                @Override
                public void run() {
                    _serverListRefreshExecutor.setCorePoolSize(poolSizeProp.get());
                }

            });
            _shutdownThread = new Thread(new Runnable() {
                public void run() {
                    logger.info("Shutting down the Executor Pool for PollingServerListUpdater");
                    shutdownExecutorPool();
                }
            });
            Runtime.getRuntime().addShutdownHook(_shutdownThread);
        }

        private static void shutdownExecutorPool() {
            if (_serverListRefreshExecutor != null) {
                _serverListRefreshExecutor.shutdown();

                if (_shutdownThread != null) {
                    try {
                        Runtime.getRuntime().removeShutdownHook(_shutdownThread);
                    } catch (IllegalStateException ise) { // NOPMD
                        // this can happen if we're in the middle of a real
                        // shutdown,
                        // and that's 'ok'
                    }
                }

            }
        }
    }
    // 省略部分代碼
    @Override
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
            //創(chuàng)建定時(shí)任務(wù),按照特定的實(shí)行周期執(zhí)行更新操作
            final Runnable wrapperRunnable = new Runnable() {
                @Override
                public void run() {
                    if (!isActive.get()) {
                        if (scheduledFuture != null) {
                            scheduledFuture.cancel(true);
                        }
                        return;
                    }
                    try {
                        //執(zhí)行update操作 揍瑟,更新操作定義在LoadBalancer中
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                }
            };
           //定時(shí)任務(wù)創(chuàng)建
            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                    wrapperRunnable,
                    initialDelayMs, //初始延遲時(shí)間
                    refreshIntervalMs, //內(nèi)部刷新時(shí)間
                    TimeUnit.MILLISECONDS
            );
        } else {
            logger.info("Already active, no-op");
        }
    }
//省略部分代碼
}

有上述代碼可以看到,ServerListUpdator只是定義了更新的方式白翻,而具體怎么更新,則是封裝成UpdateAction來(lái)操作的:

    /**
     * an interface for the updateAction that actually executes a server list update
     */
    public interface UpdateAction {
        void doUpdate();
    }
    
    //在DynamicServerListLoadBalancer 中則實(shí)現(xiàn)了具體的操作:
    public DynamicServerListLoadBalancer() {
        this.isSecure = false;
        this.useTunnel = false;
        this.serverListUpdateInProgress = new AtomicBoolean(false);
        this.updateAction = new UpdateAction() {
            public void doUpdate() {
                //更新服務(wù)列表
                DynamicServerListLoadBalancer.this.updateListOfServers();
            }
        };
    }

    @VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList();
       // 通過(guò)ServerList獲取最新的服務(wù)列表 
       if (this.serverListImpl != null) {
            servers = this.serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers);
            //返回的結(jié)果通過(guò)過(guò)濾器的方式進(jìn)行過(guò)濾
            if (this.filter != null) {
                servers = this.filter.getFilteredListOfServers((List)servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers);
            }
        }
        //更新列表
        this.updateAllServerList((List)servers);
    }

    protected void updateAllServerList(List<T> ls) {
        if (this.serverListUpdateInProgress.compareAndSet(false, true)) {
            try {
                Iterator var2 = ls.iterator();

                while(var2.hasNext()) {
                    T s = (Server)var2.next();
                    s.setAlive(true);
                }

                this.setServersList(ls);
                super.forceQuickPing();
            } finally {
                this.serverListUpdateInProgress.set(false);
            }
        }

    }
  • 基于Eureka服務(wù)事件通知的方式更新
    基于Eureka的更新方式則有些不同, 當(dāng)Eureka注冊(cè)中心發(fā)生了Server服務(wù)注冊(cè)信息變更時(shí)绢片,會(huì)將消息通知發(fā)送到EurekaNotificationServerListUpdater 上滤馍,然后此Updator觸發(fā)刷新ServerList:
public class EurekaNotificationServerListUpdater implements ServerListUpdater {

   //省略部分代碼
  
    @Override
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
          //創(chuàng)建Eureka時(shí)間監(jiān)聽(tīng)器岛琼,當(dāng)Eureka發(fā)生改變后,將觸發(fā)對(duì)應(yīng)邏輯  
          this.updateListener = new EurekaEventListener() {
                @Override
                public void onEvent(EurekaEvent event) {
                    if (event instanceof CacheRefreshedEvent) {
                        //內(nèi)部消息隊(duì)列
                       if (!updateQueued.compareAndSet(false, true)) {  // if an update is already queued
                            logger.info("an update action is already queued, returning as no-op");
                            return;
                        }

                        if (!refreshExecutor.isShutdown()) {
                            try {
                                //提交更新操作請(qǐng)求到消息隊(duì)列中
                                refreshExecutor.submit(new Runnable() {
                                    @Override
                                    public void run() {
                                        try {
                                            updateAction.doUpdate(); // 執(zhí)行真正的更新操作
                                            lastUpdated.set(System.currentTimeMillis());
                                        } catch (Exception e) {
                                            logger.warn("Failed to update serverList", e);
                                        } finally {
                                            updateQueued.set(false);
                                        }
                                    }
                                });  // fire and forget
                            } catch (Exception e) {
                                logger.warn("Error submitting update task to executor, skipping one round of updates", e);
                                updateQueued.set(false);  // if submit fails, need to reset updateQueued to false
                            }
                        }
                        else {
                            logger.debug("stopping EurekaNotificationServerListUpdater, as refreshExecutor has been shut down");
                            stop();
                        }
                    }
                }
            };
            //EurekaClient 客戶(hù)端實(shí)例
            if (eurekaClient == null) {
                eurekaClient = eurekaClientProvider.get();
            }
            //基于EeurekaClient注冊(cè)事件監(jiān)聽(tīng)器
            if (eurekaClient != null) {
                eurekaClient.registerEventListener(updateListener);
            } else {
                logger.error("Failed to register an updateListener to eureka client, eureka client is null");
                throw new IllegalStateException("Failed to start the updater, unable to register the update listener due to eureka client being null.");
            }
        } else {
            logger.info("Update listener already registered, no-op");
        }
    }

}

3.2.1 相關(guān)的配置項(xiàng)
配置項(xiàng) 說(shuō)明 生效場(chǎng)景 默認(rèn)值
<service-name>.ribbon.NIWSServerListClassName ServerList的實(shí)現(xiàn)巢株,實(shí)現(xiàn)參考上述描述 ConfigurationBasedServerList
<service-name>.ribbon.listOfServers 服務(wù)列表 hostname:port 形式槐瑞,以逗號(hào)隔開(kāi) 當(dāng)ServerList實(shí)現(xiàn)基于配置時(shí)
<service-name>.ribbon.ServerListUpdaterClassName 服務(wù)列表更新策略實(shí)現(xiàn),參考上述描述 PollingServerListUpdater
<service-name>.ribbon.ServerListRefreshInterval 服務(wù)列表刷新頻率 基于定時(shí)任務(wù)拉取時(shí) 30s
3.2.2 ribbon的默認(rèn)實(shí)現(xiàn)

ribbon在默認(rèn)情況下阁苞,會(huì)采用如下的配置項(xiàng)随珠,即,采用基于配置的服務(wù)列表維護(hù)猬错,基于定時(shí)任務(wù)按時(shí)拉取服務(wù)列表的方式窗看,頻率為30s.

<service-name>.ribbon.NIWSServerListClassName=com.netflix.loadbalancer.ConfigurationBasedServerList
<service-name>.ribbon.listOfServers=<ip:port>,<ip:port>
<service-name>.ribbon.ServerListUpdaterClassName=com.netflix.loadbalancer.EurekaNotificationServerListUpdater
<service-name>.ribbon.ServerListRefreshInterval=30
### 更新線程池大小
DynamicServerListLoadBalancer.ThreadPoolSize=2
3.2.3 Spring Cloud集成下的配置

ribbon在默認(rèn)情況下,會(huì)采用如下的配置項(xiàng)倦炒,即显沈,采用基于配置的服務(wù)列表維護(hù),基于定時(shí)任務(wù)按時(shí)拉取服務(wù)列表的方式逢唤,頻率為30s.

<service-name>.ribbon.NIWSServerListClassName=com.netflix.loadbalancer.DiscoveryEnabledNIWSServerList
<service-name>.ribbon.ServerListUpdaterClassName=com.netflix.loadbalancer.EurekaNotificationServerListUpdater
### 更新線程池大小
EurekaNotificationServerListUpdater.ThreadPoolSize=2
###通知隊(duì)列接收大小
EurekaNotificationServerListUpdater.queueSize=1000

3.3 負(fù)載均衡器如何維護(hù)服務(wù)實(shí)例的狀態(tài)拉讯?

Ribbon負(fù)載均衡器將服務(wù)實(shí)例的狀態(tài)維護(hù)托交給PingerPingerStrategy鳖藕、IPing 來(lái)維護(hù)魔慷,具體交互模式如下所示:

image.png

/**
* 定義Ping服務(wù)狀態(tài)是否有效的策略,是序列化順序Ping著恩,還是并行的方式Ping院尔,在此過(guò)程中,應(yīng)當(dāng)保證相互不受影響
 *
 */
public interface IPingStrategy {

    boolean[] pingServers(IPing ping, Server[] servers);
}


/**
 * 定義如何Ping一個(gè)服務(wù),確保是否有效
 * @author stonse
 *
 */
public interface IPing {
    
    /**
     * Checks whether the given <code>Server</code> is "alive" i.e. should be
     * considered a candidate while loadbalancing
     * 校驗(yàn)是否存活
     */
    public boolean isAlive(Server server);
}

3.3.1 創(chuàng)建Ping定時(shí)任務(wù)

默認(rèn)情況下,負(fù)載均衡器內(nèi)部會(huì)創(chuàng)建一個(gè)周期性定時(shí)任務(wù)

控制參數(shù) 說(shuō)明 默認(rèn)值
<service-name>.ribbon.NFLoadBalancerPingInterval Ping定時(shí)任務(wù)周期 30 s
<service-name>.ribbon.NFLoadBalancerMaxTotalPingTime Ping超時(shí)時(shí)間 2s
<service-name>.ribbon.NFLoadBalancerPingClassName IPing實(shí)現(xiàn)類(lèi) DummyPing,直接返回true

默認(rèn)的PingStrategy仇穗,采用序列化的實(shí)現(xiàn)方式绳瘟,依次檢查服務(wù)實(shí)例是否可用:

/**
     * Default implementation for <c>IPingStrategy</c>, performs ping
     * serially, which may not be desirable, if your <c>IPing</c>
     * implementation is slow, or you have large number of servers.
     */
    private static class SerialPingStrategy implements IPingStrategy {

        @Override
        public boolean[] pingServers(IPing ping, Server[] servers) {
            int numCandidates = servers.length;
            boolean[] results = new boolean[numCandidates];

            logger.debug("LoadBalancer:  PingTask executing [{}] servers configured", numCandidates);

            for (int i = 0; i < numCandidates; i++) {
                results[i] = false; /* Default answer is DEAD. */
                try {
                    // 按序列依次檢查服務(wù)是否正常,并返回對(duì)應(yīng)的數(shù)組表示 
                    if (ping != null) {
                        results[i] = ping.isAlive(servers[i]);
                    }
                } catch (Exception e) {
                    logger.error("Exception while pinging Server: '{}'", servers[i], e);
                }
            }
            return results;
        }
    }
3.3.2 Ribbon默認(rèn)的IPing實(shí)現(xiàn):DummyPing

默認(rèn)的IPing實(shí)現(xiàn),直接返回為true:

public class DummyPing extends AbstractLoadBalancerPing {

    public DummyPing() {
    }

    public boolean isAlive(Server server) {
        return true;
    }

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
    }
}

除此之外,IPing還有如下幾種實(shí)現(xiàn):

image.png
3.3.3 Spring Cloud集成下的IPing實(shí)現(xiàn):NIWSDiscoveryPing

而和Spring Cloud 集成后,IPing的默認(rèn)實(shí)現(xiàn),是NIWSDiscoveryPing ,其使用Eureka作為服務(wù)注冊(cè)和發(fā)現(xiàn)例获,則校驗(yàn)服務(wù)是否可用,則通過(guò)監(jiān)聽(tīng)Eureka 服務(wù)更新來(lái)更新Ribbon的Server狀態(tài)曹仗,而具體的實(shí)現(xiàn)就是 NIWSDiscoveryPing:

/**
 * "Ping" Discovery Client
 * i.e. we dont do a real "ping". We just assume that the server is up if Discovery Client says so
 * @author stonse
 *
 */
public class NIWSDiscoveryPing extends AbstractLoadBalancerPing {
            
        BaseLoadBalancer lb = null; 
        

        public NIWSDiscoveryPing() {
        }
        
        public BaseLoadBalancer getLb() {
            return lb;
        }

        /**
         * Non IPing interface method - only set this if you care about the "newServers Feature"
         * @param lb
         */
        public void setLb(BaseLoadBalancer lb) {
            this.lb = lb;
        }

        public boolean isAlive(Server server) {
            boolean isAlive = true;
         //取 Eureka Server 的Instance實(shí)例狀態(tài)作為Ribbon服務(wù)的狀態(tài)   
                 if (server!=null && server instanceof DiscoveryEnabledServer){
                DiscoveryEnabledServer dServer = (DiscoveryEnabledServer)server;                
                InstanceInfo instanceInfo = dServer.getInstanceInfo();
                if (instanceInfo!=null){                    
                    InstanceStatus status = instanceInfo.getStatus();
                    if (status!=null){
                        isAlive = status.equals(InstanceStatus.UP);
                    }
                }
            }
            return isAlive;
        }

        @Override
        public void initWithNiwsConfig(
                IClientConfig clientConfig) {
        }
        
}

Spring Cloud下的默認(rèn)實(shí)現(xiàn)入口:

    @Bean
    @ConditionalOnMissingBean
    public IPing ribbonPing(IClientConfig config) {
        if (this.propertiesFactory.isSet(IPing.class, serviceId)) {
            return this.propertiesFactory.get(IPing.class, config, serviceId);
        }
        NIWSDiscoveryPing ping = new NIWSDiscoveryPing();
        ping.initWithNiwsConfig(config);
        return ping;
    }

3.4 如何從服務(wù)列表中挑選一個(gè)合適的服務(wù)實(shí)例榨汤?

3.4.1 服務(wù)實(shí)例容器:ServerList的維護(hù)

負(fù)載均衡器通過(guò) ServerList來(lái)統(tǒng)一維護(hù)服務(wù)實(shí)例,具體模式如下:


image.png

基礎(chǔ)的接口定義非常簡(jiǎn)單:

/**
 * Interface that defines the methods sed to obtain the List of Servers
 * @author stonse
 *
 * @param <T>
 */
public interface ServerList<T extends Server> {
    //獲取初始化的服務(wù)列表
    public List<T> getInitialListOfServers();
    
    /**
     * Return updated list of servers. This is called say every 30 secs
     * (configurable) by the Loadbalancer's Ping cycle
     * 獲取更新后的的服務(wù)列表
     */
    public List<T> getUpdatedListOfServers();   

}

在Ribbon的實(shí)現(xiàn)中整葡,在ServerList中件余,維護(hù)著Server的實(shí)例,并返回最新的List<Server>集合,供LoadBalancer使用

image.png

ServerList<Server>的職能
負(fù)責(zé)維護(hù)服務(wù)實(shí)例啼器,并使用ServerListFilter過(guò)濾器過(guò)濾出符合要求的服務(wù)實(shí)例列表List<Server>

3.4.2 服務(wù)實(shí)例列表過(guò)濾器ServerListFilter

服務(wù)實(shí)例列表過(guò)濾器ServerListFilter的職能很簡(jiǎn)單:
傳入一個(gè)服務(wù)實(shí)例列表旬渠,過(guò)濾出滿(mǎn)足過(guò)濾條件的服務(wù)列表

public interface ServerListFilter<T extends Server> {
    public List<T> getFilteredListOfServers(List<T> servers);
}
3.4.2.1 Ribbon 的默認(rèn)ServerListFilter實(shí)現(xiàn):ZoneAffinityServerListFilter

Ribbon默認(rèn)采取了區(qū)域優(yōu)先的過(guò)濾策略,即當(dāng)Server列表中端壳,過(guò)濾出和當(dāng)前實(shí)例所在的區(qū)域(zone)一致的server列表
與此相關(guān)聯(lián)的告丢,Ribbon有兩個(gè)相關(guān)得配置參數(shù):

控制參數(shù) 說(shuō)明 默認(rèn)值
<service-name>.ribbon.EnableZoneAffinity 是否開(kāi)啟區(qū)域優(yōu)先 false
<service-name>.ribbon.EnableZoneExclusivity 是否采取區(qū)域排他性,即只返回和當(dāng)前Zone一致的服務(wù)實(shí)例 false
<service-name>.ribbon.zoneAffinity.maxLoadPerServer 每個(gè)Server上的最大活躍請(qǐng)求負(fù)載數(shù)閾值 0.6
<service-name>.ribbon.zoneAffinity.maxBlackOutServesrPercentage 最大斷路過(guò)濾的百分比 0.8
<service-name>.ribbon.zoneAffinity.minAvailableServers 最少可用的服務(wù)實(shí)例閾值 2

其核心實(shí)現(xiàn)如下所示:

public class ZoneAffinityServerListFilter<T extends Server> extends
        AbstractServerListFilter<T> implements IClientConfigAware {

     @Override
    public List<T> getFilteredListOfServers(List<T> servers) {
          //zone非空损谦,并且開(kāi)啟了區(qū)域優(yōu)先岖免,并且服務(wù)實(shí)例數(shù)量不為空
          if (zone != null && (zoneAffinity || zoneExclusive) && servers !=null && servers.size() > 0){
           //基于斷言過(guò)濾服務(wù)列表 
           List<T> filteredServers = Lists.newArrayList(Iterables.filter(
                    servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
            //如果允許區(qū)域優(yōu)先,則返回過(guò)濾列表
            if (shouldEnableZoneAffinity(filteredServers)) {
                return filteredServers;
            } else if (zoneAffinity) {
                overrideCounter.increment();
            }
        }
        return servers;
    }
    // 判斷是否應(yīng)該使用區(qū)域優(yōu)先過(guò)濾條件
    private boolean shouldEnableZoneAffinity(List<T> filtered) {    
        if (!zoneAffinity && !zoneExclusive) {
            return false;
        }
        if (zoneExclusive) {
            return true;
        }
       // 獲取統(tǒng)計(jì)信息
        LoadBalancerStats stats = getLoadBalancerStats();
        if (stats == null) {
            return zoneAffinity;
        } else {
            logger.debug("Determining if zone affinity should be enabled with given server list: {}", filtered);
            //獲取區(qū)域Server快照照捡,包含統(tǒng)計(jì)數(shù)據(jù)
            ZoneSnapshot snapshot = stats.getZoneSnapshot(filtered);
            //平均負(fù)載颅湘,此負(fù)載的意思是,當(dāng)前所有的Server中栗精,平均每臺(tái)機(jī)器上的活躍請(qǐng)求數(shù)
           double loadPerServer = snapshot.getLoadPerServer();
            int instanceCount = snapshot.getInstanceCount();            
            int circuitBreakerTrippedCount = snapshot.getCircuitTrippedCount();
            // 1. 如果Server斷路的比例超過(guò)了設(shè)置的上限(默認(rèn)`0.8`)
            // 2. 或者當(dāng)前負(fù)載超過(guò)了設(shè)置的負(fù)載上限
            // 3. 如果可用的服務(wù)小于設(shè)置的服務(wù)上限`默認(rèn)為2`
            if (((double) circuitBreakerTrippedCount) / instanceCount >= blackOutServerPercentageThreshold.get() 
                    || loadPerServer >= activeReqeustsPerServerThreshold.get()
                    || (instanceCount - circuitBreakerTrippedCount) < availableServersThreshold.get()) {
                logger.debug("zoneAffinity is overriden. blackOutServerPercentage: {}, activeReqeustsPerServer: {}, availableServers: {}", 
                        new Object[] {(double) circuitBreakerTrippedCount / instanceCount,  loadPerServer, instanceCount - circuitBreakerTrippedCount});
                return false;
            } else {
                return true;
            }
            
        }
    }
        
   

}

具體判斷流程如下所示

image.png

3.4.2.2 Ribbon 的ServerListFilter實(shí)現(xiàn)2:ZonePreferenceServerListFilter

ZonePreferenceServerListFilter 集成自 ZoneAffinityServerListFilter闯参,在此基礎(chǔ)上做了拓展,在 ZoneAffinityServerListFilter返回結(jié)果的基礎(chǔ)上悲立,再過(guò)濾出和本地服務(wù)相同區(qū)域(zone)的服務(wù)列表鹿寨。

核心邏輯-什么時(shí)候起作用?
當(dāng)指定了當(dāng)前服務(wù)的所在Zone薪夕,并且 ZoneAffinityServerListFilter 沒(méi)有起到過(guò)濾效果時(shí)脚草,ZonePreferenceServerListFilter會(huì)返回當(dāng)前Zone的Server列表。

public class ZonePreferenceServerListFilter extends ZoneAffinityServerListFilter<Server> {

    private String zone;

    @Override
    public void initWithNiwsConfig(IClientConfig niwsClientConfig) {
        super.initWithNiwsConfig(niwsClientConfig);
        if (ConfigurationManager.getDeploymentContext() != null) {
            this.zone = ConfigurationManager.getDeploymentContext().getValue(
                    ContextKey.zone);
        }
    }

    @Override
    public List<Server> getFilteredListOfServers(List<Server> servers) {
        //父類(lèi)的基礎(chǔ)上原献,獲取過(guò)濾結(jié)果
                List<Server> output = super.getFilteredListOfServers(servers);
        //沒(méi)有起到過(guò)濾效果馏慨,則進(jìn)行區(qū)域優(yōu)先過(guò)濾
                if (this.zone != null && output.size() == servers.size()) {
            List<Server> local = new ArrayList<>();
            for (Server server : output) {
                if (this.zone.equalsIgnoreCase(server.getZone())) {
                    local.add(server);
                }
            }
            if (!local.isEmpty()) {
                return local;
            }
        }
        return output;
    }

    public String getZone() {
        return zone;
    }

    public void setZone(String zone) {
        this.zone = zone;
    }
3.4.2.3 Ribbon 的ServerListFilter實(shí)現(xiàn)3:ServerListSubsetFilter

這個(gè)過(guò)濾器作用于當(dāng)Server數(shù)量列表特別龐大時(shí)(比如有上百個(gè)Server實(shí)例),這時(shí)嚼贡,長(zhǎng)時(shí)間保持Http鏈接也不太合適熏纯,可以適當(dāng)?shù)乇A舨糠址?wù),舍棄其中一些服務(wù)粤策,這樣可使釋放沒(méi)必要的鏈接。
此過(guò)濾器也是繼承自 ZoneAffinityServerListFilter,在此基礎(chǔ)上做了拓展误窖,在實(shí)際使用中不太常見(jiàn)叮盘,這個(gè)后續(xù)再展開(kāi)介紹,暫且不表霹俺。

3.4.3 LoadBalancer選擇服務(wù)實(shí)例 的流程

LoadBalancer的核心功能是根據(jù)負(fù)載情況柔吼,從服務(wù)列表中挑選最合適的服務(wù)實(shí)例。LoadBalancer內(nèi)部采用了如下圖所示的組件完成:

image.png

LoadBalancer 選擇服務(wù)實(shí)例的流程

  1. 通過(guò)ServerList獲取當(dāng)前可用的服務(wù)實(shí)例列表丙唧;
  2. 通過(guò)ServerListFilter將步驟1 得到的服務(wù)列表進(jìn)行一次過(guò)濾愈魏,返回滿(mǎn)足過(guò)濾器條件的服務(wù)實(shí)例列表;
  3. 應(yīng)用Rule規(guī)則,結(jié)合服務(wù)實(shí)例的統(tǒng)計(jì)信息,返回滿(mǎn)足規(guī)則的某一個(gè)服務(wù)實(shí)例培漏;

通過(guò)上述的流程可以看到溪厘,實(shí)際上,在服務(wù)實(shí)例列表選擇的過(guò)程中牌柄,有兩次過(guò)濾的機(jī)會(huì):第一次是首先通過(guò)ServerListFilter過(guò)濾器畸悬,另外一次是用過(guò)IRule 的選擇規(guī)則進(jìn)行過(guò)濾

通過(guò)ServerListFilter進(jìn)行服務(wù)實(shí)例過(guò)濾的策略上面已經(jīng)介紹得比較詳細(xì)了,接下來(lái)將介紹Rule是如何從一堆服務(wù)列表中選擇服務(wù)的。
在介紹Rule之前珊佣,需要介紹一個(gè)概念:Server統(tǒng)計(jì)信息

當(dāng)LoadBalancer在選擇合適的Server提供給應(yīng)用后蹋宦,應(yīng)用會(huì)向該Server發(fā)送服務(wù)請(qǐng)求,則在請(qǐng)求的過(guò)程中咒锻,應(yīng)用會(huì)根據(jù)請(qǐng)求的相應(yīng)時(shí)間或者網(wǎng)絡(luò)連接情況等進(jìn)行統(tǒng)計(jì)冷冗;當(dāng)應(yīng)用后續(xù)從LoadBalancer選擇合適的Server時(shí),LoadBalancer 會(huì)根據(jù)每個(gè)服務(wù)的統(tǒng)計(jì)信息惑艇,結(jié)合Rule來(lái)判定哪個(gè)服務(wù)是最合適的贾惦。

3.4.3.1 負(fù)載均衡器LoaderBalancer 都統(tǒng)計(jì)了哪些關(guān)于服務(wù)實(shí)例Server相關(guān)的信息?
ServerStats 說(shuō)明 類(lèi)型 默認(rèn)值
zone 當(dāng)前服務(wù)所屬的可用區(qū) 配置 可通過(guò) eureka.instance.meta.zone 指定
totalRequests 總請(qǐng)求數(shù)量敦捧,client每次調(diào)用须板,數(shù)量會(huì)遞增 實(shí)時(shí) 0
activeRequestsCountTimeout 活動(dòng)請(qǐng)求計(jì)數(shù)時(shí)間窗niws.loadbalancer.serverStats.activeRequestsCount.effectiveWindowSeconds,如果時(shí)間窗范圍之內(nèi)沒(méi)有activeRequestsCount值的改變,則activeRequestsCounts初始化為0 配置 60*10(seconds)
successiveConnectionFailureCount 連續(xù)連接失敗計(jì)數(shù) 實(shí)時(shí)
connectionFailureThreshold 連接失敗閾值通過(guò)屬性niws.loadbalancer.default.connectionFailureCountThreshold 進(jìn)行配置 配置 3
circuitTrippedTimeoutFactor 斷路器超時(shí)因子,niws.loadbalancer.default.circuitTripTimeoutFactorSeconds 配置 10(seconds)
maxCircuitTrippedTimeout 最大斷路器超時(shí)秒數(shù)兢卵,niws.loadbalancer.default.circuitTripMaxTimeoutSeconds 配置 30(seconds)
totalCircuitBreakerBlackOutPeriod 累計(jì)斷路器終端時(shí)間區(qū)間 實(shí)時(shí) milliseconds
lastAccessedTimestamp 最后連接時(shí)間 實(shí)時(shí)
lastConnectionFailedTimestamp 最后連接失敗時(shí)間 實(shí)時(shí)
firstConnectionTimestamp 首次連接時(shí)間 實(shí)時(shí)
activeRequestsCount 當(dāng)前活躍的連接數(shù) 實(shí)時(shí)
failureCountSlidingWindowInterval 失敗次數(shù)統(tǒng)計(jì)時(shí)間窗 配置 1000(ms)
serverFailureCounts 當(dāng)前時(shí)間窗內(nèi)連接失敗的數(shù)量 實(shí)時(shí)
responseTimeDist.mean 請(qǐng)求平均響應(yīng)時(shí)間 實(shí)時(shí) (ms)
responseTimeDist.max 請(qǐng)求最大響應(yīng)時(shí)間 實(shí)時(shí) (ms)
responseTimeDist.minimum 請(qǐng)求最小響應(yīng)時(shí)間 實(shí)時(shí) (ms)
responseTimeDist.minimum 請(qǐng)求最小響應(yīng)時(shí)間 實(shí)時(shí) (ms)
responseTimeDist.stddev 請(qǐng)求響應(yīng)時(shí)間標(biāo)準(zhǔn)差 實(shí)時(shí) (ms)
dataDist.sampleSize QoS服務(wù)質(zhì)量采集點(diǎn)大小 實(shí)時(shí)
dataDist.timestamp QoS服務(wù)質(zhì)量最后計(jì)算時(shí)間點(diǎn) 實(shí)時(shí)
dataDist.timestampMillis QoS服務(wù)質(zhì)量最后計(jì)算時(shí)間點(diǎn)毫秒數(shù)习瑰,自1970.1.1開(kāi)始 實(shí)時(shí)
dataDist.mean QoS 最近的時(shí)間窗內(nèi)的請(qǐng)求平均響應(yīng)時(shí)間 實(shí)時(shí)
dataDist.10thPercentile QoS 10% 處理請(qǐng)求的時(shí)間 實(shí)時(shí) ms
dataDist.25thPercentile QoS 25% 處理請(qǐng)求的時(shí)間 實(shí)時(shí) ms
dataDist.50thPercentile QoS 50% 處理請(qǐng)求的時(shí)間 實(shí)時(shí) ms
dataDist.75thPercentile QoS 75% 處理請(qǐng)求的時(shí)間 實(shí)時(shí) ms
dataDist.95thPercentile QoS 95% 處理請(qǐng)求的時(shí)間 實(shí)時(shí) ms
dataDist.99thPercentile QoS 99% 處理請(qǐng)求的時(shí)間 實(shí)時(shí) ms
dataDist.99.5thPercentile QoS 前99.5% 處理請(qǐng)求的時(shí)間 實(shí)時(shí) ms
3.4.3.2 服務(wù)斷路器的工作原理

當(dāng)有某個(gè)服務(wù)存在多個(gè)實(shí)例時(shí),在請(qǐng)求的過(guò)程中秽荤,負(fù)載均衡器會(huì)統(tǒng)計(jì)每次請(qǐng)求的情況(請(qǐng)求相應(yīng)時(shí)間甜奄,是否發(fā)生網(wǎng)絡(luò)異常等),當(dāng)出現(xiàn)了請(qǐng)求出現(xiàn)累計(jì)重試時(shí)窃款,負(fù)載均衡器會(huì)標(biāo)識(shí)當(dāng)前服務(wù)實(shí)例课兄,設(shè)置當(dāng)前服務(wù)實(shí)例的斷路的時(shí)間區(qū)間,在此區(qū)間內(nèi)晨继,當(dāng)請(qǐng)求過(guò)來(lái)時(shí)烟阐,負(fù)載均衡器會(huì)將此服務(wù)實(shí)例可用服務(wù)實(shí)例列表中暫時(shí)剔除,優(yōu)先選擇其他服務(wù)實(shí)例紊扬。

相關(guān)統(tǒng)計(jì)信息如下

ServerStats 說(shuō)明 類(lèi)型 默認(rèn)值
successiveConnectionFailureCount 連續(xù)連接失敗計(jì)數(shù) 實(shí)時(shí)
connectionFailureThreshold 連接失敗閾值通過(guò)屬性niws.loadbalancer.default.connectionFailureCountThreshold 進(jìn)行配置蜒茄,當(dāng)successiveConnectionFailureCount 超過(guò)了此限制時(shí),將計(jì)算熔斷時(shí)間 配置 3
circuitTrippedTimeoutFactor 斷路器超時(shí)因子,niws.loadbalancer.default.circuitTripTimeoutFactorSeconds 配置 10(seconds)
maxCircuitTrippedTimeout 最大斷路器超時(shí)秒數(shù)餐屎,niws.loadbalancer.default.circuitTripMaxTimeoutSeconds 配置 30(seconds)
totalCircuitBreakerBlackOutPeriod 累計(jì)斷路器終端時(shí)間區(qū)間 實(shí)時(shí) milliseconds
lastAccessedTimestamp 最后連接時(shí)間 實(shí)時(shí)
lastConnectionFailedTimestamp 最后連接失敗時(shí)間 實(shí)時(shí)
firstConnectionTimestamp 首次連接時(shí)間 實(shí)時(shí)
@Monitor(name="CircuitBreakerTripped", type = DataSourceType.INFORMATIONAL)    
    public boolean isCircuitBreakerTripped() {
        return isCircuitBreakerTripped(System.currentTimeMillis());
    }
    
    public boolean isCircuitBreakerTripped(long currentTime) {
        //斷路器熔斷的時(shí)間戳
        long circuitBreakerTimeout = getCircuitBreakerTimeout();
        if (circuitBreakerTimeout <= 0) {
            return false;
        }
        return circuitBreakerTimeout > currentTime;//還在熔斷區(qū)間內(nèi)檀葛,則返回熔斷結(jié)果
    }
   //獲取熔斷超時(shí)時(shí)間
   private long getCircuitBreakerTimeout() {
        long blackOutPeriod = getCircuitBreakerBlackoutPeriod();
        if (blackOutPeriod <= 0) {
            return 0;
        }
        return lastConnectionFailedTimestamp + blackOutPeriod;
    }
    //返回中斷毫秒數(shù)
    private long getCircuitBreakerBlackoutPeriod() {
        int failureCount = successiveConnectionFailureCount.get();
        int threshold = connectionFailureThreshold.get();
       // 連續(xù)失敗,但是尚未超過(guò)上限腹缩,則服務(wù)中斷周期為 0 屿聋,表示可用 
       if (failureCount < threshold) {
            return 0;
        }
        //當(dāng)鏈接失敗超過(guò)閾值時(shí)空扎,將進(jìn)行熔斷,熔斷的時(shí)間間隔為:
        int diff = (failureCount - threshold) > 16 ? 16 : (failureCount - threshold);
        int blackOutSeconds = (1 << diff) * circuitTrippedTimeoutFactor.get();
        if (blackOutSeconds > maxCircuitTrippedTimeout.get()) {
            blackOutSeconds = maxCircuitTrippedTimeout.get();
        }
        return blackOutSeconds * 1000L;
    }

熔斷時(shí)間的計(jì)算

  1. 計(jì)算累計(jì)連接失敗計(jì)數(shù)successiveConnectionFailureCount 是否超過(guò) 鏈接失敗閾值connectionFailureThreshold润讥。如果 successiveConnectionFailureCount < connectionFailureThreshold,即尚未超過(guò)限額转锈,則熔斷時(shí)間為 0 ;反之象对,如果超過(guò)限額黑忱,則進(jìn)行步驟2的計(jì)算;
  2. 計(jì)算失敗基數(shù)勒魔,最大不得超過(guò) 16:diff = (failureCount - threshold) > 16 ? 16 : (failureCount - threshold)
  3. 根據(jù)超時(shí)因子circuitTrippedTimeoutFactor計(jì)算超時(shí)時(shí)間: int blackOutSeconds = (1 << diff) * circuitTrippedTimeoutFactor.get();
  4. 超時(shí)時(shí)間不得超過(guò)最大超時(shí)時(shí)間`maxCircuitTrippedTimeout 上線甫煞,

當(dāng)有鏈接失敗情況出現(xiàn)斷路邏輯時(shí),將會(huì)最多:1<<16 * 10 =320s冠绢、最少1<<1 * 10 =100s 的請(qǐng)求熔斷時(shí)間抚吠,再此期間內(nèi),此Server將會(huì)被忽略弟胀。
即:
熔斷時(shí)間最大值:1<<16 * 10 =320s
熔斷時(shí)間最小值:1<<1 * 10 =100s

熔斷統(tǒng)計(jì)何時(shí)清空楷力?

熔斷的統(tǒng)計(jì)有自己的清除策略,當(dāng)如下幾種場(chǎng)景存在時(shí)孵户,熔斷統(tǒng)計(jì)會(huì)清空歸零:

  1. 當(dāng)請(qǐng)求時(shí)萧朝,發(fā)生的異常不是斷路攔截類(lèi)的異常(Exception)時(shí)(至于如何節(jié)點(diǎn)是否是斷路攔截類(lèi)異常,可以自定義)
  2. 當(dāng)請(qǐng)求未發(fā)生異常夏哭,切且有結(jié)果返回時(shí)
3.4.3.3 定義IRule检柬,從服務(wù)實(shí)例列表中,選擇最合適的Server實(shí)例
image.png

由上圖可見(jiàn)竖配,IRule會(huì)從服務(wù)列表中何址,根據(jù)自身定義的規(guī)則,返回最合適的Server實(shí)例进胯,其接口定義如下:

public interface IRule{
    /*
     * choose one alive server from lb.allServers or
     * lb.upServers according to key
     * 
     * @return choosen Server object. NULL is returned if none
     *  server is available 
     */

    public Server choose(Object key);
    
    public void setLoadBalancer(ILoadBalancer lb);
    
    public ILoadBalancer getLoadBalancer();    
}

Ribbon定義了一些常見(jiàn)的規(guī)則

實(shí)現(xiàn) 描述 備注
RoundRobinRule 通過(guò)輪詢(xún)的方式用爪,選擇過(guò)程會(huì)有最多10次的重試機(jī)制
RandomRule 隨機(jī)方式,從列表中隨機(jī)挑選一個(gè)服務(wù)
ZoneAvoidanceRule 基于ZoneAvoidancePredicate斷言和AvailabilityPredicate斷言的規(guī)則胁镐。ZoneAvoidancePredicate計(jì)算出哪個(gè)Zone的服務(wù)最差偎血,然后將此Zone的服務(wù)從服務(wù)列表中剔除掉;而AvaliabitiyPredicate是過(guò)濾掉正處于熔斷狀態(tài)的服務(wù)希停;上述兩個(gè)斷言過(guò)濾出來(lái)的結(jié)果后烁巫,再通過(guò)RoundRobin輪詢(xún)的方式從列表中挑選一個(gè)服務(wù)
BestAvailableRule 最優(yōu)匹配規(guī)則:從服務(wù)列表中給挑選出并發(fā)數(shù)最少的Server
RetryRule 采用了裝飾模式,為Rule提供了重試機(jī)制
WeightedResponseTimeRule 基于請(qǐng)求響應(yīng)時(shí)間加權(quán)計(jì)算的規(guī)則宠能,如果此規(guī)則沒(méi)有生效,將采用 RoundRobinRule的的策略選擇服務(wù)實(shí)例
image.png
3.4.3.3.1 RoundRobinRule 的實(shí)現(xiàn)
public class RoundRobinRule extends AbstractLoadBalancerRule {

    private AtomicInteger nextServerCyclicCounter;
    private static final boolean AVAILABLE_ONLY_SERVERS = true;
    private static final boolean ALL_SERVERS = false;

    private static Logger log = LoggerFactory.getLogger(RoundRobinRule.class);

    public RoundRobinRule() {
        nextServerCyclicCounter = new AtomicInteger(0);
    }

    public RoundRobinRule(ILoadBalancer lb) {
        this();
        setLoadBalancer(lb);
    }

    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        }

        Server server = null;
        int count = 0;
        //10次重試機(jī)制
        while (server == null && count++ < 10) {
            List<Server> reachableServers = lb.getReachableServers();
            List<Server> allServers = lb.getAllServers();
            int upCount = reachableServers.size();
            int serverCount = allServers.size();

            if ((upCount == 0) || (serverCount == 0)) {
                log.warn("No up servers available from load balancer: " + lb);
                return null;
            }
            // 生成輪詢(xún)數(shù)據(jù)
            int nextServerIndex = incrementAndGetModulo(serverCount);
            server = allServers.get(nextServerIndex);

            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }

            if (server.isAlive() && (server.isReadyToServe())) {
                return (server);
            }

            // Next.
            server = null;
        }

        if (count >= 10) {
            log.warn("No available alive servers after 10 tries from load balancer: "
                    + lb);
        }
        return server;
    }

    /**
     * Inspired by the implementation of {@link AtomicInteger#incrementAndGet()}.
     *
     * @param modulo The modulo to bound the value of the counter.
     * @return The next value.
     */
    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextServerCyclicCounter.get();
            int next = (current + 1) % modulo;
            if (nextServerCyclicCounter.compareAndSet(current, next))
                return next;
        }
    }

    @Override
    public Server choose(Object key) {
        return choose(getLoadBalancer(), key);
    }
}
3.4.3.3.2 ZoneAvoidanceRule的實(shí)現(xiàn)

ZoneAvoidanceRule的處理思路:

  1. ZoneAvoidancePredicate 計(jì)算出哪個(gè)Zone的服務(wù)最差磁餐,然后將此Zone的服務(wù)從服務(wù)列表中剔除掉违崇;
  2. AvailabilityPredicate 將處于熔斷狀態(tài)的服務(wù)剔除掉阿弃;
  3. 將上述兩步驟過(guò)濾后的服務(wù)通過(guò)RoundRobinRule挑選一個(gè)服務(wù)實(shí)例返回

ZoneAvoidancePredicate 剔除最差的Zone的過(guò)程:

public static Set<String> getAvailableZones(
            Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
            double triggeringBlackoutPercentage) {
        if (snapshot.isEmpty()) {
            return null;
        }
        Set<String> availableZones = new HashSet<String>(snapshot.keySet());
        if (availableZones.size() == 1) {
            return availableZones;
        }
        Set<String> worstZones = new HashSet<String>();
        double maxLoadPerServer = 0;
        boolean limitedZoneAvailability = false;

        for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
            String zone = zoneEntry.getKey();
            ZoneSnapshot zoneSnapshot = zoneEntry.getValue();
            int instanceCount = zoneSnapshot.getInstanceCount();
            if (instanceCount == 0) {
                availableZones.remove(zone);
                limitedZoneAvailability = true;
            } else {
                double loadPerServer = zoneSnapshot.getLoadPerServer();
                   //如果負(fù)載超過(guò)限額,則將用可用區(qū)中剔除出去
                  if (((double) zoneSnapshot.getCircuitTrippedCount())
                        / instanceCount >= triggeringBlackoutPercentage
                        || loadPerServer < 0) {
                    availableZones.remove(zone);
                    limitedZoneAvailability = true;
                } else {
                    //計(jì)算最差的Zone區(qū)域
                    if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
                        // they are the same considering double calculation
                        // round error
                        worstZones.add(zone);
                    } else if (loadPerServer > maxLoadPerServer) {
                        maxLoadPerServer = loadPerServer;
                        worstZones.clear();
                        worstZones.add(zone);
                    }
                }
            }
        }
        // 如果最大負(fù)載沒(méi)有超過(guò)上限羞延,則返回所有可用分區(qū)
        if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
            // zone override is not needed here
            return availableZones;
        }
        // 從最差的可用分區(qū)中隨機(jī)挑選一個(gè)剔除渣淳,這么做是保證服務(wù)的高可用
        String zoneToAvoid = randomChooseZone(snapshot, worstZones);
        if (zoneToAvoid != null) {
            availableZones.remove(zoneToAvoid);
        }
        return availableZones;

    }

四. Ribbon的配置參數(shù)

控制參數(shù) 說(shuō)明 默認(rèn)值
<service-name>.ribbon.NFLoadBalancerPingInterval Ping定時(shí)任務(wù)周期 30 s
<service-name>.ribbon.NFLoadBalancerMaxTotalPingTime Ping超時(shí)時(shí)間 2s
<service-name>.ribbon.NFLoadBalancerRuleClassName IRule實(shí)現(xiàn)類(lèi) RoundRobinRule,基于輪詢(xún)調(diào)度算法規(guī)則選擇服務(wù)實(shí)例
<service-name>.ribbon.NFLoadBalancerPingClassName IPing實(shí)現(xiàn)類(lèi) DummyPing,直接返回true
<service-name>.ribbon.NFLoadBalancerClassName 負(fù)載均衡器實(shí)現(xiàn)類(lèi) 2s
<service-name>.ribbon.NIWSServerListClassName ServerList實(shí)現(xiàn)類(lèi) ConfigurationBasedServerList,基于配置的服務(wù)列表
<service-name>.ribbon.ServerListUpdaterClassName 服務(wù)列表更新類(lèi) PollingServerListUpdater伴箩,
<service-name>.ribbon.NIWSServerListFilterClassName 服務(wù)實(shí)例過(guò)濾器 2s
<service-name>.ribbon.ServerListRefreshInterval 服務(wù)列表刷新頻率 2s
<service-name>.ribbon.NFLoadBalancerClassName 自定義負(fù)載均衡器實(shí)現(xiàn)類(lèi) 2s
<service-name>.ribbon.NFLoadBalancerClassName 自定義負(fù)載均衡器實(shí)現(xiàn)類(lèi) 2s
<service-name>.ribbon.NFLoadBalancerClassName 自定義負(fù)載均衡器實(shí)現(xiàn)類(lèi) 2s

五. 結(jié)語(yǔ)

Ribbon是Spring Cloud框架中相當(dāng)核心的模塊入愧,負(fù)責(zé)著服務(wù)負(fù)載調(diào)用,Ribbon也可以脫離SpringCloud單獨(dú)使用嗤谚。
另外Ribbon是客戶(hù)端的負(fù)載均衡框架棺蛛,即每個(gè)客戶(hù)端上,獨(dú)立維護(hù)著自身的調(diào)用信息統(tǒng)計(jì)巩步,相互隔離旁赊;也就是說(shuō):Ribbon的負(fù)載均衡表現(xiàn)在各個(gè)機(jī)器上變現(xiàn)并不完全一致
Ribbon 也是整個(gè)組件框架中最復(fù)雜的一環(huán),控制流程上為了保證服務(wù)的高可用性椅野,有很多比較細(xì)節(jié)的參數(shù)控制终畅,在使用的過(guò)程中,需要深入理清每個(gè)環(huán)節(jié)的處理機(jī)制竟闪,這樣在問(wèn)題定位上會(huì)高效很多离福。


亦山札記,聚焦微服務(wù)炼蛤、中間件
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末妖爷,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子鲸湃,更是在濱河造成了極大的恐慌赠涮,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,482評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件暗挑,死亡現(xiàn)場(chǎng)離奇詭異笋除,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)炸裆,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,377評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén)垃它,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人烹看,你說(shuō)我怎么就攤上這事国拇。” “怎么了惯殊?”我有些...
    開(kāi)封第一講書(shū)人閱讀 152,762評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵酱吝,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我土思,道長(zhǎng)务热,這世上最難降的妖魔是什么忆嗜? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,273評(píng)論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮崎岂,結(jié)果婚禮上捆毫,老公的妹妹穿的比我還像新娘。我一直安慰自己冲甘,他們只是感情好绩卤,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,289評(píng)論 5 373
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著江醇,像睡著了一般濒憋。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上嫁审,一...
    開(kāi)封第一講書(shū)人閱讀 49,046評(píng)論 1 285
  • 那天跋炕,我揣著相機(jī)與錄音,去河邊找鬼律适。 笑死辐烂,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的捂贿。 我是一名探鬼主播纠修,決...
    沈念sama閱讀 38,351評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼厂僧!你這毒婦竟也來(lái)了扣草?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 36,988評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤颜屠,失蹤者是張志新(化名)和其女友劉穎辰妙,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體甫窟,經(jīng)...
    沈念sama閱讀 43,476評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡密浑,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,948評(píng)論 2 324
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了粗井。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片尔破。...
    茶點(diǎn)故事閱讀 38,064評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖浇衬,靈堂內(nèi)的尸體忽然破棺而出懒构,到底是詐尸還是另有隱情,我是刑警寧澤耘擂,帶...
    沈念sama閱讀 33,712評(píng)論 4 323
  • 正文 年R本政府宣布胆剧,位于F島的核電站,受9級(jí)特大地震影響醉冤,放射性物質(zhì)發(fā)生泄漏赞赖。R本人自食惡果不足惜滚朵,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,261評(píng)論 3 307
  • 文/蒙蒙 一冤灾、第九天 我趴在偏房一處隱蔽的房頂上張望前域。 院中可真熱鬧,春花似錦韵吨、人聲如沸匿垄。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,264評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)椿疗。三九已至,卻和暖如春糠悼,著一層夾襖步出監(jiān)牢的瞬間届榄,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,486評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工倔喂, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留铝条,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,511評(píng)論 2 354
  • 正文 我出身青樓席噩,卻偏偏與公主長(zhǎng)得像班缰,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子悼枢,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,802評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容