基于GraphQl-JAVA-TOOLS 5.5.2
把GraphQL的完整流程分為三個部分:schema解析、GraphQLSchema裝載和執行query(這里用query代指query、mutation、subscription)。其中的schema解析和GraphQLSchema裝載屬于啟動階段,執行query屬于運行階段。
一、schema解析
我們在構建 GraphQLSchema 時會通過 SchemaParserBuilder 去build一個 SchemaParser :
/**
 * Creates the [SchemaParser] from the result of [scan], the configured options
 * and the runtime wiring produced by the wiring builder.
 */
fun build(): SchemaParser {
    return SchemaParser(scan(), options, runtimeWiringBuilder.build())
}
這里的 scan() 方法最終會去執行 parseDocuments() 。GraphQL的schema解析使用了antlr,這是一個語法分析工具,它先根據預先定義的規則進行詞法分析,把傳入的文本解析成一個個token,然后將這些token構建成一棵樹。
/**
 * Parses GraphQL text into a {@code Document}.
 *
 * @param input      the GraphQL text to parse
 * @param sourceName name passed along to the character stream; when {@code null}
 *                   the stream is created from the text alone
 * @return the {@code Document} created from the parsed document context
 * @throws ParseCancellationException if the token stream still holds a non-EOF token,
 *                                    on the same channel, after the last token of the parsed document
 */
public Document parseDocument(String input, String sourceName) {
    CharStream source = (sourceName == null)
            ? CharStreams.fromString(input)
            : CharStreams.fromString(input, sourceName);

    // Lexer -> token stream -> parser.
    GraphqlLexer lexer = new GraphqlLexer(source);
    CommonTokenStream tokens = new CommonTokenStream(lexer);
    GraphqlParser parser = new GraphqlParser(tokens);

    // Parser configuration: no registered error listeners, SLL prediction mode,
    // and BailErrorStrategy as the error handler.
    parser.removeErrorListeners();
    parser.getInterpreter().setPredictionMode(PredictionMode.SLL);
    parser.setErrorHandler(new BailErrorStrategy());

    // Run the "document" rule, then convert the resulting context into a Document.
    GraphqlParser.DocumentContext documentContext = parser.document();
    Document doc = new GraphqlAntlrToLanguage(tokens).createDocument(documentContext);

    // Check whether the parse stopped before the end of the token stream.
    Token stop = documentContext.getStop();
    List<Token> allTokens = tokens.getTokens();
    if (stop == null || allTokens == null || allTokens.isEmpty()) {
        return doc;
    }

    // More tokens in the stream than the parse consumed means the input is invalid;
    // only tokens on the same channel as the stop token count.
    Token last = allTokens.get(allTokens.size() - 1);
    boolean unconsumedTokens = last.getType() != Token.EOF
            && last.getTokenIndex() > stop.getTokenIndex()
            && last.getChannel() == stop.getChannel();
    if (unconsumedTokens) {
        throw new ParseCancellationException("There are more tokens in the query that have not been consumed");
    }
    return doc;
}
其中詞法分析的結果tokens是這樣的:
最終得到的分析樹是這樣的:
其中 FieldDefinition 就是schema解析后的內存模型。
二、GraphQLSchema裝載
上一步完成了schema的解析,第二步是把分析樹按照GraphQL的規則進行組裝。上一篇《GraphQL(五):GraphQL身份認證》中已經介紹過 GraphQLObjectType 的組裝,在 SchemaParser 中還有其他類型(GraphQLInputObjectType、GraphQLEnumType、GraphQLInterfaceType、GraphQLUnionType等)的組裝,組裝邏輯的入口在 parseSchemaObjects():
/**
 * Creates the GraphQL objects for every kind of definition, wires the type
 * resolvers of interfaces and unions, and looks up the root operation types.
 *
 * @return a [SchemaObjects] holding the query, the optional mutation and
 * subscription objects, and the set of all created types
 * @throws SchemaError if the query object is missing, or if a mutation or
 * subscription object is required by the root info but missing
 */
