summaryrefslogtreecommitdiff
path: root/tests/unit/glacier/test_concurrent.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unit/glacier/test_concurrent.py')
-rw-r--r--tests/unit/glacier/test_concurrent.py56
1 files changed, 56 insertions, 0 deletions
diff --git a/tests/unit/glacier/test_concurrent.py b/tests/unit/glacier/test_concurrent.py
index 87a46a7b..b9f984e1 100644
--- a/tests/unit/glacier/test_concurrent.py
+++ b/tests/unit/glacier/test_concurrent.py
@@ -20,6 +20,7 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#
+import tempfile
from Queue import Queue
import mock
@@ -27,6 +28,8 @@ from tests.unit import unittest
from tests.unit import AWSMockServiceTestCase
from boto.glacier.concurrent import ConcurrentUploader, ConcurrentDownloader
+from boto.glacier.concurrent import UploadWorkerThread
+from boto.glacier.concurrent import _END_SENTINEL
class FakeThreadedConcurrentUploader(ConcurrentUploader):
@@ -40,6 +43,7 @@ class FakeThreadedConcurrentUploader(ConcurrentUploader):
for i in xrange(total_parts):
hash_chunks[i] = 'foo'
+
class FakeThreadedConcurrentDownloader(ConcurrentDownloader):
def _start_download_threads(self, results_queue, worker_queue):
self.results_queue = results_queue
@@ -116,5 +120,57 @@ class TestConcurrentUploader(unittest.TestCase):
self.assertEqual(len(items), 12)
+class TestUploaderThread(unittest.TestCase):
+ def setUp(self):
+ self.fileobj = tempfile.NamedTemporaryFile()
+ self.filename = self.fileobj.name
+
+ def test_fileobj_closed_when_thread_shuts_down(self):
+ thread = UploadWorkerThread(mock.Mock(), 'vault_name',
+ self.filename, 'upload_id',
+ Queue(), Queue())
+ fileobj = thread._fileobj
+ self.assertFalse(fileobj.closed)
+ # By settings should_continue to False, it should immediately
+ # exit, and we can still verify cleanup behavior.
+ thread.should_continue = False
+ thread.run()
+ self.assertTrue(fileobj.closed)
+
+ def test_upload_errors_have_exception_messages(self):
+ api = mock.Mock()
+ job_queue = Queue()
+ result_queue = Queue()
+ upload_thread = UploadWorkerThread(
+ api, 'vault_name', self.filename,
+ 'upload_id', job_queue, result_queue, num_retries=1,
+ time_between_retries=0)
+ api.upload_part.side_effect = Exception("exception message")
+ job_queue.put((0, 1024))
+ job_queue.put(_END_SENTINEL)
+
+ upload_thread.run()
+ result = result_queue.get(timeout=1)
+ self.assertIn("exception message", str(result))
+
+ def test_num_retries_is_obeyed(self):
+ # total attempts is 1 + num_retries so if I have num_retries of 2,
+ # I'll attempt the upload once, and if that fails I'll retry up to
+ # 2 more times for a total of 3 attempts.
+ api = mock.Mock()
+ job_queue = Queue()
+ result_queue = Queue()
+ upload_thread = UploadWorkerThread(
+ api, 'vault_name', self.filename,
+ 'upload_id', job_queue, result_queue, num_retries=2,
+ time_between_retries=0)
+ api.upload_part.side_effect = Exception()
+ job_queue.put((0, 1024))
+ job_queue.put(_END_SENTINEL)
+
+ upload_thread.run()
+ self.assertEqual(api.upload_part.call_count, 3)
+
+
if __name__ == '__main__':
unittest.main()