RPC原理及RPC實(shí)例分析

摘自: https://my.oschina.net/hosee/blog/711632
摘要: 本文主要說(shuō)明RPC的原理硝清,以及通過(guò)Hadoop來(lái)舉例在實(shí)踐中如何實(shí)現(xiàn)RPC预麸,本文主要通過(guò)摘取網(wǎng)上Blog(參見(jiàn)Reference)來(lái)整理RPC原理宿崭。

在學(xué)校期間大家都寫(xiě)過(guò)不少程序,比如寫(xiě)個(gè)hello world服務(wù)類(lèi)晶疼,然后本地調(diào)用下嚼松,如下所示。這些程序的特點(diǎn)是服務(wù)消費(fèi)方和服務(wù)提供方是本地調(diào)用關(guān)系袜炕。

public class Test {
     public static void main(String[] args) {
         HelloWorldService helloWorldService = new HelloWorldServiceImpl();
         helloWorldService.sayHello("test");
     }
}

而一旦踏入公司尤其是大型互聯(lián)網(wǎng)公司就會(huì)發(fā)現(xiàn)本谜,公司的系統(tǒng)都由成千上萬(wàn)大大小小的服務(wù)組成,各服務(wù)部署在不同的機(jī)器上偎窘,由不同的團(tuán)隊(duì)負(fù)責(zé)乌助。

這時(shí)就會(huì)遇到兩個(gè)問(wèn)題:

  1. 要搭建一個(gè)新服務(wù),免不了需要依賴(lài)他人的服務(wù)陌知,而現(xiàn)在他人的服務(wù)都在遠(yuǎn)端眷茁,怎么調(diào)用焚廊?
  2. 其它團(tuán)隊(duì)要使用我們的新服務(wù)文判,我們的服務(wù)該怎么發(fā)布以便他人調(diào)用?下文將對(duì)這兩個(gè)問(wèn)題展開(kāi)探討浪藻。

1 如何調(diào)用他人的遠(yuǎn)程服務(wù)?

由于各服務(wù)部署在不同機(jī)器登刺,服務(wù)間的調(diào)用免不了網(wǎng)絡(luò)通信過(guò)程籽腕,服務(wù)消費(fèi)方每調(diào)用一個(gè)服務(wù)都要寫(xiě)一坨網(wǎng)絡(luò)通信相關(guān)的代碼,不僅復(fù)雜而且極易出錯(cuò)纸俭。

如果有一種方式能讓我們像調(diào)用本地服務(wù)一樣調(diào)用遠(yuǎn)程服務(wù)皇耗,而讓調(diào)用者對(duì)網(wǎng)絡(luò)通信這些細(xì)節(jié)透明,那么將大大提高生產(chǎn)力揍很,比如服務(wù)消費(fèi)方在執(zhí)行helloWorldService.sayHello("test")時(shí)郎楼,實(shí)質(zhì)上調(diào)用的是遠(yuǎn)端的服務(wù)。這種方式其實(shí)就是RPC(Remote Procedure Call Protocol)窒悔,在各大互聯(lián)網(wǎng)公司中被廣泛使用呜袁,如阿里巴巴的hsf、dubbo(開(kāi)源)简珠、Facebook的thrift(開(kāi)源)阶界、Google grpc(開(kāi)源)、Twitter的finagle(開(kāi)源)等聋庵。

要讓網(wǎng)絡(luò)通信細(xì)節(jié)對(duì)使用者透明膘融,我們需要對(duì)通信細(xì)節(jié)進(jìn)行封裝,我們先看下一個(gè)RPC調(diào)用的流程涉及到哪些通信細(xì)節(jié):


  1. 服務(wù)消費(fèi)方(client)調(diào)用以本地調(diào)用方式調(diào)用服務(wù)祭玉;
  2. client stub接收到調(diào)用后負(fù)責(zé)將方法氧映、參數(shù)等組裝成能夠進(jìn)行網(wǎng)絡(luò)傳輸?shù)南Ⅲw;
  3. client stub找到服務(wù)地址脱货,并將消息發(fā)送到服務(wù)端岛都;
  4. server stub收到消息后進(jìn)行解碼;
  5. server stub根據(jù)解碼結(jié)果調(diào)用本地的服務(wù)蹭劈;
  6. 本地服務(wù)執(zhí)行并將結(jié)果返回給server stub;
  7. server stub將返回結(jié)果打包成消息并發(fā)送至消費(fèi)方线召;
  8. client stub接收到消息铺韧,并進(jìn)行解碼;
  9. 服務(wù)消費(fèi)方得到最終結(jié)果缓淹。

RPC的目標(biāo)就是要2~8這些步驟都封裝起來(lái)哈打,讓用戶(hù)對(duì)這些細(xì)節(jié)透明。

1.1 怎么做到透明化遠(yuǎn)程服務(wù)調(diào)用讯壶?

怎么封裝通信細(xì)節(jié)才能讓用戶(hù)像以本地調(diào)用方式調(diào)用遠(yuǎn)程服務(wù)呢料仗?對(duì)java來(lái)說(shuō)就是使用代理!java代理有兩種方式:

  1. jdk 動(dòng)態(tài)代理
  2. 字節(jié)碼生成

盡管字節(jié)碼生成方式實(shí)現(xiàn)的代理更為強(qiáng)大和高效伏蚊,但代碼維護(hù)不易立轧,大部分公司實(shí)現(xiàn)RPC框架時(shí)還是選擇動(dòng)態(tài)代理方式。

下面簡(jiǎn)單介紹下動(dòng)態(tài)代理怎么實(shí)現(xiàn)我們的需求。我們需要實(shí)現(xiàn)RPCProxyClient代理類(lèi)氛改,代理類(lèi)的invoke方法中封裝了與遠(yuǎn)端服務(wù)通信的細(xì)節(jié)帐萎,消費(fèi)方首先從RPCProxyClient獲得服務(wù)提供方的接口,當(dāng)執(zhí)行helloWorldService.sayHello("test")方法時(shí)就會(huì)調(diào)用invoke方法胜卤。