fun parseSchemaObjects(): SchemaObjects {
    // Interfaces come first because object creation receives them; unions receive the objects.
    val interfaces = interfaceDefinitions.map { definition -> createInterfaceObject(definition) }
    val objects = objectDefinitions.map { definition -> createObject(definition, interfaces) }
    val unions = unionDefinitions.map { definition -> createUnionObject(definition, objects) }
    val inputObjects = inputObjectDefinitions.map { definition -> createInputObject(definition) }
    val enums = enumDefinitions.map { definition -> createEnumObject(definition) }

    // The resolver proxies can only be filled in now that all object types exist.
    for (interfaceType in interfaces) {
        val proxy = interfaceType.typeResolver as TypeResolverProxy
        proxy.typeResolver = InterfaceTypeResolver(dictionary.inverse(), interfaceType, objects)
    }
    for (unionType in unions) {
        val proxy = unionType.typeResolver as TypeResolverProxy
        proxy.typeResolver = UnionTypeResolver(dictionary.inverse(), unionType, objects)
    }

    val queryName = rootInfo.getQueryName()
    val mutationName = rootInfo.getMutationName()
    val subscriptionName = rootInfo.getSubscriptionName()

    // The query object is mandatory.
    val query = objects.firstOrNull { it.name == queryName }
    if (query == null) {
        throw SchemaError("Expected a Query object with name '$queryName' but found none!")
    }

    // Mutation and subscription may be absent unless the root info requires them.
    val mutation = objects.firstOrNull { it.name == mutationName }
    if (mutation == null && rootInfo.isMutationRequired()) {
        throw SchemaError("Expected a Mutation object with name '$mutationName' but found none!")
    }

    val subscription = objects.firstOrNull { it.name == subscriptionName }
    if (subscription == null && rootInfo.isSubscriptionRequired()) {
        throw SchemaError("Expected a Subscription object with name '$subscriptionName' but found none!")
    }

    val allTypes = (objects + inputObjects + enums + interfaces + unions).toSet()
    return SchemaObjects(query, mutation, subscription, allTypes)
}
其中的 SchemaObjects 就是 GraphQLSchema 構造器的參數,不同類型的組裝邏輯大同小異,這里就不啰嗦了。
啟動過程簡單圖例:
三、執行query
query的執行是在運行時,我們通過統一的入口 GraphQL 執行客戶端傳過來的query,GraphQL提供了兩個類 ExecutionInput 和 ExecutionResult 用于包裝輸入和輸出,GraphQL在執行查詢時通過 CompletableFuture 來實現異步,其執行的核心代碼如下:
/**
 * Executes the given input asynchronously, calling the instrumentation hooks
 * around the actual parse/validate/execute step.
 *
 * @param executionInput the input to execute
 * @return a future of the execution result; if an {@code AbortExecutionException}
 *         is thrown while setting up the execution, an already-completed future
 *         carrying that exception's result
 */
public CompletableFuture<ExecutionResult> executeAsync(ExecutionInput executionInput) {
    try {
        // One state object is created up front and handed to every instrumentation call below.
        InstrumentationState state = instrumentation.createState(
                new InstrumentationCreateStateParameters(this.graphQLSchema, executionInput));

        // Give the instrumentation a chance to replace the execution input.
        InstrumentationExecutionParameters rawInputParameters =
                new InstrumentationExecutionParameters(executionInput, this.graphQLSchema, state);
        ExecutionInput instrumentedInput =
                instrumentation.instrumentExecutionInput(executionInput, rawInputParameters);

        // Signal the start of the execution, using the (possibly replaced) input.
        InstrumentationExecutionParameters executionParameters =
                new InstrumentationExecutionParameters(instrumentedInput, this.graphQLSchema, state);
        InstrumentationContext<ExecutionResult> executionHooks =
                instrumentation.beginExecution(executionParameters);

        // Give the instrumentation a chance to replace the schema.
        GraphQLSchema instrumentedSchema =
                instrumentation.instrumentSchema(this.graphQLSchema, executionParameters);

        // Parse, validate and execute; then notify the execution hook and let the
        // instrumentation post-process the result.
        return parseValidateAndExecute(instrumentedInput, instrumentedSchema, state)
                .whenComplete(executionHooks::onCompleted)
                .thenCompose(result -> instrumentation.instrumentExecutionResult(result, executionParameters));
    } catch (AbortExecutionException abortException) {
        return CompletableFuture.completedFuture(abortException.toExecutionResult());
    }
}
我們跳過驗證部分,直接看最終的執行,執行最終在 Execution 類中完成:
/**
 * Selects the execution strategy matching the kind of operation and runs it.
 *
 * This is an excerpt: each {@code // ...} marks code left out of the listing.
 * {@code operation} and {@code parameters} are not defined in the visible lines,
 * so they must come from the omitted parts.
 *
 * @return whatever {@code deferSupport(executionContext, result)} returns
 */
