thrift+zk實(shí)現(xiàn)服務(wù)的注冊(cè)施禾,發(fā)現(xiàn)脚线,調(diào)用

本文基于實(shí)際生產(chǎn)環(huán)境中的Thrift+zookeeper實(shí)現(xiàn)的rpc調(diào)用總結(jié),大致有以下幾個(gè)部分:
1: 服務(wù)端將服務(wù)注冊(cè)在zk中
1.1 解析服務(wù)端的網(wǎng)卡IP弥搞;
1.2 獲取zookeeper客戶端對(duì)象邮绿;
1.3 實(shí)現(xiàn)服務(wù)接口的注冊(cè);
2: 基于zookeeper實(shí)現(xiàn)服務(wù)接口的自動(dòng)發(fā)現(xiàn)
3: 實(shí)現(xiàn)客戶端連接池和客戶端通過(guò)代理調(diào)用服務(wù)

一 服務(wù)端將服務(wù)注冊(cè)在zk中

調(diào)用圖:
服務(wù)注冊(cè)流程圖.png

代碼展示

1.1解析thrift-server端IP地址,用于注冊(cè)服務(wù)

接口
public interface ThriftServerIpResolve {
    // 獲取服務(wù)所在機(jī)器的Ip
    String getServerIp() throws Exception;

    String getServerIp(boolean publicIpOnly) throws Exception;
    
    void reset();
    
    //當(dāng)IP變更時(shí),將會(huì)調(diào)用reset方法
    static interface IpRestCalllBack{
        public void rest(String newIp);
    }
}

實(shí)現(xiàn)
public class ThriftServerIpLocalNetworkResolve implements ThriftServerIpResolve {
    
    private Logger logger = LoggerFactory.getLogger(getClass());

    //緩存
    private String serverIp;
    
    public void setServerIp(String serverIp) {
        this.serverIp = serverIp;
    }

    @Override
    public String getServerIp() {
        return getServerIp(false);
    }

    @Override
    public String getServerIp(boolean publicIpOnly) {
        if (serverIp != null) {
            return serverIp;
        }
        // 一個(gè)主機(jī)有多個(gè)網(wǎng)絡(luò)接口
        try {
            Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();
            while (netInterfaces.hasMoreElements()) {
                NetworkInterface netInterface = netInterfaces.nextElement();
                // 每個(gè)網(wǎng)絡(luò)接口,都會(huì)有多個(gè)"網(wǎng)絡(luò)地址",比如一定會(huì)有l(wèi)ookback地址,會(huì)有siteLocal地址等.以及IPV4或者IPV6 .
                Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
                while (addresses.hasMoreElements()) {
                    InetAddress address = addresses.nextElement();
                    if(address instanceof Inet6Address){
                        continue;
                    }
                    if (!address.isLoopbackAddress()) {
                        if (publicIpOnly && !address.isSiteLocalAddress()) {
                            serverIp = address.getHostAddress();
                            logger.info("resolve server ip :" + serverIp);
                            continue;
                        } else if (!publicIpOnly && address.isSiteLocalAddress()) {
                            serverIp = address.getHostAddress();
                            logger.info("resolve server ip :" + serverIp);
                            continue;
                        }
                    }
                }
            }
        } catch (SocketException e) {
            e.printStackTrace();
        }
        return serverIp;
    }

    @Override
    public void reset() {
        serverIp = null;
    }
}

1.2 獲取zookeeper客戶端鏈接對(duì)象

public class ZookeeperFactory implements FactoryBean<CuratorFramework> ,Closeable{

    private String zkHosts;
    // session超時(shí)
    private int sessionTimeout = 30000;
    private int connectionTimeout = 30000;

    // 共享一個(gè)zk鏈接
    private boolean singleton = true;

    // 全局path前綴,常用來(lái)區(qū)分不同的應(yīng)用
    private String namespace;

    private final static String ROOT = "rpc";

    private CuratorFramework zkClient;

    public void setZkHosts(String zkHosts) {
        this.zkHosts = zkHosts;
    }

