summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorpaul luse <paul.e.luse@intel.com>2014-10-28 09:51:06 -0700
committerClay Gerrard <clay.gerrard@gmail.com>2015-04-14 00:52:17 -0700
commit647b66a2ce4c85c43dcca49776d35c5ebb9cf15e (patch)
tree4dd192eb498e4cb47a779a80936c2668c0edeacb
parentb2189ef47ae08c39c348e7f4c90697ecb9ba64f9 (diff)
downloadswift-647b66a2ce4c85c43dcca49776d35c5ebb9cf15e.tar.gz
Erasure Code Reconstructor
This patch adds the erasure code reconstructor. It follows the design of the replicator but: - There is no notion of update() or update_deleted(). - There is a single job processor - Jobs are processed partition by partition. - At the end of processing a rebalanced or handoff partition, the reconstructor will remove successfully reverted objects if any. And various ssync changes such as the addition of reconstruct_fa() function called from ssync_sender which performs the actual reconstruction while sending the object to the receiver Co-Authored-By: Alistair Coles <alistair.coles@hp.com> Co-Authored-By: Thiago da Silva <thiago@redhat.com> Co-Authored-By: John Dickinson <me@not.mn> Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com> Co-Authored-By: Tushar Gohad <tushar.gohad@intel.com> Co-Authored-By: Samuel Merritt <sam@swiftstack.com> Co-Authored-By: Christian Schwede <christian.schwede@enovance.com> Co-Authored-By: Yuan Zhou <yuan.zhou@intel.com> blueprint ec-reconstructor Change-Id: I7d15620dc66ee646b223bb9fff700796cd6bef51
-rwxr-xr-xbin/swift-object-reconstructor31
-rw-r--r--etc/object-server.conf-sample23
-rw-r--r--setup.cfg1
-rw-r--r--swift/common/exceptions.py4
-rw-r--r--swift/common/manager.py3
-rw-r--r--swift/obj/reconstructor.py925
-rw-r--r--swift/obj/replicator.py14
-rw-r--r--swift/obj/server.py4
-rw-r--r--swift/obj/ssync_receiver.py37
-rw-r--r--swift/obj/ssync_sender.py71
-rw-r--r--test/probe/brain.py38
-rw-r--r--test/probe/common.py52
-rw-r--r--test/probe/test_container_merge_policy_index.py22
-rwxr-xr-xtest/probe/test_empty_device_handoff.py6
-rwxr-xr-xtest/probe/test_object_async_update.py4
-rwxr-xr-xtest/probe/test_object_failures.py6
-rwxr-xr-xtest/probe/test_object_handoff.py4
-rw-r--r--test/probe/test_object_metadata_replication.py9
-rw-r--r--test/probe/test_reconstructor_durable.py157
-rw-r--r--test/probe/test_reconstructor_rebuild.py170
-rwxr-xr-xtest/probe/test_reconstructor_revert.py258
-rw-r--r--test/probe/test_replication_servers_working.py7
-rwxr-xr-xtest/unit/obj/test_reconstructor.py2484
-rw-r--r--test/unit/obj/test_replicator.py49
-rwxr-xr-xtest/unit/obj/test_server.py8
-rw-r--r--test/unit/obj/test_ssync_receiver.py165
-rw-r--r--test/unit/obj/test_ssync_sender.py720
27 files changed, 5038 insertions, 234 deletions
diff --git a/bin/swift-object-reconstructor b/bin/swift-object-reconstructor
new file mode 100755
index 000000000..ee4c5d643
--- /dev/null
+++ b/bin/swift-object-reconstructor
@@ -0,0 +1,31 @@
+#!/usr/bin/env python
+# Copyright (c) 2010-2012 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from swift.obj.reconstructor import ObjectReconstructor
+from swift.common.utils import parse_options
+from swift.common.daemon import run_daemon
+from optparse import OptionParser
+
+if __name__ == '__main__':
+ parser = OptionParser("%prog CONFIG [options]")
+ parser.add_option('-d', '--devices',
+ help='Reconstruct only given devices. '
+ 'Comma-separated list')
+ parser.add_option('-p', '--partitions',
+ help='Reconstruct only given partitions. '
+ 'Comma-separated list')
+ conf_file, options = parse_options(parser=parser, once=True)
+ run_daemon(ObjectReconstructor, conf_file, **options)
diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample
index b594a9576..c510e0fb2 100644
--- a/etc/object-server.conf-sample
+++ b/etc/object-server.conf-sample
@@ -211,6 +211,29 @@ use = egg:swift#recon
# removed when it has successfully replicated to all the canonical nodes.
# handoff_delete = auto
+[object-reconstructor]
+# You can override the default log routing for this app here (don't use set!):
+# Unless otherwise noted, each setting below has the same meaning as described
+# in the [object-replicator] section, however these settings apply to the EC
+# reconstructor
+#
+# log_name = object-reconstructor
+# log_facility = LOG_LOCAL0
+# log_level = INFO
+# log_address = /dev/log
+#
+# daemonize = on
+# run_pause = 30
+# concurrency = 1
+# stats_interval = 300
+# node_timeout = 10
+# http_timeout = 60
+# lockup_timeout = 1800
+# reclaim_age = 604800
+# ring_check_interval = 15
+# recon_cache_path = /var/cache/swift
+# handoffs_first = False
+
[object-updater]
# You can override the default log routing for this app here (don't use set!):
# log_name = object-updater
diff --git a/setup.cfg b/setup.cfg
index ea9c954a5..4b648b110 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -51,6 +51,7 @@ scripts =
bin/swift-object-expirer
bin/swift-object-info
bin/swift-object-replicator
+ bin/swift-object-reconstructor
bin/swift-object-server
bin/swift-object-updater
bin/swift-oldies
diff --git a/swift/common/exceptions.py b/swift/common/exceptions.py
index 064925431..b4c926eb1 100644
--- a/swift/common/exceptions.py
+++ b/swift/common/exceptions.py
@@ -53,6 +53,10 @@ class MultiphasePUTNotSupported(SwiftException):
pass
+class SuffixSyncError(SwiftException):
+ pass
+
+
class DiskFileError(SwiftException):
pass
diff --git a/swift/common/manager.py b/swift/common/manager.py
index a4ef350da..260516d6a 100644
--- a/swift/common/manager.py
+++ b/swift/common/manager.py
@@ -33,7 +33,8 @@ ALL_SERVERS = ['account-auditor', 'account-server', 'container-auditor',
'container-replicator', 'container-reconciler',
'container-server', 'container-sync',
'container-updater', 'object-auditor', 'object-server',
- 'object-expirer', 'object-replicator', 'object-updater',
+ 'object-expirer', 'object-replicator',
+ 'object-reconstructor', 'object-updater',
'proxy-server', 'account-replicator', 'account-reaper']
MAIN_SERVERS = ['proxy-server', 'account-server', 'container-server',
'object-server']
diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py
new file mode 100644
index 000000000..0ee2afbf6
--- /dev/null
+++ b/swift/obj/reconstructor.py
@@ -0,0 +1,925 @@
+# Copyright (c) 2010-2015 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+from os.path import join
+import random
+import time
+import itertools
+from collections import defaultdict
+import cPickle as pickle
+import shutil
+
+from eventlet import (GreenPile, GreenPool, Timeout, sleep, hubs, tpool,
+ spawn)
+from eventlet.support.greenlets import GreenletExit
+
+from swift import gettext_ as _
+from swift.common.utils import (
+ whataremyips, unlink_older_than, compute_eta, get_logger,
+ dump_recon_cache, ismount, mkdirs, config_true_value, list_from_csv,
+ get_hub, tpool_reraise, GreenAsyncPile, Timestamp, remove_file)
+from swift.common.swob import HeaderKeyDict
+from swift.common.bufferedhttp import http_connect
+from swift.common.daemon import Daemon
+from swift.common.ring.utils import is_local_device
+from swift.obj.ssync_sender import Sender as ssync_sender
+from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
+from swift.obj.diskfile import DiskFileRouter, get_data_dir, \
+ get_tmp_dir
+from swift.common.storage_policy import POLICIES, EC_POLICY
+from swift.common.exceptions import ConnectionTimeout, DiskFileError, \
+ SuffixSyncError
+
+SYNC, REVERT = ('sync_only', 'sync_revert')
+
+
+hubs.use_hub(get_hub())
+
+
+class RebuildingECDiskFileStream(object):
+ """
+ This class wraps the the reconstructed fragment archive data and
+ metadata in the DiskFile interface for ssync.
+ """
+
+ def __init__(self, metadata, frag_index, rebuilt_fragment_iter):
+ # start with metadata from a participating FA
+ self.metadata = metadata
+
+ # the new FA is going to have the same length as others in the set
+ self._content_length = self.metadata['Content-Length']
+
+ # update the FI and delete the ETag, the obj server will
+ # recalc on the other side...
+ self.metadata['X-Object-Sysmeta-Ec-Frag-Index'] = frag_index
+ del self.metadata['ETag']
+
+ self.frag_index = frag_index
+ self.rebuilt_fragment_iter = rebuilt_fragment_iter
+
+ def get_metadata(self):
+ return self.metadata
+
+ @property
+ def content_length(self):
+ return self._content_length
+
+ def reader(self):
+ for chunk in self.rebuilt_fragment_iter:
+ yield chunk
+
+
+class ObjectReconstructor(Daemon):
+ """
+ Reconstruct objects using erasure code. And also rebalance EC Fragment
+ Archive objects off handoff nodes.
+
+ Encapsulates most logic and data needed by the object reconstruction
+ process. Each call to .reconstruct() performs one pass. It's up to the
+ caller to do this in a loop.
+ """
+
+ def __init__(self, conf, logger=None):
+ """
+ :param conf: configuration object obtained from ConfigParser
+ :param logger: logging object
+ """
+ self.conf = conf
+ self.logger = logger or get_logger(
+ conf, log_route='object-reconstructor')
+ self.devices_dir = conf.get('devices', '/srv/node')
+ self.mount_check = config_true_value(conf.get('mount_check', 'true'))
+ self.swift_dir = conf.get('swift_dir', '/etc/swift')
+ self.port = int(conf.get('bind_port', 6000))
+ self.concurrency = int(conf.get('concurrency', 1))
+ self.stats_interval = int(conf.get('stats_interval', '300'))
+ self.ring_check_interval = int(conf.get('ring_check_interval', 15))
+ self.next_check = time.time() + self.ring_check_interval
+ self.reclaim_age = int(conf.get('reclaim_age', 86400 * 7))
+ self.partition_times = []
+ self.run_pause = int(conf.get('run_pause', 30))
+ self.http_timeout = int(conf.get('http_timeout', 60))
+ self.lockup_timeout = int(conf.get('lockup_timeout', 1800))
+ self.recon_cache_path = conf.get('recon_cache_path',
+ '/var/cache/swift')
+ self.rcache = os.path.join(self.recon_cache_path, "object.recon")
+ # defaults subject to change after beta
+ self.conn_timeout = float(conf.get('conn_timeout', 0.5))
+ self.node_timeout = float(conf.get('node_timeout', 10))
+ self.network_chunk_size = int(conf.get('network_chunk_size', 65536))
+ self.disk_chunk_size = int(conf.get('disk_chunk_size', 65536))
+ self.headers = {
+ 'Content-Length': '0',
+ 'user-agent': 'obj-reconstructor %s' % os.getpid()}
+ self.handoffs_first = config_true_value(conf.get('handoffs_first',
+ False))
+ self._df_router = DiskFileRouter(conf, self.logger)
+
+ def load_object_ring(self, policy):
+ """
+ Make sure the policy's rings are loaded.
+
+ :param policy: the StoragePolicy instance
+ :returns: appropriate ring object
+ """
+ policy.load_ring(self.swift_dir)
+ return policy.object_ring
+
+ def check_ring(self, object_ring):
+ """
+ Check to see if the ring has been updated
+
+ :param object_ring: the ring to check
+ :returns: boolean indicating whether or not the ring has changed
+ """
+ if time.time() > self.next_check:
+ self.next_check = time.time() + self.ring_check_interval
+ if object_ring.has_changed():
+ return False
+ return True
+
+ def _full_path(self, node, part, path, policy):
+ return '%(replication_ip)s:%(replication_port)s' \
+ '/%(device)s/%(part)s%(path)s ' \
+ 'policy#%(policy)d frag#%(frag_index)s' % {
+ 'replication_ip': node['replication_ip'],
+ 'replication_port': node['replication_port'],
+ 'device': node['device'],
+ 'part': part, 'path': path,
+ 'policy': policy,
+ 'frag_index': node.get('index', 'handoff'),
+ }
+
+ def _get_response(self, node, part, path, headers, policy):
+ """
+ Helper method for reconstruction that GETs a single EC fragment
+ archive
+
+ :param node: the node to GET from
+ :param part: the partition
+ :param path: full path of the desired EC archive
+ :param headers: the headers to send
+ :param policy: an instance of
+ :class:`~swift.common.storage_policy.BaseStoragePolicy`
+ :returns: response
+ """
+ resp = None
+ headers['X-Backend-Node-Index'] = node['index']
+ try:
+ with ConnectionTimeout(self.conn_timeout):
+ conn = http_connect(node['ip'], node['port'], node['device'],
+ part, 'GET', path, headers=headers)
+ with Timeout(self.node_timeout):
+ resp = conn.getresponse()
+ if resp.status != HTTP_OK:
+ self.logger.warning(
+ _("Invalid response %(resp)s from %(full_path)s"),
+ {'resp': resp.status,
+ 'full_path': self._full_path(node, part, path, policy)})
+ resp = None
+ except (Exception, Timeout):
+ self.logger.exception(
+ _("Trying to GET %(full_path)s"), {
+ 'full_path': self._full_path(node, part, path, policy)})
+ return resp
+
+ def reconstruct_fa(self, job, node, metadata):
+ """
+ Reconstructs a fragment archive - this method is called from ssync
+ after a remote node responds that is missing this object - the local
+ diskfile is opened to provide metadata - but to reconstruct the
+ missing fragment archive we must connect to multiple object servers.
+
+ :param job: job from ssync_sender
+ :param node: node that we're rebuilding to
+ :param metadata: the metadata to attach to the rebuilt archive
+ :returns: a DiskFile like class for use by ssync
+ :raises DiskFileError: if the fragment archive cannot be reconstructed
+ """
+
+ part_nodes = job['policy'].object_ring.get_part_nodes(
+ job['partition'])
+ part_nodes.remove(node)
+
+ # the fragment index we need to reconstruct is the position index
+ # of the node we're rebuilding to within the primary part list
+ fi_to_rebuild = node['index']
+
+ # KISS send out connection requests to all nodes, see what sticks
+ headers = {
+ 'X-Backend-Storage-Policy-Index': int(job['policy']),
+ }
+ pile = GreenAsyncPile(len(part_nodes))
+ path = metadata['name']
+ for node in part_nodes:
+ pile.spawn(self._get_response, node, job['partition'],
+ path, headers, job['policy'])
+ responses = []
+ etag = None
+ for resp in pile:
+ if not resp:
+ continue
+ resp.headers = HeaderKeyDict(resp.getheaders())
+ responses.append(resp)
+ etag = sorted(responses, reverse=True,
+ key=lambda r: Timestamp(
+ r.headers.get('X-Backend-Timestamp')
+ ))[0].headers.get('X-Object-Sysmeta-Ec-Etag')
+ responses = [r for r in responses if
+ r.headers.get('X-Object-Sysmeta-Ec-Etag') == etag]
+
+ if len(responses) >= job['policy'].ec_ndata:
+ break
+ else:
+ self.logger.error(
+ 'Unable to get enough responses (%s/%s) '
+ 'to reconstruct %s with ETag %s' % (
+ len(responses), job['policy'].ec_ndata,
+ self._full_path(node, job['partition'],
+ metadata['name'], job['policy']),
+ etag))
+ raise DiskFileError('Unable to reconstruct EC archive')
+
+ rebuilt_fragment_iter = self.make_rebuilt_fragment_iter(
+ responses[:job['policy'].ec_ndata], path, job['policy'],
+ fi_to_rebuild)
+ return RebuildingECDiskFileStream(metadata, fi_to_rebuild,
+ rebuilt_fragment_iter)
+
+ def _reconstruct(self, policy, fragment_payload, frag_index):
+ # XXX with jerasure this doesn't work if we need to rebuild a
+ # parity fragment, and not all data fragments are available
+ # segment = policy.pyeclib_driver.reconstruct(
+ # fragment_payload, [frag_index])[0]
+
+ # for safety until pyeclib 1.0.7 we'll just use decode and encode
+ segment = policy.pyeclib_driver.decode(fragment_payload)
+ return policy.pyeclib_driver.encode(segment)[frag_index]
+
+ def make_rebuilt_fragment_iter(self, responses, path, policy, frag_index):
+ """
+ Turn a set of connections from backend object servers into a generator
+ that yields up the rebuilt fragment archive for frag_index.
+ """
+
+ def _get_one_fragment(resp):
+ buff = ''
+ remaining_bytes = policy.fragment_size
+ while remaining_bytes:
+ chunk = resp.read(remaining_bytes)
+ if not chunk:
+ break
+ remaining_bytes -= len(chunk)
+ buff += chunk
+ return buff
+
+ def fragment_payload_iter():
+ # We need a fragment from each connections, so best to
+ # use a GreenPile to keep them ordered and in sync
+ pile = GreenPile(len(responses))
+ while True:
+ for resp in responses:
+ pile.spawn(_get_one_fragment, resp)
+ try:
+ with Timeout(self.node_timeout):
+ fragment_payload = [fragment for fragment in pile]
+ except (Exception, Timeout):
+ self.logger.exception(
+ _("Error trying to rebuild %(path)s "
+ "policy#%(policy)d frag#%(frag_index)s"), {
+ 'path': path,
+ 'policy': policy,
+ 'frag_index': frag_index,
+ })
+ break
+ if not all(fragment_payload):
+ break
+ rebuilt_fragment = self._reconstruct(
+ policy, fragment_payload, frag_index)
+ yield rebuilt_fragment
+
+ return fragment_payload_iter()
+
+ def stats_line(self):
+ """
+ Logs various stats for the currently running reconstruction pass.
+ """
+ if self.reconstruction_count:
+ elapsed = (time.time() - self.start) or 0.000001
+ rate = self.reconstruction_count / elapsed
+ self.logger.info(
+ _("%(reconstructed)d/%(total)d (%(percentage).2f%%)"
+ " partitions reconstructed in %(time).2fs (%(rate).2f/sec, "
+ "%(remaining)s remaining)"),
+ {'reconstructed': self.reconstruction_count,
+ 'total': self.job_count,
+ 'percentage':
+ self.reconstruction_count * 100.0 / self.job_count,
+ 'time': time.time() - self.start, 'rate': rate,
+ 'remaining': '%d%s' % compute_eta(self.start,
+ self.reconstruction_count,
+ self.job_count)})
+ if self.suffix_count:
+ self.logger.info(
+ _("%(checked)d suffixes checked - "
+ "%(hashed).2f%% hashed, %(synced).2f%% synced"),
+ {'checked': self.suffix_count,
+ 'hashed': (self.suffix_hash * 100.0) / self.suffix_count,
+ 'synced': (self.suffix_sync * 100.0) / self.suffix_count})
+ self.partition_times.sort()
+ self.logger.info(
+ _("Partition times: max %(max).4fs, "
+ "min %(min).4fs, med %(med).4fs"),
+ {'max': self.partition_times[-1],
+ 'min': self.partition_times[0],
+ 'med': self.partition_times[
+ len(self.partition_times) // 2]})
+ else:
+ self.logger.info(
+ _("Nothing reconstructed for %s seconds."),
+ (time.time() - self.start))
+
+ def kill_coros(self):
+ """Utility function that kills all coroutines currently running."""
+ for coro in list(self.run_pool.coroutines_running):
+ try:
+ coro.kill(GreenletExit)
+ except GreenletExit:
+ pass
+
+ def heartbeat(self):
+ """
+ Loop that runs in the background during reconstruction. It
+ periodically logs progress.
+ """
+ while True:
+ sleep(self.stats_interval)
+ self.stats_line()
+
+ def detect_lockups(self):
+ """
+ In testing, the pool.waitall() call very occasionally failed to return.
+ This is an attempt to make sure the reconstructor finishes its
+ reconstruction pass in some eventuality.
+ """
+ while True:
+ sleep(self.lockup_timeout)
+ if self.reconstruction_count == self.last_reconstruction_count:
+ self.logger.error(_("Lockup detected.. killing live coros."))
+ self.kill_coros()
+ self.last_reconstruction_count = self.reconstruction_count
+
+ def _get_partners(self, frag_index, part_nodes):
+ """
+ Returns the left and right partners of the node whose index is
+ equal to the given frag_index.
+
+ :param frag_index: a fragment index
+ :param part_nodes: a list of primary nodes
+ :returns: [<node-to-left>, <node-to-right>]
+ """
+ return [
+ part_nodes[(frag_index - 1) % len(part_nodes)],
+ part_nodes[(frag_index + 1) % len(part_nodes)],
+ ]
+
+ def _get_hashes(self, policy, path, recalculate=None, do_listdir=False):
+ df_mgr = self._df_router[policy]
+ hashed, suffix_hashes = tpool_reraise(
+ df_mgr._get_hashes, path, recalculate=recalculate,
+ do_listdir=do_listdir, reclaim_age=self.reclaim_age)
+ self.logger.update_stats('suffix.hashes', hashed)
+ return suffix_hashes
+
+ def get_suffix_delta(self, local_suff, local_index,
+ remote_suff, remote_index):
+ """
+ Compare the local suffix hashes with the remote suffix hashes
+ for the given local and remote fragment indexes. Return those
+ suffixes which should be synced.
+
+ :param local_suff: the local suffix hashes (from _get_hashes)
+ :param local_index: the local fragment index for the job
+ :param remote_suff: the remote suffix hashes (from remote
+ REPLICATE request)
+ :param remote_index: the remote fragment index for the job
+
+ :returns: a list of strings, the suffix dirs to sync
+ """
+ suffixes = []
+ for suffix, sub_dict_local in local_suff.iteritems():
+ sub_dict_remote = remote_suff.get(suffix, {})
+ if (sub_dict_local.get(None) != sub_dict_remote.get(None) or
+ sub_dict_local.get(local_index) !=
+ sub_dict_remote.get(remote_index)):
+ suffixes.append(suffix)
+ return suffixes
+
+ def rehash_remote(self, node, job, suffixes):
+ try:
+ with Timeout(self.http_timeout):
+ conn = http_connect(
+ node['replication_ip'], node['replication_port'],
+ node['device'], job['partition'], 'REPLICATE',
+ '/' + '-'.join(sorted(suffixes)),
+ headers=self.headers)
+ conn.getresponse().read()
+ except (Exception, Timeout):
+ self.logger.exception(
+ _("Trying to sync suffixes with %s") % self._full_path(
+ node, job['partition'], '', job['policy']))
+
+ def _get_suffixes_to_sync(self, job, node):
+ """
+ For SYNC jobs we need to make a remote REPLICATE request to get
+ the remote node's current suffix's hashes and then compare to our
+ local suffix's hashes to decide which suffixes (if any) are out
+ of sync.
+
+ :param: the job dict, with the keys defined in ``_get_part_jobs``
+ :param node: the remote node dict
+ :returns: a (possibly empty) list of strings, the suffixes to be
+ synced with the remote node.
+ """
+ # get hashes from the remote node
+ remote_suffixes = None
+ try:
+ with Timeout(self.http_timeout):
+ resp = http_connect(
+ node['replication_ip'], node['replication_port'],
+ node['device'], job['partition'], 'REPLICATE',
+ '', headers=self.headers).getresponse()
+ if resp.status == HTTP_INSUFFICIENT_STORAGE:
+ self.logger.error(
+ _('%s responded as unmounted'),
+ self._full_path(node, job['partition'], '',
+ job['policy']))
+ elif resp.status != HTTP_OK:
+ self.logger.error(
+ _("Invalid response %(resp)s "
+ "from %(full_path)s"), {
+ 'resp': resp.status,
+ 'full_path': self._full_path(
+ node, job['partition'], '',
+ job['policy'])
+ })
+ else:
+ remote_suffixes = pickle.loads(resp.read())
+ except (Exception, Timeout):
+ # all exceptions are logged here so that our caller can
+ # safely catch our exception and continue to the next node
+ # without logging
+ self.logger.exception('Unable to get remote suffix hashes '
+ 'from %r' % self._full_path(
+ node, job['partition'], '',
+ job['policy']))
+
+ if remote_suffixes is None:
+ raise SuffixSyncError('Unable to get remote suffix hashes')
+
+ suffixes = self.get_suffix_delta(job['hashes'],
+ job['frag_index'],
+ remote_suffixes,
+ node['index'])
+ # now recalculate local hashes for suffixes that don't
+ # match so we're comparing the latest
+ local_suff = self._get_hashes(job['policy'], job['path'],
+ recalculate=suffixes)
+
+ suffixes = self.get_suffix_delta(local_suff,
+ job['frag_index'],
+ remote_suffixes,
+ node['index'])
+
+ self.suffix_count += len(suffixes)
+ return suffixes
+
+ def delete_reverted_objs(self, job, objects, frag_index):
+ """
+ For EC we can potentially revert only some of a partition
+ so we'll delete reverted objects here. Note that we delete
+ the fragment index of the file we sent to the remote node.
+
+ :param job: the job being processed
+ :param objects: a dict of objects to be deleted, each entry maps
+ hash=>timestamp
+ :param frag_index: (int) the fragment index of data files to be deleted
+ """
+ df_mgr = self._df_router[job['policy']]
+ for object_hash, timestamp in objects.items():
+ try:
+ df = df_mgr.get_diskfile_from_hash(
+ job['local_dev']['device'], job['partition'],
+ object_hash, job['policy'],
+ frag_index=frag_index)
+ df.purge(Timestamp(timestamp), frag_index)
+ except DiskFileError:
+ continue
+
+ def process_job(self, job):
+ """
+ Sync the local partition with the remote node(s) according to
+ the parameters of the job. For primary nodes, the SYNC job type
+ will define both left and right hand sync_to nodes to ssync with
+ as defined by this primary nodes index in the node list based on
+ the fragment index found in the partition. For non-primary
+ nodes (either handoff revert, or rebalance) the REVERT job will
+ define a single node in sync_to which is the proper/new home for
+ the fragment index.
+
+ N.B. ring rebalancing can be time consuming and handoff nodes'
+ fragment indexes do not have a stable order, it's possible to
+ have more than one REVERT job for a partition, and in some rare
+ failure conditions there may even also be a SYNC job for the
+ same partition - but each one will be processed separately
+ because each job will define a separate list of node(s) to
+ 'sync_to'.
+
+ :param: the job dict, with the keys defined in ``_get_job_info``
+ """
+ self.headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
+ begin = time.time()
+ if job['job_type'] == REVERT:
+ self._revert(job, begin)
+ else:
+ self._sync(job, begin)
+ self.partition_times.append(time.time() - begin)
+ self.reconstruction_count += 1
+
+ def _sync(self, job, begin):
+ """
+ Process a SYNC job.
+ """
+ self.logger.increment(
+ 'partition.update.count.%s' % (job['local_dev']['device'],))
+ # after our left and right partners, if there's some sort of
+ # failure we'll continue onto the remaining primary nodes and
+ # make sure they're in sync - or potentially rebuild missing
+ # fragments we find
+ dest_nodes = itertools.chain(
+ job['sync_to'],
+ # I think we could order these based on our index to better
+ # protect against a broken chain
+ itertools.ifilter(
+ lambda n: n['id'] not in (n['id'] for n in job['sync_to']),
+ job['policy'].object_ring.get_part_nodes(job['partition'])),
+ )
+ syncd_with = 0
+ for node in dest_nodes:
+ if syncd_with >= len(job['sync_to']):
+ # success!
+ break
+
+ try:
+ suffixes = self._get_suffixes_to_sync(job, node)
+ except SuffixSyncError:
+ continue
+
+ if not suffixes:
+ syncd_with += 1
+ continue
+
+ # ssync any out-of-sync suffixes with the remote node
+ success, _ = ssync_sender(
+ self, node, job, suffixes)()
+ # let remote end know to rehash it's suffixes
+ self.rehash_remote(node, job, suffixes)
+ # update stats for this attempt
+ self.suffix_sync += len(suffixes)
+ self.logger.update_stats('suffix.syncs', len(suffixes))
+ if success:
+ syncd_with += 1
+ self.logger.timing_since('partition.update.timing', begin)
+
+ def _revert(self, job, begin):
+ """
+ Process a REVERT job.
+ """
+ self.logger.increment(
+ 'partition.delete.count.%s' % (job['local_dev']['device'],))
+ # we'd desperately like to push this partition back to it's
+ # primary location, but if that node is down, the next best thing
+ # is one of the handoff locations - which *might* be us already!
+ dest_nodes = itertools.chain(
+ job['sync_to'],
+ job['policy'].object_ring.get_more_nodes(job['partition']),
+ )
+ syncd_with = 0
+ reverted_objs = {}
+ for node in dest_nodes:
+ if syncd_with >= len(job['sync_to']):
+ break
+ if node['id'] == job['local_dev']['id']:
+ # this is as good a place as any for this data for now
+ break
+ success, in_sync_objs = ssync_sender(
+ self, node, job, job['suffixes'])()
+ self.rehash_remote(node, job, job['suffixes'])
+ if success:
+ syncd_with += 1
+ reverted_objs.update(in_sync_objs)
+ if syncd_with >= len(job['sync_to']):
+ self.delete_reverted_objs(
+ job, reverted_objs, job['frag_index'])
+ self.logger.timing_since('partition.delete.timing', begin)
+
+ def _get_part_jobs(self, local_dev, part_path, partition, policy):
+ """
+ Helper function to build jobs for a partition, this method will
+ read the suffix hashes and create job dictionaries to describe
+ the needed work. There will be one job for each fragment index
+ discovered in the partition.
+
+ For a fragment index which corresponds to this node's ring
+ index, a job with job_type SYNC will be created to ensure that
+ the left and right hand primary ring nodes for the part have the
+ corresponding left and right hand fragment archives.
+
+ A fragment index (or entire partition) for which this node is
+ not the primary corresponding node, will create job(s) with
+ job_type REVERT to ensure that fragment archives are pushed to
+ the correct node and removed from this one.
+
+ A partition may result in multiple jobs. Potentially many
+ REVERT jobs, and zero or one SYNC job.
+
+ :param local_dev: the local device
+ :param part_path: full path to partition
+ :param partition: partition number
+ :param policy: the policy
+
+ :returns: a list of dicts of job info
+ """
+ # find all the fi's in the part, and which suffixes have them
+ hashes = self._get_hashes(policy, part_path, do_listdir=True)
+ non_data_fragment_suffixes = []
+ data_fi_to_suffixes = defaultdict(list)
+ for suffix, fi_hash in hashes.items():
+ if not fi_hash:
+ # this is for sanity and clarity, normally an empty
+ # suffix would get del'd from the hashes dict, but an
+ # OSError trying to re-hash the suffix could leave the
+ # value empty - it will log the exception; but there's
+ # no way to properly address this suffix at this time.
+ continue
+ data_frag_indexes = [f for f in fi_hash if f is not None]
+ if not data_frag_indexes:
+ non_data_fragment_suffixes.append(suffix)
+ else:
+ for fi in data_frag_indexes:
+ data_fi_to_suffixes[fi].append(suffix)
+
+ # helper to ensure consistent structure of jobs
+ def build_job(job_type, frag_index, suffixes, sync_to):
+ return {
+ 'job_type': job_type,
+ 'frag_index': frag_index,
+ 'suffixes': suffixes,
+ 'sync_to': sync_to,
+ 'partition': partition,
+ 'path': part_path,
+ 'hashes': hashes,
+ 'policy': policy,
+ 'local_dev': local_dev,
+ # ssync likes to have it handy
+ 'device': local_dev['device'],
+ }
+
+ # aggregate jobs for all the fragment index in this part
+ jobs = []
+
+ # check the primary nodes - to see if the part belongs here
+ part_nodes = policy.object_ring.get_part_nodes(partition)
+ for node in part_nodes:
+ if node['id'] == local_dev['id']:
+ # this partition belongs here, we'll need a sync job
+ frag_index = node['index']
+ try:
+ suffixes = data_fi_to_suffixes.pop(frag_index)
+ except KeyError:
+ suffixes = []
+ sync_job = build_job(
+ job_type=SYNC,
+ frag_index=frag_index,
+ suffixes=suffixes,
+ sync_to=self._get_partners(frag_index, part_nodes),
+ )
+ # ssync callback to rebuild missing fragment_archives
+ sync_job['sync_diskfile_builder'] = self.reconstruct_fa
+ jobs.append(sync_job)
+ break
+
+ # assign remaining data fragment suffixes to revert jobs
+ ordered_fis = sorted((len(suffixes), fi) for fi, suffixes
+ in data_fi_to_suffixes.items())
+ for count, fi in ordered_fis:
+ revert_job = build_job(
+ job_type=REVERT,
+ frag_index=fi,
+ suffixes=data_fi_to_suffixes[fi],
+ sync_to=[part_nodes[fi]],
+ )
+ jobs.append(revert_job)
+
+ # now we need to assign suffixes that have no data fragments
+ if non_data_fragment_suffixes:
+ if jobs:
+ # the first job will be either the sync_job, or the
+ # revert_job for the fragment index that is most common
+ # among the suffixes
+ jobs[0]['suffixes'].extend(non_data_fragment_suffixes)
+ else:
+ # this is an unfortunate situation, we need a revert job to
+ # push partitions off this node, but none of the suffixes
+ # have any data fragments to hint at which node would be a
+ # good candidate to receive the tombstones.
+ jobs.append(build_job(
+ job_type=REVERT,
+ frag_index=None,
+ suffixes=non_data_fragment_suffixes,
+ # this is super safe
+ sync_to=part_nodes,
+ # something like this would be probably be better
+ # sync_to=random.sample(part_nodes, 3),
+ ))
+ # return a list of jobs for this part
+ return jobs
+
+ def collect_parts(self, override_devices=None,
+ override_partitions=None):
+ """
+ Helper for yielding partitions in the top level reconstructor
+ """
+ override_devices = override_devices or []
+ override_partitions = override_partitions or []
+ ips = whataremyips()
+ for policy in POLICIES:
+ if policy.policy_type != EC_POLICY:
+ continue
+ self._diskfile_mgr = self._df_router[policy]
+ self.load_object_ring(policy)
+ data_dir = get_data_dir(policy)
+ local_devices = itertools.ifilter(
+ lambda dev: dev and is_local_device(
+ ips, self.port,
+ dev['replication_ip'], dev['replication_port']),
+ policy.object_ring.devs)
+ for local_dev in local_devices:
+ if override_devices and (local_dev['device'] not in
+ override_devices):
+ continue
+ dev_path = join(self.devices_dir, local_dev['device'])
+ obj_path = join(dev_path, data_dir)
+ tmp_path = join(dev_path, get_tmp_dir(int(policy)))
+ if self.mount_check and not ismount(dev_path):
+ self.logger.warn(_('%s is not mounted'),
+ local_dev['device'])
+ continue
+ unlink_older_than(tmp_path, time.time() -
+ self.reclaim_age)
+ if not os.path.exists(obj_path):
+ try:
+ mkdirs(obj_path)
+ except Exception:
+ self.logger.exception(
+ 'Unable to create %s' % obj_path)
+ continue
+ try:
+ partitions = os.listdir(obj_path)
+ except OSError:
+ self.logger.exception(
+ 'Unable to list partitions in %r' % obj_path)
+ continue
+ for partition in partitions:
+ part_path = join(obj_path, partition)
+ if not (partition.isdigit() and
+ os.path.isdir(part_path)):
+ self.logger.warning(
+ 'Unexpected entity in data dir: %r' % part_path)
+ remove_file(part_path)
+ continue
+ partition = int(partition)
+ if override_partitions and (partition not in
+ override_partitions):
+ continue
+ part_info = {
+ 'local_dev': local_dev,
+ 'policy': policy,
+ 'partition': partition,
+ 'part_path': part_path,
+ }
+ yield part_info
+
+ def build_reconstruction_jobs(self, part_info):
+ """
+ Helper function for collect_jobs to build jobs for reconstruction
+ using EC style storage policy
+ """
+ jobs = self._get_part_jobs(**part_info)
+ random.shuffle(jobs)
+ if self.handoffs_first:
+ # Move the handoff revert jobs to the front of the list
+ jobs.sort(key=lambda job: job['job_type'], reverse=True)
+ self.job_count += len(jobs)
+ return jobs
+
+ def _reset_stats(self):
+ self.start = time.time()
+ self.job_count = 0
+ self.suffix_count = 0
+ self.suffix_sync = 0
+ self.suffix_hash = 0
+ self.reconstruction_count = 0
+ self.last_reconstruction_count = -1
+
+ def delete_partition(self, path):
+ self.logger.info(_("Removing partition: %s"), path)
+ tpool.execute(shutil.rmtree, path, ignore_errors=True)
+
+ def reconstruct(self, **kwargs):
+ """Run a reconstruction pass"""
+ self._reset_stats()
+ self.partition_times = []
+
+ stats = spawn(self.heartbeat)
+ lockup_detector = spawn(self.detect_lockups)
+ sleep() # Give spawns a cycle
+
+ try:
+ self.run_pool = GreenPool(size=self.concurrency)
+ for part_info in self.collect_parts(**kwargs):
+ if not self.check_ring(part_info['policy'].object_ring):
+ self.logger.info(_("Ring change detected. Aborting "
+ "current reconstruction pass."))
+ return
+ jobs = self.build_reconstruction_jobs(part_info)
+ if not jobs:
+ # If this part belongs on this node, _get_part_jobs
+ # will *always* build a sync_job - even if there's
+ # no suffixes in the partition that needs to sync.
+ # If there's any suffixes in the partition then our
+ # job list would have *at least* one revert job.
+ # Therefore we know this part a) doesn't belong on
+ # this node and b) doesn't have any suffixes in it.
+ self.run_pool.spawn(self.delete_partition,
+ part_info['part_path'])
+ for job in jobs:
+ self.run_pool.spawn(self.process_job, job)
+ with Timeout(self.lockup_timeout):
+ self.run_pool.waitall()
+ except (Exception, Timeout):
+ self.logger.exception(_("Exception in top-level"
+ "reconstruction loop"))
+ self.kill_coros()
+ finally:
+ stats.kill()
+ lockup_detector.kill()
+ self.stats_line()
+
+ def run_once(self, *args, **kwargs):
+ start = time.time()
+ self.logger.info(_("Running object reconstructor in script mode."))
+ override_devices = list_from_csv(kwargs.get('devices'))
+ override_partitions = [int(p) for p in
+ list_from_csv(kwargs.get('partitions'))]
+ self.reconstruct(
+ override_devices=override_devices,
+ override_partitions=override_partitions)
+ total = (time.time() - start) / 60
+ self.logger.info(
+ _("Object reconstruction complete (once). (%.02f minutes)"), total)
+ if not (override_partitions or override_devices):
+ dump_recon_cache({'object_reconstruction_time': total,
+ 'object_reconstruction_last': time.time()},
+ self.rcache, self.logger)
+
+ def run_forever(self, *args, **kwargs):
+ self.logger.info(_("Starting object reconstructor in daemon mode."))
+ # Run the reconstructor continually
+ while True:
+ start = time.time()
+ self.logger.info(_("Starting object reconstruction pass."))
+ # Run the reconstructor
+ self.reconstruct()
+ total = (time.time() - start) / 60
+ self.logger.info(
+ _("Object reconstruction complete. (%.02f minutes)"), total)
+ dump_recon_cache({'object_reconstruction_time': total,
+ 'object_reconstruction_last': time.time()},
+ self.rcache, self.logger)
+ self.logger.debug('reconstruction sleeping for %s seconds.',
+ self.run_pause)
+ sleep(self.run_pause)
diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py
index ddf431ec7..580d1827e 100644
--- a/swift/obj/replicator.py
+++ b/swift/obj/replicator.py
@@ -171,7 +171,7 @@ class ObjectReplicator(Daemon):
sync method in Swift.
"""
if not os.path.exists(job['path']):
- return False, set()
+ return False, {}
args = [
'rsync',
'--recursive',
@@ -196,11 +196,11 @@ class ObjectReplicator(Daemon):
args.append(spath)
had_any = True
if not had_any:
- return False, set()
+ return False, {}
data_dir = get_data_dir(job['policy'])
args.append(join(rsync_module, node['device'],
data_dir, job['partition']))
- return self._rsync(args) == 0, set()
+ return self._rsync(args) == 0, {}
def ssync(self, node, job, suffixes, remote_check_objs=None):
return ssync_sender.Sender(
@@ -246,8 +246,9 @@ class ObjectReplicator(Daemon):
self.conf.get('sync_method', 'rsync') == 'ssync':
kwargs['remote_check_objs'] = \
synced_remote_regions[node['region']]
- # cand_objs is a list of objects for deletion
- success, cand_objs = self.sync(
+ # candidates is a dict(hash=>timestamp) of objects
+ # for deletion
+ success, candidates = self.sync(
node, job, suffixes, **kwargs)
if success:
with Timeout(self.http_timeout):
@@ -258,7 +259,8 @@ class ObjectReplicator(Daemon):
'/' + '-'.join(suffixes), headers=self.headers)
conn.getresponse().read()
if node['region'] != job['region']:
- synced_remote_regions[node['region']] = cand_objs
+ synced_remote_regions[node['region']] = \
+ candidates.keys()
responses.append(success)
for region, cand_objs in synced_remote_regions.iteritems():
if delete_objs is None:
diff --git a/swift/obj/server.py b/swift/obj/server.py
index 0ebaeb5f0..658f207a8 100644
--- a/swift/obj/server.py
+++ b/swift/obj/server.py
@@ -880,7 +880,7 @@ class ObjectController(BaseStorageServer):
@public
@replication
@timing_stats(sample_rate=0.1)
- def REPLICATION(self, request):
+ def SSYNC(self, request):
return Response(app_iter=ssync_receiver.Receiver(self, request)())
def __call__(self, env, start_response):
@@ -914,7 +914,7 @@ class ObjectController(BaseStorageServer):
trans_time = time.time() - start_time
if self.log_requests:
log_line = get_log_line(req, res, trans_time, '')
- if req.method in ('REPLICATE', 'REPLICATION') or \
+ if req.method in ('REPLICATE', 'SSYNC') or \
'X-Backend-Replication' in req.headers:
self.logger.debug(log_line)
else:
diff --git a/swift/obj/ssync_receiver.py b/swift/obj/ssync_receiver.py
index 99495cd48..b636a1624 100644
--- a/swift/obj/ssync_receiver.py
+++ b/swift/obj/ssync_receiver.py
@@ -29,23 +29,23 @@ from swift.common import request_helpers
class Receiver(object):
"""
- Handles incoming REPLICATION requests to the object server.
+ Handles incoming SSYNC requests to the object server.
These requests come from the object-replicator daemon that uses
:py:mod:`.ssync_sender`.
- The number of concurrent REPLICATION requests is restricted by
+ The number of concurrent SSYNC requests is restricted by
use of a replication_semaphore and can be configured with the
object-server.conf [object-server] replication_concurrency
setting.
- A REPLICATION request is really just an HTTP conduit for
+ An SSYNC request is really just an HTTP conduit for
sender/receiver replication communication. The overall
- REPLICATION request should always succeed, but it will contain
+ SSYNC request should always succeed, but it will contain
multiple requests within its request and response bodies. This
"hack" is done so that replication concurrency can be managed.
- The general process inside a REPLICATION request is:
+ The general process inside an SSYNC request is:
1. Initialize the request: Basic request validation, mount check,
acquire semaphore lock, etc..
@@ -73,10 +73,10 @@ class Receiver(object):
def __call__(self):
"""
- Processes a REPLICATION request.
+ Processes an SSYNC request.
Acquires a semaphore lock and then proceeds through the steps
- of the REPLICATION process.
+ of the SSYNC process.
"""
# The general theme for functions __call__ calls is that they should
# raise exceptions.MessageTimeout for client timeouts (logged locally),
@@ -89,7 +89,7 @@ class Receiver(object):
try:
# Double try blocks in case our main error handlers fail.
try:
- # intialize_request is for preamble items that can be done
+ # initialize_request is for preamble items that can be done
# outside a replication semaphore lock.
for data in self.initialize_request():
yield data
@@ -112,7 +112,7 @@ class Receiver(object):
self.app.replication_semaphore.release()
except exceptions.ReplicationLockTimeout as err:
self.app.logger.debug(
- '%s/%s/%s REPLICATION LOCK TIMEOUT: %s' % (
+ '%s/%s/%s SSYNC LOCK TIMEOUT: %s' % (
self.request.remote_addr, self.device, self.partition,
err))
yield ':ERROR: %d %r\n' % (0, str(err))
@@ -169,8 +169,11 @@ class Receiver(object):
self.request.environ['eventlet.minimum_write_chunk_size'] = 0
self.device, self.partition, self.policy = \
request_helpers.get_name_and_placement(self.request, 2, 2, False)
- self.policy_idx = \
- int(self.request.headers.get('X-Backend-Storage-Policy-Index', 0))
+ if 'X-Backend-Ssync-Frag-Index' in self.request.headers:
+ self.frag_index = int(
+ self.request.headers['X-Backend-Ssync-Frag-Index'])
+ else:
+ self.frag_index = None
utils.validate_device_partition(self.device, self.partition)
self.diskfile_mgr = self.app._diskfile_router[self.policy]
if self.diskfile_mgr.mount_check and not constraints.check_mount(
@@ -183,7 +186,7 @@ class Receiver(object):
def missing_check(self):
"""
Handles the receiver-side of the MISSING_CHECK step of a
- REPLICATION request.
+ SSYNC request.
Receives a list of hashes and timestamps of object
information the sender can provide and responds with a list
@@ -227,11 +230,13 @@ class Receiver(object):
line = self.fp.readline(self.app.network_chunk_size)
if not line or line.strip() == ':MISSING_CHECK: END':
break
- object_hash, timestamp = [urllib.unquote(v) for v in line.split()]
+ parts = line.split()
+ object_hash, timestamp = [urllib.unquote(v) for v in parts[:2]]
want = False
try:
df = self.diskfile_mgr.get_diskfile_from_hash(
- self.device, self.partition, object_hash, self.policy)
+ self.device, self.partition, object_hash, self.policy,
+ frag_index=self.frag_index)
except exceptions.DiskFileNotExist:
want = True
else:
@@ -254,7 +259,7 @@ class Receiver(object):
def updates(self):
"""
- Handles the UPDATES step of a REPLICATION request.
+ Handles the UPDATES step of an SSYNC request.
Receives a set of PUT and DELETE subrequests that will be
routed to the object server itself for processing. These
@@ -354,7 +359,7 @@ class Receiver(object):
subreq_iter())
else:
raise Exception('Invalid subrequest method %s' % method)
- subreq.headers['X-Backend-Storage-Policy-Index'] = self.policy_idx
+ subreq.headers['X-Backend-Storage-Policy-Index'] = int(self.policy)
subreq.headers['X-Backend-Replication'] = 'True'
if replication_headers:
subreq.headers['X-Backend-Replication-Headers'] = \
diff --git a/swift/obj/ssync_sender.py b/swift/obj/ssync_sender.py
index 02745d21e..8e9202c00 100644
--- a/swift/obj/ssync_sender.py
+++ b/swift/obj/ssync_sender.py
@@ -22,7 +22,7 @@ from swift.common import http
class Sender(object):
"""
- Sends REPLICATION requests to the object server.
+ Sends SSYNC requests to the object server.
These requests are eventually handled by
:py:mod:`.ssync_receiver` and full documentation about the
@@ -31,6 +31,7 @@ class Sender(object):
def __init__(self, daemon, node, job, suffixes, remote_check_objs=None):
self.daemon = daemon
+ self.df_mgr = self.daemon._diskfile_mgr
self.node = node
self.job = job
self.suffixes = suffixes
@@ -38,28 +39,28 @@ class Sender(object):
self.response = None
self.response_buffer = ''
self.response_chunk_left = 0
- self.available_set = set()
+ # available_map has an entry for each object in given suffixes that
+ # is available to be sync'd; each entry is a hash => timestamp
+ self.available_map = {}
# When remote_check_objs is given in job, ssync_sender trys only to
# make sure those objects exist or not in remote.
self.remote_check_objs = remote_check_objs
+ # send_list has an entry for each object that the receiver wants to
+ # be sync'ed; each entry is an object hash
self.send_list = []
self.failures = 0
- @property
- def policy_idx(self):
- return int(self.job.get('policy', 0))
-
def __call__(self):
"""
Perform ssync with remote node.
- :returns: a 2-tuple, in the form (success, can_delete_objs).
-
- Success is a boolean, and can_delete_objs is an iterable of strings
- representing the hashes which are in sync with the remote node.
+ :returns: a 2-tuple, in the form (success, can_delete_objs) where
+ success is a boolean and can_delete_objs is the map of
+ objects that are in sync with the receiver. Each entry in
+ can_delete_objs maps a hash => timestamp
"""
if not self.suffixes:
- return True, set()
+ return True, {}
try:
# Double try blocks in case our main error handler fails.
try:
@@ -72,18 +73,20 @@ class Sender(object):
self.missing_check()
if self.remote_check_objs is None:
self.updates()
- can_delete_obj = self.available_set
+ can_delete_obj = self.available_map
else:
# when we are initialized with remote_check_objs we don't
# *send* any requested updates; instead we only collect
# what's already in sync and safe for deletion
- can_delete_obj = self.available_set.difference(
- self.send_list)
+ in_sync_hashes = (set(self.available_map.keys()) -
+ set(self.send_list))
+ can_delete_obj = dict((hash_, self.available_map[hash_])
+ for hash_ in in_sync_hashes)
self.disconnect()
if not self.failures:
return True, can_delete_obj
else:
- return False, set()
+ return False, {}
except (exceptions.MessageTimeout,
exceptions.ReplicationException) as err:
self.daemon.logger.error(
@@ -109,11 +112,11 @@ class Sender(object):
# would only get called if the above except Exception handler
# failed (bad node or job data).
self.daemon.logger.exception('EXCEPTION in replication.Sender')
- return False, set()
+ return False, {}
def connect(self):
"""
- Establishes a connection and starts a REPLICATION request
+ Establishes a connection and starts an SSYNC request
with the object server.
"""
with exceptions.MessageTimeout(
@@ -121,11 +124,13 @@ class Sender(object):
self.connection = bufferedhttp.BufferedHTTPConnection(
'%s:%s' % (self.node['replication_ip'],
self.node['replication_port']))
- self.connection.putrequest('REPLICATION', '/%s/%s' % (
+ self.connection.putrequest('SSYNC', '/%s/%s' % (
self.node['device'], self.job['partition']))
self.connection.putheader('Transfer-Encoding', 'chunked')
self.connection.putheader('X-Backend-Storage-Policy-Index',
- self.policy_idx)
+ int(self.job['policy']))
+ self.connection.putheader('X-Backend-Ssync-Frag-Index',
+ self.node['index'])
self.connection.endheaders()
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'connect receive'):
@@ -137,7 +142,7 @@ class Sender(object):
def readline(self):
"""
- Reads a line from the REPLICATION response body.
+ Reads a line from the SSYNC response body.
httplib has no readline and will block on read(x) until x is
read, so we have to do the work ourselves. A bit of this is
@@ -183,7 +188,7 @@ class Sender(object):
def missing_check(self):
"""
Handles the sender-side of the MISSING_CHECK step of a
- REPLICATION request.
+ SSYNC request.
Full documentation of this can be found at
:py:meth:`.Receiver.missing_check`.
@@ -193,14 +198,15 @@ class Sender(object):
self.daemon.node_timeout, 'missing_check start'):
msg = ':MISSING_CHECK: START\r\n'
self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
- hash_gen = self.daemon._diskfile_mgr.yield_hashes(
+ hash_gen = self.df_mgr.yield_hashes(
self.job['device'], self.job['partition'],
- self.policy_idx, self.suffixes)
+ self.job['policy'], self.suffixes,
+ frag_index=self.job.get('frag_index'))
if self.remote_check_objs is not None:
hash_gen = ifilter(lambda (path, object_hash, timestamp):
object_hash in self.remote_check_objs, hash_gen)
for path, object_hash, timestamp in hash_gen:
- self.available_set.add(object_hash)
+ self.available_map[object_hash] = timestamp
with exceptions.MessageTimeout(
self.daemon.node_timeout,
'missing_check send line'):
@@ -234,12 +240,13 @@ class Sender(object):
line = line.strip()
if line == ':MISSING_CHECK: END':
break
- if line:
- self.send_list.append(line)
+ parts = line.split()
+ if parts:
+ self.send_list.append(parts[0])
def updates(self):
"""
- Handles the sender-side of the UPDATES step of a REPLICATION
+ Handles the sender-side of the UPDATES step of an SSYNC
request.
Full documentation of this can be found at
@@ -252,15 +259,19 @@ class Sender(object):
self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
for object_hash in self.send_list:
try:
- df = self.daemon._diskfile_mgr.get_diskfile_from_hash(
+ df = self.df_mgr.get_diskfile_from_hash(
self.job['device'], self.job['partition'], object_hash,
- self.policy_idx)
+ self.job['policy'], frag_index=self.job.get('frag_index'))
except exceptions.DiskFileNotExist:
continue
url_path = urllib.quote(
'/%s/%s/%s' % (df.account, df.container, df.obj))
try:
df.open()
+ # EC reconstructor may have passed a callback to build
+ # an alternative diskfile...
+ df = self.job.get('sync_diskfile_builder', lambda *args: df)(
+ self.job, self.node, df.get_metadata())
except exceptions.DiskFileDeleted as err:
self.send_delete(url_path, err.timestamp)
except exceptions.DiskFileError:
@@ -328,7 +339,7 @@ class Sender(object):
def disconnect(self):
"""
Closes down the connection to the object server once done
- with the REPLICATION request.
+ with the SSYNC request.
"""
try:
with exceptions.MessageTimeout(
diff --git a/test/probe/brain.py b/test/probe/brain.py
index cbb5ef7cf..9ca931aac 100644
--- a/test/probe/brain.py
+++ b/test/probe/brain.py
@@ -67,7 +67,7 @@ class BrainSplitter(object):
__metaclass__ = meta_command
def __init__(self, url, token, container_name='test', object_name='test',
- server_type='container'):
+ server_type='container', policy=None):
self.url = url
self.token = token
self.account = utils.split_path(urlparse(url).path, 2, 2)[1]
@@ -81,9 +81,26 @@ class BrainSplitter(object):
o = object_name if server_type == 'object' else None
c = container_name if server_type in ('object', 'container') else None
- part, nodes = ring.Ring(
- '/etc/swift/%s.ring.gz' % server_type).get_nodes(
- self.account, c, o)
+ if server_type in ('container', 'account'):
+ if policy:
+ raise TypeError('Metadata server brains do not '
+ 'support specific storage policies')
+ self.policy = None
+ self.ring = ring.Ring(
+ '/etc/swift/%s.ring.gz' % server_type)
+ elif server_type == 'object':
+ if not policy:
+ raise TypeError('Object BrainSplitters need to '
+ 'specify the storage policy')
+ self.policy = policy
+ policy.load_ring('/etc/swift')
+ self.ring = policy.object_ring
+ else:
+ raise ValueError('Unkonwn server_type: %r' % server_type)
+ self.server_type = server_type
+
+ part, nodes = self.ring.get_nodes(self.account, c, o)
+
node_ids = [n['id'] for n in nodes]
if all(n_id in node_ids for n_id in (0, 1)):
self.primary_numbers = (1, 2)
@@ -172,6 +189,8 @@ parser.add_option('-o', '--object', default='object-%s' % uuid.uuid4(),
help='set object name')
parser.add_option('-s', '--server_type', default='container',
help='set server type')
+parser.add_option('-P', '--policy_name', default=None,
+ help='set policy')
def main():
@@ -186,8 +205,17 @@ def main():
return 'ERROR: unknown command %s' % cmd
url, token = get_auth('http://127.0.0.1:8080/auth/v1.0',
'test:tester', 'testing')
+ if options.server_type == 'object' and not options.policy_name:
+ options.policy_name = POLICIES.default.name
+ if options.policy_name:
+ options.server_type = 'object'
+ policy = POLICIES.get_by_name(options.policy_name)
+ if not policy:
+ return 'ERROR: unknown policy %r' % options.policy_name
+ else:
+ policy = None
brain = BrainSplitter(url, token, options.container, options.object,
- options.server_type)
+ options.server_type, policy=policy)
for cmd_args in commands:
parts = cmd_args.split(':', 1)
command = parts[0]
diff --git a/test/probe/common.py b/test/probe/common.py
index 3cea02241..1311cc178 100644
--- a/test/probe/common.py
+++ b/test/probe/common.py
@@ -24,15 +24,19 @@ from nose import SkipTest
from swiftclient import get_auth, head_account
+from swift.obj.diskfile import get_data_dir
from swift.common.ring import Ring
from swift.common.utils import readconf
from swift.common.manager import Manager
-from swift.common.storage_policy import POLICIES
+from swift.common.storage_policy import POLICIES, EC_POLICY, REPL_POLICY
from test.probe import CHECK_SERVER_TIMEOUT, VALIDATE_RSYNC
ENABLED_POLICIES = [p for p in POLICIES if not p.is_deprecated]
+POLICIES_BY_TYPE = defaultdict(list)
+for p in POLICIES:
+ POLICIES_BY_TYPE[p.policy_type].append(p)
def get_server_number(port, port2server):
@@ -138,6 +142,17 @@ def kill_nonprimary_server(primary_nodes, port2server, pids):
return port
+def build_port_to_conf(server):
+ # map server to config by port
+ port_to_config = {}
+ for server_ in Manager([server]):
+ for config_path in server_.conf_files():
+ conf = readconf(config_path,
+ section_name='%s-replicator' % server_.type)
+ port_to_config[int(conf['bind_port'])] = conf
+ return port_to_config
+
+
def get_ring(ring_name, required_replicas, required_devices,
server=None, force_validate=None):
if not server:
@@ -152,13 +167,7 @@ def get_ring(ring_name, required_replicas, required_devices,
if len(ring.devs) != required_devices:
raise SkipTest('%s has %s devices instead of %s' % (
ring.serialized_path, len(ring.devs), required_devices))
- # map server to config by port
- port_to_config = {}
- for server_ in Manager([server]):
- for config_path in server_.conf_files():
- conf = readconf(config_path,
- section_name='%s-replicator' % server_.type)
- port_to_config[int(conf['bind_port'])] = conf
+ port_to_config = build_port_to_conf(server)
for dev in ring.devs:
# verify server is exposing mounted device
conf = port_to_config[dev['port']]
@@ -262,6 +271,10 @@ class ProbeTest(unittest.TestCase):
['account-replicator', 'container-replicator',
'object-replicator'])
self.updaters = Manager(['container-updater', 'object-updater'])
+ self.server_port_to_conf = {}
+ # get some configs backend daemon configs loaded up
+ for server in ('account', 'container', 'object'):
+ self.server_port_to_conf[server] = build_port_to_conf(server)
except BaseException:
try:
raise
@@ -274,6 +287,18 @@ class ProbeTest(unittest.TestCase):
def tearDown(self):
Manager(['all']).kill()
+ def device_dir(self, server, node):
+ conf = self.server_port_to_conf[server][node['port']]
+ return os.path.join(conf['devices'], node['device'])
+
+ def storage_dir(self, server, node, part=None, policy=None):
+ policy = policy or self.policy
+ device_path = self.device_dir(server, node)
+ path_parts = [device_path, get_data_dir(policy)]
+ if part is not None:
+ path_parts.append(str(part))
+ return os.path.join(*path_parts)
+
def get_to_final_state(self):
# these .stop()s are probably not strictly necessary,
# but may prevent race conditions
@@ -291,7 +316,16 @@ class ReplProbeTest(ProbeTest):
acct_cont_required_devices = 4
obj_required_replicas = 3
obj_required_devices = 4
- policy_requirements = {'is_default': True}
+ policy_requirements = {'policy_type': REPL_POLICY}
+
+
+class ECProbeTest(ProbeTest):
+
+ acct_cont_required_replicas = 3
+ acct_cont_required_devices = 4
+ obj_required_replicas = 6
+ obj_required_devices = 8
+ policy_requirements = {'policy_type': EC_POLICY}
if __name__ == "__main__":
diff --git a/test/probe/test_container_merge_policy_index.py b/test/probe/test_container_merge_policy_index.py
index dd4e50477..d604b1371 100644
--- a/test/probe/test_container_merge_policy_index.py
+++ b/test/probe/test_container_merge_policy_index.py
@@ -26,7 +26,8 @@ from swift.common import utils, direct_client
from swift.common.storage_policy import POLICIES
from swift.common.http import HTTP_NOT_FOUND
from test.probe.brain import BrainSplitter
-from test.probe.common import ReplProbeTest, ENABLED_POLICIES
+from test.probe.common import (ReplProbeTest, ENABLED_POLICIES,
+ POLICIES_BY_TYPE, REPL_POLICY)
from swiftclient import client, ClientException
@@ -234,6 +235,18 @@ class TestContainerMergePolicyIndex(ReplProbeTest):
orig_policy_index, node))
def test_reconcile_manifest(self):
+ # this test is not only testing a split brain scenario on
+ # multiple policies with mis-placed objects - it even writes out
+ # a static large object directly to the storage nodes while the
+ # objects are unavailably mis-placed from *behind* the proxy and
+ # doesn't know how to do that for EC_POLICY (clayg: why did you
+ # guys let me write a test that does this!?) - so we force
+ # wrong_policy (where the manifest gets written) to be one of
+ # any of your configured REPL_POLICY (we know you have one
+ # because this is a ReplProbeTest)
+ wrong_policy = random.choice(POLICIES_BY_TYPE[REPL_POLICY])
+ policy = random.choice([p for p in ENABLED_POLICIES
+ if p is not wrong_policy])
manifest_data = []
def write_part(i):
@@ -250,17 +263,14 @@ class TestContainerMergePolicyIndex(ReplProbeTest):
# get an old container stashed
self.brain.stop_primary_half()
- policy = random.choice(ENABLED_POLICIES)
- self.brain.put_container(policy.idx)
+ self.brain.put_container(int(policy))
self.brain.start_primary_half()
# write some parts
for i in range(10):
write_part(i)
self.brain.stop_handoff_half()
- wrong_policy = random.choice([p for p in ENABLED_POLICIES
- if p is not policy])
- self.brain.put_container(wrong_policy.idx)
+ self.brain.put_container(int(wrong_policy))
# write some more parts
for i in range(10, 20):
write_part(i)
diff --git a/test/probe/test_empty_device_handoff.py b/test/probe/test_empty_device_handoff.py
index 7002fa487..e0e450a4b 100755
--- a/test/probe/test_empty_device_handoff.py
+++ b/test/probe/test_empty_device_handoff.py
@@ -44,7 +44,9 @@ class TestEmptyDevice(ReplProbeTest):
def test_main(self):
# Create container
container = 'container-%s' % uuid4()
- client.put_container(self.url, self.token, container)
+ client.put_container(self.url, self.token, container,
+ headers={'X-Storage-Policy':
+ self.policy.name})
cpart, cnodes = self.container_ring.get_nodes(self.account, container)
cnode = cnodes[0]
@@ -58,7 +60,7 @@ class TestEmptyDevice(ReplProbeTest):
# Delete the default data directory for objects on the primary server
obj_dir = '%s/%s' % (self._get_objects_dir(onode),
- get_data_dir(self.policy.idx))
+ get_data_dir(self.policy))
shutil.rmtree(obj_dir, True)
self.assertFalse(os.path.exists(obj_dir))
diff --git a/test/probe/test_object_async_update.py b/test/probe/test_object_async_update.py
index 34ec08253..05d05b3ad 100755
--- a/test/probe/test_object_async_update.py
+++ b/test/probe/test_object_async_update.py
@@ -108,7 +108,9 @@ class TestUpdateOverrides(ReplProbeTest):
'X-Backend-Container-Update-Override-Etag': 'override-etag',
'X-Backend-Container-Update-Override-Content-Type': 'override-type'
}
- client.put_container(self.url, self.token, 'c1')
+ client.put_container(self.url, self.token, 'c1',
+ headers={'X-Storage-Policy':
+ self.policy.name})
self.int_client.upload_object(StringIO(u'stuff'), self.account,
'c1', 'o1', headers)
diff --git a/test/probe/test_object_failures.py b/test/probe/test_object_failures.py
index 9147b1ed5..469683a10 100755
--- a/test/probe/test_object_failures.py
+++ b/test/probe/test_object_failures.py
@@ -52,7 +52,9 @@ def get_data_file_path(obj_dir):
class TestObjectFailures(ReplProbeTest):
def _setup_data_file(self, container, obj, data):
- client.put_container(self.url, self.token, container)
+ client.put_container(self.url, self.token, container,
+ headers={'X-Storage-Policy':
+ self.policy.name})
client.put_object(self.url, self.token, container, obj, data)
odata = client.get_object(self.url, self.token, container, obj)[-1]
self.assertEquals(odata, data)
@@ -65,7 +67,7 @@ class TestObjectFailures(ReplProbeTest):
obj_server_conf = readconf(self.configs['object-server'][node_id])
devices = obj_server_conf['app:object-server']['devices']
obj_dir = '%s/%s/%s/%s/%s/%s/' % (devices, device,
- get_data_dir(self.policy.idx),
+ get_data_dir(self.policy),
opart, hash_str[-3:], hash_str)
data_file = get_data_file_path(obj_dir)
return onode, opart, data_file
diff --git a/test/probe/test_object_handoff.py b/test/probe/test_object_handoff.py
index 41a67cf28..f513eef2e 100755
--- a/test/probe/test_object_handoff.py
+++ b/test/probe/test_object_handoff.py
@@ -30,7 +30,9 @@ class TestObjectHandoff(ReplProbeTest):
def test_main(self):
# Create container
container = 'container-%s' % uuid4()
- client.put_container(self.url, self.token, container)
+ client.put_container(self.url, self.token, container,
+ headers={'X-Storage-Policy':
+ self.policy.name})
# Kill one container/obj primary server
cpart, cnodes = self.container_ring.get_nodes(self.account, container)
diff --git a/test/probe/test_object_metadata_replication.py b/test/probe/test_object_metadata_replication.py
index 357cfec5b..c278e5f81 100644
--- a/test/probe/test_object_metadata_replication.py
+++ b/test/probe/test_object_metadata_replication.py
@@ -73,7 +73,8 @@ class Test(ReplProbeTest):
self.container_name = 'container-%s' % uuid.uuid4()
self.object_name = 'object-%s' % uuid.uuid4()
self.brain = BrainSplitter(self.url, self.token, self.container_name,
- self.object_name, 'object')
+ self.object_name, 'object',
+ policy=self.policy)
self.tempdir = mkdtemp()
conf_path = os.path.join(self.tempdir, 'internal_client.conf')
conf_body = """
@@ -128,7 +129,7 @@ class Test(ReplProbeTest):
self.object_name)
def test_object_delete_is_replicated(self):
- self.brain.put_container(policy_index=0)
+ self.brain.put_container(policy_index=int(self.policy))
# put object
self._put_object()
@@ -174,7 +175,7 @@ class Test(ReplProbeTest):
def test_sysmeta_after_replication_with_subsequent_post(self):
sysmeta = {'x-object-sysmeta-foo': 'sysmeta-foo'}
usermeta = {'x-object-meta-bar': 'meta-bar'}
- self.brain.put_container(policy_index=0)
+ self.brain.put_container(policy_index=int(self.policy))
# put object
self._put_object()
# put newer object with sysmeta to first server subset
@@ -221,7 +222,7 @@ class Test(ReplProbeTest):
def test_sysmeta_after_replication_with_prior_post(self):
sysmeta = {'x-object-sysmeta-foo': 'sysmeta-foo'}
usermeta = {'x-object-meta-bar': 'meta-bar'}
- self.brain.put_container(policy_index=0)
+ self.brain.put_container(policy_index=int(self.policy))
# put object
self._put_object()
diff --git a/test/probe/test_reconstructor_durable.py b/test/probe/test_reconstructor_durable.py
new file mode 100644
index 000000000..eeef00e62
--- /dev/null
+++ b/test/probe/test_reconstructor_durable.py
@@ -0,0 +1,157 @@
+#!/usr/bin/python -u
+# Copyright (c) 2010-2012 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from hashlib import md5
+import unittest
+import uuid
+import random
+import os
+import errno
+
+from test.probe.common import ECProbeTest
+
+from swift.common import direct_client
+from swift.common.storage_policy import EC_POLICY
+from swift.common.manager import Manager
+
+from swiftclient import client
+
+
+class Body(object):
+
+ def __init__(self, total=3.5 * 2 ** 20):
+ self.total = total
+ self.hasher = md5()
+ self.size = 0
+ self.chunk = 'test' * 16 * 2 ** 10
+
+ @property
+ def etag(self):
+ return self.hasher.hexdigest()
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ if self.size > self.total:
+ raise StopIteration()
+ self.size += len(self.chunk)
+ self.hasher.update(self.chunk)
+ return self.chunk
+
+ def __next__(self):
+ return self.next()
+
+
+class TestReconstructorPropDurable(ECProbeTest):
+
+ def setUp(self):
+ super(TestReconstructorPropDurable, self).setUp()
+ self.container_name = 'container-%s' % uuid.uuid4()
+ self.object_name = 'object-%s' % uuid.uuid4()
+ # sanity
+ self.assertEqual(self.policy.policy_type, EC_POLICY)
+ self.reconstructor = Manager(["object-reconstructor"])
+
+ def direct_get(self, node, part):
+ req_headers = {'X-Backend-Storage-Policy-Index': int(self.policy)}
+ headers, data = direct_client.direct_get_object(
+ node, part, self.account, self.container_name,
+ self.object_name, headers=req_headers,
+ resp_chunk_size=64 * 2 ** 20)
+ hasher = md5()
+ for chunk in data:
+ hasher.update(chunk)
+ return hasher.hexdigest()
+
+ def _check_node(self, node, part, etag, headers_post):
+ # get fragment archive etag
+ fragment_archive_etag = self.direct_get(node, part)
+
+ # remove the .durable from the selected node
+ part_dir = self.storage_dir('object', node, part=part)
+ for dirs, subdirs, files in os.walk(part_dir):
+ for fname in files:
+ if fname.endswith('.durable'):
+ durable = os.path.join(dirs, fname)
+ os.remove(durable)
+ break
+ try:
+ os.remove(os.path.join(part_dir, 'hashes.pkl'))
+ except OSError as e:
+ if e.errno != errno.ENOENT:
+ raise
+
+ # fire up reconstructor to propogate the .durable
+ self.reconstructor.once()
+
+ # fragment is still exactly as it was before!
+ self.assertEqual(fragment_archive_etag,
+ self.direct_get(node, part))
+
+ # check meta
+ meta = client.head_object(self.url, self.token,
+ self.container_name,
+ self.object_name)
+ for key in headers_post:
+ self.assertTrue(key in meta)
+ self.assertEqual(meta[key], headers_post[key])
+
+ def _format_node(self, node):
+ return '%s#%s' % (node['device'], node['index'])
+
+ def test_main(self):
+ # create EC container
+ headers = {'X-Storage-Policy': self.policy.name}
+ client.put_container(self.url, self.token, self.container_name,
+ headers=headers)
+
+ # PUT object
+ contents = Body()
+ headers = {'x-object-meta-foo': 'meta-foo'}
+ headers_post = {'x-object-meta-bar': 'meta-bar'}
+
+ etag = client.put_object(self.url, self.token,
+ self.container_name,
+ self.object_name,
+ contents=contents, headers=headers)
+ client.post_object(self.url, self.token, self.container_name,
+ self.object_name, headers=headers_post)
+ del headers_post['X-Auth-Token'] # WTF, where did this come from?
+
+ # built up a list of node lists to kill a .durable from,
+ # first try a single node
+ # then adjacent nodes and then nodes >1 node apart
+ opart, onodes = self.object_ring.get_nodes(
+ self.account, self.container_name, self.object_name)
+ single_node = [random.choice(onodes)]
+ adj_nodes = [onodes[0], onodes[-1]]
+ far_nodes = [onodes[0], onodes[-2]]
+ test_list = [single_node, adj_nodes, far_nodes]
+
+ for node_list in test_list:
+ for onode in node_list:
+ try:
+ self._check_node(onode, opart, etag, headers_post)
+ except AssertionError as e:
+ self.fail(
+ str(e) + '\n... for node %r of scenario %r' % (
+ self._format_node(onode),
+ [self._format_node(n) for n in node_list]))
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/test/probe/test_reconstructor_rebuild.py b/test/probe/test_reconstructor_rebuild.py
new file mode 100644
index 000000000..5edfcc52d
--- /dev/null
+++ b/test/probe/test_reconstructor_rebuild.py
@@ -0,0 +1,170 @@
+#!/usr/bin/python -u
+# Copyright (c) 2010-2012 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from hashlib import md5
+import unittest
+import uuid
+import shutil
+import random
+
+from test.probe.common import ECProbeTest
+
+from swift.common import direct_client
+from swift.common.storage_policy import EC_POLICY
+from swift.common.manager import Manager
+
+from swiftclient import client
+
+
+class Body(object):
+
+ def __init__(self, total=3.5 * 2 ** 20):
+ self.total = total
+ self.hasher = md5()
+ self.size = 0
+ self.chunk = 'test' * 16 * 2 ** 10
+
+ @property
+ def etag(self):
+ return self.hasher.hexdigest()
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ if self.size > self.total:
+ raise StopIteration()
+ self.size += len(self.chunk)
+ self.hasher.update(self.chunk)
+ return self.chunk
+
+ def __next__(self):
+ return self.next()
+
+
+class TestReconstructorRebuild(ECProbeTest):
+
+ def setUp(self):
+ super(TestReconstructorRebuild, self).setUp()
+ self.container_name = 'container-%s' % uuid.uuid4()
+ self.object_name = 'object-%s' % uuid.uuid4()
+ # sanity
+ self.assertEqual(self.policy.policy_type, EC_POLICY)
+ self.reconstructor = Manager(["object-reconstructor"])
+
+ def proxy_get(self):
+ # GET object
+ headers, body = client.get_object(self.url, self.token,
+ self.container_name,
+ self.object_name,
+ resp_chunk_size=64 * 2 ** 10)
+ resp_checksum = md5()
+ for chunk in body:
+ resp_checksum.update(chunk)
+ return resp_checksum.hexdigest()
+
+ def direct_get(self, node, part):
+ req_headers = {'X-Backend-Storage-Policy-Index': int(self.policy)}
+ headers, data = direct_client.direct_get_object(
+ node, part, self.account, self.container_name,
+ self.object_name, headers=req_headers,
+ resp_chunk_size=64 * 2 ** 20)
+ hasher = md5()
+ for chunk in data:
+ hasher.update(chunk)
+ return hasher.hexdigest()
+
+ def _check_node(self, node, part, etag, headers_post):
+ # get fragment archive etag
+ fragment_archive_etag = self.direct_get(node, part)
+
+ # remove data from the selected node
+ part_dir = self.storage_dir('object', node, part=part)
+ shutil.rmtree(part_dir, True)
+
+ # this node can't servce the data any more
+ try:
+ self.direct_get(node, part)
+ except direct_client.DirectClientException as err:
+ self.assertEqual(err.http_status, 404)
+ else:
+ self.fail('Node data on %r was not fully destoryed!' %
+ (node,))
+
+ # make sure we can still GET the object and its correct, the
+ # proxy is doing decode on remaining fragments to get the obj
+ self.assertEqual(etag, self.proxy_get())
+
+ # fire up reconstructor
+ self.reconstructor.once()
+
+ # fragment is rebuilt exactly as it was before!
+ self.assertEqual(fragment_archive_etag,
+ self.direct_get(node, part))
+
+ # check meta
+ meta = client.head_object(self.url, self.token,
+ self.container_name,
+ self.object_name)
+ for key in headers_post:
+ self.assertTrue(key in meta)
+ self.assertEqual(meta[key], headers_post[key])
+
+ def _format_node(self, node):
+ return '%s#%s' % (node['device'], node['index'])
+
+ def test_main(self):
+ # create EC container
+ headers = {'X-Storage-Policy': self.policy.name}
+ client.put_container(self.url, self.token, self.container_name,
+ headers=headers)
+
+ # PUT object
+ contents = Body()
+ headers = {'x-object-meta-foo': 'meta-foo'}
+ headers_post = {'x-object-meta-bar': 'meta-bar'}
+
+ etag = client.put_object(self.url, self.token,
+ self.container_name,
+ self.object_name,
+ contents=contents, headers=headers)
+ client.post_object(self.url, self.token, self.container_name,
+ self.object_name, headers=headers_post)
+ del headers_post['X-Auth-Token'] # WTF, where did this come from?
+
+ # built up a list of node lists to kill data from,
+ # first try a single node
+ # then adjacent nodes and then nodes >1 node apart
+ opart, onodes = self.object_ring.get_nodes(
+ self.account, self.container_name, self.object_name)
+ single_node = [random.choice(onodes)]
+ adj_nodes = [onodes[0], onodes[-1]]
+ far_nodes = [onodes[0], onodes[-2]]
+ test_list = [single_node, adj_nodes, far_nodes]
+
+ for node_list in test_list:
+ for onode in node_list:
+ try:
+ self._check_node(onode, opart, etag, headers_post)
+ except AssertionError as e:
+ self.fail(
+ str(e) + '\n... for node %r of scenario %r' % (
+ self._format_node(onode),
+ [self._format_node(n) for n in node_list]))
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/test/probe/test_reconstructor_revert.py b/test/probe/test_reconstructor_revert.py
new file mode 100755
index 000000000..2a7bd7c83
--- /dev/null
+++ b/test/probe/test_reconstructor_revert.py
@@ -0,0 +1,258 @@
+#!/usr/bin/python -u
+# Copyright (c) 2010-2012 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from hashlib import md5
+import unittest
+import uuid
+import os
+
+from test.probe.common import ECProbeTest
+
+from swift.common import direct_client
+from swift.common.storage_policy import EC_POLICY
+from swift.common.manager import Manager
+from swift.common.utils import renamer
+
+from swiftclient import client
+
+
+class Body(object):
+
+ def __init__(self, total=3.5 * 2 ** 20):
+ self.total = total
+ self.hasher = md5()
+ self.size = 0
+ self.chunk = 'test' * 16 * 2 ** 10
+
+ @property
+ def etag(self):
+ return self.hasher.hexdigest()
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ if self.size > self.total:
+ raise StopIteration()
+ self.size += len(self.chunk)
+ self.hasher.update(self.chunk)
+ return self.chunk
+
+ def __next__(self):
+ return self.next()
+
+
+class TestReconstructorRevert(ECProbeTest):
+
+ def setUp(self):
+ super(TestReconstructorRevert, self).setUp()
+ self.container_name = 'container-%s' % uuid.uuid4()
+ self.object_name = 'object-%s' % uuid.uuid4()
+
+ # sanity
+ self.assertEqual(self.policy.policy_type, EC_POLICY)
+ self.reconstructor = Manager(["object-reconstructor"])
+
+ def kill_drive(self, device):
+ if os.path.ismount(device):
+ os.system('sudo umount %s' % device)
+ else:
+ renamer(device, device + "X")
+
+ def revive_drive(self, device):
+ disabled_name = device + "X"
+ if os.path.isdir(disabled_name):
+ renamer(device + "X", device)
+ else:
+ os.system('sudo mount %s' % device)
+
+ def proxy_get(self):
+ # GET object
+ headers, body = client.get_object(self.url, self.token,
+ self.container_name,
+ self.object_name,
+ resp_chunk_size=64 * 2 ** 10)
+ resp_checksum = md5()
+ for chunk in body:
+ resp_checksum.update(chunk)
+ return resp_checksum.hexdigest()
+
+ def direct_get(self, node, part):
+ req_headers = {'X-Backend-Storage-Policy-Index': int(self.policy)}
+ headers, data = direct_client.direct_get_object(
+ node, part, self.account, self.container_name,
+ self.object_name, headers=req_headers,
+ resp_chunk_size=64 * 2 ** 20)
+ hasher = md5()
+ for chunk in data:
+ hasher.update(chunk)
+ return hasher.hexdigest()
+
+ def test_revert_object(self):
+ # create EC container
+ headers = {'X-Storage-Policy': self.policy.name}
+ client.put_container(self.url, self.token, self.container_name,
+ headers=headers)
+
+ # get our node lists
+ opart, onodes = self.object_ring.get_nodes(
+ self.account, self.container_name, self.object_name)
+ hnodes = self.object_ring.get_more_nodes(opart)
+
+ # kill 2 a parity count number of primary nodes so we can
+ # force data onto handoffs, we do that by renaming dev dirs
+ # to induce 507
+ p_dev1 = self.device_dir('object', onodes[0])
+ p_dev2 = self.device_dir('object', onodes[1])
+ self.kill_drive(p_dev1)
+ self.kill_drive(p_dev2)
+
+ # PUT object
+ contents = Body()
+ headers = {'x-object-meta-foo': 'meta-foo'}
+ headers_post = {'x-object-meta-bar': 'meta-bar'}
+ client.put_object(self.url, self.token, self.container_name,
+ self.object_name, contents=contents,
+ headers=headers)
+ client.post_object(self.url, self.token, self.container_name,
+ self.object_name, headers=headers_post)
+ del headers_post['X-Auth-Token'] # WTF, where did this come from?
+
+ # these primaries can't servce the data any more, we expect 507
+ # here and not 404 because we're using mount_check to kill nodes
+ for onode in (onodes[0], onodes[1]):
+ try:
+ self.direct_get(onode, opart)
+ except direct_client.DirectClientException as err:
+ self.assertEqual(err.http_status, 507)
+ else:
+ self.fail('Node data on %r was not fully destoryed!' %
+ (onode,))
+
+ # now take out another primary
+ p_dev3 = self.device_dir('object', onodes[2])
+ self.kill_drive(p_dev3)
+
+ # this node can't servce the data any more
+ try:
+ self.direct_get(onodes[2], opart)
+ except direct_client.DirectClientException as err:
+ self.assertEqual(err.http_status, 507)
+ else:
+ self.fail('Node data on %r was not fully destoryed!' %
+ (onode,))
+
+ # make sure we can still GET the object and its correct
+ # we're now pulling from handoffs and reconstructing
+ etag = self.proxy_get()
+ self.assertEqual(etag, contents.etag)
+
+ # rename the dev dirs so they don't 507 anymore
+ self.revive_drive(p_dev1)
+ self.revive_drive(p_dev2)
+ self.revive_drive(p_dev3)
+
+ # fire up reconstructor on handoff nodes only
+ for hnode in hnodes:
+ hnode_id = (hnode['port'] - 6000) / 10
+ self.reconstructor.once(number=hnode_id)
+
+ # first threee primaries have data again
+ for onode in (onodes[0], onodes[2]):
+ self.direct_get(onode, opart)
+
+ # check meta
+ meta = client.head_object(self.url, self.token,
+ self.container_name,
+ self.object_name)
+ for key in headers_post:
+ self.assertTrue(key in meta)
+ self.assertEqual(meta[key], headers_post[key])
+
+ # handoffs are empty
+ for hnode in hnodes:
+ try:
+ self.direct_get(hnode, opart)
+ except direct_client.DirectClientException as err:
+ self.assertEqual(err.http_status, 404)
+ else:
+ self.fail('Node data on %r was not fully destoryed!' %
+ (hnode,))
+
+ def test_delete_propogate(self):
+ # create EC container
+ headers = {'X-Storage-Policy': self.policy.name}
+ client.put_container(self.url, self.token, self.container_name,
+ headers=headers)
+
+ # get our node lists
+ opart, onodes = self.object_ring.get_nodes(
+ self.account, self.container_name, self.object_name)
+ hnodes = self.object_ring.get_more_nodes(opart)
+ p_dev2 = self.device_dir('object', onodes[1])
+
+ # PUT object
+ contents = Body()
+ client.put_object(self.url, self.token, self.container_name,
+ self.object_name, contents=contents)
+
+ # now lets shut one down
+ self.kill_drive(p_dev2)
+
+ # delete on the ones that are left
+ client.delete_object(self.url, self.token,
+ self.container_name,
+ self.object_name)
+
+ # spot check a node
+ try:
+ self.direct_get(onodes[0], opart)
+ except direct_client.DirectClientException as err:
+ self.assertEqual(err.http_status, 404)
+ else:
+ self.fail('Node data on %r was not fully destoryed!' %
+ (onodes[0],))
+
+ # enable the first node again
+ self.revive_drive(p_dev2)
+
+ # propogate the delete...
+ # fire up reconstructor on handoff nodes only
+ for hnode in hnodes:
+ hnode_id = (hnode['port'] - 6000) / 10
+ self.reconstructor.once(number=hnode_id, override_devices=['sdb8'])
+
+ # check the first node to make sure its gone
+ try:
+ self.direct_get(onodes[1], opart)
+ except direct_client.DirectClientException as err:
+ self.assertEqual(err.http_status, 404)
+ else:
+ self.fail('Node data on %r was not fully destoryed!' %
+ (onodes[0]))
+
+ # make sure proxy get can't find it
+ try:
+ self.proxy_get()
+ except Exception as err:
+ self.assertEqual(err.http_status, 404)
+ else:
+ self.fail('Node data on %r was not fully destoryed!' %
+ (onodes[0]))
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/test/probe/test_replication_servers_working.py b/test/probe/test_replication_servers_working.py
index db657f3e7..64b906fdc 100644
--- a/test/probe/test_replication_servers_working.py
+++ b/test/probe/test_replication_servers_working.py
@@ -21,7 +21,6 @@ import time
import shutil
from swiftclient import client
-from swift.common.storage_policy import POLICIES
from swift.obj.diskfile import get_data_dir
from test.probe.common import ReplProbeTest
@@ -88,7 +87,7 @@ class TestReplicatorFunctions(ReplProbeTest):
# Delete file "hashes.pkl".
# Check, that all files were replicated.
path_list = []
- data_dir = get_data_dir(POLICIES.default.idx)
+ data_dir = get_data_dir(self.policy)
# Figure out where the devices are
for node_id in range(1, 5):
conf = readconf(self.configs['object-server'][node_id])
@@ -100,7 +99,9 @@ class TestReplicatorFunctions(ReplProbeTest):
# Put data to storage nodes
container = 'container-%s' % uuid4()
- client.put_container(self.url, self.token, container)
+ client.put_container(self.url, self.token, container,
+ headers={'X-Storage-Policy':
+ self.policy.name})
obj = 'object-%s' % uuid4()
client.put_object(self.url, self.token, container, obj, 'VERIFY')
diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py
new file mode 100755
index 000000000..93a50e84d
--- /dev/null
+++ b/test/unit/obj/test_reconstructor.py
@@ -0,0 +1,2484 @@
+# Copyright (c) 2010-2012 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import itertools
+import unittest
+import os
+from hashlib import md5
+import mock
+import cPickle as pickle
+import tempfile
+import time
+import shutil
+import re
+import random
+from eventlet import Timeout
+
+from contextlib import closing, nested, contextmanager
+from gzip import GzipFile
+from shutil import rmtree
+from swift.common import utils
+from swift.common.exceptions import DiskFileError
+from swift.obj import diskfile, reconstructor as object_reconstructor
+from swift.common import ring
+from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy,
+ POLICIES, EC_POLICY)
+from swift.obj.reconstructor import REVERT
+
+from test.unit import (patch_policies, debug_logger, mocked_http_conn,
+ FabricatedRing, make_timestamp_iter)
+
+
+@contextmanager
+def mock_ssync_sender(ssync_calls=None, response_callback=None, **kwargs):
+ def fake_ssync(daemon, node, job, suffixes):
+ if ssync_calls is not None:
+ ssync_calls.append(
+ {'node': node, 'job': job, 'suffixes': suffixes})
+
+ def fake_call():
+ if response_callback:
+ response = response_callback(node, job, suffixes)
+ else:
+ response = True, {}
+ return response
+ return fake_call
+
+ with mock.patch('swift.obj.reconstructor.ssync_sender', fake_ssync):
+ yield fake_ssync
+
+
+def make_ec_archive_bodies(policy, test_body):
+ segment_size = policy.ec_segment_size
+ # split up the body into buffers
+ chunks = [test_body[x:x + segment_size]
+ for x in range(0, len(test_body), segment_size)]
+ # encode the buffers into fragment payloads
+ fragment_payloads = []
+ for chunk in chunks:
+ fragments = policy.pyeclib_driver.encode(chunk)
+ if not fragments:
+ break
+ fragment_payloads.append(fragments)
+
+ # join up the fragment payloads per node
+ ec_archive_bodies = [''.join(fragments)
+ for fragments in zip(*fragment_payloads)]
+ return ec_archive_bodies
+
+
+def _ips():
+ return ['127.0.0.1']
+object_reconstructor.whataremyips = _ips
+
+
+def _create_test_rings(path):
+ testgz = os.path.join(path, 'object.ring.gz')
+ intended_replica2part2dev_id = [
+ [0, 1, 2],
+ [1, 2, 3],
+ [2, 3, 0]
+ ]
+
+ intended_devs = [
+ {'id': 0, 'device': 'sda1', 'zone': 0, 'ip': '127.0.0.0',
+ 'port': 6000},
+ {'id': 1, 'device': 'sda1', 'zone': 1, 'ip': '127.0.0.1',
+ 'port': 6000},
+ {'id': 2, 'device': 'sda1', 'zone': 2, 'ip': '127.0.0.2',
+ 'port': 6000},
+ {'id': 3, 'device': 'sda1', 'zone': 4, 'ip': '127.0.0.3',
+ 'port': 6000}
+ ]
+ intended_part_shift = 30
+ with closing(GzipFile(testgz, 'wb')) as f:
+ pickle.dump(
+ ring.RingData(intended_replica2part2dev_id,
+ intended_devs, intended_part_shift),
+ f)
+
+ testgz = os.path.join(path, 'object-1.ring.gz')
+ with closing(GzipFile(testgz, 'wb')) as f:
+ pickle.dump(
+ ring.RingData(intended_replica2part2dev_id,
+ intended_devs, intended_part_shift),
+ f)
+
+
+def count_stats(logger, key, metric):
+ count = 0
+ for record in logger.log_dict[key]:
+ log_args, log_kwargs = record
+ m = log_args[0]
+ if re.match(metric, m):
+ count += 1
+ return count
+
+
+@patch_policies([StoragePolicy(0, name='zero', is_default=True),
+ ECStoragePolicy(1, name='one', ec_type='jerasure_rs_vand',
+ ec_ndata=2, ec_nparity=1)])
+class TestGlobalSetupObjectReconstructor(unittest.TestCase):
+
+ def setUp(self):
+ self.testdir = tempfile.mkdtemp()
+ _create_test_rings(self.testdir)
+ POLICIES[0].object_ring = ring.Ring(self.testdir, ring_name='object')
+ POLICIES[1].object_ring = ring.Ring(self.testdir, ring_name='object-1')
+ utils.HASH_PATH_SUFFIX = 'endcap'
+ utils.HASH_PATH_PREFIX = ''
+ self.devices = os.path.join(self.testdir, 'node')
+ os.makedirs(self.devices)
+ os.mkdir(os.path.join(self.devices, 'sda1'))
+ self.objects = os.path.join(self.devices, 'sda1',
+ diskfile.get_data_dir(POLICIES[0]))
+ self.objects_1 = os.path.join(self.devices, 'sda1',
+ diskfile.get_data_dir(POLICIES[1]))
+ os.mkdir(self.objects)
+ os.mkdir(self.objects_1)
+ self.parts = {}
+ self.parts_1 = {}
+ self.part_nums = ['0', '1', '2']
+ for part in self.part_nums:
+ self.parts[part] = os.path.join(self.objects, part)
+ os.mkdir(self.parts[part])
+ self.parts_1[part] = os.path.join(self.objects_1, part)
+ os.mkdir(self.parts_1[part])
+
+ self.conf = dict(
+ swift_dir=self.testdir, devices=self.devices, mount_check='false',
+ timeout='300', stats_interval='1')
+ self.logger = debug_logger('test-reconstructor')
+ self.reconstructor = object_reconstructor.ObjectReconstructor(
+ self.conf, logger=self.logger)
+
+ self.policy = POLICIES[1]
+
+ # most of the reconstructor test methods require that there be
+ # real objects in place, not just part dirs, so we'll create them
+ # all here....
+ # part 0: 3C1/hash/xxx-1.data <-- job: sync_only - parnters (FI 1)
+ # /xxx.durable <-- included in earlier job (FI 1)
+ # 061/hash/xxx-1.data <-- included in earlier job (FI 1)
+ # /xxx.durable <-- included in earlier job (FI 1)
+ # /xxx-2.data <-- job: sync_revert to index 2
+
+ # part 1: 3C1/hash/xxx-0.data <-- job: sync_only - parnters (FI 0)
+ # /xxx-1.data <-- job: sync_revert to index 1
+ # /xxx.durable <-- included in earlier jobs (FI 0, 1)
+ # 061/hash/xxx-1.data <-- included in earlier job (FI 1)
+ # /xxx.durable <-- included in earlier job (FI 1)
+
+ # part 2: 3C1/hash/xxx-2.data <-- job: sync_revert to index 2
+ # /xxx.durable <-- included in earlier job (FI 2)
+ # 061/hash/xxx-0.data <-- job: sync_revert to index 0
+ # /xxx.durable <-- included in earlier job (FI 0)
+
+ def _create_frag_archives(policy, obj_path, local_id, obj_set):
+ # we'll create 2 sets of objects in different suffix dirs
+ # so we cover all the scenarios we want (3 of them)
+ # 1) part dir with all FI's matching the local node index
+ # 2) part dir with one local and mix of others
+ # 3) part dir with no local FI and one or more others
+ def part_0(set):
+ if set == 0:
+ # just the local
+ return local_id
+ else:
+ # onde local and all of another
+ if obj_num == 0:
+ return local_id
+ else:
+ return (local_id + 1) % 3
+
+ def part_1(set):
+ if set == 0:
+ # one local and all of another
+ if obj_num == 0:
+ return local_id
+ else:
+ return (local_id + 2) % 3
+ else:
+ # just the local node
+ return local_id
+
+ def part_2(set):
+ # this part is a handoff in our config (always)
+ # so lets do a set with indicies from different nodes
+ if set == 0:
+ return (local_id + 1) % 3
+ else:
+ return (local_id + 2) % 3
+
+ # function dictionary for defining test scenarios base on set #
+ scenarios = {'0': part_0,
+ '1': part_1,
+ '2': part_2}
+
+ def _create_df(obj_num, part_num):
+ self._create_diskfile(
+ part=part_num, object_name='o' + str(obj_set),
+ policy=policy, frag_index=scenarios[part_num](obj_set),
+ timestamp=utils.Timestamp(t))
+
+ for part_num in self.part_nums:
+ # create 3 unique objcets per part, each part
+ # will then have a unique mix of FIs for the
+ # possible scenarios
+ for obj_num in range(0, 3):
+ _create_df(obj_num, part_num)
+
+ ips = utils.whataremyips()
+ for policy in [p for p in POLICIES if p.policy_type == EC_POLICY]:
+ self.ec_policy = policy
+ self.ec_obj_ring = self.reconstructor.load_object_ring(
+ self.ec_policy)
+ data_dir = diskfile.get_data_dir(self.ec_policy)
+ for local_dev in [dev for dev in self.ec_obj_ring.devs
+ if dev and dev['replication_ip'] in ips and
+ dev['replication_port'] ==
+ self.reconstructor.port]:
+ self.ec_local_dev = local_dev
+ dev_path = os.path.join(self.reconstructor.devices_dir,
+ self.ec_local_dev['device'])
+ self.ec_obj_path = os.path.join(dev_path, data_dir)
+
+ # create a bunch of FA's to test
+ t = 1421181937.70054 # time.time()
+ with mock.patch('swift.obj.diskfile.time') as mock_time:
+ # since (a) we are using a fixed time here to create
+ # frags which corresponds to all the hardcoded hashes and
+ # (b) the EC diskfile will delete its .data file right
+ # after creating if it has expired, use this horrible hack
+ # to prevent the reclaim happening
+ mock_time.time.return_value = 0.0
+ _create_frag_archives(self.ec_policy, self.ec_obj_path,
+ self.ec_local_dev['id'], 0)
+ _create_frag_archives(self.ec_policy, self.ec_obj_path,
+ self.ec_local_dev['id'], 1)
+ break
+ break
+
+ def tearDown(self):
+ rmtree(self.testdir, ignore_errors=1)
+
+ def _create_diskfile(self, policy=None, part=0, object_name='o',
+ frag_index=0, timestamp=None, test_data=None):
+ policy = policy or self.policy
+ df_mgr = self.reconstructor._df_router[policy]
+ df = df_mgr.get_diskfile('sda1', part, 'a', 'c', object_name,
+ policy=policy)
+ with df.create() as writer:
+ timestamp = timestamp or utils.Timestamp(time.time())
+ test_data = test_data or 'test data'
+ writer.write(test_data)
+ metadata = {
+ 'X-Timestamp': timestamp.internal,
+ 'Content-Length': len(test_data),
+ 'Etag': md5(test_data).hexdigest(),
+ 'X-Object-Sysmeta-Ec-Frag-Index': frag_index,
+ }
+ writer.put(metadata)
+ writer.commit(timestamp)
+ return df
+
+ def debug_wtf(self):
+ # won't include this in the final, just handy reminder of where
+ # things are...
+ for pol in [p for p in POLICIES if p.policy_type == EC_POLICY]:
+ obj_ring = pol.object_ring
+ for part_num in self.part_nums:
+ print "\n part_num %s " % part_num
+ part_nodes = obj_ring.get_part_nodes(int(part_num))
+ print "\n part_nodes %s " % part_nodes
+ for local_dev in obj_ring.devs:
+ partners = self.reconstructor._get_partners(
+ local_dev['id'], obj_ring, part_num)
+ if partners:
+ print "\n local_dev %s \n partners %s " % (local_dev,
+ partners)
+
+ def assert_expected_jobs(self, part_num, jobs):
+ for job in jobs:
+ del job['path']
+ del job['policy']
+ if 'local_index' in job:
+ del job['local_index']
+ job['suffixes'].sort()
+
+ expected = []
+ # part num 0
+ expected.append(
+ [{
+ 'sync_to': [{
+ 'index': 2,
+ 'replication_port': 6000,
+ 'zone': 2,
+ 'ip': '127.0.0.2',
+ 'region': 1,
+ 'port': 6000,
+ 'replication_ip': '127.0.0.2',
+ 'device': 'sda1',
+ 'id': 2,
+ }],
+ 'job_type': object_reconstructor.REVERT,
+ 'suffixes': ['061'],
+ 'partition': 0,
+ 'frag_index': 2,
+ 'device': 'sda1',
+ 'local_dev': {
+ 'replication_port': 6000,
+ 'zone': 1,
+ 'ip': '127.0.0.1',
+ 'region': 1,
+ 'id': 1,
+ 'replication_ip': '127.0.0.1',
+ 'device': 'sda1', 'port': 6000,
+ },
+ 'hashes': {
+ '061': {
+ None: '85b02a5283704292a511078a5c483da5',
+ 2: '0e6e8d48d801dc89fd31904ae3b31229',
+ 1: '0e6e8d48d801dc89fd31904ae3b31229',
+ },
+ '3c1': {
+ None: '85b02a5283704292a511078a5c483da5',
+ 1: '0e6e8d48d801dc89fd31904ae3b31229',
+ },
+ },
+ }, {
+ 'sync_to': [{
+ 'index': 0,
+ 'replication_port': 6000,
+ 'zone': 0,
+ 'ip': '127.0.0.0',
+ 'region': 1,
+ 'port': 6000,
+ 'replication_ip': '127.0.0.0',
+ 'device': 'sda1', 'id': 0,
+ }, {
+ 'index': 2,
+ 'replication_port': 6000,
+ 'zone': 2,
+ 'ip': '127.0.0.2',
+ 'region': 1,
+ 'port': 6000,
+ 'replication_ip': '127.0.0.2',
+ 'device': 'sda1',
+ 'id': 2,
+ }],
+ 'job_type': object_reconstructor.SYNC,
+ 'sync_diskfile_builder': self.reconstructor.reconstruct_fa,
+ 'suffixes': ['061', '3c1'],
+ 'partition': 0,
+ 'frag_index': 1,
+ 'device': 'sda1',
+ 'local_dev': {
+ 'replication_port': 6000,
+ 'zone': 1,
+ 'ip': '127.0.0.1',
+ 'region': 1,
+ 'id': 1,
+ 'replication_ip': '127.0.0.1',
+ 'device': 'sda1',
+ 'port': 6000,
+ },
+ 'hashes':
+ {
+ '061': {
+ None: '85b02a5283704292a511078a5c483da5',
+ 2: '0e6e8d48d801dc89fd31904ae3b31229',
+ 1: '0e6e8d48d801dc89fd31904ae3b31229'
+ },
+ '3c1': {
+ None: '85b02a5283704292a511078a5c483da5',
+ 1: '0e6e8d48d801dc89fd31904ae3b31229',
+ },
+ },
+ }]
+ )
+ # part num 1
+ expected.append(
+ [{
+ 'sync_to': [{
+ 'index': 1,
+ 'replication_port': 6000,
+ 'zone': 2,
+ 'ip': '127.0.0.2',
+ 'region': 1,
+ 'port': 6000,
+ 'replication_ip': '127.0.0.2',
+ 'device': 'sda1',
+ 'id': 2,
+ }],
+ 'job_type': object_reconstructor.REVERT,
+ 'suffixes': ['061', '3c1'],
+ 'partition': 1,
+ 'frag_index': 1,
+ 'device': 'sda1',
+ 'local_dev': {
+ 'replication_port': 6000,
+ 'zone': 1,
+ 'ip': '127.0.0.1',
+ 'region': 1,
+ 'id': 1,
+ 'replication_ip': '127.0.0.1',
+ 'device': 'sda1',
+ 'port': 6000,
+ },
+ 'hashes':
+ {
+ '061': {
+ None: '85b02a5283704292a511078a5c483da5',
+ 1: '0e6e8d48d801dc89fd31904ae3b31229',
+ },
+ '3c1': {
+ 0: '0e6e8d48d801dc89fd31904ae3b31229',
+ None: '85b02a5283704292a511078a5c483da5',
+ 1: '0e6e8d48d801dc89fd31904ae3b31229',
+ },
+ },
+ }, {
+ 'sync_to': [{
+ 'index': 2,
+ 'replication_port': 6000,
+ 'zone': 4,
+ 'ip': '127.0.0.3',
+ 'region': 1,
+ 'port': 6000,
+ 'replication_ip': '127.0.0.3',
+ 'device': 'sda1', 'id': 3,
+ }, {
+ 'index': 1,
+ 'replication_port': 6000,
+ 'zone': 2,
+ 'ip': '127.0.0.2',
+ 'region': 1,
+ 'port': 6000,
+ 'replication_ip': '127.0.0.2',
+ 'device': 'sda1',
+ 'id': 2,
+ }],
+ 'job_type': object_reconstructor.SYNC,
+ 'sync_diskfile_builder': self.reconstructor.reconstruct_fa,
+ 'suffixes': ['3c1'],
+ 'partition': 1,
+ 'frag_index': 0,
+ 'device': 'sda1',
+ 'local_dev': {
+ 'replication_port': 6000,
+ 'zone': 1,
+ 'ip': '127.0.0.1',
+ 'region': 1,
+ 'id': 1,
+ 'replication_ip': '127.0.0.1',
+ 'device': 'sda1',
+ 'port': 6000,
+ },
+ 'hashes': {
+ '061': {
+ None: '85b02a5283704292a511078a5c483da5',
+ 1: '0e6e8d48d801dc89fd31904ae3b31229',
+ },
+ '3c1': {
+ 0: '0e6e8d48d801dc89fd31904ae3b31229',
+ None: '85b02a5283704292a511078a5c483da5',
+ 1: '0e6e8d48d801dc89fd31904ae3b31229',
+ },
+ },
+ }]
+ )
+ # part num 2
+ expected.append(
+ [{
+ 'sync_to': [{
+ 'index': 0,
+ 'replication_port': 6000,
+ 'zone': 2,
+ 'ip': '127.0.0.2',
+ 'region': 1,
+ 'port': 6000,
+ 'replication_ip': '127.0.0.2',
+ 'device': 'sda1', 'id': 2,
+ }],
+ 'job_type': object_reconstructor.REVERT,
+ 'suffixes': ['061'],
+ 'partition': 2,
+ 'frag_index': 0,
+ 'device': 'sda1',
+ 'local_dev': {
+ 'replication_port': 6000,
+ 'zone': 1,
+ 'ip': '127.0.0.1',
+ 'region': 1,
+ 'id': 1,
+ 'replication_ip': '127.0.0.1',
+ 'device': 'sda1',
+ 'port': 6000,
+ },
+ 'hashes': {
+ '061': {
+ 0: '0e6e8d48d801dc89fd31904ae3b31229',
+ None: '85b02a5283704292a511078a5c483da5'
+ },
+ '3c1': {
+ None: '85b02a5283704292a511078a5c483da5',
+ 2: '0e6e8d48d801dc89fd31904ae3b31229'
+ },
+ },
+ }, {
+ 'sync_to': [{
+ 'index': 2,
+ 'replication_port': 6000,
+ 'zone': 0,
+ 'ip': '127.0.0.0',
+ 'region': 1,
+ 'port': 6000,
+ 'replication_ip': '127.0.0.0',
+ 'device': 'sda1',
+ 'id': 0,
+ }],
+ 'job_type': object_reconstructor.REVERT,
+ 'suffixes': ['3c1'],
+ 'partition': 2,
+ 'frag_index': 2,
+ 'device': 'sda1',
+ 'local_dev': {
+ 'replication_port': 6000,
+ 'zone': 1,
+ 'ip': '127.0.0.1',
+ 'region': 1,
+ 'id': 1,
+ 'replication_ip': '127.0.0.1',
+ 'device': 'sda1',
+ 'port': 6000
+ },
+ 'hashes': {
+ '061': {
+ 0: '0e6e8d48d801dc89fd31904ae3b31229',
+ None: '85b02a5283704292a511078a5c483da5'
+ },
+ '3c1': {
+ None: '85b02a5283704292a511078a5c483da5',
+ 2: '0e6e8d48d801dc89fd31904ae3b31229'
+ },
+ },
+ }]
+ )
+
+ def check_jobs(part_num):
+ try:
+ expected_jobs = expected[int(part_num)]
+ except (IndexError, ValueError):
+ self.fail('Unknown part number %r' % part_num)
+ expected_by_part_frag_index = dict(
+ ((j['partition'], j['frag_index']), j) for j in expected_jobs)
+ for job in jobs:
+ job_key = (job['partition'], job['frag_index'])
+ if job_key in expected_by_part_frag_index:
+ for k, value in job.items():
+ expected_value = \
+ expected_by_part_frag_index[job_key][k]
+ try:
+ if isinstance(value, list):
+ value.sort()
+ expected_value.sort()
+ self.assertEqual(value, expected_value)
+ except AssertionError as e:
+ extra_info = \
+ '\n\n... for %r in part num %s job %r' % (
+ k, part_num, job_key)
+ raise AssertionError(str(e) + extra_info)
+ else:
+ self.fail(
+ 'Unexpected job %r for part num %s - '
+ 'expected jobs where %r' % (
+ job_key, part_num,
+ expected_by_part_frag_index.keys()))
+ for expected_job in expected_jobs:
+ if expected_job in jobs:
+ jobs.remove(expected_job)
+ self.assertFalse(jobs) # that should be all of them
+ check_jobs(part_num)
+
+ def test_run_once(self):
+ with mocked_http_conn(*[200] * 12, body=pickle.dumps({})):
+ with mock_ssync_sender():
+ self.reconstructor.run_once()
+
+ def test_get_response(self):
+ part = self.part_nums[0]
+ node = POLICIES[0].object_ring.get_part_nodes(int(part))[0]
+ for stat_code in (200, 400):
+ with mocked_http_conn(stat_code):
+ resp = self.reconstructor._get_response(node, part,
+ path='nada',
+ headers={},
+ policy=POLICIES[0])
+ if resp:
+ self.assertEqual(resp.status, 200)
+ else:
+ self.assertEqual(
+ len(self.reconstructor.logger.log_dict['warning']), 1)
+
+ def test_reconstructor_skips_bogus_partition_dirs(self):
+ # A directory in the wrong place shouldn't crash the reconstructor
+ rmtree(self.objects_1)
+ os.mkdir(self.objects_1)
+
+ os.mkdir(os.path.join(self.objects_1, "burrito"))
+ jobs = []
+ for part_info in self.reconstructor.collect_parts():
+ jobs += self.reconstructor.build_reconstruction_jobs(part_info)
+ self.assertEqual(len(jobs), 0)
+
+ def test_check_ring(self):
+ testring = tempfile.mkdtemp()
+ _create_test_rings(testring)
+ obj_ring = ring.Ring(testring, ring_name='object') # noqa
+ self.assertTrue(self.reconstructor.check_ring(obj_ring))
+ orig_check = self.reconstructor.next_check
+ self.reconstructor.next_check = orig_check - 30
+ self.assertTrue(self.reconstructor.check_ring(obj_ring))
+ self.reconstructor.next_check = orig_check
+ orig_ring_time = obj_ring._mtime
+ obj_ring._mtime = orig_ring_time - 30
+ self.assertTrue(self.reconstructor.check_ring(obj_ring))
+ self.reconstructor.next_check = orig_check - 30
+ self.assertFalse(self.reconstructor.check_ring(obj_ring))
+ rmtree(testring, ignore_errors=1)
+
+ def test_build_reconstruction_jobs(self):
+ self.reconstructor.handoffs_first = False
+ self.reconstructor._reset_stats()
+ for part_info in self.reconstructor.collect_parts():
+ jobs = self.reconstructor.build_reconstruction_jobs(part_info)
+ self.assertTrue(jobs[0]['job_type'] in
+ (object_reconstructor.SYNC,
+ object_reconstructor.REVERT))
+ self.assert_expected_jobs(part_info['partition'], jobs)
+
+ self.reconstructor.handoffs_first = True
+ self.reconstructor._reset_stats()
+ for part_info in self.reconstructor.collect_parts():
+ jobs = self.reconstructor.build_reconstruction_jobs(part_info)
+ self.assertTrue(jobs[0]['job_type'] ==
+ object_reconstructor.REVERT)
+ self.assert_expected_jobs(part_info['partition'], jobs)
+
+ def test_get_partners(self):
+ # we're going to perform an exhaustive test of every possible
+ # combination of partitions and nodes in our custom test ring
+
+ # format: [dev_id in question, 'part_num',
+ # [part_nodes for the given part], left id, right id...]
+ expected_partners = sorted([
+ (0, '0', [0, 1, 2], 2, 1), (0, '2', [2, 3, 0], 3, 2),
+ (1, '0', [0, 1, 2], 0, 2), (1, '1', [1, 2, 3], 3, 2),
+ (2, '0', [0, 1, 2], 1, 0), (2, '1', [1, 2, 3], 1, 3),
+ (2, '2', [2, 3, 0], 0, 3), (3, '1', [1, 2, 3], 2, 1),
+ (3, '2', [2, 3, 0], 2, 0), (0, '0', [0, 1, 2], 2, 1),
+ (0, '2', [2, 3, 0], 3, 2), (1, '0', [0, 1, 2], 0, 2),
+ (1, '1', [1, 2, 3], 3, 2), (2, '0', [0, 1, 2], 1, 0),
+ (2, '1', [1, 2, 3], 1, 3), (2, '2', [2, 3, 0], 0, 3),
+ (3, '1', [1, 2, 3], 2, 1), (3, '2', [2, 3, 0], 2, 0),
+ ])
+
+ got_partners = []
+ for pol in POLICIES:
+ obj_ring = pol.object_ring
+ for part_num in self.part_nums:
+ part_nodes = obj_ring.get_part_nodes(int(part_num))
+ primary_ids = [n['id'] for n in part_nodes]
+ for node in part_nodes:
+ partners = self.reconstructor._get_partners(
+ node['index'], part_nodes)
+ left = partners[0]['id']
+ right = partners[1]['id']
+ got_partners.append((
+ node['id'], part_num, primary_ids, left, right))
+
+ self.assertEqual(expected_partners, sorted(got_partners))
+
+ def test_collect_parts(self):
+ parts = []
+ for part_info in self.reconstructor.collect_parts():
+ parts.append(part_info['partition'])
+ self.assertEqual(sorted(parts), [0, 1, 2])
+
+ def test_collect_parts_mkdirs_error(self):
+
+ def blowup_mkdirs(path):
+ raise OSError('Ow!')
+
+ with mock.patch.object(object_reconstructor, 'mkdirs', blowup_mkdirs):
+ rmtree(self.objects_1, ignore_errors=1)
+ parts = []
+ for part_info in self.reconstructor.collect_parts():
+ parts.append(part_info['partition'])
+ error_lines = self.logger.get_lines_for_level('error')
+ self.assertEqual(len(error_lines), 1)
+ log_args, log_kwargs = self.logger.log_dict['error'][0]
+ self.assertEquals(str(log_kwargs['exc_info'][1]), 'Ow!')
+
+ def test_removes_zbf(self):
+ # After running xfs_repair, a partition directory could become a
+ # zero-byte file. If this happens, the reconstructor should clean it
+ # up, log something, and move on to the next partition.
+
+ # Surprise! Partition dir 1 is actually a zero-byte file.
+ pol_1_part_1_path = os.path.join(self.objects_1, '1')
+ rmtree(pol_1_part_1_path)
+ with open(pol_1_part_1_path, 'w'):
+ pass
+ self.assertTrue(os.path.isfile(pol_1_part_1_path)) # sanity check
+
+ # since our collect_parts job is a generator, that yields directly
+ # into build_jobs and then spawns it's safe to do the remove_files
+ # without making reconstructor startup slow
+ for part_info in self.reconstructor.collect_parts():
+ self.assertNotEqual(pol_1_part_1_path, part_info['part_path'])
+ self.assertFalse(os.path.exists(pol_1_part_1_path))
+ warnings = self.reconstructor.logger.get_lines_for_level('warning')
+ self.assertEqual(1, len(warnings))
+ self.assertTrue('Unexpected entity in data dir:' in warnings[0],
+ 'Warning not found in %s' % warnings)
+
+ def _make_fake_ssync(self, ssync_calls):
+ class _fake_ssync(object):
+ def __init__(self, daemon, node, job, suffixes, **kwargs):
+ # capture context and generate an available_map of objs
+ context = {}
+ context['node'] = node
+ context['job'] = job
+ context['suffixes'] = suffixes
+ self.suffixes = suffixes
+ self.daemon = daemon
+ self.job = job
+ hash_gen = self.daemon._diskfile_mgr.yield_hashes(
+ self.job['device'], self.job['partition'],
+ self.job['policy'], self.suffixes,
+ frag_index=self.job.get('frag_index'))
+ self.available_map = {}
+ for path, hash_, ts in hash_gen:
+ self.available_map[hash_] = ts
+ context['available_map'] = self.available_map
+ ssync_calls.append(context)
+
+ def __call__(self, *args, **kwargs):
+ return True, self.available_map
+
+ return _fake_ssync
+
+ def test_delete_reverted(self):
+ # verify reconstructor deletes reverted frag indexes after ssync'ing
+
+ def visit_obj_dirs(context):
+ for suff in context['suffixes']:
+ suff_dir = os.path.join(
+ context['job']['path'], suff)
+ for root, dirs, files in os.walk(suff_dir):
+ for d in dirs:
+ dirpath = os.path.join(root, d)
+ files = os.listdir(dirpath)
+ yield dirpath, files
+
+ n_files = n_files_after = 0
+
+ # run reconstructor with delete function mocked out to check calls
+ ssync_calls = []
+ delete_func =\
+ 'swift.obj.reconstructor.ObjectReconstructor.delete_reverted_objs'
+ with mock.patch('swift.obj.reconstructor.ssync_sender',
+ self._make_fake_ssync(ssync_calls)):
+ with mocked_http_conn(*[200] * 12, body=pickle.dumps({})):
+ with mock.patch(delete_func) as mock_delete:
+ self.reconstructor.reconstruct()
+ expected_calls = []
+ for context in ssync_calls:
+ if context['job']['job_type'] == REVERT:
+ for dirpath, files in visit_obj_dirs(context):
+ # sanity check - expect some files to be in dir,
+ # may not be for the reverted frag index
+ self.assertTrue(files)
+ n_files += len(files)
+ expected_calls.append(mock.call(context['job'],
+ context['available_map'],
+ context['node']['index']))
+ mock_delete.assert_has_calls(expected_calls, any_order=True)
+
+ ssync_calls = []
+ with mock.patch('swift.obj.reconstructor.ssync_sender',
+ self._make_fake_ssync(ssync_calls)):
+ with mocked_http_conn(*[200] * 12, body=pickle.dumps({})):
+ self.reconstructor.reconstruct()
+ for context in ssync_calls:
+ if context['job']['job_type'] == REVERT:
+ data_file_tail = ('#%s.data'
+ % context['node']['index'])
+ for dirpath, files in visit_obj_dirs(context):
+ n_files_after += len(files)
+ for filename in files:
+ self.assertFalse(
+ filename.endswith(data_file_tail))
+
+ # sanity check that some files should were deleted
+ self.assertTrue(n_files > n_files_after)
+
+ def test_get_part_jobs(self):
+ # yeah, this test code expects a specific setup
+ self.assertEqual(len(self.part_nums), 3)
+
+ # OK, at this point we should have 4 loaded parts with one
+ jobs = []
+ for partition in os.listdir(self.ec_obj_path):
+ part_path = os.path.join(self.ec_obj_path, partition)
+ jobs = self.reconstructor._get_part_jobs(
+ self.ec_local_dev, part_path, int(partition), self.ec_policy)
+ self.assert_expected_jobs(partition, jobs)
+
+ def assertStatCount(self, stat_method, stat_prefix, expected_count):
+ count = count_stats(self.logger, stat_method, stat_prefix)
+ msg = 'expected %s != %s for %s %s' % (
+ expected_count, count, stat_method, stat_prefix)
+ self.assertEqual(expected_count, count, msg)
+
+ def test_delete_partition(self):
+ # part 2 is predefined to have all revert jobs
+ part_path = os.path.join(self.objects_1, '2')
+ self.assertTrue(os.access(part_path, os.F_OK))
+
+ ssync_calls = []
+ status = [200] * 2
+ body = pickle.dumps({})
+ with mocked_http_conn(*status, body=body) as request_log:
+ with mock.patch('swift.obj.reconstructor.ssync_sender',
+ self._make_fake_ssync(ssync_calls)):
+ self.reconstructor.reconstruct(override_partitions=[2])
+ expected_repliate_calls = set([
+ ('127.0.0.0', '/sda1/2/3c1'),
+ ('127.0.0.2', '/sda1/2/061'),
+ ])
+ found_calls = set((r['ip'], r['path'])
+ for r in request_log.requests)
+ self.assertEqual(expected_repliate_calls, found_calls)
+
+ expected_ssync_calls = sorted([
+ ('127.0.0.0', REVERT, 2, ['3c1']),
+ ('127.0.0.2', REVERT, 2, ['061']),
+ ])
+ self.assertEqual(expected_ssync_calls, sorted((
+ c['node']['ip'],
+ c['job']['job_type'],
+ c['job']['partition'],
+ c['suffixes'],
+ ) for c in ssync_calls))
+
+ expected_stats = {
+ ('increment', 'partition.delete.count.'): 2,
+ ('timing_since', 'partition.delete.timing'): 2,
+ }
+ for stat_key, expected in expected_stats.items():
+ stat_method, stat_prefix = stat_key
+ self.assertStatCount(stat_method, stat_prefix, expected)
+ # part 2 should be totally empty
+ policy = POLICIES[1]
+ hash_gen = self.reconstructor._df_router[policy].yield_hashes(
+ 'sda1', '2', policy)
+ for path, hash_, ts in hash_gen:
+ self.fail('found %s with %s in %s', (hash_, ts, path))
+ # but the partition directory and hashes pkl still exist
+ self.assertTrue(os.access(part_path, os.F_OK))
+ hashes_path = os.path.join(self.objects_1, '2', diskfile.HASH_FILE)
+ self.assertTrue(os.access(hashes_path, os.F_OK))
+
+ # ... but on next pass
+ ssync_calls = []
+ with mocked_http_conn() as request_log:
+ with mock.patch('swift.obj.reconstructor.ssync_sender',
+ self._make_fake_ssync(ssync_calls)):
+ self.reconstructor.reconstruct(override_partitions=[2])
+ # reconstruct won't generate any replicate or ssync_calls
+ self.assertFalse(request_log.requests)
+ self.assertFalse(ssync_calls)
+ # and the partition will get removed!
+ self.assertFalse(os.access(part_path, os.F_OK))
+
+ def test_process_job_all_success(self):
+ self.reconstructor._reset_stats()
+ with mock_ssync_sender():
+ with mocked_http_conn(*[200] * 12, body=pickle.dumps({})):
+ found_jobs = []
+ for part_info in self.reconstructor.collect_parts():
+ jobs = self.reconstructor.build_reconstruction_jobs(
+ part_info)
+ found_jobs.extend(jobs)
+ for job in jobs:
+ self.logger._clear()
+ node_count = len(job['sync_to'])
+ self.reconstructor.process_job(job)
+ if job['job_type'] == object_reconstructor.REVERT:
+ self.assertEqual(0, count_stats(
+ self.logger, 'update_stats', 'suffix.hashes'))
+ else:
+ self.assertStatCount('update_stats',
+ 'suffix.hashes',
+ node_count)
+ self.assertEqual(node_count, count_stats(
+ self.logger, 'update_stats', 'suffix.hashes'))
+ self.assertEqual(node_count, count_stats(
+ self.logger, 'update_stats', 'suffix.syncs'))
+ self.assertFalse('error' in
+ self.logger.all_log_lines())
+ self.assertEqual(self.reconstructor.suffix_sync, 8)
+ self.assertEqual(self.reconstructor.suffix_count, 8)
+ self.assertEqual(len(found_jobs), 6)
+
+ def test_process_job_all_insufficient_storage(self):
+ self.reconstructor._reset_stats()
+ with mock_ssync_sender():
+ with mocked_http_conn(*[507] * 10):
+ found_jobs = []
+ for part_info in self.reconstructor.collect_parts():
+ jobs = self.reconstructor.build_reconstruction_jobs(
+ part_info)
+ found_jobs.extend(jobs)
+ for job in jobs:
+ self.logger._clear()
+ self.reconstructor.process_job(job)
+ for line in self.logger.get_lines_for_level('error'):
+ self.assertTrue('responded as unmounted' in line)
+ self.assertEqual(0, count_stats(
+ self.logger, 'update_stats', 'suffix.hashes'))
+ self.assertEqual(0, count_stats(
+ self.logger, 'update_stats', 'suffix.syncs'))
+ self.assertEqual(self.reconstructor.suffix_sync, 0)
+ self.assertEqual(self.reconstructor.suffix_count, 0)
+ self.assertEqual(len(found_jobs), 6)
+
+ def test_process_job_all_client_error(self):
+ self.reconstructor._reset_stats()
+ with mock_ssync_sender():
+ with mocked_http_conn(*[400] * 10):
+ found_jobs = []
+ for part_info in self.reconstructor.collect_parts():
+ jobs = self.reconstructor.build_reconstruction_jobs(
+ part_info)
+ found_jobs.extend(jobs)
+ for job in jobs:
+ self.logger._clear()
+ self.reconstructor.process_job(job)
+ for line in self.logger.get_lines_for_level('error'):
+ self.assertTrue('Invalid response 400' in line)
+ self.assertEqual(0, count_stats(
+ self.logger, 'update_stats', 'suffix.hashes'))
+ self.assertEqual(0, count_stats(
+ self.logger, 'update_stats', 'suffix.syncs'))
+ self.assertEqual(self.reconstructor.suffix_sync, 0)
+ self.assertEqual(self.reconstructor.suffix_count, 0)
+ self.assertEqual(len(found_jobs), 6)
+
+ def test_process_job_all_timeout(self):
+ self.reconstructor._reset_stats()
+ with mock_ssync_sender():
+ with nested(mocked_http_conn(*[Timeout()] * 10)):
+ found_jobs = []
+ for part_info in self.reconstructor.collect_parts():
+ jobs = self.reconstructor.build_reconstruction_jobs(
+ part_info)
+ found_jobs.extend(jobs)
+ for job in jobs:
+ self.logger._clear()
+ self.reconstructor.process_job(job)
+ for line in self.logger.get_lines_for_level('error'):
+ self.assertTrue('Timeout (Nones)' in line)
+ self.assertStatCount(
+ 'update_stats', 'suffix.hashes', 0)
+ self.assertStatCount(
+ 'update_stats', 'suffix.syncs', 0)
+ self.assertEqual(self.reconstructor.suffix_sync, 0)
+ self.assertEqual(self.reconstructor.suffix_count, 0)
+ self.assertEqual(len(found_jobs), 6)
+
+
+@patch_policies(with_ec_default=True)
+class TestObjectReconstructor(unittest.TestCase):
+
+ def setUp(self):
+ self.policy = POLICIES.default
+ self.testdir = tempfile.mkdtemp()
+ self.devices = os.path.join(self.testdir, 'devices')
+ self.local_dev = self.policy.object_ring.devs[0]
+ self.ip = self.local_dev['replication_ip']
+ self.port = self.local_dev['replication_port']
+ self.conf = {
+ 'devices': self.devices,
+ 'mount_check': False,
+ 'bind_port': self.port,
+ }
+ self.logger = debug_logger('object-reconstructor')
+ self.reconstructor = object_reconstructor.ObjectReconstructor(
+ self.conf, logger=self.logger)
+ self.reconstructor._reset_stats()
+ # some tests bypass build_reconstruction_jobs and go to process_job
+ # directly, so you end up with a /0 when you try to show the
+ # percentage of complete jobs as ratio of the total job count
+ self.reconstructor.job_count = 1
+ self.policy.object_ring.max_more_nodes = \
+ self.policy.object_ring.replicas
+ self.ts_iter = make_timestamp_iter()
+
+ def tearDown(self):
+ self.reconstructor.stats_line()
+ shutil.rmtree(self.testdir)
+
+ def ts(self):
+ return next(self.ts_iter)
+
+ def test_collect_parts_skips_non_ec_policy_and_device(self):
+ stub_parts = (371, 78, 419, 834)
+ for policy in POLICIES:
+ datadir = diskfile.get_data_dir(policy)
+ for part in stub_parts:
+ utils.mkdirs(os.path.join(
+ self.devices, self.local_dev['device'],
+ datadir, str(part)))
+ with mock.patch('swift.obj.reconstructor.whataremyips',
+ return_value=[self.ip]):
+ part_infos = list(self.reconstructor.collect_parts())
+ found_parts = sorted(int(p['partition']) for p in part_infos)
+ self.assertEqual(found_parts, sorted(stub_parts))
+ for part_info in part_infos:
+ self.assertEqual(part_info['local_dev'], self.local_dev)
+ self.assertEqual(part_info['policy'], self.policy)
+ self.assertEqual(part_info['part_path'],
+ os.path.join(self.devices,
+ self.local_dev['device'],
+ diskfile.get_data_dir(self.policy),
+ str(part_info['partition'])))
+
+ def test_collect_parts_multi_device_skips_non_ring_devices(self):
+ device_parts = {
+ 'sda': (374,),
+ 'sdb': (179, 807),
+ 'sdc': (363, 468, 843),
+ }
+ for policy in POLICIES:
+ datadir = diskfile.get_data_dir(policy)
+ for dev, parts in device_parts.items():
+ for part in parts:
+ utils.mkdirs(os.path.join(
+ self.devices, dev,
+ datadir, str(part)))
+
+ # we're only going to add sda and sdc into the ring
+ local_devs = ('sda', 'sdc')
+ stub_ring_devs = [{
+ 'device': dev,
+ 'replication_ip': self.ip,
+ 'replication_port': self.port
+ } for dev in local_devs]
+ with nested(mock.patch('swift.obj.reconstructor.whataremyips',
+ return_value=[self.ip]),
+ mock.patch.object(self.policy.object_ring, '_devs',
+ new=stub_ring_devs)):
+ part_infos = list(self.reconstructor.collect_parts())
+ found_parts = sorted(int(p['partition']) for p in part_infos)
+ expected_parts = sorted(itertools.chain(
+ *(device_parts[d] for d in local_devs)))
+ self.assertEqual(found_parts, expected_parts)
+ for part_info in part_infos:
+ self.assertEqual(part_info['policy'], self.policy)
+ self.assertTrue(part_info['local_dev'] in stub_ring_devs)
+ dev = part_info['local_dev']
+ self.assertEqual(part_info['part_path'],
+ os.path.join(self.devices,
+ dev['device'],
+ diskfile.get_data_dir(self.policy),
+ str(part_info['partition'])))
+
+ def test_collect_parts_mount_check(self):
+ # each device has one part in it
+ local_devs = ('sda', 'sdb')
+ for i, dev in enumerate(local_devs):
+ datadir = diskfile.get_data_dir(self.policy)
+ utils.mkdirs(os.path.join(
+ self.devices, dev, datadir, str(i)))
+ stub_ring_devs = [{
+ 'device': dev,
+ 'replication_ip': self.ip,
+ 'replication_port': self.port
+ } for dev in local_devs]
+ with nested(mock.patch('swift.obj.reconstructor.whataremyips',
+ return_value=[self.ip]),
+ mock.patch.object(self.policy.object_ring, '_devs',
+ new=stub_ring_devs)):
+ part_infos = list(self.reconstructor.collect_parts())
+ self.assertEqual(2, len(part_infos)) # sanity
+ self.assertEqual(set(int(p['partition']) for p in part_infos),
+ set([0, 1]))
+
+ paths = []
+
+ def fake_ismount(path):
+ paths.append(path)
+ return False
+
+ with nested(mock.patch('swift.obj.reconstructor.whataremyips',
+ return_value=[self.ip]),
+ mock.patch.object(self.policy.object_ring, '_devs',
+ new=stub_ring_devs),
+ mock.patch('swift.obj.reconstructor.ismount',
+ fake_ismount)):
+ part_infos = list(self.reconstructor.collect_parts())
+ self.assertEqual(2, len(part_infos)) # sanity, same jobs
+ self.assertEqual(set(int(p['partition']) for p in part_infos),
+ set([0, 1]))
+
+ # ... because ismount was not called
+ self.assertEqual(paths, [])
+
+ # ... now with mount check
+ self.reconstructor.mount_check = True
+ with nested(mock.patch('swift.obj.reconstructor.whataremyips',
+ return_value=[self.ip]),
+ mock.patch.object(self.policy.object_ring, '_devs',
+ new=stub_ring_devs),
+ mock.patch('swift.obj.reconstructor.ismount',
+ fake_ismount)):
+ part_infos = list(self.reconstructor.collect_parts())
+ self.assertEqual([], part_infos) # sanity, no jobs
+
+ # ... because fake_ismount returned False for both paths
+ self.assertEqual(set(paths), set([
+ os.path.join(self.devices, dev) for dev in local_devs]))
+
+ def fake_ismount(path):
+ if path.endswith('sda'):
+ return True
+ else:
+ return False
+
+ with nested(mock.patch('swift.obj.reconstructor.whataremyips',
+ return_value=[self.ip]),
+ mock.patch.object(self.policy.object_ring, '_devs',
+ new=stub_ring_devs),
+ mock.patch('swift.obj.reconstructor.ismount',
+ fake_ismount)):
+ part_infos = list(self.reconstructor.collect_parts())
+ self.assertEqual(1, len(part_infos)) # only sda picked up (part 0)
+ self.assertEqual(part_infos[0]['partition'], 0)
+
+ def test_collect_parts_cleans_tmp(self):
+ local_devs = ('sda', 'sdc')
+ stub_ring_devs = [{
+ 'device': dev,
+ 'replication_ip': self.ip,
+ 'replication_port': self.port
+ } for dev in local_devs]
+ fake_unlink = mock.MagicMock()
+ self.reconstructor.reclaim_age = 1000
+ now = time.time()
+ with nested(mock.patch('swift.obj.reconstructor.whataremyips',
+ return_value=[self.ip]),
+ mock.patch('swift.obj.reconstructor.time.time',
+ return_value=now),
+ mock.patch.object(self.policy.object_ring, '_devs',
+ new=stub_ring_devs),
+ mock.patch('swift.obj.reconstructor.unlink_older_than',
+ fake_unlink)):
+ self.assertEqual([], list(self.reconstructor.collect_parts()))
+ # each local device hash unlink_older_than called on it,
+ # with now - self.reclaim_age
+ tmpdir = diskfile.get_tmp_dir(self.policy)
+ expected = now - 1000
+ self.assertEqual(fake_unlink.mock_calls, [
+ mock.call(os.path.join(self.devices, dev, tmpdir), expected)
+ for dev in local_devs])
+
+ def test_collect_parts_creates_datadir(self):
+ # create just the device path
+ dev_path = os.path.join(self.devices, self.local_dev['device'])
+ utils.mkdirs(dev_path)
+ with mock.patch('swift.obj.reconstructor.whataremyips',
+ return_value=[self.ip]):
+ self.assertEqual([], list(self.reconstructor.collect_parts()))
+ datadir_path = os.path.join(dev_path,
+ diskfile.get_data_dir(self.policy))
+ self.assertTrue(os.path.exists(datadir_path))
+
+ def test_collect_parts_creates_datadir_error(self):
+ # create just the device path
+ datadir_path = os.path.join(self.devices, self.local_dev['device'],
+ diskfile.get_data_dir(self.policy))
+ utils.mkdirs(os.path.dirname(datadir_path))
+ with nested(mock.patch('swift.obj.reconstructor.whataremyips',
+ return_value=[self.ip]),
+ mock.patch('swift.obj.reconstructor.mkdirs',
+ side_effect=OSError('kaboom!'))):
+ self.assertEqual([], list(self.reconstructor.collect_parts()))
+ error_lines = self.logger.get_lines_for_level('error')
+ self.assertEqual(len(error_lines), 1)
+ line = error_lines[0]
+ self.assertTrue('Unable to create' in line)
+ self.assertTrue(datadir_path in line)
+
+ def test_collect_parts_skips_invalid_paths(self):
+ datadir_path = os.path.join(self.devices, self.local_dev['device'],
+ diskfile.get_data_dir(self.policy))
+ utils.mkdirs(os.path.dirname(datadir_path))
+ with open(datadir_path, 'w') as f:
+ f.write('junk')
+ with mock.patch('swift.obj.reconstructor.whataremyips',
+ return_value=[self.ip]):
+ self.assertEqual([], list(self.reconstructor.collect_parts()))
+ self.assertTrue(os.path.exists(datadir_path))
+ error_lines = self.logger.get_lines_for_level('error')
+ self.assertEqual(len(error_lines), 1)
+ line = error_lines[0]
+ self.assertTrue('Unable to list partitions' in line)
+ self.assertTrue(datadir_path in line)
+
+ def test_collect_parts_removes_non_partition_files(self):
+ # create some junk next to partitions
+ datadir_path = os.path.join(self.devices, self.local_dev['device'],
+ diskfile.get_data_dir(self.policy))
+ num_parts = 3
+ for part in range(num_parts):
+ utils.mkdirs(os.path.join(datadir_path, str(part)))
+ junk_file = os.path.join(datadir_path, 'junk')
+ with open(junk_file, 'w') as f:
+ f.write('junk')
+ with mock.patch('swift.obj.reconstructor.whataremyips',
+ return_value=[self.ip]):
+ part_infos = list(self.reconstructor.collect_parts())
+ # the file is not included in the part_infos map
+ self.assertEqual(sorted(p['part_path'] for p in part_infos),
+ sorted([os.path.join(datadir_path, str(i))
+ for i in range(num_parts)]))
+ # and gets cleaned up
+ self.assertFalse(os.path.exists(junk_file))
+
+ def test_collect_parts_overrides(self):
+ # setup multiple devices, with multiple parts
+ device_parts = {
+ 'sda': (374, 843),
+ 'sdb': (179, 807),
+ 'sdc': (363, 468, 843),
+ }
+ datadir = diskfile.get_data_dir(self.policy)
+ for dev, parts in device_parts.items():
+ for part in parts:
+ utils.mkdirs(os.path.join(
+ self.devices, dev,
+ datadir, str(part)))
+
+ # we're only going to add sda and sdc into the ring
+ local_devs = ('sda', 'sdc')
+ stub_ring_devs = [{
+ 'device': dev,
+ 'replication_ip': self.ip,
+ 'replication_port': self.port
+ } for dev in local_devs]
+
+ expected = (
+ ({}, [
+ ('sda', 374),
+ ('sda', 843),
+ ('sdc', 363),
+ ('sdc', 468),
+ ('sdc', 843),
+ ]),
+ ({'override_devices': ['sda', 'sdc']}, [
+ ('sda', 374),
+ ('sda', 843),
+ ('sdc', 363),
+ ('sdc', 468),
+ ('sdc', 843),
+ ]),
+ ({'override_devices': ['sdc']}, [
+ ('sdc', 363),
+ ('sdc', 468),
+ ('sdc', 843),
+ ]),
+ ({'override_devices': ['sda']}, [
+ ('sda', 374),
+ ('sda', 843),
+ ]),
+ ({'override_devices': ['sdx']}, []),
+ ({'override_partitions': [374]}, [
+ ('sda', 374),
+ ]),
+ ({'override_partitions': [843]}, [
+ ('sda', 843),
+ ('sdc', 843),
+ ]),
+ ({'override_partitions': [843], 'override_devices': ['sda']}, [
+ ('sda', 843),
+ ]),
+ )
+ with nested(mock.patch('swift.obj.reconstructor.whataremyips',
+ return_value=[self.ip]),
+ mock.patch.object(self.policy.object_ring, '_devs',
+ new=stub_ring_devs)):
+ for kwargs, expected_parts in expected:
+ part_infos = list(self.reconstructor.collect_parts(**kwargs))
+ expected_paths = set(
+ os.path.join(self.devices, dev, datadir, str(part))
+ for dev, part in expected_parts)
+ found_paths = set(p['part_path'] for p in part_infos)
+ msg = 'expected %r != %r for %r' % (
+ expected_paths, found_paths, kwargs)
+ self.assertEqual(expected_paths, found_paths, msg)
+
+ def test_build_jobs_creates_empty_hashes(self):
+ part_path = os.path.join(self.devices, self.local_dev['device'],
+ diskfile.get_data_dir(self.policy), '0')
+ utils.mkdirs(part_path)
+ part_info = {
+ 'local_dev': self.local_dev,
+ 'policy': self.policy,
+ 'partition': 0,
+ 'part_path': part_path,
+ }
+ jobs = self.reconstructor.build_reconstruction_jobs(part_info)
+ self.assertEqual(1, len(jobs))
+ job = jobs[0]
+ self.assertEqual(job['job_type'], object_reconstructor.SYNC)
+ self.assertEqual(job['frag_index'], 0)
+ self.assertEqual(job['suffixes'], [])
+ self.assertEqual(len(job['sync_to']), 2)
+ self.assertEqual(job['partition'], 0)
+ self.assertEqual(job['path'], part_path)
+ self.assertEqual(job['hashes'], {})
+ self.assertEqual(job['policy'], self.policy)
+ self.assertEqual(job['local_dev'], self.local_dev)
+ self.assertEqual(job['device'], self.local_dev['device'])
+ hashes_file = os.path.join(part_path,
+ diskfile.HASH_FILE)
+ self.assertTrue(os.path.exists(hashes_file))
+ suffixes = self.reconstructor._get_hashes(
+ self.policy, part_path, do_listdir=True)
+ self.assertEqual(suffixes, {})
+
+ def test_build_jobs_no_hashes(self):
+ part_path = os.path.join(self.devices, self.local_dev['device'],
+ diskfile.get_data_dir(self.policy), '0')
+ part_info = {
+ 'local_dev': self.local_dev,
+ 'policy': self.policy,
+ 'partition': 0,
+ 'part_path': part_path,
+ }
+ stub_hashes = {}
+ with mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
+ return_value=(None, stub_hashes)):
+ jobs = self.reconstructor.build_reconstruction_jobs(part_info)
+ self.assertEqual(1, len(jobs))
+ job = jobs[0]
+ self.assertEqual(job['job_type'], object_reconstructor.SYNC)
+ self.assertEqual(job['frag_index'], 0)
+ self.assertEqual(job['suffixes'], [])
+ self.assertEqual(len(job['sync_to']), 2)
+ self.assertEqual(job['partition'], 0)
+ self.assertEqual(job['path'], part_path)
+ self.assertEqual(job['hashes'], {})
+ self.assertEqual(job['policy'], self.policy)
+ self.assertEqual(job['local_dev'], self.local_dev)
+ self.assertEqual(job['device'], self.local_dev['device'])
+
+ def test_build_jobs_primary(self):
+ ring = self.policy.object_ring = FabricatedRing()
+ # find a partition for which we're a primary
+ for partition in range(2 ** ring.part_power):
+ part_nodes = ring.get_part_nodes(partition)
+ try:
+ frag_index = [n['id'] for n in part_nodes].index(
+ self.local_dev['id'])
+ except ValueError:
+ pass
+ else:
+ break
+ else:
+ self.fail("the ring doesn't work: %r" % ring._replica2part2dev_id)
+ part_path = os.path.join(self.devices, self.local_dev['device'],
+ diskfile.get_data_dir(self.policy),
+ str(partition))
+ part_info = {
+ 'local_dev': self.local_dev,
+ 'policy': self.policy,
+ 'partition': partition,
+ 'part_path': part_path,
+ }
+ stub_hashes = {
+ '123': {frag_index: 'hash', None: 'hash'},
+ 'abc': {frag_index: 'hash', None: 'hash'},
+ }
+ with mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
+ return_value=(None, stub_hashes)):
+ jobs = self.reconstructor.build_reconstruction_jobs(part_info)
+ self.assertEqual(1, len(jobs))
+ job = jobs[0]
+ self.assertEqual(job['job_type'], object_reconstructor.SYNC)
+ self.assertEqual(job['frag_index'], frag_index)
+ self.assertEqual(job['suffixes'], stub_hashes.keys())
+ self.assertEqual(set([n['index'] for n in job['sync_to']]),
+ set([(frag_index + 1) % ring.replicas,
+ (frag_index - 1) % ring.replicas]))
+ self.assertEqual(job['partition'], partition)
+ self.assertEqual(job['path'], part_path)
+ self.assertEqual(job['hashes'], stub_hashes)
+ self.assertEqual(job['policy'], self.policy)
+ self.assertEqual(job['local_dev'], self.local_dev)
+ self.assertEqual(job['device'], self.local_dev['device'])
+
+ def test_build_jobs_handoff(self):
+ ring = self.policy.object_ring = FabricatedRing()
+ # find a partition for which we're a handoff
+ for partition in range(2 ** ring.part_power):
+ part_nodes = ring.get_part_nodes(partition)
+ if self.local_dev['id'] not in [n['id'] for n in part_nodes]:
+ break
+ else:
+ self.fail("the ring doesn't work: %r" % ring._replica2part2dev_id)
+ part_path = os.path.join(self.devices, self.local_dev['device'],
+ diskfile.get_data_dir(self.policy),
+ str(partition))
+ part_info = {
+ 'local_dev': self.local_dev,
+ 'policy': self.policy,
+ 'partition': partition,
+ 'part_path': part_path,
+ }
+ # since this part doesn't belong on us it doesn't matter what
+ # frag_index we have
+ frag_index = random.randint(0, ring.replicas - 1)
+ stub_hashes = {
+ '123': {frag_index: 'hash', None: 'hash'},
+ 'abc': {None: 'hash'},
+ }
+ with mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
+ return_value=(None, stub_hashes)):
+ jobs = self.reconstructor.build_reconstruction_jobs(part_info)
+ self.assertEqual(1, len(jobs))
+ job = jobs[0]
+ self.assertEqual(job['job_type'], object_reconstructor.REVERT)
+ self.assertEqual(job['frag_index'], frag_index)
+ self.assertEqual(sorted(job['suffixes']), sorted(stub_hashes.keys()))
+ self.assertEqual(len(job['sync_to']), 1)
+ self.assertEqual(job['sync_to'][0]['index'], frag_index)
+ self.assertEqual(job['path'], part_path)
+ self.assertEqual(job['partition'], partition)
+ self.assertEqual(sorted(job['hashes']), sorted(stub_hashes))
+ self.assertEqual(job['local_dev'], self.local_dev)
+
+ def test_build_jobs_mixed(self):
+ ring = self.policy.object_ring = FabricatedRing()
+ # find a partition for which we're a primary
+ for partition in range(2 ** ring.part_power):
+ part_nodes = ring.get_part_nodes(partition)
+ try:
+ frag_index = [n['id'] for n in part_nodes].index(
+ self.local_dev['id'])
+ except ValueError:
+ pass
+ else:
+ break
+ else:
+ self.fail("the ring doesn't work: %r" % ring._replica2part2dev_id)
+ part_path = os.path.join(self.devices, self.local_dev['device'],
+ diskfile.get_data_dir(self.policy),
+ str(partition))
+ part_info = {
+ 'local_dev': self.local_dev,
+ 'policy': self.policy,
+ 'partition': partition,
+ 'part_path': part_path,
+ }
+ other_frag_index = random.choice([f for f in range(ring.replicas)
+ if f != frag_index])
+ stub_hashes = {
+ '123': {frag_index: 'hash', None: 'hash'},
+ '456': {other_frag_index: 'hash', None: 'hash'},
+ 'abc': {None: 'hash'},
+ }
+ with mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
+ return_value=(None, stub_hashes)):
+ jobs = self.reconstructor.build_reconstruction_jobs(part_info)
+ self.assertEqual(2, len(jobs))
+ sync_jobs, revert_jobs = [], []
+ for job in jobs:
+ self.assertEqual(job['partition'], partition)
+ self.assertEqual(job['path'], part_path)
+ self.assertEqual(sorted(job['hashes']), sorted(stub_hashes))
+ self.assertEqual(job['policy'], self.policy)
+ self.assertEqual(job['local_dev'], self.local_dev)
+ self.assertEqual(job['device'], self.local_dev['device'])
+ {
+ object_reconstructor.SYNC: sync_jobs,
+ object_reconstructor.REVERT: revert_jobs,
+ }[job['job_type']].append(job)
+ self.assertEqual(1, len(sync_jobs))
+ job = sync_jobs[0]
+ self.assertEqual(job['frag_index'], frag_index)
+ self.assertEqual(sorted(job['suffixes']), sorted(['123', 'abc']))
+ self.assertEqual(len(job['sync_to']), 2)
+ self.assertEqual(set([n['index'] for n in job['sync_to']]),
+ set([(frag_index + 1) % ring.replicas,
+ (frag_index - 1) % ring.replicas]))
+ self.assertEqual(1, len(revert_jobs))
+ job = revert_jobs[0]
+ self.assertEqual(job['frag_index'], other_frag_index)
+ self.assertEqual(job['suffixes'], ['456'])
+ self.assertEqual(len(job['sync_to']), 1)
+ self.assertEqual(job['sync_to'][0]['index'], other_frag_index)
+
+ def test_build_jobs_revert_only_tombstones(self):
+ ring = self.policy.object_ring = FabricatedRing()
+ # find a partition for which we're a handoff
+ for partition in range(2 ** ring.part_power):
+ part_nodes = ring.get_part_nodes(partition)
+ if self.local_dev['id'] not in [n['id'] for n in part_nodes]:
+ break
+ else:
+ self.fail("the ring doesn't work: %r" % ring._replica2part2dev_id)
+ part_path = os.path.join(self.devices, self.local_dev['device'],
+ diskfile.get_data_dir(self.policy),
+ str(partition))
+ part_info = {
+ 'local_dev': self.local_dev,
+ 'policy': self.policy,
+ 'partition': partition,
+ 'part_path': part_path,
+ }
+ # we have no fragment index to hint the jobs where they belong
+ stub_hashes = {
+ '123': {None: 'hash'},
+ 'abc': {None: 'hash'},
+ }
+ with mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
+ return_value=(None, stub_hashes)):
+ jobs = self.reconstructor.build_reconstruction_jobs(part_info)
+ self.assertEqual(len(jobs), 1)
+ job = jobs[0]
+ expected = {
+ 'job_type': object_reconstructor.REVERT,
+ 'frag_index': None,
+ 'suffixes': stub_hashes.keys(),
+ 'partition': partition,
+ 'path': part_path,
+ 'hashes': stub_hashes,
+ 'policy': self.policy,
+ 'local_dev': self.local_dev,
+ 'device': self.local_dev['device'],
+ }
+ self.assertEqual(ring.replica_count, len(job['sync_to']))
+ for k, v in expected.items():
+ msg = 'expected %s != %s for %s' % (
+ v, job[k], k)
+ self.assertEqual(v, job[k], msg)
+
+ def test_get_suffix_delta(self):
+ # different
+ local_suff = {'123': {None: 'abc', 0: 'def'}}
+ remote_suff = {'456': {None: 'ghi', 0: 'jkl'}}
+ local_index = 0
+ remote_index = 0
+ suffs = self.reconstructor.get_suffix_delta(local_suff,
+ local_index,
+ remote_suff,
+ remote_index)
+ self.assertEqual(suffs, ['123'])
+
+ # now the same
+ remote_suff = {'123': {None: 'abc', 0: 'def'}}
+ suffs = self.reconstructor.get_suffix_delta(local_suff,
+ local_index,
+ remote_suff,
+ remote_index)
+ self.assertEqual(suffs, [])
+
+ # now with a mis-matched None key (missing durable)
+ remote_suff = {'123': {None: 'ghi', 0: 'def'}}
+ suffs = self.reconstructor.get_suffix_delta(local_suff,
+ local_index,
+ remote_suff,
+ remote_index)
+ self.assertEqual(suffs, ['123'])
+
+ # now with bogus local index
+ local_suff = {'123': {None: 'abc', 99: 'def'}}
+ remote_suff = {'456': {None: 'ghi', 0: 'jkl'}}
+ suffs = self.reconstructor.get_suffix_delta(local_suff,
+ local_index,
+ remote_suff,
+ remote_index)
+ self.assertEqual(suffs, ['123'])
+
+ def test_process_job_primary_in_sync(self):
+ replicas = self.policy.object_ring.replicas
+ frag_index = random.randint(0, replicas - 1)
+ sync_to = [n for n in self.policy.object_ring.devs
+ if n != self.local_dev][:2]
+ # setup left and right hashes
+ stub_hashes = {
+ '123': {frag_index: 'hash', None: 'hash'},
+ 'abc': {frag_index: 'hash', None: 'hash'},
+ }
+ left_index = sync_to[0]['index'] = (frag_index - 1) % replicas
+ left_hashes = {
+ '123': {left_index: 'hash', None: 'hash'},
+ 'abc': {left_index: 'hash', None: 'hash'},
+ }
+ right_index = sync_to[1]['index'] = (frag_index + 1) % replicas
+ right_hashes = {
+ '123': {right_index: 'hash', None: 'hash'},
+ 'abc': {right_index: 'hash', None: 'hash'},
+ }
+ partition = 0
+ part_path = os.path.join(self.devices, self.local_dev['device'],
+ diskfile.get_data_dir(self.policy),
+ str(partition))
+ job = {
+ 'job_type': object_reconstructor.SYNC,
+ 'frag_index': frag_index,
+ 'suffixes': stub_hashes.keys(),
+ 'sync_to': sync_to,
+ 'partition': partition,
+ 'path': part_path,
+ 'hashes': stub_hashes,
+ 'policy': self.policy,
+ 'local_dev': self.local_dev,
+ }
+
+ responses = [(200, pickle.dumps(hashes)) for hashes in (
+ left_hashes, right_hashes)]
+ codes, body_iter = zip(*responses)
+
+ ssync_calls = []
+
+ with nested(
+ mock_ssync_sender(ssync_calls),
+ mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
+ return_value=(None, stub_hashes))):
+ with mocked_http_conn(*codes, body_iter=body_iter) as request_log:
+ self.reconstructor.process_job(job)
+
+ expected_suffix_calls = set([
+ ('10.0.0.1', '/sdb/0'),
+ ('10.0.0.2', '/sdc/0'),
+ ])
+ self.assertEqual(expected_suffix_calls,
+ set((r['ip'], r['path'])
+ for r in request_log.requests))
+
+ self.assertEqual(len(ssync_calls), 0)
+
+ def test_process_job_primary_not_in_sync(self):
+ replicas = self.policy.object_ring.replicas
+ frag_index = random.randint(0, replicas - 1)
+ sync_to = [n for n in self.policy.object_ring.devs
+ if n != self.local_dev][:2]
+ # setup left and right hashes
+ stub_hashes = {
+ '123': {frag_index: 'hash', None: 'hash'},
+ 'abc': {frag_index: 'hash', None: 'hash'},
+ }
+ sync_to[0]['index'] = (frag_index - 1) % replicas
+ left_hashes = {}
+ sync_to[1]['index'] = (frag_index + 1) % replicas
+ right_hashes = {}
+
+ partition = 0
+ part_path = os.path.join(self.devices, self.local_dev['device'],
+ diskfile.get_data_dir(self.policy),
+ str(partition))
+ job = {
+ 'job_type': object_reconstructor.SYNC,
+ 'frag_index': frag_index,
+ 'suffixes': stub_hashes.keys(),
+ 'sync_to': sync_to,
+ 'partition': partition,
+ 'path': part_path,
+ 'hashes': stub_hashes,
+ 'policy': self.policy,
+ 'local_dev': self.local_dev,
+ }
+
+ responses = [(200, pickle.dumps(hashes)) for hashes in (
+ left_hashes, left_hashes, right_hashes, right_hashes)]
+ codes, body_iter = zip(*responses)
+
+ ssync_calls = []
+ with nested(
+ mock_ssync_sender(ssync_calls),
+ mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
+ return_value=(None, stub_hashes))):
+ with mocked_http_conn(*codes, body_iter=body_iter) as request_log:
+ self.reconstructor.process_job(job)
+
+ expected_suffix_calls = set([
+ ('10.0.0.1', '/sdb/0'),
+ ('10.0.0.1', '/sdb/0/123-abc'),
+ ('10.0.0.2', '/sdc/0'),
+ ('10.0.0.2', '/sdc/0/123-abc'),
+ ])
+ self.assertEqual(expected_suffix_calls,
+ set((r['ip'], r['path'])
+ for r in request_log.requests))
+
+ expected_ssync_calls = sorted([
+ ('10.0.0.1', 0, set(['123', 'abc'])),
+ ('10.0.0.2', 0, set(['123', 'abc'])),
+ ])
+ self.assertEqual(expected_ssync_calls, sorted((
+ c['node']['ip'],
+ c['job']['partition'],
+ set(c['suffixes']),
+ ) for c in ssync_calls))
+
+ def test_process_job_sync_missing_durable(self):
+ replicas = self.policy.object_ring.replicas
+ frag_index = random.randint(0, replicas - 1)
+ sync_to = [n for n in self.policy.object_ring.devs
+ if n != self.local_dev][:2]
+ # setup left and right hashes
+ stub_hashes = {
+ '123': {frag_index: 'hash', None: 'hash'},
+ 'abc': {frag_index: 'hash', None: 'hash'},
+ }
+ # left hand side is in sync
+ left_index = sync_to[0]['index'] = (frag_index - 1) % replicas
+ left_hashes = {
+ '123': {left_index: 'hash', None: 'hash'},
+ 'abc': {left_index: 'hash', None: 'hash'},
+ }
+ # right hand side has fragment, but no durable (None key is whack)
+ right_index = sync_to[1]['index'] = (frag_index + 1) % replicas
+ right_hashes = {
+ '123': {right_index: 'hash', None: 'hash'},
+ 'abc': {right_index: 'hash', None: 'different-because-durable'},
+ }
+
+ partition = 0
+ part_path = os.path.join(self.devices, self.local_dev['device'],
+ diskfile.get_data_dir(self.policy),
+ str(partition))
+ job = {
+ 'job_type': object_reconstructor.SYNC,
+ 'frag_index': frag_index,
+ 'suffixes': stub_hashes.keys(),
+ 'sync_to': sync_to,
+ 'partition': partition,
+ 'path': part_path,
+ 'hashes': stub_hashes,
+ 'policy': self.policy,
+ 'local_dev': self.local_dev,
+ }
+
+ responses = [(200, pickle.dumps(hashes)) for hashes in (
+ left_hashes, right_hashes, right_hashes)]
+ codes, body_iter = zip(*responses)
+
+ ssync_calls = []
+ with nested(
+ mock_ssync_sender(ssync_calls),
+ mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
+ return_value=(None, stub_hashes))):
+ with mocked_http_conn(*codes, body_iter=body_iter) as request_log:
+ self.reconstructor.process_job(job)
+
+ expected_suffix_calls = set([
+ ('10.0.0.1', '/sdb/0'),
+ ('10.0.0.2', '/sdc/0'),
+ ('10.0.0.2', '/sdc/0/abc'),
+ ])
+ self.assertEqual(expected_suffix_calls,
+ set((r['ip'], r['path'])
+ for r in request_log.requests))
+
+ expected_ssync_calls = sorted([
+ ('10.0.0.2', 0, ['abc']),
+ ])
+ self.assertEqual(expected_ssync_calls, sorted((
+ c['node']['ip'],
+ c['job']['partition'],
+ c['suffixes'],
+ ) for c in ssync_calls))
+
+ def test_process_job_primary_some_in_sync(self):
+ replicas = self.policy.object_ring.replicas
+ frag_index = random.randint(0, replicas - 1)
+ sync_to = [n for n in self.policy.object_ring.devs
+ if n != self.local_dev][:2]
+ # setup left and right hashes
+ stub_hashes = {
+ '123': {frag_index: 'hash', None: 'hash'},
+ 'abc': {frag_index: 'hash', None: 'hash'},
+ }
+ left_index = sync_to[0]['index'] = (frag_index - 1) % replicas
+ left_hashes = {
+ '123': {left_index: 'hashX', None: 'hash'},
+ 'abc': {left_index: 'hash', None: 'hash'},
+ }
+ right_index = sync_to[1]['index'] = (frag_index + 1) % replicas
+ right_hashes = {
+ '123': {right_index: 'hash', None: 'hash'},
+ }
+ partition = 0
+ part_path = os.path.join(self.devices, self.local_dev['device'],
+ diskfile.get_data_dir(self.policy),
+ str(partition))
+ job = {
+ 'job_type': object_reconstructor.SYNC,
+ 'frag_index': frag_index,
+ 'suffixes': stub_hashes.keys(),
+ 'sync_to': sync_to,
+ 'partition': partition,
+ 'path': part_path,
+ 'hashes': stub_hashes,
+ 'policy': self.policy,
+ 'local_dev': self.local_dev,
+ }
+
+ responses = [(200, pickle.dumps(hashes)) for hashes in (
+ left_hashes, left_hashes, right_hashes, right_hashes)]
+ codes, body_iter = zip(*responses)
+
+ ssync_calls = []
+
+ with nested(
+ mock_ssync_sender(ssync_calls),
+ mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
+ return_value=(None, stub_hashes))):
+ with mocked_http_conn(*codes, body_iter=body_iter) as request_log:
+ self.reconstructor.process_job(job)
+
+ expected_suffix_calls = set([
+ ('10.0.0.1', '/sdb/0'),
+ ('10.0.0.1', '/sdb/0/123'),
+ ('10.0.0.2', '/sdc/0'),
+ ('10.0.0.2', '/sdc/0/abc'),
+ ])
+ self.assertEqual(expected_suffix_calls,
+ set((r['ip'], r['path'])
+ for r in request_log.requests))
+
+ self.assertEqual(len(ssync_calls), 2)
+ self.assertEqual(set(c['node']['index'] for c in ssync_calls),
+ set([left_index, right_index]))
+ for call in ssync_calls:
+ if call['node']['index'] == left_index:
+ self.assertEqual(call['suffixes'], ['123'])
+ elif call['node']['index'] == right_index:
+ self.assertEqual(call['suffixes'], ['abc'])
+ else:
+ self.fail('unexpected call %r' % call)
+
+ def test_process_job_primary_down(self):
+ replicas = self.policy.object_ring.replicas
+ partition = 0
+ frag_index = random.randint(0, replicas - 1)
+ stub_hashes = {
+ '123': {frag_index: 'hash', None: 'hash'},
+ 'abc': {frag_index: 'hash', None: 'hash'},
+ }
+
+ part_nodes = self.policy.object_ring.get_part_nodes(partition)
+ sync_to = part_nodes[:2]
+
+ part_path = os.path.join(self.devices, self.local_dev['device'],
+ diskfile.get_data_dir(self.policy),
+ str(partition))
+ job = {
+ 'job_type': object_reconstructor.SYNC,
+ 'frag_index': frag_index,
+ 'suffixes': stub_hashes.keys(),
+ 'sync_to': sync_to,
+ 'partition': partition,
+ 'path': part_path,
+ 'hashes': stub_hashes,
+ 'policy': self.policy,
+ 'device': self.local_dev['device'],
+ 'local_dev': self.local_dev,
+ }
+
+ non_local = {'called': 0}
+
+ def ssync_response_callback(*args):
+ # in this test, ssync fails on the first (primary sync_to) node
+ if non_local['called'] >= 1:
+ return True, {}
+ non_local['called'] += 1
+ return False, {}
+
+ expected_suffix_calls = set()
+ for node in part_nodes[:3]:
+ expected_suffix_calls.update([
+ (node['replication_ip'], '/%s/0' % node['device']),
+ (node['replication_ip'], '/%s/0/123-abc' % node['device']),
+ ])
+
+ ssync_calls = []
+ with nested(
+ mock_ssync_sender(ssync_calls,
+ response_callback=ssync_response_callback),
+ mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
+ return_value=(None, stub_hashes))):
+ with mocked_http_conn(*[200] * len(expected_suffix_calls),
+ body=pickle.dumps({})) as request_log:
+ self.reconstructor.process_job(job)
+
+ found_suffix_calls = set((r['ip'], r['path'])
+ for r in request_log.requests)
+ self.assertEqual(expected_suffix_calls, found_suffix_calls)
+
+ expected_ssync_calls = sorted([
+ ('10.0.0.0', 0, set(['123', 'abc'])),
+ ('10.0.0.1', 0, set(['123', 'abc'])),
+ ('10.0.0.2', 0, set(['123', 'abc'])),
+ ])
+ found_ssync_calls = sorted((
+ c['node']['ip'],
+ c['job']['partition'],
+ set(c['suffixes']),
+ ) for c in ssync_calls)
+ self.assertEqual(expected_ssync_calls, found_ssync_calls)
+
+ def test_process_job_suffix_call_errors(self):
+ replicas = self.policy.object_ring.replicas
+ partition = 0
+ frag_index = random.randint(0, replicas - 1)
+ stub_hashes = {
+ '123': {frag_index: 'hash', None: 'hash'},
+ 'abc': {frag_index: 'hash', None: 'hash'},
+ }
+
+ part_nodes = self.policy.object_ring.get_part_nodes(partition)
+ sync_to = part_nodes[:2]
+
+ part_path = os.path.join(self.devices, self.local_dev['device'],
+ diskfile.get_data_dir(self.policy),
+ str(partition))
+ job = {
+ 'job_type': object_reconstructor.SYNC,
+ 'frag_index': frag_index,
+ 'suffixes': stub_hashes.keys(),
+ 'sync_to': sync_to,
+ 'partition': partition,
+ 'path': part_path,
+ 'hashes': stub_hashes,
+ 'policy': self.policy,
+ 'device': self.local_dev['device'],
+ 'local_dev': self.local_dev,
+ }
+
+ expected_suffix_calls = set((
+ node['replication_ip'], '/%s/0' % node['device']
+ ) for node in part_nodes)
+
+ possible_errors = [404, 507, Timeout(), Exception('kaboom!')]
+ codes = [random.choice(possible_errors)
+ for r in expected_suffix_calls]
+
+ ssync_calls = []
+ with nested(
+ mock_ssync_sender(ssync_calls),
+ mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
+ return_value=(None, stub_hashes))):
+ with mocked_http_conn(*codes) as request_log:
+ self.reconstructor.process_job(job)
+
+ found_suffix_calls = set((r['ip'], r['path'])
+ for r in request_log.requests)
+ self.assertEqual(expected_suffix_calls, found_suffix_calls)
+
+ self.assertFalse(ssync_calls)
+
+ def test_process_job_handoff(self):
+ replicas = self.policy.object_ring.replicas
+ frag_index = random.randint(0, replicas - 1)
+ sync_to = [random.choice([n for n in self.policy.object_ring.devs
+ if n != self.local_dev])]
+ sync_to[0]['index'] = frag_index
+
+ stub_hashes = {
+ '123': {frag_index: 'hash', None: 'hash'},
+ 'abc': {frag_index: 'hash', None: 'hash'},
+ }
+ partition = 0
+ part_path = os.path.join(self.devices, self.local_dev['device'],
+ diskfile.get_data_dir(self.policy),
+ str(partition))
+ job = {
+ 'job_type': object_reconstructor.REVERT,
+ 'frag_index': frag_index,
+ 'suffixes': stub_hashes.keys(),
+ 'sync_to': sync_to,
+ 'partition': partition,
+ 'path': part_path,
+ 'hashes': stub_hashes,
+ 'policy': self.policy,
+ 'local_dev': self.local_dev,
+ }
+
+ ssync_calls = []
+ with nested(
+ mock_ssync_sender(ssync_calls),
+ mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
+ return_value=(None, stub_hashes))):
+ with mocked_http_conn(200, body=pickle.dumps({})) as request_log:
+ self.reconstructor.process_job(job)
+
+ expected_suffix_calls = set([
+ (sync_to[0]['ip'], '/%s/0/123-abc' % sync_to[0]['device']),
+ ])
+ found_suffix_calls = set((r['ip'], r['path'])
+ for r in request_log.requests)
+ self.assertEqual(expected_suffix_calls, found_suffix_calls)
+
+ self.assertEqual(len(ssync_calls), 1)
+ call = ssync_calls[0]
+ self.assertEqual(call['node'], sync_to[0])
+ self.assertEqual(set(call['suffixes']), set(['123', 'abc']))
+
+ def test_process_job_revert_to_handoff(self):
+ replicas = self.policy.object_ring.replicas
+ frag_index = random.randint(0, replicas - 1)
+ sync_to = [random.choice([n for n in self.policy.object_ring.devs
+ if n != self.local_dev])]
+ sync_to[0]['index'] = frag_index
+ partition = 0
+ handoff = next(self.policy.object_ring.get_more_nodes(partition))
+
+ stub_hashes = {
+ '123': {frag_index: 'hash', None: 'hash'},
+ 'abc': {frag_index: 'hash', None: 'hash'},
+ }
+ part_path = os.path.join(self.devices, self.local_dev['device'],
+ diskfile.get_data_dir(self.policy),
+ str(partition))
+ job = {
+ 'job_type': object_reconstructor.REVERT,
+ 'frag_index': frag_index,
+ 'suffixes': stub_hashes.keys(),
+ 'sync_to': sync_to,
+ 'partition': partition,
+ 'path': part_path,
+ 'hashes': stub_hashes,
+ 'policy': self.policy,
+ 'local_dev': self.local_dev,
+ }
+
+ non_local = {'called': 0}
+
+ def ssync_response_callback(*args):
+ # in this test, ssync fails on the first (primary sync_to) node
+ if non_local['called'] >= 1:
+ return True, {}
+ non_local['called'] += 1
+ return False, {}
+
+ expected_suffix_calls = set([
+ (node['replication_ip'], '/%s/0/123-abc' % node['device'])
+ for node in (sync_to[0], handoff)
+ ])
+
+ ssync_calls = []
+ with nested(
+ mock_ssync_sender(ssync_calls,
+ response_callback=ssync_response_callback),
+ mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
+ return_value=(None, stub_hashes))):
+ with mocked_http_conn(*[200] * len(expected_suffix_calls),
+ body=pickle.dumps({})) as request_log:
+ self.reconstructor.process_job(job)
+
+ found_suffix_calls = set((r['ip'], r['path'])
+ for r in request_log.requests)
+ self.assertEqual(expected_suffix_calls, found_suffix_calls)
+
+ self.assertEqual(len(ssync_calls), len(expected_suffix_calls))
+ call = ssync_calls[0]
+ self.assertEqual(call['node'], sync_to[0])
+ self.assertEqual(set(call['suffixes']), set(['123', 'abc']))
+
+ def test_process_job_revert_is_handoff(self):
+ replicas = self.policy.object_ring.replicas
+ frag_index = random.randint(0, replicas - 1)
+ sync_to = [random.choice([n for n in self.policy.object_ring.devs
+ if n != self.local_dev])]
+ sync_to[0]['index'] = frag_index
+ partition = 0
+ handoff_nodes = list(self.policy.object_ring.get_more_nodes(partition))
+
+ stub_hashes = {
+ '123': {frag_index: 'hash', None: 'hash'},
+ 'abc': {frag_index: 'hash', None: 'hash'},
+ }
+ part_path = os.path.join(self.devices, self.local_dev['device'],
+ diskfile.get_data_dir(self.policy),
+ str(partition))
+ job = {
+ 'job_type': object_reconstructor.REVERT,
+ 'frag_index': frag_index,
+ 'suffixes': stub_hashes.keys(),
+ 'sync_to': sync_to,
+ 'partition': partition,
+ 'path': part_path,
+ 'hashes': stub_hashes,
+ 'policy': self.policy,
+ 'local_dev': handoff_nodes[-1],
+ }
+
+ def ssync_response_callback(*args):
+ # in this test ssync always fails, until we encounter ourselves in
+ # the list of possible handoff's to sync to
+ return False, {}
+
+ expected_suffix_calls = set([
+ (sync_to[0]['replication_ip'],
+ '/%s/0/123-abc' % sync_to[0]['device'])
+ ] + [
+ (node['replication_ip'], '/%s/0/123-abc' % node['device'])
+ for node in handoff_nodes[:-1]
+ ])
+
+ ssync_calls = []
+ with nested(
+ mock_ssync_sender(ssync_calls,
+ response_callback=ssync_response_callback),
+ mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
+ return_value=(None, stub_hashes))):
+ with mocked_http_conn(*[200] * len(expected_suffix_calls),
+ body=pickle.dumps({})) as request_log:
+ self.reconstructor.process_job(job)
+
+ found_suffix_calls = set((r['ip'], r['path'])
+ for r in request_log.requests)
+ self.assertEqual(expected_suffix_calls, found_suffix_calls)
+
+ # this is ssync call to primary (which fails) plus the ssync call to
+ # all of the handoffs (except the last one - which is the local_dev)
+ self.assertEqual(len(ssync_calls), len(handoff_nodes))
+ call = ssync_calls[0]
+ self.assertEqual(call['node'], sync_to[0])
+ self.assertEqual(set(call['suffixes']), set(['123', 'abc']))
+
+ def test_process_job_revert_cleanup(self):
+ replicas = self.policy.object_ring.replicas
+ frag_index = random.randint(0, replicas - 1)
+ sync_to = [random.choice([n for n in self.policy.object_ring.devs
+ if n != self.local_dev])]
+ sync_to[0]['index'] = frag_index
+ partition = 0
+
+ part_path = os.path.join(self.devices, self.local_dev['device'],
+ diskfile.get_data_dir(self.policy),
+ str(partition))
+ os.makedirs(part_path)
+ df_mgr = self.reconstructor._df_router[self.policy]
+ df = df_mgr.get_diskfile(self.local_dev['device'], partition, 'a',
+ 'c', 'data-obj', policy=self.policy)
+ ts = self.ts()
+ with df.create() as writer:
+ test_data = 'test data'
+ writer.write(test_data)
+ metadata = {
+ 'X-Timestamp': ts.internal,
+ 'Content-Length': len(test_data),
+ 'Etag': md5(test_data).hexdigest(),
+ 'X-Object-Sysmeta-Ec-Frag-Index': frag_index,
+ }
+ writer.put(metadata)
+ writer.commit(ts)
+
+ ohash = os.path.basename(df._datadir)
+ suffix = os.path.basename(os.path.dirname(df._datadir))
+
+ job = {
+ 'job_type': object_reconstructor.REVERT,
+ 'frag_index': frag_index,
+ 'suffixes': [suffix],
+ 'sync_to': sync_to,
+ 'partition': partition,
+ 'path': part_path,
+ 'hashes': {},
+ 'policy': self.policy,
+ 'local_dev': self.local_dev,
+ }
+
+ def ssync_response_callback(*args):
+ return True, {ohash: ts}
+
+ ssync_calls = []
+ with mock_ssync_sender(ssync_calls,
+ response_callback=ssync_response_callback):
+ with mocked_http_conn(200, body=pickle.dumps({})) as request_log:
+ self.reconstructor.process_job(job)
+
+ self.assertEqual([
+ (sync_to[0]['replication_ip'], '/%s/0/%s' % (
+ sync_to[0]['device'], suffix)),
+ ], [
+ (r['ip'], r['path']) for r in request_log.requests
+ ])
+ # hashpath is still there, but only the durable remains
+ files = os.listdir(df._datadir)
+ self.assertEqual(1, len(files))
+ self.assertTrue(files[0].endswith('.durable'))
+
+ # and more to the point, the next suffix recalc will clean it up
+ df_mgr = self.reconstructor._df_router[self.policy]
+ df_mgr.get_hashes(self.local_dev['device'], str(partition), [],
+ self.policy)
+ self.assertFalse(os.access(df._datadir, os.F_OK))
+
+ def test_process_job_revert_cleanup_tombstone(self):
+ replicas = self.policy.object_ring.replicas
+ frag_index = random.randint(0, replicas - 1)
+ sync_to = [random.choice([n for n in self.policy.object_ring.devs
+ if n != self.local_dev])]
+ sync_to[0]['index'] = frag_index
+ partition = 0
+
+ part_path = os.path.join(self.devices, self.local_dev['device'],
+ diskfile.get_data_dir(self.policy),
+ str(partition))
+ os.makedirs(part_path)
+ df_mgr = self.reconstructor._df_router[self.policy]
+ df = df_mgr.get_diskfile(self.local_dev['device'], partition, 'a',
+ 'c', 'data-obj', policy=self.policy)
+ ts = self.ts()
+ df.delete(ts)
+
+ ohash = os.path.basename(df._datadir)
+ suffix = os.path.basename(os.path.dirname(df._datadir))
+
+ job = {
+ 'job_type': object_reconstructor.REVERT,
+ 'frag_index': frag_index,
+ 'suffixes': [suffix],
+ 'sync_to': sync_to,
+ 'partition': partition,
+ 'path': part_path,
+ 'hashes': {},
+ 'policy': self.policy,
+ 'local_dev': self.local_dev,
+ }
+
+ def ssync_response_callback(*args):
+ return True, {ohash: ts}
+
+ ssync_calls = []
+ with mock_ssync_sender(ssync_calls,
+ response_callback=ssync_response_callback):
+ with mocked_http_conn(200, body=pickle.dumps({})) as request_log:
+ self.reconstructor.process_job(job)
+
+ self.assertEqual([
+ (sync_to[0]['replication_ip'], '/%s/0/%s' % (
+ sync_to[0]['device'], suffix)),
+ ], [
+ (r['ip'], r['path']) for r in request_log.requests
+ ])
+ # hashpath is still there, but it's empty
+ self.assertEqual([], os.listdir(df._datadir))
+
+ def test_reconstruct_fa_no_errors(self):
+ job = {
+ 'partition': 0,
+ 'policy': self.policy,
+ }
+ part_nodes = self.policy.object_ring.get_part_nodes(0)
+ node = part_nodes[1]
+ metadata = {
+ 'name': '/a/c/o',
+ 'Content-Length': 0,
+ 'ETag': 'etag',
+ }
+
+ test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
+ etag = md5(test_data).hexdigest()
+ ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
+
+ broken_body = ec_archive_bodies.pop(1)
+
+ responses = list((200, body) for body in ec_archive_bodies)
+ headers = {'X-Object-Sysmeta-Ec-Etag': etag}
+ codes, body_iter = zip(*responses)
+ with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
+ df = self.reconstructor.reconstruct_fa(
+ job, node, metadata)
+ fixed_body = ''.join(df.reader())
+ self.assertEqual(len(fixed_body), len(broken_body))
+ self.assertEqual(md5(fixed_body).hexdigest(),
+ md5(broken_body).hexdigest())
+
+ def test_reconstruct_fa_errors_works(self):
+ job = {
+ 'partition': 0,
+ 'policy': self.policy,
+ }
+ part_nodes = self.policy.object_ring.get_part_nodes(0)
+ node = part_nodes[4]
+ metadata = {
+ 'name': '/a/c/o',
+ 'Content-Length': 0,
+ 'ETag': 'etag',
+ }
+
+ test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
+ etag = md5(test_data).hexdigest()
+ ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
+
+ broken_body = ec_archive_bodies.pop(4)
+
+ base_responses = list((200, body) for body in ec_archive_bodies)
+ # since we're already missing a fragment a +2 scheme can only support
+ # one additional failure at a time
+ for error in (Timeout(), 404, Exception('kaboom!')):
+ responses = list(base_responses)
+ error_index = random.randint(0, len(responses) - 1)
+ responses[error_index] = (error, '')
+ headers = {'X-Object-Sysmeta-Ec-Etag': etag}
+ codes, body_iter = zip(*responses)
+ with mocked_http_conn(*codes, body_iter=body_iter,
+ headers=headers):
+ df = self.reconstructor.reconstruct_fa(
+ job, node, dict(metadata))
+ fixed_body = ''.join(df.reader())
+ self.assertEqual(len(fixed_body), len(broken_body))
+ self.assertEqual(md5(fixed_body).hexdigest(),
+ md5(broken_body).hexdigest())
+
+ def test_reconstruct_fa_errors_fails(self):
+ job = {
+ 'partition': 0,
+ 'policy': self.policy,
+ }
+ part_nodes = self.policy.object_ring.get_part_nodes(0)
+ node = part_nodes[1]
+ policy = self.policy
+ metadata = {
+ 'name': '/a/c/o',
+ 'Content-Length': 0,
+ 'ETag': 'etag',
+ }
+
+ possible_errors = [404, Timeout(), Exception('kaboom!')]
+ codes = [random.choice(possible_errors) for i in
+ range(policy.object_ring.replicas - 1)]
+ with mocked_http_conn(*codes):
+ self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa,
+ job, node, metadata)
+
+ def test_reconstruct_fa_with_mixed_old_etag(self):
+ job = {
+ 'partition': 0,
+ 'policy': self.policy,
+ }
+ part_nodes = self.policy.object_ring.get_part_nodes(0)
+ node = part_nodes[1]
+ metadata = {
+ 'name': '/a/c/o',
+ 'Content-Length': 0,
+ 'ETag': 'etag',
+ }
+
+ test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
+ etag = md5(test_data).hexdigest()
+ ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
+
+ broken_body = ec_archive_bodies.pop(1)
+
+ ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
+ # bad response
+ bad_response = (200, '', {
+ 'X-Object-Sysmeta-Ec-Etag': 'some garbage',
+ 'X-Backend-Timestamp': next(ts).internal,
+ })
+
+ # good responses
+ headers = {
+ 'X-Object-Sysmeta-Ec-Etag': etag,
+ 'X-Backend-Timestamp': next(ts).internal
+ }
+ responses = [(200, body, headers)
+ for body in ec_archive_bodies]
+ # mixed together
+ error_index = random.randint(0, len(responses) - 2)
+ responses[error_index] = bad_response
+ codes, body_iter, headers = zip(*responses)
+ with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
+ df = self.reconstructor.reconstruct_fa(
+ job, node, metadata)
+ fixed_body = ''.join(df.reader())
+ self.assertEqual(len(fixed_body), len(broken_body))
+ self.assertEqual(md5(fixed_body).hexdigest(),
+ md5(broken_body).hexdigest())
+
+ def test_reconstruct_fa_with_mixed_new_etag(self):
+ job = {
+ 'partition': 0,
+ 'policy': self.policy,
+ }
+ part_nodes = self.policy.object_ring.get_part_nodes(0)
+ node = part_nodes[1]
+ metadata = {
+ 'name': '/a/c/o',
+ 'Content-Length': 0,
+ 'ETag': 'etag',
+ }
+
+ test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
+ etag = md5(test_data).hexdigest()
+ ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
+
+ broken_body = ec_archive_bodies.pop(1)
+
+ ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
+ # good responses
+ headers = {
+ 'X-Object-Sysmeta-Ec-Etag': etag,
+ 'X-Backend-Timestamp': next(ts).internal
+ }
+ responses = [(200, body, headers)
+ for body in ec_archive_bodies]
+ codes, body_iter, headers = zip(*responses)
+
+ # sanity check before negative test
+ with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
+ df = self.reconstructor.reconstruct_fa(
+ job, node, dict(metadata))
+ fixed_body = ''.join(df.reader())
+ self.assertEqual(len(fixed_body), len(broken_body))
+ self.assertEqual(md5(fixed_body).hexdigest(),
+ md5(broken_body).hexdigest())
+
+ # one newer etag can spoil the bunch
+ new_response = (200, '', {
+ 'X-Object-Sysmeta-Ec-Etag': 'some garbage',
+ 'X-Backend-Timestamp': next(ts).internal,
+ })
+ new_index = random.randint(0, len(responses) - self.policy.ec_nparity)
+ responses[new_index] = new_response
+ codes, body_iter, headers = zip(*responses)
+ with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
+ self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa,
+ job, node, dict(metadata))
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py
index d662f01eb..f169e52dd 100644
--- a/test/unit/obj/test_replicator.py
+++ b/test/unit/obj/test_replicator.py
@@ -475,8 +475,8 @@ class TestObjectReplicator(unittest.TestCase):
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o',
policy=POLICIES.legacy)
mkdirs(df._datadir)
- f = open(os.path.join(df._datadir,
- normalize_timestamp(time.time()) + '.data'),
+ ts = normalize_timestamp(time.time())
+ f = open(os.path.join(df._datadir, ts + '.data'),
'wb')
f.write('1234567890')
f.close()
@@ -487,7 +487,7 @@ class TestObjectReplicator(unittest.TestCase):
self.assertTrue(os.access(part_path, os.F_OK))
def _fake_ssync(node, job, suffixes, **kwargs):
- return True, set([ohash])
+ return True, {ohash: ts}
self.replicator.sync_method = _fake_ssync
self.replicator.replicate()
@@ -707,8 +707,8 @@ class TestObjectReplicator(unittest.TestCase):
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o',
policy=POLICIES.legacy)
mkdirs(df._datadir)
- f = open(os.path.join(df._datadir,
- normalize_timestamp(time.time()) + '.data'),
+ ts = normalize_timestamp(time.time())
+ f = open(os.path.join(df._datadir, ts + '.data'),
'wb')
f.write('0')
f.close()
@@ -723,14 +723,14 @@ class TestObjectReplicator(unittest.TestCase):
def _fake_ssync(node, job, suffixes, **kwargs):
success = True
- ret_val = [whole_path_from]
+ ret_val = {ohash: ts}
if self.call_nums == 2:
# ssync should return (True, []) only when the second
# candidate node has not get the replica yet.
success = False
- ret_val = []
+ ret_val = {}
self.call_nums += 1
- return success, set(ret_val)
+ return success, ret_val
self.replicator.sync_method = _fake_ssync
self.replicator.replicate()
@@ -755,10 +755,9 @@ class TestObjectReplicator(unittest.TestCase):
mock_http_connect(200)):
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o',
policy=POLICIES.legacy)
+ ts = normalize_timestamp(time.time())
mkdirs(df._datadir)
- f = open(os.path.join(df._datadir,
- normalize_timestamp(time.time()) + '.data'),
- 'wb')
+ f = open(os.path.join(df._datadir, ts + '.data'), 'wb')
f.write('0')
f.close()
ohash = hash_path('a', 'c', 'o')
@@ -771,14 +770,14 @@ class TestObjectReplicator(unittest.TestCase):
def _fake_ssync(node, job, suffixes, **kwags):
success = False
- ret_val = []
+ ret_val = {}
if self.call_nums == 2:
# ssync should return (True, []) only when the second
# candidate node has not get the replica yet.
success = True
- ret_val = [whole_path_from]
+ ret_val = {ohash: ts}
self.call_nums += 1
- return success, set(ret_val)
+ return success, ret_val
self.replicator.sync_method = _fake_ssync
self.replicator.replicate()
@@ -805,9 +804,8 @@ class TestObjectReplicator(unittest.TestCase):
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o',
policy=POLICIES.legacy)
mkdirs(df._datadir)
- f = open(os.path.join(df._datadir,
- normalize_timestamp(time.time()) + '.data'),
- 'wb')
+ ts = normalize_timestamp(time.time())
+ f = open(os.path.join(df._datadir, ts + '.data'), 'wb')
f.write('0')
f.close()
ohash = hash_path('a', 'c', 'o')
@@ -818,16 +816,16 @@ class TestObjectReplicator(unittest.TestCase):
self.call_nums = 0
self.conf['sync_method'] = 'ssync'
- in_sync_objs = []
+ in_sync_objs = {}
def _fake_ssync(node, job, suffixes, remote_check_objs=None):
self.call_nums += 1
if remote_check_objs is None:
# sync job
- ret_val = [whole_path_from]
+ ret_val = {ohash: ts}
else:
ret_val = in_sync_objs
- return True, set(ret_val)
+ return True, ret_val
self.replicator.sync_method = _fake_ssync
self.replicator.replicate()
@@ -847,9 +845,8 @@ class TestObjectReplicator(unittest.TestCase):
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o',
policy=POLICIES.legacy)
mkdirs(df._datadir)
- f = open(os.path.join(df._datadir,
- normalize_timestamp(time.time()) + '.data'),
- 'wb')
+ ts = normalize_timestamp(time.time())
+ f = open(os.path.join(df._datadir, ts + '.data'), 'wb')
f.write('0')
f.close()
ohash = hash_path('a', 'c', 'o')
@@ -863,14 +860,14 @@ class TestObjectReplicator(unittest.TestCase):
def _fake_ssync(node, job, suffixes, **kwargs):
success = True
- ret_val = [whole_path_from]
+ ret_val = {ohash: ts}
if self.call_nums == 2:
# ssync should return (True, []) only when the second
# candidate node has not get the replica yet.
success = False
- ret_val = []
+ ret_val = {}
self.call_nums += 1
- return success, set(ret_val)
+ return success, ret_val
rmdir_func = os.rmdir
diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py
index cfb9fa281..52a34347a 100755
--- a/test/unit/obj/test_server.py
+++ b/test/unit/obj/test_server.py
@@ -1417,7 +1417,7 @@ class TestObjectController(unittest.TestCase):
resp = server_handler.OPTIONS(req)
self.assertEquals(200, resp.status_int)
for verb in 'OPTIONS GET POST PUT DELETE HEAD REPLICATE \
- REPLICATION'.split():
+ SSYNC'.split():
self.assertTrue(
verb in resp.headers['Allow'].split(', '))
self.assertEquals(len(resp.headers['Allow'].split(', ')), 8)
@@ -4409,9 +4409,9 @@ class TestObjectController(unittest.TestCase):
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 507)
- def test_REPLICATION_can_be_called(self):
+ def test_SSYNC_can_be_called(self):
req = Request.blank('/sda1/p/other/suff',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
headers={})
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 200)
@@ -4502,7 +4502,7 @@ class TestObjectController(unittest.TestCase):
def test_list_allowed_methods(self):
# Test list of allowed_methods
obj_methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'POST']
- repl_methods = ['REPLICATE', 'REPLICATION']
+ repl_methods = ['REPLICATE', 'SSYNC']
for method_name in obj_methods:
method = getattr(self.object_controller, method_name)
self.assertFalse(hasattr(method, 'replication'))
diff --git a/test/unit/obj/test_ssync_receiver.py b/test/unit/obj/test_ssync_receiver.py
index 002a08a72..4a030c821 100644
--- a/test/unit/obj/test_ssync_receiver.py
+++ b/test/unit/obj/test_ssync_receiver.py
@@ -93,14 +93,14 @@ class TestReceiver(unittest.TestCase):
lines.append(line)
return lines
- def test_REPLICATION_semaphore_locked(self):
+ def test_SSYNC_semaphore_locked(self):
with mock.patch.object(
self.controller, 'replication_semaphore') as \
mocked_replication_semaphore:
self.controller.logger = mock.MagicMock()
mocked_replication_semaphore.acquire.return_value = False
req = swob.Request.blank(
- '/device/partition', environ={'REQUEST_METHOD': 'REPLICATION'})
+ '/device/partition', environ={'REQUEST_METHOD': 'SSYNC'})
resp = req.get_response(self.controller)
self.assertEqual(
self.body_lines(resp.body),
@@ -111,13 +111,13 @@ class TestReceiver(unittest.TestCase):
self.assertFalse(self.controller.logger.error.called)
self.assertFalse(self.controller.logger.exception.called)
- def test_REPLICATION_calls_replication_lock(self):
+ def test_SSYNC_calls_replication_lock(self):
with mock.patch.object(
self.controller._diskfile_router[POLICIES.legacy],
'replication_lock') as mocked_replication_lock:
req = swob.Request.blank(
'/sda1/1',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'
':UPDATES: START\r\n:UPDATES: END\r\n')
@@ -132,7 +132,7 @@ class TestReceiver(unittest.TestCase):
def test_Receiver_with_default_storage_policy(self):
req = swob.Request.blank(
'/sda1/1',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'
':UPDATES: START\r\n:UPDATES: END\r\n')
@@ -145,9 +145,12 @@ class TestReceiver(unittest.TestCase):
self.assertEqual(rcvr.policy, POLICIES[0])
def test_Receiver_with_storage_policy_index_header(self):
+ # update router post policy patch
+ self.controller._diskfile_router = diskfile.DiskFileRouter(
+ self.conf, self.controller.logger)
req = swob.Request.blank(
'/sda1/1',
- environ={'REQUEST_METHOD': 'REPLICATION',
+ environ={'REQUEST_METHOD': 'SSYNC',
'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '1'},
body=':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'
@@ -159,6 +162,7 @@ class TestReceiver(unittest.TestCase):
[':MISSING_CHECK: START', ':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END'])
self.assertEqual(rcvr.policy, POLICIES[1])
+ self.assertEqual(rcvr.frag_index, None)
def test_Receiver_with_bad_storage_policy_index_header(self):
valid_indices = sorted([int(policy) for policy in POLICIES])
@@ -166,6 +170,7 @@ class TestReceiver(unittest.TestCase):
req = swob.Request.blank(
'/sda1/1',
environ={'REQUEST_METHOD': 'SSYNC',
+ 'HTTP_X_BACKEND_SSYNC_FRAG_INDEX': '0',
'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': bad_index},
body=':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'
@@ -175,7 +180,29 @@ class TestReceiver(unittest.TestCase):
body_lines = [chunk.strip() for chunk in receiver() if chunk.strip()]
self.assertEqual(body_lines, [":ERROR: 503 'No policy with index 2'"])
- def test_REPLICATION_replication_lock_fail(self):
+ @unit.patch_policies()
+ def test_Receiver_with_frag_index_header(self):
+ # update router post policy patch
+ self.controller._diskfile_router = diskfile.DiskFileRouter(
+ self.conf, self.controller.logger)
+ req = swob.Request.blank(
+ '/sda1/1',
+ environ={'REQUEST_METHOD': 'SSYNC',
+ 'HTTP_X_BACKEND_SSYNC_FRAG_INDEX': '7',
+ 'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '1'},
+ body=':MISSING_CHECK: START\r\n'
+ ':MISSING_CHECK: END\r\n'
+ ':UPDATES: START\r\n:UPDATES: END\r\n')
+ rcvr = ssync_receiver.Receiver(self.controller, req)
+ body_lines = [chunk.strip() for chunk in rcvr() if chunk.strip()]
+ self.assertEqual(
+ body_lines,
+ [':MISSING_CHECK: START', ':MISSING_CHECK: END',
+ ':UPDATES: START', ':UPDATES: END'])
+ self.assertEqual(rcvr.policy, POLICIES[1])
+ self.assertEqual(rcvr.frag_index, 7)
+
+ def test_SSYNC_replication_lock_fail(self):
def _mock(path):
with exceptions.ReplicationLockTimeout(0.01, '/somewhere/' + path):
eventlet.sleep(0.05)
@@ -185,7 +212,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/sda1/1',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'
':UPDATES: START\r\n:UPDATES: END\r\n')
@@ -194,15 +221,15 @@ class TestReceiver(unittest.TestCase):
self.body_lines(resp.body),
[":ERROR: 0 '0.01 seconds: /somewhere/sda1'"])
self.controller.logger.debug.assert_called_once_with(
- 'None/sda1/1 REPLICATION LOCK TIMEOUT: 0.01 seconds: '
+ 'None/sda1/1 SSYNC LOCK TIMEOUT: 0.01 seconds: '
'/somewhere/sda1')
- def test_REPLICATION_initial_path(self):
+ def test_SSYNC_initial_path(self):
with mock.patch.object(
self.controller, 'replication_semaphore') as \
mocked_replication_semaphore:
req = swob.Request.blank(
- '/device', environ={'REQUEST_METHOD': 'REPLICATION'})
+ '/device', environ={'REQUEST_METHOD': 'SSYNC'})
resp = req.get_response(self.controller)
self.assertEqual(
self.body_lines(resp.body),
@@ -215,7 +242,7 @@ class TestReceiver(unittest.TestCase):
self.controller, 'replication_semaphore') as \
mocked_replication_semaphore:
req = swob.Request.blank(
- '/device/', environ={'REQUEST_METHOD': 'REPLICATION'})
+ '/device/', environ={'REQUEST_METHOD': 'SSYNC'})
resp = req.get_response(self.controller)
self.assertEqual(
self.body_lines(resp.body),
@@ -228,7 +255,7 @@ class TestReceiver(unittest.TestCase):
self.controller, 'replication_semaphore') as \
mocked_replication_semaphore:
req = swob.Request.blank(
- '/device/partition', environ={'REQUEST_METHOD': 'REPLICATION'})
+ '/device/partition', environ={'REQUEST_METHOD': 'SSYNC'})
resp = req.get_response(self.controller)
self.assertEqual(
self.body_lines(resp.body),
@@ -242,7 +269,7 @@ class TestReceiver(unittest.TestCase):
mocked_replication_semaphore:
req = swob.Request.blank(
'/device/partition/junk',
- environ={'REQUEST_METHOD': 'REPLICATION'})
+ environ={'REQUEST_METHOD': 'SSYNC'})
resp = req.get_response(self.controller)
self.assertEqual(
self.body_lines(resp.body),
@@ -251,7 +278,7 @@ class TestReceiver(unittest.TestCase):
self.assertFalse(mocked_replication_semaphore.acquire.called)
self.assertFalse(mocked_replication_semaphore.release.called)
- def test_REPLICATION_mount_check(self):
+ def test_SSYNC_mount_check(self):
with contextlib.nested(
mock.patch.object(
self.controller, 'replication_semaphore'),
@@ -264,7 +291,7 @@ class TestReceiver(unittest.TestCase):
mocked_mount_check,
mocked_check_mount):
req = swob.Request.blank(
- '/device/partition', environ={'REQUEST_METHOD': 'REPLICATION'})
+ '/device/partition', environ={'REQUEST_METHOD': 'SSYNC'})
resp = req.get_response(self.controller)
self.assertEqual(
self.body_lines(resp.body),
@@ -284,7 +311,7 @@ class TestReceiver(unittest.TestCase):
mocked_mount_check,
mocked_check_mount):
req = swob.Request.blank(
- '/device/partition', environ={'REQUEST_METHOD': 'REPLICATION'})
+ '/device/partition', environ={'REQUEST_METHOD': 'SSYNC'})
resp = req.get_response(self.controller)
self.assertEqual(
self.body_lines(resp.body),
@@ -299,7 +326,7 @@ class TestReceiver(unittest.TestCase):
mocked_check_mount.reset_mock()
mocked_check_mount.return_value = True
req = swob.Request.blank(
- '/device/partition', environ={'REQUEST_METHOD': 'REPLICATION'})
+ '/device/partition', environ={'REQUEST_METHOD': 'SSYNC'})
resp = req.get_response(self.controller)
self.assertEqual(
self.body_lines(resp.body),
@@ -309,7 +336,7 @@ class TestReceiver(unittest.TestCase):
self.controller._diskfile_router[POLICIES.legacy].devices,
'device')
- def test_REPLICATION_Exception(self):
+ def test_SSYNC_Exception(self):
class _Wrapper(StringIO.StringIO):
@@ -326,7 +353,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/device/partition',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
':UPDATES: START\r\nBad content is here')
req.remote_addr = '1.2.3.4'
@@ -344,7 +371,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger.exception.assert_called_once_with(
'1.2.3.4/device/partition EXCEPTION in replication.Receiver')
- def test_REPLICATION_Exception_Exception(self):
+ def test_SSYNC_Exception_Exception(self):
class _Wrapper(StringIO.StringIO):
@@ -361,7 +388,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/device/partition',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
':UPDATES: START\r\nBad content is here')
req.remote_addr = mock.MagicMock()
@@ -404,7 +431,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/sda1/1',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n'
'hash ts\r\n'
':MISSING_CHECK: END\r\n'
@@ -446,7 +473,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/sda1/1',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n'
'hash ts\r\n'
':MISSING_CHECK: END\r\n'
@@ -468,7 +495,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/sda1/1',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'
':UPDATES: START\r\n:UPDATES: END\r\n')
@@ -486,7 +513,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/sda1/1',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n' +
self.hash1 + ' ' + self.ts1 + '\r\n' +
self.hash2 + ' ' + self.ts2 + '\r\n'
@@ -504,6 +531,32 @@ class TestReceiver(unittest.TestCase):
self.assertFalse(self.controller.logger.error.called)
self.assertFalse(self.controller.logger.exception.called)
+ def test_MISSING_CHECK_extra_line_parts(self):
+ # check that rx tolerates extra parts in missing check lines to
+ # allow for protocol upgrades
+ extra_1 = 'extra'
+ extra_2 = 'multiple extra parts'
+ self.controller.logger = mock.MagicMock()
+ req = swob.Request.blank(
+ '/sda1/1',
+ environ={'REQUEST_METHOD': 'SSYNC'},
+ body=':MISSING_CHECK: START\r\n' +
+ self.hash1 + ' ' + self.ts1 + ' ' + extra_1 + '\r\n' +
+ self.hash2 + ' ' + self.ts2 + ' ' + extra_2 + '\r\n'
+ ':MISSING_CHECK: END\r\n'
+ ':UPDATES: START\r\n:UPDATES: END\r\n')
+ resp = req.get_response(self.controller)
+ self.assertEqual(
+ self.body_lines(resp.body),
+ [':MISSING_CHECK: START',
+ self.hash1,
+ self.hash2,
+ ':MISSING_CHECK: END',
+ ':UPDATES: START', ':UPDATES: END'])
+ self.assertEqual(resp.status_int, 200)
+ self.assertFalse(self.controller.logger.error.called)
+ self.assertFalse(self.controller.logger.exception.called)
+
def test_MISSING_CHECK_have_one_exact(self):
object_dir = utils.storage_directory(
os.path.join(self.testdir, 'sda1',
@@ -519,7 +572,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/sda1/1',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n' +
self.hash1 + ' ' + self.ts1 + '\r\n' +
self.hash2 + ' ' + self.ts2 + '\r\n'
@@ -537,6 +590,9 @@ class TestReceiver(unittest.TestCase):
self.assertFalse(self.controller.logger.exception.called)
def test_MISSING_CHECK_storage_policy(self):
+ # update router post policy patch
+ self.controller._diskfile_router = diskfile.DiskFileRouter(
+ self.conf, self.controller.logger)
object_dir = utils.storage_directory(
os.path.join(self.testdir, 'sda1',
diskfile.get_data_dir(POLICIES[1])),
@@ -551,7 +607,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/sda1/1',
- environ={'REQUEST_METHOD': 'REPLICATION',
+ environ={'REQUEST_METHOD': 'SSYNC',
'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '1'},
body=':MISSING_CHECK: START\r\n' +
self.hash1 + ' ' + self.ts1 + '\r\n' +
@@ -586,7 +642,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/sda1/1',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n' +
self.hash1 + ' ' + self.ts1 + '\r\n' +
self.hash2 + ' ' + self.ts2 + '\r\n'
@@ -620,7 +676,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/sda1/1',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n' +
self.hash1 + ' ' + self.ts1 + '\r\n' +
self.hash2 + ' ' + self.ts2 + '\r\n'
@@ -662,7 +718,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/device/partition',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
':UPDATES: START\r\n'
'DELETE /a/c/o\r\n'
@@ -709,7 +765,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/device/partition',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
':UPDATES: START\r\n'
'DELETE /a/c/o\r\n'
@@ -752,7 +808,7 @@ class TestReceiver(unittest.TestCase):
mock_shutdown_safe, mock_delete):
req = swob.Request.blank(
'/device/partition',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
':UPDATES: START\r\n'
'DELETE /a/c/o\r\n'
@@ -774,7 +830,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/device/partition',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
':UPDATES: START\r\n'
'bad_subrequest_line\r\n')
@@ -793,7 +849,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/device/partition',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
':UPDATES: START\r\n'
'DELETE /a/c/o\r\n'
@@ -813,7 +869,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/device/partition',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
':UPDATES: START\r\n'
'DELETE /a/c/o\r\n')
@@ -830,7 +886,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/device/partition',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
':UPDATES: START\r\n'
'DELETE /a/c/o\r\n'
@@ -847,7 +903,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/device/partition',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
':UPDATES: START\r\n'
'DELETE /a/c/o\r\n'
@@ -866,7 +922,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/device/partition',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
':UPDATES: START\r\n'
'PUT /a/c/o\r\n'
@@ -884,7 +940,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/device/partition',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
':UPDATES: START\r\n'
'DELETE /a/c/o\r\n'
@@ -902,7 +958,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/device/partition',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
':UPDATES: START\r\n'
'PUT /a/c/o\r\n\r\n')
@@ -919,7 +975,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/device/partition',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
':UPDATES: START\r\n'
'PUT /a/c/o\r\n'
@@ -949,7 +1005,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/device/partition',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
':UPDATES: START\r\n'
'DELETE /a/c/o\r\n\r\n'
@@ -972,7 +1028,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/device/partition',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
':UPDATES: START\r\n'
'DELETE /a/c/o\r\n\r\n'
@@ -998,7 +1054,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/device/partition',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
':UPDATES: START\r\n'
'DELETE /a/c/o\r\n\r\n'
@@ -1026,7 +1082,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/device/partition',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
':UPDATES: START\r\n'
'DELETE /a/c/o\r\n\r\n'
@@ -1059,7 +1115,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/device/partition',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
':UPDATES: START\r\n'
'PUT /a/c/o\r\n'
@@ -1096,6 +1152,9 @@ class TestReceiver(unittest.TestCase):
self.assertEqual(req.read_body, '1')
def test_UPDATES_with_storage_policy(self):
+ # update router post policy patch
+ self.controller._diskfile_router = diskfile.DiskFileRouter(
+ self.conf, self.controller.logger)
_PUT_request = [None]
@server.public
@@ -1108,7 +1167,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/device/partition',
- environ={'REQUEST_METHOD': 'REPLICATION',
+ environ={'REQUEST_METHOD': 'SSYNC',
'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '1'},
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
':UPDATES: START\r\n'
@@ -1157,7 +1216,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/device/partition',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
':UPDATES: START\r\n'
'DELETE /a/c/o\r\n'
@@ -1192,7 +1251,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/device/partition',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
':UPDATES: START\r\n'
'BONK /a/c/o\r\n'
@@ -1228,7 +1287,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/device/partition',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
':UPDATES: START\r\n'
'PUT /a/c/o1\r\n'
@@ -1339,7 +1398,7 @@ class TestReceiver(unittest.TestCase):
self.assertEqual(_requests, [])
def test_UPDATES_subreq_does_not_read_all(self):
- # This tests that if a REPLICATION subrequest fails and doesn't read
+ # This tests that if a SSYNC subrequest fails and doesn't read
# all the subrequest body that it will read and throw away the rest of
# the body before moving on to the next subrequest.
# If you comment out the part in ssync_receiver where it does:
@@ -1368,7 +1427,7 @@ class TestReceiver(unittest.TestCase):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/device/partition',
- environ={'REQUEST_METHOD': 'REPLICATION'},
+ environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
':UPDATES: START\r\n'
'PUT /a/c/o1\r\n'
diff --git a/test/unit/obj/test_ssync_sender.py b/test/unit/obj/test_ssync_sender.py
index 70590ce33..42bd610eb 100644
--- a/test/unit/obj/test_ssync_sender.py
+++ b/test/unit/obj/test_ssync_sender.py
@@ -22,18 +22,23 @@ import time
import unittest
import eventlet
+import itertools
import mock
from swift.common import exceptions, utils
from swift.common.storage_policy import POLICIES
-from swift.obj import ssync_sender, diskfile
+from swift.common.exceptions import DiskFileNotExist, DiskFileError, \
+ DiskFileDeleted
+from swift.common.swob import Request
+from swift.common.utils import Timestamp, FileLikeIter
+from swift.obj import ssync_sender, diskfile, server, ssync_receiver
+from swift.obj.reconstructor import RebuildingECDiskFileStream
from test.unit import debug_logger, patch_policies
class FakeReplicator(object):
-
- def __init__(self, testdir):
+ def __init__(self, testdir, policy=None):
self.logger = debug_logger('test-ssync-sender')
self.conn_timeout = 1
self.node_timeout = 2
@@ -44,7 +49,9 @@ class FakeReplicator(object):
'devices': testdir,
'mount_check': 'false',
}
- self._diskfile_mgr = diskfile.DiskFileManager(conf, self.logger)
+ policy = POLICIES.default if policy is None else policy
+ self._diskfile_router = diskfile.DiskFileRouter(conf, self.logger)
+ self._diskfile_mgr = self._diskfile_router[policy]
class NullBufferedHTTPConnection(object):
@@ -91,42 +98,49 @@ class FakeConnection(object):
self.closed = True
-@patch_policies()
-class TestSender(unittest.TestCase):
-
+class BaseTestSender(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
self.testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender')
utils.mkdirs(os.path.join(self.testdir, 'dev'))
- self.replicator = FakeReplicator(self.testdir)
- self.sender = ssync_sender.Sender(self.replicator, None, None, None)
+ self.daemon = FakeReplicator(self.testdir)
+ self.sender = ssync_sender.Sender(self.daemon, None, None, None)
def tearDown(self):
shutil.rmtree(self.tmpdir, ignore_errors=True)
def _make_open_diskfile(self, device='dev', partition='9',
account='a', container='c', obj='o', body='test',
- extra_metadata=None, policy=None):
+ extra_metadata=None, policy=None,
+ frag_index=None, timestamp=None, df_mgr=None):
policy = policy or POLICIES.legacy
object_parts = account, container, obj
- req_timestamp = utils.normalize_timestamp(time.time())
- df = self.sender.daemon._diskfile_mgr.get_diskfile(
- device, partition, *object_parts, policy=policy)
+ timestamp = Timestamp(time.time()) if timestamp is None else timestamp
+ if df_mgr is None:
+ df_mgr = self.daemon._diskfile_router[policy]
+ df = df_mgr.get_diskfile(
+ device, partition, *object_parts, policy=policy,
+ frag_index=frag_index)
content_length = len(body)
etag = hashlib.md5(body).hexdigest()
with df.create() as writer:
writer.write(body)
metadata = {
- 'X-Timestamp': req_timestamp,
- 'Content-Length': content_length,
+ 'X-Timestamp': timestamp.internal,
+ 'Content-Length': str(content_length),
'ETag': etag,
}
if extra_metadata:
metadata.update(extra_metadata)
writer.put(metadata)
+ writer.commit(timestamp)
df.open()
return df
+
+@patch_policies()
+class TestSender(BaseTestSender):
+
def test_call_catches_MessageTimeout(self):
def connect(self):
@@ -139,12 +153,12 @@ class TestSender(unittest.TestCase):
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1')
job = dict(partition='9', policy=POLICIES.legacy)
- self.sender = ssync_sender.Sender(self.replicator, node, job, None)
+ self.sender = ssync_sender.Sender(self.daemon, node, job, None)
self.sender.suffixes = ['abc']
success, candidates = self.sender()
self.assertFalse(success)
- self.assertEquals(candidates, set())
- error_lines = self.replicator.logger.get_lines_for_level('error')
+ self.assertEquals(candidates, {})
+ error_lines = self.daemon.logger.get_lines_for_level('error')
self.assertEqual(1, len(error_lines))
self.assertEqual('1.2.3.4:5678/sda1/9 1 second: test connect',
error_lines[0])
@@ -158,12 +172,12 @@ class TestSender(unittest.TestCase):
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1')
job = dict(partition='9', policy=POLICIES.legacy)
- self.sender = ssync_sender.Sender(self.replicator, node, job, None)
+ self.sender = ssync_sender.Sender(self.daemon, node, job, None)
self.sender.suffixes = ['abc']
success, candidates = self.sender()
self.assertFalse(success)
- self.assertEquals(candidates, set())
- error_lines = self.replicator.logger.get_lines_for_level('error')
+ self.assertEquals(candidates, {})
+ error_lines = self.daemon.logger.get_lines_for_level('error')
self.assertEqual(1, len(error_lines))
self.assertEqual('1.2.3.4:5678/sda1/9 test connect',
error_lines[0])
@@ -172,26 +186,26 @@ class TestSender(unittest.TestCase):
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1')
job = dict(partition='9', policy=POLICIES.legacy)
- self.sender = ssync_sender.Sender(self.replicator, node, job, None)
+ self.sender = ssync_sender.Sender(self.daemon, node, job, None)
self.sender.suffixes = ['abc']
self.sender.connect = 'cause exception'
success, candidates = self.sender()
self.assertFalse(success)
- self.assertEquals(candidates, set())
- error_lines = self.replicator.logger.get_lines_for_level('error')
+ self.assertEquals(candidates, {})
+ error_lines = self.daemon.logger.get_lines_for_level('error')
for line in error_lines:
self.assertTrue(line.startswith(
'1.2.3.4:5678/sda1/9 EXCEPTION in replication.Sender:'))
def test_call_catches_exception_handling_exception(self):
job = node = None # Will cause inside exception handler to fail
- self.sender = ssync_sender.Sender(self.replicator, node, job, None)
+ self.sender = ssync_sender.Sender(self.daemon, node, job, None)
self.sender.suffixes = ['abc']
self.sender.connect = 'cause exception'
success, candidates = self.sender()
self.assertFalse(success)
- self.assertEquals(candidates, set())
- error_lines = self.replicator.logger.get_lines_for_level('error')
+ self.assertEquals(candidates, {})
+ error_lines = self.daemon.logger.get_lines_for_level('error')
for line in error_lines:
self.assertTrue(line.startswith(
'EXCEPTION in replication.Sender'))
@@ -204,7 +218,7 @@ class TestSender(unittest.TestCase):
self.sender.disconnect = mock.MagicMock()
success, candidates = self.sender()
self.assertTrue(success)
- self.assertEquals(candidates, set())
+ self.assertEquals(candidates, {})
self.sender.connect.assert_called_once_with()
self.sender.missing_check.assert_called_once_with()
self.sender.updates.assert_called_once_with()
@@ -219,7 +233,7 @@ class TestSender(unittest.TestCase):
self.sender.failures = 1
success, candidates = self.sender()
self.assertFalse(success)
- self.assertEquals(candidates, set())
+ self.assertEquals(candidates, {})
self.sender.connect.assert_called_once_with()
self.sender.missing_check.assert_called_once_with()
self.sender.updates.assert_called_once_with()
@@ -229,7 +243,7 @@ class TestSender(unittest.TestCase):
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1', index=0)
job = dict(partition='9', policy=POLICIES[1])
- self.sender = ssync_sender.Sender(self.replicator, node, job, None)
+ self.sender = ssync_sender.Sender(self.daemon, node, job, None)
self.sender.suffixes = ['abc']
with mock.patch(
'swift.obj.ssync_sender.bufferedhttp.BufferedHTTPConnection'
@@ -242,11 +256,12 @@ class TestSender(unittest.TestCase):
mock_conn_class.assert_called_once_with('1.2.3.4:5678')
expectations = {
'putrequest': [
- mock.call('REPLICATION', '/sda1/9'),
+ mock.call('SSYNC', '/sda1/9'),
],
'putheader': [
mock.call('Transfer-Encoding', 'chunked'),
mock.call('X-Backend-Storage-Policy-Index', 1),
+ mock.call('X-Backend-Ssync-Frag-Index', 0),
],
'endheaders': [mock.call()],
}
@@ -257,6 +272,76 @@ class TestSender(unittest.TestCase):
method_name, mock_method.mock_calls,
expected_calls))
+ def test_call(self):
+ def patch_sender(sender):
+ sender.connect = mock.MagicMock()
+ sender.missing_check = mock.MagicMock()
+ sender.updates = mock.MagicMock()
+ sender.disconnect = mock.MagicMock()
+
+ node = dict(replication_ip='1.2.3.4', replication_port=5678,
+ device='sda1')
+ job = {
+ 'device': 'dev',
+ 'partition': '9',
+ 'policy': POLICIES.legacy,
+ 'frag_index': 0,
+ }
+ available_map = dict([('9d41d8cd98f00b204e9800998ecf0abc',
+ '1380144470.00000'),
+ ('9d41d8cd98f00b204e9800998ecf0def',
+ '1380144472.22222'),
+ ('9d41d8cd98f00b204e9800998ecf1def',
+ '1380144474.44444')])
+
+ # no suffixes -> no work done
+ sender = ssync_sender.Sender(
+ self.daemon, node, job, [], remote_check_objs=None)
+ patch_sender(sender)
+ sender.available_map = available_map
+ success, candidates = sender()
+ self.assertTrue(success)
+ self.assertEqual({}, candidates)
+
+ # all objs in sync
+ sender = ssync_sender.Sender(
+ self.daemon, node, job, ['ignored'], remote_check_objs=None)
+ patch_sender(sender)
+ sender.available_map = available_map
+ success, candidates = sender()
+ self.assertTrue(success)
+ self.assertEqual(available_map, candidates)
+
+ # one obj not in sync, sync'ing faked, all objs should be in return set
+ wanted = '9d41d8cd98f00b204e9800998ecf0def'
+ sender = ssync_sender.Sender(
+ self.daemon, node, job, ['ignored'],
+ remote_check_objs=None)
+ patch_sender(sender)
+ sender.send_list = [wanted]
+ sender.available_map = available_map
+ success, candidates = sender()
+ self.assertTrue(success)
+ self.assertEqual(available_map, candidates)
+
+ # one obj not in sync, remote check only so that obj is not sync'd
+ # and should not be in the return set
+ wanted = '9d41d8cd98f00b204e9800998ecf0def'
+ remote_check_objs = set(available_map.keys())
+ sender = ssync_sender.Sender(
+ self.daemon, node, job, ['ignored'],
+ remote_check_objs=remote_check_objs)
+ patch_sender(sender)
+ sender.send_list = [wanted]
+ sender.available_map = available_map
+ success, candidates = sender()
+ self.assertTrue(success)
+ expected_map = dict([('9d41d8cd98f00b204e9800998ecf0abc',
+ '1380144470.00000'),
+ ('9d41d8cd98f00b204e9800998ecf1def',
+ '1380144474.44444')])
+ self.assertEqual(expected_map, candidates)
+
def test_call_and_missing_check(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if device == 'dev' and partition == '9' and suffixes == ['abc'] \
@@ -275,6 +360,7 @@ class TestSender(unittest.TestCase):
'device': 'dev',
'partition': '9',
'policy': POLICIES.legacy,
+ 'frag_index': 0,
}
self.sender.suffixes = ['abc']
self.sender.response = FakeResponse(
@@ -288,7 +374,8 @@ class TestSender(unittest.TestCase):
self.sender.disconnect = mock.MagicMock()
success, candidates = self.sender()
self.assertTrue(success)
- self.assertEqual(candidates, set(['9d41d8cd98f00b204e9800998ecf0abc']))
+ self.assertEqual(candidates, dict([('9d41d8cd98f00b204e9800998ecf0abc',
+ '1380144470.00000')]))
self.assertEqual(self.sender.failures, 0)
def test_call_and_missing_check_with_obj_list(self):
@@ -307,8 +394,9 @@ class TestSender(unittest.TestCase):
'device': 'dev',
'partition': '9',
'policy': POLICIES.legacy,
+ 'frag_index': 0,
}
- self.sender = ssync_sender.Sender(self.replicator, None, job, ['abc'],
+ self.sender = ssync_sender.Sender(self.daemon, None, job, ['abc'],
['9d41d8cd98f00b204e9800998ecf0abc'])
self.sender.connection = FakeConnection()
self.sender.response = FakeResponse(
@@ -321,7 +409,8 @@ class TestSender(unittest.TestCase):
self.sender.disconnect = mock.MagicMock()
success, candidates = self.sender()
self.assertTrue(success)
- self.assertEqual(candidates, set(['9d41d8cd98f00b204e9800998ecf0abc']))
+ self.assertEqual(candidates, dict([('9d41d8cd98f00b204e9800998ecf0abc',
+ '1380144470.00000')]))
self.assertEqual(self.sender.failures, 0)
def test_call_and_missing_check_with_obj_list_but_required(self):
@@ -340,8 +429,9 @@ class TestSender(unittest.TestCase):
'device': 'dev',
'partition': '9',
'policy': POLICIES.legacy,
+ 'frag_index': 0,
}
- self.sender = ssync_sender.Sender(self.replicator, None, job, ['abc'],
+ self.sender = ssync_sender.Sender(self.daemon, None, job, ['abc'],
['9d41d8cd98f00b204e9800998ecf0abc'])
self.sender.connection = FakeConnection()
self.sender.response = FakeResponse(
@@ -355,14 +445,14 @@ class TestSender(unittest.TestCase):
self.sender.disconnect = mock.MagicMock()
success, candidates = self.sender()
self.assertTrue(success)
- self.assertEqual(candidates, set())
+ self.assertEqual(candidates, {})
def test_connect_send_timeout(self):
- self.replicator.conn_timeout = 0.01
+ self.daemon.conn_timeout = 0.01
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1')
job = dict(partition='9', policy=POLICIES.legacy)
- self.sender = ssync_sender.Sender(self.replicator, node, job, None)
+ self.sender = ssync_sender.Sender(self.daemon, node, job, None)
self.sender.suffixes = ['abc']
def putrequest(*args, **kwargs):
@@ -373,18 +463,18 @@ class TestSender(unittest.TestCase):
'putrequest', putrequest):
success, candidates = self.sender()
self.assertFalse(success)
- self.assertEquals(candidates, set())
- error_lines = self.replicator.logger.get_lines_for_level('error')
+ self.assertEquals(candidates, {})
+ error_lines = self.daemon.logger.get_lines_for_level('error')
for line in error_lines:
self.assertTrue(line.startswith(
'1.2.3.4:5678/sda1/9 0.01 seconds: connect send'))
def test_connect_receive_timeout(self):
- self.replicator.node_timeout = 0.02
+ self.daemon.node_timeout = 0.02
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1', index=0)
job = dict(partition='9', policy=POLICIES.legacy)
- self.sender = ssync_sender.Sender(self.replicator, node, job, None)
+ self.sender = ssync_sender.Sender(self.daemon, node, job, None)
self.sender.suffixes = ['abc']
class FakeBufferedHTTPConnection(NullBufferedHTTPConnection):
@@ -397,18 +487,18 @@ class TestSender(unittest.TestCase):
FakeBufferedHTTPConnection):
success, candidates = self.sender()
self.assertFalse(success)
- self.assertEquals(candidates, set())
- error_lines = self.replicator.logger.get_lines_for_level('error')
+ self.assertEquals(candidates, {})
+ error_lines = self.daemon.logger.get_lines_for_level('error')
for line in error_lines:
self.assertTrue(line.startswith(
'1.2.3.4:5678/sda1/9 0.02 seconds: connect receive'))
def test_connect_bad_status(self):
- self.replicator.node_timeout = 0.02
+ self.daemon.node_timeout = 0.02
node = dict(replication_ip='1.2.3.4', replication_port=5678,
- device='sda1')
+ device='sda1', index=0)
job = dict(partition='9', policy=POLICIES.legacy)
- self.sender = ssync_sender.Sender(self.replicator, node, job, None)
+ self.sender = ssync_sender.Sender(self.daemon, node, job, None)
self.sender.suffixes = ['abc']
class FakeBufferedHTTPConnection(NullBufferedHTTPConnection):
@@ -422,8 +512,8 @@ class TestSender(unittest.TestCase):
FakeBufferedHTTPConnection):
success, candidates = self.sender()
self.assertFalse(success)
- self.assertEquals(candidates, set())
- error_lines = self.replicator.logger.get_lines_for_level('error')
+ self.assertEquals(candidates, {})
+ error_lines = self.daemon.logger.get_lines_for_level('error')
for line in error_lines:
self.assertTrue(line.startswith(
'1.2.3.4:5678/sda1/9 Expected status 200; got 503'))
@@ -434,7 +524,7 @@ class TestSender(unittest.TestCase):
self.assertEqual(self.sender.response_buffer, 'Okay.')
def test_readline_buffer_exceeds_network_chunk_size_somehow(self):
- self.replicator.network_chunk_size = 2
+ self.daemon.network_chunk_size = 2
self.sender.response_buffer = '1234567890'
self.assertEqual(self.sender.readline(), '1234567890')
self.assertEqual(self.sender.response_buffer, '')
@@ -514,7 +604,7 @@ class TestSender(unittest.TestCase):
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(self.sender.send_list, [])
- self.assertEqual(self.sender.available_set, set())
+ self.assertEqual(self.sender.available_map, {})
def test_missing_check_has_suffixes(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
@@ -562,10 +652,10 @@ class TestSender(unittest.TestCase):
'33\r\n9d41d8cd98f00b204e9800998ecf1def 1380144474.44444\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(self.sender.send_list, [])
- candidates = ['9d41d8cd98f00b204e9800998ecf0abc',
- '9d41d8cd98f00b204e9800998ecf0def',
- '9d41d8cd98f00b204e9800998ecf1def']
- self.assertEqual(self.sender.available_set, set(candidates))
+ candidates = [('9d41d8cd98f00b204e9800998ecf0abc', '1380144470.00000'),
+ ('9d41d8cd98f00b204e9800998ecf0def', '1380144472.22222'),
+ ('9d41d8cd98f00b204e9800998ecf1def', '1380144474.44444')]
+ self.assertEqual(self.sender.available_map, dict(candidates))
def test_missing_check_far_end_disconnect(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
@@ -602,8 +692,9 @@ class TestSender(unittest.TestCase):
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
- self.assertEqual(self.sender.available_set,
- set(['9d41d8cd98f00b204e9800998ecf0abc']))
+ self.assertEqual(self.sender.available_map,
+ dict([('9d41d8cd98f00b204e9800998ecf0abc',
+ '1380144470.00000')]))
def test_missing_check_far_end_disconnect2(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
@@ -641,8 +732,9 @@ class TestSender(unittest.TestCase):
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
- self.assertEqual(self.sender.available_set,
- set(['9d41d8cd98f00b204e9800998ecf0abc']))
+ self.assertEqual(self.sender.available_map,
+ dict([('9d41d8cd98f00b204e9800998ecf0abc',
+ '1380144470.00000')]))
def test_missing_check_far_end_unexpected(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
@@ -679,8 +771,9 @@ class TestSender(unittest.TestCase):
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
- self.assertEqual(self.sender.available_set,
- set(['9d41d8cd98f00b204e9800998ecf0abc']))
+ self.assertEqual(self.sender.available_map,
+ dict([('9d41d8cd98f00b204e9800998ecf0abc',
+ '1380144470.00000')]))
def test_missing_check_send_list(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
@@ -717,8 +810,45 @@ class TestSender(unittest.TestCase):
'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(self.sender.send_list, ['0123abc'])
- self.assertEqual(self.sender.available_set,
- set(['9d41d8cd98f00b204e9800998ecf0abc']))
+ self.assertEqual(self.sender.available_map,
+ dict([('9d41d8cd98f00b204e9800998ecf0abc',
+ '1380144470.00000')]))
+
+ def test_missing_check_extra_line_parts(self):
+ # check that sender tolerates extra parts in missing check
+ # line responses to allow for protocol upgrades
+ def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
+ if (device == 'dev' and partition == '9' and
+ policy == POLICIES.legacy and
+ suffixes == ['abc']):
+ yield (
+ '/srv/node/dev/objects/9/abc/'
+ '9d41d8cd98f00b204e9800998ecf0abc',
+ '9d41d8cd98f00b204e9800998ecf0abc',
+ '1380144470.00000')
+ else:
+ raise Exception(
+ 'No match for %r %r %r %r' % (device, partition,
+ policy, suffixes))
+
+ self.sender.connection = FakeConnection()
+ self.sender.job = {
+ 'device': 'dev',
+ 'partition': '9',
+ 'policy': POLICIES.legacy,
+ }
+ self.sender.suffixes = ['abc']
+ self.sender.response = FakeResponse(
+ chunk_body=(
+ ':MISSING_CHECK: START\r\n'
+ '0123abc extra response parts\r\n'
+ ':MISSING_CHECK: END\r\n'))
+ self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
+ self.sender.missing_check()
+ self.assertEqual(self.sender.send_list, ['0123abc'])
+ self.assertEqual(self.sender.available_map,
+ dict([('9d41d8cd98f00b204e9800998ecf0abc',
+ '1380144470.00000')]))
def test_updates_timeout(self):
self.sender.connection = FakeConnection()
@@ -790,6 +920,7 @@ class TestSender(unittest.TestCase):
'device': device,
'partition': part,
'policy': POLICIES.legacy,
+ 'frag_index': 0,
}
self.sender.node = {}
self.sender.send_list = [object_hash]
@@ -823,6 +954,7 @@ class TestSender(unittest.TestCase):
'device': device,
'partition': part,
'policy': POLICIES.legacy,
+ 'frag_index': 0,
}
self.sender.node = {}
self.sender.send_list = [object_hash]
@@ -853,6 +985,7 @@ class TestSender(unittest.TestCase):
'device': device,
'partition': part,
'policy': POLICIES.legacy,
+ 'frag_index': 0,
}
self.sender.node = {}
self.sender.send_list = [object_hash]
@@ -1112,5 +1245,466 @@ class TestSender(unittest.TestCase):
self.assertTrue(self.sender.connection.closed)
+@patch_policies(with_ec_default=True)
+class TestSsync(BaseTestSender):
+ """
+ Test interactions between sender and receiver. The basis for each test is
+ actual diskfile state on either side - the connection between sender and
+ receiver is faked. Assertions are made about the final state of the sender
+ and receiver diskfiles.
+ """
+
+ def make_fake_ssync_connect(self, sender, rx_obj_controller, device,
+ partition, policy):
+ trace = []
+
+ def add_trace(type, msg):
+ # record a protocol event for later analysis
+ if msg.strip():
+ trace.append((type, msg.strip()))
+
+ def start_response(status, headers, exc_info=None):
+ assert(status == '200 OK')
+
+ class FakeConnection:
+ def __init__(self, trace):
+ self.trace = trace
+ self.queue = []
+ self.src = FileLikeIter(self.queue)
+
+ def send(self, msg):
+ msg = msg.split('\r\n', 1)[1]
+ msg = msg.rsplit('\r\n', 1)[0]
+ add_trace('tx', msg)
+ self.queue.append(msg)
+
+ def close(self):
+ pass
+
+ def wrap_gen(gen):
+ # Strip response head and tail
+ while True:
+ try:
+ msg = gen.next()
+ if msg:
+ add_trace('rx', msg)
+ msg = '%x\r\n%s\r\n' % (len(msg), msg)
+ yield msg
+ except StopIteration:
+ break
+
+ def fake_connect():
+ sender.connection = FakeConnection(trace)
+ headers = {'Transfer-Encoding': 'chunked',
+ 'X-Backend-Storage-Policy-Index': str(int(policy))}
+ env = {'REQUEST_METHOD': 'SSYNC'}
+ path = '/%s/%s' % (device, partition)
+ req = Request.blank(path, environ=env, headers=headers)
+ req.environ['wsgi.input'] = sender.connection.src
+ resp = rx_obj_controller(req.environ, start_response)
+ wrapped_gen = wrap_gen(resp)
+ sender.response = FileLikeIter(wrapped_gen)
+ sender.response.fp = sender.response
+ return fake_connect
+
+ def setUp(self):
+ self.device = 'dev'
+ self.partition = '9'
+ self.tmpdir = tempfile.mkdtemp()
+ # sender side setup
+ self.tx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender')
+ utils.mkdirs(os.path.join(self.tx_testdir, self.device))
+ self.daemon = FakeReplicator(self.tx_testdir)
+
+ # rx side setup
+ self.rx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_receiver')
+ utils.mkdirs(os.path.join(self.rx_testdir, self.device))
+ conf = {
+ 'devices': self.rx_testdir,
+ 'mount_check': 'false',
+ 'replication_one_per_device': 'false',
+ 'log_requests': 'false'}
+ self.rx_controller = server.ObjectController(conf)
+ self.orig_ensure_flush = ssync_receiver.Receiver._ensure_flush
+ ssync_receiver.Receiver._ensure_flush = lambda *args: ''
+ self.ts_iter = (Timestamp(t)
+ for t in itertools.count(int(time.time())))
+
+ def tearDown(self):
+ if self.orig_ensure_flush:
+ ssync_receiver.Receiver._ensure_flush = self.orig_ensure_flush
+ shutil.rmtree(self.tmpdir, ignore_errors=True)
+
+ def _create_ondisk_files(self, df_mgr, obj_name, policy, timestamp,
+ frag_indexes=None):
+ frag_indexes = [] if frag_indexes is None else frag_indexes
+ metadata = {'Content-Type': 'plain/text'}
+ diskfiles = []
+ for frag_index in frag_indexes:
+ object_data = '/a/c/%s___%s' % (obj_name, frag_index)
+ if frag_index is not None:
+ metadata['X-Object-Sysmeta-Ec-Frag-Index'] = str(frag_index)
+ df = self._make_open_diskfile(
+ device=self.device, partition=self.partition, account='a',
+ container='c', obj=obj_name, body=object_data,
+ extra_metadata=metadata, timestamp=timestamp, policy=policy,
+ frag_index=frag_index, df_mgr=df_mgr)
+ # sanity checks
+ listing = os.listdir(df._datadir)
+ self.assertTrue(listing)
+ for filename in listing:
+ self.assertTrue(filename.startswith(timestamp.internal))
+ diskfiles.append(df)
+ return diskfiles
+
+ def _open_tx_diskfile(self, obj_name, policy, frag_index=None):
+ df_mgr = self.daemon._diskfile_router[policy]
+ df = df_mgr.get_diskfile(
+ self.device, self.partition, account='a', container='c',
+ obj=obj_name, policy=policy, frag_index=frag_index)
+ df.open()
+ return df
+
+ def _open_rx_diskfile(self, obj_name, policy, frag_index=None):
+ df = self.rx_controller.get_diskfile(
+ self.device, self.partition, 'a', 'c', obj_name, policy=policy,
+ frag_index=frag_index)
+ df.open()
+ return df
+
+ def _verify_diskfile_sync(self, tx_df, rx_df, frag_index):
+ # verify that diskfiles' metadata match
+ # sanity check, they are not the same ondisk files!
+ self.assertNotEqual(tx_df._datadir, rx_df._datadir)
+ rx_metadata = dict(rx_df.get_metadata())
+ for k, v in tx_df.get_metadata().iteritems():
+ self.assertEqual(v, rx_metadata.pop(k))
+ # ugh, ssync duplicates ETag with Etag so have to clear it out here
+ if 'Etag' in rx_metadata:
+ rx_metadata.pop('Etag')
+ self.assertFalse(rx_metadata)
+ if frag_index:
+ rx_metadata = rx_df.get_metadata()
+ fi_key = 'X-Object-Sysmeta-Ec-Frag-Index'
+ self.assertTrue(fi_key in rx_metadata)
+ self.assertEqual(frag_index, int(rx_metadata[fi_key]))
+
+ def _analyze_trace(self, trace):
+ """
+ Parse protocol trace captured by fake connection, making some
+ assertions along the way, and return results as a dict of form:
+ results = {'tx_missing': <list of messages>,
+ 'rx_missing': <list of messages>,
+ 'tx_updates': <list of subreqs>,
+ 'rx_updates': <list of messages>}
+
+ Each subreq is a dict with keys: 'method', 'path', 'headers', 'body'
+ """
+ def tx_missing(results, line):
+ self.assertEqual('tx', line[0])
+ results['tx_missing'].append(line[1])
+
+ def rx_missing(results, line):
+ self.assertEqual('rx', line[0])
+ parts = line[1].split('\r\n')
+ for part in parts:
+ results['rx_missing'].append(part)
+
+ def tx_updates(results, line):
+ self.assertEqual('tx', line[0])
+ subrequests = results['tx_updates']
+ if line[1].startswith(('PUT', 'DELETE')):
+ parts = line[1].split('\r\n')
+ method, path = parts[0].split()
+ subreq = {'method': method, 'path': path, 'req': line[1],
+ 'headers': parts[1:]}
+ subrequests.append(subreq)
+ else:
+ self.assertTrue(subrequests)
+ body = (subrequests[-1]).setdefault('body', '')
+ body += line[1]
+ subrequests[-1]['body'] = body
+
+ def rx_updates(results, line):
+ self.assertEqual('rx', line[0])
+ results.setdefault['rx_updates'].append(line[1])
+
+ def unexpected(results, line):
+ results.setdefault('unexpected', []).append(line)
+
+ # each trace line is a tuple of ([tx|rx], msg)
+ handshakes = iter([(('tx', ':MISSING_CHECK: START'), tx_missing),
+ (('tx', ':MISSING_CHECK: END'), unexpected),
+ (('rx', ':MISSING_CHECK: START'), rx_missing),
+ (('rx', ':MISSING_CHECK: END'), unexpected),
+ (('tx', ':UPDATES: START'), tx_updates),
+ (('tx', ':UPDATES: END'), unexpected),
+ (('rx', ':UPDATES: START'), rx_updates),
+ (('rx', ':UPDATES: END'), unexpected)])
+ expect_handshake = handshakes.next()
+ phases = ('tx_missing', 'rx_missing', 'tx_updates', 'rx_updates')
+ results = dict((k, []) for k in phases)
+ handler = unexpected
+ lines = list(trace)
+ lines.reverse()
+ while lines:
+ line = lines.pop()
+ if line == expect_handshake[0]:
+ handler = expect_handshake[1]
+ try:
+ expect_handshake = handshakes.next()
+ except StopIteration:
+ # should be the last line
+ self.assertFalse(
+ lines, 'Unexpected trailing lines %s' % lines)
+ continue
+ handler(results, line)
+
+ try:
+ # check all handshakes occurred
+ missed = handshakes.next()
+ self.fail('Handshake %s not found' % str(missed[0]))
+ except StopIteration:
+ pass
+ # check no message outside of a phase
+ self.assertFalse(results.get('unexpected'),
+ 'Message outside of a phase: %s' % results.get(None))
+ return results
+
+ def _verify_ondisk_files(self, tx_objs, policy, rx_node_index):
+ # verify tx and rx files that should be in sync
+ for o_name, diskfiles in tx_objs.iteritems():
+ for tx_df in diskfiles:
+ frag_index = tx_df._frag_index
+ if frag_index == rx_node_index:
+ # this frag_index should have been sync'd,
+ # check rx file is ok
+ rx_df = self._open_rx_diskfile(o_name, policy, frag_index)
+ self._verify_diskfile_sync(tx_df, rx_df, frag_index)
+ expected_body = '/a/c/%s___%s' % (o_name, rx_node_index)
+ actual_body = ''.join([chunk for chunk in rx_df.reader()])
+ self.assertEqual(expected_body, actual_body)
+ else:
+ # this frag_index should not have been sync'd,
+ # check no rx file,
+ self.assertRaises(DiskFileNotExist,
+ self._open_rx_diskfile,
+ o_name, policy, frag_index=frag_index)
+ # check tx file still intact - ssync does not do any cleanup!
+ self._open_tx_diskfile(o_name, policy, frag_index)
+
+ def _verify_tombstones(self, tx_objs, policy):
+ # verify tx and rx tombstones that should be in sync
+ for o_name, diskfiles in tx_objs.iteritems():
+ for tx_df_ in diskfiles:
+ try:
+ self._open_tx_diskfile(o_name, policy)
+ self.fail('DiskFileDeleted expected')
+ except DiskFileDeleted as exc:
+ tx_delete_time = exc.timestamp
+ try:
+ self._open_rx_diskfile(o_name, policy)
+ self.fail('DiskFileDeleted expected')
+ except DiskFileDeleted as exc:
+ rx_delete_time = exc.timestamp
+ self.assertEqual(tx_delete_time, rx_delete_time)
+
+ def test_handoff_fragment_revert(self):
+ # test that a sync_revert type job does send the correct frag archives
+ # to the receiver, and that those frag archives are then removed from
+ # local node.
+ policy = POLICIES.default
+ rx_node_index = 0
+ tx_node_index = 1
+ frag_index = rx_node_index
+
+ # create sender side diskfiles...
+ tx_objs = {}
+ rx_objs = {}
+ tx_tombstones = {}
+ tx_df_mgr = self.daemon._diskfile_router[policy]
+ rx_df_mgr = self.rx_controller._diskfile_router[policy]
+ # o1 has primary and handoff fragment archives
+ t1 = self.ts_iter.next()
+ tx_objs['o1'] = self._create_ondisk_files(
+ tx_df_mgr, 'o1', policy, t1, (rx_node_index, tx_node_index))
+ # o2 only has primary
+ t2 = self.ts_iter.next()
+ tx_objs['o2'] = self._create_ondisk_files(
+ tx_df_mgr, 'o2', policy, t2, (tx_node_index,))
+ # o3 only has handoff
+ t3 = self.ts_iter.next()
+ tx_objs['o3'] = self._create_ondisk_files(
+ tx_df_mgr, 'o3', policy, t3, (rx_node_index,))
+ # o4 primary and handoff fragment archives on tx, handoff in sync on rx
+ t4 = self.ts_iter.next()
+ tx_objs['o4'] = self._create_ondisk_files(
+ tx_df_mgr, 'o4', policy, t4, (tx_node_index, rx_node_index,))
+ rx_objs['o4'] = self._create_ondisk_files(
+ rx_df_mgr, 'o4', policy, t4, (rx_node_index,))
+ # o5 is a tombstone, missing on receiver
+ t5 = self.ts_iter.next()
+ tx_tombstones['o5'] = self._create_ondisk_files(
+ tx_df_mgr, 'o5', policy, t5, (tx_node_index,))
+ tx_tombstones['o5'][0].delete(t5)
+
+ suffixes = set()
+ for diskfiles in (tx_objs.values() + tx_tombstones.values()):
+ for df in diskfiles:
+ suffixes.add(os.path.basename(os.path.dirname(df._datadir)))
+
+ # create ssync sender instance...
+ job = {'device': self.device,
+ 'partition': self.partition,
+ 'policy': policy,
+ 'frag_index': frag_index,
+ 'purge': True}
+ node = {'index': rx_node_index}
+ self.sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
+ # fake connection from tx to rx...
+ self.sender.connect = self.make_fake_ssync_connect(
+ self.sender, self.rx_controller, self.device, self.partition,
+ policy)
+
+ # run the sync protocol...
+ self.sender()
+
+ # verify protocol
+ results = self._analyze_trace(self.sender.connection.trace)
+ # sender has handoff frags for o1, o3 and o4 and ts for o5
+ self.assertEqual(4, len(results['tx_missing']))
+ # receiver is missing frags for o1, o3 and ts for o5
+ self.assertEqual(3, len(results['rx_missing']))
+ self.assertEqual(3, len(results['tx_updates']))
+ self.assertFalse(results['rx_updates'])
+ sync_paths = []
+ for subreq in results.get('tx_updates'):
+ if subreq.get('method') == 'PUT':
+ self.assertTrue(
+ 'X-Object-Sysmeta-Ec-Frag-Index: %s' % rx_node_index
+ in subreq.get('headers'))
+ expected_body = '%s___%s' % (subreq['path'], rx_node_index)
+ self.assertEqual(expected_body, subreq['body'])
+ elif subreq.get('method') == 'DELETE':
+ self.assertEqual('/a/c/o5', subreq['path'])
+ sync_paths.append(subreq.get('path'))
+ self.assertEqual(['/a/c/o1', '/a/c/o3', '/a/c/o5'], sorted(sync_paths))
+
+ # verify on disk files...
+ self._verify_ondisk_files(tx_objs, policy, rx_node_index)
+ self._verify_tombstones(tx_tombstones, policy)
+
+ def test_fragment_sync(self):
+ # check that a sync_only type job does call reconstructor to build a
+ # diskfile to send, and continues making progress despite an error
+ # when building one diskfile
+ policy = POLICIES.default
+ rx_node_index = 0
+ tx_node_index = 1
+ # for a sync job we iterate over frag index that belongs on local node
+ frag_index = tx_node_index
+
+ # create sender side diskfiles...
+ tx_objs = {}
+ tx_tombstones = {}
+ rx_objs = {}
+ tx_df_mgr = self.daemon._diskfile_router[policy]
+ rx_df_mgr = self.rx_controller._diskfile_router[policy]
+ # o1 only has primary
+ t1 = self.ts_iter.next()
+ tx_objs['o1'] = self._create_ondisk_files(
+ tx_df_mgr, 'o1', policy, t1, (tx_node_index,))
+ # o2 only has primary
+ t2 = self.ts_iter.next()
+ tx_objs['o2'] = self._create_ondisk_files(
+ tx_df_mgr, 'o2', policy, t2, (tx_node_index,))
+ # o3 only has primary
+ t3 = self.ts_iter.next()
+ tx_objs['o3'] = self._create_ondisk_files(
+ tx_df_mgr, 'o3', policy, t3, (tx_node_index,))
+ # o4 primary fragment archives on tx, handoff in sync on rx
+ t4 = self.ts_iter.next()
+ tx_objs['o4'] = self._create_ondisk_files(
+ tx_df_mgr, 'o4', policy, t4, (tx_node_index,))
+ rx_objs['o4'] = self._create_ondisk_files(
+ rx_df_mgr, 'o4', policy, t4, (rx_node_index,))
+ # o5 is a tombstone, missing on receiver
+ t5 = self.ts_iter.next()
+ tx_tombstones['o5'] = self._create_ondisk_files(
+ tx_df_mgr, 'o5', policy, t5, (tx_node_index,))
+ tx_tombstones['o5'][0].delete(t5)
+
+ suffixes = set()
+ for diskfiles in (tx_objs.values() + tx_tombstones.values()):
+ for df in diskfiles:
+ suffixes.add(os.path.basename(os.path.dirname(df._datadir)))
+
+ reconstruct_fa_calls = []
+
+ def fake_reconstruct_fa(job, node, metadata):
+ reconstruct_fa_calls.append((job, node, policy, metadata))
+ if len(reconstruct_fa_calls) == 2:
+ # simulate second reconstruct failing
+ raise DiskFileError
+ content = '%s___%s' % (metadata['name'], rx_node_index)
+ return RebuildingECDiskFileStream(
+ metadata, rx_node_index, iter([content]))
+
+ # create ssync sender instance...
+ job = {'device': self.device,
+ 'partition': self.partition,
+ 'policy': policy,
+ 'frag_index': frag_index,
+ 'sync_diskfile_builder': fake_reconstruct_fa}
+ node = {'index': rx_node_index}
+ self.sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
+
+ # fake connection from tx to rx...
+ self.sender.connect = self.make_fake_ssync_connect(
+ self.sender, self.rx_controller, self.device, self.partition,
+ policy)
+
+ # run the sync protocol...
+ self.sender()
+
+ # verify protocol
+ results = self._analyze_trace(self.sender.connection.trace)
+ # sender has primary for o1, o2 and o3, o4 and ts for o5
+ self.assertEqual(5, len(results['tx_missing']))
+ # receiver is missing o1, o2 and o3 and ts for o5
+ self.assertEqual(4, len(results['rx_missing']))
+ # sender can only construct 2 out of 3 missing frags
+ self.assertEqual(3, len(results['tx_updates']))
+ self.assertEqual(3, len(reconstruct_fa_calls))
+ self.assertFalse(results['rx_updates'])
+ actual_sync_paths = []
+ for subreq in results.get('tx_updates'):
+ if subreq.get('method') == 'PUT':
+ self.assertTrue(
+ 'X-Object-Sysmeta-Ec-Frag-Index: %s' % rx_node_index
+ in subreq.get('headers'))
+ expected_body = '%s___%s' % (subreq['path'], rx_node_index)
+ self.assertEqual(expected_body, subreq['body'])
+ elif subreq.get('method') == 'DELETE':
+ self.assertEqual('/a/c/o5', subreq['path'])
+ actual_sync_paths.append(subreq.get('path'))
+
+ # remove the failed df from expected synced df's
+ expect_sync_paths = ['/a/c/o1', '/a/c/o2', '/a/c/o3', '/a/c/o5']
+ failed_path = reconstruct_fa_calls[1][3]['name']
+ expect_sync_paths.remove(failed_path)
+ failed_obj = None
+ for obj, diskfiles in tx_objs.iteritems():
+ if diskfiles[0]._name == failed_path:
+ failed_obj = obj
+ # sanity check
+ self.assertTrue(tx_objs.pop(failed_obj))
+
+ # verify on disk files...
+ self.assertEqual(sorted(expect_sync_paths), sorted(actual_sync_paths))
+ self._verify_ondisk_files(tx_objs, policy, rx_node_index)
+ self._verify_tombstones(tx_tombstones, policy)
+
+
if __name__ == '__main__':
unittest.main()