diff options
Diffstat (limited to 'nova/tests/unit/api/ec2/test_api.py')
-rw-r--r-- | nova/tests/unit/api/ec2/test_api.py | 635 |
1 files changed, 635 insertions, 0 deletions
diff --git a/nova/tests/unit/api/ec2/test_api.py b/nova/tests/unit/api/ec2/test_api.py new file mode 100644 index 0000000000..cc4a2adb75 --- /dev/null +++ b/nova/tests/unit/api/ec2/test_api.py @@ -0,0 +1,635 @@ +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Unit tests for the API endpoint.""" + +import random +import re +import StringIO + +import boto +import boto.connection +from boto.ec2 import regioninfo +from boto import exception as boto_exc +# newer versions of boto use their own wrapper on top of httplib.HTTPResponse +if hasattr(boto.connection, 'HTTPResponse'): + httplib = boto.connection +else: + import httplib +import fixtures +import webob + +from nova.api import auth +from nova.api import ec2 +from nova.api.ec2 import ec2utils +from nova import block_device +from nova import context +from nova import exception +from nova.openstack.common import versionutils +from nova import test +from nova.tests.unit import matchers + + +class FakeHttplibSocket(object): + """a fake socket implementation for httplib.HTTPResponse, trivial.""" + def __init__(self, response_string): + self.response_string = response_string + self._buffer = StringIO.StringIO(response_string) + + def makefile(self, _mode, _other): + """Returns the socket's internal buffer.""" + return self._buffer + + +class FakeHttplibConnection(object): + """A fake httplib.HTTPConnection for boto to use + + requests made via this connection actually get translated and routed into + our WSGI app, we then wait for the response and turn it back into + the HTTPResponse that boto expects. + """ + def __init__(self, app, host, is_secure=False): + self.app = app + self.host = host + + def request(self, method, path, data, headers): + req = webob.Request.blank(path) + req.method = method + req.body = data + req.headers = headers + req.headers['Accept'] = 'text/html' + req.host = self.host + # Call the WSGI app, get the HTTP response + resp = str(req.get_response(self.app)) + # For some reason, the response doesn't have "HTTP/1.0 " prepended; I + # guess that's a function the web server usually provides. + resp = "HTTP/1.0 %s" % resp + self.sock = FakeHttplibSocket(resp) + self.http_response = httplib.HTTPResponse(self.sock) + # NOTE(vish): boto is accessing private variables for some reason + self._HTTPConnection__response = self.http_response + self.http_response.begin() + + def getresponse(self): + return self.http_response + + def getresponsebody(self): + return self.sock.response_string + + def close(self): + """Required for compatibility with boto/tornado.""" + pass + + +class XmlConversionTestCase(test.NoDBTestCase): + """Unit test api xml conversion.""" + def test_number_conversion(self): + conv = ec2utils._try_convert + self.assertIsNone(conv('None')) + self.assertEqual(conv('True'), True) + self.assertEqual(conv('TRUE'), True) + self.assertEqual(conv('true'), True) + self.assertEqual(conv('False'), False) + self.assertEqual(conv('FALSE'), False) + self.assertEqual(conv('false'), False) + self.assertEqual(conv('0'), 0) + self.assertEqual(conv('42'), 42) + self.assertEqual(conv('3.14'), 3.14) + self.assertEqual(conv('-57.12'), -57.12) + self.assertEqual(conv('0x57'), 0x57) + self.assertEqual(conv('-0x57'), -0x57) + self.assertEqual(conv('-'), '-') + self.assertEqual(conv('-0'), 0) + self.assertEqual(conv('0.0'), 0.0) + self.assertEqual(conv('1e-8'), 0.0) + self.assertEqual(conv('-1e-8'), 0.0) + self.assertEqual(conv('0xDD8G'), '0xDD8G') + self.assertEqual(conv('0XDD8G'), '0XDD8G') + self.assertEqual(conv('-stringy'), '-stringy') + self.assertEqual(conv('stringy'), 'stringy') + self.assertEqual(conv('add'), 'add') + self.assertEqual(conv('remove'), 'remove') + self.assertEqual(conv(''), '') + + +class Ec2utilsTestCase(test.NoDBTestCase): + def test_ec2_id_to_id(self): + self.assertEqual(ec2utils.ec2_id_to_id('i-0000001e'), 30) + self.assertEqual(ec2utils.ec2_id_to_id('ami-1d'), 29) + self.assertEqual(ec2utils.ec2_id_to_id('snap-0000001c'), 28) + self.assertEqual(ec2utils.ec2_id_to_id('vol-0000001b'), 27) + + def test_bad_ec2_id(self): + self.assertRaises(exception.InvalidEc2Id, + ec2utils.ec2_id_to_id, + 'badone') + + def test_id_to_ec2_id(self): + self.assertEqual(ec2utils.id_to_ec2_id(30), 'i-0000001e') + self.assertEqual(ec2utils.id_to_ec2_id(29, 'ami-%08x'), 'ami-0000001d') + self.assertEqual(ec2utils.id_to_ec2_snap_id(28), 'snap-0000001c') + self.assertEqual(ec2utils.id_to_ec2_vol_id(27), 'vol-0000001b') + + def test_dict_from_dotted_str(self): + in_str = [('BlockDeviceMapping.1.DeviceName', '/dev/sda1'), + ('BlockDeviceMapping.1.Ebs.SnapshotId', 'snap-0000001c'), + ('BlockDeviceMapping.1.Ebs.VolumeSize', '80'), + ('BlockDeviceMapping.1.Ebs.DeleteOnTermination', 'false'), + ('BlockDeviceMapping.2.DeviceName', '/dev/sdc'), + ('BlockDeviceMapping.2.VirtualName', 'ephemeral0')] + expected_dict = { + 'block_device_mapping': { + '1': {'device_name': '/dev/sda1', + 'ebs': {'snapshot_id': 'snap-0000001c', + 'volume_size': 80, + 'delete_on_termination': False}}, + '2': {'device_name': '/dev/sdc', + 'virtual_name': 'ephemeral0'}}} + out_dict = ec2utils.dict_from_dotted_str(in_str) + + self.assertThat(out_dict, matchers.DictMatches(expected_dict)) + + def test_properties_root_defice_name(self): + mappings = [{"device": "/dev/sda1", "virtual": "root"}] + properties0 = {'mappings': mappings} + properties1 = {'root_device_name': '/dev/sdb', 'mappings': mappings} + + root_device_name = block_device.properties_root_device_name( + properties0) + self.assertEqual(root_device_name, '/dev/sda1') + + root_device_name = block_device.properties_root_device_name( + properties1) + self.assertEqual(root_device_name, '/dev/sdb') + + def test_regex_from_ec2_regex(self): + def _test_re(ec2_regex, expected, literal, match=True): + regex = ec2utils.regex_from_ec2_regex(ec2_regex) + self.assertEqual(regex, expected) + if match: + self.assertIsNotNone(re.match(regex, literal)) + else: + self.assertIsNone(re.match(regex, literal)) + + # wildcards + _test_re('foo', '\Afoo\Z(?s)', 'foo') + _test_re('foo', '\Afoo\Z(?s)', 'baz', match=False) + _test_re('foo?bar', '\Afoo.bar\Z(?s)', 'foo bar') + _test_re('foo?bar', '\Afoo.bar\Z(?s)', 'foo bar', match=False) + _test_re('foo*bar', '\Afoo.*bar\Z(?s)', 'foo QUUX bar') + + # backslashes and escaped wildcards + _test_re('foo\\', '\Afoo\\\\\Z(?s)', 'foo\\') + _test_re('foo*bar', '\Afoo.*bar\Z(?s)', 'zork QUUX bar', match=False) + _test_re('foo\\?bar', '\Afoo[?]bar\Z(?s)', 'foo?bar') + _test_re('foo\\?bar', '\Afoo[?]bar\Z(?s)', 'foo bar', match=False) + _test_re('foo\\*bar', '\Afoo[*]bar\Z(?s)', 'foo*bar') + _test_re('foo\\*bar', '\Afoo[*]bar\Z(?s)', 'foo bar', match=False) + + # analog to the example given in the EC2 API docs + ec2_regex = '\*nova\?\\end' + expected = r'\A[*]nova[?]\\end\Z(?s)' + literal = r'*nova?\end' + _test_re(ec2_regex, expected, literal) + + def test_mapping_prepend_dev(self): + mappings = [ + {'virtual': 'ami', + 'device': 'sda1'}, + {'virtual': 'root', + 'device': '/dev/sda1'}, + + {'virtual': 'swap', + 'device': 'sdb1'}, + {'virtual': 'swap', + 'device': '/dev/sdb2'}, + + {'virtual': 'ephemeral0', + 'device': 'sdc1'}, + {'virtual': 'ephemeral1', + 'device': '/dev/sdc1'}] + expected_result = [ + {'virtual': 'ami', + 'device': 'sda1'}, + {'virtual': 'root', + 'device': '/dev/sda1'}, + + {'virtual': 'swap', + 'device': '/dev/sdb1'}, + {'virtual': 'swap', + 'device': '/dev/sdb2'}, + + {'virtual': 'ephemeral0', + 'device': '/dev/sdc1'}, + {'virtual': 'ephemeral1', + 'device': '/dev/sdc1'}] + self.assertThat(block_device.mappings_prepend_dev(mappings), + matchers.DictListMatches(expected_result)) + + +class ApiEc2TestCase(test.TestCase): + """Unit test for the cloud controller on an EC2 API.""" + def setUp(self): + super(ApiEc2TestCase, self).setUp() + self.host = '127.0.0.1' + # NOTE(vish): skipping the Authorizer + roles = ['sysadmin', 'netadmin'] + ctxt = context.RequestContext('fake', 'fake', roles=roles) + self.app = auth.InjectContext(ctxt, ec2.FaultWrapper( + ec2.RequestLogging(ec2.Requestify(ec2.Authorizer(ec2.Executor() + ), 'nova.api.ec2.cloud.CloudController')))) + self.useFixture(fixtures.FakeLogger('boto')) + + def expect_http(self, host=None, is_secure=False, api_version=None): + """Returns a new EC2 connection.""" + self.ec2 = boto.connect_ec2( + aws_access_key_id='fake', + aws_secret_access_key='fake', + is_secure=False, + region=regioninfo.RegionInfo(None, 'test', self.host), + port=8773, + path='/services/Cloud') + if api_version: + self.ec2.APIVersion = api_version + + self.mox.StubOutWithMock(self.ec2, 'new_http_connection') + self.http = FakeHttplibConnection( + self.app, '%s:8773' % (self.host), False) + # pylint: disable=E1103 + if versionutils.is_compatible('2.14', boto.Version, same_major=False): + self.ec2.new_http_connection(host or self.host, 8773, + is_secure).AndReturn(self.http) + elif versionutils.is_compatible('2', boto.Version, same_major=False): + self.ec2.new_http_connection(host or '%s:8773' % (self.host), + is_secure).AndReturn(self.http) + else: + self.ec2.new_http_connection(host, is_secure).AndReturn(self.http) + return self.http + + def test_xmlns_version_matches_request_version(self): + self.expect_http(api_version='2010-10-30') + self.mox.ReplayAll() + + # Any request should be fine + self.ec2.get_all_instances() + self.assertTrue(self.ec2.APIVersion in self.http.getresponsebody(), + 'The version in the xmlns of the response does ' + 'not match the API version given in the request.') + + def test_describe_instances(self): + """Test that, after creating a user and a project, the describe + instances call to the API works properly. + """ + self.expect_http() + self.mox.ReplayAll() + self.assertEqual(self.ec2.get_all_instances(), []) + + def test_terminate_invalid_instance(self): + # Attempt to terminate an invalid instance. + self.expect_http() + self.mox.ReplayAll() + self.assertRaises(boto_exc.EC2ResponseError, + self.ec2.terminate_instances, "i-00000005") + + def test_get_all_key_pairs(self): + """Test that, after creating a user and project and generating + a key pair, that the API call to list key pairs works properly. + """ + keyname = "".join(random.choice("sdiuisudfsdcnpaqwertasd") + for x in range(random.randint(4, 8))) + self.expect_http() + self.mox.ReplayAll() + self.ec2.create_key_pair(keyname) + rv = self.ec2.get_all_key_pairs() + results = [k for k in rv if k.name == keyname] + self.assertEqual(len(results), 1) + + def test_create_duplicate_key_pair(self): + """Test that, after successfully generating a keypair, + requesting a second keypair with the same name fails sanely. + """ + self.expect_http() + self.mox.ReplayAll() + self.ec2.create_key_pair('test') + + try: + self.ec2.create_key_pair('test') + except boto_exc.EC2ResponseError as e: + if e.code == 'InvalidKeyPair.Duplicate': + pass + else: + self.assertEqual('InvalidKeyPair.Duplicate', e.code) + else: + self.fail('Exception not raised.') + + def test_get_all_security_groups(self): + # Test that we can retrieve security groups. + self.expect_http() + self.mox.ReplayAll() + + rv = self.ec2.get_all_security_groups() + + self.assertEqual(len(rv), 1) + self.assertEqual(rv[0].name, 'default') + + def test_create_delete_security_group(self): + # Test that we can create a security group. + self.expect_http() + self.mox.ReplayAll() + + security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd") + for x in range(random.randint(4, 8))) + + self.ec2.create_security_group(security_group_name, 'test group') + + self.expect_http() + self.mox.ReplayAll() + + rv = self.ec2.get_all_security_groups() + self.assertEqual(len(rv), 2) + self.assertIn(security_group_name, [group.name for group in rv]) + + self.expect_http() + self.mox.ReplayAll() + + self.ec2.delete_security_group(security_group_name) + + def test_group_name_valid_chars_security_group(self): + """Test that we sanely handle invalid security group names. + + EC2 API Spec states we should only accept alphanumeric characters, + spaces, dashes, and underscores. Amazon implementation + accepts more characters - so, [:print:] is ok. + """ + bad_strict_ec2 = "aa \t\x01\x02\x7f" + bad_amazon_ec2 = "aa #^% -=99" + test_raise = [ + (True, bad_amazon_ec2, "test desc"), + (True, "test name", bad_amazon_ec2), + (False, bad_strict_ec2, "test desc"), + ] + for t in test_raise: + self.expect_http() + self.mox.ReplayAll() + self.flags(ec2_strict_validation=t[0]) + self.assertRaises(boto_exc.EC2ResponseError, + self.ec2.create_security_group, + t[1], + t[2]) + test_accept = [ + (False, bad_amazon_ec2, "test desc"), + (False, "test name", bad_amazon_ec2), + ] + for t in test_accept: + self.expect_http() + self.mox.ReplayAll() + self.flags(ec2_strict_validation=t[0]) + self.ec2.create_security_group(t[1], t[2]) + self.expect_http() + self.mox.ReplayAll() + self.ec2.delete_security_group(t[1]) + + def test_group_name_valid_length_security_group(self): + """Test that we sanely handle invalid security group names. + + API Spec states that the length should not exceed 255 char. + """ + self.expect_http() + self.mox.ReplayAll() + + # Test block group_name > 255 chars + security_group_name = "".join(random.choice("poiuytrewqasdfghjklmnbvc") + for x in range(random.randint(256, 266))) + + self.assertRaises(boto_exc.EC2ResponseError, + self.ec2.create_security_group, + security_group_name, + 'test group') + + def test_authorize_revoke_security_group_cidr(self): + """Test that we can add and remove CIDR based rules + to a security group + """ + self.expect_http() + self.mox.ReplayAll() + + security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd") + for x in range(random.randint(4, 8))) + + group = self.ec2.create_security_group(security_group_name, + 'test group') + + self.expect_http() + self.mox.ReplayAll() + group.connection = self.ec2 + + group.authorize('tcp', 80, 81, '0.0.0.0/0') + group.authorize('icmp', -1, -1, '0.0.0.0/0') + group.authorize('udp', 80, 81, '0.0.0.0/0') + group.authorize('tcp', 1, 65535, '0.0.0.0/0') + group.authorize('udp', 1, 65535, '0.0.0.0/0') + group.authorize('icmp', 1, 0, '0.0.0.0/0') + group.authorize('icmp', 0, 1, '0.0.0.0/0') + group.authorize('icmp', 0, 0, '0.0.0.0/0') + + def _assert(message, *args): + try: + group.authorize(*args) + except boto_exc.EC2ResponseError as e: + self.assertEqual(e.status, 400, 'Expected status to be 400') + self.assertIn(message, e.error_message) + else: + raise self.failureException, 'EC2ResponseError not raised' + + # Invalid CIDR address + _assert('Invalid CIDR', 'tcp', 80, 81, '0.0.0.0/0444') + # Missing ports + _assert('Not enough parameters', 'tcp', '0.0.0.0/0') + # from port cannot be greater than to port + _assert('Invalid port range', 'tcp', 100, 1, '0.0.0.0/0') + # For tcp, negative values are not allowed + _assert('Invalid port range', 'tcp', -1, 1, '0.0.0.0/0') + # For tcp, valid port range 1-65535 + _assert('Invalid port range', 'tcp', 1, 65599, '0.0.0.0/0') + # Invalid Cidr for ICMP type + _assert('Invalid CIDR', 'icmp', -1, -1, '0.0.444.0/4') + # Invalid protocol + _assert('Invalid IP protocol', 'xyz', 1, 14, '0.0.0.0/0') + # Invalid port + _assert('Invalid input received: To and From ports must be integers', + 'tcp', " ", "81", '0.0.0.0/0') + # Invalid icmp port + _assert('Invalid input received: ' + 'Type and Code must be integers for ICMP protocol type', + 'icmp', " ", "81", '0.0.0.0/0') + # Invalid CIDR Address + _assert('Invalid CIDR', 'icmp', -1, -1, '0.0.0.0') + # Invalid CIDR Address + _assert('Invalid CIDR', 'icmp', -1, -1, '0.0.0.0/') + # Invalid Cidr ports + _assert('Invalid port range', 'icmp', 1, 256, '0.0.0.0/0') + + self.expect_http() + self.mox.ReplayAll() + + rv = self.ec2.get_all_security_groups() + + group = [grp for grp in rv if grp.name == security_group_name][0] + + self.assertEqual(len(group.rules), 8) + self.assertEqual(int(group.rules[0].from_port), 80) + self.assertEqual(int(group.rules[0].to_port), 81) + self.assertEqual(len(group.rules[0].grants), 1) + self.assertEqual(str(group.rules[0].grants[0]), '0.0.0.0/0') + + self.expect_http() + self.mox.ReplayAll() + group.connection = self.ec2 + + group.revoke('tcp', 80, 81, '0.0.0.0/0') + group.revoke('icmp', -1, -1, '0.0.0.0/0') + group.revoke('udp', 80, 81, '0.0.0.0/0') + group.revoke('tcp', 1, 65535, '0.0.0.0/0') + group.revoke('udp', 1, 65535, '0.0.0.0/0') + group.revoke('icmp', 1, 0, '0.0.0.0/0') + group.revoke('icmp', 0, 1, '0.0.0.0/0') + group.revoke('icmp', 0, 0, '0.0.0.0/0') + + self.expect_http() + self.mox.ReplayAll() + + self.ec2.delete_security_group(security_group_name) + + self.expect_http() + self.mox.ReplayAll() + group.connection = self.ec2 + + rv = self.ec2.get_all_security_groups() + + self.assertEqual(len(rv), 1) + self.assertEqual(rv[0].name, 'default') + + def test_authorize_revoke_security_group_cidr_v6(self): + """Test that we can add and remove CIDR based rules + to a security group for IPv6 + """ + self.expect_http() + self.mox.ReplayAll() + + security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd") + for x in range(random.randint(4, 8))) + + group = self.ec2.create_security_group(security_group_name, + 'test group') + + self.expect_http() + self.mox.ReplayAll() + group.connection = self.ec2 + + group.authorize('tcp', 80, 81, '::/0') + + self.expect_http() + self.mox.ReplayAll() + + rv = self.ec2.get_all_security_groups() + + group = [grp for grp in rv if grp.name == security_group_name][0] + self.assertEqual(len(group.rules), 1) + self.assertEqual(int(group.rules[0].from_port), 80) + self.assertEqual(int(group.rules[0].to_port), 81) + self.assertEqual(len(group.rules[0].grants), 1) + self.assertEqual(str(group.rules[0].grants[0]), '::/0') + + self.expect_http() + self.mox.ReplayAll() + group.connection = self.ec2 + + group.revoke('tcp', 80, 81, '::/0') + + self.expect_http() + self.mox.ReplayAll() + + self.ec2.delete_security_group(security_group_name) + + self.expect_http() + self.mox.ReplayAll() + group.connection = self.ec2 + + rv = self.ec2.get_all_security_groups() + + self.assertEqual(len(rv), 1) + self.assertEqual(rv[0].name, 'default') + + def test_authorize_revoke_security_group_foreign_group(self): + """Test that we can grant and revoke another security group access + to a security group + """ + self.expect_http() + self.mox.ReplayAll() + + rand_string = 'sdiuisudfsdcnpaqwertasd' + security_group_name = "".join(random.choice(rand_string) + for x in range(random.randint(4, 8))) + other_security_group_name = "".join(random.choice(rand_string) + for x in range(random.randint(4, 8))) + + group = self.ec2.create_security_group(security_group_name, + 'test group') + + self.expect_http() + self.mox.ReplayAll() + + other_group = self.ec2.create_security_group(other_security_group_name, + 'some other group') + + self.expect_http() + self.mox.ReplayAll() + group.connection = self.ec2 + + group.authorize(src_group=other_group) + + self.expect_http() + self.mox.ReplayAll() + + rv = self.ec2.get_all_security_groups() + + # I don't bother checkng that we actually find it here, + # because the create/delete unit test further up should + # be good enough for that. + for group in rv: + if group.name == security_group_name: + self.assertEqual(len(group.rules), 3) + self.assertEqual(len(group.rules[0].grants), 1) + self.assertEqual(str(group.rules[0].grants[0]), + '%s-%s' % (other_security_group_name, 'fake')) + + self.expect_http() + self.mox.ReplayAll() + + rv = self.ec2.get_all_security_groups() + + for group in rv: + if group.name == security_group_name: + self.expect_http() + self.mox.ReplayAll() + group.connection = self.ec2 + group.revoke(src_group=other_group) + + self.expect_http() + self.mox.ReplayAll() + + self.ec2.delete_security_group(security_group_name) + self.ec2.delete_security_group(other_security_group_name) |