public class RPCProxyClient implements java.lang.reflect.InvocationHandler{
    private Object obj;

    public RPCProxyClient(Object obj){
        this.obj=obj;
    }

    /**
     * 得到被代理對(duì)象;
     */
    public static Object getProxy(Object obj){
        return java.lang.reflect.Proxy.newProxyInstance(obj.getClass().getClassLoader(),
                obj.getClass().getInterfaces(), new RPCProxyClient(obj));
    }

    /**
     * 調(diào)用此方法執(zhí)行
     */
    public Object invoke(Object proxy, Method method, Object[] args)
            throws Throwable {
        //結(jié)果參數(shù);
        Object result = new Object();
        // ...執(zhí)行通信相關(guān)邏輯
        // ...
        return result;
    }
}
public class Test {
     public static void main(String[] args) {
         HelloWorldService helloWorldService = (HelloWorldService)RPCProxyClient.getProxy(HelloWorldService.class);
         helloWorldService.sayHello("test");
     }
 }
1.2 怎么對(duì)消息進(jìn)行編碼和解碼疆导?
1.2.1 確定消息數(shù)據(jù)結(jié)構(gòu)

上節(jié)講了invoke里需要封裝通信細(xì)節(jié)(通信細(xì)節(jié)再后面幾章詳細(xì)探討),而通信的第一步就是要確定客戶(hù)端和服務(wù)端相互通信的消息結(jié)構(gòu)葛躏〕憾危客戶(hù)端的請(qǐng)求消息結(jié)構(gòu)一般需要包括以下內(nèi)容:

1)接口名稱(chēng)

在我們的例子里接口名是“HelloWorldService”,如果不傳舰攒,服務(wù)端就不知道調(diào)用哪個(gè)接口了败富;

2)方法名

一個(gè)接口內(nèi)可能有很多方法,如果不傳方法名服務(wù)端也就不知道調(diào)用哪個(gè)方法芒率;

3)參數(shù)類(lèi)型&參數(shù)值

參數(shù)類(lèi)型有很多囤耳,比如有bool、int偶芍、long充择、double、string匪蟀、map椎麦、list,甚至如struct(class)材彪;以及相應(yīng)的參數(shù)值观挎;

4)超時(shí)時(shí)間

5)requestID,標(biāo)識(shí)唯一請(qǐng)求id段化,在下面一節(jié)會(huì)詳細(xì)描述requestID的用處嘁捷。

同理服務(wù)端返回的消息結(jié)構(gòu)一般包括以下內(nèi)容。

1)返回值

2)狀態(tài)code

3)requestID

1.2.2 序列化

一旦確定了消息的數(shù)據(jù)結(jié)構(gòu)后显熏,下一步就是要考慮序列化與反序列化了雄嚣。

什么是序列化?序列化就是將數(shù)據(jù)結(jié)構(gòu)或?qū)ο筠D(zhuǎn)換成二進(jìn)制串的過(guò)程喘蟆,也就是編碼的過(guò)程缓升。

什么是反序列化?將在序列化過(guò)程中所生成的二進(jìn)制串轉(zhuǎn)換成數(shù)據(jù)結(jié)構(gòu)或者對(duì)象的過(guò)程蕴轨。

為什么需要序列化港谊?轉(zhuǎn)換為二進(jìn)制串后才好進(jìn)行網(wǎng)絡(luò)傳輸嘛!

為什么需要反序列化橙弱?將二進(jìn)制轉(zhuǎn)換為對(duì)象才好進(jìn)行后續(xù)處理歧寺!

現(xiàn)如今序列化的方案越來(lái)越多燥狰,每種序列化方案都有優(yōu)點(diǎn)和缺點(diǎn),它們?cè)谠O(shè)計(jì)之初有自己獨(dú)特的應(yīng)用場(chǎng)景成福,那到底選擇哪種呢碾局?從RPC的角度上看,主要看三點(diǎn):

  1. 通用性奴艾,比如是否能支持Map等復(fù)雜的數(shù)據(jù)結(jié)構(gòu)净当;
  2. 性能,包括時(shí)間復(fù)雜度和空間復(fù)雜度蕴潦,由于RPC框架將會(huì)被公司幾乎所有服務(wù)使用像啼,如果序列化上能節(jié)約一點(diǎn)時(shí)間,對(duì)整個(gè)公司的收益都將非程栋可觀忽冻,同理如果序列化上能節(jié)約一點(diǎn)內(nèi)存,網(wǎng)絡(luò)帶寬也能省下不少此疹;
  3. 可擴(kuò)展性僧诚,對(duì)互聯(lián)網(wǎng)公司而言,業(yè)務(wù)變化飛快蝗碎,如果序列化協(xié)議具有良好的可擴(kuò)展性湖笨,支持自動(dòng)增加新的業(yè)務(wù)字段,而不影響老的服務(wù)蹦骑,這將大大提供系統(tǒng)的靈活度慈省。

目前互聯(lián)網(wǎng)公司廣泛使用Protobuf、Thrift眠菇、Avro等成熟的序列化解決方案來(lái)搭建RPC框架边败,這些都是久經(jīng)考驗(yàn)的解決方案。

1.3 通信

消息數(shù)據(jù)結(jié)構(gòu)被序列化為二進(jìn)制串后捎废,下一步就要進(jìn)行網(wǎng)絡(luò)通信了笑窜。目前有兩種常用IO通信模型:1)BIO;2)NIO登疗。一般RPC框架需要支持這兩種IO模型排截。

