[原創(chuàng)]Swoft源碼剖析-RPC功能實現(xiàn)

Swoft提供了一個自建RPC(遠程方法調用)實現(xiàn),讓你可以方便的調用其他Swoft上的服務。

RPC服務端的初始化

RPC有兩種啟動方式Http伴隨啟動RPC單獨啟動搪锣。值得一提的是目前swoole的tcp服務即RPC服務鲁僚,暫沒有其他的tcp服務功能,所以基本上tcp相關的配置指代的就是RPC肃叶。

Http伴隨啟動

swoft的RPC 服務在Http服務啟動時候伴隨啟動

//Swoft\Http\Server\Http\HttpServer.php

/**
 * Http Server
 */
class HttpServer extends AbstractServer
    /**
     * Start Server
     *
     * @throws \Swoft\Exception\RuntimeException
     */
    public function start()
    {
        //coco ...

        //根據.env配置文件Server區(qū)段的TCPABLE字段決定是否啟動RPC服務
        if ((int)$this->serverSetting['tcpable'] === 1) {
            $this->registerRpcEvent();
        }
        //code ....
    }
}
Swoole監(jiān)聽

初始化流程即根據相關注解注冊一個swoole監(jiān)聽

//Swoft\Http\Server\Http\HttpServer.php
    /**
     * Register rpc event, swoft/rpc-server required
     *
     * @throws \Swoft\Exception\RuntimeException
     */
    protected function registerRpcEvent()
    {
        //含有@SwooleListener且type為SwooleEvent::TYPE_PORT的Bean,即RpcEventListener
        $swooleListeners = SwooleListenerCollector::getCollector();
        if (!isset($swooleListeners[SwooleEvent::TYPE_PORT][0]) || empty($swooleListeners[SwooleEvent::TYPE_PORT][0])) {
            throw new RuntimeException("Please use swoft/rpc-server, run 'composer require swoft/rpc-server'");
        }

        //添加swoole RPC相關的tcp監(jiān)聽端口,使用的是.env文件中的TCP區(qū)段配置
        $this->listen = $this->server->listen($this->tcpSetting['host'], $this->tcpSetting['port'], $this->tcpSetting['type']);
        $tcpSetting = $this->getListenTcpSetting();
        $this->listen->set($tcpSetting);

        //根據RpcEventListener的相關注解添加監(jiān)聽處理句柄
        $swooleRpcPortEvents = $swooleListeners[SwooleEvent::TYPE_PORT][0];
        $this->registerSwooleEvents($this->listen, $swooleRpcPortEvents);
    }

由于是初版十嘿,根據@SwooleListener獲取RPC監(jiān)聽Bean的相關處理暫時還有點生硬因惭。
目前swoft中type為SwooleEvent::TYPE_PORT@SwooleListener只有RpcEventListener一個,如果添加了同類Bean容易出問題绩衷,穩(wěn)定版出的時候應該會有相關優(yōu)化蹦魔。

RPC單獨啟動

入口從Swoft\Http\Server\Command\ServerCommand換成Swoft\Rpc\Server\Command\RpcCommand激率,流程和Http大同小異,區(qū)別僅僅在于使用前者使用Swoole\Http\Server建立Http服務器后額外監(jiān)聽一個Tcp端口支持Rpc勿决,后者直接使用Swoole\Server監(jiān)聽Tcp來支持Rpc乒躺,此處不再贅述。

RPC請求處理

RPC服務器和HTTP服務器的區(qū)別僅僅在于與客戶端交互報文格式和報文所在的網絡層(Swoft的RPC基于TCP層次)低缩,運行原理基本相通嘉冒,都是路由,中間件咆繁,RPC Service(對應Http的Controller)讳推,你完全可以以Http服務的思路去理解他。

