summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAdityavardhan Agrawal <adi.agrawal@mongodb.com>2023-04-17 13:55:39 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-04-17 14:46:20 +0000
commit01beeb4c2895153914158b1b5691c8c8cd60356a (patch)
tree9cbf16f84ba3176aafc4d930a9b116ea4b7016cc /src
parentf8968230334cca0e504035188445697fcc8088cf (diff)
downloadmongo-01beeb4c2895153914158b1b5691c8c8cd60356a.tar.gz
SERVER-72686: Support $collStats in agg pipeline for timeseries collections
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp187
-rw-r--r--src/mongo/db/pipeline/document_source_coll_stats.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source_coll_stats.idl7
-rw-r--r--src/mongo/db/views/resolved_view.cpp95
-rw-r--r--src/mongo/db/views/resolved_view.h10
5 files changed, 180 insertions, 124 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index e08e6b09e30..6cb86f131da 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -656,6 +656,95 @@ std::vector<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createLegacyEx
return execs;
}
+Status runAggregateOnView(OperationContext* opCtx,
+ const NamespaceString& origNss,
+ const AggregateCommandRequest& request,
+ const MultipleCollectionAccessor& collections,
+ boost::optional<std::unique_ptr<CollatorInterface>> collatorToUse,
+ const ViewDefinition* view,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ std::shared_ptr<const CollectionCatalog> catalog,
+ const PrivilegeVector& privileges,
+ CurOp* curOp,
+ rpc::ReplyBuilderInterface* result,
+ const std::function<void(void)>& resetContextFn) {
+ auto nss = request.getNamespace();
+ checkCollectionUUIDMismatch(
+ opCtx, nss, collections.getMainCollection(), request.getCollectionUUID());
+
+ uassert(ErrorCodes::CommandNotSupportedOnView,
+ "mapReduce on a view is not supported",
+ !request.getIsMapReduceCommand());
+
+ // Check that the default collation of 'view' is compatible with the operation's
+ // collation. The check is skipped if the request did not specify a collation.
+ if (!request.getCollation().get_value_or(BSONObj()).isEmpty()) {
+ invariant(collatorToUse); // Should already be resolved at this point.
+ if (!CollatorInterface::collatorsMatch(view->defaultCollator(), collatorToUse->get()) &&
+ !view->timeseries()) {
+
+ return {ErrorCodes::OptionNotSupportedOnView,
+ "Cannot override a view's default collation"};
+ }
+ }
+
+ // Queries on timeseries views may specify non-default collation whereas queries
+ // on all other types of views must match the default collator (the collation use
+ // to originally create that collections). Thus in the case of operations on TS
+ // views, we use the request's collation.
+ auto timeSeriesCollator = view->timeseries() ? request.getCollation() : boost::none;
+
+ auto resolvedView =
+ uassertStatusOK(view_catalog_helpers::resolveView(opCtx, catalog, nss, timeSeriesCollator));
+
+ // With the view & collation resolved, we can relinquish locks.
+ resetContextFn();
+
+ // Set this operation's shard version for the underlying collection to unsharded.
+ // This is prerequisite for future shard versioning checks.
+ boost::optional<ScopedSetShardRole> scopeSetShardRole;
+ if (!serverGlobalParams.clusterRole.has(ClusterRole::None)) {
+ scopeSetShardRole.emplace(opCtx,
+ resolvedView.getNamespace(),
+ ShardVersion::UNSHARDED() /* shardVersion */,
+ boost::none /* databaseVersion */);
+ };
+ uassert(std::move(resolvedView),
+ "Explain of a resolved view must be executed by mongos",
+ !ShardingState::get(opCtx)->enabled() || !request.getExplain());
+
+ // Parse the resolved view into a new aggregation request.
+ auto newRequest = resolvedView.asExpandedViewAggregation(request);
+ auto newCmd = aggregation_request_helper::serializeToCommandObj(newRequest);
+
+ auto status{Status::OK()};
+ try {
+ status = runAggregate(opCtx, origNss, newRequest, newCmd, privileges, result);
+ } catch (const ExceptionForCat<ErrorCategory::StaleShardVersionError>& ex) {
+ // Since we expect the view to be UNSHARDED, if we reached to this point there are
+ // two possibilities:
+ // 1. The shard doesn't know what its shard version/state is and needs to recover
+ // it (in which case we throw so that the shard can run recovery)
+ // 2. The collection references by the view is actually SHARDED, in which case the
+ // router must execute it
+ if (const auto staleInfo{ex.extraInfo<StaleConfigInfo>()}) {
+ uassert(std::move(resolvedView),
+ "Resolved views on sharded collections must be executed by mongos",
+ !staleInfo->getVersionWanted());
+ }
+ throw;
+ }
+
+ {
+ // Set the namespace of the curop back to the view namespace so ctx records
+ // stats on this view namespace on destruction.
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ curOp->setNS_inlock(nss);
+ }
+
+ return status;
+}
+
} // namespace
Status runAggregate(OperationContext* opCtx,
@@ -887,86 +976,24 @@ Status runAggregate(OperationContext* opCtx,
// recursively calling runAggregate(), which will re-acquire locks on the underlying
// collection. (The lock must be released because recursively acquiring locks on the
// database will prohibit yielding.)
- if (ctx && ctx->getView() && !liteParsedPipeline.startsWithCollStats()) {
- invariant(nss != NamespaceString::kRsOplogNamespace);
- invariant(!nss.isCollectionlessAggregateNS());
-
- checkCollectionUUIDMismatch(
- opCtx, nss, collections.getMainCollection(), request.getCollectionUUID());
-
- uassert(ErrorCodes::CommandNotSupportedOnView,
- "mapReduce on a view is not supported",
- !request.getIsMapReduceCommand());
-
- // Check that the default collation of 'view' is compatible with the operation's
- // collation. The check is skipped if the request did not specify a collation.
- if (!request.getCollation().get_value_or(BSONObj()).isEmpty()) {
- invariant(collatorToUse); // Should already be resolved at this point.
- if (!CollatorInterface::collatorsMatch(ctx->getView()->defaultCollator(),
- collatorToUse->get()) &&
- !ctx->getView()->timeseries()) {
-
- return {ErrorCodes::OptionNotSupportedOnView,
- "Cannot override a view's default collation"};
- }
- }
-
- // Queries on timeseries views may specify non-default collation whereas queries
- // on all other types of views must match the default collator (the collation use
- // to originally create that collections). Thus in the case of operations on TS
- // views, we use the request's collation.
- auto timeSeriesCollator =
- ctx->getView()->timeseries() ? request.getCollation() : boost::none;
-
- auto resolvedView = uassertStatusOK(
- view_catalog_helpers::resolveView(opCtx, catalog, nss, timeSeriesCollator));
-
- // With the view & collation resolved, we can relinquish locks.
- resetContext();
-
- // Set this operation's shard version for the underlying collection to unsharded.
- // This is prerequisite for future shard versioning checks.
- boost::optional<ScopedSetShardRole> scopeSetShardRole;
- if (!serverGlobalParams.clusterRole.has(ClusterRole::None)) {
- scopeSetShardRole.emplace(opCtx,
- resolvedView.getNamespace(),
- ShardVersion::UNSHARDED() /* shardVersion */,
- boost::none /* databaseVersion */);
- };
- uassert(std::move(resolvedView),
- "Explain of a resolved view must be executed by mongos",
- !ShardingState::get(opCtx)->enabled() || !request.getExplain());
-
- // Parse the resolved view into a new aggregation request.
- auto newRequest = resolvedView.asExpandedViewAggregation(request);
- auto newCmd = aggregation_request_helper::serializeToCommandObj(newRequest);
-
- auto status{Status::OK()};
- try {
- status = runAggregate(opCtx, origNss, newRequest, newCmd, privileges, result);
- } catch (const ExceptionForCat<ErrorCategory::StaleShardVersionError>& ex) {
- // Since we expect the view to be UNSHARDED, if we reached to this point there are
- // two possibilities:
- // 1. The shard doesn't know what its shard version/state is and needs to recover
- // it (in which case we throw so that the shard can run recovery)
- // 2. The collection references by the view is actually SHARDED, in which case the
- // router must execute it
- if (const auto staleInfo{ex.extraInfo<StaleConfigInfo>()}) {
- uassert(std::move(resolvedView),
- "Resolved views on sharded collections must be executed by mongos",
- !staleInfo->getVersionWanted());
- }
- throw;
- }
-
- {
- // Set the namespace of the curop back to the view namespace so ctx records
- // stats on this view namespace on destruction.
- stdx::lock_guard<Client> lk(*opCtx->getClient());
- curOp->setNS_inlock(nss);
- }
-
- return status;
+ // We do not need to expand the view pipeline when there is a $collStats stage, as
+ // $collStats is supported on a view namespace. For a time-series collection, however, the
+ // view is abstracted out for the users, so we needed to resolve the namespace to get the
+ // underlying bucket collection.
+ if (ctx && ctx->getView() &&
+ (!liteParsedPipeline.startsWithCollStats() || ctx->getView()->timeseries())) {
+ return runAggregateOnView(opCtx,
+ origNss,
+ request,
+ collections,
+ std::move(collatorToUse),
+ ctx->getView(),
+ expCtx,
+ catalog,
+ privileges,
+ curOp,
+ result,
+ resetContext);
}
// If collectionUUID was provided, verify the collection exists and has the expected UUID.
diff --git a/src/mongo/db/pipeline/document_source_coll_stats.cpp b/src/mongo/db/pipeline/document_source_coll_stats.cpp
index 6e8efcca9aa..96fd6d1c48a 100644
--- a/src/mongo/db/pipeline/document_source_coll_stats.cpp
+++ b/src/mongo/db/pipeline/document_source_coll_stats.cpp
@@ -90,8 +90,11 @@ BSONObj DocumentSourceCollStats::makeStatsForNs(
builder.appendDate("localTime", jsTime());
if (auto latencyStatsSpec = spec.getLatencyStats()) {
+ // getRequestOnTimeseriesView is set to true if collstats is called on the view.
+ auto resolvedNss =
+ spec.getRequestOnTimeseriesView() ? nss.getTimeseriesViewNamespace() : nss;
expCtx->mongoProcessInterface->appendLatencyStats(
- expCtx->opCtx, nss, latencyStatsSpec->getHistograms(), &builder);
+ expCtx->opCtx, resolvedNss, latencyStatsSpec->getHistograms(), &builder);
}
if (auto storageStats = spec.getStorageStats()) {
diff --git a/src/mongo/db/pipeline/document_source_coll_stats.idl b/src/mongo/db/pipeline/document_source_coll_stats.idl
index e0babb11ef6..18efd4d2c7f 100644
--- a/src/mongo/db/pipeline/document_source_coll_stats.idl
+++ b/src/mongo/db/pipeline/document_source_coll_stats.idl
@@ -39,7 +39,7 @@ structs:
LatencyStatsSpec:
description: Represents the 'latencyStats' argument to the $collStats stage.
strict: true
- fields:
+ fields:
histograms:
description: Adds latency histogram information to the embedded documents in latencyStats if true.
type: optionalBool
@@ -67,3 +67,8 @@ structs:
validator:
callback: validateObjectIsEmpty
optional: true
+ $_requestOnTimeseriesView:
+ description: When set to true, $collStats stage requests statistics from the view namespace.
+ When set to false, $collStats stage requests statistics from the underlying collection.
+ cpp_name: requestOnTimeseriesView
+ type: optionalBool
diff --git a/src/mongo/db/views/resolved_view.cpp b/src/mongo/db/views/resolved_view.cpp
index ec3a7ae8a49..2fd2b4a17cc 100644
--- a/src/mongo/db/views/resolved_view.cpp
+++ b/src/mongo/db/views/resolved_view.cpp
@@ -33,6 +33,7 @@
#include "mongo/base/init.h"
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/pipeline/document_source_coll_stats.h"
#include "mongo/db/pipeline/document_source_index_stats.h"
#include "mongo/db/pipeline/document_source_internal_convert_bucket_index_stats.h"
#include "mongo/db/pipeline/document_source_internal_unpack_bucket.h"
@@ -133,45 +134,47 @@ std::shared_ptr<const ErrorExtraInfo> ResolvedView::parse(const BSONObj& cmdRepl
return std::make_shared<ResolvedView>(fromBSON(cmdReply));
}
-AggregateCommandRequest ResolvedView::asExpandedViewAggregation(
- const AggregateCommandRequest& request) const {
- // Perform the aggregation on the resolved namespace. The new pipeline consists of two parts:
- // first, 'pipeline' in this ResolvedView; then, the pipeline in 'request'.
- std::vector<BSONObj> resolvedPipeline;
- resolvedPipeline.reserve(_pipeline.size() + request.getPipeline().size());
- resolvedPipeline.insert(resolvedPipeline.end(), _pipeline.begin(), _pipeline.end());
- resolvedPipeline.insert(
- resolvedPipeline.end(), request.getPipeline().begin(), request.getPipeline().end());
-
- // $indexStats needs special handling for time-series-collections. Normally for a regular read,
- // $_internalUnpackBucket unpacks the buckets entries into time-series document format and then
- // passes the time-series documents on through the pipeline. Instead we need to read the buckets
- // collection's index stats unmodified and then pass the results through an additional stage to
- // specially convert them to the time-series collection's schema, and then onward. There is no
- // need for the $_internalUnpackBucket stage with $indexStats, so we remove it.
- if (resolvedPipeline.size() >= 2 &&
- resolvedPipeline[0][DocumentSourceInternalUnpackBucket::kStageNameInternal] &&
- resolvedPipeline[1][DocumentSourceIndexStats::kStageName]) {
- // Clear the $_internalUnpackBucket stage.
- auto unpackStage = resolvedPipeline[0];
- resolvedPipeline[0] = resolvedPipeline[1];
-
- // Grab the $_internalUnpackBucket stage's time-series collection schema options and pass
- // them into the $_internalConvertBucketIndexStats stage to use for schema conversion.
- BSONObjBuilder builder;
- for (const auto& elem :
- unpackStage[DocumentSourceInternalUnpackBucket::kStageNameInternal].Obj()) {
- if (elem.fieldNameStringData() == timeseries::kTimeFieldName ||
- elem.fieldNameStringData() == timeseries::kMetaFieldName) {
+void ResolvedView::handleTimeseriesRewrites(std::vector<BSONObj>* resolvedPipeline) const {
+ // Stages that are constrained to be the first stage of the pipeline ($collStats, $indexStats)
+ // require special handling since $_internalUnpackBucket is the first stage.
+ if (resolvedPipeline->size() >= 2 &&
+ (*resolvedPipeline)[0][DocumentSourceInternalUnpackBucket::kStageNameInternal] &&
+ ((*resolvedPipeline)[1][DocumentSourceIndexStats::kStageName] ||
+ (*resolvedPipeline)[1][DocumentSourceCollStats::kStageName])) {
+ // Normally for a regular read, $_internalUnpackBucket unpacks the buckets entries into
+ // time-series document format and then passes the time-series documents on through the
+ // pipeline. Instead, for $indexStats, we need to read the buckets collection's index
+ // stats unmodified and then pass the results through an additional stage to specially
+ // convert them to the time-series collection's schema, and then onward. We grab the
+ // $_internalUnpackBucket stage's time-series collection schema options and pass them
+ // into the $_internalConvertBucketIndexStats stage to use for schema conversion.
+ if ((*resolvedPipeline)[1][DocumentSourceIndexStats::kStageName]) {
+ auto unpackStage = (*resolvedPipeline)[0];
+ (*resolvedPipeline)[0] = (*resolvedPipeline)[1];
+ BSONObjBuilder builder;
+ for (const auto& elem :
+ unpackStage[DocumentSourceInternalUnpackBucket::kStageNameInternal].Obj()) {
+ if (elem.fieldNameStringData() == timeseries::kTimeFieldName ||
+ elem.fieldNameStringData() == timeseries::kMetaFieldName) {
+ builder.append(elem);
+ }
+ }
+ (*resolvedPipeline)[1] =
+ BSON(DocumentSourceInternalConvertBucketIndexStats::kStageName << builder.obj());
+ } else {
+ auto collStatsStage = (*resolvedPipeline)[1];
+ BSONObjBuilder builder;
+ for (const auto& elem : collStatsStage[DocumentSourceCollStats::kStageName].Obj()) {
builder.append(elem);
}
+ builder.append("$_requestOnTimeseriesView", true);
+ (*resolvedPipeline)[1] = BSON(DocumentSourceCollStats::kStageName << builder.obj());
+ // For $collStats, we directly read the collection stats from the buckets
+ // collection, and skip $_internalUnpackBucket.
+ resolvedPipeline->erase(resolvedPipeline->begin());
}
-
- resolvedPipeline[1] =
- BSON(DocumentSourceInternalConvertBucketIndexStats::kStageName << builder.obj());
- } else if (resolvedPipeline.size() >= 1 &&
- resolvedPipeline[0][DocumentSourceInternalUnpackBucket::kStageNameInternal]) {
- auto unpackStage = resolvedPipeline[0];
+ } else {
+ auto unpackStage = (*resolvedPipeline)[0];
BSONObjBuilder builder;
for (const auto& elem :
@@ -184,11 +187,27 @@ AggregateCommandRequest ResolvedView::asExpandedViewAggregation(
builder.append(DocumentSourceInternalUnpackBucket::kUsesExtendedRange,
((_timeseriesUsesExtendedRange && *_timeseriesUsesExtendedRange)));
- resolvedPipeline[0] =
+ (*resolvedPipeline)[0] =
BSON(DocumentSourceInternalUnpackBucket::kStageNameInternal << builder.obj());
}
+}
+
+AggregateCommandRequest ResolvedView::asExpandedViewAggregation(
+ const AggregateCommandRequest& request) const {
+ // Perform the aggregation on the resolved namespace. The new pipeline consists of two parts:
+ // first, 'pipeline' in this ResolvedView; then, the pipeline in 'request'.
+ std::vector<BSONObj> resolvedPipeline;
+ resolvedPipeline.reserve(_pipeline.size() + request.getPipeline().size());
+ resolvedPipeline.insert(resolvedPipeline.end(), _pipeline.begin(), _pipeline.end());
+ resolvedPipeline.insert(
+ resolvedPipeline.end(), request.getPipeline().begin(), request.getPipeline().end());
+
+ if (resolvedPipeline.size() >= 1 &&
+ resolvedPipeline[0][DocumentSourceInternalUnpackBucket::kStageNameInternal]) {
+ handleTimeseriesRewrites(&resolvedPipeline);
+ }
- AggregateCommandRequest expandedRequest{_namespace, resolvedPipeline};
+ AggregateCommandRequest expandedRequest{_namespace, std::move(resolvedPipeline)};
if (request.getExplain()) {
expandedRequest.setExplain(request.getExplain());
diff --git a/src/mongo/db/views/resolved_view.h b/src/mongo/db/views/resolved_view.h
index 82238556288..bdceb93ac8e 100644
--- a/src/mongo/db/views/resolved_view.h
+++ b/src/mongo/db/views/resolved_view.h
@@ -59,6 +59,8 @@ public:
static ResolvedView fromBSON(const BSONObj& commandResponseObj);
+ void handleTimeseriesRewrites(std::vector<BSONObj>* resolvedPipeline) const;
+
/**
* Convert an aggregation command on a view to the equivalent command against the view's
* underlying collection.
@@ -91,12 +93,12 @@ private:
NamespaceString _namespace;
std::vector<BSONObj> _pipeline;
- // The default collation associated with this view. An empty object means that the default is
- // the simple collation.
+ // The default collation associated with this view. An empty object means that the default
+ // is the simple collation.
//
// Currently all operations which run over a view must use the default collation. This means
- // that operations on the view which do not specify a collation inherit the default. Operations
- // on the view which specify any other collation fail with a user error.
+ // that operations on the view which do not specify a collation inherit the default.
+ // Operations on the view which specify any other collation fail with a user error.
BSONObj _defaultCollation;
boost::optional<TimeseriesOptions> _timeseriesOptions;