summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/base/data_range.h4
-rw-r--r--src/mongo/client/SConscript5
-rw-r--r--src/mongo/client/remote_command_runner_impl.cpp153
-rw-r--r--src/mongo/executor/SConscript16
-rw-r--r--src/mongo/executor/downconvert_find_and_getmore_commands.cpp274
-rw-r--r--src/mongo/executor/downconvert_find_and_getmore_commands.h81
-rw-r--r--src/mongo/executor/network_interface_asio.cpp2
-rw-r--r--src/mongo/executor/network_interface_asio.h40
-rw-r--r--src/mongo/executor/network_interface_asio_auth.cpp19
-rw-r--r--src/mongo/executor/network_interface_asio_command.cpp80
-rw-r--r--src/mongo/executor/network_interface_asio_operation.cpp60
11 files changed, 556 insertions, 178 deletions
diff --git a/src/mongo/base/data_range.h b/src/mongo/base/data_range.h
index 9226e6c133a..ee6da34ee7a 100644
--- a/src/mongo/base/data_range.h
+++ b/src/mongo/base/data_range.h
@@ -62,6 +62,10 @@ public:
return _end - _begin;
}
+ bool empty() const {
+ return length() == 0;
+ }
+
template <typename T>
Status read(T* t, size_t offset = 0) const {
if (offset > length()) {
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript
index f8e130a76b8..63862a8f71c 100644
--- a/src/mongo/client/SConscript
+++ b/src/mongo/client/SConscript
@@ -166,12 +166,9 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
- '$BUILD_DIR/mongo/db/common',
- '$BUILD_DIR/mongo/db/query/command_request_response',
- '$BUILD_DIR/mongo/db/query/lite_parsed_query',
+ '$BUILD_DIR/mongo/executor/downconvert_find_and_getmore_commands',
'$BUILD_DIR/mongo/executor/task_executor_interface',
'$BUILD_DIR/mongo/rpc/metadata',
- '$BUILD_DIR/mongo/rpc/protocol',
'$BUILD_DIR/mongo/util/net/hostandport',
'connection_pool',
],
diff --git a/src/mongo/client/remote_command_runner_impl.cpp b/src/mongo/client/remote_command_runner_impl.cpp
index 6bd8d9375fc..486e039b148 100644
--- a/src/mongo/client/remote_command_runner_impl.cpp
+++ b/src/mongo/client/remote_command_runner_impl.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/getmore_request.h"
+#include "mongo/executor/downconvert_find_and_getmore_commands.h"
#include "mongo/executor/network_connection_hook.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/protocol.h"
@@ -89,135 +90,51 @@ Status getStatusFromCursorResult(DBClientCursor& cursor) {
getErrField(error).valuestrsafe());
}
-/**
- * Downconverts the specified find command to a find protocol operation and sends it to the
- * server on the specified connection.
- */
-Status runDownconvertedFindCommand(DBClientConnection* conn,
- const std::string& dbname,
- const BSONObj& cmdObj,
- BSONObj* output) {
- const NamespaceString nss(dbname, cmdObj.firstElement().String());
- if (!nss.isValid()) {
- return {ErrorCodes::InvalidNamespace,
- str::stream() << "Invalid collection name: " << nss.ns()};
- }
- const std::string& ns = nss.ns();
-
- // It is a little heavy handed to use LiteParsedQuery to convert the command object to
- // query() arguments but we get validation and consistent behavior with the find
- // command implementation on the remote server.
- auto lpqStatus = LiteParsedQuery::makeFromFindCommand(nss, cmdObj, false);
- if (!lpqStatus.isOK()) {
- *output = getCommandResultFromStatus(lpqStatus.getStatus());
- return lpqStatus.getStatus();
- }
-
- auto& lpq = lpqStatus.getValue();
+using RequestDownconverter = StatusWith<Message>(*)(const RemoteCommandRequest&);
+using ReplyUpconverter = StatusWith<RemoteCommandResponse>(*)(std::uint32_t requestId,
+ StringData cursorNamespace,
+ const Message& response);
- // We are downconverting a find command, and find command can only have ntoreturn
- // if it was generated by mongos.
- invariant(!lpq->getNToReturn());
-
- Query query(lpq->getFilter());
- if (!lpq->getSort().isEmpty()) {
- query.sort(lpq->getSort());
- }
- if (!lpq->getHint().isEmpty()) {
- query.hint(lpq->getHint());
- }
- if (!lpq->getMin().isEmpty()) {
- query.minKey(lpq->getMin());
- }
- if (!lpq->getMax().isEmpty()) {
- query.minKey(lpq->getMax());
- }
- if (lpq->isExplain()) {
- query.explain();
- }
- if (lpq->isSnapshot()) {
- query.snapshot();
- }
- int nToReturn = lpq->getLimit().value_or(0) * -1;
- int nToSkip = lpq->getSkip().value_or(0);
- const BSONObj* fieldsToReturn = &lpq->getProj();
- int queryOptions = lpq->getOptions();
- int batchSize = lpq->getBatchSize().value_or(0);
-
- std::unique_ptr<DBClientCursor> cursor =
- conn->query(ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize);
-
- if (!cursor) {
- return {ErrorCodes::HostUnreachable,
- str::stream() << "cursor initialization failed due to connection problems with "
- << conn->getServerAddress()};
+template <RequestDownconverter downconvertRequest, ReplyUpconverter upconvertReply>
+StatusWith<RemoteCommandResponse> runDownconvertedCommand(DBClientConnection* conn,
+ const RemoteCommandRequest& request) {
+ auto swDownconvertedRequest = downconvertRequest(request);
+ if (!swDownconvertedRequest.isOK()) {
+ return swDownconvertedRequest.getStatus();
}
- cursor->decouple();
+ Message requestMsg{std::move(swDownconvertedRequest.getValue())};
+ Message responseMsg;
- Status status = getStatusFromCursorResult(*cursor);
- if (!status.isOK()) {
- *output = getCommandResultFromStatus(status);
- return status;
+ try {
+ conn->call(requestMsg, responseMsg);
+ } catch (...) {
+ return exceptionToStatus();
}
- BSONArrayBuilder batch;
- while (cursor->moreInCurrentBatch()) {
- batch.append(cursor->next());
- }
+ auto messageId = requestMsg.header().getId();
- BSONObjBuilder result;
- appendCursorResponseObject(cursor->getCursorId(), ns, batch.arr(), &result);
- Command::appendCommandStatus(result, Status::OK());
- *output = result.obj();
- return Status::OK();
+ return upconvertReply(messageId, DbMessage(requestMsg).getns(), responseMsg);
}
/**
- * Downconverts the specified getMore command to legacy getMore operation and sends it to the
+ * Downconverts the specified find command to a find protocol operation and sends it to the
* server on the specified connection.
*/
-Status runDownconvertedGetMoreCommand(DBClientConnection* conn,
- const std::string& dbname,
- const BSONObj& cmdObj,
- BSONObj* output) {
- StatusWith<GetMoreRequest> parseResult = GetMoreRequest::parseFromBSON(dbname, cmdObj);
- if (!parseResult.isOK()) {
- const Status& status = parseResult.getStatus();
- *output = getCommandResultFromStatus(status);
- return status;
- }
-
- const GetMoreRequest& req = parseResult.getValue();
- const std::string& ns = req.nss.ns();
-
- std::unique_ptr<DBClientCursor> cursor =
- conn->getMore(ns, req.cursorid, req.batchSize.value_or(0));
-
- if (!cursor) {
- return {ErrorCodes::HostUnreachable,
- str::stream() << "cursor initialization failed due to connection problems with "
- << conn->getServerAddress()};
- }
-
- cursor->decouple();
-
- Status status = getStatusFromCursorResult(*cursor);
- if (!status.isOK()) {
- *output = getCommandResultFromStatus(status);
- return status;
- }
-
- BSONArrayBuilder batch;
- while (cursor->moreInCurrentBatch()) {
- batch.append(cursor->next());
- }
+StatusWith<RemoteCommandResponse> runDownconvertedFindCommand(DBClientConnection* conn,
+ const RemoteCommandRequest& request) {
+ return runDownconvertedCommand<executor::downconvertFindCommandRequest,
+ executor::upconvertLegacyQueryResponse>(conn, request);
+}
- BSONObjBuilder result;
- appendGetMoreResponseObject(cursor->getCursorId(), ns, batch.arr(), &result);
- Command::appendCommandStatus(result, Status::OK());
- *output = result.obj();
- return Status::OK();
+/**
+ * Downconverts the specified getMore command to legacy getMore operation and sends it to the
+ * server on the specified connection.
+ */
+StatusWith<RemoteCommandResponse> runDownconvertedGetMoreCommand(
+ DBClientConnection* conn, const RemoteCommandRequest& request) {
+ return runDownconvertedCommand<executor::downconvertGetMoreCommandRequest,
+ executor::upconvertLegacyGetMoreResponse>(conn, request);
}
} // namespace
@@ -285,9 +202,9 @@ StatusWith<RemoteCommandResponse> RemoteCommandRunnerImpl::runCommand(
output = commandResponse->getCommandReply().getOwned();
metadata = commandResponse->getMetadata().getOwned();
} else if (isFindCmd) {
- runDownconvertedFindCommand(conn.get(), request.dbname, request.cmdObj, &output);
+ return runDownconvertedFindCommand(conn.get(), request);
} else if (isGetMoreCmd) {
- runDownconvertedGetMoreCommand(conn.get(), request.dbname, request.cmdObj, &output);
+ return runDownconvertedGetMoreCommand(conn.get(), request);
}
const Date_t requestFinishDate = Date_t::now();
diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript
index 3d3d6b05f7a..c3d5f578552 100644
--- a/src/mongo/executor/SConscript
+++ b/src/mongo/executor/SConscript
@@ -88,6 +88,7 @@ env.Library(
'$BUILD_DIR/mongo/db/auth/authcommon',
'$BUILD_DIR/mongo/rpc/rpc',
'$BUILD_DIR/third_party/shim_asio',
+ 'downconvert_find_and_getmore_commands',
'network_interface',
'task_executor_interface',
])
@@ -169,3 +170,18 @@ env.CppUnitTest(
'thread_pool_task_executor_test_fixture',
]
)
+
+env.Library(
+ target='downconvert_find_and_getmore_commands',
+ source=[
+ 'downconvert_find_and_getmore_commands.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/client/clientdriver',
+ '$BUILD_DIR/mongo/db/query/command_request_response',
+ '$BUILD_DIR/mongo/db/query/command_request_response',
+ '$BUILD_DIR/mongo/db/query/lite_parsed_query',
+ '$BUILD_DIR/mongo/rpc/protocol',
+ ],
+)
diff --git a/src/mongo/executor/downconvert_find_and_getmore_commands.cpp b/src/mongo/executor/downconvert_find_and_getmore_commands.cpp
new file mode 100644
index 00000000000..b031fe60df1
--- /dev/null
+++ b/src/mongo/executor/downconvert_find_and_getmore_commands.cpp
@@ -0,0 +1,274 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/executor/downconvert_find_and_getmore_commands.h"
+
+#include <memory>
+#include <string>
+#include <tuple>
+
+#include "mongo/base/data_range_cursor.h"
+#include "mongo/base/data_type_validated.h"
+#include "mongo/base/status_with.h"
+#include "mongo/client/constants.h"
+#include "mongo/client/dbclientinterface.h"
+#include "mongo/db/cursor_id.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/query/cursor_response.h"
+#include "mongo/db/query/getmore_request.h"
+#include "mongo/executor/remote_command_request.h"
+#include "mongo/executor/remote_command_response.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/rpc/metadata/server_selection_metadata.h"
+#include "mongo/rpc/object_check.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/net/message.h"
+
+namespace mongo {
+namespace executor {
+
+namespace {
+
+StatusWith<std::tuple<CursorId, BSONArray>> getBatchFromReply(std::uint32_t requestId,
+ const Message& response) {
+ auto header = response.header();
+ if (header.getOperation() != mongo::opReply) {
+ return {ErrorCodes::ProtocolError,
+ str::stream() << "Expected to be decoding an OP_REPLY but got "
+ << mongo::opToString(header.getOperation())};
+ }
+
+ if (header.getResponseTo() != requestId) {
+ return {ErrorCodes::ProtocolError,
+ str::stream() << "responseTo field of OP_REPLY header with value '"
+ << header.getResponseTo() << "' does not match requestId '"
+ << requestId << "'"};
+ }
+
+ if ((header.dataLen() < 0) ||
+ (static_cast<std::size_t>(header.dataLen()) > mongo::MaxMessageSizeBytes)) {
+ return {ErrorCodes::InvalidLength,
+ str::stream() << "Received message has invalid length field with value "
+ << header.dataLen()};
+ }
+
+ QueryResult::View qr = response.header().view2ptr();
+
+ auto resultFlags = qr.getResultFlags();
+
+ if (resultFlags & ResultFlag_CursorNotFound) {
+ return {ErrorCodes::CursorNotFound,
+ str::stream() << "Cursor with id '" << qr.getCursorId() << "' not found"};
+ }
+
+ // Use CDRC directly instead of DocumentRange as DocumentRange has a throwing API.
+ ConstDataRangeCursor cdrc{qr.data(), qr.data() + header.dataLen()};
+
+ if (resultFlags & ResultFlag_ErrSet) {
+ if (qr.getNReturned() != 1) {
+ return {ErrorCodes::BadValue,
+ str::stream() << "ResultFlag_ErrSet flag set on reply, but nReturned was '"
+ << qr.getNReturned() << "' - expected 1"};
+ }
+ // Convert error document to a Status.
+ // Will throw if first document is invalid BSON.
+ auto first = cdrc.readAndAdvance<Validated<BSONObj>>();
+ if (!first.isOK()) {
+ return first.getStatus();
+ }
+
+ // Convert error document to a status.
+ return getStatusFromCommandResult(first.getValue());
+ }
+
+ Validated<BSONObj> nextObj;
+ BSONArrayBuilder batch;
+ while (!cdrc.empty() && batch.arrSize() < qr.getNReturned()) {
+ auto readStatus = cdrc.readAndAdvance(&nextObj);
+ if (!readStatus.isOK()) {
+ return readStatus;
+ }
+ batch.append(nextObj.val);
+ }
+ if (qr.getNReturned() != batch.arrSize()) {
+ return {ErrorCodes::InvalidLength,
+ str::stream() << "Count of documents in OP_REPLY message (" << batch.arrSize()
+ << ") did not match the value specified in the nReturned field ("
+ << qr.getNReturned() << ")"};
+ }
+
+ return {std::make_tuple(qr.getCursorId(), batch.arr())};
+}
+
+} // namespace
+
+StatusWith<Message> downconvertFindCommandRequest(const RemoteCommandRequest& request) {
+ const auto& cmdObj = request.cmdObj;
+ const NamespaceString nss(request.dbname, cmdObj.firstElement().String());
+ if (!nss.isValid()) {
+ return {ErrorCodes::InvalidNamespace,
+ str::stream() << "Invalid collection name: " << nss.ns()};
+ }
+
+ const std::string& ns = nss.ns();
+
+ // It is a little heavy handed to use LiteParsedQuery to convert the command object to
+ // query() arguments but we get validation and consistent behavior with the find
+ // command implementation on the remote server.
+ auto lpqStatus = LiteParsedQuery::makeFromFindCommand(nss, cmdObj, false);
+ if (!lpqStatus.isOK()) {
+ return lpqStatus.getStatus();
+ }
+
+ auto lpq = std::move(lpqStatus.getValue());
+
+ // We are downconverting a find command, and find command can only have ntoreturn
+ // if it was generated by mongos.
+ invariant(!lpq->getNToReturn());
+ Query query(lpq->getFilter());
+ if (!lpq->getSort().isEmpty()) {
+ query.sort(lpq->getSort());
+ }
+ if (!lpq->getHint().isEmpty()) {
+ query.hint(lpq->getHint());
+ }
+ if (!lpq->getMin().isEmpty()) {
+ query.minKey(lpq->getMin());
+ }
+ if (!lpq->getMax().isEmpty()) {
+ query.minKey(lpq->getMax());
+ }
+ if (lpq->isExplain()) {
+ query.explain();
+ }
+ if (lpq->isSnapshot()) {
+ query.snapshot();
+ }
+
+ const int nToReturn = lpq->getLimit().value_or(0) * -1;
+ const int nToSkip = lpq->getSkip().value_or(0);
+ const BSONObj* fieldsToReturn = &lpq->getProj();
+ int queryOptions = lpq->getOptions(); // non-const so we can set slaveOk if we need to
+ const int batchSize = lpq->getBatchSize().value_or(0);
+
+ const int nextBatchSize = [batchSize, nToReturn]() {
+ if (nToReturn == 0)
+ return batchSize;
+ if (batchSize == 0)
+ return nToReturn;
+ return batchSize < nToReturn ? batchSize : nToReturn;
+ }();
+
+ // We can't downconvert all metadata, since we aren't sending a command, but we do need to
+ // downconvert $secondaryOk to the slaveOK bit.
+ auto ssm = rpc::ServerSelectionMetadata::readFromMetadata(request.metadata);
+ if (!ssm.isOK()) {
+ return ssm.getStatus();
+ }
+ if (ssm.getValue().isSecondaryOk()) {
+ queryOptions |= mongo::QueryOption_SlaveOk;
+ }
+
+ Message message;
+ assembleQueryRequest(
+ ns, query.obj, nextBatchSize, nToSkip, fieldsToReturn, queryOptions, message);
+
+ return {std::move(message)};
+}
+
+StatusWith<RemoteCommandResponse> upconvertLegacyQueryResponse(std::uint32_t requestId,
+ StringData cursorNamespace,
+ const Message& response) {
+ auto swBatch = getBatchFromReply(requestId, response);
+ if (!swBatch.isOK()) {
+ return swBatch.getStatus();
+ }
+
+ BSONArray batch;
+ CursorId cursorId;
+ std::tie(cursorId, batch) = std::move(swBatch.getValue());
+
+ BSONObjBuilder result;
+ appendCursorResponseObject(cursorId, cursorNamespace, std::move(batch), &result);
+ // Using Command::appendCommandStatus would create a circular dep, so it's simpler to just do
+ // this.
+ result.append("ok", 1.0);
+
+ RemoteCommandResponse upconvertedResponse;
+ upconvertedResponse.data = result.obj();
+
+ return {std::move(upconvertedResponse)};
+}
+
+StatusWith<Message> downconvertGetMoreCommandRequest(const RemoteCommandRequest& request) {
+ auto swGetMoreRequest = GetMoreRequest::parseFromBSON(request.dbname, request.cmdObj);
+ if (!swGetMoreRequest.isOK()) {
+ return swGetMoreRequest.getStatus();
+ }
+
+ auto getMoreRequest = std::move(swGetMoreRequest.getValue());
+
+ BufBuilder b;
+ b.appendNum(std::int32_t{0}); // reserved bits
+ b.appendStr(getMoreRequest.nss.ns());
+ // Without this static cast, we will append batchSize as an int64 and get an invalid message.
+ b.appendNum(static_cast<std::int32_t>(getMoreRequest.batchSize.value_or(0)));
+ b.appendNum(getMoreRequest.cursorid);
+ Message m;
+ m.setData(dbGetMore, b.buf(), b.len());
+
+ return {std::move(m)};
+}
+
+StatusWith<RemoteCommandResponse> upconvertLegacyGetMoreResponse(std::uint32_t requestId,
+ StringData cursorNamespace,
+ const Message& response) {
+ auto swBatch = getBatchFromReply(requestId, response);
+ if (!swBatch.isOK()) {
+ return swBatch.getStatus();
+ }
+
+ BSONArray batch;
+ CursorId cursorId;
+
+ std::tie(cursorId, batch) = std::move(swBatch.getValue());
+
+ BSONObjBuilder result;
+ appendGetMoreResponseObject(cursorId, cursorNamespace, std::move(batch), &result);
+ result.append("ok", 1.0);
+
+ RemoteCommandResponse resp;
+ resp.data = result.obj();
+
+ return {std::move(resp)};
+}
+
+} // namespace mongo
+} // namespace executor
diff --git a/src/mongo/executor/downconvert_find_and_getmore_commands.h b/src/mongo/executor/downconvert_find_and_getmore_commands.h
new file mode 100644
index 00000000000..dea13150ddd
--- /dev/null
+++ b/src/mongo/executor/downconvert_find_and_getmore_commands.h
@@ -0,0 +1,81 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include <cstdint>
+#include <memory>
+
+namespace mongo {
+
+class Message;
+
+template <typename T>
+class StatusWith;
+
+class StringData;
+class NamespaceString;
+
+namespace executor {
+struct RemoteCommandRequest;
+struct RemoteCommandResponse;
+
+/**
+ * Downconverts a find command request to the legacy (non-command) OP_QUERY format. The returned
+ * message is formed, with the exception of the messageId header field, which must be set by
+ * the caller before sending the message over the wire. Note that our legacy socket code sets the
+ * messageId in MessagingPort::say().
+ */
+StatusWith<Message> downconvertFindCommandRequest(const RemoteCommandRequest& request);
+
+/**
+ * Upconverts the OP_REPLY received in response to a legacy OP_QUERY to a semantically equivalent
+ * find command response. The 'requestId' parameter is the messageId of the original OP_QUERY, and
+ * the 'cursorNamespace' is the full namespace of the collection the query ran on.
+ */
+StatusWith<RemoteCommandResponse> upconvertLegacyQueryResponse(std::uint32_t requestId,
+ StringData cursorNamespace,
+ const Message& response);
+
+/**
+ * Downconverts a getMore command request to the legacy OP_GET_MORE format. The returned message
+ * is fully formed, with the exception of the messageId header field, which must be set by the
+ * the caller before sending the message over the wire. Note that our legacy socket code sets the
+ * messageId in MessagingPort::say().
+ */
+StatusWith<Message> downconvertGetMoreCommandRequest(const RemoteCommandRequest& request);
+
+/**
+ * Upconverts the OP_REPLY received in response to a legacy OP_GET_MORE to a semantically equivalent
+ * getMore command response. The 'requestId' parameter is the messageId of the original OP_GET_MORE,
+ * and the 'curesorNamespace' is the full namespace of the collection the original query ran on.
+ */
+StatusWith<RemoteCommandResponse> upconvertLegacyGetMoreResponse(std::uint32_t requestId,
+ StringData cursorNamespace,
+ const Message& response);
+
+} // namespace mongo
+} // namespace executor
diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp
index 4b40543a53b..3e43011c7b2 100644
--- a/src/mongo/executor/network_interface_asio.cpp
+++ b/src/mongo/executor/network_interface_asio.cpp
@@ -126,7 +126,7 @@ Date_t NetworkInterfaceASIO::now() {
void NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
const RemoteCommandRequest& request,
const RemoteCommandCompletionFn& onFinish) {
- auto ownedOp = stdx::make_unique<AsyncOp>(cbHandle, request, onFinish, now());
+ auto ownedOp = stdx::make_unique<AsyncOp>(this, cbHandle, request, onFinish, now());
AsyncOp* op = ownedOp.get();
diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h
index 39fd9b57d6c..f36cbf12e0d 100644
--- a/src/mongo/executor/network_interface_asio.h
+++ b/src/mongo/executor/network_interface_asio.h
@@ -88,6 +88,8 @@ private:
enum class State { kReady, kRunning, kShutdown };
+ friend class AsyncOp;
+
/**
* AsyncConnection encapsulates the per-connection state we maintain.
*/
@@ -122,7 +124,27 @@ private:
*/
class AsyncCommand {
public:
- AsyncCommand(AsyncConnection* conn, Message&& command, Date_t now);
+ /**
+ * Describes the variant of AsyncCommand this object represents.
+ */
+ enum class CommandType {
+ /**
+ * An ordinary command of an unspecified Protocol.
+ */
+ kRPC,
+
+ /**
+ * A 'find' command that has been downconverted to an OP_QUERY.
+ */
+ kDownConvertedFind,
+
+ /**
+ * A 'getMore' command that has been downconverted to an OP_GET_MORE.
+ */
+ kDownConvertedGetMore,
+ };
+
+ AsyncCommand(AsyncConnection* conn, CommandType type, Message&& command, Date_t now);
NetworkInterfaceASIO::AsyncConnection& conn();
@@ -135,6 +157,8 @@ private:
private:
NetworkInterfaceASIO::AsyncConnection* const _conn;
+ const CommandType _type;
+
Message _toSend;
Message _toRecv;
@@ -149,7 +173,8 @@ private:
*/
class AsyncOp {
public:
- AsyncOp(const TaskExecutor::CallbackHandle& cbHandle,
+ AsyncOp(NetworkInterfaceASIO* net,
+ const TaskExecutor::CallbackHandle& cbHandle,
const RemoteCommandRequest& request,
const RemoteCommandCompletionFn& onFinish,
Date_t now);
@@ -166,11 +191,11 @@ private:
// AsyncOp may run multiple commands over its lifetime (for example, an ismaster
// command, the command provided to the NetworkInterface via startCommand(), etc.)
// Calling beginCommand() resets internal state to prepare to run newCommand.
- AsyncCommand& beginCommand(const RemoteCommandRequest& request,
- rpc::Protocol protocol,
- Date_t now);
- AsyncCommand& beginCommand(Message&& newCommand, Date_t now);
- AsyncCommand& command();
+ Status beginCommand(const RemoteCommandRequest& request);
+ Status beginCommand(Message&& newCommand,
+ AsyncCommand::CommandType = AsyncCommand::CommandType::kRPC);
+
+ AsyncCommand* command();
void finish(const TaskExecutor::ResponseStatus& status);
@@ -183,6 +208,7 @@ private:
void setOperationProtocol(rpc::Protocol proto);
private:
+ NetworkInterfaceASIO* const _owner;
// Information describing a task enqueued on the NetworkInterface
// via a call to startCommand().
TaskExecutor::CallbackHandle _cbHandle;
diff --git a/src/mongo/executor/network_interface_asio_auth.cpp b/src/mongo/executor/network_interface_asio_auth.cpp
index 410d1bd4405..2bfa40e5d8e 100644
--- a/src/mongo/executor/network_interface_asio_auth.cpp
+++ b/src/mongo/executor/network_interface_asio_auth.cpp
@@ -58,12 +58,15 @@ void NetworkInterfaceASIO::_runIsMaster(AsyncOp* op) {
requestBuilder.setCommandArgs(BSON("isMaster" << 1));
// Set current command to ismaster request and run
- auto& cmd = op->beginCommand(std::move(*(requestBuilder.done())), now());
+ auto beginStatus = op->beginCommand(std::move(*(requestBuilder.done())));
+ if (!beginStatus.isOK()) {
+ return _completeOperation(op, beginStatus);
+ }
// Callback to parse protocol information out of received ismaster response
auto parseIsMaster = [this, op]() {
- auto swCommandReply = op->command().response(rpc::Protocol::kOpQuery, now());
+ auto swCommandReply = op->command()->response(rpc::Protocol::kOpQuery, now());
if (!swCommandReply.isOK()) {
return _completeOperation(op, swCommandReply.getStatus());
}
@@ -99,7 +102,7 @@ void NetworkInterfaceASIO::_runIsMaster(AsyncOp* op) {
};
- _asyncRunCommand(&cmd,
+ _asyncRunCommand(op->command(),
[this, op, parseIsMaster](std::error_code ec, size_t bytes) {
_validateAndRun(op, ec, std::move(parseIsMaster));
});
@@ -127,14 +130,18 @@ void NetworkInterfaceASIO::_authenticate(AsyncOp* op) {
// authenticateClient will use this to run auth-related commands over our connection.
auto runCommandHook = [this, op](executor::RemoteCommandRequest request,
auth::AuthCompletionHandler handler) {
- auto& cmd = op->beginCommand(request, op->operationProtocol(), now());
+
+ auto beginStatus = op->beginCommand(request);
+ if (!beginStatus.isOK()) {
+ return handler(beginStatus);
+ }
auto callAuthCompletionHandler = [this, op, handler]() {
- auto authResponse = op->command().response(op->operationProtocol(), now());
+ auto authResponse = op->command()->response(op->operationProtocol(), now());
handler(authResponse);
};
- _asyncRunCommand(&cmd,
+ _asyncRunCommand(op->command(),
[this, op, callAuthCompletionHandler](std::error_code ec, size_t bytes) {
_validateAndRun(op, ec, callAuthCompletionHandler);
});
diff --git a/src/mongo/executor/network_interface_asio_command.cpp b/src/mongo/executor/network_interface_asio_command.cpp
index dd526430566..452b26a6aec 100644
--- a/src/mongo/executor/network_interface_asio_command.cpp
+++ b/src/mongo/executor/network_interface_asio_command.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/dbmessage.h"
#include "mongo/db/jsobj.h"
#include "mongo/executor/async_stream_interface.h"
+#include "mongo/executor/downconvert_find_and_getmore_commands.h"
#include "mongo/rpc/factory.h"
#include "mongo/rpc/protocol.h"
#include "mongo/rpc/reply_interface.h"
@@ -123,12 +124,37 @@ void asyncRecvMessageBody(AsyncStreamInterface& stream,
stream.read(asio::buffer(mdView.data(), bodyLength), std::forward<Handler>(handler));
}
+ResponseStatus decodeRPC(Message* received, rpc::Protocol protocol, Milliseconds elapsed) {
+ try {
+ // makeReply will throw if the reply is invalid
+ auto reply = rpc::makeReply(received);
+ if (reply->getProtocol() != protocol) {
+ auto requestProtocol = rpc::toString(static_cast<rpc::ProtocolSet>(protocol));
+ if (!requestProtocol.isOK())
+ return requestProtocol.getStatus();
+
+ return Status(ErrorCodes::RPCProtocolNegotiationFailed,
+ str::stream() << "Mismatched RPC protocols - request was '"
+ << requestProtocol.getValue().toString() << "' '"
+ << " but reply was '" << opToString(received->operation())
+ << "'");
+ }
+ auto ownedCommandReply = reply->getCommandReply().getOwned();
+ auto ownedReplyMetadata = reply->getMetadata().getOwned();
+ return {RemoteCommandResponse(
+ std::move(ownedCommandReply), std::move(ownedReplyMetadata), elapsed)};
+ } catch (...) {
+ return exceptionToStatus();
+ }
+}
+
} // namespace
NetworkInterfaceASIO::AsyncCommand::AsyncCommand(AsyncConnection* conn,
+ CommandType type,
Message&& command,
Date_t now)
- : _conn(conn), _toSend(std::move(command)), _start(now) {
+ : _conn(conn), _type(type), _toSend(std::move(command)), _start(now) {
_toSend.header().setResponseTo(0);
}
@@ -150,30 +176,20 @@ MSGHEADER::Value& NetworkInterfaceASIO::AsyncCommand::header() {
ResponseStatus NetworkInterfaceASIO::AsyncCommand::response(rpc::Protocol protocol, Date_t now) {
auto& received = _toRecv;
- try {
- auto reply = rpc::makeReply(&received);
-
- if (reply->getProtocol() != protocol) {
- auto requestProtocol = rpc::toString(static_cast<rpc::ProtocolSet>(protocol));
- if (!requestProtocol.isOK())
- return requestProtocol.getStatus();
-
- return Status(ErrorCodes::RPCProtocolNegotiationFailed,
- str::stream() << "Mismatched RPC protocols - request was '"
- << requestProtocol.getValue().toString() << "' '"
- << " but reply was '" << opToString(received.operation())
- << "'");
+ switch (_type) {
+ case CommandType::kRPC: {
+ return decodeRPC(&received, protocol, now - _start);
+ }
+ case CommandType::kDownConvertedFind: {
+ auto ns = DbMessage(_toSend).getns();
+ return upconvertLegacyQueryResponse(_toSend.header().getId(), ns, received);
+ }
+ case CommandType::kDownConvertedGetMore: {
+ auto ns = DbMessage(_toSend).getns();
+ return upconvertLegacyGetMoreResponse(_toSend.header().getId(), ns, received);
}
-
- // unavoidable copy
- auto ownedCommandReply = reply->getCommandReply().getOwned();
- auto ownedReplyMetadata = reply->getMetadata().getOwned();
- return ResponseStatus(RemoteCommandResponse(
- std::move(ownedCommandReply), std::move(ownedReplyMetadata), now - _start));
- } catch (...) {
- // makeReply can throw if the reply was invalid.
- return exceptionToStatus();
}
+ MONGO_UNREACHABLE;
}
void NetworkInterfaceASIO::_startCommand(AsyncOp* op) {
@@ -188,9 +204,12 @@ void NetworkInterfaceASIO::_startCommand(AsyncOp* op) {
}
void NetworkInterfaceASIO::_beginCommunication(AsyncOp* op) {
- auto& cmd = op->beginCommand(op->request(), op->operationProtocol(), now());
+ auto beginStatus = op->beginCommand(op->request());
+ if (!beginStatus.isOK()) {
+ return _completeOperation(op, beginStatus);
+ }
- _asyncRunCommand(&cmd,
+ _asyncRunCommand(op->command(),
[this, op](std::error_code ec, size_t bytes) {
_validateAndRun(op, ec, [this, op]() { _completedOpCallback(op); });
});
@@ -198,7 +217,7 @@ void NetworkInterfaceASIO::_beginCommunication(AsyncOp* op) {
void NetworkInterfaceASIO::_completedOpCallback(AsyncOp* op) {
// TODO: handle metadata readers.
- auto response = op->command().response(op->operationProtocol(), now());
+ auto response = op->command()->response(op->operationProtocol(), now());
_completeOperation(op, response);
}
@@ -290,10 +309,13 @@ void NetworkInterfaceASIO::_runConnectionHook(AsyncOp* op) {
return _beginCommunication(op);
}
- auto& cmd = op->beginCommand(*optionalRequest, op->operationProtocol(), now());
+ auto beginStatus = op->beginCommand(*optionalRequest);
+ if (!beginStatus.isOK()) {
+ return _completeOperation(op, beginStatus);
+ }
auto finishHook = [this, op]() {
- auto response = op->command().response(op->operationProtocol(), now());
+ auto response = op->command()->response(op->operationProtocol(), now());
if (!response.isOK()) {
return _completeOperation(op, response.getStatus());
@@ -311,7 +333,7 @@ void NetworkInterfaceASIO::_runConnectionHook(AsyncOp* op) {
return _beginCommunication(op);
};
- return _asyncRunCommand(&cmd,
+ return _asyncRunCommand(op->command(),
[this, op, finishHook](std::error_code ec, std::size_t bytes) {
_validateAndRun(op, ec, finishHook);
});
diff --git a/src/mongo/executor/network_interface_asio_operation.cpp b/src/mongo/executor/network_interface_asio_operation.cpp
index 087534ee936..069ed7f98bc 100644
--- a/src/mongo/executor/network_interface_asio_operation.cpp
+++ b/src/mongo/executor/network_interface_asio_operation.cpp
@@ -30,10 +30,14 @@
#include "mongo/platform/basic.h"
-#include "mongo/executor/network_interface_asio.h"
+#include "mongo/db/query/getmore_request.h"
+#include "mongo/db/query/lite_parsed_query.h"
#include "mongo/executor/async_stream_interface.h"
+#include "mongo/executor/downconvert_find_and_getmore_commands.h"
+#include "mongo/executor/network_interface_asio.h"
#include "mongo/rpc/factory.h"
#include "mongo/rpc/request_builder_interface.h"
+#include "mongo/util/log.h"
namespace mongo {
namespace executor {
@@ -61,11 +65,17 @@ std::unique_ptr<Message> messageFromRequest(const RemoteCommandRequest& request,
} // namespace
-NetworkInterfaceASIO::AsyncOp::AsyncOp(const TaskExecutor::CallbackHandle& cbHandle,
+NetworkInterfaceASIO::AsyncOp::AsyncOp(NetworkInterfaceASIO* const owner,
+ const TaskExecutor::CallbackHandle& cbHandle,
const RemoteCommandRequest& request,
const RemoteCommandCompletionFn& onFinish,
Date_t now)
- : _cbHandle(cbHandle), _request(request), _onFinish(onFinish), _start(now), _canceled(0) {}
+ : _owner(owner),
+ _cbHandle(cbHandle),
+ _request(request),
+ _onFinish(onFinish),
+ _start(now),
+ _canceled(0) {}
void NetworkInterfaceASIO::AsyncOp::cancel() {
// An operation may be in mid-flight when it is canceled, so we
@@ -91,26 +101,50 @@ void NetworkInterfaceASIO::AsyncOp::setConnection(AsyncConnection&& conn) {
_connection = std::move(conn);
}
-NetworkInterfaceASIO::AsyncCommand& NetworkInterfaceASIO::AsyncOp::beginCommand(
- Message&& newCommand, Date_t now) {
+Status NetworkInterfaceASIO::AsyncOp::beginCommand(Message&& newCommand,
+ AsyncCommand::CommandType type) {
// NOTE: We operate based on the assumption that AsyncOp's
// AsyncConnection does not change over its lifetime.
invariant(_connection.is_initialized());
// Construct a new AsyncCommand object for each command.
- _command.emplace(_connection.get_ptr(), std::move(newCommand), now);
- return _command.get();
+ _command.emplace(_connection.get_ptr(), type, std::move(newCommand), _owner->now());
+ return Status::OK();
}
-NetworkInterfaceASIO::AsyncCommand& NetworkInterfaceASIO::AsyncOp::beginCommand(
- const RemoteCommandRequest& request, rpc::Protocol protocol, Date_t now) {
- auto newCommand = messageFromRequest(request, protocol);
- return beginCommand(std::move(*newCommand), now);
+Status NetworkInterfaceASIO::AsyncOp::beginCommand(const RemoteCommandRequest& request) {
+ // Check if we need to downconvert find or getMore commands.
+ StringData commandName = request.cmdObj.firstElement().fieldNameStringData();
+ const auto isFindCmd = commandName == LiteParsedQuery::kFindCommandName;
+ const auto isGetMoreCmd = commandName == GetMoreRequest::kGetMoreCommandName;
+ const auto isFindOrGetMoreCmd = isFindCmd || isGetMoreCmd;
+
+ // If we aren't sending a find or getMore, or the server supports OP_COMMAND we don't have
+ // to worry about downconversion.
+ if (!isFindOrGetMoreCmd || connection().serverProtocols() == rpc::supports::kAll) {
+ auto newCommand = messageFromRequest(request, operationProtocol());
+ return beginCommand(std::move(*newCommand), AsyncCommand::CommandType::kRPC);
+ } else if (isFindCmd) {
+ auto downconvertedFind = downconvertFindCommandRequest(request);
+ if (!downconvertedFind.isOK()) {
+ return downconvertedFind.getStatus();
+ }
+ return beginCommand(std::move(downconvertedFind.getValue()),
+ AsyncCommand::CommandType::kDownConvertedFind);
+ } else {
+ invariant(isGetMoreCmd);
+ auto downconvertedGetMore = downconvertGetMoreCommandRequest(request);
+ if (!downconvertedGetMore.isOK()) {
+ return downconvertedGetMore.getStatus();
+ }
+ return beginCommand(std::move(downconvertedGetMore.getValue()),
+ AsyncCommand::CommandType::kDownConvertedGetMore);
+ }
}
-NetworkInterfaceASIO::AsyncCommand& NetworkInterfaceASIO::AsyncOp::command() {
+NetworkInterfaceASIO::AsyncCommand* NetworkInterfaceASIO::AsyncOp::command() {
invariant(_command.is_initialized());
- return _command.get();
+ return _command.get_ptr();
}
void NetworkInterfaceASIO::AsyncOp::finish(const ResponseStatus& status) {