1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
/**
* Tests that existing whole-cluster, whole-db and single-collection $changeStreams correctly pick
* up events on a newly-added shard when a new unsharded collection is created on it. Exercises the
* fix for SERVER-42723.
* @tags: [
* requires_sharding,
* uses_change_streams,
* ]
*/
(function() {
"use strict";
const rsNodeOptions = {
setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}
};
const st =
new ShardingTest({shards: 1, mongos: 1, rs: {nodes: 1}, other: {rsOptions: rsNodeOptions}});
// We require one 'test' database and a second 'other' database.
const oldShardDB = st.s.getDB(jsTestName() + "_other");
const newShardDB = st.s.getDB(jsTestName());
const configDB = st.s.getDB("config");
const adminDB = st.s.getDB("admin");
const oldShardColl = oldShardDB.coll;
const newShardColl = newShardDB.test;
// Helper function to add a new ReplSetTest shard into the cluster.
function addShardToCluster(shardName) {
const replTest = new ReplSetTest({name: shardName, nodes: 1, nodeOptions: rsNodeOptions});
replTest.startSet({shardsvr: ""});
replTest.initiate();
assert.commandWorked(st.s.adminCommand({addShard: replTest.getURL(), name: shardName}));
return replTest;
}
// Helper function to confirm that a stream sees an expected sequence of documents.
function assertAllEventsObserved(changeStream, expectedDocs) {
for (let expectedDoc of expectedDocs) {
assert.soon(() => changeStream.hasNext());
const nextEvent = changeStream.next();
assert.docEq(expectedDoc, nextEvent.fullDocument, tojson(nextEvent));
}
}
// Helper function to confirm that a change stream sees a collection drop event.
function assertCollectionDropEventObserved(changeStream, dbName, collectionName) {
assert.soon(() => changeStream.hasNext());
const nextEvent = changeStream.next();
assert.eq(nextEvent.operationType, "drop", tojson(nextEvent));
assert.docEq({db: dbName, coll: collectionName}, nextEvent.ns, tojson(nextEvent));
}
// Open a whole-db change stream on the as yet non-existent database.
const wholeDBCS = newShardDB.watch();
// Open a single-collection change stream on a namespace within the non-existent database.
const singleCollCS = newShardColl.watch();
// Open a whole-cluster stream on the deployment.
const wholeClusterCS = adminDB.aggregate([{$changeStream: {allChangesForCluster: true}}]);
// Insert some data into the 'other' database on the only existing shard. This should ensure that
// the primary shard of the test database will be created on the second shard, after it is added.
const insertedDocs = Array.from({length: 20}, (_, i) => ({_id: i}));
assert.commandWorked(oldShardColl.insert(insertedDocs));
// Verify that the whole-cluster stream sees all these events.
assertAllEventsObserved(wholeClusterCS, insertedDocs);
// Verify that the other two streams did not see any of the insertions on the 'other' collection.
for (let csCursor of [wholeDBCS, singleCollCS]) {
assert(!csCursor.hasNext());
}
// Now add a new shard into the cluster...
const newShard1 = addShardToCluster("newShard1");
// .. make sure the primary shard of 'newShardDB' database is the new shard ..
assert.commandWorked(
newShardDB.runCommand({create: "unusedCollection"})); // To trigger creation of a database.
const isNewShardDBOnNewShard =
configDB.databases.findOne({_id: newShardDB.getName(), primary: "newShard1"}) != null;
if (!isNewShardDBOnNewShard) {
st.ensurePrimaryShard(newShardDB.getName(), "newShard1");
// Consume collection drop events from the existing change streams caused by movePrimary command
// issued by a call to 'ensurePrimaryShard()' above.
for (let csCursor of [wholeDBCS, wholeClusterCS]) {
assertCollectionDropEventObserved(csCursor, newShardDB.getName(), "unusedCollection");
}
}
//... create a new collection, and verify that it was placed on the new shard....
assert.commandWorked(newShardDB.runCommand({create: newShardColl.getName()}));
assert(configDB.databases.findOne({_id: newShardDB.getName(), primary: "newShard1"}));
// ... insert some documents into the new, unsharded collection on the new shard...
assert.commandWorked(newShardColl.insert(insertedDocs));
// ... and confirm that all the pre-existing streams see all of these events.
for (let csCursor of [singleCollCS, wholeDBCS, wholeClusterCS]) {
assertAllEventsObserved(csCursor, insertedDocs);
}
// Stop the new shard manually since the ShardingTest doesn't know anything about it.
st.stop();
newShard1.stopSet();
})();
|