Diffstat (limited to 'boto/s3/key.py')
-rw-r--r--  boto/s3/key.py  238
1 file changed, 174 insertions, 64 deletions
diff --git a/boto/s3/key.py b/boto/s3/key.py
index 7bc700aa..15e32281 100644
--- a/boto/s3/key.py
+++ b/boto/s3/key.py
@@ -26,6 +26,7 @@ import re
 import rfc822
 import StringIO
 import base64
+import math
 import urllib
 import boto.utils
 from boto.exception import BotoClientError
@@ -465,12 +466,17 @@ class Key(object):
                                            expires_in_absolute)
 
     def send_file(self, fp, headers=None, cb=None, num_cb=10,
-                  query_args=None, chunked_transfer=False):
+                  query_args=None, chunked_transfer=False, size=None):
         """
         Upload a file to a key into a bucket on S3.
 
         :type fp: file
-        :param fp: The file pointer to upload
+        :param fp: The file pointer to upload. The file pointer must
+                   point at the offset from which you wish to upload,
+                   i.e. if uploading the full file, it should point at
+                   the start of the file. Normally when a file is opened
+                   for reading, the fp will point at the first byte. See
+                   the size parameter below for more info.
 
         :type headers: dict
         :param headers: The headers to pass along with the PUT request
@@ -491,21 +497,44 @@ class Key(object):
                        transfer. Providing a negative integer will cause
                        your callback to be called with each buffer read.
 
+        :type size: int
+        :param size: (optional) The maximum number of bytes to read from
+                     the file pointer (fp). This is useful when uploading
+                     a file in multiple parts where you are splitting the
+                     file up into different ranges to be uploaded. If not
+                     specified, the default behaviour is to read all bytes
+                     from the file pointer. Fewer bytes may be available.
         """
         provider = self.bucket.connection.provider
+        try:
+            spos = fp.tell()
+        except IOError:
+            spos = None
+            self.read_from_stream = False
 
         # TODO: Replace this with requests implementation, once it exists.
         def sender(http_conn, method, path, data, headers):
+            # This function is called repeatedly for temporary retries,
+            # so we must be sure the file pointer is pointing at the
+            # start of the data.
+            if spos is not None and spos != fp.tell():
+                fp.seek(spos)
+            elif spos is None and self.read_from_stream:
+                # If seek is not supported, and we've read from this
+                # stream already, then we need to abort retries to
+                # avoid sending bad data.
+                raise provider.storage_data_error(
+                    'Cannot retry failed request. fp does not support seeking.')
+
             http_conn.putrequest(method, path)
             for key in headers:
                 http_conn.putheader(key, headers[key])
             http_conn.endheaders()
-            if chunked_transfer:
-                # MD5 for the stream has to be calculated on the fly, as
-                # we don't know the size of the stream before hand.
+            if chunked_transfer and not self.base64md5:
+                # MD5 for the stream has to be calculated on the fly.
                 m = md5()
             else:
-                fp.seek(0)
+                m = None
 
             save_debug = self.bucket.connection.debug
             self.bucket.connection.debug = 0
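The retry logic at the top of sender() is worth calling out: the function can be invoked several times for one upload, so it re-seeks to the part's starting offset before each attempt and aborts once a non-seekable stream has been partially consumed. A minimal standalone sketch of the same idea, under hypothetical names (send_with_retries and do_send are illustrations, not boto APIs):

def send_with_retries(fp, do_send, retries=3):
    # Remember where this part begins; tell() raises IOError on
    # non-seekable streams such as pipes and sockets.
    try:
        spos = fp.tell()
    except IOError:
        spos = None
    for attempt in range(retries):
        if spos is not None:
            fp.seek(spos)  # rewind to the start of the part
        elif attempt > 0:
            # Bytes were already consumed from a non-seekable stream,
            # so a retry would send truncated or duplicated data.
            raise IOError('cannot retry: fp does not support seeking')
        try:
            return do_send(fp)
        except IOError:
            if attempt == retries - 1:
                raise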
@@ -515,48 +544,74 @@ # Use the getattr approach to allow this to work in AppEngine.
             if getattr(http_conn, 'debuglevel', 0) < 3:
                 http_conn.set_debuglevel(0)
+
+            data_len = 0
             if cb:
-                if chunked_transfer:
+                if size:
+                    cb_size = size
+                elif self.size:
+                    cb_size = self.size
+                else:
+                    cb_size = 0
+                if chunked_transfer and cb_size == 0:
                     # For chunked Transfer, we call the cb for every 1MB
-                    # of data transferred.
+                    # of data transferred, except when we know size.
                     cb_count = (1024 * 1024)/self.BufferSize
-                    self.size = 0
-                elif num_cb > 2:
-                    cb_count = self.size / self.BufferSize / (num_cb-2)
+                elif num_cb > 1:
+                    cb_count = int(math.ceil(cb_size/self.BufferSize/(num_cb-1.0)))
                 elif num_cb < 0:
                     cb_count = -1
                 else:
                     cb_count = 0
-                i = total_bytes = 0
-                cb(total_bytes, self.size)
-            l = fp.read(self.BufferSize)
-            while len(l) > 0:
+                i = 0
+                cb(data_len, cb_size)
+
+            bytes_togo = size
+            if bytes_togo and bytes_togo < self.BufferSize:
+                chunk = fp.read(bytes_togo)
+            else:
+                chunk = fp.read(self.BufferSize)
+            if spos is None:
+                # read at least something from a non-seekable fp.
+                self.read_from_stream = True
+            while chunk:
+                chunk_len = len(chunk)
+                data_len += chunk_len
                 if chunked_transfer:
-                    http_conn.send('%x;\r\n' % len(l))
-                    http_conn.send(l)
+                    http_conn.send('%x;\r\n' % chunk_len)
+                    http_conn.send(chunk)
                     http_conn.send('\r\n')
                 else:
-                    http_conn.send(l)
+                    http_conn.send(chunk)
+                if m:
+                    m.update(chunk)
+                if bytes_togo:
+                    bytes_togo -= chunk_len
+                    if bytes_togo <= 0:
+                        break
                 if cb:
-                    total_bytes += len(l)
                     i += 1
                     if i == cb_count or cb_count == -1:
-                        cb(total_bytes, self.size)
+                        cb(data_len, cb_size)
                         i = 0
-                if chunked_transfer:
-                    m.update(l)
-                l = fp.read(self.BufferSize)
+                if bytes_togo and bytes_togo < self.BufferSize:
+                    chunk = fp.read(bytes_togo)
+                else:
+                    chunk = fp.read(self.BufferSize)
+
+            self.size = data_len
             if chunked_transfer:
                 http_conn.send('0\r\n')
+                if m:
+                    # Use the chunked trailer for the digest
+                    hd = m.hexdigest()
+                    self.md5, self.base64md5 = self.get_md5_from_hexdigest(hd)
+                    # http_conn.send("Content-MD5: %s\r\n" % self.base64md5)
                 http_conn.send('\r\n')
-                if cb:
-                    self.size = total_bytes
-                # Get the md5 which is calculated on the fly.
-                self.md5 = m.hexdigest()
-            else:
-                fp.seek(0)
-            if cb:
-                cb(total_bytes, self.size)
+
+            if cb and (cb_count <= 1 or i > 0) and data_len > 0:
+                cb(data_len, cb_size)
+
             response = http_conn.getresponse()
             body = response.content
             http_conn.set_debuglevel(save_debug)
@@ -580,8 +635,6 @@ class Key(object):
         else:
             headers = headers.copy()
         headers['User-Agent'] = UserAgent
-        if self.base64md5:
-            headers['Content-MD5'] = self.base64md5
         if self.storage_class != 'STANDARD':
             headers[provider.storage_class_header] = self.storage_class
         if headers.has_key('Content-Encoding'):
@@ -603,7 +656,13 @@ class Key(object):
                 headers['Content-Type'] = self.content_type
         else:
             headers['Content-Type'] = self.content_type
-        if not chunked_transfer:
+        if self.base64md5:
+            headers['Content-MD5'] = self.base64md5
+        if chunked_transfer:
+            headers['Transfer-Encoding'] = 'chunked'
+            #if not self.base64md5:
+            #    headers['Trailer'] = "Content-MD5"
+        else:
             headers['Content-Length'] = str(self.size)
         headers['Expect'] = '100-Continue'
         headers = boto.utils.merge_meta(headers, self.metadata, provider)
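For context, the sender() loop above frames the body per HTTP/1.1 chunked transfer coding: each chunk is its size in hex, CRLF, the data, CRLF, and a final zero-size chunk ends the body. A rough generator equivalent of that framing (buffer_size is an illustrative stand-in for self.BufferSize):

def chunked_frames(fp, buffer_size=8192):
    # Yield the on-the-wire pieces of a chunked transfer body,
    # mirroring the http_conn.send() calls in sender().
    chunk = fp.read(buffer_size)
    while chunk:
        yield '%x;\r\n' % len(chunk)  # chunk size in hex
        yield chunk
        yield '\r\n'
        chunk = fp.read(buffer_size)
    yield '0\r\n'  # zero-length chunk terminates the body
    yield '\r\n'   # blank line closing the (empty) trailer section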
@@ -613,22 +672,29 @@
                                                   query_args=query_args)
         self.handle_version_headers(resp, force=True)
 
-    def compute_md5(self, fp):
+    def compute_md5(self, fp, size=None):
         """
         :type fp: file
         :param fp: File pointer to the file to MD5 hash.  The file pointer
-                   will be reset to the beginning of the file before the
+                   will be reset to the same position before the
                    method returns.
 
+        :type size: int
+        :param size: (optional) The maximum number of bytes to read from
+                     the file pointer (fp). This is useful when uploading
+                     a file in multiple parts where the file is being
+                     split in place into different parts. Fewer bytes may
+                     be available.
+
         :rtype: tuple
         :return: A tuple containing the hex digest version of the MD5 hash
                  as the first element and the base64 encoded version of the
                  plain digest as the second element.
         """
-        tup = compute_md5(fp)
-        # Returned values are MD5 hash, base64 encoded MD5 hash, and file size.
+        tup = compute_md5(fp, size=size)
+        # Returned values are MD5 hash, base64 encoded MD5 hash, and data size.
         # The internal implementation of compute_md5() needs to return the
-        # file size but we don't want to return that value to the external
+        # data size but we don't want to return that value to the external
         # caller because it changes the class interface (i.e. it might
         # break some code) so we consume the third tuple value here and
         # return the remainder of the tuple to the caller, thereby preserving
@@ -638,7 +704,8 @@ class Key(object):
 
     def set_contents_from_stream(self, fp, headers=None, replace=True,
                                  cb=None, num_cb=10, policy=None,
-                                 reduced_redundancy=False, query_args=None):
+                                 reduced_redundancy=False, query_args=None,
+                                 size=None):
         """
         Store an object using the name of the Key object as the key in
         cloud and the contents of the data stream pointed to by 'fp' as
@@ -684,6 +751,13 @@ class Key(object):
                                    REDUCED_REDUNDANCY. The Reduced Redundancy
                                    Storage (RRS) feature of S3, provides lower
                                    redundancy at lower storage cost.
 
+        :type size: int
+        :param size: (optional) The maximum number of bytes to read from
+                     the file pointer (fp). This is useful when uploading
+                     a file in multiple parts where you are splitting the
+                     file up into different ranges to be uploaded. If not
+                     specified, the default behaviour is to read all bytes
+                     from the file pointer. Fewer bytes may be available.
         """
         provider = self.bucket.connection.provider
@@ -701,9 +775,6 @@ class Key(object):
         if policy:
             headers[provider.acl_header] = policy
 
-        # Set the Transfer Encoding for Streams.
-        headers['Transfer-Encoding'] = 'chunked'
-
         if reduced_redundancy:
             self.storage_class = 'REDUCED_REDUNDANCY'
             if provider.storage_class_header:
@@ -711,16 +782,15 @@ class Key(object):
 
         if self.bucket is not None:
             if not replace:
-                k = self.bucket.lookup(self.name)
-                if k:
+                if self.bucket.lookup(self.name):
                     return
             self.send_file(fp, headers, cb, num_cb, query_args,
-                           chunked_transfer=True)
+                           chunked_transfer=True, size=size)
 
     def set_contents_from_file(self, fp, headers=None, replace=True,
                                cb=None, num_cb=10, policy=None, md5=None,
                                reduced_redundancy=False, query_args=None,
-                               encrypt_key=False):
+                               encrypt_key=False, size=None):
         """
         Store an object in S3 using the name of the Key object as the
         key in S3 and the contents of the file pointed to by 'fp' as the
@@ -781,10 +851,17 @@ class Key(object):
                            be encrypted on the server-side by S3 and will be
                            stored in an encrypted form while at rest in S3.
 
+
+        :type size: int
+        :param size: (optional) The maximum number of bytes to read from
+                     the file pointer (fp). This is useful when uploading
+                     a file in multiple parts where you are splitting the
+                     file up into different ranges to be uploaded. If not
+                     specified, the default behaviour is to read all bytes
+                     from the file pointer. Fewer bytes may be available.
         """
         provider = self.bucket.connection.provider
-        if headers is None:
-            headers = {}
+        headers = headers or {}
        if policy:
             headers[provider.acl_header] = policy
         if encrypt_key:
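The size parameter threaded through these methods is what enables split uploads: seek fp to an offset, pass size, and only that byte range is read, leaving fp positioned at the start of the next part. A hypothetical usage sketch (the bucket name, file name, and 5 MB part size are invented for illustration):

import os
from boto.s3.connection import S3Connection

conn = S3Connection()  # credentials picked up from the environment
bucket = conn.get_bucket('mybucket')
part_size = 5 * 1024 * 1024

fp = open('big.dat', 'rb')
total = os.fstat(fp.fileno()).st_size
part = 0
while fp.tell() < total:
    key = bucket.new_key('big.dat.part%d' % part)
    # fp points at the start of this range; at most part_size bytes
    # are read, leaving fp at the start of the next range.
    key.set_contents_from_file(fp, size=part_size)
    part += 1
fp.close()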
@@ -798,23 +875,43 @@ # What if different providers provide different classes?
         if hasattr(fp, 'name'):
             self.path = fp.name
+
         if self.bucket != None:
-            if not md5:
-                md5 = self.compute_md5(fp)
+            if not md5 and provider.supports_chunked_transfer():
+                # defer md5 calculation so it is computed on the fly;
+                # we don't know anything about the size yet.
+                chunked_transfer = True
+                self.size = None
             else:
-                # even if md5 is provided, still need to set size of content
-                fp.seek(0, 2)
-                self.size = fp.tell()
-                fp.seek(0)
-            self.md5 = md5[0]
-            self.base64md5 = md5[1]
+                chunked_transfer = False
+                if not md5:
+                    # compute_md5() will also set self.size to the actual
+                    # number of bytes read while computing the md5.
+                    md5 = self.compute_md5(fp, size)
+                    # adjust size if required
+                    size = self.size
+                elif size:
+                    self.size = size
+                else:
+                    # If md5 is provided, we still need the size, so
+                    # calculate it based on the bytes to end of content
+                    spos = fp.tell()
+                    fp.seek(0, os.SEEK_END)
+                    self.size = fp.tell() - spos
+                    fp.seek(spos)
+                    size = self.size
+                self.md5 = md5[0]
+                self.base64md5 = md5[1]
+
             if self.name == None:
                 self.name = self.md5
             if not replace:
-                k = self.bucket.lookup(self.name)
-                if k:
+                if self.bucket.lookup(self.name):
                     return
-            self.send_file(fp, headers, cb, num_cb, query_args)
+
+            self.send_file(fp, headers=headers, cb=cb, num_cb=num_cb,
+                           query_args=query_args,
+                           chunked_transfer=chunked_transfer, size=size)
 
     def set_contents_from_filename(self, filename, headers=None,
                                    replace=True, cb=None, num_cb=10,
                                    policy=None, md5=None,
@@ -1001,6 +1098,7 @@ class Key(object):
                 cb_count = 0
             i = total_bytes = 0
             cb(total_bytes, self.size)
+
         save_debug = self.bucket.connection.debug
         if self.bucket.connection.debug == 1:
             self.bucket.connection.debug = 0
@@ -1008,6 +1106,9 @@
         query_args = []
         if torrent:
             query_args.append('torrent')
+            m = None
+        else:
+            m = md5()
         # If a version_id is passed in, use that.  If not, check to see
         # if the Key object has an explicit version_id and, if so, use that.
         # Otherwise, don't pass a version_id query param.
@@ -1023,16 +1124,25 @@
             query_args = '&'.join(query_args)
         self.open('r', headers, query_args=query_args,
                   override_num_retries=override_num_retries)
+
         for bytes in self.read(size=self.BufferSize):
             fp.write(bytes)
+            data_len += len(bytes)
+            if m:
+                m.update(bytes)
             if cb:
-                total_bytes += len(bytes)
+                if cb_size > 0 and data_len >= cb_size:
+                    break
                 i += 1
                 if i == cb_count or cb_count == -1:
-                    cb(total_bytes, self.size)
+                    cb(data_len, cb_size)
                     i = 0
-        if cb:
-            cb(total_bytes, self.size)
+        if cb and (cb_count <= 1 or i > 0) and data_len > 0:
+            cb(data_len, cb_size)
+        if m:
+            self.md5 = m.hexdigest()
+        if self.size is None and not torrent and not headers.has_key("Range"):
+            self.size = data_len
         self.close()
         self.bucket.connection.debug = save_debug
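One consequence of the get_file() changes in the last hunks: self.md5 is now populated during a plain download (the m.update(bytes) calls above), so a caller can cross-check it against the object's ETag. A hypothetical sketch; download_and_verify is not a boto API, and composite multipart ETags (those containing '-') are skipped:

def download_and_verify(key, path):
    fp = open(path, 'wb')
    try:
        key.get_file(fp)
    finally:
        fp.close()
    etag = key.etag.strip('"')  # S3 returns the ETag quoted
    if '-' not in etag and key.md5 != etag:
        raise IOError('md5 mismatch for %s: %s != %s'
                      % (key.name, key.md5, etag))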