diff options
author | MotoKen <motokentsai@gmail.com> | 2012-01-31 15:35:02 +0800 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2012-08-08 16:33:37 +0000 |
commit | ba5d146a3e338e107abbce7a31fd0b365f70c444 (patch) | |
tree | 073e7bb90ad8593303a339c2b29825078c2d237c | |
parent | 325681b863ccb9c07a76865a8782dcb13a17ea53 (diff) | |
download | nova-ba5d146a3e338e107abbce7a31fd0b365f70c444.tar.gz |
Correct checking existence of security group rule
Fixes bug #900031
Change-Id: I4194610ce53d1c74bd99b6878339da6e0b6a3a73
-rw-r--r-- | Authors | 1 | ||||
-rw-r--r-- | nova/api/ec2/cloud.py | 19 | ||||
-rw-r--r-- | nova/api/openstack/contrib/security_groups.py | 19 | ||||
-rw-r--r-- | nova/tests/api/openstack/compute/contrib/test_security_groups.py | 1033 |
4 files changed, 1050 insertions, 22 deletions
@@ -98,6 +98,7 @@ Mohammed Naser <mnaser@vexxhost.com> Monsyne Dragon <mdragon@rackspace.com> Monty Taylor <mordred@inaugust.com> MORITA Kazutaka <morita.kazutaka@gmail.com> +MotoKen <motokentsai@gmail.com> Muneyuki Noguchi <noguchimn@nttdata.co.jp> Nachi Ueno <ueno.nachi@lab.ntt.co.jp> Naveed Massjouni <naveedm9@gmail.com> diff --git a/nova/api/ec2/cloud.py b/nova/api/ec2/cloud.py index cd41921ba2..4bc3321ee7 100644 --- a/nova/api/ec2/cloud.py +++ b/nova/api/ec2/cloud.py @@ -755,17 +755,14 @@ class CloudController(object): defined in the given security group. """ for rule in security_group.rules: - if 'group_id' in values: - if rule['group_id'] == values['group_id']: - return rule['id'] - else: - is_duplicate = True - for key in ('cidr', 'from_port', 'to_port', 'protocol'): - if rule[key] != values[key]: - is_duplicate = False - break - if is_duplicate: - return rule['id'] + is_duplicate = True + keys = ('group_id', 'cidr', 'from_port', 'to_port', 'protocol') + for key in keys: + if rule.get(key) != values.get(key): + is_duplicate = False + break + if is_duplicate: + return rule['id'] return False def revoke_security_group_ingress(self, context, group_name=None, diff --git a/nova/api/openstack/contrib/security_groups.py b/nova/api/openstack/contrib/security_groups.py index 78d4881251..0f8fab2a57 100644 --- a/nova/api/openstack/contrib/security_groups.py +++ b/nova/api/openstack/contrib/security_groups.py @@ -245,17 +245,14 @@ class SecurityGroupRulesController(SecurityGroupController): defined in the given security group. """ for rule in security_group.rules: - if 'group_id' in values: - if rule['group_id'] == values['group_id']: - return True - else: - is_duplicate = True - for key in ('cidr', 'from_port', 'to_port', 'protocol'): - if rule[key] != values[key]: - is_duplicate = False - break - if is_duplicate: - return True + is_duplicate = True + keys = ('group_id', 'cidr', 'from_port', 'to_port', 'protocol') + for key in keys: + if rule.get(key) != values.get(key): + is_duplicate = False + break + if is_duplicate: + return True return False def _rule_args_to_dict(self, context, to_port=None, from_port=None, diff --git a/nova/tests/api/openstack/compute/contrib/test_security_groups.py b/nova/tests/api/openstack/compute/contrib/test_security_groups.py new file mode 100644 index 0000000000..b0b2064c5e --- /dev/null +++ b/nova/tests/api/openstack/compute/contrib/test_security_groups.py @@ -0,0 +1,1033 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import unittest + +from lxml import etree +import mox +import webob + +from nova.api.openstack.compute.contrib import security_groups +from nova.api.openstack import wsgi +import nova.db +from nova import exception +from nova import test +from nova.tests.api.openstack import fakes + + +FAKE_UUID = 'a47ae74e-ab08-447f-8eee-ffd43fc46c16' + + +class AttrDict(dict): + def __getattr__(self, k): + return self[k] + + +def security_group_template(**kwargs): + sg = kwargs.copy() + sg.setdefault('tenant_id', '123') + sg.setdefault('name', 'test') + sg.setdefault('description', 'test-description') + return sg + + +def security_group_db(security_group, id=None): + attrs = security_group.copy() + if 'tenant_id' in attrs: + attrs['project_id'] = attrs.pop('tenant_id') + if id is not None: + attrs['id'] = id + attrs.setdefault('rules', []) + attrs.setdefault('instances', []) + return AttrDict(attrs) + + +def security_group_rule_template(**kwargs): + rule = kwargs.copy() + rule.setdefault('ip_protocol', 'tcp') + rule.setdefault('from_port', 22) + rule.setdefault('to_port', 22) + rule.setdefault('parent_group_id', 2) + return rule + + +def security_group_rule_db(rule, id=None): + attrs = rule.copy() + if 'ip_protocol' in attrs: + attrs['protocol'] = attrs.pop('ip_protocol') + return AttrDict(attrs) + + +def return_server(context, server_id): + return {'id': int(server_id), + 'power_state': 0x01, + 'host': "localhost", + 'uuid': FAKE_UUID, + 'name': 'asdf'} + + +def return_server_by_uuid(context, server_uuid): + return {'id': 1, + 'power_state': 0x01, + 'host': "localhost", + 'uuid': server_uuid, + 'name': 'asdf'} + + +def return_non_running_server(context, server_id): + return {'id': server_id, 'power_state': 0x02, 'uuid': FAKE_UUID, + 'host': "localhost", 'name': 'asdf'} + + +def return_security_group_by_name(context, project_id, group_name): + return {'id': 1, 'name': group_name, + "instances": [{'id': 1, 'uuid': FAKE_UUID}]} + + +def return_security_group_without_instances(context, project_id, group_name): + return {'id': 1, 'name': group_name} + + +def return_server_nonexistent(context, server_id): + raise exception.InstanceNotFound(instance_id=server_id) + + +class TestSecurityGroups(test.TestCase): + def setUp(self): + super(TestSecurityGroups, self).setUp() + + self.controller = security_groups.SecurityGroupController() + self.manager = security_groups.SecurityGroupActionController() + + def tearDown(self): + super(TestSecurityGroups, self).tearDown() + + def test_create_security_group(self): + sg = security_group_template() + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups') + res_dict = self.controller.create(req, {'security_group': sg}) + self.assertEqual(res_dict['security_group']['name'], 'test') + self.assertEqual(res_dict['security_group']['description'], + 'test-description') + + def test_create_security_group_with_no_name(self): + sg = security_group_template() + del sg['name'] + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups') + self.assertRaises(webob.exc.HTTPUnprocessableEntity, + self.controller.create, req, sg) + + def test_create_security_group_with_no_description(self): + sg = security_group_template() + del sg['description'] + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group': sg}) + + def test_create_security_group_with_blank_name(self): + sg = security_group_template(name='') + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group': sg}) + + def test_create_security_group_with_whitespace_name(self): + sg = security_group_template(name=' ') + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group': sg}) + + def test_create_security_group_with_blank_description(self): + sg = security_group_template(description='') + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group': sg}) + + def test_create_security_group_with_whitespace_description(self): + sg = security_group_template(description=' ') + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group': sg}) + + def test_create_security_group_with_duplicate_name(self): + sg = security_group_template() + + # FIXME: Stub out _get instead of creating twice + req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups') + self.controller.create(req, {'security_group': sg}) + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group': sg}) + + def test_create_security_group_with_no_body(self): + req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups') + self.assertRaises(webob.exc.HTTPUnprocessableEntity, + self.controller.create, req, None) + + def test_create_security_group_with_no_security_group(self): + body = {'no-securityGroup': None} + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups') + self.assertRaises(webob.exc.HTTPUnprocessableEntity, + self.controller.create, req, body) + + def test_create_security_group_above_255_characters_name(self): + sg = security_group_template(name='1234567890' * 26) + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group': sg}) + + def test_create_security_group_above_255_characters_description(self): + sg = security_group_template(description='1234567890' * 26) + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group': sg}) + + def test_create_security_group_non_string_name(self): + sg = security_group_template(name=12) + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group': sg}) + + def test_create_security_group_non_string_description(self): + sg = security_group_template(description=12) + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group': sg}) + + def test_get_security_group_list(self): + groups = [] + for i, name in enumerate(['default', 'test']): + sg = security_group_template(id=i + 1, + name=name, + description=name + '-desc', + rules=[]) + groups.append(sg) + expected = {'security_groups': groups} + + def return_security_groups(context, project_id): + return [security_group_db(sg) for sg in groups] + + self.stubs.Set(nova.db, 'security_group_get_by_project', + return_security_groups) + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups') + res_dict = self.controller.index(req) + + self.assertEquals(res_dict, expected) + + def test_get_security_group_by_id(self): + sg = security_group_template(id=2, rules=[]) + + def return_security_group(context, group_id): + self.assertEquals(sg['id'], group_id) + return security_group_db(sg) + + self.stubs.Set(nova.db, 'security_group_get', + return_security_group) + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/2') + res_dict = self.controller.show(req, '2') + + expected = {'security_group': sg} + self.assertEquals(res_dict, expected) + + def test_get_security_group_by_invalid_id(self): + req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/invalid') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.delete, + req, 'invalid') + + def test_get_security_group_by_non_existing_id(self): + req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/111111111') + self.assertRaises(webob.exc.HTTPNotFound, self.controller.delete, + req, '111111111') + + def test_delete_security_group_by_id(self): + sg = security_group_template(id=1, rules=[]) + + self.called = False + + def security_group_destroy(context, id): + self.called = True + + def return_security_group(context, group_id): + self.assertEquals(sg['id'], group_id) + return security_group_db(sg) + + self.stubs.Set(nova.db, 'security_group_destroy', + security_group_destroy) + self.stubs.Set(nova.db, 'security_group_get', + return_security_group) + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/1') + self.controller.delete(req, '1') + + self.assertTrue(self.called) + + def test_delete_security_group_by_invalid_id(self): + req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/invalid') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.delete, + req, 'invalid') + + def test_delete_security_group_by_non_existing_id(self): + req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/11111111') + self.assertRaises(webob.exc.HTTPNotFound, self.controller.delete, + req, '11111111') + + def test_associate_by_non_existing_security_group_name(self): + body = dict(addSecurityGroup=dict(name='non-existing')) + + req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action') + self.assertRaises(webob.exc.HTTPNotFound, + self.manager._addSecurityGroup, req, '1', body) + + def test_associate_by_invalid_server_id(self): + body = dict(addSecurityGroup=dict(name='test')) + + req = fakes.HTTPRequest.blank('/v2/fake/servers/invalid/action') + self.assertRaises(webob.exc.HTTPNotFound, + self.manager._addSecurityGroup, req, 'invalid', body) + + def test_associate_without_body(self): + self.stubs.Set(nova.db, 'instance_get', return_server) + body = dict(addSecurityGroup=None) + + req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action') + self.assertRaises(webob.exc.HTTPBadRequest, + self.manager._addSecurityGroup, req, '1', body) + + def test_associate_no_security_group_name(self): + self.stubs.Set(nova.db, 'instance_get', return_server) + body = dict(addSecurityGroup=dict()) + + req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action') + self.assertRaises(webob.exc.HTTPBadRequest, + self.manager._addSecurityGroup, req, '1', body) + + def test_associate_security_group_name_with_whitespaces(self): + self.stubs.Set(nova.db, 'instance_get', return_server) + body = dict(addSecurityGroup=dict(name=" ")) + + req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action') + self.assertRaises(webob.exc.HTTPBadRequest, + self.manager._addSecurityGroup, req, '1', body) + + def test_associate_non_existing_instance(self): + self.stubs.Set(nova.db, 'instance_get', return_server_nonexistent) + self.stubs.Set(nova.db, 'instance_get_by_uuid', + return_server_nonexistent) + body = dict(addSecurityGroup=dict(name="test")) + + req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action') + self.assertRaises(webob.exc.HTTPNotFound, + self.manager._addSecurityGroup, req, '1', body) + + def test_associate_non_running_instance(self): + self.stubs.Set(nova.db, 'instance_get', return_non_running_server) + self.stubs.Set(nova.db, 'instance_get_by_uuid', + return_non_running_server) + self.stubs.Set(nova.db, 'security_group_get_by_name', + return_security_group_without_instances) + body = dict(addSecurityGroup=dict(name="test")) + + req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action') + self.assertRaises(webob.exc.HTTPBadRequest, + self.manager._addSecurityGroup, req, '1', body) + + def test_associate_already_associated_security_group_to_instance(self): + self.stubs.Set(nova.db, 'instance_get', return_server) + self.stubs.Set(nova.db, 'instance_get_by_uuid', + return_server_by_uuid) + self.stubs.Set(nova.db, 'security_group_get_by_name', + return_security_group_by_name) + body = dict(addSecurityGroup=dict(name="test")) + + req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action') + self.assertRaises(webob.exc.HTTPBadRequest, + self.manager._addSecurityGroup, req, '1', body) + + def test_associate(self): + self.stubs.Set(nova.db, 'instance_get', return_server) + self.stubs.Set(nova.db, 'instance_get_by_uuid', + return_server_by_uuid) + self.mox.StubOutWithMock(nova.db, 'instance_add_security_group') + nova.db.instance_add_security_group(mox.IgnoreArg(), + mox.IgnoreArg(), + mox.IgnoreArg()) + self.stubs.Set(nova.db, 'security_group_get_by_name', + return_security_group_without_instances) + self.mox.ReplayAll() + + body = dict(addSecurityGroup=dict(name="test")) + + req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action') + self.manager._addSecurityGroup(req, '1', body) + + def test_disassociate_by_non_existing_security_group_name(self): + body = dict(removeSecurityGroup=dict(name='non-existing')) + + req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action') + self.assertRaises(webob.exc.HTTPNotFound, + self.manager._removeSecurityGroup, req, '1', body) + + def test_disassociate_by_invalid_server_id(self): + self.stubs.Set(nova.db, 'security_group_get_by_name', + return_security_group_by_name) + body = dict(removeSecurityGroup=dict(name='test')) + + req = fakes.HTTPRequest.blank('/v2/fake/servers/invalid/action') + self.assertRaises(webob.exc.HTTPNotFound, + self.manager._removeSecurityGroup, req, 'invalid', + body) + + def test_disassociate_without_body(self): + self.stubs.Set(nova.db, 'instance_get', return_server) + body = dict(removeSecurityGroup=None) + + req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action') + self.assertRaises(webob.exc.HTTPBadRequest, + self.manager._removeSecurityGroup, req, '1', body) + + def test_disassociate_no_security_group_name(self): + self.stubs.Set(nova.db, 'instance_get', return_server) + body = dict(removeSecurityGroup=dict()) + + req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action') + self.assertRaises(webob.exc.HTTPBadRequest, + self.manager._removeSecurityGroup, req, '1', body) + + def test_disassociate_security_group_name_with_whitespaces(self): + self.stubs.Set(nova.db, 'instance_get', return_server) + body = dict(removeSecurityGroup=dict(name=" ")) + + req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action') + self.assertRaises(webob.exc.HTTPBadRequest, + self.manager._removeSecurityGroup, req, '1', body) + + def test_disassociate_non_existing_instance(self): + self.stubs.Set(nova.db, 'instance_get', return_server_nonexistent) + self.stubs.Set(nova.db, 'security_group_get_by_name', + return_security_group_by_name) + body = dict(removeSecurityGroup=dict(name="test")) + + req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action') + self.assertRaises(webob.exc.HTTPNotFound, + self.manager._removeSecurityGroup, req, '1', body) + + def test_disassociate_non_running_instance(self): + self.stubs.Set(nova.db, 'instance_get', return_non_running_server) + self.stubs.Set(nova.db, 'instance_get_by_uuid', + return_non_running_server) + self.stubs.Set(nova.db, 'security_group_get_by_name', + return_security_group_by_name) + body = dict(removeSecurityGroup=dict(name="test")) + + req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action') + self.assertRaises(webob.exc.HTTPBadRequest, + self.manager._removeSecurityGroup, req, '1', body) + + def test_disassociate_already_associated_security_group_to_instance(self): + self.stubs.Set(nova.db, 'instance_get', return_server) + self.stubs.Set(nova.db, 'instance_get_by_uuid', + return_server_by_uuid) + self.stubs.Set(nova.db, 'security_group_get_by_name', + return_security_group_without_instances) + body = dict(removeSecurityGroup=dict(name="test")) + + req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action') + self.assertRaises(webob.exc.HTTPBadRequest, + self.manager._removeSecurityGroup, req, '1', body) + + def test_disassociate(self): + self.stubs.Set(nova.db, 'instance_get', return_server) + self.stubs.Set(nova.db, 'instance_get_by_uuid', + return_server_by_uuid) + self.mox.StubOutWithMock(nova.db, 'instance_remove_security_group') + nova.db.instance_remove_security_group(mox.IgnoreArg(), + mox.IgnoreArg(), + mox.IgnoreArg()) + self.stubs.Set(nova.db, 'security_group_get_by_name', + return_security_group_by_name) + self.mox.ReplayAll() + + body = dict(removeSecurityGroup=dict(name="test")) + + req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action') + self.manager._removeSecurityGroup(req, '1', body) + + +class TestSecurityGroupRules(test.TestCase): + def setUp(self): + super(TestSecurityGroupRules, self).setUp() + + sg1 = security_group_template(id=1) + sg2 = security_group_template(id=2, + name='authorize_revoke', + description='authorize-revoke testing') + db1 = security_group_db(sg1) + db2 = security_group_db(sg2) + + def return_security_group(context, group_id): + if group_id == db1['id']: + return db1 + if group_id == db2['id']: + return db2 + raise exception.NotFound() + + self.stubs.Set(nova.db, 'security_group_get', + return_security_group) + + self.parent_security_group = db2 + + self.controller = security_groups.SecurityGroupRulesController() + + def tearDown(self): + super(TestSecurityGroupRules, self).tearDown() + + def test_create_by_cidr(self): + rule = security_group_rule_template(cidr='10.2.3.124/24') + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules') + res_dict = self.controller.create(req, {'security_group_rule': rule}) + + security_group_rule = res_dict['security_group_rule'] + self.assertNotEquals(security_group_rule['id'], 0) + self.assertEquals(security_group_rule['parent_group_id'], 2) + self.assertEquals(security_group_rule['ip_range']['cidr'], + "10.2.3.124/24") + + def test_create_by_group_id(self): + rule = security_group_rule_template(group_id=1) + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules') + res_dict = self.controller.create(req, {'security_group_rule': rule}) + + security_group_rule = res_dict['security_group_rule'] + self.assertNotEquals(security_group_rule['id'], 0) + self.assertEquals(security_group_rule['parent_group_id'], 2) + + def test_create_by_same_group_id(self): + rule1 = security_group_rule_template(group_id=1, from_port=80, + to_port=80) + self.parent_security_group['rules'] = [security_group_rule_db(rule1)] + + rule2 = security_group_rule_template(group_id=1, from_port=81, + to_port=81) + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules') + res_dict = self.controller.create(req, {'security_group_rule': rule2}) + + security_group_rule = res_dict['security_group_rule'] + self.assertNotEquals(security_group_rule['id'], 0) + self.assertEquals(security_group_rule['parent_group_id'], 2) + self.assertEquals(security_group_rule['from_port'], 81) + self.assertEquals(security_group_rule['to_port'], 81) + + def test_create_by_invalid_cidr_json(self): + rules = { + "security_group_rule": { + "ip_protocol": "tcp", + "from_port": "22", + "to_port": "22", + "parent_group_id": 2, + "cidr": "10.2.3.124/2433"}} + rule = security_group_rule_template( + ip_protocol="tcp", + from_port=22, + to_port=22, + parent_group_id=2, + cidr="10.2.3.124/2433") + req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) + + def test_create_by_invalid_tcp_port_json(self): + rule = security_group_rule_template( + ip_protocol="tcp", + from_port=75534, + to_port=22, + parent_group_id=2, + cidr="10.2.3.124/24") + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) + + def test_create_by_invalid_icmp_port_json(self): + rule = security_group_rule_template( + ip_protocol="icmp", + from_port=1, + to_port=256, + parent_group_id=2, + cidr="10.2.3.124/24") + req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) + + def test_create_add_existing_rules_by_cidr(self): + rule = security_group_rule_template(cidr='10.0.0.0/24') + + self.parent_security_group['rules'] = [security_group_rule_db(rule)] + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) + + def test_create_add_existing_rules_by_group_id(self): + rule = security_group_rule_template(group_id=1) + + self.parent_security_group['rules'] = [security_group_rule_db(rule)] + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) + + def test_create_with_no_body(self): + req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules') + self.assertRaises(webob.exc.HTTPUnprocessableEntity, + self.controller.create, req, None) + + def test_create_with_no_security_group_rule_in_body(self): + rules = {'test': 'test'} + req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules') + self.assertRaises(webob.exc.HTTPUnprocessableEntity, + self.controller.create, req, rules) + + def test_create_with_invalid_parent_group_id(self): + rule = security_group_rule_template(parent_group_id='invalid') + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) + + def test_create_with_non_existing_parent_group_id(self): + rule = security_group_rule_template(group_id='invalid', + parent_group_id='1111111111111') + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules') + self.assertRaises(webob.exc.HTTPNotFound, self.controller.create, + req, {'security_group_rule': rule}) + + def test_create_with_invalid_protocol(self): + rule = security_group_rule_template(ip_protocol='invalid-protocol', + cidr='10.2.2.0/24') + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) + + def test_create_with_no_protocol(self): + rule = security_group_rule_template(cidr='10.2.2.0/24') + del rule['ip_protocol'] + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) + + def test_create_with_invalid_from_port(self): + rule = security_group_rule_template(from_port='666666', + cidr='10.2.2.0/24') + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) + + def test_create_with_invalid_to_port(self): + rule = security_group_rule_template(to_port='666666', + cidr='10.2.2.0/24') + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) + + def test_create_with_non_numerical_from_port(self): + rule = security_group_rule_template(from_port='invalid', + cidr='10.2.2.0/24') + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) + + def test_create_with_non_numerical_to_port(self): + rule = security_group_rule_template(to_port='invalid', + cidr='10.2.2.0/24') + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) + + def test_create_with_no_from_port(self): + rule = security_group_rule_template(cidr='10.2.2.0/24') + del rule['from_port'] + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) + + def test_create_with_no_to_port(self): + rule = security_group_rule_template(cidr='10.2.2.0/24') + del rule['to_port'] + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) + + def test_create_with_invalid_cidr(self): + rule = security_group_rule_template(cidr='10.2.2222.0/24') + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) + + def test_create_with_no_cidr_group(self): + rule = security_group_rule_template() + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules') + res_dict = self.controller.create(req, {'security_group_rule': rule}) + + security_group_rule = res_dict['security_group_rule'] + self.assertNotEquals(security_group_rule['id'], 0) + self.assertEquals(security_group_rule['parent_group_id'], + self.parent_security_group['id']) + self.assertEquals(security_group_rule['ip_range']['cidr'], + "0.0.0.0/0") + + def test_create_with_invalid_group_id(self): + rule = security_group_rule_template(group_id='invalid') + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) + + def test_create_with_empty_group_id(self): + rule = security_group_rule_template(group_id='') + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) + + def test_create_with_nonexist_group_id(self): + rule = security_group_rule_template(group_id='222222') + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) + + def test_create_with_same_group_parent_id_and_group_id(self): + rule = security_group_rule_template(group_id=2) + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create, + req, {'security_group_rule': rule}) + + def test_delete(self): + rule = security_group_rule_template(id=10) + + def security_group_rule_get(context, id): + return security_group_rule_db(rule) + + def security_group_rule_destroy(context, id): + pass + + self.stubs.Set(nova.db, 'security_group_rule_get', + security_group_rule_get) + self.stubs.Set(nova.db, 'security_group_rule_destroy', + security_group_rule_destroy) + + req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules/10') + self.controller.delete(req, '10') + + def test_delete_invalid_rule_id(self): + req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules' + + '/invalid') + self.assertRaises(webob.exc.HTTPBadRequest, self.controller.delete, + req, 'invalid') + + def test_delete_non_existing_rule_id(self): + req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules' + + '/22222222222222') + self.assertRaises(webob.exc.HTTPNotFound, self.controller.delete, + req, '22222222222222') + + +class TestSecurityGroupRulesXMLDeserializer(unittest.TestCase): + + def setUp(self): + self.deserializer = security_groups.SecurityGroupRulesXMLDeserializer() + + def test_create_request(self): + serial_request = """ +<security_group_rule> + <parent_group_id>12</parent_group_id> + <from_port>22</from_port> + <to_port>22</to_port> + <group_id></group_id> + <ip_protocol>tcp</ip_protocol> + <cidr>10.0.0.0/24</cidr> +</security_group_rule>""" + request = self.deserializer.deserialize(serial_request) + expected = { + "security_group_rule": { + "parent_group_id": "12", + "from_port": "22", + "to_port": "22", + "ip_protocol": "tcp", + "group_id": "", + "cidr": "10.0.0.0/24", + }, + } + self.assertEquals(request['body'], expected) + + def test_create_no_protocol_request(self): + serial_request = """ +<security_group_rule> + <parent_group_id>12</parent_group_id> + <from_port>22</from_port> + <to_port>22</to_port> + <group_id></group_id> + <cidr>10.0.0.0/24</cidr> +</security_group_rule>""" + request = self.deserializer.deserialize(serial_request) + expected = { + "security_group_rule": { + "parent_group_id": "12", + "from_port": "22", + "to_port": "22", + "group_id": "", + "cidr": "10.0.0.0/24", + }, + } + self.assertEquals(request['body'], expected) + + +class TestSecurityGroupXMLDeserializer(unittest.TestCase): + + def setUp(self): + self.deserializer = security_groups.SecurityGroupXMLDeserializer() + + def test_create_request(self): + serial_request = """ +<security_group name="test"> + <description>test</description> +</security_group>""" + request = self.deserializer.deserialize(serial_request) + expected = { + "security_group": { + "name": "test", + "description": "test", + }, + } + self.assertEquals(request['body'], expected) + + def test_create_no_description_request(self): + serial_request = """ +<security_group name="test"> +</security_group>""" + request = self.deserializer.deserialize(serial_request) + expected = { + "security_group": { + "name": "test", + }, + } + self.assertEquals(request['body'], expected) + + def test_create_no_name_request(self): + serial_request = """ +<security_group> +<description>test</description> +</security_group>""" + request = self.deserializer.deserialize(serial_request) + expected = { + "security_group": { + "description": "test", + }, + } + self.assertEquals(request['body'], expected) + + +class TestSecurityGroupXMLSerializer(unittest.TestCase): + def setUp(self): + self.namespace = wsgi.XMLNS_V11 + self.rule_serializer = security_groups.SecurityGroupRuleTemplate() + self.index_serializer = security_groups.SecurityGroupsTemplate() + self.default_serializer = security_groups.SecurityGroupTemplate() + + def _tag(self, elem): + tagname = elem.tag + self.assertEqual(tagname[0], '{') + tmp = tagname.partition('}') + namespace = tmp[0][1:] + self.assertEqual(namespace, self.namespace) + return tmp[2] + + def _verify_security_group_rule(self, raw_rule, tree): + self.assertEqual(raw_rule['id'], tree.get('id')) + self.assertEqual(raw_rule['parent_group_id'], + tree.get('parent_group_id')) + + seen = set() + expected = set(['ip_protocol', 'from_port', 'to_port', + 'group', 'group/name', 'group/tenant_id', + 'ip_range', 'ip_range/cidr']) + + for child in tree: + child_tag = self._tag(child) + self.assertTrue(child_tag in raw_rule) + seen.add(child_tag) + if child_tag in ('group', 'ip_range'): + for gr_child in child: + gr_child_tag = self._tag(gr_child) + self.assertTrue(gr_child_tag in raw_rule[child_tag]) + seen.add('%s/%s' % (child_tag, gr_child_tag)) + self.assertEqual(gr_child.text, + raw_rule[child_tag][gr_child_tag]) + else: + self.assertEqual(child.text, raw_rule[child_tag]) + self.assertEqual(seen, expected) + + def _verify_security_group(self, raw_group, tree): + rules = raw_group['rules'] + self.assertEqual('security_group', self._tag(tree)) + self.assertEqual(raw_group['id'], tree.get('id')) + self.assertEqual(raw_group['tenant_id'], tree.get('tenant_id')) + self.assertEqual(raw_group['name'], tree.get('name')) + self.assertEqual(2, len(tree)) + for child in tree: + child_tag = self._tag(child) + if child_tag == 'rules': + self.assertEqual(2, len(child)) + for idx, gr_child in enumerate(child): + self.assertEqual(self._tag(gr_child), 'rule') + self._verify_security_group_rule(rules[idx], gr_child) + else: + self.assertEqual('description', child_tag) + self.assertEqual(raw_group['description'], child.text) + + def test_rule_serializer(self): + raw_rule = dict( + id='123', + parent_group_id='456', + ip_protocol='tcp', + from_port='789', + to_port='987', + group=dict(name='group', tenant_id='tenant'), + ip_range=dict(cidr='10.0.0.0/8')) + rule = dict(security_group_rule=raw_rule) + text = self.rule_serializer.serialize(rule) + + print text + tree = etree.fromstring(text) + + self.assertEqual('security_group_rule', self._tag(tree)) + self._verify_security_group_rule(raw_rule, tree) + + def test_group_serializer(self): + rules = [dict( + id='123', + parent_group_id='456', + ip_protocol='tcp', + from_port='789', + to_port='987', + group=dict(name='group1', tenant_id='tenant1'), + ip_range=dict(cidr='10.55.44.0/24')), + dict( + id='654', + parent_group_id='321', + ip_protocol='udp', + from_port='234', + to_port='567', + group=dict(name='group2', tenant_id='tenant2'), + ip_range=dict(cidr='10.44.55.0/24'))] + raw_group = dict( + id='890', + description='description', + name='name', + tenant_id='tenant', + rules=rules) + sg_group = dict(security_group=raw_group) + text = self.default_serializer.serialize(sg_group) + + print text + tree = etree.fromstring(text) + + self._verify_security_group(raw_group, tree) + + def test_groups_serializer(self): + rules = [dict( + id='123', + parent_group_id='1234', + ip_protocol='tcp', + from_port='12345', + to_port='123456', + group=dict(name='group1', tenant_id='tenant1'), + ip_range=dict(cidr='10.123.0.0/24')), + dict( + id='234', + parent_group_id='2345', + ip_protocol='udp', + from_port='23456', + to_port='234567', + group=dict(name='group2', tenant_id='tenant2'), + ip_range=dict(cidr='10.234.0.0/24')), + dict( + id='345', + parent_group_id='3456', + ip_protocol='tcp', + from_port='34567', + to_port='345678', + group=dict(name='group3', tenant_id='tenant3'), + ip_range=dict(cidr='10.345.0.0/24')), + dict( + id='456', + parent_group_id='4567', + ip_protocol='udp', + from_port='45678', + to_port='456789', + group=dict(name='group4', tenant_id='tenant4'), + ip_range=dict(cidr='10.456.0.0/24'))] + groups = [dict( + id='567', + description='description1', + name='name1', + tenant_id='tenant1', + rules=rules[0:2]), + dict( + id='678', + description='description2', + name='name2', + tenant_id='tenant2', + rules=rules[2:4])] + sg_groups = dict(security_groups=groups) + text = self.index_serializer.serialize(sg_groups) + + print text + tree = etree.fromstring(text) + + self.assertEqual('security_groups', self._tag(tree)) + self.assertEqual(len(groups), len(tree)) + for idx, child in enumerate(tree): + self._verify_security_group(groups[idx], child) |