summaryrefslogtreecommitdiff
path: root/jstests/sharding/change_streams_new_shard_new_database.js
blob: 4212e233b486d75651dcec2f746eb0c41db312c1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
/**
 * Tests that existing whole-cluster, whole-db and single-collection $changeStreams correctly pick
 * up events on a newly-added shard when a new unsharded collection is created on it. Exercises the
 * fix for SERVER-42723.
 * @tags: [
 *   requires_sharding,
 *   uses_change_streams,
 * ]
 */
(function() {

"use strict";

const rsNodeOptions = {
    setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}
};
const st =
    new ShardingTest({shards: 1, mongos: 1, rs: {nodes: 1}, other: {rsOptions: rsNodeOptions}});

// We require one 'test' database and a second 'other' database.
const oldShardDB = st.s.getDB(jsTestName() + "_other");
const newShardDB = st.s.getDB(jsTestName());

const configDB = st.s.getDB("config");
const adminDB = st.s.getDB("admin");

const oldShardColl = oldShardDB.coll;
const newShardColl = newShardDB.test;

// Helper function to add a new ReplSetTest shard into the cluster.
function addShardToCluster(shardName) {
    const replTest = new ReplSetTest({name: shardName, nodes: 1, nodeOptions: rsNodeOptions});
    replTest.startSet({shardsvr: ""});
    replTest.initiate();
    assert.commandWorked(st.s.adminCommand({addShard: replTest.getURL(), name: shardName}));
    return replTest;
}

// Helper function to confirm that a stream sees an expected sequence of documents.
function assertAllEventsObserved(changeStream, expectedDocs) {
    for (let expectedDoc of expectedDocs) {
        assert.soon(() => changeStream.hasNext());
        const nextEvent = changeStream.next();
        assert.docEq(expectedDoc, nextEvent.fullDocument, tojson(nextEvent));
    }
}

// Helper function to confirm that a change stream sees a collection drop event.
function assertCollectionDropEventObserved(changeStream, dbName, collectionName) {
    assert.soon(() => changeStream.hasNext());
    const nextEvent = changeStream.next();
    assert.eq(nextEvent.operationType, "drop", tojson(nextEvent));
    assert.docEq({db: dbName, coll: collectionName}, nextEvent.ns, tojson(nextEvent));
}

// Open a whole-db change stream on the as yet non-existent database.
const wholeDBCS = newShardDB.watch();

// Open a single-collection change stream on a namespace within the non-existent database.
const singleCollCS = newShardColl.watch();

// Open a whole-cluster stream on the deployment.
const wholeClusterCS = adminDB.aggregate([{$changeStream: {allChangesForCluster: true}}]);

// Insert some data into the 'other' database on the only existing shard. This should ensure that
// the primary shard of the test database will be created on the second shard, after it is added.
const insertedDocs = Array.from({length: 20}, (_, i) => ({_id: i}));
assert.commandWorked(oldShardColl.insert(insertedDocs));

// Verify that the whole-cluster stream sees all these events.
assertAllEventsObserved(wholeClusterCS, insertedDocs);

// Verify that the other two streams did not see any of the insertions on the 'other' collection.
for (let csCursor of [wholeDBCS, singleCollCS]) {
    assert(!csCursor.hasNext());
}

// Now add a new shard into the cluster...
const newShard1 = addShardToCluster("newShard1");

// .. make sure the primary shard of 'newShardDB' database is the new shard ..
assert.commandWorked(
    newShardDB.runCommand({create: "unusedCollection"}));  // To trigger creation of a database.
const isNewShardDBOnNewShard =
    configDB.databases.findOne({_id: newShardDB.getName(), primary: "newShard1"}) != null;
if (!isNewShardDBOnNewShard) {
    st.ensurePrimaryShard(newShardDB.getName(), "newShard1");

    // Consume collection drop events from the existing change streams caused by movePrimary command
    // issued by a call to 'ensurePrimaryShard()' above.
    for (let csCursor of [wholeDBCS, wholeClusterCS]) {
        assertCollectionDropEventObserved(csCursor, newShardDB.getName(), "unusedCollection");
    }
}

//... create a new collection, and verify that it was placed on the new shard....
assert.commandWorked(newShardDB.runCommand({create: newShardColl.getName()}));
assert(configDB.databases.findOne({_id: newShardDB.getName(), primary: "newShard1"}));

// ... insert some documents into the new, unsharded collection on the new shard...
assert.commandWorked(newShardColl.insert(insertedDocs));

// ... and confirm that all the pre-existing streams see all of these events.
for (let csCursor of [singleCollCS, wholeDBCS, wholeClusterCS]) {
    assertAllEventsObserved(csCursor, insertedDocs);
}

// Stop the new shard manually since the ShardingTest doesn't know anything about it.
st.stop();
newShard1.stopSet();
})();