swoole的RPC-TCP監(jiān)聽設置好后玩般,RPC服務端就可以開始接受請求了银觅。RpcEventListener的負責的工作僅僅是把收到的數(shù)據轉發(fā)給\Swoft\Rpc\Server\ServiceDispatcher分發(fā)。Dispatcher會將請求傳遞給各個Middleware中間件坏为,最終最終傳遞給HandlerAdapterMiddleware處理究驴。

PackerMiddleware

PackerMiddleware是RPC中比較重要的一個中間件,負責將TCP請求中數(shù)據流解包和數(shù)據流封包匀伏。

<?php
//Swoft\Rpc\Server\Middleware.PackerMiddleware
namespace Swoft\Rpc\Server\Middleware;
/**
 * service packer
 *
 * @Bean()
 * @uses      PackerMiddleware
 * @version   2017年11月26日
 * @author    stelin <phpcrazy@126.com>
 * @copyright Copyright 2010-2016 swoft software
 * @license   PHP Version 7.x {@link http://www.php.net/license/3_0.txt}
 */
class PackerMiddleware implements MiddlewareInterface
{
    /**
     * packer middleware
     *
     * @param \Psr\Http\Message\ServerRequestInterface     $request
     * @param \Psr\Http\Server\RequestHandlerInterface $handler
     *
     * @return \Psr\Http\Message\ResponseInterface
     */
    public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
    {
        //獲取servicePacker Bean(\Swoft\Rpc\Packer\ServicePacker)用于字符串解包封包
        $packer = service_packer();
        $data   = $request->getAttribute(self::ATTRIBUTE_DATA);
        $data   = $packer->unpack($data);

        // 觸發(fā)一個RpcServerEvent::BEFORE_RECEIVE事件洒忧,默認只有一個用于添加請求上下文信息的BeforeReceiveListener
        // 利用中間件觸發(fā)流程關鍵事件的做法耦合有點高,猜測以后會調整
        App::trigger(RpcServerEvent::BEFORE_RECEIVE, null, $data);
        //替換解包后的解包到Request中够颠,提供給后續(xù)中間件和Handler使用
        $request = $request->withAttribute(self::ATTRIBUTE_DATA, $data);

        /* @var \Swoft\Rpc\Server\Rpc\Response $response */
        $response      = $handler->handle($request);

       //為Response封包返回給RPC客戶端
        $serviceResult = $response->getAttribute(HandlerAdapter::ATTRIBUTE);
        $serviceResult = $packer->pack($serviceResult);
        return $response->withAttribute(HandlerAdapter::ATTRIBUTE, $serviceResult);
    }
}
RouterMiddleware

RouterMiddleware負責根據RPC請求的method,version,interface 獲取處理的RPC服務類熙侍,充當了路由的作用

<?php
//Swoft\Rpc\Server\Middleware\RouterMiddleware.php

/**
 * service router
 *
 * @Bean()
 * @uses      RouterMiddleware
 * @version   2017年11月26日
 * @author    stelin <phpcrazy@126.com>
 * @copyright Copyright 2010-2016 swoft software
 * @license   PHP Version 7.x {@link http://www.php.net/license/3_0.txt}
 */
class RouterMiddleware implements MiddlewareInterface
{
    /**
     * get handler from router
     *
     * @param \Psr\Http\Message\ServerRequestInterface     $request
     * @param \Psr\Http\Server\RequestHandlerInterface $handler
     *
     * @return \Psr\Http\Message\ResponseInterface
     */
    public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
    {
        // service data
        $data = $request->getAttribute(PackerMiddleware::ATTRIBUTE_DATA);

        $method    = $data['method']??"";
        $version   = $data['version']??"";
        $interface = $data['interface']??"";

        /* @var \Swoft\Rpc\Server\Router\HandlerMapping $serviceRouter */
        $serviceRouter  = App::getBean('serviceRouter');
        //路由匹配,即向Swoft\Rpc\Server\Router\HandlerMapping->$routes獲取RPC服務信息
        $serviceHandler = $serviceRouter->getHandler($interface, $version, $method);

        // deliver service data
        $request = $request->withAttribute(self::ATTRIBUTE, $serviceHandler);

        return $handler->handle($request);
    }
}

