summaryrefslogtreecommitdiff
path: root/jstests/concurrency/fsm_workloads/multi_statement_transaction_atomicity_isolation.js
blob: 4c5e8e30b8fef885e3fee1d2da24183144046261 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
'use strict';

/**
 * multi_statement_transaction_atomicity_isolation.js
 *
 * Inserts a handful of documents into a collection. Each thread then updates multiple documents
 * inside of a multi-statement transaction. The resulting documents are of the form
 *
 *   {_id: 0, order: [{tid: 0, iteration: 0, numUpdated: 3},
 *                    {tid: 1, iteration: 0, numUpdated: 2},
 *                    ...]}
 *
 *   {_id: 1, order: [{tid: 0, iteration: 0, numUpdated: 3},
 *                    {tid: 1, iteration: 0, numUpdated: 2},
 *                    ...]}
 *
 *   {_id: 2, order: [{tid: 0, iteration: 0, numUpdated: 3}, ...]}
 *
 * where the {tid: 0, iteration: 0, numUpdated: 3} element should occur 3 times and should always
 * come before the {tid: 1, iteration: 0, numUpdated: 2} element's 2 occurrences. In other words, we
 * track
 *
 *   (1) the relative order in which each of the transactions commit based on their position within
 *       the array, and
 *
 *   (2) the expected number of occurrences for each element in the array.
 *
 * An anomaly is detected if either
 *
 *   (a) transaction A's (tid, txnNumber, numToUpdate) element precedes transaction B's
 *       (tid, txnNumber, numToUpdate) element in one document and follows it in another. This would
 *       suggest that the database failed to detect a write-write conflict despite both transactions
 *       modifying the same document and is therefore not providing snapshot isolation.
 *
 *   (b) transaction C's (tid, txnNumber, numToUpdate) element doesn't appear numToUpdate times
 *       across a consistent snapshot of all of the documents. This would suggest that the database
 *       failed to atomically update all documents modified in a concurrent transaction.
 *
 *   (c) transaction D's (tid, txnNumber) for a given (docId, dbName, collName) doesn't match the
 *       (tid, txnNumber) for the thread with threadId == tid. This indicates that there are writes
 *       that exist in the database that were not committed.
 *
 * @tags: [uses_transactions, assumes_snapshot_transactions]
 */

// for Graph
load('jstests/libs/cycle_detection.js');

// For withTxnAndAutoRetry, isKilledSessionCode.
load('jstests/concurrency/fsm_workload_helpers/auto_retry_transaction.js');

// For arrayEq.
load("jstests/aggregation/extras/utils.js");