private CompletableFuture<ExecutionResult> executeOperation(ExecutionContext executionContext, InstrumentationExecutionParameters instrumentationExecutionParameters, Object root, OperationDefinition operationDefinition) {
// ...
CompletableFuture<ExecutionResult> result;
try {
// Mutations and subscriptions each have their own strategy; anything else uses the query strategy.
ExecutionStrategy executionStrategy;
if (operation == OperationDefinition.Operation.MUTATION) {
executionStrategy = mutationStrategy;
} else if (operation == SUBSCRIPTION) {
executionStrategy = subscriptionStrategy;
} else {
executionStrategy = queryStrategy;
}
log.debug("Executing '{}' query operation: '{}' using '{}' execution strategy", executionContext.getExecutionId(), operation, executionStrategy.getClass().getName());
// The chosen strategy does the actual work.
result = executionStrategy.execute(executionContext, parameters);
} catch (NonNullableFieldWasNullException e) {
// ... (handling omitted from this excerpt)
}
// ...
return deferSupport(executionContext, result);
}
省略掉若干代碼后我們關注 executionStrategy.execute(executionContext, parameters); 這句,這里有一個執行策略,這是我們在開發時可能會用到的。除了已廢棄的幾種執行策略,當前版本提供了兩種執行策略: AsyncExecutionStrategy 和 AsyncSerialExecutionStrategy。前者是并行執行屬性值獲取,后者是串行執行屬性值獲取。
AsyncExecutionStrategy 關鍵代碼:
/**
 * Starts the resolution of every non-deferred field up front and completes the
 * returned future once the combined results have been handled.
 *
 * This is an excerpt: {@code // ...} marks omitted code; {@code executionStrategyCtx}
 * is not defined in the visible lines and must come from the omitted part.
 *
 * @return {@code overallResult}, which is completed by the result handler or,
 *         if combining/handling fails, completed exceptionally
 */