    public void setSessionTimeout(int sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    public void setConnectionTimeout(int connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    public void setSingleton(boolean singleton) {
        this.singleton = singleton;
    }

    public void setNamespace(String namespace) {
        this.namespace = namespace;
    }

    public void setZkClient(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    @Override
    public CuratorFramework getObject() throws Exception {
        if (singleton) {
            if (zkClient == null) {
                zkClient = create();
                zkClient.start();
            }
            return zkClient;
        }
        return create();
    }

    @Override
    public Class<?> getObjectType() {
        return CuratorFramework.class;
    }

    @Override
    public boolean isSingleton() {
        return singleton;
    }

    public CuratorFramework create() throws Exception {
        if (StringUtils.isEmpty(namespace)) {
            namespace = ROOT;
        } else {
            namespace = ROOT +"/"+ namespace;
        }
        return create(zkHosts, sessionTimeout, connectionTimeout, namespace);
    }

    // 使用CuratorFramework創(chuàng)建zk客戶端對(duì)象
    /**
     *  connectString zk集群的地址攀例,包括ip和端口
     *  sessionTimeout 
     *  connectionTimeout
     *  namespace 不同的應(yīng)用可以使用不同的命名空間區(qū)分
     *  ExponentialBackoffRetry表示重試機(jī)制船逮,重連的時(shí)間間隔隨著重
     *  試的次數(shù)遞增的,如果時(shí)間間隔計(jì)算出來(lái)大于默認(rèn)的最大sleep時(shí) 
     *  間的話粤铭,則取最大sleep時(shí)間挖胃。ExponentialBackoffRetry 除了時(shí)間 
     *  的限制以外,還有最大重連次數(shù)的限制梆惯。而 
     *  ExponentialBackoffRetry策略只是讓用戶設(shè)置最大sleep時(shí)間而 
     *  已酱鸭。默認(rèn)的最大時(shí)間是Integer.MAX_VALUE毫秒。 
     **/
    public static CuratorFramework create(String connectString, int sessionTimeout, int connectionTimeout, String namespace) {
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
        return builder.connectString(connectString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(30000)
                .canBeReadOnly(true).namespace(namespace).retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
                .defaultData(null).build();
    }

    public void close() {
        if (zkClient != null) {
            zkClient.close();
        }
    }
}

1.3 將服務(wù)接口注冊(cè)到zookeeper中
1.3.1 服務(wù)端注冊(cè)工廠調(diào)用注冊(cè)服務(wù)的方法加袋,異步啟動(dòng)服務(wù)

/**
 * 服務(wù)端注冊(cè)服務(wù)工廠
 */
public class ThriftServiceServerFactory implements InitializingBean ,Closeable{
    // 服務(wù)注冊(cè)本機(jī)端口
    private Integer port = 8299;
    // 優(yōu)先級(jí)
    private Integer weight = 1;// default
    // 服務(wù)實(shí)現(xiàn)類
    private Object service;// serice實(shí)現(xiàn)類
    //服務(wù)版本號(hào)
    private String version;
    // 是否只取公網(wǎng)ip凛辣,如果是true抱既,zk中只注冊(cè)公網(wǎng)ip职烧;如果是false,zk中只注冊(cè)私網(wǎng)ip
    private boolean publicIpOnly = false;
    // 解析本機(jī)IP
    private ThriftServerIpResolve thriftServerIpResolve;
    //服務(wù)注冊(cè)
    private ThriftServerAddressRegister thriftServerAddressRegister;

    private ServerThread serverThread;

    private boolean zkUse = true;
    
    public void setPort(Integer port) {
        this.port = port;
    }

    public void setWeight(Integer weight) {
        this.weight = weight;
    }

    public void setService(Object service) {
        this.service = service;
    }

    public void setVersion(String version) {
        this.version = version;
    }

    public void setZkUse(boolean zkUse) {
        this.zkUse = zkUse;
    }

    public void setPublicIpOnly(boolean publicIpOnly) {
        this.publicIpOnly = publicIpOnly;
    }

    public void setThriftServerIpResolve(ThriftServerIpResolve thriftServerIpResolve) {
        this.thriftServerIpResolve = thriftServerIpResolve;
    }

    public void setThriftServerAddressRegister(ThriftServerAddressRegister thriftServerAddressRegister) {
        this.thriftServerAddressRegister = thriftServerAddressRegister;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        if (thriftServerIpResolve == null) {
            thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve();
        }
        String serverIP = thriftServerIpResolve.getServerIp(publicIpOnly);
        if (StringUtils.isEmpty(serverIP)) {
            throw new ThriftException("cant find server ip...");
        }

        String hostname = serverIP + ":" + port + ":" + weight;
        Class<?> serviceClass = service.getClass();
        // 獲取實(shí)現(xiàn)類接口
        Class<?>[] interfaces = serviceClass.getInterfaces();
        if (interfaces.length == 0) {
            throw new IllegalClassFormatException("service-class should implements Iface");
        }
        // reflect,load "Processor";
        TProcessor processor = null;
        String serviceName = null;

        for (Class<?> clazz : interfaces) {
            String cname = clazz.getSimpleName();
            if (!cname.equals("Iface")) {
                continue;
            }
            serviceName = clazz.getEnclosingClass().getName();
            String pname = serviceName + "$Processor";
            try {
                ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
                Class<?> pclass = classLoader.loadClass(pname);
                if (!TProcessor.class.isAssignableFrom(pclass)) {
                    continue;
                }
                Constructor<?> constructor = pclass.getConstructor(clazz);
                processor = (TProcessor) constructor.newInstance(service);
                break;
            } catch (Exception e) {
                //
            }
        }
        if (processor == null) {
            throw new IllegalClassFormatException("service-class should implements Iface");
        }
        //需要單獨(dú)的線程,因?yàn)閟erve方法是阻塞的.
        serverThread = new ServerThread(processor, port);
        serverThread.start();
        // 注冊(cè)服務(wù)
        if (zkUse && thriftServerAddressRegister != null) {
            thriftServerAddressRegister.register(serviceName, version, hostname);
        }

    }
    class ServerThread extends Thread {
        private TServer server;
        ServerThread(TProcessor processor, int port) throws Exception {
--------------------- 
        /** TThreadedSelectorServer模式是thrift-server最高級(jí)的工作模 
       式:主要有以下幾個(gè)不分組成
      TThreadedSelectorServer模式是目前Thrift提供的最高級(jí)的模式, 
      它內(nèi)部有如果幾個(gè)部分構(gòu)成:

     (1)  一個(gè)AcceptThread線程對(duì)象蚀之,專門用于處理監(jiān)聽socket上的新連接蝗敢;

     (2)  若干個(gè)SelectorThread對(duì)象專門用于處理業(yè)務(wù)socket的網(wǎng)絡(luò)I/O操作,所有網(wǎng)絡(luò)數(shù)據(jù)的讀寫均是有這些線程來(lái)完成足删;

     (3)  一個(gè)負(fù)載均衡器SelectorThreadLoadBalancer對(duì)象寿谴,主要用于AcceptThread線程接收到一個(gè)新socket連接請(qǐng)求時(shí),決定將這個(gè)新連接請(qǐng)求分配給哪個(gè)SelectorThread線程失受。

     (4)  一個(gè)ExecutorService類型的工作線程池讶泰,在SelectorThread線程中,監(jiān)聽到有業(yè)務(wù)socket中有調(diào)用請(qǐng)求過(guò)來(lái)拂到,則將請(qǐng)求讀取之后痪署,交個(gè)ExecutorService線程池中的線程完成此次調(diào)用的具體執(zhí)行;
   --------------------- 
  **/
            TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
            TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);  
            TProcessorFactory processorFactory = new TProcessorFactory(processor);
            tArgs.processorFactory(processorFactory);
            tArgs.transportFactory(new TFramedTransport.Factory());  
            tArgs.protocolFactory( new TBinaryProtocol.Factory(true, true));
            tArgs.maxReadBufferBytes = 1024 * 1024L; // 防止direct memory oom
            tArgs.selectorThreads(4); // 設(shè)置selector線程數(shù)兄旬,默認(rèn)是2
            tArgs.workerThreads(32); // 設(shè)置工作線程數(shù)狼犯,默認(rèn)是5,在數(shù)據(jù)庫(kù)負(fù)載高時(shí)有可能會(huì)堵塞
            server = new TThreadedSelectorServer(tArgs);
        }

        @Override
        public void run(){
            try{
                //啟動(dòng)服務(wù)
                server.serve();
            }catch(Exception e){
                //
            }
        }
        
        public void stopServer(){
            server.stop();
        }
    }
    
    public void close() {
        serverThread.stopServer();
    }
}

1.3.1 真正的注冊(cè)實(shí)現(xiàn)
第一步:監(jiān)聽zk連接變化领铐;
第二步:將服務(wù)暴露的ip和port以虛擬節(jié)點(diǎn)的形式創(chuàng)建在zk的接口服務(wù)路徑下悯森,切換到指定的組;
第三步:注冊(cè)成功后使用NodeCache監(jiān)聽節(jié)點(diǎn)內(nèi)容變化绪撵,如果原本節(jié)點(diǎn)不存在瓢姻,那么Cache就會(huì)在節(jié)點(diǎn)被創(chuàng)建時(shí)觸發(fā)監(jiān)聽事件,如果該節(jié)點(diǎn)被刪除莲兢,就無(wú)法再觸發(fā)監(jiān)聽事件汹来。任意節(jié)點(diǎn)的內(nèi)容變化都會(huì)重新注冊(cè)。

// 啟動(dòng)服務(wù)
@Override
    public void afterPropertiesSet() throws Exception {
        if (thriftServerIpResolve == null) {
            thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve();
        }
        String serverIP = thriftServerIpResolve.getServerIp(publicIpOnly);
        if (StringUtils.isEmpty(serverIP)) {
            throw new ThriftException("cant find server ip...");
        }

        String hostname = serverIP + ":" + port + ":" + weight;
        Class<?> serviceClass = service.getClass();
        // 獲取實(shí)現(xiàn)類接口
        Class<?>[] interfaces = serviceClass.getInterfaces();
        if (interfaces.length == 0) {
            throw new IllegalClassFormatException("service-class should implements Iface");
        }
        // reflect,load "Processor";
        TProcessor processor = null;
        String serviceName = null;

        for (Class<?> clazz : interfaces) {
            String cname = clazz.getSimpleName();
            if (!cname.equals("Iface")) {
                continue;
            }
            serviceName = clazz.getEnclosingClass().getName();
            String pname = serviceName + "$Processor";
            try {
                ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
                Class<?> pclass = classLoader.loadClass(pname);
                if (!TProcessor.class.isAssignableFrom(pclass)) {
                    continue;
                }
                Constructor<?> constructor = pclass.getConstructor(clazz);
                processor = (TProcessor) constructor.newInstance(service);
                break;
            } catch (Exception e) {
                //
            }
        }
        if (processor == null) {
            throw new IllegalClassFormatException("service-class should implements Iface");
        }
        //需要單獨(dú)的線程,因?yàn)閟erve方法是阻塞的.
        serverThread = new ServerThread(processor, port);
        serverThread.start();
        // 注冊(cè)服務(wù)
        if (zkUse && thriftServerAddressRegister != null) {
            thriftServerAddressRegister.register(serviceName, version, hostname);
        }

    }
    class ServerThread extends Thread {
        private TServer server;
        ServerThread(TProcessor processor, int port) throws Exception {
            TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
            TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);  
            TProcessorFactory processorFactory = new TProcessorFactory(processor);
            tArgs.processorFactory(processorFactory);
            tArgs.transportFactory(new TFramedTransport.Factory());  
            tArgs.protocolFactory( new TBinaryProtocol.Factory(true, true));
            tArgs.maxReadBufferBytes = 1024 * 1024L; // 防止direct memory oom
            tArgs.selectorThreads(4); // 設(shè)置selector線程數(shù)改艇,默認(rèn)是2
            tArgs.workerThreads(32); // 設(shè)置工作線程數(shù)收班,默認(rèn)是5,在數(shù)據(jù)庫(kù)負(fù)載高時(shí)有可能會(huì)堵塞
            server = new TThreadedSelectorServer(tArgs);
        }

        @Override
        public void run(){
            try{
                //啟動(dòng)服務(wù)
                server.serve();
            }catch(Exception e){
                //
            }
        }
        
        public void stopServer(){
            server.stop();
        }
    }


// 實(shí)現(xiàn)注冊(cè)和節(jié)點(diǎn)監(jiān)聽
/**
     * 初始化注冊(cè)方法
     * @param service 服務(wù)接口名稱谒兄,一個(gè)產(chǎn)品中不能重復(fù)
     * @param version 服務(wù)接口的版本號(hào)摔桦,默認(rèn)1.0.0
     * @param address 服務(wù)發(fā)布的地址和端口
     */
    @Override
    public void register(String service, String version, String address) throws InterruptedException {
        // 輸入校驗(yàn)
        String[] parts = address.split(":");
        if (parts.length < 3) {
            logger.error("ThriftZookeeper Register Error: address invalid '" + address + "'");
            throw new ThriftException("ThriftZookeeper Register Error: address invalid '" + address + "'");
        }

        // 拆解內(nèi)容
        String ip = parts[0];
        String port = parts[1];
        String weight = parts[2];

        // 增加連接狀態(tài)監(jiān)聽
        setConnectionListener(service, version, ip, port, weight);

        // 注冊(cè)
        generalRegister(service, version, ip, port, weight);

        //監(jiān)聽組別變化后重新注冊(cè)
        setGroupListener(service, version, ip, port, weight);
    }

    public void setConnectionListener(final String service, final String version, final String ip, final String port, final String weight)  {
        // 如果zk尚未啟動(dòng),則啟動(dòng)
        if (zkClient.getState() == CuratorFrameworkState.LATENT) {
            zkClient.start();
        }

        logger.info("設(shè)置ZK連接狀態(tài)監(jiān)聽的Listener");
        zkClient.getConnectionStateListenable().addListener(new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                logger.info("ZkClient狀態(tài)變化:" + newState.name());
                if (newState == ConnectionState.LOST) { //處理session過(guò)期
                    logger.info("ZkClient連接斷開,session過(guò)期");

                    int i = 0;
                    while (true) {
                        logger.info("嘗試重新連接到zk..." + (i++));
                        try {
                            if (client.getZookeeperClient().blockUntilConnectedOrTimedOut()) {
                                logger.info("嘗試重新注冊(cè)zk的臨時(shí)節(jié)點(diǎn)...");

                                // 重新注冊(cè)
                                generalRegister(service, version, ip, port, weight);

                                break;
                            }
                        } catch (InterruptedException e) {
                            logger.info("嘗試重新注冊(cè)zk的臨時(shí)節(jié)點(diǎn)異常(InterruptedException)", e);
                            break;
                        } catch (Exception e) {
                            logger.info("嘗試重新注冊(cè)zk的臨時(shí)節(jié)點(diǎn)異常(Exception)", e);
                            break;
                        }
                    }
                }
            }
        });
    }

    /**
     * 監(jiān)聽數(shù)據(jù)節(jié)點(diǎn)的變化重新注冊(cè)
     * @param version 版本號(hào)
     * @param weight 服務(wù)權(quán)重信息
     * @throws InterruptedException
     */
    public void setGroupListener(final String service, final String version, final String ip, final String port, final String weight) throws InterruptedException {
        final String ipServicePath = getInstanceGroupPath(localInstance, ip);
        try {
            createGroupNodeIfNotExists(localInstance, ip);
            final NodeCache nodeCache = new NodeCache(zkClient, ipServicePath, false);
            nodeCache.start(true);
            nodeCache.getListenable().addListener(
                    new NodeCacheListener() {
                        public void nodeChanged() throws Exception {
                            synchronized (lock) {
                                logger.info("服務(wù)端監(jiān)聽到節(jié)點(diǎn): " + ipServicePath + " 變化承疲, for service:" + service);
                                generalRegister(service, version, ip, port, weight);
                            }
                        }
                    }
            );
        } catch (Exception e) {
            logger.error("nodeCache start exception:",e);
            throw new ThriftException("nodeCache start exception:", e);
        }
    }


    /**
     * 通用注冊(cè)方法
     * @param service 服務(wù)名
     * @param version 版本號(hào)
     * @param ip 服務(wù)地址邻耕,IP
     * @param port 服務(wù)端口
     * @param weight 權(quán)重信息,格式為1-10的整數(shù)字符串形式燕鸽,例如"5"
     *
     * 獲取當(dāng)前zk中組別配置兄世,如果和本地不同,則刪除zk中舊組別下的注冊(cè)地址啊研,在新組別下注冊(cè)
     */
    public void generalRegister(String service, String version, String ip, String port, String weight) {
        logger.info("開始注冊(cè)ServiceLocate, 服務(wù): " + service);
        // 如果zk尚未啟動(dòng),則啟動(dòng)
        if (zkClient.getState() == CuratorFrameworkState.LATENT) {
            zkClient.start();
        }
        String ipPort = ip + ":" + port;

        // 獲取組別
        String ipInstancePath = getInstanceGroupPath(localInstance, ip);
        String groupJson = null;
        String group = "";
        try {
            createGroupNodeIfNotExists(localInstance, ip);
            groupJson = new String(zkClient.getData().forPath(ipInstancePath));
            GroupConfig groupConfig = JSON.parseObject(groupJson, GroupConfig.class);
            if (groupConfig == null || groupConfig.getGroup() == null) {
                throw new ThriftException("獲取到錯(cuò)誤的分組配置御滩,分組注冊(cè)不會(huì)進(jìn)行鸥拧,請(qǐng)檢查配置內(nèi)容是否正確");
            }
            group = groupConfig.getGroup();
        } catch (Exception e) {
            logger.error("獲取組別失敗, 按默認(rèn)組別執(zhí)行", e);
            group = defaultGroup;
        }
        logger.info("注冊(cè)到group: " + group);

        // 注冊(cè)
        RegisterConfig registerConfig = new RegisterConfig();
        registerConfig.setWeight(weight);
        registerConfig.setGroup(group);
        String groupWeightString = JSON.toJSONString(registerConfig);
        try {
            String serviceLocatePath = getServiceLocatePath(service);

            // 創(chuàng)建當(dāng)前服務(wù)定位存儲(chǔ)節(jié)點(diǎn),結(jié)構(gòu):
            // serviceLocatePath
            //    |-- address1(服務(wù)定位存儲(chǔ)節(jié)點(diǎn)削解,臨時(shí)節(jié)點(diǎn))
            //    |-- address2(服務(wù)定位存儲(chǔ)節(jié)點(diǎn)富弦,臨時(shí)節(jié)點(diǎn))
            if (zkClient.checkExists().forPath(serviceLocatePath + "/" + ipPort) != null) {
                // 已經(jīng)存在則修改Data
                zkClient.setData().forPath(serviceLocatePath + "/" + ipPort, groupWeightString.getBytes());
            } else {
                // 不存在則創(chuàng)建
                // 使用creatingParentContainersIfNeeded創(chuàng)建服務(wù)定位根目錄(固定)
                zkClient.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.EPHEMERAL)
                        .forPath(serviceLocatePath + "/" + ipPort, groupWeightString.getBytes());
            }

            // 切換當(dāng)前注冊(cè)組別
            currentRegisterGroup = group;
            logger.info("服務(wù): " + service + " 注冊(cè)到group: " + group + " 成功");
        } catch (Exception e) {
            logger.error("zk分組注冊(cè)異常:", e);
            throw new ThriftException("zk分組注冊(cè)異常:", e);
        }
    }

    /**
     * 如果本地服務(wù)在ZK中不存在分組注冊(cè)信息,則創(chuàng)建一個(gè)分組信息節(jié)點(diǎn)
     */
    private void createGroupNodeIfNotExists(String localInstance, String ip) throws Exception {
        if (zkClient.getState() == CuratorFrameworkState.LATENT) {
            zkClient.start();
        }

        String serviceGroupPath = getInstanceGroupPath(localInstance, ip);

        GroupConfig groupConfig = new GroupConfig();
        groupConfig.setGroup(defaultGroup);
        groupConfig.setWeight(defaultWeight);

        if (zkClient.checkExists().forPath(serviceGroupPath) == null) {
            logger.info("創(chuàng)建group_config氛驮,localInstance:" + localInstance + "腕柜,ip:" + ip);
            zkClient.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath(serviceGroupPath, JSON.toJSONString(groupConfig).getBytes());
        }
    }

