diff options
Diffstat (limited to 'src/mongo/db/commands/run_aggregate.cpp')
| -rw-r--r-- | src/mongo/db/commands/run_aggregate.cpp | 14 |
1 files changed, 11 insertions, 3 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 5febd7f14fc..6562dcc6fbc 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -41,7 +41,7 @@ #include "mongo/db/catalog/database.h" #include "mongo/db/curop.h" #include "mongo/db/db_raii.h" -#include "mongo/db/exec/pipeline_proxy.h" +#include "mongo/db/exec/change_stream_proxy.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/accumulator.h" @@ -155,8 +155,12 @@ bool handleCursorCommand(OperationContext* opCtx, } if (state == PlanExecutor::IS_EOF) { + // TODO SERVER-38539: We need to set both the latestOplogTimestamp and the + // postBatchResumeToken until the former is removed in a future release. responseBuilder.setLatestOplogTimestamp( cursor->getExecutor()->getLatestOplogTimestamp()); + responseBuilder.setPostBatchResumeToken( + cursor->getExecutor()->getPostBatchResumeToken()); if (!cursor->isTailable()) { // make it an obvious error to use cursor or executor after this point cursor = nullptr; @@ -176,7 +180,10 @@ bool handleCursorCommand(OperationContext* opCtx, break; } + // TODO SERVER-38539: We need to set both the latestOplogTimestamp and the + // postBatchResumeToken until the former is removed in a future release. responseBuilder.setLatestOplogTimestamp(cursor->getExecutor()->getLatestOplogTimestamp()); + responseBuilder.setPostBatchResumeToken(cursor->getExecutor()->getPostBatchResumeToken()); responseBuilder.append(next); } @@ -578,8 +585,9 @@ Status runAggregate(OperationContext* opCtx, for (size_t idx = 0; idx < pipelines.size(); ++idx) { // Transfer ownership of the Pipeline to the PipelineProxyStage. auto ws = make_unique<WorkingSet>(); - auto proxy = - make_unique<PipelineProxyStage>(opCtx, std::move(pipelines[idx]), ws.get()); + auto proxy = liteParsedPipeline.hasChangeStream() + ? make_unique<ChangeStreamProxyStage>(opCtx, std::move(pipelines[idx]), ws.get()) + : make_unique<PipelineProxyStage>(opCtx, std::move(pipelines[idx]), ws.get()); // This PlanExecutor will simply forward requests to the Pipeline, so does not need to // yield or to be registered with any collection's CursorManager to receive |
