author      Bernard Gorman <bernard.gorman@gmail.com>    2018-11-24 16:56:20 +0000
committer   Bernard Gorman <bernard.gorman@gmail.com>    2018-12-22 04:27:20 +0000
commit      fc5bc0947ceedee3b61b2d922cabd3e5df7ec07c (patch)
tree        75749d0e4ff2d9db2001252018cc91e78801bc44
parent      bdac7ced24f9ad8f9afac9c57e7184b1f2bf61b2 (diff)
download    mongo-fc5bc0947ceedee3b61b2d922cabd3e5df7ec07c.tar.gz
SERVER-38408 Return postBatchResumeToken with each mongoD change stream batch
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml  2
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml  2
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml  2
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml  2
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml  2
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml  2
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml  2
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml  2
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml  2
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml  2
-rw-r--r--  jstests/change_streams/report_post_batch_resume_token.js  183
-rw-r--r--  src/mongo/base/uuid_test.cpp  6
-rw-r--r--  src/mongo/db/SConscript  1
-rw-r--r--  src/mongo/db/commands/getmore_cmd.cpp  7
-rw-r--r--  src/mongo/db/commands/run_aggregate.cpp  14
-rw-r--r--  src/mongo/db/exec/change_stream_proxy.cpp  84
-rw-r--r--  src/mongo/db/exec/change_stream_proxy.h  86
-rw-r--r--  src/mongo/db/exec/pipeline_proxy.cpp  12
-rw-r--r--  src/mongo/db/exec/pipeline_proxy.h  28
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_test.cpp  6
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_transform.cpp  4
-rw-r--r--  src/mongo/db/pipeline/document_source_cursor.cpp  6
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.cpp  3
-rw-r--r--  src/mongo/db/pipeline/resume_token.cpp  18
-rw-r--r--  src/mongo/db/pipeline/resume_token.h  12
-rw-r--r--  src/mongo/db/query/cursor_response.cpp  21
-rw-r--r--  src/mongo/db/query/cursor_response.h  11
-rw-r--r--  src/mongo/db/query/cursor_response_test.cpp  30
-rw-r--r--  src/mongo/db/query/explain.cpp  9
-rw-r--r--  src/mongo/db/query/plan_executor.h  8
-rw-r--r--  src/mongo/db/query/plan_executor_impl.cpp  14
-rw-r--r--  src/mongo/db/query/plan_executor_impl.h  3
-rw-r--r--  src/mongo/db/query/stage_builder.cpp  1
-rw-r--r--  src/mongo/db/query/stage_types.h  3
-rw-r--r--  src/mongo/util/uuid.cpp  4
-rw-r--r--  src/mongo/util/uuid.h  5
36 files changed, 559 insertions, 40 deletions
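
For illustration only (not part of this patch), the sketch below shows roughly how the new field surfaces to a client of a replica-set mongoD; the database and collection names are placeholders, and mongoS does not return the field until SERVER-38411:

    // Open a change stream with a small batchSize; even an empty firstBatch
    // carries a postBatchResumeToken inside the cursor object.
    const res = assert.commandWorked(db.runCommand(
        {aggregate: "coll", pipeline: [{$changeStream: {}}], cursor: {batchSize: 2}}));
    printjson(res.cursor.postBatchResumeToken);

    // Each subsequent getMore reply carries the field as well.
    const more = assert.commandWorked(
        db.runCommand({getMore: res.cursor.id, collection: "coll", batchSize: 2}));
    printjson(more.cursor.postBatchResumeToken);
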
diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml
index 91cab5a482b..359282efd22 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml
@@ -7,6 +7,8 @@ selector:
# This test exercises an internal detail of mongos<->mongod communication and is not expected
# to work against a mongos.
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
+ # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken.
+ - jstests/change_streams/report_post_batch_resume_token.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
index 1ccbe9f6220..ca48b021541 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
@@ -7,6 +7,8 @@ selector:
# This test exercises an internal detail of mongos<->mongod communication and is not expected to
# work against a mongos.
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
+ # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken.
+ - jstests/change_streams/report_post_batch_resume_token.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml b/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml
index 47e194262a7..01f0416f846 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml
@@ -9,6 +9,8 @@ selector:
- jstests/change_streams/only_wake_getmore_for_relevant_changes.js
# This test is not expected to work when run against a mongos.
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
+ # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken.
+ - jstests/change_streams/report_post_batch_resume_token.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml
index 97e2b4bc922..4b3e9386942 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml
@@ -6,6 +6,8 @@ selector:
exclude_files:
# Exercises an internal detail of mongos<->mongod communication. Not expected to work on mongos.
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
+ # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken.
+ - jstests/change_streams/report_post_batch_resume_token.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml
index cf562eb83b8..85fbc704142 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml
@@ -7,6 +7,8 @@ selector:
# This test exercises an internal detail of mongos<->mongod communication and is not expected
# to work against a mongos.
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
+ # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken.
+ - jstests/change_streams/report_post_batch_resume_token.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml
index 1afe717c90b..cb82ce86c90 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml
@@ -9,6 +9,8 @@ selector:
- jstests/change_streams/only_wake_getmore_for_relevant_changes.js
# This test is not expected to work when run against a mongos.
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
+ # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken.
+ - jstests/change_streams/report_post_batch_resume_token.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml
index 654a7fb085d..9cea611e671 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml
@@ -6,6 +6,8 @@ selector:
exclude_files:
# Exercises an internal detail of mongos<->mongod communication. Not expected to work on mongos.
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
+ # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken.
+ - jstests/change_streams/report_post_batch_resume_token.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml
index f0672291ab7..abc46696da7 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml
@@ -13,6 +13,8 @@ selector:
- jstests/change_streams/include_cluster_time.js
# Only relevant for single-collection change streams.
- jstests/change_streams/metadata_notifications.js
+ # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken.
+ - jstests/change_streams/report_post_batch_resume_token.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml
index a8f0b58f6c3..3127b4d50d3 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml
@@ -11,6 +11,8 @@ selector:
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
# Only relevant for single-collection change streams.
- jstests/change_streams/metadata_notifications.js
+ # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken.
+ - jstests/change_streams/report_post_batch_resume_token.js
exclude_with_any_tags:
##
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml
index 215229fc549..406255523c3 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml
@@ -8,6 +8,8 @@ selector:
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
# Only relevant for single-collection change streams.
- jstests/change_streams/metadata_notifications.js
+ # TODO SERVER-38411: un-blacklist when mongoS can return postBatchResumeToken.
+ - jstests/change_streams/report_post_batch_resume_token.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/jstests/change_streams/report_post_batch_resume_token.js b/jstests/change_streams/report_post_batch_resume_token.js
new file mode 100644
index 00000000000..2094f12246c
--- /dev/null
+++ b/jstests/change_streams/report_post_batch_resume_token.js
@@ -0,0 +1,183 @@
+/**
+ * Tests that an aggregate with a $changeStream stage reports the latest postBatchResumeToken.
+ * @tags: [uses_transactions]
+ */
+(function() {
+ "use strict";
+
+ load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+
+ // Drop and recreate collections to ensure a clean run.
+ const collName = "report_post_batch_resume_token";
+ const testCollection = assertDropAndRecreateCollection(db, collName);
+ const otherCollection = assertDropAndRecreateCollection(db, "unrelated_" + collName);
+ const adminDB = db.getSiblingDB("admin");
+
+ // Helper function to return the next batch given an initial aggregate command response.
+ function runNextGetMore(initialCursorResponse) {
+ const getMoreCollName = initialCursorResponse.cursor.ns.substr(
+ initialCursorResponse.cursor.ns.indexOf('.') + 1);
+ return assert.commandWorked(testCollection.runCommand({
+ getMore: initialCursorResponse.cursor.id,
+ collection: getMoreCollName,
+ batchSize: batchSize
+ }));
+ }
+
+ let docId = 0; // Tracks _id of documents inserted to ensure that we do not duplicate.
+ const batchSize = 2;
+
+ // Test that postBatchResumeToken is present on empty initial aggregate batch.
+ let initialAggResponse = assert.commandWorked(testCollection.runCommand(
+ {aggregate: collName, pipeline: [{$changeStream: {}}], cursor: {batchSize: batchSize}}));
+
+ // Examine the response from the initial agg. It should have a postBatchResumeToken (PBRT),
+ // despite the fact that the batch is empty.
+ let initialAggPBRT = initialAggResponse.cursor.postBatchResumeToken;
+ assert.neq(undefined, initialAggPBRT, tojson(initialAggResponse));
+ assert.eq(0, initialAggResponse.cursor.firstBatch.length);
+
+ // Test that postBatchResumeToken is present on empty getMore batch.
+ let getMoreResponse = runNextGetMore(initialAggResponse);
+ let getMorePBRT = getMoreResponse.cursor.postBatchResumeToken;
+ assert.neq(undefined, getMorePBRT, tojson(getMoreResponse));
+ assert.gte(bsonWoCompare(getMorePBRT, initialAggPBRT), 0);
+ assert.eq(0, getMoreResponse.cursor.nextBatch.length);
+
+ // Test that postBatchResumeToken advances with returned events. Insert one document into the
+ // collection and consume the resulting change stream event.
+ assert.commandWorked(testCollection.insert({_id: docId++}));
+ getMoreResponse = runNextGetMore(initialAggResponse);
+ assert.eq(1, getMoreResponse.cursor.nextBatch.length);
+
+ // Because the retrieved event is the most recent entry in the oplog, the PBRT should be equal
+ // to the resume token of the last item in the batch and greater than the initial PBRT.
+ let resumeTokenFromDoc = getMoreResponse.cursor.nextBatch[0]._id;
+ getMorePBRT = getMoreResponse.cursor.postBatchResumeToken;
+ assert.docEq(getMorePBRT, resumeTokenFromDoc);
+ assert.gt(bsonWoCompare(getMorePBRT, initialAggPBRT), 0);
+
+ // Now seed the collection with enough documents to fit in two batches.
+ for (let i = 0; i < batchSize * 2; i++) {
+ assert.commandWorked(testCollection.insert({_id: docId++}));
+ }
+
+ // Test that postBatchResumeToken is present on non-empty initial aggregate batch.
+ initialAggResponse = assert.commandWorked(testCollection.runCommand({
+ aggregate: collName,
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenFromDoc}}],
+ cursor: {batchSize: batchSize}
+ }));
+ // We see a postBatchResumeToken on the initial aggregate command. Because we resumed after the
+ // previous getMorePBRT, the postBatchResumeToken from this stream compares greater than it.
+ initialAggPBRT = initialAggResponse.cursor.postBatchResumeToken;
+ assert.neq(undefined, initialAggPBRT, tojson(initialAggResponse));
+ assert.eq(batchSize, initialAggResponse.cursor.firstBatch.length);
+ assert.gt(bsonWoCompare(initialAggPBRT, getMorePBRT), 0);
+
+ // Test that postBatchResumeToken advances with getMore. Iterate the cursor and assert that the
+ // observed postBatchResumeToken advanced.
+ getMoreResponse = runNextGetMore(initialAggResponse);
+ assert.eq(batchSize, getMoreResponse.cursor.nextBatch.length);
+
+ // The postBatchResumeToken is again equal to the final token in the batch, and greater than the
+ // PBRT from the initial response.
+ resumeTokenFromDoc = getMoreResponse.cursor.nextBatch[batchSize - 1]._id;
+ getMorePBRT = getMoreResponse.cursor.postBatchResumeToken;
+ assert.docEq(resumeTokenFromDoc, getMorePBRT, tojson(getMoreResponse));
+ assert.gt(bsonWoCompare(getMorePBRT, initialAggPBRT), 0);
+
+ // Test that postBatchResumeToken advances with writes to an unrelated collection. First make
+ // sure there is nothing left in our cursor, and obtain the latest PBRT...
+ getMoreResponse = runNextGetMore(initialAggResponse);
+ let previousGetMorePBRT = getMoreResponse.cursor.postBatchResumeToken;
+ assert.neq(undefined, previousGetMorePBRT, tojson(getMoreResponse));
+ assert.eq(getMoreResponse.cursor.nextBatch, []);
+
+ // ... then test that it advances on an insert to an unrelated collection.
+ assert.commandWorked(otherCollection.insert({}));
+ getMoreResponse = runNextGetMore(initialAggResponse);
+ assert.eq(0, getMoreResponse.cursor.nextBatch.length);
+ getMorePBRT = getMoreResponse.cursor.postBatchResumeToken;
+ assert.gt(bsonWoCompare(getMorePBRT, previousGetMorePBRT), 0);
+
+ // Insert two documents into the collection, each of the maximum BSON object size.
+ const bsonUserSizeLimit = assert.commandWorked(adminDB.isMaster()).maxBsonObjectSize;
+ assert.gt(bsonUserSizeLimit, 0);
+ for (let i = 0; i < 2; ++i) {
+ const docToInsert = {_id: docId++, padding: ""};
+ docToInsert.padding = "a".repeat(bsonUserSizeLimit - Object.bsonsize(docToInsert));
+ assert.commandWorked(testCollection.insert(docToInsert));
+ }
+
+ // Test that we return the correct postBatchResumeToken in the event that the batch hits the
+ // byte size limit. Despite the fact that the batchSize is 2, we should only see 1 result,
+ // because the second result cannot fit in the batch.
+ getMoreResponse = runNextGetMore(initialAggResponse);
+ assert.eq(1, getMoreResponse.cursor.nextBatch.length);
+
+ // Verify that the postBatchResumeToken matches the last event actually added to the batch.
+ resumeTokenFromDoc = getMoreResponse.cursor.nextBatch[0]._id;
+ getMorePBRT = getMoreResponse.cursor.postBatchResumeToken;
+ assert.docEq(getMorePBRT, resumeTokenFromDoc);
+
+ // Now retrieve the second event and confirm that the PBRT matches its resume token.
+ previousGetMorePBRT = getMorePBRT;
+ getMoreResponse = runNextGetMore(initialAggResponse);
+ resumeTokenFromDoc = getMoreResponse.cursor.nextBatch[0]._id;
+ getMorePBRT = getMoreResponse.cursor.postBatchResumeToken;
+ assert.eq(1, getMoreResponse.cursor.nextBatch.length);
+ assert.gt(bsonWoCompare(getMorePBRT, previousGetMorePBRT), 0);
+ assert.docEq(getMorePBRT, resumeTokenFromDoc);
+
+ // Test that the PBRT is correctly updated when reading events from within a transaction.
+ const session = db.getMongo().startSession();
+ const sessionDB = session.getDatabase(db.getName());
+
+ const sessionColl = sessionDB[testCollection.getName()];
+ const sessionOtherColl = sessionDB[otherCollection.getName()];
+ session.startTransaction();
+
+ // Write 3 documents to the test collection and 1 to the unrelated collection.
+ for (let i = 0; i < 3; ++i) {
+ assert.commandWorked(sessionColl.insert({_id: docId++}));
+ }
+ assert.commandWorked(sessionOtherColl.insert({_id: docId++}));
+ assert.commandWorked(session.commitTransaction_forTesting());
+ session.endSession();
+
+ // Grab the next 2 events, which should be the first 2 events in the transaction.
+ previousGetMorePBRT = getMorePBRT;
+ getMoreResponse = runNextGetMore(initialAggResponse);
+ assert.eq(2, getMoreResponse.cursor.nextBatch.length);
+
+ // The clusterTime should be the same on each, but the resume token keeps advancing.
+ const txnEvent1 = getMoreResponse.cursor.nextBatch[0],
+ txnEvent2 = getMoreResponse.cursor.nextBatch[1];
+ const txnClusterTime = txnEvent1.clusterTime;
+ assert.eq(txnEvent2.clusterTime, txnClusterTime);
+ assert.gt(bsonWoCompare(txnEvent1._id, previousGetMorePBRT), 0);
+ assert.gt(bsonWoCompare(txnEvent2._id, txnEvent1._id), 0);
+
+ // The PBRT of the first transaction batch is equal to the last document's resumeToken.
+ getMorePBRT = getMoreResponse.cursor.postBatchResumeToken;
+ assert.docEq(getMorePBRT, txnEvent2._id);
+
+ // Now get the next batch. This contains the third and final transaction operation.
+ previousGetMorePBRT = getMorePBRT;
+ getMoreResponse = runNextGetMore(initialAggResponse);
+ assert.eq(1, getMoreResponse.cursor.nextBatch.length);
+
+ // The clusterTime of this event is the same as the two events from the previous batch, but its
+ // resume token is greater than the previous PBRT.
+ const txnEvent3 = getMoreResponse.cursor.nextBatch[0];
+ assert.eq(txnEvent3.clusterTime, txnClusterTime);
+ assert.gt(bsonWoCompare(txnEvent3._id, previousGetMorePBRT), 0);
+
+ // Because we wrote to the unrelated collection, the final event in the transaction does not
+ // appear in the batch. But in this case it also does not allow our PBRT to advance beyond the
+ // last event in the batch, because the unrelated event is within the same transaction and
+ // therefore has the same clusterTime.
+ getMorePBRT = getMoreResponse.cursor.postBatchResumeToken;
+ assert.docEq(getMorePBRT, txnEvent3._id);
+})();
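
As a usage note, the invariants exercised above suggest a simple client-side pattern: remember the postBatchResumeToken from each reply and expect it never to compare lower than any event in the batch or than the previous token. A minimal sketch of that pattern, with placeholder names and a mongo shell assumed, follows:

    const res = assert.commandWorked(db.runCommand(
        {aggregate: "coll", pipeline: [{$changeStream: {}}], cursor: {batchSize: 2}}));
    let reply = res;
    let lastPBRT = res.cursor.postBatchResumeToken;
    do {
        reply = assert.commandWorked(
            db.runCommand({getMore: reply.cursor.id, collection: "coll", batchSize: 2}));
        const pbrt = reply.cursor.postBatchResumeToken;
        // The PBRT compares >= the resume token of every event in the batch...
        reply.cursor.nextBatch.forEach(ev => assert.gte(bsonWoCompare(pbrt, ev._id), 0));
        // ...and never moves backwards between batches.
        assert.gte(bsonWoCompare(pbrt, lastPBRT), 0);
        lastPBRT = pbrt;
    } while (reply.cursor.nextBatch.length > 0);
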
diff --git a/src/mongo/base/uuid_test.cpp b/src/mongo/base/uuid_test.cpp
index daf43f34d5a..242c7102900 100644
--- a/src/mongo/base/uuid_test.cpp
+++ b/src/mongo/base/uuid_test.cpp
@@ -190,5 +190,11 @@ TEST(UUIDTest, toBSONUsingBSONMacro) {
ASSERT_BSONOBJ_EQ(expectedBson, bson);
}
+TEST(UUIDTest, NilUUID) {
+ // Test that UUID::nil() returns an all-zero UUID.
+ auto nilUUID = UUID::parse("00000000-0000-0000-0000-000000000000");
+ ASSERT_EQUALS(UUID::nil(), unittest::assertGet(nilUUID));
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 7402004d524..8dd72d0c5ff 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1080,6 +1080,7 @@ env.Library(
'exec/and_hash.cpp',
'exec/and_sorted.cpp',
'exec/cached_plan.cpp',
+ 'exec/change_stream_proxy.cpp',
'exec/collection_scan.cpp',
'exec/count.cpp',
'exec/count_scan.cpp',
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index e137a982ee7..ea669c4a209 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -233,8 +233,10 @@ public:
// As soon as we get a result, this operation no longer waits.
awaitDataState(opCtx).shouldWaitForInserts = false;
- // Add result to output buffer.
+ // TODO SERVER-38539: We need to set both the latestOplogTimestamp and the
+ // postBatchResumeToken until the former is removed in a future release.
nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp());
+ nextBatch->setPostBatchResumeToken(exec->getPostBatchResumeToken());
nextBatch->append(obj);
(*numResults)++;
}
@@ -259,7 +261,10 @@ public:
case PlanExecutor::IS_EOF:
// This causes the reported latest oplog timestamp to advance even when there
// are no results for this particular query.
+ // TODO SERVER-38539: We need to set both the latestOplogTimestamp and the
+ // postBatchResumeToken until the former is removed in a future release.
nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp());
+ nextBatch->setPostBatchResumeToken(exec->getPostBatchResumeToken());
default:
return Status::OK();
}
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 5febd7f14fc..6562dcc6fbc 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -41,7 +41,7 @@
#include "mongo/db/catalog/database.h"
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
-#include "mongo/db/exec/pipeline_proxy.h"
+#include "mongo/db/exec/change_stream_proxy.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/accumulator.h"
@@ -155,8 +155,12 @@ bool handleCursorCommand(OperationContext* opCtx,
}
if (state == PlanExecutor::IS_EOF) {
+ // TODO SERVER-38539: We need to set both the latestOplogTimestamp and the
+ // postBatchResumeToken until the former is removed in a future release.
responseBuilder.setLatestOplogTimestamp(
cursor->getExecutor()->getLatestOplogTimestamp());
+ responseBuilder.setPostBatchResumeToken(
+ cursor->getExecutor()->getPostBatchResumeToken());
if (!cursor->isTailable()) {
// make it an obvious error to use cursor or executor after this point
cursor = nullptr;
@@ -176,7 +180,10 @@ bool handleCursorCommand(OperationContext* opCtx,
break;
}
+ // TODO SERVER-38539: We need to set both the latestOplogTimestamp and the
+ // postBatchResumeToken until the former is removed in a future release.
responseBuilder.setLatestOplogTimestamp(cursor->getExecutor()->getLatestOplogTimestamp());
+ responseBuilder.setPostBatchResumeToken(cursor->getExecutor()->getPostBatchResumeToken());
responseBuilder.append(next);
}
@@ -578,8 +585,9 @@ Status runAggregate(OperationContext* opCtx,
for (size_t idx = 0; idx < pipelines.size(); ++idx) {
// Transfer ownership of the Pipeline to the PipelineProxyStage.
auto ws = make_unique<WorkingSet>();
- auto proxy =
- make_unique<PipelineProxyStage>(opCtx, std::move(pipelines[idx]), ws.get());
+ auto proxy = liteParsedPipeline.hasChangeStream()
+ ? make_unique<ChangeStreamProxyStage>(opCtx, std::move(pipelines[idx]), ws.get())
+ : make_unique<PipelineProxyStage>(opCtx, std::move(pipelines[idx]), ws.get());
// This PlanExecutor will simply forward requests to the Pipeline, so does not need to
// yield or to be registered with any collection's CursorManager to receive
diff --git a/src/mongo/db/exec/change_stream_proxy.cpp b/src/mongo/db/exec/change_stream_proxy.cpp
new file mode 100644
index 00000000000..5af30b4b5a3
--- /dev/null
+++ b/src/mongo/db/exec/change_stream_proxy.cpp
@@ -0,0 +1,84 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/exec/change_stream_proxy.h"
+
+#include "mongo/db/pipeline/pipeline_d.h"
+#include "mongo/db/pipeline/resume_token.h"
+
+namespace mongo {
+
+const char* ChangeStreamProxyStage::kStageType = "CHANGE_STREAM_PROXY";
+
+ChangeStreamProxyStage::ChangeStreamProxyStage(OperationContext* opCtx,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ WorkingSet* ws)
+ : PipelineProxyStage(opCtx, std::move(pipeline), ws, kStageType) {
+ invariant(std::any_of(
+ _pipeline->getSources().begin(), _pipeline->getSources().end(), [](const auto& stage) {
+ return stage->constraints().isChangeStreamStage();
+ }));
+}
+
+boost::optional<BSONObj> ChangeStreamProxyStage::getNextBson() {
+ if (auto next = _pipeline->getNext()) {
+ // While we have more results to return, we track both the timestamp and the resume token of
+ // the latest event observed in the oplog, the latter via its _id field.
+ auto nextBSON = (_includeMetaData ? next->toBsonWithMetaData() : next->toBson());
+ _latestOplogTimestamp = PipelineD::getLatestOplogTimestamp(_pipeline.get());
+ if (next->getField("_id").getType() == BSONType::Object) {
+ _postBatchResumeToken = next->getField("_id").getDocument().toBson();
+ }
+ return nextBSON;
+ }
+
+ // We ran out of results to return. Check whether the oplog cursor has moved forward since the
+ // last recorded timestamp. Because we advance _latestOplogTimestamp for every event we return,
+ // if the new time is higher than the last then we are guaranteed not to have already returned
+ // any events at this timestamp. We can set _postBatchResumeToken to a new high-water-mark token
+ // at the current clusterTime.
+ auto highWaterMark = PipelineD::getLatestOplogTimestamp(_pipeline.get());
+ if (highWaterMark > _latestOplogTimestamp) {
+ auto token = ResumeToken::makeHighWaterMarkResumeToken(highWaterMark);
+ _postBatchResumeToken = token.toDocument().toBson();
+ _latestOplogTimestamp = highWaterMark;
+ }
+ return boost::none;
+}
+
+std::unique_ptr<PlanStageStats> ChangeStreamProxyStage::getStats() {
+ std::unique_ptr<PlanStageStats> ret =
+ std::make_unique<PlanStageStats>(CommonStats(kStageType), STAGE_CHANGE_STREAM_PROXY);
+ ret->specific = std::make_unique<CollectionScanStats>();
+ return ret;
+}
+
+} // namespace mongo
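
The two branches in getNextBson above mean that the PBRT either equals the _id of the last event returned, or, once the pipeline is exhausted, becomes a high-water-mark token at the latest oplog timestamp. A hedged shell sketch of the second case, mirroring the unrelated-collection step in the new jstest (collection names are placeholders):

    // Open a stream on "coll", then write only to an unrelated collection.
    const res = assert.commandWorked(db.runCommand(
        {aggregate: "coll", pipeline: [{$changeStream: {}}], cursor: {batchSize: 2}}));
    const before = res.cursor.postBatchResumeToken;
    assert.commandWorked(db.unrelated_coll.insert({}));

    // The next batch is empty, but the high-water-mark PBRT has still advanced.
    const more = assert.commandWorked(
        db.runCommand({getMore: res.cursor.id, collection: "coll", batchSize: 2}));
    assert.eq(0, more.cursor.nextBatch.length);
    assert.gt(bsonWoCompare(more.cursor.postBatchResumeToken, before), 0);
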
diff --git a/src/mongo/db/exec/change_stream_proxy.h b/src/mongo/db/exec/change_stream_proxy.h
new file mode 100644
index 00000000000..601fe43b582
--- /dev/null
+++ b/src/mongo/db/exec/change_stream_proxy.h
@@ -0,0 +1,86 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/exec/pipeline_proxy.h"
+
+namespace mongo {
+
+/**
+ * ChangeStreamProxyStage is a drop-in replacement for PipelineProxyStage, intended to manage the
+ * serialization of change stream pipeline output from Document to BSON. It is additionally
+ * responsible for tracking the latest oplog timestamp and postBatchResumeToken that are needed
+ * for correct merging on mongoS; the postBatchResumeToken must also be returned to clients
+ * reading the stream directly from a mongoD.
+ */
+class ChangeStreamProxyStage final : public PipelineProxyStage {
+public:
+ static const char* kStageType;
+
+ /**
+ * The 'pipeline' argument must be a $changeStream pipeline. Passing a non-$changeStream pipeline into
+ * the constructor will cause an invariant() to fail.
+ */
+ ChangeStreamProxyStage(OperationContext* opCtx,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ WorkingSet* ws);
+
+ /**
+ * Returns an empty PlanStageStats object.
+ */
+ std::unique_ptr<PlanStageStats> getStats() final;
+
+ /**
+ * Passes through the latest oplog timestamp from the proxied pipeline. We only expose the oplog
+ * timestamp in the event that we need to merge on mongoS.
+ */
+ Timestamp getLatestOplogTimestamp() const {
+ return _includeMetaData ? _latestOplogTimestamp : Timestamp();
+ }
+
+ /**
+ * Passes through the most recent resume token from the proxied pipeline.
+ */
+ BSONObj getPostBatchResumeToken() const {
+ return _postBatchResumeToken;
+ }
+
+ StageType stageType() const final {
+ return STAGE_CHANGE_STREAM_PROXY;
+ }
+
+protected:
+ boost::optional<BSONObj> getNextBson() final;
+
+private:
+ Timestamp _latestOplogTimestamp;
+ BSONObj _postBatchResumeToken;
+};
+} // namespace mongo
diff --git a/src/mongo/db/exec/pipeline_proxy.cpp b/src/mongo/db/exec/pipeline_proxy.cpp
index 4267337ab2b..9b0119d1429 100644
--- a/src/mongo/db/exec/pipeline_proxy.cpp
+++ b/src/mongo/db/exec/pipeline_proxy.cpp
@@ -51,7 +51,13 @@ const char* PipelineProxyStage::kStageType = "PIPELINE_PROXY";
PipelineProxyStage::PipelineProxyStage(OperationContext* opCtx,
std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
WorkingSet* ws)
- : PlanStage(kStageType, opCtx),
+ : PipelineProxyStage(opCtx, std::move(pipeline), ws, kStageType) {}
+
+PipelineProxyStage::PipelineProxyStage(OperationContext* opCtx,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ WorkingSet* ws,
+ const char* stageTypeName)
+ : PlanStage(stageTypeName, opCtx),
_pipeline(std::move(pipeline)),
_includeMetaData(_pipeline->getContext()->needsMerge), // send metadata to merger
_ws(ws) {
@@ -128,10 +134,6 @@ boost::optional<BSONObj> PipelineProxyStage::getNextBson() {
return boost::none;
}
-Timestamp PipelineProxyStage::getLatestOplogTimestamp() const {
- return PipelineD::getLatestOplogTimestamp(_pipeline.get());
-}
-
std::string PipelineProxyStage::getPlanSummaryStr() const {
return PipelineD::getPlanSummaryStr(_pipeline.get());
}
diff --git a/src/mongo/db/exec/pipeline_proxy.h b/src/mongo/db/exec/pipeline_proxy.h
index 9520b2139eb..4cd36a88aa7 100644
--- a/src/mongo/db/exec/pipeline_proxy.h
+++ b/src/mongo/db/exec/pipeline_proxy.h
@@ -45,12 +45,14 @@ namespace mongo {
/**
* Stage for pulling results out from an aggregation pipeline.
*/
-class PipelineProxyStage final : public PlanStage {
+class PipelineProxyStage : public PlanStage {
public:
PipelineProxyStage(OperationContext* opCtx,
std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
WorkingSet* ws);
+ virtual ~PipelineProxyStage() = default;
+
PlanStage::StageState doWork(WorkingSetID* out) final;
bool isEOF() final;
@@ -62,22 +64,17 @@ public:
void doReattachToOperationContext() final;
// Returns empty PlanStageStats object
- std::unique_ptr<PlanStageStats> getStats() final;
+ std::unique_ptr<PlanStageStats> getStats() override;
// Not used.
SpecificStats* getSpecificStats() const final {
MONGO_UNREACHABLE;
}
- /**
- * Pass through the last oplog timestamp from the proxied pipeline.
- */
- Timestamp getLatestOplogTimestamp() const;
-
std::string getPlanSummaryStr() const;
void getPlanSummaryStats(PlanSummaryStats* statsOut) const;
- StageType stageType() const final {
+ StageType stageType() const override {
return STAGE_PIPELINE_PROXY;
}
@@ -90,17 +87,20 @@ public:
static const char* kStageType;
protected:
- void doDispose() final;
+ PipelineProxyStage(OperationContext* opCtx,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ WorkingSet* ws,
+ const char* stageTypeName);
-private:
- boost::optional<BSONObj> getNextBson();
+ virtual boost::optional<BSONObj> getNextBson();
+ void doDispose() final;
- // Things in the _stash should be returned before pulling items from _pipeline.
+ // Items in the _stash should be returned before pulling items from _pipeline.
std::unique_ptr<Pipeline, PipelineDeleter> _pipeline;
- std::vector<BSONObj> _stash;
const bool _includeMetaData;
- // Not owned by us.
+private:
+ std::vector<BSONObj> _stash;
WorkingSet* _ws;
};
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 00b41e38e71..96c917aacf6 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -1458,9 +1458,9 @@ TEST_F(ChangeStreamStageDBTest, TransformRename) {
TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) {
OplogEntry dropDB = createCommand(BSON("dropDatabase" << 1), boost::none, false);
- // Drop database entry doesn't have a UUID.
+ // Drop database entry has a nil UUID.
Document expectedDropDatabase{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs)},
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, UUID::nil())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDropDatabaseOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}}},
@@ -1468,7 +1468,7 @@ TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) {
Document expectedInvalidate{
{DSChangeStream::kIdField,
makeResumeToken(
- kDefaultTs, Value(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
+ kDefaultTs, UUID::nil(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
};
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index db600fb9e5b..df345d7eaf2 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -347,6 +347,10 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
if (operationType != DocumentSourceChangeStream::kInvalidateOpType &&
operationType != DocumentSourceChangeStream::kDropDatabaseOpType) {
invariant(!uuid.missing(), "Saw a CRUD op without a UUID");
+ } else {
+ // Fill in a dummy UUID for invalidate and dropDatabase, to ensure that they sort after
+ // high-water-mark tokens. Their sorting relative to other events remains unchanged.
+ uuid = Value(UUID::nil());
}
// Note that 'documentKey' and/or 'uuid' might be missing, in which case they will not appear
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index cd0783eb5ac..b019bbf8e35 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -118,9 +118,9 @@ void DocumentSourceCursor::loadBatch() {
// As long as we're waiting for inserts, we shouldn't do any batching at this level;
// we need the whole pipeline to see each document to see if we should stop waiting.
// Furthermore, if we need to return the latest oplog time (in the tailable and
- // needs-merge case), batching will result in a wrong time.
- if (awaitDataState(pExpCtx->opCtx).shouldWaitForInserts ||
- (pExpCtx->isTailableAwaitData() && pExpCtx->needsMerge) ||
+ // awaitData case), batching will result in a wrong time.
+ if (pExpCtx->isTailableAwaitData() ||
+ awaitDataState(pExpCtx->opCtx).shouldWaitForInserts ||
memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) {
// End this batch and prepare PlanExecutor for yielding.
_exec->saveState();
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 6e706c1e1f2..62ca426172f 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -574,7 +574,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
plannerOpts |= QueryPlannerParams::IS_COUNT;
}
- if (expCtx->needsMerge && expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) {
+ if (pipeline->peekFront() && pipeline->peekFront()->constraints().isChangeStreamStage()) {
+ invariant(expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData);
plannerOpts |= QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
}
diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp
index c7a40f3388c..70b3a1cf72d 100644
--- a/src/mongo/db/pipeline/resume_token.cpp
+++ b/src/mongo/db/pipeline/resume_token.cpp
@@ -46,6 +46,16 @@ namespace mongo {
constexpr StringData ResumeToken::kDataFieldName;
constexpr StringData ResumeToken::kTypeBitsFieldName;
+namespace {
+// Helper function for makeHighWaterMarkResumeToken and isHighWaterMarkResumeToken.
+ResumeTokenData makeHighWaterMarkResumeTokenData(Timestamp clusterTime) {
+ invariant(!clusterTime.isNull());
+ ResumeTokenData tokenData;
+ tokenData.clusterTime = clusterTime;
+ return tokenData;
+}
+} // namespace
+
bool ResumeTokenData::operator==(const ResumeTokenData& other) const {
return clusterTime == other.clusterTime && version == other.version &&
fromInvalidate == other.fromInvalidate &&
@@ -194,4 +204,12 @@ ResumeToken ResumeToken::parse(const Document& resumeDoc) {
return ResumeToken(resumeDoc);
}
+ResumeToken ResumeToken::makeHighWaterMarkResumeToken(Timestamp clusterTime) {
+ return ResumeToken(makeHighWaterMarkResumeTokenData(clusterTime));
+}
+
+bool ResumeToken::isHighWaterMarkResumeToken(const ResumeTokenData& tokenData) {
+ return tokenData == makeHighWaterMarkResumeTokenData(tokenData.clusterTime);
+}
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h
index 16a22311373..a50ac20d8b9 100644
--- a/src/mongo/db/pipeline/resume_token.h
+++ b/src/mongo/db/pipeline/resume_token.h
@@ -109,6 +109,18 @@ public:
static ResumeToken parse(const Document& document);
/**
+ * Generate a high-water-mark pseudo-token for 'clusterTime', with no UUID or documentKey.
+ */
+ static ResumeToken makeHighWaterMarkResumeToken(Timestamp clusterTime);
+
+ /**
+ * Returns true if the given token data represents a valid high-water-mark resume token; that
+ * is, it does not refer to a specific operation, but instead specifies a clusterTime after
+ * which the stream should resume.
+ */
+ static bool isHighWaterMarkResumeToken(const ResumeTokenData& tokenData);
+
+ /**
* The default no-argument constructor is required by the IDL for types used as non-optional
* fields.
*/
diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp
index 4828002e401..b246d6d2b5b 100644
--- a/src/mongo/db/query/cursor_response.cpp
+++ b/src/mongo/db/query/cursor_response.cpp
@@ -50,6 +50,7 @@ const char kBatchFieldInitial[] = "firstBatch";
const char kBatchDocSequenceField[] = "cursor.nextBatch";
const char kBatchDocSequenceFieldInitial[] = "cursor.firstBatch";
const char kInternalLatestOplogTimestampField[] = "$_internalLatestOplogTimestamp";
+const char kPostBatchResumeTokenField[] = "postBatchResumeToken";
} // namespace
@@ -76,6 +77,9 @@ void CursorResponseBuilder::done(CursorId cursorId, StringData cursorNamespace)
} else {
_batch.reset();
}
+ if (!_postBatchResumeToken.isEmpty()) {
+ _cursorObject->append(kPostBatchResumeTokenField, _postBatchResumeToken);
+ }
_cursorObject->append(kIdField, cursorId);
_cursorObject->append(kNsField, cursorNamespace);
_cursorObject.reset();
@@ -124,12 +128,14 @@ CursorResponse::CursorResponse(NamespaceString nss,
std::vector<BSONObj> batch,
boost::optional<long long> numReturnedSoFar,
boost::optional<Timestamp> latestOplogTimestamp,
+ boost::optional<BSONObj> postBatchResumeToken,
boost::optional<BSONObj> writeConcernError)
: _nss(std::move(nss)),
_cursorId(cursorId),
_batch(std::move(batch)),
_numReturnedSoFar(numReturnedSoFar),
_latestOplogTimestamp(latestOplogTimestamp),
+ _postBatchResumeToken(std::move(postBatchResumeToken)),
_writeConcernError(std::move(writeConcernError)) {}
std::vector<StatusWith<CursorResponse>> CursorResponse::parseFromBSONMany(
@@ -220,6 +226,14 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo
doc.shareOwnershipWith(cmdResponse);
}
+ auto postBatchResumeTokenElem = cursorObj[kPostBatchResumeTokenField];
+ if (postBatchResumeTokenElem && postBatchResumeTokenElem.type() != BSONType::Object) {
+ return {ErrorCodes::BadValue,
+ str::stream() << kPostBatchResumeTokenField
+ << " format is invalid; expected Object, but found: "
+ << postBatchResumeTokenElem.type()};
+ }
+
auto latestOplogTimestampElem = cmdResponse[kInternalLatestOplogTimestampField];
if (latestOplogTimestampElem && latestOplogTimestampElem.type() != BSONType::bsonTimestamp) {
return {
@@ -243,6 +257,8 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo
boost::none,
latestOplogTimestampElem ? latestOplogTimestampElem.timestamp()
: boost::optional<Timestamp>{},
+ postBatchResumeTokenElem ? postBatchResumeTokenElem.Obj().getOwned()
+ : boost::optional<BSONObj>{},
writeConcernError ? writeConcernError.Obj().getOwned() : boost::optional<BSONObj>{}}};
}
@@ -261,6 +277,11 @@ void CursorResponse::addToBSON(CursorResponse::ResponseType responseType,
}
batchBuilder.doneFast();
+ if (_postBatchResumeToken) {
+ invariant(!_postBatchResumeToken->isEmpty());
+ cursorBuilder.append(kPostBatchResumeTokenField, *_postBatchResumeToken);
+ }
+
cursorBuilder.doneFast();
if (_latestOplogTimestamp) {
diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h
index 7ed4a5f3f73..af38ce46a6d 100644
--- a/src/mongo/db/query/cursor_response.h
+++ b/src/mongo/db/query/cursor_response.h
@@ -93,6 +93,10 @@ public:
_latestOplogTimestamp = ts;
}
+ void setPostBatchResumeToken(BSONObj token) {
+ _postBatchResumeToken = token.getOwned();
+ }
+
long long numDocs() const {
return _numDocs;
}
@@ -122,6 +126,7 @@ private:
bool _active = true;
long long _numDocs = 0;
Timestamp _latestOplogTimestamp;
+ BSONObj _postBatchResumeToken;
};
/**
@@ -198,6 +203,7 @@ public:
std::vector<BSONObj> batch,
boost::optional<long long> numReturnedSoFar = boost::none,
boost::optional<Timestamp> latestOplogTimestamp = boost::none,
+ boost::optional<BSONObj> postBatchResumeToken = boost::none,
boost::optional<BSONObj> writeConcernError = boost::none);
CursorResponse(CursorResponse&& other) = default;
@@ -240,6 +246,10 @@ public:
return _latestOplogTimestamp;
}
+ boost::optional<BSONObj> getPostBatchResumeToken() const {
+ return _postBatchResumeToken;
+ }
+
boost::optional<BSONObj> getWriteConcernError() const {
return _writeConcernError;
}
@@ -259,6 +269,7 @@ private:
std::vector<BSONObj> _batch;
boost::optional<long long> _numReturnedSoFar;
boost::optional<Timestamp> _latestOplogTimestamp;
+ boost::optional<BSONObj> _postBatchResumeToken;
boost::optional<BSONObj> _writeConcernError;
};
diff --git a/src/mongo/db/query/cursor_response_test.cpp b/src/mongo/db/query/cursor_response_test.cpp
index 2ae05e88a7d..e1adef1fbd7 100644
--- a/src/mongo/db/query/cursor_response_test.cpp
+++ b/src/mongo/db/query/cursor_response_test.cpp
@@ -34,6 +34,7 @@
#include "mongo/rpc/op_msg_rpc_impls.h"
+#include "mongo/db/pipeline/resume_token.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -334,6 +335,35 @@ TEST(CursorResponseTest, serializeLatestOplogEntry) {
ASSERT_EQ(*reparsedResponse.getLastOplogTimestamp(), Timestamp(1, 2));
}
+TEST(CursorResponseTest, serializePostBatchResumeToken) {
+ std::vector<BSONObj> batch = {BSON("_id" << 1), BSON("_id" << 2)};
+ auto postBatchResumeToken =
+ ResumeToken::makeHighWaterMarkResumeToken(Timestamp(1, 2)).toDocument().toBson();
+ CursorResponse response(NamespaceString("db.coll"),
+ CursorId(123),
+ batch,
+ boost::none,
+ boost::none,
+ postBatchResumeToken);
+ auto serialized = response.toBSON(CursorResponse::ResponseType::SubsequentResponse);
+ ASSERT_BSONOBJ_EQ(serialized,
+ BSON("cursor" << BSON("id" << CursorId(123) << "ns"
+ << "db.coll"
+ << "nextBatch"
+ << BSON_ARRAY(BSON("_id" << 1) << BSON("_id" << 2))
+ << "postBatchResumeToken"
+ << postBatchResumeToken)
+ << "ok"
+ << 1));
+ auto reparsed = CursorResponse::parseFromBSON(serialized);
+ ASSERT_OK(reparsed.getStatus());
+ CursorResponse reparsedResponse = std::move(reparsed.getValue());
+ ASSERT_EQ(reparsedResponse.getCursorId(), CursorId(123));
+ ASSERT_EQ(reparsedResponse.getNSS().ns(), "db.coll");
+ ASSERT_EQ(reparsedResponse.getBatch().size(), 2U);
+ ASSERT_BSONOBJ_EQ(*reparsedResponse.getPostBatchResumeToken(), postBatchResumeToken);
+}
+
TEST(CursorResponseTest, cursorReturnDocumentSequences) {
CursorResponseBuilder::Options options;
options.isInitialResponse = true;
diff --git a/src/mongo/db/query/explain.cpp b/src/mongo/db/query/explain.cpp
index acff30e9a64..ad299239b82 100644
--- a/src/mongo/db/query/explain.cpp
+++ b/src/mongo/db/query/explain.cpp
@@ -129,7 +129,8 @@ MultiPlanStage* getMultiPlanStage(PlanStage* root) {
* there is no PPS that is root.
*/
PipelineProxyStage* getPipelineProxyStage(PlanStage* root) {
- if (root->stageType() == STAGE_PIPELINE_PROXY) {
+ if (root->stageType() == STAGE_PIPELINE_PROXY ||
+ root->stageType() == STAGE_CHANGE_STREAM_PROXY) {
return static_cast<PipelineProxyStage*>(root);
}
@@ -894,7 +895,8 @@ std::string Explain::getPlanSummary(const PlanExecutor* exec) {
// static
std::string Explain::getPlanSummary(const PlanStage* root) {
- if (root->stageType() == STAGE_PIPELINE_PROXY) {
+ if (root->stageType() == STAGE_PIPELINE_PROXY ||
+ root->stageType() == STAGE_CHANGE_STREAM_PROXY) {
auto pipelineProxy = static_cast<const PipelineProxyStage*>(root);
return pipelineProxy->getPlanSummaryStr();
}
@@ -928,7 +930,8 @@ void Explain::getSummaryStats(const PlanExecutor& exec, PlanSummaryStats* statsO
PlanStage* root = exec.getRootStage();
- if (root->stageType() == STAGE_PIPELINE_PROXY) {
+ if (root->stageType() == STAGE_PIPELINE_PROXY ||
+ root->stageType() == STAGE_CHANGE_STREAM_PROXY) {
auto pipelineProxy = static_cast<PipelineProxyStage*>(root);
pipelineProxy->getPlanSummaryStats(statsOut);
return;
diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h
index ea1eaed31f6..6c8d5defe74 100644
--- a/src/mongo/db/query/plan_executor.h
+++ b/src/mongo/db/query/plan_executor.h
@@ -452,7 +452,13 @@ public:
* If the last oplog timestamp is being tracked for this PlanExecutor, return it.
* Otherwise return a null timestamp.
*/
- virtual Timestamp getLatestOplogTimestamp() = 0;
+ virtual Timestamp getLatestOplogTimestamp() const = 0;
+
+ /**
+ * If this PlanExecutor is tracking change stream resume tokens, return the most recent token
+ * for the batch that is currently being built. Otherwise, return an empty object.
+ */
+ virtual BSONObj getPostBatchResumeToken() const = 0;
/**
* Turns a BSONObj representing an error status produced by getNext() into a Status.
diff --git a/src/mongo/db/query/plan_executor_impl.cpp b/src/mongo/db/query/plan_executor_impl.cpp
index b8b31dbac62..53d00097fd1 100644
--- a/src/mongo/db/query/plan_executor_impl.cpp
+++ b/src/mongo/db/query/plan_executor_impl.cpp
@@ -41,9 +41,9 @@
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
#include "mongo/db/exec/cached_plan.h"
+#include "mongo/db/exec/change_stream_proxy.h"
#include "mongo/db/exec/collection_scan.h"
#include "mongo/db/exec/multi_plan.h"
-#include "mongo/db/exec/pipeline_proxy.h"
#include "mongo/db/exec/plan_stage.h"
#include "mongo/db/exec/plan_stats.h"
#include "mongo/db/exec/subplan.h"
@@ -709,14 +709,20 @@ bool PlanExecutorImpl::isDetached() const {
return _currentState == kDetached;
}
-Timestamp PlanExecutorImpl::getLatestOplogTimestamp() {
- if (auto pipelineProxy = getStageByType(_root.get(), STAGE_PIPELINE_PROXY))
- return static_cast<PipelineProxyStage*>(pipelineProxy)->getLatestOplogTimestamp();
+Timestamp PlanExecutorImpl::getLatestOplogTimestamp() const {
+ if (auto changeStreamProxy = getStageByType(_root.get(), STAGE_CHANGE_STREAM_PROXY))
+ return static_cast<ChangeStreamProxyStage*>(changeStreamProxy)->getLatestOplogTimestamp();
if (auto collectionScan = getStageByType(_root.get(), STAGE_COLLSCAN))
return static_cast<CollectionScan*>(collectionScan)->getLatestOplogTimestamp();
return Timestamp();
}
+BSONObj PlanExecutorImpl::getPostBatchResumeToken() const {
+ if (auto changeStreamProxy = getStageByType(_root.get(), STAGE_CHANGE_STREAM_PROXY))
+ return static_cast<ChangeStreamProxyStage*>(changeStreamProxy)->getPostBatchResumeToken();
+ return {};
+}
+
Status PlanExecutorImpl::getMemberObjectStatus(const BSONObj& memberObj) const {
return WorkingSetCommon::getMemberObjectStatus(memberObj);
}
diff --git a/src/mongo/db/query/plan_executor_impl.h b/src/mongo/db/query/plan_executor_impl.h
index 9b9a0f4f7df..79a13f1086d 100644
--- a/src/mongo/db/query/plan_executor_impl.h
+++ b/src/mongo/db/query/plan_executor_impl.h
@@ -78,7 +78,8 @@ public:
Status getKillStatus() final;
bool isDisposed() const final;
bool isDetached() const final;
- Timestamp getLatestOplogTimestamp() final;
+ Timestamp getLatestOplogTimestamp() const final;
+ BSONObj getPostBatchResumeToken() const final;
Status getMemberObjectStatus(const BSONObj& memberObj) const final;
private:
diff --git a/src/mongo/db/query/stage_builder.cpp b/src/mongo/db/query/stage_builder.cpp
index 511c9c32c26..31a7ff7a8ab 100644
--- a/src/mongo/db/query/stage_builder.cpp
+++ b/src/mongo/db/query/stage_builder.cpp
@@ -363,6 +363,7 @@ PlanStage* buildStages(OperationContext* opCtx,
return new EnsureSortedStage(opCtx, esn->pattern, ws, childStage);
}
case STAGE_CACHED_PLAN:
+ case STAGE_CHANGE_STREAM_PROXY:
case STAGE_COUNT:
case STAGE_DELETE:
case STAGE_EOF:
diff --git a/src/mongo/db/query/stage_types.h b/src/mongo/db/query/stage_types.h
index d99d212124d..6fbdba5ea36 100644
--- a/src/mongo/db/query/stage_types.h
+++ b/src/mongo/db/query/stage_types.h
@@ -78,7 +78,8 @@ enum StageType {
STAGE_OR,
STAGE_PROJECTION,
- // Stage for running aggregation pipelines.
+ // Stages for running aggregation pipelines.
+ STAGE_CHANGE_STREAM_PROXY,
STAGE_PIPELINE_PROXY,
STAGE_QUEUED_DATA,
diff --git a/src/mongo/util/uuid.cpp b/src/mongo/util/uuid.cpp
index 4f243c99401..290134dc88c 100644
--- a/src/mongo/util/uuid.cpp
+++ b/src/mongo/util/uuid.cpp
@@ -89,6 +89,10 @@ UUID UUID::parse(const BSONObj& obj) {
return res.getValue();
}
+UUID UUID::nil() {
+ return UUID{};
+}
+
bool UUID::isUUIDString(const std::string& s) {
return std::regex_match(s, uuidRegex);
}
diff --git a/src/mongo/util/uuid.h b/src/mongo/util/uuid.h
index aeda9b38a31..f71f78cb7c0 100644
--- a/src/mongo/util/uuid.h
+++ b/src/mongo/util/uuid.h
@@ -102,6 +102,11 @@ public:
static StatusWith<UUID> parse(BSONElement from);
/**
+ * Returns the nil UUID; that is, a UUID composed entirely of zeroes.
+ */
+ static UUID nil();
+
+ /**
* Parses a BSON document of the form { uuid: BinData(4, "...") }.
*
* For IDL.