如何實(shí)現(xiàn)RPC的IO通信框架呢?

  1. 使用java nio方式自研谜叹,這種方式較為復(fù)雜匾寝,而且很有可能出現(xiàn)隱藏bug搬葬,但也見(jiàn)過(guò)一些互聯(lián)網(wǎng)公司使用這種方式荷腊;
  2. 基于mina,mina在早幾年比較火熱急凰,不過(guò)這些年版本更新緩慢女仰;
  3. 基于netty猜年,現(xiàn)在很多RPC框架都直接基于netty這一IO通信框架,省力又省心疾忍,比如阿里巴巴的HSF乔外、dubbo,Twitter的finagle等一罩。
1.4 消息里為什么要有requestID杨幼?

如果使用netty的話,一般會(huì)用channel.writeAndFlush()方法來(lái)發(fā)送消息二進(jìn)制串聂渊,這個(gè)方法調(diào)用后對(duì)于整個(gè)遠(yuǎn)程調(diào)用(從發(fā)出請(qǐng)求到接收到結(jié)果)來(lái)說(shuō)是一個(gè)異步的差购,即對(duì)于當(dāng)前線程來(lái)說(shuō),將請(qǐng)求發(fā)送出來(lái)后汉嗽,線程就可以往后執(zhí)行了欲逃,至于服務(wù)端的結(jié)果,是服務(wù)端處理完成后饼暑,再以消息的形式發(fā)送給客戶(hù)端的稳析。于是這里出現(xiàn)以下兩個(gè)問(wèn)題:

  1. 怎么讓當(dāng)前線程“暫停”弓叛,等結(jié)果回來(lái)后彰居,再向后執(zhí)行?
  2. 如果有多個(gè)線程同時(shí)進(jìn)行遠(yuǎn)程方法調(diào)用邪码,這時(shí)建立在client server之間的socket連接上會(huì)有很多雙方發(fā)送的消息傳遞裕菠,前后順序也可能是隨機(jī)的,server處理完結(jié)果后闭专,將結(jié)果消息發(fā)送給client奴潘,client收到很多消息,怎么知道哪個(gè)消息結(jié)果是原先哪個(gè)線程調(diào)用的影钉?

如下圖所示画髓,線程A和線程B同時(shí)向client socket發(fā)送請(qǐng)求requestA和requestB,socket先后將requestB和requestA發(fā)送至server平委,而server可能將responseA先返回奈虾,盡管requestA請(qǐng)求到達(dá)時(shí)間更晚。我們需要一種機(jī)制保證responseA丟給ThreadA廉赔,responseB丟給ThreadB肉微。



怎么解決呢?

  1. client線程每次通過(guò)socket調(diào)用一次遠(yuǎn)程接口前蜡塌,生成一個(gè)唯一的ID碉纳,即requestID(requestID必需保證在一個(gè)Socket連接里面是唯一的),一般常常使用AtomicLong從0開(kāi)始累計(jì)數(shù)字生成唯一ID馏艾;
  2. 將處理結(jié)果的回調(diào)對(duì)象callback劳曹,存放到全局ConcurrentHashMap里面put(requestID, callback)奴愉;
  3. 當(dāng)線程調(diào)用channel.writeAndFlush()發(fā)送消息后,緊接著執(zhí)行callback的get()方法試圖獲取遠(yuǎn)程返回的結(jié)果铁孵。在get()內(nèi)部锭硼,則使用synchronized獲取回調(diào)對(duì)象callback的鎖,再先檢測(cè)是否已經(jīng)獲取到結(jié)果蜕劝,如果沒(méi)有檀头,然后調(diào)用callback的wait()方法,釋放callback上的鎖岖沛,讓當(dāng)前線程處于等待狀態(tài)鳖擒。
  4. 服務(wù)端接收到請(qǐng)求并處理后,將response結(jié)果(此結(jié)果中包含了前面的requestID)發(fā)送給客戶(hù)端烫止,客戶(hù)端socket連接上專(zhuān)門(mén)監(jiān)聽(tīng)消息的線程收到消息蒋荚,分析結(jié)果,取到requestID馆蠕,再?gòu)那懊娴腃oncurrentHashMap里面get(requestID)期升,從而找到callback對(duì)象,再用synchronized獲取callback上的鎖互躬,將方法調(diào)用結(jié)果設(shè)置到callback對(duì)象里播赁,再調(diào)用callback.notifyAll()喚醒前面處于等待狀態(tài)的線程。
public Object get() {
        synchronized (this) { // 旋鎖
            while (!isDone) { // 是否有結(jié)果了
                wait(); //沒(méi)結(jié)果是釋放鎖吼渡,讓當(dāng)前線程處于等待狀態(tài)
            }
        }
}
private void setDone(Response res) {
        this.res = res;
        isDone = true;
        synchronized (this) { //獲取鎖容为,因?yàn)榍懊鎤ait()已經(jīng)釋放了callback的鎖了
            notifyAll(); // 喚醒處于等待的線程
        }
    }

2 如何發(fā)布自己的服務(wù)?

如何讓別人使用我們的服務(wù)呢寺酪?有同學(xué)說(shuō)很簡(jiǎn)單嘛坎背,告訴使用者服務(wù)的IP以及端口就可以了啊。確實(shí)是這樣寄雀,這里問(wèn)題的關(guān)鍵在于是自動(dòng)告知還是人肉告知得滤。

人肉告知的方式:如果你發(fā)現(xiàn)你的服務(wù)一臺(tái)機(jī)器不夠,要再添加一臺(tái)盒犹,這個(gè)時(shí)候就要告訴調(diào)用者我現(xiàn)在有兩個(gè)ip了懂更,你們要輪詢(xún)調(diào)用來(lái)實(shí)現(xiàn)負(fù)載均衡;調(diào)用者咬咬牙改了急膀,結(jié)果某天一臺(tái)機(jī)器掛了沮协,調(diào)用者發(fā)現(xiàn)服務(wù)有一半不可用,他又只能手動(dòng)修改代碼來(lái)刪除掛掉那臺(tái)機(jī)器的ip∽可現(xiàn)實(shí)生產(chǎn)環(huán)境當(dāng)然不會(huì)使用人肉方式慷暂。

