diff options
author | Kevin Cherkauer <kevin.cherkauer@mongodb.com> | 2022-11-17 00:43:22 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-11-17 01:15:34 +0000 |
commit | 3fdd9c6c3e71e9be3478f840afab8eea93c2061c (patch) | |
tree | 8c02b1d2ff9d1ebf3ca958160f099790a5d40007 /src | |
parent | 6b46d34b665f34e7c0d4794d0f8964aa9e36b289 (diff) | |
download | mongo-3fdd9c6c3e71e9be3478f840afab8eea93c2061c.tar.gz |
SERVER-70392 Named Pipes _writeTestPipe shell function
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/storage/external_record_store_test.cpp | 22 | ||||
-rw-r--r-- | src/mongo/db/storage/input_stream.h | 1 | ||||
-rw-r--r-- | src/mongo/db/storage/named_pipe_posix.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/storage/named_pipe_windows.cpp | 22 | ||||
-rw-r--r-- | src/mongo/shell/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/shell/named_pipe_test_helper.cpp | 194 | ||||
-rw-r--r-- | src/mongo/shell/named_pipe_test_helper.h | 64 | ||||
-rw-r--r-- | src/mongo/shell/shell_utils_launcher.cpp | 147 |
8 files changed, 440 insertions, 23 deletions
diff --git a/src/mongo/db/storage/external_record_store_test.cpp b/src/mongo/db/storage/external_record_store_test.cpp index 3ff696f02ab..135fbe7d43d 100644 --- a/src/mongo/db/storage/external_record_store_test.cpp +++ b/src/mongo/db/storage/external_record_store_test.cpp @@ -256,7 +256,7 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes1) { (ExternalDataSourceMetadata::kUrlProtocolFile + pipePaths[pipeIdx]), StorageTypeEnum::pipe, FileTypeEnum::bson); - vopts.dataSources.push_back(meta); + vopts.dataSources.emplace_back(meta); } MultiBsonStreamCursor msbc = MultiBsonStreamCursor(vopts); @@ -273,7 +273,7 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes1) { ASSERT_EQ(recIdExpected, recId) << "Expected record->id {} but got {}"_format(recIdExpected, recId); ASSERT_EQ(record->data.size(), bsonObjs[pipeIdx][0].objsize()) - << "record.data.size() {} != original size {}"_format( + << "record->data.size() {} != original size {}"_format( record->data.size(), bsonObjs[pipeIdx][0].objsize()); ASSERT_EQ(std::memcmp(record->data.data(), bsonObjs[pipeIdx][0].objdata(), @@ -357,7 +357,7 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes2) { (ExternalDataSourceMetadata::kUrlProtocolFile + pipePaths[pipeIdx]), StorageTypeEnum::pipe, FileTypeEnum::bson); - vopts.dataSources.push_back(meta); + vopts.dataSources.emplace_back(meta); } MultiBsonStreamCursor msbc(vopts); @@ -374,8 +374,8 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes2) { ASSERT_EQ(recIdExpected, recId) << "Expected record->id {} but got {}"_format(recIdExpected, recId); ASSERT_EQ(record->data.size(), bsonObjs[objIdx].objsize()) - << "record.data.size() {} != original size {}"_format(record->data.size(), - bsonObjs[objIdx].objsize()); + << "record->data.size() {} != original size {}"_format(record->data.size(), + bsonObjs[objIdx].objsize()); ASSERT_EQ(std::memcmp(record->data.data(), bsonObjs[objIdx].objdata(), bsonObjs[objIdx].objsize()), @@ -443,7 +443,7 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes3) { (ExternalDataSourceMetadata::kUrlProtocolFile + pipePaths[pipeIdx]), StorageTypeEnum::pipe, FileTypeEnum::bson); - vopts.dataSources.push_back(meta); + vopts.dataSources.emplace_back(meta); } MultiBsonStreamCursor msbc(vopts); @@ -459,8 +459,8 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes3) { ASSERT_EQ(recIdExpected, recId) << "Expected record->id {} but got {}"_format(recIdExpected, recId); ASSERT_EQ(record->data.size(), bsonObjs[0].objsize()) - << "record.data.size() {} != original size {}"_format(record->data.size(), - bsonObjs[0].objsize()); + << "record->data.size() {} != original size {}"_format(record->data.size(), + bsonObjs[0].objsize()); ASSERT_EQ( std::memcmp(record->data.data(), bsonObjs[0].objdata(), bsonObjs[0].objsize()), 0) << "Read data is not same as the source data"; @@ -496,7 +496,7 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes4) { objsWritten += numObjs; std::string fieldName = "field_{}"_format(pipeIdx); for (int objIdx = 0; objIdx < numObjs; ++objIdx) { - pipeBsonObjs[pipeIdx].push_back(BSON(fieldName << getRandomString(rand() % 2048))); + pipeBsonObjs[pipeIdx].emplace_back(BSON(fieldName << getRandomString(rand() % 2048))); } } @@ -527,7 +527,7 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes4) { (ExternalDataSourceMetadata::kUrlProtocolFile + pipePaths[pipeIdx]), StorageTypeEnum::pipe, FileTypeEnum::bson); - vopts.dataSources.push_back(meta); + vopts.dataSources.emplace_back(meta); } MultiBsonStreamCursor msbc(vopts); @@ -551,7 +551,7 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes4) { ASSERT_EQ(recIdExpected, recId) << "Expected record->id {} but got {}"_format(recIdExpected, recId); ASSERT_EQ(record->data.size(), pipeBsonObjs[pipeIdx][pipeObjsRead - 1].objsize()) - << "record.data.size() {} != original size {}"_format( + << "record->data.size() {} != original size {}"_format( record->data.size(), pipeBsonObjs[pipeIdx][pipeObjsRead - 1].objsize()); ASSERT_EQ(std::memcmp(record->data.data(), pipeBsonObjs[pipeIdx][pipeObjsRead - 1].objdata(), diff --git a/src/mongo/db/storage/input_stream.h b/src/mongo/db/storage/input_stream.h index 8e5dcdf65cc..a1f9dbe502c 100644 --- a/src/mongo/db/storage/input_stream.h +++ b/src/mongo/db/storage/input_stream.h @@ -98,7 +98,6 @@ public: // If we reach this point, we accumulated fewer than 'count' bytes. if (MONGO_likely(InputT::isEof())) { - LOGV2_INFO(7005001, "Named pipe is closed", "path"_attr = InputT::getAbsolutePath()); return nReadTotal; } diff --git a/src/mongo/db/storage/named_pipe_posix.cpp b/src/mongo/db/storage/named_pipe_posix.cpp index 624c72b1db8..cf940dfe5e7 100644 --- a/src/mongo/db/storage/named_pipe_posix.cpp +++ b/src/mongo/db/storage/named_pipe_posix.cpp @@ -28,7 +28,8 @@ */ #ifndef _WIN32 -#include "named_pipe.h" + +#include "mongo/db/storage/named_pipe.h" #include <fmt/format.h> #include <string> @@ -91,6 +92,14 @@ NamedPipeInput::~NamedPipeInput() { } void NamedPipeInput::doOpen() { + // MultiBsonStreamCursor's (MBSC) assembly buffer is designed to perform well without a lower- + // layer IO buffer. Removing std::ifstream's default 8k "associated buffer" improves throughput + // by 1.9% by eliminating the hidden copies from that buffer to MBSC's buffer. MBSC itself will + // never copy data except when it (rarely) needs to expand its buffer, so by removing + // std::ifstream's buffer we get an essentially zero-copy cursor that still avoids lots of tiny + // IOs due to MBSC's assembly buffer algorithm. + _ifs.rdbuf()->pubsetbuf(0, 0); + // Retry open every 1 ms for up to 1 sec in case writer has not created the pipe yet. int retries = 0; do { diff --git a/src/mongo/db/storage/named_pipe_windows.cpp b/src/mongo/db/storage/named_pipe_windows.cpp index aa4d530cab9..4dd20cbc6bf 100644 --- a/src/mongo/db/storage/named_pipe_windows.cpp +++ b/src/mongo/db/storage/named_pipe_windows.cpp @@ -36,6 +36,7 @@ #include "mongo/db/storage/io_error_message.h" #include "mongo/logv2/log.h" +#include "mongo/stdx/thread.h" #include "mongo/util/errno_util.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage @@ -123,14 +124,21 @@ NamedPipeInput::~NamedPipeInput() { } void NamedPipeInput::doOpen() { - _pipe = - CreateFileA(_pipeAbsolutePath.c_str(), GENERIC_READ, 0, nullptr, OPEN_EXISTING, 0, nullptr); - if (_pipe == INVALID_HANDLE_VALUE) { - return; - } + // Retry open every 1 ms for up to 1 sec in case writer has not created the pipe yet. + int retries = 0; + do { + _pipe = CreateFileA( + _pipeAbsolutePath.c_str(), GENERIC_READ, 0, nullptr, OPEN_EXISTING, 0, nullptr); + if (_pipe == INVALID_HANDLE_VALUE) { + stdx::this_thread::sleep_for(stdx::chrono::milliseconds(1)); + ++retries; + } + } while (_pipe == INVALID_HANDLE_VALUE && retries <= 1000); - _isOpen = true; - _isGood = true; + if (_pipe != INVALID_HANDLE_VALUE) { + _isOpen = true; + _isGood = true; + } } int NamedPipeInput::doRead(char* data, int size) { diff --git a/src/mongo/shell/SConscript b/src/mongo/shell/SConscript index d99de6a0ed6..1179246d35c 100644 --- a/src/mongo/shell/SConscript +++ b/src/mongo/shell/SConscript @@ -123,6 +123,7 @@ env.Library( target='shell_utils', source=[ 'mongo-server.cpp', + 'named_pipe_test_helper.cpp', 'shell_options.cpp', 'shell_utils.cpp', 'shell_utils_extended.cpp', @@ -143,6 +144,7 @@ env.Library( LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/bson/util/bson_column', '$BUILD_DIR/mongo/db/auth/security_token', + '$BUILD_DIR/mongo/db/storage/record_store_base', ], ) diff --git a/src/mongo/shell/named_pipe_test_helper.cpp b/src/mongo/shell/named_pipe_test_helper.cpp new file mode 100644 index 00000000000..0603af11f8b --- /dev/null +++ b/src/mongo/shell/named_pipe_test_helper.cpp @@ -0,0 +1,194 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/shell/named_pipe_test_helper.h" + +#include "mongo/bson/bsonmisc.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/catalog/virtual_collection_options.h" +#include "mongo/db/storage/multi_bson_stream_cursor.h" +#include "mongo/db/storage/named_pipe.h" +#include "mongo/stdx/chrono.h" + +namespace mongo { +/** + * Gets a string of 'length' 'a' chars as efficiently as possible. + */ +std::string NamedPipeHelper::getString(int length) { + return std::string(length, 'a'); +} + +/** + * Reads all BSON objects from all named pipes in 'pipeRelativePaths' and returns the following + * stats in a BSON object: + * { + * "objects": number of objects read, + * "time": { total time consumed in... + * "sec": seconds, + * "msec": milliseconds, + * "usec": microseconds, + * "nsec": nanoseconds, + * }, + * "rate": { data processing rate in... + * "mbps": megabytes / second, + * "gbps": gigabytes / second, + * }, + * "totalSize": { total size of all objects in + * "bytes": bytes, + * "kb": kilobytes, + * "mb": megabytes, + * "gb": gigabytes, + * } + * } + */ +BSONObj NamedPipeHelper::readFromPipes(const std::vector<std::string>& pipeRelativePaths) { + stdx::chrono::system_clock::time_point startTime = stdx::chrono::system_clock::now(); + double objects = 0.0; // return stat + double totalSizeBytes = 0.0; // return stat + + // Create metadata describing the pipes and a MultiBsonStreamCursor to read them. + VirtualCollectionOptions vopts; + for (const std::string& pipeRelativePath : pipeRelativePaths) { + ExternalDataSourceMetadata meta( + (ExternalDataSourceMetadata::kUrlProtocolFile + pipeRelativePath), + StorageTypeEnum::pipe, + FileTypeEnum::bson); + vopts.dataSources.emplace_back(meta); + } + MultiBsonStreamCursor msbc(vopts); + + // Use MultiBsonStreamCursor to read the pipes. + boost::optional<Record> record = boost::none; + do { + record = msbc.next(); + if (record) { + ++objects; + totalSizeBytes += record->data.size(); + } + } while (record); + stdx::chrono::system_clock::time_point finishTime = stdx::chrono::system_clock::now(); + auto duration = finishTime - startTime; + + double sec = stdx::chrono::duration_cast<stdx::chrono::seconds>(duration).count(); + double msec = stdx::chrono::duration_cast<stdx::chrono::milliseconds>(duration).count(); + double usec = stdx::chrono::duration_cast<stdx::chrono::microseconds>(duration).count(); + double nsec = stdx::chrono::duration_cast<stdx::chrono::nanoseconds>(duration).count(); + double mbps = (totalSizeBytes / (1024.0 * 1024.0)) / (nsec / (1000.0 * 1000.0 * 1000.0)); + double gbps = mbps / 1024.0; + return BSON("" << BSON("objects" + << objects << "time" + << BSON("sec" << sec << "msec" << msec << "usec" << usec << "nsec" + << nsec) + << "rate" << BSON("mbps" << mbps << "gbps" << gbps) << "totalSize" + << BSON("bytes" << totalSizeBytes << "kb" << (totalSizeBytes / 1024.0) + << "mb" << (totalSizeBytes / (1024.0 * 1024.0)) << "gb" + << (totalSizeBytes / (1024.0 * 1024.0 * 1024.0))))); +} + +/** + * Synchronously writes 'objects' random BSON objects to named pipe 'pipeRelativePath'. The "string" + * field of these objects will have stringMinSize <= string.length() <= stringMaxSize. Note that + * the open() call itself will block until a pipe reader attaches to the same pipe. Absorbs + * exceptions because this is called by an async detached thread, so escaping exceptions will cause + * fuzzer tests to fail as its try blocks are only around the main thread. + */ +void NamedPipeHelper::writeToPipe(const std::string& pipeRelativePath, + long objects, + long stringMinSize, + long stringMaxSize) noexcept { + try { + NamedPipeOutput pipeWriter(pipeRelativePath); // producer + + pipeWriter.open(); + for (long obj = 0; obj < objects; ++obj) { + int length = std::rand() % (1 + stringMaxSize - stringMinSize) + stringMinSize; + BSONObj bsonObj{BSON("length" << length << "string" << getString(length))}; + pipeWriter.write(bsonObj.objdata(), bsonObj.objsize()); + } + pipeWriter.close(); + } catch (const DBException& exc) { + std::cout << "NamedPipeHelper::writeToPipe caught exception: " << exc.toString() + << std::endl; + } catch (...) { + // absorb + } +} + +/** + * Asynchronously writes 'objects' random BSON objects to named pipe 'pipeRelativePath'. The + * "string" field of these objects will have stringMinSize <= string.length() <= stringMaxSize. + */ +void NamedPipeHelper::writeToPipeAsync(const std::string& pipeRelativePath, + long objects, + long stringMinSize, + long stringMaxSize) { + stdx::thread thread(writeToPipe, pipeRelativePath, objects, stringMinSize, stringMaxSize); + thread.detach(); +} + +/** + * Synchronously writes 'objects' BSON objects round-robinned from 'bsonObjs' to named pipe + * 'pipeRelativePath'. Note that the open() call itself will block until a pipe reader attaches to + * the same pipe. Absorbs exceptions because this is called by an async detached thread, so escaping + * exceptions will cause fuzzer tests to fail as its try blocks are only around the main thread. + */ +void NamedPipeHelper::writeToPipeObjects(const std::string& pipeRelativePath, + long objects, + const std::vector<BSONObj>& bsonObjs) noexcept { + try { + const int kNumBsonObjs = bsonObjs.size(); + NamedPipeOutput pipeWriter(pipeRelativePath); // producer + + pipeWriter.open(); + for (long obj = 0; obj < objects; ++obj) { + BSONObj bsonObj{bsonObjs[obj % kNumBsonObjs]}; + pipeWriter.write(bsonObj.objdata(), bsonObj.objsize()); + } + pipeWriter.close(); + } catch (const DBException& exc) { + std::cout << "NamedPipeHelper::writeToPipeObjects caught exception: " << exc.toString() + << std::endl; + } catch (...) { + // absorb + } +} + +/** + * Asynchronously writes 'objects' BSON objects round-robinned from 'bsonObjs' to named pipe + * 'pipeRelativePath'. + */ +void NamedPipeHelper::writeToPipeObjectsAsync(const std::string& pipeRelativePath, + long objects, + const std::vector<BSONObj>& bsonObjs) { + stdx::thread thread( + writeToPipeObjects, std::move(pipeRelativePath), objects, std::move(bsonObjs)); + thread.detach(); +} +} // namespace mongo diff --git a/src/mongo/shell/named_pipe_test_helper.h b/src/mongo/shell/named_pipe_test_helper.h new file mode 100644 index 00000000000..5da8990db3e --- /dev/null +++ b/src/mongo/shell/named_pipe_test_helper.h @@ -0,0 +1,64 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/storage/named_pipe.h" + +#include <string> + +#include "mongo/bson/bsonelement.h" +#include "mongo/stdx/thread.h" + +namespace mongo { +/** + * This class supports writing and reading named pipes in the mongo test shell. + */ +class NamedPipeHelper { +public: + static BSONObj readFromPipes(const std::vector<std::string>& pipeRelativePaths); + static void writeToPipeAsync(const std::string& pipeRelativePath, + long objects, + long stringMinSize, + long stringMaxSize); + static void writeToPipeObjectsAsync(const std::string& pipeRelativePath, + long objects, + const std::vector<BSONObj>& bsonObjs); + +private: + static std::string getString(int length); + static void writeToPipe(const std::string& pipeRelativePath, + long objects, + long stringMinSize, + long stringMaxSize) noexcept; + static void writeToPipeObjects(const std::string& pipeRelativePath, + long objects, + const std::vector<BSONObj>& bsonObjs) noexcept; +}; +} // namespace mongo diff --git a/src/mongo/shell/shell_utils_launcher.cpp b/src/mongo/shell/shell_utils_launcher.cpp index c781e9dae05..173bb7a9b24 100644 --- a/src/mongo/shell/shell_utils_launcher.cpp +++ b/src/mongo/shell/shell_utils_launcher.cpp @@ -27,9 +27,6 @@ * it in the license file. */ - -#include "mongo/platform/basic.h" - #include "mongo/shell/shell_utils_launcher.h" #include <algorithm> @@ -61,11 +58,14 @@ #include "mongo/base/environment_buffer.h" #include "mongo/base/error_codes.h" +#include "mongo/bson/bsonelement.h" #include "mongo/bson/util/builder.h" #include "mongo/client/dbclient_connection.h" #include "mongo/db/traffic_reader.h" #include "mongo/logv2/log.h" +#include "mongo/platform/basic.h" #include "mongo/scripting/engine.h" +#include "mongo/shell/named_pipe_test_helper.h" #include "mongo/shell/shell_options.h" #include "mongo/shell/shell_utils.h" #include "mongo/util/assert_util.h" @@ -1262,6 +1262,124 @@ int KillMongoProgramInstances() { return returnCode; } +/** + * Reads a set of test named pipes. 'args' BSONObj should contain one or more fields like: + * "0": string; relative path of the first pipe + * "1": string; relative path of the second pipe + * ... + * Any field names not sequentially numbered from 0 will be ignored. + */ +BSONObj ReadTestPipes(const BSONObj& args, void* unused) { + int fieldNum = 0; // next field name in numeric form + BSONElement pipePathElem; // next pipe relative path + std::vector<std::string> pipeRelativePaths; // all pipe relative paths + + do { + pipePathElem = BSONElement(args.getField(std::to_string(fieldNum))); + if (pipePathElem.type() == BSONType::String) { + pipeRelativePaths.emplace_back(pipePathElem.str()); + } else if (pipePathElem.type() != BSONType::EOO) { + uasserted(ErrorCodes::FailedToParse, + "Argument {} (pipe path) must be a string"_format(fieldNum)); + } + ++fieldNum; + } while (pipePathElem.type() != BSONType::EOO); + + if (pipeRelativePaths.size() > 0) { + return NamedPipeHelper::readFromPipes(pipeRelativePaths); + } + return {}; +} + +/** + * Writes a test named pipe of generated BSONobj's. 'args' BSONObj should contain fields: + * "0": string; relative path of the pipe + * "1": number; number of BSON objects to write to the pipe + * "2": OPTIONAL number; lower bound on size of "string" field in generated object (default 0) + * "3": OPTIONAL number; upper bound on size of "string" field in generated object (default 2048) + * capped at 16,750,000 (slightly less than BSON object maximum of 16 MB) + */ +BSONObj WriteTestPipe(const BSONObj& args, void* unused) { + const long kStringMaxSize = 16750000; // max allowed size for generated object's "string" field + BSONElement pipePathElem(args.getField("0")); + BSONElement objectsElem(args.getField("1")); + BSONElement stringMinSizeStr(args.getField("2")); + BSONElement stringMaxSizeStr(args.getField("3")); + long stringMinSize = 0; // default "string" field minimum size + long stringMaxSize = 2048; // default "string" field maximum size + + uassert(ErrorCodes::FailedToParse, + "First argument (pipe path) must be a string", + pipePathElem.type() == BSONType::String); + uassert(ErrorCodes::FailedToParse, + "Second argument (number of objects) must be a number", + objectsElem.isNumber()); + if (stringMinSizeStr.isNumber()) { // optional + stringMinSize = stringMinSizeStr.numberLong(); + if (stringMinSize < 0) { + stringMinSize = 0; + } + if (stringMinSize > kStringMaxSize) { + stringMinSize = kStringMaxSize; + } + } + if (stringMaxSizeStr.isNumber()) { // optional + stringMaxSize = stringMaxSizeStr.numberLong(); + if (stringMaxSize < 0) { + stringMaxSize = 0; + } + if (stringMaxSize > kStringMaxSize) { + stringMaxSize = kStringMaxSize; + } + } + uassert(ErrorCodes::FailedToParse, + "Third argument (string min size) must be <= fourth argument (string max size)", + stringMinSize <= stringMaxSize); + + NamedPipeHelper::writeToPipeAsync( + pipePathElem.str(), objectsElem.numberLong(), stringMinSize, stringMaxSize); + + return {}; +} + + +/** + * Writes a test named pipe by round-robinning caller-provided objects to the pipe. 'args' BSONObj + * should contain fields: + * "0": string; relative path of the pipe + * "1": number; number of BSON objects to write to the pipe + * "2": BSONArray; array of objects to round-robin write to the pipe + */ +BSONObj WriteTestPipeObjects(const BSONObj& args, void* unused) { + BSONElement pipePathElem(args.getField("0")); + BSONElement objectsElem(args.getField("1")); + BSONElement bsonElems(args.getField("2")); + + uassert(ErrorCodes::FailedToParse, + "First argument (pipe path) must be a string", + pipePathElem.type() == BSONType::String); + uassert(ErrorCodes::FailedToParse, + "Second argument (number of objects) must be a number", + objectsElem.isNumber()); + uassert(ErrorCodes::FailedToParse, + "Third argument must be an array of objects to round-robin over", + bsonElems.type() == mongo::Array); + + // Convert bsonElems into bsonObjs as the former are pointers into local stack memory that will + // become invalid when this method returns, but they are needed by the async writer thread. + std::vector<BSONElement> bsonElemsVector = bsonElems.Array(); + std::vector<BSONObj> bsonObjs; + for (BSONElement bsonElem : bsonElemsVector) { + bsonObjs.emplace_back(bsonElem.Obj().getOwned()); + } + + // Write the pipe asynchronously. + NamedPipeHelper::writeToPipeObjectsAsync( + pipePathElem.str(), objectsElem.numberLong(), bsonObjs); + + return {}; +} + std::vector<ProcessId> getRunningMongoChildProcessIds() { std::vector<ProcessId> registeredPids, outPids; registry.getRegisteredPids(registeredPids); @@ -1308,6 +1426,26 @@ MongoProgramScope::~MongoProgramScope() { DESTRUCTOR_GUARD(KillMongoProgramInstances(); ClearRawMongoProgramOutput(BSONObj(), nullptr)) } +/** + * Defines (funcName, CallbackFunction) pairs where funcName becomes the name of a function in the + * mongo test shell and CallbackFunction is its C++ callback (handler). The callbacks must all have + * signatures like + * BSONObj CallbackFunction(const BSONObj& args, void* data) + * (contract from injectNative()), though nobody is using the data parameter at time of writing. + * + * The BSONObj they return must put the result into field "" such as + * return BSON("" << true); + * or + * return BSON("" << BSON("resultInfo1" << resultValue1 << "resultInfo2" << resultValue2)); + * + * In the shell these are called like + * funcName(arg1, arg2, ...) + * for example + * _writeTestPipe("my_pipe_file", 1234) + * The args will come in as the BSONObj first parameter of the callback with fields named + * sequentially from "0", e.g. for the above: + * {"0": "my_pipe_file", "1": 1234} + */ void installShellUtilsLauncher(Scope& scope) { scope.injectNative("_startMongoProgram", StartMongoProgram); scope.injectNative("_runningMongoChildProcessIds", RunningMongoChildProcessIds); @@ -1327,6 +1465,9 @@ void installShellUtilsLauncher(Scope& scope) { scope.injectNative("copyDbpath", CopyDbpath); scope.injectNative("convertTrafficRecordingToBSON", ConvertTrafficRecordingToBSON); scope.injectNative("getFCVConstants", GetFCVConstants); + scope.injectNative("_readTestPipes", ReadTestPipes); + scope.injectNative("_writeTestPipe", WriteTestPipe); + scope.injectNative("_writeTestPipeObjects", WriteTestPipeObjects); } } // namespace shell_utils } // namespace mongo |