| | | |
|---|---|---|
| author | Bernard Gorman <bernard.gorman@gmail.com> | 2018-11-24 16:56:20 +0000 |
| committer | Bernard Gorman <bernard.gorman@gmail.com> | 2018-12-22 04:27:20 +0000 |
| commit | fc5bc0947ceedee3b61b2d922cabd3e5df7ec07c | |
| tree | 75749d0e4ff2d9db2001252018cc91e78801bc44 | |
| parent | bdac7ced24f9ad8f9afac9c57e7184b1f2bf61b2 | |
| download | mongo-fc5bc0947ceedee3b61b2d922cabd3e5df7ec07c.tar.gz | |
SERVER-38408 Return postBatchResumeToken with each mongoD change stream batch
36 files changed, 559 insertions, 40 deletions
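Before the patch itself, here is a minimal mongo shell (JavaScript) sketch of where the new `postBatchResumeToken` field surfaces once this change is in place. It is not part of the commit: the connection string, the `test.coll` namespace, and the batch size are assumptions for illustration, and a replica-set deployment is assumed since `$changeStream` requires one.

```js
// Open a change stream via a raw aggregate command so that the cursor response,
// including the new postBatchResumeToken (PBRT) field, is directly visible.
const testDB = connect("mongodb://localhost:27017/test");

const aggResponse = assert.commandWorked(testDB.runCommand(
    {aggregate: "coll", pipeline: [{$changeStream: {}}], cursor: {batchSize: 2}}));

// The PBRT is reported even when firstBatch is empty.
let pbrt = aggResponse.cursor.postBatchResumeToken;
printjson(pbrt);

// Each getMore refreshes the token. When the batch contains events, the PBRT
// compares greater than or equal to the _id (resume token) of the last event in
// the batch; when the batch is empty, it still advances as the oplog moves forward.
const getMoreResponse = assert.commandWorked(testDB.runCommand(
    {getMore: aggResponse.cursor.id, collection: "coll", batchSize: 2}));
pbrt = getMoreResponse.cursor.postBatchResumeToken;

// The token can be handed back to $changeStream to resume from that point, even
// across a stretch of the oplog that produced no events for this collection
// (subject to the server supporting resumption from a non-event token).
const resumedStream = assert.commandWorked(testDB.runCommand(
    {aggregate: "coll", pipeline: [{$changeStream: {resumeAfter: pbrt}}], cursor: {}}));
```

These are the same properties that the new jstest below (`report_post_batch_resume_token.js`) asserts against the raw aggregate and getMore responses.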
diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml index 91cab5a482b..359282efd22 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml @@ -7,6 +7,8 @@ selector: # This test exercises an internal detail of mongos<->mongod communication and is not expected # to work against a mongos. - jstests/change_streams/report_latest_observed_oplog_timestamp.js + # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken. + - jstests/change_streams/report_post_batch_resume_token.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml index 1ccbe9f6220..ca48b021541 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml @@ -7,6 +7,8 @@ selector: # This test exercises an internal detail of mongos<->mongod communication and is not expected to # work against a mongos. - jstests/change_streams/report_latest_observed_oplog_timestamp.js + # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken. + - jstests/change_streams/report_post_batch_resume_token.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml b/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml index 47e194262a7..01f0416f846 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml @@ -9,6 +9,8 @@ selector: - jstests/change_streams/only_wake_getmore_for_relevant_changes.js # This test is not expected to work when run against a mongos. - jstests/change_streams/report_latest_observed_oplog_timestamp.js + # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken. + - jstests/change_streams/report_post_batch_resume_token.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml index 97e2b4bc922..4b3e9386942 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml @@ -6,6 +6,8 @@ selector: exclude_files: # Exercises an internal detail of mongos<->mongod communication. Not expected to work on mongos. - jstests/change_streams/report_latest_observed_oplog_timestamp.js + # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken. 
+ - jstests/change_streams/report_post_batch_resume_token.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml index cf562eb83b8..85fbc704142 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml @@ -7,6 +7,8 @@ selector: # This test exercises an internal detail of mongos<->mongod communication and is not expected # to work against a mongos. - jstests/change_streams/report_latest_observed_oplog_timestamp.js + # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken. + - jstests/change_streams/report_post_batch_resume_token.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml index 1afe717c90b..cb82ce86c90 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml @@ -9,6 +9,8 @@ selector: - jstests/change_streams/only_wake_getmore_for_relevant_changes.js # This test is not expected to work when run against a mongos. - jstests/change_streams/report_latest_observed_oplog_timestamp.js + # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken. + - jstests/change_streams/report_post_batch_resume_token.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml index 654a7fb085d..9cea611e671 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml @@ -6,6 +6,8 @@ selector: exclude_files: # Exercises an internal detail of mongos<->mongod communication. Not expected to work on mongos. - jstests/change_streams/report_latest_observed_oplog_timestamp.js + # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken. + - jstests/change_streams/report_post_batch_resume_token.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml index f0672291ab7..abc46696da7 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml @@ -13,6 +13,8 @@ selector: - jstests/change_streams/include_cluster_time.js # Only relevant for single-collection change streams. - jstests/change_streams/metadata_notifications.js + # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken. 
+ - jstests/change_streams/report_post_batch_resume_token.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml index a8f0b58f6c3..3127b4d50d3 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml @@ -11,6 +11,8 @@ selector: - jstests/change_streams/report_latest_observed_oplog_timestamp.js # Only relevant for single-collection change streams. - jstests/change_streams/metadata_notifications.js + # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken. + - jstests/change_streams/report_post_batch_resume_token.js exclude_with_any_tags: ## diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml index 215229fc549..406255523c3 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml @@ -8,6 +8,8 @@ selector: - jstests/change_streams/report_latest_observed_oplog_timestamp.js # Only relevant for single-collection change streams. - jstests/change_streams/metadata_notifications.js + # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken. + - jstests/change_streams/report_post_batch_resume_token.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/jstests/change_streams/report_post_batch_resume_token.js b/jstests/change_streams/report_post_batch_resume_token.js new file mode 100644 index 00000000000..2094f12246c --- /dev/null +++ b/jstests/change_streams/report_post_batch_resume_token.js @@ -0,0 +1,183 @@ +/** + * Tests that an aggregate with a $changeStream stage reports the latest postBatchResumeToken. + * @tags: [uses_transactions] + */ +(function() { + "use strict"; + + load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. + + // Drop and recreate collections to assure a clean run. + const collName = "report_post_batch_resume_token"; + const testCollection = assertDropAndRecreateCollection(db, collName); + const otherCollection = assertDropAndRecreateCollection(db, "unrelated_" + collName); + const adminDB = db.getSiblingDB("admin"); + + // Helper function to return the next batch given an initial aggregate command response. + function runNextGetMore(initialCursorResponse) { + const getMoreCollName = initialCursorResponse.cursor.ns.substr( + initialCursorResponse.cursor.ns.indexOf('.') + 1); + return assert.commandWorked(testCollection.runCommand({ + getMore: initialCursorResponse.cursor.id, + collection: getMoreCollName, + batchSize: batchSize + })); + } + + let docId = 0; // Tracks _id of documents inserted to ensure that we do not duplicate. + const batchSize = 2; + + // Test that postBatchResumeToken is present on empty initial aggregate batch. + let initialAggResponse = assert.commandWorked(testCollection.runCommand( + {aggregate: collName, pipeline: [{$changeStream: {}}], cursor: {batchSize: batchSize}})); + + // Examine the response from the initial agg. 
It should have a postBatchResumeToken (PBRT), + // despite the fact that the batch is empty. + let initialAggPBRT = initialAggResponse.cursor.postBatchResumeToken; + assert.neq(undefined, initialAggPBRT, tojson(initialAggResponse)); + assert.eq(0, initialAggResponse.cursor.firstBatch.length); + + // Test that postBatchResumeToken is present on empty getMore batch. + let getMoreResponse = runNextGetMore(initialAggResponse); + let getMorePBRT = getMoreResponse.cursor.postBatchResumeToken; + assert.neq(undefined, getMorePBRT, tojson(getMoreResponse)); + assert.gte(bsonWoCompare(getMorePBRT, initialAggPBRT), 0); + assert.eq(0, getMoreResponse.cursor.nextBatch.length); + + // Test that postBatchResumeToken advances with returned events. Insert one document into the + // collection and consume the resulting change stream event. + assert.commandWorked(testCollection.insert({_id: docId++})); + getMoreResponse = runNextGetMore(initialAggResponse); + assert.eq(1, getMoreResponse.cursor.nextBatch.length); + + // Because the retrieved event is the most recent entry in the oplog, the PBRT should be equal + // to the resume token of the last item in the batch and greater than the initial PBRT. + let resumeTokenFromDoc = getMoreResponse.cursor.nextBatch[0]._id; + getMorePBRT = getMoreResponse.cursor.postBatchResumeToken; + assert.docEq(getMorePBRT, resumeTokenFromDoc); + assert.gt(bsonWoCompare(getMorePBRT, initialAggPBRT), 0); + + // Now seed the collection with enough documents to fit in two batches. + for (let i = 0; i < batchSize * 2; i++) { + assert.commandWorked(testCollection.insert({_id: docId++})); + } + + // Test that postBatchResumeToken is present on non-empty initial aggregate batch. + initialAggResponse = assert.commandWorked(testCollection.runCommand({ + aggregate: collName, + pipeline: [{$changeStream: {resumeAfter: resumeTokenFromDoc}}], + cursor: {batchSize: batchSize} + })); + // We see a postBatchResumeToken on the initial aggregate command. Because we resumed after the + // previous getMorePBRT, the postBatchResumeToken from this stream compares greater than it. + initialAggPBRT = initialAggResponse.cursor.postBatchResumeToken; + assert.neq(undefined, initialAggPBRT, tojson(initialAggResponse)); + assert.eq(batchSize, initialAggResponse.cursor.firstBatch.length); + assert.gt(bsonWoCompare(initialAggPBRT, getMorePBRT), 0); + + // Test that postBatchResumeToken advances with getMore. Iterate the cursor and assert that the + // observed postBatchResumeToken advanced. + getMoreResponse = runNextGetMore(initialAggResponse); + assert.eq(batchSize, getMoreResponse.cursor.nextBatch.length); + + // The postBatchResumeToken is again equal to the final token in the batch, and greater than the + // PBRT from the initial response. + resumeTokenFromDoc = getMoreResponse.cursor.nextBatch[batchSize - 1]._id; + getMorePBRT = getMoreResponse.cursor.postBatchResumeToken; + assert.docEq(resumeTokenFromDoc, getMorePBRT, tojson(getMoreResponse)); + assert.gt(bsonWoCompare(getMorePBRT, initialAggPBRT), 0); + + // Test that postBatchResumeToken advances with writes to an unrelated collection. First make + // sure there is nothing left in our cursor, and obtain the latest PBRT... + getMoreResponse = runNextGetMore(initialAggResponse); + let previousGetMorePBRT = getMoreResponse.cursor.postBatchResumeToken; + assert.neq(undefined, previousGetMorePBRT, tojson(getMoreResponse)); + assert.eq(getMoreResponse.cursor.nextBatch, []); + + // ... then test that it advances on an insert to an unrelated collection. 
+ assert.commandWorked(otherCollection.insert({})); + getMoreResponse = runNextGetMore(initialAggResponse); + assert.eq(0, getMoreResponse.cursor.nextBatch.length); + getMorePBRT = getMoreResponse.cursor.postBatchResumeToken; + assert.gt(bsonWoCompare(getMorePBRT, previousGetMorePBRT), 0); + + // Insert two documents into the collection which are of the maximum BSON object size. + const bsonUserSizeLimit = assert.commandWorked(adminDB.isMaster()).maxBsonObjectSize; + assert.gt(bsonUserSizeLimit, 0); + for (let i = 0; i < 2; ++i) { + const docToInsert = {_id: docId++, padding: ""}; + docToInsert.padding = "a".repeat(bsonUserSizeLimit - Object.bsonsize(docToInsert)); + assert.commandWorked(testCollection.insert(docToInsert)); + } + + // Test that we return the correct postBatchResumeToken in the event that the batch hits the + // byte size limit. Despite the fact that the batchSize is 2, we should only see 1 result, + // because the second result cannot fit in the batch. + getMoreResponse = runNextGetMore(initialAggResponse); + assert.eq(1, getMoreResponse.cursor.nextBatch.length); + + // Verify that the postBatchResumeToken matches the last event actually added to the batch. + resumeTokenFromDoc = getMoreResponse.cursor.nextBatch[0]._id; + getMorePBRT = getMoreResponse.cursor.postBatchResumeToken; + assert.docEq(getMorePBRT, resumeTokenFromDoc); + + // Now retrieve the second event and confirm that the PBRT matches its resume token. + previousGetMorePBRT = getMorePBRT; + getMoreResponse = runNextGetMore(initialAggResponse); + resumeTokenFromDoc = getMoreResponse.cursor.nextBatch[0]._id; + getMorePBRT = getMoreResponse.cursor.postBatchResumeToken; + assert.eq(1, getMoreResponse.cursor.nextBatch.length); + assert.gt(bsonWoCompare(getMorePBRT, previousGetMorePBRT), 0); + assert.docEq(getMorePBRT, resumeTokenFromDoc); + + // Test that the PBRT is correctly updated when reading events from within a transaction. + const session = db.getMongo().startSession(); + const sessionDB = session.getDatabase(db.getName()); + + const sessionColl = sessionDB[testCollection.getName()]; + const sessionOtherColl = sessionDB[otherCollection.getName()]; + session.startTransaction(); + + // Write 3 documents to the test collection and 1 to the unrelated collection. + for (let i = 0; i < 3; ++i) { + assert.commandWorked(sessionColl.insert({_id: docId++})); + } + assert.commandWorked(sessionOtherColl.insert({_id: docId++})); + assert.commandWorked(session.commitTransaction_forTesting()); + session.endSession(); + + // Grab the next 2 events, which should be the first 2 events in the transaction. + previousGetMorePBRT = getMorePBRT; + getMoreResponse = runNextGetMore(initialAggResponse); + assert.eq(2, getMoreResponse.cursor.nextBatch.length); + + // The clusterTime should be the same on each, but the resume token keeps advancing. + const txnEvent1 = getMoreResponse.cursor.nextBatch[0], + txnEvent2 = getMoreResponse.cursor.nextBatch[1]; + const txnClusterTime = txnEvent1.clusterTime; + assert.eq(txnEvent2.clusterTime, txnClusterTime); + assert.gt(bsonWoCompare(txnEvent1._id, previousGetMorePBRT), 0); + assert.gt(bsonWoCompare(txnEvent2._id, txnEvent1._id), 0); + + // The PBRT of the first transaction batch is equal to the last document's resumeToken. + getMorePBRT = getMoreResponse.cursor.postBatchResumeToken; + assert.docEq(getMorePBRT, txnEvent2._id); + + // Now get the next batch. This contains the third and final transaction operation. 
+ previousGetMorePBRT = getMorePBRT; + getMoreResponse = runNextGetMore(initialAggResponse); + assert.eq(1, getMoreResponse.cursor.nextBatch.length); + + // The clusterTime of this event is the same as the two events from the previous batch, but its + // resume token is greater than the previous PBRT. + const txnEvent3 = getMoreResponse.cursor.nextBatch[0]; + assert.eq(txnEvent3.clusterTime, txnClusterTime); + assert.gt(bsonWoCompare(txnEvent3._id, previousGetMorePBRT), 0); + + // Because we wrote to the unrelated collection, the final event in the transaction does not + // appear in the batch. But in this case it also does not allow our PBRT to advance beyond the + // last event in the batch, because the unrelated event is within the same transaction and + // therefore has the same clusterTime. + getMorePBRT = getMoreResponse.cursor.postBatchResumeToken; + assert.docEq(getMorePBRT, txnEvent3._id); +})(); diff --git a/src/mongo/base/uuid_test.cpp b/src/mongo/base/uuid_test.cpp index daf43f34d5a..242c7102900 100644 --- a/src/mongo/base/uuid_test.cpp +++ b/src/mongo/base/uuid_test.cpp @@ -190,5 +190,11 @@ TEST(UUIDTest, toBSONUsingBSONMacro) { ASSERT_BSONOBJ_EQ(expectedBson, bson); } +TEST(UUIDTest, NilUUID) { + // Test that UUID::nil() returns an all-zero UUID. + auto nilUUID = UUID::parse("00000000-0000-0000-0000-000000000000"); + ASSERT_EQUALS(UUID::nil(), unittest::assertGet(nilUUID)); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 7402004d524..8dd72d0c5ff 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1080,6 +1080,7 @@ env.Library( 'exec/and_hash.cpp', 'exec/and_sorted.cpp', 'exec/cached_plan.cpp', + 'exec/change_stream_proxy.cpp', 'exec/collection_scan.cpp', 'exec/count.cpp', 'exec/count_scan.cpp', diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index e137a982ee7..ea669c4a209 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -233,8 +233,10 @@ public: // As soon as we get a result, this operation no longer waits. awaitDataState(opCtx).shouldWaitForInserts = false; - // Add result to output buffer. + // TODO SERVER-38539: We need to set both the latestOplogTimestamp and the + // postBatchResumeToken until the former is removed in a future release. nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp()); + nextBatch->setPostBatchResumeToken(exec->getPostBatchResumeToken()); nextBatch->append(obj); (*numResults)++; } @@ -259,7 +261,10 @@ public: case PlanExecutor::IS_EOF: // This causes the reported latest oplog timestamp to advance even when there // are no results for this particular query. + // TODO SERVER-38539: We need to set both the latestOplogTimestamp and the + // postBatchResumeToken until the former is removed in a future release. 
nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp()); + nextBatch->setPostBatchResumeToken(exec->getPostBatchResumeToken()); default: return Status::OK(); } diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 5febd7f14fc..6562dcc6fbc 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -41,7 +41,7 @@ #include "mongo/db/catalog/database.h" #include "mongo/db/curop.h" #include "mongo/db/db_raii.h" -#include "mongo/db/exec/pipeline_proxy.h" +#include "mongo/db/exec/change_stream_proxy.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/accumulator.h" @@ -155,8 +155,12 @@ bool handleCursorCommand(OperationContext* opCtx, } if (state == PlanExecutor::IS_EOF) { + // TODO SERVER-38539: We need to set both the latestOplogTimestamp and the + // postBatchResumeToken until the former is removed in a future release. responseBuilder.setLatestOplogTimestamp( cursor->getExecutor()->getLatestOplogTimestamp()); + responseBuilder.setPostBatchResumeToken( + cursor->getExecutor()->getPostBatchResumeToken()); if (!cursor->isTailable()) { // make it an obvious error to use cursor or executor after this point cursor = nullptr; @@ -176,7 +180,10 @@ bool handleCursorCommand(OperationContext* opCtx, break; } + // TODO SERVER-38539: We need to set both the latestOplogTimestamp and the + // postBatchResumeToken until the former is removed in a future release. responseBuilder.setLatestOplogTimestamp(cursor->getExecutor()->getLatestOplogTimestamp()); + responseBuilder.setPostBatchResumeToken(cursor->getExecutor()->getPostBatchResumeToken()); responseBuilder.append(next); } @@ -578,8 +585,9 @@ Status runAggregate(OperationContext* opCtx, for (size_t idx = 0; idx < pipelines.size(); ++idx) { // Transfer ownership of the Pipeline to the PipelineProxyStage. auto ws = make_unique<WorkingSet>(); - auto proxy = - make_unique<PipelineProxyStage>(opCtx, std::move(pipelines[idx]), ws.get()); + auto proxy = liteParsedPipeline.hasChangeStream() + ? make_unique<ChangeStreamProxyStage>(opCtx, std::move(pipelines[idx]), ws.get()) + : make_unique<PipelineProxyStage>(opCtx, std::move(pipelines[idx]), ws.get()); // This PlanExecutor will simply forward requests to the Pipeline, so does not need to // yield or to be registered with any collection's CursorManager to receive diff --git a/src/mongo/db/exec/change_stream_proxy.cpp b/src/mongo/db/exec/change_stream_proxy.cpp new file mode 100644 index 00000000000..5af30b4b5a3 --- /dev/null +++ b/src/mongo/db/exec/change_stream_proxy.cpp @@ -0,0 +1,84 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/exec/change_stream_proxy.h" + +#include "mongo/db/pipeline/pipeline_d.h" +#include "mongo/db/pipeline/resume_token.h" + +namespace mongo { + +const char* ChangeStreamProxyStage::kStageType = "CHANGE_STREAM_PROXY"; + +ChangeStreamProxyStage::ChangeStreamProxyStage(OperationContext* opCtx, + std::unique_ptr<Pipeline, PipelineDeleter> pipeline, + WorkingSet* ws) + : PipelineProxyStage(opCtx, std::move(pipeline), ws, kStageType) { + invariant(std::any_of( + _pipeline->getSources().begin(), _pipeline->getSources().end(), [](const auto& stage) { + return stage->constraints().isChangeStreamStage(); + })); +} + +boost::optional<BSONObj> ChangeStreamProxyStage::getNextBson() { + if (auto next = _pipeline->getNext()) { + // While we have more results to return, we track both the timestamp and the resume token of + // the latest event observed in the oplog, the latter via its _id field. + auto nextBSON = (_includeMetaData ? next->toBsonWithMetaData() : next->toBson()); + _latestOplogTimestamp = PipelineD::getLatestOplogTimestamp(_pipeline.get()); + if (next->getField("_id").getType() == BSONType::Object) { + _postBatchResumeToken = next->getField("_id").getDocument().toBson(); + } + return nextBSON; + } + + // We ran out of results to return. Check whether the oplog cursor has moved forward since the + // last recorded timestamp. Because we advance _latestOplogTimestamp for every event we return, + // if the new time is higher than the last then we are guaranteed not to have already returned + // any events at this timestamp. We can set _postBatchResumeToken to a new high-water-mark token + // at the current clusterTime. + auto highWaterMark = PipelineD::getLatestOplogTimestamp(_pipeline.get()); + if (highWaterMark > _latestOplogTimestamp) { + auto token = ResumeToken::makeHighWaterMarkResumeToken(highWaterMark); + _postBatchResumeToken = token.toDocument().toBson(); + _latestOplogTimestamp = highWaterMark; + } + return boost::none; +} + +std::unique_ptr<PlanStageStats> ChangeStreamProxyStage::getStats() { + std::unique_ptr<PlanStageStats> ret = + std::make_unique<PlanStageStats>(CommonStats(kStageType), STAGE_CHANGE_STREAM_PROXY); + ret->specific = std::make_unique<CollectionScanStats>(); + return ret; +} + +} // namespace mongo diff --git a/src/mongo/db/exec/change_stream_proxy.h b/src/mongo/db/exec/change_stream_proxy.h new file mode 100644 index 00000000000..601fe43b582 --- /dev/null +++ b/src/mongo/db/exec/change_stream_proxy.h @@ -0,0 +1,86 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/exec/pipeline_proxy.h" + +namespace mongo { + +/** + * ChangeStreamProxyStage is a drop-in replacement for PipelineProxyStage, intended to manage the + * serialization of change stream pipeline output from Document to BSON. In particular, it is + * additionally responsible for tracking the latestOplogTimestamps and postBatchResumeTokens that + * are necessary for correct merging on mongoS and, in the latter case, must also be provided to + * mongoD clients. + */ +class ChangeStreamProxyStage final : public PipelineProxyStage { +public: + static const char* kStageType; + + /** + * The 'pipeline' argument must be a $changeStream pipeline. Passing a non-$changeStream into + * the constructor will cause an invariant() to fail. + */ + ChangeStreamProxyStage(OperationContext* opCtx, + std::unique_ptr<Pipeline, PipelineDeleter> pipeline, + WorkingSet* ws); + + /** + * Returns an empty PlanStageStats object. + */ + std::unique_ptr<PlanStageStats> getStats() final; + + /** + * Passes through the latest oplog timestamp from the proxied pipeline. We only expose the oplog + * timestamp in the event that we need to merge on mongoS. + */ + Timestamp getLatestOplogTimestamp() const { + return _includeMetaData ? _latestOplogTimestamp : Timestamp(); + } + + /** + * Passes through the most recent resume token from the proxied pipeline. 
+ */ + BSONObj getPostBatchResumeToken() const { + return _postBatchResumeToken; + } + + StageType stageType() const final { + return STAGE_CHANGE_STREAM_PROXY; + } + +protected: + boost::optional<BSONObj> getNextBson() final; + +private: + Timestamp _latestOplogTimestamp; + BSONObj _postBatchResumeToken; +}; +} // namespace mongo diff --git a/src/mongo/db/exec/pipeline_proxy.cpp b/src/mongo/db/exec/pipeline_proxy.cpp index 4267337ab2b..9b0119d1429 100644 --- a/src/mongo/db/exec/pipeline_proxy.cpp +++ b/src/mongo/db/exec/pipeline_proxy.cpp @@ -51,7 +51,13 @@ const char* PipelineProxyStage::kStageType = "PIPELINE_PROXY"; PipelineProxyStage::PipelineProxyStage(OperationContext* opCtx, std::unique_ptr<Pipeline, PipelineDeleter> pipeline, WorkingSet* ws) - : PlanStage(kStageType, opCtx), + : PipelineProxyStage(opCtx, std::move(pipeline), ws, kStageType) {} + +PipelineProxyStage::PipelineProxyStage(OperationContext* opCtx, + std::unique_ptr<Pipeline, PipelineDeleter> pipeline, + WorkingSet* ws, + const char* stageTypeName) + : PlanStage(stageTypeName, opCtx), _pipeline(std::move(pipeline)), _includeMetaData(_pipeline->getContext()->needsMerge), // send metadata to merger _ws(ws) { @@ -128,10 +134,6 @@ boost::optional<BSONObj> PipelineProxyStage::getNextBson() { return boost::none; } -Timestamp PipelineProxyStage::getLatestOplogTimestamp() const { - return PipelineD::getLatestOplogTimestamp(_pipeline.get()); -} - std::string PipelineProxyStage::getPlanSummaryStr() const { return PipelineD::getPlanSummaryStr(_pipeline.get()); } diff --git a/src/mongo/db/exec/pipeline_proxy.h b/src/mongo/db/exec/pipeline_proxy.h index 9520b2139eb..4cd36a88aa7 100644 --- a/src/mongo/db/exec/pipeline_proxy.h +++ b/src/mongo/db/exec/pipeline_proxy.h @@ -45,12 +45,14 @@ namespace mongo { /** * Stage for pulling results out from an aggregation pipeline. */ -class PipelineProxyStage final : public PlanStage { +class PipelineProxyStage : public PlanStage { public: PipelineProxyStage(OperationContext* opCtx, std::unique_ptr<Pipeline, PipelineDeleter> pipeline, WorkingSet* ws); + virtual ~PipelineProxyStage() = default; + PlanStage::StageState doWork(WorkingSetID* out) final; bool isEOF() final; @@ -62,22 +64,17 @@ public: void doReattachToOperationContext() final; // Returns empty PlanStageStats object - std::unique_ptr<PlanStageStats> getStats() final; + std::unique_ptr<PlanStageStats> getStats() override; // Not used. SpecificStats* getSpecificStats() const final { MONGO_UNREACHABLE; } - /** - * Pass through the last oplog timestamp from the proxied pipeline. - */ - Timestamp getLatestOplogTimestamp() const; - std::string getPlanSummaryStr() const; void getPlanSummaryStats(PlanSummaryStats* statsOut) const; - StageType stageType() const final { + StageType stageType() const override { return STAGE_PIPELINE_PROXY; } @@ -90,17 +87,20 @@ public: static const char* kStageType; protected: - void doDispose() final; + PipelineProxyStage(OperationContext* opCtx, + std::unique_ptr<Pipeline, PipelineDeleter> pipeline, + WorkingSet* ws, + const char* stageTypeName); -private: - boost::optional<BSONObj> getNextBson(); + virtual boost::optional<BSONObj> getNextBson(); + void doDispose() final; - // Things in the _stash should be returned before pulling items from _pipeline. + // Items in the _stash should be returned before pulling items from _pipeline. std::unique_ptr<Pipeline, PipelineDeleter> _pipeline; - std::vector<BSONObj> _stash; const bool _includeMetaData; - // Not owned by us. 
+private: + std::vector<BSONObj> _stash; WorkingSet* _ws; }; diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index 00b41e38e71..96c917aacf6 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -1458,9 +1458,9 @@ TEST_F(ChangeStreamStageDBTest, TransformRename) { TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) { OplogEntry dropDB = createCommand(BSON("dropDatabase" << 1), boost::none, false); - // Drop database entry doesn't have a UUID. + // Drop database entry has a nil UUID. Document expectedDropDatabase{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs)}, + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, UUID::nil())}, {DSChangeStream::kOperationTypeField, DSChangeStream::kDropDatabaseOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}}}, @@ -1468,7 +1468,7 @@ TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) { Document expectedInvalidate{ {DSChangeStream::kIdField, makeResumeToken( - kDefaultTs, Value(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)}, + kDefaultTs, UUID::nil(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, }; diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index db600fb9e5b..df345d7eaf2 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -347,6 +347,10 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document if (operationType != DocumentSourceChangeStream::kInvalidateOpType && operationType != DocumentSourceChangeStream::kDropDatabaseOpType) { invariant(!uuid.missing(), "Saw a CRUD op without a UUID"); + } else { + // Fill in a dummy UUID for invalidate and dropDatabase, to ensure that they sort after + // high-water-mark tokens. Their sorting relative to other events remains unchanged. + uuid = Value(UUID::nil()); } // Note that 'documentKey' and/or 'uuid' might be missing, in which case they will not appear diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index cd0783eb5ac..b019bbf8e35 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -118,9 +118,9 @@ void DocumentSourceCursor::loadBatch() { // As long as we're waiting for inserts, we shouldn't do any batching at this level // we need the whole pipeline to see each document to see if we should stop waiting. // Furthermore, if we need to return the latest oplog time (in the tailable and - // needs-merge case), batching will result in a wrong time. - if (awaitDataState(pExpCtx->opCtx).shouldWaitForInserts || - (pExpCtx->isTailableAwaitData() && pExpCtx->needsMerge) || + // awaitData case), batching will result in a wrong time. + if (pExpCtx->isTailableAwaitData() || + awaitDataState(pExpCtx->opCtx).shouldWaitForInserts || memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) { // End this batch and prepare PlanExecutor for yielding. 
_exec->saveState(); diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 6e706c1e1f2..62ca426172f 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -574,7 +574,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep plannerOpts |= QueryPlannerParams::IS_COUNT; } - if (expCtx->needsMerge && expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) { + if (pipeline->peekFront() && pipeline->peekFront()->constraints().isChangeStreamStage()) { + invariant(expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData); plannerOpts |= QueryPlannerParams::TRACK_LATEST_OPLOG_TS; } diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp index c7a40f3388c..70b3a1cf72d 100644 --- a/src/mongo/db/pipeline/resume_token.cpp +++ b/src/mongo/db/pipeline/resume_token.cpp @@ -46,6 +46,16 @@ namespace mongo { constexpr StringData ResumeToken::kDataFieldName; constexpr StringData ResumeToken::kTypeBitsFieldName; +namespace { +// Helper function for makeHighWaterMarkResumeToken and isHighWaterMarkResumeToken. +ResumeTokenData makeHighWaterMarkResumeTokenData(Timestamp clusterTime) { + invariant(!clusterTime.isNull()); + ResumeTokenData tokenData; + tokenData.clusterTime = clusterTime; + return tokenData; +} +} // namespace + bool ResumeTokenData::operator==(const ResumeTokenData& other) const { return clusterTime == other.clusterTime && version == other.version && fromInvalidate == other.fromInvalidate && @@ -194,4 +204,12 @@ ResumeToken ResumeToken::parse(const Document& resumeDoc) { return ResumeToken(resumeDoc); } +ResumeToken ResumeToken::makeHighWaterMarkResumeToken(Timestamp clusterTime) { + return ResumeToken(makeHighWaterMarkResumeTokenData(clusterTime)); +} + +bool ResumeToken::isHighWaterMarkResumeToken(const ResumeTokenData& tokenData) { + return tokenData == makeHighWaterMarkResumeTokenData(tokenData.clusterTime); +} + } // namespace mongo diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h index 16a22311373..a50ac20d8b9 100644 --- a/src/mongo/db/pipeline/resume_token.h +++ b/src/mongo/db/pipeline/resume_token.h @@ -109,6 +109,18 @@ public: static ResumeToken parse(const Document& document); /** + * Generate a high-water-mark pseudo-token for 'clusterTime', with no UUID or documentKey. + */ + static ResumeToken makeHighWaterMarkResumeToken(Timestamp clusterTime); + + /** + * Returns true if the given token data represents a valid high-water-mark resume token; that + * is, it does not refer to a specific operation, but instead specifies a clusterTime after + * which the stream should resume. + */ + static bool isHighWaterMarkResumeToken(const ResumeTokenData& tokenData); + + /** * The default no-argument constructor is required by the IDL for types used as non-optional * fields. 
*/ diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp index 4828002e401..b246d6d2b5b 100644 --- a/src/mongo/db/query/cursor_response.cpp +++ b/src/mongo/db/query/cursor_response.cpp @@ -50,6 +50,7 @@ const char kBatchFieldInitial[] = "firstBatch"; const char kBatchDocSequenceField[] = "cursor.nextBatch"; const char kBatchDocSequenceFieldInitial[] = "cursor.firstBatch"; const char kInternalLatestOplogTimestampField[] = "$_internalLatestOplogTimestamp"; +const char kPostBatchResumeTokenField[] = "postBatchResumeToken"; } // namespace @@ -76,6 +77,9 @@ void CursorResponseBuilder::done(CursorId cursorId, StringData cursorNamespace) } else { _batch.reset(); } + if (!_postBatchResumeToken.isEmpty()) { + _cursorObject->append(kPostBatchResumeTokenField, _postBatchResumeToken); + } _cursorObject->append(kIdField, cursorId); _cursorObject->append(kNsField, cursorNamespace); _cursorObject.reset(); @@ -124,12 +128,14 @@ CursorResponse::CursorResponse(NamespaceString nss, std::vector<BSONObj> batch, boost::optional<long long> numReturnedSoFar, boost::optional<Timestamp> latestOplogTimestamp, + boost::optional<BSONObj> postBatchResumeToken, boost::optional<BSONObj> writeConcernError) : _nss(std::move(nss)), _cursorId(cursorId), _batch(std::move(batch)), _numReturnedSoFar(numReturnedSoFar), _latestOplogTimestamp(latestOplogTimestamp), + _postBatchResumeToken(std::move(postBatchResumeToken)), _writeConcernError(std::move(writeConcernError)) {} std::vector<StatusWith<CursorResponse>> CursorResponse::parseFromBSONMany( @@ -220,6 +226,14 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo doc.shareOwnershipWith(cmdResponse); } + auto postBatchResumeTokenElem = cursorObj[kPostBatchResumeTokenField]; + if (postBatchResumeTokenElem && postBatchResumeTokenElem.type() != BSONType::Object) { + return {ErrorCodes::BadValue, + str::stream() << kPostBatchResumeTokenField + << " format is invalid; expected Object, but found: " + << postBatchResumeTokenElem.type()}; + } + auto latestOplogTimestampElem = cmdResponse[kInternalLatestOplogTimestampField]; if (latestOplogTimestampElem && latestOplogTimestampElem.type() != BSONType::bsonTimestamp) { return { @@ -243,6 +257,8 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo boost::none, latestOplogTimestampElem ? latestOplogTimestampElem.timestamp() : boost::optional<Timestamp>{}, + postBatchResumeTokenElem ? postBatchResumeTokenElem.Obj().getOwned() + : boost::optional<BSONObj>{}, writeConcernError ? 
writeConcernError.Obj().getOwned() : boost::optional<BSONObj>{}}}; } @@ -261,6 +277,11 @@ void CursorResponse::addToBSON(CursorResponse::ResponseType responseType, } batchBuilder.doneFast(); + if (_postBatchResumeToken) { + invariant(!_postBatchResumeToken->isEmpty()); + cursorBuilder.append(kPostBatchResumeTokenField, *_postBatchResumeToken); + } + cursorBuilder.doneFast(); if (_latestOplogTimestamp) { diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h index 7ed4a5f3f73..af38ce46a6d 100644 --- a/src/mongo/db/query/cursor_response.h +++ b/src/mongo/db/query/cursor_response.h @@ -93,6 +93,10 @@ public: _latestOplogTimestamp = ts; } + void setPostBatchResumeToken(BSONObj token) { + _postBatchResumeToken = token.getOwned(); + } + long long numDocs() const { return _numDocs; } @@ -122,6 +126,7 @@ private: bool _active = true; long long _numDocs = 0; Timestamp _latestOplogTimestamp; + BSONObj _postBatchResumeToken; }; /** @@ -198,6 +203,7 @@ public: std::vector<BSONObj> batch, boost::optional<long long> numReturnedSoFar = boost::none, boost::optional<Timestamp> latestOplogTimestamp = boost::none, + boost::optional<BSONObj> postBatchResumeToken = boost::none, boost::optional<BSONObj> writeConcernError = boost::none); CursorResponse(CursorResponse&& other) = default; @@ -240,6 +246,10 @@ public: return _latestOplogTimestamp; } + boost::optional<BSONObj> getPostBatchResumeToken() const { + return _postBatchResumeToken; + } + boost::optional<BSONObj> getWriteConcernError() const { return _writeConcernError; } @@ -259,6 +269,7 @@ private: std::vector<BSONObj> _batch; boost::optional<long long> _numReturnedSoFar; boost::optional<Timestamp> _latestOplogTimestamp; + boost::optional<BSONObj> _postBatchResumeToken; boost::optional<BSONObj> _writeConcernError; }; diff --git a/src/mongo/db/query/cursor_response_test.cpp b/src/mongo/db/query/cursor_response_test.cpp index 2ae05e88a7d..e1adef1fbd7 100644 --- a/src/mongo/db/query/cursor_response_test.cpp +++ b/src/mongo/db/query/cursor_response_test.cpp @@ -34,6 +34,7 @@ #include "mongo/rpc/op_msg_rpc_impls.h" +#include "mongo/db/pipeline/resume_token.h" #include "mongo/unittest/unittest.h" namespace mongo { @@ -334,6 +335,35 @@ TEST(CursorResponseTest, serializeLatestOplogEntry) { ASSERT_EQ(*reparsedResponse.getLastOplogTimestamp(), Timestamp(1, 2)); } +TEST(CursorResponseTest, serializePostBatchResumeToken) { + std::vector<BSONObj> batch = {BSON("_id" << 1), BSON("_id" << 2)}; + auto postBatchResumeToken = + ResumeToken::makeHighWaterMarkResumeToken(Timestamp(1, 2)).toDocument().toBson(); + CursorResponse response(NamespaceString("db.coll"), + CursorId(123), + batch, + boost::none, + boost::none, + postBatchResumeToken); + auto serialized = response.toBSON(CursorResponse::ResponseType::SubsequentResponse); + ASSERT_BSONOBJ_EQ(serialized, + BSON("cursor" << BSON("id" << CursorId(123) << "ns" + << "db.coll" + << "nextBatch" + << BSON_ARRAY(BSON("_id" << 1) << BSON("_id" << 2)) + << "postBatchResumeToken" + << postBatchResumeToken) + << "ok" + << 1)); + auto reparsed = CursorResponse::parseFromBSON(serialized); + ASSERT_OK(reparsed.getStatus()); + CursorResponse reparsedResponse = std::move(reparsed.getValue()); + ASSERT_EQ(reparsedResponse.getCursorId(), CursorId(123)); + ASSERT_EQ(reparsedResponse.getNSS().ns(), "db.coll"); + ASSERT_EQ(reparsedResponse.getBatch().size(), 2U); + ASSERT_BSONOBJ_EQ(*reparsedResponse.getPostBatchResumeToken(), postBatchResumeToken); +} + TEST(CursorResponseTest, 
cursorReturnDocumentSequences) { CursorResponseBuilder::Options options; options.isInitialResponse = true; diff --git a/src/mongo/db/query/explain.cpp b/src/mongo/db/query/explain.cpp index acff30e9a64..ad299239b82 100644 --- a/src/mongo/db/query/explain.cpp +++ b/src/mongo/db/query/explain.cpp @@ -129,7 +129,8 @@ MultiPlanStage* getMultiPlanStage(PlanStage* root) { * there is no PPS that is root. */ PipelineProxyStage* getPipelineProxyStage(PlanStage* root) { - if (root->stageType() == STAGE_PIPELINE_PROXY) { + if (root->stageType() == STAGE_PIPELINE_PROXY || + root->stageType() == STAGE_CHANGE_STREAM_PROXY) { return static_cast<PipelineProxyStage*>(root); } @@ -894,7 +895,8 @@ std::string Explain::getPlanSummary(const PlanExecutor* exec) { // static std::string Explain::getPlanSummary(const PlanStage* root) { - if (root->stageType() == STAGE_PIPELINE_PROXY) { + if (root->stageType() == STAGE_PIPELINE_PROXY || + root->stageType() == STAGE_CHANGE_STREAM_PROXY) { auto pipelineProxy = static_cast<const PipelineProxyStage*>(root); return pipelineProxy->getPlanSummaryStr(); } @@ -928,7 +930,8 @@ void Explain::getSummaryStats(const PlanExecutor& exec, PlanSummaryStats* statsO PlanStage* root = exec.getRootStage(); - if (root->stageType() == STAGE_PIPELINE_PROXY) { + if (root->stageType() == STAGE_PIPELINE_PROXY || + root->stageType() == STAGE_CHANGE_STREAM_PROXY) { auto pipelineProxy = static_cast<PipelineProxyStage*>(root); pipelineProxy->getPlanSummaryStats(statsOut); return; diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h index ea1eaed31f6..6c8d5defe74 100644 --- a/src/mongo/db/query/plan_executor.h +++ b/src/mongo/db/query/plan_executor.h @@ -452,7 +452,13 @@ public: * If the last oplog timestamp is being tracked for this PlanExecutor, return it. * Otherwise return a null timestamp. */ - virtual Timestamp getLatestOplogTimestamp() = 0; + virtual Timestamp getLatestOplogTimestamp() const = 0; + + /** + * If this PlanExecutor is tracking change stream resume tokens, return the most recent token + * for the batch that is currently being built. Otherwise, return an empty object. + */ + virtual BSONObj getPostBatchResumeToken() const = 0; /** * Turns a BSONObj representing an error status produced by getNext() into a Status. 
diff --git a/src/mongo/db/query/plan_executor_impl.cpp b/src/mongo/db/query/plan_executor_impl.cpp index b8b31dbac62..53d00097fd1 100644 --- a/src/mongo/db/query/plan_executor_impl.cpp +++ b/src/mongo/db/query/plan_executor_impl.cpp @@ -41,9 +41,9 @@ #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" #include "mongo/db/exec/cached_plan.h" +#include "mongo/db/exec/change_stream_proxy.h" #include "mongo/db/exec/collection_scan.h" #include "mongo/db/exec/multi_plan.h" -#include "mongo/db/exec/pipeline_proxy.h" #include "mongo/db/exec/plan_stage.h" #include "mongo/db/exec/plan_stats.h" #include "mongo/db/exec/subplan.h" @@ -709,14 +709,20 @@ bool PlanExecutorImpl::isDetached() const { return _currentState == kDetached; } -Timestamp PlanExecutorImpl::getLatestOplogTimestamp() { - if (auto pipelineProxy = getStageByType(_root.get(), STAGE_PIPELINE_PROXY)) - return static_cast<PipelineProxyStage*>(pipelineProxy)->getLatestOplogTimestamp(); +Timestamp PlanExecutorImpl::getLatestOplogTimestamp() const { + if (auto changeStreamProxy = getStageByType(_root.get(), STAGE_CHANGE_STREAM_PROXY)) + return static_cast<ChangeStreamProxyStage*>(changeStreamProxy)->getLatestOplogTimestamp(); if (auto collectionScan = getStageByType(_root.get(), STAGE_COLLSCAN)) return static_cast<CollectionScan*>(collectionScan)->getLatestOplogTimestamp(); return Timestamp(); } +BSONObj PlanExecutorImpl::getPostBatchResumeToken() const { + if (auto changeStreamProxy = getStageByType(_root.get(), STAGE_CHANGE_STREAM_PROXY)) + return static_cast<ChangeStreamProxyStage*>(changeStreamProxy)->getPostBatchResumeToken(); + return {}; +} + Status PlanExecutorImpl::getMemberObjectStatus(const BSONObj& memberObj) const { return WorkingSetCommon::getMemberObjectStatus(memberObj); } diff --git a/src/mongo/db/query/plan_executor_impl.h b/src/mongo/db/query/plan_executor_impl.h index 9b9a0f4f7df..79a13f1086d 100644 --- a/src/mongo/db/query/plan_executor_impl.h +++ b/src/mongo/db/query/plan_executor_impl.h @@ -78,7 +78,8 @@ public: Status getKillStatus() final; bool isDisposed() const final; bool isDetached() const final; - Timestamp getLatestOplogTimestamp() final; + Timestamp getLatestOplogTimestamp() const final; + BSONObj getPostBatchResumeToken() const final; Status getMemberObjectStatus(const BSONObj& memberObj) const final; private: diff --git a/src/mongo/db/query/stage_builder.cpp b/src/mongo/db/query/stage_builder.cpp index 511c9c32c26..31a7ff7a8ab 100644 --- a/src/mongo/db/query/stage_builder.cpp +++ b/src/mongo/db/query/stage_builder.cpp @@ -363,6 +363,7 @@ PlanStage* buildStages(OperationContext* opCtx, return new EnsureSortedStage(opCtx, esn->pattern, ws, childStage); } case STAGE_CACHED_PLAN: + case STAGE_CHANGE_STREAM_PROXY: case STAGE_COUNT: case STAGE_DELETE: case STAGE_EOF: diff --git a/src/mongo/db/query/stage_types.h b/src/mongo/db/query/stage_types.h index d99d212124d..6fbdba5ea36 100644 --- a/src/mongo/db/query/stage_types.h +++ b/src/mongo/db/query/stage_types.h @@ -78,7 +78,8 @@ enum StageType { STAGE_OR, STAGE_PROJECTION, - // Stage for running aggregation pipelines. + // Stages for running aggregation pipelines. 
+ STAGE_CHANGE_STREAM_PROXY, STAGE_PIPELINE_PROXY, STAGE_QUEUED_DATA, diff --git a/src/mongo/util/uuid.cpp b/src/mongo/util/uuid.cpp index 4f243c99401..290134dc88c 100644 --- a/src/mongo/util/uuid.cpp +++ b/src/mongo/util/uuid.cpp @@ -89,6 +89,10 @@ UUID UUID::parse(const BSONObj& obj) { return res.getValue(); } +UUID UUID::nil() { + return UUID{}; +} + bool UUID::isUUIDString(const std::string& s) { return std::regex_match(s, uuidRegex); } diff --git a/src/mongo/util/uuid.h b/src/mongo/util/uuid.h index aeda9b38a31..f71f78cb7c0 100644 --- a/src/mongo/util/uuid.h +++ b/src/mongo/util/uuid.h @@ -102,6 +102,11 @@ public: static StatusWith<UUID> parse(BSONElement from); /** + * Returns the nil UUID; that is, a UUID composed entirely of zeroes. + */ + static UUID nil(); + + /** * Parses a BSON document of the form { uuid: BinData(4, "...") }. * * For IDL. |
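Stepping back from the patch, the hypothetical consumer loop below (again mongo shell JavaScript; the `local_app.stream_checkpoints` collection, the fixed iteration count, and the event handling are illustrative assumptions, not part of this commit) shows the intended benefit of the new field: a client can persist the `postBatchResumeToken` after every batch, including empty ones, and so resume from a recent oplog position rather than from the last event it happened to observe.

```js
// Hypothetical checkpointing consumer: persist the post-batch resume token after
// every batch so the stream can later be resumed from a recent point in the oplog,
// even if the watched collection saw no writes for a long time.
const testDB = connect("mongodb://localhost:27017/test");
const checkpoints = testDB.getSiblingDB("local_app").stream_checkpoints;

let response = assert.commandWorked(testDB.runCommand(
    {aggregate: "coll", pipeline: [{$changeStream: {}}], cursor: {batchSize: 100}}));

for (let i = 0; i < 10; ++i) {  // bounded for illustration; a real consumer loops indefinitely
    const batch = response.cursor.firstBatch || response.cursor.nextBatch;
    batch.forEach(event => printjson(event));  // stand-in for real event handling

    // The PBRT is present even when the batch is empty, and it never compares less
    // than the _id of the last event in the batch, so checkpointing it is always safe.
    checkpoints.updateOne({_id: "coll-stream"},
                          {$set: {resumeToken: response.cursor.postBatchResumeToken}},
                          {upsert: true});

    response = assert.commandWorked(testDB.runCommand(
        {getMore: response.cursor.id, collection: "coll", batchSize: 100}));
}
```

This is essentially the resume-token tracking that drivers perform once the server reports a PBRT: checkpoint the post-batch token rather than only the token of the last event seen.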
