diff options
Diffstat (limited to 'tests/unit/glacier/test_concurrent.py')
-rw-r--r-- | tests/unit/glacier/test_concurrent.py | 56 |
1 files changed, 56 insertions, 0 deletions
diff --git a/tests/unit/glacier/test_concurrent.py b/tests/unit/glacier/test_concurrent.py index 87a46a7b..b9f984e1 100644 --- a/tests/unit/glacier/test_concurrent.py +++ b/tests/unit/glacier/test_concurrent.py @@ -20,6 +20,7 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS # IN THE SOFTWARE. # +import tempfile from Queue import Queue import mock @@ -27,6 +28,8 @@ from tests.unit import unittest from tests.unit import AWSMockServiceTestCase from boto.glacier.concurrent import ConcurrentUploader, ConcurrentDownloader +from boto.glacier.concurrent import UploadWorkerThread +from boto.glacier.concurrent import _END_SENTINEL class FakeThreadedConcurrentUploader(ConcurrentUploader): @@ -40,6 +43,7 @@ class FakeThreadedConcurrentUploader(ConcurrentUploader): for i in xrange(total_parts): hash_chunks[i] = 'foo' + class FakeThreadedConcurrentDownloader(ConcurrentDownloader): def _start_download_threads(self, results_queue, worker_queue): self.results_queue = results_queue @@ -116,5 +120,57 @@ class TestConcurrentUploader(unittest.TestCase): self.assertEqual(len(items), 12) +class TestUploaderThread(unittest.TestCase): + def setUp(self): + self.fileobj = tempfile.NamedTemporaryFile() + self.filename = self.fileobj.name + + def test_fileobj_closed_when_thread_shuts_down(self): + thread = UploadWorkerThread(mock.Mock(), 'vault_name', + self.filename, 'upload_id', + Queue(), Queue()) + fileobj = thread._fileobj + self.assertFalse(fileobj.closed) + # By settings should_continue to False, it should immediately + # exit, and we can still verify cleanup behavior. + thread.should_continue = False + thread.run() + self.assertTrue(fileobj.closed) + + def test_upload_errors_have_exception_messages(self): + api = mock.Mock() + job_queue = Queue() + result_queue = Queue() + upload_thread = UploadWorkerThread( + api, 'vault_name', self.filename, + 'upload_id', job_queue, result_queue, num_retries=1, + time_between_retries=0) + api.upload_part.side_effect = Exception("exception message") + job_queue.put((0, 1024)) + job_queue.put(_END_SENTINEL) + + upload_thread.run() + result = result_queue.get(timeout=1) + self.assertIn("exception message", str(result)) + + def test_num_retries_is_obeyed(self): + # total attempts is 1 + num_retries so if I have num_retries of 2, + # I'll attempt the upload once, and if that fails I'll retry up to + # 2 more times for a total of 3 attempts. + api = mock.Mock() + job_queue = Queue() + result_queue = Queue() + upload_thread = UploadWorkerThread( + api, 'vault_name', self.filename, + 'upload_id', job_queue, result_queue, num_retries=2, + time_between_retries=0) + api.upload_part.side_effect = Exception() + job_queue.put((0, 1024)) + job_queue.put(_END_SENTINEL) + + upload_thread.run() + self.assertEqual(api.upload_part.call_count, 3) + + if __name__ == '__main__': unittest.main() |