有沒(méi)有一種方法能實(shí)現(xiàn)自動(dòng)告知,即機(jī)器的增添命黔、剔除對(duì)調(diào)用方透明呜呐,調(diào)用者不再需要寫(xiě)死服務(wù)提供方地址?當(dāng)然可以悍募,現(xiàn)如今zookeeper被廣泛用于實(shí)現(xiàn)服務(wù)自動(dòng)注冊(cè)與發(fā)現(xiàn)功能蘑辑!
簡(jiǎn)單來(lái)講,zookeeper可以充當(dāng)一個(gè)服務(wù)注冊(cè)表
(Service Registry)坠宴,讓多個(gè)服務(wù)提供者
形成一個(gè)集群洋魂,讓服務(wù)消費(fèi)者
通過(guò)服務(wù)注冊(cè)表獲取具體的服務(wù)訪問(wèn)地址(ip+端口)去訪問(wèn)具體的服務(wù)提供者。如下圖所示:



具體來(lái)說(shuō)喜鼓,zookeeper就是個(gè)分布式文件系統(tǒng)副砍,每當(dāng)一個(gè)服務(wù)提供者部署后都要將自己的服務(wù)注冊(cè)到zookeeper的某一路徑上: /{service}/{version}/{ip:port}, 比如我們的HelloWorldService部署到兩臺(tái)機(jī)器,那么zookeeper上就會(huì)創(chuàng)建兩條目錄:分別為/HelloWorldService/1.0.0/100.19.20.01:16888 /HelloWorldService/1.0.0/100.19.20.02:16888庄岖。

zookeeper提供了“心跳檢測(cè)”功能豁翎,它會(huì)定時(shí)向各個(gè)服務(wù)提供者發(fā)送一個(gè)請(qǐng)求(實(shí)際上建立的是一個(gè) Socket 長(zhǎng)連接),如果長(zhǎng)期沒(méi)有響應(yīng)隅忿,服務(wù)中心就認(rèn)為該服務(wù)提供者已經(jīng)“掛了”心剥,并將其剔除,比如100.19.20.02這臺(tái)機(jī)器如果宕機(jī)了背桐,那么zookeeper上的路徑就會(huì)只剩/HelloWorldService/1.0.0/100.19.20.01:16888优烧。

服務(wù)消費(fèi)者會(huì)去監(jiān)聽(tīng)相應(yīng)路徑(/HelloWorldService/1.0.0),一旦路徑上的數(shù)據(jù)有任務(wù)變化(增加或減少)链峭,zookeeper都會(huì)通知服務(wù)消費(fèi)方服務(wù)提供者地址列表已經(jīng)發(fā)生改變畦娄,從而進(jìn)行更新。

更為重要的是zookeeper與生俱來(lái)的容錯(cuò)容災(zāi)能力(比如leader選舉)弊仪,可以確保服務(wù)注冊(cè)表的高可用性熙卡。

3.Hadoop中RPC實(shí)例分析

ipc.RPC類(lèi)中有一些內(nèi)部類(lèi),為了大家對(duì)RPC類(lèi)有個(gè)初步的印象励饵,就先羅列幾個(gè)我們感興趣的分析一下吧:

Invocation :用于封裝方法名和參數(shù)再膳,作為數(shù)據(jù)傳輸層。
ClientCache :用于存儲(chǔ)client對(duì)象曲横,用socket factory作為hash key,存儲(chǔ)結(jié)構(gòu)為hashMap <SocketFactory, Client>喂柒。
Invoker :是動(dòng)態(tài)代理中的調(diào)用實(shí)現(xiàn)類(lèi),繼承了InvocationHandler.
Server :是ipc.Server的實(shí)現(xiàn)類(lèi)禾嫉。

public Object invoke(Object proxy, Method method, Object[] args)
      throws Throwable {
      ???
      ObjectWritable value = (ObjectWritable)
        client.call(new Invocation(method, args), remoteId);
      ???
      return value.get();
    }

如果你發(fā)現(xiàn)這個(gè)invoke()方法實(shí)現(xiàn)的有些奇怪的話灾杰,那你就對(duì)了。一般我們看到的動(dòng)態(tài)代理的invoke()方法中總會(huì)有 method.invoke(ac, arg); 這句代碼熙参。而上面代碼中卻沒(méi)有艳吠,這是為什么呢?其實(shí)使用 method.invoke(ac, arg); 是在本地JVM中調(diào)用孽椰;而在hadoop中昭娩,是將數(shù)據(jù)發(fā)送給服務(wù)端凛篙,服務(wù)端將處理的結(jié)果再返回給客戶(hù)端,所以這里的invoke()方法必然需要進(jìn)行網(wǎng)絡(luò)通信栏渺。而網(wǎng)絡(luò)通信就是下面的這段代碼實(shí)現(xiàn)的:

ObjectWritable value = (ObjectWritable)
client.call(new Invocation(method, args), remoteId);

Invocation類(lèi)在這里封裝了方法名和參數(shù)呛梆。其實(shí)這里網(wǎng)絡(luò)通信只是調(diào)用了Client類(lèi)的call()方法。那我們接下來(lái)分析一下ipc.Client源碼吧磕诊。和第一章一樣填物,同樣是3個(gè)問(wèn)題

  1. 客戶(hù)端和服務(wù)端的連接是怎樣建立的?
  2. 客戶(hù)端是怎樣給服務(wù)端發(fā)送數(shù)據(jù)的霎终?
  3. 客戶(hù)端是怎樣獲取服務(wù)端的返回?cái)?shù)據(jù)的滞磺?