二 基于zk的服務(wù)自動(dòng)發(fā)現(xiàn)


服務(wù)自動(dòng)發(fā)現(xiàn).png

1.使用NodeCache監(jiān)聽zk節(jié)點(diǎn),也就是服務(wù)的變化矫废;
2.使用NodePathChild監(jiān)聽子節(jié)點(diǎn)盏缤,也就是ip:port的變化;

public void afterPropertiesSet() throws Exception {
        logger.info("Provider初始化開始");
        // 本機(jī)IP獲取
        if (thriftServerIpResolve == null) {
            thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve();
        }
        String ipAddress = thriftServerIpResolve.getServerIp(publicIpOnly);
        if (StringUtils.isEmpty(ipAddress)) {
            throw new ThriftException("can not find server ip...");
        }
        logger.info("Provider創(chuàng)建ServiceLocate監(jiān)聽器. targetService:" + targetService);
        buildServiceLocateListener();
        cachedPath.start(StartMode.POST_INITIALIZED_EVENT);
        countDownLatch.await();
        logger.info("Provider創(chuàng)建GroupConfig監(jiān)聽器蓖扑,localInstance:" + localInstance);
        buildInstanceGroupListener(ipAddress);
        logger.info("Provider初始化完成");
    }

    private void buildInstanceGroupListener(final String ip) {
        // 如果zk尚未啟動(dòng),則啟動(dòng)
        if (zkClient.getState() == CuratorFrameworkState.LATENT) {
            zkClient.start();
        }

        final String serviceGroupPath = getInstanceGroupPath(localInstance, ip);
        try {
            // 如果不存在則先創(chuàng)建節(jié)點(diǎn)
            createGroupNodeIfNotExists(localInstance, ip);
            groupNodeCache = new NodeCache(zkClient, serviceGroupPath, false);
            groupNodeCache.getListenable().addListener(
                    new NodeCacheListener() {
                        public void nodeChanged() throws Exception {
                            synchronized (lock) {
                                groupNodeCache.rebuild();
                                createGroupNodeIfNotExists(localInstance, ip);
                                rebuildGroup();
                            }
                        }
                    }
            );
            groupNodeCache.start(true);
            rebuildGroup();
        } catch (Exception e) {
            logger.error("nodeCache start exception:",e);
            throw new ThriftException("nodeCache start exception:", e);
        }
    }

    /**
     * 如果本地服務(wù)在ZK中不存在分組注冊(cè)信息蛾找,則創(chuàng)建一個(gè)分組信息節(jié)點(diǎn)
     */
    private void createGroupNodeIfNotExists(String localInstance, String ip) throws Exception {
        if (zkClient.getState() == CuratorFrameworkState.LATENT) {
            zkClient.start();
        }

        String serviceGroupPath = getInstanceGroupPath(localInstance, ip);

        GroupConfig groupConfig = new GroupConfig();
        groupConfig.setGroup(defaultGroup);
        groupConfig.setWeight(defaultWeight);

        if (zkClient.checkExists().forPath(serviceGroupPath) == null) {
            zkClient.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath(serviceGroupPath, JSON.toJSONString(groupConfig).getBytes());
        }
    }

    private void rebuildGroup() throws UnsupportedEncodingException {
        String data = new String(groupNodeCache.getCurrentData().getData(), "utf-8");
        logger.info("客戶端監(jiān)聽到分組變化,當(dāng)前內(nèi)容 " + data + " 進(jìn)行同步");
        GroupConfig groupConfig = JSON.parseObject(data, GroupConfig.class);
        if (groupConfig == null || groupConfig.getGroup() == null) {
            logger.error("分組數(shù)據(jù)錯(cuò)誤赵誓,緩存區(qū)域不會(huì)變更打毛,請(qǐng)查看分組配置區(qū)數(shù)據(jù),localInstance:" + localInstance
                    + "targetService:" + targetService + "俩功,data: " + data);
            return;
        }
        currentGroup = groupConfig.getGroup();
    }

    private void buildServiceLocateListener() throws Exception {
        // 如果zk尚未啟動(dòng),則啟動(dòng)
        if (zkClient.getState() == CuratorFrameworkState.LATENT) {
            zkClient.start();
        }

        // 服務(wù)地址監(jiān)聽
        // 尋找目標(biāo)分組
        final String serviceLocatePath = getServiceLocatePath(targetService);
        cachedPath = new PathChildrenCache(zkClient, serviceLocatePath, true);
        PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                PathChildrenCacheEvent.Type eventType = event.getType();
                switch (eventType) {
                    case CONNECTION_RECONNECTED:
                        logger.info("Connection is reconection.");
                        break;
                    case CONNECTION_SUSPENDED:
                        logger.info("Connection is suspended.");
                        break;
                    case CONNECTION_LOST:
                        logger.warn("Connection error,waiting...");
                        // 這個(gè)return很講究幻枉,ZK掛掉后,目的是當(dāng)ZK掛掉后诡蜓,不影響本地緩存
                        return;
                    case INITIALIZED:
                        //  countDownLatch.countDown();
                        logger.warn("Connection init ...");
                        break;
                    default:
                }
                // 任何節(jié)點(diǎn)的數(shù)據(jù)變動(dòng),都會(huì)rebuild,此處為一個(gè)"簡(jiǎn)單的"做法.
                cachedPath.rebuild();
                synchronized (lock) {
                    rebuild();
                }
                countDownLatch.countDown();
            }
        };

        cachedPath.getListenable().addListener(childrenCacheListener);
    }

    protected void rebuild() {
        logger.info("即將更新本地地址緩存, localService: " + localInstance + ", targetService: " + targetService);
        List<ChildData> children = cachedPath.getCurrentData();
        if (children == null || children.isEmpty()) {
            // 有可能所有的thrift server都與zookeeper斷開了鏈接
            // 但是有可能,thrift client與thrift server之間的網(wǎng)絡(luò)是良好的
            // 因此此處是否需要清空container,是需要多方面考慮的.
            container.clear();
            trace.clear();
            ipPortQueue.clear();
            logger.error("在注冊(cè)服務(wù)區(qū)無(wú)法找到子節(jié)點(diǎn)");
            return;
        }

        String path = null;
        Map<String, List<InetSocketAddress>> currentMap = new HashMap<String, List<InetSocketAddress>>();
        try {
            for (ChildData data : children) {
                path = data.getPath();
                String address = new String(path.getBytes(), "utf-8");
                String[] parts = address.split("/");
                String ipPort = parts[parts.length-1];
                String jsonString = new String(data.getData(), "utf-8");
                RegisterConfig registerConfig = JSON.parseObject(jsonString, RegisterConfig.class);
                if (registerConfig.getWeight() == null) {
                    throw new ThriftException("獲取權(quán)重失敗");
                }
                String weight = registerConfig.getWeight();
                String group = registerConfig.getGroup();

                // 當(dāng)前InetAddress列表
                List<InetSocketAddress> addressList = transfer(weight, ipPort);

                // 添加到容器currentMap
                if (!currentMap.containsKey(group)) {
                    currentMap.put(group, new ArrayList<InetSocketAddress>());
                }
                List<InetSocketAddress> groupList = currentMap.get(group);
                groupList.addAll(addressList);
                currentMap.put(group, groupList);
            }

            trace.clear();
            container.clear();
            ipPortQueue.clear();
            for (Map.Entry<String, List<InetSocketAddress>> entry : currentMap.entrySet()) {
                String group = entry.getKey();
                List<InetSocketAddress> current = entry.getValue();
                Collections.shuffle(current);

                // 先組裝到備份容器
                if (!trace.containsKey(group)) {
                    trace.put(group, new HashSet<InetSocketAddress>());
                }
                Set<InetSocketAddress> traceGroup = trace.get(group);
                traceGroup.addAll(current);

                // 組裝容器
                if (!container.containsKey(group)) {
                    container.put(group, new ArrayList<InetSocketAddress>());
                }
                List<InetSocketAddress> groupContainer = container.get(group);
                groupContainer.addAll(current);

                // 組裝隊(duì)列
                if (!ipPortQueue.containsKey(group)) {
                    ipPortQueue.put(group, new LinkedList<InetSocketAddress>());
                }
                Queue<InetSocketAddress> groupQueue = ipPortQueue.get(group);
                groupQueue.addAll(current);
            }


            logger.info("分組緩存重建完畢");
            for (Map.Entry<String, List<InetSocketAddress>> entry : currentMap.entrySet()) {
                logger.info("group:" + entry.getKey() + ", target:" + entry.getValue());
            }
        } catch (Exception e) {
            logger.error("重建緩存失敗" + e.getMessage());
            throw new ThriftException("重建緩存失敗", e);
        }
    }



    /**
     * 根據(jù)權(quán)重分配初始化"IP:PORT"集合
     * @param weight 權(quán)重字符串熬甫,例如"5"
     * @param ipPort 例如10.0.0.1:9050
     * @return 根據(jù)權(quán)重信息返回類似10.0.0.1:9050集合
     */
    private List<InetSocketAddress> transfer(String weight, String ipPort) {
        List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
        InetAddress ipAddress = null;
        try {
            ipAddress = InetAddress.getByName(ipPort.split(":")[0]);
        } catch (UnknownHostException e) {
            logger.error("獲取IP地址失敗:" + e.getMessage());
        }
        int port = Integer.parseInt(ipPort.split(":")[1]);
        InetSocketAddress inetSocketAddress = new InetSocketAddress(ipAddress, port);
        for (int i = 0; i < Integer.parseInt(weight); i++) {
            result.add(inetSocketAddress);
        }
        return result;
    }


    /**
     * 根據(jù)權(quán)重情況隨機(jī)獲取IP:PORT
     * 取當(dāng)前分組的隊(duì)列
     */
    @Override
    public synchronized InetSocketAddress selector() {
        Queue<InetSocketAddress> currentQueue = ipPortQueue.get(currentGroup);
        List<InetSocketAddress> currentContainer = container.get(currentGroup);
        Set<InetSocketAddress> currentTrace = trace.get(currentGroup);
        if (currentQueue == null || currentQueue.isEmpty()) {
            if (currentContainer != null && !currentContainer.isEmpty()) {
                currentQueue.addAll(currentContainer);
            } else if(currentTrace != null && !currentTrace.isEmpty()) {
                synchronized (lock) {
                    currentContainer.addAll(currentTrace);
                    Collections.shuffle(currentContainer);
                    currentQueue.addAll(currentContainer);
                }
            }
        }

        if (currentQueue == null || currentQueue.size() == 0) {
            logger.error("找不到可用服務(wù),localInstance:" + localInstance + "目標(biāo)服務(wù):" + targetService);
            throw new ThriftException("找不到可用服務(wù)蔓罚,localInstance:" + localInstance + "目標(biāo)服務(wù):" + targetService);
        }
        return currentQueue.poll();
    }

    @Override
    public boolean validateGroup(String group, InetSocketAddress address) {
        // 當(dāng)前trace中是否存在該address若無(wú)則排除
        Set<InetSocketAddress> cTrace = trace.get(currentGroup);
        if (cTrace == null) {
            logger.error("找不到可用服務(wù)椿肩,Trace為空");
            return false;
        }
        return cTrace.contains(address);
    }

    @Override
    public String getGroup() {
        return currentGroup;
    }

    @Override
    public List<InetSocketAddress> findServerAddressList() {
        List<InetSocketAddress> currentContainer = container.get(currentGroup);
        return Collections.unmodifiableList(currentContainer);
    }

    @Override
    public String getService() {
        return targetService;
    }

    @Override
    public String getServiceUrl() {
        return "";
    }

    public void close(){
        try {
            cachedPath.close();
            groupNodeCache.close();
        } catch (IOException e) {
        }
        zkClient.close();
    }

