diff options
Diffstat (limited to 'boto/glacier/concurrent.py')
-rw-r--r-- | boto/glacier/concurrent.py | 21 |
1 files changed, 18 insertions, 3 deletions
diff --git a/boto/glacier/concurrent.py b/boto/glacier/concurrent.py index a956f066..af727ec2 100644 --- a/boto/glacier/concurrent.py +++ b/boto/glacier/concurrent.py @@ -96,6 +96,11 @@ class ConcurrentUploader(ConcurrentTransferer): the archive parts. The part size must be a megabyte multiplied by a power of two. + :type num_threads: int + :param num_threads: The number of threads to spawn for the thread pool. + The number of threads will control how much parts are being + concurrently uploaded. + """ super(ConcurrentUploader, self).__init__(part_size, num_threads) self._api = api @@ -194,13 +199,18 @@ class TransferThread(threading.Thread): except Empty: continue if work is _END_SENTINEL: + self._cleanup() return result = self._process_chunk(work) self._result_queue.put(result) + self._cleanup() def _process_chunk(self, work): pass + def _cleanup(self): + pass + class UploadWorkerThread(TransferThread): def __init__(self, api, vault_name, filename, upload_id, @@ -219,14 +229,16 @@ class UploadWorkerThread(TransferThread): def _process_chunk(self, work): result = None - for _ in xrange(self._num_retries): + for i in xrange(self._num_retries + 1): try: result = self._upload_chunk(work) break except self._retry_exceptions, e: log.error("Exception caught uploading part number %s for " - "vault %s, filename: %s", work[0], self._vault_name, - self._filename) + "vault %s, attempt: (%s / %s), filename: %s, " + "exception: %s, msg: %s", + work[0], self._vault_name, i + 1, self._num_retries + 1, + self._filename, e.__class__, e) time.sleep(self._time_between_retries) result = e return result @@ -248,6 +260,9 @@ class UploadWorkerThread(TransferThread): response.read() return (part_number, tree_hash_bytes) + def _cleanup(self): + self._fileobj.close() + class ConcurrentDownloader(ConcurrentTransferer): """ |