/**
 *    Copyright (C) 2021-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
*/ #pragma once #include "mongo/db/cancelable_operation_context.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/operation_context.h" #include "mongo/db/query/find_command_gen.h" #include "mongo/db/resource_yielder.h" #include "mongo/executor/task_executor.h" #include "mongo/rpc/write_concern_error_detail.h" #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batched_command_response.h" #include "mongo/util/concurrency/with_lock.h" #include "mongo/util/future.h" namespace mongo::txn_api { namespace details { class TxnMetadataHooks; class TransactionWithRetries; } // namespace details // Max number of retries allowed for a transaction operation. The API uses exponential backoffs // capped at 1 second for transient error and commit network error retries, so this corresponds to // roughly 2 minutes of sleeps in total between retries meant to loosely mirror the 2 minute timeout // used by the driver's convenient transactions API: // https://github.com/mongodb/specifications/blob/92d77a6d/source/transactions-convenient-api/transactions-convenient-api.rst static constexpr int kTxnRetryLimit = 120; static constexpr auto kMaxTimeMSField = "maxTimeMS"; /** * Encapsulates the command status and write concern error from a response to a commitTransaction * command. */ struct CommitResult { /** * Returns an error status with additional context if any of the inner errors are non OK. */ Status getEffectiveStatus() const { if (!cmdStatus.isOK()) { return cmdStatus.withContext("Command error committing internal transaction"); } if (!wcError.toStatus().isOK()) { return wcError.toStatus().withContext( "Write concern error committing internal transaction"); } return Status::OK(); } Status cmdStatus; WriteConcernErrorDetail wcError; }; /** * Interface for the “backend” of an internal transaction responsible for executing commands. * Intended to be overriden and customized for different use cases. 
*/ class TransactionClient { public: virtual ~TransactionClient(){}; /** * Called by the transaction that owns this transaction client to install hooks for attaching * transaction metadata to requests and parsing it from responses. Must be called before any * commands have been sent and cannot be called more than once. */ virtual void initialize(std::unique_ptr hooks, const CancellationToken& token) = 0; /** * Runs the given command as part of the transaction that owns this transaction client. */ virtual SemiFuture runCommand(StringData dbName, BSONObj cmd) const = 0; /** * Helper method to run commands representable as a BatchedCommandRequest in the transaction * client's transaction. * * The given stmtIds are included in the sent command. If the API's transaction was spawned on * behalf of a retryable write, the statement ids must be unique for each write in the * transaction as the underlying servers will save history for each id the same as for a * retryable write. A write can opt out of this by sending a -1 statement id, which is ignored. * * If a sent statement id had already been seen for this transaction, the write with that id * won't apply a second time and instead returns its response from its original execution. That * write's id will be in the batch response's "retriedStmtIds" array field. * * Users of this API for transactions spawned on behalf of retryable writes likely should * include a stmtId for each write that should not execute twice and should check the * "retriedStmtIds" in the returned BatchedCommandResponse to detect when a write had already * applied, and thus the retryable write that spawned this transaction has already committed. * Note that only one "pre" or "post" image can be stored per transaction, so only one * findAndModify per transaction may have a non -1 statement id. 
* */ virtual SemiFuture runCRUDOp(const BatchedCommandRequest& cmd, std::vector stmtIds) const = 0; /** * Helper method that runs the given find in the transaction client's transaction and will * iterate and exhaust the find's cursor, returning a vector with all matching documents. */ virtual SemiFuture> exhaustiveFind( const FindCommandRequest& cmd) const = 0; /** * Whether the implementation expects to work in the client transaction context. The API * currently assumes the client transaction was always started in the server before the API is * invoked, which is true for service entry point clients, but may not be true for all possible * implementations. */ virtual bool supportsClientTransactionContext() const = 0; /** * Returns if the client is eligible to run cluster operations. */ virtual bool runsClusterOperations() const = 0; }; using Callback = unique_function(const TransactionClient& txnClient, ExecutorPtr txnExec)>; /** * Encapsulates the logic for executing an internal transaction based on the state in the given * OperationContext and automatically retrying on errors. * * TODO SERVER-65839: Make a version for async contexts that doesn't require an opCtx. */ class SyncTransactionWithRetries { public: SyncTransactionWithRetries(const SyncTransactionWithRetries&) = delete; SyncTransactionWithRetries operator=(const SyncTransactionWithRetries&) = delete; /** * Returns a SyncTransactionWithRetries suitable for use within an existing operation. The * session options from the given opCtx will be used to infer the transaction's options. * * Optionally accepts a custom TransactionClient and will default to a client that runs commands * against the local service entry point. */ SyncTransactionWithRetries(OperationContext* opCtx, std::shared_ptr executor, std::unique_ptr resourceYielder, std::unique_ptr txnClient = nullptr); /** * Returns a bundle with the commit command status and write concern error, if any. 
Any error * prior to receiving a response from commit (e.g. an interruption or a user assertion in the * given callback) will result in a non-ok StatusWith. Note that abort errors are not returned * because an abort will only happen implicitly when another error has occurred, and that * original error is returned instead. * * Will yield resources on the given opCtx before running if a resourceYielder was provided in * the constructor and unyield after running. Unyield will always be attempted if yield * succeeded, but an error from unyield will not be returned if the transaction itself returned * an error. * * TODO SERVER-65840: Allow returning any type. */ StatusWith runNoThrow(OperationContext* opCtx, Callback callback) noexcept; /** * Same as above except will throw if the commit result has a non-ok command status or a write * concern error. */ void run(OperationContext* opCtx, Callback callback) { auto result = uassertStatusOK(runNoThrow(opCtx, std::move(callback))); uassertStatusOK(result.getEffectiveStatus()); } private: CancellationSource _source; std::unique_ptr _resourceYielder; std::shared_ptr _txn; }; /** * Contains implementation details for the above API. Classes in this namespace should not be used * directly. */ namespace details { /** * Customization point for behaviors different in the default SEPTransactionClient and the one for * running distributed transactions. */ class SEPTransactionClientBehaviors { public: virtual ~SEPTransactionClientBehaviors() {} /** * Makes any necessary modifications to the given command, e.g. changing the name to the * "cluster" version for the cluster behaviors. */ virtual BSONObj maybeModifyCommand(BSONObj cmdObj) const = 0; /** * Returns a future with the result of running the given request. */ virtual Future handleRequest(OperationContext* opCtx, const Message& request) const = 0; /** * Returns if the client is eligible to run cluster operations. 
*/ virtual bool runsClusterOperations() const = 0; }; /** * Default behaviors that does not modify commands and runs them against the local process service * entry point. */ class DefaultSEPTransactionClientBehaviors : public SEPTransactionClientBehaviors { public: BSONObj maybeModifyCommand(BSONObj cmdObj) const override { return cmdObj; } Future handleRequest(OperationContext* opCtx, const Message& request) const override; bool runsClusterOperations() const { return false; } }; /** * Default transaction client that runs given commands through the local process service entry * point. */ class SEPTransactionClient : public TransactionClient { public: SEPTransactionClient(OperationContext* opCtx, std::shared_ptr executor, std::unique_ptr behaviors) : _serviceContext(opCtx->getServiceContext()), _executor(executor), _token(CancellationToken::uncancelable()), _behaviors(std::move(behaviors)) {} SEPTransactionClient(const SEPTransactionClient&) = delete; SEPTransactionClient operator=(const SEPTransactionClient&) = delete; virtual void initialize(std::unique_ptr hooks, const CancellationToken& token) override { invariant(!_hooks); _hooks = std::move(hooks); _token = token; } virtual SemiFuture runCommand(StringData dbName, BSONObj cmd) const override; virtual SemiFuture runCRUDOp( const BatchedCommandRequest& cmd, std::vector stmtIds) const override; virtual SemiFuture> exhaustiveFind( const FindCommandRequest& cmd) const override; virtual bool supportsClientTransactionContext() const override { return true; } virtual bool runsClusterOperations() const override { return _behaviors->runsClusterOperations(); } private: ServiceContext* const _serviceContext; std::shared_ptr _executor; CancellationToken _token; std::unique_ptr _behaviors; std::unique_ptr _hooks; }; /** * Encapsulates the logic for an internal transaction based on the state in the given * OperationContext. 
*/ class Transaction : public std::enable_shared_from_this { public: enum class ExecutionContext { kOwnSession, kClientSession, kClientRetryableWrite, kClientTransaction, }; enum class ErrorHandlingStep { kDoNotRetry, kAbortAndDoNotRetry, kRetryTransaction, kRetryCommit, }; Transaction(const Transaction&) = delete; Transaction operator=(const Transaction&) = delete; ~Transaction(); /** * Constructs a Transaction with the given TransactionClient and extracts the session options * and infers its execution context from the given OperationContext. */ Transaction(OperationContext* opCtx, std::shared_ptr executor, const CancellationToken& token, std::unique_ptr txnClient) : _executor(executor), _txnClient(std::move(txnClient)), _service(opCtx->getServiceContext()) { _primeTransaction(opCtx); _txnClient->initialize(_makeTxnMetadataHooks(), token); } /** * Sets the callback to be used by this transaction. */ void setCallback(Callback callback) { invariant(!_callback); _callback = std::move(callback); } /** * Runs the previously set callback with the TransactionClient owned by this transaction. */ SemiFuture runCallback(); /** * Used by the transaction runner to commit the transaction. Returns a future with a non-OK * status if the commit failed to send, otherwise returns a future with a bundle with the * command and write concern statuses. */ SemiFuture commit(); /** * Used by the transaction runner to abort the transaction. Returns a future with a non-OK * status if there was an error sending the command, a non-ok command result, or a write concern * error. */ SemiFuture abort(); /** * Handles the given transaction result based on where the transaction is in its lifecycle and * its execution context, e.g. by updating its txnNumber, returning the next step for the * transaction runner. */ ErrorHandlingStep handleError(const StatusWith& swResult, int attemptCounter) const noexcept; /** * Returns an object with info about the internal transaction for diagnostics. 
*/ BSONObj reportStateForLog() const; /** * Attaches transaction metadata to the given command and updates internal transaction state. */ void prepareRequest(BSONObjBuilder* cmdBuilder); /** * Extracts relevant info, like TransientTransactionError labels, from the given command * response. */ void processResponse(const BSONObj& reply); /** * Prepares the internal transaction state for a full transaction retry. */ void primeForTransactionRetry() noexcept; /** * Prepares the internal transaction state for a retry of commit. */ void primeForCommitRetry() noexcept; /** * Returns the latest operationTime returned by a command in this transaction. */ LogicalTime getOperationTime() const; private: enum class TransactionState { kInit, kStarted, kStartedCommit, kRetryingCommit, kStartedAbort, kDone, }; std::string _transactionStateToString(TransactionState txnState) const; bool _isInCommit() const { return _state == TransactionState::kStartedCommit || _state == TransactionState::kRetryingCommit; } std::unique_ptr _makeTxnMetadataHooks() { return std::make_unique(*this); } BSONObj _reportStateForLog(WithLock) const; void _setSessionInfo(WithLock, LogicalSessionId lsid, TxnNumber txnNumber, boost::optional startTransaction); SemiFuture _commitOrAbort(StringData dbName, StringData cmdName); /** * Extracts transaction options from Operation Context and infers the internal transaction’s * execution context, e.g. client has no session, client is running a retryable write. */ void _primeTransaction(OperationContext* opCtx); const std::shared_ptr _executor; std::unique_ptr _txnClient; Callback _callback; boost::optional _opDeadline; BSONObj _writeConcern; BSONObj _readConcern; APIParameters _apiParameters; ExecutionContext _execContext; // Protects the members below that are accessed by the TxnMetadataHooks, which are called by the // user's callback and may run on a separate thread than the one that is driving the // Transaction. 
mutable Mutex _mutex = MONGO_MAKE_LATCH("Transaction::_mutex"); LogicalTime _lastOperationTime; bool _latestResponseHasTransientTransactionErrorLabel{false}; OperationSessionInfo _sessionInfo; TransactionState _state{TransactionState::kInit}; bool _acquiredSessionFromPool{false}; ServiceContext* _service; }; /** * Hooks called by each TransactionClient before sending a request and upon receiving a response * responsible for attaching relevant transaction metadata and updating the transaction's state */ class TxnMetadataHooks { public: TxnMetadataHooks(details::Transaction& internalTxn) : _internalTxn(internalTxn) {} void runRequestHook(BSONObjBuilder* cmdBuilder) { _internalTxn.prepareRequest(cmdBuilder); } void runReplyHook(const BSONObj& reply) { _internalTxn.processResponse(reply); } private: Transaction& _internalTxn; }; class TransactionWithRetries : public std::enable_shared_from_this { public: TransactionWithRetries(const TransactionWithRetries&) = delete; TransactionWithRetries operator=(const TransactionWithRetries&) = delete; TransactionWithRetries(OperationContext* opCtx, std::shared_ptr executor, const CancellationToken& token, std::unique_ptr txnClient) : _internalTxn(std::make_shared(opCtx, executor, token, std::move(txnClient))), _executor(executor), _token(token) {} /** * Returns a bundle with the commit command status and write concern error, if any. Any error * prior to receiving a response from commit (e.g. an interruption or a user assertion in the * given callback) will result in a non-ok StatusWith. Note that abort errors are not returned * because an abort will only happen implicitly when another error has occurred, and that * original error is returned instead. * * TODO SERVER-65840: Allow returning a SemiFuture with any type. */ SemiFuture run(Callback callback) noexcept; /** * Returns the latest operationTime returned by a command in this transaction. 
*/ LogicalTime getOperationTime() const { return _internalTxn->getOperationTime(); } private: // Helper methods for running a transaction. ExecutorFuture _runBodyHandleErrors(int bodyAttempts); ExecutorFuture _runCommitHandleErrors(int commitAttempts); ExecutorFuture _runCommitWithRetries(); /** * Attempts to abort the active internal transaction, logging on errors after swallowing them. */ ExecutorFuture _bestEffortAbort(); std::shared_ptr _internalTxn; std::shared_ptr _executor; CancellationToken _token; }; } // namespace details } // namespace mongo::txn_api