swoft啟動階段會掃描并初始化注解信息(參考注解章節(jié))摧找,注解初始化完畢后會觸發(fā)一個AppEvent::APPLICATION_LOADER事件,此時會將來自@Service的所有RPC的路由信息注冊到Swoft\Rpc\Server\Router\HandlerMapping->$routes中牢硅,用于serviceRouter Bean的路由匹配蹬耘。

HandlerAdapterMiddleware

HandlerAdapterMiddleware最終轉發(fā)請求給HandlerAdapter處理,HandlerAdapter會使用剛剛RouterMiddleware匹配到的服務類信息轉發(fā)請求并封裝Response最終返回給ServiceDispatcher,ServiceDispatcher會返回TCP流給客戶端然后結束本次請求减余。

<?php
//Swoft\Rpc\Server\Router\HandlerAdapter.php
/**
 * Service handler adapter
 * @Bean("serviceHandlerAdapter")
 */
class HandlerAdapter implements HandlerAdapterInterface
{

    /**
     * Execute service handler
     *
     * @param \Psr\Http\Message\ServerRequestInterface $request
     * @param array                                    $handler
     * @return Response
     */
    public function doHandler(ServerRequestInterface $request, array $handler): Response
    {
        // RPC方法的各個參數(shù)
        $data = $request->getAttribute(PackerMiddleware::ATTRIBUTE_DATA);
        $params = $data['params'] ?? [];
        
        //路由解析出來的综苔,處理該請求的服務Bean和方法
        list($serviceClass, $method) = $handler;
        $service = App::getBean($serviceClass);

        // execute handler with params
        $response = PhpHelper::call([$service, $method], $params);
        $response = ResponseHelper::formatData($response);

        // 構造Response返回客戶端
        if (! $response instanceof Response) {
            $response = (new Response())->withAttribute(self::ATTRIBUTE, $response);
        }

        return $response;
    }
}

RPC客戶端的實現(xiàn)

在Bean的屬性中聲明@Reference,swoft即會根據@var聲明的類型注入相應的RPC客戶端實例位岔。

    /**
     * @Reference(name="user")
     *
     * @var DemoInterface
     */
    private $demoService;

依賴注入的實現(xiàn)會專門另外用一篇文章單獨解釋如筛,這里先看看RPC客戶端的相關代碼。

遠程代理

namespace Swoft\Rpc\Client\Service;

/**
 * The proxy of service
 */
class ServiceProxy
{
    /**
     * @param string $className
     * @param string $interfaceClass
     */
    public static function   (string $className, string $interfaceClass)
    {
        $reflectionClass   = new \ReflectionClass($interfaceClass);
        $reflectionMethods = $reflectionClass->getMethods(\ReflectionMethod::IS_PUBLIC);

        $template = "class $className extends \\Swoft\\Rpc\\Client\\Service implements {$interfaceClass} {";

        //\Swoft\Rpc\Client\Service::class
        // the template of methods
        $template .= self::getMethodsTemplate($reflectionMethods);
        $template .= "}";
            
        eval($template);
    }
    //code ...
}

和AOP一樣抒抬,原理一樣是使用了動態(tài)代理杨刨,更具體的說法是動態(tài)遠程代理
RPC動態(tài)客戶端類實現(xiàn)了客戶端聲明的Interface類型(如DemoInterface)并繼承了\Swoft\Rpc\Client\Service類擦剑。
動態(tài)類的實現(xiàn)很簡單妖胀,對于接口顯式聲明的方法芥颈,實際上都是調用\Swoft\Rpc\Client\Service->call()方法。