public CompletableFuture<ExecutionResult> execute(ExecutionContext executionContext, ExecutionStrategyParameters parameters) throws NonNullableFieldWasNullException {
// ...
Map<String, List<Field>> fields = parameters.getFields();
List<String> fieldNames = new ArrayList<>(fields.keySet());
List<CompletableFuture<FieldValueInfo>> futures = new ArrayList<>();
List<String> resolvedFields = new ArrayList<>();
// Kick off one future per field without waiting for the previous one to complete.
for (String fieldName : fieldNames) {
List<Field> currentField = fields.get(fieldName);
ExecutionPath fieldPath = parameters.getPath().segment(mkNameForPath(currentField));
ExecutionStrategyParameters newParameters = parameters
.transform(builder -> builder.field(currentField).path(fieldPath).parent(parameters));
// Deferred fields are reported to the strategy context and skipped here.
if (isDeferred(executionContext, newParameters, currentField)) {
executionStrategyCtx.onDeferredField(currentField);
continue;
}
resolvedFields.add(fieldName);
CompletableFuture<FieldValueInfo> future = resolveFieldWithInfo(executionContext, newParameters);
futures.add(future);
}
CompletableFuture<ExecutionResult> overallResult = new CompletableFuture<>();
executionStrategyCtx.onDispatched(overallResult);
// Combine the already-started field futures (the fields are resolved in parallel).
Async.each(futures).whenComplete((completeValueInfos, throwable) -> {
BiConsumer<List<ExecutionResult>, Throwable> handleResultsConsumer = handleResults(executionContext, resolvedFields, overallResult);
// On failure, hand the cause to the result handler and stop.
if (throwable != null) {
handleResultsConsumer.accept(null, throwable.getCause());
return;
}
// Second stage: combine the field-value futures carried by each FieldValueInfo.
List<CompletableFuture<ExecutionResult>> executionResultFuture = completeValueInfos.stream().map(FieldValueInfo::getFieldValue).collect(Collectors.toList());
executionStrategyCtx.onFieldValuesInfo(completeValueInfos);
Async.each(executionResultFuture).whenComplete(handleResultsConsumer);
}).exceptionally((ex) -> {
// if there are any issues with combining/handling the field results,
// complete the future at all costs and bubble up any thrown exception so
// the execution does not hang.
overallResult.completeExceptionally(ex);
return null;
});
overallResult.whenComplete(executionStrategyCtx::onCompleted);
return overallResult;
}
AsyncSerialExecutionStrategy 關鍵代碼:
/**
 * Resolves the fields through {@code Async.eachSequentially} and completes the
 * returned future once the collected results have been handled.
 *
 * This is an excerpt: {@code fields}, {@code fieldNames} and {@code executionStrategyCtx}
 * are not defined in the visible lines and must come from code omitted from the listing.
 *
 * @return {@code overallResult}, completed by the handler attached to the sequential results future
 */
public CompletableFuture<ExecutionResult> execute(ExecutionContext executionContext, ExecutionStrategyParameters parameters) throws NonNullableFieldWasNullException {
// The callback builds the per-field parameters and returns the future that resolves that field.
CompletableFuture<List<ExecutionResult>> resultsFuture = Async.eachSequentially(fieldNames, (fieldName, index, prevResults) -> {
List<Field> currentField = fields.get(fieldName);
ExecutionPath fieldPath = parameters.getPath().segment(mkNameForPath(currentField));
ExecutionStrategyParameters newParameters = parameters
.transform(builder -> builder.field(currentField).path(fieldPath));
return resolveField(executionContext, newParameters);
});
CompletableFuture<ExecutionResult> overallResult = new CompletableFuture<>();
executionStrategyCtx.onDispatched(overallResult);
// Once the list of field results is available, turn it into the overall result.
resultsFuture.whenComplete(handleResults(executionContext, fieldNames, overallResult));
overallResult.whenComplete(executionStrategyCtx::onCompleted);
return overallResult;
}
/**
 * eachSequentially ends up calling this method, which fetches the values one
 * after another: the future for the next element is only created after the
 * current one has completed successfully.
 *
 * @param iterator      remaining elements to process
 * @param cfFactory     creates the future for an element, given its index and the results so far
 * @param index         index passed to the factory for the next element
 * @param tmpResult     accumulates the results of the completed futures, in order
 * @param overallResult completed with {@code tmpResult} when the iterator is exhausted,
 *                      or exceptionally as soon as one future fails
 */
private static <T, U> void eachSequentiallyImpl(Iterator<T> iterator, CFFactory<T, U> cfFactory, int index, List<U> tmpResult, CompletableFuture<List<U>> overallResult) {
    if (iterator.hasNext()) {
        CompletableFuture<U> step;
        try {
            step = cfFactory.apply(iterator.next(), index, tmpResult);
            Assert.assertNotNull(step, "cfFactory must return a non null value");
        } catch (Exception e) {
            // A failing factory is turned into an already-failed future so that
            // it goes through the same completion callback below.
            step = new CompletableFuture<>();
            step.completeExceptionally(new CompletionException(e));
        }
        step.whenComplete((value, failure) -> {
            if (failure == null) {
                // Only move on to the next element once this one has completed.
                tmpResult.add(value);
                eachSequentiallyImpl(iterator, cfFactory, index + 1, tmpResult, overallResult);
            } else {
                overallResult.completeExceptionally(failure);
            }
        });
    } else {
        // Nothing left to process: publish the accumulated results.
        overallResult.complete(tmpResult);
    }
}