grpc python 源碼分析(2):server 處理請求

grpc python 源碼分為三部分:python——cython——c++ , 本系列文章分析的是 python 部分代碼污它,其它部分不涉及(其實是我看不懂??)

版本:1.24.3

  • 接受請求
    首先來看 上期文章 中提到的接受請求線程
def _serve(state):
    while True:
        timeout = time.time() + _DEALLOCATED_SERVER_CHECK_PERIOD_S
        event = state.completion_queue.poll(timeout)
        if state.server_deallocated:
            _begin_shutdown_once(state)
        if event.completion_type != cygrpc.CompletionType.queue_timeout:
            if not _process_event_and_continue(state, event):
                return
        # We want to force the deletion of the previous event
        # ~before~ we poll again; if the event has a reference
        # to a shutdown Call object, this can induce spinlock.
        event = None

負責處理請求的是 _process_event_and_continue 函數(shù),在之前做了一些超時的判斷,決定是否處理這個請求把篓。
下面是 _process_event_and_continue 函數(shù)的代碼

def _process_event_and_continue(state, event):
    should_continue = True
    if event.tag is _SHUTDOWN_TAG:
       ...
    elif event.tag is _REQUEST_CALL_TAG:
        with state.lock:
            state.due.remove(_REQUEST_CALL_TAG)
            concurrency_exceeded = (
                state.maximum_concurrent_rpcs is not None and
                state.active_rpc_count >= state.maximum_concurrent_rpcs)

            rpc_state, rpc_future = _handle_call(
                event, state.generic_handlers, state.interceptor_pipeline,
                state.thread_pool, concurrency_exceeded)  # 我們只關(guān)注這一段代碼

            if rpc_state is not None:
                state.rpc_states.add(rpc_state)
            if rpc_future is not None:
                state.active_rpc_count += 1
                rpc_future.add_done_callback(
                    lambda unused_future: _on_call_completed(state))
            if state.stage is _ServerStage.STARTED:
                _request_call(state)
            elif _stop_serving(state):
                should_continue = False
    else:
        ...
    return should_continue

繼續(xù)往下走

def _handle_call(rpc_event, generic_handlers, interceptor_pipeline, thread_pool,
                 concurrency_exceeded):
    if not rpc_event.success:
        return None, None
    if rpc_event.call_details.method is not None:
        try:
            method_handler = _find_method_handler(rpc_event, generic_handlers,
                                                  interceptor_pipeline)
        except Exception as exception:  # pylint: disable=broad-except
            details = 'Exception servicing handler: {}'.format(exception)
            _LOGGER.exception(details)
            return _reject_rpc(rpc_event, cygrpc.StatusCode.unknown,
                               b'Error in service handler!'), None
        if method_handler is None:
            return _reject_rpc(rpc_event, cygrpc.StatusCode.unimplemented,
                               b'Method not found!'), None
        elif concurrency_exceeded:
            return _reject_rpc(rpc_event, cygrpc.StatusCode.resource_exhausted,
                               b'Concurrent RPC limit exceeded!'), None
        else:
            return _handle_with_method_handler(rpc_event, method_handler,
                                               thread_pool)
    else:
        return None, None

這里我們主要關(guān)注 _find_method_handler_handle_with_method_handler

_find_method_handler 負責獲取請求對應的接口方法帜平,例如下面這個 SayHello 方法

class Greeter(helloworld_pb2_grpc.GreeterServicer):

    def SayHello(self, request, context):
        return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)

其實瀑焦,這就是一個字典映射蔫浆,鍵為 接口名,值為 接口方法

def _find_method_handler(rpc_event, generic_handlers, interceptor_pipeline):

    def query_handlers(handler_call_details):
        for generic_handler in generic_handlers:
            method_handler = generic_handler.service(handler_call_details)
            if method_handler is not None:
                return method_handler
        return None

    handler_call_details = _HandlerCallDetails(
        _common.decode(rpc_event.call_details.method),
        rpc_event.invocation_metadata)

    if interceptor_pipeline is not None:
        ...
    else:
        return query_handlers(handler_call_details)

# 上面代碼中的 generic_handler 類型
class DictionaryGenericHandler(grpc.ServiceRpcHandler):

    def __init__(self, service, method_handlers):
        self._name = service
        self._method_handlers = {
            _common.fully_qualified_method(service, method): method_handler
            for method, method_handler in six.iteritems(method_handlers)
        }  # 這是個字典推導

    def service(self, handler_call_details):
        return self._method_handlers.get(handler_call_details.method)

