summaryrefslogtreecommitdiff
path: root/test/unit/obj/test_ssync_sender.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/unit/obj/test_ssync_sender.py')
-rw-r--r--test/unit/obj/test_ssync_sender.py930
1 files changed, 791 insertions, 139 deletions
diff --git a/test/unit/obj/test_ssync_sender.py b/test/unit/obj/test_ssync_sender.py
index 87efd64cc..42bd610eb 100644
--- a/test/unit/obj/test_ssync_sender.py
+++ b/test/unit/obj/test_ssync_sender.py
@@ -22,18 +22,24 @@ import time
import unittest
import eventlet
+import itertools
import mock
from swift.common import exceptions, utils
-from swift.obj import ssync_sender, diskfile
+from swift.common.storage_policy import POLICIES
+from swift.common.exceptions import DiskFileNotExist, DiskFileError, \
+ DiskFileDeleted
+from swift.common.swob import Request
+from swift.common.utils import Timestamp, FileLikeIter
+from swift.obj import ssync_sender, diskfile, server, ssync_receiver
+from swift.obj.reconstructor import RebuildingECDiskFileStream
-from test.unit import DebugLogger, patch_policies
+from test.unit import debug_logger, patch_policies
class FakeReplicator(object):
-
- def __init__(self, testdir):
- self.logger = mock.MagicMock()
+ def __init__(self, testdir, policy=None):
+ self.logger = debug_logger('test-ssync-sender')
self.conn_timeout = 1
self.node_timeout = 2
self.http_timeout = 3
@@ -43,7 +49,9 @@ class FakeReplicator(object):
'devices': testdir,
'mount_check': 'false',
}
- self._diskfile_mgr = diskfile.DiskFileManager(conf, DebugLogger())
+ policy = POLICIES.default if policy is None else policy
+ self._diskfile_router = diskfile.DiskFileRouter(conf, self.logger)
+ self._diskfile_mgr = self._diskfile_router[policy]
class NullBufferedHTTPConnection(object):
@@ -90,39 +98,49 @@ class FakeConnection(object):
self.closed = True
-class TestSender(unittest.TestCase):
-
+class BaseTestSender(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
self.testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender')
- self.replicator = FakeReplicator(self.testdir)
- self.sender = ssync_sender.Sender(self.replicator, None, None, None)
+ utils.mkdirs(os.path.join(self.testdir, 'dev'))
+ self.daemon = FakeReplicator(self.testdir)
+ self.sender = ssync_sender.Sender(self.daemon, None, None, None)
def tearDown(self):
- shutil.rmtree(self.tmpdir, ignore_errors=1)
+ shutil.rmtree(self.tmpdir, ignore_errors=True)
def _make_open_diskfile(self, device='dev', partition='9',
account='a', container='c', obj='o', body='test',
- extra_metadata=None, policy_idx=0):
+ extra_metadata=None, policy=None,
+ frag_index=None, timestamp=None, df_mgr=None):
+ policy = policy or POLICIES.legacy
object_parts = account, container, obj
- req_timestamp = utils.normalize_timestamp(time.time())
- df = self.sender.daemon._diskfile_mgr.get_diskfile(
- device, partition, *object_parts, policy_idx=policy_idx)
+ timestamp = Timestamp(time.time()) if timestamp is None else timestamp
+ if df_mgr is None:
+ df_mgr = self.daemon._diskfile_router[policy]
+ df = df_mgr.get_diskfile(
+ device, partition, *object_parts, policy=policy,
+ frag_index=frag_index)
content_length = len(body)
etag = hashlib.md5(body).hexdigest()
with df.create() as writer:
writer.write(body)
metadata = {
- 'X-Timestamp': req_timestamp,
- 'Content-Length': content_length,
+ 'X-Timestamp': timestamp.internal,
+ 'Content-Length': str(content_length),
'ETag': etag,
}
if extra_metadata:
metadata.update(extra_metadata)
writer.put(metadata)
+ writer.commit(timestamp)
df.open()
return df
+
+@patch_policies()
+class TestSender(BaseTestSender):
+
def test_call_catches_MessageTimeout(self):
def connect(self):
@@ -134,16 +152,16 @@ class TestSender(unittest.TestCase):
with mock.patch.object(ssync_sender.Sender, 'connect', connect):
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1')
- job = dict(partition='9')
- self.sender = ssync_sender.Sender(self.replicator, node, job, None)
+ job = dict(partition='9', policy=POLICIES.legacy)
+ self.sender = ssync_sender.Sender(self.daemon, node, job, None)
self.sender.suffixes = ['abc']
success, candidates = self.sender()
self.assertFalse(success)
- self.assertEquals(candidates, set())
- call = self.replicator.logger.error.mock_calls[0]
- self.assertEqual(
- call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
- self.assertEqual(str(call[1][-1]), '1 second: test connect')
+ self.assertEquals(candidates, {})
+ error_lines = self.daemon.logger.get_lines_for_level('error')
+ self.assertEqual(1, len(error_lines))
+ self.assertEqual('1.2.3.4:5678/sda1/9 1 second: test connect',
+ error_lines[0])
def test_call_catches_ReplicationException(self):
@@ -153,45 +171,44 @@ class TestSender(unittest.TestCase):
with mock.patch.object(ssync_sender.Sender, 'connect', connect):
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1')
- job = dict(partition='9')
- self.sender = ssync_sender.Sender(self.replicator, node, job, None)
+ job = dict(partition='9', policy=POLICIES.legacy)
+ self.sender = ssync_sender.Sender(self.daemon, node, job, None)
self.sender.suffixes = ['abc']
success, candidates = self.sender()
self.assertFalse(success)
- self.assertEquals(candidates, set())
- call = self.replicator.logger.error.mock_calls[0]
- self.assertEqual(
- call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
- self.assertEqual(str(call[1][-1]), 'test connect')
+ self.assertEquals(candidates, {})
+ error_lines = self.daemon.logger.get_lines_for_level('error')
+ self.assertEqual(1, len(error_lines))
+ self.assertEqual('1.2.3.4:5678/sda1/9 test connect',
+ error_lines[0])
def test_call_catches_other_exceptions(self):
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1')
- job = dict(partition='9')
- self.sender = ssync_sender.Sender(self.replicator, node, job, None)
+ job = dict(partition='9', policy=POLICIES.legacy)
+ self.sender = ssync_sender.Sender(self.daemon, node, job, None)
self.sender.suffixes = ['abc']
self.sender.connect = 'cause exception'
success, candidates = self.sender()
self.assertFalse(success)
- self.assertEquals(candidates, set())
- call = self.replicator.logger.exception.mock_calls[0]
- self.assertEqual(
- call[1],
- ('%s:%s/%s/%s EXCEPTION in replication.Sender', '1.2.3.4', 5678,
- 'sda1', '9'))
+ self.assertEquals(candidates, {})
+ error_lines = self.daemon.logger.get_lines_for_level('error')
+ for line in error_lines:
+ self.assertTrue(line.startswith(
+ '1.2.3.4:5678/sda1/9 EXCEPTION in replication.Sender:'))
def test_call_catches_exception_handling_exception(self):
- node = dict(replication_ip='1.2.3.4', replication_port=5678,
- device='sda1')
- job = None # Will cause inside exception handler to fail
- self.sender = ssync_sender.Sender(self.replicator, node, job, None)
+ job = node = None # Will cause inside exception handler to fail
+ self.sender = ssync_sender.Sender(self.daemon, node, job, None)
self.sender.suffixes = ['abc']
self.sender.connect = 'cause exception'
success, candidates = self.sender()
self.assertFalse(success)
- self.assertEquals(candidates, set())
- self.replicator.logger.exception.assert_called_once_with(
- 'EXCEPTION in replication.Sender')
+ self.assertEquals(candidates, {})
+ error_lines = self.daemon.logger.get_lines_for_level('error')
+ for line in error_lines:
+ self.assertTrue(line.startswith(
+ 'EXCEPTION in replication.Sender'))
def test_call_calls_others(self):
self.sender.suffixes = ['abc']
@@ -201,7 +218,7 @@ class TestSender(unittest.TestCase):
self.sender.disconnect = mock.MagicMock()
success, candidates = self.sender()
self.assertTrue(success)
- self.assertEquals(candidates, set())
+ self.assertEquals(candidates, {})
self.sender.connect.assert_called_once_with()
self.sender.missing_check.assert_called_once_with()
self.sender.updates.assert_called_once_with()
@@ -216,18 +233,17 @@ class TestSender(unittest.TestCase):
self.sender.failures = 1
success, candidates = self.sender()
self.assertFalse(success)
- self.assertEquals(candidates, set())
+ self.assertEquals(candidates, {})
self.sender.connect.assert_called_once_with()
self.sender.missing_check.assert_called_once_with()
self.sender.updates.assert_called_once_with()
self.sender.disconnect.assert_called_once_with()
- @patch_policies
def test_connect(self):
node = dict(replication_ip='1.2.3.4', replication_port=5678,
- device='sda1')
- job = dict(partition='9', policy_idx=1)
- self.sender = ssync_sender.Sender(self.replicator, node, job, None)
+ device='sda1', index=0)
+ job = dict(partition='9', policy=POLICIES[1])
+ self.sender = ssync_sender.Sender(self.daemon, node, job, None)
self.sender.suffixes = ['abc']
with mock.patch(
'swift.obj.ssync_sender.bufferedhttp.BufferedHTTPConnection'
@@ -240,11 +256,12 @@ class TestSender(unittest.TestCase):
mock_conn_class.assert_called_once_with('1.2.3.4:5678')
expectations = {
'putrequest': [
- mock.call('REPLICATION', '/sda1/9'),
+ mock.call('SSYNC', '/sda1/9'),
],
'putheader': [
mock.call('Transfer-Encoding', 'chunked'),
mock.call('X-Backend-Storage-Policy-Index', 1),
+ mock.call('X-Backend-Ssync-Frag-Index', 0),
],
'endheaders': [mock.call()],
}
@@ -255,10 +272,80 @@ class TestSender(unittest.TestCase):
method_name, mock_method.mock_calls,
expected_calls))
+ def test_call(self):
+ def patch_sender(sender):
+ sender.connect = mock.MagicMock()
+ sender.missing_check = mock.MagicMock()
+ sender.updates = mock.MagicMock()
+ sender.disconnect = mock.MagicMock()
+
+ node = dict(replication_ip='1.2.3.4', replication_port=5678,
+ device='sda1')
+ job = {
+ 'device': 'dev',
+ 'partition': '9',
+ 'policy': POLICIES.legacy,
+ 'frag_index': 0,
+ }
+ available_map = dict([('9d41d8cd98f00b204e9800998ecf0abc',
+ '1380144470.00000'),
+ ('9d41d8cd98f00b204e9800998ecf0def',
+ '1380144472.22222'),
+ ('9d41d8cd98f00b204e9800998ecf1def',
+ '1380144474.44444')])
+
+ # no suffixes -> no work done
+ sender = ssync_sender.Sender(
+ self.daemon, node, job, [], remote_check_objs=None)
+ patch_sender(sender)
+ sender.available_map = available_map
+ success, candidates = sender()
+ self.assertTrue(success)
+ self.assertEqual({}, candidates)
+
+ # all objs in sync
+ sender = ssync_sender.Sender(
+ self.daemon, node, job, ['ignored'], remote_check_objs=None)
+ patch_sender(sender)
+ sender.available_map = available_map
+ success, candidates = sender()
+ self.assertTrue(success)
+ self.assertEqual(available_map, candidates)
+
+ # one obj not in sync, sync'ing faked, all objs should be in return set
+ wanted = '9d41d8cd98f00b204e9800998ecf0def'
+ sender = ssync_sender.Sender(
+ self.daemon, node, job, ['ignored'],
+ remote_check_objs=None)
+ patch_sender(sender)
+ sender.send_list = [wanted]
+ sender.available_map = available_map
+ success, candidates = sender()
+ self.assertTrue(success)
+ self.assertEqual(available_map, candidates)
+
+ # one obj not in sync, remote check only so that obj is not sync'd
+ # and should not be in the return set
+ wanted = '9d41d8cd98f00b204e9800998ecf0def'
+ remote_check_objs = set(available_map.keys())
+ sender = ssync_sender.Sender(
+ self.daemon, node, job, ['ignored'],
+ remote_check_objs=remote_check_objs)
+ patch_sender(sender)
+ sender.send_list = [wanted]
+ sender.available_map = available_map
+ success, candidates = sender()
+ self.assertTrue(success)
+ expected_map = dict([('9d41d8cd98f00b204e9800998ecf0abc',
+ '1380144470.00000'),
+ ('9d41d8cd98f00b204e9800998ecf1def',
+ '1380144474.44444')])
+ self.assertEqual(expected_map, candidates)
+
def test_call_and_missing_check(self):
- def yield_hashes(device, partition, policy_index, suffixes=None):
+ def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if device == 'dev' and partition == '9' and suffixes == ['abc'] \
- and policy_index == 0:
+ and policy == POLICIES.legacy:
yield (
'/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc',
@@ -269,7 +356,12 @@ class TestSender(unittest.TestCase):
'No match for %r %r %r' % (device, partition, suffixes))
self.sender.connection = FakeConnection()
- self.sender.job = {'device': 'dev', 'partition': '9'}
+ self.sender.job = {
+ 'device': 'dev',
+ 'partition': '9',
+ 'policy': POLICIES.legacy,
+ 'frag_index': 0,
+ }
self.sender.suffixes = ['abc']
self.sender.response = FakeResponse(
chunk_body=(
@@ -282,13 +374,14 @@ class TestSender(unittest.TestCase):
self.sender.disconnect = mock.MagicMock()
success, candidates = self.sender()
self.assertTrue(success)
- self.assertEqual(candidates, set(['9d41d8cd98f00b204e9800998ecf0abc']))
+ self.assertEqual(candidates, dict([('9d41d8cd98f00b204e9800998ecf0abc',
+ '1380144470.00000')]))
self.assertEqual(self.sender.failures, 0)
def test_call_and_missing_check_with_obj_list(self):
- def yield_hashes(device, partition, policy_index, suffixes=None):
+ def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if device == 'dev' and partition == '9' and suffixes == ['abc'] \
- and policy_index == 0:
+ and policy == POLICIES.legacy:
yield (
'/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc',
@@ -297,8 +390,13 @@ class TestSender(unittest.TestCase):
else:
raise Exception(
'No match for %r %r %r' % (device, partition, suffixes))
- job = {'device': 'dev', 'partition': '9'}
- self.sender = ssync_sender.Sender(self.replicator, None, job, ['abc'],
+ job = {
+ 'device': 'dev',
+ 'partition': '9',
+ 'policy': POLICIES.legacy,
+ 'frag_index': 0,
+ }
+ self.sender = ssync_sender.Sender(self.daemon, None, job, ['abc'],
['9d41d8cd98f00b204e9800998ecf0abc'])
self.sender.connection = FakeConnection()
self.sender.response = FakeResponse(
@@ -311,13 +409,14 @@ class TestSender(unittest.TestCase):
self.sender.disconnect = mock.MagicMock()
success, candidates = self.sender()
self.assertTrue(success)
- self.assertEqual(candidates, set(['9d41d8cd98f00b204e9800998ecf0abc']))
+ self.assertEqual(candidates, dict([('9d41d8cd98f00b204e9800998ecf0abc',
+ '1380144470.00000')]))
self.assertEqual(self.sender.failures, 0)
def test_call_and_missing_check_with_obj_list_but_required(self):
- def yield_hashes(device, partition, policy_index, suffixes=None):
+ def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if device == 'dev' and partition == '9' and suffixes == ['abc'] \
- and policy_index == 0:
+ and policy == POLICIES.legacy:
yield (
'/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc',
@@ -326,8 +425,13 @@ class TestSender(unittest.TestCase):
else:
raise Exception(
'No match for %r %r %r' % (device, partition, suffixes))
- job = {'device': 'dev', 'partition': '9'}
- self.sender = ssync_sender.Sender(self.replicator, None, job, ['abc'],
+ job = {
+ 'device': 'dev',
+ 'partition': '9',
+ 'policy': POLICIES.legacy,
+ 'frag_index': 0,
+ }
+ self.sender = ssync_sender.Sender(self.daemon, None, job, ['abc'],
['9d41d8cd98f00b204e9800998ecf0abc'])
self.sender.connection = FakeConnection()
self.sender.response = FakeResponse(
@@ -341,14 +445,14 @@ class TestSender(unittest.TestCase):
self.sender.disconnect = mock.MagicMock()
success, candidates = self.sender()
self.assertTrue(success)
- self.assertEqual(candidates, set())
+ self.assertEqual(candidates, {})
def test_connect_send_timeout(self):
- self.replicator.conn_timeout = 0.01
+ self.daemon.conn_timeout = 0.01
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1')
- job = dict(partition='9')
- self.sender = ssync_sender.Sender(self.replicator, node, job, None)
+ job = dict(partition='9', policy=POLICIES.legacy)
+ self.sender = ssync_sender.Sender(self.daemon, node, job, None)
self.sender.suffixes = ['abc']
def putrequest(*args, **kwargs):
@@ -359,18 +463,18 @@ class TestSender(unittest.TestCase):
'putrequest', putrequest):
success, candidates = self.sender()
self.assertFalse(success)
- self.assertEquals(candidates, set())
- call = self.replicator.logger.error.mock_calls[0]
- self.assertEqual(
- call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
- self.assertEqual(str(call[1][-1]), '0.01 seconds: connect send')
+ self.assertEquals(candidates, {})
+ error_lines = self.daemon.logger.get_lines_for_level('error')
+ for line in error_lines:
+ self.assertTrue(line.startswith(
+ '1.2.3.4:5678/sda1/9 0.01 seconds: connect send'))
def test_connect_receive_timeout(self):
- self.replicator.node_timeout = 0.02
+ self.daemon.node_timeout = 0.02
node = dict(replication_ip='1.2.3.4', replication_port=5678,
- device='sda1')
- job = dict(partition='9')
- self.sender = ssync_sender.Sender(self.replicator, node, job, None)
+ device='sda1', index=0)
+ job = dict(partition='9', policy=POLICIES.legacy)
+ self.sender = ssync_sender.Sender(self.daemon, node, job, None)
self.sender.suffixes = ['abc']
class FakeBufferedHTTPConnection(NullBufferedHTTPConnection):
@@ -383,18 +487,18 @@ class TestSender(unittest.TestCase):
FakeBufferedHTTPConnection):
success, candidates = self.sender()
self.assertFalse(success)
- self.assertEquals(candidates, set())
- call = self.replicator.logger.error.mock_calls[0]
- self.assertEqual(
- call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
- self.assertEqual(str(call[1][-1]), '0.02 seconds: connect receive')
+ self.assertEquals(candidates, {})
+ error_lines = self.daemon.logger.get_lines_for_level('error')
+ for line in error_lines:
+ self.assertTrue(line.startswith(
+ '1.2.3.4:5678/sda1/9 0.02 seconds: connect receive'))
def test_connect_bad_status(self):
- self.replicator.node_timeout = 0.02
+ self.daemon.node_timeout = 0.02
node = dict(replication_ip='1.2.3.4', replication_port=5678,
- device='sda1')
- job = dict(partition='9')
- self.sender = ssync_sender.Sender(self.replicator, node, job, None)
+ device='sda1', index=0)
+ job = dict(partition='9', policy=POLICIES.legacy)
+ self.sender = ssync_sender.Sender(self.daemon, node, job, None)
self.sender.suffixes = ['abc']
class FakeBufferedHTTPConnection(NullBufferedHTTPConnection):
@@ -408,11 +512,11 @@ class TestSender(unittest.TestCase):
FakeBufferedHTTPConnection):
success, candidates = self.sender()
self.assertFalse(success)
- self.assertEquals(candidates, set())
- call = self.replicator.logger.error.mock_calls[0]
- self.assertEqual(
- call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
- self.assertEqual(str(call[1][-1]), 'Expected status 200; got 503')
+ self.assertEquals(candidates, {})
+ error_lines = self.daemon.logger.get_lines_for_level('error')
+ for line in error_lines:
+ self.assertTrue(line.startswith(
+ '1.2.3.4:5678/sda1/9 Expected status 200; got 503'))
def test_readline_newline_in_buffer(self):
self.sender.response_buffer = 'Has a newline already.\r\nOkay.'
@@ -420,7 +524,7 @@ class TestSender(unittest.TestCase):
self.assertEqual(self.sender.response_buffer, 'Okay.')
def test_readline_buffer_exceeds_network_chunk_size_somehow(self):
- self.replicator.network_chunk_size = 2
+ self.daemon.network_chunk_size = 2
self.sender.response_buffer = '1234567890'
self.assertEqual(self.sender.readline(), '1234567890')
self.assertEqual(self.sender.response_buffer, '')
@@ -473,16 +577,21 @@ class TestSender(unittest.TestCase):
self.assertRaises(exceptions.MessageTimeout, self.sender.missing_check)
def test_missing_check_has_empty_suffixes(self):
- def yield_hashes(device, partition, policy_idx, suffixes=None):
- if (device != 'dev' or partition != '9' or policy_idx != 0 or
+ def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
+ if (device != 'dev' or partition != '9' or
+ policy != POLICIES.legacy or
suffixes != ['abc', 'def']):
yield # Just here to make this a generator
raise Exception(
'No match for %r %r %r %r' % (device, partition,
- policy_idx, suffixes))
+ policy, suffixes))
self.sender.connection = FakeConnection()
- self.sender.job = {'device': 'dev', 'partition': '9'}
+ self.sender.job = {
+ 'device': 'dev',
+ 'partition': '9',
+ 'policy': POLICIES.legacy,
+ }
self.sender.suffixes = ['abc', 'def']
self.sender.response = FakeResponse(
chunk_body=(
@@ -495,11 +604,12 @@ class TestSender(unittest.TestCase):
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(self.sender.send_list, [])
- self.assertEqual(self.sender.available_set, set())
+ self.assertEqual(self.sender.available_map, {})
def test_missing_check_has_suffixes(self):
- def yield_hashes(device, partition, policy_idx, suffixes=None):
- if (device == 'dev' and partition == '9' and policy_idx == 0 and
+ def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
+ if (device == 'dev' and partition == '9' and
+ policy == POLICIES.legacy and
suffixes == ['abc', 'def']):
yield (
'/srv/node/dev/objects/9/abc/'
@@ -519,10 +629,14 @@ class TestSender(unittest.TestCase):
else:
raise Exception(
'No match for %r %r %r %r' % (device, partition,
- policy_idx, suffixes))
+ policy, suffixes))
self.sender.connection = FakeConnection()
- self.sender.job = {'device': 'dev', 'partition': '9'}
+ self.sender.job = {
+ 'device': 'dev',
+ 'partition': '9',
+ 'policy': POLICIES.legacy,
+ }
self.sender.suffixes = ['abc', 'def']
self.sender.response = FakeResponse(
chunk_body=(
@@ -538,14 +652,15 @@ class TestSender(unittest.TestCase):
'33\r\n9d41d8cd98f00b204e9800998ecf1def 1380144474.44444\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(self.sender.send_list, [])
- candidates = ['9d41d8cd98f00b204e9800998ecf0abc',
- '9d41d8cd98f00b204e9800998ecf0def',
- '9d41d8cd98f00b204e9800998ecf1def']
- self.assertEqual(self.sender.available_set, set(candidates))
+ candidates = [('9d41d8cd98f00b204e9800998ecf0abc', '1380144470.00000'),
+ ('9d41d8cd98f00b204e9800998ecf0def', '1380144472.22222'),
+ ('9d41d8cd98f00b204e9800998ecf1def', '1380144474.44444')]
+ self.assertEqual(self.sender.available_map, dict(candidates))
def test_missing_check_far_end_disconnect(self):
- def yield_hashes(device, partition, policy_idx, suffixes=None):
- if (device == 'dev' and partition == '9' and policy_idx == 0 and
+ def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
+ if (device == 'dev' and partition == '9' and
+ policy == POLICIES.legacy and
suffixes == ['abc']):
yield (
'/srv/node/dev/objects/9/abc/'
@@ -555,10 +670,14 @@ class TestSender(unittest.TestCase):
else:
raise Exception(
'No match for %r %r %r %r' % (device, partition,
- policy_idx, suffixes))
+ policy, suffixes))
self.sender.connection = FakeConnection()
- self.sender.job = {'device': 'dev', 'partition': '9'}
+ self.sender.job = {
+ 'device': 'dev',
+ 'partition': '9',
+ 'policy': POLICIES.legacy,
+ }
self.sender.suffixes = ['abc']
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.response = FakeResponse(chunk_body='\r\n')
@@ -573,12 +692,14 @@ class TestSender(unittest.TestCase):
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
- self.assertEqual(self.sender.available_set,
- set(['9d41d8cd98f00b204e9800998ecf0abc']))
+ self.assertEqual(self.sender.available_map,
+ dict([('9d41d8cd98f00b204e9800998ecf0abc',
+ '1380144470.00000')]))
def test_missing_check_far_end_disconnect2(self):
- def yield_hashes(device, partition, policy_idx, suffixes=None):
- if (device == 'dev' and partition == '9' and policy_idx == 0 and
+ def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
+ if (device == 'dev' and partition == '9' and
+ policy == POLICIES.legacy and
suffixes == ['abc']):
yield (
'/srv/node/dev/objects/9/abc/'
@@ -588,10 +709,14 @@ class TestSender(unittest.TestCase):
else:
raise Exception(
'No match for %r %r %r %r' % (device, partition,
- policy_idx, suffixes))
+ policy, suffixes))
self.sender.connection = FakeConnection()
- self.sender.job = {'device': 'dev', 'partition': '9'}
+ self.sender.job = {
+ 'device': 'dev',
+ 'partition': '9',
+ 'policy': POLICIES.legacy,
+ }
self.sender.suffixes = ['abc']
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.response = FakeResponse(
@@ -607,12 +732,14 @@ class TestSender(unittest.TestCase):
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
- self.assertEqual(self.sender.available_set,
- set(['9d41d8cd98f00b204e9800998ecf0abc']))
+ self.assertEqual(self.sender.available_map,
+ dict([('9d41d8cd98f00b204e9800998ecf0abc',
+ '1380144470.00000')]))
def test_missing_check_far_end_unexpected(self):
- def yield_hashes(device, partition, policy_idx, suffixes=None):
- if (device == 'dev' and partition == '9' and policy_idx == 0 and
+ def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
+ if (device == 'dev' and partition == '9' and
+ policy == POLICIES.legacy and
suffixes == ['abc']):
yield (
'/srv/node/dev/objects/9/abc/'
@@ -622,10 +749,14 @@ class TestSender(unittest.TestCase):
else:
raise Exception(
'No match for %r %r %r %r' % (device, partition,
- policy_idx, suffixes))
+ policy, suffixes))
self.sender.connection = FakeConnection()
- self.sender.job = {'device': 'dev', 'partition': '9'}
+ self.sender.job = {
+ 'device': 'dev',
+ 'partition': '9',
+ 'policy': POLICIES.legacy,
+ }
self.sender.suffixes = ['abc']
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.response = FakeResponse(chunk_body='OH HAI\r\n')
@@ -640,12 +771,14 @@ class TestSender(unittest.TestCase):
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
- self.assertEqual(self.sender.available_set,
- set(['9d41d8cd98f00b204e9800998ecf0abc']))
+ self.assertEqual(self.sender.available_map,
+ dict([('9d41d8cd98f00b204e9800998ecf0abc',
+ '1380144470.00000')]))
def test_missing_check_send_list(self):
- def yield_hashes(device, partition, policy_idx, suffixes=None):
- if (device == 'dev' and partition == '9' and policy_idx == 0 and
+ def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
+ if (device == 'dev' and partition == '9' and
+ policy == POLICIES.legacy and
suffixes == ['abc']):
yield (
'/srv/node/dev/objects/9/abc/'
@@ -655,10 +788,14 @@ class TestSender(unittest.TestCase):
else:
raise Exception(
'No match for %r %r %r %r' % (device, partition,
- policy_idx, suffixes))
+ policy, suffixes))
self.sender.connection = FakeConnection()
- self.sender.job = {'device': 'dev', 'partition': '9'}
+ self.sender.job = {
+ 'device': 'dev',
+ 'partition': '9',
+ 'policy': POLICIES.legacy,
+ }
self.sender.suffixes = ['abc']
self.sender.response = FakeResponse(
chunk_body=(
@@ -673,8 +810,45 @@ class TestSender(unittest.TestCase):
'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(self.sender.send_list, ['0123abc'])
- self.assertEqual(self.sender.available_set,
- set(['9d41d8cd98f00b204e9800998ecf0abc']))
+ self.assertEqual(self.sender.available_map,
+ dict([('9d41d8cd98f00b204e9800998ecf0abc',
+ '1380144470.00000')]))
+
+ def test_missing_check_extra_line_parts(self):
+ # check that sender tolerates extra parts in missing check
+ # line responses to allow for protocol upgrades
+ def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
+ if (device == 'dev' and partition == '9' and
+ policy == POLICIES.legacy and
+ suffixes == ['abc']):
+ yield (
+ '/srv/node/dev/objects/9/abc/'
+ '9d41d8cd98f00b204e9800998ecf0abc',
+ '9d41d8cd98f00b204e9800998ecf0abc',
+ '1380144470.00000')
+ else:
+ raise Exception(
+ 'No match for %r %r %r %r' % (device, partition,
+ policy, suffixes))
+
+ self.sender.connection = FakeConnection()
+ self.sender.job = {
+ 'device': 'dev',
+ 'partition': '9',
+ 'policy': POLICIES.legacy,
+ }
+ self.sender.suffixes = ['abc']
+ self.sender.response = FakeResponse(
+ chunk_body=(
+ ':MISSING_CHECK: START\r\n'
+ '0123abc extra response parts\r\n'
+ ':MISSING_CHECK: END\r\n'))
+ self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
+ self.sender.missing_check()
+ self.assertEqual(self.sender.send_list, ['0123abc'])
+ self.assertEqual(self.sender.available_map,
+ dict([('9d41d8cd98f00b204e9800998ecf0abc',
+ '1380144470.00000')]))
def test_updates_timeout(self):
self.sender.connection = FakeConnection()
@@ -742,7 +916,12 @@ class TestSender(unittest.TestCase):
delete_timestamp = utils.normalize_timestamp(time.time())
df.delete(delete_timestamp)
self.sender.connection = FakeConnection()
- self.sender.job = {'device': device, 'partition': part}
+ self.sender.job = {
+ 'device': device,
+ 'partition': part,
+ 'policy': POLICIES.legacy,
+ 'frag_index': 0,
+ }
self.sender.node = {}
self.sender.send_list = [object_hash]
self.sender.send_delete = mock.MagicMock()
@@ -771,7 +950,12 @@ class TestSender(unittest.TestCase):
delete_timestamp = utils.normalize_timestamp(time.time())
df.delete(delete_timestamp)
self.sender.connection = FakeConnection()
- self.sender.job = {'device': device, 'partition': part}
+ self.sender.job = {
+ 'device': device,
+ 'partition': part,
+ 'policy': POLICIES.legacy,
+ 'frag_index': 0,
+ }
self.sender.node = {}
self.sender.send_list = [object_hash]
self.sender.response = FakeResponse(
@@ -797,7 +981,12 @@ class TestSender(unittest.TestCase):
object_hash = utils.hash_path(*object_parts)
expected = df.get_metadata()
self.sender.connection = FakeConnection()
- self.sender.job = {'device': device, 'partition': part}
+ self.sender.job = {
+ 'device': device,
+ 'partition': part,
+ 'policy': POLICIES.legacy,
+ 'frag_index': 0,
+ }
self.sender.node = {}
self.sender.send_list = [object_hash]
self.sender.send_delete = mock.MagicMock()
@@ -821,18 +1010,20 @@ class TestSender(unittest.TestCase):
'11\r\n:UPDATES: START\r\n\r\n'
'f\r\n:UPDATES: END\r\n\r\n')
- @patch_policies
def test_updates_storage_policy_index(self):
device = 'dev'
part = '9'
object_parts = ('a', 'c', 'o')
df = self._make_open_diskfile(device, part, *object_parts,
- policy_idx=1)
+ policy=POLICIES[0])
object_hash = utils.hash_path(*object_parts)
expected = df.get_metadata()
self.sender.connection = FakeConnection()
- self.sender.job = {'device': device, 'partition': part,
- 'policy_idx': 1}
+ self.sender.job = {
+ 'device': device,
+ 'partition': part,
+ 'policy': POLICIES[0],
+ 'frag_index': 0}
self.sender.node = {}
self.sender.send_list = [object_hash]
self.sender.send_delete = mock.MagicMock()
@@ -847,7 +1038,7 @@ class TestSender(unittest.TestCase):
self.assertEqual(path, '/a/c/o')
self.assert_(isinstance(df, diskfile.DiskFile))
self.assertEqual(expected, df.get_metadata())
- self.assertEqual(os.path.join(self.testdir, 'dev/objects-1/9/',
+ self.assertEqual(os.path.join(self.testdir, 'dev/objects/9/',
object_hash[-3:], object_hash),
df._datadir)
@@ -1054,5 +1245,466 @@ class TestSender(unittest.TestCase):
self.assertTrue(self.sender.connection.closed)
+@patch_policies(with_ec_default=True)
+class TestSsync(BaseTestSender):
+    """
+    Test interactions between sender and receiver. The basis for each test is
+    actual diskfile state on either side - the connection between sender and
+    receiver is faked. Assertions are made about the final state of the sender
+    and receiver diskfiles.
+    """
+
+    def make_fake_ssync_connect(self, sender, rx_obj_controller, device,
+                                partition, policy):
+        """
+        Return a replacement for the sender's connect() that short-circuits
+        the network: everything the sender writes is fed directly into the
+        receiver WSGI app in-process, and the receiver's response generator
+        is wrapped back up as the sender's response. Every protocol message
+        exchanged is recorded in a trace (list of ('tx'|'rx', msg) tuples)
+        for later inspection via _analyze_trace().
+        """
+        trace = []
+
+        def add_trace(type, msg):
+            # record a protocol event for later analysis
+            if msg.strip():
+                trace.append((type, msg.strip()))
+
+        def start_response(status, headers, exc_info=None):
+            # the receiver is expected to always accept the SSYNC request
+            assert(status == '200 OK')
+
+        class FakeConnection:
+            def __init__(self, trace):
+                self.trace = trace
+                self.queue = []
+                # the receiver reads sender output from this file-like queue
+                self.src = FileLikeIter(self.queue)
+
+            def send(self, msg):
+                # strip the chunked transfer-encoding framing (leading size
+                # line, trailing CRLF) the sender adds, then queue the bare
+                # payload for the receiver
+                msg = msg.split('\r\n', 1)[1]
+                msg = msg.rsplit('\r\n', 1)[0]
+                add_trace('tx', msg)
+                self.queue.append(msg)
+
+            def close(self):
+                pass
+
+        def wrap_gen(gen):
+            # Strip response head and tail
+            while True:
+                try:
+                    msg = gen.next()
+                    if msg:
+                        add_trace('rx', msg)
+                        # re-apply chunk framing that the sender's response
+                        # reader expects
+                        msg = '%x\r\n%s\r\n' % (len(msg), msg)
+                        yield msg
+                except StopIteration:
+                    break
+
+        def fake_connect():
+            # drive the SSYNC request through the receiver controller
+            # in-process instead of over a socket
+            sender.connection = FakeConnection(trace)
+            headers = {'Transfer-Encoding': 'chunked',
+                       'X-Backend-Storage-Policy-Index': str(int(policy))}
+            env = {'REQUEST_METHOD': 'SSYNC'}
+            path = '/%s/%s' % (device, partition)
+            req = Request.blank(path, environ=env, headers=headers)
+            req.environ['wsgi.input'] = sender.connection.src
+            resp = rx_obj_controller(req.environ, start_response)
+            wrapped_gen = wrap_gen(resp)
+            sender.response = FileLikeIter(wrapped_gen)
+            sender.response.fp = sender.response
+        return fake_connect
+
+    def setUp(self):
+        self.device = 'dev'
+        self.partition = '9'
+        self.tmpdir = tempfile.mkdtemp()
+        # sender side setup
+        self.tx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender')
+        utils.mkdirs(os.path.join(self.tx_testdir, self.device))
+        self.daemon = FakeReplicator(self.tx_testdir)
+
+        # rx side setup
+        self.rx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_receiver')
+        utils.mkdirs(os.path.join(self.rx_testdir, self.device))
+        conf = {
+            'devices': self.rx_testdir,
+            'mount_check': 'false',
+            'replication_one_per_device': 'false',
+            'log_requests': 'false'}
+        self.rx_controller = server.ObjectController(conf)
+        # class-level monkey patch: silence _ensure_flush so the in-process
+        # receiver response contains only protocol messages; the original is
+        # saved here and restored in tearDown
+        self.orig_ensure_flush = ssync_receiver.Receiver._ensure_flush
+        ssync_receiver.Receiver._ensure_flush = lambda *args: ''
+        # monotonically increasing timestamps for creating ondisk files
+        self.ts_iter = (Timestamp(t)
+                        for t in itertools.count(int(time.time())))
+
+    def tearDown(self):
+        # undo the class-level _ensure_flush patch installed in setUp
+        if self.orig_ensure_flush:
+            ssync_receiver.Receiver._ensure_flush = self.orig_ensure_flush
+        shutil.rmtree(self.tmpdir, ignore_errors=True)
+
+    def _create_ondisk_files(self, df_mgr, obj_name, policy, timestamp,
+                             frag_indexes=None):
+        """
+        Create one diskfile per frag index for the given object, all with the
+        same timestamp, and return the list of diskfiles created. The object
+        body encodes '<path>___<frag index>' so tests can later check which
+        frag archive was sync'd.
+        """
+        frag_indexes = [] if frag_indexes is None else frag_indexes
+        metadata = {'Content-Type': 'plain/text'}
+        diskfiles = []
+        for frag_index in frag_indexes:
+            object_data = '/a/c/%s___%s' % (obj_name, frag_index)
+            if frag_index is not None:
+                # NOTE(review): this metadata dict is shared across loop
+                # iterations, so a None frag_index following an int one would
+                # inherit the previous Ec-Frag-Index value - callers currently
+                # only pass all-int tuples, but verify before mixing
+                metadata['X-Object-Sysmeta-Ec-Frag-Index'] = str(frag_index)
+            df = self._make_open_diskfile(
+                device=self.device, partition=self.partition, account='a',
+                container='c', obj=obj_name, body=object_data,
+                extra_metadata=metadata, timestamp=timestamp, policy=policy,
+                frag_index=frag_index, df_mgr=df_mgr)
+            # sanity checks
+            listing = os.listdir(df._datadir)
+            self.assertTrue(listing)
+            for filename in listing:
+                self.assertTrue(filename.startswith(timestamp.internal))
+            diskfiles.append(df)
+        return diskfiles
+
+ def _open_tx_diskfile(self, obj_name, policy, frag_index=None):
+ df_mgr = self.daemon._diskfile_router[policy]
+ df = df_mgr.get_diskfile(
+ self.device, self.partition, account='a', container='c',
+ obj=obj_name, policy=policy, frag_index=frag_index)
+ df.open()
+ return df
+
+ def _open_rx_diskfile(self, obj_name, policy, frag_index=None):
+ df = self.rx_controller.get_diskfile(
+ self.device, self.partition, 'a', 'c', obj_name, policy=policy,
+ frag_index=frag_index)
+ df.open()
+ return df
+
+ def _verify_diskfile_sync(self, tx_df, rx_df, frag_index):
+ # verify that diskfiles' metadata match
+ # sanity check, they are not the same ondisk files!
+ self.assertNotEqual(tx_df._datadir, rx_df._datadir)
+ rx_metadata = dict(rx_df.get_metadata())
+ for k, v in tx_df.get_metadata().iteritems():
+ self.assertEqual(v, rx_metadata.pop(k))
+ # ugh, ssync duplicates ETag with Etag so have to clear it out here
+ if 'Etag' in rx_metadata:
+ rx_metadata.pop('Etag')
+ self.assertFalse(rx_metadata)
+ if frag_index:
+ rx_metadata = rx_df.get_metadata()
+ fi_key = 'X-Object-Sysmeta-Ec-Frag-Index'
+ self.assertTrue(fi_key in rx_metadata)
+ self.assertEqual(frag_index, int(rx_metadata[fi_key]))
+
+ def _analyze_trace(self, trace):
+ """
+ Parse protocol trace captured by fake connection, making some
+ assertions along the way, and return results as a dict of form:
+ results = {'tx_missing': <list of messages>,
+ 'rx_missing': <list of messages>,
+ 'tx_updates': <list of subreqs>,
+ 'rx_updates': <list of messages>}
+
+ Each subreq is a dict with keys: 'method', 'path', 'headers', 'body'
+ """
+ def tx_missing(results, line):
+ self.assertEqual('tx', line[0])
+ results['tx_missing'].append(line[1])
+
+ def rx_missing(results, line):
+ self.assertEqual('rx', line[0])
+ parts = line[1].split('\r\n')
+ for part in parts:
+ results['rx_missing'].append(part)
+
+ def tx_updates(results, line):
+ self.assertEqual('tx', line[0])
+ subrequests = results['tx_updates']
+ if line[1].startswith(('PUT', 'DELETE')):
+ parts = line[1].split('\r\n')
+ method, path = parts[0].split()
+ subreq = {'method': method, 'path': path, 'req': line[1],
+ 'headers': parts[1:]}
+ subrequests.append(subreq)
+ else:
+ self.assertTrue(subrequests)
+ body = (subrequests[-1]).setdefault('body', '')
+ body += line[1]
+ subrequests[-1]['body'] = body
+
+ def rx_updates(results, line):
+ self.assertEqual('rx', line[0])
+ results.setdefault['rx_updates'].append(line[1])
+
+ def unexpected(results, line):
+ results.setdefault('unexpected', []).append(line)
+
+ # each trace line is a tuple of ([tx|rx], msg)
+ handshakes = iter([(('tx', ':MISSING_CHECK: START'), tx_missing),
+ (('tx', ':MISSING_CHECK: END'), unexpected),
+ (('rx', ':MISSING_CHECK: START'), rx_missing),
+ (('rx', ':MISSING_CHECK: END'), unexpected),
+ (('tx', ':UPDATES: START'), tx_updates),
+ (('tx', ':UPDATES: END'), unexpected),
+ (('rx', ':UPDATES: START'), rx_updates),
+ (('rx', ':UPDATES: END'), unexpected)])
+ expect_handshake = handshakes.next()
+ phases = ('tx_missing', 'rx_missing', 'tx_updates', 'rx_updates')
+ results = dict((k, []) for k in phases)
+ handler = unexpected
+ lines = list(trace)
+ lines.reverse()
+ while lines:
+ line = lines.pop()
+ if line == expect_handshake[0]:
+ handler = expect_handshake[1]
+ try:
+ expect_handshake = handshakes.next()
+ except StopIteration:
+ # should be the last line
+ self.assertFalse(
+ lines, 'Unexpected trailing lines %s' % lines)
+ continue
+ handler(results, line)
+
+ try:
+ # check all handshakes occurred
+ missed = handshakes.next()
+ self.fail('Handshake %s not found' % str(missed[0]))
+ except StopIteration:
+ pass
+ # check no message outside of a phase
+ self.assertFalse(results.get('unexpected'),
+ 'Message outside of a phase: %s' % results.get(None))
+ return results
+
+    def _verify_ondisk_files(self, tx_objs, policy, rx_node_index):
+        # verify tx and rx files that should be in sync
+        for o_name, diskfiles in tx_objs.iteritems():
+            for tx_df in diskfiles:
+                frag_index = tx_df._frag_index
+                if frag_index == rx_node_index:
+                    # this frag_index should have been sync'd,
+                    # check rx file is ok
+                    rx_df = self._open_rx_diskfile(o_name, policy, frag_index)
+                    self._verify_diskfile_sync(tx_df, rx_df, frag_index)
+                    # body was written as '<path>___<frag index>' by
+                    # _create_ondisk_files, so check the right frag arrived
+                    expected_body = '/a/c/%s___%s' % (o_name, rx_node_index)
+                    actual_body = ''.join([chunk for chunk in rx_df.reader()])
+                    self.assertEqual(expected_body, actual_body)
+                else:
+                    # this frag_index should not have been sync'd,
+                    # check no rx file,
+                    self.assertRaises(DiskFileNotExist,
+                                      self._open_rx_diskfile,
+                                      o_name, policy, frag_index=frag_index)
+                # check tx file still intact - ssync does not do any cleanup!
+                self._open_tx_diskfile(o_name, policy, frag_index)
+
+    def _verify_tombstones(self, tx_objs, policy):
+        # verify tx and rx tombstones that should be in sync
+        for o_name, diskfiles in tx_objs.iteritems():
+            # loop variable is deliberately unused - we want one check per
+            # diskfile that was created for the object
+            for tx_df_ in diskfiles:
+                try:
+                    self._open_tx_diskfile(o_name, policy)
+                    self.fail('DiskFileDeleted expected')
+                except DiskFileDeleted as exc:
+                    tx_delete_time = exc.timestamp
+                try:
+                    self._open_rx_diskfile(o_name, policy)
+                    self.fail('DiskFileDeleted expected')
+                except DiskFileDeleted as exc:
+                    rx_delete_time = exc.timestamp
+                # both sides must agree on the deletion timestamp
+                self.assertEqual(tx_delete_time, rx_delete_time)
+
+    def test_handoff_fragment_revert(self):
+        # test that a sync_revert type job does send the correct frag archives
+        # to the receiver, and that those frag archives are then removed from
+        # local node.
+        policy = POLICIES.default
+        rx_node_index = 0
+        tx_node_index = 1
+        # for a revert job the frag_index is the remote (receiving) node's
+        frag_index = rx_node_index
+
+        # create sender side diskfiles...
+        tx_objs = {}
+        rx_objs = {}
+        tx_tombstones = {}
+        tx_df_mgr = self.daemon._diskfile_router[policy]
+        rx_df_mgr = self.rx_controller._diskfile_router[policy]
+        # o1 has primary and handoff fragment archives
+        t1 = self.ts_iter.next()
+        tx_objs['o1'] = self._create_ondisk_files(
+            tx_df_mgr, 'o1', policy, t1, (rx_node_index, tx_node_index))
+        # o2 only has primary
+        t2 = self.ts_iter.next()
+        tx_objs['o2'] = self._create_ondisk_files(
+            tx_df_mgr, 'o2', policy, t2, (tx_node_index,))
+        # o3 only has handoff
+        t3 = self.ts_iter.next()
+        tx_objs['o3'] = self._create_ondisk_files(
+            tx_df_mgr, 'o3', policy, t3, (rx_node_index,))
+        # o4 primary and handoff fragment archives on tx, handoff in sync on rx
+        t4 = self.ts_iter.next()
+        tx_objs['o4'] = self._create_ondisk_files(
+            tx_df_mgr, 'o4', policy, t4, (tx_node_index, rx_node_index,))
+        rx_objs['o4'] = self._create_ondisk_files(
+            rx_df_mgr, 'o4', policy, t4, (rx_node_index,))
+        # o5 is a tombstone, missing on receiver
+        t5 = self.ts_iter.next()
+        tx_tombstones['o5'] = self._create_ondisk_files(
+            tx_df_mgr, 'o5', policy, t5, (tx_node_index,))
+        tx_tombstones['o5'][0].delete(t5)
+
+        # collect the suffix dirs of every tx diskfile for the sender job
+        suffixes = set()
+        for diskfiles in (tx_objs.values() + tx_tombstones.values()):
+            for df in diskfiles:
+                suffixes.add(os.path.basename(os.path.dirname(df._datadir)))
+
+        # create ssync sender instance...
+        job = {'device': self.device,
+               'partition': self.partition,
+               'policy': policy,
+               'frag_index': frag_index,
+               'purge': True}
+        node = {'index': rx_node_index}
+        self.sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
+        # fake connection from tx to rx...
+        self.sender.connect = self.make_fake_ssync_connect(
+            self.sender, self.rx_controller, self.device, self.partition,
+            policy)
+
+        # run the sync protocol...
+        self.sender()
+
+        # verify protocol
+        results = self._analyze_trace(self.sender.connection.trace)
+        # sender has handoff frags for o1, o3 and o4 and ts for o5
+        self.assertEqual(4, len(results['tx_missing']))
+        # receiver is missing frags for o1, o3 and ts for o5
+        self.assertEqual(3, len(results['rx_missing']))
+        self.assertEqual(3, len(results['tx_updates']))
+        self.assertFalse(results['rx_updates'])
+        sync_paths = []
+        for subreq in results.get('tx_updates'):
+            if subreq.get('method') == 'PUT':
+                self.assertTrue(
+                    'X-Object-Sysmeta-Ec-Frag-Index: %s' % rx_node_index
+                    in subreq.get('headers'))
+                expected_body = '%s___%s' % (subreq['path'], rx_node_index)
+                self.assertEqual(expected_body, subreq['body'])
+            elif subreq.get('method') == 'DELETE':
+                self.assertEqual('/a/c/o5', subreq['path'])
+            sync_paths.append(subreq.get('path'))
+        self.assertEqual(['/a/c/o1', '/a/c/o3', '/a/c/o5'], sorted(sync_paths))
+
+        # verify on disk files...
+        self._verify_ondisk_files(tx_objs, policy, rx_node_index)
+        self._verify_tombstones(tx_tombstones, policy)
+
+    def test_fragment_sync(self):
+        # check that a sync_only type job does call reconstructor to build a
+        # diskfile to send, and continues making progress despite an error
+        # when building one diskfile
+        policy = POLICIES.default
+        rx_node_index = 0
+        tx_node_index = 1
+        # for a sync job we iterate over frag index that belongs on local node
+        frag_index = tx_node_index
+
+        # create sender side diskfiles...
+        tx_objs = {}
+        tx_tombstones = {}
+        rx_objs = {}
+        tx_df_mgr = self.daemon._diskfile_router[policy]
+        rx_df_mgr = self.rx_controller._diskfile_router[policy]
+        # o1 only has primary
+        t1 = self.ts_iter.next()
+        tx_objs['o1'] = self._create_ondisk_files(
+            tx_df_mgr, 'o1', policy, t1, (tx_node_index,))
+        # o2 only has primary
+        t2 = self.ts_iter.next()
+        tx_objs['o2'] = self._create_ondisk_files(
+            tx_df_mgr, 'o2', policy, t2, (tx_node_index,))
+        # o3 only has primary
+        t3 = self.ts_iter.next()
+        tx_objs['o3'] = self._create_ondisk_files(
+            tx_df_mgr, 'o3', policy, t3, (tx_node_index,))
+        # o4 primary fragment archives on tx, handoff in sync on rx
+        t4 = self.ts_iter.next()
+        tx_objs['o4'] = self._create_ondisk_files(
+            tx_df_mgr, 'o4', policy, t4, (tx_node_index,))
+        rx_objs['o4'] = self._create_ondisk_files(
+            rx_df_mgr, 'o4', policy, t4, (rx_node_index,))
+        # o5 is a tombstone, missing on receiver
+        t5 = self.ts_iter.next()
+        tx_tombstones['o5'] = self._create_ondisk_files(
+            tx_df_mgr, 'o5', policy, t5, (tx_node_index,))
+        tx_tombstones['o5'][0].delete(t5)
+
+        # collect the suffix dirs of every tx diskfile for the sender job
+        suffixes = set()
+        for diskfiles in (tx_objs.values() + tx_tombstones.values()):
+            for df in diskfiles:
+                suffixes.add(os.path.basename(os.path.dirname(df._datadir)))
+
+        reconstruct_fa_calls = []
+
+        def fake_reconstruct_fa(job, node, metadata):
+            # stand-in for the reconstructor's diskfile builder; records each
+            # call so the test can assert how many rebuilds were attempted
+            reconstruct_fa_calls.append((job, node, policy, metadata))
+            if len(reconstruct_fa_calls) == 2:
+                # simulate second reconstruct failing
+                raise DiskFileError
+            content = '%s___%s' % (metadata['name'], rx_node_index)
+            return RebuildingECDiskFileStream(
+                metadata, rx_node_index, iter([content]))
+
+        # create ssync sender instance...
+        job = {'device': self.device,
+               'partition': self.partition,
+               'policy': policy,
+               'frag_index': frag_index,
+               'sync_diskfile_builder': fake_reconstruct_fa}
+        node = {'index': rx_node_index}
+        self.sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
+
+        # fake connection from tx to rx...
+        self.sender.connect = self.make_fake_ssync_connect(
+            self.sender, self.rx_controller, self.device, self.partition,
+            policy)
+
+        # run the sync protocol...
+        self.sender()
+
+        # verify protocol
+        results = self._analyze_trace(self.sender.connection.trace)
+        # sender has primary for o1, o2 and o3, o4 and ts for o5
+        self.assertEqual(5, len(results['tx_missing']))
+        # receiver is missing o1, o2 and o3 and ts for o5
+        self.assertEqual(4, len(results['rx_missing']))
+        # sender can only construct 2 out of 3 missing frags
+        self.assertEqual(3, len(results['tx_updates']))
+        self.assertEqual(3, len(reconstruct_fa_calls))
+        self.assertFalse(results['rx_updates'])
+        actual_sync_paths = []
+        for subreq in results.get('tx_updates'):
+            if subreq.get('method') == 'PUT':
+                self.assertTrue(
+                    'X-Object-Sysmeta-Ec-Frag-Index: %s' % rx_node_index
+                    in subreq.get('headers'))
+                expected_body = '%s___%s' % (subreq['path'], rx_node_index)
+                self.assertEqual(expected_body, subreq['body'])
+            elif subreq.get('method') == 'DELETE':
+                self.assertEqual('/a/c/o5', subreq['path'])
+            actual_sync_paths.append(subreq.get('path'))
+
+        # remove the failed df from expected synced df's
+        expect_sync_paths = ['/a/c/o1', '/a/c/o2', '/a/c/o3', '/a/c/o5']
+        # reconstruct_fa_calls entries are (job, node, policy, metadata)
+        failed_path = reconstruct_fa_calls[1][3]['name']
+        expect_sync_paths.remove(failed_path)
+        failed_obj = None
+        for obj, diskfiles in tx_objs.iteritems():
+            if diskfiles[0]._name == failed_path:
+                failed_obj = obj
+        # sanity check
+        self.assertTrue(tx_objs.pop(failed_obj))
+
+        # verify on disk files...
+        self.assertEqual(sorted(expect_sync_paths), sorted(actual_sync_paths))
+        self._verify_ondisk_files(tx_objs, policy, rx_node_index)
+        self._verify_tombstones(tx_tombstones, policy)
+
+
if __name__ == '__main__':
unittest.main()