3.1 客戶(hù)端和服務(wù)端的連接是怎樣建立的?
public Writable call(Writable param, ConnectionId remoteId)  
                       throws InterruptedException, IOException {
    Call call = new Call(param);       //將傳入的數(shù)據(jù)封裝成call對(duì)象
    Connection connection = getConnection(remoteId, call);   //獲得一個(gè)連接
    connection.sendParam(call);     // 向服務(wù)端發(fā)送call對(duì)象
    boolean interrupted = false;
    synchronized (call) {
      while (!call.done) {
        try {
          call.wait(); // 等待結(jié)果的返回莱褒,在Call類(lèi)的callComplete()方法里有notify()方法用于喚醒線程
        } catch (InterruptedException ie) {
          // 因中斷異常而終止击困,設(shè)置標(biāo)志interrupted為true
          interrupted = true;
        }
      }
      if (interrupted) {
        Thread.currentThread().interrupt();
      }

      if (call.error != null) {
        if (call.error instanceof RemoteException) {
          call.error.fillInStackTrace();
          throw call.error;
        } else { // 本地異常
          throw wrapException(remoteId.getAddress(), call.error);
        }
      } else {
        return call.value; //返回結(jié)果數(shù)據(jù)
      }
    }
  }

具體代碼的作用我已做了注釋?zhuān)赃@里不再贅述。但到目前為止广凸,你依然不知道RPC機(jī)制底層的網(wǎng)絡(luò)連接是怎么建立的沛励。分析代碼后,我們會(huì)發(fā)現(xiàn)和網(wǎng)絡(luò)通信有關(guān)的代碼只會(huì)是下面的兩句了:

  Connection connection = getConnection(remoteId, call);   //獲得一個(gè)連接
  connection.sendParam(call);      // 向服務(wù)端發(fā)送call對(duì)象

先看看是怎么獲得一個(gè)到服務(wù)端的連接吧炮障,下面貼出ipc.Client類(lèi)中的getConnection()方法目派。

private Connection getConnection(ConnectionId remoteId,
                                   Call call)
                                   throws IOException, InterruptedException {
    if (!running.get()) {
      // 如果client關(guān)閉了
      throw new IOException("The client is stopped");
    }
    Connection connection;
//如果connections連接池中有對(duì)應(yīng)的連接對(duì)象,就不需重新創(chuàng)建了胁赢;如果沒(méi)有就需重新創(chuàng)建一個(gè)連接對(duì)象企蹭。
//但請(qǐng)注意,該//連接對(duì)象只是存儲(chǔ)了remoteId的信息智末,其實(shí)還并沒(méi)有和服務(wù)端建立連接谅摄。
    do {
      synchronized (connections) {
        connection = connections.get(remoteId);
        if (connection == null) {
          connection = new Connection(remoteId);
          connections.put(remoteId, connection);
        }
      }
    } while (!connection.addCall(call)); //將call對(duì)象放入對(duì)應(yīng)連接中的calls池,就不貼出源碼了
   //這句代碼才是真正的完成了和服務(wù)端建立連接哦~
    connection.setupIOstreams();
    return connection;
  }

下面貼出Client.Connection類(lèi)中的setupIOstreams()方法:

  private synchronized void setupIOstreams() throws InterruptedException {
   ???
      try {
       ???
        while (true) {
          setupConnection();  //建立連接
          InputStream inStream = NetUtils.getInputStream(socket);     //獲得輸入流
          OutputStream outStream = NetUtils.getOutputStream(socket);  //獲得輸出流
          writeRpcHeader(outStream);
          ???
          this.in = new DataInputStream(new BufferedInputStream
              (new PingInputStream(inStream)));   //將輸入流裝飾成DataInputStream
          this.out = new DataOutputStream
          (new BufferedOutputStream(outStream));   //將輸出流裝飾成DataOutputStream
          writeHeader();
          // 跟新活動(dòng)時(shí)間
          touch();
          //當(dāng)連接建立時(shí)系馆,啟動(dòng)接受線程等待服務(wù)端傳回?cái)?shù)據(jù)送漠,注意:Connection繼承了Tread
          start();
          return;
        }
      } catch (IOException e) {
        markClosed(e);
        close();
      }
    }

再有一步我們就知道客戶(hù)端的連接是怎么建立的啦,下面貼出Client.Connection類(lèi)中的setupConnection()方法:

  private synchronized void setupConnection() throws IOException {
      short ioFailures = 0;
      short timeoutFailures = 0;
      while (true) {
        try {
          this.socket = socketFactory.createSocket(); //終于看到創(chuàng)建socket的方法了
          this.socket.setTcpNoDelay(tcpNoDelay);
         ???
          // 設(shè)置連接超時(shí)為20s
          NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
          this.socket.setSoTimeout(pingInterval);
          return;
        } catch (SocketTimeoutException toe) {
          /* 設(shè)置最多連接重試為45次由蘑。
           * 總共有20s*45 = 15 分鐘的重試時(shí)間闽寡。
           */
          handleConnectionFailure(timeoutFailures++, 45, toe);
        } catch (IOException ie) {
          handleConnectionFailure(ioFailures++, maxRetries, ie);
        }
      }
    }

終于,我們知道了客戶(hù)端的連接是怎樣建立的了尼酿,其實(shí)就是創(chuàng)建一個(gè)普通的socket進(jìn)行通信爷狈。

3.2 客戶(hù)端是怎樣給服務(wù)端發(fā)送數(shù)據(jù)的?

下面貼出Client.Connection類(lèi)的sendParam()方法吧:

public void sendParam(Call call) {
      if (shouldCloseConnection.get()) {
        return;
      }
      DataOutputBuffer d=null;
      try {
        synchronized (this.out) {
          if (LOG.isDebugEnabled())
            LOG.debug(getName() + " sending #" + call.id);
          //創(chuàng)建一個(gè)緩沖區(qū)
          d = new DataOutputBuffer();
          d.writeInt(call.id);
          call.param.write(d);
          byte[] data = d.getData();
          int dataLength = d.getLength();
          out.writeInt(dataLength);        //首先寫(xiě)出數(shù)據(jù)的長(zhǎng)度
          out.write(data, 0, dataLength); //向服務(wù)端寫(xiě)數(shù)據(jù)
          out.flush();
        }
      } catch(IOException e) {
        markClosed(e);
      } finally {
        IOUtils.closeStream(d);
      }
    }  