繼續(xù)另一個函數(shù) _handle_with_method_handler

def _handle_with_method_handler(rpc_event, method_handler, thread_pool):
    state = _RPCState()
    with state.condition:
        rpc_event.call.start_server_batch(
            (cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),),
            _receive_close_on_server(state))
        state.due.add(_RECEIVE_CLOSE_ON_SERVER_TOKEN)
        if method_handler.request_streaming:
            if method_handler.response_streaming:
                return state, _handle_stream_stream(rpc_event, state,
                                                    method_handler, thread_pool)
            else:
                return state, _handle_stream_unary(rpc_event, state,
                                                   method_handler, thread_pool)
        else:
            if method_handler.response_streaming:
                return state, _handle_unary_stream(rpc_event, state,
                                                   method_handler, thread_pool)
            else:
                return state, _handle_unary_unary(rpc_event, state,
                                                  method_handler, thread_pool)

這里的四個分支對應了四種rpc調(diào)用 灸眼,詳情可以看 gRPC 官方文檔
我們看最簡單的——請求和響應都一次性發(fā)送的 _handle_unary_unary

def _handle_unary_unary(rpc_event, state, method_handler, default_thread_pool):
    unary_request = _unary_request(rpc_event, state,
                                   method_handler.request_deserializer)  # 請求反序列化卧檐,得到我們定義的請求對象
    thread_pool = _select_thread_pool_for_behavior(method_handler.unary_unary,
                                                   default_thread_pool)
    return thread_pool.submit(_unary_response_in_pool, rpc_event, state,
                              method_handler.unary_unary, unary_request,
                              method_handler.request_deserializer,
                              method_handler.response_serializer)

嗯,把任務提交給了線程池處理
繼續(xù)向下挖

def _unary_response_in_pool(rpc_event, state, behavior, argument_thunk,
                            request_deserializer, response_serializer):
    cygrpc.install_context_from_request_call_event(rpc_event)
    try:
        argument = argument_thunk()
        if argument is not None:
            response, proceed = _call_behavior(rpc_event, state, behavior,
                                               argument, request_deserializer)
            if proceed:
                serialized_response = _serialize_response(
                    rpc_event, state, response, response_serializer)
                if serialized_response is not None:
                    _status(rpc_event, state, serialized_response)  # 發(fā)送響應
    finally:
        cygrpc.uninstall_context()

我們看到了 _call_behavior 函數(shù)焰宣,就是這個函數(shù)調(diào)用了我們寫得接口方法 ;-)

def _call_behavior(rpc_event,
                   state,
                   behavior,
                   argument,
                   request_deserializer,
                   send_response_callback=None):
    from grpc import _create_servicer_context
    with _create_servicer_context(rpc_event, state,
                                  request_deserializer) as context:
        try:
            response_or_iterator = None
            if send_response_callback is not None:
                response_or_iterator = behavior(argument, context,
                                                send_response_callback)
            else:
                response_or_iterator = behavior(argument, context)
            return response_or_iterator, True
        except Exception as exception:  # pylint: disable=broad-except
            ...
            return None, False

里面的 behavior 就是我們的接口方法

對于之前提及的 helloworld 例子來說霉囚, behavior 就是下面這樣(pycharm debug 得到的值)
<bound method Greeter.SayHello of <main.Greeter object at 0x0000024672710108>

還有一個工作要做,那就是發(fā)送響應匕积,這個則是在 _unary_response_in_pool 中調(diào)用 _status 函數(shù)

def _status(rpc_event, state, serialized_response):
    with state.condition:
        if state.client is not _CANCELLED:
            code = _completion_code(state)
            details = _details(state)
            operations = [
                cygrpc.SendStatusFromServerOperation(
                    state.trailing_metadata, code, details, _EMPTY_FLAGS),
            ]
            if state.initial_metadata_allowed:
                operations.append(_get_initial_metadata_operation(state, None))
            if serialized_response is not None:
                operations.append(
                    cygrpc.SendMessageOperation(
                        serialized_response,
                        _get_send_message_op_flags_from_state(state)))
            rpc_event.call.start_server_batch(
                operations,
                _send_status_from_server(state, _SEND_STATUS_FROM_SERVER_TOKEN))
            state.statused = True
            _reset_per_message_state(state)
            state.due.add(_SEND_STATUS_FROM_SERVER_TOKEN)