三 客戶端jdk代理實(shí)現(xiàn)客戶端代理調(diào)用遠(yuǎn)程服務(wù),客戶端代理交給GenericObjectPool實(shí)現(xiàn)創(chuàng)建豺谈,銷毀郑象。

@Override
    public void afterPropertiesSet() throws Exception {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        // 加載Iface接口
        objectClass = classLoader.loadClass(serverAddressProvider.getService() + "$Iface");
        // 加載Client.Factory類
        Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>) classLoader.loadClass(serverAddressProvider.getService() + "$Client$Factory");
        TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();
             // 實(shí)現(xiàn)客戶端連接池
        ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(serverAddressProvider, clientFactory, callback);
        GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();
        poolConfig.maxActive = maxActive;
        poolConfig.maxIdle = 1;
        poolConfig.minIdle = 0;
        poolConfig.minEvictableIdleTimeMillis = idleTime;
        poolConfig.timeBetweenEvictionRunsMillis = idleTime * 2L;
        poolConfig.testOnBorrow=true;
        poolConfig.testOnReturn=false;
        poolConfig.testWhileIdle=false;
        pool = new GenericObjectPool<TServiceClient>(clientPool, poolConfig);
                // 創(chuàng)建客戶端代理,實(shí)現(xiàn)遠(yuǎn)程調(diào)用茬末,異常厂榛,超時(shí)重試機(jī)制
        proxyClient = Proxy.newProxyInstance(classLoader, new Class[] { objectClass }, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // 記錄調(diào)用開始時(shí)間t1
                long t1 = System.currentTimeMillis();

                // 調(diào)用遠(yuǎn)程的目標(biāo)方法,重試3次
                Object reObject = null;
                Exception reException = null;
                TServiceClient client = null;
                boolean success = false;
                int MAX_RETRY = 3, retry = 0;
                while (++retry <= MAX_RETRY) {
                    client = pool.borrowObject();
                    boolean flag = true;
                    try {
                        reObject = method.invoke(client, args);
                    } catch (Exception e) {
                        flag = false;
                        reException = e;
                        logger.info("retry:"+e.getMessage(),e);
                    } finally {
                        logger.info("retry time:"+retry);
                        if (flag) {
                            pool.returnObject(client);
                        } else {
                            pool.invalidateObject(client);
                        }
                    }

                    // 執(zhí)行成功丽惭,退出循環(huán)
                    if (flag) {
                        success = true;
                        break;
                    }

                    // 只有超時(shí)類異常進(jìn)行重試
                    boolean needRetry = false;
                    if (reException != null && reException instanceof InvocationTargetException) {
                        Throwable cause1 = reException.getCause();
                        if (cause1 != null && cause1 instanceof TTransportException) {
                            Throwable cause2 = cause1.getCause();
                            if (cause2 != null && (cause2 instanceof SocketTimeoutException || cause2 instanceof ConnectTimeoutException)) {
                                if (method.getName() != null && method.getName().startsWith("get")) {
                                    logger.info("timeout needRetry set true");
                                    needRetry = true;
                                }
                            }
                        }
                    }

                    if (!needRetry)
                        break;
                }

                // 記錄調(diào)用結(jié)束時(shí)間t2
                long t2 = System.currentTimeMillis();
                printLog(client, method, (t2 - t1), success, retry > MAX_RETRY ? MAX_RETRY : retry);

                if (!success) {
                    if (reException instanceof InvocationTargetException)
                        throw ((InvocationTargetException) reException).getTargetException();
                    else
                        throw reException;
                } else {
                    return reObject;
                }
            }
        });
    }

