diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/commands/run_aggregate.cpp | 187 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_coll_stats.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_coll_stats.idl | 7 | ||||
-rw-r--r-- | src/mongo/db/views/resolved_view.cpp | 95 | ||||
-rw-r--r-- | src/mongo/db/views/resolved_view.h | 10 |
5 files changed, 180 insertions, 124 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index e08e6b09e30..6cb86f131da 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -656,6 +656,95 @@ std::vector<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createLegacyEx return execs; } +Status runAggregateOnView(OperationContext* opCtx, + const NamespaceString& origNss, + const AggregateCommandRequest& request, + const MultipleCollectionAccessor& collections, + boost::optional<std::unique_ptr<CollatorInterface>> collatorToUse, + const ViewDefinition* view, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + std::shared_ptr<const CollectionCatalog> catalog, + const PrivilegeVector& privileges, + CurOp* curOp, + rpc::ReplyBuilderInterface* result, + const std::function<void(void)>& resetContextFn) { + auto nss = request.getNamespace(); + checkCollectionUUIDMismatch( + opCtx, nss, collections.getMainCollection(), request.getCollectionUUID()); + + uassert(ErrorCodes::CommandNotSupportedOnView, + "mapReduce on a view is not supported", + !request.getIsMapReduceCommand()); + + // Check that the default collation of 'view' is compatible with the operation's + // collation. The check is skipped if the request did not specify a collation. + if (!request.getCollation().get_value_or(BSONObj()).isEmpty()) { + invariant(collatorToUse); // Should already be resolved at this point. + if (!CollatorInterface::collatorsMatch(view->defaultCollator(), collatorToUse->get()) && + !view->timeseries()) { + + return {ErrorCodes::OptionNotSupportedOnView, + "Cannot override a view's default collation"}; + } + } + + // Queries on timeseries views may specify non-default collation whereas queries + // on all other types of views must match the default collator (the collation use + // to originally create that collections). Thus in the case of operations on TS + // views, we use the request's collation. + auto timeSeriesCollator = view->timeseries() ? request.getCollation() : boost::none; + + auto resolvedView = + uassertStatusOK(view_catalog_helpers::resolveView(opCtx, catalog, nss, timeSeriesCollator)); + + // With the view & collation resolved, we can relinquish locks. + resetContextFn(); + + // Set this operation's shard version for the underlying collection to unsharded. + // This is prerequisite for future shard versioning checks. + boost::optional<ScopedSetShardRole> scopeSetShardRole; + if (!serverGlobalParams.clusterRole.has(ClusterRole::None)) { + scopeSetShardRole.emplace(opCtx, + resolvedView.getNamespace(), + ShardVersion::UNSHARDED() /* shardVersion */, + boost::none /* databaseVersion */); + }; + uassert(std::move(resolvedView), + "Explain of a resolved view must be executed by mongos", + !ShardingState::get(opCtx)->enabled() || !request.getExplain()); + + // Parse the resolved view into a new aggregation request. + auto newRequest = resolvedView.asExpandedViewAggregation(request); + auto newCmd = aggregation_request_helper::serializeToCommandObj(newRequest); + + auto status{Status::OK()}; + try { + status = runAggregate(opCtx, origNss, newRequest, newCmd, privileges, result); + } catch (const ExceptionForCat<ErrorCategory::StaleShardVersionError>& ex) { + // Since we expect the view to be UNSHARDED, if we reached to this point there are + // two possibilities: + // 1. The shard doesn't know what its shard version/state is and needs to recover + // it (in which case we throw so that the shard can run recovery) + // 2. The collection references by the view is actually SHARDED, in which case the + // router must execute it + if (const auto staleInfo{ex.extraInfo<StaleConfigInfo>()}) { + uassert(std::move(resolvedView), + "Resolved views on sharded collections must be executed by mongos", + !staleInfo->getVersionWanted()); + } + throw; + } + + { + // Set the namespace of the curop back to the view namespace so ctx records + // stats on this view namespace on destruction. + stdx::lock_guard<Client> lk(*opCtx->getClient()); + curOp->setNS_inlock(nss); + } + + return status; +} + } // namespace Status runAggregate(OperationContext* opCtx, @@ -887,86 +976,24 @@ Status runAggregate(OperationContext* opCtx, // recursively calling runAggregate(), which will re-acquire locks on the underlying // collection. (The lock must be released because recursively acquiring locks on the // database will prohibit yielding.) - if (ctx && ctx->getView() && !liteParsedPipeline.startsWithCollStats()) { - invariant(nss != NamespaceString::kRsOplogNamespace); - invariant(!nss.isCollectionlessAggregateNS()); - - checkCollectionUUIDMismatch( - opCtx, nss, collections.getMainCollection(), request.getCollectionUUID()); - - uassert(ErrorCodes::CommandNotSupportedOnView, - "mapReduce on a view is not supported", - !request.getIsMapReduceCommand()); - - // Check that the default collation of 'view' is compatible with the operation's - // collation. The check is skipped if the request did not specify a collation. - if (!request.getCollation().get_value_or(BSONObj()).isEmpty()) { - invariant(collatorToUse); // Should already be resolved at this point. - if (!CollatorInterface::collatorsMatch(ctx->getView()->defaultCollator(), - collatorToUse->get()) && - !ctx->getView()->timeseries()) { - - return {ErrorCodes::OptionNotSupportedOnView, - "Cannot override a view's default collation"}; - } - } - - // Queries on timeseries views may specify non-default collation whereas queries - // on all other types of views must match the default collator (the collation use - // to originally create that collections). Thus in the case of operations on TS - // views, we use the request's collation. - auto timeSeriesCollator = - ctx->getView()->timeseries() ? request.getCollation() : boost::none; - - auto resolvedView = uassertStatusOK( - view_catalog_helpers::resolveView(opCtx, catalog, nss, timeSeriesCollator)); - - // With the view & collation resolved, we can relinquish locks. - resetContext(); - - // Set this operation's shard version for the underlying collection to unsharded. - // This is prerequisite for future shard versioning checks. - boost::optional<ScopedSetShardRole> scopeSetShardRole; - if (!serverGlobalParams.clusterRole.has(ClusterRole::None)) { - scopeSetShardRole.emplace(opCtx, - resolvedView.getNamespace(), - ShardVersion::UNSHARDED() /* shardVersion */, - boost::none /* databaseVersion */); - }; - uassert(std::move(resolvedView), - "Explain of a resolved view must be executed by mongos", - !ShardingState::get(opCtx)->enabled() || !request.getExplain()); - - // Parse the resolved view into a new aggregation request. - auto newRequest = resolvedView.asExpandedViewAggregation(request); - auto newCmd = aggregation_request_helper::serializeToCommandObj(newRequest); - - auto status{Status::OK()}; - try { - status = runAggregate(opCtx, origNss, newRequest, newCmd, privileges, result); - } catch (const ExceptionForCat<ErrorCategory::StaleShardVersionError>& ex) { - // Since we expect the view to be UNSHARDED, if we reached to this point there are - // two possibilities: - // 1. The shard doesn't know what its shard version/state is and needs to recover - // it (in which case we throw so that the shard can run recovery) - // 2. The collection references by the view is actually SHARDED, in which case the - // router must execute it - if (const auto staleInfo{ex.extraInfo<StaleConfigInfo>()}) { - uassert(std::move(resolvedView), - "Resolved views on sharded collections must be executed by mongos", - !staleInfo->getVersionWanted()); - } - throw; - } - - { - // Set the namespace of the curop back to the view namespace so ctx records - // stats on this view namespace on destruction. - stdx::lock_guard<Client> lk(*opCtx->getClient()); - curOp->setNS_inlock(nss); - } - - return status; + // We do not need to expand the view pipeline when there is a $collStats stage, as + // $collStats is supported on a view namespace. For a time-series collection, however, the + // view is abstracted out for the users, so we needed to resolve the namespace to get the + // underlying bucket collection. + if (ctx && ctx->getView() && + (!liteParsedPipeline.startsWithCollStats() || ctx->getView()->timeseries())) { + return runAggregateOnView(opCtx, + origNss, + request, + collections, + std::move(collatorToUse), + ctx->getView(), + expCtx, + catalog, + privileges, + curOp, + result, + resetContext); } // If collectionUUID was provided, verify the collection exists and has the expected UUID. diff --git a/src/mongo/db/pipeline/document_source_coll_stats.cpp b/src/mongo/db/pipeline/document_source_coll_stats.cpp index 6e8efcca9aa..96fd6d1c48a 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.cpp +++ b/src/mongo/db/pipeline/document_source_coll_stats.cpp @@ -90,8 +90,11 @@ BSONObj DocumentSourceCollStats::makeStatsForNs( builder.appendDate("localTime", jsTime()); if (auto latencyStatsSpec = spec.getLatencyStats()) { + // getRequestOnTimeseriesView is set to true if collstats is called on the view. + auto resolvedNss = + spec.getRequestOnTimeseriesView() ? nss.getTimeseriesViewNamespace() : nss; expCtx->mongoProcessInterface->appendLatencyStats( - expCtx->opCtx, nss, latencyStatsSpec->getHistograms(), &builder); + expCtx->opCtx, resolvedNss, latencyStatsSpec->getHistograms(), &builder); } if (auto storageStats = spec.getStorageStats()) { diff --git a/src/mongo/db/pipeline/document_source_coll_stats.idl b/src/mongo/db/pipeline/document_source_coll_stats.idl index e0babb11ef6..18efd4d2c7f 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.idl +++ b/src/mongo/db/pipeline/document_source_coll_stats.idl @@ -39,7 +39,7 @@ structs: LatencyStatsSpec: description: Represents the 'latencyStats' argument to the $collStats stage. strict: true - fields: + fields: histograms: description: Adds latency histogram information to the embedded documents in latencyStats if true. type: optionalBool @@ -67,3 +67,8 @@ structs: validator: callback: validateObjectIsEmpty optional: true + $_requestOnTimeseriesView: + description: When set to true, $collStats stage requests statistics from the view namespace. + When set to false, $collStats stage requests statistics from the underlying collection. + cpp_name: requestOnTimeseriesView + type: optionalBool diff --git a/src/mongo/db/views/resolved_view.cpp b/src/mongo/db/views/resolved_view.cpp index ec3a7ae8a49..2fd2b4a17cc 100644 --- a/src/mongo/db/views/resolved_view.cpp +++ b/src/mongo/db/views/resolved_view.cpp @@ -33,6 +33,7 @@ #include "mongo/base/init.h" #include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/pipeline/document_source_coll_stats.h" #include "mongo/db/pipeline/document_source_index_stats.h" #include "mongo/db/pipeline/document_source_internal_convert_bucket_index_stats.h" #include "mongo/db/pipeline/document_source_internal_unpack_bucket.h" @@ -133,45 +134,47 @@ std::shared_ptr<const ErrorExtraInfo> ResolvedView::parse(const BSONObj& cmdRepl return std::make_shared<ResolvedView>(fromBSON(cmdReply)); } -AggregateCommandRequest ResolvedView::asExpandedViewAggregation( - const AggregateCommandRequest& request) const { - // Perform the aggregation on the resolved namespace. The new pipeline consists of two parts: - // first, 'pipeline' in this ResolvedView; then, the pipeline in 'request'. - std::vector<BSONObj> resolvedPipeline; - resolvedPipeline.reserve(_pipeline.size() + request.getPipeline().size()); - resolvedPipeline.insert(resolvedPipeline.end(), _pipeline.begin(), _pipeline.end()); - resolvedPipeline.insert( - resolvedPipeline.end(), request.getPipeline().begin(), request.getPipeline().end()); - - // $indexStats needs special handling for time-series-collections. Normally for a regular read, - // $_internalUnpackBucket unpacks the buckets entries into time-series document format and then - // passes the time-series documents on through the pipeline. Instead we need to read the buckets - // collection's index stats unmodified and then pass the results through an additional stage to - // specially convert them to the time-series collection's schema, and then onward. There is no - // need for the $_internalUnpackBucket stage with $indexStats, so we remove it. - if (resolvedPipeline.size() >= 2 && - resolvedPipeline[0][DocumentSourceInternalUnpackBucket::kStageNameInternal] && - resolvedPipeline[1][DocumentSourceIndexStats::kStageName]) { - // Clear the $_internalUnpackBucket stage. - auto unpackStage = resolvedPipeline[0]; - resolvedPipeline[0] = resolvedPipeline[1]; - - // Grab the $_internalUnpackBucket stage's time-series collection schema options and pass - // them into the $_internalConvertBucketIndexStats stage to use for schema conversion. - BSONObjBuilder builder; - for (const auto& elem : - unpackStage[DocumentSourceInternalUnpackBucket::kStageNameInternal].Obj()) { - if (elem.fieldNameStringData() == timeseries::kTimeFieldName || - elem.fieldNameStringData() == timeseries::kMetaFieldName) { +void ResolvedView::handleTimeseriesRewrites(std::vector<BSONObj>* resolvedPipeline) const { + // Stages that are constrained to be the first stage of the pipeline ($collStats, $indexStats) + // require special handling since $_internalUnpackBucket is the first stage. + if (resolvedPipeline->size() >= 2 && + (*resolvedPipeline)[0][DocumentSourceInternalUnpackBucket::kStageNameInternal] && + ((*resolvedPipeline)[1][DocumentSourceIndexStats::kStageName] || + (*resolvedPipeline)[1][DocumentSourceCollStats::kStageName])) { + // Normally for a regular read, $_internalUnpackBucket unpacks the buckets entries into + // time-series document format and then passes the time-series documents on through the + // pipeline. Instead, for $indexStats, we need to read the buckets collection's index + // stats unmodified and then pass the results through an additional stage to specially + // convert them to the time-series collection's schema, and then onward. We grab the + // $_internalUnpackBucket stage's time-series collection schema options and pass them + // into the $_internalConvertBucketIndexStats stage to use for schema conversion. + if ((*resolvedPipeline)[1][DocumentSourceIndexStats::kStageName]) { + auto unpackStage = (*resolvedPipeline)[0]; + (*resolvedPipeline)[0] = (*resolvedPipeline)[1]; + BSONObjBuilder builder; + for (const auto& elem : + unpackStage[DocumentSourceInternalUnpackBucket::kStageNameInternal].Obj()) { + if (elem.fieldNameStringData() == timeseries::kTimeFieldName || + elem.fieldNameStringData() == timeseries::kMetaFieldName) { + builder.append(elem); + } + } + (*resolvedPipeline)[1] = + BSON(DocumentSourceInternalConvertBucketIndexStats::kStageName << builder.obj()); + } else { + auto collStatsStage = (*resolvedPipeline)[1]; + BSONObjBuilder builder; + for (const auto& elem : collStatsStage[DocumentSourceCollStats::kStageName].Obj()) { builder.append(elem); } + builder.append("$_requestOnTimeseriesView", true); + (*resolvedPipeline)[1] = BSON(DocumentSourceCollStats::kStageName << builder.obj()); + // For $collStats, we directly read the collection stats from the buckets + // collection, and skip $_internalUnpackBucket. + resolvedPipeline->erase(resolvedPipeline->begin()); } - - resolvedPipeline[1] = - BSON(DocumentSourceInternalConvertBucketIndexStats::kStageName << builder.obj()); - } else if (resolvedPipeline.size() >= 1 && - resolvedPipeline[0][DocumentSourceInternalUnpackBucket::kStageNameInternal]) { - auto unpackStage = resolvedPipeline[0]; + } else { + auto unpackStage = (*resolvedPipeline)[0]; BSONObjBuilder builder; for (const auto& elem : @@ -184,11 +187,27 @@ AggregateCommandRequest ResolvedView::asExpandedViewAggregation( builder.append(DocumentSourceInternalUnpackBucket::kUsesExtendedRange, ((_timeseriesUsesExtendedRange && *_timeseriesUsesExtendedRange))); - resolvedPipeline[0] = + (*resolvedPipeline)[0] = BSON(DocumentSourceInternalUnpackBucket::kStageNameInternal << builder.obj()); } +} + +AggregateCommandRequest ResolvedView::asExpandedViewAggregation( + const AggregateCommandRequest& request) const { + // Perform the aggregation on the resolved namespace. The new pipeline consists of two parts: + // first, 'pipeline' in this ResolvedView; then, the pipeline in 'request'. + std::vector<BSONObj> resolvedPipeline; + resolvedPipeline.reserve(_pipeline.size() + request.getPipeline().size()); + resolvedPipeline.insert(resolvedPipeline.end(), _pipeline.begin(), _pipeline.end()); + resolvedPipeline.insert( + resolvedPipeline.end(), request.getPipeline().begin(), request.getPipeline().end()); + + if (resolvedPipeline.size() >= 1 && + resolvedPipeline[0][DocumentSourceInternalUnpackBucket::kStageNameInternal]) { + handleTimeseriesRewrites(&resolvedPipeline); + } - AggregateCommandRequest expandedRequest{_namespace, resolvedPipeline}; + AggregateCommandRequest expandedRequest{_namespace, std::move(resolvedPipeline)}; if (request.getExplain()) { expandedRequest.setExplain(request.getExplain()); diff --git a/src/mongo/db/views/resolved_view.h b/src/mongo/db/views/resolved_view.h index 82238556288..bdceb93ac8e 100644 --- a/src/mongo/db/views/resolved_view.h +++ b/src/mongo/db/views/resolved_view.h @@ -59,6 +59,8 @@ public: static ResolvedView fromBSON(const BSONObj& commandResponseObj); + void handleTimeseriesRewrites(std::vector<BSONObj>* resolvedPipeline) const; + /** * Convert an aggregation command on a view to the equivalent command against the view's * underlying collection. @@ -91,12 +93,12 @@ private: NamespaceString _namespace; std::vector<BSONObj> _pipeline; - // The default collation associated with this view. An empty object means that the default is - // the simple collation. + // The default collation associated with this view. An empty object means that the default + // is the simple collation. // // Currently all operations which run over a view must use the default collation. This means - // that operations on the view which do not specify a collation inherit the default. Operations - // on the view which specify any other collation fail with a user error. + // that operations on the view which do not specify a collation inherit the default. + // Operations on the view which specify any other collation fail with a user error. BSONObj _defaultCollation; boost::optional<TimeseriesOptions> _timeseriesOptions; |