If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #include "mongo/client/sdam/topology_description.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kNetwork #include "mongo/client/sdam/server_description.h" #include "mongo/db/wire_version.h" #include "mongo/logv2/log.h" #include "mongo/util/fail_point.h" namespace mongo::sdam { MONGO_FAIL_POINT_DEFINE(topologyDescriptionInstallServerDescription); //////////////////////// // TopologyDescription //////////////////////// TopologyDescription::TopologyDescription(SdamConfiguration config) : _type(config.getInitialType()), _setName(config.getSetName()) { if (auto seeds = config.getSeedList()) { _servers.clear(); for (auto address : *seeds) { _servers.push_back(std::make_shared(address)); } } } const UUID& TopologyDescription::getId() const { return _id; } TopologyType TopologyDescription::getType() const { return _type; } const boost::optional& TopologyDescription::getSetName() const { return _setName; } const boost::optional& TopologyDescription::getMaxSetVersion() const { return _maxSetVersion; } const boost::optional& TopologyDescription::getMaxElectionId() const { return _maxElectionId; } const std::vector& TopologyDescription::getServers() const { return _servers; } bool TopologyDescription::isWireVersionCompatible() const { return _compatible; } const boost::optional& TopologyDescription::getWireVersionCompatibleError() const { return _compatibleError; } const boost::optional& TopologyDescription::getLogicalSessionTimeoutMinutes() const { return _logicalSessionTimeoutMinutes; } void TopologyDescription::setType(TopologyType type) { _type = type; } bool TopologyDescription::containsServerAddress(const ServerAddress& address) const { return findServerByAddress(address) != boost::none; } std::vector TopologyDescription::findServers( std::function predicate) const { std::vector result; std::copy_if(_servers.begin(), _servers.end(), std::back_inserter(result), predicate); return result; } const boost::optional TopologyDescription::findServerByAddress( ServerAddress address) const { auto results = findServers([address](const ServerDescriptionPtr& serverDescription) { return serverDescription->getAddress() == address; }); return (results.size() > 0) ? boost::make_optional(results.front()) : boost::none; } boost::optional TopologyDescription::installServerDescription( const ServerDescriptionPtr& newServerDescription) { LOGV2_DEBUG(4333202, 2, "install server description {description}", "description"_attr = newServerDescription->toString()); boost::optional previousDescription; if (getType() == TopologyType::kSingle) { // For Single, there is always one ServerDescription in TopologyDescription.servers; // the ServerDescription in TopologyDescription.servers MUST be replaced with the new // ServerDescription if the new topologyVersion is >= the old. invariant(_servers.size() == 1); previousDescription = _servers[0]; _servers[0] = std::shared_ptr(newServerDescription); } else { for (auto it = _servers.begin(); it != _servers.end(); ++it) { const auto& currentDescription = *it; if (currentDescription->getAddress() == newServerDescription->getAddress()) { previousDescription = *it; *it = std::shared_ptr(newServerDescription); break; } } if (!previousDescription) { _servers.push_back(std::shared_ptr(newServerDescription)); } } newServerDescription->_topologyDescription = shared_from_this(); checkWireCompatibilityVersions(); calculateLogicalSessionTimeout(); topologyDescriptionInstallServerDescription.shouldFail(); return previousDescription; } void TopologyDescription::removeServerDescription(const ServerAddress& serverAddress) { auto it = std::find_if( _servers.begin(), _servers.end(), [serverAddress](const ServerDescriptionPtr& description) { return description->getAddress() == serverAddress; }); if (it != _servers.end()) { _servers.erase(it); } } void TopologyDescription::checkWireCompatibilityVersions() { const WireVersionInfo supportedWireVersion = {BATCH_COMMANDS, LATEST_WIRE_VERSION}; std::ostringstream errorOss; _compatible = true; for (const auto& serverDescription : _servers) { if (serverDescription->getType() == ServerType::kUnknown) { continue; } if (serverDescription->getMinWireVersion() > supportedWireVersion.maxWireVersion) { _compatible = false; errorOss << "Server at " << serverDescription->getAddress() << " requires wire version " << serverDescription->getMinWireVersion() << " but this version of mongo only supports up to " << supportedWireVersion.maxWireVersion << "."; break; } else if (serverDescription->getMaxWireVersion() < supportedWireVersion.minWireVersion) { _compatible = false; const auto& mongoVersion = minimumRequiredMongoVersionString(supportedWireVersion.minWireVersion); errorOss << "Server at " << serverDescription->getAddress() << " requires wire version " << serverDescription->getMaxWireVersion() << " but this version of mongo requires at least " << supportedWireVersion.minWireVersion << " (MongoDB " << mongoVersion << ")."; break; } } _compatibleError = (_compatible) ? boost::none : boost::make_optional(errorOss.str()); } const std::string TopologyDescription::minimumRequiredMongoVersionString(int version) { switch (version) { case RESUMABLE_INITIAL_SYNC: return "4.4"; case SHARDED_TRANSACTIONS: return "4.2"; case REPLICA_SET_TRANSACTIONS: return "4.0"; case SUPPORTS_OP_MSG: return "3.6"; case COMMANDS_ACCEPT_WRITE_CONCERN: return "3.4"; case BATCH_COMMANDS: return "3.2"; case FIND_COMMAND: return "3.2"; case RELEASE_2_7_7: return "3.0"; case AGG_RETURNS_CURSORS: return "2.6"; case RELEASE_2_4_AND_BEFORE: return "2.4"; default: MONGO_UNREACHABLE; } } void TopologyDescription::calculateLogicalSessionTimeout() { int min = INT_MAX; bool foundNone = false; bool hasDataBearingServer = false; invariant(_servers.size() > 0); for (auto description : _servers) { if (!description->isDataBearingServer()) { continue; } hasDataBearingServer = true; auto logicalSessionTimeout = description->getLogicalSessionTimeoutMinutes(); if (!logicalSessionTimeout) { foundNone = true; break; } min = std::min(*logicalSessionTimeout, min); } _logicalSessionTimeoutMinutes = (foundNone || !hasDataBearingServer) ? boost::none : boost::make_optional(min); } BSONObj TopologyDescription::toBSON() { BSONObjBuilder bson; bson << "id" << _id.toString(); bson << "topologyType" << mongo::sdam::toString(_type); BSONObjBuilder bsonServers; for (auto server : this->getServers()) { bsonServers << server->getAddress() << server->toBson(); } bson.append("servers", bsonServers.obj()); if (_logicalSessionTimeoutMinutes) { bson << "logicalSessionTimeoutMinutes" << *_logicalSessionTimeoutMinutes; } if (_setName) { bson << "setName" << *_setName; } if (_compatible) { bson << "compatible" << true; } else { bson << "compatible" << false; bson << "compatibleError" << *_compatibleError; } if (_maxSetVersion) { bson << "maxSetVersion" << *_maxSetVersion; } if (_maxElectionId) { bson << "maxElectionId" << *_maxElectionId; } return bson.obj(); } std::string TopologyDescription::toString() { return toBSON().toString(); } boost::optional TopologyDescription::getPrimary() { if (getType() != TopologyType::kReplicaSetWithPrimary) { return boost::none; } auto foundPrimaries = findServers( [](const ServerDescriptionPtr& s) { return s->getType() == ServerType::kRSPrimary; }); invariant(foundPrimaries.size() == 1); return foundPrimaries[0]; } } // namespace mongo::sdam