diff options
Diffstat (limited to 'src/mongo/db/query/plan_executor_impl.cpp')
| -rw-r--r-- | src/mongo/db/query/plan_executor_impl.cpp | 14 |
1 files changed, 10 insertions, 4 deletions
diff --git a/src/mongo/db/query/plan_executor_impl.cpp b/src/mongo/db/query/plan_executor_impl.cpp index b8b31dbac62..53d00097fd1 100644 --- a/src/mongo/db/query/plan_executor_impl.cpp +++ b/src/mongo/db/query/plan_executor_impl.cpp @@ -41,9 +41,9 @@ #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" #include "mongo/db/exec/cached_plan.h" +#include "mongo/db/exec/change_stream_proxy.h" #include "mongo/db/exec/collection_scan.h" #include "mongo/db/exec/multi_plan.h" -#include "mongo/db/exec/pipeline_proxy.h" #include "mongo/db/exec/plan_stage.h" #include "mongo/db/exec/plan_stats.h" #include "mongo/db/exec/subplan.h" @@ -709,14 +709,20 @@ bool PlanExecutorImpl::isDetached() const { return _currentState == kDetached; } -Timestamp PlanExecutorImpl::getLatestOplogTimestamp() { - if (auto pipelineProxy = getStageByType(_root.get(), STAGE_PIPELINE_PROXY)) - return static_cast<PipelineProxyStage*>(pipelineProxy)->getLatestOplogTimestamp(); +Timestamp PlanExecutorImpl::getLatestOplogTimestamp() const { + if (auto changeStreamProxy = getStageByType(_root.get(), STAGE_CHANGE_STREAM_PROXY)) + return static_cast<ChangeStreamProxyStage*>(changeStreamProxy)->getLatestOplogTimestamp(); if (auto collectionScan = getStageByType(_root.get(), STAGE_COLLSCAN)) return static_cast<CollectionScan*>(collectionScan)->getLatestOplogTimestamp(); return Timestamp(); } +BSONObj PlanExecutorImpl::getPostBatchResumeToken() const { + if (auto changeStreamProxy = getStageByType(_root.get(), STAGE_CHANGE_STREAM_PROXY)) + return static_cast<ChangeStreamProxyStage*>(changeStreamProxy)->getPostBatchResumeToken(); + return {}; +} + Status PlanExecutorImpl::getMemberObjectStatus(const BSONObj& memberObj) const { return WorkingSetCommon::getMemberObjectStatus(memberObj); } |