interface DemoInterface
{
    /**
     * @param array $ids
     * @return array
     */
    public function getUsers(array $ids);
}
class 動態(tài)生成RPC客戶端類 extends \Swoft\Rpc\Client\Service implements \App\Lib\DemoInterface { 
    public function getUsers ( array  $ids  ) {
        $params = func_get_args();
        return $this->call('getUsers', $params);
    }
    //code ...
}

對于自動生成的defer方法赚抡,則是通過魔術方法__call(),調用\Swoft\Rpc\Client\Service->deferCall()

    /**
     * @param string $name
     * @param array  $arguments
     *
     * @return ResultInterface
     * @throws RpcClientException
     */
    function __call(string $name, array $arguments)
    {
        $method = $name;
        $prefix = self::DEFER_PREFIX;//'defer'
        if (strpos($name, $prefix) !== 0) {
            throw new RpcClientException(sprintf('the method of %s is not exist! ', $name));
        }

        if ($name == $prefix) {
            $method = array_shift($arguments);
        } elseif (strpos($name, $prefix) === 0) {
            $method = lcfirst(ltrim($name, $prefix));
        }

        return $this->deferCall($method, $arguments);
    }

我們這里只看具有代表性的call()方法,deferCall()大致相同爬坑。
RPC客戶端動態(tài)類的本質是將客戶端的參數(shù)和接口信息根據swoft自己的格式傳遞給RPC服務端,然后將服務器返回的數(shù)據解包取出返回值返回給RPC的調用者涂臣,對外偽裝成一個普通的對象盾计,屏蔽遠程調用操作。

// Swoft\Rpc\Client\Service.php
    /**
     * Do call service
     *
     * @param string $func
     * @param array  $params
     *
     * @throws \Throwable
     * @return mixed
     */
    public function call(string $func, array $params)
    {
        $profileKey = $this->interface . '->' . $func;
        //根據@reference的fallback屬性獲取降級處理句柄赁遗,在RPC服務調用失敗的時候可以會使用fallback句柄代替
        $fallback   = $this->getFallbackHandler($func);
        try {
            $connectPool    = $this->getPool();
            $circuitBreaker = $this->getBreaker();

            /* @var $client AbstractServiceConnection */
            $client = $connectPool->getConnection();
            //數(shù)據封包署辉,和RPC服務端一致
            $packer   = service_packer();
            $type     = $this->getPackerName();
            $data     = $packer->formatData($this->interface, $this->version, $func, $params);
            $packData = $packer->pack($data, $type);

            //通過熔斷器調用接口
            $result = $circuitBreaker->call([$client, 'send'], [$packData], $fallback);
            if ($result === null || $result === false) {
                return null;
            }

            //和defercall不一致這里直接收包,解包
            App::profileStart($profileKey);
            $result = $client->recv();
            App::profileEnd($profileKey);
            $connectPool->release($client);

            App::debug(sprintf('%s call %s success, data=%', $this->interface, $func, json_encode($data, JSON_UNESCAPED_UNICODE)));
            $result = $packer->unpack($result);
            $data   = $packer->checkData($result);
        } catch (\Throwable $throwable) {
            if (empty($fallback)) {
                throw $throwable;
            }
            //RPC調用失敗則調用降級句柄吼和,代替實際RPC服務直接返回
            $data = PhpHelper::call($fallback, $params);
        }

        return $data;
    }

熔斷器

熔斷器的swoft-RPC的另一重要概念涨薪,RPC的所有請求都通過熔斷器發(fā)送。
熔斷器使用狀態(tài)模式實現(xiàn),熔斷器有開啟,半開,關閉 3種狀態(tài)炫乓,不同狀態(tài)下熔斷器會持有不同的狀態(tài)實例刚夺,狀態(tài)根據RPC調用情況切換,熔斷器根據持有狀態(tài)實例的不同末捣,行為也有所不同侠姑。

