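When running an aggregation that contains a merge stage through MongoTemplate, I found that it returned the result of a full query on the into collection. In practice, merge is usually used to aggregate and insert, so that query result is not needed.
To drop the returned result, add the skipOutput option to the Aggregation, as follows: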
@Test
void mergeTest() throws InterruptedException {
    // Add skipOutput so that the merge target collection is not read back
    Aggregation aggregation = Aggregation.newAggregation(
            Aggregation.match(Criteria.where("a").is("b")),
            Aggregation.merge().intoCollection("newCollection").on("_id").build()
    ).withOptions(AggregationOptions.builder().skipOutput().build());
    // Both of the following work (reactive and blocking template)
    reactiveMongoTemplate.aggregate(aggregation, "collection", Document.class)
            .subscribe(document -> System.out.println(document));
    List<Document> mappedResults = mongoTemplate.aggregate(aggregation, "collection", Document.class)
            .getMappedResults();
}
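When you work with the database directly, a merge stage in an aggregation does not return any query results. The into collection's contents come back here because the code path behind the template adds a find operation after an aggregation that ends in merge or out, so two requests are sent.
The find is added in AggregateIterableImpl and AsyncAggregateIterableImpl respectively (both classes belong to the MongoDB Java driver that the template delegates to):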
class AggregateIterableImpl<TDocument, TResult> extends MongoIterableImpl<TResult> implements AggregateIterable<TResult> {
    public ReadOperation<BatchCursor<TResult>> asReadOperation() {
        MongoNamespace outNamespace = getOutNamespace();
        // Check whether the pipeline contains merge or out
        if (outNamespace != null) {
            getExecutor().execute(operations.aggregateToCollection(pipeline, maxTimeMS, allowDiskUse, bypassDocumentValidation, collation,
                    hint, comment, aggregationLevel), getReadConcern(), getClientSession());
            FindOptions findOptions = new FindOptions().collation(collation);
            Integer batchSize = getBatchSize();
            if (batchSize != null) {
                findOptions.batchSize(batchSize);
            }
            // The find on the target collection is added here
            return operations.find(outNamespace, new BsonDocument(), resultClass, findOptions);
        } else {
            return operations.aggregate(pipeline, resultClass, maxTimeMS, maxAwaitTimeMS, getBatchSize(), collation,
                    hint, comment, allowDiskUse, aggregationLevel);
        }
    }
}
class AsyncAggregateIterableImpl<TDocument, TResult> extends AsyncMongoIterableImpl<TResult> implements AsyncAggregateIterable<TResult> {
    AsyncReadOperation<AsyncBatchCursor<TResult>> asAsyncReadOperation() {
        MongoNamespace outNamespace = getOutNamespace();
        // Check whether the pipeline contains merge or out
        if (outNamespace != null) {
            AsyncWriteOperation<Void> aggregateToCollectionOperation =
                    operations.aggregateToCollection(pipeline, maxTimeMS, allowDiskUse, bypassDocumentValidation, collation, hint, comment,
                            aggregationLevel);
            FindOptions findOptions = new FindOptions().collation(collation);
            Integer batchSize = getBatchSize();
            if (batchSize != null) {
                findOptions.batchSize(batchSize);
            }
            AsyncReadOperation<AsyncBatchCursor<TResult>> findOperation =
                    operations.find(outNamespace, new BsonDocument(), resultClass, findOptions);
            // The find on the target collection is chained after the write here
            return new WriteOperationThenCursorReadOperation<TResult>(aggregateToCollectionOperation, findOperation);
        } else {
            return operations.aggregate(pipeline, resultClass, maxTimeMS, maxAwaitTimeMS, getBatchSize(), collation,
                    hint, comment, allowDiskUse, aggregationLevel);
        }
    }
}
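
The branch in both classes comes down to the same decision, which the toy model below tries to make explicit. It is only a sketch and not driver code: the class OutStagePlanDemo, its methods and the request labels are hypothetical names invented for illustration, stages are plain Maps instead of BSON, and only the simple string forms of the out and merge targets are handled. It also assumes that skipOutput makes the template run the write alone, without the follow-up find.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model (hypothetical, not driver code) of the out/merge branch shown above. */
final class OutStagePlanDemo {

    /** Returns the target collection if the last stage is $out or $merge, otherwise null. */
    static String outCollection(List<Map<String, Object>> pipeline) {
        if (pipeline.isEmpty()) {
            return null;
        }
        Map<String, Object> last = pipeline.get(pipeline.size() - 1);
        Object out = last.get("$out");
        if (out instanceof String) {
            return (String) out;
        }
        Object merge = last.get("$merge");
        if (merge instanceof String) {
            return (String) merge;
        }
        if (merge instanceof Map) {
            // Simplification: only {"into": "<collection>"} is recognised here
            Object into = ((Map<?, ?>) merge).get("into");
            return into instanceof String ? (String) into : null;
        }
        return null;
    }

    /** Lists the requests this model would send for the given pipeline. */
    static List<String> plan(List<Map<String, Object>> pipeline, boolean skipOutput) {
        List<String> requests = new ArrayList<>();
        String target = outCollection(pipeline);
        if (target == null) {
            // No $out/$merge: a single ordinary aggregate
            requests.add("aggregate");
        } else {
            // First request: run the pipeline and write into the target collection
            requests.add("aggregateToCollection");
            if (!skipOutput) {
                // Second request: read the whole target collection back
                requests.add("find " + target);
            }
        }
        return requests;
    }

    public static void main(String[] args) {
        Map<String, Object> match =
                Collections.<String, Object>singletonMap("$match", Collections.singletonMap("a", "b"));
        Map<String, Object> mergeSpec = new LinkedHashMap<>();
        mergeSpec.put("into", "newCollection");
        mergeSpec.put("on", "_id");
        Map<String, Object> merge = Collections.<String, Object>singletonMap("$merge", mergeSpec);
        List<Map<String, Object>> pipeline = Arrays.asList(match, merge);

        System.out.println(plan(pipeline, false)); // [aggregateToCollection, find newCollection]
        System.out.println(plan(pipeline, true));  // [aggregateToCollection]
    }
}
```

Without skipOutput the model lists two requests for a pipeline ending in merge, which matches the full-collection result seen through the template. With skipOutput only the write remains.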