author    | Vineeth Pillai <vineeth.pillai@nexenta.com> | 2011-06-01 09:45:54 -0700
committer | Nexenta Systems <nexenta@ncs3.(none)>       | 2011-06-01 09:45:54 -0700
commit    | ffdf67eb244d9ecef35a3f0d3632f8fc67dfa77a (patch)
tree      | 056b583f40f247465a987e560fcc8a568e9c53c4
parent    | dad8438f8dc5ebf21a830d99f432dd6ada1fb363 (diff)
download  | boto-ffdf67eb244d9ecef35a3f0d3632f8fc67dfa77a.tar.gz
Incorporated Code Review comments from Michael Schwartz
-rw-r--r-- | boto/gs/key.py                | 72
-rw-r--r-- | boto/provider.py              |  6
-rw-r--r-- | boto/s3/key.py                | 97
-rw-r--r-- | tests/s3/test_gsconnection.py | 24
4 files changed, 127 insertions, 72 deletions
```diff
diff --git a/boto/gs/key.py b/boto/gs/key.py
index 14a32bc1..87d02d73 100644
--- a/boto/gs/key.py
+++ b/boto/gs/key.py
@@ -107,6 +107,64 @@ class Key(S3Key):
             acl.add_group_grant(permission, group_id)
         self.set_acl(acl)
 
+    def set_contents_from_stream(self, fp, headers=None, replace=True,
+                                 cb=None, num_cb=10, policy=None, query_args=None):
+        """
+        Store an object using the name of the Key object as the key in
+        Google Storage and the contents of the data stream pointed to by 'fp' as
+        the contents.
+        The stream object is usually not seekable.
+
+        :type fp: file
+        :param fp: the file whose contents are to be uploaded
+
+        :type headers: dict
+        :param headers: additional HTTP headers to be sent with the PUT request.
+
+        :type replace: bool
+        :param replace: If this parameter is False, the method will first check
+            to see if an object exists in the bucket with the same key. If it
+            does, it won't overwrite it. The default value is True which will
+            overwrite the object.
+
+        :type cb: function
+        :param cb: a callback function that will be called to report
+            progress on the upload. The callback should accept two integer
+            parameters, the first representing the number of bytes that have
+            been successfully transmitted to GS and the second representing the
+            total number of bytes that need to be transmitted.
+
+        :type num_cb: int
+        :param num_cb: (optional) If a callback is specified with the cb
+            parameter, this parameter determines the granularity of the callback
+            by defining the maximum number of times the callback will be called
+            during the file transfer.
+
+        :type policy: :class:`boto.gs.acl.CannedACLStrings`
+        :param policy: A canned ACL policy that will be applied to the new key
+            in GS.
+        """
+
+        # Name of the Object should be specified explicitly for Streams.
+        if not self.name or self.name == '':
+            raise BotoClientError("Cannot determine the name of the stream")
+
+        if headers is None:
+            headers = {}
+        if policy:
+            headers[provider.acl_header] = policy
+
+        # Set the Transfer Encoding for Streams.
+        headers['Transfer-Encoding'] = 'chunked'
+
+        if self.bucket != None:
+            if not replace:
+                k = self.bucket.lookup(self.name)
+                if k:
+                    return
+            #TODO: Add support for resumable upload for Streams.
+            self.send_file(fp, headers, cb, num_cb, query_args, chunked_transfer=True)
+
     def set_contents_from_file(self, fp, headers=None, replace=True,
                                cb=None, num_cb=10, policy=None, md5=None,
                                res_upload_handler=None):
@@ -168,18 +226,6 @@ class Key(S3Key):
             headers[provider.acl_header] = policy
         if hasattr(fp, 'name'):
             self.path = fp.name
-
-        # GS support supports chunked transfer, but should be disabled for
-        # resumable upload. Resumable upload demands that size of object
-        # should be known before hand and in such scenarios, chunked transfer
-        # doesn't really make sense.
-        chunked_transfer = False
-        if self.can_do_chunked_transfer(headers):
-            if res_upload_handler:
-                del headers['Transfer-Encoding']
-            else:
-                chunked_transfer = True
-
         if self.bucket != None:
             if not md5:
                 md5 = self.compute_md5(fp)
@@ -200,7 +246,7 @@ class Key(S3Key):
                 res_upload_handler.send_file(self, fp, headers, cb, num_cb)
             else:
                 # Not a resumable transfer so use basic send_file mechanism.
-                self.send_file(fp, headers, cb, num_cb, chunked_transfer=chunked_transfer)
+                self.send_file(fp, headers, cb, num_cb)
 
     def set_contents_from_filename(self, filename, headers=None, replace=True,
                                    cb=None, num_cb=10, policy=None, md5=None,
```
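The new GS method is a streaming counterpart to `set_contents_from_file`: it forces `Transfer-Encoding: chunked` and hands the stream straight to `send_file`. A rough usage sketch follows; the bucket name and the piped command are illustrative only, and GS credentials are assumed to be already configured for boto:

```python
import subprocess
import boto

# Hypothetical bucket name; any existing Google Storage bucket works the
# same way, provided GS credentials are configured for boto.
bucket = boto.connect_gs().get_bucket('my-gs-bucket')

# A pipe is the typical case: the data is not seekable and its final size
# is unknown, so the upload has to use chunked transfer encoding.
pipe = subprocess.Popen(['tar', '-cz', 'logs/'],
                        stdout=subprocess.PIPE).stdout

# The key name must be set explicitly, since a stream has no file name.
key = bucket.new_key('logs.tar.gz')
key.set_contents_from_stream(pipe)
```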
```diff
diff --git a/boto/provider.py b/boto/provider.py
index 1ef3d861..b6eccd49 100644
--- a/boto/provider.py
+++ b/boto/provider.py
@@ -205,6 +205,12 @@ class Provider(object):
     def get_provider_name(self):
         return self.HostKeyMap[self.name]
 
+    def supports_chunked_transfer(self):
+        if self.name == 'aws':
+            return False
+        else:
+            return True
+
 # Static utility method for getting default Provider.
 def get_default():
     return Provider('aws')
```
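This provider capability flag replaces the old header-sniffing helper `can_do_chunked_transfer` (removed below). A quick illustration of what the new method reports, per this patch:

```python
from boto.provider import Provider

# S3 ('aws') rejects chunked transfer encoding on uploads, so it is the only
# provider that answers False here; Google Storage ('google') answers True.
assert not Provider('aws').supports_chunked_transfer()
assert Provider('google').supports_chunked_transfer()
```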
```diff
diff --git a/boto/s3/key.py b/boto/s3/key.py
index 829966a5..438bdd7f 100644
--- a/boto/s3/key.py
+++ b/boto/s3/key.py
@@ -425,7 +425,7 @@ class Key(object):
                                       response_headers)
 
     def send_file(self, fp, headers=None, cb=None, num_cb=10, query_args=None,
-                  chunked_transfer=False):
+                  chunked_transfer=False):
         """
         Upload a file to a key into a bucket on S3.
 
@@ -460,7 +460,7 @@ class Key(object):
             for key in headers:
                 http_conn.putheader(key, headers[key])
             http_conn.endheaders()
-            if self.md5:
+            if chunked_transfer:
                 fp.seek(0)
             save_debug = self.bucket.connection.debug
             self.bucket.connection.debug = 0
@@ -471,7 +471,7 @@ class Key(object):
             if getattr(http_conn, 'debuglevel', 0) < 3:
                 http_conn.set_debuglevel(0)
             if cb:
-                if not self.size and chunked_transfer:
+                if chunked_transfer:
                     # For chunked Transfer, if size is not known, we call
                     # the cb for every 1MB of data transferred.
                     cb_count = (1024 * 1024)/self.BufferSize
@@ -497,22 +497,22 @@ class Key(object):
                     if i == cb_count or cb_count == -1:
                         cb(total_bytes, self.size)
                         i = 0
-                if not self.md5:
+                if chunked_transfer:
                     m.update(l)
                 l = fp.read(self.BufferSize)
             if chunked_transfer:
                 http_conn.send('0\r\n')
                 http_conn.send('\r\n')
-                if not self.size:
+                if cb:
                     self.size = total_bytes
+                # Get the md5 which is calculated on the fly.
+                self.md5 = m.hexdigest()
+            else:
+                fp.seek(0)
             if cb:
                 cb(total_bytes, self.size)
             response = http_conn.getresponse()
             body = response.read()
-            if not self.md5:
-                self.md5 = m.hexdigest()
-            else:
-                fp.seek(0)
             http_conn.set_debuglevel(save_debug)
             self.bucket.connection.debug = save_debug
             if response.status == 500 or response.status == 503 or \
@@ -585,54 +585,73 @@ class Key(object):
             fp.seek(0)
         return (hex_md5, base64md5)
 
-    def can_do_chunked_transfer(self, headers):
-        """
-        Helper method to determine whether chunked transfer is supported.
-        Chunked Transfer can be supported if
-        1) Header contains 'Transfer-Encoding: chunked'
-        2) Cloud provider is not S3
-        """
-        if headers.has_key('Transfer-Encoding') and \
-                headers['Transfer-Encoding'].strip() == 'chunked':
-            # S3 doesn't support chunked transfer.
-            if self.provider.name == 'aws':
-                del headers['Transfer-Encoding']
-                return False
-            else:
-                headers['Transfer-Encoding'] = 'chunked'
-                return True
-        else:
-            return False
-
     def set_contents_from_stream(self, fp, headers=None, replace=True,
-                                 cb=None, num_cb=10, policy=None, md5=None,
+                                 cb=None, num_cb=10, policy=None,
                                  reduced_redundancy=False, query_args=None):
         """
         Store an object using the name of the Key object as the key in
         cloud and the conntents of the data stream pointed to by 'fp' as
         the contents.
-        The stream can be seekable just like a file object or cannot be
-        seekable like a pipe.
+        The stream object is usually not seekable.
+
+        :type fp: file
+        :param fp: the file whose contents are to be uploaded
+
+        :type headers: dict
+        :param headers: additional HTTP headers to be sent with the PUT request.
+
+        :type replace: bool
+        :param replace: If this parameter is False, the method will first check
+            to see if an object exists in the bucket with the same key. If it
+            does, it won't overwrite it. The default value is True which will
+            overwrite the object.
+
+        :type cb: function
+        :param cb: a callback function that will be called to report
+            progress on the upload. The callback should accept two integer
+            parameters, the first representing the number of bytes that have
+            been successfully transmitted to GS and the second representing the
+            total number of bytes that need to be transmitted.
+
+        :type num_cb: int
+        :param num_cb: (optional) If a callback is specified with the cb
+            parameter, this parameter determines the granularity of the callback
+            by defining the maximum number of times the callback will be called
+            during the file transfer.
+
+        :type policy: :class:`boto.gs.acl.CannedACLStrings`
+        :param policy: A canned ACL policy that will be applied to the new key
+            in GS.
+
+        :type reduced_redundancy: bool
+        :param reduced_redundancy: If True, this will set the storage
+                                   class of the new Key to be
+                                   REDUCED_REDUNDANCY. The Reduced Redundancy
+                                   Storage (RRS) feature of S3, provides lower
+                                   redundancy at lower storage cost.
         """
-        if not self.can_do_chunked_transfer(headers):
-            raise BotoClientError("Object is not seekable and chunked transfer not possible")
 
+        provider = self.bucket.connection.provider
+        if not provider.supports_chunked_transfer():
+            raise BotoClientError("Provider does not support chunked transfer")
+
+        # Name of the Object should be specified explicitly for Streams.
         if not self.name or self.name == '':
-            raise BotoClientError("Cannot determine the name of the object")
+            raise BotoClientError("Cannot determine the name of the stream")
 
-        provider = self.bucket.connection.provider
         if headers is None:
             headers = {}
         if policy:
             headers[provider.acl_header] = policy
 
+        # Set the Transfer Encoding for Streams.
+        headers['Transfer-Encoding'] = 'chunked'
+
         if reduced_redundancy:
             self.storage_class = 'REDUCED_REDUNDANCY'
             if provider.storage_class_header:
                 headers[provider.storage_class_header] = self.storage_class
-                # TODO - What if provider doesn't support reduced reduncancy?
-                # What if different providers provide different classes?
+
         if self.bucket != None:
             if not replace:
                 k = self.bucket.lookup(self.name)
@@ -705,10 +724,6 @@ class Key(object):
         if policy:
             headers[provider.acl_header] = policy
 
-        # Check for chunked Transfer support
-        if self.can_do_chunked_transfer(headers):
-            chunked_transfer = True
-
         if reduced_redundancy:
             self.storage_class = 'REDUCED_REDUNDANCY'
             if provider.storage_class_header:
```
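The provider-agnostic stream path now checks `supports_chunked_transfer()` up front, computes the MD5 on the fly as chunks are sent, and, because the total size is unknown, fires the progress callback roughly once per megabyte rather than `num_cb` times. A minimal sketch of both behaviours, assuming hypothetical bucket and key names and credentials already configured for boto:

```python
import sys
import boto
from boto.exception import BotoClientError

def progress(bytes_sent, total_bytes):
    # With chunked transfer the total size is only known once the stream is
    # exhausted, so per this patch the callback fires about once per 1 MB sent.
    sys.stderr.write('sent %d bytes\n' % bytes_sent)

# Against S3 ('aws') the stream API now refuses to run at all.
s3_key = boto.connect_s3().get_bucket('my-s3-bucket').new_key('stream-test')
try:
    s3_key.set_contents_from_stream(sys.stdin, cb=progress)
except BotoClientError, e:
    print e   # "Provider does not support chunked transfer"

# Against Google Storage the same call streams stdin with chunked encoding.
gs_key = boto.connect_gs().get_bucket('my-gs-bucket').new_key('stream-test')
gs_key.set_contents_from_stream(sys.stdin, cb=progress)
```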
```diff
diff --git a/tests/s3/test_gsconnection.py b/tests/s3/test_gsconnection.py
index c19fa386..196ed93c 100644
--- a/tests/s3/test_gsconnection.py
+++ b/tests/s3/test_gsconnection.py
@@ -75,30 +75,18 @@ class GSConnectionTest (unittest.TestCase):
         md5 = k.md5
         k.set_contents_from_string(s2)
         assert k.md5 != md5
-        # Test for chunked-transfer
-        headers = {'Transfer-Encoding':'chunked'}
-        k.md5 = None
-        k.base64md5 = None
-        k.set_contents_from_filename('foobar', headers=headers)
-        fp = open('foobar1', 'wb')
-        k.get_contents_to_file(fp)
-        fp.close()
-        fp1 = open('foobar', 'rb')
-        fp2 = open('foobar1', 'rb')
-        assert (fp1.read() == fp2.read()), 'Chunked Transfer corrupted the Data'
-        fp2.close()
         # Test for stream API
-        fp1.seek(0,0)
+        fp2 = open('foobar', 'rb')
         k.md5 = None
         k.base64md5 = None
-        k.set_contents_from_stream(fp1, headers=headers)
+        k.set_contents_from_stream(fp2, headers=headers)
         fp = open('foobar1', 'wb')
         k.get_contents_to_file(fp)
         fp.close()
-        fp1.seek(0,0)
-        fp2 = open('foobar1', 'rb')
-        assert (fp1.read() == fp2.read()), 'Chunked Transfer corrupted the Data'
-        fp1.close()
+        fp2.seek(0,0)
+        fp = open('foobar1', 'rb')
+        assert (fp2.read() == fp.read()), 'Chunked Transfer corrupted the Data'
+        fp.close()
         fp2.close()
         os.unlink('foobar1')
         os.unlink('foobar')
```