熔斷器關閉狀態(tài)策略
<?php
//Swoft\Sg\Circuit\CloseState.php 
/**
 * close狀態(tài)的熔斷器,對所有RPC調用都通過協(xié)程客戶端發(fā)送到RPC服務器
 *  關閉狀態(tài)及切換
 * 1. 重置failCounter=0 successCount=0
 * 2. 操作失敗箩做,failCounter計數(shù)
 * 3. 操作失敗一定計數(shù)莽红,切換為open開啟狀態(tài)
 */
class CloseState extends CircuitBreakerState
{
    /**
     * 熔斷器調用
     *
     * @param mixed $callback 回調函數(shù)
     * @param array $params 參數(shù)
     * @param mixed $fallback 失敗回調
     *
     * @return mixed 返回結果
     */
    public function doCall($callback, $params = [], $fallback = null)
    {
        list($class, $method) = $callback;

        try {
            if ($class == null) {
                throw new \Exception($this->circuitBreaker->serviceName . "服務,連接建立失敗(null)");
            }

            if ($class instanceof Client && $class->isConnected() == false) {
                throw new \Exception($this->circuitBreaker->serviceName . "服務,當前連接已斷開");
            }
            //調用swoole協(xié)程客戶端的send()方法發(fā)送數(shù)據
            $data = $class->$method(...$params);
        } catch (\Exception $e) {
            //遞增失敗計數(shù)
            if ($this->circuitBreaker->isClose()) {
                $this->circuitBreaker->incFailCount();
            }

            App::error($this->circuitBreaker->serviceName . "服務,當前[關閉狀態(tài)]邦邦,服務端調用失敗安吁,開始服務降級容錯處理,error=" . $e->getMessage());
            //RPC調用失敗則使用降級接口 
            $data = $this->circuitBreaker->fallback($fallback);
        }
        
        //失敗次數(shù)過線則切換狀態(tài)
        $failCount = $this->circuitBreaker->getFailCounter();
        $switchToFailCount = $this->circuitBreaker->getSwitchToFailCount();
        if ($failCount >= $switchToFailCount && $this->circuitBreaker->isClose()) {
            App::trace($this->circuitBreaker->serviceName . "服務燃辖,當前[關閉狀態(tài)]鬼店,服務失敗次數(shù)達到上限,開始切換為開啟狀態(tài)黔龟,failCount=" . $failCount);
            $this->circuitBreaker->switchToOpenState();
        }

        App::trace($this->circuitBreaker->serviceName . "服務妇智,當前[關閉狀態(tài)],failCount=" . $this->circuitBreaker->getFailCounter());
        return $data;
    }
}
熔斷器開啟狀態(tài)策略
<?php
\\Swoft\Sg\Circuit\OpenState .php;
/**
 * open狀態(tài)的熔斷器氏身,對所有RPC調用都使用降級句柄代替
 * 開啟狀態(tài)及切換(open)
 * 1. 重置failCounter=0 successCounter=0
 * 2. 請求立即返回錯誤響應
 * 3. 定時器一定時間后切換為半開狀態(tài)(open)
 */
class OpenState extends CircuitBreakerState
{
    /**
     * 熔斷器調用
     *
     * @param mixed $callback 回調函數(shù)
     * @param array $params 參數(shù)
     * @param mixed $fallback 失敗回調
     *
     * @return mixed 返回結果
     */
    public function doCall($callback, $params = [], $fallback = null)
    {
        $data = $this->circuitBreaker->fallback();

        App::trace($this->getServiceName() . "服務巍棱,當前[開啟狀態(tài)],執(zhí)行服務fallback服務降級容錯處理");
        $nowTime = time();

        if ($this->circuitBreaker->isOpen()
            && $nowTime > $this->circuitBreaker->getSwitchOpenToHalfOpenTime()
        ) {
            $delayTime = $this->circuitBreaker->getDelaySwitchTimer();

            // swoole定時器不是嚴格的蛋欣,3s容錯時間 航徙,定時切換狀態(tài)的半開
            $switchToHalfStateTime = $nowTime + ($delayTime / 1000) + 3;
            App::getTimer()->addAfterTimer('openState', $delayTime, [$this, 'delayCallback']);
            $this->circuitBreaker->setSwitchOpenToHalfOpenTime($switchToHalfStateTime);

            App::trace($this->getServiceName() . "服務,當前[開啟狀態(tài)]陷虎,創(chuàng)建延遲觸發(fā)器捉偏,一段時間后狀態(tài)切換為半開狀態(tài)");
        }

        return $data;
    }

}

