1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
|
/**
* Copyright (C) 2018-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include <functional>
#include "mongo/base/status_with.h"
#include "mongo/client/fetcher.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/abstract_async_component.h"
#include "mongo/db/repl/optime_with.h"
#include "mongo/stdx/mutex.h"
namespace mongo {
namespace repl {
/**
* This class represents an abstract base class for replication components that try to read from
* remote oplogs. An abstract oplog fetcher is an abstract async component. It owns a Fetcher
* that fetches operations from a remote oplog and restarts from the last fetched oplog entry on
* error.
*
* The `find` command and metadata are provided by oplog fetchers that subclass the abstract oplog
* fetcher. Subclasses also provide a callback to run on successful batches.
*/
class AbstractOplogFetcher : public AbstractAsyncComponent {
    // Copy construction and copy assignment are disabled for this component.
    AbstractOplogFetcher(const AbstractOplogFetcher&) = delete;
    AbstractOplogFetcher& operator=(const AbstractOplogFetcher&) = delete;

public:
    /**
     * Type of function called by the abstract oplog fetcher on shutdown with
     * the final abstract oplog fetcher status.
     *
     * The status will be Status::OK() if we have processed the last batch of operations
     * from the cursor ("bob" is null in the fetcher callback).
     *
     * This function will be called 0 times if startup() fails and at most once after startup()
     * returns success.
     */
    using OnShutdownCallbackFn = std::function<void(const Status& shutdownStatus)>;

    /**
     * Constructs an abstract oplog fetcher that reads the oplog namespace 'nss' from the sync
     * source 'source', starting after 'lastFetched'.
     *
     * 'maxFetcherRestarts' bounds the number of consecutive Fetcher restarts on non-cancellation
     * errors, and 'onShutdownCallbackFn' is invoked with the final status on shutdown.
     *
     * Invariants if validation fails on any of the provided arguments.
     */
    AbstractOplogFetcher(executor::TaskExecutor* executor,
                         OpTime lastFetched,
                         HostAndPort source,
                         NamespaceString nss,
                         std::size_t maxFetcherRestarts,
                         OnShutdownCallbackFn onShutdownCallbackFn,
                         const std::string& componentName);

    virtual ~AbstractOplogFetcher() = default;

    /**
     * Returns a string representation of this oplog fetcher.
     * NOTE(review): the exact contents are defined in the implementation file.
     */
    std::string toString() const;

    // ================== Test support API ===================

    /**
     * Returns the command object sent in the first remote command. Since the Fetcher is not
     * created until startup, this cannot be used until the Fetcher is guaranteed to exist.
     */
    BSONObj getCommandObject_forTest() const;

    /**
     * Returns the `find` query provided to the Fetcher. Since the Fetcher is not created until
     * startup, this can be used for logging the `find` query before startup.
     */
    BSONObj getFindQuery_forTest() const;

    /**
     * Returns the OpTime of the last oplog entry fetched and processed.
     */
    OpTime getLastOpTimeFetched_forTest() const;

protected:
    /**
     * Returns how long the `find` command should wait before timing out.
     */
    virtual Milliseconds _getInitialFindMaxTime() const;

    /**
     * Returns how long the `find` command should wait before timing out, if we are retrying the
     * 'find' due to an error. This timeout should be considerably smaller than our initial oplog
     * find time, since a communication failure with an upstream node may indicate it is
     * unreachable.
     */
    virtual Milliseconds _getRetriedFindMaxTime() const;

    /**
     * Returns how long the `getMore` command should wait before timing out.
     */
    virtual Milliseconds _getGetMoreMaxTime() const;

    /**
     * Returns the sync source from which this oplog fetcher is fetching.
     */
    HostAndPort _getSource() const;

    /**
     * Returns the namespace from which this oplog fetcher is fetching.
     */
    NamespaceString _getNamespace() const;

