author      Zuul <zuul@review.opendev.org>               2023-02-25 07:27:11 +0000
committer   Gerrit Code Review <review@openstack.org>    2023-02-25 07:27:11 +0000
commit      c0483c5b940d559b4a9efd2b5c64bf5b5528fa11 (patch)
tree        b69d2f5bbf3164129256798372402c07691c0436
parent      e21766cf6415097a5d8b2ba612baec07e86d5c8e (diff)
parent      983879421f9a181759daacd8689a32c8afd12322 (diff)
download    swift-c0483c5b940d559b4a9efd2b5c64bf5b5528fa11.tar.gz
Merge "sharder: make misplaced objects lookup faster"
-rw-r--r--  swift/container/sharder.py             179
-rw-r--r--  test/unit/container/test_sharder.py    208
2 files changed, 299 insertions, 88 deletions
diff --git a/swift/container/sharder.py b/swift/container/sharder.py
index 7afa3d840..0705602c5 100644
--- a/swift/container/sharder.py
+++ b/swift/container/sharder.py
@@ -1525,106 +1525,119 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
return self._audit_root_container(broker)
return self._audit_shard_container(broker)
- def yield_objects(self, broker, src_shard_range, since_row=None):
+ def yield_objects(self, broker, src_shard_range, since_row=None,
+ batch_size=None):
"""
- Iterates through all objects in ``src_shard_range`` in name order
- yielding them in lists of up to CONTAINER_LISTING_LIMIT length. Both
- deleted and undeleted objects are included.
+ Iterates through all object rows in ``src_shard_range`` in name order
+ yielding them in lists of up to ``batch_size`` in length. All batches
+ of rows that are not marked deleted are yielded before all batches of
+ rows that are marked deleted.
:param broker: A :class:`~swift.container.backend.ContainerBroker`.
:param src_shard_range: A :class:`~swift.common.utils.ShardRange`
describing the source range.
- :param since_row: include only items whose ROWID is greater than
- the given row id; by default all rows are included.
- :return: a generator of tuples of (list of objects, broker info dict)
+ :param since_row: include only object rows whose ROWID is greater than
+ the given row id; by default all object rows are included.
+ :param batch_size: The maximum number of object rows to include in each
+ yielded batch; defaults to cleave_row_batch_size.
+ :return: a generator of tuples of (list of rows, broker info dict)
"""
- marker = src_shard_range.lower_str
- while True:
- info = broker.get_info()
- info['max_row'] = broker.get_max_row()
- start = time.time()
- objects = broker.get_objects(
- self.cleave_row_batch_size,
- marker=marker,
- end_marker=src_shard_range.end_marker,
- include_deleted=None, # give me everything
- since_row=since_row)
- if objects:
- self.logger.debug('got %s objects from %s in %ss',
- len(objects), broker.db_file,
- time.time() - start)
- yield objects, info
-
- if len(objects) < self.cleave_row_batch_size:
- break
- marker = objects[-1]['name']
+ if (src_shard_range.lower == ShardRange.MAX or
+ src_shard_range.upper == ShardRange.MIN):
+ # this is an unexpected condition but handled with an early return
+ # just in case, because:
+ # lower == ShardRange.MAX -> marker == ''
+ # which could result in rows being erroneously yielded.
+ return
+
+ batch_size = batch_size or self.cleave_row_batch_size
+ for include_deleted in (False, True):
+ marker = src_shard_range.lower_str
+ while True:
+ info = broker.get_info()
+ info['max_row'] = broker.get_max_row()
+ start = time.time()
+ objects = broker.get_objects(
+ limit=batch_size,
+ marker=marker,
+ end_marker=src_shard_range.end_marker,
+ include_deleted=include_deleted,
+ since_row=since_row)
+ self.logger.debug(
+ 'got %s rows (deleted=%s) from %s in %ss',
+ len(objects),
+ include_deleted,
+ broker.db_file,
+ time.time() - start)
+ if objects:
+ yield objects, info
+
+ if len(objects) < batch_size:
+ break
+ marker = objects[-1]['name']
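[Editor's note: a minimal usage sketch of the revised yield_objects() contract described in the docstring above. It assumes `sharder` is a ContainerSharder and `broker` is a ContainerBroker for the container being scanned, set up much as in the unit tests further down; because undeleted rows are now yielded before deleted rows, the 'deleted' flag is uniform within each batch.]

    from swift.common.utils import ShardRange, Timestamp

    # `sharder` and `broker` are assumed to exist already; this sketch only
    # illustrates the generator's contract, not how the sharder builds them.
    src_range = ShardRange('a/c', Timestamp.now(), lower='o10', upper='o20')
    for rows, info in sharder.yield_objects(broker, src_range, batch_size=100):
        # all undeleted batches arrive before any deleted batch, so the
        # 'deleted' flag is the same for every row in a given batch
        print('%d rows, deleted=%s, broker max_row=%s'
              % (len(rows), rows[0]['deleted'], info['max_row']))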
def yield_objects_to_shard_range(self, broker, src_shard_range,
dest_shard_ranges):
"""
- Iterates through all objects in ``src_shard_range`` to place them in
- destination shard ranges provided by the ``next_shard_range`` function.
- Yields tuples of (object list, destination shard range in which those
- objects belong). Note that the same destination shard range may be
- referenced in more than one yielded tuple.
+ Iterates through all object rows in ``src_shard_range`` to place them
+ in destination shard ranges provided by the ``dest_shard_ranges``
+ function. Yields tuples of ``(batch of object rows, destination shard
+ range in which those object rows belong, broker info)``.
+
+ If no destination shard range exists for a batch of object rows then
+ tuples are yielded of ``(batch of object rows, None, broker info)``.
+ This indicates to the caller that there are a non-zero number of object
+ rows for which no destination shard range was found.
+
+ Note that the same destination shard range may be referenced in more
+ than one yielded tuple.
:param broker: A :class:`~swift.container.backend.ContainerBroker`.
:param src_shard_range: A :class:`~swift.common.utils.ShardRange`
describing the source range.
:param dest_shard_ranges: A function which should return a list of
- destination shard ranges in name order.
- :return: a generator of tuples of
- (object list, shard range, broker info dict)
+ destination shard ranges sorted in the order defined by
+ :meth:`~swift.common.utils.ShardRange.sort_key`.
+ :return: a generator of tuples of ``(object row list, shard range,
+ broker info dict)`` where ``shard_range`` may be ``None``.
"""
- dest_shard_range_iter = dest_shard_range = None
- for objs, info in self.yield_objects(broker, src_shard_range):
- if not objs:
- return
+ # calling dest_shard_ranges() may result in a request to fetch shard
+ # ranges, so first check that the broker actually has misplaced object
+ # rows in the source namespace
+ for _ in self.yield_objects(broker, src_shard_range, batch_size=1):
+ break
+ else:
+ return
- def next_or_none(it):
- try:
- return next(it)
- except StopIteration:
- return None
-
- if dest_shard_range_iter is None:
- dest_shard_range_iter = iter(dest_shard_ranges())
- dest_shard_range = next_or_none(dest_shard_range_iter)
-
- unplaced = False
- last_index = next_index = 0
- for obj in objs:
- if dest_shard_range is None:
- # no more destinations: yield remainder of batch and bail
- # NB there may be more batches of objects but none of them
- # will be placed so no point fetching them
- yield objs[last_index:], None, info
- return
- if obj['name'] <= dest_shard_range.lower:
- unplaced = True
- elif unplaced:
- # end of run of unplaced objects, yield them
- yield objs[last_index:next_index], None, info
- last_index = next_index
- unplaced = False
- while (dest_shard_range and
- obj['name'] > dest_shard_range.upper):
- if next_index != last_index:
- # yield the objects in current dest_shard_range
- yield (objs[last_index:next_index],
- dest_shard_range,
- info)
- last_index = next_index
- dest_shard_range = next_or_none(dest_shard_range_iter)
- next_index += 1
-
- if next_index != last_index:
- # yield tail of current batch of objects
- # NB there may be more objects for the current
- # dest_shard_range in the next batch from yield_objects
- yield (objs[last_index:next_index],
- None if unplaced else dest_shard_range,
- info)
+ dest_shard_range_iter = iter(dest_shard_ranges())
+ src_shard_range_marker = src_shard_range.lower
+ for dest_shard_range in dest_shard_range_iter:
+ if dest_shard_range.upper <= src_shard_range.lower:
+ continue
+
+ if dest_shard_range.lower > src_shard_range_marker:
+ # no destination for a sub-namespace of the source namespace
+ sub_src_range = src_shard_range.copy(
+ lower=src_shard_range_marker, upper=dest_shard_range.lower)
+ for objs, info in self.yield_objects(broker, sub_src_range):
+ yield objs, None, info
+
+ sub_src_range = src_shard_range.copy(
+ lower=max(dest_shard_range.lower, src_shard_range.lower),
+ upper=min(dest_shard_range.upper, src_shard_range.upper))
+ for objs, info in self.yield_objects(broker, sub_src_range):
+ yield objs, dest_shard_range, info
+
+ src_shard_range_marker = dest_shard_range.upper
+ if dest_shard_range.upper >= src_shard_range.upper:
+ # the entire source namespace has been traversed
+ break
+ else:
+ # dest_shard_ranges_iter was exhausted before reaching the end of
+ # the source namespace
+ sub_src_range = src_shard_range.copy(lower=src_shard_range_marker)
+ for objs, info in self.yield_objects(broker, sub_src_range):
+ yield objs, None, info
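[Editor's note: the speed-up comes from the namespace arithmetic above. The old loop fetched every row in the source range and compared each row's name against destination bounds in Python; the new code splits the source namespace into sub-ranges, one per destination shard range plus any gaps with no destination, and pushes each sub-range's bounds into the database query via yield_objects(). The standalone sketch below is a hypothetical helper, not part of the patch; it reproduces just that splitting step so the yielded (sub-range, destination) pairs can be seen in isolation.]

    from swift.common.utils import ShardRange, Timestamp

    def split_source_by_destinations(src, dests):
        # Yield (sub_range, destination-or-None) pairs covering ``src`` in
        # name order, mirroring the loop in yield_objects_to_shard_range().
        marker = src.lower
        for dest in dests:
            if dest.upper <= src.lower:
                # destination lies entirely below the source namespace
                continue
            if dest.lower > marker:
                # a gap with no destination: its rows are reported with None
                yield src.copy(lower=marker, upper=dest.lower), None
            yield src.copy(lower=max(dest.lower, src.lower),
                           upper=min(dest.upper, src.upper)), dest
            marker = dest.upper
            if dest.upper >= src.upper:
                return
        # destinations exhausted before the end of the source namespace
        yield src.copy(lower=marker), None

    src = ShardRange('a/c', Timestamp.now(), lower='o05', upper='o25')
    dests = [ShardRange('shard/1', Timestamp.now(), lower='o09', upper='o19')]
    for sub, dest in split_source_by_destinations(src, dests):
        print(sub.lower_str, sub.upper_str, dest.name if dest else None)
    # -> o05 o09 None
    # -> o09 o19 shard/1
    # -> o19 o25 None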
def _post_replicate_hook(self, broker, info, responses):
# override superclass behaviour
diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py
index 5e77c9071..ab5ae9136 100644
--- a/test/unit/container/test_sharder.py
+++ b/test/unit/container/test_sharder.py
@@ -427,13 +427,13 @@ class TestSharder(BaseTestSharder):
'container-sharder-6021-ic')
def _assert_stats(self, expected, sharder, category):
- # assertEqual doesn't work with a defaultdict
+ # assertEqual doesn't work with a stats defaultdict so copy to a dict
+ # before comparing
stats = sharder.stats['sharding'][category]
+ actual = {}
for k, v in expected.items():
- actual = stats[k]
- self.assertEqual(
- v, actual, 'Expected %s but got %s for %s in %s' %
- (v, actual, k, stats))
+ actual[k] = stats[k]
+ self.assertEqual(expected, actual)
return stats
def _assert_recon_stats(self, expected, sharder, category):
@@ -1181,6 +1181,204 @@ class TestSharder(BaseTestSharder):
'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
params=params)
+ def test_yield_objects(self):
+ broker = self._make_broker()
+ objects = [
+ ('o%02d' % i, self.ts_encoded(), 10, 'text/plain', 'etag_a',
+ i % 2, 0) for i in range(30)]
+ for obj in objects:
+ broker.put_object(*obj)
+
+ src_range = ShardRange('dont/care', Timestamp.now())
+ with self._mock_sharder(conf={}) as sharder:
+ batches = [b for b, _ in
+ sharder.yield_objects(broker, src_range)]
+ self.assertEqual([15, 15], [len(b) for b in batches])
+ self.assertEqual([[0] * 15, [1] * 15],
+ [[o['deleted'] for o in b] for b in batches])
+
+ # custom batch size
+ with self._mock_sharder(conf={}) as sharder:
+ batches = [b for b, _ in
+ sharder.yield_objects(broker, src_range, batch_size=10)]
+ self.assertEqual([10, 5, 10, 5], [len(b) for b in batches])
+ self.assertEqual([[0] * 10, [0] * 5, [1] * 10, [1] * 5],
+ [[o['deleted'] for o in b] for b in batches])
+
+ # restricted source range
+ src_range = ShardRange('dont/care', Timestamp.now(),
+ lower='o10', upper='o20')
+ with self._mock_sharder(conf={}) as sharder:
+ batches = [b for b, _ in
+ sharder.yield_objects(broker, src_range)]
+ self.assertEqual([5, 5], [len(b) for b in batches])
+ self.assertEqual([[0] * 5, [1] * 5],
+ [[o['deleted'] for o in b] for b in batches])
+
+ # null source range
+ src_range = ShardRange('dont/care', Timestamp.now(),
+ lower=ShardRange.MAX)
+ with self._mock_sharder(conf={}) as sharder:
+ batches = [b for b, _ in
+ sharder.yield_objects(broker, src_range)]
+ self.assertEqual([], batches)
+ src_range = ShardRange('dont/care', Timestamp.now(),
+ upper=ShardRange.MIN)
+ with self._mock_sharder(conf={}) as sharder:
+ batches = [b for b, _ in
+ sharder.yield_objects(broker, src_range)]
+ self.assertEqual([], batches)
+
+ def test_yield_objects_to_shard_range_no_objects(self):
+ # verify that dest_shard_ranges func is not called if the source
+ # broker has no objects
+ broker = self._make_broker()
+ dest_shard_ranges = mock.MagicMock()
+ src_range = ShardRange('dont/care', Timestamp.now())
+ with self._mock_sharder(conf={}) as sharder:
+ batches = [b for b, _ in
+ sharder.yield_objects_to_shard_range(
+ broker, src_range, dest_shard_ranges)]
+ self.assertEqual([], batches)
+ dest_shard_ranges.assert_not_called()
+
+ def test_yield_objects_to_shard_range(self):
+ broker = self._make_broker()
+ objects = [
+ ('o%02d' % i, self.ts_encoded(), 10, 'text/plain', 'etag_a',
+ i % 2, 0) for i in range(30)]
+ for obj in objects:
+ broker.put_object(*obj)
+ orig_info = broker.get_info()
+ # yield_objects annotates the info dict...
+ orig_info['max_row'] = 30
+ dest_ranges = [
+ ShardRange('shard/0', Timestamp.now(), upper='o09'),
+ ShardRange('shard/1', Timestamp.now(), lower='o09', upper='o19'),
+ ShardRange('shard/2', Timestamp.now(), lower='o19'),
+ ]
+
+ # complete overlap of src and dest, multiple batches per dest shard
+ # range per deleted/not deleted
+ src_range = ShardRange('dont/care', Timestamp.now())
+ dest_shard_ranges = mock.MagicMock(return_value=dest_ranges)
+ with self._mock_sharder(conf={'cleave_row_batch_size': 4}) as sharder:
+ yielded = [y for y in
+ sharder.yield_objects_to_shard_range(
+ broker, src_range, dest_shard_ranges)]
+ self.assertEqual([dest_ranges[0], dest_ranges[0],
+ dest_ranges[0], dest_ranges[0],
+ dest_ranges[1], dest_ranges[1],
+ dest_ranges[1], dest_ranges[1],
+ dest_ranges[2], dest_ranges[2],
+ dest_ranges[2], dest_ranges[2]],
+ [dest for _, dest, _ in yielded])
+ self.assertEqual([[o[0] for o in objects[0:8:2]],
+ [o[0] for o in objects[8:10:2]],
+ [o[0] for o in objects[1:8:2]],
+ [o[0] for o in objects[9:10:2]],
+ [o[0] for o in objects[10:18:2]],
+ [o[0] for o in objects[18:20:2]],
+ [o[0] for o in objects[11:18:2]],
+ [o[0] for o in objects[19:20:2]],
+ [o[0] for o in objects[20:28:2]],
+ [o[0] for o in objects[28:30:2]],
+ [o[0] for o in objects[21:28:2]],
+ [o[0] for o in objects[29:30:2]]],
+ [[o['name'] for o in objs] for objs, _, _ in yielded])
+ self.assertEqual([orig_info] * 12, [info for _, _, info in yielded])
+
+ # src narrower than dest
+ src_range = ShardRange('dont/care', Timestamp.now(),
+ lower='o15', upper='o25')
+ dest_shard_ranges = mock.MagicMock(return_value=dest_ranges)
+ with self._mock_sharder(conf={}) as sharder:
+ yielded = [y for y in
+ sharder.yield_objects_to_shard_range(
+ broker, src_range, dest_shard_ranges)]
+ self.assertEqual([dest_ranges[1], dest_ranges[1],
+ dest_ranges[2], dest_ranges[2]],
+ [dest for _, dest, _ in yielded])
+ self.assertEqual([[o[0] for o in objects[16:20:2]],
+ [o[0] for o in objects[17:20:2]],
+ [o[0] for o in objects[20:26:2]],
+ [o[0] for o in objects[21:26:2]]],
+ [[o['name'] for o in objs] for objs, _, _ in yielded])
+ self.assertEqual([orig_info] * 4, [info for _, _, info in yielded])
+
+ # src much narrower than dest
+ src_range = ShardRange('dont/care', Timestamp.now(),
+ lower='o15', upper='o18')
+ dest_shard_ranges = mock.MagicMock(return_value=dest_ranges)
+ with self._mock_sharder(conf={}) as sharder:
+ yielded = [y for y in
+ sharder.yield_objects_to_shard_range(
+ broker, src_range, dest_shard_ranges)]
+ self.assertEqual([dest_ranges[1], dest_ranges[1]],
+ [dest for _, dest, _ in yielded])
+ self.assertEqual([[o[0] for o in objects[16:19:2]],
+ [o[0] for o in objects[17:19:2]]],
+ [[o['name'] for o in objs] for objs, _, _ in yielded])
+ self.assertEqual([orig_info] * 2, [info for _, _, info in yielded])
+
+ # dest narrower than src
+ src_range = ShardRange('dont/care', Timestamp.now(),
+ lower='o05', upper='o25')
+ dest_shard_ranges = mock.MagicMock(return_value=dest_ranges[1:])
+ with self._mock_sharder(conf={}) as sharder:
+ yielded = [y for y in
+ sharder.yield_objects_to_shard_range(
+ broker, src_range, dest_shard_ranges)]
+ self.assertEqual([None, None,
+ dest_ranges[1], dest_ranges[1],
+ dest_ranges[2], dest_ranges[2]],
+ [dest for _, dest, _ in yielded])
+ self.assertEqual([[o[0] for o in objects[6:10:2]],
+ [o[0] for o in objects[7:10:2]],
+ [o[0] for o in objects[10:20:2]],
+ [o[0] for o in objects[11:20:2]],
+ [o[0] for o in objects[20:26:2]],
+ [o[0] for o in objects[21:26:2]]],
+ [[o['name'] for o in objs] for objs, _, _ in yielded])
+ self.assertEqual([orig_info] * 6, [info for _, _, info in yielded])
+
+ # dest much narrower than src
+ src_range = ShardRange('dont/care', Timestamp.now(),
+ lower='o05', upper='o25')
+ dest_shard_ranges = mock.MagicMock(return_value=dest_ranges[1:2])
+ with self._mock_sharder(conf={}) as sharder:
+ yielded = [y for y in
+ sharder.yield_objects_to_shard_range(
+ broker, src_range, dest_shard_ranges)]
+ self.assertEqual([None, None,
+ dest_ranges[1], dest_ranges[1],
+ None, None],
+ [dest for _, dest, _ in yielded])
+ self.assertEqual([[o[0] for o in objects[6:10:2]],
+ [o[0] for o in objects[7:10:2]],
+ [o[0] for o in objects[10:20:2]],
+ [o[0] for o in objects[11:20:2]],
+ [o[0] for o in objects[20:26:2]],
+ [o[0] for o in objects[21:26:2]]],
+ [[o['name'] for o in objs] for objs, _, _ in yielded])
+ self.assertEqual([orig_info] * 6, [info for _, _, info in yielded])
+
+ # no dest, source is entire namespace, multiple batches
+ src_range = ShardRange('dont/care', Timestamp.now())
+ dest_shard_ranges = mock.MagicMock(return_value=[])
+ with self._mock_sharder(conf={'cleave_row_batch_size': 10}) as sharder:
+ yielded = [y for y in
+ sharder.yield_objects_to_shard_range(
+ broker, src_range, dest_shard_ranges)]
+ self.assertEqual([None] * 4,
+ [dest for _, dest, _ in yielded])
+ self.assertEqual([[o[0] for o in objects[:20:2]],
+ [o[0] for o in objects[20::2]],
+ [o[0] for o in objects[1:20:2]],
+ [o[0] for o in objects[21::2]]],
+ [[o['name'] for o in objs] for objs, _, _ in yielded])
+ self.assertEqual([orig_info] * 4, [info for _, _, info in yielded])
+
def _check_cleave_root(self, conf=None):
broker = self._make_broker()
objects = [