summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/dynamodb/test_layer2.py28
-rw-r--r--tests/s3/mock_storage_service.py72
-rw-r--r--tests/s3/test_key.py321
-rw-r--r--tests/s3/test_mfa.py91
-rw-r--r--tests/s3/test_multipart.py32
-rwxr-xr-xtests/s3/test_resumable_uploads.py18
-rw-r--r--tests/s3/test_versioning.py156
-rwxr-xr-xtests/test.py19
8 files changed, 607 insertions, 130 deletions
diff --git a/tests/dynamodb/test_layer2.py b/tests/dynamodb/test_layer2.py
index cc608fe6..9324c8d1 100644
--- a/tests/dynamodb/test_layer2.py
+++ b/tests/dynamodb/test_layer2.py
@@ -29,6 +29,8 @@ import time
import uuid
from boto.dynamodb.exceptions import DynamoDBKeyNotFoundError
from boto.dynamodb.layer2 import Layer2
+from boto.dynamodb.types import get_dynamodb_type
+from boto.dynamodb.condition import *
class DynamoDBLayer2Test (unittest.TestCase):
@@ -55,9 +57,9 @@ class DynamoDBLayer2Test (unittest.TestCase):
table = c.create_table(table_name, schema, read_units, write_units)
assert table.name == table_name
assert table.schema.hash_key_name == hash_key_name
- assert table.schema.hash_key_type == c.get_dynamodb_type(hash_key_proto_value)
+ assert table.schema.hash_key_type == get_dynamodb_type(hash_key_proto_value)
assert table.schema.range_key_name == range_key_name
- assert table.schema.range_key_type == c.get_dynamodb_type(range_key_proto_value)
+ assert table.schema.range_key_type == get_dynamodb_type(range_key_proto_value)
assert table.read_units == read_units
assert table.write_units == write_units
@@ -212,15 +214,13 @@ class DynamoDBLayer2Test (unittest.TestCase):
table2_item1.put()
# Try a few queries
- items = table.query('Amazon DynamoDB',
- {'DynamoDB': 'BEGINS_WITH'})
+ items = table.query('Amazon DynamoDB', BEGINS_WITH('DynamoDB'))
n = 0
for item in items:
n += 1
assert n == 2
- items = table.query('Amazon DynamoDB',
- {'DynamoDB': 'BEGINS_WITH'},
+ items = table.query('Amazon DynamoDB', BEGINS_WITH('DynamoDB'),
request_limit=1, max_results=1)
n = 0
for item in items:
@@ -234,6 +234,12 @@ class DynamoDBLayer2Test (unittest.TestCase):
n += 1
assert n == 3
+ items = table.scan({'Replies': GT(0)})
+ n = 0
+ for item in items:
+ n += 1
+ assert n == 1
+
# Test some integer and float attributes
integer_value = 42
float_value = 345.678
@@ -280,13 +286,19 @@ class DynamoDBLayer2Test (unittest.TestCase):
assert len(response['Responses'][table.name]['Items']) == 2
# Try queries
- results = table.query('Amazon DynamoDB',
- range_key_condition={'DynamoDB': 'BEGINS_WITH'})
+ results = table.query('Amazon DynamoDB', BEGINS_WITH('DynamoDB'))
n = 0
for item in results:
n += 1
assert n == 2
+ # Try scans
+ results = table.scan({'Tags': CONTAINS('table')})
+ n = 0
+ for item in results:
+ n += 1
+ assert n == 2
+
# Try to delete the item with the right Expected value
expected = {'Views': 0}
item1.delete(expected_value=expected)
diff --git a/tests/s3/mock_storage_service.py b/tests/s3/mock_storage_service.py
index 2b81f5b6..2bd77439 100644
--- a/tests/s3/mock_storage_service.py
+++ b/tests/s3/mock_storage_service.py
@@ -29,7 +29,9 @@ of the optional params (which we indicate with the constant "NOT_IMPL").
import copy
import boto
import base64
+
from boto.utils import compute_md5
+from boto.s3.prefix import Prefix
try:
from hashlib import md5
@@ -67,6 +69,12 @@ class MockKey(object):
self.last_modified = 'Wed, 06 Oct 2010 05:11:54 GMT'
self.BufferSize = 8192
+ def __repr__(self):
+ if self.bucket:
+ return '<MockKey: %s,%s>' % (self.bucket.name, self.name)
+ else:
+ return '<MockKey: %s>' % self.name
+
def get_contents_as_string(self, headers=NOT_IMPL,
cb=NOT_IMPL, num_cb=NOT_IMPL,
torrent=NOT_IMPL,
@@ -114,10 +122,10 @@ class MockKey(object):
self.size = len(s)
self._handle_headers(headers)
- def set_contents_from_filename(self, filename, headers=None, replace=NOT_IMPL,
- cb=NOT_IMPL, num_cb=NOT_IMPL,
- policy=NOT_IMPL, md5=NOT_IMPL,
- res_upload_handler=NOT_IMPL):
+ def set_contents_from_filename(self, filename, headers=None,
+ replace=NOT_IMPL, cb=NOT_IMPL,
+ num_cb=NOT_IMPL, policy=NOT_IMPL,
+ md5=NOT_IMPL, res_upload_handler=NOT_IMPL):
fp = open(filename, 'rb')
self.set_contents_from_file(fp, headers, replace, cb, num_cb,
policy, md5, res_upload_handler)
@@ -174,9 +182,13 @@ class MockBucket(object):
self.connection = connection
self.logging = False
+ def __repr__(self):
+ return 'MockBucket: %s' % self.name
+
def copy_key(self, new_key_name, src_bucket_name,
src_key_name, metadata=NOT_IMPL, src_version_id=NOT_IMPL,
- storage_class=NOT_IMPL, preserve_acl=NOT_IMPL):
+ storage_class=NOT_IMPL, preserve_acl=NOT_IMPL,
+ encrypt_key=NOT_IMPL, headers=NOT_IMPL, query_args=NOT_IMPL):
new_key = self.new_key(key_name=new_key_name)
src_key = mock_connection.get_bucket(
src_bucket_name).get_key(src_key_name)
@@ -231,17 +243,29 @@ class MockBucket(object):
return None
return self.keys[key_name]
- def list(self, prefix='', delimiter=NOT_IMPL, marker=NOT_IMPL,
+ def list(self, prefix='', delimiter='', marker=NOT_IMPL,
headers=NOT_IMPL):
+ prefix = prefix or '' # Turn None into '' for prefix match.
# Return list instead of using a generator so we don't get
# 'dictionary changed size during iteration' error when performing
# deletions while iterating (e.g., during test cleanup).
result = []
+ key_name_set = set()
for k in self.keys.itervalues():
- if not prefix:
- result.append(k)
- elif k.name.startswith(prefix):
- result.append(k)
+ if k.name.startswith(prefix):
+ k_name_past_prefix = k.name[len(prefix):]
+ if delimiter:
+ pos = k_name_past_prefix.find(delimiter)
+ else:
+ pos = -1
+ if (pos != -1):
+ key_or_prefix = Prefix(
+ bucket=self, name=k.name[:len(prefix)+pos+1])
+ else:
+ key_or_prefix = MockKey(bucket=self, name=k.name)
+ if key_or_prefix.name not in key_name_set:
+ key_name_set.add(key_or_prefix.name)
+ result.append(key_or_prefix)
return result
def set_acl(self, acl_or_str, key_name='', headers=NOT_IMPL,
@@ -250,10 +274,10 @@ class MockBucket(object):
# the get_acl call will just return that string name.
if key_name:
# Set ACL for the key.
- self.acls[key_name] = acl_or_str
+ self.acls[key_name] = MockAcl(acl_or_str)
else:
# Set ACL for the bucket.
- self.acls[self.name] = acl_or_str
+ self.acls[self.name] = MockAcl(acl_or_str)
def set_def_acl(self, acl_or_str, key_name=NOT_IMPL, headers=NOT_IMPL,
version_id=NOT_IMPL):
@@ -313,6 +337,8 @@ mock_connection = MockConnection()
class MockBucketStorageUri(object):
+ delim = '/'
+
def __init__(self, scheme, bucket_name=None, object_name=None,
debug=NOT_IMPL, suppress_consec_slashes=NOT_IMPL):
self.scheme = scheme
@@ -395,10 +421,28 @@ class MockBucketStorageUri(object):
return True
def names_container(self):
- return not self.object_name
+ return bool(not self.object_name)
def names_singleton(self):
- return self.object_name
+ return bool(self.object_name)
+
+ def names_directory(self):
+ return False
+
+ def names_provider(self):
+ return bool(not self.bucket_name)
+
+ def names_bucket(self):
+ return self.names_container()
+
+ def names_file(self):
+ return False
+
+ def names_object(self):
+ return not self.names_container()
+
+ def is_stream(self):
+ return False
def new_key(self, validate=NOT_IMPL, headers=NOT_IMPL):
bucket = self.get_bucket()
diff --git a/tests/s3/test_key.py b/tests/s3/test_key.py
new file mode 100644
index 00000000..c961b317
--- /dev/null
+++ b/tests/s3/test_key.py
@@ -0,0 +1,321 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2012 Mitch Garnaat http://garnaat.org/
+# All rights reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a
+# copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish, dis-
+# tribute, sublicense, and/or sell copies of the Software, and to permit
+# persons to whom the Software is furnished to do so, subject to the fol-
+# lowing conditions:
+#
+# The above copyright notice and this permission notice shall be included
+# in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
+# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+
+"""
+Some unit tests for S3 Key
+"""
+
+import unittest
+import time
+import StringIO
+from boto.s3.connection import S3Connection
+from boto.s3.key import Key
+from boto.exception import S3ResponseError
+
+class S3KeyTest (unittest.TestCase):
+
+ def setUp(self):
+ self.conn = S3Connection()
+ self.bucket_name = 'keytest-%d' % int(time.time())
+ self.bucket = self.conn.create_bucket(self.bucket_name)
+
+ def tearDown(self):
+ for key in self.bucket:
+ key.delete()
+ self.bucket.delete()
+
+ def test_set_contents_as_file(self):
+ content="01234567890123456789"
+ sfp = StringIO.StringIO(content)
+
+ # fp is set at 0 for just opened (for read) files.
+ # set_contents should write full content to key.
+ k = self.bucket.new_key("k")
+ k.set_contents_from_file(sfp)
+ self.assertEqual(k.size, 20)
+ kn = self.bucket.new_key("k")
+ ks = kn.get_contents_as_string()
+ self.assertEqual(ks, content)
+
+ # set fp to 5 and set contents. this should
+ # set "567890123456789" to the key
+ sfp.seek(5)
+ k = self.bucket.new_key("k")
+ k.set_contents_from_file(sfp)
+ self.assertEqual(k.size, 15)
+ kn = self.bucket.new_key("k")
+ ks = kn.get_contents_as_string()
+ self.assertEqual(ks, content[5:])
+
+ # set fp to 5 and only set 5 bytes. this should
+ # write the value "56789" to the key.
+ sfp.seek(5)
+ k = self.bucket.new_key("k")
+ k.set_contents_from_file(sfp, size=5)
+ self.assertEqual(k.size, 5)
+ self.assertEqual(sfp.tell(), 10)
+ kn = self.bucket.new_key("k")
+ ks = kn.get_contents_as_string()
+ self.assertEqual(ks, content[5:10])
+
+ def test_set_contents_with_md5(self):
+ content="01234567890123456789"
+ sfp = StringIO.StringIO(content)
+
+ # fp is set at 0 for just opened (for read) files.
+ # set_contents should write full content to key.
+ k = self.bucket.new_key("k")
+ good_md5 = k.compute_md5(sfp)
+ k.set_contents_from_file(sfp, md5=good_md5)
+ kn = self.bucket.new_key("k")
+ ks = kn.get_contents_as_string()
+ self.assertEqual(ks, content)
+
+ # set fp to 5 and only set 5 bytes. this should
+ # write the value "56789" to the key.
+ sfp.seek(5)
+ k = self.bucket.new_key("k")
+ good_md5 = k.compute_md5(sfp, size=5)
+ k.set_contents_from_file(sfp, size=5, md5=good_md5)
+ self.assertEqual(sfp.tell(), 10)
+ kn = self.bucket.new_key("k")
+ ks = kn.get_contents_as_string()
+ self.assertEqual(ks, content[5:10])
+
+ # let's try a wrong md5 by just altering it.
+ k = self.bucket.new_key("k")
+ sfp.seek(0)
+ hexdig,base64 = k.compute_md5(sfp)
+ bad_md5 = (hexdig, base64[3:])
+ try:
+ k.set_contents_from_file(sfp, md5=bad_md5)
+ self.fail("should fail with bad md5")
+ except S3ResponseError:
+ pass
+
+ def test_get_contents_with_md5(self):
+ content="01234567890123456789"
+ sfp = StringIO.StringIO(content)
+
+ k = self.bucket.new_key("k")
+ k.set_contents_from_file(sfp)
+ kn = self.bucket.new_key("k")
+ s = kn.get_contents_as_string()
+ self.assertEqual(kn.md5, k.md5)
+ self.assertEqual(s, content)
+
+ def test_file_callback(self):
+ def callback(wrote, total):
+ self.my_cb_cnt += 1
+ self.assertNotEqual(wrote, self.my_cb_last, "called twice with same value")
+ self.my_cb_last = wrote
+
+ # Zero bytes written => 1 call
+ self.my_cb_cnt = 0
+ self.my_cb_last = None
+ k = self.bucket.new_key("k")
+ k.BufferSize = 2
+ sfp = StringIO.StringIO("")
+ k.set_contents_from_file(sfp, cb=callback, num_cb=10)
+ self.assertEqual(self.my_cb_cnt, 1)
+ self.assertEqual(self.my_cb_last, 0)
+ sfp.close()
+
+ # Read back zero bytes => 1 call
+ self.my_cb_cnt = 0
+ self.my_cb_last = None
+ s = k.get_contents_as_string(cb=callback)
+ self.assertEqual(self.my_cb_cnt, 1)
+ self.assertEqual(self.my_cb_last, 0)
+
+ content="01234567890123456789"
+ sfp = StringIO.StringIO(content)
+
+ # expect 2 calls due start/finish
+ self.my_cb_cnt = 0
+ self.my_cb_last = None
+ k = self.bucket.new_key("k")
+ k.set_contents_from_file(sfp, cb=callback, num_cb=10)
+ self.assertEqual(self.my_cb_cnt, 2)
+ self.assertEqual(self.my_cb_last, 20)
+
+ # Read back all bytes => 2 calls
+ self.my_cb_cnt = 0
+ self.my_cb_last = None
+ s = k.get_contents_as_string(cb=callback)
+ self.assertEqual(self.my_cb_cnt, 2)
+ self.assertEqual(self.my_cb_last, 20)
+ self.assertEqual(s, content)
+
+ # rewind sfp and try upload again. -1 should call
+ # for every read/write so that should make 11 when bs=2
+ sfp.seek(0)
+ self.my_cb_cnt = 0
+ self.my_cb_last = None
+ k = self.bucket.new_key("k")
+ k.BufferSize = 2
+ k.set_contents_from_file(sfp, cb=callback, num_cb=-1)
+ self.assertEqual(self.my_cb_cnt, 11)
+ self.assertEqual(self.my_cb_last, 20)
+
+ # Read back all bytes => 11 calls
+ self.my_cb_cnt = 0
+ self.my_cb_last = None
+ s = k.get_contents_as_string(cb=callback, num_cb=-1)
+ self.assertEqual(self.my_cb_cnt, 11)
+ self.assertEqual(self.my_cb_last, 20)
+ self.assertEqual(s, content)
+
+        # no more than 1 time => 2 times
+        # last time always 20 bytes
+ sfp.seek(0)
+ self.my_cb_cnt = 0
+ self.my_cb_last = None
+ k = self.bucket.new_key("k")
+ k.BufferSize = 2
+ k.set_contents_from_file(sfp, cb=callback, num_cb=1)
+ self.assertTrue(self.my_cb_cnt <= 2)
+ self.assertEqual(self.my_cb_last, 20)
+
+        # no more than 1 time => 2 times
+ self.my_cb_cnt = 0
+ self.my_cb_last = None
+ s = k.get_contents_as_string(cb=callback, num_cb=1)
+ self.assertTrue(self.my_cb_cnt <= 2)
+ self.assertEqual(self.my_cb_last, 20)
+ self.assertEqual(s, content)
+
+ # no more than 2 times
+ # last time always 20 bytes
+ sfp.seek(0)
+ self.my_cb_cnt = 0
+ self.my_cb_last = None
+ k = self.bucket.new_key("k")
+ k.BufferSize = 2
+ k.set_contents_from_file(sfp, cb=callback, num_cb=2)
+ self.assertTrue(self.my_cb_cnt <= 2)
+ self.assertEqual(self.my_cb_last, 20)
+
+ # no more than 2 times
+ self.my_cb_cnt = 0
+ self.my_cb_last = None
+ s = k.get_contents_as_string(cb=callback, num_cb=2)
+ self.assertTrue(self.my_cb_cnt <= 2)
+ self.assertEqual(self.my_cb_last, 20)
+ self.assertEqual(s, content)
+
+ # no more than 3 times
+ # last time always 20 bytes
+ sfp.seek(0)
+ self.my_cb_cnt = 0
+ self.my_cb_last = None
+ k = self.bucket.new_key("k")
+ k.BufferSize = 2
+ k.set_contents_from_file(sfp, cb=callback, num_cb=3)
+ self.assertTrue(self.my_cb_cnt <= 3)
+ self.assertEqual(self.my_cb_last, 20)
+
+ # no more than 3 times
+ self.my_cb_cnt = 0
+ self.my_cb_last = None
+ s = k.get_contents_as_string(cb=callback, num_cb=3)
+ self.assertTrue(self.my_cb_cnt <= 3)
+ self.assertEqual(self.my_cb_last, 20)
+ self.assertEqual(s, content)
+
+ # no more than 4 times
+ # last time always 20 bytes
+ sfp.seek(0)
+ self.my_cb_cnt = 0
+ self.my_cb_last = None
+ k = self.bucket.new_key("k")
+ k.BufferSize = 2
+ k.set_contents_from_file(sfp, cb=callback, num_cb=4)
+ self.assertTrue(self.my_cb_cnt <= 4)
+ self.assertEqual(self.my_cb_last, 20)
+
+ # no more than 4 times
+ self.my_cb_cnt = 0
+ self.my_cb_last = None
+ s = k.get_contents_as_string(cb=callback, num_cb=4)
+ self.assertTrue(self.my_cb_cnt <= 4)
+ self.assertEqual(self.my_cb_last, 20)
+ self.assertEqual(s, content)
+
+ # no more than 6 times
+ # last time always 20 bytes
+ sfp.seek(0)
+ self.my_cb_cnt = 0
+ self.my_cb_last = None
+ k = self.bucket.new_key("k")
+ k.BufferSize = 2
+ k.set_contents_from_file(sfp, cb=callback, num_cb=6)
+ self.assertTrue(self.my_cb_cnt <= 6)
+ self.assertEqual(self.my_cb_last, 20)
+
+ # no more than 6 times
+ self.my_cb_cnt = 0
+ self.my_cb_last = None
+ s = k.get_contents_as_string(cb=callback, num_cb=6)
+ self.assertTrue(self.my_cb_cnt <= 6)
+ self.assertEqual(self.my_cb_last, 20)
+ self.assertEqual(s, content)
+
+ # no more than 10 times
+ # last time always 20 bytes
+ sfp.seek(0)
+ self.my_cb_cnt = 0
+ self.my_cb_last = None
+ k = self.bucket.new_key("k")
+ k.BufferSize = 2
+ k.set_contents_from_file(sfp, cb=callback, num_cb=10)
+ self.assertTrue(self.my_cb_cnt <= 10)
+ self.assertEqual(self.my_cb_last, 20)
+
+ # no more than 10 times
+ self.my_cb_cnt = 0
+ self.my_cb_last = None
+ s = k.get_contents_as_string(cb=callback, num_cb=10)
+ self.assertTrue(self.my_cb_cnt <= 10)
+ self.assertEqual(self.my_cb_last, 20)
+ self.assertEqual(s, content)
+
+ # no more than 1000 times
+ # last time always 20 bytes
+ sfp.seek(0)
+ self.my_cb_cnt = 0
+ self.my_cb_last = None
+ k = self.bucket.new_key("k")
+ k.BufferSize = 2
+ k.set_contents_from_file(sfp, cb=callback, num_cb=1000)
+ self.assertTrue(self.my_cb_cnt <= 1000)
+ self.assertEqual(self.my_cb_last, 20)
+
+ # no more than 1000 times
+ self.my_cb_cnt = 0
+ self.my_cb_last = None
+ s = k.get_contents_as_string(cb=callback, num_cb=1000)
+ self.assertTrue(self.my_cb_cnt <= 1000)
+ self.assertEqual(self.my_cb_last, 20)
+ self.assertEqual(s, content)
diff --git a/tests/s3/test_mfa.py b/tests/s3/test_mfa.py
new file mode 100644
index 00000000..3f47e94c
--- /dev/null
+++ b/tests/s3/test_mfa.py
@@ -0,0 +1,91 @@
+# Copyright (c) 2010 Mitch Garnaat http://garnaat.org/
+# Copyright (c) 2010, Eucalyptus Systems, Inc.
+# All rights reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a
+# copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish, dis-
+# tribute, sublicense, and/or sell copies of the Software, and to permit
+# persons to whom the Software is furnished to do so, subject to the fol-
+# lowing conditions:
+#
+# The above copyright notice and this permission notice shall be included
+# in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
+# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+
+"""
+Some unit tests for S3 MfaDelete with versioning
+"""
+
+import unittest
+import time
+from boto.s3.connection import S3Connection
+from boto.exception import S3ResponseError
+from boto.s3.deletemarker import DeleteMarker
+
+class S3MFATest (unittest.TestCase):
+
+ def setUp(self):
+ self.conn = S3Connection()
+ self.bucket_name = 'mfa-%d' % int(time.time())
+ self.bucket = self.conn.create_bucket(self.bucket_name)
+
+ def tearDown(self):
+ for k in self.bucket.list_versions():
+ self.bucket.delete_key(k.name, version_id=k.version_id)
+ self.bucket.delete()
+
+ def test_mfadel(self):
+ # Enable Versioning with MfaDelete
+ mfa_sn = raw_input('MFA S/N: ')
+ mfa_code = raw_input('MFA Code: ')
+ self.bucket.configure_versioning(True, mfa_delete=True, mfa_token=(mfa_sn, mfa_code))
+
+ # Check enabling mfa worked.
+ i = 0
+ for i in range(1,8):
+ time.sleep(2**i)
+ d = self.bucket.get_versioning_status()
+ if d['Versioning'] == 'Enabled' and d['MfaDelete'] == 'Enabled':
+ break
+ self.assertEqual('Enabled', d['Versioning'])
+ self.assertEqual('Enabled', d['MfaDelete'])
+
+ # Add a key to the bucket
+ k = self.bucket.new_key('foobar')
+ s1 = 'This is v1'
+ k.set_contents_from_string(s1)
+ v1 = k.version_id
+
+ # Now try to delete v1 without the MFA token
+ try:
+ self.bucket.delete_key('foobar', version_id=v1)
+ self.fail("Must fail if not using MFA token")
+ except S3ResponseError:
+ pass
+
+ # Now try delete again with the MFA token
+ mfa_code = raw_input('MFA Code: ')
+ self.bucket.delete_key('foobar', version_id=v1, mfa_token=(mfa_sn, mfa_code))
+
+ # Next suspend versioning and disable MfaDelete on the bucket
+ mfa_code = raw_input('MFA Code: ')
+ self.bucket.configure_versioning(False, mfa_delete=False, mfa_token=(mfa_sn, mfa_code))
+
+ # Lastly, check disabling mfa worked.
+ i = 0
+ for i in range(1,8):
+ time.sleep(2**i)
+ d = self.bucket.get_versioning_status()
+ if d['Versioning'] == 'Suspended' and d['MfaDelete'] != 'Enabled':
+ break
+ self.assertEqual('Suspended', d['Versioning'])
+ self.assertNotEqual('Enabled', d['MfaDelete'])
diff --git a/tests/s3/test_multipart.py b/tests/s3/test_multipart.py
index 5921889c..5c64ba73 100644
--- a/tests/s3/test_multipart.py
+++ b/tests/s3/test_multipart.py
@@ -90,3 +90,35 @@ class S3MultiPartUploadTest (unittest.TestCase):
self.assertEqual(lmpu.key_name, key_name)
# Abort using the one returned in the list
lmpu.cancel_upload()
+
+ def test_four_part_file(self):
+ key_name = "k"
+ contents = "01234567890123456789"
+ sfp = StringIO.StringIO(contents)
+
+ # upload 20 bytes in 4 parts of 5 bytes each
+ mpu = self.bucket.initiate_multipart_upload(key_name)
+ mpu.upload_part_from_file(sfp, part_num=1, size=5)
+ mpu.upload_part_from_file(sfp, part_num=2, size=5)
+ mpu.upload_part_from_file(sfp, part_num=3, size=5)
+ mpu.upload_part_from_file(sfp, part_num=4, size=5)
+ sfp.close()
+
+ etags = {}
+ pn = 0
+ for part in mpu:
+ pn += 1
+ self.assertEqual(5, part.size)
+ etags[pn] = part.etag
+ self.assertEqual(pn, 4)
+ # etags for 01234
+ self.assertEqual(etags[1], etags[3])
+ # etags for 56789
+ self.assertEqual(etags[2], etags[4])
+ # etag 01234 != etag 56789
+ self.assertNotEqual(etags[1], etags[2])
+
+        # parts are too small to complete as each part must
+        # be a min of 5MB so we'll assume that is enough
+        # testing and abort the upload.
+ mpu.cancel_upload()
diff --git a/tests/s3/test_resumable_uploads.py b/tests/s3/test_resumable_uploads.py
index bb0f7a93..8a4a51f3 100755
--- a/tests/s3/test_resumable_uploads.py
+++ b/tests/s3/test_resumable_uploads.py
@@ -202,6 +202,7 @@ class ResumableUploadTests(unittest.TestCase):
"""
Tests that non-resumable uploads work
"""
+ self.small_src_file.seek(0)
self.dst_key.set_contents_from_file(self.small_src_file)
self.assertEqual(self.small_src_file_size, self.dst_key.size)
self.assertEqual(self.small_src_file_as_string,
@@ -212,6 +213,7 @@ class ResumableUploadTests(unittest.TestCase):
Tests a single resumable upload, with no tracker URI persistence
"""
res_upload_handler = ResumableUploadHandler()
+ self.small_src_file.seek(0)
self.dst_key.set_contents_from_file(
self.small_src_file, res_upload_handler=res_upload_handler)
self.assertEqual(self.small_src_file_size, self.dst_key.size)
@@ -225,6 +227,7 @@ class ResumableUploadTests(unittest.TestCase):
harnass = CallbackTestHarnass()
res_upload_handler = ResumableUploadHandler(
tracker_file_name=self.tracker_file_name, num_retries=0)
+ self.small_src_file.seek(0)
try:
self.dst_key.set_contents_from_file(
self.small_src_file, cb=harnass.call,
@@ -251,6 +254,7 @@ class ResumableUploadTests(unittest.TestCase):
exception = ResumableUploadHandler.RETRYABLE_EXCEPTIONS[0]
harnass = CallbackTestHarnass(exception=exception)
res_upload_handler = ResumableUploadHandler(num_retries=1)
+ self.small_src_file.seek(0)
self.dst_key.set_contents_from_file(
self.small_src_file, cb=harnass.call,
res_upload_handler=res_upload_handler)
@@ -266,6 +270,7 @@ class ResumableUploadTests(unittest.TestCase):
exception = IOError(errno.EPIPE, "Broken pipe")
harnass = CallbackTestHarnass(exception=exception)
res_upload_handler = ResumableUploadHandler(num_retries=1)
+ self.small_src_file.seek(0)
self.dst_key.set_contents_from_file(
self.small_src_file, cb=harnass.call,
res_upload_handler=res_upload_handler)
@@ -281,6 +286,7 @@ class ResumableUploadTests(unittest.TestCase):
harnass = CallbackTestHarnass(
exception=OSError(errno.EACCES, 'Permission denied'))
res_upload_handler = ResumableUploadHandler(num_retries=1)
+ self.small_src_file.seek(0)
try:
self.dst_key.set_contents_from_file(
self.small_src_file, cb=harnass.call,
@@ -298,6 +304,7 @@ class ResumableUploadTests(unittest.TestCase):
harnass = CallbackTestHarnass()
res_upload_handler = ResumableUploadHandler(
tracker_file_name=self.tracker_file_name, num_retries=1)
+ self.small_src_file.seek(0)
self.dst_key.set_contents_from_file(
self.small_src_file, cb=harnass.call,
res_upload_handler=res_upload_handler)
@@ -313,6 +320,7 @@ class ResumableUploadTests(unittest.TestCase):
Tests resumable upload that fails twice in one process, then completes
"""
res_upload_handler = ResumableUploadHandler(num_retries=3)
+ self.small_src_file.seek(0)
self.dst_key.set_contents_from_file(
self.small_src_file, res_upload_handler=res_upload_handler)
# Ensure uploaded object has correct content.
@@ -332,6 +340,7 @@ class ResumableUploadTests(unittest.TestCase):
fail_after_n_bytes=self.larger_src_file_size/2, num_times_to_fail=2)
res_upload_handler = ResumableUploadHandler(
tracker_file_name=self.tracker_file_name, num_retries=1)
+ self.larger_src_file.seek(0)
try:
self.dst_key.set_contents_from_file(
self.larger_src_file, cb=harnass.call,
@@ -343,6 +352,7 @@ class ResumableUploadTests(unittest.TestCase):
# Ensure a tracker file survived.
self.assertTrue(os.path.exists(self.tracker_file_name))
# Try it one more time; this time should succeed.
+ self.larger_src_file.seek(0)
self.dst_key.set_contents_from_file(
self.larger_src_file, cb=harnass.call,
res_upload_handler=res_upload_handler)
@@ -365,6 +375,7 @@ class ResumableUploadTests(unittest.TestCase):
harnass = CallbackTestHarnass(
fail_after_n_bytes=self.larger_src_file_size/2)
res_upload_handler = ResumableUploadHandler(num_retries=1)
+ self.larger_src_file.seek(0)
self.dst_key.set_contents_from_file(
self.larger_src_file, cb=harnass.call,
res_upload_handler=res_upload_handler)
@@ -382,6 +393,7 @@ class ResumableUploadTests(unittest.TestCase):
Tests uploading an empty file (exercises boundary conditions).
"""
res_upload_handler = ResumableUploadHandler()
+ self.empty_src_file.seek(0)
self.dst_key.set_contents_from_file(
self.empty_src_file, res_upload_handler=res_upload_handler)
self.assertEqual(0, self.dst_key.size)
@@ -393,6 +405,7 @@ class ResumableUploadTests(unittest.TestCase):
res_upload_handler = ResumableUploadHandler()
headers = {'Content-Type' : 'text/plain', 'Content-Encoding' : 'gzip',
'x-goog-meta-abc' : 'my meta', 'x-goog-acl' : 'public-read'}
+ self.small_src_file.seek(0)
self.dst_key.set_contents_from_file(
self.small_src_file, headers=headers,
res_upload_handler=res_upload_handler)
@@ -423,6 +436,7 @@ class ResumableUploadTests(unittest.TestCase):
# upload server).
res_upload_handler = ResumableUploadHandler(
tracker_file_name=self.tracker_file_name, num_retries=0)
+ self.larger_src_file.seek(0)
try:
self.dst_key.set_contents_from_file(
self.larger_src_file, cb=harnass.call,
@@ -440,6 +454,7 @@ class ResumableUploadTests(unittest.TestCase):
# 500 response in the next attempt.
time.sleep(1)
try:
+ self.largest_src_file.seek(0)
self.dst_key.set_contents_from_file(
self.largest_src_file, res_upload_handler=res_upload_handler)
self.fail('Did not get expected ResumableUploadException')
@@ -510,6 +525,7 @@ class ResumableUploadTests(unittest.TestCase):
to set the content length when gzipping a file.
"""
res_upload_handler = ResumableUploadHandler()
+ self.small_src_file.seek(0)
try:
self.dst_key.set_contents_from_file(
self.small_src_file, res_upload_handler=res_upload_handler,
@@ -528,6 +544,7 @@ class ResumableUploadTests(unittest.TestCase):
tracker_file_name=self.syntactically_invalid_tracker_file_name)
# An error should be printed about the invalid URI, but then it
# should run the update successfully.
+ self.small_src_file.seek(0)
self.dst_key.set_contents_from_file(
self.small_src_file, res_upload_handler=res_upload_handler)
self.assertEqual(self.small_src_file_size, self.dst_key.size)
@@ -542,6 +559,7 @@ class ResumableUploadTests(unittest.TestCase):
tracker_file_name=self.invalid_upload_id_tracker_file_name)
         # An error should occur, but then the tracker URI should be
         # regenerated and the update should succeed.
+ self.small_src_file.seek(0)
self.dst_key.set_contents_from_file(
self.small_src_file, res_upload_handler=res_upload_handler)
self.assertEqual(self.small_src_file_size, self.dst_key.size)
diff --git a/tests/s3/test_versioning.py b/tests/s3/test_versioning.py
index 879e36b4..2d569af9 100644
--- a/tests/s3/test_versioning.py
+++ b/tests/s3/test_versioning.py
@@ -22,7 +22,7 @@
# IN THE SOFTWARE.
"""
-Some unit tests for the S3 Versioning and MfaDelete
+Some unit tests for the S3 Versioning.
"""
import unittest
@@ -33,31 +33,30 @@ from boto.s3.deletemarker import DeleteMarker
class S3VersionTest (unittest.TestCase):
+ def setUp(self):
+ self.conn = S3Connection()
+ self.bucket_name = 'version-%d' % int(time.time())
+ self.bucket = self.conn.create_bucket(self.bucket_name)
+
+ def tearDown(self):
+ for k in self.bucket.list_versions():
+ self.bucket.delete_key(k.name, version_id=k.version_id)
+ self.bucket.delete()
+
def test_1_versions(self):
- print '--- running S3Version tests ---'
- c = S3Connection()
- # create a new, empty bucket
- bucket_name = 'version-%d' % int(time.time())
- bucket = c.create_bucket(bucket_name)
-
- # now try a get_bucket call and see if it's really there
- bucket = c.get_bucket(bucket_name)
-
- # enable versions
- d = bucket.get_versioning_status()
- assert not d.has_key('Versioning')
- bucket.configure_versioning(versioning=True)
- time.sleep(15)
- d = bucket.get_versioning_status()
- assert d['Versioning'] == 'Enabled'
+ # check versioning off
+ d = self.bucket.get_versioning_status()
+ self.assertFalse(d.has_key('Versioning'))
+
+ # enable versioning
+ self.bucket.configure_versioning(versioning=True)
+ d = self.bucket.get_versioning_status()
+ self.assertEqual('Enabled', d['Versioning'])
# create a new key in the versioned bucket
- k = bucket.new_key()
- k.name = 'foobar'
- s1 = 'This is a test of s3 versioning'
- s2 = 'This is the second test of s3 versioning'
+ k = self.bucket.new_key("foobar")
+ s1 = 'This is v1'
k.set_contents_from_string(s1)
- time.sleep(5)
# remember the version id of this object
v1 = k.version_id
@@ -65,109 +64,77 @@ class S3VersionTest (unittest.TestCase):
# now get the contents from s3
o1 = k.get_contents_as_string()
- # check to make sure content read from s3 is identical to original
- assert o1 == s1
+ # check to make sure content read from k is identical to original
+ self.assertEqual(s1, o1)
# now overwrite that same key with new data
+ s2 = 'This is v2'
k.set_contents_from_string(s2)
v2 = k.version_id
- time.sleep(5)
- # now retrieve the contents as a string and compare
- s3 = k.get_contents_as_string(version_id=v2)
- assert s3 == s2
+ # now retrieve latest contents as a string and compare
+ k2 = self.bucket.new_key("foobar")
+ o2 = k2.get_contents_as_string()
+ self.assertEqual(s2, o2)
+
+ # next retrieve explicit versions and compare
+ o1 = k.get_contents_as_string(version_id=v1)
+ o2 = k.get_contents_as_string(version_id=v2)
+ self.assertEqual(s1, o1)
+ self.assertEqual(s2, o2)
# Now list all versions and compare to what we have
- rs = bucket.get_all_versions()
- assert rs[0].version_id == v2
- assert rs[1].version_id == v1
+ rs = self.bucket.get_all_versions()
+ self.assertEqual(v2, rs[0].version_id)
+ self.assertEqual(v1, rs[1].version_id)
# Now do a regular list command and make sure only the new key shows up
- rs = bucket.get_all_keys()
- assert len(rs) == 1
+ rs = self.bucket.get_all_keys()
+ self.assertEqual(1, len(rs))
# Now do regular delete
- bucket.delete_key('foobar')
- time.sleep(5)
+ self.bucket.delete_key('foobar')
# Now list versions and make sure old versions are there
- # plus the DeleteMarker
- rs = bucket.get_all_versions()
- assert len(rs) == 3
- assert isinstance(rs[0], DeleteMarker)
+ # plus the DeleteMarker which is latest.
+ rs = self.bucket.get_all_versions()
+ self.assertEqual(3, len(rs))
+ self.assertTrue(isinstance(rs[0], DeleteMarker))
# Now delete v1 of the key
- bucket.delete_key('foobar', version_id=v1)
- time.sleep(5)
+ self.bucket.delete_key('foobar', version_id=v1)
# Now list versions again and make sure v1 is not there
- rs = bucket.get_all_versions()
+ rs = self.bucket.get_all_versions()
versions = [k.version_id for k in rs]
- assert v1 not in versions
- assert v2 in versions
-
- # Now try to enable MfaDelete
- mfa_sn = raw_input('MFA S/N: ')
- mfa_code = raw_input('MFA Code: ')
- bucket.configure_versioning(True, mfa_delete=True, mfa_token=(mfa_sn, mfa_code))
- i = 0
- for i in range(1,8):
- time.sleep(2**i)
- d = bucket.get_versioning_status()
- if d['Versioning'] == 'Enabled' and d['MfaDelete'] == 'Enabled':
- break
- assert d['Versioning'] == 'Enabled'
- assert d['MfaDelete'] == 'Enabled'
-
- # Now try to delete v2 without the MFA token
- try:
- bucket.delete_key('foobar', version_id=v2)
- except S3ResponseError:
- pass
-
- # Now try to delete v2 with the MFA token
- mfa_code = raw_input('MFA Code: ')
- bucket.delete_key('foobar', version_id=v2, mfa_token=(mfa_sn, mfa_code))
-
- # Now disable MfaDelete on the bucket
- mfa_code = raw_input('MFA Code: ')
- bucket.configure_versioning(True, mfa_delete=False, mfa_token=(mfa_sn, mfa_code))
-
+ self.assertTrue(v1 not in versions)
+ self.assertTrue(v2 in versions)
+
# Now suspend Versioning on the bucket
- bucket.configure_versioning(False)
+ self.bucket.configure_versioning(False)
+ d = self.bucket.get_versioning_status()
+ self.assertEqual('Suspended', d['Versioning'])
- # now delete all keys and deletemarkers in bucket
- for k in bucket.list_versions():
- bucket.delete_key(k.name, version_id=k.version_id)
-
- # now delete bucket
- c.delete_bucket(bucket)
- print '--- tests completed ---'
-
def test_latest_version(self):
- c = S3Connection()
- bucket_name = 'version-%d' % int(time.time())
- bucket = c.create_bucket(bucket_name)
-
- bucket.configure_versioning(versioning=True)
+ self.bucket.configure_versioning(versioning=True)
# add v1 of an object
key_name = "key"
- kv1 = bucket.new_key(key_name)
+ kv1 = self.bucket.new_key(key_name)
kv1.set_contents_from_string("v1")
# read list which should contain latest v1
- listed_kv1 = iter(bucket.get_all_versions()).next()
+ listed_kv1 = iter(self.bucket.get_all_versions()).next()
self.assertEqual(listed_kv1.name, key_name)
self.assertEqual(listed_kv1.version_id, kv1.version_id)
self.assertEqual(listed_kv1.is_latest, True)
# add v2 of the object
- kv2 = bucket.new_key(key_name)
+ kv2 = self.bucket.new_key(key_name)
kv2.set_contents_from_string("v2")
# read 2 versions, confirm v2 is latest
- i = iter(bucket.get_all_versions())
+ i = iter(self.bucket.get_all_versions())
listed_kv2 = i.next()
listed_kv1 = i.next()
self.assertEqual(listed_kv2.version_id, kv2.version_id)
@@ -176,8 +143,8 @@ class S3VersionTest (unittest.TestCase):
self.assertEqual(listed_kv1.is_latest, False)
# delete key, which creates a delete marker as latest
- bucket.delete_key(key_name)
- i = iter(bucket.get_all_versions())
+ self.bucket.delete_key(key_name)
+ i = iter(self.bucket.get_all_versions())
listed_kv3 = i.next()
listed_kv2 = i.next()
listed_kv1 = i.next()
@@ -187,8 +154,3 @@ class S3VersionTest (unittest.TestCase):
self.assertEqual(listed_kv3.is_latest, True)
self.assertEqual(listed_kv2.is_latest, False)
self.assertEqual(listed_kv1.is_latest, False)
-
- # cleanup
- for k in bucket.list_versions():
- bucket.delete_key(k.name, version_id=k.version_id)
- c.delete_bucket(bucket)
diff --git a/tests/test.py b/tests/test.py
index bc6cd756..7f423b71 100755
--- a/tests/test.py
+++ b/tests/test.py
@@ -32,8 +32,10 @@ import getopt
from sqs.test_connection import SQSConnectionTest
from s3.test_connection import S3ConnectionTest
from s3.test_versioning import S3VersionTest
+from s3.test_mfa import S3MFATest
from s3.test_encryption import S3EncryptionTest
from s3.test_bucket import S3BucketTest
+from s3.test_key import S3KeyTest
from s3.test_multidelete import S3MultiDeleteTest
from s3.test_multipart import S3MultiPartUploadTest
from s3.test_gsconnection import GSConnectionTest
@@ -48,7 +50,7 @@ from sts.test_session_token import SessionTokenTest
def usage():
print "test.py [-t testsuite] [-v verbosity]"
- print " -t run specific testsuite (s3|ssl|s3ver|s3nover|gs|sqs|ec2|sdb|dynamodb|dynamodbL1|dynamodbL2|sts|all)"
+ print " -t run specific testsuite (s3|ssl|s3mfa|gs|sqs|ec2|sdb|dynamodb|dynamodbL1|dynamodbL2|sts|all)"
print " -v verbosity (0|1|2)"
def main():
@@ -93,21 +95,16 @@ def suite(testsuite="all"):
tests.addTest(unittest.makeSuite(DynamoDBLayer2Test))
elif testsuite == "s3":
tests.addTest(unittest.makeSuite(S3ConnectionTest))
+ tests.addTest(unittest.makeSuite(S3BucketTest))
+ tests.addTest(unittest.makeSuite(S3KeyTest))
+ tests.addTest(unittest.makeSuite(S3MultiPartUploadTest))
tests.addTest(unittest.makeSuite(S3VersionTest))
tests.addTest(unittest.makeSuite(S3EncryptionTest))
tests.addTest(unittest.makeSuite(S3MultiDeleteTest))
- tests.addTest(unittest.makeSuite(S3MultiPartUploadTest))
- tests.addTest(unittest.makeSuite(S3BucketTest))
elif testsuite == "ssl":
tests.addTest(unittest.makeSuite(CertValidationTest))
- elif testsuite == "s3ver":
- tests.addTest(unittest.makeSuite(S3VersionTest))
- elif testsuite == "s3nover":
- tests.addTest(unittest.makeSuite(S3ConnectionTest))
- tests.addTest(unittest.makeSuite(S3EncryptionTest))
- tests.addTest(unittest.makeSuite(S3MultiDeleteTest))
- tests.addTest(unittest.makeSuite(S3MultiPartUploadTest))
- tests.addTest(unittest.makeSuite(S3BucketTest))
+ elif testsuite == "s3mfa":
+ tests.addTest(unittest.makeSuite(S3MFATest))
elif testsuite == "gs":
tests.addTest(unittest.makeSuite(GSConnectionTest))
elif testsuite == "sqs":