grpc python 源碼分為三部分:python——cython——c++,本系列文章分析的是 python 部分代碼,其它部分不涉及(其實是我看不懂)
版本:1.24.3
- 接受請求
首先來看 上期文章 中提到的接受請求線程
def _serve(state):
    """Run the server event loop until an event handler asks it to stop.

    Each iteration polls ``state.completion_queue`` with a deadline of
    ``_DEALLOCATED_SERVER_CHECK_PERIOD_S`` seconds from now, calls
    ``_begin_shutdown_once`` if ``state.server_deallocated`` is set, and
    hands every event that is not a queue timeout to
    ``_process_event_and_continue``.

    Args:
        state: Server state object; ``completion_queue`` and
            ``server_deallocated`` are read here.

    Returns:
        None, as soon as ``_process_event_and_continue`` returns a falsy
        value.
    """
    while True:
        # Absolute deadline for this poll, not a relative duration.
        timeout = time.time() + _DEALLOCATED_SERVER_CHECK_PERIOD_S
        event = state.completion_queue.poll(timeout)
        if state.server_deallocated:
            _begin_shutdown_once(state)
        # Timeouts carry no work; they only give the check above a chance
        # to run periodically.
        if event.completion_type != cygrpc.CompletionType.queue_timeout:
            if not _process_event_and_continue(state, event):
                return
        # We want to force the deletion of the previous event
        # ~before~ we poll again; if the event has a reference
        # to a shutdown Call object, this can induce spinlock.
        event = None
負責處理請求的是 _process_event_and_continue
函數,在之前做了一些超時的判斷,決定是否處理這個請求。
下面是 _process_event_and_continue
函數的代碼
def _process_event_and_continue(state, event):
    """Dispatch one completion-queue event and report whether to keep serving.

    Only the ``_REQUEST_CALL_TAG`` branch is shown in full; the other two
    branches are elided (``...``) in this excerpt.

    Args:
        state: Server state object. The request-call branch reads and
            updates ``due``, ``rpc_states`` and ``active_rpc_count`` while
            holding ``state.lock``.
        event: The polled event; its ``tag`` selects the branch.

    Returns:
        True to keep polling. False when ``_stop_serving(state)`` returned a
        truthy value in the request-call branch.
    """
    should_continue = True
    if event.tag is _SHUTDOWN_TAG:
        ...  # elided in this excerpt
    elif event.tag is _REQUEST_CALL_TAG:
        with state.lock:
            state.due.remove(_REQUEST_CALL_TAG)
            # A limit of None means "no limit".
            concurrency_exceeded = (
                state.maximum_concurrent_rpcs is not None and
                state.active_rpc_count >= state.maximum_concurrent_rpcs)
            # This call is the part the walkthrough focuses on.
            rpc_state, rpc_future = _handle_call(
                event, state.generic_handlers, state.interceptor_pipeline,
                state.thread_pool, concurrency_exceeded)
            if rpc_state is not None:
                state.rpc_states.add(rpc_state)
            if rpc_future is not None:
                state.active_rpc_count += 1
                rpc_future.add_done_callback(
                    lambda unused_future: _on_call_completed(state))
            if state.stage is _ServerStage.STARTED:
                # Still serving: ask for the next incoming call.
                _request_call(state)
            elif _stop_serving(state):
                should_continue = False
    else:
        ...  # elided in this excerpt
    return should_continue
繼續往下走
def _handle_call(rpc_event, generic_handlers, interceptor_pipeline, thread_pool,
                 concurrency_exceeded):
    """Resolve the handler for an incoming call and start or reject the RPC.

    Args:
        rpc_event: The request-call event; ``success`` and
            ``call_details.method`` are read here.
        generic_handlers: Passed through to ``_find_method_handler``.
        interceptor_pipeline: Passed through to ``_find_method_handler``.
        thread_pool: Passed through to ``_handle_with_method_handler``.
        concurrency_exceeded: When truthy, a call whose handler was found is
            rejected with ``resource_exhausted`` instead of being run.

    Returns:
        A 2-tuple. ``(None, None)`` when the event was unsuccessful or
        carries no method. ``(<result of _reject_rpc>, None)`` when the
        lookup raised, found nothing, or the concurrency limit was hit.
        Otherwise whatever ``_handle_with_method_handler`` returns.
    """
    if not rpc_event.success:
        return None, None
    if rpc_event.call_details.method is not None:
        try:
            method_handler = _find_method_handler(rpc_event, generic_handlers,
                                                  interceptor_pipeline)
        except Exception as exception:  # pylint: disable=broad-except
            # A failing lookup must not take down the serving loop; log it
            # and reject this one call.
            details = 'Exception servicing handler: {}'.format(exception)
            _LOGGER.exception(details)
            return _reject_rpc(rpc_event, cygrpc.StatusCode.unknown,
                               b'Error in service handler!'), None
        if method_handler is None:
            return _reject_rpc(rpc_event, cygrpc.StatusCode.unimplemented,
                               b'Method not found!'), None
        elif concurrency_exceeded:
            return _reject_rpc(rpc_event, cygrpc.StatusCode.resource_exhausted,
                               b'Concurrent RPC limit exceeded!'), None
        else:
            return _handle_with_method_handler(rpc_event, method_handler,
                                               thread_pool)
    else:
        return None, None