熔斷器半開狀態(tài)策略

半開熔斷器是熔斷器關閉狀態(tài)和熔斷器開啟狀態(tài)的過度狀態(tài)倒得,半開熔斷器的所有RPC調用都是加鎖的,連續(xù)成功或者連續(xù)失敗到閾值后會切換到關閉狀態(tài)或者開啟狀態(tài)夭禽,代碼類似霞掺,此處不再累述,有興趣的讀者可以自行研究讹躯。

Swoft源碼剖析系列目錄:http://www.reibang.com/p/2f679e0b4d58

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末菩彬,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子潮梯,更是在濱河造成了極大的恐慌骗灶,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,657評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件秉馏,死亡現(xiàn)場離奇詭異耙旦,居然都是意外死亡,警方通過查閱死者的電腦和手機萝究,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評論 3 394
  • 文/潘曉璐 我一進店門免都,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人帆竹,你說我怎么就攤上這事绕娘。” “怎么了栽连?”我有些...
    開封第一講書人閱讀 164,057評論 0 354
  • 文/不壞的土叔 我叫張陵险领,是天一觀的道長。 經常有香客問我秒紧,道長绢陌,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,509評論 1 293
  • 正文 為了忘掉前任熔恢,我火速辦了婚禮脐湾,結果婚禮上,老公的妹妹穿的比我還像新娘绩聘。我一直安慰自己沥割,他們只是感情好耗啦,可當我...
    茶點故事閱讀 67,562評論 6 392
  • 文/花漫 我一把揭開白布凿菩。 她就那樣靜靜地躺著,像睡著了一般帜讲。 火紅的嫁衣襯著肌膚如雪衅谷。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,443評論 1 302
  • 那天似将,我揣著相機與錄音获黔,去河邊找鬼蚀苛。 笑死,一個胖子當著我的面吹牛玷氏,可吹牛的內容都是我干的堵未。 我是一名探鬼主播,決...
    沈念sama閱讀 40,251評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼盏触,長吁一口氣:“原來是場噩夢啊……” “哼渗蟹!你這毒婦竟也來了?” 一聲冷哼從身側響起赞辩,我...
    開封第一講書人閱讀 39,129評論 0 276
  • 序言:老撾萬榮一對情侶失蹤雌芽,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后辨嗽,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體世落,經...
    沈念sama閱讀 45,561評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡梭依,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,779評論 3 335
  • 正文 我和宋清朗相戀三年父泳,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片斜姥。...
    茶點故事閱讀 39,902評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡篮灼,死狀恐怖忘古,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情诅诱,我是刑警寧澤髓堪,帶...
    沈念sama閱讀 35,621評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站娘荡,受9級特大地震影響干旁,放射性物質發(fā)生泄漏。R本人自食惡果不足惜炮沐,卻給世界環(huán)境...
    茶點故事閱讀 41,220評論 3 328
  • 文/蒙蒙 一争群、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧大年,春花似錦换薄、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,838評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至垦缅,卻和暖如春冲泥,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,971評論 1 269
  • 我被黑心中介騙來泰國打工凡恍, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留志秃,地道東北人。 一個月前我還...
    沈念sama閱讀 48,025評論 2 370
  • 正文 我出身青樓嚼酝,卻偏偏與公主長得像浮还,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子闽巩,可洞房花燭夜當晚...
    茶點故事閱讀 44,843評論 2 354

推薦閱讀更多精彩內容