#!/usr/bin/env python # Copyright (c) 2012 Amazon.com, Inc. or its affiliates. All Rights Reserved # # Permission is hereby granted, free of charge, to any person obtaining a # copy of this software and associated documentation files (the # "Software"), to deal in the Software without restriction, including # without limitation the rights to use, copy, modify, merge, publish, dis- # tribute, sublicense, and/or sell copies of the Software, and to permit # persons to whom the Software is furnished to do so, subject to the fol- # lowing conditions: # # The above copyright notice and this permission notice shall be included # in all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS # IN THE SOFTWARE. # from boto.compat import StringIO from tests.compat import mock, unittest ANY = mock.ANY from boto.glacier import vault from boto.glacier.job import Job from boto.glacier.response import GlacierResponse class TestVault(unittest.TestCase): def setUp(self): self.size_patch = mock.patch('os.path.getsize') self.getsize = self.size_patch.start() self.api = mock.Mock() self.vault = vault.Vault(self.api, None) self.vault.name = 'myvault' self.mock_open = mock.mock_open() stringio = StringIO('content') self.mock_open.return_value.read = stringio.read def tearDown(self): self.size_patch.stop() def test_upload_archive_small_file(self): self.getsize.return_value = 1 self.api.upload_archive.return_value = {'ArchiveId': 'archive_id'} with mock.patch('boto.glacier.vault.open', self.mock_open, create=True): archive_id = self.vault.upload_archive( 'filename', 'my description') self.assertEqual(archive_id, 'archive_id') self.api.upload_archive.assert_called_with( 'myvault', self.mock_open.return_value, mock.ANY, mock.ANY, 'my description') def test_small_part_size_is_obeyed(self): self.vault.DefaultPartSize = 2 * 1024 * 1024 self.vault.create_archive_writer = mock.Mock() self.getsize.return_value = 1 with mock.patch('boto.glacier.vault.open', self.mock_open, create=True): self.vault.create_archive_from_file('myfile') # The write should be created with the default part size of the # instance (2 MB). self.vault.create_archive_writer.assert_called_with( description=mock.ANY, part_size=self.vault.DefaultPartSize) def test_large_part_size_is_obeyed(self): self.vault.DefaultPartSize = 8 * 1024 * 1024 self.vault.create_archive_writer = mock.Mock() self.getsize.return_value = 1 with mock.patch('boto.glacier.vault.open', self.mock_open, create=True): self.vault.create_archive_from_file('myfile') # The write should be created with the default part size of the # instance (8 MB). self.vault.create_archive_writer.assert_called_with( description=mock.ANY, part_size=self.vault.DefaultPartSize) def test_part_size_needs_to_be_adjusted(self): # If we have a large file (400 GB) self.getsize.return_value = 400 * 1024 * 1024 * 1024 self.vault.create_archive_writer = mock.Mock() # When we try to upload the file. with mock.patch('boto.glacier.vault.open', self.mock_open, create=True): self.vault.create_archive_from_file('myfile') # We should automatically bump up the part size used to # 64 MB. expected_part_size = 64 * 1024 * 1024 self.vault.create_archive_writer.assert_called_with( description=mock.ANY, part_size=expected_part_size) def test_retrieve_inventory(self): class FakeResponse(object): status = 202 def getheader(self, key, default=None): if key == 'x-amz-job-id': return 'HkF9p6' elif key == 'Content-Type': return 'application/json' return 'something' def read(self, amt=None): return b"""{ "Action": "ArchiveRetrieval", "ArchiveId": "NkbByEejwEggmBz2fTHgJrg0XBoDfjP4q6iu87-EXAMPLEArchiveId", "ArchiveSizeInBytes": 16777216, "ArchiveSHA256TreeHash": "beb0fe31a1c7ca8c6c04d574ea906e3f97", "Completed": false, "CreationDate": "2012-05-15T17:21:39.339Z", "CompletionDate": "2012-05-15T17:21:43.561Z", "InventorySizeInBytes": null, "JobDescription": "My ArchiveRetrieval Job", "JobId": "HkF9p6", "RetrievalByteRange": "0-16777215", "SHA256TreeHash": "beb0fe31a1c7ca8c6c04d574ea906e3f97b31fd", "SNSTopic": "arn:aws:sns:us-east-1:012345678901:mytopic", "StatusCode": "InProgress", "StatusMessage": "Operation in progress.", "VaultARN": "arn:aws:glacier:us-east-1:012345678901:vaults/examplevault" }""" raw_resp = FakeResponse() init_resp = GlacierResponse(raw_resp, [('x-amz-job-id', 'JobId')]) raw_resp_2 = FakeResponse() desc_resp = GlacierResponse(raw_resp_2, []) with mock.patch.object(self.vault.layer1, 'initiate_job', return_value=init_resp): with mock.patch.object(self.vault.layer1, 'describe_job', return_value=desc_resp): # The old/back-compat variant of the call. self.assertEqual(self.vault.retrieve_inventory(), 'HkF9p6') # The variant the returns a full ``Job`` object. job = self.vault.retrieve_inventory_job() self.assertTrue(isinstance(job, Job)) self.assertEqual(job.id, 'HkF9p6') class TestConcurrentUploads(unittest.TestCase): def test_concurrent_upload_file(self): v = vault.Vault(None, None) with mock.patch('boto.glacier.vault.ConcurrentUploader') as c: c.return_value.upload.return_value = 'archive_id' archive_id = v.concurrent_create_archive_from_file( 'filename', 'my description') c.return_value.upload.assert_called_with('filename', 'my description') self.assertEqual(archive_id, 'archive_id') def test_concurrent_upload_forwards_kwargs(self): v = vault.Vault(None, None) with mock.patch('boto.glacier.vault.ConcurrentUploader') as c: c.return_value.upload.return_value = 'archive_id' archive_id = v.concurrent_create_archive_from_file( 'filename', 'my description', num_threads=10, part_size=1024 * 1024 * 1024 * 8) c.assert_called_with(None, None, num_threads=10, part_size=1024 * 1024 * 1024 * 8) if __name__ == '__main__': unittest.main()