var $config = (function() {
    function checkTransactionCommitOrder(documents) {
        const graph = new Graph();

        for (let doc of documents) {
            const commitOrder = doc.order;
            for (let i = 1; i < commitOrder.length; ++i) {
                // We add an edge from commitOrder[i - 1] to commitOrder[i] because it being earlier
                // in the array that was $push'd to indicates that the commitOrder[i - 1]
                // transaction happened before the commit[i] transaction order.
                graph.addEdge(commitOrder[i - 1], commitOrder[i]);
            }
        }

        const result = graph.findCycle();

        if (result.length > 0) {
            const isOpPartOfCycle = (op) =>
                result.some(cyclicOp => bsonBinaryEqual({_: op}, {_: cyclicOp}));

            const filteredDocuments = documents.map(doc => {
                const filteredCommitOrder = doc.order.filter(isOpPartOfCycle);
                return Object.assign({}, doc, {order: filteredCommitOrder});
            });

            assertWhenOwnColl.eq([], result, tojson(filteredDocuments));
        }
    }

    function checkNumUpdatedByEachTransaction(documents) {
        const updateCounts = new Map();

        for (let doc of documents) {
            for (let op of doc.order) {
                // We store 'op' both as the key and as part of the value because the mongo shell's
                // Map type mangles the key and doesn't provide an API to retrieve the original
                // key-value pairs.
                const value = updateCounts.get(op) || {op, actual: 0};
                updateCounts.put(op, {op, actual: value.actual + 1});
            }
        }

        for (let {op, actual} of updateCounts.values()) {
            assert.eq(op.numUpdated, actual, () => {
                return 'transaction ' + tojson(op) + ' should have updated ' + op.numUpdated +
                    ' documents, but ' + actual + ' were updated: ' + tojson(updateCounts.values());
            });
        }
    }

    // Check that only writes that were committed are reflected in the database. Writes that were
    // committed are reflected in $config.data.updatedDocsClientHistory.
    function checkWritesOfCommittedTxns(data, documents) {
        // updatedDocsServerHistory is a dictionary of {txnNum: [list of {docId, dbName, collName}]}
        // that were updated by this thread (this.tid) and exist in the database.
        let updatedDocsServerHistory = [];
        for (let doc of documents) {
            for (let i = 0; i < doc.order.length; i++) {
                // Pull out only those docIds and txnNums that were updated by this thread.
                if (doc.order[i].tid === data.tid) {
                    const txnNum = doc.metadata[i].txnNum.valueOf();
                    updatedDocsServerHistory[txnNum] = updatedDocsServerHistory[txnNum] || [];
                    updatedDocsServerHistory[txnNum].push({
                        _id: doc._id,
                        dbName: doc.metadata[i].dbName,
                        collName: doc.metadata[i].collName
                    });
                }
            }
        }

        // Assert that any transaction written down in $config.data also exists in the database
        // and that the docIds associated with this txnNum in $config.data match those docIds
        // associated with this txnNum in the database.
        const highestTxnNum =
            Math.max(updatedDocsServerHistory.length, data.updatedDocsClientHistory.length);
        for (let txnNum = 0; txnNum < highestTxnNum; ++txnNum) {
            assertAlways((arrayEq(updatedDocsServerHistory[txnNum] || [],
                                  data.updatedDocsClientHistory[txnNum] || [])),
                         () => 'expected ' + tojson(data.updatedDocsClientHistory[txnNum]) +
                             ' but instead have ' + tojson(updatedDocsServerHistory[txnNum]) +
                             ' for txnNumber ' + txnNum);
        }
    }

    // Overridable functions for subclasses to do more complex transactions.
    const getAllCollections = (db, collName) => [db.getCollection(collName)];

    function getAllDocuments(numDocs, {useTxn}) {
        let allDocuments = [];

        if (useTxn) {
            withTxnAndAutoRetry(this.session, () => {
                allDocuments = [];
                for (let collection of this.collections) {
                    const txnDbName = collection.getDB().getName();
                    const txnCollName = collection.getName();
                    const txnCollection =
                        this.session.getDatabase(txnDbName).getCollection(txnCollName);

                    // We intentionally use a smaller batch size when fetching all of the
                    // documents in the collection in order to stress the behavior of reading
                    // from the same snapshot over the course of multiple network roundtrips.
                    const batchSize = Math.max(2, Math.floor(numDocs / 5));
                    allDocuments.push(...txnCollection.find().batchSize(batchSize).toArray());
                }
            }, {
                retryOnKilledSession: this.retryOnKilledSession,
                prepareProbability: this.prepareProbability
            });
        } else {
            for (let collection of this.collections) {
                allDocuments.push(...collection.find().toArray());
            }
        }

        assertWhenOwnColl.eq(allDocuments.length, numDocs * this.collections.length, () => {
            if (this.session) {
                return "txnNumber: " + tojson(this.session.getTxnNumber_forTesting()) +
                    ", session id: " + tojson(this.session.getSessionId()) +
                    ", all documents: " + tojson(allDocuments);
            }
            return "all documents: " + tojson(allDocuments);
        });

        return allDocuments;
    }

    function getDocIdsToUpdate(numDocs) {
        // Generate between [2, numDocs / 2] operations.
        const numOps = 2 + Random.randInt(Math.ceil(numDocs / 2) - 1);
        // Select 'numOps' document (without replacement) to update.
        let docIds = Array.from({length: numDocs}, (value, index) => index);
        return Array.shuffle(docIds).slice(0, numOps);
    }

    const states = (function() {
        return {
            init: function init(db, collName) {
                this.iteration = 0;
                // Set causalConsistency = true to ensure that in the checkConsistency state
                // function, we will be able to read our own writes that were committed as a
                // part of the update state function.
                this.session = db.getMongo().startSession({causalConsistency: true});
                this.collections = this.getAllCollections(db, collName);
                this.totalIteration = 1;
            },

            update: function update(db, collName) {
                const docIds = this.getDocIdsToUpdate(this.numDocs);
                let committedTxnInfo;
                let txnNumber;

                withTxnAndAutoRetry(this.session, () => {
                    committedTxnInfo = [];
                    for (let [i, docId] of docIds.entries()) {
                        const collection =
                            this.collections[Random.randInt(this.collections.length)];
                        const txnDbName = collection.getDB().getName();
                        const txnCollName = collection.getName();
                        txnNumber = this.session.getTxnNumber_forTesting();

                        // We apply the following update to each of the 'docIds' documents
                        // to record the number of times we expect to see the transaction
                        // being run in this execution of the update() state function by this
                        // worker thread present across all documents. Using the $push
                        // operator causes a transaction which commits after another
                        // transaction to appear later in the array.
                        const updateMods = {
                            $push: {
                                order: {
                                    tid: this.tid,
                                    iteration: this.iteration,
                                    numUpdated: docIds.length,
                                },
                                metadata: {
                                    dbName: txnDbName,
                                    collName: txnCollName,
                                    txnNum: txnNumber,
                                }
                            }
                        };

                        const txnCollection =
                            this.session.getDatabase(txnDbName).getCollection(txnCollName);
                        const coll = txnCollection.find().toArray();
                        const res = txnCollection.runCommand('update', {
                            updates: [{q: {_id: docId}, u: updateMods}],
                        });
                        assertAlways.commandWorked(
                            res,
                            () => "Failed to update. result: " + tojson(res) +
                                ", collection: " + tojson(coll));
                        assertWhenOwnColl.eq(res.n, 1, () => tojson(res));
                        assertWhenOwnColl.eq(res.nModified, 1, () => tojson(res));
                        committedTxnInfo.push(
                            {_id: docId, dbName: txnDbName, collName: txnCollName});
                    }
                }, {
                    retryOnKilledSession: this.retryOnKilledSession,
                    prepareProbability: this.prepareProbability
                });

                this.updatedDocsClientHistory[txnNumber.valueOf()] = committedTxnInfo;
                ++this.iteration;
            },

            checkConsistency: function checkConsistency(db, collName) {
                const documents = this.getAllDocuments(this.numDocs, {useTxn: true});
                checkTransactionCommitOrder(documents);
                checkNumUpdatedByEachTransaction(documents);
                checkWritesOfCommittedTxns(this, documents);
            },

            causalRead: function causalRead(db, collName) {
                const collection = this.collections[Random.randInt(this.collections.length)];
                const randomDbName = collection.getDB().getName();
                const randomCollName = collection.getName();

                // We do a non-transactional read on a causally consistent session so that it uses
                // 'afterClusterTime' internally and is subject to prepare conflicts (in a sharded
                // cluster). This is meant to expose deadlocks involving prepare conflicts outside
                // of transactions.
                const sessionCollection =
                    this.session.getDatabase(randomDbName).getCollection(randomCollName);

                try {
                    const cursor = sessionCollection.find();
                    assertAlways.eq(cursor.itcount(), this.numDocs);
                } catch (e) {
                    if (this.retryOnKilledSession && isKilledSessionCode(e.code)) {
                        // If the session is expected to be killed, ignore it.
                        return;
                    }
                    throw e;
                }
            }
        };
    })();

    /**
     * This function wraps the state functions and causes each thread to run checkConsistency()
     * before teardown.
     */
    function checkConsistencyOnLastIteration(data, func, checkConsistencyFunc) {
        let lastIteration = ++data.totalIteration >= data.iterations;
        func();
        if (lastIteration) {
            checkConsistencyFunc();
        }
    }

    // Wrap each state in a checkConsistencyOnLastIteration() invocation.
    for (let stateName of Object.keys(states)) {
        const stateFn = states[stateName];
        const checkConsistencyFn = states['checkConsistency'];
        states[stateName] = function(db, collName) {
            checkConsistencyOnLastIteration(this,
                                            () => stateFn.apply(this, arguments),
                                            () => checkConsistencyFn.apply(this, arguments));
        };
    }

    const transitions = {
        init: {update: 0.9, checkConsistency: 0.1},
        update: {update: 0.8, checkConsistency: 0.1, causalRead: 0.1},
        checkConsistency: {update: 1},
        causalRead: {update: 1},
    };

    function setup(db, collName, cluster) {
        // The default WC is majority and this workload may not be able to satisfy majority writes.
        if (cluster.isSharded()) {
            cluster.executeOnMongosNodes(function(db) {
                assert.commandWorked(db.adminCommand({
                    setDefaultRWConcern: 1,
                    defaultWriteConcern: {w: 1},
                    writeConcern: {w: "majority"}
                }));
            });
        } else if (cluster.isReplication()) {
            assert.commandWorked(db.adminCommand({
                setDefaultRWConcern: 1,
                defaultWriteConcern: {w: 1},
                writeConcern: {w: "majority"}
            }));
        }

        this.collections = this.getAllCollections(db, collName);

        for (let collection of this.collections) {
            const bulk = collection.initializeUnorderedBulkOp();

            for (let i = 0; i < this.numDocs; ++i) {
                bulk.insert({_id: i, order: []});
            }

            const res = bulk.execute({w: 'majority'});
            assertWhenOwnColl.commandWorked(res);
            assertWhenOwnColl.eq(this.numDocs, res.nInserted);
        }

        if (cluster.isSharded()) {
            // Advance each router's cluster time to be >= the time of the writes, so the first
            // global snapshots chosen by each is guaranteed to include the inserted documents.
            cluster.synchronizeMongosClusterTimes();
        }
    }

    function teardown(db, collName, cluster) {
        const documents = this.getAllDocuments(this.numDocs, {useTxn: false});
        checkTransactionCommitOrder(documents);
        checkNumUpdatedByEachTransaction(documents);

        // Unsetting CWWC is not allowed, so explicitly restore the default write concern to be
        // majority by setting CWWC to {w: majority}.
        if (cluster.isSharded()) {
            cluster.executeOnMongosNodes(function(db) {
                assert.commandWorked(db.adminCommand({
                    setDefaultRWConcern: 1,
                    defaultWriteConcern: {w: "majority"},
                    writeConcern: {w: "majority"}
                }));
            });
        } else if (cluster.isReplication()) {
            assert.commandWorked(db.adminCommand({
                setDefaultRWConcern: 1,
                defaultWriteConcern: {w: "majority"},
                writeConcern: {w: "majority"}
            }));
        }
    }

    return {
        threadCount: 10,
        iterations: 50,
        states: states,
        transitions: transitions,
        data: {
            getAllCollections,
            getAllDocuments,
            getDocIdsToUpdate,
            numDocs: 10,
            // Because of the usage of 'getMore' command in this test, we may receive
            // 'CursorNotFound' exception from the server, if a node was stepped down between the
            // 'find' and subsequent 'getMore' command. We retry the entire transaction in this
            // case.
            retryOnKilledSession: TestData.runningWithShardStepdowns,
            updatedDocsClientHistory: [],
        },
        setup: setup,
        teardown: teardown,
    };
})();