diff options
| -rw-r--r-- | src/mongo/base/data_range.h | 4 | ||||
| -rw-r--r-- | src/mongo/client/SConscript | 5 | ||||
| -rw-r--r-- | src/mongo/client/remote_command_runner_impl.cpp | 153 | ||||
| -rw-r--r-- | src/mongo/executor/SConscript | 16 | ||||
| -rw-r--r-- | src/mongo/executor/downconvert_find_and_getmore_commands.cpp | 274 | ||||
| -rw-r--r-- | src/mongo/executor/downconvert_find_and_getmore_commands.h | 81 | ||||
| -rw-r--r-- | src/mongo/executor/network_interface_asio.cpp | 2 | ||||
| -rw-r--r-- | src/mongo/executor/network_interface_asio.h | 40 | ||||
| -rw-r--r-- | src/mongo/executor/network_interface_asio_auth.cpp | 19 | ||||
| -rw-r--r-- | src/mongo/executor/network_interface_asio_command.cpp | 80 | ||||
| -rw-r--r-- | src/mongo/executor/network_interface_asio_operation.cpp | 60 |
11 files changed, 556 insertions, 178 deletions
diff --git a/src/mongo/base/data_range.h b/src/mongo/base/data_range.h index 9226e6c133a..ee6da34ee7a 100644 --- a/src/mongo/base/data_range.h +++ b/src/mongo/base/data_range.h @@ -62,6 +62,10 @@ public: return _end - _begin; } + bool empty() const { + return length() == 0; + } + template <typename T> Status read(T* t, size_t offset = 0) const { if (offset > length()) { diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript index f8e130a76b8..63862a8f71c 100644 --- a/src/mongo/client/SConscript +++ b/src/mongo/client/SConscript @@ -166,12 +166,9 @@ env.Library( ], LIBDEPS=[ '$BUILD_DIR/mongo/base', - '$BUILD_DIR/mongo/db/common', - '$BUILD_DIR/mongo/db/query/command_request_response', - '$BUILD_DIR/mongo/db/query/lite_parsed_query', + '$BUILD_DIR/mongo/executor/downconvert_find_and_getmore_commands', '$BUILD_DIR/mongo/executor/task_executor_interface', '$BUILD_DIR/mongo/rpc/metadata', - '$BUILD_DIR/mongo/rpc/protocol', '$BUILD_DIR/mongo/util/net/hostandport', 'connection_pool', ], diff --git a/src/mongo/client/remote_command_runner_impl.cpp b/src/mongo/client/remote_command_runner_impl.cpp index 6bd8d9375fc..486e039b148 100644 --- a/src/mongo/client/remote_command_runner_impl.cpp +++ b/src/mongo/client/remote_command_runner_impl.cpp @@ -35,6 +35,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/query/cursor_response.h" #include "mongo/db/query/getmore_request.h" +#include "mongo/executor/downconvert_find_and_getmore_commands.h" #include "mongo/executor/network_connection_hook.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/protocol.h" @@ -89,135 +90,51 @@ Status getStatusFromCursorResult(DBClientCursor& cursor) { getErrField(error).valuestrsafe()); } -/** - * Downconverts the specified find command to a find protocol operation and sends it to the - * server on the specified connection. - */ -Status runDownconvertedFindCommand(DBClientConnection* conn, - const std::string& dbname, - const BSONObj& cmdObj, - BSONObj* output) { - const NamespaceString nss(dbname, cmdObj.firstElement().String()); - if (!nss.isValid()) { - return {ErrorCodes::InvalidNamespace, - str::stream() << "Invalid collection name: " << nss.ns()}; - } - const std::string& ns = nss.ns(); - - // It is a little heavy handed to use LiteParsedQuery to convert the command object to - // query() arguments but we get validation and consistent behavior with the find - // command implementation on the remote server. - auto lpqStatus = LiteParsedQuery::makeFromFindCommand(nss, cmdObj, false); - if (!lpqStatus.isOK()) { - *output = getCommandResultFromStatus(lpqStatus.getStatus()); - return lpqStatus.getStatus(); - } - - auto& lpq = lpqStatus.getValue(); +using RequestDownconverter = StatusWith<Message>(*)(const RemoteCommandRequest&); +using ReplyUpconverter = StatusWith<RemoteCommandResponse>(*)(std::uint32_t requestId, + StringData cursorNamespace, + const Message& response); - // We are downconverting a find command, and find command can only have ntoreturn - // if it was generated by mongos. - invariant(!lpq->getNToReturn()); - - Query query(lpq->getFilter()); - if (!lpq->getSort().isEmpty()) { - query.sort(lpq->getSort()); - } - if (!lpq->getHint().isEmpty()) { - query.hint(lpq->getHint()); - } - if (!lpq->getMin().isEmpty()) { - query.minKey(lpq->getMin()); - } - if (!lpq->getMax().isEmpty()) { - query.minKey(lpq->getMax()); - } - if (lpq->isExplain()) { - query.explain(); - } - if (lpq->isSnapshot()) { - query.snapshot(); - } - int nToReturn = lpq->getLimit().value_or(0) * -1; - int nToSkip = lpq->getSkip().value_or(0); - const BSONObj* fieldsToReturn = &lpq->getProj(); - int queryOptions = lpq->getOptions(); - int batchSize = lpq->getBatchSize().value_or(0); - - std::unique_ptr<DBClientCursor> cursor = - conn->query(ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize); - - if (!cursor) { - return {ErrorCodes::HostUnreachable, - str::stream() << "cursor initialization failed due to connection problems with " - << conn->getServerAddress()}; +template <RequestDownconverter downconvertRequest, ReplyUpconverter upconvertReply> +StatusWith<RemoteCommandResponse> runDownconvertedCommand(DBClientConnection* conn, + const RemoteCommandRequest& request) { + auto swDownconvertedRequest = downconvertRequest(request); + if (!swDownconvertedRequest.isOK()) { + return swDownconvertedRequest.getStatus(); } - cursor->decouple(); + Message requestMsg{std::move(swDownconvertedRequest.getValue())}; + Message responseMsg; - Status status = getStatusFromCursorResult(*cursor); - if (!status.isOK()) { - *output = getCommandResultFromStatus(status); - return status; + try { + conn->call(requestMsg, responseMsg); + } catch (...) { + return exceptionToStatus(); } - BSONArrayBuilder batch; - while (cursor->moreInCurrentBatch()) { - batch.append(cursor->next()); - } + auto messageId = requestMsg.header().getId(); - BSONObjBuilder result; - appendCursorResponseObject(cursor->getCursorId(), ns, batch.arr(), &result); - Command::appendCommandStatus(result, Status::OK()); - *output = result.obj(); - return Status::OK(); + return upconvertReply(messageId, DbMessage(requestMsg).getns(), responseMsg); } /** - * Downconverts the specified getMore command to legacy getMore operation and sends it to the + * Downconverts the specified find command to a find protocol operation and sends it to the * server on the specified connection. */ -Status runDownconvertedGetMoreCommand(DBClientConnection* conn, - const std::string& dbname, - const BSONObj& cmdObj, - BSONObj* output) { - StatusWith<GetMoreRequest> parseResult = GetMoreRequest::parseFromBSON(dbname, cmdObj); - if (!parseResult.isOK()) { - const Status& status = parseResult.getStatus(); - *output = getCommandResultFromStatus(status); - return status; - } - - const GetMoreRequest& req = parseResult.getValue(); - const std::string& ns = req.nss.ns(); - - std::unique_ptr<DBClientCursor> cursor = - conn->getMore(ns, req.cursorid, req.batchSize.value_or(0)); - - if (!cursor) { - return {ErrorCodes::HostUnreachable, - str::stream() << "cursor initialization failed due to connection problems with " - << conn->getServerAddress()}; - } - - cursor->decouple(); - - Status status = getStatusFromCursorResult(*cursor); - if (!status.isOK()) { - *output = getCommandResultFromStatus(status); - return status; - } - - BSONArrayBuilder batch; - while (cursor->moreInCurrentBatch()) { - batch.append(cursor->next()); - } +StatusWith<RemoteCommandResponse> runDownconvertedFindCommand(DBClientConnection* conn, + const RemoteCommandRequest& request) { + return runDownconvertedCommand<executor::downconvertFindCommandRequest, + executor::upconvertLegacyQueryResponse>(conn, request); +} - BSONObjBuilder result; - appendGetMoreResponseObject(cursor->getCursorId(), ns, batch.arr(), &result); - Command::appendCommandStatus(result, Status::OK()); - *output = result.obj(); - return Status::OK(); +/** + * Downconverts the specified getMore command to legacy getMore operation and sends it to the + * server on the specified connection. + */ +StatusWith<RemoteCommandResponse> runDownconvertedGetMoreCommand( + DBClientConnection* conn, const RemoteCommandRequest& request) { + return runDownconvertedCommand<executor::downconvertGetMoreCommandRequest, + executor::upconvertLegacyGetMoreResponse>(conn, request); } } // namespace @@ -285,9 +202,9 @@ StatusWith<RemoteCommandResponse> RemoteCommandRunnerImpl::runCommand( output = commandResponse->getCommandReply().getOwned(); metadata = commandResponse->getMetadata().getOwned(); } else if (isFindCmd) { - runDownconvertedFindCommand(conn.get(), request.dbname, request.cmdObj, &output); + return runDownconvertedFindCommand(conn.get(), request); } else if (isGetMoreCmd) { - runDownconvertedGetMoreCommand(conn.get(), request.dbname, request.cmdObj, &output); + return runDownconvertedGetMoreCommand(conn.get(), request); } const Date_t requestFinishDate = Date_t::now(); diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript index 3d3d6b05f7a..c3d5f578552 100644 --- a/src/mongo/executor/SConscript +++ b/src/mongo/executor/SConscript @@ -88,6 +88,7 @@ env.Library( '$BUILD_DIR/mongo/db/auth/authcommon', '$BUILD_DIR/mongo/rpc/rpc', '$BUILD_DIR/third_party/shim_asio', + 'downconvert_find_and_getmore_commands', 'network_interface', 'task_executor_interface', ]) @@ -169,3 +170,18 @@ env.CppUnitTest( 'thread_pool_task_executor_test_fixture', ] ) + +env.Library( + target='downconvert_find_and_getmore_commands', + source=[ + 'downconvert_find_and_getmore_commands.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/client/clientdriver', + '$BUILD_DIR/mongo/db/query/command_request_response', + '$BUILD_DIR/mongo/db/query/command_request_response', + '$BUILD_DIR/mongo/db/query/lite_parsed_query', + '$BUILD_DIR/mongo/rpc/protocol', + ], +) diff --git a/src/mongo/executor/downconvert_find_and_getmore_commands.cpp b/src/mongo/executor/downconvert_find_and_getmore_commands.cpp new file mode 100644 index 00000000000..b031fe60df1 --- /dev/null +++ b/src/mongo/executor/downconvert_find_and_getmore_commands.cpp @@ -0,0 +1,274 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/executor/downconvert_find_and_getmore_commands.h" + +#include <memory> +#include <string> +#include <tuple> + +#include "mongo/base/data_range_cursor.h" +#include "mongo/base/data_type_validated.h" +#include "mongo/base/status_with.h" +#include "mongo/client/constants.h" +#include "mongo/client/dbclientinterface.h" +#include "mongo/db/cursor_id.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/query/cursor_response.h" +#include "mongo/db/query/getmore_request.h" +#include "mongo/executor/remote_command_request.h" +#include "mongo/executor/remote_command_response.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/rpc/metadata/server_selection_metadata.h" +#include "mongo/rpc/object_check.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/net/message.h" + +namespace mongo { +namespace executor { + +namespace { + +StatusWith<std::tuple<CursorId, BSONArray>> getBatchFromReply(std::uint32_t requestId, + const Message& response) { + auto header = response.header(); + if (header.getOperation() != mongo::opReply) { + return {ErrorCodes::ProtocolError, + str::stream() << "Expected to be decoding an OP_REPLY but got " + << mongo::opToString(header.getOperation())}; + } + + if (header.getResponseTo() != requestId) { + return {ErrorCodes::ProtocolError, + str::stream() << "responseTo field of OP_REPLY header with value '" + << header.getResponseTo() << "' does not match requestId '" + << requestId << "'"}; + } + + if ((header.dataLen() < 0) || + (static_cast<std::size_t>(header.dataLen()) > mongo::MaxMessageSizeBytes)) { + return {ErrorCodes::InvalidLength, + str::stream() << "Received message has invalid length field with value " + << header.dataLen()}; + } + + QueryResult::View qr = response.header().view2ptr(); + + auto resultFlags = qr.getResultFlags(); + + if (resultFlags & ResultFlag_CursorNotFound) { + return {ErrorCodes::CursorNotFound, + str::stream() << "Cursor with id '" << qr.getCursorId() << "' not found"}; + } + + // Use CDRC directly instead of DocumentRange as DocumentRange has a throwing API. + ConstDataRangeCursor cdrc{qr.data(), qr.data() + header.dataLen()}; + + if (resultFlags & ResultFlag_ErrSet) { + if (qr.getNReturned() != 1) { + return {ErrorCodes::BadValue, + str::stream() << "ResultFlag_ErrSet flag set on reply, but nReturned was '" + << qr.getNReturned() << "' - expected 1"}; + } + // Convert error document to a Status. + // Will throw if first document is invalid BSON. + auto first = cdrc.readAndAdvance<Validated<BSONObj>>(); + if (!first.isOK()) { + return first.getStatus(); + } + + // Convert error document to a status. + return getStatusFromCommandResult(first.getValue()); + } + + Validated<BSONObj> nextObj; + BSONArrayBuilder batch; + while (!cdrc.empty() && batch.arrSize() < qr.getNReturned()) { + auto readStatus = cdrc.readAndAdvance(&nextObj); + if (!readStatus.isOK()) { + return readStatus; + } + batch.append(nextObj.val); + } + if (qr.getNReturned() != batch.arrSize()) { + return {ErrorCodes::InvalidLength, + str::stream() << "Count of documents in OP_REPLY message (" << batch.arrSize() + << ") did not match the value specified in the nReturned field (" + << qr.getNReturned() << ")"}; + } + + return {std::make_tuple(qr.getCursorId(), batch.arr())}; +} + +} // namespace + +StatusWith<Message> downconvertFindCommandRequest(const RemoteCommandRequest& request) { + const auto& cmdObj = request.cmdObj; + const NamespaceString nss(request.dbname, cmdObj.firstElement().String()); + if (!nss.isValid()) { + return {ErrorCodes::InvalidNamespace, + str::stream() << "Invalid collection name: " << nss.ns()}; + } + + const std::string& ns = nss.ns(); + + // It is a little heavy handed to use LiteParsedQuery to convert the command object to + // query() arguments but we get validation and consistent behavior with the find + // command implementation on the remote server. + auto lpqStatus = LiteParsedQuery::makeFromFindCommand(nss, cmdObj, false); + if (!lpqStatus.isOK()) { + return lpqStatus.getStatus(); + } + + auto lpq = std::move(lpqStatus.getValue()); + + // We are downconverting a find command, and find command can only have ntoreturn + // if it was generated by mongos. + invariant(!lpq->getNToReturn()); + Query query(lpq->getFilter()); + if (!lpq->getSort().isEmpty()) { + query.sort(lpq->getSort()); + } + if (!lpq->getHint().isEmpty()) { + query.hint(lpq->getHint()); + } + if (!lpq->getMin().isEmpty()) { + query.minKey(lpq->getMin()); + } + if (!lpq->getMax().isEmpty()) { + query.minKey(lpq->getMax()); + } + if (lpq->isExplain()) { + query.explain(); + } + if (lpq->isSnapshot()) { + query.snapshot(); + } + + const int nToReturn = lpq->getLimit().value_or(0) * -1; + const int nToSkip = lpq->getSkip().value_or(0); + const BSONObj* fieldsToReturn = &lpq->getProj(); + int queryOptions = lpq->getOptions(); // non-const so we can set slaveOk if we need to + const int batchSize = lpq->getBatchSize().value_or(0); + + const int nextBatchSize = [batchSize, nToReturn]() { + if (nToReturn == 0) + return batchSize; + if (batchSize == 0) + return nToReturn; + return batchSize < nToReturn ? batchSize : nToReturn; + }(); + + // We can't downconvert all metadata, since we aren't sending a command, but we do need to + // downconvert $secondaryOk to the slaveOK bit. + auto ssm = rpc::ServerSelectionMetadata::readFromMetadata(request.metadata); + if (!ssm.isOK()) { + return ssm.getStatus(); + } + if (ssm.getValue().isSecondaryOk()) { + queryOptions |= mongo::QueryOption_SlaveOk; + } + + Message message; + assembleQueryRequest( + ns, query.obj, nextBatchSize, nToSkip, fieldsToReturn, queryOptions, message); + + return {std::move(message)}; +} + +StatusWith<RemoteCommandResponse> upconvertLegacyQueryResponse(std::uint32_t requestId, + StringData cursorNamespace, + const Message& response) { + auto swBatch = getBatchFromReply(requestId, response); + if (!swBatch.isOK()) { + return swBatch.getStatus(); + } + + BSONArray batch; + CursorId cursorId; + std::tie(cursorId, batch) = std::move(swBatch.getValue()); + + BSONObjBuilder result; + appendCursorResponseObject(cursorId, cursorNamespace, std::move(batch), &result); + // Using Command::appendCommandStatus would create a circular dep, so it's simpler to just do + // this. + result.append("ok", 1.0); + + RemoteCommandResponse upconvertedResponse; + upconvertedResponse.data = result.obj(); + + return {std::move(upconvertedResponse)}; +} + +StatusWith<Message> downconvertGetMoreCommandRequest(const RemoteCommandRequest& request) { + auto swGetMoreRequest = GetMoreRequest::parseFromBSON(request.dbname, request.cmdObj); + if (!swGetMoreRequest.isOK()) { + return swGetMoreRequest.getStatus(); + } + + auto getMoreRequest = std::move(swGetMoreRequest.getValue()); + + BufBuilder b; + b.appendNum(std::int32_t{0}); // reserved bits + b.appendStr(getMoreRequest.nss.ns()); + // Without this static cast, we will append batchSize as an int64 and get an invalid message. + b.appendNum(static_cast<std::int32_t>(getMoreRequest.batchSize.value_or(0))); + b.appendNum(getMoreRequest.cursorid); + Message m; + m.setData(dbGetMore, b.buf(), b.len()); + + return {std::move(m)}; +} + +StatusWith<RemoteCommandResponse> upconvertLegacyGetMoreResponse(std::uint32_t requestId, + StringData cursorNamespace, + const Message& response) { + auto swBatch = getBatchFromReply(requestId, response); + if (!swBatch.isOK()) { + return swBatch.getStatus(); + } + + BSONArray batch; + CursorId cursorId; + + std::tie(cursorId, batch) = std::move(swBatch.getValue()); + + BSONObjBuilder result; + appendGetMoreResponseObject(cursorId, cursorNamespace, std::move(batch), &result); + result.append("ok", 1.0); + + RemoteCommandResponse resp; + resp.data = result.obj(); + + return {std::move(resp)}; +} + +} // namespace mongo +} // namespace executor diff --git a/src/mongo/executor/downconvert_find_and_getmore_commands.h b/src/mongo/executor/downconvert_find_and_getmore_commands.h new file mode 100644 index 00000000000..dea13150ddd --- /dev/null +++ b/src/mongo/executor/downconvert_find_and_getmore_commands.h @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include <cstdint> +#include <memory> + +namespace mongo { + +class Message; + +template <typename T> +class StatusWith; + +class StringData; +class NamespaceString; + +namespace executor { +struct RemoteCommandRequest; +struct RemoteCommandResponse; + +/** + * Downconverts a find command request to the legacy (non-command) OP_QUERY format. The returned + * message is formed, with the exception of the messageId header field, which must be set by + * the caller before sending the message over the wire. Note that our legacy socket code sets the + * messageId in MessagingPort::say(). + */ +StatusWith<Message> downconvertFindCommandRequest(const RemoteCommandRequest& request); + +/** + * Upconverts the OP_REPLY received in response to a legacy OP_QUERY to a semantically equivalent + * find command response. The 'requestId' parameter is the messageId of the original OP_QUERY, and + * the 'cursorNamespace' is the full namespace of the collection the query ran on. + */ +StatusWith<RemoteCommandResponse> upconvertLegacyQueryResponse(std::uint32_t requestId, + StringData cursorNamespace, + const Message& response); + +/** + * Downconverts a getMore command request to the legacy OP_GET_MORE format. The returned message + * is fully formed, with the exception of the messageId header field, which must be set by the + * the caller before sending the message over the wire. Note that our legacy socket code sets the + * messageId in MessagingPort::say(). + */ +StatusWith<Message> downconvertGetMoreCommandRequest(const RemoteCommandRequest& request); + +/** + * Upconverts the OP_REPLY received in response to a legacy OP_GET_MORE to a semantically equivalent + * getMore command response. The 'requestId' parameter is the messageId of the original OP_GET_MORE, + * and the 'curesorNamespace' is the full namespace of the collection the original query ran on. + */ +StatusWith<RemoteCommandResponse> upconvertLegacyGetMoreResponse(std::uint32_t requestId, + StringData cursorNamespace, + const Message& response); + +} // namespace mongo +} // namespace executor diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp index 4b40543a53b..3e43011c7b2 100644 --- a/src/mongo/executor/network_interface_asio.cpp +++ b/src/mongo/executor/network_interface_asio.cpp @@ -126,7 +126,7 @@ Date_t NetworkInterfaceASIO::now() { void NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHandle, const RemoteCommandRequest& request, const RemoteCommandCompletionFn& onFinish) { - auto ownedOp = stdx::make_unique<AsyncOp>(cbHandle, request, onFinish, now()); + auto ownedOp = stdx::make_unique<AsyncOp>(this, cbHandle, request, onFinish, now()); AsyncOp* op = ownedOp.get(); diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h index 39fd9b57d6c..f36cbf12e0d 100644 --- a/src/mongo/executor/network_interface_asio.h +++ b/src/mongo/executor/network_interface_asio.h @@ -88,6 +88,8 @@ private: enum class State { kReady, kRunning, kShutdown }; + friend class AsyncOp; + /** * AsyncConnection encapsulates the per-connection state we maintain. */ @@ -122,7 +124,27 @@ private: */ class AsyncCommand { public: - AsyncCommand(AsyncConnection* conn, Message&& command, Date_t now); + /** + * Describes the variant of AsyncCommand this object represents. + */ + enum class CommandType { + /** + * An ordinary command of an unspecified Protocol. + */ + kRPC, + + /** + * A 'find' command that has been downconverted to an OP_QUERY. + */ + kDownConvertedFind, + + /** + * A 'getMore' command that has been downconverted to an OP_GET_MORE. + */ + kDownConvertedGetMore, + }; + + AsyncCommand(AsyncConnection* conn, CommandType type, Message&& command, Date_t now); NetworkInterfaceASIO::AsyncConnection& conn(); @@ -135,6 +157,8 @@ private: private: NetworkInterfaceASIO::AsyncConnection* const _conn; + const CommandType _type; + Message _toSend; Message _toRecv; @@ -149,7 +173,8 @@ private: */ class AsyncOp { public: - AsyncOp(const TaskExecutor::CallbackHandle& cbHandle, + AsyncOp(NetworkInterfaceASIO* net, + const TaskExecutor::CallbackHandle& cbHandle, const RemoteCommandRequest& request, const RemoteCommandCompletionFn& onFinish, Date_t now); @@ -166,11 +191,11 @@ private: // AsyncOp may run multiple commands over its lifetime (for example, an ismaster // command, the command provided to the NetworkInterface via startCommand(), etc.) // Calling beginCommand() resets internal state to prepare to run newCommand. - AsyncCommand& beginCommand(const RemoteCommandRequest& request, - rpc::Protocol protocol, - Date_t now); - AsyncCommand& beginCommand(Message&& newCommand, Date_t now); - AsyncCommand& command(); + Status beginCommand(const RemoteCommandRequest& request); + Status beginCommand(Message&& newCommand, + AsyncCommand::CommandType = AsyncCommand::CommandType::kRPC); + + AsyncCommand* command(); void finish(const TaskExecutor::ResponseStatus& status); @@ -183,6 +208,7 @@ private: void setOperationProtocol(rpc::Protocol proto); private: + NetworkInterfaceASIO* const _owner; // Information describing a task enqueued on the NetworkInterface // via a call to startCommand(). TaskExecutor::CallbackHandle _cbHandle; diff --git a/src/mongo/executor/network_interface_asio_auth.cpp b/src/mongo/executor/network_interface_asio_auth.cpp index 410d1bd4405..2bfa40e5d8e 100644 --- a/src/mongo/executor/network_interface_asio_auth.cpp +++ b/src/mongo/executor/network_interface_asio_auth.cpp @@ -58,12 +58,15 @@ void NetworkInterfaceASIO::_runIsMaster(AsyncOp* op) { requestBuilder.setCommandArgs(BSON("isMaster" << 1)); // Set current command to ismaster request and run - auto& cmd = op->beginCommand(std::move(*(requestBuilder.done())), now()); + auto beginStatus = op->beginCommand(std::move(*(requestBuilder.done()))); + if (!beginStatus.isOK()) { + return _completeOperation(op, beginStatus); + } // Callback to parse protocol information out of received ismaster response auto parseIsMaster = [this, op]() { - auto swCommandReply = op->command().response(rpc::Protocol::kOpQuery, now()); + auto swCommandReply = op->command()->response(rpc::Protocol::kOpQuery, now()); if (!swCommandReply.isOK()) { return _completeOperation(op, swCommandReply.getStatus()); } @@ -99,7 +102,7 @@ void NetworkInterfaceASIO::_runIsMaster(AsyncOp* op) { }; - _asyncRunCommand(&cmd, + _asyncRunCommand(op->command(), [this, op, parseIsMaster](std::error_code ec, size_t bytes) { _validateAndRun(op, ec, std::move(parseIsMaster)); }); @@ -127,14 +130,18 @@ void NetworkInterfaceASIO::_authenticate(AsyncOp* op) { // authenticateClient will use this to run auth-related commands over our connection. auto runCommandHook = [this, op](executor::RemoteCommandRequest request, auth::AuthCompletionHandler handler) { - auto& cmd = op->beginCommand(request, op->operationProtocol(), now()); + + auto beginStatus = op->beginCommand(request); + if (!beginStatus.isOK()) { + return handler(beginStatus); + } auto callAuthCompletionHandler = [this, op, handler]() { - auto authResponse = op->command().response(op->operationProtocol(), now()); + auto authResponse = op->command()->response(op->operationProtocol(), now()); handler(authResponse); }; - _asyncRunCommand(&cmd, + _asyncRunCommand(op->command(), [this, op, callAuthCompletionHandler](std::error_code ec, size_t bytes) { _validateAndRun(op, ec, callAuthCompletionHandler); }); diff --git a/src/mongo/executor/network_interface_asio_command.cpp b/src/mongo/executor/network_interface_asio_command.cpp index dd526430566..452b26a6aec 100644 --- a/src/mongo/executor/network_interface_asio_command.cpp +++ b/src/mongo/executor/network_interface_asio_command.cpp @@ -38,6 +38,7 @@ #include "mongo/db/dbmessage.h" #include "mongo/db/jsobj.h" #include "mongo/executor/async_stream_interface.h" +#include "mongo/executor/downconvert_find_and_getmore_commands.h" #include "mongo/rpc/factory.h" #include "mongo/rpc/protocol.h" #include "mongo/rpc/reply_interface.h" @@ -123,12 +124,37 @@ void asyncRecvMessageBody(AsyncStreamInterface& stream, stream.read(asio::buffer(mdView.data(), bodyLength), std::forward<Handler>(handler)); } +ResponseStatus decodeRPC(Message* received, rpc::Protocol protocol, Milliseconds elapsed) { + try { + // makeReply will throw if the reply is invalid + auto reply = rpc::makeReply(received); + if (reply->getProtocol() != protocol) { + auto requestProtocol = rpc::toString(static_cast<rpc::ProtocolSet>(protocol)); + if (!requestProtocol.isOK()) + return requestProtocol.getStatus(); + + return Status(ErrorCodes::RPCProtocolNegotiationFailed, + str::stream() << "Mismatched RPC protocols - request was '" + << requestProtocol.getValue().toString() << "' '" + << " but reply was '" << opToString(received->operation()) + << "'"); + } + auto ownedCommandReply = reply->getCommandReply().getOwned(); + auto ownedReplyMetadata = reply->getMetadata().getOwned(); + return {RemoteCommandResponse( + std::move(ownedCommandReply), std::move(ownedReplyMetadata), elapsed)}; + } catch (...) { + return exceptionToStatus(); + } +} + } // namespace NetworkInterfaceASIO::AsyncCommand::AsyncCommand(AsyncConnection* conn, + CommandType type, Message&& command, Date_t now) - : _conn(conn), _toSend(std::move(command)), _start(now) { + : _conn(conn), _type(type), _toSend(std::move(command)), _start(now) { _toSend.header().setResponseTo(0); } @@ -150,30 +176,20 @@ MSGHEADER::Value& NetworkInterfaceASIO::AsyncCommand::header() { ResponseStatus NetworkInterfaceASIO::AsyncCommand::response(rpc::Protocol protocol, Date_t now) { auto& received = _toRecv; - try { - auto reply = rpc::makeReply(&received); - - if (reply->getProtocol() != protocol) { - auto requestProtocol = rpc::toString(static_cast<rpc::ProtocolSet>(protocol)); - if (!requestProtocol.isOK()) - return requestProtocol.getStatus(); - - return Status(ErrorCodes::RPCProtocolNegotiationFailed, - str::stream() << "Mismatched RPC protocols - request was '" - << requestProtocol.getValue().toString() << "' '" - << " but reply was '" << opToString(received.operation()) - << "'"); + switch (_type) { + case CommandType::kRPC: { + return decodeRPC(&received, protocol, now - _start); + } + case CommandType::kDownConvertedFind: { + auto ns = DbMessage(_toSend).getns(); + return upconvertLegacyQueryResponse(_toSend.header().getId(), ns, received); + } + case CommandType::kDownConvertedGetMore: { + auto ns = DbMessage(_toSend).getns(); + return upconvertLegacyGetMoreResponse(_toSend.header().getId(), ns, received); } - - // unavoidable copy - auto ownedCommandReply = reply->getCommandReply().getOwned(); - auto ownedReplyMetadata = reply->getMetadata().getOwned(); - return ResponseStatus(RemoteCommandResponse( - std::move(ownedCommandReply), std::move(ownedReplyMetadata), now - _start)); - } catch (...) { - // makeReply can throw if the reply was invalid. - return exceptionToStatus(); } + MONGO_UNREACHABLE; } void NetworkInterfaceASIO::_startCommand(AsyncOp* op) { @@ -188,9 +204,12 @@ void NetworkInterfaceASIO::_startCommand(AsyncOp* op) { } void NetworkInterfaceASIO::_beginCommunication(AsyncOp* op) { - auto& cmd = op->beginCommand(op->request(), op->operationProtocol(), now()); + auto beginStatus = op->beginCommand(op->request()); + if (!beginStatus.isOK()) { + return _completeOperation(op, beginStatus); + } - _asyncRunCommand(&cmd, + _asyncRunCommand(op->command(), [this, op](std::error_code ec, size_t bytes) { _validateAndRun(op, ec, [this, op]() { _completedOpCallback(op); }); }); @@ -198,7 +217,7 @@ void NetworkInterfaceASIO::_beginCommunication(AsyncOp* op) { void NetworkInterfaceASIO::_completedOpCallback(AsyncOp* op) { // TODO: handle metadata readers. - auto response = op->command().response(op->operationProtocol(), now()); + auto response = op->command()->response(op->operationProtocol(), now()); _completeOperation(op, response); } @@ -290,10 +309,13 @@ void NetworkInterfaceASIO::_runConnectionHook(AsyncOp* op) { return _beginCommunication(op); } - auto& cmd = op->beginCommand(*optionalRequest, op->operationProtocol(), now()); + auto beginStatus = op->beginCommand(*optionalRequest); + if (!beginStatus.isOK()) { + return _completeOperation(op, beginStatus); + } auto finishHook = [this, op]() { - auto response = op->command().response(op->operationProtocol(), now()); + auto response = op->command()->response(op->operationProtocol(), now()); if (!response.isOK()) { return _completeOperation(op, response.getStatus()); @@ -311,7 +333,7 @@ void NetworkInterfaceASIO::_runConnectionHook(AsyncOp* op) { return _beginCommunication(op); }; - return _asyncRunCommand(&cmd, + return _asyncRunCommand(op->command(), [this, op, finishHook](std::error_code ec, std::size_t bytes) { _validateAndRun(op, ec, finishHook); }); diff --git a/src/mongo/executor/network_interface_asio_operation.cpp b/src/mongo/executor/network_interface_asio_operation.cpp index 087534ee936..069ed7f98bc 100644 --- a/src/mongo/executor/network_interface_asio_operation.cpp +++ b/src/mongo/executor/network_interface_asio_operation.cpp @@ -30,10 +30,14 @@ #include "mongo/platform/basic.h" -#include "mongo/executor/network_interface_asio.h" +#include "mongo/db/query/getmore_request.h" +#include "mongo/db/query/lite_parsed_query.h" #include "mongo/executor/async_stream_interface.h" +#include "mongo/executor/downconvert_find_and_getmore_commands.h" +#include "mongo/executor/network_interface_asio.h" #include "mongo/rpc/factory.h" #include "mongo/rpc/request_builder_interface.h" +#include "mongo/util/log.h" namespace mongo { namespace executor { @@ -61,11 +65,17 @@ std::unique_ptr<Message> messageFromRequest(const RemoteCommandRequest& request, } // namespace -NetworkInterfaceASIO::AsyncOp::AsyncOp(const TaskExecutor::CallbackHandle& cbHandle, +NetworkInterfaceASIO::AsyncOp::AsyncOp(NetworkInterfaceASIO* const owner, + const TaskExecutor::CallbackHandle& cbHandle, const RemoteCommandRequest& request, const RemoteCommandCompletionFn& onFinish, Date_t now) - : _cbHandle(cbHandle), _request(request), _onFinish(onFinish), _start(now), _canceled(0) {} + : _owner(owner), + _cbHandle(cbHandle), + _request(request), + _onFinish(onFinish), + _start(now), + _canceled(0) {} void NetworkInterfaceASIO::AsyncOp::cancel() { // An operation may be in mid-flight when it is canceled, so we @@ -91,26 +101,50 @@ void NetworkInterfaceASIO::AsyncOp::setConnection(AsyncConnection&& conn) { _connection = std::move(conn); } -NetworkInterfaceASIO::AsyncCommand& NetworkInterfaceASIO::AsyncOp::beginCommand( - Message&& newCommand, Date_t now) { +Status NetworkInterfaceASIO::AsyncOp::beginCommand(Message&& newCommand, + AsyncCommand::CommandType type) { // NOTE: We operate based on the assumption that AsyncOp's // AsyncConnection does not change over its lifetime. invariant(_connection.is_initialized()); // Construct a new AsyncCommand object for each command. - _command.emplace(_connection.get_ptr(), std::move(newCommand), now); - return _command.get(); + _command.emplace(_connection.get_ptr(), type, std::move(newCommand), _owner->now()); + return Status::OK(); } -NetworkInterfaceASIO::AsyncCommand& NetworkInterfaceASIO::AsyncOp::beginCommand( - const RemoteCommandRequest& request, rpc::Protocol protocol, Date_t now) { - auto newCommand = messageFromRequest(request, protocol); - return beginCommand(std::move(*newCommand), now); +Status NetworkInterfaceASIO::AsyncOp::beginCommand(const RemoteCommandRequest& request) { + // Check if we need to downconvert find or getMore commands. + StringData commandName = request.cmdObj.firstElement().fieldNameStringData(); + const auto isFindCmd = commandName == LiteParsedQuery::kFindCommandName; + const auto isGetMoreCmd = commandName == GetMoreRequest::kGetMoreCommandName; + const auto isFindOrGetMoreCmd = isFindCmd || isGetMoreCmd; + + // If we aren't sending a find or getMore, or the server supports OP_COMMAND we don't have + // to worry about downconversion. + if (!isFindOrGetMoreCmd || connection().serverProtocols() == rpc::supports::kAll) { + auto newCommand = messageFromRequest(request, operationProtocol()); + return beginCommand(std::move(*newCommand), AsyncCommand::CommandType::kRPC); + } else if (isFindCmd) { + auto downconvertedFind = downconvertFindCommandRequest(request); + if (!downconvertedFind.isOK()) { + return downconvertedFind.getStatus(); + } + return beginCommand(std::move(downconvertedFind.getValue()), + AsyncCommand::CommandType::kDownConvertedFind); + } else { + invariant(isGetMoreCmd); + auto downconvertedGetMore = downconvertGetMoreCommandRequest(request); + if (!downconvertedGetMore.isOK()) { + return downconvertedGetMore.getStatus(); + } + return beginCommand(std::move(downconvertedGetMore.getValue()), + AsyncCommand::CommandType::kDownConvertedGetMore); + } } -NetworkInterfaceASIO::AsyncCommand& NetworkInterfaceASIO::AsyncOp::command() { +NetworkInterfaceASIO::AsyncCommand* NetworkInterfaceASIO::AsyncOp::command() { invariant(_command.is_initialized()); - return _command.get(); + return _command.get_ptr(); } void NetworkInterfaceASIO::AsyncOp::finish(const ResponseStatus& status) { |