    /**
     * Returns the OpTime of the last oplog entry fetched and processed.
     */
    OpTime _getLastOpTimeFetched() const;

    // =============== AbstractAsyncComponent overrides ================

    /**
     * Initializes and schedules a Fetcher with a `find` command specified by the subclass.
     */
    Status _doStartup_inlock() noexcept override;

    /**
     * Shuts down the Fetcher.
     */
    void _doShutdown_inlock() noexcept override;

private:
    /**
     * AbstractAsyncComponent override that exposes this component's mutex.
     * NOTE(review): presumably returns the address of '_mutex' -- confirm in the implementation.
     */
    stdx::mutex* _getMutex() noexcept override;

    /**
     * This function must be overridden by subclass oplog fetchers to specify what `find` command
     * to issue to the sync source. The subclass is provided with the last OpTime fetched so that
     * it can begin its Fetcher from the middle of the oplog.
     */
    virtual BSONObj _makeFindCommandObject(const NamespaceString& nss,
                                           OpTime lastOpTimeFetched,
                                           Milliseconds findMaxTime) const = 0;

    /**
     * This function must be overridden by subclass oplog fetchers to specify what metadata object
     * to send with the `find` command.
     */
    virtual BSONObj _makeMetadataObject() const = 0;

    /**
     * Function called by the abstract oplog fetcher when it gets a successful batch from
     * the sync source.
     *
     * On success, returns the BSONObj of the `getMore` command that should be sent back to the
     * sync source. On failure returns a status that will be passed to the _finishCallback.
     */
    virtual StatusWith<BSONObj> _onSuccessfulBatch(const Fetcher::QueryResponse& queryResponse) = 0;

    /**
     * This function creates a Fetcher with the given `find` command and metadata.
     */
    std::unique_ptr<Fetcher> _makeFetcher(const BSONObj& findCommandObj,
                                          const BSONObj& metadataObj,
                                          Milliseconds findTimeout);

    /**
     * Callback used to make a Fetcher, and then save and schedule it in a lock.
     */
    void _makeAndScheduleFetcherCallback(const executor::TaskExecutor::CallbackArgs& args);

    /**
     * Schedules fetcher and updates counters.
     */
    Status _scheduleFetcher_inlock();

    /**
     * Processes each batch of results from the cursor started by the Fetcher on the sync source.
     *
     * Calls "_finishCallback" if there is an error or if there are no further results to
     * request from the sync source.
     */
    void _callback(const Fetcher::QueryResponseStatus& result, BSONObjBuilder* getMoreBob);

    /**
     * Notifies caller that the oplog fetcher has completed processing operations from
     * the remote oplog using the "_onShutdownCallbackFn".
     */
    void _finishCallback(Status status);

    // Sync source to read from.
    const HostAndPort _source;

    // Namespace of the oplog to read.
    const NamespaceString _nss;

    // Maximum number of times to consecutively restart the Fetcher on non-cancellation errors.
    const std::size_t _maxFetcherRestarts;

    // Protects member data of this AbstractOplogFetcher.
    mutable stdx::mutex _mutex;

    // Function to call when the oplog fetcher shuts down.
    OnShutdownCallbackFn _onShutdownCallbackFn;

    // Used to keep track of the last oplog entry read and processed from the sync source.
    OpTime _lastFetched;

    // Fetcher restarts since the last successful oplog query response.
    std::size_t _fetcherRestarts = 0;

    // The Fetcher owned by this component. It is not created until startup.
    std::unique_ptr<Fetcher> _fetcher;

    // NOTE(review): presumably holds a previous Fetcher that is still shutting down while a
    // replacement is made on restart -- confirm against the implementation.
    std::unique_ptr<Fetcher> _shuttingDownFetcher;

    // Handle to currently scheduled _makeAndScheduleFetcherCallback task.
    executor::TaskExecutor::CallbackHandle _makeAndScheduleFetcherHandle;
};
} // namespace repl
} // namespace mongo
|