1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
// Tests that $merge respects the writeConcern set on the original aggregation command.
(function() {
"use strict";
load("jstests/aggregation/extras/merge_helpers.js"); // For withEachMergeMode.
const st = new ShardingTest({shards: 2, rs: {nodes: 3}});
const mongosDB = st.s0.getDB("merge_write_concern");
const source = mongosDB["source"];
const target = mongosDB["target"];
const shard0 = st.rs0;
const shard1 = st.rs1;
// Enable sharding on the test DB and ensure its primary is shard0.
assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
st.ensurePrimaryShard(mongosDB.getName(), st.shard0.shardName);
function testWriteConcernError(rs) {
// Split the target collection at {_id: 10} so that there'll be doc $merge-ed to both shards.
if (FixtureHelpers.isSharded(target)) {
mongosDB.adminCommand({split: target.getFullName(), middle: {_id: 10}});
}
// Make sure that there are only 2 nodes up so w:3 writes will always time out.
const stoppedSecondary = rs.getSecondary();
rs.stop(stoppedSecondary);
// Test that $merge correctly returns a WC error.
withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => {
// When either mode is "fail", a different error rather than WC error is thrown.
if (whenMatchedMode === "fail" || whenNotMatchedMode === "fail") {
return;
}
const res = mongosDB.runCommand({
aggregate: "source",
pipeline: [{
$merge: {
into: "target",
whenMatched: whenMatchedMode,
whenNotMatched: whenNotMatchedMode
}
}],
writeConcern: {w: 3, wtimeout: 100},
cursor: {},
});
jsTestLog("Testing Mode: " + tojson(whenMatchedMode) + tojson(whenNotMatchedMode));
jsTestLog("Target collection after $merge: " + tojson(target.find().toArray()));
let oplogEntries = shard0.getPrimary()
.getDB("local")
.oplog.rs.find({"ns": {$regex: "merge_write_concern.*"}})
.toArray();
jsTestLog("Shard0 oplog entries: " + tojson(oplogEntries));
oplogEntries = shard1.getPrimary()
.getDB("local")
.oplog.rs.find({"ns": {$regex: "merge_write_concern.*"}})
.toArray();
jsTestLog("Shard1 oplog entries: " + tojson(oplogEntries));
// $merge writeConcern errors are handled differently from normal writeConcern
// errors. Rather than returing ok:1 and a WriteConcernError, the entire operation
// fails.
assert.commandFailedWithCode(res, ErrorCodes.WriteConcernFailed);
assert.commandWorked(target.remove({}));
});
// Restart the stopped node and verify that the $merge's now pass.
rs.restart(rs.getSecondary());
rs.awaitReplication();
withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => {
// Skip the combination of merge modes which will fail depending on the contents of the
// source and target collection, as this will cause the assertion below to trip.
if (whenNotMatchedMode == "fail")
return;
const res = mongosDB.runCommand({
aggregate: "source",
pipeline: [{
$merge: {
into: "target",
whenMatched: whenMatchedMode,
whenNotMatched: whenNotMatchedMode
}
}],
writeConcern: {w: 3},
cursor: {},
});
// Ensure that the write concern is satisfied within a reasonable amount of time. This
// prevents the test from hanging if for some reason the write concern can't be
// satisfied.
assert.soon(() => assert.commandWorked(res), "writeConcern was not satisfied");
assert.commandWorked(target.remove({}));
});
}
// Test that when both collections are unsharded, all writes are directed to the primary shard.
assert.commandWorked(source.insert([{_id: -1}, {_id: 0}, {_id: 1}, {_id: 2}]));
testWriteConcernError(shard0);
// Shard the source collection and continue to expect writes to the primary shard.
st.shardColl(source, {_id: 1}, {_id: 0}, {_id: 1}, mongosDB.getName());
testWriteConcernError(shard0);
// Shard the target collection, however make sure that all writes go to the primary shard by
// splitting the collection at {_id: 10} and keeping all values in the same chunk.
st.shardColl(target, {_id: 1}, {_id: 10}, {_id: 10}, mongosDB.getName());
assert.eq(FixtureHelpers.isSharded(target), true);
testWriteConcernError(shard0);
// Write a few documents to the source collection which will be $merge-ed to the second shard.
assert.commandWorked(source.insert([{_id: 11}, {_id: 12}, {_id: 13}]));
// Verify that either shard can produce a WriteConcernError since writes are going to both.
testWriteConcernError(shard0);
testWriteConcernError(shard1);
// Verify that either shard can produce a WriteConcernError when the CatalogCache is empty.
[shard0, shard1].forEach(function(shard) {
shard.nodes.forEach(function(node) {
assert.commandWorked(node.adminCommand({flushRouterConfig: target.getFullName()}));
});
});
testWriteConcernError(shard0);
testWriteConcernError(shard1);
st.stop();
}());
|