這裡我們主要關注 _find_method_handler
和 _handle_with_method_handler
_find_method_handler
負責獲取請求對應的接口方法,例如下面這個 SayHello
方法
class Greeter(helloworld_pb2_grpc.GreeterServicer):
    """Example servicer whose ``SayHello`` method is the looked-up handler."""

    def SayHello(self, request, context):
        """Return a ``HelloReply`` greeting ``request.name``.

        ``context`` is accepted but not used.
        """
        return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)
其實,這就是一個字典映射,鍵為 接口名,值為 接口方法
def _find_method_handler(rpc_event, generic_handlers, interceptor_pipeline):
    """Look up the method handler that should serve ``rpc_event``.

    Args:
        rpc_event: The request-call event; ``call_details.method`` and
            ``invocation_metadata`` are read here.
        generic_handlers: Iterable of objects with a
            ``service(handler_call_details)`` method.
        interceptor_pipeline: When not None, the interceptor branch is taken;
            that branch is elided (``...``) in this excerpt.

    Returns:
        With no interceptor pipeline: the first non-None result of
        ``generic_handler.service(...)``, or None if every handler returned
        None.
    """

    def query_handlers(handler_call_details):
        # First handler that recognises the method wins.
        for generic_handler in generic_handlers:
            method_handler = generic_handler.service(handler_call_details)
            if method_handler is not None:
                return method_handler
        return None

    handler_call_details = _HandlerCallDetails(
        _common.decode(rpc_event.call_details.method),
        rpc_event.invocation_metadata)

    if interceptor_pipeline is not None:
        ...  # elided in this excerpt
    else:
        return query_handlers(handler_call_details)
# 上面代碼中的 generic_handler 類型
class DictionaryGenericHandler(grpc.ServiceRpcHandler):
    """Generic handler that resolves methods with a plain dict lookup."""

    def __init__(self, service, method_handlers):
        """Build the lookup table.

        Args:
            service: Service name; stored as ``self._name`` and combined with
                each method name to form the table key.
            method_handlers: Mapping of method name to method handler.
        """
        self._name = service
        # Dict comprehension: keys come from
        # _common.fully_qualified_method(service, method).
        self._method_handlers = {
            _common.fully_qualified_method(service, method): method_handler
            for method, method_handler in six.iteritems(method_handlers)
        }

    def service(self, handler_call_details):
        """Return the handler stored under ``handler_call_details.method``.

        Returns None when the method is not in the table.
        """
        return self._method_handlers.get(handler_call_details.method)
繼續另一個函數 _handle_with_method_handler
def _handle_with_method_handler(rpc_event, method_handler, thread_pool):
    """Create per-RPC state and dispatch to the matching ``_handle_*`` helper.

    The helper is chosen from the handler's ``request_streaming`` and
    ``response_streaming`` flags, giving four combinations.

    Args:
        rpc_event: The request-call event; its ``call`` is used to start the
            receive-close batch.
        method_handler: The resolved handler.
        thread_pool: Passed through to the selected ``_handle_*`` helper.

    Returns:
        A 2-tuple ``(state, result)``: the new ``_RPCState`` and whatever the
        selected ``_handle_*`` helper returned.
    """
    state = _RPCState()
    with state.condition:
        # Start the receive-close operation first and record it as due.
        rpc_event.call.start_server_batch(
            (cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),),
            _receive_close_on_server(state))
        state.due.add(_RECEIVE_CLOSE_ON_SERVER_TOKEN)
        if method_handler.request_streaming:
            if method_handler.response_streaming:
                return state, _handle_stream_stream(rpc_event, state,
                                                    method_handler, thread_pool)
            else:
                return state, _handle_stream_unary(rpc_event, state,
                                                   method_handler, thread_pool)
        else:
            if method_handler.response_streaming:
                return state, _handle_unary_stream(rpc_event, state,
                                                   method_handler, thread_pool)
            else:
                return state, _handle_unary_unary(rpc_event, state,
                                                  method_handler, thread_pool)
這裡的四個分支對應了四種 rpc 調用,詳情可以看 gRPC 官方文檔
我們看最簡單的——請求和響應都一次性發送的 _handle_unary_unary
def _handle_unary_unary(rpc_event, state, method_handler, default_thread_pool):
    """Submit a unary-request / unary-response RPC to a thread pool.

    Args:
        rpc_event: The request-call event.
        state: Per-RPC state object.
        method_handler: Supplies ``unary_unary``, ``request_deserializer``
            and ``response_serializer``.
        default_thread_pool: Passed to ``_select_thread_pool_for_behavior``,
            which returns the pool actually used.

    Returns:
        Whatever ``thread_pool.submit(...)`` returns for the scheduled
        ``_unary_response_in_pool`` call.
    """
    # Per the original article's note: this deserializes the request into
    # the user-defined request object. The result is passed on as
    # `argument_thunk` and invoked inside the pool.
    unary_request = _unary_request(rpc_event, state,
                                   method_handler.request_deserializer)
    thread_pool = _select_thread_pool_for_behavior(method_handler.unary_unary,
                                                   default_thread_pool)
    return thread_pool.submit(_unary_response_in_pool, rpc_event, state,
                              method_handler.unary_unary, unary_request,
                              method_handler.request_deserializer,
                              method_handler.response_serializer)