3.3 客戶(hù)端是怎樣獲取服務(wù)端的返回?cái)?shù)據(jù)的裳擎?

下面貼出Client.Connection類(lèi)和Client.Call類(lèi)中的相關(guān)方法:

方法一:  
  public void run() {
      ???
      while (waitForWork()) {
        receiveResponse();  //具體的處理方法
      }
      close();
     ???
}

方法二:
private void receiveResponse() {
      if (shouldCloseConnection.get()) {
        return;
      }
      touch();
      try {
        int id = in.readInt();                    // 阻塞讀取id
        if (LOG.isDebugEnabled())
          LOG.debug(getName() + " got value #" + id);
          Call call = calls.get(id);    //在calls池中找到發(fā)送時(shí)的那個(gè)對(duì)象
        int state = in.readInt();     // 阻塞讀取call對(duì)象的狀態(tài)
        if (state == Status.SUCCESS.state) {
          Writable value = ReflectionUtils.newInstance(valueClass, conf);
          value.readFields(in);           // 讀取數(shù)據(jù)
        //將讀取到的值賦給call對(duì)象涎永,同時(shí)喚醒Client等待線程,貼出setValue()代碼方法三
          call.setValue(value);              
          calls.remove(id);               //刪除已處理的call    
        } else if (state == Status.ERROR.state) {
        ???
        } else if (state == Status.FATAL.state) {
        ???
        }
      } catch (IOException e) {
        markClosed(e);
      }
}

方法三:
public synchronized void setValue(Writable value) {
      this.value = value;
      callComplete();   //具體實(shí)現(xiàn)
}
protected synchronized void callComplete() {
      this.done = true;
      notify();         // 喚醒client等待線程
    }

完成的功能主要是:?jiǎn)?dòng)一個(gè)處理線程,讀取從服務(wù)端傳來(lái)的call對(duì)象羡微,將call對(duì)象讀取完畢后谷饿,喚醒client處理線程。就這么簡(jiǎn)單妈倔,客戶(hù)端就獲取了服務(wù)端返回的數(shù)據(jù)了哦~博投。客戶(hù)端的源碼分析就到這里了哦启涯,下面我們來(lái)分析Server端的源碼吧。

3.4 ipc.Server源碼分析

為了讓大家對(duì)ipc.Server有個(gè)初步的了解恃轩,我們先分析一下它的幾個(gè)內(nèi)部類(lèi)吧:

Call :用于存儲(chǔ)客戶(hù)端發(fā)來(lái)的請(qǐng)求
Listener : 監(jiān)聽(tīng)類(lèi)结洼,用于監(jiān)聽(tīng)客戶(hù)端發(fā)來(lái)的請(qǐng)求,同時(shí)Listener內(nèi)部還有一個(gè)靜態(tài)類(lèi)叉跛,Listener.Reader松忍,當(dāng)監(jiān)聽(tīng)器監(jiān)聽(tīng)到用戶(hù)請(qǐng)求,便讓Reader讀取用戶(hù)請(qǐng)求筷厘。
Responder :響應(yīng)RPC請(qǐng)求類(lèi)鸣峭,請(qǐng)求處理完畢,由Responder發(fā)送給請(qǐng)求客戶(hù)端酥艳。
Connection :連接類(lèi)摊溶,真正的客戶(hù)端請(qǐng)求讀取邏輯在這個(gè)類(lèi)中。
Handler :請(qǐng)求處理類(lèi)充石,會(huì)循環(huán)阻塞讀取callQueue中的call對(duì)象莫换,并對(duì)其進(jìn)行操作。

private void initialize(Configuration conf) throws IOException {
   ???
    // 創(chuàng)建 rpc server
    InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
    if (dnSocketAddr != null) {
      int serviceHandlerCount =
        conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
                    DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
      //獲得serviceRpcServer
      this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(), 
          dnSocketAddr.getPort(), serviceHandlerCount,
          false, conf, namesystem.getDelegationTokenSecretManager());
      this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
      setRpcServiceServerAddress(conf);
}
//獲得server
    this.server = RPC.getServer(this, socAddr.getHostName(),
        socAddr.getPort(), handlerCount, false, conf, namesystem
        .getDelegationTokenSecretManager());

   ???
    this.server.start();  //啟動(dòng) RPC server   Clients只允許連接該server
    if (serviceRpcServer != null) {
      serviceRpcServer.start();  //啟動(dòng) RPC serviceRpcServer 為HDFS服務(wù)的server
    }
    startTrashEmptier(conf);
  }

查看Namenode初始化源碼得知:RPC的server對(duì)象是通過(guò)ipc.RPC類(lèi)的getServer()方法獲得的骤铃。下面咱們?nèi)タ纯磇pc.RPC類(lèi)中的getServer()源碼吧:

public static Server getServer(final Object instance, final String bindAddress, final int port,
                                 final int numHandlers,
                                 final boolean verbose, Configuration conf,
                                 SecretManager<? extends TokenIdentifier> secretManager) 
    throws IOException {
    return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);
  }

這時(shí)我們發(fā)現(xiàn)getServer()是一個(gè)創(chuàng)建Server對(duì)象的工廠方法拉岁,但創(chuàng)建的卻是RPC.Server類(lèi)的對(duì)象。哈哈惰爬,現(xiàn)在你明白了我前面說(shuō)的“RPC.Server是ipc.Server的實(shí)現(xiàn)類(lèi)”了吧喊暖。不過(guò)RPC.Server的構(gòu)造函數(shù)還是調(diào)用了ipc.Server類(lèi)的構(gòu)造函數(shù)的,因篇幅所限撕瞧,就不貼出相關(guān)源碼了陵叽。

