author     Vineeth Pillai <vineeth.pillai@nexenta.com>  2011-06-01 09:45:54 -0700
committer  Nexenta Systems <nexenta@ncs3.(none)>        2011-06-01 09:45:54 -0700
commit     ffdf67eb244d9ecef35a3f0d3632f8fc67dfa77a (patch)
tree       056b583f40f247465a987e560fcc8a568e9c53c4
parent     dad8438f8dc5ebf21a830d99f432dd6ada1fb363 (diff)
download   boto-ffdf67eb244d9ecef35a3f0d3632f8fc67dfa77a.tar.gz
Incorporated Code Review comments from Michael Schwartz
-rw-r--r--  boto/gs/key.py                  72
-rw-r--r--  boto/provider.py                 6
-rw-r--r--  boto/s3/key.py                  97
-rw-r--r--  tests/s3/test_gsconnection.py   24
4 files changed, 127 insertions, 72 deletions
diff --git a/boto/gs/key.py b/boto/gs/key.py
index 14a32bc1..87d02d73 100644
--- a/boto/gs/key.py
+++ b/boto/gs/key.py
@@ -107,6 +107,64 @@ class Key(S3Key):
acl.add_group_grant(permission, group_id)
self.set_acl(acl)
+ def set_contents_from_stream(self, fp, headers=None, replace=True,
+ cb=None, num_cb=10, policy=None, query_args=None):
+ """
+ Store an object using the name of the Key object as the key in
+ Google Storage and the contents of the data stream pointed to by 'fp' as
+ the contents.
+ The stream object is usually not seekable.
+
+ :type fp: file
+ :param fp: the file whose contents are to be uploaded
+
+ :type headers: dict
+ :param headers: additional HTTP headers to be sent with the PUT request.
+
+ :type replace: bool
+ :param replace: If this parameter is False, the method will first check
+ to see if an object exists in the bucket with the same key. If it
+ does, it won't overwrite it. The default value is True which will
+ overwrite the object.
+
+ :type cb: function
+ :param cb: a callback function that will be called to report
+ progress on the upload. The callback should accept two integer
+ parameters, the first representing the number of bytes that have
+ been successfully transmitted to GS and the second representing the
+ total number of bytes that need to be transmitted.
+
+ :type num_cb: int
+ :param num_cb: (optional) If a callback is specified with the cb
+ parameter, this parameter determines the granularity of the callback
+ by defining the maximum number of times the callback will be called
+ during the file transfer.
+
+ :type policy: :class:`boto.gs.acl.CannedACLStrings`
+ :param policy: A canned ACL policy that will be applied to the new key
+ in GS.
+ """
+
+ # Name of the Object should be specified explicitly for Streams.
+ if not self.name or self.name == '':
+ raise BotoClientError("Cannot determine the name of the stream")
+
+ provider = self.bucket.connection.provider
+ if headers is None:
+ headers = {}
+ if policy:
+ headers[provider.acl_header] = policy
+
+ # Set the Transfer Encoding for Streams.
+ headers['Transfer-Encoding'] = 'chunked'
+
+ if self.bucket != None:
+ if not replace:
+ k = self.bucket.lookup(self.name)
+ if k:
+ return
+ #TODO: Add support for resumable upload for Streams.
+ self.send_file(fp, headers, cb, num_cb, query_args, chunked_transfer=True)
+
def set_contents_from_file(self, fp, headers=None, replace=True,
cb=None, num_cb=10, policy=None, md5=None,
res_upload_handler=None):
@@ -168,18 +226,6 @@ class Key(S3Key):
headers[provider.acl_header] = policy
if hasattr(fp, 'name'):
self.path = fp.name
-
- # GS support supports chunked transfer, but should be disabled for
- # resumable upload. Resumable upload demands that size of object
- # should be known before hand and in such scenarios, chunked transfer
- # doesn't really make sense.
- chunked_transfer = False
- if self.can_do_chunked_transfer(headers):
- if res_upload_handler:
- del headers['Transfer-Encoding']
- else:
- chunked_transfer = True
-
if self.bucket != None:
if not md5:
md5 = self.compute_md5(fp)
@@ -200,7 +246,7 @@ class Key(S3Key):
res_upload_handler.send_file(self, fp, headers, cb, num_cb)
else:
# Not a resumable transfer so use basic send_file mechanism.
- self.send_file(fp, headers, cb, num_cb, chunked_transfer=chunked_transfer)
+ self.send_file(fp, headers, cb, num_cb)
def set_contents_from_filename(self, filename, headers=None, replace=True,
cb=None, num_cb=10, policy=None, md5=None,
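For context, a minimal usage sketch of the new streaming call on the GS side (bucket name, object name and the tar command are hypothetical; assumes Google Storage credentials are already configured for boto):

    import subprocess
    import boto

    # Hypothetical bucket/object names; assumes GS credentials are configured.
    conn = boto.connect_gs()
    bucket = conn.get_bucket('example-gs-bucket')
    key = bucket.new_key('backups/www.tar.gz')

    # A pipe is not seekable, so set_contents_from_file() (which wants to
    # compute an MD5 up front) cannot be used here.
    proc = subprocess.Popen(['tar', 'czf', '-', '/var/www'],
                            stdout=subprocess.PIPE)

    def progress(transmitted, total):
        # For streams the total size is unknown until the upload completes.
        print('transmitted %d bytes' % transmitted)

    key.set_contents_from_stream(proc.stdout, cb=progress)
    proc.wait()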
diff --git a/boto/provider.py b/boto/provider.py
index 1ef3d861..b6eccd49 100644
--- a/boto/provider.py
+++ b/boto/provider.py
@@ -205,6 +205,12 @@ class Provider(object):
def get_provider_name(self):
return self.HostKeyMap[self.name]
+ def supports_chunked_transfer(self):
+ if self.name == 'aws':
+ return False
+ else:
+ return True
+
# Static utility method for getting default Provider.
def get_default():
return Provider('aws')
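The streaming paths consult this provider flag before attempting a chunked upload; a rough sketch of that guard (standalone helper shown only for illustration):

    from boto.exception import BotoClientError
    from boto.provider import Provider

    def require_chunked_transfer(provider):
        # S3 ('aws') rejects chunked Transfer-Encoding, so streaming uploads
        # are only attempted for providers that report support (e.g. 'google').
        if not provider.supports_chunked_transfer():
            raise BotoClientError("Provider does not support chunked transfer")

    # require_chunked_transfer(Provider('google'))  # passes
    # require_chunked_transfer(Provider('aws'))     # raises BotoClientError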
diff --git a/boto/s3/key.py b/boto/s3/key.py
index 829966a5..438bdd7f 100644
--- a/boto/s3/key.py
+++ b/boto/s3/key.py
@@ -425,7 +425,7 @@ class Key(object):
response_headers)
def send_file(self, fp, headers=None, cb=None, num_cb=10, query_args=None,
- chunked_transfer=False):
+ chunked_transfer=False):
"""
Upload a file to a key into a bucket on S3.
@@ -460,7 +460,7 @@ class Key(object):
for key in headers:
http_conn.putheader(key, headers[key])
http_conn.endheaders()
- if self.md5:
+ if chunked_transfer:
fp.seek(0)
save_debug = self.bucket.connection.debug
self.bucket.connection.debug = 0
@@ -471,7 +471,7 @@ class Key(object):
if getattr(http_conn, 'debuglevel', 0) < 3:
http_conn.set_debuglevel(0)
if cb:
- if not self.size and chunked_transfer:
+ if chunked_transfer:
# For chunked Transfer, if size is not known, we call
# the cb for every 1MB of data transferred.
cb_count = (1024 * 1024)/self.BufferSize
@@ -497,22 +497,22 @@ class Key(object):
if i == cb_count or cb_count == -1:
cb(total_bytes, self.size)
i = 0
- if not self.md5:
+ if chunked_transfer:
m.update(l)
l = fp.read(self.BufferSize)
if chunked_transfer:
http_conn.send('0\r\n')
http_conn.send('\r\n')
- if not self.size:
+ if cb:
self.size = total_bytes
+ # Get the md5 which is calculated on the fly.
+ self.md5 = m.hexdigest()
+ else:
+ fp.seek(0)
if cb:
cb(total_bytes, self.size)
response = http_conn.getresponse()
body = response.read()
- if not self.md5:
- self.md5 = m.hexdigest()
- else:
- fp.seek(0)
http_conn.set_debuglevel(save_debug)
self.bucket.connection.debug = save_debug
if response.status == 500 or response.status == 503 or \
@@ -585,54 +585,73 @@ class Key(object):
fp.seek(0)
return (hex_md5, base64md5)
- def can_do_chunked_transfer(self, headers):
- """
- Helper method to determine whether chunked transfer is supported.
- Chunked Transfer can be supported if
- 1) Header contains 'Transfer-Encoding: chunked'
- 2) Cloud provider is not S3
- """
- if headers.has_key('Transfer-Encoding') and \
- headers['Transfer-Encoding'].strip() == 'chunked':
- # S3 doesn't support chunked transfer.
- if self.provider.name == 'aws':
- del headers['Transfer-Encoding']
- return False
- else:
- headers['Transfer-Encoding'] = 'chunked'
- return True
- else:
- return False
-
def set_contents_from_stream(self, fp, headers=None, replace=True,
- cb=None, num_cb=10, policy=None, md5=None,
+ cb=None, num_cb=10, policy=None,
reduced_redundancy=False, query_args=None):
"""
Store an object using the name of the Key object as the key in
cloud and the contents of the data stream pointed to by 'fp' as
the contents.
- The stream can be seekable just like a file object or cannot be
- seekable like a pipe.
+ The stream object is usually not seekable.
+
+ :type fp: file
+ :param fp: the file whose contents are to be uploaded
+
+ :type headers: dict
+ :param headers: additional HTTP headers to be sent with the PUT request.
+
+ :type replace: bool
+ :param replace: If this parameter is False, the method will first check
+ to see if an object exists in the bucket with the same key. If it
+ does, it won't overwrite it. The default value is True which will
+ overwrite the object.
+
+ :type cb: function
+ :param cb: a callback function that will be called to report
+ progress on the upload. The callback should accept two integer
+ parameters, the first representing the number of bytes that have
+ been successfully transmitted to S3 and the second representing the
+ total number of bytes that need to be transmitted.
+
+ :type num_cb: int
+ :param num_cb: (optional) If a callback is specified with the cb
+ parameter, this parameter determines the granularity of the callback
+ by defining the maximum number of times the callback will be called
+ during the file transfer.
+
+ :type policy: :class:`boto.s3.acl.CannedACLStrings`
+ :param policy: A canned ACL policy that will be applied to the new key
+ in S3.
+
+ :type reduced_redundancy: bool
+ :param reduced_redundancy: If True, this will set the storage
+ class of the new Key to be
+ REDUCED_REDUNDANCY. The Reduced Redundancy
+ Storage (RRS) feature of S3 provides lower
+ redundancy at lower storage cost.
"""
- if not self.can_do_chunked_transfer(headers):
- raise BotoClientError("Object is not seekable and chunked transfer not possible")
+ provider = self.bucket.connection.provider
+ if not provider.supports_chunked_transfer():
+ raise BotoClientError("Provider does not support chunked transfer")
+ # Name of the Object should be specified explicitly for Streams.
if not self.name or self.name == '':
- raise BotoClientError("Cannot determine the name of the object")
+ raise BotoClientError("Cannot determine the name of the stream")
- provider = self.bucket.connection.provider
if headers is None:
headers = {}
if policy:
headers[provider.acl_header] = policy
+ # Set the Transfer Encoding for Streams.
+ headers['Transfer-Encoding'] = 'chunked'
+
if reduced_redundancy:
self.storage_class = 'REDUCED_REDUNDANCY'
if provider.storage_class_header:
headers[provider.storage_class_header] = self.storage_class
- # TODO - What if provider doesn't support reduced reduncancy?
- # What if different providers provide different classes?
+
if self.bucket != None:
if not replace:
k = self.bucket.lookup(self.name)
@@ -705,10 +724,6 @@ class Key(object):
if policy:
headers[provider.acl_header] = policy
- # Check for chunked Transfer support
- if self.can_do_chunked_transfer(headers):
- chunked_transfer = True
-
if reduced_redundancy:
self.storage_class = 'REDUCED_REDUNDANCY'
if provider.storage_class_header:
diff --git a/tests/s3/test_gsconnection.py b/tests/s3/test_gsconnection.py
index c19fa386..196ed93c 100644
--- a/tests/s3/test_gsconnection.py
+++ b/tests/s3/test_gsconnection.py
@@ -75,30 +75,18 @@ class GSConnectionTest (unittest.TestCase):
md5 = k.md5
k.set_contents_from_string(s2)
assert k.md5 != md5
- # Test for chunked-transfer
- headers = {'Transfer-Encoding':'chunked'}
- k.md5 = None
- k.base64md5 = None
- k.set_contents_from_filename('foobar', headers=headers)
- fp = open('foobar1', 'wb')
- k.get_contents_to_file(fp)
- fp.close()
- fp1 = open('foobar', 'rb')
- fp2 = open('foobar1', 'rb')
- assert (fp1.read() == fp2.read()), 'Chunked Transfer corrupted the Data'
- fp2.close()
# Test for stream API
- fp1.seek(0,0)
+ fp2 = open('foobar', 'rb')
k.md5 = None
k.base64md5 = None
- k.set_contents_from_stream(fp1, headers=headers)
+ k.set_contents_from_stream(fp2, headers=headers)
fp = open('foobar1', 'wb')
k.get_contents_to_file(fp)
fp.close()
- fp1.seek(0,0)
- fp2 = open('foobar1', 'rb')
- assert (fp1.read() == fp2.read()), 'Chunked Transfer corrupted the Data'
- fp1.close()
+ fp2.seek(0,0)
+ fp = open('foobar1', 'rb')
+ assert (fp2.read() == fp.read()), 'Chunked Transfer corrupted the Data'
+ fp.close()
fp2.close()
os.unlink('foobar1')
os.unlink('foobar')
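The updated test streams from a regular (seekable) file; the same API is intended to cover genuinely non-seekable sources as well. A hedged sketch of that case (bucket name hypothetical, assumes GS credentials and an existing bucket):

    import os
    import boto

    conn = boto.connect_gs()
    bucket = conn.get_bucket('example-test-bucket')
    key = bucket.new_key('stream-test')

    # The read end of os.pipe() cannot seek, which is exactly the case
    # set_contents_from_stream is meant to handle.
    read_fd, write_fd = os.pipe()
    os.write(write_fd, b'hello from a pipe')
    os.close(write_fd)

    key.set_contents_from_stream(os.fdopen(read_fd, 'rb'))
    assert key.get_contents_as_string() == b'hello from a pipe'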