3.1 客戶端連接池的實(shí)現(xiàn)

// 客戶端銷毀
@Override
    public void destroyObject(TServiceClient client) throws Exception {
        if (callback != null) {
            try {
                callback.destroy(client);
            } catch (Exception e) {
                logger.warn("destroyObject:{}", e);
            }
        }
        clientGroupMap.remove(client);
        clientAddressMap.remove(client);
        logger.info("destroyObject:{}", client);
        TTransport pin = client.getInputProtocol().getTransport();
        pin.close();
        TTransport pout = client.getOutputProtocol().getTransport();
        pout.close();
    }
// 客戶端創(chuàng)建
@Override
    public TServiceClient makeObject() throws Exception {
        InetSocketAddress address = serverAddressProvider.selector();
        String group = serverAddressProvider.getGroup();
        if (address == null) {
            new ThriftException("No provider available for remote service");
        }

        TTransport transport;
        TProtocol protocol;
        if (StringUtils.isEmpty(serverAddressProvider.getServiceUrl())) {
            // 如果serviceUrl是空击奶,則采用TFramedTransport,適用于Java的thrift服務(wù)
            // socket超時(shí)30s责掏,connect超時(shí)5s
            TSocket tsocket = new TSocket(address.getHostName(), address.getPort(), 10000, 5000);
            transport = new TFramedTransport(tsocket);
            protocol = new TBinaryProtocol(transport);
        } else {
            // 如果serviceUrl不空柜砾,則采用THttpClient的transport,適用于php的thrift服務(wù)
            String url = "";
            try {
                url = "http://" + address.getHostName() + ":" + address.getPort()
                        + serverAddressProvider.getServiceUrl();
            } catch (NullPointerException e) {
                if (address == null) {
                    logger.error("address is null");
                }
                if (serverAddressProvider == null) {
                    logger.error("serverAddressProvider is null");
                }
                throw e;
            }
            transport = new THttpClient(url);
            protocol = new TBinaryProtocol(transport);
        }

        TServiceClient client = this.clientFactory.getClient(protocol);
        clientGroupMap.put(client, group);
        clientAddressMap.put(client, address);
        transport.open();
        if (callback != null) {
            try {
                callback.make(client);
            } catch (Exception e) {
                logger.warn("makeObject:{}", e);
            }
        }
        return client;
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末换衬,一起剝皮案震驚了整個(gè)濱河市痰驱,隨后出現(xiàn)的幾起案子喜爷,更是在濱河造成了極大的恐慌,老刑警劉巖萄唇,帶你破解...
    沈念sama閱讀 218,525評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異术幔,居然都是意外死亡另萤,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,203評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門诅挑,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)四敞,“玉大人,你說(shuō)我怎么就攤上這事拔妥》尬#” “怎么了?”我有些...
    開封第一講書人閱讀 164,862評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵没龙,是天一觀的道長(zhǎng)铺厨。 經(jīng)常有香客問(wèn)我,道長(zhǎng)硬纤,這世上最難降的妖魔是什么解滓? 我笑而不...
    開封第一講書人閱讀 58,728評(píng)論 1 294
  • 正文 為了忘掉前任,我火速辦了婚禮筝家,結(jié)果婚禮上洼裤,老公的妹妹穿的比我還像新娘。我一直安慰自己溪王,他們只是感情好腮鞍,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,743評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著莹菱,像睡著了一般移国。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上道伟,一...
    開封第一講書人閱讀 51,590評(píng)論 1 305
  • 那天桥狡,我揣著相機(jī)與錄音,去河邊找鬼皱卓。 笑死裹芝,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的娜汁。 我是一名探鬼主播嫂易,決...
    沈念sama閱讀 40,330評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼掐禁!你這毒婦竟也來(lái)了怜械?” 一聲冷哼從身側(cè)響起颅和,我...
    開封第一講書人閱讀 39,244評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎缕允,沒想到半個(gè)月后峡扩,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,693評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡障本,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,885評(píng)論 3 336
  • 正文 我和宋清朗相戀三年教届,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片驾霜。...
    茶點(diǎn)故事閱讀 40,001評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡案训,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出粪糙,到底是詐尸還是另有隱情强霎,我是刑警寧澤,帶...
    沈念sama閱讀 35,723評(píng)論 5 346
  • 正文 年R本政府宣布蓉冈,位于F島的核電站城舞,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏寞酿。R本人自食惡果不足惜椿争,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,343評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望熟嫩。 院中可真熱鬧秦踪,春花似錦、人聲如沸掸茅。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,919評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)昧狮。三九已至景馁,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間逗鸣,已是汗流浹背合住。 一陣腳步聲響...
    開封第一講書人閱讀 33,042評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留撒璧,地道東北人透葛。 一個(gè)月前我還...
    沈念sama閱讀 48,191評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像卿樱,于是被迫代替她去往敵國(guó)和親僚害。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,955評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理繁调,服務(wù)發(fā)現(xiàn)萨蚕,斷路器靶草,智...
    卡卡羅2017閱讀 134,657評(píng)論 18 139
  • 關(guān)于Mongodb的全面總結(jié) MongoDB的內(nèi)部構(gòu)造《MongoDB The Definitive Guide》...
    中v中閱讀 31,938評(píng)論 2 89
  • 一、Nginx/keepalived/lvs的介紹 1.nginx 1.1.nginx簡(jiǎn)介 Nginx是一個(gè)自由岳遥、...
    Java幫幫閱讀 1,318評(píng)論 0 5
  • ... 一奕翔、相關(guān)概念 中間件:為分布式系統(tǒng)提供協(xié)調(diào)服務(wù)的組件,如專門用于計(jì)算服務(wù)的機(jī)器就是一個(gè)計(jì)算型中間件浩蓉,還有專...
    帥可兒妞閱讀 477評(píng)論 0 0
  • 這是我高中不知天高地厚而寫下的詩(shī)派继,算是給自己的祭奠吧。 掠過(guò)千里長(zhǎng)空妻往,孤雁蕭蕭,風(fēng)輕云淡试和,縱有似錦才華讯泣,太...
    白色麥田閱讀 314評(píng)論 0 3