嗯,把任務提交給了線程池處理
繼續向下挖
def _unary_response_in_pool(rpc_event, state, behavior, argument_thunk,
                            request_deserializer, response_serializer):
    """Run a unary-response handler and send its result.

    Each step runs only if the previous one produced something: the argument
    must be non-None, ``_call_behavior`` must report ``proceed``, and the
    serialized response must be non-None.

    Args:
        rpc_event: The request-call event.
        state: Per-RPC state object.
        behavior: The callable passed to ``_call_behavior``.
        argument_thunk: Zero-argument callable that yields the request
            argument, or None.
        request_deserializer: Passed through to ``_call_behavior``.
        response_serializer: Passed through to ``_serialize_response``.

    Returns:
        None.
    """
    cygrpc.install_context_from_request_call_event(rpc_event)
    try:
        argument = argument_thunk()
        if argument is not None:
            response, proceed = _call_behavior(rpc_event, state, behavior,
                                               argument, request_deserializer)
            if proceed:
                serialized_response = _serialize_response(
                    rpc_event, state, response, response_serializer)
                if serialized_response is not None:
                    # Send the response.
                    _status(rpc_event, state, serialized_response)
    finally:
        # Always undo the install above, even if the handler raised.
        cygrpc.uninstall_context()
我們看到了 _call_behavior
函數,就是這個函數調用了我們寫的接口方法 ;-)
def _call_behavior(rpc_event,
                   state,
                   behavior,
                   argument,
                   request_deserializer,
                   send_response_callback=None):
    """Invoke ``behavior`` inside a servicer context.

    Args:
        rpc_event: The request-call event.
        state: Per-RPC state object.
        behavior: The callable to invoke. It is called as
            ``behavior(argument, context)``, or with
            ``send_response_callback`` as a third argument when that is
            given.
        argument: The request argument passed to ``behavior``.
        request_deserializer: Passed to ``_create_servicer_context``.
        send_response_callback: Optional extra argument for ``behavior``.

    Returns:
        ``(result, True)`` when ``behavior`` returned normally, and
        ``(None, False)`` when it raised. The exception handling itself is
        elided (``...``) in this excerpt.
    """
    # Function-scope import, kept exactly as in the original block.
    from grpc import _create_servicer_context
    with _create_servicer_context(rpc_event, state,
                                  request_deserializer) as context:
        try:
            response_or_iterator = None
            if send_response_callback is not None:
                response_or_iterator = behavior(argument, context,
                                                send_response_callback)
            else:
                response_or_iterator = behavior(argument, context)
            return response_or_iterator, True
        except Exception as exception:  # pylint: disable=broad-except
            ...  # elided in this excerpt
            return None, False
里面的 behavior
就是我們的接口方法
對於之前提及的 helloworld 例子來說,
behavior
就是下面這樣(pycharm debug 得到的值)
<bound method Greeter.SayHello of <__main__.Greeter object at 0x0000024672710108>>
還有一個工作要做,那就是發送響應,這個則是在 _unary_response_in_pool
中調(diào)用 _status
函數
def _status(rpc_event, state, serialized_response):
    """Send the final status, and the response if any, in one server batch.

    Nothing happens when ``state.client`` is ``_CANCELLED``. Otherwise the
    batch always contains a send-status operation, plus an initial-metadata
    operation when ``state.initial_metadata_allowed`` is set, plus a
    send-message operation when ``serialized_response`` is not None.

    Args:
        rpc_event: The request-call event; its ``call`` starts the batch.
        state: Per-RPC state object; all work is done while holding
            ``state.condition``.
        serialized_response: The serialized response message, or None to
            send the status without a message.

    Returns:
        None.
    """
    with state.condition:
        if state.client is not _CANCELLED:
            code = _completion_code(state)
            details = _details(state)
            operations = [
                cygrpc.SendStatusFromServerOperation(
                    state.trailing_metadata, code, details, _EMPTY_FLAGS),
            ]
            if state.initial_metadata_allowed:
                operations.append(_get_initial_metadata_operation(state, None))
            if serialized_response is not None:
                operations.append(
                    cygrpc.SendMessageOperation(
                        serialized_response,
                        _get_send_message_op_flags_from_state(state)))
            rpc_event.call.start_server_batch(
                operations,
                _send_status_from_server(state, _SEND_STATUS_FROM_SERVER_TOKEN))
            # Record that the status went out and that its completion is
            # still due.
            state.statused = True
            _reset_per_message_state(state)
            state.due.add(_SEND_STATUS_FROM_SERVER_TOKEN)
在這裡,python 只是做了一點微不足道的工作,主要工作是 c++ core
負責的
-
總結
temp.png