diff options
author | Zuul <zuul@review.opendev.org> | 2023-02-25 07:27:11 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2023-02-25 07:27:11 +0000 |
commit | c0483c5b940d559b4a9efd2b5c64bf5b5528fa11 (patch) | |
tree | b69d2f5bbf3164129256798372402c07691c0436 | |
parent | e21766cf6415097a5d8b2ba612baec07e86d5c8e (diff) | |
parent | 983879421f9a181759daacd8689a32c8afd12322 (diff) | |
download | swift-c0483c5b940d559b4a9efd2b5c64bf5b5528fa11.tar.gz |
Merge "sharder: make misplaced objects lookup faster"
-rw-r--r-- | swift/container/sharder.py | 179 | ||||
-rw-r--r-- | test/unit/container/test_sharder.py | 208 |
2 files changed, 299 insertions, 88 deletions
diff --git a/swift/container/sharder.py b/swift/container/sharder.py index 7afa3d840..0705602c5 100644 --- a/swift/container/sharder.py +++ b/swift/container/sharder.py @@ -1525,106 +1525,119 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): return self._audit_root_container(broker) return self._audit_shard_container(broker) - def yield_objects(self, broker, src_shard_range, since_row=None): + def yield_objects(self, broker, src_shard_range, since_row=None, + batch_size=None): """ - Iterates through all objects in ``src_shard_range`` in name order - yielding them in lists of up to CONTAINER_LISTING_LIMIT length. Both - deleted and undeleted objects are included. + Iterates through all object rows in ``src_shard_range`` in name order + yielding them in lists of up to ``batch_size`` in length. All batches + of rows that are not marked deleted are yielded before all batches of + rows that are marked deleted. :param broker: A :class:`~swift.container.backend.ContainerBroker`. :param src_shard_range: A :class:`~swift.common.utils.ShardRange` describing the source range. - :param since_row: include only items whose ROWID is greater than - the given row id; by default all rows are included. - :return: a generator of tuples of (list of objects, broker info dict) + :param since_row: include only object rows whose ROWID is greater than + the given row id; by default all object rows are included. + :param batch_size: The maximum number of object rows to include in each + yielded batch; defaults to cleave_row_batch_size. + :return: a generator of tuples of (list of rows, broker info dict) """ - marker = src_shard_range.lower_str - while True: - info = broker.get_info() - info['max_row'] = broker.get_max_row() - start = time.time() - objects = broker.get_objects( - self.cleave_row_batch_size, - marker=marker, - end_marker=src_shard_range.end_marker, - include_deleted=None, # give me everything - since_row=since_row) - if objects: - self.logger.debug('got %s objects from %s in %ss', - len(objects), broker.db_file, - time.time() - start) - yield objects, info - - if len(objects) < self.cleave_row_batch_size: - break - marker = objects[-1]['name'] + if (src_shard_range.lower == ShardRange.MAX or + src_shard_range.upper == ShardRange.MIN): + # this is an unexpected condition but handled with an early return + # just in case, because: + # lower == ShardRange.MAX -> marker == '' + # which could result in rows being erroneously yielded. + return + + batch_size = batch_size or self.cleave_row_batch_size + for include_deleted in (False, True): + marker = src_shard_range.lower_str + while True: + info = broker.get_info() + info['max_row'] = broker.get_max_row() + start = time.time() + objects = broker.get_objects( + limit=batch_size, + marker=marker, + end_marker=src_shard_range.end_marker, + include_deleted=include_deleted, + since_row=since_row) + self.logger.debug( + 'got %s rows (deleted=%s) from %s in %ss', + len(objects), + include_deleted, + broker.db_file, + time.time() - start) + if objects: + yield objects, info + + if len(objects) < batch_size: + break + marker = objects[-1]['name'] def yield_objects_to_shard_range(self, broker, src_shard_range, dest_shard_ranges): """ - Iterates through all objects in ``src_shard_range`` to place them in - destination shard ranges provided by the ``next_shard_range`` function. - Yields tuples of (object list, destination shard range in which those - objects belong). Note that the same destination shard range may be - referenced in more than one yielded tuple. + Iterates through all object rows in ``src_shard_range`` to place them + in destination shard ranges provided by the ``dest_shard_ranges`` + function. Yields tuples of ``(batch of object rows, destination shard + range in which those object rows belong, broker info)``. + + If no destination shard range exists for a batch of object rows then + tuples are yielded of ``(batch of object rows, None, broker info)``. + This indicates to the caller that there are a non-zero number of object + rows for which no destination shard range was found. + + Note that the same destination shard range may be referenced in more + than one yielded tuple. :param broker: A :class:`~swift.container.backend.ContainerBroker`. :param src_shard_range: A :class:`~swift.common.utils.ShardRange` describing the source range. :param dest_shard_ranges: A function which should return a list of - destination shard ranges in name order. - :return: a generator of tuples of - (object list, shard range, broker info dict) + destination shard ranges sorted in the order defined by + :meth:`~swift.common.utils.ShardRange.sort_key`. + :return: a generator of tuples of ``(object row list, shard range, + broker info dict)`` where ``shard_range`` may be ``None``. """ - dest_shard_range_iter = dest_shard_range = None - for objs, info in self.yield_objects(broker, src_shard_range): - if not objs: - return + # calling dest_shard_ranges() may result in a request to fetch shard + # ranges, so first check that the broker actually has misplaced object + # rows in the source namespace + for _ in self.yield_objects(broker, src_shard_range, batch_size=1): + break + else: + return - def next_or_none(it): - try: - return next(it) - except StopIteration: - return None - - if dest_shard_range_iter is None: - dest_shard_range_iter = iter(dest_shard_ranges()) - dest_shard_range = next_or_none(dest_shard_range_iter) - - unplaced = False - last_index = next_index = 0 - for obj in objs: - if dest_shard_range is None: - # no more destinations: yield remainder of batch and bail - # NB there may be more batches of objects but none of them - # will be placed so no point fetching them - yield objs[last_index:], None, info - return - if obj['name'] <= dest_shard_range.lower: - unplaced = True - elif unplaced: - # end of run of unplaced objects, yield them - yield objs[last_index:next_index], None, info - last_index = next_index - unplaced = False - while (dest_shard_range and - obj['name'] > dest_shard_range.upper): - if next_index != last_index: - # yield the objects in current dest_shard_range - yield (objs[last_index:next_index], - dest_shard_range, - info) - last_index = next_index - dest_shard_range = next_or_none(dest_shard_range_iter) - next_index += 1 - - if next_index != last_index: - # yield tail of current batch of objects - # NB there may be more objects for the current - # dest_shard_range in the next batch from yield_objects - yield (objs[last_index:next_index], - None if unplaced else dest_shard_range, - info) + dest_shard_range_iter = iter(dest_shard_ranges()) + src_shard_range_marker = src_shard_range.lower + for dest_shard_range in dest_shard_range_iter: + if dest_shard_range.upper <= src_shard_range.lower: + continue + + if dest_shard_range.lower > src_shard_range_marker: + # no destination for a sub-namespace of the source namespace + sub_src_range = src_shard_range.copy( + lower=src_shard_range_marker, upper=dest_shard_range.lower) + for objs, info in self.yield_objects(broker, sub_src_range): + yield objs, None, info + + sub_src_range = src_shard_range.copy( + lower=max(dest_shard_range.lower, src_shard_range.lower), + upper=min(dest_shard_range.upper, src_shard_range.upper)) + for objs, info in self.yield_objects(broker, sub_src_range): + yield objs, dest_shard_range, info + + src_shard_range_marker = dest_shard_range.upper + if dest_shard_range.upper >= src_shard_range.upper: + # the entire source namespace has been traversed + break + else: + # dest_shard_ranges_iter was exhausted before reaching the end of + # the source namespace + sub_src_range = src_shard_range.copy(lower=src_shard_range_marker) + for objs, info in self.yield_objects(broker, sub_src_range): + yield objs, None, info def _post_replicate_hook(self, broker, info, responses): # override superclass behaviour diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py index 5e77c9071..ab5ae9136 100644 --- a/test/unit/container/test_sharder.py +++ b/test/unit/container/test_sharder.py @@ -427,13 +427,13 @@ class TestSharder(BaseTestSharder): 'container-sharder-6021-ic') def _assert_stats(self, expected, sharder, category): - # assertEqual doesn't work with a defaultdict + # assertEqual doesn't work with a stats defaultdict so copy to a dict + # before comparing stats = sharder.stats['sharding'][category] + actual = {} for k, v in expected.items(): - actual = stats[k] - self.assertEqual( - v, actual, 'Expected %s but got %s for %s in %s' % - (v, actual, k, stats)) + actual[k] = stats[k] + self.assertEqual(expected, actual) return stats def _assert_recon_stats(self, expected, sharder, category): @@ -1181,6 +1181,204 @@ class TestSharder(BaseTestSharder): 'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,), params=params) + def test_yield_objects(self): + broker = self._make_broker() + objects = [ + ('o%02d' % i, self.ts_encoded(), 10, 'text/plain', 'etag_a', + i % 2, 0) for i in range(30)] + for obj in objects: + broker.put_object(*obj) + + src_range = ShardRange('dont/care', Timestamp.now()) + with self._mock_sharder(conf={}) as sharder: + batches = [b for b, _ in + sharder.yield_objects(broker, src_range)] + self.assertEqual([15, 15], [len(b) for b in batches]) + self.assertEqual([[0] * 15, [1] * 15], + [[o['deleted'] for o in b] for b in batches]) + + # custom batch size + with self._mock_sharder(conf={}) as sharder: + batches = [b for b, _ in + sharder.yield_objects(broker, src_range, batch_size=10)] + self.assertEqual([10, 5, 10, 5], [len(b) for b in batches]) + self.assertEqual([[0] * 10, [0] * 5, [1] * 10, [1] * 5], + [[o['deleted'] for o in b] for b in batches]) + + # restricted source range + src_range = ShardRange('dont/care', Timestamp.now(), + lower='o10', upper='o20') + with self._mock_sharder(conf={}) as sharder: + batches = [b for b, _ in + sharder.yield_objects(broker, src_range)] + self.assertEqual([5, 5], [len(b) for b in batches]) + self.assertEqual([[0] * 5, [1] * 5], + [[o['deleted'] for o in b] for b in batches]) + + # null source range + src_range = ShardRange('dont/care', Timestamp.now(), + lower=ShardRange.MAX) + with self._mock_sharder(conf={}) as sharder: + batches = [b for b, _ in + sharder.yield_objects(broker, src_range)] + self.assertEqual([], batches) + src_range = ShardRange('dont/care', Timestamp.now(), + upper=ShardRange.MIN) + with self._mock_sharder(conf={}) as sharder: + batches = [b for b, _ in + sharder.yield_objects(broker, src_range)] + self.assertEqual([], batches) + + def test_yield_objects_to_shard_range_no_objects(self): + # verify that dest_shard_ranges func is not called if the source + # broker has no objects + broker = self._make_broker() + dest_shard_ranges = mock.MagicMock() + src_range = ShardRange('dont/care', Timestamp.now()) + with self._mock_sharder(conf={}) as sharder: + batches = [b for b, _ in + sharder.yield_objects_to_shard_range( + broker, src_range, dest_shard_ranges)] + self.assertEqual([], batches) + dest_shard_ranges.assert_not_called() + + def test_yield_objects_to_shard_range(self): + broker = self._make_broker() + objects = [ + ('o%02d' % i, self.ts_encoded(), 10, 'text/plain', 'etag_a', + i % 2, 0) for i in range(30)] + for obj in objects: + broker.put_object(*obj) + orig_info = broker.get_info() + # yield_objects annotates the info dict... + orig_info['max_row'] = 30 + dest_ranges = [ + ShardRange('shard/0', Timestamp.now(), upper='o09'), + ShardRange('shard/1', Timestamp.now(), lower='o09', upper='o19'), + ShardRange('shard/2', Timestamp.now(), lower='o19'), + ] + + # complete overlap of src and dest, multiple batches per dest shard + # range per deleted/not deleted + src_range = ShardRange('dont/care', Timestamp.now()) + dest_shard_ranges = mock.MagicMock(return_value=dest_ranges) + with self._mock_sharder(conf={'cleave_row_batch_size': 4}) as sharder: + yielded = [y for y in + sharder.yield_objects_to_shard_range( + broker, src_range, dest_shard_ranges)] + self.assertEqual([dest_ranges[0], dest_ranges[0], + dest_ranges[0], dest_ranges[0], + dest_ranges[1], dest_ranges[1], + dest_ranges[1], dest_ranges[1], + dest_ranges[2], dest_ranges[2], + dest_ranges[2], dest_ranges[2]], + [dest for _, dest, _ in yielded]) + self.assertEqual([[o[0] for o in objects[0:8:2]], + [o[0] for o in objects[8:10:2]], + [o[0] for o in objects[1:8:2]], + [o[0] for o in objects[9:10:2]], + [o[0] for o in objects[10:18:2]], + [o[0] for o in objects[18:20:2]], + [o[0] for o in objects[11:18:2]], + [o[0] for o in objects[19:20:2]], + [o[0] for o in objects[20:28:2]], + [o[0] for o in objects[28:30:2]], + [o[0] for o in objects[21:28:2]], + [o[0] for o in objects[29:30:2]]], + [[o['name'] for o in objs] for objs, _, _ in yielded]) + self.assertEqual([orig_info] * 12, [info for _, _, info in yielded]) + + # src narrower than dest + src_range = ShardRange('dont/care', Timestamp.now(), + lower='o15', upper='o25') + dest_shard_ranges = mock.MagicMock(return_value=dest_ranges) + with self._mock_sharder(conf={}) as sharder: + yielded = [y for y in + sharder.yield_objects_to_shard_range( + broker, src_range, dest_shard_ranges)] + self.assertEqual([dest_ranges[1], dest_ranges[1], + dest_ranges[2], dest_ranges[2]], + [dest for _, dest, _ in yielded]) + self.assertEqual([[o[0] for o in objects[16:20:2]], + [o[0] for o in objects[17:20:2]], + [o[0] for o in objects[20:26:2]], + [o[0] for o in objects[21:26:2]]], + [[o['name'] for o in objs] for objs, _, _ in yielded]) + self.assertEqual([orig_info] * 4, [info for _, _, info in yielded]) + + # src much narrower than dest + src_range = ShardRange('dont/care', Timestamp.now(), + lower='o15', upper='o18') + dest_shard_ranges = mock.MagicMock(return_value=dest_ranges) + with self._mock_sharder(conf={}) as sharder: + yielded = [y for y in + sharder.yield_objects_to_shard_range( + broker, src_range, dest_shard_ranges)] + self.assertEqual([dest_ranges[1], dest_ranges[1]], + [dest for _, dest, _ in yielded]) + self.assertEqual([[o[0] for o in objects[16:19:2]], + [o[0] for o in objects[17:19:2]]], + [[o['name'] for o in objs] for objs, _, _ in yielded]) + self.assertEqual([orig_info] * 2, [info for _, _, info in yielded]) + + # dest narrower than src + src_range = ShardRange('dont/care', Timestamp.now(), + lower='o05', upper='o25') + dest_shard_ranges = mock.MagicMock(return_value=dest_ranges[1:]) + with self._mock_sharder(conf={}) as sharder: + yielded = [y for y in + sharder.yield_objects_to_shard_range( + broker, src_range, dest_shard_ranges)] + self.assertEqual([None, None, + dest_ranges[1], dest_ranges[1], + dest_ranges[2], dest_ranges[2]], + [dest for _, dest, _ in yielded]) + self.assertEqual([[o[0] for o in objects[6:10:2]], + [o[0] for o in objects[7:10:2]], + [o[0] for o in objects[10:20:2]], + [o[0] for o in objects[11:20:2]], + [o[0] for o in objects[20:26:2]], + [o[0] for o in objects[21:26:2]]], + [[o['name'] for o in objs] for objs, _, _ in yielded]) + self.assertEqual([orig_info] * 6, [info for _, _, info in yielded]) + + # dest much narrower than src + src_range = ShardRange('dont/care', Timestamp.now(), + lower='o05', upper='o25') + dest_shard_ranges = mock.MagicMock(return_value=dest_ranges[1:2]) + with self._mock_sharder(conf={}) as sharder: + yielded = [y for y in + sharder.yield_objects_to_shard_range( + broker, src_range, dest_shard_ranges)] + self.assertEqual([None, None, + dest_ranges[1], dest_ranges[1], + None, None], + [dest for _, dest, _ in yielded]) + self.assertEqual([[o[0] for o in objects[6:10:2]], + [o[0] for o in objects[7:10:2]], + [o[0] for o in objects[10:20:2]], + [o[0] for o in objects[11:20:2]], + [o[0] for o in objects[20:26:2]], + [o[0] for o in objects[21:26:2]]], + [[o['name'] for o in objs] for objs, _, _ in yielded]) + self.assertEqual([orig_info] * 6, [info for _, _, info in yielded]) + + # no dest, source is entire namespace, multiple batches + src_range = ShardRange('dont/care', Timestamp.now()) + dest_shard_ranges = mock.MagicMock(return_value=[]) + with self._mock_sharder(conf={'cleave_row_batch_size': 10}) as sharder: + yielded = [y for y in + sharder.yield_objects_to_shard_range( + broker, src_range, dest_shard_ranges)] + self.assertEqual([None] * 4, + [dest for _, dest, _ in yielded]) + self.assertEqual([[o[0] for o in objects[:20:2]], + [o[0] for o in objects[20::2]], + [o[0] for o in objects[1:20:2]], + [o[0] for o in objects[21::2]]], + [[o['name'] for o in objs] for objs, _, _ in yielded]) + self.assertEqual([orig_info] * 4, [info for _, _, info in yielded]) + def _check_cleave_root(self, conf=None): broker = self._make_broker() objects = [ |