diff options
author | John Dickinson <me@not.mn> | 2015-04-14 08:10:41 -0700 |
---|---|---|
committer | John Dickinson <me@not.mn> | 2015-04-14 08:57:15 -0700 |
commit | e910f7e07d05dd2c6ada939d5704c3d4944c24b0 (patch) | |
tree | 250b33237ccbc07bf4ef5895439a0e8cac2fec11 /test/unit/obj/test_ssync_sender.py | |
parent | dd9d97458ea007024220a78dba8dd663e8b425d7 (diff) | |
parent | 8f5d4d24557887b4691fc219cefbc30e478bf7ed (diff) | |
download | swift-e910f7e07d05dd2c6ada939d5704c3d4944c24b0.tar.gz |
Merge EC feature into master
Co-Authored-By: Alistair Coles <alistair.coles@hp.com>
Co-Authored-By: Thiago da Silva <thiago@redhat.com>
Co-Authored-By: John Dickinson <me@not.mn>
Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com>
Co-Authored-By: Tushar Gohad <tushar.gohad@intel.com>
Co-Authored-By: Paul Luse <paul.e.luse@intel.com>
Co-Authored-By: Samuel Merritt <sam@swiftstack.com>
Co-Authored-By: Christian Schwede <christian.schwede@enovance.com>
Co-Authored-By: Yuan Zhou <yuan.zhou@intel.com>
Change-Id: I002787f558781bd4d884129b127bc9f108ea9ec4
Diffstat (limited to 'test/unit/obj/test_ssync_sender.py')
-rw-r--r-- | test/unit/obj/test_ssync_sender.py | 930 |
1 files changed, 791 insertions, 139 deletions
diff --git a/test/unit/obj/test_ssync_sender.py b/test/unit/obj/test_ssync_sender.py index 87efd64cc..42bd610eb 100644 --- a/test/unit/obj/test_ssync_sender.py +++ b/test/unit/obj/test_ssync_sender.py @@ -22,18 +22,24 @@ import time import unittest import eventlet +import itertools import mock from swift.common import exceptions, utils -from swift.obj import ssync_sender, diskfile +from swift.common.storage_policy import POLICIES +from swift.common.exceptions import DiskFileNotExist, DiskFileError, \ + DiskFileDeleted +from swift.common.swob import Request +from swift.common.utils import Timestamp, FileLikeIter +from swift.obj import ssync_sender, diskfile, server, ssync_receiver +from swift.obj.reconstructor import RebuildingECDiskFileStream -from test.unit import DebugLogger, patch_policies +from test.unit import debug_logger, patch_policies class FakeReplicator(object): - - def __init__(self, testdir): - self.logger = mock.MagicMock() + def __init__(self, testdir, policy=None): + self.logger = debug_logger('test-ssync-sender') self.conn_timeout = 1 self.node_timeout = 2 self.http_timeout = 3 @@ -43,7 +49,9 @@ class FakeReplicator(object): 'devices': testdir, 'mount_check': 'false', } - self._diskfile_mgr = diskfile.DiskFileManager(conf, DebugLogger()) + policy = POLICIES.default if policy is None else policy + self._diskfile_router = diskfile.DiskFileRouter(conf, self.logger) + self._diskfile_mgr = self._diskfile_router[policy] class NullBufferedHTTPConnection(object): @@ -90,39 +98,49 @@ class FakeConnection(object): self.closed = True -class TestSender(unittest.TestCase): - +class BaseTestSender(unittest.TestCase): def setUp(self): self.tmpdir = tempfile.mkdtemp() self.testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender') - self.replicator = FakeReplicator(self.testdir) - self.sender = ssync_sender.Sender(self.replicator, None, None, None) + utils.mkdirs(os.path.join(self.testdir, 'dev')) + self.daemon = FakeReplicator(self.testdir) + self.sender = ssync_sender.Sender(self.daemon, None, None, None) def tearDown(self): - shutil.rmtree(self.tmpdir, ignore_errors=1) + shutil.rmtree(self.tmpdir, ignore_errors=True) def _make_open_diskfile(self, device='dev', partition='9', account='a', container='c', obj='o', body='test', - extra_metadata=None, policy_idx=0): + extra_metadata=None, policy=None, + frag_index=None, timestamp=None, df_mgr=None): + policy = policy or POLICIES.legacy object_parts = account, container, obj - req_timestamp = utils.normalize_timestamp(time.time()) - df = self.sender.daemon._diskfile_mgr.get_diskfile( - device, partition, *object_parts, policy_idx=policy_idx) + timestamp = Timestamp(time.time()) if timestamp is None else timestamp + if df_mgr is None: + df_mgr = self.daemon._diskfile_router[policy] + df = df_mgr.get_diskfile( + device, partition, *object_parts, policy=policy, + frag_index=frag_index) content_length = len(body) etag = hashlib.md5(body).hexdigest() with df.create() as writer: writer.write(body) metadata = { - 'X-Timestamp': req_timestamp, - 'Content-Length': content_length, + 'X-Timestamp': timestamp.internal, + 'Content-Length': str(content_length), 'ETag': etag, } if extra_metadata: metadata.update(extra_metadata) writer.put(metadata) + writer.commit(timestamp) df.open() return df + +@patch_policies() +class TestSender(BaseTestSender): + def test_call_catches_MessageTimeout(self): def connect(self): @@ -134,16 +152,16 @@ class TestSender(unittest.TestCase): with mock.patch.object(ssync_sender.Sender, 'connect', connect): node = dict(replication_ip='1.2.3.4', replication_port=5678, device='sda1') - job = dict(partition='9') - self.sender = ssync_sender.Sender(self.replicator, node, job, None) + job = dict(partition='9', policy=POLICIES.legacy) + self.sender = ssync_sender.Sender(self.daemon, node, job, None) self.sender.suffixes = ['abc'] success, candidates = self.sender() self.assertFalse(success) - self.assertEquals(candidates, set()) - call = self.replicator.logger.error.mock_calls[0] - self.assertEqual( - call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9')) - self.assertEqual(str(call[1][-1]), '1 second: test connect') + self.assertEquals(candidates, {}) + error_lines = self.daemon.logger.get_lines_for_level('error') + self.assertEqual(1, len(error_lines)) + self.assertEqual('1.2.3.4:5678/sda1/9 1 second: test connect', + error_lines[0]) def test_call_catches_ReplicationException(self): @@ -153,45 +171,44 @@ class TestSender(unittest.TestCase): with mock.patch.object(ssync_sender.Sender, 'connect', connect): node = dict(replication_ip='1.2.3.4', replication_port=5678, device='sda1') - job = dict(partition='9') - self.sender = ssync_sender.Sender(self.replicator, node, job, None) + job = dict(partition='9', policy=POLICIES.legacy) + self.sender = ssync_sender.Sender(self.daemon, node, job, None) self.sender.suffixes = ['abc'] success, candidates = self.sender() self.assertFalse(success) - self.assertEquals(candidates, set()) - call = self.replicator.logger.error.mock_calls[0] - self.assertEqual( - call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9')) - self.assertEqual(str(call[1][-1]), 'test connect') + self.assertEquals(candidates, {}) + error_lines = self.daemon.logger.get_lines_for_level('error') + self.assertEqual(1, len(error_lines)) + self.assertEqual('1.2.3.4:5678/sda1/9 test connect', + error_lines[0]) def test_call_catches_other_exceptions(self): node = dict(replication_ip='1.2.3.4', replication_port=5678, device='sda1') - job = dict(partition='9') - self.sender = ssync_sender.Sender(self.replicator, node, job, None) + job = dict(partition='9', policy=POLICIES.legacy) + self.sender = ssync_sender.Sender(self.daemon, node, job, None) self.sender.suffixes = ['abc'] self.sender.connect = 'cause exception' success, candidates = self.sender() self.assertFalse(success) - self.assertEquals(candidates, set()) - call = self.replicator.logger.exception.mock_calls[0] - self.assertEqual( - call[1], - ('%s:%s/%s/%s EXCEPTION in replication.Sender', '1.2.3.4', 5678, - 'sda1', '9')) + self.assertEquals(candidates, {}) + error_lines = self.daemon.logger.get_lines_for_level('error') + for line in error_lines: + self.assertTrue(line.startswith( + '1.2.3.4:5678/sda1/9 EXCEPTION in replication.Sender:')) def test_call_catches_exception_handling_exception(self): - node = dict(replication_ip='1.2.3.4', replication_port=5678, - device='sda1') - job = None # Will cause inside exception handler to fail - self.sender = ssync_sender.Sender(self.replicator, node, job, None) + job = node = None # Will cause inside exception handler to fail + self.sender = ssync_sender.Sender(self.daemon, node, job, None) self.sender.suffixes = ['abc'] self.sender.connect = 'cause exception' success, candidates = self.sender() self.assertFalse(success) - self.assertEquals(candidates, set()) - self.replicator.logger.exception.assert_called_once_with( - 'EXCEPTION in replication.Sender') + self.assertEquals(candidates, {}) + error_lines = self.daemon.logger.get_lines_for_level('error') + for line in error_lines: + self.assertTrue(line.startswith( + 'EXCEPTION in replication.Sender')) def test_call_calls_others(self): self.sender.suffixes = ['abc'] @@ -201,7 +218,7 @@ class TestSender(unittest.TestCase): self.sender.disconnect = mock.MagicMock() success, candidates = self.sender() self.assertTrue(success) - self.assertEquals(candidates, set()) + self.assertEquals(candidates, {}) self.sender.connect.assert_called_once_with() self.sender.missing_check.assert_called_once_with() self.sender.updates.assert_called_once_with() @@ -216,18 +233,17 @@ class TestSender(unittest.TestCase): self.sender.failures = 1 success, candidates = self.sender() self.assertFalse(success) - self.assertEquals(candidates, set()) + self.assertEquals(candidates, {}) self.sender.connect.assert_called_once_with() self.sender.missing_check.assert_called_once_with() self.sender.updates.assert_called_once_with() self.sender.disconnect.assert_called_once_with() - @patch_policies def test_connect(self): node = dict(replication_ip='1.2.3.4', replication_port=5678, - device='sda1') - job = dict(partition='9', policy_idx=1) - self.sender = ssync_sender.Sender(self.replicator, node, job, None) + device='sda1', index=0) + job = dict(partition='9', policy=POLICIES[1]) + self.sender = ssync_sender.Sender(self.daemon, node, job, None) self.sender.suffixes = ['abc'] with mock.patch( 'swift.obj.ssync_sender.bufferedhttp.BufferedHTTPConnection' @@ -240,11 +256,12 @@ class TestSender(unittest.TestCase): mock_conn_class.assert_called_once_with('1.2.3.4:5678') expectations = { 'putrequest': [ - mock.call('REPLICATION', '/sda1/9'), + mock.call('SSYNC', '/sda1/9'), ], 'putheader': [ mock.call('Transfer-Encoding', 'chunked'), mock.call('X-Backend-Storage-Policy-Index', 1), + mock.call('X-Backend-Ssync-Frag-Index', 0), ], 'endheaders': [mock.call()], } @@ -255,10 +272,80 @@ class TestSender(unittest.TestCase): method_name, mock_method.mock_calls, expected_calls)) + def test_call(self): + def patch_sender(sender): + sender.connect = mock.MagicMock() + sender.missing_check = mock.MagicMock() + sender.updates = mock.MagicMock() + sender.disconnect = mock.MagicMock() + + node = dict(replication_ip='1.2.3.4', replication_port=5678, + device='sda1') + job = { + 'device': 'dev', + 'partition': '9', + 'policy': POLICIES.legacy, + 'frag_index': 0, + } + available_map = dict([('9d41d8cd98f00b204e9800998ecf0abc', + '1380144470.00000'), + ('9d41d8cd98f00b204e9800998ecf0def', + '1380144472.22222'), + ('9d41d8cd98f00b204e9800998ecf1def', + '1380144474.44444')]) + + # no suffixes -> no work done + sender = ssync_sender.Sender( + self.daemon, node, job, [], remote_check_objs=None) + patch_sender(sender) + sender.available_map = available_map + success, candidates = sender() + self.assertTrue(success) + self.assertEqual({}, candidates) + + # all objs in sync + sender = ssync_sender.Sender( + self.daemon, node, job, ['ignored'], remote_check_objs=None) + patch_sender(sender) + sender.available_map = available_map + success, candidates = sender() + self.assertTrue(success) + self.assertEqual(available_map, candidates) + + # one obj not in sync, sync'ing faked, all objs should be in return set + wanted = '9d41d8cd98f00b204e9800998ecf0def' + sender = ssync_sender.Sender( + self.daemon, node, job, ['ignored'], + remote_check_objs=None) + patch_sender(sender) + sender.send_list = [wanted] + sender.available_map = available_map + success, candidates = sender() + self.assertTrue(success) + self.assertEqual(available_map, candidates) + + # one obj not in sync, remote check only so that obj is not sync'd + # and should not be in the return set + wanted = '9d41d8cd98f00b204e9800998ecf0def' + remote_check_objs = set(available_map.keys()) + sender = ssync_sender.Sender( + self.daemon, node, job, ['ignored'], + remote_check_objs=remote_check_objs) + patch_sender(sender) + sender.send_list = [wanted] + sender.available_map = available_map + success, candidates = sender() + self.assertTrue(success) + expected_map = dict([('9d41d8cd98f00b204e9800998ecf0abc', + '1380144470.00000'), + ('9d41d8cd98f00b204e9800998ecf1def', + '1380144474.44444')]) + self.assertEqual(expected_map, candidates) + def test_call_and_missing_check(self): - def yield_hashes(device, partition, policy_index, suffixes=None): + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): if device == 'dev' and partition == '9' and suffixes == ['abc'] \ - and policy_index == 0: + and policy == POLICIES.legacy: yield ( '/srv/node/dev/objects/9/abc/' '9d41d8cd98f00b204e9800998ecf0abc', @@ -269,7 +356,12 @@ class TestSender(unittest.TestCase): 'No match for %r %r %r' % (device, partition, suffixes)) self.sender.connection = FakeConnection() - self.sender.job = {'device': 'dev', 'partition': '9'} + self.sender.job = { + 'device': 'dev', + 'partition': '9', + 'policy': POLICIES.legacy, + 'frag_index': 0, + } self.sender.suffixes = ['abc'] self.sender.response = FakeResponse( chunk_body=( @@ -282,13 +374,14 @@ class TestSender(unittest.TestCase): self.sender.disconnect = mock.MagicMock() success, candidates = self.sender() self.assertTrue(success) - self.assertEqual(candidates, set(['9d41d8cd98f00b204e9800998ecf0abc'])) + self.assertEqual(candidates, dict([('9d41d8cd98f00b204e9800998ecf0abc', + '1380144470.00000')])) self.assertEqual(self.sender.failures, 0) def test_call_and_missing_check_with_obj_list(self): - def yield_hashes(device, partition, policy_index, suffixes=None): + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): if device == 'dev' and partition == '9' and suffixes == ['abc'] \ - and policy_index == 0: + and policy == POLICIES.legacy: yield ( '/srv/node/dev/objects/9/abc/' '9d41d8cd98f00b204e9800998ecf0abc', @@ -297,8 +390,13 @@ class TestSender(unittest.TestCase): else: raise Exception( 'No match for %r %r %r' % (device, partition, suffixes)) - job = {'device': 'dev', 'partition': '9'} - self.sender = ssync_sender.Sender(self.replicator, None, job, ['abc'], + job = { + 'device': 'dev', + 'partition': '9', + 'policy': POLICIES.legacy, + 'frag_index': 0, + } + self.sender = ssync_sender.Sender(self.daemon, None, job, ['abc'], ['9d41d8cd98f00b204e9800998ecf0abc']) self.sender.connection = FakeConnection() self.sender.response = FakeResponse( @@ -311,13 +409,14 @@ class TestSender(unittest.TestCase): self.sender.disconnect = mock.MagicMock() success, candidates = self.sender() self.assertTrue(success) - self.assertEqual(candidates, set(['9d41d8cd98f00b204e9800998ecf0abc'])) + self.assertEqual(candidates, dict([('9d41d8cd98f00b204e9800998ecf0abc', + '1380144470.00000')])) self.assertEqual(self.sender.failures, 0) def test_call_and_missing_check_with_obj_list_but_required(self): - def yield_hashes(device, partition, policy_index, suffixes=None): + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): if device == 'dev' and partition == '9' and suffixes == ['abc'] \ - and policy_index == 0: + and policy == POLICIES.legacy: yield ( '/srv/node/dev/objects/9/abc/' '9d41d8cd98f00b204e9800998ecf0abc', @@ -326,8 +425,13 @@ class TestSender(unittest.TestCase): else: raise Exception( 'No match for %r %r %r' % (device, partition, suffixes)) - job = {'device': 'dev', 'partition': '9'} - self.sender = ssync_sender.Sender(self.replicator, None, job, ['abc'], + job = { + 'device': 'dev', + 'partition': '9', + 'policy': POLICIES.legacy, + 'frag_index': 0, + } + self.sender = ssync_sender.Sender(self.daemon, None, job, ['abc'], ['9d41d8cd98f00b204e9800998ecf0abc']) self.sender.connection = FakeConnection() self.sender.response = FakeResponse( @@ -341,14 +445,14 @@ class TestSender(unittest.TestCase): self.sender.disconnect = mock.MagicMock() success, candidates = self.sender() self.assertTrue(success) - self.assertEqual(candidates, set()) + self.assertEqual(candidates, {}) def test_connect_send_timeout(self): - self.replicator.conn_timeout = 0.01 + self.daemon.conn_timeout = 0.01 node = dict(replication_ip='1.2.3.4', replication_port=5678, device='sda1') - job = dict(partition='9') - self.sender = ssync_sender.Sender(self.replicator, node, job, None) + job = dict(partition='9', policy=POLICIES.legacy) + self.sender = ssync_sender.Sender(self.daemon, node, job, None) self.sender.suffixes = ['abc'] def putrequest(*args, **kwargs): @@ -359,18 +463,18 @@ class TestSender(unittest.TestCase): 'putrequest', putrequest): success, candidates = self.sender() self.assertFalse(success) - self.assertEquals(candidates, set()) - call = self.replicator.logger.error.mock_calls[0] - self.assertEqual( - call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9')) - self.assertEqual(str(call[1][-1]), '0.01 seconds: connect send') + self.assertEquals(candidates, {}) + error_lines = self.daemon.logger.get_lines_for_level('error') + for line in error_lines: + self.assertTrue(line.startswith( + '1.2.3.4:5678/sda1/9 0.01 seconds: connect send')) def test_connect_receive_timeout(self): - self.replicator.node_timeout = 0.02 + self.daemon.node_timeout = 0.02 node = dict(replication_ip='1.2.3.4', replication_port=5678, - device='sda1') - job = dict(partition='9') - self.sender = ssync_sender.Sender(self.replicator, node, job, None) + device='sda1', index=0) + job = dict(partition='9', policy=POLICIES.legacy) + self.sender = ssync_sender.Sender(self.daemon, node, job, None) self.sender.suffixes = ['abc'] class FakeBufferedHTTPConnection(NullBufferedHTTPConnection): @@ -383,18 +487,18 @@ class TestSender(unittest.TestCase): FakeBufferedHTTPConnection): success, candidates = self.sender() self.assertFalse(success) - self.assertEquals(candidates, set()) - call = self.replicator.logger.error.mock_calls[0] - self.assertEqual( - call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9')) - self.assertEqual(str(call[1][-1]), '0.02 seconds: connect receive') + self.assertEquals(candidates, {}) + error_lines = self.daemon.logger.get_lines_for_level('error') + for line in error_lines: + self.assertTrue(line.startswith( + '1.2.3.4:5678/sda1/9 0.02 seconds: connect receive')) def test_connect_bad_status(self): - self.replicator.node_timeout = 0.02 + self.daemon.node_timeout = 0.02 node = dict(replication_ip='1.2.3.4', replication_port=5678, - device='sda1') - job = dict(partition='9') - self.sender = ssync_sender.Sender(self.replicator, node, job, None) + device='sda1', index=0) + job = dict(partition='9', policy=POLICIES.legacy) + self.sender = ssync_sender.Sender(self.daemon, node, job, None) self.sender.suffixes = ['abc'] class FakeBufferedHTTPConnection(NullBufferedHTTPConnection): @@ -408,11 +512,11 @@ class TestSender(unittest.TestCase): FakeBufferedHTTPConnection): success, candidates = self.sender() self.assertFalse(success) - self.assertEquals(candidates, set()) - call = self.replicator.logger.error.mock_calls[0] - self.assertEqual( - call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9')) - self.assertEqual(str(call[1][-1]), 'Expected status 200; got 503') + self.assertEquals(candidates, {}) + error_lines = self.daemon.logger.get_lines_for_level('error') + for line in error_lines: + self.assertTrue(line.startswith( + '1.2.3.4:5678/sda1/9 Expected status 200; got 503')) def test_readline_newline_in_buffer(self): self.sender.response_buffer = 'Has a newline already.\r\nOkay.' @@ -420,7 +524,7 @@ class TestSender(unittest.TestCase): self.assertEqual(self.sender.response_buffer, 'Okay.') def test_readline_buffer_exceeds_network_chunk_size_somehow(self): - self.replicator.network_chunk_size = 2 + self.daemon.network_chunk_size = 2 self.sender.response_buffer = '1234567890' self.assertEqual(self.sender.readline(), '1234567890') self.assertEqual(self.sender.response_buffer, '') @@ -473,16 +577,21 @@ class TestSender(unittest.TestCase): self.assertRaises(exceptions.MessageTimeout, self.sender.missing_check) def test_missing_check_has_empty_suffixes(self): - def yield_hashes(device, partition, policy_idx, suffixes=None): - if (device != 'dev' or partition != '9' or policy_idx != 0 or + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): + if (device != 'dev' or partition != '9' or + policy != POLICIES.legacy or suffixes != ['abc', 'def']): yield # Just here to make this a generator raise Exception( 'No match for %r %r %r %r' % (device, partition, - policy_idx, suffixes)) + policy, suffixes)) self.sender.connection = FakeConnection() - self.sender.job = {'device': 'dev', 'partition': '9'} + self.sender.job = { + 'device': 'dev', + 'partition': '9', + 'policy': POLICIES.legacy, + } self.sender.suffixes = ['abc', 'def'] self.sender.response = FakeResponse( chunk_body=( @@ -495,11 +604,12 @@ class TestSender(unittest.TestCase): '17\r\n:MISSING_CHECK: START\r\n\r\n' '15\r\n:MISSING_CHECK: END\r\n\r\n') self.assertEqual(self.sender.send_list, []) - self.assertEqual(self.sender.available_set, set()) + self.assertEqual(self.sender.available_map, {}) def test_missing_check_has_suffixes(self): - def yield_hashes(device, partition, policy_idx, suffixes=None): - if (device == 'dev' and partition == '9' and policy_idx == 0 and + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): + if (device == 'dev' and partition == '9' and + policy == POLICIES.legacy and suffixes == ['abc', 'def']): yield ( '/srv/node/dev/objects/9/abc/' @@ -519,10 +629,14 @@ class TestSender(unittest.TestCase): else: raise Exception( 'No match for %r %r %r %r' % (device, partition, - policy_idx, suffixes)) + policy, suffixes)) self.sender.connection = FakeConnection() - self.sender.job = {'device': 'dev', 'partition': '9'} + self.sender.job = { + 'device': 'dev', + 'partition': '9', + 'policy': POLICIES.legacy, + } self.sender.suffixes = ['abc', 'def'] self.sender.response = FakeResponse( chunk_body=( @@ -538,14 +652,15 @@ class TestSender(unittest.TestCase): '33\r\n9d41d8cd98f00b204e9800998ecf1def 1380144474.44444\r\n\r\n' '15\r\n:MISSING_CHECK: END\r\n\r\n') self.assertEqual(self.sender.send_list, []) - candidates = ['9d41d8cd98f00b204e9800998ecf0abc', - '9d41d8cd98f00b204e9800998ecf0def', - '9d41d8cd98f00b204e9800998ecf1def'] - self.assertEqual(self.sender.available_set, set(candidates)) + candidates = [('9d41d8cd98f00b204e9800998ecf0abc', '1380144470.00000'), + ('9d41d8cd98f00b204e9800998ecf0def', '1380144472.22222'), + ('9d41d8cd98f00b204e9800998ecf1def', '1380144474.44444')] + self.assertEqual(self.sender.available_map, dict(candidates)) def test_missing_check_far_end_disconnect(self): - def yield_hashes(device, partition, policy_idx, suffixes=None): - if (device == 'dev' and partition == '9' and policy_idx == 0 and + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): + if (device == 'dev' and partition == '9' and + policy == POLICIES.legacy and suffixes == ['abc']): yield ( '/srv/node/dev/objects/9/abc/' @@ -555,10 +670,14 @@ class TestSender(unittest.TestCase): else: raise Exception( 'No match for %r %r %r %r' % (device, partition, - policy_idx, suffixes)) + policy, suffixes)) self.sender.connection = FakeConnection() - self.sender.job = {'device': 'dev', 'partition': '9'} + self.sender.job = { + 'device': 'dev', + 'partition': '9', + 'policy': POLICIES.legacy, + } self.sender.suffixes = ['abc'] self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes self.sender.response = FakeResponse(chunk_body='\r\n') @@ -573,12 +692,14 @@ class TestSender(unittest.TestCase): '17\r\n:MISSING_CHECK: START\r\n\r\n' '33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n' '15\r\n:MISSING_CHECK: END\r\n\r\n') - self.assertEqual(self.sender.available_set, - set(['9d41d8cd98f00b204e9800998ecf0abc'])) + self.assertEqual(self.sender.available_map, + dict([('9d41d8cd98f00b204e9800998ecf0abc', + '1380144470.00000')])) def test_missing_check_far_end_disconnect2(self): - def yield_hashes(device, partition, policy_idx, suffixes=None): - if (device == 'dev' and partition == '9' and policy_idx == 0 and + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): + if (device == 'dev' and partition == '9' and + policy == POLICIES.legacy and suffixes == ['abc']): yield ( '/srv/node/dev/objects/9/abc/' @@ -588,10 +709,14 @@ class TestSender(unittest.TestCase): else: raise Exception( 'No match for %r %r %r %r' % (device, partition, - policy_idx, suffixes)) + policy, suffixes)) self.sender.connection = FakeConnection() - self.sender.job = {'device': 'dev', 'partition': '9'} + self.sender.job = { + 'device': 'dev', + 'partition': '9', + 'policy': POLICIES.legacy, + } self.sender.suffixes = ['abc'] self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes self.sender.response = FakeResponse( @@ -607,12 +732,14 @@ class TestSender(unittest.TestCase): '17\r\n:MISSING_CHECK: START\r\n\r\n' '33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n' '15\r\n:MISSING_CHECK: END\r\n\r\n') - self.assertEqual(self.sender.available_set, - set(['9d41d8cd98f00b204e9800998ecf0abc'])) + self.assertEqual(self.sender.available_map, + dict([('9d41d8cd98f00b204e9800998ecf0abc', + '1380144470.00000')])) def test_missing_check_far_end_unexpected(self): - def yield_hashes(device, partition, policy_idx, suffixes=None): - if (device == 'dev' and partition == '9' and policy_idx == 0 and + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): + if (device == 'dev' and partition == '9' and + policy == POLICIES.legacy and suffixes == ['abc']): yield ( '/srv/node/dev/objects/9/abc/' @@ -622,10 +749,14 @@ class TestSender(unittest.TestCase): else: raise Exception( 'No match for %r %r %r %r' % (device, partition, - policy_idx, suffixes)) + policy, suffixes)) self.sender.connection = FakeConnection() - self.sender.job = {'device': 'dev', 'partition': '9'} + self.sender.job = { + 'device': 'dev', + 'partition': '9', + 'policy': POLICIES.legacy, + } self.sender.suffixes = ['abc'] self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes self.sender.response = FakeResponse(chunk_body='OH HAI\r\n') @@ -640,12 +771,14 @@ class TestSender(unittest.TestCase): '17\r\n:MISSING_CHECK: START\r\n\r\n' '33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n' '15\r\n:MISSING_CHECK: END\r\n\r\n') - self.assertEqual(self.sender.available_set, - set(['9d41d8cd98f00b204e9800998ecf0abc'])) + self.assertEqual(self.sender.available_map, + dict([('9d41d8cd98f00b204e9800998ecf0abc', + '1380144470.00000')])) def test_missing_check_send_list(self): - def yield_hashes(device, partition, policy_idx, suffixes=None): - if (device == 'dev' and partition == '9' and policy_idx == 0 and + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): + if (device == 'dev' and partition == '9' and + policy == POLICIES.legacy and suffixes == ['abc']): yield ( '/srv/node/dev/objects/9/abc/' @@ -655,10 +788,14 @@ class TestSender(unittest.TestCase): else: raise Exception( 'No match for %r %r %r %r' % (device, partition, - policy_idx, suffixes)) + policy, suffixes)) self.sender.connection = FakeConnection() - self.sender.job = {'device': 'dev', 'partition': '9'} + self.sender.job = { + 'device': 'dev', + 'partition': '9', + 'policy': POLICIES.legacy, + } self.sender.suffixes = ['abc'] self.sender.response = FakeResponse( chunk_body=( @@ -673,8 +810,45 @@ class TestSender(unittest.TestCase): '33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n' '15\r\n:MISSING_CHECK: END\r\n\r\n') self.assertEqual(self.sender.send_list, ['0123abc']) - self.assertEqual(self.sender.available_set, - set(['9d41d8cd98f00b204e9800998ecf0abc'])) + self.assertEqual(self.sender.available_map, + dict([('9d41d8cd98f00b204e9800998ecf0abc', + '1380144470.00000')])) + + def test_missing_check_extra_line_parts(self): + # check that sender tolerates extra parts in missing check + # line responses to allow for protocol upgrades + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): + if (device == 'dev' and partition == '9' and + policy == POLICIES.legacy and + suffixes == ['abc']): + yield ( + '/srv/node/dev/objects/9/abc/' + '9d41d8cd98f00b204e9800998ecf0abc', + '9d41d8cd98f00b204e9800998ecf0abc', + '1380144470.00000') + else: + raise Exception( + 'No match for %r %r %r %r' % (device, partition, + policy, suffixes)) + + self.sender.connection = FakeConnection() + self.sender.job = { + 'device': 'dev', + 'partition': '9', + 'policy': POLICIES.legacy, + } + self.sender.suffixes = ['abc'] + self.sender.response = FakeResponse( + chunk_body=( + ':MISSING_CHECK: START\r\n' + '0123abc extra response parts\r\n' + ':MISSING_CHECK: END\r\n')) + self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes + self.sender.missing_check() + self.assertEqual(self.sender.send_list, ['0123abc']) + self.assertEqual(self.sender.available_map, + dict([('9d41d8cd98f00b204e9800998ecf0abc', + '1380144470.00000')])) def test_updates_timeout(self): self.sender.connection = FakeConnection() @@ -742,7 +916,12 @@ class TestSender(unittest.TestCase): delete_timestamp = utils.normalize_timestamp(time.time()) df.delete(delete_timestamp) self.sender.connection = FakeConnection() - self.sender.job = {'device': device, 'partition': part} + self.sender.job = { + 'device': device, + 'partition': part, + 'policy': POLICIES.legacy, + 'frag_index': 0, + } self.sender.node = {} self.sender.send_list = [object_hash] self.sender.send_delete = mock.MagicMock() @@ -771,7 +950,12 @@ class TestSender(unittest.TestCase): delete_timestamp = utils.normalize_timestamp(time.time()) df.delete(delete_timestamp) self.sender.connection = FakeConnection() - self.sender.job = {'device': device, 'partition': part} + self.sender.job = { + 'device': device, + 'partition': part, + 'policy': POLICIES.legacy, + 'frag_index': 0, + } self.sender.node = {} self.sender.send_list = [object_hash] self.sender.response = FakeResponse( @@ -797,7 +981,12 @@ class TestSender(unittest.TestCase): object_hash = utils.hash_path(*object_parts) expected = df.get_metadata() self.sender.connection = FakeConnection() - self.sender.job = {'device': device, 'partition': part} + self.sender.job = { + 'device': device, + 'partition': part, + 'policy': POLICIES.legacy, + 'frag_index': 0, + } self.sender.node = {} self.sender.send_list = [object_hash] self.sender.send_delete = mock.MagicMock() @@ -821,18 +1010,20 @@ class TestSender(unittest.TestCase): '11\r\n:UPDATES: START\r\n\r\n' 'f\r\n:UPDATES: END\r\n\r\n') - @patch_policies def test_updates_storage_policy_index(self): device = 'dev' part = '9' object_parts = ('a', 'c', 'o') df = self._make_open_diskfile(device, part, *object_parts, - policy_idx=1) + policy=POLICIES[0]) object_hash = utils.hash_path(*object_parts) expected = df.get_metadata() self.sender.connection = FakeConnection() - self.sender.job = {'device': device, 'partition': part, - 'policy_idx': 1} + self.sender.job = { + 'device': device, + 'partition': part, + 'policy': POLICIES[0], + 'frag_index': 0} self.sender.node = {} self.sender.send_list = [object_hash] self.sender.send_delete = mock.MagicMock() @@ -847,7 +1038,7 @@ class TestSender(unittest.TestCase): self.assertEqual(path, '/a/c/o') self.assert_(isinstance(df, diskfile.DiskFile)) self.assertEqual(expected, df.get_metadata()) - self.assertEqual(os.path.join(self.testdir, 'dev/objects-1/9/', + self.assertEqual(os.path.join(self.testdir, 'dev/objects/9/', object_hash[-3:], object_hash), df._datadir) @@ -1054,5 +1245,466 @@ class TestSender(unittest.TestCase): self.assertTrue(self.sender.connection.closed) +@patch_policies(with_ec_default=True) +class TestSsync(BaseTestSender): + """ + Test interactions between sender and receiver. The basis for each test is + actual diskfile state on either side - the connection between sender and + receiver is faked. Assertions are made about the final state of the sender + and receiver diskfiles. + """ + + def make_fake_ssync_connect(self, sender, rx_obj_controller, device, + partition, policy): + trace = [] + + def add_trace(type, msg): + # record a protocol event for later analysis + if msg.strip(): + trace.append((type, msg.strip())) + + def start_response(status, headers, exc_info=None): + assert(status == '200 OK') + + class FakeConnection: + def __init__(self, trace): + self.trace = trace + self.queue = [] + self.src = FileLikeIter(self.queue) + + def send(self, msg): + msg = msg.split('\r\n', 1)[1] + msg = msg.rsplit('\r\n', 1)[0] + add_trace('tx', msg) + self.queue.append(msg) + + def close(self): + pass + + def wrap_gen(gen): + # Strip response head and tail + while True: + try: + msg = gen.next() + if msg: + add_trace('rx', msg) + msg = '%x\r\n%s\r\n' % (len(msg), msg) + yield msg + except StopIteration: + break + + def fake_connect(): + sender.connection = FakeConnection(trace) + headers = {'Transfer-Encoding': 'chunked', + 'X-Backend-Storage-Policy-Index': str(int(policy))} + env = {'REQUEST_METHOD': 'SSYNC'} + path = '/%s/%s' % (device, partition) + req = Request.blank(path, environ=env, headers=headers) + req.environ['wsgi.input'] = sender.connection.src + resp = rx_obj_controller(req.environ, start_response) + wrapped_gen = wrap_gen(resp) + sender.response = FileLikeIter(wrapped_gen) + sender.response.fp = sender.response + return fake_connect + + def setUp(self): + self.device = 'dev' + self.partition = '9' + self.tmpdir = tempfile.mkdtemp() + # sender side setup + self.tx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender') + utils.mkdirs(os.path.join(self.tx_testdir, self.device)) + self.daemon = FakeReplicator(self.tx_testdir) + + # rx side setup + self.rx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_receiver') + utils.mkdirs(os.path.join(self.rx_testdir, self.device)) + conf = { + 'devices': self.rx_testdir, + 'mount_check': 'false', + 'replication_one_per_device': 'false', + 'log_requests': 'false'} + self.rx_controller = server.ObjectController(conf) + self.orig_ensure_flush = ssync_receiver.Receiver._ensure_flush + ssync_receiver.Receiver._ensure_flush = lambda *args: '' + self.ts_iter = (Timestamp(t) + for t in itertools.count(int(time.time()))) + + def tearDown(self): + if self.orig_ensure_flush: + ssync_receiver.Receiver._ensure_flush = self.orig_ensure_flush + shutil.rmtree(self.tmpdir, ignore_errors=True) + + def _create_ondisk_files(self, df_mgr, obj_name, policy, timestamp, + frag_indexes=None): + frag_indexes = [] if frag_indexes is None else frag_indexes + metadata = {'Content-Type': 'plain/text'} + diskfiles = [] + for frag_index in frag_indexes: + object_data = '/a/c/%s___%s' % (obj_name, frag_index) + if frag_index is not None: + metadata['X-Object-Sysmeta-Ec-Frag-Index'] = str(frag_index) + df = self._make_open_diskfile( + device=self.device, partition=self.partition, account='a', + container='c', obj=obj_name, body=object_data, + extra_metadata=metadata, timestamp=timestamp, policy=policy, + frag_index=frag_index, df_mgr=df_mgr) + # sanity checks + listing = os.listdir(df._datadir) + self.assertTrue(listing) + for filename in listing: + self.assertTrue(filename.startswith(timestamp.internal)) + diskfiles.append(df) + return diskfiles + + def _open_tx_diskfile(self, obj_name, policy, frag_index=None): + df_mgr = self.daemon._diskfile_router[policy] + df = df_mgr.get_diskfile( + self.device, self.partition, account='a', container='c', + obj=obj_name, policy=policy, frag_index=frag_index) + df.open() + return df + + def _open_rx_diskfile(self, obj_name, policy, frag_index=None): + df = self.rx_controller.get_diskfile( + self.device, self.partition, 'a', 'c', obj_name, policy=policy, + frag_index=frag_index) + df.open() + return df + + def _verify_diskfile_sync(self, tx_df, rx_df, frag_index): + # verify that diskfiles' metadata match + # sanity check, they are not the same ondisk files! + self.assertNotEqual(tx_df._datadir, rx_df._datadir) + rx_metadata = dict(rx_df.get_metadata()) + for k, v in tx_df.get_metadata().iteritems(): + self.assertEqual(v, rx_metadata.pop(k)) + # ugh, ssync duplicates ETag with Etag so have to clear it out here + if 'Etag' in rx_metadata: + rx_metadata.pop('Etag') + self.assertFalse(rx_metadata) + if frag_index: + rx_metadata = rx_df.get_metadata() + fi_key = 'X-Object-Sysmeta-Ec-Frag-Index' + self.assertTrue(fi_key in rx_metadata) + self.assertEqual(frag_index, int(rx_metadata[fi_key])) + + def _analyze_trace(self, trace): + """ + Parse protocol trace captured by fake connection, making some + assertions along the way, and return results as a dict of form: + results = {'tx_missing': <list of messages>, + 'rx_missing': <list of messages>, + 'tx_updates': <list of subreqs>, + 'rx_updates': <list of messages>} + + Each subreq is a dict with keys: 'method', 'path', 'headers', 'body' + """ + def tx_missing(results, line): + self.assertEqual('tx', line[0]) + results['tx_missing'].append(line[1]) + + def rx_missing(results, line): + self.assertEqual('rx', line[0]) + parts = line[1].split('\r\n') + for part in parts: + results['rx_missing'].append(part) + + def tx_updates(results, line): + self.assertEqual('tx', line[0]) + subrequests = results['tx_updates'] + if line[1].startswith(('PUT', 'DELETE')): + parts = line[1].split('\r\n') + method, path = parts[0].split() + subreq = {'method': method, 'path': path, 'req': line[1], + 'headers': parts[1:]} + subrequests.append(subreq) + else: + self.assertTrue(subrequests) + body = (subrequests[-1]).setdefault('body', '') + body += line[1] + subrequests[-1]['body'] = body + + def rx_updates(results, line): + self.assertEqual('rx', line[0]) + results.setdefault['rx_updates'].append(line[1]) + + def unexpected(results, line): + results.setdefault('unexpected', []).append(line) + + # each trace line is a tuple of ([tx|rx], msg) + handshakes = iter([(('tx', ':MISSING_CHECK: START'), tx_missing), + (('tx', ':MISSING_CHECK: END'), unexpected), + (('rx', ':MISSING_CHECK: START'), rx_missing), + (('rx', ':MISSING_CHECK: END'), unexpected), + (('tx', ':UPDATES: START'), tx_updates), + (('tx', ':UPDATES: END'), unexpected), + (('rx', ':UPDATES: START'), rx_updates), + (('rx', ':UPDATES: END'), unexpected)]) + expect_handshake = handshakes.next() + phases = ('tx_missing', 'rx_missing', 'tx_updates', 'rx_updates') + results = dict((k, []) for k in phases) + handler = unexpected + lines = list(trace) + lines.reverse() + while lines: + line = lines.pop() + if line == expect_handshake[0]: + handler = expect_handshake[1] + try: + expect_handshake = handshakes.next() + except StopIteration: + # should be the last line + self.assertFalse( + lines, 'Unexpected trailing lines %s' % lines) + continue + handler(results, line) + + try: + # check all handshakes occurred + missed = handshakes.next() + self.fail('Handshake %s not found' % str(missed[0])) + except StopIteration: + pass + # check no message outside of a phase + self.assertFalse(results.get('unexpected'), + 'Message outside of a phase: %s' % results.get(None)) + return results + + def _verify_ondisk_files(self, tx_objs, policy, rx_node_index): + # verify tx and rx files that should be in sync + for o_name, diskfiles in tx_objs.iteritems(): + for tx_df in diskfiles: + frag_index = tx_df._frag_index + if frag_index == rx_node_index: + # this frag_index should have been sync'd, + # check rx file is ok + rx_df = self._open_rx_diskfile(o_name, policy, frag_index) + self._verify_diskfile_sync(tx_df, rx_df, frag_index) + expected_body = '/a/c/%s___%s' % (o_name, rx_node_index) + actual_body = ''.join([chunk for chunk in rx_df.reader()]) + self.assertEqual(expected_body, actual_body) + else: + # this frag_index should not have been sync'd, + # check no rx file, + self.assertRaises(DiskFileNotExist, + self._open_rx_diskfile, + o_name, policy, frag_index=frag_index) + # check tx file still intact - ssync does not do any cleanup! + self._open_tx_diskfile(o_name, policy, frag_index) + + def _verify_tombstones(self, tx_objs, policy): + # verify tx and rx tombstones that should be in sync + for o_name, diskfiles in tx_objs.iteritems(): + for tx_df_ in diskfiles: + try: + self._open_tx_diskfile(o_name, policy) + self.fail('DiskFileDeleted expected') + except DiskFileDeleted as exc: + tx_delete_time = exc.timestamp + try: + self._open_rx_diskfile(o_name, policy) + self.fail('DiskFileDeleted expected') + except DiskFileDeleted as exc: + rx_delete_time = exc.timestamp + self.assertEqual(tx_delete_time, rx_delete_time) + + def test_handoff_fragment_revert(self): + # test that a sync_revert type job does send the correct frag archives + # to the receiver, and that those frag archives are then removed from + # local node. + policy = POLICIES.default + rx_node_index = 0 + tx_node_index = 1 + frag_index = rx_node_index + + # create sender side diskfiles... + tx_objs = {} + rx_objs = {} + tx_tombstones = {} + tx_df_mgr = self.daemon._diskfile_router[policy] + rx_df_mgr = self.rx_controller._diskfile_router[policy] + # o1 has primary and handoff fragment archives + t1 = self.ts_iter.next() + tx_objs['o1'] = self._create_ondisk_files( + tx_df_mgr, 'o1', policy, t1, (rx_node_index, tx_node_index)) + # o2 only has primary + t2 = self.ts_iter.next() + tx_objs['o2'] = self._create_ondisk_files( + tx_df_mgr, 'o2', policy, t2, (tx_node_index,)) + # o3 only has handoff + t3 = self.ts_iter.next() + tx_objs['o3'] = self._create_ondisk_files( + tx_df_mgr, 'o3', policy, t3, (rx_node_index,)) + # o4 primary and handoff fragment archives on tx, handoff in sync on rx + t4 = self.ts_iter.next() + tx_objs['o4'] = self._create_ondisk_files( + tx_df_mgr, 'o4', policy, t4, (tx_node_index, rx_node_index,)) + rx_objs['o4'] = self._create_ondisk_files( + rx_df_mgr, 'o4', policy, t4, (rx_node_index,)) + # o5 is a tombstone, missing on receiver + t5 = self.ts_iter.next() + tx_tombstones['o5'] = self._create_ondisk_files( + tx_df_mgr, 'o5', policy, t5, (tx_node_index,)) + tx_tombstones['o5'][0].delete(t5) + + suffixes = set() + for diskfiles in (tx_objs.values() + tx_tombstones.values()): + for df in diskfiles: + suffixes.add(os.path.basename(os.path.dirname(df._datadir))) + + # create ssync sender instance... + job = {'device': self.device, + 'partition': self.partition, + 'policy': policy, + 'frag_index': frag_index, + 'purge': True} + node = {'index': rx_node_index} + self.sender = ssync_sender.Sender(self.daemon, node, job, suffixes) + # fake connection from tx to rx... + self.sender.connect = self.make_fake_ssync_connect( + self.sender, self.rx_controller, self.device, self.partition, + policy) + + # run the sync protocol... + self.sender() + + # verify protocol + results = self._analyze_trace(self.sender.connection.trace) + # sender has handoff frags for o1, o3 and o4 and ts for o5 + self.assertEqual(4, len(results['tx_missing'])) + # receiver is missing frags for o1, o3 and ts for o5 + self.assertEqual(3, len(results['rx_missing'])) + self.assertEqual(3, len(results['tx_updates'])) + self.assertFalse(results['rx_updates']) + sync_paths = [] + for subreq in results.get('tx_updates'): + if subreq.get('method') == 'PUT': + self.assertTrue( + 'X-Object-Sysmeta-Ec-Frag-Index: %s' % rx_node_index + in subreq.get('headers')) + expected_body = '%s___%s' % (subreq['path'], rx_node_index) + self.assertEqual(expected_body, subreq['body']) + elif subreq.get('method') == 'DELETE': + self.assertEqual('/a/c/o5', subreq['path']) + sync_paths.append(subreq.get('path')) + self.assertEqual(['/a/c/o1', '/a/c/o3', '/a/c/o5'], sorted(sync_paths)) + + # verify on disk files... + self._verify_ondisk_files(tx_objs, policy, rx_node_index) + self._verify_tombstones(tx_tombstones, policy) + + def test_fragment_sync(self): + # check that a sync_only type job does call reconstructor to build a + # diskfile to send, and continues making progress despite an error + # when building one diskfile + policy = POLICIES.default + rx_node_index = 0 + tx_node_index = 1 + # for a sync job we iterate over frag index that belongs on local node + frag_index = tx_node_index + + # create sender side diskfiles... + tx_objs = {} + tx_tombstones = {} + rx_objs = {} + tx_df_mgr = self.daemon._diskfile_router[policy] + rx_df_mgr = self.rx_controller._diskfile_router[policy] + # o1 only has primary + t1 = self.ts_iter.next() + tx_objs['o1'] = self._create_ondisk_files( + tx_df_mgr, 'o1', policy, t1, (tx_node_index,)) + # o2 only has primary + t2 = self.ts_iter.next() + tx_objs['o2'] = self._create_ondisk_files( + tx_df_mgr, 'o2', policy, t2, (tx_node_index,)) + # o3 only has primary + t3 = self.ts_iter.next() + tx_objs['o3'] = self._create_ondisk_files( + tx_df_mgr, 'o3', policy, t3, (tx_node_index,)) + # o4 primary fragment archives on tx, handoff in sync on rx + t4 = self.ts_iter.next() + tx_objs['o4'] = self._create_ondisk_files( + tx_df_mgr, 'o4', policy, t4, (tx_node_index,)) + rx_objs['o4'] = self._create_ondisk_files( + rx_df_mgr, 'o4', policy, t4, (rx_node_index,)) + # o5 is a tombstone, missing on receiver + t5 = self.ts_iter.next() + tx_tombstones['o5'] = self._create_ondisk_files( + tx_df_mgr, 'o5', policy, t5, (tx_node_index,)) + tx_tombstones['o5'][0].delete(t5) + + suffixes = set() + for diskfiles in (tx_objs.values() + tx_tombstones.values()): + for df in diskfiles: + suffixes.add(os.path.basename(os.path.dirname(df._datadir))) + + reconstruct_fa_calls = [] + + def fake_reconstruct_fa(job, node, metadata): + reconstruct_fa_calls.append((job, node, policy, metadata)) + if len(reconstruct_fa_calls) == 2: + # simulate second reconstruct failing + raise DiskFileError + content = '%s___%s' % (metadata['name'], rx_node_index) + return RebuildingECDiskFileStream( + metadata, rx_node_index, iter([content])) + + # create ssync sender instance... + job = {'device': self.device, + 'partition': self.partition, + 'policy': policy, + 'frag_index': frag_index, + 'sync_diskfile_builder': fake_reconstruct_fa} + node = {'index': rx_node_index} + self.sender = ssync_sender.Sender(self.daemon, node, job, suffixes) + + # fake connection from tx to rx... + self.sender.connect = self.make_fake_ssync_connect( + self.sender, self.rx_controller, self.device, self.partition, + policy) + + # run the sync protocol... + self.sender() + + # verify protocol + results = self._analyze_trace(self.sender.connection.trace) + # sender has primary for o1, o2 and o3, o4 and ts for o5 + self.assertEqual(5, len(results['tx_missing'])) + # receiver is missing o1, o2 and o3 and ts for o5 + self.assertEqual(4, len(results['rx_missing'])) + # sender can only construct 2 out of 3 missing frags + self.assertEqual(3, len(results['tx_updates'])) + self.assertEqual(3, len(reconstruct_fa_calls)) + self.assertFalse(results['rx_updates']) + actual_sync_paths = [] + for subreq in results.get('tx_updates'): + if subreq.get('method') == 'PUT': + self.assertTrue( + 'X-Object-Sysmeta-Ec-Frag-Index: %s' % rx_node_index + in subreq.get('headers')) + expected_body = '%s___%s' % (subreq['path'], rx_node_index) + self.assertEqual(expected_body, subreq['body']) + elif subreq.get('method') == 'DELETE': + self.assertEqual('/a/c/o5', subreq['path']) + actual_sync_paths.append(subreq.get('path')) + + # remove the failed df from expected synced df's + expect_sync_paths = ['/a/c/o1', '/a/c/o2', '/a/c/o3', '/a/c/o5'] + failed_path = reconstruct_fa_calls[1][3]['name'] + expect_sync_paths.remove(failed_path) + failed_obj = None + for obj, diskfiles in tx_objs.iteritems(): + if diskfiles[0]._name == failed_path: + failed_obj = obj + # sanity check + self.assertTrue(tx_objs.pop(failed_obj)) + + # verify on disk files... + self.assertEqual(sorted(expect_sync_paths), sorted(actual_sync_paths)) + self._verify_ondisk_files(tx_objs, policy, rx_node_index) + self._verify_tombstones(tx_tombstones, policy) + + if __name__ == '__main__': unittest.main() |