diff options
Diffstat (limited to 'src/mongo/db/repl')
18 files changed, 94 insertions, 18 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index daa0389c9fa..0cb2c057f8f 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -226,6 +226,11 @@ void BackgroundSync::_run() { Client::initThread("BackgroundSync"); AuthorizationSession::get(cc())->grantInternalAuthorization(&cc()); + { + stdx::lock_guard<Client> lk(cc()); + cc().setSystemOperationUnkillableByStepdown(lk); + } + while (!inShutdown()) { try { _runProducer(); diff --git a/src/mongo/db/repl/noop_writer.cpp b/src/mongo/db/repl/noop_writer.cpp index 4bf8188010b..81be6651282 100644 --- a/src/mongo/db/repl/noop_writer.cpp +++ b/src/mongo/db/repl/noop_writer.cpp @@ -83,6 +83,13 @@ public: private: void run(Seconds waitTime, NoopWriteFn noopWrite) { Client::initThread("NoopWriter"); + + // TODO(SERVER-74656): Please revisit if this thread could be made killable. + { + stdx::lock_guard<Client> lk(cc()); + cc().setSystemOperationUnkillableByStepdown(lk); + } + while (true) { const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext(); OperationContext& opCtx = *opCtxPtr; diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp index af33eaf9633..3e8364baa34 100644 --- a/src/mongo/db/repl/oplog_applier.cpp +++ b/src/mongo/db/repl/oplog_applier.cpp @@ -171,9 +171,9 @@ std::unique_ptr<ThreadPool> makeReplWriterPool(int threadCount, auto client = Client::getCurrent(); AuthorizationSession::get(*client)->grantInternalAuthorization(client); - if (isKillableByStepdown) { + if (!isKillableByStepdown) { stdx::lock_guard<Client> lk(*client); - client->setSystemOperationKillableByStepdown(lk); + client->setSystemOperationUnkillableByStepdown(lk); } }; auto pool = std::make_unique<ThreadPool>(options); diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp index baff24f4ba7..10239d41777 100644 --- a/src/mongo/db/repl/oplog_applier_impl.cpp +++ b/src/mongo/db/repl/oplog_applier_impl.cpp @@ -296,6 +296,11 @@ void ApplyBatchFinalizerForJournal::record(const OpTimeAndWallTime& newOpTimeAnd void ApplyBatchFinalizerForJournal::_run() { Client::initThread("ApplyBatchFinalizerForJournal"); + { + stdx::lock_guard<Client> lk(cc()); + cc().setSystemOperationUnkillableByStepdown(lk); + } + while (true) { OpTimeAndWallTime latestOpTimeAndWallTime = {OpTime(), Date_t()}; diff --git a/src/mongo/db/repl/oplog_batcher.cpp b/src/mongo/db/repl/oplog_batcher.cpp index 86415c4f1ad..a652d7346e3 100644 --- a/src/mongo/db/repl/oplog_batcher.cpp +++ b/src/mongo/db/repl/oplog_batcher.cpp @@ -298,6 +298,13 @@ void OplogBatcher::_consume(OperationContext* opCtx, OplogBuffer* oplogBuffer) { void OplogBatcher::_run(StorageInterface* storageInterface) { Client::initThread("ReplBatcher"); + { + // The OplogBatcher's thread has its own shutdown sequence triggered by the OplogApplier, + // so we don't want it to be killed in other ways. + stdx::lock_guard<Client> lk(cc()); + cc().setSystemOperationUnkillableByStepdown(lk); + } + BatchLimits batchLimits; while (true) { diff --git a/src/mongo/db/repl/primary_only_service.cpp b/src/mongo/db/repl/primary_only_service.cpp index ab4f061d84d..4bf6d12cc49 100644 --- a/src/mongo/db/repl/primary_only_service.cpp +++ b/src/mongo/db/repl/primary_only_service.cpp @@ -335,9 +335,6 @@ void PrimaryOnlyService::startup(OperationContext* opCtx) { auto client = Client::getCurrent(); AuthorizationSession::get(*client)->grantInternalAuthorization(&cc()); - stdx::lock_guard<Client> lk(*client); - client->setSystemOperationKillableByStepdown(lk); - // Associate this Client with this PrimaryOnlyService primaryOnlyServiceStateForClient(client).primaryOnlyService = this; }; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 313ac3773f6..427c05f6dbd 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -146,6 +146,12 @@ auto makeThreadPool(const std::string& poolName, const std::string& threadName) threadPoolOptions.poolName = poolName; threadPoolOptions.onCreateThread = [](const std::string& threadName) { Client::initThread(threadName.c_str()); + + { + stdx::lock_guard<Client> lk(cc()); + cc().setSystemOperationUnkillableByStepdown(lk); + } + AuthorizationSession::get(cc())->grantInternalAuthorization(&cc()); }; return std::make_unique<ThreadPool>(threadPoolOptions); diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 1113eb9bc6c..60ba492573f 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -2651,6 +2651,11 @@ void ReplicationCoordinatorImpl::AutoGetRstlForStepUpStepDown::_killOpThreadFn() invariant(!cc().isFromUserConnection()); + { + stdx::lock_guard<Client> lk(cc()); + cc().setSystemOperationUnkillableByStepdown(lk); + } + LOGV2(21343, "Starting to kill user operations"); auto uniqueOpCtx = cc().makeOperationContext(); OperationContext* opCtx = uniqueOpCtx.get(); diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp index 744e1cf824f..6b50af1e0b7 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp @@ -116,7 +116,10 @@ void ReplCoordTest::addSelf(const HostAndPort& selfHost) { void ReplCoordTest::init() { invariant(!_repl); invariant(!_callShutdown); - + { + stdx::lock_guard<Client> lk(cc()); + cc().setSystemOperationUnkillableByStepdown(lk); + } auto service = getGlobalServiceContext(); _storageInterface = new StorageInterfaceMock(); StorageInterface::set(service, std::unique_ptr<StorageInterface>(_storageInterface)); @@ -160,6 +163,9 @@ void ReplCoordTest::init() { executor::ThreadPoolMock::Options tpOptions; tpOptions.onCreateThread = []() { Client::initThread("replexec"); + + stdx::lock_guard<Client> lk(cc()); + cc().setSystemOperationUnkillableByStepdown(lk); }; auto pool = std::make_unique<executor::ThreadPoolMock>(_net, seed, tpOptions); auto replExec = diff --git a/src/mongo/db/repl/rollback_test_fixture.cpp b/src/mongo/db/repl/rollback_test_fixture.cpp index f88973b38b9..5eca6a6e47c 100644 --- a/src/mongo/db/repl/rollback_test_fixture.cpp +++ b/src/mongo/db/repl/rollback_test_fixture.cpp @@ -88,7 +88,10 @@ public: void RollbackTest::setUp() { ServiceContextMongoDTest::setUp(); - + { + stdx::lock_guard<Client> lk(cc()); + cc().setSystemOperationUnkillableByStepdown(lk); + } _storageInterface = new StorageInterfaceRollback(); auto serviceContext = getServiceContext(); auto consistencyMarkers = std::make_unique<ReplicationConsistencyMarkersMock>(); diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index 00173186d96..01e9a24cbea 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -224,6 +224,12 @@ StorageInterfaceImpl::createCollectionForBulkLoading( auto opCtx = cc().makeOperationContext(); opCtx->setEnforceConstraints(false); + // TODO(SERVER-74656): Please revisit if this thread could be made killable. + { + stdx::lock_guard<Client> lk(cc()); + cc().setSystemOperationUnkillableByStepdown(lk); + } + // DocumentValidationSettings::kDisableInternalValidation is currently inert. // But, it's logically ok to disable internal validation as this function gets called // only during initial sync. diff --git a/src/mongo/db/repl/sync_source_feedback.cpp b/src/mongo/db/repl/sync_source_feedback.cpp index 9b74f405eb0..37af1be31a2 100644 --- a/src/mongo/db/repl/sync_source_feedback.cpp +++ b/src/mongo/db/repl/sync_source_feedback.cpp @@ -159,6 +159,12 @@ void SyncSourceFeedback::run(executor::TaskExecutor* executor, ReplicationCoordinator* replCoord) { Client::initThread("SyncSourceFeedback"); + // TODO(SERVER-74656): Please revisit if this thread could be made killable. + { + stdx::lock_guard<Client> lk(cc()); + cc().setSystemOperationUnkillableByStepdown(lk); + } + HostAndPort syncTarget; // keepAliveInterval indicates how frequently to forward progress in the absence of updates. diff --git a/src/mongo/db/repl/tenant_file_importer_service.cpp b/src/mongo/db/repl/tenant_file_importer_service.cpp index 9764eb86820..2c6bca06031 100644 --- a/src/mongo/db/repl/tenant_file_importer_service.cpp +++ b/src/mongo/db/repl/tenant_file_importer_service.cpp @@ -134,6 +134,13 @@ void TenantFileImporterService::startMigration(const UUID& migrationId) { _workerThread = std::make_unique<stdx::thread>([this, migrationId] { Client::initThread("TenantFileImporterService"); + + // TODO(SERVER-74661): Please revisit if this thread could be made killable. + { + stdx::lock_guard<Client> lk(cc()); + cc().setSystemOperationUnkillableByStepdown(lk); + } + try { _handleEvents(migrationId); } catch (const DBException& err) { diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp b/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp index 0b020e66722..0c193a2ab11 100644 --- a/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp +++ b/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp @@ -72,6 +72,10 @@ TenantMigrationAccessBlockerRegistry::TenantMigrationAccessBlockerRegistry() { threadPoolOptions.poolName = "TenantMigrationBlockerAsyncThreadPool"; threadPoolOptions.onCreateThread = [](const std::string& threadName) { Client::initThread(threadName.c_str()); + + // TODO(SERVER-74661): Please revisit if this thread could be made killable. + stdx::lock_guard<Client> lk(cc()); + cc().setSystemOperationUnkillableByStepdown(lk); }; _asyncBlockingOperationsExecutor = std::make_shared<executor::ThreadPoolTaskExecutor>( std::make_unique<ThreadPool>(threadPoolOptions), diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp index 596ed32c807..c432b856d4e 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp @@ -382,17 +382,6 @@ TenantMigrationDonorService::Instance::_makeRecipientCmdExecutor() { Client::initThread(threadName.c_str()); auto client = Client::getCurrent(); AuthorizationSession::get(*client)->grantInternalAuthorization(&cc()); - - // Ideally, we should also associate the client created by _recipientCmdExecutor with the - // TenantMigrationDonorService to make the opCtxs created by the task executor get - // registered in the TenantMigrationDonorService, and killed on stepdown. But that would - // require passing the pointer to the TenantMigrationService into the Instance and making - // constructInstance not const so we can set the client's decoration here. Right now there - // is no need for that since the task executor is only used with scheduleRemoteCommand and - // no opCtx will be created (the cancellation token is responsible for canceling the - // outstanding work on the task executor). - stdx::lock_guard<Client> lk(*client); - client->setSystemOperationKillableByStepdown(lk); }; auto hookList = std::make_unique<rpc::EgressMetadataHookList>(); diff --git a/src/mongo/db/repl/topology_version_observer.cpp b/src/mongo/db/repl/topology_version_observer.cpp index 07eb6c5f327..916d401ed25 100644 --- a/src/mongo/db/repl/topology_version_observer.cpp +++ b/src/mongo/db/repl/topology_version_observer.cpp @@ -178,6 +178,12 @@ void TopologyVersionObserver::_workerThreadBody() noexcept try { invariant(_serviceContext); ThreadClient tc(kTopologyVersionObserverName, _serviceContext); + // TODO(SERVER-74656): Please revisit if this thread could be made killable. + { + stdx::lock_guard<Client> lk(*tc.get()); + tc.get()->setSystemOperationUnkillableByStepdown(lk); + } + auto getTopologyVersion = [&]() -> boost::optional<TopologyVersion> { // Only the observer thread updates `_cache`, thus there is no need to hold the lock before // accessing `_cache` here. diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp index c61f2cbde99..01ad3f0aa27 100644 --- a/src/mongo/db/repl/transaction_oplog_application.cpp +++ b/src/mongo/db/repl/transaction_oplog_application.cpp @@ -786,6 +786,13 @@ void reconstructPreparedTransactions(OperationContext* opCtx, repl::OplogApplica // transaction oplog entry. auto newClient = opCtx->getServiceContext()->makeClient("reconstruct-prepared-transactions"); + + // TODO(SERVER-74656): Please revisit if this thread could be made killable. + { + stdx::lock_guard<Client> lk(*newClient.get()); + newClient.get()->setSystemOperationUnkillableByStepdown(lk); + } + AlternativeClientRegion acr(newClient); const auto newOpCtx = cc().makeOperationContext(); diff --git a/src/mongo/db/repl/wait_for_majority_service.cpp b/src/mongo/db/repl/wait_for_majority_service.cpp index 37d6407e0fe..a55bfcc701e 100644 --- a/src/mongo/db/repl/wait_for_majority_service.cpp +++ b/src/mongo/db/repl/wait_for_majority_service.cpp @@ -107,6 +107,16 @@ void WaitForMajorityServiceImplBase::startup(ServiceContext* ctx) { ClientStrand::make(ctx->makeClient(kWaitClientName + _getReadOrWrite())); _waitForMajorityCancellationClient = ClientStrand::make(ctx->makeClient(kCancelClientName + _getReadOrWrite())); + // TODO(SERVER-74656): Please revisit if this thread could be made killable. + { + stdx::lock_guard<Client> lk(*_waitForMajorityClient->getClientPointer()); + _waitForMajorityClient->getClientPointer()->setSystemOperationUnkillableByStepdown(lk); + } + { + stdx::lock_guard<Client> lk(*_waitForMajorityCancellationClient->getClientPointer()); + _waitForMajorityCancellationClient->getClientPointer() + ->setSystemOperationUnkillableByStepdown(lk); + } _backgroundWorkComplete = _periodicallyWaitForMajority(); _pool->startup(); _state = State::kRunning; |