初始化Server后,Server端就運(yùn)行起來(lái)了丛版,看看ipc.Server的start()源碼吧:

 /** 啟動(dòng)服務(wù) */
 public synchronized void start() {
   responder.start();  //啟動(dòng)responder
   listener.start();   //啟動(dòng)listener
   handlers = new Handler[handlerCount];
   
   for (int i = 0; i < handlerCount; i++) {
     handlers[i] = new Handler(i);
     handlers[i].start();   //逐個(gè)啟動(dòng)Handler
   }
 }

分析過(guò)ipc.Client源碼后咨跌,我們知道Client端的底層通信直接采用了阻塞式IO編程,當(dāng)時(shí)我們?cè)龀霾聹y(cè):Server端是不是也采用了阻塞式IO∨鹦觯現(xiàn)在我們仔細(xì)地分析一下吧锌半,如果Server端也采用阻塞式IO,當(dāng)連接進(jìn)來(lái)的Client端很多時(shí),勢(shì)必會(huì)影響Server端的性能刊殉。hadoop的實(shí)現(xiàn)者們考慮到了這點(diǎn)殉摔,所以他們采用了java NIO來(lái)實(shí)現(xiàn)Server端,那Server端采用java NIO是怎么建立連接的呢记焊?分析源碼得知逸月,Server端采用Listener監(jiān)聽(tīng)客戶(hù)端的連接,下面先分析一下Listener的構(gòu)造函數(shù)吧:

public Listener() throws IOException {
      address = new InetSocketAddress(bindAddress, port);
      // 創(chuàng)建ServerSocketChannel,并設(shè)置成非阻塞式
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);

      // 將server socket綁定到本地端口
      bind(acceptChannel.socket(), address, backlogLength);
      port = acceptChannel.socket().getLocalPort(); 
      // 獲得一個(gè)selector
      selector= Selector.open();
      readers = new Reader[readThreads];
      readPool = Executors.newFixedThreadPool(readThreads);
      //啟動(dòng)多個(gè)reader線程遍膜,為了防止請(qǐng)求多時(shí)服務(wù)端響應(yīng)延時(shí)的問(wèn)題
      for (int i = 0; i < readThreads; i++) {       
        Selector readSelector = Selector.open();
        Reader reader = new Reader(readSelector);
        readers[i] = reader;
        readPool.execute(reader);
      }
      // 注冊(cè)連接事件
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("IPC Server listener on " + port);
      this.setDaemon(true);
    }

在啟動(dòng)Listener線程時(shí)碗硬,服務(wù)端會(huì)一直等待客戶(hù)端的連接,下面貼出Server.Listener類(lèi)的run()方法:

 public void run() {
     ???
      while (running) {
        SelectionKey key = null;
        try {
          selector.select();
          Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
          while (iter.hasNext()) {
            key = iter.next();
            iter.remove();
            try {
              if (key.isValid()) {
                if (key.isAcceptable())
                  doAccept(key);     //具體的連接方法
              }
            } catch (IOException e) {
            }
            key = null;
          }
        } catch (OutOfMemoryError e) {
       ???         
    }

下面貼出Server.Listener類(lèi)中doAccept()方法中的關(guān)鍵源碼吧:

 void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
      Connection c = null;
      ServerSocketChannel server = (ServerSocketChannel) key.channel();
      SocketChannel channel;
      while ((channel = server.accept()) != null) { //建立連接
        channel.configureBlocking(false);
        channel.socket().setTcpNoDelay(tcpNoDelay);
        Reader reader = getReader();  //從readers池中獲得一個(gè)reader
        try {
          reader.startAdd(); // 激活readSelector瓢颅,設(shè)置adding為true
          SelectionKey readKey = reader.registerChannel(channel);//將讀事件設(shè)置成興趣事件
          c = new Connection(readKey, channel, System.currentTimeMillis());//創(chuàng)建一個(gè)連接對(duì)象
          readKey.attach(c);   //將connection對(duì)象注入readKey
          synchronized (connectionList) {
            connectionList.add(numConnections, c);
            numConnections++;
          }
        ??? 
        } finally {
//設(shè)置adding為false恩尾,采用notify()喚醒一個(gè)reader,其實(shí)代碼十三中啟動(dòng)的每個(gè)reader都使
//用了wait()方法等待。因篇幅有限挽懦,就不貼出源碼了翰意。
          reader.finishAdd();
        }
      }
    }

當(dāng)reader被喚醒,reader接著執(zhí)行doRead()方法信柿。

下面貼出Server.Listener.Reader類(lèi)中的doRead()方法和Server.Connection類(lèi)中的readAndProcess()方法源碼:

方法一:   
 void doRead(SelectionKey key) throws InterruptedException {
      int count = 0;
      Connection c = (Connection)key.attachment();  //獲得connection對(duì)象
      if (c == null) {
        return;  
      }
      c.setLastContact(System.currentTimeMillis());
      try {
        count = c.readAndProcess();    // 接受并處理請(qǐng)求  
      } catch (InterruptedException ieo) {
       ???
      }
     ???    
}

方法二:
public int readAndProcess() throws IOException, InterruptedException {
      while (true) {
        ???
        if (!rpcHeaderRead) {
          if (rpcHeaderBuffer == null) {
            rpcHeaderBuffer = ByteBuffer.allocate(2);
          }
         //讀取請(qǐng)求頭
          count = channelRead(channel, rpcHeaderBuffer);
          if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
            return count;
          }
        // 讀取請(qǐng)求版本號(hào)  
          int version = rpcHeaderBuffer.get(0);
          byte[] method = new byte[] {rpcHeaderBuffer.get(1)};
        ???  
       
          data = ByteBuffer.allocate(dataLength);
        }
        // 讀取請(qǐng)求  
        count = channelRead(channel, data);
        
        if (data.remaining() == 0) {
         ???
          if (useSasl) {
         ???
          } else {
            processOneRpc(data.array());//處理請(qǐng)求
          }
        ???
          }
        } 
        return count;
      }
    }

