summaryrefslogtreecommitdiff
path: root/src/mongo/db/commands/run_aggregate.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/commands/run_aggregate.cpp')
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp14
1 files changed, 11 insertions, 3 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 5febd7f14fc..6562dcc6fbc 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -41,7 +41,7 @@
#include "mongo/db/catalog/database.h"
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
-#include "mongo/db/exec/pipeline_proxy.h"
+#include "mongo/db/exec/change_stream_proxy.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/accumulator.h"
@@ -155,8 +155,12 @@ bool handleCursorCommand(OperationContext* opCtx,
}
if (state == PlanExecutor::IS_EOF) {
+ // TODO SERVER-38539: We need to set both the latestOplogTimestamp and the
+ // postBatchResumeToken until the former is removed in a future release.
responseBuilder.setLatestOplogTimestamp(
cursor->getExecutor()->getLatestOplogTimestamp());
+ responseBuilder.setPostBatchResumeToken(
+ cursor->getExecutor()->getPostBatchResumeToken());
if (!cursor->isTailable()) {
// make it an obvious error to use cursor or executor after this point
cursor = nullptr;
@@ -176,7 +180,10 @@ bool handleCursorCommand(OperationContext* opCtx,
break;
}
+ // TODO SERVER-38539: We need to set both the latestOplogTimestamp and the
+ // postBatchResumeToken until the former is removed in a future release.
responseBuilder.setLatestOplogTimestamp(cursor->getExecutor()->getLatestOplogTimestamp());
+ responseBuilder.setPostBatchResumeToken(cursor->getExecutor()->getPostBatchResumeToken());
responseBuilder.append(next);
}
@@ -578,8 +585,9 @@ Status runAggregate(OperationContext* opCtx,
for (size_t idx = 0; idx < pipelines.size(); ++idx) {
// Transfer ownership of the Pipeline to the PipelineProxyStage.
auto ws = make_unique<WorkingSet>();
- auto proxy =
- make_unique<PipelineProxyStage>(opCtx, std::move(pipelines[idx]), ws.get());
+ auto proxy = liteParsedPipeline.hasChangeStream()
+ ? make_unique<ChangeStreamProxyStage>(opCtx, std::move(pipelines[idx]), ws.get())
+ : make_unique<PipelineProxyStage>(opCtx, std::move(pipelines[idx]), ws.get());
// This PlanExecutor will simply forward requests to the Pipeline, so does not need to
// yield or to be registered with any collection's CursorManager to receive