/* Copyright 2014 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects * for all of the code used other than as permitted herein. If you modify * file(s) with this exception, you may extend this exception to your * version of the file(s), but you are not obligated to do so. If you do not * wish to do so, delete this exception statement from your version. If you * delete this exception statement from all source files in the program, * then also delete it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork #include "mongo/platform/basic.h" #include "mongo/client/replica_set_monitor.h" #include #include #include "mongo/client/connpool.h" #include "mongo/client/global_conn_pool.h" #include "mongo/client/replica_set_monitor_internal.h" #include "mongo/db/operation_context.h" #include "mongo/db/server_options.h" #include "mongo/s/grid.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" #include "mongo/util/background.h" #include "mongo/util/concurrency/mutex.h" // for StaticObserver #include "mongo/util/debug_util.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" #include "mongo/util/string_map.h" #include "mongo/util/static_observer.h" #include "mongo/util/timer.h" namespace mongo { using std::shared_ptr; using std::numeric_limits; using std::set; using std::string; using std::vector; namespace { // Pull nested types to top-level scope typedef ReplicaSetMonitor::IsMasterReply IsMasterReply; typedef ReplicaSetMonitor::ScanState ScanState; typedef ReplicaSetMonitor::ScanStatePtr ScanStatePtr; typedef ReplicaSetMonitor::SetState SetState; typedef ReplicaSetMonitor::SetStatePtr SetStatePtr; typedef ReplicaSetMonitor::Refresher Refresher; typedef ScanState::UnconfirmedReplies UnconfirmedReplies; typedef SetState::Node Node; typedef SetState::Nodes Nodes; using executor::TaskExecutor; using CallbackArgs = TaskExecutor::CallbackArgs; using CallbackHandle = TaskExecutor::CallbackHandle; const double socketTimeoutSecs = 5; // Intentionally chosen to compare worse than all known latencies. const int64_t unknownLatency = numeric_limits::max(); const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOnly, TagSet()); const Milliseconds kFindHostMaxBackOffTime(500); // TODO: Move to ReplicaSetMonitorManager ReplicaSetMonitor::ConfigChangeHook asyncConfigChangeHook; ReplicaSetMonitor::ConfigChangeHook syncConfigChangeHook; StaticObserver staticObserver; // // Helpers for stl algorithms // bool isMaster(const Node& node) { return node.isMaster; } bool compareLatencies(const Node* lhs, const Node* rhs) { // NOTE: this automatically compares Node::unknownLatency worse than all others. return lhs->latencyMicros < rhs->latencyMicros; } bool hostsEqual(const Node& lhs, const HostAndPort& rhs) { return lhs.host == rhs; } // Allows comparing two Nodes, or a HostAndPort and a Node. // NOTE: the two HostAndPort overload is only needed to support extra checks in some STL // implementations. For simplicity, no comparator should be used with collections of just // HostAndPort. struct CompareHosts { bool operator()(const Node& lhs, const Node& rhs) { return lhs.host < rhs.host; } bool operator()(const Node& lhs, const HostAndPort& rhs) { return lhs.host < rhs; } bool operator()(const HostAndPort& lhs, const Node& rhs) { return lhs < rhs.host; } bool operator()(const HostAndPort& lhs, const HostAndPort& rhs) { return lhs < rhs; } } compareHosts; // like an overloaded function, but able to pass to stl algorithms // The following structs should be treated as functions returning a UnaryPredicate. // Usage example: std::find_if(nodes.begin(), nodes.end(), HostIs(someHost)); // They all hold their constructor argument by reference. struct HostIs { explicit HostIs(const HostAndPort& host) : _host(host) {} bool operator()(const HostAndPort& host) { return host == _host; } bool operator()(const Node& node) { return node.host == _host; } const HostAndPort& _host; }; struct HostNotIn { explicit HostNotIn(const std::set& hosts) : _hosts(hosts) {} bool operator()(const HostAndPort& host) { return !_hosts.count(host); } bool operator()(const Node& node) { return !_hosts.count(node.host); } const std::set& _hosts; }; /** * Replica set refresh period on the task executor. */ const Seconds kRefreshPeriod(30); } // namespace // At 1 check every 10 seconds, 30 checks takes 5 minutes std::atomic ReplicaSetMonitor::maxConsecutiveFailedChecks(30); // NOLINT // If we cannot find a host after 15 seconds of refreshing, give up const Seconds ReplicaSetMonitor::kDefaultFindHostTimeout(15); // Defaults to random selection as required by the spec bool ReplicaSetMonitor::useDeterministicHostSelection = false; ReplicaSetMonitor::ReplicaSetMonitor(StringData name, const std::set& seeds) : _state(std::make_shared(name, seeds)), _executor(globalRSMonitorManager.getExecutor()) {} void ReplicaSetMonitor::init() { stdx::lock_guard lk(_mutex); invariant(_executor); std::weak_ptr that(shared_from_this()); auto status = _executor->scheduleWork([=](const CallbackArgs& cbArgs) { if (auto ptr = that.lock()) { ptr->_refresh(cbArgs); } }); if (status.getStatus() == ErrorCodes::ShutdownInProgress) { LOG(1) << "Couldn't schedule refresh for " << getName() << ". Executor shutdown in progress"; return; } if (!status.isOK()) { severe() << "Can't start refresh for replica set " << getName() << causedBy(status.getStatus()); fassertFailed(40139); } _refresherHandle = status.getValue(); } ReplicaSetMonitor::~ReplicaSetMonitor() { // need this lock because otherwise can get race with scheduling in _refresh stdx::lock_guard lk(_mutex); if (!_refresherHandle || !_executor) { return; } _executor->cancel(_refresherHandle); // Note: calling _executor->wait(_refresherHandle); from the dispatcher thread will cause hang // Its ok not to call it because the d-tor is called only when the last owning pointer goes out // of scope, so as taskExecutor queue holds a weak pointer to RSM it will not be able to get a // task to execute eliminating the need to call method "wait". // _refresherHandle = {}; } void ReplicaSetMonitor::_refresh(const CallbackArgs& cbArgs) { if (!cbArgs.status.isOK()) { return; } Timer t; startOrContinueRefresh().refreshAll(); LOG(1) << "Refreshing replica set " << getName() << " took " << t.millis() << " msec"; if (!isSetUsable()) { log() << "Stopping periodic monitoring of set " << getName() << " because none of the hosts could be contacted for an extended period of " "time."; ReplicaSetMonitor::remove(getName()); return; } { // reschedule itself invariant(_executor); stdx::lock_guard lk(_mutex); std::weak_ptr that(shared_from_this()); auto status = _executor->scheduleWorkAt(_executor->now() + kRefreshPeriod, [=](const CallbackArgs& cbArgs) { if (auto ptr = that.lock()) { ptr->_refresh(cbArgs); } }); if (status.getStatus() == ErrorCodes::ShutdownInProgress) { LOG(1) << "Cant schedule refresh for " << getName() << ". Executor shutdown in progress"; return; } if (!status.isOK()) { severe() << "Can't continue refresh for replica set " << getName() << " due to " << status.getStatus().toString(); fassertFailed(40140); } _refresherHandle = status.getValue(); } } StatusWith ReplicaSetMonitor::getHostOrRefresh(const ReadPreferenceSetting& criteria, Milliseconds maxWait) { { // Fast path, for the failure-free case stdx::lock_guard lk(_state->mutex); HostAndPort out = _state->getMatchingHost(criteria); if (!out.empty()) return out; } const auto startTimeMs = Date_t::now(); while (true) { // We might not have found any matching hosts due to the scan, which just completed may have // seen stale data from before we joined. Therefore we should participate in a new scan to // make sure all hosts are contacted at least once (possibly by other threads) before this // function gives up. Refresher refresher(startOrContinueRefresh()); HostAndPort out = refresher.refreshUntilMatches(criteria); if (!out.empty()) return out; if (!isSetUsable()) { return Status(ErrorCodes::ReplicaSetNotFound, str::stream() << "None of the hosts for replica set " << getName() << " could be contacted."); } const Milliseconds remaining = maxWait - (Date_t::now() - startTimeMs); if (remaining < kFindHostMaxBackOffTime) { break; } // Back-off so we don't spam the replica set hosts too much sleepFor(kFindHostMaxBackOffTime); } return Status(ErrorCodes::FailedToSatisfyReadPreference, str::stream() << "could not find host matching read preference " << criteria.toString() << " for set " << getName()); } HostAndPort ReplicaSetMonitor::getMasterOrUassert() { return uassertStatusOK(getHostOrRefresh(kPrimaryOnlyReadPreference)); } Refresher ReplicaSetMonitor::startOrContinueRefresh() { stdx::lock_guard lk(_state->mutex); Refresher out(_state); DEV _state->checkInvariants(); return out; } void ReplicaSetMonitor::failedHost(const HostAndPort& host) { stdx::lock_guard lk(_state->mutex); Node* node = _state->findNode(host); if (node) node->markFailed(); DEV _state->checkInvariants(); } bool ReplicaSetMonitor::isPrimary(const HostAndPort& host) const { stdx::lock_guard lk(_state->mutex); Node* node = _state->findNode(host); return node ? node->isMaster : false; } bool ReplicaSetMonitor::isHostUp(const HostAndPort& host) const { stdx::lock_guard lk(_state->mutex); Node* node = _state->findNode(host); return node ? node->isUp : false; } int ReplicaSetMonitor::getMinWireVersion() const { stdx::lock_guard lk(_state->mutex); int minVersion = 0; for (const auto& host : _state->nodes) { if (host.isUp) { minVersion = std::max(minVersion, host.minWireVersion); } } return minVersion; } int ReplicaSetMonitor::getMaxWireVersion() const { stdx::lock_guard lk(_state->mutex); int maxVersion = std::numeric_limits::max(); for (const auto& host : _state->nodes) { if (host.isUp) { maxVersion = std::min(maxVersion, host.maxWireVersion); } } return maxVersion; } bool ReplicaSetMonitor::isSetUsable() const { stdx::lock_guard lk(_state->mutex); return _state->isUsable(); } std::string ReplicaSetMonitor::getName() const { // name is const so don't need to lock return _state->name; } std::string ReplicaSetMonitor::getServerAddress() const { stdx::lock_guard lk(_state->mutex); return _state->getConfirmedServerAddress(); } bool ReplicaSetMonitor::contains(const HostAndPort& host) const { stdx::lock_guard lk(_state->mutex); return _state->seedNodes.count(host); } void ReplicaSetMonitor::createIfNeeded(const string& name, const set& servers) { globalRSMonitorManager.getOrCreateMonitor( ConnectionString::forReplicaSet(name, vector(servers.begin(), servers.end()))); } shared_ptr ReplicaSetMonitor::get(const std::string& name) { return globalRSMonitorManager.getMonitor(name); } void ReplicaSetMonitor::remove(const string& name) { globalRSMonitorManager.removeMonitor(name); // Kill all pooled ReplicaSetConnections for this set. They will not function correctly // after we kill the ReplicaSetMonitor. globalConnPool.removeHost(name); } void ReplicaSetMonitor::setAsynchronousConfigChangeHook(ConfigChangeHook hook) { invariant(!asyncConfigChangeHook); asyncConfigChangeHook = hook; } void ReplicaSetMonitor::setSynchronousConfigChangeHook(ConfigChangeHook hook) { invariant(!syncConfigChangeHook); syncConfigChangeHook = hook; } // TODO move to correct order with non-statics before pushing void ReplicaSetMonitor::appendInfo(BSONObjBuilder& bsonObjBuilder) const { stdx::lock_guard lk(_state->mutex); // NOTE: the format here must be consistent for backwards compatibility BSONArrayBuilder hosts(bsonObjBuilder.subarrayStart("hosts")); for (unsigned i = 0; i < _state->nodes.size(); i++) { const Node& node = _state->nodes[i]; BSONObjBuilder builder; builder.append("addr", node.host.toString()); builder.append("ok", node.isUp); builder.append("ismaster", node.isMaster); // intentionally not camelCase builder.append("hidden", false); // we don't keep hidden nodes in the set builder.append("secondary", node.isUp && !node.isMaster); int32_t pingTimeMillis = 0; if (node.latencyMicros / 1000 > numeric_limits::max()) { // In particular, Node::unknownLatency does not fit in an int32. pingTimeMillis = numeric_limits::max(); } else { pingTimeMillis = node.latencyMicros / 1000; } builder.append("pingTimeMillis", pingTimeMillis); if (!node.tags.isEmpty()) { builder.append("tags", node.tags); } hosts.append(builder.obj()); } hosts.done(); } void ReplicaSetMonitor::cleanup() { globalRSMonitorManager.removeAllMonitors(); asyncConfigChangeHook = ReplicaSetMonitor::ConfigChangeHook(); syncConfigChangeHook = ReplicaSetMonitor::ConfigChangeHook(); } bool ReplicaSetMonitor::isKnownToHaveGoodPrimary() const { stdx::lock_guard lk(_state->mutex); for (const auto& node : _state->nodes) { if (node.isMaster) { return true; } } return false; } Refresher::Refresher(const SetStatePtr& setState) : _set(setState), _scan(setState->currentScan), _startedNewScan(false) { if (_scan) return; // participate in in-progress scan LOG(2) << "Starting new refresh of replica set " << _set->name; _scan = startNewScan(_set.get()); _set->currentScan = _scan; _startedNewScan = true; } Refresher::NextStep Refresher::getNextStep() { // If the set is faulty, don't try anymore if (!_set->isUsable()) { return NextStep(NextStep::DONE); } // No longer the current scan if (_scan != _set->currentScan) { return NextStep(NextStep::DONE); } // Wait for all dispatched hosts to return before trying any fallback hosts. if (_scan->hostsToScan.empty() && !_scan->waitingFor.empty()) { return NextStep(NextStep::WAIT); } // If we haven't yet found a master, try contacting unconfirmed hosts if (_scan->hostsToScan.empty() && !_scan->foundUpMaster) { _scan->enqueAllUntriedHosts(_scan->possibleNodes, _set->rand); _scan->possibleNodes.clear(); } if (_scan->hostsToScan.empty()) { // We've tried all hosts we can, so nothing more to do in this round. if (!_scan->foundUpMaster) { warning() << "No primary detected for set " << _set->name; // Since we've talked to everyone we could but still didn't find a primary, we // do the best we can, and assume all unconfirmedReplies are actually from nodes // in the set (we've already confirmed that they think they are). This is // important since it allows us to bootstrap to a usable state even if we are // unable to talk to a master while starting up. As soon as we are able to // contact a master, we will remove any nodes that it doesn't think are part of // the set, undoing the damage we cause here. // NOTE: we don't modify seedNodes or notify about set membership change in this // case since it hasn't been confirmed by a master. const string oldAddr = _set->getUnconfirmedServerAddress(); for (UnconfirmedReplies::iterator it = _scan->unconfirmedReplies.begin(); it != _scan->unconfirmedReplies.end(); ++it) { _set->findOrCreateNode(it->host)->update(*it); } const string newAddr = _set->getUnconfirmedServerAddress(); if (oldAddr != newAddr && syncConfigChangeHook) { // Run the syncConfigChangeHook because the ShardRegistry needs to know about any // node we might talk to. Don't run the asyncConfigChangeHook because we don't // want to update the seed list stored on the config servers with unconfirmed hosts. syncConfigChangeHook(_set->name, _set->getUnconfirmedServerAddress()); } } if (_scan->foundAnyUpNodes) { _set->consecutiveFailedScans = 0; } else { _set->consecutiveFailedScans++; log() << "All nodes for set " << _set->name << " are down. " << "This has happened for " << _set->consecutiveFailedScans << " checks in a row. Polling will stop after " << maxConsecutiveFailedChecks - _set->consecutiveFailedScans << " more failed checks"; } // Makes sure all other Refreshers in this round return DONE _set->currentScan.reset(); return NextStep(NextStep::DONE); } // Pop and return the next hostToScan. HostAndPort host = _scan->hostsToScan.front(); _scan->hostsToScan.pop_front(); _scan->waitingFor.insert(host); _scan->triedHosts.insert(host); return NextStep(NextStep::CONTACT_HOST, host); } void Refresher::receivedIsMaster(const HostAndPort& from, int64_t latencyMicros, const BSONObj& replyObj) { // Be careful: all return paths must call either failedHost or cv.notify_all! _scan->waitingFor.erase(from); const IsMasterReply reply(from, latencyMicros, replyObj); // Handle various failure cases if (!reply.ok) { failedHost(from); return; } if (reply.setName != _set->name) { if (reply.raw["isreplicaset"].trueValue()) { // The reply came from a node in the state referred to as RSGhost in the SDAM // spec. RSGhost corresponds to either REMOVED or STARTUP member states. In any event, // if a reply from a ghost offers a list of possible other members of the replica set, // and if this refresher has yet to find the replica set master, we add hosts listed in // the reply to the list of possible replica set members. if (!_scan->foundUpMaster) { _scan->possibleNodes.insert(reply.normalHosts.begin(), reply.normalHosts.end()); } } else { warning() << "node: " << from << " isn't a part of set: " << _set->name << " ismaster: " << replyObj; } failedHost(from); return; } if (reply.isMaster) { if (!receivedIsMasterFromMaster(reply)) { log() << "node " << from << " believes it is primary, but its election id of " << reply.electionId << " and config version of " << reply.configVersion << " is older than the most recent election id " << _set->maxElectionId << " and config version of " << _set->configVersion; failedHost(from); return; } } if (_scan->foundUpMaster) { // We only update a Node if a master has confirmed it is in the set. _set->updateNodeIfInNodes(reply); } else { receivedIsMasterBeforeFoundMaster(reply); _scan->unconfirmedReplies.push_back(reply); } // _set->nodes may still not have any nodes with isUp==true, but we have at least found a // connectible host that is that claims to be in the set. _scan->foundAnyUpNodes = true; // TODO consider only notifying if we've updated a node or we've emptied waitingFor. _set->cv.notify_all(); DEV _set->checkInvariants(); } void Refresher::failedHost(const HostAndPort& host) { _scan->waitingFor.erase(host); // Failed hosts can't pass criteria, so the only way they'd effect the _refreshUntilMatches // loop is if it was the last host we were waitingFor. if (_scan->waitingFor.empty()) _set->cv.notify_all(); Node* node = _set->findNode(host); if (node) node->markFailed(); } ScanStatePtr Refresher::startNewScan(const SetState* set) { const ScanStatePtr scan = std::make_shared(); // The heuristics we use in deciding the order to contact hosts are designed to find a // master as quickly as possible. This is because we can't use any hosts we find until // we either get the latest set of members from a master or talk to all possible hosts // without finding a master. // TODO It might make sense to check down nodes first if the last seen master is still // marked as up. int upNodes = 0; for (Nodes::const_iterator it(set->nodes.begin()), end(set->nodes.end()); it != end; ++it) { if (it->isUp) { // scan the nodes we think are up first scan->hostsToScan.push_front(it->host); upNodes++; } else { scan->hostsToScan.push_back(it->host); } } // shuffle the queue, but keep "up" nodes at the front std::random_shuffle(scan->hostsToScan.begin(), scan->hostsToScan.begin() + upNodes, set->rand); std::random_shuffle(scan->hostsToScan.begin() + upNodes, scan->hostsToScan.end(), set->rand); if (!set->lastSeenMaster.empty()) { // move lastSeenMaster to front of queue std::stable_partition( scan->hostsToScan.begin(), scan->hostsToScan.end(), HostIs(set->lastSeenMaster)); } return scan; } bool Refresher::receivedIsMasterFromMaster(const IsMasterReply& reply) { invariant(reply.isMaster); // Reject if config version is older. This is for backwards compatibility with nodes in pv0 // since they don't have the same ordering with pv1 electionId. if (reply.configVersion < _set->configVersion) { return false; } if (reply.electionId.isSet()) { // ElectionIds are only comparable if they are of the same protocol version. However, since // isMaster has no protocol version field, we use the configVersion instead. This works // because configVersion needs to be incremented whenever the protocol version is changed. if (reply.configVersion == _set->configVersion && _set->maxElectionId.isSet() && _set->maxElectionId.compare(reply.electionId) > 0) { return false; } _set->maxElectionId = reply.electionId; } _set->configVersion = reply.configVersion; // Mark all nodes as not master. We will mark ourself as master before releasing the lock. // NOTE: we use a "last-wins" policy if multiple hosts claim to be master. for (size_t i = 0; i < _set->nodes.size(); i++) { _set->nodes[i].isMaster = false; } // Check if the master agrees with our current list of nodes. // REMINDER: both _set->nodes and reply.normalHosts are sorted. if (_set->nodes.size() != reply.normalHosts.size() || !std::equal( _set->nodes.begin(), _set->nodes.end(), reply.normalHosts.begin(), hostsEqual)) { LOG(2) << "Adjusting nodes in our view of replica set " << _set->name << " based on master reply: " << reply.raw; // remove non-members from _set->nodes _set->nodes.erase( std::remove_if(_set->nodes.begin(), _set->nodes.end(), HostNotIn(reply.normalHosts)), _set->nodes.end()); // add new members to _set->nodes for (std::set::const_iterator it = reply.normalHosts.begin(); it != reply.normalHosts.end(); ++it) { _set->findOrCreateNode(*it); } // replace hostToScan queue with untried normal hosts. can both add and remove // hosts from the queue. _scan->hostsToScan.clear(); _scan->enqueAllUntriedHosts(reply.normalHosts, _set->rand); if (!_scan->waitingFor.empty()) { // make sure we don't wait for any hosts that aren't considered members std::set newWaitingFor; std::set_intersection(reply.normalHosts.begin(), reply.normalHosts.end(), _scan->waitingFor.begin(), _scan->waitingFor.end(), std::inserter(newWaitingFor, newWaitingFor.end())); _scan->waitingFor.swap(newWaitingFor); } } if (reply.normalHosts != _set->seedNodes) { const string oldAddr = _set->getConfirmedServerAddress(); _set->seedNodes = reply.normalHosts; // LogLevel can be pretty low, since replica set reconfiguration should be pretty rare // and we want to record our changes log() << "changing hosts to " << _set->getConfirmedServerAddress() << " from " << oldAddr; if (syncConfigChangeHook) { syncConfigChangeHook(_set->name, _set->getConfirmedServerAddress()); } if (asyncConfigChangeHook) { // call from a separate thread to avoid blocking and holding lock while potentially // going over the network stdx::thread bg(asyncConfigChangeHook, _set->name, _set->getConfirmedServerAddress()); bg.detach(); } } // Update other nodes's information based on replies we've already seen for (UnconfirmedReplies::iterator it = _scan->unconfirmedReplies.begin(); it != _scan->unconfirmedReplies.end(); ++it) { // this ignores replies from hosts not in _set->nodes (as modified above) _set->updateNodeIfInNodes(*it); } _scan->unconfirmedReplies.clear(); _scan->foundUpMaster = true; _set->lastSeenMaster = reply.host; return true; } void Refresher::receivedIsMasterBeforeFoundMaster(const IsMasterReply& reply) { invariant(!reply.isMaster); // This function doesn't alter _set at all. It only modifies the work queue in _scan. // Add everyone this host claims is in the set to possibleNodes. _scan->possibleNodes.insert(reply.normalHosts.begin(), reply.normalHosts.end()); // If this node thinks the primary is someone we haven't tried, make that the next // hostToScan. if (!reply.primary.empty() && !_scan->triedHosts.count(reply.primary)) { std::deque::iterator it = std::stable_partition( _scan->hostsToScan.begin(), _scan->hostsToScan.end(), HostIs(reply.primary)); if (it == _scan->hostsToScan.begin()) { // reply.primary wasn't in hostsToScan _scan->hostsToScan.push_front(reply.primary); } } } HostAndPort Refresher::_refreshUntilMatches(const ReadPreferenceSetting* criteria) { stdx::unique_lock lk(_set->mutex); while (true) { if (criteria) { HostAndPort out = _set->getMatchingHost(*criteria); if (!out.empty()) return out; } const NextStep ns = getNextStep(); DEV _set->checkInvariants(); switch (ns.step) { case NextStep::DONE: // getNextStep may have updated nodes if no master was found return criteria ? _set->getMatchingHost(*criteria) : HostAndPort(); case NextStep::WAIT: // TODO consider treating as DONE for refreshAll _set->cv.wait(lk); continue; case NextStep::CONTACT_HOST: { BSONObj reply; // empty on error int64_t pingMicros = 0; lk.unlock(); // relocked after attempting to call isMaster try { ScopedDbConnection conn(ConnectionString(ns.host), socketTimeoutSecs); bool ignoredOutParam = false; Timer timer; conn->isMaster(ignoredOutParam, &reply); pingMicros = timer.micros(); conn.done(); // return to pool on success. } catch (...) { reply = BSONObj(); // should be a no-op but want to be sure } lk.lock(); // Ignore the reply and return if we are no longer the current scan. This might // happen if it was decided that the host we were contacting isn't part of the set. if (_scan != _set->currentScan) return criteria ? _set->getMatchingHost(*criteria) : HostAndPort(); if (reply.isEmpty()) failedHost(ns.host); else receivedIsMaster(ns.host, pingMicros, reply); } } } } void IsMasterReply::parse(const BSONObj& obj) { try { raw = obj.getOwned(); // don't use obj again after this line ok = raw["ok"].trueValue(); if (!ok) return; setName = raw["setName"].str(); hidden = raw["hidden"].trueValue(); secondary = raw["secondary"].trueValue(); minWireVersion = raw["minWireVersion"].numberInt(); maxWireVersion = raw["maxWireVersion"].numberInt(); // hidden nodes can't be master, even if they claim to be. isMaster = !hidden && raw["ismaster"].trueValue(); if (isMaster && raw.hasField("electionId")) { electionId = raw["electionId"].OID(); } configVersion = raw["setVersion"].numberInt(); const string primaryString = raw["primary"].str(); primary = primaryString.empty() ? HostAndPort() : HostAndPort(primaryString); // both hosts and passives, but not arbiters, are considered "normal hosts" normalHosts.clear(); BSONForEach(host, raw.getObjectField("hosts")) { normalHosts.insert(HostAndPort(host.String())); } BSONForEach(host, raw.getObjectField("passives")) { normalHosts.insert(HostAndPort(host.String())); } tags = raw.getObjectField("tags"); } catch (const std::exception& e) { ok = false; log() << "exception while parsing isMaster reply: " << e.what() << " " << obj; } } Node::Node(const HostAndPort& host) : host(host), latencyMicros(unknownLatency) {} void Node::markFailed() { LOG(1) << "Marking host " << host << " as failed"; isUp = false; isMaster = false; } bool Node::matches(const ReadPreference pref) const { if (!isUp) return false; if (pref == ReadPreference::PrimaryOnly) { return isMaster; } if (pref == ReadPreference::SecondaryOnly) { if (isMaster) return false; } return true; } bool Node::matches(const BSONObj& tag) const { BSONForEach(tagCriteria, tag) { if (this->tags[tagCriteria.fieldNameStringData()] != tagCriteria) return false; } return true; } void Node::update(const IsMasterReply& reply) { invariant(host == reply.host); invariant(reply.ok); LOG(3) << "Updating host " << host << " based on ismaster reply: " << reply.raw; // Nodes that are hidden or neither master or secondary are considered down since we can't // send any operations to them. isUp = !reply.hidden && (reply.isMaster || reply.secondary); isMaster = reply.isMaster; minWireVersion = reply.minWireVersion; maxWireVersion = reply.maxWireVersion; // save a copy if unchanged if (!tags.binaryEqual(reply.tags)) tags = reply.tags.getOwned(); if (reply.latencyMicros >= 0) { // TODO upper bound? if (latencyMicros == unknownLatency) { latencyMicros = reply.latencyMicros; } else { // update latency with smoothed moving average (1/4th the delta) latencyMicros += (reply.latencyMicros - latencyMicros) / 4; } } } SetState::SetState(StringData name, const std::set& seedNodes) : name(name.toString()), consecutiveFailedScans(0), seedNodes(seedNodes), latencyThresholdMicros(serverGlobalParams.defaultLocalThresholdMillis * 1000), rand(int64_t(time(0))), roundRobin(0) { uassert(13642, "Replica set seed list can't be empty", !seedNodes.empty()); if (name.empty()) warning() << "Replica set name empty, first node: " << *(seedNodes.begin()); // This adds the seed hosts to nodes, but they aren't usable for anything except seeding a // scan until we start a scan and either find a master or contact all hosts without finding // one. // WARNING: if seedNodes is ever changed to not imply sorted iteration, you will need to // sort nodes after this loop. for (std::set::const_iterator it = seedNodes.begin(); it != seedNodes.end(); ++it) { nodes.push_back(Node(*it)); } DEV checkInvariants(); } bool SetState::isUsable() const { return consecutiveFailedScans < maxConsecutiveFailedChecks; } HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria) const { switch (criteria.pref) { // "Prefered" read preferences are defined in terms of other preferences case ReadPreference::PrimaryPreferred: { HostAndPort out = getMatchingHost(ReadPreferenceSetting(ReadPreference::PrimaryOnly, criteria.tags)); // NOTE: the spec says we should use the primary even if tags don't match if (!out.empty()) return out; return getMatchingHost( ReadPreferenceSetting(ReadPreference::SecondaryOnly, criteria.tags)); } case ReadPreference::SecondaryPreferred: { HostAndPort out = getMatchingHost( ReadPreferenceSetting(ReadPreference::SecondaryOnly, criteria.tags)); if (!out.empty()) return out; // NOTE: the spec says we should use the primary even if tags don't match return getMatchingHost( ReadPreferenceSetting(ReadPreference::PrimaryOnly, criteria.tags)); } case ReadPreference::PrimaryOnly: { // NOTE: isMaster implies isUp Nodes::const_iterator it = std::find_if(nodes.begin(), nodes.end(), isMaster); if (it == nodes.end()) return HostAndPort(); return it->host; } // The difference between these is handled by Node::matches case ReadPreference::SecondaryOnly: case ReadPreference::Nearest: { BSONForEach(tagElem, criteria.tags.getTagBSON()) { uassert(16358, "Tags should be a BSON object", tagElem.isABSONObj()); BSONObj tag = tagElem.Obj(); std::vector matchingNodes; for (size_t i = 0; i < nodes.size(); i++) { if (nodes[i].matches(criteria.pref) && nodes[i].matches(tag)) { matchingNodes.push_back(&nodes[i]); } } // don't do more complicated selection if not needed if (matchingNodes.empty()) continue; if (matchingNodes.size() == 1) return matchingNodes.front()->host; // order by latency and don't consider hosts further than a threshold from the // closest. std::sort(matchingNodes.begin(), matchingNodes.end(), compareLatencies); for (size_t i = 1; i < matchingNodes.size(); i++) { int64_t distance = matchingNodes[i]->latencyMicros - matchingNodes[0]->latencyMicros; if (distance >= latencyThresholdMicros) { // this node and all remaining ones are too far away matchingNodes.erase(matchingNodes.begin() + i, matchingNodes.end()); break; } } // of the remaining nodes, pick one at random (or use round-robin) if (ReplicaSetMonitor::useDeterministicHostSelection) { // only in tests return matchingNodes[roundRobin++ % matchingNodes.size()]->host; } else { // normal case return matchingNodes[rand.nextInt32(matchingNodes.size())]->host; }; } return HostAndPort(); } default: uassert(16337, "Unknown read preference", false); break; } } Node* SetState::findNode(const HostAndPort& host) { const Nodes::iterator it = std::lower_bound(nodes.begin(), nodes.end(), host, compareHosts); if (it == nodes.end() || it->host != host) return NULL; return &(*it); } Node* SetState::findOrCreateNode(const HostAndPort& host) { // This is insertion sort, but N is currently guaranteed to be <= 12 (although this class // must function correctly even with more nodes). If we lift that restriction, we may need // to consider alternate algorithms. Nodes::iterator it = std::lower_bound(nodes.begin(), nodes.end(), host, compareHosts); if (it == nodes.end() || it->host != host) { LOG(2) << "Adding node " << host << " to our view of replica set " << name; it = nodes.insert(it, Node(host)); } return &(*it); } void SetState::updateNodeIfInNodes(const IsMasterReply& reply) { Node* node = findNode(reply.host); if (!node) { LOG(2) << "Skipping application of ismaster reply from " << reply.host << " since it isn't a confirmed member of set " << name; return; } node->update(reply); } std::string SetState::getConfirmedServerAddress() const { StringBuilder ss; if (!name.empty()) ss << name << "/"; for (std::set::const_iterator it = seedNodes.begin(); it != seedNodes.end(); ++it) { if (it != seedNodes.begin()) ss << ","; it->append(ss); } return ss.str(); } std::string SetState::getUnconfirmedServerAddress() const { StringBuilder ss; if (!name.empty()) ss << name << "/"; for (std::vector::const_iterator it = nodes.begin(); it != nodes.end(); ++it) { if (it != nodes.begin()) ss << ","; it->host.append(ss); } return ss.str(); } void SetState::checkInvariants() const { bool foundMaster = false; for (size_t i = 0; i < nodes.size(); i++) { // no empty hosts invariant(!nodes[i].host.empty()); if (nodes[i].isMaster) { // masters must be up invariant(nodes[i].isUp); // at most one master invariant(!foundMaster); foundMaster = true; // if we have a master it should be the same as lastSeenMaster invariant(nodes[i].host == lastSeenMaster); } // should never end up with negative latencies invariant(nodes[i].latencyMicros >= 0); // nodes must be sorted by host with no-dupes invariant(i == 0 || (nodes[i - 1].host < nodes[i].host)); } // nodes should be a (non-strict) superset of the seedNodes invariant(std::includes( nodes.begin(), nodes.end(), seedNodes.begin(), seedNodes.end(), compareHosts)); if (currentScan) { // hostsToScan can't have dups or hosts already in triedHosts. std::set cantSee = currentScan->triedHosts; for (std::deque::const_iterator it = currentScan->hostsToScan.begin(); it != currentScan->hostsToScan.end(); ++it) { invariant(!cantSee.count(*it)); cantSee.insert(*it); // make sure we don't see this again } // We should only be waitingFor hosts that are in triedHosts invariant(std::includes(currentScan->triedHosts.begin(), currentScan->triedHosts.end(), currentScan->waitingFor.begin(), currentScan->waitingFor.end())); // We should only have unconfirmedReplies if we haven't found a master yet invariant(!currentScan->foundUpMaster || currentScan->unconfirmedReplies.empty()); } } template void ScanState::enqueAllUntriedHosts(const Container& container, PseudoRandom& rand) { invariant(hostsToScan.empty()); // because we don't try to dedup hosts already in the queue. // no std::copy_if before c++11 for (typename Container::const_iterator it(container.begin()), end(container.end()); it != end; ++it) { if (!triedHosts.count(*it)) { hostsToScan.push_back(*it); } } std::random_shuffle(hostsToScan.begin(), hostsToScan.end(), rand); } }