在這里盈罐,python 只是做了一點微不足道的工作,主要工作是 c++ core 負責的

  • 總結(jié)


    temp.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末闪唆,一起剝皮案震驚了整個濱河市盅粪,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌悄蕾,老刑警劉巖票顾,帶你破解...
    沈念sama閱讀 219,110評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異帆调,居然都是意外死亡奠骄,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,443評論 3 395
  • 文/潘曉璐 我一進店門番刊,熙熙樓的掌柜王于貴愁眉苦臉地迎上來含鳞,“玉大人,你說我怎么就攤上這事芹务〔醣粒” “怎么了?”我有些...
    開封第一講書人閱讀 165,474評論 0 356
  • 文/不壞的土叔 我叫張陵枣抱,是天一觀的道長熔吗。 經(jīng)常有香客問我,道長佳晶,這世上最難降的妖魔是什么磁滚? 我笑而不...
    開封第一講書人閱讀 58,881評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮,結(jié)果婚禮上垂攘,老公的妹妹穿的比我還像新娘。我一直安慰自己淤刃,他們只是感情好晒他,可當我...
    茶點故事閱讀 67,902評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著逸贾,像睡著了一般陨仅。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上铝侵,一...
    開封第一講書人閱讀 51,698評論 1 305
  • 那天灼伤,我揣著相機與錄音,去河邊找鬼咪鲜。 笑死狐赡,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的疟丙。 我是一名探鬼主播颖侄,決...
    沈念sama閱讀 40,418評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼享郊!你這毒婦竟也來了览祖?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,332評論 0 276
  • 序言:老撾萬榮一對情侶失蹤炊琉,失蹤者是張志新(化名)和其女友劉穎展蒂,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體苔咪,經(jīng)...
    沈念sama閱讀 45,796評論 1 316
  • 正文 獨居荒郊野嶺守林人離奇死亡锰悼,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,968評論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了悼泌。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片松捉。...
    茶點故事閱讀 40,110評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖馆里,靈堂內(nèi)的尸體忽然破棺而出隘世,到底是詐尸還是另有隱情,我是刑警寧澤鸠踪,帶...
    沈念sama閱讀 35,792評論 5 346
  • 正文 年R本政府宣布丙者,位于F島的核電站,受9級特大地震影響营密,放射性物質(zhì)發(fā)生泄漏械媒。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,455評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望纷捞。 院中可真熱鬧痢虹,春花似錦、人聲如沸主儡。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,003評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽岔冀。三九已至忠烛,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間寂汇,已是汗流浹背病往。 一陣腳步聲響...
    開封第一講書人閱讀 33,130評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留骄瓣,地道東北人停巷。 一個月前我還...
    沈念sama閱讀 48,348評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像累贤,于是被迫代替她去往敵國和親叠穆。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,047評論 2 355

推薦閱讀更多精彩內(nèi)容

  • Lua 5.1 參考手冊 by Roberto Ierusalimschy, Luiz Henrique de F...
    蘇黎九歌閱讀 13,810評論 0 38
  • (目前有點亂臼膏,先貼上來硼被,等以后有時間在整理吧。這個問題一直想拿出來分享渗磅,還有兩個博客嚷硫,都是相關(guān)的,一點點發(fā)出來) ...
    kamiSDY閱讀 4,379評論 0 2
  • 求佛佛不應始鱼,拈花花不開仔掸,抽刀斷水流。 問佛信什么医清?佛口微微笑起暮,吾求自身佛。 為人身正念会烙,何須求他佛负懦,心正吾即佛。
    俗人安安閱讀 560評論 0 7
  • 本來想用個一個標題黨"春季必不可少的一道菜"柏腻,勾引讀者進來的纸厉,可這樣做又覺得有點不要臉。還是算了五嫂。我的生活感悟我記...
    微光綺夢閱讀 787評論 8 5
  • 我們希望自己再瘦一點则吟、腿再長一點、腰再細一點锄蹂,而這些其實一條對的連衣裙可以幫你做到逾滥,不信?那就別廢話败匹,往下看! 全...
    拍范閱讀 172評論 0 1