下面貼出Server.Connection類(lèi)中的processOneRpc()方法和processData()方法的源碼冀偶。

方法一:   
 private void processOneRpc(byte[] buf) throws IOException,
        InterruptedException {
      if (headerRead) {
        processData(buf);
      } else {
        processHeader(buf);
        headerRead = true;
        if (!authorizeConnection()) {
          throw new AccessControlException("Connection from " + this
              + " for protocol " + header.getProtocol()
              + " is unauthorized for user " + user);
        }
      }
}
方法二:
    private void processData(byte[] buf) throws  IOException, InterruptedException {
      DataInputStream dis =
        new DataInputStream(new ByteArrayInputStream(buf));
      int id = dis.readInt();      // 嘗試讀取id
      Writable param = ReflectionUtils.newInstance(paramClass, conf);//讀取參數(shù)
      param.readFields(dis);        
        
      Call call = new Call(id, param, this);  //封裝成call
      callQueue.put(call);   // 將call存入callQueue
      incRpcCount();  // 增加rpc請(qǐng)求的計(jì)數(shù)
    }

4. RPC與web service

RPC:



Web service

web service接口就是RPC中的stub組件,規(guī)定了server能夠提供的服務(wù)(web service)渔嚷,這在server和client上是一致的进鸠,但是也是跨語(yǔ)言跨平臺(tái)的。同時(shí)形病,由于web service規(guī)范中的WSDL文件的存在堤如,現(xiàn)在各平臺(tái)的web service框架,都可以基于WSDL文件窒朋,自動(dòng)生成web service接口 搀罢。
其實(shí)兩者差不多,只是傳輸?shù)膮f(xié)議不同侥猩。

Reference:

  1. http://www.cnblogs.com/LBSer/p/4853234.html
  2. http://weixiaolu.iteye.com/blog/1504898
  3. http://kyfxbl.iteye.com/blog/1745550
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末榔至,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子欺劳,更是在濱河造成了極大的恐慌唧取,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,277評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件划提,死亡現(xiàn)場(chǎng)離奇詭異枫弟,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)鹏往,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門(mén)淡诗,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人,你說(shuō)我怎么就攤上這事韩容】钗ィ” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 163,624評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵群凶,是天一觀的道長(zhǎng)插爹。 經(jīng)常有香客問(wèn)我,道長(zhǎng)请梢,這世上最難降的妖魔是什么赠尾? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,356評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮毅弧,結(jié)果婚禮上气嫁,老公的妹妹穿的比我還像新娘。我一直安慰自己形真,他們只是感情好杉编,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,402評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布超全。 她就那樣靜靜地躺著咆霜,像睡著了一般。 火紅的嫁衣襯著肌膚如雪嘶朱。 梳的紋絲不亂的頭發(fā)上蛾坯,一...
    開(kāi)封第一講書(shū)人閱讀 51,292評(píng)論 1 301
  • 那天,我揣著相機(jī)與錄音疏遏,去河邊找鬼脉课。 笑死,一個(gè)胖子當(dāng)著我的面吹牛财异,可吹牛的內(nèi)容都是我干的倘零。 我是一名探鬼主播,決...
    沈念sama閱讀 40,135評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼戳寸,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼呈驶!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起疫鹊,我...
    開(kāi)封第一講書(shū)人閱讀 38,992評(píng)論 0 275
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤袖瞻,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后拆吆,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體聋迎,經(jīng)...
    沈念sama閱讀 45,429評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,636評(píng)論 3 334
  • 正文 我和宋清朗相戀三年枣耀,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了霉晕。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,785評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖娄昆,靈堂內(nèi)的尸體忽然破棺而出佩微,到底是詐尸還是另有隱情,我是刑警寧澤萌焰,帶...
    沈念sama閱讀 35,492評(píng)論 5 345
  • 正文 年R本政府宣布哺眯,位于F島的核電站,受9級(jí)特大地震影響扒俯,放射性物質(zhì)發(fā)生泄漏奶卓。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,092評(píng)論 3 328
  • 文/蒙蒙 一撼玄、第九天 我趴在偏房一處隱蔽的房頂上張望夺姑。 院中可真熱鬧,春花似錦掌猛、人聲如沸盏浙。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,723評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)废膘。三九已至,卻和暖如春慕蔚,著一層夾襖步出監(jiān)牢的瞬間丐黄,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,858評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工孔飒, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留灌闺,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,891評(píng)論 2 370
  • 正文 我出身青樓坏瞄,卻偏偏與公主長(zhǎng)得像桂对,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子鸠匀,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,713評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理蕉斜,服務(wù)發(fā)現(xiàn),斷路器狮崩,智...
    卡卡羅2017閱讀 134,654評(píng)論 18 139
  • 轉(zhuǎn)自:http://blog.csdn.net/kesonyk/article/details/50924489 ...
    晴天哥_王志閱讀 24,808評(píng)論 2 38
  • Android跨進(jìn)程通信IPC整體內(nèi)容如下 1蛛勉、Android跨進(jìn)程通信IPC之1——Linux基礎(chǔ)2、Andro...
    隔壁老李頭閱讀 11,885評(píng)論 11 56
  • 幼時(shí)總盼著長(zhǎng)大睦柴,現(xiàn)在長(zhǎng)大了卻有諸多煩惱诽凌。有人問(wèn)我在大學(xué)里為什么不談場(chǎng)戀愛(ài),我想說(shuō)坦敌,我已經(jīng)不是那個(gè)16侣诵、17歲的...
    白玉錟閱讀 107評(píng)論 0 1
  • 我知道人和自己處理不好關(guān)系時(shí)痢法,和誰(shuí)一起都不合適。現(xiàn)在我就處于和自己處不好關(guān)系杜顺。我不知道何時(shí)才會(huì)是淡定從容财搁,不再容易...
    姜暖人生閱讀 107評(píng)論 0 1