Diffstat (limited to 'boto/s3/key.py')
-rw-r--r--  boto/s3/key.py  238
1 file changed, 174 insertions(+), 64 deletions(-)
diff --git a/boto/s3/key.py b/boto/s3/key.py
index 7bc700aa..15e32281 100644
--- a/boto/s3/key.py
+++ b/boto/s3/key.py
@@ -26,6 +26,7 @@ import re
import rfc822
import StringIO
import base64
+import math
import urllib
import boto.utils
from boto.exception import BotoClientError
@@ -465,12 +466,17 @@ class Key(object):
expires_in_absolute)
def send_file(self, fp, headers=None, cb=None, num_cb=10,
- query_args=None, chunked_transfer=False):
+ query_args=None, chunked_transfer=False, size=None):
"""
Upload a file to a key into a bucket on S3.
:type fp: file
- :param fp: The file pointer to upload
+ :param fp: The file pointer to upload. The file pointer must
+ point at the offset from which you wish to upload,
+ i.e. if uploading the full file, it should point at the
+ start of the file. Normally when a file is opened for
+ reading, the fp will point at the first byte. See the
+ size parameter below for more info.
:type headers: dict
:param headers: The headers to pass along with the PUT request
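
A note on the fp contract above: send_file uploads from wherever the pointer currently sits, so callers slice a file into ranges by seeking first. A minimal sketch of the positioning contract (the file name is hypothetical):

    fp = open('data.bin', 'rb')
    assert fp.tell() == 0    # a freshly opened file points at the first byte
    fp.seek(1024)            # to upload only bytes 1024..EOF, seek before the call
    # key.send_file(fp) would now read from offset 1024 onward
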
@@ -491,21 +497,44 @@ class Key(object):
transfer. Providing a negative integer will cause
your callback to be called with each buffer read.
+ :type size: int
+ :param size: (optional) The maximum number of bytes to read from
+ the file pointer (fp). This is useful when uploading
+ a file in multiple parts where you are splitting the
+ file up into different ranges to be uploaded. If not
+ specified, the default behaviour is to read all bytes
+ from the file pointer. Fewer bytes may be available.
"""
provider = self.bucket.connection.provider
+ try:
+ spos = fp.tell()
+ except IOError:
+ spos = None
+ self.read_from_stream = False
# TODO: Replace this with requests implementation, once it exists.
def sender(http_conn, method, path, data, headers):
+ # This function is called repeatedly when retrying after
+ # temporary failures, so we must be sure the file pointer
+ # is pointing at the start of the data.
+ if spos is not None and spos != fp.tell():
+ fp.seek(spos)
+ elif spos is None and self.read_from_stream:
+ # if seek is not supported, and we've read from this
+ # stream already, then we need to abort retries to
+ # avoid sending bad data.
+ raise provider.storage_data_error(
+ 'Cannot retry failed request. fp does not support seeking.')
+
http_conn.putrequest(method, path)
for key in headers:
http_conn.putheader(key, headers[key])
http_conn.endheaders()
- if chunked_transfer:
- # MD5 for the stream has to be calculated on the fly, as
- # we don't know the size of the stream before hand.
+ if chunked_transfer and not self.base64md5:
+ # MD5 for the stream has to be calculated on the fly.
m = md5()
else:
- fp.seek(0)
+ m = None
save_debug = self.bucket.connection.debug
self.bucket.connection.debug = 0
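
The retry guard in sender() hinges on whether fp.tell() works at all. A standalone sketch of the same probe (file name hypothetical, Python 2 runtime like the rest of this file):

    import sys

    def start_pos(fp):
        # Seekable files report a position; pipes and sockets raise IOError.
        try:
            return fp.tell()
        except IOError:
            return None

    start_pos(open('data.bin', 'rb'))   # -> 0, a retry can seek back here
    start_pos(sys.stdin)                # -> None when stdin is a pipe: no retry
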
@@ -515,48 +544,74 @@ class Key(object):
# Use the getattr approach to allow this to work in AppEngine.
if getattr(http_conn, 'debuglevel', 0) < 3:
http_conn.set_debuglevel(0)
+
+ data_len = 0
if cb:
- if chunked_transfer:
+ if size:
+ cb_size = size
+ elif self.size:
+ cb_size = self.size
+ else:
+ cb_size = 0
+ if chunked_transfer and cb_size == 0:
# For chunked transfer, we call the cb for every 1MB
- # of data transferred.
+ # of data transferred, except when we know the size.
cb_count = (1024 * 1024)/self.BufferSize
- self.size = 0
- elif num_cb > 2:
- cb_count = self.size / self.BufferSize / (num_cb-2)
+ elif num_cb > 1:
+ cb_count = int(math.ceil(cb_size/self.BufferSize/(num_cb-1.0)))
elif num_cb < 0:
cb_count = -1
else:
cb_count = 0
- i = total_bytes = 0
- cb(total_bytes, self.size)
- l = fp.read(self.BufferSize)
- while len(l) > 0:
+ i = 0
+ cb(data_len, cb_size)
+
+ bytes_togo = size
+ if bytes_togo and bytes_togo < self.BufferSize:
+ chunk = fp.read(bytes_togo)
+ else:
+ chunk = fp.read(self.BufferSize)
+ if spos is None:
+ # read at least something from a non-seekable fp.
+ self.read_from_stream = True
+ while chunk:
+ chunk_len = len(chunk)
+ data_len += chunk_len
if chunked_transfer:
- http_conn.send('%x;\r\n' % len(l))
- http_conn.send(l)
+ http_conn.send('%x;\r\n' % chunk_len)
+ http_conn.send(chunk)
http_conn.send('\r\n')
else:
- http_conn.send(l)
+ http_conn.send(chunk)
+ if m:
+ m.update(chunk)
+ if bytes_togo:
+ bytes_togo -= chunk_len
+ if bytes_togo <= 0:
+ break
if cb:
- total_bytes += len(l)
i += 1
if i == cb_count or cb_count == -1:
- cb(total_bytes, self.size)
+ cb(data_len, cb_size)
i = 0
- if chunked_transfer:
- m.update(l)
- l = fp.read(self.BufferSize)
+ if bytes_togo and bytes_togo < self.BufferSize:
+ chunk = fp.read(bytes_togo)
+ else:
+ chunk = fp.read(self.BufferSize)
+
+ self.size = data_len
if chunked_transfer:
http_conn.send('0\r\n')
+ if m:
+ # Use the chunked trailer for the digest
+ hd = m.hexdigest()
+ self.md5, self.base64md5 = self.get_md5_from_hexdigest(hd)
+ # http_conn.send("Content-MD5: %s\r\n" % self.base64md5)
http_conn.send('\r\n')
- if cb:
- self.size = total_bytes
- # Get the md5 which is calculated on the fly.
- self.md5 = m.hexdigest()
- else:
- fp.seek(0)
- if cb:
- cb(total_bytes, self.size)
+
+ if cb and (cb_count <= 1 or i > 0) and data_len > 0:
+ cb(data_len, cb_size)
+
response = http_conn.getresponse()
body = response.read()
http_conn.set_debuglevel(save_debug)
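
The '%x;\r\n' writes above are HTTP/1.1 chunked transfer encoding: each chunk travels as its hex length, CRLF, the bytes, CRLF, and a zero-length chunk terminates the body. A self-contained sketch of that framing:

    def frame_chunks(chunks):
        # Produce an HTTP/1.1 chunked-encoded body for the given chunks.
        out = []
        for chunk in chunks:
            out.append('%x;\r\n' % len(chunk))  # hex size line (';' as sent above)
            out.append(chunk)
            out.append('\r\n')
        out.append('0\r\n\r\n')                 # zero-length chunk ends the body
        return ''.join(out)

    frame_chunks(['hello', 'world!'])
    # -> '5;\r\nhello\r\n6;\r\nworld!\r\n0\r\n\r\n'
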
@@ -580,8 +635,6 @@ class Key(object):
else:
headers = headers.copy()
headers['User-Agent'] = UserAgent
- if self.base64md5:
- headers['Content-MD5'] = self.base64md5
if self.storage_class != 'STANDARD':
headers[provider.storage_class_header] = self.storage_class
if headers.has_key('Content-Encoding'):
@@ -603,7 +656,13 @@ class Key(object):
headers['Content-Type'] = self.content_type
else:
headers['Content-Type'] = self.content_type
- if not chunked_transfer:
+ if self.base64md5:
+ headers['Content-MD5'] = self.base64md5
+ if chunked_transfer:
+ headers['Transfer-Encoding'] = 'chunked'
+ #if not self.base64md5:
+ # headers['Trailer'] = "Content-MD5"
+ else:
headers['Content-Length'] = str(self.size)
headers['Expect'] = '100-Continue'
headers = boto.utils.merge_meta(headers, self.metadata, provider)
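
Condensed, the framing decision above is: a known MD5 always rides along as Content-MD5, and the body is delimited either by Transfer-Encoding: chunked (size unknown up front) or by an explicit Content-Length. A simplified sketch (a hypothetical helper; the real method also merges metadata and storage-class headers):

    def framing_headers(base64md5, chunked_transfer, size):
        headers = {}
        if base64md5:
            headers['Content-MD5'] = base64md5
        if chunked_transfer:
            headers['Transfer-Encoding'] = 'chunked'
        else:
            headers['Content-Length'] = str(size)
        headers['Expect'] = '100-Continue'
        return headers
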
@@ -613,22 +672,29 @@ class Key(object):
query_args=query_args)
self.handle_version_headers(resp, force=True)
- def compute_md5(self, fp):
+ def compute_md5(self, fp, size=None):
"""
:type fp: file
:param fp: File pointer to the file to MD5 hash. The file pointer
- will be reset to the beginning of the file before the
+ will be reset to the same position before the
method returns.
+ :type size: int
+ :param size: (optional) The maximum number of bytes to read from
+ the file pointer (fp). This is useful when uploading
+ a file in multiple parts where the file is being
+ split in place into different parts. Fewer bytes may
+ be available.
+
:rtype: tuple
:return: A tuple containing the hex digest version of the MD5 hash
as the first element and the base64 encoded version of the
plain digest as the second element.
"""
- tup = compute_md5(fp)
- # Returned values are MD5 hash, base64 encoded MD5 hash, and file size.
+ tup = compute_md5(fp, size=size)
+ # Returned values are MD5 hash, base64 encoded MD5 hash, and data size.
# The internal implementation of compute_md5() needs to return the
- # file size but we don't want to return that value to the external
+ # data size but we don't want to return that value to the external
# caller because it changes the class interface (i.e. it might
# break some code) so we consume the third tuple value here and
# return the remainder of the tuple to the caller, thereby preserving
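
Because the pointer is restored afterwards, compute_md5 can now digest a single region of an open file without disturbing a surrounding read loop. A minimal sketch (bucket, key, and file names hypothetical):

    import boto

    key = boto.connect_s3().get_bucket('mybucket').new_key('part-1')
    fp = open('bigfile.bin', 'rb')
    # Digest only the first 5MB; fp is back at offset 0 when this returns.
    hex_md5, b64_md5 = key.compute_md5(fp, size=5 * 1024 * 1024)
    assert fp.tell() == 0
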
@@ -638,7 +704,8 @@ class Key(object):
def set_contents_from_stream(self, fp, headers=None, replace=True,
cb=None, num_cb=10, policy=None,
- reduced_redundancy=False, query_args=None):
+ reduced_redundancy=False, query_args=None,
+ size=None):
"""
Store an object using the name of the Key object as the key in
cloud and the contents of the data stream pointed to by 'fp' as
@@ -684,6 +751,13 @@ class Key(object):
REDUCED_REDUNDANCY. The Reduced Redundancy
Storage (RRS) feature of S3, provides lower
redundancy at lower storage cost.
+ :type size: int
+ :param size: (optional) The maximum number of bytes to read from
+ the file pointer (fp). This is useful when uploading
+ a file in multiple parts where you are splitting the
+ file up into different ranges to be uploaded. If not
+ specified, the default behaviour is to read all bytes
+ from the file pointer. Fewer bytes may be available.
"""
provider = self.bucket.connection.provider
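
set_contents_from_stream is the path for non-seekable input, now optionally bounded by size. A sketch that uploads at most 1MB read from a pipe (bucket and key names hypothetical):

    import sys
    import boto

    bucket = boto.connect_s3().get_bucket('mybucket')
    key = bucket.new_key('from-stdin')
    # stdin cannot seek, so the upload uses chunked transfer encoding and
    # the MD5 is accumulated on the fly; at most `size` bytes are read.
    key.set_contents_from_stream(sys.stdin, size=1024 * 1024)
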
@@ -701,9 +775,6 @@ class Key(object):
if policy:
headers[provider.acl_header] = policy
- # Set the Transfer Encoding for Streams.
- headers['Transfer-Encoding'] = 'chunked'
-
if reduced_redundancy:
self.storage_class = 'REDUCED_REDUNDANCY'
if provider.storage_class_header:
@@ -711,16 +782,15 @@ class Key(object):
if self.bucket is not None:
if not replace:
- k = self.bucket.lookup(self.name)
- if k:
+ if self.bucket.lookup(self.name):
return
self.send_file(fp, headers, cb, num_cb, query_args,
- chunked_transfer=True)
+ chunked_transfer=True, size=size)
def set_contents_from_file(self, fp, headers=None, replace=True,
cb=None, num_cb=10, policy=None, md5=None,
reduced_redundancy=False, query_args=None,
- encrypt_key=False):
+ encrypt_key=False, size=None):
"""
Store an object in S3 using the name of the Key object as the
key in S3 and the contents of the file pointed to by 'fp' as the
@@ -781,10 +851,17 @@ class Key(object):
be encrypted on the server-side by S3 and
will be stored in an encrypted form while
at rest in S3.
+
+ :type size: int
+ :param size: (optional) The maximum number of bytes to read from
+ the file pointer (fp). This is useful when uploading
+ a file in multiple parts where you are splitting the
+ file up into different ranges to be uploaded. If not
+ specified, the default behaviour is to read all bytes
+ from the file pointer. Fewer bytes may be available.
"""
provider = self.bucket.connection.provider
- if headers is None:
- headers = {}
+ headers = headers or {}
if policy:
headers[provider.acl_header] = policy
if encrypt_key:
@@ -798,23 +875,43 @@ class Key(object):
# What if different providers provide different classes?
if hasattr(fp, 'name'):
self.path = fp.name
+
if self.bucket != None:
- if not md5:
- md5 = self.compute_md5(fp)
+ if not md5 and provider.supports_chunked_transfer():
+ # defer the md5 calculation so it is computed on the fly;
+ # we don't know anything about the size yet.
+ chunked_transfer = True
+ self.size = None
else:
- # even if md5 is provided, still need to set size of content
- fp.seek(0, 2)
- self.size = fp.tell()
- fp.seek(0)
- self.md5 = md5[0]
- self.base64md5 = md5[1]
+ chunked_transfer = False
+ if not md5:
+ # compute_md5() also sets self.size to the actual
+ # number of bytes read while computing the md5.
+ md5 = self.compute_md5(fp, size)
+ # adjust size if required
+ size = self.size
+ elif size:
+ self.size = size
+ else:
+ # If md5 is provided, we still need the size, so
+ # calculate it from here to the end of the content
+ spos = fp.tell()
+ fp.seek(0, os.SEEK_END)
+ self.size = fp.tell() - spos
+ fp.seek(spos)
+ size = self.size
+ self.md5 = md5[0]
+ self.base64md5 = md5[1]
+
if self.name == None:
self.name = self.md5
if not replace:
- k = self.bucket.lookup(self.name)
- if k:
+ if self.bucket.lookup(self.name):
return
- self.send_file(fp, headers, cb, num_cb, query_args)
+
+ self.send_file(fp, headers=headers, cb=cb, num_cb=num_cb,
+ query_args=query_args, chunked_transfer=chunked_transfer,
+ size=size)
def set_contents_from_filename(self, filename, headers=None, replace=True,
cb=None, num_cb=10, policy=None, md5=None,
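
Taken together, the seek-then-size pattern lets a caller push one region of a file per call. A sketch (names hypothetical; a real multipart upload would go through the multipart API to stitch the parts back together):

    import os
    import boto

    bucket = boto.connect_s3().get_bucket('mybucket')
    part_size = 5 * 1024 * 1024
    fsize = os.path.getsize('bigfile.bin')
    fp = open('bigfile.bin', 'rb')
    offset = part_num = 0
    while offset < fsize:
        fp.seek(offset)
        k = bucket.new_key('bigfile.part%d' % part_num)
        # Reads at most part_size bytes starting at the current offset.
        k.set_contents_from_file(fp, size=min(part_size, fsize - offset))
        offset += part_size
        part_num += 1
    fp.close()
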
@@ -1001,6 +1098,7 @@ class Key(object):
cb_count = 0
i = total_bytes = 0
cb(total_bytes, self.size)
+
save_debug = self.bucket.connection.debug
if self.bucket.connection.debug == 1:
self.bucket.connection.debug = 0
@@ -1008,6 +1106,9 @@ class Key(object):
query_args = []
if torrent:
query_args.append('torrent')
+ m = None
+ else:
+ m = md5()
# If a version_id is passed in, use that. If not, check to see
# if the Key object has an explicit version_id and, if so, use that.
# Otherwise, don't pass a version_id query param.
@@ -1023,16 +1124,25 @@ class Key(object):
query_args = '&'.join(query_args)
self.open('r', headers, query_args=query_args,
override_num_retries=override_num_retries)
+
for bytes in self.read(size=self.BufferSize):
fp.write(bytes)
+ data_len += len(bytes)
+ if m:
+ m.update(bytes)
if cb:
- total_bytes += len(bytes)
+ if cb_size > 0 and data_len >= cb_size:
+ break
i += 1
if i == cb_count or cb_count == -1:
- cb(total_bytes, self.size)
+ cb(data_len, cb_size)
i = 0
- if cb:
- cb(total_bytes, self.size)
+ if cb and (cb_count <= 1 or i > 0) and data_len > 0:
+ cb(data_len, cb_size)
+ if m:
+ self.md5 = m.hexdigest()
+ if self.size is None and not torrent and not headers.has_key("Range"):
+ self.size = data_len
self.close()
self.bucket.connection.debug = save_debug
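
With the download side now accumulating an MD5 as well, a caller can sanity-check a full (non-ranged, non-torrent) download against the key's etag, which for single-part objects is the quoted MD5 hex digest. A hedged sketch (names hypothetical):

    import boto

    key = boto.connect_s3().get_bucket('mybucket').get_key('bigfile')
    fp = open('local-copy.bin', 'wb')
    key.get_contents_to_file(fp)
    fp.close()
    # Only meaningful for objects uploaded in one piece; multipart etags differ.
    if key.etag and key.etag.strip('"') != key.md5:
        raise ValueError('download corrupted: md5 mismatch')
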